海量日志采集工具——Flume

文章目录

  • 海量日志采集工具——Flume
    • 一、Flume的简介
      • 1.1、大数据处理流程
      • 1.2、Flume的简介
      • 1.3、版本区别
    • 二、Flume的体系结构
      • 2.1、体系结构简介
      • 2.2、组件及其作用
    • 三、Flume的安装
    • 四、Flume的部署
      • 4.1、数据模型
        • 4.1.1、单一数据模型
        • 4.1.2、多数据流模型
        • 4.1.3、小总结
      • 4.2、配置介绍
        • 4.2.1、定义组件名称
        • 4.2.2、配置组件属性
      • 4.3、常用的source和sink种类
        • 4.3.1、常用的flume sources
        • 4.3.2、常用的flume channels
        • 4.3.3、常用的flume sinks
    • 五、案例演示
      • 5.1、案例演示:avro+memory+logger
      • 5.2、实时采集(监听文件):exec+memory+hdfs
      • 5.3、实时采集(监听目录):spool+ mem+logger
      • 5.4、案例演示:http+ mem+logger
    • 六、拦截器的使用
      • 6.1、常用拦截器:
      • 6.2、案例演示:Syslogtcp+mem+hdfs
      • 6.3、案例演示:regex+Syslogtcp+mem+hdfs
      • 6.4、自定义拦截器
    • 七、选择器的使用
      • 7.1、说明
      • 7.2、案例演示:replicating selector
      • 7.3、案例演示:Multiplexing selector
      • 7.4、自定义拦截器

海量日志采集工具——Flume

一、Flume的简介

1.1、大数据处理流程

在企业中,大数据的处理流程一般是:

  1. 数据采集

  2. 数据存储

  3. 数据清洗

  4. 数据分析

  5. 数据展示

参考下图:

海量日志采集工具——Flume

在数据采集和搜集的工具中,Flume框架占有一定的市场份量。

1.2、Flume的简介

Flume 是一种分布式、可靠且可用的服务,用于高效收集、聚合和移动大量日志数据。它具有基于流数据流的简单灵活的架构。它具有可调整的可靠性机制以及许多故障转移和恢复机制,具有健壮性和容错性。它使用允许在线分析应用程序的简单可扩展数据模型。

参考官网: http://flume.apache.org/

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

flume 最开始是由 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用。但随着 flume 功能的扩展,flume的代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点渐渐暴露出来,尤其是在发行版本 0.9.4中,日志传输不稳定的现象尤为严重。

为了解决这些问题,2011 年 10 月 22 号,cloudera 对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构,并将 Flume 纳入 apache 旗下,从cloudera Flume 改名为 Apache Flume。

1.3、版本区别

​ 为了与之前版本区分开,重构后的版本统称为 Flume NG(next generation),重构前的版本被统称为 Flume OG(original generation),Flume目前只有Linux系统的启动脚本,没有Windows环境的启动脚本。

二、Flume的体系结构

2.1、体系结构简介

Flume 运行的核心是 Agent。Flume是以agent为最小的独立运行单位。一个agent就是一个JVM。它是一个完整的数据收集工具,含有三个核心组件,分别是source、 channel、 sink。通过这些组件, Event 可以从一个地方流向另一个地方。如下图所示:

海量日志采集工具——Flume

2.2、组件及其作用

- Client:
	客户端,Client生产数据,运行在一个独立的线程中

- Event: 
	一个数据单元,消息头和消息体组成。(Events可以是日志记录、 avro 对象等。)

- Flow:
	Event从源点到达目的点的迁移的抽象。

- Agent: 
	一个独立的Flume进程,运行在JVM中,包含组件Source、 Channel、 Sink。
	每台机器运行一个agent,但是一个agent中可以包含多个sources和sinks。

- Source: 
	数据收集组件。source从Client收集数据,传递给Channel

- Channel: 
	管道,负责接收source端的数据,然后将数据推送到sink端。

- Sink: 
	负责从channel端拉取数据,并将其推送到持久化系统或者是下一个Agent。
	
- selector:
	选择器,作用于source端,然后决定数据发往哪个目标。

- interceptor:
	拦截器,flume允许使用拦截器拦截数据。允许使用拦截器链,作用于source和sink阶段。	

三、Flume的安装

3.1、安装和配置环境变量

3.1.1、准备软件包

