Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #3526] Optimize SubStreamHandler #3527

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
* ResetCountDownLatch can reset
*
* @see java.util.concurrent.CountDownLatch
*/
public class ResetCountDownLatch {

private final RestSync restSync;

public ResetCountDownLatch(int count) {
this.restSync = new RestSync(count);
}


/**
* Causes the current thread to wait until the latch has counted down to zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>If the current count is zero then this method returns immediately.
*
* <p>If the current count is greater than zero then the current
* thread becomes disabled for thread scheduling purposes and lies dormant until one of two things happen:
* <ul>
* <li>The count reaches zero due to invocations of the
* {@link #countDown} method; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* @throws InterruptedException if the current thread is interrupted while waiting
*/
public void await() throws InterruptedException {
restSync.acquireSharedInterruptibly(1);
}

/**
* Causes the current thread to wait until the latch has counted down to zero, unless the thread is {@linkplain Thread#interrupt interrupted}, or
* the specified waiting time elapses.
*
* <p>If the current count is zero then this method returns immediately
* with the value {@code true}.
*
* <p>If the current count is greater than zero then the current
* thread becomes disabled for thread scheduling purposes and lies dormant until one of three things happen:
* <ul>
* <li>The count reaches zero due to invocations of the
* {@link #countDown} method; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or
* <li>The specified waiting time elapses.
* </ul>
*
* <p>If the count reaches zero then the method returns with the
* value {@code true}.
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* <p>If the specified waiting time elapses then the value {@code false}
* is returned. If the time is less than or equal to zero, the method
* will not wait at all.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the {@code timeout} argument
* @return {@code true} if the count reached zero and {@code false} if the waiting time elapsed before the count reached zero
* @throws InterruptedException if the current thread is interrupted while waiting
*/
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return restSync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}


/**
* Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
*
* <p>If the current count is greater than zero then it is decremented.
* If the new count is zero then all waiting threads are re-enabled for thread scheduling purposes.
*
* <p>If the current count equals zero then nothing happens.
*/
public void countDown() {
restSync.releaseShared(1);
}

/**
* Returns the current count.
*
* <p>This method is typically used for debugging and testing purposes.
*
* @return the current count
*/
public int getCount() {
return restSync.getCount();
}

/**
* Reset the CountDownLatch
*/
public void reset() {
restSync.reset();
}

