Apache SparkでカスタムStreamingする

はじめに

Spark, SQL on Hadoop etc. Advent Calendar 2014 - Qiita 3日目の記事です。
SparkでカスタムStreamingする方法を紹介します。

TwitterやFlumeなどのSpark Streamingの活用例が下記にあります。
spark/examples/src/main/scala/org/apache/spark/examples/streaming at master · apache/spark · GitHub
spark/external at master · apache/spark · GitHub
これらは、いろいろ利用できそうですね。

一方で、オリジナルのStreaming処理を行いたい場合には、
Sparkが提供するReceiverクラスを拡張する必要があります。
この記事では、Receiverクラスを拡張して、
カスタムしたStreaming処理を行う方法について紹介します。

ざっくりとしたSparkの説明

Apache Sparkについて秒速で振り返ります

  • はやい

インメモリだからHadoopMapReduceより、はやい

  • 使いやすい

JavaScalaPythonで書ける
並行処理を実現するための便利APIがたくさんある

  • すごい

機会学習、ストリーミング処理、SQLライクな処理、グラフなんでもできる
もうなんかすごい、プラットフォームだこれは

次に、Spark Streamingについて簡単に見ていきます。

ざっくりとしたSpark Streamingの説明

Streaming処理で何ができるのか
ドカドカ流れてくるログデータ等をバッチで一括処理するのとは別に、
より短時間の処理を実現できます。

  • 使いやすい

Spark Streamingでは、もちろんSparkのAPIが利用できるので、いろいろ便利です

  • 耐障害性

耐障害性がある。今度はこの辺も検証してみたいです

言葉だけだと、イメージがつかめないのでコードを見ていきたいと思います。

Twitter Streaming概要

Twitter Streamingのサンプルコードを見て、Spark Streamingの雰囲気をつかみます。
spark/TwitterPopularTags.scala at master · apache/spark · GitHubから抜粋します

val sparkConf = new SparkConf().setAppName("TwitterPopularTags")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val stream = TwitterUtils.createStream(ssc, None, filters)

val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
                 .map{case (topic, count) => (count, topic)}
                 .transform(_.sortByKey(false))
// Print popular hashtags
topCounts60.foreachRDD(rdd => {
  val topList = rdd.take(10)
  println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
  topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})

1分間の頻出ハッシュタグを集計するコードです。
ストリーム処理するためのAPI(TwitterUtils.createStream)を起点に、
空白で文字列を分割し、#で始まる単語を抽出し、
1分間あたりの単語数を集計しています。
直感的で分かりやすいですね。

Sparkが事前に用意しているAPIは、
Flume、Kafka、MQTT、Twitter、ZeroMQです。
spark/external at master · apache/spark · GitHub
よって、自分たちの目的にあわせたStreamin処理を行うには、
別の手段が必要になります。

オリジナルStreaming

ここから本題です。
いきなりですが、オリジナルのStreaming処理を行うには、Receiverクラスを拡張します。
ということで、Receiverクラスを見てみます。
spark/Receiver.scala at master · apache/spark · GitHub
仕様がいろいろ書かれてます。
重要なのはonStartメソッドとonStopメソッドです。これらをオーバーライドして、オリジナルのReceiverを作成します。

onStartメソッド
  1. データを受け取るために、スレッドとかのリソースはここで初期化する
  2. Spark上で処理できるように、受け取ったデータはstoreメソッドに渡す
  3. Non Blockingである必要がある

しかし、ここの公式ドキュメントはNon Blockingでないんですよね。
Spark Streaming Custom Receivers - Spark 1.1.1 Documentation

onStopメソッド
  1. Receiverが終了したときに呼ばれる
  2. 終了処理を書く

Sparkのサンプルは全体的にonStop処理が軽視されているような気がします。
そういうのって良くないですよね。でもすいません、今回は範囲外とさせてください。

今回やりたいこと

f:id:moyomot:20141202213135p:plain
telnetなどのクライアントでSpark Streamingに接続し、
送信したテキストの出現回数を集計したいと思います。
Non BlockingはNettyで実現します。むしろNettyをやりたかった。

