EcilpseでSpark Streaming
バージョン
Scala IDE for Eclipseはこちらから2.10.4をダウンロード
Download Scala IDE for Eclipse - Scala IDE for Eclipse
Sparkのダウンロードはこちらから1.1.0 for Hadoop 2.4をダウンロード
Downloads | Apache Spark
spark-assembly-1.1.0-hadoop2.4.0.jarを利用する
Spark Streaming Twitterも別途ダウンロード
バージョンはApache Sparkにあわせる(1.1.0)
Maven Repository: org.apache.spark » spark-streaming-twitter_2.10 » 1.1.0
twitter4j 3.0.3はこちらからダウンロード
http://twitter4j.org/archive/
下記サイトに3.0.3を使用せよと指定している
Spark Streaming Programming Guide - Spark 1.1.0 Documentation
Twitter: Spark Streaming’s TwitterUtils uses Twitter4j 3.0.3 to get the public stream of tweets using Twitter’s Streaming API.
Twitter Streaming
package twitter import org.apache.spark.SparkConf import org.apache.spark.streaming.twitter.TwitterUtils import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.Seconds import org.apache.log4j.Logger import org.apache.log4j.Level object TwitterStreaming { def main(args: Array[String]){ Logger.getLogger("org").setLevel(Level.WARN) System.setProperty("twitter4j.oauth.consumerKey", "XXX") System.setProperty("twitter4j.oauth.consumerSecret", "XXX") System.setProperty("twitter4j.oauth.accessToken", "XXX") System.setProperty("twitter4j.oauth.accessTokenSecret", "XXX") val conf = new SparkConf().setMaster("local[2]").setAppName("TwitterStreaming") val ssc = new StreamingContext(conf, Seconds(1)) val tweets = TwitterUtils.createStream(ssc, None) val statuses = tweets.map(status => status.getText()) // statuses.print() val words = statuses.flatMap(status => status.split(" ")) val hashtags = words.filter(word => word.startsWith("#")) val counts = hashtags.countByValueAndWindow(Seconds(60 * 5), Seconds(1)) .map { case(tag, count) => (count, tag) } counts.foreach(rdd => println(rdd.top(10).mkString("\n"))) ssc.checkpoint("checkpoint") ssc.start() ssc.awaitTermination() } }
Spark Streamingのイメージ(超簡易訳)
Spark Streamingは高度に抽象化したDStreamと呼ばれる離散的ストーリムを用意している。これでデータの連続したストリームを実現する。
具体的にDStreamは、Spark特有のデータ構造RDDの連続した構造からなる。
Window Operationは、5分間の人気ハッシュタグランキングを計算時など、
一定間隔のストリーミングバッチに利用することができる。
Spark Streaming Programming Guide - Spark 1.1.0 Documentationから画像を引用