Apache Pig 使い方まとめ

bigdata ビッグデータ

Apache Pig では Hadoop の MapReduce 処理を簡単に記述することが出来ます。
そんな Apache Pig でよく使う処理をまとめました。
(私が Apache Pig を触り始める前にこういった記事が欲しかった。。。)


スポンサーリンク

テキスト処理

改行除去

  • ^M(CR)も除きます。
REPLACE(REPLACE(REPLACE(REPLACE(text, '\\t', ''), '\\r\\n', ''), '\\n', ''), '\\\\^M', '')

CONCAT

  • 文字列を結合します。
CONCAT('https://sample.jp?user=', user)

条件演算

INDEXOF

  • 指定した文字列がカラムに含まれているかを見ます。
-- text に「プレゼント」が含まれていないレコードを抽出する
FILTERED = FILTER DATA BY INDEXOF(text, 'プレゼント') < 0
-- text に「ビッグデータ」が含まれているレコードを抽出する
FILTERED = FILTER DATA BY INDEXOF(text, 'ビッグデータ') > 0

日時フィルター

  • ToDate() で文字列を日時に変換出来ます。
-- 2019-05-31 00:00:00 から 2019-06-01 00:00:00 に登録されたユーザを抽出する
FILTERED_USER = FILTER USER BY ToDate('2019-05-31 00:00:00','yyyy-MM-dd HH:mm:ss') <= ToDate(created,'yyyy-MM-dd HH:mm:ss') AND ToDate(created,'yyyy-MM-dd HH:mm:ss') < ToDate('2019-06-01 00:00:00','yyyy-MM-dd HH:mm:ss');

関係演算

ユニークを取る

  • 重複を除きます。
UNIQ_USER = DISTINCT USER;

取得件数を制限する

-- 5 件に制限する
L = LIMIT DATA 5;

集合演算

内部結合する

USER1 = LOAD '/tmp/user1' USING PigStorage('\\t') AS (id:chararray);
USER2 = LOAD '/tmp/user2' USING PigStorage('\\t') AS (id:chararray);
J = JOIN USER1 BY id, USER2 BY id;

外部結合する

USER1 = LOAD '/tmp/user1' USING PigStorage('\\t') AS (id:chararray);
USER2 = LOAD '/tmp/user2' USING PigStorage('\\t') AS (id:chararray);
J = JOIN USER1 BY id LEFT OUTER, USER2 BY id;

差集合を取る

  • JOIN して片方の Relation が NULL なレコードを抽出します。
USER1 = LOAD '/tmp/user1' USING PigStorage('\\t') AS (id:chararray);
USER2 = LOAD '/tmp/user1' USING PigStorage('\\t') AS (id:chararray);
J = JOIN USER1 BY id LEFT OUTER, USER2 BY id;
USER1_MINUS_USER2 = FILTER J BY USER2::id is NULL;

積集合を取る

  • JOIN して片方の Relation が not NULL なレコードを抽出します。
USER1 = LOAD '/tmp/user1' USING PigStorage('\\t') AS (id:chararray);
USER2 = LOAD '/tmp/user1' USING PigStorage('\\t') AS (id:chararray);
J = JOIN USER1 BY id LEFT OUTER, USER2 BY id;
USER1_AND_USER2 = FILTER J BY USER2::id is not NULL;

出力

TSV 出力

FS -rm -r -f -skipTrash /tmp/some_dir/
STORE DATA INTO '/tmp/some_dir' USING PigStorage('\t');
  • -schema オプションを付与することで load するときにカラム名と型を指定しなくてよくなります。
FS -rm -r -f -skipTrash /tmp/some_dir/
STORE DATA INTO '/tmp/some_dir' USING PigStorage('\t', '-schema');

ORC 出力

  • データ量を削減したい場合に使用します。
FS -rm -r -f -skipTrash /tmp/some_dir/
STORE DATA INTO '/tmp/some_dir' USING OrcStorage('-c ZLIB');

Pig 実行

コマンド実行

  • Pig ファイルを指定し pig コマンドを実行します。
$ pig test.pig

pig でパラメータ埋め込み

  • パラメータをファイルの前に指定します。
$ INPUT_DIR="/tmp/input"
$ OUTPUT_DIR="/tmp/output"
$ pig -param INPUT_DIR=${INPUT_DIR} -param OUTPUT_DIR=${OUTPUT_DIR} test.pig

パラメータ数が多い場合

  • パラメータ数が多すぎると下記エラーが発生します。
  • Pigファイルのテンプレートを用意し, パラメータを動的に埋め込み, 引数でパラメータの指定をしないようにすれば回避出来ます。
2020-01-09 15:50:39,492 [MainThread] ERROR org.apache.pig.tools.grunt.GruntParser - ERROR 2998: Unhandled internal error. Vertex failed, vertexName=scope-4152, vertexId=vertex_1507856747070_358659_1_00, diagnostics=[Task failed, taskId=task_1507856747070_358659_1_00_000101, diagnostics=[TaskAttempt 0 failed, info=[Error: Fatal Error cause TezChild exit.:java.lang.StackOverflowError at java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2828)

Pigファイルの条件が多い場合

  • 下記例外が発生するため, -t PredicatePushdownOptimizer を付与します。
2020-01-29 10:21:32,056 [MainThread] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 2000: Error processing rule PredicatePushdownOptimizer. Try -t PredicatePushdownOptimizer
pig large.pig -t PredicatePushdownOptimizer

対話モードで実行

スキーマを確認する

$ pig
grunt> U = LOAD '/tmp/user' USING PigStorage('\\t') AS (id:chararray, name:chararray);
grunt> DESCRIBE U;
U: {id: chararray, name:chararray}

行数を取得する

$ pig
U = LOAD '/tmp/user' USING PigStorage('\\t') AS (id:chararray, name:chararray);
grunt> U_COUNT = FOREACH (GROUP U ALL) GENERATE COUNT(U);
grunt> DUMP U_COUNT;
(1024)

出力する

$ pig
grunt> U = LOAD '/tmp/user' USING PigStorage('\\t') AS (id:chararray, name:chararray);
grunt> L = LIMIT U 5;
grunt> DUMP L;
(00001, user1)
(00002, user2)
(00003, user3)
(00004, user4)
(00005, user5)

マクロ

  • Pig に関数を記述し外部から読み込むことが出来ます。
  • IMPORT で外部ファイルを読み込んで関数を使用します。
DEFINE exclude_advertisement(DATA) RETURNS FILTERED {
    $FILTERED = FILTER $DATA BY
        INDEXOF(text, '広告') < 0
        AND INDEXOF(text, 'プレゼント') < 0
        ;
};
IMPORT 'macro.pig';

L = LOAD '/tmp/data' USING PigStorage('\t') AS (id:chararray, text:chararray);
F = exclude_advertisement(L);
タイトルとURLをコピーしました