Apache Flume and Spark Streaming
Introduction
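This post processes data flowing in from Flume with Spark Streaming.
The overall picture of what we want to achieve:
- Generate data through a netcat server
- Flume receives the data from the client and feeds it into Spark
- Spark Streaming aggregates the data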
Environment
- Scala IDE for Eclipse : 2.10.4
- flume-ng-sdk-1.3.1.jar
- spark-assembly-1.1.0-hadoop2.4.0.jar
- spark-streaming-flume_2.10-1.1.0.jar
- spark-streaming-flume-sink_2.10-1.1.0.jar
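For readers who would rather not add the jars by hand, the same dependencies could probably be declared in an sbt build. This is only a sketch: the post itself uses Scala IDE with the jars listed above, the project name is hypothetical, and reading 2.10.4 as the Scala version is an assumption.

```scala
// build.sbt (hypothetical; the post adds the jars manually in Scala IDE)
name := "flume-streaming"

// Assumption: 2.10.4 in the list above is the Scala version
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // Core Spark Streaming API (StreamingContext, Milliseconds, ...)
  "org.apache.spark" %% "spark-streaming"       % "1.1.0",
  // Flume integration (FlumeUtils)
  "org.apache.spark" %% "spark-streaming-flume" % "1.1.0"
)
```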
Starting Spark Streaming
The source code is based on the Spark sample code:
spark/FlumeEventCount.scala at master · apache/spark · GitHub
    package streaming

    import org.apache.spark.streaming.Milliseconds
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.flume.FlumeUtils
    import org.apache.spark.storage.StorageLevel
    import org.apache.log4j.Logger
    import org.apache.log4j.Level

    object FlumeStreaming {
      def main(args: Array[String]): Unit = {
        // Keep Spark's own logging quiet so the counts are easy to read
        Logger.getLogger("org").setLevel(Level.WARN)

        // One batch every 2 seconds
        val batchInterval = Milliseconds(2000)

        // local[2]: one thread for the receiver, one for processing
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumeEventCount")
        val ssc = new StreamingContext(sparkConf, batchInterval)

        // Listen on localhost:60000 for events pushed by Flume's Avro sink
        val stream = FlumeUtils.createStream(ssc, "localhost", 60000)

        // Print the number of events received in each batch
        stream.count().map(cnt => "Received " + cnt + " flume events.").print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
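The job above only counts events. To look at what each event contains, the body has to be decoded first, since Flume carries it as raw bytes. The helper below is a minimal sketch of that step in plain Scala; `FlumeBody` and `decode` are hypothetical names, and UTF-8 is an assumption about the data typed into the client.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object FlumeBody {
  // Decode an event body as UTF-8 text.
  // duplicate() is used so the caller's buffer position is left untouched.
  def decode(body: ByteBuffer): String = {
    val copy = body.duplicate()
    val bytes = new Array[Byte](copy.remaining())
    copy.get(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }

  def main(args: Array[String]): Unit = {
    val body = ByteBuffer.wrap("data1".getBytes(StandardCharsets.UTF_8))
    println(decode(body)) // prints data1
  }
}
```

Inside the streaming job this would presumably be applied as `stream.map(e => FlumeBody.decode(e.event.getBody)).print()`, where `e.event` is the Avro event wrapped by each received element.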
Starting Flume
- Download Flume and extract it to a suitable location
- Create a configuration file (example.conf)
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = localhost
    a1.sinks.k1.port = 60000

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
- Start Flume
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
Connect with telnet and send some data
    $ telnet localhost 44444
    Trying ::1...
    telnet: connect to address ::1: Connection refused
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    data1
    OK
    data1
    OK
On the Spark Streaming side, the two lines sent over telnet are counted in a single 2-second batch:

    -------------------------------------------
    Time: 1414284566000 ms
    -------------------------------------------
    Received 0 flume events.

    14/10/26 09:49:26 WARN BlockManager: Block input-0-1414284566000 already exists on this machine; not re-adding it
    -------------------------------------------
    Time: 1414284568000 ms
    -------------------------------------------
    Received 2 flume events.

    -------------------------------------------
    Time: 1414284570000 ms
    -------------------------------------------
    Received 0 flume events.
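Typing into telnet is fine for a quick check, but the same test can be scripted. The sketch below is one possible stand-in for the telnet session: it opens a plain TCP socket, sends each line, and reads back the one-line acknowledgement (the `OK` seen in the session above). `NetcatClient` and `sendLines` are hypothetical names, and the code assumes the server answers every line with exactly one reply line.

```scala
import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter, PrintWriter}
import java.net.Socket

object NetcatClient {
  // Send each line to host:port and collect the reply to each one.
  // A reply is None if the server closed the connection before answering.
  def sendLines(host: String, port: Int, lines: Seq[String]): List[Option[String]] = {
    val socket = new Socket(host, port)
    try {
      val out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream, "UTF-8"))
      val in = new BufferedReader(new InputStreamReader(socket.getInputStream, "UTF-8"))
      lines.toList.map { line =>
        // Terminate with "\n" explicitly instead of the platform line separator
        out.print(line + "\n")
        out.flush()
        Option(in.readLine())
      }
    } finally {
      socket.close()
    }
  }
}
```

With the Flume agent from this post running, `NetcatClient.sendLines("localhost", 44444, Seq("data1", "data1"))` would be the equivalent of the two lines typed into telnet.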