diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java index 7535cdcbca07..c7d263e40a1b 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java @@ -57,10 +57,9 @@ import org.apache.calcite.tools.RelBuilder; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.tuple.Pair; -import org.apache.pinot.calcite.rel.rules.PinotEnrichedJoinRule; +import org.apache.pinot.calcite.rel.rules.ImmutablePinotSortExchangeCopyRule; import org.apache.pinot.calcite.rel.rules.PinotImplicitTableHintRule; import org.apache.pinot.calcite.rel.rules.PinotJoinToDynamicBroadcastRule; -import org.apache.pinot.calcite.rel.rules.PinotQueryRuleSets; import org.apache.pinot.calcite.rel.rules.PinotRelDistributionTraitRule; import org.apache.pinot.calcite.rel.rules.PinotRuleUtils; import org.apache.pinot.calcite.rel.rules.PinotSortExchangeCopyRule; @@ -89,6 +88,9 @@ import org.apache.pinot.query.planner.physical.v2.PlanFragmentAndMailboxAssignment; import org.apache.pinot.query.planner.physical.v2.RelToPRelConverter; import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.planner.rules.Phase; +import org.apache.pinot.query.planner.rules.PinotRuleSet; +import org.apache.pinot.query.planner.rules.RuleSetCustomizer; import org.apache.pinot.query.routing.WorkerManager; import org.apache.pinot.query.type.TypeFactory; import org.apache.pinot.query.validate.BytesCastVisitor; @@ -168,7 +170,7 @@ public QueryEnvironment(Config config, MultiClusterRoutingContext multiClusterRo rootSchema, List.of(database), _typeFactory, CONNECTION_CONFIG, config.isCaseSensitive()); _defaultDisabledPlannerRules = _envConfig.defaultDisabledPlannerRules(); // default optProgram with no skip rule options and no use rule options - 
_optProgram = getOptProgram(Set.of(), Set.of(), _defaultDisabledPlannerRules); + _optProgram = getOptProgram(_envConfig.getRuleSet(), Set.of(), Set.of(), _defaultDisabledPlannerRules); _multiClusterRoutingContext = multiClusterRoutingContext; } @@ -206,7 +208,7 @@ private PlannerContext getPlannerContext(SqlNodeAndOptions sqlNodeAndOptions) { Set skipRuleSet = QueryOptionsUtils.getSkipPlannerRules(options); if (!skipRuleSet.isEmpty() || !useRuleSet.isEmpty()) { // dynamically create optProgram according to rule options - optProgram = getOptProgram(skipRuleSet, useRuleSet, _defaultDisabledPlannerRules); + optProgram = getOptProgram(_envConfig.getRuleSet(), skipRuleSet, useRuleSet, _defaultDisabledPlannerRules); } } int sortExchangeCopyLimit = QueryOptionsUtils.getSortExchangeCopyThreshold(options, @@ -536,7 +538,7 @@ private DispatchableSubPlan toDispatchableSubPlan(RelRoot relRoot, PlannerContex * @param defaultDisabledRuleSet parsed default disabled rule set from broker config * @return HepProgram that performs logical transformations */ - private static HepProgram getOptProgram(Set skipRuleSet, Set useRuleSet, + private static HepProgram getOptProgram(PinotRuleSet ruleSet, Set skipRuleSet, Set useRuleSet, Set defaultDisabledRuleSet) { HepProgramBuilder hepProgramBuilder = new HepProgramBuilder(); // Set the match order as DEPTH_FIRST. The default is arbitrary which works the same as DEPTH_FIRST, but it's @@ -544,16 +546,17 @@ private static HepProgram getOptProgram(Set skipRuleSet, Set use hepProgramBuilder.addMatchOrder(HepMatchOrder.DEPTH_FIRST); // ---- - // Rules are disabled if its corresponding value is set to false in ruleFlags - // construct filtered BASIC_RULES, FILTER_PUSHDOWN_RULES, PROJECT_PUSHDOWN_RULES, PRUNE_RULES + // Rules are disabled if its corresponding value is set to false in ruleFlags. + // Sources come from PinotRuleSet (after every RuleSetCustomizer ran); per-query + // skip/use options are then applied by filterRuleList on a fresh copy. 
List basicRules = - filterRuleList(PinotQueryRuleSets.BASIC_RULES, skipRuleSet, useRuleSet, defaultDisabledRuleSet); + filterRuleList(ruleSet.rulesFor(Phase.BASIC), skipRuleSet, useRuleSet, defaultDisabledRuleSet); List filterPushdownRules = - filterRuleList(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES, skipRuleSet, useRuleSet, defaultDisabledRuleSet); + filterRuleList(ruleSet.rulesFor(Phase.FILTER_PUSHDOWN), skipRuleSet, useRuleSet, defaultDisabledRuleSet); List projectPushdownRules = - filterRuleList(PinotQueryRuleSets.PROJECT_PUSHDOWN_RULES, skipRuleSet, useRuleSet, defaultDisabledRuleSet); + filterRuleList(ruleSet.rulesFor(Phase.PROJECT_PUSHDOWN), skipRuleSet, useRuleSet, defaultDisabledRuleSet); List pruneRules = - filterRuleList(PinotQueryRuleSets.PRUNE_RULES, skipRuleSet, useRuleSet, defaultDisabledRuleSet); + filterRuleList(ruleSet.rulesFor(Phase.PRUNE), skipRuleSet, useRuleSet, defaultDisabledRuleSet); // Run the Calcite CORE rules using 1 HepInstruction per rule. We use 1 HepInstruction per rule for simplicity: // the rules used here can rest assured that they are the only ones evaluated in a dedicated graph-traversal. @@ -628,6 +631,7 @@ private static boolean isRuleSkipped(String ruleName, Set skipRuleSet, S private static HepProgram getTraitProgram(@Nullable WorkerManager workerManager, Config config, boolean usePhysicalOptimizer, Set useRuleSet, int sortExchangeCopyLimit) { HepProgramBuilder hepProgramBuilder = new HepProgramBuilder(); + PinotRuleSet ruleSet = config.getRuleSet(); // Set the match order as BOTTOM_UP. hepProgramBuilder.addMatchOrder(HepMatchOrder.BOTTOM_UP); @@ -635,7 +639,19 @@ private static HepProgram getTraitProgram(@Nullable WorkerManager workerManager, // ---- // Run pinot specific rules that should run after all other rules, using 1 HepInstruction per rule. 
if (!usePhysicalOptimizer) { - for (RelOptRule relOptRule : PinotQueryRuleSets.getPinotPostRules(sortExchangeCopyLimit)) { + // POST_LOGICAL list comes from PinotRuleSet; we copy it because we may need to + // swap every PinotSortExchangeCopyRule with one configured for the per-query + // (or broker-config) sortExchangeCopyLimit if it differs from the rule's default. + List postLogical = new ArrayList<>(ruleSet.rulesFor(Phase.POST_LOGICAL)); + if (sortExchangeCopyLimit != PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY.config.getFetchLimitThreshold()) { + PinotSortExchangeCopyRule overridden = ImmutablePinotSortExchangeCopyRule.Config.builder() + .from(PinotSortExchangeCopyRule.Config.DEFAULT) + .fetchLimitThreshold(sortExchangeCopyLimit) + .build() + .toRule(); + postLogical.replaceAll(r -> r instanceof PinotSortExchangeCopyRule ? overridden : r); + } + for (RelOptRule relOptRule : postLogical) { if (isEligibleQueryPostRule(relOptRule, config)) { hepProgramBuilder.addRuleInstance(relOptRule); } @@ -643,17 +659,18 @@ private static HepProgram getTraitProgram(@Nullable WorkerManager workerManager, if (!isRuleSkipped(CommonConstants.Broker.PlannerRuleNames.JOIN_TO_ENRICHED_JOIN, Set.of(), useRuleSet, config.defaultDisabledPlannerRules())) { // push filter and project above join to enrichedJoin, does not work with physical optimizer - hepProgramBuilder.addRuleCollection(PinotEnrichedJoinRule.PINOT_ENRICHED_JOIN_RULES); + hepProgramBuilder.addRuleCollection(ruleSet.rulesFor(Phase.POST_LOGICAL_ENRICHED_JOIN)); } } else { - for (RelOptRule relOptRule : PinotQueryRuleSets.PINOT_POST_RULES_V2) { + for (RelOptRule relOptRule : ruleSet.rulesFor(Phase.POST_LOGICAL_V2)) { if (isEligibleQueryPostRule(relOptRule, config)) { hepProgramBuilder.addRuleInstance(relOptRule); } } } if (!usePhysicalOptimizer) { - // apply RelDistribution trait to all nodes + // apply RelDistribution trait to all nodes — these rules depend on the + // per-query WorkerManager, so they stay outside 
PinotRuleSet. if (workerManager != null) { hepProgramBuilder.addRuleInstance(PinotImplicitTableHintRule.withWorkerManager(workerManager)); } @@ -695,6 +712,17 @@ public interface Config { @Nullable TableCache getTableCache(); + /** + * The multi-stage planner's per-phase Calcite rule lists. Defaults to the + * process-wide singleton built from {@link java.util.ServiceLoader}-discovered + * {@link RuleSetCustomizer}s, so per-query {@link Config} instances do not + * repeat discovery work. + */ + @Value.Default + default PinotRuleSet getRuleSet() { + return PinotRuleSet.defaultInstance(); + } + @Value.Default default boolean isNullHandlingEnabled() { return false; diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/rules/DefaultRuleSetCustomizer.java similarity index 69% rename from pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/rules/DefaultRuleSetCustomizer.java index 60923dce79c0..35e8fd7945d8 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/rules/DefaultRuleSetCustomizer.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.calcite.rel.rules; +package org.apache.pinot.query.planner.rules; import java.util.List; import org.apache.calcite.plan.RelOptRule; @@ -43,26 +43,44 @@ import org.apache.calcite.rel.rules.SortJoinTransposeRule; import org.apache.calcite.rel.rules.SortRemoveRule; import org.apache.calcite.rel.rules.UnionToDistinctRule; +import org.apache.pinot.calcite.rel.rules.PinotAggregateExchangeNodeInsertRule; +import org.apache.pinot.calcite.rel.rules.PinotAggregateFunctionRewriteRule; +import org.apache.pinot.calcite.rel.rules.PinotAggregateReduceFunctionsRule; +import org.apache.pinot.calcite.rel.rules.PinotEnrichedJoinRule; +import org.apache.pinot.calcite.rel.rules.PinotEvaluateLiteralRule; +import org.apache.pinot.calcite.rel.rules.PinotExchangeEliminationRule; import org.apache.pinot.calcite.rel.rules.PinotFilterJoinRule.PinotFilterIntoJoinRule; import org.apache.pinot.calcite.rel.rules.PinotFilterJoinRule.PinotJoinConditionPushRule; +import org.apache.pinot.calcite.rel.rules.PinotJoinExchangeNodeInsertRule; +import org.apache.pinot.calcite.rel.rules.PinotJoinPushTransitivePredicatesRule; +import org.apache.pinot.calcite.rel.rules.PinotJoinToDynamicBroadcastRule; +import org.apache.pinot.calcite.rel.rules.PinotLogicalAggregateRule; +import org.apache.pinot.calcite.rel.rules.PinotProjectJoinTransposeRule; +import org.apache.pinot.calcite.rel.rules.PinotSemiJoinDistinctProjectRule; +import org.apache.pinot.calcite.rel.rules.PinotSetOpExchangeNodeInsertRule; +import org.apache.pinot.calcite.rel.rules.PinotSingleValueAggregateRemoveRule; +import org.apache.pinot.calcite.rel.rules.PinotSortExchangeCopyRule; +import org.apache.pinot.calcite.rel.rules.PinotSortExchangeNodeInsertRule; +import org.apache.pinot.calcite.rel.rules.PinotTableScanConverterRule; +import org.apache.pinot.calcite.rel.rules.PinotWindowExchangeNodeInsertRule; +import org.apache.pinot.calcite.rel.rules.PinotWindowSplitRule; import 
org.apache.pinot.spi.utils.CommonConstants.Broker.PlannerRuleNames; -/** - * Default rule sets for Pinot query - * Defaultly disabled rules are defined in - * {@link org.apache.pinot.spi.utils.CommonConstants.Broker#DEFAULT_DISABLED_RULES} - * - * TODO: This class started as a list of constant rule sets, but since then we have added dynamic rule generation - * to it as well. We should probably refactor the class to make it easier to understand, maintain and change the rules - * based on contextual information like query options. - */ -public class PinotQueryRuleSets { - private PinotQueryRuleSets() { - } +/// [RuleSetCustomizer] that seeds every [Phase] with the OSS default Calcite +/// rules for the multi-stage query planner. Registered as a [java.util.ServiceLoader] +/// service entry so it is picked up automatically by [PinotRuleSet]. +/// +/// `POST_LOGICAL` includes `PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY` +/// configured with the rule's hard-coded default `fetchLimitThreshold`. +/// Per-query overrides (and broker-config overrides) of that limit are +/// applied later by `QueryEnvironment.getTraitProgram`, which swaps the +/// configured `PinotSortExchangeCopyRule` on a per-query copy of the +/// `POST_LOGICAL` list. +public final class DefaultRuleSetCustomizer implements RuleSetCustomizer { //@formatter:off - public static final List BASIC_RULES = List.of( + static final List BASIC_RULES = List.of( // push a filter into a join PinotFilterIntoJoinRule .instanceWithDescription(PlannerRuleNames.FILTER_INTO_JOIN), @@ -162,7 +180,7 @@ private PinotQueryRuleSets() { // Filter pushdown rules run using a RuleCollection since we want to push down a filter as much as possible in a // single HepInstruction. 
- public static final List FILTER_PUSHDOWN_RULES = List.of( + static final List FILTER_PUSHDOWN_RULES = List.of( PinotFilterIntoJoinRule .instanceWithDescription(PlannerRuleNames.FILTER_INTO_JOIN), FilterAggregateTransposeRule.Config.DEFAULT @@ -175,7 +193,7 @@ private PinotQueryRuleSets() { // Project pushdown rules run using a RuleCollection since we want to push down a project as much as possible in a // single HepInstruction. - public static final List PROJECT_PUSHDOWN_RULES = List.of( + static final List PROJECT_PUSHDOWN_RULES = List.of( ProjectFilterTransposeRule.Config.DEFAULT .withDescription(PlannerRuleNames.PROJECT_FILTER_TRANSPOSE).toRule(), PinotProjectJoinTransposeRule @@ -185,7 +203,7 @@ private PinotQueryRuleSets() { ); // The pruner rules run top-down to ensure Calcite restarts from root node after applying a transformation. - public static final List PRUNE_RULES = List.of( + static final List PRUNE_RULES = List.of( AggregateProjectMergeRule.Config.DEFAULT .withDescription(PlannerRuleNames.AGGREGATE_PROJECT_MERGE).toRule(), ProjectMergeRule.Config.DEFAULT @@ -218,7 +236,38 @@ private PinotQueryRuleSets() { .withDescription(PlannerRuleNames.PRUNE_EMPTY_UNION).toRule() ); - public static final List PINOT_POST_RULES_V2 = List.of( + /// Pinot specific rules that should be run AFTER all other rules. + /// Includes [PinotSortExchangeCopyRule#SORT_EXCHANGE_COPY] (configured with + /// the rule's hard-coded default fetch limit). `QueryEnvironment` swaps the + /// rule on a per-query copy of this list when the per-query + /// `sortExchangeCopyLimit` differs from that default. 
+ static final List POST_LOGICAL_RULES = List.of( + // TODO: Merge the following 2 rules into a single rule + // add an extra exchange for sort + PinotSortExchangeNodeInsertRule.INSTANCE, + PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY, + + PinotSingleValueAggregateRemoveRule.INSTANCE, + PinotJoinExchangeNodeInsertRule.INSTANCE, + PinotAggregateExchangeNodeInsertRule.SortProjectAggregate.INSTANCE, + PinotAggregateExchangeNodeInsertRule.SortAggregate.INSTANCE, + PinotAggregateExchangeNodeInsertRule.WithoutSort.INSTANCE, + PinotWindowSplitRule.INSTANCE, + PinotWindowExchangeNodeInsertRule.INSTANCE, + PinotSetOpExchangeNodeInsertRule.INSTANCE, + + // apply dynamic broadcast rule after exchange is inserted + PinotJoinToDynamicBroadcastRule.INSTANCE, + + // remove exchanges when there's duplicates + PinotExchangeEliminationRule.INSTANCE, + + // Evaluate the Literal filter nodes + CoreRules.FILTER_REDUCE_EXPRESSIONS, + PinotTableScanConverterRule.INSTANCE + ); + + static final List POST_LOGICAL_V2_RULES = List.of( PinotTableScanConverterRule.INSTANCE, PinotLogicalAggregateRule.SortProjectAggregate.INSTANCE, PinotLogicalAggregateRule.SortAggregate.INSTANCE, @@ -229,44 +278,38 @@ private PinotQueryRuleSets() { ); //@formatter:on - /// Pinot specific rules that should be run AFTER all other rules - public static List getPinotPostRules(int sortExchangeCopyLimit) { + /// No-arg constructor required by [java.util.ServiceLoader]. 
+ public DefaultRuleSetCustomizer() { + } - // copy exchanges down, this must be done after SortExchangeNodeInsertRule - PinotSortExchangeCopyRule sortExchangeCopyRule; - if (sortExchangeCopyLimit != PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY.config.getFetchLimitThreshold()) { - sortExchangeCopyRule = ImmutablePinotSortExchangeCopyRule.Config.builder() - .from(PinotSortExchangeCopyRule.Config.DEFAULT) - .fetchLimitThreshold(sortExchangeCopyLimit) - .build() - .toRule(); - } else { - sortExchangeCopyRule = PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY; + @Override + public void customize(Phase phase, List rules) { + switch (phase) { + case BASIC: + rules.addAll(BASIC_RULES); + return; + case FILTER_PUSHDOWN: + rules.addAll(FILTER_PUSHDOWN_RULES); + return; + case PROJECT_PUSHDOWN: + rules.addAll(PROJECT_PUSHDOWN_RULES); + return; + case PRUNE: + rules.addAll(PRUNE_RULES); + return; + case POST_LOGICAL: + rules.addAll(POST_LOGICAL_RULES); + return; + case POST_LOGICAL_V2: + rules.addAll(POST_LOGICAL_V2_RULES); + return; + case POST_LOGICAL_ENRICHED_JOIN: + rules.addAll(PinotEnrichedJoinRule.PINOT_ENRICHED_JOIN_RULES); + return; + default: + throw new IllegalStateException( + "DefaultRuleSetCustomizer is missing OSS rule defaults for Phase." 
+ phase + + "; extend the switch when adding a new Phase value."); } - return List.of( - // TODO: Merge the following 2 rules into a single rule - // add an extra exchange for sort - PinotSortExchangeNodeInsertRule.INSTANCE, - sortExchangeCopyRule, - - PinotSingleValueAggregateRemoveRule.INSTANCE, - PinotJoinExchangeNodeInsertRule.INSTANCE, - PinotAggregateExchangeNodeInsertRule.SortProjectAggregate.INSTANCE, - PinotAggregateExchangeNodeInsertRule.SortAggregate.INSTANCE, - PinotAggregateExchangeNodeInsertRule.WithoutSort.INSTANCE, - PinotWindowSplitRule.INSTANCE, - PinotWindowExchangeNodeInsertRule.INSTANCE, - PinotSetOpExchangeNodeInsertRule.INSTANCE, - - // apply dynamic broadcast rule after exchange is inserted/ - PinotJoinToDynamicBroadcastRule.INSTANCE, - - // remove exchanges when there's duplicates - PinotExchangeEliminationRule.INSTANCE, - - // Evaluate the Literal filter nodes - CoreRules.FILTER_REDUCE_EXPRESSIONS, - PinotTableScanConverterRule.INSTANCE - ); } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/rules/Phase.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/rules/Phase.java new file mode 100644 index 000000000000..4badee7a1691 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/rules/Phase.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.rules; + + +/// HEP phases the multi-stage planner exposes to plugin +/// [RuleSetCustomizer]s. Every phase corresponds to one slot in the rule +/// programs built by `QueryEnvironment#getOptProgram` / +/// `QueryEnvironment#getTraitProgram`. +/// +/// **Stability**: this enum is append-only — new phases may be added at the +/// end without breaking plugins compiled against an older version. Existing +/// phases must never be reordered or removed. +public enum Phase { + /// Basic logical-rewrite phase. HEP, depth-first. + /// OSS defaults: `DefaultRuleSetCustomizer.BASIC_RULES`. + BASIC, + + /// Filter pushdown rules. The HEP program runs this phase twice (around the + /// project pushdown phase). OSS defaults: + /// `DefaultRuleSetCustomizer.FILTER_PUSHDOWN_RULES`. + FILTER_PUSHDOWN, + + /// Project pushdown rules. + /// OSS defaults: `DefaultRuleSetCustomizer.PROJECT_PUSHDOWN_RULES`. + PROJECT_PUSHDOWN, + + /// Top-down pruning rules. + /// OSS defaults: `DefaultRuleSetCustomizer.PRUNE_RULES`. + PRUNE, + + /// Post-logical rules used when the physical optimizer is **not** enabled. + /// OSS defaults: `DefaultRuleSetCustomizer.POST_LOGICAL_RULES`. The list + /// includes `PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY` configured with + /// the rule's hard-coded default `fetchLimitThreshold`. Per-query overrides + /// (and broker-config overrides) are applied by `QueryEnvironment` swapping + /// the rule on a per-query copy of this list. 
+ POST_LOGICAL, + + /// Post-logical rules used when the physical optimizer **is** enabled. + /// OSS defaults: `DefaultRuleSetCustomizer.POST_LOGICAL_V2_RULES`. + POST_LOGICAL_V2, + + /// Conditional enriched-join rules applied after the post-logical phase + /// when the `JOIN_TO_ENRICHED_JOIN` rule is not skipped. + /// OSS defaults: `PinotEnrichedJoinRule.PINOT_ENRICHED_JOIN_RULES`. + POST_LOGICAL_ENRICHED_JOIN +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/rules/PinotRuleSet.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/rules/PinotRuleSet.java new file mode 100644 index 000000000000..2c816d7bcae5 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/rules/PinotRuleSet.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.rules; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; +import java.util.ServiceLoader; +import org.apache.calcite.plan.RelOptRule; + + +/// Owns the multi-stage planner's per-phase Calcite rule lists. 
Constructed +/// once at broker startup; lists are immutable for the process lifetime. +/// +/// Construction sequence: +/// +/// 1. Allocate an empty mutable list per [Phase]. +/// 2. For every [RuleSetCustomizer] (in supplied order), call +/// `customize(phase, list)` once per phase. The OSS defaults are themselves +/// contributed by [DefaultRuleSetCustomizer] which is registered as a +/// `ServiceLoader` entry; a plugin observes the OSS defaults only if it runs +/// after that customizer, and `ServiceLoader` order is not guaranteed. +/// 3. Defensively copy each per-phase list to an immutable [List] and freeze +/// the map. +/// +/// `QueryEnvironment` reads `rulesFor(phase)` and applies per-query +/// `usePlannerRules` / `skipPlannerRules` filters on top. +public final class PinotRuleSet { + + private final Map> _rulesByPhase; + + /// Builds a rule set from the supplied customizers. Customizers run in the + /// order of the iterable; the framework freezes the per-phase lists after + /// every customizer has run. + public PinotRuleSet(Iterable customizers) { + EnumMap> mutable = new EnumMap<>(Phase.class); + for (Phase phase : Phase.values()) { + mutable.put(phase, new ArrayList<>()); + } + for (RuleSetCustomizer customizer : customizers) { + for (Phase phase : Phase.values()) { + customizer.customize(phase, mutable.get(phase)); + } + } + EnumMap> frozen = new EnumMap<>(Phase.class); + for (Map.Entry> entry : mutable.entrySet()) { + frozen.put(entry.getKey(), List.copyOf(entry.getValue())); + } + _rulesByPhase = Collections.unmodifiableMap(frozen); + } + + /// Discovers every [RuleSetCustomizer] via [ServiceLoader] and builds a + /// rule set from them. Used by [#defaultInstance()] and by callers that + /// don't have an externally-managed customizer list. 
+ public static PinotRuleSet loadFromServiceLoader() { + List customizers = new ArrayList<>(); + for (RuleSetCustomizer customizer : ServiceLoader.load(RuleSetCustomizer.class)) { + customizers.add(customizer); + } + return new PinotRuleSet(customizers); + } + + /// Lazily-initialized process-wide singleton built from the + /// `ServiceLoader`-discovered customizers. Used as the `@Value.Default` of + /// `QueryEnvironment.Config#getRuleSet()` so per-query `Config` instances + /// don't repeat the discovery work. + public static PinotRuleSet defaultInstance() { + return DefaultInstanceHolder.INSTANCE; + } + + /// Returns the rule list for the given phase, after every customization + /// was applied. Per-query filtering by `usePlannerRules` / + /// `skipPlannerRules` is the caller's responsibility (see + /// `QueryEnvironment#getOptProgram`). + public List rulesFor(Phase phase) { + return _rulesByPhase.getOrDefault(phase, List.of()); + } + + private static final class DefaultInstanceHolder { + static final PinotRuleSet INSTANCE = loadFromServiceLoader(); + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/rules/RuleSetCustomizer.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/rules/RuleSetCustomizer.java new file mode 100644 index 000000000000..951edac20aa1 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/rules/RuleSetCustomizer.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.rules; + +import java.util.List; +import org.apache.calcite.plan.RelOptRule; + + +/// Plugin SPI for customizing the multi-stage planner's Calcite rule sets. +/// +/// Implementations are discovered via [java.util.ServiceLoader]: a plugin +/// declares its customizer in +/// `META-INF/services/org.apache.pinot.query.planner.rules.RuleSetCustomizer`. +/// Implementations must therefore have a public no-arg constructor. +/// +/// [PinotRuleSet] invokes every discovered customizer once per [Phase] at +/// broker startup. The customizer receives the mutable per-phase rule list +/// and can append, remove, replace, or reorder rules using standard `List` +/// operations. After every customizer has run, `PinotRuleSet` defensively +/// copies each list to an immutable form; mutations to the supplied list +/// after `customize` returns have no effect. +/// +/// ### Iteration order +/// +/// Customizers run in `ServiceLoader` iteration order — typically classpath +/// order, which is JVM-dependent. The OSS defaults are themselves contributed +/// by [DefaultRuleSetCustomizer]; plugin authors that depend on observing the +/// OSS defaults should not assume a specific position: a customizer that runs before the default one sees none of +/// them, so removing them there is a no-op. To guarantee ordering across customizers, drop and re-add rules from +/// your own `customize` implementation. 
+/// +/// ### Example +/// +/// ```java +/// public class MyPluginRules implements RuleSetCustomizer { +/// @Override public void customize(Phase phase, List rules) { +/// if (phase == Phase.BASIC) { +/// rules.add(MyOptimizationRule.INSTANCE); +/// rules.removeIf(r -> "BadOldRule".equals(r.toString())); +/// } +/// } +/// } +/// ``` +/// +/// And `META-INF/services/org.apache.pinot.query.planner.rules.RuleSetCustomizer`: +/// ``` +/// com.example.MyPluginRules +/// ``` +/// +/// ### Thread safety +/// +/// Customizers are invoked single-threaded during [PinotRuleSet] construction. +/// Implementations need not be thread-safe and must not retain references to +/// the supplied list — the list is defensively copied after every customizer +/// has run. A customizer that throws aborts construction and propagates the +/// exception out of [PinotRuleSet#loadFromServiceLoader()]. +public interface RuleSetCustomizer { + + /// Modify the broker's rule list for the given phase. The list is mutable + /// during this call only; it is defensively copied after every customizer + /// has run. Implementations may simply return without mutating when the + /// phase is not one they care about. Implementations must not insert `null` + /// rules — `PinotRuleSet` will reject the resulting list. 
+ void customize(Phase phase, List rules); +} diff --git a/pinot-query-planner/src/main/resources/META-INF/services/org.apache.pinot.query.planner.rules.RuleSetCustomizer b/pinot-query-planner/src/main/resources/META-INF/services/org.apache.pinot.query.planner.rules.RuleSetCustomizer new file mode 100644 index 000000000000..19362aa37e6d --- /dev/null +++ b/pinot-query-planner/src/main/resources/META-INF/services/org.apache.pinot.query.planner.rules.RuleSetCustomizer @@ -0,0 +1 @@ +org.apache.pinot.query.planner.rules.DefaultRuleSetCustomizer diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/rules/PinotRuleSetTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/rules/PinotRuleSetTest.java new file mode 100644 index 000000000000..0bf93e1b556a --- /dev/null +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/rules/PinotRuleSetTest.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.planner.rules; + +import java.util.List; +import org.apache.calcite.plan.RelOptRule; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertSame; +import static org.testng.Assert.assertTrue; + + +/// Unit tests for [PinotRuleSet] and the [RuleSetCustomizer] SPI. +public class PinotRuleSetTest { + + @Test + public void defaultsSeedEveryPhaseFromDefaultRuleSetCustomizer() { + PinotRuleSet ruleSet = new PinotRuleSet(List.of(new DefaultRuleSetCustomizer())); + + assertEquals(ruleSet.rulesFor(Phase.BASIC), DefaultRuleSetCustomizer.BASIC_RULES); + assertEquals(ruleSet.rulesFor(Phase.FILTER_PUSHDOWN), DefaultRuleSetCustomizer.FILTER_PUSHDOWN_RULES); + assertEquals(ruleSet.rulesFor(Phase.PROJECT_PUSHDOWN), DefaultRuleSetCustomizer.PROJECT_PUSHDOWN_RULES); + assertEquals(ruleSet.rulesFor(Phase.PRUNE), DefaultRuleSetCustomizer.PRUNE_RULES); + assertEquals(ruleSet.rulesFor(Phase.POST_LOGICAL), DefaultRuleSetCustomizer.POST_LOGICAL_RULES); + assertEquals(ruleSet.rulesFor(Phase.POST_LOGICAL_V2), DefaultRuleSetCustomizer.POST_LOGICAL_V2_RULES); + assertTrue(ruleSet.rulesFor(Phase.POST_LOGICAL_ENRICHED_JOIN).size() > 0, + "POST_LOGICAL_ENRICHED_JOIN should be populated by PinotEnrichedJoinRule.PINOT_ENRICHED_JOIN_RULES"); + } + + @Test + public void serviceLoaderDiscoveryFindsDefault() { + // The DefaultRuleSetCustomizer is registered via META-INF/services, so the + // default instance built by ServiceLoader exposes the OSS defaults. 
+ PinotRuleSet ruleSet = PinotRuleSet.defaultInstance(); + assertTrue(ruleSet.rulesFor(Phase.BASIC).size() > 0); + assertTrue(ruleSet.rulesFor(Phase.POST_LOGICAL).size() > 0); + } + + @Test + public void customizerCanAppendRule() { + RelOptRule extraRule = DefaultRuleSetCustomizer.BASIC_RULES.get(0); + int defaultSize = DefaultRuleSetCustomizer.BASIC_RULES.size(); + + RuleSetCustomizer plugin = (phase, rules) -> { + if (phase == Phase.BASIC) { + rules.add(extraRule); + } + }; + PinotRuleSet ruleSet = new PinotRuleSet(List.of(new DefaultRuleSetCustomizer(), plugin)); + + assertEquals(ruleSet.rulesFor(Phase.BASIC).size(), defaultSize + 1); + assertSame(ruleSet.rulesFor(Phase.BASIC).get(defaultSize), extraRule); + } + + @Test + public void customizerCanRemoveOssRuleByName() { + String firstRuleName = DefaultRuleSetCustomizer.BASIC_RULES.get(0).toString(); + int defaultSize = DefaultRuleSetCustomizer.BASIC_RULES.size(); + + RuleSetCustomizer plugin = (phase, rules) -> { + if (phase == Phase.BASIC) { + rules.removeIf(r -> firstRuleName.equals(r.toString())); + } + }; + PinotRuleSet ruleSet = new PinotRuleSet(List.of(new DefaultRuleSetCustomizer(), plugin)); + + assertEquals(ruleSet.rulesFor(Phase.BASIC).size(), defaultSize - 1); + assertTrue(ruleSet.rulesFor(Phase.BASIC).stream().noneMatch(r -> firstRuleName.equals(r.toString()))); + } + + @Test + public void rulesForUnseededPhaseReturnsEmpty() { + // No customizers — every phase comes back empty. + PinotRuleSet ruleSet = new PinotRuleSet(List.of()); + for (Phase phase : Phase.values()) { + assertEquals(ruleSet.rulesFor(phase).size(), 0, phase + " should be empty"); + } + } +}