Boto3でEMRを使う方法まとめ

EMR

Python Boto3 で Amazon EMR を操作する方法をまとめました。
EMR Cluster の起動から、ステップの追加、監視、EMR Cluster の停止まで Boto3 で出来ます。
また、これらの処理を Class にしたので汎用的に使えます。

スポンサーリンク

EMR Cluster を起動する

  • run_job_flow() で EMR クラスターを起動出来ます。
  • レスポンスに含まれる JobFlowId は クラスター ID に相当します。
    • クラスター ID をキーにステップを追加したり EMR クラスターを停止したりします。
  • Steps を追加すれば EMR クラスター起動時にステップ実行することも可能です。
import boto3

client = boto3.client('emr', 'ap-northeast-1')
emr_cluster_conf = {
    'Instances': {
        'Placement': {},
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
        'Ec2KeyName': 'dev',
        'Ec2SubnetId': 'subnet-xxxxxxxxxxxxxxxxx',
        'HadoopVersion': '3.2.1',
        'InstanceFleets': [],
        'InstanceGroups': [
            {
            'EbsConfiguration': {
                'EbsBlockDeviceConfigs': []
            },
            'InstanceCount': 1,
            'InstanceRole': 'MASTER',
            'InstanceType': 'm5.xlarge',
            'Market': 'ON_DEMAND',
            'Name': 'マスターインスタンスグループ - 1',
            'Configurations': []
            },
            {
            'EbsConfiguration': {
                'EbsBlockDeviceConfigs': []
            },
            'InstanceCount': 2,
            'InstanceRole': 'CORE',
            'InstanceType': 'm5.xlarge',
            'Market': 'ON_DEMAND',
            'Name': 'コアインスタンスグループ - 2',
            'Configurations': []
            }
        ],
        'AdditionalMasterSecurityGroups': [
            'sg-xxxxxxxxxxxxxxxxx'
        ],
        'AdditionalSlaveSecurityGroups': [
            'sg-xxxxxxxxxxxxxxxxx'
        ]
    },
    'VisibleToAllUsers': True,
    'EbsRootVolumeSize': 10,
    'StepConcurrencyLevel': 1,
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'LogUri': 's3://dev/logs/elasticmapreduce/',
    'Name': 'dev-cluster',
    'ReleaseLabel': 'emr-6.5.0',
    'ServiceRole': 'EMR_DefaultRole',
    'Applications': [
        { 'Name': 'Hadoop' },
        { 'Name': 'Hive' },
        { 'Name': 'Spark' }
    ],
    'Configurations': [
        {
            'Classification': 'hbase',
            'Properties': {
            'hbase.emr.storageMode': 's3'
            }
        },
        {
            'Classification': 'hbase-site',
            'Properties': {
            'hbase.rootdir': 's3://hbase-emr/'
            }
        },
        {
            'Classification': 'hive-site',
            'Properties': {
            'hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory',
            'hive.execution.engine': 'tez'
            }
        }
    ],
    'BootstrapActions': [
        {
            'Name': 'BOOTSTRAP',
            'ScriptBootstrapAction': {
                'Path': 's3://dev/emr_bootstrap.sh',
                'Args': []
            }
        },
    ],
    'Tags': [
        {
            'Key': 'YourTagKey',
            'Value': 'your-tag-value'
        }
    ],
    'AutoTerminationPolicy': {
        'IdleTimeout': 3600
    }
}
res = client.run_job_flow(**emr_cluster_conf)
cluster_id = res['JobFlowId']

run_job_flow() のレスポンス

{
    'JobFlowId': 'string',
    'ClusterArn': 'string'
}

ステップ追加

  • add_job_flow_steps() で EMR クラスターにステップを追加出来ます。
    • どの EMR クラスターにどういったステップを追加するかをパラメータとして渡します。
  • レスポンスとして ステップ ID が返ってきます。
    • list_steps() でステップの実行状況を確認するのに使います。
import boto3

client = boto3.client('emr', 'ap-northeast-1')
cluster_id = 'xxxxxxxxxxxxx'
steps = [
    {
        'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        'Properties': [],
        'Args': [
            'spark-submit',
            '--deploy-mode',
            'client',
            's3://test.py',
            '-p',
            'some_parameter'
        ]
        },
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'Name': 'test'
    }
]
res = client.add_job_flow_steps(
    JobFlowId = cluster_id,
    Steps = steps
)
step_ids = res['StepIds']

add_job_flow_steps() のレスポンス

{
    'StepIds': [
        'string',
    ]
}

ステップ監視

  • list_steps() で EMR クラスターのステップの状態を確認出来ます。
    • クラスター ID と ステップ ID のリストをパラメータとして渡すことにより対象クラスターのステップの状態を取得します。
  • レスポンスとして ステップ情報 が返ってきます。
import boto3
import time
import sys

client = boto3.client('emr', 'ap-northeast-1')
cluster_id = 'xxxxxxxxxxxxx'
step_ids = ['aaaaaaaaaa', 'bbbbbbbbbb']
# 追加したステップの状態を確認する
res = client.list_steps(ClusterId=cluster_id, StepIds=step_ids)
step_states = []
for i in range(len(res['Steps'])):
    step_states.append(res['Steps'][i]['Status']['State'])
