Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,9 @@
import org.apache.calcite.tools.RelBuilder;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.calcite.rel.rules.PinotEnrichedJoinRule;
import org.apache.pinot.calcite.rel.rules.ImmutablePinotSortExchangeCopyRule;
import org.apache.pinot.calcite.rel.rules.PinotImplicitTableHintRule;
import org.apache.pinot.calcite.rel.rules.PinotJoinToDynamicBroadcastRule;
import org.apache.pinot.calcite.rel.rules.PinotQueryRuleSets;
import org.apache.pinot.calcite.rel.rules.PinotRelDistributionTraitRule;
import org.apache.pinot.calcite.rel.rules.PinotRuleUtils;
import org.apache.pinot.calcite.rel.rules.PinotSortExchangeCopyRule;
Expand Down Expand Up @@ -89,6 +88,9 @@
import org.apache.pinot.query.planner.physical.v2.PlanFragmentAndMailboxAssignment;
import org.apache.pinot.query.planner.physical.v2.RelToPRelConverter;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.planner.rules.Phase;
import org.apache.pinot.query.planner.rules.PinotRuleSet;
import org.apache.pinot.query.planner.rules.RuleSetCustomizer;
import org.apache.pinot.query.routing.WorkerManager;
import org.apache.pinot.query.type.TypeFactory;
import org.apache.pinot.query.validate.BytesCastVisitor;
Expand Down Expand Up @@ -168,7 +170,7 @@ public QueryEnvironment(Config config, MultiClusterRoutingContext multiClusterRo
rootSchema, List.of(database), _typeFactory, CONNECTION_CONFIG, config.isCaseSensitive());
_defaultDisabledPlannerRules = _envConfig.defaultDisabledPlannerRules();
// default optProgram with no skip rule options and no use rule options
_optProgram = getOptProgram(Set.of(), Set.of(), _defaultDisabledPlannerRules);
_optProgram = getOptProgram(_envConfig.getRuleSet(), Set.of(), Set.of(), _defaultDisabledPlannerRules);
_multiClusterRoutingContext = multiClusterRoutingContext;
}

