Skip to content

Commit

Permalink
Merge pull request apache#12000 from BEAM-10221: Add in example tests…
Browse files Browse the repository at this point in the history
… cases of Kotlin cookbook

BEAM-10221: Add in example tests cases of Kotlin cookbook
  • Loading branch information
pabloem committed Jun 15, 2020
2 parents a0e0706 + 1e7220f commit 34dff56
Show file tree
Hide file tree
Showing 4 changed files with 333 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.kotlin.cookbook

import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.testing.PAssert
import org.apache.beam.sdk.testing.TestPipeline
import org.apache.beam.sdk.testing.ValidatesRunner
import org.apache.beam.sdk.transforms.Create
import org.apache.beam.sdk.transforms.Distinct
import org.junit.Rule
import org.junit.Test
import org.junit.experimental.categories.Category
import org.junit.runner.RunWith
import org.junit.runners.JUnit4

/** Unit tests for [Distinct]. */
@RunWith(JUnit4::class)
class DistinctExampleTest {

private val pipeline: TestPipeline = TestPipeline.create()

@Rule
fun pipeline(): TestPipeline = pipeline

@Test
@Category(ValidatesRunner::class)
fun testDistinct() {
val strings = listOf("k1", "k5", "k5", "k2", "k1", "k2", "k3")
val input = pipeline.apply(Create.of(strings).withCoder(StringUtf8Coder.of()))
val output = input.apply(Distinct.create())
PAssert.that(output).containsInAnyOrder("k1", "k5", "k2", "k3")
pipeline.run().waitUntilFinish()
}

@Test
@Category(ValidatesRunner::class)
fun testDistinctEmpty() {
val strings = listOf<String>()
val input = pipeline.apply(Create.of(strings).withCoder(StringUtf8Coder.of()))
val output = input.apply(Distinct.create())
PAssert.that(output).empty()
pipeline.run().waitUntilFinish()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.kotlin.cookbook

import com.google.api.services.bigquery.model.TableRow
import org.apache.beam.examples.kotlin.cookbook.FilterExamples.FilterSingleMonthDataFn
import org.apache.beam.examples.kotlin.cookbook.FilterExamples.ProjectionFn
import org.apache.beam.sdk.testing.PAssert
import org.apache.beam.sdk.testing.TestPipeline
import org.apache.beam.sdk.testing.ValidatesRunner
import org.apache.beam.sdk.transforms.Create
import org.apache.beam.sdk.transforms.ParDo
import org.junit.Rule
import org.junit.Test
import org.junit.experimental.categories.Category
import org.junit.runner.RunWith
import org.junit.runners.JUnit4

/** Unit tests for [FilterExamples]. */
@RunWith(JUnit4::class)
class FilterExamplesTest {

private val row1 = TableRow()
.set("month", "6")
.set("day", "21")
.set("year", "2014")
.set("mean_temp", "85.3")
.set("tornado", true)
private val row2 = TableRow()
.set("month", "7")
.set("day", "20")
.set("year", "2014")
.set("mean_temp", "75.4")
.set("tornado", false)
private val row3 = TableRow()
.set("month", "6")
.set("day", "18")
.set("year", "2014")
.set("mean_temp", "45.3")
.set("tornado", true)

private val outRow1 = TableRow().set("year", 2014).set("month", 6).set("day", 21).set("mean_temp", 85.3)
private val outRow2 = TableRow().set("year", 2014).set("month", 7).set("day", 20).set("mean_temp", 75.4)
private val outRow3 = TableRow().set("year", 2014).set("month", 6).set("day", 18).set("mean_temp", 45.3)

private val pipeline: TestPipeline = TestPipeline.create()

@Rule
fun pipeline(): TestPipeline = pipeline

@Test
@Category(ValidatesRunner::class)
fun testProjectionFn() {
val input = pipeline.apply(Create.of(row1, row2, row3))
val results = input.apply(ParDo.of(ProjectionFn()))
PAssert.that(results).containsInAnyOrder(outRow1, outRow2, outRow3)
pipeline.run().waitUntilFinish()
}

@Test
@Category(ValidatesRunner::class)
fun testFilterSingleMonthDataFn() {
val input = pipeline.apply(Create.of(outRow1, outRow2, outRow3))
val results = input.apply(ParDo.of(FilterSingleMonthDataFn(7)))
PAssert.that(results).containsInAnyOrder(outRow2)
pipeline.run().waitUntilFinish()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.kotlin.cookbook

import com.google.api.services.bigquery.model.TableRow
import org.apache.beam.examples.kotlin.cookbook.JoinExamples.ExtractCountryInfoFn
import org.apache.beam.examples.kotlin.cookbook.JoinExamples.ExtractEventDataFn
import org.apache.beam.sdk.testing.PAssert
import org.apache.beam.sdk.testing.TestPipeline
import org.apache.beam.sdk.testing.ValidatesRunner
import org.apache.beam.sdk.transforms.Create
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.PCollection
import org.junit.Rule
import org.junit.Test
import org.junit.experimental.categories.Category
import org.junit.runner.RunWith
import org.junit.runners.JUnit4

/** Unit tests for [JoinExamples]. */
@RunWith(JUnit4::class)
class JoinExamplesTest {

companion object {
private val row1 = TableRow()
.set("ActionGeo_CountryCode", "VM")
.set("SQLDATE", "20141212")
.set("Actor1Name", "BANGKOK")
.set("SOURCEURL", "http:https://cnn.com")
private val row2 = TableRow()
.set("ActionGeo_CountryCode", "VM")
.set("SQLDATE", "20141212")
.set("Actor1Name", "LAOS")
.set("SOURCEURL", "http:https://www.chicagotribune.com")
private val row3 = TableRow()
.set("ActionGeo_CountryCode", "BE")
.set("SQLDATE", "20141213")
.set("Actor1Name", "AFGHANISTAN")
.set("SOURCEURL", "http:https://cnn.com")
private val EVENTS = arrayOf(row1, row2, row3)
val EVENT_ARRAY: List<TableRow> = listOf(*EVENTS)
private val PARSED_EVENTS = listOf(
KV.of("VM", "Date: 20141212, Actor1: LAOS, url: http:https://www.chicagotribune.com"),
KV.of("BE", "Date: 20141213, Actor1: AFGHANISTAN, url: http:https://cnn.com"),
KV.of("VM", "Date: 20141212, Actor1: BANGKOK, url: http:https://cnn.com"))
private val PARSED_COUNTRY_CODES = listOf(KV.of("BE", "Belgium"), KV.of("VM", "Vietnam"))
private val cc1 = TableRow().set("FIPSCC", "VM").set("HumanName", "Vietnam")
private val cc2 = TableRow().set("FIPSCC", "BE").set("HumanName", "Belgium")
private val CCS = arrayOf(cc1, cc2)
val CC_ARRAY = listOf(*CCS)
val JOINED_EVENTS = arrayOf(
"Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: LAOS, "
+ "url: http:https://www.chicagotribune.com",
"Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: BANGKOK, "
+ "url: http:https://cnn.com",
"Country code: BE, Country name: Belgium, Event info: Date: 20141213, Actor1: AFGHANISTAN, "
+ "url: http:https://cnn.com")
}

private val pipeline: TestPipeline = TestPipeline.create()

@Rule
fun pipeline(): TestPipeline = pipeline

@Test
fun testExtractEventDataFn() {
val output = pipeline.apply(Create.of(EVENT_ARRAY)).apply(ParDo.of(ExtractEventDataFn()))
PAssert.that(output).containsInAnyOrder(PARSED_EVENTS)
pipeline.run().waitUntilFinish()
}

@Test
fun testExtractCountryInfoFn() {
val output = pipeline.apply(Create.of(CC_ARRAY)).apply(ParDo.of(ExtractCountryInfoFn()))
PAssert.that(output).containsInAnyOrder(PARSED_COUNTRY_CODES)
pipeline.run().waitUntilFinish()
}

@Test
@Category(ValidatesRunner::class)
fun testJoin() {
val input1 = pipeline.apply("CreateEvent", Create.of(EVENT_ARRAY))
val input2 = pipeline.apply("CreateCC", Create.of(CC_ARRAY))
val output: PCollection<String> = JoinExamples.joinEvents(input1, input2)
PAssert.that(output).containsInAnyOrder(*JOINED_EVENTS)
pipeline.run().waitUntilFinish()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.kotlin.cookbook

import com.google.api.services.bigquery.model.TableRow
import org.apache.beam.sdk.testing.PAssert
import org.apache.beam.sdk.testing.TestPipeline
import org.apache.beam.sdk.testing.ValidatesRunner
import org.apache.beam.sdk.transforms.Create
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.values.KV
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList
import org.junit.Rule
import org.junit.Test
import org.junit.experimental.categories.Category
import org.junit.runner.RunWith
import org.junit.runners.JUnit4

/** Unit tests for [MaxPerKeyExamples]. */
@RunWith(JUnit4::class)
class MaxPerKeyExamplesTest {

private val row1 = TableRow()
.set("month", "6")
.set("day", "21")
.set("year", "2014")
.set("mean_temp", "85.3")
.set("tornado", true)
private val row2 = TableRow()
.set("month", "7")
.set("day", "20")
.set("year", "2014")
.set("mean_temp", "75.4")
.set("tornado", false)
private val row3 = TableRow()
.set("month", "6")
.set("day", "18")
.set("year", "2014")
.set("mean_temp", "45.3")
.set("tornado", true)

private val testRows: List<TableRow> = listOf(row1, row2, row3)
private val kv1 = KV.of(6, 85.3)
private val kv2 = KV.of(6, 45.3)
private val kv3 = KV.of(7, 75.4)
private val testKvs: List<KV<Int, Double>> = listOf(kv1, kv2, kv3)
private val resultRow1 = TableRow().set("month", 6).set("max_mean_temp", 85.3)
private val resultRow2 = TableRow().set("month", 6).set("max_mean_temp", 45.3)
private val resultRow3 = TableRow().set("month", 7).set("max_mean_temp", 75.4)

private val pipeline: TestPipeline = TestPipeline.create()

@Rule
fun pipeline(): TestPipeline = pipeline

@Test
@Category(ValidatesRunner::class)
fun testExtractTempFn() {
val results = pipeline.apply(Create.of(testRows)).apply(ParDo.of<TableRow, KV<Int, Double>>(MaxPerKeyExamples.ExtractTempFn()))
PAssert.that(results).containsInAnyOrder(ImmutableList.of(kv1, kv2, kv3))
pipeline.run().waitUntilFinish()
}

@Test
@Category(ValidatesRunner::class)
fun testFormatMaxesFn() {
val results = pipeline.apply(Create.of(testKvs)).apply(ParDo.of<KV<Int, Double>, TableRow>(MaxPerKeyExamples.FormatMaxesFn()))
PAssert.that(results).containsInAnyOrder(resultRow1, resultRow2, resultRow3)
pipeline.run().waitUntilFinish()
}
}

0 comments on commit 34dff56

Please sign in to comment.