auto-detect backfill segment count, remove backfill_concurrency config#281
Conversation
tonyxiao
left a comment
Review
Good direction — removing a user-facing config knob that required guessing and replacing it with auto-detection is the right call. The implementation is clean and the probeSegmentCount unit tests cover the key density tiers well. A few concrete issues and some broader design questions below.
Concrete Issues
1. Probe discards fetched data — wasted API call
probeSegmentCount calls listFn({ limit: 100 }), gets up to 100 items, uses them only for density estimation, then throws them away. The actual backfill then re-fetches the same data. For sparse streams where 100 items is the entire dataset, this means fetching everything twice. Should the probe response be returned so the first segment can use it as its first page?
2. Probe call lacks created filter — works today, fragile tomorrow
The probe calls listFn({ limit: 100 }) without a created range. This works because the backfill range is always [accountCreated, now+1] (full account lifetime), so all items fall within range. But if the range is ever narrowed (partial re-backfill, user-specified window), the density estimate would measure data outside the target range. Worth passing { limit: 100, created: { gte: range.gte, lt: range.lt } } for forward-compatibility.
3. 50 concurrent segments may be aggressive
Previously the default was 10 and user-tunable. Now dense streams jump to 50 concurrent generators all fed to mergeAsync(generators, numSegments). The rate limiter throttles individual requests, but 50 in-flight generators means 50 concurrent promises racing, which could cause memory pressure and bursty traffic. Consider decoupling segment count from concurrency — you can split into 50 time segments but only run N (e.g. 10–15) concurrently via mergeAsync.
4. Division-by-zero edge case
const totalSpan = range.lt - range.gte
const timeProgress = (range.lt - lastItem.created) / totalSpan

If range.lt === range.gte (brand-new account or degenerate input), this divides by zero → Infinity. Add: if (totalSpan <= 0) return 1.
5. buildSegments default changed from 10 to 50 — dead code
All call sites now pass numSegments explicitly, so the default parameter is dead. Either keep it at a sane value (10) or remove the default to make it required — having it silently be 50 is confusing if someone adds a call site later.
6. Test .slice(1) is correct but under-specified
The parallelListFn.mock.calls.slice(1) skip is correct, but the test should also assert the probe call's shape: expect(parallelListFn.mock.calls[0][0]).toEqual({ limit: 100 }). Documents that the first call is intentionally a probe, not a regression.
7. "Coexist" test still passes backfillConcurrency
The test at the bottom of index.test.ts ("parallel and sequential streams coexist") still passes backfillConcurrency: 3 — this should be removed now that the parameter is gone from the type signature.
Minor
- PR title typo: "segmen" → "segment"
Design Questions
Beyond the concrete fixes, a few things worth thinking about:
Should the rate limiter be built into the list function?
Right now rateLimiter is threaded through separately and every call site has to remember to call it. If rate limiting were baked into listFn itself (or a wrapper around it), it would be impossible to forget — and probeSegmentCount wouldn't need rateLimiter as a separate parameter. This is a bigger refactor but would simplify the API surface.
Should segment count be a smooth function rather than discrete tiers?
The current tiers (1 / 10 / 50) are coarse. A stream where 100 items cover 9.9% of the time range gets 50 segments; one at 10.1% gets 10. A continuous function like Math.min(50, Math.ceil(1 / timeProgress)) or a logarithmic mapping would avoid these cliff edges and produce more proportional concurrency. Something to consider.
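A sketch of what such a continuous mapping could look like (the function name is hypothetical; the cap of 50 is assumed from the current top tier):

```typescript
// Hypothetical sketch: map probe density to a segment count continuously.
// timeProgress = fraction of the total time range covered by one probe page.
const MAX_SEGMENTS = 50

function smoothSegmentCount(timeProgress: number): number {
  if (timeProgress <= 0) return MAX_SEGMENTS // degenerate input: treat as maximally dense
  return Math.max(1, Math.min(MAX_SEGMENTS, Math.ceil(1 / timeProgress)))
}

// Around the old 10% tier boundary the discrete version jumped 10 → 50;
// the smooth version moves by a single segment:
smoothSegmentCount(0.101) // → 10
smoothSegmentCount(0.099) // → 11
```

The inverse relationship has a natural reading: if one page covers a tenth of the time range, roughly ten pages' worth of segments cover the whole range.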
Should probing be continuous/adaptive rather than a one-time measurement?
A single probe measures density across the entire account lifetime, but data density often varies dramatically (e.g. sparse early history, dense recent activity). A more sophisticated approach would be adaptive — start with a coarse split, then subdivide dense segments further as you discover them (like a binary search or quad-tree). This would naturally concentrate parallelism where the data actually is. Probably overkill for v1, but worth noting as a future direction if the fixed tiers turn out to be suboptimal in practice.
tonyxiao
left a comment
Requesting changes — with a counter-proposal
The direction is right (auto-detect instead of making users guess), but the current implementation has enough rough edges that I think we should restructure before merging. I've written up a plan that addresses the issues from my earlier review and takes the idea further. The plan is in three independent, incrementally landable changes.
Change 1: Rate-limited ListFn wrapper
Today the same rate-limit boilerplate appears in paginateSegment, sequentialBackfillStream, and now probeSegmentCount:
const wait = await rateLimiter()
if (wait > 0) await new Promise((r) => setTimeout(r, wait * 1000))
const response = await listFn(params)

Wrap it once at construction time:
function withRateLimit(listFn: ListFn, rateLimiter: RateLimiter): ListFn {
return async (params) => {
const wait = await rateLimiter()
if (wait > 0) await new Promise((r) => setTimeout(r, wait * 1000))
return listFn(params)
}
}

Apply in listApiBackfill before passing listFn into anything:

const rateLimitedListFn = withRateLimit(resourceConfig.listFn!, rateLimiter)

Then paginateSegment, sequentialBackfillStream, and the probe all receive a plain ListFn and just call it. rateLimiter disappears from their signatures entirely. Pure refactor — structurally impossible to forget rate limiting on new call sites.
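A quick sketch of how the wrapper composes (ListFn/RateLimiter types are simplified for illustration; the fake API and stub limiter are hypothetical stand-ins):

```typescript
// Simplified types for illustration; the real ones live in the package.
type ListFn = (params: Record<string, unknown>) => Promise<{ data: unknown[] }>
type RateLimiter = () => Promise<number> // seconds to wait before the next call

function withRateLimit(listFn: ListFn, rateLimiter: RateLimiter): ListFn {
  return async (params) => {
    const wait = await rateLimiter()
    if (wait > 0) await new Promise((r) => setTimeout(r, wait * 1000))
    return listFn(params)
  }
}

// Compose once; every downstream consumer sees a plain ListFn and cannot
// skip the limiter, because it no longer exists as a separate parameter.
const seen: unknown[] = []
const limited = withRateLimit(
  async (params) => { seen.push(params); return { data: [] } }, // fake API
  async () => 0, // stub limiter that never throttles
)
```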
Change 2: Smooth segment count + decoupled concurrency
Replace the discrete 1 / 10 / 50 tiers with a continuous function:
const MAX_SEGMENTS = 50
const MAX_CONCURRENCY = 15
function segmentCountFromDensity(timeProgress: number): number {
if (timeProgress <= 0) return MAX_SEGMENTS
return Math.max(1, Math.min(MAX_SEGMENTS, Math.ceil(1 / timeProgress)))
}

ceil(1 / timeProgress) is a natural inverse relationship — sparse data (timeProgress=0.5) → 2 segments, medium (0.1) → 10, dense (0.02) → 50. No cliff edges at arbitrary thresholds.
Key insight: segment count and concurrency should be independent. Segments define work granularity; concurrency limits how many run at once:
const numSegments = segmentCountFromDensity(timeProgress)
const segments = buildSegments(accountCreated, now, numSegments)
// ...
yield* mergeAsync(generators, MAX_CONCURRENCY) // always capped at 15

This means dense streams get fine-grained 50-segment splits but only 15 execute simultaneously — avoiding the "50 concurrent promises racing" problem. MAX_CONCURRENCY at 15 balances saturating the 25 RPS rate limit (each segment does ~2 pages/sec × 15 ≈ 30 before throttling) without excessive memory pressure.
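mergeAsync itself isn't shown in this PR; a minimal sketch of a concurrency-capped merge under the assumed semantics (start at most `limit` generators at once, yield items as workers produce them — the real implementation may interleave differently):

```typescript
// Hypothetical sketch: drain many async generators with bounded concurrency.
async function* mergeAsync<T>(
  generators: AsyncGenerator<T>[],
  limit: number,
): AsyncGenerator<T> {
  const queue: T[] = []          // items produced but not yet yielded
  let next = 0                   // index of the next generator to claim
  let done = 0                   // workers that have finished
  let wake: (() => void) | null = null
  const notify = () => { wake?.(); wake = null }

  // Each worker claims generators one at a time until none remain.
  const worker = async () => {
    while (next < generators.length) {
      const gen = generators[next++]
      for await (const item of gen) {
        queue.push(item)
        notify()
      }
    }
    done++
    notify()
  }

  const workers = Math.min(limit, generators.length)
  for (let i = 0; i < workers; i++) void worker()

  // Yield items as they arrive; sleep when the queue is empty.
  while (done < workers || queue.length > 0) {
    if (queue.length > 0) {
      yield queue.shift()!
    } else {
      await new Promise<void>((r) => (wake = r))
    }
  }
}
```

With this shape, 50 segments and limit=15 means only 15 segment generators are ever in flight; the remaining 35 sit unclaimed until a worker frees up.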
Also: add if (totalSpan <= 0) return 1 to guard the division-by-zero edge case.
Change 3a: Probe-as-first-page (zero waste)
Instead of a separate probe that discards its data, make the probe be the first page of the newest segment:
async function probeAndBuildSegments(opts: {
listFn: ListFn // already rate-limited from Change 1
range: { gte: number; lt: number }
}): Promise<{ segments: SegmentState[]; firstPage: ListResult }> {
const { listFn, range } = opts
// Probe WITH created filter (forward-compatible if range narrows later)
const firstPage = await listFn({
limit: 100,
created: { gte: range.gte, lt: range.lt },
})
if (!firstPage.has_more) {
return {
segments: [{ index: 0, gte: range.gte, lt: range.lt, page_cursor: null, status: 'pending' }],
firstPage,
}
}
const lastItem = firstPage.data[firstPage.data.length - 1] as { created?: number }
const totalSpan = range.lt - range.gte
if (totalSpan <= 0) {
return {
segments: [{ index: 0, gte: range.gte, lt: range.lt, page_cursor: null, status: 'pending' }],
firstPage,
}
}
const timeProgress = (range.lt - (lastItem?.created ?? range.gte)) / totalSpan
const numSegments = segmentCountFromDensity(timeProgress)
const segments = buildSegments(range.gte, range.lt - 1, numSegments)
return { segments, firstPage }
}

The caller then:
- Yields the probe's records directly (they're real data, not waste)
- Sets the newest segment's page_cursor to the probe's last item ID
- paginateSegment picks up from that cursor and continues normally
This fixes three issues at once: no wasted API call, created filter on the probe for forward-compat, and the density measurement feeds directly into segment construction.
Change 3b: Adaptive subdivision (future — not for this PR)
Each segment measures its own local density after its first page and subdivides if dense. A sparse 2011–2020 segment stays as one piece while a dense 2024–2026 segment splits into 20 sub-segments, concentrating parallelism where the data actually is. The compact state format already supports this (it stores completed ranges + in-flight cursors, not fixed segment arrays).
This is powerful but complex — save for a follow-up once 3a proves the concept and we have data on whether uniform splits are a real bottleneck.
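If 3b is ever picked up, the per-segment decision could look something like this (entirely hypothetical names and shapes — a sketch of the subdivision idea, not the planned implementation):

```typescript
// Hypothetical sketch of per-segment adaptive subdivision (future work).
// After a segment fetches its first page (newest-first), measure *local*
// density and split into even sub-segments if the page covered only a
// small slice of the segment's time range.
interface Segment { gte: number; lt: number }

function maybeSubdivide(
  seg: Segment,
  firstPageOldestCreated: number, // `created` of the oldest item on the page
  maxSubSegments = 20,
): Segment[] {
  const span = seg.lt - seg.gte
  if (span <= 0) return [seg] // degenerate range: nothing to split
  // Fraction of this segment's range covered by one page.
  const localProgress = (seg.lt - firstPageOldestCreated) / span
  const n = localProgress <= 0
    ? maxSubSegments
    : Math.max(1, Math.min(maxSubSegments, Math.ceil(1 / localProgress)))
  if (n === 1) return [seg]
  const step = span / n
  return Array.from({ length: n }, (_, i) => ({
    gte: Math.round(seg.gte + i * step),
    lt: i === n - 1 ? seg.lt : Math.round(seg.gte + (i + 1) * step),
  }))
}
```

A sparse segment whose first page reaches its own lower bound stays whole, while a segment whose first page covers only the newest 5% splits into 20 pieces — the quad-tree-like behavior described above.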
Implementation order
Change 1: withRateLimit wrapper ← pure refactor, safe to land first
Change 2: smooth segments + capped concurrency ← replaces discrete tiers, adds div-by-zero guard
Change 3a: probe-as-first-page ← zero waste, created filter, combines probe + build
Change 3b: adaptive subdivision ← future follow-up
Each change is independently valuable and testable. Full plan with code sketches is in docs/plans/2026-04-13-adaptive-backfill.md.
Change 1: Extract withRateLimit(listFn, rateLimiter) wrapper so rate-limit boilerplate is applied once at construction time. paginateSegment, sequentialBackfillStream, and the probe no longer take rateLimiter as a parameter — structurally impossible to forget.

Change 2: Replace discrete 1/10/50 segment tiers with smooth segmentCountFromDensity(timeProgress) = ceil(1/timeProgress), capped at MAX_SEGMENTS=50. Decouple concurrency from segment count — mergeAsync now always uses MAX_CONCURRENCY=15 regardless of segment count. Also adds division-by-zero guard for degenerate ranges.

Change 3a: Replace probeSegmentCount with probeAndBuildSegments that returns the first page data alongside the segments. The probe call now uses a created filter (forward-compatible if range narrows later). The caller yields probe records directly — zero wasted API calls. For sparse streams the entire dataset comes from the probe with no re-fetch.

Also: export ListResult from openapi barrel, remove buildSegments default parameter, assert probe call shape in coexist test.

Made-with: Cursor
Committed-By-Agent: cursor
The probe fetches from the full time range (newest-first), so its items may span multiple segment time ranges. Only yield probe data directly when numSegments === 1 (sparse/single-page streams). For multi-segment backfills, the probe is used purely for density estimation — each segment fetches its own data independently to avoid cursor/range mismatches.

Made-with: Cursor
Committed-By-Agent: cursor
Move MAX_SEGMENTS and MAX_CONCURRENCY alongside DEFAULT_MAX_RPS so all three throughput/concurrency knobs live in one file with comments explaining what each controls and how they relate to each other.

Made-with: Cursor
Committed-By-Agent: cursor
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
Summary
- backfill_concurrency config option removed — segment count is now auto-detected per stream via a single probe API call
- withRateLimit wrapper added

Motivation
backfill_concurrency was a static config knob that users had to guess at. Too few segments wasted parallelism on dense streams; too many added overhead on sparse ones. This makes it automatic — one extra list?limit=100 call per stream on first backfill measures data density and picks the right segment count.

How it works
Density probe → segment count
probeAndBuildSegments fetches 100 items with a created filter (forward-compatible if the range narrows later) and measures what fraction of the total time range they span. The segment count is a smooth inverse function:

- segmentCountFromDensity(timeProgress) = ceil(1 / timeProgress), capped at MAX_SEGMENTS=50
- Streams that fit in a single page (has_more: false) get 1 segment
Segment count and concurrency are independent. Dense streams get fine-grained 50-segment splits but only MAX_CONCURRENCY=15 execute simultaneously — avoiding memory pressure from 50 concurrent promises racing while still saturating the rate limit.

Probe-as-first-page (single-segment streams)
For sparse streams (1 segment), the probe response is yielded directly as records instead of being discarded. The segment's cursor is set so paginateSegment continues from where the probe left off. For multi-segment streams, the probe is used purely for density estimation — each segment fetches its own data independently.

Rate-limit wrapper
withRateLimit(listFn, rateLimiter) wraps the list function once at construction time. paginateSegment, sequentialBackfillStream, and the probe all receive a plain ListFn — structurally impossible to forget rate limiting on new call sites.

Test plan
- segmentCountFromDensity unit tests cover the smooth curve: zero/negative, sparse, medium, dense, cap at 50, no cliff edges
- probeAndBuildSegments unit tests cover: empty stream, single-page, sparse, dense, extremely dense, missing created fallback, division-by-zero guard, created filter assertion, firstPage data passthrough
- Coexist test asserts the probe call shape ({ limit: 100, created: ... })