Skip to content

Commit

Permalink
[FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for …
Browse files Browse the repository at this point in the history
…intermediate results

This closes apache#254.
  • Loading branch information
uce committed Jan 12, 2015
1 parent c8c50be commit d908ca1
Show file tree
Hide file tree
Showing 284 changed files with 11,806 additions and 11,300 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ private <T> void connect(String upStreamVertexName, String downStreamVertexName,
.connectNewDataSetAsInput(upStreamVertex, DistributionPattern.POINTWISE);
} else {
downStreamVertex
.connectNewDataSetAsInput(upStreamVertex, DistributionPattern.BIPARTITE);
.connectNewDataSetAsInput(upStreamVertex, DistributionPattern.ALL_TO_ALL);
}

if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.List;
import java.util.Set;

import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.util.StringUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.List;
import java.util.Map;

import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.flink.streaming.api.streamvertex;

import java.util.ArrayList;

import org.apache.flink.runtime.io.network.api.MutableRecordReader;
import org.apache.flink.runtime.io.network.api.reader.BufferReader;
import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
import org.apache.flink.runtime.io.network.api.reader.UnionBufferReader;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
Expand All @@ -28,6 +28,8 @@
import org.apache.flink.streaming.io.CoRecordReader;
import org.apache.flink.util.MutableObjectIterator;

import java.util.ArrayList;

public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {

protected StreamRecordSerializer<IN1> inputDeserializer1 = null;
Expand Down Expand Up @@ -74,27 +76,47 @@ protected void setConfigInputs() throws StreamVertexException {

int numberOfInputs = configuration.getNumberOfInputs();

ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>> inputList1 = new ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>>();
ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>> inputList2 = new ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>>();
ArrayList<BufferReader> inputList1 = new ArrayList<BufferReader>();
ArrayList<BufferReader> inputList2 = new ArrayList<BufferReader>();

for (int i = 0; i < numberOfInputs; i++) {
int inputType = configuration.getInputType(i);
BufferReader reader = getEnvironment().getReader(i);
switch (inputType) {
case 1:
inputList1.add(new MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>(
this));
break;
case 2:
inputList2.add(new MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>(
this));
break;
default:
throw new RuntimeException("Invalid input type number: " + inputType);
case 1:
inputList1.add(reader);
break;
case 2:
inputList2.add(reader);
break;
default:
throw new RuntimeException("Invalid input type number: " + inputType);
}
}

coReader = new CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>>(
inputList1, inputList2);
MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>> reader1;
if (inputList1.size() == 1) {
reader1 = new MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>(inputList1.get(0));
}
else if (inputList1.size() > 1) {
reader1 = new MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>(new UnionBufferReader(inputList1.toArray(new BufferReader[inputList1.size()])));
}
else {
throw new IllegalStateException("Illegal input size for first input.");
}

// Build the record reader for the second input side: a single buffer reader is
// wrapped directly, several buffer readers are combined through a UnionBufferReader.
MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>> reader2;
if (inputList2.size() == 1) {
	reader2 = new MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>(inputList2.get(0));
}
else if (inputList2.size() > 1) {
	reader2 = new MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>(new UnionBufferReader(inputList2.toArray(new BufferReader[inputList2.size()])));
}
else {
	// inputList2 is empty here; report the second input (the message previously named the first).
	throw new IllegalStateException("Illegal input size for second input.");
}

coReader = new CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>>(reader1, reader2);
}

@Override
Expand All @@ -106,32 +128,29 @@ public void invoke() throws Exception {
@Override
public <X> MutableObjectIterator<X> getInput(int index) {
switch (index) {
case 0:
return (MutableObjectIterator<X>) inputIter1;
case 1:
return (MutableObjectIterator<X>) inputIter2;
default:
throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
case 0:
return (MutableObjectIterator<X>) inputIter1;
case 1:
return (MutableObjectIterator<X>) inputIter2;
default:
throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
}
}

@SuppressWarnings("unchecked")
@Override
public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
switch (index) {
case 0:
return (StreamRecordSerializer<X>) inputDeserializer1;
case 1:
return (StreamRecordSerializer<X>) inputDeserializer2;
default:
throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
case 0:
return (StreamRecordSerializer<X>) inputDeserializer1;
case 1:
return (StreamRecordSerializer<X>) inputDeserializer2;
default:
throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
}
}

/**
 * Returns this vertex's co-reader iterator ({@code coIter}), cast to the
 * element types requested by the caller.
 *
 * @param <X> element type the caller expects for the first input
 * @param <Y> element type the caller expects for the second input
 * @return the {@code coIter} field, unchecked-cast to {@code CoReaderIterator<X, Y>}
 */
@Override
@SuppressWarnings("unchecked")
public <X, Y> CoReaderIterator<X, Y> getCoReader() {
	final CoReaderIterator<X, Y> typedIterator = (CoReaderIterator<X, Y>) coIter;
	return typedIterator;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.MutableReader;
import org.apache.flink.runtime.io.network.api.MutableRecordReader;
import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
import org.apache.flink.runtime.io.network.api.reader.MutableReader;
import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
import org.apache.flink.runtime.io.network.api.reader.UnionBufferReader;
import org.apache.flink.runtime.operators.util.ReaderIterator;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.streaming.api.StreamConfig;
Expand Down Expand Up @@ -58,15 +58,11 @@ protected void setConfigInputs() throws StreamVertexException {

if (numberOfInputs < 2) {

inputs = new MutableRecordReader<IOReadableWritable>(streamVertex);
inputs = new MutableRecordReader<IOReadableWritable>(streamVertex.getEnvironment().getReader(0));

} else {
MutableRecordReader<IOReadableWritable>[] recordReaders = (MutableRecordReader<IOReadableWritable>[]) new MutableRecordReader<?>[numberOfInputs];

for (int i = 0; i < numberOfInputs; i++) {
recordReaders[i] = new MutableRecordReader<IOReadableWritable>(streamVertex);
}
inputs = new MutableUnionRecordReader<IOReadableWritable>(recordReaders);
UnionBufferReader reader = new UnionBufferReader(streamVertex.getEnvironment().getAllReaders());
inputs = new MutableRecordReader<IOReadableWritable>(reader);
}

inputIter = createInputIterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,8 @@

package org.apache.flink.streaming.api.streamvertex;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.StreamConfig;
import org.apache.flink.streaming.api.collector.DirectedStreamCollector;
Expand All @@ -36,6 +32,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

public class OutputHandler<OUT> {
private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);

Expand Down Expand Up @@ -121,15 +121,15 @@ void setPartitioner(int outputNumber,
RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;

if (bufferTimeout >= 0) {
output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex,
output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex.getEnvironment().getWriter(outputNumber),
outputPartitioner, bufferTimeout);

if (LOG.isTraceEnabled()) {
LOG.trace("StreamRecordWriter initiated with {} bufferTimeout for {}",
bufferTimeout, streamVertex.getClass().getSimpleName());
}
} else {
output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex,
output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex.getEnvironment().getWriter(outputNumber),
outputPartitioner);

if (LOG.isTraceEnabled()) {
Expand Down Expand Up @@ -161,12 +161,6 @@ public void flushOutputs() throws IOException, InterruptedException {
}
}

public void initializeOutputSerializers() {
for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
output.initializeSerializers();
}
}

long startTime;

public void invokeUserFunction(String componentTypeName, StreamInvokable<?, OUT> userInvokable)
Expand All @@ -176,8 +170,6 @@ public void invokeUserFunction(String componentTypeName, StreamInvokable<?, OUT>
streamVertex.getName(), streamVertex.getInstanceID());
}

initializeOutputSerializers();

try {
streamVertex.invokeUserFunction(userInvokable);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.io.BlockingQueueBroker;
Expand Down Expand Up @@ -71,8 +71,6 @@ public void invoke() throws Exception {
LOG.debug("SOURCE {} invoked with instance id {}", getName(), getInstanceID());
}

outputHandler.initializeOutputSerializers();

StreamRecord<OUT> nextRecord;

while (true) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ public int getInstanceID() {
return instanceID;
}

public StreamingRuntimeContext createRuntimeContext(String taskName,
Map<String, OperatorState<?>> states) {
/**
 * Creates a {@link StreamingRuntimeContext} for the given task, backed by this
 * vertex's environment and user-code class loader.
 *
 * @param taskName name passed through to the runtime context
 * @param states operator states, keyed by name, handed to the runtime context
 * @return a newly constructed runtime context
 */
public StreamingRuntimeContext createRuntimeContext(String taskName, Map<String, OperatorState<?>> states) {
	// Arguments are evaluated left to right, so the environment is still fetched
	// before the user-code class loader.
	return new StreamingRuntimeContext(taskName, getEnvironment(), getUserCodeClassLoader(), states);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {

public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
Map<String, OperatorState<?>> operatorStates) {
super(name, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(),
super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(),
userCodeClassLoader, env.getCopyTask());
this.env = env;
this.operatorStates = operatorStates;
Expand Down
Loading

0 comments on commit d908ca1

Please sign in to comment.