LambdaでCSVをDynamoDBにインポートする方法まとめ

Lambda

AWS コンソールから DynamoDB に複数件のデータを投入するのが面倒なので CSV を DynamoDB にインポートする Lambda を実装しました。
CloudFormation テンプレートを用意したのでスタックを作成したらすぐ使えます。

スポンサーリンク

CloudFormation テンプレート

  • S3 から CSV を読み込みます。
  • 数値型のフィールドは Python で辞書のフィールドを数値型に戻す必要があります。
  • 同様に list 型のフィールドも Python で辞書にフィールドを list 型に戻す必要があります。
  • batch_write() で 100 件ごとに DynamoDB に投入します。

AWSTemplateFormatVersion: '2010-09-09'

Parameters:
  System:
    Type: String
    Default: sample
    Description: System name.
  Environment:
    Type: String
    Default: dev
    AllowedValues:
      - dev
      - qa
      - prod
    Description: Environment name. Choose from [dev, qa, prod].

Resources: 
  CsvToDDBLambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub "${Environment}-${System}-csv-to-dynamodb"
      Handler: index.lambda_handler
      Role:
          # Lambda 用の IAM Role を指定します。
          Role: arn:aws:iam::012345678901:role/your-lambda-role
      Code:
        ZipFile: |
          import json
          import boto3
          import os
          import csv
          import codecs
          import sys
          import traceback

          s3 = boto3.resource('s3')
          dynamodb = boto3.resource('dynamodb')

          def lambda_handler(event, context):

              numeric_fields = event['numeric_fields']
              list_fields = event['list_fields']
              targets = event['targets']
              for target in targets:
                  bucket = target['bucket']
                  key = target['key']
                  tableName = target['table']

                  #get() does not store in memory
                  try:
                      obj = s3.Object(bucket, key).get()['Body']
                  except:
                      print(f"S3 Object s3://{bucket}/{key} could not be opened. Check request parameters.")
                      return {
                          'statusCode': 404,
                          'body': json.dumps(f'S3 Object s3://{bucket}/{key} could not be opened. Check request parameters.')
                      }
                  try:
                      table = dynamodb.Table(tableName)
                      # 遅延評価のためテーブル情報取得で存在確認
                      print(table.load())
                  except Exception as e:
                      print(traceback.format_exc())
                      print(f"Error loading DynamoDB table {tableName}. Check if table was created correctly and request parameters.")
                      return {
                          'statusCode': 404,
                          'body': json.dumps(f'Error loading DynamoDB table {tableName}. Check if table was created correctly and request parameters.')
                      }

                  batch_size = 100
                  batch = []

                  #DictReader is a generator; not stored in memory
                  for row in csv.DictReader(codecs.getreader('utf-8')(obj)):
                      # 数値が文字列として扱われてしまい DynamoDB に投入出来ないため int に変換する必要がある
                      # パラメータとして渡された int に変換したいカラムを文字列から辞書に変換し OrderedDict を更新する
                      for numeric_field in numeric_fields:
                          numeric_field_value = int(row[numeric_field])
                          numeric_field_dict = eval(f'{{"{numeric_field}": {numeric_field_value}}}')
                          row.update(numeric_field_dict)
                      # 以下は JSON 文字列のリストが来る想定で実装しています。必要に応じて修正してください。
                      for list_field in list_fields:
                          list_field_value = eval(row[list_field])
                          l = []
                          for x in list_field_value:
                              j = json.dumps(x)
                              l.append(j)
                          list_field_dict = eval(f'{{"{list_field}": {l}}}')
                          row.update(list_field_dict)
                      if len(batch) >= batch_size:
                          write_to_dynamo(tableName, batch)
                          batch.clear()

                      batch.append(row)

                  if batch:
                      write_to_dynamo(tableName, batch)

              return {
                  'statusCode': 200,
                  'body': json.dumps('Uploaded to DynamoDB Table')
              }


          def write_to_dynamo(tableName, rows):
              try:
                  table = dynamodb.Table(tableName)
                  # 遅延評価のためテーブル情報取得で存在確認
                  print(table.load())
              except Exception as e:
                  print(traceback.format_exc())
                  print(f"Error loading DynamoDB table {tableName}. Check if table was created correctly and request parameters.")
                  return {
                      'statusCode': 404,
                      'body': json.dumps(f'Error loading DynamoDB table {tableName}. Check if table was created correctly and request parameters.')
                  }

              try:
                  with table.batch_writer() as batch:
                      for i in range(len(rows)):
                          batch.put_item(
                              Item=rows[i]
                          )
              except Exception as e:
                  print(traceback.format_exc())
                  print(f"Error executing batch_writer. DynamoDB table {tableName}")
                  return {
                      'statusCode': 500,
                      'body': json.dumps(f'Error executing batch_writer. DynamoDB table {tableName}')
                  }
      Runtime: python3.7
      Timeout: 900
      MemorySize: 512

Lambda 実行方法

  • イベント JSON に以下のようなパラメータを入力し [テスト] を実行します。
    • 数値型のフィールドがある場合は numeric_fields にフィールド名を入れてください。
    • list 型のフィールドがある場合は list_fields にフィールド名を入れてください。
    • bucket には CSV が格納されている S3 のバケット名を入力してください。
    • key には CSV のオブジェクトキを入力してください。
    • table には投入先の DynamoDB テーブル名を入力してください。
{
  "numeric_fields": [ ],
  "list_fields": [ ],
  "targets": [
    {
      "bucket": "your-bucket",
      "key": "example/your-csv.csv",
      "table": "your-dynamodb-table"
    }
  ]
}
タイトルとURLをコピーしました