为啥Spark 的Broadcast要用单例模式大数据应用

来源:互联网 / 作者:SKY / 2019-06-11 15:23 / 点击:
很多用Spark Streaming 的朋友应该使用过broadcast,大多数情况下广播变量都是以单例模式声明的有没有粉丝想过为什么?

很多用Spark Streaming 的朋友应该使用过broadcast,大多数情况下广播变量都是以单例模式声明的有没有粉丝想过为什么?浪尖在这里帮大家分析一下,有以下几个原因:

广播变量大多数情况下是不会变更的,使用单例模式可以减少spark streaming每次job生成执行,重复生成广播变量带来的开销。

单例模式也要做同步。这个对于很多新手来说可以不用考虑同步问题,原因很简单因为新手不会调整spark 程序task的调度模式,而默认采用FIFO的调度模式,基本不会产生并发问题。1).假如你配置了Fair调度模式,同时修改了Spark Streaming运行的并行执行的job数,默认为1,那么就要加上同步代码了。2).还有一个原因,在多输出流的情况下共享broadcast,同时配置了Fair调度模式,也会产生并发问题。

注意。有些时候比如广播配置文件,规则等需要变更broadcast,在使用fair的时候可以在foreachrdd里面使用局部变量作为广播,避免相互干扰。

先看例子,后面逐步揭晓内部机制。

1.例子

下面是一个双重检查式的broadcast变量的声明方式。

object WordBlacklist { 

 

  @volatile private var instance: Broadcast[Seq[String]] = null 

 

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { 

    if (instance == null) { 

      synchronized { 

        if (instance == null) { 

          val wordBlacklist = Seq("a""b""c"

          instance = sc.broadcast(wordBlacklist) 

        } 

      } 

    } 

    instance 

  } 

广播变量的使用方法如下:

val lines = ssc.socketTextStream(ip, port) 

    val words = lines.flatMap(_.split(" ")) 

    val wordCounts = words.map((_, 1)).reduceByKey(_ + _) 

    wordCounts.foreachRDD { (rdd: RDD[(String, Int)], timeTime) => 

      // Get or register the blacklist Broadcast 

      val blacklist = WordBlacklist.getInstance(rdd.sparkContext) 

      // Get or register the droppedWordsCounter Accumulator 

      val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) 

      // Use blacklist to drop words and use droppedWordsCounter to count them 

      val counts = rdd.filter { case (word, count) => 

        if (blacklist.value.contains(word)) { 

          droppedWordsCounter.add(count

          false 

        } else { 

          true 

        } 

      }.collect().mkString("["", ""]"

      val output = s"Counts at time $time $counts" 

      println(output

      println(s"Dropped ${droppedWordsCounter.value} word(s) totally"

      println(s"Appending to ${outputFile.getAbsolutePath}"

阅读延展

1
3