Tensorflow: Threading and Queues 和訳

QueueはTensorFlowを利用した非同期処理のための強力なメカニズムである。

TensorFlowのその他の機能と同じように、queueはTensorFlowのグラフのノードである。 これは変数のように状態をもつノードであり、他のノードがその状態を変更することができる。 特に、他のノードがキューにエンキューしたり、キューからデキューすることができる。

queueについて理解するために、簡単な例について考えてみる。 "first in, first out"なqueueを作り、そのキューを0で満たすとする。 そして、queueから1つ取り出し、取り出したものに1を加え、またqueueに追加する、という計算グラフを考える。 これは、queueの要素を少しずつ増やしていく。

https://www.tensorflow.org/versions/r1.1/images/IncremeterFifoQueue.gif

Enqueue, Enqueue, Dequeueは特別なノードである。 これらは普通の値を受け取らず、変更したいqueueへのポインタを受け取る。 これらはqueueのメソッドのようなものであると考えると良い。 実際、Python APIではqueueオブジェクトのメソッドになっている。

注意 (q.enqueue(...)のような)queueのメソッドはqueueと同じデバイス上で実行しなければならない。

Queue usage overview

tf.FIFOQueueやtf.RandomShuffleQueueのようなqueueは、計算グラフ中で非同期にtensorを計算するのに重要なTensorFlowのオブジェクトである。

例えば、典型的な入力アーキテクチャは、RandomShuffleQueueをモデルを学習する入力を準備するのに利用する

  • 複数のスレッドが学習データを準備し、それをqueueに追加する
  • 学習を実行しているスレッドがqueueからミニバッチを取得する

このようなアーキテクチャは多くの利点がある。 詳細は Reading data  |  TensorFlow を参照のこと。

TensorFlowのSessionオブジェクトはマルチスレッドなので、複数のスレッドで同一のsessionを利用したり、処理を並列に実行したりすることが用意にできる。 しかし、上記のようなスレッドを実行するプログラムをpythonで実装するのは必ずしも簡単ではない。 すべてのスレッドは一緒に停止することができなければならないし、例外がキャッチできなければならない、そして、queueは停止する際に適切にcloseされなければならない。

TensorFlowはこれらを手助けするためにtf.train.Coordinatorとtf.train.QueueRunnerというクラスを提供している。 これらの2つのクラスは一緒に使うようにデザインされている。 Coordinatorクラスは複数のスレッドを一緒に停止したり、スレッドが停止するのを待っているプログラムに例外を渡すのを手助けする。 QueueRunnerクラスは1つのqueueにenqueueする複数のスレッドを作成するのに利用する。

Coordinator

Coordinatorクラスは複数のスレッドを一緒に停止させるのを手助けする。

主要なメソッドは下記

  • tf.train.Coordinator.should_stop: スレッドが停止すべきなときTrueを返す
  • tf.train.Coordinator.request_stop: スレッドが停止すべきであることを伝える
  • tf.train.Coordinator.join: 指定したスレッドたちが停止するまで待つ

まずCoordinatorオブジェクトを作成し、coordinatorを利用するスレッドを複数作成する。 これらのスレッドは通常、should_stop()がtrueを返したら停止するようなループを実行する。

どのスレッドも計算を停止すべきかどうかを決めることができる。 request_stop()を呼び出すだけでよく、他のスレッドはshould_stop()がTrueを返すようになるので停止する

# Thread body: loop until the coordinator indicates a stop was requested.
# If some condition becomes true, ask the coordinator to stop.
def MyLoop(coord):
  while not coord.should_stop():
    ...do something...
    if ...some condition...:
      coord.request_stop()

# Main thread: create a coordinator.
coord = tf.train.Coordinator()

# Create 10 threads that run 'MyLoop()'
threads = [threading.Thread(target=MyLoop, args=(coord,)) for i in xrange(10)]

# Start the threads and wait for all of them to stop.
for t in threads:
  t.start()
coord.join(threads)

明らかに、coordinatorは全く異なる処理を行うスレッドたちを管理することができる。 上記の例のように、すべてのスレッドが同じことをする必要はない。 coordinatorは例外の捕捉とレポートも可能。 詳細は tf.train.Coordinator  |  TensorFlow を参照のこと。

