python实现接收九数云的异常分析指标推送通知
背景
客户使用九数云做一些经营指标的分析,有些指标异常的情况下需要在业务系统生成待办给到对应的人员去处理。 可以通过九数云的自动化消息通知实现。
实现步骤
先实现一个本地webhook消息接收的服务
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import uvicornapp = FastAPI()@app.post("/webhook")
async def handle_webhook(request: Request):try:# 获取请求头headers = dict(request.headers)# 尝试解析JSON格式的请求体try:data = await request.json()content_type = 'application/json'except:# 如果不是JSON格式,获取原始数据data = await request.body()data = data.decode('utf-8') if isinstance(data, bytes) else datacontent_type = headers.get('Content-Type', 'unknown')# 打印Webhook信息print("\n接收到Webhook请求:")print(f"来源IP: {request.client.host}")print(f"内容类型: {content_type}")print("请求头:")for key, value in headers.items():print(f" {key}: {value}")print("请求体:")print(data)return JSONResponse(content={"status": "success", "message": "Webhook received"})except Exception as e:print(f"处理Webhook时出错: {str(e)}")return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)if __name__ == "__main__":uvicorn.run(app, host="0.0.0.0", port=41069)
配置九数云消息自动化
webhook选择企微机器人就好,自己搭建的webhook服务也可以接收消息