GPUを使って無線LANをクラックする話 / モニター、ロック、スケーラビリティ

弊社は最近、それまでの神谷町(東京タワーの近く)から、神田と秋葉原の間くらいに引っ越しをしました。とは言っても、去年の夏の終わりぐらいなので、だいぶ前なのですが。

オフィスの立地で重要なことって、何でしょう。駅からの距離?ビルの高さ?地名のブランド?歓楽街からの近さ?ランチ営業をやってるレストランの多さ?

みなさま、各々重視する点があると思うのですが、「一番近いコンビニが何か」は意外と見過ごされるところではないでしょうか。朝ちょっと寄ってコーヒーを買ったり、お弁当を買ったり。結構お世話になる所だと思います。

というわけで今月の写真は「セブンイレブンのカフェラテ・マシンの『顔』」です。いままでの所は一番近いコンビニが「セブン・イレブン」だったのが「ファミマ!」になってしまって、この眉毛のかわいい顔が見れなくちゃってさ。ちょっとさびしいよね。

…という世間話(?)をしたら、「へ、へぇ…(困惑)」って言われました。

コミュニケーション、難しいね!

前回までのあらすじ

前回分かってしまったことは、仕事を要求するCPUやGPUが増えれば増えるほど、パスワードの解析を依頼する役割を担うPythonのメイン・スレッドは、ロックの取り合いで時間を使い潰すようになってしまうということでした。ここを改善すれば、CPUやGPUがもっと活用できるかも。

このロックは、実際にはどのように使われているのでしょうか?ちょっとここで、Pyritで仕事が分配される様を眺めてみましょう。

Pyritでの「流れ作業」の様子を見学する

Pyritのベンチマークモードでは、メインのPythonスレッドで候補となるパスワードを生成した後に、それらをCPUやGPUに分配しています。で、並列に計算された結果は再度メインのスレッドでCPyritクラスが受け取る、という流れ作業になっています。その流れを見ていきましょう。

仕事の受付け

CPyrit.enqueueに渡されます。このメソッドは、パスワードを生成するPythonのメインスレッドから呼ばれます:

    def enqueue(self, essid, passwords, block=True):
        with self.cv:
            # ... 省略 ...
            passwordlist = list(passwords)
            if len(self.inqueue) > 0 and self.inqueue[-1][0] == essid:
                self.inqueue[-1][1][self.in_idx] = passwordlist
            else:
                self.inqueue.append((essid, {self.in_idx: passwordlist}))
            self.workunits.append(len(passwordlist))
            self.in_idx += len(passwordlist)
            self.cv.notifyAll()

self.cvはthreading.Conditionで、いわゆるモニターと呼ばれている排他制御のためのしくみ、です。「モニタ」って言葉は改めて聞くことはあんまり無い気がするんですが、Javaでおなじみのwait/notify/notifyAllのアレといえば通じる人もいるかもしれません。なにはともあれ、with self.cv: とすることで、このブロックの中のコードが実行されるスレッドが高々1つになるように制御しています。その後の処理は基本的には仕事をself.inqueueその他に適切にセットしているだけ、です。

適切に情報をセットしたら、self.cv.notifyAll() して他のスレッドにnotify(通知)します。具体的に何が起こるのかというと、with self.cv: の中でself.cv.wait() している他のスレッドが全て起こされます。じゃあwith self.cv: しているのは何箇所かというと、これを含めてなんと7箇所もあります。今回はそのうち、たくさん使われていそうなコードパスを眺めていきます。

仕事の受取り

inqueueに入れられた仕事は、CPUやGPUの各スレッドからCPyrit._gatherというメソッドを呼ぶことで受け取られます:

    def _gather(self, desired_size, block=True, timeout=None):
        t = time.time()
        with self.cv:
            passwords = []
            pwslices = []
            cur_essid = None
            restsize = desired_size
            while True:
                self._check_cores()
                for essid, pwdict in self.inqueue:
                   # ... passwordsを埋め、self.inqueueからそのぶん削除する処理 ...
                if len(passwords) > 0:
                    wu = (cur_essid, tuple(passwords))
                    try:
                        self.slices[wu].append(pwslices)
                    except KeyError:
                        self.slices[wu] = [pwslices]
                    self.cv.notifyAll()
                    return wu
                else:
                    if block:
                        if timeout is not None and time.time() - t > timeout:
                            return None, None
                    else:
                        return None, None
                    self.cv.wait(0.1)

self.inqueueからself.slicesに情報を移しつつ(self.slices[wu] = … )、WorkUnitである(ESSID, passwords)のタプルをCPUやGPUのために返します。この情報を使って、CPUはSSEを、GPUはCUDAやOpenCLを使って結果(results )を計算します。今回は情報のやり取りだけに注目するので、実際にどのように計算をするのかは省略する三分クッキング方式で次に進みましょう。

