Skip to content

Commit

Permalink
[FLINK-8879] [avro] Add concurrency check Avro Serializer on DEBUG le…
Browse files Browse the repository at this point in the history
…vel.
  • Loading branch information
StephanEwen committed Mar 7, 2018
1 parent 57ff6e8 commit be7c895
Show file tree
Hide file tree
Showing 6 changed files with 374 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

Expand All @@ -52,12 +54,24 @@
* (ReflectDatumReader / -Writer). The serializer instantiates them depending on
* the class of the type it should serialize.
*
* <p><b>Important:</b> This serializer is NOT THREAD SAFE, because it reuses the data encoders
* and decoders which have buffers that would be shared between the threads if used concurrently
*
* @param <T> The type to be serialized.
*/
public class AvroSerializer<T> extends TypeSerializer<T> {

private static final long serialVersionUID = 1L;

/** Logger instance. */
private static final Logger LOG = LoggerFactory.getLogger(AvroSerializer.class);

/** Flag whether to check for concurrent thread access.
* Because this flag is static final, a value of 'false' allows the JIT compiler to eliminate
* the guarded code sections. */
private static final boolean CONCURRENT_ACCESS_CHECK =
LOG.isDebugEnabled() || AvroSerializerDebugInitHelper.setToDebug;

// -------- configuration fields, serializable -----------

/** The class of the type that is serialized by this serializer. */
Expand All @@ -78,6 +92,9 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
/** The serializer configuration snapshot, cached for efficiency. */
private transient AvroSchemaSerializerConfigSnapshot configSnapshot;

/** The currently accessing thread, set and checked on debug level only. */
private transient volatile Thread currentThread;

// ------------------------------------------------------------------------

/**
Expand Down Expand Up @@ -127,23 +144,56 @@ public T createInstance() {

@Override
public void serialize(T value, DataOutputView target) throws IOException {
checkAvroInitialized();
this.encoder.setOut(target);
this.writer.write(value, this.encoder);
if (CONCURRENT_ACCESS_CHECK) {
enterExclusiveThread();
}

try {
checkAvroInitialized();
this.encoder.setOut(target);
this.writer.write(value, this.encoder);
}
finally {
if (CONCURRENT_ACCESS_CHECK) {
exitExclusiveThread();
}
}
}

@Override
public T deserialize(DataInputView source) throws IOException {
checkAvroInitialized();
this.decoder.setIn(source);
return this.reader.read(null, this.decoder);
if (CONCURRENT_ACCESS_CHECK) {
enterExclusiveThread();
}

try {
checkAvroInitialized();
this.decoder.setIn(source);
return this.reader.read(null, this.decoder);
}
finally {
if (CONCURRENT_ACCESS_CHECK) {
exitExclusiveThread();
}
}
}

@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
checkAvroInitialized();
this.decoder.setIn(source);
return this.reader.read(reuse, this.decoder);
if (CONCURRENT_ACCESS_CHECK) {
enterExclusiveThread();
}

try {
checkAvroInitialized();
this.decoder.setIn(source);
return this.reader.read(reuse, this.decoder);
}
finally {
if (CONCURRENT_ACCESS_CHECK) {
exitExclusiveThread();
}
}
}

// ------------------------------------------------------------------------
Expand All @@ -152,8 +202,19 @@ public T deserialize(T reuse, DataInputView source) throws IOException {

@Override
public T copy(T from) {
checkAvroInitialized();
return avroData.deepCopy(schema, from);
if (CONCURRENT_ACCESS_CHECK) {
enterExclusiveThread();
}

try {
checkAvroInitialized();
return avroData.deepCopy(schema, from);
}
finally {
if (CONCURRENT_ACCESS_CHECK) {
exitExclusiveThread();
}
}
}

@Override
Expand All @@ -163,8 +224,10 @@ public T copy(T from, T reuse) {

@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
T value = deserialize(source);
serialize(value, target);
// we do not have concurrency checks here, because serialize() and
// deserialize() do the checks and the current concurrency check mechanism
// does provide additional safety in cases of re-entrant calls
serialize(deserialize(source), target);
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -277,6 +340,31 @@ private void initializeAvro() {
this.decoder = new DataInputDecoder();
}

// --------------------------------------------------------------------------------------------
// Concurrency checks
// --------------------------------------------------------------------------------------------

private void enterExclusiveThread() {
// we use simple get, check, set here, rather than CAS
// we don't need lock-style correctness, this is only a sanity-check and we thus
// favor speed at the cost of some false negatives in this check
Thread previous = currentThread;
Thread thisThread = Thread.currentThread();

if (previous == null) {
currentThread = thisThread;
}
else if (previous != thisThread) {
throw new IllegalStateException(
"Concurrent access to KryoSerializer. Thread 1: " + thisThread.getName() +
" , Thread 2: " + previous.getName());
}
}

private void exitExclusiveThread() {
currentThread = null;
}

// ------------------------------------------------------------------------
// Serializer Snapshots
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.avro.typeutils;

import org.apache.flink.annotation.Internal;

/**
* Simple helper class to initialize the concurrency checks for tests.
*
* <p>The flag is automatically set to true when assertions are activated (tests)
* and can be set to true manually in other tests as well;
*/
@Internal
class AvroSerializerDebugInitHelper {

/** This captures the initial setting after initialization. It is used to
* validate in tests that we never change the default to true. */
static final boolean INITIAL_SETTING;

/** The flag that is used to initialize the KryoSerializer's concurrency check flag. */
static boolean setToDebug = false;

static {
// capture the default setting, for tests
INITIAL_SETTING = setToDebug;

// if assertions are active, the check should be activated
//noinspection AssertWithSideEffects,ConstantConditions
assert setToDebug = true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.avro.typeutils;

import org.junit.Test;

import static org.junit.Assert.assertTrue;

/**
* A test that validates that the concurrency checks in the Avro Serializer
* are not hard coded to active.
*
* <p>The debug initialization in the AvroSerializer happens together with class
* initialization (that makes it peak efficient), which is why this test needs to
* run in a fresh JVM fork, and the JVM fork of this test should not be reused.
*
* <p><b>Important:</b> If you see this test fail and the initial settings are still
* correct, check the assumptions above (on fresh JVM fork).
*/
public class AvroSerializerConcurrencyCheckInactiveITCase {

// this sets the debug initialization back to its default, even if
// by default tests modify it (implicitly via assertion loading)
static {
AvroSerializerDebugInitHelper.setToDebug = AvroSerializerDebugInitHelper.INITIAL_SETTING;
}

/**
* This test checks that concurrent access is not detected by default, meaning that
* the thread concurrency checks are off by default.
*/
@Test
public void testWithNoConcurrencyCheck() throws Exception {
boolean assertionError;
try {
new AvroSerializerConcurrencyTest().testConcurrentUseOfSerializer();
assertionError = false;
}
catch (AssertionError e) {
assertionError = true;
}

assertTrue("testConcurrentUseOfSerializer() should have failed if " +
"concurrency checks are off by default", assertionError);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.avro.typeutils;

import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.testutils.BlockerSync;
import org.apache.flink.core.testutils.CheckedThread;

import org.junit.Test;

import java.io.IOException;

import static org.junit.Assert.fail;

/**
* This tests that the {@link AvroSerializer} properly fails when accessed by two threads
* concurrently.
*
* <p><b>Important:</b> This test only works if assertions are activated (-ea) on the JVM
* when running tests.
*/
public class AvroSerializerConcurrencyTest {

@Test
public void testConcurrentUseOfSerializer() throws Exception {
final AvroSerializer<String> serializer = new AvroSerializer<>(String.class);

final BlockerSync sync = new BlockerSync();

final DataOutputView regularOut = new DataOutputSerializer(32);
final DataOutputView lockingOut = new LockingView(sync);

// this thread serializes and gets stuck there
final CheckedThread thread = new CheckedThread("serializer") {
@Override
public void go() throws Exception {
serializer.serialize("a value", lockingOut);
}
};

thread.start();
sync.awaitBlocker();

// this should fail with an exception
try {
serializer.serialize("value", regularOut);
fail("should have failed with an exception");
}
catch (IllegalStateException e) {
// expected
}
finally {
// release the thread that serializes
sync.releaseBlocker();
}

// this propagates exceptions from the spawned thread
thread.sync();
}

// ------------------------------------------------------------------------

private static class LockingView extends DataOutputSerializer {

private final BlockerSync blocker;

LockingView(BlockerSync blocker) {
super(32);
this.blocker = blocker;
}

@Override
public void writeInt(int v) throws IOException {
blocker.blockNonInterruptible();
}
}
}
Loading

0 comments on commit be7c895

Please sign in to comment.