博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
60、Spark Streaming:缓存与持久化机制、Checkpoint机制
阅读量:5219 次
发布时间:2019-06-14

本文共 4067 字,大约阅读时间需要 13 分钟。

一、缓存与持久化机制

与RDD类似,Spark Streaming也可以让开发人员手动控制,将数据流中的数据持久化到内存中。对DStream调用persist()方法,就可以让Spark Streaming自动将该数据流中的所有产生的RDD,都持久化到内存中。如果要对一个DStream多次执行操作,那么,对DStream持久化是非常有用的。因为多次操作,可以共享使用内存中的一份缓存数据。对于基于窗口的操作,比如reduceByWindow、reduceByKeyAndWindow,以及基于状态的操作,比如updateStateByKey,默认就隐式开启了持久化机制。即Spark Streaming默认就会将上述操作产生的Dstream中的数据,缓存到内存中,不需要开发人员手动调用persist()方法。对于通过网络接收数据的输入流,比如socket、Kafka、Flume等,默认的持久化级别,是将数据复制一份,以便于容错。相当于是,用的是类似MEMORY_ONLY_SER_2。与RDD不同的是,默认的持久化级别,统一都是要序列化的。

二、Checkpoint机制

1、Checkpoint机制概述

每一个Spark Streaming应用,正常来说,都是要7 * 24小时运转的,这就是实时计算程序的特点。因为要持续不断的对数据进行计算。因此,对实时计算应用的要求,应该是必须要能够对与应用程序逻辑无关的失败,进行容错。如果要实现这个目标,Spark Streaming程序就必须将足够的信息checkpoint到容错的存储系统上,从而让它能够从失败中进行恢复。有两种数据需要被进行checkpoint:1、元数据checkpoint——将定义了流式计算逻辑的信息,保存到容错的存储系统上,比如HDFS。当运行Spark Streaming应用程序的Driver进程所在节点失败时,该信息可以用于进行恢复。元数据信息包括了:  1.1 配置信息——创建Spark Streaming应用程序的配置信息,比如SparkConf中的信息。  1.2 DStream的操作信息——定义了Spark Stream应用程序的计算逻辑的DStream操作信息。  1.3 未处理的batch信息——那些job正在排队,还没处理的batch信息。2、数据checkpoint——将实时计算过程中产生的RDD的数据保存到可靠的存储系统中。对于一些将多个batch的数据进行聚合的,有状态的transformation操作,这是非常有用的。在这种transformation操作中,生成的RDD是依赖于之前的batch的RDD的,这会导致随着时间的推移,RDD的依赖链条变得越来越长。要避免由于依赖链条越来越长,导致的一起变得越来越长的失败恢复时间,有状态的transformation操作执行过程中间产生的RDD,会定期地被checkpoint到可靠的存储系统上,比如HDFS。从而削减RDD的依赖链条,进而缩短失败恢复时,RDD的恢复时间。一句话概括,元数据checkpoint主要是为了从driver失败中进行恢复;而RDD checkpoint主要是为了,使用到有状态的transformation操作时,能够在其生产出的数据丢失时,进行快速的失败恢复。

2、何时启用Checkpoint机制?

1、使用了有状态的transformation操作——比如updateStateByKey,或者reduceByKeyAndWindow操作,被使用了,那么checkpoint目录要求是必须提供的,也就是必须开启checkpoint机制,从而进行周期性的RDD checkpoint。2、要保证可以从Driver失败中进行恢复——元数据checkpoint需要启用,来进行这种情况的恢复。要注意的是,并不是说,所有的Spark Streaming应用程序,都要启用checkpoint机制,如果即不强制要求从Driver失败中自动进行恢复,又没使用有状态的transformation操作,那么就不需要启用checkpoint。事实上,这么做反而是有助于提升性能的。如何启用Checkpoint机制?1、对于有状态的transformation操作,启用checkpoint机制,定期将其生产的RDD数据checkpoint,是比较简单的。可以通过配置一个容错的、可靠的文件系统(比如HDFS)的目录,来启用checkpoint机制,checkpoint数据就会写入该目录。使用StreamingContext的checkpoint()方法即可。然后,你就可以放心使用有状态的transformation操作了。2、如果为了要从Driver失败中进行恢复,那么启用checkpoint机制,是比较复杂的。需要改写Spark Streaming应用程序。当应用程序第一次启动的时候,需要创建一个新的StreamingContext,并且调用其start()方法,进行启动。当Driver从失败中恢复过来时,需要从checkpoint目录中记录的元数据中,恢复出来一个StreamingContext。

