Skip to content

Commit

Permalink
[FLINK-21442][connectors/jdbc] Fix XaSinkStateSerializer serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
mobuchowski authored and rkhachatryan committed Feb 25, 2021
1 parent 7fd4767 commit eb0c19d
Showing 1 changed file with 26 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,14 @@

/** XaSinkStateSerializer. */
@Internal
final class XaSinkStateSerializer extends TypeSerializer<JdbcXaSinkFunctionState> {
public final class XaSinkStateSerializer extends TypeSerializer<JdbcXaSinkFunctionState> {

private static final TypeSerializerSnapshot<JdbcXaSinkFunctionState> SNAPSHOT =
new SimpleTypeSerializerSnapshot<JdbcXaSinkFunctionState>(XaSinkStateSerializer::new) {
private static final int VERSION = 1;

@Override
public void writeSnapshot(DataOutputView out) throws IOException {
super.writeSnapshot(out);
out.writeInt(VERSION);
}

@Override
public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader)
throws IOException {
super.readSnapshot(readVersion, in, classLoader);
in.readInt();
}
};

new XaSinkStateSimpleXaTypeSerializerSnapshot();
private final TypeSerializer<Xid> xidSerializer;
private final TypeSerializer<CheckpointAndXid> checkpointAndXidSerializer;

XaSinkStateSerializer() {
public XaSinkStateSerializer() {
this(new XidSerializer(), new CheckpointAndXidSerializer());
}

Expand Down Expand Up @@ -149,4 +133,27 @@ public int hashCode() {
public TypeSerializerSnapshot<JdbcXaSinkFunctionState> snapshotConfiguration() {
return SNAPSHOT;
}

/** Simple {@link TypeSerializerSnapshot} for {@link XaSinkStateSerializer}. */
public static class XaSinkStateSimpleXaTypeSerializerSnapshot
extends SimpleTypeSerializerSnapshot<JdbcXaSinkFunctionState> {
private static final int VERSION = 1;

public XaSinkStateSimpleXaTypeSerializerSnapshot() {
super(XaSinkStateSerializer::new);
}

@Override
public void writeSnapshot(DataOutputView out) throws IOException {
super.writeSnapshot(out);
out.writeInt(VERSION);
}

@Override
public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader)
throws IOException {
super.readSnapshot(readVersion, in, classLoader);
in.readInt();
}
}
}

0 comments on commit eb0c19d

Please sign in to comment.