Skip to content

Commit

Permalink
[FLINK-28416][table] Add (Async)LookupFunction and providers in repla…
Browse files Browse the repository at this point in the history
…ce of (Async)TableFunction as the API for lookup table (apache#20177)
  • Loading branch information
PatrickRen authored Jul 27, 2022
1 parent aef75be commit 3e2620b
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.connector.source;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
Expand All @@ -36,8 +37,12 @@
* similar to other {@link UserDefinedFunction}s. However, for convenience, in a {@link
* LookupTableSource} the output type can simply be a {@link Row} or {@link RowData} in which case
* the input and output types are derived from the table's schema with default conversion.
*
* @deprecated Please use {@link AsyncLookupFunctionProvider} to implement asynchronous lookup
* table.
*/
@PublicEvolving
@Deprecated
public interface AsyncTableFunctionProvider<T> extends LookupTableSource.LookupRuntimeProvider {

/** Helper method for creating a static provider. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.table.connector.source;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
import org.apache.flink.types.RowKind;

import java.io.Serializable;
Expand Down Expand Up @@ -56,8 +58,8 @@ public interface LookupTableSource extends DynamicTableSource {
* <p>The given {@link LookupContext} offers utilities by the planner for creating runtime
* implementation with minimal dependencies to internal data structures.
*
* @see TableFunctionProvider
* @see AsyncTableFunctionProvider
* @see LookupFunctionProvider
* @see AsyncLookupFunctionProvider
*/
LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);

Expand Down Expand Up @@ -97,8 +99,8 @@ interface LookupContext extends DynamicTableSource.Context {
* <p>There exist different interfaces for runtime implementation which is why {@link
* LookupRuntimeProvider} serves as the base interface.
*
* @see TableFunctionProvider
* @see AsyncTableFunctionProvider
* @see LookupFunctionProvider
* @see AsyncLookupFunctionProvider
*/
@PublicEvolving
interface LookupRuntimeProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.connector.source;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
Expand All @@ -36,8 +37,11 @@
* similar to other {@link UserDefinedFunction}s. However, for convenience, in a {@link
* LookupTableSource} the output type can simply be a {@link Row} or {@link RowData} in which case
* the input and output types are derived from the table's schema with default conversion.
*
* @deprecated Please use {@link LookupFunctionProvider} to implement synchronous lookup table.
*/
@PublicEvolving
@Deprecated
public interface TableFunctionProvider<T> extends LookupTableSource.LookupRuntimeProvider {

/** Helper method for creating a static provider. */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.connector.source.lookup;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.functions.AsyncLookupFunction;

/** A provider for creating {@link AsyncLookupFunction}. */
@PublicEvolving
public interface AsyncLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

/** Helper function for creating a static provider. */
static AsyncLookupFunctionProvider of(AsyncLookupFunction asyncLookupFunction) {
return () -> asyncLookupFunction;
}

/** Creates an {@link AsyncLookupFunction} instance. */
AsyncLookupFunction createAsyncLookupFunction();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.connector.source.lookup;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.functions.LookupFunction;

/** A provider for creating {@link LookupFunction}. */
@PublicEvolving
public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

/** Helper function for creating a static provider. */
static LookupFunctionProvider of(LookupFunction lookupFunction) {
return () -> lookupFunction;
}

/** Creates an {@link LookupFunction} instance. */
LookupFunction createLookupFunction();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.functions;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

/**
* A wrapper class of {@link AsyncTableFunction} for asynchronously lookup rows matching the lookup
* keys from external system.
*
* <p>The output type of this table function is fixed as {@link RowData}.
*/
@PublicEvolving
public abstract class AsyncLookupFunction extends AsyncTableFunction<RowData> {

/**
* Asynchronously lookup rows matching the lookup keys.
*
* @param keyRow - A {@link RowData} that wraps lookup keys.
* @return A collection of all matching rows in the lookup table.
*/
public abstract CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow);

/** Invokes {@link #asyncLookup} and chains futures. */
public final void eval(CompletableFuture<Collection<RowData>> future, Object... keys) {
GenericRowData keyRow = GenericRowData.of(keys);
asyncLookup(keyRow)
.whenCompleteAsync(
(result, exception) -> {
if (exception != null) {
future.completeExceptionally(
new TableException(
String.format(
"Failed to asynchronously lookup entries with key '%s'",
keyRow),
exception));
return;
}
future.complete(result);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.functions;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

import java.io.IOException;
import java.util.Collection;

/**
* A wrapper class of {@link TableFunction} for synchronously lookup rows matching the lookup keys
* from external system.
*
* <p>The output type of this table function is fixed as {@link RowData}.
*/
@PublicEvolving
public abstract class LookupFunction extends TableFunction<RowData> {

/**
* Synchronously lookup rows matching the lookup keys.
*
* @param keyRow - A {@link RowData} that wraps lookup keys.
* @return A collection of all matching rows in the lookup table.
*/
public abstract Collection<RowData> lookup(RowData keyRow) throws IOException;

/** Invoke {@link #lookup} and handle exceptions. */
public final void eval(Object... keys) {
GenericRowData keyRow = GenericRowData.of(keys);
try {
Collection<RowData> lookup = lookup(keyRow);
if (lookup == null) {
return;
}
lookup.forEach(this::collect);
} catch (IOException e) {
throw new RuntimeException(
String.format("Failed to lookup values with given key row '%s'", keyRow), e);
}
}
}

0 comments on commit 3e2620b

Please sign in to comment.