How to load local files to AWS Redshift using Python.
Rodolfo Marcos
Sr Data / Full-Stack Engineer | Tech Writer | Google Cloud certified | AWS @ Freestar
In this tutorial, you'll learn how to upload local files to an AWS Redshift Serverless table using a Python script. This is a basic solution for those who want to load some data from a local machine to Redshift.
Overview
Welcome to this tutorial! In the next few minutes, I’ll guide you through the setup and uploading files to Amazon Redshift, a powerful and fully managed data warehouse service. Let’s dive in and harness the potential of Amazon Redshift for your data needs!
Process Overview
Our Python script will upload a local CSV file to an S3 bucket, create the target table in Redshift Serverless, copy the S3 file into that table, and finally query the table to confirm the data was loaded.
Prerequisites
3. Return to the users screen and select the new user. In Security credentials, create a new access key. Save the access key ID and secret access key somewhere, since we'll need them later.
4. Still inside Security credentials, grant the new user access to the AWS console by creating a password in the Console sign-in section. Note it down somewhere; you'll use it in the next step.
5. Once the user is created, you need to access the Redshift Serverless workgroup for the first time as that user so a matching database user gets created in Redshift.
Open an incognito window in your browser and log in as the new user with the username and password you saved before. Once you are logged in, go to the Redshift Serverless console and open the Redshift Query Editor. Then you can check whether your new IAM user was created in the Redshift database by running:
SELECT usename FROM pg_user;
where you should see the new user as IAM:RedshiftServerlessUser.
Now the new user exists in the Redshift database. Set the incognito window aside and log back in to the AWS console with your admin credentials.
Back in the Redshift Query Editor (as the admin), run the following commands to grant the new user access to the database and its tables.
grant usage on schema public to "IAM:RedshiftServerlessUser";
grant select on all tables in schema public to "IAM:RedshiftServerlessUser";
Now, the new user has access to the Redshift database and tables. Later, you will access AWS on your local machine as this new user.
Local Authentication
AWS credentials files play a crucial role in authenticating and authorizing access to AWS services. These files, often located in the user’s home directory, store essential security credentials such as access keys and secret keys. By managing these credentials, users can securely interact with AWS resources, enabling seamless and controlled access to cloud services.
Create credentials file
1. Create a new file named credentials (no file extension) at /Users/<Username>/.aws/credentials (macOS) or C:\Users\<Username>\.aws\credentials (Windows).
2. The file content should be as follows; replace the access key ID and secret access key with the previously saved values.
[default]
aws_access_key_id=<Key ID>
aws_secret_access_key=<Access Key>
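Before moving on, you can optionally check that boto3 picks these credentials up. This is a small, hypothetical sanity check using the STS GetCallerIdentity call; it is not required by the rest of the tutorial.

import boto3

# Optional check: confirm boto3 reads the credentials file and that the
# returned identity is the new IAM user created earlier.
sts = boto3.client('sts')
print(sts.get_caller_identity()['Arn'])

The printed ARN should point at the new IAM user; if it doesn't, double-check the credentials file path and contents.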
Python Script
Now that we have everything set up, let's start developing the script that runs our data process.
1. Import the required libraries and define the configuration constants.
import sys
import uuid
import time
import logging
import boto3
S3_BUCKET = '<S3 bucket name>' #Your S3 Bucket Name
TEMP_FILE_PREFIX = 'redshift_data_upload' #Temporary file prefix
REDSHIFT_WORKGROUP = '<Redshift Workgroup Name>'
REDSHIFT_DATABASE = '<Redshift Database>' #default "dev"
MAX_WAIT_CYCLES = 5
2. Create the function that uploads the local files to the S3 bucket.
def load_file_to_s3(local_file_path, temp_file_name):
    s3 = boto3.resource('s3')
    # Save the file in S3 with a temporary naming convention,
    # using a context manager so the file handle is closed.
    with open(local_file_path, 'rb') as f:
        s3.Object(S3_BUCKET, temp_file_name).put(Body=f)
    logging.info(f'Uploaded {local_file_path} to S3')
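As a side note, boto3 also offers upload_file, which streams the file and handles multipart uploads for larger files. The sketch below is an equivalent alternative (it assumes the same S3_BUCKET constant), not part of the original script.

def load_file_to_s3_alt(local_file_path, temp_file_name):
    # Alternative: let boto3 manage the transfer (multipart for large files).
    s3_client = boto3.client('s3')
    s3_client.upload_file(local_file_path, S3_BUCKET, temp_file_name)
    logging.info(f'Uploaded {local_file_path} to S3')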
3. Create the function that runs the Redshift SQL statement
def run_redshift_statement(sql_statement):
    """
    Generic function to run Redshift statements (DDL, SQL, ...).
    It polls for completion up to MAX_WAIT_CYCLES and returns the
    result set if the statement produces one.
    """
    res = client.execute_statement(
        Database=REDSHIFT_DATABASE,
        WorkgroupName=REDSHIFT_WORKGROUP,
        Sql=sql_statement
    )

    # DDL statements such as CREATE TABLE don't have a result set.
    has_result_set = False
    done = False
    attempts = 0

    while not done and attempts < MAX_WAIT_CYCLES:
        attempts += 1
        time.sleep(1)
        desc = client.describe_statement(Id=res['Id'])
        query_status = desc['Status']
        if query_status == "FAILED":
            raise Exception('SQL query failed: ' + desc["Error"])
        elif query_status == "FINISHED":
            done = True
            has_result_set = desc['HasResultSet']
        else:
            logging.info("Still working... query status is: {}".format(query_status))

    if not done and attempts >= MAX_WAIT_CYCLES:
        raise Exception('Maximum of ' + str(attempts) + ' attempts reached.')

    if has_result_set:
        data = client.get_statement_result(Id=res['Id'])
        return data
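Note that client, the Redshift Data API client used here, is created in the main section further below. As a quick, purely illustrative sanity check (not part of the original flow), you could run a trivial statement once the client exists:

# Hypothetical sanity check: runs a trivial query and prints the raw result.
print(run_redshift_statement('SELECT 1;')['Records'])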
4. Create the Redshift Table
def create_redshift_table():
    create_table_ddl = """
    CREATE TABLE IF NOT EXISTS public.employee (
        id integer NOT NULL ENCODE az64,
        name character varying(100) NOT NULL ENCODE lzo,
        birth_date character varying(10) NOT NULL ENCODE lzo,
        occupation character varying(100) NOT NULL ENCODE lzo,
        gender character varying(1) NOT NULL ENCODE lzo
    ) DISTSTYLE AUTO;
    """
    run_redshift_statement(create_table_ddl)
    logging.info('Table created successfully.')
5. Import the S3 file content into the Redshift table
def import_s3_file(file_name):
    """
    Loads the content of the S3 temporary file into the Redshift table.
    """
    load_data_ddl = f"""
    COPY employee
    FROM 's3://{S3_BUCKET}/{file_name}'
    DELIMITER ','
    IGNOREHEADER as 1
    REGION 'us-east-2'
    IAM_ROLE default;
    """
    run_redshift_statement(load_data_ddl)
    logging.info('Imported S3 file to Redshift.')
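If your CSV contains quoted fields or embedded commas, Redshift's COPY command also accepts the CSV keyword instead of a plain DELIMITER. A possible drop-in replacement for load_data_ddl inside import_s3_file could look like the sketch below; adapt the region and IAM role to your own setup.

    # Variant: let COPY parse the file as CSV (handles quoted fields).
    load_data_ddl = f"""
    COPY employee
    FROM 's3://{S3_BUCKET}/{file_name}'
    CSV
    IGNOREHEADER 1
    REGION 'us-east-2'
    IAM_ROLE default;
    """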
6. Query data from the Redshift table
def query_redshift_table():
    # You can use your own SQL to fetch data.
    select_sql = 'SELECT * FROM employee;'
    data = run_redshift_statement(select_sql)
    print(data['Records'])
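The Redshift Data API returns each row as a list of typed field objects (keys such as stringValue or longValue). If you prefer plain Python values over the raw Records structure, a small helper along these lines should work; it is a sketch, not part of the original script.

def records_to_rows(data):
    # Each field is a dict with a single typed key, e.g. {'longValue': 1}
    # or {'stringValue': 'John'}; unwrap it to get the plain value.
    rows = []
    for record in data['Records']:
        rows.append([list(field.values())[0] for field in record])
    return rows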
7. After all methods are defined, let's tie them together to create the final logic. Paste the following code at the end of your script, then save it as <your_script_name>.py
# Main Logic
client = boto3.client('redshift-data', region_name='us-east-2')

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    logging.info('Process started')

    local_file_path = sys.argv[1]
    temp_file_name = TEMP_FILE_PREFIX + '_' + str(uuid.uuid1()) + '.csv'

    load_file_to_s3(local_file_path, temp_file_name)
    create_redshift_table()
    import_s3_file(temp_file_name)
    query_redshift_table()

    logging.info('Process finished')
Finally, run your script from the terminal (be sure you're in the same directory as your script). Pass the path of the file you want to upload as the first argument. In the following example, the file sample_20240101.csv is in the same folder as the <your_script_name>.py script.
python <your_script_name>.py ./sample_20240101.csv
Once you run it, your terminal should show the status of each step followed by the result of the final SQL query. You're done!
Employee file content example (sample_20240101.csv)
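For reference, an illustrative employee file with made-up values matching the table schema could look like the lines below; note the header row, which COPY skips because of IGNOREHEADER 1.

id,name,birth_date,occupation,gender
1,John Doe,1985-04-12,Engineer,M
2,Jane Smith,1990-09-30,Analyst,F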
Congratulations!
In this article, you learned how to upload local file contents to a Redshift Serverless table and retrieve the result of a SELECT statement.
Post-notes
You can connect with me on LinkedIn at: https://www.dhirubhai.net/in/rodolfo-marcos-41ab3198/
Thank you so much! I hope it will be useful for you!