当前位置: 首页 > ds >正文

哨兵卫星影像定时任务下载

一 日志管理

urllib3.disable_warnings()
filterwarnings('ignore', category=PytzUsageWarning)


root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)# 配置根日志记录器

file_handler = logging.FileHandler('scheduler.log')
file_formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')# 创建文件处理器
file_handler.setFormatter(file_formatter)

console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))# #创建控制台处理器

for handler in root_logger.handlers[:]:
    root_logger.removeHandler(handler)                          # 移除所有现有处理器(避免重复)

# 添加新处理器
root_logger.addHandler(file_handler)
root_logger.addHandler(console_handler)
logger = logging.getLogger(__name__)  # 确保使用同一记录器

#添加过滤器
class StackFilter(logging.Filter):
    def filter(self, record):
        if not hasattr(record, 'stack_info'):
            record.stack_info = traceback.format_stack()[:-1]  #
        return True

logger.addFilter(StackFilter())

#调度器创建

scheduler = AsyncIOScheduler(
    #jobstores=jobstores,
    timezone="Asia/Shanghai",
    job_defaults={'misfire_grace_time': 3600, 'coalesce': True}
)

# 创建信号灯

semaphore = asyncio.Semaphore(2)

#任务访问锁

scheduler_lock = threading.Lock()

#任务队列

# ----------------- 异步任务队列配置 -----------------
task_queue = asyncio.Queue(1000)

#初始化 fastapi

app = FastAPI()
@app.on_event("startup")
async def startup_event():
    """启动时初始化"""
    global MAIN_LOOP
    MAIN_LOOP = asyncio.get_event_loop()
    asyncio.set_event_loop(MAIN_LOOP)
    asyncio.get_event_loop().set_debug(True)

#启动任务数据库同步
    await sync_scheduled_tasks()
    
    # 启动调度器
    if not scheduler.running:
        scheduler.start()
        print("APScheduler 已启动")


    if not scheduler.get_job('task_sync_job'):
        scheduler.add_job(sync_scheduled_tasks,'date', id='task_sync_job')
    
    # 启动任务处理器(关键修改)

    asyncio.ensure_future(task_processor())

@app.on_event("shutdown")
async def shutdown_event():
    """关闭时清理资源"""
    with scheduler_lock:
        if scheduler.running:
            scheduler.shutdown()
            print("APScheduler 已停止")
    async with lock:
        if current_task:
            current_task.cancel()

#任务调度

async def task_processor():
    """增强型任务处理器"""
    global current_task
    while True:
        try:
            
            current_task = await task_queue.get()
            if isinstance(current_task, tuple):  # 处理带参数的任务
                
                
                task_func, task_id, kwargs = current_task

                try:
                    await task_func(**kwargs)
                    #await task_manager.mark_success(task_id)
                except TypeError as e:  # 明确捕获类型错误
                    logger.critical(f"未知错误: {traceback.format_exc()}")
                    
                except Exception as e:
                    logger.critical(f"未知错误: {traceback.format_exc()}")
                    
                    #await task_manager.mark_failed(task_id, str(e))
            else:  # 原有任务处理
                await current_task
        except asyncio.CancelledError:
            logger.critical(f"未知错误: {traceback.format_exc()}")
            logger.warning("任务处理器被终止")
            break
        except Exception as e:
            logger.error(f"任务处理异常: {str(e)}")
        finally:
            task_queue.task_done()

#包装任务
async def sync_add_async_task(task_func: Callable, *args: Any, **kwargs: Any):
    """线程安全的任务添加方法"""
    # 生成任务记录
    task_params = {
        "username": kwargs.get("username"),
        "output_dir": kwargs.get("output_dir"),
        "query_satellite": kwargs.get("query_satellite"),
        # 其他必要参数...
    }
    task_id =  await  task_manager.create_task(task_params)

    # 包装任务协程
    async def _wrapped_task():
        async with lock:
            try:
                
                await task_func(task_id, *args, **kwargs)
                #await task_manager.mark_success(task_id)
            except asyncio.CancelledError:
                await task_manager.mark_failed(task_id, "用户取消")
            except TypeError as e:  # 明确捕获类型错误
                logger.critical(f"未知错误: {traceback.format_exc()}")
                traceback.print_exc()
                    
                
            except Exception as e:
                traceback.print_exc()
                await task_manager.mark_failed(task_id, str(e))
    
    # 将任务加入队列
    if MAIN_LOOP.is_running():
        # 当事件循环已在运行时,跨线程安全提交
        
        asyncio.run_coroutine_threadsafe(  task_queue.put(_wrapped_task()),   MAIN_LOOP        )
    else:
       
        # 单线程场景直接运行
        loop.run_until_complete(task_queue.put(_wrapped_task()))

