[hotfix] Shade avro in sql-avro jars
dawidwys committed Nov 25, 2020
1 parent 01d8f99 commit 9abe38e
Showing 3 changed files with 152 additions and 61 deletions.
189 changes: 133 additions & 56 deletions flink-formats/flink-avro/…/RowDataToAvroConverters.java

@@ -42,7 +42,9 @@

 import static org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.extractValueTypeToAvroMap;

-/** Tool class used to convert from {@link RowData} to Avro {@link GenericRecord}. **/
+/**
+ * Tool class used to convert from {@link RowData} to Avro {@link GenericRecord}.
+ */
 @Internal
 public class RowDataToAvroConverters {

@@ -59,6 +61,13 @@ public interface RowDataToAvroConverter extends Serializable {
         Object convert(Schema schema, Object object);
     }

+    // --------------------------------------------------------------------------------
+    // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is
+    // necessary because the maven shade plugin cannot relocate classes in
+    // SerializedLambdas (MSHADE-260). On the other hand we want to relocate Avro for
+    // sql-client uber jars.
+    // --------------------------------------------------------------------------------
+
     /**
      * Creates a runtime converter according to the given logical type that converts objects
      * of Flink Table & SQL internal data structures to corresponding Avro data structures.
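
To see the failure mode the IMPORTANT comment is guarding against, consider a minimal sketch (illustrative only; the interface and class names below are invented, not part of the commit):

import java.io.Serializable;

public class RelocationSketch {
    interface Converter extends Serializable {
        Object convert(Object o);
    }

    // A serializable lambda is compiled to an invokedynamic call site and is
    // serialized through java.lang.invoke.SerializedLambda, which records the
    // capturing class and implementation method as string constants. The
    // maven-shade plugin rewrites class references in the constant pool but
    // not these strings (MSHADE-260), so deserializing the lambda from a
    // relocated jar fails.
    static final Converter LAMBDA = o -> o;

    // An anonymous class compiles to an ordinary class file
    // (RelocationSketch$1) whose references the plugin can relocate normally.
    static final Converter ANONYMOUS = new Converter() {
        private static final long serialVersionUID = 1L;

        @Override
        public Object convert(Object o) {
            return o;
        }
    };
}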
@@ -67,13 +76,31 @@ public static RowDataToAvroConverter createConverter(LogicalType type) {
         final RowDataToAvroConverter converter;
         switch (type.getTypeRoot()) {
             case NULL:
-                converter = (schema, object) -> null;
+                converter = new RowDataToAvroConverter() {
+                    private static final long serialVersionUID = 1L;
+                    @Override
+                    public Object convert(Schema schema, Object object) {
+                        return null;
+                    }
+                };
                 break;
             case TINYINT:
-                converter = (schema, object) -> ((Byte) object).intValue();
+                converter = new RowDataToAvroConverter() {
+                    private static final long serialVersionUID = 1L;
+                    @Override
+                    public Object convert(Schema schema, Object object) {
+                        return ((Byte) object).intValue();
+                    }
+                };
                 break;
             case SMALLINT:
-                converter = (schema, object) -> ((Short) object).intValue();
+                converter = new RowDataToAvroConverter() {
+                    private static final long serialVersionUID = 1L;
+                    @Override
+                    public Object convert(Schema schema, Object object) {
+                        return ((Short) object).intValue();
+                    }
+                };
                 break;
             case BOOLEAN: // boolean
             case INTEGER: // int
@@ -84,21 +111,51 @@ public static RowDataToAvroConverter createConverter(LogicalType type) {
             case DOUBLE: // double
             case TIME_WITHOUT_TIME_ZONE: // int
             case DATE: // int
-                converter = (schema, object) -> object;
+                converter = new RowDataToAvroConverter() {
+                    private static final long serialVersionUID = 1L;
+                    @Override
+                    public Object convert(Schema schema, Object object) {
+                        return object;
+                    }
+                };
                 break;
             case CHAR:
             case VARCHAR:
-                converter = (schema, object) -> new Utf8(object.toString());
+                converter = new RowDataToAvroConverter() {
+                    private static final long serialVersionUID = 1L;
+                    @Override
+                    public Object convert(Schema schema, Object object) {
+                        return new Utf8(object.toString());
+                    }
+                };
                 break;
             case BINARY:
             case VARBINARY:
-                converter = (schema, object) -> ByteBuffer.wrap((byte[]) object);
+                converter = new RowDataToAvroConverter() {
+                    private static final long serialVersionUID = 1L;
+                    @Override
+                    public Object convert(Schema schema, Object object) {
+                        return ByteBuffer.wrap((byte[]) object);
+                    }
+                };
                 break;
             case TIMESTAMP_WITHOUT_TIME_ZONE:
-                converter = (schema, object) -> ((TimestampData) object).toInstant().toEpochMilli();
+                converter = new RowDataToAvroConverter() {
+                    private static final long serialVersionUID = 1L;
+                    @Override
+                    public Object convert(Schema schema, Object object) {
+                        return ((TimestampData) object).toInstant().toEpochMilli();
+                    }
+                };
                 break;
             case DECIMAL:
-                converter = (schema, object) -> ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes());
+                converter = new RowDataToAvroConverter() {
+                    private static final long serialVersionUID = 1L;
+                    @Override
+                    public Object convert(Schema schema, Object object) {
+                        return ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes());
+                    }
+                };
                 break;
             case ARRAY:
                 converter = createArrayConverter((ArrayType) type);
@@ -116,28 +173,32 @@ public static RowDataToAvroConverter createConverter(LogicalType type) {
         }

         // wrap into nullable converter
-        return (schema, object) -> {
-            if (object == null) {
-                return null;
-            }
+        return new RowDataToAvroConverter() {
+            private static final long serialVersionUID = 1L;
+            @Override
+            public Object convert(Schema schema, Object object) {
+                if (object == null) {
+                    return null;
+                }

-            // get actual schema if it is a nullable schema
-            Schema actualSchema;
-            if (schema.getType() == Schema.Type.UNION) {
-                List<Schema> types = schema.getTypes();
-                int size = types.size();
-                if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
-                    actualSchema = types.get(0);
-                } else if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
-                    actualSchema = types.get(1);
-                } else {
-                    throw new IllegalArgumentException(
-                        "The Avro schema is not a nullable type: " + schema.toString());
-                }
-            } else {
-                actualSchema = schema;
-            }
-            return converter.convert(actualSchema, object);
+                // get actual schema if it is a nullable schema
+                Schema actualSchema;
+                if (schema.getType() == Schema.Type.UNION) {
+                    List<Schema> types = schema.getTypes();
+                    int size = types.size();
+                    if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
+                        actualSchema = types.get(0);
+                    } else if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
+                        actualSchema = types.get(1);
+                    } else {
+                        throw new IllegalArgumentException(
+                            "The Avro schema is not a nullable type: " + schema.toString());
+                    }
+                } else {
+                    actualSchema = schema;
+                }
+                return converter.convert(actualSchema, object);
+            }
         };
     }
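
The union handling above unwraps exactly the two-branch nullable case. A short sketch of the schemas it accepts (illustrative; not part of the commit):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

// Either branch order resolves to the non-null schema:
Schema longOrNull = SchemaBuilder.unionOf().longType().and().nullType().endUnion(); // ["long","null"]
Schema nullOrLong = SchemaBuilder.unionOf().nullType().and().longType().endUnion(); // ["null","long"]
// Any other union shape makes the wrapper throw IllegalArgumentException.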

@@ -154,18 +215,22 @@ private static RowDataToAvroConverter createRowConverter(RowType rowType) {
         }
         final int length = rowType.getFieldCount();

-        return (schema, object) -> {
-            final RowData row = (RowData) object;
-            final List<Schema.Field> fields = schema.getFields();
-            final GenericRecord record = new GenericData.Record(schema);
-            for (int i = 0; i < length; ++i) {
-                final Schema.Field schemaField = fields.get(i);
-                Object avroObject = fieldConverters[i].convert(
-                    schemaField.schema(),
-                    fieldGetters[i].getFieldOrNull(row));
-                record.put(i, avroObject);
-            }
-            return record;
+        return new RowDataToAvroConverter() {
+            private static final long serialVersionUID = 1L;
+            @Override
+            public Object convert(Schema schema, Object object) {
+                final RowData row = (RowData) object;
+                final List<Schema.Field> fields = schema.getFields();
+                final GenericRecord record = new GenericData.Record(schema);
+                for (int i = 0; i < length; ++i) {
+                    final Schema.Field schemaField = fields.get(i);
+                    Object avroObject = fieldConverters[i].convert(
+                        schemaField.schema(),
+                        fieldGetters[i].getFieldOrNull(row));
+                    record.put(i, avroObject);
+                }
+                return record;
+            }
         };
     }

@@ -174,14 +239,20 @@ private static RowDataToAvroConverter createArrayConverter(ArrayType arrayType) {
         final ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType);
         final RowDataToAvroConverter elementConverter = createConverter(arrayType.getElementType());

-        return (schema, object) -> {
-            final Schema elementSchema = schema.getElementType();
-            ArrayData arrayData = (ArrayData) object;
-            List<Object> list = new ArrayList<>();
-            for (int i = 0; i < arrayData.size(); ++i) {
-                list.add(elementConverter.convert(elementSchema, elementGetter.getElementOrNull(arrayData, i)));
-            }
-            return list;
+        return new RowDataToAvroConverter() {
+            private static final long serialVersionUID = 1L;
+            @Override
+            public Object convert(Schema schema, Object object) {
+                final Schema elementSchema = schema.getElementType();
+                ArrayData arrayData = (ArrayData) object;
+                List<Object> list = new ArrayList<>();
+                for (int i = 0; i < arrayData.size(); ++i) {
+                    list.add(elementConverter.convert(
+                        elementSchema,
+                        elementGetter.getElementOrNull(arrayData, i)));
+                }
+                return list;
+            }
         };
     }

@@ -190,18 +261,24 @@ private static RowDataToAvroConverter createMapConverter(LogicalType type) {
         final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType);
         final RowDataToAvroConverter valueConverter = createConverter(valueType);

-        return (schema, object) -> {
-            final Schema valueSchema = schema.getValueType();
-            final MapData mapData = (MapData) object;
-            final ArrayData keyArray = mapData.keyArray();
-            final ArrayData valueArray = mapData.valueArray();
-            final Map<Object, Object> map = new HashMap<>(mapData.size());
-            for (int i = 0; i < mapData.size(); ++i) {
-                final String key = keyArray.getString(i).toString();
-                final Object value = valueConverter.convert(valueSchema, valueGetter.getElementOrNull(valueArray, i));
-                map.put(key, value);
-            }
-            return map;
+        return new RowDataToAvroConverter() {
+            private static final long serialVersionUID = 1L;
+            @Override
+            public Object convert(Schema schema, Object object) {
+                final Schema valueSchema = schema.getValueType();
+                final MapData mapData = (MapData) object;
+                final ArrayData keyArray = mapData.keyArray();
+                final ArrayData valueArray = mapData.valueArray();
+                final Map<Object, Object> map = new HashMap<>(mapData.size());
+                for (int i = 0; i < mapData.size(); ++i) {
+                    final String key = keyArray.getString(i).toString();
+                    final Object value = valueConverter.convert(
+                        valueSchema,
+                        valueGetter.getElementOrNull(valueArray, i));
+                    map.put(key, value);
+                }
+                return map;
+            }
         };
     }
 }
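
For orientation, a usage sketch of the public entry point (variable names are assumed, not from the commit):

// rowType: Flink logical type of the row; avroSchema: the matching Avro schema;
// rowData: the RowData instance to convert.
RowDataToAvroConverters.RowDataToAvroConverter converter =
        RowDataToAvroConverters.createConverter(rowType);
GenericRecord record = (GenericRecord) converter.convert(avroSchema, rowData);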
20 changes: 15 additions & 5 deletions flink-formats/flink-sql-avro-confluent-registry/pom.xml
@@ -75,17 +75,27 @@ under the License.
                 </includes>
             </artifactSet>
             <relocations combine.children="append">
+                <relocation>
+                    <pattern>org.apache.kafka</pattern>
+                    <shadedPattern>org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka</shadedPattern>
+                </relocation>
+                <relocation>
+                    <pattern>io.confluent</pattern>
+                    <shadedPattern>org.apache.flink.avro.registry.confluent.shaded.io.confluent</shadedPattern>
+                </relocation>
+                <!--The following modules must have the same shading pattern as in flink-sql-avro. It is needed to be able to put
+                    both the flink-sql-avro & flink-sql-avro-confluent on the classpath -->
                 <relocation>
                     <pattern>com.fasterxml.jackson</pattern>
-                    <shadedPattern>org.apache.flink.formats.avro.registry.confluent.shaded.com.fasterxml.jackson</shadedPattern>
+                    <shadedPattern>org.apache.flink.avro.shaded.com.fasterxml.jackson</shadedPattern>
                 </relocation>
                 <relocation>
-                    <pattern>org.apache.commons.compress</pattern>
-                    <shadedPattern>org.apache.flink.formats.avro.registry.confluent.shaded.org.apache.commons.compress</shadedPattern>
+                    <pattern>org.apache.avro</pattern>
+                    <shadedPattern>org.apache.flink.avro.shaded.org.apache.avro</shadedPattern>
                 </relocation>
                 <relocation>
-                    <pattern>org.apache.kafka</pattern>
-                    <shadedPattern>org.apache.flink.formats.avro.registry.confluent.shaded.org.apache.kafka</shadedPattern>
+                    <pattern>org.apache.commons.compress</pattern>
+                    <shadedPattern>org.apache.flink.avro.shaded.org.apache.commons.compress</shadedPattern>
                 </relocation>
             </relocations>
             <filters>
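The effect of the shared org.apache.flink.avro.shaded prefix is that both uber jars agree on the relocated class names, so they can sit on the same classpath without clashing. A sketch of what resolves at runtime (the class name is derived from the relocation pattern above, not verified against a built jar):

// Avro relocated by either jar is found under the common prefix:
Class<?> shadedSchema = Class.forName("org.apache.flink.avro.shaded.org.apache.avro.Schema");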
4 changes: 4 additions & 0 deletions flink-formats/flink-sql-avro/pom.xml
@@ -75,6 +75,10 @@ under the License.
                     <pattern>org.apache.commons.compress</pattern>
                     <shadedPattern>org.apache.flink.avro.shaded.org.apache.commons.compress</shadedPattern>
                 </relocation>
+                <relocation>
+                    <pattern>org.apache.avro</pattern>
+                    <shadedPattern>org.apache.flink.avro.shaded.org.apache.avro</shadedPattern>
+                </relocation>
             </relocations>
         </configuration>
     </execution>
