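Tracking data: users' requests reach the business servers (for example through Nginx), and the application uses log4j to write the client-side tracking data to log files.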
Overall flow: server log files --> HDFS files
In detail: log files --> Flume (agent: source (interceptor) + channel) --> Kafka topic --> Flume (agent: source (interceptor) + channel + sink) --> HDFS files
Environment: Hadoop + ZooKeeper + Kafka + Flume
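Prepare the log file. The TAILDIR source used below tails files on the local filesystem, so put the file on the Flume host (here under /root/tmp_data/log/).
Log file appdemo.log (one JSON record per line):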
linux>vi appdemo.log
{"eventid":"appClickEvent","event":{"screen_id":"344","screen_name":"","title":"","element_id":"4"},"user":{"uid":"245498","account":"","email":"","phoneNbr":"18248667380","birthday":"","isRegistered":"","isLogin":"","addr":"","gender":"","phone":{"imei":"2881993463620531","mac":"2e-80-50-8e-39-a1-1e","imsi":"8616932323350461","osName":"macos","osVer":"9.0","androidId":"","resolution":"1024*768","deviceType":"360_V","deviceId":"81Kau4","uuid":"L3whyU7BgtLKEkvE"},"app":{"appid":"cn.kgc.mall","appVer":"2.0.1","release_ch":"纽扣助手","promotion_ch":"12"},"loc":{"areacode":210921102,"longtitude":121.56605311428365,"latitude":41.91452099352481,"carrier":"ISP02","netType":"WIFI","cid_sn":"463485993989","ip":"138.117.92.76"},"sessionId":"sid-99fe7648-d8e4-4cbe-86af-17b5b3c3a7fc"},"timestamp":"1575548955000"}
{"eventid":"appClickEvent","event":{"screen_id":"344","screen_name":"","title":"","element_id":"6"},"user":{"uid":"245498","account":"","email":"","phoneNbr":"18248667380","birthday":"","isRegistered":"","isLogin":"","addr":"","gender":"","phone":{"imei":"2881993463620531","mac":"2e-80-50-8e-39-a1-1e","imsi":"8616932323350461","osName":"macos","osVer":"9.0","androidId":"","resolution":"1024*768","deviceType":"360_V","deviceId":"81Kau4","uuid":"L3whyU7BgtLKEkvE"},"app":{"appid":"cn.kgc.mall","appVer":"2.0.1","release_ch":"纽扣助手","promotion_ch":"12"},"loc":{"areacode":210921102,"longtitude":121.56605311428365,"latitude":41.91452099352481,"carrier":"ISP02","netType":"WIFI","cid_sn":"463485993989","ip":"138.117.92.76"},"sessionId":"sid-99fe7648-d8e4-4cbe-86af-17b5b3c3a7fc"},"timestamp":"1575548957000"}
{"eventid":"appClickEvent","event":{"screen_id":"344","screen_name":"","title":"","element_id":"8"},"user":{"uid":"245498","account":"","email":"","phoneNbr":"18248667380","birthday":"","isRegistered":"","isLogin":"","addr":"","gender":"","phone":{"imei":"2881993463620531","mac":"2e-80-50-8e-39-a1-1e","imsi":"8616932323350461","osName":"macos","osVer":"9.0","androidId":"","resolution":"1024*768","deviceType":"360_V","deviceId":"81Kau4","uuid":"L3whyU7BgtLKEkvE"},"app":{"appid":"cn.kgc.mall","appVer":"2.0.1","release_ch":"纽扣助手","promotion_ch":"12"},"loc":{"areacode":210921102,"longtitude":121.56605311428365,"latitude":41.91452099352481,"carrier":"ISP02","netType":"WIFI","cid_sn":"463485993989","ip":"138.117.92.76"},"sessionId":"sid-99fe7648-d8e4-4cbe-86af-17b5b3c3a7fc"},"timestamp":"1575548959000"}
{"eventid":"appClickEvent","event":{"screen_id":"344","screen_name":"","title":"","element_id":"17"},"user":{"uid":"245498","account":"","email":"","phoneNbr":"18248667380","birthday":"","isRegistered":"","isLogin":"","addr":"","gender":"","phone":{"imei":"2881993463620531","mac":"2e-80-50-8e-39-a1-1e","imsi":"8616932323350461","osName":"macos","osVer":"9.0","androidId":"","resolution":"1024*768","deviceType":"360_V","deviceId":"81Kau4","uuid":"L3whyU7BgtLKEkvE"},"app":{"appid":"cn.kgc.mall","appVer":"2.0.1","release_ch":"纽扣助手","promotion_ch":"12"},"loc":{"areacode":210921102,"longtitude":121.56605311428365,"latitude":41.91452099352481,"carrier":"ISP02","netType":"WIFI","cid_sn":"463485993989","ip":"138.117.92.76"},"sessionId":"sid-99fe7648-d8e4-4cbe-86af-17b5b3c3a7fc"},"timestamp":"1575548960000"}
{"eventid":"appClickEvent","event":{"screen_id":"344","screen_name":"","title":"","element_id":"4"},"user":{"uid":"245498","account":"","email":"","phoneNbr":"18248667380","birthday":"","isRegistered":"","isLogin":"","addr":"","gender":"","phone":{"imei":"2881993463620531","mac":"2e-80-50-8e-39-a1-1e","imsi":"8616932323350461","osName":"macos","osVer":"9.0","androidId":"","resolution":"1024*768","deviceType":"360_V","deviceId":"81Kau4","uuid":"L3whyU7BgtLKEkvE"},"app":{"appid":"cn.kgc.mall","appVer":"2.0.1","release_ch":"纽扣助手","promotion_ch":"12"},"loc":{"areacode":210921102,"longtitude":121.56605311428365,"latitude":41.91452099352481,"carrier":"ISP02","netType":"WIFI","cid_sn":"463485993989","ip":"138.117.92.76"},"sessionId":"sid-99fe7648-d8e4-4cbe-86af-17b5b3c3a7fc"},"timestamp":"1575548961000"}
{"eventid":"clickChannelEvent","event":{"belongTab":"6","channelIndex":"6","channelID":"6","channelName":""},"user":{"uid":"245498","account":"","email":"","phoneNbr":"18248667380","birthday":"","isRegistered":"","isLogin":"","addr":"","gender":"","phone":{"imei":"2881993463620531","mac":"2e-80-50-8e-39-a1-1e","imsi":"8616932323350461","osName":"macos","osVer":"9.0","androidId":"","resolution":"1024*768","deviceType":"360_V","deviceId":"81Kau4","uuid":"L3whyU7BgtLKEkvE"},"app":{"appid":"cn.kgc.mall","appVer":"2.0.1","release_ch":"纽扣助手","promotion_ch":"12"},"loc":{"areacode":210921102,"longtitude":121.56605311428365,"latitude":41.91452099352481,"carrier":"ISP02","netType":"WIFI","cid_sn":"463485993989","ip":"138.117.92.76"},"sessionId":"sid-99fe7648-d8e4-4cbe-86af-17b5b3c3a7fc"},"timestamp":"1575548963000"}
{"eventid":"appviewEvent","event":{"screen_id":"681","screen_name":"","title":""},"user":{"uid":"611429","account":"","email":"","phoneNbr":"15669989777","birthday":"","isRegistered":"","isLogin":"","addr":"","gender":"","phone":{"imei":"2478250569537801","mac":"a1-27-fd-a6-9a-0a-c9","imsi":"5597152087882061","osName":"windows","osVer":"9.0","androidId":"","resolution":"800*600","deviceType":"XIAOLAJIAO","deviceId":"2Ry864","uuid":"EFVu1NVmld9Dpykq"},"app":{"appid":"cn.kgc.mall","appVer":"1.0.0","release_ch":"应用宝","promotion_ch":"04"},"loc":{"areacode":130403205,"longtitude":114.43980636414214,"latitude":36.690319995072464,"carrier":"ISP07","netType":"N","cid_sn":"021803165366","ip":"42.24.217.87"},"sessionId":"sid-e760ea4b-5016-4eb3-bd6c-a9eaa2f27314"},"timestamp":"1575595490000"}
{"eventid":"webStayEvent","event":{"pgid":"681","title":"","url":"http://www.kgcedu.cn/acc/pg681"},"user":{"uid":"611429","account":"","email":"","phoneNbr":"15669989777","birthday":"","isRegistered":"","isLogin":"","addr":"","gender":"","phone":{"imei":"2478250569537801","mac":"a1-27-fd-a6-9a-0a-c9","imsi":"5597152087882061","osName":"windows","osVer":"9.0","androidId":"","resolution":"800*600","deviceType":"XIAOLAJIAO","deviceId":"2Ry864","uuid":"EFVu1NVmld9Dpykq"},"app":{"appid":"cn.kgc.mall","appVer":"1.0.0","release_ch":"应用宝","promotion_ch":"04"},"loc":{"areacode":130403205,"longtitude":114.43980636414214,"latitude":36.690319995072464,"carrier":"ISP07","netType":"N","cid_sn":"021803165366","ip":"42.24.217.87"},"sessionId":"sid-e760ea4b-5016-4eb3-bd6c-a9eaa2f27314"},"timestamp":"1575595491000"}
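Because TAILDIR turns every line into one event, each record must sit on its own line. Records that are glued together (}{) or wrapped across lines, as happens when this sample is copy-pasted, will not pass a JSON check. Below is a minimal stdlib-only sketch of a hypothetical repair tool, RecordSplitter. It assumes the file contains only JSON objects. It drops raw line breaks, which are never legal inside JSON strings, and cuts the text wherever a top-level {...} closes, ignoring braces inside string literals.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: re-split glued or wrapped JSON records into one record per line.
final class RecordSplitter {

    static List<String> split(String text) {
        List<String> records = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int depth = 0;            // nesting level of { } outside string literals
        boolean inString = false; // currently inside "..."
        boolean escaped = false;  // previous char was a backslash inside a string
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (c == '\n' || c == '\r') {
                continue; // treat raw line breaks as wrap artifacts and drop them
            }
            if (depth == 0 && c != '{') {
                continue; // skip whitespace or junk between records
            }
            current.append(c);
            if (inString) {
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
            } else if (c == '"') {
                inString = true;
            } else if (c == '{') {
                depth++;
            } else if (c == '}') {
                depth--;
                if (depth == 0) { // a top-level object just closed
                    records.add(current.toString());
                    current.setLength(0);
                }
            }
        }
        return records;
    }

    // Usage: java RecordSplitter broken.log appdemo.log
    public static void main(String[] args) throws IOException {
        String text = new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
        Files.write(Paths.get(args[1]), split(text), StandardCharsets.UTF_8);
    }
}
```

Tracking string state is what keeps a } inside a value such as "x}" from ending a record early.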
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>test.flume</groupId>
    <artifactId>flume-interceptor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.69</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
IDEA: write a JSON interceptor that keeps only valid JSON records
package flume.intercepter;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

// Interceptor 1: drop every event whose body is not a complete JSON record
public class JsonInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        /*
         * Data travels through a Flume agent as events (headers + body).
         * With a file source, each line of the file becomes one event.
         */
        String log = new String(event.getBody(), StandardCharsets.UTF_8);
        // Keep the event only if the line is a complete JSON record
        // (JsonUtil is a helper class in the same package; it is not shown in this post)
        if (JsonUtil.isJSONValidate(log)) {
            return event;
        }
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        // Remove invalid events from the batch in place
        Iterator<Event> iterator = events.iterator();
        while (iterator.hasNext()) {
            if (intercept(iterator.next()) == null) {
                iterator.remove();
            }
        }
        return events;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new JsonInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
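JsonInterceptor calls JsonUtil.isJSONValidate, but the post does not show that helper. Since fastjson is already in the pom, a library-based check would be the usual choice. What follows is only a stdlib-only sketch of a hypothetical implementation, so that it compiles and can be tested on its own. It is a small recursive-descent checker that accepts a line only if the whole line is exactly one JSON object. That also rejects two records glued together. Number syntax is checked leniently.

```java
// Hypothetical stand-in for JsonUtil (not shown in the post); add "package flume.intercepter;" to use it there.
final class JsonUtil {
    private final String s;
    private int pos;

    private JsonUtil(String s) { this.s = s; }

    /** True only if the whole string is exactly one well-formed JSON object. */
    public static boolean isJSONValidate(String text) {
        if (text == null) return false;
        JsonUtil p = new JsonUtil(text.trim());
        try {
            if (p.peek() != '{') return false;
            p.value();
            return p.pos == p.s.length(); // rejects trailing data, e.g. a second glued-on record
        } catch (IllegalStateException e) {
            return false;
        }
    }

    private void skipWs() {
        while (pos < s.length() && Character.isWhitespace(s.charAt(pos))) pos++;
    }

    private char peek() {
        skipWs();
        if (pos >= s.length()) throw new IllegalStateException("unexpected end of input");
        return s.charAt(pos);
    }

    private void expect(char c) {
        if (peek() != c) throw new IllegalStateException("expected " + c);
        pos++;
    }

    private void value() {
        char c = peek();
        if (c == '{') object();
        else if (c == '[') array();
        else if (c == '"') string();
        else if (c == '-' || (c >= '0' && c <= '9')) number();
        else if (s.startsWith("true", pos)) pos += 4;
        else if (s.startsWith("false", pos)) pos += 5;
        else if (s.startsWith("null", pos)) pos += 4;
        else throw new IllegalStateException("unexpected character " + c);
        skipWs();
    }

    private void object() {
        expect('{');
        if (peek() == '}') { pos++; return; }
        while (true) {
            if (peek() != '"') throw new IllegalStateException("object keys must be strings");
            string();
            expect(':');
            value();
            char c = peek();
            pos++;
            if (c == '}') return;
            if (c != ',') throw new IllegalStateException("expected ',' or '}'");
        }
    }

    private void array() {
        expect('[');
        if (peek() == ']') { pos++; return; }
        while (true) {
            value();
            char c = peek();
            pos++;
            if (c == ']') return;
            if (c != ',') throw new IllegalStateException("expected ',' or ']'");
        }
    }

    private void string() {
        expect('"');
        while (pos < s.length()) {
            char c = s.charAt(pos++);
            if (c == '"') return;
            if (c < 0x20) throw new IllegalStateException("raw control character in string");
            if (c == '\\') {
                if (pos >= s.length()) break;
                char e = s.charAt(pos++);
                if (e == 'u') { // four hex digits must follow
                    if (pos + 4 > s.length()) break;
                    for (int i = 0; i < 4; i++) {
                        if (Character.digit(s.charAt(pos++), 16) < 0) throw new IllegalStateException("bad hex digit");
                    }
                } else if ("\"\\/bfnrt".indexOf(e) < 0) {
                    throw new IllegalStateException("bad escape");
                }
            }
        }
        throw new IllegalStateException("unterminated string");
    }

    // Lenient: scans number characters and lets Double.parseDouble decide (accepts e.g. leading zeros).
    private void number() {
        int start = pos;
        while (pos < s.length() && "+-.eE0123456789".indexOf(s.charAt(pos)) >= 0) pos++;
        try {
            Double.parseDouble(s.substring(start, pos));
        } catch (NumberFormatException e) {
            throw new IllegalStateException("bad number");
        }
    }
}
```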
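IDEA: write a timestamp interceptor (so that when Flume writes to HDFS, the date comes from the data itself rather than from the time the data reaches HDFS)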
package flume.intercepter;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Interceptor 2: extract the timestamp from the log record and put it into the headers
public class TimestampIntercepter implements Interceptor {

    // Output list reused for every batch
    private final List<Event> events = new ArrayList<>();

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();                 // event headers
        String log = new String(event.getBody(), StandardCharsets.UTF_8); // event body
        JSONObject jsonObject = JSON.parseObject(log);                    // parse the JSON record
        String ts = jsonObject == null ? null : jsonObject.getString("timestamp");
        // Use the time recorded in the log; otherwise files may be bucketed by system time when written
        if (ts != null) {
            headers.put("timestamp", ts);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        events.clear();
        for (Event event : list) {
            events.add(intercept(event));
        }
        return events;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampIntercepter();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
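With the header set, the HDFS sink resolves the %Y-%m-%d escape in hdfs.path from the timestamp header (epoch milliseconds). It uses the sink's hdfs.timeZone, which defaults to local time. The stdlib-only sketch below imitates that resolution for two of the sample records. HdfsBucketDemo and bucketFor are hypothetical names, and this is not Flume's own path-escaping code. The Asia/Shanghai zone is an assumption about the Flume host. The output shows why the interceptor matters: records logged on 5 and 6 December 2019 land in different directories according to when they were logged, not when they were consumed.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: imitates how hdfs.path = /data/log/appdemo_log/%Y-%m-%d is resolved
// from the "timestamp" header; this is not Flume's own implementation.
final class HdfsBucketDemo {
    static final String BASE = "/data/log/appdemo_log/";
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    /** Returns the dated directory for an event, using its "timestamp" header (epoch millis). */
    static String bucketFor(Map<String, String> headers, ZoneId zone) {
        String ts = headers.get("timestamp");
        if (ts == null) {
            // this sketch simply rejects events without the header
            throw new IllegalArgumentException("missing timestamp header");
        }
        return BASE + DAY.withZone(zone).format(Instant.ofEpochMilli(Long.parseLong(ts)));
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("Asia/Shanghai"); // assumed time zone of the Flume host
        Map<String, String> headers = new HashMap<>();
        headers.put("timestamp", "1575548955000"); // first appClickEvent in appdemo.log
        System.out.println(bucketFor(headers, zone)); // prints /data/log/appdemo_log/2019-12-05
        headers.put("timestamp", "1575595490000"); // appviewEvent
        System.out.println(bucketFor(headers, zone)); // prints /data/log/appdemo_log/2019-12-06
    }
}
```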
Write the Flume agent configuration files
LogToKafka:
linux>vi log_to_kafka.conf
## Name the components
logToKafka.sources = logtokafkasource
logToKafka.channels = logkafkachannel
# Describe the source
logToKafka.sources.logtokafkasource.type = TAILDIR
logToKafka.sources.logtokafkasource.filegroups = f1
logToKafka.sources.logtokafkasource.filegroups.f1 = /root/tmp_data/log/appdemo.*
logToKafka.sources.logtokafkasource.positionFile = /root/tmp_data/log/taildir_position.json
logToKafka.sources.logtokafkasource.interceptors = i1
logToKafka.sources.logtokafkasource.interceptors.i1.type = flume.intercepter.JsonInterceptor$Builder
# filegroups can list several groups of files; positionFile records how far each file has been read
# Describe the channel
logToKafka.channels.logkafkachannel.type = org.apache.flume.channel.kafka.KafkaChannel
logToKafka.channels.logkafkachannel.kafka.bootstrap.servers = 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092
logToKafka.channels.logkafkachannel.kafka.topic = appdemo_log
logToKafka.channels.logkafkachannel.parseAsFlumeEvent = false
# Bind the source to the channel (this agent needs no sink: the Kafka channel writes events straight to the topic)
logToKafka.sources.logtokafkasource.channels = logkafkachannel
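To exercise this agent without a real application server, new lines can be appended to a file matched by filegroups.f1 (/root/tmp_data/log/appdemo.*). TAILDIR then ships each new line as one event. Below is a minimal sketch of such a hypothetical test helper, LogAppender. It refuses records that contain line breaks, because those would be split into several events.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Collections;

// Hypothetical test helper: appends records to a tailed file so that TAILDIR picks them up.
final class LogAppender {

    /** Appends one record as exactly one line, creating the file if it does not exist. */
    static void append(Path file, String record) throws IOException {
        if (record.indexOf('\n') >= 0 || record.indexOf('\r') >= 0) {
            throw new IllegalArgumentException("a record must fit on a single line");
        }
        Files.write(file, Collections.singletonList(record), StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    // Usage: java LogAppender /root/tmp_data/log/appdemo.log '{"eventid":"appClickEvent","timestamp":"1575548955000"}'
    public static void main(String[] args) throws IOException {
        append(Paths.get(args[0]), args[1]);
    }
}
```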
KafkaToHDFS:
linux>vi kafka_to_hdfs.conf
## Components
a1.sources=r1
a1.channels=c1
a1.sinks=k1
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092
a1.sources.r1.kafka.topics=appdemo_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = flume.intercepter.TimestampIntercepter$Builder
## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /root/tmp_data/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /root/tmp_data/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /data/log/appdemo_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
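## Output format: hdfs.fileType defaults to SequenceFile; DataStream writes the raw text lines
a1.sinks.k1.hdfs.fileType = DataStream
## Alternative: LZO-compressed output (requires the LZO codec to be installed on the cluster)
#a1.sinks.k1.hdfs.fileType = CompressedStream
#a1.sinks.k1.hdfs.codeC = lzop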
## Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
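The three roll settings in this sink combine as follows. A new HDFS file is started after 10 seconds or once the current file reaches 134217728 bytes (128 MiB), whichever comes first, and rollCount = 0 disables the event-count trigger. The hypothetical class below only models that decision to make the settings concrete. It is not Flume's implementation.

```java
// Hypothetical model of the roll settings in kafka_to_hdfs.conf; a value of 0 disables that trigger.
final class RollPolicy {
    private final long rollIntervalSec; // hdfs.rollInterval
    private final long rollSizeBytes;   // hdfs.rollSize
    private final long rollCount;       // hdfs.rollCount

    RollPolicy(long rollIntervalSec, long rollSizeBytes, long rollCount) {
        this.rollIntervalSec = rollIntervalSec;
        this.rollSizeBytes = rollSizeBytes;
        this.rollCount = rollCount;
    }

    /** True if the open file should be closed and a new one started: any enabled trigger is enough. */
    boolean shouldRoll(long openSeconds, long bytesWritten, long eventsWritten) {
        boolean byTime = rollIntervalSec > 0 && openSeconds >= rollIntervalSec;
        boolean bySize = rollSizeBytes > 0 && bytesWritten >= rollSizeBytes;
        boolean byCount = rollCount > 0 && eventsWritten >= rollCount;
        return byTime || bySize || byCount;
    }

    public static void main(String[] args) {
        // Values from the sink config: 10 s, 128 * 1024 * 1024 bytes, count trigger off
        RollPolicy policy = new RollPolicy(10, 128L * 1024 * 1024, 0);
        System.out.println(policy.shouldRoll(10, 1_000, 5));        // true: 10 s elapsed
        System.out.println(policy.shouldRoll(3, 134217728L, 50));   // true: 128 MiB reached
        System.out.println(policy.shouldRoll(3, 1_000, 1_000_000)); // false: count trigger disabled
    }
}
```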
Note: before running kafka_to_hdfs.conf, create the local directories configured as a1.channels.c1.checkpointDir and a1.channels.c1.dataDirs:
linux>mkdir -p /root/tmp_data/flume/checkpoint/behavior1
linux>mkdir -p /root/tmp_data/flume/data/behavior1/
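Note: after writing the interceptors in IDEA, build them with mvn package (the assembly plugin produces a jar-with-dependencies) and move the jar into flume/lib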
linux>mv flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/install/flume/lib
Run the logToKafka agent
linux>bin/flume-ng agent --conf conf --conf-file log_to_kafka.conf --name logToKafka -Dflume.root.logger=INFO,console
Run the KafkaToHDFS agent (agent name a1)
linux>bin/flume-ng agent --conf conf --conf-file kafka_to_hdfs.conf --name a1 -Dflume.root.logger=INFO,console
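Problem 1: the agents run but no files appear in HDFS. Reset the Kafka topic offsets of the consumer group; otherwise the source reads nothing or misses data (Flume's Kafka source uses the consumer group id flume by default).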
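Older Kafka versions, resetting offsets (the tool uses --application-id as the consumer group id, so flume here is the Kafka source's group):
linux>kafka-streams-application-reset.sh --zookeeper 192.168.58.201:2181,192.168.58.202:2181,192.168.58.203:2181 --bootstrap-servers 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --application-id flume --input-topics appdemo_log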
Older Kafka versions, number of records in a topic (latest offset of each partition):
linux>bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --topic appdemo_log --time -1 --offsets 1
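Newer Kafka versions, resetting offsets (stop the KafkaToHDFS agent first; offsets can only be reset while the consumer group has no active members):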
linux>kafka-consumer-groups.sh --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --group flume --reset-offsets --topic appdemo_log --to-earliest --execute
# List consumer groups
linux>kafka-consumer-groups.sh --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --list
# List all topics
linux>kafka-topics.sh --list --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092
# Describe a topic
linux>kafka-topics.sh --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --describe --topic appdemo_log
# Delete a topic
linux>kafka-topics.sh --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --delete --topic appdemo_log
# Number of records in a topic (latest offset per partition; this equals the record count only if retention has not deleted anything)
linux>kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --topic appdemo_log --time -1
# Consume messages (use the address and port the brokers actually listen on, i.e. the same one the producer sends to)
linux>kafka-console-consumer.sh --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --topic appdemo_log --from-beginning   # with --from-beginning: consume every message from the start
linux>kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic appdemo_log   # without --from-beginning: consume only messages that arrive after the consumer starts
# Offset range of a topic: minimum (earliest) offset
linux>kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --topic appdemo_log --time -2
# Maximum (latest) offset:
linux>bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --topic appdemo_log --time -1
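The earliest (--time -2) and latest (--time -1) offsets together give the number of records a topic currently holds: the sum over partitions of latest minus earliest. Below is a stdlib-only sketch that does this arithmetic. It assumes each output line has the topic:partition:offset form that GetOffsetShell prints. OffsetMath is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: turns two GetOffsetShell outputs (--time -2 and --time -1) into a record count.
final class OffsetMath {

    /** Parses lines of the assumed form topic:partition:offset into partition -> offset. */
    static Map<Integer, Long> parse(String output) {
        Map<Integer, Long> offsets = new HashMap<>();
        for (String raw : output.split("\\R")) {
            String line = raw.trim();
            if (line.isEmpty()) {
                continue;
            }
            String[] parts = line.split(":"); // Kafka topic names cannot contain ':'
            if (parts.length != 3) {
                throw new IllegalArgumentException("unexpected line: " + line);
            }
            offsets.put(Integer.parseInt(parts[1]), Long.parseLong(parts[2]));
        }
        return offsets;
    }

    /** Records currently retained = sum over partitions of (latest - earliest). */
    static long recordCount(String earliestOutput, String latestOutput) {
        Map<Integer, Long> earliest = parse(earliestOutput);
        long total = 0;
        for (Map.Entry<Integer, Long> e : parse(latestOutput).entrySet()) {
            total += e.getValue() - earliest.getOrDefault(e.getKey(), 0L);
        }
        return total;
    }
}
```

Paste the two command outputs into recordCount as strings. A partition missing from the earliest output is treated as starting at offset 0.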
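Problem 2: java.lang.OutOfMemoryError: Java heap space (the Flume JVM ran out of heap)
### Fix Flume's OOM by raising the heap in flume-env.sh (the GC flags below are for JDK 8: ParNew was removed in JDK 10 and CMS in JDK 14)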
linux>vi flume-env.sh
export JAVA_OPTS="-Xms2048m -Xmx2048m -Xss256k -Xmn1g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"
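flume-ng reads flume-env.sh from the directory passed with --conf, so these options apply to agents started with --conf conf. To see what heap a JVM actually gets under such options, you can run a small hypothetical HeapReport class with the same flags (for example java -Xms2048m -Xmx2048m HeapReport). This is a sketch for checking the flags, not a Flume tool. Depending on the collector, the reported maximum may be slightly below the -Xmx value.

```java
// Hypothetical check: run with the same heap flags as JAVA_OPTS to see the heap the JVM gets.
final class HeapReport {

    static long toMiB(long bytes) {
        return bytes / (1024 * 1024);
    }

    public static void main(String[] args) {
        Runtime rt = Runtime.getRuntime();
        System.out.println("max heap (MiB):   " + toMiB(rt.maxMemory()));   // upper bound set by -Xmx
        System.out.println("total heap (MiB): " + toMiB(rt.totalMemory())); // currently committed, starts near -Xms
        System.out.println("free heap (MiB):  " + toMiB(rt.freeMemory()));  // unused part of the committed heap
    }
}
```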
Result: