Commit

Merge pull request #2 from rearc-data/Ayush-General
add all code
ruchika817 committed Jul 23, 2021
2 parents b7e8dc8 + a40923d commit 53f41ba
Showing 4 changed files with 427 additions and 0 deletions.
119 changes: 119 additions & 0 deletions pre-processing/pre-processing-cfn.yaml
@@ -0,0 +1,119 @@
---
AWSTemplateFormatVersion: '2010-09-09'
Transform: "AWS::Serverless-2016-10-31"
# CloudFormation template to set up an automated process that publishes revision updates to a dataset in AWS Data Exchange
Parameters:
  S3Bucket:
    Type: String
    Default: rearc-data-provider
    Description: S3 bucket name where this dataset resides, e.g. rearc-data-provider
  DataSetName:
    Type: String
    Description: Name of the dataset
  DataSetArn:
    Type: String
    Description: ARN of the AWS Data Exchange dataset that needs to be updated
  ProductId:
    Type: String
    Description: ID of the AWS Data Exchange product that needs to be updated
  Region:
    Type: String
    Default: us-east-1
    Description: AWS Region for AWS Data Exchange

Resources:
  # IAM role for the Lambda function to assume
  LambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Action:
              - sts:AssumeRole
            Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
        Version: '2012-10-17'
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AWSLambdaExecute
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        # Customer-managed policy in the provider account
        - arn:aws:iam::796406704065:policy/AWSDataExchangeService
        - arn:aws:iam::aws:policy/AWSMarketplaceSellerFullAccess
      Path: /

  # Lambda function that publishes scheduled revision updates to the AWS Data Exchange dataset
  LambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName:
        !Join
          - ''
          - - 'source-for-'
            - !Ref DataSetName
      Description: "Source revision updates to AWS Data Exchange data set"
      Runtime: "python3.7"
      Code:
        S3Bucket:
          Fn::Sub: ${S3Bucket}
        S3Key:
          !Join
            - ''
            - - Fn::Sub: ${DataSetName}
              - '/automation/pre-processing-code.zip'
      Handler: "lambda_function.lambda_handler"
      MemorySize: 256
      Timeout: 120
      Role:
        Fn::GetAtt:
          - LambdaRole
          - Arn
      Environment:
        Variables:
          S3_BUCKET:
            Fn::Sub: ${S3Bucket}
          DATA_SET_NAME:
            Fn::Sub: ${DataSetName}
          DATA_SET_ARN:
            Fn::Sub: ${DataSetArn}
          PRODUCT_ID:
            Fn::Sub: ${ProductId}
          REGION:
            Fn::Sub: ${Region}

  # EventBridge rule that triggers the Lambda function every fourth day at 10:00 UTC
  ScheduledRule:
    Type: AWS::Events::Rule
    Properties:
      Description: "ScheduledRule"
      ScheduleExpression: "cron(0 10 1/4 * ? *)"
      State: "ENABLED"
      Targets:
        - Arn:
            Fn::GetAtt:
              - "LambdaFunction"
              - "Arn"
          Id: "TargetFunctionV1"

  # Allow EventBridge to invoke the Lambda function
  PermissionForEventsToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName:
        Ref: "LambdaFunction"
      Action: "lambda:InvokeFunction"
      Principal: "events.amazonaws.com"
      SourceArn:
        Fn::GetAtt:
          - "ScheduledRule"
          - "Arn"

Outputs:
  LambdaRoleARN:
    Description: Role for Lambda execution.
    Value:
      Fn::GetAtt:
        - LambdaRole
        - Arn
  LambdaFunctionName:
    Value:
      Ref: LambdaFunction
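
For reference, a minimal sketch (not part of this commit) of deploying the template above with boto3. Because the template declares the Serverless transform, it has to be created through a change set rather than a plain CreateStack call; the stack name and parameter values below are illustrative placeholders.

import boto3

cfn = boto3.client('cloudformation', region_name='us-east-1')

with open('pre-processing/pre-processing-cfn.yaml') as f:
    template_body = f.read()

# Templates using Transform: AWS::Serverless-2016-10-31 are deployed via a change set
cfn.create_change_set(
    StackName='adx-revision-updater',  # hypothetical stack name
    ChangeSetName='initial',
    ChangeSetType='CREATE',
    TemplateBody=template_body,
    Capabilities=['CAPABILITY_IAM', 'CAPABILITY_AUTO_EXPAND'],
    Parameters=[
        {'ParameterKey': 'S3Bucket', 'ParameterValue': 'rearc-data-provider'},
        {'ParameterKey': 'DataSetName', 'ParameterValue': 'example-dataset'},
        {'ParameterKey': 'DataSetArn', 'ParameterValue': 'arn:aws:dataexchange:us-east-1:123456789012:data-sets/abc123'},
        {'ParameterKey': 'ProductId', 'ParameterValue': 'prod-example'},
        {'ParameterKey': 'Region', 'ParameterValue': 'us-east-1'},
    ],
)
cfn.get_waiter('change_set_create_complete').wait(
    StackName='adx-revision-updater', ChangeSetName='initial')
cfn.execute_change_set(StackName='adx-revision-updater', ChangeSetName='initial')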
179 changes: 179 additions & 0 deletions pre-processing/pre-processing-code/lambda_function.py
@@ -0,0 +1,179 @@
import time
import json
from source_data import source_dataset
import boto3
import os
from datetime import date
from multiprocessing.dummy import Pool

# Load the service models bundled with the Lambda layer
os.environ['AWS_DATA_PATH'] = '/opt/'

dataexchange = boto3.client(
    service_name='dataexchange',
    region_name=os.environ['REGION']
)

marketplace = boto3.client(
    service_name='marketplace-catalog',
    region_name=os.environ['REGION']
)

s3_bucket = os.environ['S3_BUCKET']
data_set_arn = os.environ['DATA_SET_ARN']
data_set_id = data_set_arn.split('/', 1)[1]
product_id = os.environ['PRODUCT_ID']
data_set_name = os.environ['DATA_SET_NAME']
new_s3_key = data_set_name + '/dataset/'
cfn_template = data_set_name + '/automation/cloudformation.yaml'
post_processing_code = data_set_name + '/automation/post-processing-code.zip'

today = date.today().strftime('%Y-%m-%d')
revision_comment = 'Revision Updates v' + today

if not s3_bucket:
    raise Exception("'S3_BUCKET' environment variable must be defined!")

if not data_set_name:
    raise Exception("'DATA_SET_NAME' environment variable must be defined!")

if not data_set_arn:
    raise Exception("'DATA_SET_ARN' environment variable must be defined!")

if not product_id:
    raise Exception("'PRODUCT_ID' environment variable must be defined!")


def start_change_set(describe_entity_response, revision_arn):

    # Call the AWS Marketplace Catalog start_change_set API to add the revision to the product
    change_details = {
        'DataSetArn': data_set_arn,
        'RevisionArns': [
            revision_arn
        ]
    }

    change_set = [
        {
            'ChangeType': 'AddRevisions',
            'Entity': {
                'Identifier': describe_entity_response['EntityIdentifier'],
                'Type': describe_entity_response['EntityType']
            },
            'Details': json.dumps(change_details)
        }
    ]

    response = marketplace.start_change_set(
        Catalog='AWSMarketplace', ChangeSet=change_set)
    return response


def jobs_handler(data):

    # Used to store the Ids of the jobs importing the assets to S3
    job_ids = set()

    print('asset_list {} of {}:'.format(
        data['job_num'], data['total_jobs']), data['asset_list'])

    import_job = dataexchange.create_job(
        Type='IMPORT_ASSETS_FROM_S3',
        Details={
            'ImportAssetsFromS3': {
                'DataSetId': data_set_id,
                'RevisionId': data['revision_id'],
                'AssetSources': data['asset_list']
            }
        }
    )

    # Start the job and save its Id
    dataexchange.start_job(JobId=import_job['Id'])
    job_ids.add(import_job['Id'])

    # Poll until all jobs have reached a terminal state, or an error is found
    completed_jobs = set()

    while job_ids != completed_jobs:
        for job_id in job_ids:
            if job_id in completed_jobs:
                continue
            get_job_response = dataexchange.get_job(JobId=job_id)
            if get_job_response['State'] == 'COMPLETED':
                print('JobId: {}, Job {} of {} completed'.format(
                    job_id, data['job_num'], data['total_jobs']))
                completed_jobs.add(job_id)
            elif get_job_response['State'] == 'ERROR':
                job_errors = get_job_response['Errors']
                raise Exception(
                    'JobId: {} failed with errors:\n{}'.format(job_id, job_errors))
            # Sleep to avoid being throttled by the GetJob API
            time.sleep(0.5)


def lambda_handler(event, context):
    asset_list = source_dataset()

    # source_dataset() returns a list of asset sources on success
    if not isinstance(asset_list, list):
        raise Exception('Something went wrong when uploading files to s3')

    if len(asset_list) == 0:
        print('No need for a revision, all datasets included with this product are up to date')
        return {
            'statusCode': 200,
            'body': json.dumps('No need for a revision, all datasets included with this product are up to date')
        }

    # Import jobs accept at most 100 assets each, so split the asset list into chunks of 100
    asset_lists = [asset_list[i:i + 100] for i in range(0, len(asset_list), 100)]

    create_revision_response = dataexchange.create_revision(
        DataSetId=data_set_id)
    revision_id = create_revision_response['Id']
    revision_arn = create_revision_response['Arn']

    for idx in range(len(asset_lists)):
        asset_lists[idx] = {
            'asset_list': asset_lists[idx],
            'revision_id': revision_id,
            'job_num': str(idx + 1),
            'total_jobs': str(len(asset_lists))
        }

    # Run up to 10 import jobs concurrently
    with Pool(10) as p:
        p.map(jobs_handler, asset_lists)

    update_revision_response = dataexchange.update_revision(
        DataSetId=data_set_id,
        RevisionId=revision_id,
        Comment=revision_comment,
        Finalized=True
    )

    revision_state = update_revision_response['Finalized']

    if revision_state:
        # Call the AWS Marketplace Catalog APIs to add the revision to the product
        describe_entity_response = marketplace.describe_entity(
            Catalog='AWSMarketplace', EntityId=product_id)
        start_change_set_response = start_change_set(
            describe_entity_response, revision_arn)
        if start_change_set_response['ChangeSetId']:
            print('Revision updated successfully and added to the dataset')
            return {
                'statusCode': 200,
                'body': json.dumps('Revision updated successfully and added to the dataset')
            }
        else:
            print('Something went wrong with AWSMarketplace Catalog API')
            return {
                'statusCode': 500,
                'body': json.dumps('Something went wrong with AWSMarketplace Catalog API')
            }
    else:
        print('Revision did not complete successfully')
        return {
            'statusCode': 500,
            'body': json.dumps('Revision did not complete successfully')
        }
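
The fourth changed file (presumably source_data.py, imported above) did not load on this page. Judging only from how lambda_handler consumes its return value, source_dataset() syncs fresh files to S3 and returns the AssetSources list that the ImportAssetsFromS3 job expects; a sketch of that assumed contract, with placeholder bucket and key names:

def source_dataset():
    # ... fetch upstream data, use md5_compare to skip unchanged files,
    # and upload changed files to S3 (not shown here) ...
    # The return value must match the AssetSources shape of ImportAssetsFromS3:
    return [
        {'Bucket': 'rearc-data-provider', 'Key': 'example-dataset/dataset/file-1.csv'},
        {'Bucket': 'rearc-data-provider', 'Key': 'example-dataset/dataset/file-2.csv'},
    ]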
49 changes: 49 additions & 0 deletions pre-processing/pre-processing-code/s3_md5_compare.py
@@ -0,0 +1,49 @@
# Compare the md5 of a local file to the S3 ETag
# Source: li xin on StackOverflow
# https://stackoverflow.com/questions/1775816/how-to-get-the-md5sum-of-a-file-on-amazons-s3

import hashlib
import botocore.exceptions


def md5_checksum(filename):
    m = hashlib.md5()
    with open(filename, 'rb') as f:
        for data in iter(lambda: f.read(1024 * 1024), b''):
            m.update(data)
    return m.hexdigest()


def etag_checksum(filename, chunk_size=8 * 1024 * 1024):
    # Reproduce S3's multipart ETag: the md5 of the concatenated per-part
    # md5 digests, suffixed with the part count
    md5s = []
    with open(filename, 'rb') as f:
        for data in iter(lambda: f.read(chunk_size), b''):
            md5s.append(hashlib.md5(data).digest())
    m = hashlib.md5(b"".join(md5s))
    return '{}-{}'.format(m.hexdigest(), len(md5s))


def etag_compare(filename, etag):
    # Return True if the local file differs from the S3 ETag
    et = etag[1:-1]  # strip quotes
    if '-' in et and et == etag_checksum(filename):
        return False
    if '-' not in et and et == md5_checksum(filename):
        return False
    return True


def md5_compare(s3, bucket_name, s3_key, filename):
    # Get the object metadata from S3
    # If the object does not exist, return True (changes found)
    try:
        obj_dict = s3.head_object(Bucket=bucket_name, Key=s3_key)
    except botocore.exceptions.ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == '404':
            return True
        raise

    etag = obj_dict['ETag']

    # True means the local file and the S3 object differ
    return etag_compare(filename, etag)
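
A hypothetical usage sketch: md5_compare returns True when the local file differs from the S3 object (or the object is missing), so a caller re-uploads only on True. Note the multipart branch of etag_compare only matches when the object was uploaded with the same part size etag_checksum assumes (8 MiB by default); bucket and key names below are placeholders.

import boto3
from s3_md5_compare import md5_compare

s3 = boto3.client('s3')
local_file = '/tmp/data.csv'
s3_key = 'example-dataset/dataset/data.csv'
if md5_compare(s3, 'rearc-data-provider', s3_key, local_file):
    s3.upload_file(local_file, 'rearc-data-provider', s3_key)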
