logstash从数据源拉取日志,然后通过tcp插件发送到proxy进程中。在业务侧发现日志量明显少了,所以有了这一次的问题排查。
首先从logstash侧开始检查。我们先看logstash的日志,没有明显的报错信息。
然后再查看logstash管道的状态。可以很明显的看到,在output管道中,in远远大于out,也就是logstash拉取的日志已经到了output管道,但是无法输出出去,并且duration_in_millis时间很长,这个代表着发出去的速率很慢,这是什么原因呢?
curl -XGET 'localhost:9600/_node/stats/pipelines/azure_event_hubs?pretty'
{
...
"outputs" : [ {
"id" : "99b12e190d297be5d6113d04cf10089a3dccbaef7eed0cc41515e8e5af5f4595",
"name" : "tcp",
"events" : {
"in" : 341,
"out" : 69,
"duration_in_millis" : 519709
}
}
}
要么是发送方的原因,要么是接收方的原因。我先从发送方进行排查,我在output管道中,除了tcp插件外,还添加了stdout插件,也就是日志来了除了会通过tcp发送外,还会打印在标准输出中。
output {
tcp {
...
}
stdout {}
}
然后等待一段时间,然后再查看该管道的信息,stdout插件的in和out完全相等,但tcp插件in和out还是相差甚大,也就是output管道应该没问题。
我再假设proxy端有问题。日志是可以从logstash端发送到proxy端的,只是很慢,并且还有其他数据源也在往proxy端发送日志,也没有这个问题,所以我突然想到,该数据源的日志很大,会不会是这个原因导致的呢?
我从上面标准输出中抓了一条日志出来,134k大小,然后我手动的用nc命令将日志发送到proxy,因为日志很大,我是将日志写入到文件,然后再用管道的方式发送的
cat test.txt | nc
通过查看proxy的日志发现,其根本没有收到该条日志。那么问题原因找到了,就是因为日志太大,导致日志发生了丢失。
proxy服务的是golang写的,通过查看代码,这里使用了bufio.NewScanner来循环读取连接中的数据。
scanner := bufio.NewScanner(conn)
for scanner.Scan() {
// 处理数据
msg := scanner.Text()
...
查看NewScanner方法可以看到有一个maxTokenSize参数,然后用的默认值MaxScanTokenSize
func NewScanner(r io.Reader) *Scanner {
return &Scanner{
r: r,
split: ScanLines,
maxTokenSize: MaxScanTokenSize,
}
}
再跳转,有一个初始化缓存大小startBufSize
为4k和最大的缓存大小MaxScanTokenSize
为64k。但是我们的日志大小为134k,已经大于最大大小了,所以无法接收到该日志,也就是因为这个原因导致了日志发生了丢失。
const (
MaxScanTokenSize = 64 * 1024
startBufSize = 4096
)
我们再看下Scan方法,有一段代码如下,如果拿到的数据的大小大于maxTokenSize,则会使用s.setErr(ErrTooLong)
记录错误,然后返回false
func (s *Scanner) Scan() bool {
..
const maxInt = int(^uint(0) >> 1)
if len(s.buf) >= s.maxTokenSize || len(s.buf) > maxInt/2 {
s.setErr(ErrTooLong)
return false
}
newSize := len(s.buf) * 2
if newSize == 0 {
newSize = startBufSize
}
if newSize > s.maxTokenSize {
newSize = s.maxTokenSize
}
newBuf := make([]byte, newSize)
copy(newBuf, s.buf[s.start:s.end])
...
}
但是我们在业务代码中,并没有判断该错误,也就是如果Scan方法虽然返回了false,循环结束了,但是并没有任何错误信息。也就是无法发现该问题。
将TCP的最大缓存大小修改为配置文件可配置的,这样如果日志很大,可以修改配置增大缓存上限。库中有提供Buffer
方法来设置该上限。
在Scan发生错误时,打印错误日志,代码如下:
scanner := bufio.NewScanner(conn)
for scanner.Scan() {
// 处理数据
msg := scanner.Text()
...
if err := scanner.Err(); err != nil {
log.Errorf("扫描输入时发生错误:%s", err)
}
我的个人博客
公众号:编程黑洞