ビッグデータ分析では処理前に読み込むファイルが存在するか確認したり, 排他制御したり等色々細かい対応が必要だったりします。
そういった時に使う処理のサンプルコードをまとめましたので使えるパターンがありましたらぜひ参考にしてみてください。
前提
- Click パッケージを使って Python のバッチを実装
- ロギングは logging パッケージを使用
- この辺はお好みのものをご使用いただければと思います。
リトライ時の処理スキップ
- ある処理を実行する前に処理結果が既に HDFS に存在するか確認し, 存在する場合は処理をスキップするようにします。
- 強制実行オプションを設け, 指定された場合は処理結果が既にある場合でも処理を実行するようにします。
# 強制実行オプション
@click.option(
'--force',
'-f',
'force',
is_flag=True,
help='強制実行フラグ。True の場合は処理結果が存在する場合でも処理も再実行する。'
)
def exists_in_hdfs(hdfs_path: str) -> bool:
"""HDFS上のパスに指定したディレクトリ・ファイルが存在するか否かを返します。
Arguments:
hdfs_path {str} -- HDFS 上のパス
Returns:
bool -- 存在する場合は True, 存在しない場合は False
"""
result = subprocess.run(["hadoop", "fs", "-test", "-e", hdfs_path])
if result.returncode == 0:
return True
else:
return False
def exists_all_in_hdfs(src_list) -> bool:
"""リストで指定したディレクトリ・ファイルが全て HDFS に存在するかを検証します。
Arguments:
src_tupple -- HDFS のパス
Returns:
bool -- 全て存在すれば True, それ以外の場合は False
"""
for src in src_list:
if(not exists_in_hdfs(src)):
logger.error(f'{src} が存在しません。')
return False
return True
logger.info('input が HDFS に存在するか検証します。')
if not common_function.exists_all_in_hdfs(input):
logger.info('input に不備があるため処理を終了します。')
sys.exit(1)
logger.info('input が HDFS に全て存在することを確認しました。')
インプットの存在確認
- 処理実行前に読み込むディレクトリが HDFS に存在するか確認し,存在しない場合は処理を終了するようにします。
def exists_in_hdfs(hdfs_path: str) -> bool:
"""HDFS上のパスに指定したディレクトリ・ファイルが存在するか否かを返します。
Arguments:
hdfs_path {str} -- HDFS 上のパス
Returns:
bool -- 存在する場合は True, 存在しない場合は False
"""
result = subprocess.run(["hadoop", "fs", "-test", "-e", hdfs_path])
if result.returncode == 0:
return True
else:
return False
# force は強制実行オプション
# Hadoop で処理が成功した場合 _SUCCESS ファイルが生成されるためそれが存在するかを確認すれば処理が成功したかが判断できる
if not force and common_function.exists_in_hdfs(f'{output}/_SUCCESS'):
logger.info('既に処理済みのため処理をスキップします。')
return
リトライ時の多重処理検知
- 実行するジョブが YARN に既に登録されているかを確認することでリトライ時にジョブを多重起動しないようにします。
def duplicate_job(job_name: str):
"""
YARN に既にジョブが登録されているかを確認します。
"""
c = f"yarn application -list 2> /dev/null | grep '{job_name}'"
logger.info(c)
p = subprocess.run(c, stdout = subprocess.PIPE, shell = True)
logger.info(f'登録済みジョブ: {p.stdout.decode("utf8")}')
if(p.stdout.decode('utf8') and p.stdout.decode('utf8') != ''):
logger.info(f'ジョブ「{job_name}」が重複しています。')
return True
else:
logger.info(f'ジョブ「{job_name}」は重複していません。')
return False
logger.info('ジョブが重複起動していないか検証します。')
if common_function.duplicate_job(job_name):
logger.info('同じジョブが既に YARN に登録されているため処理を終了します。')
sys.exit(1)
logger.info('ジョブが重複起動していないことを確認しました。')
外部 API 並列処理
外部 API 周りで検討すべき異常ケースは以下が考えられます。
- 前のバッチで 外部 API 実行中に次のバッチが走り 外部 API を重複起動しようとしてしまう
- 外部 API 異常終了後にデータが欠損したまま処理が進んでしまう
- 外部 API 異常終了後に後続の処理で異常終了し,次のバッチを起動しても 外部 API が再実行されない
- 外部 API 正常終了後に後続の処理で異常終了し,次のバッチでまた 外部 API を実行しようとする
そこで, 外部 API ラッパーにて 外部 API の処理状況を任意の場所に出力するようにします。
- 処理を開始したら _RUNNING ファイルを置く
- 処理中にエラーが発生したら _ERROR ファイルを置く
- 処理が完了したら _SUCCESS ファイルを置く
また, バッチにて 外部 API の処理状況を参照するようにします。
- 外部 API 実行前に処理状況を確認する
- _RUNNING の場合はケース 1 に該当するためバッチを終了する
- _ERROR の場合はケース 3 に該当するため外部 API を再実行する
- _SUCCESS の場合はケース 4 に該当するため処理をスキップする
- 外部 API 実行後に処理状況を確認する
- _ERROR の場合はケース 2 に該当するためバッチを終了する
- _SUCCESS の場合は次の処理に進む
# _RUNNING ファイル作成
for f in glob.glob(f'{status_output_dir}/*', recursive=True):
os.remove(f)
status_running = pathlib.Path(f'{status_output_dir}/_RUNNING')
status_running.touch()
# _ERROR ファイル作成
for f in glob.glob(f'{status_output_dir}/*', recursive=True):
os.remove(f)
status_error = pathlib.Path(f'{status_output_dir}/_ERROR')
status_error.touch()
# _SUCCESS ファイル作成
for f in glob.glob(f'{status_output_dir}/*', recursive=True):
os.remove(f)
status_success = pathlib.Path(f'{status_output_dir}/_SUCCESS')
status_success.touch()
# 処理状況確認
if not force and os.path.exists(f'{status_output_dir}/_SUCCESS'):
logger.info('既に処理済みのため処理をスキップします。')
return
if os.path.exists(f'{status_output_dir}/_RUNNING'):
logger.info('外部 API で処理実行中のため処理を終了します。')
sys.exit(1)
if os.path.exists(f'{status_output_dir}/_ERROR'):
logger.info('前回実行時にエラーが発生したため再実行します。')