feat(elasticsearch): Add rest.li endpoint that does truncation cleanup of a timeseries index #8277

Merged
iprentic merged 11 commits into master from nd-truncate-index-endpoint on Jun 28, 2023

Conversation

@iprentic iprentic (Contributor) commented Jun 21, 2023

Example usage:

$ curl  -d '{"entityType": "dataset", "aspect": "datasetusagestatistics", "endTimeMillis": 3000}' -X POST https://localhost:8080/operations\?action\=truncateTimeseriesAspect
{"value":"Delete 0 out of 201 rows (0.00%). Reindexing the aspect without the deleted records. This was a dry run. Run with dryRun = false to execute."}%  

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added, a Usage Guide has been added for it.
  • For any breaking change/potential downtime/deprecation/big changes an entry has been made in Updating DataHub

@github-actions github-actions bot added the devops (PR or Issue related to DataHub backend & deployment) and product (PR or Issue related to the DataHub UI/UX) labels on Jun 21, 2023
@iprentic iprentic changed the title to add a [WIP] prefix on Jun 22, 2023
@iprentic iprentic changed the title to remove the [WIP] prefix on Jun 26, 2023
@iprentic iprentic (Contributor, Author) commented

End-to-end test on quickstart:

Use the following script to ingest datasetUsageStatistics timeseries aspect values for a sample dataset:

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    ChangeTypeClass,
    DatasetFieldUsageCountsClass,
    DatasetUsageStatisticsClass,
)

print("make emitter")
my_emitter = DatahubRestEmitter("https://localhost:8080")
print("connection created")

for i in range(500):
    usage_stats = DatasetUsageStatisticsClass(
        # Start at June 1, 2023 and advance one hour per record
        timestampMillis=1685654124000 + i * 1000 * 60 * 60,
        uniqueUserCount=10 + i,
        totalSqlQueries=20 + i,
        fieldCounts=[
            DatasetFieldUsageCountsClass(
                fieldPath="user_id",
                count=10,
            )
        ],
    )
    mcpw = MetadataChangeProposalWrapper(
        entityType="dataset",
        aspectName="datasetUsageStatistics",
        changeType=ChangeTypeClass.UPSERT,
        entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
        aspect=usage_stats,
    )
    my_emitter.emit(mcpw)
    print("emitted " + str(i))

Use the getIndexSizes endpoint to check the size of the index:

$ curl -X POST https://localhost:8080/operations\?action\=getIndexSizes | jq
{
  "value": {
    "indexSizes": [
      {
        "aspectName": "datasetusagestatistics",
        "sizeMb": 140.513,
        "indexName": "dataset_datasetusagestatisticsaspect_v1",
        "entityName": "dataset"
      }
    ]
  }
}

Get the number of entries in the index using the Elasticsearch API:

$ curl -X GET "localhost:9200/dataset_datasetusagestatisticsaspect_v1/_count?pretty"                                        
{
  "count" : 1000,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}

Pick a timestamp within the inserted range and get the number of records at or before that timestamp using the Elasticsearch API:

$ curl -X GET "localhost:9200/dataset_datasetusagestatisticsaspect_v1/_count?pretty" -H 'Content-Type: application/json' -d'
{
  "query" : {
    "range" : { "timestampMillis" : { "lte" : 1687295335000 } }
  }
}
'
{
  "count" : 912,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}

Dry-run the deletion using the new endpoint:

$ curl --location --request POST  https://localhost:8080/operations\?action\=truncateTimeseriesAspect \
--header 'Content-Type: application/json' \
--data-raw '{
    "entityType": "dataset",
    "aspect": "datasetusagestatistics",
    "endTimeMillis": 1687295335000,
    "dryRun": true,
    "batchSize": 100,
    "timeoutSeconds": 3600
}'
{"value":"Delete 912 out of 1246 rows (73.19%). This was a dry run. Run with dryRun = false to execute."}%     

Run the deletion for real using the new endpoint:

$ curl --location --request POST  https://localhost:8080/operations\?action\=truncateTimeseriesAspect \
--header 'Content-Type: application/json' \
--data-raw '{
    "entityType": "dataset",
    "aspect": "datasetusagestatistics",
    "endTimeMillis": 1687295335000,
    "dryRun": false,
    "batchSize": 100,
    "timeoutSeconds": 3600
}'
{"value":"aM5bvWi4QOWWGNAmdCNg6A:571635"}% 

Query the number of rows at or before the timestamp after the deletion using the Elasticsearch API:

$ curl -X GET "localhost:9200/dataset_datasetusagestatisticsaspect_v1/_count?pretty" -H 'Content-Type: application/json' -d'
{                                          
  "query" : {                                                                                      
    "range" : { "timestampMillis" : { "lte" : 1687295335000 } }
  }                                    
}                                    
'                   
{
  "count" : 0,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}

Get the total number of entries after the deletion using the Elasticsearch API:

$ curl -X GET "localhost:9200/dataset_datasetusagestatisticsaspect_v1/_count?pretty"                                        
{
  "count" : 88,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}

Get the index size using the getIndexSizes endpoint:

$ curl -X POST https://localhost:8080/operations\?action\=getIndexSizes | jq
{
  "value": {
    "indexSizes": [
      {
        "aspectName": "datasetusagestatistics",
        "sizeMb": 106.255,
        "indexName": "dataset_datasetusagestatisticsaspect_v1",
        "entityName": "dataset"
      }
    ]
  }
}

Get the task status using the new getEsTaskStatus endpoint:

$ curl --location --request POST  https://localhost:8080/operations\?action\=getEsTaskStatus \
--header 'Content-Type: application/json' \
--data-raw '{
    "task": "aM5bvWi4QOWWGNAmdCNg6A:571635"
}'
{"value":"{\"completed\":true,\"taskId\":\"aM5bvWi4QOWWGNAmdCNg6A:571635\",\"runTimeNanos\":120861583,\"status\":\"{\\\"total\\\":912,\\\"updated\\\":0,\\\"created\\\":0,\\\"deleted\\\":912,\\\"batches\\\":10,\\\"version_conflicts\\\":0,\\\"noops\\\":0,\\\"retries\\\":{\\\"bulk\\\":0,\\\"search\\\":0},\\\"throttled_millis\\\":0,\\\"requests_per_second\\\":-1.0,\\\"throttled_until_millis\\\":0}\"}"}% 

return client.tasks().get(taskRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error(String.format("ERROR: Failed to get task status for %s:%d. See stacktrace for a more detailed error:", nodeId, taskId));
e.printStackTrace();
@RyanHolstien RyanHolstien (Collaborator) commented Jun 27, 2023

Suggested change (delete this line):
e.printStackTrace();

Remove printStackTrace.

You can pass e as the second argument, as in log.error("message", e), to get the stack trace into the log.
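
Putting both suggestions together, a minimal sketch of how the catch block could look (client, taskRequest, nodeId, and taskId are taken from the snippet above; the surrounding method is omitted, and this is not necessarily the exact change that landed):

try {
  return client.tasks().get(taskRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
  // Passing the exception as the last argument lets the logger record the stack
  // trace alongside the message, so e.printStackTrace() is no longer needed.
  log.error(String.format("Failed to get task status for %s:%d", nodeId, taskId), e);
  // ... return a fallback value or rethrow, as appropriate for this resource ...
}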

public class OperationsResourceTest extends TestCase {
private static final String TASK_ID = "taskId123";

TimeseriesAspectService mockTimeseriesAspectService = new TimeseriesAspectService() {
@RyanHolstien RyanHolstien (Collaborator) commented Jun 27, 2023

Let's preemptively extract this into an external mock implementation class for reuse; people probably won't check whether mock classes already exist inside tests before writing their own.
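
A minimal sketch of that extraction (the class name MockTimeseriesAspectService and its placement are illustrative, not taken from the PR):

// Hypothetical reusable test double: copy the overrides from the anonymous class in
// OperationsResourceTest into it, returning the same canned values (e.g. TASK_ID),
// so other test classes can reuse it instead of redefining their own mocks.
public class MockTimeseriesAspectService implements TimeseriesAspectService {
  // ... overrides for the methods the tests exercise go here ...
}

// The field in OperationsResourceTest then shrinks to:
TimeseriesAspectService mockTimeseriesAspectService = new MockTimeseriesAspectService();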

@david-leifker (Collaborator)

Overall this looks good! Just look at the failing test and make sure the test is deleting only what we expect and nothing else, per Ryan's comment. Thank you!

@iprentic iprentic merged commit 6b8c4c8 into master Jun 28, 2023
34 of 35 checks passed
@iprentic iprentic deleted the nd-truncate-index-endpoint branch June 28, 2023 22:06