Deep Learning環境はKeras + Docker + Jupyter Notebook + GPUがいいカンジ

はじめに

  • ポチポチKeras動かすのにどのような環境がいいのか考えてみました
  • Keras + Docker + Jupyter Notebook + GPUの環境構築作業ログを紹介します

Keras

GitHub - fchollet/keras: Deep Learning library for Python. Convnets, recurrent neural networks, and more. Runs on Theano or TensorFlow.

  • わかりやすいインターフェースがかなり好き

Docker

  • TensorFlowで学ぶディープラーニング入門~畳み込みニューラルネットワーク徹底解説 を参考にしました
    • この本ではDockerを使用してます
    • 当初はvirtualenv使用して環境作る予定だったので、勉強になりました
  • 環境の移植性いいね
  • GPU使用できるのいいね

Jupyter Notebook

  • 言わずもがな

GPU

  • 速いは正義

所感

  • GCPのDatalabでBigQuery, GCS, 分散TensorFlow使用できるようになると最高
  • AWSでもボタンポチで環境構築できるようになって欲しい

Keras + Docker + Jupyter Notebook + GPUの環境構築

前提

GPUでTensorFlowを動かす - もょもとの技術ノート

  • GPU環境 + NVIDIAまわりのもろもろがインストールされていること
  • GPUでTensorFlowが稼働すること

NVIDIAドライバ入れ直す

上記ではNVIDIAドライバを手動でインストールしましたが、バージョンの相性もあり(またこれか)、apt-getでインストールし直しました。ところがNVIDIAドライバをインストールし直すとOSが起動しなくなったので、結局OSから再インストールしました。

$ sudo add-apt-repository ppa:graphics-drivers/ppa 
$ sudo apt-cache search nvidia-\d+

nvidia-352 - Transitional package for nvidia-361
mate-sensors-applet-nvidia-dbg - Display readings from hardware sensors in your MATE panel (NVIDIA, dbg package)
nvidia-304 - NVIDIA legacy binary driver - version 304.132
nvidia-304-updates - Transitional package for nvidia-304
nvidia-340 - NVIDIA binary driver - version 340.98
nvidia-355 - NVIDIA binary driver - version 355.11
nvidia-358 - NVIDIA binary driver - version 358.16
nvidia-361 - NVIDIA binary driver - version 361.45.18
nvidia-364 - NVIDIA binary driver - version 364.19
nvidia-367 - NVIDIA binary driver - version 367.44
nvidia-370 - NVIDIA binary driver - version 370.28

$ sudo apt-get install nvidia-370

Docker

docs.docker.com

  • 公式サイト通りに実施
    • 全く躓かなかった
    • Ubuntu 16.04 (LTS)を使用

nvidia-docker

github.com

  • DockerからGPU操作できるようにこれも導入
$ wget -P /tmp https://github.com/NVIDIA/nvidia-docker/releases/download/v1.0.0-rc.3/nvidia-docker_1.0.0.rc.3-1_amd64.deb
$ sudo dpkg -i /tmp/nvidia-docker*.deb && rm /tmp/nvidia-docker*.deb

# Test nvidia-smi
$ sudo nvidia-docker run --rm nvidia/cuda nvidia-smi

注意

  • 今回はDockerグループ作成しなかったので、docker, nvidia-dockerコマンドのまえにすべてsudoつけました

ファイアウォール開放

$ sudo ufw enable
$ sudo ufw allow 8888
$ sudo ufw status
$ sudo ufw status
状態: アクティブ

To                         Action      From
--                         ------      ----
8888                       ALLOW       Anywhere                  
8888 (v6)                  ALLOW       Anywhere (v6)     

Docker起動

$ sudo nvidia-docker run -m 10g -it -p 8888:8888 gcr.io/tensorflow/tensorflow:0.10.0-gpu
  • 0.11はKerasとの相性が悪かったため、0.10を使用
  • out of memoryエラー出るので、 -m で多めにメモリ確保
    • 2GBあれば十分かな
$ sudo docker stats
CONTAINER           CPU %               MEM USAGE / LIMIT    MEM %               NET I/O               BLOCK I/O           PIDS
8661fc7f4cea        1.42%               1.345 GiB / 10 GiB   13.45%              11.97 MB / 599.6 kB   0 B / 26.95 MB      44

MNISTサンプルコード動かす

CPU使用時

  • 10分ちょっと f:id:moyomot:20161016221353p:plain

GPU使用時

  • 1分弱
  • いいかんじ! f:id:moyomot:20161016221101p:plain

お世話になったサイト

ubuntu14.04にnvidia-dockerをインストールする - Qiita

今回やってないこと

  • Dockerイメージの作成
    • Docker再起動するとKeras消えちゃうよ
  • データボリュームを接続
    • Docker再起動すると書いたコード消えちゃうよ
    • -vオプションつければOK

GPUでTensorFlowを動かす

はじめに

GTX1070を購入しました GPUでTensorFlowを動かすまでの作業ログです

f:id:moyomot:20161015222547j:plain f:id:moyomot:20161015222603j:plain

環境

ハードウェア(2年前に自作したマイサーバー)

今回購入

ソフトウェア

  • OS: Ubuntu Server 16.04.1 LTS

