Lambda で S3 に格納された複数の CSV を 1 個の CSV にマージして S3 に保存する処理を実装するのが意外と大変だったため実装したコードを紹介します。
設定
pandas を使用するために Lambda Layer を利用します。
https://github.com/keithrozario/Klayers から deployments, Python のバージョン, リージョンの順に絞っていくと使いたい Lambda Layer の ARN が見つかります。
実装
ポイント
- S3 からオブジェクトキーの一覧を取得し, マージしたい分だけに絞り込めるようにしています。
- Boto3 の S3 クライアントで CSV のデータを取得し, バイトデータに変換した後に Pandas で DataFrame として読み込みます。
- Pandas.concat() で DataFrame をマージします。
- DataFrame を CSV のバイトデータに変換し, S3 に PUT します。
コード
import boto3
import pandas as pd
import io
BUCKET_NAME = 'your_s3_bucket'
PREFIX = 'your_s3_csv_object_key_prefix'
TERM = 4
OUTPUT_CSV_OBJECT_KEY = f'{PREFIX}/exclusion_users.csv'
s3 = boto3.resource('s3')
s3_client = boto3.client('s3')
def lambda_handler(event, context):
# オブジェクトキーのリストを取得
object_keys = fetch_object_keys(BUCKET_NAME, PREFIX)
# 直近のオブジェクトキーのみに絞り込む
recent_object_keys = object_keys[len(object_keys)-TERM:len(object_keys)]
print(f'recent_object_keys: {recent_object_keys}')
# S3 に格納された CSV ファイルを DataFrame に読み込む
df = read_csv_files_from_s3_as_df(BUCKET_NAME, recent_object_keys)
# DataFrame を S3 に書き込む
write_csv_to_s3(df, BUCKET_NAME, OUTPUT_CSV_OBJECT_KEY)
def fetch_object_keys(bucket_name, prefix):
"""
S3 からオブジェクトキー一覧を取得します。
"""
bucket = s3.Bucket(bucket_name)
object_keys = []
marker = ''
while True:
# オブジェクト取得
objects = bucket.objects.filter(Marker=marker, Prefix=prefix, MaxKeys=1000)
# オブジェクトキー取得
last_key = None
for obj in objects:
object_keys.append(obj.key)
last_key = obj.key
# 最後のキーを marker にセットし次のオブジェクトを取得
if last_key is None:
break
else:
marker = last_key
return sorted(object_keys)
def read_csv_files_from_s3_as_df(bucket_name, object_keys):
"""
S3 に格納された CSV を 1 ファイルずつ読み込み 1 個の DataFrame にマージします。
"""
df_list = []
for object_key in object_keys:
print(f'read csv file: "s3://{bucket_name}/{object_key}"')
df_list.append(read_csv_file_from_s3_as_df(bucket_name, object_key))
merged_df = pd.concat(df_list, axis=0, sort=True)
return merged_df
def read_csv_file_from_s3_as_df(bucket_name, object_key):
"""
S3 に格納された CSV ファイルを読み込み DataFrame に変換します。
"""
# オブジェクトデータを取得
s3_object = s3_client.get_object(Bucket=bucket_name, Key=object_key)
# バイトデータに変換
csv_data = io.BytesIO(s3_object['Body'].read())
# ストリーム位置をリセット
csv_data.seek(0)
# DataFrameに変換
df = pd.read_csv(csv_data, encoding='utf8')
return df
def write_csv_to_s3(df, bucket_name, object_key):
"""
DataFrame を CSV に変換し S3 に保存します。
"""
buf = io.BytesIO()
df.to_csv(buf, index=False)
s3_client.put_object(Bucket=bucket_name, Key=object_key, Body=buf.getvalue())
print(f'merged recent csv file: "s3://{bucket_name}/{object_key}"')