Skip to content

Commit

Permalink
[FLINK-29041][tests] Add utility to test POJO compliance without any …
Browse files Browse the repository at this point in the history
…Kryo usage
  • Loading branch information
zentol committed Aug 24, 2022
1 parent 1ed1deb commit 254b276
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ You can also register your own custom serializer if required; see [Serialization
Flink analyzes the structure of POJO types, i.e., it learns about the fields of a POJO. As a result POJO types are easier to use than general types. Moreover, Flink can process POJOs more efficiently than general types.

You can test whether your class adheres to the POJO requirements via `org.apache.flink.types.PojoTestUtils#assertSerializedAsPojo()` from the `flink-test-utils`.
If you additionally want to ensure that no field of the POJO will be serialized with Kryo, use `assertSerializedAsPojoWithoutKryo()` instead.

The following example shows a simple POJO with two public fields.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ You can also register your own custom serializer if required; see [Serialization
Flink analyzes the structure of POJO types, i.e., it learns about the fields of a POJO. As a result POJO types are easier to use than general types. Moreover, Flink can process POJOs more efficiently than general types.

You can test whether your class adheres to the POJO requirements via `org.apache.flink.types.PojoTestUtils#assertSerializedAsPojo()` from the `flink-test-utils`.
If you additionally want to ensure that no field of the POJO will be serialized with Kryo, use `assertSerializedAsPojoWithoutKryo()` instead.

The following example shows a simple POJO with two public fields.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public class PojoTestUtils {
* {@link PojoSerializer}, as documented <a
* href="https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos">here</a>.
*
* <p>Note that this check will succeed even if the Pojo is partially serialized with Kryo. If
* this is not desired, use {@link #assertSerializedAsPojoWithoutKryo(Class)} instead.
*
* @param clazz class to analyze
* @param <T> class type
* @throws AssertionError if instances of the class cannot be serialized as a POJO
Expand All @@ -52,4 +55,37 @@ public static <T> void assertSerializedAsPojo(Class<T> clazz) throws AssertionEr
TypeExtractor.class.getCanonicalName())
.isInstanceOf(PojoSerializer.class);
}

/**
* Verifies that instances of the given class fulfill all conditions to be serialized with the
* {@link PojoSerializer}, as documented <a
* href="https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos">here</a>,
* without any field being serialized with Kryo.
*
* @param clazz class to analyze
* @param <T> class type
* @throws AssertionError if instances of the class cannot be serialized as a POJO or required
* Kryo for one or more fields
*/
public static <T> void assertSerializedAsPojoWithoutKryo(Class<T> clazz) throws AssertionError {
final ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.disableGenericTypes();

final TypeInformation<T> typeInformation = TypeInformation.of(clazz);
final TypeSerializer<T> actualSerializer;
try {
actualSerializer = typeInformation.createSerializer(executionConfig);
} catch (UnsupportedOperationException e) {
throw new AssertionError(e);
}

assertThat(actualSerializer)
.withFailMessage(
"Instances of the class '%s' cannot be serialized as a POJO, but would use a '%s' instead. %n"
+ "Re-run this test with INFO logging enabled and check messages from the '%s' for possible reasons.",
clazz.getSimpleName(),
actualSerializer.getClass().getSimpleName(),
TypeExtractor.class.getCanonicalName())
.isInstanceOf(PojoSerializer.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.junit.jupiter.api.Test;

import java.util.List;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

class PojoTestUtilsTest {
Expand All @@ -34,9 +36,32 @@ void testPojoAccepted() {
PojoTestUtils.assertSerializedAsPojo(Pojo.class);
}

@Test
void testPojoAcceptedIfKryoRequired() {
PojoTestUtils.assertSerializedAsPojo(PojoRequiringKryo.class);
}

@Test
void testWithoutKryoPojoAccepted() {
PojoTestUtils.assertSerializedAsPojoWithoutKryo(Pojo.class);
}

@Test
void testWithoutKryoPojoRejected() {
assertThatThrownBy(
() ->
PojoTestUtils.assertSerializedAsPojoWithoutKryo(
PojoRequiringKryo.class))
.isInstanceOf(AssertionError.class);
}

private static class NoPojo {}

public static class Pojo {
public int x;
}

public static class PojoRequiringKryo {
public List<Integer> x;
}
}

0 comments on commit 254b276

Please sign in to comment.