Skip to content

Commit

Permalink
[FLINK-18361][es][table] Support username and password options for ne…
Browse files Browse the repository at this point in the history
…w Elasticsearch connector

Co-authored-by: zhisheng17 <[email protected]>

This closes apache#12715
  • Loading branch information
KarmaGYZ committed Jul 10, 2020
1 parent 69ef4b7 commit ee65377
Show file tree
Hide file tree
Showing 12 changed files with 312 additions and 5 deletions.
16 changes: 15 additions & 1 deletion docs/dev/table/connectors/elasticsearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,20 @@ Connector Options
<td>String</td>
<td>Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3".</td>
</tr>
<tr>
<td><h5>username</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Username used to connect to Elasticsearch instance. Please note that Elasticsearch comes without a pre-bundled security feature, but you can enable it by following the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/configuring-security.html">guideline</a> to secure an Elasticsearch cluster.</td>
</tr>
<tr>
<td><h5>password</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Password used to connect to Elasticsearch instance. If <code>username</code> is configured, this option must be configured with a non-empty string as well.</td>
</tr>
<tr>
<td><h5>failure-handler</h5></td>
<td>optional</td>
Expand Down Expand Up @@ -265,4 +279,4 @@ Data Type Mapping
Elasticsearch stores document in a JSON string. So the data type mapping is between Flink data type and JSON data type.
Flink uses built-in `'json'` format for Elasticsearch connector. Please refer to [JSON Format]({% link dev/table/connectors/formats/json.md %}) page for more type mapping details.

{% top %}
{% top %}
14 changes: 14 additions & 0 deletions docs/dev/table/connectors/elasticsearch.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,20 @@ Connector Options
<td>String</td>
<td>Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3".</td>
</tr>
<tr>
<td><h5>username</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Username used to connect to Elasticsearch instance. Please note that Elasticsearch comes without a pre-bundled security feature, but you can enable it by following the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/configuring-security.html">guideline</a> to secure an Elasticsearch cluster.</td>
</tr>
<tr>
<td><h5>password</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Password used to connect to Elasticsearch instance. If <code>username</code> is configured, this option must be configured with a non-empty string as well.</td>
</tr>
<tr>
<td><h5>failure-handler</h5></td>
<td>optional</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.PASSWORD_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.USERNAME_OPTION;

/**
* Accessor methods to elasticsearch options.
Expand Down Expand Up @@ -98,6 +100,14 @@ public long getBulkFlushInterval() {
return interval == 0 ? -1 : interval;
}

/**
 * Returns the username used to authenticate against the Elasticsearch cluster.
 *
 * <p>Backed by the {@code username} table option; empty when the option is not set.
 */
public Optional<String> getUsername() {
    return config.getOptional(USERNAME_OPTION);
}

/**
 * Returns the password used to authenticate against the Elasticsearch cluster.
 *
 * <p>Backed by the {@code password} table option; empty when the option is not set.
 */
public Optional<String> getPassword() {
    return config.getOptional(PASSWORD_OPTION);
}

/**
 * Returns whether bulk-flush retry backoff is enabled, i.e. the configured
 * backoff type is anything other than {@code DISABLED}.
 */