#加载数据库任务

async def sync_scheduled_tasks():
     
    """同步数据库定时任务到APScheduler"""
    
    logger.info("开始同步定时任务...")
    
    async with AsyncSessionLocal() as session:
        try:
            # 获取所有激活任务
            stmt = select(ScheduledTask).where(ScheduledTask.is_active == True)
            result = await session.execute(stmt)
            tasks = result.scalars().all()
            
            existing_job_ids = {job.id for job in scheduler.get_jobs()}
            logger.info(f"从数据库加载到{len(tasks)}个激活任务")
            for task in tasks:
                job_id = f"task_{task.id}"
                logger.debug(f"处理任务ID={task.id}, cron={task.cron_expression}")
                try:
                    trigger = CronTrigger.from_crontab(
                        task.cron_expression,
                        timezone="Asia/Shanghai"
                    )
                except ValueError as e:
                    logger.error(f"任务{task.id} cron表达式错误: {task.cron_expression}")
                    continue
                
                # 添加或更新任务
                if job_id not in existing_job_ids:
                    scheduler.add_job(
                        trigger_task_execution,
                        trigger=trigger,
                        args=[task.id],
                        id=job_id,
                        replace_existing=True
                    )
                    logger.info(f"已添加任务: {job_id}")
                else:
                    scheduler.reschedule_job(job_id, trigger=trigger)
                    logger.info(f"已更新任务: {job_id}")
            
            # 清理已删除任务
            for job_id in existing_job_ids:
                if not any(f"task_{t.id}" == job_id for t in tasks):
                    scheduler.remove_job(job_id)
                    logger.info(f"已移除过期任务: {job_id}")
                    
        except Exception as e:
            logger.error(f"任务同步失败: {str(e)}", exc_info=True)

http://www.xdnf.cn/news/942.html

相关文章:

  • 网络原理 - 3(UDP 协议)
  • 线性DP:最短编辑距离
  • 【leetcode刷题日记】lc.62-不同路径
  • 【leetcode刷题日记】lc.416-分割等和子集
  • Linux操作系统--进程等待
  • 《Android 应用开发基础教程》——第五章:RecyclerView 列表视图与适配器机制
  • oracle expdp/impdp 用法详解
  • ACWing——算法基础课
  • Linux常见指令介绍中(入门级)
  • 包管理工具有哪些?主流软件分享
  • 网络原理——UDP
  • element-plus中,Steps 步骤条组件的使用
  • 从多个Excel批量筛查数据后合并到一起
  • CompletableFuture并行处理任务
  • 技术视界 | 开源新视野: 人形机器人技术崛起,开源社区驱动创新
  • Feign
  • IQ信号和实信号的关系与转换的matlab实现
  • kafka监控kafka manager(CMAK)部署配置
  • LX5-STM32F103C8T6引脚分布与定义
  • 在已有 Kubernetes 集群中最小化离线安装 KubeSphere4.1.3
  • 衡石 ChatBI 用户手册-使用指南
  • Docker安装beef-xss
  • 爱家桌面app官方正版下载 爱家最新版免费安装 固件升级方法
  • [特殊字符] Prompt如何驱动大模型对本地文件实现自主变更:Cline技术深度解析
  • stm32week12
  • 《小型支付商城系统》学习记录
  • 测试模版1
  • 4.21总结
  • 思科路由器做DNS服务器
  • [数据可视化] Datagear使用心得:从数据整备到可视化联动实践