AWS コンソールから DynamoDB に複数件のデータを投入するのが面倒なので CSV を DynamoDB にインポートする Lambda を実装しました。
CloudFormation テンプレートを用意したのでスタックを作成したらすぐ使えます。
CloudFormation テンプレート
- S3 から CSV を読み込みます。
- 数値型のフィールドは Python で辞書のフィールドを数値型に戻す必要があります。
- 同様に list 型のフィールドも Python で辞書にフィールドを list 型に戻す必要があります。
- batch_write() で 100 件ごとに DynamoDB に投入します。
AWSTemplateFormatVersion: '2010-09-09'
Parameters:
System:
Type: String
Default: sample
Description: System name.
Environment:
Type: String
Default: dev
AllowedValues:
- dev
- qa
- prod
Description: Environment name. Choose from [dev, qa, prod].
Resources:
CsvToDDBLambdaFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub "${Environment}-${System}-csv-to-dynamodb"
Handler: index.lambda_handler
Role:
# Lambda 用の IAM Role を指定します。
Role: arn:aws:iam::012345678901:role/your-lambda-role
Code:
ZipFile: |
import json
import boto3
import os
import csv
import codecs
import sys
import traceback
s3 = boto3.resource('s3')
dynamodb = boto3.resource('dynamodb')
def lambda_handler(event, context):
numeric_fields = event['numeric_fields']
list_fields = event['list_fields']
targets = event['targets']
for target in targets:
bucket = target['bucket']
key = target['key']
tableName = target['table']
#get() does not store in memory
try:
obj = s3.Object(bucket, key).get()['Body']
except:
print(f"S3 Object s3://{bucket}/{key} could not be opened. Check request parameters.")
return {
'statusCode': 404,
'body': json.dumps(f'S3 Object s3://{bucket}/{key} could not be opened. Check request parameters.')
}
try:
table = dynamodb.Table(tableName)
# 遅延評価のためテーブル情報取得で存在確認
print(table.load())
except Exception as e:
print(traceback.format_exc())
print(f"Error loading DynamoDB table {tableName}. Check if table was created correctly and request parameters.")
return {
'statusCode': 404,
'body': json.dumps(f'Error loading DynamoDB table {tableName}. Check if table was created correctly and request parameters.')
}
batch_size = 100
batch = []
#DictReader is a generator; not stored in memory
for row in csv.DictReader(codecs.getreader('utf-8')(obj)):
# 数値が文字列として扱われてしまい DynamoDB に投入出来ないため int に変換する必要がある
# パラメータとして渡された int に変換したいカラムを文字列から辞書に変換し OrderedDict を更新する
for numeric_field in numeric_fields:
numeric_field_value = int(row[numeric_field])
numeric_field_dict = eval(f'{{"{numeric_field}": {numeric_field_value}}}')
row.update(numeric_field_dict)
# 以下は JSON 文字列のリストが来る想定で実装しています。必要に応じて修正してください。
for list_field in list_fields:
list_field_value = eval(row[list_field])
l = []
for x in list_field_value:
j = json.dumps(x)
l.append(j)
list_field_dict = eval(f'{{"{list_field}": {l}}}')
row.update(list_field_dict)
if len(batch) >= batch_size:
write_to_dynamo(tableName, batch)
batch.clear()
batch.append(row)
if batch:
write_to_dynamo(tableName, batch)
return {
'statusCode': 200,
'body': json.dumps('Uploaded to DynamoDB Table')
}
def write_to_dynamo(tableName, rows):
try:
table = dynamodb.Table(tableName)
# 遅延評価のためテーブル情報取得で存在確認
print(table.load())
except Exception as e:
print(traceback.format_exc())
print(f"Error loading DynamoDB table {tableName}. Check if table was created correctly and request parameters.")
return {
'statusCode': 404,
'body': json.dumps(f'Error loading DynamoDB table {tableName}. Check if table was created correctly and request parameters.')
}
try:
with table.batch_writer() as batch:
for i in range(len(rows)):
batch.put_item(
Item=rows[i]
)
except Exception as e:
print(traceback.format_exc())
print(f"Error executing batch_writer. DynamoDB table {tableName}")
return {
'statusCode': 500,
'body': json.dumps(f'Error executing batch_writer. DynamoDB table {tableName}')
}
Runtime: python3.7
Timeout: 900
MemorySize: 512
Lambda 実行方法
- イベント JSON に以下のようなパラメータを入力し [テスト] を実行します。
- 数値型のフィールドがある場合は numeric_fields にフィールド名を入れてください。
- list 型のフィールドがある場合は list_fields にフィールド名を入れてください。
- bucket には CSV が格納されている S3 のバケット名を入力してください。
- key には CSV のオブジェクトキを入力してください。
- table には投入先の DynamoDB テーブル名を入力してください。
{
"numeric_fields": [ ],
"list_fields": [ ],
"targets": [
{
"bucket": "your-bucket",
"key": "example/your-csv.csv",
"table": "your-dynamodb-table"
}
]
}