Skip to content

Commit

Permalink
[FLINK-2436] [streaming] Make ByteStreamStateHandles more robust
Browse files Browse the repository at this point in the history
Closes apache#958
  • Loading branch information
gyfora committed Aug 2, 2015
1 parent d73cb73 commit 83102f0
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,14 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable>
private static final long serialVersionUID = -962025800339325828L;

private transient Serializable state;
private boolean isWritten = false;

public ByteStreamStateHandle(Serializable state) {
this.state = state;
if (state != null) {
this.state = state;
} else {
throw new RuntimeException("State cannot be null");
}
}

/**
Expand All @@ -54,16 +59,25 @@ public ByteStreamStateHandle(Serializable state) {
public Serializable getState() throws Exception {
if (!stateFetched()) {
ObjectInputStream stream = new ObjectInputStream(getInputStream());
state = (Serializable) stream.readObject();
stream.close();
try {
state = (Serializable) stream.readObject();
} finally {
stream.close();
}
}
return state;
}

private void writeObject(ObjectOutputStream oos) throws Exception {
ObjectOutputStream stream = new ObjectOutputStream(getOutputStream());
stream.writeObject(state);
stream.close();
if (!isWritten) {
ObjectOutputStream stream = new ObjectOutputStream(getOutputStream());
try {
stream.writeObject(state);
isWritten = true;
} finally {
stream.close();
}
}
oos.defaultWriteObject();
}

Expand All @@ -74,4 +88,11 @@ private void writeObject(ObjectOutputStream oos) throws Exception {
public boolean stateFetched() {
return state != null;
}

/**
* Checks whether the state has already been written to the external store
*/
public boolean isWritten() {
return isWritten;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;

import org.apache.flink.util.InstantiationUtil;
import org.junit.Test;

public class ByteStreamStateHandleTest {

@Test
public void testHandle() throws Exception {
MockHandle handle;

try {
handle = new MockHandle(null);
fail();
} catch (RuntimeException e) {
// expected behaviour
}

handle = new MockHandle(1);

assertEquals(1, handle.getState());
assertTrue(handle.stateFetched());
assertFalse(handle.isWritten());
assertFalse(handle.discarded);

MockHandle handleDs = serializeDeserialize(handle);

assertEquals(1, handle.getState());
assertTrue(handle.stateFetched());
assertTrue(handle.isWritten());
assertTrue(handle.generatedOutput);
assertFalse(handle.discarded);

assertFalse(handleDs.stateFetched());
assertTrue(handleDs.isWritten());
assertFalse(handleDs.generatedOutput);
assertFalse(handle.discarded);

try {
handleDs.getState();
fail();
} catch (UnsupportedOperationException e) {
// good
}

MockHandle handleDs2 = serializeDeserialize(handleDs);

assertFalse(handleDs2.stateFetched());
assertTrue(handleDs2.isWritten());
assertFalse(handleDs.generatedOutput);
assertFalse(handleDs2.generatedOutput);
assertFalse(handleDs2.discarded);

handleDs2.discardState();
assertTrue(handleDs2.discarded);

}

@SuppressWarnings("unchecked")
private <X extends StateHandle<?>> X serializeDeserialize(X handle) throws IOException,
ClassNotFoundException {
byte[] serialized = InstantiationUtil.serializeObject(handle);
return (X) InstantiationUtil.deserializeObject(serialized, Thread.currentThread()
.getContextClassLoader());
}

private static class MockHandle extends ByteStreamStateHandle {

private static final long serialVersionUID = 1L;

public MockHandle(Serializable state) {
super(state);
}

boolean discarded = false;
transient boolean generatedOutput = false;

@Override
public void discardState() throws Exception {
discarded = true;
}

@Override
protected OutputStream getOutputStream() throws Exception {
generatedOutput = true;
return new ByteArrayOutputStream();
}

@Override
protected InputStream getInputStream() throws Exception {
throw new UnsupportedOperationException();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -90,13 +91,23 @@ public void testFileStateHandle() throws Exception {
+ hdPath);

FileStateHandle handle = (FileStateHandle) handleProvider.createStateHandle(state);

try {
handleProvider.createStateHandle(null);
fail();
} catch (RuntimeException e) {
// good
}

assertTrue(handle.stateFetched());
assertFalse(handle.isWritten());

// Serialize the handle so it writes the value to hdfs
SerializedValue<StateHandle<Serializable>> serializedHandle = new SerializedValue<StateHandle<Serializable>>(
handle);


assertTrue(handle.isWritten());

// Deserialize the handle and verify that the state is not fetched yet
FileStateHandle deserializedHandle = (FileStateHandle) serializedHandle
.deserializeValue(Thread.currentThread().getContextClassLoader());
Expand All @@ -107,7 +118,7 @@ public void testFileStateHandle() throws Exception {

// Test whether discard removes the checkpoint file properly
assertTrue(hdfs.listFiles(hdPath, true).hasNext());
handle.discardState();
deserializedHandle.discardState();
assertFalse(hdfs.listFiles(hdPath, true).hasNext());

}
Expand Down

0 comments on commit 83102f0

Please sign in to comment.