Skip to content

Commit

Permalink
[FLINK-25128][table] Reorganize table modules and introduce flink-tab…
Browse files Browse the repository at this point in the history
…le-planner-loader

- [table-planner] PlannerBase is now using the class' classloader in order to load the ParserFactory. This makes sure that it will try to load first from its own classpath, and then from the parent classpath.
- [table] Removed flink-table-uber and replaced with flink-table-api-java-uber to ship only api related packages in a single uber jar
- [table-runtime] Now table-runtime ships janino and code splitter.
- [table-planner] Introduce table-planner loader bundle, a variant of the table-planner uber jar that includes scala, in order to be used by flink-table-planner-loader
- [table-planner-loader] Introduce flink-table-planner-loader
- [java-ci-tools] Add flink-table-planner-loader in the excluded modules
- [dist] Rework distribution to include table-api-java-uber, table-runtime, table-planner-loader and cep in lib, while table-planner_${scala.version} in opt.
- [sql-client] Rework SQL Client dependencies and remove scala suffix.
- [examples][e2e] Use planner-loader wherever is possible in tests and examples
- [table] Add README documenting the various modules
- [table-planner-loader] Allow access to javassist from parent classpath
- [table-planner] Make sure the object mapper is using the correct classloader
  • Loading branch information
slinkydeveloper authored and twalthr committed Dec 30, 2021
1 parent 3e93060 commit 749bb77
Show file tree
Hide file tree
Showing 37 changed files with 1,026 additions and 348 deletions.
6 changes: 3 additions & 3 deletions flink-architecture-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-client_${scala.binary.version}</artifactId>
<artifactId>flink-table-common</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
Expand Down Expand Up @@ -102,7 +102,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<artifactId>flink-table-runtime</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
Expand All @@ -116,7 +116,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<artifactId>flink-sql-client</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ org.apache.flink.api.java.operators.IterativeDataSet.getAggregators(): Returned
org.apache.flink.api.java.operators.ReduceOperator.setCombineHint(org.apache.flink.api.common.operators.base.ReduceOperatorBase$CombineHint): Argument leaf type org.apache.flink.api.common.operators.base.ReduceOperatorBase$CombineHint does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.api.java.typeutils.PojoTypeInfo.getPojoFieldAt(int): Returned leaf type org.apache.flink.api.java.typeutils.PojoField does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(org.apache.flink.api.common.ExecutionConfig): Returned leaf type org.apache.flink.api.java.typeutils.runtime.TupleSerializer does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.cep.functions.PatternProcessFunction.processMatch(java.util.Map, org.apache.flink.cep.functions.PatternProcessFunction$Context, org.apache.flink.util.Collector): Argument leaf type org.apache.flink.cep.functions.PatternProcessFunction$Context does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.cep.functions.TimedOutPartialMatchHandler.processTimedOutMatch(java.util.Map, org.apache.flink.cep.functions.PatternProcessFunction$Context): Argument leaf type org.apache.flink.cep.functions.PatternProcessFunction$Context does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.cep.pattern.conditions.IterativeCondition.filter(java.lang.Object, org.apache.flink.cep.pattern.conditions.IterativeCondition$Context): Argument leaf type org.apache.flink.cep.pattern.conditions.IterativeCondition$Context does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.client.program.StreamContextEnvironment.execute(org.apache.flink.streaming.api.graph.StreamGraph): Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(org.apache.flink.streaming.api.graph.StreamGraph): Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.client.program.StreamContextEnvironment.setAsContext(org.apache.flink.core.execution.PipelineExecutorServiceLoader, org.apache.flink.configuration.Configuration, java.lang.ClassLoader, boolean, boolean): Argument leaf type org.apache.flink.core.execution.PipelineExecutorServiceLoader does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,4 +317,4 @@ org.apache.flink.streaming.api.windowing.triggers.TriggerResult does not satisfy
org.apache.flink.streaming.api.windowing.windows.GlobalWindow$Serializer does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.streaming.api.windowing.windows.GlobalWindow$Serializer$GlobalWindowSerializerSnapshot does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.streaming.api.windowing.windows.TimeWindow$Serializer does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.streaming.api.windowing.windows.TimeWindow$Serializer$TimeWindowSerializerSnapshot does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.streaming.api.windowing.windows.TimeWindow$Serializer$TimeWindowSerializerSnapshot does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,22 @@ Constructor <org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.<in
Constructor <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.<init>(org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.runtime.execution.Environment, java.util.Map)> calls method <org.apache.flink.streaming.api.operators.AbstractStreamOperator.getProcessingTimeService()> in (StreamingRuntimeContext.java:85)
Constructor <org.apache.flink.streaming.runtime.io.StreamTaskExternallyInducedSourceInput.<init>(org.apache.flink.streaming.api.operators.SourceOperator, java.util.function.Consumer, int, int)> calls method <org.apache.flink.streaming.api.operators.SourceOperator.getSourceReader()> in (StreamTaskExternallyInducedSourceInput.java:39)
Method <org.apache.flink.api.java.typeutils.runtime.TupleSerializerSnapshot.getNestedSerializers(org.apache.flink.api.java.typeutils.runtime.TupleSerializer)> calls method <org.apache.flink.api.java.typeutils.runtime.TupleSerializer.getFieldSerializers()> in (TupleSerializerSnapshot.java:70)
Method <org.apache.flink.cep.nfa.sharedbuffer.LockableTypeSerializerSnapshot.getNestedSerializers(org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer)> calls method <org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.getElementSerializer()> in (LockableTypeSerializerSnapshot.java:60)
Method <org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.getNumAliveFetchers()> calls method <org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.getNumAliveFetchers()> in (KafkaSourceReader.java:193)
Method <org.apache.flink.orc.nohive.shim.OrcNoHiveShim.createRecordReader(org.apache.hadoop.conf.Configuration, org.apache.orc.TypeDescription, [I, java.util.List, org.apache.flink.core.fs.Path, long, long)> calls method <org.apache.flink.orc.shim.OrcShimV200.getOffsetAndLengthForSplit(long, long, java.util.List)> in (OrcNoHiveShim.java:62)
Method <org.apache.flink.runtime.blob.BlobInputStream.read()> calls method <org.apache.flink.runtime.blob.BlobKey.getHash()> in (BlobInputStream.java:127)
Method <org.apache.flink.runtime.blob.BlobInputStream.read([B, int, int)> calls method <org.apache.flink.runtime.blob.BlobKey.getHash()> in (BlobInputStream.java:163)
Method <org.apache.flink.runtime.blob.BlobOutputStream.receiveAndCheckPutResponse(java.io.InputStream, java.security.MessageDigest, org.apache.flink.runtime.blob.BlobKey$BlobType)> calls method <org.apache.flink.runtime.blob.BlobKey.getHash()> in (BlobOutputStream.java:155)
Method <org.apache.flink.runtime.blob.BlobServerConnection.get(java.io.InputStream, java.io.OutputStream, [B)> calls method <org.apache.flink.runtime.blob.BlobServer.getStorageLocation(org.apache.flink.api.common.JobID, org.apache.flink.runtime.blob.BlobKey)> in (BlobServerConnection.java:200)
Method <org.apache.flink.runtime.blob.FileSystemBlobStore.get(java.lang.String, java.io.File, org.apache.flink.runtime.blob.BlobKey)> calls method <org.apache.flink.runtime.blob.BlobKey.getHash()> in (FileSystemBlobStore.java:122)
Method <org.apache.flink.runtime.checkpoint.CheckpointCoordinator.getTriggerRequestQueue()> calls method <org.apache.flink.runtime.checkpoint.CheckpointRequestDecider.getTriggerRequestQueue()> in (CheckpointCoordinator.java:1735)
Method <org.apache.flink.runtime.executiongraph.Execution.finishPartitionsAndUpdateConsumers()> calls method <org.apache.flink.runtime.executiongraph.ExecutionVertex.finishAllBlockingPartitions()> in (Execution.java:974)
Method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl.grantDispatcherLeadership()> calls method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.grantLeadership()> in (EmbeddedHaServicesWithLeadershipControl.java:67)
Method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl.grantJobMasterLeadership(org.apache.flink.api.common.JobID)> calls method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.grantLeadership()> in (EmbeddedHaServicesWithLeadershipControl.java:79)
Method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl.grantResourceManagerLeadership()> calls method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.grantLeadership()> in (EmbeddedHaServicesWithLeadershipControl.java:93)
Method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl.revokeDispatcherLeadership()> calls method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.revokeLeadership()> in (EmbeddedHaServicesWithLeadershipControl.java:61)
Method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl.revokeJobMasterLeadership(org.apache.flink.api.common.JobID)> calls method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.revokeLeadership()> in (EmbeddedHaServicesWithLeadershipControl.java:73)
Method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl.revokeResourceManagerLeadership()> calls method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.revokeLeadership()> in (EmbeddedHaServicesWithLeadershipControl.java:86)
Method <org.apache.flink.runtime.checkpoint.CheckpointCoordinator.getTriggerRequestQueue()> calls method <org.apache.flink.runtime.checkpoint.CheckpointRequestDecider.getTriggerRequestQueue()> in (CheckpointCoordinator.java:1737)
Method <org.apache.flink.runtime.executiongraph.Execution.finishPartitionsAndUpdateConsumers()> calls method <org.apache.flink.runtime.executiongraph.ExecutionVertex.finishAllBlockingPartitions()> in (Execution.java:983)
Method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl.grantDispatcherLeadership()> calls method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.grantLeadership()> in (EmbeddedHaServicesWithLeadershipControl.java:78)
Method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl.grantJobMasterLeadership(org.apache.flink.api.common.JobID)> calls method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.grantLeadership()> in (EmbeddedHaServicesWithLeadershipControl.java:90)
Method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl.grantResourceManagerLeadership()> calls method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.grantLeadership()> in (EmbeddedHaServicesWithLeadershipControl.java:104)
Method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl.revokeDispatcherLeadership()> calls method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.revokeLeadership()> in (EmbeddedHaServicesWithLeadershipControl.java:72)
Method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl.revokeJobMasterLeadership(org.apache.flink.api.common.JobID)> calls method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.revokeLeadership()> in (EmbeddedHaServicesWithLeadershipControl.java:84)
Method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl.revokeResourceManagerLeadership()> calls method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.revokeLeadership()> in (EmbeddedHaServicesWithLeadershipControl.java:97)
Method <org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerImpl.lambda$listDataSets$10(java.util.Map$Entry)> calls method <org.apache.flink.runtime.io.network.partition.DataSetMetaInfo.withNumRegisteredPartitions(int, int)> in (ResourceManagerPartitionTrackerImpl.java:269)
Method <org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNumberOfAvailableBuffers()> calls method <org.apache.flink.runtime.io.network.partition.consumer.BufferManager.getNumberOfAvailableBuffers()> in (RemoteInputChannel.java:336)
Method <org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNumberOfRequiredBuffers()> calls method <org.apache.flink.runtime.io.network.partition.consumer.BufferManager.unsynchronizedGetNumberOfRequiredBuffers()> in (RemoteInputChannel.java:341)
Expand All @@ -31,11 +32,11 @@ Method <org.apache.flink.runtime.taskexecutor.TaskManagerServices.createTaskSlot
Method <org.apache.flink.streaming.api.datastream.WindowedStream.getAllowedLateness()> calls method <org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorBuilder.getAllowedLateness()> in (WindowedStream.java:907)
Method <org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.numEventTimeTimers()> calls method <org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.numEventTimeTimers()> in (InternalTimeServiceManagerImpl.java:249)
Method <org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.numProcessingTimeTimers()> calls method <org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.numProcessingTimeTimers()> in (InternalTimeServiceManagerImpl.java:240)
Method <org.apache.flink.streaming.api.operators.SourceOperator$1$1.asClassLoader()> calls method <org.apache.flink.streaming.api.operators.SourceOperator.getRuntimeContext()> in (SourceOperator.java:250)
Method <org.apache.flink.streaming.api.operators.SourceOperator$1$1.registerReleaseHookIfAbsent(java.lang.String, java.lang.Runnable)> calls method <org.apache.flink.streaming.api.operators.SourceOperator.getRuntimeContext()> in (SourceOperator.java:256)
Method <org.apache.flink.streaming.api.operators.SourceOperator$1$1.asClassLoader()> calls method <org.apache.flink.streaming.api.operators.SourceOperator.getRuntimeContext()> in (SourceOperator.java:252)
Method <org.apache.flink.streaming.api.operators.SourceOperator$1$1.registerReleaseHookIfAbsent(java.lang.String, java.lang.Runnable)> calls method <org.apache.flink.streaming.api.operators.SourceOperator.getRuntimeContext()> in (SourceOperator.java:258)
Method <org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.getTransactionCoordinatorId()> calls method <org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.getTransactionCoordinatorId()> in (FlinkKafkaProducer.java:1327)
Method <org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init()> calls method <org.apache.flink.streaming.api.operators.SourceOperator.getSourceReader()> in (SourceOperatorStreamTask.java:72)
Method <org.apache.flink.streaming.runtime.tasks.StreamTask.isMailboxLoopRunning()> calls method <org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.isMailboxLoopRunning()> in (StreamTask.java:801)
Method <org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxStep()> calls method <org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep()> in (StreamTask.java:796)
Method <org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init()> calls method <org.apache.flink.streaming.api.operators.SourceOperator.getSourceReader()> in (SourceOperatorStreamTask.java:73)
Method <org.apache.flink.streaming.runtime.tasks.StreamTask.isMailboxLoopRunning()> calls method <org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.isMailboxLoopRunning()> in (StreamTask.java:793)
Method <org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxStep()> calls method <org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep()> in (StreamTask.java:788)
Method <org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.isIdle()> calls method <org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.isDefaultActionAvailable()> in (MailboxExecutorImpl.java:63)
Method <org.apache.flink.table.planner.delegation.hive.parse.HiveParserDDLSemanticAnalyzer.getTable(org.apache.flink.table.catalog.ObjectPath)> calls method <org.apache.flink.table.catalog.hive.HiveCatalog.getHiveTable(org.apache.flink.table.catalog.ObjectPath)> in (HiveParserDDLSemanticAnalyzer.java:271)
53 changes: 39 additions & 14 deletions flink-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,45 @@ under the License.
<scope>compile</scope>
</dependency>

<!-- Table dependencies -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-uber</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<!-- SQL Client and planner with scala version goes in the /opt folder -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-client</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<!--
The following dependencies are packaged in 'examples/'
The scope of these dependencies needs to be 'provided' so that
Expand Down Expand Up @@ -320,20 +359,6 @@ under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-uber_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-client_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-state-processor-api</artifactId>
Expand Down
Loading

0 comments on commit 749bb77

Please sign in to comment.