
Duplicate handling with rollover #44794

Open
jpountz opened this issue Jul 24, 2019 · 16 comments
Labels
:Data Management/ILM+SLM Index and Snapshot lifecycle management >feature high hanging fruit Team:Data Management Meta label for data/management team

Comments

@jpountz
Contributor

jpountz commented Jul 24, 2019

Say your data shippers sometimes send duplicates. With time-based indices it used to be possible to remove duplicates by using Logstash's fingerprint filter to derive the document _id from a hash of its content, and by writing into the index that matched the value of the @timestamp field. That way, two copies of the same document would end up in the same index even if one was sent before midnight UTC and the other after midnight, when a new daily index is created.
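A minimal sketch of that older pattern as a Logstash pipeline (the source fields, hash method, and index naming below are illustrative, not taken from this issue):

filter {
  fingerprint {
    # derive a stable, content-based id; fields and method are illustrative
    source => ["message", "host", "@timestamp"]
    concatenate_sources => true
    method => "SHA256"
    target => "[@metadata][fingerprint]"
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    # route by the event's @timestamp so a duplicate sent after midnight UTC
    # still targets the same daily index as the original
    index => "logs-%{+YYYY.MM.dd}"
    document_id => "%{[@metadata][fingerprint]}"
  }
}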

Unfortunately, with rollover one can no longer control which index a document is written to. This has benefits, such as ensuring that previous indices won't be written to anymore, which might not be true with the approach outlined in the previous paragraph. However, the downside is that users can no longer handle duplicates in their indices: sending the same document before and after a new index is created results in duplicates across the index pattern. The main practical implication is that these users can't use ILM.

Is there something we can do to enable these users to use ILM?

@jpountz jpountz added >feature discuss :Data Management/ILM+SLM Index and Snapshot lifecycle management labels Jul 24, 2019
@elasticmachine
Collaborator

Pinging @elastic/es-core-features

@jpountz
Contributor Author

jpountz commented Jul 26, 2019

This is a frequent problem when using systems that provide at-least-once delivery semantics, like Kafka. Rollover brings significant benefits compared to regular time-based indices, so we would like to make it work for this use case, but it is challenging. One idea could be to look up whether the document already exists in the index from the previous time frame, but this could slow down indexing significantly, given that _id lookups are already a significant part of the indexing process. The idea was raised of moving the deduplication logic to Logstash, but this is likely not applicable as it relies on the assumption that all documents go through a single instance.

@jpountz
Contributor Author

jpountz commented Sep 6, 2019

Maybe we could actually implement this idea of deleting in the previous index and creating in the current index. The trick would be to compute ids in such a way that ids from the current time frame are unlikely to share a prefix with ids from the previous time frame, so that deletions in the previous time frame can run without going to disk.

For instance, if we prefixed the hash with the number of minutes since the Epoch, then about one minute after we move to a new time frame, deletions in the previous time frame would likely no longer need to go to disk.
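A small sketch of what such an id scheme could look like (the prefix granularity and hash input are illustrative):

import hashlib

def time_prefixed_id(event_timestamp_s: float, payload: bytes) -> str:
    """Prefix a content hash with the event's minutes-since-epoch.

    Documents from the current time frame then rarely share an id prefix with
    documents from the previous time frame, so a delete issued against the
    previous index can usually be ruled out by the terms dictionary alone,
    without going to disk.
    """
    minutes_since_epoch = int(event_timestamp_s // 60)
    content_hash = hashlib.sha256(payload).hexdigest()
    return f"{minutes_since_epoch}-{content_hash}"

# the same event always maps to the same id, while ids minted for later
# minutes fall into a disjoint prefix range
print(time_prefixed_id(1567756800.0, b"a message"))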

@jpountz
Contributor Author

jpountz commented Sep 10, 2019

This morning I talked with a user who would like this feature. Some data points:

  • they need this feature because they use Kafka in a way that only guarantees at-least-once delivery, so there might be duplicates - I believe this is a common situation
  • ids are computed by a custom data shipper, but it works fairly similarly to Logstash's fingerprint filter
  • at times they have also used this to replay indexing of a subset of the logs
  • they plan to allow up to 30 days between the timestamp of the event and when it gets indexed (which means we might have to delete in many indices if we apply the above idea)

@Ryado

Ryado commented Sep 18, 2019

Some use cases that suffer from this issue are also the ones where indexing speed is key. That means we often optimize and accept tradeoffs for fast indexing (longer refresh intervals, async translog durability, etc.), which makes me think that anything we introduce that impacts indexing speed could be hard to adopt in those cases.

Maybe we can think of a "lazy deduplication" approach, where we tolerate duplicates for short life-spans but have another, independent process responsible for deleting them in past (rolled-over) indices based on document ids. To help reduce the management overhead, this could be linked to the rollover task itself and triggered by a successful rollover action, e.g.:

POST /logs_write/_rollover 
{
  "conditions": {
    "max_age":   "7d",
    "max_docs":  1000,
    "max_size":  "5gb"
  },
  "settings": {
    "lazy_deduplication":   true
  }
}

Maybe it can be a good enough solution for some use-cases?

@jpountz
Contributor Author

jpountz commented Oct 30, 2019

@Ryado I think that the fact that duplicates might be spread across multiple indices makes it a bit tricky to implement efficiently?

@ArtemChekunov

Maybe it makes sense to throw duplicates out during aggregation, as is done in Cassandra.

Something like that:

d = {
    "my_index-000001": [
        {"@t": 0, "msg": "a message", "fingerprint": "a"},
        {"@t": 0, "msg": "b message", "fingerprint": "b"},
        {"@t": 0, "msg": "c message", "fingerprint": "c"},
    ],
    "my_index-000002": [
        {"@t": 0, "msg": "d message", "fingerprint": "d"},
    ],
    "my_index-000003": [
        {"@t": 0, "msg": "e message", "fingerprint": "e"},
        {"@t": 0, "msg": "a message", "fingerprint": "a"},
    ],
}

In:  list({doc["fingerprint"]: doc for k, v in d.items() for doc in v}.values())
Out: [{'@t': 0, 'msg': 'a message', 'fingerprint': 'a'},
      {'@t': 0, 'msg': 'b message', 'fingerprint': 'b'},
      {'@t': 0, 'msg': 'c message', 'fingerprint': 'c'},
      {'@t': 0, 'msg': 'd message', 'fingerprint': 'd'},
      {'@t': 0, 'msg': 'e message', 'fingerprint': 'e'}]

@Bernhard-Fluehmann

A data-stream-wide _id check on create would be a nice solution. For performance reasons and backwards compatibility it could be gated behind a data stream configuration option, e.g. id_scope: global, duplicate_ids: false, unique_ids: true, etc.

@jackalq

jackalq commented Nov 10, 2021

Is there any progress on this issue?

I would like to offer a compromise as an initial idea, limited to the case where each document has an event time and an ingest time, and is append-only.
Duplicate documents would be deleted by a background thread - lazily, periodically, and in batches - in a manner similar to a "transform" run.
By comparing the event time of the document (minus the maximum allowed clock difference) against the creation time of each index, we obtain a list of indices the document could also have been written to (only indices older than the one the document lives in are checked).
Another parameter - the maximum allowed check period - would limit the number of indices to check, to protect cluster performance.
If the document already exists in an older index, it is deleted from the current index.
For data stream queries, an additional parameter could choose between querying only the de-duplicated data or all of the data - this can be done by simply taking the processed time from the last checkpoint and adding it to the query.
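A rough sketch of one such background pass, using placeholder index names (my_index-000001/-000002 as older backing indices, my_index-000003 as the current write index) and placeholder fingerprint ids:

# 1. for a batch of recently indexed ids, check whether they already exist in
#    the older backing indices that the event times could map to
GET /my_index-000001,my_index-000002/_search
{
  "query": { "ids": { "values": ["fingerprint-1", "fingerprint-2"] } },
  "_source": false
}

# 2. delete the ones that matched from the current write index
POST /my_index-000003/_delete_by_query
{
  "query": { "ids": { "values": ["fingerprint-1"] } }
}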

All of the above is keyed on "time", which should be relatively easy for users to reason about.
Most checks should find that the document only exists in the current index, so performance should not be hurt.
Maybe the micro-batch processing can be optimized a bit more...

In addition, I hope "transform" can also query the de-duplicated data - that is why I am here.

@ruflin
Member

ruflin commented Mar 28, 2024

I want to revive this thread as I think we still have not solved the problem and a lot of things have happened in the last few years:

Some users don't migrate to data streams because they use a unique _id to prevent duplication. This means duplication is prevented as long as no rollover has happened (which is what this issue is about). I would argue that in many scenarios this is good enough, but the concept breaks down when old data is reingested. Even though data streams today don't support updates (#70923), it is possible to send a second document with the same _id; the returned version_conflict_engine_exception error then has to be ignored, and this way users get an experience similar to an alias over indices.
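For illustration, this is roughly the pattern such users follow today (the data stream name and _id are placeholders):

# the first copy is accepted
PUT my-data-stream/_create/1b4f9a
{ "@timestamp": "2024-03-28T10:00:00Z", "message": "a message" }

# re-sending the same _id before a rollover is rejected with a 409
# version_conflict_engine_exception, which the client has to treat as "already indexed"
PUT my-data-stream/_create/1b4f9a
{ "@timestamp": "2024-03-28T10:00:00Z", "message": "a message" }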

For the "best effort" deduplication (meaning ignore the rollover scenario, reingesting old data) I think we can improve data streams to have a mode, that when the same _id is ingested again, the error is swallowed by Elasticsearch instead of the client.

For the other scenarios, TSDS becomes interesting as it has a _tsid, which is a hash of the dimensions. I wonder if we could introduce a similar concept for logs, with a unique identifier for a log stream + @timestamp. My assumption is that inside a single data stream the @timestamp is not unique, even if we move to nanoseconds by default (#102548), as there is a good chance thousands of machines send logs to the same data stream. But `@timestamp + log stream id/hash` is unique. For example, when reading logs from the same file on the same machine, this assumes each log line has a different timestamp; if two lines can have the same timestamp, additional info like the offset is needed. This sounds very similar to the fingerprint processor: https://www.elastic.co/guide/en/elasticsearch/reference/current/fingerprint-processor.html. But instead of having to define a processor, the "dimensions" could be defined on the data stream directly (even better, there would be good defaults). This also removes the need for the client to generate a custom _id.
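For reference, a sketch of how this looks today with an ingest pipeline (the pipeline name and field list are illustrative):

PUT _ingest/pipeline/logs-fingerprint-id
{
  "processors": [
    {
      "fingerprint": {
        "fields": ["@timestamp", "host.name", "log.file.path", "message"]
      }
    },
    {
      "set": {
        "field": "_id",
        "copy_from": "fingerprint"
      }
    }
  ]
}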

Having such a hash would likely not fully eliminate duplicated data across the different backing indices, especially when old data is reindexed, but hopefully it would make it simple to find duplicates and remove them.

Above is all brainstorming, curious to hear other thoughts.

@felixbarny
Member

Maybe we can also take inspiration from how this is handled in time series data streams. In this indexing mode, documents are routed to backing indices depending on their timestamp, so if a time series data stream has rolled over, late-arriving documents are still sent to the previous backing index. However, this also comes with various tradeoffs: only fairly recent documents are accepted, a rollover only takes effect after a while (which is problematic for auto-sharding), and a maximum size or number of documents per backing index cannot be enforced.
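For reference, that routing is driven by per-backing-index time bounds, which can be inspected like this (the data stream name is a placeholder):

# each backing index of a time series data stream only accepts documents
# whose @timestamp falls within its bounds
GET /.ds-my-tsds-*/_settings/index.time_series.*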

@martijnvg
Member

I think the mechanism we use for TSDB (the notion that dimensions + timestamp are unique, plus selecting the right backing index based on the timestamp field) could be used for logs too. Another set of fields + timestamp could define uniqueness, and we could route on the timestamp field as well.

The new planned logs index mode is currently designed to give the best storage savings without adding any barriers to integrations. For example, the host field will be used for index sorting, but if it is missing for some reason, the new index mode will not complain about it.

So the question to me is: how can we provide duplicate handling for logging data streams without adding additional barriers? For example, if we use hostname, full file path, and file offset as the way to identify a unique log line, what do we do when one of these fields is missing? Another example: how are we going to handle logs with a timestamp for which we don't yet have a backing index?

Maybe we should take another look at TSDB routing, see which barriers are perceived as real integration issues, and try to remove them, so that at a later stage we can introduce a new routing mechanism to the logs index mode without breaking bwc. But maybe in order to do this, we need to make certain fields required for any log document before the new logs index mode goes GA?

@ruflin
Member

ruflin commented Apr 2, 2024

I could see us rolling out deduplication in steps and, as you said @martijnvg, no adoption blockers should exist. So, for example, at first deduplication only works if the document falls within a specific timeframe and @timestamp + field x together are unique. Later we can improve it to support wider time ranges, more field types, etc. I'm concerned about making specific fields required; instead, if they don't exist, the optimisation is simply not applied and deduplication does not work.

@lhuet

lhuet commented Apr 8, 2024

As discussed with @ruflin, I am listing here some issues/use cases that de-duplication would benefit:

  • When there is an incident in the ingestion process (Beats registry lost, Kafka offset reset, ...), logs can be sent twice. Even with a generated id (fingerprint), the duplication issue is not solved because the target index can change (rollover). This is a blocker for using data streams for some customers.
  • For data sources that need to be fetched (databases, APIs, ...), implementing HA is not easy. Often there is an active/passive mechanism with Logstash pipelines. Being able to fetch the data source with two Logstash instances would allow an easy HA implementation if de-duplication were done on the ES side.
  • When ingesting workstation logs/events (very common for security use cases), it happens regularly that logs are ingested with some delay (sometimes days after the events occurred, if people are working off the grid / on holidays / ...). This is not a duplication issue, but timestamp-based routing could be beneficial here and, for data in a "regulated business", it allows the data to be deleted more accurately/effectively.

@elasticsearchmachine
Collaborator

Pinging @elastic/es-data-management (Team:Data Management)

@ruflin ruflin removed their assignment Jun 20, 2024
@Rayzbam

Rayzbam commented Jul 30, 2024

Any update?
