2.2 如何使用FlinkSQL读取写入到文件系统(HDFS\Local\Hive)

news/2024/5/20 2:22:13 标签: hdfs, 大数据, 服务器

目录

1、文件系统 SQL 连接器

2、如何指定文件系统类型

3、如何指定文件格式

4、读取文件系统

4.1 开启 目录监控 

4.2 可用的 Metadata

5、写出文件系统

5.1 创建分区表

5.2 滚动策略、文件合并、分区提交

5.3 指定 Sink Parallelism

6、示例_通过FlinkSQL读取kafka在写入hive表

6.1、创建 kafka source表用于读取kafka

hdfs%20sink%E8%A1%A8%E7%94%A8%E4%BA%8E%E5%86%99%E5%87%BA%E5%88%B0hdfs-toc" style="margin-left:40px;">6.2、创建 hdfs sink表用于写出到hdfs

hdfs_sink_table-toc" style="margin-left:40px;">6.3、insert into 写入到 hdfs_sink_table

hdfs_sink_table-toc" style="margin-left:40px;">6.4、查询 hdfs_sink_table

6.5、创建hive表,指定local


1、文件系统 SQL 连接器

文件系统连接器允许从本地分布式文件系统进行读写数据

官网链接:文件系统 SQL 连接器


2、如何指定文件系统类型

创建表时通过 'path' = '协议名称:///path' 来指定 文件系统类型

参考官网:文件系统类型

CREATE TABLE filesystem_table (
  id INT,
  name STRING,
  ds STRING
) partitioned by (ds) WITH (
  'connector' = 'filesystem',
  -- 本地文件系统
  'path' = 'file:///URI',
  -- HDFS文件系统
  'path' = 'hdfs://URI',
  -- 阿里云对象存储 
  'path' = 'oss://URI',
  'format' = 'json'
);

3、如何指定文件格式

FlinkSQL 文件系统连接器支持多种format,来读取和写入文件

比如当读取的source格式为 csv、json、Parquet... 可以在建表是指定相应的格式类型

来对数据进行解析后映射到表中的字段中

CREATE TABLE filesystem_table_file_format (
  id INT,
  name STRING,
  ds STRING
) partitioned by (ds) WITH (
  'connector' = 'filesystem',
  -- 指定文件格式类型
  'format' = 'json|csv|orc|raw'
);

4、读取文件系统

FlinkSQL可以将单个文件或整个目录的数据读取到单个表中

注意:

        1、当读取目录时,对目录中的文件进行 无序的读取

        2、默认情况下,读取文件时为批处理模式,只会扫描配置路径一遍后就会停止

             当开启目录监控(source.monitor-interval)时,才是流处理模式

4.1 开启 目录监控 

通过设置 source.monitor-interval 属性来开启目录监控,以便在新文件出现时继续扫描

注意:

        只会对指定目录内新增文件进行读取,不会读取更新后的旧文件

-- 目录监控
drop table filesystem_source_table;
CREATE TABLE filesystem_source_table (
  id INT,
  name STRING,
  `file.name` STRING NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1016',
  'format' = 'json',
  'source.monitor-interval' = '3' -- 开启目录监控,设置监控时间间隔
);

-- 持续读取
select * from filesystem_source_table;

4.2 可用的 Metadata

使用FLinkSQL读取文件系统中的数据时,支持对 metadata 进行读取

注意: 所有 metadata 都是只读的

-- 可用的Metadata
drop table filesystem_source_table_read_metadata;
CREATE TABLE filesystem_source_table_read_metadata (
  id INT,
  name STRING,
  `file.path` STRING NOT NULL METADATA,
  `file.name` STRING NOT NULL METADATA,
  `file.size` BIGINT NOT NULL METADATA,
  `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012',
  'format' = 'json'
);

select * from filesystem_source_table_read_metadata;

运行结果:


5、写出文件系统

5.1 创建分区表

FlinkSQL支持创建分区表,并且通过 insert into(追加) insert overwrite(覆盖) 写入数据

-- 创建分区表
drop table filesystem_source_table_partition;
CREATE TABLE filesystem_source_table_partition (
  id INT,
  name STRING,
  ds STRING
) partitioned by (ds) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012',
  'partition.default-name' = 'default_partition',
  'format' = 'json'
);

-- 动态分区写入
insert into filesystem_source_table_partition
SELECT * FROM (VALUES
  (1,'a','20231010')
, (2,'b','20231010')
, (3,'c','20231011')
, (4,'d','20231011')
, (5,'e','20231012')
, (6,'f','20231012')
) AS user1 (id,name,ds);

-- 静态分区写入
insert into filesystem_source_table_partition partition(ds = '20231010')
SELECT * FROM (VALUES
  (1,'a')
, (2,'b')
, (3,'c')
, (4,'d')
, (5,'e')
, (6,'f')
) AS user1 (id,name);

-- 查询分区表数据
select * from filesystem_source_table_partition where ds = '20231010';

5.2 滚动策略、文件合并、分区提交

可以看之前的博客:flink写入文件时分桶策略

官网链接:官网分桶策略


5.3 指定 Sink Parallelism

当使用FlinkSQL写出到文件系统时,可以通过 sink.parallelism 设置sink算子的并行度

注意:当且仅当上游的 changelog 模式为 INSERT-ONLY 时,才支持配置 sink parallelism。否则,程序将会抛出异常

CREATE TABLE hdfs_sink_table (
  `log` STRING,
  `dt` STRING,  -- 分区字段,天
  `hour` STRING  -- 分区字段,小时
) partitioned by (dt,`hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka',
  'sink.parallelism' = '2', -- 指定sink算子并行度
  'format' = 'raw'
);

