Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d12899f
Task estimator interface update.
JSOD11 Apr 17, 2026
19c5be8
Merge branch 'main' into jsod/routing-04-17-26
JSOD11 Apr 20, 2026
63e38e8
Progress.
JSOD11 Apr 22, 2026
7cefb92
Compiles.
JSOD11 Apr 23, 2026
a41a2db
Urls added to leaves according to plan_leaf_node, and tasks routed ac…
JSOD11 Apr 23, 2026
f3b51aa
Add custom routing to integration test.
JSOD11 Apr 23, 2026
6f3b7a3
Routing works and is verified in integration test.
JSOD11 Apr 23, 2026
959d487
Tolerate failure / graceful fallback.
JSOD11 Apr 23, 2026
d3a2356
Restructuring task distribution.
JSOD11 Apr 28, 2026
f43c5c2
Merge branch 'main' into jsod/routing-04-17-26
JSOD11 Apr 28, 2026
fc4aa28
Restructure interface to route at execution-time, avoiding the planni…
JSOD11 May 1, 2026
d11c1bb
Merge branch 'main' into jsod/routing-04-17-26
JSOD11 May 2, 2026
9f74e80
Setting up new testing scaffolding.
JSOD11 May 4, 2026
0cdf1ff
Test distributes query with custom exec node / codec.
JSOD11 May 4, 2026
052b25c
Custom exec node emits URLs in test results.
JSOD11 May 4, 2026
5655359
Basic test passes and verifies routing works.
JSOD11 May 5, 2026
522b4cc
Remove DistributedPlan struct.
JSOD11 May 5, 2026
0874a4a
URL emitter takes partition and task count as arguments.
JSOD11 May 5, 2026
b0e963f
Adding more tests.
JSOD11 May 5, 2026
954e80b
Remove out of scope API changes.
JSOD11 May 5, 2026
f2ba155
Remove unnecessary refactor.
JSOD11 May 5, 2026
0c35d00
Remove URL from DistributedTaskCtx in favor of LocalWorkerContext.
JSOD11 May 5, 2026
ce39d59
Merge branch 'main' into jsod/routing-04-17-26
JSOD11 May 5, 2026
5c7d3c5
Tests work with URL coming from LocalWorkerContext.
JSOD11 May 5, 2026
9434229
Add TaskRoutingContext, in memory resolver with static URL generation…
JSOD11 May 5, 2026
ff0e293
Adding tests.
JSOD11 May 5, 2026
98ddc9d
Applying changes from feedback.
JSOD11 May 6, 2026
46e8960
Add doc comments.
JSOD11 May 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 26 additions & 10 deletions examples/custom_execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use datafusion_distributed::test_utils::in_memory_channel_resolver::{
InMemoryChannelResolver, InMemoryWorkerResolver,
};
use datafusion_distributed::{
DistributedExt, DistributedTaskContext, SessionStateBuilderExt, TaskEstimation, TaskEstimator,
WorkerQueryContext, display_plan_ascii,
DistributedExt, DistributedPlan, DistributedTaskContext, SessionStateBuilderExt,
TaskEstimation, TaskEstimator, WorkerQueryContext, display_plan_ascii,
};
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_proto::protobuf;
Expand Down Expand Up @@ -314,22 +314,28 @@ impl TaskEstimator for NumbersTaskEstimator {
&self,
plan: &Arc<dyn ExecutionPlan>,
cfg: &datafusion::config::ConfigOptions,
) -> Option<TaskEstimation> {
let plan = plan.as_any().downcast_ref::<NumbersExec>()?;
let cfg: &NumbersConfig = cfg.extensions.get()?;
) -> Result<Option<TaskEstimation>> {
let Some(plan) = plan.as_any().downcast_ref::<NumbersExec>() else {
return Ok(None);
};
let Some(cfg) = cfg.extensions.get::<NumbersConfig>() else {
return Ok(None);
};
let task_count = (plan.ranges_per_task[0].end - plan.ranges_per_task[0].start) as f64
/ cfg.numbers_per_task as f64;

Some(TaskEstimation::desired(task_count.ceil() as usize))
Ok(Some(TaskEstimation::desired(task_count.ceil() as usize)))
}