計算した結果を通知するために使われるのが次の_scatter メソッドです:

結果の受付け

CPUやGPUの各スレッドで計算された結果は、CPyrit._scatterメソッドを使って集められます:

    def _scatter(self, essid, passwords, results):
        assert len(results) == len(passwords)
        with self.cv:
            wu = (essid, passwords)
            slices = self.slices[wu].pop(0)
            if len(self.slices[wu]) == 0:
                del self.slices[wu]
            ptr = 0
            for idx, length in slices:
                self.outqueue[idx] = list(results[ptr:ptr + length])
                ptr += length
            for idx in sorted(self.outqueue.iterkeys(), reverse=True)[1:]:
                res = self.outqueue[idx]
                o_idx = idx + len(res)
                if o_idx in self.outqueue:
                    res.extend(self.outqueue[o_idx])
                    del self.outqueue[o_idx]
            self.cv.notifyAll()

self.slicesの情報を消費・削除しつつ、self.outqueue に結果を載せています。難しいところはないですね。

結果の受取り

最後に、Pythonのメインスレッドが、CPyrit.dequeueメソッドを使ってCPUやGPUの各スレッドがパスワードを受取ります:

    def dequeue(self, block=True, timeout=None):
        t = time.time()
        with self.cv:
            if len(self.workunits) == 0:
                return None
            while True:
                wu_length = self.workunits[0]
                if self.out_idx not in self.outqueue \
                 or len(self.outqueue[self.out_idx]) < wu_length:
                    self._check_cores()
            # ... 十分な数の結果が得られない時に待つ処理 ...
                else:
                    reslist = self.outqueue[self.out_idx]
                    del self.outqueue[self.out_idx]
                    results = reslist[:wu_length]
                    self.out_idx += wu_length
                    self.outqueue[self.out_idx] = reslist[wu_length:]
                    self.workunits.pop(0)
                    self.cv.notifyAll()
                    return tuple(results)

self.outqueue から結果を適切に取り出してself.outqueue から削除して、結果をreturnするだけです。

ここから更にCrackerがクラックに成功したかの判定をするわけですが、benchmarkではその処理は省略されているので、今回もここで止めておきます。

ここまでの観察

self.inqueue とself.outqueue の2本のキューがあることがわかりました。inqueueはパスワードを、outqueueにはパスワードから計算した結果をやり取りするために使われています。

しかし、それらを保護するためのthreading.Conditionは2つ…ではなく、1つのself.cv だけです。つまり、実際にはself.inqueueしかいじっていなくても、self.outqueueのための保護も同時に行っていることになります。…なんだか、無駄な感じがしませんか?それぞれのために2つthreading.Conditionを作って管理できないんでしょうか?

しかしながら、これは言うのは簡単なんですが、実際にやるのは難しそうです。まず、self.inqueueとself.outqueueは実は完全に独立しているわけではありません。_scatterや_gatherでいじられるself.slicesを介して微妙に両者は繋がっています。なので、実はCondition変数を2つそのまま使うだけでは正しくモニターを分割できません。self.slices用の排他制御も作りこめば分割できます。が、そういう事をするとデッドロックが起きがちなので、正直避けたい。

Pythonはシングルスレッドでしか動かないので、それぞれのQueueのためにthreading.Conditionを2つにわけられたとしても、Javaの時と違って2スレッドが並列で動くようになったりはしません。が、同じConditionを取り合うことはなくなるので、性能の向上が見込まれる…かもしれません。ロックは一般にスケーラブルではない(待機するスレッドの数が増えれば性能が悪化する)からです。

今回はプロファイリングの結果から言ってあまり実益は無さそうなのですが、もしConditionが2つに分割できればself.cv.wait() が無駄に起こされる回数が減るのは間違いない、ということはコメントさせてください。上のself.cv.wait() をしているコードを丹念に眺めてもらうと分かるのですが、無駄に起こされる事に備えて、特定の条件が満たされているかチェックして、条件が満たされてなかったら「やっぱ駄目じゃん!」と叫びながらもう一度眠りにつくようになっています(これ自体は一般的な使い方です)。さらに余談ながら、この「無駄に起こされるスレッド」が存在するので、self.cv.notify() ではなく、self.cv.notifyAll() を使わねばなりません。もしself.cv.notify()を使ってしまったら、運悪く無関係なスレッドだけが起きて、もう一度self.cv.wait() で眠りにつき、その後は何も起こらなくなり、永遠に応答しなくなってしまう可能性があります。

待機するのは最大何スレッド?

ついでに、with self.cv:self.cv.wait()で待つことになるスレッドの数を概算しましょう。CPUは48スレッド、GPUは4スレッドと仮定します。

  • enqueue:たかだか1スレッド(Pythonのメインスレッド)
  • _gather:最大52スレッド(CPU48コア + GPU4コア)
  • _scatter:最大52スレッド(CPU48コア + GPU4コア)
  • dequeue:たかだか1スレッド(Pythonのメインスレッド)

