需求
1、在linux日志文件/data/log/moreInfoRes.log中一直会产生如下JSON数据:
{"id":"14943445328940974601","uid":"840717325115457536","lat":"53.530598","lnt":"-2.5620373","hots":0,"title":"0","status":"1","topicId":"0","end_time":"1494344570","watch_num":0,"share_num":"1","replay_url":null,"replay_num":0,"start_time":"1494344544","timestamp":1494344571,"type":"video_info"}
{"uid":"861848974414839801","nickname":"mick","usign":"","sex":1,"birthday":"","face":"","big_face":"","email":"abc@qq.com","mobile":"","reg_type":"102","last_login_time":"1494344580","reg_time":"1494344580","last_update_time":"1494344580","status":"5","is_verified":"0","verified_info":"","is_seller":"0","level":1,"exp":0,"anchor_level":0,"anchor_exp":0,"os":"android","timestamp":1494344580,"type":"user_info"}
{"send_id":"834688818270961664","good_id":"223","video_id":"14943443045138661356","gold":"10","timestamp":1494344574,"type":"gift_record"}
2、需要根据数据中的type字段分目录存储,并且还要对type字段的值进行一定的处理,最终处理之后的数据需要存储到HDFS上的/moreInfoRes目录中。例如:
-
type:video_info 类型的数据需要存储到 /moreInfoRes/videoInfo 目录里面。
-
type:user_info 类型的数据需要存储到 /moreInfoRes/userInfo 目录里面。
-
type:gift_record 类型的数据需要存储到 /moreInfoRes/giftRecord 目录里面。
3、这边拦截器用 Search and Replace Interceptor + Regex Extractor Interceptor 可以实现,但是这边使用前者的话效率有点低,故采用 自定义Interceptor + Regex Extractor Interceptor 实现。
实现
鉴于此,可以使用 Exec Source + Custom Interceptor + Regex Extractor Interceptor + File Channel + HDFS Sink 来实现。官方文档如下:
Exec Source:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#exec-source
Custom Interceptor:
可参考其他 Interceptor 的实现
Regex Extractor Interceptor:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#regex-extractor-interceptor
File Channel:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#file-channel
HDFS Sink:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#hdfs-sink
创建工程
引入依赖
主要是 flume-ng-core 和 jackson 依赖,其他可不引入。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>flume-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.4</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.alibaba</groupId>-->
<!-- <artifactId>fastjson</artifactId>-->
<!-- <version>2.0.25</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>cn.hutool</groupId>-->
<!-- <artifactId>hutool-core</artifactId>-->
<!-- <version>5.8.27</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.10</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
</dependencies>
</project>
编写 Custom Interceptor
package com.example.flumedemo.interceptor;
import com.example.flumedemo.constant.OptType;
import com.example.flumedemo.util.JsonUtil;
import com.example.flumedemo.util.NamingCaseUtil;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.interceptor.SearchAndReplaceInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* 自定义拦截器,
* 将字段的值按照指定操作转换,得到的结果替换原来的值。
*
* @author liaorj
* @date 2024/11/13
*/
public class MyInterceptor implements Interceptor {
private static final Logger logger = LoggerFactory.getLogger(SearchAndReplaceInterceptor.class);
/**
* json中需要处理的字段
*/
private final String jsonField;
/**
* 需要对字段的值进行什么操作
*/
private final String optType;
private final Charset charset;
public MyInterceptor(String jsonField, String optType, Charset charset) {
this.jsonField = jsonField;
this.optType = optType;
this.charset = charset;
}
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
try {
logger.info("----event={}", JsonUtil.toJson(event));
if (null == event || ArrayUtils.isEmpty(event.getBody())) {
logger.info("----event or body is null");
return event;
}
//将body转为map对象
Map<String, Object> jsonObject;
jsonObject = JsonUtil.toBean(event.getBody(), new TypeReference<Map<String, Object>>() {});
logger.info("----jsonObject={}", jsonObject);
//获取jsonField的值
logger.info("----jsonField={}", this.jsonField);
Object value = jsonObject.get(this.jsonField);
logger.info("----jsonFieldValue={}", value);
if (jsonObject.containsKey(this.jsonField)) {
logger.info("----containsKey");
} else {
logger.info("----not containsKey");
}
logger.info("----jsonObject.keySet={}", jsonObject.keySet());
if (jsonObject.keySet().contains(this.jsonField)) {
logger.info("----keySet containsKey");
} else {
logger.info("----keySet not containsKey");
}
//如果含有下划线
if (null != value) {
String newValue = null;
logger.info("-----opt={},code={}", this.optType, OptType.toCamelCase.getCode());
logger.info("----opt equals={}", OptType.toCamelCase.getCode().equals(this.optType));
if (OptType.toCamelCase.getCode().equals(this.optType)) {
//将下划线字符串转为驼峰
newValue = NamingCaseUtil.toCamelCase(value.toString());
logger.info("----newValue={}", newValue);
} else if (OptType.toKebabCase.getCode().equals(this.optType)) {
//hutool和fastjson的类本地跑可以,上环境却用不了,执行到相关类就没有日志了,可能包冲突了,暂不用。
// newValue = NamingCase.toKebabCase(value.toString());
} else if (OptType.toPascalCase.getCode().equals(this.optType)) {
// newValue = NamingCase.toPascalCase(value.toString());
} else if (OptType.toUnderlineCase.getCode().equals(this.optType)) {
newValue = NamingCaseUtil.toUnderlineCase2(value.toString());
} else {
newValue = value.toString();
}
//替换原来的值
logger.info("----newValue2={}", newValue);
jsonObject.put(this.jsonField, newValue);
logger.info("----jsonObject2={}", jsonObject);
event.setBody(JsonUtil.toJson(jsonObject).getBytes(charset));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return event;
}
@Override
public List<Event> intercept(List<Event> events) {
Iterator var2 = events.iterator();
while (var2.hasNext()) {
Event event = (Event) var2.next();
this.intercept(event);
}
return events;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder {
private String jsonField;
private String optType;
private Charset charset;
public Builder() {
this.charset = Charsets.UTF_8;
}
@Override
public void configure(Context context) {
String jsonObjField = context.getString("jsonField");
Preconditions.checkArgument(!StringUtils.isEmpty(jsonObjField), "Must supply a valid jsonField (may not be empty)");
this.jsonField = jsonObjField;
String optType = context.getString("optType");
Preconditions.checkArgument(!StringUtils.isEmpty(optType), "Must supply a valid opt (may not be empty)");
this.optType = optType;
if (context.containsKey("charset")) {
// May throw IllegalArgumentException for unsupported charsets.
charset = Charset.forName(context.getString("charset"));
}
}
@Override
public Interceptor build() {
Preconditions.checkNotNull(this.jsonField, "jsonField required");
Preconditions.checkNotNull(this.optType, "opt required");
return new MyInterceptor(this.jsonField, this.optType, this.charset);
}
}
/*public static void main(String[] args) {
String str = "{\"send_id\":\"834688818270961664\",\"good_id\":\"223\",\"video_id\":\"14943443045138661356\",\"gold\":\"10\",\"timestamp\":1494344574,\"type\":\"gift_record\"}";
MyInterceptor myInterceptor = new MyInterceptor("type", "toCamelCase", Charsets.UTF_8);
Event event = new SimpleEvent();
event.setBody(str.getBytes(StandardCharsets.UTF_8));
Event result = myInterceptor.intercept(event);
System.out.println(JsonUtil.toJson(JsonUtil.toBean(result.getBody(), new TypeReference<Map<String, Object>>() {})));
}*/
}
package com.example.flumedemo.constant;
/**
* @author liaorj
* @date 2024/11/13
*/
public enum OptType {
//将下划线方式命名的字符串转换为驼峰式。
toCamelCase("toCamelCase"),
//将驼峰式命名的字符串转换为短横连接方式。
toKebabCase("toKebabCase"),
//将下划线方式命名的字符串转换为帕斯卡式。
toPascalCase("toPascalCase"),
//将驼峰式命名的字符串转换为下划线方式
toUnderlineCase("toUnderlineCase");
private String code;
OptType(String code) {
this.code = code;
}
public String getCode() {
return code;
}
}
package com.example.flumedemo.util;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ser.FilterProvider;
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
import java.io.IOException;
/**
* @author liaorj
* @date 2024/10/24
*/
public class JsonUtil {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static byte[] toBytes(Object object) {
try {
return OBJECT_MAPPER.writeValueAsBytes(object);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public static String toJson(Object object) {
try {
return OBJECT_MAPPER.writeValueAsString(object);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
/**
* 排除字段,敏感字段或太长的字段不显示:身份证、手机号、邮箱、密码等
* 参考:https://www.baeldung-cn.com/jackson-ignore-properties-on-serialization
*
* @param object
* @param excludeProperties
* @return
*/
public static String toJson(Object object, String[] excludeProperties) {
try {
SimpleBeanPropertyFilter theFilter = SimpleBeanPropertyFilter
.serializeAllExcept(excludeProperties);
FilterProvider filterProvider = new SimpleFilterProvider()
.addFilter("myFilter", theFilter);
OBJECT_MAPPER.setFilterProvider(filterProvider);
return OBJECT_MAPPER.writeValueAsString(object);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public static String toPrettyJson(Object object) {
try {
return OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(object);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public static <T> T toBean(String json, Class<T> clazz) {
try {
return OBJECT_MAPPER.readValue(json, clazz);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public static <T> T toBean(byte[] json, Class<T> clazz) {
try {
return OBJECT_MAPPER.readValue(json, clazz);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static <T> T toBean(byte[] json, TypeReference<T> typeReference) {
try {
return OBJECT_MAPPER.readValue(json, typeReference);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
package com.example.flumedemo.util;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* 字符串转化工具类
* @author liaorj
* @date 2024/11/14
*/
public class NamingCaseUtil {
private static Pattern linePattern = Pattern.compile("_(\\w)");
/**
* 下划线转驼峰
*/
public static String toCamelCase(String str) {
str = str.toLowerCase();
Matcher matcher = linePattern.matcher(str);
StringBuffer sb = new StringBuffer();
while (matcher.find()) {
matcher.appendReplacement(sb, matcher.group(1).toUpperCase());
}
matcher.appendTail(sb);
return sb.toString();
}
/**
* 驼峰转下划线(简单写法,效率低于{@link #toUnderlineCase2(String)})
*/
public static String toUnderlineCase(String str) {
return str.replaceAll("[A-Z]", "_$0").toLowerCase();
}
private static Pattern humpPattern = Pattern.compile("[A-Z]");
/**
* 驼峰转下划线,效率比上面高
*/
public static String toUnderlineCase2(String str) {
Matcher matcher = humpPattern.matcher(str);
StringBuffer sb = new StringBuffer();
while (matcher.find()) {
matcher.appendReplacement(sb, "_" + matcher.group(0).toLowerCase());
}
matcher.appendTail(sb);
return sb.toString();
}
}
打包
打包前可以使用MyInterceptor类的main函数测试下。
mvn clean
mvn package
打包好后,需要把当前jar包上传到linux上的flume目录下的lib目录中。
配置文件
然后在flume目录下的conf目录下创建配置文件:file-to-hdfs-customInterceptor.conf,内容如下,注意自定义拦截器所在包和HDFS主机ip要修改成自己的。
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /data/log/moreInfoRes.log
# Describe/configure the source interceptors
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.example.flumedemo.interceptor.MyInterceptor$Builder
a1.sources.r1.interceptors.i1.jsonField = type
a1.sources.r1.interceptors.i1.optType = toCamelCase
a1.sources.r1.interceptors.i2.type = regex_extractor
a1.sources.r1.interceptors.i2.regex = "type":"(\\w+)"
a1.sources.r1.interceptors.i2.serializers = s1
a1.sources.r1.interceptors.i2.serializers.s1.name = logType
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.163.130:9000/moreInfoRes/%{logType}
a1.sinks.k1.hdfs.filePrefix = data
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Use a file channel which buffers events on disk for durability
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/moreInfoRes/checkpointDir
a1.channels.c1.dataDirs = /data/moreInfoRes/dataDirs
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动flume
切换到flume目录,执行:
bin/flume-ng agent --name a1 --conf conf --conf-file conf/file-to-hdfs-customInterceptor.conf -Dflume.root.logger=INFO,console
测试结果
执行 hdfs dfs -ls -R / 命令查看 HDFS上 的 /moreInfoRes 目录文件信息,可以看到处理成功了: