Skip to content

Commit

Permalink
Bump calcite version to 1.15.0 (apache#4692)
Browse files Browse the repository at this point in the history
  • Loading branch information
apilloud authored and xumingmin committed Feb 16, 2018
1 parent f4ee8a1 commit bb8c12c
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 41 deletions.
2 changes: 1 addition & 1 deletion sdks/java/extensions/sql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ configurations {
}


def calcite_version = "1.13.0"
def calcite_version = "1.15.0"
def avatica_version = "1.10.0"

dependencies {
Expand Down
2 changes: 1 addition & 1 deletion sdks/java/extensions/sql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<properties>
<timestamp>${maven.build.timestamp}</timestamp>
<maven.build.timestamp.format>yyyy-MM-dd HH:mm</maven.build.timestamp.format>
<calcite.version>1.13.0</calcite.version>
<calcite.version>1.15.0</calcite.version>
<avatica.version>1.10.0</avatica.version>
<mockito.version>1.9.5</mockito.version>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.beam.sdk.values.RowType;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.calcite.DataContext;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
Expand All @@ -49,6 +50,8 @@
import org.apache.calcite.schema.Statistic;
import org.apache.calcite.schema.Statistics;
import org.apache.calcite.schema.impl.ScalarFunctionImpl;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.Frameworks;

/**
Expand Down Expand Up @@ -190,6 +193,16 @@ public Statistic getStatistic() {
public Schema.TableType getJdbcTableType() {
return Schema.TableType.TABLE;
}

@Override public boolean isRolledUp(String column) {
return false;
}

@Override public boolean rolledUpColumnValidInsideAgg(String column,
SqlCall call, SqlNode parent,
CalciteConnectionConfig config) {
return false;
}
}

public BeamQueryPlanner getPlanner() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@

package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;

import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
Expand All @@ -44,55 +41,51 @@
* <li>DAYOFYEAR(date) =&gt; EXTRACT(DOY FROM date)</li>
* <li>DAYOFMONTH(date) =&gt; EXTRACT(DAY FROM date)</li>
* <li>DAYOFWEEK(date) =&gt; EXTRACT(DOW FROM date)</li>
* <li>HOUR(date) =&gt; EXTRACT(HOUR FROM date)</li>
* <li>MINUTE(date) =&gt; EXTRACT(MINUTE FROM date)</li>
* <li>SECOND(date) =&gt; EXTRACT(SECOND FROM date)</li>
* </ul>
*/
public class BeamSqlExtractExpression extends BeamSqlExpression {
private static final Map<TimeUnitRange, Integer> typeMapping = new HashMap<>();
static {
typeMapping.put(TimeUnitRange.DOW, Calendar.DAY_OF_WEEK);
typeMapping.put(TimeUnitRange.DOY, Calendar.DAY_OF_YEAR);
typeMapping.put(TimeUnitRange.WEEK, Calendar.WEEK_OF_YEAR);
}

public BeamSqlExtractExpression(List<BeamSqlExpression> operands) {
super(operands, SqlTypeName.BIGINT);
}
@Override public boolean accept() {
return operands.size() == 2
&& opType(1) == SqlTypeName.BIGINT;
&& opType(1) == SqlTypeName.TIMESTAMP;
}

@Override public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow window) {
Long time = opValueEvaluated(1, inputRow, window);
Date time = opValueEvaluated(1, inputRow, window);

TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(0)).getValue();

switch (unit) {
case YEAR:
case QUARTER:
case MONTH:
case DAY:
Long timeByDay = time / 1000 / 3600 / 24;
case DOW:
case WEEK:
case DOY:
case CENTURY:
case MILLENNIUM:
Long timeByDay = time.getTime() / DateTimeUtils.MILLIS_PER_DAY;
Long extracted = DateTimeUtils.unixDateExtract(
unit,
timeByDay
);
return BeamSqlPrimitive.of(outputType, extracted);

case DOY:
case DOW:
case WEEK:
Calendar calendar = Calendar.getInstance();
calendar.setTime(new Date(time));
return BeamSqlPrimitive.of(outputType, (long) calendar.get(typeMapping.get(unit)));

case QUARTER:
calendar = Calendar.getInstance();
calendar.setTime(new Date(time));
long ret = calendar.get(Calendar.MONTH) / 3;
if (ret * 3 < calendar.get(Calendar.MONTH)) {
ret += 1;
}
return BeamSqlPrimitive.of(outputType, ret);
case HOUR:
case MINUTE:
case SECOND:
int timeInDay = (int) (time.getTime() % DateTimeUtils.MILLIS_PER_DAY);
extracted = (long) DateTimeUtils.unixTimeExtract(
unit,
timeInDay
);
return BeamSqlPrimitive.of(outputType, extracted);

default:
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public BeamQueryPlanner(SchemaPlus schema) {
sqlOperatorTables.add(SqlStdOperatorTable.instance());
sqlOperatorTables.add(
new CalciteCatalogReader(
CalciteSchema.from(schema), false, Collections.emptyList(), TYPE_FACTORY));
CalciteSchema.from(schema), Collections.emptyList(), TYPE_FACTORY, null));

FrameworkConfig config = Frameworks.newConfigBuilder()
.parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
Expand All @@ -35,11 +36,11 @@
public class BeamSqlExtractExpressionTest extends BeamSqlDateExpressionTestBase {
@Test public void evaluate() throws Exception {
List<BeamSqlExpression> operands = new ArrayList<>();
long time = str2LongTime("2017-05-22 16:17:18");
Date time = str2DateTime("2017-05-22 16:17:18");

// YEAR
operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.YEAR));
operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
operands.add(BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
time));
assertEquals(2017L,
new BeamSqlExtractExpression(operands)
Expand All @@ -48,7 +49,7 @@ public class BeamSqlExtractExpressionTest extends BeamSqlDateExpressionTestBase
// MONTH
operands.clear();
operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.MONTH));
operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
operands.add(BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
time));
assertEquals(5L,
new BeamSqlExtractExpression(operands)
Expand All @@ -57,7 +58,7 @@ public class BeamSqlExtractExpressionTest extends BeamSqlDateExpressionTestBase
// DAY
operands.clear();
operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.DAY));
operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
operands.add(BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
time));
assertEquals(22L,
new BeamSqlExtractExpression(operands)
Expand All @@ -66,7 +67,7 @@ public class BeamSqlExtractExpressionTest extends BeamSqlDateExpressionTestBase
// DAY_OF_WEEK
operands.clear();
operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.DOW));
operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
operands.add(BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
time));
assertEquals(2L,
new BeamSqlExtractExpression(operands)
Expand All @@ -75,7 +76,7 @@ public class BeamSqlExtractExpressionTest extends BeamSqlDateExpressionTestBase
// DAY_OF_YEAR
operands.clear();
operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.DOY));
operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
operands.add(BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
time));
assertEquals(142L,
new BeamSqlExtractExpression(operands)
Expand All @@ -84,7 +85,7 @@ public class BeamSqlExtractExpressionTest extends BeamSqlDateExpressionTestBase
// WEEK
operands.clear();
operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.WEEK));
operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
operands.add(BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
time));
assertEquals(21L,
new BeamSqlExtractExpression(operands)
Expand All @@ -93,7 +94,7 @@ public class BeamSqlExtractExpressionTest extends BeamSqlDateExpressionTestBase
// QUARTER
operands.clear();
operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.QUARTER));
operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
operands.add(BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
time));
assertEquals(2L,
new BeamSqlExtractExpression(operands)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.tools.ValidationException;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -243,7 +244,7 @@ public void testOrderBy_bigFetch() throws Exception {
pipeline.run().waitUntilFinish();
}

@Test(expected = UnsupportedOperationException.class)
@Test(expected = ValidationException.class)
public void testOrderBy_exception() throws Exception {
String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id) SELECT "
+ " order_id, COUNT(*) "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ private void prepareAggregationCalls() {
aggCalls = new ArrayList<>();
aggCalls.add(
new AggregateCall(
new SqlCountAggFunction(),
new SqlCountAggFunction("COUNT"),
false,
Arrays.asList(),
new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
Expand Down

0 comments on commit bb8c12c

Please sign in to comment.