Flume 1.9.0 Custom Interceptor

2024/11/16 18:42:39 Tags: flume, big data

Requirements

1. The Linux log file /data/log/moreInfoRes.log continuously receives JSON records like the following:

{"id":"14943445328940974601","uid":"840717325115457536","lat":"53.530598","lnt":"-2.5620373","hots":0,"title":"0","status":"1","topicId":"0","end_time":"1494344570","watch_num":0,"share_num":"1","replay_url":null,"replay_num":0,"start_time":"1494344544","timestamp":1494344571,"type":"video_info"}
{"uid":"861848974414839801","nickname":"mick","usign":"","sex":1,"birthday":"","face":"","big_face":"","email":"abc@qq.com","mobile":"","reg_type":"102","last_login_time":"1494344580","reg_time":"1494344580","last_update_time":"1494344580","status":"5","is_verified":"0","verified_info":"","is_seller":"0","level":1,"exp":0,"anchor_level":0,"anchor_exp":0,"os":"android","timestamp":1494344580,"type":"user_info"}
{"send_id":"834688818270961664","good_id":"223","video_id":"14943443045138661356","gold":"10","timestamp":1494344574,"type":"gift_record"}

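2. The records must be stored in separate directories according to their type field, and the value of the type field must be transformed first. The processed data is written to the /moreInfoRes directory on HDFS. For example:

  • Records with type:video_info are stored in /moreInfoRes/videoInfo.

  • Records with type:user_info are stored in /moreInfoRes/userInfo.

  • Records with type:gift_record are stored in /moreInfoRes/giftRecord.

3. A Search and Replace Interceptor combined with a Regex Extractor Interceptor could do this, but the former is somewhat inefficient here, so a custom interceptor plus the Regex Extractor Interceptor is used instead.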
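
The mapping from type value to target directory described in item 2 can be sketched in plain Java. This is only an illustration that needs nothing beyond the JDK; the class name TypeDirectoryDemo and the helper targetDirectory are hypothetical and are not part of the project built later in this article.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical demo class: shows how each type value maps to an HDFS directory.
public class TypeDirectoryDemo {
    // Matches an underscore followed by a lowercase letter or digit.
    private static final Pattern UNDERSCORE = Pattern.compile("_([a-z0-9])");

    // Convert a snake_case value such as "video_info" to camelCase.
    public static String toCamelCase(String value) {
        Matcher m = UNDERSCORE.matcher(value.toLowerCase());
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            m.appendReplacement(sb, m.group(1).toUpperCase());
        }
        m.appendTail(sb);
        return sb.toString();
    }

    // Build the target directory under /moreInfoRes for a given type value.
    public static String targetDirectory(String type) {
        return "/moreInfoRes/" + toCamelCase(type);
    }

    public static void main(String[] args) {
        for (String type : new String[] {"video_info", "user_info", "gift_record"}) {
            System.out.println(type + " -> " + targetDirectory(type));
        }
    }
}
```

Running main prints one line per type, for example video_info -> /moreInfoRes/videoInfo.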

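Implementation

Given this, the pipeline is Exec Source + Custom Interceptor + Regex Extractor Interceptor + File Channel + HDFS Sink. The official documentation for each component (the links point to the 1.11.0 user guide):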

Exec Source:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#exec-source

Custom Interceptor:
See the implementations of the other interceptors for reference.

Regex Extractor Interceptor:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#regex-extractor-interceptor

File Channel:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#file-channel

HDFS Sink:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#hdfs-sink

Create the project

Add dependencies

Only the flume-ng-core and jackson dependencies are essential; the others can be omitted.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>com.example</groupId>
    <artifactId>flume-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
 
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
 
    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.15.4</version>
        </dependency>
<!--        <dependency>-->
<!--            <groupId>com.alibaba</groupId>-->
<!--            <artifactId>fastjson</artifactId>-->
<!--            <version>2.0.25</version>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>cn.hutool</groupId>-->
<!--            <artifactId>hutool-core</artifactId>-->
<!--            <version>5.8.27</version>-->
<!--        </dependency>-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.10</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.10</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
 
    </dependencies>
 
</project>

Write the custom interceptor

package com.example.flumedemo.interceptor;

import com.example.flumedemo.constant.OptType;
import com.example.flumedemo.util.JsonUtil;
import com.example.flumedemo.util.NamingCaseUtil;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.interceptor.SearchAndReplaceInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Custom interceptor:
 * transforms the value of a JSON field with the configured operation
 * and replaces the original value with the result.
 *
 * @author liaorj
 * @date 2024/11/13
 */
public class MyInterceptor implements Interceptor {
    private static final Logger logger = LoggerFactory.getLogger(MyInterceptor.class);

    /**
     * The JSON field to process
     */
    private final String jsonField;
    /**
     * The operation to apply to the field's value
     */
    private final String optType;

    private final Charset charset;

    public MyInterceptor(String jsonField, String optType, Charset charset) {
        this.jsonField = jsonField;
        this.optType = optType;
        this.charset = charset;
    }

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        try {
            logger.info("----event={}", JsonUtil.toJson(event));
            if (null == event || ArrayUtils.isEmpty(event.getBody())) {
                logger.info("----event or body is null");
                return event;
            }
            // Parse the body into a Map
            Map<String, Object> jsonObject;
            jsonObject = JsonUtil.toBean(event.getBody(), new TypeReference<Map<String, Object>>() {});
            logger.info("----jsonObject={}", jsonObject);

            // Look up the value of jsonField
            logger.info("----jsonField={}", this.jsonField);
            Object value = jsonObject.get(this.jsonField);
            logger.info("----jsonFieldValue={}", value);
            if (jsonObject.containsKey(this.jsonField)) {
                logger.info("----containsKey");
            } else {
                logger.info("----not containsKey");
            }
            logger.info("----jsonObject.keySet={}", jsonObject.keySet());
            if (jsonObject.keySet().contains(this.jsonField)) {
                logger.info("----keySet containsKey");
            } else {
                logger.info("----keySet not containsKey");
            }
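            // Only rewrite the body when the field is present and non-null
            if (null != value) {
                // Default to the original value so that unsupported operations never write null
                String newValue = value.toString();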
                logger.info("-----opt={},code={}", this.optType, OptType.toCamelCase.getCode());
                logger.info("----opt equals={}", OptType.toCamelCase.getCode().equals(this.optType));
                if (OptType.toCamelCase.getCode().equals(this.optType)) {
                    // Convert the snake_case string to camelCase
                    newValue = NamingCaseUtil.toCamelCase(value.toString());
                    logger.info("----newValue={}", newValue);
                } else if (OptType.toKebabCase.getCode().equals(this.optType)) {
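                    // The hutool and fastjson classes work locally but not on the server: logging stops once execution reaches them, possibly because of a dependency conflict, so they are not used for now.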
//                    newValue = NamingCase.toKebabCase(value.toString());
                } else if (OptType.toPascalCase.getCode().equals(this.optType)) {
//                    newValue = NamingCase.toPascalCase(value.toString());
                } else if (OptType.toUnderlineCase.getCode().equals(this.optType)) {
                    newValue = NamingCaseUtil.toUnderlineCase2(value.toString());
                } else {
                    newValue = value.toString();
                }
                // Replace the original value
                logger.info("----newValue2={}", newValue);
                jsonObject.put(this.jsonField, newValue);
                logger.info("----jsonObject2={}", jsonObject);
                event.setBody(JsonUtil.toJson(jsonObject).getBytes(charset));
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            this.intercept(event);
        }

        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        private String jsonField;
        private String optType;
        private Charset charset;

        public Builder() {
            this.charset = Charsets.UTF_8;
        }

        @Override
        public void configure(Context context) {
            String jsonObjField = context.getString("jsonField");
            Preconditions.checkArgument(!StringUtils.isEmpty(jsonObjField), "Must supply a valid jsonField (may not be empty)");
            this.jsonField = jsonObjField;

            String optType = context.getString("optType");
            Preconditions.checkArgument(!StringUtils.isEmpty(optType), "Must supply a valid opt (may not be empty)");
            this.optType = optType;

            if (context.containsKey("charset")) {
                // May throw IllegalArgumentException for unsupported charsets.
                charset = Charset.forName(context.getString("charset"));
            }
        }

        @Override
        public Interceptor build() {
            Preconditions.checkNotNull(this.jsonField, "jsonField required");
            Preconditions.checkNotNull(this.optType, "opt required");
            return new MyInterceptor(this.jsonField, this.optType, this.charset);
        }
    }

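    /* Local smoke test. To run it, uncomment this method and import
       org.apache.flume.event.SimpleEvent and java.nio.charset.StandardCharsets.
    public static void main(String[] args) {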
        String str = "{\"send_id\":\"834688818270961664\",\"good_id\":\"223\",\"video_id\":\"14943443045138661356\",\"gold\":\"10\",\"timestamp\":1494344574,\"type\":\"gift_record\"}";

        MyInterceptor myInterceptor = new MyInterceptor("type", "toCamelCase", Charsets.UTF_8);
        Event event = new SimpleEvent();
        event.setBody(str.getBytes(StandardCharsets.UTF_8));
        Event result = myInterceptor.intercept(event);

        System.out.println(JsonUtil.toJson(JsonUtil.toBean(result.getBody(), new TypeReference<Map<String, Object>>() {})));
    }*/
}

package com.example.flumedemo.constant;

/**
 * @author liaorj
 * @date 2024/11/13
 */
public enum OptType {
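    // Convert a snake_case string to camelCase.
    toCamelCase("toCamelCase"),
    // Convert a camelCase string to kebab-case (hyphen-separated).
    toKebabCase("toKebabCase"),
    // Convert a snake_case string to PascalCase.
    toPascalCase("toPascalCase"),
    // Convert a camelCase string to snake_case.
    toUnderlineCase("toUnderlineCase");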

    private String code;

    OptType(String code) {
        this.code = code;
    }

    public String getCode() {
        return code;
    }
}

package com.example.flumedemo.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ser.FilterProvider;
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;

import java.io.IOException;

/**
 * @author liaorj
 * @date 2024/10/24
 */
public class JsonUtil {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public static byte[] toBytes(Object object) {
        try {
            return OBJECT_MAPPER.writeValueAsBytes(object);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public static String toJson(Object object) {
        try {
            return OBJECT_MAPPER.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

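    /**
     * Serialize while excluding fields, e.g. sensitive or overly long ones:
     * ID numbers, phone numbers, email addresses, passwords.
     * The filter only applies to classes annotated with @JsonFilter("myFilter").
     * Reference: https://www.baeldung-cn.com/jackson-ignore-properties-on-serialization
     *
     * @param object            the object to serialize
     * @param excludeProperties names of the properties to leave out
     * @return the JSON string without the excluded properties
     */
    public static String toJson(Object object, String[] excludeProperties) {
        try {
            SimpleBeanPropertyFilter theFilter = SimpleBeanPropertyFilter
                    .serializeAllExcept(excludeProperties);
            FilterProvider filterProvider = new SimpleFilterProvider()
                    .addFilter("myFilter", theFilter);
            // Use a per-call writer so the shared ObjectMapper is never reconfigured
            return OBJECT_MAPPER.writer(filterProvider).writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }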

    public static String toPrettyJson(Object object) {
        try {
            return OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public static <T> T toBean(String json, Class<T> clazz) {
        try {
            return OBJECT_MAPPER.readValue(json, clazz);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public static <T> T toBean(byte[] json, Class<T> clazz) {
        try {
            return OBJECT_MAPPER.readValue(json, clazz);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static <T> T toBean(byte[] json, TypeReference<T> typeReference) {
        try {
            return OBJECT_MAPPER.readValue(json, typeReference);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

package com.example.flumedemo.util;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * String case conversion utilities.
 * @author liaorj
 * @date 2024/11/14
 */
public class NamingCaseUtil {

    private static final Pattern linePattern = Pattern.compile("_(\\w)");

    /**
     * Convert snake_case to camelCase.
     */
    public static String toCamelCase(String str) {
        str = str.toLowerCase();
        Matcher matcher = linePattern.matcher(str);
        StringBuffer sb = new StringBuffer();
        while (matcher.find()) {
            matcher.appendReplacement(sb, matcher.group(1).toUpperCase());
        }
        matcher.appendTail(sb);
        return sb.toString();
    }

    /**
     * Convert camelCase to snake_case (simple version, slower than {@link #toUnderlineCase2(String)}).
     */
    public static String toUnderlineCase(String str) {
        return str.replaceAll("[A-Z]", "_$0").toLowerCase();
    }

    private static final Pattern humpPattern = Pattern.compile("[A-Z]");

    /**
     * Convert camelCase to snake_case; faster than the version above.
     */
    public static String toUnderlineCase2(String str) {
        Matcher matcher = humpPattern.matcher(str);
        StringBuffer sb = new StringBuffer();
        while (matcher.find()) {
            matcher.appendReplacement(sb, "_" + matcher.group(0).toLowerCase());
        }
        matcher.appendTail(sb);
        return sb.toString();
    }
}
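
The toKebabCase and toPascalCase branches in MyInterceptor are commented out because the hutool classes could not be used on the server. A JDK-only replacement might look like the sketch below. The class ExtraNamingCases and its methods are hypothetical, not part of the original project, and their behavior on edge cases (for example runs of consecutive capitals) may differ from hutool's NamingCase.

```java
// Hypothetical helper: JDK-only stand-ins for the two conversions left unimplemented.
public class ExtraNamingCases {

    // camelCase -> kebab-case, e.g. "videoInfo" -> "video-info".
    public static String toKebabCase(String str) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < str.length(); i++) {
            char c = str.charAt(i);
            if (Character.isUpperCase(c)) {
                // Insert a hyphen before every capital except a leading one.
                if (i > 0) {
                    sb.append('-');
                }
                sb.append(Character.toLowerCase(c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    // snake_case -> PascalCase, e.g. "gift_record" -> "GiftRecord".
    public static String toPascalCase(String str) {
        StringBuilder sb = new StringBuilder();
        boolean upperNext = true; // the first character is always capitalized
        for (char c : str.toLowerCase().toCharArray()) {
            if (c == '_') {
                upperNext = true; // drop the underscore, capitalize what follows
            } else if (upperNext) {
                sb.append(Character.toUpperCase(c));
                upperNext = false;
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toKebabCase("videoInfo"));
        System.out.println(toPascalCase("gift_record"));
    }
}
```

If these were adopted, the two commented-out lines in MyInterceptor could call them in place of NamingCase.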

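Package

Before packaging, you can uncomment the main method of the MyInterceptor class to run a quick local test.
mvn clean
mvn package
After packaging, upload the resulting jar to the lib directory under the Flume directory on the Linux machine.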

配置文件

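Then create the configuration file file-to-hdfs-customInterceptor.conf in the conf directory under the Flume directory, with the content below. Change the custom interceptor's package name and the HDFS host IP to match your own environment.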

# file-to-hdfs-customInterceptor.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /data/log/moreInfoRes.log

# Describe/configure the source interceptors
a1.sources.r1.interceptors = i1 i2 
a1.sources.r1.interceptors.i1.type = com.example.flumedemo.interceptor.MyInterceptor$Builder
a1.sources.r1.interceptors.i1.jsonField = type
a1.sources.r1.interceptors.i1.optType = toCamelCase

a1.sources.r1.interceptors.i2.type = regex_extractor
a1.sources.r1.interceptors.i2.regex = "type":"(\\w+)"
a1.sources.r1.interceptors.i2.serializers = s1
a1.sources.r1.interceptors.i2.serializers.s1.name = logType

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.163.130:9000/moreInfoRes/%{logType}
a1.sinks.k1.hdfs.filePrefix = data
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Use a file channel, which persists events to disk
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/moreInfoRes/checkpointDir
a1.channels.c1.dataDirs = /data/moreInfoRes/dataDirs

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
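
To see how interceptor i2 and the sink path work together, the sketch below imitates the two steps with the JDK only: the regex captures the already-converted type value into a logType header, and that header replaces %{logType} in the path. This is an illustration, not Flume's own code; the class RegexExtractorDemo and the helpers extractHeaders and resolvePath are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical demo: mimics the regex_extractor interceptor and %{header} substitution.
public class RegexExtractorDemo {
    // Same expression as interceptor i2: "type":"(\w+)"
    private static final Pattern TYPE_PATTERN = Pattern.compile("\"type\":\"(\\w+)\"");

    // Capture group 1 becomes the value of the logType header, if the pattern matches.
    public static Map<String, String> extractHeaders(String body) {
        Map<String, String> headers = new HashMap<>();
        Matcher m = TYPE_PATTERN.matcher(body);
        if (m.find()) {
            headers.put("logType", m.group(1));
        }
        return headers;
    }

    // Replace each %{name} placeholder with the matching header value.
    public static String resolvePath(String template, Map<String, String> headers) {
        String result = template;
        for (Map.Entry<String, String> e : headers.entrySet()) {
            result = result.replace("%{" + e.getKey() + "}", e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        // Body as it looks after MyInterceptor has converted gift_record to giftRecord.
        String body = "{\"gold\":\"10\",\"timestamp\":1494344574,\"type\":\"giftRecord\"}";
        Map<String, String> headers = extractHeaders(body);
        System.out.println(resolvePath("/moreInfoRes/%{logType}", headers));
    }
}
```

The pattern allows no whitespace around the colon. That works here because Jackson's default writeValueAsString output is compact; pretty-printed JSON would not match.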

Start Flume

Switch to the Flume directory and run:

bin/flume-ng agent --name a1 --conf conf --conf-file conf/file-to-hdfs-customInterceptor.conf -Dflume.root.logger=INFO,console

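Test results

Run hdfs dfs -ls -R / to list the files under the /moreInfoRes directory on HDFS. The output should show that the events were processed and written successfully.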

 