QueueRunner

QueueRunnerクラスはenqueueオペレーションを繰り返し実行する複数のスレッドを生成する。 これらのスレッドは停止するのにcoordinatorを利用できる。 さらに、queue runnerはcloser threadを実行できる。 これは、coordinatorに例外がレポートされるとqueueを自動的にcloseする。

上記のようなアーキテクチャを実装するのにqueue runnerが利用できる。

まず、(tf.RandomShuffleQueueなどの)TensorFlowのqueueを例えばinputに使うような計算グラフを作成する。 入力データを処理しqueueにenqueueするようなオペレーションを追加する。 queueからdequeueし学習を実行するようなオペレーションを追加する。

example = ...ops to create one example...
# Create a queue, and an op that enqueues examples one at a time in the queue.
queue = tf.RandomShuffleQueue(...)
enqueue_op = queue.enqueue(example)
# Create a training graph that starts by dequeuing a batch of examples.
inputs = queue.dequeue_many(batch_size)
train_op = ...use 'inputs' to build the training part of the graph...

Pythonのtraining programでは、いくつかのexampleをenqueueするスレッドを作成するQueueRunnerを作成している。 Coordinatorを作りqueue runnerにcoordinatorを利用してスレッドを開始するよう依頼している。

# Create a queue runner that will run 4 threads in parallel to enqueue
# examples.
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)

# Launch the graph.
sess = tf.Session()
# Create a coordinator, launch the queue runner threads.
coord = tf.train.Coordinator()
enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
# Run the training loop, controlling termination with the coordinator.
for step in xrange(1000000):
    if coord.should_stop():
        break
    sess.run(train_op)
# When done, ask the threads to stop.
coord.request_stop()
# And wait for them to actually do it.
coord.join(enqueue_threads)

Handling exceptions

queue runnerによって開始されたスレッドは単にenqueueオペレーションを実行するだけではない。 これらのスレッドは、queueによって投げられた例外のキャッチとハンドリングができる。 tf.errors.OutOfRangeErrorの例外は、queueがcloseしたことを通知するのに利用される。

coordinatorを利用するtraining programはメインのループ中で例外のキャッチとレポートをする必要がある。

上記の例を改善した例が下記。

try:
    for step in xrange(1000000):
        if coord.should_stop():
            break
        sess.run(train_op)
except Exception, e:
    # Report exceptions to the coordinator.
    coord.request_stop(e)
finally:
    # Terminate as usual. It is safe to call `coord.request_stop()` twice.
    coord.request_stop()
    coord.join(threads)

rnnlmを作ってみる その2 誤差について

入力された単語に対して何が出力されるのか、ということは前回やりました。

今回は、rnnlmの学習でどのような誤差を評価するかについてです。

早速ですが、tensorflowのチュートリアル Recurrent Neural Networks  |  TensorFlow では下記のように書かれています

We want to minimize the average negative log probability of the target words:

{ \displaystyle   loss = - \frac{1}{N} \sum^{N}_{i = 1} \ln p_{target_i} }

N は文(書)中の単語の数、 { \displaystyle  p_{target_i} } は i番目の出力( これは語彙がM個だったらM個の単語それぞれが次に出現する確率となっている) のうちの正解の単語に対応する要素となります

学習データが、"今日 は 良い 天気 ですね" で、学習が "今日 は 良い" まで進んだとき、rnnlmの出力としては各単語の出現確率になるので、語彙が [今日, 昨日, は, 良い, 天気, ですね, カレーライス] だけであれば、これと同じ長さのベクトル(的なもの)が出力されるはずです。 これが [0.1, 0.1, 0.1, 0.1, 0.4, 0.1, 0.1] だとします。 学習データで "今日 は 良い" のあとに続くのは"天気" なので { \displaystyle  p_{target_i} } は この例だと0.4 となる、みたいな感じです。

チュートリアル本文ではこの計算は sequence_loss_by_example でできる、と書いてますがdeprecatedだったので少し前に tf.contrib.seq2seq.sequence_loss を利用するように書き換えられたようです。

チュートリアルのサンプルコードでこの誤差を計算しているのが下記です。

