Apache Pig では Hadoop の MapReduce 処理を簡単に記述することが出来ます。
そんな Apache Pig でよく使う処理をまとめました。
(私が Apache Pig を触り始める前にこういった記事が欲しかった。。。)
テキスト処理
改行除去
- ^M(CR)も除きます。
REPLACE(REPLACE(REPLACE(REPLACE(text, '\\t', ''), '\\r\\n', ''), '\\n', ''), '\\\\^M', '')
CONCAT
- 文字列を結合します。
CONCAT('https://sample.jp?user=', user)
条件演算
INDEXOF
- 指定した文字列がカラムに含まれているかを見ます。
-- text に「プレゼント」が含まれていないレコードを抽出する
FILTERED = FILTER DATA BY INDEXOF(text, 'プレゼント') < 0
-- text に「ビッグデータ」が含まれているレコードを抽出する
FILTERED = FILTER DATA BY INDEXOF(text, 'ビッグデータ') > 0
日時フィルター
- ToDate() で文字列を日時に変換出来ます。
-- 2019-05-31 00:00:00 から 2019-06-01 00:00:00 に登録されたユーザを抽出する
FILTERED_USER = FILTER USER BY ToDate('2019-05-31 00:00:00','yyyy-MM-dd HH:mm:ss') <= ToDate(created,'yyyy-MM-dd HH:mm:ss') AND ToDate(created,'yyyy-MM-dd HH:mm:ss') < ToDate('2019-06-01 00:00:00','yyyy-MM-dd HH:mm:ss');
関係演算
ユニークを取る
- 重複を除きます。
UNIQ_USER = DISTINCT USER;
取得件数を制限する
-- 5 件に制限する
L = LIMIT DATA 5;
集合演算
内部結合する
USER1 = LOAD '/tmp/user1' USING PigStorage('\\t') AS (id:chararray);
USER2 = LOAD '/tmp/user2' USING PigStorage('\\t') AS (id:chararray);
J = JOIN USER1 BY id, USER2 BY id;
外部結合する
USER1 = LOAD '/tmp/user1' USING PigStorage('\\t') AS (id:chararray);
USER2 = LOAD '/tmp/user2' USING PigStorage('\\t') AS (id:chararray);
J = JOIN USER1 BY id LEFT OUTER, USER2 BY id;
差集合を取る
- JOIN して片方の Relation が NULL なレコードを抽出します。
USER1 = LOAD '/tmp/user1' USING PigStorage('\\t') AS (id:chararray);
USER2 = LOAD '/tmp/user1' USING PigStorage('\\t') AS (id:chararray);
J = JOIN USER1 BY id LEFT OUTER, USER2 BY id;
USER1_MINUS_USER2 = FILTER J BY USER2::id is NULL;
積集合を取る
- JOIN して片方の Relation が not NULL なレコードを抽出します。
USER1 = LOAD '/tmp/user1' USING PigStorage('\\t') AS (id:chararray);
USER2 = LOAD '/tmp/user1' USING PigStorage('\\t') AS (id:chararray);
J = JOIN USER1 BY id LEFT OUTER, USER2 BY id;
USER1_AND_USER2 = FILTER J BY USER2::id is not NULL;
出力
TSV 出力
FS -rm -r -f -skipTrash /tmp/some_dir/
STORE DATA INTO '/tmp/some_dir' USING PigStorage('\t');
- -schema オプションを付与することで load するときにカラム名と型を指定しなくてよくなります。
FS -rm -r -f -skipTrash /tmp/some_dir/
STORE DATA INTO '/tmp/some_dir' USING PigStorage('\t', '-schema');
ORC 出力
- データ量を削減したい場合に使用します。
FS -rm -r -f -skipTrash /tmp/some_dir/
STORE DATA INTO '/tmp/some_dir' USING OrcStorage('-c ZLIB');
Pig 実行
コマンド実行
- Pig ファイルを指定し pig コマンドを実行します。
$ pig test.pig
pig でパラメータ埋め込み
- パラメータをファイルの前に指定します。
$ INPUT_DIR="/tmp/input"
$ OUTPUT_DIR="/tmp/output"
$ pig -param INPUT_DIR=${INPUT_DIR} -param OUTPUT_DIR=${OUTPUT_DIR} test.pig
パラメータ数が多い場合
- パラメータ数が多すぎると下記エラーが発生します。
- Pigファイルのテンプレートを用意し, パラメータを動的に埋め込み, 引数でパラメータの指定をしないようにすれば回避出来ます。
2020-01-09 15:50:39,492 [MainThread] ERROR org.apache.pig.tools.grunt.GruntParser - ERROR 2998: Unhandled internal error. Vertex failed, vertexName=scope-4152, vertexId=vertex_1507856747070_358659_1_00, diagnostics=[Task failed, taskId=task_1507856747070_358659_1_00_000101, diagnostics=[TaskAttempt 0 failed, info=[Error: Fatal Error cause TezChild exit.:java.lang.StackOverflowError at java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2828)
Pigファイルの条件が多い場合
- 下記例外が発生するため, -t PredicatePushdownOptimizer を付与します。
2020-01-29 10:21:32,056 [MainThread] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 2000: Error processing rule PredicatePushdownOptimizer. Try -t PredicatePushdownOptimizer
pig large.pig -t PredicatePushdownOptimizer
対話モードで実行
スキーマを確認する
$ pig
grunt> U = LOAD '/tmp/user' USING PigStorage('\\t') AS (id:chararray, name:chararray);
grunt> DESCRIBE U;
U: {id: chararray, name:chararray}
行数を取得する
$ pig
U = LOAD '/tmp/user' USING PigStorage('\\t') AS (id:chararray, name:chararray);
grunt> U_COUNT = FOREACH (GROUP U ALL) GENERATE COUNT(U);
grunt> DUMP U_COUNT;
(1024)
出力する
$ pig
grunt> U = LOAD '/tmp/user' USING PigStorage('\\t') AS (id:chararray, name:chararray);
grunt> L = LIMIT U 5;
grunt> DUMP L;
(00001, user1)
(00002, user2)
(00003, user3)
(00004, user4)
(00005, user5)
マクロ
- Pig に関数を記述し外部から読み込むことが出来ます。
- IMPORT で外部ファイルを読み込んで関数を使用します。
DEFINE exclude_advertisement(DATA) RETURNS FILTERED {
$FILTERED = FILTER $DATA BY
INDEXOF(text, '広告') < 0
AND INDEXOF(text, 'プレゼント') < 0
;
};
IMPORT 'macro.pig';
L = LOAD '/tmp/data' USING PigStorage('\t') AS (id:chararray, text:chararray);
F = exclude_advertisement(L);