Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-6488] Portable Flink runner support for running cross-language … #7709

Merged
merged 3 commits into from
Feb 9, 2019

Conversation

ihji
Copy link
Contributor

@ihji ihji commented Feb 2, 2019

Multi-language support in DefaultJobBundleFactory.

Current status:

  • DefaultJobBundleFactory reuses a set of grpc servers if a new environment shares the same URN.
  • DefaultJobBundleFactory initializes a set of grpc servers as instance variables and creates WrappedSdkHarnessClient from them.

Proposed changes:

  • Remove the usage of instance variables and refactor them to an immutable data structure AutoValue_ServerInfo to enable thread-safe initialization of WrappedSdkHarnessClient.
  • Allow multiple sets of grpc servers for each payload of the same URN so that we can use multiple docker images of different URL per language.

Follow this checklist to help us incorporate your contribution quickly and easily:

  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

It will help us expedite review of your Pull Request if you tag someone (e.g. @username) to look at it.

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Samza Spark
Go Build Status --- --- --- --- --- ---
Java Build Status Build Status Build Status Build Status
Build Status
Build Status
Build Status Build Status Build Status
Python Build Status --- Build Status
Build Status
Build Status --- --- ---

@ihji
Copy link
Contributor Author

ihji commented Feb 4, 2019

run java precommit

@ihji
Copy link
Contributor Author

ihji commented Feb 4, 2019

run java postcommit

@ihji
Copy link
Contributor Author

ihji commented Feb 6, 2019

R: @angoenka, @mxm

This modification allows me to run a simple testing multi-language pipeline. Please let me know if you have any concern on this change.

…transforms

Multi-language support in DefaultJobBundleFactory
Copy link
Contributor

@mxm mxm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! This is great because it allows us to run multiple Environments of the same type but with different configuration, e.g. Python/Java/Go environments in cross-language pipelines.

A few comments inline.

CC @tweise

