Skip to content

Commit

Permalink
[FLINK-14231][runtime] Introduce and use ProcessingTimeServiceAware t…
Browse files Browse the repository at this point in the history
…o pass ProcessingTimeService to operator
  • Loading branch information
sunhaibotb authored and pnowojski committed Feb 21, 2020
1 parent fa22ed2 commit ab642cb
Show file tree
Hide file tree
Showing 32 changed files with 362 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ public void testFileReadingOperatorWithIngestionTime() throws Exception {
}

TextInputFormat format = new TextInputFormat(new Path(testBasePath));
TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);

final long watermarkInterval = 10;

Expand Down
53 changes: 53 additions & 0 deletions flink-fs-tests/src/test/java/org/apache/flink/hdfstests/Utils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.hdfstests;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

/**
 * Utility class that contains common methods for testing.
 *
 * <p>It only exposes static factory helpers that build a
 * {@link OneInputStreamOperatorTestHarness} around a {@link ContinuousFileReaderOperatorFactory}.
 */
public class Utils {

	/** Static helpers only; this class is not meant to be instantiated. */
	private Utils() {
	}

	/**
	 * Creates a test harness for the continuous file reader, deriving the output type from the
	 * given input format via {@link TypeExtractor#getInputFormatTypes} and using the harness's
	 * own {@link ExecutionConfig}.
	 *
	 * @param inputFormat the file input format handed to the operator factory
	 * @param <OUT> the type of the records produced by the input format
	 * @return the configured test harness
	 * @throws Exception if building or configuring the harness fails
	 */
	public static <OUT> OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, OUT> createContinuousFileProcessingTestHarness(
			FileInputFormat<OUT> inputFormat) throws Exception {

		return createContinuousFileProcessingTestHarness(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat), null);
	}

	/**
	 * Creates a test harness for the continuous file reader and sets the output type on the
	 * harness's operator factory.
	 *
	 * @param inputFormat the file input format handed to the operator factory
	 * @param outTypeInfo the output type information to set on the operator factory
	 * @param executionConfig the execution config to pass along with the output type; when
	 *     {@code null}, the execution config of the newly created harness is used instead
	 * @param <OUT> the type of the records produced by the input format
	 * @return the configured test harness
	 * @throws Exception if building or configuring the harness fails
	 */
	public static <OUT> OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, OUT> createContinuousFileProcessingTestHarness(
			FileInputFormat<OUT> inputFormat,
			TypeInformation<OUT> outTypeInfo,
			ExecutionConfig executionConfig) throws Exception {

		OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, OUT> testHarness =
			new OneInputStreamOperatorTestHarness<>(new ContinuousFileReaderOperatorFactory<>(inputFormat));

		// Fall back to the harness's own config so callers may pass null.
		testHarness.getOperatorFactory().setOutputType(
			outTypeInfo,
			executionConfig == null ? testHarness.getExecutionConfig() : executionConfig);

		return testHarness;
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.RunnableWithException;
Expand Down Expand Up @@ -214,8 +215,13 @@ public void onNoMoreData(ContinuousFileReaderOperator<?> op) {
}
};

