Skip to content

Commit

Permalink
[streaming] Exception handling update & minor refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ghermann authored and StephanEwen committed Aug 18, 2014
1 parent e6766fd commit 35cf874
Show file tree
Hide file tree
Showing 26 changed files with 229 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void initializeConnection() {
channel = connection.createChannel();

} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}

initDone = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,6 @@ public void setUserInvokable(StreamComponentInvokable<?> invokableObject) {
}
}

// @SuppressWarnings("unchecked")
// public <T extends StreamComponentInvokable> Class<? extends T>
// getUserInvokableClass() {
// return (Class<? extends T>) config.getClass(USER_FUNCTION, null);
// }

public <T> StreamComponentInvokable<T> getUserInvokableObject() {
try {
return deserializeObject(config.getBytes(SERIALIZEDUDF, null));
Expand Down Expand Up @@ -156,10 +150,6 @@ public Object getFunction() {
}
}

// public void setFunctionName(String functionName) {
// config.setString(FUNCTION_NAME, functionName);
// }

public String getFunctionName() {
return config.getString(FUNCTION_NAME, "");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {

OutputSelector<OUT> outputSelector;
private static final Log log = LogFactory.getLog(DirectedStreamCollector.class);
private static final Log LOG = LogFactory.getLog(DirectedStreamCollector.class);
private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> emitted;

/**
Expand Down Expand Up @@ -97,8 +97,8 @@ private void emit(StreamRecord<OUT> streamRecord) {
}
}
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error(String.format("Emit to %s failed due to: %s", outputName,
if (LOG.isErrorEnabled()) {
LOG.error(String.format("Emit to %s failed due to: %s", outputName,
StringUtils.stringifyException(e)));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.flink.runtime.jobgraph.JobGraph;

public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
private static final Log log = LogFactory.getLog(RemoteStreamEnvironment.class);
private static final Log LOG = LogFactory.getLog(RemoteStreamEnvironment.class);

private String host;
private int port;
Expand Down Expand Up @@ -72,8 +72,8 @@ public RemoteStreamEnvironment(String host, int port, String... jarFiles) {

@Override
public void execute() {
if (log.isInfoEnabled()) {
log.info("Running remotely at " + host + ":" + port);
if (LOG.isInfoEnabled()) {
LOG.info("Running remotely at " + host + ":" + port);
}

JobGraph jobGraph = jobGraphBuilder.getJobGraph();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,21 @@ public SinkInvokable(SinkFunction<IN> sinkFunction) {
@Override
protected void immutableInvoke() throws Exception {
while ((reuse = recordIterator.next(reuse)) != null) {
sinkFunction.invoke((IN) reuse.getObject());
callUserFunctionAndLogException();
resetReuse();
}
}

@Override
protected void mutableInvoke() throws Exception {
while ((reuse = recordIterator.next(reuse)) != null) {
sinkFunction.invoke((IN) reuse.getObject());
callUserFunctionAndLogException();
}
}

@Override
protected void callUserFunction() throws Exception {
sinkFunction.invoke((IN) reuse.getObject());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public SourceInvokable(SourceFunction<OUT> sourceFunction) {
this.sourceFunction = sourceFunction;
}

@Override
public void invoke() throws Exception {
sourceFunction.invoke(collector);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,6 @@ public void close() throws Exception {
((RichFunction) userFunction).close();
}
}


public abstract void invoke() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.StringUtils;

public abstract class StreamRecordInvokable<IN, OUT> extends
StreamComponentInvokable<OUT> {
Expand All @@ -35,6 +38,7 @@ public StreamRecordInvokable(Function userFunction) {
}

private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(StreamComponentInvokable.class);

protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
StreamRecordSerializer<IN> serializer;
Expand All @@ -59,7 +63,7 @@ protected StreamRecord<IN> loadNextRecord() {
try {
reuse = recordIterator.next(reuse);
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
return reuse;
}
Expand All @@ -68,6 +72,20 @@ protected StreamRecord<IN> loadNextRecord() {

protected abstract void mutableInvoke() throws Exception;

protected abstract void callUserFunction() throws Exception;

protected void callUserFunctionAndLogException() {
try {
callUserFunction();
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error(String.format("Calling user function failed due to: %s",
StringUtils.stringifyException(e)));
}
}
}

@Override
public void invoke() throws Exception {
if (this.isMutable) {
mutableInvoke();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

public class BatchGroupReduceInvokable<IN> extends BatchReduceInvokable<IN, IN> {

private static final long serialVersionUID = 1L;

int keyPosition;
protected ReduceFunction<IN> reducer;
private Iterator<StreamRecord<IN>> iterator;
Expand All @@ -40,31 +42,35 @@ public BatchGroupReduceInvokable(ReduceFunction<IN> reduceFunction, long batchSi
values = new MutableTableState<Object, IN>();
}

private IN reduced;
private IN nextValue;
private IN currentValue;

@Override
protected void reduce() throws Exception {
protected void reduce() {
iterator = state.getStreamRecordIterator();
while (iterator.hasNext()) {
StreamRecord<IN> nextRecord = iterator.next();

IN nextValue = nextRecord.getObject();
nextValue = nextRecord.getObject();
Object key = nextRecord.getField(keyPosition);

IN currentValue = values.get(key);
currentValue = values.get(key);
if (currentValue != null) {
IN reduced = reducer.reduce(currentValue, nextValue);
callUserFunctionAndLogException();
values.put(key, reduced);
collector.collect(reduced);
} else {
values.put(key, nextValue);
collector.collect(nextValue);
}
}
System.out.println(values);
values.clear();
System.out.println(values);

}

private static final long serialVersionUID = 1L;
@Override
protected void callUserFunction() throws Exception {
reduced = reducer.reduce(currentValue, nextValue);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,5 @@ protected boolean batchNotFull() {
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ public FilterInvokable(FilterFunction<IN> filterFunction) {
this.filterFunction = filterFunction;
}

private boolean canCollect;

@Override
protected void immutableInvoke() throws Exception {
while ((reuse = recordIterator.next(reuse)) != null) {
if (filterFunction.filter(reuse.getObject())) {
callUserFunctionAndLogException();
if (canCollect) {
collector.collect(reuse.getObject());
}
resetReuse();
Expand All @@ -46,10 +49,15 @@ protected void immutableInvoke() throws Exception {
@Override
protected void mutableInvoke() throws Exception {
while ((reuse = recordIterator.next(reuse)) != null) {
if (filterFunction.filter(reuse.getObject())) {
callUserFunctionAndLogException();
if (canCollect) {
collector.collect(reuse.getObject());
}
}
}

@Override
protected void callUserFunction() throws Exception {
canCollect = filterFunction.filter(reuse.getObject());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,21 @@ public FlatMapInvokable(FlatMapFunction<IN, OUT> flatMapper) {
@Override
protected void immutableInvoke() throws Exception {
while ((reuse = recordIterator.next(reuse)) != null) {
flatMapper.flatMap(reuse.getObject(), collector);
callUserFunctionAndLogException();
resetReuse();
}
}

@Override
protected void mutableInvoke() throws Exception {
while ((reuse = recordIterator.next(reuse)) != null) {
flatMapper.flatMap(reuse.getObject(), collector);
callUserFunctionAndLogException();
}
}

@Override
protected void callUserFunction() throws Exception {
flatMapper.flatMap(reuse.getObject(), collector);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,16 @@ protected void mutableInvoke() throws Exception {
}
}

private IN reduced;
private IN nextValue;
private IN currentValue;

private void reduce() throws Exception {
Object key = reuse.getField(keyPosition);
IN currentValue = values.get(key);
IN nextValue = reuse.getObject();
currentValue = values.get(key);
nextValue = reuse.getObject();
if (currentValue != null) {
IN reduced = reducer.reduce(currentValue, nextValue);
callUserFunctionAndLogException();
values.put(key, reduced);
collector.collect(reduced);
} else {
Expand All @@ -66,4 +70,9 @@ private void reduce() throws Exception {
}
}

@Override
protected void callUserFunction() throws Exception {
reduced = reducer.reduce(currentValue, nextValue);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,20 @@ public MapInvokable(MapFunction<IN, OUT> mapper) {
@Override
protected void immutableInvoke() throws Exception {
while ((reuse = recordIterator.next(reuse)) != null) {
collector.collect(mapper.map(reuse.getObject()));
callUserFunctionAndLogException();
resetReuse();
}
}

@Override
protected void mutableInvoke() throws Exception {
while ((reuse = recordIterator.next(reuse)) != null) {
collector.collect(mapper.map(reuse.getObject()));
callUserFunctionAndLogException();
}
}

@Override
protected void callUserFunction() throws Exception {
collector.collect(mapper.map(reuse.getObject()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,16 @@ protected void immutableInvoke() throws Exception {
}
}

protected void reduce() throws Exception {
protected void reduce() {
userIterator = state.getIterator();
callUserFunctionAndLogException();
}

@Override
protected void callUserFunction() throws Exception {
reducer.reduce(userIterable, collector);
}

private void collectOneUnit() {
ArrayList<StreamRecord<IN>> list;
list = new ArrayList<StreamRecord<IN>>(listSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,22 @@ public WindowGroupReduceInvokable(ReduceFunction<IN> reduceFunction, long window
values = new MutableTableState<Object, IN>();
}

private IN reduced;
private IN nextValue;
private IN currentValue;

@Override
protected void reduce() throws Exception {
protected void reduce() {
iterator = state.getStreamRecordIterator();
while (iterator.hasNext()) {
StreamRecord<IN> nextRecord = iterator.next();

IN nextValue = nextRecord.getObject();
nextValue = nextRecord.getObject();
Object key = nextRecord.getField(keyPosition);

IN currentValue = values.get(key);
currentValue = values.get(key);
if (currentValue != null) {
IN reduced = reducer.reduce(currentValue, nextValue);
callUserFunctionAndLogException();
values.put(key, reduced);
collector.collect(reduced);
} else {
Expand All @@ -62,6 +66,11 @@ protected void reduce() throws Exception {
values.clear();
}

@Override
protected void callUserFunction() throws Exception {
reduced = reducer.reduce(currentValue, nextValue);
}

private static final long serialVersionUID = 1L;

}
Loading

0 comments on commit 35cf874

Please sign in to comment.