【AI】OpenAI 使用 Opentelemetry 进行链路监控

简介

OpenTelemetry 是一个开源的可观测性框架,它提供了统一的方式来收集、处理和导出遥测数据(包括 traces、metrics 和 logs)。在使用 OpenAI API 进行AI应用开发时,通过集成 OpenTelemetry 可以帮助我们:

  • 监控API调用性能:跟踪每次 API 调用的延迟、成功率等关键指标
  • 调试和排错:通过链路追踪快速定位问题所在
  • 成本分析:监控 token 使用量,优化成本控制
  • 系统可观测性:将 AI 调用链路与整个应用系统的监控体系集成

环境准备

1. 安装依赖包

首先需要安装 OpenTelemetry 相关的依赖包:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# 核心 OpenTelemetry 包
pip install opentelemetry-api opentelemetry-sdk

# OpenAI SDK
pip install openai

# OpenAI 的 OpenTelemetry 插件
pip install opentelemetry-instrumentation-openai

# 其他依赖
pip install rich  # 用于美化输出

2. API 密钥配置

确保你已经获取了相应的 API 密钥,并设置为环境变量:

1
2
3
4
5
# 如果使用 OpenAI
export OPENAI_API_KEY="your-openai-api-key"

# 如果使用阿里云通义千问(本例使用)
export QWEN_API_KEY="your-qwen-api-key"

实现步骤

步骤1:初始化 OpenTelemetry

OpenTelemetry 的初始化包括以下几个关键组件:

  • Resource:定义服务的基本信息(服务名、版本等)
  • TracerProvider:负责创建和管理 tracer
  • SpanProcessor:处理和导出 span 数据
  • Exporter:将遥测数据导出到指定目标

步骤2:配置数据导出器

支持多种导出方式:

  • ConsoleSpanExporter:将数据输出到控制台,适用于开发和调试
  • OTLPSpanExporter:通过 OTLP 协议导出到收集器,适用于生产环境
  • JaegerExporter:直接导出到 Jaeger
  • ZipkinExporter:直接导出到 Zipkin

步骤3:安装 OpenAI 插件

OpenTelemetry 提供了专门的 OpenAI 插件来自动监控 API 调用:

  • opentelemetry-instrumentation-openai:V1版本,稳定可用
  • opentelemetry-instrumentation-openai-v2:V2版本,功能更丰富但仍在完善中

步骤4:使用 OpenAI SDK

启用插件后,所有的 OpenAI API 调用都会自动生成相应的 span,包含以下信息:

  • 请求和响应的详细信息
  • 执行时间
  • token 使用量
  • 错误信息(如果有)

版本说明

V1 vs V2 插件对比

特性V1版本V2版本
稳定性稳定,生产可用开发中,不建议生产使用
数据捕获默认捕获详细内容默认不捕获敏感内容,需手动开启
日志支持使用 SpanEvent支持新的 Log 概念
OpenAI SDK 兼容性1.30 以下版本1.30 ~ 1.35 版本

最终代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import os
import rich
from openai import (
    OpenAI,
)

from opentelemetry import trace
from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (BatchSpanProcessor, ConsoleSpanExporter)
from opentelemetry.sdk.trace.sampling import DEFAULT_ON

base_resource = Resource.create(attributes={SERVICE_NAME: "my-service", SERVICE_VERSION: "0.1.0"})

tracer_provider = TracerProvider(resource=base_resource, sampler=DEFAULT_ON)

otel_exporter = ConsoleSpanExporter()
span_processor = BatchSpanProcessor(otel_exporter)
tracer_provider.add_span_processor(span_processor)

# 可以初始化GRPC等其他类型的Remote采样导出器
# from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
# otlp_exporter = OTLPSpanExporter(endpoint="your-collector:4317", insecure=True)
# span_processor = BatchSpanProcessor(otlp_exporter)
# tracer_provider.add_span_processor(span_processor)

# 全局注册
trace.set_tracer_provider(tracer_provider)

# 如果要使用 V2 需要额外的配置,新版的otel有了日志的概念所以将event作为log的一部分,区别于V1中将日志存放于的SpanEvent
# v2 2.1b0 兼容到 1.30 ~ 1.35,1.36对log有改动,LogRecord 被标记为过时的
def instrument_v2():
    # 与V1版本正好相反,V2 需主动打开详细日志监控,以免泄露敏感数据
    os.environ["OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"] = "true"

    from opentelemetry import _events, _logs
    from opentelemetry.sdk._logs import LoggerProvider
    from opentelemetry.sdk._logs.export import ConsoleLogExporter, BatchLogRecordProcessor
    from opentelemetry.sdk._events import EventLoggerProvider, EventLogger
    _logs.set_logger_provider(LoggerProvider(resource=base_resource))
    _logs.get_logger_provider().add_log_record_processor(BatchLogRecordProcessor(ConsoleLogExporter()))
    _events.set_event_logger_provider(EventLoggerProvider())

    from opentelemetry.instrumentation.openai_v2 import OpenAIInstrumentor

    OpenAIInstrumentor().instrument()

def instrument_v1():
    from opentelemetry.instrumentation.openai import OpenAIInstrumentor
    OpenAIInstrumentor().instrument()

instrument_v1()

client = OpenAI(
        api_key=os.getenv("QWEN_API_KEY"),
        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    )
model="qwen-plus"

stream = client.chat.completions.create(
    model=model,
    temperature=0.3,
    messages=[
        {"role": "system", "content": "你是一名乐于助人的AI助手,请根据用户的问题给出回答."},
        {"role": "user", "content": "你好"},
    ],
    extra_body={
        "enable_thinking": False,
    },
    stream=True,
)

for chunk in stream:
    # rich.print(chunk, end="", flush=True)
    print(chunk.choices[0].delta.content, end="", flush=True)

后记

langgraph框架也是可以使用Opentelemetry来代替langsmith的部分能力

Licensed under CC BY-NC-SA 4.0