Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
"replication": "1",
"retentionTimeUnit": "",
"retentionTimeValue": "",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"segmentPushFrequency": "daily",
"segmentPushType": "REFRESH",
"timeColumnName": "HoursSinceEpoch",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
"replication": "1",
"retentionTimeUnit": "",
"retentionTimeValue": "",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"segmentPushFrequency": "daily",
"segmentPushType": "APPEND",
"timeColumnName": "HoursSinceEpoch",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
"replication": "1",
"retentionTimeUnit": "",
"retentionTimeValue": "",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"segmentPushFrequency": "daily",
"segmentPushType": "REFRESH",
"timeColumnName": "HoursSinceEpoch",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
"replication": "1",
"retentionTimeUnit": "",
"retentionTimeValue": "",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"segmentPushFrequency": "daily",
"segmentPushType": "APPEND",
"timeColumnName": "HoursSinceEpoch",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
"replication": "1",
"retentionTimeUnit": "",
"retentionTimeValue": "",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"segmentPushFrequency": "daily",
"segmentPushType": "APPEND",
"timeColumnName": "HoursSinceEpoch",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "5",
"segmentPushType": "APPEND",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"replication": "1",
"replicasPerPartition": "1"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
"timeColumnName": "mtime",
"timeType": "MILLISECONDS",
"segmentPushType": "APPEND",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"replication": "1",
"replicasPerPartition": "1"
},
Expand Down
2 changes: 0 additions & 2 deletions helm/pinot/pinot-realtime-quickstart.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ data:
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "3650",
"segmentPushType": "APPEND",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"replication": "1",
"replicasPerPartition": "1"
},
Expand Down Expand Up @@ -68,7 +67,6 @@ data:
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "3650",
"segmentPushType": "APPEND",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"replication": "1",
"replicasPerPartition": "1"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
import org.apache.pinot.spi.config.table.assignment.SegmentAssignmentConfig;
import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;


Expand Down Expand Up @@ -65,10 +66,17 @@ public static boolean allowInstanceAssignment(TableConfig tableConfig,
// Allow OFFLINE instance assignment if the offline table has it configured or (for backward-compatibility) is
// using replica-group segment assignment
case OFFLINE:
Map<String, SegmentAssignmentConfig> segmentAssignmentConfigMap = tableConfig.getSegmentAssignmentConfigMap();
boolean isSetReplicaGroupAssignmentStrategy = false;
if (segmentAssignmentConfigMap != null
&& segmentAssignmentConfigMap.get(instancePartitionsType.toString()) != null) {
isSetReplicaGroupAssignmentStrategy =
AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY.equalsIgnoreCase(
segmentAssignmentConfigMap.get(instancePartitionsType.toString()).getAssignmentStrategy());
}
Comment on lines +69 to +76
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this logic?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to see if the table has explicitly configured REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY in segment assignment strategy.
I don't know whether it's common to have this strategy explicitly set in segment assignment strategy, but this is the same logic as before, just to check from the segment assignment strategy map instead of segment assignment strategy in segment validation config (the deprecated field).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 78 already covers this. When deprecating a field, in most cases we don't need to add new logic

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We always allow instance assignment when instanceAssignmentConfig is configured. It is not tight to replica group assignment strategy

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was this part of the original code

|| AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY
            .equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentAssignmentStrategy()));

And we're deprecating this so I changed this to its counterpart, which is the segmentAssignmentConfigMap.

Also correctness-wise, in SegmentAssignmentStrategyFactory#getSegmentAssignmentStrategy, if a table doesn't have instanceAssignmentConfig but has the strategy explicitly set in segmentAssignmentConfigMap, it would use replica group strategy, so this check is relevant in this sense.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized this is segment assignment config, not instance assignment config.. But this is wrong coupling. Segment assignment config shouldn't interfere with instance assignment. The old AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY has implicit meaning on both instance assignment and segment assignment, but segment assignment shouldn't be coupled with instance assignment.

Segment assignment config is very rarely used (I even forgot its existence). We should also modify all tests to explicitly put instance assignment config instead of segment assignment config.

