Skip to content

Commit

Permalink
This closes apache#1164
Browse files Browse the repository at this point in the history
  • Loading branch information
eljefe6a committed Nov 2, 2016
2 parents facf096 + 8183ac8 commit 8883877
Show file tree
Hide file tree
Showing 16 changed files with 61 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
Expand Down Expand Up @@ -209,7 +209,7 @@ public PCollection<KV<String, KV<URI, Double>>> apply(
final PCollectionView<Long> totalDocuments =
uriToContent
.apply("GetURIs", Keys.<URI>create())
.apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create())
.apply("DistinctDocs", Distinct.<URI>create())
.apply(Count.<URI>globally())
.apply(View.<Long>asSingleton());

Expand Down Expand Up @@ -238,7 +238,7 @@ public void processElement(ProcessContext c) {
// Compute a mapping from each word to the total
// number of documents in which it appears.
PCollection<KV<String, Long>> wordToDocCount = uriToWords
.apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create())
.apply("DistinctWords", Distinct.<KV<URI, String>>create())
.apply(Values.<String>create())
.apply("CountDocs", Count.<String>perElement());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.util.gcsfs.GcsPath;

/**
* This example uses as input Shakespeare's plays as plaintext files, and will remove any
* duplicate lines across all the files. (The output does not preserve any input order).
*
* <p>Concepts: the RemoveDuplicates transform, and how to wire transforms together.
* <p>Concepts: the Distinct transform, and how to wire transforms together.
* Demonstrates {@link org.apache.beam.sdk.io.TextIO.Read}/
* {@link RemoveDuplicates}/{@link org.apache.beam.sdk.io.TextIO.Write}.
* {@link Distinct}/{@link org.apache.beam.sdk.io.TextIO.Write}.
*
* <p>To execute this pipeline locally, specify a local output file or output prefix on GCS:
 * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
Expand Down Expand Up @@ -88,7 +88,7 @@ public static void main(String[] args)
Pipeline p = Pipeline.create(options);

p.apply("ReadLines", TextIO.Read.from(options.getInput()))
.apply(RemoveDuplicates.<String>create())
.apply(Distinct.<String>create())
.apply("DedupedShakespeare", TextIO.Write.to(options.getOutput()));

p.run().waitUntilFinish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ larger Dataflow pipeline. They include:
<li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/DeDupExample.java">DeDupExample</a>
&mdash; An example that uses Shakespeare's plays as plain text files, and
removes duplicate lines across all the files. Demonstrates the
<code>RemoveDuplicates</code>, <code>TextIO.Read</code>,
<code>Distinct</code>, <code>TextIO.Read</code>,
and <code>TextIO.Write</code> transforms, and how to wire transforms together.
</li>
<li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/FilterExamples.java">FilterExamples</a>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Test;
Expand Down Expand Up @@ -57,7 +57,7 @@ public void testTfIdf() throws Exception {

PCollection<String> words = wordToUriAndTfIdf
.apply(Keys.<String>create())
.apply(RemoveDuplicates.<String>create());
.apply(Distinct.<String>create());

PAssert.that(words).containsInAnyOrder(Arrays.asList("a", "m", "n", "b", "c", "d"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,20 @@
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Unit tests for {@link DeDupExample}. */
/** Unit tests for {@link Distinct}. */
@RunWith(JUnit4.class)
public class DeDupExampleTest {
public class DistinctExampleTest {

@Test
@Category(RunnableOnService.class)
public void testRemoveDuplicates() {
public void testDistinct() {
List<String> strings = Arrays.asList(
"k1",
"k5",
Expand All @@ -55,7 +55,7 @@ public void testRemoveDuplicates() {
.withCoder(StringUtf8Coder.of()));

PCollection<String> output =
input.apply(RemoveDuplicates.<String>create());
input.apply(Distinct.<String>create());

PAssert.that(output)
.containsInAnyOrder("k1", "k5", "k2", "k3");
Expand All @@ -64,7 +64,7 @@ public void testRemoveDuplicates() {

@Test
@Category(RunnableOnService.class)
public void testRemoveDuplicatesEmpty() {
public void testDistinctEmpty() {
List<String> strings = Arrays.asList();

Pipeline p = TestPipeline.create();
Expand All @@ -74,7 +74,7 @@ public void testRemoveDuplicatesEmpty() {
.withCoder(StringUtf8Coder.of()));

PCollection<String> output =
input.apply(RemoveDuplicates.<String>create());
input.apply(Distinct.<String>create());

PAssert.that(output).empty();
p.run().waitUntilFinish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
Expand Down Expand Up @@ -108,7 +108,7 @@ public void testBoundedToUnboundedSourceAdapter() throws Exception {
.isEqualTo(numElements);
// Unique count == numElements
PAssert
.thatSingleton(output.apply(RemoveDuplicates.<Long>create())
.thatSingleton(output.apply(Distinct.<Long>create())
.apply("UniqueCount", Count.<Long>globally()))
.isEqualTo(numElements);
// Min == 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
Expand Down Expand Up @@ -221,7 +221,7 @@ public PCollection<KV<String, KV<URI, Double>>> apply(
final PCollectionView<Long> totalDocuments =
uriToContent
.apply("GetURIs", Keys.<URI>create())
.apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create())
.apply("DistinctDocs", Distinct.<URI>create())
.apply(Count.<URI>globally())
.apply(View.<Long>asSingleton());

Expand Down Expand Up @@ -251,7 +251,7 @@ public void processElement(ProcessContext c) {
// Compute a mapping from each word to the total
// number of documents in which it appears.
PCollection<KV<String, Long>> wordToDocCount = uriToWords
.apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create())
.apply("DistinctWords", Distinct.<KV<URI, String>>create())
.apply(Values.<String>create())
.apply("CountDocs", Count.<String>perElement());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
Expand Down Expand Up @@ -115,7 +115,7 @@ public void testEarliest2Topics() throws Exception {
KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
.apply(Window.<KV<String, String>>into(FixedWindows.of(batchAndWindowDuration)))
.apply(ParDo.of(new FormatKVFn()))
.apply(RemoveDuplicates.<String>create());
.apply(Distinct.<String>create());

PAssertStreaming.runAndAssertContents(p, deduped, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.FluentBackoff;
Expand Down Expand Up @@ -88,7 +88,7 @@ public PCollection<T> apply(PBegin input) {
PCollection<ValueWithRecordId<T>> read = Pipeline.applyTransform(input,
Read.from(new UnboundedToBoundedSourceAdapter<>(source, maxNumRecords, maxReadTime)));
if (source.requiresDeduping()) {
read = read.apply(RemoveDuplicates.withRepresentativeValueFn(
read = read.apply(Distinct.withRepresentativeValueFn(
new SerializableFunction<ValueWithRecordId<T>, byte[]>() {
@Override
public byte[] apply(ValueWithRecordId<T> input) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@
import org.apache.beam.sdk.values.TypeDescriptor;

/**
* {@code RemoveDuplicates<T>} takes a {@code PCollection<T>} and
* returns a {@code PCollection<T>} that has all the elements of the
* input but with duplicate elements removed such that each element is
* unique within each window.
* {@code Distinct<T>} takes a {@code PCollection<T>} and
* returns a {@code PCollection<T>} that has all distinct elements of the
* input. Thus, each element is unique within each window.
*
* <p>Two values of type {@code T} are compared for equality <b>not</b> by
* regular Java {@link Object#equals}, but instead by first encoding
Expand All @@ -52,26 +51,26 @@
* <pre> {@code
* PCollection<String> words = ...;
* PCollection<String> uniqueWords =
* words.apply(RemoveDuplicates.<String>create());
* words.apply(Distinct.<String>create());
* } </pre>
*
* @param <T> the type of the elements of the input and output
* {@code PCollection}s
*/
public class RemoveDuplicates<T> extends PTransform<PCollection<T>,
public class Distinct<T> extends PTransform<PCollection<T>,
PCollection<T>> {
/**
* Returns a {@code RemoveDuplicates<T>} {@code PTransform}.
* Returns a {@code Distinct<T>} {@code PTransform}.
*
* @param <T> the type of the elements of the input and output
* {@code PCollection}s
*/
public static <T> RemoveDuplicates<T> create() {
return new RemoveDuplicates<T>();
public static <T> Distinct<T> create() {
return new Distinct<T>();
}

/**
* Returns a {@code RemoveDuplicates<T, IdT>} {@code PTransform}.
* Returns a {@code Distinct<T, IdT>} {@code PTransform}.
*
* @param <T> the type of the elements of the input and output
* {@code PCollection}s
Expand Down Expand Up @@ -102,10 +101,10 @@ public Void apply(Iterable<Void> iter) {
}

/**
* A {@link RemoveDuplicates} {@link PTransform} that uses a {@link SerializableFunction} to
* A {@link Distinct} {@link PTransform} that uses a {@link SerializableFunction} to
* obtain a representative value for each input element.
*
* <p>Construct via {@link RemoveDuplicates#withRepresentativeValueFn(SerializableFunction)}.
* <p>Construct via {@link Distinct#withRepresentativeValueFn(SerializableFunction)}.
*
* @param <T> the type of input and output element
* @param <IdT> the type of representative values used to dedup
Expand Down Expand Up @@ -144,7 +143,7 @@ public T apply(T left, T right) {
* the specified output type descriptor.
*
* <p>Required for use of
* {@link RemoveDuplicates#withRepresentativeValueFn(SerializableFunction)}
* {@link Distinct#withRepresentativeValueFn(SerializableFunction)}
* in Java 8 with a lambda as the fn.
*
* @param type a {@link TypeDescriptor} describing the representative type of this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;
Expand All @@ -56,7 +56,7 @@ public static void addCountingAsserts(PCollection<Long> input, long numElements)
// Unique count == numElements
PAssert.thatSingleton(
input
.apply(RemoveDuplicates.<Long>create())
.apply(Distinct.<Long>create())
.apply("UniqueCount", Count.<Long>globally()))
.isEqualTo(numElements);
// Min == 0
Expand Down Expand Up @@ -141,7 +141,7 @@ public void testUnboundedInputTimestamps() {
PCollection<Long> diffs =
input
.apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
.apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
.apply("DistinctTimestamps", Distinct.<Long>create());
// This assert also confirms that diffs only has one unique value.
PAssert.thatSingleton(diffs).isEqualTo(0L);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PCollection;
Expand All @@ -66,7 +66,7 @@ public static void addCountingAsserts(PCollection<Long> input, long numElements)
.isEqualTo(numElements);
// Unique count == numElements
PAssert
.thatSingleton(input.apply(RemoveDuplicates.<Long>create())
.thatSingleton(input.apply(Distinct.<Long>create())
.apply("UniqueCount", Count.<Long>globally()))
.isEqualTo(numElements);
// Min == 0
Expand Down Expand Up @@ -177,7 +177,7 @@ public void testUnboundedSourceTimestamps() {

PCollection<Long> diffs = input
.apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
.apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
.apply("DistinctTimestamps", Distinct.<Long>create());
// This assert also confirms that diffs only has one unique value.
PAssert.thatSingleton(diffs).isEqualTo(0L);

Expand All @@ -204,7 +204,7 @@ public void testUnboundedSourceWithRate() {
PCollection<Long> diffs =
input
.apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
.apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
.apply("DistinctTimestamps", Distinct.<Long>create());
// This assert also confirms that diffs only has one unique value.
PAssert.thatSingleton(diffs).isEqualTo(0L);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ private static void runApproximateUniquePipeline(int sampleSize) {
PCollection<Long> approximate = input.apply(ApproximateUnique.<String>globally(sampleSize));
final PCollectionView<Long> exact =
input
.apply(RemoveDuplicates.<String>create())
.apply(Distinct.<String>create())
.apply(Count.<String>globally())
.apply(View.<Long>asSingleton());

Expand Down
Loading

0 comments on commit 8883877

Please sign in to comment.