Spark-HDFS 删除空文件 合并小文件

news/2024/5/20 0:49:28 标签: Spark, HDFS, 小文件

一.引言

hive 执行任务后目录下生成过多小文件,过多的小文件会占用 namenode 的内存,对于 HDFS 非常不友好,所以可以通过删除空文件或者合并小文件的方法进行优化。

二.删除空文件

可以看到有很多空的gz,blockSize=20。如果是空文件的话,blockSize占用会是0。

思路很简单,通过 listStatus 方法判断目标路径是文件还是文件夹,文件夹的话递归到下层文件,文件的话直接 getLen 获取大小,如果满足 blockSize,则进行删除 delete 操作。任务需求简单所以这里偷懒没有写递归的实现,有需要的小伙伴可以自己动手实现一下

通过设定 blockSize 参数,可以删除大小为 0 或者是 20 或者是其他大小的文件。 

import org.apache.hadoop.fs.Path
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession


object DeleteBlockFile {

  def main(args: Array[String]): Unit = {
    Logger.getRootLogger.setLevel(Level.WARN)

    val (argsList, argsMap) = ArgsParseUtil.parseArgs(args)

    val conf = new SparkConf
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val baseDir = argsMap.getOrElse("input","input")
    val blockSize = argsMap.getOrElse("blockSize","0").toInt

    val spark = SparkSession
      .builder
      .config(conf)
      .appName("Delete Block HDFS FIle")
      .getOrCreate()

    val baseDirPath = new Path(baseDir)
    val fs = baseDirPath.getFileSystem(spark.sparkContext.hadoopConfiguration)

    // 遍历删除
    fs.listStatus(baseDirPath).foreach(dir => {
      // 判断是文件还是文件夹
      if (dir.isDirectory) {
        fs.listStatus(dir.getPath).foreach(file => {
          if (file.getLen == blockSize && file.isFile) {
            fs.delete(file.getPath, true)
            println("Delete File ", file.getPath.toString)
          }
        })
      } else if (dir.isFile) {
        if (dir.getLen == blockSize) {
          fs.delete(dir.getPath, true)
          println("Delete File ", dir.getPath.toString)
        }
      } else {
        println("Dir is nor File Or Directory")
      }
    })

  }

三.合并小文件

删除空文件并不能解决占用 namenode 太多的问题,因为还有很多小文件有内容,作为原始数据无法删除该类小文件,所以采用新办法,合并小文件

合并小文件后还有新的坑,我想要将合并后的小文件继续生成在当前目录,如果直接生成会得到 file already exists 的报错,如果想要覆盖目录,通过设置 sparkConf.set("spark.hadoop.validateOutputSpecs", "false") 参数得到的结果却是文件后的追加输出:

A是之前的内容,B是合并后追加输出的内容,不符合预期的目标,所以需要进行 -> 删除原有路径 -> 新建临时路径 ->再重命名临时路径到原有路径的繁琐操作,有简洁的方法欢迎大佬指导!除此之外这里新增了 st,end参数支持对一段时间内的全部文件遍历合并~

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.{SparkConf, SparkContext}

import java.io.File
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import scala.collection.mutable.ArrayBuffer

object MergeSmallFiles {