return tableType == TableType.OFFLINE && ((instanceAssignmentConfigMap != null
&& instanceAssignmentConfigMap.get(InstancePartitionsType.OFFLINE.toString()) != null)
|| AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY
.equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentAssignmentStrategy()));
|| isSetReplicaGroupAssignmentStrategy);
// Allow CONSUMING/COMPLETED instance assignment if the real-time table has it configured
case CONSUMING:
case COMPLETED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
import org.apache.pinot.spi.config.table.assignment.SegmentAssignmentConfig;
import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
Expand Down Expand Up @@ -146,7 +147,9 @@ public void testSerDe()
{
// With SegmentAssignmentStrategyConfig
ReplicaGroupStrategyConfig replicaGroupStrategyConfig = new ReplicaGroupStrategyConfig("memberId", 5);
TableConfig tableConfig = tableConfigBuilder.setSegmentAssignmentStrategy("ReplicaGroupSegmentAssignmentStrategy")
TableConfig tableConfig = tableConfigBuilder.setSegmentAssignmentConfigMap(
Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new SegmentAssignmentConfig("ReplicaGroupSegmentAssignmentStrategy")))
.setReplicaGroupStrategyConfig(replicaGroupStrategyConfig)
.build();

Expand Down Expand Up @@ -439,7 +442,12 @@ private void checkTenantConfigWithTagOverride(TableConfig tableConfig) {

private void checkSegmentAssignmentStrategyConfig(TableConfig tableConfig) {
SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
assertEquals(validationConfig.getSegmentAssignmentStrategy(), "ReplicaGroupSegmentAssignmentStrategy");
Map<String, SegmentAssignmentConfig> segmentAssignmentConfigMap = tableConfig.getSegmentAssignmentConfigMap();
assertNotNull(segmentAssignmentConfigMap);
SegmentAssignmentConfig segmentAssignmentConfig =
segmentAssignmentConfigMap.get(InstancePartitionsType.OFFLINE.toString());
assertNotNull(segmentAssignmentConfig);
assertEquals(segmentAssignmentConfig.getAssignmentStrategy(), "ReplicaGroupSegmentAssignmentStrategy");
ReplicaGroupStrategyConfig replicaGroupStrategyConfig = validationConfig.getReplicaGroupStrategyConfig();
assertNotNull(replicaGroupStrategyConfig);
assertEquals(replicaGroupStrategyConfig.getPartitionColumn(), "memberId");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
"segmentPushType" : "APPEND",
"replication" : "2",
"timeColumnName" : "timestamp",
"segmentAssignmentStrategy" : "BalanceNumSegmentAssignmentStrategy",
"replicaGroupStrategyConfig" : null,
"hllConfig" : null,
"replicasPerPartition" : 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void setUp()
TEST_INSTANCE.addDummySchema(TABLE_NAME);
// Adding table
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(2).build();
.setNumReplicas(2).build();
TEST_INSTANCE.getHelixResourceManager().addTable(tableConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
import org.apache.pinot.spi.config.table.assignment.SegmentAssignmentConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
import org.apache.pinot.spi.utils.Enablement;
Expand Down Expand Up @@ -75,7 +76,8 @@ public void testDefaultOfflineReplicaGroup() {
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setServerTenant(TENANT_NAME)
.setNumReplicas(numReplicas)
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY).build();
.setSegmentAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new SegmentAssignmentConfig(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY))).build();
int numInstancesPerPartition = 2;
tableConfig.getValidationConfig()
.setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(null, numInstancesPerPartition));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.assignment.SegmentAssignmentConfig;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
Expand Down Expand Up @@ -86,7 +87,8 @@ public void setUp() {
TableConfig tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
.setStreamConfigs(streamConfigs)
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
.setSegmentAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.COMPLETED.toString(),
new SegmentAssignmentConfig(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)))
.setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1)).build();
_segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(), tableConfig, null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.assignment.SegmentAssignmentConfig;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
Expand Down Expand Up @@ -135,7 +136,8 @@ private static SegmentAssignment createSegmentAssignment(String tableType) {
TableConfigBuilder builder = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
.setNumReplicas(NUM_REPLICAS)
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
.setSegmentAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.COMPLETED.toString(),
new SegmentAssignmentConfig(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)))
.setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1));
TableConfig tableConfig;
if ("upsert".equalsIgnoreCase(tableType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.assignment.SegmentAssignmentConfig;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
Expand Down Expand Up @@ -90,7 +91,8 @@ public void setUp() {
TableConfig tableConfigWithoutPartitions =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME_WITHOUT_PARTITION)
.setNumReplicas(NUM_REPLICAS)
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY).build();
.setSegmentAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new SegmentAssignmentConfig(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY))).build();
_segmentAssignmentWithoutPartition =
SegmentAssignmentFactory.getSegmentAssignment(null, tableConfigWithoutPartitions, null);

