Skip to content

Commit

Permalink
[FLINK-11431][runtime] Upgrade akka to 2.5
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Apr 18, 2019
1 parent edaad52 commit 97e6b7c
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 32 deletions.
18 changes: 9 additions & 9 deletions NOTICE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ This project bundles the following dependencies under the Apache Software Licens
- com.twitter:chill-java:0.7.6
- com.twitter:chill_2.11:0.7.6
- com.typesafe:config:1.3.0
- com.typesafe:ssl-config-core_2.11:0.2.1
- com.typesafe.akka:akka-actor_2.11:2.4.20
- com.typesafe.akka:akka-camel_2.11:2.4.20
- com.typesafe.akka:akka-protobuf_2.11:2.4.20
- com.typesafe.akka:akka-slf4j_2.11:2.4.20
- com.typesafe.akka:akka-stream_2.11:2.4.20
- com.typesafe:ssl-config-core_2.11:0.3.7
- com.typesafe.akka:akka-actor_2.11:2.5.1
- com.typesafe.akka:akka-camel_2.11:2.5.21
- com.typesafe.akka:akka-protobuf_2.11:2.5.21
- com.typesafe.akka:akka-slf4j_2.11:2.5.21
- com.typesafe.akka:akka-stream_2.11:2.5.21
- commons-cli:commons-cli:1.3.1
- commons-collections:commons-collections:3.2.2
- commons-io:commons-io:2.4
Expand All @@ -64,7 +64,7 @@ The following dependencies all share the same BSD license which you find under l
- org.scala-lang:scala-library:2.11.12
- org.scala-lang:scala-reflect:2.11.12
- org.scala-lang.modules:scala-java8-compat_2.11:0.7.0
- org.scala-lang.modules:scala-parser-combinators_2.11:1.0.4
- org.scala-lang.modules:scala-parser-combinators_2.11:1.1.1
- org.scala-lang.modules:scala-xml_2.11:1.0.5

This project bundles the following dependencies under the MIT/X11 license.
Expand All @@ -80,7 +80,7 @@ but some files are heavily based on public domain code written by Igor Pavlov.

This project bundles the following dependencies under the Creative Commons CC0 "No Rights Reserved".

- org.reactivestreams:reactive-streams:1.0.0
- org.reactivestreams:reactive-streams:1.0.2

This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
Expand All @@ -106,7 +106,7 @@ Copyright 2014-2019 The Apache Software Foundation

