
Automatically upload Amazon WorkDocs files to a specific Amazon S3 bucket across AWS accounts

gepp 2023. 2. 16. 13:30

 

This is sample Python code for an AWS Lambda function, based on the AWS documentation.
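
The flow assumed here: Amazon WorkDocs publishes notifications to an Amazon SNS topic, which delivers them to an Amazon SQS queue that triggers this Lambda function; the function then copies the document into an Amazon S3 bucket in another account. Note that the requests library is not included in the Lambda Python runtime, so it must be packaged with the function or supplied through a layer.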

import json
import boto3
import requests
import logging

sns_client = boto3.client('sns')
ssm_client = boto3.client('ssm')
workdocs_client = boto3.client('workdocs')
s3_client = boto3.client('s3')
sns_topic_arn = 'arn:aws:sns:ap-northeast-2:[AWS account number]:[your sns]'

logger = logging.getLogger()
logger.setLevel(logging.INFO)

## The function to confirm the subscription from Amazon WorkDocs
def confirmsubscription(topicArn, subToken):
    try:
        response = sns_client.confirm_subscription(
            TopicArn=topicArn,
            Token=subToken
        )
        logger.info ("Amazon Workdocs Subscripton Confirmaiton Message : " + str(response)) 
    except Exception as e:
        logger.error("Error with subscription confirmation : " + " Exception Stacktrace : " + str(e) )
# This would result in failing the AWS Lambda function and the event will be retried.
# One of the mechanism to handle retries would be to configure Dead Letter Queue (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html) as part of the Amazon SQS service.
# Another mechanism could be to skip raising the error and Amazon Cloudwatch can be used to detect logged error messages to collect error metrics and trigger corresponding retry process.
        raise Exception("Error Confirming Subscription from Amazon Workdocs")
    
def copyFileworkdocstos3(documentid):

    # SSM parameter lookups.
    # Read the Amazon S3 prefix to Amazon WorkDocs folder-id mapping, the bucket name, and the configured file extensions from AWS Systems Manager Parameter Store.
    try:
        bucketnm = str(ssm_client.get_parameter(Name='/[your_bucket_param]')['Parameter']['Value'])
        folder_ids = json.loads(ssm_client.get_parameter(Name='/[your workdocs folder id param]')['Parameter']['Value'])
        file_exts = str(json.loads(ssm_client.get_parameter(Name='/[your workdocs extension param]')['Parameter']['Value'])['file_ext']).split(",")
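        ## The parameter values assumed above are illustrative, e.g.:
        ##   /[your_bucket_param]              -> my-sync-bucket
        ##   /[your workdocs folder id param]  -> {"<workdocs folder id>": "<s3 prefix>"}
        ##   /[your workdocs extension param]  -> {"file_ext": ".pdf,.docx,.xlsx"}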
        
        logger.info ("Configured Amazon S3 Bucket Name : " + bucketnm)
        logger.info ("Configured Folder Ids to be synced : : " + str(folder_ids))
        logger.info ("Configured Supported File Extensions : " + str(file_exts))

        resp_doc = workdocs_client.get_document(DocumentId=documentid)
        logger.info("Amazon WorkDocs Metadata Response : " + str(resp_doc))
        
        # Retrieving the Amazon WorkDocs metadata
        parentfolderid = str(resp_doc['Metadata']['ParentFolderId'])
        docversionid = str(resp_doc['Metadata']['LatestVersionMetadata']['Id'])
        docname = str(resp_doc['Metadata']['LatestVersionMetadata']['Name'])

        logger.info("Amazon WorkDocs Parent Folder Id : " + parentfolderid)
        logger.info("Amazon WorkDocs Document Version Id : " + docversionid)
        logger.info("Amazon WorkDocs Document Name : " + docname)
        
        prefix_path = folder_ids.get(parentfolderid, None)
        logger.info("Retrieved Amazon S3 Prefix Path : " + str(prefix_path))
        
        ## Currently the provided sample code syncs documents only for the Amazon WorkDocs folder ids configured in AWS Systems Manager, not for their sub-folders.
        ## It can be extended to support syncing documents from the sub-folders, as sketched below.
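        ## For example, a sub-folder sync could list child folders with
        ## workdocs_client.describe_folder_contents(FolderId=parentfolderid, Type='FOLDER')
        ## and recurse into each result; that extension is left out of this sample.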
        if (prefix_path is not None) and docname.endswith(tuple(file_exts)):
            resp_doc_version = workdocs_client.get_document_version(
                DocumentId=documentid,
                VersionId=docversionid,
                Fields='SOURCE'
            )
            logger.info("Retrieved Amazon WorkDocs Document Latest Version Details : " + str(resp_doc_version))
            
            ## Retrieve the Amazon WorkDocs download URL
            url = resp_doc_version["Metadata"]["Source"]["ORIGINAL"]
            logger.info("Amazon WorkDocs Download URL : " + url)
            ## Retrieve the Amazon WorkDocs document contents.
            ## This sample reads the document into memory; it could be enhanced to stream the document in chunks to Amazon S3 to improve memory utilization, as sketched below.
            workdocs_resp = requests.get(url)
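            ## A possible streaming alternative (a sketch, not used in this sample):
            ##   streamed = requests.get(url, stream=True)
            ##   s3_client.upload_fileobj(streamed.raw, bucketnm, f'{prefix_path}/{docname}')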
            ## Uploading the Amazon WorkDocs document to Amazon S3
            response = s3_client.put_object(
                Body=workdocs_resp.content,
                Bucket=bucketnm,
                Key=f'{prefix_path}/{docname}',
            )
            logger.info("Amazon S3 upload response : " + str(response))
        else:
            logger.info("Skipping document : folder id not configured or file extension not supported")
    except Exception as e:
        logger.error("Error with processing Document : " + str(documentid) + " Exception Stacktrace : " + str(e))
        # This will fail the AWS Lambda function and the event will be retried;
        # see the retry-handling note in confirmsubscription above (dead-letter queue or Amazon CloudWatch based retries).
        raise Exception("Error Processing Amazon WorkDocs Events.")
    
    
def lambda_handler(event, context):
    sts_connection = boto3.client('sts')  # cross-account addition: from here
    acct_b = sts_connection.assume_role(
        RoleArn="arn:aws:iam::[cross account number]:role/[your sts role]",
        RoleSessionName="cross_acct_lambda"
    )

    ACCESS_KEY = acct_b['Credentials']['AccessKeyId']
    SECRET_KEY = acct_b['Credentials']['SecretAccessKey']
    SESSION_TOKEN = acct_b['Credentials']['SessionToken']

    # Create the service client using the assumed-role credentials, e.g. S3.
    # Rebind the module-level s3_client so that copyFileworkdocstos3 uses the
    # cross-account credentials; a plain local assignment would leave the
    # default client in place.
    global s3_client
    s3_client = boto3.client(
        's3',
        aws_access_key_id=ACCESS_KEY,
        aws_secret_access_key=SECRET_KEY,
        aws_session_token=SESSION_TOKEN,
    )
    # cross-account addition: to here
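    # Note: assume_role returns temporary credentials (one hour by default);
    # that is ample for a single invocation, but a long-lived process would
    # need to refresh them.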
    
    
    logger.info ("Event Recieved from Amazon Workdocs : " + str(event))
        
    msg_body = json.loads(str(event['Records'][0]['body']))

    ## Process the Amazon WorkDocs subscription confirmation event
    if msg_body['Type'] == 'SubscriptionConfirmation':
        confirmsubscription(msg_body['TopicArn'], msg_body['Token'])
    ## Process Amazon WorkDocs notifications
    elif msg_body['Type'] == 'Notification':
        event_msg = json.loads(msg_body['Message'])
        ## Process the Amazon WorkDocs move-document event
        if event_msg['action'] == 'move_document':
            copyFileworkdocstos3(event_msg['entityId'])
        ## Process the Amazon WorkDocs upload event when a new version of the document is uploaded
        elif event_msg['action'] == 'upload_document_version':
            copyFileworkdocstos3(event_msg['parentEntityId'])
        else:
            ## Currently the provided sample code handles two Amazon WorkDocs events, but it can be extended to process others.
            ## Refer to this link for details on the other supported Amazon WorkDocs events: https://docs.aws.amazon.com/workdocs/latest/developerguide/subscribe-notifications.html.
            logger.info("Unsupported Action Type")
    else:
        ## Currently the provided sample code handles two Amazon WorkDocs events, but it can be extended to process others.
        ## Refer to this link for details on the other supported Amazon WorkDocs events: https://docs.aws.amazon.com/workdocs/latest/developerguide/subscribe-notifications.html.
        logger.info("Unsupported Event Type")
   
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Amazon WorkDocs sync to Amazon S3 Lambda!')
    }
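
For reference, the handler expects the standard SNS-over-SQS envelope. A minimal test event for the notification path might look like the sketch below; the ids are placeholders and a real Amazon WorkDocs message carries additional fields.

# Illustrative test event for lambda_handler; the ids are placeholders.
test_event = {
    "Records": [{
        "body": json.dumps({
            "Type": "Notification",
            "Message": json.dumps({
                "action": "upload_document_version",
                "parentEntityId": "[workdocs document id]",
                "entityId": "[document version id]"
            })
        })
    }]
}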