Scheduled Download of Sentinel Satellite Imagery
1 Log Management
import asyncio
import logging
import threading
import traceback
from typing import Any, Callable
from warnings import filterwarnings

import urllib3
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from fastapi import FastAPI
from pytz_deprecation_shim import PytzUsageWarning  # emitted by APScheduler/tzlocal
from sqlalchemy import select
# Project-specific objects referenced below (AsyncSessionLocal, ScheduledTask,
# task_manager, trigger_task_execution) are assumed to be imported from the
# application's own modules.

urllib3.disable_warnings()
filterwarnings('ignore', category=PytzUsageWarning)

# Configure the root logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
# File handler
file_handler = logging.FileHandler('scheduler.log')
file_formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')
file_handler.setFormatter(file_formatter)
# Console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
# Remove any existing handlers to avoid duplicate output
for handler in root_logger.handlers[:]:
    root_logger.removeHandler(handler)
# Attach the new handlers
root_logger.addHandler(file_handler)
root_logger.addHandler(console_handler)
logger = logging.getLogger(__name__)  # use the same logger throughout the module

# Filter that attaches the current call stack to every record
class StackFilter(logging.Filter):
    def filter(self, record):
        if not record.stack_info:
            # format_stack() returns a list of frame strings; join them and
            # drop the last frame (the call into this filter itself)
            record.stack_info = ''.join(traceback.format_stack()[:-1])
        return True

logger.addFilter(StackFilter())
# ----------------- Scheduler -----------------
scheduler = AsyncIOScheduler(
    # jobstores=jobstores,
    timezone="Asia/Shanghai",
    job_defaults={'misfire_grace_time': 3600, 'coalesce': True}
)
# Semaphore limiting concurrency (two jobs at a time)
semaphore = asyncio.Semaphore(2)
# Lock guarding scheduler access across threads
scheduler_lock = threading.Lock()
# ----------------- Async task queue -----------------
task_queue = asyncio.Queue(1000)
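# NOTE: the handlers below also reference a few module-level objects that are
# defined elsewhere in the project. A minimal sketch of the assumed definitions
# (names and initial values are assumptions, not the original implementation):
MAIN_LOOP = None        # event loop captured at startup, used for thread-safe submits
current_task = None     # item currently being processed by task_processor
lock = asyncio.Lock()   # serializes execution of wrapped download tasks
# task_manager is assumed to be a project object exposing create_task()/mark_failed()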
# ----------------- FastAPI application -----------------
app = FastAPI()

@app.on_event("startup")
async def startup_event():
    """Initialize on startup."""
    global MAIN_LOOP
    MAIN_LOOP = asyncio.get_running_loop()  # the loop FastAPI is already running on
    MAIN_LOOP.set_debug(True)
    # Sync scheduled tasks from the database
    await sync_scheduled_tasks()
    # Start the scheduler
    if not scheduler.running:
        scheduler.start()
        print("APScheduler started")
    if not scheduler.get_job('task_sync_job'):
        # One-off re-sync job (a 'date' trigger with no run_date fires immediately)
        scheduler.add_job(sync_scheduled_tasks, 'date', id='task_sync_job')
    # Start the task processor (key change)
    asyncio.ensure_future(task_processor())
@app.on_event("shutdown")
async def shutdown_event():
    """Clean up resources on shutdown."""
    with scheduler_lock:
        if scheduler.running:
            scheduler.shutdown()
            print("APScheduler stopped")
    async with lock:
        # Cancel the item currently being processed, if it supports cancellation
        if current_task is not None and hasattr(current_task, 'cancel'):
            current_task.cancel()
# ----------------- Task processing -----------------
async def task_processor():
    """Queue consumer: pulls jobs off task_queue and runs them one by one."""
    global current_task
    while True:
        try:
            current_task = await task_queue.get()
        except asyncio.CancelledError:
            logger.warning("Task processor cancelled")
            break
        try:
            if isinstance(current_task, tuple):
                # Tuple form: (task_func, task_id, kwargs)
                task_func, task_id, kwargs = current_task
                try:
                    await task_func(**kwargs)
                    # await task_manager.mark_success(task_id)
                except TypeError:
                    logger.critical(f"Bad task arguments: {traceback.format_exc()}")
                except Exception as e:
                    logger.critical(f"Unexpected error: {traceback.format_exc()}")
                    # await task_manager.mark_failed(task_id, str(e))
            else:
                # Plain coroutine form
                await current_task
        except asyncio.CancelledError:
            logger.warning("Task processor cancelled")
            break
        except Exception as e:
            logger.error(f"Error while processing task: {str(e)}")
        finally:
            # Only reached after an item was actually retrieved from the queue
            task_queue.task_done()
# ----------------- Task wrapping -----------------
async def sync_add_async_task(task_func: Callable, *args: Any, **kwargs: Any):
    """Thread-safe helper that records a task and puts it on the queue."""
    # Create a database record for the task
    task_params = {
        "username": kwargs.get("username"),
        "output_dir": kwargs.get("output_dir"),
        "query_satellite": kwargs.get("query_satellite"),
        # other required parameters...
    }
    task_id = await task_manager.create_task(task_params)

    # Wrap the task coroutine so its outcome is recorded
    async def _wrapped_task():
        async with lock:
            try:
                await task_func(task_id, *args, **kwargs)
                # await task_manager.mark_success(task_id)
            except asyncio.CancelledError:
                await task_manager.mark_failed(task_id, "Cancelled by user")
            except TypeError:
                logger.critical(f"Bad task arguments: {traceback.format_exc()}")
                traceback.print_exc()
            except Exception as e:
                traceback.print_exc()
                await task_manager.mark_failed(task_id, str(e))

    # Put the wrapped task on the queue
    if MAIN_LOOP is not None and MAIN_LOOP.is_running():
        # The event loop is already running (possibly in another thread): submit thread-safely
        asyncio.run_coroutine_threadsafe(task_queue.put(_wrapped_task()), MAIN_LOOP)
    else:
        # Single-threaded case: run the put() directly on the current loop
        asyncio.get_event_loop().run_until_complete(task_queue.put(_wrapped_task()))
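For illustration only, a FastAPI route that hands a download job to the wrapper above might look roughly like the sketch below. The route path and the run_sentinel_download coroutine (including its parameters) are assumptions made for the example; only sync_add_async_task comes from the code above.

async def run_sentinel_download(task_id, username, output_dir, query_satellite):
    """Hypothetical download coroutine; the real implementation lives elsewhere."""
    logger.info(f"Task {task_id}: downloading {query_satellite} for {username} into {output_dir}")

@app.post("/download")
async def submit_download(username: str, output_dir: str, query_satellite: str):
    # Record the task and enqueue it for the background processor
    await sync_add_async_task(
        run_sentinel_download,
        username=username,
        output_dir=output_dir,
        query_satellite=query_satellite,
    )
    return {"status": "queued"}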
# ----------------- Loading tasks from the database -----------------
async def sync_scheduled_tasks():
    """Sync scheduled tasks from the database into APScheduler."""
    logger.info("Starting scheduled-task sync...")
    async with AsyncSessionLocal() as session:
        try:
            # Fetch all active tasks
            stmt = select(ScheduledTask).where(ScheduledTask.is_active == True)
            result = await session.execute(stmt)
            tasks = result.scalars().all()
            existing_job_ids = {job.id for job in scheduler.get_jobs()}
            logger.info(f"Loaded {len(tasks)} active tasks from the database")

            for task in tasks:
                job_id = f"task_{task.id}"
                logger.debug(f"Processing task id={task.id}, cron={task.cron_expression}")
                try:
                    trigger = CronTrigger.from_crontab(
                        task.cron_expression,
                        timezone="Asia/Shanghai"
                    )
                except ValueError:
                    logger.error(f"Invalid cron expression for task {task.id}: {task.cron_expression}")
                    continue
                # Add a new job or update the existing one
                if job_id not in existing_job_ids:
                    scheduler.add_job(
                        trigger_task_execution,
                        trigger=trigger,
                        args=[task.id],
                        id=job_id,
                        replace_existing=True
                    )
                    logger.info(f"Added job: {job_id}")
                else:
                    scheduler.reschedule_job(job_id, trigger=trigger)
                    logger.info(f"Rescheduled job: {job_id}")

            # Remove jobs whose tasks were deleted or deactivated
            active_job_ids = {f"task_{t.id}" for t in tasks}
            for job_id in existing_job_ids:
                if job_id == 'task_sync_job':
                    continue  # keep the sync job itself
                if job_id.startswith("task_") and job_id not in active_job_ids:
                    scheduler.remove_job(job_id)
                    logger.info(f"Removed stale job: {job_id}")
        except Exception as e:
            logger.error(f"Task sync failed: {str(e)}", exc_info=True)
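sync_scheduled_tasks only relies on three columns of ScheduledTask: id, cron_expression, and is_active. The real model and AsyncSessionLocal live in the project's own modules; purely as a reference, an assumed SQLAlchemy model carrying just those fields might look like this (table name and column types are guesses, not the original schema):

from sqlalchemy import Boolean, Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class ScheduledTask(Base):
    __tablename__ = "scheduled_tasks"                 # assumed table name
    id = Column(Integer, primary_key=True)
    cron_expression = Column(String, nullable=False)  # e.g. "0 2 * * *"
    is_active = Column(Boolean, default=True)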