public boolean isBulkFlushBackoffEnabled() {
    return config.get(BULK_FLUSH_BACKOFF_TYPE_OPTION) != ElasticsearchOptions.BackOffType.DISABLED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ public enum BackOffType {
.stringType()
.noDefaultValue()
.withDescription("Elasticsearch document type.");
/**
 * Optional {@code password} option. If set, {@code username} must be set as well;
 * the factory validates this pairing.
 */
public static final ConfigOption<String> PASSWORD_OPTION =
    ConfigOptions.key("password")
        .stringType()
        .noDefaultValue()
        .withDescription("Password used to connect to Elasticsearch instance.");
/**
 * Optional {@code username} option used for basic authentication. Elasticsearch
 * security must be enabled on the cluster for this to take effect.
 */
public static final ConfigOption<String> USERNAME_OPTION =
    ConfigOptions.key("username")
        .stringType()
        .noDefaultValue()
        .withDescription("Username used to connect to Elasticsearch instance.");
public static final ConfigOption<String> KEY_DELIMITER_OPTION =
ConfigOptions.key("document-id.key-delimiter")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.StringUtils;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
Expand Down Expand Up @@ -138,7 +143,14 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) {

// we must overwrite the default factory which is defined with a lambda because of a bug
// in shading lambda serialization shading see FLINK-18006
builder.setRestClientFactory(new DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
if (config.getUsername().isPresent()
&& config.getPassword().isPresent()
&& !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())
&& !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get())) {
builder.setRestClientFactory(new AuthRestClientFactory(config.getPathPrefix().orElse(null), config.getUsername().get(), config.getPassword().get()));
} else {
builder.setRestClientFactory(new DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
}

final ElasticsearchSink<RowData> sink = builder.build();

Expand Down Expand Up @@ -197,6 +209,56 @@ public int hashCode() {
}
}

/**
 * Serializable {@link RestClientFactory} used by the sink when basic authentication
 * is enabled via the {@code username}/{@code password} options.
 */
@VisibleForTesting
static class AuthRestClientFactory implements RestClientFactory {

    private final String pathPrefix;
    private final String username;
    private final String password;
    // Built lazily on first use; CredentialsProvider is not serializable.
    private transient CredentialsProvider credentialsProvider;

    public AuthRestClientFactory(@Nullable String pathPrefix, String username, String password) {
        this.pathPrefix = pathPrefix;
        this.username = username;
        this.password = password;
    }

    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        if (pathPrefix != null) {
            restClientBuilder.setPathPrefix(pathPrefix);
        }
        if (credentialsProvider == null) {
            credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(
                AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        }
        restClientBuilder.setHttpClientConfigCallback(
            clientBuilder -> clientBuilder.setDefaultCredentialsProvider(credentialsProvider));
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final AuthRestClientFactory other = (AuthRestClientFactory) o;
        return Objects.equals(pathPrefix, other.pathPrefix)
            && Objects.equals(username, other.username)
            && Objects.equals(password, other.password);
    }

    @Override
    public int hashCode() {
        return Objects.hash(pathPrefix, username, password);
    }
}

/**
* Version-specific creation of {@link org.elasticsearch.action.ActionRequest}s used by the sink.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.StringUtils;

import java.util.Set;
import java.util.function.Supplier;
Expand All @@ -52,6 +53,8 @@
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.PASSWORD_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.USERNAME_OPTION;

/**
* A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch6DynamicSink}.
Expand All @@ -75,7 +78,9 @@ public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory
BULK_FLUSH_BACKOFF_DELAY_OPTION,
CONNECTION_MAX_RETRY_TIMEOUT_OPTION,
CONNECTION_PATH_PREFIX,
FORMAT_OPTION
FORMAT_OPTION,
PASSWORD_OPTION,
USERNAME_OPTION
).collect(Collectors.toSet());

@Override
Expand Down Expand Up @@ -133,6 +138,17 @@ private void validate(Elasticsearch6Configuration config, Configuration original
BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(),
config.getBulkFlushBackoffRetries().get())
);
if (config.getUsername().isPresent() && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())) {
validate(
config.getPassword().isPresent() && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get()),
() -> String.format(
"'%s' and '%s' must be set at the same time. Got: username '%s' and password '%s'",
USERNAME_OPTION.key(),
PASSWORD_OPTION.key(),
config.getUsername().get(),
config.getPassword().orElse("")
));
}
}

private static void validate(boolean condition, Supplier<String> message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,25 @@ public void validatePrimaryKeyOnIllegalColumn() {
.build()
);
}

@Test
public void validateWrongCredential() {
Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();

thrown.expect(ValidationException.class);
thrown.expectMessage(
"'username' and 'password' must be set at the same time. Got: username 'username' and password ''");
sinkFactory.createDynamicTableSink(
context()
.withSchema(TableSchema.builder()
.field("a", DataTypes.TIME())
.build())
.withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
.withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http:https://localhost:1234")
.withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), "MyType")
.withOption(ElasticsearchOptions.USERNAME_OPTION.key(), "username")
.withOption(ElasticsearchOptions.PASSWORD_OPTION.key(), "")
.build()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public class Elasticsearch6DynamicSinkTest {
private static final String SCHEMA = "https";
private static final String INDEX = "MyIndex";
private static final String DOC_TYPE = "MyType";
private static final String USERNAME = "username";
private static final String PASSWORD = "password";

@Test
public void testBuilder() {
Expand Down Expand Up @@ -114,6 +116,35 @@ public void testDefaultConfig() {
verify(provider.sinkSpy, never()).disableFlushOnCheckpoint();
}

@Test
public void testAuthConfig() {
    // Configuring both username and password must select the authenticating
    // rest client factory instead of the default one.
    final TableSchema schema = createTestSchema();

    final Configuration options = new Configuration();
    options.setString(ElasticsearchOptions.INDEX_OPTION.key(), INDEX);
    options.setString(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), DOC_TYPE);
    options.setString(ElasticsearchOptions.HOSTS_OPTION.key(), SCHEMA + ":https://" + HOSTNAME + ":" + PORT);
    options.setString(ElasticsearchOptions.USERNAME_OPTION.key(), USERNAME);
    options.setString(ElasticsearchOptions.PASSWORD_OPTION.key(), PASSWORD);

    final BuilderProvider provider = new BuilderProvider();
    final Elasticsearch6DynamicSink sink = new Elasticsearch6DynamicSink(
        new DummyEncodingFormat(),
        new Elasticsearch6Configuration(options, this.getClass().getClassLoader()),
        schema,
        provider);

    sink.getSinkRuntimeProvider(new MockSinkContext()).createSinkFunction();

    verify(provider.builderSpy).setFailureHandler(new NoOpFailureHandler());
    verify(provider.builderSpy).setBulkFlushBackoff(false);
    verify(provider.builderSpy).setBulkFlushInterval(1000);
    verify(provider.builderSpy).setBulkFlushMaxActions(1000);
    verify(provider.builderSpy).setBulkFlushMaxSizeMb(2);
    verify(provider.builderSpy).setRestClientFactory(new Elasticsearch6DynamicSink.AuthRestClientFactory(null, USERNAME, PASSWORD));
    verify(provider.sinkSpy, never()).disableFlushOnCheckpoint();
}

private Configuration getConfig() {
Configuration configuration = new Configuration();
configuration.setString(ElasticsearchOptions.INDEX_OPTION.key(), INDEX);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.StringUtils;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
Expand Down Expand Up @@ -138,7 +143,14 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) {

// we must overwrite the default factory which is defined with a lambda because of a bug
// in shading lambda serialization shading see FLINK-18006
builder.setRestClientFactory(new DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
if (config.getUsername().isPresent()
&& config.getPassword().isPresent()
&& !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())
&& !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get())) {
builder.setRestClientFactory(new AuthRestClientFactory(config.getPathPrefix().orElse(null), config.getUsername().get(), config.getPassword().get()));
} else {
builder.setRestClientFactory(new DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
}

final ElasticsearchSink<RowData> sink = builder.build();

Expand Down Expand Up @@ -197,6 +209,56 @@ public int hashCode() {
}
}

/**
 * Serializable {@link RestClientFactory} used by the sink when basic authentication
 * is enabled via the {@code username}/{@code password} options.
 */
@VisibleForTesting
static class AuthRestClientFactory implements RestClientFactory {

    private final String pathPrefix;
    private final String username;
    private final String password;
    // Built lazily on first use; CredentialsProvider is not serializable.
    private transient CredentialsProvider credentialsProvider;

    public AuthRestClientFactory(@Nullable String pathPrefix, String username, String password) {
        this.pathPrefix = pathPrefix;
        this.username = username;
        this.password = password;
    }

    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        if (pathPrefix != null) {
            restClientBuilder.setPathPrefix(pathPrefix);
        }
        if (credentialsProvider == null) {
            credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        }
        restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder ->
            httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        AuthRestClientFactory that = (AuthRestClientFactory) o;
        return Objects.equals(pathPrefix, that.pathPrefix) &&
            Objects.equals(username, that.username) &&
            Objects.equals(password, that.password);
    }

    @Override
    public int hashCode() {
        // Hash the fields in the same order equals() compares them, matching the
        // sibling Elasticsearch6 implementation (was: pathPrefix, password, username).
        return Objects.hash(pathPrefix, username, password);
    }
}

/**
* Version-specific creation of {@link org.elasticsearch.action.ActionRequest}s used by the sink.
*/
Expand Down
Loading

0 comments on commit ee65377

Please sign in to comment.