Skip to content
Merged
Original file line number Diff line number Diff line change
@@ -1,166 +1,102 @@
use crate::common::require_one_child;
use crate::distributed_planner::inject_network_boundaries::inject_network_boundaries;
use crate::distributed_planner::insert_broadcast::insert_broadcast_execs;
use crate::distributed_planner::partial_reduce_below_network_shuffles::partial_reduce_below_network_shuffles;
use crate::distributed_planner::plan_annotator::{
AnnotatedPlan, PlanOrNetworkBoundary, annotate_plan,
};
use crate::{
DistributedConfig, NetworkBoundaryExt, NetworkBroadcastExec, NetworkCoalesceExec,
NetworkShuffleExec, TaskEstimator,
};
use datafusion::common::DataFusionError;
use crate::distributed_planner::prepare_network_boundaries::prepare_network_boundaries;
use crate::{DistributedExec, NetworkBoundaryExt};
use async_trait::async_trait;
use datafusion::common::tree_node::TreeNode;
use datafusion::config::ConfigOptions;
use datafusion::execution::SessionState;
use datafusion::execution::context::QueryPlanner;
use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use std::ops::AddAssign;
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use std::sync::Arc;
use uuid::Uuid;

/// Inspects the plan, places the appropriate network boundaries, and breaks it down into stages
/// that can be executed in a distributed manner.
/// Transforms a single-node physical plan into a distributed plan by injecting network
/// boundaries between stages.
///
/// It performs the following operations:
/// The pipeline runs four passes in order:
///
/// 1. It prepares the plan for distribution, adding some extra single-node nodes like
/// [BroadcastExec] or [CoalescePartitionsExec] that will signal the following steps to
/// introduce network boundaries in the appropriate places.
/// 1. **Pre-distribution shaping.** A [CoalescePartitionsExec] is wrapped on top of the plan
/// when it has more than one output partition (so [inject_network_boundaries] later sees a
/// partition-collecting parent and injects a `NetworkCoalesceExec` above its child). Then
/// [insert_broadcast_execs] adds `BroadcastExec` nodes on the build side of `CollectLeft`
/// hash joins so those build sides can later be wrapped in `NetworkBroadcastExec`.
///
/// 2. Annotate the plan with [annotate_plan]: adds some annotations to each node about how
/// many distributed tasks should be used in the stage containing them, and whether they
/// need a network boundary below or not.
/// For more information about this step, read [annotate_plan] docs.
/// 2. **Boundary injection.** [inject_network_boundaries] walks the plan, computes a task count
Comment thread
gabotechs marked this conversation as resolved.
/// for each node, and inserts `NetworkShuffleExec` / `NetworkBroadcastExec` /
/// `NetworkCoalesceExec` above the nodes that delimit a stage (hash `RepartitionExec`s,
/// build-side `BroadcastExec`s, and any node sitting under a `CoalescePartitionsExec` /
/// `SortPreservingMergeExec`).
///
/// 3. Based on the [AnnotatedPlan] returned by [annotate_plan], place all the appropriate
/// network boundaries ([NetworkShuffleExec] and [NetworkCoalesceExec]) with the task count
/// assignment that the annotations require. After this step, the plan is a distributed,
/// executable plan.
/// 3. **Boundary preparation.** [prepare_network_boundaries] readies each injected boundary
Comment thread
gabotechs marked this conversation as resolved.
/// for execution: elides ones that aren't actually needed and finalises the survivors. If
/// no boundary survives, this function returns `None`.
Comment thread
gabotechs marked this conversation as resolved.
///
/// This function returns None if the plan was left undistributed.
pub(super) async fn distribute_plan(
original: Arc<dyn ExecutionPlan>,
cfg: &ConfigOptions,
) -> datafusion::common::Result<Option<Arc<dyn ExecutionPlan>>> {
// Keep this function idempotent.
if original.exists(|plan| Ok(plan.is_network_boundary()))? {
return Ok(None);
}
/// 4. **Shuffle-volume optimization.** [partial_reduce_below_network_shuffles] inserts partial
/// aggregation nodes underneath hash shuffles where it can, so less data crosses the network.
/// A [QueryPlanner] that turns logical plans into *distributed* physical plans.
///
/// It first obtains a single-node physical plan — from `prev` when one is set,
/// otherwise from the [DefaultPhysicalPlanner] — and then injects the network
/// boundaries that split that plan into distributed stages.
#[derive(Debug)]
pub(crate) struct DistributedQueryPlanner {
// Planner previously registered in the session, if any. When `None`, the
// default DataFusion physical planner is used to build the initial plan.
pub(crate) prev: Option<Arc<dyn QueryPlanner + Send + Sync>>,
}

let mut plan = Arc::clone(&original);
#[async_trait]
impl QueryPlanner for DistributedQueryPlanner {
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
session_state: &SessionState,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
let original_plan = match &self.prev {
None => {
// Use the default physical planner.
let planner = DefaultPhysicalPlanner::default();
planner
.create_physical_plan(logical_plan, session_state)
.await?
}
Some(prev) => {
prev.create_physical_plan(logical_plan, session_state)
.await?
}
};

// Add a CoalescePartitionsExec on top of the plan if necessary. The plan annotator will see
// this and will place a NetworkCoalesceExec below it.
if plan.output_partitioning().partition_count() > 1 {
plan = Arc::new(CoalescePartitionsExec::new(plan));
}
if original_plan.as_any().is::<DistributedExec>() {
return Ok(original_plan);
}

// Insert BroadcastExec nodes in collect left joins so that the plan annotator can inject
// broadcast network boundaries above.
plan = insert_broadcast_execs(plan, cfg)?;
// The plan already contains network boundaries set by the user. Just ensure they have nice
// unique identifiers for each stage, and move forward with it.
if original_plan.exists(|plan| Ok(plan.is_network_boundary()))? {
// Ensure the stages in the plan have nice unique identifiers.
let plan = prepare_network_boundaries(original_plan)?;
if !plan.exists(|plan| Ok(plan.is_network_boundary()))? {
return Ok(plan);
}
return Ok(Arc::new(DistributedExec::new(plan)));
}

// Annotate the plan with network boundary and task count information.
let annotated = annotate_plan(plan, cfg).await?;
let mut plan = Arc::clone(&original_plan);

// Based on the annotations, place the actual network boundaries with the appropriate dimensions.
let mut stage_id = 1;
let plan = _distribute_plan(annotated, cfg, Uuid::new_v4(), &mut stage_id)?;
if stage_id == 1 {
return Ok(None);
}
if plan.output_partitioning().partition_count() > 1 {
plan = Arc::new(CoalescePartitionsExec::new(plan));
}

// Insert PartialReduce aggregation nodes above hash repartitions to reduce shuffle data volume.
let plan = partial_reduce_below_network_shuffles(plan, cfg)?;
let cfg = session_state.config_options();

Ok(Some(plan))
}
plan = insert_broadcast_execs(plan, cfg)?;

/// Takes an [AnnotatedPlan] and returns a modified [ExecutionPlan] with all the network boundaries
/// appropriately placed. This step performs the following modifications to the original
/// [ExecutionPlan]:
/// - The leaf nodes are scaled up in parallelism based on the number of distributed tasks in
/// which they are going to run. This is configurable by the user via the [TaskEstimator] trait.
/// - The appropriate network boundaries are placed in the plan depending on how it was annotated,
/// so new nodes like [NetworkBroadcastExec], [NetworkCoalesceExec] and [NetworkShuffleExec] will be present.
fn _distribute_plan(
annotated_plan: AnnotatedPlan,
cfg: &ConfigOptions,
query_id: Uuid,
stage_id: &mut usize,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let d_cfg = DistributedConfig::from_config_options(cfg)?;
let children = annotated_plan.children;
let task_count = annotated_plan.task_count.as_usize();
let max_child_task_count = children.iter().map(|v| v.task_count.as_usize()).max();
let new_children = children
.into_iter()
.map(|child| _distribute_plan(child, cfg, query_id, stage_id))
.collect::<Result<Vec<_>, _>>()?;
match annotated_plan.plan_or_nb {
// This is a leaf node. It needs to be scaled up in order to account for it running in
// multiple tasks.
PlanOrNetworkBoundary::Plan(plan) if plan.children().is_empty() => {
let scaled_up = d_cfg.__private_task_estimator.scale_up_leaf_node(
&plan,
annotated_plan.task_count.as_usize(),
cfg,
);
Ok(scaled_up.unwrap_or(plan))
}
// This is a normal intermediate plan, just pass it through with the mapped children.
PlanOrNetworkBoundary::Plan(plan) => plan.with_new_children(new_children),
// This is a shuffle, so inject a NetworkShuffleExec here in the plan.
PlanOrNetworkBoundary::Shuffle => {
// It would need a network boundary, but on both sides of the boundary there is just 1 task,
// so we are fine with not introducing any network boundary.
if task_count == 1 && max_child_task_count == Some(1) {
return require_one_child(new_children);
}
let node = Arc::new(NetworkShuffleExec::try_new(
require_one_child(new_children)?,
query_id,
*stage_id,
task_count,
max_child_task_count.unwrap_or(1),
)?);
stage_id.add_assign(1);
Ok(node)
}
// DataFusion is trying to coalesce multiple partitions into one, so we should do the
// same with tasks.
PlanOrNetworkBoundary::Coalesce => {
// It would need a network boundary, but on both sides of the boundary there is just 1 task,
// so we are fine with not introducing any network boundary.
if task_count == 1 && max_child_task_count == Some(1) {
return require_one_child(new_children);
}
let node = Arc::new(NetworkCoalesceExec::try_new(
require_one_child(new_children)?,
query_id,
*stage_id,
task_count,
max_child_task_count.unwrap_or(1),
)?);
stage_id.add_assign(1);
Ok(node)
}
// This is a CollectLeft HashJoinExec with the build side marked as being broadcast. we
// need to insert a NetworkBroadcastExec and scale up the BroadcastExec consumer_tasks.
PlanOrNetworkBoundary::Broadcast => {
// It would need a network boundary, but on both sides of the boundary there is just 1 task,
// so we are fine with not introducing any network boundary.
if task_count == 1 && max_child_task_count == Some(1) {
return require_one_child(new_children);
}
let node = Arc::new(NetworkBroadcastExec::try_new(
require_one_child(new_children)?,
query_id,
*stage_id,
task_count,
max_child_task_count.unwrap_or(1),
)?);
stage_id.add_assign(1);
Ok(node)
plan = inject_network_boundaries(plan, cfg).await?;

plan = prepare_network_boundaries(plan)?;
if !plan.exists(|plan| Ok(plan.is_network_boundary()))? {
return Ok(original_plan);
}

let plan = partial_reduce_below_network_shuffles(plan, cfg)?;

Ok(Arc::new(DistributedExec::new(plan)))
}
}

Expand Down
Loading
Loading