From 035ed7de3e078204492030dd08bb07be52b70b58 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Sun, 17 Sep 2017 20:55:13 +0200 Subject: [PATCH] [FLINK-7629] [scala] Fix RecursiveProductFieldAccessor.fieldType --- .../util/typeutils/FieldAccessor.java | 2 +- .../util/typeutils/FieldAccessorTest.java | 18 ++++++++++++++++++ .../api/scala/CaseClassFieldAccessorTest.scala | 12 ++++++++++++ 3 files changed, 31 insertions(+), 1 deletion(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java index 411e4adaefcfe..5b18303a4b6c5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java @@ -352,11 +352,11 @@ static final class RecursiveProductFieldAccessor extends FieldAccessor< checkNotNull(innerAccessor, "innerAccessor must not be null."); this.pos = pos; - this.fieldType = ((TupleTypeInfoBase) typeInfo).getTypeAt(pos); this.serializer = (TupleSerializerBase) typeInfo.createSerializer(config); this.length = this.serializer.getArity(); this.fields = new Object[this.length]; this.innerAccessor = innerAccessor; + this.fieldType = innerAccessor.getFieldType(); } @SuppressWarnings("unchecked") diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java index 2fb7964c2cd53..548ccf5e5590f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java @@ -48,6 +48,7 @@ public void testFlatTuple() { (TupleTypeInfo>) TypeExtractor.getForObject(t); FieldAccessor, String> f0 = FieldAccessorFactory.getAccessor(tpeInfo, "f0", null); + assertEquals(String.class, f0.getFieldType().getTypeClass()); assertEquals("aa", f0.get(t)); assertEquals("aa", t.f0); t = f0.set(t, "b"); @@ -55,6 +56,7 @@ public void testFlatTuple() { assertEquals("b", t.f0); FieldAccessor, Integer> f1 = FieldAccessorFactory.getAccessor(tpeInfo, "f1", null); + assertEquals(Integer.class, f1.getFieldType().getTypeClass()); assertEquals(5, (int) f1.get(t)); assertEquals(5, (int) t.f1); t = f1.set(t, 7); @@ -64,6 +66,7 @@ public void testFlatTuple() { assertEquals("b", t.f0); FieldAccessor, Integer> f1n = FieldAccessorFactory.getAccessor(tpeInfo, 1, null); + assertEquals(Integer.class, f1n.getFieldType().getTypeClass()); assertEquals(7, (int) f1n.get(t)); assertEquals(7, (int) t.f1); t = f1n.set(t, 10); @@ -74,6 +77,7 @@ public void testFlatTuple() { assertEquals("b", t.f0); FieldAccessor, Integer> f1ns = FieldAccessorFactory.getAccessor(tpeInfo, "1", null); + assertEquals(Integer.class, f1ns.getFieldType().getTypeClass()); assertEquals(10, (int) f1ns.get(t)); assertEquals(10, (int) t.f1); t = f1ns.set(t, 11); @@ -85,6 +89,7 @@ public void testFlatTuple() { // This is technically valid (the ".0" is selecting the 0th field of a basic type). FieldAccessor, String> f0f0 = FieldAccessorFactory.getAccessor(tpeInfo, "f0.0", null); + assertEquals(String.class, f0f0.getFieldType().getTypeClass()); assertEquals("b", f0f0.get(t)); assertEquals("b", t.f0); t = f0f0.set(t, "cc"); @@ -110,11 +115,13 @@ public void testTupleInTuple() { FieldAccessor>, String> f0 = FieldAccessorFactory .getAccessor(tpeInfo, "f0", null); + assertEquals(String.class, f0.getFieldType().getTypeClass()); assertEquals("aa", f0.get(t)); assertEquals("aa", t.f0); FieldAccessor>, Double> f1f2 = FieldAccessorFactory .getAccessor(tpeInfo, "f1.f2", null); + assertEquals(Double.class, f1f2.getFieldType().getTypeClass()); assertEquals(2.0, f1f2.get(t), 0); assertEquals(2.0, t.f1.f2, 0); t = f1f2.set(t, 3.0); @@ -125,6 +132,7 @@ public void testTupleInTuple() { FieldAccessor>, Tuple3> f1 = FieldAccessorFactory.getAccessor(tpeInfo, "f1", null); + assertEquals(Tuple3.class, f1.getFieldType().getTypeClass()); assertEquals(Tuple3.of(5, 9L, 3.0), f1.get(t)); assertEquals(Tuple3.of(5, 9L, 3.0), t.f1); t = f1.set(t, Tuple3.of(8, 12L, 4.0)); @@ -135,6 +143,7 @@ public void testTupleInTuple() { FieldAccessor>, Tuple3> f1n = FieldAccessorFactory.getAccessor(tpeInfo, 1, null); + assertEquals(Tuple3.class, f1n.getFieldType().getTypeClass()); assertEquals(Tuple3.of(8, 12L, 4.0), f1n.get(t)); assertEquals(Tuple3.of(8, 12L, 4.0), t.f1); t = f1n.set(t, Tuple3.of(10, 13L, 5.0)); @@ -175,6 +184,7 @@ public void testTupleInPojoInTuple() { (TupleTypeInfo>) TypeExtractor.getForObject(t); FieldAccessor, Long> f1tf1 = FieldAccessorFactory.getAccessor(tpeInfo, "f1.t.f1", null); + assertEquals(Long.class, f1tf1.getFieldType().getTypeClass()); assertEquals(9L, (long) f1tf1.get(t)); assertEquals(9L, (long) t.f1.t.f1); t = f1tf1.set(t, 12L); @@ -182,6 +192,7 @@ public void testTupleInPojoInTuple() { assertEquals(12L, (long) t.f1.t.f1); FieldAccessor, String> f1tf0 = FieldAccessorFactory.getAccessor(tpeInfo, "f1.t.f0", null); + assertEquals(String.class, f1tf0.getFieldType().getTypeClass()); assertEquals("ddd", f1tf0.get(t)); assertEquals("ddd", t.f1.t.f0); t = f1tf0.set(t, "alma"); @@ -190,6 +201,8 @@ public void testTupleInPojoInTuple() { FieldAccessor, Foo> f1 = FieldAccessorFactory.getAccessor(tpeInfo, "f1", null); FieldAccessor, Foo> f1n = FieldAccessorFactory.getAccessor(tpeInfo, 1, null); + assertEquals(Foo.class, f1.getFieldType().getTypeClass()); + assertEquals(Foo.class, f1n.getFieldType().getTypeClass()); assertEquals(Tuple2.of("alma", 12L), f1.get(t).t); assertEquals(Tuple2.of("alma", 12L), f1n.get(t).t); assertEquals(Tuple2.of("alma", 12L), t.f1.t); @@ -261,6 +274,7 @@ public void testPojoInPojo() { PojoTypeInfo tpeInfo = (PojoTypeInfo) TypeInformation.of(Outer.class); FieldAccessor fix = FieldAccessorFactory.getAccessor(tpeInfo, "i.x", null); + assertEquals(Long.class, fix.getFieldType().getTypeClass()); assertEquals(4L, (long) fix.get(o)); assertEquals(4L, o.i.x); o = fix.set(o, 22L); @@ -268,6 +282,7 @@ public void testPojoInPojo() { assertEquals(22L, o.i.x); FieldAccessor fi = FieldAccessorFactory.getAccessor(tpeInfo, "i", null); + assertEquals(Inner.class, fi.getFieldType().getTypeClass()); assertEquals(22L, fi.get(o).x); assertEquals(22L, (long) fix.get(o)); assertEquals(22L, o.i.x); @@ -328,6 +343,7 @@ public void testArrayInPojo() { PojoTypeInfo tpeInfo = (PojoTypeInfo) TypeInformation.of(ArrayInPojo.class); FieldAccessor fix = FieldAccessorFactory.getAccessor(tpeInfo, "arr.1", null); + assertEquals(Integer.class, fix.getFieldType().getTypeClass()); assertEquals(4, (int) fix.get(o)); assertEquals(4L, o.arr[1]); o = fix.set(o, 8); @@ -341,12 +357,14 @@ public void testBasicType() { TypeInformation tpeInfo = BasicTypeInfo.LONG_TYPE_INFO; FieldAccessor f = FieldAccessorFactory.getAccessor(tpeInfo, 0, null); + assertEquals(Long.class, f.getFieldType().getTypeClass()); assertEquals(7L, (long) f.get(x)); x = f.set(x, 12L); assertEquals(12L, (long) f.get(x)); assertEquals(12L, (long) x); FieldAccessor f2 = FieldAccessorFactory.getAccessor(tpeInfo, "*", null); + assertEquals(Long.class, f2.getFieldType().getTypeClass()); assertEquals(12L, (long) f2.get(x)); x = f2.set(x, 14L); assertEquals(14L, (long) f2.get(x)); diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CaseClassFieldAccessorTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CaseClassFieldAccessorTest.scala index 9a7b4952cc7dd..028249b881d99 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CaseClassFieldAccessorTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CaseClassFieldAccessorTest.scala @@ -40,6 +40,9 @@ class CaseClassFieldAccessorTest extends TestLogger with JUnitSuiteLike { val accessor1 = FieldAccessorFactory.getAccessor[IntBoolean, Int](tpeInfo, "foo", null) val accessor2 = FieldAccessorFactory.getAccessor[IntBoolean, Boolean](tpeInfo, "bar", null) + assert(accessor1.getFieldType.getTypeClass.getSimpleName == "Integer") + assert(accessor2.getFieldType.getTypeClass.getSimpleName == "Boolean") + val x1 = IntBoolean(5, false) assert(accessor1.get(x1) == 5) assert(accessor2.get(x1) == false) @@ -61,6 +64,9 @@ class CaseClassFieldAccessorTest extends TestLogger with JUnitSuiteLike { val accessor1 = FieldAccessorFactory.getAccessor[IntBoolean, Int](tpeInfo, 0, null) val accessor2 = FieldAccessorFactory.getAccessor[IntBoolean, Boolean](tpeInfo, 1, null) + assert(accessor1.getFieldType.getTypeClass.getSimpleName == "Integer") + assert(accessor2.getFieldType.getTypeClass.getSimpleName == "Boolean") + val x1 = IntBoolean(5, false) assert(accessor1.get(x1) == 5) assert(accessor2.get(x1) == false) @@ -85,6 +91,7 @@ class CaseClassFieldAccessorTest extends TestLogger with JUnitSuiteLike { val cfg = new ExecutionConfig val fib = FieldAccessorFactory.getAccessor[Outer, Boolean](tpeInfo, "i.b", cfg) + assert(fib.getFieldType.getTypeClass.getSimpleName == "Boolean") assert(fib.get(x) == true) assert(x.i.b == true) x = fib.set(x, false) @@ -92,6 +99,7 @@ class CaseClassFieldAccessorTest extends TestLogger with JUnitSuiteLike { assert(x.i.b == false) val fi = FieldAccessorFactory.getAccessor[Outer, FieldAccessorTest.Inner](tpeInfo, "i", cfg) + assert(fi.getFieldType.getTypeClass.getSimpleName == "Inner") assert(fi.get(x).x == 3L) assert(x.i.x == 3L) x = fi.set(x, new FieldAccessorTest.Inner(4L, true)) @@ -99,6 +107,7 @@ class CaseClassFieldAccessorTest extends TestLogger with JUnitSuiteLike { assert(x.i.x == 4L) val fin = FieldAccessorFactory.getAccessor[Outer, FieldAccessorTest.Inner](tpeInfo, 1, cfg) + assert(fin.getFieldType.getTypeClass.getSimpleName == "Inner") assert(fin.get(x).x == 4L) assert(x.i.x == 4L) x = fin.set(x, new FieldAccessorTest.Inner(5L, true)) @@ -111,6 +120,7 @@ class CaseClassFieldAccessorTest extends TestLogger with JUnitSuiteLike { val tpeInfo = createTypeInformation[(Int, Long)] var x = (5, 6L) val f0 = FieldAccessorFactory.getAccessor[(Int, Long), Int](tpeInfo, 0, null) + assert(f0.getFieldType.getTypeClass.getSimpleName == "Integer") assert(f0.get(x) == 5) x = f0.set(x, 8) assert(f0.get(x) == 8) @@ -125,6 +135,7 @@ class CaseClassFieldAccessorTest extends TestLogger with JUnitSuiteLike { val fib = FieldAccessorFactory .getAccessor[OuterCaseClassWithInner, String](tpeInfo, "i.b", null) + assert(fib.getFieldType.getTypeClass.getSimpleName == "String") assert(fib.get(x) == "alma") assert(x.i.b == "alma") x = fib.set(x, "korte") @@ -133,6 +144,7 @@ class CaseClassFieldAccessorTest extends TestLogger with JUnitSuiteLike { val fi = FieldAccessorFactory .getAccessor[OuterCaseClassWithInner, InnerCaseClass](tpeInfo, "i", null) + assert(fi.getFieldType.getTypeClass == classOf[InnerCaseClass]) assert(fi.get(x) == InnerCaseClass(2, "korte")) x = fi.set(x, InnerCaseClass(3, "aaa")) assert(x.i == InnerCaseClass(3, "aaa"))