3、为Driver失败的恢复机制重写程序

###Java###JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {  @Override   public JavaStreamingContext create() {    JavaStreamingContext jssc = new JavaStreamingContext(...);      JavaDStream
lines = jssc.socketTextStream(...); jssc.checkpoint(checkpointDirectory); return jssc; }};JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);context.start();context.awaitTermination();
 
###scala####def functionToCreateContext(): StreamingContext = {    val ssc = new StreamingContext(...)      val lines = ssc.socketTextStream(...)     ssc.checkpoint(checkpointDirectory)       ssc}val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)context.start()context.awaitTermination()

4、配置spark-submit提交参数

按照上述方法,进行Spark Streaming应用程序的重写后,当第一次运行程序时,如果发现checkpoint目录不存在,那么就使用定义的函数来第一次创建一个StreamingContext,并将其元数据写入checkpoint目录;当从Driver失败中恢复过来时,发现checkpoint目录已经存在了,那么会使用该目录中的元数据创建一个StreamingContext。但是上面的重写应用程序的过程,只是实现Driver失败自动恢复的第一步。第二步是,必须确保Driver可以在失败时,自动被重启。要能够自动从Driver失败中恢复过来,运行Spark Streaming应用程序的集群,就必须监控Driver运行的过程,并且在它失败时将它重启。对于Spark自身的standalone模式,需要进行一些配置去supervise driver,在它失败时将其重启。首先,要在spark-submit中,添加--deploy-mode参数,默认其值为client,即在提交应用的机器上启动Driver;但是,要能够自动重启Driver,就必须将其值设置为cluster;此外,需要添加--supervise参数。使用上述第二步骤提交应用之后,就可以让driver在失败时自动被重启,并且通过checkpoint目录的元数据恢复StreamingContext。

5、Checkpoint的说明

将RDD checkpoint到可靠的存储系统上,会耗费很多性能。当RDD被checkpoint时,会导致这些batch的处理时间增加。因此,checkpoint的间隔,需要谨慎的设置。对于那些间隔很多的batch,比如1秒,如果还要执行checkpoint操作,则会大幅度削减吞吐量。而另外一方面,如果checkpoint操作执行的太不频繁,那就会导致RDD的lineage变长,又会有失败恢复时间过长的风险。对于那些要求checkpoint的有状态的transformation操作,默认的checkpoint间隔通常是batch间隔的数倍,至少是10秒。使用DStream的checkpoint()方法,可以设置这个DStream的checkpoint的间隔时长。通常来说,将checkpoint间隔设置为窗口操作的滑动间隔的5~10倍,是个不错的选择。

转载于:https://www.cnblogs.com/weiyiming007/p/11378564.html

你可能感兴趣的文章
应用软件学习心得之mapgis功能学习
查看>>
二、create-react-app自定义配置
查看>>
Android PullToRefreshExpandableListView的点击事件
查看>>
Python学习(一)
查看>>
关于Matchvs一些使用心得与建议
查看>>
Gson获取json串中的key-value
查看>>
创建spring boot项目
查看>>
Behave + Selenium(Python) 四
查看>>
系统的横向结构(AOP)
查看>>
linux常用命令
查看>>
有序链表的归并 分类: 链表 2015-06-...
查看>>
A Plug for UNIX 分类: POJ ...
查看>>
寒假作业01
查看>>
Linux常用命令
查看>>
正确适配苹果ATS审核要求的姿势
查看>>
NHibernate.3.0.Cookbook第四章第6节的翻译
查看>>
例1-1
查看>>
Java 8 新特性之 Stream&forEach&map&filter&limit&sorted&统计函数&Collectors&并行(parallel)程序(转)...
查看>>
Windows建立Cucumber和Ruby测试环境
查看>>
HBase中MVCC的实现机制及应用情况
查看>>