読者です 読者をやめる 読者になる 読者になる

ハウテレビジョン開発者ブログ

『外資就活ドットコム』を日夜開発している技術陣がプログラミングネタ・業務改善ネタ・よしなしごとについて記していきます。

Cloud Dataflow入門〜データ処理の実践

弊社ハウテレビジョンでは、週の1日をR&D dayとして、業務と直接関係しない技術を学んでみたり、今まであまり触れてこなかった領域を調べたりしています。

今回はCloud Dataflowに入門し、簡単なデータの分析コードを組み、動かしてみました。
とても簡単に強力な並列処理が出来るので、ログの分析などで活用できそうです。

f:id:itamisky:20170419112030j:plain

概略

Cloud Dataflowは、Google Cloudで提供されているデータ処理用のプラットフォームです。
2017/04現在、JavaとPythonのクライアントが提供されております(Pythonはβ版)。

「便利だよ!」という話はよく訊くのですが、実際に使ってみたことはなかったため、これを機に入門してみます。

できたこと

基本的なDataflowの使い方、そのコンセプトを把握しました。
また、簡単なデータ処理として、「Trump氏のツイート」を対象に「1週間ごとのつぶやき数」を集計、ただし「RTは除く」、という3つの条件でカウントをしました。

進めかた

利用方法をまとめてくれている有用なブログ記事もありますが、変化が激しいのとまとまった時間があるため、下記のGoogle公式のドキュメントを使って入門してみます。
https://cloud.google.com/dataflow/docs/

また、せっかくですので、Pythonのクライアントライブラリを使って進めます。

やったこと

クイックスタート

https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python

セットアップ
まずは新規プロジェクトを作り、APIを有効にします。

https://d2mxuefqeaa7sj.cloudfront.net/s_8ACB1D17AA95783CAEC7C335CBA2166A66ED243120AE0FF2F7C766C37CADD651_1492145849905_+2017-04-14+12.06.51.png

なお、どのGCPのプロジェクトでも必要なgcloud コマンドのインストール、認証などは事前に済ませているものとします。
次に、Cloud Storageにバケットを作成します。

今回は、プロジェクト名が dataflow-research 、バケット名が st-dataflow-research として進めます。
また、現状Pythonのバージョンは2.7のみ使えますので、pyenvで固定しておきます。

必要なコマンドはpipになっているため、簡単にインストールできます。
pip install google-cloud-dataflow

動作確認
一応確認しておきます。
ローカルでの実行、リモートでの実行のそれぞれが載っているので、両方試しておくと無難です。

さて、公式ドキュメントどおりリモート実行を下記コマンドで行うとエラーが出ます(2017/04現在)。

python -m apache_beam.examples.wordcount \
  --project $PROJECT \
  --job_name $PROJECT-wordcount \
  --runner BlockingDataflowPipelineRunner \
  --staging_location $BUCKET/staging \
  --temp_location $BUCKET/temp \
  --output $BUCKET/output

エラーメッセージはこの通り。
ValueError: Unexpected pipeline runner: BlockingDataflowPipelineRunner. Valid values are DirectRunner, EagerRunner, DataflowRunner, TestDataflowRunner or the fully qualified name of a PipelineRunner subclass.

BlockingDataflowPipelineRunner が無いため、DataflowRunner に書き換えて実行します。
正常に実行されると5分ほどかかり、操作が完了します。
ステップごとに実行している様子はGCP上で確認できます。

https://d2mxuefqeaa7sj.cloudfront.net/s_8ACB1D17AA95783CAEC7C335CBA2166A66ED243120AE0FF2F7C766C37CADD651_1492145861146_+2017-04-14+13.56.03.png

Storageに結果が出力されていることも確認しておくと安心です。

パイプラインについて

サンプルが実行できたので、次にコンセプトを学んでゆきました。

公式ドキュメントが充実していますので、重要な所を拾い読みすると捗ります。
https://cloud.google.com/dataflow/model/pipelines

パイプラインはステップの有向グラフ と書かれており、分岐やループが出来ることが併せて述べられています。

変換の項目を見ると、パイプライン(中身はapache beam)は関数型言語で書くような感覚で記述すれば良いことがわかります。
実際サンプルで挙げられているコードも、以下のようにmapやflatMapなどをメソッドチェーン的に呼び出しており、見慣れた形になっています。

(p
 | beam.io.ReadFromText(my_options.input)
 | beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
 | beam.Map(lambda x: (x, 1))
 | beam.combiners.Count.PerKey()
 | beam.io.WriteToText(my_options.output))

サンプルを実行するとまたしてもエラーが出ます。
NameError: name 'argv' is not defined
これは単純にサンプルコードでimportが抜けている問題なので、 以下のように修正します。

@@ -1,3 +1,4 @@
+import sys
 import re

 import apache_beam as beam
