Flume log collection pipeline (log -> Kafka -> HDFS)

Tags: flume, log4j, kafka, java, hdfs

Tracking data: users hit a business server such as Nginx, and log4j records the client-side tracking events as log lines in a file.

Server log files --> HDFS files

Log files --> Flume (agent: source (interceptor) + channel) --> Kafka topic -->
Flume (agent: source (interceptor) + channel + sink) --> HDFS files

Environment: Hadoop + ZooKeeper + Kafka + Flume

Prepare the log file and place it on the local filesystem (or HDFS).
Log file: appdemo.log

linux>vi appdemo.log
{"eventid":"appClickEvent","event":{"screen_id":"344","screen_name":"","title":"","element_id":"4"},"user":{"uid":"245498","account":"","email":"","phoneNbr":"18248667380","birthday":"","isRegistered":"",
"isLogin":"","addr":"","gender":"","phone":{"imei":"2881993463620531","mac":"2e-80-50-8e-39-a1-1e","imsi":"8616932323350461","osName":"macos","osVer":"9.0","androidId":"","resolution":"1024*768","deviceType":"360_V","deviceId":"81Kau4","uuid":"L3whyU7BgtLKEkvE"},"app":{"appid":"cn.kgc.mall","appVer":"2.0.1","release_ch":"纽扣助手","promotion_ch":"12"},"loc":{"areacode":210921102,"longtitude":121.56605311428365,"latitude":41.91452099352481,"carrier":"ISP02","netType":"WIFI","cid_sn":"463485993989","ip":"138.117.92.76"},"sessionId":"sid-99fe7648-d8e4-4cbe-86af-17b5b3c3a7fc"},"timestamp":"1575548955000"}{"eventid":"appClickEvent","event":{"screen_id":"344","screen_name":"","title":"","element_id":"6"},"user":{"uid":"245498","account":"","email":"","phoneNbr":"18248667380","birthday":"","isRegistered":"",
"isLogin":"","addr":"","gender":"","phone":{"imei":"2881993463620531","mac":"2e-80-50-8e-39-a1-1e","imsi":"8616932323350461","osName":"macos","osVer":"9.0","androidId":"","resolution":"1024*768","deviceType":"360_V","deviceId":"81Kau4","uuid":"L3whyU7BgtLKEkvE"},"app":{"appid":"cn.kgc.mall","appVer":"2.0.1","release_ch":"纽扣助手","promotion_ch":"12"},"loc":{"areacode":210921102,"longtitude":121.56605311428365,"latitude":41.91452099352481,"carrier":"ISP02","netType":"WIFI","cid_sn":"463485993989","ip":"138.117.92.76"},"sessionId":"sid-99fe7648-d8e4-4cbe-86af-17b5b3c3a7fc"},"timestamp":"1575548957000"}{"eventid":"appClickEvent","event":{"screen_id":"344","screen_name":"","title":"","element_id":"8"},"user":{"uid":"245498","account":"","email":"","phoneNbr":"18248667380","birthday":"","isRegistered":"",
"isLogin":"","addr":"","gender":"","phone":{"imei":"2881993463620531","mac":"2e-80-50-8e-39-a1-1e","imsi":"8616932323350461","osName":"macos","osVer":"9.0","androidId":"","resolution":"1024*768","deviceType":"360_V","deviceId":"81Kau4","uuid":"L3whyU7BgtLKEkvE"},"app":{"appid":"cn.kgc.mall","appVer":"2.0.1","release_ch":"纽扣助手","promotion_ch":"12"},"loc":{"areacode":210921102,"longtitude":121.56605311428365,"latitude":41.91452099352481,"carrier":"ISP02","netType":"WIFI","cid_sn":"463485993989","ip":"138.117.92.76"},"sessionId":"sid-99fe7648-d8e4-4cbe-86af-17b5b3c3a7fc"},"timestamp":"1575548959000"}{"eventid":"appClickEvent","event":{"screen_id":"344","screen_name":"","title":"","element_id":"17"},"user":{"uid":"245498","account":"","email":"","phoneNbr":"18248667380","birthday":"","isRegistered":""
,"isLogin":"","addr":"","gender":"","phone":{"imei":"2881993463620531","mac":"2e-80-50-8e-39-a1-1e","imsi":"8616932323350461","osName":"macos","osVer":"9.0","androidId":"","resolution":"1024*768","deviceType":"360_V","deviceId":"81Kau4","uuid":"L3whyU7BgtLKEkvE"},"app":{"appid":"cn.kgc.mall","appVer":"2.0.1","release_ch":"纽扣助手","promotion_ch":"12"},"loc":{"areacode":210921102,"longtitude":121.56605311428365,"latitude":41.91452099352481,"carrier":"ISP02","netType":"WIFI","cid_sn":"463485993989","ip":"138.117.92.76"},"sessionId":"sid-99fe7648-d8e4-4cbe-86af-17b5b3c3a7fc"},"timestamp":"1575548960000"}{"eventid":"appClickEvent","event":{"screen_id":"344","screen_name":"","title":"","element_id":"4"},"user":{"uid":"245498","account":"","email":"","phoneNbr":"18248667380","birthday":"","isRegistered":"",
"isLogin":"","addr":"","gender":"","phone":{"imei":"2881993463620531","mac":"2e-80-50-8e-39-a1-1e","imsi":"8616932323350461","osName":"macos","osVer":"9.0","androidId":"","resolution":"1024*768","deviceType":"360_V","deviceId":"81Kau4","uuid":"L3whyU7BgtLKEkvE"},"app":{"appid":"cn.kgc.mall","appVer":"2.0.1","release_ch":"纽扣助手","promotion_ch":"12"},"loc":{"areacode":210921102,"longtitude":121.56605311428365,"latitude":41.91452099352481,"carrier":"ISP02","netType":"WIFI","cid_sn":"463485993989","ip":"138.117.92.76"},"sessionId":"sid-99fe7648-d8e4-4cbe-86af-17b5b3c3a7fc"},"timestamp":"1575548961000"}{"eventid":"clickChannelEvent","event":{"belongTab":"6","channelIndex":"6","channelID":"6","channelName":""},"user":{"uid":"245498","account":"","email":"","phoneNbr":"18248667380","birthday":"","isRegist
ered":"","isLogin":"","addr":"","gender":"","phone":{"imei":"2881993463620531","mac":"2e-80-50-8e-39-a1-1e","imsi":"8616932323350461","osName":"macos","osVer":"9.0","androidId":"","resolution":"1024*768","deviceType":"360_V","deviceId":"81Kau4","uuid":"L3whyU7BgtLKEkvE"},"app":{"appid":"cn.kgc.mall","appVer":"2.0.1","release_ch":"纽扣助手","promotion_ch":"12"},"loc":{"areacode":210921102,"longtitude":121.56605311428365,"latitude":41.91452099352481,"carrier":"ISP02","netType":"WIFI","cid_sn":"463485993989","ip":"138.117.92.76"},"sessionId":"sid-99fe7648-d8e4-4cbe-86af-17b5b3c3a7fc"},"timestamp":"1575548963000"}{"eventid":"appviewEvent","event":{"screen_id":"681","screen_name":"","title":""},"user":{"uid":"611429","account":"","email":"","phoneNbr":"15669989777","birthday":"","isRegistered":"","isLogin":"","addr
":"","gender":"","phone":{"imei":"2478250569537801","mac":"a1-27-fd-a6-9a-0a-c9","imsi":"5597152087882061","osName":"windows","osVer":"9.0","androidId":"","resolution":"800*600","deviceType":"XIAOLAJIAO","deviceId":"2Ry864","uuid":"EFVu1NVmld9Dpykq"},"app":{"appid":"cn.kgc.mall","appVer":"1.0.0","release_ch":"应用宝","promotion_ch":"04"},"loc":{"areacode":130403205,"longtitude":114.43980636414214,"latitude":36.690319995072464,"carrier":"ISP07","netType":"N","cid_sn":"021803165366","ip":"42.24.217.87"},"sessionId":"sid-e760ea4b-5016-4eb3-bd6c-a9eaa2f27314"},"timestamp":"1575595490000"}{"eventid":"webStayEvent","event":{"pgid":"681","title":"","url":"http://www.kgcedu.cn/acc/pg681"},"user":{"uid":"611429","account":"","email":"","phoneNbr":"15669989777","birthday":"","isRegistered":"","
isLogin":"","addr":"","gender":"","phone":{"imei":"2478250569537801","mac":"a1-27-fd-a6-9a-0a-c9","imsi":"5597152087882061","osName":"windows","osVer":"9.0","androidId":"","resolution":"800*600","deviceType":"XIAOLAJIAO","deviceId":"2Ry864","uuid":"EFVu1NVmld9Dpykq"},"app":{"appid":"cn.kgc.mall","appVer":"1.0.0","release_ch":"应用宝","promotion_ch":"04"},"loc":{"areacode":130403205,"longtitude":114.43980636414214,"latitude":36.690319995072464,"carrier":"ISP07","netType":"N","cid_sn":"021803165366","ip":"42.24.217.87"},"sessionId":"sid-e760ea4b-5016-4eb3-bd6c-a9eaa2f27314"},"timestamp":"1575595491000"}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>test.flume</groupId>
    <artifactId>flume-interceptor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.69</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

IDEA: write a JSON interceptor that keeps valid JSON records and drops everything else

package flume.intercepter;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

// First interceptor: keep only events whose body is a valid JSON record
public class JsonInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        /*
         * Data moves through the agent as events (header + body);
         * with a file source, each line becomes one event.
         */

        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        // Keep the event only if the record is a complete, valid JSON document
        if (JsonUtil.isJSONValidate(log)) {
            return event;
        } else {
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> events) {

        Iterator<Event> iterator = events.iterator();
        while (iterator.hasNext()) {
            Event next = iterator.next();
            if (intercept(next) == null) {
                iterator.remove();
            }
        }
        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new JsonInterceptor();
        }


        @Override
        public void configure(Context context) {

        }
    }
}
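
The JsonInterceptor above calls a JsonUtil helper that the original post does not include, so the class will not compile as shown. Below is a minimal sketch of that helper, assuming all it does is check whether a line parses as a JSON object with fastjson; the names JsonUtil and isJSONValidate come from the call site above, and the real implementation may differ:

package flume.intercepter;

import com.alibaba.fastjson.JSON;

// Helper assumed by JsonInterceptor: a line counts as valid if fastjson can parse it as a JSON object.
public class JsonUtil {

    public static boolean isJSONValidate(String log) {
        if (log == null || log.isEmpty()) {
            return false;                // empty lines are dropped
        }
        try {
            JSON.parseObject(log);       // throws on malformed input
            return true;
        } catch (Exception e) {
            return false;                // any parse failure means the line is not valid JSON
        }
    }
}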

IDEA: write a timestamp interceptor (so that when Flume writes to HDFS, the partition time comes from the record's own timestamp rather than the time of the HDFS write)

package flume.intercepter;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Second interceptor: extract the timestamp from the body and put it into the event headers
public class TimestampIntercepter implements Interceptor {

    private final List<Event> events = new ArrayList<>();   // buffer reused for each batch of events

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();                  // event headers
        String log = new String(event.getBody(), StandardCharsets.UTF_8);  // event body

        JSONObject jsonObject = JSON.parseObject(log);   // parse the JSON record
        String ts = jsonObject.getString("timestamp");

        // Use the timestamp from the record itself; otherwise the HDFS sink
        // would fall back to the system time when the data is written out.
        headers.put("timestamp", ts);

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        events.clear();
        for (Event event : list) {
            events.add(intercept(event));
        }
        return events;
    }

    @Override
    public void close() {
    }


    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampIntercepter();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
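
Before packaging the jar, both interceptors can be sanity-checked locally. The class below is a hypothetical test harness (not part of the original post) that assumes the JsonUtil sketch above is on the classpath; it builds events with Flume's EventBuilder and pushes one valid and one malformed record through the interceptors:

package flume.intercepter;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.StandardCharsets;

// Hypothetical local check: run sample records through both interceptors and print the outcome.
public class InterceptorLocalTest {

    public static void main(String[] args) {
        String goodLine = "{\"eventid\":\"appClickEvent\",\"timestamp\":\"1575548955000\"}";
        String badLine  = "not a json line";

        JsonInterceptor jsonInterceptor = new JsonInterceptor();
        TimestampIntercepter tsInterceptor = new TimestampIntercepter();

        Event goodEvent = EventBuilder.withBody(goodLine, StandardCharsets.UTF_8);
        Event badEvent  = EventBuilder.withBody(badLine, StandardCharsets.UTF_8);

        // The JSON interceptor keeps well-formed records and drops everything else.
        System.out.println("good kept: " + (jsonInterceptor.intercept(goodEvent) != null));  // true
        System.out.println("bad kept:  " + (jsonInterceptor.intercept(badEvent) != null));   // false

        // The timestamp interceptor copies the record's timestamp into the headers;
        // the HDFS sink later uses this header to resolve %Y-%m-%d in hdfs.path.
        tsInterceptor.intercept(goodEvent);
        System.out.println("timestamp header: " + goodEvent.getHeaders().get("timestamp"));
    }
}

If the timestamp header is missing, the HDFS sink cannot resolve %Y-%m-%d in hdfs.path (unless hdfs.useLocalTimeStamp is enabled), which is why the KafkaToHDFS agent registers this interceptor.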

Write the Flume agent configuration files.
LogToKafka:

linux>vi log_to_kafka.conf
## Name the agent's components
logToKafka.sources = logtokafkasource
logToKafka.channels = logkafkachannel

# Describe the source
logToKafka.sources.logtokafkasource.type = TAILDIR
logToKafka.sources.logtokafkasource.filegroups = f1
logToKafka.sources.logtokafkasource.filegroups.f1 = /root/tmp_data/log/appdemo.*
logToKafka.sources.logtokafkasource.positionFile = /root/tmp_data/log/taildir_position.json
logToKafka.sources.logtokafkasource.interceptors =  i1
logToKafka.sources.logtokafkasource.interceptors.i1.type = flume.intercepter.JsonInterceptor$Builder
# filegroups can list multiple file groups/directories; positionFile records how far each file has been read
# Describe the channel
logToKafka.channels.logkafkachannel.type = org.apache.flume.channel.kafka.KafkaChannel
logToKafka.channels.logkafkachannel.kafka.bootstrap.servers = 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092
logToKafka.channels.logkafkachannel.kafka.topic = appdemo_log
logToKafka.channels.logkafkachannel.parseAsFlumeEvent = false

# Bind the source to the channel (no sink is needed here: the Kafka channel writes straight to the topic)
logToKafka.sources.logtokafkasource.channels = logkafkachannel

KafkaToHDFS:

linux>vi kafka_to_hdfs.conf
## Components
a1.sources=r1
a1.channels=c1
a1.sinks=k1

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092
a1.sources.r1.kafka.topics=appdemo_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = flume.intercepter.TimestampIntercepter$Builder

## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /root/tmp_data/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /root/tmp_data/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /data/log/appdemo_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

## Keep the output as plain files (uncomment the two lines below for lzop-compressed output).
#a1.sinks.k1.hdfs.fileType = CompressedStream
#a1.sinks.k1.hdfs.codeC = lzop

## Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

Note: before running kafka_to_hdfs.conf, the local directories used by the file channel must already exist.

a1.channels.c1.checkpointDir = /root/tmp_data/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /root/tmp_data/flume/data/behavior1/

Create the directories for these two settings:

linux>mkdir -p /root/tmp_data/flume/checkpoint/behavior1
linux>mkdir -p /root/tmp_data/flume/data/behavior1/

Note: once the interceptors are finished in IDEA, build the project (mvn package produces the jar-with-dependencies) and move the jar into flume/lib:

linux>mv flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/install/flume/lib

Run the Flume agent for LogToKafka:

linux>bin/flume-ng agent --conf conf --conf-file log_to_kafka.conf --name logToKafka -Dflume.root.logger=INFO,console

Run the Flume agent for KafkaToHDFS:

linux>bin/flume-ng agent --conf conf --conf-file kafka_to_hdfs.conf --name a1 -Dflume.root.logger=INFO,console

Error 1: after the agents run, no files appear on HDFS. Reset the consumer group's offsets on the Kafka topic; otherwise the topic is not consumed from the beginning and data is missing.

Resetting offsets on older Kafka versions:
linux>kafka-streams-application-reset.sh --zookeeper 192.168.58.201:2181,192.168.58.202:2181,192.168.58.203:2181 --bootstrap-servers 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --application-id flume --input-topics appdemo_log
Checking the number of records in the topic on older Kafka versions:
linux>bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --topic appdemo_log --time -1 --offsets 1

Resetting offsets on newer Kafka versions (stop the KafkaToHDFS agent first; offsets can only be reset while the consumer group has no active members):
linux>kafka-consumer-groups.sh --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --group flume --reset-offsets --topic appdemo_log --to-earliest --execute
# List consumer groups
linux>kafka-consumer-groups.sh --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --list
# List all topics
linux>kafka-topics.sh --list --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092

# Describe a topic
linux>kafka-topics.sh --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --describe --topic appdemo_log

# Delete a topic
linux>kafka-topics.sh --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --delete --topic appdemo_log
# Check the number of records in the topic
linux>kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --topic appdemo_log --time -1

# Consume messages (the broker address/port must match the Flume configuration, i.e. the producer side)
linux>kafka-console-consumer.sh --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --topic appdemo_log --from-beginning   # with --from-beginning: consume every message from the start
linux>kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic appdemo_log        # without --from-beginning: consume only messages arriving after the consumer starts


# Query the topic's offset range: minimum offset
linux>kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --topic appdemo_log --time -2
# Maximum offset
linux>bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --topic appdemo_log --time -1

Error 2: java.lang.OutOfMemoryError: Java heap space (the Flume agent runs out of heap)

### Fix the Flume OOM by increasing the JVM heap in flume-env.sh
linux>vi flume-env.sh
export JAVA_OPTS="-Xms2048m -Xmx2048m -Xss256k -Xmn1g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"

Result: after both agents have run, files with the log- prefix should appear on HDFS under /data/log/appdemo_log/<date>/, where the date comes from each record's timestamp header.

