Skip to content

Commit

Permalink
Add new schema transforms.
Browse files Browse the repository at this point in the history
  • Loading branch information
reuvenlax authored and Reuven Lax committed Apr 29, 2019
1 parent 77b295b commit ad66a04
Show file tree
Hide file tree
Showing 7 changed files with 969 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap;
Expand Down Expand Up @@ -332,6 +333,23 @@ public Map<Integer, FieldAccessDescriptor> nestedFieldsById() {
.collect(Collectors.toMap(f -> f.getKey().getFieldId(), f -> f.getValue()));
}

/** Returns true if this descriptor references only a single, non-wildcard field. */
public boolean referencesSingleField() {
if (getAllFields()) {
return false;
}

if (getFieldsAccessed().size() == 1 && getNestedFieldsAccessed().isEmpty()) {
return true;
}

if (getFieldsAccessed().isEmpty() && getNestedFieldsAccessed().size() == 1) {
return getNestedFieldsAccessed().values().iterator().next().referencesSingleField();
}

return false;
}

/**
* Resolve the {@link FieldAccessDescriptor} against a schema.
*
Expand Down Expand Up @@ -513,4 +531,22 @@ private static void validateFieldDescriptor(Schema schema, FieldDescriptor field
}
}
}

@Override
public String toString() {
if (getAllFields()) {
return "*";
}

List<String> singleSelectors =
getFieldsAccessed().stream()
.map(FieldDescriptor::getFieldName)
.collect(Collectors.toList());
List<String> nestedSelectors =
getNestedFieldsAccessed().entrySet().stream()
.map(e -> e.getKey().getFieldName() + "." + e.getValue().toString())
.collect(Collectors.toList());
;
return String.join(", ", Iterables.concat(singleSelectors, nestedSelectors));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas.transforms;

import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

/**
* A transform to add new nullable fields to a PCollection's schema. Elements are extended to have
* the new schema, with null values used for the new fields. Any new fields added must be nullable.
*
* <p>Example use:
*
* <pre>{@code PCollection<Event> events = readEvents();
* PCollection<Row> augmentedEvents =
* events.apply(AddFields.fields(Field.nullable("newField1", FieldType.STRING),
* Field.nullable("newField2", FieldType.INT64)));
* }</pre>
*/
public class AddFields {
/** Add all specified fields to the schema. */
public static <T> Inner<T> fields(Field... fields) {
return fields(Arrays.asList(fields));
}

/** Add all specified fields to the schema. */
public static <T> Inner<T> fields(List<Field> fields) {
for (Field field : fields) {
if (!field.getType().getNullable()) {
throw new IllegalArgumentException(
"Only nullable fields can be added to an existing schema.");
}
}
return new Inner<>(fields);
}

/** Inner PTransform for AddFields. */
public static class Inner<T> extends PTransform<PCollection<T>, PCollection<Row>> {
private final List<Field> newFields;
private final List<Object> nullValues;

private Inner(List<Field> newFields) {
this.newFields = newFields;
this.nullValues = Collections.nCopies(newFields.size(), null);
}

@Override
public PCollection<Row> expand(PCollection<T> input) {
Schema inputSchema = input.getSchema();
Schema outputSchema =
Schema.builder().addFields(inputSchema.getFields()).addFields(newFields).build();

return input
.apply(
ParDo.of(
new DoFn<T, Row>() {
@ProcessElement
public void processElement(@Element Row row, OutputReceiver<Row> o) {
List<Object> values =
Lists.newArrayListWithCapacity(outputSchema.getFieldCount());
values.addAll(row.getValues());
values.addAll(nullValues);
Row newRow = Row.withSchema(outputSchema).attachValues(values).build();
o.output(newRow);
}
}))
.setRowSchema(outputSchema);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas.transforms;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

/**
* A transform to drop fields from a schema.
*
* <p>This transform acts as the inverse of the {@link Select} transform. A list of fields to drop
* is specified, and all fields in the schema that are not specified are selected. For example:
*
* <pre>{@code @DefaultSchema(JavaFieldSchema.class)
* public class UserEvent {
* public String userId;
* public String eventId;
* public int eventType;
* public Location location;
* }}</pre>
*
* <pre>{@code @DefaultSchema(JavaFieldSchema.class)
* public class Location {
* public double latitude;
* public double longtitude;
* }
*
* PCollection<UserEvent> events = readUserEvents();
* // Drop the location field.
* PCollection<Row> noLocation = events.apply(DropFields.fields("location"));
* // Drop the latitude field.
* PCollection<Row> noLatitude = events.apply(DropFields.fields("location.latitude"));
* }</pre>
*/
public class DropFields {
public static <T> Inner<T> fields(String... fields) {
return fields(FieldAccessDescriptor.withFieldNames(fields));
}

public static <T> Inner<T> fields(Integer... fieldIds) {
return fields(FieldAccessDescriptor.withFieldIds(fieldIds));
}

public static <T> Inner<T> fields(FieldAccessDescriptor fieldsToDrop) {
return new Inner<>(fieldsToDrop);
}

public static class Inner<T> extends PTransform<PCollection<T>, PCollection<Row>> {
private final FieldAccessDescriptor fieldsToDrop;

private Inner(FieldAccessDescriptor fieldsToDrop) {
this.fieldsToDrop = fieldsToDrop;
}

FieldAccessDescriptor complement(Schema inputSchema, FieldAccessDescriptor input) {
Set<String> fieldNamesToSelect = Sets.newHashSet();
Map<FieldAccessDescriptor.FieldDescriptor, FieldAccessDescriptor> nestedFieldsToSelect =
Maps.newHashMap();
for (int i = 0; i < inputSchema.getFieldCount(); ++i) {
if (input.fieldIdsAccessed().contains(i)) {
// This field is selected, so exclude it from the complement.
continue;
}
Field field = inputSchema.getField(i);
Map<Integer, FieldAccessDescriptor.FieldDescriptor> nestedFields =
input.getNestedFieldsAccessed().entrySet().stream()
.map(Map.Entry::getKey)
.collect(Collectors.toMap(k -> k.getFieldId(), k -> k));

FieldAccessDescriptor.FieldDescriptor fieldDescriptor = nestedFields.get(i);
if (fieldDescriptor != null) {
// Some subfields are selected, so recursively calculate the complementary subfields to
// select.
FieldType fieldType = inputSchema.getField(i).getType();
for (FieldAccessDescriptor.FieldDescriptor.Qualifier qualifier :
fieldDescriptor.getQualifiers()) {
switch (qualifier.getKind()) {
case LIST:
fieldType = fieldType.getCollectionElementType();
break;
case MAP:
fieldType = fieldType.getMapValueType();
break;
default:
throw new RuntimeException("Unexpected field descriptor type.");
}
}
Preconditions.checkArgument(fieldType.getTypeName().isCompositeType());
FieldAccessDescriptor nestedDescriptor =
input.getNestedFieldsAccessed().get(fieldDescriptor);
nestedFieldsToSelect.put(
fieldDescriptor, complement(fieldType.getRowSchema(), nestedDescriptor));
} else {
// Neither the field nor the subfield is selected. This means we should select it.
fieldNamesToSelect.add(field.getName());
}
}

FieldAccessDescriptor fieldAccess = FieldAccessDescriptor.withFieldNames(fieldNamesToSelect);
for (Map.Entry<FieldAccessDescriptor.FieldDescriptor, FieldAccessDescriptor> entry :
nestedFieldsToSelect.entrySet()) {
fieldAccess = fieldAccess.withNestedField(entry.getKey(), entry.getValue());
}
return fieldAccess.resolve(inputSchema);
}

@Override
public PCollection<Row> expand(PCollection<T> input) {
Schema inputSchema = input.getSchema();
FieldAccessDescriptor selectDescriptor =
complement(inputSchema, fieldsToDrop.resolve(inputSchema));

return Select.<T>fieldAccess(selectDescriptor).expand(input);
}
}
}
Loading

0 comments on commit ad66a04

Please sign in to comment.