欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 社会 > openresty(nginx)+lua+kafka实现日志搜集系统

openresty(nginx)+lua+kafka实现日志搜集系统

2025/1/27 12:32:17 来源:https://blog.csdn.net/m0_63603104/article/details/145318617  浏览:    关键词:openresty(nginx)+lua+kafka实现日志搜集系统

        今天我们来实现一下快捷的nginx日志搜集系统,主讲的是nginx服务里面的openresty的日志搜集。采用的手段是采用lua做中间桥梁。

一、安装openresty

        具体安装步骤在这里《centos7 二进制安装openresty》

二、安装kafka

        1、安装Java及配置Java

                具体安装步骤在这里《采用ELK搭建日志平台,Java安装》

        2、安装ZooKeeper安装与配置

                a、下载ZooKeeper

wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz

                b、解压到/usr/local/

mkdir -p /usr/local/zookeeper3.4
tar -zxf zookeeper-3.4.12.tar.gz -C /usr/local/zookeeper3.4 --strip-components=1 #--strip-components选项表示从目录级别上去除指定的前缀,以实现更加控制解压的效果

                c、配置环境变量

vi /etc/profile
export ZOOKEEPER_HOME=/usr/local/zookeeper3.4/
export PATH=$ZOOKEEPER_HOME/bin:$PATH
source /etc/profile

                d、修改zookeeper3.4的配置文件

cd /usr/local/zookeeper3.4/conf/
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
tickTime=2000 #ZooKeeper服务器心跳时间,单位为ms
initLimit=10 #允许follower连接并同步到leader的初始化连接时间,以tickTime的倍数来表示
syncLimit=5 #leader与follower心跳检测最大容忍时间,响应超过syncLimit*tickTime,leader认为follower"死掉",从服务器列表中删除follower
dataDir=/data/zookeeper/data #数据目录
dataLogDir=/data/zookeeper/data/log #日志目录
clientPort=2181 #ZooKeeper对外服务端口

                e、创建目录zookeeper3.4及存放服务器编号

mkdir -p /data/zookeeper/data/log
cd /data/zookeeper/data
touch myid
echo 0 > myid

                f、设置开机启动

vi /lib/systemd/system/zookeeper.service
[Unit]
Description=Zookeeper service
After=network.target
[Service]
Type=forking
Environment=ZOOKEEPER_HOME=/usr/local/zookeeper3.4/
Environment=JAVA_HOME=/usr/local/jdk/1.8.0_391
User=root
Group=root
ExecStart=/usr/local/zookeeper3.4/bin/zkServer.sh start
ExecStop=/usr/local/zookeeper3.4/bin/zkServer.sh stop
[Install]
WantedBy=multi-user.targetsystemctl enable zookeeper.service #加入开机启动
systemctl start zookeeper.service #启动
systemctl stop zookeeper.service #停止
systemctl status zookeeper.service #状态

                g、开放端口

firewall-cmd --zone=public --add-port=2181/tcp --permanent
firewall-cmd --reload
firewall-cmd --list-ports

        3、安装kafka及配置

       

                 a、下载kafka

wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz

                b、解压到/usr/local/

mkdir -p /usr/local/kafka2.12
tar -zxf  kafka_2.12-2.8.1.tgz -C /usr/local/kafka2.12 --strip-components=1 #--strip-components选项表示从目录级别上去除指定的前缀,以实现更加控制解压的效果

                c、修改配置文件

cd /usr/local/kafka2.12/config/
vi server.properties
broker.id=0
log.dir=/data/kafka/logs #配置zookeeper管理kafka的路径
zookeeper.connect=localhost:2181
listeners=PLAINTEXT://:9092 #配置kafka的监听端口
advertised.listeners=PLAINTEXT://192.168.0.190:9092 #把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP

                d、创建kafka日志目录

mkdir -p /data/kafka/logs

                e、配置kafka快捷路径

vi /etc/profile
export KAFKA_HOME=/usr/local/kafka2.12
export PATH=$KAFKA_HOME/bin:$PATH
source /etc/profile

                f、设置开机自动服务

