Deep Learning環境はKeras + Docker + Jupyter Notebook + GPUがいいカンジ
はじめに
- ポチポチKeras動かすのにどのような環境がいいのか考えてみました
- Keras + Docker + Jupyter Notebook + GPUの環境構築作業ログを紹介します
Keras
- わかりやすいインターフェースがかなり好き
Docker
- TensorFlowで学ぶディープラーニング入門~畳み込みニューラルネットワーク徹底解説 を参考にしました
- この本ではDockerを使用してます
- 当初はvirtualenv使用して環境作る予定だったので、勉強になりました
- 環境の移植性いいね
- GPU使用できるのいいね
Jupyter Notebook
- 言わずもがな
GPU
- 速いは正義
所感
Keras + Docker + Jupyter Notebook + GPUの環境構築
前提
GPUでTensorFlowを動かす - もょもとの技術ノート
NVIDIAドライバ入れ直す
上記ではNVIDIAドライバを手動でインストールしましたが、バージョンの相性もあり(またこれか)、apt-getでインストールし直しました。ところがNVIDIAドライバをインストールし直すとOSが起動しなくなったので、結局OSから再インストールしました。
$ sudo add-apt-repository ppa:graphics-drivers/ppa $ sudo apt-cache search nvidia-\d+ nvidia-352 - Transitional package for nvidia-361 mate-sensors-applet-nvidia-dbg - Display readings from hardware sensors in your MATE panel (NVIDIA, dbg package) nvidia-304 - NVIDIA legacy binary driver - version 304.132 nvidia-304-updates - Transitional package for nvidia-304 nvidia-340 - NVIDIA binary driver - version 340.98 nvidia-355 - NVIDIA binary driver - version 355.11 nvidia-358 - NVIDIA binary driver - version 358.16 nvidia-361 - NVIDIA binary driver - version 361.45.18 nvidia-364 - NVIDIA binary driver - version 364.19 nvidia-367 - NVIDIA binary driver - version 367.44 nvidia-370 - NVIDIA binary driver - version 370.28 $ sudo apt-get install nvidia-370
Docker
- 公式サイト通りに実施
- 全く躓かなかった
- Ubuntu 16.04 (LTS)を使用
nvidia-docker
- DockerからGPU操作できるようにこれも導入
$ wget -P /tmp https://github.com/NVIDIA/nvidia-docker/releases/download/v1.0.0-rc.3/nvidia-docker_1.0.0.rc.3-1_amd64.deb $ sudo dpkg -i /tmp/nvidia-docker*.deb && rm /tmp/nvidia-docker*.deb # Test nvidia-smi $ sudo nvidia-docker run --rm nvidia/cuda nvidia-smi
注意
- 今回はDockerグループ作成しなかったので、docker, nvidia-dockerコマンドのまえにすべてsudoつけました
ファイアウォール開放
$ sudo ufw enable $ sudo ufw allow 8888 $ sudo ufw status $ sudo ufw status 状態: アクティブ To Action From -- ------ ---- 8888 ALLOW Anywhere 8888 (v6) ALLOW Anywhere (v6)
Docker起動
$ sudo nvidia-docker run -m 10g -it -p 8888:8888 gcr.io/tensorflow/tensorflow:0.10.0-gpu
- 0.11はKerasとの相性が悪かったため、0.10を使用
- out of memoryエラー出るので、 -m で多めにメモリ確保
- 2GBあれば十分かな
$ sudo docker stats CONTAINER CPU % MEM USAGE / LIMIT MEM % NET I/O BLOCK I/O PIDS 8661fc7f4cea 1.42% 1.345 GiB / 10 GiB 13.45% 11.97 MB / 599.6 kB 0 B / 26.95 MB 44
MNISTサンプルコード動かす
- Jupyter Notebook起動
- Jupyter上からterminalを起動し、pip install kerasを実行
- Jupyter上からこんなことできるのですね
- サンプルコードを動かす keras/mnist_cnn.py at master · fchollet/keras · GitHub
CPU使用時
- 10分ちょっと
GPU使用時
- 1分弱
- いいかんじ!
お世話になったサイト
ubuntu14.04にnvidia-dockerをインストールする - Qiita
今回やってないこと
- Dockerイメージの作成
- Docker再起動するとKeras消えちゃうよ
- データボリュームを接続
- Docker再起動すると書いたコード消えちゃうよ
- -vオプションつければOK
GPUでTensorFlowを動かす
はじめに
GTX1070を購入しました GPUでTensorFlowを動かすまでの作業ログです
環境
ハードウェア(2年前に自作したマイサーバー)
- CPU: Intel CPU Xeon E3-1241V3 3.50GHz
- マザーボード: ASUSTeK Intel H97 Pro
- SSD: Samsung SSD840EVO 250GB
- メモリ: 8GB * 4
今回購入
ソフトウェア
- OS: Ubuntu Server 16.04.1 LTS
- NVIDIAドライバ: cuda-repo-ubuntu1504-7-5-local_7.5-18_amd64.deb → apt-getで入れる
- cuDNN: cudnn-7.5-linux-x64-v5.1.tgz
- TensorFlow: 0.11
注意
- 2016/10/15時点でのバージョン構成
- CUDAはapt-getで導入しました
- CUDA8はTensorFlow0.11に対応してませんでした
- 必要ソフトウェアのバージョンを合わせるのに試行錯誤が必要でした
お世話になったサイト
Ubuntu14.04 + GPU + TensorFlow 環境構築 - Qiita
GPGPUマシンの更新(2) 〜 CUDA 7.5 と cuDNN 5.0RC - まんぼう日記
GPU合体
- GTX1070はPCI Express 3.0がマザボに備わっているかを確認すればOK
OSインストール
GPUに必要なソフトウェア導入
とりあえず
sudo apt-get install gcc make
NVIDIAドライバ
- ダウンロード
- NVIDIA DRIVERS Linux x64 (AMD64/EM64T) Display Driver
- scpでサーバーへ
sudo chmod 755 NVIDIA-Linux-x86_64-367.57.run
sudo ./NVIDIA-Linux-x86_64-367.27.run
[追記]
- dockerで動かすにはapt-getで入れたほうがよい
$ sudo add-apt-repository ppa:graphics-drivers/ppa $ sudo apt-cache search nvidia-\d+ nvidia-352 - Transitional package for nvidia-361 mate-sensors-applet-nvidia-dbg - Display readings from hardware sensors in your MATE panel (NVIDIA, dbg package) nvidia-304 - NVIDIA legacy binary driver - version 304.132 nvidia-304-updates - Transitional package for nvidia-304 nvidia-340 - NVIDIA binary driver - version 340.98 nvidia-355 - NVIDIA binary driver - version 355.11 nvidia-358 - NVIDIA binary driver - version 358.16 nvidia-361 - NVIDIA binary driver - version 361.45.18 nvidia-364 - NVIDIA binary driver - version 364.19 nvidia-367 - NVIDIA binary driver - version 367.44 nvidia-370 - NVIDIA binary driver - version 370.28 $ sudo apt-get install nvidia-370
そして、再起動
CUDA
sudo apt install nvidia-cuda-toolkit
- NVIDIAサイトからダウンロードしたものはインストールできず断念
cuDNN
tar xvzf cudnn-7.5-linux-x64-v5.0-ga.tgz cd cuda sudo cp lib64/* /usr/lib/x86_64-linux-gnu sudo cp include/cudnn.h /usr/include sudo chmod a+r /usr/lib/x86_64-linux-gnu/libcudnn*
以上で環境構築は完了
TensorFlow動かしてみる
インストール
- 公式サイト通りに実行
export TF_BINARY_URL=https://storage.googleapis.com/tensorflow/linux/gpu/tensorflow-0.11.0rc0-cp27-none-linux_x86_64.whl sudo apt-get install python-pip python-dev sudo pip install --upgrade $TF_BINARY_URL
サンプルコード動かす
- 適当にgit cloneし、中に移動
GitHub - tensorflow/tensorflow: Computation using data flow graphs for scalable machine learning
Python 2.7.12 (default, Jul 1 2016, 15:12:24) [GCC 5.4.0 20160609] on linux2 Type "help", "copyright", "credits" or "license" for more information. >>> import tensorflow as tf I tensorflow/stream_executor/dso_loader.cc:111] successfully opened CUDA library libcublas.so locally I tensorflow/stream_executor/dso_loader.cc:111] successfully opened CUDA library libcudnn.so locally I tensorflow/stream_executor/dso_loader.cc:111] successfully opened CUDA library libcufft.so locally I tensorflow/stream_executor/dso_loader.cc:111] successfully opened CUDA library libcuda.so.1 locally I tensorflow/stream_executor/dso_loader.cc:111] successfully opened CUDA library libcurand.so locally >>> hello = tf.constant('Hello, TensorFlow!') >>> sess = tf.Session() I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:925] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero I tensorflow/core/common_runtime/gpu/gpu_device.cc:951] Found device 0 with properties: name: GeForce GTX 1070 major: 6 minor: 1 memoryClockRate (GHz) 1.835 pciBusID 0000:01:00.0 Total memory: 7.92GiB Free memory: 7.84GiB I tensorflow/core/common_runtime/gpu/gpu_device.cc:972] DMA: 0 I tensorflow/core/common_runtime/gpu/gpu_device.cc:982] 0: Y I tensorflow/core/common_runtime/gpu/gpu_device.cc:1041] Creating TensorFlow device (/gpu:0) -> (device: 0, name: GeForce GTX 1070, pci bus id: 0000:01:00.0)
ふおおー
- チュートリアル実行
ページ最後のコード実行は約30分かかるとのこと 今回の環境ではGPUを使用したため、約2-3分で実行できた
cross_entropy = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(y_conv, y_)) train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy) correct_prediction = tf.equal(tf.argmax(y_conv,1), tf.argmax(y_,1)) accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)) sess.run(tf.initialize_all_variables()) for i in range(20000): batch = mnist.train.next_batch(50) if i%100 == 0: train_accuracy = accuracy.eval(feed_dict={ x:batch[0], y_: batch[1], keep_prob: 1.0}) print("step %d, training accuracy %g"%(i, train_accuracy)) train_step.run(feed_dict={x: batch[0], y_: batch[1], keep_prob: 0.5}) print("test accuracy %g"%accuracy.eval(feed_dict={ x: mnist.test.images, y_: mnist.test.labels, keep_prob: 1.0}))
nvidia-smi Sat Oct 15 21:59:52 2016 +-----------------------------------------------------------------------------+ | NVIDIA-SMI 367.57 Driver Version: 367.57 | |-------------------------------+----------------------+----------------------+ | GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC | | Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. | |===============================+======================+======================| | 0 GeForce GTX 1070 Off | 0000:01:00.0 Off | N/A | | 38% 60C P2 114W / 166W | 7797MiB / 8112MiB | 83% Default | +-------------------------------+----------------------+----------------------+
いいかんじ!
Kerasでシンプルなニューラルネットワークを実装する
はじめに
- 下記記事をKerasで実装してみたくなったのでやってみた
ライブラリーを使わずにPythonでニューラルネットワークを構築してみる - Qiita
GitHub - dennybritz/nn-from-scratch: Implementing a Neural Network from Scratch
chainerでニューラルネットワーク構築 - Qiita
パッケージ
import numpy as np import sklearn import matplotlib.pyplot as plt matplotlib.rcParams['figure.figsize'] = (10.0, 8.0) from keras.models import Sequential from keras.layers.core import Dense, Activation, Dropout from keras.regularizers import l1l2
moon データセット
np.random.seed(0) X, y = sklearn.datasets.make_moons(200, noise=0.20) plt.scatter(X[:,0], X[:,1], s=40, c=y, cmap=plt.cm.Spectral)
モデル構築
model = Sequential() model.add(Dense(20, input_shape=(2,), W_regularizer=l1l2(l1=0., l2=0. ))) model.add(Activation('relu')) model.add(Dropout(0.)) model.add(Dense(2)) model.add(Activation('softmax')) model.compile(loss='sparse_categorical_crossentropy', optimizer='Adadelta') model.fit(X, y, validation_split=0.2, batch_size=20, nb_epoch=100, verbose=1)
描画
def plot_decision_boundary(pred_func): # Set min and max values and give it some padding x_min, x_max = X[:, 0].min() - .5, X[:, 0].max() + .5 y_min, y_max = X[:, 1].min() - .5, X[:, 1].max() + .5 h = 0.01 # Generate a grid of points with distance h between them xx, yy = np.meshgrid(np.arange(x_min, x_max, h), np.arange(y_min, y_max, h)) # Predict the function value for the whole gid Z = pred_func(np.c_[xx.ravel(), yy.ravel()]) Z = Z.reshape(xx.shape) # Plot the contour and training examples plt.contourf(xx, yy, Z, cmap=plt.cm.Spectral) plt.scatter(X[:, 0], X[:, 1], c=y, cmap=plt.cm.Spectral) def predict(model, x): y = model.predict(x) return np.argmax(y.data, axis=1) plot_decision_boundary(lambda x: predict(model, x))
circle データセット
np.random.seed(0) X, y = sklearn.datasets.make_circles(noise=0.2, factor=0.5, random_state=1) plt.scatter(X[:,0], X[:,1], s=40, c=y, cmap=plt.cm.Spectral)
描画
ScalikeJDBCでMySQLにBulk Insertする
Bulk insert to MySQL Table by ScalikeJDBC
テーブル定義
CREATE TABLE `members` ( `id` int(11) DEFAULT NULL, `name` varchar(64) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
サンプルコード
import scalikejdbc._ object BulkInsert extends App { // initialize JDBC driver & connection pool Class.forName("com.mysql.jdbc.Driver") ConnectionPool.singleton("jdbc:mysql://hostname/daname?rewriteBatchedStatements=true", "user", "password") // ad-hoc session provider on the REPL implicit val session = AutoSession val params: Seq[Seq[Any]] = (1 to 1000).map(i => Seq(i, "user_" + i)) SQL("insert into members values (?, ?)").batch(params: _*).apply() }
- ポイントは
rewriteBatchedStatements=true
を使用すること
build.sbt
scalaVersion := "2.11.8" libraryDependencies ++= Seq( "org.scalikejdbc" %% "scalikejdbc" % "2.4.+", "ch.qos.logback" % "logback-classic" % "1.1.+", "mysql" % "mysql-connector-java" % "6.0.3" )
MySQLのgeneral logでbulk insertを確認する
$ brew services stop mysql $ sudo mysql.server start --general-log $ tail -f /usr/local/var/mysql/hostname.log
参考
gensimソースコードリーディング
はじめに
- この記事はMachine Learning Advent Calendar 2015 - Qiitaの3日目の記事です
- gensimはトピックモデルのpythonライブラリです
- gensimのチュートリアルgensim: Tutorialsに沿って、gensimソースコードリーディングをほんの少し紹介します
全体の流れ
注意
- 英語の文章を対象とします
1. 文書からコーパス生成
文章をベクトル化してコーパスを生成します。
# ログ出力の設定 import logging logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO) # 必要モジュールのインポート from gensim import corpora, models, similarities from pprint import pprint # コーパス元となる文書 documents = ["Human machine interface for lab abc computer applications", "A survey of user opinion of computer system response time", "The EPS user interface management system", "System and human system engineering testing of EPS", "Relation of user perceived response time to error measurement", "The generation of random binary unordered trees", "The intersection graph of paths in trees", "Graph minors IV Widths of trees and well quasi ordering", "Graph minors A survey"] # ストップワード stoplist = set('for a of the and to in'.split()) texts = [[word for word in document.lower().split() if word not in stoplist] for document in documents] # 2回以上出現する単語に限定する from collections import defaultdict frequency = defaultdict(int) for text in texts: for token in text: frequency[token] += 1 texts = [[token for token in text if frequency[token] > 1] for text in texts] # テキスト内容を確認 from pprint import pprint # pretty-printer pprint(texts) [['human', 'interface', 'computer'], ['survey', 'user', 'computer', 'system', 'response', 'time'], ['eps', 'user', 'interface', 'system'], ['system', 'human', 'system', 'eps'], ['user', 'response', 'time'], ['trees'], ['graph', 'trees'], ['graph', 'minors', 'trees'], ['graph', 'minors', 'survey']] # 単語と単語に与えられるIDをマッピングする dictionary = corpora.Dictionary(texts) dictionary.save('/tmp/deerwester.dict') # 保存 print(dictionary) print(dictionary.token2id)
ここまでチュートリアル通りです。
与えられた文書を元にコーパスを生成する下準備をしてます。
文書を単語に分割して、単語にIDを付与します。
英語はホワイトスペース区切りで単語分割できるから楽ちんですね。
corpora.Dictionary(texts)
から文章をbag-of-wordsでベクトル化して、コーパスを生成できます。
ベクトル化では1文章内の単語の登場回数
を利用しています。
次に、corpora.Dictionary(texts)
がどうなっているか把握するため、gensim/dictionary.py at develop · piskvorky/gensim · GitHubを見ていきます。
class Dictionary(utils.SaveLoad, Mapping): """ Dictionary encapsulates the mapping between normalized words and their integer ids. The main function is `doc2bow`, which converts a collection of words to its bag-of-words representation: a list of (word_id, word_frequency) 2-tuples. """ def __init__(self, documents=None, prune_at=2000000): """ If `documents` are given, use them to initialize Dictionary (see `add_documents()`). """ self.token2id = {} # token -> tokenId self.id2token = {} # reverse mapping for token2id; only formed on request, to save memory self.dfs = {} # document frequencies: tokenId -> in how many documents this token appeared self.num_docs = 0 # number of documents processed self.num_pos = 0 # total number of corpus positions self.num_nnz = 0 # total number of non-zeroes in the BOW matrix if documents is not None: self.add_documents(documents, prune_at=prune_at) def add_documents(self, documents, prune_at=2000000): """ Update dictionary from a collection of documents. Each document is a list of tokens = **tokenized and normalized** strings (either utf8 or unicode). This is a convenience wrapper for calling `doc2bow` on each document with `allow_update=True`, which also prunes infrequent words, keeping the total number of unique words <= `prune_at`. This is to save memory on very large inputs. To disable this pruning, set `prune_at=None`. >>> print(Dictionary(["máma mele maso".split(), "ema má máma".split()])) Dictionary(5 unique tokens) """ for docno, document in enumerate(documents): # log progress & run a regular check for pruning, once every 10k docs if docno % 10000 == 0: if prune_at is not None and len(self) > prune_at: self.filter_extremes(no_below=0, no_above=1.0, keep_n=prune_at) logger.info("adding document #%i to %s", docno, self) # update Dictionary with the document self.doc2bow(document, allow_update=True) # ignore the result, here we only care about updating token ids logger.info("built %s from %i documents (total %i corpus positions)", self, self.num_docs, self.num_pos) def doc2bow(self, document, allow_update=False, return_missing=False): """ Convert `document` (a list of words) into the bag-of-words format = list of `(token_id, token_count)` 2-tuples. Each word is assumed to be a **tokenized and normalized** string (either unicode or utf8-encoded). No further preprocessing is done on the words in `document`; apply tokenization, stemming etc. before calling this method. If `allow_update` is set, then also update dictionary in the process: create ids for new words. At the same time, update document frequencies -- for each word appearing in this document, increase its document frequency (`self.dfs`) by one. If `allow_update` is **not** set, this function is `const`, aka read-only. """ if isinstance(document, string_types): raise TypeError("doc2bow expects an array of unicode tokens on input, not a single string") # Construct (word, frequency) mapping. counter = defaultdict(int) for w in document: counter[w if isinstance(w, unicode) else unicode(w, 'utf-8')] += 1 token2id = self.token2id if allow_update or return_missing: missing = dict((w, freq) for w, freq in iteritems(counter) if w not in token2id) if allow_update: for w in missing: # new id = number of ids made so far; # NOTE this assumes there are no gaps in the id sequence! token2id[w] = len(token2id) result = dict((token2id[w], freq) for w, freq in iteritems(counter) if w in token2id) if allow_update: self.num_docs += 1 self.num_pos += sum(itervalues(counter)) self.num_nnz += len(result) # increase document count for each unique token that appeared in the document dfs = self.dfs for tokenid in iterkeys(result): dfs[tokenid] = dfs.get(tokenid, 0) + 1 # return tokenids, in ascending id order result = sorted(iteritems(result)) if return_missing: return result, missing else: return result
流れとしては、Dictionary
→ add_documents
→ doc2bow
です。
コメントにも書いてあるように doc2bow
がここのメインです。
ここでは、どの単語が何回登場するのか計算するのが目的です。
そのために、単語とIDのマッピングなどの処理があります(self.token2id
)
単語の登場回数はself.dfs
(document frequencies)に保存されます。
doc2bow
はread onlyでも使用できるようにallow_update
のオプションがあります。
いろいろ書いてありますが、下記で登場回数を更新していることがわかると思います。
for tokenid in iterkeys(result): dfs[tokenid] = dfs.get(tokenid, 0) + 1
チュートリアルに戻ります
new_doc = "Human computer interaction" # 新しい文書 new_vec = dictionary.doc2bow(new_doc.lower().split()) # ベクトル化 corpus = [dictionary.doc2bow(text) for text in texts] # コーパス生成 corpora.MmCorpus.serialize('/tmp/deerwester.mm', corpus) # ディスクに保存
新しい文書を与え、doc2bowでベクトル化できます。 また、Matrix Market形式でコーパスをディスクに保存します。
2. コーパスを元にトピックモデルを生成
コーパスを生成できました。 これを元にTF-IDFを算出するためのモデルを生成し、LSIやLDAのモデルを生成する流れを見ていきます。 ディスクに保存したコーパスを読み込み、TF-IDFのモデルを生成します。
corpus = corpora.MmCorpus('/tmp/deerwester.mm') # コーパス読み込み tfidf = models.TfidfModel(corpus) # tf idfのモデル生成
TF-IDFを簡単に紹介すれば、出現頻度の多い単語は重要視して、異なる文章間に何度も出現する単語を重要視しない的な重み付けです。
TfidfModel gensim/tfidfmodel.py at develop · piskvorky/gensim · GitHub の中を見ていきます。
idfの計算はdf2idfの中で行われてます。
def df2idf(docfreq, totaldocs, log_base=2.0, add=0.0): """ Compute default inverse-document-frequency for a term with document frequency `doc_freq`:: idf = add + log(totaldocs / doc_freq) """ return add + math.log(1.0 * totaldocs / docfreq, log_base) def precompute_idfs(wglobal, dfs, total_docs): """Precompute the inverse document frequency mapping for all terms.""" # not strictly necessary and could be computed on the fly in TfidfModel__getitem__. # this method is here just to speed things up a little. return dict((termid, wglobal(df, total_docs)) for termid, df in iteritems(dfs))
チュートリアルに戻ります。
doc_bow = [(0, 1), (2, 1)] # ベクトル化した文章を新たに定義します print(tfidf[doc_bow]) # モデルからTF-IDFを算出します corpus_tfidf = tfidf[corpus] for doc in corpus_tfidf: print(doc)
モデルを元に文書ベクトルのTF-IDFを求めています。 コーパスからTF-IDFを計算できました。 それを元にLSIやLDAなどのトピックモデルを求めることができます。
lsi = models.LsiModel(corpus_tfidf, id2word=dictionary, num_topics=2) # LSIモデル生成 corpus_lsi = lsi[corpus_tfidf] # bow->tfidf->fold-in-lsi lsi.print_topics(2) for doc in corpus_lsi: # bow->tfidf と tfidf->lsi の変換はここで行われてます print(doc)
文章量が極小なので、出力されたトピックは意味不明ですね。
gensim/lsimodel.py at develop · piskvorky/gensim · GitHub LSIモデルの中を少し見ていきます。(とほほ)
class Projection(utils.SaveLoad): def __init__(self, m, k, docs=None, use_svdlibc=False, power_iters=P2_EXTRA_ITERS, extra_dims=P2_EXTRA_DIMS): """ Construct the (U, S) projection from a corpus `docs`. The projection can be later updated by merging it with another Projection via `self.merge()`. This is the class taking care of the 'core math'; interfacing with corpora, splitting large corpora into chunks and merging them etc. is done through the higher-level `LsiModel` class. """ self.m, self.k = m, k self.power_iters = power_iters self.extra_dims = extra_dims if docs is not None: # base case decomposition: given a job `docs`, compute its decomposition, # *in-core*. if not use_svdlibc: u, s = stochastic_svd( docs, k, chunksize=sys.maxsize, num_terms=m, power_iters=self.power_iters, extra_dims=self.extra_dims) else: try: import sparsesvd except ImportError: raise ImportError("`sparsesvd` module requested but not found; run `easy_install sparsesvd`") logger.info("computing sparse SVD of %s matrix", str(docs.shape)) if not scipy.sparse.issparse(docs): docs = matutils.corpus2csc(docs) ut, s, vt = sparsesvd.sparsesvd(docs, k + 30) # ask for extra factors, because for some reason SVDLIBC sometimes returns fewer factors than requested u = ut.T del ut, vt k = clip_spectrum(s**2, self.k) self.u = u[:, :k].copy() self.s = s[:k].copy() else: self.u, self.s = None, None (省略) class LsiModel(interfaces.TransformationABC): """ Objects of this class allow building and maintaining a model for Latent Semantic Indexing (also known as Latent Semantic Analysis). The main methods are: 1. constructor, which initializes the projection into latent topics space, 2. the ``[]`` method, which returns representation of any input document in the latent space, 3. `add_documents()` for incrementally updating the model with new documents. The left singular vectors are stored in `lsi.projection.u`, singular values in `lsi.projection.s`. Right singular vectors can be reconstructed from the output of `lsi[training_corpus]`, if needed. See also FAQ [2]_. Model persistency is achieved via its load/save methods. .. [2] https://github.com/piskvorky/gensim/wiki/Recipes-&-FAQ#q4-how-do-you-output-the-u-s-vt-matrices-of-lsi """ def __init__(self, corpus=None, num_topics=200, id2word=None, chunksize=20000, decay=1.0, distributed=False, onepass=True, power_iters=P2_EXTRA_ITERS, extra_samples=P2_EXTRA_DIMS): """ `num_topics` is the number of requested factors (latent dimensions). After the model has been trained, you can estimate topics for an arbitrary, unseen document, using the ``topics = self[document]`` dictionary notation. You can also add new training documents, with ``self.add_documents``, so that training can be stopped and resumed at any time, and the LSI transformation is available at any point. If you specify a `corpus`, it will be used to train the model. See the method `add_documents` for a description of the `chunksize` and `decay` parameters. Turn `onepass` off to force a multi-pass stochastic algorithm. `power_iters` and `extra_samples` affect the accuracy of the stochastic multi-pass algorithm, which is used either internally (`onepass=True`) or as the front-end algorithm (`onepass=False`). Increasing the number of power iterations improves accuracy, but lowers performance. See [3]_ for some hard numbers. Turn on `distributed` to enable distributed computing. Example: >>> lsi = LsiModel(corpus, num_topics=10) >>> print(lsi[doc_tfidf]) # project some document into LSI space >>> lsi.add_documents(corpus2) # update LSI on additional documents >>> print(lsi[doc_tfidf]) .. [3] http://nlp.fi.muni.cz/~xrehurek/nips/rehurek_nips.pdf """ self.id2word = id2word self.num_topics = int(num_topics) self.chunksize = int(chunksize) self.decay = float(decay) if distributed: if not onepass: logger.warning("forcing the one-pass algorithm for distributed LSA") onepass = True self.onepass = onepass self.extra_samples, self.power_iters = extra_samples, power_iters if corpus is None and self.id2word is None: raise ValueError('at least one of corpus/id2word must be specified, to establish input space dimensionality') if self.id2word is None: logger.warning("no word id mapping provided; initializing from corpus, assuming identity") self.id2word = utils.dict_from_corpus(corpus) self.num_terms = len(self.id2word) else: self.num_terms = 1 + max([-1] + self.id2word.keys()) self.docs_processed = 0 self.projection = Projection(self.num_terms, self.num_topics, power_iters=self.power_iters, extra_dims=self.extra_samples) (省略) def stochastic_svd(corpus, rank, num_terms, chunksize=20000, extra_dims=None, power_iters=0, dtype=numpy.float64, eps=1e-6): """ Run truncated Singular Value Decomposition (SVD) on a sparse input. Return (U, S): the left singular vectors and the singular values of the input data stream `corpus` [4]_. The corpus may be larger than RAM (iterator of vectors). This may return less than the requested number of top `rank` factors, in case the input itself is of lower rank. The `extra_dims` (oversampling) and especially `power_iters` (power iterations) parameters affect accuracy of the decomposition. This algorithm uses `2+power_iters` passes over the input data. In case you can only afford a single pass, set `onepass=True` in :class:`LsiModel` and avoid using this function directly. The decomposition algorithm is based on **Halko, Martinsson, Tropp. Finding structure with randomness, 2009.** .. [4] If `corpus` is a scipy.sparse matrix instead, it is assumed the whole corpus fits into core memory and a different (more efficient) code path is chosen. """ (省略)
流れは、LsiModel
→ Projection
→ stochastic_svd
となります。
LsiModelの中では規模の大きいデータを処理する分散処理の仕組みが用意されています。ふむ。
Projectionでは特異値分解にstochastic_svdを使用するか、sparsesvdを使用するか決定できます。
また、文章を後に追加した時に結合するための仕組みもあります。
stochastic_svdで特異値分解を行っています。ここで次元圧縮してLSIを実現しているのですね。
3. モデルを利用して文書の類似度を算出
LSIモデルの中で新たな文章のベクトルを求めます。
doc = "Human computer interaction" vec_bow = dictionary.doc2bow(doc.lower().split()) vec_lsi = lsi[vec_bow] print(vec_lsi)
おわりに
gensimソースコードリーディングは以上となります。
gensimソースコードの上辺だけ追いましたが、ブラックボックスであったライブラリの一片が見えて楽しかったです。
みなさまも楽しんでいただけたら幸いです。
Web企業のログ基盤まわりをまとめてみた
はじめに
- Web企業のログ解析基盤系資料をまとめてみました
- SlideShare, Speaker Deck, 企業技術ブロクをもとに調査しました
- 2014年以降の資料に限定(たぶん)
- 自分用メモ
- 勢いで調べたので、結構もれてると思う
メルカリ
CyberAgent
Apache SparkでカスタムStreamingする
はじめに
Spark, SQL on Hadoop etc. Advent Calendar 2014 - Qiita 3日目の記事です。
SparkでカスタムStreamingする方法を紹介します。
TwitterやFlumeなどのSpark Streamingの活用例が下記にあります。
spark/examples/src/main/scala/org/apache/spark/examples/streaming at master · apache/spark · GitHub
spark/external at master · apache/spark · GitHub
これらは、いろいろ利用できそうですね。
一方で、オリジナルのStreaming処理を行いたい場合には、
Sparkが提供するReceiverクラスを拡張する必要があります。
この記事では、Receiverクラスを拡張して、
カスタムしたStreaming処理を行う方法について紹介します。
ざっくりとしたSparkの説明
Apache Sparkについて秒速で振り返ります
- はやい
インメモリだからHadoopのMapReduceより、はやい
- 使いやすい
Java、Scala、Pythonで書ける
並行処理を実現するための便利APIがたくさんある
- すごい
機会学習、ストリーミング処理、SQLライクな処理、グラフなんでもできる
もうなんかすごい、プラットフォームだこれは
次に、Spark Streamingについて簡単に見ていきます。
ざっくりとしたSpark Streamingの説明
Streaming処理で何ができるのか
ドカドカ流れてくるログデータ等をバッチで一括処理するのとは別に、
より短時間の処理を実現できます。
- 使いやすい
Spark Streamingでは、もちろんSparkのAPIが利用できるので、いろいろ便利です
- 耐障害性
耐障害性がある。今度はこの辺も検証してみたいです
言葉だけだと、イメージがつかめないのでコードを見ていきたいと思います。
Twitter Streaming概要
Twitter Streamingのサンプルコードを見て、Spark Streamingの雰囲気をつかみます。
spark/TwitterPopularTags.scala at master · apache/spark · GitHubから抜粋します
val sparkConf = new SparkConf().setAppName("TwitterPopularTags") val ssc = new StreamingContext(sparkConf, Seconds(2)) val stream = TwitterUtils.createStream(ssc, None, filters) val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)) .map{case (topic, count) => (count, topic)} .transform(_.sortByKey(false)) // Print popular hashtags topCounts60.foreachRDD(rdd => { val topList = rdd.take(10) println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} })
1分間の頻出ハッシュタグを集計するコードです。
ストリーム処理するためのAPI(TwitterUtils.createStream)を起点に、
空白で文字列を分割し、#で始まる単語を抽出し、
1分間あたりの単語数を集計しています。
直感的で分かりやすいですね。
Sparkが事前に用意しているAPIは、
Flume、Kafka、MQTT、Twitter、ZeroMQです。
spark/external at master · apache/spark · GitHub
よって、自分たちの目的にあわせたStreamin処理を行うには、
別の手段が必要になります。
オリジナルStreaming
ここから本題です。
いきなりですが、オリジナルのStreaming処理を行うには、Receiverクラスを拡張します。
ということで、Receiverクラスを見てみます。
spark/Receiver.scala at master · apache/spark · GitHub
仕様がいろいろ書かれてます。
重要なのはonStartメソッドとonStopメソッドです。これらをオーバーライドして、オリジナルのReceiverを作成します。
onStartメソッド
- データを受け取るために、スレッドとかのリソースはここで初期化する
- Spark上で処理できるように、受け取ったデータはstoreメソッドに渡す
- Non Blockingである必要がある
しかし、ここの公式ドキュメントはNon Blockingでないんですよね。
Spark Streaming Custom Receivers - Spark 1.1.1 Documentation
onStopメソッド
- Receiverが終了したときに呼ばれる
- 終了処理を書く
Sparkのサンプルは全体的にonStop処理が軽視されているような気がします。
そういうのって良くないですよね。でもすいません、今回は範囲外とさせてください。
今回やりたいこと
telnetなどのクライアントでSpark Streamingに接続し、
送信したテキストの出現回数を集計したいと思います。
Non BlockingはNettyで実現します。むしろNettyをやりたかった。
登場人物
- PrintReceiver : ネットワーク経由でテキストを受け取る。Non BlockingはNettyで実現する
- PrintUtils : Receiverのためのインターフェース的な役割です
- TestPrintStreaming : 実行クラス
PrintReceiver
import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream._ import org.apache.spark.storage._ import org.apache.spark.Logging import org.apache.spark.streaming.receiver.Receiver import java.net.ServerSocket import java.io.BufferedReader import java.io.InputStreamReader import io.netty.bootstrap.ServerBootstrap import io.netty.buffer.ByteBuf import io.netty.channel._ import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.NioServerSocketChannel import io.netty.handler.logging._ class PrintInputDStream(@transient ssc_ : StreamingContext, storageLevel: StorageLevel) extends ReceiverInputDStream[String](ssc_) { override def getReceiver(): Receiver[String] = { new PrintReceiver(storageLevel) } } class PrintReceiver(storageLevel: StorageLevel) extends Receiver[String](storageLevel) with Logging { def onStart() { // メインはNettyコード val bossGroup = new NioEventLoopGroup(1); val workerGroup = new NioEventLoopGroup(); try { val bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(classOf[NioServerSocketChannel]) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer[SocketChannel] { override def initChannel(socketChannel: SocketChannel) { val pipeline = socketChannel.pipeline pipeline.addLast(new ChannelHandlerAdapter { override def channelRead(ctx: ChannelHandlerContext, msg: Object) { val buf = msg.asInstanceOf[ByteBuf] val str = new StringBuilder while (buf.isReadable) { // 文字を文字列にする str.append(buf.readByte.asInstanceOf[Char]); } store(str.toString.trim) // 受け取った文字列をSparkに渡す } }) } }) val f = bootstrap.bind(60000).sync(); // ポート番号 f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } def onStop() { } }
PrintUtils
import java.net.InetSocketAddress import org.apache.spark.annotation.Experimental import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} import org.apache.spark.streaming.dstream.ReceiverInputDStream object PrintUtils { def createStream (ssc: StreamingContext, storageLevel: StorageLevel ): ReceiverInputDStream[String] = { new PrintInputDStream(ssc, storageLevel) } }
TestPrintStreaming
import org.apache.spark.SparkConf import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.Seconds import org.apache.spark.storage.StorageLevel import org.apache.log4j.Logger import org.apache.log4j.Level import main.scala.spark.streaming.print.PrintUtils object TestPrintStreaming { def main(args: Array[String]) { Logger.getLogger("org").setLevel(Level.WARN) val conf = new SparkConf().setMaster("local[2]").setAppName("PrintStreaming") val ssc = new StreamingContext(conf, Seconds(1)) val nc = PrintUtils.createStream(ssc, StorageLevel.MEMORY_AND_DISK_SER_2) val counts = nc.countByValueAndWindow(Seconds(60 * 1), Seconds(1)) counts.print ssc.checkpoint("checkpoint") ssc.start() } }
実行結果
------------------------------------------- Time: 1417525094000 ms ------------------------------------------- (hello,6) (hellohellohello,3) (hellohellohellohellohellohello,1) ------------------------------------------- Time: 1417525095000 ms ------------------------------------------- (hello,6) (hellohellohello,3) (hellohellohellohellohellohello,1) ------------------------------------------- Time: 1417525096000 ms ------------------------------------------- (hello,6) (hellohellohello,3) (hellohellohellohellohellohello,1) 14/12/02 21:58:16 WARN storage.BlockManager: Block input-0-1417525096000 already exists on this machine; not re-adding it
まとめ
とまあ、こんな感じでカスタムStreamingしてみました。
いろいろ世界が広がりそうで、楽しいです。
簡単にNon Blockingを実現するためにNettyを用いましたが、
Nettyの調査に一番時間を要しちゃいました。
あと、ActorReceiverなんてものあります。
spark/ActorReceiver.scala at master · apache/spark · GitHub