【Python】日期计算和自动化运行脚本
一、日期计算
1. datetime模块介绍
Python 的 datetime 模块是处理日期和时间的核心模块,提供了多种类和工具,用于操作、格式化和计算日期时间。
objecttimedelta # 主要用于计算时间跨度tzinfo # 时区相关time # 只关注时间date # 只关注日期datetime # 同时有时间和日期
datetime 模块和 time 模块的区别在于time更接近系统底层,可以理解为 datetime 基于 time 进行了封装,提供了更多实用的函数。
2. 基础使用
查询当前时间,并获取时间的每个部分(年月日,时分秒)。
from datetime import datetime, timedelta# 1. 获取当前时间 / 创建自定义时间
now = datetime.now()
print("当前完整时间:", now) # 2024-05-20 15:30:45.123456dt = datetime(2024, 5, 20, 15, 30, 0) # 年, 月, 日, 时, 分, 秒
print(dt) # 2024-05-20 14:30:00# 2. 访问时间属性 (返回值为int类型)
print("\n时间属性分解:")
print("年份:", now.year) # 2024
print("月份:", now.month) # 5
print("日期:", now.day) # 20
print("小时:", now.hour) # 15
print("分钟:", now.minute) # 30
print("秒数:", now.second) # 45
print("微秒:", now.microsecond) # 123456# 3. 转换为 date 和 time 对象
date_part = now.date()
time_part = now.time()
print("\n日期部分:", date_part) # 2024-05-20
print("时间部分:", time_part) # 15:30:45.123456# 3. date 和 time 对象转换为 datetime 对象
new_dt = datetime.combine(date_part, time_part)
print("\n完整时间:", new_dt) # 2024-05-20 15:30:45.123456
3. 时间计算
具体时间的修改,时间大小的比较(直接使用<, >, ==等运算符直接比较 datetime 对象),时间的差值计算。
from datetime import datetime, timedeltanow = datetime.now()# 1. 使用 replace 修改时间
new_time = now.replace(year=2025, hour=8)
print("\n修改后的时间:", new_time) # 2025-05-20 08:30:45.123456# 2. 时间比较
time_1 = datetime(2025,5,20,13,0,0)
time_2 = datetime(2025,5,21,13,0,0)
print(time_1 > time_2) # False# 3. 使用timedelta 计算时间差
delta = timedelta(days=7, hours=3)
future = now + delta
past = now - delta
print("\n7天3小时后:", future)
print("7天3小时前:", past)
4. 时间的表示与输出
时间戳,字符串解析时间,时间的字符串表示。
from datetime import datetime, timedeltanow = datetime.now()# 1. 时间戳操作
timestamp = now.timestamp()
print("\n当前时间戳:", timestamp)
print("时间戳还原:", datetime.fromtimestamp(timestamp))# 2. 从字符串解析时间(strptime)
custom_str = "2023-12-25 20:15:30"
parsed_time = datetime.strptime(custom_str, "%Y-%m-%d %H:%M:%S")
print("\n解析后的时间:", parsed_time)# 3. 格式化输出(strftime)
formatted = now.strftime("%Y年%m月%d日 %H:%M:%S")
print("\n自定义格式:", formatted) # 2024年05月20日 15:30:45
5. 工作日计算
核心方法是weekday方法,基于该方法提供了一些工作日的简单计算函数。
def is_workday(date):"""判断是否为工作日(周一到周五)"""# weekday() 返回 0-4=工作日, 5=周六, 6=周日return date.weekday() < 5 def add_workdays(start_day, days):"""计算指定工作日后的日期(自动跳过周末)"""current = start_dayadded_days = 0while added_days < days:current += timedelta(days=1) # 每次加1天if is_workday(current):added_days += 1return current def count_workdays(start, end):"""计算两个日期之间的工作日数量(包含开始,不包含结束)"""if start > end:start, end = end, starttotal_days = (end - start).daysworkdays = 0for day in range(total_days):current = start + timedelta(days=day)if is_workday(current):workdays += 1return workdays# 示例:从2024-05-20(周一)计算5个工作日后的日期
target_date = add_workdays(start_date, 5)
print(f"\n{start_date.date()} 的5个工作日后是 {target_date.date()}") # 2024-05-27# 示例:计算2024-05-20 到 2024-05-27 之间的工作日数量
date_a = datetime(2024, 5, 20)
date_b = datetime(2024, 5, 27)
print(f"\n{date_a.date()} 到 {date_b.date()} 之间的工作日数: {count_workdays(date_a, date_b)}") # 5
二、APSchedule 脚本运行自动化
1. 基于Windows 任务管理器的自动化不详细描述,需要可以查看这篇文章:
【python】在Windows中定时执行Python脚本的详细用法教学_windows定时任务执行python-CSDN博客
2. 打包为 .exe 文件运行的方法在不详细描述,需要可以查看这篇文章:
如何在Windows下将Python项目秒变exe安装包:零基础教程,手把手教你_windows下生成python的exe-CSDN博客
1. 安装
APScheduler 支持三种调度任务:固定时间间隔,固定时间点,Linux Crontab 命令。同时还支持异步执行、后台执行调度任务。输入以下命令即可安装。
pip install APScheduler
2. 简单使用
简单创建一个调度程序步骤只需要三步:
1. 创建调度器(scheduler)
2. 添加任务(job)
3. 根据触发条件运行调度任务。
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime# 任务函数
def job(message):now = datetime.now().strftime("%H:%M:%S")print(f"[任务执行] {message} | 时间: {now}")# 创建调度器
scheduler = BlockingScheduler()# 添加任务时直接打印提示
print("[调度器] 添加间隔任务(每2秒执行)")
scheduler.add_job(job, 'interval', seconds=2, args=['间隔任务'])# 启动调度器
try:scheduler.start()
except KeyboardInterrupt:print("\n[调度器] 已停止")
3. 详细讲解
APSchedule 有四种组件:
1. 调度器(Scheduler)
2. 作业存储(Job Store)
3. 触发器(Trigger)
4. 执行器(Executor)
3.1 调度器(Scheduler)
任务调度器属于控制器角色,提供了 7 种调度器(最常用的是前3种)
种类 | 用法 |
---|---|
BlockingScheduler() | 当前进程的主线程中运行,阻塞当前线程。 |
BackgroundScheduler() | 后台线程中运行,不会阻塞当前线程。 |
AsyncIOScheduler() | 结合 asyncio模块使用。 |
GeventScheduler() | 使用 gevent作为IO模型,和 GeventExecutor 配合使用。 |
TornadoScheduler() | 使用Tornado的IO模型。用 ioloop.add_timeout 完成定时唤醒。 |
TwistedScheduler() | 配合TwistedExecutor,用 reactor.callLater 完成定时唤醒。 |
QtScheduler() | Qt 应用需使用QTimer完成定时唤醒。 |
它可以配置作业存储器和执行器。例如添加、修改和移除作业都可以在调度器中完成。
方法 | 功能说明 | 示例 |
---|---|---|
add_job() | 添加新任务 | scheduler.add_job(func, 'interval', seconds=5) |
remove_job(job_id) | 删除任务 | scheduler.remove_job('my_job') |
pause_job(job_id) | 暂停任务 | scheduler.pause_job('email_job') |
resume_job(job_id) | 恢复暂停的任务 | scheduler.resume_job('report_job') |
modify_job(job_id, **kwargs) | 修改任务属性 | scheduler.modify_job('sync_job', seconds=10) |
get_jobs() | 获取所有任务列表 | jobs = scheduler.get_jobs() |
3.2 触发器(Trigger)
APScheduler 有三种内置的 trigger:
(1)date 是最基本的一种调度,作业任务只会执行一次。它表示特定的时间点触发。
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime, timedeltadef job():print("一次性任务触发")scheduler = BlockingScheduler()# 示例1:5秒后执行
scheduler.add_job(job, 'date', run_date=datetime.now() + timedelta(seconds=5))# 示例2:指定具体时间(2023-12-31 23:59:59)
scheduler.add_job(job, 'date', run_date='2023-12-31 23:59:59')scheduler.start()
(2)interval 为固定时间间隔触发。interval 间隔调度。
def job():print("间隔任务触发")# 每30分钟执行一次
scheduler.add_job(job, 'interval', minutes=30,start_date='2023-08-15 09:00:00',end_date='2023-08-20 18:00:00')# weeks, days, hours, minutes, seconds
# start_date:首次触发时间(默认立即开始), end_date:停止触发时间# 每2小时15分钟执行一次(组合参数)
scheduler.add_job(job, 'interval', hours=2, minutes=15)
(3)cron 在特定时间周期性地触发,和Linux crontab格式兼容。
def job():print("Cron任务触发")# 每天8:30执行(传统Cron风格)
scheduler.add_job(job, 'cron', hour=8, minute=30)# 每周一至周五的9:00和17:00执行
scheduler.add_job(job, 'cron', day_of_week='mon-fri',hour='9,17')# 每15分钟执行一次(类似 interval)
scheduler.add_job(job, 'cron', minute='*/15')# 每月最后一天23:59执行
scheduler.add_job(job, 'cron', day='last', hour=23, minute=59)
3.3 作业存储器(Job Stores)
任务持久化仓库,默认保存任务在内存中,也可将任务保存都各种数据库中,任务中的数据序列化后保存到持久化数据库,从数据库加载后又反序列化。
-
MemoryJobStore
默认存储器,所有作业保存在内存中,程序终止后丢失。适用于临时任务或测试环境。 -
SQLAlchemyJobStore
使用关系型数据库(如SQLite、MySQL、PostgreSQL)持久化存储。需指定数据库URL。 -
MongoDBJobStore
将作业存储在MongoDB中,需指定数据库和集合名。依赖pymongo
库。 -
RedisJobStore
使用Redis存储作业,支持高效的键值存储。依赖redis
库。 -
自定义存储器
通过继承BaseJobStore
实现自定义存储逻辑(如文件系统、云存储)。
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.jobstores.memory import MemoryJobStorejobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.db'),'memory': MemoryJobStore()
}
scheduler = BlockingScheduler(jobstores=jobstores)
3.4 执行器(Executors)
负责处理作业的运行,提交指定的可调用对象到一个线程或进程池完成作业。当作业完成时,执行器将会通知调度器。
关键参数:
1. max_workers:控制线程 / 进程池的大小
2. misfire_grace_time:任务错过触发后仍允许执行的时间(秒)
3. coalesce:合并多次未执行的任务。
- ThreadPoolExecutor:使用线程池管理任务。
from apscheduler.executors.pool import ThreadPoolExecutor
executors = {'default': ThreadPoolExecutor(max_workers=5) # 最大线程数
}
- ThreadPoolExecutor:使用线程池管理任务。
from apscheduler.executors.pool import ProcessPoolExecutor
executors = {'default': ProcessPoolExecutor(max_workers=3) # 最大进程数
}
- AsyncIOExecutor:基于
asyncio
的异步执行。
from apscheduler.executors.asyncio import AsyncIOExecutor
executors = {'default': AsyncIOExecutor()
}