From f3a2197a23524048200ae2b4712d6ed833208124 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Fri, 3 Nov 2017 14:47:33 +0100 Subject: [PATCH] [FLINK-6022] [avro] Use Avro to serialize Avro in flight and in State This falls back to the original serializer (Pojo / Kryo) in cases where an old snapshot is resumed. --- .../api/java/typeutils/PojoTypeInfo.java | 6 +- .../api/java/typeutils/AvroTypeInfo.java | 2 +- .../typeutils/runtime/AvroSerializer.java | 337 +++++++++++++++ .../avro/typeutils/AvroSerializer.java | 382 ++++++++++-------- .../formats/avro/typeutils/AvroTypeInfo.java | 91 +++-- .../BackwardsCompatibleAvroSerializer.java | 218 ++++++++++ .../avro/typeutils/AvroSerializerTest.java | 59 +++ .../typeutils/AvroTypeExtractionTest.java | 14 +- .../avro/typeutils/AvroTypeInfoTest.java | 12 + ...BackwardsCompatibleAvroSerializerTest.java | 167 ++++++++ .../formats/avro/utils/TestDataGenerator.java | 120 ++++++ .../flink-1.3-avro-type-serialized-data | Bin 0 -> 23926 bytes .../flink-1.3-avro-type-serializer-snapshot | Bin 0 -> 48089 bytes 13 files changed, 1208 insertions(+), 200 deletions(-) create mode 100644 flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java create mode 100644 flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java create mode 100644 flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java create mode 100644 flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java create mode 100644 flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java create mode 100644 flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data create mode 100644 flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java index 2e893bbc5e91d..211b7efae43cb 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java @@ -309,6 +309,10 @@ public TypeSerializer createSerializer(ExecutionConfig config) { return AvroUtils.getAvroUtils().createAvroSerializer(getTypeClass()); } + return createPojoSerializer(config); + } + + public PojoSerializer createPojoSerializer(ExecutionConfig config) { TypeSerializer[] fieldSerializers = new TypeSerializer[fields.length]; Field[] reflectiveFields = new Field[fields.length]; @@ -319,7 +323,7 @@ public TypeSerializer createSerializer(ExecutionConfig config) { return new PojoSerializer(getTypeClass(), fieldSerializers, reflectiveFields, config); } - + @Override public boolean equals(Object obj) { if (obj instanceof PojoTypeInfo) { diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java index 58085f6e536d7..03bacfab134a0 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java @@ -33,6 +33,6 @@ public class AvroTypeInfo extends PojoTypeInfo { public AvroTypeInfo(Class typeClass) { - super(typeClass, generateFieldsFromAvroSchema(typeClass)); + super(typeClass, generateFieldsFromAvroSchema(typeClass, true)); } } diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java new file mode 100644 index 0000000000000..228e67272f849 --- /dev/null +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.formats.avro.utils.DataInputDecoder; +import org.apache.flink.formats.avro.utils.DataOutputEncoder; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import com.esotericsoftware.kryo.Kryo; + +import org.apache.avro.generic.GenericData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.avro.util.Utf8; + +import org.objenesis.strategy.StdInstantiatorStrategy; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Old deprecated Avro serializer. It is retained for a smoother experience when + * upgrading from an earlier Flink savepoint that stored this serializer. + */ +@Internal +@Deprecated +@SuppressWarnings({"unused", "deprecation"}) +public final class AvroSerializer extends TypeSerializer { + + private static final long serialVersionUID = 1L; + + private final Class type; + + private final Class typeToInstantiate; + + /** + * Map of class tag (using classname as tag) to their Kryo registration. + * + *

This map serves as a preview of the final registration result of + * the Kryo instance, taking into account registration overwrites. + */ + private LinkedHashMap kryoRegistrations; + + private transient ReflectDatumWriter writer; + private transient ReflectDatumReader reader; + + private transient DataOutputEncoder encoder; + private transient DataInputDecoder decoder; + + private transient Kryo kryo; + + private transient T deepCopyInstance; + + // -------------------------------------------------------------------------------------------- + + public AvroSerializer(Class type) { + this(type, type); + } + + public AvroSerializer(Class type, Class typeToInstantiate) { + this.type = checkNotNull(type); + this.typeToInstantiate = checkNotNull(typeToInstantiate); + + InstantiationUtil.checkForInstantiation(typeToInstantiate); + + this.kryoRegistrations = buildKryoRegistrations(type); + } + + // -------------------------------------------------------------------------------------------- + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public AvroSerializer duplicate() { + return new AvroSerializer<>(type, typeToInstantiate); + } + + @Override + public T createInstance() { + return InstantiationUtil.instantiate(this.typeToInstantiate); + } + + @Override + public T copy(T from) { + checkKryoInitialized(); + + return KryoUtils.copy(from, kryo, this); + } + + @Override + public T copy(T from, T reuse) { + checkKryoInitialized(); + + return KryoUtils.copy(from, reuse, kryo, this); + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(T value, DataOutputView target) throws IOException { + checkAvroInitialized(); + this.encoder.setOut(target); + this.writer.write(value, this.encoder); + } + + @Override + public T deserialize(DataInputView source) throws IOException { + checkAvroInitialized(); + this.decoder.setIn(source); + return this.reader.read(null, this.decoder); + } + + @Override + public T deserialize(T reuse, DataInputView source) throws IOException { + checkAvroInitialized(); + this.decoder.setIn(source); + return this.reader.read(reuse, this.decoder); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + checkAvroInitialized(); + + if (this.deepCopyInstance == null) { + this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class); + } + + this.decoder.setIn(source); + this.encoder.setOut(target); + + T tmp = this.reader.read(this.deepCopyInstance, this.decoder); + this.writer.write(tmp, this.encoder); + } + + private void checkAvroInitialized() { + if (this.reader == null) { + this.reader = new ReflectDatumReader<>(type); + this.writer = new ReflectDatumWriter<>(type); + this.encoder = new DataOutputEncoder(); + this.decoder = new DataInputDecoder(); + } + } + + private void checkKryoInitialized() { + if (this.kryo == null) { + this.kryo = new Kryo(); + + Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy(); + instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); + kryo.setInstantiatorStrategy(instantiatorStrategy); + + kryo.setAsmEnabled(true); + + KryoUtils.applyRegistrations(kryo, kryoRegistrations.values()); + } + } + + // -------------------------------------------------------------------------------------------- + + @Override + public int hashCode() { + return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof AvroSerializer) { + @SuppressWarnings("unchecked") + AvroSerializer avroSerializer = (AvroSerializer) obj; + + return avroSerializer.canEqual(this) && + type == avroSerializer.type && + typeToInstantiate == avroSerializer.typeToInstantiate; + } else { + return false; + } + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof AvroSerializer; + } + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & compatibility + // -------------------------------------------------------------------------------------------- + + @Override + public AvroSerializerConfigSnapshot snapshotConfiguration() { + return new AvroSerializerConfigSnapshot<>(type, typeToInstantiate, kryoRegistrations); + } + + @SuppressWarnings("unchecked") + @Override + public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof AvroSerializerConfigSnapshot) { + final AvroSerializerConfigSnapshot config = (AvroSerializerConfigSnapshot) configSnapshot; + + if (type.equals(config.getTypeClass()) && typeToInstantiate.equals(config.getTypeToInstantiate())) { + // resolve Kryo registrations; currently, since the Kryo registrations in Avro + // are fixed, there shouldn't be a problem with the resolution here. + + LinkedHashMap oldRegistrations = config.getKryoRegistrations(); + oldRegistrations.putAll(kryoRegistrations); + + for (Map.Entry reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) { + if (reconfiguredRegistrationEntry.getValue().isDummy()) { + return CompatibilityResult.requiresMigration(); + } + } + + this.kryoRegistrations = oldRegistrations; + return CompatibilityResult.compatible(); + } + } + + // ends up here if the preceding serializer is not + // the ValueSerializer, or serialized data type has changed + return CompatibilityResult.requiresMigration(); + } + + /** + * Config snapshot for this serializer. + */ + public static class AvroSerializerConfigSnapshot extends KryoRegistrationSerializerConfigSnapshot { + + private static final int VERSION = 1; + + private Class typeToInstantiate; + + public AvroSerializerConfigSnapshot() {} + + public AvroSerializerConfigSnapshot( + Class baseType, + Class typeToInstantiate, + LinkedHashMap kryoRegistrations) { + + super(baseType, kryoRegistrations); + this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate); + } + + @Override + public void write(DataOutputView out) throws IOException { + super.write(out); + + out.writeUTF(typeToInstantiate.getName()); + } + + @SuppressWarnings("unchecked") + @Override + public void read(DataInputView in) throws IOException { + super.read(in); + + String classname = in.readUTF(); + try { + typeToInstantiate = (Class) Class.forName(classname, true, getUserCodeClassLoader()); + } catch (ClassNotFoundException e) { + throw new IOException("Cannot find requested class " + classname + " in classpath.", e); + } + } + + @Override + public int getVersion() { + return VERSION; + } + + public Class getTypeToInstantiate() { + return typeToInstantiate; + } + } + + // -------------------------------------------------------------------------------------------- + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + + // kryoRegistrations may be null if this Avro serializer is deserialized from an old version + if (kryoRegistrations == null) { + this.kryoRegistrations = buildKryoRegistrations(type); + } + } + + private static LinkedHashMap buildKryoRegistrations(Class serializedDataType) { + final LinkedHashMap registrations = new LinkedHashMap<>(); + + // register Avro types. + registrations.put( + GenericData.Array.class.getName(), + new KryoRegistration( + GenericData.Array.class, + new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList()))); + registrations.put(Utf8.class.getName(), new KryoRegistration(Utf8.class)); + registrations.put(GenericData.EnumSymbol.class.getName(), new KryoRegistration(GenericData.EnumSymbol.class)); + registrations.put(GenericData.Fixed.class.getName(), new KryoRegistration(GenericData.Fixed.class)); + registrations.put(GenericData.StringType.class.getName(), new KryoRegistration(GenericData.StringType.class)); + + // register the serialized data type + registrations.put(serializedDataType.getName(), new KryoRegistration(serializedDataType)); + + return registrations; + } +} diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java index 02f74f5d5ace1..bc3369fcf593a 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java @@ -18,85 +18,93 @@ package org.apache.flink.formats.avro.typeutils; -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.java.typeutils.runtime.KryoRegistration; import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot; -import org.apache.flink.api.java.typeutils.runtime.KryoUtils; -import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.formats.avro.utils.DataInputDecoder; import org.apache.flink.formats.avro.utils.DataOutputEncoder; import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.Preconditions; -import com.esotericsoftware.kryo.Kryo; -import org.apache.avro.generic.GenericData; +import org.apache.avro.Schema; +import org.apache.avro.SchemaCompatibility; +import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType; +import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility; +import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumReader; import org.apache.avro.reflect.ReflectDatumWriter; -import org.apache.avro.util.Utf8; -import org.objenesis.strategy.StdInstantiatorStrategy; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.specific.SpecificRecord; import java.io.IOException; -import java.io.ObjectInputStream; -import java.util.LinkedHashMap; -import java.util.Map; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * General purpose serialization. Currently using Apache Avro's Reflect-serializers for serialization and - * Kryo for deep object copies. We want to change this to Kryo-only. + * A serializer that serializes types via Avro. * - * @param The type serialized. + *

The serializer supports both efficient specific record serialization for + * types generated via Avro, as well as serialization via reflection + * (ReflectDatumReader / -Writer). The serializer instantiates them depending on + * the class of the type it should serialize. + * + * @param The type to be serialized. */ -@Internal -public final class AvroSerializer extends TypeSerializer { +public class AvroSerializer extends TypeSerializer { private static final long serialVersionUID = 1L; - private final Class type; + // -------- configuration fields, serializable ----------- - private final Class typeToInstantiate; + /** The class of the type that is serialized by this serializer. */ + private final Class type; - /** - * Map of class tag (using classname as tag) to their Kryo registration. - * - *

This map serves as a preview of the final registration result of - * the Kryo instance, taking into account registration overwrites. - */ - private LinkedHashMap kryoRegistrations; + // -------- runtime fields, non-serializable, lazily initialized ----------- - private transient ReflectDatumWriter writer; - private transient ReflectDatumReader reader; + private transient SpecificDatumWriter writer; + private transient SpecificDatumReader reader; private transient DataOutputEncoder encoder; private transient DataInputDecoder decoder; - private transient Kryo kryo; + private transient SpecificData avroData; - private transient T deepCopyInstance; + private transient Schema schema; - // -------------------------------------------------------------------------------------------- + /** The serializer configuration snapshot, cached for efficiency. */ + private transient AvroSchemaSerializerConfigSnapshot configSnapshot; + // ------------------------------------------------------------------------ + + /** + * Creates a new AvroSerializer for the type indicated by the given class. + */ public AvroSerializer(Class type) { - this(type, type); + this.type = checkNotNull(type); } + /** + * @deprecated Use {@link AvroSerializer#AvroSerializer(Class)} instead. + */ + @Deprecated + @SuppressWarnings("unused") public AvroSerializer(Class type, Class typeToInstantiate) { - this.type = checkNotNull(type); - this.typeToInstantiate = checkNotNull(typeToInstantiate); + this(type); + } - InstantiationUtil.checkForInstantiation(typeToInstantiate); + // ------------------------------------------------------------------------ - this.kryoRegistrations = buildKryoRegistrations(type); + public Class getType() { + return type; } - // -------------------------------------------------------------------------------------------- + // ------------------------------------------------------------------------ + // Properties + // ------------------------------------------------------------------------ @Override public boolean isImmutableType() { @@ -104,32 +112,17 @@ public boolean isImmutableType() { } @Override - public AvroSerializer duplicate() { - return new AvroSerializer(type, typeToInstantiate); - } - - @Override - public T createInstance() { - return InstantiationUtil.instantiate(this.typeToInstantiate); - } - - @Override - public T copy(T from) { - checkKryoInitialized(); - - return KryoUtils.copy(from, kryo, this); + public int getLength() { + return -1; } - @Override - public T copy(T from, T reuse) { - checkKryoInitialized(); - - return KryoUtils.copy(from, reuse, kryo, this); - } + // ------------------------------------------------------------------------ + // Serialization + // ------------------------------------------------------------------------ @Override - public int getLength() { - return -1; + public T createInstance() { + return InstantiationUtil.instantiate(type); } @Override @@ -153,111 +146,216 @@ public T deserialize(T reuse, DataInputView source) throws IOException { return this.reader.read(reuse, this.decoder); } + // ------------------------------------------------------------------------ + // Copying + // ------------------------------------------------------------------------ + @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { + public T copy(T from) { checkAvroInitialized(); + return avroData.deepCopy(schema, from); + } - if (this.deepCopyInstance == null) { - this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class); - } - - this.decoder.setIn(source); - this.encoder.setOut(target); + @Override + public T copy(T from, T reuse) { + return copy(from); + } - T tmp = this.reader.read(this.deepCopyInstance, this.decoder); - this.writer.write(tmp, this.encoder); + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + T value = deserialize(source); + serialize(value, target); } - private void checkAvroInitialized() { - if (this.reader == null) { - this.reader = new ReflectDatumReader(type); - this.writer = new ReflectDatumWriter(type); - this.encoder = new DataOutputEncoder(); - this.decoder = new DataInputDecoder(); + // ------------------------------------------------------------------------ + // Compatibility and Upgrades + // ------------------------------------------------------------------------ + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + if (configSnapshot == null) { + checkAvroInitialized(); + configSnapshot = new AvroSchemaSerializerConfigSnapshot(schema.toString(false)); } + return configSnapshot; } - private void checkKryoInitialized() { - if (this.kryo == null) { - this.kryo = new Kryo(); - - Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy(); - instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); - kryo.setInstantiatorStrategy(instantiatorStrategy); + @Override + @SuppressWarnings("deprecation") + public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof AvroSchemaSerializerConfigSnapshot) { + // proper schema snapshot, can do the sophisticated schema-based compatibility check + final String schemaString = ((AvroSchemaSerializerConfigSnapshot) configSnapshot).getSchemaString(); + final Schema lastSchema = new Schema.Parser().parse(schemaString); - kryo.setAsmEnabled(true); + checkAvroInitialized(); + final SchemaPairCompatibility compatibility = + SchemaCompatibility.checkReaderWriterCompatibility(schema, lastSchema); - KryoUtils.applyRegistrations(kryo, kryoRegistrations.values()); + return compatibility.getType() == SchemaCompatibilityType.COMPATIBLE ? + CompatibilityResult.compatible() : CompatibilityResult.requiresMigration(); + } + else if (configSnapshot instanceof AvroSerializerConfigSnapshot) { + // old snapshot case, just compare the type + // we don't need to restore any Kryo stuff, since Kryo was never used for persistence, + // only for object-to-object copies. + final AvroSerializerConfigSnapshot old = (AvroSerializerConfigSnapshot) configSnapshot; + return type.equals(old.getTypeClass()) ? + CompatibilityResult.compatible() : CompatibilityResult.requiresMigration(); + } + else { + return CompatibilityResult.requiresMigration(); } } - // -------------------------------------------------------------------------------------------- + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + @Override + public TypeSerializer duplicate() { + return new AvroSerializer<>(type); + } @Override public int hashCode() { - return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode(); + return 42 + type.hashCode(); } @Override public boolean equals(Object obj) { - if (obj instanceof AvroSerializer) { - @SuppressWarnings("unchecked") - AvroSerializer avroSerializer = (AvroSerializer) obj; - - return avroSerializer.canEqual(this) && - type == avroSerializer.type && - typeToInstantiate == avroSerializer.typeToInstantiate; - } else { + if (obj == this) { + return true; + } + else if (obj != null && obj.getClass() == AvroSerializer.class) { + final AvroSerializer that = (AvroSerializer) obj; + return this.type == that.type; + } + else { return false; } } @Override public boolean canEqual(Object obj) { - return obj instanceof AvroSerializer; + return obj.getClass() == this.getClass(); } - // -------------------------------------------------------------------------------------------- - // Serializer configuration snapshotting & compatibility - // -------------------------------------------------------------------------------------------- - @Override - public AvroSerializerConfigSnapshot snapshotConfiguration() { - return new AvroSerializerConfigSnapshot<>(type, typeToInstantiate, kryoRegistrations); + public String toString() { + return getClass().getName() + " (" + getType().getName() + ')'; } - @SuppressWarnings("unchecked") - @Override - public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof AvroSerializerConfigSnapshot) { - final AvroSerializerConfigSnapshot config = (AvroSerializerConfigSnapshot) configSnapshot; + // ------------------------------------------------------------------------ + // Initialization + // ------------------------------------------------------------------------ + + private void checkAvroInitialized() { + if (writer == null) { + initializeAvro(); + } + } + + private void initializeAvro() { + final ClassLoader cl = Thread.currentThread().getContextClassLoader(); + + if (SpecificRecord.class.isAssignableFrom(type)) { + this.avroData = new SpecificData(cl); + this.schema = this.avroData.getSchema(type); + this.reader = new SpecificDatumReader<>(schema, schema, avroData); + this.writer = new SpecificDatumWriter<>(schema, avroData); + } + else { + final ReflectData reflectData = new ReflectData(cl); + this.avroData = reflectData; + this.schema = this.avroData.getSchema(type); + this.reader = new ReflectDatumReader<>(schema, schema, reflectData); + this.writer = new ReflectDatumWriter<>(schema, reflectData); + } + + this.encoder = new DataOutputEncoder(); + this.decoder = new DataInputDecoder(); + } + + // ------------------------------------------------------------------------ + // Serializer Snapshots + // ------------------------------------------------------------------------ + + /** + * A config snapshot for the Avro Serializer that stores the Avro Schema to check compatibility. + */ + public static final class AvroSchemaSerializerConfigSnapshot extends TypeSerializerConfigSnapshot { + + private String schemaString; + + /** + * Default constructor for instantiation via reflection. + */ + @SuppressWarnings("unused") + public AvroSchemaSerializerConfigSnapshot() {} + + public AvroSchemaSerializerConfigSnapshot(String schemaString) { + this.schemaString = checkNotNull(schemaString); + } + + public String getSchemaString() { + return schemaString; + } + + // --- Serialization --- + + @Override + public void read(DataInputView in) throws IOException { + super.read(in); + this.schemaString = in.readUTF(); + } + + @Override + public void write(DataOutputView out) throws IOException { + super.write(out); + out.writeUTF(schemaString); + } - if (type.equals(config.getTypeClass()) && typeToInstantiate.equals(config.getTypeToInstantiate())) { - // resolve Kryo registrations; currently, since the Kryo registrations in Avro - // are fixed, there shouldn't be a problem with the resolution here. + // --- Version --- - LinkedHashMap oldRegistrations = config.getKryoRegistrations(); - oldRegistrations.putAll(kryoRegistrations); + @Override + public int getVersion() { + return 1; + } - for (Map.Entry reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) { - if (reconfiguredRegistrationEntry.getValue().isDummy()) { - return CompatibilityResult.requiresMigration(); - } - } + // --- Utils --- - this.kryoRegistrations = oldRegistrations; - return CompatibilityResult.compatible(); + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + else if (obj != null && obj.getClass() == AvroSchemaSerializerConfigSnapshot.class) { + final AvroSchemaSerializerConfigSnapshot that = (AvroSchemaSerializerConfigSnapshot) obj; + return this.schemaString.equals(that.schemaString); + } + else { + return false; } } - // ends up here if the preceding serializer is not - // the ValueSerializer, or serialized data type has changed - return CompatibilityResult.requiresMigration(); + @Override + public int hashCode() { + return 11 + schemaString.hashCode(); + } + + @Override + public String toString() { + return getClass().getName() + " (" + schemaString + ')'; + } } /** - * {@link TypeSerializerConfigSnapshot} for Avro. + * The outdated config snapshot, retained for backwards compatibility. + * + * @deprecated The {@link AvroSchemaSerializerConfigSnapshot} should be used instead. */ + @Deprecated public static class AvroSerializerConfigSnapshot extends KryoRegistrationSerializerConfigSnapshot { private static final int VERSION = 1; @@ -266,15 +364,6 @@ public static class AvroSerializerConfigSnapshot extends KryoRegistrationSeri public AvroSerializerConfigSnapshot() {} - public AvroSerializerConfigSnapshot( - Class baseType, - Class typeToInstantiate, - LinkedHashMap kryoRegistrations) { - - super(baseType, kryoRegistrations); - this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate); - } - @Override public void write(DataOutputView out) throws IOException { super.write(out); @@ -304,35 +393,4 @@ public Class getTypeToInstantiate() { return typeToInstantiate; } } - - // -------------------------------------------------------------------------------------------- - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - - // kryoRegistrations may be null if this Avro serializer is deserialized from an old version - if (kryoRegistrations == null) { - this.kryoRegistrations = buildKryoRegistrations(type); - } - } - - private static LinkedHashMap buildKryoRegistrations(Class serializedDataType) { - final LinkedHashMap registrations = new LinkedHashMap<>(); - - // register Avro types. - registrations.put( - GenericData.Array.class.getName(), - new KryoRegistration( - GenericData.Array.class, - new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList()))); - registrations.put(Utf8.class.getName(), new KryoRegistration(Utf8.class)); - registrations.put(GenericData.EnumSymbol.class.getName(), new KryoRegistration(GenericData.EnumSymbol.class)); - registrations.put(GenericData.Fixed.class.getName(), new KryoRegistration(GenericData.Fixed.class)); - registrations.put(GenericData.StringType.class.getName(), new KryoRegistration(GenericData.StringType.class)); - - // register the serialized data type - registrations.put(serializedDataType.getName(), new KryoRegistration(serializedDataType)); - - return registrations; - } } diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java index ad6b06e588129..644ee50d36172 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java @@ -33,6 +33,7 @@ import java.lang.reflect.Type; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; /** * Special type information to generate a special AvroTypeInfo for Avro POJOs (implementing SpecificRecordBase, the typed Avro POJOs) @@ -49,43 +50,83 @@ public class AvroTypeInfo extends PojoTypeInfo private static final long serialVersionUID = 1L; + private static final ConcurrentHashMap IN_BACKWARDS_COMPATIBLE_MODE = new ConcurrentHashMap<>(); + + private final boolean useBackwardsCompatibleSerializer; + + /** + * Creates a new Avro type info for the given class. + */ public AvroTypeInfo(Class typeClass) { - super(typeClass, generateFieldsFromAvroSchema(typeClass)); + this(typeClass, false); + } + + /** + * Creates a new Avro type info for the given class. + * + *

This constructor takes a flag to specify whether a serializer + * that is backwards compatible with PoJo-style serialization of Avro types should be used. + * That is only necessary, if one has a Flink 1.3 (or earlier) savepoint where Avro types + * were stored in the checkpointed state. New Flink programs will never need this. + */ + public AvroTypeInfo(Class typeClass, boolean useBackwardsCompatibleSerializer) { + super(typeClass, generateFieldsFromAvroSchema(typeClass, useBackwardsCompatibleSerializer)); + + final Boolean modeOnStack = IN_BACKWARDS_COMPATIBLE_MODE.get(Thread.currentThread()); + this.useBackwardsCompatibleSerializer = modeOnStack == null ? + useBackwardsCompatibleSerializer : modeOnStack; } @Override + @SuppressWarnings("deprecation") public TypeSerializer createSerializer(ExecutionConfig config) { - return super.createSerializer(config); + return useBackwardsCompatibleSerializer ? + new BackwardsCompatibleAvroSerializer<>(getTypeClass()) : + new AvroSerializer<>(getTypeClass()); } @SuppressWarnings("unchecked") @Internal - public static List generateFieldsFromAvroSchema(Class typeClass) { - PojoTypeExtractor pte = new PojoTypeExtractor(); - ArrayList typeHierarchy = new ArrayList<>(); - typeHierarchy.add(typeClass); - TypeInformation ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null); - - if (!(ti instanceof PojoTypeInfo)) { - throw new IllegalStateException("Expecting type to be a PojoTypeInfo"); - } - PojoTypeInfo pti = (PojoTypeInfo) ti; - List newFields = new ArrayList<>(pti.getTotalFields()); - - for (int i = 0; i < pti.getArity(); i++) { - PojoField f = pti.getPojoFieldAt(i); - TypeInformation newType = f.getTypeInformation(); - // check if type is a CharSequence - if (newType instanceof GenericTypeInfo) { - if ((newType).getTypeClass().equals(CharSequence.class)) { - // replace the type by a org.apache.avro.util.Utf8 - newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class); + public static List generateFieldsFromAvroSchema( + Class typeClass, + boolean useBackwardsCompatibleSerializer) { + + final Thread currentThread = Thread.currentThread(); + final boolean entryPoint = + IN_BACKWARDS_COMPATIBLE_MODE.putIfAbsent(currentThread, useBackwardsCompatibleSerializer) == null; + + try { + PojoTypeExtractor pte = new PojoTypeExtractor(); + ArrayList typeHierarchy = new ArrayList<>(); + typeHierarchy.add(typeClass); + TypeInformation ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null); + + if (!(ti instanceof PojoTypeInfo)) { + throw new IllegalStateException("Expecting type to be a PojoTypeInfo"); + } + PojoTypeInfo pti = (PojoTypeInfo) ti; + List newFields = new ArrayList<>(pti.getTotalFields()); + + for (int i = 0; i < pti.getArity(); i++) { + PojoField f = pti.getPojoFieldAt(i); + TypeInformation newType = f.getTypeInformation(); + // check if type is a CharSequence + if (newType instanceof GenericTypeInfo) { + if ((newType).getTypeClass().equals(CharSequence.class)) { + // replace the type by a org.apache.avro.util.Utf8 + newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class); + } } + PojoField newField = new PojoField(f.getField(), newType); + newFields.add(newField); + } + return newFields; + } + finally { + if (entryPoint) { + IN_BACKWARDS_COMPATIBLE_MODE.remove(currentThread); } - PojoField newField = new PojoField(f.getField(), newType); - newFields.add(newField); } - return newFields; } private static class PojoTypeExtractor extends TypeExtractor { diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java new file mode 100644 index 0000000000000..e5eb5d89a1f77 --- /dev/null +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.typeutils; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot; +import org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSchemaSerializerConfigSnapshot; +import org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSerializerConfigSnapshot; + +import org.apache.avro.specific.SpecificRecordBase; + +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * An Avro serializer that can switch back to a KryoSerializer or a Pojo Serializer, if + * it has to ensure compatibility with one of those. + * + *

This serializer is there only as a means to explicitly fall back to PoJo serialization + * in the case where an upgrade from an earlier savepoint was made. + * + * @param The type to be serialized. + */ +@SuppressWarnings("deprecation") +public class BackwardsCompatibleAvroSerializer extends TypeSerializer { + + private static final long serialVersionUID = 1L; + + /** The type to serialize. */ + private final Class type; + + /** The type serializer currently used. Avro by default. */ + private TypeSerializer serializer; + + /** + * Creates a new backwards-compatible Avro Serializer, for the given type. + */ + public BackwardsCompatibleAvroSerializer(Class type) { + this.type = type; + this.serializer = new AvroSerializer<>(type); + } + + /** + * Private copy constructor. + */ + private BackwardsCompatibleAvroSerializer(Class type, TypeSerializer serializer) { + this.type = type; + this.serializer = serializer; + } + + // ------------------------------------------------------------------------ + // Properties + // ------------------------------------------------------------------------ + + @Override + public boolean isImmutableType() { + return serializer.isImmutableType(); + } + + @Override + public int getLength() { + return serializer.getLength(); + } + + // ------------------------------------------------------------------------ + // Serialization + // ------------------------------------------------------------------------ + + @Override + public T createInstance() { + return serializer.createInstance(); + } + + @Override + public void serialize(T value, DataOutputView target) throws IOException { + serializer.serialize(value, target); + } + + @Override + public T deserialize(DataInputView source) throws IOException { + return serializer.deserialize(source); + } + + @Override + public T deserialize(T reuse, DataInputView source) throws IOException { + return serializer.deserialize(reuse, source); + } + + // ------------------------------------------------------------------------ + // Copying + // ------------------------------------------------------------------------ + + @Override + public T copy(T from) { + return serializer.copy(from); + } + + @Override + public T copy(T from, T reuse) { + return serializer.copy(from, reuse); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + serializer.copy(source, target); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + @Override + public TypeSerializer duplicate() { + return new BackwardsCompatibleAvroSerializer<>(type, serializer.duplicate()); + } + + @Override + public int hashCode() { + return type.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + else if (obj != null && obj.getClass() == BackwardsCompatibleAvroSerializer.class) { + final BackwardsCompatibleAvroSerializer that = (BackwardsCompatibleAvroSerializer) obj; + return this.type == that.type && this.serializer.equals(that.serializer); + } + else { + return false; + } + } + + @Override + public boolean canEqual(Object obj) { + return obj.getClass() == this.getClass(); + } + + @Override + public String toString() { + return getClass().getName() + " (" + type.getName() + ')'; + } + + // ------------------------------------------------------------------------ + // Configuration Snapshots and Upgrades + // ------------------------------------------------------------------------ + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + // we return the configuration of the actually used serializer here + return serializer.snapshotConfiguration(); + } + + @Override + public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof AvroSchemaSerializerConfigSnapshot || + configSnapshot instanceof AvroSerializerConfigSnapshot) { + + // avro serializer, nice :-) + checkState(serializer instanceof AvroSerializer, + "Serializer was changed backwards to PojoSerializer and now encounters AvroSerializer snapshot."); + + return serializer.ensureCompatibility(configSnapshot); + } + else if (configSnapshot instanceof PojoSerializerConfigSnapshot) { + // common previous case + checkState(SpecificRecordBase.class.isAssignableFrom(type), + "BackwardsCompatibleAvroSerializer resuming a state serialized " + + "via a PojoSerializer, but not for an Avro Specific Record"); + + final AvroTypeInfo typeInfo = + new AvroTypeInfo<>(type.asSubclass(SpecificRecordBase.class), true); + + @SuppressWarnings("unchecked") + final TypeSerializer pojoSerializer = + (TypeSerializer) typeInfo.createPojoSerializer(new ExecutionConfig()); + this.serializer = pojoSerializer; + return serializer.ensureCompatibility(configSnapshot); + } + else if (configSnapshot instanceof KryoRegistrationSerializerConfigSnapshot) { + // force-kryo old case common previous case + // we create a new Kryo Serializer with a blank execution config. + // registrations are anyways picked up from the snapshot. + serializer = new KryoSerializer<>(type, new ExecutionConfig()); + return serializer.ensureCompatibility(configSnapshot); + } + else { + // completely incompatible type, needs migration + return CompatibilityResult.requiresMigration(); + } + } +} diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java new file mode 100644 index 0000000000000..0ab58683f5aba --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.typeutils; + +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.formats.avro.generated.User; +import org.apache.flink.formats.avro.utils.TestDataGenerator; + +import java.util.Random; + +/** + * Tests for the {@link AvroSerializer} that test specific avro types. + */ +public class AvroSerializerTest extends SerializerTestBase { + + @Override + protected TypeSerializer createSerializer() { + return new AvroSerializer<>(User.class); + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected Class getTypeClass() { + return User.class; + } + + @Override + protected User[] getTestData() { + final Random rnd = new Random(); + final User[] users = new User[20]; + + for (int i = 0; i < users.length; i++) { + users[i] = TestDataGenerator.generateRandomUser(rnd); + } + + return users; + } +} diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java index ae410313e5227..fbabb958daa8e 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java @@ -82,21 +82,14 @@ public void testSimpleAvroRead() throws Exception { AvroInputFormat users = new AvroInputFormat(in, User.class); DataSet usersDS = env.createInput(users) - // null map type because the order changes in different JVMs (hard to test) - .map(new MapFunction() { - @Override - public User map(User value) throws Exception { - value.setTypeMap(null); - return value; - } - }); + .map((value) -> value); usersDS.writeAsText(resultPath); env.execute("Simple Avro read job"); - expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" + - "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n"; + expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"KEY 2\": 17554, \"KEY 1\": 8546456}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" + + "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n"; } @Test @@ -107,7 +100,6 @@ public void testSerializeWithAvro() throws Exception { AvroInputFormat users = new AvroInputFormat(in, User.class); DataSet usersDS = env.createInput(users) - // null map type because the order changes in different JVMs (hard to test) .map(new MapFunction() { @Override public User map(User value) throws Exception { diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java index 79a4a451c74b0..371cd4f8b3b9b 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java @@ -18,10 +18,16 @@ package org.apache.flink.formats.avro.typeutils; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeInformationTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.formats.avro.generated.Address; import org.apache.flink.formats.avro.generated.User; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + /** * Test for {@link AvroTypeInfo}. */ @@ -34,4 +40,10 @@ protected AvroTypeInfo[] getTestData() { new AvroTypeInfo<>(User.class), }; } + + @Test + public void testAvroByDefault() { + final TypeSerializer serializer = new AvroTypeInfo<>(User.class).createSerializer(new ExecutionConfig()); + assertTrue(serializer instanceof AvroSerializer); + } } diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java new file mode 100644 index 0000000000000..92395bab586a5 --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.typeutils; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.PojoSerializer; +import org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.formats.avro.generated.User; +import org.apache.flink.formats.avro.utils.TestDataGenerator; + +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * This test ensures that state and state configuration created by Flink 1.3 Avro types + * that used the PojoSerializer still works. + * + *

Important: Since Avro itself broke class compatibility between 1.7.7 (used in Flink 1.3) + * and 1.8.2 (used in Flink 1.4), the Avro by Pojo compatibility is broken through Avro already. + * This test only tests that the Avro serializer change (switching from Pojo to Avro for Avro types) + * works properly. + * + *

This test can be dropped once we drop backwards compatibility with Flink 1.3 snapshots. + */ +public class BackwardsCompatibleAvroSerializerTest { + + private static final String SNAPSHOT_RESOURCE = "flink-1.3-avro-type-serializer-snapshot"; + + private static final String DATA_RESOURCE = "flink-1.3-avro-type-serialized-data"; + + @SuppressWarnings("unused") + private static final String SNAPSHOT_RESOURCE_WRITER = "/data/repositories/flink/flink-formats/flink-avro/src/test/resources/" + SNAPSHOT_RESOURCE; + + @SuppressWarnings("unused") + private static final String DATA_RESOURCE_WRITER = "/data/repositories/flink/flink-formats/flink-avro/src/test/resources/" + DATA_RESOURCE; + + private static final long RANDOM_SEED = 143065108437678L; + + private static final int NUM_DATA_ENTRIES = 20; + + @Test + public void testCompatibilityWithFlink_1_3() throws Exception { + + // retrieve the old config snapshot + + final TypeSerializer serializer; + final TypeSerializerConfigSnapshot configSnapshot; + + try (InputStream in = getClass().getClassLoader().getResourceAsStream(SNAPSHOT_RESOURCE)) { + DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(in); + + List, TypeSerializerConfigSnapshot>> deserialized = + TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience( + inView, getClass().getClassLoader()); + + assertEquals(1, deserialized.size()); + + @SuppressWarnings("unchecked") + final TypeSerializer typedSerializer = (TypeSerializer) deserialized.get(0).f0; + + serializer = typedSerializer; + configSnapshot = deserialized.get(0).f1; + } + + assertNotNull(serializer); + assertNotNull(configSnapshot); + + assertTrue(serializer instanceof PojoSerializer); + assertTrue(configSnapshot instanceof PojoSerializerConfigSnapshot); + + // sanity check for the test: check that the test data works with the original serializer + validateDeserialization(serializer); + + // sanity check for the test: check that a PoJoSerializer and the original serializer work together + assertFalse(serializer.ensureCompatibility(configSnapshot).isRequiresMigration()); + + final TypeSerializer newSerializer = new AvroTypeInfo<>(User.class, true).createSerializer(new ExecutionConfig()); + assertFalse(newSerializer.ensureCompatibility(configSnapshot).isRequiresMigration()); + + // deserialize the data and make sure this still works + validateDeserialization(newSerializer); + + TypeSerializerConfigSnapshot nextSnapshot = newSerializer.snapshotConfiguration(); + final TypeSerializer nextSerializer = new AvroTypeInfo<>(User.class, true).createSerializer(new ExecutionConfig()); + + assertFalse(nextSerializer.ensureCompatibility(nextSnapshot).isRequiresMigration()); + + // deserialize the data and make sure this still works + validateDeserialization(nextSerializer); + } + + private static void validateDeserialization(TypeSerializer serializer) throws IOException { + final Random rnd = new Random(RANDOM_SEED); + + try (InputStream in = BackwardsCompatibleAvroSerializerTest.class.getClassLoader() + .getResourceAsStream(DATA_RESOURCE)) { + + final DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(in); + + for (int i = 0; i < NUM_DATA_ENTRIES; i++) { + final User deserialized = serializer.deserialize(inView); + + // deterministically generate a reference record + final User reference = TestDataGenerator.generateRandomUser(rnd); + + assertEquals(reference, deserialized); + } + } + } + +// run this code on a 1.3 (or earlier) branch to generate the test data +// public static void main(String[] args) throws Exception { +// +// AvroTypeInfo typeInfo = new AvroTypeInfo<>(User.class); +// +// TypeSerializer serializer = typeInfo.createPojoSerializer(new ExecutionConfig()); +// TypeSerializerConfigSnapshot confSnapshot = serializer.snapshotConfiguration(); +// +// try (FileOutputStream fos = new FileOutputStream(SNAPSHOT_RESOURCE_WRITER)) { +// DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(fos); +// +// TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience( +// out, +// Collections.singletonList( +// new Tuple2<>(serializer, confSnapshot))); +// } +// +// try (FileOutputStream fos = new FileOutputStream(DATA_RESOURCE_WRITER)) { +// final DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(fos); +// final Random rnd = new Random(RANDOM_SEED); +// +// for (int i = 0; i < NUM_DATA_ENTRIES; i++) { +// serializer.serialize(TestDataGenerator.generateRandomUser(rnd), out); +// } +// } +// } +} diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java new file mode 100644 index 0000000000000..9a9061e0ef757 --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.utils; + +import org.apache.flink.formats.avro.generated.Address; +import org.apache.flink.formats.avro.generated.Colors; +import org.apache.flink.formats.avro.generated.Fixed16; +import org.apache.flink.formats.avro.generated.User; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Random; + +/** + * Generator for random test data for the generated Avro User type. + */ +public class TestDataGenerator { + + public static User generateRandomUser(Random rnd) { + return new User( + generateRandomString(rnd, 50), + rnd.nextBoolean() ? null : rnd.nextInt(), + rnd.nextBoolean() ? null : generateRandomString(rnd, 6), + rnd.nextBoolean() ? null : rnd.nextLong(), + rnd.nextDouble(), + null, + rnd.nextBoolean(), + generateRandomStringList(rnd, 20, 30), + generateRandomBooleanList(rnd, 20), + rnd.nextBoolean() ? null : generateRandomStringList(rnd, 20, 20), + generateRandomColor(rnd), + new HashMap<>(), + generateRandomFixed16(rnd), + generateRandomUnion(rnd), + generateRandomAddress(rnd)); + } + + public static Colors generateRandomColor(Random rnd) { + return Colors.values()[rnd.nextInt(Colors.values().length)]; + } + + public static Fixed16 generateRandomFixed16(Random rnd) { + if (rnd.nextBoolean()) { + return new Fixed16(); + } + else { + byte[] bytes = new byte[16]; + rnd.nextBytes(bytes); + return new Fixed16(bytes); + } + } + + public static Address generateRandomAddress(Random rnd) { + return new Address( + rnd.nextInt(), + generateRandomString(rnd, 20), + generateRandomString(rnd, 20), + generateRandomString(rnd, 20), + generateRandomString(rnd, 20)); + } + + private static List generateRandomBooleanList(Random rnd, int maxEntries) { + final int num = rnd.nextInt(maxEntries + 1); + ArrayList list = new ArrayList<>(); + for (int i = 0; i < num; i++) { + list.add(rnd.nextBoolean()); + } + return list; + } + + private static List generateRandomStringList(Random rnd, int maxEntries, int maxLen) { + final int num = rnd.nextInt(maxEntries + 1); + ArrayList list = new ArrayList<>(); + for (int i = 0; i < num; i++) { + list.add(generateRandomString(rnd, maxLen)); + } + return list; + } + + private static String generateRandomString(Random rnd, int maxLen) { + char[] chars = new char[rnd.nextInt(maxLen + 1)]; + for (int i = 0; i < chars.length; i++) { + chars[i] = (char) rnd.nextInt(Character.MAX_VALUE); + } + return new String(chars); + } + + private static Object generateRandomUnion(Random rnd) { + if (rnd.nextBoolean()) { + if (rnd.nextBoolean()) { + return null; + } else { + return rnd.nextBoolean(); + } + } else { + if (rnd.nextBoolean()) { + return rnd.nextLong(); + } else { + return rnd.nextDouble(); + } + } + } +} diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data new file mode 100644 index 0000000000000000000000000000000000000000..028c1e6ce47737698dcf45bf5600d67f76bb2f8c GIT binary patch literal 23926 zcmb5W39#&LRv&iXJirW$?SNsDDwCv~6jU4=5>r`7DrNvD7*_>IDh6yALx{^15QC=x z8&k%lH??||THR{(zNlNhuj*DWYW0F=V}@avjaguZq4jB(K1oi2Ar-*M>G${eKbP`L zIrrAPw_mqfxAlGJ{LVSQ^E-OW)5{m6&5XQTN$Xvau=Up`f46-4j(_@Bei~kyIs1Yd zIfa9Wbrey$VWv9wWx{O{>LiXR#%kxB!A4J2a4nBEWm9SS)auh?v^i?!y(uS6HtR{U zH|_hSgxYL{tv1R~_f^(SoULA0&lUoSv6ZwsB;9D-stVSnX7-6-$?NhYoJ5{pJpJU4 z{-YoLrhoWzKk@g!>DzziXMXe-{>PvA+24EdC13gk-!H!O#cN*w>X$DVr!C>A;vXaL zo@qVWDW;5g#Ltm|cr=fs%gg>O7xYtBvtY!_ZZY}tMNOUTwEf15B!g(iy>NDYW$+C@ z%7^v6)~$vIAtaIxA5(WbEzO2<(;X@~zEVD)O*9ndy;_(1Y||B4mayYWHTWUVsoEA0Ic z^-6KS+tBtgE5Tdz*cczoJ^%7WA{0CNDDIv*FJC16P93k9J3sIzedAnu`J(Pq&5y{= zKy-+*Dor(RZFgvIkO#Nz#?HyDJ$fS)H@tgmlJ(U^xt9*N`QAsgK#3 z(@4PS5mEG01>D={y{>r{UcR8MM(@6<>#6+xFb#y&ml^UpNLH7z%NI?R zF4^arE!6#X#3xF@agFP;n$06R=G8A=ip283h*KV!vk&8-+r&3fOWdr=4K-rHnz1r_ zzelRQVVEFLQbAGTu~6NDyQguF^tgl{gKH~m1wuQg!`&jxad@9}+Ecrc@@_qAeez08 zTurE58Pf-oA6w^~Q@-gE6=n(RR0tN=moK8=cFvkH1_ixR#q2R)y8cXjY-*7US6z#V z`}6?KL`PDrL=!W6V@Y8m`YUC$s9-9Ml2!lGu#}peWYh}fcGBPyPw#Ml;Lm@D^{;;8FaJ;9^Y(B1p!~)RGgoz4K&gJX z8W|<=={K4m`$;$ht!ATITe^=&Zf6@NGJbYx#Vd9(8qDHeiZ%`_^}MrtbE7B0(FE>f z^+HKYD-N}R|1+N~TSMNamuiF5su7Q@1*5rb&v=~I>SsJu zoa05!)~ziU9+hBHFe(wFJXen;``Ge^;D1T9@1q0dCOR%7hTH%=-#A{;YKO;HO-R`7 zg|#?Ys}?-(t(5!f;LS2xdttU%uhd7g)yo$d`_fP++^33aoKfdRD^+poRbA-$iy>@- ze=Ff4qc6+vTOgT5m#Nu|nWr4C9KBLf?{)%~*AsOkdegTZ?G(j@D9+tbZwi)$#htx6 z`s5~DDd~d6EE8zsyXG09=$j#9%oi)D+Il(mM$B@GQ8Bi-QH`D?e@^^ zRJCFB^AosGLzrvhYS>}IMK|a);9U0w7c50IX!@tDwaMaI%T6aPa%8v4-r=TETQ^?T zxGtvp_G*ZPMNo};kv6ykEQYn`J^E}WmR6^$vqQ6Z2oH%D56dIW+tkaSjK$ua%wND0 z94<J{P?pFJ020_jqk@}h$XLnKAw48RNx{X^}st;;#;UvR5$9!Gw> z<&@H9dh|HMJIu=;1n->=AAb0jr-vi{(l31X-}?{V(fnth`r)7Z-?z*k{x8q}%5w)y z1eHD#>HBQ_^pWn~hjqxQr;~QzjKS0{QtI?*vkEGd!t6e{-n>m-El1!W#^)K-AN%W~ zHW-KPp5jOPzCwpp!r;om_NEqw7QYI! zIcJ%~os>oQ?aj#*rrt4!S8J~;I|BLqlfSvDf9Bo)!36ji$~UlhFQ%pZteY!cLlUXr)xL`bzzqvnpb z&S0sDdv|dO>V4~$H0I#4X-u!IK|xD(F;jCUN3EK-a7_y z6tx;ztIa_4wPtj%0iTXr(Y@9LCypUy1JNqT%#20V9rO*Q?6z}$jImQ=Cwul|4OAH0 zDaP9g>Nx7o=IXp>^inPv)3o108pn+BNU!z~gab#Nf& z3h}4^llax2hiBsU2r%e%_Wkr4yZ=*QBRJ9tXBo5GwCi0vIWU_QzY=o>Lp74K4lA1z z->>NTThP6P@wvN#bt%}Tiq3BFKsLxUnvK2O%ocO@psDd7uqoVp_M=H?n`n0@)8ZTDL4w4iS(y`{vt)e~9D{ti11~m=iR5+gKm6JL#P+4z>g?gwO}EV9Pt{sP@>u94=S2Y4rVDwb*vjB3hC5ma)qT zGv4=R#An!7ZjApl^WS~l+mweJzj)=wk4Inp_1qbryK(gCz4!n6|A5`1^nTCnRq!Zb zcE&j0M~SvB_O1L$Yi6u&SFMzAITs#CIFxm-U>O{m9@JsnomRDu>=RX-xajiDsJ7qR zFiV7;A*~|q#dmfhVf0#FnepQkytiarl4`N8b(7(C6wYGSENARu_c>`zNA_vvaRt;^ znsX{#$IKq>MJcCUbf@d*m++=Pe>oW5{j@U5GklRD2x#qXK%LZORZp=t2d)s)dfbb}xU=lm9Dz$k*ovFc9p5FRLpRfM1{m-`*@hvZ&-t*1> zM1_SbI;n-ft%S{mSC2T1=n5yl-uPtZF3y7XR?VcHOUgf`orHwqaknj6CH^%Y_LfH+ zdChCEo)y$so8z(F5xqno71wy&Gm!NOohFrzm!RtO?X zZ=BSM8=O`?A-cJ-KH`jg-{S>5%DWNBBL*S0@njCH-I$XuleVOjp4@50YY?U1-wKX$ zSWSM6gShOodp81~fmV~z``M z_WmbbHWK`10ZbzKNhuhu@U87_CvL6-&*YtzXhxg*ZuqNn6j6e~&};2cl?;Z_fGv65 zn@#56u#~$^YUhYmg+i(4B`+a=vwGJTFJ9$03A4EixrKTeJ(8Oq6cqYNF@}_L9P0Ae zqGpCTaaTFNIn|Za$M-?`=E{3SwA^A5AIcUrHYY=G+tSZb6fIcs`29A;&9OFZxYN3x z+4{4*UOK3Yz15_(YTxZFUSo=1@hbKCLdx^T@8`?%CJaE(pi%7-Oq%y|(;AIXdJ?YE zUS)0MHsPgcHU?_6rU@DQJi!qNhv`rrgy)PtN@)X0FBk6fm04cdoz%l}t1+7<^m!Wh zCrBxIhevLVh^Vu_Ir$o}19x7sm4Q}*DBjcr#_4qMWRGTZdnVhNEWRwl*f}T`kW{~) zWv{)(@({9QH7@ESpef3MoI;6-E%46jN;jQJ#eh(b`ojx73e$Z$5^+`_8HI4X03$wU=mw{>wjWmhy}l}eHBhV`Wydr_J?25dsyKc zS>aFq)xZAr)Bp5CU-i>pRsXHZ? z!Dx*A=*eCcAu|jUbMLyc>Md(Mb2^OQk9;<1Zc8{_54OO*q|98~TV#P*2_|W4o%heA zN0Lup@eb?%X8+FY_kZv$FCHG#{NTtC%0_Qy)LFBiL)&3^iTg*+Y*gK1-M)w)PX>*X zPwn+w+esDuWZP30=DZHNw}SmE3u}=#-LbW%3&Lcs+){4Pv4>&xh5kodeYE z*o(DEa-NvMr8QJPp1?tt{yQf_UO7iTpk}ymaN{=sC^NgzS7+3|I7!Oy#9eA>Pp3f_ zV5H*hcym@)kBn1D>*=g|t+*`IaEv}>>@#bYqE@%1L)?g;eNI8cm_=2BdfHGfM*pC- zV^0lai_O;xTtsQxZ9qm zB=uAd2|yv_vAhjybu9}1+f?^`pejtI41lgGHC0)u%ZA=g8mW}Aub?hxrjB8upmHlG zcR|J3`yq)fYpsqt%UP+0+g7~NRKK-@$;LXw>{ZMtiuXOxAdGu#*^`c2+IqVd&bPv9 zBaA5F0f;GJei|AR@v0ZEKe75G-a=~G4p>Q>N=6~!q>jD_R7^I=D#5Pol&adkWiGDn zvFP`BbUOsx(JMkm(lR4MXG!U$d_WxZNkT7zBeIB6JRmI3ElcL@Nh%}<=7Mpnaf9F6 zt%5nrVkU+cXT7)&Zy9r()`k*VZuAV{Hz_=Xnzsl@UQoOvc-irStv&PQ*<8kwWjWW9bqObCG>nf&2Cc@#LoK-N+t9BZRy&*Degai&Gn zkcNlZE2KeQvFo)L&asZSQ%#Bc8A8jk`sk?lAq`0*C4uHCBLU<@$CXRJI?@j{s|mJH z44Zo(%F+BBQB$H)}LkWyQpzqIq@7imfQ;MCric|f=R<#oyzMGhtm7iT}G0Msm z)#O=QTtfytx8PKl#;|(dY{D8Jq?0;5#FvgLSI}9p(-o7AJI9Kh@0g7Oo}^wt0{iJ7 z+x$LkY1K&+)>3kS^rSAqma^Eo6>D1iw0^pI!~!cDw{+4Ir!Ybwa*eqRCRggJ`WrW( zsa$^Q)r&g2dS-%eV*FeG6*cm&vV=$G@R}pP^uY^VNnA>!$pp#^x~@X#6j;P0OI?hLFFnE5TH;nmMz* zH-R)+)?SAqe;O>sm{r0|5#{sVt>#h@jbQ_ZboV*b0OD%s)$86RZV&ch^J+=gATH>& z^v7dr9typ*wmzd&$KORws2@v$+h`e=g(G&sFa0`)3YnnI+K~U;4k}5xDcD@f+A;yR z(;FgQChY=YOjuJMeyU*6fRRN@p6l_2HrfIT;3VTQ5ZZI^)R?`xYq$;4pG@Aj(S|XH zrctF~RK{uz%#Z!}$V*Ou0m7M>BeHIC?(UCXb>WkJ6lHL1WA!S=YUVR#d?NJ>tx=LH z#heiY)U=b?qxRgX&Twhw$0YJ^{WG87|~;dr}WJohc{&1zkC1<{iPwRpM9j7H|%PTh>}H7*gHc|bXIlJEg)*_zhK7`=w^7g^Q`PEX8V9aeouhC)VVAEiKF?0rZDQTUF z9UrO*&ZQHGSeV?-ZkP2z-rZB4)OU%VyUW{L97A=^9-M5`o;K_`M6iyTTsxPxABk(^ zMX&ADZP!ggT735TmOZW^vEoq&b5ZeH;^#LiFzHG=ywF~;Z&e0bxf#w|PGX=p&-#jU z=2>l0va3~J-0BfHcT+oMtjJK0HT-5LTyNa#L61jFKI7#xR{dsmHUZoAH;SGq-o=G> zTBxlW+Hk+kJDG8~&DkvL@Ah^%XLf-yIT$-|l$g67!B3|P^(JXhw_tH}@Ih{ay2<+?N0$gi8QJQFg|Z$`_q9@L%GcFQM!~ z&97^Q~Otlqm>a{+J?*C!=3gdNB`J4HK2?7nRF zN;bLj08(>Ft5buzd>nS{qe@qs3gKz$aaEVvxm^FJ{V`*9L?hAmrqx%W zM*j!+yJ+TH9)aeKlJoKCi@%;@^z+mca^kilE>L6Z18rE|=wlisC~wJup=+Jh(=YtU zcRvCP6-?W@#8{2$=U^?a0UdSoX9wz-W5T1OXkS+oV|{ypBQt262^y@bRFD$YqgP`g z>z64O6tgYE%e0yxkyJFPgC+6b=H0U1X&I2ykgkOE2@~g_bk&!6Xhl8$&~GLyU-m$7 zF6Pgn(%tiN?wrE0o+r&wfnjp#>PQ=>4+&UUbj+e^OUvuPFz#^-;l^I{CQo)&C z0%eKkvk)X8^Nc=##p;+WG=XwzZRoVbPabi9?F>kh2WE_hxU6?1x76`jC9H1I@oMI3 zR-W{$TQ`+8yR$%mHBScPzFOLt?IMO&%NATb6_#7-pzl&qBctF7`P&Wm(ZQ{U%-AhX zoz?o?6_XOxf)dZf@v8^x9^P z+EkA{K*$}W>?>U4=B~8v)4dgy?QF?Q$OgScw=(XJ9I^UqQ+=34<(owDh&Y>%IVN9-;gp8oz{{?E!_uVrtXz>0i7Z?y3yJoP@eHhcN|kkd|^;JCp&WY0X7 z(x^@ut425xjri1GPV98o8x!#M1io&6O~%|79!Fp2`~MF3ez+^E8z{s`P|Soa*0*|IKzghOgjVQBpUYy3o zl~WwJiL_oC5;U{o*6yI~KuEygbLzgynEjS>92>Xc zebfn4anzh@6mPeR9##GL($6W@ART}}iG#gM8bigTC?lH;stqld0h@OR`EU<{tDH7U z*m1!H^#%dCN=$!*tAa64SkpBU6{9*2xPen>8PP4u@*%KFN)t`k@HjI{DLr{JxAO;b zH>0)yUDTR$>H3?Jy)U@SPB3lync~x1|M36%twcEZ7t!B^nw0;^#iK>p3RX=tt_I7# zD`uP#FnY2pF3j4*-^VZ~tAjC)bn&t42@|sktdQhTHTzsu_eC#xG!vEZR@Y()Eq`>` z%)MXsrX6d;KQp&CVgF0M06edG{pnRGeE)~v`MZBzyuWGHUmONsJ@!7?x&2g!``Txx zd2ZD->creqU0aUS-p$!a&PCAsQ~T7>C&w_agmTobruB|sL{3j%&;O%GFQE(>yjnYJ z^)7V7P4OGfRC5_;c5tR~ zPa?wD9?nQxO92>lmv4s=Ucap&!?|yI`fBWmgqahp`oWp?-Tu`(QNWwNbrYEN(jWby zaejkE?W&DBPAJ}N5L`&FI&!&DxNAa?w30V>oD4cUZCvx2gWntkBJg89bddDzO6$j* zgP^rUgRO_Nyr-P?JmK+_eZ8tZ>DN2XEagzUXG{0y4t;?|cq2$We!ed0hY(I{2ii@^ zp0v61;|2{1 zt%BNHSirbuMn;#upEmv>htf+eQx1fa-JGEYqlu>$s@Ne(m zw6pRrf8&>Y3jN6KWB<)x==|ef{^Tz_{k3-KDT6CK=b5lb}(PtxpaK0ZXPn5&k)GsGNAfvZ3&O~;)5e)3=x@w==PE67%A*3HH zA#0A0dMy)DVsPGRk!B#&t){GP<9>0EV@xPq!(;b;r2SMz?^J^8-s`q?AraQE@aD8d zh#Dmu&0-KQxWl5&B*X4TJ=9+Z#Zcd;o!VBLiE6Z|F=Qw)ntV~Wq*h--Vy4l3NS2MO z^5JtBn+5dhF6{27G#-%ZLI(MVQ7LB455H!dYPa#N3jDk|fq zN;DGl_W!P1lmA*crmgoC)?KLl$+zi z+Q8>hTDq&TXYU&Km>X^aix7h*FRZygWW7{FOK=QPCYHrLsUcKI9CSwhh!d$sK}%3 zMW1lqKJKiBdT#L`7JsbzOMeY^e&)pHdaSS2nBY?PSsCbjHBr%XGsN}W^xB!NRVj)$ zf_;uaDQWk%I*7@Hika;N)XK<4(IA7xy3@$Gby4lGCf)Q7O|Qn;i#(q2!DQ{E8h(rR zPrRSLS%ZO5Ve~VkL|44qy0O$V|9BHjUJo8`1@HF?V+1_%3Bq^P$cgvkNu4f(YQ%|l zjcF@L=H9;zV~|M})!r4yN(Pa3Kp41Z@h0Z&(mIhwKroJ%# zAPyE$2Bga;O8mr95M`zr*wlwFst z)a5>oSTWH}R@Frtm{KDqqy5OFI)TzL6H%uTakdo*p3oiDW+#;qt!~u2)KIgnQ#UV1 zYO{)$N#2EIC{4m<+{fTSMPqJQA(P+`1#?@%VX>!&^1coC#R>FPq zzO4F*lC{kSOi674&7O1kwqBT`0`29N>Ooe^0-7|@MGjLSI@{WN4(C<@->{}2r>#OV z3J!~6JgCg^;)aBdac$k9S2O5`aj>ERk{Cm{yh=hpLcQb5zveHz>rEJa4RjfS;DV#Z-o7@{1 z>E7s|^8ba5MSY#u^FwvO2aR#q1ifd~j6$Vdw^m*4uyac{zs9+(s=dtXHN_QnH~}>c z_4$ICJz>7%a@8Q4u?Mshomurq78((@ z3^^a5V9FH)W5(;-Gd>^8d;;Ba_O<@Vds`Q+33)6W!&+oZtrr6BX2-f=Z4B_H&e}d# zMMFxT%$-5rj){0UMU8WC<;_gb=nTzFCU`I^*?T_`bS38|Srrg83t{9IN~E#P1euGI z1+*qU#T&)e2hw)|B0NK9PQo2$;l6)q2peF=3`JZgObg#&bhf-z;{6*WNcaUR>u)p zZ$%6uZng4mxopw0&oZbtfSCP8UX)P9_I**iFLTcf>fZ);AmG)*+$oGNoy5tb6Ix+{ zm9;yV-`6u^$ZH3|$WPQ|N9*L!xEYoYXfX)NYn4t}$FWmy*d+-QbsZGw^O3o`fu*@K z(A({BKK&s|-=Ceu&6OrWH|5BpuWUV{Y{PH`cSZ?V(!5`)`qI!^O^gF|UkG}x=I}?q zRkvnkR4ik@?Vn&Fn^t#-vssHsTKl5~{9+`n-5S=B^N%b_N_s7?Cr0-{S6@_7gEcxg zb1*{lvPFt0FRJ6KyEq1U(QOcBxoGoEbcK&ufv%IC0j9m?>{IG3>Lyw?4efEoSBlPX z_Hn`AZP6k-h${my+{iQQ#1Z&#L^7=~gE2JtGoL(6Z@@i&<=apb`^O>nvja#X@ zcdQ!}`Dxxwh<0`DD3CPfgWLghAV|)rhrTx%sX|q2t4ywhxIdX2mc8OB6Rn(UK>qunG26ONu)i8s;bLW>_s`f`@j1d$ML{LrJ1@3hxwkT=gT2-iigesyWNp)6@q!pX`R!S`ck| z=@rfwoyNj!=iFu@Sk;j*yYtHsI*`x+kjD79*Z8<|%i5Jk;u2hCz;`I~p4X zP1m+G011gv03NoCxQdzH`PUuJ7R^|~$&}m@^v$BiO3^xLZ$=u2nm;UBLdR-?eifw9 z#(yH8``2*ZuRN+{>H!jjwkZh{>x`>_Z-J# z_gUlV7w_NyEr7zw@X|A{o!}NzrLNgN!c&Bn-&oSZWEbx7@&BA3sd#x#9dh<;V*={7oHZ?4^S-{w z`nN1lIWD<&=lT03Wz?f)HSbCgy<<)Rih=CCy9$J*p66|Td_P?MEJzy|#7%KS)^?y% z$U72YrqVW-)%d!;Y=3TT)S*5sWASj`ukbSW5fH}EXghXl09a9@DuQ^#-Np5M7J;g^ z(KezBV2a(+SZz1(6lf2~0@t`#G&d5YJWfCB6{9+tMH9l>mbB|!Zyz;gf%-)wozjUE z?oZ8i%8%qjcIJxkiCOECFmnYzcXbXMd(!gS17gOg_#p~?-T0mS%&sXlk6znkEox)bbzYd*t(@L{yeq=@?fdmae^4(j5op_g> zy&E{((!U;|8TDKCXa3Or{JS1BY0S8GADyU-p3Z5tA*e&_$j;5?ya{D&iC-H9o8>c8 zdlTUwlL3!!NI+hFzw!7B&%DlNAnPo9@q|0usHe0ZWzi-QUUCl2ynkdhcWPEfXJ9

C(g?B0I{5N!%qN=az1sm4sk8A*Q-f%l!I&%-FjNIrCp~6@KdXkbu8b7 zu|qAa)*k32OwNKium~xq@`yVcvz$&mgcVLAg%(RoiK6b=J}K(;=1vte z&${iHc@!)urgs7QhR}1yg>`T#g|CLk3@Un;C--e9*l#sHW!>6FWaRe>DBG|T<#3nO zdZf>#@hXkWkzms^;xlzJ(Nb6ca&i|fcTx;8iaAM{ashG@b46Hv&}(oZbmMkoBgWdT zlYM9!ja6{tooiJ+O6oHEzJ)ocKiFOAWQ!m#N$Z<+c-C=TwmIhfH)q!%v*y1f(nX5J4YfIC@S}1 zaKX=jAZel(Wq-ps$F6l{?x2_3L1!}_q}tX6qHw|4b8d-s=Vz5V2Z^nI$QtttE;Pb% zM1|Cd8XMBhB^urVCgh-urHn|?BbH%#7UsIB4;*gd4zi3>GN9V1A4et9<<_*lq$%l?fD zk1$?%go#F7;PhPwC|@Ub3u!*g^vwJzBrEs*1}klcfbhLD4mY>g3a-jefgxDrTuV1` zHUa~sUcP2;W4P4N%V{+Y=vdYcF|#&xAw2eEBt*QkVAM+P1ja|Z<`ITb7Y+@6d%Ta^ zc2m%Y9euhAJA&Ds>v<3(6xD9dV)j9`=#|TMVeK_JZBcX+7az!wZrn{uh&p^2DB_w{ zkV1uY*)HrWZq*&oCX85fo$%vRcYD=9M|^JB!^o##fJ4$7#el3j(TYl*KD~1IoTES^ zvZUok#*lqa%>vh$>`a+*uXVpJdEJ<|5FLPrC-H)`{rLDyQ z9Y6;vytfO~M96O3G&movPBt3Xas>uvG#@;>5YY?DM^C|E;*kx5PMU*>Smra|18d_vDBh6}31Q zjN}iV{BbdyNcgDO@Rv5Ewgz>K{LDk&be%7Q2=T8)QLj1X&poz*WI^2(9MUE&py)7M zAI)CHQb2N8L`P^+NoWhQ!LrqksGCB_Y}NWYtirf3wiPCG12G|@;-*r!R@zWPekh|f_DQj)w4_%KNNkA8Cf zzL$daGK@=FxrUdbyA%Ce6e*BxG;Lw#U#}0pNKrUf)K61wby1ZOf+GG(h0%7{ARQ&< zPz@y1LYDP9j7zYAQn9adisa8la#B~AaB_BYBcG&QUn~4qfL8X(&c!_RN;@lT7VoLfvbx~Su`ZcV0f6g#F^V)3gde(DFs;o zJOTulrV_dLn`Q+}1_tm`Do5+>D7scKd$2YvUL@Z~;K}k1KqXJ~c^zMBo^mmVWqg=m zzOT;|FC7o_P)9cH=+ZCcK%u8Yi%IHxqm{PbyXhvPi4PpS4%`J`A4EGkb+kKxJ%a9WI~2><1pDP zxLZC*R8Xd?<_7+DrVoLts^jhrI^)(d9hRXAUV^IguVTWAHhjBGZjNCrXrdr<4aM2Y^h)75`Syz>#m-xvbx7~lG?Pk-y@ z|G@ZLzyF=T_#MCcZBL1>d)L4BJwN!>fA9TbN%t30@Ym%n zUk%RXL?>2cg~vOqPMfQ}!Cv&+!0d78v@uRIe99VJ-H`VBZK1J?P@etDDP%HupYZ84 zW}vg%^rWmS($*lats~}y*Yg*B)($cY=TO%U8Gks?SEYa{p!`5v9n8!C-NtS*V=YEj z9d530w9_!=y8NA#!9s6reZPXe=Yzx6D=6-KtYrlxka~o&m@bwtVPxdS3ve3>{|5Pf z-OA1o^ebAxICnL0NO|V0qgJzbUy1ru*4r0DPO2qPq_Iv?@Tp#y7LA?cNPPqE zox|eVzksp@%JoW6Sy?1wlvdz;POpWxM>{11)wwG7++sY8cU)-lL9R%TLb~iF!2wVD zHH(D2KaT6C?FS$^BXwnTKTYhq;`c;#)-WG8tY~?Ot`QkP(x!1Gr$cIN0>`@cWC*n> z-d~(_&O7Cedect8$Xy|1qrvf@!L2pqoHaU{yAcvuo9SD}u34R$S30C?&O;JYH>1Nq z8LH#iy}!alLtVDRJZUL$t-zY)d^n70iK~_Fn8T7kuc{Z$yl%owTAjDSW_4wSzzOyR zv&+~qh-}n-R?{yc;+MFUR#PK)$62SbJ*Yz|?nL+IwGZQfrVLGW(o9_a1vKGvzjX@I z^H6{qBMGmVmz5hm*s+QFP+*qSQ9hUegO+tBMZLelx3LYi>}9G$0eU*QyrS3k-bt|Q z8_*1*Lj03+Bm?TSRc52@S{5hjB^ZL`O?Il$3f_#ZX4WK6cK4)_m5-Ie#lnu1y&?&T zr`8>NH_FUc5Ha<;5hpctS6NWA=@Dq~mV;E?Kf(t}tCgcecFcLtgNA#hsgw2lW_(`) zYs;FIEvn@2(hL!MZ2>9|R!uhoagW)E-cQ?S-ed~4K!pw3;Tf&oa@mBs?B5lj1G82s zb>9tw`pW4j_7z%Rw{U*(8e615>I6ND>_dKe_PcNk*^Vg=4B zwESco4>xv(4U$Q7(R0MTL3hmT*jg>_JQI{6Xff7TBJTE`7^sX#YP_TNOLjkL=b2a7 zdj9V}^sTGk`o|A!eM1S|JpS?T|H@DO@ms(4?LYLX{onY@Km8m4o?~i3rJ63CR972A zS%Oy6AKgN^4^t=Fk@{p>o$mc{LaozwItNn_Pu;)tjo$)rmvY2Iu*@2nojxA>d$^fN z)`?FaaEr$*)Z1P!VxMz%3X~{$#6^Pr&}L&;N~z~El9t|nh%*aU=A0|2+GLb^AZW>0 z4YXy%Ua7R&;T0;U;B>U&HCI~TaoiwdR;BP^28OhP<3JlEJZ|&+%HD)R(6J=U z(tTU3A_VP%oy2(O-;Sg z)Fxv0@$B=wUKrW2liz(MzS=_SvecneXuw2zQ)_4KT{+}>9@jQ%RPd1AFnYFRQG`w9 zgAoIrLVYEDuw`G@Ub^gn;HN;?iy|H~FtLEsoJ+tv$Z)^~tOCkQZB+}GW0wJn2zbwo zeLWhDiE(VJlf0Yj28(&HSbOuiI%Hse%gEkb>f|di_?vdU09sIxB>1tdli>6zYtJv? zT+kwGycg}t$Q_ICJ)7(e=5GBBAaMFU8;zd$U|SUVt+HGMvIm-R3*w|p@>d})3G7moN>|YUxOm8i3@)PojL}K zB=mE`)&(@pW}W$L5oc>^s|VTJXUggcrXFh!Df#Uo^wO-w^p!d=W;I$*zhQo*1STnl zMa9X=dJL-H{r$uR?SjsdPN`>6QIx7XW6nA+f?2}pD%x$SUQY-lIhPhRw~Wlm?xn)| z%p9h0f!1!k7vnzznvyQ2lkPzd&#On$R)X0H(1|EoY3|1^_x;=vTUw`W#|l;o)c+m3 zIyB2MeFB;C$vJdh6ZXIS72Ea&e(iw>U(AL)^NFmvi>PS`4V56<_s+aSCHzRiNb|PH zITV!dNmU+c>;&*XEKWZQ00slb3wr|7V^vfbX>ikwBn6|P0o-4KsGE=(6H#1);qV=* zgWVHeSpkbi`}U`x7@>V2Rm7DI^UnLG)2aP9DCf*W!Di`TJ-PFBEROAE(VKw&Bonlj zfcx4iZqLqs{1R|A6~rpTw%P+3Ssm@_7Kq$9!jJYpc!EjlUu{9-(d~`!iP2^S|2XuR zSx|u6K0s$^jmjDA7DLxjcsiN;0Up#aCu!${r9j*)1zEV|Q|F@zZJhU&RT!=Qdd5hz zZmneSP$3A=-*NbgzlplViAl%RO3KWFi8b5<=K}dg_i>ZJ)u^AUn6ZODxuF9Dx1>L+ z1*3uwlH1nRu4Q#VaX|^9YjZvEg>!gp;CcN%FTQ_+5+$<-)L;kqBzH42dN3ba#;vH% z$HQ~hg!V}^r-2HoH`G8?SnSxRh8-Q_Qp8x2VRY$CcXm~R*;12f8acQ#7z}R8VNyn& z-Gj%f-P9Tsi=?Z-xI)o7k^Xv(NXE%S;HJE4QN8We0tXbTRT_qcTCiVYnRKp%7Vl|| zG3qz*qwA7Y$?|8G_I2Znl6I*CO<6s|ot#*6(o_4;JE6j4)mFHm(6gCoc-XuA)`uyH zO&8U#dN=Q7yKZ@pE`S4^DV9unqQ?%nwZ8WVznIsql+KB$P8ii>xXi0HPED0iVPFa{ zr9>OGiUV5Dl5B6Qz|TM~7((m^Eh)RPplQDYZkzW-PTx+{N6)H<_^Fy-zak0!-$%7? zi2m3k1pb>f+E23a;qUp*-~5_idgdO_gCV6}BhYgU$QUZ!oU{}S&i6CiYpv{7%8-eN zUHJ~<{YUBP-G3+gHb97?0k>B<%+y6ohhA@+##!LjZf0bmA0?C<26WZi4fQo^#qz4W zhxt!n7T|3;Jl=pxSFdK>f&lrD1q%Kpr*?tV8)^a#Eqpkr000~u7CH2jwEyhp@eut-b^^7mbHqwY34p#J_!76AT1mUvg^2g>uRa8 zx7+&Hjuj`vcvQVz@1r?NZ_Q0g8?$P6Zq=##`J$x{-a6%ho^-+CDySl+_YK_TU}1q2 zd>Hq(H%nfbdHZDOOz&L=g|vUCrG0OeI+QGf^8cOB!Ex$kV;nZBwB5tutL&Z>0o-Hb`*16uIeh1(ig z`87;&jB8$IcYhT8#zZ0OR;p<6C~fG-BFYJey`|?9XaqHZ@Sv`oWVI#q7#XwDQ(}a{ zFZFX>zBX)#bcXPIqco8MQOK(c_ibC=H>~Q!z(;PDq%xrjH=XT*@gcX?Y8cegsGm$* zN7CnxZU_TN(-w}}H7CuQCQk#r(QXZM350a)FKhO=0394?M9wCprn4?xSDwli%sm=4 zY1-AsoWDQm(U5w9X--_P_4U8S^_7%P+xwZ}?m!wUhLAG;+O znWM%`G`->4%UGG{GBSgpMHru&1} zaUD+NSl(^CW75c`WRfzMDY*l$!ZzFh+-+Kl9E{e%9LIm(^XaXTnz+Mc0FZeQRxVFI z)MRiLDr8J{>p$)NcI&0j`*E~<;yF5=OaCjb02B+3QO$VjDzmkHJZKphB$DcgcGEN@ z%AfW&=(iVoJfjF3{nj#i;rs4{1r<(?vJ|!L#mg5JA|ns8N>|YNoHjUGnPQOVl|<98 zt_)!x#BRsFLfE~l!vKa0CcGX_z6Wi=}lUGe_)4hG7)y@h2G@c z&|KmnTNETha5tQ~TUf;2>jK}%Ss~75tHG_TP+d1)(h97axB%Z*%4fT9#uzA_Q#Olr zYuFoL%r5IwI#^}F-nmbBGe1^Wq_$w>{@T1QoMPP`)s*r=9rJ3Oxo^Q#6O1UOPAQuc zoMB6E)@=fVY*w$4!KrMwy6$NI^R|@8nWeK|8S5L-AOJ$FmB(-p_o6#9dymL zyDDP}7y+aSW2Aka=z_0(<@Sru{q)tv?B%)V;~wUvaQM{ah9vrQxIU z@Ob@Cp?H3M#eF~%vH?>z(`Z3Y3e&QP*_|8MWkH3#!B9}Sm7kbcR09!gPB%V9fDRxN zH)$91xT$*TD2p|+pr7D_5;T83iCCCCmNz3C`jfP1&f{u*W~5hk?y9FZL2nG#Qf)%0 z=?o&u%9UlZ?_}oYA{X(ytNZ91TnG0$!=UrP;h4Q16!X=gKA=ra)9`|Rfdj}MeqIg?!8w@Ld}M7DOadHoC`b=W#*5!UW{ zV~a^|eekK91or>7A_>W$$0`i)+lz`L2?w&gZ z`<pM2?!f4uePum4|yjNMlQP;~4t#tcq|#^mz- z_M@ARxYfKm8R9=$kB;2Q-WWCSXSCyByJxMTs9aOlv8tt(&VkcHq7tji^w@n`TFpvu zxENzWXT!J|2B8NSY5Db?U4qv$Z_F-Us$;PkL#zZsMLqPSRnlzs{ViwrmJXZ;Sn}n^ z{kXs*TNV6a!5)R-_Uj?1^s!Q(`zu0eHvx9S=Ik*T_m5cOp}1K2oJ-&g#{C!{G>L#6 zs+X1=U1>9Hr?b$inAM6qr@S=!!PC3OxTQ7=5QItfsBt}+1(Y#H=^w2scNcHLsbMzG z4nK5@F$0VZs%UiM+M%RnMCp|F+7pW>rSaZga?(@B7;dp^Myp|yoVkj?vd}>jCa&%q z!YEAa@Hf{CbsAGGMbta%@-6&QsiIG#n=9^yVsnZWio40y`A*JTX!?Ekro2g_H%Z zn)DBZEB2LQLxDeA`Q|n`k8ZmmBs1D=t}N)=lNQ56sln zj}khO@iR;C1DrAJ9O(N9%bMhG4;}*#yRAB-)fQm+vpPYSil4C*OzQqtR3>R9vN9Vp zAKoryMjlj*DQWNc`)&ybL1j{fL#)}o8iaViP}20VF_cDm%zV}vFi^&xV5(cf?bELx z^l8D^U5!Y=Ebz*zX;S#|@d%6@nQPgM$z9w@-%6*Tf|S~4ACl|~0XI}FF)`SMUmaj4 zuTgbUdhW%s?4(hhNM?OEK5w$O zd)?iO&k+g)QCg`A2&skgp;A@KpQ3&OK?{{o0jU)d5`_;fs)W=QK7#lH6x0vpy_wzH z-MhW>?fLA)%~;9a?(FRRdGqGYZ{Ezj2LK(w|8{=_@apgI7N2*2@X|k^(=y>c!^|g? zqLN);iFr-c&*OudIHz1t682J&m26eB5@t!a)dEXQ8t05@W~z#&USy_76*}V3m4O|h(@4D_-x%VMgAK4R$eYTkxBGgySAC_<7K z0Uy+4)+iJVJ(&tdA36hl^OTKO49kXl&NNJ;Qjz3ItdjR77N_52dDXI+$#T4e3{%@J zicM8dW{@H*_n=}e99N2%`3^6>myY!)T!MJngoB~P6gBaYGgu0jj4^<65g&Rlm~cl; zLi2`MP;4uqTriD9p6Sd~Y?e!$vY7eh%uil?xOn*CEtr4_xBEWjk*y-js`F|#G40$; zv8-X{jw%+5J^#O#UVr)EUiz|#%;5jlV+N=ERW0Ead-n|7q?Bxfsx`&QiDknEPU*^= z#&YAZgWjIT@?sUt^JyJ#E-2az4CGXcq8y{@s9E~n8i#=jhfFcsTpDGXvNQwTNPsLG z#@f6051C8GINYdVx_Wl$q^V@jt9sr|Jq_CmN_kQ-6-{HBY8BG33t4oInJ4BeHK7bt zX3ap4F?Wt-?I~8WSQ>hY-e)tgr)a9WJ!u+wlUdei37_(n6vknntyt$r6g|r{EDBcJ zEx~bB!+ghKZ*_!|*p-ZuE#A*SKjt~28CJ=}AEqEFGSHVba%@;vv?WWmj^n)ziH0mb zQWYOrdrywdD<#b)I(fY6a5-`8h79Zsjl=mF-)^n6geb!84U15r2}Q>I2lARRr)cBG z9JNA$VXI^38g4EPAshye)h0nHd!3Lf$N8pSx1DBo@C%Y*{0kz%whY|nCB}nKtM)=8 zlHIawe3yKCkY8-Yw5M(AtNA5Sr;_L#Hk@rp3FlkO`8GKfNDG;EFc=a{eW+Y?guc^F z0rvN~w;SL5$oT{M?Qoym-;PUR0*HnMM-o&so15@i z{9_zqW3X8>l-!t-wG9)8Bzpm=xL|0xa`Ay9q8y8x@Rw7f47{CoguOume8wk6>x)8S zb$v-bAP(|`HwJ`(&jHN*G69DVHRZ@=>L!8l3h zMU>Id=N7ii%f2TpFtkGTt6D6X&hS}_0+t!LhVH$*=ZC-8`QbB_kqayPKq%dKtJyVp z;ym`=M6k0t)QrbqV+M9P;;Eka+FtYC?R)){w0|&iU-VXWUq=cIRt1w3sF&-$hH>4D zHc)jb^^P)KJLz&GU3LW;V@ri2V<^IW#N%T(?;NM>`9nzA`+YAxj;upjBC5ao~>QKriQ|}*Ug8S z{_J;o!J}+m)m0iyNhI5FIy_^ld$Ph+Ne87StLr}KmV}wXgi%T&)~Zi*w`b#TxmYZs zR5-PgHmF+;Hy@Ip)D|c~dQMGDPo_rFW9ihW9S}wy07OXVAg58qRtASnQ(5A?bsN^o zFTWap;^xk4iF|f6w2t8*uu`R9BM)!;)$!+V{^rk6f@ff}*RD3)n!$vU%rX!Ss9DRH zw-*(YC8@V2y=KCO!GaV*3sQ&SDTRcWf;H%(U6*}F35pk(>ot#ex>9(;Tqhv?n?oCb$|)S%EN^0Ps} z7YCJCw#WqAq$=DY5&e*CpyVT$>NroliBeJOxW0X?{-g${aM=-s6Bcw+_)ni*PwRi@)8NTz6Q`>OvT4E;+grq9Y z&}``S=?jFGpo>RAnbLp8rvjl&h6MW~KP764S7(Zkg3B;1m&%nff;Y%c2N1?Nir9dW z;#?%m+Yp~hjUp8UVCD@;6Vp61x4-|~3m;s5{16J~G<5QkVkh@C>ja-BfBmJW_x3#X z?vpOK12^H$V^gWr1ke6FX4v6|Z#bGcm8t~C$_OkEaJ@_rST?NX(BZd+IzQhT|GArM zoLW<*i%yqEdHPNC7#ot!(sxWPv)qBZdCD|$M*Z-g`X_ELz1M3B^~F6$g1`dwZ)aiPKcvS7WW&GA#Bvsh^I}^g{l|E>`KIoYEjI(n~*;A z(IcGlx6$p{7CZRaD2=z4_@J%zYo_sfDaY=`KG-&|Trf=4X0uru7DaGB^j3ngJyFLi z!m+J&R6L4@8*o~M;B;1@Y0~VRVUVs>yc&Rh-z#i)gstKYX2{=A^jRBKa3Xj+^zq;v zhatWfJ*ae|Ob9cGs9Rt&51FS?CYY#(^4Rz7X4u4I78H&qyBD`Y4-e2W-|BDmlIAba z4j2$oF_@-eqL#rDlysc?l(8ptz-iwgwX7;3xacz&d=zv2X&r}w1%u{L9nsb<@QYsj zj~cnIHvzg|1=#!oK+jzO{r?8I;jaKUT>;ql2*9UN4YvRH0EvqL2Oa|W+&%aozqxA? z;BJHqhd%{yt*G+AQ7*f0t`#L8PG_#Bk`J?&O1@?mNmiu& z+k#M8YX6+I?)9bpi*to^H0>WMi)*!i{e1j78)ca13RF^^dsD>0;A6;G-%xiRYouCl z_$5SP;7Gh4a|ZN=I?d)*GNh7Q>?>*1#Tq|$s| zsx;jL48Fb460NNStyS%fLB;IqZdSa0^_W$Z=EH!vtl17lcPmuA9o|xTd3|nOTz9yv zwlBqMcKOxDk+4g`F4`{~wjVcSAAv+&%&L2sLZU8-y67x%@btj7rx*(x1tjW13h;9p zaxD$Fvs|qMBNBDhoPmn6cnly{gh`TJ#bZw^S9VqH(WL62qU#4z(IpjK%N_rYxGd>A zCF2~6lB%v+(=Bv7WaZ~_YiLb!%K4MZu4Rs8$}LO@+7M62I6d=4Ev*bW9Ov7@v@RDR z!+KN9f9g-jvyc!XK!iPZ5u!@)C75engRh#CFjyyqU+^~vezogZ?=k|<>+Q{+L%%k+ ztC@36rW_=Ca6nw{sd7LL!y`H^^s zEP@LDvw5P&Y}_qpjdIr5=2>G@ivn{0M2J1m_Dm|HHbpy(JFJP_sKz7L3JprhBW31# zH7M1}`oKWgkuuzpjktdGkCa92p@d-~+iZ#c0cIm-!J$T>ThgjrTb9?tW;2nFVmf9b zDi$4hUjcZ(h3Qv)D?QiA3^>YMwTf630~&(GFNE?|W#fYPPRx?KV6h_sR+UCIEr+1@^vG9;?o;XWXWG5EIS*Nky?J`Q#J zE$estEbDhK`Yr2sqxmcszD`0sVJO#XeeO#ThD zOnx(3CVv+#lm7-SlYbR0lPhSM{F4~Z`P=`Y@GpRms1=c>5v6IwZgl7_-NqA-y3b>5XocdJy>}|zoFYRD!#g2${P3;3X z{NPlc>C9BKiCUZGQh_EWiR)T{CJd45B~6>PnPodjO&TIco)=s7rN_M^g3arCL2lAN za*a7(x%Y3?_Wsq-S~MZQ$}N9#%imR7@mZI+EUY(4L*z+su}SL2@=1;pkPv9id4TH! zxy9vS(^Qt~`raF$1&Ub$U*t|~R-wo!qiHEq`aG5hjJ4SgSd~vWiN972e^pUhqA!;M z()-@k+;6ik(N}F>l0J_e-Gszlt>LcC9Cwv;-0R`MEvfL5aI4*Lt7h6L0oSSlt{Pgq nfCGP}!b>W=q{7S7)+~R