Skip to content

Commit

Permalink
[FLINK-24087][connector-kafka] Avoid importing Table API classes for …
Browse files Browse the repository at this point in the history
…DataStream API programs

This closes apache#17082.
  • Loading branch information
leonardBang authored and twalthr committed Sep 1, 2021
1 parent ad052cc commit 61b5b0a
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode;
import org.apache.flink.table.api.TableException;

/** Startup modes for the Kafka Consumer. */
@Internal
Expand Down Expand Up @@ -58,25 +56,6 @@ public enum StartupMode {
this.stateSentinel = stateSentinel;
}

public static StartupMode fromOption(ScanStartupMode scanStartupMode) {
switch (scanStartupMode) {
case EARLIEST_OFFSET:
return StartupMode.EARLIEST;
case LATEST_OFFSET:
return StartupMode.LATEST;
case GROUP_OFFSETS:
return StartupMode.GROUP_OFFSETS;
case SPECIFIC_OFFSETS:
return StartupMode.SPECIFIC_OFFSETS;
case TIMESTAMP:
return StartupMode.TIMESTAMP;

default:
throw new TableException(
"Unsupported startup mode. Validator should have checked that.");
}
}

public long getStateSentinel() {
return stateSentinel;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public static StartupOptions getStartupOptions(ReadableConfig tableOptions) {
final StartupMode startupMode =
tableOptions
.getOptional(SCAN_STARTUP_MODE)
.map(StartupMode::fromOption)
.map(KafkaConnectorOptionsUtil::fromOption)
.orElse(StartupMode.GROUP_OFFSETS);
if (startupMode == StartupMode.SPECIFIC_OFFSETS) {
// It will be refactored after support specific offset for multiple topics in
Expand Down Expand Up @@ -257,6 +257,29 @@ private static void buildSpecificOffsets(
});
}

/**
* Returns the {@link StartupMode} of Kafka Consumer by passed-in table-specific {@link
* ScanStartupMode}.
*/
private static StartupMode fromOption(ScanStartupMode scanStartupMode) {
switch (scanStartupMode) {
case EARLIEST_OFFSET:
return StartupMode.EARLIEST;
case LATEST_OFFSET:
return StartupMode.LATEST;
case GROUP_OFFSETS:
return StartupMode.GROUP_OFFSETS;
case SPECIFIC_OFFSETS:
return StartupMode.SPECIFIC_OFFSETS;
case TIMESTAMP:
return StartupMode.TIMESTAMP;

default:
throw new TableException(
"Unsupported startup mode. Validator should have checked that.");
}
}

public static Properties getKafkaProperties(Map<String, String> tableOptions) {
final Properties kafkaProperties = new Properties();

Expand Down

0 comments on commit 61b5b0a

Please sign in to comment.