Skip to content

Commit

Permalink
[FLINK-8199] [elasticsearch] Properly annotate APIs of Elasticsearch …
Browse files Browse the repository at this point in the history
…connector

This closes apache#5124.
  • Loading branch information
zhangminglei authored and tzulitai committed Jan 12, 2018
1 parent 5a318de commit 9b5fce6
Show file tree
Hide file tree
Showing 18 changed files with 42 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.flink.streaming.connectors.elasticsearch;

import org.apache.flink.annotation.PublicEvolving;

import org.elasticsearch.action.ActionRequest;

import java.io.Serializable;
Expand Down Expand Up @@ -56,6 +58,7 @@
* could not be retrieved through the older version Java client APIs (thus, the types will be general {@link Exception}s
* and only differ in the failure message). In this case, it is recommended to match on the provided REST status code.
*/
@PublicEvolving
public interface ActionRequestFailureHandler extends Serializable {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.streaming.connectors.elasticsearch;

import org.apache.flink.annotation.Internal;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkProcessor;

Expand All @@ -29,6 +31,7 @@
* Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
* {@link ActionRequest ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster.
*/
@Internal
class BulkProcessorIndexer implements RequestIndexer {

private final BulkProcessor bulkProcessor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.streaming.connectors.elasticsearch;

import org.apache.flink.annotation.Internal;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.Client;
Expand All @@ -36,6 +38,7 @@
* is allowed, the call bridge will hold reference to the created embedded node. Each instance of the sink will hold
* exactly one instance of the call bridge, and state cleanup is performed when the sink is closed.
*/
@Internal
public interface ElasticsearchApiCallBridge extends Serializable {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.streaming.connectors.elasticsearch;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -61,6 +62,7 @@
*
* @param <T> Type of the elements handled by this sink
*/
@Internal
public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> implements CheckpointedFunction {

private static final long serialVersionUID = -1007596293618451942L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.connectors.elasticsearch;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RuntimeContext;

Expand Down Expand Up @@ -56,6 +57,7 @@
*
* @param <T> The type of the element handled by this {@code ElasticsearchSinkFunction}
*/
@PublicEvolving
public interface ElasticsearchSinkFunction<T> extends Serializable, Function {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@

package org.apache.flink.streaming.connectors.elasticsearch;

import org.apache.flink.annotation.PublicEvolving;

import org.elasticsearch.action.ActionRequest;

/**
* Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare
* them for sending to an Elasticsearch cluster.
*/
@PublicEvolving
public interface RequestIndexer {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.streaming.connectors.elasticsearch.util;

import org.apache.flink.annotation.Internal;

import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;

Expand All @@ -28,6 +30,7 @@
/**
* Suite of utility methods for Elasticsearch.
*/
@Internal
public class ElasticsearchUtils {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.streaming.connectors.elasticsearch.util;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

Expand All @@ -25,6 +26,7 @@
/**
* An {@link ActionRequestFailureHandler} that simply fails the sink on any failures.
*/
@Internal
public class NoOpFailureHandler implements ActionRequestFailureHandler {

private static final long serialVersionUID = 737941343410827885L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.connectors.elasticsearch.util;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;
Expand All @@ -30,6 +31,7 @@
* {@link EsRejectedExecutionException}s (which means that Elasticsearch node queues are currently full),
* and fails for all other failures.
*/
@PublicEvolving
public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {

private static final long serialVersionUID = -7423562912824511906L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.streaming.connectors.elasticsearch;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;

import org.elasticsearch.action.bulk.BulkItemResponse;
Expand All @@ -40,6 +41,7 @@
/**
* Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 1.x.
*/
@Internal
public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge {

private static final long serialVersionUID = -2632363720584123682L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.streaming.connectors.elasticsearch;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;

import org.elasticsearch.action.ActionRequest;
Expand Down Expand Up @@ -62,6 +63,7 @@
*
* @param <T> Type of the elements handled by this sink
*/
@PublicEvolving
public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {

private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@

package org.apache.flink.streaming.connectors.elasticsearch;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;

/**
* A dummy {@link ElasticsearchSinkFunction} that wraps a {@link IndexRequestBuilder}.
* This serves as a bridge for the usage deprecation of the {@code IndexRequestBuilder} interface.
*/
@Internal
class IndexRequestBuilderWrapperFunction<T> implements ElasticsearchSinkFunction<T> {

private static final long serialVersionUID = 289876038414250101L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.streaming.connectors.elasticsearch2;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
Expand All @@ -42,6 +43,7 @@
/**
* Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 2.x.
*/
@Internal
public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge {

private static final long serialVersionUID = 2638252694744361079L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.streaming.connectors.elasticsearch2;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
Expand Down Expand Up @@ -56,6 +57,7 @@
*
* @param <T> Type of the elements handled by this sink
*/
@PublicEvolving
public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {

private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

package org.apache.flink.streaming.connectors.elasticsearch2;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

/**
* A dummy {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction} to bridge
* the migration from the deprecated {@link ElasticsearchSinkFunction}.
*/
@Internal
class OldNewElasticsearchSinkFunctionBridge<T> implements org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> {

private static final long serialVersionUID = 2415651895272659448L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

package org.apache.flink.streaming.connectors.elasticsearch2;

import org.apache.flink.annotation.Internal;

import org.elasticsearch.action.ActionRequest;

/**
* A dummy {@link org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer} to bridge
* the migration from the deprecated {@link RequestIndexer}.
*/
@Internal
class OldNewRequestIndexerBridge implements RequestIndexer {

private static final long serialVersionUID = 4213982619497149416L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.streaming.connectors.elasticsearch5;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
Expand Down Expand Up @@ -45,6 +46,7 @@
/**
* Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 5.x.
*/
@Internal
public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge {

private static final long serialVersionUID = -5222683870097809633L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.streaming.connectors.elasticsearch5;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
Expand Down Expand Up @@ -57,6 +58,7 @@
*
* @param <T> Type of the elements handled by this sink
*/
@PublicEvolving
public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {

private static final long serialVersionUID = 1L;
Expand Down

0 comments on commit 9b5fce6

Please sign in to comment.