配置filebeat
# Show the effective (non-comment, non-blank) configuration.
# Fixes vs. original: curly quotes (“ ”) are not shell quotes and break the
# command; the bare `$` alternative in the regex matches EVERY line, so
# `grep -Ev` would print nothing. Match leading-whitespace comments and
# empty lines instead.
grep -Ev "^\s*#|^$" /data/filebeat/filebeat.yml
filebeat.inputs:
  # Tail the nginx access logs and ship them (via Kafka) to Logstash.
  - type: log
    enabled: true
    paths:
      - /data/nginx_logs/nginx-access-*.log
    # Extra fields attached to every event; Logstash routes events to the
    # proper ES index based on these values.
    # NOTE(review): the original comment describes a log_topic field with
    # value nginx_access, but no such key appears below — confirm it was
    # not lost when the file was collapsed onto one line.
    fields:
      app_name: nginx-appname
      profiles_active: pro
      app_node: nginx_hostname
    # Promote the fields above to the event root instead of nesting them
    # under "fields".
    fields_under_root: true
    # Start reading new files at their end (skip pre-existing content).
    tail_files: true
    # Only ship lines matching this regex.
    include_lines: ['/apis/order/save']
filebeat.config.modules:
  # Glob of Filebeat module configuration files to load.
  path: ${path.config}/modules.d/*.yml
  # Do not hot-reload module configs on change.
  reload.enabled: false
setup.template.settings:
  # Number of primary shards for indices created from the Filebeat template.
  index.number_of_shards: 3
# Kibana endpoint section left empty (defaults apply) — output goes to
# Kafka rather than directly to the Elastic stack.
setup.kibana:
processors:
  # Enrich events with host metadata unless they were forwarded from
  # another agent (tagged "forwarded").
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
# Ship events to the Kafka cluster.
output.kafka:
  enabled: true
  # Kafka broker addresses.
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
  # Target topic. NOTE(review): the original comment says the topic value
  # references a field defined under inputs (so different log paths land in
  # different topics), but the value here is a hard-coded literal — a field
  # reference would be '%{[log_topic]}'. Confirm which is intended.
  topic: 'nginx_appname_topic'
  # SECURITY: plaintext credentials in the config file — prefer the
  # Filebeat keystore or environment-variable expansion.
  username: kafka_user
  password: kafka_password
配置logstash
cat /usr/local/app/logstash/config/logstash.conf
# Sample Logstash configuration for creating a simple
# Beats -> Kafka -> Logstash -> Elasticsearch pipeline.
input {
  # Consume application/nginx logs from the main Kafka cluster.
  kafka {
    # Kafka cluster bootstrap servers (single comma-joined string is the
    # form the plugin expects).
    bootstrap_servers => ["kafka1:9092,kafka2:9092,kafka3:9092"]
    group_id => 'logstash_groupname_consumer'
    # Topics to read from.
    topics => ["pro_log_topic", "test_log_topic", "uat_log_topic", "nginx_appname_topic"]
    client_id => "appname_pro_logs"
    consumer_threads => 3
    sasl_mechanism => "PLAIN"
    security_protocol => "SASL_PLAINTEXT"
    # SECURITY: plaintext credentials in the config — prefer the Logstash
    # keystore (${KAFKA_USER}/${KAFKA_PASSWORD}) over literals.
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='kafka_user' password='kafka_password';"
    # Events arrive as JSON.
    codec => "json"
    # Only consume messages produced after the consumer group starts.
    auto_offset_reset => "latest"
  }
  # Second Kafka source (test cluster). NOTE(review): placeholder-looking
  # values ("kafkaip", "topic") — confirm these are real before deploying.
  kafka {
    bootstrap_servers => ["kafkaip:9092"]
    group_id => 'logstash_groupname_consumer2'
    topics => ["topic"]
    client_id => "appname_test_logs"
    consumer_threads => 3
    sasl_mechanism => "PLAIN"
    security_protocol => "SASL_PLAINTEXT"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='kafka_user' password='kafka_password';"
    codec => "json"
    auto_offset_reset => "latest"
  }
}

filter {
  mutate {
    # Index names must be lowercase, so normalize app_name.
    lowercase => ["app_name"]
    # Drop fields not needed downstream.
    remove_field => ["_index", "_id", "_type", "_version", "_score", "referer", "agent", "@version"]
  }
  # Use the event's own "date" field as @timestamp.
  # NOTE(review): assumes every source emits a "date" field in this exact
  # format — events without it keep their ingest time; confirm for the
  # nginx access logs.
  date {
    match => ["date", "yyyy-MM-dd HH:mm:ss.SSS"]
    target => '@timestamp'
    timezone => 'Asia/Shanghai'
  }
  # Precompute a local-date string used in the output index name.
  ruby {
    code => "event.set('index_day', (event.get('@timestamp').time.localtime).strftime('%Y.%m.%d'))"
  }
}

output {
  elasticsearch {
    hosts => ["172.19.189.179:9200", "172.19.38.38:9200", "172.19.38.39:9200"]
    # One index per app/environment/day, using the local-time index_day
    # computed above (the built-in %{+YYYY.MM.dd} sprintf uses UTC).
    index => "%{[app_name]}-%{[profiles_active]}-%{index_day}"
    #index => "%{[app_name]}-%{[profiles_active]}-%{+YYYY.MM.dd}"
    codec => "json"
    # SECURITY: plaintext credentials in the config — prefer the Logstash
    # keystore over literals.
    user => "elastic"
    password => "esappname0227"
  }
}