Flume多进程传输

news/2024/5/20 0:49:37 标签: flume, 大数据, kafka, hdfs

img

1.Flume介绍

Flume 是一种分布式、可靠且可用的服务,用于高效收集、聚合和移动大量日志数据。它具有基于流数据流的简单而灵活的架构。它具有鲁棒性和容错性,具有可调的可靠性机制和许多故障转移和恢复机制。它使用简单的可扩展数据模型,允许在线分析应用程序。
img
Flume是一个即装即用的传输组件,下载安装后配置conf文件即可使用,非常方便。支持文件存储压缩、多进程传输、动态修改配置文件、负载均衡和错误恢复等。

2.场景分析

在使用flumekafka中数据流保存到HDFS中时,由于数据量过大,2g~5g/min,数据传输慢,主要瓶颈在于为hdfs保存:采取的解决办法主要为:

  1. 将file channel 调整为 memory channel,降低本地磁盘压力
  2. 将sink单进程调整为多进程

💡
多sink可以直接按常规配置,这样的话每个sink会启动一个sinkrunner,相当于每个线程一个sink,互不干扰,负载均衡是通过channel实现的,效率会提高为n倍,如果在此基础上加入sinkgroup,则sinkgroup会启动一个sinkrunner,就是单线程,sinkgroup从channel中读取数据,然后分发到下面挂载的sink中,效率和单sink一样,没有提高,但是可以实现两个sink的负载均衡或者热备模式。

3.问题解决

配置文件

#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1 k2

#配置source
a1.sources.r1.type= org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 2000
a1.sources.r1.kafka.consumer.group.id= xxx
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = xxx:9092
a1.sources.r1.kafka.topics = xxx,xxxx
a1.sources.r1.kafka.consumer.auto.offset.reset = latest
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type =xxxx$Builder



#memory channel
a1.channels.c1.type = memory
#channel的event个数
a1.channels.c1.capacity = 20000
#事务event个数
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 2147483648

#配置channel
#a1.channels.c1.type = file
#a1.channels.c1.checkpointDir =/data/xxx
#a1.channels.c1.dataDirs = /data/module/xxx
#a1.channels.c1.maxFileSize = 2147483648
#a1.channels.c1.capacity = 2000000
#a1.channels.c1.transactionCapacity=20000
#a1.channels.c1.keep-alive = 6
#a1.chhannels.c1.checkpointInterval=60000
#a1.minimumRequirdSpace=26214400

#配置sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =/hadoop/dm_dw/tmp_data/log/%{table}/%Y%m%d/%H
a1.sinks.k1.hdfs.filePrefix = log1
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 360
a1.sinks.k1.hdfs.rollSize = 1174405120
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize=3000
#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip


#配置sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path =/hadoop/dm_dw/tmp_data/log/%{table}/%Y%m%d/%H
a1.sinks.k2.hdfs.filePrefix = log2
a1.sinks.k2.hdfs.round = false
a1.sinks.k2.hdfs.rollInterval = 360
a1.sinks.k2.hdfs.rollSize = 1174405120
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.batchSize=3000
#控制输出文件类型
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.codeC = gzip


#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

#k1.batchSize+k2.batchSize < c1.capacity

启动命令

 nohup /data/module/flume-1.9.0/bin/flume-ng agent -Xms1024m -Xmx2048m -n a1 -c /data/module/flume-1.9.0/conf -f /data/module/flume-1.9.0/job/test.conf -Dflume.monitoring.type=http -Dflume.monitoring.port=36001  >/dev/null 2>&1 &

http://www.niftyadmin.cn/n/5372656.html

相关文章

H12-821_23

23.网络管理员在某台路由器上查看BGP部居信息,邻居信息如图所示,关于该信息下列说法中正确的是? A.该路由器未从对等体接收到BGP路由前缀 B.本地路由器的Router ID为1.1.1.1 C.与对等体3.3.3 3接收、发送的报文数量分别为10、20个 D.与对等体3.3.3.3之间的邻居关系为IBGP对等体…

基于物联网的实时数据分析(简单介绍)

在当今这个信息化、数字化飞速发展的时代&#xff0c;物联网&#xff08;Internet of Things, IoT&#xff09;和实时数据分析成为了技术革新的两大支柱。对于刚入行的新手来说&#xff0c;理解这两个概念及其相互作用不仅是迈入这一领域的第一步&#xff0c;更是掌握未来技术趋…

9.手写bind

bind 函数的实现步骤&#xff1a; 判断调用对象是否为函数&#xff0c;即使我们是定义在函数的原型上的&#xff0c;但是可能出现使用 call 等方式调用的情况。保存当前函数的引用&#xff0c;获取其余传入参数值。创建一个函数返回函数内部使用 apply 来绑定函数调用&#xf…

PYTHON 120道题目详解(67-69)

67.Python中的字符串常量是用单引号还是双引号定义&#xff1f; 在Python中&#xff0c;字符串常量可以用单引号&#xff08;&#xff09;或双引号&#xff08;"&#xff09;来定义。两种方式都是有效的&#xff0c;并且没有功能上的区别。你可以根据自己的喜好和代码的可…

fyne x86 32位

条件&#xff1a; gcc 32位 任意go环境&#xff08;x86 x64均可&#xff09; fyne 编译&#xff1a; set goarch386 fyne package

P59---第二阶段B C 相电流

P59---第二阶段B C 相电流

hexo部署到gitee(码云)

引言 Hexo 是一个基于Node.js的静态博客框架&#xff0c;而 Gitee&#xff08;也被称为码云&#xff09;是一个国内的代码托管平台&#xff0c;支持 Git 版本控制系统&#xff0c;与 GitHub 类似。将 Hexo 部署到 Gitee Pages 可以让你的博客受益于 Gitee 的国内服务器&#xf…

深入理解Go的垃圾回收机制

导语 如果你是一位Golang的开发者&#xff0c;你一定对于语言特性和详细结构有所了解。但是&#xff0c;你是否曾经停下来深入研究过Golang背后复杂而强大的垃圾回收&#xff08;GC&#xff09;机制&#xff1f;在这篇文章中&#xff0c;我们将具体深入探讨Golang垃圾回收机制…