/**
* Synchronization control For ResetCountDownLatch. Uses AQS state to represent count.
*/
private static final class RestSync extends AbstractQueuedSynchronizer {

private final int initCount;

RestSync(int count) {
if (count < 0) {
throw new IllegalArgumentException("count must be greater than or equal to 0");
}
this.initCount = count;
setState(count);
}

protected void reset() {
setState(initCount);
}

int getCount() {
return getState();
}

@Override
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

@Override
protected boolean tryReleaseShared(int releases) {
for (; ; ) {
int count = getState();
if (count == 0) {
return false;
}
int nextCount = count - 1;
if (compareAndSetState(count, nextCount)) {
return nextCount == 0;
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public abstract class ThreadWrapper implements Runnable {

private final AtomicBoolean started = new AtomicBoolean(false);
protected Thread thread;
Comment on lines +25 to +29
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides its usage in SubStreamHandler, could ThreadWrapper be utilized in other classes as well?

ThreadWrapper appears to streamline synchronization for SubStreamHandler, eliminating the need for using a latch. However, the synchronization mechanism using latch in SubStreamHandler itself isn't overly complex. Hence, I'm curious whether introducing the additional complexity of ThreadWrapper for synchronization would address additional problems.

protected final ResetCountDownLatch waiter = new ResetCountDownLatch(1);
protected volatile AtomicBoolean hasWakeup = new AtomicBoolean(false);
protected boolean isDaemon = false;
protected volatile boolean isRunning = false;

public ThreadWrapper() {

}

public abstract String getThreadName();

public void start() {

if (!started.compareAndSet(false, true)) {
log.warn("Start thread:{} fail", getThreadName());
return;
}
this.thread = new Thread(this, getThreadName());
this.thread.setDaemon(isDaemon);
this.thread.start();
this.isRunning = true;
log.info("Start thread:{} success", getThreadName());
}

public void await() {
if (hasWakeup.compareAndSet(true, false)) {
return;
}
//reset count
waiter.reset();
try {
waiter.await();
} catch (InterruptedException e) {
log.error("Thread[{}] Interrupted", getThreadName(), e);
} finally {
hasWakeup.set(false);
}
}

public void await(long timeout) {
await(timeout, TimeUnit.MILLISECONDS);
}

public void await(long timeout, TimeUnit timeUnit) {
if (hasWakeup.compareAndSet(true, false)) {
return;
}
//reset count
waiter.reset();
try {
waiter.await(timeout, timeUnit == null ? TimeUnit.MILLISECONDS : timeUnit);
} catch (InterruptedException e) {
log.error("Thread[{}] Interrupted", getThreadName(), e);
} finally {
hasWakeup.set(false);
}
}

public void wakeup() {
if (hasWakeup.compareAndSet(false, true)) {
waiter.countDown();
}
}

public void shutdownImmediately() {
shutdown(true);
}

public void shutdown() {
shutdown(false);
}

private void shutdown(final boolean interruptThread) {
if (!started.compareAndSet(true, false)) {
return;
}
this.isRunning = false;
//wakeup the thread to run
wakeup();

try {
if (interruptThread) {
this.thread.interrupt();
}
if (!this.isDaemon) {
//wait main thread to wait this thread finish
this.thread.join(TimeUnit.SECONDS.toMillis(60));
}
} catch (InterruptedException e) {
log.error("Thread[{}] Interrupted", getThreadName(), e);
}
}

public void setDaemon(boolean daemon) {
isDaemon = daemon;
}

public boolean isStated() {
return this.started.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common;

import java.util.concurrent.TimeUnit;

import org.junit.Assert;
import org.junit.Test;

public class ResetCountDownLatchTest {

@Test
public void testConstructorParameterError() {
try {
new ResetCountDownLatch(-1);
} catch (IllegalArgumentException e) {
Assert.assertEquals(e.getMessage(), "count must be greater than or equal to 0");
}
ResetCountDownLatch resetCountDownLatch = new ResetCountDownLatch(1);
Assert.assertEquals(1, resetCountDownLatch.getCount());
}

@Test
public void testAwaitTimeout() throws InterruptedException {
ResetCountDownLatch latch = new ResetCountDownLatch(1);
boolean await = latch.await(5, TimeUnit.MILLISECONDS);
Assert.assertFalse(await);
latch.countDown();
await = latch.await(5, TimeUnit.MILLISECONDS);
Assert.assertTrue(await);
}

@Test(timeout = 1000)
public void testCountDownAndGetCount() throws InterruptedException {
int count = 2;
ResetCountDownLatch resetCountDownLatch = new ResetCountDownLatch(count);
Assert.assertEquals(count, resetCountDownLatch.getCount());
resetCountDownLatch.countDown();
Assert.assertEquals(count - 1, resetCountDownLatch.getCount());
resetCountDownLatch.countDown();
resetCountDownLatch.await();
Assert.assertEquals(0, resetCountDownLatch.getCount());
}

@Test
public void testReset() throws InterruptedException {
int count = 2;
ResetCountDownLatch resetCountDownLatch = new ResetCountDownLatch(count);
resetCountDownLatch.countDown();
Assert.assertEquals(count - 1, resetCountDownLatch.getCount());
resetCountDownLatch.reset();
Assert.assertEquals(count, resetCountDownLatch.getCount());
resetCountDownLatch.countDown();
resetCountDownLatch.countDown();
resetCountDownLatch.await();
Assert.assertEquals(0, resetCountDownLatch.getCount());
resetCountDownLatch.countDown();
Assert.assertEquals(0, resetCountDownLatch.getCount());
resetCountDownLatch.reset();
resetCountDownLatch.countDown();
Assert.assertEquals(1, resetCountDownLatch.getCount());

}
}
Loading