Skip to content

Commit 0f749c8

Browse files
authored
recording: preserve incoming timestamps by default (#993)
Update the recorder to use the new get_packet API that carries timestamps. We have timestamps coming in from packets, so don't attempt to re-write them which could cause playback-rate irregularities. Also do a bit of cleanup around legacy code: * Remove unused env vars * Remove now-unneeded timestamp rewriting helpers in the recorder * Remove some other unused code, eg `recording_coordinator.put` --------- Signed-off-by: Josh Allmann <joshua.allmann@gmail.com>
1 parent b9ad7c1 commit 0f749c8

7 files changed

Lines changed: 70 additions & 185 deletions

File tree

src/scope/cloud/livepeer_app.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,9 @@ async def _media_output_loop(
466466
# TODO make this blocking; we busy-wait a LOT
467467
frame_item = None
468468
if record_node_id is not None:
469-
frame_item = frame_processor.sink_manager.recording.get(record_node_id)
469+
frame_item = frame_processor.sink_manager.recording.get_packet(
470+
record_node_id
471+
)
470472
if frame_item is None:
471473
await asyncio.sleep(0.01) # no frame yet, wait a bit
472474
continue

src/scope/server/recording.py

Lines changed: 32 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
11
"""Recording-related utility functions for cleanup and download handling."""
22

3-
import fractions
43
import logging
54
import os
65
import shutil
76
import tempfile
87
import threading
9-
import time
108
from pathlib import Path
119

1210
from aiortc import MediaStreamTrack
1311
from aiortc.contrib.media import MediaRecorder, MediaRelay
14-
from aiortc.mediastreams import VIDEO_CLOCK_RATE, VIDEO_TIME_BASE
12+
from av import VideoFrame
1513

1614
logger = logging.getLogger(__name__)
1715

@@ -21,118 +19,28 @@
2119
"download": "scope_download_",
2220
}
2321

24-
# Environment variables
25-
RECORDING_ENABLED = os.getenv("RECORDING_ENABLED", "false").lower() == "true"
26-
RECORDING_STARTUP_CLEANUP_ENABLED = (
27-
os.getenv("RECORDING_STARTUP_CLEANUP_ENABLED", "true").lower() == "true"
28-
)
29-
3022
RECORDING_MAX_FPS = 30.0 # Must match MediaRecorder's hardcoded rate=30
3123

3224

33-
class TimestampNormalizingTrack(MediaStreamTrack):
34-
"""Wraps a track and assigns wall-clock timestamps starting from 0.
35-
36-
Uses monotonic wall-clock time to compute PTS so that the recorded
37-
MP4 plays back at real-time speed regardless of the source track's
38-
own PTS cadence. This is critical for cloud-relay recordings where
39-
frames may arrive slower than the source track's nominal rate (e.g.
40-
CloudTrack stamps every frame at 1/30 s intervals even when network
41-
round-trips deliver them at 10-15 FPS).
42-
43-
Important: We must create a copy of the frame rather than modifying it
44-
in place, because the relay shares frame objects across all subscribers.
45-
Modifying in place would affect the WebRTC sender and cause encoding errors.
46-
"""
47-
48-
def __init__(self, source_track: MediaStreamTrack):
49-
super().__init__()
50-
self.kind = source_track.kind
51-
self._source = source_track
52-
self._start_time: float | None = None
53-
self._last_frame_time: float | None = None
54-
self._min_frame_interval = 1.0 / RECORDING_MAX_FPS
55-
56-
async def recv(self):
57-
import av
58-
59-
while True:
60-
frame = await self._source.recv()
61-
62-
# Frame rate limiting - skip frames arriving faster than MAX_RECORDING_FPS
63-
current_time = time.monotonic()
64-
if self._last_frame_time is not None:
65-
elapsed = current_time - self._last_frame_time
66-
if elapsed < self._min_frame_interval:
67-
continue # Skip this frame
68-
self._last_frame_time = current_time
69-
70-
if self._start_time is None:
71-
self._start_time = current_time
72-
73-
# Create a new frame with wall-clock-based timestamp.
74-
# Pad to even dimensions — libx264 requires width and height divisible by 2.
75-
arr = frame.to_ndarray(format="rgb24")
76-
h, w = arr.shape[:2]
77-
pad_w = w % 2
78-
pad_h = h % 2
79-
if pad_w or pad_h:
80-
import numpy as np
81-
82-
arr = np.pad(arr, ((0, pad_h), (0, pad_w), (0, 0)), mode="edge")
83-
new_frame = av.VideoFrame.from_ndarray(arr, format="rgb24")
84-
new_frame.pts = int((current_time - self._start_time) * VIDEO_CLOCK_RATE)
85-
new_frame.time_base = VIDEO_TIME_BASE
86-
return new_frame
87-
88-
def stop(self):
89-
self._source.stop()
90-
super().stop()
91-
92-
93-
class AudioTimestampNormalizingTrack(MediaStreamTrack):
94-
"""Wraps an audio track and assigns wall-clock timestamps starting from 0.
95-
96-
Analogous to TimestampNormalizingTrack but for AudioFrame objects.
97-
Uses wall-clock time for PTS to stay in sync with the video track's
98-
wall-clock timestamps. Unlike video, audio frames are not rate-limited
99-
here because the source AudioProcessingTrack already paces at 20ms
100-
intervals.
101-
"""
102-
103-
kind = "audio"
104-
105-
def __init__(self, source_track: MediaStreamTrack):
106-
super().__init__()
107-
self._source = source_track
108-
self._start_time: float | None = None
109-
110-
async def recv(self):
111-
from av import AudioFrame as AvAudioFrame
112-
113-
frame = await self._source.recv()
114-
115-
current_time = time.monotonic()
116-
if self._start_time is None:
117-
self._start_time = current_time
25+
def ensure_even_video_frame(frame: VideoFrame) -> VideoFrame:
26+
"""Pad odd-dimension video frames so encoders like libx264 accept them."""
27+
pts = frame.pts
28+
time_base = frame.time_base
29+
arr = frame.to_ndarray(format="rgb24")
30+
h, w = arr.shape[:2]
31+
pad_w = w % 2
32+
pad_h = h % 2
33+
if not (pad_w or pad_h):
34+
return frame
11835

119-
# Create a copy with wall-clock PTS (relay shares frame objects,
120-
# so we must not mutate in place).
121-
new_frame = AvAudioFrame(
122-
format=frame.format.name,
123-
layout=frame.layout.name,
124-
samples=frame.samples,
125-
)
126-
new_frame.sample_rate = frame.sample_rate
127-
new_frame.pts = int((current_time - self._start_time) * frame.sample_rate)
128-
new_frame.time_base = fractions.Fraction(1, frame.sample_rate)
129-
for i, plane in enumerate(frame.planes):
130-
new_frame.planes[i].update(bytes(plane))
131-
return new_frame
36+
import numpy as np
13237

133-
def stop(self):
134-
self._source.stop()
135-
super().stop()
38+
padded = np.pad(arr, ((0, pad_h), (0, pad_w), (0, 0)), mode="edge")
39+
even_frame = VideoFrame.from_ndarray(padded, format="rgb24")
40+
even_frame.pts = pts
41+
if time_base is not None:
42+
even_frame.time_base = time_base
43+
return even_frame
13644

13745

13846
class RecordingManager:
@@ -182,42 +90,34 @@ def _stop_track_safe(track: MediaStreamTrack | None) -> None:
18290
logger.warning(f"Error stopping recording track: {e}")
18391

18492
def _create_recording_track(self) -> MediaStreamTrack | None:
185-
"""Create a video recording track.
186-
187-
Returns None if no video track is configured. The track is wrapped
188-
in TimestampNormalizingTrack to ensure frame timestamps start from 0
189-
for each new recording.
190-
"""
93+
"""Create a video recording track, preserving source timestamps."""
19194
if self.video_track is None:
19295
return None
19396
if self.relay:
194-
relay_track = self.relay.subscribe(self.video_track)
195-
return TimestampNormalizingTrack(relay_track)
196-
else:
197-
logger.warning("No relay available for recording, using track directly")
198-
return TimestampNormalizingTrack(self.video_track)
97+
return self.relay.subscribe(self.video_track)
98+
logger.warning("No relay available for recording, using track directly")
99+
return self.video_track
199100

200101
def _create_audio_recording_track(self) -> MediaStreamTrack | None:
201-
"""Create an audio recording track.
202-
203-
Returns None if no audio track is configured.
204-
"""
102+
"""Create an audio recording track, preserving source timestamps."""
205103
if self.audio_track is None:
206104
return None
207105
if self.audio_relay:
208-
relay_track = self.audio_relay.subscribe(self.audio_track)
209-
return AudioTimestampNormalizingTrack(relay_track)
210-
else:
211-
logger.warning(
212-
"No audio relay available for recording, using track directly"
213-
)
214-
return AudioTimestampNormalizingTrack(self.audio_track)
106+
return self.audio_relay.subscribe(self.audio_track)
107+
logger.warning("No audio relay available for recording, using track directly")
108+
return self.audio_track
215109

216110
def _create_media_recorder(self, file_path: str) -> MediaRecorder:
217111
"""Create a MediaRecorder instance with standard settings."""
218112
return MediaRecorder(
219113
file_path,
220114
format="mp4",
115+
options={
116+
# force timestamps to start at zero
117+
"use_editlist": "0",
118+
# allows playback before file is fully loaded, eg over http
119+
"movflags": "+faststart",
120+
},
221121
)
222122

223123
async def start_recording(self):
@@ -450,12 +350,6 @@ def cleanup_recording_files():
450350
Clean up all recording files from previous sessions.
451351
This handles cases where the process crashed and files weren't cleaned up.
452352
"""
453-
if not RECORDING_STARTUP_CLEANUP_ENABLED:
454-
logger.info(
455-
"Recording startup cleanup disabled via RECORDING_STARTUP_CLEANUP_ENABLED"
456-
)
457-
return
458-
459353
temp_dir = Path(tempfile.gettempdir())
460354
if not temp_dir.exists():
461355
return

src/scope/server/recording_coordinator.py

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,21 @@
77
import logging
88
import queue
99
from dataclasses import dataclass
10+
from typing import TYPE_CHECKING
1011

11-
import torch
12+
from .media_packets import VideoPacket, ensure_video_packet
1213

13-
from .media_packets import ensure_video_packet
14+
if TYPE_CHECKING:
15+
from .recording import RecordingManager
16+
from .tracks import QueueVideoTrack
1417

1518
logger = logging.getLogger(__name__)
1619

1720

1821
@dataclass
1922
class _RecordingEntry:
20-
manager: object # RecordingManager
21-
track: object # QueueVideoTrack
23+
manager: "RecordingManager"
24+
track: "QueueVideoTrack"
2225
stopped_file: str | None = None # File path after stop (before download)
2326

2427

@@ -89,35 +92,21 @@ def _drain_queue(q: queue.Queue) -> int:
8992
except queue.Empty:
9093
return dropped
9194

92-
def get(self, record_node_id: str) -> torch.Tensor | None:
93-
"""Read a frame from a record node's output queue."""
95+
def get_packet(self, record_node_id: str) -> VideoPacket | None:
96+
"""Read a packet from a record node's output queue."""
9497
rec_q = self._record_queues.get(record_node_id)
9598
if rec_q is None:
9699
return None
97100
try:
98-
frame = ensure_video_packet(rec_q.get_nowait()).tensor
101+
packet = ensure_video_packet(rec_q.get_nowait())
102+
frame = packet.tensor
99103
frame = frame.squeeze(0)
100104
if frame.is_cuda:
101105
frame = frame.cpu()
102-
return frame
106+
return VideoPacket(tensor=frame, timestamp=packet.timestamp)
103107
except queue.Empty:
104108
return None
105109

106-
def put(self, record_node_id: str, frame: torch.Tensor) -> bool:
107-
"""Write a frame into a record node's queue (cloud mode).
108-
109-
Returns True if the frame was enqueued, False if the queue is
110-
missing or full.
111-
"""
112-
rec_q = self._record_queues.get(record_node_id)
113-
if rec_q is None:
114-
return False
115-
try:
116-
rec_q.put_nowait(frame)
117-
return True
118-
except queue.Full:
119-
return False
120-
121110
# ------------------------------------------------------------------
122111
# Recording lifecycle
123112
# ------------------------------------------------------------------

src/scope/server/sink_manager.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@
1010
import queue
1111
import threading
1212
import time
13+
from fractions import Fraction
1314
from typing import TYPE_CHECKING, Any
1415

1516
import numpy as np
1617
import torch
1718

18-
from .media_packets import VideoPacket, ensure_video_packet
19+
from .media_packets import MediaTimestamp, VideoPacket, ensure_video_packet
1920
from .recording_coordinator import RecordingCoordinator
2021

2122
if TYPE_CHECKING:
@@ -436,26 +437,30 @@ def _per_node_sink_loop(self, node_id: str, sink_type: str) -> None:
436437
# Recording
437438
# ------------------------------------------------------------------
438439

439-
def get_from_record(self, record_node_id: str):
440-
"""Read a frame from a record node's output queue."""
441-
return self._recording.get(record_node_id)
442-
443440
def put_to_record(self, node_id: str, frame) -> None:
444441
"""Convert a VideoFrame to tensor and put it into a record node's queue."""
445-
import torch
446-
447442
rec_q = self._recording._record_queues.get(node_id)
448443
if rec_q is None:
449444
return
450445
try:
451446
frame_np = frame.to_ndarray(format="rgb24")
452447
t = torch.as_tensor(frame_np, dtype=torch.uint8).unsqueeze(0)
448+
timestamp = MediaTimestamp()
449+
if (
450+
getattr(frame, "pts", None) is not None
451+
and getattr(frame, "time_base", None) is not None
452+
):
453+
timestamp = MediaTimestamp(
454+
pts=frame.pts,
455+
time_base=Fraction(frame.time_base),
456+
)
457+
packet = VideoPacket(tensor=t, timestamp=timestamp)
453458
try:
454-
rec_q.put_nowait(t)
459+
rec_q.put_nowait(packet)
455460
except queue.Full:
456461
try:
457462
rec_q.get_nowait()
458-
rec_q.put_nowait(t)
463+
rec_q.put_nowait(packet)
459464
except queue.Empty:
460465
pass
461466
except Exception as e:

src/scope/server/tracks.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
from .media_packets import VideoPacket, ensure_video_packet
1818
from .pipeline_manager import PipelineManager
19+
from .recording import ensure_even_video_frame
1920

2021
if TYPE_CHECKING:
2122
from .frame_processor import FrameProcessor
@@ -135,8 +136,8 @@ async def recv(self) -> VideoFrame:
135136
if frame_squeezed.is_cuda:
136137
frame_squeezed = frame_squeezed.cpu()
137138

138-
video_frame = VideoFrame.from_ndarray(
139-
frame_squeezed.numpy(), format="rgb24"
139+
video_frame = ensure_even_video_frame(
140+
VideoFrame.from_ndarray(frame_squeezed.numpy(), format="rgb24")
140141
)
141142
if packet.timestamp.is_valid:
142143
await _pace_preserved_timestamp(self, self._pacing, packet)
@@ -181,7 +182,9 @@ async def recv(self) -> VideoFrame:
181182
packet = self._frame_getter(fp)
182183
if packet is not None:
183184
packet = ensure_video_packet(packet)
184-
frame = VideoFrame.from_ndarray(packet.tensor.numpy(), format="rgb24")
185+
frame = ensure_even_video_frame(
186+
VideoFrame.from_ndarray(packet.tensor.numpy(), format="rgb24")
187+
)
185188
if packet.timestamp.is_valid:
186189
await _pace_preserved_timestamp(self, self._pacing, packet)
187190
frame.pts = packet.timestamp.pts

0 commit comments

Comments
 (0)