零基础学Python——第六章:Python标准库
第六章:Python标准库
6.1 日期时间处理
6.1.1 datetime模块
-
基本概念与用途
datetime模块提供了处理日期和时间的类和函数,可以轻松地创建、操作和格式化日期时间数据,就像我们在日常生活中管理时间一样。
# 导入datetime模块 import datetime# 获取当前日期和时间 now = datetime.datetime.now() print(f"当前日期和时间:{now}") # 输出类似:当前日期和时间:2023-07-15 14:30:45.123456
-
创建日期和时间对象
我们可以创建特定的日期、时间或日期时间对象,就像在日历上标记重要日期一样。
# 创建日期对象 birthday = datetime.date(1999, 12, 31) print(f"生日:{birthday}") # 输出:生日:1999-12-31# 创建时间对象 meeting_time = datetime.time(14, 30, 0) print(f"会议时间:{meeting_time}") # 输出:会议时间:14:30:00# 创建日期时间对象 appointment = datetime.datetime(2023, 7, 20, 10, 0, 0) print(f"预约时间:{appointment}") # 输出:预约时间:2023-07-20 10:00:00
-
日期时间运算
我们可以对日期和时间进行加减运算,计算两个日期之间的差异,就像计算活动倒计时一样。
# 导入timedelta用于日期时间运算 from datetime import datetime, timedelta# 当前时间 now = datetime.now()# 计算一周后的日期 one_week_later = now + timedelta(days=7) print(f"一周后:{one_week_later}")# 计算两小时前的时间 two_hours_ago = now - timedelta(hours=2) print(f"两小时前:{two_hours_ago}")# 计算两个日期之间的差异 future_date = datetime(2023, 12, 31, 23, 59, 59) time_difference = future_date - now print(f"距离2023年结束还有:{time_difference.days}天")
-
日期时间格式化
我们可以将日期时间对象转换为特定格式的字符串,或者将字符串解析为日期时间对象,就像在不同国家使用不同的日期表示方式一样。
from datetime import datetime# 日期时间对象格式化为字符串 now = datetime.now() formatted_date = now.strftime("%Y年%m月%d日 %H:%M:%S") print(f"格式化日期:{formatted_date}") # 输出类似:格式化日期:2023年07月15日 14:30:45# 字符串解析为日期时间对象 date_string = "2023-10-01 08:00:00" parsed_date = datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S") print(f"解析的日期:{parsed_date}") # 输出:解析的日期:2023-10-01 08:00:00
6.1.2 time模块
-
基本时间函数
time模块提供了各种与时间相关的函数,包括获取当前时间戳、休眠等功能。
import time# 获取当前时间戳(从1970年1月1日00:00:00 UTC开始的秒数) timestamp = time.time() print(f"当前时间戳:{timestamp}") # 输出类似:当前时间戳:1689418245.123456# 将时间戳转换为本地时间 local_time = time.localtime(timestamp) print(f"本地时间:{local_time}") # 输出时间的结构化表示# 格式化时间 formatted_time = time.strftime("%Y-%m-%d %H:%M:%S", local_time) print(f"格式化时间:{formatted_time}") # 输出类似:格式化时间:2023-07-15 14:30:45
-
程序计时与休眠
time模块可以用于测量代码执行时间或让程序暂停一段时间,就像使用秒表或闹钟一样。
import time# 测量代码执行时间 start_time = time.time()# 模拟耗时操作 for i in range(1000000):passend_time = time.time() execution_time = end_time - start_time print(f"代码执行时间:{execution_time:.6f}秒")# 程序休眠(暂停执行) print("程序将暂停3秒...") time.sleep(3) # 暂停3秒 print("程序继续执行")
6.1.3 calendar模块
-
日历功能
calendar模块提供了与日历相关的功能,可以生成月历或年历,判断闰年等,就像我们使用实体日历一样。
import calendar# 判断是否为闰年 is_leap = calendar.isleap(2024) print(f"2024年是闰年吗?{is_leap}") # 输出:2024年是闰年吗?True# 获取某月的日历(返回多行字符串) month_calendar = calendar.month(2023, 7) print("2023年7月的日历:") print(month_calendar)# 获取某年的日历(返回多行字符串) # year_calendar = calendar.calendar(2023) # 这会生成很长的输出 # print(year_calendar)# 获取某月第一天是星期几(0是星期一,6是星期日) first_weekday = calendar.monthrange(2023, 7)[0] print(f"2023年7月第一天是星期{first_weekday + 1}") # 输出:2023年7月第一天是星期6# 获取某月的天数 days_in_month = calendar.monthrange(2023, 7)[1] print(f"2023年7月有{days_in_month}天") # 输出:2023年7月有31天
6.1.4 实际应用案例:日程管理系统
# 简单的日程管理系统
import datetimeclass ScheduleManager:def __init__(self):self.schedules = {} # 用字典存储日程,键为日期,值为该日期的日程列表def add_schedule(self, date_str, time_str, description):"""添加日程"""try:# 解析日期和时间date_obj = datetime.datetime.strptime(date_str, "%Y-%m-%d").date()time_obj = datetime.datetime.strptime(time_str, "%H:%M").time()# 创建完整的日期时间对象datetime_obj = datetime.datetime.combine(date_obj, time_obj)# 添加到日程字典if date_obj not in self.schedules:self.schedules[date_obj] = []self.schedules[date_obj].append((datetime_obj, description))print(f"成功添加日程:{date_str} {time_str} - {description}")except ValueError as e:print(f"日期或时间格式错误:{e}")def view_schedules(self, date_str=None):"""查看指定日期或所有日程"""if date_str:try:# 查看特定日期的日程date_obj = datetime.datetime.strptime(date_str, "%Y-%m-%d").date()if date_obj in self.schedules:print(f"\n{date_str}的日程:")# 按时间排序sorted_schedules = sorted(self.schedules[date_obj], key=lambda x: x[0])for idx, (dt, desc) in enumerate(sorted_schedules, 1):print(f" {idx}. {dt.strftime('%H:%M')} - {desc}")else:print(f"\n{date_str}没有安排日程")except ValueError:print("日期格式错误,请使用YYYY-MM-DD格式")else:# 查看所有日程,按日期排序if not self.schedules:print("\n没有任何日程安排")returnprint("\n所有日程安排:")for date in sorted(self.schedules.keys()):print(f"\n{date.strftime('%Y-%m-%d')}:")sorted_schedules = sorted(self.schedules[date], key=lambda x: x[0])for idx, (dt, desc) in enumerate(sorted_schedules, 1):print(f" {idx}. {dt.strftime('%H:%M')} - {desc}")def upcoming_schedules(self, days=7):"""查看未来几天的日程"""today = datetime.date.today()end_date = today + datetime.timedelta(days=days)print(f"\n未来{days}天的日程安排:")has_schedules = Falsefor date in sorted(self.schedules.keys()):if today <= date < end_date:has_schedules = Trueprint(f"\n{date.strftime('%Y-%m-%d')}:")sorted_schedules = sorted(self.schedules[date], key=lambda x: x[0])for idx, (dt, desc) in enumerate(sorted_schedules, 1):print(f" {idx}. {dt.strftime('%H:%M')} - {desc}")if not has_schedules:print(f"未来{days}天没有日程安排")# 使用示例
if __name__ == "__main__":manager = ScheduleManager()# 添加一些日程manager.add_schedule("2023-07-20", "09:30", "团队会议")manager.add_schedule("2023-07-20", "12:00", "午餐与客户")manager.add_schedule("2023-07-21", "14:00", "项目演示")manager.add_schedule("2023-07-25", "10:00", "季度回顾会议")# 查看特定日期的日程manager.view_schedules("2023-07-20")# 查看未来7天的日程manager.upcoming_schedules()# 查看所有日程# manager.view_schedules()
6.2 数学计算
6.2.1 math模块
-
基本数学函数
math模块提供了各种数学函数和常量,用于执行常见的数学运算,就像我们使用科学计算器一样。
import math# 数学常量 print(f"圆周率π:{math.pi}") # 输出:圆周率π:3.141592653589793 print(f"自然对数的底e:{math.e}") # 输出:自然对数的底e:2.718281828459045# 基本数学函数 print(f"8的平方根:{math.sqrt(8)}") # 输出:8的平方根:2.8284271247461903 print(f"5的阶乘:{math.factorial(5)}") # 输出:5的阶乘:120 print(f"向上取整4.3:{math.ceil(4.3)}") # 输出:向上取整4.3:5 print(f"向下取整4.7:{math.floor(4.7)}") # 输出:向下取整4.7:4 print(f"绝对值-10:{math.fabs(-10)}") # 输出:绝对值-10:10.0
-
三角函数与角度转换
math模块提供了三角函数和角度转换函数,用于处理角度和三角计算,就像在几何学中使用的一样。
import math# 角度与弧度转换 angle_degrees = 45 angle_radians = math.radians(angle_degrees) # 度转弧度 print(f"{angle_degrees}度 = {angle_radians}弧度")back_to_degrees = math.degrees(angle_radians) # 弧度转度 print(f"{angle_radians}弧度 = {back_to_degrees}度")# 三角函数(参数为弧度) print(f"sin(45°) = {math.sin(angle_radians)}") # 输出接近0.7071 print(f"cos(45°) = {math.cos(angle_radians)}") # 输出接近0.7071 print(f"tan(45°) = {math.tan(angle_radians)}") # 输出接近1.0
-
对数与指数函数
math模块提供了对数和指数函数,用于处理指数增长或衰减的情况,如复利计算、人口增长等。
import math# 指数函数 print(f"e的2次方:{math.exp(2)}") # 输出:e的2次方:7.38905609893065 print(f"2的10次方:{math.pow(2, 10)}") # 输出:2的10次方:1024.0# 对数函数 print(f"ln(10):{math.log(10)}") # 自然对数,输出:ln(10):2.302585092994046 print(f"log10(100):{math.log10(100)}") # 以10为底的对数,输出:log10(100):2.0 print(f"log2(8):{math.log2(8)}") # 以2为底的对数,输出:log2(8):3.0
6.2.2 random模块
-
随机数生成
random模块提供了生成随机数的函数,可用于模拟随机事件,如掷骰子、抽奖等。
import random# 生成0到1之间的随机浮点数 random_float = random.random() print(f"随机浮点数:{random_float}") # 输出0到1之间的随机数# 生成指定范围内的随机整数 random_int = random.randint(1, 100) # 1到100之间的随机整数(包括1和100) print(f"随机整数(1-100):{random_int}")# 生成指定范围内的随机浮点数 random_range = random.uniform(10.5, 20.5) # 10.5到20.5之间的随机浮点数 print(f"随机浮点数(10.5-20.5):{random_range}")
-
随机选择与洗牌
random模块可以从序列中随机选择元素或打乱序列的顺序,就像抽签或洗牌一样。
import random# 从序列中随机选择一个元素 fruits = ["苹果", "香蕉", "橙子", "葡萄", "西瓜"] random_fruit = random.choice(fruits) print(f"随机选择的水果:{random_fruit}")# 从序列中随机选择多个元素(可能有重复) random_fruits = random.choices(fruits, k=3) print(f"随机选择的3个水果(可能重复):{random_fruits}")# 从序列中随机选择多个元素(不重复) random_sample = random.sample(fruits, k=3) print(f"随机选择的3个水果(不重复):{random_sample}")# 打乱序列的顺序 numbers = [1, 2, 3, 4, 5] random.shuffle(numbers) # 直接修改原列表 print(f"打乱后的数字:{numbers}")
-
随机种子设置
通过设置随机种子,可以使随机数生成具有可重复性,这在需要可重现结果的情况下很有用,如科学实验或调试。
import random# 设置随机种子 random.seed(42) # 使用固定的种子值# 生成随机数 print(f"第一次随机数:{random.random()}") # 每次运行程序,这个值都相同 print(f"第二次随机数:{random.random()}") # 每次运行程序,这个值都相同# 重新设置相同的种子 random.seed(42)# 再次生成随机数,结果与上面相同 print(f"重置种子后第一次:{random.random()}") # 与第一次相同 print(f"重置种子后第二次:{random.random()}") # 与第二次相同
6.2.3 statistics模块(Python 3.4+)
-
统计函数
statistics模块提供了计算基本统计量的函数,如平均值、中位数、标准差等,用于数据分析和统计。
import statistics# 准备数据 data = [2, 5, 7, 9, 11, 13, 15, 18, 20, 25]# 计算基本统计量 mean_value = statistics.mean(data) # 算术平均值 median_value = statistics.median(data) # 中位数 mode_value = statistics.mode([1, 2, 2, 3, 3, 3, 4]) # 众数(出现最多的值) variance_value = statistics.variance(data) # 方差 stdev_value = statistics.stdev(data) # 标准差print(f"数据:{data}") print(f"平均值:{mean_value}") print(f"中位数:{median_value}") print(f"众数:{mode_value}") print(f"方差:{variance_value}") print(f"标准差:{stdev_value}")
6.2.4 实际应用案例:简单数据分析工具
# 简单的数据分析工具
import math
import statistics
import random
import matplotlib.pyplot as plt # 需要安装matplotlib库:pip install matplotlibclass DataAnalyzer:def __init__(self, data=None):self.data = data if data is not None else []def load_data(self, data):"""加载数据"""self.data = dataprint(f"已加载{len(data)}个数据点")def generate_random_data(self, size=100, min_val=0, max_val=100):"""生成随机数据"""self.data = [random.uniform(min_val, max_val) for _ in range(size)]print(f"已生成{size}个随机数据点")def basic_statistics(self):"""计算基本统计量"""if not self.data:print("没有数据可分析")return {}stats = {"样本数": len(self.data),"最小值": min(self.data),"最大值": max(self.data),"总和": sum(self.data),"平均值": statistics.mean(self.data),"中位数": statistics.median(self.data),"方差": statistics.variance(self.data) if len(self.data) > 1 else 0,"标准差": statistics.stdev(self.data) if len(self.data) > 1 else 0,"四分位数": [sorted(self.data)[int(len(self.data) * 0.25)], # 第一四分位数sorted(self.data)[int(len(self.data) * 0.5)], # 第二四分位数(中位数)sorted(self.data)[int(len(self.data) * 0.75)] # 第三四分位数]}print("\n基本统计分析结果:")for key, value in stats.items():print(f"{key}: {value}")return statsdef plot_histogram(self, bins=10):"""绘制直方图"""if not self.data:print("没有数据可绘图")returnplt.figure(figsize=(10, 6))plt.hist(self.data, bins=bins, alpha=0.7, color='skyblue', edgecolor='black')plt.title('数据分布直方图')plt.xlabel('值')plt.ylabel('频率')plt.grid(True, linestyle='--', alpha=0.7)plt.show()def plot_box(self):"""绘制箱线图"""if not self.data:print("没有数据可绘图")returnplt.figure(figsize=(10, 6))plt.boxplot(self.data, vert=False, patch_artist=True, boxprops=dict(facecolor='skyblue'))plt.title('数据箱线图')plt.xlabel('值')plt.grid(True, linestyle='--', alpha=0.7)plt.show()def normalize_data(self, method='min-max'):"""数据归一化"""if not self.data:print("没有数据可归一化")return []if method == 'min-max':# Min-Max归一化:将数据缩放到[0,1]区间min_val = min(self.data)max_val = max(self.data)range_val = max_val - min_valif range_val == 0: # 避免除以零normalized = [0.5 for _ in self.data]else:normalized = [(x - min_val) / range_val for x in self.data]print("已使用Min-Max方法归一化数据")elif method == 'z-score':# Z-score标准化:将数据转换为均值为0,标准差为1的分布mean_val = statistics.mean(self.data)stdev_val = statistics.stdev(self.data) if len(self.data) > 1 else 1if stdev_val == 0: # 避免除以零normalized = [0 for _ in self.data]else:normalized = [(x - mean_val) / stdev_val for x in self.data]print("已使用Z-score方法标准化数据")else:print(f"不支持的归一化方法:{method}")return self.datareturn normalized# 使用示例
if __name__ == "__main__":analyzer = DataAnalyzer()# 生成随机数据analyzer.generate_random_data(size=1000, min_val=10, max_val=100)# 计算基本统计量analyzer.basic_statistics()# 绘制直方图analyzer.plot_histogram(bins=20)# 绘制箱线图analyzer.plot_box()# 数据归一化normalized_data = analyzer.normalize_data(method='min-max')# 可以创建新的分析器来分析归一化后的数据# norm_analyzer = DataAnalyzer(normalized_data)# norm_analyzer.basic_statistics()
6.3 正则表达式
6.3.1 re模块基础
-
正则表达式概述
正则表达式是一种强大的文本模式匹配工具,可以用来搜索、替换和验证文本,就像在文档中使用高级搜索功能一样。
# 正则表达式的生活例子 # 就像我们在图书馆中按照特定规则(如作者名、书名的一部分、出版年份等)查找书籍
-
基本模式匹配
re模块提供了使用正则表达式进行模式匹配的功能,可以检查字符串是否包含特定模式。
import re# 检查字符串是否包含特定模式 text = "我的电话号码是13812345678,请联系我" pattern = r"1\d{10}" # 匹配1开头的11位数字(简化的手机号码模式)if re.search(pattern, text):print("找到了手机号码") else:print("没有找到手机号码")# 查找所有匹配 emails = "联系方式:user1@example.com 和 admin@example.org,请注意保密" email_pattern = r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"all_emails = re.findall(email_pattern, emails) print(f"找到的所有邮箱:{all_emails}") # 输出:找到的所有邮箱:['user1@example.com', 'admin@example.org']
-
正则表达式元字符
正则表达式使用特殊字符(元字符)来表示特定的模式,就像使用特殊符号来表示数学运算一样。
import re# 常用元字符示例 patterns = {r"\d": "匹配任意数字,等同于[0-9]",r"\w": "匹配任意字母、数字或下划线,等同于[a-zA-Z0-9_]",r"\s": "匹配任意空白字符(空格、制表符、换行符等)",r".": "匹配除换行符外的任意字符",r"^": "匹配字符串的开头",r"$": "匹配字符串的结尾",r"*": "匹配前面的模式零次或多次",r"+": "匹配前面的模式一次或多次",r"?": "匹配前面的模式零次或一次(可选)",r"{n}": "匹配前面的模式恰好n次",r"{n,}": "匹配前面的模式至少n次",r"{n,m}": "匹配前面的模式n到m次" }# 示例:验证身份证号码(简化版,仅检查18位数字或17位数字+X) id_pattern = r"^\d{17}(\d|X)$" valid_id = "11010119990101123X" invalid_id = "1101011999010112"print(f"{valid_id} 是有效身份证号码吗?{bool(re.match(id_pattern, valid_id))}") # 输出:True print(f"{invalid_id} 是有效身份证号码吗?{bool(re.match(id_pattern, invalid_id))}") # 输出:False
6.3.2 分组与捕获
-
使用括号分组
正则表达式中的括号可以用来分组和捕获匹配的子字符串,就像在句子中使用括号来强调特定部分一样。
import re# 使用分组提取信息 text = "我的生日是1999年12月31日" date_pattern = r"(\d{4})年(\d{1,2})月(\d{1,2})日"match = re.search(date_pattern, text) if match:year, month, day = match.groups()print(f"年:{year}, 月:{month}, 日:{day}") # 输出:年:1999, 月:12, 日:31# 通过组号访问print(f"年:{match.group(1)}, 月:{match.group(2)}, 日:{match.group(3)}") # 输出同上# 完整匹配print(f"完整日期:{match.group(0)}") # 输出:完整日期:1999年12月31日
-
命名分组
除了使用数字索引,还可以给分组命名,使代码更具可读性,就像给变量起有意义的名字一样。
import re# 使用命名分组 text = "我的邮箱是user123@example.com" email_pattern = r"(?P<username>[a-zA-Z0-9._%+-]+)@(?P<domain>[a-zA-Z0-9.-]+\.(?P<tld>[a-zA-Z]{2,}))"match = re.search(email_pattern, text) if match:# 通过名称访问分组username = match.group('username')domain = match.group('domain')tld = match.group('tld')print(f"用户名:{username}") # 输出:用户名:user123print(f"域名:{domain}") # 输出:域名:example.comprint(f"顶级域名:{tld}") # 输出:顶级域名:com
-
非捕获分组
有时我们只想使用分组的功能而不需要捕获内容,可以使用非捕获分组,就像使用辅助工具但不保留它们一样。
import re# 非捕获分组使用 (?:...) text = "联系电话:13812345678 或 13987654321" phone_pattern = r"(?:1\d)\d{9}" # 匹配1开头的11位手机号,但不捕获前两位phones = re.findall(phone_pattern, text) print(f"找到的电话号码:{phones}") # 输出:找到的电话号码:['13812345678', '13987654321']
6.3.3 替换与分割
-
字符串替换
re模块的sub函数可以用于替换匹配的文本,就像在文档中使用查找和替换功能一样。
import re# 基本替换 text = "我的电话是13812345678,请记住这个号码" # 将电话号码中间部分用星号替换,保护隐私 masked_text = re.sub(r"(1\d{2})\d{4}(\d{4})", r"\1****\2", text) print(masked_text) # 输出:我的电话是138****5678,请记住这个号码# 使用函数进行替换 def convert_date(match):# 将YYYY-MM-DD格式转换为MM/DD/YYYY格式year, month, day = match.groups()return f"{month}/{day}/{year}"text = "会议安排在2023-07-15和2023-08-20" converted_text = re.sub(r"(\d{4})-(\d{2})-(\d{2})", convert_date, text) print(converted_text) # 输出:会议安排在07/15/2023和08/20/2023
-
字符串分割
re模块的split函数可以使用正则表达式作为分隔符来分割字符串,比内置的split方法更灵活。
import re# 使用正则表达式分割字符串 text = "苹果,香蕉;橙子,葡萄;西瓜"# 使用逗号或分号作为分隔符 fruits = re.split(r"[,;]", text) print(fruits) # 输出:['苹果', '香蕉', '橙子', '葡萄', '西瓜']# 限制分割次数 fruits_limited = re.split(r"[,;]", text, maxsplit=2) print(fruits_limited) # 输出:['苹果', '香蕉', '橙子,葡萄;西瓜']
6.3.4 实际应用案例:文本解析器
# 简单的文本解析器,用于提取结构化信息
import reclass TextParser:def __init__(self):# 预编译正则表达式以提高性能self.email_pattern = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")self.phone_pattern = re.compile(r"1[3-9]\d{9}")self.url_pattern = re.compile(r"https?://(?:www\.)?[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}(?:/[^\s]*)?")self.date_pattern = re.compile(r"\d{4}[-/年]\d{1,2}[-/月]\d{1,2}[日]?")def extract_emails(self, text):"""提取文本中的所有邮箱地址"""return self.email_pattern.findall(text)def extract_phones(self, text):"""提取文本中的所有手机号码"""return self.phone_pattern.findall(text)def extract_urls(self, text):"""提取文本中的所有URL"""return self.url_pattern.findall(text)def extract_dates(self, text):"""提取文本中的所有日期"""return self.date_pattern.findall(text)def mask_sensitive_info(self, text):"""遮蔽文本中的敏感信息"""# 遮蔽邮箱text = self.email_pattern.sub(lambda m: m.group(0).split('@')[0][:3] + "***@" + m.group(0).split('@')[1], text)# 遮蔽手机号text = self.phone_pattern.sub(lambda m: m.group(0)[:3] + "****" + m.group(0)[-4:], text)return textdef extract_all(self, text):"""提取文本中的所有结构化信息"""return {"emails": self.extract_emails(text),"phones": self.extract_phones(text),"urls": self.extract_urls(text),"dates": self.extract_dates(text)}def parse_resume(self, text):"""解析简历文本,提取关键信息"""# 提取姓名(假设姓名后面跟着"简历"或在开头)name_match = re.search(r"^([\u4e00-\u9fa5]{2,4})(?:的简历|简历|个人简历)", text) or \re.search(r"姓名[::]+\s*([\u4e00-\u9fa5]{2,4})", text)name = name_match.group(1) if name_match else "未知"# 提取年龄age_match = re.search(r"年龄[::]+\s*(\d{1,2})[岁]?", text)age = age_match.group(1) if age_match else "未知"# 提取教育经历edu_matches = re.finditer(r"(\d{4})年?[~-](\d{4}|至今|现在)年?\s+([\u4e00-\u9fa5]+大学|[\u4e00-\u9fa5]+学院)", text)education = [(m.group(1), m.group(2), m.group(3)) for m in edu_matches]# 提取工作经历work_matches = re.finditer(r"(\d{4})年?[~-](\d{4}|至今|现在)年?\s+([\u4e00-\u9fa5]+公司|[\u4e00-\u9fa5]+集团|[\u4e00-\u9fa5]+企业)", text)work_experience = [(m.group(1), m.group(2), m.group(3)) for m in work_matches]# 提取联系方式contact_info = {"email": self.extract_emails(text)[0] if self.extract_emails(text) else "未知","phone": self.extract_phones(text)[0] if self.extract_phones(text) else "未知"}return {"name": name,"age": age,"education": education,"work_experience": work_experience,"contact_info": contact_info}# 使用示例
if __name__ == "__main__":parser = TextParser()# 示例文本sample_text = """张三的简历个人信息:年龄:28岁邮箱:zhangsan@example.com电话:13812345678教育经历:2010-2014 北京大学 计算机科学专业工作经历:2014-2018 腾讯科技有限公司 软件工程师2018-至今 阿里巴巴集团 高级工程师项目经验:1. 开发了公司官网 https://www.example.com/projects2. 参与了2022年3月15日的产品发布会"""# 提取所有结构化信息all_info = parser.extract_all(sample_text)print("提取的所有信息:")for key, value in all_info.items():print(f"{key}: {value}")# 解析简历resume_info = parser.parse_resume(sample_text)print("\n解析的简历信息:")for key, value in resume_info.items():print(f"{key}: {value}")# 遮蔽敏感信息masked_text = parser.mask_sensitive_info(sample_text)print("\n遮蔽敏感信息后的文本:")print(masked_text)
6.4 文件系统操作
6.4.1 os模块
-
路径操作
os模块提供了与操作系统交互的功能,包括文件路径操作,就像在文件资源管理器中导航文件系统一样。
import os# 获取当前工作目录 current_dir = os.getcwd() print(f"当前工作目录:{current_dir}")# 列出目录内容 files = os.listdir(current_dir) print(f"目录内容:{files}")# 创建目录 try:os.mkdir("test_folder") # 创建单个目录print("已创建test_folder目录")os.makedirs("test_folder/sub_folder/sub_sub_folder") # 创建多级目录print("已创建多级目录") except FileExistsError:print("目录已存在")# 删除目录 try:os.rmdir("test_folder/sub_folder/sub_sub_folder") # 删除单个目录(必须为空)print("已删除sub_sub_folder目录") except FileNotFoundError:print("目录不存在")
-
文件操作
os模块还提供了文件操作功能,如重命名、删除文件等,就像在文件资源管理器中对文件进行操作一样。
import os# 创建一个测试文件 with open("test_file.txt", "w") as f:f.write("这是一个测试文件")# 重命名文件 os.rename("test_file.txt", "renamed_file.txt") print("文件已重命名")# 获取文件信息 file_stat = os.stat("renamed_file.txt") print(f"文件大小:{file_stat.st_size} 字节") print(f"最后修改时间:{file_stat.st_mtime}")# 删除文件 os.remove("renamed_file.txt") print("文件已删除")
-
环境变量
os模块可以访问和修改环境变量,这些变量影响程序的运行环境,就像调整房间的温度和湿度一样。
import os# 获取所有环境变量 env_vars = os.environ print(f"环境变量数量:{len(env_vars)}")# 获取特定环境变量 path = os.environ.get("PATH") print(f"PATH环境变量:{path[:100]}...")# 设置环境变量(仅影响当前进程) os.environ["MY_VARIABLE"] = "my_value" print(f"设置的环境变量:{os.environ.get('MY_VARIABLE')}")
6.4.2 os.path模块
-
路径处理
os.path模块提供了处理文件路径的函数,可以跨平台地操作路径,就像使用地图导航一样。
import os.path# 路径拼接(处理不同操作系统的路径分隔符) path = os.path.join("folder", "subfolder", "file.txt") print(f"拼接的路径:{path}")# 获取绝对路径 abs_path = os.path.abspath("file.txt") print(f"绝对路径:{abs_path}")# 获取路径组件 dirname = os.path.dirname(abs_path) basename = os.path.basename(abs_path) print(f"目录名:{dirname}") print(f"文件名:{basename}")# 分割路径 dir_part, file_part = os.path.split(abs_path) print(f"目录部分:{dir_part}") print(f"文件部分:{file_part}")# 分割文件名和扩展名 filename, extension = os.path.splitext("file.txt") print(f"文件名部分:{filename}") print(f"扩展名部分:{extension}")
-
路径检查
os.path模块提供了检查路径状态的函数,如判断文件或目录是否存在,就像确认目的地是否可达一样。
import os.path# 检查路径是否存在 exists = os.path.exists("file.txt") print(f"file.txt 存在吗?{exists}")# 检查是文件还是目录 if os.path.exists("file.txt"):is_file = os.path.isfile("file.txt")is_dir = os.path.isdir("file.txt")print(f"file.txt 是文件吗?{is_file}")print(f"file.txt 是目录吗?{is_dir}")# 获取文件大小 if os.path.isfile("file.txt"):size = os.path.getsize("file.txt")print(f"file.txt 的大小:{size} 字节")# 获取最后修改时间 if os.path.exists("file.txt"):mtime = os.path.getmtime("file.txt")print(f"file.txt 的最后修改时间:{mtime}")
6.4.3 shutil模块
-
高级文件操作
shutil模块提供了更高级的文件操作功能,如复制、移动文件或目录,就像使用更强大的文件管理工具一样。
import shutil import os# 创建测试文件和目录 os.makedirs("source_dir/subdir", exist_ok=True) with open("source_dir/test.txt", "w") as f:f.write("测试内容")# 复制文件 shutil.copy("source_dir/test.txt", "source_dir/test_copy.txt") print("文件已复制")# 复制文件并保留元数据(如修改时间) shutil.copy2("source_dir/test.txt", "source_dir/test_copy2.txt") print("文件已复制(保留元数据)")# 复制目录及其内容 os.makedirs("dest_dir", exist_ok=True) shutil.copytree("source_dir", "dest_dir/source_dir_copy") print("目录已复制")# 移动文件或目录 shutil.move("source_dir/test_copy.txt", "dest_dir/moved_file.txt") print("文件已移动")# 删除目录及其所有内容 shutil.rmtree("dest_dir/source_dir_copy") print("目录已删除")
-
归档操作
shutil模块还提供了创建和解压归档文件(如zip、tar)的功能,就像使用压缩软件一样。
import shutil import os# 创建测试文件和目录 os.makedirs("archive_test/subdir", exist_ok=True) with open("archive_test/file1.txt", "w") as f:f.write("文件1内容") with open("archive_test/subdir/file2.txt", "w") as f:f.write("文件2内容")# 创建zip归档 shutil.make_archive("archive_test_zip", "zip", "archive_test") print("已创建zip归档")# 创建tar归档 shutil.make_archive("archive_test_tar", "tar", "archive_test") print("已创建tar归档")# 创建gztar归档(tar.gz) shutil.make_archive("archive_test_gztar", "gztar", "archive_test") print("已创建tar.gz归档")# 解压归档 os.makedirs("extracted", exist_ok=True) shutil.unpack_archive("archive_test_zip.zip", "extracted/zip") print("已解压zip归档")
6.4.4 实际应用案例:文件管理器
# 简单的文件管理器
import os
import shutil
import datetimeclass FileManager:def __init__(self, root_dir=None):# 设置根目录,默认为当前目录self.root_dir = root_dir if root_dir else os.getcwd()self.current_dir = self.root_dirdef list_items(self, show_hidden=False):"""列出当前目录中的文件和子目录"""try:items = os.listdir(self.current_dir)# 过滤隐藏文件(以.开头的文件)if not show_hidden:items = [item for item in items if not item.startswith('.')]# 分类并排序dirs = []files = []for item in items:item_path = os.path.join(self.current_dir, item)if os.path.isdir(item_path):dirs.append((item, "目录", "-", self._get_modified_time(item_path)))else:size = self._format_size(os.path.getsize(item_path))files.append((item, "文件", size, self._get_modified_time(item_path)))# 按名称排序dirs.sort(key=lambda x: x[0].lower())files.sort(key=lambda x: x[0].lower())# 合并目录和文件列表all_items = dirs + files# 打印表头print(f"\n当前目录: {self.current_dir}")print("-" * 80)print(f"{'名称':<30} {'类型':<10} {'大小':<10} {'修改时间':<20}")print("-" * 80)# 打印项目for name, item_type, size, mtime in all_items:print(f"{name:<30} {item_type:<10} {size:<10} {mtime:<20}")print("-" * 80)print(f"共 {len(dirs)} 个目录,{len(files)} 个文件")return all_itemsexcept Exception as e:print(f"列出目录内容时出错:{e}")return []def change_directory(self, path):"""更改当前目录"""try:# 处理特殊路径if path == "..":# 返回上一级目录new_dir = os.path.dirname(self.current_dir)elif path == "~":# 返回用户主目录new_dir = os.path.expanduser("~")elif os.path.isabs(path):# 绝对路径new_dir = pathelse:# 相对路径new_dir = os.path.join(self.current_dir, path)# 检查目录是否存在if os.path.exists(new_dir) and os.path.isdir(new_dir):self.current_dir = new_dirprint(f"已切换到目录: {self.current_dir}")else:print(f"错误: {new_dir} 不是有效的目录")except Exception as e:print(f"更改目录时出错: {e}")def create_directory(self, dir_name):"""创建新目录"""try:# 构建完整路径new_dir_path = os.path.join(self.current_dir, dir_name)# 检查目录是否已存在if os.path.exists(new_dir_path):print(f"错误: {dir_name} 已存在")return False# 创建目录os.makedirs(new_dir_path)print(f"已创建目录: {dir_name}")return Trueexcept Exception as e:print(f"创建目录时出错: {e}")return Falsedef copy_item(self, source, destination):"""复制文件或目录"""try:# 构建完整路径source_path = os.path.join(self.current_dir, source)dest_path = os.path.join(self.current_dir, destination)# 检查源是否存在if not os.path.exists(source_path):print(f"错误: 源 {source} 不存在")return False# 复制文件或目录if os.path.isfile(source_path):shutil.copy2(source_path, dest_path)print(f"已复制文件: {source} -> {destination}")else:shutil.copytree(source_path, dest_path)print(f"已复制目录: {source} -> {destination}")return Trueexcept Exception as e:print(f"复制时出错: {e}")return Falsedef move_item(self, source, destination):"""移动文件或目录"""try:# 构建完整路径source_path = os.path.join(self.current_dir, source)dest_path = os.path.join(self.current_dir, destination)# 检查源是否存在if not os.path.exists(source_path):print(f"错误: 源 {source} 不存在")return False# 移动文件或目录shutil.move(source_path, dest_path)print(f"已移动: {source} -> {destination}")return Trueexcept Exception as e:print(f"移动时出错: {e}")return Falsedef delete_item(self, path, confirm=True):"""删除文件或目录"""try:# 构建完整路径full_path = os.path.join(self.current_dir, path)# 检查项目是否存在if not os.path.exists(full_path):print(f"错误: {path} 不存在")return False# 确认删除if confirm:item_type = "目录" if os.path.isdir(full_path) else "文件"response = input(f"确定要删除{item_type} {path} 吗? (y/n): ")if response.lower() != 'y':print("已取消删除")return False# 删除文件或目录if os.path.isfile(full_path):os.remove(full_path)print(f"已删除文件: {path}")else:shutil.rmtree(full_path)print(f"已删除目录: {path}")return Trueexcept Exception as e:print(f"删除时出错: {e}")return Falsedef search_items(self, pattern, search_files=True):"""搜索文件和目录"""try:results = []pattern = pattern.lower() # 不区分大小写for root, dirs, files in os.walk(self.current_dir):# 搜索目录for dir_name in dirs:if pattern in dir_name.lower():rel_path = os.path.relpath(os.path.join(root, dir_name), self.current_dir)results.append((rel_path, "目录"))# 搜索文件if search_files:for file_name in files:if pattern in file_name.lower():rel_path = os.path.relpath(os.path.join(root, file_name), self.current_dir)results.append((rel_path, "文件"))# 打印结果if results:print(f"\n搜索 '{pattern}' 的结果:")print("-" * 80)print(f"{'路径':<60} {'类型':<10}")print("-" * 80)for path, item_type in results:print(f"{path:<60} {item_type:<10}")print("-" * 80)print(f"共找到 {len(results)} 个匹配项")else:print(f"\n没有找到匹配 '{pattern}' 的项目")return resultsexcept Exception as e:print(f"搜索时出错: {e}")return []def _get_modified_time(self, path):"""获取格式化的修改时间"""try:mtime = os.path.getmtime(path)dt = datetime.datetime.fromtimestamp(mtime)return dt.strftime("%Y-%m-%d %H:%M:%S")except:return "未知"def _format_size(self, size_bytes):"""格式化文件大小"""if size_bytes < 1024: # 小于1KBreturn f"{size_bytes}B"elif size_bytes < 1024 * 1024: # 小于1MBreturn f"{size_bytes/1024:.1f}KB"elif size_bytes < 1024 * 1024 * 1024: # 小于1GBreturn f"{size_bytes/(1024*1024):.1f}MB"else: # GB或更大return f"{size_bytes/(1024*1024*1024):.1f}GB"# 使用示例
if __name__ == "__main__":manager = FileManager()# 列出当前目录内容manager.list_items()# 创建新目录manager.create_directory("test_dir")# 切换到新目录manager.change_directory("test_dir")# 创建测试文件with open(os.path.join(manager.current_dir, "test_file.txt"), "w") as f:f.write("这是一个测试文件")# 列出当前目录内容manager.list_items()# 搜索文件manager.search_items("test")# 返回上一级目录manager.change_directory("..")# 清理(可选)# manager.delete_item("test_dir", confirm=False)
6.5 网络编程
6.5.1 socket模块
-
基本概念
socket模块提供了底层网络通信的接口,就像电话系统一样,允许不同计算机之间进行通信。
# 网络通信的基本概念 # 就像我们使用电话进行通话,计算机使用socket进行网络通信 # IP地址就像电话号码,端口号就像分机号
-
TCP服务器与客户端
TCP是一种可靠的连接导向的协议,就像打电话一样,需要先建立连接,然后才能通信。
import socket import threading# TCP服务器示例 def start_server():# 创建socket对象server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# 绑定地址和端口server_socket.bind(('127.0.0.1', 8888))# 开始监听连接server_socket.listen(5)print("服务器已启动,等待连接...")try:while True:# 接受客户端连接client_socket, addr = server_socket.accept()print(f"接受来自 {addr} 的连接")# 创建新线程处理客户端client_thread = threading.Thread(target=handle_client, args=(client_socket, addr))client_thread.daemon = Trueclient_thread.start()except KeyboardInterrupt:print("服务器关闭")finally:server_socket.close()def handle_client(client_socket, addr):try:while True:# 接收数据data = client_socket.recv(1024)if not data:break# 处理接收到的数据message = data.decode('utf-8')print(f"从 {addr} 接收: {message}")# 发送响应response = f"服务器已收到: {message}"client_socket.send(response.encode('utf-8'))except Exception as e:print(f"处理客户端 {addr} 时出错: {e}")finally:client_socket.close()print(f"与 {addr} 的连接已关闭")# TCP客户端示例 def start_client():# 创建socket对象client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)try:# 连接服务器client_socket.connect(('127.0.0.1', 8888))print("已连接到服务器")# 发送消息message = "你好,服务器!"client_socket.send(message.encode('utf-8'))print(f"已发送: {message}")# 接收响应response = client_socket.recv(1024).decode('utf-8')print(f"收到响应: {response}")except Exception as e:print(f"客户端出错: {e}")finally:client_socket.close()print("客户端已关闭")
-
UDP通信
UDP是一种无连接的协议,就像发送短信一样,不需要建立连接就可以直接发送数据。
import socket# UDP服务器示例 def start_udp_server():# 创建UDP socketserver_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)# 绑定地址和端口server_socket.bind(('127.0.0.1', 9999))print("UDP服务器已启动,等待数据...")try:while True:# 接收数据和客户端地址data, addr = server_socket.recvfrom(1024)message = data.decode('utf-8')print(f"从 {addr} 接收: {message}")# 发送响应response = f"服务器已收到: {message}"server_socket.sendto(response.encode('utf-8'), addr)except KeyboardInterrupt:print("UDP服务器关闭")finally:server_socket.close()# UDP客户端示例 def start_udp_client():# 创建UDP socketclient_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)try:# 服务器地址server_addr = ('127.0.0.1', 9999)# 发送消息message = "你好,UDP服务器!"client_socket.sendto(message.encode('utf-8'), server_addr)print(f"已发送: {message}")# 接收响应data, _ = client_socket.recvfrom(1024)response = data.decode('utf-8')print(f"收到响应: {response}")except Exception as e:print(f"UDP客户端出错: {e}")finally:client_socket.close()print("UDP客户端已关闭")
6.5.2 urllib和requests模块
-
基本HTTP请求
urllib是Python标准库中的HTTP客户端,而requests是一个更易用的第三方库,它们就像浏览器一样,可以发送HTTP请求并获取响应。
# 使用urllib进行HTTP请求 import urllib.request import urllib.parse# 发送GET请求 def urllib_get_example():try:# 打开URL并读取响应with urllib.request.urlopen('https://www.example.com') as response:html = response.read().decode('utf-8')print(f"状态码: {response.status}")print(f"响应头: {response.headers}")print(f"响应内容前100个字符: {html[:100]}...")except Exception as e:print(f"请求出错: {e}")# 发送POST请求 def urllib_post_example():try:# 准备POST数据data = {'name': '张三','age': '25'}encoded_data = urllib.parse.urlencode(data).encode('utf-8')# 创建请求对象req = urllib.request.Request('https://httpbin.org/post', data=encoded_data, method='POST')# 添加请求头req.add_header('Content-Type', 'application/x-www-form-urlencoded')req.add_header('User-Agent', 'Mozilla/5.0')# 发送请求并获取响应with urllib.request.urlopen(req) as response:result = response.read().decode('utf-8')print(f"状态码: {response.status}")print(f"响应内容: {result}")except Exception as e:print(f"请求出错: {e}")
# 使用requests进行HTTP请求(需要先安装:pip install requests) import requests# 发送GET请求 def requests_get_example():try:# 发送GET请求response = requests.get('https://www.example.com')# 检查请求是否成功response.raise_for_status()# 输出响应信息print(f"状态码: {response.status_code}")print(f"响应头: {response.headers}")print(f"响应内容前100个字符: {response.text[:100]}...")except requests.exceptions.RequestException as e:print(f"请求出错: {e}")# 发送POST请求 def requests_post_example():try:# 准备POST数据data = {'name': '张三','age': '25'}# 准备请求头headers = {'User-Agent': 'Mozilla/5.0','Content-Type': 'application/x-www-form-urlencoded'}# 发送POST请求response = requests.post('https://httpbin.org/post', data=data, headers=headers)# 检查请求是否成功response.raise_for_status()# 输出响应信息print(f"状态码: {response.status_code}")print(f"响应内容: {response.json()}")except requests.exceptions.RequestException as e:print(f"请求出错: {e}")
-
处理JSON数据
在网络编程中,JSON是一种常用的数据交换格式,就像不同语言之间的翻译器一样。
import requests import json# 获取并处理JSON数据 def handle_json_example():try:# 发送GET请求获取JSON数据response = requests.get('https://jsonplaceholder.typicode.com/posts/1')response.raise_for_status()# 解析JSON响应post_data = response.json()print(f"帖子标题: {post_data['title']}")print(f"帖子内容: {post_data['body']}")# 创建JSON数据new_post = {'title': '新帖子','body': '这是一个新帖子的内容','userId': 1}# 发送POST请求,提交JSON数据post_response = requests.post('https://jsonplaceholder.typicode.com/posts',json=new_post # 自动将字典转换为JSON并设置Content-Type)post_response.raise_for_status()# 输出响应created_post = post_response.json()print(f"创建的帖子ID: {created_post['id']}")print(f"创建的帖子: {created_post}")except requests.exceptions.RequestException as e:print(f"请求出错: {e}")except json.JSONDecodeError as e:print(f"JSON解析错误: {e}")
6.5.3 实际应用案例:简单天气查询应用
# 简单的天气查询应用
import requests
import json
import timeclass WeatherApp:def __init__(self, api_key=None):# 这里使用的是OpenWeatherMap API,需要注册获取API密钥# https://openweathermap.org/apiself.api_key = api_keyself.base_url = "http://api.openweathermap.org/data/2.5/weather"def get_weather(self, city, country_code=None):"""获取指定城市的天气信息"""try:# 准备请求参数params = {'q': city if not country_code else f"{city},{country_code}",'appid': self.api_key,'units': 'metric', # 使用摄氏度'lang': 'zh_cn' # 使用中文}# 发送请求response = requests.get(self.base_url, params=params)response.raise_for_status()# 解析响应weather_data = response.json()return weather_dataexcept requests.exceptions.RequestException as e:print(f"获取天气信息时出错: {e}")return Nonedef format_weather(self, weather_data):"""格式化天气信息"""if not weather_data:return "无法获取天气信息"try:# 提取关键信息city_name = weather_data['name']country = weather_data['sys']['country']weather_main = weather_data['weather'][0]['main']weather_desc = weather_data['weather'][0]['description']temp = weather_data['main']['temp']feels_like = weather_data['main']['feels_like']humidity = weather_data['main']['humidity']wind_speed = weather_data['wind']['speed']timestamp = weather_data['dt']local_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp))# 格式化输出weather_info = f"""\n{city_name}, {country} 天气情况:时间: {local_time}天气: {weather_desc} ({weather_main})温度: {temp}°C (体感温度: {feels_like}°C)湿度: {humidity}%风速: {wind_speed} m/s"""return weather_infoexcept KeyError as e:return f"解析天气数据时出错: {e}"def interactive_query(self):"""交互式天气查询"""print("\n欢迎使用天气查询应用!")print("输入'退出'结束程序\n")while True:city = input("请输入城市名称: ")if city.lower() in ['退出', 'exit', 'quit']:print("谢谢使用,再见!")breakcountry_code = input("请输入国家代码(可选,如中国为CN): ")if not country_code:country_code = None# 获取并显示天气weather_data = self.get_weather(city, country_code)weather_info = self.format_weather(weather_data)print(weather_info)print("-" * 50)# 使用示例
if __name__ == "__main__":# 替换为你的API密钥api_key = "your_api_key_here"app = WeatherApp(api_key)# 查询特定城市的天气# beijing_weather = app.get_weather("Beijing", "CN")# print(app.format_weather(beijing_weather))# 启动交互式查询app.interactive_query()
6.6 多线程与并发
6.6.1 threading模块
-
线程基础
线程是程序执行的最小单位,多线程允许程序同时执行多个任务,就像我们可以同时听音乐和浏览网页一样。
import threading import time# 定义线程要执行的函数 def worker(name, delay):print(f"{name} 线程开始")# 模拟工作for i in range(5):time.sleep(delay)print(f"{name}: 步骤 {i+1} 完成")print(f"{name} 线程结束")# 创建线程 def create_threads():# 创建两个线程thread1 = threading.Thread(target=worker, args=("线程1", 0.5))thread2 = threading.Thread(target=worker, args=("线程2", 1))# 启动线程thread1.start()thread2.start()# 等待线程结束thread1.join()thread2.join()print("所有线程已完成")
-
线程同步
当多个线程访问共享资源时,需要进行同步以避免冲突,就像多人使用同一个厨房时需要协调一样。
import threading import time# 共享资源 counter = 0# 创建锁 counter_lock = threading.Lock()# 不使用锁的函数(可能导致问题) def increment_without_lock():global counterlocal_counter = countertime.sleep(0.1) # 模拟其他操作counter = local_counter + 1# 使用锁的函数 def increment_with_lock():global counterwith counter_lock: # 获取锁local_counter = countertime.sleep(0.1) # 模拟其他操作counter = local_counter + 1# 测试不使用锁 def test_without_lock():global countercounter = 0threads = []# 创建10个线程for _ in range(10):thread = threading.Thread(target=increment_without_lock)threads.append(thread)thread.start()# 等待所有线程完成for thread in threads:thread.join()print(f"不使用锁的结果: {counter}")# 测试使用锁 def test_with_lock():global countercounter = 0threads = []# 创建10个线程for _ in range(10):thread = threading.Thread(target=increment_with_lock)threads.append(thread)thread.start()# 等待所有线程完成for thread in threads:thread.join()print(f"使用锁的结果: {counter}")
-
线程池
线程池是一种线程使用模式,它预先创建一定数量的线程,用于执行任务,避免频繁创建和销毁线程的开销,就像餐厅预先安排好服务员一样。
import concurrent.futures import time# 要执行的任务函数 def task(name):print(f"任务 {name} 开始执行")# 模拟耗时操作time.sleep(1)print(f"任务 {name} 执行完成")return f"任务 {name} 的结果"# 使用线程池 def use_thread_pool():# 创建线程池,最多同时运行5个线程with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:# 提交10个任务futures = [executor.submit(task, f"任务{i}") for i in range(10)]# 获取任务结果(按完成顺序)for future in concurrent.futures.as_completed(futures):try:result = future.result()print(f"获取结果: {result}")except Exception as e:print(f"任务执行出错: {e}")# 使用map方法 def use_thread_pool_map():with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:# 使用map方法批量提交任务并获取结果(按提交顺序)tasks = [f"任务{i}" for i in range(5)]results = executor.map(task, tasks)# 处理结果for result in results:print(f"map结果: {result}")
6.6.2 concurrent.futures模块
-
线程池和进程池
concurrent.futures模块提供了高级的异步执行接口,包括线程池和进程池,使并行编程更加简单,就像有一个管理团队帮你分配和监督工作一样。
import concurrent.futures import time import math# 计算密集型任务(适合进程池) def calculate_prime_factors(n):"""计算一个数的所有质因数"""factors = []# 检查2是否为因数while n % 2 == 0:factors.append(2)n //= 2# 检查从3开始的奇数for i in range(3, int(math.sqrt(n)) + 1, 2):while n % i == 0:factors.append(i)n //= i# 如果n大于2,那么n本身就是一个质数if n > 2:factors.append(n)return factors# 使用进程池处理计算密集型任务 def use_process_pool():numbers = [1234567, 9876543, 5555555, 7777777, 8888888, 9999999]start_time = time.time()# 使用进程池with concurrent.futures.ProcessPoolExecutor() as executor:# 提交任务并获取结果results = executor.map(calculate_prime_factors, numbers)# 处理结果for number, factors in zip(numbers, results):print(f"{number} 的质因数: {factors}")end_time = time.time()print(f"进程池执行时间: {end_time - start_time:.2f}秒")# 比较:使用单进程顺序执行 def use_single_process():numbers = [1234567, 9876543, 5555555, 7777777, 8888888, 9999999]start_time = time.time()# 顺序执行results = []for number in numbers:factors = calculate_prime_factors(number)results.append((number, factors))# 处理结果for number, factors in results:print(f"{number} 的质因数: {factors}")end_time = time.time()print(f"单进程执行时间: {end_time - start_time:.2f}秒")
-
Future对象
Future对象表示异步执行的结果,可以查询执行状态、获取结果或取消执行,就像跟踪订单状态一样。
import concurrent.futures import time import random# 模拟耗时任务 def long_task(task_id):# 随机执行时间duration = random.uniform(1, 5)print(f"任务{task_id}开始执行,预计耗时{duration:.2f}秒")time.sleep(duration)# 模拟任务可能失败if random.random() < 0.2: # 20%的失败率raise Exception(f"任务{task_id}执行失败")return f"任务{task_id}的结果"# 使用Future对象 def use_futures():with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:# 提交5个任务future_to_id = {executor.submit(long_task, i): i for i in range(5)}# 处理完成的任务for future in concurrent.futures.as_completed(future_to_id):task_id = future_to_id[future]try:# 获取结果(如果任务抛出异常,这里会重新抛出)result = future.result()print(f"任务{task_id}完成: {result}")except Exception as e:print(f"任务{task_id}出错: {e}")# 取消任务示例 def cancel_task_example():with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:# 提交任务futures = [executor.submit(long_task, i) for i in range(5)]# 尝试取消一些任务for i, future in enumerate(futures):if i % 2 == 0: # 取消偶数索引的任务cancelled = future.cancel()print(f"尝试取消任务{i}: {'成功' if cancelled else '失败'}")# 处理结果for i, future in enumerate(futures):if not future.cancelled():try:result = future.result()print(f"任务{i}完成: {result}")except Exception as e:print(f"任务{i}出错: {e}")else:print(f"任务{i}已取消")
6.6.3 multiprocessing模块
-
进程基础
multiprocessing模块提供了创建和管理进程的功能,进程是操作系统分配资源的基本单位,每个进程有独立的内存空间,就像不同的工厂各自独立运作一样。
import multiprocessing import os import time# 进程要执行的函数 def worker_process(name):print(f"进程 {name} 开始, 进程ID: {os.getpid()}")# 模拟工作for i in range(3):time.sleep(1)print(f"进程 {name}: 步骤 {i+1} 完成")print(f"进程 {name} 结束")# 创建进程 def create_processes():print(f"主进程ID: {os.getpid()}")# 创建两个进程process1 = multiprocessing.Process(target=worker_process, args=("进程1",))process2 = multiprocessing.Process(target=worker_process, args=("进程2",))# 启动进程process1.start()process2.start()# 等待进程结束process1.join()process2.join()print("所有进程已完成")
-
进程间通信
由于进程之间内存是隔离的,需要特殊的机制进行通信,就像不同城市之间需要邮件或电话来交流一样。
import multiprocessing import time# 使用队列进行进程间通信 def producer(queue):"""生产者进程,向队列中放入数据"""print("生产者进程开始")# 生产数据for i in range(5):item = f"数据项 {i}"queue.put(item)print(f"生产者放入: {item}")time.sleep(0.5)# 放入结束标记queue.put(None)print("生产者进程结束")def consumer(queue):"""消费者进程,从队列中获取数据"""print("消费者进程开始")# 消费数据while True:item = queue.get()if item is None: # 结束标记breakprint(f"消费者获取: {item}")time.sleep(1)print("消费者进程结束")# 使用管道进行进程间通信 def pipe_process_a(conn):"""管道一端的进程"""print("进程A开始")# 发送数据for i in range(3):message = f"来自进程A的消息 {i}"conn.send(message)print(f"进程A发送: {message}")time.sleep(0.5)# 接收数据for i in range(2):message = conn.recv()print(f"进程A接收: {message}")conn.close()print("进程A结束")def pipe_process_b(conn):"""管道另一端的进程"""print("进程B开始")# 接收数据for i in range(3):message = conn.recv()print(f"进程B接收: {message}")# 发送数据for i in range(2):message = f"来自进程B的回复 {i}"conn.send(message)print(f"进程B发送: {message}")time.sleep(0.5)conn.close()print("进程B结束")# 使用共享内存 def shared_memory_example():# 创建共享内存对象shared_value = multiprocessing.Value('i', 0) # 'i'表示整数类型shared_array = multiprocessing.Array('d', [0.0, 0.0, 0.0]) # 'd'表示双精度浮点数# 创建进程process1 = multiprocessing.Process(target=modify_shared_memory, args=(shared_value, shared_array, 1))process2 = multiprocessing.Process(target=modify_shared_memory, args=(shared_value, shared_array, 2))# 启动进程process1.start()process2.start()# 等待进程结束process1.join()process2.join()# 查看最终结果print(f"最终共享值: {shared_value.value}")print(f"最终共享数组: {list(shared_array)}")def modify_shared_memory(shared_value, shared_array, process_id):"""修改共享内存的进程"""print(f"进程{process_id}开始修改共享内存")# 修改共享值with shared_value.get_lock():shared_value.value += process_idprint(f"进程{process_id}将共享值增加到: {shared_value.value}")# 修改共享数组for i in range(len(shared_array)):shared_array[i] += process_id * (i + 1)print(f"进程{process_id}修改共享数组[{i}]为: {shared_array[i]}")time.sleep(0.1)print(f"进程{process_id}完成修改")
6.6.4 实际应用案例:并行下载管理器
# 并行下载管理器
import concurrent.futures
import requests
import time
import osclass DownloadManager:def __init__(self, max_workers=5, timeout=30):self.max_workers = max_workers # 最大工作线程数self.timeout = timeout # 下载超时时间(秒)self.download_dir = "downloads" # 下载目录# 确保下载目录存在if not os.path.exists(self.download_dir):os.makedirs(self.download_dir)def download_file(self, url, filename=None):"""下载单个文件"""try:# 如果没有指定文件名,从URL中提取if filename is None:filename = url.split('/')[-1]if '?' in filename:filename = filename.split('?')[0]# 构建完整的文件路径filepath = os.path.join(self.download_dir, filename)# 发送请求并下载文件start_time = time.time()response = requests.get(url, stream=True, timeout=self.timeout)response.raise_for_status() # 检查请求是否成功# 获取文件大小file_size = int(response.headers.get('content-length', 0))# 写入文件downloaded_size = 0with open(filepath, 'wb') as f:for chunk in response.iter_content(chunk_size=8192):if chunk:f.write(chunk)downloaded_size += len(chunk)# 计算下载时间和速度download_time = time.time() - start_timespeed = downloaded_size / download_time / 1024 if download_time > 0 else 0return {'url': url,'filename': filename,'filepath': filepath,'size': downloaded_size,'time': download_time,'speed': speed,'status': 'success'}except Exception as e:return {'url': url,'filename': filename if filename else url.split('/')[-1],'status': 'failed','error': str(e)}def download_files(self, urls, filenames=None):"""并行下载多个文件"""if filenames is None:filenames = [None] * len(urls)elif len(filenames) != len(urls):raise ValueError("URLs和文件名列表长度必须相同")results = []successful = 0failed = 0total_size = 0total_time = 0print(f"开始下载{len(urls)}个文件...")start_time = time.time()# 使用线程池并行下载with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:# 提交下载任务future_to_url = {executor.submit(self.download_file, url, filename): url for url, filename in zip(urls, filenames)}# 处理完成的任务for future in concurrent.futures.as_completed(future_to_url):url = future_to_url[future]try:result = future.result()results.append(result)if result['status'] == 'success':successful += 1total_size += result['size']total_time += result['time']print(f"下载成功: {result['filename']} ({self._format_size(result['size'])}, "f"{result['time']:.2f}秒, {result['speed']:.2f} KB/s)")else:failed += 1print(f"下载失败: {result['filename']} - {result['error']}")except Exception as e:failed += 1print(f"处理下载任务时出错: {url} - {e}")results.append({'url': url,'status': 'failed','error': str(e)})# 计算总下载时间total_elapsed = time.time() - start_time# 打印下载摘要print("\n下载摘要:")print(f"总文件数: {len(urls)}")print(f"成功: {successful}, 失败: {failed}")print(f"总下载大小: {self._format_size(total_size)}")print(f"总耗时: {total_elapsed:.2f}秒")if total_elapsed > 0:print(f"平均下载速度: {total_size / total_elapsed / 1024:.2f} KB/s")return resultsdef _format_size(self, size_bytes):"""格式化文件大小"""if size_bytes < 1024:return f"{size_bytes}B"elif size_bytes < 1024 * 1024:return f"{size_bytes/1024:.2f}KB"elif size_bytes < 1024 * 1024 * 1024:return f"{size_bytes/(1024*1024):.2f}MB"else:return f"{size_bytes/(1024*1024*1024):.2f}GB"# 使用示例
if __name__ == "__main__":# 创建下载管理器manager = DownloadManager(max_workers=3)# 准备下载链接urls = ["https://www.python.org/static/img/python-logo.png","https://www.python.org/static/img/python-logo-large.png","https://www.python.org/static/community_logos/python-powered-h-140x182.png","https://www.python.org/static/favicon.ico"]# 自定义文件名(可选)filenames = ["python-logo.png","python-logo-large.png","python-powered.png","python-favicon.ico"]# 开始下载results = manager.download_files(urls, filenames)