
auto-detect backfill segment count, remove backfill_concurrency config#281

Merged
tonyxiao merged 4 commits into v2 from dynamic_segments on Apr 14, 2026

Conversation


@Yostra Yostra commented Apr 14, 2026

Summary

  • Remove the backfill_concurrency config option — segment count is now auto-detected per stream via a single probe API call
  • Smooth density-based segment count replaces discrete 1/10/50 tiers
  • Concurrency is decoupled from segment count (capped at 15)
  • Probe call doubles as the first page for sparse streams (zero wasted API calls)
  • Rate-limit boilerplate consolidated into a single withRateLimit wrapper

Motivation

backfill_concurrency was a static config knob that users had to guess at. Too few segments wasted parallelism on dense streams; too many added overhead on sparse ones. This makes it automatic — one extra list?limit=100 call per stream on first backfill measures data density and picks the right segment count.

How it works

Density probe → segment count

probeAndBuildSegments fetches 100 items with a created filter (forward-compatible if the range narrows later) and measures what fraction of the total time range they span. The segment count is a smooth inverse function:

segments = min(50, max(1, ceil(1 / timeProgress)))
| 100 items cover... | timeProgress | Segments | Example |
| --- | --- | --- | --- |
| all data (has_more: false) | n/a | 1 | New account with 30 customers |
| 50% of time range | 0.5 | 2 | Light-use account |
| 10% of time range | 0.1 | 10 | Moderate activity |
| 2% of time range | 0.02 | 50 (max) | High-volume account |
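The mapping in the table can be sketched as a small pure function (the name segmentCountFromDensity and the constants follow the PR description; treat this as an illustrative sketch rather than the exact source):

```typescript
const MAX_SEGMENTS = 50

// Map the fraction of the backfill time range covered by the 100-item
// probe (timeProgress) to a segment count via a smooth inverse function.
function segmentCountFromDensity(timeProgress: number): number {
  // Degenerate input (zero/negative progress): assume maximally dense.
  if (timeProgress <= 0) return MAX_SEGMENTS
  return Math.max(1, Math.min(MAX_SEGMENTS, Math.ceil(1 / timeProgress)))
}

segmentCountFromDensity(0.5)  // 2  (light-use account)
segmentCountFromDensity(0.1)  // 10 (moderate activity)
segmentCountFromDensity(0.02) // 50 (high-volume, capped)
```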

Decoupled concurrency

Segment count and concurrency are independent. Dense streams get fine-grained 50-segment splits but only MAX_CONCURRENCY=15 execute simultaneously — avoiding memory pressure from 50 concurrent promises racing while still saturating the rate limit.
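The decoupling can be illustrated with a minimal stand-in for mergeAsync (the real implementation is internal to this repo, so the sketch below only demonstrates the intended semantics: every source generator is drained, but at most limit next() calls are in flight at once):

```typescript
// Illustrative stand-in for the repo-internal mergeAsync: drain every
// source generator, but keep at most `limit` next() calls in flight.
async function* mergeAsync<T>(
  sources: Array<AsyncGenerator<T>>,
  limit: number,
): AsyncGenerator<T> {
  type Racer = { gen: AsyncGenerator<T>; res: IteratorResult<T> }
  const pending = [...sources]
  const inFlight = new Map<AsyncGenerator<T>, Promise<Racer>>()
  const pull = (gen: AsyncGenerator<T>) => {
    inFlight.set(gen, gen.next().then((res) => ({ gen, res })))
  }

  while (pending.length > 0 || inFlight.size > 0) {
    // Top up to the concurrency cap from the not-yet-started generators.
    while (inFlight.size < limit && pending.length > 0) pull(pending.shift()!)
    // Wait for whichever in-flight generator settles first.
    const { gen, res } = await Promise.race(inFlight.values())
    inFlight.delete(gen)
    if (res.done) continue // exhausted; the top-up above starts the next one
    yield res.value
    pull(gen) // keep draining this generator
  }
}
```

In the PR's terms, the 50 segment generators would be passed as sources with limit set to MAX_CONCURRENCY = 15.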

Probe-as-first-page (single-segment streams)

For sparse streams (1 segment), the probe response is yielded directly as records instead of being discarded. The segment's cursor is set so paginateSegment continues from where the probe left off. For multi-segment streams, the probe is used purely for density estimation — each segment fetches its own data independently.

Rate-limit wrapper

withRateLimit(listFn, rateLimiter) wraps the list function once at construction time. paginateSegment, sequentialBackfillStream, and the probe all receive a plain ListFn — structurally impossible to forget rate limiting on new call sites.

Test plan

  • segmentCountFromDensity unit tests cover smooth curve: zero/negative, sparse, medium, dense, cap at 50, no cliff edges
  • probeAndBuildSegments unit tests cover: empty stream, single-page, sparse, dense, extremely dense, missing created fallback, division-by-zero guard, created filter assertion, firstPage data passthrough
  • Coexist test asserts probe call shape ({ limit: 100, created: ... })
  • All 116 source-stripe unit tests pass
  • All CI checks green including E2E Test Server, E2E Stripe, E2E Docker, E2E Service Docker

@Yostra Yostra marked this pull request as ready for review April 14, 2026 03:15

@tonyxiao tonyxiao left a comment


Review

Good direction — removing a user-facing config knob that required guessing and replacing it with auto-detection is the right call. The implementation is clean and the probeSegmentCount unit tests cover the key density tiers well. A few concrete issues and some broader design questions below.


Concrete Issues

1. Probe discards fetched data — wasted API call

probeSegmentCount calls listFn({ limit: 100 }), gets up to 100 items, uses them only for density estimation, then throws them away. The actual backfill then re-fetches the same data. For sparse streams where 100 items is the entire dataset, this means fetching everything twice. Should the probe response be returned so the first segment can use it as its first page?

2. Probe call lacks created filter — works today, fragile tomorrow

The probe calls listFn({ limit: 100 }) without a created range. This works because the backfill range is always [accountCreated, now+1] (full account lifetime), so all items fall within range. But if the range is ever narrowed (partial re-backfill, user-specified window), the density estimate would measure data outside the target range. Worth passing { limit: 100, created: { gte: range.gte, lt: range.lt } } for forward-compatibility.

3. 50 concurrent segments may be aggressive

Previously the default was 10 and user-tunable. Now dense streams jump to 50 concurrent generators all fed to mergeAsync(generators, numSegments). The rate limiter throttles individual requests, but 50 in-flight generators means 50 concurrent promises racing, which could cause memory pressure and bursty traffic. Consider decoupling segment count from concurrency — you can split into 50 time segments but only run N (e.g. 10–15) concurrently via mergeAsync.

4. Division-by-zero edge case

const totalSpan = range.lt - range.gte
const timeProgress = (range.lt - lastItem.created) / totalSpan

If range.lt === range.gte (brand-new account or degenerate input), this divides by zero, so timeProgress becomes Infinity (or NaN when the numerator is also zero). Add a guard: if (totalSpan <= 0) return 1.

5. buildSegments default changed from 10 to 50 — dead code

All call sites now pass numSegments explicitly, so the default parameter is dead. Either keep it at a sane value (10) or remove the default to make it required — having it silently be 50 is confusing if someone adds a call site later.

6. Test .slice(1) is correct but under-specified

The parallelListFn.mock.calls.slice(1) skip is correct, but the test should also assert the probe call's shape: expect(parallelListFn.mock.calls[0][0]).toEqual({ limit: 100 }). Documents that the first call is intentionally a probe, not a regression.

7. "Coexist" test still passes backfillConcurrency

The test at the bottom of index.test.ts ("parallel and sequential streams coexist") still passes backfillConcurrency: 3 — this should be removed now that the parameter is gone from the type signature.

Minor

  • PR title typo: "segmen" → "segment"

Design Questions

Beyond the concrete fixes, a few things worth thinking about:

Should the rate limiter be built into the list function?

Right now rateLimiter is threaded through separately and every call site has to remember to call it. If rate limiting were baked into listFn itself (or a wrapper around it), it would be impossible to forget — and probeSegmentCount wouldn't need rateLimiter as a separate parameter. This is a bigger refactor but would simplify the API surface.

Should segment count be a smooth function rather than discrete tiers?

The current tiers (1 / 10 / 50) are coarse. A stream where 100 items cover 9.9% of the time range gets 50 segments; one at 10.1% gets 10. A continuous function like Math.min(50, Math.ceil(1 / timeProgress)) or a logarithmic mapping would avoid these cliff edges and produce more proportional concurrency. Something to consider.

Should probing be continuous/adaptive rather than a one-time measurement?

A single probe measures density across the entire account lifetime, but data density often varies dramatically (e.g. sparse early history, dense recent activity). A more sophisticated approach would be adaptive — start with a coarse split, then subdivide dense segments further as you discover them (like a binary search or quad-tree). This would naturally concentrate parallelism where the data actually is. Probably overkill for v1, but worth noting as a future direction if the fixed tiers turn out to be suboptimal in practice.


@tonyxiao tonyxiao left a comment


Requesting changes — with a counter-proposal

The direction is right (auto-detect instead of making users guess), but the current implementation has enough rough edges that I think we should restructure before merging. I've written up a plan that addresses the issues from my earlier review and takes the idea further. The plan is in three independent, incrementally landable changes.


Change 1: Rate-limited ListFn wrapper

Today the same rate-limit boilerplate appears in paginateSegment, sequentialBackfillStream, and now probeSegmentCount:

const wait = await rateLimiter()
if (wait > 0) await new Promise((r) => setTimeout(r, wait * 1000))
const response = await listFn(params)

Wrap it once at construction time:

function withRateLimit(listFn: ListFn, rateLimiter: RateLimiter): ListFn {
  return async (params) => {
    const wait = await rateLimiter()
    if (wait > 0) await new Promise((r) => setTimeout(r, wait * 1000))
    return listFn(params)
  }
}

Apply in listApiBackfill before passing listFn into anything:

const rateLimitedListFn = withRateLimit(resourceConfig.listFn!, rateLimiter)

Then paginateSegment, sequentialBackfillStream, and probe all receive a plain ListFn and just call it. rateLimiter disappears from their signatures entirely. Pure refactor — structurally impossible to forget rate limiting on new call sites.


Change 2: Smooth segment count + decoupled concurrency

Replace the discrete 1 / 10 / 50 tiers with a continuous function:

const MAX_SEGMENTS = 50
const MAX_CONCURRENCY = 15

function segmentCountFromDensity(timeProgress: number): number {
  if (timeProgress <= 0) return MAX_SEGMENTS
  return Math.max(1, Math.min(MAX_SEGMENTS, Math.ceil(1 / timeProgress)))
}

ceil(1 / timeProgress) is a natural inverse relationship — sparse data (timeProgress=0.5) → 2 segments, medium (0.1) → 10, dense (0.02) → 50. No cliff edges at arbitrary thresholds.

Key insight: segment count and concurrency should be independent. Segments define work granularity; concurrency limits how many run at once:

const numSegments = segmentCountFromDensity(timeProgress)
const segments = buildSegments(accountCreated, now, numSegments)
// ...
yield* mergeAsync(generators, MAX_CONCURRENCY) // always capped at 15

This means dense streams get fine-grained 50-segment splits, but only 15 execute simultaneously, avoiding the "50 concurrent promises racing" problem. MAX_CONCURRENCY at 15 balances saturating the 25 RPS rate limit against memory pressure: each concurrent segment issues roughly 2 page requests per second, so 15 segments generate ≈ 30 RPS, just above the limit, which keeps the rate limiter (not concurrency) as the bottleneck.

Also: add if (totalSpan <= 0) return 1 to guard the division-by-zero edge case.


Change 3a: Probe-as-first-page (zero waste)

Instead of a separate probe that discards its data, make the probe be the first page of the newest segment:

async function probeAndBuildSegments(opts: {
  listFn: ListFn  // already rate-limited from Change 1
  range: { gte: number; lt: number }
}): Promise<{ segments: SegmentState[]; firstPage: ListResult }> {
  const { listFn, range } = opts

  // Probe WITH created filter (forward-compatible if range narrows later)
  const firstPage = await listFn({
    limit: 100,
    created: { gte: range.gte, lt: range.lt },
  })

  if (!firstPage.has_more) {
    return {
      segments: [{ index: 0, gte: range.gte, lt: range.lt, page_cursor: null, status: 'pending' }],
      firstPage,
    }
  }

  const lastItem = firstPage.data[firstPage.data.length - 1] as { created?: number }
  const totalSpan = range.lt - range.gte
  if (totalSpan <= 0) {
    return {
      segments: [{ index: 0, gte: range.gte, lt: range.lt, page_cursor: null, status: 'pending' }],
      firstPage,
    }
  }

  const timeProgress = (range.lt - (lastItem?.created ?? range.gte)) / totalSpan
  const numSegments = segmentCountFromDensity(timeProgress)
  const segments = buildSegments(range.gte, range.lt - 1, numSegments)

  return { segments, firstPage }
}

The caller then:

  1. Yields the probe's records directly (they're real data, not waste)
  2. Sets the newest segment's page_cursor to the probe's last item ID
  3. paginateSegment picks up from that cursor and continues normally
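The three steps above can be sketched from the caller's side (type names follow the probeAndBuildSegments signature in this review; note that per the follow-up commit, the merged PR only yields probe data directly when there is a single segment, so this sketch guards on that):

```typescript
interface Item { id: string; created?: number }
interface ListResult { data: Item[]; has_more: boolean }
interface SegmentState {
  index: number; gte: number; lt: number
  page_cursor: string | null; status: string
}

// Sketch: consume the probe result, yielding its records when the stream
// collapsed to a single segment, and positioning the cursor so pagination
// resumes right after the probe's last item.
async function* yieldProbeFirstPage(probe: {
  segments: SegmentState[]
  firstPage: ListResult
}): AsyncGenerator<Item> {
  const { segments, firstPage } = probe
  if (segments.length === 1 && firstPage.data.length > 0) {
    yield* firstPage.data // real records, not waste
    // paginateSegment (not shown) would continue from this cursor.
    segments[0]!.page_cursor = firstPage.data[firstPage.data.length - 1]!.id
  }
}
```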

This fixes three issues at once: no wasted API call, created filter on the probe for forward-compat, and the density measurement feeds directly into segment construction.


Change 3b: Adaptive subdivision (future — not for this PR)

Each segment measures its own local density after its first page and subdivides if dense. A sparse 2011–2020 segment stays as one piece while a dense 2024–2026 segment splits into 20 sub-segments, concentrating parallelism where the data actually is. The compact state format already supports this (it stores completed ranges + in-flight cursors, not fixed segment arrays).

This is powerful but complex — save for a follow-up once 3a proves the concept and we have data on whether uniform splits are a real bottleneck.
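For concreteness, a hypothetical version of that subdivision step might look like the sketch below (every name here is invented for illustration; none of this is in the PR):

```typescript
// Hypothetical sketch, NOT part of this PR. After a segment's first page
// (items arrive newest-first), the tail [lastCreated, lt) is already
// covered; if that page spans only a small fraction of the segment,
// split the unfetched remainder [gte, lastCreated) into sub-segments.
interface Range { gte: number; lt: number }

function adaptiveSplit(seg: Range, lastCreated: number, maxSub = 20): Range[] {
  const span = seg.lt - seg.gte
  if (span <= 0) return [seg]
  // Fraction of this segment's range covered by its first page.
  const localProgress = (seg.lt - lastCreated) / span
  // Fully covered or degenerate: nothing left to subdivide.
  if (localProgress <= 0 || localProgress >= 1) return [seg]
  const n = Math.min(maxSub, Math.max(1, Math.ceil(1 / localProgress)))
  if (n === 1) return [seg] // sparse segment: leave as one piece
  const width = (lastCreated - seg.gte) / n
  // Equal-width, contiguous sub-segments over the unfetched remainder.
  return Array.from({ length: n }, (_, i) => ({
    gte: Math.floor(seg.gte + i * width),
    lt: i === n - 1 ? lastCreated : Math.floor(seg.gte + (i + 1) * width),
  }))
}
```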


Implementation order

Change 1: withRateLimit wrapper        ← pure refactor, safe to land first
Change 2: smooth segments + cap conc   ← replaces discrete tiers, adds div-by-zero guard
Change 3a: probe-as-first-page         ← zero waste, created filter, combines probe + build
Change 3b: adaptive subdivision        ← future follow-up

Each change is independently valuable and testable. Full plan with code sketches is in docs/plans/2026-04-13-adaptive-backfill.md.

…t-page

Change 1: Extract withRateLimit(listFn, rateLimiter) wrapper so rate-limit
boilerplate is applied once at construction time. paginateSegment,
sequentialBackfillStream, and the probe no longer take rateLimiter
as a parameter — structurally impossible to forget.

Change 2: Replace discrete 1/10/50 segment tiers with smooth
segmentCountFromDensity(timeProgress) = ceil(1/timeProgress), capped at
MAX_SEGMENTS=50. Decouple concurrency from segment count — mergeAsync
now always uses MAX_CONCURRENCY=15 regardless of segment count. Also
adds division-by-zero guard for degenerate ranges.

Change 3a: Replace probeSegmentCount with probeAndBuildSegments that
returns the first page data alongside the segments. The probe call now
uses a created filter (forward-compatible if range narrows later). The
caller yields probe records directly — zero wasted API calls. For sparse
streams the entire dataset comes from the probe with no re-fetch.

Also: export ListResult from openapi barrel, remove buildSegments default
parameter, assert probe call shape in coexist test.

Made-with: Cursor
Committed-By-Agent: cursor
The probe fetches from the full time range (newest-first), so its items
may span multiple segment time ranges. Only yield probe data directly
when numSegments === 1 (sparse/single-page streams). For multi-segment
backfills, the probe is used purely for density estimation — each segment
fetches its own data independently to avoid cursor/range mismatches.

Made-with: Cursor
Committed-By-Agent: cursor
@tonyxiao tonyxiao changed the title from "probe segmen count, make estimate, remove backfill concurrency setting" to "auto-detect backfill segment count, remove backfill_concurrency config" Apr 14, 2026
Move MAX_SEGMENTS and MAX_CONCURRENCY alongside DEFAULT_MAX_RPS so all
three throughput/concurrency knobs live in one file with comments
explaining what each controls and how they relate to each other.

Made-with: Cursor
Committed-By-Agent: cursor
@tonyxiao tonyxiao merged commit 71d5007 into v2 Apr 14, 2026
15 checks passed
@tonyxiao tonyxiao deleted the dynamic_segments branch April 14, 2026 18:58
tonyxiao pushed a commit that referenced this pull request Apr 15, 2026
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>