Apache SparkでTwitter Streaming
はじめに
Apache Sparkを利用してTwitter Streamingを取得する。
題材はampcampのHands-on Exercisesを活用する。
Hands-on Exercisesはシリーズ物でEC2で稼働させたり、クラスタやHDFSが必要みたいだけど、それらなしでがんばる。
前提条件
必要なもの
- Sparkのインストール
- Scala
- ダウンロードして適当なディレクトリで解凍、パスを通す
- TwitterのConsumer key (API key), Consumer secret (API secret), Access token, Access token secret
- ここで取得できますよ
- amplab/trainingが提供しているソースコード
- ダウンロードして適当なディレクトリで解凍する、以降ここが作業ディレクトリ
- Javaのバージョンは1.8.0_05だとうまく動かないので1.7.0_60へダウングレード
不必要なもの
export JAVA_HOME=`/System/Library/Frameworks/JavaVM.framework/Versions/A/Commands/java_home -v "1.7"` export SCALA_HOME=/usr/local/bin/scala-2.11.1 export PATH=${JAVA_HOME}/bin:${SCALA_HOME}/bin/:$PATH
ソースコード
- build.sbt
name := "Tutorial" scalaVersion := "2.10.0" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-streaming" % "0.9.0-incubating", "org.apache.spark" %% "spark-streaming-twitter" % "0.9.0-incubating" )
提供ファイルのScala Versionは2.10。
それを2.10.0に変更する。
- Tutorial.scala
import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.spark.streaming._ import org.apache.spark.streaming.twitter._ import org.apache.spark.streaming.StreamingContext._ import TutorialHelper._ object Tutorial { def main(args: Array[String]) { // Location of the Spark directory val sparkHome = "インストールしたSparkディレクトリを指定" // URL of the Spark cluster val sparkUrl = "local" // Location of the required JAR files val jarFile = "target/scala-2.10/tutorial_2.10-0.1-SNAPSHOT.jar" // HDFS directory for checkpointing // ローカルディレクトリに変更する val checkpointDir = "./hdfs/checkpoint/" // Configure Twitter credentials using twitter.txt TutorialHelper.configureTwitterCredentials() val ssc = new StreamingContext(sparkUrl, "Tutorial", Seconds(1), sparkHome, Seq(jarFile)) val tweets = TwitterUtils.createStream(ssc, None) val statuses = tweets.map(status => status.getText()) statuses.print() ssc.checkpoint(checkpointDir) ssc.start() } }
- twitter.txt
Twitter情報を埋める
consumerKey = consumerSecret = accessToken = accessTokenSecret =
実行
$ sbt/sbt package run
まとめ
とりあえずOK