fn scale_up_leaf_node(
fn distribute_plan(
&self,
plan: &Arc<dyn ExecutionPlan>,
task_count: usize,
_cfg: &datafusion::config::ConfigOptions,
) -> Option<Arc<dyn ExecutionPlan>> {
let plan = plan.as_any().downcast_ref::<NumbersExec>()?;
) -> Result<Option<DistributedPlan>> {
let Some(plan) = plan.as_any().downcast_ref::<NumbersExec>() else {
return Ok(None);
};
let range = &plan.ranges_per_task[0];
let chunk_size = ((range.end - range.start) as f64 / task_count as f64).ceil() as i64;

Expand All @@ -339,7 +345,17 @@ impl TaskEstimator for NumbersTaskEstimator {
start..end
});

Some(Arc::new(NumbersExec::new(ranges_per_task, plan.schema())))
let plan: Arc<dyn ExecutionPlan> =
Arc::new(NumbersExec::new(ranges_per_task, plan.schema()));
Ok(Some(DistributedPlan::from_plan(plan)))
}

/// Routing hook for the example estimator: ignores both arguments and always returns
/// `Ok(None)`, i.e. it supplies no custom URL assignment.
// NOTE(review): presumably `None` makes the caller fall back to its default task routing —
// confirm against the `TaskEstimator` trait definition.
fn route_tasks(
&self,
_tasks: Vec<datafusion_distributed::ExecutionTask>,
_urls: &[url::Url],
) -> Result<Option<Vec<url::Url>>> {
Ok(None)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/children_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ where
{
let children = children.as_ref();
if children.len() != 1 {
return plan_err!("Expected exactly 1 children, got {}", children.len());
return plan_err!("Expected exactly 1 child, got {}", children.len());
}
Ok(children[0].borrow().clone())
}
42 changes: 22 additions & 20 deletions src/distributed_planner/distribute_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use crate::distributed_planner::plan_annotator::{
AnnotatedPlan, PlanOrNetworkBoundary, annotate_plan,
};
use crate::{
DistributedConfig, NetworkBoundaryExt, NetworkBroadcastExec, NetworkCoalesceExec,
NetworkShuffleExec, TaskEstimator,
DistributedConfig, DistributedPlan, NetworkBoundaryExt, NetworkBroadcastExec,
NetworkCoalesceExec, NetworkShuffleExec, TaskEstimator,
};
use datafusion::common::DataFusionError;
use datafusion::common::tree_node::TreeNode;
Expand Down Expand Up @@ -63,7 +63,7 @@ pub(super) async fn distribute_plan(

// Based on the annotations, place the actual network boundaries with the appropriate dimensions.
let mut stage_id = 1;
let plan = _distribute_plan(annotated, cfg, Uuid::new_v4(), &mut stage_id)?;
let plan = _distribute_plan(annotated, cfg, Uuid::new_v4(), &mut stage_id)?.plan();
if stage_id == 1 {
return Ok(None);
}
Expand All @@ -86,80 +86,82 @@ fn _distribute_plan(
cfg: &ConfigOptions,
query_id: Uuid,
stage_id: &mut usize,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
) -> Result<DistributedPlan, DataFusionError> {
Comment thread
JSOD11 marked this conversation as resolved.
Outdated
let d_cfg = DistributedConfig::from_config_options(cfg)?;
let children = annotated_plan.children;
let task_count = annotated_plan.task_count.as_usize();
let max_child_task_count = children.iter().map(|v| v.task_count.as_usize()).max();
let new_children = children
.into_iter()
.map(|child| _distribute_plan(child, cfg, query_id, stage_id))
.map(|child| child.map(|child| child.plan()))
.collect::<Result<Vec<_>, _>>()?;

match annotated_plan.plan_or_nb {
// This is a leaf node. It needs to be scaled up in order to account for it running in
// multiple tasks.
PlanOrNetworkBoundary::Plan(plan) if plan.children().is_empty() => {
let scaled_up = d_cfg.__private_task_estimator.scale_up_leaf_node(
&plan,
annotated_plan.task_count.as_usize(),
cfg,
);
Ok(scaled_up.unwrap_or(plan))
let distributed_plan = d_cfg
.__private_task_estimator
.distribute_plan(&plan, task_count, cfg)?;
Ok(distributed_plan.unwrap_or(DistributedPlan::from_plan(plan)))
}
// This is a normal intermediate plan, just pass it through with the mapped children.
PlanOrNetworkBoundary::Plan(plan) => plan.with_new_children(new_children),
PlanOrNetworkBoundary::Plan(plan) => Ok(DistributedPlan::from_plan(
plan.with_new_children(new_children)?,
)),
// This is a shuffle, so inject a NetworkShuffleExec here in the plan.
PlanOrNetworkBoundary::Shuffle => {
// It would need a network boundary, but on both sides of the boundary there is just 1 task,
// so we are fine with not introducing any network boundary.
if task_count == 1 && max_child_task_count == Some(1) {
return require_one_child(new_children);
return Ok(DistributedPlan::from_plan(require_one_child(new_children)?));
}
let node = Arc::new(NetworkShuffleExec::try_new(
let node: Arc<dyn ExecutionPlan> = Arc::new(NetworkShuffleExec::try_new(
require_one_child(new_children)?,
query_id,
*stage_id,
task_count,
max_child_task_count.unwrap_or(1),
)?);
stage_id.add_assign(1);
Ok(node)
Ok(DistributedPlan::from_plan(node))
}
// DataFusion is trying to coalesce multiple partitions into one, so we should do the
// same with tasks.
PlanOrNetworkBoundary::Coalesce => {
// It would need a network boundary, but on both sides of the boundary there is just 1 task,
// so we are fine with not introducing any network boundary.
if task_count == 1 && max_child_task_count == Some(1) {
return require_one_child(new_children);
return Ok(DistributedPlan::from_plan(require_one_child(new_children)?));
}
let node = Arc::new(NetworkCoalesceExec::try_new(
let node: Arc<dyn ExecutionPlan> = Arc::new(NetworkCoalesceExec::try_new(
require_one_child(new_children)?,
query_id,
*stage_id,
task_count,
max_child_task_count.unwrap_or(1),
)?);
stage_id.add_assign(1);
Ok(node)
Ok(DistributedPlan::from_plan(node))
}
// This is a CollectLeft HashJoinExec with the build side marked as being broadcast. We
// need to insert a NetworkBroadcastExec and scale up the BroadcastExec's consumer_tasks.
PlanOrNetworkBoundary::Broadcast => {
// It would need a network boundary, but on both sides of the boundary there is just 1 task,
// so we are fine with not introducing any network boundary.
if task_count == 1 && max_child_task_count == Some(1) {
return require_one_child(new_children);
return Ok(DistributedPlan::from_plan(require_one_child(new_children)?));
}
let node = Arc::new(NetworkBroadcastExec::try_new(
let node: Arc<dyn ExecutionPlan> = Arc::new(NetworkBroadcastExec::try_new(
require_one_child(new_children)?,
query_id,
*stage_id,
task_count,
max_child_task_count.unwrap_or(1),
)?);
stage_id.add_assign(1);
Ok(node)
Ok(DistributedPlan::from_plan(node))
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/distributed_planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ mod task_estimator;
pub use distributed_config::DistributedConfig;
pub use network_boundary::{NetworkBoundary, NetworkBoundaryExt};
pub use session_state_builder_ext::SessionStateBuilderExt;
pub(crate) use task_estimator::set_distributed_task_estimator;
pub use task_estimator::{TaskCountAnnotation, TaskEstimation, TaskEstimator};
pub use task_estimator::{DistributedPlan, TaskCountAnnotation, TaskEstimation, TaskEstimator};
pub(crate) use task_estimator::{get_distributed_task_estimator, set_distributed_task_estimator};
61 changes: 40 additions & 21 deletions src/distributed_planner/plan_annotator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ async fn _annotate_plan(
// This is a leaf node, maybe a DataSourceExec, or maybe something else custom from the
// user. We need to estimate how many tasks are needed for this leaf node, and we'll take
// this decision into account when deciding how many tasks will be actually used.
return if let Some(estimate) = estimator.task_estimation(&plan, cfg) {
return if let Some(estimate) = estimator.task_estimation(&plan, cfg)? {
Ok(AnnotatedPlan {
plan_or_nb: PlanOrNetworkBoundary::Plan(plan),
children: Vec::new(),
Expand All @@ -209,7 +209,7 @@ async fn _annotate_plan(
}

let mut task_count = estimator
.task_estimation(&plan, cfg)
.task_estimation(&plan, cfg)?
.map_or(Desired(1), |v| v.task_count);
if d_cfg.children_isolator_unions && plan.as_any().is::<UnionExec>() {
// Unions have the chance to decide how many tasks they should run on. If there's a union
Expand Down Expand Up @@ -410,11 +410,12 @@ mod tests {
BuildSideOneTaskEstimator, TestPlanOptions, base_session_builder, context_with_query,
sql_to_physical_plan,
};
use crate::{DistributedExt, TaskEstimation, TaskEstimator, assert_snapshot};
use crate::{DistributedExt, DistributedPlan, TaskEstimation, TaskEstimator, assert_snapshot};
use datafusion::config::ConfigOptions;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::filter::FilterExec;
use url::Url;
/* schema for the "weather" table

MinTemp [type=DOUBLE] [repetitiontype=OPTIONAL]
Expand Down Expand Up @@ -881,17 +882,25 @@ mod tests {
&self,
plan: &Arc<dyn ExecutionPlan>,
_: &ConfigOptions,
) -> Option<TaskEstimation> {
(self.f)(plan.as_ref())
) -> datafusion::error::Result<Option<TaskEstimation>> {
Ok((self.f)(plan.as_ref()))
}

fn scale_up_leaf_node(
fn distribute_plan(
&self,
_: &Arc<dyn ExecutionPlan>,
_: usize,
_: &ConfigOptions,
) -> Option<Arc<dyn ExecutionPlan>> {
None
_plan: &Arc<dyn ExecutionPlan>,
_task_count: usize,
_cfg: &ConfigOptions,
) -> Result<Option<DistributedPlan>> {
Ok(None)
}

/// Test stub: ignores the tasks and URLs it is given and always returns `Ok(None)`,
/// so this estimator never supplies a routing of its own.
// NOTE(review): presumably `None` means "use the default routing" — confirm against the
// `TaskEstimator` trait definition.
fn route_tasks(
&self,
_tasks: Vec<crate::stage::ExecutionTask>,
_urls: &[Url],
) -> Result<Option<Vec<Url>>> {
Ok(None)
}
}

Expand All @@ -903,22 +912,32 @@ mod tests {
&self,
plan: &Arc<dyn ExecutionPlan>,
_: &ConfigOptions,
) -> Option<TaskEstimation> {
let coalesce = plan.as_any().downcast_ref::<CoalescePartitionsExec>()?;
) -> datafusion::error::Result<Option<TaskEstimation>> {
let Some(coalesce) = plan.as_any().downcast_ref::<CoalescePartitionsExec>() else {
return Ok(None);
};
if coalesce.input().as_any().is::<BroadcastExec>() {
Some(TaskEstimation::maximum(1))
Ok(Some(TaskEstimation::maximum(1)))
} else {
None
Ok(None)
}
}

fn scale_up_leaf_node(
fn distribute_plan(
&self,
_: &Arc<dyn ExecutionPlan>,
_: usize,
_: &ConfigOptions,
) -> Option<Arc<dyn ExecutionPlan>> {
None
_plan: &Arc<dyn ExecutionPlan>,
_task_count: usize,
_cfg: &ConfigOptions,
) -> Result<Option<crate::distributed_planner::task_estimator::DistributedPlan>> {
Ok(None)
}

/// Test stub: ignores the tasks and URLs it is given and always returns `Ok(None)`,
/// so this estimator never supplies a routing of its own.
// NOTE(review): presumably `None` means "use the default routing" — confirm against the
// `TaskEstimator` trait definition.
fn route_tasks(
&self,
_tasks: Vec<crate::stage::ExecutionTask>,
_urls: &[url::Url],
) -> Result<Option<Vec<url::Url>>> {
Ok(None)
}
}

Expand Down
Loading
Loading