ビッグデータ処理で最新のユーザー情報を大量に取得しなければいけない場合があります。
そういった場合にユーザー1人ずつ順番で処理していたら数百万オーダーになるとチリツモで途方も無い処理時間がかかってしまいます。
そこでユーザーを分割して分割したユーザーごとに並列で API を実行すれば並列数分処理時間が短縮されます。
この記事ではそういったケースに対応する方法を記載します。
前提
設定は下記とします。
- HDFS に処理対象のユーザーのリストが置かれている。
- API はユーザー ID のリストを引数に各ユーザーの詳細情報を取得し tsv 出力する。
通常, HDFS のユーザーリストをそのまま渡して API を叩くと思いますが, 数百万オーダーになるとチリツモで処理時間がかかります。
そこで, HDFS のユーザーリストを分割して, 分割したユーザーリストを並列処理で API 実行すれば処理時間は並列数分短くなります。
以下にそのサンプルコードを記載します。
API を並列処理する方法
① HDFS に格納されたユーザーリストを分割する
- HDFS から対象のユーザー ID を取り出します。
- uniq で重複排除します。
- 標準入力を split で n 分割します。
- 行数にデータ件数の 1/n を指定することで標準入力を n 分割します。
- 分割されたファイルは tmp/000* の形式で出力されます(分割したらファイル出力されてしまう仕様です)。
split_cnt = 10
text_hdfs_dir = '/text'
tmp_dir = 'tmp'
# HDFS に user_id, attribute_1, attribute_2, ... の形式で格納されており 1 列目がユーザー ID とします。
c = f'hadoop fs -cat \'{text_hdfs_dir}/part-*\' | cut -f1 | sort | uniq | split -l $(($(hadoop fs -cat \'{text_hdfs_dir}/part-*\' | cut -f1 | sort | uniq | wc -l)/{split_cnt}+1)) -d -a 5 - {tmp_dir}/'
subprocess.run(c, shell = True)
② xargs で API を並列実行する
- xargs で API を並列実行します。
- api_wrapper.py は API を実行する Python スクリプトとします。
- ls で分割したユーザーリストのファイル名を表示し xargs に渡します。
- xargs でユーザーリストを cat して API に渡し, 標準出力として返ってくる結果を tsv に流します。
split_cnt = 10
text_hdfs_dir = '/text'
tmp_dir = 'tmp'
text_tsv = 'text.tsv'
status_output_dir = 'status'
c = f'ls {tmp_dir}/000* | xargs -L 1 -P {split_cnt} -t bash -c \'cat $0 | python3 api_wrapper.py -S {status_output_dir} 1>> {text_tsv}\''
p = subprocess.Popen(c, shell = True)
# 並列処理が完了するまで待機する
p.wait()
③分割したユーザーリストを消す
- 並列処理完了後に分割出力したユーザーリストを消します。
tmp_dir = 'tmp'
c = f'rm {tmp_dir}/000*'
subprocess.run(c, shell = True)