rfc(decision): Batch multiple files together into single large file to improve network throughput #98

Add technical details section
cmanallen committed May 26, 2023
commit d34401e3b3d251abb6ed6a0de7a57028b751c23a
123 changes: 120 additions & 3 deletions text/0098-store-multiple-replay-segments-in-a-single-blob.md
@@ -28,9 +28,9 @@ First, a new table called "recording_byte_range" with the following structure is created:

| replay_id | segment_id | filename | start | stop |
| --------- | ---------- | -------- | ----- | ----- |
| A | 0 | file.txt | 0 | 6241 |
| B | 0 | file.txt | 6242 | 8213 |
| A | 1 | file.txt | 8214 | 12457 |
| A | 0 | file.bin | 0 | 6241 |
| B | 0 | file.bin | 6242 | 8213 |
| A | 1 | file.bin | 8214 | 12457 |

This table will need to support, at a minimum, one write per segment. Currently, we receive ~350 segments per second at peak load.

@@ -78,10 +78,127 @@ The response bytes will be decompressed, merged into a single payload, and returned to the user.
- It can likely interface with the Django ORM, but it's not clear to me at the time of writing.
- Whatever database we use must support deletes.

3. How fast can we encrypt and decrypt each segment?

# Extensions

By extending the schema of the "recording_byte_range" table to include a "type" column, we can further reduce the number of bytes returned to the client. The client has different requirements for different sets of data: the player may only need the next `n` seconds' worth of data, the console and network tabs may paginate their events, and the timeline will always fetch a simplified view of the entire recording.

With the byte range pattern in place these behaviors are possible and can be exposed to the client. The ultimate outcome of this change is faster loading times and the elimination of browser freezes and crashes from large replays.

This will increase the number of rows written to our database table. We would write four rows whereas with the original proposal we were only writing one. Therefore we should select our database carefully to ensure it can handle this level of write intensity.
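
For illustration, the extended table might look like this for a single segment; the `type` values shown (rrweb, console, network, timeline) and the byte offsets are hypothetical:

| replay_id | segment_id | type     | filename | start | stop |
| --------- | ---------- | -------- | -------- | ----- | ---- |
| A         | 0          | rrweb    | file.bin | 0     | 4999 |
| A         | 0          | console  | file.bin | 5000  | 5499 |
| A         | 0          | network  | file.bin | 5500  | 5999 |
| A         | 0          | timeline | file.bin | 6000  | 6241 |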

# Technical Details

## Storage Service Support

The following sections describe the pseudo-code necessary to fetch a range of bytes from each service provider, with links to the documentation where applicable.

**Google Cloud Storage**

```python
from google.cloud.storage import Blob

# `bucket`, `filename`, `start`, and `stop` are assumed to be defined elsewhere.
blob = Blob(filename, bucket)
data = blob.download_as_bytes(start=start, end=stop)  # The requested byte range is inclusive.
```

Source: https://cloud.google.com/python/docs/reference/storage/latest/google.cloud.storage.blob.Blob#google_cloud_storage_blob_Blob_download_as_bytes

**AWS S3**

```python
from boto3 import client

# `auth`, `bucket`, `filename`, `start`, and `stop` are assumed to be defined elsewhere.
response = client("s3", **auth).get_object(
    Bucket=bucket,
    Key=filename,
    Range=f"bytes={start}-{stop}",  # HTTP byte ranges are inclusive of both endpoints.
)
data = response["Body"].read()
```

Source: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/get_object.html

**Filesystem**

```python
# Open in binary mode: segments are compressed bytes, not text.
with open(filename, "rb") as f:
    f.seek(start)
    data = f.read((stop - start) + 1)  # Range is inclusive.
```

## Consumer Buffering Mechanics

We will continue our approach of using _at-least-once processing_. Each message we receive is guaranteed to be processed to completion regardless of error or interrupt. Duplicate messages are possible under this scheme and must be accounted for in the planning of each component.

**Buffer Location and Behavior**

The buffer is kept as an in-memory list inside the consumer process. Each message we receive is appended to the buffer. We then check whether the buffer is full: if it is, we flush; otherwise we wait for the next message.


It seems this will constrain you to a fairly small buffer size.
Also, it ties the number of replicas of your consumer to the cost efficiency of the storage, which is quite undesirable.
Assuming you are never going to commit on Kafka until the buffer is flushed (if you did, you would not be able to guarantee at-least-once):

  • If you increase the number of replicas for any reason, each replica takes less traffic, thus it takes longer to fill the buffer.
  • Unless you tie the commit batch time and size to the number of replicas (which is undesirable - see the snuba consumer), increasing the number of replicas would increase the number of files written per unit of time.
    Replicas and file size should not be connected to each other, otherwise we will have to keep in mind a lot of affected moving parts when scaling the consumer.

Member Author


If the time to accumulate a batch is less than the time to upload a batch, then you need to add a replica. That's the only constraint. You get more efficiency at peak load, so it's best to run our replicas hot. The deadline will prevent the upload from sitting idle for too long. The total scale factor will be determined by the number of machines we can throw at the problem.

Multi-processing/threading, I think, will be deadly to this project. So we will need a lot of single-threaded machines running.

I rewrote this response several times. It's as disorganized as my thoughts are on this. Happy to hear critiques.

Member Author


> Unless you tie the commit batch time and size to the number of replicas (which is undesirable - see the snuba consumer), increasing the number of replicas would increase the number of files written per unit of time.

I think this is an okay outcome. If you double the number of replicas you halve the number of parts per file and double the number of files. That reduces cost efficiency but the throughput efficiency remains the same for each replica. Ignoring replica count, cost efficiency will ebb and flow with the variations in load we receive throughout the day.

We still come out ahead because the total number of files written per second is lower than in the current implementation, which writes one file per message.

Member Author

@cmanallen cmanallen Jul 18, 2023


> [...] otherwise we will have to keep in mind a lot of affected moving parts when scaling the consumer.

We should scale our replicas agnostic of the implementation of the buffer's flush mechanics. I mentioned above that cost-efficiency is a hard target, so I don't think we should target it.

A deadline should be present to guarantee regular buffer commits, and a max buffer size should exist to prevent us from using too many resources. I think those two commit semantics save us from having to think about the implications of adding replicas. The throughput of a single machine may drop, but the risk of backlog has decreased across the cluster.


This is a somewhat simplified view of what's happening. In reality we will have time-based flushing and a timeout mechanism for message listening. This ensures the buffer does not stay partially full indefinitely.

**Buffer Flush**

On flush, the buffer takes every message in the list and merges them into a single bytes object, which is then uploaded to the storage service provider. Upon successful upload, the start and stop byte range values of each message are stored in a database along with other metadata such as their replay_id and segment_id. Finally, the last offset is committed to Kafka.
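
A minimal sketch of the buffering and flush mechanics described above. The helpers (`upload_blob`, `insert_byte_ranges`, `commit_offset`), the message attributes, and the threshold values are hypothetical placeholders, not settled implementation details:

```python
import time

MAX_BUFFER_BYTES = 10 * 1024 * 1024  # Flush when the combined payload is large enough...
MAX_BUFFER_PARTS = 1000              # ...or when enough segments have accumulated...
FLUSH_DEADLINE_SECONDS = 5.0         # ...or when the oldest buffered message is too old.


class SegmentBuffer:
    def __init__(self):
        self.messages = []
        self.size = 0
        self.first_seen = None

    def append(self, message):
        self.messages.append(message)
        self.size += len(message.payload)
        self.first_seen = self.first_seen or time.time()

    @property
    def should_flush(self):
        age = time.time() - self.first_seen if self.first_seen else 0
        return (
            self.size >= MAX_BUFFER_BYTES
            or len(self.messages) >= MAX_BUFFER_PARTS
            or age >= FLUSH_DEADLINE_SECONDS
        )

    def flush(self):
        if not self.messages:
            return

        # Merge every buffered segment into a single bytes object and upload it once.
        filename = upload_blob(b"".join(m.payload for m in self.messages))

        # Record the inclusive byte range each segment occupies within the blob.
        rows, offset = [], 0
        for m in self.messages:
            stop = offset + len(m.payload) - 1
            rows.append((m.replay_id, m.segment_id, filename, offset, stop))
            offset = stop + 1
        insert_byte_ranges(rows)

        # Commit only after the blob and the rows are durable (at-least-once).
        commit_offset(self.messages[-1].offset)
        self.messages, self.size, self.first_seen = [], 0, None
```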

**Handling Consumer Restarts**

If the consumer restarts with a non-empty buffer, the offset of the buffer's last item will not have been committed. When the consumer resumes, it will start processing from the last committed offset (i.e. the last item of the last successfully flushed buffer). The buffer will be rebuilt exactly as it was prior to the restart.


What is the target size of each file you want to write and the target buffer size?
The approach of throwing away the in-flight work is generally ok if the buffers are very small and the time it takes to reprocess is negligible. If this is not the case (you want large files), you may create a lot of noise when Kafka rebalances consumer groups and create a lot of backlog.

Member Author

@cmanallen cmanallen Jul 12, 2023


These are the commit criteria I have (naively) envisioned. All these values are adjustable.

  • Commit if filesize >= 10MB.
  • Commit if num_parts >= 1000.
  • Commit if five-second-deadline exceeded.

10MB and 1000 parts can be scaled down to 1MB and 100 parts if they seem unrealistic. Any lower and the concept, I think, has run its course and is not worth pursuing.

> the time it takes to reprocess is negligible

I believe this will be the case. A generic consumer implementation should only be doing byte concatenation. But depending on size it may take a while to fetch those files over the network to even begin buffering.

I would prefer this generic consumer be deployed independently of getsentry so rebalances are less common.


**Storage Service Failure**

If we cannot communicate with the storage provider, we have several options.

1. Catch the exception and commit the offset anyway. This means all the segments in the buffer would be lost.
2. Do not catch the exception and let the consumer rebuild the buffer from its last saved offset.
3. Catch the exception and retry.

Option three is the preferred solution, but the semantics of the retry behavior can get complicated depending on how the system is constructed. For example, how long do you retry? How do retries affect message processing? Do you communicate with the service provider in a thread? If so, how do you manage resources?

A blocking approach is the simplest solution but it does not offer maximum throughput.
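
As a sketch, a bounded blocking retry for option three might look like the following; `upload_blob` and the retry limits are illustrative assumptions, not part of the proposal:

```python
import time


def upload_with_retry(payload, max_attempts=5, backoff_seconds=1.0):
    """Retry a failed upload a bounded number of times before giving up."""
    for attempt in range(1, max_attempts + 1):
        try:
            return upload_blob(payload)  # Hypothetical storage-provider upload call.
        except Exception:
            if attempt == max_attempts:
                raise  # Falls back to option two: crash and rebuild from the last committed offset.
            time.sleep(backoff_seconds * attempt)  # Linear back-off between attempts.
```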

**Managing Effects**

With a buffered approach, most of the consumer's effects are accomplished in two bulk operations. However, click search, segment-0 outcome tracking, and segment-0 project lookup cannot be handled this way. We will address each case independently below.

1. Click Tracking.
- Click events are published to the replay-event Kafka consumer.
- This publishing step is asynchronous and relies on threading to free up the main process thread (see the sketch after this list).
- This operation is measured in microseconds and is not anticipated to significantly impact total throughput.
2. Outcome Tracking.
- Outcome events are published to the outcomes Kafka consumer.
- This publishing step is asynchronous and relies on threading to free up the main process thread.
- This operation only occurs for segment-0 events.
- This operation is measured in microseconds and is not anticipated to significantly impact total throughput.
3. Project lookup.
- Projects are retrieved from a cache or, on a cache miss, by querying PostgreSQL.
- This operation typically takes >1ms to complete.
- This operation only occurs for segment-0 events.
- Querying this information in a tight loop is not an ideal situation.
- Forwarding the project_id to a secondary Kafka consumer would free up resources on our main consumer and allow the secondary consumer to optimize for this type of workload.
- Alternatively, another method for looking up the project's `has_replay` flag could be found.
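
A rough sketch of the threaded publishing described in items 1 and 2 above; `publish_click_event` and `publish_outcome` stand in for the real Kafka publishing helpers, and the pool size is an arbitrary choice:

```python
from concurrent.futures import ThreadPoolExecutor

publisher_pool = ThreadPoolExecutor(max_workers=4)


def publish_effects(message):
    # Fire-and-forget: submitting to the pool frees the main consumer thread immediately.
    publisher_pool.submit(publish_click_event, message)
    if message.segment_id == 0:
        publisher_pool.submit(publish_outcome, message)
```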

**Duplicate Message Handling**

1. Google Cloud Storage.
- Unique filename generation per buffer would mean that a segment could be present in multiple files.
- This has COGS implications but does not impact our application.
2. "recording_byte_range" table.
- Duplicate replay, segment ID pairs will be recorded in the table.
- A reader must either select distinct or group by the replay_id, segment_id pair (see the sketch after this list).
- Neither row has precedence over the other, but the filename value must come from the same row as the start and stop byte range values.
3. Outcome tracking.
- Duplicate outcomes will be recorded for a given replay.
- The replay_id functions as an idempotency token in the outcomes consumer and prevents the customer from being charged for the same replay multiple times.
4. Click tracking.
- Duplicate click events will be inserted for a replay, segment pair.
- This is an acceptable outcome and will not impact search behavior.
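
A sketch of a deduplicating read from the "recording_byte_range" table, assuming a PostgreSQL-style SQL interface (the RFC does not settle on a database, so `DISTINCT ON` is only one option):

```python
# Every selected column comes from the same row, so the filename always matches
# its start/stop values even when duplicate (replay_id, segment_id) pairs exist.
DEDUPLICATED_RANGES_QUERY = """
    SELECT DISTINCT ON (replay_id, segment_id)
        replay_id, segment_id, filename, start, stop
    FROM recording_byte_range
    WHERE replay_id = %(replay_id)s
    ORDER BY replay_id, segment_id
"""
```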

## Security

How do we prevent users from seeing segments that do not belong to them? The short answer is test coverage. Recording another segment's byte range would be the same as generating an incorrect filename under the current system. These outcomes are prevented with robust test coverage.

In the case of bad byte range math, we do have some implicit protection. Each segment is compressed independently. Fetching a malformed byte range would yield unreadable data. A bad byte range either truncates the compression headers or includes unintelligible bytes at the beginning or end of the sequence. If we manage to decompress a subset of a valid byte range the decompressed output would be malformed JSON and would not be returnable to the user.

Additionally, each segment could be encrypted with an encryption key that is stored on the row. Requesting an invalid byte range would yield malformed data that could not be decrypted with the key stored on the row.
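
A sketch of what per-row keys could look like, using `cryptography.Fernet` purely as an illustration; the RFC does not commit to a cipher or key-management scheme, and `compressed_segment` / `fetched_bytes` are placeholder names:

```python
from cryptography.fernet import Fernet, InvalidToken

# On write: generate a key per segment, store it on the row, append the ciphertext to the blob.
key = Fernet.generate_key()
encrypted_segment = Fernet(key).encrypt(compressed_segment)

# On read: bytes fetched with a bad byte range cannot be decrypted with the row's key.
try:
    compressed_segment = Fernet(key).decrypt(fetched_bytes)
except InvalidToken:
    pass  # Malformed byte range; refuse to return data to the user.
```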