- NVIDIAドライバ: cuda-repo-ubuntu1504-7-5-local_7.5-18_amd64.deb → apt-getで入れる

  • cuDNN: cudnn-7.5-linux-x64-v5.1.tgz
  • TensorFlow: 0.11

注意

  • 2016/10/15時点でのバージョン構成
  • CUDAはapt-getで導入しました
  • CUDA8はTensorFlow0.11に対応してませんでした
  • 必要ソフトウェアのバージョンを合わせるのに試行錯誤が必要でした

お世話になったサイト

Ubuntu14.04 + GPU + TensorFlow 環境構築 - Qiita

GPGPUマシンの更新(2) 〜 CUDA 7.5 と cuDNN 5.0RC - まんぼう日記

GPU合体

f:id:moyomot:20161015222533j:plain

  • GTX1070はPCI Express 3.0がマザボに備わっているかを確認すればOK
    • CPUの場合、チップセットを気にしなければならないが、GPUは何を確認すればいいのか当初わからなかった
    • D-sub装備していないので、HDMIでテレビに繋いで作業しました

OSインストール

  • Ubuntu Server 16.04.1 LTS入れました
  • OS入れば、SSHで作業できる

GPUに必要なソフトウェア導入

とりあえず

sudo apt-get install gcc make

NVIDIAドライバ

- ダウンロード - NVIDIA DRIVERS Linux x64 (AMD64/EM64T) Display Driver - scpでサーバーへ sudo chmod 755 NVIDIA-Linux-x86_64-367.57.run sudo ./NVIDIA-Linux-x86_64-367.27.run

[追記]
- dockerで動かすにはapt-getで入れたほうがよい

$ sudo add-apt-repository ppa:graphics-drivers/ppa 
$ sudo apt-cache search nvidia-\d+

nvidia-352 - Transitional package for nvidia-361
mate-sensors-applet-nvidia-dbg - Display readings from hardware sensors in your MATE panel (NVIDIA, dbg package)
nvidia-304 - NVIDIA legacy binary driver - version 304.132
nvidia-304-updates - Transitional package for nvidia-304
nvidia-340 - NVIDIA binary driver - version 340.98
nvidia-355 - NVIDIA binary driver - version 355.11
nvidia-358 - NVIDIA binary driver - version 358.16
nvidia-361 - NVIDIA binary driver - version 361.45.18
nvidia-364 - NVIDIA binary driver - version 364.19
nvidia-367 - NVIDIA binary driver - version 367.44
nvidia-370 - NVIDIA binary driver - version 370.28

$ sudo apt-get install nvidia-370

そして、再起動

CUDA

sudo apt install nvidia-cuda-toolkit
  • NVIDIAサイトからダウンロードしたものはインストールできず断念

cuDNN

  • NVIDIAサイトでユーザー登録が必要
    • 登録後、すぐにダウンロード可能(2016/10/15時点)
    • アンケートに答える必要あり
  • cudnn-7.5-linux-x64-v5.1.tgz
