Golang / 编程开发

使用Go写一个LogAgent

浅时光博客 · 5月23日 · 2022年 · 3.2w 次已读

一、工作流程

原文链接:https://dqzboy.com
  1. 读日志:使用tail库实现
  2. 写日志:通过sarama库往Kafka中写日志
  3. Logstash接入Kafka并推送给ES集群
  4. 通过Kibana进行日志展示

二、项目结构


Kafka

package kafka

import (
	"fmt"
	"github.com/Shopify/sarama"
)

// 专门往kafka写日志的模块
var (
	client sarama.SyncProducer //声明一个全局的连接kafka的生产者

)

// Init 初始化
func Init(addrs []string) (err error) {
	config := sarama.NewConfig()
	//tailf 包使用
	config.Producer.RequiredAcks = sarama.WaitForAll        //WaitForAll等待所有节点响应
	config.Producer.Partitioner = sarama.NewHashPartitioner //新选出一个partition
	config.Producer.Return.Successes = true                 //成功发送的消息将在succcess channel返回

	//连接kafka
	client, err = sarama.NewSyncProducer(addrs, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	return
}

func SendToKafka(topic, data string) {
	//构造一个消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = topic                      //topic名称
	msg.Value = sarama.StringEncoder(data) //使用sarama.StringEncoder序列化

	//发送到kafka
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("sed msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

taillog

package taillog

import (
	"fmt"
	"github.com/hpcloud/tail"
)

var (
	tailObj *tail.Tail
	LogChan chan string
)

// Init 专门从日志文件收集日志的模块
func Init(fileName string) (err error) {
	config := tail.Config{
		ReOpen:    true,                                 //重新打开,日志切割
		Follow:    true,                                 //是否跟随上次关闭的文件
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, //从文件哪个地方开始读
		MustExist: false,                                //文件不存在不报错
		Poll:      true,
	}
	tailObj, err = tail.TailFile(fileName, config)
	if err != nil {
		fmt.Println("tail file failed, err:", err)
		return
	}
	return
}

func ReadChan() <-chan *tail.Line {
	return tailObj.Lines
}

入口

package main

import (
	"fmt"
	"logagent/kafka"
	"logagent/taillog"
	"time"
)

func run() {
	//1、读取日志
	for {
		select {
		case line := <-taillog.ReadChan():
			//2、发送kafka
			kafka.SendToKafka("web_log", line.Text)
		default:
			time.Sleep(time.Second)
		}
	}
}

//logAgent入口
func main() {
	//1、初始化kafka连接
	err := kafka.Init([]string{"192.168.66.10:9092"})
	if err != nil {
		fmt.Printf("Kafka initialization failed, err:%v\n", err)
		return
	}
	fmt.Println("Kafka initialization succeeded")
	//2、收集日志
	err = taillog.Init("./my.log")
	if err != nil {
		fmt.Printf("taillog initialization failed,err:%v\n", err)
		return
	}
	fmt.Println("tailLog initialization succeeded")
	run()
}

三、项目测试


go mod logagent
go mod download
go build

# 运行程序
$ ./logagent.exe

本文作者:浅时光博客
原文链接:https://www.dqzboy.com/10345.html
版权声明:知识共享署名-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)协议进行许可,转载时请以>超链接形式标明文章原始出处和作者信息
免责声明:本站内容仅供个人学习与研究,严禁用于商业或非法目的。请在下载后24小时内删除相应内容。继续浏览或下载即表明您接受上述条件,任何后果由用户自行承担。

1 条回应

必须 注册 为本站用户, 登录 后才可以发表评论!

  1. 流年2022-5-31 · 14:16

    文档写的非常详细,非常好的分享