From 17128855add6ae50b65f672df315b4f6c3268953 Mon Sep 17 00:00:00 2001 From: Akanksha Kedia Date: Mon, 4 May 2026 17:29:25 +0530 Subject: [PATCH] Centralize batch ingestion job spec constants into SegmentGenerationJobUtils Move duplicated string constants (SEGMENT_GENERATION_JOB_SPEC, DEPENDENCY_JAR_DIR, STAGING_DIR) from HadoopSegmentGenerationJobRunner and SparkSegmentGenerationJobRunner into the shared SegmentGenerationJobUtils class in pinot-batch-ingestion-common. This eliminates silent drift between the Hadoop and Spark runners, where the same config keys were independently declared as private literals. The Hadoop runner's public SEGMENT_GENERATION_JOB_SPEC field is retained as a delegating alias for backward compatibility. Co-Authored-By: Claude Sonnet 4.6 --- .../batch/common/SegmentGenerationJobUtils.java | 7 +++++++ .../hadoop/HadoopSegmentGenerationJobRunner.java | 15 +++++++-------- .../spark3/SparkSegmentGenerationJobRunner.java | 10 +++++----- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java index 816bef6232e7..1129bcce128a 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java @@ -47,6 +47,13 @@ private SegmentGenerationJobUtils() { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationJobUtils.class); + // Key used to pass the serialized SegmentGenerationJobSpec through a distributed job framework + public static final String 
SEGMENT_GENERATION_JOB_SPEC = "segmentGenerationJobSpec"; + + // Field names in the executionFrameworkSpec/extraConfigs section shared across ingestion frameworks + public static final String DEPENDENCY_JAR_DIR = "dependencyJarDir"; + public static final String STAGING_DIR = "stagingDir"; + /** * Always use local directory sequence id unless explicitly config: "use.global.directory.sequence.id". * diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java index 918ecb99da67..de173ff93d9b 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java @@ -67,13 +67,10 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements IngestionJobRunner, Serializable { private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentGenerationJobRunner.class); - public static final String SEGMENT_GENERATION_JOB_SPEC = "segmentGenerationJobSpec"; + // Kept for backward compatibility; callers should prefer SegmentGenerationJobUtils.SEGMENT_GENERATION_JOB_SPEC + public static final String SEGMENT_GENERATION_JOB_SPEC = SegmentGenerationJobUtils.SEGMENT_GENERATION_JOB_SPEC; - // Field names in job spec's executionFrameworkSpec/extraConfigs section - private static final String DEPS_JAR_DIR_FIELD = "dependencyJarDir"; - private static final String STAGING_DIR_FIELD = "stagingDir"; - - // Sub-dirs under directory specified by STAGING_DIR_FIELD + // Sub-dirs under the staging directory private 
static final String SEGMENT_TAR_SUBDIR_NAME = "segmentTar"; private static final String DEPS_JAR_SUBDIR_NAME = "dependencyJars"; @@ -156,7 +153,8 @@ public void run() outputDirFS.mkdir(outputDirURI); //Get staging directory for temporary output pinot segments - String stagingDir = _spec.getExecutionFrameworkSpec().getExtraConfigs().get(STAGING_DIR_FIELD); + String stagingDir = + _spec.getExecutionFrameworkSpec().getExtraConfigs().get(SegmentGenerationJobUtils.STAGING_DIR); Preconditions.checkNotNull(stagingDir, "Please set config: stagingDir under 'executionFrameworkSpec.extraConfigs'"); URI stagingDirURI = URI.create(stagingDir); if (stagingDirURI.getScheme() == null) { @@ -247,7 +245,8 @@ public void run() packPluginsToDistributedCache(job, outputDirFS, stagingDirURI); // Add dependency jars, if we're provided with a directory containing these. - String dependencyJarsSrcDir = _spec.getExecutionFrameworkSpec().getExtraConfigs().get(DEPS_JAR_DIR_FIELD); + String dependencyJarsSrcDir = + _spec.getExecutionFrameworkSpec().getExtraConfigs().get(SegmentGenerationJobUtils.DEPENDENCY_JAR_DIR); if (dependencyJarsSrcDir != null) { Path dependencyJarsDestPath = new Path(stagingDirURI.toString(), DEPS_JAR_SUBDIR_NAME); addJarsToDistributedCache(job, new File(dependencyJarsSrcDir), outputDirFS, dependencyJarsDestPath.toUri(), diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java index d32fb861fea0..fccf20175cac 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java +++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java @@ -67,8 +67,6 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Serializable { private static final Logger LOGGER = LoggerFactory.getLogger(SparkSegmentGenerationJobRunner.class); - private static final String DEPS_JAR_DIR = "dependencyJarDir"; - private static final String STAGING_DIR = "stagingDir"; private SegmentGenerationJobSpec _spec; @@ -155,7 +153,8 @@ public void run() outputDirFS.mkdir(outputDirURI); //Get staging directory for temporary output pinot segments - String stagingDir = _spec.getExecutionFrameworkSpec().getExtraConfigs().get(STAGING_DIR); + String stagingDir = + _spec.getExecutionFrameworkSpec().getExtraConfigs().get(SegmentGenerationJobUtils.STAGING_DIR); URI stagingDirURI = null; if (stagingDir != null) { stagingDirURI = URI.create(stagingDir); @@ -178,9 +177,10 @@ public void run() packPluginsToDistributedCache(sparkContext); // Add dependency jars - if (_spec.getExecutionFrameworkSpec().getExtraConfigs().containsKey(DEPS_JAR_DIR)) { + if (_spec.getExecutionFrameworkSpec().getExtraConfigs() + .containsKey(SegmentGenerationJobUtils.DEPENDENCY_JAR_DIR)) { addDepsJarToDistributedCache(sparkContext, - _spec.getExecutionFrameworkSpec().getExtraConfigs().get(DEPS_JAR_DIR)); + _spec.getExecutionFrameworkSpec().getExtraConfigs().get(SegmentGenerationJobUtils.DEPENDENCY_JAR_DIR)); } List<String> pathAndIdxList = new ArrayList<>();