@@ -17,7 +18,7 @@ class MyOptions(PipelineOptions):
                         required=True,
                         help='Output file to write results to.')

-pipeline_options = PipelineOptions(argv)
+pipeline_options = PipelineOptions(sys.argv)
 my_options = pipeline_options.view_as(MyOptions)

実行してみると、前回と同様の結果が得られました。

実践

https://cloud.google.com/dataflow/pipelines/constructing-your-pipeline

パイプラインのコンセプトをほんのりと理解したので、その周辺の理解を深めるため、実際にデータ処理を行ってみます。
今回は簡単なデータ処理として、「Trump氏のツイート」を対象に「1週間ごとのつぶやき数」を集計、ただし「RTは除く」、という3つの条件でカウントをします。

ちなみに、以下が対象の「本物」なTrump氏のTwitterです。
https://twitter.com/realDonaldTrump

データとしてツイートの一覧が必要なので、tweepyでさっくりと取得しました。
全ツイートは制限により取れないため、直近の約3000ツイート程度です。

import tweepy
import pandas

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

api = tweepy.API(auth)

data = []

def process_status(status):
    data.append([status.created_at, status.text.replace('\n', ' ')])

page = 1
while True:
    print("fetch page {0}".format(page))
    if len(data) > 0:
        print("  top: {0}".format(data[-1]))
    statuses = api.user_timeline('@realDonaldTrump', count=200, page=page)
    if statuses:
        for status in statuses:
            process_status(status)
    else:
        # All done
        break
    page += 1  # next page

df = pandas.DataFrame(data, columns=['created_at', 'text'])
df.to_csv('trump.tsv', sep='\t', index=False, encoding='utf-8')

ローカルで実行

毎回リモートで実行するのは時間がかかるので、ローカルでパイプライン処理を実行させます。
RunnerにDirectRunner を指定するだけでローカル実行されます。
処理がすぐ返ってくるので「実行されてない?」と思ってしまいますが、バックグラウンドで動いています。
しばらくすると結果が同じように出力されます。

パイプラインの組み立て

目的の処理を実現するため、今回は以下の流れで処理をしてゆきます。

  1. TSVファイルの読み込み
  2. RTのツイートを除外
  3. 各ツイートのタイムスタンプを修正する
  4. ウィンドウで区切る
  5. 集計用にデータを変換する(ウィンドウ番号をkeyとして持つ)
  6. カウント(groupBy + count)
  7. 結果を出力する

では、パイプラインのステップを順番に見てゆきます。

TSVファイルの読み込み
これは前のサンプルでもあった通り、beam.io.ReadFromText で読み込めます。
| beam.io.ReadFromText(my_options.input)

RTのツイートを除外
これはフィルタリングするだけで事足りそうです。
ツイート内容にRT @ が入っていれば除外とします。
beam.Filter というプロセッサがあるので、これをありがたく利用します。
| beam.Filter(``**lambda** x: 'RT @' **not in** x)

各行のタイムスタンプを設定する
BigQueryやStorageなどからではない、テキストから読み込んだファイルなどは、全ての行が共通のタイムスタンプを持っています。
データとして持っているものを自動で認識してくれる訳ではありませんので、自分で設定してあげる必要があります。
https://cloud.google.com/dataflow/model/windowing#TimeStamping

JavaではParDoという変換が用意されていますが、PythonではさらにMapとFlatmapが用意されています。
https://cloud.google.com/dataflow/model/par-do

さて、Pythonのサンプルは提供されていませんが、Githubのサンプルを漁った限りでは以下のようにwindow.TimestampedValue を使って出力をすれば良さそうです。
https://github.com/apache/beam/blob/d2b8b2886ce42f138b634d90208780bdce7e058e/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py#L68

まずは、入力データが以下のような\t で区切られたTSV形式になっているので、日時と内容を分離します。
2017-04-13 19:21:07 It was a great honor to welcome Atlanta's heroic first responders to the White House this afternoon! https://t.co/ZtC14AJ0xs
次に、タイムスタンプが日付形式になっているので、Unixタイムスタンプに変換します。
結果をwindow.TimestampedValue で返してあげれば、DoFnの完成です。

class ExtractUserAndTimestampDoFn(beam.DoFn):
    def process(self, context):
        import time
        from datetime import datetime
        [str_timestamp, text] = context.split('\t')
        timestamp = datetime.strptime(str_timestamp, '%Y-%m-%d %H:%M:%S')
        unixtime = int(time.mktime(timestamp.timetuple()))
        yield window.TimestampedValue(text, unixtime)

importを関数内で行っている ことに注意です。
グローバルに読み込むと、各関数が別のマシンで実行された際にimportがされておらずエラーになってしまいます。
https://cloud.google.com/dataflow/faq#nameerror-

作ったクラスはParDoで適用すればOKです。

