Skip to content

Commit

Permalink
[FLINK-23498][streaming-java] Expose StreamExecutionEnvironment.getCo…
Browse files Browse the repository at this point in the history
…nfiguration()
  • Loading branch information
twalthr committed Aug 4, 2021
1 parent 659ffa7 commit bf55428
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def excluded_methods(cls):
'createInput', 'createLocalEnvironmentWithWebUI', 'fromCollection',
'socketTextStream', 'initializeContextEnvironment', 'readTextFile',
'setNumberOfExecutionRetries', 'configure', 'executeAsync', 'registerJobListener',
'clearJobListeners', 'getJobListeners', "fromSequence"}
'clearJobListeners', 'getJobListeners', 'fromSequence', 'getConfiguration'}


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
Expand Down Expand Up @@ -2187,6 +2188,22 @@ public void addOperator(Transformation<?> transformation) {
this.transformations.add(transformation);
}

/**
* Gives read-only access to the underlying configuration of this environment.
*
* <p>Note that the returned configuration might not be complete. It only contains options that
* have initialized the environment via {@link #StreamExecutionEnvironment(Configuration)} or
* options that are not represented in dedicated configuration classes such as {@link
* ExecutionConfig} or {@link CheckpointConfig}.
*
* <p>Use {@link #configure(ReadableConfig, ClassLoader)} to set options that are specific to
* this environment.
*/
@Internal
public ReadableConfig getConfiguration() {
return new UnmodifiableConfiguration(configuration);
}

// --------------------------------------------------------------------------------------------
// Factory methods for ExecutionEnvironments
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,32 @@

package org.apache.flink.streaming.api.scala

import java.net.URI
import com.esotericsoftware.kryo.Serializer
import org.apache.flink.annotation.{Experimental, Internal, Public, PublicEvolving}
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.{ExecutionConfig, RuntimeExecutionMode}
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat}
import org.apache.flink.api.common.operators.SlotSharingGroup
import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.connector.source.{Source, SourceSplit}
import org.apache.flink.api.connector.source.lib.NumberSequenceSource
import org.apache.flink.api.connector.source.{Source, SourceSplit}
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.scala.ClosureCleaner
import org.apache.flink.configuration.{Configuration, ReadableConfig}
import org.apache.flink.core.execution.{JobClient, JobListener}
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.environment.{CheckpointConfig, StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.functions.source._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.util.{SplittableIterator, TernaryBoolean}

import com.esotericsoftware.kryo.Serializer

import java.net.URI

import _root_.scala.language.implicitConversions
import scala.collection.JavaConverters._

Expand Down Expand Up @@ -958,9 +960,21 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
def getStreamGraph(jobName: String, clearTransformations: Boolean) =
javaEnv.getStreamGraph(jobName, clearTransformations)

/**
* Gives read-only access to the underlying configuration of this environment.
*
* Note that the returned configuration might not be complete. It only contains options that
* have initialized the environment or options that are not represented in dedicated configuration
* classes such as [[ExecutionConfig]] or [[CheckpointConfig]].
*
* Use [[configure]] to set options that are specific to this environment.
*/
@Internal
def getConfiguration: ReadableConfig = javaEnv.getConfiguration

/**
* Getter of the wrapped [[org.apache.flink.streaming.api.environment.StreamExecutionEnvironment]]
*
*
* @return The encased ExecutionEnvironment
*/
@Internal
Expand Down

0 comments on commit bf55428

Please sign in to comment.