LambdaでS3にあるCSVをマージしてS3に保存する方法まとめ

Lambda

Lambda で S3 に格納された複数の CSV を 1 個の CSV にマージして S3 に保存する処理を実装するのが意外と大変だったため実装したコードを紹介します。

スポンサーリンク

設定

pandas を使用するために Lambda Layer を利用します。

https://github.com/keithrozario/Klayers から deployments, Python のバージョン, リージョンの順に絞っていくと使いたい Lambda Layer の ARN が見つかります。

実装

ポイント

  • S3 からオブジェクトキーの一覧を取得し, マージしたい分だけに絞り込めるようにしています。
  • Boto3 の S3 クライアントで CSV のデータを取得し, バイトデータに変換した後に Pandas で DataFrame として読み込みます。
  • Pandas.concat() で DataFrame をマージします。
  • DataFrame を CSV のバイトデータに変換し, S3 に PUT します。

コード

import boto3
import pandas as pd
import io

BUCKET_NAME = 'your_s3_bucket'
PREFIX = 'your_s3_csv_object_key_prefix'
TERM = 4
OUTPUT_CSV_OBJECT_KEY = f'{PREFIX}/exclusion_users.csv'

s3 = boto3.resource('s3')
s3_client = boto3.client('s3')

def lambda_handler(event, context):
    # オブジェクトキーのリストを取得
    object_keys = fetch_object_keys(BUCKET_NAME, PREFIX)
    # 直近のオブジェクトキーのみに絞り込む
    recent_object_keys = object_keys[len(object_keys)-TERM:len(object_keys)]
    print(f'recent_object_keys: {recent_object_keys}')
    # S3 に格納された CSV ファイルを DataFrame に読み込む
    df = read_csv_files_from_s3_as_df(BUCKET_NAME, recent_object_keys)
    # DataFrame を S3 に書き込む
    write_csv_to_s3(df, BUCKET_NAME, OUTPUT_CSV_OBJECT_KEY)

def fetch_object_keys(bucket_name, prefix):
    """
    S3 からオブジェクトキー一覧を取得します。
    """
    bucket = s3.Bucket(bucket_name)

    object_keys = []
    marker = ''
    while  True:
        # オブジェクト取得
        objects = bucket.objects.filter(Marker=marker, Prefix=prefix, MaxKeys=1000)

        # オブジェクトキー取得
        last_key = None
        for obj in objects:
            object_keys.append(obj.key)
            last_key = obj.key

        # 最後のキーを marker にセットし次のオブジェクトを取得
        if last_key is None:
            break
        else:
            marker = last_key

    return sorted(object_keys)

def read_csv_files_from_s3_as_df(bucket_name, object_keys):
    """
    S3 に格納された CSV を 1 ファイルずつ読み込み 1 個の DataFrame にマージします。
    """
    df_list = []
    for object_key in object_keys:
        print(f'read csv file: "s3://{bucket_name}/{object_key}"')
        df_list.append(read_csv_file_from_s3_as_df(bucket_name, object_key))
    merged_df = pd.concat(df_list, axis=0, sort=True)
    return merged_df

def read_csv_file_from_s3_as_df(bucket_name, object_key):
    """
    S3 に格納された CSV ファイルを読み込み DataFrame に変換します。
    """
    # オブジェクトデータを取得
    s3_object = s3_client.get_object(Bucket=bucket_name, Key=object_key)
    # バイトデータに変換
    csv_data = io.BytesIO(s3_object['Body'].read())
    # ストリーム位置をリセット
    csv_data.seek(0)
    # DataFrameに変換
    df = pd.read_csv(csv_data, encoding='utf8')
    return df

def write_csv_to_s3(df, bucket_name, object_key):
    """
    DataFrame を CSV に変換し S3 に保存します。
    """
    buf = io.BytesIO()
    df.to_csv(buf, index=False)
    s3_client.put_object(Bucket=bucket_name, Key=object_key, Body=buf.getvalue())
    print(f'merged recent csv file: "s3://{bucket_name}/{object_key}"')
スポンサーリンク
Lambda
Developer Note
タイトルとURLをコピーしました