Skip to content

Commit

Permalink
play with seastar
Browse files Browse the repository at this point in the history
  • Loading branch information
JasonYuchen committed Sep 18, 2021
1 parent 591a697 commit e15657d
Show file tree
Hide file tree
Showing 13 changed files with 270 additions and 156 deletions.
8 changes: 8 additions & 0 deletions 3rd/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
include(FetchContent)
FetchContent_Declare(
googletest
# Specify the commit you depend on and update it regularly.
URL https://github.com/google/googletest/archive/609281088cfefc76f9d0ce82e1ff6c30cc3591e5.zip
)

FetchContent_MakeAvailable(googletest)
32 changes: 18 additions & 14 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
cmake_minimum_required(VERSION 3.16)
project(seastar_playground)
project(rafter)

set(CMAKE_CXX_STANDARD 20)

find_package(Seastar REQUIRED)

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines-ts")

add_subdirectory(storage)
if (CMAKE_BUILD_TYPE STREQUAL "Release")
set(CMAKE_INTERPROCEDURAL_OPTIMIZATION TRUE)
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=undefined")
endif()

# enable LTO/LTCG
#if (CMAKE_BUILD_TYPE STREQUAL "Release")
# message("lto enabled in release build")
# set(CMAKE_INTERPROCEDURAL_OPTIMIZATION TRUE)
#else()
# message("address sanitizer enabled in debug build")
# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address")
# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=undefined")
#endif()
include_directories(${PROJECT_SOURCE_DIR})

add_executable(seastar_playground main.cpp)
#add_subdirectory(3rd)
add_subdirectory(test)

target_link_libraries(seastar_playground
PUBLIC Seastar::seastar)
set(SOURCES
main.cpp
util/fragmented_temporary_buffer.cc)

add_executable(${PROJECT_NAME} ${SOURCES})

target_link_libraries(${PROJECT_NAME}
PUBLIC Seastar::seastar)
42 changes: 0 additions & 42 deletions echo.h

This file was deleted.

87 changes: 30 additions & 57 deletions main.cpp
Original file line number Diff line number Diff line change
@@ -1,72 +1,45 @@
#include <seastar/core/app-template.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/when_all.hh>

#include <seastar/core/file.hh>
#include <seastar/core/iostream.hh>
#include <iostream>
#include <chrono>

#include "util/fragmented_temporary_buffer.hh"

using namespace std;
using namespace chrono_literals;

class test {
public:
int _id = -1;
explicit test(int id) : _id(id) {
cout << "ctor " << _id << endl;
}
test(test&& r) noexcept {
_id = r._id; r._id = -1;
cout << "move ctor from " << _id << endl;
}
test& operator=(test&& r) noexcept {
_id = r._id; r._id = -1;
cout << "move assign from " << _id << endl;
return *this;
}
~test() {
cout << "dtor " << _id << endl;
}

seastar::future<> handle() {
cout << "start" << endl;
co_await seastar::sleep(1s);
cout << "done" << endl;
}
};

seastar::future<> handle(test o) {
co_return co_await o.handle();
}
seastar::future<> handle_lref(test& o) {
co_return co_await o.handle();
}
seastar::future<> handle_rref(test&& o) {
co_return co_await o.handle();
}
seastar::future<> handle_sp(seastar::lw_shared_ptr<test> o) {
co_return co_await o->handle();
}


