From 1c67df838ad045484f96a5839c324771285d9cf7 Mon Sep 17 00:00:00 2001 From: Timo Walther Date: Mon, 31 May 2021 14:52:49 +0200 Subject: [PATCH] [FLINK-22810][connector-elasticsearch] Drop usages of legacy planner in Elasticsearch modules This closes #16031. --- .../pom.xml | 44 +++++-------- .../ElasticsearchUpsertTableSinkBase.java | 3 +- ...csearchUpsertTableSinkFactoryTestBase.java | 64 ++++++++++--------- .../flink-connector-elasticsearch5/pom.xml | 8 ++- .../flink-connector-elasticsearch6/pom.xml | 38 +++++------ .../flink-connector-elasticsearch7/pom.xml | 30 ++++----- 6 files changed, 85 insertions(+), 102 deletions(-) diff --git a/flink-connectors/flink-connector-elasticsearch-base/pom.xml b/flink-connectors/flink-connector-elasticsearch-base/pom.xml index 0c069fbeb6bd6..ea25c68ea9fe1 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/pom.xml +++ b/flink-connectors/flink-connector-elasticsearch-base/pom.xml @@ -42,7 +42,7 @@ under the License. - + org.apache.flink @@ -51,6 +51,19 @@ under the License. provided + + + + + org.apache.flink + flink-table-api-java-bridge_${scala.binary.version} + ${project.version} + provided + true + + + + org.elasticsearch elasticsearch @@ -70,25 +83,7 @@ under the License. - - - - org.apache.flink - flink-table-api-java-bridge_${scala.binary.version} - ${project.version} - provided - true - - - - org.apache.flink - flink-table-planner_${scala.binary.version} - ${project.version} - provided - true - - - + org.apache.flink @@ -113,15 +108,6 @@ under the License. test-jar - - - org.apache.flink - flink-table-planner_${scala.binary.version} - ${project.version} - test-jar - test - - org.apache.flink diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java index 7037bc7f38e72..5d47dacb7c94e 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java @@ -33,7 +33,6 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sinks.UpsertStreamTableSink; -import org.apache.flink.table.typeutils.TypeCheckUtils; import org.apache.flink.table.utils.TableConnectorUtils; import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.types.Row; @@ -305,7 +304,7 @@ private void validateKeyTypes(int[] keyFieldIndices) { final TypeInformation[] types = getFieldTypes(); for (int keyFieldIndex : keyFieldIndices) { final TypeInformation type = types[keyFieldIndex]; - if (!TypeCheckUtils.isSimpleStringRepresentation(type)) { + if (!type.isKeyType()) { throw new ValidationException( "Only simple types that can be safely converted into a string representation " + "can be used as keys. But was: " diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryTestBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryTestBase.java index 03e5b24123836..a67953d8eef0f 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryTestBase.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryTestBase.java @@ -26,10 +26,6 @@ import org.apache.flink.streaming.connectors.elasticsearch.index.IndexGeneratorFactory; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.descriptors.Elasticsearch; -import org.apache.flink.table.descriptors.Json; -import org.apache.flink.table.descriptors.Schema; -import org.apache.flink.table.descriptors.TestTableDescriptor; import org.apache.flink.table.factories.StreamTableSinkFactory; import org.apache.flink.table.factories.TableFactoryService; import org.apache.flink.table.sinks.TableSink; @@ -160,32 +156,40 @@ protected Map createTestSinkOptions() { } protected Map createElasticSearchProperties() { - return new TestTableDescriptor( - new Elasticsearch() - .version(getElasticsearchVersion()) - .host(HOSTNAME, PORT, SCHEMA) - .index(INDEX) - .documentType(DOC_TYPE) - .keyDelimiter(KEY_DELIMITER) - .keyNullLiteral(KEY_NULL_LITERAL) - .bulkFlushBackoffExponential() - .bulkFlushBackoffDelay(123L) - .bulkFlushBackoffMaxRetries(3) - .bulkFlushInterval(100L) - .bulkFlushMaxActions(1000) - .bulkFlushMaxSize("1 MB") - .failureHandlerCustom(DummyFailureHandler.class) - .connectionMaxRetryTimeout(100) - .connectionPathPrefix("/myapp")) - .withFormat(new Json().deriveSchema()) - .withSchema( - new Schema() - .field(FIELD_KEY, DataTypes.BIGINT()) - .field(FIELD_FRUIT_NAME, DataTypes.STRING()) - .field(FIELD_COUNT, DataTypes.DECIMAL(10, 4)) - .field(FIELD_TS, DataTypes.TIMESTAMP(3))) - .inUpsertMode() - .toProperties(); + final Map map = new HashMap<>(); + map.put("connector.bulk-flush.backoff.type", "exponential"); + map.put("connector.bulk-flush.max-size", "1 mb"); + map.put("schema.0.data-type", "BIGINT"); + map.put("schema.1.name", "fruit_name"); + map.put("connector.property-version", "1"); + map.put("connector.bulk-flush.backoff.max-retries", "3"); + map.put("schema.3.data-type", "TIMESTAMP(3)"); + map.put("connector.document-type", "MyType"); + map.put("schema.3.name", "ts"); + map.put("connector.index", "MyIndex"); + map.put("schema.0.name", "key"); + map.put("connector.bulk-flush.backoff.delay", "123"); + map.put("connector.bulk-flush.max-actions", "1000"); + map.put("schema.2.name", "count"); + map.put("update-mode", "upsert"); + map.put( + "connector.failure-handler-class", + ElasticsearchUpsertTableSinkFactoryTestBase.DummyFailureHandler.class.getName()); + map.put("format.type", "json"); + map.put("schema.1.data-type", "VARCHAR(2147483647)"); + map.put("connector.version", getElasticsearchVersion()); + map.put("connector.bulk-flush.interval", "100"); + map.put("schema.2.data-type", "DECIMAL(10, 4)"); + map.put("connector.hosts", "https://host1:1234"); + map.put("connector.failure-handler", "custom"); + map.put("format.property-version", "1"); + map.put("format.derive-schema", "true"); + map.put("connector.type", "elasticsearch"); + map.put("connector.key-null-literal", ""); + map.put("connector.key-delimiter", "#"); + map.put("connector.connection-path-prefix", "/myapp"); + map.put("connector.connection-max-retry-timeout", "100"); + return map; } // -------------------------------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/pom.xml b/flink-connectors/flink-connector-elasticsearch5/pom.xml index fdf56b802a9d1..7a5a91eb741f5 100644 --- a/flink-connectors/flink-connector-elasticsearch5/pom.xml +++ b/flink-connectors/flink-connector-elasticsearch5/pom.xml @@ -42,7 +42,7 @@ under the License. - + org.apache.flink @@ -51,6 +51,8 @@ under the License. provided + + org.apache.flink flink-connector-elasticsearch-base_${scala.binary.version} @@ -64,6 +66,8 @@ under the License. + + org.elasticsearch.client @@ -120,7 +124,7 @@ under the License. 4.1.46.Final - + org.apache.flink diff --git a/flink-connectors/flink-connector-elasticsearch6/pom.xml b/flink-connectors/flink-connector-elasticsearch6/pom.xml index afdb3fec58697..b717dccc4c04b 100644 --- a/flink-connectors/flink-connector-elasticsearch6/pom.xml +++ b/flink-connectors/flink-connector-elasticsearch6/pom.xml @@ -42,7 +42,7 @@ under the License. - + org.apache.flink @@ -51,6 +51,19 @@ under the License. provided + + + + + org.apache.flink + flink-table-api-java-bridge_${scala.binary.version} + ${project.version} + provided + true + + + + org.apache.flink flink-connector-elasticsearch-base_${scala.binary.version} @@ -64,6 +77,8 @@ under the License. + + org.elasticsearch.client @@ -71,17 +86,7 @@ under the License. ${elasticsearch.version} - - - - org.apache.flink - flink-table-api-java-bridge_${scala.binary.version} - ${project.version} - provided - true - - - + org.testcontainers @@ -143,15 +148,6 @@ under the License. provided - - - org.apache.flink - flink-table-planner_${scala.binary.version} - ${project.version} - test-jar - test - - org.apache.flink diff --git a/flink-connectors/flink-connector-elasticsearch7/pom.xml b/flink-connectors/flink-connector-elasticsearch7/pom.xml index ea9ed8ec45200..63d6b629294e2 100644 --- a/flink-connectors/flink-connector-elasticsearch7/pom.xml +++ b/flink-connectors/flink-connector-elasticsearch7/pom.xml @@ -42,7 +42,7 @@ under the License. - + org.apache.flink @@ -51,6 +51,8 @@ under the License. provided + + org.apache.flink flink-connector-elasticsearch-base_${scala.binary.version} @@ -64,14 +66,8 @@ under the License. - - - org.elasticsearch.client - elasticsearch-rest-high-level-client - ${elasticsearch.version} - - + org.apache.flink @@ -81,7 +77,14 @@ under the License. true - + + + org.elasticsearch.client + elasticsearch-rest-high-level-client + ${elasticsearch.version} + + + org.testcontainers @@ -137,15 +140,6 @@ under the License. test - - - org.apache.flink - flink-table-planner_${scala.binary.version} - ${project.version} - test-jar - test - - org.apache.flink