From 1d5dda330ff61e66723ba3d98d38ba4642b8cd0d Mon Sep 17 00:00:00 2001 From: Kurt Young Date: Sun, 31 Mar 2019 17:11:16 +0800 Subject: [PATCH] [FLINK-12062][table-runtime-blink] Introduce bundle operator to streaming table runtime. This closes #8086 --- .../bundle/AbstractMapBundleOperator.java | 175 ++++++++++++++++++ .../bundle/KeyedMapBundleOperator.java | 41 ++++ .../runtime/bundle/MapBundleFunction.java | 62 +++++++ .../runtime/bundle/MapBundleOperator.java | 47 +++++ .../runtime/bundle/trigger/BundleTrigger.java | 53 ++++++ .../bundle/trigger/BundleTriggerCallback.java | 36 ++++ .../bundle/trigger/CoBundleTrigger.java | 62 +++++++ .../bundle/trigger/CountBundleTrigger.java | 62 +++++++ .../bundle/trigger/CountCoBundleTrigger.java | 71 +++++++ .../runtime/bundle/MapBundleOperatorTest.java | 120 ++++++++++++ .../trigger/CountBundleTriggerTest.java | 48 +++++ .../trigger/CountCoBundleTriggerTest.java | 54 ++++++ .../bundle/trigger/TestTriggerCallback.java | 36 ++++ 13 files changed, 867 insertions(+) create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/AbstractMapBundleOperator.java create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/KeyedMapBundleOperator.java create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/MapBundleFunction.java create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/MapBundleOperator.java create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/trigger/BundleTrigger.java create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/trigger/BundleTriggerCallback.java create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/trigger/CoBundleTrigger.java create mode 100644 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/trigger/CountBundleTrigger.java create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/trigger/CountCoBundleTrigger.java create mode 100644 flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/bundle/MapBundleOperatorTest.java create mode 100644 flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/bundle/trigger/CountBundleTriggerTest.java create mode 100644 flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/bundle/trigger/CountCoBundleTriggerTest.java create mode 100644 flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/bundle/trigger/TestTriggerCallback.java diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/AbstractMapBundleOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/AbstractMapBundleOperator.java new file mode 100644 index 0000000000000..d37aa31560c95 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/AbstractMapBundleOperator.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.bundle; + +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.runtime.bundle.trigger.BundleTrigger; +import org.apache.flink.table.runtime.bundle.trigger.BundleTriggerCallback; +import org.apache.flink.table.runtime.context.ExecutionContextImpl; +import org.apache.flink.table.runtime.util.StreamRecordCollector; +import org.apache.flink.util.Collector; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The {@link AbstractMapBundleOperator} simply used a java Map to store the input elements + * in key-value form. The map key is typically the same with the state key, so we can do some + * optimizations before accessing states, like pre aggregate values for each key. And we will + * only need to access state every key we have, but not every element we processed. + * + *