| beam.ParDo(ExtractUserAndTimestampDoFn())

ウィンドウで区切る
タイムスタンプを一定の区間で区切るものとして、ウィンドウという仕組みがあります。
https://cloud.google.com/dataflow/model/windowing
データ集計ではよく出てくる方法ですので、標準で用意されていて便利です。

ウィンドウで区切るには、WindowInto という関数を利用します。
これも公式ドキュメントには無いので、Githubのサンプルから漁ると見つかります。

from apache_beam.transforms import WindowInto
ONE_WEEK_IN_SECONDS = 60 * 60 * 24 * 7
| WindowInto(FixedWindows(size=ONE_WEEK_IN_SECONDS))

集計用にデータを変換する
ウィンドウで区切ったデータを、(window-number,1) という形に変換し、groupByが出来るようにします。
これは以下のようなクラスを作り、いつも通りにParDoで適用します。

class WithWindow(beam.DoFn):
  def process(self, element, window=beam.DoFn.WindowParam):
      yield (str(window), 1)


| beam.ParDo(WithWindow())

カウントと出力
これは標準で提供されているプロセッサを適用させれば完了です。

| beam.combiners.Count.PerKey()
| beam.io.WriteToText(my_options.output))

処理のまとめ
処理部分のソースは以下のようになりました。

class ExtractUserAndTimestampDoFn(beam.DoFn):
    def process(self, context):
        [str_timestamp, text] = context.split('\t')
        timestamp = datetime.strptime(str_timestamp, '%Y-%m-%d %H:%M:%S')
        unixtime = int(time.mktime(timestamp.timetuple()))
        yield window.TimestampedValue(text, unixtime)


class SessionsToStringsDoFn(beam.DoFn):
  def process(self, element, window=beam.DoFn.WindowParam):
    yield u"{0}: {1}".format(str(element).encode('utf-8'), str(window).encode('utf-8'))


class WithWindow(beam.DoFn):
  def process(self, element, window=beam.DoFn.WindowParam):
      yield (str(window), 1)

(p
 | beam.io.ReadFromText(my_options.input)
 | beam.Filter(lambda x: 'RT @' not in x)
 | beam.ParDo(ExtractUserAndTimestampDoFn())
 | WindowInto(FixedWindows(size=ONE_WEEK_IN_SECONDS))
 | beam.ParDo(WithWindow())
 | beam.combiners.Count.PerKey()
 | beam.io.WriteToText(my_options.output))

各ステップでやることがはっきりしているので、分かりやすいですね。

結果
実行してみると、以下のような出力が得られました。

('[1479945600.0, 1480550400.0)', 39)
('[1461801600.0, 1462406400.0)', 60)
('[1480550400.0, 1481155200.0)', 35)
('[1476316800.0, 1476921600.0)', 160)
('[1466640000.0, 1467244800.0)', 78)
...
('[1484179200.0, 1484784000.0)', 47)
('[1483574400.0, 1484179200.0)', 51)
('[1490832000.0, 1491436800.0)', 38)

もちろん、数の代わりに平均を出したり、ひと月あたりに変換したり、ということがすぐできます。

リモートで実行

ソースとなるツイートのデータはStorage上にある必要がありますので、gs://st-dataflow-research/tweet_trump.tsv というファイル名で保存したとします。

さて、このまま意気揚々とリモートで実行をするとコケてしまいます。
これは、例えばdatetimeなど、グローバルでimportしているパッケージをParDoなどで使用しているためです。
ParDo内で実行された関数は、並列に別のマシンで実行されるため、importしていない事になってしまいます。
https://cloud.google.com/dataflow/faq#nameerror-
これを防ぐため、--save_main_session オプションを利用します。

他は同じように、リモートで実行するよう指示を出します。

PROJECT=dataflow-research
BUCKET=gs://st-dataflow-research

python pipeline-tweet.py \
  --project $PROJECT \
  --job_name $PROJECT-tweet-trump \
  --runner DataflowRunner \
  --staging_location $BUCKET/staging \
  --temp_location $BUCKET/temp \
  --output $BUCKET/output \
  --save_main_session

しばらくすると、ローカルで実行したものと同じ結果が出力されました!

まとめ

Dataflowに入門し、軽いデータ処理が行えるようになりました。
並列性の高い処理が簡潔に記述して実行できるため、特に大規模なデータを扱う際には重宝しそうです。

また、軽い気持ちでPythonクライアントを使うと辛いことになるということがわかりました。
ドキュメントが充実していないのはしょうがないにしても、重要な機能であるストリーミング実行がJavaでしか対応していないなど、まだまだ未対応なので使う際は強い意思を持ってご利用ください。

なお、ドキュメントがおかしい時はフィードバックを送ってみましょう。
上記で問題になっていた箇所には既にフィードバックを送っているので、もしかしたら直っているかもしれません。