Expand Down Expand Up @@ -140,7 +142,8 @@ public void setUp() {
TableConfig tableConfigWithPartitions =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME_WITH_PARTITION)
.setNumReplicas(NUM_REPLICAS)
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
.setSegmentAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new SegmentAssignmentConfig(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)))
.setReplicaGroupStrategyConfig(replicaGroupStrategyConfig).build();
_segmentAssignmentWithPartition =
SegmentAssignmentFactory.getSegmentAssignment(helixManagerWithPartitions, tableConfigWithPartitions, null);
Expand Down Expand Up @@ -398,7 +401,8 @@ public void testOneReplicaWithPartition() {
new ReplicaGroupStrategyConfig(PARTITION_COLUMN, numInstancesPerPartition);
TableConfig tableConfigWithPartitions =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME_WITH_PARTITION).setNumReplicas(1)
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY).build();
.setSegmentAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new SegmentAssignmentConfig(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY))).build();
tableConfigWithPartitions.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig);
SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(helixManager, tableConfigWithPartitions, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,11 @@ public void testReplicaGroupSegmentAssignmentStrategyForBackwardCompatibility()
int numInstancesPerPartition = numInstancesPerReplicaGroup / NUM_REPLICAS;
ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
new ReplicaGroupStrategyConfig(PARTITION_COLUMN, numInstancesPerPartition);
Map<String, SegmentAssignmentConfig> segmentAssignmentConfigMap = new HashMap<>();
segmentAssignmentConfigMap.put(InstancePartitionsType.OFFLINE.toString(),
new SegmentAssignmentConfig("ReplicaGroup"));
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME_WITH_PARTITION)
.setNumReplicas(NUM_REPLICAS).setSegmentAssignmentStrategy("ReplicaGroup")
.setNumReplicas(NUM_REPLICAS).setSegmentAssignmentConfigMap(segmentAssignmentConfigMap)
.setReplicaGroupStrategyConfig(replicaGroupStrategyConfig).build();

// {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ public void getQueryWorkloadConfigsForTagsTest() {
public TableConfig createTableConfig(String tableName, String serverTag, String brokerTenant, TableType type) {
return new TableConfigBuilder(type)
.setTableName(tableName)
.setSegmentAssignmentStrategy("BalanceNumSegmentAssignmentStrategy")
.setNumReplicas(1)
.setBrokerTenant(brokerTenant)
.setServerTenant(serverTag)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
"replication": "3",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "5",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"segmentPushFrequency": "daily",
"segmentPushType": "APPEND",
"timeColumnName": "colTime",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
"replication": "3",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "5",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"segmentPushFrequency": "daily",
"segmentPushType": "APPEND",
"timeColumnName": "colTime",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
"routing": {},
"segmentsConfig": {
"timeColumn": "eventTime",
"replication": 1,
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy"
"replication": 1
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1994,7 +1994,6 @@ public void testValidateQueryApiWithStaticTable()
.setRetentionTimeUnit("DAYS")
.setRetentionTimeValue("5000")
.setDeletedSegmentsRetentionPeriod("7d")
.setSegmentAssignmentStrategy("BalanceNumSegmentAssignmentStrategy")
.setNumReplicas(1)
.setSegmentPushType("APPEND")
.setBrokerTenant("DefaultTenant")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"tableType": "OFFLINE",
"segmentsConfig": {
"segmentPushType": "APPEND",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"replication": "3"
},
"tenants": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
"timeColumnName": "mtime",
"timeType": "MILLISECONDS",
"segmentPushType": "APPEND",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"replicasPerPartition": "1",
"replicaGroupStrategyConfig": {
"partitionColumn": "event_id",
Expand Down
Loading
Loading