Skip to content

Commit

Permalink
[FLINK-22358][connector base] Add stability annotations to connector …
Browse files Browse the repository at this point in the history
…base and iterator sources.
  • Loading branch information
StephanEwen committed Sep 9, 2021
1 parent d666d2a commit c28dd94
Show file tree
Hide file tree
Showing 17 changed files with 38 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.base.source.reader;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;

Expand All @@ -28,6 +29,7 @@
* @param <T> the type of records that are eventually emitted to the {@link SourceOutput}.
* @param <SplitStateT> the mutable type of split state.
*/
@PublicEvolving
public interface RecordEmitter<E, T, SplitStateT> {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.base.source.reader;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.SourceSplit;

import javax.annotation.Nullable;
Expand All @@ -34,6 +35,7 @@
import static org.apache.flink.util.Preconditions.checkNotNull;

/** An implementation of RecordsWithSplitIds to host all the records by splits. */
@PublicEvolving
public class RecordsBySplits<E> implements RecordsWithSplitIds<E> {

private final Set<String> finishedSplits;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@

package org.apache.flink.connector.base.source.reader;

import org.apache.flink.annotation.PublicEvolving;

import javax.annotation.Nullable;

import java.util.Set;

/** An interface for the elements passed from the fetchers to the source reader. */
@PublicEvolving
public interface RecordsWithSplitIds<E> {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.base.source.reader;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
Expand Down Expand Up @@ -55,6 +56,7 @@
* @param <SplitT> The type of the splits processed by the source.
* @param <SplitStateT> The type of the mutable state per split.
*/
@PublicEvolving
public abstract class SingleThreadMultiplexSourceReaderBase<
E, T, SplitT extends SourceSplit, SplitStateT>
extends SourceReaderBase<E, T, SplitT, SplitStateT> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.base.source.reader;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceOutput;
Expand Down Expand Up @@ -63,6 +64,7 @@
* @param <SplitT> the immutable split type.
* @param <SplitStateT> the mutable type of split state.
*/
@PublicEvolving
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
implements SourceReader<T, SplitT> {
private static final Logger LOG = LoggerFactory.getLogger(SourceReaderBase.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

package org.apache.flink.connector.base.source.reader;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

/** The options that can be set for the {@link SourceReaderBase}. */
@PublicEvolving
public class SourceReaderOptions {

public static final ConfigOption<Long> SOURCE_READER_CLOSE_TIMEOUT =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.base.source.reader.fetcher;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
Expand All @@ -26,6 +27,7 @@
import java.util.Map;

/** The task to add splits. */
@Internal
class AddSplitsTask<SplitT extends SourceSplit> implements SplitFetcherTask {

private final SplitReader<?, SplitT> splitReader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.base.source.reader.fetcher;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
Expand All @@ -28,6 +29,7 @@
import java.util.function.Consumer;

/** The default fetch task that fetches the records into the element queue. */
@Internal
class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
private final SplitReader<E, SplitT> splitReader;
private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.base.source.reader.fetcher;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
Expand All @@ -39,6 +40,7 @@
* via the same client. In the example of the file source, there is a single thread that reads the
* files after another.
*/
@Internal
public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
extends SplitFetcherManager<E, SplitT> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.base.source.reader.fetcher;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
Expand All @@ -38,6 +39,7 @@
import java.util.function.Consumer;

/** The internal fetcher runnable responsible for polling message from the external system. */
@Internal
public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(SplitFetcher.class);
private static final SplitFetcherTask WAKEUP_TASK = new DummySplitFetcherTask("WAKEUP_TASK");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.base.source.reader.fetcher;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
Expand Down Expand Up @@ -50,6 +51,7 @@
* manager would only start a single fetcher and assign all the splits to it. A one-thread-per-split
* fetcher may spawn a new thread every time a new split is assigned.
*/
@Internal
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
private static final Logger LOG = LoggerFactory.getLogger(SplitFetcherManager.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@

package org.apache.flink.connector.base.source.reader.fetcher;

import org.apache.flink.annotation.Internal;

import java.io.IOException;

/** An interface similar to {@link Runnable} but allows throwing exceptions and wakeup. */
@Internal
public interface SplitFetcherTask {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.base.source.reader.splitreader;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;

Expand All @@ -30,6 +31,7 @@
* @param <E> the element type.
* @param <SplitT> the split type.
*/
@PublicEvolving
public interface SplitReader<E, SplitT extends SourceSplit> {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@

package org.apache.flink.connector.base.source.reader.splitreader;

import org.apache.flink.annotation.PublicEvolving;

import java.util.List;

/**
* A change to add splits.
*
* @param <SplitT> the split type.
*/
@PublicEvolving
public class SplitsAddition<SplitT> extends SplitsChange<SplitT> {

public SplitsAddition(List<SplitT> splits) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

package org.apache.flink.connector.base.source.reader.splitreader;

import org.apache.flink.annotation.PublicEvolving;

import java.util.Collections;
import java.util.List;

/** An abstract class to host splits change. */
@PublicEvolving
public abstract class SplitsChange<SplitT> {
private final List<SplitT> splits;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.base.source.reader.synchronization;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.FlinkRuntimeException;

Expand Down Expand Up @@ -69,6 +70,7 @@
*
* @param <T> the type of the elements in the queue.
*/
@Internal
public class FutureCompletingBlockingQueue<T> {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one

package org.apache.flink.connector.base.source.utils;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.SimpleVersionedSerializer;

Expand All @@ -32,6 +33,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import java.util.function.Function;

/** A util class with some helper method for serde in the sources. */
@Internal
public class SerdeUtils {

/** Private constructor for util class. */
Expand Down

0 comments on commit c28dd94

Please sign in to comment.