Skip to content

Commit

Permalink
[FLINK-9316][streaming] Expose operator's unique ID in DataStream pro…
Browse files Browse the repository at this point in the history
…grams

This allows to uniquely and stably across multiple job submissions identify operators.
Previously two different operators that were executed by tasks that had the same name
were indistinguishable.
  • Loading branch information
pnowojski authored and tzulitai committed May 22, 2018
1 parent 67eac44 commit 3fd694d
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.state.FunctionInitializationContext;
Expand Down Expand Up @@ -873,6 +874,11 @@ private static class MockStreamOperator extends AbstractStreamOperator<Integer>
public ExecutionConfig getExecutionConfig() {
return new ExecutionConfig();
}

@Override
public OperatorID getOperatorID() {
return new OperatorID();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
Expand Down Expand Up @@ -83,5 +84,10 @@ private static class TestStreamOperator extends AbstractStreamOperator<Integer>
public ExecutionConfig getExecutionConfig() {
return new ExecutionConfig();
}

@Override
public OperatorID getOperatorID() {
return new OperatorID(42, 44);
}
}
}
8 changes: 8 additions & 0 deletions flink-contrib/flink-storm/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,14 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
import org.apache.flink.storm.util.SplitStreamType;
import org.apache.flink.storm.util.StormConfig;
import org.apache.flink.storm.util.TestDummyBolt;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.MockStreamConfig;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
Expand Down Expand Up @@ -159,7 +159,7 @@ private void testWrapper(final int numberOfAttributes) throws Exception {
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);

final BoltWrapper wrapper = new BoltWrapper(bolt, (Fields) null);
wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class));
wrapper.setup(createMockStreamTask(), new MockStreamConfig(), mock(Output.class));
wrapper.open();

wrapper.processElement(record);
Expand Down Expand Up @@ -195,7 +195,7 @@ public void testMultipleOutputStreams() throws Exception {
}

final BoltWrapper wrapper = new BoltWrapper(bolt, null, raw);
wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), output);
wrapper.setup(createMockStreamTask(), new MockStreamConfig(), output);
wrapper.open();

final SplitStreamType splitRecord = new SplitStreamType<Integer>();
Expand Down Expand Up @@ -248,7 +248,7 @@ public void testOpen() throws Exception {

final IRichBolt bolt = mock(IRichBolt.class);
BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class));
wrapper.setup(createMockStreamTask(execConfig), new MockStreamConfig(), mock(Output.class));

wrapper.open();
verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
Expand All @@ -261,7 +261,7 @@ public void testOpen() throws Exception {

final IRichBolt bolt = mock(IRichBolt.class);
BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class));
wrapper.setup(createMockStreamTask(execConfig), new MockStreamConfig(), mock(Output.class));

wrapper.open();
verify(bolt).prepare(same(stormConfig), any(TopologyContext.class), any(OutputCollector.class));
Expand All @@ -278,7 +278,7 @@ public void testOpen() throws Exception {

TestDummyBolt testBolt = new TestDummyBolt();
BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(testBolt);
wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class));
wrapper.setup(createMockStreamTask(execConfig), new MockStreamConfig(), mock(Output.class));

wrapper.open();
for (Entry<String, String> entry : cfg.toMap().entrySet()) {
Expand All @@ -305,7 +305,7 @@ public void testOpenSink() throws Exception {
final IRichBolt bolt = mock(IRichBolt.class);
BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);

wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class));
wrapper.setup(createMockStreamTask(), new MockStreamConfig(), mock(Output.class));
wrapper.open();

verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNotNull(OutputCollector.class));
Expand All @@ -322,7 +322,7 @@ public void testClose() throws Exception {

final BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);

wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class));
wrapper.setup(createMockStreamTask(), new MockStreamConfig(), mock(Output.class));