This project bundles the following dependencies under the Apache Software License 2.0. (http:https://www.apache.org/licenses/LICENSE-2.0.txt)

- com.typesafe.akka:akka-remote_2.11:2.4.20
- com.typesafe.akka:akka-remote_2.11:2.5.21
- io.netty:netty:3.10.6.Final
- org.apache.zookeeper:zookeeper:3.4.10
- org.uncommons.maths:uncommons-maths:1.2.2a
Expand Down
16 changes: 8 additions & 8 deletions flink-dist/src/main/resources/META-INF/NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ This project bundles the following dependencies under the Apache Software Licens
- com.twitter:chill-java:0.7.6
- com.twitter:chill_2.11:0.7.6
- com.typesafe:config:1.3.0
- com.typesafe:ssl-config-core_2.11:0.2.1
- com.typesafe.akka:akka-actor_2.11:2.4.20
- com.typesafe.akka:akka-camel_2.11:2.4.20
- com.typesafe.akka:akka-protobuf_2.11:2.4.20
- com.typesafe.akka:akka-slf4j_2.11:2.4.20
- com.typesafe.akka:akka-stream_2.11:2.4.20
- com.typesafe:ssl-config-core_2.11:0.3.7
- com.typesafe.akka:akka-actor_2.11:2.5.1
- com.typesafe.akka:akka-camel_2.11:2.5.21
- com.typesafe.akka:akka-protobuf_2.11:2.5.21
- com.typesafe.akka:akka-slf4j_2.11:2.5.21
- com.typesafe.akka:akka-stream_2.11:2.5.21
- commons-cli:commons-cli:1.3.1
- commons-collections:commons-collections:3.2.2
- commons-io:commons-io:2.4
Expand All @@ -41,7 +41,7 @@ The following dependencies all share the same BSD license which you find under l
- org.scala-lang:scala-library:2.11.12
- org.scala-lang:scala-reflect:2.11.12
- org.scala-lang.modules:scala-java8-compat_2.11:0.7.0
- org.scala-lang.modules:scala-parser-combinators_2.11:1.0.4
- org.scala-lang.modules:scala-parser-combinators_2.11:1.1.1
- org.scala-lang.modules:scala-xml_2.11:1.0.5

This project bundles the following dependencies under the MIT/X11 license.
Expand All @@ -57,4 +57,4 @@ but some files are heavily based on public domain code written by Igor Pavlov.

This project bundles the following dependencies under the Creative Commons CC0 "No Rights Reserved".

- org.reactivestreams:reactive-streams:1.0.0
- org.reactivestreams:reactive-streams:1.0.2
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void close(boolean cleanup) throws Exception {
Throwable exception = null;

try {
actorSystem.shutdown();
actorSystem.terminate();
} catch (Throwable t) {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Status;
import akka.actor.UntypedActor;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -79,7 +80,7 @@
*
* @param <T> Type of the {@link RpcEndpoint}
*/
class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {

protected final Logger log = LoggerFactory.getLogger(getClass());

Expand Down Expand Up @@ -135,12 +136,16 @@ public void postStop() throws Exception {
}

@Override
public void onReceive(final Object message) {
if (message instanceof RemoteHandshakeMessage) {
handleHandshakeMessage((RemoteHandshakeMessage) message);
} else if (message instanceof ControlMessages) {
handleControlMessage(((ControlMessages) message));
} else if (state.isRunning()) {
public Receive createReceive() {
return ReceiveBuilder.create()
.match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)
.match(ControlMessages.class, this::handleControlMessage)
.matchAny(this::handleMessage)
.build();
}

private void handleMessage(final Object message) {
if (state.isRunning()) {
mainThreadValidator.enterMainThread();

try {
Expand Down
2 changes: 1 addition & 1 deletion flink-runtime/src/main/resources/META-INF/NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ The Apache Software Foundation (http:https://www.apache.org/).

This project bundles the following dependencies under the Apache Software License 2.0. (http:https://www.apache.org/licenses/LICENSE-2.0.txt)

- com.typesafe.akka:akka-remote_2.11:2.4.20
- com.typesafe.akka:akka-remote_2.11:2.5.21
- io.netty:netty:3.10.6.Final
- org.apache.zookeeper:zookeeper:3.4.10
- org.uncommons.maths:uncommons-maths:1.2.2a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ public void testAkkaRpcServiceShutDownWithRpcEndpoints() throws Exception {
}

terminationFuture.get();
assertThat(akkaRpcService.getActorSystem().isTerminated(), is(true));
assertThat(akkaRpcService.getActorSystem().whenTerminated().isCompleted(), is(true));
} finally {
RpcUtils.terminateRpcService(akkaRpcService, TIMEOUT);
}
Expand Down Expand Up @@ -363,7 +363,7 @@ public void testAkkaRpcServiceShutDownWithFailingRpcEndpoints() throws Exception
assertThat(ExceptionUtils.findThrowable(e, OnStopException.class).isPresent(), is(true));
}

assertThat(akkaRpcService.getActorSystem().isTerminated(), is(true));
assertThat(akkaRpcService.getActorSystem().whenTerminated().isCompleted(), is(true));
}

private Collection<CompletableFuture<Void>> startStopNCountingAsynchronousOnStopEndpoints(AkkaRpcService akkaRpcService, int numberActors) throws Exception {
Expand All @@ -381,7 +381,7 @@ private Collection<CompletableFuture<Void>> startStopNCountingAsynchronousOnStop
CompletableFuture<Void> terminationFuture = akkaRpcService.stopService();

assertThat(terminationFuture.isDone(), is(false));
assertThat(akkaRpcService.getActorSystem().isTerminated(), is(false));
assertThat(akkaRpcService.getActorSystem().whenTerminated().isCompleted(), is(false));

countDownLatch.await();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private Inet6Address getLocalIPv6Address() {
ActorSystem as = AkkaUtils.createActorSystem(
new Configuration(),
new Some<scala.Tuple2<String, Object>>(new scala.Tuple2<String, Object>(addr.getHostAddress(), port)));
as.shutdown();
as.terminate();

log.info("Using address " + addr);
return (Inet6Address) addr;
Expand Down
8 changes: 7 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ under the License.
<log4j.configuration>log4j-test.properties</log4j.configuration>
<flink.shaded.version>6.0</flink.shaded.version>
<guava.version>18.0</guava.version>
<akka.version>2.4.20</akka.version>
<akka.version>2.5.21</akka.version>
<java.version>1.8</java.version>
<slf4j.version>1.7.15</slf4j.version>
<log4j.version>1.2.17</log4j.version>
Expand Down Expand Up @@ -533,6 +533,12 @@ under the License.
<version>${akka.version}</version>
</dependency>

<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parser-combinators_${scala.binary.version}</artifactId>
<version>1.1.1</version>
</dependency>

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_${scala.binary.version}</artifactId>
Expand Down

0 comments on commit 97e6b7c

Please sign in to comment.