各スレッドが同時に2つのメソッドで待機することはないので、全体でもself.cvを取り合うのは最大53スレッドと思って間違いないと思います。

ふーむ。ここまで観察したので、次はwith self.cv:self.cv.wait()で待つ時、なにが行われているのか観察しましょう。このthreading.Conditionのソースは、Python本体にあるのでそちらを参照していきます。

threading.Conditionの中を眺める

with self.cv:って具体的に何をしているの?

この↑文字列を書いた時、実際にはself.cv.__enter__という関数が呼ばれます(これはPythonでのお約束です)。で、実装はこんな感じです

    def __enter__(self):
        return self._lock.__enter__()

self._lockはRLock(再入可能ロック)で、この関数はさらにC言語で実装されておりまして

static PyMethodDef rlock_methods[] = {
    {"acquire",      (PyCFunction)(void(*)(void))rlock_acquire,
     METH_VARARGS | METH_KEYWORDS, rlock_acquire_doc},
//....
    {"__enter__",    (PyCFunction)(void(*)(void))rlock_acquire,
     METH_VARARGS | METH_KEYWORDS, rlock_acquire_doc},
// ...
};

__enter__はacquireと同じ関数で、rlock_acquireです。

その先の実装は結構複雑そうですので一端置いておきます。pthreadとか適当にwrapしてるだけかと思ったけどそんなことは無い。

self.cv.wait()は実際には何を待っているのか?

with self.cv:の中では必ずself.cv.wait()を呼ぶことで他のスレッドが処理するのを待っています。この処理は何をしているのかというと

    def wait(self, timeout=None):
        # ... 前処理 ...
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
                    gotit = waiter.acquire(True, timeout)
                else:
                    gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)
            if not gotit:
                try:
                    self._waiters.remove(waiter)
                except ValueError:
                    pass

新しくlockを作り、このロックに対して二度acquireをしています。このロックは再入可能ではないので、この2度めのacquireでスレッドは必ず中断されて、ほかのスレッドが代わりに起こされます。で、その起こされた別のスレッドがnotifyした時にロックがreleaseされて、二度目のロック以降の中断されたプログラムが実行される、という仕組みになっている、と。

notify/notifyAllするときにはロックに対するacquireは行われません。releaseだけ行います。前回ロックのacquireで時間を使い切っていることがわかりましたが、つまり、時間を食ってるのはこのwith self.cv:の中のacquireか、self.cv.wait()の中のacquireのどちらか、ということになります。

ロックに掛かる時間の内訳を見る

条件 lockのacquireをした回数(掛かった時間) with self.cv:した回数(掛かった時間) self.cv.wait()を呼び出した回数(掛かった時間)
1cpu/0gpu 1963回(0.514秒) 204回(0.002秒) 51回(69.438秒)
48cpu/0gpu 7355回(3.202秒) 1781回(1.988秒) 756回(66.133秒)
0cpu/4gpu 5689回(7.122秒) 5645回(6.685秒) 8回(0.762回)
48cpu/4gpu 5113回(44.287秒) 4676回(43.910秒) 309回(3.161 秒)

回数がいまいち釣り合わない気がするんですが(0cpu/4gpuのself.cv.wait()の呼び出しがたった「8回」って本当かいな??)、仕事をとりあえば取り合うようになるほどwith self.cv: を実行する回数が増え、実行時間も掛かるようになっていくことが読み取れます。で、このwith self.cv:は、新しく生成したロックに対して獲得(acquire)しようとするwait()の時と違って、1つのself.cv._lock を獲得しようとしています。

これらの観察結果から、次の仮定を立てます:「with self.cv: したときに起こるself.cv._lock の取り合いがボトルネックの1つである」。

ここ、なんとかならないんでしょうか。次回実験してみましょう。

余談ですが、CPUだけで実行している時、どちらもself.cv.waitに60秒以上掛かっていますが、これらは実際には概ねthread.Sleepの実行に費やされています(つまりスレッドがやることなくて完全に遊んでいる)。

今月のまとめ

  • Pyritのパスワード候補と結果のリスト(inqueue/outqueue)は、threading.Condition(モニター)を使って排他制御されている
  • モニターは複数のロックを使って制御されている:
    • モニター自体のロック(cv._lock)
    • 待機しているスレッドを起こすためのロック(cv.wait()の中で生成されるwaiterという名前のロック)
  • プロファイリングの結果、モニター自体のロックを獲得しようとするスレッドが増えた結果として時間を使い潰していそう
    • このモニターでの排他制御を改善したら全体の性能も改善してくれないかなぁ

明日から東京では花粉が飛散しはじめるそうです。花粉症の方も、まだそうでない方も、みなさまお気をつけて!

投稿者: 平藤 燎

実在する架空のプログラマー