在Kubernetes日志收集的系列文章里,我们分部介绍了:
- 安装生产可用、高安全的Elasticsearch集群+Kibana:安装Elasticsearch 7.4集群(开启集群Auth + Transport SSL)以及 Kibana & Keystore
- 安装了Zookeeper & Kafka生产可用的集群:安装配置Zookeeper和Kafka集群
最终的架构图如下所示:
架构说明:
- 所有的beats,这里主要说明filebeat,都使用DaemonSet的方式安装的Kubernetes集群上,收集各个node节点的容器日志,并做简单的处理,例如多行等,然后将数据发送到Kafka集群中。
- Kafka 集群这里主要安装在Kubernetes集群以外,负责解耦、缓冲beats发来的数据,同时也可以存储处理好的日志,作为中转站,方便其他需要后期处理日志的工具订阅数据。
- Logstash 可以安装在Kubernetes集群内部或者外部,从kafka收集beat发来的日志,做一些处理,例如:geoip获取IP地址的地理位置,处理Useragent等,然后将处理好的日志clone一份,同时输出到Elasticsearch集群不同index和Kafka中不同的topic上,方便其他流处理工具进行后期的实时计算。如果日志量很大,单台logstash无法支撑,可以启动多台logstash,属于同一个消费者组,这里可以分担处理。注意:最大的logstash台数不要超过topic的partitions数量,否则,多出来的logstash也是闲置的。
- Elasticsearch这里主要安装在Kubernetes集群以外,负责存储、查询、分析日志。
- Kibana 可以安装在Kubernetes集群内部或者外部,通过web UI界面查看Elasticsearch数据,查询,并出图。
这里使用Kafka而不是用Redis的原因如下:
- Redis是内存型,而Kafka是硬盘型,日志毕竟是很大的数据,所以作为缓冲,放到Kafka里更合适;
- kafka中已经被处理的日志也是会继续保存的,直到超过自己设定的过期时间,而redis不是;
- kafka生态比较好,可以方便的和流处理工具集成
综上:redis适合日质量比较小的系统,而kafka适用于比较大的日志。因为都需要保证高可用,推荐搭建Kafka集群。
环境说明:
Elasticsearch集群:版本:7.4.2
集群地址:
IP Port Note 172.17.0.87 9200 es01 172.17.0.87 9201 es02 172.17.0.87 9202 es02 Kafka集群:版本:2.0.0
IP Port Note 172.17.0.87 9092 kafka01 172.17.0.87 9093 kafka02 172.17.0.87 9094 kafka03 Kubernetes 1.15.4
Filebeat + Logstash版本:7.4.2
Filebeat
方案一: Filebeat收集K8S pod日志,直接发送到Elasticsearch中
这里将filebeat安装在Kubernetes集群中,以收集K8S pod的日志,并将日志直接发送到ES集群当中。
参考:
https://github.com/elastic/beats/tree/v7.4.2/deploy/kubernetes
https://github.com/elastic/beats/blob/v7.4.2/deploy/kubernetes/filebeat-kubernetes.yaml
我们首先下载官方的filebeat-kubernetes.yaml文件,进行修改:
$ wget -c "https://github.com/elastic/beats/blob/v7.4.2/deploy/kubernetes/filebeat-kubernetes.yaml" |
然后直接应用:
$ kubectl apply -f filebeat-kubernetes.yaml |
检查状态:
$ kubectl -n kube-system get po -l k8s-app=filebeat |
我们在创建测试用Pod:
cat testlogs-pods.yaml |
查看测试pod的日志格式:
$ kubectl logs busybox-xxxxxxx |
此时我们到Kibana上查看:
增加filebeat-*的index,然后搜索:kubernetes.labels.app:"busybox"
收集的日志格式如下:
Josn格式如下:
{ |
最终的效果说明:
- 所有的K8S Pod日志都被收集到了一个filebeat的index中
- 未对pod中的json日志做格式化解析
显然效果不太好。
方案二: Filebeat收集K8S pod日志,直接发送到Elasticsearch中,通过filebeat的自动发现功能
参考:
https://www.elastic.co/guide/en/beats/filebeat/current/configuration-autodiscover.html 自动发现配置
https://www.elastic.co/guide/en/beats/filebeat/current/configuration-autodiscover-hints.html 自动发现hints配置
根据官方的filebeat-kubernetes.yaml文件中configMap部分,我们发现有一小段的注释:
# To enable hints based autodiscover, remove `filebeat.inputs` configuration and uncomment this: |
我们更改filebeat的configMap配置如下:
-- |
我们应用后,发现结果是跟通过filebeat.inputs:
方式直接采集/var/log/containers/*.log的结果是一样的。
但是我们在官方文档中发现:hints
有特殊的玩法。我们测试一下,更改configMap的配置如下:
|
我们应用后,发现kibana中最近一分钟已经没有日志了。结果是对的,我们更改我们之前的测试pod:
apiVersion: apps/v1 |
我们查看一下该pod的日志:
$ kubectl logs busybox-xxxxxxx |
我们再到Kibana中查看,发现只有该pod的日志,并且多行的配置已经生效了。
但是呢,测试pod中的json字段还是没有解析,好吧,我们根据hints文档,更改pod的annotations:
apiVersion: apps/v1 |
我们这里增加了一个processors decode_json_fields
,官网参考:
https://www.elastic.co/guide/en/beats/filebeat/current/filtering-and-enhancing-data.html processors
https://www.elastic.co/guide/en/beats/filebeat/current/decode-json-fields.html decode_json_fields配置
再次查看Kibana,发现json日志已经被解析了:
方案三: 通过Filebeat自动发现功能收集K8S pod日志,直接发送到Elasticsearch的不同index中
好了,还有一个问题,就是所有的pod日志都输出到了一个叫filebeat-xxx的index中了,我们如何自定义将不通的服务输出到不同的index呢?
还是更改filebeat的configMap如下:
|
输出到elasticsearch中的配置,参考官方:https://www.elastic.co/guide/en/beats/filebeat/current/elasticsearch-output.html
filebeat的条件判断配置,参考官方:https://www.elastic.co/guide/en/beats/filebeat/current/defining-processors.html#conditions
我们通过正则匹配kubernetes.labels.app中的值,如果匹配busybox.*的则输出到busybox-%{+yyyy.MM.dd} index中,如果不匹配的还是走默认filebeat index中。
我们在kibana中增加index:busybox-*,发现里边有了我们的busybox容器的日志。
好了,大部分问题都解决了(解析容器Josn日志,输出到不同的index中),但是K8S中的其他pod怎么办呢,难道都需要配置annotations嘛,那也太麻烦了,我们测试一下在开启hints的默认配置:
-- |
应用后,我们发现其他的pod日志都存在默认的index中:filebeat-xxxx,而我们的测试pod busybox还是输出在busybox-xxx中,并且还是能够解析json日志的。
所以我们得出结论:
- 开启hints的默认配置,以收集所有容器的日志
- 对已特殊容器,需要解析json的,或者配置多行的,我们通过annotations对该容器进行特殊配置
好了,我们进行最终的方案:
方案四: Filebeat收集K8S pod日志,通过自动发现功能,发送到Kafka集群中
更改filebeat的configMap:
|
关于如何配置Kafka的输出,请参考官方文档:https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html
创建所需的Topic:
$ ./kafka01/bin/kafka-topics.sh --zookeeper zk01:2181 --create --topic filebeat --partitions 5 --replication-factor 1 |
好了,应用filebeat的配置后,我们启动一个消费者看看是否有日志近来:
./kafka01/bin/kafka-console-consumer.sh --bootstrap-server kafka01:9092,kafka02:9093,kafka03:9094 --topic filebeat --from-beginning |
收到日志如下:
收到的: |
测试没有问题,接下来,我们按照logstash,来从filebeat topic采集日志。
Logstash
安装
我们使用二进制的方式安装:
$ tar xf logstash-7.4.2.tar.gz -C /data/knner |
目录结构:
.zip
and .tar.gz
Type | Description | Default Location | Setting |
---|---|---|---|
home | Home directory of the Logstash installation. | {extract.path}- Directory created by unpacking the archive |
|
bin | Binary scripts, including logstash to start Logstash and logstash-plugin to install plugins |
{extract.path}/bin |
|
settings | Configuration files, including logstash.yml and jvm.options |
{extract.path}/config |
path.settings |
logs | Log files | {extract.path}/logs |
path.logs |
plugins | Local, non Ruby-Gem plugin files. Each plugin is contained in a subdirectory. Recommended for development only. | {extract.path}/plugins |
path.plugins |
data | Data files used by logstash and its plugins for any persistence needs. | {extract.path}/data |
path.data |
Debian and RPM
Type | Description | Default Location | Setting |
---|---|---|---|
home | Home directory of the Logstash installation. | /usr/share/logstash |
|
bin | Binary scripts including logstash to start Logstash and logstash-plugin to install plugins |
/usr/share/logstash/bin |
|
settings | Configuration files, including logstash.yml , jvm.options , and startup.options |
/etc/logstash |
path.settings |
conf | Logstash pipeline configuration files | /etc/logstash/conf.d/*.conf |
See /etc/logstash/pipelines.yml |
logs | Log files | /var/log/logstash |
path.logs |
plugins | Local, non Ruby-Gem plugin files. Each plugin is contained in a subdirectory. Recommended for development only. | /usr/share/logstash/plugins |
path.plugins |
data | Data files used by logstash and its plugins for any persistence needs. | /var/lib/logstash |
path.data |
Docker images:
没有logs的目录,所有的日志都是标准输出。
Type | Description | Default Location | Setting |
---|---|---|---|
home | Home directory of the Logstash installation. | /usr/share/logstash |
|
bin | Binary scripts, including logstash to start Logstash and logstash-plugin to install plugins |
/usr/share/logstash/bin |
|
settings | Configuration files, including logstash.yml and jvm.options |
/usr/share/logstash/config |
path.settings |
conf | Logstash pipeline configuration files | /usr/share/logstash/pipeline |
path.config |
plugins | Local, non Ruby-Gem plugin files. Each plugin is contained in a subdirectory. Recommended for development only. | /usr/share/logstash/plugins |
path.plugins |
data | Data files used by logstash and its plugins for any persistence needs. | /usr/share/logstash/data |
path.data |
配置
参考:https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html
在Logstash主要分为三步:input
-> filter
-> output
。
input { |
值类型
string
user => "knner"
lists
path => [ "/var/log/messages", "/var/log/*.log" ]
uris => [ "http://elastic.co", "http://example.net" ]Boolean
true
orfalse
,注意不需要引号ssl_enable => true
Bytes
其实就是string类型,包括了bytes的单位:支持:k,M,G,T Ki,Mi,Gi,Timy_bytes => "1113" # 1113 bytes
my_bytes => "10MiB" # 10485760 bytes
my_bytes => "100kib" # 102400 bytes
my_bytes => "180 mb" # 180000000 bytesCodec
在logstash中叫做编码解码器,用于在input中解码,在output中编码。默认是:plaincodec => "json"
Hash
就是kv格式,"field1" => "value1"
写法如下:match => {
"field1" => "value1"
"field2" => "value2"
...
}
# or as a single line. No commas between entries:
match => { "field1" => "value1" "field2" => "value2" }Number
数字,包括浮点数(floating)和整数(integer)port => 33
Password
就是string类型,只是不会被打印出来my_password => "password"
URI
可以是完整的域名:http://elastic.co
,可以是简单的字符串:foobar
。如果是格式:http://user:pass@knner.wang
此时的密码也不会被打印。my_uri => "http://foo:bar@knner.wang"
Path
系统路径,string类型my_path => "/tmp/logstash"
Comments 注释
# 我就是注释
input { # comments can appear at the end of a line, too
...
}
在Logstash使用Events和fields中的数据
参考:https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html
引用field字段
最简单的语法就是[fieldname]
来引用名为fieldname的字段;
如果该字段是顶级的,可以省略[]
,而直接使用fieldname
。
如果该字段是嵌套的,你必须指定该field的绝对路径:[top-level-field][nested-field]
。
例如:
{ |
如果引用os
字段,需要指定:[ua][os]
如果引用ip
字段,可以通过[ip]
或者直接ip
的方式
printf format
字段引用格式也在Logstash中使用,称作:print format
。使用改格式,你可以获取字段中的值,进行动态的配置。
例如:输出elasticsearch中的index上,你可以引用字段的值,来动态输出到不同的index中
output { |
所以呢,从上面的apache log中,根据不同的响应码,而输出到不同的index中,例如:状态码为200的,将输出到index:apache-200-2019.12.15
,状态码为503的将输出到index:apache-503-2019.12.15
中。
条件语句
在logstash中的filter
,output
中可以使用条件语句进行不同的操作。
logstash中的条件语句关键词有if,else if,else
,等同于shell中的if,elif,else
,语法如下:
if EXPRESSION { |
EXPRESSION
表达式中可以使用如下操作符:
比较操作符:
- 等于:== 不等于:!= 大于:> 小于:< 大于等于:>= 小于等于:<=
- 正则匹配:=~ 正则不匹配:!~
- in,not in
布尔运算符:
- and(与),or(或),nand(非与),xor(非或)
一元运算符:
- !
表达式可以很长或者很复杂,可以使用!
取反,使用()
进行分组运算。
例如:当action
字段值为login
时,使用mutate
移除secret
字段
filter { |
可以在一行指定多个表达式:
output { |
您可以使用in
操作符来测试字段是否包含特定的字符串、键或(用于列表)元素:
filter { |
在条件句中使用not in
也是一样的。例如,当grok成功时,你可以使用not in
将事件路由到Elasticsearch:
output { |
你没有办法判断一个字段是否存在,还是其值是false或者空值:
例如:表达式if [foo]
在下面的情况下都是false
:
foo
字段不存在foo
字段是flasefoo
字段存在,但是值为空值
@metadata 字段
在logstash中有一个特殊的字段@metadata
,该字段在output的时候是看不见的,它会在output的时候被清除掉。最佳实践就是通过@metadata
字段进行条件判断。
例如:从stdin
进入的都会存放到message
字段中,通过使用mutate
在filter中增加字段:
input { stdin { } } |
我们运行一下看看,会发生什么:
$ bin/logstash -f ../test.conf |
我们发现,添加的[@metadata][test]
和[@metadata][no_show]
并没有看到。
在rubydebug的output中可以使用metadata => true
来显示@metadata
字段,注意:只有在rubydebug中才允许使用metadata => true
查看@metadata
字段。
stdout { codec => rubydebug { metadata => true } } |
再次查看:
$ bin/logstash -f ../test.conf |
可以使用@metadata
作为临时的字段,用于判断,但是有不想在output中真实的看到该字段。
例如:使用来自apache,nginx日志中的timestamp,用完之后,你还需要删除timestamp。但是通过@metadata
则不需要删除,如下:
input { stdin { } } |
解释:使用grok 来通过正则匹配日志:正则的匹配格式为:HTTPDATE,已经内置在logstash中,可以在GitHub上查看,如下:
https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}
MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])
MONTH \b(?:[Jj]an(?:uary|uar)?|[Ff]eb(?:ruary|ruar)?|Mm?r(?:ch|z)?|[Aa]pr(?:il)?|[Mm]a(?:y|i)?|[Jj]un(?:e|i)?|[Jj]ul(?:y)?|[Aa]ug(?:ust)?|[Ss]ep(?:tember)?|Oo?t(?:ober)?|[Nn]ov(?:ember)?|[Dd]e(?:c|z)(?:ember)?)\b
YEAR (?>\d\d){1,2}
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
INT (?:[+-]?(?:[0-9]+))
将匹配到的存入到:[@metadata][timestamp]
中,然后date在从[@metadata][timestamp]
获取以替代@timestamp。
测试如下:
$ bin/logstash -f ../test.conf |
我们了解了Logstash的基本配置后,测试一下:
input { |
启动
查看一下logstash的命令帮助:
$ ./bin/logstash --help |
启动Logstash:
$ cd logstash |
因为我们将日志输出到了ES的filebeat-kafka的index中,我们去Kibana中增加:filebeat-kafka-*
index。
然后查看kibana,日志都已经收集到了,之前的busybox的message也解析正常。
在Kubernetes环境中运行Logstash
如果你想在Kubernetes中跑Logstash,请参考:
可以使用非官方的Chart:
https://hub.kubeapps.com/charts/bitnami/logstash/0.2.3 对应logstash7.4.2
https://hub.kubeapps.com/charts/bitnami/logstash/0.2.4 对应logstash7.5.0
官方的Chart:
https://github.com/elastic/helm-charts 不过只有最新版本的7.5
这里使用elastic 官方的logstash 7.5.0 helm chart更改image版本为7.4.2而成的。
添加Helm repo:
$ helm repo add bitnami https://charts.bitnami.com/bitnami |
下载elastic/logstash:
$ helm search repo logstash |
编辑values.yaml:
logstashPipeline: |
从哪里知道,docker image支持哪些环境变量呢?
Docker 启动脚本:
https://github.com/elastic/logstash/tree/7.4/docker/data/logstash/bin
主要有两个:
环境变量转换到配置文件
env2yaml /usr/share/logstash/config/logstash.yml
启动logstash
关于logstash docker环境变量的配置:
https://github.com/elastic/logstash/blob/7.4/docker/data/logstash/env2yaml/env2yaml.go
XPACK_MONITORING_ENABLED |
Helm dry-run 查看:
$ helm install logstash ./ --dry-run --debug |
然后安装即可:
$ helm install logstash ./ |