diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java index 75e82dac4756..e254388115a0 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java @@ -156,6 +156,10 @@ public Map constructDispatchablePlanFragmentM dispatchablePlanMetadata.getWorkerIdToMailboxesMap(); Preconditions.checkArgument(workerIdToSegmentsMap == null || workerIdToTableNameSegmentsMap == null, "Both workerIdToSegmentsMap and workerIdToTableNameSegmentsMap cannot be set at the same time"); + Map>> workerIdToOptionalSegmentsMap = + dispatchablePlanMetadata.getWorkerIdToOptionalSegmentsMap(); + Map>> workerIdToOptionalTableSegmentsMap = + dispatchablePlanMetadata.getWorkerIdToOptionalTableSegmentsMap(); Map> serverInstanceToWorkerIdsMap = new HashMap<>(); WorkerMetadata[] workerMetadataArray = new WorkerMetadata[workerIdToServerInstanceMap.size()]; for (Map.Entry serverEntry : workerIdToServerInstanceMap.entrySet()) { @@ -165,9 +169,21 @@ public Map constructDispatchablePlanFragmentM WorkerMetadata workerMetadata = new WorkerMetadata(workerId, workerIdToMailboxesMap.get(workerId)); if (workerIdToSegmentsMap != null) { workerMetadata.setTableSegmentsMap(workerIdToSegmentsMap.get(workerId)); + if (workerIdToOptionalSegmentsMap != null) { + Map> optionalForWorker = workerIdToOptionalSegmentsMap.get(workerId); + if (optionalForWorker != null && !optionalForWorker.isEmpty()) { + workerMetadata.setOptionalTableSegmentsMap(optionalForWorker); + } + } } if (workerIdToTableNameSegmentsMap != null) { workerMetadata.setLogicalTableSegmentsMap(workerIdToTableNameSegmentsMap.get(workerId)); + if (workerIdToOptionalTableSegmentsMap != null) { + Map> optionalLogical = workerIdToOptionalTableSegmentsMap.get(workerId); + if (optionalLogical != null && !optionalLogical.isEmpty()) { + workerMetadata.setOptionalTableSegmentsMap(optionalLogical); + } + } } workerMetadataArray[workerId] = workerMetadata; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java index 98690aa86ac7..3d86bdc119b0 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java @@ -71,6 +71,9 @@ public class DispatchablePlanMetadata implements Serializable { // Available for leaf stage only // Map from workerId -> {tableType -> segments} private Map>> _workerIdToSegmentsMap; + // Map from workerId -> {tableType -> optional segments}, parallel to _workerIdToSegmentsMap when present + @Nullable + private Map>> _workerIdToOptionalSegmentsMap; // Map from tableType -> segments, available when 'is_replicated' hint is set to true private Map> _replicatedSegments; private TimeBoundaryInfo _timeBoundaryInfo; @@ -85,6 +88,8 @@ public class DispatchablePlanMetadata implements Serializable { * Map from workerId -> {physicalTableName -> segments} is required for logical tables. */ private Map>> _workerIdToTableSegmentsMap; + @Nullable + private Map>> _workerIdToOptionalTableSegmentsMap; private LogicalTableRouteInfo _logicalTableRouteInfo; public List getScannedTables() { @@ -125,6 +130,16 @@ public void setWorkerIdToSegmentsMap(Map>> wor _workerIdToSegmentsMap = workerIdToSegmentsMap; } + @Nullable + public Map>> getWorkerIdToOptionalSegmentsMap() { + return _workerIdToOptionalSegmentsMap; + } + + public void setWorkerIdToOptionalSegmentsMap( + @Nullable Map>> workerIdToOptionalSegmentsMap) { + _workerIdToOptionalSegmentsMap = workerIdToOptionalSegmentsMap; + } + @Nullable public Map> getReplicatedSegments() { return _replicatedSegments; @@ -204,4 +219,14 @@ public void setWorkerIdToTableSegmentsMap( Map>> workerIdToTableSegmentsMap) { _workerIdToTableSegmentsMap = workerIdToTableSegmentsMap; } + + @Nullable + public Map>> getWorkerIdToOptionalTableSegmentsMap() { + return _workerIdToOptionalTableSegmentsMap; + } + + public void setWorkerIdToOptionalTableSegmentsMap( + @Nullable Map>> workerIdToOptionalTableSegmentsMap) { + _workerIdToOptionalTableSegmentsMap = workerIdToOptionalTableSegmentsMap; + } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java index 6c10f2af2b51..6bda23c6fa2f 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java @@ -328,6 +328,7 @@ protected void assignWorkersToIntermediateFragment(PlanFragment fragment, Dispat } childMetadata.setWorkerIdToServerInstanceMap(childWorkerIdToServerInstanceMap); childMetadata.setWorkerIdToSegmentsMap(childWorkerIdToSegmentsMap); + attachReplicatedLeafOptionalSegments(childMetadata, context, childWorkerIdToServerInstanceMap); } } @@ -530,8 +531,9 @@ private void assignWorkersToNonPartitionedLeafFragment(PlanFragment fragment, Di Preconditions.checkState(!routingTableMap.isEmpty(), "Unable to find routing entries for table: %s", tableName); // acquire time boundary info if it is a hybrid table. + TimeBoundaryInfo timeBoundaryInfo = null; if (routingTableMap.size() > 1) { - TimeBoundaryInfo timeBoundaryInfo = _routingManager.getTimeBoundaryInfo( + timeBoundaryInfo = _routingManager.getTimeBoundaryInfo( TableNameBuilder.OFFLINE.tableNameWithType(TableNameBuilder.extractRawTableName(tableName))); if (timeBoundaryInfo != null) { metadata.setTimeBoundaryInfo(timeBoundaryInfo); @@ -543,6 +545,7 @@ private void assignWorkersToNonPartitionedLeafFragment(PlanFragment fragment, Di // extract all the instances associated to each table type Map>> serverInstanceToSegmentsMap = new HashMap<>(); + Map>> serverInstanceToOptionalSegmentsMap = new HashMap<>(); for (Map.Entry routingEntry : routingTableMap.entrySet()) { String tableType = routingEntry.getKey(); RoutingTable routingTable = routingEntry.getValue(); @@ -551,9 +554,13 @@ private void assignWorkersToNonPartitionedLeafFragment(PlanFragment fragment, Di for (Map.Entry serverEntry : segmentsMap.entrySet()) { Map> tableTypeToSegmentListMap = serverInstanceToSegmentsMap.computeIfAbsent(serverEntry.getKey(), k -> new HashMap<>()); - // TODO: support optional segments for multi-stage engine. Preconditions.checkState(tableTypeToSegmentListMap.put(tableType, serverEntry.getValue().getSegments()) == null, "Entry for server {} and table type: {} already exist!", serverEntry.getKey(), tableType); + List optionalSegments = serverEntry.getValue().getOptionalSegments(); + if (CollectionUtils.isNotEmpty(optionalSegments)) { + serverInstanceToOptionalSegmentsMap.computeIfAbsent(serverEntry.getKey(), k -> new HashMap<>()) + .put(tableType, new ArrayList<>(optionalSegments)); + } } // attach unavailable segments to metadata @@ -571,10 +578,17 @@ private void assignWorkersToNonPartitionedLeafFragment(PlanFragment fragment, Di new ArrayList<>(serverInstanceToSegmentsMap.entrySet()); sortedServerInstanceToSegmentsMap.sort(Comparator.comparing(entry -> entry.getKey().getInstanceId())); + if (sortedServerInstanceToSegmentsMap.isEmpty()) { + assignNonPartitionedLeafWorkersWhenNoServersHaveSegments(metadata, tableName, routingTableMap, context, + timeBoundaryInfo); + return; + } + // Assign 1 worker per server int numWorkers = sortedServerInstanceToSegmentsMap.size(); Map workerIdToServerInstanceMap = Maps.newHashMapWithExpectedSize(numWorkers); Map>> workerIdToSegmentsMap = Maps.newHashMapWithExpectedSize(numWorkers); + Map>> workerIdToOptionalSegmentsMap = Maps.newHashMapWithExpectedSize(numWorkers); for (int workerId = 0; workerId < numWorkers; workerId++) { Map.Entry>> serverEntry = @@ -584,10 +598,52 @@ private void assignWorkersToNonPartitionedLeafFragment(PlanFragment fragment, Di workerIdToServerInstanceMap.put(workerId, server); workerIdToSegmentsMap.put(workerId, segmentsMap); + Map> optionalForServer = serverInstanceToOptionalSegmentsMap.get(serverEntry.getKey()); + if (MapUtils.isNotEmpty(optionalForServer)) { + workerIdToOptionalSegmentsMap.put(workerId, optionalForServer); + } } metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap); metadata.setWorkerIdToSegmentsMap(workerIdToSegmentsMap); + if (!workerIdToOptionalSegmentsMap.isEmpty()) { + metadata.setWorkerIdToOptionalSegmentsMap(workerIdToOptionalSegmentsMap); + } + } + + /** + * When routing returns no server-to-segment entries (e.g. empty table), still assign a leaf worker on a routable + * server with empty segment lists so the multi-stage leaf can return an empty v1 result. + */ + private void assignNonPartitionedLeafWorkersWhenNoServersHaveSegments(DispatchablePlanMetadata metadata, + String tableName, Map routingTableMap, DispatchablePlanContext context, + @Nullable TimeBoundaryInfo timeBoundaryInfo) { + Map routableServers = _routingManager.getRoutableServerInstanceMap(); + if (routableServers.isEmpty()) { + LOGGER.error("[RequestId: {}] No routable server for empty routing on table: {}", context.getRequestId(), + tableName); + throw new IllegalStateException("No routable server for empty routing on table: " + tableName); + } + List sortedInstanceIds = new ArrayList<>(routableServers.keySet()); + Collections.sort(sortedInstanceIds); + ServerInstance serverInstance = routableServers.get(sortedInstanceIds.get(0)); + Map> emptySegmentsMap = new HashMap<>(); + for (String tableType : routingTableMap.keySet()) { + emptySegmentsMap.put(tableType, List.of()); + } + if (emptySegmentsMap.isEmpty()) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName); + if (tableType != null) { + emptySegmentsMap.put(tableType.name(), List.of()); + } else if (timeBoundaryInfo != null) { + emptySegmentsMap.put(TableType.OFFLINE.name(), List.of()); + emptySegmentsMap.put(TableType.REALTIME.name(), List.of()); + } else { + emptySegmentsMap.put(TableType.OFFLINE.name(), List.of()); + } + } + metadata.setWorkerIdToServerInstanceMap(Map.of(0, new QueryServerInstance(serverInstance))); + metadata.setWorkerIdToSegmentsMap(Map.of(0, emptySegmentsMap)); } /** @@ -726,13 +782,20 @@ private void setSegmentsForReplicatedLeafFragment(DispatchablePlanMetadata metad } } - // TODO: Support unavailable segments and optional segments for replicated leaf stage + Map routingTableMap = + getRoutingTable(tableName, context.getRequestId(), context.getPlannerContext().getOptions()); + for (Map.Entry routingEntry : routingTableMap.entrySet()) { + RoutingTable routingTable = routingEntry.getValue(); + if (!routingTable.getUnavailableSegments().isEmpty()) { + metadata.addUnavailableSegments(tableName, routingTable.getUnavailableSegments()); + } + } + metadata.setReplicatedSegments(segmentsMap); } /** * Returns the segments for the given table, keyed by table type. - * TODO: It doesn't handle unavailable segments. */ private Map> getSegments(String tableName, Map queryOptions) { String samplerName = MapUtils.isNotEmpty(queryOptions) ? QueryOptionsUtils.getTableSampler(queryOptions) : null; @@ -796,20 +859,38 @@ private void assignWorkersToNonPartitionedLeafFragmentForLogicalTable(Dispatchab tableRouteProvider.calculateRoutes(logicalTableRouteInfo, _routingManager, offlineBrokerRequest, realtimeBrokerRequest, context.getRequestId()); + if (logicalTableRouteInfo.getOfflineTables() != null) { + for (TableRouteInfo physicalTableRoute : logicalTableRouteInfo.getOfflineTables()) { + List unavailable = physicalTableRoute.getUnavailableSegments(); + if (CollectionUtils.isNotEmpty(unavailable)) { + metadata.addUnavailableSegments(physicalTableRoute.getOfflineTableName(), unavailable); + } + } + } + if (logicalTableRouteInfo.getRealtimeTables() != null) { + for (TableRouteInfo physicalTableRoute : logicalTableRouteInfo.getRealtimeTables()) { + List unavailable = physicalTableRoute.getUnavailableSegments(); + if (CollectionUtils.isNotEmpty(unavailable)) { + metadata.addUnavailableSegments(physicalTableRoute.getRealtimeTableName(), unavailable); + } + } + } + assignTableSegmentsToWorkers(logicalTableRouteInfo, metadata); } private static void assignTableSegmentsToWorkers(LogicalTableRouteInfo logicalTableRouteInfo, DispatchablePlanMetadata metadata) { - Map>> serverInstanceToLogicalSegmentsMap = - new HashMap<>(); + Map>> serverInstanceToLogicalSegmentsMap = new HashMap<>(); + Map>> serverInstanceToOptionalLogicalSegmentsMap = new HashMap<>(); if (logicalTableRouteInfo.getOfflineTables() != null) { for (TableRouteInfo physicalTableRoute : logicalTableRouteInfo.getOfflineTables()) { // Routing table maybe null if no routing table is found OR there are no segments. if (physicalTableRoute.getOfflineRoutingTable() != null) { transferToServerInstanceLogicalSegmentsMap(physicalTableRoute.getOfflineTableName(), - physicalTableRoute.getOfflineRoutingTable(), serverInstanceToLogicalSegmentsMap); + physicalTableRoute.getOfflineRoutingTable(), serverInstanceToLogicalSegmentsMap, + serverInstanceToOptionalLogicalSegmentsMap); } } } @@ -819,7 +900,8 @@ private static void assignTableSegmentsToWorkers(LogicalTableRouteInfo logicalTa // Routing table maybe null if no routing table is found OR there are no segments. if (physicalTableRoute.getRealtimeRoutingTable() != null) { transferToServerInstanceLogicalSegmentsMap(physicalTableRoute.getRealtimeTableName(), - physicalTableRoute.getRealtimeRoutingTable(), serverInstanceToLogicalSegmentsMap); + physicalTableRoute.getRealtimeRoutingTable(), serverInstanceToLogicalSegmentsMap, + serverInstanceToOptionalLogicalSegmentsMap); } } } @@ -836,6 +918,8 @@ private static void assignTableSegmentsToWorkers(LogicalTableRouteInfo logicalTa Map workerIdToServerInstanceMap = Maps.newHashMapWithExpectedSize(numWorkers); Map>> workerIdToLogicalTableSegmentsMap = Maps.newHashMapWithExpectedSize(numWorkers); + Map>> workerIdToOptionalLogicalTableSegmentsMap = + Maps.newHashMapWithExpectedSize(numWorkers); for (int workerId = 0; workerId < numWorkers; workerId++) { Map.Entry>> serverEntry = @@ -845,22 +929,35 @@ private static void assignTableSegmentsToWorkers(LogicalTableRouteInfo logicalTa workerIdToServerInstanceMap.put(workerId, server); workerIdToLogicalTableSegmentsMap.put(workerId, segmentsMap); + Map> optionalForServer = + serverInstanceToOptionalLogicalSegmentsMap.get(serverEntry.getKey()); + if (MapUtils.isNotEmpty(optionalForServer)) { + workerIdToOptionalLogicalTableSegmentsMap.put(workerId, optionalForServer); + } } metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap); metadata.setWorkerIdToTableSegmentsMap(workerIdToLogicalTableSegmentsMap); + if (!workerIdToOptionalLogicalTableSegmentsMap.isEmpty()) { + metadata.setWorkerIdToOptionalTableSegmentsMap(workerIdToOptionalLogicalTableSegmentsMap); + } } private static void transferToServerInstanceLogicalSegmentsMap(String physicalTableName, Map segmentsMap, - Map>> serverInstanceToLogicalSegmentsMap) { + Map>> serverInstanceToLogicalSegmentsMap, + Map>> serverInstanceToOptionalLogicalSegmentsMap) { for (Map.Entry serverEntry : segmentsMap.entrySet()) { Map> tableNameToSegmentsMap = serverInstanceToLogicalSegmentsMap.computeIfAbsent(serverEntry.getKey(), k -> new HashMap<>()); - // TODO: support optional segments for multi-stage engine. Preconditions.checkState( tableNameToSegmentsMap.put(physicalTableName, serverEntry.getValue().getSegments()) == null, "Entry for server {} and physical table: {} already exist!", serverEntry.getKey(), physicalTableName); + List optionalSegments = serverEntry.getValue().getOptionalSegments(); + if (CollectionUtils.isNotEmpty(optionalSegments)) { + serverInstanceToOptionalLogicalSegmentsMap.computeIfAbsent(serverEntry.getKey(), k -> new HashMap<>()) + .put(physicalTableName, new ArrayList<>(optionalSegments)); + } } } @@ -895,10 +992,12 @@ private void assignWorkersToPartitionedLeafFragment(DispatchablePlanMetadata met Map>> workerIdToSegmentsMap = new HashMap<>(); if (numPartitionsPerWorker == 1) { assignOnePartitionPerWorker(tableName, context.getRequestId(), partitionInfoMap, - _routingManager.getEnabledServerInstanceMap(), workedIdToServerInstanceMap, workerIdToSegmentsMap); + _routingManager.getEnabledServerInstanceMap(), workedIdToServerInstanceMap, workerIdToSegmentsMap, + metadata, partitionTableInfo); } else { assignMultiplePartitionsPerWorker(tableName, context.getRequestId(), numPartitionsPerWorker, partitionInfoMap, - _routingManager.getEnabledServerInstanceMap(), workedIdToServerInstanceMap, workerIdToSegmentsMap); + _routingManager.getEnabledServerInstanceMap(), workedIdToServerInstanceMap, workerIdToSegmentsMap, + metadata, partitionTableInfo); } metadata.setWorkerIdToServerInstanceMap(workedIdToServerInstanceMap); metadata.setWorkerIdToSegmentsMap(workerIdToSegmentsMap); @@ -910,15 +1009,22 @@ private void assignWorkersToPartitionedLeafFragment(DispatchablePlanMetadata met private void assignOnePartitionPerWorker(String tableName, long requestId, PartitionInfo[] partitionInfoMap, Map enabledServerInstanceMap, Map workedIdToServerInstanceMap, - Map>> workerIdToSegmentsMap) { + Map>> workerIdToSegmentsMap, DispatchablePlanMetadata metadata, + PartitionTableInfo partitionTableInfo) { int numPartitions = partitionInfoMap.length; int workerId = 0; for (int i = 0; i < numPartitions; i++) { PartitionInfo partitionInfo = partitionInfoMap[i]; - // TODO: Currently we don't support the case when a partition doesn't contain any segment. The reason is that - // the leaf stage won't be able to directly return empty response. - Preconditions.checkState(partitionInfo != null, "Failed to find any segment for table: %s, partition: %s", - tableName, i); + if (partitionInfo == null) { + ServerInstance serverInstance = + pickDeterministicEnabledServer(enabledServerInstanceMap, requestId + i); + Preconditions.checkState(serverInstance != null, + "No enabled server available for empty partition on table: %s, partition: %s", tableName, i); + workedIdToServerInstanceMap.put(workerId, new QueryServerInstance(serverInstance)); + workerIdToSegmentsMap.put(workerId, emptyPartitionLeafSegmentsMap(metadata, partitionTableInfo)); + workerId++; + continue; + } // NOTE: Pick worker based on the request id so that the same worker is picked across different table scan when // the segments for the same partition is colocated ServerInstance serverInstance = @@ -940,7 +1046,8 @@ private void assignOnePartitionPerWorker(String tableName, long requestId, Parti private void assignMultiplePartitionsPerWorker(String tableName, long requestId, int numPartitionsPerWorker, PartitionInfo[] partitionInfoMap, Map enabledServerInstanceMap, Map workedIdToServerInstanceMap, - Map>> workerIdToSegmentsMap) { + Map>> workerIdToSegmentsMap, DispatchablePlanMetadata metadata, + PartitionTableInfo partitionTableInfo) { int numPartitions = partitionInfoMap.length; assert numPartitions % numPartitionsPerWorker == 0; int numWorkers = numPartitions / numPartitionsPerWorker; @@ -974,11 +1081,17 @@ private void assignMultiplePartitionsPerWorker(String tableName, long requestId, } } } - // TODO: Currently we don't support the case when all partitions for a worker don't contain any segment. The - // reason is that the leaf stage won't be able to directly return empty response. - Preconditions.checkState(fullyReplicatedServers != null, - "Failed to find any segment for table: %s, worker: %s, partitions per worker: %s", tableName, i, - numPartitionsPerWorker); + if (fullyReplicatedServers == null) { + ServerInstance serverInstance = + pickDeterministicEnabledServer(enabledServerInstanceMap, requestId++); + Preconditions.checkState(serverInstance != null, + "No enabled server for empty partition group on table: %s, worker: %s, partitions per worker: %s", + tableName, i, numPartitionsPerWorker); + workedIdToServerInstanceMap.put(workerId, new QueryServerInstance(serverInstance)); + workerIdToSegmentsMap.put(workerId, emptyPartitionLeafSegmentsMap(metadata, partitionTableInfo)); + workerId++; + continue; + } // NOTE: Pick worker based on the request id so that the same worker is picked across different table scan when // the segments for the same partition is colocated ServerInstance serverInstance = pickEnabledServer(fullyReplicatedServers, enabledServerInstanceMap, requestId++); @@ -1188,6 +1301,79 @@ private static class PartitionInfo { } } + private void attachReplicatedLeafOptionalSegments(DispatchablePlanMetadata childMetadata, + DispatchablePlanContext context, Map childWorkerIdToServerInstanceMap) { + List scannedTables = childMetadata.getScannedTables(); + if (scannedTables.isEmpty()) { + return; + } + String tableName = scannedTables.get(0); + Map routingTableMap = + getRoutingTable(tableName, context.getRequestId(), context.getPlannerContext().getOptions()); + if (routingTableMap.size() > 1 && childMetadata.getTimeBoundaryInfo() == null) { + routingTableMap = new HashMap<>(routingTableMap); + routingTableMap.remove(TableType.OFFLINE.name()); + } + Map>> optionalByInstanceId = new HashMap<>(); + for (Map.Entry e : routingTableMap.entrySet()) { + String tableType = e.getKey(); + Map sm = e.getValue().getServerInstanceToSegmentsMap(); + for (Map.Entry se : sm.entrySet()) { + List opt = se.getValue().getOptionalSegments(); + if (CollectionUtils.isEmpty(opt)) { + continue; + } + optionalByInstanceId.computeIfAbsent(se.getKey().getInstanceId(), x -> new HashMap<>()) + .put(tableType, new ArrayList<>(opt)); + } + } + if (optionalByInstanceId.isEmpty()) { + return; + } + Map>> workerIdToOptional = new HashMap<>(); + for (Map.Entry we : childWorkerIdToServerInstanceMap.entrySet()) { + Map> opt = optionalByInstanceId.get(we.getValue().getInstanceId()); + if (MapUtils.isNotEmpty(opt)) { + workerIdToOptional.put(we.getKey(), opt); + } + } + if (!workerIdToOptional.isEmpty()) { + childMetadata.setWorkerIdToOptionalSegmentsMap(workerIdToOptional); + } + } + + private static Map> emptyPartitionLeafSegmentsMap(DispatchablePlanMetadata metadata, + PartitionTableInfo partitionTableInfo) { + String scannedTable = metadata.getScannedTables().get(0); + if (partitionTableInfo._timeBoundaryInfo != null || metadata.getTimeBoundaryInfo() != null) { + return Map.of(TableType.OFFLINE.name(), List.of(), TableType.REALTIME.name(), List.of()); + } + TableType tableType = TableNameBuilder.getTableTypeFromTableName(scannedTable); + if (tableType != null) { + return Map.of(tableType.name(), List.of()); + } + return Map.of(TableType.OFFLINE.name(), List.of()); + } + + @Nullable + private static ServerInstance pickDeterministicEnabledServer( + Map enabledServerInstanceMap, long salt) { + if (enabledServerInstanceMap.isEmpty()) { + return null; + } + List instanceIds = new ArrayList<>(enabledServerInstanceMap.keySet()); + Collections.sort(instanceIds); + int n = instanceIds.size(); + int start = (int) ((salt & Long.MAX_VALUE) % n); + for (int i = 0; i < n; i++) { + ServerInstance instance = enabledServerInstanceMap.get(instanceIds.get((start + i) % n)); + if (instance != null) { + return instance; + } + } + return null; + } + /** * Picks an enabled server deterministically based on the given index to pick. */ diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java index b00a31294efd..2f5d2069f032 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java @@ -44,6 +44,7 @@ public class WorkerMetadata { public static final String TABLE_SEGMENTS_MAP_KEY = "tableSegmentsMap"; public static final String LOGICAL_TABLE_SEGMENTS_MAP_KEY = "logicalTableSegmentsMap"; + public static final String OPTIONAL_TABLE_SEGMENTS_MAP_KEY = "optionalTableSegmentsMap"; private final int _workerId; private final Map _mailboxInfosMap; @@ -98,6 +99,21 @@ public boolean isLeafStageWorker() { || _customProperties.containsKey(LOGICAL_TABLE_SEGMENTS_MAP_KEY); } + @Nullable + public Map> getOptionalTableSegmentsMap() { + return deserializeStringSegmentListMap(OPTIONAL_TABLE_SEGMENTS_MAP_KEY); + } + + public void setOptionalTableSegmentsMap(Map> optionalTableSegmentsMap) { + String json; + try { + json = JsonUtils.objectToString(optionalTableSegmentsMap); + } catch (JsonProcessingException e) { + throw new RuntimeException("Unable to serialize optional table segments map: " + optionalTableSegmentsMap, e); + } + _customProperties.put(OPTIONAL_TABLE_SEGMENTS_MAP_KEY, json); + } + public void setTableSegmentsMap(Map> tableSegmentsMap) { String tableSegmentsMapStr; try { diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/WorkerManagerTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/WorkerManagerTest.java index 07b614c9ef5e..f2fb67a0b376 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/WorkerManagerTest.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/WorkerManagerTest.java @@ -43,6 +43,7 @@ import org.apache.pinot.query.planner.physical.DispatchablePlanFragment; import org.apache.pinot.query.planner.physical.DispatchableSubPlan; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; @@ -428,6 +429,58 @@ public void testLiteralOnlyStagesUseTableServers() { assertTrue(anyT1Server, "Expected at least one T1 server to be used"); } + @Test + public void testNonPartitionedLeafPropagatesOptionalSegmentsToWorkerMetadata() { + Schema schema = getSchemaBuilder("optTable").build(); + ServerInstance server = getServerInstance("localhost", 1); + Map serverInstanceMap = Map.of(server.getInstanceId(), server); + List optionalSegments = List.of("optionalSeg1"); + RoutingTable routingTable = new RoutingTable( + Map.of(server, new SegmentsToQuery(List.of("requiredSeg1"), optionalSegments)), List.of(), 0); + CapturingRoutingManager routingManager = new CapturingRoutingManager(serverInstanceMap, + Map.of("optTable_OFFLINE", routingTable)); + + Map tableNameMap = new HashMap<>(); + tableNameMap.put("optTable_OFFLINE", "optTable_OFFLINE"); + tableNameMap.put("optTable", "optTable"); + + TableCache tableCache = mock(TableCache.class); + when(tableCache.getTableNameMap()).thenReturn(tableNameMap); + when(tableCache.getActualTableName(anyString())).thenAnswer(inv -> tableNameMap.get(inv.getArgument(0))); + when(tableCache.getSchema(anyString())).thenReturn(schema); + when(tableCache.getTableConfig("optTable_OFFLINE")).thenReturn(mock(TableConfig.class)); + + WorkerManager workerManager = new WorkerManager("Broker_localhost", "localhost", 3, routingManager); + QueryEnvironment queryEnvironment = new QueryEnvironment(CommonConstants.DEFAULT_DATABASE, tableCache, + workerManager); + + try (QueryEnvironment.CompiledQuery compiledQuery = queryEnvironment.compile("SELECT * FROM optTable")) { + DispatchableSubPlan plan = compiledQuery.planQuery(0).getQueryPlan(); + assertNotNull(plan); + boolean foundOptional = false; + for (DispatchablePlanFragment fragment : plan.getQueryStageMap().values()) { + List workers = fragment.getWorkerMetadataList(); + if (workers == null) { + continue; + } + for (WorkerMetadata worker : workers) { + if (!worker.isLeafStageWorker()) { + continue; + } + Map> optionalMap = worker.getOptionalTableSegmentsMap(); + if (optionalMap != null && optionalSegments.equals(optionalMap.get(TableType.OFFLINE.name()))) { + foundOptional = true; + break; + } + } + if (foundOptional) { + break; + } + } + assertTrue(foundOptional, "Leaf worker metadata should include optional segments from routing"); + } + } + /** * A RoutingManager that simulates two tenants of servers, where only one tenant serves the queried * tables. {@code getEnabledServerInstanceMap()} returns all servers across both tenants, while diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java index 5a650941d825..7922d92e418c 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java @@ -524,9 +524,14 @@ private void executeOneRequest(ServerQueryRequest request, @Nullable Runnable on } } else { // NOTE: Instance response block might contain data (not metadata only) when all the segments are pruned. - // Add the results block if it contains data. + // Propagate empty-but-typed non-selection results (e.g. zero-row aggregation) so pruned leaves still + // contribute schema upstream. For selection, require at least one row: an empty SelectionResultsBlock + // from the instance response is skipped so a mismatched block schema does not emit a spurious zero-row + // data block before EOS. BaseResultsBlock resultsBlock = instanceResponseBlock.getResultsBlock(); - if (resultsBlock != null && resultsBlock.getNumRows() > 0) { + boolean shouldAdd = resultsBlock != null && (resultsBlock.getNumRows() > 0 + || (resultsBlock.getDataSchema() != null && !(resultsBlock instanceof SelectionResultsBlock))); + if (shouldAdd) { try { addResultsBlock(resultsBlock); } catch (InterruptedException e) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java index 3e3c64ccedfe..3c174a92661a 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java @@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; import javax.annotation.Nullable; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.metrics.ServerMetrics; @@ -162,12 +163,15 @@ public static List constructServerQueryRequests(OpChainExecutio String rawTableName = TableNameBuilder.extractRawTableName(stageMetadata.getTableName()); Map> tableSegmentsMap = executionContext.getWorkerMetadata().getTableSegmentsMap(); assert tableSegmentsMap != null; + Map> optionalSegmentsMap = + executionContext.getWorkerMetadata().getOptionalTableSegmentsMap(); TimeBoundaryInfo timeBoundary = stageMetadata.getTimeBoundary(); int numRequests = tableSegmentsMap.size(); if (numRequests == 1) { Map.Entry> entry = tableSegmentsMap.entrySet().iterator().next(); String tableType = entry.getKey(); List segments = entry.getValue(); + List optionalSegments = optionalListOrNull(optionalSegmentsMap, tableType); if (tableType.equals(TableType.OFFLINE.name())) { String offlineTableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName); TableDataManager tableDataManager = instanceDataManager.getTableDataManager(offlineTableName); @@ -176,7 +180,7 @@ public static List constructServerQueryRequests(OpChainExecutio Pair tableConfigAndSchema = tableDataManager.getCachedTableConfigAndSchema(); return List.of(compileInstanceRequest(executionContext, pinotQuery, timeBoundary, TableType.OFFLINE, tableDataManager.getTableName(), tableConfigAndSchema.getLeft(), tableConfigAndSchema.getRight(), segments, - null)); + optionalSegments, null)); } else { assert tableType.equals(TableType.REALTIME.name()); String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName); @@ -186,13 +190,15 @@ public static List constructServerQueryRequests(OpChainExecutio Pair tableConfigAndSchema = tableDataManager.getCachedTableConfigAndSchema(); return List.of(compileInstanceRequest(executionContext, pinotQuery, timeBoundary, TableType.REALTIME, tableDataManager.getTableName(), tableConfigAndSchema.getLeft(), tableConfigAndSchema.getRight(), segments, - null)); + optionalSegments, null)); } } else { assert numRequests == 2; List offlineSegments = tableSegmentsMap.get(TableType.OFFLINE.name()); List realtimeSegments = tableSegmentsMap.get(TableType.REALTIME.name()); assert offlineSegments != null && realtimeSegments != null; + List offlineOptional = optionalListOrNull(optionalSegmentsMap, TableType.OFFLINE.name()); + List realtimeOptional = optionalListOrNull(optionalSegmentsMap, TableType.REALTIME.name()); String offlineTableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName); TableDataManager offlineTableDataManager = instanceDataManager.getTableDataManager(offlineTableName); Preconditions.checkState(offlineTableDataManager != null, "Failed to find data manager for table: %s", @@ -208,20 +214,30 @@ public static List constructServerQueryRequests(OpChainExecutio return List.of( compileInstanceRequest(executionContext, new PinotQuery(pinotQuery), timeBoundary, TableType.OFFLINE, offlineTableDataManager.getTableName(), offlineTableConfigAndSchema.getLeft(), - offlineTableConfigAndSchema.getRight(), offlineSegments, null), + offlineTableConfigAndSchema.getRight(), offlineSegments, offlineOptional, null), compileInstanceRequest(executionContext, pinotQuery, timeBoundary, TableType.REALTIME, realtimeTableDataManager.getTableName(), realtimeTableConfigAndSchema.getLeft(), - realtimeTableConfigAndSchema.getRight(), realtimeSegments, null)); + realtimeTableConfigAndSchema.getRight(), realtimeSegments, realtimeOptional, null)); } } + @Nullable + private static List optionalListOrNull(@Nullable Map> optionalSegmentsMap, + String tableTypeKey) { + if (optionalSegmentsMap == null) { + return null; + } + List list = optionalSegmentsMap.get(tableTypeKey); + return CollectionUtils.isEmpty(list) ? null : list; + } + /** * Convert {@link PinotQuery} into an {@link InstanceRequest}. */ private static InstanceRequest compileInstanceRequest(OpChainExecutionContext executionContext, PinotQuery pinotQuery, @Nullable TimeBoundaryInfo timeBoundaryInfo, TableType tableType, String tableNameWithType, TableConfig tableConfig, Schema schema, @Nullable List segmentList, - @Nullable List tableRouteInfoList) { + @Nullable List optionalSegmentList, @Nullable List tableRouteInfoList) { Preconditions.checkArgument(segmentList == null || tableRouteInfoList == null, "Either segmentList OR tableRouteInfoList should be set"); @@ -266,6 +282,9 @@ private static InstanceRequest compileInstanceRequest(OpChainExecutionContext ex } else { instanceRequest.setTableSegmentsInfoList(tableRouteInfoList); } + if (CollectionUtils.isNotEmpty(optionalSegmentList)) { + instanceRequest.setOptionalSegments(optionalSegmentList); + } instanceRequest.setQuery(brokerRequest); return instanceRequest; @@ -432,6 +451,8 @@ private static List constructLogicalTableServerQueryRequests( Map> logicalTableSegmentsMap = executionContext.getWorkerMetadata().getLogicalTableSegmentsMap(); + Map> optionalLogicalTableSegmentsMap = + executionContext.getWorkerMetadata().getOptionalTableSegmentsMap(); List offlineTableRouteInfoList = new ArrayList<>(); List realtimeTableRouteInfoList = new ArrayList<>(); @@ -442,6 +463,12 @@ private static List constructLogicalTableServerQueryRequests( TableSegmentsInfo tableSegmentsInfo = new TableSegmentsInfo(); tableSegmentsInfo.setTableName(physicalTableName); tableSegmentsInfo.setSegments(entry.getValue()); + if (optionalLogicalTableSegmentsMap != null) { + List optionalSegments = optionalLogicalTableSegmentsMap.get(physicalTableName); + if (CollectionUtils.isNotEmpty(optionalSegments)) { + tableSegmentsInfo.setOptionalSegments(new ArrayList<>(optionalSegments)); + } + } if (tableType == TableType.REALTIME) { realtimeTableRouteInfoList.add(tableSegmentsInfo); } else { @@ -461,14 +488,14 @@ private static List constructLogicalTableServerQueryRequests( return List.of( compileInstanceRequest(executionContext, pinotQuery, timeBoundaryInfo, TableType.OFFLINE, offlineTableName, logicalTableContext.getRefOfflineTableConfig(), logicalTableContext.getLogicalTableSchema(), null, - routeInfoList)); + null, routeInfoList)); } else { Preconditions.checkNotNull(logicalTableContext.getRefRealtimeTableConfig()); String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(logicalTableName); return List.of( compileInstanceRequest(executionContext, pinotQuery, timeBoundaryInfo, TableType.REALTIME, realtimeTableName, logicalTableContext.getRefRealtimeTableConfig(), - logicalTableContext.getLogicalTableSchema(), null, routeInfoList)); + logicalTableContext.getLogicalTableSchema(), null, null, routeInfoList)); } } else { Preconditions.checkNotNull(logicalTableContext.getRefOfflineTableConfig()); @@ -480,10 +507,10 @@ private static List constructLogicalTableServerQueryRequests( return List.of( compileInstanceRequest(executionContext, offlinePinotQuery, timeBoundaryInfo, TableType.OFFLINE, offlineTableName, logicalTableContext.getRefOfflineTableConfig(), - logicalTableContext.getLogicalTableSchema(), null, offlineTableRouteInfoList), + logicalTableContext.getLogicalTableSchema(), null, null, offlineTableRouteInfoList), compileInstanceRequest(executionContext, realtimePinotQuery, timeBoundaryInfo, TableType.REALTIME, realtimeTableName, logicalTableContext.getRefRealtimeTableConfig(), - logicalTableContext.getLogicalTableSchema(), null, realtimeTableRouteInfoList)); + logicalTableContext.getLogicalTableSchema(), null, null, realtimeTableRouteInfoList)); } } }