Skip to content

Commit

Permalink
This closes apache#2169
Browse files Browse the repository at this point in the history
  • Loading branch information
davorbonaci committed Mar 7, 2017
2 parents 410534b + 4dda585 commit 1fd52f5
Showing 1 changed file with 23 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.spark.Accumulator;

/**
Expand All @@ -40,12 +41,10 @@
*/
public class SparkRuntimeContext implements Serializable {
private final String serializedPipelineOptions;
private transient CoderRegistry coderRegistry;

/**
* Map fo names to Beam aggregators.
*/
// map for names to Beam aggregators.
private final Map<String, Aggregator<?, ?>> aggregators = new HashMap<>();
private transient CoderRegistry coderRegistry;

SparkRuntimeContext(Pipeline pipeline) {
this.serializedPipelineOptions = serializePipelineOptions(pipeline.getOptions());
Expand All @@ -67,8 +66,8 @@ private static PipelineOptions deserializePipelineOptions(String serializedPipel
}
}

public synchronized PipelineOptions getPipelineOptions() {
return deserializePipelineOptions(serializedPipelineOptions);
public PipelineOptions getPipelineOptions() {
return PipelineOptionsHolder.getOrInit(serializedPipelineOptions);
}

/**
Expand Down Expand Up @@ -118,6 +117,24 @@ public CoderRegistry getCoderRegistry() {
return coderRegistry;
}

private static class PipelineOptionsHolder {
// on executors, this should deserialize once.
private static transient volatile PipelineOptions pipelineOptions = null;

static PipelineOptions getOrInit(String serializedPipelineOptions) {
if (pipelineOptions == null) {
synchronized (PipelineOptionsHolder.class) {
if (pipelineOptions == null) {
pipelineOptions = deserializePipelineOptions(serializedPipelineOptions);
}
}
// register IO factories.
IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
}
return pipelineOptions;
}
}

/**
* Initialize spark aggregators exactly once.
*
Expand Down

0 comments on commit 1fd52f5

Please sign in to comment.