Skip to content

Commit

Permalink
[FLINK-17817][streaming] Fix type serializer duplication in CollectSi…
Browse files Browse the repository at this point in the history
…nkFunction


This closes apache#12272
  • Loading branch information
tsreaper committed May 21, 2020
1 parent 9f3a711 commit 1cf06e1
Showing 1 changed file with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public void open(Configuration parameters) throws Exception {
// so that the client can know if the sink has been restarted
version = UUID.randomUUID().toString();

serverThread = new ServerThread();
serverThread = new ServerThread(serializer);
serverThread.start();

// sending socket server address to coordinator
Expand Down Expand Up @@ -325,6 +325,7 @@ public static <T> Tuple2<Long, CollectCoordinationResponse<T>> deserializeAccumu
*/
private class ServerThread extends Thread {

private final TypeSerializer<IN> serializer;
private final ServerSocket serverSocket;

private boolean running;
Expand All @@ -333,7 +334,8 @@ private class ServerThread extends Thread {
private DataInputViewStreamWrapper inStream;
private DataOutputViewStreamWrapper outStream;

private ServerThread() throws Exception {
private ServerThread(TypeSerializer<IN> serializer) throws Exception {
this.serializer = serializer.duplicate();
this.serverSocket = new ServerSocket(0, 0, getBindAddress());
this.running = true;
}
Expand Down

0 comments on commit 1cf06e1

Please sign in to comment.