Skip to content

Commit

Permalink
migrate from boost::barrier
Browse files Browse the repository at this point in the history
Summary: Migrate from `boost::barrier` to `std::barrier` and `std::latch`.

Reviewed By: Orvid

Differential Revision: D58611034

fbshipit-source-id: 5b7483cc7f1fb2f4cae8c5f0c22fe71e1466e543
  • Loading branch information
yfeldblum authored and facebook-github-bot committed Jun 16, 2024
1 parent 2957b2a commit 754ecd7
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 32 deletions.
37 changes: 20 additions & 17 deletions wangle/bootstrap/test/AcceptRoutingHandlerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* limitations under the License.
*/

#include <latch>

#include "Mocks.h"

using namespace folly;
Expand Down Expand Up @@ -180,7 +182,7 @@ TEST_F(AcceptRoutingHandlerTest, ParseRoutingDataSuccess) {
}));

// Downstream pipeline is created, and its handler receives events
boost::barrier barrier(2);
std::latch barrier(2);
EXPECT_CALL(*downstreamHandler_, transportActive(_));
EXPECT_CALL(*downstreamHandler_, read(_, _))
.WillOnce(Invoke([&](MockBytesToBytesHandler::Context* /*ctx*/,
Expand All @@ -191,38 +193,38 @@ TEST_F(AcceptRoutingHandlerTest, ParseRoutingDataSuccess) {
.WillOnce(Invoke([&](MockBytesToBytesHandler::Context* ctx) {
VLOG(4) << "Downstream EOF";
ctx->fireClose();
barrier.wait();
barrier.arrive_and_wait();
}));
EXPECT_CALL(*downstreamHandler_, transportInactive(_));

// Send client request that triggers server processing
clientConnectAndCleanClose();

barrier.wait();
barrier.arrive_and_wait();

// Routing pipeline has been erased
EXPECT_EQ(0, acceptRoutingHandler_->getRoutingPipelineCount());
}

TEST_F(AcceptRoutingHandlerTest, SocketErrorInRoutingPipeline) {
// Server receives data, and parses routing data
boost::barrier barrierConnect(2);
std::latch barrierConnect(2);
EXPECT_CALL(*routingDataHandler_, transportActive(_));
EXPECT_CALL(*routingDataHandler_, parseRoutingData(_, _))
.WillOnce(
Invoke([&](folly::IOBufQueue& /*bufQueue*/,
MockRoutingDataHandler::RoutingData& /*routingData*/) {
VLOG(4) << "Need more data to be parse.";
barrierConnect.wait();
barrierConnect.arrive_and_wait();
return false;
}));

// Send client request that triggers server processing
auto futureClientPipeline = clientConnectAndWrite();

// Socket exception after routing pipeline had been created
barrierConnect.wait();
boost::barrier barrierException(2);
barrierConnect.arrive_and_wait();
std::latch barrierException(2);
std::move(futureClientPipeline)
.thenValue([](DefaultPipeline* clientPipeline) {
clientPipeline->getTransport()->getEventBase()->runInEventBaseThread(
Expand All @@ -236,9 +238,9 @@ TEST_F(AcceptRoutingHandlerTest, SocketErrorInRoutingPipeline) {
folly::exception_wrapper ex) {
VLOG(4) << "Routing data handler Exception";
acceptRoutingHandler_->onError(kConnId0, ex);
barrierException.wait();
barrierException.arrive_and_wait();
}));
barrierException.wait();
barrierException.arrive_and_wait();

// Downstream pipeline is not created
EXPECT_CALL(*downstreamHandler_, transportActive(_)).Times(0);
Expand All @@ -257,23 +259,24 @@ TEST_F(AcceptRoutingHandlerTest, OnNewConnectionWithBadSocket) {
delete downstreamHandler_;

// Send client request that triggers server processing
boost::barrier barrierConnect(2);
std::latch barrierConnect(2);
EXPECT_CALL(*routingDataHandler_, transportActive(_))
.WillOnce(Invoke([&](MockBytesToBytesHandler::Context* /*ctx*/) {
barrierConnect.wait();
barrierConnect.arrive_and_wait();
}));
auto futureClientPipeline = justClientConnect();
barrierConnect.wait();
barrierConnect.arrive_and_wait();
futureClientPipeline.wait();

// Expect an exception on the routing data handler
boost::barrier barrierException(2);
std::latch barrierException(2);
EXPECT_CALL(*routingDataHandler_, readException(_, _))
.WillOnce(Invoke(
[&](MockBytesToBytesHandler::Context* /*ctx*/,
folly::exception_wrapper /*ex*/) { barrierException.wait(); }));
.WillOnce(Invoke([&](MockBytesToBytesHandler::Context* /*ctx*/,
folly::exception_wrapper /*ex*/) {
barrierException.arrive_and_wait();
}));
sendClientException(futureClientPipeline.value());
barrierException.wait();
barrierException.arrive_and_wait();

// Routing pipeline has been added
EXPECT_EQ(1, acceptRoutingHandler_->getRoutingPipelineCount());
Expand Down
4 changes: 0 additions & 4 deletions wangle/bootstrap/test/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ cpp_unittest(
"//wangle/channel:handler",
],
external_deps = [
("boost", None, "boost_thread"),
"glog",
],
)
Expand All @@ -43,7 +42,4 @@ cpp_library(
"//wangle/channel:pipeline",
"//wangle/channel/test:mocks",
],
exported_external_deps = [
("boost", None, "boost_thread"),
],
)
9 changes: 5 additions & 4 deletions wangle/bootstrap/test/BootstrapTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
* limitations under the License.
*/

#include <latch>

#include "wangle/bootstrap/ClientBootstrap.h"
#include "wangle/bootstrap/ServerBootstrap.h"
#include "wangle/channel/Handler.h"

#include <boost/thread.hpp>
#include <folly/experimental/TestUtil.h>
#include <folly/portability/GTest.h>
#include <glog/logging.h>
Expand Down Expand Up @@ -166,15 +167,15 @@ TEST(Bootstrap, ServerAcceptGroupTest) {
SocketAddress address;
server.getSockets()[0]->getAddress(&address);

boost::barrier barrier(2);
std::latch barrier(2);
auto thread = std::thread([&]() {
TestClient client;
client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
client.connect(address);
EventBaseManager::get()->getEventBase()->loop();
barrier.wait();
barrier.arrive_and_wait();
});
barrier.wait();
barrier.arrive_and_wait();
server.stop();
thread.join();
server.join();
Expand Down
1 change: 0 additions & 1 deletion wangle/bootstrap/test/Mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <wangle/channel/Pipeline.h>
#include <wangle/channel/test/MockHandler.h>

#include <boost/thread.hpp>
#include <folly/portability/GMock.h>
#include <folly/portability/GTest.h>

Expand Down
3 changes: 0 additions & 3 deletions wangle/channel/test/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,4 @@ cpp_unittest(
"//wangle/channel:pipeline",
"//wangle/channel:static_pipeline",
],
external_deps = [
("boost", None, "boost_thread"),
],
)
7 changes: 4 additions & 3 deletions wangle/channel/test/PipelineTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
* limitations under the License.
*/

#include <boost/thread/barrier.hpp>
#include <barrier>

#include <folly/portability/GMock.h>
#include <folly/portability/GTest.h>
#include <wangle/channel/AsyncSocketHandler.h>
Expand Down Expand Up @@ -372,10 +373,10 @@ TEST(Pipeline, Concurrent) {
NiceMock<MockHandlerAdapter<int, int>> handler1, handler2;
auto pipeline = Pipeline<int, int>::create();
(*pipeline).addBack(&handler1).addBack(&handler2).finalize();
boost::barrier b{2};
std::barrier b{2};
auto spam = [&] {
for (int i = 0; i < 100000; i++) {
b.wait();
b.arrive_and_wait();
pipeline->read(i);
}
};
Expand Down

0 comments on commit 754ecd7

Please sign in to comment.