Skip to content

Commit

Permalink
[FLINK-18938][table-api] Throw better exception message for querying …
Browse files Browse the repository at this point in the history
…sink only or source only connector

This closes apache#13214
  • Loading branch information
pyscala committed Nov 10, 2020
1 parent d6f53f1 commit ea88795
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -365,15 +365,41 @@ private static <T extends DynamicTableFactory> T getDynamicTableFactory(
"Table options do not contain an option key '%s' for discovering a connector.",
CONNECTOR.key()));
}
final DynamicTableFactory factory;
try {
return discoverFactory(context.getClassLoader(), factoryClass, connectorOption);
factory = discoverFactory(context.getClassLoader(), DynamicTableFactory.class, connectorOption);
} catch (ValidationException e) {
throw new ValidationException(
String.format(
"Cannot discover a connector using option '%s'.",
stringifyOption(CONNECTOR.key(), connectorOption)),
e);
}

if (factoryClass.isAssignableFrom(factory.getClass())) {
return (T) factory;
} else {
final Class<?> sourceFactoryClass = DynamicTableSourceFactory.class;
final Class<?> sinkFactoryClass = DynamicTableSinkFactory.class;
// for a better exception message
if (sourceFactoryClass.equals(factoryClass) && sinkFactoryClass.isAssignableFrom(factory.getClass())) {
// discovering source, but not found, and this is a sink connector.
throw new ValidationException(String.format(
"Connector '%s' only supports to be used as sink, can't be used as source.",
connectorOption));
} else if (sinkFactoryClass.equals(factoryClass) && sourceFactoryClass.isAssignableFrom(factory.getClass())) {
// discovering sink, but not found, and this is a a source connector.
throw new ValidationException(String.format(
"Connector '%s' only supports to be used as source, can't be used as sink.",
connectorOption));
} else {
throw new ValidationException(String.format(
"Connector '%s' should at least implements '%s' or '%s' interface.",
connectorOption,
sourceFactoryClass.getName(),
sinkFactoryClass.getName()));
}
}
}

private static List<Factory> discoverFactories(ClassLoader classLoader) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -43,6 +44,8 @@

import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

/**
* Tests for {@link FactoryUtil}.
Expand All @@ -62,9 +65,9 @@ public void testMissingConnector() {
public void testInvalidConnector() {
expectError(
"Could not find any factory for identifier 'FAIL' that implements '" +
DynamicTableSourceFactory.class.getName() + "' in the classpath.\n\n" +
DynamicTableFactory.class.getName() + "' in the classpath.\n\n" +
"Available factory identifiers are:\n\n" +
"test-connector");
"sink-only\nsource-only\ntest-connector");
testError(options -> options.put("connector", "FAIL"));
}

Expand Down Expand Up @@ -207,6 +210,25 @@ public void testAlternativeValueFormat() {
assertEquals(expectedSink, actualSink);
}

@Test
public void testAvailableFactoryTipsDependencyJarForConnector() {
try {
createTableSource(Collections.singletonMap("connector", "sink-only"));
fail();
} catch (Exception e) {
String errorMsg = "Connector 'sink-only' only supports to be used as sink, can't be used as source.";
assertThat(e, containsCause(new ValidationException(errorMsg)));
}

try {
createTableSink(Collections.singletonMap("connector", "source-only"));
fail();
} catch (Exception e) {
String errorMsg = "Connector 'source-only' only supports to be used as source, can't be used as sink.";
assertThat(e, containsCause(new ValidationException(errorMsg)));
}
}

// --------------------------------------------------------------------------------------------

private void expectError(String message) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.factories;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.connector.sink.DynamicTableSink;

import java.util.Collections;
import java.util.Set;

/**
* Test implementations for {@link DynamicTableSinkFactory} which only supports to be used as sink.
*/
public final class TestDynamicTableSinkOnlyFactory implements DynamicTableSinkFactory {

public static final String IDENTIFIER = "sink-only";

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
return null;
}

@Override
public String factoryIdentifier() {
return IDENTIFIER;
}

@Override
public Set<ConfigOption<?>> requiredOptions() {
return Collections.emptySet();
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
return Collections.emptySet();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.factories;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.connector.source.DynamicTableSource;

import java.util.Collections;
import java.util.Set;

/**
* Test implementations for {@link DynamicTableSourceFactory} which only supports to be used as source.
*/
public final class TestDynamicTableSourceOnlyFactory implements DynamicTableSourceFactory {

public static final String IDENTIFIER = "source-only";

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
return null;
}

@Override
public String factoryIdentifier() {
return IDENTIFIER;
}

@Override
public Set<ConfigOption<?>> requiredOptions() {
return Collections.emptySet();
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
return Collections.emptySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@

org.apache.flink.table.factories.TestDynamicTableFactory
org.apache.flink.table.factories.TestFormatFactory
org.apache.flink.table.factories.TestDynamicTableSinkOnlyFactory
org.apache.flink.table.factories.TestDynamicTableSourceOnlyFactory

0 comments on commit ea88795

Please sign in to comment.