
Spark Machine Learning · Real-Time Machine Learning

2023-12-02 (Saturday)


Spark Machine Learning

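1 Online Learning

The model keeps updating itself as new messages arrive, instead of being retrained from scratch each time as in offline (batch) training.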
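To make the idea concrete, here is a minimal, Spark-free sketch of online learning, assuming a linear regression model trained by stochastic gradient descent on squared error: each incoming (features, label) pair triggers one weight update, and past data is never revisited. `OnlineLinearRegression` and `OnlineLearningDemo` are hypothetical names used only for illustration. MLlib's own streaming models (for example `StreamingLinearRegressionWithSGD`) follow the same principle but update once per batch rather than once per example, and they are not what is shown here.

```scala
// Online learning sketch (hypothetical class): a linear model updated by one
// SGD step per example, instead of being retrained on the whole dataset.
final class OnlineLinearRegression(numFeatures: Int, stepSize: Double) {
  private val weights = Array.fill(numFeatures)(0.0)

  /** Prediction = dot product of the current weights and the features. */
  def predict(x: Array[Double]): Double =
    weights.indices.map(i => weights(i) * x(i)).sum

  /** One gradient step on the squared error of a single (x, y) example. */
  def update(x: Array[Double], y: Double): Unit = {
    require(x.length == numFeatures, "feature vector has the wrong length")
    val error = predict(x) - y // computed once, before any weight changes
    for (i <- weights.indices) weights(i) -= stepSize * error * x(i)
  }

  def currentWeights: Vector[Double] = weights.toVector
}

object OnlineLearningDemo {
  def main(args: Array[String]): Unit = {
    val model = new OnlineLinearRegression(numFeatures = 1, stepSize = 0.1)
    val random = new scala.util.Random(42)
    // Simulated stream of examples drawn from y = 2x, consumed one at a time
    for (_ <- 1 to 1000) {
      val x = random.nextDouble()
      model.update(Array(x), 2.0 * x)
    }
    println(model.currentWeights)
  }
}
```

With noise-free data from y = 2x the single weight moves toward 2.0; the step size of 0.1 is an arbitrary choice that trades convergence speed against stability.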

2 Spark Streaming

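  • Discretized stream (DStream): the incoming stream is cut into a sequence of small batches, each represented as an RDD
  • Input sources: Akka actors, message queues, Flume, Kafka, …

    http://spark.apache.org/docs/latest/streaming-programming-guide.html

  • Lineage: the set of transformations and actions applied to RDDs; Spark uses this record to recompute lost data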
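The DStream idea can be modeled without Spark: records arriving over time are cut into consecutive, fixed-length intervals, and each interval becomes one batch (in Spark, one RDD). The sketch below assumes every record carries a millisecond timestamp; `Event` and `MicroBatcher` are hypothetical names. Unlike Spark Streaming, which produces a batch for every interval even when it is empty, this toy version only emits intervals that actually contain records.

```scala
// A toy, Spark-free model of a discretized stream (DStream):
// timestamped records are grouped into fixed-length batch intervals.
final case class Event(timestampMs: Long, payload: String)

object MicroBatcher {

  /** Returns (batchStartMs, payloads) pairs in time order; empty intervals are skipped. */
  def discretize(events: Seq[Event], batchIntervalMs: Long): Seq[(Long, Seq[String])] = {
    require(batchIntervalMs > 0, "batch interval must be positive")
    events
      // Each record belongs to the interval [start, start + batchIntervalMs)
      .groupBy(e => (e.timestampMs / batchIntervalMs) * batchIntervalMs)
      .toSeq
      .sortBy(_._1)
      // Within a batch, keep records in timestamp order
      .map { case (start, batch) => start -> batch.sortBy(_.timestampMs).map(_.payload) }
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(Event(0, "a"), Event(999, "b"), Event(1000, "c"), Event(2500, "d"))
    discretize(events, batchIntervalMs = 1000).foreach { case (start, payloads) =>
      println(s"batch @ $start ms: ${payloads.mkString(", ")}")
    }
  }
}
```

For example, with a 1000 ms interval, records at 0 ms and 999 ms share the batch starting at 0, while a record at 1000 ms opens the next batch.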

3 MLlib + Streaming Application

3.0 build.sbt

The project depends on Spark MLlib and Spark Streaming:

```scala
name := "scala-spark-streaming-app"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.5.1"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.5.1"
```

To download dependencies through a mirror repository in China, configure ~/.sbt/repositories:

```
[repositories]
local
osc: http://maven.oschina.net/content/groups/public/
typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
sonatype-oss-releases
maven-central
sonatype-oss-snapshots
```

3.1 Producing Messages

```scala
import java.io.PrintWriter
import java.net.ServerSocket

import scala.util.Random

/**
 * Randomly generates "purchase" events and sends them over a network socket,
 * one "user,product,price" line per event.
 */
object StreamingProducer {

  def main(args: Array[String]): Unit = {

    val random = new Random()

    // Maximum number of events per second
    val MaxEvents = 6

    // Read the list of possible names (a single comma-separated line in names.csv on the classpath)
    val namesResource = this.getClass.getResourceAsStream("/names.csv")
    val names = scala.io.Source.fromInputStream(namesResource)
      .getLines()
      .toList
      .head
      .split(",")
      .toSeq

    // Generate a sequence of possible products
    val products = Seq(
      "iPhone Cover" -> 9.99,
      "Headphones" -> 5.49,
      "Samsung Galaxy Cover" -> 8.95,
      "iPad Cover" -> 7.49
    )

    /** Generate a number of random product events */
    def generateProductEvents(n: Int) = {
      (1 to n).map { _ =>
        val (product, price) = products(random.nextInt(products.size))
        val user = random.shuffle(names).head
        (user, product, price)
      }
    }

    // Create a network producer
    val listener = new ServerSocket(9999)
    println("Listening on port: 9999")

    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run(): Unit = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)

          while (true) {
            Thread.sleep(1000)
            val num = random.nextInt(MaxEvents)
            val productEvents = generateProductEvents(num)
            productEvents.foreach { event =>
              out.write(event.productIterator.mkString(","))
              out.write("\n") // one event per line, as socketTextStream expects
            }
            out.flush()
            println(s"Created $num events...")
          }
          socket.close() // never reached: the loop above runs until the process is killed
        }
      }.start()
    }
  }
}
```
Start the producer with sbt run and choose StreamingProducer when sbt lists the main classes:

```
sbt run
Multiple main classes detected, select one to run:

 [1] MonitoringStreamingModel
 [2] SimpleStreamingApp
 [3] SimpleStreamingModel
 [4] StreamingAnalyticsApp
 [5] StreamingModelProducer
 [6] StreamingProducer
 [7] StreamingStateApp

Enter number: 6
```
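Each event travels over the socket as one text line of the form user,product,price: the tuple's fields joined with commas, followed by a newline. A consumer therefore has to split each line back into fields. The sketch below shows an encode/decode pair for that format; `ProductEvent` and `ProductEventCodec` are hypothetical names, and it assumes names and product titles never contain commas (true for the product list above, and names.csv is itself split on commas).

```scala
import scala.util.Try

// One purchase event in the producer's wire format: "user,product,price".
final case class ProductEvent(user: String, product: String, price: Double)

object ProductEventCodec {

  /** Same format as the producer: fields joined with commas (the writer adds the newline). */
  def encode(e: ProductEvent): String =
    Seq(e.user, e.product, e.price).mkString(",")

  /** Parses one line; malformed lines yield None instead of throwing. */
  def decode(line: String): Option[ProductEvent] =
    line.split(",") match {
      case Array(user, product, price) =>
        Try(price.trim.toDouble).toOption.map(p => ProductEvent(user, product, p))
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    val line = encode(ProductEvent("Alice", "iPhone Cover", 9.99))
    println(line)
    println(decode(line))
  }
}
```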

3.2 Printing Messages

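```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Connects to the producer on localhost:9999 and prints
 * the first few lines of each 10-second batch.
 */
object SimpleStreamingApp {

  def main(args: Array[String]): Unit = {
    // local[2]: one thread for the socket receiver, at least one for processing
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    val stream = ssc.socketTextStream("localhost", 9999)

    // Here we simply print out the first few elements of each batch
    stream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```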
With the producer still running, start SimpleStreamingApp from a second terminal:

```
sbt run
Enter number: 2
```

3.3 Streaming Analytics

For the rest of the article, see: http://click.aliyun.com/m/8713/
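The full text of this section sits behind the link above. As a hedged, Spark-free illustration of per-batch analytics on the producer's output, the function below summarizes one batch of user,product,price lines: number of purchases, distinct users, total revenue and the best-selling product. `BatchStats` and `BatchAnalytics` are hypothetical and are not the article's code; in a Spark Streaming job, similar logic would run once per batch, for example inside `DStream.foreachRDD`.

```scala
import scala.util.Try

// Summary of one micro-batch of "user,product,price" lines (hypothetical type).
final case class BatchStats(
    purchases: Int,
    uniqueUsers: Int,
    totalRevenue: Double,
    mostPopular: Option[(String, Int)]) // (product, number of purchases)

object BatchAnalytics {

  /** Parses the lines of one batch, skipping malformed ones, and aggregates them. */
  def summarize(lines: Seq[String]): BatchStats = {
    val events: Seq[(String, String, Double)] = lines.flatMap { line =>
      line.split(",") match {
        case Array(user, product, price) =>
          Try(price.trim.toDouble).toOption.map(p => (user, product, p))
        case _ => None
      }
    }
    // Count purchases per product; ties are broken alphabetically for determinism
    val mostPopular = events
      .groupBy(_._2)
      .map { case (product, es) => product -> es.size }
      .toSeq
      .sortBy { case (product, count) => (-count, product) }
      .headOption
    BatchStats(
      purchases = events.size,
      uniqueUsers = events.map(_._1).distinct.size,
      totalRevenue = events.map(_._3).sum,
      mostPopular = mostPopular)
  }

  def main(args: Array[String]): Unit = {
    val batch = Seq("Alice,iPhone Cover,9.99", "Bob,Headphones,5.49", "Alice,Headphones,5.49")
    println(summarize(batch))
  }
}
```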
