[FLINK-7902] Use TypeSerializer in TwoPhaseCommitSinkFunctions
We use custom serializers to ensure that we have control over the
serialization format, which makes it easier to evolve the format in
the future.

This also implements custom serializers for FlinkKafkaProducer011, the only
TwoPhaseCommitSinkFunction we currently have.
aljoscha committed Nov 1, 2017
1 parent 944a63c commit 0ba528c
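
The gain from hand-written serializers is that every byte on the wire is spelled out in code, so the format can carry an explicit version and evolve independently of the Java classes. A minimal illustration of that idea (hypothetical, not code from this commit):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical example: a hand-rolled format with an explicit version byte.
// Because the writer controls every byte, a version-2 reader can later add
// fields while still accepting version-1 bytes.
final class VersionedId {
	static final byte VERSION = 1;
	final long id;

	VersionedId(long id) {
		this.id = id;
	}

	void write(DataOutput out) throws IOException {
		out.writeByte(VERSION); // format version comes first
		out.writeLong(id);
	}

	static VersionedId read(DataInput in) throws IOException {
		byte version = in.readByte();
		if (version != VERSION) {
			throw new IOException("unsupported format version: " + version);
		}
		return new VersionedId(in.readLong());
	}
}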
Showing 5 changed files with 628 additions and 25 deletions.
9 changes: 9 additions & 0 deletions flink-connectors/flink-connector-kafka-0.11/pom.xml
@@ -85,6 +85,15 @@ under the License.

<!-- test dependencies -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -17,14 +17,18 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -411,7 +415,7 @@ public FlinkKafkaProducer011(
Optional<FlinkKafkaPartitioner<IN>> customPartitioner,
Semantic semantic,
int kafkaProducersPoolSize) {
-super(TypeInformation.of(new TypeHint<State<KafkaTransactionState, KafkaTransactionContext>>() {}));
+super(new TransactionStateSerializer(), new ContextStateSerializer());

this.defaultTopicId = checkNotNull(defaultTopicId, "defaultTopicId is null");
this.schema = checkNotNull(serializationSchema, "serializationSchema is null");
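
For reference, the TwoPhaseCommitSinkFunction side of this change (its diff is in one of the files not rendered on this page) now accepts the two serializers directly. A sketch of how a subclass wires them up, assuming the 1.4-era abstract-method signatures; the sink, transaction, and context types here are placeholders, not part of the commit:

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

// Hypothetical no-op sink: shows only how serializers are passed to super().
class NoOpTwoPhaseSink extends TwoPhaseCommitSinkFunction<String, NoOpTwoPhaseSink.Txn, NoOpTwoPhaseSink.Ctx> {

	static class Txn {}
	static class Ctx {}

	NoOpTwoPhaseSink(TypeSerializer<Txn> txnSerializer, TypeSerializer<Ctx> ctxSerializer) {
		// Replaces the earlier super(TypeInformation.of(new TypeHint<...>() {})).
		super(txnSerializer, ctxSerializer);
	}

	@Override
	protected Txn beginTransaction() throws Exception {
		return new Txn();
	}

	@Override
	protected void invoke(Txn transaction, String value) throws Exception {
		// buffer the value into the transaction
	}

	@Override
	protected void preCommit(Txn transaction) throws Exception {
		// flush pending writes
	}

	@Override
	protected void commit(Txn transaction) {
		// make the transaction's writes visible
	}

	@Override
	protected void abort(Txn transaction) {
		// discard the transaction
	}
}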
@@ -958,6 +962,8 @@ public int compare(PartitionInfo o1, PartitionInfo o2) {
/**
* State for handling transactions.
*/
@VisibleForTesting
@Internal
static class KafkaTransactionState {

private final transient FlinkKafkaProducer<byte[], byte[]> producer;
@@ -970,38 +976,260 @@ static class KafkaTransactionState {
final short epoch;

KafkaTransactionState(String transactionalId, FlinkKafkaProducer<byte[], byte[]> producer) {
-	this.producer = producer;
-	this.transactionalId = transactionalId;
-	this.producerId = producer.getProducerId();
-	this.epoch = producer.getEpoch();
+	this(transactionalId, producer.getProducerId(), producer.getEpoch(), producer);
}

KafkaTransactionState(FlinkKafkaProducer<byte[], byte[]> producer) {
-	this.producer = producer;
-	this.transactionalId = null;
-	this.producerId = -1;
-	this.epoch = -1;
+	this(null, -1, (short) -1, producer);
}

+KafkaTransactionState(
+		String transactionalId,
+		long producerId,
+		short epoch,
+		FlinkKafkaProducer<byte[], byte[]> producer) {
+	this.transactionalId = transactionalId;
+	this.producerId = producerId;
+	this.epoch = epoch;
+	this.producer = producer;
+}

@Override
public String toString() {
return String.format("%s [transactionalId=%s]", this.getClass().getSimpleName(), transactionalId);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

KafkaTransactionState that = (KafkaTransactionState) o;

if (producerId != that.producerId) {
return false;
}
if (epoch != that.epoch) {
return false;
}
return transactionalId != null ? transactionalId.equals(that.transactionalId) : that.transactionalId == null;
}

@Override
public int hashCode() {
int result = transactionalId != null ? transactionalId.hashCode() : 0;
result = 31 * result + (int) (producerId ^ (producerId >>> 32));
result = 31 * result + (int) epoch;
return result;
}
}

/**
* Context associated with this instance of the {@link FlinkKafkaProducer011}. Used for keeping track of the
* transactionalIds.
*/
-static class KafkaTransactionContext {
+@VisibleForTesting
+@Internal
+public static class KafkaTransactionContext {
final Set<String> transactionalIds;

KafkaTransactionContext(Set<String> transactionalIds) {
checkNotNull(transactionalIds);
this.transactionalIds = transactionalIds;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

KafkaTransactionContext that = (KafkaTransactionContext) o;

return transactionalIds.equals(that.transactionalIds);
}

@Override
public int hashCode() {
return transactionalIds.hashCode();
}
}

/**
* {@link org.apache.flink.api.common.typeutils.TypeSerializer} for
* {@link KafkaTransactionState}.
*/
@VisibleForTesting
@Internal
public static class TransactionStateSerializer extends TypeSerializerSingleton<KafkaTransactionState> {

private static final long serialVersionUID = 1L;

@Override
public boolean isImmutableType() {
return true;
}

@Override
public KafkaTransactionState createInstance() {
return null;
}

@Override
public KafkaTransactionState copy(KafkaTransactionState from) {
return from;
}

@Override
public KafkaTransactionState copy(
KafkaTransactionState from,
KafkaTransactionState reuse) {
return from;
}

@Override
public int getLength() {
return -1;
}

@Override
public void serialize(
KafkaTransactionState record,
DataOutputView target) throws IOException {
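// A presence flag precedes transactionalId so a null id survives the round trip.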
if (record.transactionalId == null) {
target.writeBoolean(false);
} else {
target.writeBoolean(true);
target.writeUTF(record.transactionalId);
}
target.writeLong(record.producerId);
target.writeShort(record.epoch);
}

@Override
public KafkaTransactionState deserialize(DataInputView source) throws IOException {
String transactionalId = null;
if (source.readBoolean()) {
transactionalId = source.readUTF();
}
long producerId = source.readLong();
short epoch = source.readShort();
return new KafkaTransactionState(transactionalId, producerId, epoch, null);
}

@Override
public KafkaTransactionState deserialize(
KafkaTransactionState reuse,
DataInputView source) throws IOException {
return deserialize(source);
}

@Override
public void copy(
DataInputView source, DataOutputView target) throws IOException {
boolean hasTransactionalId = source.readBoolean();
target.writeBoolean(hasTransactionalId);
if (hasTransactionalId) {
target.writeUTF(source.readUTF());
}
target.writeLong(source.readLong());
target.writeShort(source.readShort());
}

@Override
public boolean canEqual(Object obj) {
return obj instanceof TransactionStateSerializer;
}
}
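
A quick round-trip of this format, as one might check in a test. This is a sketch: DataOutputSerializer and DataInputDeserializer are Flink's in-memory DataOutputView/DataInputView implementations, whose package has moved between releases (flink-runtime in older versions, org.apache.flink.core.memory later):

import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

class TransactionStateRoundTrip {
	static void check() throws java.io.IOException {
		TransactionStateSerializer serializer = new TransactionStateSerializer();
		// The producer reference is transient and not serialized, so null is fine here.
		KafkaTransactionState state = new KafkaTransactionState("txn-1", 42L, (short) 7, null);

		DataOutputSerializer out = new DataOutputSerializer(64);
		serializer.serialize(state, out);

		DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
		KafkaTransactionState restored = serializer.deserialize(in);

		// equals() deliberately ignores the transient producer field.
		if (!state.equals(restored)) {
			throw new AssertionError("round trip changed the state");
		}
	}
}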

/**
* {@link org.apache.flink.api.common.typeutils.TypeSerializer} for
* {@link KafkaTransactionContext}.
*/
@VisibleForTesting
@Internal
public static class ContextStateSerializer extends TypeSerializerSingleton<KafkaTransactionContext> {

private static final long serialVersionUID = 1L;

@Override
public boolean isImmutableType() {
return true;
}

@Override
public KafkaTransactionContext createInstance() {
return null;
}

@Override
public KafkaTransactionContext copy(KafkaTransactionContext from) {
return from;
}

@Override
public KafkaTransactionContext copy(
KafkaTransactionContext from,
KafkaTransactionContext reuse) {
return from;
}

@Override
public int getLength() {
return -1;
}

@Override
public void serialize(
KafkaTransactionContext record,
DataOutputView target) throws IOException {
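// The set is written as a count followed by that many modified-UTF-8 strings.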
int numIds = record.transactionalIds.size();
target.writeInt(numIds);
for (String id : record.transactionalIds) {
target.writeUTF(id);
}
}

@Override
public KafkaTransactionContext deserialize(DataInputView source) throws IOException {
int numIds = source.readInt();
Set<String> ids = new HashSet<>(numIds);
for (int i = 0; i < numIds; i++) {
ids.add(source.readUTF());
}
return new KafkaTransactionContext(ids);
}

@Override
public KafkaTransactionContext deserialize(
KafkaTransactionContext reuse,
DataInputView source) throws IOException {
return deserialize(source);
}

@Override
public void copy(
DataInputView source,
DataOutputView target) throws IOException {
int numIds = source.readInt();
target.writeInt(numIds);
for (int i = 0; i < numIds; i++) {
target.writeUTF(source.readUTF());
}
}

@Override
public boolean canEqual(Object obj) {
return obj instanceof ContextStateSerializer;
}
}
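
The same round-trip check applies to the context format; a sketch in the style of the one above, with the same caveat about the in-memory view classes:

import java.util.Arrays;
import java.util.HashSet;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

class ContextStateRoundTrip {
	static void check() throws java.io.IOException {
		ContextStateSerializer serializer = new ContextStateSerializer();
		KafkaTransactionContext context =
			new KafkaTransactionContext(new HashSet<>(Arrays.asList("txn-0", "txn-1")));

		DataOutputSerializer out = new DataOutputSerializer(64);
		serializer.serialize(context, out);

		DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
		if (!context.equals(serializer.deserialize(in))) {
			throw new AssertionError("round trip changed the context");
		}
	}
}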

static class ProducersPool implements Closeable {
private final LinkedBlockingDeque<FlinkKafkaProducer<byte[], byte[]>> pool = new LinkedBlockingDeque<>();
(diff of the remaining changed files not shown)
