0%

Elastic-Filebeat 实现原理剖析

Filebeat 是使用 Golang 实现的轻量型日志采集器,也是 Elasticsearch stack 里面的一员。本质上是一个 agent,可以安装在各个节点上,根据配置读取对应位置的日志,并上报到相应的地方去。

Filebeat 的可靠性很强,可以保证日志 At least once 的上报,同时也考虑了日志搜集中的各类问题,例如日志断点续读、文件名更改、日志 Truncated 等。

Filebeat 并不依赖于 Elasticsearch,可以单独存在。我们可以单独使用 Filebeat 进行日志的上报和搜集。filebeat 内置了常用的 Output 组件, 例如 kafka、Elasticsearch、redis 等。出于调试考虑,也可以输出到 console 和 file。我们可以利用现有的 Output 组件,将日志进行上报。

当然,我们也可以自定义 Output 组件,让 Filebeat 将日志转发到我们想要的地方。

filebeat 其实是 elastic/beats 的一员,除了 filebeat 外,还有 HeartBeat、PacketBeat。这些 beat 的实现都是基于 libbeat 框架。

整体架构

下图是 Filebeat 官方提供的架构图:

filebeat

除了图中提到的各个组件,整个 filebeat 主要包含以下重要组件:

  1. Crawler:负责管理和启动各个 Input
  2. Input:负责管理和解析输入源的信息,以及为每个文件启动 Harvester。可由配置文件指定输入源信息。
  3. Harvester: Harvester 负责读取一个文件的信息。
  4. Pipeline: 负责管理缓存、Harvester 的信息写入以及 Output 的消费等,是 Filebeat 最核心的组件。
  5. Output: 输出源,可由配置文件指定输出源信息。
  6. Registrar:管理记录每个文件处理状态,包括偏移量、文件名等信息。当 Filebeat 启动时,会从 Registrar 恢复文件处理状态。

filebeat 的整个生命周期,几个组件共同协作,完成了日志从采集到上报的整个过程。

日志采集流程

Filebeat 不仅支持普通文本日志的作为输入源,还内置支持了 redis 的慢查询日志、stdin、tcp 和 udp 等作为输入源。

本文只分析下普通文本日志的处理方式,对于普通文本日志,可以按照以下配置方式,指定 log 的输入源信息。

