Introduce Shard Splitting To CouchDB #1972

Merged
merged 6 commits into master from reshard on Apr 3, 2019

Conversation

@nickva (Contributor) commented Mar 7, 2019

This PR implements shard splitting [1] for CouchDB as discussed in RFC #1920.

The work is organized as a series of commits in the following order:

  • Uneven shard copy handling in mem3 and fabric. That's a preparatory step to
    make sure mem3 and fabric know how to handle uneven shard copies on the
    cluster.

  • Main job manager and supervisor. This commit defines the supervision tree and
    the main job manager. The job manager is a central component which starts,
    stops and monitors jobs.

  • Individual shard splitting job implementation and associated helper module.
    Here the logic for a single shard splitting job is defined along with the
    needed helpers, some of which were separated into different modules. The
    couch_db_split module specifically is in its own commit, as it lives in the
    couch application, operates on db files (instead of shards), and in general
    is large and separate enough from the rest to warrant its own commit.

  • Internal replicator update to handle split shards. Internal replicator and
    some associated helpers in mem3_rep were updated to handle split shard
    sources and targets. The internal replicator is involved in resharding in
    two main ways: 1) during normal cluster operation, replicating between
    possibly split shards, and 2) as part of the shard splitting job logic,
    where it is used to top off updates from the source to the targets.

  • HTTP API implementation and functional tests. This commit implements the HTTP
    API and adds behavior and other tests as discussed in the RFC.

[1] Shard splitting and resharding are used interchangeably in this PR and the
commits. Initially the work started with implementing just shard splitting and
the API reflected that; however, after a fruitful discussion on the couchdb dev
mailing list, it was agreed to generalize the work to potentially allow shard
merging or other shard map operations in the future. So closer to the HTTP API
level "resharding" will be used more often, while in the job implementation
discussions and comments "shard splitting" will be used, since currently only
shard splitting is supported.

[Opened a new PR as the original #1921 seems to have gotten unsynchronized with Travis-CI where it was not building the correct commits anymore]

@nickva force-pushed the reshard branch 6 times, most recently from 5dca167 to 01df008 on March 9, 2019 00:04
@jaydoane (Contributor) left a comment

I'm a little over halfway through, but at this rate it's going to take at least another day, so might as well submit pending comments.

Review comments on: src/couch/src/couch_db_engine.erl, src/mem3/test/mem3_ring_prop_tests.erl, src/couch/src/couch_db_split.erl, src/mem3/src/mem3_reshard.erl, src/mem3/README_reshard.md
@wohali (Member) left a comment

I can't comment meaningfully on the code, but I included a bunch of comments on the readme, which again, thank you for that. Hope it helps.

Review comments on: src/mem3/README_reshard.md (8 threads)
@jaydoane (Contributor) left a comment

All tests are passing with make eunit, but I've encountered this failure all three times I ran make elixir:

PartitionAllDocsTest
  * test partition _all_docs with descending (5.5ms)
  * test all_docs with partitioned:true returns partitioned fields (6.5ms)
  * test partition all docs can set query limits (119.4ms)
  * test partition _all_docs with timeout (5057.4ms)

  1) test partition _all_docs with timeout (PartitionAllDocsTest)
     test/partition_all_docs_test.exs:185
     ** (RuntimeError) timed out after 5014 ms
     code: retry_until(fn ->
     stacktrace:
       (foo) lib/couch/db_test.ex:215: Couch.DBTest.retry_until/4
       test/partition_all_docs_test.exs:191: (test)

although it's not clear that failure has anything to do with this PR. Can you reproduce?

Review comments on: src/mem3/src/mem3_reshard_dbdoc.erl, src/mem3/src/mem3_reshard_httpd.erl, src/mem3/test/mem3_reshard_changes_feed_test.erl, src/mem3/test/mem3_reshard_test.erl, test/elixir/test/reshard_helpers.exs, src/mem3/README_reshard.md
@nickva (Contributor, Author) commented Mar 25, 2019

@jaydoane Thanks for taking a look and for the detailed review. I think I have fixed / responded to most of the comments. I couldn't reproduce the PartitionAllDocsTest failure locally, as discussed online.

@jaydoane (Contributor) left a comment

It appears that all questions and comments I had have been addressed. Exceptional work!

@jaydoane (Contributor) left a comment

Is this case already covered by a test? If not, perhaps it would be worth adding one?

Review comment on: src/mem3/src/mem3_reshard_job.erl
@eiri (Member) left a comment

Looks good. The only thing I want is an explanation why it was necessary to tighten handle_message expectation guarantees in fabric modules.

To be honest it's hard to feel confident in reviewing ~10K LOC, but this is nice and clean code with comprehensive testing - good job!

Review comments on: src/couch/src/couch_db.erl, src/fabric/src/fabric_db_doc_count.erl, src/fabric/src/fabric_db_info.erl, src/fabric/src/fabric_design_doc_count.erl
@nickva force-pushed the reshard branch 3 times, most recently from ce0f56b to 3041f86 on March 27, 2019 15:09
@davisp (Member) left a comment

Everything looks pretty awesome here. I did find a few things on my first pass. I'll give this another read through tomorrow after it percolates overnight. I was mostly reading this first pass with an eye towards data safety. On that end the only thing that jumped out at me was the comment on dropping docs during internal replication. Other than that everything looks super duper on first (eighth?) read through.

Review comments on: src/couch/src/couch_db_split.erl, src/mem3/src/mem3_rep.erl, src/mem3/src/mem3_reshard_index.erl
@wohali (Member) commented Mar 29, 2019

@nickva have you seen #1987 and #1996 ?

Review comment on: src/mem3/src/mem3_rep.erl
@nickva (Contributor, Author) commented Mar 29, 2019

@wohali I think you referenced the bit where we copy and modify _local checkpoint docs to make replications not rewind. We have that functionality in the PR and could probably surface it somehow at the API level. But would need a separate PR or RFC for it.

@wohali (Member) commented Mar 29, 2019

@nickva Well, yes, and the problem of shard syncing or resharding getting confused when nodes hosting shards change (along with the _dbs document). What happens then? If we've changed the starting conditions, how does the scheduling service react? Sorry to stick this here but I only just thought of it.

A separate PR without an RFC is fine, I think, for an enhancement like this, since we're talking about what is really essential functionality.

@davisp (Member) commented Mar 30, 2019

@wohali Definitely a good idea to add some tests around missing hosts and so on. We'd obviously want those to fail properly and not cause issues. However the actual work is all local to a node so we don't have any doomsday situations where we're writing bytes that disappear into the ether if a node has been renamed.

@nickva (Contributor, Author) commented Mar 31, 2019

We do have a few places which react to changes in the shard map. If the db is deleted, we clean up the splitting job. When the shard map is updated, we error out if the shard we are replacing is missing (https://github.com/apache/couchdb/pull/1972/files#diff-bec6521346a40b9585a85ff559b82b50R110), and if the expected source shard node is not there we bail out (https://github.com/apache/couchdb/pull/1972/files#diff-bec6521346a40b9585a85ff559b82b50R200); there are others as well.

For cases where there are manual shard moves or other manual operations, we added the API to stop resharding via the `_reshard/state` endpoint.

@davisp (Member) left a comment

Two minor improvements I noticed for the get_ring property test.

Review comments on: src/mem3/test/mem3_ring_prop_tests.erl
@davisp (Member) left a comment

Minor nit on the change log in the dbs db.

Review comment on: src/mem3/src/mem3_reshard_dbdoc.erl
@davisp (Member) left a comment

More testing ideas

Review comment on: src/mem3/test/mem3_ring_prop_tests.erl
nickva added a commit to apache/couchdb-documentation that referenced this pull request Apr 2, 2019
@davisp (Member) left a comment

+1 Awesome work @nickva!

nickva and others added 6 commits April 2, 2019 16:47
The introduction of shard splitting will eliminate the constraint that all
document copies are located in shards with the same range boundaries. That
assumption was made by default in mem3 and fabric functions that do shard
replacement, worker spawning, unpacking `_changes` update sequences and some
others. This commit updates those places to handle the case where document
copies might be in different shard ranges.

A good place to start from is the `mem3_util:get_ring()` function. This
function returns a full non-overlapped ring from a set of possibly overlapping
shards.

This function is used by almost everything else in this commit:

1) It's used when only a single copy of the data is needed, for example when
processing `_all_docs` or `_changes`.

2) It's used when checking if progress is possible after some nodes died.
`get_ring()` returning `[]` when it cannot find a full ring is used to indicate
that progress is not possible.

3) During shard replacement. This is perhaps the most complicated case. During
replacement, besides just finding a possible covering of the ring from the set
of shards, it is also desirable to find one that minimizes the number of
workers that have to be replaced. A neat trick used here is to provide
`get_ring` with a custom sort function, which prioritizes certain shard copies
over others. In the case of replacements it prioritizes shards for which
workers have already been spawned. In the default case `get_ring()` will
prioritize longer ranges over shorter ones, so for example, to cover the
interval [00-ff] with either the [00-7f, 80-ff] or the [00-ff] shard ranges, it
will pick the single [00-ff] range instead of the [00-7f, 80-ff] pair.
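
To make that covering behaviour concrete, here is a minimal, hedged sketch of
the greedy approach (an illustration only, not the actual `mem3_util:get_ring()`
code; the module name and the range representation are made up for the example):

```erlang
-module(ring_sketch).
-export([get_ring/3]).

%% Ranges are inclusive {Begin, End} pairs, e.g. {16#00, 16#7f}. SortFun
%% decides priority: e.g. prefer longer ranges, or ranges whose workers have
%% already been spawned. Returns [] if no full ring can be found.
get_ring(Ranges, SortFun, {First, Last}) ->
    Sorted = lists:sort(SortFun, Ranges),
    case cover(First, Last, Sorted) of
        {ok, Ring} -> Ring;
        error -> []
    end.

cover(Start, Last, Ranges) ->
    %% candidate ranges that begin exactly where the uncovered gap starts,
    %% kept in priority order
    case [R || {B, _} = R <- Ranges, B =:= Start] of
        [] -> error;
        Candidates -> try_candidates(Candidates, Last, Ranges)
    end.

try_candidates([], _Last, _Ranges) ->
    error;
try_candidates([{_B, E} = R | _Rest], Last, _Ranges) when E =:= Last ->
    {ok, [R]};  %% this range closes the ring
try_candidates([{_B, E} = R | Rest], Last, Ranges) ->
    case cover(E + 1, Last, Ranges) of
        {ok, Ring} -> {ok, [R | Ring]};
        error -> try_candidates(Rest, Last, Ranges)
    end.
```

With the three ranges from the example above and a sort function that prefers
longer ranges, this sketch picks the single [00-ff] range, mirroring the
default behaviour described here.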

Co-authored-by: Paul J. Davis <[email protected]>
The first step when a new shard splitting job starts is to do a bulk copy of
data from the source to the target.

Ideally this should happen as fast as possible as it could potentially churn
through billions of documents. This logic is implemented in the
`couch_db_split` module in the main `couch` application.

To understand what happens in `couch_db_split`, it helps to think of it as a
version of `couch_bt_engine_compactor` that lives just above the
couch_db_engine (PSE) interface instead of below it.

The first thing the initial data copy does is create the targets. Targets are created
based on the source parameters. So if the source uses a specific PSE engine,
targets will use the same PSE engine. If the source is partitioned, the targets
will use the same partitioned hash function as well.

An interesting bit with respect to target creation is that targets are not
regular couch_db databases but are closer to a couch_file with a
couch_db_updater process linked to them. They are linked directly without going
through couch_server. This is done in order to avoid the complexity of handling
concurrent updates, handling VDU, interactive vs non-interactive updates,
making sure it doesn't compact while copying happens, doesn't update any LRUs,
and doesn't emit `db_updated` events. Those things are not needed, and handling
them would make this more fragile. Another way to think of the targets during the
initial bulk data copy is as "hidden" or "write-only" dbs.

Another notable thing is that `couch_db_split` doesn't know anything about
shards and only knows about databases. The input is a source, a map of targets,
and a caller-provided "picker" function which knows how to pick one of the
targets for each given document ID. This works for both regular and partitioned
dbs, since all of that logic lives inside the pick function rather than being
embedded in `couch_db_split`.
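
As a rough sketch of what such a caller-provided pick function might look like
(the hashing and the target representation below are illustrative placeholders,
not the actual mem3 hashing code):

```erlang
-module(pick_sketch).
-export([pick_target/2]).

%% Targets is a list of {{Begin, End}, TargetDb} pairs whose ranges cover the
%% full hash space. The doc id is hashed and the target whose range contains
%% the hash value is returned.
pick_target(DocId, Targets) when is_binary(DocId) ->
    Hash = erlang:crc32(DocId),  %% placeholder hash; mem3 has its own hashing
    [TargetDb | _] = [Db || {{B, E}, Db} <- Targets, B =< Hash, Hash =< E],
    TargetDb.
```

A partitioned database would supply a pick function that hashes the partition
part of the document ID instead, which is why keeping this logic in the
caller-provided function works for both cases.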

One last point is about handling internal replicator _local checkpoint docs.
Those documents are transformed when they are copied such that the old source
UUID is replaced with the new target's UUID, since each shard will have its own
new UUID. That is done to avoid replications rewinding.

Besides those points, the rest is rather boring and it's just "open documents
from the source, pick the target, copy the documents to one of the targets,
read more documents from the source, etc".

Co-authored-by: Paul J. Davis <[email protected]>
Co-authored-by: Eric Avdey <[email protected]>
Shard splitting will result in uneven shard copies. Previously the internal
replicator knew how to replicate from one shard copy to another, but now it needs to
know how to replicate from one source to possibly multiple targets.

The main idea is to reuse the same logic and "pick" function as
`couch_db_split`.

But to avoid the penalty of calling the custom hash function for every document
when there is just a single target, there is a special "1 target" case where
the hash function is `undefined`.
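
A hedged sketch of that special case (function and variable names here are
illustrative, not the actual mem3_rep internals):

```erlang
-module(rep_pick_sketch).
-export([pick_target/3]).

%% With a single target the hash function is skipped entirely
%% (HashFun = undefined); with several targets each document id is hashed
%% to select the target whose range contains the hash value.
pick_target(_DocId, undefined, [{_Range, TargetDb}]) ->
    TargetDb;
pick_target(DocId, HashFun, Targets) when is_function(HashFun, 1) ->
    Hash = HashFun(DocId),
    [TargetDb | _] = [Db || {{B, E}, Db} <- Targets, B =< Hash, Hash =< E],
    TargetDb.
```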

Another case where the internal replicator is used is to top off replication
and to replicate the shard map dbs to and from the current node (used in the
shard map update logic). For that reason there are a few helper mem3_util and mem3_rpc
functions.
This is the implementation of the shard splitting job. `mem3_reshard` manager
spawns `mem3_reshard_job` instances via the `mem3_reshard_job_sup` supervisor.

Each job is a gen_server process that starts in `mem3_reshard_job:init/1` with
a `#job{}` record instance as the argument.

Then the job goes through recovery, so it can handle resuming in cases where
the job was interrupted previously and it was initialized from a checkpointed
state. Checkpointing happens in the `mem3_reshard` manager with the help of the
`mem3_reshard_store` module (introduced in a previous commit).

After recovery, processing starts in the `switch_state` function. The states
are defined as a sequence of atoms in a list in `mem3_reshard.hrl`.

In the `switch_state()` function, the state and history are updated in the
`#job{}` record, then the `mem3_reshard` manager is asked to checkpoint the new
state. The job process waits for the `mem3_reshard` manager to notify it when
checkpointing has finished so it can continue processing the new state. That
happens when the `do_state` gen_server cast is received.

The `do_state` function has state-matching heads for each state. Usually, if
there are long-running tasks to be performed, `do_state` will spawn a few
workers and perform all the work in there. In the meantime the main job process
will simply wait for all the workers to exit. When that happens, it will call
`switch_state` to switch to the new state, checkpoint again, and so on.
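
In outline, that loop could be sketched as follows (a simplified illustration,
not the actual `mem3_reshard_job` code; the record fields, state names and the
checkpoint message shape are abbreviated assumptions):

```erlang
-module(reshard_job_sketch).
-export([do_state/1]).

-record(job, {id, split_state, history = []}).

switch_state(#job{} = Job0, NewState) ->
    Job = Job0#job{
        split_state = NewState,
        history = [{NewState, erlang:system_time(second)} | Job0#job.history]
    },
    %% Ask the mem3_reshard manager to checkpoint the new state. Shown here as
    %% a synchronous call for brevity; the real job instead waits for a
    %% `do_state` gen_server cast from the manager before continuing.
    ok = gen_server:call(mem3_reshard, {checkpoint, Job}),
    do_state(Job).

do_state(#job{split_state = initial_copy} = Job) ->
    %% spawn workers for the long-running part, then wait for them to exit
    Workers = [spawn_monitor(fun() -> ok end)],  %% placeholder worker
    ok = wait_for_workers(Workers),
    switch_state(Job, topoff1);
do_state(#job{split_state = topoff1} = Job) ->
    switch_state(Job, completed);
do_state(#job{split_state = completed} = Job) ->
    {ok, Job}.

wait_for_workers([]) ->
    ok;
wait_for_workers([{Pid, Ref} | Rest]) ->
    receive
        {'DOWN', Ref, process, Pid, _Reason} -> wait_for_workers(Rest)
    end.
```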

Since there are quite a few steps needed to split a shard, some of the helper
functions are defined in separate modules such as:

 * mem3_reshard_index : Index discovery and building.
 * mem3_reshard_dbdoc : Shard map updates.
 * couch_db_split : Initial (bulk) data copy (added in a separate commit).
 * mem3_rep : To perform "top-offs" in between some steps.
Most of the resharding logic lives in the mem3 application under the
`mem3_reshard_sup` supervisor. `mem3_reshard_sup` has three children:

1) `mem3_reshard` : The main resharding job manager.

2) `mem3_reshard_job_sup` : A simple-one-for-one supervisor to keep track of
individual resharding jobs.

3) `mem3_reshard_dbdoc` : Helper gen_server used to update the shard map.
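
A rough sketch of that supervision tree (child ordering, start functions and
the restart strategy here are assumptions for illustration, not the exact
`mem3_reshard_sup` code):

```erlang
-module(mem3_reshard_sup_sketch).
-behaviour(supervisor).
-export([start_link/0, init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    Children = [
        %% helper gen_server used to update the shard map
        #{id => mem3_reshard_dbdoc,
          start => {mem3_reshard_dbdoc, start_link, []}},
        %% simple_one_for_one supervisor for individual resharding jobs
        #{id => mem3_reshard_job_sup,
          start => {mem3_reshard_job_sup, start_link, []},
          type => supervisor},
        %% the main resharding job manager
        #{id => mem3_reshard,
          start => {mem3_reshard, start_link, []}}
    ],
    {ok, {#{strategy => one_for_all, intensity => 5, period => 10}, Children}}.
```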

The `mem3_reshard` gen_server is the central point in the resharding logic. It is a job
manager which accepts new jobs, monitors jobs when they run, checkpoints their
status as they make progress, and knows how to restore their state when a node
reboots.

Jobs are represented as instances of the `#job{}` record defined in the
`mem3_reshard.hrl` header. There is also a global resharding state represented
by a `#state{}` record.

The `mem3_reshard` gen_server maintains an ets table of "live" `#job{}` records,
alongside its gen_server state represented by `#state{}`. When jobs are
checkpointed or the user updates the global resharding state, `mem3_reshard` will use the
`mem3_reshard_store` module to persist those updates to `_local/...` documents
in the shards database. The idea is to allow jobs to persist across node or
application restarts.
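
As a hedged illustration of that persistence step (the doc id prefix and the
property names below are made up for the example; they are not necessarily what
`mem3_reshard_store` actually writes):

```erlang
-module(reshard_store_sketch).
-export([job_to_local_doc/4]).

%% Build the id and body of a _local checkpoint document for a job.
job_to_local_doc(JobId, SplitState, Source, Targets)
        when is_binary(JobId), is_atom(SplitState) ->
    DocId = <<"_local/reshard-job-", JobId/binary>>,
    Body = {[
        {<<"id">>, JobId},
        {<<"state">>, atom_to_binary(SplitState, utf8)},
        {<<"source">>, Source},
        {<<"targets">>, Targets}
    ]},
    {DocId, Body}.
```

The resulting id/body pair would then be written to and read back from the
shards database, which is what lets a node pick its jobs up again after a
restart.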

After a job is added, if the global state is not `stopped`, `mem3_reshard`
manager will ask the `mem3_reshard_job_sup` to spawn a new child. That child
will be running as a gen_server defined in the `mem3_reshard_job` module
(included in subsequent commits). Each child process will periodically ask the
`mem3_reshard` manager to checkpoint when it jumps to a new state.
`mem3_reshard` checkpoints and then informs the child to continue its work.
This implements the API as defined in RFC #1920

The handlers live in `mem3_reshard_httpd`, and helpers like validators live in
the `mem3_reshard_api` module.

There are also a bunch of high level (HTTP & fabric) API tests that check that
shard splitting happens properly, jobs are behaving as defined in the RFC, etc.

Co-authored-by: Eric Avdey <[email protected]>
nickva added a commit to apache/couchdb-documentation that referenced this pull request Apr 3, 2019
@nickva (Contributor, Author) commented Apr 3, 2019

Documentation PR: apache/couchdb-documentation#404

nickva added a commit to apache/couchdb-documentation that referenced this pull request Apr 3, 2019
@nickva nickva merged commit a6db7d5 into master Apr 3, 2019
@nickva nickva deleted the reshard branch April 3, 2019 14:48
nickva added a commit to apache/couchdb-documentation that referenced this pull request Apr 3, 2019
nickva added a commit to apache/couchdb-documentation that referenced this pull request Apr 3, 2019
tabeth pushed a commit to tabeth/couchdb-documentation that referenced this pull request Jul 11, 2019
nickva added a commit to nickva/couchdb that referenced this pull request Sep 7, 2022