Skip to content

Commit

Permalink
Rename schema.
Browse files Browse the repository at this point in the history
  • Loading branch information
Reuven Lax committed Apr 29, 2019
1 parent d1609b7 commit ba95f57
Show file tree
Hide file tree
Showing 7 changed files with 528 additions and 201 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import com.google.auto.value.AutoOneOf;
import com.google.auto.value.AutoValue;
import com.google.common.collect.Iterators;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -539,12 +538,15 @@ public String toString() {
return "*";
}

List<String> singleSelectors = getFieldsAccessed().stream()
.map(FieldDescriptor::getFieldName)
.collect(Collectors.toList());
List<String> nestedSelectors = getNestedFieldsAccessed().entrySet().stream()
.map(e -> e.getKey().getFieldName() + "." + e.getValue().toString())
.collect(Collectors.toList());;
return String.join(", ", Iterables.concat(singleSelectors, nestedSelectors));
List<String> singleSelectors =
getFieldsAccessed().stream()
.map(FieldDescriptor::getFieldName)
.collect(Collectors.toList());
List<String> nestedSelectors =
getNestedFieldsAccessed().entrySet().stream()
.map(e -> e.getKey().getFieldName() + "." + e.getValue().toString())
.collect(Collectors.toList());
;
return String.join(", ", Iterables.concat(singleSelectors, nestedSelectors));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.sdk.schemas.transforms;

import com.google.common.collect.Lists;
Expand All @@ -30,8 +29,9 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

/** A transform to add new nullable fields to a PCollection's schema. Elements are extended to
* have the new schema, with null values used for the new fields. Any new fields added must be nullable.
/**
* A transform to add new nullable fields to a PCollection's schema. Elements are extended to have
* the new schema, with null values used for the new fields. Any new fields added must be nullable.
*
* <p>Example use:
*
Expand All @@ -40,12 +40,14 @@
* events.apply(AddFields.fields(Field.nullable("newField1", FieldType.STRING),
* Field.nullable("newField2", FieldType.INT64)));
* }</pre>
**/
*/
public class AddFields {
/**
 * Add all specified fields to the schema.
 *
 * <p>Varargs convenience overload: the arguments are wrapped with {@code Arrays.asList} and
 * handed to {@link #fields(List)}, so both overloads behave the same for the same fields.
 *
 * @param fields the fields to add
 * @return the result of {@link #fields(List)} for the given fields
 */
public static <T> Inner<T> fields(Field... fields) {
return fields(Arrays.asList(fields));
}

/** Add all specified fields to the schema. */
public static <T> Inner<T> fields(List<Field> fields) {
for (Field field : fields) {
if (!field.getType().getNullable()) {
Expand All @@ -69,20 +71,24 @@ private Inner(List<Field> newFields) {
@Override
public PCollection<Row> expand(PCollection<T> input) {
Schema inputSchema = input.getSchema();
Schema outputSchema = Schema.builder()
.addFields(inputSchema.getFields())
.addFields(newFields)
.build();
Schema outputSchema =
Schema.builder().addFields(inputSchema.getFields()).addFields(newFields).build();

return input.apply(ParDo.of(new DoFn<T, Row>() {
@ProcessElement
public void processElement(@Element Row row, OutputReceiver<Row> o) {
List<Object> values = Lists.newArrayListWithCapacity(outputSchema.getFieldCount());
values.addAll(row.getValues());
values.addAll(nullValues);
Row newRow = Row.withSchema(outputSchema).attachValues(values).build();
o.output(newRow);
}})).setRowSchema(outputSchema);
return input
.apply(
ParDo.of(
new DoFn<T, Row>() {
// Emits one output row per input row: the input row's values followed by the contents of
// nullValues, attached to outputSchema.
@ProcessElement
public void processElement(@Element Row row, OutputReceiver<Row> o) {
// Presize the list to the number of fields in the output schema.
List<Object> values =
Lists.newArrayListWithCapacity(outputSchema.getFieldCount());
// Order matters: outputSchema is built from the input schema's fields followed by the new
// fields, so the existing values go first and the padding values after them.
values.addAll(row.getValues());
// NOTE(review): nullValues is declared outside this view; presumably one null per new
// field -- confirm against the Inner constructor.
values.addAll(nullValues);
Row newRow = Row.withSchema(outputSchema).attachValues(values).build();
o.output(newRow);
}
}))
.setRowSchema(outputSchema);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.sdk.schemas.transforms;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.FieldAccess;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

/** A transform to drop fields from a schema.
/**
* A transform to drop fields from a schema.
*
* <p>This transform acts as the inverse of the {@link Select} transform. A list of fields to drop is specified, and
* all fields in the schema that are not specified are selected. For example:
* <p>This transform acts as the inverse of the {@link Select} transform. A list of fields to drop
* is specified, and all fields in the schema that are not specified are selected. For example:
*
* <pre>{@code @DefaultSchema(JavaFieldSchema.class)
* public class UserEvent {
Expand All @@ -66,13 +57,12 @@
* // Drop the latitude field.
* PCollection<Row> noLatitude = events.apply(DropFields.fields("location.latitude"));
* }</pre>
**/
*/
public class DropFields {
/**
 * Drop the fields with the specified names.
 *
 * <p>The names are converted to a descriptor with {@code FieldAccessDescriptor.withFieldNames}
 * and passed on to the {@code fields} overload that accepts that descriptor. The class-level
 * example passes a dotted name ({@code "location.latitude"}) to drop a nested field.
 *
 * @param fields names of the fields to drop
 */
public static <T> Inner<T> fields(String... fields) {
return fields(FieldAccessDescriptor.withFieldNames(fields));
}


/**
 * Drop the fields with the specified field ids.
 *
 * <p>The ids are converted to a descriptor with {@code FieldAccessDescriptor.withFieldIds} and
 * passed on to the {@code fields} overload that accepts that descriptor.
 *
 * @param fieldIds ids of the fields to drop
 */
public static <T> Inner<T> fields(Integer... fieldIds) {
return fields(FieldAccessDescriptor.withFieldIds(fieldIds));
}
Expand All @@ -90,23 +80,26 @@ private Inner(FieldAccessDescriptor fieldsToDrop) {

FieldAccessDescriptor complement(Schema inputSchema, FieldAccessDescriptor input) {
Set<String> fieldNamesToSelect = Sets.newHashSet();
Map<FieldAccessDescriptor.FieldDescriptor, FieldAccessDescriptor> nestedFieldsToSelect = Maps.newHashMap();
Map<FieldAccessDescriptor.FieldDescriptor, FieldAccessDescriptor> nestedFieldsToSelect =
Maps.newHashMap();
for (int i = 0; i < inputSchema.getFieldCount(); ++i) {
if (input.fieldIdsAccessed().contains(i)) {
// This field is selected, so exclude it from the complement.
continue;
}
Field field = inputSchema.getField(i);
Map<Integer, FieldAccessDescriptor.FieldDescriptor> nestedFields =
input.getNestedFieldsAccessed().entrySet().stream()
.map(Map.Entry::getKey)
input.getNestedFieldsAccessed().entrySet().stream()
.map(Map.Entry::getKey)
.collect(Collectors.toMap(k -> k.getFieldId(), k -> k));

FieldAccessDescriptor.FieldDescriptor fieldDescriptor = nestedFields.get(i);
if (fieldDescriptor != null) {
// Some subfields are selected, so recursively calculate the complementary subfields to select.
// Some subfields are selected, so recursively calculate the complementary subfields to
// select.
FieldType fieldType = inputSchema.getField(i).getType();
for (FieldAccessDescriptor.FieldDescriptor.Qualifier qualifier : fieldDescriptor.getQualifiers()) {
for (FieldAccessDescriptor.FieldDescriptor.Qualifier qualifier :
fieldDescriptor.getQualifiers()) {
switch (qualifier.getKind()) {
case LIST:
fieldType = fieldType.getCollectionElementType();
Expand All @@ -119,17 +112,19 @@ FieldAccessDescriptor complement(Schema inputSchema, FieldAccessDescriptor input
}
}
Preconditions.checkArgument(fieldType.getTypeName().isCompositeType());
FieldAccessDescriptor nestedDescriptor = input.getNestedFieldsAccessed().get(fieldDescriptor);
nestedFieldsToSelect.put(fieldDescriptor, complement(fieldType.getRowSchema(), nestedDescriptor));
FieldAccessDescriptor nestedDescriptor =
input.getNestedFieldsAccessed().get(fieldDescriptor);
nestedFieldsToSelect.put(
fieldDescriptor, complement(fieldType.getRowSchema(), nestedDescriptor));
} else {
// Neither the field nor the subfield is selected. This means we should select it.
fieldNamesToSelect.add(field.getName());
}
}

FieldAccessDescriptor fieldAccess = FieldAccessDescriptor.withFieldNames(fieldNamesToSelect);
for (Map.Entry<FieldAccessDescriptor.FieldDescriptor, FieldAccessDescriptor>
entry : nestedFieldsToSelect.entrySet()) {
for (Map.Entry<FieldAccessDescriptor.FieldDescriptor, FieldAccessDescriptor> entry :
nestedFieldsToSelect.entrySet()) {
fieldAccess = fieldAccess.withNestedField(entry.getKey(), entry.getValue());
}
return fieldAccess.resolve(inputSchema);
Expand All @@ -138,10 +133,10 @@ FieldAccessDescriptor complement(Schema inputSchema, FieldAccessDescriptor input
@Override
public PCollection<Row> expand(PCollection<T> input) {
Schema inputSchema = input.getSchema();
FieldAccessDescriptor selectDescriptor = complement(
inputSchema, fieldsToDrop.resolve(inputSchema));
FieldAccessDescriptor selectDescriptor =
complement(inputSchema, fieldsToDrop.resolve(inputSchema));

return Select.<T>fieldAccess(selectDescriptor).expand(input);
}
}
}
}
Loading

0 comments on commit ba95f57

Please sign in to comment.