diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java index f1dcc83f652fa..d3b774c8428d4 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; /** * An {@link ElasticsearchApiCallBridge} is used to bridge incompatible Elasticsearch Java API calls across different versions. @@ -79,6 +80,19 @@ void configureBulkProcessorBackoff( BulkProcessor.Builder builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy); + /** + * Creates a {@link RequestIndexer} that is able to work with {@link BulkProcessor} binary compatible. + */ + default RequestIndexer createBulkProcessorIndexer( + BulkProcessor bulkProcessor, + boolean flushOnCheckpoint, + AtomicLong numPendingRequestsRef) { + return new PreElasticsearch6BulkProcessorIndexer( + bulkProcessor, + flushOnCheckpoint, + numPendingRequestsRef); + } + /** * Perform any necessary state cleanup. */ diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java index 7dac06ceb8a7a..4d0c00252d279 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java @@ -164,7 +164,7 @@ public void setDelayMillis(long delayMillis) { private boolean flushOnCheckpoint = true; /** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */ - private transient BulkProcessorIndexer requestIndexer; + private transient RequestIndexer requestIndexer; // ------------------------------------------------------------------------ // Internals for the Flink Elasticsearch Sink @@ -295,7 +295,7 @@ public void disableFlushOnCheckpoint() { public void open(Configuration parameters) throws Exception { client = callBridge.createClient(userConfig); bulkProcessor = buildBulkProcessor(new BulkProcessorListener()); - requestIndexer = new BulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests); + requestIndexer = callBridge.createBulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests); } @Override diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/PreElasticsearch6BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/PreElasticsearch6BulkProcessorIndexer.java new file mode 100644 index 0000000000000..85f4b9a3ea1ed --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/PreElasticsearch6BulkProcessorIndexer.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch; + +import org.apache.flink.annotation.Internal; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; + +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}. + * {@link ActionRequest ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster. + * + * @deprecated This class is not binary compatible with newer Elasticsearch 6+ versions + * (i.e. the {@link #add(UpdateRequest...)} ). However, this module is currently + * compiled against a very old Elasticsearch version. + */ +@Deprecated +@Internal +class PreElasticsearch6BulkProcessorIndexer implements RequestIndexer { + + private final BulkProcessor bulkProcessor; + private final boolean flushOnCheckpoint; + private final AtomicLong numPendingRequestsRef; + + PreElasticsearch6BulkProcessorIndexer(BulkProcessor bulkProcessor, boolean flushOnCheckpoint, AtomicLong numPendingRequestsRef) { + this.bulkProcessor = checkNotNull(bulkProcessor); + this.flushOnCheckpoint = flushOnCheckpoint; + this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef); + } + + @Override + public void add(DeleteRequest... deleteRequests) { + for (DeleteRequest deleteRequest : deleteRequests) { + if (flushOnCheckpoint) { + numPendingRequestsRef.getAndIncrement(); + } + this.bulkProcessor.add(deleteRequest); + } + } + + @Override + public void add(IndexRequest... indexRequests) { + for (IndexRequest indexRequest : indexRequests) { + if (flushOnCheckpoint) { + numPendingRequestsRef.getAndIncrement(); + } + this.bulkProcessor.add(indexRequest); + } + } + + @Override + public void add(UpdateRequest... updateRequests) { + for (UpdateRequest updateRequest : updateRequests) { + if (flushOnCheckpoint) { + numPendingRequestsRef.getAndIncrement(); + } + this.bulkProcessor.add(updateRequest); + } + } +} diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java index 03bf9c0710995..782cbbcf4677c 100644 --- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java +++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; import org.apache.flink.util.Preconditions; import org.apache.http.HttpHost; @@ -38,6 +39,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; /** * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 6 and later versions. @@ -126,4 +128,15 @@ public void configureBulkProcessorBackoff( builder.setBackoffPolicy(backoffPolicy); } + + @Override + public RequestIndexer createBulkProcessorIndexer( + BulkProcessor bulkProcessor, + boolean flushOnCheckpoint, + AtomicLong numPendingRequestsRef) { + return new Elasticsearch6BulkProcessorIndexer( + bulkProcessor, + flushOnCheckpoint, + numPendingRequestsRef); + } } diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java new file mode 100644 index 0000000000000..af3c5b13a9a3a --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch6; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; + +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}. + * {@link ActionRequest ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster. + * + *
Note: This class is binary compatible to Elasticsearch 6.
+ */
+@Internal
+class Elasticsearch6BulkProcessorIndexer implements RequestIndexer {
+
+ private final BulkProcessor bulkProcessor;
+ private final boolean flushOnCheckpoint;
+ private final AtomicLong numPendingRequestsRef;
+
+ Elasticsearch6BulkProcessorIndexer(
+ BulkProcessor bulkProcessor,
+ boolean flushOnCheckpoint,
+ AtomicLong numPendingRequestsRef) {
+ this.bulkProcessor = checkNotNull(bulkProcessor);
+ this.flushOnCheckpoint = flushOnCheckpoint;
+ this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef);
+ }
+
+ @Override
+ public void add(DeleteRequest... deleteRequests) {
+ for (DeleteRequest deleteRequest : deleteRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(deleteRequest);
+ }
+ }
+
+ @Override
+ public void add(IndexRequest... indexRequests) {
+ for (IndexRequest indexRequest : indexRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(indexRequest);
+ }
+ }
+
+ @Override
+ public void add(UpdateRequest... updateRequests) {
+ for (UpdateRequest updateRequest : updateRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(updateRequest);
+ }
+ }
+}
diff --git a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
index 18fa05a8976b5..21c53edcf4fa7 100644
--- a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
+++ b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
@@ -17,16 +17,18 @@
package org.apache.flink.streaming.tests;
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.util.Collector;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
@@ -56,11 +58,14 @@ public static void main(String[] args) throws Exception {
env.getConfig().disableSysoutLogging();
env.enableCheckpointing(5000);
- DataStream