Skip to content

Commit

Permalink
[FLINK-12062][table-runtime-blink] Introduce bundle operator to strea…
Browse files Browse the repository at this point in the history
…ming table runtime.

This closes apache#8086
  • Loading branch information
KurtYoung committed Apr 3, 2019
1 parent 4775207 commit 1d5dda3
Show file tree
Hide file tree
Showing 13 changed files with 867 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.bundle;

import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.runtime.bundle.trigger.BundleTrigger;
import org.apache.flink.table.runtime.bundle.trigger.BundleTriggerCallback;
import org.apache.flink.table.runtime.context.ExecutionContextImpl;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * The {@link AbstractMapBundleOperator} uses a Java {@link Map} to buffer the input elements
 * in key-value form. The map key is typically the same as the state key, so some optimizations
 * can be done before accessing state, such as pre-aggregating the values of each key. State then
 * only needs to be accessed once per buffered key instead of once per processed element.
 *
 * <p>NOTES: if all processed elements have different keys, this operator only increases the
 * memory footprint and does not bring any performance improvement.
 *
 * @param <K> The type of the key in the bundle map
 * @param <V> The type of the value in the bundle map
 * @param <IN> Input type for the operator.
 * @param <OUT> Output type for the operator.
 */
public abstract class AbstractMapBundleOperator<K, V, IN, OUT>
        extends AbstractStreamOperator<OUT>
        implements OneInputStreamOperator<IN, OUT>, BundleTriggerCallback {

    private static final long serialVersionUID = 5081841938324118594L;

    /**
     * The map in heap to store elements.
     *
     * <p>The field is transient, so it is never part of the serialized operator. It must therefore
     * be created in {@link #open()} rather than in the constructor: an instance restored through
     * Java serialization does not run the constructor and would otherwise see {@code null} here.
     */
    private transient Map<K, V> bundle;

    /** The trigger that determines how many elements should be put into a bundle. */
    private final BundleTrigger<IN> bundleTrigger;

    /** The function used to process when receiving element. */
    private final MapBundleFunction<K, V, IN, OUT> function;

    /** Output for stream records. */
    private transient Collector<OUT> collector;

    /** Number of elements added to the current bundle since it was last finished. */
    private transient int numOfElements = 0;

    /**
     * Creates the operator.
     *
     * @param function the function that merges inputs into the bundle and finishes bundles
     * @param bundleTrigger the trigger that decides when a bundle is finished
     * @throws NullPointerException if {@code function} or {@code bundleTrigger} is null
     */
    AbstractMapBundleOperator(
            MapBundleFunction<K, V, IN, OUT> function,
            BundleTrigger<IN> bundleTrigger) {
        chainingStrategy = ChainingStrategy.ALWAYS;
        this.function = checkNotNull(function, "function is null");
        this.bundleTrigger = checkNotNull(bundleTrigger, "bundleTrigger is null");
    }

    /**
     * Opens the wrapped function, creates the transient runtime structures (bundle map and
     * collector), wires this operator into the trigger and registers the bundle metrics.
     */
    @Override
    public void open() throws Exception {
        super.open();

        // Create the bundle first: everything below (trigger callback, gauges) may touch it.
        this.bundle = new HashMap<>();
        this.numOfElements = 0;
        this.collector = new StreamRecordCollector<>(output);

        function.open(new ExecutionContextImpl(this, getRuntimeContext()));

        bundleTrigger.registerCallback(this);
        // reset trigger
        bundleTrigger.reset();
        LOG.info("BundleOperator's trigger info: " + bundleTrigger.explain());

        // gauge metrics exposing the number of buffered elements and the elements-per-key ratio
        getRuntimeContext().getMetricGroup().gauge("bundleSize", (Gauge<Integer>) () -> numOfElements);
        getRuntimeContext().getMetricGroup().gauge("bundleRatio", (Gauge<Double>) () -> {
            int numOfKeys = bundle.size();
            if (numOfKeys == 0) {
                return 0.0;
            } else {
                return 1.0 * numOfElements / numOfKeys;
            }
        });
    }

    /**
     * Merges the incoming element into the bundle value stored under its bundle key and then
     * notifies the trigger, which may in turn call back {@link #finishBundle()}.
     */
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // get the key and value for the map bundle
        final IN input = element.getValue();
        final K bundleKey = getKey(input);
        final V bundleValue = bundle.get(bundleKey);

        // get a new value after adding this element to bundle
        final V newBundleValue = function.addInput(bundleValue, input);

        // update to map bundle
        bundle.put(bundleKey, newBundleValue);

        numOfElements++;
        bundleTrigger.onElement(input);
    }

    /**
     * Get the key for current processing element, which will be used as the map bundle's key.
     */
    protected abstract K getKey(final IN input) throws Exception;

    /**
     * Hands the buffered bundle to the function, clears the bundle and resets the trigger.
     * The trigger is reset even when there is nothing buffered.
     */
    @Override
    public void finishBundle() throws Exception {
        // bundle is null until open() has run (e.g. close() on an operator that was never opened)
        if (bundle != null && !bundle.isEmpty()) {
            numOfElements = 0;
            function.finishBundle(bundle, collector);
            bundle.clear();
        }
        bundleTrigger.reset();
    }

    /** Finishes the current bundle before the watermark is passed on to the super class. */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        finishBundle();
        super.processWatermark(mark);
    }

    /**
     * Finishes the current bundle before the snapshot. This class keeps the bundle only in the
     * heap map and never writes it to state itself, so buffered elements are flushed here.
     */
    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        finishBundle();
    }

    /**
     * Finishes the remaining bundle and then closes the operator and the function. Failures
     * while closing are logged as warnings instead of being rethrown; a failure of
     * {@link #finishBundle()} is still propagated to the caller.
     */
    @Override
    public void close() throws Exception {
        try {
            finishBundle();
        } finally {
            Exception exception = null;

            try {
                super.close();
                if (function != null) {
                    // NOTE(review): FunctionUtils.closeFunction presumably only closes functions
                    // that are RichFunctions; confirm MapBundleFunction#close() is reached here.
                    FunctionUtils.closeFunction(function);
                }
            } catch (InterruptedException interrupted) {
                exception = interrupted;

                // restore the interrupt flag for the code further up the stack
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                exception = e;
            }

            if (exception != null) {
                LOG.warn("Errors occurred while closing the BundleOperator.", exception);
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.bundle;

import org.apache.flink.table.runtime.bundle.trigger.BundleTrigger;

/**
 * The {@link KeyedMapBundleOperator} uses the framework's current key as the bundle map key, and
 * thus can only be used on a {@link org.apache.flink.streaming.api.datastream.KeyedStream}.
 *
 * @param <K> The type of the key in the bundle map
 * @param <V> The type of the value in the bundle map
 * @param <IN> Input type for the operator.
 * @param <OUT> Output type for the operator.
 */
public class KeyedMapBundleOperator<K, V, IN, OUT> extends AbstractMapBundleOperator<K, V, IN, OUT> {

    private static final long serialVersionUID = 1L;

    /**
     * Creates the operator.
     *
     * @param function the function that merges inputs into the bundle and finishes bundles
     * @param bundleTrigger the trigger that decides when a bundle is finished
     */
    public KeyedMapBundleOperator(
            MapBundleFunction<K, V, IN, OUT> function,
            BundleTrigger<IN> bundleTrigger) {
        super(function, bundleTrigger);
    }

    /**
     * Returns the operator's current key as the bundle key; the input element itself is not
     * inspected.
     */
    @Override
    protected K getKey(IN input) throws Exception {
        // K is erased at runtime, so this cast cannot be checked. It relies on the operator
        // being applied to a stream keyed by K, as required by the class contract.
        @SuppressWarnings("unchecked")
        final K currentKey = (K) getCurrentKey();
        return currentKey;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.bundle;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.table.runtime.context.ExecutionContext;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.util.Map;

/**
 * Base class for functions that process a map bundle: inputs are merged into per-key bundle
 * values one at a time, and the whole bundle is turned into output when it is finished.
 *
 * @param <K> The type of the key in the bundle map
 * @param <V> The type of the value in the bundle map
 * @param <IN> Type of the input elements.
 * @param <OUT> Type of the returned elements.
 */
public abstract class MapBundleFunction<K, V, IN, OUT> implements Function {

    private static final long serialVersionUID = -6672219582127325882L;

    /** The execution context handed over in {@link #open(ExecutionContext)}. */
    protected transient ExecutionContext ctx;

    /**
     * Initializes the function by remembering the given execution context.
     *
     * @param ctx the execution context, must not be null
     * @throws NullPointerException if {@code ctx} is null
     */
    public void open(ExecutionContext ctx) throws Exception {
        Preconditions.checkNotNull(ctx);
        this.ctx = ctx;
    }

    /**
     * Merges one input element into the bundle value of its key.
     *
     * @param value the bundle value currently stored for the key, or null if there is none yet
     * @param input the element to add, not null
     * @return the bundle value to store for the key from now on
     */
    public abstract V addInput(@Nullable V value, IN input);

    /**
     * Invoked once a bundle is finished; converts the bundle into zero, one, or more output
     * elements.
     *
     * @param buffer the finished bundle, keyed by bundle key
     * @param out the collector that receives the produced elements
     */
    public abstract void finishBundle(Map<K, V> buffer, Collector<OUT> out) throws Exception;

    /** Releases resources held by the function; does nothing by default. */
    public void close() throws Exception {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.bundle;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.table.runtime.bundle.trigger.BundleTrigger;

import java.util.Objects;

/**
 * The {@link MapBundleOperator} uses a {@link KeySelector} to extract the bundle key, and thus
 * can be used with a non-keyed stream.
 *
 * @param <K> The type of the key in the bundle map
 * @param <V> The type of the value in the bundle map
 * @param <IN> Input type for the operator.
 * @param <OUT> Output type for the operator.
 */
public class MapBundleOperator<K, V, IN, OUT> extends AbstractMapBundleOperator<K, V, IN, OUT> {

    private static final long serialVersionUID = 1L;

    /** KeySelector is used to extract key for bundle map. */
    private final KeySelector<IN, K> keySelector;

    /**
     * Creates the operator.
     *
     * @param function the function that merges inputs into the bundle and finishes bundles
     * @param bundleTrigger the trigger that decides when a bundle is finished
     * @param keySelector extracts the bundle key from each input element
     * @throws NullPointerException if {@code keySelector} is null
     */
    public MapBundleOperator(
            MapBundleFunction<K, V, IN, OUT> function,
            BundleTrigger<IN> bundleTrigger,
            KeySelector<IN, K> keySelector) {
        super(function, bundleTrigger);
        // Fail fast here, like the other constructor arguments, instead of failing with a
        // NullPointerException only when the first element is processed.
        this.keySelector = Objects.requireNonNull(keySelector, "keySelector is null");
    }

    /** Extracts the bundle key from the input element through the configured key selector. */
    @Override
    protected K getKey(IN input) throws Exception {
        return this.keySelector.getKey(input);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.bundle.trigger;

import org.apache.flink.annotation.Internal;

import java.io.Serializable;

/**
* A {@link BundleTrigger} determines when a bundle of input elements should be evaluated and
* triggers the callback which was registered previously.
*
* @param <T> The input element type.
*/
@Internal
public interface BundleTrigger<T> extends Serializable {

/**
* Registers a callback which will be called once this trigger decides to finish this bundle.
*
* @param callback The callback to notify when the bundle should be finished.
*/
void registerCallback(BundleTriggerCallback callback);

/**
* Called for every element that gets added to the bundle. If the trigger decides to start
* evaluating the input, {@link BundleTriggerCallback#finishBundle()} should be invoked.
*
* @param element The element that arrived.
*/
void onElement(final T element) throws Exception;

/**
* Resets the trigger to its initial state.
*/
void reset();

/**
* Returns a description of this trigger as a string.
*/
String explain();
}
Loading

0 comments on commit 1d5dda3

Please sign in to comment.