一百七十二、Flume——Flume采集Kafka数据写入HDFS中(亲测有效、附截图)

news/2024/5/20 4:51:36 标签: flume, kafka, hdfs

一、目的

作为日志采集工具Flume,它在项目中最常见的就是采集Kafka中的数据然后写入HDFS或者HBase中,这里就是用flume采集Kafka的数据导入HDFS中

二、各工具版本

(一)Kafka

kafka_2.13-3.0.0.tgz

(二)Hadoop(HDFS)

hadoop-3.1.3.tar.gz

(三)Flume

apache-flume-1.9.0-bin.tar.gz

三、实施步骤

(一)到flume的conf的目录下

# cd  /home/hurys/dc_env/flume190/conf

(二)创建配置文件evaluation.properties

# vi  evaluation.properties

### Name agent, source, channels and sink alias
a1.sources = s1
a1.channels = c1
a1.sinks = k1

### define kafka source
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource

# Maximum number of messages written to Channel in one batch
a1.sources.s1.batchSize = 5000

# Maximum time (in ms) before a batch will be written to Channel The batch will be written whenever the first of size and time will be reached.
a1.sources.s1.batchDurationMillis = 2000

# set kafka broker address
a1.sources.s1.kafka.bootstrap.servers = 192.168.0.27:9092

# set kafka consumer group Id and offset consume
# 官网推荐1.9.0版本只设置了topic,但测试后不能正常消费,需要添加消费组id(自己写一个),并定义偏移量消费方式
a1.sources.s1.kafka.consumer.group.id = evaluation_group
a1.sources.s1.kafka.consumer.auto.offset.reset = earliest

# set kafka topic
a1.sources.s1.kafka.topics = topic_b_evaluation


### defind hdfs sink
a1.sinks.k1.type = hdfs
# set store hdfs path
a1.sinks.k1.hdfs.path = hdfs://hurys22:8020/rtp/evaluation/evaluation_%Y-%m-%d
# set file size to trigger roll
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.threadsPoolSize = 30
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text


### define channel from kafka source to hdfs sink
# memoryChannel:快速,但是当设备断电,数据会丢失
# FileChannel:速度较慢,即使设备断电,数据也不会丢失
a1.channels.c1.type = file
# 这里不单独设置checkpointDir和dataDirs文件位置,参考官网不设置会有默认位置
# channel store size
a1.channels.c1.capacity = 100000
# transaction size
a1.channels.c1.transactionCapacity = 10000


### 绑定source、channel和sink
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

(三)配置文件创建好后启动flume服务

# cd /home/hurys/dc_env/flume190/

# ./bin/flume-ng agent -n a1  -f /home/hurys/dc_env/flume190/conf/evaluation.properties

(四)到HDFS文件里验证一下

HDFS中生成evaluation_2023-09-07 文件夹,里面有很多小文件

(五)注意:小文件里的数据是JSON格式,即使我设置文件后缀名为csv也没用(可能配置文件中的文件类型设置需要优化

a1.sinks.k1.hdfs.writeFormat=Text

(六)jps查看Flume的服务

[root@hurys22 conf]# jps
16801 ResourceManager
4131 Application
18055 AlertServer
16204 DataNode
22828 Application
17999 LoggerServer
2543 launcher.jar
22224 Application
17393 QuorumPeerMain
16980 NodeManager
17942 WorkerServer
16503 SecondaryNameNode
11384 Application
32669 Application
17886 MasterServer
10590 Jps
16031 NameNode
18111 ApiApplicationServer

注意:Application就是Flume运行的任务

(七)关闭Flume服务

如果想要关闭Flume服务,直接杀死服务就好了

# kill -9 32669

(八)checkpointDir和dataDirs默认的文件位置

默认的文件位置:/root/.flume/file-channel/

总之,Flume这个工具的用法还需进一步研究优化,当然kettle也可以,所以这个项目目前还是用kettle吧!


http://www.niftyadmin.cn/n/5009020.html

相关文章

uniapp 高度铺满全屏

问题&#xff1a;在有uni-tabbar的情况下&#xff0c;页面铺满剩下的部分 <template><view :style"{height:screenHeightpx}" class"page"></view> </template> <script>export default {data() {return {screenHeight: &q…

lambda表达式介绍

前言 lambda表达式是C11标准才支持的&#xff0c;有了它以后在一些地方进行使用会方便很多&#xff0c;尤其在一些需要仿函数的地方&#xff0c;lambda表达式完全可以替代它的功能。代码的可读性也会提高。 目录 1.lambda表达式 2.lambda表达式语法 3.函数对象和lambda表达…

pytorch再次学习

目录 数据可视化切换设备device定义类打印每层的参数大小自动微分计算梯度禁用梯度追踪优化模型参数 模型保存模型加载 数据可视化 import torch from torch.utils.data import Dataset from torchvision import datasets from torchvision.transforms import ToTensor import…

基于nginx+keepalived的负载均衡、高可用web集群

目录 基于nginxkeepalived的负载均衡、高可用web集群关闭所有机器的相关服务一、按照IP规划配置好静态IP。二、建立免密通道&#xff0c;使用Ansible自动化批量部署软件环境1、安装配置ansible2、编写主机清单3、在ansible上生成密钥对&#xff0c;并将公钥复制到其他主机上4、…

l8-d8 TCP并发实现

一、TCP多进程并发 1.地址快速重用 先退出服务端&#xff0c;后退出客户端&#xff0c;则服务端会出现以下错误&#xff1a; 地址仍在使用中 解决方法&#xff1a; /*地址快速重用*/ int flag1,len sizeof (int); if ( setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &a…

【C++】—— 单例模式详解

前言&#xff1a; 本期&#xff0c;我将要讲解的是有关C中常见的设计模式之单例模式的相关知识&#xff01;&#xff01; 目录 &#xff08;一&#xff09;设计模式的六⼤原则 &#xff08;二&#xff09;设计模式的分类 &#xff08;三&#xff09;单例模式 1、定义 2、…

项目01—基于nignx+keepalived双vip的负载均衡高可用Web集群

文章目录 一.项目介绍1.拓扑图2.详细介绍 二.前期准备1.项目环境2.IP划分 三. 项目步骤1.ansible部署软件环境1.1 安装ansible环境1.2 建立免密通道1.3 批量部署nginx 2.配置NFS服务器和负载均衡器搭建keepalived2.1 修改nginx的index.html界面2.2 nginx实现七层负载均衡2.4 使…

信息检索与数据挖掘 |(一)介绍

文章目录 &#x1f4da;信息检索&#x1f407;概念&#x1f407;结构化与非结构化数据&#x1f407;信息检索的基本假设&#x1f407;信息检索小结&#x1f407;附&#xff1a;IR新课题 &#x1f4da;数据挖掘&#x1f407;定义&#x1f407;数据挖掘 vs 机器学习 &#x1f4da…