From 7ea00ad9004f920e0c4c1b6ed906e14418951272 Mon Sep 17 00:00:00 2001 From: Fabio Lombardelli Date: Thu, 27 Jun 2019 10:45:49 +0200 Subject: [PATCH] [FLINK-12929] Pass TypeInformation in addSource Co-authored-by: Georg Rollinger --- .../StreamExecutionEnvironment.java | 19 +++++++++---------- .../scala/StreamExecutionEnvironment.scala | 2 +- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 47a803245783c..619de9c6e66d7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -1453,17 +1453,16 @@ public DataStreamSource addSource(SourceFunction function, TypeI @SuppressWarnings("unchecked") public DataStreamSource addSource(SourceFunction function, String sourceName, TypeInformation typeInfo) { + if (function instanceof ResultTypeQueryable) { + typeInfo = ((ResultTypeQueryable) function).getProducedType(); + } if (typeInfo == null) { - if (function instanceof ResultTypeQueryable) { - typeInfo = ((ResultTypeQueryable) function).getProducedType(); - } else { - try { - typeInfo = TypeExtractor.createTypeInfo( - SourceFunction.class, - function.getClass(), 0, null, null); - } catch (final InvalidTypesException e) { - typeInfo = (TypeInformation) new MissingTypeInfo(sourceName, e); - } + try { + typeInfo = TypeExtractor.createTypeInfo( + SourceFunction.class, + function.getClass(), 0, null, null); + } catch (final InvalidTypesException e) { + typeInfo = (TypeInformation) new MissingTypeInfo(sourceName, e); } } diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 71eee2b2c43bb..be6a62654c5ea 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -615,7 +615,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { val cleanFun = scalaClean(function) val typeInfo = implicitly[TypeInformation[T]] - asScalaStream(javaEnv.addSource(cleanFun).returns(typeInfo)) + asScalaStream(javaEnv.addSource(cleanFun, typeInfo)) } /**