Skip to content

Commit

Permalink
[FLINK-7778] [build] Shade ZooKeeper dependency (followups)
Browse files Browse the repository at this point in the history
  - Rename the 'flink-shaded-curator-recipes' module to 'flink-shaded-curator',
    because it actually contains more curator code than just the recipes.

  - Move the exception handling logic of 'ZooKeeperAccess' directly into the
    ZooKeeperStateHandleStore
  • Loading branch information
StephanEwen committed Nov 2, 2017
1 parent d368a07 commit 8afadd4
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import org.apache.flink.runtime.zookeeper.ZooKeeperSharedValue;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.runtime.zookeeper.ZooKeeperVersionedValue;
import org.apache.flink.runtime.zookeeper.ZookeeperAccess;
import org.apache.flink.util.FlinkException;

import org.apache.mesos.Protos;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -214,19 +214,11 @@ public void putWorker(MesosWorkerStore.Worker worker) throws Exception {

int currentVersion = workersInZooKeeper.exists(path);
if (currentVersion == -1) {
try {
workersInZooKeeper.addAndLock(path, worker);
LOG.debug("Added {} in ZooKeeper.", worker);
} catch (Exception ex) {
throw ZookeeperAccess.wrapIfZooKeeperNodeExistsException(ex);
}
workersInZooKeeper.addAndLock(path, worker);
LOG.debug("Added {} in ZooKeeper.", worker);
} else {
try {
workersInZooKeeper.replace(path, currentVersion, worker);
LOG.debug("Updated {} in ZooKeeper.", worker);
} catch (Exception ex) {
throw ZookeeperAccess.wrapIfZooKeeperNoNodeException(ex);
}
workersInZooKeeper.replace(path, currentVersion, worker);
LOG.debug("Updated {} in ZooKeeper.", worker);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -163,6 +164,9 @@ public RetrievableStateHandle<T> addAndLock(
success = true;
return storeHandle;
}
catch (KeeperException.NodeExistsException e) {
throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", e);
}
finally {
if (!success) {
// Cleanup the state handle if it was not written to ZooKeeper.
Expand Down Expand Up @@ -202,8 +206,10 @@ public void replace(String pathInZooKeeper, int expectedVersion, T state) throws
.withVersion(expectedVersion)
.forPath(path, serializedStateHandle);
success = true;
} catch (KeeperException.NoNodeException e) {
throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", e);
} finally {
if(success) {
if (success) {
oldStateHandle.discardState();
} else {
newStateHandle.discardState();
Expand Down Expand Up @@ -673,7 +679,7 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex
}

}
};
}

/**
* Callback interface for remove calls
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ under the License.
<relativePath>..</relativePath>
</parent>

<artifactId>flink-shaded-curator-recipes</artifactId>
<name>flink-shaded-curator-recipes</name>
<artifactId>flink-shaded-curator</artifactId>
<name>flink-shaded-curator</name>

<packaging>jar</packaging>

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ under the License.
<module>tools/force-shading</module>
<module>flink-annotations</module>
<module>flink-shaded-hadoop</module>
<module>flink-shaded-curator-recipes</module>
<module>flink-shaded-curator</module>
<module>flink-core</module>
<module>flink-java</module>
<module>flink-java8</module>
Expand Down

0 comments on commit 8afadd4

Please sign in to comment.