将apache-flume-1.8.0-bin.tar.gz 上传到linux系统中的/root/soft/目录中

3.1.2、解压软件包

[root@master soft]# pwd
/root/soft
[root@master soft]# tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /usr/local/

3.1.3、更名操作

[root@master soft]# cd /usr/local/
[root@master local]# mv apache-flume-1.8.0-bin/ flume

3.1.4、配置环境变量

[root@master local]# vi /etc/profile
........省略..........
export FLUME_HOME=/usr/local/flume
export PATH=$FLUME_HOME/bin:$PATH

# 加载环境变量
[root@master apps]# source /etc/profile

3.1.5、验证环境变量

[root@master local]# flume-ng version
Flume 1.8.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 99f591994468633fc6f8701c5fc53e0214b6da4f
Compiled by denes on Fri Sep 15 14:58:00 CEST 2017
From source with checksum fbb44c8c8fb63a49be0a59e27316833d

3.2、配置文件

[root@master local]# cd flume/conf/
[root@master conf]# ll	  #查看里面是否有一个flume-env.sh.template文件
[root@master conf]# cp flume-env.sh.template flume-env.sh
[root@master conf]# vim flume-env.sh
........省略..........
export JAVA_HOME=/usr/local/jdk
........省略..........

四、Flume的部署

4.1、数据模型

  • 单一数据模型
  • 多数据流模型

4.1.1、单一数据模型

在单个 Agent 内由单个 Source, Channel, Sink 建立一个单一的数据流模型,如下图所示,整个数据流为

Web Server --> Source --> Channel --> Sink --> HDFS。

海量日志采集工具——Flume

4.1.2、多数据流模型

**1)**多 Agent 串行传输数据流模型

海量日志采集工具——Flume

**2)**多 Agent 汇聚数据流模型

海量日志采集工具——Flume

**3)**单 Agent 多路数据流模型

海量日志采集工具——Flume

**4)**Sinkgroups 数据流模型

海量日志采集工具——Flume

4.1.3、小总结

在flume提供的数据流模型中,几个原则很重要。

Source--> Channel
  1.单个Source组件可以和多个Channel组合建立数据流,既可以replicating 和 multiplexing。
  2.多个Sources可以写入单个 Channel

Channel-->Sink
  1.多个Sinks又可以组合成Sinkgroups从Channel中获取数据,既可以loadbalancing和failover机制。
  2.多个Sinks也可以从单个Channel中取数据。
  3.单个Sink只能从单个Channel中取数据

根据上述 5 个原则,你可以设计出满足你需求的数据流模型。

4.2、配置介绍

4.2.1、定义组件名称

要定义单个代理中的流,您需要通过通道链接源和接收器。您需要列出给定代理的源,接收器和通道,然后将源和接收器指向一个通道。一个源实例可以指定多个通道,但是一个接收器实例只能指定一个通道。格式如下:

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>

案例如下:

# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1

# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1

# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1

4.2.2、配置组件属性

# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue>

# properties for channels
<Agent>.channel.<Channel>.<someProperty> = <someValue>

# properties for sinks
<Agent>.sources.<Sink>.<someProperty> = <someValue>

案例如下:

agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1

# set channel for sources, sinks

# properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000

# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100

# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata

#...

4.3、常用的source和sink种类

4.3.1、常用的flume sources

# Avro source:
	avro
# Syslog TCP source:
	syslogtcp
# Syslog UDP Source:
	syslogudp
# HTTP Source:
	http	
# Exec source:
	exec
# JMS source:
	jms
# Thrift source:
	thrift	
# Spooling directory source:
	spooldir
# Kafka source:
	org.apache.flume.source.kafka,KafkaSource
.....	

4.3.2、常用的flume channels

# Memory Channel
	memory
# JDBC Channel
	jdbc
# Kafka Channel
	org.apache.flume.channel.kafka.KafkaChannel
# File Channel
	file

4.3.3、常用的flume sinks

# HDFS Sink
	hdfs
# HIVE Sink
	hive
# Logger Sink
	logger
# Avro Sink
	avro
# Kafka Sink
	org.apache.flume.sink.kafka.KafkaSink
# Hbase Sink
	hbase

五、案例演示

配置可参考:https://flume.apache.org/releases/content/1.10.0/FlumeUserGuide.html

5.1、案例演示:avro+memory+logger

