Skip to content

Commit

Permalink
Merge pull request #38048 from KinderRiven/decoupling_cache_function_…
Browse files Browse the repository at this point in the history
…and_algorithm

Decoupling local cache function and cache algorithm
  • Loading branch information
kssenii committed Aug 11, 2022
2 parents 45d6995 + 1aa7bbc commit be69169
Show file tree
Hide file tree
Showing 28 changed files with 887 additions and 830 deletions.
462 changes: 293 additions & 169 deletions src/Common/LRUFileCache.cpp → src/Common/FileCache.cpp

Large diffs are not rendered by default.

289 changes: 289 additions & 0 deletions src/Common/FileCache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
#pragma once

#include <atomic>
#include <chrono>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <unordered_set>
#include <boost/functional/hash.hpp>
#include <boost/noncopyable.hpp>

#include <Core/Types.h>
#include <IO/ReadSettings.h>
#include <Common/FileCache_fwd.h>
#include <Common/FileSegment.h>
#include <Common/IFileCachePriority.h>
#include <Common/logger_useful.h>
#include <Common/FileCacheType.h>

namespace DB
{

/// Local cache for remote filesystem files, represented as a set of non-overlapping non-empty file segments.
/// Different caching algorithms are implemented based on IFileCachePriority.
class FileCache : private boost::noncopyable
{
friend class FileSegment;
friend class IFileCachePriority;
friend struct FileSegmentsHolder;
friend class FileSegmentRangeWriter;

public:
using Key = DB::FileCacheKey;

FileCache(const String & cache_base_path_, const FileCacheSettings & cache_settings_);

~FileCache() = default;

/// Restore cache from local filesystem.
void initialize();

void removeIfExists(const Key & key);

void removeIfReleasable(bool remove_persistent_files);

static bool isReadOnly();

/// Cache capacity in bytes.
size_t capacity() const { return max_size; }

static Key hash(const String & path);

String getPathInLocalCache(const Key & key, size_t offset, bool is_persistent) const;

String getPathInLocalCache(const Key & key) const;

const String & getBasePath() const { return cache_base_path; }

std::vector<String> tryGetCachePaths(const Key & key);

/**
* Given an `offset` and `size` representing [offset, offset + size) bytes interval,
* return list of cached non-overlapping non-empty
* file segments `[segment1, ..., segmentN]` which intersect with given interval.
*
* Segments in returned list are ordered in ascending order and represent a full contiguous
* interval (no holes). Each segment in returned list has state: DOWNLOADED, DOWNLOADING or EMPTY.
*
* As long as pointers to returned file segments are hold
* it is guaranteed that these file segments are not removed from cache.
*/
FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size, bool is_persistent);

/**
* Segments in returned list are ordered in ascending order and represent a full contiguous
* interval (no holes). Each segment in returned list has state: DOWNLOADED, DOWNLOADING or EMPTY.
*
* If file segment has state EMPTY, then it is also marked as "detached". E.g. it is "detached"
* from cache (not owned by cache), and as a result will never change it's state and will be destructed
* with the destruction of the holder, while in getOrSet() EMPTY file segments can eventually change
* it's state (and become DOWNLOADED).
*/
FileSegmentsHolder get(const Key & key, size_t offset, size_t size);

FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size, bool is_persistent);

FileSegments getSnapshot() const;

/// For debug.
String dumpStructure(const Key & key);

size_t getUsedCacheSize() const;

size_t getFileSegmentsNum() const;

private:
String cache_base_path;
size_t max_size;
size_t max_element_size;
size_t max_file_segment_size;

bool is_initialized = false;

mutable std::mutex mutex;

bool tryReserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock);

void remove(Key key, size_t offset, std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock);

bool isLastFileSegmentHolder(
const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock);

void reduceSizeToDownloaded(
const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & /* segment_lock */);

void assertInitialized() const;

using AccessKeyAndOffset = std::pair<Key, size_t>;
struct KeyAndOffsetHash
{
std::size_t operator()(const AccessKeyAndOffset & key) const
{
return std::hash<UInt128>()(key.first.key) ^ std::hash<UInt64>()(key.second);
}
};

