Add PerTaskPlanTransformer that allows per-task plan specialization #410
shehab-ali wants to merge 1 commit into datafusion-contrib:main
Conversation
/// When no transformer is present, `shared_plan_proto` holds the pre-serialized bytes.
plan: Arc<dyn ExecutionPlan>,
/// Pre-serialized plan bytes shared across all tasks (used when no transformer is set).
shared_plan_proto: Option<Vec<u8>>,
This is the alternative used when no `PerTaskPlanTransformer` is set; it was previously named `plan_proto`.
/// .with_distributed_per_task_plan_transformer(MyTransformer)
/// .build();
/// ```
pub trait PerTaskPlanTransformer: Debug + Send + Sync {
I'd advocate against introducing a new abstraction for delivering this functionality. I'm afraid that if each little operation is modeled as a new trait, we risk ending up with too many overlapping and conflicting extension points in this project.
To find a good place for delivering this functionality, I'd look at #378. I think the same place that allows users to provide a specific Worker URL given a plan and a task index is probably also the place that should allow returning the modified plan along with the URL to which it needs to go.
@JSOD11 is trying this here #409. I'd coordinate with him and come up with a solution that is good for holding both things:
- Given a plan and a task index, return the URL to which it needs to go
- Given a plan and a task index, return the modified plan with unnecessary files trimmed
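The two bullets above could be served by a single extension point. A hypothetical shape for such a combined hook (all names here are invented for illustration and do not come from the actual codebase; the `ExecutionPlan` trait is a local stand-in so the sketch compiles on its own):

```rust
use std::fmt::Debug;
use std::sync::Arc;

// Stand-in for datafusion's ExecutionPlan trait, so this sketch is self-contained.
trait ExecutionPlan: Debug + Send + Sync {}

// Hypothetical combined extension point covering both review bullets:
// one call yields the worker URL for a task AND the (optionally trimmed)
// plan that should be shipped to it.
trait TaskPlanner: Debug + Send + Sync {
    fn plan_task(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        task_index: usize,
        task_count: usize,
    ) -> (String, Arc<dyn ExecutionPlan>);
}

// Example implementation: round-robin URL assignment, plan passed through unmodified.
#[derive(Debug)]
struct RoundRobin {
    workers: Vec<String>,
}

impl TaskPlanner for RoundRobin {
    fn plan_task(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        task_index: usize,
        _task_count: usize,
    ) -> (String, Arc<dyn ExecutionPlan>) {
        // Pick the worker by index; a real implementation could also trim the plan here.
        (self.workers[task_index % self.workers.len()].clone(), plan)
    }
}

// Trivial plan node for demonstration.
#[derive(Debug)]
struct NoopPlan;
impl ExecutionPlan for NoopPlan {}
```

Returning a tuple keeps the URL decision and the plan rewrite in one place, so the two never disagree about which task they describe.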
Description
Previously, the coordinator serialized the same plan bytes and sent them to every worker task. This PR introduces a hook that allows the coordinator to produce distinct serialized plan bytes per task before sending.
The problem with sending the same serialized plan to all tasks is that each task receives every file group in the query, even those it is not assigned to execute, causing unnecessary network calls and memory usage for each task.
This provides the infrastructure that makes coordinator-side per-task plan specialization possible. It doesn't implement any file group logic itself; it just creates the hook.
Motivation
If you have 1000 file groups across 10 tasks, each worker previously received metadata for all 1000 files — now it receives ~100. For large scans with many small files (the common metrics query pattern), that's a meaningful reduction in both wire bytes and worker memory.
Changes
Before: the coordinator serialized the plan once into a single `Vec<u8>` and sent the same bytes to every worker. There was no way to intercept that serialization to customize the plan per task.
After: the coordinator checks if a PerTaskPlanTransformer is registered. If one is, instead of serializing once and cloning, it calls transform_for_task(plan, task_index, task_count) for each task and serializes the result independently. Each worker gets its own distinct plan bytes.
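The branch described above can be sketched as follows (the type names are minimal stand-ins for the real DataFusion types, and `encode` stands in for the protobuf serialization; the actual code lives in the spawner):

```rust
use std::fmt::Debug;
use std::sync::Arc;

// Minimal stand-ins so the sketch is self-contained (assumptions, not the real API).
trait ExecutionPlan: Debug + Send + Sync {
    fn encode(&self) -> Vec<u8>; // stand-in for proto serialization
}

trait PerTaskPlanTransformer: Debug + Send + Sync {
    fn transform_for_task(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        task_index: usize,
        task_count: usize,
    ) -> Arc<dyn ExecutionPlan>;
}

// Coordinator-side logic: one serialization shared by all tasks when no
// transformer is registered, otherwise one specialized plan per task.
fn plan_bytes_for_tasks(
    plan: Arc<dyn ExecutionPlan>,
    task_count: usize,
    transformer: Option<&dyn PerTaskPlanTransformer>,
) -> Vec<Vec<u8>> {
    match transformer {
        // No transformer: serialize once, clone the same bytes for every task.
        None => vec![plan.encode(); task_count],
        // Transformer present: specialize and serialize independently per task.
        Some(t) => (0..task_count)
            .map(|i| t.transform_for_task(plan.clone(), i, task_count).encode())
            .collect(),
    }
}

// Dummy plan and transformer used to demonstrate the two code paths.
#[derive(Debug)]
struct Label(&'static str);
impl ExecutionPlan for Label {
    fn encode(&self) -> Vec<u8> {
        self.0.as_bytes().to_vec()
    }
}

#[derive(Debug)]
struct Tagged(usize);
impl ExecutionPlan for Tagged {
    fn encode(&self) -> Vec<u8> {
        vec![self.0 as u8]
    }
}

#[derive(Debug)]
struct TagWithTaskIndex;
impl PerTaskPlanTransformer for TagWithTaskIndex {
    fn transform_for_task(
        &self,
        _plan: Arc<dyn ExecutionPlan>,
        task_index: usize,
        _task_count: usize,
    ) -> Arc<dyn ExecutionPlan> {
        // Replace the plan with one that carries the task index, so each
        // task's bytes come out distinct.
        Arc::new(Tagged(task_index))
    }
}
```

The `None` arm preserves the old behavior exactly, so the hook is pay-for-what-you-use: queries without a transformer still serialize once.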
New trait:
PerTaskPlanTransformer (src/execution_plans/per_task_plan_transformer.rs): a trait that the coordinator calls once per task before serialization.
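A plausible shape for the trait, reconstructed from the description above (the exact signature in the PR may differ), together with an illustrative transformer that trims a file list down to one task's share. `ExecutionPlan` is a local stand-in mirroring the `as_any` downcasting pattern of the real DataFusion trait; `FileScan` and `TrimFiles` are invented for the example:

```rust
use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;

// Stand-in for datafusion's ExecutionPlan; the real trait also exposes
// `as_any` for downcasting, mirrored here.
trait ExecutionPlan: Debug + Send + Sync {
    fn as_any(&self) -> &dyn Any;
}

// Reconstructed shape of the hook: called once per task before serialization.
trait PerTaskPlanTransformer: Debug + Send + Sync {
    fn transform_for_task(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        task_index: usize,
        task_count: usize,
    ) -> Arc<dyn ExecutionPlan>;
}

// Toy plan node holding a flat list of files to scan.
#[derive(Debug)]
struct FileScan {
    files: Vec<String>,
}
impl ExecutionPlan for FileScan {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Example transformer: keep only the files this task is responsible for,
// assigning file i to task i % task_count (an illustrative scheme only).
#[derive(Debug)]
struct TrimFiles;
impl PerTaskPlanTransformer for TrimFiles {
    fn transform_for_task(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        task_index: usize,
        task_count: usize,
    ) -> Arc<dyn ExecutionPlan> {
        let scan = plan
            .as_any()
            .downcast_ref::<FileScan>()
            .expect("expected a FileScan node");
        let files = scan
            .files
            .iter()
            .enumerate()
            .filter(|(i, _)| i % task_count == task_index)
            .map(|(_, f)| f.clone())
            .collect();
        Arc::new(FileScan { files })
    }
}
```

With 10 files and 4 tasks, task 0 would keep files 0, 4, and 8, so each worker's plan carries roughly a quarter of the file metadata.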
Stored in DistributedConfig via a PerTaskPlanTransformerExtension wrapper.
Refactor
CoordinatorToWorkerTaskSpawner (src/execution_plans/distributed.rs): the spawner now holds the original stage plan alongside a transformer: Option<&dyn PerTaskPlanTransformer>. It calls transform_for_task(plan, i, task_count) per task and serializes the result, so each worker receives its own specialized plan bytes.
Follow Up
https://github.com/DataDog/dd-source/pull/416389
After this is merged, I will update the distribution manager and task estimator in dd-source to use the new PerTaskPlanTransformer, which will let us build the per-plan optimization and lead to savings on:
- Network bandwidth: Each worker receives a smaller plan proto. A FileGroup contains per-file metadata (paths, sizes, partition values, statistics). With N tasks, each worker's plan carries roughly 1/N of that metadata instead of all of it.
- Worker memory: Workers don't hold file metadata for partitions they'll never read.
- Disk cache: Workers won't issue reads that populate cache entries for files they don't own, which was the original motivation.
- Worker startup latency: Without coordinator specialization, LazyFileGroupSpecializerExec would do a downcast + plan rebuild on the first execute() call. With the coordinator handling it upfront, that work is gone.
Note: we don't save on