Skip to content

Commit

Permalink
[FLINK-22810][connector-elasticsearch] Drop usages of legacy planner …
Browse files Browse the repository at this point in the history
…in Elasticsearch modules

This closes apache#16031.
  • Loading branch information
twalthr committed Jun 2, 2021
1 parent abd321c commit 1c67df8
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 102 deletions.
44 changes: 15 additions & 29 deletions flink-connectors/flink-connector-elasticsearch-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ under the License.

<dependencies>

<!-- core dependencies -->
<!-- Core -->

<dependency>
<groupId>org.apache.flink</groupId>
Expand All @@ -51,6 +51,19 @@ under the License.
<scope>provided</scope>
</dependency>

<!-- Table ecosystem -->

<!-- Projects depending on this project won't depend on flink-table-*. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>

<!-- Elasticsearch -->

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
Expand All @@ -70,25 +83,7 @@ under the License.
</exclusions>
</dependency>

<!-- Table ecosystem -->
<!-- Projects depending on this project won't depend on flink-table-*. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>

<!-- test dependencies -->
<!-- Tests -->

<dependency>
<groupId>org.apache.flink</groupId>
Expand All @@ -113,15 +108,6 @@ under the License.
<type>test-jar</type>
</dependency>

<!-- Elasticsearch table descriptor testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- Elasticsearch table descriptor testing -->
<dependency>
<groupId>org.apache.flink</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sinks.UpsertStreamTableSink;
import org.apache.flink.table.typeutils.TypeCheckUtils;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
Expand Down Expand Up @@ -305,7 +304,7 @@ private void validateKeyTypes(int[] keyFieldIndices) {
final TypeInformation<?>[] types = getFieldTypes();
for (int keyFieldIndex : keyFieldIndices) {
final TypeInformation<?> type = types[keyFieldIndex];
if (!TypeCheckUtils.isSimpleStringRepresentation(type)) {
if (!type.isKeyType()) {
throw new ValidationException(
"Only simple types that can be safely converted into a string representation "
+ "can be used as keys. But was: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@
import org.apache.flink.streaming.connectors.elasticsearch.index.IndexGeneratorFactory;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.Elasticsearch;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.descriptors.TestTableDescriptor;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.sinks.TableSink;
Expand Down Expand Up @@ -160,32 +156,40 @@ protected Map<SinkOption, String> createTestSinkOptions() {
}

protected Map<String, String> createElasticSearchProperties() {
return new TestTableDescriptor(
new Elasticsearch()
.version(getElasticsearchVersion())
.host(HOSTNAME, PORT, SCHEMA)
.index(INDEX)
.documentType(DOC_TYPE)
.keyDelimiter(KEY_DELIMITER)
.keyNullLiteral(KEY_NULL_LITERAL)
.bulkFlushBackoffExponential()
.bulkFlushBackoffDelay(123L)
.bulkFlushBackoffMaxRetries(3)
.bulkFlushInterval(100L)
.bulkFlushMaxActions(1000)
.bulkFlushMaxSize("1 MB")
.failureHandlerCustom(DummyFailureHandler.class)
.connectionMaxRetryTimeout(100)
.connectionPathPrefix("/myapp"))
.withFormat(new Json().deriveSchema())
.withSchema(
new Schema()
.field(FIELD_KEY, DataTypes.BIGINT())
.field(FIELD_FRUIT_NAME, DataTypes.STRING())
.field(FIELD_COUNT, DataTypes.DECIMAL(10, 4))
.field(FIELD_TS, DataTypes.TIMESTAMP(3)))
.inUpsertMode()
.toProperties();
final Map<String, String> map = new HashMap<>();
map.put("connector.bulk-flush.backoff.type", "exponential");
map.put("connector.bulk-flush.max-size", "1 mb");
map.put("schema.0.data-type", "BIGINT");
map.put("schema.1.name", "fruit_name");
map.put("connector.property-version", "1");
map.put("connector.bulk-flush.backoff.max-retries", "3");
map.put("schema.3.data-type", "TIMESTAMP(3)");
map.put("connector.document-type", "MyType");
map.put("schema.3.name", "ts");
map.put("connector.index", "MyIndex");
map.put("schema.0.name", "key");
map.put("connector.bulk-flush.backoff.delay", "123");
map.put("connector.bulk-flush.max-actions", "1000");
map.put("schema.2.name", "count");
map.put("update-mode", "upsert");
map.put(
"connector.failure-handler-class",
ElasticsearchUpsertTableSinkFactoryTestBase.DummyFailureHandler.class.getName());
map.put("format.type", "json");
map.put("schema.1.data-type", "VARCHAR(2147483647)");
map.put("connector.version", getElasticsearchVersion());
map.put("connector.bulk-flush.interval", "100");
map.put("schema.2.data-type", "DECIMAL(10, 4)");
map.put("connector.hosts", "https://host1:1234");
map.put("connector.failure-handler", "custom");
map.put("format.property-version", "1");
map.put("format.derive-schema", "true");
map.put("connector.type", "elasticsearch");
map.put("connector.key-null-literal", "");
map.put("connector.key-delimiter", "#");
map.put("connector.connection-path-prefix", "/myapp");
map.put("connector.connection-max-retry-timeout", "100");
return map;
}

// --------------------------------------------------------------------------------------------
Expand Down
8 changes: 6 additions & 2 deletions flink-connectors/flink-connector-elasticsearch5/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ under the License.

<dependencies>

<!-- core dependencies -->
<!-- Core -->

<dependency>
<groupId>org.apache.flink</groupId>
Expand All @@ -51,6 +51,8 @@ under the License.
<scope>provided</scope>
</dependency>

<!-- Flink Elasticsearch -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
Expand All @@ -64,6 +66,8 @@ under the License.
</exclusions>
</dependency>

<!-- Elasticsearch -->

<!-- Dependency for Elasticsearch 5.x Java Client -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
Expand Down Expand Up @@ -120,7 +124,7 @@ under the License.
<version>4.1.46.Final</version>
</dependency>

<!-- test dependencies -->
<!-- Tests -->

<dependency>
<groupId>org.apache.flink</groupId>
Expand Down
38 changes: 17 additions & 21 deletions flink-connectors/flink-connector-elasticsearch6/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ under the License.

<dependencies>

<!-- core dependencies -->
<!-- Core -->

<dependency>
<groupId>org.apache.flink</groupId>
Expand All @@ -51,6 +51,19 @@ under the License.
<scope>provided</scope>
</dependency>

<!-- Table ecosystem -->

<!-- Projects depending on this project won't depend on flink-table-*. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>

<!-- Flink Elasticsearch -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
Expand All @@ -64,24 +77,16 @@ under the License.
</exclusions>
</dependency>

<!-- Elasticsearch -->

<!-- Dependency for Elasticsearch 6.x REST Client -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>

<!-- Table ecosystem -->
<!-- Projects depending on this project won't depend on flink-table-*. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>

<!-- test dependencies -->
<!-- Tests -->

<dependency>
<groupId>org.testcontainers</groupId>
Expand Down Expand Up @@ -143,15 +148,6 @@ under the License.
<scope>provided</scope>
</dependency>

<!-- Elasticsearch table descriptor testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- Table API integration tests -->
<dependency>
<groupId>org.apache.flink</groupId>
Expand Down
30 changes: 12 additions & 18 deletions flink-connectors/flink-connector-elasticsearch7/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ under the License.

<dependencies>

<!-- core dependencies -->
<!-- Core -->

<dependency>
<groupId>org.apache.flink</groupId>
Expand All @@ -51,6 +51,8 @@ under the License.
<scope>provided</scope>
</dependency>

<!-- Flink Elasticsearch -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
Expand All @@ -64,14 +66,8 @@ under the License.
</exclusions>
</dependency>

<!-- Dependency for Elasticsearch 7.x REST Client -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>

<!-- Table ecosystem -->

<!-- Projects depending on this project won't depend on flink-table-*. -->
<dependency>
<groupId>org.apache.flink</groupId>
Expand All @@ -81,7 +77,14 @@ under the License.
<optional>true</optional>
</dependency>

<!-- test dependencies -->
<!-- Dependency for Elasticsearch 7.x REST Client -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>

<!-- Tests -->

<dependency>
<groupId>org.testcontainers</groupId>
Expand Down Expand Up @@ -137,15 +140,6 @@ under the License.
<scope>test</scope>
</dependency>

<!-- Elasticsearch table descriptor testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- Table API integration tests -->
<dependency>
<groupId>org.apache.flink</groupId>
Expand Down

0 comments on commit 1c67df8

Please sign in to comment.