using FileCacheRecords = std::unordered_map<AccessKeyAndOffset, IFileCachePriority::WriteIterator, KeyAndOffsetHash>;

/// Used to track and control the cache access of each query.
/// Through it, we can realize the processing of different queries by the cache layer.
struct QueryContext
{
FileCacheRecords records;
FileCachePriorityPtr priority;

size_t cache_size = 0;
size_t max_cache_size;

bool skip_download_if_exceeds_query_cache;

QueryContext(size_t max_cache_size_, bool skip_download_if_exceeds_query_cache_)
: max_cache_size(max_cache_size_), skip_download_if_exceeds_query_cache(skip_download_if_exceeds_query_cache_)
{
}

void remove(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock);

void reserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock);

void use(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock);

size_t getMaxCacheSize() const { return max_cache_size; }

size_t getCacheSize() const { return cache_size; }

FileCachePriorityPtr getPriority() { return priority; }

bool isSkipDownloadIfExceed() const { return skip_download_if_exceeds_query_cache; }
};

using QueryContextPtr = std::shared_ptr<QueryContext>;
using QueryContextMap = std::unordered_map<String, QueryContextPtr>;

QueryContextMap query_map;

bool enable_filesystem_query_cache_limit;

QueryContextPtr getCurrentQueryContext(std::lock_guard<std::mutex> & cache_lock);

QueryContextPtr getQueryContext(const String & query_id, std::lock_guard<std::mutex> & cache_lock);

void removeQueryContext(const String & query_id);

QueryContextPtr getOrSetQueryContext(const String & query_id, const ReadSettings & settings, std::lock_guard<std::mutex> &);

public:
/// Save a query context information, and adopt different cache policies
/// for different queries through the context cache layer.
struct QueryContextHolder : private boost::noncopyable
{
QueryContextHolder(const String & query_id_, FileCache * cache_, QueryContextPtr context_);

QueryContextHolder() = default;

~QueryContextHolder();

String query_id;
FileCache * cache = nullptr;
QueryContextPtr context;
};

QueryContextHolder getQueryContextHolder(const String & query_id, const ReadSettings & settings);

private:
struct FileSegmentCell : private boost::noncopyable
{
FileSegmentPtr file_segment;

/// Iterator is put here on first reservation attempt, if successful.
IFileCachePriority::WriteIterator queue_iterator;

/// Pointer to file segment is always hold by the cache itself.
/// Apart from pointer in cache, it can be hold by cache users, when they call
/// getorSet(), but cache users always hold it via FileSegmentsHolder.
bool releasable() const { return file_segment.unique(); }

size_t size() const { return file_segment->reserved_size; }

FileSegmentCell(FileSegmentPtr file_segment_, FileCache * cache, std::lock_guard<std::mutex> & cache_lock);

FileSegmentCell(FileSegmentCell && other) noexcept
: file_segment(std::move(other.file_segment)), queue_iterator(other.queue_iterator)
{
}
};

using FileSegmentsByOffset = std::map<size_t, FileSegmentCell>;
using CachedFiles = std::unordered_map<Key, FileSegmentsByOffset>;

CachedFiles files;
std::unique_ptr<IFileCachePriority> main_priority;

FileCacheRecords stash_records;
std::unique_ptr<IFileCachePriority> stash_priority;

size_t max_stash_element_size;
size_t enable_cache_hits_threshold;

Poco::Logger * log;
bool allow_to_remove_persistent_segments_from_cache_by_default;

FileSegments getImpl(const Key & key, const FileSegment::Range & range, std::lock_guard<std::mutex> & cache_lock);

FileSegmentCell * getCell(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock);

FileSegmentCell * addCell(
const Key & key,
size_t offset,
size_t size,
FileSegment::State state,
bool is_persistent,
std::lock_guard<std::mutex> & cache_lock);

void useCell(const FileSegmentCell & cell, FileSegments & result, std::lock_guard<std::mutex> & cache_lock) const;

