时间:2021-07-01 10:21:17 帮助过:37人阅读
代码如下:
package com.dt.spark.streaming
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{StreamingContext, Duration}
/**
 * 使用SparkStreaming结合SparkSQL对日志进行分析。
 * 假设电商网站点击日志格式(简化)如下:
 * userid,itemId,clickTime
 * 需求:处理10分钟内item点击次数排序Top10,并且将商品名称显示出来。商品itemId与商品名称的对应关系存放在MySQL数据库中
 * Created by dinglq on 2016/5/4.
 */
object LogAnalyzerStreamingSQL {
  val WINDOW_LENGTH = new Duration(600 * 1000)
  val SLIDE_INTERVAL = new Duration(10 * 1000)
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("LogAnalyzerStreamingSQL").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    //从数据库中加载itemInfo表
    val itemInfoDF = sqlContext.read.format("jdbc").options(Map(
      "url"-> "jdbc:mysql://spark-master:3306/spark",
      "driver"->"com.mysql.jdbc.Driver",
      "dbtable"->"iteminfo",
      "user"->"root",
      "password"-> "vincent"
      )).load()
    itemInfoDF.registerTempTable("itemInfo")
    val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)
    val logLinesDStream = streamingContext.textFileStream("D:/logs_incoming")
    val accessLogsDStream = logLinesDStream.map(AccessLog.parseLogLine).cache()
    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)
    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.isEmpty()) {
        println("No logs received in this time interval")
      } else {
        accessLogs.toDF().registerTempTable("accessLogs")
        val sqlStr = "SELECT a.itemid,a.itemname,b.cnt FROM itemInfo a JOIN " +
          " (SELECT itemId,COUNT(*) cnt FROM accessLogs GROUP BY itemId) b " +
          " ON (a.itemid=b.itemId) ORDER BY cnt DESC LIMIT 10 "
        val topTenClickItemLast10Minus = sqlContext.sql(sqlStr)
        // Persist top ten table for this window to HDFS as parquet file
        topTenClickItemLast10Minus.show()
      }
    })
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
case class AccessLog(userId: String, itemId: String, clickTime: String) {
}
object AccessLog {
  def parseLogLine(log: String): AccessLog = {
    val logInfo = log.split(",")
    if (logInfo.length == 3) {
      AccessLog(logInfo(0),logInfo(1), logInfo(2))
    }
    else {
      AccessLog("0","0","0")
    }
  }
}MySQL中表的内容如下:
mysql> select * from spark.iteminfo; +--------+----------+ | itemid | itemname | +--------+----------+ | 001 | phone | | 002 | computer | | 003 | TV | +--------+----------+ 3 rows in set (0.00 sec)
在D创建目录logs_incoming
运行Spark Streaming 程序。
新建文件,内容如下:
0001,001,2016-05-04 22:10:20 0002,001,2016-05-04 22:10:21 0003,001,2016-05-04 22:10:22 0004,002,2016-05-04 22:10:23 0005,002,2016-05-04 22:10:24 0006,001,2016-05-04 22:10:25 0007,002,2016-05-04 22:10:26 0008,001,2016-05-04 22:10:27 0009,003,2016-05-04 22:10:28 0010,003,2016-05-04 22:10:29 0011,001,2016-05-04 22:10:30 0012,003,2016-05-04 22:10:31 0013,003,2016-05-04 22:10:32
将文件保存到目录logs_incoming 中,观察Spark程序的输出:
+------+--------+---+ |itemid|itemname|cnt| +------+--------+---+ | 001| phone| 6| | 003| TV| 4| | 002|computer| 3| +------+--------+---+
备注:
1、DT大数据梦工厂微信公众号DT_Spark 
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains
本文出自 “叮咚” 博客,请务必保留此出处http://lqding.blog.51cto.com/9123978/1770198
第97课:Spark Streaming 结合Spark SQL 案例
标签:spark streaming spark sql