Spark 持久化
为什么要将变量定义成广播变量?
Spark中最重要的功能之一是跨操作在内存中持久化数据集。持久化一个RDD时,每个节点在内存中存储它计算的任何分区,并在该数据集(或从中派生的数据集)的其他操作中重构它们。这使得将来的操作要快得多(通常超过10倍)。缓存是迭代算法和快速交互使用的关键工具。
可以使用其上的persist()或cache()方法将RDD标记为持久的。第一次在操作中计算它时,它将保存在节点的内存中。Spark的缓存是容错的——如果一个RDD的任何分区丢失了,它将使用最初创建它的转换自动重新计算。
持久化的存储级别很多,常用的是MEMORY_ONLY、MEMORY_ONLY_SER、MEMORY_AND_DISK
如何选择它们?
Storage Level的选择是内存和CPU的权衡
- 内存多:MEMORY_ONLY (不进行序列化)
- CPU跟的上:MEMORY_ONLY_SER (进行了序列化,推介)
- 不建议写Disk
使用cache()和persist()进行持久化操作,它们都是lazy的,需要action才能触发,默认使用MEMORY_ONLY。
1 | scala> forRDD.cache |
结果可以在Web UI的Storage中查看
如果需要清除缓存,使用unpersist(),清除缓存数据是立即执行的
1 | scala> forRDD.unpersist() |
怎么修改存储级别?
1 | val forRDD = rdd.map(x => { |
StorageLevel是个object,需要的级别都可以从里面拿出来
考点:cache和persist有什么区别?
- cache调用的persist,persist调用的persist(storage level)
考点:序列化和非序列化有什么区别? - 序列化将对象转换成字节数组了,节省空间,占CPU
Removing Data
Spark自动监视每个节点上的缓存使用情况,并以最近最少使用(LRU)的方式删除旧的数据分区。如果想要手动删除一个RDD,而不是等待它从缓存中消失,那么可以使用RDD.unpersist()方法。