MapReduce on Python

Googleの分散処理技術であるMapReduceの解説記事id:YuichiTanaka:20060923さん経由で知りました。

面白そうなので、このMapReducePythonで実装したらどうなるかやってみました。

やってることはid:u-no:20060318さんと同じで、

要は通信の代わりにメモリ使って、プロセスの代わりにスレッド使ってってこと。

です。つまりMapReduce簡単版ってところですかね。
でも、プログラミングモデルとしては参考になると思います。

ソースコード

import threading
import urllib2
import utils

class MapReduce:
	class Worker(threading.Thread):
		def __init__(self, func, args):
			threading.Thread.__init__(self)
			self.func = func
			self.args = args

		def run(self):
			self.result = self.func(*self.args)
		
	def __init__(self, map_func, reduce_func):
		self.map_func = map_func
		self.reduce_func = reduce_func
		
	def execute(self, seq):
		return reduce(self.reduce_func, self.rectify(map(self.mapper, seq)), {})

	def mapper(self, *args):
		t = self.Worker(self.map_func, args)
		t.start()
		return t

	def rectify(self, seq):
		num_finished = 0
		num_workers = len(seq)
		while num_finished != num_workers:
			for t in seq:
				if hasattr(t, 'result') and not hasattr(t, 'yielded'):
					yield t.result
					t.yielded = 1
					num_finished += 1
			time.sleep(0.1)
		return self.reduce_func(a, b.result)


if __name__ == '__main__':
	def map_task(url):
		return utils.url2tfdict(url)

	def reduce_task(a, b):
		for key in b.iterkeys():
			a[key] = key in a and a[key] + b[key] or b[key]
		return a

	import time
	start = time.time()
	mr = MapReduce(map_task, reduce_task)
 	urls = ('http://www.python.jp',
		'http://ja.wikipedia.org/wiki/Python',
		'http://www.python.jp/Zope/intro',
		'http://e-words.jp/w/Python.html')
 	utils.printdict(mr.execute(urls))
	print time.time() - start, 'sec'

実行結果

************************** 言語
************** スクリプト
********* リスト
********* モジュール
******* アプリケーション
****** ライブラリ
***** 標準
***** クラス
**** フォーム
**** ファイル
**** サイト
**** コンテンツ
0.589 sec

解説

やってることは、MapReduceの元の論文で言及されている「複数文書の語数カウント」の例と大体同じです。
要するに

処理の元となるデータ群 (urls)
今回の場合、「URLのリスト」
(並列で実行される)処理内容 (map_task)
今回の場合、「URLで指定されたページを取得、HTMLタグを削除した後に語数カウントし、単語->出現回数 として 辞書オブジェクトにまとめる」
実行結果のまとめかた (reduce_task)
今回の場合、「処理結果の複数の辞書オブジェクトを、単一の辞書オブジェクトにまとめる」

を用意してやると、自動的に並列で(今回の場合Threadで)処理して結果を返してくれる、というわけです。並列処理をwrappingするためのパターンともいえるのかな。

クライアントコードは並列実行を意識せずにコーディングが可能になるので、シンプルに記述できているのがわかるかと思います。こんなに簡単に重い処理をスケールアウト出来るんだから素敵です。