ビッグデータ分析で使用するPythonコードまとめ

bigdata ビッグデータ

Python でビッグデータ分析処理をする際によく使うコードをまとめました。

Python でシェルコマンドを実行したり, 標準出力を扱ったりとシェル芸を組み合わせることもあります。

ロギングや Pandas でのデータ加工, テンプレートファイルを読み込んで動的に変数を設定するなどよく使う処理のサンプルを掲載していますのでぜひ参考にしてみてください。

スポンサーリンク

環境構築

  • 必要に応じて仮想環境を作る。

仮想環境作成

$ python3 -m venv test

仮想環境有効化

$ source test/bin/activate
(test)$

仮想環境無効化

(test)$ deactivate
$

文字列処理

文字列を切り出す

  • [開始インデックス:終了文字番号(インデックスではないことに注意)]とする。
s = "2019-06-01"
print(f"{s[0:4]}-{s[5:7]}-{s[8:10]}")

エスケープ

波括弧のエスケープ

  • 波括弧は波括弧でエスケープする。
var = 'aiuto'
print( f"val is {{{var}}}" )

ディレクトリ操作

ディレクトリ作成

import os

os.makedirs('tmp', exist_ok=True)

class

  • プロパティ等を別ファイルにしたい場合等に使用する。
classsample
├── main.py
└── prop
    └── user_property.py

main.py

from prop.user_property import UserProperty

user_property = UserProperty({'first_name': 'イチロー', 'family_name': 'テスト'})
print(f'{user_property.FAMILY_NAME} {user_property.FIRST_NAME}')

prop/user_property.py

from typing import ClassVar, Dict, List, Any

class UserProperty:
    def __init__(self, kwargs: Dict[str, Any]):
        self.FIRST_NAME = kwargs['first_name']
        self.FAMILY_NAME = kwargs['family_name']

実行結果

$ python main.py
テスト イチロー

ジェネレータ

  • yield でジェネレータイテレータ(ジェネレータで生成された要素を1個ずつ取り出すことが出来るオブジェクト)を生成する。
  • ジェネレータイテレータから要素を取り出すタイミングでジェネレータ関数が実行される。
  • 例えば1,000,000行のレコードを読み込むとき,リストに格納してreturnする関数を用意するとメモリを多く使用してしまうがジェネレータ関数を使えば1行ずつ読み込むためメモリへの負荷を抑えることが出来る。

main.py

def genarate_items():
    yield 1
    yield 2
    yield 3

if __name__ == "__main__":
    print('ジェネレータイテレータ生成')
    items = genarate_items()
    print('イテレーション開始')
    for item in items:
        print(item)
    print('イテレーション終了')

実行結果

$ python main.py
ジェネレータイテレータ生成
イテレーション開始
1
2
3
イテレーション終了

subprocess

  • subprocess で Python からシェルコマンドを実行出来る。
    • むしろシェルコマンドを実行しないとビッグデータ分析出来ない。

シェルコマンド実行

import subprocess

c = ['hadoop', 'fs', '-rm', '-r', '/tmp/test']
subprocess.run(c)

xargs でシェルコマンドを並列実行

  • パイプを使うには shell=True とする必要がある。
  • subprocess.Popen() でプロセスを受け取り,wait()で処理完了を待機出来る。
    • 後続の処理はプロセスが完了するまで実行されない。
  • サンプルは Python から tmp に入った各ファイルを cat して test.sh を並列数10で実行している。
  • 途中の処理だけ並列実行して処理時間を短縮したい場合に使用する。
c = 'ls tmp/* | xargs -L 1 -P 10 -t bash -c \'cat $0 | test.sh -'
p = subprocess.Popen(c, shell = True)
p.wait()

# 後続の処理

標準出力の扱い

  • stdout, stderr は必要なければあえて pipe に渡さない。
    • python test.py &> log/test.log で標準出力を受け取れる。

click

  • click で簡単にターミナルで実行可能なコマンドを実装出来る。
  • @click.command() でコマンドを実装する。
  • @click.group(),add_command() で複数コマンドを実装可能。
  • @click.option() でコマンドの引数を追加出来る。
  • click.get_text_stream(“stdin”, encoding=”utf-8″) で標準入力を受け取る。
  • click.echo() で標準出力に出力する。
click
├── cli.py
└── command
    └── hello
        └── cli.py

click/cli.py