int main(int argc, char** argv) {
seastar::app_template app;
app.run(argc, argv, [] () -> seastar::future<> {
for (int i = 1; i <= 2; ++i) {
// 1
test o(i);
// (void)o.handle();
// 2
// (void)handle(std::move(o));
// 3
// (void)handle_lref(o);
// 4
(void)handle_rref(std::move(o));
// 5
// auto p = seastar::make_lw_shared<test>(i);
// (void)handle_sp(p);
fragmented_temporary_buffer buf(49, 4096);
std::cout << buf.size() << std::endl;
buf.remove_suffix(buf.size() - 49);
std::cout << buf.size() << std::endl;
auto out = buf.as_ostream();
std::string a(25, 'a');
std::string b(24, 'b');
std::cout << "writing a" << std::endl;
out.write(a.data(), a.size());
std::cout << "writing b" << std::endl;
out.write(b.data(), b.size());
buf.remove_prefix(5);
int cnt = 0;
auto it = buf.begin();
while (true) {
if (it == buf.end()) {
break;
}
cnt++;
std::cout << cnt << " fragment" << std::endl;
std::cout << "\tsize " << it->size() << std::endl;
std::cout << "\tcontent " << std::string(it->data(), it->size()) << std::endl;
it++;
}
co_await seastar::sleep(3s);
co_return;

});
}
38 changes: 0 additions & 38 deletions segment_log.h

This file was deleted.

3 changes: 0 additions & 3 deletions storage/CMakeLists.txt

This file was deleted.

10 changes: 8 additions & 2 deletions storage/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,12 @@ class segment {
};
segment() {
file_ = seastar::open_file_dma(path_.string(), seastar::open_flags::create | seastar::open_flags::dsync | seastar::open_flags::rw);

out_ = seastar::make_file_output_stream(file_);
header_.data = seastar::temporary_buffer<char>::aligned(file_.memory_dma_alignment(), header::HEADER_SIZE);
co_await header_.allocate(file_).set_term(0).set_vote(0).fill_checksum(0).dump_to(file_);
header_.set_term(0);
header_.set_vote(0);
header_.fill_checksum(0);
file_.dma_write(0, header_.data.share());
co_await out_.write(header_.data.share());
co_await out_.flush();
}
Expand All @@ -195,6 +195,9 @@ class segment {
static constexpr uint64_t CHECKSUM_TYPE = 36;
static constexpr uint64_t CHECKSUM = 40;
header() = default;
header& allocate(seastar::file& file) {
data = seastar::temporary_buffer<char>::aligned(file_.memory_dma_alignment(), header::HEADER_SIZE);
}
uint64_t term() const noexcept {
return read_le<uint64_t>(data.get() + TERM);
}
Expand All @@ -216,6 +219,9 @@ class segment {
write_le(checksum, data.get_write() + CHECKSUM);
return *this;
}
seastar::future<> dump_to(seastar::file& file) {
co_return co_await file.dma_write(0, data.share());
}

seastar::temporary_buffer<char> data;
};
Expand Down
11 changes: 11 additions & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
project(rafter_test)

set(SOURCES
base.cc
fragmented_temporary_buffer_test.cc)

add_executable(${PROJECT_NAME} ${SOURCES})

target_link_libraries(${PROJECT_NAME}
PUBLIC Seastar::seastar
PUBLIC gtest)
10 changes: 10 additions & 0 deletions test/base.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
//
// Created by jason on 2021/9/15.
//

#include "gtest/gtest.h"

int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
10 changes: 10 additions & 0 deletions test/base.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
//
// Created by jason on 2021/9/15.
//

#pragma once

#include <seastar/testing/test_case.hh>

#define RAFTER_TEST_CASE(name) \
SEASTAR_TEST_CASE(__file__ ## name)
20 changes: 20 additions & 0 deletions test/fragmented_temporary_buffer_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
//
// Created by jason on 2021/9/12.
//

#include "test/base.hh"
#include "util/fragmented_temporary_buffer.hh"

using namespace rafter::util;

RAFTER_TEST_CASE(fragmented_temporary_buffer_1_fragment) {
auto fragment_size = fragmented_temporary_buffer::default_fragment_size;
auto buffer_size = fragment_size - 10;
std::string data(buffer_size, 'c');
fragmented_temporary_buffer buffer(buffer_size, 4096);
buffer.as_ostream().write(data.data(), data.size());
BOOST_CHECK_EQUAL(buffer.size(), buffer_size);
int i = 0;
for (auto it = buffer.begin(); it != buffer.end(); it++, i++) {}
BOOST_CHECK_EQUAL(i, 1);
}
3 changes: 3 additions & 0 deletions util/fragmented_temporary_buffer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//
// Created by jason on 2021/9/15.
//
Loading

0 comments on commit e15657d

Please sign in to comment.