一.引言
hive 执行任务后目录下生成过多小文件,过多的小文件会占用 namenode 的内存,对于 HDFS 非常不友好,所以可以通过删除空文件或者合并小文件的方法进行优化。
二.删除空文件
可以看到有很多空的gz,blockSize=20。如果是空文件的话,blockSize占用会是0。
思路很简单,通过 listStatus 方法判断目标路径是文件还是文件夹,文件夹的话递归到下层文件,文件的话直接 getLen 获取大小,如果满足 blockSize,则进行删除 delete 操作。任务需求简单所以这里偷懒没有写递归的实现,有需要的小伙伴可以自己动手实现一下
通过设定 blockSize 参数,可以删除大小为 0 或者是 20 或者是其他大小的文件。
import org.apache.hadoop.fs.Path
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object DeleteBlockFile {
def main(args: Array[String]): Unit = {
Logger.getRootLogger.setLevel(Level.WARN)
val (argsList, argsMap) = ArgsParseUtil.parseArgs(args)
val conf = new SparkConf
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val baseDir = argsMap.getOrElse("input","input")
val blockSize = argsMap.getOrElse("blockSize","0").toInt
val spark = SparkSession
.builder
.config(conf)
.appName("Delete Block HDFS FIle")
.getOrCreate()
val baseDirPath = new Path(baseDir)
val fs = baseDirPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
// 遍历删除
fs.listStatus(baseDirPath).foreach(dir => {
// 判断是文件还是文件夹
if (dir.isDirectory) {
fs.listStatus(dir.getPath).foreach(file => {
if (file.getLen == blockSize && file.isFile) {
fs.delete(file.getPath, true)
println("Delete File ", file.getPath.toString)
}
})
} else if (dir.isFile) {
if (dir.getLen == blockSize) {
fs.delete(dir.getPath, true)
println("Delete File ", dir.getPath.toString)
}
} else {
println("Dir is nor File Or Directory")
}
})
}
三.合并小文件
删除空文件并不能解决占用 namenode 太多的问题,因为还有很多小文件有内容,作为原始数据无法删除该类小文件,所以采用新办法,合并小文件。
合并小文件后还有新的坑,我想要将合并后的小文件继续生成在当前目录,如果直接生成会得到 file already exists 的报错,如果想要覆盖目录,通过设置 sparkConf.set("spark.hadoop.validateOutputSpecs", "false") 参数得到的结果却是文件后的追加输出:
A是之前的内容,B是合并后追加输出的内容,不符合预期的目标,所以需要进行 -> 删除原有路径 -> 新建临时路径 ->再重命名临时路径到原有路径的繁琐操作,有简洁的方法欢迎大佬指导!除此之外这里新增了 st,end参数支持对一段时间内的全部文件遍历合并~
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.{SparkConf, SparkContext}
import java.io.File
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import scala.collection.mutable.ArrayBuffer
object MergeSmallFiles {
def main(args: Array[String]): Unit = {
val (listArgs, mapArgs) = ArgsParseUtil.parseArgs(args)
val local = mapArgs.getOrElse("local", "true").toBoolean
val input = mapArgs.getOrElse("input", "")
val output = mapArgs.getOrElse("output", "")
val appName = mapArgs.getOrElse("appname", "MergeSmallFile")
val numPartition = mapArgs.getOrElse("numPartition", "1").toInt
val conf = if (local) {
new SparkConf()
.setMaster("local")
.setAppName(appName)
} else {
new SparkConf().setAppName(appName)
}
val sc = new SparkContext(conf)
val baseDirPath = new Path(input)
val fs = baseDirPath.getFileSystem(sc.hadoopConfiguration)
val dateTimeFormat = DateTimeFormatter.ofPattern("yyyyMMdd")
val st = mapArgs.getOrElse("st", "20210101")
val end = mapArgs.getOrElse("end", "20210101")
var now = st
val allDateTime = new ArrayBuffer[String]()
while (now.toInt <= end.toInt) {
allDateTime.append(now)
now = LocalDate.parse(now, dateTimeFormat).plusDays(1).format(dateTimeFormat)
}
allDateTime.foreach(dt => {
val path = input + File.separator + "dt=" + dt
val outputPath = path + "_tmp"
val outputRDD = sc.textFile(path)
sc.setJobGroup(dt, path)
println("Input: ", path)
println("Output: ", outputPath)
println("RepartitionNum", numPartition)
// 输出到临时路径
outputRDD.repartition(numPartition).saveAsTextFile(outputPath, classOf[GzipCodec])
// 删除原有路径
fs.delete(new Path(path), true)
// 重命名会原始路径
fs.rename(new Path(outputPath), new Path(path))
})
}
}
完结撒花~