Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-10066] Add support for ValueProviders in RedisConnectionConfiguration #11799

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.display.DisplayData;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Protocol;
Expand All @@ -33,66 +34,90 @@
@AutoValue
public abstract class RedisConnectionConfiguration implements Serializable {

abstract String host();
abstract ValueProvider<String> host();

abstract int port();
abstract ValueProvider<Integer> port();

@Nullable
abstract String auth();
abstract ValueProvider<String> auth();

abstract int timeout();
abstract ValueProvider<Integer> timeout();

abstract boolean ssl();
abstract ValueProvider<Boolean> ssl();

abstract Builder builder();

@AutoValue.Builder
abstract static class Builder {
abstract Builder setHost(String host);
abstract Builder setHost(ValueProvider<String> host);

abstract Builder setPort(int port);
abstract Builder setPort(ValueProvider<Integer> port);

abstract Builder setAuth(String auth);
abstract Builder setAuth(ValueProvider<String> auth);

abstract Builder setTimeout(int timeout);
abstract Builder setTimeout(ValueProvider<Integer> timeout);

abstract Builder setSsl(boolean ssl);
abstract Builder setSsl(ValueProvider<Boolean> ssl);

abstract RedisConnectionConfiguration build();
}

public static RedisConnectionConfiguration create() {
return new AutoValue_RedisConnectionConfiguration.Builder()
.setHost(Protocol.DEFAULT_HOST)
.setPort(Protocol.DEFAULT_PORT)
.setTimeout(Protocol.DEFAULT_TIMEOUT)
.setSsl(false)
.setHost(ValueProvider.StaticValueProvider.of(Protocol.DEFAULT_HOST))
.setPort(ValueProvider.StaticValueProvider.of(Protocol.DEFAULT_PORT))
.setTimeout(ValueProvider.StaticValueProvider.of(Protocol.DEFAULT_TIMEOUT))
.setSsl(ValueProvider.StaticValueProvider.of(false))
.build();
}

public static RedisConnectionConfiguration create(String host, int port) {
return create(
ValueProvider.StaticValueProvider.of(host), ValueProvider.StaticValueProvider.of(port));
}

public static RedisConnectionConfiguration create(
ValueProvider<String> host, ValueProvider<Integer> port) {
return new AutoValue_RedisConnectionConfiguration.Builder()
.setHost(host)
.setPort(port)
.setTimeout(Protocol.DEFAULT_TIMEOUT)
.setSsl(false)
.setTimeout(ValueProvider.StaticValueProvider.of(Protocol.DEFAULT_TIMEOUT))
.setSsl(ValueProvider.StaticValueProvider.of(false))
.build();
}

/** Define the host name of the Redis server. */
public RedisConnectionConfiguration withHost(String host) {
checkArgument(host != null, "host can not be null");
return withHost(ValueProvider.StaticValueProvider.of(host));
}

/** See {@link RedisConnectionConfiguration#withHost(String)}. */
public RedisConnectionConfiguration withHost(ValueProvider<String> host) {
checkArgument(host != null, "host can not be null");
return builder().setHost(host).build();
}

/** Define the port number of the Redis server. */
public RedisConnectionConfiguration withPort(int port) {
checkArgument(port > 0, "port can not be negative or 0");
return withPort(ValueProvider.StaticValueProvider.of(port));
}

/** See {@link RedisConnectionConfiguration#withPort(int)}. */
public RedisConnectionConfiguration withPort(ValueProvider<Integer> port) {
checkArgument(port != null, "port can not be null");
return builder().setPort(port).build();
}

/** Define the password to authenticate on the Redis server. */
public RedisConnectionConfiguration withAuth(String auth) {
checkArgument(auth != null, "auth can not be null");
return withAuth(ValueProvider.StaticValueProvider.of(auth));
}

/** See {@link RedisConnectionConfiguration#withAuth(String)}. */
public RedisConnectionConfiguration withAuth(ValueProvider<String> auth) {
checkArgument(auth != null, "auth can not be null");
return builder().setAuth(auth).build();
}
Expand All @@ -102,19 +127,30 @@ public RedisConnectionConfiguration withAuth(String auth) {
*/
public RedisConnectionConfiguration withTimeout(int timeout) {
checkArgument(timeout >= 0, "timeout can not be negative");
return withTimeout(ValueProvider.StaticValueProvider.of(timeout));
}

/** See {@link RedisConnectionConfiguration#withTimeout(int)}. */
public RedisConnectionConfiguration withTimeout(ValueProvider<Integer> timeout) {
checkArgument(timeout != null, "timeout can not be null");
return builder().setTimeout(timeout).build();
}

/** Enable SSL connection to Redis server. */
public RedisConnectionConfiguration enableSSL() {
return builder().setSsl(true).build();
return withSSL(ValueProvider.StaticValueProvider.of(true));
}

/** Define if a SSL connection to Redis server should be used. */
public RedisConnectionConfiguration withSSL(ValueProvider<Boolean> ssl) {
return builder().setSsl(ssl).build();
}

/** Connect to the Redis instance. */
public Jedis connect() {
Jedis jedis = new Jedis(host(), port(), timeout(), ssl());
Jedis jedis = new Jedis(host().get(), port().get(), timeout().get(), ssl().get());
if (auth() != null) {
jedis.auth(auth());
jedis.auth(auth().get());
}
return jedis;
}
Expand Down