hive表小文件合并

news/2024/5/20 1:15:56 标签: hive, hdfs, 小文件

1. 背景

公司的 hive 表中的数据是通过 flink sql 程序,从 kafka 读取,然后写入 hive 的,为了数据能够被及时可读,我设置了 flink sql 程序的 checkpoint 时间为 1 分钟,因此,在 hive 表对应的 hdfs 上,会每隔 1 分钟,生成一个小文件,每天生成 1440 个小文件,时间长了之后,就会造成 hdfs小文件过多的问题。为了解决这个问题,我编写了一个工具类,用来合并指定目录及其子目录下所有的文件。

2. 代码

TimeUtil 工具类

package reach.store.tools.common.utils;
import cn.hutool.core.date.LocalDateTimeUtil;
import java.time.format.DateTimeFormatter;


public class TimeUtil {

    private static final DateTimeFormatter DATE_TIME_FORMATTER_DEFAULT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * 获取当前时间字符串
     */
    public static String now() {
        return LocalDateTimeUtil.now().format(DATE_TIME_FORMATTER_DEFAULT);
    }

}

MergeHdfsFiles 主类

package reach.store.tools.hdfs;

import cn.hutool.core.lang.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reach.store.tools.common.utils.TimeUtil;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * 合并 hdfs 上同一个目录下的小文件
 */
public class MergeHdfsFiles {


    private static final Logger LOGGER = LoggerFactory.getLogger(MergeHdfsFiles.class);
    /**
     * 将 main 函数入参转化为 map 表
     */
    private static final Map<String, Object> ARGS_MAP = new HashMap<>();
    private static final String DIR_KEY = "dir";
    /**
     * 合并后文件名前缀对应的 key
     */
    private static final String MERGE_FILE_NAME_PRE_KEY = "mergeFileNamePre";
    /**
     * 合并后文件名默认前缀
     */
    private static final String MERGE_FILE_NAME_PRE_DEFAULT = "merge";
    /**
     * 合并后文件名前缀
     */
    private static String MERGE_FILE_NAME_PRE = MERGE_FILE_NAME_PRE_DEFAULT;
    /**
     * 合并时被过滤文件最大大小对应的 key
     */
    private static final String MAX_FILE_SIZE_KEY = "maxFileSize";
    /**
     * 合并时被过滤文件最大大小默认值
     */
    private static final Long MAX_FILE_SIZE_DEFAULT = 50 * 1024 * 1024L;
    /**
     * 合并时被过滤文件最大大小
     */
    private static Long MAX_FILE_SIZE = MAX_FILE_SIZE_DEFAULT;

    private static final Configuration CONFIGURATION = new Configuration();
    private static final FileSystem FILE_SYSTEM;

