Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Katas - Convert task description from HTML to Markdown #11736

Merged
merged 42 commits into from
May 19, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
dce14f8
Support ZetaSQL DATE type as a Beam LogicalType
robinyqiu Mar 24, 2020
401f213
[BEAM-6733] Add pipeline option to flush bundle data before checkpoin…
mxm May 11, 2020
f4a0f66
Remove all answer placeholder checks as they can be confusing at time…
henryken May 16, 2020
849721f
Update course in Stepik
henryken May 16, 2020
fc5c981
[BEAM-10018] Fix timestamps in windowing kata
iht May 16, 2020
db5004c
[BEAM-10018] Kata failing due to failed parsing
iht May 16, 2020
af2d850
Convert html task description to md for "Hello Beam" and "Core Transf…
henryken May 17, 2020
f214352
Remove unused import
iht May 17, 2020
b18ea2a
Add missing dependency
iht May 17, 2020
80bc613
Fix member variable name in Kata documentation
iht May 17, 2020
45a0b85
Fix placeholder location
iht May 17, 2020
ab42e55
Convert html task description to md for "Core Transforms" remaining l…
henryken May 17, 2020
ee4a44e
Convert html task description to md for "Common Transforms" lessons
henryken May 17, 2020
5ea0940
Convert html task description to md for remaining Python Katas lessons
henryken May 17, 2020
d5606be
Convert html task description to md for most of Java Katas lessons
henryken May 17, 2020
f9ae024
Convert html task description to md for Java Katas "Common Transforms…
henryken May 17, 2020
6c73dbe
Convert html task description to md for Java Katas "Core Transforms" …
henryken May 17, 2020
d773f8c
[BEAM-2530] Implement Zeta SQL precommit compile tests and run on jav…
pawelpasterz May 18, 2020
7c80ecb
Merge pull request #11678: [BEAM-6733] Add pipeline option to flush b…
mxm May 18, 2020
64414b8
Python3 fix - convert dict.keys() to list before indexing (#11733)
chamikaramj May 18, 2020
1aa715c
Updates google-apitools and httplib2 (#11726)
tvalentyn May 18, 2020
1f21a4c
Merge pull request #11731 from [BEAM-10018] Fix timestamps in two win…
pabloem May 18, 2020
ddf2927
Merge pull request #11730 from henryken/katas-python-remove-answer-pl…
pabloem May 18, 2020
de9177e
[BEAM-9964] Update CHANGES.md (#11743)
omarismail94 May 18, 2020
47c246b
Merge pull request #11272: [BEAM-9641] Support ZetaSQL DATE type as a…
apilloud May 18, 2020
76fbe45
[BEAM-9577] Artifact v2 support for uber jars. (#11708)
robertwb May 18, 2020
7c81b93
Populate all SpannerIO batching parameters in display data.
nielm Apr 26, 2020
9ded9e2
Fix capitalization, clarify descriptions
TheNeuralBit May 14, 2020
192e9ad
fix capitalization, clarify description Grouped
TheNeuralBit May 14, 2020
30a68f5
Refactor to extract single method for popuplating displayData
nielm May 18, 2020
decd50a
[BEAM-9821] Populate all SpannerIO batching parameters in display dat…
TheNeuralBit May 19, 2020
c89f188
Convert html task description to md for "Hello Beam" and "Core Transf…
henryken May 17, 2020
3f5a48c
Convert html task description to md for "Core Transforms" remaining l…
henryken May 17, 2020
714d82f
Convert html task description to md for "Common Transforms" lessons
henryken May 17, 2020
1d55f6f
Convert html task description to md for remaining Python Katas lessons
henryken May 17, 2020
1a5afa1
Convert html task description to md for most of Java Katas lessons
henryken May 17, 2020
23f419f
Convert html task description to md for Java Katas "Common Transforms…
henryken May 17, 2020
353abda
Convert html task description to md for Java Katas "Core Transforms" …
henryken May 17, 2020
b739cb7
Resolve merge conflict
henryken May 19, 2020
2c0c4c9
Merge remote-tracking branch 'origin/katas-convert-html-desc-to-md' i…
henryken May 19, 2020
80a490e
Update Python Katas on Stepik
henryken May 19, 2020
bfcf1b4
Update Beam Katas Java on Stepik
henryken May 19, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Support ZetaSQL DATE type as a Beam LogicalType
  • Loading branch information
robinyqiu committed May 14, 2020
commit dce14f8b5fd15cacb68e56b43107bdcb068c2814
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas.logicaltypes;

import java.time.LocalDate;
import org.apache.beam.sdk.schemas.Schema;

/**
* A date without a time-zone.
*
* <p>It cannot represent an instant on the time-line without additional information such as an
* offset or time-zone.
*
* <p>Its input type is a {@link LocalDate}, and base type is a {@link Long} that represents a
* incrementing count of days where day 0 is 1970-01-01 (ISO).
*/
public class Date implements Schema.LogicalType<LocalDate, Long> {

@Override
public String getIdentifier() {
return "beam:logical_type:date:v1";
}

// unused
@Override
public Schema.FieldType getArgumentType() {
return Schema.FieldType.STRING;
}

// unused
@Override
public String getArgument() {
return "";
}

@Override
public Schema.FieldType getBaseType() {
return Schema.FieldType.INT64;
}

@Override
public Long toBaseType(LocalDate input) {
return input == null ? null : input.toEpochDay();
}

@Override
public LocalDate toInputType(Long base) {
return base == null ? null : LocalDate.ofEpochDay(base);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas.logicaltypes;

import java.time.LocalDate;
import org.apache.beam.sdk.schemas.Schema.LogicalType;

/** Beam {@link org.apache.beam.sdk.schemas.Schema.LogicalType}s corresponding to SQL data types. */
public class SqlTypes {

private SqlTypes() {}

/** Beam LogicalType corresponding to ZetaSQL/CalciteSQL DATE type. */
public static final LogicalType<LocalDate, Long> DATE = new Date();
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Strings;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;

Expand All @@ -38,7 +39,7 @@ class SchemaUtils {
ImmutableMap.<String, FieldType>builder()
.put("BOOL", FieldType.BOOLEAN)
.put("BYTES", FieldType.BYTES)
.put("DATE", FieldType.logicalType(new CalciteUtils.DateType()))
.put("DATE", FieldType.logicalType(SqlTypes.DATE))
.put("DATETIME", FieldType.DATETIME)
.put("DOUBLE", FieldType.DOUBLE)
.put("FLOAT", FieldType.DOUBLE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
import static org.apache.beam.sdk.schemas.Schema.FieldType;
import static org.apache.beam.sdk.schemas.Schema.TypeName;
import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.AbstractList;
import java.util.AbstractMap;
import java.util.Arrays;
Expand All @@ -39,11 +39,11 @@
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamJavaTypeFactory;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.DateType;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeType;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeWithLocalTzType;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimestampWithLocalTzType;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -315,7 +315,7 @@ private static Expression castOutput(Expression value, FieldType toType) {
private static Expression castOutputTime(Expression value, FieldType toType) {
Expression valueDateTime = value;

// First, convert to millis
// First, convert to millis (except for DATE type)
if (CalciteUtils.TIMESTAMP.typesEqual(toType)
|| CalciteUtils.NULLABLE_TIMESTAMP.typesEqual(toType)) {
if (value.getType() == java.sql.Timestamp.class) {
Expand All @@ -331,13 +331,16 @@ private static Expression castOutputTime(Expression value, FieldType toType) {
if (value.getType() == java.sql.Date.class) {
valueDateTime = Expressions.call(BuiltInMethod.DATE_TO_INT.method, valueDateTime);
}
valueDateTime = Expressions.multiply(valueDateTime, Expressions.constant(MILLIS_PER_DAY));
} else {
throw new UnsupportedOperationException("Unknown DateTime type " + toType);
}

// Second, convert to joda Instant
valueDateTime = Expressions.new_(Instant.class, valueDateTime);
// Second, convert to joda Instant (or LocalDate for DATE type)
if (CalciteUtils.DATE.typesEqual(toType) || CalciteUtils.NULLABLE_DATE.typesEqual(toType)) {
valueDateTime = Expressions.call(LocalDate.class, "ofEpochDay", valueDateTime);
} else {
valueDateTime = Expressions.new_(Instant.class, valueDateTime);
}

// Third, make conversion conditional on non-null input.
if (!((Class) value.getType()).isPrimitive()) {
Expand Down Expand Up @@ -371,9 +374,9 @@ private static class InputGetterImpl implements RexToLixTranslator.InputGetter {
.put(TypeName.ROW, Row.class)
.build();

private static final Map<String, Class> LOGICAL_TYPE_CONVERSION_MAP =
private static final Map<String, Class> LOGICAL_TYPE_TO_BASE_TYPE_MAP =
ImmutableMap.<String, Class>builder()
.put(DateType.IDENTIFIER, ReadableInstant.class)
.put(SqlTypes.DATE.getIdentifier(), Long.class)
.put(TimeType.IDENTIFIER, ReadableInstant.class)
.put(TimeWithLocalTzType.IDENTIFIER, ReadableInstant.class)
.put(TimestampWithLocalTzType.IDENTIFIER, ReadableInstant.class)
Expand Down Expand Up @@ -406,7 +409,7 @@ private static Expression value(
if (storageType == Object.class) {
convertTo = Object.class;
} else if (fromType.getTypeName().isLogicalType()) {
convertTo = LOGICAL_TYPE_CONVERSION_MAP.get(fromType.getLogicalType().getIdentifier());
convertTo = LOGICAL_TYPE_TO_BASE_TYPE_MAP.get(fromType.getLogicalType().getIdentifier());
} else {
convertTo = TYPE_CONVERSION_MAP.get(fromType.getTypeName());
}
Expand All @@ -427,18 +430,13 @@ private static Expression value(

private static Expression value(Expression value, Schema.FieldType type) {
if (type.getTypeName().isLogicalType()) {
Expression millisField = Expressions.call(value, "getMillis");
String logicalId = type.getLogicalType().getIdentifier();
if (logicalId.equals(TimeType.IDENTIFIER)) {
return nullOr(value, Expressions.convert_(millisField, int.class));
} else if (logicalId.equals(DateType.IDENTIFIER)) {
value =
nullOr(
value,
Expressions.convert_(
Expressions.divide(millisField, Expressions.constant(MILLIS_PER_DAY)),
int.class));
} else if (!logicalId.equals(CharType.IDENTIFIER)) {
if (TimeType.IDENTIFIER.equals(logicalId)) {
return nullOr(
value, Expressions.convert_(Expressions.call(value, "getMillis"), int.class));
} else if (SqlTypes.DATE.getIdentifier().equals(logicalId)) {
value = nullOr(value, value);
} else if (!CharType.IDENTIFIER.equals(logicalId)) {
throw new UnsupportedOperationException(
"Unknown LogicalType " + type.getLogicalType().getIdentifier());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;

import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY;

import java.io.IOException;
import java.time.LocalDate;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -36,7 +36,6 @@
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.DateType;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeType;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.metrics.Counter;
Expand All @@ -50,6 +49,7 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -303,11 +303,15 @@ private static Object fieldToAvatica(Schema.FieldType type, Object beamValue) {
switch (type.getTypeName()) {
case LOGICAL_TYPE:
String logicalId = type.getLogicalType().getIdentifier();
if (logicalId.equals(TimeType.IDENTIFIER)) {
if (TimeType.IDENTIFIER.equals(logicalId)) {
return (int) ((ReadableInstant) beamValue).getMillis();
} else if (logicalId.equals(DateType.IDENTIFIER)) {
return (int) (((ReadableInstant) beamValue).getMillis() / MILLIS_PER_DAY);
} else if (logicalId.equals(CharType.IDENTIFIER)) {
} else if (SqlTypes.DATE.getIdentifier().equals(logicalId)) {
if (beamValue instanceof Long) { // base type
return ((Long) beamValue).intValue();
} else { // input type
return (int) (((LocalDate) beamValue).toEpochDay());
}
} else if (CharType.IDENTIFIER.equals(logicalId)) {
return beamValue;
} else {
throw new UnsupportedOperationException("Unknown DateTime type " + logicalId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import java.io.IOException;
import java.io.StringWriter;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
Expand Down Expand Up @@ -114,8 +117,17 @@ public static Object autoCastField(Schema.Field field, Object rawObj) {
} else {
return rawObj;
}
} else if (CalciteUtils.DATE.typesEqual(type) || CalciteUtils.NULLABLE_DATE.typesEqual(type)) {
if (rawObj instanceof GregorianCalendar) { // used by the SQL CLI
GregorianCalendar calendar = (GregorianCalendar) rawObj;
return Instant.ofEpochMilli(calendar.getTimeInMillis())
.atZone(calendar.getTimeZone().toZoneId())
.toLocalDate();
} else {
return LocalDate.ofEpochDay((Integer) rawObj);
}
} else if (CalciteUtils.isDateTimeType(type)) {
// Internal representation of DateType in Calcite is convertible to Joda's Datetime.
// Internal representation of Date in Calcite is convertible to Joda's Datetime.
return new DateTime(rawObj);
} else if (type.getTypeName().isNumericType()
&& ((rawObj instanceof String)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.BiMap;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableBiMap;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;
Expand All @@ -43,15 +44,6 @@ public class CalciteUtils {
// SQL has schema types that do not directly correspond to Beam Schema types. We define
// LogicalTypes to represent each of these types.

/** A LogicalType corresponding to DATE. */
public static class DateType extends PassThroughLogicalType<Instant> {
public static final String IDENTIFIER = "SqlDateType";

public DateType() {
super(IDENTIFIER, FieldType.STRING, "", FieldType.DATETIME);
}
}

/** A LogicalType corresponding to TIME. */
public static class TimeType extends PassThroughLogicalType<Instant> {
public static final String IDENTIFIER = "SqlTimeType";
Expand Down Expand Up @@ -96,7 +88,7 @@ public static boolean isDateTimeType(FieldType fieldType) {

if (fieldType.getTypeName().isLogicalType()) {
String logicalId = fieldType.getLogicalType().getIdentifier();
return logicalId.equals(DateType.IDENTIFIER)
return logicalId.equals(SqlTypes.DATE.getIdentifier())
|| logicalId.equals(TimeType.IDENTIFIER)
|| logicalId.equals(TimeWithLocalTzType.IDENTIFIER)
|| logicalId.equals(TimestampWithLocalTzType.IDENTIFIER);
Expand Down Expand Up @@ -128,9 +120,9 @@ public static boolean isStringType(FieldType fieldType) {
public static final FieldType VARBINARY = FieldType.BYTES;
public static final FieldType VARCHAR = FieldType.STRING;
public static final FieldType CHAR = FieldType.logicalType(new CharType());
public static final FieldType DATE = FieldType.logicalType(new DateType());
public static final FieldType DATE = FieldType.logicalType(SqlTypes.DATE);
public static final FieldType NULLABLE_DATE =
FieldType.logicalType(new DateType()).withNullable(true);
FieldType.logicalType(SqlTypes.DATE).withNullable(true);
public static final FieldType TIME = FieldType.logicalType(new TimeType());
public static final FieldType NULLABLE_TIME =
FieldType.logicalType(new TimeType()).withNullable(true);
Expand Down Expand Up @@ -205,12 +197,16 @@ public static SqlTypeName toSqlTypeName(FieldType type) {
return SqlTypeName.MAP;
default:
SqlTypeName typeName = BEAM_TO_CALCITE_TYPE_MAPPING.get(type.withNullable(false));
if (typeName != null) {
return typeName;
} else {
if (typeName == null) {
// This will happen e.g. if looking up a STRING type, and metadata isn't set to say which
// type of SQL string we want. In this case, use the default mapping.
return BEAM_TO_CALCITE_DEFAULT_MAPPING.get(type);
typeName = BEAM_TO_CALCITE_DEFAULT_MAPPING.get(type);
}
if (typeName == null) {
throw new IllegalArgumentException(
String.format("Cannot find a matching Calcite SqlTypeName for Beam type: %s", type));
} else {
return typeName;
}
}
}
Expand Down
Loading