MapReduce on Python
Googleの分散処理技術であるMapReduceの解説記事をid:YuichiTanaka:20060923さん経由で知りました。
面白そうなので、このMapReduceをPythonで実装したらどうなるかやってみました。
やってることはid:u-no:20060318さんと同じで、
要は通信の代わりにメモリ使って、プロセスの代わりにスレッド使ってってこと。
です。つまりMapReduce簡単版ってところですかね。
でも、プログラミングモデルとしては参考になると思います。
ソースコード
import threading import urllib2 import utils class MapReduce: class Worker(threading.Thread): def __init__(self, func, args): threading.Thread.__init__(self) self.func = func self.args = args def run(self): self.result = self.func(*self.args) def __init__(self, map_func, reduce_func): self.map_func = map_func self.reduce_func = reduce_func def execute(self, seq): return reduce(self.reduce_func, self.rectify(map(self.mapper, seq)), {}) def mapper(self, *args): t = self.Worker(self.map_func, args) t.start() return t def rectify(self, seq): num_finished = 0 num_workers = len(seq) while num_finished != num_workers: for t in seq: if hasattr(t, 'result') and not hasattr(t, 'yielded'): yield t.result t.yielded = 1 num_finished += 1 time.sleep(0.1) return self.reduce_func(a, b.result) if __name__ == '__main__': def map_task(url): return utils.url2tfdict(url) def reduce_task(a, b): for key in b.iterkeys(): a[key] = key in a and a[key] + b[key] or b[key] return a import time start = time.time() mr = MapReduce(map_task, reduce_task) urls = ('http://www.python.jp', 'http://ja.wikipedia.org/wiki/Python', 'http://www.python.jp/Zope/intro', 'http://e-words.jp/w/Python.html') utils.printdict(mr.execute(urls)) print time.time() - start, 'sec'
実行結果
************************** 言語 ************** スクリプト ********* リスト ********* モジュール ******* アプリケーション ****** ライブラリ ***** 標準 ***** クラス **** フォーム **** ファイル **** サイト **** コンテンツ 0.589 sec
解説
やってることは、MapReduceの元の論文で言及されている「複数文書の語数カウント」の例と大体同じです。
要するに
- 処理の元となるデータ群 (urls)
- 今回の場合、「URLのリスト」
- (並列で実行される)処理内容 (map_task)
- 今回の場合、「URLで指定されたページを取得、HTMLタグを削除した後に語数カウントし、単語->出現回数 として 辞書オブジェクトにまとめる」
- 実行結果のまとめかた (reduce_task)
- 今回の場合、「処理結果の複数の辞書オブジェクトを、単一の辞書オブジェクトにまとめる」
を用意してやると、自動的に並列で(今回の場合Threadで)処理して結果を返してくれる、というわけです。並列処理をwrappingするためのパターンともいえるのかな。
クライアントコードは並列実行を意識せずにコーディングが可能になるので、シンプルに記述できているのがわかるかと思います。こんなに簡単に重い処理をスケールアウト出来るんだから素敵です。
追記
- PyroなんかでMapReduce.Workerをリモートのマシンで実行するようにすれば、分散処理が比較的手軽に可能になるかもしれません。
- Pythonのデコレータを使って、クライアントコードをもっとシンプルに記述できないかなぁ。
reduceの実行順序を「元データの並び順」から「処理が終わったものから順次適用」に変えればパフォーマンスが上がるだろうな。。- メソッドrectify(=整流)によって、このように動作するように修正しました。(10/2 12:46)