pythonのheapqが強力すぎる

お題は、10x10の二次元迷路の経路探索なんですけど、これにheapqを使ったらやたら早かった、というお話です。
使うアルゴリズムは、スタート地点(1, 1)から移動可能な経路のリストをバッファに積んでいって、最初にゴール(10,10)に到着した経路を出力するだけのものです。まずは、迷路なんだけど全然迷路じゃない地図を探索するという、非常に意地悪な問題の実行結果をごらんください。

solve_with_stack(data)
SSSSSSSSSENNNNNNNNNESSSSSSSSSENNNNNNNNNESSSSSSSSSENNNNNNNNNESSSSSSSSSENNNNNNNNNESSSSSSSSSE
         1524 function calls in 0.002 seconds
(中略)
solve_with_deque_fifo(data)
EEEEEEEEESSSSSSSSS
         31796002 function calls in 54.238 seconds
(中略)
solve_with_heapq(data)
EEEEEEEEESSSSSSSSS
         348 function calls in 0.000 seconds

S(NEW)は南(北東西)に行くという意味です。
スタック(LIFO)を使うと、実行時間はまぁまぁですが、経路が無駄に長い答えを見つけてきます。キュー(FIFO)を使うと、最短経路を先に見つけてくれるけど、実行時間が滅茶苦茶長くなる。heapqを使うと、関数呼出の回数が飛躍的に少なくなるのに、最短経路を先に見つけてくれます。優れた待ち行列だといえると思います。

ところで、heapqって何?

スタックやキューが要素を追加された順番で保持するのに対して、heapqは要素をソートして保持します。heapqでpopして出てくる要素は、heapqの要素の中で最小の物であることが保証されています。

どうやって使うの?

heapqは要素をソートする為のキーが必要になります。追加する要素がそのままでソート可能であれば問題はありませんが、そうでなければ要素をpushしたりpopする代わりに、(key, 要素)のタプルをpushしたりpopしたりします。それと、stackやdequeがオブジェクト指向風のインターフェイスなのに対して、heapqはC言語みたいなインターフェイスです。

#stack
stack = []                          # スタックの作成
stack.append(x)                     # 要素の追加
y = stack.pop()                     # 要素の取り出し

#heapq
import heapq                        # heapq のインポート
hq = []                             # heapqの作成
heapq.heappush(hq, (key(x), x))     # 要素の追加
_, y = heapq.heappop(hq)            # 要素の取り出し keyは要らないので捨てる

今回のソース

経路をソートするキーとして、経路の末尾からゴールまでの距離(道程)を与えてみました。これで、ゴールに近い経路を優先して探索してくれるようになりました。

from collections import deque
import heapq


def solve_with_stack(maze_map):
    buf = [[(1, 1)]]
    while buf:
        route = buf.pop()
        for x in get_adjoin_pathes(route[-1], maze_map):
            if x == (10, 10):
                print(get_answer(route + [x]))
                return
            elif x not in route:
                buf.append(route + [x])

    print('bad maze; no way to goal')


def solve_with_deque_fifo(maze_map):
    buf = deque([[(1, 1)]])
    while buf:
        route = buf.popleft()
        for x in get_adjoin_pathes(route[-1], maze_map):
            if x == (10, 10):
                print(get_answer(route + [x]))
                return
            elif x not in route:
                buf.append(route + [x])

    print('bad maze; no way to goal')


def solve_with_heapq(maze_map):
    dist = lambda x: 20 - x[0] - x[1]
    buf = []
    heapq.heappush(buf, (dist((1, 1)), [(1, 1), ]))
    while buf:
        _, route = heapq.heappop(buf)
        for x in get_adjoin_pathes(route[-1], maze_map):
            if x == (10, 10):
                print(get_answer(route + [x]))
                return
            elif x not in route:
                heapq.heappush(buf, (dist(x), route + [x]))

    print('bad maze; no way to goal')


snew = {(0, 1): 'E', (1, 0): 'S', (0, -1): 'W', (-1, 0): 'N'}


def get_answer(route):
    _diff = lambda a, b: (b[0] - a[0], b[1] - a[1])
    _move = lambda x: _diff(route[x], route[x + 1])
    genexp = (_move(i) for i in range(len(route) - 1))
    return ''.join(snew[x] for x in genexp)


def get_adjoin_pathes(last, maze_map):
    _in_pathes = lambda x: maze_map[x[0]][x[1]] == 0
    _add_to_last = lambda x: (x[0] + last[0], x[1] + last[1])
    return filter(_in_pathes, map(_add_to_last, snew.keys()))


if __name__ == '__main__':
# 0が通路 1が壁 (1, 1)がスタート (10, 10)がゴール
    data = [
        [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
        [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]]

    import cProfile
    test1 = 'solve_with_stack(data)'
    test2 = 'solve_with_deque_fifo(data)'
    test3 = 'solve_with_heapq(data)'
    test_list = [test1, test2, test3]

    for x in test_list:
        print(x)
        cProfile.run(x)

ところで、pythonでlambdaを多用すると、プロファイルがちょっと見にくいですね。