Avro Source:监听一个指定的Avro端口,通过Avro端口可以获取到Avro client发送过来的文件,即只要应用程序通过Avro端口发送文件,source组件就可以获取到该文件中的内容,输出位置为Logger

5.1.1、编写采集方案

[root@master flume]# mkdir flumeconf
[root@master flume]# cd flumeconf
[root@master flumeconf]# vi avro-logger.conf
#定义各个组件的名字
a1.sources=avro-sour1
a1.channels=mem-chan1
a1.sinks=logger-sink1

#定义sources组件的相关属性
a1.sources.avro-sour1.type=avro
a1.sources.avro-sour1.bind=master
a1.sources.avro-sour1.port=9999

#定义channels组件的相关属性
a1.channels.mem-chan1.type=memory

#定义sinks组件的相关属性
a1.sinks.logger-sink1.type=logger
a1.sinks.logger-sink1.maxBytesToLog=100

#组件之间进行绑定
a1.sources.avro-sour1.channels=mem-chan1
a1.sinks.logger-sink1.channel=mem-chan1

5.1.2、启动Agent

[root@master flumeconf]# flume-ng agent -c ../conf -f ./avro-logger.conf -n a1 -Dflume.root.logger=INFO,console

5.1.3、测试数据

另起一个窗口测试

[root@master ~]# mkdir flumedata
[root@master ~]# cd flumedata/
[root@master flumedata]#
[root@master flumedata]# date >> test.data
[root@master flumedata]# cat test.data
20191121日 星期四 21:22:36 CST
[root@master flumedata]# ping master >> test.data
[root@master flumedata]# cat test.data
....省略....
[root@master flumedata]# flume-ng avro-client -c /usr/local/flume-1.6.0/conf/ -H master -p 9999 -F ./test.dat 

5.2、实时采集(监听文件):exec+memory+hdfs

Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源
#常用的是tail -F file指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容

memory:传输数据的Channel为Memory

hdfs 是输出目标为Hdfs

5.2.1、配置方案

[root@master flumeconf]# vim exec-hdfs.conf
# 定义sources的属性
a1.sources=r1
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /root/flumedata/test.data

# 定义sinks的属性
a1.sinks=k1
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://master:8020/flume/tailout/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix=events
a1.sinks.k1.hdfs.round=true
a1.sinks.k1.hdfs.roundValue=10
a1.sinks.k1.hdfs.roundUnit=second
a1.sinks.k1.hdfs.rollInterval=3
a1.sinks.k1.hdfs.rollSize=20
a1.sinks.k1.hdfs.rollCount=5
a1.sinks.k1.hdfs.batchSize=1
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.fileType=DataStream

# 定义channels的属性
a1.channels
channels=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

# 组件之间的绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

5.2.2、启动Agent

[root@master flumeconf]# flume-ng agent -c ../conf -f ./exec-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

5.2.3、测试数据

[root@master flumedata]# ping master >> test.data

5.3、实时采集(监听目录):spool+ mem+logger

spool: Source来源于目录,有文件进入目录就摄取。

mem:通过内存来传输数据

logger:是传送数据到日志

5.3.1、配置方案

[root@master flumeconf]# vi spool-logger.conf
a1.sources = r1  
a1.channels = c1
a1.sinks = s1

a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir = /home/flume/spool
a1.sources.r1.fileSuffix = .COMPLETED
a1.sources.r1.deletePolicy=never
a1.sources.r1.fileHeader=false
a1.sources.r1.fileHeaderKey=file
a1.sources.r1.basenameHeader=false
a1.sources.r1.basenameHeaderKey=basename
a1.sources.r1.batchSize=100
a1.sources.r1.inputCharset=UTF-8
a1.sources.r1.bufferMaxLines=1000

a1.channels.c1.type=memory

a1.sinks.s1.type=logger
a1.sinks.s1.maxBytesToLog=100

a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

5.3.2、启动agent

[root@master flumeconf]# flume-ng agent -c ../conf -f ./spool-logger.conf -n a1 -Dflume.root.logger=INFO,console

5.3.3、测试

[root@master ~]# for i in `seq 1 10`; do echo $i >> /home/flume/spool/$i;done

5.4、案例演示:http+ mem+logger

http: 表示数据来源是http网络协议,一般接收的请求为get或post请求. 所有的http请求会通过插件格式的Handle转化为一个flume的Event数据.

mem:表示用内存传输通道

