diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala index a0f93dd5dcb35..bf07aa0437fec 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala @@ -18,22 +18,15 @@ package org.apache.flink.api.scala.operators.translation -import org.apache.flink.api.java.io.DiscardingOutputFormat -import org.apache.flink.optimizer.util.CompilerTestBase -import org.junit.Assert._ -import org.junit.Test -import org.apache.flink.api.common.functions.Partitioner -import org.apache.flink.api.scala._ -import org.apache.flink.runtime.operators.shipping.ShipStrategyType -import org.apache.flink.optimizer.plan.SingleInputPlanNode import org.apache.flink.api.common.operators.Order -import org.apache.flink.api.common.InvalidProgramException -import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint -import org.apache.flink.optimizer.plan.DualInputPlanNode import org.apache.flink.api.common.operators.base.CoGroupOperatorBase -import org.junit.Ignore +import org.apache.flink.api.java.io.DiscardingOutputFormat +import org.apache.flink.api.scala._ +import org.apache.flink.util.{Collector, TestLogger} +import org.junit.Assert._ +import org.junit.{Ignore, Test} -class CoGroupGroupSortTranslationTest { +class CoGroupGroupSortTranslationTest extends TestLogger { @Test def testGroupSortTuples() { @@ -131,11 +124,12 @@ class CoGroupGroupSortTranslationTest { val input2 = env.fromElements( (0L, 0L, 0L) ) input1 - .coGroup(input2) - .where(1).equalTo(2) - .sortFirstGroup(0, Order.DESCENDING) - .sortSecondGroup(1, Order.ASCENDING).sortSecondGroup(0, Order.DESCENDING) - .print() + .coGroup(input2) + .where(1).equalTo(2) + .sortFirstGroup(0, Order.DESCENDING) + .sortSecondGroup(1, Order.ASCENDING).sortSecondGroup(0, Order.DESCENDING) + .apply((a, b, c: Collector[(Long, Long)]) => a.foreach(e => c.collect(e))) + .output(new DiscardingOutputFormat[(Long, Long)]) val p = env.createProgramPlan()