Skip to content

Commit

Permalink
[FLINK-10269] [connectors] Fix Elasticsearch 6 UpdateRequest binary i…
Browse files Browse the repository at this point in the history
…ncompatibility

This commit fixes the binary incompatibility for UpdateRequests in Elasticsearch. This
is due to a binary compatibility issue between the base module (which is compiled
against a very old ES version and the current Elasticsearch version).
It lets the API call bridge also provide the RequestIndexer version-specific.

This closes apache#6682.
  • Loading branch information
twalthr committed Sep 13, 2018
1 parent f04af47 commit c4beb3a
Show file tree
Hide file tree
Showing 10 changed files with 319 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
* An {@link ElasticsearchApiCallBridge} is used to bridge incompatible Elasticsearch Java API calls across different versions.
Expand Down Expand Up @@ -79,6 +80,19 @@ void configureBulkProcessorBackoff(
BulkProcessor.Builder builder,
@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy);

/**
* Creates a {@link RequestIndexer} that is able to work with {@link BulkProcessor} binary compatible.
*/
default RequestIndexer createBulkProcessorIndexer(
BulkProcessor bulkProcessor,
boolean flushOnCheckpoint,
AtomicLong numPendingRequestsRef) {
return new PreElasticsearch6BulkProcessorIndexer(
bulkProcessor,
flushOnCheckpoint,
numPendingRequestsRef);
}

/**
* Perform any necessary state cleanup.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void setDelayMillis(long delayMillis) {
private boolean flushOnCheckpoint = true;

/** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
private transient BulkProcessorIndexer requestIndexer;
private transient RequestIndexer requestIndexer;

// ------------------------------------------------------------------------
// Internals for the Flink Elasticsearch Sink
Expand Down Expand Up @@ -295,7 +295,7 @@ public void disableFlushOnCheckpoint() {
public void open(Configuration parameters) throws Exception {
client = callBridge.createClient(userConfig);
bulkProcessor = buildBulkProcessor(new BulkProcessorListener());
requestIndexer = new BulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests);
requestIndexer = callBridge.createBulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.elasticsearch;

import org.apache.flink.annotation.Internal;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;

import java.util.concurrent.atomic.AtomicLong;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
* {@link ActionRequest ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster.
*
* @deprecated This class is not binary compatible with newer Elasticsearch 6+ versions
* (i.e. the {@link #add(UpdateRequest...)} ). However, this module is currently
* compiled against a very old Elasticsearch version.
*/
@Deprecated
@Internal
class PreElasticsearch6BulkProcessorIndexer implements RequestIndexer {

private final BulkProcessor bulkProcessor;
private final boolean flushOnCheckpoint;
private final AtomicLong numPendingRequestsRef;

PreElasticsearch6BulkProcessorIndexer(BulkProcessor bulkProcessor, boolean flushOnCheckpoint, AtomicLong numPendingRequestsRef) {
this.bulkProcessor = checkNotNull(bulkProcessor);
this.flushOnCheckpoint = flushOnCheckpoint;
this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef);
}

@Override
public void add(DeleteRequest... deleteRequests) {
for (DeleteRequest deleteRequest : deleteRequests) {
if (flushOnCheckpoint) {
numPendingRequestsRef.getAndIncrement();
}
this.bulkProcessor.add(deleteRequest);
}
}

@Override
public void add(IndexRequest... indexRequests) {
for (IndexRequest indexRequest : indexRequests) {
if (flushOnCheckpoint) {
numPendingRequestsRef.getAndIncrement();
}
this.bulkProcessor.add(indexRequest);
}
}

