Skip to content

Commit

Permalink
[FLINK-15670][core] Provide a utility function to flatten a recursive {
Browse files Browse the repository at this point in the history
…@link Properties} to a first level property HashTable

In some cases, {@code KafkaProducer#propsToMap} for example, Properties is used purely as a HashTable without considering its default properties.
  • Loading branch information
curcur authored and pnowojski committed May 18, 2020
1 parent 591aebc commit 03f5d54
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 0 deletions.
23 changes: 23 additions & 0 deletions flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.slf4j.Logger;

import java.util.Collections;
import java.util.Properties;

/**
Expand Down Expand Up @@ -108,6 +109,28 @@ public static boolean getBoolean(Properties config, String key, boolean defaultV
}
}

/**
* Flatten a recursive {@link Properties} to a first level property map.
*
* <p>In some cases, {@code KafkaProducer#propsToMap} for example, Properties is used purely as a HashTable
* without considering its default properties.
*
* @param config Properties to be flattened
* @return Properties without defaults; all properties are put in the first-level
*/
public static Properties flatten(Properties config) {
final Properties flattenProperties = new Properties();

Collections.list(config.propertyNames()).stream().forEach(
name -> {
Preconditions.checkArgument(name instanceof String);
flattenProperties.setProperty((String) name, config.getProperty((String) name));
}
);

return flattenProperties;
}

// ------------------------------------------------------------------------

/** Private default constructor to prevent instantiation. */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util;

import org.junit.Assert;
import org.junit.Test;

import java.util.Properties;

import static org.apache.flink.util.PropertiesUtil.flatten;

/**
* Tests for the {@link PropertiesUtil}.
*/
public class PropertiesUtilTest {

@Test
public void testFlatten() {
// default Properties is null
Properties prop1 = new Properties();
prop1.put("key1", "value1");

// default Properties is prop1
Properties prop2 = new Properties(prop1);
prop2.put("key2", "value2");

// default Properties is prop2
Properties prop3 = new Properties(prop2);
prop3.put("key3", "value3");

Properties flattened = flatten(prop3);
Assert.assertEquals(flattened.get("key1"), "value1");
Assert.assertEquals(flattened.get("key2"), "value2");
Assert.assertEquals(flattened.get("key3"), "value3");
}
}

0 comments on commit 03f5d54

Please sign in to comment.