models/ptb_word_lm.py at master · tensorflow/models · GitHub

    # Use the contrib sequence loss and average over the batches
    loss = tf.contrib.seq2seq.sequence_loss(
        logits,
        input_.targets,
        tf.ones([self.batch_size, self.num_steps], dtype=data_type()),
        average_across_timesteps=False,
        average_across_batch=True)

これで、理論的なところについてはだいたい準備できた感じだと思います。

次回から実装を進めていきます。

rnnlmを作ってみる その1 定義とか

深層学習による自然言語処理 の本によれば、

"言語モデル(Language model: LM)あるいは確率的言語モデル (probabilistic language model)とは、人間が扱う自然言語で書かれた文や文書が生成される確率をモデル化したものです"

となっています。

例えば、"今日はいい天気ですね"という文はよくある表現ですが、"今日はいい確率モデルですね"とは文脈によっては言うことは無いとは言えませんが最初の例よりは生成されにくいでしょうし、"カレーライスがいい確率モデルですね"などはわけがわからないですね。

このように、"文の自然さ"とか"文がよくありそうか"みたいなものはそれぞれ異なるのでそういったものを"文や文書が生成される確率"として表すのが言語モデルなんだと思います。

この場合、 P(今日はいい天気ですね) > P(今日はいい確率モデルですね) > P(レーライスがいい確率モデルですね) みたいな感じになるのだと思います。

{ \displaystyle {\bf y_t} } を単語とし、 { \displaystyle {\bf Y = \left( y_0, y_1, y_2, ... , y_T, y_{T+1} \right) } } が文を表すものとします。

{ \displaystyle {\bf y_t} } はone-hot ベクトルで表されているとします。

また、{ \displaystyle {\bf y_0} } { \displaystyle {\bf y_{T+1}} } はそれぞれ文頭と文末を表す擬似的な単語(BOSとEOS)とします。

再帰ニューラルネットワークを利用した言語モデル(recurrent neural network language model; rnnlm)は下記の様に表せます。

{ \displaystyle P_{rnnlm} ({\bf Y}) = \prod_{t=1}^{T+1} P(  {\bf y}_t | {\bf Y}_{\lbrack 0, t-1 \rbrack }) }

ここで { \displaystyle {\bf Y}_{\lbrack 0, t-1 \rbrack }   }{ \displaystyle {\bf Y = \left( y_0, y_1, y_2, ... , y_{t-1} \right) } } を表すものとします。

{ \displaystyle  P(  {\bf y}_t | {\bf Y}_{\lbrack 0, t-1 \rbrack }) } の計算は下記で行うとのことです。 (1層、活性化関数はtanhとした場合)

埋め込みベクトルの取得

{ \displaystyle \overline{{\bf y}}_t = {\bf E y}_{t-1} }

隠れ層の計算

{ \displaystyle {\bf h}_t = \tanh \left( {\bf W}^{(l)}  \begin{bmatrix}   \overline{{\bf y}}_t  \\  {\bf h}_{t-1}  \end{bmatrix}  + {\bf b}^{(l)}  \right)       }

出力層の計算

{ \displaystyle {\bf o}_t = {\bf W}^{(o)} {\bf h}_t + {\bf b}^{(o)}   }

確率化

{ \displaystyle {\bf p}_t = softmax ({\bf o}_t) }

確率の抽出

{ \displaystyle P(  {\bf y}_t | {\bf Y}_{\lbrack 0, t-1 \rbrack }) = {\bf p}_t \cdot {\bf y}_t  }

単語を入力するたびに、 { \displaystyle P(  {\bf y}_t | {\bf Y}_{\lbrack 0, t-1 \rbrack })   } を計算していき、その結果をすべてかけ合わせたものが { \displaystyle P_{rnnlm} ({\bf Y}) } となります

この出力からなんらかの誤差関数を評価してそれを最小化するようにEとかWを調整していくことになるのですが、それは次回書きます。

引っ越しました

はてなダイアリーからはてなブログへ引っ越しました

juman++を利用して分かち書き

ひとまずの目標がrnnlmを作ってみることなので、その準備として日本語の文書を渡してそれを形態素ごとに区切って出力するやつを作っておきたいです。

環境を汚したくないのでdocker内にjuman++をインストールします

