Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Schema field selections in DoFn using NewDoFn injection #8311

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
foo;
o
Foo.
  • Loading branch information
Reuven Lax committed Apr 25, 2019
commit 01e8bd8f30107319b3e318a90140610ac5266e04
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.Schema;
Expand Down Expand Up @@ -412,34 +415,11 @@ public void testSchemasPassedThrough() {
}

/** Pojo used for testing. */
@DefaultSchema(JavaFieldSchema.class)
static class InferredPojo2 {
final Integer integerField;
final String stringField;

@SchemaCreate
InferredPojo2(String stringField, Integer integerField) {
this.stringField = stringField;
this.integerField = integerField;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof InferredPojo2)) {
return false;
}
InferredPojo2 that = (InferredPojo2) o;
return Objects.equals(integerField, that.integerField)
&& Objects.equals(stringField, that.stringField);
}

@Override
public int hashCode() {
return Objects.hash(integerField, stringField);
}
@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract static class Inferred2 {
abstract Integer getIntegerField();
abstract String getStringField();
}

@Test
Expand All @@ -456,8 +436,8 @@ public void testSchemaConversionPipeline() {
ParDo.of(
new DoFn<InferredPojo, String>() {
@ProcessElement
public void process(@Element InferredPojo2 pojo, OutputReceiver<String> r) {
r.output(pojo.stringField + ":" + pojo.integerField);
public void process(@Element Inferred2 pojo, OutputReceiver<String> r) {
r.output(pojo.getStringField() + ":" + pojo.getIntegerField());
}
}));
PAssert.that(output).containsInAnyOrder("a:1", "b:2", "c:3");
Expand Down Expand Up @@ -503,55 +483,29 @@ public void process(@Element Nested nested, OutputReceiver<String> r) {
pipeline.run();
}

@DefaultSchema(JavaFieldSchema.class)
static class PojoForExtraction {
final Integer integerField;
final String stringField;
List<Integer> ints;

@SchemaCreate
public PojoForExtraction(String stringField, Integer integerField, List<Integer> ints) {
this.integerField = integerField;
this.stringField = stringField;
this.ints = ints;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

PojoForExtraction that = (PojoForExtraction) o;
return Objects.equals(integerField, that.integerField)
&& Objects.equals(stringField, that.stringField)
&& Objects.equals(ints, that.ints);
}

@Override
public int hashCode() {
return Objects.hash(integerField, stringField, ints);
}
@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract static class ForExtraction {
abstract Integer getIntegerField();
abstract String getStringField();
abstract List<Integer> getInts();
}

@Test
@Category({ValidatesRunner.class, UsesSchema.class})
public void testSchemaFieldSelectionUnboxing() {
List<PojoForExtraction> pojoList =
List<ForExtraction> pojoList =
Lists.newArrayList(
new PojoForExtraction("a", 1, Lists.newArrayList(1, 2)),
new PojoForExtraction("b", 2, Lists.newArrayList(2, 3)),
new PojoForExtraction("c", 3, Lists.newArrayList(3, 4)));
new AutoValue_ParDoSchemaTest_ForExtraction(1, "a", Lists.newArrayList(1, 2)),
new AutoValue_ParDoSchemaTest_ForExtraction(2, "b", Lists.newArrayList(2, 3)),
new AutoValue_ParDoSchemaTest_ForExtraction(3, "c", Lists.newArrayList(3, 4)));

PCollection<String> output =
pipeline
.apply(Create.of(pojoList))
.apply(
ParDo.of(
new DoFn<PojoForExtraction, String>() {
new DoFn<ForExtraction, String>() {
// Read the list twice as two equivalent types to ensure that Beam properly
// converts.
@ProcessElement
Expand Down Expand Up @@ -580,18 +534,18 @@ public void process(
@Test
@Category({ValidatesRunner.class, UsesSchema.class})
public void testSchemaFieldDescriptorSelectionUnboxing() {
List<PojoForExtraction> pojoList =
List<ForExtraction> pojoList =
Lists.newArrayList(
new PojoForExtraction("a", 1, Lists.newArrayList(1, 2)),
new PojoForExtraction("b", 2, Lists.newArrayList(2, 3)),
new PojoForExtraction("c", 3, Lists.newArrayList(3, 4)));
new AutoValue_ParDoSchemaTest_ForExtraction(1,"a", Lists.newArrayList(1, 2)),
new AutoValue_ParDoSchemaTest_ForExtraction(2,"b", Lists.newArrayList(2, 3)),
new AutoValue_ParDoSchemaTest_ForExtraction(3,"c", Lists.newArrayList(3, 4)));

PCollection<String> output =
pipeline
.apply(Create.of(pojoList))
.apply(
ParDo.of(
new DoFn<PojoForExtraction, String>() {
new DoFn<ForExtraction, String>() {
@FieldAccess("stringSelector")
final FieldAccessDescriptor stringSelector =
FieldAccessDescriptor.withFieldNames("stringField");
Expand All @@ -618,69 +572,48 @@ public void process(
pipeline.run();
}

@DefaultSchema(JavaFieldSchema.class)
static class NestedPojoForExtraction {
PojoForExtraction inner;

@SchemaCreate
public NestedPojoForExtraction(PojoForExtraction inner) {
this.inner = inner;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof NestedPojoForExtraction)) {
return false;
}
NestedPojoForExtraction that = (NestedPojoForExtraction) o;
return Objects.equals(inner, that.inner);
}

@Override
public int hashCode() {
return Objects.hash(inner);
}
@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract static class NestedForExtraction {
abstract ForExtraction getInner();
}

@Test
@Category({ValidatesRunner.class, UsesSchema.class})
public void testSchemaFieldSelectionNested() {
List<PojoForExtraction> pojoList =
List<ForExtraction> pojoList =
Lists.newArrayList(
new PojoForExtraction("a", 1, Lists.newArrayList(1, 2)),
new PojoForExtraction("b", 2, Lists.newArrayList(2, 3)),
new PojoForExtraction("c", 3, Lists.newArrayList(3, 4)));
List<NestedPojoForExtraction> outerList =
pojoList.stream().map(NestedPojoForExtraction::new).collect(Collectors.toList());
new AutoValue_ParDoSchemaTest_ForExtraction(1,"a", Lists.newArrayList(1, 2)),
new AutoValue_ParDoSchemaTest_ForExtraction(2, "b", Lists.newArrayList(2, 3)),
new AutoValue_ParDoSchemaTest_ForExtraction(3,"c", Lists.newArrayList(3, 4)));
List<NestedForExtraction> outerList =
pojoList.stream().map(AutoValue_ParDoSchemaTest_NestedForExtraction::new).collect(Collectors.toList());

PCollection<String> output =
pipeline
.apply(Create.of(outerList))
.apply(
ParDo.of(
new DoFn<NestedPojoForExtraction, String>() {
new DoFn<NestedForExtraction, String>() {

@ProcessElement
public void process(
@FieldAccess("inner.*") PojoForExtraction pojo,
@FieldAccess("inner") PojoForExtraction pojo2,
@FieldAccess("inner.*") ForExtraction extracted,
@FieldAccess("inner") ForExtraction extracted1,
@FieldAccess("inner.stringField") String stringField,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if a FieldAccess has a comma-separated list of selectors, like "inner.stringField, inner.integerField"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, the grammar for our expression parser doesn't include commas, so it will fail with a parse error.

@FieldAccess("inner.integerField") int integerField,
@FieldAccess("inner.ints") List<Integer> intArray,
OutputReceiver<String> r) {
assertEquals(pojo, pojo2);
assertEquals(stringField, pojo.stringField);
assertEquals(integerField, (int) pojo.integerField);
assertEquals(intArray, pojo.ints);
assertEquals(extracted, extracted1);
assertEquals(stringField, extracted.getStringField());
assertEquals(integerField, (int) extracted.getIntegerField());
assertEquals(intArray, extracted.getInts());
r.output(
pojo.stringField
extracted.getStringField()
+ ":"
+ pojo.integerField
+ extracted.getIntegerField()
+ ":"
+ pojo.ints.toString());
+ extracted.getInts().toString());
}
}));
PAssert.that(output).containsInAnyOrder("a:1:[1, 2]", "b:2:[2, 3]", "c:3:[3, 4]");
Expand Down