Apache FlumeとSpark Streaming

はじめに

Flumeから流れてきたデータをSpark Streamingする。
実現したいことのイメージ。

f:id:moyomot:20141026093724p:plain

  1. netcatサーバーでデータ生成
  2. Flumeはクライアントからデータを受け取り、Sparkに流し込む
  3. Spark Streamingでデータを集計

環境

  • Scala IDE for Ecipse : 2.10.4
  • flume-ng-sdk-1.3.1.jar
  • spark-assembly-1.1.0-hadoop2.4.0.jar
  • spark-streaming-flume_2.10-1.1.0.jar
  • spark-streaming-flume-sink_2.10-1.1.0.jar

f:id:moyomot:20141025134029p:plain

Spark Streamingを開始する

ソースコードはSparkのサンプルコードをもとに作成。
spark/FlumeEventCount.scala at master · apache/spark · GitHub

package streaming

import org.apache.spark.streaming.Milliseconds
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.Logger
import org.apache.log4j.Level

object FlumeStreaming {
  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.WARN)

    val batchInterval = Milliseconds(2000)
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumeEventCount")
    val ssc = new StreamingContext(sparkConf, batchInterval)
    val stream = FlumeUtils.createStream(ssc, "localhost", 60000)
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()
    ssc.start()
    ssc.awaitTermination()
        
  }
}

Flumeを起動する

  • ダウンロード&適当な場所で解凍

Download — Apache Flume

  • 設定ファイル作成
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 60000

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • Flume起動
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

telnetで接続し、データを流してみる

$ telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
data1
OK
data1
OK
-------------------------------------------
Time: 1414284566000 ms
-------------------------------------------
Received 0 flume events.

14/10/26 09:49:26 WARN BlockManager: Block input-0-1414284566000 already exists on this machine; not re-adding it
-------------------------------------------
Time: 1414284568000 ms
-------------------------------------------
Received 2 flume events.

-------------------------------------------
Time: 1414284570000 ms
-------------------------------------------
Received 0 flume events.

まとめ

  • データをFlumeからSparkに流し込み、Streaming集計することができた
  • Flume内ではNetcatをlistenモードで起動している
  • FlumeからSparkへのデータやりとりはAvroを利用している
  • AvroはMessagePackのようなデータのやりとりのためのプロトコル
  • Twitter StreamingとFlume Streamingのソースコード見て、いろいろできることがわかったので、今度はオリジナルのStreamingクラスを作成する