Skip to content

Commit

Permalink
[FLINK-20500][upsert-kafka] Fix unstable UpsertKafkaTableITCase.testT…
Browse files Browse the repository at this point in the history
…emporalJoin

This closes apache#14330
  • Loading branch information
fsk119 committed Dec 9, 2020
1 parent 5376254 commit b82585f
Showing 1 changed file with 8 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,17 @@ public void testAggregate() throws Exception {
@Test
public void testTemporalJoin() throws Exception {
String topic = USERS_TOPIC + "_" + format;
env.setParallelism(2);
createTestTopic(topic, 2, 1);
// ------------- test ---------------
// Kafka DefaultPartitioner's hash strategy is slightly different from Flink KeyGroupStreamPartitioner,
// which causes the records in the different Flink partitions are written into the same Kafka partition.
// When reading from the out-of-order Kafka partition, we need to set suitable watermark interval to
// tolerate the disorderliness.
// For convenience, we just set the parallelism 1 to make all records are in the same Flink partition and
// use the Kafka DefaultPartition to repartition the records.
env.setParallelism(1);
writeChangelogToUpsertKafkaWithMetadata(topic);
env.setParallelism(2);
temporalJoinUpsertKafka(topic);
// ------------- clean up ---------------
deleteTestTopic(topic);
Expand Down Expand Up @@ -480,7 +487,6 @@ private void writeChangelogToUpsertKafkaWithMetadata(String userTable) throws Ex
}

private void temporalJoinUpsertKafka(String userTable) throws Exception {

// ------------- test data ---------------
List<Row> input = Arrays.asList(
Row.of(10001L, 100L, LocalDateTime.parse("2020-08-15T00:00:02")),
Expand Down

0 comments on commit b82585f

Please sign in to comment.