
Collecting and processing Kubernetes logs with Filebeat + Kafka + Logstash, including parsing nginx-ingress JSON logs

In this series of articles on Kubernetes log collection, we have covered the following topics:

The final architecture looks like this:

Architecture notes:

  • All Beats (Filebeat in this article) are installed on the Kubernetes cluster as a DaemonSet. They collect container logs from every node, do light processing such as multiline joining, and ship the data to the Kafka cluster.
  • The Kafka cluster runs outside the Kubernetes cluster. It decouples and buffers the data sent by the Beats, and it can also retain the processed logs, acting as a relay so that any other tool that needs to post-process logs can subscribe to the data.
  • Logstash can run inside or outside the Kubernetes cluster. It consumes the Beats logs from Kafka, applies processing such as geoip (resolving an IP address to a geographic location) and user-agent parsing, then clones each processed event and writes it both to different Elasticsearch indices and to different Kafka topics, so that stream-processing tools can run real-time computation later. If the log volume is too large for a single Logstash instance, run several instances in the same consumer group to share the load. Note: do not run more Logstash instances than the topic has partitions; the extra instances will sit idle.
  • Elasticsearch runs outside the Kubernetes cluster and is responsible for storing, querying, and analyzing the logs.
  • Kibana can run inside or outside the Kubernetes cluster; it provides a web UI for browsing, querying, and charting the Elasticsearch data.

Why Kafka rather than Redis:

  • Redis is memory-based while Kafka is disk-based; logs are a large volume of data, so Kafka is the better buffer;
  • Kafka keeps logs even after they have been consumed, until the configured retention period expires; Redis does not;
  • Kafka has a richer ecosystem and integrates easily with stream-processing tools.

In short: Redis suits systems with a small log volume, while Kafka suits large ones. Since either needs to be highly available, a Kafka cluster is recommended.
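To illustrate the retention point above, the retention window is configurable per topic. A sketch, assuming a topic named filebeat (created later in this post) and Kafka 2.0's ZooKeeper-based tooling:

# keep already-consumed logs on the topic for 7 days
$ ./kafka01/bin/kafka-configs.sh --zookeeper zk01:2181 --alter \
    --entity-type topics --entity-name filebeat \
    --add-config retention.ms=604800000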

Environment:

  • Elasticsearch cluster, version 7.4.2

    Cluster addresses:

    IP Port Note
    172.17.0.87 9200 es01
    172.17.0.87 9201 es02
    172.17.0.87 9202 es03
  • Kafka cluster, version 2.0.0

    IP Port Note
    172.17.0.87 9092 kafka01
    172.17.0.87 9093 kafka02
    172.17.0.87 9094 kafka03
  • Kubernetes 1.15.4

  • Filebeat + Logstash version: 7.4.2

Filebeat

Option 1: Filebeat collects K8S pod logs and sends them directly to Elasticsearch

Here we install Filebeat in the Kubernetes cluster to collect K8S pod logs and send them straight to the ES cluster.

References:

https://github.com/elastic/beats/tree/v7.4.2/deploy/kubernetes

https://github.com/elastic/beats/blob/v7.4.2/deploy/kubernetes/filebeat-kubernetes.yaml

First, download the official filebeat-kubernetes.yaml file and edit it:

$ wget -c "https://raw.githubusercontent.com/elastic/beats/v7.4.2/deploy/kubernetes/filebeat-kubernetes.yaml"
$ vim filebeat-kubernetes.yaml
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: filebeat-config
  namespace: kube-system
  labels:
    k8s-app: filebeat
data:
  filebeat.yml: |-
    filebeat.inputs:
    - type: container
      paths:
        - /var/log/containers/*.log
      processors:
        - add_kubernetes_metadata:   # add Kubernetes metadata to each event
            host: ${NODE_NAME}
            matchers:
            - logs_path:
                logs_path: "/var/log/containers/"

    # To enable hints based autodiscover, remove `filebeat.inputs` configuration and uncomment this:
    #filebeat.autodiscover:
    #  providers:
    #    - type: kubernetes
    #      host: ${NODE_NAME}
    #      hints.enabled: true
    #      hints.default_config:
    #        type: container
    #        paths:
    #          - /var/log/containers/*${data.kubernetes.container.id}.log

    processors:
      - add_cloud_metadata:   # add cloud metadata; this environment runs on AWS
      - add_host_metadata:    # add host (K8S node) metadata

    cloud.id: ${ELASTIC_CLOUD_ID}
    cloud.auth: ${ELASTIC_CLOUD_AUTH}

    output.elasticsearch:
      hosts: ['${ELASTICSEARCH_HOST:elasticsearch}:${ELASTICSEARCH_PORT:9200}']
      username: ${ELASTICSEARCH_USERNAME}
      password: ${ELASTICSEARCH_PASSWORD}
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: filebeat
  namespace: kube-system
  labels:
    k8s-app: filebeat
spec:
  selector:
    matchLabels:
      k8s-app: filebeat
  template:
    metadata:
      labels:
        k8s-app: filebeat
    spec:
      serviceAccountName: filebeat
      terminationGracePeriodSeconds: 30
      hostNetwork: true
      dnsPolicy: ClusterFirstWithHostNet
      containers:
      - name: filebeat
        image: docker.elastic.co/beats/filebeat:7.4.2
        args: [
          "-c", "/etc/filebeat.yml",
          "-e",
        ]
        env:
        - name: ELASTICSEARCH_HOST
          value: "172.17.0.87"   # ES address
        - name: ELASTICSEARCH_PORT
          value: "9200"          # ES port
        - name: ELASTICSEARCH_USERNAME
          value: admin           # ES username
        - name: ELASTICSEARCH_PASSWORD
          value: admin123        # ES password
        - name: ELASTIC_CLOUD_ID
          value:
        - name: ELASTIC_CLOUD_AUTH
          value:
        - name: NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        securityContext:
          runAsUser: 0
          # If using Red Hat OpenShift uncomment this:
          #privileged: true
        resources:
          limits:
            memory: 200Mi
          requests:
            cpu: 100m
            memory: 100Mi
        volumeMounts:
        - name: config
          mountPath: /etc/filebeat.yml
          readOnly: true
          subPath: filebeat.yml
        - name: data
          mountPath: /usr/share/filebeat/data
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: varlog
          mountPath: /var/log
          readOnly: true
      volumes:
      - name: config
        configMap:
          defaultMode: 0600
          name: filebeat-config
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: varlog
        hostPath:
          path: /var/log
      # data folder stores a registry of read status for all files, so we don't send everything again on a Filebeat pod restart
      - name: data
        hostPath:
          path: /var/lib/filebeat-data
          type: DirectoryOrCreate
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: filebeat
subjects:
- kind: ServiceAccount
  name: filebeat
  namespace: kube-system
roleRef:
  kind: ClusterRole
  name: filebeat
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
  name: filebeat
  labels:
    k8s-app: filebeat
rules:
- apiGroups: [""] # "" indicates the core API group
  resources:
  - namespaces
  - pods
  verbs:
  - get
  - watch
  - list
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: filebeat
  namespace: kube-system
  labels:
    k8s-app: filebeat
---

Then apply it:

$ kubectl apply -f filebeat-kubernetes.yaml

Check the status:

$ kubectl -n kube-system get po -l k8s-app=filebeat
NAME READY STATUS RESTARTS AGE
filebeat-brk5b 1/1 Running 0 3d13h
filebeat-wbwpx 1/1 Running 0 3d13h
filebeat-z672h 1/1 Running 0 3d13h

Next, create a test Pod:

cat testlogs-pods.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: busybox
spec:
  replicas: 1
  selector:
    matchLabels:
      app: busybox
  template:
    metadata:
      labels:
        app: busybox
    spec:
      containers:
      - name: busybox
        image: busybox
        imagePullPolicy: IfNotPresent
        command:
        - /bin/sh
        - -c
        args:
        - num=1;while true;do echo "{\"mydate\":\"$(date)\",\"num\":$((num=num+1))}";sleep 5;done

Check the test pod's log format:

$ kubectl logs busybox-xxxxxxx
{"mydate":"Wed Dec 11 07:20:08 UTC 2019","num":2}
{"mydate":"Wed Dec 11 07:20:13 UTC 2019","num":3}
{"mydate":"Wed Dec 11 07:20:18 UTC 2019","num":4}
{"mydate":"Wed Dec 11 07:20:23 UTC 2019","num":5}
{"mydate":"Wed Dec 11 07:20:28 UTC 2019","num":6}

Now check Kibana:

Add a filebeat-* index pattern, then search for kubernetes.labels.app:"busybox". The collected log looks like this:

The JSON document:

{
  "_index": "filebeat-7.4.1-2019.12.05-000001",
  "_type": "_doc",
  "_id": "-kce724B-DEl4DemmqgT",
  "_version": 1,
  "_score": null,
  "_source": {
    "@timestamp": "2019-12-10T09:23:07.847Z",
    "input": {
      "type": "container"
    },
    "kubernetes": {
      "node": {
        "name": "k8s01.test.awsbj.cn"
      },
      "container": {
        "name": "busybox",
        "image": "busybox:latest"
      },
      "namespace": "default",
      "replicaset": {
        "name": "busybox-dc8c98dbd"
      },
      "labels": {
        "pod-template-hash": "dc8c98dbd",
        "app": "busybox"
      },
      "pod": {
        "uid": "7fae0c4a-6979-4563-926f-6187fc67815d",
        "name": "busybox-dc8c98dbd-g9jt2"
      }
    },
    "ecs": {
      "version": "1.1.0"
    },
    "host": {
      "name": "k8s01.test.awsbj.cn",
      "hostname": "k8s01.test.awsbj.cn",
      "architecture": "x86_64",
      "os": {
        "platform": "centos",
        "version": "7 (Core)",
        "family": "redhat",
        "name": "CentOS Linux",
        "kernel": "4.14.138-114.102.amzn2.x86_64",
        "codename": "Core"
      },
      "containerized": true
    },
    "cloud": {
      "provider": "aws",
      "instance": {
        "id": "i-066bf576192908763"
      },
      "machine": {
        "type": "m4.xlarge"
      },
      "region": "cn-north-1",
      "availability_zone": "cn-north-1a",
      "account": {
        "id": "600060780818"
      },
      "image": {
        "id": "ami-00c02e45635d96f87"
      }
    },
    "log": {
      "offset": 158428,
      "file": {
        "path": "/var/log/containers/busybox-dc8c98dbd-g9jt2_default_busybox-1445d3c7e06d462fce96c0ed7da25d0105a555bedf68c3a55cb123f42f4cac47.log"
      }
    },
    "stream": "stdout",
    "message": "{\"mydate\":\"Tue Dec 10 09:23:07 UTC 2019\",\"num\":1240}",
    "agent": {
      "type": "filebeat",
      "ephemeral_id": "f9a8eb22-62bd-4b65-969b-74ea0e55fa8f",
      "hostname": "k8s01.test.awsbj.cn",
      "id": "32551f33-2dcf-4a5c-bacd-64ce83433f34",
      "version": "7.4.1"
    }
  },
  "fields": {
    "suricata.eve.timestamp": [
      "2019-12-10T09:23:07.847Z"
    ],
    "@timestamp": [
      "2019-12-10T09:23:07.847Z"
    ]
  },
  "highlight": {
    "kubernetes.labels.app": [
      "@kibana-highlighted-field@busybox@/kibana-highlighted-field@"
    ]
  },
  "sort": [
    1575969787847
  ]
}

The result, in summary:

  • All K8S pod logs are collected into a single filebeat index
  • The JSON logs produced inside the pod are not parsed into fields

Clearly not good enough.

Option 2: Filebeat collects K8S pod logs via its autodiscover feature and sends them directly to Elasticsearch

References:

https://www.elastic.co/guide/en/beats/filebeat/current/configuration-autodiscover.html (autodiscover configuration)

https://www.elastic.co/guide/en/beats/filebeat/current/configuration-autodiscover-hints.html (hints-based autodiscover configuration)

In the configMap section of the official filebeat-kubernetes.yaml there is a short comment:

# To enable hints based autodiscover, remove `filebeat.inputs` configuration and uncomment this:
#filebeat.autodiscover:
#  providers:
#    - type: kubernetes
#      host: ${NODE_NAME}
#      hints.enabled: true
#      hints.default_config:
#        type: container
#        paths:
#          - /var/log/containers/*${data.kubernetes.container.id}.log

Change the filebeat configMap as follows:

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: filebeat-config
  namespace: kube-system
  labels:
    k8s-app: filebeat
data:
  filebeat.yml: |-
    filebeat.autodiscover:   # use the autodiscover feature
      providers:
        - type: kubernetes
          host: ${NODE_NAME}
          hints.enabled: true
          hints.default_config:
            type: container
            paths:
              - /var/log/containers/*${data.kubernetes.container.id}.log
    processors:
      - add_cloud_metadata:
      - add_host_metadata:
    cloud.id: ${ELASTIC_CLOUD_ID}
    cloud.auth: ${ELASTIC_CLOUD_AUTH}
    output.elasticsearch:
      hosts: ['${ELASTICSEARCH_HOST:elasticsearch}:${ELASTICSEARCH_PORT:9200}']
      username: ${ELASTICSEARCH_USERNAME}
      password: ${ELASTICSEARCH_PASSWORD}
---

After applying this, the result turns out to be exactly the same as collecting /var/log/containers/*.log directly via filebeat.inputs:.

The official docs, however, show that hints enable some special tricks. Let's test one; change the configMap as follows:

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: filebeat-config
  namespace: kube-system
  labels:
    k8s-app: filebeat
data:
  filebeat.yml: |-
    filebeat.autodiscover:
      providers:
        - type: kubernetes
          host: ${NODE_NAME}
          hints.enabled: true
          hints.default_config.enabled: false   # the default config is disabled here
    processors:
      - add_cloud_metadata:
      - add_host_metadata:
    cloud.id: ${ELASTIC_CLOUD_ID}
    cloud.auth: ${ELASTIC_CLOUD_AUTH}
    output.elasticsearch:
      hosts: ['${ELASTICSEARCH_HOST:elasticsearch}:${ELASTICSEARCH_PORT:9200}']
      username: ${ELASTICSEARCH_USERNAME}
      password: ${ELASTICSEARCH_PASSWORD}

After applying this, Kibana shows no logs for the last minute, which is what we expect. Now update the earlier test pod:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: busybox
spec:
  replicas: 1
  selector:
    matchLabels:
      app: busybox
  template:
    metadata:
      labels:
        app: busybox
      annotations:   # the annotations below enable log collection for this pod and configure multiline joining
        co.elastic.logs/enabled: 'true'
        co.elastic.logs/multiline.pattern: '^\{'
        co.elastic.logs/multiline.negate: 'true'
        co.elastic.logs/multiline.match: 'after'
    spec:
      containers:
      - name: busybox
        image: busybox
        imagePullPolicy: IfNotPresent
        command:
        - /bin/sh
        - -c
        args:
        - num=1;while true;do echo "{\"mydate\":\"$(date)\",\"num\":$((num=num+1))}";echo "wanghk-${num}";sleep 5;done # the command changed too

Check the pod's logs:

$ kubectl logs busybox-xxxxxxx
{"mydate":"Wed Dec 11 07:20:08 UTC 2019","num":2}
wanghk-2
{"mydate":"Wed Dec 11 07:20:13 UTC 2019","num":3}
wanghk-3
{"mydate":"Wed Dec 11 07:20:18 UTC 2019","num":4}
wanghk-4
{"mydate":"Wed Dec 11 07:20:23 UTC 2019","num":5}
wanghk-5
{"mydate":"Wed Dec 11 07:20:28 UTC 2019","num":6}

Back in Kibana, only this pod's logs appear, and the multiline configuration has taken effect.

The JSON fields in the test pod are still not parsed, though. Following the hints documentation, update the pod's annotations:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: busybox
spec:
  replicas: 1
  selector:
    matchLabels:
      app: busybox
  template:
    metadata:
      labels:
        app: busybox
      annotations:
        co.elastic.logs/enabled: 'true'
        co.elastic.logs/multiline.pattern: '^\{'
        co.elastic.logs/multiline.negate: 'true'
        co.elastic.logs/multiline.match: 'after'
        co.elastic.logs/processors.1.decode_json_fields.fields: "message" # use decode_json_fields
        co.elastic.logs/processors.1.decode_json_fields.add_error_key: "true"
        co.elastic.logs/processors.1.decode_json_fields.overwrite_keys: "true"
        co.elastic.logs/processors.1.decode_json_fields.target: ""
    spec:
      containers:
      - name: busybox
        image: busybox
        imagePullPolicy: IfNotPresent
        command:
        - /bin/sh
        - -c
        args:
        - num=1;while true;do echo "{\"mydate\":\"$(date)\",\"num\":$((num=num+1))}";sleep 5;done

This adds a decode_json_fields processor; official references:

https://www.elastic.co/guide/en/beats/filebeat/current/filtering-and-enhancing-data.html (processors)

https://www.elastic.co/guide/en/beats/filebeat/current/decode-json-fields.html (decode_json_fields configuration)

Checking Kibana again, the JSON logs are now parsed:

Option 3: Collect K8S pod logs via Filebeat autodiscover and send them to different Elasticsearch indices

One problem remains: every pod's logs end up in the single filebeat-xxx index. How do we route different services to different indices?

Once more, change the filebeat configMap:

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: filebeat-config
  namespace: kube-system
  labels:
    k8s-app: filebeat
data:
  filebeat.yml: |-
    filebeat.autodiscover:
      providers:
        - type: kubernetes
          host: ${NODE_NAME}
          hints.enabled: true
          hints.default_config.enabled: false
    processors:
      - add_cloud_metadata:
      - add_host_metadata:
    cloud.id: ${ELASTIC_CLOUD_ID}
    cloud.auth: ${ELASTIC_CLOUD_AUTH}
    output.elasticsearch:
      hosts: ['${ELASTICSEARCH_HOST:elasticsearch}:${ELASTICSEARCH_PORT:9200}']
      username: ${ELASTICSEARCH_USERNAME}
      password: ${ELASTICSEARCH_PASSWORD}
      indices:
        - index: "busybox-%{+yyyy.MM.dd}"
          when.regexp:   # match with a when.regexp regular expression
            kubernetes.labels.app: busybox.*

For the Elasticsearch output configuration, see the official docs: https://www.elastic.co/guide/en/beats/filebeat/current/elasticsearch-output.html

For filebeat's condition syntax, see: https://www.elastic.co/guide/en/beats/filebeat/current/defining-processors.html#conditions

We match the value of kubernetes.labels.app with a regular expression: events matching busybox.* go to the busybox-%{+yyyy.MM.dd} index, and everything else still goes to the default filebeat index. Additional routing rules can be stacked, as shown below.
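If more routing rules are needed, the indices list accepts multiple entries, each with its own condition. A sketch (the second rule and its index name are hypothetical):

indices:
  - index: "busybox-%{+yyyy.MM.dd}"
    when.regexp:
      kubernetes.labels.app: busybox.*
  - index: "nginx-ingress-%{+yyyy.MM.dd}"      # hypothetical second rule
    when.contains:
      kubernetes.container.name: "nginx-ingress-controller"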

Adding a busybox-* index pattern in Kibana, we find our busybox container's logs there.

Good: the main problems are solved (container JSON logs are parsed and routed to their own index). But what about all the other pods in K8S? Annotating each one would be far too tedious. Let's re-enable the hints default config and test:

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: filebeat-config
  namespace: kube-system
  labels:
    k8s-app: filebeat
data:
  filebeat.yml: |-
    filebeat.autodiscover:   # use the autodiscover feature
      providers:
        - type: kubernetes
          host: ${NODE_NAME}
          hints.enabled: true
          hints.default_config:   # the default config is enabled again
            type: container
            paths:
              - /var/log/containers/*${data.kubernetes.container.id}.log
    processors:
      - add_cloud_metadata:
      - add_host_metadata:
    cloud.id: ${ELASTIC_CLOUD_ID}
    cloud.auth: ${ELASTIC_CLOUD_AUTH}
    output.elasticsearch:
      hosts: ['${ELASTICSEARCH_HOST:elasticsearch}:${ELASTICSEARCH_PORT:9200}']
      username: ${ELASTICSEARCH_USERNAME}
      password: ${ELASTICSEARCH_PASSWORD}
      indices:
        - index: "busybox-%{+yyyy.MM.dd}"
          when.regexp:
            kubernetes.labels.app: busybox.*
---

After applying this, all other pods' logs land in the default filebeat-xxxx index, while the busybox test pod still goes to busybox-xxx with its JSON logs parsed.

Conclusions:

  • Enable the hints default config to collect logs from every container
  • For the special containers that need JSON parsing or multiline handling, configure each one individually via annotations

On to the final setup:

Option 4: Filebeat collects K8S pod logs via autodiscover and sends them to the Kafka cluster

Change the filebeat configMap:

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: filebeat-config
  namespace: kube-system
  labels:
    k8s-app: filebeat
data:
  filebeat.yml: |-
    filebeat.autodiscover:
      providers:
        - type: kubernetes
          host: ${NODE_NAME}
          hints.enabled: true
          hints.default_config:
            type: container
            paths:
              - /var/log/containers/*${data.kubernetes.container.id}.log
    processors:
      - add_cloud_metadata:
      - add_host_metadata:
    cloud.id: ${ELASTIC_CLOUD_ID}
    cloud.auth: ${ELASTIC_CLOUD_AUTH}
    output:
      elasticsearch:
        enabled: false   # the elasticsearch output is disabled
        hosts: ['${ELASTICSEARCH_HOST:elasticsearch}:${ELASTICSEARCH_PORT:9200}']
        username: ${ELASTICSEARCH_USERNAME}
        password: ${ELASTICSEARCH_PASSWORD}
        indices:
          - index: "busybox-%{+yyyy.MM.dd}"
            when.regexp:
              kubernetes.labels.app: busybox.*
      kafka:
        enabled: true    # the kafka output is enabled instead
        hosts: ["172.17.0.87:9092","172.17.0.87:9093","172.17.0.87:9094"]
        topic: filebeat
        max_message_bytes: 5242880
        partition.round_robin:
          reachable_only: true
        keep_alive: 120
        required_acks: 1

For the Kafka output configuration, see the official docs: https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html
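The Kafka output can also route events to different topics with a topics list of conditional rules, analogous to the indices routing above. A sketch based on the documented options (the per-app topic name is hypothetical):

output.kafka:
  hosts: ["172.17.0.87:9092","172.17.0.87:9093","172.17.0.87:9094"]
  topic: filebeat                  # default topic
  topics:
    - topic: "busybox"             # hypothetical per-app topic
      when.regexp:
        kubernetes.labels.app: busybox.*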

Create the required topic:

$ ./kafka01/bin/kafka-topics.sh --zookeeper zk01:2181 --create --topic filebeat --partitions 5 --replication-factor 1
Created topic "filebeat".

$ ./kafka01/bin/kafka-topics.sh --zookeeper zk01:2181 --list
__consumer_offsets
filebeat
my-replicated-topic
test

$ ./kafka01/bin/kafka-topics.sh --zookeeper zk01:2181 --describe --topic filebeat
Topic:filebeat  PartitionCount:5        ReplicationFactor:1     Configs:
        Topic: filebeat Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: filebeat Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: filebeat Partition: 2    Leader: 2       Replicas: 2     Isr: 2
        Topic: filebeat Partition: 3    Leader: 0       Replicas: 0     Isr: 0
        Topic: filebeat Partition: 4    Leader: 1       Replicas: 1     Isr: 1

Now, with the filebeat config applied, start a console consumer to see whether logs are arriving:

./kafka01/bin/kafka-console-consumer.sh --bootstrap-server kafka01:9092,kafka02:9093,kafka03:9094 --topic filebeat --from-beginning

The logs received:

Raw:
{"@timestamp":"2019-12-11T13:45:55.958Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.4.1","topic":"filebeat"},"kubernetes":{"namespace":"default","replicaset":{"name":"busybox-bf68bcffd"},"labels":{"app":"busybox","pod-template-hash":"bf68bcffd"},"pod":{"name":"busybox-bf68bcffd-h7595","uid":"f7a1f370-938d-401e-8a54-8f12a0b0cf3d"},"node":{"name":"k8s01.test.awsbj.cn"},"container":{"name":"busybox","image":"busybox"}},"num":2566,"stream":"stdout","message":"{\"mydate\":\"Wed Dec 11 13:45:55 UTC 2019\",\"num\":2566}","input":{"type":"container"},"host":{"name":"k8s01.test.awsbj.cn","hostname":"k8s01.test.awsbj.cn","architecture":"x86_64","os":{"family":"redhat","name":"CentOS Linux","kernel":"4.14.138-114.102.amzn2.x86_64","codename":"Core","platform":"centos","version":"7 (Core)"},"containerized":true},"agent":{"type":"filebeat","ephemeral_id":"74d3b9f0-f9f3-4ead-b82b-7dd6a214046e","hostname":"k8s01.test.awsbj.cn","id":"32551f33-2dcf-4a5c-bacd-64ce83433f34","version":"7.4.1"},"cloud":{"image":{"id":"ami-00c02e45635d96f87"},"instance":{"id":"i-066bf576192908763"},"provider":"aws","machine":{"type":"m4.xlarge"},"region":"cn-north-1","availability_zone":"cn-north-1a","account":{"id":"600060780818"}},"log":{"offset":329362,"file":{"path":"/var/lib/docker/containers/3ca6b693a233756466f827451f66745a3199b0bfa32c54f68422706295baa053/3ca6b693a233756466f827451f66745a3199b0bfa32c54f68422706295baa053-json.log"}},"mydate":"Wed Dec 11 13:45:55 UTC 2019","ecs":{"version":"1.1.0"}}

Pretty-printed:
{
  "@metadata": {
    "beat": "filebeat",
    "topic": "filebeat",
    "type": "_doc",
    "version": "7.4.1"
  },
  "@timestamp": "2019-12-11T13:45:55.958Z",
  "agent": {
    "ephemeral_id": "74d3b9f0-f9f3-4ead-b82b-7dd6a214046e",
    "hostname": "k8s01.test.awsbj.cn",
    "id": "32551f33-2dcf-4a5c-bacd-64ce83433f34",
    "type": "filebeat",
    "version": "7.4.1"
  },
  "cloud": {
    "account": {
      "id": "600060780818"
    },
    "availability_zone": "cn-north-1a",
    "image": {
      "id": "ami-00c02e45635d96f87"
    },
    "instance": {
      "id": "i-066bf576192908763"
    },
    "machine": {
      "type": "m4.xlarge"
    },
    "provider": "aws",
    "region": "cn-north-1"
  },
  "ecs": {
    "version": "1.1.0"
  },
  "host": {
    "architecture": "x86_64",
    "containerized": true,
    "hostname": "k8s01.test.awsbj.cn",
    "name": "k8s01.test.awsbj.cn",
    "os": {
      "codename": "Core",
      "family": "redhat",
      "kernel": "4.14.138-114.102.amzn2.x86_64",
      "name": "CentOS Linux",
      "platform": "centos",
      "version": "7 (Core)"
    }
  },
  "input": {
    "type": "container"
  },
  "kubernetes": {
    "container": {
      "image": "busybox",
      "name": "busybox"
    },
    "labels": {
      "app": "busybox",
      "pod-template-hash": "bf68bcffd"
    },
    "namespace": "default",
    "node": {
      "name": "k8s01.test.awsbj.cn"
    },
    "pod": {
      "name": "busybox-bf68bcffd-h7595",
      "uid": "f7a1f370-938d-401e-8a54-8f12a0b0cf3d"
    },
    "replicaset": {
      "name": "busybox-bf68bcffd"
    }
  },
  "log": {
    "file": {
      "path": "/var/lib/docker/containers/3ca6b693a233756466f827451f66745a3199b0bfa32c54f68422706295baa053/3ca6b693a233756466f827451f66745a3199b0bfa32c54f68422706295baa053-json.log"
    },
    "offset": 329362
  },
  "message": "{\"mydate\":\"Wed Dec 11 13:45:55 UTC 2019\",\"num\":2566}",
  "mydate": "Wed Dec 11 13:45:55 UTC 2019",
  "num": 2566,
  "stream": "stdout"
}

That works. Next, we install Logstash to consume the logs from the filebeat topic.

Logstash

Installation

We install from the binary tarball:

$ tar xf logstash-7.4.2.tar.gz -C /data/knner
$ cd /data/knner
$ ln -s logstash-7.4.2 logstash

Directory layout:

.zip and .tar.gz

Type Description Default Location Setting
home Home directory of the Logstash installation. {extract.path} (directory created by unpacking the archive)
bin Binary scripts, including logstash to start Logstash and logstash-plugin to install plugins {extract.path}/bin
settings Configuration files, including logstash.yml and jvm.options {extract.path}/config path.settings
logs Log files {extract.path}/logs path.logs
plugins Local, non Ruby-Gem plugin files. Each plugin is contained in a subdirectory. Recommended for development only. {extract.path}/plugins path.plugins
data Data files used by logstash and its plugins for any persistence needs. {extract.path}/data path.data

Debian and RPM

Type Description Default Location Setting
home Home directory of the Logstash installation. /usr/share/logstash
bin Binary scripts including logstash to start Logstash and logstash-plugin to install plugins /usr/share/logstash/bin
settings Configuration files, including logstash.yml, jvm.options, and startup.options /etc/logstash path.settings
conf Logstash pipeline configuration files /etc/logstash/conf.d/*.conf See /etc/logstash/pipelines.yml
logs Log files /var/log/logstash path.logs
plugins Local, non Ruby-Gem plugin files. Each plugin is contained in a subdirectory. Recommended for development only. /usr/share/logstash/plugins path.plugins
data Data files used by logstash and its plugins for any persistence needs. /var/lib/logstash path.data

Docker images:

There is no logs directory; all Logstash logs go to standard output.

Type Description Default Location Setting
home Home directory of the Logstash installation. /usr/share/logstash
bin Binary scripts, including logstash to start Logstash and logstash-plugin to install plugins /usr/share/logstash/bin
settings Configuration files, including logstash.yml and jvm.options /usr/share/logstash/config path.settings
conf Logstash pipeline configuration files /usr/share/logstash/pipeline path.config
plugins Local, non Ruby-Gem plugin files. Each plugin is contained in a subdirectory. Recommended for development only. /usr/share/logstash/plugins path.plugins
data Data files used by logstash and its plugins for any persistence needs. /usr/share/logstash/data path.data

Configuration

Reference: https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html

A Logstash pipeline has three stages: input -> filter -> output

input {
...
}

filter {
...
}

output {
...
}
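The smallest pipeline can be run straight from the command line; as the --help output later in this post shows, -e defaults to a stdin input and a rubydebug stdout output when either side is omitted:

$ ./bin/logstash -e 'input { stdin { } } output { stdout { codec => rubydebug } }'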

Value types

  • string

    user => "knner"
  • lists

    path => [ "/var/log/messages", "/var/log/*.log" ]
    uris => [ "http://elastic.co", "http://example.net" ]
  • Boolean
    true or false; note that no quotes are needed

    ssl_enable => true
  • Bytes
    A string type that includes a byte unit; supported units: k, M, G, T and Ki, Mi, Gi, Ti

    my_bytes => "1113"   # 1113 bytes
    my_bytes => "10MiB"  # 10485760 bytes
    my_bytes => "100kib" # 102400 bytes
    my_bytes => "180 mb" # 180000000 bytes
  • Codec
    Called a codec (encoder/decoder) in Logstash: it decodes data in inputs and encodes it in outputs. The default is plain.

    codec => "json"
  • Hash
    Key-value pairs, "field1" => "value1", written as:

    match => {
      "field1" => "value1"
      "field2" => "value2"
      ...
    }
    # or as a single line. No commas between entries:
    match => { "field1" => "value1" "field2" => "value2" }
  • Number
    Numbers, both floating point and integer

    port => 33
  • Password
    A string type whose value is simply never printed

    my_password => "password"
  • URI
    Can be a full URL such as http://elastic.co, or a plain string such as foobar. In the form http://user:pass@knner.wang the password is likewise not printed.

    my_uri => "http://foo:bar@knner.wang"
  • Path
    A filesystem path, string type

    my_path => "/tmp/logstash"
  • Comments

    # this is a comment
    input { # comments can appear at the end of a line, too
      ...
    }

Accessing event and field data in Logstash

Reference: https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html

Field references

The basic syntax is [fieldname], which references the field named fieldname.

If the field is top-level, the brackets can be dropped and fieldname used directly.

If the field is nested, you must give its full path: [top-level-field][nested-field]

For example:

{
  "agent": "Mozilla/5.0 (compatible; MSIE 9.0)",
  "ip": "192.168.24.44",
  "request": "/index.html",
  "response": {
    "status": 200,
    "bytes": 52353
  },
  "ua": {
    "os": "Windows 7"
  }
}

To reference the os field you must write [ua][os].

To reference the ip field you can write [ip] or simply ip.

sprintf format

The field-reference syntax is also used in what Logstash calls the sprintf format, which lets you substitute field values into configuration strings dynamically.

For example, in the elasticsearch output's index you can reference field values to route events to different indices:

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "apache-%{[response][status]}-%{+yyyy.MM.dd.HH}"
  }
}

So, given the apache log above, events are routed by response code: a status of 200 goes to index apache-200-2019.12.15, while a 503 goes to apache-503-2019.12.15.

Conditionals

In Logstash's filter and output sections, conditionals let you act differently on different events.

The conditional keywords are if, else if, and else, equivalent to shell's if, elif, else. The syntax:

if EXPRESSION {
  ...
} else if EXPRESSION {
  ...
} else {
  ...
}

An EXPRESSION can use the following operators.

Comparison operators:

  • equality: ==, not equal: !=, greater than: >, less than: <, greater or equal: >=, less or equal: <=
  • regexp: =~ (matches), !~ (does not match)
  • inclusion: in, not in

Boolean operators:

  • and, or, nand, xor

Unary operators:

  • !

Expressions can be as long and complex as needed; use ! to negate and () to group.

For example, when the action field equals login, use mutate to remove the secret field:

filter {
  if [action] == "login" {
    mutate { remove_field => "secret" }
  }
}

Multiple expressions can be combined in a single statement:

output {
  # Send production errors to pagerduty
  if [loglevel] == "ERROR" and [deployment] == "production" {
    pagerduty {
      ...
    }
  }
}

You can use the in operator to test whether a field contains a particular string, key, or (for lists) element:

filter {
  if [foo] in [foobar] {
    mutate { add_tag => "field in field" }
  }
  if [foo] in "foo" {
    mutate { add_tag => "field in string" }
  }
  if "hello" in [greeting] {
    mutate { add_tag => "string in field" }
  }
  if [foo] in ["hello", "world", "foo"] {
    mutate { add_tag => "field in list" }
  }
  if [missing] in [alsomissing] {
    mutate { add_tag => "shouldnotexist" }
  }
  if !("foo" in ["hello", "world"]) {
    mutate { add_tag => "shouldexist" }
  }
}

not in works the same way in conditionals. For example, you can route events to Elasticsearch only when grok succeeded:

output {
  if "_grokparsefailure" not in [tags] {
    elasticsearch { ... }
  }
}

There is no way to tell whether a field does not exist, or holds false, or holds an empty value.

For example, the expression if [foo] is false in all of these cases:

  • the foo field does not exist
  • the foo field is false
  • the foo field exists, but its value is empty

The @metadata field

Logstash has a special field, @metadata, which never shows up in the output; it is stripped at output time. A good practice is to use @metadata for conditional logic.

For example, everything arriving from stdin is stored in the message field; here we add some fields with mutate in the filter:

input { stdin { } }

filter {
  mutate { add_field => { "show" => "This data will be in the output" } }
  mutate { add_field => { "[@metadata][test]" => "Hello" } }
  mutate { add_field => { "[@metadata][no_show]" => "This data will not be in the output" } }
}

output {
  if [@metadata][test] == "Hello" {
    stdout { codec => rubydebug }
  }
}

Run it and see what happens:

$ bin/logstash -f ../test.conf
Pipeline main started
asdf        # typed by hand
{
    "@timestamp" => 2016-06-30T02:42:51.496Z,
    "@version" => "1",
    "host" => "example.com",
    "show" => "This data will be in the output",
    "message" => "asdf"        # the typed input landed in the message field
}

Notice that the [@metadata][test] and [@metadata][no_show] fields we added are nowhere to be seen.

The rubydebug output supports metadata => true to display the @metadata field. Note: only rubydebug allows metadata => true for inspecting @metadata.

stdout { codec => rubydebug { metadata => true } }

Check again:

$ bin/logstash -f ../test.conf
Pipeline main started
asdf
{
    "@timestamp" => 2016-06-30T02:46:48.565Z,
    "@metadata" => {        # here are the two fields we added
        "test" => "Hello",
        "no_show" => "This data will not be in the output"
    },
    "@version" => "1",
    "host" => "example.com",
    "show" => "This data will be in the output",
    "message" => "asdf"
}

@metadata is handy as a temporary field for decisions whose data you never want to actually see in the output.

For example, when using the timestamp from an apache or nginx log, you would normally have to delete the temporary timestamp field afterwards; with @metadata there is no cleanup needed:

input { stdin { } }

filter {
  grok { match => [ "message", "%{HTTPDATE:[@metadata][timestamp]}" ] }
  date { match => [ "[@metadata][timestamp]", "dd/MMM/yyyy:HH:mm:ss Z" ] }
}

output {
  stdout { codec => rubydebug }
}

Explanation: grok matches the log line against the HTTPDATE pattern, which ships with Logstash; the patterns can be browsed on GitHub:

https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns

HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}

MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])

MONTH \b(?:[Jj]an(?:uary|uar)?|[Ff]eb(?:ruary|ruar)?|[Mm](?:a|ä)?r(?:ch|z)?|[Aa]pr(?:il)?|[Mm]a(?:y|i)?|[Jj]un(?:e|i)?|[Jj]ul(?:y)?|[Aa]ug(?:ust)?|[Ss]ep(?:tember)?|[Oo](?:c|k)?t(?:ober)?|[Nn]ov(?:ember)?|[Dd]e(?:c|z)(?:ember)?)\b

YEAR (?>\d\d){1,2}

TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])

INT (?:[+-]?(?:[0-9]+))

The matched value is stored in [@metadata][timestamp], and the date filter then reads it from there to set @timestamp.

Test:

$ bin/logstash -f ../test.conf
Pipeline main started
02/Mar/2014:15:36:43 +0100 ## typed by hand
{
"@timestamp" => 2014-03-02T14:36:43.000Z,
"@version" => "1",
"host" => "example.com",
"message" => "02/Mar/2014:15:36:43 +0100"
}

With the Logstash basics covered, let's put together our pipeline:

logstash/config/filebeat-kafka.conf
input {
  kafka {
    bootstrap_servers => "kafka01:9092,kafka02:9093,kafka03:9094"
    client_id => "logstash01"
    topics => ["filebeat"]
    group_id => "logstash"
    decorate_events => true
    codec => "json"
  }
}

output {
  elasticsearch {
    hosts => ["http://es01:9200","http://es02:9201","http://es03:9202"]
    index => "filebeat-kafka-%{+YYYY.MM.dd}"
    user => "admin"
    password => "admin123"
  }
}
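The architecture section also promised geoip/useragent enrichment and cloning the processed events to a second Kafka topic. A sketch of that fuller pipeline; the source field names (clientip, agent) and the filebeat-processed topic are assumptions for illustration, not part of the setup above:

input {
  kafka {
    bootstrap_servers => "kafka01:9092,kafka02:9093,kafka03:9094"
    topics => ["filebeat"]
    group_id => "logstash"
    codec => "json"
  }
}

filter {
  geoip { source => "clientip" }       # assumes the parsed log has a clientip field
  useragent {
    source => "agent"                  # assumes the parsed log has an agent field
    target => "ua"
  }
  clone { clones => ["to_kafka"] }     # duplicate each event; the copy gets type "to_kafka"
}

output {
  if [type] == "to_kafka" {
    kafka {
      bootstrap_servers => "kafka01:9092,kafka02:9093,kafka03:9094"
      topic_id => "filebeat-processed" # hypothetical topic for downstream stream processors
      codec => "json"
    }
  } else {
    elasticsearch {
      hosts => ["http://es01:9200","http://es02:9201","http://es03:9202"]
      index => "filebeat-kafka-%{+YYYY.MM.dd}"
      user => "admin"
      password => "admin123"
    }
  }
}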

Startup

Check logstash's command-line help first:

$ ./bin/logstash --help
Thread.exclusive is deprecated, use Thread::Mutex
Usage:
bin/logstash [OPTIONS]

Options:
-n, --node.name NAME Specify the name of this logstash instance, if no value is given
it will default to the current hostname.
(default: "test01.dev.awsbj.cn")
-f, --path.config CONFIG_PATH Load the logstash config from a specific file
or directory. If a directory is given, all
files in that directory will be concatenated
in lexicographical order and then parsed as a
single config file. You can also specify
wildcards (globs) and any matched files will
be loaded in the order described above.
-e, --config.string CONFIG_STRING Use the given string as the configuration
data. Same syntax as the config file. If no
input is specified, then the following is
used as the default input:
"input { stdin { type => stdin } }"
and if no output is specified, then the
following is used as the default output:
"output { stdout { codec => rubydebug } }"
If you wish to use both defaults, please use
the empty string for the '-e' flag.
(default: nil)
--field-reference-parser MODE (DEPRECATED) This option is no longer
configurable.

Use the given MODE when parsing field
references.

The field reference parser is used to expand
field references in your pipeline configs,
and has become more strict to better handle
ambiguous- and illegal-syntax inputs.

The only available MODE is:
- `STRICT`: parse in a strict manner; when
given ambiguous- or illegal-syntax input,
raises a runtime exception that should
be handled by the calling plugin.

(default: "STRICT")
--modules MODULES Load Logstash modules.
Modules can be defined using multiple instances
'--modules module1 --modules module2',
or comma-separated syntax
'--modules=module1,module2'
Cannot be used in conjunction with '-e' or '-f'
Use of '--modules' will override modules declared
in the 'logstash.yml' file.
-M, --modules.variable MODULES_VARIABLE Load variables for module template.
Multiple instances of '-M' or
'--modules.variable' are supported.
Ignored if '--modules' flag is not used.
Should be in the format of
'-M "MODULE_NAME.var.PLUGIN_TYPE.PLUGIN_NAME.VARIABLE_NAME=VALUE"'
as in
'-M "example.var.filter.mutate.fieldname=fieldvalue"'
--setup Load index template into Elasticsearch, and saved searches,
index-pattern, visualizations, and dashboards into Kibana when
running modules.
(default: false)
--cloud.id CLOUD_ID Sets the elasticsearch and kibana host settings for
module connections in Elastic Cloud.
Your Elastic Cloud User interface or the Cloud support
team should provide this.
Add an optional label prefix '<label>:' to help you
identify multiple cloud.ids.
e.g. 'staging:dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRub3RhcmVhbCRpZGVudGlmaWVy'
--cloud.auth CLOUD_AUTH Sets the elasticsearch and kibana username and password
for module connections in Elastic Cloud
e.g. 'username:<password>'
--pipeline.id ID Sets the ID of the pipeline.
(default: "main")
-w, --pipeline.workers COUNT Sets the number of pipeline workers to run.
(default: 4)
--java-execution Use Java execution engine.
(default: true)
--plugin-classloaders (Beta) Load Java plugins in independent classloaders to isolate their dependencies.
(default: false)
-b, --pipeline.batch.size SIZE Size of batches the pipeline is to work in.
(default: 125)
-u, --pipeline.batch.delay DELAY_IN_MS When creating pipeline batches, how long to wait while polling
for the next event.
(default: 50)
--pipeline.unsafe_shutdown Force logstash to exit during shutdown even
if there are still inflight events in memory.
By default, logstash will refuse to quit until all
received events have been pushed to the outputs.
(default: false)
--path.data PATH This should point to a writable directory. Logstash
will use this directory whenever it needs to store
data. Plugins will also have access to this path.
(default: "/data/knner/logstash/data")
-p, --path.plugins PATH A path of where to find plugins. This flag
can be given multiple times to include
multiple paths. Plugins are expected to be
in a specific directory hierarchy:
'PATH/logstash/TYPE/NAME.rb' where TYPE is
'inputs' 'filters', 'outputs' or 'codecs'
and NAME is the name of the plugin.
(default: [])
-l, --path.logs PATH Write logstash internal logs to the given
file. Without this flag, logstash will emit
logs to standard output.
(default: "/data/knner/logstash/logs")
--log.level LEVEL Set the log level for logstash. Possible values are:
- fatal
- error
- warn
- info
- debug
- trace
(default: "info")
--config.debug Print the compiled config ruby code out as a debug log (you must also have --log.level=debug enabled).
WARNING: This will include any 'password' options passed to plugin configs as plaintext, and may result
in plaintext passwords appearing in your logs!
(default: false)
-i, --interactive SHELL Drop to shell instead of running as normal.
Valid shells are "irb" and "pry"
-V, --version Emit the version of logstash and its friends,
then exit.
-t, --config.test_and_exit Check configuration for valid syntax and then exit.
(default: false)
-r, --config.reload.automatic Monitor configuration changes and reload
whenever it is changed.
NOTE: use SIGHUP to manually reload the config
(default: false)
--config.reload.interval RELOAD_INTERVAL How frequently to poll the configuration location
for changes, in seconds.
(default: 3000000000)
--http.host HTTP_HOST Web API binding host (default: "127.0.0.1")
--http.port HTTP_PORT Web API http port (default: 9600..9700)
--log.format FORMAT Specify if Logstash should write its own logs in JSON form (one
event per line) or in plain text (using Ruby's Object#inspect)
(default: "plain")
--path.settings SETTINGS_DIR Directory containing logstash.yml file. This can also be
set through the LS_SETTINGS_DIR environment variable.
(default: "/data/knner/logstash/config")
--verbose Set the log level to info.
DEPRECATED: use --log.level=info instead.
--debug Set the log level to debug.
DEPRECATED: use --log.level=debug instead.
--quiet Set the log level to info.
DEPRECATED: use --log.level=info instead.
-h, --help print help

Start Logstash:

$ cd logstash
$ ./bin/logstash -f config/filebeat-kafka.conf &
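Before backgrounding it, the pipeline file can be syntax-checked with the -t flag from the help output above:

$ ./bin/logstash -f config/filebeat-kafka.conf -t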

Since the logs now go to ES's filebeat-kafka index, add a filebeat-kafka-* index pattern in Kibana.

Kibana then shows all the logs collected, and the busybox message field is parsed correctly as before.

Running Logstash in Kubernetes

To run Logstash inside Kubernetes, see the following charts.

Unofficial charts:

https://hub.kubeapps.com/charts/bitnami/logstash/0.2.3 (logstash 7.4.2)

https://hub.kubeapps.com/charts/bitnami/logstash/0.2.4 (logstash 7.5.0)

Official chart:

https://github.com/elastic/helm-charts (only the latest 7.5 is available)

Here we take elastic's official logstash 7.5.0 helm chart and change the image version to 7.4.2.

Add the Helm repos:

$ helm repo add bitnami https://charts.bitnami.com/bitnami
$ helm repo add elastic https://helm.elastic.co
$ helm repo list
NAME URL
stable http://mirror.azure.cn/kubernetes/charts
elastic https://helm.elastic.co
bitnami https://charts.bitnami.com/bitnami
jetstack https://charts.jetstack.io
$ helm repo update
Hang tight while we grab the latest from your chart repositories...
...Successfully got an update from the "stable" chart repository
...Successfully got an update from the "elastic" chart repository
...Successfully got an update from the "jetstack" chart repository
...Successfully got an update from the "bitnami" chart repository
Update Complete. ⎈ Happy Helming!⎈

Pull elastic/logstash:

$ helm search repo logstash
NAME CHART VERSION APP VERSION DESCRIPTION
bitnami/logstash 0.2.4 7.5.0 Logstash is an open source, server-side data pr...
elastic/logstash 7.5.0 7.5.0 Official Elastic helm chart for Logstash
stable/dmarc2logstash 1.2.0 1.0.3 Provides a POP3-polled DMARC XML report injecto...
stable/logstash 2.3.0 7.1.1 Logstash is an open source, server-side data pr...

$ helm pull elastic/logstash

Edit values.yaml:

values.yaml
logstashPipeline:
  uptime.conf: |
    input {
      kafka {
        bootstrap_servers => "172.17.0.87:9092,172.17.0.87:9093,172.17.0.87:9094"
        client_id => "${LOGSTASH_ID}"
        topics => ["filebeat"]
        group_id => "logstash"
        decorate_events => true
        codec => "json"
      }
    }

    output {
      elasticsearch {
        hosts => ["172.17.0.87:9200","172.17.0.87:9201","172.17.0.87:9202"] # not sure how to pass these via an environment variable
        index => "filebeat-kafka-%{+YYYY.MM.dd}"
        user => "${XPACK_MONITORING_ELASTICSEARCH_USERNAME}"
        password => "${XPACK_MONITORING_ELASTICSEARCH_PASSWORD}"
      }
    }

extraEnvs:
  - name: LOG_LEVEL
    value: info
  - name: XPACK_MONITORING_ENABLED
    value: "true"
  - name: XPACK_MONITORING_ELASTICSEARCH_HOSTS
    value: '"172.17.0.87:9200","172.17.0.87:9201","172.17.0.87:9202"'
  - name: XPACK_MONITORING_ELASTICSEARCH_USERNAME
    value: admin
  - name: XPACK_MONITORING_ELASTICSEARCH_PASSWORD
    value: admin123
  - name: LOGSTASH_ID
    valueFrom:
      fieldRef:
        fieldPath: metadata.name

image: "docker.elastic.co/logstash/logstash"
imageTag: "7.4.2"

volumeClaimTemplate:
  storageClassName: nfs-client
  accessModes: [ "ReadWriteMany" ]
  resources:
    requests:
      storage: 500Mi

persistence:
  enabled: true

service:
  annotations: {}
  type: ClusterIP
  ports:
    - name: http
      port: 8080
      protocol: TCP
      targetPort: 8080

How do we know which environment variables the docker image supports?

The Docker startup scripts:

https://github.com/elastic/logstash/tree/7.4/docker/data/logstash/bin

They do two main things:

  1. Convert environment variables into the config file

    env2yaml /usr/share/logstash/config/logstash.yml

  2. Start logstash

The logstash docker environment-variable handling lives in:

https://github.com/elastic/logstash/blob/7.4/docker/data/logstash/env2yaml/env2yaml.go

XPACK_MONITORING_ENABLED
XPACK_MONITORING_ELASTICSEARCH_HOSTS
XPACK_MONITORING_ELASTICSEARCH_USERNAME
XPACK_MONITORING_ELASTICSEARCH_PASSWORD

map to the settings:
"xpack.monitoring.enabled",
"xpack.monitoring.elasticsearch.hosts",
"xpack.monitoring.elasticsearch.username",
"xpack.monitoring.elasticsearch.password",

The rule: all uppercase, with dots and underscores swapped.
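A few concrete examples of that rule (a sketch; env2yaml recognizes a fixed list of logstash.yml settings, and these should be in it):

# environment variable            resulting logstash.yml setting
PIPELINE_WORKERS=4                pipeline.workers: 4
CONFIG_RELOAD_AUTOMATIC=true      config.reload.automatic: true
XPACK_MONITORING_ENABLED=true     xpack.monitoring.enabled: true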

Inspect with a Helm dry run:

$ helm install logstash ./ --dry-run --debug
install.go:148: [debug] Original chart version: ""
install.go:165: [debug] CHART PATH: /home/knner/kubernetes/felk/logstash/logstash-7.5.0

NAME: logstash
LAST DEPLOYED: Thu Dec 12 15:29:12 2019
NAMESPACE: default
STATUS: pending-install
REVISION: 1
TEST SUITE: None
USER-SUPPLIED VALUES:
{}

COMPUTED VALUES:
antiAffinity: hard
antiAffinityTopologyKey: kubernetes.io/hostname
extraContainers: ""
extraEnvs:
- name: LOG_LEVEL
  value: warn
- name: XPACK_MONITORING_ENABLED
  value: "true"
- name: XPACK_MONITORING_ELASTICSEARCH_HOSTS
  value: '"172.17.0.87:9200","172.17.0.87:9201","172.17.0.87:9202"'
- name: XPACK_MONITORING_ELASTICSEARCH_USERNAME
  value: admin
- name: XPACK_MONITORING_ELASTICSEARCH_PASSWORD
  value: admin123
- name: LOGSTASH_ID
  valueFrom:
    fieldRef:
      fieldPath: metadata.name
extraInitContainers: ""
extraVolumeMounts: ""
extraVolumes: ""
fullnameOverride: ""
httpPort: 9600
image: docker.elastic.co/logstash/logstash
imagePullPolicy: IfNotPresent
imagePullSecrets: []
imageTag: 7.4.2
labels: {}
lifecycle: {}
livenessProbe:
  failureThreshold: 3
  httpGet:
    path: /
    port: http
  initialDelaySeconds: 300
  periodSeconds: 10
  successThreshold: 1
  timeoutSeconds: 5
logstashConfig: {}
logstashJavaOpts: -Xmx1g -Xms1g
logstashPipeline:
  uptime.conf: "input {\n kafka {\n bootstrap_servers => \"172.17.0.87:9092,172.17.0.87:9093,172.17.0.87:9094\"\n
    \ client_id => \"${LOGSTASH_ID}\"\n topics => [\"filebeat\"]\n group_id
    => \"logstash\"\n decorate_events => true\n codec => \"json\"\n }\n}\n\noutput
    {\n elasticsearch {\n hosts => [\"172.17.0.87:9200\",\"172.17.0.87:9201\",\"172.17.0.87:9202\"]\n
    \ index => \"filebeat-kafka-%{+YYYY.MM.dd}\"\n user => \"${XPACK_MONITORING_ELASTICSEARCH_USERNAME}\"\n
    \ password => \"${XPACK_MONITORING_ELASTICSEARCH_PASSWORD}\"\n } \n}\n"
maxUnavailable: 1
nameOverride: ""
nodeAffinity: {}
nodeSelector: {}
persistence:
  annotations: {}
  enabled: true
podAnnotations: {}
podManagementPolicy: Parallel
podSecurityContext:
  fsGroup: 1000
  runAsUser: 1000
podSecurityPolicy:
  create: false
  name: ""
  spec:
    fsGroup:
      rule: RunAsAny
    privileged: true
    runAsUser:
      rule: RunAsAny
    seLinux:
      rule: RunAsAny
    supplementalGroups:
      rule: RunAsAny
    volumes:
    - secret
    - configMap
    - persistentVolumeClaim
priorityClassName: ""
rbac:
  create: false
  serviceAccountName: ""
readinessProbe:
  failureThreshold: 3
  httpGet:
    path: /
    port: http
  initialDelaySeconds: 60
  periodSeconds: 10
  successThreshold: 3
  timeoutSeconds: 5
replicas: 1
resources:
  limits:
    cpu: 1000m
    memory: 1536Mi
  requests:
    cpu: 100m
    memory: 1536Mi
schedulerName: ""
secretMounts: []
securityContext:
  capabilities:
    drop:
    - ALL
  runAsNonRoot: true
  runAsUser: 1000
service:
  annotations: {}
  ports:
  - name: http
    port: 8080
    protocol: TCP
    targetPort: 8080
  type: ClusterIP
terminationGracePeriod: 120
tolerations: []
updateStrategy: RollingUpdate
volumeClaimTemplate:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 500Mi
  storageClassName: nfs-client

HOOKS:
MANIFEST:
---
# Source: logstash/templates/poddisruptionbudget.yaml
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: "logstash-logstash-pdb"
  labels:
    app: "logstash-logstash"
    chart: "logstash"
    heritage: "Helm"
    release: "logstash"
spec:
  maxUnavailable: 1
  selector:
    matchLabels:
      app: "logstash-logstash"
---
# Source: logstash/templates/configmap-pipeline.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-logstash-pipeline
  labels:
    app: "logstash-logstash"
    chart: "logstash"
    heritage: "Helm"
    release: "logstash"
data:
  uptime.conf: |
    input {
      kafka {
        bootstrap_servers => "172.17.0.87:9092,172.17.0.87:9093,172.17.0.87:9094"
        client_id => "${LOGSTASH_ID}"
        topics => ["filebeat"]
        group_id => "logstash"
        decorate_events => true
        codec => "json"
      }
    }

    output {
      elasticsearch {
        hosts => ["172.17.0.87:9200","172.17.0.87:9201","172.17.0.87:9202"]
        index => "filebeat-kafka-%{+YYYY.MM.dd}"
        user => "${XPACK_MONITORING_ELASTICSEARCH_USERNAME}"
        password => "${XPACK_MONITORING_ELASTICSEARCH_PASSWORD}"
      }
    }
---
# Source: logstash/templates/service.yaml
kind: Service
apiVersion: v1
metadata:
  name: "logstash-logstash"
  labels:
    app: "logstash-logstash"
    chart: "logstash"
    heritage: "Helm"
    release: "logstash"
  annotations:
    {}
spec:
  type: ClusterIP
  selector:
    app: "logstash-logstash"
    chart: "logstash"
    heritage: "Helm"
    release: "logstash"
  ports:
  - name: http
    port: 8080
    protocol: TCP
    targetPort: 8080
---
# Source: logstash/templates/statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: logstash-logstash
  labels:
    app: "logstash-logstash"
    chart: "logstash"
    heritage: "Helm"
    release: "logstash"
spec:
  serviceName: logstash-logstash
  selector:
    matchLabels:
      app: "logstash-logstash"
      release: "logstash"
  replicas: 1
  podManagementPolicy: Parallel
  updateStrategy:
    type: RollingUpdate
  volumeClaimTemplates:
  - metadata:
      name: logstash-logstash
    spec:
      accessModes:
      - ReadWriteMany
      resources:
        requests:
          storage: 500Mi
      storageClassName: nfs-client
  template:
    metadata:
      name: "logstash-logstash"
      labels:
        app: "logstash-logstash"
        chart: "logstash"
        heritage: "Helm"
        release: "logstash"
      annotations:
        pipelinechecksum: ad971ab08f50f0c2dfa05246e11c97764cad8b47e4c904206176cb4006655da
    spec:
      securityContext:
        fsGroup: 1000
        runAsUser: 1000
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
              - key: app
                operator: In
                values:
                - "logstash-logstash"
            topologyKey: kubernetes.io/hostname
      terminationGracePeriodSeconds: 120
      volumes:
      - name: logstashpipeline
        configMap:
          name: logstash-logstash-pipeline
      containers:
      - name: "logstash"
        securityContext:
          capabilities:
            drop:
            - ALL
          runAsNonRoot: true
          runAsUser: 1000
        image: "docker.elastic.co/logstash/logstash:7.4.2"
        imagePullPolicy: "IfNotPresent"
        livenessProbe:
          failureThreshold: 3
          httpGet:
            path: /
            port: http
          initialDelaySeconds: 300
          periodSeconds: 10
          successThreshold: 1
          timeoutSeconds: 5
        readinessProbe:
          failureThreshold: 3
          httpGet:
            path: /
            port: http
          initialDelaySeconds: 60
          periodSeconds: 10
          successThreshold: 3
          timeoutSeconds: 5
        ports:
        - name: http
          containerPort: 9600
        resources:
          limits:
            cpu: 1000m
            memory: 1536Mi
          requests:
            cpu: 100m
            memory: 1536Mi
        env:
        - name: LS_JAVA_OPTS
          value: "-Xmx1g -Xms1g"
        - name: LOG_LEVEL
          value: warn
        - name: XPACK_MONITORING_ENABLED
          value: "true"
        - name: XPACK_MONITORING_ELASTICSEARCH_HOSTS
          value: '"172.17.0.87:9200","172.17.0.87:9201","172.17.0.87:9202"'
        - name: XPACK_MONITORING_ELASTICSEARCH_USERNAME
          value: admin
        - name: XPACK_MONITORING_ELASTICSEARCH_PASSWORD
          value: admin123
        - name: LOGSTASH_ID
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        volumeMounts:
        - name: "logstash-logstash"
          mountPath: /usr/share/logstash/data
        - name: logstashpipeline
          mountPath: /usr/share/logstash/pipeline/uptime.conf
          subPath: uptime.conf
Then install it:

$ helm install logstash ./

Example: collect the nginx-ingress-controller container's JSON logs, and clone the processed logs into Kafka

Prerequisite: an nginx-ingress-controller installation.
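Putting the pieces above together, a sketch of this example: assuming the ingress controller is configured to emit its access log as JSON (ingress-nginx supports this via the log-format-upstream key in its ConfigMap), the pod-template annotations mirror the busybox example; multiline settings are omitted because access-log lines are single-line JSON:

# Sketch: annotations on the nginx-ingress-controller pod template
annotations:
  co.elastic.logs/enabled: 'true'
  co.elastic.logs/processors.1.decode_json_fields.fields: "message"
  co.elastic.logs/processors.1.decode_json_fields.add_error_key: "true"
  co.elastic.logs/processors.1.decode_json_fields.overwrite_keys: "true"
  co.elastic.logs/processors.1.decode_json_fields.target: ""

With that in place, the geoip/useragent/clone pipeline sketched in the Logstash section can pick up the parsed fields from the filebeat topic and write the processed copies both to Elasticsearch and to a second Kafka topic.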

If you notice any oversights or mistakes, feel free to point them out in the comments; if this post helped you, you're welcome to leave a tip below.