diff --git a/.gitignore b/.gitignore
index 776fa6d..23d444d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -181,3 +181,7 @@
 $RECYCLE.BIN/
 **/pre-processing-code.zip
 response.json
+
+### PyCharm
+.idea/
+venv/
diff --git a/init.sh b/init.sh
old mode 100644
new mode 100755
index bb5a18f..ba73e79
--- a/init.sh
+++ b/init.sh
@@ -19,16 +19,47 @@ while [[ $# -gt 0 ]]; do
   shift;
   current_arg="$1"
   case ${opt} in
-    "-s"|"--s3-bucket") export S3_BUCKET="$1"; shift;;
-    "-d"|"--dataset-name") export DATASET_NAME="$1"; shift;;
-    "-p"|"--product-name") export PRODUCT_NAME="$1"; shift;;
-    "-i"|"--product-id") export PRODUCT_ID="$1"; shift;;
-    "-r"|"--region") export REGION="$1"; shift;;
-    "-f"|"--profile") PROFILE=" --profile $1"; shift;;
+    "--rdp-role-arn") export REARC_DATA_PLATFORM_ROLE_ARN="$1"; shift;;
+    "--rdp-external-id") export REARC_DATA_PLATFORM_EXTERNAL_ID="$1"; shift;;
+    "--customer-id") export CUSTOMER_ID="$1"; shift;;
+    "--schedule-cron") export SCHEDULE_CRON="$1"; shift;;
+    "--asset-bucket") export ASSET_BUCKET="$1"; shift;;
+    "--manifest-bucket") export MANIFEST_BUCKET="$1"; shift;;
+    "--dataset-name") export DATASET_NAME="$1"; shift;;
+    "--product-name") export PRODUCT_NAME="$1"; shift;;
+    "--product-id") export PRODUCT_ID="$1"; shift;;
+    "--dataset-arn") export DATASET_ARN="$1"; shift;;
+    "--region") export REGION="$1"; shift;;
+    "--first-revision") export FIRST_REVISION="$1"; shift;;
+    "--products-info-file") export PRODUCTS_INFO_FILE="$1"; shift;;
+    "--source-url") export SOURCE_URL="$1"; shift;;
+    "--product-code") export PRODUCT_CODE="$1"; shift;;
+    "--product-url") export PRODUCT_URL="$1"; shift;;
+    "--profile") PROFILE=" --profile $1"; shift;;
     *) echo "ERROR: Invalid option: \""$opt"\"" >&2; exit 1;;
   esac
 done
+echo "------------------------------------------------------------------------------"
+echo "REARC_DATA_PLATFORM_ROLE_ARN: $REARC_DATA_PLATFORM_ROLE_ARN"
+echo "REARC_DATA_PLATFORM_EXTERNAL_ID: $REARC_DATA_PLATFORM_EXTERNAL_ID"
+echo "CUSTOMER_ID: $CUSTOMER_ID"
+echo "ASSET_BUCKET: $ASSET_BUCKET"
+echo "MANIFEST_BUCKET: $MANIFEST_BUCKET"
+echo "DATASET_NAME: $DATASET_NAME"
+echo "DATASET_ARN: $DATASET_ARN"
+echo "PRODUCT_NAME: $PRODUCT_NAME"
+echo "PRODUCT_ID: $PRODUCT_ID"
+echo "SCHEDULE_CRON: $SCHEDULE_CRON"
+echo "REGION: $REGION"
+echo "PROFILE: $PROFILE"
+echo "PRODUCTS_INFO_FILE: $PRODUCTS_INFO_FILE"
+echo "SOURCE_URL: $SOURCE_URL"
+echo "PRODUCT_CODE: $PRODUCT_CODE"
+echo "PRODUCT_URL: $PRODUCT_URL"
+echo "FIRST_REVISION: $FIRST_REVISION"
+echo "------------------------------------------------------------------------------"
+
 while [[ ${#DATASET_NAME} -gt 53 ]]; do
   echo "dataset-name must be under 53 characters in length, enter a shorter name:"
   read -p "New dataset-name: " DATASET_NAME
@@ -47,109 +78,68 @@ while [[ ${#PRODUCT_NAME} -gt 72 ]]; do
   esac
 done

-#creating a pre-processing zip package, these commands may need to be adjusted depending on folder structure and dependencies
+echo "creating a pre-processing zip package, these commands may need to be adjusted depending on folder structure and dependencies"
 (cd pre-processing/pre-processing-code && zip -r pre-processing-code.zip . -x "*.dist-info/*" -x "bin/*" -x "**/__pycache__/*")

-#upload pre-preprocessing.zip to s3
 echo "uploading pre-preprocessing.zip to s3"
-aws s3 cp pre-processing/pre-processing-code/pre-processing-code.zip s3://$S3_BUCKET/$DATASET_NAME/automation/pre-processing-code.zip --region $REGION$PROFILE
-
-#creating dataset on ADX
-echo "creating dataset on ADX"
-DATASET_COMMAND="aws dataexchange create-data-set --asset-type "S3_SNAPSHOT" --description file://dataset-description.md --name \"${PRODUCT_NAME}\" --region $REGION --output json$PROFILE"
-DATASET_OUTPUT=$(eval $DATASET_COMMAND)
-DATASET_ARN=$(echo $DATASET_OUTPUT | tr '\r\n' ' ' | jq -r '.Arn')
-DATASET_ID=$(echo $DATASET_OUTPUT | tr '\r\n' ' ' | jq -r '.Id')
-
-#creating pre-processing cloudformation stack
-echo "creating pre-processing cloudformation stack"
-CFN_STACK_NAME="producer-${DATASET_NAME}-preprocessing"
-aws cloudformation create-stack --stack-name $CFN_STACK_NAME --template-body file://pre-processing/pre-processing-cfn.yaml --parameters ParameterKey=S3Bucket,ParameterValue=$S3_BUCKET ParameterKey=DataSetName,ParameterValue=$DATASET_NAME ParameterKey=DataSetArn,ParameterValue=$DATASET_ARN ParameterKey=ProductId,ParameterValue=$PRODUCT_ID ParameterKey=Region,ParameterValue=$REGION --region $REGION --capabilities "CAPABILITY_AUTO_EXPAND" "CAPABILITY_NAMED_IAM" "CAPABILITY_IAM"$PROFILE
-
-echo "waiting for cloudformation stack to complete"
-aws cloudformation wait stack-create-complete --stack-name $CFN_STACK_NAME --region $REGION$PROFILE
-
-if [[ $? -ne 0 ]]
-then
-  # Cloudformation stack created
-  echo "Cloudformation stack creation failed"
-  exit 1
-fi
-
-#invoking the pre-processing lambda function to create first dataset revision
-echo "invoking the pre-processing lambda function to create first dataset revision"
-LAMBDA_FUNCTION_NAME="source-for-${DATASET_NAME}"
-# AWS CLI version 2 changes require explicitly declairing `--cli-binary-format raw-in-base64-out` for the format of the `--payload`
-LAMBDA_FUNCTION_STATUS_CODE=$(aws lambda invoke --function-name $LAMBDA_FUNCTION_NAME --invocation-type "RequestResponse" --payload '{ "test": "event" }' response.json --cli-binary-format raw-in-base64-out --region $REGION --query 'StatusCode' --output text$PROFILE)
+aws s3 cp pre-processing/pre-processing-code/pre-processing-code.zip s3://$ASSET_BUCKET/$DATASET_NAME/automation/pre-processing-code.zip --region "$REGION" $PROFILE

-#grabbing dataset revision status
-echo "grabbing dataset revision status"
-DATASET_REVISION_STATUS=$(aws dataexchange list-data-set-revisions --data-set-id $DATASET_ID --region $REGION --query "sort_by(Revisions, &CreatedAt)[-1].Finalized"$PROFILE)
+if [[ "$FIRST_REVISION" == "true" ]]; then
+  echo "creating dataset on ADX"
+  DATASET_COMMAND="aws dataexchange create-data-set --asset-type "S3_SNAPSHOT" --description file://dataset-description.md --name \"${PRODUCT_NAME}\" --region $REGION --output json $PROFILE"
+  DATASET_OUTPUT=$(eval $DATASET_COMMAND)
+  DATASET_ARN=$(echo $DATASET_OUTPUT | tr '\r\n' ' ' | jq -r '.Arn')
+  DATASET_ID=$(echo $DATASET_OUTPUT | tr '\r\n' ' ' | jq -r '.Id')

-update () {
-  echo ""
-  echo "Manually create the ADX product and enter in the Product ID below:"
-  read -p "Product ID: " NEW_PRODUCT_ID
-
-  # Cloudformation stack update
-  echo "updating pre-processing cloudformation stack"
-  aws cloudformation update-stack --stack-name $CFN_STACK_NAME --use-previous-template --parameters ParameterKey=S3Bucket,ParameterValue=$S3_BUCKET ParameterKey=DataSetName,ParameterValue=$DATASET_NAME ParameterKey=DataSetArn,ParameterValue=$DATASET_ARN ParameterKey=ProductId,ParameterValue=$NEW_PRODUCT_ID ParameterKey=Region,ParameterValue=$REGION --region $REGION --capabilities "CAPABILITY_AUTO_EXPAND" "CAPABILITY_NAMED_IAM" "CAPABILITY_IAM"$PROFILE
-
-  echo "waiting for cloudformation stack update to complete"
-  aws cloudformation wait stack-update-complete --stack-name $CFN_STACK_NAME --region $REGION$PROFILE
-
-  if [[ $? -ne 0 ]]
-  then
-    echo "Cloudformation stack update failed"
-    break
+  if [[ -n "$PRODUCTS_INFO_FILE" ]]; then
+    echo "{\"PRODUCT_CODE\":\"${PRODUCT_CODE}\",\"PRODUCT_URL\":\"${PRODUCT_URL}\",\"SOURCE_URL\": \"${SOURCE_URL}\",\"DATASET_NAME\":\"${DATASET_NAME}\",\"DATASET_ARN\":\"${DATASET_ARN}\",\"DATASET_ID\":\"${DATASET_ID}\",\"PRODUCT_NAME\":\"${PRODUCT_NAME}\",\"PRODUCT_ID\":\"${PRODUCT_ID}\",\"SCHEDULE_CRON\":\"${SCHEDULE_CRON}\"}" >> "$PRODUCTS_INFO_FILE"
   fi
-  echo "cloudformation stack update completed"
-}
-
-delete () {
-  echo "Destroying the CloudFormation stack"
-  aws cloudformation delete-stack --stack-name $CFN_STACK_NAME --region $REGION$PROFILE
-
-  #check status of cloudformation stack delete action
-  aws cloudformation wait stack-delete-complete --stack-name $CFN_STACK_NAME --region $REGION$PROFILE
-  if [[ $? -eq 0 ]]
-  then
-    # Cloudformation stack deleted
-    echo "CloudFormation stack successfully deleted"
-    break
-  else
-    # Cloudformation stack deletion failed
-    echo "Cloudformation stack deletion failed"
-    exit 1
-  fi
-}
-
-if [[ $DATASET_REVISION_STATUS == "true" ]]
-then
-  echo "Dataset revision completed successfully"
-  echo ""
+  echo "Uploading initial assets to asset_bucket for the first revision"
+  aws s3 cp product-description.md "s3://$ASSET_BUCKET/$DATASET_NAME/dataset/product-description.md"
+  aws s3 cp dataset-description.md "s3://$ASSET_BUCKET/$DATASET_NAME/dataset/dataset-description.md"

-  while true; do
-    echo "Do you want use this script to update the CloudFormation stack? If you enter 'n' your CloudFormation stack will be destroyed:"
-    read -p "('y' to update / 'n' to destroy): " Y_N
-    case $Y_N in
-      [Yy]* ) update; exit;;
-      [Nn]* ) delete; break;;
-      * ) echo "Enter 'y' or 'n'.";;
-    esac
-  done
-
-  echo "Manually create the ADX product and manually re-run the pre-processing CloudFormation template using the following params:"
+  REVISION_COMMAND="aws dataexchange create-data-set --asset-type "S3_SNAPSHOT" --description file://dataset-description.md --name \"${PRODUCT_NAME}\" --region $REGION --output json $PROFILE"
+  REVISION_OUTPUT=$(eval $REVISION_COMMAND)
+
+  echo "Manually, from the ADX console, create the first revision of the dataset using
+  product-description.md and dataset-description.md files and
+  then create the ADX product.
+  Then manually re-run the pre-processing CloudFormation template using the following params:"
   echo ""
-  echo "S3Bucket: $S3_BUCKET"
+  echo "AssetBucket: $ASSET_BUCKET"
+  echo "ManifestBucket: $MANIFEST_BUCKET"
+  echo "CustomerId: $CUSTOMER_ID"
   echo "DataSetName: $DATASET_NAME"
   echo "DataSetArn: $DATASET_ARN"
   echo "Region: $REGION"
-  echo "S3Bucket: $S3_BUCKET"
+  echo "FIRST_REVISION: false"
   echo ""
   echo "For the ProductId param use the Product ID of the ADX product"
 else
-  echo "Dataset revision failed"
-  cat response.json
+  DATASET_ID=$(echo $DATASET_ARN | awk -F/ '{print $NF}')
+
+  echo "creating pre-processing cloudformation stack"
+  CFN_STACK_NAME="producer-${DATASET_NAME}-preprocessing"
+  aws cloudformation create-stack --stack-name "$CFN_STACK_NAME" --template-body file://pre-processing/pre-processing-cfn.yaml --parameters ParameterKey=RearcDataPlatformRoleArn,ParameterValue="$REARC_DATA_PLATFORM_ROLE_ARN" ParameterKey=RearcDataPlatformExternalId,ParameterValue="$REARC_DATA_PLATFORM_EXTERNAL_ID" ParameterKey=AssetBucket,ParameterValue="$ASSET_BUCKET" ParameterKey=ManifestBucket,ParameterValue="$MANIFEST_BUCKET" ParameterKey=CustomerId,ParameterValue="$CUSTOMER_ID" ParameterKey=DataSetName,ParameterValue="$DATASET_NAME" ParameterKey=DataSetArn,ParameterValue="$DATASET_ARN" ParameterKey=ProductId,ParameterValue="$PRODUCT_ID" ParameterKey=Region,ParameterValue="$REGION" ParameterKey=ScheduleCron,ParameterValue="'$SCHEDULE_CRON'" --region "$REGION" --capabilities "CAPABILITY_AUTO_EXPAND" "CAPABILITY_NAMED_IAM" "CAPABILITY_IAM" $PROFILE
+
+  echo "waiting for cloudformation stack creation to complete"
+  aws cloudformation wait stack-create-complete --stack-name "$CFN_STACK_NAME" --region "$REGION" $PROFILE
+
+  if [[ $? -ne 0 ]]; then
+    echo "Cloudformation stack creation failed"
+    exit 1
+  fi
+
+  echo "invoking the pre-processing lambda function to upload manifest file to manifest bucket"
+  LAMBDA_FUNCTION_NAME="source-for-${DATASET_NAME}"
+  # AWS CLI version 2 changes require explicitly declaring `--cli-binary-format raw-in-base64-out` for the format of the `--payload`
+  aws lambda invoke --function-name "$LAMBDA_FUNCTION_NAME" --invocation-type "RequestResponse" --payload '{ "test": "event" }' response.json --cli-binary-format raw-in-base64-out --region "$REGION" --query 'StatusCode' --output text $PROFILE
+
+  if [[ $? -ne 0 ]]; then
+    echo "Lambda invocation failed"
+    exit 1
+  fi
+
 fi
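A first-revision run of the reworked init.sh is normally wrapped by run.sh (shown further down with --first-revision "false"), but it can also be called directly when onboarding a brand-new dataset. The following is a minimal sketch of such an invocation; every value is a placeholder, and only the flag names and the --first-revision "true" path come from the script itself (--product-id and --dataset-arn are omitted because in that path the dataset is created, and the product ID entered, later):

./init.sh \
  --rdp-role-arn "arn:aws:iam::111111111111:role/ExampleCrossAccountRole" \
  --rdp-external-id "Example-External-Id" \
  --customer-id "111111111111" \
  --schedule-cron "cron(0 10 * * ? *)" \
  --asset-bucket "example-asset-bucket" \
  --manifest-bucket "example-manifest-bucket" \
  --dataset-name "example-dataset" \
  --product-name "Example Product | Example Provider" \
  --region "us-east-1" \
  --first-revision "true" \
  --products-info-file "products-info.jsonl" \
  --profile "default"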
diff --git a/migrate.sh b/migrate.sh
new file mode 100644
index 0000000..31f0c8f
--- /dev/null
+++ b/migrate.sh
@@ -0,0 +1,38 @@
+
+#### You only need to run this part ONCE
+
+# clone the template
+git clone https://github.com/rearc-data/adx-product-rearc-data-platform-template.git
+
+# remove extra files / folders
+cd adx-product-rearc-data-platform-template
+rm *.md
+rm -rf .git
+rm pre-processing/pre-processing-code/source_data.py
+
+cd ..
+
+
+##### Now run the following commands FOR EACH PRODUCT
+# Step 1:
+# Go to the cloudformation console, find the stack for the product you want to migrate, copy the parameters section, delete the stack
+
+# Step 2: once the stack is deleted:
+git clone https://github.com/rearc-data/fred-privately-owned-housing.git
+cd fred-privately-owned-housing
+
+git checkout -b rdp
+cp -a ../adx-product-rearc-data-platform-template/. ./
+
+# in run.sh
+# Step 3: Using the parameters you have copied from the cloudformation stack,
+# and your AWS profile name, update the variable names in run.sh
+
+# Step 4: in source_data.py
+# replace: os.getenv('S3_BUCKET') => os.getenv('ASSET_BUCKET')
+# replace: os.environ['DATA_SET_NAME'] => os.environ['DATASET_NAME']
+
+
+# Step 5: make sure variable names are correct in run.sh, then run it
+chmod a+x run.sh
+./run.sh
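Step 4 above can also be applied mechanically. A possible sketch using GNU sed (on BSD/macOS sed use -i '' instead of -i), assuming the product repo keeps its fetch logic in pre-processing/pre-processing-code/source_data.py:

# rewrite the two environment-variable lookups named in Step 4
sed -i "s/os.getenv('S3_BUCKET')/os.getenv('ASSET_BUCKET')/g" pre-processing/pre-processing-code/source_data.py
sed -i "s/os.environ\['DATA_SET_NAME'\]/os.environ['DATASET_NAME']/g" pre-processing/pre-processing-code/source_data.py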
diff --git a/pre-processing/pre-processing-cfn.yaml b/pre-processing/pre-processing-cfn.yaml
index b43a4ad..a9d50cf 100644
--- a/pre-processing/pre-processing-cfn.yaml
+++ b/pre-processing/pre-processing-cfn.yaml
@@ -3,10 +3,23 @@ AWSTemplateFormatVersion: '2010-09-09'
 Transform: "AWS::Serverless-2016-10-31"
 # CloudFormation template in YAML to setup an automated process to update revisions to datasets in AWS Data Exchange
 Parameters:
-  S3Bucket:
+  RearcDataPlatformRoleArn:
+    Type: String
+    Description: The Cross Account IAM Role ARN to Access the Manifest Bucket in Rearc Data Platform's control plane account.
+  RearcDataPlatformExternalId:
+    Type: String
+    Description: The Cross Account External ID to Access the Manifest Bucket in Rearc Data Platform's control plane account.
+  AssetBucket:
     Type: String
     Default: rearc-data-provider
-    Description: Provide the S3 Bucket name where this dataset resides. For e.g. rearc-data-provider
+    Description: The S3 Bucket name where this dataset resides.
+  ManifestBucket:
+    Type: String
+    Description: The Manifest S3 Bucket name where the manifest file will be uploaded.
+  CustomerId:
+    Type: String
+    Default: rearc
+    Description: This is the customer Id in Rearc Data Platform and will be used as prefix in manifest bucket.
   DataSetName:
     Type: String
     Description: Name of the dataset
@@ -20,9 +33,14 @@ Parameters:
     Type: String
     Default: us-east-1
     Description: AWS Region for AWS Data Exchange
-
+  ScheduleCron:
+    Type: String
+    Description: Cron Expression for the Lambda Run Schedule
+Mappings:
+  Resources:
+    FailureSNSTopic:
+      Arn: arn:aws:sns:us-east-1:796406704065:adx-lambda-error
 Resources:
-# Create an IAM role for Lambda to use
   LambdaRole:
     Type: AWS::IAM::Role
     Properties:
@@ -36,13 +54,20 @@ Resources:
             - lambda.amazonaws.com
         Version: 2012-10-17
       ManagedPolicyArns:
-        - arn:aws:iam::aws:policy/AWSLambdaExecute
+        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
         - arn:aws:iam::aws:policy/AmazonS3FullAccess
-        - arn:aws:iam::796406704065:policy/AWSDataExchangeService
-        - arn:aws:iam::aws:policy/AWSMarketplaceSellerFullAccess
-      Path: /
+      Policies:
+        - PolicyName: AssumeRearcDataPlatformRole
+          PolicyDocument:
+            Version: '2012-10-17'
+            Statement:
+              - Effect: Allow
+                Action: sts:AssumeRole
+                Resource: !Sub '${RearcDataPlatformRoleArn}'
+              - Effect: Allow
+                Action: sns:Publish
+                Resource: !FindInMap [Resources, FailureSNSTopic, Arn]

-# Create a Lambda function that will update daily revisions to AWS Data Exchange dataset
   LambdaFunction:
     Type: AWS::Lambda::Function
     Properties:
@@ -54,20 +79,16 @@
       Description: "Source revision updates to AWS Data Exchange data set"
       Runtime: "python3.7"
       Code:
-        S3Bucket:
-          Fn::Sub: ${S3Bucket}
+        S3Bucket: !Sub ${AssetBucket}
         S3Key: !Join
           - ''
-          - - Fn::Sub: ${DataSetName}
+          - - !Sub ${DataSetName}
             - '/automation/pre-processing-code.zip'
       Handler: "lambda_function.lambda_handler"
-      MemorySize: 256
-      Timeout: 120
-      Role:
-        Fn::GetAtt:
-          - LambdaRole
-          - Arn
+      MemorySize: 2048
+      Timeout: 900
+      Role: !GetAtt [ LambdaRole, Arn ]
       Environment:
         Variables:
+          RDP_ROLE_ARN: !Sub ${RearcDataPlatformRoleArn}
@@ -83,18 +104,29 @@ Resources:
 #        - Ref: PyRearcADXLayer
       Layers: ["arn:aws:lambda:us-east-1:796406704065:layer:rearc-data-utils:42"]

+#  PyRearcADXLayer:
+#    Type: AWS::Lambda::LayerVersion
+#    Properties:
+#      CompatibleRuntimes:
+#        - python3.8
+#      Content:
+#        S3Bucket: !Sub "${AssetBucket}"
+#        S3Key:
+#          !Join
+#            - ''
+#            - - !Sub ${DataSetName}
+#            - '/automation/python-layer.zip'
+#      Description: PyRearcADXLayer Lambda Layer for Python3.8
+
   ScheduledRule:
     Type: AWS::Events::Rule
     Properties:
       Description: "ScheduledRule"
-      ScheduleExpression: "cron(0 10 1/4 * ? *)"
+      ScheduleExpression: !Sub ${ScheduleCron}
       State: "ENABLED"
       Targets:
         -
-          Arn:
-            Fn::GetAtt:
-              - "LambdaFunction"
-              - "Arn"
+          Arn: !GetAtt [ LambdaFunction, Arn ]
           Id: "TargetFunctionV1"

   PermissionForEventsToInvokeLambda:
@@ -104,11 +136,8 @@ Resources:
         Ref: "LambdaFunction"
       Action: "lambda:InvokeFunction"
       Principal: "events.amazonaws.com"
-      SourceArn:
-        Fn::GetAtt:
-          - "ScheduledRule"
-          - "Arn"
-
+      SourceArn: !GetAtt [ ScheduledRule, Arn ]
+
   version:
     Type: AWS::Lambda::Version
     Properties:
@@ -128,10 +157,6 @@
 Outputs:
   LambdaRoleARN:
     Description: Role for Lambda execution.
-    Value:
-      Fn::GetAtt:
-        - LambdaRole
-        - Arn
+    Value: !GetAtt [ LambdaRole, Arn ]
   LambdaFunctionName:
-    Value:
-      Ref: LambdaFunction
+    Value: !Ref "LambdaFunction"
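As an optional sanity check before init.sh or update.sh touches the stack, the reworked template can be validated locally; nothing in the scripts above requires this step, and the region is only an example:

aws cloudformation validate-template \
  --template-body file://pre-processing/pre-processing-cfn.yaml \
  --region us-east-1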
diff --git a/pre-processing/pre-processing-code/lambda_function.py b/pre-processing/pre-processing-code/lambda_function.py
index 10cc4b1..8bcdecd 100644
--- a/pre-processing/pre-processing-code/lambda_function.py
+++ b/pre-processing/pre-processing-code/lambda_function.py
@@ -1,179 +1,65 @@
 import time
 import json
 from source_data import source_dataset
-import boto3
 import os
+import logging
 from datetime import date, datetime
 from multiprocessing.dummy import Pool

-os.environ['AWS_DATA_PATH'] = '/opt/'
+from rearc_data_utils.platform.sts_helper import get_data_plane_client

-dataexchange = boto3.client(
-    service_name='dataexchange',
-    region_name=os.environ['REGION']
-)
+logger = logging.getLogger(__name__)

-marketplace = boto3.client(
-    service_name='marketplace-catalog',
-    region_name=os.environ['REGION']
-)
+os.environ['AWS_DATA_PATH'] = '/opt/'

-s3_bucket = os.environ['S3_BUCKET']
-data_set_arn = os.environ['DATA_SET_ARN']
-data_set_id = data_set_arn.split('/', 1)[1]
-product_id = os.environ['PRODUCT_ID']
-data_set_name = os.environ['DATA_SET_NAME']
-new_s3_key = data_set_name + '/dataset/'
-cfn_template = data_set_name + '/automation/cloudformation.yaml'
-post_processing_code = data_set_name + '/automation/post-processing-code.zip'
+rdp_role_arn = os.getenv("RDP_ROLE_ARN")
+rdp_external_id = os.getenv("RDP_EXTERNAL_ID")

-today = date.today().strftime('%Y-%m-%d')
-revision_comment = 'Revision Updates v' + today
+product_id = os.getenv("PRODUCT_ID", "blank")
+dataset_arn = os.getenv("DATASET_ARN")
+dataset_id = dataset_arn.split('/', 1)[1]
+dataset_name = os.getenv("DATASET_NAME")

-if not s3_bucket:
-    raise Exception("'S3_BUCKET' environment variable must be defined!")
+asset_bucket = os.getenv("ASSET_BUCKET")
+manifest_bucket = os.getenv("MANIFEST_BUCKET")
+customer_id = os.getenv("CUSTOMER_ID")

-if not new_s3_key:
-    raise Exception("'DATA_SET_NAME' environment variable must be defined!")
+timestamp = datetime.now().strftime("%Y-%m-%d-%H%M%S")
+revision_comment = 'Revision Updates v' + timestamp

-if not data_set_arn:
-    raise Exception("'DATA_SET_ARN' environment variable must be defined!")
+required_env_vars = [("RDP_ROLE_ARN", rdp_role_arn), ("RDP_EXTERNAL_ID", rdp_external_id), ("PRODUCT_ID", product_id), ("DATASET_ARN", dataset_arn), ("DATASET_NAME", dataset_name), ("ASSET_BUCKET", asset_bucket), ("MANIFEST_BUCKET", manifest_bucket), ("CUSTOMER_ID", customer_id)]
+for name, value in required_env_vars:
+    if not value:
+        raise Exception(f"'{name}' environment variable must be defined!")

-if not product_id:
-    raise Exception("'PRODUCT_ID' environment variable must be defined!")
+logger.info(f"rdp_role_arn: {rdp_role_arn} ; rdp_external_id: {rdp_external_id}")
+
+s3 = get_data_plane_client(rdp_role_arn, rdp_external_id, service_name='s3')


-def start_change_set(describe_entity_response, revision_arn):
-    # Call AWSMarketplace Catalog's start-change-set API to add revisions to the Product
-    change_details = {
-        'DataSetArn': data_set_arn,
-        'RevisionArns': [
-            revision_arn
-        ]
+def upload_manifest_file(asset_list):
+    """
+    Generates and uploads a manifest file to the MANIFEST_BUCKET of the
+    Rearc Data Platform or AWS Data Exchange Publisher Coordinator
+    https://github.com/rearc-data/aws-data-exchange-publisher-coordinator
+    """
+    manifest_file_data = {
+        "product_id": product_id,
+        "dataset_id": dataset_id,
+        "asset_list": asset_list
     }
-    change_set = [
-        {
-            'ChangeType': 'AddRevisions',
-            'Entity': {
-                'Identifier': describe_entity_response['EntityIdentifier'],
-                'Type': describe_entity_response['EntityType']
-            },
-            'Details': json.dumps(change_details)
-        }
-    ]
-
-    response = marketplace.start_change_set(
-        Catalog='AWSMarketplace', ChangeSet=change_set)
-    return response
-
-
-def jobs_handler(data):
-
-    # Used to store the Ids of the Jobs importing the assets to S3.
-    job_ids = set()
-
-    print('asset_list {} of {}:'.format(
-        data['job_num'], data['total_jobs']), data['asset_list'])
-
-    import_job = dataexchange.create_job(
-        Type='IMPORT_ASSETS_FROM_S3',
-        Details={
-            'ImportAssetsFromS3': {
-                'DataSetId': data_set_id,
-                'RevisionId': data['revision_id'],
-                'AssetSources': data['asset_list']
-            }
-        }
-    )
-
-    # Start the Job and save the JobId.
-    dataexchange.start_job(JobId=import_job['Id'])
-    job_ids.add(import_job['Id'])
-
-    # Iterate until all remaining jobs have reached a terminal state, or an error is found.
-    completed_jobs = set()
-
-    while job_ids != completed_jobs:
-        for job_id in job_ids:
-            if job_id in completed_jobs:
-                continue
-            get_job_response = dataexchange.get_job(JobId=job_id)
-            if get_job_response['State'] == 'COMPLETED':
-                print('JobId: {}, Job {} of {} completed'.format(
-                    job_id, data['job_num'], data['total_jobs']))
-                completed_jobs.add(job_id)
-            if get_job_response['State'] == 'ERROR':
-                job_errors = get_job_response['Errors']
-                raise Exception(
-                    'JobId: {} failed with errors:\n{}'.format(job_id, job_errors))
-            # Sleep to ensure we don't get throttled by the GetJob API.
-            time.sleep(.5)
+    manifest_file_name = f"manifest-{timestamp}.json"
+    manifest_object_key = os.path.join(customer_id, product_id, dataset_id, manifest_file_name)
+
+    s3.put_object(Body=json.dumps(manifest_file_data), Bucket=manifest_bucket, Key=manifest_object_key, ACL="bucket-owner-full-control")


 def lambda_handler(event, context):

     asset_list = source_dataset()
-    asset_lists = [asset_list[i:i+100] for i in range(0, len(asset_list), 100)]
-
-    if type(asset_lists) == list:
-
-        if len(asset_lists) == 0:
-            print(
-                'No need for a revision, all datasets included with this product are up to date')
-            return {
-                'statusCode': 200,
-                'body': json.dumps('No need for a revision, all datasets included with this product are up to date')
-            }
-
-        create_revision_response = dataexchange.create_revision(
-            DataSetId=data_set_id)
-        revision_id = create_revision_response['Id']
-        revision_arn = create_revision_response['Arn']
-
-        for idx in range(len(asset_lists)):
-            asset_lists[idx] = {
-                'asset_list': asset_lists[idx],
-                'revision_id': revision_id,
-                'job_num': str(idx + 1),
-                'total_jobs': str(len(asset_lists))
-            }
-
-        with (Pool(10)) as p:
-            p.map(jobs_handler, asset_lists)
-
-        update_revision_response = dataexchange.update_revision(
-            DataSetId=data_set_id,
-            RevisionId=revision_id,
-            Comment=revision_comment,
-            Finalized=True
-        )
-
-        revision_state = update_revision_response['Finalized']
-
-        if revision_state == True:
-            # Call AWSMarketplace Catalog's APIs to add revisions
-            describe_entity_response = marketplace.describe_entity(
-                Catalog='AWSMarketplace', EntityId=product_id)
-            start_change_set_response = start_change_set(
-                describe_entity_response, revision_arn)
-            if start_change_set_response['ChangeSetId']:
-                print('Revision updated successfully and added to the dataset')
-                return {
-                    'statusCode': 200,
-                    'body': json.dumps('Revision updated successfully and added to the dataset')
-                }
-            else:
-                print('Something went wrong with AWSMarketplace Catalog API')
-                return {
-                    'statusCode': 500,
-                    'body': json.dumps('Something went wrong with AWSMarketplace Catalog API')
-                }
-        else:
-            print('Revision did not complete successfully')
-            return {
-                'statusCode': 500,
-                'body': json.dumps('Revision did not complete successfully')
-            }
-    else:
-        raise Exception('Something went wrong when uploading files to s3')
\ No newline at end of file
+
+    if asset_list:
+        try:
+            upload_manifest_file(asset_list)
+        except Exception as e:
+            raise Exception(f"Something went wrong when uploading manifest file to manifest bucket: {e}")
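For debugging, the manifest upload that lambda_function.py performs can be reproduced by hand with the AWS CLI. This is a rough sketch, not part of the repo: every value is a placeholder, the asset entries stand in for whatever source_dataset() returns, and assuming the cross-account role with an external ID is taken to be roughly what get_data_plane_client does internally:

# assume the Rearc Data Platform cross-account role (hypothetical ARN and external ID)
CREDS=$(aws sts assume-role \
  --role-arn "arn:aws:iam::111111111111:role/ExampleCrossAccountRole" \
  --external-id "Example-External-Id" \
  --role-session-name manual-manifest-upload \
  --query 'Credentials.[AccessKeyId,SecretAccessKey,SessionToken]' --output text)
export AWS_ACCESS_KEY_ID=$(echo "$CREDS" | cut -f1)
export AWS_SECRET_ACCESS_KEY=$(echo "$CREDS" | cut -f2)
export AWS_SESSION_TOKEN=$(echo "$CREDS" | cut -f3)

# body mirrors manifest_file_data in lambda_function.py (placeholder IDs and assets)
cat > manifest.json <<'EOF'
{"product_id": "prod-example", "dataset_id": "0123456789abcdef0123456789abcdef", "asset_list": [{"Bucket": "example-asset-bucket", "Key": "example-dataset/dataset/example.csv"}]}
EOF

# key layout mirrors manifest_object_key: <customer_id>/<product_id>/<dataset_id>/manifest-<timestamp>.json
aws s3api put-object \
  --bucket "example-manifest-bucket" \
  --key "111111111111/prod-example/0123456789abcdef0123456789abcdef/manifest-$(date +%Y-%m-%d-%H%M%S).json" \
  --body manifest.json \
  --acl bucket-owner-full-control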
diff --git a/pre-processing/pre-processing-code/s3_md5_compare.py b/pre-processing/pre-processing-code/s3_md5_compare.py
index 07aebe4..b4cf0e6 100644
--- a/pre-processing/pre-processing-code/s3_md5_compare.py
+++ b/pre-processing/pre-processing-code/s3_md5_compare.py
@@ -6,33 +6,31 @@
 import botocore.exceptions


-def md5_checksum(filename):
+def md5_checksum(response):
     m = hashlib.md5()
-    with open(filename, 'rb') as f:
-        for data in iter(lambda: f.read(1024 * 1024), b''):
-            m.update(data)
+    for data in iter(lambda: response.read(1024 * 1024), b''):
+        m.update(data)
     return m.hexdigest()


-def etag_checksum(filename, chunk_size=8 * 1024 * 1024):
+def etag_checksum(response, chunk_size=8 * 1024 * 1024):
     md5s = []
-    with open(filename, 'rb') as f:
-        for data in iter(lambda: f.read(chunk_size), b''):
-            md5s.append(hashlib.md5(data).digest())
+    for data in iter(lambda: response.read(chunk_size), b''):
+        md5s.append(hashlib.md5(data).digest())
     m = hashlib.md5(b"".join(md5s))
     return '{}-{}'.format(m.hexdigest(), len(md5s))


-def etag_compare(filename, etag):
+def etag_compare(response, etag):
     et = etag[1:-1]  # strip quotes
-    if '-' in et and et == etag_checksum(filename):
+    if '-' in et and et == etag_checksum(response):
         return False
-    if '-' not in et and et == md5_checksum(filename):
+    if '-' not in et and et == md5_checksum(response):
         return False
     return True


-def md5_compare(s3, bucket_name, s3_key, filename):
+def md5_compare(s3, bucket_name, s3_key, response):
     # Get the file metadata from s3
     # If the file does not exist, return True for changes found
     try:
@@ -44,6 +42,6 @@

     etag = (obj_dict['ETag'])

-    md5_matches = etag_compare(filename, etag)
+    md5_matches = etag_compare(response, etag)

-    return md5_matches
\ No newline at end of file
+    return md5_matches
diff --git a/rearc_logo_rgb.png b/rearc_logo_rgb.png
old mode 100644
new mode 100755
diff --git a/run.sh b/run.sh
new file mode 100755
index 0000000..7720502
--- /dev/null
+++ b/run.sh
@@ -0,0 +1,45 @@
+REARC_DATA_PLATFORM_ROLE_ARN='arn:aws:iam::412981388937:role/CrossAccountRole-796406704065-796406704065'
+REARC_DATA_PLATFORM_EXTERNAL_ID='Rearc-Data-Platform-796406704065'
+ASSET_BUCKET='rearc-data-provider'
+MANIFEST_BUCKET='rearc-control-plane-manifest'
+CUSTOMER_ID='796406704065'
+DATASET_NAME='nppes-npi-registry-data-cms'
+DATASET_ARN='arn:aws:dataexchange:us-east-1:796406704065:data-sets/f71a3e9d8842cd9f32e0d889f71381bf'
+PRODUCT_NAME='NPPES National Provider Identifier (NPI) Registry Data | CMS'
+PRODUCT_ID='prod-zvfiqwlg2xsym'
+SCHEDULE_CRON="cron(0 18 ? * 6 *)"
+REGION='us-east-1'
+PROFILE='default'
+
+echo "------------------------------------------------------------------------------"
+echo "RearcDataPlatformRoleArn: $REARC_DATA_PLATFORM_ROLE_ARN"
+echo "RearcDataPlatformExternalId: $REARC_DATA_PLATFORM_EXTERNAL_ID"
+echo "CustomerId: $CUSTOMER_ID"
+echo "AssetBucket: $ASSET_BUCKET"
+echo "ManifestBucket: $MANIFEST_BUCKET"
+echo "DataSetName: $DATASET_NAME"
+echo "DataSetArn: $DATASET_ARN"
+echo "ProductName: $PRODUCT_NAME"
+echo "ProductID: $PRODUCT_ID"
+echo "ScheduleCron: $SCHEDULE_CRON"
+echo "Region: $REGION"
+echo "PROFILE: $PROFILE"
+echo "------------------------------------------------------------------------------"
+
+
+# python pre-processing/pre-processing-code/source_data.py
+
+./init.sh \
+  --rdp-role-arn "${REARC_DATA_PLATFORM_ROLE_ARN}" \
+  --rdp-external-id "${REARC_DATA_PLATFORM_EXTERNAL_ID}" \
+  --customer-id "${CUSTOMER_ID}" \
+  --schedule-cron "${SCHEDULE_CRON}" \
+  --asset-bucket "${ASSET_BUCKET}" \
+  --manifest-bucket "${MANIFEST_BUCKET}" \
+  --dataset-name "${DATASET_NAME}" \
+  --product-name "${PRODUCT_NAME}" \
+  --product-id "${PRODUCT_ID}" \
+  --dataset-arn "${DATASET_ARN}" \
+  --region "${REGION}" \
+  --first-revision "false" \
+  --profile "${PROFILE}"
diff --git a/update.sh b/update.sh
new file mode 100755
index 0000000..8c2bf22
--- /dev/null
+++ b/update.sh
@@ -0,0 +1,81 @@
+#!/usr/bin/env bash
+
+# Exit on error. Append "|| true" if you expect an error.
+set -o errexit
+# Exit on error inside any functions or subshells.
+set -o errtrace
+# Do not allow use of undefined vars. Use ${VAR:-} to use an undefined VAR
+#set -o nounset
+# Catch the error in case mysqldump fails (but gzip succeeds) in `mysqldump |gzip`
+set -o pipefail
+# Turn on traces, useful while debugging but commented out by default
+# set -o xtrace
+
+# Sets profile variable to an empty value by default, reassigns in while loop below if it was included as a parameter
+PROFILE=""
+
+while [[ $# -gt 0 ]]; do
+  opt="${1}"
+  shift;
+  current_arg="$1"
+  case ${opt} in
+    "-d"|"--dataset-name") export DATASET_NAME="$1"; shift;;
+    "-r"|"--region") export REGION="$1"; shift;;
+    "-f"|"--profile") PROFILE=" --profile $1"; shift;;
+    *) echo "ERROR: Invalid option: \""$opt"\"" >&2; exit 1;;
+  esac
+done
+
+while [[ ${#DATASET_NAME} -gt 53 ]]; do
+  echo "dataset-name must be under 53 characters in length, enter a shorter name:"
+  read -p "New dataset-name: " DATASET_NAME
+  case ${#DATASET_NAME} in
+    [1-9]|[1-4][0-9]|5[0-3]) break;;
+    * ) echo "Enter in a shorter dataset-name";;
+  esac
+done
+
+#get existing cloudformation stack
+echo "getting existing CFN parameters"
+CFN_STACK_NAME="producer-${DATASET_NAME}-preprocessing"
+while read parameter_name parameter_value; do
+  echo "$parameter_name: $parameter_value"
+  case ${parameter_name} in
+    "S3Bucket") export S3_BUCKET="$parameter_value";;
+    "DataSetArn") export DATASET_ARN="$parameter_value";;
+    "ProductId") export PRODUCT_ID="$parameter_value";;
+    #Ignore these two because they were set manually already
+    "Region");;
+    "DataSetName");;
+    *) echo "ERROR: Invalid parameter found: \""$parameter_name"\", please update manually" >&2; exit 1;;
+  esac
+done < <(aws cloudformation describe-stacks --stack-name $CFN_STACK_NAME --query 'Stacks[0].Parameters' --output text$PROFILE)
+
+#creating a pre-processing zip package, these commands may need to be adjusted depending on folder structure and dependencies
+(cd pre-processing/pre-processing-code && zip -r pre-processing-code.zip . -x "*.dist-info/*" -x "bin/*" -x "**/__pycache__/*")
+
+#upload pre-preprocessing.zip to s3
+echo "uploading pre-preprocessing.zip to s3"
+aws s3 cp pre-processing/pre-processing-code/pre-processing-code.zip s3://$S3_BUCKET/$DATASET_NAME/automation/pre-processing-code.zip --region $REGION$PROFILE
+
+#updating the pre-processing lambda function code
+echo "updating the pre-processing lambda function code"
+LAMBDA_FUNCTION_NAME="source-for-${DATASET_NAME}"
+# AWS CLI version 2 changes require explicitly declaring `--cli-binary-format raw-in-base64-out` for the format of the `--payload`
+aws lambda update-function-code --function-name $LAMBDA_FUNCTION_NAME --s3-bucket $S3_BUCKET --s3-key $DATASET_NAME/automation/pre-processing-code.zip$PROFILE
+echo "updated lambda function code to use latest pre-processing.zip"
+
+#updating pre-processing cloudformation stack
+echo "updating pre-processing cloudformation stack"
+CFN_STACK_NAME="producer-${DATASET_NAME}-preprocessing"
+aws cloudformation update-stack --stack-name $CFN_STACK_NAME --template-body file://pre-processing/pre-processing-cfn.yaml --parameters ParameterKey=S3Bucket,ParameterValue=$S3_BUCKET ParameterKey=DataSetName,ParameterValue=$DATASET_NAME ParameterKey=DataSetArn,ParameterValue=$DATASET_ARN ParameterKey=ProductId,ParameterValue=$PRODUCT_ID ParameterKey=Region,ParameterValue=$REGION --region $REGION --capabilities "CAPABILITY_AUTO_EXPAND" "CAPABILITY_NAMED_IAM" "CAPABILITY_IAM"$PROFILE
+
+echo "waiting for cloudformation stack update to complete"
+aws cloudformation wait stack-update-complete --stack-name $CFN_STACK_NAME --region $REGION$PROFILE
+
+if [[ $? -ne 0 ]]
+then
+  echo "Cloudformation stack update failed"
+  exit 1
+fi
+echo "cloudformation stack update completed"