[FLINK-3632] [tableAPI] Clean up TableAPI exceptions.
This closes apache#2015
yjshen authored and fhueske committed May 23, 2016
1 parent 53949d1 · commit 9cc6296
Showing 12 changed files with 54 additions and 54 deletions.
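The pattern repeated across all twelve files: user-facing validation and plan-translation failures in the Table API now throw the API's own TableException (or a plain RuntimeException for internal should-never-happen branches) instead of the generic IllegalArgumentException, and the tests' expected exception types move with them. Below is a minimal sketch of the resulting convention; the TableException definition is an illustrative stand-in, not the committed one (the real type lives in org.apache.flink.api.table and is not part of this diff):

    // Illustrative stand-in for the exception type this commit standardizes on.
    case class TableException(msg: String) extends RuntimeException(msg)

    // Validation failures now surface as Table API errors, so callers can
    // distinguish them from arbitrary JDK IllegalArgumentExceptions.
    def checkFieldNames(fieldNames: Seq[String]): Unit = {
      if (fieldNames.length != fieldNames.toSet.size) {
        throw new TableException("Table field names must be unique.")
      }
    }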
@@ -41,7 +41,7 @@ class TableConversions(table: Table) {
       tEnv.toDataSet(table)
     case _ =>
       throw new TableException(
-        "Only tables that orginate from Scala DataSets can be converted to Scala DataSets.")
+        "Only tables that originate from Scala DataSets can be converted to Scala DataSets.")
   }
 }

@@ -236,8 +236,7 @@ abstract class TableEnvironment(val config: TableConfig) {
       case c: CaseClassTypeInfo[A] => c.getFieldNames
       case p: PojoTypeInfo[A] => p.getFieldNames
       case tpe =>
-        throw new IllegalArgumentException(
-          s"Type $tpe requires explicit field naming.")
+        throw new TableException(s"Type $tpe lacks explicit field naming")
     }
     val fieldIndexes = fieldNames.indices.toArray
     (fieldNames, fieldIndexes)
@@ -259,50 +258,48 @@ abstract class TableEnvironment(val config: TableConfig) {
     val indexedNames: Array[(Int, String)] = inputType match {
       case a: AtomicType[A] =>
         if (exprs.length != 1) {
-          throw new IllegalArgumentException("Atomic type may can only have a single field.")
+          throw new TableException("Table of atomic type can only have a single field.")
         }
         exprs.map {
           case UnresolvedFieldReference(name) => (0, name)
-          case _ => throw new IllegalArgumentException(
-            "Field reference expression expected.")
+          case _ => throw new TableException("Field reference expression expected.")
         }
       case t: TupleTypeInfo[A] =>
         exprs.zipWithIndex.map {
           case (UnresolvedFieldReference(name), idx) => (idx, name)
           case (Alias(UnresolvedFieldReference(origName), name), _) =>
             val idx = t.getFieldIndex(origName)
             if (idx < 0) {
-              throw new IllegalArgumentException(s"$origName is not a field of type $t")
+              throw new TableException(s"$origName is not a field of type $t")
             }
             (idx, name)
-          case _ => throw new IllegalArgumentException(
-            "Field reference expression or naming expression expected.")
+          case _ => throw new TableException(
+            "Field reference expression or alias on field expression expected.")
         }
       case c: CaseClassTypeInfo[A] =>
         exprs.zipWithIndex.map {
           case (UnresolvedFieldReference(name), idx) => (idx, name)
           case (Alias(UnresolvedFieldReference(origName), name), _) =>
             val idx = c.getFieldIndex(origName)
             if (idx < 0) {
-              throw new IllegalArgumentException(s"$origName is not a field of type $c")
+              throw new TableException(s"$origName is not a field of type $c")
            }
            (idx, name)
-          case _ => throw new IllegalArgumentException(
-            "Field reference expression or naming expression expected.")
+          case _ => throw new TableException(
+            "Field reference expression or alias on field expression expected.")
         }
       case p: PojoTypeInfo[A] =>
         exprs.map {
           case Alias(UnresolvedFieldReference(origName), name) =>
             val idx = p.getFieldIndex(origName)
             if (idx < 0) {
-              throw new IllegalArgumentException(s"$origName is not a field of type $p")
+              throw new TableException(s"$origName is not a field of type $p")
             }
             (idx, name)
-          case _ => throw new IllegalArgumentException(
-            "Field naming expression expected.")
+          case _ => throw new TableException("Alias on field reference expression expected.")
         }
-      case tpe => throw new IllegalArgumentException(
-        s"Type $tpe cannot be converted into Table.")
+      case tpe => throw new TableException(
+        s"Source of type $tpe cannot be converted into Table.")
     }

     val (fieldIndexes, fieldNames) = indexedNames.unzip
@@ -46,7 +46,8 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
         case c @ Cast(ne: NamedExpression, tp) => Alias(c, s"${ne.name}-$tp")
         case other => Alias(other, s"_c$i")
       }
-      case _ => throw new IllegalArgumentException
+      case _ =>
+        throw new RuntimeException("This should never be called and probably points to a bug.")
     }
   }
   Project(newProjectList, child)
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.codegen.CodeGenerator
 import org.apache.flink.api.table.plan.nodes.FlinkRel
 import org.apache.flink.api.table.runtime.MapRunner
-import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}

 import scala.collection.JavaConversions._

@@ -61,7 +61,7 @@ trait DataSetRel extends RelNode with FlinkRel {
       case SqlTypeName.DOUBLE => s + 8
       case SqlTypeName.VARCHAR => s + 12
       case SqlTypeName.CHAR => s + 1
-      case _ => throw new IllegalArgumentException("Unsupported data type encountered")
+      case _ => throw new TableException("Unsupported data type encountered")
     }
   }

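For context on the DataSetRel hunk above: the match folds a per-field byte estimate into the running sum s, so a row typed (DOUBLE, VARCHAR, CHAR) is estimated at 8 + 12 + 1 = 21 bytes. A hypothetical restatement of that fold; only the constants 8, 12, and 1 come from the diff, all names below are illustrative:

    // Hypothetical restatement of the row-size estimate; constants from the hunk.
    val typeSizes = Map("DOUBLE" -> 8, "VARCHAR" -> 12, "CHAR" -> 1)
    val estimatedRowSize = Seq("DOUBLE", "VARCHAR", "CHAR")
      .foldLeft(0)((s, t) => s + typeSizes(t)) // 8 + 12 + 1 = 21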
@@ -108,7 +108,7 @@ object FlinkRuleSets {
   )

   /**
-    * RuleSet to optimize plans for batch / DataSet execution
+    * RuleSet to optimize plans for stream / DataStream execution
     */
   val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(

@@ -23,8 +23,8 @@ import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.flink.api.common.typeinfo.{TypeInformation, AtomicType}
 import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.TableException
 import org.apache.flink.api.table.typeutils.TypeConverter
-import org.apache.flink.streaming.api.datastream.DataStream

 abstract class FlinkTable[T](
   val typeInfo: TypeInformation[T],
@@ -33,30 +33,30 @@ abstract class FlinkTable[T](
   extends AbstractTable {

   if (fieldIndexes.length != fieldNames.length) {
-    throw new IllegalArgumentException(
+    throw new TableException(
       "Number of field indexes and field names must be equal.")
   }

   // check uniqueness of field names
   if (fieldNames.length != fieldNames.toSet.size) {
-    throw new IllegalArgumentException(
+    throw new TableException(
       "Table field names must be unique.")
   }

   val fieldTypes: Array[SqlTypeName] =
     typeInfo match {
       case cType: CompositeType[T] =>
         if (fieldNames.length != cType.getArity) {
-          throw new IllegalArgumentException(
-            s"Arity of DataStream type (" + cType.getFieldNames.deep + ") " +
+          throw new TableException(
+            s"Arity of type (" + cType.getFieldNames.deep + ") " +
             "not equal to number of field names " + fieldNames.deep + ".")
         }
         fieldIndexes
           .map(cType.getTypeAt(_))
           .map(TypeConverter.typeInfoToSqlType(_))
       case aType: AtomicType[T] =>
         if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
-          throw new IllegalArgumentException(
+          throw new TableException(
             "Non-composite input type may have only a single field and its index must be 0.")
         }
         Array(TypeConverter.typeInfoToSqlType(aType))
@@ -21,9 +21,9 @@ package org.apache.flink.api.table.sources
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.io.TupleCsvInputFormat
 import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.java.typeutils.{TupleTypeInfoBase, TupleTypeInfo}
-import org.apache.flink.api.java.{ExecutionEnvironment, DataSet}
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TupleTypeInfoBase}
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.table.{Row, TableException}
 import org.apache.flink.core.fs.Path

 /**
@@ -52,11 +52,11 @@ class CsvTableSource(
   extends BatchTableSource[Tuple] {

   if (fieldNames.length != fieldTypes.length) {
-    throw new IllegalArgumentException("Number of field names and field types must be equal.")
+    throw new TableException("Number of field names and field types must be equal.")
   }

   if (fieldNames.length > 25) {
-    throw new IllegalArgumentException("Only up to 25 fields supported with this CsvTableSource.")
+    throw new TableException("Only up to 25 fields supported with this CsvTableSource.")
   }

   /** Returns the data of the table as a [[DataSet]] of [[Row]]. */
@@ -107,8 +107,9 @@ abstract class TreeNode[A <: TreeNode[A]] extends Product { self: A =>
     try {
       defaultCtor.newInstance(newArgs: _*).asInstanceOf[A]
     } catch {
-      case e: java.lang.IllegalArgumentException =>
-        throw new IllegalArgumentException(s"Fail to copy treeNode ${getClass.getName}")
+      case e: Throwable =>
+        throw new RuntimeException(
+          s"Fail to copy treeNode ${getClass.getName}: ${e.getStackTraceString}")
     }
   }
 }
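A note on the TreeNode hunk above: Constructor.newInstance can fail with more than IllegalArgumentException, namely InstantiationException, IllegalAccessException, or InvocationTargetException when the constructor body itself throws, so the old catch clause missed most failure modes. Widening the catch to Throwable and appending the stack trace keeps the original cause visible. An illustrative sketch of those failure modes (the helper name is hypothetical):

    import java.lang.reflect.InvocationTargetException

    // Each of these reflective failures escaped the old
    // `case e: IllegalArgumentException` clause.
    def describeCopyFailure(e: Throwable): String = e match {
      case _: IllegalArgumentException => "wrong argument arity or types"
      case _: InstantiationException => "constructor of an abstract class"
      case _: IllegalAccessException => "constructor not accessible"
      case ite: InvocationTargetException => s"constructor threw: ${ite.getCause}"
      case other => s"unexpected: $other"
    }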
@@ -25,7 +25,7 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo

 import scala.collection.mutable.ArrayBuffer
 import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{Row, TableException}

 /**
  * TypeInformation for [[Row]].
@@ -39,10 +39,10 @@ class RowTypeInfo(fieldTypes: Seq[TypeInformation[_]], fieldNames: Seq[String])
 {

   if (fieldTypes.length != fieldNames.length) {
-    throw new IllegalArgumentException("Number of field types and names is different.")
+    throw new TableException("Number of field types and names is different.")
   }
   if (fieldNames.length != fieldNames.toSet.size) {
-    throw new IllegalArgumentException("Field names are not unique.")
+    throw new TableException("Field names are not unique.")
   }

   def this(fieldTypes: Seq[TypeInformation[_]]) = {
@@ -30,6 +30,7 @@
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.BatchTableEnvironment;
 import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.TableException;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -220,7 +221,7 @@ public void testAsFromAndToPrivateFieldPojo() throws Exception {
 		compareResultAsText(results, expected);
 	}

-	@Test(expected = IllegalArgumentException.class)
+	@Test(expected = TableException.class)
 	public void testAsWithToFewFields() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
@@ -229,7 +230,7 @@ public void testAsWithToFewFields() throws Exception {
 		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b");
 	}

-	@Test(expected = IllegalArgumentException.class)
+	@Test(expected = TableException.class)
 	public void testAsWithToManyFields() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
@@ -238,7 +239,7 @@ public void testAsWithToManyFields() throws Exception {
 		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d");
 	}

-	@Test(expected = IllegalArgumentException.class)
+	@Test(expected = TableException.class)
 	public void testAsWithAmbiguousFields() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
@@ -247,7 +248,7 @@ public void testAsWithAmbiguousFields() throws Exception {
 		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b");
 	}

-	@Test(expected = IllegalArgumentException.class)
+	@Test(expected = TableException.class)
 	public void testAsWithNonFieldReference1() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
@@ -256,7 +257,7 @@ public void testAsWithNonFieldReference1() throws Exception {
 		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c");
 	}

-	@Test(expected = IllegalArgumentException.class)
+	@Test(expected = TableException.class)
 	public void testAsWithNonFieldReference2() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
@@ -23,7 +23,7 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.{Row, TableEnvironment, TableException}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
@@ -101,7 +101,7 @@ class ToTableITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }

-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[TableException])
   def testToTableWithToFewFields(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -111,7 +111,7 @@
       .toTable(tEnv, 'a, 'b)
   }

-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[TableException])
   def testToTableWithToManyFields(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -121,7 +121,7 @@
       .toTable(tEnv, 'a, 'b, 'c, 'd)
   }

-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[TableException])
   def testToTableWithAmbiguousFields(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -131,7 +131,7 @@
       .toTable(tEnv, 'a, 'b, 'b)
   }

-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[TableException])
   def testToTableWithNonFieldReference1(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -141,7 +141,7 @@
       .toTable(tEnv, 'a + 1, 'b, 'c)
   }

-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[TableException])
   def testToTableWithNonFieldReference2(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -19,9 +19,9 @@
 package org.apache.flink.api.scala.stream.table

 import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.stream.utils.{StreamTestData, StreamITCase}
+import org.apache.flink.api.scala.stream.utils.{StreamITCase, StreamTestData}
 import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.{Row, TableEnvironment, TableException}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.junit.Assert._
@@ -108,7 +108,7 @@ class SelectITCase extends StreamingMultipleProgramsTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }

-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[TableException])
   def testAsWithToFewFields(): Unit = {

     val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -124,7 +124,7 @@
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }

-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[TableException])
   def testAsWithToManyFields(): Unit = {

     val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -140,7 +140,7 @@
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }

-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[TableException])
   def testAsWithAmbiguousFields(): Unit = {

     val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -157,7 +157,7 @@
   }


-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[TableException])
   def testOnlyFieldRefInAs(): Unit = {

     val env = StreamExecutionEnvironment.getExecutionEnvironment
