Skip to content

Commit

Permalink
[FLINK-18448][pubsub] Update Google Cloud PubSub dependencies
Browse files Browse the repository at this point in the history
This closes apache#12846
  • Loading branch information
nielsbasjes authored and rmetzger committed Jul 23, 2020
1 parent da210e0 commit fdcaa46
Show file tree
Hide file tree
Showing 21 changed files with 484 additions and 107 deletions.
2 changes: 1 addition & 1 deletion docs/dev/connectors/pubsub.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder()
.withProjectName("my-fake-project")
.withSubscriptionName("subscription")
.withHostAndPortForEmulator(hostAndPort)
.build()
.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(pubsubSource)
Expand Down
2 changes: 1 addition & 1 deletion docs/dev/connectors/pubsub.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder()
.withProjectName("my-fake-project")
.withSubscriptionName("subscription")
.withHostAndPortForEmulator(hostAndPort)
.build()
.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(pubsubSource)
Expand Down
30 changes: 19 additions & 11 deletions flink-connectors/flink-connector-gcp-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@ under the License.

<packaging>jar</packaging>

<properties>
<google-cloud-libraries-bom.version>8.1.0</google-cloud-libraries-bom.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<!-- This is the way we get a consistent set of versions of the Google tools -->
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bom</artifactId>
<version>0.80.0-alpha</version>
<artifactId>libraries-bom</artifactId>
<version>${google-cloud-libraries-bom.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand All @@ -56,18 +60,22 @@ under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-core</artifactId>
<!-- Version is pulled from google-cloud-bom (loaded via the libraries-bom) -->
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<!-- Version is pulled from google-cloud-bom -->
<exclusions>
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava-jdk5</artifactId>
</exclusion>
</exclusions>
<!-- Version is pulled from google-cloud-bom (loaded via the libraries-bom) -->
</dependency>

<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-cloud-pubsub-v1</artifactId>
<!-- Version is pulled from google-cloud-bom (loaded via the libraries-bom) -->
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public PubSubSubscriber getSubscriber(Credentials credentials) throws IOExceptio

PullRequest pullRequest = PullRequest.newBuilder()
.setMaxMessages(maxMessagesPerPull)
.setReturnImmediately(false)
.setSubscription(projectSubscriptionName)
.build();
SubscriberGrpc.SubscriberBlockingStub stub = SubscriberGrpc.newBlockingStub(channel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,28 @@
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider;
import org.apache.flink.util.Preconditions;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannel;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;

import java.io.IOException;
import java.io.Serializable;
Expand Down Expand Up @@ -97,17 +100,31 @@ public void open(Configuration configuration) throws Exception {
serializationSchema.open(() -> getRuntimeContext().getMetricGroup().addGroup("user"));

Publisher.Builder builder = Publisher
.newBuilder(ProjectTopicName.of(projectName, topicName))
.newBuilder(TopicName.of(projectName, topicName))
.setCredentialsProvider(FixedCredentialsProvider.create(credentials));

// Having the host and port for the emulator means we are in a testing scenario.
if (hostAndPortForEmulator != null) {
managedChannel = ManagedChannelBuilder
.forTarget(hostAndPortForEmulator)
.usePlaintext(true) // This is 'Ok' because this is ONLY used for testing.
.usePlaintext() // This is 'Ok' because this is ONLY used for testing.
.build();
channel = GrpcTransportChannel.newBuilder().setManagedChannel(managedChannel).build();
builder.setChannelProvider(FixedTransportChannelProvider.create(channel))
.setCredentialsProvider(NoCredentialsProvider.create());
.setCredentialsProvider(EmulatorCredentialsProvider.create())
// In test scenarios we are limiting the Retry settings.
// The values here are based on the default settings with lower attempts and timeouts.
.setRetrySettings(
RetrySettings.newBuilder()
.setMaxAttempts(10)
.setTotalTimeout(Duration.ofSeconds(10))
.setInitialRetryDelay(Duration.ofMillis(100))
.setRetryDelayMultiplier(1.3)
.setMaxRetryDelay(Duration.ofSeconds(5))
.setInitialRpcTimeout(Duration.ofSeconds(5))
.setRpcTimeoutMultiplier(1)
.setMaxRpcTimeout(Duration.ofSeconds(10))
.build());
}

publisher = builder.build();
Expand Down Expand Up @@ -157,7 +174,7 @@ private void shutdownManagedChannel() {
}

@Override
public void invoke(IN message, SinkFunction.Context context) throws Exception {
public void invoke(IN message, SinkFunction.Context context) {
PubsubMessage pubsubMessage = PubsubMessage
.newBuilder()
.setData(ByteString.copyFrom(serializationSchema.serialize(message)))
Expand Down Expand Up @@ -206,7 +223,7 @@ private void waitForFuturesToComplete() {
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
public void initializeState(FunctionInitializationContext context) {
}

/**
Expand Down Expand Up @@ -273,7 +290,13 @@ public PubSubSinkBuilder<IN> withHostAndPortForEmulator(String hostAndPort) {
*/
public PubSubSink<IN> build() throws IOException {
if (credentials == null) {
credentials = defaultCredentialsProviderBuilder().build().getCredentials();
if (hostAndPort == null) {
// No hostAndPort is the normal scenario so we use the default credentials.
credentials = defaultCredentialsProviderBuilder().build().getCredentials();
} else {
// With hostAndPort the PubSub emulator is used so we do not have credentials.
credentials = EmulatorCredentials.getInstance();
}
}
return new PubSubSink<>(credentials, serializationSchema, projectName, topicName, hostAndPort);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ public void run(SourceContext<OUT> sourceContext) throws Exception {
isRunning = false;
}
}
}

@Override
public void close() throws Exception {
subscriber.close();
}

Expand Down Expand Up @@ -223,13 +227,12 @@ private void createAndSetPubSubSubscriber() throws Exception {
* @param <OUT> The type of objects which will be read
*/
public static class PubSubSourceBuilder<OUT> implements ProjectNameBuilder<OUT>, SubscriptionNameBuilder<OUT> {
private PubSubDeserializationSchema<OUT> deserializationSchema;
private final PubSubDeserializationSchema<OUT> deserializationSchema;
private String projectName;
private String subscriptionName;

private PubSubSubscriberFactory pubSubSubscriberFactory;
private Credentials credentials;
private int maxMessageToAcknowledge = 10000;
private int messagePerSecondRateLimit = 100000;

private PubSubSourceBuilder(DeserializationSchema<OUT> deserializationSchema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void addAcknowledgeId(ACKID id) {
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
public void notifyCheckpointComplete(long checkpointId) {
//get all acknowledgeIds of this and earlier checkpoints
List<ACKID> idsToAcknowledge = acknowledgeIdsPerCheckpoint
.stream()
Expand All @@ -87,15 +87,15 @@ public void notifyCheckpointAborted(long checkpointId) {
}

@Override
public List<AcknowledgeIdsForCheckpoint<ACKID>> snapshotState(long checkpointId, long timestamp) throws Exception {
public List<AcknowledgeIdsForCheckpoint<ACKID>> snapshotState(long checkpointId, long timestamp) {
acknowledgeIdsPerCheckpoint.add(new AcknowledgeIdsForCheckpoint<>(checkpointId, acknowledgeIdsForPendingCheckpoint));
acknowledgeIdsForPendingCheckpoint = new ArrayList<>();

return acknowledgeIdsPerCheckpoint;
}

@Override
public void restoreState(List<AcknowledgeIdsForCheckpoint<ACKID>> state) throws Exception {
public void restoreState(List<AcknowledgeIdsForCheckpoint<ACKID>> state) {
outstandingAcknowledgements = new AtomicInteger(numberOfAcknowledgementIds(state));
acknowledgeIdsPerCheckpoint = state;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.gcp.pubsub.emulator;

import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.OAuth2Credentials;

import java.time.Instant;
import java.util.Date;

import static java.lang.Long.MAX_VALUE;

/**
* A placeholder for credentials to signify that requests sent to the server should not be
* authenticated. This is typically useful when using local service emulators.
* NOTE: The Google provided NoCredentials and NoCredentialsProvider do not behave as expected
* See https://github.com/googleapis/gax-java/issues/1148
*/
public final class EmulatorCredentials extends OAuth2Credentials {
private static final EmulatorCredentials INSTANCE = new EmulatorCredentials();

private EmulatorCredentials() {
}

private Object readResolve() {
return INSTANCE;
}

public static EmulatorCredentials getInstance() {
return INSTANCE;
}

@Override
public boolean equals(Object obj) {
return this == obj;
}

@Override
public int hashCode() {
return System.identityHashCode(this);
}

@Override
public AccessToken refreshAccessToken() {
return new AccessToken("Dummy credentials for emulator", Date.from(Instant.ofEpochMilli(MAX_VALUE)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.gcp.pubsub.emulator;

import com.google.api.gax.core.CredentialsProvider;
import com.google.auth.Credentials;

/**
* A CredentialsProvider that simply provides the right credentials that are to be used for connecting to an emulator.
* NOTE: The Google provided NoCredentials and NoCredentialsProvider do not behave as expected.
* See https://github.com/googleapis/gax-java/issues/1148
*/
public final class EmulatorCredentialsProvider implements CredentialsProvider {
@Override
public Credentials getCredentials() {
return EmulatorCredentials.getInstance();
}

public static EmulatorCredentialsProvider create() {
return new EmulatorCredentialsProvider();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.gcp.pubsub;
package org.apache.flink.streaming.connectors.gcp.pubsub.emulator;

import org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;

Expand Down Expand Up @@ -57,7 +58,6 @@ public PubSubSubscriber getSubscriber(Credentials credentials) throws IOExceptio

PullRequest pullRequest = PullRequest.newBuilder()
.setMaxMessages(maxMessagesPerPull)
.setReturnImmediately(false)
.setSubscription(projectSubscriptionName)
.build();
SubscriberGrpc.SubscriberBlockingStub stub = SubscriberGrpc.newBlockingStub(managedChannel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.google.pubsub.v1.ReceivedMessage;
import org.junit.Test;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -134,7 +133,7 @@ public void testProducingMultipleResults() throws Exception {
PubSubSource<String> pubSubSource = PubSubSource.newBuilder()
.withDeserializationSchema(new SimpleStringSchema() {
@Override
public void deserialize(byte[] message, Collector<String> out) throws IOException {
public void deserialize(byte[] message, Collector<String> out) {
String[] records = super.deserialize(message).split(",");
for (String record : records) {
out.collect(record);
Expand Down Expand Up @@ -234,7 +233,7 @@ public List<ReceivedMessage> pull() {
}

@Override
public void close() throws Exception {
public void close() {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.ArrayList;
import java.util.List;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.refEq;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down
Loading

0 comments on commit fdcaa46

Please sign in to comment.