Skip to content

Commit

Permalink
[BEAM-2939] Fold Sizes sub-interfaces into RestrictionTracker
Browse files Browse the repository at this point in the history
  • Loading branch information
lukecwik committed Apr 22, 2020
1 parent 7ac97ea commit a02eb60
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 168 deletions.
11 changes: 5 additions & 6 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.Sizes;
import org.apache.beam.sdk.transforms.splittabledofn.Sizes.Progress;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.util.NameUtils;
Expand Down Expand Up @@ -670,7 +669,7 @@ private static class UnboundedSourceAsSDFRestrictionTracker<
OutputT, CheckpointT extends CheckpointMark>
extends RestrictionTracker<
KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>, UnboundedSourceValue<OutputT>[]>
implements Sizes.HasProgress {
implements HasProgress {
private final KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> initialRestriction;
private final PipelineOptions pipelineOptions;
private UnboundedSource.UnboundedReader<OutputT> currentReader;
Expand Down Expand Up @@ -770,7 +769,7 @@ public void checkDone() throws IllegalStateException {
public Progress getProgress() {
// We treat the empty source as implicitly done.
if (currentRestriction().getKey() instanceof EmptyUnboundedSource) {
return Sizes.Progress.from(1, 0);
return RestrictionTracker.Progress.from(1, 0);
}

if (currentReader == null) {
Expand All @@ -788,7 +787,7 @@ public Progress getProgress() {
if (size != UnboundedReader.BACKLOG_UNKNOWN) {
// The UnboundedSource/UnboundedReader API has no way of reporting how much work
// has been completed so runners can only see the work remaining changing.
return Sizes.Progress.from(0, size);
return RestrictionTracker.Progress.from(0, size);
}

// TODO: Support "global" backlog reporting
Expand All @@ -798,7 +797,7 @@ public Progress getProgress() {
// }

// We treat unknown as 0 progress
return Sizes.Progress.from(0, 1);
return RestrictionTracker.Progress.from(0, 1);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.Sizes;
import org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
Expand Down Expand Up @@ -650,10 +649,10 @@ public interface MultiOutputReceiver {
* <li>The type of restrictions used by all of these methods must be the same.
* <li>It <i>must</i> define a {@link GetInitialRestriction} method.
* <li>It <i>should</i> define a {@link GetSize} method or ensure that the {@link
* RestrictionTracker} implements one of the interfaces in {@link Sizes}. Poor auto-scaling
* of workers and/or splitting may result if neither is defined or if the size is an
* inaccurate representation of work. See {@link GetSize} and {@link Sizes} for further
* details.
* RestrictionTracker} implements one of the progress or sizing interfaces. Poor
* auto-scaling of workers and/or splitting may result if neither is defined or if the size
* is an inaccurate representation of work. See {@link GetSize} and {@link
* RestrictionTracker} for further details.
* <li>It <i>should</i> define a {@link SplitRestriction} method. This method enables runners to
* perform bulk splitting initially allowing for a rapid increase in parallelism. See {@link
* RestrictionTracker#trySplit} for details about splitting when the current element and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,8 @@
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.Sizes;
import org.apache.beam.sdk.transforms.splittabledofn.Sizes.HasProgress;
import org.apache.beam.sdk.transforms.splittabledofn.Sizes.HasSize;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasSize;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.UserCodeException;
Expand Down Expand Up @@ -398,7 +397,7 @@ RestrictionTracker<RestrictionT, PositionT> invokeNewTracker(
}

public static class DefaultGetSize {
/** Uses {@link Sizes.HasProgress} or {@link Sizes.HasSize} to produce the size. */
/** Uses {@link HasProgress} or {@link HasSize} to produce the size. */
@SuppressWarnings("unused")
public static <InputT, OutputT> double invokeGetSize(
DoFnInvoker.ArgumentProvider<InputT, OutputT> argumentProvider) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
import org.apache.beam.sdk.transforms.DoFn.StateId;
import org.apache.beam.sdk.transforms.DoFn.TimerId;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.Sizes;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasSize;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
Expand Down Expand Up @@ -98,8 +99,8 @@ <RestrictionT> RestrictionT invokeGetInitialRestriction(
*
* <ol>
* <li>get the work remaining from the {@link RestrictionTracker} if it supports {@link
* Sizes.HasProgress}.
* <li>get the size if the {@link RestrictionTracker} supports {@link Sizes.HasSize}.
* HasProgress}.
* <li>get the size if the {@link RestrictionTracker} supports {@link HasSize}.
* <li>return the constant {@code 1.0}.
* </ol>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasSize;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
Expand All @@ -40,7 +41,7 @@
*/
@Experimental(Kind.SPLITTABLE_DO_FN)
public class ByteKeyRangeTracker extends RestrictionTracker<ByteKeyRange, ByteKey>
implements Sizes.HasSize {
implements HasSize {
/* An empty range which contains no keys. */
@VisibleForTesting
static final ByteKeyRange NO_KEYS = ByteKeyRange.of(ByteKey.EMPTY, ByteKey.of(0x00));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasSize;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;

/**
* A {@link RestrictionTracker} for claiming offsets in an {@link OffsetRange} in a monotonically
* increasing fashion.
*/
@Experimental(Kind.SPLITTABLE_DO_FN)
public class OffsetRangeTracker extends RestrictionTracker<OffsetRange, Long>
implements Sizes.HasSize {
public class OffsetRangeTracker extends RestrictionTracker<OffsetRange, Long> implements HasSize {
private OffsetRange range;
@Nullable private Long lastClaimedOffset = null;
@Nullable private Long lastAttemptedOffset = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
*/
package org.apache.beam.sdk.transforms.splittabledofn;

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.transforms.DoFn;

/**
* Manages access to the restriction and keeps track of its claimed part for a <a
* href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
*
* <p>{@link RestrictionTracker}s that work on restrictions with a fixed, known size should
* implement {@link HasSize}; {@link RestrictionTracker}s that handle restrictions which can grow
* or shrink over time should implement {@link HasProgress} instead.
*/
@Experimental(Kind.SPLITTABLE_DO_FN)
public abstract class RestrictionTracker<RestrictionT, PositionT> {
Expand Down Expand Up @@ -89,4 +94,96 @@ public abstract class RestrictionTracker<RestrictionT, PositionT> {
* work remaining in the restriction.
*/
public abstract void checkDone() throws IllegalStateException;

/**
 * Implemented by {@link RestrictionTracker}s whose restrictions have a fixed size.
 *
 * <p>By default, {@link #getSize()} is read before processing of the restriction begins in order
 * to capture the initial amount of work. From then on, progress is reported as:
 *
 * <ul>
 *   <li>{@code work_remaining = getSize()}
 *   <li>{@code work_completed = max(initial_amount_of_work - work_remaining, 0)}
 * </ul>
 */
public interface HasSize {
  /**
   * Returns the amount of known work, expressed as a size. The {@code double} value should
   * preferably lie in a linear space.
   *
   * <p>Each restriction tracker is responsible for converting its natural notion of outstanding
   * work into this representation. For example:
   *
   * <ul>
   *   <li>Block based file source (e.g. Avro): the number of bytes remaining from the end of the
   *       current block to the end of the restriction.
   *   <li>Key range based source (e.g. BigQuery, Bigtable, ...): scale the start key to be one
   *       and the end key to be zero, then interpolate the position of the next splittable key
   *       as the size. The interpolation can be improved when information about the probability
   *       density function or cumulative distribution function is available. Alternatively, when
   *       the number of encoded bytes for the keys and values in the key range is known, the
   *       number of remaining bytes can be used.
   * </ul>
   *
   * @return The size of the restriction, must be {@code >= 0}.
   */
  double getSize();
}

/**
 * Implemented by {@link RestrictionTracker}s whose restrictions can grow or shrink over time.
 */
public interface HasProgress {
  /**
   * Returns the amount of known completed work and known remaining work.
   *
   * <p>Each restriction tracker is responsible for converting its natural notion of completed
   * and remaining work into the {@code double} representation. For example:
   *
   * <ul>
   *   <li>Pull based queue based source (e.g. Pubsub): the local/global size available, as the
   *       number of messages or number of {@code message bytes} that have been processed and
   *       the number of messages or number of {@code message bytes} that are outstanding.
   *   <li>Claim set: the local/global number of objects that have been claimed and the number of
   *       known objects that remain to be claimed.
   * </ul>
   *
   * <p>Work completed and work remaining must be expressed on the same scale, whether that is a
   * number of messages or a number of bytes, and should never represent two distinct unit types.
   */
  Progress getProgress();
}

/**
 * A representation for the amount of known completed and remaining work. See {@link
 * HasProgress#getProgress()} for details on how the two values should be derived.
 */
@AutoValue
public abstract static class Progress {

  /**
   * Creates a {@link Progress} from the given amounts of completed and remaining work.
   *
   * @param workCompleted The known amount of completed work. Must be {@code >= 0}.
   * @param workRemaining The known amount of work remaining. Must be {@code >= 0}.
   * @return a {@link Progress} holding the two supplied values.
   * @throws IllegalArgumentException if either argument is negative or {@code NaN}.
   */
  public static Progress from(double workCompleted, double workRemaining) {
    // The negated ">=" form is deliberate: every ordered comparison against NaN is false, so a
    // plain "< 0" check would let NaN through even though it does not satisfy ">= 0".
    if (!(workCompleted >= 0) || !(workRemaining >= 0)) {
      throw new IllegalArgumentException(
          String.format(
              "Work completed and work remaining must be greater than or equal to zero but were %s and %s.",
              workCompleted, workRemaining));
    }
    return new AutoValue_RestrictionTracker_Progress(workCompleted, workRemaining);
  }

  /** The known amount of completed work. */
  public abstract double getWorkCompleted();

  /** The known amount of work remaining. */
  public abstract double getWorkRemaining();
}
}

This file was deleted.

Loading

0 comments on commit a02eb60

Please sign in to comment.