登場人物
  1. PrintReceiver : ネットワーク経由でテキストを受け取る。Non BlockingはNettyで実現する
  2. PrintUtils : Receiverのためのインターフェース的な役割です
  3. TestPrintStreaming : 実行クラス
PrintReceiver
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage._
import org.apache.spark.Logging
import org.apache.spark.streaming.receiver.Receiver
import java.net.ServerSocket
import java.io.BufferedReader
import java.io.InputStreamReader
import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.ByteBuf
import io.netty.channel._
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.logging._

class PrintInputDStream(@transient ssc_ : StreamingContext, storageLevel: StorageLevel)
  extends ReceiverInputDStream[String](ssc_) {
  override def getReceiver(): Receiver[String] = {
    new PrintReceiver(storageLevel)
  }
}

class PrintReceiver(storageLevel: StorageLevel)
  extends Receiver[String](storageLevel) with Logging {
  def onStart() {
    // メインはNettyコード
    val bossGroup = new NioEventLoopGroup(1);
    val workerGroup = new NioEventLoopGroup();
    try {
      val bootstrap = new ServerBootstrap();
      bootstrap.group(bossGroup, workerGroup)
        .channel(classOf[NioServerSocketChannel])
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer[SocketChannel] {

          override def initChannel(socketChannel: SocketChannel) {
            val pipeline = socketChannel.pipeline
            pipeline.addLast(new ChannelHandlerAdapter {
              override def channelRead(ctx: ChannelHandlerContext, msg: Object) {
                val buf = msg.asInstanceOf[ByteBuf]
                val str = new StringBuilder
                while (buf.isReadable) { // 文字を文字列にする
                  str.append(buf.readByte.asInstanceOf[Char]);
                }
               store(str.toString.trim) // 受け取った文字列をSparkに渡す
              }
            })
          }
        })
      val f = bootstrap.bind(60000).sync(); // ポート番号
      f.channel().closeFuture().sync();
    } finally {
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }

  def onStop() {
  }
}
PrintUtils
import java.net.InetSocketAddress

import org.apache.spark.annotation.Experimental
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

object PrintUtils {
  def createStream (ssc: StreamingContext, storageLevel: StorageLevel
	): ReceiverInputDStream[String] = {
	  new PrintInputDStream(ssc, storageLevel)
  }
}
TestPrintStreaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.Logger
import org.apache.log4j.Level
import main.scala.spark.streaming.print.PrintUtils

object TestPrintStreaming {
  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.WARN)
    val conf = new SparkConf().setMaster("local[2]").setAppName("PrintStreaming")
    val ssc = new StreamingContext(conf, Seconds(1))
    val nc = PrintUtils.createStream(ssc, StorageLevel.MEMORY_AND_DISK_SER_2)
    val counts = nc.countByValueAndWindow(Seconds(60 * 1), Seconds(1))
    counts.print
    ssc.checkpoint("checkpoint")
    ssc.start()
  }
}
実行結果
-------------------------------------------
Time: 1417525094000 ms
-------------------------------------------
(hello,6)
(hellohellohello,3)
(hellohellohellohellohellohello,1)

-------------------------------------------
Time: 1417525095000 ms
-------------------------------------------
(hello,6)
(hellohellohello,3)
(hellohellohellohellohellohello,1)

-------------------------------------------
Time: 1417525096000 ms
-------------------------------------------
(hello,6)
(hellohellohello,3)
(hellohellohellohellohellohello,1)

14/12/02 21:58:16 WARN storage.BlockManager: Block input-0-1417525096000 already exists on this machine; not re-adding it

まとめ

とまあ、こんな感じでカスタムStreamingしてみました。
いろいろ世界が広がりそうで、楽しいです。
簡単にNon Blockingを実現するためにNettyを用いましたが、
Nettyの調査に一番時間を要しちゃいました。

あと、ActorReceiverなんてものあります。
spark/ActorReceiver.scala at master · apache/spark · GitHub