vi /lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service
Requires=zookeeper.service
[Service]
Type=simple
User=root
Group=root
Environment=JAVA_HOME=/usr/local/jdk/1.8.0_391
Environment=KAFKA_HOME=/usr/local/kafka2.12
Environment=KAFKA_LOG_DIRS=/data/kafka/logs
ExecStart=/usr/local/kafka2.12/bin/kafka-server-start.sh /usr/local/kafka2.12/config/server.properties
ExecStop=/usr/local/kafka2.12/bin/kafka-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.targetsystemctl enable kafka.service #加入开机启动
systemctl start kafka.service #启动
systemctl stop kafka.service #停止
systemctl status kafka.service #状态

                g、开放防火墙

firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload
firewall-cmd --list-ports

三、安装openresty与lua的依赖

        1、下载lua-resty-kafka

wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip

        2、安装unzip依赖

yum install unzip -y

        3、解压lua-resty-kafka-master.zip

unzip lua-resty-kafka-master.zip

        4、拷贝kafka插件到openresty目录下

cp -rf /data/file/lua-resty-kafka-master/lib/resty/kafka/ /usr/local/openresty/lualib/resty/kafka

        5、重新授权

chown -R www:www /usr/local/openresty

        6、nginx配置

server {listen        80 default_server;server_name _;error_page   401 /401.html;error_page   403 /403.html;error_page   404 /404.html;error_page      500 502 503 504 /50x.html;location / {access_by_lua_file /data/wwwroot/items-access.lua;#这个就是lua操作kafka的文件,成功可以走成功的路线,失败可以走失败的路线return 401;}location = /401.html {access_by_lua_file /data/wwwroot/items-access.lua;#这个就是lua操作kafka的文件,成功可以走成功的路线,失败可以走失败的路线root "/data/wwwroot/error";}location = /403.html {access_by_lua_file /data/wwwroot/items-access.lua;#这个就是lua操作kafka的文件,成功可以走成功的路线,失败可以走失败的路线root "/data/wwwroot/error";}location = /404.html {access_by_lua_file /data/wwwroot/items-access.lua;#这个就是lua操作kafka的文件,成功可以走成功的路线,失败可以走失败的路线root "/data/wwwroot/error";}location = /50x.html {access_by_lua_file /data/wwwroot/items-access.lua;#这个就是lua操作kafka的文件,成功可以走成功的路线,失败可以走失败的路线root "/data/wwwroot/error";}
}

        7、创建kafka主体

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic log_topic

        8、编写lua脚本及上传到指定位置

local cjson = require "cjson"
local producer = require "resty.kafka.producer"-- Kafka 配置
local broker_list = {{ host = "192.168.223.19", port = 9092 }
}--创建生产者
local pro = producer:new(broker_list,{ producer_type="async"})
--获取IP
local headers=ngx.req.get_headers()
local ip=headers["X-REAL-IP"] or headers["X_FORWARDED_FOR"] or ngx.var.remote_addr or "0.0.0.0"
local uri = ngx.var.uri-- 定义消息内容
local log_data = {}
log_data["uri"] = uri;
log_data["request_uri"] = ngx.var.request_uri;
log_data["ip"] = ip;
log_data["host"] = ngx.var.host;
log_data["status"] = ngx.var.status;
log_data["bytes_sent"] = ngx.var.bytes_sent;
log_data["request_time"] = ngx.var.request_time;
log_data["http_referer"] = ngx.var.http_referer;
log_data["http_user_agent"] = ngx.var.http_user_agent;
log_data["timestamp"] = ngx.now() * 1000;-- 发送消息
local ok, err = pro:send("log_topic", nil, cjson.encode(log_data))
if not ok then  ngx.log(ngx.ERR, "kafka send err:", err)  return ngx.exit(401)
end

        9、创建一个消费者来进行接收消息,也是验证结果

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic log_topic --from-beginning

        10、访问系统就能有如下结果

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com