FROM ubuntu:17.10

RUN apt-get update
RUN apt-get install -y git wget build-essential libboost-all-dev

RUN wget http://lotus.kuee.kyoto-u.ac.jp/nl-resource/jumanpp/jumanpp-1.02.tar.xz
RUN tar Jxf jumanpp-1.02.tar.xz
RUN cd jumanpp-1.02 && ./configure && make && make install

これは

docker build . -t jumanpp

みたいな感じでイメージを作成できます

イメージを作成したら

docker run -d -it --name jumanpp-container jumanpp

みたいな感じでコンテナを起動しておきます

すると

$ echo "吾輩は猫である。" | docker exec -i jumanpp-container jumanpp

吾輩 わがはい 吾輩 名詞 6 普通名詞 1 * 0 * 0 "代表表記:我が輩/わがはい カテゴリ:人"
は は は 助詞 9 副助詞 2 * 0 * 0 NIL
猫 ねこ 猫 名詞 6 普通名詞 1 * 0 * 0 "代表表記:猫/ねこ 漢字読み:訓 カテゴリ:動物"
である である だ 判定詞 4 * 0 判定詞 25 デアル列基本形 15 NIL
。 。 。 特殊 1 句点 1 * 0 * 0 NIL
EOS

みたいな感じで実行できます。


やりたいのは分かち書きなので、適当にシェルを書きます

#!/bin/bash

for line in $(cat -)
do
    echo "$line" | docker exec -i jumanpp-container jumanpp | while read output
    do
      if [ "${output}" != "EOS" ]; then
          word=$(echo ${output} | cut -d " " -f 1)
          if [ "${word}" != "@" ]; then
              result="${result} ${word}"
          fi
      else
          echo $result
      fi
    done
done

これをwakachi.shとか名前をつけて保存しておきます。

吾輩は猫である。
名前はまだ無い。
どこで生まれたかとんと見当がつかぬ。

みたいなファイルをテストのために用意しました

$ cat test.txt | ./wakachi.sh 
吾輩 は 猫 である 。
名前 は まだ 無い 。
どこ で 生まれた か とんと 見当 が つか ぬ 。

こんな感じで出力できました。

batch, epochについて学ぶ

1回のパラメータ更新のために利用するデータの数をbatch_size,
データを何周するかをepochと呼ぶみたいです

前回は簡単のためbatch_size = 1, epochs = 1 としましたが、現実的には両方1というのはあまりないと思うので前回のプログラムを修正してbatch_sizeとepochsに対応できるようにします。

dynamic_rnnのinputは [batch_size, max_len, input_size]なshapeなので、batch_size = 5, max_len = 3, input_size = 1なら

xs = [ [[1], [2], [3]],
       [[4], [5], [6]],
       [[7], [8], [9]],
       [[0], [0], [0]],
       [[1], [1], [1]] ] 

な感じで渡してやる必要があります

また、outputは [batch_size, max_len, cell.output_size] なshapeなので今回は上記xsと同じような形になります

output = [ [[1], [4], [7]],
           [[2], [5], [8]],
           [[3], [6], [9]],
           [[9], [9], [9]],
           [[8], [8], [8]] ] 

このとき、[ [1], [4], [7] ] は xsの [ [1], [2], [3] ] に対応していて、xsの1を入力したときの出力が1, 2を入力したときの出力が4, 3を入力したときの出力が7 となっているっぽいです。

今回の例では最後の入力を与えたときの出力がほしいので、outputの右端の列をもらうために、output[:, -1, 0]としています

まとめると、下記な感じになります。

import random
import tensorflow as tf
sess = tf.Session()

def generate_train_data(n):
    train_x = []
    train_y = []
    for i in range(n):
        xs = [random.random() * 10 for j in range(max_len)]
        ys = sum(xs)
        train_x.append(xs)
        train_y.append(ys)
    return train_x, train_y

size = 1
rnn_cell = tf.nn.rnn_cell.BasicRNNCell(num_units=size, activation=tf.nn.leaky_relu)

n_batch = 5
epochs = 4
max_len = 3
data_size = 1000

x = tf.placeholder(tf.float32, shape=[n_batch, max_len, size])

output, state = tf.nn.dynamic_rnn(rnn_cell, x, dtype=tf.float32)