Expand Down Expand Up @@ -206,7 +208,7 @@ private PlannerContext getPlannerContext(SqlNodeAndOptions sqlNodeAndOptions) {
Set<String> skipRuleSet = QueryOptionsUtils.getSkipPlannerRules(options);
if (!skipRuleSet.isEmpty() || !useRuleSet.isEmpty()) {
// dynamically create optProgram according to rule options
optProgram = getOptProgram(skipRuleSet, useRuleSet, _defaultDisabledPlannerRules);
optProgram = getOptProgram(_envConfig.getRuleSet(), skipRuleSet, useRuleSet, _defaultDisabledPlannerRules);
}
}
int sortExchangeCopyLimit = QueryOptionsUtils.getSortExchangeCopyThreshold(options,
Expand Down Expand Up @@ -536,24 +538,25 @@ private DispatchableSubPlan toDispatchableSubPlan(RelRoot relRoot, PlannerContex
* @param defaultDisabledRuleSet parsed default disabled rule set from broker config
* @return HepProgram that performs logical transformations
*/
private static HepProgram getOptProgram(Set<String> skipRuleSet, Set<String> useRuleSet,
private static HepProgram getOptProgram(PinotRuleSet ruleSet, Set<String> skipRuleSet, Set<String> useRuleSet,
Set<String> defaultDisabledRuleSet) {
HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();
// Set the match order as DEPTH_FIRST. The default is arbitrary which works the same as DEPTH_FIRST, but it's
// best to be explicit.
hepProgramBuilder.addMatchOrder(HepMatchOrder.DEPTH_FIRST);

// ----
// Rules are disabled if its corresponding value is set to false in ruleFlags
// construct filtered BASIC_RULES, FILTER_PUSHDOWN_RULES, PROJECT_PUSHDOWN_RULES, PRUNE_RULES
// A rule is disabled if its corresponding value is set to false in ruleFlags.
// Sources come from PinotRuleSet (after every RuleSetCustomizer ran); per-query
// skip/use options are then applied by filterRuleList on a fresh copy.
List<RelOptRule> basicRules =
filterRuleList(PinotQueryRuleSets.BASIC_RULES, skipRuleSet, useRuleSet, defaultDisabledRuleSet);
filterRuleList(ruleSet.rulesFor(Phase.BASIC), skipRuleSet, useRuleSet, defaultDisabledRuleSet);
List<RelOptRule> filterPushdownRules =
filterRuleList(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES, skipRuleSet, useRuleSet, defaultDisabledRuleSet);
filterRuleList(ruleSet.rulesFor(Phase.FILTER_PUSHDOWN), skipRuleSet, useRuleSet, defaultDisabledRuleSet);
List<RelOptRule> projectPushdownRules =
filterRuleList(PinotQueryRuleSets.PROJECT_PUSHDOWN_RULES, skipRuleSet, useRuleSet, defaultDisabledRuleSet);
filterRuleList(ruleSet.rulesFor(Phase.PROJECT_PUSHDOWN), skipRuleSet, useRuleSet, defaultDisabledRuleSet);
List<RelOptRule> pruneRules =
filterRuleList(PinotQueryRuleSets.PRUNE_RULES, skipRuleSet, useRuleSet, defaultDisabledRuleSet);
filterRuleList(ruleSet.rulesFor(Phase.PRUNE), skipRuleSet, useRuleSet, defaultDisabledRuleSet);

// Run the Calcite CORE rules using 1 HepInstruction per rule. We use 1 HepInstruction per rule for simplicity:
// the rules used here can rest assured that they are the only ones evaluated in a dedicated graph-traversal.
Expand Down Expand Up @@ -628,32 +631,46 @@ private static boolean isRuleSkipped(String ruleName, Set<String> skipRuleSet, S
private static HepProgram getTraitProgram(@Nullable WorkerManager workerManager, Config config,
boolean usePhysicalOptimizer, Set<String> useRuleSet, int sortExchangeCopyLimit) {
HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();
PinotRuleSet ruleSet = config.getRuleSet();

// Set the match order as BOTTOM_UP.
hepProgramBuilder.addMatchOrder(HepMatchOrder.BOTTOM_UP);

// ----
// Run pinot specific rules that should run after all other rules, using 1 HepInstruction per rule.
if (!usePhysicalOptimizer) {
for (RelOptRule relOptRule : PinotQueryRuleSets.getPinotPostRules(sortExchangeCopyLimit)) {
// POST_LOGICAL list comes from PinotRuleSet; we copy it because we may need to
// swap every PinotSortExchangeCopyRule with one configured for the per-query
// (or broker-config) sortExchangeCopyLimit if it differs from the rule's default.
List<RelOptRule> postLogical = new ArrayList<>(ruleSet.rulesFor(Phase.POST_LOGICAL));
if (sortExchangeCopyLimit != PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY.config.getFetchLimitThreshold()) {
PinotSortExchangeCopyRule overridden = ImmutablePinotSortExchangeCopyRule.Config.builder()
.from(PinotSortExchangeCopyRule.Config.DEFAULT)
.fetchLimitThreshold(sortExchangeCopyLimit)
.build()
.toRule();
postLogical.replaceAll(r -> r instanceof PinotSortExchangeCopyRule ? overridden : r);
}
for (RelOptRule relOptRule : postLogical) {
if (isEligibleQueryPostRule(relOptRule, config)) {
hepProgramBuilder.addRuleInstance(relOptRule);
}
}
if (!isRuleSkipped(CommonConstants.Broker.PlannerRuleNames.JOIN_TO_ENRICHED_JOIN, Set.of(), useRuleSet,
config.defaultDisabledPlannerRules())) {
// push filter and project above join to enrichedJoin, does not work with physical optimizer
hepProgramBuilder.addRuleCollection(PinotEnrichedJoinRule.PINOT_ENRICHED_JOIN_RULES);
hepProgramBuilder.addRuleCollection(ruleSet.rulesFor(Phase.POST_LOGICAL_ENRICHED_JOIN));
}
} else {
for (RelOptRule relOptRule : PinotQueryRuleSets.PINOT_POST_RULES_V2) {
for (RelOptRule relOptRule : ruleSet.rulesFor(Phase.POST_LOGICAL_V2)) {
if (isEligibleQueryPostRule(relOptRule, config)) {
hepProgramBuilder.addRuleInstance(relOptRule);
}
}
}
if (!usePhysicalOptimizer) {
// apply RelDistribution trait to all nodes
// apply RelDistribution trait to all nodes — these rules depend on the
// per-query WorkerManager, so they stay outside PinotRuleSet.
if (workerManager != null) {
hepProgramBuilder.addRuleInstance(PinotImplicitTableHintRule.withWorkerManager(workerManager));
}
Expand Down Expand Up @@ -695,6 +712,17 @@ public interface Config {
@Nullable
TableCache getTableCache();

/**
 * Returns the per-phase Calcite rule lists used by the multi-stage planner.
 *
 * <p>Unless overridden, this hands back {@link PinotRuleSet#defaultInstance()}. That instance is
 * described as a process-wide singleton assembled from {@link RuleSetCustomizer}s found through
 * {@link java.util.ServiceLoader}; reusing it means each per-query {@link Config} does not need
 * to redo discovery.
 *
 * @return the rule set to plan with
 */
@Value.Default
default PinotRuleSet getRuleSet() {
  PinotRuleSet sharedRuleSet = PinotRuleSet.defaultInstance();
  return sharedRuleSet;
}

@Value.Default
default boolean isNullHandlingEnabled() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.calcite.rel.rules;
package org.apache.pinot.query.planner.rules;

import java.util.List;
import org.apache.calcite.plan.RelOptRule;
Expand All @@ -43,26 +43,44 @@
import org.apache.calcite.rel.rules.SortJoinTransposeRule;
import org.apache.calcite.rel.rules.SortRemoveRule;
import org.apache.calcite.rel.rules.UnionToDistinctRule;
import org.apache.pinot.calcite.rel.rules.PinotAggregateExchangeNodeInsertRule;
import org.apache.pinot.calcite.rel.rules.PinotAggregateFunctionRewriteRule;
import org.apache.pinot.calcite.rel.rules.PinotAggregateReduceFunctionsRule;
import org.apache.pinot.calcite.rel.rules.PinotEnrichedJoinRule;
import org.apache.pinot.calcite.rel.rules.PinotEvaluateLiteralRule;
import org.apache.pinot.calcite.rel.rules.PinotExchangeEliminationRule;
import org.apache.pinot.calcite.rel.rules.PinotFilterJoinRule.PinotFilterIntoJoinRule;
import org.apache.pinot.calcite.rel.rules.PinotFilterJoinRule.PinotJoinConditionPushRule;
import org.apache.pinot.calcite.rel.rules.PinotJoinExchangeNodeInsertRule;
import org.apache.pinot.calcite.rel.rules.PinotJoinPushTransitivePredicatesRule;
import org.apache.pinot.calcite.rel.rules.PinotJoinToDynamicBroadcastRule;
import org.apache.pinot.calcite.rel.rules.PinotLogicalAggregateRule;
import org.apache.pinot.calcite.rel.rules.PinotProjectJoinTransposeRule;
import org.apache.pinot.calcite.rel.rules.PinotSemiJoinDistinctProjectRule;
import org.apache.pinot.calcite.rel.rules.PinotSetOpExchangeNodeInsertRule;
import org.apache.pinot.calcite.rel.rules.PinotSingleValueAggregateRemoveRule;
import org.apache.pinot.calcite.rel.rules.PinotSortExchangeCopyRule;
import org.apache.pinot.calcite.rel.rules.PinotSortExchangeNodeInsertRule;
import org.apache.pinot.calcite.rel.rules.PinotTableScanConverterRule;
import org.apache.pinot.calcite.rel.rules.PinotWindowExchangeNodeInsertRule;
import org.apache.pinot.calcite.rel.rules.PinotWindowSplitRule;
import org.apache.pinot.spi.utils.CommonConstants.Broker.PlannerRuleNames;


/**
* Default rule sets for Pinot query
* Rules that are disabled by default are defined in
* {@link org.apache.pinot.spi.utils.CommonConstants.Broker#DEFAULT_DISABLED_RULES}
*
* TODO: This class started as a list of constant rule sets, but since then we have added dynamic rule generation
* to it as well. We should probably refactor the class to make it easier to understand, maintain and change the rules
* based on contextual information like query options.
*/
public class PinotQueryRuleSets {
private PinotQueryRuleSets() {
}
/// [RuleSetCustomizer] that seeds every [Phase] with the OSS default Calcite
/// rules for the multi-stage query planner. Registered as a [java.util.ServiceLoader]
/// service entry so it is picked up automatically by [PinotRuleSet].
///
/// `POST_LOGICAL` includes `PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY`
/// configured with the rule's hard-coded default `fetchLimitThreshold`.
/// Per-query overrides (and broker-config overrides) of that limit are
/// applied later by `QueryEnvironment.getTraitProgram`, which swaps the
/// configured `PinotSortExchangeCopyRule` on a per-query copy of the
/// `POST_LOGICAL` list.
public final class DefaultRuleSetCustomizer implements RuleSetCustomizer {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This rename removes the existing public org.apache.pinot.calcite.rel.rules.PinotQueryRuleSets type entirely. Any out-of-tree planner extension or test compiled against current releases will fail to load or compile after upgrade. Please keep a deprecated bridge in the old package instead of replacing the class outright.


//@formatter:off
public static final List<RelOptRule> BASIC_RULES = List.of(
static final List<RelOptRule> BASIC_RULES = List.of(
// push a filter into a join
PinotFilterIntoJoinRule
.instanceWithDescription(PlannerRuleNames.FILTER_INTO_JOIN),
Expand Down Expand Up @@ -162,7 +180,7 @@ private PinotQueryRuleSets() {

// Filter pushdown rules run using a RuleCollection since we want to push down a filter as much as possible in a
// single HepInstruction.
public static final List<RelOptRule> FILTER_PUSHDOWN_RULES = List.of(
static final List<RelOptRule> FILTER_PUSHDOWN_RULES = List.of(
PinotFilterIntoJoinRule
.instanceWithDescription(PlannerRuleNames.FILTER_INTO_JOIN),
FilterAggregateTransposeRule.Config.DEFAULT
Expand All @@ -175,7 +193,7 @@ private PinotQueryRuleSets() {

// Project pushdown rules run using a RuleCollection since we want to push down a project as much as possible in a
// single HepInstruction.
public static final List<RelOptRule> PROJECT_PUSHDOWN_RULES = List.of(
static final List<RelOptRule> PROJECT_PUSHDOWN_RULES = List.of(
ProjectFilterTransposeRule.Config.DEFAULT
.withDescription(PlannerRuleNames.PROJECT_FILTER_TRANSPOSE).toRule(),
PinotProjectJoinTransposeRule
Expand All @@ -185,7 +203,7 @@ private PinotQueryRuleSets() {
);

// The pruner rules run top-down to ensure Calcite restarts from root node after applying a transformation.
public static final List<RelOptRule> PRUNE_RULES = List.of(
static final List<RelOptRule> PRUNE_RULES = List.of(
AggregateProjectMergeRule.Config.DEFAULT
.withDescription(PlannerRuleNames.AGGREGATE_PROJECT_MERGE).toRule(),
ProjectMergeRule.Config.DEFAULT
Expand Down Expand Up @@ -218,7 +236,38 @@ private PinotQueryRuleSets() {
.withDescription(PlannerRuleNames.PRUNE_EMPTY_UNION).toRule()
);

public static final List<RelOptRule> PINOT_POST_RULES_V2 = List.of(
/// Pinot specific rules that should be run AFTER all other rules.
/// Includes [PinotSortExchangeCopyRule#SORT_EXCHANGE_COPY] (configured with
/// the rule's hard-coded default fetch limit). `QueryEnvironment` swaps the
/// rule on a per-query copy of this list when the per-query
/// `sortExchangeCopyLimit` differs from that default.
static final List<RelOptRule> POST_LOGICAL_RULES = List.of(
// TODO: Merge the following 2 rules into a single rule
// add an extra exchange for sort
PinotSortExchangeNodeInsertRule.INSTANCE,
PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY,

PinotSingleValueAggregateRemoveRule.INSTANCE,
PinotJoinExchangeNodeInsertRule.INSTANCE,
PinotAggregateExchangeNodeInsertRule.SortProjectAggregate.INSTANCE,
PinotAggregateExchangeNodeInsertRule.SortAggregate.INSTANCE,
PinotAggregateExchangeNodeInsertRule.WithoutSort.INSTANCE,
PinotWindowSplitRule.INSTANCE,
PinotWindowExchangeNodeInsertRule.INSTANCE,
PinotSetOpExchangeNodeInsertRule.INSTANCE,

// apply dynamic broadcast rule after exchange is inserted
PinotJoinToDynamicBroadcastRule.INSTANCE,

// remove exchanges when there's duplicates
PinotExchangeEliminationRule.INSTANCE,

// Evaluate the Literal filter nodes
CoreRules.FILTER_REDUCE_EXPRESSIONS,
PinotTableScanConverterRule.INSTANCE
);

static final List<RelOptRule> POST_LOGICAL_V2_RULES = List.of(
PinotTableScanConverterRule.INSTANCE,
PinotLogicalAggregateRule.SortProjectAggregate.INSTANCE,
PinotLogicalAggregateRule.SortAggregate.INSTANCE,
Expand All @@ -229,44 +278,38 @@ private PinotQueryRuleSets() {
);
//@formatter:on

/// Pinot specific rules that should be run AFTER all other rules
public static List<RelOptRule> getPinotPostRules(int sortExchangeCopyLimit) {
/// Public no-arg constructor required by [java.util.ServiceLoader], which instantiates
/// service providers through it. The body is intentionally empty.
public DefaultRuleSetCustomizer() {
}

// copy exchanges down, this must be done after SortExchangeNodeInsertRule
PinotSortExchangeCopyRule sortExchangeCopyRule;
if (sortExchangeCopyLimit != PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY.config.getFetchLimitThreshold()) {
sortExchangeCopyRule = ImmutablePinotSortExchangeCopyRule.Config.builder()
.from(PinotSortExchangeCopyRule.Config.DEFAULT)
.fetchLimitThreshold(sortExchangeCopyLimit)
.build()
.toRule();
} else {
sortExchangeCopyRule = PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY;
@Override
public void customize(Phase phase, List<RelOptRule> rules) {
switch (phase) {
case BASIC:
rules.addAll(BASIC_RULES);
return;
case FILTER_PUSHDOWN:
rules.addAll(FILTER_PUSHDOWN_RULES);
return;
case PROJECT_PUSHDOWN:
rules.addAll(PROJECT_PUSHDOWN_RULES);
return;
case PRUNE:
rules.addAll(PRUNE_RULES);
return;
case POST_LOGICAL:
rules.addAll(POST_LOGICAL_RULES);
return;
case POST_LOGICAL_V2:
rules.addAll(POST_LOGICAL_V2_RULES);
return;
case POST_LOGICAL_ENRICHED_JOIN:
rules.addAll(PinotEnrichedJoinRule.PINOT_ENRICHED_JOIN_RULES);
return;
default:
throw new IllegalStateException(
"DefaultRuleSetCustomizer is missing OSS rule defaults for Phase." + phase
+ "; extend the switch when adding a new Phase value.");
}
return List.of(
// TODO: Merge the following 2 rules into a single rule
// add an extra exchange for sort
PinotSortExchangeNodeInsertRule.INSTANCE,
sortExchangeCopyRule,

PinotSingleValueAggregateRemoveRule.INSTANCE,
PinotJoinExchangeNodeInsertRule.INSTANCE,
PinotAggregateExchangeNodeInsertRule.SortProjectAggregate.INSTANCE,
PinotAggregateExchangeNodeInsertRule.SortAggregate.INSTANCE,
PinotAggregateExchangeNodeInsertRule.WithoutSort.INSTANCE,
PinotWindowSplitRule.INSTANCE,
PinotWindowExchangeNodeInsertRule.INSTANCE,
PinotSetOpExchangeNodeInsertRule.INSTANCE,

// apply dynamic broadcast rule after exchange is inserted
PinotJoinToDynamicBroadcastRule.INSTANCE,

// remove exchanges when there's duplicates
PinotExchangeEliminationRule.INSTANCE,

// Evaluate the Literal filter nodes
CoreRules.FILTER_REDUCE_EXPRESSIONS,
PinotTableScanConverterRule.INSTANCE
);
}
}
Loading
Loading