From a40923d168f9ce60bfefdeea8010b7b633a9dac8 Mon Sep 17 00:00:00 2001
From: Ayush
Date: Sat, 5 Dec 2020 00:02:50 -0600
Subject: [PATCH] add all code

---
 pre-processing/pre-processing-cfn.yaml      | 119 ++++++++++++
 .../pre-processing-code/lambda_function.py  | 179 ++++++++++++++++++
 .../pre-processing-code/s3_md5_compare.py   |  49 +++++
 .../pre-processing-code/source_data.py      |  80 ++++++++
 4 files changed, 427 insertions(+)
 create mode 100644 pre-processing/pre-processing-cfn.yaml
 create mode 100644 pre-processing/pre-processing-code/lambda_function.py
 create mode 100644 pre-processing/pre-processing-code/s3_md5_compare.py
 create mode 100644 pre-processing/pre-processing-code/source_data.py

diff --git a/pre-processing/pre-processing-cfn.yaml b/pre-processing/pre-processing-cfn.yaml
new file mode 100644
index 0000000..ee813c9
--- /dev/null
+++ b/pre-processing/pre-processing-cfn.yaml
@@ -0,0 +1,119 @@
+---
+AWSTemplateFormatVersion: '2010-09-09'
+Transform: "AWS::Serverless-2016-10-31"
+# CloudFormation template in YAML to set up an automated process to update revisions to datasets in AWS Data Exchange
+Parameters:
+  S3Bucket:
+    Type: String
+    Default: rearc-data-provider
+    Description: Name of the S3 bucket where this dataset resides, e.g. rearc-data-provider
+  DataSetName:
+    Type: String
+    Description: Name of the dataset
+  DataSetArn:
+    Type: String
+    Description: ARN of the AWS Data Exchange dataset that needs to be updated
+  ProductId:
+    Type: String
+    Description: ID of the AWS Data Exchange product that needs to be updated
+  Region:
+    Type: String
+    Default: us-east-1
+    Description: AWS Region for AWS Data Exchange
+
+Resources:
+# Create an IAM role for Lambda to use
+  LambdaRole:
+    Type: AWS::IAM::Role
+    Properties:
+      AssumeRolePolicyDocument:
+        Statement:
+          - Action:
+              - sts:AssumeRole
+            Effect: Allow
+            Principal:
+              Service:
+                - lambda.amazonaws.com
+        Version: 2012-10-17
+      ManagedPolicyArns:
+        - arn:aws:iam::aws:policy/AWSLambdaExecute
+        - arn:aws:iam::aws:policy/AmazonS3FullAccess
+        - arn:aws:iam::796406704065:policy/AWSDataExchangeService
+        - arn:aws:iam::aws:policy/AWSMarketplaceSellerFullAccess
+      Path: /
+
+# Create a Lambda function that will publish scheduled revision updates to the AWS Data Exchange dataset
+  LambdaFunction:
+    Type: AWS::Lambda::Function
+    Properties:
+      FunctionName:
+        !Join
+          - ''
+          - - 'source-for-'
+            - !Ref DataSetName
+      Description: "Source revision updates to AWS Data Exchange data set"
+      Runtime: "python3.7"
+      Code:
+        S3Bucket:
+          Fn::Sub: ${S3Bucket}
+        S3Key:
+          !Join
+            - ''
+            - - Fn::Sub: ${DataSetName}
+              - '/automation/pre-processing-code.zip'
+      Handler: "lambda_function.lambda_handler"
+      MemorySize: 256
+      Timeout: 120
+      Role:
+        Fn::GetAtt:
+          - LambdaRole
+          - Arn
+      Environment:
+        Variables:
+          S3_BUCKET:
+            Fn::Sub: ${S3Bucket}
+          DATA_SET_NAME:
+            Fn::Sub: ${DataSetName}
+          DATA_SET_ARN:
+            Fn::Sub: ${DataSetArn}
+          PRODUCT_ID:
+            Fn::Sub: ${ProductId}
+          REGION:
+            Fn::Sub: ${Region}
+
+  ScheduledRule:
+    Type: AWS::Events::Rule
+    Properties:
+      Description: "ScheduledRule"
+      ScheduleExpression: "cron(0 10 1/4 * ? *)"
+      State: "ENABLED"
+      Targets:
+        -
+          Arn:
+            Fn::GetAtt:
+              - "LambdaFunction"
+              - "Arn"
+          Id: "TargetFunctionV1"
+
+  PermissionForEventsToInvokeLambda:
+    Type: AWS::Lambda::Permission
+    Properties:
+      FunctionName:
+        Ref: "LambdaFunction"
+      Action: "lambda:InvokeFunction"
+      Principal: "events.amazonaws.com"
+      SourceArn:
+        Fn::GetAtt:
+          - "ScheduledRule"
+          - "Arn"
+
+Outputs:
+  LambdaRoleARN:
+    Description: Role for Lambda execution.
+    Value:
+      Fn::GetAtt:
+        - LambdaRole
+        - Arn
+  LambdaFunctionName:
+    Value:
+      Ref: LambdaFunction
\ No newline at end of file
diff --git a/pre-processing/pre-processing-code/lambda_function.py b/pre-processing/pre-processing-code/lambda_function.py
new file mode 100644
index 0000000..10cc4b1
--- /dev/null
+++ b/pre-processing/pre-processing-code/lambda_function.py
@@ -0,0 +1,179 @@
+import time
+import json
+from source_data import source_dataset
+import boto3
+import os
+from datetime import date, datetime
+from multiprocessing.dummy import Pool
+
+os.environ['AWS_DATA_PATH'] = '/opt/'
+
+dataexchange = boto3.client(
+    service_name='dataexchange',
+    region_name=os.environ['REGION']
+)
+
+marketplace = boto3.client(
+    service_name='marketplace-catalog',
+    region_name=os.environ['REGION']
+)
+
+s3_bucket = os.environ['S3_BUCKET']
+data_set_arn = os.environ['DATA_SET_ARN']
+data_set_id = data_set_arn.split('/', 1)[1]
+product_id = os.environ['PRODUCT_ID']
+data_set_name = os.environ['DATA_SET_NAME']
+new_s3_key = data_set_name + '/dataset/'
+cfn_template = data_set_name + '/automation/cloudformation.yaml'
+post_processing_code = data_set_name + '/automation/post-processing-code.zip'
+
+today = date.today().strftime('%Y-%m-%d')
+revision_comment = 'Revision Updates v' + today
+
+if not s3_bucket:
+    raise Exception("'S3_BUCKET' environment variable must be defined!")
+
+if not data_set_name:
+    raise Exception("'DATA_SET_NAME' environment variable must be defined!")
+
+if not data_set_arn:
+    raise Exception("'DATA_SET_ARN' environment variable must be defined!")
+
+if not product_id:
+    raise Exception("'PRODUCT_ID' environment variable must be defined!")
+
+
+def start_change_set(describe_entity_response, revision_arn):
+
+    # Call the AWS Marketplace Catalog StartChangeSet API to add the revision to the product
+    change_details = {
+        'DataSetArn': data_set_arn,
+        'RevisionArns': [
+            revision_arn
+        ]
+    }
+
+    change_set = [
+        {
+            'ChangeType': 'AddRevisions',
+            'Entity': {
+                'Identifier': describe_entity_response['EntityIdentifier'],
+                'Type': describe_entity_response['EntityType']
+            },
+            'Details': json.dumps(change_details)
+        }
+    ]
+
+    response = marketplace.start_change_set(
+        Catalog='AWSMarketplace', ChangeSet=change_set)
+    return response
+
+
+def jobs_handler(data):
+
+    # Used to store the Ids of the Jobs importing the assets to S3.
+    job_ids = set()
+
+    print('asset_list {} of {}:'.format(
+        data['job_num'], data['total_jobs']), data['asset_list'])
+
+    import_job = dataexchange.create_job(
+        Type='IMPORT_ASSETS_FROM_S3',
+        Details={
+            'ImportAssetsFromS3': {
+                'DataSetId': data_set_id,
+                'RevisionId': data['revision_id'],
+                'AssetSources': data['asset_list']
+            }
+        }
+    )
+
+    # Start the Job and save the JobId.
+    dataexchange.start_job(JobId=import_job['Id'])
+    job_ids.add(import_job['Id'])
+
+    # Iterate until all remaining jobs have reached a terminal state, or an error is found.
+    completed_jobs = set()
+
+    while job_ids != completed_jobs:
+        for job_id in job_ids:
+            if job_id in completed_jobs:
+                continue
+            get_job_response = dataexchange.get_job(JobId=job_id)
+            if get_job_response['State'] == 'COMPLETED':
+                print('JobId: {}, Job {} of {} completed'.format(
+                    job_id, data['job_num'], data['total_jobs']))
+                completed_jobs.add(job_id)
+            if get_job_response['State'] == 'ERROR':
+                job_errors = get_job_response['Errors']
+                raise Exception(
+                    'JobId: {} failed with errors:\n{}'.format(job_id, job_errors))
+            # Sleep to ensure we don't get throttled by the GetJob API.
+            time.sleep(0.5)
+
+
+def lambda_handler(event, context):
+    asset_list = source_dataset()
+    asset_lists = [asset_list[i:i+100] for i in range(0, len(asset_list), 100)]
+
+    if isinstance(asset_lists, list):
+
+        if len(asset_lists) == 0:
+            print(
+                'No need for a revision, all datasets included with this product are up to date')
+            return {
+                'statusCode': 200,
+                'body': json.dumps('No need for a revision, all datasets included with this product are up to date')
+            }
+
+        create_revision_response = dataexchange.create_revision(
+            DataSetId=data_set_id)
+        revision_id = create_revision_response['Id']
+        revision_arn = create_revision_response['Arn']
+
+        for idx in range(len(asset_lists)):
+            asset_lists[idx] = {
+                'asset_list': asset_lists[idx],
+                'revision_id': revision_id,
+                'job_num': str(idx + 1),
+                'total_jobs': str(len(asset_lists))
+            }
+
+        with Pool(10) as p:
+            p.map(jobs_handler, asset_lists)
+
+        update_revision_response = dataexchange.update_revision(
+            DataSetId=data_set_id,
+            RevisionId=revision_id,
+            Comment=revision_comment,
+            Finalized=True
+        )
+
+        revision_state = update_revision_response['Finalized']
+
+        if revision_state:
+            # Call the AWS Marketplace Catalog APIs to add the revision to the product
+            describe_entity_response = marketplace.describe_entity(
+                Catalog='AWSMarketplace', EntityId=product_id)
+            start_change_set_response = start_change_set(
+                describe_entity_response, revision_arn)
+            if start_change_set_response['ChangeSetId']:
+                print('Revision updated successfully and added to the dataset')
+                return {
+                    'statusCode': 200,
+                    'body': json.dumps('Revision updated successfully and added to the dataset')
+                }
+            else:
+                print('Something went wrong with the AWS Marketplace Catalog API')
+                return {
+                    'statusCode': 500,
+                    'body': json.dumps('Something went wrong with the AWS Marketplace Catalog API')
+                }
+        else:
+            print('Revision did not complete successfully')
+            return {
+                'statusCode': 500,
+                'body': json.dumps('Revision did not complete successfully')
+            }
+    else:
+        raise Exception('Something went wrong when uploading files to s3')
\ No newline at end of file
diff --git a/pre-processing/pre-processing-code/s3_md5_compare.py b/pre-processing/pre-processing-code/s3_md5_compare.py
new file mode 100644
index 0000000..07aebe4
--- /dev/null
+++ b/pre-processing/pre-processing-code/s3_md5_compare.py
@@ -0,0 +1,49 @@
+# Compare the md5 of a local file to the md5-based ETag of the object in S3
+# Source: li xin on StackOverflow
+# https://stackoverflow.com/questions/1775816/how-to-get-the-md5sum-of-a-file-on-amazons-s3
+
+import hashlib
+import botocore.exceptions
+
+
+def md5_checksum(filename):
+    m = hashlib.md5()
+    with open(filename, 'rb') as f:
+        for data in iter(lambda: f.read(1024 * 1024), b''):
+            m.update(data)
+    return m.hexdigest()
+
+
+def etag_checksum(filename, chunk_size=8 * 1024 * 1024):
+    md5s = []
+    with open(filename, 'rb') as f:
+        for data in iter(lambda: f.read(chunk_size), b''):
+            md5s.append(hashlib.md5(data).digest())
+    m = hashlib.md5(b"".join(md5s))
+    return '{}-{}'.format(m.hexdigest(), len(md5s))
+
+
+def etag_compare(filename, etag):
+    et = etag[1:-1]  # strip the quotes S3 puts around the ETag
+    if '-' in et and et == etag_checksum(filename):
+        return False
+    if '-' not in et and et == md5_checksum(filename):
+        return False
+    return True
+
+
+def md5_compare(s3, bucket_name, s3_key, filename):
+    # Get the object metadata from S3.
+    # If the object does not exist, return True so the file is treated as changed.
+    try:
+        obj_dict = s3.head_object(Bucket=bucket_name, Key=s3_key)
+    except botocore.exceptions.ClientError as e:
+        error_code = e.response['Error']['Code']
+        if error_code == '404':
+            return True
+        raise
+    etag = obj_dict['ETag']
+
+    has_changes = etag_compare(filename, etag)
+
+    return has_changes
\ No newline at end of file
diff --git a/pre-processing/pre-processing-code/source_data.py b/pre-processing/pre-processing-code/source_data.py
new file mode 100644
index 0000000..e8155bb
--- /dev/null
+++ b/pre-processing/pre-processing-code/source_data.py
@@ -0,0 +1,80 @@
+import os
+import boto3
+import time
+from urllib.request import urlopen
+from urllib.error import URLError, HTTPError
+from zipfile import ZipFile
+from s3_md5_compare import md5_compare
+
+
+def source_dataset():
+
+    source_dataset_url = 'https://download.cms.gov/nppes/NPPES_Data_Dissemination_November_2020.zip'
+    response = None
+
+    retries = 5
+    for attempt in range(retries):
+        try:
+            response = urlopen(source_dataset_url)
+        except HTTPError as e:
+            if attempt == retries - 1:
+                raise Exception('HTTPError: ', e.code)
+            time.sleep(0.2 * attempt)
+
+        except URLError as e:
+            if attempt == retries - 1:
+                raise Exception('URLError: ', e.reason)
+            time.sleep(0.2 * attempt)
+        else:
+            break
+
+    if response is None:
+        raise Exception('There was an issue downloading the dataset')
+
+    data_set_name = os.environ['DATA_SET_NAME']
+    zip_location = '/tmp/' + data_set_name + '.zip'
+
+    # save the download to /tmp and unzip it
+    with open(zip_location, 'wb') as f:
+        f.write(response.read())
+
+    with ZipFile(zip_location, 'r') as z:
+        z.extractall('/tmp')
+
+    os.remove(zip_location)
+
+    folder_dir = os.listdir('/tmp')[0]
+
+    # variables/resources used to upload to s3
+    s3_bucket = os.environ['S3_BUCKET']
+    s3 = boto3.client('s3')
+
+    s3_uploads = []
+    for r, d, f in os.walk('/tmp/' + folder_dir):
+        for filename in f:
+            obj_name = os.path.join(r, filename).split(
+                '/', 3).pop().replace(' ', '_').lower()
+            file_location = os.path.join(r, filename)
+            new_s3_key = data_set_name + '/dataset/' + obj_name
+
+            has_changes = md5_compare(s3, s3_bucket, new_s3_key, file_location)
+            if has_changes:
+                s3.upload_file(file_location, s3_bucket, new_s3_key)
+                print('Uploaded: ' + filename)
+            else:
+                print('No changes in: ' + filename)
+
+            asset_source = {'Bucket': s3_bucket, 'Key': new_s3_key}
+            s3_uploads.append({'has_changes': has_changes,
+                               'asset_source': asset_source})
+
+    count_updated_data = sum(
+        upload['has_changes'] for upload in s3_uploads)
+    if count_updated_data > 0:
+        asset_list = list(
+            map(lambda upload: upload['asset_source'], s3_uploads))
+        if len(asset_list) == 0:
+            raise Exception('Something went wrong when uploading files to s3')
+        return asset_list
+    else:
+        return []
\ No newline at end of file
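
For reference, a minimal deployment sketch with boto3, assuming the pre-processing code has already been zipped and uploaded to s3://<S3Bucket>/<DataSetName>/automation/pre-processing-code.zip as the template's Code property expects; the stack name and parameter values below are hypothetical placeholders, not part of the patch.

import boto3

cfn = boto3.client('cloudformation', region_name='us-east-1')

# Read the template added by this patch.
with open('pre-processing/pre-processing-cfn.yaml') as f:
    template_body = f.read()

# CAPABILITY_IAM is required for the LambdaRole resource;
# CAPABILITY_AUTO_EXPAND is required because the template declares a Serverless transform.
cfn.create_stack(
    StackName='source-example-dataset',  # hypothetical stack name
    TemplateBody=template_body,
    Capabilities=['CAPABILITY_IAM', 'CAPABILITY_AUTO_EXPAND'],
    Parameters=[
        {'ParameterKey': 'S3Bucket', 'ParameterValue': 'rearc-data-provider'},
        {'ParameterKey': 'DataSetName', 'ParameterValue': 'example-dataset'},  # hypothetical
        {'ParameterKey': 'DataSetArn',
         'ParameterValue': 'arn:aws:dataexchange:us-east-1:123456789012:data-sets/EXAMPLE'},  # placeholder
        {'ParameterKey': 'ProductId', 'ParameterValue': 'prod-EXAMPLE'},  # placeholder
        {'ParameterKey': 'Region', 'ParameterValue': 'us-east-1'},
    ],
)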