Driver容错安全性是什么

本篇内容主要讲解“Driver容错安全性是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Driver容错安全性是什么”吧!

从数据层面,ReceivedBlockTracker为整个Spark Streaming应用程序记录元数据信息。

从调度层面,DStreamGraph和JobGenerator是Spark Streaming调度的核心,记录当前调度到哪一进度,和业务有关。

ReceivedBlockTracker在接收到元数据信息后调用addBlock方法,先写入磁盘中,然后在写入内存中。

Driver容错安全性是什么  driver 第1张Driver容错安全性是什么  driver 第2张

根据batchTime分配属于当前BatchDuration要处理的数据到timToAllocatedBlocks数据结构中。

Driver容错安全性是什么  driver 第3张

Time类的是一个case class,记录时间,重载了操作符,隐式转换,值得借鉴。

case class Time(private val millis: Long) {
  def milliseconds: Long = millis
  def < (that: Time): Boolean = (this.millis < that.millis)
  def <= (that: Time): Boolean = (this.millis <= that.millis)
  def > (that: Time): Boolean = (this.millis > that.millis)
  def >= (that: Time): Boolean = (this.millis >= that.millis)
  def + (that: Duration): Time = new Time(millis + that.milliseconds)
  def - (that: Time): Duration = new Duration(millis - that.millis)
  def - (that: Duration): Time = new Time(millis - that.milliseconds)
  // Java-friendlier versions of the above.
  def less(that: Time): Boolean = this < that
  def lessEq(that: Time): Boolean = this <= that
  def greater(that: Time): Boolean = this > that
  def greaterEq(that: Time): Boolean = this >= that
  def plus(that: Duration): Time = this + that
  def minus(that: Time): Duration = this - that
  def minus(that: Duration): Time = this - that
  def floor(that: Duration): Time = {
    val t = that.milliseconds
    new Time((this.millis / t) * t)
  }
  def floor(that: Duration, zeroTime: Time): Time = {
    val t = that.milliseconds
    new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
  }
  def isMultipleOf(that: Duration): Boolean =
    (this.millis % that.milliseconds == 0)
  def min(that: Time): Time = if (this < that) this else that
  def max(that: Time): Time = if (this > that) this else that
  def until(that: Time, interval: Duration): Seq[Time] = {
    (this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_))
  }
  def to(that: Time, interval: Duration): Seq[Time] = {
    (this.milliseconds) to (that.milliseconds) by (interval.milliseconds) map (new Time(_))
  }
  override def toString: String = (millis.toString + " ms")
}
object Time {
  implicit val ordering = Ordering.by((time: Time) => time.millis)
}

跟踪Time对象,ReceiverTracker的allocateBlocksToBatch方法中的入参batchTime是被JobGenerator的generateJobs方法调用的。

Driver容错安全性是什么  driver 第4张

JobGenerator的generateJobs方法是被定时器发送GenerateJobs消息调用的。

Driver容错安全性是什么  driver 第5张Driver容错安全性是什么  driver 第6张Driver容错安全性是什么  driver 第7张

GenerateJobs中的时间参数就是nextTime,而nextTime+=period,这个period就是ssc.graph.batchDuration.milliseconds。

Driver容错安全性是什么  driver 第8张

nextTime的初始值是在start方法中传入的startTime赋值的,即RecurringTimer的getStartTime方法的返回值,是当前时间period的(整数倍+1)。

Driver容错安全性是什么  driver 第9张Driver容错安全性是什么  driver 第10张

Period这个值是我们调用new StreamingContext来构造StreamingContext时传入的Duration值。

Driver容错安全性是什么  driver 第11张Driver容错安全性是什么  driver 第12张

ReceivedBlockTracker会清除过期的元数据信息,从HashMap中移除,也是先写入磁盘,然后在写入内存。

Driver容错安全性是什么  driver 第13张

元数据的生成,消费和销毁都有WAL,所以失败时就可以从日志中恢复。从源码分析中得出只有设置了checkpoint目录,才进行WAL机制。

Driver容错安全性是什么  driver 第14张

对传入的checkpoint目录来创建日志目录进行WAL。

Driver容错安全性是什么  driver 第15张

这里是在checkpoint目录下创建文件夹名为receivedBlockMetadata的文件夹来保存WAL记录的数据。

Driver容错安全性是什么  driver 第16张

Driver容错安全性是什么  driver 第17张

把当前的DStream和JobGenerator的状态进行checkpoint,该方法是在generateJobs方法最后通过发送DoCheckpoint消息,来调用的。

Driver容错安全性是什么  driver 第18张Driver容错安全性是什么  driver 第19张Driver容错安全性是什么  driver 第20张

到此,相信大家对“Driver容错安全性是什么”有了更深的了解,不妨来实际操作一番吧!这里是蜗牛博客网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:niceseo99@gmail.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

评论

有免费节点资源,我们会通知你!加入纸飞机订阅群

×
天气预报查看日历分享网页手机扫码留言评论电报频道链接