diff --git a/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/DistinctExampleTest.kt b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/DistinctExampleTest.kt
new file mode 100644
index 0000000000000..56702a3a17467
--- /dev/null
+++ b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/DistinctExampleTest.kt
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import org.apache.beam.sdk.coders.StringUtf8Coder
+import org.apache.beam.sdk.testing.PAssert
+import org.apache.beam.sdk.testing.TestPipeline
+import org.apache.beam.sdk.testing.ValidatesRunner
+import org.apache.beam.sdk.transforms.Create
+import org.apache.beam.sdk.transforms.Distinct
+import org.junit.Rule
+import org.junit.Test
+import org.junit.experimental.categories.Category
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+
+/** Unit tests for [Distinct]: a string input with repeats and an empty input, both checked with [PAssert]. */
+@RunWith(JUnit4::class)
+class DistinctExampleTest {
+
+    private val pipeline: TestPipeline = TestPipeline.create()
+
+    @Rule
+    fun pipeline(): TestPipeline = pipeline // hands the private TestPipeline to JUnit as a rule
+
+    @Test
+    @Category(ValidatesRunner::class)
+    fun testDistinct() {
+        val strings = listOf("k1", "k5", "k5", "k2", "k1", "k2", "k3")
+        val input = pipeline.apply(Create.of(strings).withCoder(StringUtf8Coder.of()))
+        val output = input.apply(Distinct.create())
+        PAssert.that(output).containsInAnyOrder("k1", "k5", "k2", "k3") // every repeated key is expected once
+        pipeline.run().waitUntilFinish()
+    }
+
+    @Test
+    @Category(ValidatesRunner::class)
+    fun testDistinctEmpty() {
+        val strings = listOf<String>() // explicit type argument: there are no elements to infer it from
+        val input = pipeline.apply(Create.of(strings).withCoder(StringUtf8Coder.of()))
+        val output = input.apply(Distinct.create())
+        PAssert.that(output).empty()
+        pipeline.run().waitUntilFinish()
+    }
+}
\ No newline at end of file
diff --git a/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/FilterExamplesTest.kt b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/FilterExamplesTest.kt
new file mode 100644
index 0000000000000..d5cb544a7606d
--- /dev/null
+++ b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/FilterExamplesTest.kt
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import com.google.api.services.bigquery.model.TableRow
+import org.apache.beam.examples.kotlin.cookbook.FilterExamples.FilterSingleMonthDataFn
+import org.apache.beam.examples.kotlin.cookbook.FilterExamples.ProjectionFn
+import org.apache.beam.sdk.testing.PAssert
+import org.apache.beam.sdk.testing.TestPipeline
+import org.apache.beam.sdk.testing.ValidatesRunner
+import org.apache.beam.sdk.transforms.Create
+import org.apache.beam.sdk.transforms.ParDo
+import org.junit.Rule
+import org.junit.Test
+import org.junit.experimental.categories.Category
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+
+/** Unit tests for [FilterExamples]: [ProjectionFn] and [FilterSingleMonthDataFn] applied to in-memory rows. */
+@RunWith(JUnit4::class)
+class FilterExamplesTest {
+
+    private val row1 = TableRow()
+            .set("month", "6")
+            .set("day", "21")
+            .set("year", "2014")
+            .set("mean_temp", "85.3")
+            .set("tornado", true)
+    private val row2 = TableRow()
+            .set("month", "7")
+            .set("day", "20")
+            .set("year", "2014")
+            .set("mean_temp", "75.4")
+            .set("tornado", false)
+    private val row3 = TableRow()
+            .set("month", "6")
+            .set("day", "18")
+            .set("year", "2014")
+            .set("mean_temp", "45.3")
+            .set("tornado", true)
+
+    private val outRow1 = TableRow().set("year", 2014).set("month", 6).set("day", 21).set("mean_temp", 85.3)
+    private val outRow2 = TableRow().set("year", 2014).set("month", 7).set("day", 20).set("mean_temp", 75.4)
+    private val outRow3 = TableRow().set("year", 2014).set("month", 6).set("day", 18).set("mean_temp", 45.3)
+
+    private val pipeline: TestPipeline = TestPipeline.create()
+
+    @Rule
+    fun pipeline(): TestPipeline = pipeline // hands the private TestPipeline to JUnit as a rule
+
+    @Test
+    @Category(ValidatesRunner::class)
+    fun testProjectionFn() {
+        val input = pipeline.apply(Create.of(row1, row2, row3))
+        val results = input.apply(ParDo.of(ProjectionFn()))
+        PAssert.that(results).containsInAnyOrder(outRow1, outRow2, outRow3) // numeric rows, no "tornado" field
+        pipeline.run().waitUntilFinish()
+    }
+
+    @Test
+    @Category(ValidatesRunner::class)
+    fun testFilterSingleMonthDataFn() {
+        val input = pipeline.apply(Create.of(outRow1, outRow2, outRow3)) // projected rows double as filter input
+        val results = input.apply(ParDo.of(FilterSingleMonthDataFn(7)))
+        PAssert.that(results).containsInAnyOrder(outRow2) // outRow2 is the only row whose month is 7
+        pipeline.run().waitUntilFinish()
+    }
+}
\ No newline at end of file
diff --git a/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/JoinExamplesTest.kt b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/JoinExamplesTest.kt
new file mode 100644
index 0000000000000..8728a827229b1
--- /dev/null
+++ b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/JoinExamplesTest.kt
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import com.google.api.services.bigquery.model.TableRow
+import org.apache.beam.examples.kotlin.cookbook.JoinExamples.ExtractCountryInfoFn
+import org.apache.beam.examples.kotlin.cookbook.JoinExamples.ExtractEventDataFn
+import org.apache.beam.sdk.testing.PAssert
+import org.apache.beam.sdk.testing.TestPipeline
+import org.apache.beam.sdk.testing.ValidatesRunner
+import org.apache.beam.sdk.transforms.Create
+import org.apache.beam.sdk.transforms.ParDo
+import org.apache.beam.sdk.values.KV
+import org.apache.beam.sdk.values.PCollection
+import org.junit.Rule
+import org.junit.Test
+import org.junit.experimental.categories.Category
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+
+/** Unit tests for [JoinExamples]: both extraction DoFns and the output of [JoinExamples.joinEvents]. */
+@RunWith(JUnit4::class)
+class JoinExamplesTest {
+
+    companion object {
+        private val row1 = TableRow()
+                .set("ActionGeo_CountryCode", "VM")
+                .set("SQLDATE", "20141212")
+                .set("Actor1Name", "BANGKOK")
+                .set("SOURCEURL", "http://cnn.com")
+        private val row2 = TableRow()
+                .set("ActionGeo_CountryCode", "VM")
+                .set("SQLDATE", "20141212")
+                .set("Actor1Name", "LAOS")
+                .set("SOURCEURL", "http://www.chicagotribune.com")
+        private val row3 = TableRow()
+                .set("ActionGeo_CountryCode", "BE")
+                .set("SQLDATE", "20141213")
+                .set("Actor1Name", "AFGHANISTAN")
+                .set("SOURCEURL", "http://cnn.com")
+        private val EVENTS = arrayOf(row1, row2, row3)
+        val EVENT_ARRAY: List<TableRow> = listOf(*EVENTS)
+        private val PARSED_EVENTS = listOf(
+                KV.of("VM", "Date: 20141212, Actor1: LAOS, url: http://www.chicagotribune.com"),
+                KV.of("BE", "Date: 20141213, Actor1: AFGHANISTAN, url: http://cnn.com"),
+                KV.of("VM", "Date: 20141212, Actor1: BANGKOK, url: http://cnn.com"))
+        private val PARSED_COUNTRY_CODES = listOf(KV.of("BE", "Belgium"), KV.of("VM", "Vietnam"))
+        private val cc1 = TableRow().set("FIPSCC", "VM").set("HumanName", "Vietnam")
+        private val cc2 = TableRow().set("FIPSCC", "BE").set("HumanName", "Belgium")
+        private val CCS = arrayOf(cc1, cc2)
+        val CC_ARRAY = listOf(*CCS)
+        val JOINED_EVENTS = arrayOf(
+                "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: LAOS, " +
+                        "url: http://www.chicagotribune.com",
+                "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: BANGKOK, " +
+                        "url: http://cnn.com",
+                "Country code: BE, Country name: Belgium, Event info: Date: 20141213, Actor1: AFGHANISTAN, " +
+                        "url: http://cnn.com")
+    }
+
+    private val pipeline: TestPipeline = TestPipeline.create()
+
+    @Rule
+    fun pipeline(): TestPipeline = pipeline // hands the private TestPipeline to JUnit as a rule
+
+    @Test
+    fun testExtractEventDataFn() {
+        val output = pipeline.apply(Create.of(EVENT_ARRAY)).apply(ParDo.of(ExtractEventDataFn()))
+        PAssert.that(output).containsInAnyOrder(PARSED_EVENTS) // one country-code/event-info pair per input row
+        pipeline.run().waitUntilFinish()
+    }
+
+    @Test
+    fun testExtractCountryInfoFn() {
+        val output = pipeline.apply(Create.of(CC_ARRAY)).apply(ParDo.of(ExtractCountryInfoFn()))
+        PAssert.that(output).containsInAnyOrder(PARSED_COUNTRY_CODES) // one code/name pair per input row
+        pipeline.run().waitUntilFinish()
+    }
+
+    @Test
+    @Category(ValidatesRunner::class)
+    fun testJoin() {
+        val input1 = pipeline.apply("CreateEvent", Create.of(EVENT_ARRAY)) // both Create steps get explicit names
+        val input2 = pipeline.apply("CreateCC", Create.of(CC_ARRAY))
+        val output: PCollection<String> = JoinExamples.joinEvents(input1, input2)
+        PAssert.that(output).containsInAnyOrder(*JOINED_EVENTS)
+        pipeline.run().waitUntilFinish()
+    }
+}
\ No newline at end of file
diff --git a/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamplesTest.kt b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamplesTest.kt
new file mode 100644
index 0000000000000..c9f4ec3e0b05f
--- /dev/null
+++ b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamplesTest.kt
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import com.google.api.services.bigquery.model.TableRow
+import org.apache.beam.sdk.testing.PAssert
+import org.apache.beam.sdk.testing.TestPipeline
+import org.apache.beam.sdk.testing.ValidatesRunner
+import org.apache.beam.sdk.transforms.Create
+import org.apache.beam.sdk.transforms.ParDo
+import org.apache.beam.sdk.values.KV
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList
+import org.junit.Rule
+import org.junit.Test
+import org.junit.experimental.categories.Category
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+
+/** Unit tests for [MaxPerKeyExamples]: its ExtractTempFn and FormatMaxesFn DoFns run on in-memory data. */
+@RunWith(JUnit4::class)
+class MaxPerKeyExamplesTest {
+
+    private val row1 = TableRow()
+            .set("month", "6")
+            .set("day", "21")
+            .set("year", "2014")
+            .set("mean_temp", "85.3")
+            .set("tornado", true)
+    private val row2 = TableRow()
+            .set("month", "7")
+            .set("day", "20")
+            .set("year", "2014")
+            .set("mean_temp", "75.4")
+            .set("tornado", false)
+    private val row3 = TableRow()
+            .set("month", "6")
+            .set("day", "18")
+            .set("year", "2014")
+            .set("mean_temp", "45.3")
+            .set("tornado", true)
+
+    private val testRows: List<TableRow> = listOf(row1, row2, row3)
+    private val kv1 = KV.of(6, 85.3) // month paired with mean temperature, mirroring row1
+    private val kv2 = KV.of(6, 45.3)
+    private val kv3 = KV.of(7, 75.4)
+    private val testKvs: List<KV<Int, Double>> = listOf(kv1, kv2, kv3)
+    private val resultRow1 = TableRow().set("month", 6).set("max_mean_temp", 85.3)
+    private val resultRow2 = TableRow().set("month", 6).set("max_mean_temp", 45.3)
+    private val resultRow3 = TableRow().set("month", 7).set("max_mean_temp", 75.4)
+
+    private val pipeline: TestPipeline = TestPipeline.create()
+
+    @Rule
+    fun pipeline(): TestPipeline = pipeline // hands the private TestPipeline to JUnit as a rule
+
+    @Test
+    @Category(ValidatesRunner::class)
+    fun testExtractTempFn() {
+        val results = pipeline.apply(Create.of(testRows)).apply(ParDo.of<TableRow, KV<Int, Double>>(MaxPerKeyExamples.ExtractTempFn()))
+        PAssert.that(results).containsInAnyOrder(ImmutableList.of(kv1, kv2, kv3))
+        pipeline.run().waitUntilFinish()
+    }
+
+    @Test
+    @Category(ValidatesRunner::class)
+    fun testFormatMaxesFn() {
+        val results = pipeline.apply(Create.of(testKvs)).apply(ParDo.of<KV<Int, Double>, TableRow>(MaxPerKeyExamples.FormatMaxesFn()))
+        PAssert.that(results).containsInAnyOrder(resultRow1, resultRow2, resultRow3) // one row per input pair
+        pipeline.run().waitUntilFinish()
+    }
+}
\ No newline at end of file