ContinuousFileReaderOperator(FileInputFormat<OUT> format, MailboxExecutor mailboxExecutor) {
ContinuousFileReaderOperator(
FileInputFormat<OUT> format,
ProcessingTimeService processingTimeService,
MailboxExecutor mailboxExecutor) {

this.format = checkNotNull(format);
this.processingTimeService = checkNotNull(processingTimeService);
this.executor = checkNotNull(mailboxExecutor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
Expand All @@ -32,9 +33,9 @@
/**
* {@link ContinuousFileReaderOperator} factory.
*/
public class ContinuousFileReaderOperatorFactory<OUT> implements YieldingOperatorFactory<OUT>, OneInputStreamOperatorFactory<TimestampedFileInputSplit, OUT> {
public class ContinuousFileReaderOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OUT>
implements YieldingOperatorFactory<OUT>, OneInputStreamOperatorFactory<TimestampedFileInputSplit, OUT> {

private ChainingStrategy strategy = ChainingStrategy.HEAD;
private final FileInputFormat<OUT> inputFormat;
private TypeInformation<OUT> type;
private ExecutionConfig executionConfig;
Expand All @@ -48,6 +49,7 @@ public ContinuousFileReaderOperatorFactory(FileInputFormat<OUT> inputFormat, Typ
this.inputFormat = inputFormat;
this.type = type;
this.executionConfig = executionConfig;
this.chainingStrategy = ChainingStrategy.HEAD;
}

@Override
Expand All @@ -57,7 +59,7 @@ public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {

@Override
public StreamOperator createStreamOperator(StreamTask containingTask, StreamConfig config, Output output) {
ContinuousFileReaderOperator<OUT> operator = new ContinuousFileReaderOperator<>(inputFormat, mailboxExecutor);
ContinuousFileReaderOperator<OUT> operator = new ContinuousFileReaderOperator<>(inputFormat, processingTimeService, mailboxExecutor);
operator.setup(containingTask, config, output);
operator.setOutputType(type, executionConfig);
return operator;
Expand All @@ -69,16 +71,6 @@ public void setOutputType(TypeInformation<OUT> type, ExecutionConfig executionCo
this.executionConfig = executionConfig;
}

@Override
public void setChainingStrategy(ChainingStrategy strategy) {
this.strategy = strategy;
}

@Override
public ChainingStrategy getChainingStrategy() {
return strategy;
}

@Override
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return ContinuousFileReaderOperator.class;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public abstract class AbstractStreamOperator<OUT>

// ---------------- time handler ------------------

private transient ProcessingTimeService processingTimeService;
protected transient ProcessingTimeService processingTimeService;
protected transient InternalTimeServiceManager<?> timeServiceManager;

// ---------------- two-input operator watermarks ------------------
Expand All @@ -174,7 +174,6 @@ public abstract class AbstractStreamOperator<OUT>
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
final Environment environment = containingTask.getEnvironment();
this.container = containingTask;
this.processingTimeService = containingTask.getProcessingTimeService(config.getChainIndex());
this.config = config;
try {
OperatorMetricGroup operatorMetricGroup = environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), config.getOperatorName());
Expand Down Expand Up @@ -234,6 +233,15 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<S
stateKeySelector2 = config.getStatePartitioner(1, getUserCodeClassloader());
}

/**
* @deprecated The {@link ProcessingTimeService} instance should be passed by the operator
* constructor and this method will be removed along with {@link SetupableStreamOperator}.
*/
@Deprecated
public void setProcessingTimeService(ProcessingTimeService processingTimeService) {
this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
}

@Override
public MetricGroup getMetricGroup() {
return metrics;
Expand Down Expand Up @@ -558,7 +566,7 @@ public OperatorStateBackend getOperatorStateBackend() {
* Returns the {@link ProcessingTimeService} responsible for getting the current
* processing time and registering timers.
*/
protected ProcessingTimeService getProcessingTimeService() {
public ProcessingTimeService getProcessingTimeService() {
return processingTimeService;
}

Expand Down Expand Up @@ -673,7 +681,6 @@ public final ChainingStrategy getChainingStrategy() {
return chainingStrategy;
}


// ------------------------------------------------------------------------
// Metrics
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.operators;

import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;

/**
 * Skeleton implementation shared by stream operator factories. It stores the
 * {@link ChainingStrategy} and, by implementing {@link ProcessingTimeServiceAware}, the
 * {@link ProcessingTimeService} that subclasses can hand to the operators they create.
 *
 * @param <OUT> the output type of the operators created by the factory
 */
public abstract class AbstractStreamOperatorFactory<OUT> implements StreamOperatorFactory<OUT>, ProcessingTimeServiceAware {

	/** Chaining strategy reported by this factory; {@link ChainingStrategy#ALWAYS} unless changed. */
	protected ChainingStrategy chainingStrategy = ChainingStrategy.ALWAYS;

	/** Processing time service injected through the setter; transient, so it is not serialized. */
	protected transient ProcessingTimeService processingTimeService;

	/**
	 * Returns the chaining strategy currently held by this factory.
	 */
	@Override
	public ChainingStrategy getChainingStrategy() {
		return this.chainingStrategy;
	}

	/**
	 * Replaces the chaining strategy held by this factory.
	 *
	 * @param strategy the new chaining strategy
	 */
	@Override
	public void setChainingStrategy(ChainingStrategy strategy) {
		this.chainingStrategy = strategy;
	}

	/**
	 * Stores the given processing time service in the protected field for use by subclasses.
	 * The value is stored as-is, without a null check.
	 *
	 * @param processingTimeService the processing time service to keep
	 */
	@Override
	public void setProcessingTimeService(ProcessingTimeService processingTimeService) {
		this.processingTimeService = processingTimeService;
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
* @param <OUT> The output type of the operator
*/
@Internal
public class SimpleOperatorFactory<OUT> implements StreamOperatorFactory<OUT> {
public class SimpleOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OUT> {

private final StreamOperator<OUT> operator;

Expand All @@ -61,6 +61,7 @@ public static <OUT> SimpleOperatorFactory<OUT> of(StreamOperator<OUT> operator)

protected SimpleOperatorFactory(StreamOperator<OUT> operator) {
this.operator = checkNotNull(operator);
this.chainingStrategy = operator.getChainingStrategy();
}

public StreamOperator<OUT> getOperator() {
Expand All @@ -71,6 +72,9 @@ public StreamOperator<OUT> getOperator() {
@Override
public <T extends StreamOperator<OUT>> T createStreamOperator(StreamTask<?, ?> containingTask,
StreamConfig config, Output<StreamRecord<OUT>> output) {
if (operator instanceof AbstractStreamOperator) {
((AbstractStreamOperator) operator).setProcessingTimeService(processingTimeService);
}
if (operator instanceof SetupableStreamOperator) {
((SetupableStreamOperator) operator).setup(containingTask, config, output);
}
Expand All @@ -79,14 +83,10 @@ public <T extends StreamOperator<OUT>> T createStreamOperator(StreamTask<?, ?> c

@Override
public void setChainingStrategy(ChainingStrategy strategy) {
this.chainingStrategy = strategy;
operator.setChainingStrategy(strategy);
}

@Override
public ChainingStrategy getChainingStrategy() {
return operator.getChainingStrategy();
}

@Override
public boolean isStreamSource() {
return operator instanceof StreamSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@

import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorFactory;

/**
* A utility to instantiate new operators with a given factory.
Expand All @@ -40,11 +41,18 @@ public static <OUT, OP extends StreamOperator<OUT>> OP createOperator(
StreamTask<OUT, ?> containingTask,
StreamConfig configuration,
Output<StreamRecord<OUT>> output) {
MailboxExecutorFactory mailboxExecutorFactory = containingTask.getMailboxExecutorFactory();

MailboxExecutor mailboxExecutor = containingTask.getMailboxExecutorFactory().createExecutor(configuration.getChainIndex());

if (operatorFactory instanceof YieldingOperatorFactory) {
MailboxExecutor mailboxExecutor = mailboxExecutorFactory.createExecutor(configuration.getChainIndex());
((YieldingOperatorFactory) operatorFactory).setMailboxExecutor(mailboxExecutor);
}

if (operatorFactory instanceof ProcessingTimeServiceAware) {
ProcessingTimeService processingTimeService = containingTask.getProcessingTimeServiceFactory().createProcessingTimeService(mailboxExecutor);
((ProcessingTimeServiceAware) operatorFactory).setProcessingTimeService(processingTimeService);
}

return operatorFactory.createStreamOperator(containingTask, configuration, output);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -108,6 +109,7 @@ public AsyncWaitOperator(
long timeout,
int capacity,
@Nonnull AsyncDataStream.OutputMode outputMode,
@Nonnull ProcessingTimeService processingTimeService,
@Nonnull MailboxExecutor mailboxExecutor) {
super(asyncFunction);

Expand All @@ -122,6 +124,8 @@ public AsyncWaitOperator(

this.timeout = timeout;

this.processingTimeService = Preconditions.checkNotNull(processingTimeService);

this.mailboxExecutor = mailboxExecutor;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
Expand All @@ -33,13 +34,14 @@
*
* @param <OUT> The output type of the operator
*/
public class AsyncWaitOperatorFactory<IN, OUT> implements OneInputStreamOperatorFactory<IN, OUT>, YieldingOperatorFactory<OUT> {
public class AsyncWaitOperatorFactory<IN, OUT> extends AbstractStreamOperatorFactory<OUT>
implements OneInputStreamOperatorFactory<IN, OUT>, YieldingOperatorFactory<OUT> {

private final AsyncFunction<IN, OUT> asyncFunction;
private final long timeout;
private final int capacity;
private final AsyncDataStream.OutputMode outputMode;
private MailboxExecutor mailboxExecutor;
private ChainingStrategy strategy = ChainingStrategy.HEAD;

public AsyncWaitOperatorFactory(
AsyncFunction<IN, OUT> asyncFunction,
Expand All @@ -50,6 +52,7 @@ public AsyncWaitOperatorFactory(
this.timeout = timeout;
this.capacity = capacity;
this.outputMode = outputMode;
this.chainingStrategy = ChainingStrategy.HEAD;
}

@Override
Expand All @@ -64,21 +67,12 @@ public StreamOperator createStreamOperator(StreamTask containingTask, StreamConf
timeout,
capacity,
outputMode,
processingTimeService,
mailboxExecutor);
asyncWaitOperator.setup(containingTask, config, output);
return asyncWaitOperator;
}

@Override
public void setChainingStrategy(ChainingStrategy strategy) {
this.strategy = strategy;
}

@Override
public ChainingStrategy getChainingStrategy() {
return strategy;
}

@Override
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return AsyncWaitOperator.class;
Expand Down
Loading

0 comments on commit ab642cb

Please sign in to comment.