Skip to content

Commit

Permalink
[FLINK-12929] Pass TypeInformation in addSource
Browse files Browse the repository at this point in the history
Co-authored-by: Georg Rollinger <[email protected]>
  • Loading branch information
2 people authored and aljoscha committed Jul 10, 2019
1 parent 8acc1d3 commit 7ea00ad
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1453,17 +1453,16 @@ public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, TypeI
@SuppressWarnings("unchecked")
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {

if (function instanceof ResultTypeQueryable) {
typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
}
if (typeInfo == null) {
if (function instanceof ResultTypeQueryable) {
typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
} else {
try {
typeInfo = TypeExtractor.createTypeInfo(
SourceFunction.class,
function.getClass(), 0, null, null);
} catch (final InvalidTypesException e) {
typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
}
try {
typeInfo = TypeExtractor.createTypeInfo(
SourceFunction.class,
function.getClass(), 0, null, null);
} catch (final InvalidTypesException e) {
typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {

val cleanFun = scalaClean(function)
val typeInfo = implicitly[TypeInformation[T]]
asScalaStream(javaEnv.addSource(cleanFun).returns(typeInfo))
asScalaStream(javaEnv.addSource(cleanFun, typeInfo))
}

/**
Expand Down

0 comments on commit 7ea00ad

Please sign in to comment.