
TableCache in highly concurrent query with limited max_open_files #6699

Closed
koldat opened this issue Apr 14, 2020 · 10 comments
Comments

@koldat
Contributor

koldat commented Apr 14, 2020

Expected behavior

TableCache::FindTable should not allow the same file to be loaded twice at the same time.

Actual behavior

TableCache::FindTable loads the same file multiple times when concurrent queries hit a table that still needs to be opened. Because the cache is limited in size to max_open_files, these duplicates push out tables that are already loaded (other threads are loading the same tables).
This causes a snowball effect that slows queries to a crawl, because files are opened again and again. My scenario:

  • I have roughly 300 SST files
  • I have max_open_files = 500
  • NO_FILE_OPENS is 200 per second

To mitigate this issue I set max_open_files = -1, which is not what I want to use.

Actual NO_FILE_OPENS metric: after some time, when the load drops a little, the open rate falls. Entries are no longer pushed out of the table cache as quickly, so it catches up (the number of open tables is less than max_open_files). Then it is stable, because everything always fits in the cache (300 < 500).
[image: NO_FILE_OPENS rate over time]

Steps to reproduce the behavior

I wrote a simple test, but it is visible even from the source code that there is no protection against parallel loading.
NO_FILE_OPENS(notParallelTest=true): 5
NO_FILE_OPENS(notParallelTest=false): 10

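    // Assumed imports (not shown): org.rocksdb.*, org.junit.Test (or the JUnit 5 equivalent),
    // java.nio.ByteBuffer, java.nio.ByteOrder, java.util.Random.
    // deleteDirectory(...) is a small test helper that removes the database directory.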
    public void createDB() throws Exception {
        Options opt = new Options();
        opt.setWriteBufferSize(128 * 1024);
        opt.setCreateIfMissing(true);
        opt.setTargetFileSizeBase(128 * 1024);
        opt.setTargetFileSizeMultiplier(1);
        opt.setMaxBytesForLevelBase(1024 * 1024);
        opt.setMaxBytesForLevelMultiplier(2);

        deleteDirectory("OpenFilesTest");
        try (RocksDB db = RocksDB.open(opt, "OpenFilesTest");
            WriteOptions writeOptions = new WriteOptions()) {
            
            writeOptions.setDisableWAL(true);
            
            ByteBuffer directKeyBuffer = ByteBuffer.allocateDirect(128);
            directKeyBuffer.order(ByteOrder.BIG_ENDIAN);
            
            ByteBuffer directValueBuffer = ByteBuffer.allocateDirect(128);
            directValueBuffer.order(ByteOrder.BIG_ENDIAN);
            
            Random r = new Random();
            
            for (int i = 0; i < 1_000_000; i++) {
                directKeyBuffer.clear();
                directValueBuffer.clear();
                directKeyBuffer.putLong(r.nextLong());
                for (int o = 0; o < 16; o++) {
                    directValueBuffer.putLong(r.nextLong());
                }
                directKeyBuffer.flip();
                directValueBuffer.flip();
                
                db.put(writeOptions, directKeyBuffer, directValueBuffer);
            }
        }
        
    }

    @Test
    public void testLoadFiles() throws Exception {
        createDB();
        
        Options opt = new Options();
        opt.setWriteBufferSize(128 * 1024);
        opt.setCreateIfMissing(true);
        opt.setTargetFileSizeBase(128 * 1024);
        opt.setTargetFileSizeMultiplier(1);
        opt.setMaxBytesForLevelBase(1024 * 1024);
        opt.setMaxBytesForLevelMultiplier(2);
        opt.setMaxOpenFiles(30);
        Statistics statistics = new Statistics();
        opt.setStatistics(statistics);
        
        for (int phase = 0; phase < 2; phase++) {
            boolean notParallelTest = phase == 0;
            try (RocksDB db = RocksDB.open(opt, "OpenFilesTest")) {
    
                ByteBuffer keyBuffer = ByteBuffer.allocate(8);
                keyBuffer.order(ByteOrder.BIG_ENDIAN);
                
                keyBuffer.putLong(Long.MAX_VALUE / 2);
                keyBuffer.flip();
    
    
                Object sync = new Object();
                // Clear counter
                statistics.getAndResetTickerCount(TickerType.NO_FILE_OPENS);
                Thread t1 = new Thread(() -> {
                    RocksIterator it = db.newIterator();
                    synchronized (sync) {
                        try {
                            sync.wait();
                        } catch (InterruptedException e) {
                        }
                    }
                    it.seek(keyBuffer.array());
                    it.close();
                });
                Thread t2 = new Thread(() -> {
                    RocksIterator it = db.newIterator();
                    synchronized (sync) {
                        try {
                            sync.wait();
                        } catch (InterruptedException e) {
                        }
                    }
                    it.seek(keyBuffer.array());
                    it.close();
                });
    
                if (notParallelTest) {
                    t1.start();
                    Thread.sleep(200);
                    synchronized (sync) {
                        sync.notifyAll();
                    }
                    Thread.sleep(200);
                    t2.start();
                    Thread.sleep(200);
                    synchronized (sync) {
                        sync.notifyAll();
                    }
                    
                } else {
                    t1.start();
                    t2.start();
                    
                    Thread.sleep(200);
                    synchronized (sync) {
                        sync.notifyAll();
                    }
                }
                
                t1.join();
                t2.join();
                
                System.out.println("NO_FILE_OPENS(notParallelTest=" + notParallelTest + "): " + statistics.getAndResetTickerCount(TickerType.NO_FILE_OPENS));
            }
        }
    }
@koldat
Contributor Author

koldat commented Apr 14, 2020

One way is to use a concept similar to what Guava uses in Java: https://github.com/google/guava/wiki/StripedExplained

We would then lock with a striped lock keyed by file name, which blocks other threads from loading the same file. We could also make this optional via Options.

@siying, what do you think? Can I go ahead with the change and a PR, or do you not like this approach at all?

@pdillinger
Contributor

I think we want to at least have the option to close these performance race conditions. See e.g. #6681 for a similar issue in the block cache. I prototyped a solution in pdillinger@513affa.

The link you provided suggests (in my reading) that if file1 and file2 are in the same shard, opening file2 would not start until file1 has finished opening, which is bad. My prototype for the block cache has no such limitation, because the lock is only held at the beginning and completion of loading data; other threads wanting the same data wait on a condition variable for the shard. Locks are never held during I/O, and extraneous wake-ups should be few.
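
For illustration only, here is a rough Java sketch of that cooperative pattern (the names pendingLoads, shardCache and loadData are hypothetical, not the prototype's actual code; assumes java.util.* and java.util.concurrent.locks.* imports; error handling omitted):

// Per-shard state: one mutex, one condition variable, and the keys currently loading.
final ReentrantLock shardLock = new ReentrantLock();
final Condition loadFinished = shardLock.newCondition();
final Set<String> pendingLoads = new HashSet<>();
final Map<String, byte[]> shardCache = new HashMap<>();

byte[] getOrLoad(String key) throws InterruptedException {
    shardLock.lock();
    try {
        while (true) {
            byte[] value = shardCache.get(key);
            if (value != null) {
                return value;                // someone else already loaded it
            }
            if (pendingLoads.add(key)) {
                break;                       // this thread becomes the loader
            }
            loadFinished.await();            // another thread is loading this key; wait
        }
    } finally {
        shardLock.unlock();
    }

    byte[] loaded = loadData(key);           // expensive I/O, performed with no lock held

    shardLock.lock();
    try {
        shardCache.put(key, loaded);
        pendingLoads.remove(key);
        loadFinished.signalAll();            // wake up all waiters on this shard
    } finally {
        shardLock.unlock();
    }
    return loaded;
}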

@siying
Contributor

siying commented Apr 14, 2020

To solve the problem of the table cache specifically, a striped lock in the table cache that makes sure a given file is only being loaded once sounds fine to me; as long as the max_open_files path is not touched, I don't worry about it.
@pdillinger raised a good general question too: how to solve the cache problem in general, not just for the table cache. We might need a longer discussion for that.

@koldat
Contributor Author

koldat commented Apr 14, 2020

I quickly looked at your solution; it seems very complex to me and would take a long time to release (it needs a lot of testing). The idea of a striped lock is super simple: you create N locks (e.g. 1024) and then simply take the lock at position hash % 1024, use it, and do the job. The theoretical maximum concurrency is 1024 threads without a context switch. If you hide the control behind a single mutex or condition variable, it suffers from context switches whenever multiple threads meet; from my experience this stops working when there is a large number of threads, because hits on the same lock accumulate and stall everything.

You can pick any size for the lock array to lower the probability of a hash clash between two threads. We use this in many highly concurrent applications and it just works.

Pseudo code (written here with Guava's Striped for concreteness):

Striped<Lock> lockStripe = Striped.lock(1024);  // N locks, e.g. 1024 stripes

Lock lock = lockStripe.get(key);                // picks the lock at hash(key) % stripeSize
lock.lock();
try {
    // do what you need with exclusive access to key; conflict probability = 1 / stripeSize
} finally {
    lock.unlock();
}

This approach can be used even for keys, blocks, etc. Again, only the stripe size defines the theoretical concurrency. If there is a conflict from time to time, it still affects only a 1/size portion of accesses, which can be an overhead of 1% or less.

@pdillinger
Contributor

If you hide the control behind a single mutex or condition variable, it suffers from context switches whenever multiple threads meet; from my experience this stops working when there is a large number of threads, because hits on the same lock accumulate and stall everything.

My code uses the existing lock sharding of the LRUCache, usually 64 locks (see #6592, input welcome), which is essentially what you're referring to as "striped lock," but IMHO "striped lock" is a misnomer, because with hashing there's no better likelihood that "adjacent" files will map to a different lock vs. "distant" files, as one should expect in "striping."

A meta-idea of my prototype was that the distinction between aggressive loading (by each thread on a cache miss) vs. cooperative loading should be made in the Cache object, and Cache API users should be written so that both are possible. As seen in my changes to block_based_table_reader.cc, the added complexity for the API user is essentially an extra variable and two extra if's. Not much, though maybe it could be simplified further. Anyway, this allows for shared cache infrastructure, and per-use customization, without disjoint but related one-off implementations.

The danger in adding simple code expected to become obsolete is configuration parameters that become obsolete. :-/

@koldat
Contributor Author

koldat commented Apr 14, 2020

Yes, the LRU is sharded by 6 bits; that is there to increase the concurrency of the LRU itself. I was trying to solve the input to the cache: when that input is complex to produce (like opening a file), it should be evaluated only once. I understand that your solution basically works based on cooperation, like:

  1. A client wants to read some key.
  2. The cache tries a get, and puts a ticket if no "worker" is there already.
  3. Now the client can do the job, and the others wait while it works.
  4. When the client finishes, the cache is updated and the others are notified.
  5. The others pick up the result from step 4.

This concept is similar to Guava's LoadingCache. The client simply does a "get", and if the key is not there it starts loading; when the load is done, it updates the cache and notifies all blocked threads that requested the same key. The LoadingCache is given a "loader" at cache construction time, which is very nice and readable, and one then uses it like a "map" (get, invalidate, ...).
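
For reference, a minimal Guava LoadingCache sketch of that pattern (TableReader, openTableFile and maxOpenFiles are hypothetical placeholders, not RocksDB types; assumes com.google.common.cache.* imports):

LoadingCache<String, TableReader> tableCache = CacheBuilder.newBuilder()
    .maximumSize(maxOpenFiles)                      // bounded, like max_open_files
    .build(new CacheLoader<String, TableReader>() {
        @Override
        public TableReader load(String fileName) {
            // Invoked once per missing key; concurrent get() calls for the same
            // key block here and then share the result.
            return openTableFile(fileName);
        }
    });

// Many threads can call this concurrently; the file is opened only once.
TableReader reader = tableCache.getUnchecked("000123.sst");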

That is true and I agree with it. There are actually only four cases that use the cache now (if I am not wrong):

  1. Table cache
  2. Row cache
  3. Block cache
  4. Compressed block cache

The first one is, I think, the most complex. A "striped lock" or "sharded loading" (or pick any name) is, I think, the easiest way (about 10 lines of code) that simply does the job. The probability of hitting the same lock is negligible compared to the cost of opening a file, and that is not even counting that we do not have 1000 cores today. The I/O itself and the CPU overhead right now are a hundred times worse than occasionally stopping one thread because two files share the same hash value out of 1000 possible ones.

So I was talking about this flow (sketched below):
Loader - uses the striped lock mechanism so that a given file is loaded only once at a time.
Cache - no change (get and put are used by the loader).
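
A minimal sketch of that flow (TableHandle, openTableFile and tableCache are illustrative stand-ins, not the actual RocksDB code; assumes Guava's Striped and java.util.concurrent imports):

ConcurrentMap<String, TableHandle> tableCache = new ConcurrentHashMap<>();
Striped<Lock> fileLocks = Striped.lock(1024);

TableHandle findTable(String fileName) {
    TableHandle handle = tableCache.get(fileName);   // fast path: already loaded
    if (handle != null) {
        return handle;
    }
    Lock lock = fileLocks.get(fileName);             // stripe chosen by hash(fileName)
    lock.lock();
    try {
        handle = tableCache.get(fileName);           // re-check under the lock
        if (handle == null) {
            handle = openTableFile(fileName);        // expensive open happens only once
            tableCache.put(fileName, handle);
        }
        return handle;
    } finally {
        lock.unlock();
    }
}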

Of course, having something like LoadingCache would be nice, but when I was testing performance it was always way slower than a simple ConcurrentMap, because of the extra work it has to do.

BTW, the striped lock has many more features (for example, locking multiple keys without the possibility of deadlock). I have just used it as the standard solution for this simple case.

@pdillinger
Contributor

So I was talking about this flow:
Loader - uses the striped lock mechanism so that a given file is loaded only once at a time.
Cache - no change (get and put are used by the loader).

Lock held to span cache->Lookup and cache->Insert, OK. For a key application of ours, TableCache::FindTable is about 0.5% CPU, and almost none of that in ShardedCache::Lookup. That suggests modest needs for number of lock shards (in the steady state at least), and that the overhead of acquiring and releasing an uncontended lock should be negligible.

koldat added a commit to koldat/rocksdb that referenced this issue Apr 18, 2020
facebook-github-bot pushed a commit that referenced this issue Apr 21, 2020
Summary:
Under highly concurrent requests, the table cache opens the same file multiple times, which defeats the purpose of max_open_files. Fixes #6699
Pull Request resolved: #6707

Reviewed By: ltamasi

Differential Revision: D21044965

fbshipit-source-id: f6e91d90b60dad86e518b5147021da42460ee1d2
@koldat
Contributor Author

koldat commented Apr 22, 2020

If anyone is interested, I have posted an LRU concurrency performance analysis here: https://groups.google.com/forum/#!topic/rocksdb/1_uo3Pr6DiE

@pdillinger
Contributor

Fixed with #6707?

@koldat
Contributor Author

koldat commented Apr 22, 2020

Yes, it seems fixed to me. I haven't seen the issue since.
