Java抽取Hive、HDFS元数据信息

news/2024/5/20 2:57:45 标签: java, hive, hdfs

文章目录

  • 一、技术
  • 二、构建SpringBoot工程
    • 2.1 创建maven工程并配置 pom.xml文件
    • 2.2 编写配置文件 application.yml
    • 2.3 编写配置文件 application.propertites
    • 2.4 开发主启动类
    • 2.5 开发配置类
  • 三、测试抽取Hive、HDFS元数据
  • 四、将抽取的元数据存储到MySQL
    • 4.1 引入依赖
    • 4.2 配置application.yml
    • 4.3 创建元数据信息Bean
    • 4.4 定义Service
    • 4.5 创建Mapper
    • 4.6 测试

一、技术

SpringBoot + Mybatis Plus

二、构建SpringBoot工程

2.1 创建maven工程并配置 pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.17</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.songshuang</groupId>
    <artifactId>dwmeta</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- 必须 ,用于开发一个web项目-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- 测试必须加 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- 连接hive的元数据服务 -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-metastore</artifactId>
            <version>3.1.2</version>
        </dependency>

        <!-- json处理 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.72</version>
        </dependency>
    </dependencies>

</project>

2.2 编写配置文件 application.yml

null

2.3 编写配置文件 application.propertites

hive.client.uri:hive元数据服务metastore地址
hdfs.admin.user:hdfs用户
hdfs.uri:hdfs NameNode RPC端口

hive.client.uri=thrift://hadoop102:9083
hdfs.admin.user=hadoop
hdfs.uri=hdfs://hadoop102:9820

2.4 开发主启动类

java">package com.songshuang.dga;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

//当前这个类是App的主启动类
@SpringBootApplication
public class MainApp {

    public static void main(String[] args) {

        //启动app
        SpringApplication.run(MainApp.class, args);

    }
}

2.5 开发配置类

java">import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

import java.net.URI;

/*
    所有的客户端,都应该随用随建,用完就关。
 */
@Configuration
public class DgaConfig {

    @Value("${hive.client.uri}")
    private String hiveUri;

    @Bean
    @Scope("prototype")
    public HiveMetaStoreClient createHiveMetastoreClient(){

        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        //客户端连接服务端,配置地址和端口
        conf.set("hive.metastore.uris",hiveUri);

        try {
            HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
            return client;
        } catch (MetaException e) {
            throw new RuntimeException(e);
        }
    }

    @Value("${hdfs.admin.user}")
    private String hdfsAdmin;

