Apache SparkでカスタムStreamingする
はじめに
Spark, SQL on Hadoop etc. Advent Calendar 2014 - Qiita 3日目の記事です。
SparkでカスタムStreamingする方法を紹介します。
TwitterやFlumeなどのSpark Streamingの活用例が下記にあります。
spark/examples/src/main/scala/org/apache/spark/examples/streaming at master · apache/spark · GitHub
spark/external at master · apache/spark · GitHub
これらは、いろいろ利用できそうですね。
一方で、オリジナルのStreaming処理を行いたい場合には、
Sparkが提供するReceiverクラスを拡張する必要があります。
この記事では、Receiverクラスを拡張して、
カスタムしたStreaming処理を行う方法について紹介します。
ざっくりとしたSparkの説明
Apache Sparkについて秒速で振り返ります
- はやい
インメモリだからHadoopのMapReduceより、はやい
- 使いやすい
Java、Scala、Pythonで書ける
並行処理を実現するための便利APIがたくさんある
- すごい
機会学習、ストリーミング処理、SQLライクな処理、グラフなんでもできる
もうなんかすごい、プラットフォームだこれは
次に、Spark Streamingについて簡単に見ていきます。
ざっくりとしたSpark Streamingの説明
Streaming処理で何ができるのか
ドカドカ流れてくるログデータ等をバッチで一括処理するのとは別に、
より短時間の処理を実現できます。
- 使いやすい
Spark Streamingでは、もちろんSparkのAPIが利用できるので、いろいろ便利です
- 耐障害性
耐障害性がある。今度はこの辺も検証してみたいです
言葉だけだと、イメージがつかめないのでコードを見ていきたいと思います。
Twitter Streaming概要
Twitter Streamingのサンプルコードを見て、Spark Streamingの雰囲気をつかみます。
spark/TwitterPopularTags.scala at master · apache/spark · GitHubから抜粋します
val sparkConf = new SparkConf().setAppName("TwitterPopularTags") val ssc = new StreamingContext(sparkConf, Seconds(2)) val stream = TwitterUtils.createStream(ssc, None, filters) val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)) .map{case (topic, count) => (count, topic)} .transform(_.sortByKey(false)) // Print popular hashtags topCounts60.foreachRDD(rdd => { val topList = rdd.take(10) println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} })
1分間の頻出ハッシュタグを集計するコードです。
ストリーム処理するためのAPI(TwitterUtils.createStream)を起点に、
空白で文字列を分割し、#で始まる単語を抽出し、
1分間あたりの単語数を集計しています。
直感的で分かりやすいですね。
Sparkが事前に用意しているAPIは、
Flume、Kafka、MQTT、Twitter、ZeroMQです。
spark/external at master · apache/spark · GitHub
よって、自分たちの目的にあわせたStreamin処理を行うには、
別の手段が必要になります。
オリジナルStreaming
ここから本題です。
いきなりですが、オリジナルのStreaming処理を行うには、Receiverクラスを拡張します。
ということで、Receiverクラスを見てみます。
spark/Receiver.scala at master · apache/spark · GitHub
仕様がいろいろ書かれてます。
重要なのはonStartメソッドとonStopメソッドです。これらをオーバーライドして、オリジナルのReceiverを作成します。
onStartメソッド
- データを受け取るために、スレッドとかのリソースはここで初期化する
- Spark上で処理できるように、受け取ったデータはstoreメソッドに渡す
- Non Blockingである必要がある
しかし、ここの公式ドキュメントはNon Blockingでないんですよね。
Spark Streaming Custom Receivers - Spark 1.1.1 Documentation
onStopメソッド
- Receiverが終了したときに呼ばれる
- 終了処理を書く
Sparkのサンプルは全体的にonStop処理が軽視されているような気がします。
そういうのって良くないですよね。でもすいません、今回は範囲外とさせてください。
今回やりたいこと
telnetなどのクライアントでSpark Streamingに接続し、
送信したテキストの出現回数を集計したいと思います。
Non BlockingはNettyで実現します。むしろNettyをやりたかった。
登場人物
- PrintReceiver : ネットワーク経由でテキストを受け取る。Non BlockingはNettyで実現する
- PrintUtils : Receiverのためのインターフェース的な役割です
- TestPrintStreaming : 実行クラス
PrintReceiver
import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream._ import org.apache.spark.storage._ import org.apache.spark.Logging import org.apache.spark.streaming.receiver.Receiver import java.net.ServerSocket import java.io.BufferedReader import java.io.InputStreamReader import io.netty.bootstrap.ServerBootstrap import io.netty.buffer.ByteBuf import io.netty.channel._ import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.NioServerSocketChannel import io.netty.handler.logging._ class PrintInputDStream(@transient ssc_ : StreamingContext, storageLevel: StorageLevel) extends ReceiverInputDStream[String](ssc_) { override def getReceiver(): Receiver[String] = { new PrintReceiver(storageLevel) } } class PrintReceiver(storageLevel: StorageLevel) extends Receiver[String](storageLevel) with Logging { def onStart() { // メインはNettyコード val bossGroup = new NioEventLoopGroup(1); val workerGroup = new NioEventLoopGroup(); try { val bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(classOf[NioServerSocketChannel]) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer[SocketChannel] { override def initChannel(socketChannel: SocketChannel) { val pipeline = socketChannel.pipeline pipeline.addLast(new ChannelHandlerAdapter { override def channelRead(ctx: ChannelHandlerContext, msg: Object) { val buf = msg.asInstanceOf[ByteBuf] val str = new StringBuilder while (buf.isReadable) { // 文字を文字列にする str.append(buf.readByte.asInstanceOf[Char]); } store(str.toString.trim) // 受け取った文字列をSparkに渡す } }) } }) val f = bootstrap.bind(60000).sync(); // ポート番号 f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } def onStop() { } }
PrintUtils
import java.net.InetSocketAddress import org.apache.spark.annotation.Experimental import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} import org.apache.spark.streaming.dstream.ReceiverInputDStream object PrintUtils { def createStream (ssc: StreamingContext, storageLevel: StorageLevel ): ReceiverInputDStream[String] = { new PrintInputDStream(ssc, storageLevel) } }
TestPrintStreaming
import org.apache.spark.SparkConf import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.Seconds import org.apache.spark.storage.StorageLevel import org.apache.log4j.Logger import org.apache.log4j.Level import main.scala.spark.streaming.print.PrintUtils object TestPrintStreaming { def main(args: Array[String]) { Logger.getLogger("org").setLevel(Level.WARN) val conf = new SparkConf().setMaster("local[2]").setAppName("PrintStreaming") val ssc = new StreamingContext(conf, Seconds(1)) val nc = PrintUtils.createStream(ssc, StorageLevel.MEMORY_AND_DISK_SER_2) val counts = nc.countByValueAndWindow(Seconds(60 * 1), Seconds(1)) counts.print ssc.checkpoint("checkpoint") ssc.start() } }
実行結果
------------------------------------------- Time: 1417525094000 ms ------------------------------------------- (hello,6) (hellohellohello,3) (hellohellohellohellohellohello,1) ------------------------------------------- Time: 1417525095000 ms ------------------------------------------- (hello,6) (hellohellohello,3) (hellohellohellohellohellohello,1) ------------------------------------------- Time: 1417525096000 ms ------------------------------------------- (hello,6) (hellohellohello,3) (hellohellohellohellohellohello,1) 14/12/02 21:58:16 WARN storage.BlockManager: Block input-0-1417525096000 already exists on this machine; not re-adding it
まとめ
とまあ、こんな感じでカスタムStreamingしてみました。
いろいろ世界が広がりそうで、楽しいです。
簡単にNon Blockingを実現するためにNettyを用いましたが、
Nettyの調査に一番時間を要しちゃいました。
あと、ActorReceiverなんてものあります。
spark/ActorReceiver.scala at master · apache/spark · GitHub