Skip to content

Commit

Permalink
MDEV-31095 tpool - restrict threadpool concurrency during bufferpool …
Browse files Browse the repository at this point in the history
…load

Add threadpool functionality to restrict concurrency during "batch"
periods (where tasks are added in rapid succession).
This will throttle thread creation more agressively than usual, while
keeping performance at least on-par.

One of these cases is bufferpool load, where async read IOs are executed
without any throttling. There can be as much as 650K read IOs for
loading 10GB buffer pool.

Another one is recovery, where "fake read" IOs are executed.

Why there are more threads than we expect?
Worker threads are not be recognized as idle, until they return to the
standby list, and to return to that list, they need to acquire
mutex currently held in the submit_task(). In those cases, submit_task()
has no worker to wake, and would create threads until default concurrency
level (2*ncpus) is satisfied. Only after that throttling would happen.
  • Loading branch information
vaintroub committed Oct 4, 2023
1 parent 9ba8dc1 commit e33e2fa
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 11 deletions.
2 changes: 2 additions & 0 deletions storage/innobase/buf/buf0dump.cc
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,9 @@ static void buf_dump_load_func(void *)
#ifdef WITH_WSREP
if (!get_wsrep_recovery()) {
#endif /* WITH_WSREP */
srv_thread_pool->set_concurrency(srv_n_read_io_threads);
buf_load();
srv_thread_pool->set_concurrency();
#ifdef WITH_WSREP
}
#endif /* WITH_WSREP */
Expand Down
9 changes: 7 additions & 2 deletions storage/innobase/srv/srv0start.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1486,9 +1486,14 @@ dberr_t srv_start(bool create_new_db)
if (srv_force_recovery < SRV_FORCE_NO_LOG_REDO) {
/* Apply the hashed log records to the
respective file pages, for the last batch of
recv_group_scan_log_recs(). */

recv_group_scan_log_recs().
Since it may generate huge batch of threadpool tasks,
for read io task group, scale down thread creation rate
by temporarily restricting tpool concurrency.
*/
srv_thread_pool->set_concurrency(srv_n_read_io_threads);
recv_sys.apply(true);
srv_thread_pool->set_concurrency();

if (recv_sys.is_corrupt_log()
|| recv_sys.is_corrupt_fs()) {
Expand Down
18 changes: 18 additions & 0 deletions tpool/tpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,24 @@ class thread_pool
{
m_aio.reset();
}

/**
Tweaks how fast worker threads are created, or how often they are signaled.
@param threads - desired number of concurrently active threads
Special value 0 means default. Not the same as max number of threads
in the pool - oversubscription is allowed and stalls are still detected
@note
It is designed to use with "batch" operations, where huge number
of tasks is submitted in rapid succession. In this case, it is
better to temporarily restrict concurrency, which will make thread
creation throttling more aggressive.
Once the batch is over, restore default concurrency
by calling set_concurrency(0).
*/
virtual void set_concurrency(unsigned int threads=0){}

int bind(native_file_handle &fd) { return m_aio->bind(fd); }
void unbind(const native_file_handle &fd) { if (m_aio) m_aio->unbind(fd); }
int submit_io(aiocb *cb) { return m_aio->submit_io(cb); }
Expand Down
25 changes: 16 additions & 9 deletions tpool/tpool_generic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ class thread_pool_generic : public thread_pool
{
return new timer_generic(func, data, this);
}
void set_concurrency(unsigned int concurrency=0) override;
};

void thread_pool_generic::cancel_pending(task* t)
Expand Down Expand Up @@ -796,7 +797,6 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
m_tasks_dequeued(),
m_wakeups(),
m_spurious_wakeups(),
m_concurrency(std::thread::hardware_concurrency()*2),
m_in_shutdown(),
m_timestamp(),
m_long_tasks_count(),
Expand All @@ -808,14 +808,7 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
m_last_activity(),
m_maintenance_timer(thread_pool_generic::maintenance_func, this, nullptr)
{

if (m_max_threads < m_concurrency)
m_concurrency = m_max_threads;
if (m_min_threads > m_concurrency)
m_concurrency = min_threads;
if (!m_concurrency)
m_concurrency = 1;

set_concurrency();
// start the timer
m_maintenance_timer.set_time(0, (int)m_timer_interval.count());
}
Expand Down Expand Up @@ -844,6 +837,20 @@ bool thread_pool_generic::too_many_active_threads()
m_concurrency* OVERSUBSCRIBE_FACTOR;
}

void thread_pool_generic::set_concurrency(unsigned int concurrency)
{
std::unique_lock<std::mutex> lk(m_mtx);
if (concurrency == 0)
concurrency= 2 * std::thread::hardware_concurrency();
m_concurrency = concurrency;
if (m_concurrency > m_max_threads)
m_concurrency = m_max_threads;
if (m_concurrency < m_min_threads)
m_concurrency = m_min_threads;
if (m_concurrency < 1)
m_concurrency = 1;
}

/** Submit a new task*/
void thread_pool_generic::submit_task(task* task)
{
Expand Down

0 comments on commit e33e2fa

Please sign in to comment.