@Override
public void add(UpdateRequest... updateRequests) {
for (UpdateRequest updateRequest : updateRequests) {
if (flushOnCheckpoint) {
numPendingRequestsRef.getAndIncrement();
}
this.bulkProcessor.add(updateRequest);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.Preconditions;

import org.apache.http.HttpHost;
Expand All @@ -38,6 +39,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
* Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 6 and later versions.
Expand Down Expand Up @@ -126,4 +128,15 @@ public void configureBulkProcessorBackoff(

builder.setBackoffPolicy(backoffPolicy);
}

@Override
public RequestIndexer createBulkProcessorIndexer(
BulkProcessor bulkProcessor,
boolean flushOnCheckpoint,
AtomicLong numPendingRequestsRef) {
return new Elasticsearch6BulkProcessorIndexer(
bulkProcessor,
flushOnCheckpoint,
numPendingRequestsRef);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.elasticsearch6;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;

import java.util.concurrent.atomic.AtomicLong;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
* {@link ActionRequest ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster.
*
* <p>Note: This class is binary compatible to Elasticsearch 6.
*/
@Internal
class Elasticsearch6BulkProcessorIndexer implements RequestIndexer {

private final BulkProcessor bulkProcessor;
private final boolean flushOnCheckpoint;
private final AtomicLong numPendingRequestsRef;

Elasticsearch6BulkProcessorIndexer(
BulkProcessor bulkProcessor,
boolean flushOnCheckpoint,
AtomicLong numPendingRequestsRef) {
this.bulkProcessor = checkNotNull(bulkProcessor);
this.flushOnCheckpoint = flushOnCheckpoint;
this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef);
}

@Override
public void add(DeleteRequest... deleteRequests) {
for (DeleteRequest deleteRequest : deleteRequests) {
if (flushOnCheckpoint) {
numPendingRequestsRef.getAndIncrement();
}
this.bulkProcessor.add(deleteRequest);
}
}

@Override
public void add(IndexRequest... indexRequests) {
for (IndexRequest indexRequest : indexRequests) {
if (flushOnCheckpoint) {
numPendingRequestsRef.getAndIncrement();
}
this.bulkProcessor.add(indexRequest);
}
}

@Override
public void add(UpdateRequest... updateRequests) {
for (UpdateRequest updateRequest : updateRequests) {
if (flushOnCheckpoint) {
numPendingRequestsRef.getAndIncrement();
}
this.bulkProcessor.add(updateRequest);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@

package org.apache.flink.streaming.tests;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.Collector;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
Expand Down Expand Up @@ -56,11 +58,14 @@ public static void main(String[] args) throws Exception {
env.getConfig().disableSysoutLogging();
env.enableCheckpointing(5000);

DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
.map(new MapFunction<Long, String>() {
DataStream<Tuple2<String, String>> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
.flatMap(new FlatMapFunction<Long, Tuple2<String, String>>() {
@Override
public String map(Long value) throws Exception {
return "message # " + value;
public void flatMap(Long value, Collector<Tuple2<String, String>> out) {
final String key = String.valueOf(value);
final String message = "message #" + value;
out.collect(Tuple2.of(key, message + "update #1"));
out.collect(Tuple2.of(key, message + "update #2"));
}
});

Expand All @@ -72,12 +77,13 @@ public String map(Long value) throws Exception {
List<TransportAddress> transports = new ArrayList<>();
transports.add(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));

source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element, parameterTool));
}
}));
source.addSink(new ElasticsearchSink<>(
userConfig,
transports,
(Tuple2<String, String> element, RuntimeContext ctx, RequestIndexer indexer) -> {
indexer.add(createIndexRequest(element.f1, parameterTool));
indexer.add(createUpdateRequest(element, parameterTool));
}));

env.execute("Elasticsearch1.x end to end sink test example");
}
Expand All @@ -92,4 +98,16 @@ private static IndexRequest createIndexRequest(String element, ParameterTool par
.id(element)
.source(json);
}

private static UpdateRequest createUpdateRequest(Tuple2<String, String> element, ParameterTool parameterTool) {
Map<String, Object> json = new HashMap<>();
json.put("data", element.f1);

return new UpdateRequest(
parameterTool.getRequired("index"),
parameterTool.getRequired("type"),
element.f0)
.doc(json)
.upsert(json);
}
}
Loading

0 comments on commit c4beb3a

Please sign in to comment.