Skip to content

Commit 4820774

Browse files
committed
feat: not reliable monitor
1 parent 425f627 commit 4820774

22 files changed

Lines changed: 1831 additions & 597 deletions

File tree

src/langbot/pkg/api/http/controller/groups/monitoring.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,3 +263,22 @@ async def get_all_data() -> str:
263263
},
264264
}
265265
)
266+
267+
@self.route('/sessions/<session_id>/analysis', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
268+
async def get_session_analysis(session_id: str) -> str:
269+
"""Get detailed analysis for a specific session"""
270+
analysis = await self.ap.monitoring_service.get_session_analysis(session_id)
271+
272+
# Always return success with the analysis data
273+
# The frontend will handle the 'found: false' case
274+
return self.success(data=analysis)
275+
276+
@self.route('/messages/<message_id>/details', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
277+
async def get_message_details(message_id: str) -> str:
278+
"""Get detailed information for a specific message"""
279+
details = await self.ap.monitoring_service.get_message_details(message_id)
280+
281+
if not details.get('found'):
282+
return self.error(message=f'Message {message_id} not found', code=404)
283+
284+
return self.success(data=details)

src/langbot/pkg/api/http/service/monitoring.py

Lines changed: 260 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ async def record_message(
3131
level: str = 'info',
3232
platform: str | None = None,
3333
user_id: str | None = None,
34+
runner_name: str | None = None,
3435
) -> str:
3536
"""Record a message"""
3637
message_id = str(uuid.uuid4())
@@ -47,6 +48,7 @@ async def record_message(
4748
'level': level,
4849
'platform': platform,
4950
'user_id': user_id,
51+
'runner_name': runner_name,
5052
}
5153

5254
await self.ap.persistence_mgr.execute_async(
@@ -69,6 +71,7 @@ async def record_llm_call(
6971
status: str = 'success',
7072
cost: float | None = None,
7173
error_message: str | None = None,
74+
message_id: str | None = None,
7275
) -> str:
7376
"""Record an LLM call"""
7477
call_id = str(uuid.uuid4())
@@ -88,6 +91,7 @@ async def record_llm_call(
8891
'pipeline_name': pipeline_name,
8992
'session_id': session_id,
9093
'error_message': error_message,
94+
'message_id': message_id,
9195
}
9296

9397
await self.ap.persistence_mgr.execute_async(
@@ -125,16 +129,37 @@ async def record_session_start(
125129
sqlalchemy.insert(persistence_monitoring.MonitoringSession).values(session_data)
126130
)
127131

128-
async def update_session_activity(self, session_id: str) -> None:
129-
"""Update session last activity time and increment message count"""
130-
await self.ap.persistence_mgr.execute_async(
132+
async def update_session_activity(
133+
self,
134+
session_id: str,
135+
pipeline_id: str | None = None,
136+
pipeline_name: str | None = None,
137+
) -> bool:
138+
"""Update session last activity time and increment message count.
139+
140+
Also updates pipeline info if the bot's pipeline has changed.
141+
142+
Returns:
143+
True if session was found and updated, False if session doesn't exist.
144+
"""
145+
update_values = {
146+
'last_activity': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
147+
'message_count': persistence_monitoring.MonitoringSession.message_count + 1,
148+
}
149+
150+
# Update pipeline info if provided (handles pipeline switch)
151+
if pipeline_id is not None:
152+
update_values['pipeline_id'] = pipeline_id
153+
if pipeline_name is not None:
154+
update_values['pipeline_name'] = pipeline_name
155+
156+
result = await self.ap.persistence_mgr.execute_async(
131157
sqlalchemy.update(persistence_monitoring.MonitoringSession)
132158
.where(persistence_monitoring.MonitoringSession.session_id == session_id)
133-
.values({
134-
'last_activity': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
135-
'message_count': persistence_monitoring.MonitoringSession.message_count + 1,
136-
})
159+
.values(update_values)
137160
)
161+
# Check if any rows were updated
162+
return result.rowcount > 0
138163

139164
async def record_error(
140165
self,
@@ -146,6 +171,7 @@ async def record_error(
146171
error_message: str,
147172
session_id: str | None = None,
148173
stack_trace: str | None = None,
174+
message_id: str | None = None,
149175
) -> str:
150176
"""Record an error"""
151177
error_id = str(uuid.uuid4())
@@ -160,6 +186,7 @@ async def record_error(
160186
'pipeline_name': pipeline_name,
161187
'session_id': session_id,
162188
'stack_trace': stack_trace,
189+
'message_id': message_id,
163190
}
164191

165192
await self.ap.persistence_mgr.execute_async(
@@ -168,6 +195,23 @@ async def record_error(
168195

169196
return error_id
170197

198+
async def update_message_status(
199+
self,
200+
message_id: str,
201+
status: str,
202+
level: str | None = None,
203+
) -> None:
204+
"""Update message status"""
205+
update_values = {'status': status}
206+
if level is not None:
207+
update_values['level'] = level
208+
209+
await self.ap.persistence_mgr.execute_async(
210+
sqlalchemy.update(persistence_monitoring.MonitoringMessage)
211+
.where(persistence_monitoring.MonitoringMessage.id == message_id)
212+
.values(update_values)
213+
)
214+
171215
# ========== Query Methods ==========
172216

173217
async def get_overview_metrics(
@@ -228,7 +272,7 @@ async def get_overview_metrics(
228272

229273
success_result = await self.ap.persistence_mgr.execute_async(success_query)
230274
success_count = success_result.scalar() or 0
231-
success_rate = (success_count / total_messages * 100) if total_messages > 0 else 0
275+
success_rate = (success_count / total_messages * 100) if total_messages > 0 else 100
232276

233277
# Active sessions
234278
active_session_query = sqlalchemy.select(
@@ -339,7 +383,12 @@ async def get_llm_calls(
339383
llm_calls_rows = result.all()
340384

341385
return (
342-
[self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringLLMCall, row[0] if isinstance(row, tuple) else row) for row in llm_calls_rows],
386+
[
387+
self.ap.persistence_mgr.serialize_model(
388+
persistence_monitoring.MonitoringLLMCall, row[0] if isinstance(row, tuple) else row
389+
)
390+
for row in llm_calls_rows
391+
],
343392
total,
344393
)
345394

@@ -388,7 +437,12 @@ async def get_sessions(
388437
sessions_rows = result.all()
389438

390439
return (
391-
[self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringSession, row[0] if isinstance(row, tuple) else row) for row in sessions_rows],
440+
[
441+
self.ap.persistence_mgr.serialize_model(
442+
persistence_monitoring.MonitoringSession, row[0] if isinstance(row, tuple) else row
443+
)
444+
for row in sessions_rows
445+
],
392446
total,
393447
)
394448

@@ -434,6 +488,201 @@ async def get_errors(
434488
errors_rows = result.all()
435489

436490
return (
437-
[self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringError, row[0] if isinstance(row, tuple) else row) for row in errors_rows],
491+
[
492+
self.ap.persistence_mgr.serialize_model(
493+
persistence_monitoring.MonitoringError, row[0] if isinstance(row, tuple) else row
494+
)
495+
for row in errors_rows
496+
],
438497
total,
439498
)
499+
500+
async def get_session_analysis(
501+
self,
502+
session_id: str,
503+
) -> dict:
504+
"""Get detailed analysis for a specific session"""
505+
# Get session info
506+
session_query = sqlalchemy.select(persistence_monitoring.MonitoringSession).where(
507+
persistence_monitoring.MonitoringSession.session_id == session_id
508+
)
509+
session_result = await self.ap.persistence_mgr.execute_async(session_query)
510+
session_row = session_result.first()
511+
512+
if not session_row:
513+
return {
514+
'session_id': session_id,
515+
'found': False,
516+
}
517+
518+
session = session_row[0] if isinstance(session_row, tuple) else session_row
519+
520+
# Get messages for this session
521+
messages_query = (
522+
sqlalchemy.select(persistence_monitoring.MonitoringMessage)
523+
.where(persistence_monitoring.MonitoringMessage.session_id == session_id)
524+
.order_by(persistence_monitoring.MonitoringMessage.timestamp.asc())
525+
)
526+
messages_result = await self.ap.persistence_mgr.execute_async(messages_query)
527+
messages_rows = messages_result.all()
528+
529+
# Count messages by status
530+
success_messages = 0
531+
error_messages = 0
532+
pending_messages = 0
533+
for row in messages_rows:
534+
msg = row[0] if isinstance(row, tuple) else row
535+
if msg.status == 'success':
536+
success_messages += 1
537+
elif msg.status == 'error':
538+
error_messages += 1
539+
elif msg.status == 'pending':
540+
pending_messages += 1
541+
542+
# Get LLM calls for this session
543+
llm_query = sqlalchemy.select(persistence_monitoring.MonitoringLLMCall).where(
544+
persistence_monitoring.MonitoringLLMCall.session_id == session_id
545+
)
546+
llm_result = await self.ap.persistence_mgr.execute_async(llm_query)
547+
llm_rows = llm_result.all()
548+
549+
# Calculate LLM statistics
550+
total_llm_calls = len(llm_rows)
551+
total_input_tokens = 0
552+
total_output_tokens = 0
553+
total_tokens = 0
554+
total_duration = 0
555+
success_llm_calls = 0
556+
error_llm_calls = 0
557+
558+
for row in llm_rows:
559+
llm_call = row[0] if isinstance(row, tuple) else row
560+
total_input_tokens += llm_call.input_tokens
561+
total_output_tokens += llm_call.output_tokens
562+
total_tokens += llm_call.total_tokens
563+
total_duration += llm_call.duration
564+
if llm_call.status == 'success':
565+
success_llm_calls += 1
566+
else:
567+
error_llm_calls += 1
568+
569+
# Get errors for this session
570+
error_query = (
571+
sqlalchemy.select(persistence_monitoring.MonitoringError)
572+
.where(persistence_monitoring.MonitoringError.session_id == session_id)
573+
.order_by(persistence_monitoring.MonitoringError.timestamp.desc())
574+
)
575+
error_result = await self.ap.persistence_mgr.execute_async(error_query)
576+
error_rows = error_result.all()
577+
578+
errors = [
579+
self.ap.persistence_mgr.serialize_model(
580+
persistence_monitoring.MonitoringError, row[0] if isinstance(row, tuple) else row
581+
)
582+
for row in error_rows
583+
]
584+
585+
# Calculate session duration
586+
if messages_rows:
587+
first_msg = messages_rows[0][0] if isinstance(messages_rows[0], tuple) else messages_rows[0]
588+
last_msg = messages_rows[-1][0] if isinstance(messages_rows[-1], tuple) else messages_rows[-1]
589+
session_duration_seconds = int((last_msg.timestamp - first_msg.timestamp).total_seconds())
590+
else:
591+
session_duration_seconds = 0
592+
593+
return {
594+
'session_id': session_id,
595+
'found': True,
596+
'session': self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringSession, session),
597+
'message_stats': {
598+
'total': len(messages_rows),
599+
'success': success_messages,
600+
'error': error_messages,
601+
'pending': pending_messages,
602+
},
603+
'llm_stats': {
604+
'total_calls': total_llm_calls,
605+
'success_calls': success_llm_calls,
606+
'error_calls': error_llm_calls,
607+
'total_input_tokens': total_input_tokens,
608+
'total_output_tokens': total_output_tokens,
609+
'total_tokens': total_tokens,
610+
'average_duration_ms': int(total_duration / total_llm_calls) if total_llm_calls > 0 else 0,
611+
},
612+
'errors': errors,
613+
'session_duration_seconds': session_duration_seconds,
614+
}
615+
616+
async def get_message_details(
617+
self,
618+
message_id: str,
619+
) -> dict:
620+
"""Get detailed information for a specific message including associated LLM calls and errors"""
621+
# Get message info
622+
message_query = sqlalchemy.select(persistence_monitoring.MonitoringMessage).where(
623+
persistence_monitoring.MonitoringMessage.id == message_id
624+
)
625+
message_result = await self.ap.persistence_mgr.execute_async(message_query)
626+
message_row = message_result.first()
627+
628+
if not message_row:
629+
return {
630+
'message_id': message_id,
631+
'found': False,
632+
}
633+
634+
message = message_row[0] if isinstance(message_row, tuple) else message_row
635+
636+
# Get LLM calls for this message
637+
llm_query = (
638+
sqlalchemy.select(persistence_monitoring.MonitoringLLMCall)
639+
.where(persistence_monitoring.MonitoringLLMCall.message_id == message_id)
640+
.order_by(persistence_monitoring.MonitoringLLMCall.timestamp.asc())
641+
)
642+
llm_result = await self.ap.persistence_mgr.execute_async(llm_query)
643+
llm_rows = llm_result.all()
644+
645+
llm_calls = [
646+
self.ap.persistence_mgr.serialize_model(
647+
persistence_monitoring.MonitoringLLMCall, row[0] if isinstance(row, tuple) else row
648+
)
649+
for row in llm_rows
650+
]
651+
652+
# Calculate LLM statistics
653+
total_input_tokens = sum(call.input_tokens for call in llm_rows)
654+
total_output_tokens = sum(call.output_tokens for call in llm_rows)
655+
total_tokens = sum(call.total_tokens for call in llm_rows)
656+
total_duration = sum(call.duration for call in llm_rows)
657+
658+
# Get errors for this message
659+
error_query = (
660+
sqlalchemy.select(persistence_monitoring.MonitoringError)
661+
.where(persistence_monitoring.MonitoringError.message_id == message_id)
662+
.order_by(persistence_monitoring.MonitoringError.timestamp.asc())
663+
)
664+
error_result = await self.ap.persistence_mgr.execute_async(error_query)
665+
error_rows = error_result.all()
666+
667+
errors = [
668+
self.ap.persistence_mgr.serialize_model(
669+
persistence_monitoring.MonitoringError, row[0] if isinstance(row, tuple) else row
670+
)
671+
for row in error_rows
672+
]
673+
674+
return {
675+
'message_id': message_id,
676+
'found': True,
677+
'message': self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringMessage, message),
678+
'llm_calls': llm_calls,
679+
'llm_stats': {
680+
'total_calls': len(llm_rows),
681+
'total_input_tokens': total_input_tokens,
682+
'total_output_tokens': total_output_tokens,
683+
'total_tokens': total_tokens,
684+
'total_duration_ms': total_duration,
685+
'average_duration_ms': int(total_duration / len(llm_rows)) if len(llm_rows) > 0 else 0,
686+
},
687+
'errors': errors,
688+
}

0 commit comments

Comments
 (0)