[FLINK-6988] flink-connector-kafka-0.11 with exactly-once semantic #4239

Closed
pnowojski wants to merge 5 commits

Conversation

pnowojski
Contributor

@pnowojski pnowojski commented Jun 30, 2017

First four commits are from #4557 and #4561.

@pnowojski pnowojski force-pushed the kafka011 branch 4 times, most recently from 723a5be to 35ee552 Compare July 3, 2017 15:00
@pnowojski pnowojski changed the title [FLINK-6988] Initial flink-connector-kafka-0.11 with at-least-once semantic [FLINK-6988] flink-connector-kafka-0.11 with exactly-once semantic Jul 13, 2017
@pnowojski pnowojski force-pushed the kafka011 branch 3 times, most recently from 46b3b68 to 2cf5f3b Compare July 17, 2017 15:39
*
* <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
* is constructed. That means that the client that submits the program needs to be able to
* reach the Kafka brokers or ZooKeeper.</p>
Contributor

This NOTE is no longer valid and can be removed.

Contributor Author

Is it also true for 0.10?

Contributor

Yes. I have a separate PR which cleans that up for all Kafka versions.

*
* <p>Details about approach (a):
* Pre Kafka 0.11 producers only follow approach (a), allowing users to use the producer using the
* DataStream.addSink() method.
Contributor

"Pre Kafka 0.11 producers only follow approach (a)" is incorrect.
Kafka 0.10 also supports hybrid.

*/
private boolean logFailuresOnly;

private Semantic semantic;
Contributor

nit: also include Javadoc for consistency.
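For illustration only, a Javadoc along the lines of this nit could look like the following (the wording is a suggestion, not code from this PR):

/** Semantic (delivery guarantee) that this producer provides; see {@link Semantic}. */
private Semantic semantic;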

/** Number of unacknowledged records. */
private final AtomicLong pendingRecords = new AtomicLong();

private final Map<String, KafkaMetricMuttableWrapper> previouslyCreatedMetrics = new HashMap<>();
Contributor

nit: also include Javadoc for consistency.

* @param <TXN> Transaction to store all of the information required to handle a transaction (must be Serializable)
*/
@PublicEvolving
public abstract class TwoPhaseCommitSinkFunction<IN, TXN extends Serializable>
Contributor

I really like the idea of introducing this abstraction :)

Contributor

Overall, though, I would like to see unit tests specifically for this TwoPhaseCommitSinkFunction class.

// was triggered) and because there can be concurrent overlapping checkpoints
// (a new one is started before the previous fully finished).
//
// ==> There should never be a case where we have no pending transaction here
Contributor

Let's move this comment block to a Javadoc on the method.

Contributor Author

Hmm, why do you think so? This is a purely implementation detail, nothing that should bother the user of this class.

Contributor

Ok, makes sense. No strong objection here, can keep as is.

// checkpoint (temporary outage in the storage system) but
// could persist a successive checkpoint (the one notified here)
//
// - other (non Pravega sink) tasks could not persist their status during
Contributor

Is it required to mention Pravega here?

* Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.11.x. By default producer
* will use {@link Semantic.EXACTLY_ONCE} semantic.
*
* <p>Implementation note: This producer is a hybrid between a regular sink function (a)
Contributor

Does this implementation really support the hybrid modes?
As far as I can understand it, FlinkKafkaProducer011 only extends TwoPhaseCommitSinkFunction, which doesn't support the hybrid modes.

Contributor Author
@pnowojski pnowojski Jul 19, 2017

Yes, according to all of the tests it passes.

The (b) version works by passing an instance of FlinkKafkaProducer011 as a SinkFunction to the KafkaStreamSink<IN> extends StreamSink<IN> class. Under the hood this StreamSink checks whether the SinkFunction actually implements the various checkpointing interfaces and, if so, calls the appropriate methods on FlinkKafkaProducer011.
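A rough sketch of that delegation (a simplified illustration, not the actual StreamSink source; userFunction stands for the wrapped FlinkKafkaProducer011):

// Simplified sketch: forward checkpoint hooks only if the wrapped
// SinkFunction implements the corresponding interface.
if (userFunction instanceof CheckpointedFunction) {
    ((CheckpointedFunction) userFunction).snapshotState(context);
}
if (userFunction instanceof CheckpointListener) {
    ((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
}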

// for the reasons discussed in the 'notifyCheckpointComplete()' method.

pendingCommitTransactionsState = context.getOperatorStateStore().getSerializableListState("pendingCommitTransactions");
pendingTransactionsState = context.getOperatorStateStore().getSerializableListState("pendingTransactions");
Contributor

getSerializableListState is deprecated and its usage is discouraged.
I would recommend allowing implementations to pass in either a TypeInformation or their own TypeSerializer for the transaction state holder.
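For illustration, a sketch of that suggestion, assuming the subclass provides a transactionSerializer (a TypeSerializer<TXN>, an assumption for this example); the state names mirror the reviewed code:

// Sketch only: explicit descriptors with a caller-provided serializer
// instead of the deprecated getSerializableListState(...).
pendingCommitTransactionsState = context.getOperatorStateStore().getListState(
    new ListStateDescriptor<>("pendingCommitTransactions", transactionSerializer));
pendingTransactionsState = context.getOperatorStateStore().getListState(
    new ListStateDescriptor<>("pendingTransactions", transactionSerializer));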

@tzulitai
Contributor

Thanks a lot for opening a pull request for this very important feature @pnowojski.
I did a rough first pass and had some comments I would like to clear out first (this is a big chunk of code, we would probably need to go through this quite a few times before it can be mergeable.)

Most notably, some comments so far:

  1. I think we need UTs for the TwoPhaseCommitSinkFunction. It alone is a very important addition (I would even prefer a separate PR for it and try to merge that first.)
  2. Serialization of the transaction state in TwoPhaseCommitSinkFunction needs to be changed
  3. Is the FlinkKafkaProducer011 actually supporting hybrid (normal sink function and writeToKafkaWithTimestamps as a custom sink operator)? From the looks of it, it doesn't seem like it.

@tzulitai
Contributor

tzulitai commented Jul 19, 2017

Regarding how I would proceed with this big contribution:
Let's first try to clean up the commits that are bundled all together here.

  1. I would first try to merge [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1 #4321 (the first 4 commits) and [misc] Commit read offsets in Kafka integration tests #4310 (af7ed19) and get those out of the way. Could you also prioritize in commenting on those first (I've left new comments there)?
  2. For a06cb94 (TwoPhaseCommitSinkFunction), could you open a separate PR with unit tests covered?
  3. After the above is all sorted out, we rebase this again.

@tzulitai
Contributor

tzulitai commented Jul 19, 2017

One other comment regarding the commits:
I would argue that df6d5e0 to 5ff8106 should not appear in the commit log history, since in the end we actually have a completely new producer for 011 anyway.
Also, 321a142 to 2cf5f3b should be squashed to a single commit for the addition of an "exactly-once producer for 011" (the new FlinkKafkaProducer implementation and exactly-once tests shouldn't stand alone as independent commits, IMO. FlinkKafkaProducer isn't used by other producer versions, and the exactly-once producer addition wouldn't be valid without the tests).

What do you think?

@pnowojski
Contributor Author

df6d5e0 to 5ff8106 should definitely be squashed, I left them only to make it easier for reviewers to follow the changes made in 0.11 vs 0.10 connectors (those changes would be invisible in one blob commit).

For 321a142 to 2cf5f3b I'm not sure about the first one; FlinkKafkaProducer is hacky enough that it could deserve a separate commit. That would make it stand out more if anyone looks at the commit history/changes in the future (it could hide in a larger change).

@tzulitai
Contributor

Ok :) I can agree that we keep 321a142 a separate commit.
For df6d5e0 to 5ff8106, I actually found it easier to ignore all that (because a lot of it is irrelevant in the end) and went straight to 41ad973.

Contributor Author
@pnowojski pnowojski left a comment

I have added tests for TwoPhaseCommitSinkFunction and opened a new PR #4368 for that; however, please check the responses to your comments here before moving to that new PR.

*
* <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
* is constructed. That means that the client that submits the program needs to be able to
* reach the Kafka brokers or ZooKeeper.</p>
Contributor Author

Is it also true for 0.10?

// was triggered) and because there can be concurrent overlapping checkpoints
// (a new one is started before the previous fully finished).
//
// ==> There should never be a case where we have no pending transaction here
Contributor Author

Hmm, why do you think so? This is a purely implementation detail, nothing that should bother the user of this class.

@zentol
Contributor

zentol commented Jul 28, 2017

Please add an entry to the MODULES_CONNECTORS variable in the tools/travis_mvn_watchdog.sh script.

@tzulitai
Contributor

tzulitai commented Aug 7, 2017

Now that the prerequisite PRs are merged, we can rebase this now :)

@pnowojski pnowojski force-pushed the kafka011 branch 5 times, most recently from cc48a21 to 35cc64f Compare August 8, 2017 12:37
@rangadi

rangadi commented Aug 9, 2017

How does the exactly-once sink handle a large gap between preCommit() and recoverAndCommit() in case of a recovery? The broker seems to abort a transaction after the transaction.max.timeout.ms timeout.

@pnowojski
Contributor Author

I think there is no way we can handle it other than increasing the timeout to some very large value. Or is there?
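For example, the producer-side transaction timeout can be raised through the producer Properties (the value below is purely illustrative; the broker's transaction.max.timeout.ms must be at least as large, otherwise the producer's setting is rejected):

Properties producerConfig = new Properties();
producerConfig.setProperty("bootstrap.servers", "localhost:9092");
// Illustrative value: keep transactions open for up to 1 hour to cover
// a long gap between preCommit() and recoverAndCommit().
producerConfig.setProperty("transaction.timeout.ms", String.valueOf(60 * 60 * 1000));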

public void initializeState(FunctionInitializationContext context) throws Exception {
availableTransactionalIds.clear();
for (int i = 0; i < kafkaProducersPoolSize; i++) {
availableTransactionalIds.add(UUID.randomUUID().toString());

Probably better to reuse stored ids rather than creating new ones each time. I am thinking of a case where a task goes into crash loop dying even before first commit.

Contributor

I think that makes sense, but I guess it's mostly because the current code isn't differentiating between used and unused transaction ids in the state. If we differentiated that, it would be possible to reuse stored ids.

Piotr, what do you think?

Contributor Author

That's a valid issue, however on its own this solution would not be enough. It would not work for the case where we first (1) scale down and then (2) scale up. On event (2), we would need to create new transactional ids, but we wouldn't know from which id we can start.

However I think we can deduce the starting point for new IDs using getUnionListState to track globally what the next available transactional id is.
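A sketch of that idea (names are illustrative, not taken from this PR): keep a "next transactional id" hint in union list state, so that after rescaling every subtask sees the globally largest value and new subtasks know where fresh ids may start.

// Sketch only: union-redistributed operator state gives each subtask
// the full list of hints on restore.
ListState<Long> nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
    new ListStateDescriptor<>("nextTransactionalIdHint", Long.class));
long nextFreeTransactionalId = 0;
for (Long hint : nextTransactionalIdHintState.get()) {
    nextFreeTransactionalId = Math.max(nextFreeTransactionalId, hint);
}
// New subtasks created on scale-up can derive fresh ids starting from nextFreeTransactionalId.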

@rangadi rangadi Aug 23, 2017

we wouldn't know from which id we can start.

Not sure if you need a 'start id'. You can just abort all of them whether there are any open transactions or not (in fact, if you open a new producer with the id, Kafka aborts any that are open). This is mainly for clarification, will leave it to you guys to decide on the specifics.

Contributor
@tzulitai tzulitai left a comment

Thanks for the follow-up revision @pnowojski.
I think the latest approach we're going for seems sane.

I've only checked the code on the producer side. I'm assuming that the other code (consumer, table sink / sources) is mostly identical to the other versions. From what I see, I think this is almost mergeable, minus some comments I had left inline.

* <li>{@link #NONE}</li>
*/
public enum Semantic {
/**
Contributor

nit: empty line before comment block.

* <li>increase size of {@link FlinkKafkaProducer}s pool</li>
*/
EXACTLY_ONCE,
/**
Contributor

nit: empty line before comment block.

* to be acknowledged by the Kafka producer on a checkpoint.
*/
AT_LEAST_ONCE,
/**
Contributor

nit: empty line before comment block.

/**
* Default number of KafkaProducers in the pool. See {@link Semantic#EXACTLY_ONCE}.
*/
public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5;
Contributor

Could you briefly describe the reason for the number 5?
Why not use numConcurrentCheckpoints + 1 (as we discussed offline)?

Contributor Author

As I remember, the reason was that it is not easy/not possible at the moment to get this information in the operator. That should be follow-up work. Regardless of this, the code of this operator would look the same (because we have no guarantee that notifyCheckpointComplete always reaches us on time).

/**
* Pool of transactional ids backed up in state.
*/
private ListState<String> transactionalIdsState;
Contributor

We can probably make this transient also for documentation purposes.

availableTransactionalIds.add(UUID.randomUUID().toString());
}

super.initializeState(context);
Contributor

Could we initialize the base TwoPhaseCommitSink first?

Contributor Author

nope :(

this.kafkaProducer = kafkaProducer;
}

// TODO: is this used anywhere?
Contributor

I think this variant is used when using writeToKafkaWithTimestamps

private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);

private final KafkaProducer<K, V> kafkaProducer;
@Nullable
Contributor

nit: empty line before this field annotation.

if (!(producerId >= 0 && epoch >= 0)) {
throw new IllegalStateException(String.format("Incorrect values for producerId [%s] and epoch [%s]", producerId, epoch));
}
LOG.info("Attempting to resume transaction with producerId [%s] and epoch [%s]", producerId, epoch);
Contributor

Use {} placeholders instead of [%s]; SLF4J does not interpret %s formatting.
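The suggested fix, for reference:

LOG.info("Attempting to resume transaction with producerId [{}] and epoch [{}]", producerId, epoch);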


public void resumeTransaction(long producerId, short epoch) {
if (!(producerId >= 0 && epoch >= 0)) {
throw new IllegalStateException(String.format("Incorrect values for producerId [%s] and epoch [%s]", producerId, epoch));
Contributor

Can use Preconditions.checkState(...) here.
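For example, a sketch of that suggestion using org.apache.flink.util.Preconditions:

Preconditions.checkState(producerId >= 0 && epoch >= 0,
    "Incorrect values for producerId %s and epoch %s", producerId, epoch);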

@pnowojski pnowojski force-pushed the kafka011 branch 4 times, most recently from 5ced4ea to dfb7e24 Compare August 29, 2017 10:15
@aljoscha
Contributor

I did a first high-level review of the code. I think it's good so far!

Before we can merge this, however, we need a few more things around it:

  • A section in the Kafka doc about the new exactly-once mode, how it can be configured etc.
  • A big disclaimer (possibly in an "alert" box) about the interplay with the transaction timeout and what the caveats there are
  • A section in the Javadocs about the aforementioned caveats
  • A check that ensures that the transaction timeout is set to a reasonably high setting (say 1 hour) when exactly-once semantics are enabled. (With an override setting that allows the user to set a lower transaction time out if they want to.)

Also, this has interplay with #4616 but we can resolve that by merging them in any order and fixing up the later changes when merging.

@pnowojski pnowojski force-pushed the kafka011 branch 2 times, most recently from cbfc50d to e2d477f Compare September 4, 2017 13:11
@pnowojski
Contributor Author

pnowojski commented Sep 4, 2017

@aljoscha I have addressed your high-level comments and fixed some bugs. Please check the 5 latest commits (one of them is a new dependency on another PR: #4631).

@aljoscha
Contributor

aljoscha commented Sep 6, 2017

What were the bugs that you fixed?

@pnowojski
Contributor Author

Bugs in the tests (the ones you can see in the fixup commits).

@ariskk

ariskk commented Oct 4, 2017

We are really looking forward to this 👍

@pnowojski pnowojski force-pushed the kafka011 branch 2 times, most recently from ed98a07 to 32ff813 Compare October 4, 2017 09:20
@pnowojski
Contributor Author

@aljoscha rebased on latest master and integrated your changes

@aljoscha
Contributor

aljoscha commented Oct 9, 2017

Merged! 😃

Could you please close this PR?

@pnowojski
Contributor Author

Thanks :)
