
Stream Processing, Real-Time Analytics, and a RAG-Driven Python ETL Framework: Building Intelligent Data Pipelines (Part 2)

Chapter 4: Implementing the Core Modules in Python


4.1 Data Ingestion Module: A Generic Receiver Built on FastAPI + Kafka
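The receiver below calls confluent_kafka's synchronous `Producer.produce` from async FastAPI handlers. The pattern it relies on, moving a blocking call onto a worker thread with `loop.run_in_executor` so the event loop keeps serving requests, can be seen in isolation in this stdlib-only sketch. `FakeProducer` is a hypothetical stand-in that sleeps and records messages; it is not the confluent_kafka API.

```python
import asyncio
import json
import threading
import time


class FakeProducer:
    """Hypothetical stand-in for a Kafka producer whose produce() blocks briefly."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, str, bytes]] = []
        self._lock = threading.Lock()

    def produce(self, topic: str, key: str, value: bytes) -> None:
        time.sleep(0.05)  # simulate a blocking client call
        with self._lock:  # called from several worker threads at once
            self.sent.append((topic, key, value))


async def produce_event(producer: FakeProducer, topic: str, key: str, value: dict) -> None:
    loop = asyncio.get_running_loop()
    payload = json.dumps(value).encode("utf-8")
    # Run the blocking call in the default thread pool; the event loop stays free meanwhile.
    await loop.run_in_executor(None, lambda: producer.produce(topic, key=key, value=payload))


async def main() -> FakeProducer:
    producer = FakeProducer()
    start = time.perf_counter()
    # Ten sends overlap in the thread pool instead of running back to back on the loop.
    await asyncio.gather(
        *(produce_event(producer, "raw_events", f"evt-{i}", {"n": i}) for i in range(10))
    )
    print(f"{len(producer.sent)} messages in {time.perf_counter() - start:.2f}s")
    return producer


if __name__ == "__main__":
    asyncio.run(main())
```

In practice confluent_kafka's `produce()` only enqueues the message in a local buffer and returns quickly (raising `BufferError` when that buffer is full), so the executor hop is a defensive choice rather than a necessity; the simulated sleep here only makes the effect visible.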
# fastapi_kafka_ingestor.py
import asyncio
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, Optional

from confluent_kafka import KafkaException, Producer
from fastapi import BackgroundTasks, FastAPI, HTTPException
from pydantic import BaseModel, Field

# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Kafka configuration (in production, read these from environment variables or a config center)
KAFKA_BROKERS = "kafka1:9092,kafka2:9092"
KAFKA_TOPIC = "raw_events"


# Pydantic model used for request validation
class EventModel(BaseModel):
    event_id: str = Field(..., description="Unique event identifier")
    event_type: str = Field(..., description="Type of the event")
    source: str = Field(..., description="Source system of the event")
    timestamp: Optional[str] = Field(None, description="Event timestamp (ISO 8601)")
    payload: Dict[str, Any] = Field(..., description="Event data payload")


# Delivery-report callback: called from producer.poll()/flush() once per message
# with the final result (success or failure)
def delivery_report(err, msg):
    if err is not None:
        logger.error(f"Message delivery failed: {err}")
    else:
        logger.info(f"Message delivered to {msg.topic()} [{msg.partition()}]")


# Kafka producer configuration
conf = {
    "bootstrap.servers": KAFKA_BROKERS,
    "client.id": "fastapi_ingestor",
    # Register the delivery-report callback for every produced message
    "on_delivery": delivery_report,
    # Optional: enable compression
    # "compression.codec": "snappy",
    # Optional: tune batching
    # "batch.num.messages": 100,
    # "linger.ms": 10,
    # Optional: wait for all in-sync replicas to acknowledge each write
    # "acks": "all",
    # Optional: retries
    # "retries": 3,
    # "retry.backoff.ms": 100,
}
# One producer instance shared by all requests (the client is thread-safe)
producer = Producer(conf)

# FastAPI application
app = FastAPI(
    title="Real-Time Event Ingestion API",
    description="API for ingesting events into Kafka",
    version="1.0.0",
)


# Send a message to Kafka without blocking the event loop.
# This runs as a background task after the 202 response has already been sent,
# so failures can only be logged here; raising HTTPException would never reach the client.
async def produce_event(topic: str, key: str, value: dict):
    loop = asyncio.get_running_loop()
    try:
        # Run the synchronous producer.produce in a worker thread
        await loop.run_in_executor(
            None,
            lambda: producer.produce(topic, key=key, value=json.dumps(value).encode("utf-8")),
        )
        producer.poll(0)  # serve any pending delivery callbacks
        # produce() only enqueues locally; delivery is confirmed in delivery_report
        logger.info(f"Event with key '{key}' queued for topic '{topic}'")
    except BufferError:
        logger.error(f"Kafka producer buffer full, event with key '{key}' was not queued")
    except KafkaException as e:
        logger.error(f"Kafka error for key '{key}': {e}")


# API endpoint: ingest a single event
@app.post("/events/", response_model=Dict[str, str], status_code=202)
async def ingest_event(event: EventModel, background_tasks: BackgroundTasks):
    """Ingest a single event into the Kafka topic."""
    try:
        # Use the current UTC time if no timestamp was provided
        if not event.timestamp:
            event.timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        # Build the Kafka message
        kafka_message = event.dict()
        # Messages with the same key go to the same partition and keep their order there.
        # event_id is unique, so this spreads load but orders nothing across events;
        # key by an entity ID instead if per-entity ordering is required.
        kafka_key = event.event_id
        # Send asynchronously via BackgroundTasks so the response is not delayed
        background_tasks.add_task(produce_event, KAFKA_TOPIC, kafka_key, kafka_message)
        return {"status": "accepted", "event_id": event.event_id}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Unexpected error ingesting event {event.event_id}: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")


# API endpoint: ingest a batch of events
@app.post("/events/batch/", response_model=Dict[str, Any], status_code=207)
async def ingest_events_batch(events: list[EventModel], background_tasks: BackgroundTasks):
    """Ingest a batch of events into the Kafka topic.

    Returns a multi-status response indicating success/failure per event.
    """
    results = []
    success_count = 0
    failure_count = 0
    for event in events:
        try:
            if not event.timestamp:
                event.timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
            kafka_message = event.dict()
            kafka_key = event.event_id
            background_tasks.add_task(produce_event, KAFKA_TOPIC, kafka_key, kafka_message)
            results.append({"event_id": event.event_id, "status": "accepted"})
            success_count += 1
        except Exception as e:
            logger.error(f"Failed to enqueue event {event.event_id}: {e}")
            results.append({"event_id": event.event_id, "status": "failed", "error": str(e)})
            failure_count += 1
    # Assumed summary format (illustrative keys)
    return {"success_count": success_count, "failure_count": failure_count, "results": results}
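Kafka preserves order only within a partition, and the producer picks the partition from the message key, so the choice of `kafka_key` in the listing above decides what ordering consumers can rely on. The sketch below makes that concrete under stated assumptions: the partition is computed as CRC32(key) mod partition count, which approximates librdkafka's default `consistent_random` partitioner for non-empty keys (Java clients use murmur2 instead); the topic has 6 partitions; and `user_id` is a hypothetical entity field that is not part of `EventModel`.

```python
import zlib
from collections import defaultdict


def pick_partition(key: str, num_partitions: int) -> int:
    # Simplified key hashing: CRC32 of the key bytes modulo the partition count.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def route(events: list[dict], key_field: str, num_partitions: int = 6) -> dict[int, list[str]]:
    """Group event IDs by the partition their key maps to, keeping send order per partition."""
    partitions: dict[int, list[str]] = defaultdict(list)
    for event in events:
        partitions[pick_partition(event[key_field], num_partitions)].append(event["event_id"])
    return dict(partitions)


# Hypothetical events; user_id stands in for whatever entity needs ordered processing.
events = [
    {"event_id": "e1", "user_id": "alice"},
    {"event_id": "e2", "user_id": "bob"},
    {"event_id": "e3", "user_id": "alice"},
    {"event_id": "e4", "user_id": "alice"},
]

by_user = route(events, "user_id")    # all of alice's events share one partition
by_event = route(events, "event_id")  # each unique key is hashed independently

for name, layout in (("user_id", by_user), ("event_id", by_event)):
    print(name, dict(sorted(layout.items())))
```

Keyed by `user_id`, alice's three events land in a single partition in send order, so a consumer of that partition sees them in order; keyed by the unique `event_id`, no cross-event ordering follows, which is fine for independent events but not for per-entity workflows.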