Skip to content

Commit

Permalink
[FLINK-2992] Remove use of SerializationUtils
Browse files Browse the repository at this point in the history
This closes apache#1343
  • Loading branch information
rmetzger committed Nov 10, 2015
1 parent b6f00f9 commit e058abb
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.streaming.api.datastream;

import org.apache.commons.lang.SerializationUtils;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
Expand Down Expand Up @@ -153,13 +152,9 @@ public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
evictor).enableSetProcessingTime(setProcessingTime);

} else {
// we need to copy because we need our own instance of the pre aggregator
@SuppressWarnings("unchecked")
ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);

operator = new NonKeyedWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
new PreAggregatingHeapWindowBuffer.Factory<>(function),
new ReduceAllWindowFunction<W, T>(function),
trigger).enableSetProcessingTime(setProcessingTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.streaming.api.datastream;

import org.apache.commons.lang.SerializationUtils;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
Expand Down Expand Up @@ -167,15 +166,11 @@ public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
evictor).enableSetProcessingTime(setProcessingTime);

} else {
// we need to copy because we need our own instance of the pre aggregator
@SuppressWarnings("unchecked")
ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);

operator = new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
new PreAggregatingHeapWindowBuffer.Factory<>(function),
new ReduceWindowFunction<K, W, T>(function),
trigger).enableSetProcessingTime(setProcessingTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void run(final Object lockingObject, final Output<StreamRecord<T>> collec
public void cancel() {
userFunction.cancel();
// the context may not be initialized if the source was never running.
if(ctx != null) {
if (ctx != null) {
ctx.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.flink.streaming.runtime.operators.windowing;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.SerializationUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand All @@ -42,11 +41,10 @@
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
Expand Down Expand Up @@ -372,16 +370,15 @@ public Context(


@SuppressWarnings("unchecked")
protected Context(DataInputView in) throws Exception {
protected Context(DataInputView in, ClassLoader userClassloader) throws Exception {
this.window = windowSerializer.deserialize(in);
this.watermarkTimer = in.readLong();
this.processingTimeTimer = in.readLong();

int stateSize = in.readInt();
byte[] stateData = new byte[stateSize];
in.read(stateData);
ByteArrayInputStream bais = new ByteArrayInputStream(stateData);
state = (HashMap<String, Serializable>) SerializationUtils.deserialize(bais);
state = InstantiationUtil.deserializeObject(stateData, userClassloader);

this.windowBuffer = windowBufferFactory.create();
int numElements = in.readInt();
Expand All @@ -396,10 +393,9 @@ protected void writeToState(StateBackend.CheckpointStateOutputView out) throws I
out.writeLong(watermarkTimer);
out.writeLong(processingTimeTimer);

ByteArrayOutputStream baos = new ByteArrayOutputStream();
SerializationUtils.serialize(state, baos);
out.writeInt(baos.size());
out.write(baos.toByteArray(), 0, baos.size());
byte[] serializedState = InstantiationUtil.serializeObject(state);
out.writeInt(serializedState.length);
out.write(serializedState, 0, serializedState.length);

MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
out.writeInt(windowBuffer.size());
Expand Down Expand Up @@ -534,18 +530,18 @@ public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp)
public void restoreState(StreamTaskState taskState) throws Exception {
super.restoreState(taskState);


final ClassLoader userClassloader = getUserCodeClassloader();
@SuppressWarnings("unchecked")
StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
DataInputView in = inputState.getState(getUserCodeClassloader());
DataInputView in = inputState.getState(userClassloader);

int numWindows = in.readInt();
this.windows = new HashMap<>(numWindows);
this.processingTimeTimers = new HashMap<>();
this.watermarkTimers = new HashMap<>();

for (int j = 0; j < numWindows; j++) {
Context context = new Context(in);
Context context = new Context(in, userClassloader);
windows.put(context.window, context);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.flink.streaming.runtime.operators.windowing;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.SerializationUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand All @@ -43,11 +42,10 @@
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
Expand Down Expand Up @@ -436,7 +434,7 @@ public Context(K key,
* {@link #writeToState(StateBackend.CheckpointStateOutputView)}
*/
@SuppressWarnings("unchecked")
protected Context(DataInputView in) throws Exception {
protected Context(DataInputView in, ClassLoader userClassloader) throws Exception {
this.key = keySerializer.deserialize(in);
this.window = windowSerializer.deserialize(in);
this.watermarkTimer = in.readLong();
Expand All @@ -445,8 +443,7 @@ protected Context(DataInputView in) throws Exception {
int stateSize = in.readInt();
byte[] stateData = new byte[stateSize];
in.read(stateData);
ByteArrayInputStream bais = new ByteArrayInputStream(stateData);
state = (HashMap<String, Serializable>) SerializationUtils.deserialize(bais);
state = InstantiationUtil.deserializeObject(stateData, userClassloader);

this.windowBuffer = windowBufferFactory.create();
int numElements = in.readInt();
Expand All @@ -465,10 +462,9 @@ protected void writeToState(StateBackend.CheckpointStateOutputView out) throws I
out.writeLong(watermarkTimer);
out.writeLong(processingTimeTimer);

ByteArrayOutputStream baos = new ByteArrayOutputStream();
SerializationUtils.serialize(state, baos);
out.writeInt(baos.size());
out.write(baos.toByteArray(), 0, baos.size());
byte[] serializedState = InstantiationUtil.serializeObject(state);
out.writeInt(serializedState.length);
out.write(serializedState, 0, serializedState.length);

MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
out.writeInt(windowBuffer.size());
Expand Down Expand Up @@ -608,10 +604,11 @@ public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp)
public void restoreState(StreamTaskState taskState) throws Exception {
super.restoreState(taskState);

final ClassLoader userClassloader = getUserCodeClassloader();

@SuppressWarnings("unchecked")
StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
DataInputView in = inputState.getState(getUserCodeClassloader());
DataInputView in = inputState.getState(userClassloader);

int numKeys = in.readInt();
this.windows = new HashMap<>(numKeys);
Expand All @@ -621,7 +618,7 @@ public void restoreState(StreamTaskState taskState) throws Exception {
for (int i = 0; i < numKeys; i++) {
int numWindows = in.readInt();
for (int j = 0; j < numWindows; j++) {
Context context = new Context(in);
Context context = new Context(in, userClassloader);
Map<W, Context> keyWindows = windows.get(context.key);
if (keyWindows == null) {
keyWindows = new HashMap<>(numWindows);
Expand Down
7 changes: 7 additions & 0 deletions tools/maven/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,18 @@ under the License.
<module name="IllegalImport">
<property name="illegalPkgs" value="org.apache.flink.shaded"/>
</module>
<!-- forbid use of commons lang validate -->
<module name="Regexp">
<property name="format" value="org\.apache\.commons\.lang3\.Validate"/>
<property name="illegalPattern" value="true"/>
<property name="message" value="Use Guava Checks instead of Commons Validate. Please refer to the coding guidelines."/>
</module>
<!-- forbid the use of org.apache.commons.lang.SerializationUtils -->
<module name="Regexp">
<property name="format" value="org\.apache\.commons\.lang\.SerializationUtils"/>
<property name="illegalPattern" value="true"/>
<property name="message" value="Use Flink's InstantiationUtil instead of common's SerializationUtils"/>
</module>
<module name="NeedBraces">
<property name="tokens" value="LITERAL_IF, LITERAL_ELSE"/>
</module>
Expand Down

0 comments on commit e058abb

Please sign in to comment.