bool tryReserveForMainList(
const Key & key, size_t offset, size_t size, QueryContextPtr query_context, std::lock_guard<std::mutex> & cache_lock);

size_t getAvailableCacheSize() const;

void loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_lock);

FileSegments splitRangeIntoCells(
const Key & key,
size_t offset,
size_t size,
FileSegment::State state,
bool is_persistent,
std::lock_guard<std::mutex> & cache_lock);

String dumpStructureUnlocked(const Key & key_, std::lock_guard<std::mutex> & cache_lock);

void fillHolesWithEmptyFileSegments(
FileSegments & file_segments,
const Key & key,
const FileSegment::Range & range,
bool fill_with_detached_file_segments,
bool is_persistent,
std::lock_guard<std::mutex> & cache_lock);

size_t getUsedCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const;

size_t getAvailableCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const;

size_t getFileSegmentsNumUnlocked(std::lock_guard<std::mutex> & cache_lock) const;

void assertCacheCellsCorrectness(const FileSegmentsByOffset & cells_by_offset, std::lock_guard<std::mutex> & cache_lock);

public:
void assertCacheCorrectness(const Key & key, std::lock_guard<std::mutex> & cache_lock);

void assertCacheCorrectness(std::lock_guard<std::mutex> & cache_lock);

void assertPriorityCorrectness(std::lock_guard<std::mutex> & cache_lock);
};

}
4 changes: 2 additions & 2 deletions src/Common/FileCacheFactory.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#include "FileCacheFactory.h"
#include "LRUFileCache.h"
#include "FileCache.h"

namespace DB
{
Expand Down Expand Up @@ -53,7 +53,7 @@ FileCachePtr FileCacheFactory::getOrCreate(
return it->second->cache;
}

auto cache = std::make_shared<LRUFileCache>(cache_base_path, file_cache_settings);
auto cache = std::make_shared<FileCache>(cache_base_path, file_cache_settings);
FileCacheData result{cache, file_cache_settings};

auto cache_it = caches.insert(caches.end(), std::move(result));
Expand Down
31 changes: 31 additions & 0 deletions src/Common/FileCacheType.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once
#include <Core/Types.h>
#include <Common/hex.h>

namespace DB
{

struct FileCacheKey
{
UInt128 key;

String toString() const { return getHexUIntLowercase(key); }

FileCacheKey() = default;

explicit FileCacheKey(const UInt128 & key_) : key(key_) { }

bool operator==(const FileCacheKey & other) const { return key == other.key; }
};

}

namespace std
{
template <>
struct hash<DB::FileCacheKey>
{
std::size_t operator()(const DB::FileCacheKey & k) const { return hash<UInt128>()(k.key); }
};

}
4 changes: 2 additions & 2 deletions src/Common/FileCache_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE = 100
static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS = 1024 * 1024;
static constexpr int REMOTE_FS_OBJECTS_CACHE_ENABLE_HITS_THRESHOLD = 0;

class IFileCache;
using FileCachePtr = std::shared_ptr<IFileCache>;
class FileCache;
using FileCachePtr = std::shared_ptr<FileCache>;

struct FileCacheSettings;

Expand Down
6 changes: 3 additions & 3 deletions src/Common/FileSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <filesystem>

#include <Common/FileCache.h>

namespace CurrentMetrics
{
Expand All @@ -25,7 +25,7 @@ FileSegment::FileSegment(
size_t offset_,
size_t size_,
const Key & key_,
IFileCache * cache_,
FileCache * cache_,
State download_state_,
bool is_persistent_)
: segment_range(offset_, offset_ + size_ - 1)
Expand Down Expand Up @@ -787,7 +787,7 @@ FileSegmentsHolder::~FileSegmentsHolder()
/// FileSegmentsHolder right after calling file_segment->complete(), so on destruction here
/// remain only uncompleted file segments.

IFileCache * cache = nullptr;
FileCache * cache = nullptr;

for (auto file_segment_it = file_segments.begin(); file_segment_it != file_segments.end();)
{
Expand Down
Loading

0 comments on commit be69169

Please sign in to comment.