feat(observability): add cascade operation observability for high-fan-out deletions #16592
Bundle Report: bundle size has no change ✅
Codecov Report: ❌ patch status failed because the patch coverage (65.03%) is below the target coverage (75.00%). You can increase the patch coverage or adjust the target coverage.
Linear: PFP-3052. Thanks for your contribution! We have created an internal ticket to track this PR. A member of the core DataHub team will be assigned to review it within the next few business days; you will get a follow-up comment once a reviewer is assigned.
Your PR has been assigned to @david-leifker (david.leifker) for review (PFP-3052).
david-leifker left a comment:
One comment about duplicate `close()` to consider, but non-blocking. The only other thought was that the 503 response may be better as a 504 if we interpret it as a more general timeout; again not a blocker, and it's debatable whether either makes any more sense for this failure type.
Add AtomicBoolean guard so that calling close() multiple times does not double-emit timers, counters, or log lines. Only the first invocation performs metrics emission and MDC cleanup; subsequent calls are no-ops. Addresses review feedback from @david-leifker. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… phases and propagate cascade ID

Extends the CascadeOperationContext introduced in #16592 to cover all three deletion phases in deleteReferencesTo(), not just the graph-ref scroll loop:

- Move CascadeOperationContext.begin() to wrap file refs, search refs, and graph refs under a single cascade operation ID
- Thread the cascade through deleteFileReferences() and deleteSearchReferences() for entity tracking and error recording
- Attach the cascade operation ID to SystemMetadata on MCPs generated by updateAspect() and deleteSearchReferences for downstream Kafka correlation
- Extract the dry-run path into deleteReferencesToDryRun() for clarity

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
## Summary
Cascade operations (one mutation triggering writes to many entities) currently have zero observability. When `deleteReferencesTo` processes 8,000 entities or `PropertyDefinitionDeleteSideEffect` scrolls through 10,000 SP assignments, there are no metrics, no MDC correlation, no completion logging, and errors are silently swallowed.

This PR adds a reusable `CascadeOperationContext` helper and applies it to the two highest-risk cascade paths, using only existing infrastructure (Micrometer, SLF4J MDC, `SystemMetadata`).

Note: this PR does not fix the underlying timeout — deleting a structured property applied to >5K entities will still return a 503 (previously a silent 500), and the cascade continues executing on the backend until it completes. The goal here is to make that timeout visible, correctly reported, and measurable; preventing it entirely requires async cascade processing (see the Follow-up section).
## Motivation
Stress testing DataHub at 10K entity scale revealed two critical observability gaps:
- `deleteReferencesTo` (19 callers — tag, domain, term, group, user, assertion, form, SP, business attribute, etc.): the `handleError()` method was a no-op, silently discarding all per-entity errors. No metrics on cascade duration, entity count, or error count. No way to detect stalls or partial completion.
- `PropertyDefinitionDeleteSideEffect`: runs synchronously in the GMS request thread. At >5K entities, Spring's async timeout fires and returns HTTP 500 — but the cascade continues executing on the backend and completes silently. GMS logs nothing about this timeout. No metrics on cascade progress.

## Changes
### New: `CascadeOperationContext` (metadata-utils)

A lightweight `AutoCloseable` utility that any cascade code path can use.

Two factory methods for different threading models:
- `begin()` — with MDC management. Use in try-with-resources on a single thread. Saves and restores previous MDC values on close, supporting nested cascade contexts.
- `beginWithoutMDC()` — without MDC management. Use for stream-based callers where `begin()` and `close()` may run on different threads (e.g., `.onClose(cascade::close)`). Metrics and logging still work; only thread-local MDC is skipped.
On `begin()` / `beginWithoutMDC()`:

- With `manageMDC=true`: saves previous MDC state; sets `cascade.operation.id`, `cascade.trigger.urn`, `cascade.operation.type`
On `close()`:

- A single `System.nanoTime()` call (reused for both metric and log)
- Emits `datahub.cascade.duration` (timer), `datahub.cascade.entities_processed` (counter), `datahub.cascade.errors` (counter)
- Tags: `operation_type`, `trigger_urn_type` (entity type from URN, not full URN — bounded cardinality), `status`, `error_type`
- With `manageMDC=true`: restores previous MDC values (or removes them if none existed)

`attachToSystemMetadata()`: sets `cascadeOperationId` in `SystemMetadata.properties` for cross-service correlation. Null-safe (creates the properties map if null, handles null SystemMetadata).
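To make the lifecycle concrete, a minimal usage sketch of the single-threaded path. The `begin(...)` arguments, `recordError(...)`, and `removeReference(...)` are assumed names for illustration; only `recordEntityProcessed()` and `attachToSystemMetadata()` are methods described in this PR:

```java
// Hypothetical usage sketch; begin(...) arguments and recordError(...) are
// assumed names, not signatures confirmed by this PR.
void cascadeDeleteSketch(Urn triggerUrn, List<Urn> referencingEntities,
    @Nullable MetricUtils metricUtils, SystemMetadata systemMetadata) {
  try (CascadeOperationContext cascade =
      CascadeOperationContext.begin("delete_references", triggerUrn, metricUtils)) {
    for (Urn entity : referencingEntities) {
      try {
        removeReference(entity, triggerUrn); // hypothetical per-entity mutation
        cascade.recordEntityProcessed();     // feeds the entities_processed counter
      } catch (Exception e) {
        cascade.recordError(e); // assumed name; feeds the errors counter / error_type tag
      }
    }
    // Stamp MCP metadata produced inside the cascade for downstream Kafka correlation.
    cascade.attachToSystemMetadata(systemMetadata);
  } // close(): one nanoTime() read, emits timer + counters, restores prior MDC values
}
```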
MDC_CASCADE_OPERATION_ID,MDC_CASCADE_TRIGGER_URN,MDC_CASCADE_OPERATION_TYPE) and SystemMetadata property key (SYSTEM_METADATA_CASCADE_ID_KEY) arepublic static finalfor use by Kafka consumers, avoiding hard-coded string drift.DeleteEntityService@Nullable MetricUtils(new constructor param, optional likeS3Util)deleteReferencesTo()scroll loop inCascadeOperationContext(with MDC, try-with-resources)scrollResult.getNumResults()as estimated totalcascade.recordEntityProcessed()per entity inside theforEachlambdahandleError(error, cascade)that records errors in cascade contexthandleErrorcall sites within the cascade flow updated to two-arg versiongetAspects()overloaded: 4-arg version for dry-run path, 5-arg with cascade for deletion pathPropertyDefinitionDeleteSideEffect@Nullable MetricUtilsfield (Lombok setter, chained)staticfromgeneratePatchMCPs()andgeneratePatchRemove()(bothprivate, no external API change)CascadeOperationContextviabeginWithoutMDC()ingeneratePatchMCPs()— avoids thread-safety issues since the stream may be consumed on a different thread than it was created on.onClose(cascade::close)ensures metrics/logging fire even on partial consumptionEntityServiceImpl(fixes pre-existing stream resource leak)processPostCommitMCLSideEffects()stream in try-with-resourcesapplyPostMCPSideEffects()stream (not the re-wrappedIterators.partitionstream) to ensureonClose()firesPropertyDefinitionDeleteSideEffect's cascade contextclose()fires in all cases (normal completion, partial consumption, or exception)AbstractKafkaListener(consumer-side MDC)setMDCContext(event), extractcascadeOperationIdfromSystemMetadata.propertiesinto MDCCascadeOperationContext(not hard-coded strings)MDC.clear()infinallyblockGlobalControllerExceptionHandler@ExceptionHandler(AsyncRequestTimeoutException.class)datahub.http.async_timeoutcounter metric (path normalized viaHandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTEfor bounded cardinality)Retry-After: 30header and"Request timed out. The operation may still be completing in the background."@ExceptionHandlerby most-specific type, so this takes priority over theRuntimeExceptioncatch-allSpringStandardPluginConfigurationMetricUtilstopropertyDefinitionDeleteSideEffect()beanDeleteEntityServiceFactory@Autowired(required = false) @Nullable MetricUtilsand pass to constructorCode Flow & Failure Analysis
## Code Flow & Failure Analysis

Flow 1: `deleteReferencesTo` (graph-based reference deletion)

Flow 2: `PropertyDefinitionDeleteSideEffect` (SP value removal)

Flow 3: Kafka Consumer Cascade ID Propagation

Flow 4: Async Timeout
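For Flow 4, a hedged sketch of the handler described in the Changes section. The Spring pieces (`@ExceptionHandler`, `HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE`, `ResponseEntity`) are standard; the shape of the `MetricUtils` call and the nullable `metricUtils` field on the handler are assumptions:

```java
import jakarta.servlet.http.HttpServletRequest;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.context.request.async.AsyncRequestTimeoutException;
import org.springframework.web.servlet.HandlerMapping;

@ExceptionHandler(AsyncRequestTimeoutException.class)
public ResponseEntity<String> handleAsyncTimeout(
    AsyncRequestTimeoutException e, HttpServletRequest request) {
  // Tag with the matched route pattern, not the raw URI, to keep cardinality bounded.
  Object pattern = request.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);
  String path = pattern != null ? pattern.toString() : "unknown";
  if (metricUtils != null) {
    metricUtils.increment("datahub.http.async_timeout", "request_path", path); // assumed API
  }
  // Spring picks the most-specific handler, so this wins over a RuntimeException catch-all.
  return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE) // 503
      .header("Retry-After", "30")
      .body("Request timed out. The operation may still be completing in the background.");
}
```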
### Failure Mode Summary
- `ingestProposalAsync` throws (SP): `log.debug` only

Key invariant: observability code never breaks the cascade. `close()` is wrapped in `try { metrics } catch { swallow } finally { MDC restore }`.
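A sketch of that invariant, combined with the `AtomicBoolean` guard added after review (see the commit above); `emitMetricsAndLog()` and `restoreMdc()` are hypothetical internal helpers:

```java
import java.util.concurrent.atomic.AtomicBoolean;

private final AtomicBoolean closed = new AtomicBoolean(false);

@Override
public void close() {
  if (!closed.compareAndSet(false, true)) {
    return; // idempotent: a second close() must not double-emit timers/counters/logs
  }
  try {
    emitMetricsAndLog(); // hypothetical helper: duration timer, counters, completion line
  } catch (Exception e) {
    // swallowed by design: observability must never break the cascade itself
  } finally {
    restoreMdc(); // hypothetical helper: restore saved MDC values, or remove if none existed
  }
}
```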
### Metrics Summary

| Metric | Tags | Emitted by |
|---|---|---|
| `datahub.cascade.duration` (timer) | `operation_type`, `trigger_urn_type`, `status` | `CascadeOperationContext.close()` |
| `datahub.cascade.entities_processed` (counter) | `operation_type`, `trigger_urn_type` | `CascadeOperationContext.close()` |
| `datahub.cascade.errors` (counter) | `operation_type`, `trigger_urn_type`, `error_type` | `CascadeOperationContext.close()` |
| `datahub.http.async_timeout` (counter) | `request_path` | `GlobalControllerExceptionHandler` |

### Log Verbosity
Follows the guidelines from #16577 and #16578:
… (`RestliLoggingFilter`)

## Tests
- `CascadeOperationContextTest` (13 tests): MDC lifecycle, null MetricUtils safety, SystemMetadata null-safety and existing properties, close-never-throws, error recording, unique operation IDs, nested cascade MDC save/restore, beginWithoutMDC skips MDC, metric value assertions (entities_processed counter, duration timer, error counter with status tags)
- `GlobalControllerExceptionHandlerTest` (4 new tests): 503 status + Retry-After header, metric emission with MetricUtils, null MetricUtils safety, null pattern attribute fallback to "unknown"
- `DeleteEntityServiceTest`: all 10 existing tests pass with the updated constructor (MetricUtils = null)
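To illustrate the nested save/restore case from that list, a hedged JUnit-style sketch; the `begin(...)` arguments and the URN variables are assumptions, while `MDC_CASCADE_OPERATION_ID` is the public constant listed in the Changes section:

```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import org.junit.jupiter.api.Test;
import org.slf4j.MDC;

@Test
void nestedCascadeRestoresOuterMdc() {
  try (CascadeOperationContext outer =
      CascadeOperationContext.begin("outer_op", outerUrn, /* metricUtils */ null)) {
    String outerId = MDC.get(CascadeOperationContext.MDC_CASCADE_OPERATION_ID);
    try (CascadeOperationContext inner =
        CascadeOperationContext.begin("inner_op", innerUrn, null)) {
      // the inner context overwrites the MDC keys with its own (unique) operation ID
      assertNotEquals(outerId, MDC.get(CascadeOperationContext.MDC_CASCADE_OPERATION_ID));
    }
    // closing the inner context restores the outer context's MDC values
    assertEquals(outerId, MDC.get(CascadeOperationContext.MDC_CASCADE_OPERATION_ID));
  }
  // closing the outer context removes the keys entirely (none existed before)
  assertNull(MDC.get(CascadeOperationContext.MDC_CASCADE_OPERATION_ID));
}
```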
## Follow-up Tasks (Separate PRs)

### Observability Gaps
- Cover `deleteSearchReferences` / `deleteFileReferences` under the same cascade context as the graph phase of `deleteReferencesTo`
- Propagate `cascadeOperationId` to Kafka consumers more broadly (only the `PropertyDefinitionDeleteSideEffect` path does today)

### Timeout Fix (Root Cause)
Run `PropertyDefinitionDeleteSideEffect` asynchronously. This is the real fix for the 500/503 timeout — the current PR only makes the timeout visible and correctly reported.

### Lineage Graph Traversal Observability
Production incidents on large-scale instances have shown that lineage BFS traversal (`GraphQueryBaseDAO.searchWithSlices`) suffers from the same observability gap as deletion cascades. When a high-fan-out entity triggers a lineage query with many slices, failures cascade through multiple phases (PIT context expiration → HTTP connection pool saturation → unrelated queries failing), and operators must manually reconstruct the timeline from raw GMS logs with no correlation IDs.
The `CascadeOperationContext` pattern from this PR (metrics + MDC + operation ID + conditional logging) could be applied to lineage graph traversal to provide structured observability for these operations:

- Wrap `GraphQueryBaseDAO.searchWithSlices` so each lineage BFS traversal gets a unique operation ID, duration/entity-count metrics, and MDC correlation. This would let operators trace the full lifecycle (PIT creation → slice execution → timeout → cleanup → partial results) with a single query.
- Parallel slice execution contends for `maxConnPerRoute`. Consider bounded parallelism or backpressure to prevent a single fan-out operation from starving unrelated queries. Related: #16808 fixed the PIT race condition and no-op timeout that caused orphaned slices to hold connections indefinitely.
- When a slice fails (e.g., `search_context_missing_exception` after PIT cleanup), the current behavior either throws (blocking the response) or silently swallows the error. Instead, failed slices could set `partialResults=true` and return whatever was collected, consistent with how time-budget exhaustion and max-relations limits already behave.

A sketch of how the pattern could wrap slice traversal follows.
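A speculative sketch only (this is proposed follow-up work, not part of this PR), simplified to a sequential loop even though the real `searchWithSlices` runs slices in parallel; the `"lineage_bfs"` operation type, `Slice`, `LineageResult`, `executeSlice(...)`, and `recordError(...)` are all assumptions:

```java
// Speculative follow-up sketch: apply the cascade context to lineage BFS.
LineageResult searchWithSlicesSketch(Urn sourceUrn, List<Slice> slices,
    @Nullable MetricUtils metricUtils) {
  try (CascadeOperationContext cascade =
      CascadeOperationContext.begin("lineage_bfs", sourceUrn, metricUtils)) {
    LineageResult result = new LineageResult();
    for (Slice slice : slices) {
      try {
        result.merge(executeSlice(slice)); // hypothetical per-slice ES query
        cascade.recordEntityProcessed();
      } catch (Exception e) {
        cascade.recordError(e);            // assumed name
        result.setPartialResults(true);    // return what was collected instead of throwing
      }
    }
    return result;
  } // close(): duration/entity-count/error metrics for the whole traversal
}
```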
## Checklist

- Touches `metadata-utils`, `metadata-io`, `metadata-service/services`, `metadata-service/factories`, `metadata-service/openapi-servlet`, `metadata-jobs/common`
- Unit tests for `CascadeOperationContext` (13 tests including nesting, beginWithoutMDC, metric value assertions)
- Unit tests for `GlobalControllerExceptionHandler` (4 tests for async timeout handler)