1
2
3
4
5
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/*.log

其中 Input 也可以指定多个, 每个 Input 下的 Log 也可以指定多个。

filebeat 启动时会开启 Crawler,对于配置中的每条 Input,Crawler 都会启动一个 Input 进行处理,代码如下所示:

1
2
3
4
5
6
7
8
9
10
func (c *Crawler) Start(...){
...
for _, inputConfig := range c.inputConfigs {
err := c.startInput(pipeline, inputConfig, r.GetStates())
if err != nil {
return err
}
}
...
}

由于指定的 paths 可以配置多个,而且可以是 Glob 类型,因此 Filebeat 将会匹配到多个配置文件。

Input 对于每个匹配到的文件,都会开启一个 Harvester 进行逐行读取,每个 Harvester 都工作在自己的的 goroutine 中。

Harvester 的工作流程非常简单,就是逐行读取文件,并更新该文件暂时在 Input 中的文件偏移量(注意,并不是 Registrar 中的偏移量),读取完成则结束流程。

同时,我们需要考虑到,日志型的数据其实是在不断增长和变化的:

  1. 会有新的日志在不断产生
  2. 可能一个日志文件对应的 Harvester 退出后,又再次有了内容更新。

为了解决这两个情况,filebeat 采用了 Input 定时扫描的方式。代码如下,可以看出,Input 扫描的频率是由用户指定的 scan_frequency 配置来决定的 (默认 10s 扫描一次)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (p *Runner) Run() {
p.input.Run()

if p.Once {
return
}

for {
select {
case <-p.done:
logp.Info("input ticker stopped")
return
case <-time.After(p.config.ScanFrequency): // 定时扫描
logp.Debug("input", "Run input")
p.input.Run()
}
}
}

此外,如果用户启动时指定了 --once 选项,则扫描只会进行一次,就退出了。

日志定时扫描及异常处理

我们之前讲到 Registrar 会记录每个文件的状态,当 Filebeat 启动时,会从 Registrar 恢复文件处理状态。

其实在 filebeat 运行过程中,Input 组件也记录了文件状态。不一样的是,Registrar 是持久化存储,而 Input 中的文件状态仅表示当前文件的读取偏移量,且修改时不会同步到磁盘中。

每次,Filebeat 刚启动时,Input 都会载入 Registrar 中记录的文件状态,作为初始状态。Input 中的状态有两个非常重要:

  1. offset: 代表文件当前读取的 offset,从 Registrar 中初始化。Harvest 读取文件后,会同时修改 offset。
  2. finished: 代表该文件对应的 Harvester 是否已经结束,Harvester 开始时置为 false,结束时置为 False。

对于每次定时扫描到的文件,概括来说,会有三种大的情况:

  1. Input 找不到该文件状态的记录, 说明是新增文件,则开启一个 Harvester,从头开始解析该文件
  2. 如果可以找到文件状态,且 finished 等于 false。这个说明已经有了一个 Harvester 在处理了,这种情况直接忽略就好了。
  3. 如果可以找到文件状态,且 finished 等于 true。说明之前有 Harvester 处理过,但已经处理结束了。

对于这种第三种情况,我们需要考虑到一些异常情况,Filebeat 是这么处理的:

  1. 如果 offset 大于当前文件大小:说明文件被 Truncate 过,此时按做一个新文件处理,直接从头开始解析该文件
  2. 如果 offset 小于当前文件大小,说明文件内容有新增,则从上次 offset 处继续读即可。

对于第二种情况,Filebeat 似乎有一个逻辑上的问题: 如果文件被 Truncate 过,后来又新增了数据,且文件大小也比之前 offset 大,那么 Filebeat 是检查不出来这个问题的。

除此之外,一个比较有意思的点是,Filebeat 甚至可以处理文件名修改的问题。即使一个日志的文件名被修改过,Filebeat 重启后,也能找到该文件,从上次读过的地方继续读。

这是因为 Filebeat 除了在 Registrar 存储了文件名,还存储了文件的唯一标识。对于 Linux 来说,这个文件的唯一标识就是该文件的 inode ID + device ID。

至此,我们可以清楚的知道,Filebeat 是如何采集日志文件,同时做到监听日志文件的更新和修改。而日志采集过程,Harvest 会将数据写到 Pipeline 中。我们接下来看下数据是如何写入到 Pipeline 中的。

Pipeline 的写入

Haveseter 会将数据写入缓存中,而另一方面 Output 会从缓存将数据读走。整个生产消费的过程都是由 Pipeline 进行调度的,而整个调度过程也非常复杂。

此外,Filebeat 的缓存目前分为 memqueue 和 spool。memqueue 顾名思义就是内存缓存,spool 则是将数据缓存到磁盘中。本文将基于 memqueue 讲解整个调度过程。

我们首先看下 Haveseter 是如何将数据写入缓存中的,如下图所示:

Harvester 通过 pipeline 提供的 pipelineClient 将数据写入到 pipeline 中,Haveseter 会将读到的数据会包装成一个 Event 结构体,再递交给 pipeline。

在 Filebeat 的实现中,pipelineClient 并不直接操作缓存,而是将 event 先写入一个 events channel 中。

同时,有一个 eventloop 组件,会监听 events channel 的事件到来,等 event 到达时,eventloop 会将其放入缓存中。

当缓存满的时候,eventloop 直接移除对该 channel 的监听。
每次 event ACK 或者取消后,缓存不再满了,则 eventloop 会重新监听 events channel。

以上是 Pipeline 的写入过程,此时 event 已被写入到了缓存中。
但是 Output 是如何从缓存中拿到 event 数据的?

Pipeline 的消费过程

整个消费的过程非常复杂,数据会在多个 channel 之间传递流转,如下图所示:

首先再介绍两个角色:

  1. consumer: pipeline 在创建的时候,会同时创建一个 consumer。consumer 负责从缓存中取数据
  2. client worker:负责接收 consumer 传来的数据,并调用 Output 的 Publish 函数进行上报。

与 producer 类似,consumer 也不直接操作缓存,而是会向 get channel 中写入消费请求。
consumer 本身是个后台 loop 的过程,这个消费请求会不断进行。

eventloop 监听 get channel, 拿到之后会从缓存中取数据。并将数据写入到 resp channel 中。
consumer 从 resp channel 中拿到 event 数据后,又会将其写入到 workQueue。

workQueue 也是个 channel。client worker 会监听该 channel 上的数据到来,将数据交给 Output client 进行 Publish 上报。

而且,Output 收到的是 Batch Events,即会一次收到一批 Events。BatchSize 由各个 Output 自行决定。

至此,消息已经递交给了 Output 组件。

Ack 机制

filebeat 之所以可以保证日志可以 at least once 的上报,就是基于其 Ack 机制。

简单来说,Ack 机制就是,当 Output Publish 成功之后会调用 ACK,最终 Registrar 会收到 ACK,并修改偏移量。

而且, Registrar 只会在 Output 调用 batch 的相关信号时,才改变文件偏移量。其中 Batch 对外提供了这些信号:

1
2
3
4
5
6
7
8
9
10
11
type Batch interface {
Events() []Event

// signals
ACK()
Drop()
Retry()
RetryEvents(events []Event)
Cancelled()
CancelledEvents(events []Event)
}

Output 在 Publish 之后,无论失败,必须调用这些函数中的其中一个。

以下是 Output Publish 成功后调用 Ack 的流程:
ack

可以看到其中起核心作用的组件是 Ackloop。AckLoop 中有一个 ackChanList,其中每一个 ackChan,对应于转发给 Output 的一个 Batch。
每次新建一个 Batch,同时会建立一个 ackChan,该 ackChan 会被 append 到 ackChanList 中。

而 AckLoop 每次只监听处于 ackChanList 最头部的 ackChan。

当 Batch 被 Output 调用 Ack 后,AckLoop 会收到对应 ackChan 上的事件,并将其最终转发给 Registrar。同时,ackChanList 将会 pop 头部的 ackChan,继续监听接下来的 Ack 事件。

总结

了解了 Filebeat 的实现原理,我们才有会明白 Filebeat 配置中各个参数对程序的最终影响。同时,由于 FileBeat 是 At least once 的上报,但并不保证 Exactly once, 因此一条数据可能会被上报多次,所以接收端需要自行进行去重过滤。

相关阅读

参考

cyhone wechat