  def main(args: Array[String]): Unit = {
    val (listArgs, mapArgs) = ArgsParseUtil.parseArgs(args)
    val local = mapArgs.getOrElse("local", "true").toBoolean
    val input = mapArgs.getOrElse("input", "")
    val output = mapArgs.getOrElse("output", "")
    val appName = mapArgs.getOrElse("appname", "MergeSmallFile")
    val numPartition = mapArgs.getOrElse("numPartition", "1").toInt
    val conf = if (local) {
      new SparkConf()
        .setMaster("local")
        .setAppName(appName)
    } else {
      new SparkConf().setAppName(appName)
    }
    val sc = new SparkContext(conf)

    val baseDirPath = new Path(input)
    val fs = baseDirPath.getFileSystem(sc.hadoopConfiguration)


    val dateTimeFormat = DateTimeFormatter.ofPattern("yyyyMMdd")

    val st = mapArgs.getOrElse("st", "20210101")
    val end = mapArgs.getOrElse("end", "20210101")
    var now = st
    val allDateTime = new ArrayBuffer[String]()

    while (now.toInt <= end.toInt) {
      allDateTime.append(now)
      now = LocalDate.parse(now, dateTimeFormat).plusDays(1).format(dateTimeFormat)
    }

    allDateTime.foreach(dt => {
      val path = input + File.separator + "dt=" + dt
      val outputPath = path + "_tmp"
      val outputRDD = sc.textFile(path)
      sc.setJobGroup(dt, path)
      println("Input: ", path)
      println("Output: ", outputPath)
      println("RepartitionNum", numPartition)
      // 输出到临时路径
      outputRDD.repartition(numPartition).saveAsTextFile(outputPath, classOf[GzipCodec])
      // 删除原有路径
      fs.delete(new Path(path), true)
      // 重命名会原始路径
      fs.rename(new Path(outputPath), new Path(path))
    })

  }

}

完结撒花~


http://www.niftyadmin.cn/n/1236012.html

相关文章

分布式系统之CAP理论学习

1、分布式系统中CAP分别代表什么&#xff1f;对它们的理解&#xff1f; CAP即一致性&#xff08;Consistency&#xff09;、可用性&#xff08;Availability&#xff09;、分区容错性&#xff08;Partition tolerance&#xff09;。 一致性&#xff08;Consistency&#xff0…

TensorFlow-Keras 10.CNN+RNN 处理文本序列-温度预测问题

一.引言 上一篇文章基础文本处理 processing && embedding 介绍了常用的文本处理方法&#xff0c;趁热打铁了解一下处理连续文本的 demo 流程。 二.数据信息与获取 下面例子将用到气象记录站的天气时间序列&#xff0c;数据集中每10分钟记录14个不同的指标&#xff0…

服务器CPU飙升问题分析

4核的服务器突然很卡&#xff0c;急忙跑上去看看发生了什么事。 top命令查看 占用率最高的进程pid为12033&#xff0c;top -Hp 12033查看进程下面线程的使用情况 这俩罪魁祸首用的最多 jstack 12033 > check.txt查看进程的堆栈信息&#xff0c;输出到文件后&#xff0c;通…

447. 回旋镖的数量

2021-09-13 LeetCode每日一题 链接&#xff1a;https://leetcode-cn.com/problems/number-of-boomerangs/ 标签&#xff1a;数组、数学、哈希表 题目 给定平面上 n 对 互不相同 的点 points &#xff0c;其中 points[i] [xi, yi] 。回旋镖 是由点 (i, j, k) 表示的元组 &…

524. 通过删除字母匹配到字典里最长单词

2021-09-14 LeetCode每日一题 链接&#xff1a;https://leetcode-cn.com/problems/longest-word-in-dictionary-through-deleting/ 标签&#xff1a;数组、双指针、字符串、排序 题目 给你一个字符串 s 和一个字符串数组 dictionary 作为字典&#xff0c;找出并返回字典中最长…

TensorFlow-Keras 11.多输入模型

一.引言 函数式 API 可用于构建具有多个输入的模型&#xff0c;通常情况下&#xff0c;模型会在某一时刻用一个可以组合多个张量的层将不同输入得到的结果进行组合&#xff0c;组合方式可以是相加&#xff0c;连接等等&#xff0c;这其中常用的为 keras.layers.add&#xff0c…

TensorFlow-Keras 12.多输出模型

一.引言 上一篇文章介绍了 TensorFlow-Keras 多输入模型&#xff0c;利用相同的方法&#xff0c;还可以使用函数式 API 构建具有多个输出即多头的模型&#xff0c;一个简单的例子就是利用同一个数据&#xff0c;一次性预测某个体多个属性&#xff0c;例如输入某个用户的评论信…

TensorFlow-Keras 13.Inception 模块

一.引言 通过函数式 API&#xff0c;前面已经实现了多输入与多输出的模型&#xff0c;除此之外&#xff0c;函数式 API 还可以实现具有复杂内部拓扑结构的网络。 Keras 中的神经网络可以是层组成的有向无环图&#xff0c;无环代表图不能有循环&#xff0c;即张量 x 不能成为生成…