logger:表示输出格式为Logger格式

5.4.1、配置方案

[root@master flumeconf]# vi http-logger.conf
a1.sources = r1  
a1.channels = c1
a1.sinks = s1

a1.sources.r1.type=http
a1.sources.r1.bind = master
a1.sources.r1.port = 6666
a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler

a1.channels.c1.type=memory

a1.sinks.s1.type=logger
a1.sinks.s1.maxBytesToLog=16

a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

5.4.2、启动agent的服务

[root@master ~]# flume-ng agent -c ../conf -f ./http-logger.conf -n a1 -Dflume.root.logger=INFO,console

5.4.3、测试

[root@master ~]# curl -X POST -d '[{"headers":{"name":"zhangsan","pwd":"123456"},"body":"this is my content"}]' http://master:6666

六、拦截器的使用

在Flume运行过程中,Flume有能力在运行阶段修改/删除Event,这是通过拦截器(Interceptors)来实现的。拦截器有下面几个特点:

  • 拦截器需要实现org.apache.flume.interceptor.Interceptor接口。

  • 拦截器可以修改或删除事件基于开发者在选择器中选择的任何条件。

  • 拦截器采用了责任链模式,多个拦截器可以按指定顺序拦截。

  • 一个拦截器返回的事件列表被传递给链中的下一个拦截器。

  • 如果一个拦截器需要删除事件,它只需要在返回的事件集中不包含要删除的事件即可。

6.1、常用拦截器:

  1. Timestamp Interceptor :时间戳拦截器,将当前时间戳(毫秒)加入到events header中,key名字为:timestamp,值为当前时间戳。用的不是很多
  2. Host Interceptor:主机名拦截器。将运行Flume agent的主机名或者IP地址加入到events header中,key名字为:host(也可自定义)
  3. Static Interceptor:静态拦截器,用于在events header中加入一组静态的key和value。

6.2、案例演示:Syslogtcp+mem+hdfs

通过时间拦截器,数据源为SyslogTcp,传送的通道模式是FileChannel,最后输出的目的地为HDFS

6.2.1 配置方案:

[root@master flumeconf]# vi ts.conf
a1.sources = r1  
a1.channels = c1
a1.sinks = s1

a1.sources.r1.type=syslogtcp
a1.sources.r1.host=master
a1.sources.r1.port=6666
a1.sources.r1.interceptors=i1 i2 i3
a1.sources.r1.interceptors.i1.type=timestamp
a1.sources.r1.interceptors.i1.preserveExisting=false
a1.sources.r1.interceptors.i2.type=host
a1.sources.r1.interceptors.i2.preserveExisting=false
a1.sources.r1.interceptors.i2.useIP=true
a1.sources.r1.interceptors.i2.hostHeader=hostname
a1.sources.r1.interceptors.i3.type=static
a1.sources.r1.interceptors.i3.preserveExisting=false
a1.sources.r1.interceptors.i3.key=hn
a1.sources.r1.interceptors.i3.value=master

a1.channels.c1.type=memory

a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/%H%M
a1.sinks.s1.hdfs.filePrefix=%{hostname}
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true

a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

6.2.2 启动agent的服务:

[root@master flumeconf]# flume-ng agent -c ../conf -f ./ts.conf -n a1 -Dflume.root.logger=INFO,console

6.2.3 测试:

[root@master ~]# echo "hello world hello qiangeng" | nc master 6666

6.3、案例演示:regex+Syslogtcp+mem+hdfs

拦截器为正则表达式拦截器, 数据源为Syslogtcp格式,传送通道为MemChannel,最后传送的目的地是HDFS

6.3.1 配置方案

[root@master flumeconf]# vi regex-ts.conf
a1.sources = r1  
a1.channels = c1
a1.sinks = s1

a1.sources.r1.type=syslogtcp
a1.sources.r1.host = master
a1.sources.r1.port = 6666

a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=regex_filter
#不要加引号包裹正则
a1.sources.r1.interceptors.i1.regex=^[0-9].*$  
a1.sources.r1.interceptors.i1.excludeEvents=false


a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c1.keep-alive=3
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.c1.byteCapacity=800000

a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/%H%M
a1.sinks.s1.hdfs.filePrefix=%{hostname}
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true

a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

6.3.2、启动agent的服务:

[root@master flumeconf]# flume-ng agent -c ../conf -f ./regex-ts.conf -n a1 -Dflume.root.logger=INFO,console

