Skip to content

Commit 794cf07

Browse files
feat(ai): add OpenTelemetry integration for AI span export (#494)
## Problem Users instrumenting their AI/LLM applications with OpenTelemetry have no way to route AI spans to PostHog without writing custom glue code. This is the Python equivalent of PostHog/posthog-js#3358. ## Changes Adds a `posthog.ai.otel` module with two integration patterns: - **`PostHogSpanProcessor`** (recommended) — self-contained `SpanProcessor` that wraps `BatchSpanProcessor` + `OTLPSpanExporter`, filtering to only forward AI spans to PostHog's OTLP endpoint at `/i/v0/ai/otel` - **`PostHogTraceExporter`** — `SpanExporter` that filters AI spans before forwarding, for setups that only accept an exporter (e.g. passed to `SimpleSpanProcessor`) - **`is_ai_span()`** — shared predicate matching `gen_ai.*`, `llm.*`, `ai.*`, `traceloop.*` prefixes on span names and attribute keys Also adds `posthog[otel]` optional dependency group for `opentelemetry-sdk` and `opentelemetry-exporter-otlp-proto-http`. ### Usage ```python from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.resources import Resource, SERVICE_NAME from posthog.ai.otel import PostHogSpanProcessor resource = Resource( attributes={ SERVICE_NAME: "my-app", "posthog.distinct_id": "user-123", } ) provider = TracerProvider(resource=resource) provider.add_span_processor( PostHogSpanProcessor(api_key="phc_...") ) trace.set_tracer_provider(provider) ``` ### Consistency with #482 Uses the same OTLP endpoint (`/i/v0/ai/otel`) and auth header format (`Authorization: Bearer {api_key}`) as the migrated examples in #482. ## How did you test this code? 31 unit tests covering span filtering, processor forwarding/dropping, exporter filtering, endpoint configuration, and lifecycle delegation.
1 parent c59ca59 commit 794cf07

File tree

11 files changed

+587
-3
lines changed

11 files changed

+587
-3
lines changed

posthog/ai/otel/__init__.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
"""PostHog OpenTelemetry integration for AI tracing.
2+
3+
Provides components to route AI-related OpenTelemetry spans to PostHog's
4+
OTLP endpoint. Only spans matching known AI semantic convention prefixes
5+
(gen_ai, llm, ai, traceloop) are forwarded; all other spans are silently
6+
dropped.
7+
8+
Two integration patterns are supported:
9+
10+
1. **PostHogSpanProcessor** (recommended) - Self-contained processor that
11+
handles batching and export internally::
12+
13+
provider = TracerProvider()
14+
provider.add_span_processor(
15+
PostHogSpanProcessor(api_key="phc_...")
16+
)
17+
18+
2. **PostHogTraceExporter** - Exporter for use with your own
19+
BatchSpanProcessor or frameworks that only accept a SpanExporter::
20+
21+
provider = TracerProvider()
22+
provider.add_span_processor(
23+
BatchSpanProcessor(
24+
PostHogTraceExporter(api_key="phc_...")
25+
)
26+
)
27+
"""
28+
29+
from posthog.ai.otel.exporter import PostHogTraceExporter
30+
from posthog.ai.otel.processor import PostHogSpanProcessor
31+
from posthog.ai.otel.spans import is_ai_span
32+
33+
__all__ = ["PostHogSpanProcessor", "PostHogTraceExporter", "is_ai_span"]

posthog/ai/otel/exporter.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
"""PostHog trace exporter for OpenTelemetry.
2+
3+
Provides a SpanExporter that filters AI-related spans before forwarding them
4+
to PostHog's OTLP endpoint. Use this when your setup only accepts a
5+
SpanExporter (e.g. as an argument to BatchSpanProcessor).
6+
"""
7+
8+
from typing import Optional, Sequence
9+
10+
from opentelemetry.sdk.trace import ReadableSpan
11+
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
12+
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
13+
14+
from .spans import DEFAULT_HOST, is_ai_span
15+
16+
17+
class PostHogTraceExporter(SpanExporter):
18+
"""Span exporter that filters AI spans and forwards them to PostHog.
19+
20+
Wraps an OTLPSpanExporter configured for PostHog's OTLP endpoint. Spans
21+
that are not AI-related are silently dropped, returning SUCCESS immediately.
22+
23+
Usage::
24+
25+
from opentelemetry.sdk.trace import TracerProvider
26+
from opentelemetry.sdk.trace.export import BatchSpanProcessor
27+
from posthog.ai.otel import PostHogTraceExporter
28+
29+
provider = TracerProvider()
30+
provider.add_span_processor(
31+
BatchSpanProcessor(
32+
PostHogTraceExporter(api_key="phc_...")
33+
)
34+
)
35+
"""
36+
37+
def __init__(
38+
self,
39+
api_key: str,
40+
host: str = DEFAULT_HOST,
41+
):
42+
"""
43+
Args:
44+
api_key: PostHog project API key.
45+
host: PostHog host URL. Defaults to US cloud.
46+
"""
47+
self._api_key = api_key
48+
self._host = host.rstrip("/")
49+
50+
self._exporter = OTLPSpanExporter(
51+
endpoint=f"{self._host}/i/v0/ai/otel",
52+
headers={"Authorization": f"Bearer {self._api_key}"},
53+
)
54+
55+
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
56+
ai_spans = [span for span in spans if is_ai_span(span)]
57+
if not ai_spans:
58+
return SpanExportResult.SUCCESS
59+
return self._exporter.export(ai_spans)
60+
61+
def shutdown(self) -> None:
62+
self._exporter.shutdown()
63+
64+
def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
65+
if timeout_millis is not None:
66+
return self._exporter.force_flush(timeout_millis)
67+
return self._exporter.force_flush()

posthog/ai/otel/processor.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
"""PostHog span processor for OpenTelemetry.
2+
3+
Provides a self-contained SpanProcessor that filters AI-related spans and
4+
exports them to PostHog's OTLP endpoint. This is the recommended integration
5+
for setups using TracerProvider.add_span_processor().
6+
"""
7+
8+
from typing import Optional
9+
10+
from opentelemetry.context import Context
11+
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
12+
from opentelemetry.sdk.trace.export import BatchSpanProcessor
13+
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
14+
15+
from .spans import DEFAULT_HOST, is_ai_span
16+
17+
18+
class PostHogSpanProcessor(SpanProcessor):
19+
"""Span processor that filters AI spans and exports them to PostHog.
20+
21+
Wraps a BatchSpanProcessor and OTLPSpanExporter internally, configured
22+
to send to PostHog's OTLP traces endpoint. Only spans identified as
23+
AI-related (by name or attribute prefix) are forwarded for export.
24+
25+
Usage::
26+
27+
from opentelemetry.sdk.trace import TracerProvider
28+
from posthog.ai.otel import PostHogSpanProcessor
29+
30+
provider = TracerProvider()
31+
provider.add_span_processor(
32+
PostHogSpanProcessor(api_key="phc_...")
33+
)
34+
"""
35+
36+
def __init__(
37+
self,
38+
api_key: str,
39+
host: str = DEFAULT_HOST,
40+
):
41+
"""
42+
Args:
43+
api_key: PostHog project API key.
44+
host: PostHog host URL. Defaults to US cloud.
45+
"""
46+
self._api_key = api_key
47+
self._host = host.rstrip("/")
48+
49+
exporter = OTLPSpanExporter(
50+
endpoint=f"{self._host}/i/v0/ai/otel",
51+
headers={"Authorization": f"Bearer {self._api_key}"},
52+
)
53+
self._processor = BatchSpanProcessor(exporter)
54+
55+
def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None:
56+
pass
57+
58+
def on_end(self, span: ReadableSpan) -> None:
59+
if not is_ai_span(span):
60+
return
61+
self._processor.on_end(span)
62+
63+
def shutdown(self) -> None:
64+
self._processor.shutdown()
65+
66+
def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
67+
if timeout_millis is not None:
68+
return self._processor.force_flush(timeout_millis)
69+
return self._processor.force_flush()

posthog/ai/otel/spans.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
"""Shared AI span filtering logic and constants for OpenTelemetry integration."""
2+
3+
from typing import TYPE_CHECKING
4+
5+
if TYPE_CHECKING:
6+
from opentelemetry.sdk.trace import ReadableSpan
7+
8+
DEFAULT_HOST = "https://us.i.posthog.com"
9+
10+
AI_SPAN_PREFIXES = ("gen_ai.", "llm.", "ai.", "traceloop.")
11+
12+
13+
def is_ai_span(span: "ReadableSpan") -> bool:
14+
"""Check if a span is AI-related by examining its name and attribute keys.
15+
16+
Matches spans whose name or any attribute key starts with one of the
17+
known AI semantic convention prefixes (gen_ai, llm, ai, traceloop).
18+
"""
19+
name = span.name
20+
if any(name.startswith(prefix) for prefix in AI_SPAN_PREFIXES):
21+
return True
22+
23+
attributes = span.attributes or {}
24+
for key in attributes:
25+
if any(key.startswith(prefix) for prefix in AI_SPAN_PREFIXES):
26+
return True
27+
28+
return False

posthog/test/ai/otel/__init__.py

Whitespace-only changes.

posthog/test/ai/otel/conftest.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from unittest.mock import MagicMock
2+
3+
4+
def make_span(name: str = "test", attributes: dict | None = None) -> MagicMock:
5+
span = MagicMock()
6+
span.name = name
7+
span.attributes = attributes or {}
8+
return span
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
import unittest
2+
from unittest.mock import patch
3+
4+
from opentelemetry.sdk.trace.export import SpanExportResult
5+
6+
from posthog.ai.otel.exporter import PostHogTraceExporter
7+
from posthog.test.ai.otel.conftest import make_span
8+
9+
10+
class TestPostHogTraceExporter(unittest.TestCase):
11+
@patch("posthog.ai.otel.exporter.OTLPSpanExporter")
12+
def test_configures_exporter_with_correct_endpoint(self, mock_otlp_cls):
13+
PostHogTraceExporter(api_key="phc_test123")
14+
mock_otlp_cls.assert_called_once_with(
15+
endpoint="https://us.i.posthog.com/i/v0/ai/otel",
16+
headers={"Authorization": "Bearer phc_test123"},
17+
)
18+
19+
@patch("posthog.ai.otel.exporter.OTLPSpanExporter")
20+
def test_configures_custom_host(self, mock_otlp_cls):
21+
PostHogTraceExporter(api_key="phc_test", host="https://eu.i.posthog.com")
22+
mock_otlp_cls.assert_called_once_with(
23+
endpoint="https://eu.i.posthog.com/i/v0/ai/otel",
24+
headers={"Authorization": "Bearer phc_test"},
25+
)
26+
27+
@patch("posthog.ai.otel.exporter.OTLPSpanExporter")
28+
def test_exports_ai_spans(self, mock_otlp_cls):
29+
exporter = PostHogTraceExporter(api_key="phc_test")
30+
inner = mock_otlp_cls.return_value
31+
inner.export.return_value = SpanExportResult.SUCCESS
32+
33+
spans = [make_span("gen_ai.chat"), make_span("llm.call")]
34+
result = exporter.export(spans)
35+
36+
self.assertEqual(result, SpanExportResult.SUCCESS)
37+
inner.export.assert_called_once_with(spans)
38+
39+
@patch("posthog.ai.otel.exporter.OTLPSpanExporter")
40+
def test_filters_out_non_ai_spans(self, mock_otlp_cls):
41+
exporter = PostHogTraceExporter(api_key="phc_test")
42+
inner = mock_otlp_cls.return_value
43+
inner.export.return_value = SpanExportResult.SUCCESS
44+
45+
ai_span = make_span("gen_ai.chat")
46+
http_span = make_span("http.request")
47+
result = exporter.export([ai_span, http_span])
48+
49+
self.assertEqual(result, SpanExportResult.SUCCESS)
50+
inner.export.assert_called_once_with([ai_span])
51+
52+
@patch("posthog.ai.otel.exporter.OTLPSpanExporter")
53+
def test_returns_success_when_no_ai_spans(self, mock_otlp_cls):
54+
exporter = PostHogTraceExporter(api_key="phc_test")
55+
inner = mock_otlp_cls.return_value
56+
57+
result = exporter.export([make_span("http.request"), make_span("db.query")])
58+
59+
self.assertEqual(result, SpanExportResult.SUCCESS)
60+
inner.export.assert_not_called()
61+
62+
@patch("posthog.ai.otel.exporter.OTLPSpanExporter")
63+
def test_returns_success_for_empty_batch(self, mock_otlp_cls):
64+
exporter = PostHogTraceExporter(api_key="phc_test")
65+
inner = mock_otlp_cls.return_value
66+
67+
result = exporter.export([])
68+
69+
self.assertEqual(result, SpanExportResult.SUCCESS)
70+
inner.export.assert_not_called()
71+
72+
@patch("posthog.ai.otel.exporter.OTLPSpanExporter")
73+
def test_exports_spans_with_ai_attributes(self, mock_otlp_cls):
74+
exporter = PostHogTraceExporter(api_key="phc_test")
75+
inner = mock_otlp_cls.return_value
76+
inner.export.return_value = SpanExportResult.SUCCESS
77+
78+
span = make_span("http.request", {"gen_ai.system": "openai"})
79+
result = exporter.export([span])
80+
81+
self.assertEqual(result, SpanExportResult.SUCCESS)
82+
inner.export.assert_called_once_with([span])
83+
84+
@patch("posthog.ai.otel.exporter.OTLPSpanExporter")
85+
def test_shutdown_delegates(self, mock_otlp_cls):
86+
exporter = PostHogTraceExporter(api_key="phc_test")
87+
inner = mock_otlp_cls.return_value
88+
89+
exporter.shutdown()
90+
inner.shutdown.assert_called_once()
91+
92+
@patch("posthog.ai.otel.exporter.OTLPSpanExporter")
93+
def test_force_flush_delegates(self, mock_otlp_cls):
94+
exporter = PostHogTraceExporter(api_key="phc_test")
95+
inner = mock_otlp_cls.return_value
96+
97+
exporter.force_flush(timeout_millis=5000)
98+
inner.force_flush.assert_called_once_with(5000)

0 commit comments

Comments
 (0)