Skip to content

Commit

Permalink
Merge pull request apache#4358 from iemejia/BEAM-3373-add-serviceEndp…
Browse files Browse the repository at this point in the history
…oint

[BEAM-3373] Add serviceEndpoint parameter to KinesisIO
  • Loading branch information
jbonofre committed Jan 8, 2018
2 parents d2ec896 + 35846e3 commit 192b4c7
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
Expand Down Expand Up @@ -197,12 +198,26 @@ public Read withAWSClientsProvider(AWSClientsProvider awsClientsProvider) {
}

/**
* Specify credential details and region to be used to read from Kinesis.
* If you need more sophisticated credential protocol, then you should look at
* {@link Read#withAWSClientsProvider(AWSClientsProvider)}.
* Specify credential details and region to be used to read from Kinesis. If you need more
* sophisticated credential protocol, then you should look at {@link
* Read#withAWSClientsProvider(AWSClientsProvider)}.
*/
public Read withAWSClientsProvider(String awsAccessKey, String awsSecretKey, Regions region) {
return withAWSClientsProvider(new BasicKinesisProvider(awsAccessKey, awsSecretKey, region));
return withAWSClientsProvider(awsAccessKey, awsSecretKey, region, null);
}

/**
* Specify credential details and region to be used to read from Kinesis. If you need more
* sophisticated credential protocol, then you should look at {@link
* Read#withAWSClientsProvider(AWSClientsProvider)}.
*
* <p>The {@code serviceEndpoint} sets an alternative service host. This is useful to execute
* the tests with a kinesis service emulator.
*/
public Read withAWSClientsProvider(
String awsAccessKey, String awsSecretKey, Regions region, String serviceEndpoint) {
return withAWSClientsProvider(
new BasicKinesisProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint));
}

/** Specifies to read at most a given number of records. */
Expand Down Expand Up @@ -249,42 +264,50 @@ public PCollection<KinesisRecord> expand(PBegin input) {
}

private static final class BasicKinesisProvider implements AWSClientsProvider {

private final String accessKey;
private final String secretKey;
private final Regions region;
@Nullable private final String serviceEndpoint;

private BasicKinesisProvider(String accessKey, String secretKey, Regions region) {
private BasicKinesisProvider(
String accessKey, String secretKey, Regions region, @Nullable String serviceEndpoint) {
checkArgument(accessKey != null, "accessKey can not be null");
checkArgument(secretKey != null, "secretKey can not be null");
checkArgument(region != null, "region can not be null");
this.accessKey = accessKey;
this.secretKey = secretKey;
this.region = region;
this.serviceEndpoint = serviceEndpoint;
}

private AWSCredentialsProvider getCredentialsProvider() {
return new AWSStaticCredentialsProvider(new BasicAWSCredentials(
accessKey,
secretKey
));

return new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
}

@Override
public AmazonKinesis getKinesisClient() {
return AmazonKinesisClientBuilder.standard()
.withCredentials(getCredentialsProvider())
.withRegion(region)
.build();
AmazonKinesisClientBuilder clientBuilder =
AmazonKinesisClientBuilder.standard().withCredentials(getCredentialsProvider());
if (serviceEndpoint == null) {
clientBuilder.withRegion(region);
} else {
clientBuilder.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(serviceEndpoint, region.getName()));
}
return clientBuilder.build();
}

@Override
public AmazonCloudWatch getCloudWatchClient() {
return AmazonCloudWatchClientBuilder.standard()
.withCredentials(getCredentialsProvider())
.withRegion(region)
.build();
AmazonCloudWatchClientBuilder clientBuilder =
AmazonCloudWatchClientBuilder.standard().withCredentials(getCredentialsProvider());
if (serviceEndpoint == null) {
clientBuilder.withRegion(region);
} else {
clientBuilder.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(serviceEndpoint, region.getName()));
}
return clientBuilder.build();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public boolean start() throws IOException {

/**
* Moves to the next record in one of the shards.
* If current shard iterator can be move forward (i.e. there's a record present) then we do it.
* If current shard iterator can be moved forward (i.e. there's a record present) then we do it.
* If not, we iterate over shards in a round-robin manner.
*/
@Override
Expand Down

0 comments on commit 192b4c7

Please sign in to comment.