    @Value("${hdfs.uri}")
    private String hdfsUri;
    @Bean
    @Scope("prototype")
    public FileSystem createHDFSClient(){

        try {
            FileSystem hdfsClient = FileSystem.get(new URI(hdfsUri), new org.apache.hadoop.conf.Configuration(), hdfsAdmin);
            return hdfsClient;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

三、测试抽取Hive、HDFS元数据

连接Metastore服务抽取Hive元数据;连接NameNode抽取HDFS元数据;

java">import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.thrift.TException;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.ApplicationContext;

import java.io.IOException;

/**
 * @date 2024/1/29 16:27
 */

@SpringBootTest
public class MetaTest {
    @Autowired
    private ApplicationContext context;

    @Test
    public void testHiveClient() throws TException {
        HiveMetaStoreClient client = context.getBean(HiveMetaStoreClient.class);
        //获取库下所有的表
        System.out.println(client.getAllTables("dw_ods"));
        //获取某张表的元数据信息
        System.out.println(client.getTable("dw_ods", "ods_activity_info_full"));
        client.close();
    }

    @Test
    public void testHDFSClient() throws IOException {
        //1.获取hdfs客户端
        FileSystem hdfsClient = context.getBean(FileSystem.class);
        //2.遍历tableMetaInfos,为每一个TableMetaInfo补充hdfs的元数据信息
        FsStatus status = hdfsClient.getStatus();
        long capacity = status.getCapacity();
        long remaining = status.getRemaining();
        long used = status.getUsed();
        System.out.println("capacity:" + capacity + "remaining:" + remaining + "used:" + used );
    }
}

四、将抽取的元数据存储到MySQL

4.1 引入依赖

        <!-- 使用springboot插件,不会和springboot的其他插件冲突了 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.2.15</version>
        </dependency>

        <!-- 驱动 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.27</version>
        </dependency>

        <!-- 动态数据源切换,允许使用一个注解,可以切换Dao查询的数据源
                内置了数据库连接池,会和之前配置的Druid冲突
         -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
            <version>2.5.8</version>
        </dependency>

        <!-- 注释掉mybatis,否则会冲突 -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.4.1</version>
        </dependency>


        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-generator</artifactId>
            <version>3.5.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.velocity</groupId>
            <artifactId>velocity-engine-core</artifactId>
            <version>2.3</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-freemarker</artifactId>
        </dependency>

4.2 配置application.yml

spring:
  datasource:
    dynamic:
      primary: dga #设置默认的数据源或者数据源组
      strict: false #严格匹配数据源,默认false. true未匹配到指定数据源时抛异常,false使用默认数据源
      datasource:
        dga:
          url: jdbc:mysql://mall:3306/dga?useSSL=false&useUnicode=true&characterEncoding=UTF-8
          username: root
          password: "123456"
          driver-class-name: com.mysql.cj.jdbc.Driver
          druid:
            initial-size: 5
            max-active: 20
            max-wait: 60000
            min-idle: 5
            test-on-borrow: true
            test-on-return: false
            test-while-idle: true
  autoconfigure:
    exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure

mybatis-plus:
  mapper-locations: classpath*:/sqls/*Mapper.xml
  configuration:
    mapUnderscoreToCamelCase: true

logging:
  level:
    com:
      songshuang:
        dga:
          meta:
            mapper: debug

server:
  port: 80

4.3 创建元数据信息Bean

java">import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.io.Serializable;
import java.sql.Timestamp;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * <p>
 * 元数据表附加信息
 * </p>
 *
 * @since 2024-01-29
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("table_meta_info_extra")
public class TableMetaInfoExtra implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * id
     */
    @TableId(value = "id", type = IdType.AUTO)
    private Long id;

    /**
     * 表名
     */
    private String tableName;

    /**
     * 库名
     */
    private String schemaName;

    /**
     * 技术负责人
     */
    private String tecOwnerUserName;

    /**
     * 业务负责人
     */
    private String busiOwnerUserName;

    /**
     * 存储周期类型
     */
    private String lifecycleType;

    /**
     * 生命周期(天)
     */
    private Long lifecycleDays;

    /**
     * 安全级别
     */
    private String securityLevel;

    /**
     * 数仓所在层级
     */
    private String dwLevel;

    /**
     * 创建时间 (自动生成)
     */
    private Timestamp createTime;

    /**
     * 更新时间  (自动生成)
     */
    private Timestamp updateTime;
}

4.4 定义Service

java">import com.baomidou.mybatisplus.extension.service.IService;
import com.songshuang.dga.meta.bean.TableMetaInfoExtra;
import org.apache.hadoop.hive.metastore.api.MetaException;

public interface TableMetaInfoExtraService extends IService<TableMetaInfoExtra> {

    //生成所有表的辅助信息。
    void initMetaInfoExtra(String db) throws MetaException;

}
java">import com.songshuang.dga.config.MetaConstant;
import com.songshuang.dga.meta.bean.TableMetaInfoExtra;
import com.songshuang.dga.meta.mapper.TableMetaInfoExtraMapper;
import com.songshuang.dga.meta.service.TableMetaInfoExtraService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;

import java.sql.Timestamp;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * <p>
 * 元数据表附加信息 服务实现类
 * </p>
 *
 * @since 2024-01-29
 */
@Service
public class TableMetaInfoExtraServiceImpl extends ServiceImpl<TableMetaInfoExtraMapper, TableMetaInfoExtra> implements TableMetaInfoExtraService {

    @Autowired
    private ApplicationContext context;
    /*
        辅助信息不经常变动。
            只有当你去创建新表的时候,才需要想数据库中写入新表的辅助信息。
            如果一张表已经有了辅助信息,无需写入

            调用initMetaInfoExtra(),只需要写入新表(今天刚创建表的元数据信息)
     */
    @Override
    public void initMetaInfoExtra(String db) throws MetaException {

        //查询当前db中的新表
        //第一步: 先查询table_meta_info_extra中当前db已经有信息的表。 老表
        Set<String> existsTableNames = list(new QueryWrapper<TableMetaInfoExtra>().eq("schema_name", db))
                .stream()
                .map(info -> info.getTableName())
                .collect(Collectors.toSet());

        //第二步: 查询db下所有的表,根据老表,过滤得到新表
        HiveMetaStoreClient client = context.getBean(HiveMetaStoreClient.class);
        List<String> allTables = client.getAllTables(db);
        List<String> newTables = allTables.stream()
                .filter(name -> !existsTableNames.contains(name))
                .collect(Collectors.toList());

        //为新表生成辅助信息,存入到数据库中
        List<TableMetaInfoExtra> infos = newTables.stream()
                .map(name -> {
                    TableMetaInfoExtra extra = new TableMetaInfoExtra();
                    extra.setSchemaName(db);
                    extra.setTableName(name);
                    //其他的信息应该由员工手动录入,这里为了后续方便,初始化一些默认值,假设员工已经录入了
                    initExtraInfo(extra);
                    extra.setCreateTime(new Timestamp(System.currentTimeMillis()));
                    return extra;
                })
                .collect(Collectors.toList());

        saveBatch(infos);

    }

    private void initExtraInfo(TableMetaInfoExtra extra) {

        String [] bon = {"张三","李四","王五","赵六"};
        String [] ton = {"张小三","李中四","王大五","赵老六"};

        extra.setBusiOwnerUserName(bon[RandomUtils.nextInt(0,bon.length)]);
        extra.setTecOwnerUserName(ton[RandomUtils.nextInt(0,ton.length)]);
        extra.setLifecycleType(MetaConstant.LIFECYCLE_TYPE_UNSET);
        extra.setLifecycleDays(-1l);
        extra.setSecurityLevel(MetaConstant.SECURITY_LEVEL_UNSET);
        extra.setDwLevel(extra.getTableName().substring(0,3).toUpperCase());
    }
}

4.5 创建Mapper

java">import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.songshuang.dga.meta.bean.TableMetaInfoExtra;
import org.apache.ibatis.annotations.Mapper;

/**
 * @date 2024/1/29 19:44
 */
@Mapper
public interface TableMetaInfoExtraMapper extends BaseMapper<TableMetaInfoExtra> {
}

4.6 测试

java">    @Autowired
    private TableMetaInfoExtraService extraService;
    @Test
    public void testExtraInfo() throws Exception {
        extraService.initMetaInfoExtra("dw_ods");
    }

http://www.niftyadmin.cn/n/5356073.html

相关文章

《向量数据库指南》——AIGC 需求的快速变化,催生了Milvus Cloud向量数据库的超高速迭代

对于“版本”成为热度排名第一的关键词,我开始是有点意外的,仔细一想似乎也在情理之中。2023年,是 AIGC 大爆发的一年,LLM 展现出了强大的分析、推理、归纳、总结能力。但是,由于缺乏最新的和特定领域的训练数据,大模型“幻觉”成为困扰 AIGC 开发者的一大难题。随着 RAG…

TCP/IP网络模型

大家好我是苏麟 , 今天聊聊TCP/IP四层网络模型 . 资料来源 : 小林coding 小林官方网站 : 小林coding (xiaolincoding.com) 应用层 最上层的&#xff0c;也是我们能直接接触到的就是应用层&#xff08;Application Layer&#xff09;&#xff0c;我们电脑或手机使用的应用软件都…

开源博客项目Blog .NET Core源码学习(8:EasyCaching使用浅析)

开源博客项目Blog使用EasyCaching模块实现缓存功能&#xff0c;主要是在App.Framwork项目中引用了多类包&#xff0c;包括内存缓存&#xff08;EasyCaching.InMemory&#xff09;、Redis缓存&#xff08;EasyCaching.CSRedis&#xff09;&#xff0c;同时支持多种序列化方式&am…

Spring AOP实现

Spring AOP实现 AOP概述什么是AOP什么是Spring AOP Spring AOP快速入门引入依赖实现计时器 Spring AOP详解Spring AOP核心概念切点(Pointcut)连接点(Join Point)通知(Advice)切面(Aspect) 通知类型注意事项 PointCut多个切面切面优先级 Order切点表达式execution表达式annotati…

Redis -- 开篇热身,常用的全局命令

目录 Redis重要文件 启动停止脚本 配置文件 持久化文件存储目录 核心命令 set get 全局命令 keys exists del expire ttl 过期策略是如何实现的 定时器 type 小结 Redis重要文件 启动停止脚本 /usr/bin/redis-benchmark &#xff1a; 用于对Redis做性能基准…

重置vCenter的root和administrator@vsphere.local密码

1&#xff1a;首先要重置root密码&#xff0c;登录vCenter安装的ESXI主机&#xff0c;重启vCenter。 2:&#xff1a;重启机器的出现下面界面的时候按e键。 3&#xff1a;按e后出现下面的界面。 4&#xff1a;在最后一行结尾处输入rw init/bin/bash ,之后按ctrl-x或者F10重启。 …

《区块链简易速速上手小册》第3章:区块链的类型(2024 最新版)

文章目录 3.1 公共区块链3.1.1 公共区块链基础3.1.2 主要案例&#xff1a;比特币3.1.3 拓展案例 1&#xff1a;以太坊3.1.4 拓展案例 2&#xff1a;去中心化金融&#xff08;DeFi&#xff09; 3.2 私有区块链3.2.1 私有区块链基础3.2.2 主要案例&#xff1a;Hyperledger Fabric…

JavaScript 的 ~~ 运算和floor 的性能差异

在JavaScript中&#xff0c;~~&#xff08;双波浪号&#xff09;和Math.floor()都可以用于向下取整&#xff0c;但它们在行为和性能上有一些差异。要测试这两者之间的性能差异&#xff0c;你可以使用JavaScript的performance.now()方法来进行基准测试。 行为差异 Math.floor()…