Skip to content

Commit

Permalink
[FLINK-3882] [docs] Fix errors in sample Java code for the Elasticsea…
Browse files Browse the repository at this point in the history
…rch2 sink

This closes apache#1971
  • Loading branch information
markreddy authored and fhueske committed May 10, 2016
1 parent 08e8054 commit 43bd6f6
Showing 1 changed file with 8 additions and 9 deletions.
17 changes: 8 additions & 9 deletions docs/apis/streaming/connectors/elasticsearch2.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,19 @@ transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));

input.addSink(new ElasticsearchSink(config, transports, new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element): IndexRequest = {
Map<String,String> json = new HashMap<>()
json.put("data", element)
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
json.put("data", element);

return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);

}

@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element))
indexer.add(createIndexRequest(element));
}
}));
{% endhighlight %}
Expand All @@ -116,10 +115,10 @@ input.addSink(new ElasticsearchSink(config, transports, new ElasticsearchSinkFun
json.put("data", element)
Requests.indexRequest.index("my-index").`type`("my-type").source(json)
}

override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
indexer.add(createIndexRequest(element))
}
}
}))
{% endhighlight %}
</div>
Expand All @@ -139,7 +138,7 @@ This will buffer elements and Action Requests before sending to the cluster. The
* **bulk.flush.interval.ms**: Interval at which to flush data regardless of the other two
settings in milliseconds

This now provides a list of Elasticsearch Nodes
This now provides a list of Elasticsearch Nodes
to which the sink should connect via a `TransportClient`.

More about information about Elasticsearch can be found [here](https://elastic.co).

0 comments on commit 43bd6f6

Please sign in to comment.