Skip to content

Commit

Permalink
Add WithKeys Java kata
Browse files Browse the repository at this point in the history
  • Loading branch information
henryken committed Jun 16, 2019
1 parent 521ddee commit 6d9e2a2
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 0 deletions.
83 changes: 83 additions & 0 deletions learning/katas/java/.idea/study_project.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.learning.katas.commontransforms.withkeys;

import static org.apache.beam.sdk.values.TypeDescriptors.strings;

import org.apache.beam.learning.katas.util.Log;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class Task {

public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);

PCollection<String> words =
pipeline.apply(
Create.of("apple", "banana", "cherry", "durian", "guava", "melon")
);

PCollection<KV<String, String>> output = applyTransform(words);

output.apply(Log.ofElements());

pipeline.run();
}

static PCollection<KV<String, String>> applyTransform(PCollection<String> input) {
return input
.apply(WithKeys.<String, String>of(fruit -> fruit.substring(0, 1))
.withKeyType(strings()));
}

}
34 changes: 34 additions & 0 deletions learning/katas/java/Common Transforms/WithKeys/WithKeys/task.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http:https://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<html>
<h2>WithKeys</h2>
<p>
<b>Kata:</b> Convert each fruit name into a KV of its first letter and itself, e.g.
<code>apple => KV.of("a", "apple")</code>
</p>
<br>
<div class="hint">
Use <a href="https://beam.apache.org/releases/javadoc/2.13.0/org/apache/beam/sdk/transforms/WithKeys.html">
WithKeys</a>.
</div>
<div class="hint">
If using a lambda in Java 8, <code>withKeyType(TypeDescriptor)</code> must be called on the
result PTransform.
</div>
</html>
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.learning.katas.commontransforms.withkeys;

import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

public class TaskTest {

@Rule
public final transient TestPipeline testPipeline = TestPipeline.create();

@SuppressWarnings("unchecked")
@Test
public void groupByKey() {
PCollection<String> numbers =
testPipeline.apply(
Create.of("apple", "banana", "cherry", "durian", "guava", "melon")
);

PCollection<KV<String, String>> results = Task.applyTransform(numbers);

PAssert.that(results)
.containsInAnyOrder(
KV.of("a", "apple"),
KV.of("b", "banana"),
KV.of("c", "cherry"),
KV.of("d", "durian"),
KV.of("g", "guava"),
KV.of("m", "melon")
);

testPipeline.run().waitUntilFinish();
}

}

0 comments on commit 6d9e2a2

Please sign in to comment.