読者です 読者をやめる 読者になる 読者になる

EcilpseでSpark Streaming

Spark

はじめに

Scala IDE for Eclipseを利用して、Apache Sparkで、Twitter Streamingする。

バージョン

Scala IDE for Eclipseはこちらから2.10.4をダウンロード
Download Scala IDE for Eclipse - Scala IDE for Eclipse
f:id:moyomot:20140921195602p:plain

Sparkのダウンロードはこちらから1.1.0 for Hadoop 2.4をダウンロード
Downloads | Apache Spark
spark-assembly-1.1.0-hadoop2.4.0.jarを利用する
f:id:moyomot:20140921195610p:plain

Spark Streaming Twitterも別途ダウンロード
バージョンはApache Sparkにあわせる(1.1.0)
Maven Repository: org.apache.spark » spark-streaming-twitter_2.10 » 1.1.0

twitter4j 3.0.3はこちらからダウンロード
http://twitter4j.org/archive/
下記サイトに3.0.3を使用せよと指定している
Spark Streaming Programming Guide - Spark 1.1.0 Documentation

Twitter: Spark Streaming’s TwitterUtils uses Twitter4j 3.0.3 to get the public stream of tweets using Twitter’s Streaming API.

そろったライブラリをもとに、Scala IDEでプロジェクトを下記のように作成する
f:id:moyomot:20140921195607p:plain

つぎにTwitter Streaming用のソースコードを作成する

Twitter Streaming

package twitter

import org.apache.spark.SparkConf
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.log4j.Logger
import org.apache.log4j.Level

object TwitterStreaming {
  def main(args: Array[String]){
    Logger.getLogger("org").setLevel(Level.WARN)
    System.setProperty("twitter4j.oauth.consumerKey", "XXX")
    System.setProperty("twitter4j.oauth.consumerSecret", "XXX")
    System.setProperty("twitter4j.oauth.accessToken", "XXX")
    System.setProperty("twitter4j.oauth.accessTokenSecret", "XXX")
    val conf = new SparkConf().setMaster("local[2]").setAppName("TwitterStreaming")
    val ssc = new StreamingContext(conf, Seconds(1))
    
    val tweets = TwitterUtils.createStream(ssc, None)
    val statuses = tweets.map(status => status.getText())
//    statuses.print()

    val words = statuses.flatMap(status => status.split(" "))
    val hashtags = words.filter(word => word.startsWith("#"))
    val counts = hashtags.countByValueAndWindow(Seconds(60 * 5), Seconds(1))
                         .map { case(tag, count) => (count, tag) }
    counts.foreach(rdd => println(rdd.top(10).mkString("\n")))

    ssc.checkpoint("checkpoint")
    ssc.start()
    ssc.awaitTermination()
  }
}

Spark Streamingのイメージ(超簡易訳)

Spark Streamingは高度に抽象化したDStreamと呼ばれる離散的ストーリムを用意している。これでデータの連続したストリームを実現する。
http://spark.apache.org/docs/latest/img/streaming-flow.png

具体的にDStreamは、Spark特有のデータ構造RDDの連続した構造からなる。
http://spark.apache.org/docs/latest/img/streaming-dstream.png

Window Operationは、5分間の人気ハッシュタグランキングを計算時など、
一定間隔のストリーミングバッチに利用することができる。

http://spark.apache.org/docs/latest/img/streaming-dstream-window.png


Spark Streaming Programming Guide - Spark 1.1.0 Documentationから画像を引用