Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ private SegmentGenerationJobUtils() {

private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationJobUtils.class);

// Key used to pass the serialized SegmentGenerationJobSpec through a distributed job framework
public static final String SEGMENT_GENERATION_JOB_SPEC = "segmentGenerationJobSpec";

// Field names in the executionFrameworkSpec/extraConfigs section shared across ingestion frameworks
public static final String DEPENDENCY_JAR_DIR = "dependencyJarDir";
public static final String STAGING_DIR = "stagingDir";

/**
* Always use the local directory sequence id unless explicitly configured via "use.global.directory.sequence.id".
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,10 @@
public class HadoopSegmentGenerationJobRunner extends Configured implements IngestionJobRunner, Serializable {
private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentGenerationJobRunner.class);

public static final String SEGMENT_GENERATION_JOB_SPEC = "segmentGenerationJobSpec";
// Kept for backward compatibility; callers should prefer SegmentGenerationJobUtils.SEGMENT_GENERATION_JOB_SPEC
public static final String SEGMENT_GENERATION_JOB_SPEC = SegmentGenerationJobUtils.SEGMENT_GENERATION_JOB_SPEC;

// Field names in job spec's executionFrameworkSpec/extraConfigs section
private static final String DEPS_JAR_DIR_FIELD = "dependencyJarDir";
private static final String STAGING_DIR_FIELD = "stagingDir";

// Sub-dirs under directory specified by STAGING_DIR_FIELD
// Sub-dirs under the staging directory
private static final String SEGMENT_TAR_SUBDIR_NAME = "segmentTar";
private static final String DEPS_JAR_SUBDIR_NAME = "dependencyJars";

Expand Down Expand Up @@ -156,7 +153,8 @@ public void run()
outputDirFS.mkdir(outputDirURI);

// Get the staging directory for temporary output Pinot segments
String stagingDir = _spec.getExecutionFrameworkSpec().getExtraConfigs().get(STAGING_DIR_FIELD);
String stagingDir =
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(SegmentGenerationJobUtils.STAGING_DIR);
Preconditions.checkNotNull(stagingDir, "Please set config: stagingDir under 'executionFrameworkSpec.extraConfigs'");
URI stagingDirURI = URI.create(stagingDir);
if (stagingDirURI.getScheme() == null) {
Expand Down Expand Up @@ -247,7 +245,8 @@ public void run()
packPluginsToDistributedCache(job, outputDirFS, stagingDirURI);

// Add dependency jars, if we're provided with a directory containing these.
String dependencyJarsSrcDir = _spec.getExecutionFrameworkSpec().getExtraConfigs().get(DEPS_JAR_DIR_FIELD);
String dependencyJarsSrcDir =
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(SegmentGenerationJobUtils.DEPENDENCY_JAR_DIR);
if (dependencyJarsSrcDir != null) {
Path dependencyJarsDestPath = new Path(stagingDirURI.toString(), DEPS_JAR_SUBDIR_NAME);
addJarsToDistributedCache(job, new File(dependencyJarsSrcDir), outputDirFS, dependencyJarsDestPath.toUri(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@
public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Serializable {

private static final Logger LOGGER = LoggerFactory.getLogger(SparkSegmentGenerationJobRunner.class);
private static final String DEPS_JAR_DIR = "dependencyJarDir";
private static final String STAGING_DIR = "stagingDir";

private SegmentGenerationJobSpec _spec;

Expand Down Expand Up @@ -155,7 +153,8 @@ public void run()
outputDirFS.mkdir(outputDirURI);

// Get the staging directory for temporary output Pinot segments
String stagingDir = _spec.getExecutionFrameworkSpec().getExtraConfigs().get(STAGING_DIR);
String stagingDir =
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(SegmentGenerationJobUtils.STAGING_DIR);
URI stagingDirURI = null;
if (stagingDir != null) {
stagingDirURI = URI.create(stagingDir);
Expand All @@ -178,9 +177,10 @@ public void run()
packPluginsToDistributedCache(sparkContext);

// Add dependency jars
if (_spec.getExecutionFrameworkSpec().getExtraConfigs().containsKey(DEPS_JAR_DIR)) {
if (_spec.getExecutionFrameworkSpec().getExtraConfigs()
.containsKey(SegmentGenerationJobUtils.DEPENDENCY_JAR_DIR)) {
addDepsJarToDistributedCache(sparkContext,
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(DEPS_JAR_DIR));
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(SegmentGenerationJobUtils.DEPENDENCY_JAR_DIR));
}

List<String> pathAndIdxList = new ArrayList<>();
Expand Down
Loading