import click

from command.hello.cli import hello

@click.group()
def entry_point():
    print('click/cli.py のメッセージ。')

entry_point.add_command(hello)

def init():
    entry_point(**{})

if __name__ == '__main__':
    init()

click/command/hello/cli.py

import click

@click.command('hello')
@click.option('--msg', '-m', 'msg', type=str, help='表示したいメッセージを入力してください。')
def hello(**kwargs):
    print(f'入力されたメッセージ:{kwargs["msg"]}')
    print('click/cmd/hello/cli.py のメッセージ。')
    lines = click.get_text_stream("stdin", encoding="utf-8")
    for line in lines:
        print(line)
    click.echo('標準出力に出力。')

test.tsv

001    a
002    b
003    c
$ cat test.tsv | python cli.py hello -m 'テスト'
click/cli.py のメッセージ。
入力されたメッセージ:テスト
click/cmd/hello/cli.py のメッセージ。
001    a
002    b
003    c
標準出力に出力。

pandas

  • データを加工する際に使用する。

tsv 読み込み

  • delimiter で区切り文字を指定出来る。
  • names で列名を設定出来る。
  • dtype でデータ型を指定出来る。
  • 大容量ファイルを扱う場合は low_memory=False とする。
import pandas as pd

df = pd.read_csv('user.tsv', delimiter='\t', header=None, names=['id', 'name'], dtype={'id': str, 'name': str}, low_memory=False)

tsv 出力

df.to_csv('test.tsv', sep='\t')

特定の列を指定して出力

  • 分析する上では必要だが結果として不要な列を配所したい時に使用する。
columns = ['id', 'name']
df[colums].to_csv('test.tsv', sep='\t', index=False)

件数を絞って出力する

  • サンプリングで使用する。
df.sample(n=100).to_csv('test.tsv', sep='\t')

重複行削除

df.drop_duplicates()

query でダブルクォートを使う

df.query('row_name.str.contains("\\\"keyword\\\"")')

エラーハンドリング

Python スクリプト強制終了

  • エラー発生時に Python スクリプトを強制終了したい時に使用する。
import sys

sys.exit(1)

ファイル存在確認

  • データ分析をかける前に必要なインプットが揃っているかを見る際に使う。
import os

if os.path.exists():
    print('ファイルが存在します。後続の処理を実行します。')
else:
    print('ファイルが存在しません。処理を終了します。')
    sys.exit(1)

ロギング

  • Python の標準モジュール logging を使用する。
  • 以下のような構成でのロギングサンプルを記載する。
test
├── module
│   └── sub.py
└── main.py

main.py

# 自作モジュール
import module.sub as sub

from logging import CRITICAL, DEBUG, ERROR, INFO, WARNING
from logging import NullHandler, StreamHandler, basicConfig, getLogger, Formatter
from logging.handlers import TimedRotatingFileHandler

logger = getLogger(__name__)
logger.addHandler(NullHandler())
logger.setLevel(DEBUG)
sh = StreamHandler()

def init() -> None:
    basicConfig(
        handlers=[sh],
        format="[%(asctime)s] %(name)s %(levelname)s: %(message)s",
        datefmt="%y-%m-%d %H:%M:%S",
    )
    root_logger = getLogger()
    root_logger.setLevel(DEBUG)

    rfh = TimedRotatingFileHandler(
        "log/test.log",
        when="midnight",
        backupCount=30,
    )
    format_template = (
        f"PID:%(process)d [%(asctime)s] %(name)s %(levelname)s: %(message)s"
    )
    log_format = Formatter(fmt=format_template, datefmt="%y-%m-%d %H:%M:%S")
    rfh.setFormatter(log_format)
    root_logger.addHandler(rfh)

    logger.debug("スクリプト実行開始")

if __name__ == "__main__":
    init()
    # 自作モジュールの関数を呼び出す
    sub.hello()

module/sub.py

from logging import getLogger

logger = getLogger(__name__)

def hello():
    print('hello! this is sub module.')
    logger.debug('sub moduleから出力')
$ python main.py
[20-06-25 14:20:56] __main__ DEBUG: スクリプト実行開始
hello! this is sub module.
[20-06-25 14:20:56] module.sub DEBUG: sub moduleから出力

$ head log/test.log
PID:15171 [20-06-25 14:20:56] __main__ DEBUG: スクリプト実行開始
PID:15171 [20-06-25 14:20:56] module.sub DEBUG: sub moduleから出力

