Skip to content

Commit

Permalink
[FLINK-1123] Add first-n to Scala API
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Sep 29, 2014
1 parent 01e74da commit e2c0b9d
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
package org.apache.flink.api.scala

import org.apache.commons.lang3.Validate
import org.apache.flink.api.common.InvalidProgramException
import org.apache.flink.api.common.aggregators.Aggregator
import org.apache.flink.api.common.functions._
import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat}
import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.functions.{FirstReducer, KeySelector}
import org.apache.flink.api.java.io.{PrintingOutputFormat, TextOutputFormat}
import org.apache.flink.api.java.operators.JoinOperator.JoinHint
import org.apache.flink.api.java.operators.Keys.FieldPositionKeys
Expand Down Expand Up @@ -493,6 +494,17 @@ class DataSet[T: ClassTag](private[flink] val set: JavaDataSet[T]) {
wrap(new GroupReduceOperator[T, R](set, implicitly[TypeInformation[R]], reducer))
}

/**
 * Returns a new DataSet that holds only the first `n` elements of this DataSet.
 *
 * @param n the number of elements to keep; must be at least 1
 * @throws InvalidProgramException if `n` is smaller than 1
 */
def first(n: Int): DataSet[T] = {
  if (n >= 1) {
    // reduceGroup declares its TypeInformation and ClassTag as implicit parameters;
    // they are passed explicitly here, with the type information read from `set.getType`.
    val firstN = new FirstReducer[T](n)
    reduceGroup(firstN)(set.getType, implicitly[ClassTag[T]])
  } else {
    throw new InvalidProgramException("Parameter n of first(n) must be at least 1.")
  }
}

// --------------------------------------------------------------------------------------------
// distinct
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.api.scala

import org.apache.flink.api.common.InvalidProgramException
import org.apache.flink.api.java.functions.FirstReducer
import org.apache.flink.api.scala.operators.ScalaAggregateOperator

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -140,6 +141,11 @@ trait GroupedDataSet[T] {
* concatenation of the emitted values will form the resulting [[DataSet]].
*/
def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]

/**
 * Creates a new DataSet containing the first `n` elements of each group of this DataSet.
 *
 * @param n the number of elements to take from each group; the implementation in
 *          `GroupedDataSetImpl` rejects values smaller than 1 with an
 *          [[InvalidProgramException]]
 */
def first(n: Int): DataSet[T]
}

/**
Expand Down Expand Up @@ -274,4 +280,12 @@ private[flink] class GroupedDataSetImpl[T: ClassTag](
new GroupReduceOperator[T, R](maybeCreateSortedGrouping(),
implicitly[TypeInformation[R]], reducer))
}

/**
 * Returns a new DataSet that holds the first `n` elements of every group.
 *
 * @param n the number of elements to keep per group; must be at least 1
 * @throws InvalidProgramException if `n` is smaller than 1
 */
def first(n: Int): DataSet[T] = {
  if (n >= 1) {
    // reduceGroup declares its TypeInformation and ClassTag as implicit parameters;
    // they are passed explicitly here, with the type information read from `set.getType`.
    val firstN = new FirstReducer[T](n)
    reduceGroup(firstN)(set.getType, implicitly[ClassTag[T]])
  } else {
    throw new InvalidProgramException("Parameter n of first(n) must be at least 1.")
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.scala.operators

import org.apache.flink.api.common.InvalidProgramException
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import org.junit.{Assert, Test}

import org.apache.flink.api.scala._

/**
 * Checks the argument validation of `first(n)` on ungrouped, grouped and
 * grouped-and-sorted data sets. The programs are only constructed here; no
 * test in this class executes them.
 */
class FirstNOperatorTest {

  private val emptyTupleData = Array[(Int, Long, String, Long, Int)]()

  /**
   * Evaluates `op` and captures any Exception it throws.
   *
   * Only Exceptions are captured, so Errors still propagate to the test runner.
   *
   * @param op the program-construction call under test
   * @return the thrown Exception, or None if `op` completed normally
   */
  private def thrownBy(op: => Any): Option[Exception] =
    try {
      op
      None
    } catch {
      case e: Exception => Some(e)
    }

  /**
   * Fails the test if `op` throws any Exception.
   *
   * @param call readable form of the call, used in the failure message
   * @param op   the call that is expected to be accepted
   */
  private def assertAccepted(call: String)(op: => Any): Unit =
    thrownBy(op).foreach { e =>
      Assert.fail(s"$call should be accepted but threw $e")
    }

  /**
   * Fails the test unless `op` throws an InvalidProgramException.
   *
   * @param call readable form of the call, used in the failure message
   * @param op   the call that is expected to be rejected
   */
  private def assertRejected(call: String)(op: => Any): Unit =
    thrownBy(op) match {
      case Some(_: InvalidProgramException) => // expected rejection
      case Some(other) =>
        Assert.fail(s"$call should throw InvalidProgramException but threw $other")
      case None =>
        Assert.fail(s"$call should throw InvalidProgramException but was accepted")
    }

  /** first(n) directly on a DataSet: n >= 1 is accepted, n < 1 is rejected. */
  @Test
  def testUngroupedFirstN(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tupleDs = env.fromCollection(emptyTupleData)

    assertAccepted("first(1)") {
      tupleDs.first(1)
    }
    assertAccepted("first(10)") {
      tupleDs.first(10)
    }
    assertRejected("first(0)") {
      tupleDs.first(0)
    }
    assertRejected("first(-1)") {
      tupleDs.first(-1)
    }
  }

  /** first(n) on a grouped DataSet: n >= 1 is accepted, n < 1 is rejected. */
  @Test
  def testGroupedFirstN(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tupleDs = env.fromCollection(emptyTupleData)

    assertAccepted("groupBy(2).first(1)") {
      tupleDs.groupBy(2).first(1)
    }
    assertAccepted("groupBy(1, 3).first(10)") {
      tupleDs.groupBy(1, 3).first(10)
    }
    assertRejected("groupBy(0).first(0)") {
      tupleDs.groupBy(0).first(0)
    }
    assertRejected("groupBy(2).first(-1)") {
      tupleDs.groupBy(2).first(-1)
    }
  }

  /** first(n) on a grouped and sorted DataSet: n >= 1 is accepted, n < 1 is rejected. */
  @Test
  def testGroupedSortedFirstN(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tupleDs = env.fromCollection(emptyTupleData)

    assertAccepted("groupBy(2).sortGroup(4, ASCENDING).first(1)") {
      tupleDs.groupBy(2).sortGroup(4, Order.ASCENDING).first(1)
    }
    assertAccepted("groupBy(1, 3).sortGroup(4, ASCENDING).first(10)") {
      tupleDs.groupBy(1, 3).sortGroup(4, Order.ASCENDING).first(10)
    }
    assertRejected("groupBy(0).sortGroup(4, ASCENDING).first(0)") {
      tupleDs.groupBy(0).sortGroup(4, Order.ASCENDING).first(0)
    }
    assertRejected("groupBy(2).sortGroup(4, ASCENDING).first(-1)") {
      tupleDs.groupBy(2).sortGroup(4, Order.ASCENDING).first(-1)
    }
  }

}

0 comments on commit e2c0b9d

Please sign in to comment.