当前位置: 首页 > java >正文

一个简单的分布式追踪系统

1. 准备工作

导入必要的库

import contextvars
import time
from typing import Any, Optional, Dict, List, Union
from dataclasses import dataclass, field

2. 定义上下文变量

# 定义两个上下文变量,存储当前 Span 和 Trace
_current_span: contextvars.ContextVar[Optional["Span"]] = contextvars.ContextVar("current_span", default=None
)_current_trace: contextvars.ContextVar[Optional["Trace"]] = contextvars.ContextVar("current_trace", default=None
)

3. 数据模型定义

3.1 SpanContext 类

@dataclass
class SpanContext:"""Span 的上下文信息(用于跨进程传递)"""trace_id: strspan_id: stris_remote: bool = False

3.2 Span 类

@dataclass
class Span:"""表示一个操作的时间段追踪"""name: strcontext: SpanContextparent: Optional["Span"] = Nonestart_time: float = field(default_factory=time.time)end_time: Optional[float] = Noneattributes: Dict[str, Any] = field(default_factory=dict)events: List[Dict[str, Any]] = field(default_factory=list)status: str = "UNSET"def end(self, status: str = "OK") -> None:"""结束 Span 并记录状态"""self.end_time = time.time()self.status = statusdef add_event(self, name: str, attributes: Optional[Dict[str, Any]] = None) -> None:"""添加事件到 Span"""self.events.append({"name": name,"timestamp": time.time(),"attributes": attributes or {}})def __enter__(self) -> "Span":"""支持 with 语句"""return selfdef __exit__(self, exc_type, exc_val, exc_tb) -> None:"""自动结束 Span"""self.end("ERROR" if exc_type else "OK")

3.3 Trace 类

@dataclass
class Trace:"""完整的追踪链"""root_span: Spanspans: List[Span] = field(default_factory=list)def add_span(self, span: Span) -> None:"""添加 Span 到 Trace"""self.spans.append(span)

4. 追踪 API 实现

4.1 辅助函数

def generate_id() -> str:"""生成追踪ID(简化版)"""return f"id-{int(time.time() * 1000)}"def get_current_span() -> Optional[Span]:"""获取当前 Span"""return _current_span.get()def get_current_trace() -> Optional[Trace]:"""获取当前 Trace"""return _current_trace.get()

4.2 核心函数

def start_span(name: str, attributes: Optional[Dict[str, Any]] = None) -> Span:"""创建并激活一个新 Span:param name: Span 名称:param attributes: 附加属性:return: 新创建的 Span"""parent = get_current_span()context = SpanContext(trace_id=parent.context.trace_id if parent else generate_id(),span_id=generate_id())span = Span(name=name, context=context, parent=parent)if attributes:span.attributes.update(attributes)# 设置当前 Span_current_span.set(span)# 如果是根 Span,则创建 Traceif parent is None:trace = Trace(root_span=span)_current_trace.set(trace)else:trace = get_current_trace()if trace:trace.add_span(span)return spandef end_span(status: str = "OK") -> None:"""结束当前 Span 并返回父 Span"""current = get_current_span()if current:current.end(status)_current_span.set(current.parent)

5. 数据导出器

class ConsoleExporter:"""将追踪数据打印到控制台"""@staticmethoddef export(trace: Trace) -> None:print("\n=== Exporting Trace ===")print(f"Trace ID: {trace.root_span.context.trace_id}")for span in trace.spans:duration = (span.end_time or time.time()) - span.start_timeprint(f"Span: {span.name} ({duration:.3f}s), Status: {span.status}")

6. 使用示例

6.1 同步代码示例

# 示例 1: 同步代码
with start_span("main_operation", {"type": "sync"}):# 当前 Span 是 "main_operation"with start_span("child_operation"):# 当前 Span 是 "child_operation"get_current_span().add_event("processing_start")time.sleep(0.1)get_current_span().add_event("processing_end")# 手动创建 Spanspan = start_span("manual_span")time.sleep(0.05)span.end()# 导出追踪数据
if trace := get_current_trace():ConsoleExporter.export(trace)

=== Exporting Trace ===
Trace ID: id-1751643441896
Span: main_operation (0.152s), Status: OK
Span: child_operation (0.101s), Status: OK
Span: manual_span (0.050s), Status: OK

6.2 异步代码示例(可选)

import asyncioasync def async_task():with start_span("async_operation"):print(f"Current span: {get_current_span().name}")await asyncio.sleep(0.1)async def main():tasks = [async_task() for _ in range(3)]await asyncio.gather(*tasks)# 运行异步示例
asyncio.run(main())

Current span: async_operation
Current span: async_operation
Current span: async_operation

7. 可视化追踪数据(可选)

import matplotlib.pyplot as pltdef visualize_trace(trace: Trace):fig, ax = plt.subplots(figsize=(10, 6))for i, span in enumerate(trace.spans):duration = (span.end_time or time.time()) - span.start_timeax.barh(span.name, duration, left=span.start_time, alpha=0.6)ax.text(span.start_time, i, f"{duration:.3f}s", va='center')ax.set_xlabel('Time')ax.set_title('Trace Visualization')plt.show()if trace := get_current_trace():visualize_trace(trace)

在这里插入图片描述
代码:https://github.com/zhouruiliangxian/Awesome-demo/blob/main/Distributed-Tracing/%E7%AE%80%E6%98%93%E5%88%86%E5%B8%83%E5%BC%8F%E8%BF%BD%E8%B8%AA%E7%B3%BB%E7%BB%9F.ipynb

http://www.xdnf.cn/news/14819.html

相关文章:

  • 区块链技术在物联网(IoT)中的核心应用场景
  • 利用TCP协议,创建一个多人聊天室
  • 图灵完备之路(数电学习三分钟)----数据选择器与总线
  • 本地区块链服务在物联网中的应用实例
  • python打卡day58@浙大疏锦行
  • 暴雨服务器成功中标华中科技大学集成电路学院服务器采购项目
  • JAVA-springboot 整合Redis
  • Go中使用国家新闻出版署实名认证
  • 【ACP】阿里云云计算高级运维工程师--ACP
  • 硬件嵌入式学习路线大总结(一):C语言与linux。内功心法——从入门到精通,彻底打通你的任督二脉!
  • Docker Desktop 安装到D盘(包括镜像下载等)+ 汉化
  • 7.4_面试_JAVA_
  • css-多条记录,自动换行与自动并行布局及gap兼容
  • linux_git的使用
  • 如何调节笔记本电脑亮度?其实有很多种方式可以调整亮度
  • 深入剖析MYSQL MVCC多版本并发控制+ReadView视图快照规避幻读问题
  • AD7780BRUZ-REEL ADI 24位低功耗ADC转换器 高精度传感器信号链一站式解决方案
  • js中的FileReader对象
  • 指针篇(7)- 指针运算笔试题(阿里巴巴)
  • 计算机科学导论(1)哈佛架构
  • 高功率的照明LN2系列助力电子元件薄膜片检测
  • 二叉树题解——验证二叉搜索树【LeetCode】后序遍历
  • 【狂飙AGI】第8课:AGI-行业大模型(系列2)
  • LangChain 全面入门
  • [ctfshow web入门] web94 `==`特性与intval特性
  • 【Python小工具】使用 OpenCV 获取视频时长的详细指南
  • 【Note】《深入理解Linux内核》Chapter 9 :深入理解 Linux 内核中的进程地址空间管理机制
  • FASTAPI+VUE3平价商贸管理系统
  • MySQL数据库----DML语句
  • 论文阅读笔记——Autoregressive Image Generation without Vector Quantization