Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions benchmarks/cdk/bin/datafusion-bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ async function main() {
.option('--max-tasks-per-stage <number>', 'Max tasks per stage', '0')
.option('--repartition-file-min-size <number>', 'repartition_file_min_size DF option', '10485760' /* upstream default */)
.option('--target-partitions <number>', 'target_partitions DF option', '8')
.option('--dynamic <boolean>', 'Use the dynamic task count assigner', 'false')
.option('--bytes-per-partition-per-second <number>', 'Target throughput in bytes per partition per second for the dynamic task count allocator', `${256 * 1024 * 1024}`)
.option('--queries <string>', 'Specific queries to run', undefined)
.option('--debug <boolean>', 'Print the generated plans to stdout')
.option('--warmup <boolean>', 'Perform a warmup query before the benchmarks', 'true')
Expand All @@ -46,6 +48,8 @@ async function main() {
const childrenIsolatorUnions = options.childrenIsolatorUnions === 'true' || options.childrenIsolatorUnions === 1
const broadcastJoins = options.broadcastJoins === 'true' || options.broadcastJoins === 1
const partialReduce = options.partialReduce === 'true' || options.partialReduce === 1
const dynamicTaskCount = options.dynamic === 'true' || options.dynamic === 1
const bytesPerPartitionPerSecond = parseInt(options.bytesPerPartitionPerSecond)
const debug = options.debug === true || options.debug === 'true' || options.debug === 1
const warmup = options.warmup === true || options.warmup === 'true' || options.warmup === 1

Expand All @@ -59,6 +63,8 @@ async function main() {
compression,
broadcastJoins,
partialReduce,
dynamicTaskCount,
bytesPerPartitionPerSecond,
maxTasksPerStage,
repartitionFileMinSize,
targetPartitions
Expand Down Expand Up @@ -97,6 +103,8 @@ class DataFusionRunner implements BenchmarkRunner {
childrenIsolatorUnions: boolean;
broadcastJoins: boolean;
partialReduce: boolean;
dynamicTaskCount: boolean;
bytesPerPartitionPerSecond: number;
maxTasksPerStage: number;
repartitionFileMinSize: number;
targetPartitions: number;
Expand Down Expand Up @@ -176,6 +184,8 @@ class DataFusionRunner implements BenchmarkRunner {
SET distributed.children_isolator_unions=${this.options.childrenIsolatorUnions};
SET distributed.broadcast_joins=${this.options.broadcastJoins};
SET distributed.partial_reduce=${this.options.partialReduce};
SET distributed.dynamic_task_count=${this.options.dynamicTaskCount};
SET distributed.bytes_per_partition_per_second=${this.options.bytesPerPartitionPerSecond};
SET distributed.max_tasks_per_stage=${this.options.maxTasksPerStage};
SET datafusion.optimizer.repartition_file_min_size=${this.options.repartitionFileMinSize};
SET datafusion.execution.target_partitions=${this.options.targetPartitions};
Expand Down
46 changes: 26 additions & 20 deletions benchmarks/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ use datafusion::common::utils::get_available_parallelism;
use datafusion::common::{config_err, exec_err, not_impl_err};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::physical_plan::collect;
use datafusion::prelude::*;
use datafusion_distributed::test_utils::localhost::LocalHostWorkerResolver;
use datafusion_distributed::{DistributedExt, NetworkBoundaryExt, SessionStateBuilderExt, Worker};
use datafusion_distributed::{
DistributedExt, DistributedMetricsFormat, NetworkBoundaryExt, SessionStateBuilderExt, Worker,
display_plan_ascii, rewrite_distributed_plan_with_metrics,
};
use datafusion_distributed_benchmarks::datasets::{clickbench, register_tables, tpcds, tpch};
use std::error::Error;
use std::fs;
Expand Down Expand Up @@ -112,6 +114,14 @@ pub struct RunOpt {
#[structopt(short = "s", long = "batch-size")]
batch_size: Option<usize>,

/// Dynamically assign tasks to stages based on runtime stats
#[structopt(long = "dynamic")]
dynamic: bool,

/// Amount of bytes per second each partition is expected to handle during dynamic execution
#[structopt(long = "bps")]
bytes_per_partition_per_second: Option<usize>,

/// Activate debug mode to see more details
#[structopt(short, long)]
debug: bool,
Expand Down Expand Up @@ -171,7 +181,7 @@ impl RunOpt {
}

async fn run_local(self) -> Result<()> {
let state = SessionStateBuilder::new()
let mut builder = SessionStateBuilder::new()
.with_default_features()
.with_config(self.config()?)
.with_distributed_worker_resolver(LocalHostWorkerResolver::new(self.workers.clone()))
Expand All @@ -192,8 +202,12 @@ impl RunOpt {
.with_distributed_broadcast_joins(self.broadcast_joins)?
.with_distributed_metrics_collection(self.collect_metrics)?
.with_distributed_max_tasks_per_stage(self.max_tasks_per_stage)?
.build();
let ctx = SessionContext::new_with_state(state);
.with_distributed_dynamic_task_count(self.dynamic)?;
if let Some(v) = self.bytes_per_partition_per_second {
builder.set_distributed_bytes_per_partition_per_second(v)?;
}

let ctx = SessionContext::new_with_state(builder.build());
register_tables(&ctx, &self.get_path()?).await?;

println!("Running benchmarks with the following options: {self:?}");
Expand Down Expand Up @@ -286,21 +300,8 @@ impl RunOpt {
let plan = ctx.sql(sql).await?;
let (state, plan) = plan.into_parts();

if self.debug {
println!("=== Logical plan ===\n{plan}\n");
}

let plan = state.optimize(&plan)?;
if self.debug {
println!("=== Optimized logical plan ===\n{plan}\n");
}
let physical_plan = state.create_physical_plan(&plan).await?;
if self.debug {
println!(
"=== Physical plan ===\n{}\n",
displayable(physical_plan.as_ref()).indent(true)
);
}
let mut n_tasks = 0;
physical_plan.clone().transform_down(|node| {
if let Some(node) = node.as_network_boundary() {
Expand All @@ -310,9 +311,14 @@ impl RunOpt {
})?;
let result = collect(physical_plan.clone(), state.task_ctx()).await?;
if self.debug {
let plan = rewrite_distributed_plan_with_metrics(
physical_plan,
DistributedMetricsFormat::Aggregated,
)
.await?;
println!(
"=== Physical plan with metrics ===\n{}\n",
DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()).indent(true)
display_plan_ascii(plan.as_ref(), true)
);
}
Ok((result, n_tasks))
Expand Down
98 changes: 72 additions & 26 deletions src/coordinator/distributed.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::DistributedConfig;
use crate::common::{require_one_child, serialize_uuid};
use crate::coordinator::metrics_store::MetricsStore;
use crate::coordinator::prepare_dynamic_plan::prepare_dynamic_plan;
use crate::coordinator::prepare_static_plan::prepare_static_plan;
use crate::distributed_planner::NetworkBoundaryExt;
use crate::worker::generated::worker::TaskKey;
Expand Down Expand Up @@ -27,22 +29,25 @@ use std::sync::Mutex;
/// over the wire.
#[derive(Debug)]
pub struct DistributedExec {
plan: Arc<dyn ExecutionPlan>,
prepared_plan: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
base_plan: Arc<dyn ExecutionPlan>,
final_plan: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
head_stage: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
metrics: ExecutionPlanMetricsSet,
pub(crate) task_metrics: Option<Arc<MetricsStore>>,
}

/// Result of preparing a plan for execution, as produced by `prepare_static_plan` or
/// `prepare_dynamic_plan` and destructured inside `DistributedExec`'s `execute()`.
pub(super) struct PreparedPlan {
/// Plan that `execute()` actually runs (`head_stage.execute(partition, context)`) to produce
/// the output record batches. It is also stored so `DistributedExec::head_stage()` can return it.
pub(super) head_stage: Arc<dyn ExecutionPlan>,
/// Plan stored in `DistributedExec::final_plan` and handed back by
/// `DistributedExec::prepared_plan()`.
pub(super) final_plan: Arc<dyn ExecutionPlan>,
/// Set of tasks returned alongside the plans. `execute()` joins all of them and propagates
/// any error they yield.
// NOTE(review): presumably these are the tasks that feed the plan to workers — confirm
// against the prepare_* implementations.
pub(super) join_set: JoinSet<Result<()>>,
}

impl DistributedExec {
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
pub fn new(base_plan: Arc<dyn ExecutionPlan>) -> Self {
Self {
plan,
prepared_plan: Arc::new(Mutex::new(None)),
base_plan,
final_plan: Arc::new(Mutex::new(None)),
head_stage: Arc::new(Mutex::new(None)),
metrics: ExecutionPlanMetricsSet::new(),
task_metrics: None,
}
Expand All @@ -69,7 +74,10 @@ impl DistributedExec {
let Some(task_metrics) = &self.task_metrics else {
return;
};
let _ = self.plan.apply(|plan| {
let Some(plan) = self.final_plan.lock().unwrap().as_ref().cloned() else {
return;
};
let _ = plan.apply(|plan| {
if let Some(boundary) = plan.as_network_boundary() {
let stage = boundary.input_stage();
for i in 0..stage.task_count() {
Expand All @@ -95,14 +103,26 @@ impl DistributedExec {
/// It is updated on every call to `execute()`. Returns an error if `.execute()` has not been
/// called.
pub(crate) fn prepared_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
self.prepared_plan
self.final_plan
.lock()
.map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {}", e))?
.clone()
.ok_or_else(|| {
internal_datafusion_err!("No prepared plan found. Was execute() called?")
})
}

/// Returns the head stage recorded by the most recent `execute()` run.
///
/// This differs from [`Self::prepared_plan`]: that plan is reconstructed for visualization
/// (with `Stage::Local` boundaries and rebuilt ancestor `Arc`s), whereas the value returned
/// here is the original `Arc` whose metrics were populated during execution.
///
/// # Errors
///
/// Returns an internal error if the lock cannot be acquired, or if no head stage has been
/// stored yet (i.e. `execute()` has not been called).
pub(crate) fn head_stage(&self) -> Result<Arc<dyn ExecutionPlan>> {
    let slot = self
        .head_stage
        .lock()
        .map_err(|e| internal_datafusion_err!("Failed to lock head stage: {}", e))?;
    match slot.as_ref() {
        Some(stage) => Ok(Arc::clone(stage)),
        None => Err(internal_datafusion_err!(
            "No head stage found. Was execute() called?"
        )),
    }
}
}

impl DisplayAs for DistributedExec {
Expand All @@ -121,20 +141,21 @@ impl ExecutionPlan for DistributedExec {
}

fn properties(&self) -> &Arc<PlanProperties> {
self.plan.properties()
self.base_plan.properties()
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.plan]
vec![&self.base_plan]
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(DistributedExec {
plan: require_one_child(&children)?,
prepared_plan: self.prepared_plan.clone(),
base_plan: require_one_child(&children)?,
final_plan: Arc::new(Mutex::new(None)),
head_stage: Arc::new(Mutex::new(None)),
metrics: self.metrics.clone(),
task_metrics: self.task_metrics.clone(),
}))
Expand All @@ -155,31 +176,56 @@ impl ExecutionPlan for DistributedExec {
);
}

let PreparedPlan {
head_stage,
join_set,
} = prepare_static_plan(&self.plan, &self.metrics, &self.task_metrics, &context)?;
{
let mut guard = self
.prepared_plan
.lock()
.map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {e}"))?;
*guard = Some(head_stage.clone());
}
let this = Self {
base_plan: Arc::clone(&self.base_plan),
final_plan: Arc::clone(&self.final_plan),
head_stage: Arc::clone(&self.head_stage),
metrics: self.metrics.clone(),
task_metrics: self.task_metrics.clone(),
};

let mut builder = RecordBatchReceiverStreamBuilder::new(self.schema(), 1);
let tx = builder.tx();
// Spawn the task that pulls data from child...
builder.spawn(async move {
let d_cfg = DistributedConfig::from_config_options(context.session_config().options())?;

let PreparedPlan {
head_stage,
final_plan,
join_set,
} = match d_cfg.dynamic_task_count {
false => prepare_static_plan(
&this.base_plan,
&this.metrics,
&this.task_metrics,
&context,
)?,
true => {
prepare_dynamic_plan(
&this.base_plan,
&this.metrics,
&this.task_metrics,
&context,
)
.await?
}
};

this.final_plan
.lock()
.expect("poisoned lock")
.replace(final_plan);
this.head_stage
.lock()
.expect("poisoned lock")
.replace(Arc::clone(&head_stage));
let mut stream = head_stage.execute(partition, context)?;
while let Some(msg) = stream.next().await {
if tx.send(msg).await.is_err() {
break; // channel closed
}
}
Ok(())
});
// ...in parallel to the one that feeds the plan to workers.
builder.spawn(async move {
for res in join_set.join_all().await {
res?;
}
Expand Down
1 change: 1 addition & 0 deletions src/coordinator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod distributed;
mod metrics_store;
mod prepare_dynamic_plan;
mod prepare_static_plan;
mod task_spawner;

Expand Down
Loading
Loading