【ビッグデータ分析】APIを並列処理化することによる処理時間短縮方法

bigdata ビッグデータ

ビッグデータ処理で最新のユーザー情報を大量に取得しなければいけない場合があります。
そういった場合にユーザー1人ずつ順番で処理していたら数百万オーダーになるとチリツモで途方も無い処理時間がかかってしまいます。
そこでユーザーを分割して分割したユーザーごとに並列で API を実行すれば並列数分処理時間が短縮されます。
この記事ではそういったケースに対応する方法を記載します。

スポンサーリンク

前提

設定は下記とします。

  • HDFS に処理対象のユーザーのリストが置かれている。
  • API はユーザー ID のリストを引数に各ユーザーの詳細情報を取得し tsv 出力する。

通常, HDFS のユーザーリストをそのまま渡して API を叩くと思いますが, 数百万オーダーになるとチリツモで処理時間がかかります。
そこで, HDFS のユーザーリストを分割して, 分割したユーザーリストを並列処理で API 実行すれば処理時間は並列数分短くなります。
以下にそのサンプルコードを記載します。

API を並列処理する方法

① HDFS に格納されたユーザーリストを分割する

  • HDFS から対象のユーザー ID を取り出します。
    • uniq で重複排除します。
  • 標準入力を split で n 分割します。
    • 行数にデータ件数の 1/n を指定することで標準入力を n 分割します。
    • 分割されたファイルは tmp/000* の形式で出力されます(分割したらファイル出力されてしまう仕様です)。
split_cnt = 10
text_hdfs_dir = '/text'
tmp_dir = 'tmp'
# HDFS に user_id, attribute_1, attribute_2, ... の形式で格納されており 1 列目がユーザー ID とします。
c = f'hadoop fs -cat \'{text_hdfs_dir}/part-*\' | cut -f1 | sort | uniq | split -l $(($(hadoop fs -cat \'{text_hdfs_dir}/part-*\' | cut -f1 | sort | uniq | wc -l)/{split_cnt}+1)) -d -a 5 - {tmp_dir}/'
subprocess.run(c, shell = True)

② xargs で API を並列実行する

  • xargs で API を並列実行します。
    • api_wrapper.py は API を実行する Python スクリプトとします。
    • ls で分割したユーザーリストのファイル名を表示し xargs に渡します。
    • xargs でユーザーリストを cat して API に渡し, 標準出力として返ってくる結果を tsv に流します。
split_cnt = 10
text_hdfs_dir = '/text'
tmp_dir = 'tmp'
text_tsv = 'text.tsv'
status_output_dir = 'status'
c = f'ls {tmp_dir}/000* | xargs -L 1 -P {split_cnt} -t bash -c \'cat $0 | python3 api_wrapper.py -S {status_output_dir} 1>> {text_tsv}\''
p = subprocess.Popen(c, shell = True)
# 並列処理が完了するまで待機する
p.wait()

③分割したユーザーリストを消す

  • 並列処理完了後に分割出力したユーザーリストを消します。
tmp_dir = 'tmp'
c = f'rm {tmp_dir}/000*'
subprocess.run(c, shell = True)
タイトルとURLをコピーしました