Load RDS from S3 parquet files using a Lambda function

Pathan
5 min read · Oct 12, 2023


The following concepts will be discussed:

  1. Prerequisites
  2. Role permissions
  3. Lambda code
  4. Utilizing Secrets Manager (best practice)
  5. Batch processing
  6. Testing the Lambda function
  7. Creating a test event
  8. Triggering the Lambda via an API call

Loading files from S3 into an RDS SQL Server instance can be tough. If you have very large files, Lambda will never be the right solution because of its execution-time and memory limits.

This solution works when the file sizes are small.

Prerequisites:

  1. An AWS account
  2. An S3 bucket
  3. RDS (SQL Server)
  4. Some parquet files to load (the sketch below shows one way to create a sample file)
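If you do not have parquet files handy, a small sketch like this can generate one with pandas; the column names and values are just placeholders:

import pandas as pd

# Build a tiny sample DataFrame and write it out as parquet.
# Writing parquet requires pyarrow (or fastparquet) installed locally.
df = pd.DataFrame({"id": [1, 2, 3], "name": ["alpha", "beta", "gamma"]})
df.to_parquet("sample-1.parquet", index=False)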

First of all, let's set up the AWS permissions.

You need to attach the permissions below to the role you are using.

Secrets Manager, to manage the RDS password:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor",
      "Effect": "Allow",
      "Action": [
        "secretsmanager:DescribeSecret",
        "secretsmanager:PutSecretValue",
        "secretsmanager:CreateSecret",
        "secretsmanager:DeleteSecret",
        "secretsmanager:CancelRotateSecret",
        "secretsmanager:ListSecretVersionIds",
        "secretsmanager:UpdateSecret",
        "secretsmanager:GetRandomPassword",
        "secretsmanager:GetResourcePolicy",
        "secretsmanager:GetSecretValue",
        "secretsmanager:StopReplicationToReplica",
        "secretsmanager:ReplicateSecretToRegions",
        "secretsmanager:RestoreSecret",
        "secretsmanager:RotateSecret",
        "secretsmanager:UpdateSecretVersionStage",
        "secretsmanager:RemoveRegionsFromReplication",
        "secretsmanager:ListSecrets"
      ],
      "Resource": "*"
    }
  ]
}

S3, to access the bucket and its objects:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor",
      "Effect": "Allow",
      "Action": [
        "s3:ListAccessPointsForObjectLambda",
        "s3:DeleteAccessPoint",
        "s3:DeleteAccessPointForObjectLambda",
        "s3:PutLifecycleConfiguration",
        "s3:DeleteObject",
        "s3:CreateMultiRegionAccessPoint",
        "s3:GetBucketWebsite",
        "s3:GetMultiRegionAccessPoint",
        "s3:PutReplicationConfiguration",
        "s3:GetObjectAttributes",
        "s3:InitiateReplication",
        "s3:GetObjectLegalHold",
        "s3:GetBucketNotification",
        "s3:GetReplicationConfiguration",
        "s3:DescribeMultiRegionAccessPointOperation",
        "s3:PutObject",
        "s3:PutBucketNotification",
        "s3:CreateJob",
        "s3:PutBucketObjectLockConfiguration",
        "s3:GetStorageLensDashboard",
        "s3:GetLifecycleConfiguration",
        "s3:GetBucketTagging",
        "s3:GetInventoryConfiguration",
        "s3:GetAccessPointPolicyForObjectLambda",
        "s3:ListBucket",
        "s3:AbortMultipartUpload",
        "s3:UpdateJobPriority",
        "s3:PutBucketVersioning",
        "s3:GetMultiRegionAccessPointPolicyStatus",
        "s3:ListBucketMultipartUploads",
        "s3:PutIntelligentTieringConfiguration",
        "s3:PutMetricsConfiguration",
        "s3:GetBucketVersioning",
        "s3:GetAccessPointConfigurationForObjectLambda",
        "s3:PutInventoryConfiguration",
        "s3:GetMultiRegionAccessPointRoutes",
        "s3:GetStorageLensConfiguration",
        "s3:DeleteStorageLensConfiguration",
        "s3:GetAccountPublicAccessBlock",
        "s3:PutBucketWebsite",
        "s3:ListAllMyBuckets",
        "s3:PutBucketRequestPayment",
        "s3:PutObjectRetention",
        "s3:CreateAccessPointForObjectLambda",
        "s3:GetBucketCORS",
        "s3:GetObjectVersion",
        "s3:PutAnalyticsConfiguration",
        "s3:PutAccessPointConfigurationForObjectLambda",
        "s3:GetObjectVersionTagging",
        "s3:PutStorageLensConfiguration",
        "s3:GetStorageLensConfigurationTagging",
        "s3:ReplicateObject",
        "s3:GetObjectAcl",
        "s3:GetBucketObjectLockConfiguration",
        "s3:DeleteBucketWebsite",
        "s3:GetIntelligentTieringConfiguration",
        "s3:GetObjectVersionAcl",
        "s3:GetBucketPolicyStatus",
        "s3:GetObjectRetention",
        "s3:GetJobTagging",
        "s3:ListJobs",
        "s3:PutObjectLegalHold",
        "s3:PutBucketCORS",
        "s3:ListMultipartUploadParts",
        "s3:GetObject",
        "s3:DescribeJob",
        "s3:PutBucketLogging",
        "s3:GetAnalyticsConfiguration",
        "s3:GetObjectVersionForReplication",
        "s3:GetAccessPointForObjectLambda",
        "s3:CreateAccessPoint",
        "s3:GetAccessPoint",
        "s3:PutAccelerateConfiguration",
        "s3:SubmitMultiRegionAccessPointRoutes",
        "s3:DeleteObjectVersion",
        "s3:GetBucketLogging",
        "s3:ListBucketVersions",
        "s3:RestoreObject",
        "s3:GetAccelerateConfiguration",
        "s3:GetObjectVersionAttributes",
        "s3:GetBucketPolicy",
        "s3:PutEncryptionConfiguration",
        "s3:GetEncryptionConfiguration",
        "s3:GetObjectVersionTorrent",
        "s3:GetBucketRequestPayment",
        "s3:GetAccessPointPolicyStatus",
        "s3:GetObjectTagging",
        "s3:GetBucketOwnershipControls",
        "s3:GetMetricsConfiguration",
        "s3:GetBucketPublicAccessBlock",
        "s3:GetMultiRegionAccessPointPolicy",
        "s3:GetAccessPointPolicyStatusForObjectLambda",
        "s3:ListAccessPoints",
        "s3:PutBucketOwnershipControls",
        "s3:DeleteMultiRegionAccessPoint",
        "s3:ListMultiRegionAccessPoints",
        "s3:UpdateJobStatus",
        "s3:GetBucketAcl",
        "s3:ListStorageLensConfigurations",
        "s3:GetObjectTorrent",
        "s3:GetBucketLocation",
        "s3:GetAccessPointPolicy",
        "s3:ReplicateDelete"
      ],
      "Resource": "*"
    }
  ]
}

CloudWatch Logs, so the function can write its logs:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor",
      "Effect": "Allow",
      "Action": [
        "logs:DescribeQueries",
        "logs:CreateLogStream",
        "logs:GetLogRecord",
        "logs:GetQueryResults",
        "logs:DescribeLogStreams",
        "logs:StartQuery",
        "logs:GetLogEvents",
        "logs:StopQuery",
        "logs:GetLogGroupFields",
        "logs:DescribeQueryDefinitions",
        "logs:CreateLogGroup",
        "logs:PutLogEvents"
      ],
      "Resource": "*"
    }
  ]
}

Lambda (the basic execution policy; replace the ARN placeholders with your function's log group ARN):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "logs:CreateLogGroup",
      "Resource": "ARN here:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": [
        "ARN:log-group:log group here:*"
      ]
    }
  ]
}
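Since the Lambda function will pull the RDS credentials from Secrets Manager, the secret has to exist before the function runs. The code later expects the secret to be a JSON string with username and password keys; a rough sketch for creating it (the secret name, region, and values are placeholders):

import json
import boto3

# Placeholder region and secret name; the JSON keys must match what get_secret() reads later.
secrets = boto3.client("secretsmanager", region_name="us-east-1")
secrets.create_secret(
    Name="rds-sql-server-credentials",
    SecretString=json.dumps({"username": "admin", "password": "change-me"})
)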

Now that the permissions are set up, we need to load the files into S3.
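If you prefer to script the upload, a minimal boto3 sketch like the one below works; the bucket name and key prefix are placeholders.

import boto3

s3 = boto3.client("s3")

# Placeholder bucket and key: push the local parquet file into the bucket
# that the Lambda function will read from.
s3.upload_file("sample-1.parquet", "my-parquet-bucket", "incoming/sample-1.parquet")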

Once the files are uploaded, let's create a Lambda function.

import os
import json
from io import BytesIO

import boto3
import pandas as pd
import pyarrow.parquet as pq
import pyodbc
import awswrangler as wr
from sqlalchemy import create_engine
from botocore.exceptions import ClientError


def get_secret():
    # We will build the connectionString below and use it in the handler
    global connectionString
    server = os.environ.get('Server')            # get the details from environment variables
    db = os.environ.get('db')                    # get the details from environment variables
    secret_name = os.environ.get('secret_name')  # get the details from environment variables
    region_name = os.environ.get('region_name')  # get the details from environment variables

    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',  # don't change this
        region_name=region_name
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        raise e

    # The secret is decrypted with the associated KMS key before it is returned
    response = json.loads(get_secret_value_response['SecretString'])
    username = response.get('username')  # change these keys to match the ones you used when creating the secret
    password = response.get('password')
    connectionString = (
        f'DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={db};'
        f'UID={username};PWD={password};Encrypt=yes;TrustServerCertificate=yes;Connection Timeout=300;'
    )


def get_table_name(key):
    # Derive the target table name from the object key,
    # e.g. "incoming/sample-1.parquet" -> "sample-1" (adjust to your own naming convention)
    return os.path.splitext(os.path.basename(key))[0]


def s3_to_df(bucket, keys):
    # Simple alternative: read the object(s) under the prefix into a pandas DataFrame in one go
    s3Client = boto3.client('s3')
    file_objects = s3Client.list_objects_v2(Bucket=bucket, Prefix=keys)['Contents']
    for file_object in file_objects:
        file_key = file_object['Key']
        print(file_key)
        file_obj = s3Client.get_object(Bucket=bucket, Key=file_key)
        parquet_file = pq.ParquetFile(BytesIO(file_obj['Body'].read()))
        df = parquet_file.read().to_pandas()
    return df


def lambda_handler(event, context):
    # Get our bucket and file name from the S3 event
    db = os.environ.get('db')
    bucket = event['Records'][0]['s3']['bucket']['name']
    keys = event['Records'][0]['s3']['object']['key']
    get_secret()

    print(f"this is the file name {keys}")

    # df = s3_to_df(bucket=bucket, keys=keys)
    conn = pyodbc.connect(connectionString)
    conn.execute(f"USE {db};")
    # You can change the engine connection string to connect to other RDS engines as well.
    engine = create_engine('mssql+pyodbc:///?odbc_connect={}'.format(connectionString))
    s3Client = boto3.client('s3')
    file_objects = s3Client.list_objects_v2(Bucket=bucket, Prefix=keys)['Contents']
    table_name = get_table_name(keys)

    for file_object in file_objects:
        file_key = file_object['Key']
        # There are two ways we can read the file; keep only one of them in the handler,
        # otherwise the same data gets inserted twice.
        # 1. Use awswrangler, which can read the file in a single go or in chunks.
        # 2. Read bigger files in record batches and upload them batch by batch.

        # Method 1
        # dfs = wr.s3.read_parquet(path=[f"s3://{bucket}/{file_key}"])  # reads the whole file in one go
        # Passing chunked=True lets awswrangler pick the chunk size automatically; a number fixes it explicitly.
        dfs = wr.s3.read_parquet(path=[f"s3://{bucket}/{file_key}"], chunked=20_000)
        for df in dfs:
            df.to_sql(table_name, engine, if_exists='append', index=False)

        # Method 2
        file_obj = s3Client.get_object(Bucket=bucket, Key=file_key)
        parquet_file = pq.ParquetFile(BytesIO(file_obj['Body'].read()))
        # You can replace 100000 with a batch size that fits your memory limits
        for idx, batch in enumerate(parquet_file.iter_batches(100000)):
            print(f"RecordBatch-{idx}")
            batch_df = batch.to_pandas()
            rows, columns = batch_df.shape
            # You can loop further using the rows variable above to load even fewer rows at a time
            batch_df.to_sql(table_name, engine, if_exists='append', index=False)
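get_secret() reads Server, db, secret_name, and region_name from environment variables. A minimal sketch for setting them with boto3 follows; the function name and values are placeholders, and the same variables can of course be set by hand in the function's configuration in the Lambda console.

import boto3

lambda_client = boto3.client("lambda")

# Placeholder function name and values; the variable names must match what get_secret() reads.
lambda_client.update_function_configuration(
    FunctionName="s3-parquet-to-rds",
    Environment={
        "Variables": {
            "Server": "my-rds-endpoint.us-east-1.rds.amazonaws.com",
            "db": "test",
            "secret_name": "rds-sql-server-credentials",
            "region_name": "us-east-1"
        }
    }
)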

For this to execute, we have to create Lambda layers.

These are the layers you need. Use the Python 3.9 runtime.

  1. AWSSDKPandas-Python39 (an AWS-managed layer that provides awswrangler along with numpy and pandas)
  2. sqlalchemy
  3. pyodbc

To create a layer, follow the steps below (the commands after the list show how to build the sqlalchemy layer).

  1. Create a folder on your local machine. If you are using Ubuntu (Linux), you can run the commands directly; if you are using Windows, use WSL to get the Linux capabilities needed to build these.
  2. Removing the unnecessary folders reduces the size of the layer; AWS has a limit on layer size.
  3. If the layer zip is larger than 10 MB, load it to S3 and create the layer from there (see the sketch after the commands).

pip3 install sqlalchemy==2.0.21 --target python/
find . -name "tests" -type d | xargs -I{} rm -rf {}
find . -name "__pycache__" -type d | xargs -I{} rm -rf {}
find . -name "docs" -type d | xargs -I{} rm -rf {}
rm -rf python/boto*  # boto3/botocore are already provided by the Lambda runtime, so drop them if pip pulled them in
zip -r sqlalchemy_layer.zip python
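For layer zips above 10 MB, the archive has to come from S3. A minimal boto3 sketch, with placeholder bucket, key, and layer names:

import boto3

s3 = boto3.client("s3")
lambda_client = boto3.client("lambda")

# Placeholder bucket/key: upload the zip first, then publish the layer from the S3 object.
s3.upload_file("sqlalchemy_layer.zip", "my-layer-bucket", "layers/sqlalchemy_layer.zip")

lambda_client.publish_layer_version(
    LayerName="sqlalchemy",
    Content={"S3Bucket": "my-layer-bucket", "S3Key": "layers/sqlalchemy_layer.zip"},
    CompatibleRuntimes=["python3.9"]
)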

Once the layers are ready, upload them to AWS Lambda, attach them to the function, and you can run the Lambda function.

To test the Lambda function, you can either use a trigger or create a test event.

Below is the process for creating a test event.

  1. Open AWS Lambda
  2. Click on the function you created
  3. In the Code tab, click the down arrow beside the "Test" button
  4. Click on "Configure test event"
  5. Click on "Create new event"
  6. From the templates, select the trigger you need
  7. Change the values in the JSON (a sample event is shown after this list)
  8. Save the event and trigger the Lambda
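The handler above only reads the bucket name and object key from the event, so a trimmed-down S3 test event is enough. The sketch below (bucket, key, and function name are placeholders) shows the payload shape and how the same event can be sent with boto3 instead of the console:

import json
import boto3

# Minimal S3-style event containing only the fields lambda_handler reads.
# Bucket name and object key are placeholders.
test_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "my-parquet-bucket"},
                "object": {"key": "incoming/sample-1.parquet"}
            }
        }
    ]
}

lambda_client = boto3.client("lambda")
response = lambda_client.invoke(
    FunctionName="s3-parquet-to-rds",  # placeholder function name
    Payload=json.dumps(test_event)
)
print(response["Payload"].read())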

You can also call the Lambda function from an API. All you need to do is add a trigger to the Lambda function and select API Gateway. Once you select API Gateway, select HTTP and click okay.

You will immediately get a link for the API Gateway endpoint. You can call that API from your local machine (as long as you are on the same network) to trigger the Lambda. Keep in mind that an API Gateway event has a different shape from an S3 event, so the handler would need to read the file, table, and database names from the request body instead of event['Records']. You can also call the API in parallel to get the responses; the code below creates parallel threads.

import requests
import asyncio
from timeit import default_timer
from concurrent.futures import ThreadPoolExecutor
import nest_asyncio
nest_asyncio.apply()  # this suppresses the "event loop already running" error
import json

START_TIME = default_timer()


def request(session, data, idx):
    url = "https://xxxxxxx.amazonaws.com/default/lambda-function-name"
    print(data)
    with session.post(url, data=data, headers={'Content-Type': 'text/plain'}) as response:
        print(response)
        output = response.text

        if response.status_code != 200:
            print("FAILURE::{0}".format(url))

        elapsed_time = default_timer() - START_TIME
        completed_at = "{:5.2f}s".format(elapsed_time)
        print("{0:<30} {1:>20}".format(idx, completed_at))
        return output


async def start_async_process():
    print("{0:<30} {1:>20}".format("No", "Completed at"))
    with ThreadPoolExecutor(max_workers=10) as executor:
        with requests.Session() as session:
            loop = asyncio.get_event_loop()
            global START_TIME
            START_TIME = default_timer()  # reset the timer just before firing the requests
            tasks = [
                loop.run_in_executor(
                    executor,
                    request,
                    *(session, json.dumps({
                        "id": idx,
                        "file_name": i.get('file_name'),
                        "table_name": i.get('table_name'),
                        "database_name": i.get('database_name')
                    }), idx)
                )
                for idx, i in enumerate([
                    {'file_name': 'sample-1.parquet',
                     'table_name': 'sample',
                     'database_name': 'test'},
                    {'file_name': 'sample-2.parquet',
                     'table_name': 'sample',
                     'database_name': 'test'}
                ])
            ]
            for response in await asyncio.gather(*tasks):
                print(response)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(start_async_process())
    loop.run_until_complete(future)
