Spark 开发原则
- 坐享其成
- 要省要拖
- 跳出单机思维
应用开发原则 :
- 坐享其成 : 利用 Spark SQL 优化
- 能省则省、能拖则拖 : 节省数据量 , 拖后 Shuffle
- 跳出单机思维 : 避免无谓的分布式遍历
坐享其成
设置好配置项,享受 Spark SQL 的性能优势,如钨丝计划、AQE、SQL functions
钨丝计划:
- Tungsten 自定义了紧凑的二进制格式的数据结构,避免了 Java 对象序列化与反序列化的计算开销
- Tungsten 利用 Java Unsafe API 开辟堆外(Off Heap Memory)内存来管理对象。对内存占用的估算更精确,不用反复垃圾回收
- Tungsten 用全阶段代码生成(Whol Stage Code Generation)取代火山迭代模型,减少虚函数调用和降低内存访问频率,还提升 CPU 利用率
AQE :
- 自动分区合并 : 自动检测小数据分区,并进行自动合并
- 数据倾斜 : 发现倾斜的数据分片,会自动加盐处理,并对另一张表进行复制
- Join 策略调整 : 动态调整 Join ,转换为 Broadcast Join
要省要拖
- 省 : 数据处理量,只有节省数据量 , 就节省计算负载,就有更快的处理速度
- 拖 : Shuffle ,当计算步骤越靠后,处理的数据量越少,Shuffle 越晚,落盘/分发就越少,就有更高的执行效率
实现步骤 :
- 尽量把节省数据扫描量和 数据处理量往前推
- 尽量消除 Shuffle,省去数据落盘与分发的开销
- 消除不了 Shuffle,就把 Shuffle 往后拖
val dates: List[String] = List("2020-01-01", "2020-01-02", "2020-01-03")
val rootPath: String = _
//读取日志文件,去重、并展开userInterestList
def createDF(rootPath: String, date: String): DataFrame = {
val path: String = rootPath + date
val df = spark.read.parquet(path)
.distinct
.withColumn("userInterest", explode(col("userInterestList")))
df
}
//提取字段、过滤,再次去重,把多天的结果用union合并
val distinctItems: DataFrame = dates.map {
case date: String =>
val df: DataFrame = createDF(rootPath, date)
.select("userId", "itemId", "userInterest", "accessFreq")
.filter("accessFreq in ('High', 'Medium'))")
.distinct
df
}.reduce(_ union _)
优化 :
- 减少数据访问量的操作 (filter、select) 往前推
- 引入 Shuffle 的 distinct 拖到最后面
val dates: List[String] = List("2020-01-01", "2020-01-02", "2020-01-03")
val rootPath: String = _
val filePaths: List[String] = dates.map(rootPath + _)
/**
一次性调度所有文件
先进行过滤和列剪枝
然后再展开userInterestList
最后统一去重
*/
val distinctItems = spark.read.parquet(filePaths: _*)
.filter("accessFreq in ('High', 'Medium'))")
.select("userId", "itemId", "userInterestList")
.withColumn("userInterest", explode(col("userInterestList")))
.select("userId", "itemId", "userInterest")
.distinct
跳出单机思维
在右表用 map ,在 map 内实例化 Util 类获取哈希算法,拼接 Join keys 进行哈希运算
import java.security.MessageDigest
class Util {
val md5: MessageDigest = MessageDigest.getInstance("MD5")
val sha256: MessageDigest = _ //其他哈希算法
}
val df: DataFrame = _
val ds: Dataset[Row] = df.map {
case row: Row =>
val util = new Util()
val s: String = row.getString(0) + row.getString(1) + row.getString(2)
val hashKey: String = util.md5.digest(s.getBytes).map("%02X".format(_)).mkString
(hashKey, row.getInt(3))
}
优化 :
- 实例化的动作放 map 外面
val ds: Dataset[Row] = df.mapPartitions(iterator => {
val util = new Util()
val res = iterator.map {
case row => {
val s: String = row.getString(0) + row.getString(1) + row.getString(2)
val hashKey: String = util.md5.digest(s.getBytes).map("%02X".format(_)).mkString
(hashKey, row.getInt(3))
}
}
res
})