Skip to content

Commit

Permalink
Gateway util for per key lock and per key blocking queue executor(#2847)
Browse files Browse the repository at this point in the history
Gateway util for per key lock and per key blocking queue executor
  • Loading branch information
xyuanlu committed Jul 24, 2024
1 parent cf6a202 commit fc4a2e1
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package org.apache.helix.gateway.util;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


/**
* A per-key blocking executor that ensures that only one event is running for a given key at a time.
*/
public class PerKeyBlockingExecutor {
private final ThreadPoolExecutor _executor;
private final Map<String, Queue<Runnable>> _pendingBlockedEvents;
private final ConcurrentHashMap.KeySetView<String, Boolean> _runningEvents;
private final Lock _queueLock;

public PerKeyBlockingExecutor(int maxWorkers) {
this._executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(maxWorkers);
this._pendingBlockedEvents = new HashMap<>();
this._queueLock = new ReentrantLock();
this._runningEvents = ConcurrentHashMap.newKeySet();
}

/**
* Offer an event to be executed. If an event is already running for the given key, the event will be queued.
* @param key
* @param event
*/
public void offerEvent(String key, Runnable event) {
_queueLock.lock();
try {
if (!_runningEvents.contains(key)) {
_executor.execute(() -> runEvent(key, event));
} else {
_pendingBlockedEvents.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>());
_pendingBlockedEvents.get(key).offer(event);
}
} finally {
_queueLock.unlock();
}
}

private void runEvent(String key, Runnable event) {
try {
_runningEvents.add(key);
event.run();
} finally {
_queueLock.lock();
try {
_runningEvents.remove(key);
processQueue(key);
} finally {
_queueLock.unlock();
}
}
}

private void processQueue(String key) {
if (!_pendingBlockedEvents.containsKey(key)) {
return;
}
Runnable event = _pendingBlockedEvents.get(key).poll();
if (event != null) {
_executor.execute(() -> runEvent(key, event));
}
}

public void shutdown() {
_executor.shutdown();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package org.apache.helix.gateway.util;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/**
* A registry that manages locks per key.
*/
public class PerKeyLockRegistry {
private final ConcurrentHashMap<String, ReentrantLock> lockMap = new ConcurrentHashMap<>();

public void lock(String key) {
ReentrantLock lock = lockMap.computeIfAbsent(key, k -> new ReentrantLock());
lock.lock();
}

public void unlock(String key) {
ReentrantLock lock = lockMap.get(key);
if (lock != null) {
lock.unlock();
}
}

/**
* Execute the action with the lock on the key
* @param key
* @param action
*/
public void withLock(String key, Runnable action) {
lock(key);
try {
action.run();
} finally {
unlock(key);
}
}

/**
* Remove the lock if it is not being used.
* it must be called after the lock is required
* @param key
*/
public boolean removeLock(String key) {
ReentrantLock lock = lockMap.get(key);
if (lock != null && lock.isHeldByCurrentThread() && !lock.hasQueuedThreads()) {
lockMap.remove(key, lock);
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.apache.helix.gateway;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
import org.testng.annotations.Test;
import org.testng.Assert;


public class TestPerKeyBlockingExecutor {
@Test
public void testEventNotAddedIfPending() throws InterruptedException {
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
CountDownLatch latch3 = new CountDownLatch(1);

PerKeyBlockingExecutor perKeyBlockingExecutor = new PerKeyBlockingExecutor(3);

perKeyBlockingExecutor.offerEvent("key1", () -> {
try {
latch1.await(); // Wait for the test to release this latch
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

perKeyBlockingExecutor.offerEvent("key1", () -> {
latch2.countDown();
});

Thread.sleep(100); // Give time for the second event to be potentially processed

Assert.assertFalse(latch2.await(100, TimeUnit.MILLISECONDS)); // Event 2 should not run yet
latch1.countDown(); // Release the first latch
Assert.assertTrue(latch2.await(1, TimeUnit.SECONDS)); // Event 2 should run now

perKeyBlockingExecutor.offerEvent("key1", () -> {
latch3.countDown();
});

Assert.assertTrue(latch3.await(1, TimeUnit.SECONDS)); // Event 3 should run after Event 2
perKeyBlockingExecutor.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.apache.helix.gateway;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.helix.gateway.util.PerKeyLockRegistry;
import org.testng.annotations.Test;
import org.testng.Assert;


public class TestPerKeyLockRegistry {
@Test
public void testConcurrentAccess() {
PerKeyLockRegistry lockRegistry = new PerKeyLockRegistry();
final AtomicInteger counter = new AtomicInteger(0);
final CountDownLatch startLatch = new CountDownLatch(1);
final CountDownLatch doneLatch = new CountDownLatch(2);

lockRegistry.withLock("key1", () -> {
counter.incrementAndGet();
// try to acquir the lock for another key
lockRegistry.withLock("key2", () -> {
counter.incrementAndGet();
});
});

// counter should be 2
Assert.assertEquals(2, counter.get());

// acquire the lock for key
ExecutorService executor = Executors.newFixedThreadPool(2);
lockRegistry.lock("key1");
executor.submit(() -> {
lockRegistry.withLock("key1", () -> {
//try remove lock
Assert.assertFalse(lockRegistry.removeLock("key1"));
});
});
lockRegistry.unlock("key1");
executor.submit(() -> {
lockRegistry.withLock("key2", () -> {
//try remove lock, should fail because key1 is not locked
Assert.assertFalse(lockRegistry.removeLock("key1"));
});
});
executor.submit(() -> {
lockRegistry.withLock("key1", () -> {
//try remove lock, only this tiem it succeded
Assert.assertFalse(lockRegistry.removeLock("key1"));
});
});
executor.shutdown();
}
}

0 comments on commit fc4a2e1

Please sign in to comment.