Skip to content

Commit

Permalink
Merge pull request apache#11799: [BEAM-10066] Add support for ValuePr…
Browse files Browse the repository at this point in the history
…oviders in RedisConnectionConfiguration
  • Loading branch information
iemejia committed Jun 7, 2020
2 parents 42faa44 + c879e00 commit f70dc1e
Showing 1 changed file with 49 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.display.DisplayData;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Protocol;
Expand All @@ -33,67 +34,82 @@
@AutoValue
public abstract class RedisConnectionConfiguration implements Serializable {

abstract String host();
abstract ValueProvider<String> host();

abstract int port();
abstract ValueProvider<Integer> port();

@Nullable
abstract String auth();
abstract ValueProvider<String> auth();

abstract int timeout();
abstract ValueProvider<Integer> timeout();

abstract boolean ssl();
abstract ValueProvider<Boolean> ssl();

abstract Builder builder();

@AutoValue.Builder
abstract static class Builder {
abstract Builder setHost(String host);
abstract Builder setHost(ValueProvider<String> host);

abstract Builder setPort(int port);
abstract Builder setPort(ValueProvider<Integer> port);

abstract Builder setAuth(String auth);
abstract Builder setAuth(ValueProvider<String> auth);

abstract Builder setTimeout(int timeout);
abstract Builder setTimeout(ValueProvider<Integer> timeout);

abstract Builder setSsl(boolean ssl);
abstract Builder setSsl(ValueProvider<Boolean> ssl);

abstract RedisConnectionConfiguration build();
}

public static RedisConnectionConfiguration create() {
return new AutoValue_RedisConnectionConfiguration.Builder()
.setHost(Protocol.DEFAULT_HOST)
.setPort(Protocol.DEFAULT_PORT)
.setTimeout(Protocol.DEFAULT_TIMEOUT)
.setSsl(false)
.setHost(ValueProvider.StaticValueProvider.of(Protocol.DEFAULT_HOST))
.setPort(ValueProvider.StaticValueProvider.of(Protocol.DEFAULT_PORT))
.setTimeout(ValueProvider.StaticValueProvider.of(Protocol.DEFAULT_TIMEOUT))
.setSsl(ValueProvider.StaticValueProvider.of(Boolean.FALSE))
.build();
}

public static RedisConnectionConfiguration create(String host, int port) {
return new AutoValue_RedisConnectionConfiguration.Builder()
.setHost(host)
.setPort(port)
.setTimeout(Protocol.DEFAULT_TIMEOUT)
.setSsl(false)
.build();
return create().withHost(host).withPort(port);
}

public static RedisConnectionConfiguration create(
ValueProvider<String> host, ValueProvider<Integer> port) {
return create().withHost(host).withPort(port);
}

/** Define the host name of the Redis server. */
public RedisConnectionConfiguration withHost(String host) {
checkArgument(host != null, "host can not be null");
return withHost(ValueProvider.StaticValueProvider.of(host));
}

/** See {@link RedisConnectionConfiguration#withHost(String)}. */
public RedisConnectionConfiguration withHost(ValueProvider<String> host) {
return builder().setHost(host).build();
}

/** Define the port number of the Redis server. */
public RedisConnectionConfiguration withPort(int port) {
checkArgument(port > 0, "port can not be negative or 0");
return withPort(ValueProvider.StaticValueProvider.of(port));
}

/** See {@link RedisConnectionConfiguration#withPort(int)}. */
public RedisConnectionConfiguration withPort(ValueProvider<Integer> port) {
return builder().setPort(port).build();
}

/** Define the password to authenticate on the Redis server. */
public RedisConnectionConfiguration withAuth(String auth) {
checkArgument(auth != null, "auth can not be null");
return withAuth(ValueProvider.StaticValueProvider.of(auth));
}

/** See {@link RedisConnectionConfiguration#withAuth(String)}. */
public RedisConnectionConfiguration withAuth(ValueProvider<String> auth) {
return builder().setAuth(auth).build();
}

Expand All @@ -102,19 +118,29 @@ public RedisConnectionConfiguration withAuth(String auth) {
*/
public RedisConnectionConfiguration withTimeout(int timeout) {
checkArgument(timeout >= 0, "timeout can not be negative");
return withTimeout(ValueProvider.StaticValueProvider.of(timeout));
}

/** See {@link RedisConnectionConfiguration#withTimeout(int)}. */
public RedisConnectionConfiguration withTimeout(ValueProvider<Integer> timeout) {
return builder().setTimeout(timeout).build();
}

/** Enable SSL connection to Redis server. */
public RedisConnectionConfiguration enableSSL() {
return builder().setSsl(true).build();
return withSSL(ValueProvider.StaticValueProvider.of(Boolean.TRUE));
}

/** Define if a SSL connection to Redis server should be used. */
public RedisConnectionConfiguration withSSL(ValueProvider<Boolean> ssl) {
return builder().setSsl(ssl).build();
}

/** Connect to the Redis instance. */
public Jedis connect() {
Jedis jedis = new Jedis(host(), port(), timeout(), ssl());
Jedis jedis = new Jedis(host().get(), port().get(), timeout().get(), ssl().get());
if (auth() != null) {
jedis.auth(auth());
jedis.auth(auth().get());
}
return jedis;
}
Expand Down

0 comments on commit f70dc1e

Please sign in to comment.