Skip to content

Commit

Permalink
[FLINK-4673] [core] TypeInfoFactory for Either type
Browse files Browse the repository at this point in the history
Removes from TypeExtractor the explicit parsing for Either and adds an
EitherTypeInfoFactory.

This closes apache#2545.
  • Loading branch information
greghogan authored and twalthr committed Jan 10, 2017
1 parent f11447e commit d4d7cc3
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@

package org.apache.flink.api.java.typeutils;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
import org.apache.flink.types.Either;

import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A {@link TypeInformation} for the {@link Either} type of the Java API.
*
Expand All @@ -43,8 +48,8 @@ public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> {

@PublicEvolving
public EitherTypeInfo(TypeInformation<L> leftType, TypeInformation<R> rightType) {
this.leftType = leftType;
this.rightType = rightType;
this.leftType = checkNotNull(leftType);
this.rightType = checkNotNull(rightType);
}

@Override
Expand Down Expand Up @@ -78,6 +83,15 @@ public Class<Either<L, R>> getTypeClass() {
return (Class<Either<L, R>>) (Class<?>) Either.class;
}

@Override
@PublicEvolving
public Map<String, TypeInformation<?>> getGenericParameters() {
Map<String, TypeInformation<?>> m = new HashMap<>();
m.put("L", this.leftType);
m.put("R", this.rightType);
return m;
}

@Override
@PublicEvolving
public boolean isKeyType() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.java.typeutils;

import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Either;

import java.lang.reflect.Type;
import java.util.Map;

public class EitherTypeInfoFactory<L, R> extends TypeInfoFactory<Either<L, R>> {

@Override
public TypeInformation<Either<L, R>> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
TypeInformation<?> leftType = genericParameters.get("L");
TypeInformation<?> rightType = genericParameters.get("R");

if (leftType == null) {
throw new InvalidTypesException("Type extraction is not possible on Either" +
" type as it does not contain information about the 'left' type.");
}

if (rightType == null) {
throw new InvalidTypesException("Type extraction is not possible on Either" +
" type as it does not contain information about the 'right' type.");
}

return new EitherTypeInfo(leftType, rightType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,11 @@

package org.apache.flink.api.java.typeutils;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.HashMap;
import java.util.List;

import java.util.Map;
import org.apache.avro.specific.SpecificRecordBase;

import org.apache.commons.lang3.ClassUtils;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
Expand All @@ -66,19 +49,34 @@
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.typeutils.TypeExtractionUtils.LambdaExecutable;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.isClassType;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.sameTypeVars;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.typeToClass;
import org.apache.flink.types.Either;
import org.apache.flink.types.Value;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.isClassType;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.sameTypeVars;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.typeToClass;

/**
* A utility for reflection analysis on classes, to determine the return type of implementations of transformation
Expand Down Expand Up @@ -690,38 +688,6 @@ else if (isClassType(t) && Tuple.class.isAssignableFrom(typeToClass(t))) {
return new TupleTypeInfo(typeToClass(t), subTypesInfo);

}
// check if type is a subclass of Either
else if (isClassType(t) && Either.class.isAssignableFrom(typeToClass(t))) {
Type curT = t;

// go up the hierarchy until we reach Either (with or without generics)
// collect the types while moving up for a later top-down
while (!(isClassType(curT) && typeToClass(curT).equals(Either.class))) {
typeHierarchy.add(curT);
curT = typeToClass(curT).getGenericSuperclass();
}

// check if Either has generics
if (curT instanceof Class<?>) {
throw new InvalidTypesException("Either needs to be parameterized by using generics.");
}

typeHierarchy.add(curT);

// create the type information for the subtypes
final TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type, false);
// type needs to be treated a pojo due to additional fields
if (subTypesInfo == null) {
if (t instanceof ParameterizedType) {
return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type);
}
else {
return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), null, in1Type, in2Type);
}
}
// return either info
return (TypeInformation<OUT>) new EitherTypeInfo(subTypesInfo[0], subTypesInfo[1]);
}
// type depends on another type
// e.g. class MyMapper<E> extends MapFunction<String, E>
else if (t instanceof TypeVariable) {
Expand Down Expand Up @@ -947,7 +913,7 @@ else if (fieldType instanceof ParameterizedType || fieldType instanceof GenericA

/**
* Creates the TypeInformation for all elements of a type that expects a certain number of
* subtypes (e.g. TupleXX or Either).
* subtypes (e.g. TupleXX).
*
* @param originalType most concrete subclass
* @param definingType type that defines the number of subtypes (e.g. Tuple2 -> 2 subtypes)
Expand Down Expand Up @@ -1234,29 +1200,6 @@ else if (typeInfo instanceof TupleTypeInfo) {
validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[i], tti.getTypeAt(i));
}
}
// check for Either
else if (typeInfo instanceof EitherTypeInfo) {
// check if Either at all
if (!(isClassType(type) && Either.class.isAssignableFrom(typeToClass(type)))) {
throw new InvalidTypesException("Either type expected.");
}

// go up the hierarchy until we reach Either (with or without generics)
while (!(isClassType(type) && typeToClass(type).equals(Either.class))) {
typeHierarchy.add(type);
type = typeToClass(type).getGenericSuperclass();
}

// check if Either has generics
if (type instanceof Class<?>) {
throw new InvalidTypesException("Parameterized Either type expected.");
}

EitherTypeInfo<?, ?> eti = (EitherTypeInfo<?, ?>) typeInfo;
Type[] subTypes = ((ParameterizedType) type).getActualTypeArguments();
validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[0], eti.getLeftType());
validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[1], eti.getRightType());
}
// check for primitive array
else if (typeInfo instanceof PrimitiveArrayTypeInfo) {
Type component;
Expand Down Expand Up @@ -1675,11 +1618,6 @@ private <OUT,IN1,IN2> TypeInformation<OUT> privateGetForClass(Class<OUT> clazz,
throw new InvalidTypesException("Type information extraction for tuples (except Tuple0) cannot be done based on the class.");
}

// check for subclasses of Either
if (Either.class.isAssignableFrom(clazz)) {
throw new InvalidTypesException("Type information extraction for Either cannot be done based on the class.");
}

// check for Enums
if(Enum.class.isAssignableFrom(clazz)) {
return new EnumTypeInfo(clazz);
Expand Down Expand Up @@ -1956,18 +1894,6 @@ private <X> TypeInformation<X> privateGetForObject(X value) {
}
return new TupleTypeInfo(value.getClass(), infos);
}
// we can not extract the types from an Either object since it only contains type information
// of one type, but from Either classes
else if (value instanceof Either) {
try {
return (TypeInformation<X>) privateCreateTypeInfo(value.getClass());
}
catch (InvalidTypesException e) {
throw new InvalidTypesException("Automatic type extraction is not possible on an Either type "
+ "as it does not contain information about both possible types. "
+ "Please specify the types directly.");
}
}
else {
return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>());
}
Expand Down
3 changes: 3 additions & 0 deletions flink-core/src/main/java/org/apache/flink/types/Either.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.types;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.java.typeutils.EitherTypeInfoFactory;

/**
* This type represents a value of one two possible types, Left or Right (a
Expand All @@ -30,6 +32,7 @@
* the type of Right
*/
@Public
@TypeInfo(EitherTypeInfoFactory.class)
public abstract class Either<L, R> {

/**
Expand Down

0 comments on commit d4d7cc3

Please sign in to comment.