Skip to content

Commit

Permalink
[FLINK-10157][State TTL] Allow null user values in map state with TTL
Browse files Browse the repository at this point in the history
This closes apache#6707.
  • Loading branch information
azagrebin authored and StefanRRichter committed Sep 21, 2018
1 parent 00add9c commit f343204
Show file tree
Hide file tree
Showing 14 changed files with 552 additions and 87 deletions.
3 changes: 3 additions & 0 deletions docs/dev/stream/state/state.md
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ will lead to compatibility failure and `StateMigrationException`.

- The TTL configuration is not part of check- or savepoints but rather a way of how Flink treats it in the currently running job.

- The map state with TTL currently supports null user values only if the user value serializer can handle null values.
If the serializer does not support null values, it can be wrapped with `NullableSerializer` at the cost of an extra byte in the serialized form.

#### Cleanup of Expired State

Currently, expired values are only removed when they are read out explicitly,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
* <p>To create keyed map state (on a KeyedStream), use
* {@link org.apache.flink.api.common.functions.RuntimeContext#getMapState(MapStateDescriptor)}.
*
* <p>Note: The map state with TTL currently supports {@code null} user values
* only if the user value serializer can handle {@code null} values.
* If the serializer does not support {@code null} values,
* it can be wrapped with {@link org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
* at the cost of an extra byte in the serialized form.
*
* @param <UK> The type of the keys that can be added to the map state.
*/
@PublicEvolving
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@

/**
* Configuration of state TTL logic.
*
* <p>Note: The map state with TTL currently supports {@code null} user values
* only if the user value serializer can handle {@code null} values.
* If the serializer does not support {@code null} values,
* it can be wrapped with {@link org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
* at the cost of an extra byte in the serialized form.
*/
public class StateTtlConfig implements Serializable {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.java.typeutils.runtime;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.util.List;

/**
* Serializer wrapper to add support of {@code null} value serialization.
*
* <p>If the target serializer does not support {@code null} values of its type,
* you can use this class to wrap this serializer.
* This is a generic treatment of {@code null} value serialization
* which comes with the cost of additional byte in the final serialized value.
* The {@code NullableSerializer} will intercept {@code null} value serialization case
* and prepend the target serialized value with a boolean flag marking whether it is {@code null} or not.
* <pre> {@code
* TypeSerializer<T> originalSerializer = ...;
* TypeSerializer<T> serializerWithNullValueSupport = NullableSerializer.wrap(originalSerializer);
* // or
* TypeSerializer<T> serializerWithNullValueSupport = NullableSerializer.wrapIfNullIsNotSupported(originalSerializer);
* }}</pre>
*
* @param <T> type to serialize
*/
public class NullableSerializer<T> extends TypeSerializer<T> {
private static final long serialVersionUID = 3335569358214720033L;
private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];

@Nonnull
private final TypeSerializer<T> originalSerializer;
private final byte[] padding;

private NullableSerializer(@Nonnull TypeSerializer<T> originalSerializer, boolean padNullValueIfFixedLen) {
this.originalSerializer = originalSerializer;
this.padding = createPadding(originalSerializer.getLength(), padNullValueIfFixedLen);

}

private static <T> byte[] createPadding(int originalSerializerLength, boolean padNullValueIfFixedLen) {
boolean padNullValue = originalSerializerLength > 0 && padNullValueIfFixedLen;
return padNullValue ? new byte[originalSerializerLength] : EMPTY_BYTE_ARRAY;
}

/**
* This method tries to serialize {@code null} value with the {@code originalSerializer}
* and wraps it in case of {@link NullPointerException}, otherwise it returns the {@code originalSerializer}.
*
* @param originalSerializer serializer to wrap and add {@code null} support
* @param padNullValueIfFixedLen pad null value to preserve the fixed length of original serializer
* @return serializer which supports {@code null} values
*/
public static <T> TypeSerializer<T> wrapIfNullIsNotSupported(
@Nonnull TypeSerializer<T> originalSerializer, boolean padNullValueIfFixedLen) {
return checkIfNullSupported(originalSerializer) ?
originalSerializer : wrap(originalSerializer, padNullValueIfFixedLen);
}

/**
* This method checks if {@code serializer} supports {@code null} value.
*
* @param serializer serializer to check
*/
public static <T> boolean checkIfNullSupported(@Nonnull TypeSerializer<T> serializer) {
int length = serializer.getLength() > 0 ? serializer.getLength() : 1;
DataOutputSerializer dos = new DataOutputSerializer(length);
try {
serializer.serialize(null, dos);
} catch (IOException | RuntimeException e) {
return false;
}
Preconditions.checkArgument(
serializer.getLength() < 0 || serializer.getLength() == dos.getCopyOfBuffer().length,
"The serialized form of the null value should have the same length " +
"as any other if the length is fixed in the serializer");
DataInputDeserializer dis = new DataInputDeserializer(dos.getSharedBuffer());
try {
Preconditions.checkArgument(serializer.deserialize(dis) == null);
} catch (IOException e) {
throw new RuntimeException(
String.format("Unexpected failure to deserialize just serialized null value with %s",
serializer.getClass().getName()), e);
}
Preconditions.checkArgument(
serializer.copy(null) == null,
"Serializer %s has to be able properly copy null value if it can serialize it",
serializer.getClass().getName());
return true;
}

private boolean padNullValue() {
return padding.length > 0;
}

/**
* This method wraps the {@code originalSerializer} with the {@code NullableSerializer} if not already wrapped.
*
* @param originalSerializer serializer to wrap and add {@code null} support
* @param padNullValueIfFixedLen pad null value to preserve the fixed length of original serializer
* @return wrapped serializer which supports {@code null} values
*/
public static <T> TypeSerializer<T> wrap(
@Nonnull TypeSerializer<T> originalSerializer, boolean padNullValueIfFixedLen) {
return originalSerializer instanceof NullableSerializer ?
originalSerializer : new NullableSerializer<>(originalSerializer, padNullValueIfFixedLen);
}

@Override
public boolean isImmutableType() {
return originalSerializer.isImmutableType();
}

@Override
public TypeSerializer<T> duplicate() {
TypeSerializer<T> duplicateOriginalSerializer = originalSerializer.duplicate();
return duplicateOriginalSerializer == originalSerializer ?
this : new NullableSerializer<>(originalSerializer.duplicate(), padNullValue());
}

@Override
public T createInstance() {
return originalSerializer.createInstance();
}

@Override
public T copy(T from) {
return from == null ? null : originalSerializer.copy(from);
}

@Override
public T copy(T from, T reuse) {
return from == null ? null :
(reuse == null ? originalSerializer.copy(from) : originalSerializer.copy(from, reuse));
}

@Override
public int getLength() {
return padNullValue() ? 1 + padding.length : -1;
}

@Override
public void serialize(T record, DataOutputView target) throws IOException {
if (record == null) {
target.writeBoolean(true);
target.write(padding);
} else {
target.writeBoolean(false);
originalSerializer.serialize(record, target);
}
}

@Override
public T deserialize(DataInputView source) throws IOException {
boolean isNull = deserializeNull(source);
return isNull ? null : originalSerializer.deserialize(source);
}

@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
boolean isNull = deserializeNull(source);
return isNull ? null : (reuse == null ?
originalSerializer.deserialize(source) : originalSerializer.deserialize(reuse, source));
}

private boolean deserializeNull(DataInputView source) throws IOException {
boolean isNull = source.readBoolean();
if (isNull) {
source.skipBytesToRead(padding.length);
}
return isNull;
}

@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
boolean isNull = source.readBoolean();
target.writeBoolean(isNull);
if (isNull) {
target.write(padding);
} else {
originalSerializer.copy(source, target);
}
}

@Override
public boolean equals(Object obj) {
return obj == this ||
(obj != null && obj.getClass() == getClass() &&
originalSerializer.equals(((NullableSerializer) obj).originalSerializer));
}

@Override
public boolean canEqual(Object obj) {
return (obj != null && obj.getClass() == getClass() &&
originalSerializer.canEqual(((NullableSerializer) obj).originalSerializer));
}

@Override
public int hashCode() {
return originalSerializer.hashCode();
}

@Override
public NullableSerializerConfigSnapshot<T> snapshotConfiguration() {
return new NullableSerializerConfigSnapshot<>(originalSerializer);
}

@Override
public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
if (configSnapshot instanceof NullableSerializerConfigSnapshot) {
List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousKvSerializersAndConfigs =
((NullableSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();

CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
previousKvSerializersAndConfigs.get(0).f0,
UnloadableDummyTypeSerializer.class,
previousKvSerializersAndConfigs.get(0).f1,
originalSerializer);

if (!compatResult.isRequiresMigration()) {
return CompatibilityResult.compatible();
} else if (compatResult.getConvertDeserializer() != null) {
return CompatibilityResult.requiresMigration(
new NullableSerializer<>(
new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()), padNullValue()));
}
}

return CompatibilityResult.requiresMigration();
}

/**
* Configuration snapshot for serializers of nullable types, containing the
* configuration snapshot of its original serializer.
*/
@Internal
public static class NullableSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
private static final int VERSION = 1;

/** This empty nullary constructor is required for deserializing the configuration. */
@SuppressWarnings("unused")
public NullableSerializerConfigSnapshot() {}

NullableSerializerConfigSnapshot(TypeSerializer<T> originalSerializer) {
super(originalSerializer);
}

@Override
public int getVersion() {
return VERSION;
}
}
}
Loading

0 comments on commit f343204

Please sign in to comment.