6、示例_通过FlinkSQL读取kafka在写入hive表

需求:

        使用FlinkSQL将kafka数据写入到hdfs指定目录中

        根据kafka的timestamp进行分区(按小时分区)

6.1、创建 kafka source表用于读取kafka

-- TODO 创建读取kafka表时,同时读取kafka元数据字段
drop table kafka_source_table;
CREATE TABLE kafka_source_table(
  `log` STRING,
  `timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' -- 消息的时间戳
) WITH (
  'connector' = 'kafka',
  'topic' = '20231017',
  'properties.bootstrap.servers' = 'worker01:9092',
  'properties.group.id' = 'FlinkConsumer',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'raw'
);

hdfs%20sink%E8%A1%A8%E7%94%A8%E4%BA%8E%E5%86%99%E5%87%BA%E5%88%B0hdfs">6.2、创建 hdfs sink表用于写出到hdfs

drop table hdfs_sink_table;
CREATE TABLE hdfs_sink_table (
  `log` STRING,
  `dt` STRING,  -- 分区字段,天
  `hour` STRING  -- 分区字段,小时
) partitioned by (dt,`hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka',
  'sink.parallelism' = '2', -- 指定sink算子并行度
  'format' = 'raw'
);

hdfs_sink_table">6.3、insert into 写入到 hdfs_sink_table

-- 流式 sql,插入文件系统表
insert into hdfs_sink_table
select 
	log
	,DATE_FORMAT(`timestamp`,'yyyyMMdd') as dt
	,DATE_FORMAT(`timestamp`,'HH') as `hour`
from kafka_source_table;

hdfs_sink_table">6.4、查询 hdfs_sink_table

-- 批式 sql,使用分区修剪进行选择
select * from hdfs_sink_table;

6.5、创建hive表,指定local

create table `kafka_to_hive` (
`log` string comment '日志数据')
 comment '埋点日志数据' PARTITIONED BY (dt string,`hour` string) 
row format delimited fields terminated by '\t' lines terminated by '\n' stored as orc
LOCATION 'hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka';


http://www.niftyadmin.cn/n/5097323.html

相关文章

【前端设计模式】之外观模式

外观模式是一种结构型设计模式,它提供了一个简单的接口,隐藏了复杂的子系统,并使得客户端能够更方便地使用这些子系统。在前端开发中,外观模式可以帮助我们简化复杂的代码结构,提高代码的可维护性和可读性。 外观模式…

Spring事件ApplicationEvent源码浅读

文章目录 demo应用实现基于注解事件过滤异步事件监听 源码解读总结 ApplicationContext 中的事件处理是通过 ApplicationEvent 类和 ApplicationListener 接口提供的。如果将实现了 ApplicationListener 接口的 bean 部署到容器中,则每次将 ApplicationEvent 发布到…

【ESP32】C语言映射表在嵌入式串口解析中的应用

本文章主要以ESP32开发环境为例记录,C语言映射表在嵌入式串口解析中的应用 【ESP32】C语言映射表在嵌入式串口解析中的应用 一、C语言映射表在串口数据解析中的应用1、数据结构2、指令、函数映射表3、串口解析函数实现 二、实验现象三、实验代码 一、C语言映射表在串…

fastadmin找不到后台控制器。登录之后找不到后台控制器

nginx加配置项 伪静态那块 location / { if (!-e KaTeX parse error: Expected }, got EOF at end of input: … rewrite ^(.*) /index.php?s 1 l a s t ; b r e a k ; r e w r i t e ( . ? p ˙ h p ) ( / . ) 1 last; break; rewrite ^(.?\.php)(/.) 1last;break;rewrit…

linux入门到精通-第四章-gcc编译器

目录 参考gcc概述gcc的工作流程 参考 gcc编译器 gcc概述 编辑器vi、记事本)是指我用它来写程序的 (编辑码),而我们写的代码语句,电脑是不懂的,我们需要把它转成电脑能懂的语句,编译器就是这样的转化工具。就是说,我…

运动品牌如何做到“全都要”?来看看安踏的答案

文 | 螳螂观察 作者 | 易不二 运动鞋服是兼具高景气和清晰格局的优质消费赛道。 中信证券给出的这一预测,欧睿国际也做出了更具体的测算:预计到2027年,中国运动服饰市场规模有望以约为8.7%的年复合增长率,突破5500亿元人民币。…

【已解决】vs2022 编译成功但是疯狂报错E1696找不到源文件

vs2022 编译成功但是疯狂报错E1696找不到源文件 从控制台可以看出,编译成功 但是错误列表里面却有大量的报错,包括但不限于:E1696无法打开源文件,而且打不开的都是标准库文件;错误过多导致智能提示无法工作&#xff…

阿里云优惠券(代金券)免费领取方法及使用教程分享

阿里云优惠券是阿里云提供给用户的一种优惠凭证,通常包括代金券和折扣券,领取之后支付订单时可以抵扣或者打折,是阿里云的一种重要优惠方式。本文将为大家详细介绍阿里云优惠券的免费领取方法及使用教程,帮助大家在购买阿里云产品…