その他

ファイル件数取得

  • 1行でシェルコマンドを実行することなく取得可能。
cnt = str(sum(1 for line in open('test.tsv')))

ファイルを1行の文字列にする

  • 複数キーワードをOR条件で1行にする際に使用した。

main.py

import os

def load_file_as_one_line(file, sep):
    with open(file) as f:
        lines_one_str = ''
        # a\nb\nc\n -> a|b|c|d
        lines = f.readlines()
        for line in lines:
            w = line.rstrip(os.linesep)
            if(w != ''):
                lines_one_str += w + sep
        return lines_one_str[:-1]

print(load_file_as_one_line('data.txt', '|'))
$ cat data.txt
テスト
test
テキスト
text
テースト
$ python main.py
テスト|test|テキスト|text|テースト|taste

日付パーティションのディレクトリを動的に生成する

  • データ分析では n ヵ月分のデータを読み込む,n 日分のデータを読み込むという場面がよくあるのでそういった時に使用する。

main.py

import datetime
from dateutil.relativedelta import relativedelta

def out_term(year, month, term, base_dir):
    d = datetime.date(year, month, 1)
    txt = ""

    for i in range(term):
        txt += base_dir + (d + relativedelta(months=i)).strftime("%Y/%m")
        if(i != term - 1) :
            txt += ","
    return txt

def out_reverse_term_by_day(d, reverse_term, base_dir):
    txt = ""

    d = d - relativedelta(days=reverse_term - 1)
    for i in range(reverse_term):
        txt += base_dir + (d + relativedelta(days=i)).strftime("%Y/%m/%d")
        if(i != reverse_term - 1) :
            txt += ","
    return txt

# 2019-11 から4ヵ月分のディレクトリを用意する
print(out_term(2019, 11, 4, '/tmp/input/'))
# 2019-11-02 から5日遡った分のディレクトリを用意する
print(out_reverse_term_by_day(datetime.date(2019, 11, 2), 5, '/tmp/input/'))

実行結果

$ python main.py
/tmp/input/2019/11,/tmp/input/2019/12,/tmp/input/2020/01,/tmp/input/2020/02
/tmp/input/2019/10/29,/tmp/input/2019/10/30,/tmp/input/2019/10/31,/tmp/input/2019/11/01,/tmp/input/2019/11/02

Pigテンプレートに条件式やパス等を動的に埋め込む

  • 辞書で置換したい単語と代入したい値を定義し,テンプレートに埋め込んだPigを生成する。
  • 複雑な条件式や動的に変わるパスを埋め込みたい時に使用する。

main.py

def substitute_condition(template, output, target_word, condition):
    txt = ''
    with open(template) as f:
        lines_one_str = f.read()
        txt = lines_one_str.replace(target_word, condition)
    with open(output, mode='w') as f:
        f.write(txt)

def translate(template: str, output: str, d: {str, str}):
    for i, (k, v) in enumerate(d.items()):
        if i == 0:
            substitute_condition(template, output, k, v)
        else:
            substitute_condition(output, output, k, v)

d = {'$INPUT': '/tmp/input', '$COND': 'テスト|test', '$OUTPUT': '/tmp/output'}
translate('template.pig', 'output.pig', d)

実行

$ python main.py

template.pig

L = LOAD '$INPUT' USING PigStorage('\t');
F = FILTER L BY note matches '$COND';
FS -rm -r -f -skipTrash $OUTPUT
STORE F INTO '$OUTPUT' USING PigStorage('\t', '-schema');

output.pig

L = LOAD '/tmp/input' USING PigStorage('\t');
F = FILTER L BY note matches 'テスト|test';
FS -rm -r -f -skipTrash /tmp/output
STORE F INTO '/tmp/output' USING PigStorage('\t', '-schema');

メール送信

def send_mail(subject: str, body: str, from: str, to: str, svr: str, port: str, id: str, password: str):
    msg = MIMEText(body, 'html')
    msg['Subject'] = subject
    msg['From'] = from
    msg['To'] = to

    server = smtplib.SMTP_SSL(svr, port)
    # SSL の場合
    # server = smtplib.SMTP_SSL(svr, port, context=ssl.create_default_context())
    server.login(id, password)
    server.send_message(msg)

関連記事

タイトルとURLをコピーしました