Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decoupling local cache function and cache algorithm #38048

Merged
Prev Previous commit
Next Next commit
fix rebase
  • Loading branch information
KinderRiven committed Aug 10, 2022
commit 9d83b93e88025f4052e5a32562377b26ab87a0cd
196 changes: 185 additions & 11 deletions src/Common/FileCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#include <IO/Operators.h>
#include <pcg-random/pcg_random.hpp>
#include <filesystem>
#include <Common/LRUFileCache.h>
#include <Common/LRUFileCachePriority.h>

namespace fs = std::filesystem;

Expand All @@ -23,17 +23,192 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

FileCache::FileCache(const String & cache_base_path_, const FileCacheSettings & cache_settings_)
: IFileCache(cache_base_path_, cache_settings_)
, main_priority(std::make_shared<LRUFileCache>())
, stash_priority(std::make_shared<LRUFileCache>())
FileCache::FileCache(
const String & cache_base_path_,
const FileCacheSettings & cache_settings_)
: cache_base_path(cache_base_path_)
, max_size(cache_settings_.max_size)
, max_element_size(cache_settings_.max_elements)
, max_file_segment_size(cache_settings_.max_file_segment_size)
, enable_filesystem_query_cache_limit(cache_settings_.enable_filesystem_query_cache_limit)
, main_priority(std::make_shared<LRUFileCachePriority>())
, stash_priority(std::make_shared<LRUFileCachePriority>())
KinderRiven marked this conversation as resolved.
Show resolved Hide resolved
, max_stash_element_size(cache_settings_.max_elements)
, enable_cache_hits_threshold(cache_settings_.enable_cache_hits_threshold)
, log(&Poco::Logger::get("FileCache"))
, allow_to_remove_persistent_segments_from_cache_by_default(cache_settings_.allow_to_remove_persistent_segments_from_cache_by_default)
{
}

String FileCache::Key::toString() const
{
return getHexUIntLowercase(key);
}

FileCache::Key FileCache::hash(const String & path)
{
return Key(sipHash128(path.data(), path.size()));
}

String FileCache::getPathInLocalCache(const Key & key, size_t offset, bool is_persistent) const
{
auto key_str = key.toString();
return fs::path(cache_base_path)
/ key_str.substr(0, 3)
/ key_str
/ (std::to_string(offset) + (is_persistent ? "_persistent" : ""));
}

String FileCache::getPathInLocalCache(const Key & key) const
{
auto key_str = key.toString();
return fs::path(cache_base_path) / key_str.substr(0, 3) / key_str;
}

static bool isQueryInitialized()
{
return CurrentThread::isInitialized()
&& CurrentThread::get().getQueryContext()
&& CurrentThread::getQueryId().size != 0;
}

bool FileCache::isReadOnly()
{
return !isQueryInitialized();
}

void FileCache::assertInitialized() const
{
if (!is_initialized)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cache not initialized");
}

FileCache::QueryContextPtr FileCache::getCurrentQueryContext(std::lock_guard<std::mutex> & cache_lock)
{
if (!isQueryInitialized())
return nullptr;

return getQueryContext(CurrentThread::getQueryId().toString(), cache_lock);
}

FileCache::QueryContextPtr FileCache::getQueryContext(const String & query_id, std::lock_guard<std::mutex> & /* cache_lock */)
{
auto query_iter = query_map.find(query_id);
return (query_iter == query_map.end()) ? nullptr : query_iter->second;
}

void FileCache::removeQueryContext(const String & query_id)
{
std::lock_guard cache_lock(mutex);
auto query_iter = query_map.find(query_id);

if (query_iter == query_map.end())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to release query context that does not exist (query_id: {})",
query_id);
}

query_map.erase(query_iter);
}

FileCache::QueryContextPtr FileCache::getOrSetQueryContext(
const String & query_id, const ReadSettings & settings, std::lock_guard<std::mutex> & cache_lock)
{
if (query_id.empty())
return nullptr;

auto context = getQueryContext(query_id, cache_lock);
if (context)
return context;

auto query_context = std::make_shared<QueryContext>(settings.max_query_cache_size, settings.skip_download_if_exceeds_query_cache);
auto query_iter = query_map.emplace(query_id, query_context).first;
return query_iter->second;
}

