Skip to content

Commit

Permalink
[FLINK-8489][ES] Prevent side-effects when modifying user-config
Browse files Browse the repository at this point in the history
This closes apache#5378.
This closes apache#4847.
This closes apache#5305.
This closes apache#5208.
This closes apache#2192.
This closes apache#2422.
This closes apache#3478.
  • Loading branch information
zentol committed Jan 31, 2018
1 parent 8cf2be7 commit 91cf60b
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -216,6 +217,9 @@ public ElasticsearchSinkBase(

checkNotNull(userConfig);

// copy config so we can remove entries without side-effects
userConfig = new HashMap<>(userConfig);

ParameterTool params = ParameterTool.fromMap(userConfig);

if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,26 @@
*/
public class ElasticsearchSinkBaseTest {

/**
* Verifies that the collection given to the sink is not modified.
*/
@Test
public void testCollectionArgumentNotModified() {
Map<String, String> userConfig = new HashMap<>();
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY, "1");
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, "true");
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES, "1");
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE, "CONSTANT");
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "1");
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, "1");

new DummyElasticsearchSink<>(
Collections.unmodifiableMap(userConfig),
new SimpleSinkFunction<String>(),
new NoOpFailureHandler());
}

/** Tests that any item failure in the listener callbacks is rethrown on an immediately following invoke call. */
@Test
public void testItemFailureRethrownOnInvoke() throws Throwable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public void runTransportClientFailsTest() throws Exception {
userConfig.put("cluster.name", "my-transport-client-cluster");

source.addSink(createElasticsearchSinkForEmbeddedNode(
userConfig, new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")));
Collections.unmodifiableMap(userConfig), new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")));

try {
env.execute("Elasticsearch Transport Client Test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -113,13 +114,14 @@ protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(

// Elasticsearch 1.x requires this setting when using
// LocalTransportAddress to connect to a local embedded node
userConfig = new HashMap<>(userConfig);
userConfig.put("node.local", "true");

List<TransportAddress> transports = new ArrayList<>();
transports.add(new LocalTransportAddress("1"));

return new ElasticsearchSink<>(
userConfig,
Collections.unmodifiableMap(userConfig),
transports,
elasticsearchSinkFunction);
}
Expand Down

0 comments on commit 91cf60b

Please sign in to comment.