Apache SparkでTwitter Streaming

はじめに

Apache Sparkを利用してTwitter Streamingを取得する。
題材はampcampHands-on Exercisesを活用する。
Hands-on Exercisesはシリーズ物でEC2で稼働させたり、クラスタHDFSが必要みたいだけど、それらなしでがんばる。

前提条件

必要なもの

  • Sparkのインストール
  • Scala
    • ダウンロードして適当なディレクトリで解凍、パスを通す
  • TwitterのConsumer key (API key), Consumer secret (API secret), Access token, Access token secret
    • ここで取得できますよ
  • amplab/trainingが提供しているソースコード
    • ダウンロードして適当なディレクトリで解凍する、以降ここが作業ディレクトリ
  • Javaのバージョンは1.8.0_05だとうまく動かないので1.7.0_60へダウングレード

不必要なもの

JavaScalaのパスメモ

export JAVA_HOME=`/System/Library/Frameworks/JavaVM.framework/Versions/A/Commands/java_home -v "1.7"`
export SCALA_HOME=/usr/local/bin/scala-2.11.1
export PATH=${JAVA_HOME}/bin:${SCALA_HOME}/bin/:$PATH
ソースコード
  • build.sbt
name := "Tutorial"

scalaVersion := "2.10.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "0.9.0-incubating",
  "org.apache.spark" %% "spark-streaming-twitter" % "0.9.0-incubating"
)

提供ファイルのScala Versionは2.10。
それを2.10.0に変更する。

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import TutorialHelper._

object Tutorial {
  def main(args: Array[String]) {
    
    // Location of the Spark directory 
    val sparkHome = "インストールしたSparkディレクトリを指定"
    
    // URL of the Spark cluster
    val sparkUrl = "local"

    // Location of the required JAR files 
    val jarFile = "target/scala-2.10/tutorial_2.10-0.1-SNAPSHOT.jar"

    // HDFS directory for checkpointing
    // ローカルディレクトリに変更する
    val checkpointDir = "./hdfs/checkpoint/"

    // Configure Twitter credentials using twitter.txt
    TutorialHelper.configureTwitterCredentials()

    val ssc = new StreamingContext(sparkUrl, "Tutorial", Seconds(1), sparkHome, Seq(jarFile))
    val tweets = TwitterUtils.createStream(ssc, None)
    val statuses = tweets.map(status => status.getText())
    statuses.print()
    ssc.checkpoint(checkpointDir)
    ssc.start()
  }
}

Twitter情報を埋める

consumerKey = 
consumerSecret = 
accessToken = 
accessTokenSecret = 
実行
$ sbt/sbt package run
まとめ

とりあえずOK