    static {
        try {
            FILE_SYSTEM = FileSystem.get(CONFIGURATION);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 所有需要合并小文件的目录
     */
    private static final List<Path> DIRS_PATH = new ArrayList<>();

    /**
     * 必须先调用此函数,来处理入参,初始化 dirsPath map 表
     * 将输入的参数转化为 map 表,输入参数格式为 --key value ...
     */
    private static void argsToMap(String[] args) {
        if (args.length == 0 || args.length % 2 != 0) {
            System.out.println(TimeUtil.now() + "   未输入必要的参数,或输入参数个数不是偶数,请检查输入参数");
            System.exit(1);
        } else {
            String key = null;
            for (int index = 0; index < args.length; index++) {
                if (index % 2 == 0) {
                    // key
                    key = args[index].substring(2);
                } else {
                    // value
                    ARGS_MAP.put(key, args[index]);
                }
            }
            // 处理可选参数值
            if (ARGS_MAP.containsKey(MERGE_FILE_NAME_PRE_KEY)) {
                MERGE_FILE_NAME_PRE = String.valueOf(ARGS_MAP.get(MERGE_FILE_NAME_PRE_KEY));
            }
            if (ARGS_MAP.containsKey(MAX_FILE_SIZE_KEY)) {
                MAX_FILE_SIZE = Long.parseLong(String.valueOf(ARGS_MAP.get(MAX_FILE_SIZE_KEY))) * 1024 * 1024;
            }
        }
    }

    /**
     * 合并所有的文件,然后删除合并之前的文件
     *
     * @param dirPath         合并后的文件目录
     * @param sourceFilePaths 被合并的所有文件 path
     */
    private static void mergeFiles(Path dirPath, List<Path> sourceFilePaths) throws IOException {
        if (sourceFilePaths == null || sourceFilePaths.size() == 0) {
            return;
        }
        // 不合并以 . 开头,并且大小超过设定值的文件
        sourceFilePaths.removeIf(next -> {
            try {
                return next.getName().startsWith(".") || FILE_SYSTEM.getFileStatus(next).getLen() >= MAX_FILE_SIZE;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        if (sourceFilePaths.size() == 0) {
            return;
        }
        // 如果目录下需要合并的文件只有一个,则无需进行合并操作
        if (sourceFilePaths.size() == 1) {
            System.out.println(TimeUtil.now() + "   " + dirPath.toString() + "目录下只有一个文件符合合并条件,不进行合并操作。");
            return;
        }


        // 读写 parquet 文件
        MessageType schema = ParquetFileReader.readFooter(CONFIGURATION, sourceFilePaths.get(0), ParquetMetadataConverter.NO_FILTER).getFileMetaData().getSchema();
        GroupWriteSupport.setSchema(schema, CONFIGURATION);
        // 合并文件,合并后文件名,需要以 . 开头,防止被读取 hive 读取表操作读取到,最后通过重命名合并后文件名及删除所有原始文件的方式使合并后的文件可见。
        Path targetFilePath = new Path(dirPath + "/." + MERGE_FILE_NAME_PRE + "_" + UUID.randomUUID());
        ParquetWriter<Group> writer = new ParquetWriter<>(targetFilePath, new GroupWriteSupport(), CompressionCodecName.SNAPPY, ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED, ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED, ParquetWriter.DEFAULT_WRITER_VERSION, CONFIGURATION);
        for (Path filePath : sourceFilePaths) {
            ParquetReader<Group> parquetReader = ParquetReader.builder(new GroupReadSupport(), filePath).build();
            SimpleGroup group;
            while ((group = (SimpleGroup) parquetReader.read()) != null) {
                writer.write(group);
            }
        }
        writer.close();

        // 重命名合并后的文件,删除原来的所有文件
        Path newTargetFilePath = new Path(targetFilePath.getParent() + "/" + targetFilePath.getName().substring(1));
        FILE_SYSTEM.rename(targetFilePath, newTargetFilePath);
        System.out.println(TimeUtil.now() + "   被合并小文件的目录为:" + targetFilePath.getParent().toString() +
                ",原始符合合并条件文件数量:" + sourceFilePaths.size() +
                ",合并后文件名:" + newTargetFilePath.getName());
        for (Path path : sourceFilePaths) {
            FILE_SYSTEM.delete(path, false);
        }
    }


    /**
     * 处理单个目录下的文件和目录<br>
     * 如果有目录,则将目录加入 dirsPath 列表,下次循环时继续从列表中获取,然后继续处理目录<br>
     * 如果有文件,则将文件信息放入一个列表,之后对所有文件合并即可
     *
     * @param dirPath 需要处理的目录 path 对象
     */
    private static void dirHandle(Path dirPath, Configuration configuration, FileSystem fileSystem) throws IOException {
        List<Path> filesPath = new ArrayList<>();
        RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listLocatedStatus(dirPath);
        // 查找所有的文件及目录
        while (locatedFileStatusRemoteIterator.hasNext()) {
            LocatedFileStatus locatedFileStatus = locatedFileStatusRemoteIterator.next();
            if (locatedFileStatus.isFile()) {
                filesPath.add(locatedFileStatus.getPath());
            } else if (locatedFileStatus.isDirectory()) {
                DIRS_PATH.add(locatedFileStatus.getPath());
            }
        }
        mergeFiles(dirPath, filesPath);
    }

    /**
     * 开始进行指定目录下的小文件合并
     */
    private static void start() throws Exception {
        String dir;
        if (ARGS_MAP.containsKey(DIR_KEY)) {
            dir = String.valueOf(ARGS_MAP.get(DIR_KEY));
            if (dir.endsWith("/")) {
                // 去掉最后的 /
                dir = dir.substring(0, dir.length() - 1);
            }
        } else {
            throw new RuntimeException("未指定要合并小文件的目录 dir 参数");
        }

        Configuration configuration = new Configuration();
        DIRS_PATH.add(new Path(dir));
        FileSystem fileSystem = FileSystem.get(configuration);

        while (DIRS_PATH.size() > 0) {
            Path dirPath = DIRS_PATH.get(0);
            dirHandle(dirPath, configuration, fileSystem);
            DIRS_PATH.remove(dirPath);
        }

        fileSystem.close();

    }

    /**
     * @param args dir:必选,需要合并小文件的目录,绝对路径,直接从 / 开始写即可。<br>
     *             mergeFileNamePre:可选,合并后的文件名前缀,默认为 merge <br>
     *             maxFileSize:可选,被合并小文件的最大大小,超过该值,则不参与合并,单位:M,默认:50M
     */
    public static void main(String[] args) throws Exception {
        argsToMap(args);
        start();
    }

}

程序执行逻辑:

  1. 去掉以 . 开头的文件。
  2. 去掉大小超过 50M 的文件,这个 50M 可以在程序启动时作为参数设置。
  3. 如果目录下只有一个文件,则不执行合并操作。

注意:该程序只能合并 parquet 格式的文件,并且采用 snappy 压缩。

logback.xml 文件,注意将该文件放到项目的 resources 资源目录下。在该文件的配置中,将日志的级别设置为 WARN,是为了减少后续程序运行时过多的 INFO 日志输出,影响程序日志的输出。

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d %p [%c] - %m%n</pattern>
        </encoder>
    </appender>

    <root level="WARN">
        <appender-ref ref="console"/>
    </root>

</configuration>

core-site.xml、hdfs-site.xml,这两个文件,大家从自己的集群下载下来放到项目的 resource 资源目录下即可。

pom.xml 文件内容:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>reach.store</groupId>
    <artifactId>bigdata-tools</artifactId>
    <version>1.0</version>
    <packaging>pom</packaging>
    <modules>
        <module>tools-common</module>
        <module>dolphinscheduler</module>
        <module>tools-hdfs</module>
    </modules>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <hadoop.version>3.0.3</hadoop.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>5.8.2</version>
            <scope>test</scope>
        </dependency>

        <!-- hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
            <version>1.9.0</version>
        </dependency>

        <!-- 其他 -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.21</version>
        </dependency>
        <!-- 日志 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- 测试代码运行插件,可以在打包之前跳过test包下符合命名规范的所有类的代码 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
            <!-- 打包插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- java编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <!-- 要包含的资源文件目录,写相对路径,相对于项目的根路径 -->
                <directory>src/main/resources</directory>
                <includes>
                    <!-- 要包含的文件,相对于上面指定的目录 -->
                    <include>**</include>
                </includes>
            </resource>
        </resources>
    </build>


</project>

注意修改对应的依赖版本为自己集群的版本。

最后直接通过 idea 的 maven 窗口的 package 按钮进行打包即可。

3. 执行

主类:xxx.xxx.xxx.MergeHdfsFiles

启动:调用时,直接通过命令 java -cp 主类完全限定名 xxx.jar --key1 value1 --key2 value2 ... 执行即可。

入参说明

  1. dir:必选,需要合并小文件的目录,直接从根目录 / 开始即可,开头无需指定 hdfs 等信息,最后无需添加 /
  2. mergeFileNamePre:可选,合并后的文件名称前缀,默认为:merge,合并后的文件名会在该前缀后面添加一个 UUID。
  3. maxFileSize:可选,需要被合并的最大文件大小,单位:M,默认:50M。如果小文件大小超过该参数设置,则不参与小文件合并。

http://www.niftyadmin.cn/n/1005928.html

相关文章

Laravel 多字段去重count计数

Laravel 多字段去重count计数 背景&#xff1a;需要统计数据列表总条数&#xff08;字段1、字段2去重统计&#xff09; table&#xff1a;policy_view,去重字段admin_id和permission 期望结果&#xff1a;count不含重复统计数据 解决思路&#xff1a; 语法&#xff1a;DISTI…

ubuntu22.04配置双网卡双静态ip不通网段访问服务器的相同服务

ubuntu22.04配置双网卡双静态ip不通网段访问服务器的相同服务 技术博客 http://idea.coderyj.com/ 1.需求 南方电网网段(假如)是 192.168.3.1的网段机器人服务器在隧道ip是 172.16.1.1网段要求这2个网段都能访问到服务器上的服务 2.解决方案 服务器上配置双网卡 双ip 以ubun…

7.MHA高可用配置及故障切换

文章目录 MHA高可用配置及故障切换MHA概念实验配置时间同步与主从复制安装MHA服务SSH免交互认证验证MHA服务是否开启启动服务 故障模拟恢复故障过程 MHA高可用配置及故障切换 MHA概念 MHA&#xff08;MasterHigh Availability&#xff09;是一套优秀的MySQL高可用环境下故障切…

Presto(Trino)分布式(物理)执行计划的生成和调度

文章目录 1.前言2.物理执行生成(Stage)的生成2.1不同的调度分区策略2.1.1 Connector自己提供的分区策略2.1.2 Presto提供的Partition策略(SystemPartitioningHandle)&#xff1a; 2.2 为Stage创建StageScheduler2.2.1 普通的非bucket表的TableScan StageSplit 放置策略解析 2.2…

移动隔断屏风墙,无地轨设计,空间灵活应用

移动隔断屏风墙是一种非常适合办公室的设计选择&#xff0c;它可以提供灵活的办公空间布局&#xff0c;并且无地轨设计可以避免地面安装轨道&#xff0c;给空间带来更大的自由度。以下是一些关于移动隔断屏风墙的特点和设计建议&#xff1a; 1. 灵活应用&#xff1a;移动隔断屏…

论好名字的重要性: Linux内核page到folio的变迁

一、引子 Once upon a time&#xff0c;Netscape的大拿 Phil Karlton曾经说过&#xff1a;“There are only two hard things in Computer Science: cache invalidation and naming things”&#xff0c;成为程序界流传甚广的名言&#xff0c;可见取名是计算机科学中最难的两件…

iframe主子应用通信

背景 在工作中有许多项目需要嵌套在一个主项目中&#xff0c;最常用的方式是通过iframe嵌套&#xff0c;这里主要总结记录下我在公司项目中使用iframe主子应用相互传递参数。我项目中主要是用的vue&#xff0c;这里的demo是vue 直接上代码&#xff1a; 父传子 /*主应用代码*…

并查集和LRUCache

目录 1. 并查集 1.1原理 1.2实现 1.3应用 1.3.1省份数量 1.3.2等式方程的可满足性 2.LRUCache 1.概念 2.实现 3.JDK中类似LRUCahe的数据结构LinkedHashMap 4.LRU Cache的OJ 1. 并查集 1.1原理 把不同的元素划分到不想交的集合.开始时,每个元素自成一个单元集合,然后…