[FLINK-12151][es1] Remove Elasticsearch 1 connector
zentol committed Apr 11, 2019
1 parent cdf461b commit d4bef49
Showing 20 changed files with 3 additions and 1,294 deletions.
169 changes: 0 additions & 169 deletions docs/dev/connectors/elasticsearch.md
@@ -40,11 +40,6 @@ of the Elasticsearch installation:
</tr>
</thead>
<tbody>
<tr>
<td>flink-connector-elasticsearch{{ site.scala_version_suffix }}</td>
<td>1.0.0</td>
<td>1.x</td>
</tr>
<tr>
<td>flink-connector-elasticsearch2{{ site.scala_version_suffix }}</td>
<td>1.0.0</td>
@@ -82,54 +77,6 @@ Elasticsearch cluster.
The example below shows how to configure and create a sink:

<div class="codetabs" markdown="1">
<div data-lang="java, Elasticsearch 1.x" markdown="1">
{% highlight java %}
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");

List<TransportAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));

input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
{% endhighlight %}
</div>
<div data-lang="java, Elasticsearch 2.x / 5.x" markdown="1">
{% highlight java %}
import org.apache.flink.api.common.functions.RuntimeContext;
@@ -237,49 +184,6 @@ esSinkBuilder.setRestClientFactory(
input.addSink(esSinkBuilder.build());
{% endhighlight %}
</div>
<div data-lang="scala, Elasticsearch 1.x" markdown="1">
{% highlight scala %}
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer

import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.common.transport.TransportAddress

import java.net.InetAddress
import java.util.ArrayList
import java.util.HashMap
import java.util.List
import java.util.Map

val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
config.put("cluster.name", "my-cluster-name")
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")

val transportAddresses = new java.util.ArrayList[TransportAddress]
transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300))
transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300))

input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)

    Requests.indexRequest()
            .index("my-index")
            .`type`("my-type")
            .source(json)
  }

  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    indexer.add(createIndexRequest(element))
  }
}))
{% endhighlight %}
</div>
<div data-lang="scala, Elasticsearch 2.x / 5.x" markdown="1">
{% highlight scala %}
import org.apache.flink.api.common.functions.RuntimeContext
@@ -437,70 +341,6 @@ that this essentially means the sink will not provide any strong
delivery guarantees anymore, even with checkpointing enabled for the topology.
</p>
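
A minimal sketch of what that opt-out can look like in code (hedged: `MyElasticsearchSinkFunction` is a placeholder for a sink function like the ones shown earlier, and the snippet assumes the sink's `disableFlushOnCheckpoint()` setter):

{% highlight java %}
DataStream<String> input = ...;

ElasticsearchSink<String> sink =
    new ElasticsearchSink<>(config, transportAddresses, new MyElasticsearchSinkFunction());

// Opt out of flushing pending bulk requests on checkpoints.
// The sink then no longer waits for in-flight requests at checkpoint time,
// so delivery becomes best-effort even with checkpointing enabled.
sink.disableFlushOnCheckpoint();

input.addSink(sink);
{% endhighlight %}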

### Communication using Embedded Node (only for Elasticsearch 1.x)

For Elasticsearch version 1.x, communication using an embedded node is
also supported. See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
for information about the differences between communicating with Elasticsearch
through an embedded node and through a `TransportClient`.

Below is an example of how to create an `ElasticsearchSink` that uses an
embedded node instead of a `TransportClient`:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "my-cluster-name");

input.addSink(new ElasticsearchSink<>(config, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "my-cluster-name")

input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)

    Requests.indexRequest()
            .index("my-index")
            .`type`("my-type")
            .source(json)
  }

  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    indexer.add(createIndexRequest(element))
  }
}))
{% endhighlight %}
</div>
</div>

The difference is that we no longer need to provide a list of addresses
of Elasticsearch nodes.

### Handling Failing Elasticsearch Requests

Elasticsearch action requests may fail due to a variety of reasons, including
@@ -596,15 +436,6 @@ all the pending requests. This also means that if re-added requests never
succeed, the checkpoint will never finish.
</p>

<p style="border-radius: 5px; padding: 5px" class="bg-warning">
<b>Failure handling for Elasticsearch 1.x</b>: For Elasticsearch 1.x, it
is not feasible to match on the type of the failure, because the exact type
cannot be retrieved through the older Java client APIs (the exceptions
will therefore be general <b>Exception</b>s that differ only in the
failure message). In this case, it is recommended to match on the
provided REST status code instead.
</p>
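
A minimal sketch of matching on the REST status code through the connector's `ActionRequestFailureHandler` (hedged: `elasticsearchSinkFunction` stands for a sink function like the ones shown earlier, the sink constructor variant taking a failure handler as the last argument is assumed, and the status codes and retry/drop decisions are purely illustrative):

{% highlight java %}
input.addSink(new ElasticsearchSink<>(config, transportAddresses, elasticsearchSinkFunction,
    new ActionRequestFailureHandler() {
        @Override
        public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
            if (restStatusCode == 429) {
                // the cluster rejected the request because it was overloaded; re-add it for retry
                indexer.add(action);
            } else if (restStatusCode == 400) {
                // malformed request; drop it without failing the sink
            } else {
                // rethrow all other failures, which fails the sink (and the job)
                throw failure;
            }
        }
    }));
{% endhighlight %}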

### Configuring the Internal Bulk Processor

The internal `BulkProcessor` can be further configured for its behaviour
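
For instance, a sketch of the flush-related settings, passed through the same configuration map that is given to the sink (the keys are the ones exposed by the connector; the values below are only examples):

{% highlight java %}
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");

// flush after this many buffered action requests
config.put("bulk.flush.max.actions", "500");
// flush once the buffered data exceeds this size (in megabytes)
config.put("bulk.flush.max.size.mb", "5");
// flush at this interval (in milliseconds), regardless of count or size
config.put("bulk.flush.interval.ms", "1000");
{% endhighlight %}
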
@@ -37,7 +37,7 @@ under the License.

<!-- Allow users to pass custom connector versions -->
<properties>
<elasticsearch.version>1.7.1</elasticsearch.version>
<elasticsearch.version>2.3.5</elasticsearch.version>
</properties>

<dependencies>
@@ -26,7 +26,7 @@
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
@@ -510,7 +510,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {

if (mockItemFailure == null) {
// the mock response for the item is success
mockResponses[i] = new BulkItemResponse(i, "opType", mock(ActionResponse.class));
mockResponses[i] = new BulkItemResponse(i, "opType", mock(ActionWriteResponse.class));
} else {
// the mock response for the item is failure
mockResponses[i] = new BulkItemResponse(i, "opType", new BulkItemResponse.Failure("index", "type", "id", mockItemFailure));