FileCache::QueryContextHolder FileCache::getQueryContextHolder(const String & query_id, const ReadSettings & settings)
{
std::lock_guard cache_lock(mutex);

if (!enable_filesystem_query_cache_limit || settings.max_query_cache_size == 0)
return {};

/// if enable_filesystem_query_cache_limit is true, and max_query_cache_size large than zero,
/// we create context query for current query.
auto context = getOrSetQueryContext(query_id, settings, cache_lock);
return QueryContextHolder(query_id, this, context);
}

void FileCache::QueryContext::remove(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
{
if (cache_size < size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Deleted cache size exceeds existing cache size");

if (!skip_download_if_exceeds_query_cache)
{
auto record = records.find({key, offset});
if (record != records.end())
{
record->second->remove(cache_lock);
records.erase({key, offset});
}
}
cache_size -= size;
}

void FileCache::QueryContext::reserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
{
if (cache_size + size > max_cache_size)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Reserved cache size exceeds the remaining cache size (key: {}, offset: {})",
key.toString(), offset);
}

if (!skip_download_if_exceeds_query_cache)
{
auto record = records.find({key, offset});
if (record == records.end())
{
auto queue_iter = priority->add(key, offset, 0, cache_lock);
record = records.insert({{key, offset}, queue_iter}).first;
}
record->second->incrementSize(size, cache_lock);
}
cache_size += size;
}

void FileCache::QueryContext::use(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock)
{
if (skip_download_if_exceeds_query_cache)
return;

auto record = records.find({key, offset});
if (record != records.end())
record->second->use(cache_lock);
}

FileCache::QueryContextHolder::QueryContextHolder(
const String & query_id_,
FileCache * cache_,
FileCache::QueryContextPtr context_)
: query_id(query_id_)
, cache(cache_)
, context(context_)
{
}

FileCache::QueryContextHolder::~QueryContextHolder()
{
/// If only the query_map and the current holder hold the context_query,
/// the query has been completed and the query_context is released.
if (context && context.use_count() == 2)
cache->removeQueryContext(query_id);
}

void FileCache::initialize()
{
std::lock_guard cache_lock(mutex);
Expand Down Expand Up @@ -115,7 +290,7 @@ FileSegments FileCache::getImpl(
files.erase(key);

/// Note: it is guaranteed that there is no concurrency with files deletion,
/// because cache files are deleted only inside IFileCache and under cache lock.
/// because cache files are deleted only inside FileCache and under cache lock.
if (fs::exists(key_path))
fs::remove_all(key_path);

Expand Down Expand Up @@ -387,7 +562,7 @@ FileCache::FileSegmentCell * FileCache::addCell(

if (stash_priority->getElementsNum(cache_lock) > max_stash_element_size)
{
auto remove_priority_iter = stash_priority->getNewIterator(cache_lock);
auto remove_priority_iter = stash_priority->getNewIterator(cache_lock)->getWriteIterator();
stash_records.erase({remove_priority_iter->key(), remove_priority_iter->offset()});
remove_priority_iter->remove(cache_lock);
}
Expand Down Expand Up @@ -473,7 +648,7 @@ bool FileCache::tryReserve(const Key & key, size_t offset, size_t size, std::loc

auto * cell_for_reserve = getCell(key, offset, cache_lock);

std::vector<IFileCachePriority::Iterator> ghost;
std::vector<IFileCachePriority::WriteIterator> ghost;
std::vector<FileSegmentCell *> trash;
std::vector<FileSegmentCell *> to_evict;

Expand All @@ -496,7 +671,7 @@ bool FileCache::tryReserve(const Key & key, size_t offset, size_t size, std::loc
{
/// The cache corresponding to this record may be swapped out by
/// other queries, so it has become invalid.
ghost.push_back(iter->getSnapshot());
ghost.push_back(iter->getWriteIterator());
removed_size += iter->size();
}
else
Expand Down Expand Up @@ -844,10 +1019,9 @@ void FileCache::loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_lock
Key key;
UInt64 offset = 0;
size_t size = 0;
std::vector<std::pair<IFileCachePriority::Iterator, std::weak_ptr<FileSegment>>> queue_entries;
std::vector<std::pair<IFileCachePriority::WriteIterator, std::weak_ptr<FileSegment>>> queue_entries;

/// cache_base_path / key_prefix / key / offset

if (!files.empty())
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
Expand Down
Loading