NOTES: if all elements we processed have different keys, such operator will only increase
+ * memory footprint, and will not have any performance improvement.
+ *
+ * <p>The bundle map is transient runtime state: it is created in {@link #open()} rather than in
+ * the constructor, because a transient field is not restored when the operator is deserialized.
+ *
+ * @param <K> The type of the key in the bundle map
+ * @param <V> The type of the value in the bundle map
+ * @param <IN> Input type for the operator.
+ * @param <OUT> Output type for the operator.
+ */
+public abstract class AbstractMapBundleOperator<K, V, IN, OUT>
+        extends AbstractStreamOperator<OUT>
+        implements OneInputStreamOperator<IN, OUT>, BundleTriggerCallback {
+
+    private static final long serialVersionUID = 5081841938324118594L;
+
+    /** The map in heap to store elements; created in {@link #open()}. */
+    private transient Map<K, V> bundle;
+
+    /** The trigger that determines how many elements should be put into a bundle. */
+    private final BundleTrigger<IN> bundleTrigger;
+
+    /** The function used to process when receiving element. */
+    private final MapBundleFunction<K, V, IN, OUT> function;
+
+    /** Output for stream records. */
+    private transient Collector<OUT> collector;
+
+    /** Number of elements added to the current bundle; reported by the "bundleSize" gauge. */
+    private transient int numOfElements = 0;
+
+    /**
+     * Creates the operator and sets its chaining strategy to {@link ChainingStrategy#ALWAYS}.
+     *
+     * @param function the bundle function, must not be null
+     * @param bundleTrigger the trigger deciding when a bundle is finished, must not be null
+     */
+    AbstractMapBundleOperator(
+            MapBundleFunction<K, V, IN, OUT> function,
+            BundleTrigger<IN> bundleTrigger) {
+        chainingStrategy = ChainingStrategy.ALWAYS;
+        this.function = checkNotNull(function, "function is null");
+        this.bundleTrigger = checkNotNull(bundleTrigger, "bundleTrigger is null");
+    }
+
+    /**
+     * Opens the function, creates the bundle map and the collector, registers this operator as
+     * the trigger callback and registers the "bundleSize" and "bundleRatio" gauges.
+     */
+    @Override
+    public void open() throws Exception {
+        super.open();
+
+        // Transient state must be created here: a constructor-assigned transient field would
+        // be null on a deserialized operator instance.
+        this.bundle = new HashMap<>();
+        this.numOfElements = 0;
+        this.collector = new StreamRecordCollector<>(output);
+
+        function.open(new ExecutionContextImpl(this, getRuntimeContext()));
+
+        bundleTrigger.registerCallback(this);
+        // reset trigger
+        bundleTrigger.reset();
+        LOG.info("BundleOperator's trigger info: " + bundleTrigger.explain());
+
+        // counter metric to get the size of bundle
+        getRuntimeContext().getMetricGroup().gauge("bundleSize", (Gauge<Integer>) () -> numOfElements);
+        // average number of buffered elements per distinct bundle key
+        getRuntimeContext().getMetricGroup().gauge("bundleRatio", (Gauge<Double>) () -> {
+            int numOfKeys = bundle.size();
+            if (numOfKeys == 0) {
+                return 0.0;
+            } else {
+                return 1.0 * numOfElements / numOfKeys;
+            }
+        });
+    }
+
+    /**
+     * Merges the element into the bundle value stored under its bundle key, then notifies the
+     * trigger, which may call back into {@link #finishBundle()}.
+     */
+    @Override
+    public void processElement(StreamRecord<IN> element) throws Exception {
+        // get the key and value for the map bundle
+        final IN input = element.getValue();
+        final K bundleKey = getKey(input);
+        final V bundleValue = bundle.get(bundleKey);
+
+        // get a new value after adding this element to bundle
+        final V newBundleValue = function.addInput(bundleValue, input);
+
+        // update to map bundle
+        bundle.put(bundleKey, newBundleValue);
+
+        numOfElements++;
+        bundleTrigger.onElement(input);
+    }
+
+    /**
+     * Get the key for current processing element, which will be used as the map bundle's key.
+     */
+    protected abstract K getKey(final IN input) throws Exception;
+
+    /**
+     * Hands a non-empty bundle to the function, clears it, and resets the trigger. The trigger
+     * is reset even when the bundle is empty.
+     */
+    @Override
+    public void finishBundle() throws Exception {
+        // bundle is null when close() runs on an operator that was never opened
+        if (bundle != null && !bundle.isEmpty()) {
+            numOfElements = 0;
+            function.finishBundle(bundle, collector);
+            bundle.clear();
+        }
+        bundleTrigger.reset();
+    }
+
+    /** Flushes the current bundle before the watermark is forwarded. */
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        finishBundle();
+        super.processWatermark(mark);
+    }
+
+    /** Flushes the current bundle when the operator prepares a snapshot. */
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        finishBundle();
+    }
+
+    /**
+     * Flushes the remaining bundle, then closes the operator and the function. Failures while
+     * closing are logged as warnings and not rethrown; a failure in the final flush propagates.
+     */
+    @Override
+    public void close() throws Exception {
+        try {
+            finishBundle();
+        } finally {
+            Exception exception = null;
+
+            try {
+                super.close();
+                if (function != null) {
+                    // NOTE(review): closeFunction presumably only closes RichFunction
+                    // instances, so MapBundleFunction#close() may never run - confirm.
+                    FunctionUtils.closeFunction(function);
+                }
+            } catch (InterruptedException interrupted) {
+                exception = interrupted;
+
+                // restore the interrupt flag for callers further up the stack
+                Thread.currentThread().interrupt();
+            } catch (Exception e) {
+                exception = e;
+            }
+
+            if (exception != null) {
+                LOG.warn("Errors occurred while closing the BundleOperator.", exception);
+            }
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/KeyedMapBundleOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/KeyedMapBundleOperator.java
new file mode 100644
index
0000000000000..ac6ab34432c4f --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/KeyedMapBundleOperator.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.bundle; + +import org.apache.flink.table.runtime.bundle.trigger.BundleTrigger; + +/** + * The {@link KeyedMapBundleOperator} uses framework's key as bundle map key, thus can only be + * used on {@link org.apache.flink.streaming.api.datastream.KeyedStream}. 
+ */
+public class KeyedMapBundleOperator<K, V, IN, OUT> extends AbstractMapBundleOperator<K, V, IN, OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates the operator; both arguments are passed on to the parent constructor.
+     *
+     * @param function the bundle function
+     * @param bundleTrigger the trigger deciding when a bundle is finished
+     */
+    public KeyedMapBundleOperator(
+            MapBundleFunction<K, V, IN, OUT> function,
+            BundleTrigger<IN> bundleTrigger) {
+        super(function, bundleTrigger);
+    }
+
+    /**
+     * Returns the operator's current key as the bundle key; the input element is not inspected.
+     */
+    @Override
+    @SuppressWarnings("unchecked") // getCurrentKey() is untyped; K is expected to be the stream's key type
+    protected K getKey(IN input) throws Exception {
+        return (K) getCurrentKey();
+    }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/MapBundleFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/MapBundleFunction.java
new file mode 100644
index 0000000000000..8adde15ea54a4
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/MapBundleFunction.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.bundle;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.table.runtime.context.ExecutionContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/**
+ * Base class for map bundle processing. Inputs are folded into a per-key bundle value with
+ * {@link #addInput(Object, Object)}; a finished bundle is turned into output elements by
+ * {@link #finishBundle(Map, Collector)}.
+ *
+ * @param <K> The type of the key in the bundle map
+ * @param <V> The type of the value in the bundle map
+ * @param <IN> Type of the input elements.
+ * @param <OUT> Type of the returned elements.
+ */
+public abstract class MapBundleFunction<K, V, IN, OUT> implements Function {
+
+    private static final long serialVersionUID = -6672219582127325882L;
+
+    /** Execution context handed over in {@link #open(ExecutionContext)}. */
+    protected transient ExecutionContext ctx;
+
+    /**
+     * Stores the given execution context for later use by subclasses.
+     *
+     * @param ctx the execution context, must not be null
+     */
+    public void open(ExecutionContext ctx) throws Exception {
+        this.ctx = Preconditions.checkNotNull(ctx);
+    }
+
+    /**
+     * Adds the given input to the given value, returning the new bundle value.
+     *
+     * @param value the existing bundle value, maybe null
+     * @param input the given input, not null
+     * @return the bundle value to store for the input's key
+     */
+    public abstract V addInput(@Nullable V value, IN input);
+
+    /**
+     * Called when a bundle is finished. Transform a bundle to zero, one, or more output elements.
+     *
+     * @param buffer the bundle map holding one value per bundle key
+     * @param out the collector that receives the output elements
+     */
+    public abstract void finishBundle(Map<K, V> buffer, Collector<OUT> out) throws Exception;
+
+    /** Releases resources held by the function; does nothing by default. */
+    public void close() throws Exception {}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/MapBundleOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/MapBundleOperator.java
new file mode 100644
index 0000000000000..4d2144d472439
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/MapBundleOperator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.bundle;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.table.runtime.bundle.trigger.BundleTrigger;
+
+/**
+ * The {@link MapBundleOperator} uses a {@link KeySelector} to extract bundle key, thus can be
+ * used with non-keyed-stream.
+ *
+ * @param <K> The type of the key in the bundle map
+ * @param <V> The type of the value in the bundle map
+ * @param <IN> Input type for the operator.
+ * @param <OUT> Output type for the operator.
+ */
+public class MapBundleOperator<K, V, IN, OUT> extends AbstractMapBundleOperator<K, V, IN, OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** KeySelector is used to extract key for bundle map. */
+    private final KeySelector<IN, K> keySelector;
+
+    /**
+     * Creates the operator.
+     *
+     * @param function the bundle function
+     * @param bundleTrigger the trigger deciding when a bundle is finished
+     * @param keySelector extracts the bundle key from each input element
+     */
+    public MapBundleOperator(
+            MapBundleFunction<K, V, IN, OUT> function,
+            BundleTrigger<IN> bundleTrigger,
+            KeySelector<IN, K> keySelector) {
+        super(function, bundleTrigger);
+        this.keySelector = keySelector;
+    }
+
+    /** Returns the key that the configured {@link KeySelector} extracts from the input. */
+    @Override
+    protected K getKey(IN input) throws Exception {
+        return this.keySelector.getKey(input);
+    }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/trigger/BundleTrigger.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/trigger/BundleTrigger.java
new file mode 100644
index 0000000000000..7b8ce67e2d67d
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/trigger/BundleTrigger.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.bundle.trigger;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+
+/**
+ * A {@link BundleTrigger} determines when a bundle of input elements should be evaluated and
+ * trigger the callback which registered previously.
+ *
+ * @param <T> The input element type.
+ */
+@Internal
+public interface BundleTrigger<T> extends Serializable {
+
+    /**
+     * Register a callback which will be called once this trigger decides to finish this bundle.
+     *
+     * @param callback the callback to invoke when the trigger fires
+     */
+    void registerCallback(BundleTriggerCallback callback);
+
+    /**
+     * Called for every element that gets added to the bundle. If the trigger decides to start
+     * evaluate the input, {@link BundleTriggerCallback#finishBundle()} should be invoked.
+     *
+     * @param element The element that arrived.
+     */
+    void onElement(final T element) throws Exception;
+
+    /**
+     * Reset the trigger to its initial state.
+     */
+    void reset();
+
+    /**
+     * Returns a human-readable description of this trigger.
+     */
+    String explain();
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/trigger/BundleTriggerCallback.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/trigger/BundleTriggerCallback.java
new file mode 100644
index 0000000000000..425f8af026719
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/trigger/BundleTriggerCallback.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.bundle.trigger;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for bundle trigger callbacks that can be registered to a {@link BundleTrigger}.
+ * The trigger calls {@link #finishBundle()} when it decides the current bundle is complete.
+ */
+@Internal
+public interface BundleTriggerCallback {
+
+    /**
+     * This method is invoked to finish the current bundle and start a new one when the trigger fires.
+     *
+     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+     *                   to fail and may trigger recovery.
+     */
+    void finishBundle() throws Exception;
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/trigger/CoBundleTrigger.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/trigger/CoBundleTrigger.java
new file mode 100644
index 0000000000000..d649ce105ed38
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/trigger/CoBundleTrigger.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.bundle.trigger;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+
+/**
+ * A {@link CoBundleTrigger} is similar to {@link BundleTrigger}; the only difference is that
+ * a {@link CoBundleTrigger} can handle two inputs.
+ *
+ * @param <IN1> The first input element type.
+ * @param <IN2> The second input element type.
+ */
+@Internal
+public interface CoBundleTrigger<IN1, IN2> extends Serializable {
+
+    /**
+     * Register a callback which will be called once this trigger decides to finish this bundle.
+     *
+     * @param callback the callback to invoke when the trigger fires
+     */
+    void registerCallback(BundleTriggerCallback callback);
+
+    /**
+     * Called for every element that gets added to the bundle from the first input. If the trigger
+     * decides to start evaluate, {@link BundleTriggerCallback#finishBundle()} should be invoked.
+     *
+     * @param element The element that arrived from the first input.
+     */
+    void onElement1(final IN1 element) throws Exception;
+
+    /**
+     * Called for every element that gets added to the bundle from the second input. If the trigger
+     * decides to start evaluate, {@link BundleTriggerCallback#finishBundle()} should be invoked.
+     *
+     * @param element The element that arrived from the second input.
+     */
+    void onElement2(final IN2 element) throws Exception;
+
+    /**
+     * Reset the trigger to its initial state.
+     */
+    void reset();
+
+    /**
+     * Returns a human-readable description of this trigger.
+     */
+    String explain();
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/trigger/CountBundleTrigger.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/trigger/CountBundleTrigger.java
new file mode 100644
index 0000000000000..8aa561418a45d
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/trigger/CountBundleTrigger.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.bundle.trigger;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link BundleTrigger} that fires once the count of elements in a bundle reaches the given count.
+ *
+ * <p>A callback must be registered via {@link #registerCallback(BundleTriggerCallback)} before
+ * the trigger can fire. The callback and the running count are transient.
+ *
+ * @param <T> The input element type.
+ */
+public class CountBundleTrigger<T> implements BundleTrigger<T> {
+
+    private static final long serialVersionUID = -3640028071558094814L;
+
+    /** Number of elements after which the trigger fires. */
+    private final long maxCount;
+    /** Callback invoked when the trigger fires. */
+    private transient BundleTriggerCallback callback;
+    /** Number of elements seen since the last reset. */
+    private transient long count = 0;
+
+    /**
+     * Creates a trigger that fires after every {@code maxCount} elements.
+     *
+     * @param maxCount the bundle size, must be greater than 0
+     */
+    public CountBundleTrigger(long maxCount) {
+        Preconditions.checkArgument(maxCount > 0, "maxCount must be greater than 0");
+        this.maxCount = maxCount;
+    }
+
+    @Override
+    public void registerCallback(BundleTriggerCallback callback) {
+        this.callback = Preconditions.checkNotNull(callback, "callback is null");
+    }
+
+    /**
+     * Counts the element and, once {@code maxCount} is reached, invokes the callback and resets
+     * the count. The element itself is not inspected.
+     */
+    @Override
+    public void onElement(T element) throws Exception {
+        count++;
+        if (count >= maxCount) {
+            callback.finishBundle();
+            reset();
+        }
+    }
+
+    @Override
+    public void reset() {
+        count = 0;
+    }
+
+    @Override
+    public String explain() {
+        return "CountBundleTrigger with size " + maxCount;
+    }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/trigger/CountCoBundleTrigger.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/trigger/CountCoBundleTrigger.java new file mode 100644 index 0000000000000..ec623bfc07fdd --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/trigger/CountCoBundleTrigger.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.bundle.trigger; + +import org.apache.flink.util.Preconditions; + +/** + * A {@link CoBundleTrigger} that fires once the count of elements in a bundle reaches the given + * count. 
+ */
+public class CountCoBundleTrigger<IN1, IN2> implements CoBundleTrigger<IN1, IN2> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Number of elements, counted over both inputs, after which the trigger fires. */
+    private final long maxCount;
+    /** Callback invoked when the trigger fires. */
+    private transient BundleTriggerCallback callback;
+    /** Number of elements seen on either input since the last reset. */
+    private transient long count = 0;
+
+    /**
+     * Creates a trigger that fires after every {@code maxCount} elements from both inputs combined.
+     *
+     * @param maxCount the bundle size, must be greater than 0
+     */
+    public CountCoBundleTrigger(long maxCount) {
+        Preconditions.checkArgument(maxCount > 0, "maxCount must be greater than 0");
+        this.maxCount = maxCount;
+    }
+
+    @Override
+    public void registerCallback(BundleTriggerCallback callback) {
+        this.callback = Preconditions.checkNotNull(callback, "callback is null");
+    }
+
+    @Override
+    public void onElement1(final IN1 element) throws Exception {
+        countElement();
+    }
+
+    @Override
+    public void onElement2(final IN2 element) throws Exception {
+        countElement();
+    }
+
+    @Override
+    public void reset() {
+        count = 0;
+    }
+
+    @Override
+    public String explain() {
+        return "CountCoBundleTrigger with size " + maxCount;
+    }
+
+    /** Counts one element from either input; fires the callback and resets once maxCount is reached. */
+    private void countElement() throws Exception {
+        count++;
+        if (count >= maxCount) {
+            callback.finishBundle();
+            reset();
+        }
+    }
+}
+
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/bundle/MapBundleOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/bundle/MapBundleOperatorTest.java
new file mode 100644
index 0000000000000..9112b3ba30410
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/bundle/MapBundleOperatorTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.bundle; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.table.runtime.bundle.trigger.CountBundleTrigger; +import org.apache.flink.util.Collector; + +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static junit.framework.TestCase.assertEquals; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link MapBundleOperator}. 
+ */
+public class MapBundleOperatorTest {
+
+    /**
+     * Feeds five elements through an operator with a count-3 trigger: the third element must
+     * flush the first bundle, and closing the operator must flush the remaining two elements.
+     */
+    @Test
+    public void testSimple() throws Exception {
+        @SuppressWarnings("unchecked")
+        TestMapBundleFunction func = new TestMapBundleFunction();
+        CountBundleTrigger<Tuple2<String, String>> trigger = new CountBundleTrigger<>(3);
+        KeySelector<Tuple2<String, String>, String> keySelector =
+            (KeySelector<Tuple2<String, String>, String>) value -> value.f0;
+
+        OneInputStreamOperatorTestHarness<Tuple2<String, String>, String> op =
+            new OneInputStreamOperatorTestHarness<>(
+                new MapBundleOperator<>(func, trigger, keySelector));
+        op.open();
+        synchronized (op.getCheckpointLock()) {
+            StreamRecord<Tuple2<String, String>> input = new StreamRecord<>(null);
+
+            input.replace(new Tuple2<>("k1", "v1"));
+            op.processElement(input);
+
+            input.replace(new Tuple2<>("k1", "v2"));
+            op.processElement(input);
+
+            // two elements buffered, trigger size is three: nothing flushed yet
+            assertEquals(0, func.getFinishCount());
+
+            input.replace(new Tuple2<>("k2", "v3"));
+            op.processElement(input);
+
+            assertEquals(1, func.getFinishCount());
+            // NOTE(review): the expected order relies on the bundle map's iteration order
+            assertThat(func.getOutputs(), is(Arrays.asList("k1=v1,v2", "k2=v3")));
+
+            input.replace(new Tuple2<>("k3", "v4"));
+            op.processElement(input);
+
+            input.replace(new Tuple2<>("k4", "v5"));
+            op.processElement(input);
+
+            assertEquals(1, func.getFinishCount());
+
+            // closing the operator flushes the partially filled bundle
+            op.close();
+            assertEquals(2, func.getFinishCount());
+            assertThat(func.getOutputs(), is(Arrays.asList("k3=v4", "k4=v5")));
+        }
+    }
+
+    /**
+     * Test function that joins the values of a key with commas and records every finished
+     * bundle as "key=value" strings instead of emitting output.
+     */
+    private static class TestMapBundleFunction extends MapBundleFunction<String, String, Tuple2<String, String>, String> {
+
+        private int finishCount = 0;
+        private List<String> outputs = new ArrayList<>();
+
+        @Override
+        public String addInput(@Nullable String value, Tuple2<String, String> input) {
+            if (value == null) {
+                return input.f1;
+            } else {
+                return value + "," + input.f1;
+            }
+        }
+
+        @Override
+        public void finishBundle(Map<String, String> buffer, Collector<String> out) throws Exception {
+            finishCount++;
+            // only the most recently finished bundle is kept
+            outputs.clear();
+            for (Map.Entry<String, String> entry : buffer.entrySet()) {
+                outputs.add(entry.getKey() + "=" + entry.getValue());
+            }
+        }
+
+        int getFinishCount() {
+            return finishCount;
+        }
+
+        List<String> getOutputs() {
+            return outputs;
+        }
+    }
+}
diff --git
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/bundle/trigger/CountBundleTriggerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/bundle/trigger/CountBundleTriggerTest.java new file mode 100644 index 0000000000000..2b4ffd0e72493 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/bundle/trigger/CountBundleTriggerTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.bundle.trigger; + +import org.junit.Test; + +import static junit.framework.TestCase.assertEquals; + +/** + * Tests for {@link CountBundleTrigger}. 
+ */ +public class CountBundleTriggerTest { + + @Test + public void testTrigger() throws Exception { + CountBundleTrigger trigger = new CountBundleTrigger<>(2); + TestTriggerCallback callback = new TestTriggerCallback(); + trigger.registerCallback(callback); + + trigger.onElement(null); + assertEquals(0, callback.getTriggerCount()); + + trigger.onElement(null); + assertEquals(1, callback.getTriggerCount()); + + trigger.onElement(null); + assertEquals(1, callback.getTriggerCount()); + + trigger.onElement(null); + assertEquals(2, callback.getTriggerCount()); + } +} diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/bundle/trigger/CountCoBundleTriggerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/bundle/trigger/CountCoBundleTriggerTest.java new file mode 100644 index 0000000000000..87222535edd4f --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/bundle/trigger/CountCoBundleTriggerTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.bundle.trigger; + +import org.junit.Test; + +import static junit.framework.TestCase.assertEquals; + +/** + * Tests for {@link CountCoBundleTrigger}. + */ +public class CountCoBundleTriggerTest { + + @Test + public void testTrigger() throws Exception { + CountCoBundleTrigger trigger = new CountCoBundleTrigger<>(2); + TestTriggerCallback callback = new TestTriggerCallback(); + trigger.registerCallback(callback); + + trigger.onElement1(null); + assertEquals(0, callback.getTriggerCount()); + + trigger.onElement2(null); + assertEquals(1, callback.getTriggerCount()); + + trigger.onElement1(null); + assertEquals(1, callback.getTriggerCount()); + + trigger.onElement1(null); + assertEquals(2, callback.getTriggerCount()); + + trigger.onElement2(null); + assertEquals(2, callback.getTriggerCount()); + + trigger.onElement2(null); + assertEquals(3, callback.getTriggerCount()); + } +} diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/bundle/trigger/TestTriggerCallback.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/bundle/trigger/TestTriggerCallback.java new file mode 100644 index 0000000000000..1d893490d04d2 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/bundle/trigger/TestTriggerCallback.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.bundle.trigger; + +/** + * A bundle trigger callback which simply track the number of triggers. + */ +public class TestTriggerCallback implements BundleTriggerCallback { + + private int triggerCount = 0; + + @Override + public void finishBundle() throws Exception { + triggerCount++; + } + + int getTriggerCount() { + return triggerCount; + } +}