一、工作流程
原文链接:https://dqzboy.com
- 读日志:使用
tail库实现 - 写日志:通过
sarama库往Kafka中写日志 - Logstash接入Kafka并推送给ES集群
- 通过Kibana进行日志展示
二、项目结构
package kafka
import (
"fmt"
"github.com/Shopify/sarama"
)
// 专门往kafka写日志的模块
var (
client sarama.SyncProducer //声明一个全局的连接kafka的生产者
)
// Init 初始化
func Init(addrs []string) (err error) {
config := sarama.NewConfig()
//tailf 包使用
config.Producer.RequiredAcks = sarama.WaitForAll //WaitForAll等待所有节点响应
config.Producer.Partitioner = sarama.NewHashPartitioner //新选出一个partition
config.Producer.Return.Successes = true //成功发送的消息将在succcess channel返回
//连接kafka
client, err = sarama.NewSyncProducer(addrs, config)
if err != nil {
fmt.Println("producer closed, err:", err)
return
}
return
}
func SendToKafka(topic, data string) {
//构造一个消息
msg := &sarama.ProducerMessage{}
msg.Topic = topic //topic名称
msg.Value = sarama.StringEncoder(data) //使用sarama.StringEncoder序列化
//发送到kafka
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("sed msg failed, err:", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
package taillog
import (
"fmt"
"github.com/hpcloud/tail"
)
var (
tailObj *tail.Tail
LogChan chan string
)
// Init 专门从日志文件收集日志的模块
func Init(fileName string) (err error) {
config := tail.Config{
ReOpen: true, //重新打开,日志切割
Follow: true, //是否跟随上次关闭的文件
Location: &tail.SeekInfo{Offset: 0, Whence: 2}, //从文件哪个地方开始读
MustExist: false, //文件不存在不报错
Poll: true,
}
tailObj, err = tail.TailFile(fileName, config)
if err != nil {
fmt.Println("tail file failed, err:", err)
return
}
return
}
func ReadChan() <-chan *tail.Line {
return tailObj.Lines
}
package main
import (
"fmt"
"logagent/kafka"
"logagent/taillog"
"time"
)
func run() {
//1、读取日志
for {
select {
case line := <-taillog.ReadChan():
//2、发送kafka
kafka.SendToKafka("web_log", line.Text)
default:
time.Sleep(time.Second)
}
}
}
//logAgent入口
func main() {
//1、初始化kafka连接
err := kafka.Init([]string{"192.168.66.10:9092"})
if err != nil {
fmt.Printf("Kafka initialization failed, err:%v\n", err)
return
}
fmt.Println("Kafka initialization succeeded")
//2、收集日志
err = taillog.Init("./my.log")
if err != nil {
fmt.Printf("taillog initialization failed,err:%v\n", err)
return
}
fmt.Println("tailLog initialization succeeded")
run()
}
三、项目测试
go mod logagent
go mod download
go build
# 运行程序
$ ./logagent.exe
本文作者:浅时光博客
原文链接:https://www.dqzboy.com/10345.html
版权声明:知识共享署名-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)协议进行许可,转载时请以>超链接形式标明文章原始出处和作者信息
免责声明:本站内容仅供个人学习与研究,严禁用于商业或非法目的。请在下载后24小时内删除相应内容。继续浏览或下载即表明您接受上述条件,任何后果由用户自行承担。
文档写的非常详细,非常好的分享