y = tf.placeholder(tf.float32, shape=[n_batch])

loss = tf.reduce_mean(tf.square(y - output[:, -1, 0]))

optimizer = tf.train.GradientDescentOptimizer(0.0001)
train_step = optimizer.minimize(loss)

init = tf.global_variables_initializer()
sess.run(init)
losses = []
n_batches = int(data_size/n_batch)
x_train, y_train = generate_train_data(data_size)
for epoch in range(epochs):
    for i in range(n_batches):
        min_ix = i * n_batch
        max_ix = (i+1) * n_batch
        xs = x_train[min_ix:max_ix]
        # shape [batch_size, max_len, input_size]
        # xs = [ [[1], [2], [3]],
        #        [[4], [5], [6]],
        #        [[7], [8], [9]],
        #        [[0], [0], [0]],
        #        [[1], [1], [1]] ] なイメージ
        xs = [ [[xxx] for xxx in xx] for xx in xs]
        ys = y_train[min_ix:max_ix]
        sess.run(train_step, feed_dict={x: xs, y: ys})
        if i % 100 == 0:
            losses.append(sess.run(loss, feed_dict={x: xs, y: ys}))

print('vars : ', sess.run(rnn_cell.variables[0])) # RNNのウェイトを表示してみる
print('losses : ', losses)


結果は

vars :  [[ 1.0746733 ]
 [ 0.92348057]]
losses :  [138.32841, 0.81912673, 0.57693189, 0.40475434, 0.29114446, 0.1994734, 0.14868297, 0.098647617]

な感じでした

dynamic_rnn を使ってみる

dynamic_rnn は前回のinference相当の事をやってくれるらしいです。

In [47]: def inference(x, max_len):
    ...:     state = initial_state
    ...:     for i in range(max_len):
    ...:         (output, state) = rnn_cell(x[i], state)
    ...:     return tf.squeeze(output)

あと、前回reluを使いましたが変数の値がマイナスになった途端にoutputが0になっちゃうのでleaky reluとかを使ったほうが良さそうでした。

その辺を踏まえて、前回の足し合わせをdynamic_rnnを使って書き直してみました。

import random
import tensorflow as tf
sess = tf.Session()

size = 1
rnn_cell = tf.nn.rnn_cell.BasicRNNCell(num_units=size, activation=tf.nn.leaky_relu)

n_batch = 1
max_len = 3

x = tf.placeholder(tf.float32, shape=[n_batch, max_len, size])

output, state = tf.nn.dynamic_rnn(rnn_cell, x, dtype=tf.float32)

y = tf.placeholder(tf.float32, shape=[])

loss = tf.reduce_mean(tf.square(y - output[-1, -1, -1]))

optimizer = tf.train.GradientDescentOptimizer(0.0001)
train_step = optimizer.minimize(loss)

init = tf.global_variables_initializer()
sess.run(init)
losses = []
for i in range(3000):
    xs = [random.random() * 10 for j in range(max_len)]
    yy = sum(xs)
    # TODO batch_size, max_timeに対応する
    xs = [ [ [xx] for xx in xs] ] # shape 1, 3, 1
    sess.run(train_step, feed_dict={x: xs, y: yy})
    if i % 100 == 0:
        losses.append(sess.run(loss, feed_dict={x: xs, y: yy}))

print('vars : ', sess.run(rnn_cell.variables[0])) # RNNのウェイトを表示してみる
print('losses : ', losses)

動かしてみた結果がこちらです

vars :  [[ 0.94803673]
 [ 1.00511789]]
losses :  [189.65527, 120.48457, 402.87241, 0.10081632, 0.23049662, 0.54468751, 0.041665234, 0.57337528, 0.0074442979, 0.010118874, 0.097668171, 0.047499236, 0.22022037, 0.11357728, 0.14063072, 0.081412017, 0.15039197, 0.30661905, 0.010041318, 0.30713466, 0.2246805, 0.12985943, 0.013962295, 0.0031880976, 0.0033751372, 0.35504347, 0.043138403, 0.034578215, 0.17706399, 0.043236308]


一応想定した通り、variablesは(1, 1)あたりになってlossもなんとなく減っているみたいです