if any([step_state in ['CANCELLED', 'FAILED'] for step_state in step_states]):
    logger.error('ステップがキャンセルまたは失敗したため処理を終了します。')
    sys.exit(1)

# いずれかのステップの状態が待機か実行中の間はステップの状態を監視する
while any([step_state in ['PENDING', 'RUNNING'] for step_state in step_states]):
    logger.info("ステップ実行が完了するまで待機します。")
    time.sleep(60)
    res = client.list_steps(ClusterId=cluster_id, StepIds=step_ids)
    step_states = []
    for i in range(len(res['Steps'])):
        step_states.append(res['Steps'][i]['Status']['State'])
    if any([step_state in ['CANCELLED', 'FAILED'] for step_state in step_states]):
        logger.error('ステップがキャンセルまたは失敗したため処理を終了します。')
        sys.exit(1)

list_steps() のレスポンス

{
    'Steps': [
        {
            'Id': 'string',
            'Name': 'string',
            'Config': {
                'Jar': 'string',
                'Properties': {
                    'string': 'string'
                },
                'MainClass': 'string',
                'Args': [
                    'string',
                ]
            },
            'ActionOnFailure': 'TERMINATE_JOB_FLOW'|'TERMINATE_CLUSTER'|'CANCEL_AND_WAIT'|'CONTINUE',
            'Status': {
                'State': 'PENDING'|'CANCEL_PENDING'|'RUNNING'|'COMPLETED'|'CANCELLED'|'FAILED'|'INTERRUPTED',
                'StateChangeReason': {
                    'Code': 'NONE',
                    'Message': 'string'
                },
                'FailureDetails': {
                    'Reason': 'string',
                    'Message': 'string',
                    'LogFile': 'string'
                },
                'Timeline': {
                    'CreationDateTime': datetime(2015, 1, 1),
                    'StartDateTime': datetime(2015, 1, 1),
                    'EndDateTime': datetime(2015, 1, 1)
                }
            }
        },
    ],
    'Marker': 'string'
}

EMR クラスター停止

terminate_job_flows() で EMR クラスターを停止出来ます。

import boto3
import time

client = boto3.client('emr', 'ap-northeast-1')
cluster_id = 'xxxxxxxxxxxxx'
client.terminate_job_flows(JobFlowIds=[cluster_id])

EMR 操作クラス

これまでの処理をクラスにまとめました。

import boto3

class EMRManager:
    """
    EMR 操作クラス
    """
    def __init__(self, region: str) -> None:
        # boto3 で emr クライアントを初期化する際にリージョンを指定する必要がある
        self.client = boto3.client('emr', region)

    def create_emr_cluster(self, emr_cluster_conf: dict) -> str:
        """
        EMR クラスターを起動します。
        """
        res = self.client.run_job_flow(**emr_cluster_conf)
        self.cluster_id = res['JobFlowId']
        return self.cluster_id

    def add_steps(self, cluster_id: str, steps: list) -> list:
        """
        EMR クラスターでステップ実行します。
        """
        res = self.client.add_job_flow_steps(
            JobFlowId = cluster_id,
            Steps = steps
        )
        step_ids = res['StepIds']
        return step_ids

    def wait_steps(self, cluster_id: str, step_ids: list) -> None:
        """
        EMR クラスターに追加したステップの実行が完了するまで待機します。
        """
        # 追加したステップの状態を確認する
        res = self.client.list_steps(ClusterId=cluster_id, StepIds=step_ids)
        step_states = []
        for i in range(len(res['Steps'])):
            step_states.append(res['Steps'][i]['Status']['State'])
        if any([step_state in ['CANCELLED', 'FAILED'] for step_state in step_states]):
            logger.error('ステップがキャンセルまたは失敗したため処理を終了します。')
            sys.exit(1)

        # いずれかのステップの状態が待機か実行中の間はステップの状態を監視する
        while any([step_state in ['PENDING', 'RUNNING'] for step_state in step_states]):
            time.sleep(60)
            res = self.client.list_steps(ClusterId=cluster_id, StepIds=step_ids)
            step_states = []
            for i in range(len(res['Steps'])):
                step_states.append(res['Steps'][i]['Status']['State'])
            if any([step_state in ['CANCELLED', 'FAILED'] for step_state in step_states]):
                logger.error('ステップがキャンセルまたは失敗したため処理を終了します。')
                sys.exit(1)

    def remove_emr_cluster(self, cluster_id: str) -> None:
        """
        EMR クラスターを停止します。
        """
        self.client.terminate_job_flows(JobFlowIds=[cluster_id])

以下のように EMR 操作クラスから簡単に EMR を操作出来ます。

# emr_cluster_conf に EMR クラスターの設定を定義
emr_cluster_conf = {}

# steps に実行したいステップ情報を定義
steps = [] 

# EMR 操作クラスのインスタンスを生成
emr_manager = EMRManager('ap-northeast-1')

# EMR クラスター起動
cluster_id = emr_manager.create_emr_cluster(emr_cluster_conf)

# EMR クラスターにステップを追加
step_ids = emr_manager.add_steps(cluster_id, steps)

# ステップ実行が完了するまで待機
emr_manager.wait_steps(cluster_id, step_ids)

# EMR クラスター停止
emr_manager.remove_emr_cluster(cluster_id)
タイトルとURLをコピーしました