6.3.3、测试:

[root@master ~]# echo "hello world hello qiangeng" | nc master 6666
[root@master ~]# echo "123123123 hello world hello qiangeng" | nc master 6666

6.4、自定义拦截器

6.4.0 需求:

为了提高Flume的扩展性,用户可以自己定义一个拦截器

将event的body中的数据, 以数字开头的, 存储为 hdfs://master:8020/flume/number.log		s1	
将event的body中的数据, 以字母开头的, 存储为 hdfs://master:8020/flume/character.log		 s2
将event的body中的数据, 其他的开头的, 存储为 hdfs://master:8020/flume/other.log			s3

6.4.1 pom.xml

可以参考/code/pom.xml

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.8.0</version>
    </dependency>
</dependencies>

6.4.2 代码

具体代码可以参考 code/MyInterceptor

public class MyInterceptor implements Interceptor {

    private static final String LOCATION_KEY = "location";
    private static final String LOCATION_NUM = "number";
    private static final String LOCATION_CHAR = "character";
    private static final String LOCATION_OTHER = "other";


    @Override
    public void initialize() {

    }

    /**
     * 当拦截到单个Event的时候调用
     * @param event 这个被拦截的Event
     * @return 这个拦截器发送出去的Event
     */
    @Override
    public Event intercept(Event event) {
        // 对拦截到的event的header中添加指定的键值对
        // 1. 获取拦截到的event的body
        byte[] body = event.getBody();
        // 2. 验证是否是以数字开头
        if (body[0] >= '0' && body[0] <= '9') {
            // 在这个event的头部添加一个键值对,来标识这个event需要放入到哪一个channel中
            event.getHeaders().put(LOCATION_KEY, LOCATION_NUM);
        }
        else if (body[0] >= 'a' && body[0] <= 'z' || body[0] >= 'A' && body[0] <= 'Z') {
            event.getHeaders().put(LOCATION_KEY, LOCATION_CHAR);
        }
        else {
            event.getHeaders().put(LOCATION_KEY, LOCATION_OTHER);
        }

        return event;
    }

    /**
     * 批量拦截到Event
     * @param list 存储Event的集合
     * @return 处理之后的Event集合
     */
    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {

    }

    /**
     * 设计一个获取拦截器对象的类
     */
    public static class MyBuilder implements Builder {

        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

6.4.3 打包上传

使用maven将拦截器打包,然后把此包和依赖的fastjson一起上传到flume lib目录下

6.4.4 编写方案

a1.sources=r1
a1.channels=c1 c2 c3
a1.sinks=s1 s2 s3
a1.sources.r1.channels=c1 c2 c3
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
a1.sinks.s3.channel=c3

# 设置source的属性
a1.sources.r1.type=syslogtcp
a1.sources.r1.host=master
a1.sources.r1.port=12345

# 设置拦截器
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=com.qf.MyInterceptor$MyBuilder

# 设置选择器的属性
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=location
a1.sources.r1.selector.mapping.number=c1
a1.sources.r1.selector.mapping.character=c2
a1.sources.r1.selector.mapping.other=c3

# 设置channel的属性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c3.type=memory
a1.channels.c3.capacity=1000

# 设置sink的属性
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/customInterceptor/s1/%Y-%m-%d-%H
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s1.hdfs.filePrefix=regex
a1.sinks.s1.hdfs.rollInterval=0
a1.sinks.s1.hdfs.rollSize=102400
a1.sinks.s1.hdfs.rollCount=30
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text

a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://master:8020/flume/customInterceptor/s2/%Y-%m-%d-%H
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sinks.s2.hdfs.filePrefix=regex
a1.sinks.s2.hdfs.rollInterval=0
a1.sinks.s2.hdfs.rollSize=102400
a1.sinks.s2.hdfs.rollCount=30
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text

a1.sinks.s3.type=hdfs
a1.sinks.s3.hdfs.path=hdfs://master:8020/flume/customInterceptor/s3/%Y-%m-%d-%H
a1.sinks.s3.hdfs.useLocalTimeStamp=true
a1.sinks.s3.hdfs.filePrefix=regex
a1.sinks.s3.hdfs.rollInterval=0
a1.sinks.s3.hdfs.rollSize=102400
a1.sinks.s3.hdfs.rollCount=30
a1.sinks.s3.hdfs.fileType=DataStream
a1.sinks.s3.hdfs.writeFormat=Text

6.4.5 启动agent

[root@master flumeconf]# flume-ng agent -c ../conf/ -f ./mytest.conf -n a1 -Dflume.root.logger=INFO,console

6.4.6 测试:


七、选择器的使用

7.1、说明

Flume中的Channel选择器作用于source阶段 ,是决定Source接受的特定事件写入到哪个Channel的组件,他们告诉Channel处理器,然后由其将事件写入到Channel。

Agent中各个组件的交互

由于Flume不是两阶段提交,事件被写入到一个Channel,然后事件在写入下一个Channel之前提交,如果写入一个Channel出现异常,那么之前已经写入到其他Channel的相同事件不能被回滚。当这样的异常发生时,Channel处理器抛出ChannelException异常,事务失败,如果Source试图再次写入相同的事件(大多数情况下,会再次写入,只有Syslog,Exec等Source不能重试,因为没有办法生成相同的数据),重复的事件将写入到Channel中,而先前的提交是成功的,这样在Flume中就发生了重复。

Channel选择器的配置是通过Channel处理器完成的,Channel选择器可以指定一组Channel是必须的,另一组的可选的。

Flume分类两种选择器,如果Source配置中没有指定选择器,那么会自动使用复制Channel选择器.

