
feat(observability): add cascade operation observability for high-fan-out deletions #16592

Merged
max-datahub merged 3 commits into master from feat/cascade-operation-observability on Mar 31, 2026

Conversation

@max-datahub
Collaborator

@max-datahub max-datahub commented Mar 15, 2026

Summary

Cascade operations (one mutation triggering writes to many entities) currently have zero observability. When deleteReferencesTo processes 8,000 entities or PropertyDefinitionDeleteSideEffect scrolls through 10,000 SP assignments, there are no metrics, no MDC correlation, no completion logging, and errors are silently swallowed.

This PR adds a reusable CascadeOperationContext helper and applies it to the two highest-risk cascade paths, using only existing infrastructure (Micrometer, SLF4J MDC, SystemMetadata).

Note: This PR does not fix the underlying timeout — deleting a structured property applied to >5K entities will still return 503 (previously a silent 500). The cascade continues executing on the backend and completes. The fix is to make this timeout visible, correctly reported, and measurable. Preventing the timeout entirely requires async cascade processing (see Follow-up section).

Motivation

Stress testing DataHub at 10K entity scale revealed two critical observability gaps:

  1. deleteReferencesTo (19 callers — tag, domain, term, group, user, assertion, form, SP, business attribute, etc.): The handleError() method was a NO-OP, silently discarding all per-entity errors. No metrics on cascade duration, entity count, or error count. No way to detect stalls or partial completion.

  2. PropertyDefinitionDeleteSideEffect: Runs synchronously in the GMS request thread. At >5K entities, Spring's async timeout fires and returns HTTP 500 — but the cascade continues executing on the backend and completes silently. GMS logs nothing about this timeout. No metrics on cascade progress.

Changes

New: CascadeOperationContext (metadata-utils)

A lightweight AutoCloseable utility that any cascade code path can use:

try (CascadeOperationContext cascade = CascadeOperationContext.begin(
        metricUtils, "deleteReferencesTo", triggerUrn, estimatedTotal)) {
    for (Entity entity : entities) {
        processEntity(entity);
        cascade.recordEntityProcessed();
    }
} // close() → emit metrics, conditional log, restore MDC

Two factory methods for different threading models:

  • begin() — with MDC management. Use in try-with-resources on a single thread. Saves and restores previous MDC values on close(), supporting nested cascade contexts.
  • beginWithoutMDC() — without MDC management. Use for stream-based callers where begin() and close() may run on different threads (e.g., .onClose(cascade::close)). Metrics and logging still work; only thread-local MDC is skipped. See the sketch below.
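
A minimal sketch of the stream-based pattern, assuming the factory and recording methods described above (the toPatchMcp transform is hypothetical):

// beginWithoutMDC(): begin/close may run on different threads, so MDC is skipped
CascadeOperationContext cascade =
    CascadeOperationContext.beginWithoutMDC(
        metricUtils, "propertyDefinitionDelete", triggerUrn, estimatedTotal);
return entities.stream()
    .map(entity -> {
        cascade.recordEntityProcessed();
        return toPatchMcp(entity, cascade); // hypothetical per-entity transform
    })
    .onClose(cascade::close); // metrics/logging fire even on partial consumption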

On begin() / beginWithoutMDC():

  • Generates UUID cascade operation ID
  • If manageMDC=true: saves previous MDC state, sets cascade.operation.id, cascade.trigger.urn, cascade.operation.type
  • Logs at DEBUG (start)

On close():

  • Emits the cascade metrics (duration timer plus entities_processed and errors counters; see Metrics Summary below)
  • Logs completion: DEBUG for fast cascades, INFO for slow ones (>5s), WARN when errors were recorded
  • If manageMDC=true: restores the previously saved MDC state
  • Never throws: metric emission failures are swallowed and logged at DEBUG

attachToSystemMetadata(): Sets cascadeOperationId in SystemMetadata.properties for cross-service correlation. Null-safe (creates the properties map if null, handles null SystemMetadata).
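
A sketch of the null-safe attach described above, assuming DataHub's generated SystemMetadata/StringMap accessors (operationId is this context's UUID):

public SystemMetadata attachToSystemMetadata(@Nullable SystemMetadata systemMetadata) {
    if (systemMetadata == null) {
        systemMetadata = new SystemMetadata(); // handle null SystemMetadata
    }
    if (systemMetadata.getProperties() == null) {
        systemMetadata.setProperties(new StringMap()); // create properties map if null
    }
    systemMetadata.getProperties().put(SYSTEM_METADATA_CASCADE_ID_KEY, operationId);
    return systemMetadata;
}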

Public constants: MDC keys (MDC_CASCADE_OPERATION_ID, MDC_CASCADE_TRIGGER_URN, MDC_CASCADE_OPERATION_TYPE) and SystemMetadata property key (SYSTEM_METADATA_CASCADE_ID_KEY) are public static final for use by Kafka consumers, avoiding hard-coded string drift.

DeleteEntityService

  • Inject @Nullable MetricUtils (new constructor param, optional like S3Util)
  • Wrap deleteReferencesTo() scroll loop in CascadeOperationContext with MDC and try-with-resources (sketched after this list)
  • Pass scrollResult.getNumResults() as estimated total
  • Call cascade.recordEntityProcessed() per entity inside the forEach lambda
  • Add overloaded handleError(error, cascade) that records errors in cascade context
  • All 5 handleError call sites within the cascade flow updated to two-arg version
  • getAspects() overloaded: 4-arg version for dry-run path, 5-arg with cascade for deletion path
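
A condensed sketch of the instrumented loop, using the helper names from the flow diagram below (types and signatures are simplified, not the exact code):

RelatedEntitiesScrollResult scrollResult = scrollRelatedEntities(opContext, urn, null);
try (CascadeOperationContext cascade = CascadeOperationContext.begin(
        metricUtils, "deleteReferencesTo", urn, scrollResult.getNumResults())) {
    String scrollId;
    do {
        for (RelatedEntity related : scrollResult.getEntities()) {
            deleteReference(opContext, urn, related, cascade); // errors recorded, loop continues
            cascade.recordEntityProcessed();
        }
        scrollId = scrollResult.getScrollId();
        if (scrollId != null) {
            scrollResult = scrollRelatedEntities(opContext, urn, scrollId);
        }
    } while (scrollId != null);
} // close(): metrics + conditional completion log + MDC restore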

PropertyDefinitionDeleteSideEffect

  • Add @Nullable MetricUtils field (Lombok setter, chained)
  • Remove static from generatePatchMCPs() and generatePatchRemove() (both private, no external API change)
  • Create CascadeOperationContext via beginWithoutMDC() in generatePatchMCPs() — avoids thread-safety issues since the stream may be consumed on a different thread than it was created on
  • Each entity in the scroll records entity processed + attaches cascade ID to SystemMetadata
  • Stream .onClose(cascade::close) ensures metrics/logging fire even on partial consumption

EntityServiceImpl (fixes pre-existing stream resource leak)

  • Wrap the processPostCommitMCLSideEffects() stream in try-with-resources (sketched after this list)
  • Capture the original applyPostMCPSideEffects() stream (not the re-wrapped Iterators.partition stream) to ensure onClose() fires
  • Guarantees PropertyDefinitionDeleteSideEffect's cascade context close() fires in all cases (normal completion, partial consumption, or exception)
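
A sketch of the fix, with the batch size and ingest call simplified for illustration (Iterators is Guava's):

try (Stream<MCPItem> sideEffects = applyPostMCPSideEffects(mclItems)) {
    // The ORIGINAL stream is the try-with-resources resource, so its onClose()
    // callbacks (including cascade.close() from PropertyDefinitionDeleteSideEffect)
    // run on normal completion, partial consumption, or exception.
    Iterators.partition(sideEffects.iterator(), batchSize)
        .forEachRemaining(batch -> ingestProposalAsync(batch)); // illustrative ingest call
}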

AbstractKafkaListener (consumer-side MDC)

  • After setMDCContext(event), extract cascadeOperationId from SystemMetadata.properties into MDC (sketched after this list)
  • Uses shared constants from CascadeOperationContext (not hard-coded strings)
  • Enables grepping a single cascade operation ID across GMS + consumer logs to see the full lifecycle
  • Cleanup handled by existing MDC.clear() in finally block
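
A sketch of the extraction step, assuming the MCL event exposes getSystemMetadata() as in DataHub's generated model:

SystemMetadata systemMetadata = event.getSystemMetadata();
if (systemMetadata != null && systemMetadata.getProperties() != null) {
    String cascadeId = systemMetadata.getProperties()
        .get(CascadeOperationContext.SYSTEM_METADATA_CASCADE_ID_KEY);
    if (cascadeId != null) {
        MDC.put(CascadeOperationContext.MDC_CASCADE_OPERATION_ID, cascadeId);
    }
}
// processWithHooks(event) log lines now carry the cascade ID;
// the existing finally { MDC.clear(); } removes it afterwards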

GlobalControllerExceptionHandler

  • Add a dedicated @ExceptionHandler(AsyncRequestTimeoutException.class) (sketched after this list)
  • Logs at WARN with request path and method
  • Emits datahub.http.async_timeout counter metric (path normalized via HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE for bounded cardinality)
  • Returns 503 Service Unavailable with Retry-After: 30 header and "Request timed out. The operation may still be completing in the background."
  • Spring resolves @ExceptionHandler by most-specific type, so this takes priority over the RuntimeException catch-all
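
A sketch of the handler, assembled from the behavior listed above (the MetricUtils call shape and the response body key are assumptions):

@ExceptionHandler(AsyncRequestTimeoutException.class)
public ResponseEntity<Map<String, String>> handleAsyncTimeout(
        AsyncRequestTimeoutException e, HttpServletRequest request) {
    // Normalize via the matched pattern, not the raw URI, to bound metric cardinality
    Object pattern = request.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);
    String path = pattern != null ? pattern.toString() : "unknown";
    log.warn("Async request timed out: {} {}", request.getMethod(), path);
    if (metricUtils != null) {
        metricUtils.increment("datahub.http.async_timeout", 1); // assumed signature
    }
    return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
        .header(HttpHeaders.RETRY_AFTER, "30")
        .body(Map.of("error",
            "Request timed out. The operation may still be completing in the background."));
}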

SpringStandardPluginConfiguration

  • Pass MetricUtils to propertyDefinitionDeleteSideEffect() bean

DeleteEntityServiceFactory

  • Add @Autowired(required = false) @Nullable MetricUtils and pass to constructor

Code Flow & Failure Analysis

Flow 1: deleteReferencesTo (Graph-based reference deletion)

 GraphQL/REST API  →  Delete Entity Endpoint
         │
         ▼
 DeleteEntityService.deleteReferencesTo(opCtx, urn, dryRun=false)
  │
  ├─[1] deleteFileReferences(urn)          ← NO cascade context (not instrumented)
  │      └─ FAILURE: log.error + throw RuntimeException → aborts entire method
  │
  ├─[2] deleteSearchReferences(urn)        ← NO cascade context (not instrumented)
  │      └─ FAILURE: log.error per asset, continues to next asset
  │
  ├─[3] scrollRelatedEntities(initial)     ← Gets first batch + numResults estimate
  │
  └─[4] CascadeOperationContext.begin(metricUtils, "deleteReferencesTo", urn, numResults)
         │  ● manageMDC=true, saves previous MDC, try-with-resources
         │
         └─ do-while scroll loop:
              ├─ per entity: deleteReference(opCtx, urn, entity, cascade)
              │    ├─ getAspects → FAILURE: handleError → cascade.recordError (continues)
              │    ├─ clone fail → cascade.recordError("clone_failed") (skip aspect)
              │    ├─ deleteAspect/updateAspect fail → cascade.recordError (continues)
              │    └─ cascade.recordEntityProcessed()
              ├─ scrollId == null → break
              └─ next batch via scrollRelatedEntities(nextScrollId)
         │
         └─ close(): metrics + conditional log + restore MDC

Flow 2: PropertyDefinitionDeleteSideEffect (SP value removal)

 SP hard-delete → EntityServiceImpl.processPostCommitMCLSideEffects
  │
  └─ try (Stream sideEffectStream = applyPostMCPSideEffects(...))  ← resource leak fix
      └─ PropertyDefinitionDeleteSideEffect.generatePatchMCPs(...)
          ├─ CascadeOperationContext.beginWithoutMDC(...)   ← cross-thread safe
          └─ stream: per entity → recordEntityProcessed + attachToSystemMetadata
                     .onClose(cascade::close) → metrics + log
      │
      └─ FAILURE in ingestProposalAsync → try-with-resources closes stream
                                         → onClose fires → partial metrics emitted

Flow 3: Kafka Consumer Cascade ID Propagation

 Kafka MCL → AbstractKafkaListener.consume()
  ├─ setMDCContext(event)
  ├─ if systemMetadata.properties["cascadeOperationId"] → MDC.put(...)
  ├─ processWithHooks(event)  ← all log lines now include cascade ID
  └─ finally { MDC.clear() }

Flow 4: Async Timeout

 HTTP async timeout → GlobalControllerExceptionHandler.handleAsyncTimeout()
  ├─ log.WARN + metric: datahub.http.async_timeout
  └─ return 503 + Retry-After: 30
     (cascade continues on backend thread → cascade.close() fires later)

Failure Mode Summary

| Failure Point | Observable? | Cascade Continues? |
|---|---|---|
| File S3 delete fails | Yes (log) | No — aborts method |
| Search reference update fails | Yes (log) | Yes |
| Entity concurrently deleted | Silent | Yes |
| Aspect spec not found | Yes (metric + WARN) | Yes |
| Aspect clone fails | Yes (metric + WARN) | Yes — skips aspect |
| MCP ingest fails | Yes (metric + WARN) | Yes |
| ingestProposalAsync throws (SP) | Partial metrics | No — aborted |
| Spring async timeout | Yes (503 + metric) | Yes — backend continues |
| Metric emission fails | log.debug only | N/A |

Key invariant: Observability code never breaks the cascade. close() is wrapped in try { metrics } catch { swallow } finally { MDC restore }.
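
A sketch of that invariant plus the AtomicBoolean guard added during review (emitMetrics, logCompletion, and restorePreviousMdcState are hypothetical helper names):

private final AtomicBoolean closed = new AtomicBoolean(false);

@Override
public void close() {
    if (!closed.compareAndSet(false, true)) {
        return; // idempotent: repeat calls never double-emit timers, counters, or logs
    }
    try {
        emitMetrics();   // duration timer + entities_processed / errors counters
        logCompletion(); // DEBUG if fast, INFO if >5s, WARN if errors recorded
    } catch (Exception e) {
        log.debug("Cascade metric emission failed", e); // swallow: never break the cascade
    } finally {
        if (manageMDC) {
            restorePreviousMdcState();
        }
    }
}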


Metrics Summary

| Metric | Type | Tags | Emitted by |
|---|---|---|---|
| datahub.cascade.duration | Timer | operation_type, trigger_urn_type, status | CascadeOperationContext.close() |
| datahub.cascade.entities_processed | Counter | operation_type, trigger_urn_type | CascadeOperationContext.close() |
| datahub.cascade.errors | Counter | operation_type, trigger_urn_type, error_type | CascadeOperationContext.close() |
| datahub.http.async_timeout | Counter | request_path | GlobalControllerExceptionHandler |

Log Verbosity

Follows the guidelines from #16577 and #16578:

  • No per-entity logging — entity counts go to metrics only
  • DEBUG for cascade start and fast completion
  • INFO only for slow cascades (>5s threshold, same pattern as RestliLoggingFilter)
  • WARN only for actual errors or async timeouts

Tests

  • CascadeOperationContextTest (13 tests): MDC lifecycle, null MetricUtils safety, SystemMetadata null-safety and existing properties, close-never-throws, error recording, unique operation IDs, nested cascade MDC save/restore, beginWithoutMDC skips MDC, metric value assertions (entities_processed counter, duration timer, error counter with status tags)
  • GlobalControllerExceptionHandlerTest (4 new tests): 503 status + Retry-After header, metric emission with MetricUtils, null MetricUtils safety, null pattern attribute fallback to "unknown"
  • DeleteEntityServiceTest: All 10 existing tests pass with updated constructor (MetricUtils = null)

Follow-up Tasks (Separate PRs)

Observability Gaps

| Task | Description | Effort |
|---|---|---|
| Instrument deleteSearchReferences / deleteFileReferences | Called before the cascade context opens; no observability for these phases of deleteReferencesTo | Low |
| Propagate cascade ID in deleteReferencesTo | MCPs from graph-ref deletions don't carry cascadeOperationId to Kafka consumers (only the PropertyDefinitionDeleteSideEffect path does) | Medium |

Timeout Fix (Root Cause)

| Task | Description | Effort |
|---|---|---|
| Async cascade for SP deletion | Return 202 Accepted immediately and run PropertyDefinitionDeleteSideEffect asynchronously. This is the real fix for the 500/503 timeout — the current PR only makes the timeout visible and correctly reported. | High |

Lineage Graph Traversal Observability

Production incidents on large-scale instances have shown that lineage BFS traversal (GraphQueryBaseDAO.searchWithSlices) suffers from the same observability gap as deletion cascades. When a high-fan-out entity triggers a lineage query with many slices, failures cascade through multiple phases (PIT context expiration → HTTP connection pool saturation → unrelated queries failing), and operators must manually reconstruct the timeline from raw GMS logs with no correlation IDs.

The CascadeOperationContext pattern from this PR (metrics + MDC + operation ID + conditional logging) could be applied to lineage graph traversal to provide structured observability for these operations.

| Task | Description | Effort |
|---|---|---|
| Apply operation tracking to searchWithSlices | Add a lightweight operation context to GraphQueryBaseDAO.searchWithSlices so each lineage BFS traversal gets a unique operation ID, duration/entity-count metrics, and MDC correlation, letting operators trace the full lifecycle (PIT creation → slice execution → timeout → cleanup → partial results) with a single query. | Medium |
| Resource bounding for high-fan-out operations | Both deletion cascades and lineage traversals can saturate the OpenSearch HTTP connection pool when the number of concurrent operations (scroll batches, PIT slices) exceeds the pool's maxConnPerRoute. Consider bounded parallelism or backpressure so a single fan-out operation cannot starve unrelated queries. Related: #16808 fixed the PIT race condition and no-op timeout that caused orphaned slices to hold connections indefinitely. | High |
| Treat exceptions as partial results in graph traversal | When a slice fails with a transient error (e.g., search_context_missing_exception after PIT cleanup), the current behavior either throws (blocking the response) or silently swallows the error. Failed slices could instead set partialResults=true and return whatever was collected, consistent with how time-budget exhaustion and max-relations limits already behave. | Medium |

Checklist

  • All affected modules compile (metadata-utils, metadata-io, metadata-service/services, metadata-service/factories, metadata-service/openapi-servlet, metadata-jobs/common)
  • All existing tests pass
  • New tests for CascadeOperationContext (13 tests including nesting, beginWithoutMDC, metric value assertions)
  • New tests for GlobalControllerExceptionHandler (4 tests for async timeout handler)
  • Spotless formatting applied
  • Zero new dependencies
  • Backward compatible (MetricUtils is nullable/optional everywhere)
  • Rebased on master (includes the scroll-based pagination fix from #16601, "fix(delete-refs): fix cascade stall when deleting entities referenced by >5000 entities")

@github-actions github-actions bot added the product, devops, and community-contribution labels on Mar 15, 2026
@codecov

codecov Bot commented Mar 15, 2026

Bundle Report

Bundle size has no change ✅

@codecov

codecov Bot commented Mar 15, 2026

Codecov Report

❌ Patch coverage is 65.03067% with 57 lines in your changes missing coverage. Please review.
✅ All tests successful. No failed tests found.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| .../linkedin/metadata/entity/DeleteEntityService.java | 0.00% | 38 Missing ⚠️ |
| ...om/linkedin/metadata/entity/EntityServiceImpl.java | 40.00% | 9 Missing ⚠️ |
| ...etadata/utils/metrics/CascadeOperationContext.java | 88.15% | 5 Missing and 4 partials ⚠️ |
| ...gms/factory/entity/DeleteEntityServiceFactory.java | 0.00% | 1 Missing ⚠️ |

❌ Your patch status has failed because the patch coverage (65.03%) is below the target coverage (75.00%). You can increase the patch coverage or adjust the target coverage.


@max-datahub max-datahub force-pushed the feat/cascade-operation-observability branch from 0b7f91e to 040c461 on March 20, 2026 09:16
@max-datahub max-datahub marked this pull request as ready for review March 20, 2026 09:26
@github-actions
Contributor

Linear: PFP-3052

Thanks for your contribution! We have created an internal ticket to track this PR. A member of the core DataHub team will be assigned to review it within the next few business days - you will get a follow-up comment once a reviewer is assigned.

…-out deletions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@max-datahub max-datahub force-pushed the feat/cascade-operation-observability branch from 040c461 to 61428a0 on March 20, 2026 09:38
@maggiehays maggiehays added the needs-review Label for PRs that need review from a maintainer. label Mar 20, 2026
@github-actions
Contributor

Your PR has been assigned to @david-leifker (david.leifker) for review (PFP-3052).

@github-actions github-actions Bot requested a review from david-leifker March 24, 2026 06:07
Collaborator

@david-leifker david-leifker left a comment


One comment about the duplicate close() to consider, but non-blocking. The only other thought was that the 503 response may be better as a 504 if we interpret it as a more general timeout; again not a blocker, and debatable whether it makes any more sense for this failure type.

@maggiehays maggiehays added merge-pending-ci A PR that has passed review and should be merged once CI is green. and removed needs-review Label for PRs that need review from a maintainer. labels Mar 30, 2026
max-datahub and others added 2 commits March 31, 2026 09:54
…ion-observability

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add AtomicBoolean guard so that calling close() multiple times does not
double-emit timers, counters, or log lines. Only the first invocation
performs metrics emission and MDC cleanup; subsequent calls are no-ops.

Addresses review feedback from @david-leifker.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@max-datahub max-datahub enabled auto-merge (squash) March 31, 2026 07:57
@max-datahub max-datahub merged commit 9c08f53 into master Mar 31, 2026
47 of 48 checks passed
@max-datahub max-datahub deleted the feat/cascade-operation-observability branch March 31, 2026 08:29
max-datahub added a commit that referenced this pull request Mar 31, 2026
… phases and propagate cascade ID

Extends the CascadeOperationContext introduced in #16592 to cover all
three deletion phases in deleteReferencesTo(), not just the graph-ref
scroll loop:

- Move CascadeOperationContext.begin() to wrap file refs, search refs,
  and graph refs under a single cascade operation ID
- Thread cascade through deleteFileReferences() and
  deleteSearchReferences() for entity tracking and error recording
- Attach cascade operation ID to SystemMetadata on MCPs generated by
  updateAspect() and deleteSearchReferences for downstream Kafka
  correlation
- Extract dry-run path into deleteReferencesToDryRun() for clarity

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>