wrapper.close();
wrapper.dispose();
Expand Down Expand Up @@ -379,7 +379,7 @@ public Map<String, Object> getComponentConfiguration() {
final CloseableRegistry closeableRegistry = new CloseableRegistry();
StreamTask<?, ?> mockTask = mock(StreamTask.class);
when(mockTask.getCheckpointLock()).thenReturn(new Object());
when(mockTask.getConfiguration()).thenReturn(new StreamConfig(new Configuration()));
when(mockTask.getConfiguration()).thenReturn(new MockStreamConfig());
when(mockTask.getEnvironment()).thenReturn(env);
when(mockTask.getExecutionConfig()).thenReturn(execConfig);
when(mockTask.getCancelables()).thenReturn(closeableRegistry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {

private final StreamConfig streamConfig;

private final String operatorUniqueID;

public StreamingRuntimeContext(AbstractStreamOperator<?> operator,
Environment env, Map<String, Accumulator<?, ?>> accumulators) {
super(env.getTaskInfo(),
Expand All @@ -73,6 +75,7 @@ public StreamingRuntimeContext(AbstractStreamOperator<?> operator,
this.operator = operator;
this.taskEnvironment = env;
this.streamConfig = new StreamConfig(env.getTaskConfiguration());
this.operatorUniqueID = operator.getOperatorID().toString();
}

// ------------------------------------------------------------------------
Expand All @@ -90,6 +93,18 @@ public ProcessingTimeService getProcessingTimeService() {
return operator.getProcessingTimeService();
}

/**
* Returned value is guaranteed to be unique between operators within the same job and to be
* stable and the same across job submissions.
*
* <p>This operation is currently only supported in Streaming (DataStream) contexts.
*
* @return String representation of the operator's unique id.
*/
public String getOperatorUniqueID() {
return operatorUniqueID;
}

// ------------------------------------------------------------------------
// broadcast variables
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
Expand Down Expand Up @@ -308,6 +309,11 @@ private static class MockStreamOperator extends AbstractStreamOperator<Integer>
public ExecutionConfig getExecutionConfig() {
return new ExecutionConfig();
}

@Override
public OperatorID getOperatorID() {
return new OperatorID();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.operators;

import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.junit.Assert.assertEquals;

/**
* Tests the uid translation to {@link org.apache.flink.runtime.jobgraph.OperatorID}.
*/
@SuppressWarnings("serial")
public class GetOperatorUniqueIDTest extends TestLogger {

/**
* If expected values ever change double check that the change is not braking the contract of
* {@link StreamingRuntimeContext#getOperatorUniqueID()} being stable between job submissions.
*/
@Test
public void testGetOperatorUniqueID() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

env.fromElements(1, 2, 3)
.map(new VerifyOperatorIDMapFunction("6c4f323f22da8fb6e34f80c61be7a689")).uid("42")
.map(new VerifyOperatorIDMapFunction("3e129e83691e7737fbf876b47452acbc")).uid("44");

env.execute();
}

private static class VerifyOperatorIDMapFunction extends AbstractRichFunction implements MapFunction<Integer, Integer> {
private static final long serialVersionUID = 6584823409744624276L;

private final String expectedOperatorUniqueID;

public VerifyOperatorIDMapFunction(String expectedOperatorUniqueID) {
this.expectedOperatorUniqueID = checkNotNull(expectedOperatorUniqueID);
}

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);

assertEquals(expectedOperatorUniqueID, ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID());
}

@Override
public Integer map(Integer value) throws Exception {
return value;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.query.KvStateRegistry;
Expand Down Expand Up @@ -296,6 +297,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
}).when(keyedStateBackend).getPartitionedState(Matchers.any(), any(TypeSerializer.class), any(StateDescriptor.class));

when(operatorMock.getKeyedStateStore()).thenReturn(keyedStateStore);
when(operatorMock.getOperatorID()).thenReturn(new OperatorID());

return operatorMock;
}
Expand Down Expand Up @@ -333,6 +335,7 @@ public ListState<String> answer(InvocationOnMock invocationOnMock) throws Throwa
}).when(keyedStateBackend).getPartitionedState(Matchers.any(), any(TypeSerializer.class), any(ListStateDescriptor.class));

when(operatorMock.getKeyedStateStore()).thenReturn(keyedStateStore);
when(operatorMock.getOperatorID()).thenReturn(new OperatorID());
return operatorMock;
}

Expand Down Expand Up @@ -369,6 +372,7 @@ public MapState<Integer, String> answer(InvocationOnMock invocationOnMock) throw
}).when(keyedStateBackend).getPartitionedState(Matchers.any(), any(TypeSerializer.class), any(MapStateDescriptor.class));

when(operatorMock.getKeyedStateStore()).thenReturn(keyedStateStore);
when(operatorMock.getOperatorID()).thenReturn(new OperatorID());
return operatorMock;
}

Expand Down

0 comments on commit 3fd694d

Please sign in to comment.