  • replicating:该选择器复制每个事件到通过Source的Channels参数指定的所有Channel中。
  • multiplexing:是一种专门用于动态路由事件的Channel选择器,通过选择事件应该写入到哪个Channel,基于一个特定的事件头的值进行路由

7.2、案例演示:replicating selector

7.2.1 配置方案

[root@master flumeconf]# vi rep.conf
a1.sources = r1  
a1.channels = c1 c2
a1.sinks = s1 s2

a1.sources.r1.type=syslogtcp
a1.sources.r1.host = master
a1.sources.r1.port = 6666
a1.sources.r1.selector.type=replicating

a1.channels.c1.type=memory

a1.channels.c2.type=memory


a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/rep
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true

a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/rep
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.inUseSuffix=.tmp
a1.sinks.s2.hdfs.rollInterval=60
a1.sinks.s2.hdfs.rollSize=1024
a1.sinks.s2.hdfs.rollCount=10
a1.sinks.s2.hdfs.idleTimeout=0
a1.sinks.s2.hdfs.batchSize=100
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.round=true
a1.sinks.s2.hdfs.roundValue=1
a1.sinks.s2.hdfs.roundUnit=second
a1.sinks.s2.hdfs.useLocalTimeStamp=true

a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2

7.2.2 启动agent的服务:

[root@master flumeconf]# flume-ng agent -c ../conf -f ./rep.conf -n a1 -Dflume.root.logger=INFO,console

7.2.3 测试:

[root@master ~]# echo "hello world hello qianfeng" | nc master 6666

7.3、案例演示:Multiplexing selector

7.3.1 配置方案

[root@master flumeconf]# vi mul.conf
a1.sources = r1  
a1.channels = c1 c2
a1.sinks = s1 s2

a1.sources.r1.type=http
a1.sources.r1.bind = master
a1.sources.r1.port = 6666
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.USER = c1
a1.sources.r1.selector.mapping.ORDER = c2
a1.sources.r1.selector.default = c1

a1.channels.c1.type=memory

a1.channels.c2.type=memory


a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/mul
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true

a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/mul
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.inUseSuffix=.tmp
a1.sinks.s2.hdfs.rollInterval=60
a1.sinks.s2.hdfs.rollSize=1024
a1.sinks.s2.hdfs.rollCount=10
a1.sinks.s2.hdfs.idleTimeout=0
a1.sinks.s2.hdfs.batchSize=100
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.round=true
a1.sinks.s2.hdfs.roundValue=1
a1.sinks.s2.hdfs.roundUnit=second
a1.sinks.s2.hdfs.useLocalTimeStamp=true

a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2

7.3.2 启动agent的服务:

[root@master flumeconf]# flume-ng agent -c ../conf -f ./mul.conf -n a1 -Dflume.root.logger=INFO,console

7.3.3 测试:

[root@master ~]# curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this my multiplex to c2"}]' http://master:6666
[root@master ~]# curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this is my content"}]' http://master:6666

7.4、自定义拦截器

7.4.0、需求:

为了提高Flume的扩展性,用户可以自己定义一个拦截器

将event的body中的数据,以数字开头的,存储为 hdfs://qianfeng01:8020/flume/number.log   s1
将event的body中的数据,以字母开头的,存储为 hdfs://qianfeng1:8020/flume/character.log   s2
将event的body中的数据,其他的开头的,存储为 hdfs: //qianfeng01:8020/flume/other.log  s3

7.4.1、pom.xml

可以参考/code/pom. xml

<!-- https: //mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.8.0</version>
</dependency>

7.4.2、代码

public class MyInterceptor implements Interceptor {

