Tensorflow: Threading and Queues 和訳
QueueはTensorFlowを利用した非同期処理のための強力なメカニズムである。
TensorFlowのその他の機能と同じように、queueはTensorFlowのグラフのノードである。 これは変数のように状態をもつノードであり、他のノードがその状態を変更することができる。 特に、他のノードがキューにエンキューしたり、キューからデキューすることができる。
queueについて理解するために、簡単な例について考えてみる。 "first in, first out"なqueueを作り、そのキューを0で満たすとする。 そして、queueから1つ取り出し、取り出したものに1を加え、またqueueに追加する、という計算グラフを考える。 これは、queueの要素を少しずつ増やしていく。
Enqueue, Enqueue, Dequeueは特別なノードである。 これらは普通の値を受け取らず、変更したいqueueへのポインタを受け取る。 これらはqueueのメソッドのようなものであると考えると良い。 実際、Python APIではqueueオブジェクトのメソッドになっている。
注意 (q.enqueue(...)のような)queueのメソッドはqueueと同じデバイス上で実行しなければならない。
Queue usage overview
tf.FIFOQueueやtf.RandomShuffleQueueのようなqueueは、計算グラフ中で非同期にtensorを計算するのに重要なTensorFlowのオブジェクトである。
例えば、典型的な入力アーキテクチャは、RandomShuffleQueueをモデルを学習する入力を準備するのに利用する
- 複数のスレッドが学習データを準備し、それをqueueに追加する
- 学習を実行しているスレッドがqueueからミニバッチを取得する
このようなアーキテクチャは多くの利点がある。 詳細は Reading data | TensorFlow を参照のこと。
TensorFlowのSessionオブジェクトはマルチスレッドなので、複数のスレッドで同一のsessionを利用したり、処理を並列に実行したりすることが用意にできる。 しかし、上記のようなスレッドを実行するプログラムをpythonで実装するのは必ずしも簡単ではない。 すべてのスレッドは一緒に停止することができなければならないし、例外がキャッチできなければならない、そして、queueは停止する際に適切にcloseされなければならない。
TensorFlowはこれらを手助けするためにtf.train.Coordinatorとtf.train.QueueRunnerというクラスを提供している。 これらの2つのクラスは一緒に使うようにデザインされている。 Coordinatorクラスは複数のスレッドを一緒に停止したり、スレッドが停止するのを待っているプログラムに例外を渡すのを手助けする。 QueueRunnerクラスは1つのqueueにenqueueする複数のスレッドを作成するのに利用する。
Coordinator
Coordinatorクラスは複数のスレッドを一緒に停止させるのを手助けする。
主要なメソッドは下記
- tf.train.Coordinator.should_stop: スレッドが停止すべきなときTrueを返す
- tf.train.Coordinator.request_stop: スレッドが停止すべきであることを伝える
- tf.train.Coordinator.join: 指定したスレッドたちが停止するまで待つ
まずCoordinatorオブジェクトを作成し、coordinatorを利用するスレッドを複数作成する。 これらのスレッドは通常、should_stop()がtrueを返したら停止するようなループを実行する。
どのスレッドも計算を停止すべきかどうかを決めることができる。 request_stop()を呼び出すだけでよく、他のスレッドはshould_stop()がTrueを返すようになるので停止する
# Thread body: loop until the coordinator indicates a stop was requested. # If some condition becomes true, ask the coordinator to stop. def MyLoop(coord): while not coord.should_stop(): ...do something... if ...some condition...: coord.request_stop() # Main thread: create a coordinator. coord = tf.train.Coordinator() # Create 10 threads that run 'MyLoop()' threads = [threading.Thread(target=MyLoop, args=(coord,)) for i in xrange(10)] # Start the threads and wait for all of them to stop. for t in threads: t.start() coord.join(threads)
明らかに、coordinatorは全く異なる処理を行うスレッドたちを管理することができる。 上記の例のように、すべてのスレッドが同じことをする必要はない。 coordinatorは例外の捕捉とレポートも可能。 詳細は tf.train.Coordinator | TensorFlow を参照のこと。
QueueRunner
QueueRunnerクラスはenqueueオペレーションを繰り返し実行する複数のスレッドを生成する。 これらのスレッドは停止するのにcoordinatorを利用できる。 さらに、queue runnerはcloser threadを実行できる。 これは、coordinatorに例外がレポートされるとqueueを自動的にcloseする。
上記のようなアーキテクチャを実装するのにqueue runnerが利用できる。
まず、(tf.RandomShuffleQueueなどの)TensorFlowのqueueを例えばinputに使うような計算グラフを作成する。 入力データを処理しqueueにenqueueするようなオペレーションを追加する。 queueからdequeueし学習を実行するようなオペレーションを追加する。
example = ...ops to create one example... # Create a queue, and an op that enqueues examples one at a time in the queue. queue = tf.RandomShuffleQueue(...) enqueue_op = queue.enqueue(example) # Create a training graph that starts by dequeuing a batch of examples. inputs = queue.dequeue_many(batch_size) train_op = ...use 'inputs' to build the training part of the graph...
Pythonのtraining programでは、いくつかのexampleをenqueueするスレッドを作成するQueueRunnerを作成している。 Coordinatorを作りqueue runnerにcoordinatorを利用してスレッドを開始するよう依頼している。
# Create a queue runner that will run 4 threads in parallel to enqueue # examples. qr = tf.train.QueueRunner(queue, [enqueue_op] * 4) # Launch the graph. sess = tf.Session() # Create a coordinator, launch the queue runner threads. coord = tf.train.Coordinator() enqueue_threads = qr.create_threads(sess, coord=coord, start=True) # Run the training loop, controlling termination with the coordinator. for step in xrange(1000000): if coord.should_stop(): break sess.run(train_op) # When done, ask the threads to stop. coord.request_stop() # And wait for them to actually do it. coord.join(enqueue_threads)
Handling exceptions
queue runnerによって開始されたスレッドは単にenqueueオペレーションを実行するだけではない。 これらのスレッドは、queueによって投げられた例外のキャッチとハンドリングができる。 tf.errors.OutOfRangeErrorの例外は、queueがcloseしたことを通知するのに利用される。
coordinatorを利用するtraining programはメインのループ中で例外のキャッチとレポートをする必要がある。
上記の例を改善した例が下記。
try: for step in xrange(1000000): if coord.should_stop(): break sess.run(train_op) except Exception, e: # Report exceptions to the coordinator. coord.request_stop(e) finally: # Terminate as usual. It is safe to call `coord.request_stop()` twice. coord.request_stop() coord.join(threads)