Python でスレッド

Python のスレッドについてちょっと調べてみた. Javaだとselectがない(new IOを使うとそれっぽくは書けるが, すごく面倒) ので, スレッドがないと通信系のプログラムが書けないけど, Pythonにはちゃんとselectがあるので, これまであまり使ってこなかった. でも, やっぱりちゃんと勉強しておこうということで.

Pythonにはスレッド用のモジュールとして, thread と threading の二つがある. 前者は非常に基本的なスレッドとロックオブジェクトを提供していて, 後者は前者を用いて構築された, より抽象度の高いスレッドライブラリ, という位置づけらしい. プログラマは基本的に後者だけ見てればいいのだろう. threadingが提供するオブジェクトは, 次の7つ.

  • Thread:スレッド
  • Lock:排他ロック
  • RLock: リエントラントな排他ロック
  • Condition: 状態変数
  • Semaphore: セマフォ
  • Event: 簡単なスレッド間同期機構
  • Timer: タイマー

Thread

スレッドそのもの. 適当な関数を指定してスレッドの中で実行させる. Javaと違って, Runnable クラスみたいなものを間に挟まなくてすむ.

def hello(arg):
    print arg

threading.Thread(target=hello, args=("world",)).start()

LockとRLock

LockとRLockは両方とも排他ロックなんだけど, 後者は同一スレッドからは何度でもロックできる.

>>> import threading
>>> l = threading.Lock()
>>> l.acquire(0)
True
>>> l.acquire(0)
False
>>> r = threading.RLock()
>>> r.acquire(0)
True
>>> r.acquire(0)
1

Lockだと2回目のacquireが失敗しているが, RLockでは大丈夫.
なぜかRLockに対するacquireの返り値が, 1回目はTrueで2回目以降はFalseになっているが. . . なんで?

実際, リエントラントでないロックなんて使いにくくてしょうがないので, Lockなんて使う機会はないだろう.

Condition

いわゆる状態変数. Javaではすべてのオブジェクトにデフォルトでこれがくっついているようなものなので, Javaプログラマに取っては非常に使いやすい. これだけあれば困らないだろう.

Semaphore

かのダイクストラ先生によって提案された同期機構. ちょっと賢い排他ロックで, 同時に排他領域に入れるスレッドの数を任意に設定できる. 教科書には必ず出てくるけど, 使い道がいまいち分からない. Javaには直接これに相当する機能はないけど, 欲しいと思ったことは無い.

>>> s = threading.Semaphore(3)
>>> s.acquire(False)
True
>>> s.acquire(False)
True
>>> s.acquire(False)
True
>>> s.acquire(False)
False

3回はacquireできるけど, 4回目で失敗する, と.

Event

非常に単純なスレッド間同期機構. あるスレッドがset()するのを別のスレッドでwait()することができる. 状態変数があれば簡単に実装できるが, シンプルでいいかもしれない.

Timer

一定時間後に特定の関数を一度だけ実行してくれる. Javajava.util.Timer と違って, 繰り返し実行はサポートしていないし, 素直にThreadと1対1対応で実装されているので, あまり使うメリットはないかも.

with statementとスレッディング

プリミティブなスレッド機構を使った場合によくあるバグは, aquireした排他オブジェクトのreleaseし忘れ. Javaの場合, synchronizedブロックで確保した排他オブジェクトは, ブロックを抜けるときに自動的に解放されるのでまったく気にしなくて済むのだが.

そこで, Python 2.6で導入されるのが with statement. これを使うと, aquireとrelease を明示的に書かなくて済むようになるし, 自動的に解放もしてくれる.

cv.acquire()
...
...
cv.release()

と書く代わりに,

with cv:
  ...

とだけ書けばよい. 例外やreturnで抜けてしまう場合にも必ず解放してくれる.

排他キューを書いてみる.

よくあるサンプルとして排他キューを書いてみよう. putで書き込み, getで読み出す. getしたときに, キューに何も入っていないとブロックする.

class queue:
    def __init__(self):
        self.cv = threading.Condition()
        self.queue = []

    def put(self, item):
        with self.cv:
            self.queue.append(item)
            self.cv.notifyAll()

    def get(self):
        with self.cv:
            while not len(self.queue) > 0:
                self.cv.wait()
            return self.queue.pop(0)

Conditionをwith文で使っている.

比較のために, Javaでも書いてみた.

class Queue<T> {
  LinkedList<T> list = new LinkedList<T>();

  synchronized void put(T v) {
    list.addLast(v);
    notifyAll();
  }

  synchronized T get() {		
    while (! (list.size() > 0)) {
      try {
        wait();
      } catch (InterruptedException e) {}
    }
    return list.removeFirst();
  }
}

こうして見ると, wait, notifyAllなどのメソッド名も含めて, ほとんど同じであることがわかる.

まとめ

threadingパッケージにはいろいろとクラスがあって惑わされがちだが, Javaになれている人は Condition だけ使っていれば, ほとんどJavaと同じように書ける.

問題は, 各ライブラリクラスのスレッドセーフティだ. なにしろ print 文すらちょっとあやしいくらいで, あるスレッドで

print a, b

と書いたときに, a と bの間に, 別のスレッドの出力が割り込んだりすることもあった. print って普通の関数と違って, 引数を全部評価してから呼ばれる訳ですらないようなので, 引き合いに出してもしょうがないのかもしれないけど, 予想外の動作だ.

マニュアルをみても, スレッドセーフティに関する記述があまり無いような. 当面, セーフでないことを前提に書くしかないか.

2008/1/8 追記

スレッドの実行を開始するメソッドを間違ってrun()としていましたが,正しくはstart()でした.修正しました.