Skip to content

Commit

Permalink
[FLINK-21626][core] Make RuntimeContext.jobID non-optional
Browse files Browse the repository at this point in the history
  • Loading branch information
rkhachatryan committed Mar 28, 2021
1 parent b19538d commit 42c6414
Show file tree
Hide file tree
Showing 13 changed files with 103 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.util.AbstractID;

import javax.transaction.xa.Xid;

Expand Down Expand Up @@ -63,12 +62,8 @@ public void open() {

@Override
public Xid generateXid(RuntimeContext runtimeContext, long checkpointId) {
byte[] jobIdOrRandomBytes =
runtimeContext
.getJobId()
.map(AbstractID::getBytes)
.orElse(getRandomBytes(JobID.SIZE));
System.arraycopy(jobIdOrRandomBytes, 0, gtridBuffer, 0, JobID.SIZE);
byte[] jobIdBytes = runtimeContext.getJobId().getBytes();
System.arraycopy(jobIdBytes, 0, gtridBuffer, 0, JobID.SIZE);

writeNumber(runtimeContext.getIndexOfThisSubtask(), Integer.BYTES, gtridBuffer, JobID.SIZE);
writeNumber(checkpointId, Long.BYTES, gtridBuffer, JobID.SIZE + Integer.BYTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -168,8 +167,8 @@ static JdbcXaSinkFunction<TestEntry> buildSink(
static final RuntimeContext TEST_RUNTIME_CONTEXT =
new RuntimeContext() {
@Override
public Optional<JobID> getJobId() {
return Optional.of(new JobID());
public JobID getJobId() {
return new JobID();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@

import java.io.Serializable;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
Expand All @@ -59,11 +58,10 @@
public interface RuntimeContext {

/**
* The ID of the current job. Empty if the execution happens outside of any job context (e.g.
* standalone collection executor). Note that Job ID can change in particular upon manual
* restart. The returned ID should NOT be used for any job management tasks.
* The ID of the current job. Note that Job ID can change in particular upon manual restart. The
* returned ID should NOT be used for any job management tasks.
*/
Optional<JobID> getJobId();
JobID getJobId();

/**
* Returns the name of the task in which the UDF runs, as assigned during plan construction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.api.common.functions.util;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
Expand All @@ -33,32 +34,55 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;

/** A standalone implementation of the {@link RuntimeContext}, created by runtime UDF operators. */
/**
* A standalone implementation of the {@link RuntimeContext}, created by runtime UDF operators. Used
* mostly in CollectionExecutor and can be removed along with the DataSet API.
*/
@Internal
public class RuntimeUDFContext extends AbstractRuntimeUDFContext {

private final HashMap<String, Object> initializedBroadcastVars = new HashMap<>();

private final HashMap<String, List<?>> uninitializedBroadcastVars = new HashMap<>();
private final JobID jobID;

@VisibleForTesting
public RuntimeUDFContext(
TaskInfo taskInfo,
ClassLoader userCodeClassLoader,
ExecutionConfig executionConfig,
Map<String, Future<Path>> cpTasks,
Map<String, Accumulator<?, ?>> accumulators,
MetricGroup metrics) {
this(
taskInfo,
userCodeClassLoader,
executionConfig,
cpTasks,
accumulators,
metrics,
new JobID());
}

public RuntimeUDFContext(
TaskInfo taskInfo,
ClassLoader userCodeClassLoader,
ExecutionConfig executionConfig,
Map<String, Future<Path>> cpTasks,
Map<String, Accumulator<?, ?>> accumulators,
MetricGroup metrics,
JobID jobID) {
super(
taskInfo,
SimpleUserCodeClassLoader.create(userCodeClassLoader),
executionConfig,
accumulators,
cpTasks,
metrics);
this.jobID = jobID;
}

@Override
Expand Down Expand Up @@ -117,8 +141,8 @@ public <T, C> C getBroadcastVariableWithInitializer(
}

@Override
public Optional<JobID> getJobId() {
return Optional.empty();
public JobID getJobId() {
return jobID;
}

@Override
Expand Down
Loading

0 comments on commit 42c6414

Please sign in to comment.