Load files from S3 to RDS using AWS Glue


AWS Glue is an excellent serverless service that helps in loading data from S3 to RDS.

There may be better ways to implement this solution; the code here is not production-ready.

Scope of this article:

  1. Configure Glue service
  2. Configure data connections
  3. Read files from S3
  4. Load into RDS

Not in the scope of this article (I will try to add details as I explore further):

  1. Triggering the Glue job
  2. Monitoring the Glue job

Configuring Glue service:

A Glue job can be created in multiple ways:

  1. Visual
  2. Spark script editor
  3. Python shell script editor
  4. Jupyter notebook
  5. Ray script editor

I have chosen Jupyter notebook for this article.

Glue provides magic commands, which start with %, that can be used within the notebook. A Script tab gets updated in parallel as we update the notebook cells.
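For instance, a first notebook cell often sets session parameters with magics like these (a minimal sketch; the values and the connection name are illustrative):

%idle_timeout 60
%glue_version 4.0
%worker_type G.1X
%number_of_workers 2
%connections my-rds-connection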

Glue charges based on usage, measured in DPUs (Data Processing Units).

Configure Data Connections:

There are a few points to consider. Glue cannot connect to S3 directly: it requires an S3 VPC endpoint, created in the same region and VPC as Glue, to access S3 files. Similarly, the RDS instance should be in the same VPC, with the appropriate security groups attached.
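If the S3 gateway endpoint does not exist yet, it can be created with boto3. A minimal sketch, where the region, VPC ID, and route table ID are hypothetical placeholders:

import boto3

# create a gateway VPC endpoint for S3 in the same region and VPC as Glue
ec2 = boto3.client('ec2', region_name='us-east-1')
ec2.create_vpc_endpoint(
    VpcEndpointType='Gateway',
    VpcId='vpc-0123456789abcdef0',            # placeholder: the VPC shared by Glue and RDS
    ServiceName='com.amazonaws.us-east-1.s3',
    RouteTableIds=['rtb-0123456789abcdef0'],  # placeholder: route tables of the Glue subnets
)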

  1. Click on create a new data connection
  2. Select the type as JDBC
  3. If the RDS instance is in the same VPC, it will be available as a drop-down in the JDBC URL field; if not, we need to select the same VPC (in the network options below on that page)
  4. Use a username and password, or AWS Secrets Manager (suggested)
  5. Choose the VPC
  6. Choose the subnet
  7. Choose the security groups which give Glue access to RDS
  8. Click on save changes
  9. Once done, click on create job to create a job from the data connection. If you already have a job, you can attach this connection with the magic command %connections <connection_name>. (A boto3 equivalent of these console steps is sketched below.)
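The same connection can also be created programmatically. A minimal boto3 sketch, assuming the Secrets Manager option; all names and IDs are placeholders:

import boto3

glue = boto3.client('glue')
glue.create_connection(
    ConnectionInput={
        'Name': 'my-rds-connection',
        'ConnectionType': 'JDBC',
        'ConnectionProperties': {
            'JDBC_CONNECTION_URL': 'jdbc:sqlserver://<host>:<port>;database=<database>',
            'SECRET_ID': '<secret-name-here>',  # or USERNAME and PASSWORD instead
        },
        'PhysicalConnectionRequirements': {
            'SubnetId': 'subnet-0123456789abcdef0',           # placeholder
            'SecurityGroupIdList': ['sg-0123456789abcdef0'],  # placeholder: grants Glue access to RDS
            'AvailabilityZone': 'us-east-1a',                 # placeholder
        },
    }
)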

Read files from S3:

Without an S3 VPC endpoint, Glue will not be able to read files. That endpoint should be within the same VPC as Glue, and the Glue job's role should have access to the bucket.

Permissions for S3 bucket

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor",
            "Effect": "Allow",
            "Action": [
                "s3:DeleteObject",
                "s3:GetBucketWebsite",
                "s3:GetObjectAttributes",
                "s3:GetBucketNotification",
                "s3:PutObject",
                "s3:PutBucketNotification",
                "s3:CreateJob",
                "s3:PutBucketObjectLockConfiguration",
                "s3:ListBucket",
                "s3:GetAccountPublicAccessBlock",
                "s3:ListAllMyBuckets",
                "s3:GetObjectVersion",
                "s3:GetBucketObjectLockConfiguration",
                "s3:PutObjectLegalHold",
                "s3:PutBucketCORS",
                "s3:ListMultipartUploadParts",
                "s3:GetObject"
            ],
            "Resource": "*"
        }
    ]
}
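Note that "Resource": "*" is broader than needed; in practice, scope it to the bucket and its objects, e.g. arn:aws:s3:::your-bucket and arn:aws:s3:::your-bucket/*.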

Permissions for Secrets Manager, to get the password to connect to RDS.
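A minimal policy for this could look like the following (the secret ARN is a placeholder):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadRdsSecret",
            "Effect": "Allow",
            "Action": "secretsmanager:GetSecretValue",
            "Resource": "arn:aws:secretsmanager:<region>:<account-id>:secret:<secret-name-here>-*"
        }
    ]
}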

Permissions for Glue

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "arn here:role/rolename here"
        }
    ]
}

Permissions for CodeWhisperer. This helps with code-completion suggestions.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "CodeWhispererPermissions",
            "Effect": "Allow",
            "Action": [
                "codewhisperer:GenerateRecommendations"
            ],
            "Resource": "*"
        }
    ]
}

Also attach the AWS managed policies AWSGlueServiceRole and AWSGlueServiceNotebookRole (see the AWS documentation for their contents).

Read files from S3:

import boto3

s3_client = boto3.client('s3')
# list the objects under the given prefix; bucket_name_here and prefix are placeholders
response = s3_client.list_objects_v2(Bucket=bucket_name_here, Prefix=prefix)
files = response.get("Contents")
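One caveat: list_objects_v2 returns at most 1,000 keys per call. For larger prefixes, a paginator collects everything (a sketch using the same placeholders):

import boto3

s3_client = boto3.client('s3')
# iterate through every page of results instead of only the first 1,000 keys
paginator = s3_client.get_paginator('list_objects_v2')
files = []
for page in paginator.paginate(Bucket=bucket_name_here, Prefix=prefix):
    files.extend(page.get("Contents", []))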

Get secret:

import os
import boto3
from botocore.exceptions import ClientError
import json

def get_secret(database_name, table_name):
    # accepts two parameters while loading a particular table in the required database
    global connectionString
    secret_name = "<secret-name-here>"
    region_name = "<secret-location-here>"

    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
        response = json.loads(get_secret_value_response['SecretString'])
        # Secrets Manager decrypts the secret using the associated KMS key
        username = response.get('username')  # use the key exactly as created in Secrets Manager
        password = response.get('password')  # use the key exactly as created in Secrets Manager
        host = response.get('host')
        engine = response.get('engine')
        port = response.get('port')
        connectionString = {
            "url": f"jdbc:{engine}://{host}:{port};database={database_name}",
            "user": username,
            "password": password,
            "dbtable": table_name
        }
    except ClientError as e:
        raise e

Load data to RDS:

import boto3
import time

# prefixes are the way to organize folders in S3
prefix = f"add_prefixes_here/{folder_name}"

# map each S3 folder to its destination table
mapping = {
    'folder_1': "table_1",
    'folder_2': "table_2",
    'folder_3': "table_3",
    'folder_4': "table_4"
}

# the function below populates the connectionString variable used for the SQL DB connection
get_secret(database_name, mapping.get(folder_name))

s3_client = boto3.client('s3')  # this step fails if the S3 VPC endpoint is not configured
response = s3_client.list_objects_v2(Bucket="<bucket_name>", Prefix=prefix)
files = response.get("Contents")

all_files = [f"s3://bucket-name/{file.get('Key')}" for file in files if file.get('Key').endswith('.parquet')]
print(f"FILE SUMMARY: Total files {len(all_files)}")

# batch size of 10: you can change this number based on your need;
# it helps in loading the files incrementally, without requiring more resources
partition = []
for i in range(0, len(all_files), 10):
    partition.append(all_files[i:i + 10])

full_time_start = time.time()
for idx, i in enumerate(partition):
    # the step below creates a dynamic frame from the current batch of S3 paths
    dynamicFrame = glueContext.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={"paths": i},
        format="parquet"
    )
    start = time.time()
    # we write to SQL Server directly
    # if you don't use Secrets Manager, you can build connectionString yourself:
    # {"url": f"jdbc:sqlserver://{host}:{port};database={database_name}", "user": username, "password": password, "dbtable": table_name}
    glueContext.write_dynamic_frame.from_options(
        frame=dynamicFrame,
        connection_type="sqlserver",
        connection_options=connectionString
    )
    end = time.time()
    elapsed = '%.2f' % (end - start)
    print(f"Batch-{idx+1}/{len(partition)} - {elapsed}secs")
full_end_time = time.time()

print(f"LOAD SUMMARY: Total time {'%.2f' % (full_end_time - full_time_start)} secs")

Please note that Glue does not stream logs while a step is executing; the logs appear once the step is complete.

I will try to add triggering and scheduling in the future. Thank you for reading.