    private static final String LOCATION_KEY = "location";

    private static final String LOCATION_NUMBER = "number";

    private static final String LOCATION_CHARACTER = "character";

    private static final String LOCATION_OTHER = "other";

    /**
     * 当拦截到单个Event的时候调用
     *
     * @param event 被拦截到的Event
     * @return 这个拦截器发送出去的Event,如果返回null,则这个Event将被丢弃
     */
    @Override
    public Event intercept(Event event) {

        // 1. 获取到拦截器拦截到Event的body部分
        byte[] body = event.getBody();
        // 2. 验证是否是以数字开头的
        if (body[0] > '0' && body[0] <= '9') {
            event.getHeaders().put(LOCATION_KEY, LOCATION_NUMBER);
        } else if (body[0] >= 'a' && body[0] <= 'z' || (body[0] >= 'A' && body[0] <= 'Z')) {
            event.getHeaders().put(LOCATION_KEY, LOCATION_CHARACTER);
        } else {
            event.getHeaders().put(LOCATION_KEY, LOCATION_OTHER);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void initialize() {
    }

    @Override
    public void close() {
    }

    public static class MyBuilder implements Builder {

        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

7.4.3、打包上传

使用maven将拦截器打包,然后把此包和依赖的fastjson一起上传到flume lib目录下

7.4.4、编写方案

a1.sources=r1
a1.channels=c1 c2 c3
a1.sinks=s1 s2 s3
a1.sourres.r1.channels=c1 c2 c3
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
a1.sinks.s3.channel=c3

#设置source的属性
a1.sources.r1.type=syslogtcp
a1.sources.r1.host=master
a1.sources.r1.port=12345

#设置拦截器
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=com.qf.flume.MyInterceptor$MyBuilder

#设置选择器的属性
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=location
a1.sources.r1.selector.mapping.number=c1
a1.sources.r1.selector.mapping.character=c2
a1.sources.r1.selector.mapping.other=c3

#设置channel的属性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c3.type=memory
a1.channels.c3.capacity=1000

#设置sink的属性
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/customInterceptor/s1/%Y-%m-%d-%H
a1.sinks.s1.hdfs.useLocalTimeStamp=true
ai.sinks.s1.hdfs.filePrefix=regex
a1.sinks.s1.hdfs.rollInterval=0
a1.sinks.s1.hdfs.rollSize=102400
a1.sinks.s1.hdfs.rollCount=30
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text

a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://master:8020/flume/customInterceptor/s2/%Y-%m-%d-%H
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sinks.s2.hdfs.filePrefix=regex
a1.sinks.s2.hdfs.rollInterval=0
a1.sinks.s2.hdfs.rollSize=102400
a1.sinks.s2.hdfs.rollCount=30
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text

a1.sinks.s3.type=hdfs
a1.sinks.s3.hdfs.path=hdfs://master:8020/flume/customInterceptor/s3/%Y-%m-%d-%H
a1.sinks.s3.hdfs.useLocalTimeStamp=true
a1.sinks.s3.hdfs.filePrefix=regex
a1.sinks.s3.hdfs.rollInterval=0
a1,sinks.s3.hdfs.rollSize=102400
a1.sinks.s3.hdfs.rollCount=30
a1.sinks.s3.hdfs.fileType=DataStream
a1.sinks.s3.hdfs.writeFormat=Text

6.4.5、启动agent

[root@qianfeng01 flumeconf]# flume-ng agent -c ../conf/-f ./mytest.conf -n a1 -Dflume.root.logger=INF0,console

测试

版权声明:程序员胖胖胖虎阿 发表于 2023年9月2日 下午6:00。
转载请注明:海量日志采集工具——Flume | 胖虎的工具箱-编程导航

相关文章

暂无评论

暂无评论...