tar xvzf cudnn-7.5-linux-x64-v5.0-ga.tgz
cd cuda 
sudo cp lib64/* /usr/lib/x86_64-linux-gnu
sudo cp include/cudnn.h /usr/include
sudo chmod a+r /usr/lib/x86_64-linux-gnu/libcudnn*

以上で環境構築は完了

TensorFlow動かしてみる

インストール

export TF_BINARY_URL=https://storage.googleapis.com/tensorflow/linux/gpu/tensorflow-0.11.0rc0-cp27-none-linux_x86_64.whl
sudo apt-get install python-pip python-dev
sudo pip install --upgrade $TF_BINARY_URL

サンプルコード動かす

  • 適当にgit cloneし、中に移動

GitHub - tensorflow/tensorflow: Computation using data flow graphs for scalable machine learning

Python 2.7.12 (default, Jul  1 2016, 15:12:24) 
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import tensorflow as tf
I tensorflow/stream_executor/dso_loader.cc:111] successfully opened CUDA library libcublas.so locally
I tensorflow/stream_executor/dso_loader.cc:111] successfully opened CUDA library libcudnn.so locally
I tensorflow/stream_executor/dso_loader.cc:111] successfully opened CUDA library libcufft.so locally
I tensorflow/stream_executor/dso_loader.cc:111] successfully opened CUDA library libcuda.so.1 locally
I tensorflow/stream_executor/dso_loader.cc:111] successfully opened CUDA library libcurand.so locally
>>> hello = tf.constant('Hello, TensorFlow!')
>>> sess = tf.Session()
I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:925] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
I tensorflow/core/common_runtime/gpu/gpu_device.cc:951] Found device 0 with properties: 
name: GeForce GTX 1070
major: 6 minor: 1 memoryClockRate (GHz) 1.835
pciBusID 0000:01:00.0
Total memory: 7.92GiB
Free memory: 7.84GiB
I tensorflow/core/common_runtime/gpu/gpu_device.cc:972] DMA: 0 
I tensorflow/core/common_runtime/gpu/gpu_device.cc:982] 0:   Y 
I tensorflow/core/common_runtime/gpu/gpu_device.cc:1041] Creating TensorFlow device (/gpu:0) -> (device: 0, name: GeForce GTX 1070, pci bus id: 0000:01:00.0)

ふおおー

Deep MNIST for Experts

ページ最後のコード実行は約30分かかるとのこと 今回の環境ではGPUを使用したため、約2-3分で実行できた

cross_entropy = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(y_conv, y_))
train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)
correct_prediction = tf.equal(tf.argmax(y_conv,1), tf.argmax(y_,1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
sess.run(tf.initialize_all_variables())
for i in range(20000):
  batch = mnist.train.next_batch(50)
  if i%100 == 0:
    train_accuracy = accuracy.eval(feed_dict={
        x:batch[0], y_: batch[1], keep_prob: 1.0})
    print("step %d, training accuracy %g"%(i, train_accuracy))
  train_step.run(feed_dict={x: batch[0], y_: batch[1], keep_prob: 0.5})

print("test accuracy %g"%accuracy.eval(feed_dict={
    x: mnist.test.images, y_: mnist.test.labels, keep_prob: 1.0}))
 nvidia-smi 
Sat Oct 15 21:59:52 2016       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 367.57                 Driver Version: 367.57                    |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  GeForce GTX 1070    Off  | 0000:01:00.0     Off |                  N/A |
| 38%   60C    P2   114W / 166W |   7797MiB /  8112MiB |     83%      Default |
+-------------------------------+----------------------+----------------------+

いいかんじ!

Kerasでシンプルなニューラルネットワークを実装する

はじめに

パッケージ

import numpy as np
import sklearn
import matplotlib.pyplot as plt
matplotlib.rcParams['figure.figsize'] = (10.0, 8.0)
from keras.models import Sequential
from keras.layers.core import Dense, Activation, Dropout
from keras.regularizers import l1l2

moon データセット

np.random.seed(0)
X, y = sklearn.datasets.make_moons(200, noise=0.20)
plt.scatter(X[:,0], X[:,1], s=40, c=y, cmap=plt.cm.Spectral)

f:id:moyomot:20160718134224p:plain

モデル構築

model = Sequential()
model.add(Dense(20, input_shape=(2,), W_regularizer=l1l2(l1=0., l2=0. )))
model.add(Activation('relu'))
model.add(Dropout(0.))
model.add(Dense(2))
model.add(Activation('softmax'))
model.compile(loss='sparse_categorical_crossentropy', optimizer='Adadelta')
model.fit(X, y, validation_split=0.2, batch_size=20, nb_epoch=100, verbose=1)

描画

def plot_decision_boundary(pred_func):
    # Set min and max values and give it some padding
    x_min, x_max = X[:, 0].min() - .5, X[:, 0].max() + .5
    y_min, y_max = X[:, 1].min() - .5, X[:, 1].max() + .5
    h = 0.01
    # Generate a grid of points with distance h between them
    xx, yy = np.meshgrid(np.arange(x_min, x_max, h), np.arange(y_min, y_max, h))
    # Predict the function value for the whole gid
    Z = pred_func(np.c_[xx.ravel(), yy.ravel()])
    Z = Z.reshape(xx.shape)
    # Plot the contour and training examples
    plt.contourf(xx, yy, Z, cmap=plt.cm.Spectral)
    plt.scatter(X[:, 0], X[:, 1], c=y, cmap=plt.cm.Spectral)
    
def predict(model, x):
    y = model.predict(x)
    return np.argmax(y.data, axis=1)

plot_decision_boundary(lambda x: predict(model, x))

f:id:moyomot:20160718134239p:plain

circle データセット

np.random.seed(0)
X, y = sklearn.datasets.make_circles(noise=0.2, factor=0.5, random_state=1)
plt.scatter(X[:,0], X[:,1], s=40, c=y, cmap=plt.cm.Spectral)

f:id:moyomot:20160718134400p:plain

描画

f:id:moyomot:20160718134415p:plain

ScalikeJDBCでMySQLにBulk Insertする

Bulk insert to MySQL Table by ScalikeJDBC

テーブル定義

CREATE TABLE `members` (
  `id` int(11) DEFAULT NULL,
  `name` varchar(64) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

サンプルコード

import scalikejdbc._

object BulkInsert extends App {

  // initialize JDBC driver & connection pool
  Class.forName("com.mysql.jdbc.Driver")
  ConnectionPool.singleton("jdbc:mysql://hostname/daname?rewriteBatchedStatements=true", "user", "password")

  // ad-hoc session provider on the REPL
  implicit val session = AutoSession

  val params: Seq[Seq[Any]] = (1 to 1000).map(i => Seq(i, "user_" + i))
  SQL("insert into members values (?, ?)").batch(params: _*).apply()
}
  • ポイントは rewriteBatchedStatements=true を使用すること

build.sbt

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.scalikejdbc" %% "scalikejdbc"        % "2.4.+",
  "ch.qos.logback"  %  "logback-classic"    % "1.1.+",
  "mysql" % "mysql-connector-java" % "6.0.3"

)

MySQLのgeneral logでbulk insertを確認する

$ brew services stop mysql 
$ sudo mysql.server start --general-log
$ tail -f /usr/local/var/mysql/hostname.log

参考

GitHub - scalikejdbc/scalikejdbc: A tidy SQL-based DB access library for Scala developers. This library naturally wraps JDBC APIs and provides you easy-to-use APIs.

scalikejdbc/QueryInterfaceSpec.scala at 845c40025a7cfa146d7e9099321e0791ce554854 · scalikejdbc/scalikejdbc · GitHub

MacでMySQLのクエリログを確認する方法 | Macha's Dev Blog

MySQL Connector/Jにおける大量INSERTのチューニング - SH2の日記

gensimソースコードリーディング

はじめに

全体の流れ

  1. 文書からコーパス生成
  2. コーパスを元にトピックモデルを生成
  3. モデルを利用して文書の類似度を算出

注意

  • 英語の文章を対象とします

1. 文書からコーパス生成

文章をベクトル化してコーパスを生成します。

# ログ出力の設定
import logging
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)

# 必要モジュールのインポート
from gensim import corpora, models, similarities
from pprint import pprint 

# コーパス元となる文書
documents = ["Human machine interface for lab abc computer applications",
             "A survey of user opinion of computer system response time",
             "The EPS user interface management system",
             "System and human system engineering testing of EPS",
             "Relation of user perceived response time to error measurement",
             "The generation of random binary unordered trees",
             "The intersection graph of paths in trees",
             "Graph minors IV Widths of trees and well quasi ordering",
             "Graph minors A survey"]

# ストップワード
stoplist = set('for a of the and to in'.split())
texts = [[word for word in document.lower().split() if word not in stoplist]
         for document in documents]

# 2回以上出現する単語に限定する
from collections import defaultdict
frequency = defaultdict(int)
for text in texts:
    for token in text:
        frequency[token] += 1

texts = [[token for token in text if frequency[token] > 1]
         for text in texts]

# テキスト内容を確認
from pprint import pprint   # pretty-printer
pprint(texts)
[['human', 'interface', 'computer'],
 ['survey', 'user', 'computer', 'system', 'response', 'time'],
 ['eps', 'user', 'interface', 'system'],
 ['system', 'human', 'system', 'eps'],
 ['user', 'response', 'time'],
 ['trees'],
 ['graph', 'trees'],
 ['graph', 'minors', 'trees'],
 ['graph', 'minors', 'survey']]

# 単語と単語に与えられるIDをマッピングする
dictionary = corpora.Dictionary(texts)
dictionary.save('/tmp/deerwester.dict') # 保存
print(dictionary)
print(dictionary.token2id)

ここまでチュートリアル通りです。
与えられた文書を元にコーパスを生成する下準備をしてます。
文書を単語に分割して、単語にIDを付与します。
英語はホワイトスペース区切りで単語分割できるから楽ちんですね。

corpora.Dictionary(texts)から文章をbag-of-wordsでベクトル化して、コーパスを生成できます。 ベクトル化では1文章内の単語の登場回数を利用しています。

次に、corpora.Dictionary(texts)がどうなっているか把握するため、gensim/dictionary.py at develop · piskvorky/gensim · GitHubを見ていきます。

class Dictionary(utils.SaveLoad, Mapping):
    """
    Dictionary encapsulates the mapping between normalized words and their integer ids.
    The main function is `doc2bow`, which converts a collection of words to its
    bag-of-words representation: a list of (word_id, word_frequency) 2-tuples.
    """
    def __init__(self, documents=None, prune_at=2000000):
        """
        If `documents` are given, use them to initialize Dictionary (see `add_documents()`).
        """
        self.token2id = {} # token -> tokenId
        self.id2token = {} # reverse mapping for token2id; only formed on request, to save memory
        self.dfs = {} # document frequencies: tokenId -> in how many documents this token appeared

        self.num_docs = 0 # number of documents processed
        self.num_pos = 0 # total number of corpus positions
        self.num_nnz = 0 # total number of non-zeroes in the BOW matrix

        if documents is not None:
            self.add_documents(documents, prune_at=prune_at)

    def add_documents(self, documents, prune_at=2000000):
        """
        Update dictionary from a collection of documents. Each document is a list
        of tokens = **tokenized and normalized** strings (either utf8 or unicode).
        This is a convenience wrapper for calling `doc2bow` on each document
        with `allow_update=True`, which also prunes infrequent words, keeping the
        total number of unique words <= `prune_at`. This is to save memory on very
        large inputs. To disable this pruning, set `prune_at=None`.
        >>> print(Dictionary(["máma mele maso".split(), "ema má máma".split()]))
        Dictionary(5 unique tokens)
        """
        for docno, document in enumerate(documents):
            # log progress & run a regular check for pruning, once every 10k docs
            if docno % 10000 == 0:
                if prune_at is not None and len(self) > prune_at:
                    self.filter_extremes(no_below=0, no_above=1.0, keep_n=prune_at)
                logger.info("adding document #%i to %s", docno, self)

            # update Dictionary with the document
            self.doc2bow(document, allow_update=True) # ignore the result, here we only care about updating token ids

        logger.info("built %s from %i documents (total %i corpus positions)",
                     self, self.num_docs, self.num_pos)


    def doc2bow(self, document, allow_update=False, return_missing=False):
        """
        Convert `document` (a list of words) into the bag-of-words format = list
        of `(token_id, token_count)` 2-tuples. Each word is assumed to be a
        **tokenized and normalized** string (either unicode or utf8-encoded). No further preprocessing
        is done on the words in `document`; apply tokenization, stemming etc. before
        calling this method.
        If `allow_update` is set, then also update dictionary in the process: create
        ids for new words. At the same time, update document frequencies -- for
        each word appearing in this document, increase its document frequency (`self.dfs`)
        by one.
        If `allow_update` is **not** set, this function is `const`, aka read-only.
        """
        if isinstance(document, string_types):
            raise TypeError("doc2bow expects an array of unicode tokens on input, not a single string")

        # Construct (word, frequency) mapping.
        counter = defaultdict(int)
        for w in document:
            counter[w if isinstance(w, unicode) else unicode(w, 'utf-8')] += 1

        token2id = self.token2id
        if allow_update or return_missing:
            missing = dict((w, freq) for w, freq in iteritems(counter) if w not in token2id)
            if allow_update:
                for w in missing:
                    # new id = number of ids made so far;
                    # NOTE this assumes there are no gaps in the id sequence!
                    token2id[w] = len(token2id)

        result = dict((token2id[w], freq) for w, freq in iteritems(counter) if w in token2id)

        if allow_update:
            self.num_docs += 1
            self.num_pos += sum(itervalues(counter))
            self.num_nnz += len(result)
            # increase document count for each unique token that appeared in the document
            dfs = self.dfs
            for tokenid in iterkeys(result):
                dfs[tokenid] = dfs.get(tokenid, 0) + 1

        # return tokenids, in ascending id order
        result = sorted(iteritems(result))
        if return_missing:
            return result, missing
        else:
            return result

流れとしては、Dictionaryadd_documentsdoc2bow です。 コメントにも書いてあるように doc2bowがここのメインです。 ここでは、どの単語が何回登場するのか計算するのが目的です。
そのために、単語とIDのマッピングなどの処理があります(self.token2id)

単語の登場回数はself.dfs (document frequencies)に保存されます。 doc2bowはread onlyでも使用できるようにallow_updateのオプションがあります。 いろいろ書いてありますが、下記で登場回数を更新していることがわかると思います。

for tokenid in iterkeys(result):
    dfs[tokenid] = dfs.get(tokenid, 0) + 1

チュートリアルに戻ります

new_doc = "Human computer interaction"  # 新しい文書
new_vec = dictionary.doc2bow(new_doc.lower().split())  # ベクトル化
corpus = [dictionary.doc2bow(text) for text in texts]  # コーパス生成
corpora.MmCorpus.serialize('/tmp/deerwester.mm', corpus)  # ディスクに保存

新しい文書を与え、doc2bowでベクトル化できます。 また、Matrix Market形式でコーパスをディスクに保存します。

2. コーパスを元にトピックモデルを生成

コーパスを生成できました。 これを元にTF-IDFを算出するためのモデルを生成し、LSIやLDAのモデルを生成する流れを見ていきます。 ディスクに保存したコーパスを読み込み、TF-IDFのモデルを生成します。

corpus = corpora.MmCorpus('/tmp/deerwester.mm')  # コーパス読み込み
tfidf = models.TfidfModel(corpus)  # tf idfのモデル生成

TF-IDFを簡単に紹介すれば、出現頻度の多い単語は重要視して、異なる文章間に何度も出現する単語を重要視しない的な重み付けです。
TfidfModel gensim/tfidfmodel.py at develop · piskvorky/gensim · GitHub の中を見ていきます。
idfの計算はdf2idfの中で行われてます。

def df2idf(docfreq, totaldocs, log_base=2.0, add=0.0):
    """
    Compute default inverse-document-frequency for a term with document frequency `doc_freq`::
      idf = add + log(totaldocs / doc_freq)
    """
    return add + math.log(1.0 * totaldocs / docfreq, log_base)

def precompute_idfs(wglobal, dfs, total_docs):
    """Precompute the inverse document frequency mapping for all terms."""
    # not strictly necessary and could be computed on the fly in TfidfModel__getitem__.
    # this method is here just to speed things up a little.
    return dict((termid, wglobal(df, total_docs))
                for termid, df in iteritems(dfs))

チュートリアルに戻ります。

doc_bow = [(0, 1), (2, 1)]  # ベクトル化した文章を新たに定義します
print(tfidf[doc_bow])  # モデルからTF-IDFを算出します

corpus_tfidf = tfidf[corpus]
for doc in corpus_tfidf:
    print(doc)

モデルを元に文書ベクトルのTF-IDFを求めています。 コーパスからTF-IDFを計算できました。 それを元にLSIやLDAなどのトピックモデルを求めることができます。

lsi = models.LsiModel(corpus_tfidf, id2word=dictionary, num_topics=2)  # LSIモデル生成
corpus_lsi = lsi[corpus_tfidf]  # bow->tfidf->fold-in-lsi
lsi.print_topics(2)
for doc in corpus_lsi: # bow->tfidf と tfidf->lsi の変換はここで行われてます
    print(doc)

文章量が極小なので、出力されたトピックは意味不明ですね。

gensim/lsimodel.py at develop · piskvorky/gensim · GitHub LSIモデルの中を少し見ていきます。(とほほ)

class Projection(utils.SaveLoad):
    def __init__(self, m, k, docs=None, use_svdlibc=False, power_iters=P2_EXTRA_ITERS, extra_dims=P2_EXTRA_DIMS):
        """
        Construct the (U, S) projection from a corpus `docs`. The projection can
        be later updated by merging it with another Projection via `self.merge()`.
        This is the class taking care of the 'core math'; interfacing with corpora,
        splitting large corpora into chunks and merging them etc. is done through
        the higher-level `LsiModel` class.
        """
        self.m, self.k = m, k
        self.power_iters = power_iters
        self.extra_dims = extra_dims
        if docs is not None:
            # base case decomposition: given a job `docs`, compute its decomposition,
            # *in-core*.
            if not use_svdlibc:
                u, s = stochastic_svd(
                    docs, k, chunksize=sys.maxsize,
                    num_terms=m, power_iters=self.power_iters,
                    extra_dims=self.extra_dims)
            else:
                try:
                    import sparsesvd
                except ImportError:
                    raise ImportError("`sparsesvd` module requested but not found; run `easy_install sparsesvd`")
                logger.info("computing sparse SVD of %s matrix", str(docs.shape))
                if not scipy.sparse.issparse(docs):
                    docs = matutils.corpus2csc(docs)
                ut, s, vt = sparsesvd.sparsesvd(docs, k + 30)  # ask for extra factors, because for some reason SVDLIBC sometimes returns fewer factors than requested
                u = ut.T
                del ut, vt
                k = clip_spectrum(s**2, self.k)
            self.u = u[:, :k].copy()
            self.s = s[:k].copy()
        else:
            self.u, self.s = None, None

(省略)

class LsiModel(interfaces.TransformationABC):
    """
    Objects of this class allow building and maintaining a model for Latent
    Semantic Indexing (also known as Latent Semantic Analysis).
    The main methods are:
    1. constructor, which initializes the projection into latent topics space,
    2. the ``[]`` method, which returns representation of any input document in the
       latent space,
    3. `add_documents()` for incrementally updating the model with new documents.
    The left singular vectors are stored in `lsi.projection.u`, singular values
    in `lsi.projection.s`. Right singular vectors can be reconstructed from the output
    of `lsi[training_corpus]`, if needed. See also FAQ [2]_.
    Model persistency is achieved via its load/save methods.
    .. [2] https://github.com/piskvorky/gensim/wiki/Recipes-&-FAQ#q4-how-do-you-output-the-u-s-vt-matrices-of-lsi
    """
    def __init__(self, corpus=None, num_topics=200, id2word=None, chunksize=20000,
                 decay=1.0, distributed=False, onepass=True,
                 power_iters=P2_EXTRA_ITERS, extra_samples=P2_EXTRA_DIMS):
        """
        `num_topics` is the number of requested factors (latent dimensions).
        After the model has been trained, you can estimate topics for an
        arbitrary, unseen document, using the ``topics = self[document]`` dictionary
        notation. You can also add new training documents, with ``self.add_documents``,
        so that training can be stopped and resumed at any time, and the
        LSI transformation is available at any point.
        If you specify a `corpus`, it will be used to train the model. See the
        method `add_documents` for a description of the `chunksize` and `decay` parameters.
        Turn `onepass` off to force a multi-pass stochastic algorithm.
        `power_iters` and `extra_samples` affect the accuracy of the stochastic
        multi-pass algorithm, which is used either internally (`onepass=True`) or
        as the front-end algorithm (`onepass=False`). Increasing the number of
        power iterations improves accuracy, but lowers performance. See [3]_ for
        some hard numbers.
        Turn on `distributed` to enable distributed computing.
        Example:
        >>> lsi = LsiModel(corpus, num_topics=10)
        >>> print(lsi[doc_tfidf]) # project some document into LSI space
        >>> lsi.add_documents(corpus2) # update LSI on additional documents
        >>> print(lsi[doc_tfidf])
        .. [3] http://nlp.fi.muni.cz/~xrehurek/nips/rehurek_nips.pdf
        """
        self.id2word = id2word
        self.num_topics = int(num_topics)
        self.chunksize = int(chunksize)
        self.decay = float(decay)
        if distributed:
            if not onepass:
                logger.warning("forcing the one-pass algorithm for distributed LSA")
                onepass = True
        self.onepass = onepass
        self.extra_samples, self.power_iters = extra_samples, power_iters

        if corpus is None and self.id2word is None:
            raise ValueError('at least one of corpus/id2word must be specified, to establish input space dimensionality')

        if self.id2word is None:
            logger.warning("no word id mapping provided; initializing from corpus, assuming identity")
            self.id2word = utils.dict_from_corpus(corpus)
            self.num_terms = len(self.id2word)
        else:
            self.num_terms = 1 + max([-1] + self.id2word.keys())

        self.docs_processed = 0
        self.projection = Projection(self.num_terms, self.num_topics, power_iters=self.power_iters, extra_dims=self.extra_samples)

(省略)

def stochastic_svd(corpus, rank, num_terms, chunksize=20000, extra_dims=None,
                   power_iters=0, dtype=numpy.float64, eps=1e-6):
    """
    Run truncated Singular Value Decomposition (SVD) on a sparse input.
    Return (U, S): the left singular vectors and the singular values of the input
    data stream `corpus` [4]_. The corpus may be larger than RAM (iterator of vectors).
    This may return less than the requested number of top `rank` factors, in case
    the input itself is of lower rank. The `extra_dims` (oversampling) and especially
    `power_iters` (power iterations) parameters affect accuracy of the decomposition.
    This algorithm uses `2+power_iters` passes over the input data. In case you can only
    afford a single pass, set `onepass=True` in :class:`LsiModel` and avoid using
    this function directly.
    The decomposition algorithm is based on
    **Halko, Martinsson, Tropp. Finding structure with randomness, 2009.**
    .. [4] If `corpus` is a scipy.sparse matrix instead, it is assumed the whole
       corpus fits into core memory and a different (more efficient) code path is chosen.
    """

(省略)

流れは、LsiModelProjectionstochastic_svd となります。
LsiModelの中では規模の大きいデータを処理する分散処理の仕組みが用意されています。ふむ。
Projectionでは特異値分解にstochastic_svdを使用するか、sparsesvdを使用するか決定できます。 また、文章を後に追加した時に結合するための仕組みもあります。
stochastic_svdで特異値分解を行っています。ここで次元圧縮してLSIを実現しているのですね。

3. モデルを利用して文書の類似度を算出

LSIモデルの中で新たな文章のベクトルを求めます。

doc = "Human computer interaction"
vec_bow = dictionary.doc2bow(doc.lower().split())
vec_lsi = lsi[vec_bow] 
print(vec_lsi)

おわりに

gensimソースコードリーディングは以上となります。 gensimソースコードの上辺だけ追いましたが、ブラックボックスであったライブラリの一片が見えて楽しかったです。
みなさまも楽しんでいただけたら幸いです。

Web企業のログ基盤まわりをまとめてみた

はじめに

  • Web企業のログ解析基盤系資料をまとめてみました
  • SlideShare, Speaker Deck, 企業技術ブロクをもとに調査しました
  • 2014年以降の資料に限定(たぶん)
  • 自分用メモ
  • 勢いで調べたので、結構もれてると思う

メルカリ

tech.mercari.com

CyberAgent

ameblo.jp

Apache SparkでカスタムStreamingする

はじめに

Spark, SQL on Hadoop etc. Advent Calendar 2014 - Qiita 3日目の記事です。
SparkでカスタムStreamingする方法を紹介します。

TwitterやFlumeなどのSpark Streamingの活用例が下記にあります。
spark/examples/src/main/scala/org/apache/spark/examples/streaming at master · apache/spark · GitHub
spark/external at master · apache/spark · GitHub
これらは、いろいろ利用できそうですね。

一方で、オリジナルのStreaming処理を行いたい場合には、
Sparkが提供するReceiverクラスを拡張する必要があります。
この記事では、Receiverクラスを拡張して、
カスタムしたStreaming処理を行う方法について紹介します。

ざっくりとしたSparkの説明

Apache Sparkについて秒速で振り返ります

  • はやい

インメモリだからHadoopMapReduceより、はやい

  • 使いやすい

JavaScalaPythonで書ける
並行処理を実現するための便利APIがたくさんある

  • すごい

機会学習、ストリーミング処理、SQLライクな処理、グラフなんでもできる
もうなんかすごい、プラットフォームだこれは

次に、Spark Streamingについて簡単に見ていきます。

ざっくりとしたSpark Streamingの説明

Streaming処理で何ができるのか
ドカドカ流れてくるログデータ等をバッチで一括処理するのとは別に、
より短時間の処理を実現できます。

  • 使いやすい

Spark Streamingでは、もちろんSparkのAPIが利用できるので、いろいろ便利です

  • 耐障害性

耐障害性がある。今度はこの辺も検証してみたいです

言葉だけだと、イメージがつかめないのでコードを見ていきたいと思います。

Twitter Streaming概要

Twitter Streamingのサンプルコードを見て、Spark Streamingの雰囲気をつかみます。
spark/TwitterPopularTags.scala at master · apache/spark · GitHubから抜粋します

val sparkConf = new SparkConf().setAppName("TwitterPopularTags")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val stream = TwitterUtils.createStream(ssc, None, filters)

val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
                 .map{case (topic, count) => (count, topic)}
                 .transform(_.sortByKey(false))
// Print popular hashtags
topCounts60.foreachRDD(rdd => {
  val topList = rdd.take(10)
  println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
  topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})

1分間の頻出ハッシュタグを集計するコードです。
ストリーム処理するためのAPI(TwitterUtils.createStream)を起点に、
空白で文字列を分割し、#で始まる単語を抽出し、
1分間あたりの単語数を集計しています。
直感的で分かりやすいですね。

Sparkが事前に用意しているAPIは、
Flume、Kafka、MQTT、Twitter、ZeroMQです。
spark/external at master · apache/spark · GitHub
よって、自分たちの目的にあわせたStreamin処理を行うには、
別の手段が必要になります。

オリジナルStreaming

ここから本題です。
いきなりですが、オリジナルのStreaming処理を行うには、Receiverクラスを拡張します。
ということで、Receiverクラスを見てみます。
spark/Receiver.scala at master · apache/spark · GitHub
仕様がいろいろ書かれてます。
重要なのはonStartメソッドとonStopメソッドです。これらをオーバーライドして、オリジナルのReceiverを作成します。

onStartメソッド
  1. データを受け取るために、スレッドとかのリソースはここで初期化する
  2. Spark上で処理できるように、受け取ったデータはstoreメソッドに渡す
  3. Non Blockingである必要がある

しかし、ここの公式ドキュメントはNon Blockingでないんですよね。
Spark Streaming Custom Receivers - Spark 1.1.1 Documentation

onStopメソッド
  1. Receiverが終了したときに呼ばれる
  2. 終了処理を書く

Sparkのサンプルは全体的にonStop処理が軽視されているような気がします。
そういうのって良くないですよね。でもすいません、今回は範囲外とさせてください。

今回やりたいこと

f:id:moyomot:20141202213135p:plain
telnetなどのクライアントでSpark Streamingに接続し、
送信したテキストの出現回数を集計したいと思います。
Non BlockingはNettyで実現します。むしろNettyをやりたかった。

登場人物
  1. PrintReceiver : ネットワーク経由でテキストを受け取る。Non BlockingはNettyで実現する
  2. PrintUtils : Receiverのためのインターフェース的な役割です
  3. TestPrintStreaming : 実行クラス
PrintReceiver
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage._
import org.apache.spark.Logging
import org.apache.spark.streaming.receiver.Receiver
import java.net.ServerSocket
import java.io.BufferedReader
import java.io.InputStreamReader
import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.ByteBuf
import io.netty.channel._
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.logging._

class PrintInputDStream(@transient ssc_ : StreamingContext, storageLevel: StorageLevel)
  extends ReceiverInputDStream[String](ssc_) {
  override def getReceiver(): Receiver[String] = {
    new PrintReceiver(storageLevel)
  }
}

class PrintReceiver(storageLevel: StorageLevel)
  extends Receiver[String](storageLevel) with Logging {
  def onStart() {
    // メインはNettyコード
    val bossGroup = new NioEventLoopGroup(1);
    val workerGroup = new NioEventLoopGroup();
    try {
      val bootstrap = new ServerBootstrap();
      bootstrap.group(bossGroup, workerGroup)
        .channel(classOf[NioServerSocketChannel])
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer[SocketChannel] {

          override def initChannel(socketChannel: SocketChannel) {
            val pipeline = socketChannel.pipeline
            pipeline.addLast(new ChannelHandlerAdapter {
              override def channelRead(ctx: ChannelHandlerContext, msg: Object) {
                val buf = msg.asInstanceOf[ByteBuf]
                val str = new StringBuilder
                while (buf.isReadable) { // 文字を文字列にする
                  str.append(buf.readByte.asInstanceOf[Char]);
                }
               store(str.toString.trim) // 受け取った文字列をSparkに渡す
              }
            })
          }
        })
      val f = bootstrap.bind(60000).sync(); // ポート番号
      f.channel().closeFuture().sync();
    } finally {
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }

  def onStop() {
  }
}
PrintUtils
import java.net.InetSocketAddress

import org.apache.spark.annotation.Experimental
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

object PrintUtils {
  def createStream (ssc: StreamingContext, storageLevel: StorageLevel
	): ReceiverInputDStream[String] = {
	  new PrintInputDStream(ssc, storageLevel)
  }
}
TestPrintStreaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.Logger
import org.apache.log4j.Level
import main.scala.spark.streaming.print.PrintUtils

object TestPrintStreaming {
  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.WARN)
    val conf = new SparkConf().setMaster("local[2]").setAppName("PrintStreaming")
    val ssc = new StreamingContext(conf, Seconds(1))
    val nc = PrintUtils.createStream(ssc, StorageLevel.MEMORY_AND_DISK_SER_2)
    val counts = nc.countByValueAndWindow(Seconds(60 * 1), Seconds(1))
    counts.print
    ssc.checkpoint("checkpoint")
    ssc.start()
  }
}
実行結果
-------------------------------------------
Time: 1417525094000 ms
-------------------------------------------
(hello,6)
(hellohellohello,3)
(hellohellohellohellohellohello,1)

-------------------------------------------
Time: 1417525095000 ms
-------------------------------------------
(hello,6)
(hellohellohello,3)
(hellohellohellohellohellohello,1)

-------------------------------------------
Time: 1417525096000 ms
-------------------------------------------
(hello,6)
(hellohellohello,3)
(hellohellohellohellohellohello,1)

14/12/02 21:58:16 WARN storage.BlockManager: Block input-0-1417525096000 already exists on this machine; not re-adding it

まとめ

とまあ、こんな感じでカスタムStreamingしてみました。
いろいろ世界が広がりそうで、楽しいです。
簡単にNon Blockingを実現するためにNettyを用いましたが、
Nettyの調査に一番時間を要しちゃいました。

あと、ActorReceiverなんてものあります。
spark/ActorReceiver.scala at master · apache/spark · GitHub