@@ -164,7 +164,7 @@ public void createsMultipleEnvironmentOfSingleType() throws Exception {
verify(envFactoryA, Mockito.times(0)).createEnvironment(environmentAA);

bundleFactory.forStage(getExecutableStage(environmentAA));
verify(environmentProviderFactoryA, Mockito.times(1))
verify(environmentProviderFactoryA, Mockito.times(2))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not seem correct. The factory should only be provided once.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your feedback! This PR allows to create multiple ServerInfos and their matching EnvironmentFactorys when two environments have same URNs but different payloads (for example, environment(urn: "docker", payload: "beam/python") and environment(urn: "docker", payload: "beam/java")). Is there any reason I didn't know that environments with same URNs should use a single EnvironmentFactory?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense for two Environments with the same URN also to share the same EnvironmentFactory. I don't think it is necessary to store the environment factory with the ServerInfo because there will only every be one factory anyway. We don't allow multiple EnvironmentFactories.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw you changed the code to allow different environment types, e.g. Docker and Process-based environment at the same time. I don't think this is necessary because it is sufficient to have a single EnvironmentFactory (i.e. Docker) to run cross-language pipelines.

However, I don't see why we would restrict to only one environment type. So fine with me!

@ihji
Copy link
Contributor Author

ihji commented Feb 6, 2019

run java precommit

@ihji
Copy link
Contributor Author

ihji commented Feb 6, 2019

run java postcommit

Copy link
Contributor

@mxm mxm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your code and addressing the comments. Looks great!

In addition to being able to construct multiple Environment with the same URN (e.g. Docker Python and Docker Java), you pushed an additional change to allow mixing of environment types, e.g. (Docker Java, Process Python). I think that makes sense for some situations and there is no reason restricting environment types.

If there are no further comments, I'd merge this later today.

@tweise
Copy link
Contributor

tweise commented Feb 8, 2019

Thanks, but I believe this still isn't on par with previous behavior and is causing noise in the logs.

https://builds.apache.org/job/beam_PreCommit_Portable_Python_Commit/1433/
https://builds.apache.org/job/beam_PreCommit_Python_PVR_Flink_Commit/430/

@ihji
Copy link
Contributor Author

ihji commented Feb 9, 2019

@tweise What do you exactly mean by noise? If you're saying "call already cancelled" exceptions, I guess there are other reasons causing this issue. I found few logs from previous PRs that showing the same error message.

I'll look into it but maybe it's better to create a new ticket.

@mxm: Any comment? I know you're the original author of close() method 😄

@tweise
Copy link
Contributor

tweise commented Feb 9, 2019

Run Portable_Python PreCommit

1 similar comment
@tweise
Copy link
Contributor

tweise commented Feb 9, 2019

Run Portable_Python PreCommit

@tweise tweise merged commit c50e3fc into apache:master Feb 9, 2019
@tweise
Copy link
Contributor

tweise commented Feb 9, 2019

@ihji after repeating Run Portable_Python PreCommit I see that the exception does not appear always and cannot say for sure if the changes in this PR change the status quo or not. You can for reference also check the cron executions of the job, where in the few cases I checked I did not see it: https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/

It would be great if you could create a new ticket and take a look at eliminating these exceptions (listed below). Thanks!

During job termination:

14:53:22 [grpc-default-executor-1] WARN org.apache.beam.runners.fnexecution.logging.GrpcLoggingService - Logging client failed unexpectedly.
14:53:22 org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
14:53:22 	at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status.asRuntimeException(Status.java:517)
14:53:22 	at 

During job submission

14:52:56 [main] INFO org.apache.beam.runners.flink.FlinkJobServerDriver - JobService started on localhost:58179
14:52:57 [grpc-default-executor-0] ERROR org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService - Encountered Unexpected Exception for Invocation job_bfb7df0e-408e-4bfd-bb3c-432e946ca819
14:52:57 org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusException: NOT_FOUND
14:52:57 	at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status.asException(Status.java:534)
14:52:57 	at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getInvocation(InMemoryJobService.java:341)
14:52:57 	at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getStateStream(InMemoryJobService.java:262)
14:52:57 	at org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:693)
14:52:57 	at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171)
14:52:57 	at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
14:52:57 	at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
14:52:57 	at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
14:52:57 	at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
14:52:57 	at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
14:52:57 	at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707)
14:52:57 	at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
14:52:57 	at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
14:52:57 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
14:52:57 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

The following seems to appear consistently in the PVR tests:
https://builds.apache.org/job/beam_PreCommit_Python_PVR_Flink_Commit/

14:54:50 Feb 07, 2019 10:54:50 PM org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
14:54:50 SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=628, target=localhost:41409} was not shutdown properly!!! ~*~*~*
14:54:50     Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
14:54:50 java.lang.RuntimeException: ManagedChannel allocation site
14:54:50 	at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:103)
14:54:50 	at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
14:54:50 	at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
14:54:50 	at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:410)
14:54:50 	at org.apache.beam.sdk.fn.channel.ManagedChannelFactory.forDescriptor(ManagedChannelFactory.java:44)
14:54:50 	at org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory.createEnvironment(ExternalEnvironmentFactory.java:108)
14:54:50 	at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:154)
14:54:50 	at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:137)
14:54:50 	at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
14:54:50 	at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
14:54:50 	at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
14:54:50 	at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
14:54:50 	at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
14:54:50 	at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
14:54:50 	at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
14:54:50 	at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4992)
14:54:50 	at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:162)

@mxm
Copy link
Contributor

mxm commented Feb 11, 2019

Could we simply restore the original order of the close calls? (edit: saw you already did that)

IMHO the last two errors above are unrelated. The first one originates originates from the client opening the connection to the JobServer too early. The second one is a problem with the embedded SDK harness which does not clean up GRPC channels correctly.

@mxm
Copy link
Contributor

mxm commented Feb 11, 2019

I agree that we should track above issues and fix them.

@ihji ihji deleted the BEAM-6488 branch February 11, 2019 19:40
@ihji
Copy link
Contributor Author

ihji commented Feb 12, 2019

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants