Skip to content

Commit

Permalink
[BEAM-3640] part 2 - add STATIC_INIT,INSTANCE_INIT,ENUM_DEF,INTERFACE…
Browse files Browse the repository at this point in the history
…_DEF,CTOR_DEF lf's
  • Loading branch information
dkulp committed Feb 21, 2018
1 parent a3a6ca0 commit bb719c8
Show file tree
Hide file tree
Showing 33 changed files with 51 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,13 +254,16 @@ private static class AllPrefixes
extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
private final int minPrefix;
private final int maxPrefix;

public AllPrefixes(int minPrefix) {
this(minPrefix, Integer.MAX_VALUE);
}

public AllPrefixes(int minPrefix, int maxPrefix) {
this.minPrefix = minPrefix;
this.maxPrefix = maxPrefix;
}

@ProcessElement
public void processElement(ProcessContext c) {
String word = c.element().value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ static class FormatTotalFlow extends DoFn<KV<String, String>, TableRow> {
public FormatTotalFlow(String triggerType) {
this.triggerType = triggerType;
}

@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
String[] values = c.element().getValue().split(",");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public Quote() {
this.source = "";
this.quote = "";
}

public Quote(String source, String quote) {
this.source = source;
this.quote = quote;
Expand All @@ -84,6 +85,7 @@ public WeatherData() {
this.day = 0;
this.maxTemp = 0.0f;
}

public WeatherData(long year, long month, long day, double maxTemp) {
this.year = year;
this.month = month;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void emit(ApexStreamTuple<? extends WindowedValue<?>> tuple) {
*/
public interface OutputEmitter<T> {
void emit(T tuple);
};
}

/**
* The processing logic for this operator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime.
UrnUtils.validateCommonUrn(SLIDING_WINDOWS_FN);
UrnUtils.validateCommonUrn(SESSION_WINDOWS_FN);
}

// This URN says that the WindowFn is just a UDF blob the Java SDK understands
// TODO: standardize such things
public static final String SERIALIZED_JAVA_WINDOWFN_URN = "beam:windowfn:javasdk:v0.1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
*/
public class DirectRegistrar {
private DirectRegistrar() {}

/**
* Registers the {@link DirectRunner}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class EmptyCheckpointMark implements UnboundedSource.CheckpointMark, Seri
private static final EmptyCheckpointMark INSTANCE = new EmptyCheckpointMark();
private static final int ID = 2654265; // some constant to serve as identifier.

private EmptyCheckpointMark() {};
private EmptyCheckpointMark() {}

public static EmptyCheckpointMark get() {
return INSTANCE;
Expand Down
7 changes: 4 additions & 3 deletions sdks/java/build-tools/src/main/resources/beam/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -383,9 +383,10 @@ page at http:https://checkstyle.sourceforge.net/config.html -->
<property name="allowNoEmptyLineBetweenFields" value="false"/>
<property name="allowMultipleEmptyLines" value="true"/>
<property name="allowMultipleEmptyLinesInsideClassMembers" value="true"/>
<property name="tokens" value="IMPORT,CLASS_DEF" />
<!-- eventually start adding: ,INTERFACE_DEF,ENUM_DEF,
STATIC_INIT,INSTANCE_INIT,METHOD_DEF,CTOR_DEF, VARIABLE_DEF -->
<property name="tokens" value="IMPORT,CLASS_DEF,INTERFACE_DEF,STATIC_INIT,
INSTANCE_INIT,ENUM_DEF,CTOR_DEF" />
<!-- we don't want blank lines between member variables, so don't use VARIABLE_DEF -->
<!-- eventually add: METHOD_DEF -->
</module>

<module name="WhitespaceAround">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,9 @@ public Coder<V> getDefaultOutputCoder(CoderRegistry registry, Coder<V> inputCode
public static class Holder<V> {
@Nullable private V value;
private boolean present;

private Holder() { }

private Holder(V value) {
set(value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ public abstract static class OnTimerContextParameter extends Parameter {
@AutoValue
public abstract static class WindowParameter extends Parameter {
WindowParameter() {}

public abstract TypeDescriptor<? extends BoundedWindow> windowT();
}

Expand All @@ -387,6 +388,7 @@ public abstract static class WindowParameter extends Parameter {
public abstract static class RestrictionTrackerParameter extends Parameter {
// Package visible for AutoValue
RestrictionTrackerParameter() {}

public abstract TypeDescriptor<?> trackerT();
}

Expand All @@ -400,6 +402,7 @@ public abstract static class RestrictionTrackerParameter extends Parameter {
public abstract static class StateParameter extends Parameter {
// Package visible for AutoValue
StateParameter() {}

public abstract StateDeclaration referent();
}

Expand All @@ -411,6 +414,7 @@ public abstract static class StateParameter extends Parameter {
public abstract static class TimerParameter extends Parameter {
// Package visible for AutoValue
TimerParameter() {}

public abstract TimerDeclaration referent();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,17 @@ public static void mergeWindows(WindowFn<?, IntervalWindow>.MergeContext c) thro
private static class MergeCandidate {
@Nullable private IntervalWindow union;
private final List<IntervalWindow> parts;

public MergeCandidate() {
union = null;
parts = new ArrayList<>();
}

public MergeCandidate(IntervalWindow window) {
union = window;
parts = new ArrayList<>(Arrays.asList(window));
}

public boolean intersects(IntervalWindow window) {
return union == null || union.intersects(window);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ private static class ObjectIdentityBoolean {
public ObjectIdentityBoolean(boolean value) {
this.value = value;
}

public boolean getValue() {
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public int hashCode() {

private static class NotSerializableObjectCoder extends AtomicCoder<NotSerializableObject> {
private NotSerializableObjectCoder() { }

private static final NotSerializableObjectCoder INSTANCE = new NotSerializableObjectCoder();

@JsonCreator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ private enum LifecycleState {
INSIDE_BUNDLE,
TORN_DOWN
}

private LifecycleState state = LifecycleState.UNINITIALIZED;

@Setup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,7 @@ private static class TestDummy { }

private static class TestDummyCoder extends AtomicCoder<TestDummy> {
private TestDummyCoder() { }

private static final TestDummyCoder INSTANCE = new TestDummyCoder();

@JsonCreator
Expand Down Expand Up @@ -1017,6 +1018,7 @@ private static class MainOutputDummyFn extends DoFn<Integer, TestDummy> {
public MainOutputDummyFn(TupleTag<Integer> intOutputTag) {
this.intOutputTag = intOutputTag;
}

@ProcessElement
public void processElement(ProcessContext c) {
c.output(new TestDummy());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public void processElement(ProcessContext c, BoundedWindow window) {
public WindowedCount(WindowFn<? super String, ?> windowFn) {
this.windowFn = windowFn;
}

@Override
public PCollection<String> expand(PCollection<String> in) {
return in.apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,11 @@ public BeamSqlTable findTable(String tableName){

private static class BeamCalciteTable implements ScannableTable, Serializable {
private RowType beamRowType;

public BeamCalciteTable(RowType beamRowType) {
this.beamRowType = beamRowType;
}

@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
return CalciteUtils.toCalciteRowType(this.beamRowType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class BeamSqlCurrentDateExpression extends BeamSqlExpression {
public BeamSqlCurrentDateExpression() {
super(Collections.emptyList(), SqlTypeName.DATE);
}

@Override public boolean accept() {
return getOperands().size() == 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class BeamSqlCurrentTimeExpression extends BeamSqlExpression {
public BeamSqlCurrentTimeExpression(List<BeamSqlExpression> operands) {
super(operands, SqlTypeName.TIME);
}

@Override public boolean accept() {
int opCount = getOperands().size();
return opCount <= 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class BeamSqlCurrentTimestampExpression extends BeamSqlExpression {
public BeamSqlCurrentTimestampExpression(List<BeamSqlExpression> operands) {
super(operands, SqlTypeName.TIMESTAMP);
}

@Override public boolean accept() {
int opCount = getOperands().size();
return opCount <= 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class BeamSqlDateCeilExpression extends BeamSqlExpression {
public BeamSqlDateCeilExpression(List<BeamSqlExpression> operands) {
super(operands, SqlTypeName.TIMESTAMP);
}

@Override public boolean accept() {
return operands.size() == 2
&& opType(1) == SqlTypeName.SYMBOL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class BeamSqlDateFloorExpression extends BeamSqlExpression {
public BeamSqlDateFloorExpression(List<BeamSqlExpression> operands) {
super(operands, SqlTypeName.DATE);
}

@Override public boolean accept() {
return operands.size() == 2
&& opType(1) == SqlTypeName.SYMBOL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class BeamSqlExtractExpression extends BeamSqlExpression {
public BeamSqlExtractExpression(List<BeamSqlExpression> operands) {
super(operands, SqlTypeName.BIGINT);
}

@Override public boolean accept() {
return operands.size() == 2
&& opType(1) == SqlTypeName.TIMESTAMP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public abstract class BeamSqlLogicalExpression extends BeamSqlExpression {
private BeamSqlLogicalExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
super(operands, outputType);
}

public BeamSqlLogicalExpression(List<BeamSqlExpression> operands) {
this(operands, SqlTypeName.BOOLEAN);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class CheckSize implements SerializableFunction<Iterable<Row>, Void> {
public CheckSize(int size) {
this.size = size;
}

@Override public Void apply(Iterable<Row> input) {
int count = 0;
for (Row row : input) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ public int read(byte[] b, int off, int len) throws IOException {
* first access to {@link #next()} or {@link #hasNext()}.
*/
public static class DataStreamDecoder<T> implements Iterator<T> {
private enum State { READ_REQUIRED, HAS_NEXT, EOF };

private enum State { READ_REQUIRED, HAS_NEXT, EOF }

private final CountingInputStream countingInputStream;
private final PushbackInputStream pushbackInputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ public static Iterator<ByteString> forFirstChunk(
* pre-fetch any future chunks and blocks whenever required to fetch the next block.
*/
static class LazyBlockingStateFetchingIterator implements Iterator<ByteString> {
private enum State { READ_REQUIRED, HAS_NEXT, EOF };

private enum State { READ_REQUIRED, HAS_NEXT, EOF }

private final BeamFnStateClient beamFnStateClient;
private final StateRequest stateRequestForFirstChunk;
private State currentState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public class BeamFnDataReadRunnerTest {
throw new ExceptionInInitializerError(e);
}
}

private static final BeamFnApi.Target INPUT_TARGET = BeamFnApi.Target.newBuilder()
.setPrimitiveTransformReference("1")
.setName("out")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public class BeamFnDataWriteRunnerTest {
throw new ExceptionInInitializerError(e);
}
}

private static final BeamFnApi.Target OUTPUT_TARGET = BeamFnApi.Target.newBuilder()
.setPrimitiveTransformReference("1")
.setName("out")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ private BoundedElasticsearchSource(Read spec, @Nullable String shardPreference,
this.numSlices = numSlices;
this.sliceId = sliceId;
}

@Override
public List<? extends BoundedSource<String>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ static class DelegatingDynamicDestinations<T, DestinationT>
DelegatingDynamicDestinations(DynamicDestinations<T, DestinationT> inner) {
this.inner = inner;
}

@Override
public DestinationT getDestination(ValueInSingleWindow<T> element) {
return inner.getDestination(element);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ private static class TableRowBuilder {
public TableRowBuilder() {
row = new TableRow();
}

public TableRowBuilder set(String fieldName, Object value) {
row.set(fieldName, value);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* {@link Bid}.
*/
public class Event implements KnownSize, Serializable {

private enum Tag {
PERSON(0),
AUCTION(1),
Expand All @@ -42,6 +43,7 @@ private enum Tag {
this.value = value;
}
}

private static final Coder<Integer> INT_CODER = VarIntCoder.of();

public static final Coder<Event> CODER =
Expand Down

0 comments on commit bb719c8

Please sign in to comment.