Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decoupling local cache function and cache algorithm #38048

Merged
Prev Previous commit
Next Next commit
fix
  • Loading branch information
KinderRiven committed Aug 10, 2022
commit 76e0aad69e361e9a25823b7c9287b785a5111517
32 changes: 21 additions & 11 deletions src/Common/FileCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ FileCache::FileCache(
, max_element_size(cache_settings_.max_elements)
, max_file_segment_size(cache_settings_.max_file_segment_size)
, enable_filesystem_query_cache_limit(cache_settings_.enable_filesystem_query_cache_limit)
, main_priority(std::make_shared<LRUFileCachePriority>())
, stash_priority(std::make_shared<LRUFileCachePriority>())
, main_priority(std::make_unique<LRUFileCachePriority>())
, stash_priority(std::make_unique<LRUFileCachePriority>())
, max_stash_element_size(cache_settings_.max_elements)
, enable_cache_hits_threshold(cache_settings_.enable_cache_hits_threshold)
, log(&Poco::Logger::get("FileCache"))
Expand Down Expand Up @@ -145,7 +145,7 @@ void FileCache::QueryContext::remove(const Key & key, size_t offset, size_t size
auto record = records.find({key, offset});
if (record != records.end())
{
record->second->remove(cache_lock);
record->second->removeAndGetNext(cache_lock);
records.erase({key, offset});
}
}
Expand Down Expand Up @@ -561,7 +561,7 @@ FileCache::FileSegmentCell * FileCache::addCell(
{
auto remove_priority_iter = stash_priority->getLowestPriorityWriteIterator(cache_lock);
stash_records.erase({remove_priority_iter->key(), remove_priority_iter->offset()});
remove_priority_iter->remove(cache_lock);
remove_priority_iter->removeAndGetNext(cache_lock);
}
/// For segments that do not reach the download threshold, we do not download them, but directly read them
result_state = FileSegment::State::SKIP_CACHE;
Expand Down Expand Up @@ -645,7 +645,17 @@ bool FileCache::tryReserve(const Key & key, size_t offset, size_t size, std::loc

auto * cell_for_reserve = getCell(key, offset, cache_lock);

std::vector<std::tuple<Key, size_t, size_t>> ghost;
struct Segment
{
Key key;
size_t offset;
size_t size;

Segment(Key key_, size_t offset_, size_t size_)
: key(key_), offset(offset_), size(size_) {}
};

std::vector<Segment> ghost;
std::vector<FileSegmentCell *> trash;
std::vector<FileSegmentCell *> to_evict;

Expand All @@ -669,9 +679,9 @@ bool FileCache::tryReserve(const Key & key, size_t offset, size_t size, std::loc
/// The cache corresponding to this record may be swapped out by
/// other queries, so it has become invalid.
removed_size += iter->size();
ghost.push_back({iter->key(), iter->offset(), iter->size()});
ghost.push_back(Segment(iter->key(), iter->offset(), iter->size()));
/// next()
iter->remove(cache_lock);
iter->removeAndGetNext(cache_lock);
}
else
{
Expand Down Expand Up @@ -720,7 +730,7 @@ bool FileCache::tryReserve(const Key & key, size_t offset, size_t size, std::loc
}

for (auto & entry : ghost)
query_context->remove(std::get<0>(entry), std::get<1>(entry), std::get<2>(entry), cache_lock);
query_context->remove(entry.key, entry.offset, entry.size, cache_lock);

if (is_overflow())
return false;
Expand Down Expand Up @@ -926,7 +936,7 @@ void FileCache::removeIfReleasable(bool remove_persistent_files)

std::lock_guard cache_lock(mutex);

std::vector<FileSegmentPtr> to_remove;
std::vector<FileSegment *> to_remove;
for (auto it = main_priority->getLowestPriorityReadIterator(cache_lock); it->valid(); it->next())
{
const auto & key = it->key();
Expand All @@ -946,7 +956,7 @@ void FileCache::removeIfReleasable(bool remove_persistent_files)
|| remove_persistent_files
|| allow_to_remove_persistent_segments_from_cache_by_default))
{
to_remove.emplace_back(file_segment);
to_remove.emplace_back(file_segment.get());
}
}
}
Expand Down Expand Up @@ -981,7 +991,7 @@ void FileCache::remove(

if (cell->queue_iterator)
{
cell->queue_iterator->remove(cache_lock);
cell->queue_iterator->removeAndGetNext(cache_lock);
}

auto & offsets = files[key];
Expand Down
4 changes: 2 additions & 2 deletions src/Common/FileCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,10 @@ class FileCache : private boost::noncopyable
using CachedFiles = std::unordered_map<Key, FileSegmentsByOffset>;

CachedFiles files;
FileCachePriorityPtr main_priority;
std::unique_ptr<IFileCachePriority> main_priority;

FileCacheRecords stash_records;
FileCachePriorityPtr stash_priority;
std::unique_ptr<IFileCachePriority> stash_priority;

size_t max_stash_element_size;
size_t enable_cache_hits_threshold;
Expand Down
2 changes: 1 addition & 1 deletion src/Common/IFileCachePriority.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class IFileCachePriority

/// Deletes an existing cached record. And to avoid pointer suspension
/// the iterator should automatically point to the next record.
virtual void remove(std::lock_guard<std::mutex> &) = 0;
virtual void removeAndGetNext(std::lock_guard<std::mutex> &) = 0;

virtual void incrementSize(size_t, std::lock_guard<std::mutex> &) = 0;
};
Expand Down
10 changes: 4 additions & 6 deletions src/Common/LRUFileCachePriority.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,10 @@ class LRUFileCachePriority::LRUFileCacheIterator : public IFileCachePriority::II

size_t hits() const override { return queue_iter->hits; }

void remove(std::lock_guard<std::mutex> &) override
void removeAndGetNext(std::lock_guard<std::mutex> &) override
{
auto remove_iter = queue_iter;
queue_iter++;
file_cache->cache_size -= remove_iter->size;
file_cache->queue.erase(remove_iter);
file_cache->cache_size -= queue_iter->size;
queue_iter = file_cache->queue.erase(queue_iter);
}

void incrementSize(size_t size_increment, std::lock_guard<std::mutex> &) override
Expand All @@ -75,7 +73,7 @@ class LRUFileCachePriority::LRUFileCacheIterator : public IFileCachePriority::II
}

private:
mutable LRUFileCachePriority * file_cache;
LRUFileCachePriority * file_cache;
mutable LRUFileCachePriority::LRUQueueIterator queue_iter;
KinderRiven marked this conversation as resolved.
Show resolved Hide resolved
};

Expand Down
2 changes: 1 addition & 1 deletion src/Disks/IO/ReadBufferFromRemoteFSGather.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(

with_cache = settings.remote_fs_cache
&& settings.enable_filesystem_cache
&& (!IFileCache::isReadOnly() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache);
&& (!FileCache::isReadOnly() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache);
}

SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const String & path, size_t file_size)
Expand Down