Golang / 编程开发

使用Go写一个LogAgent

温馨提示:本文最后更新于2022-05-31 09:37:10,某些文章具有时效性,若有错误或已失效,请在下方留言或提交工单提交工单
浅时光 · 5月23日 · 2022年 · 本文2063个字,预计阅读6分钟 3380次已读

一、工作流程


文章来源(Source):https://www.dqzboy.com
  1. 读日志:使用tail库实现
  2. 原文链接:https://www.dqzboy.com日志:通过sarama库往Kafka中写日志
  3. Logst原文链接:https://www.dqzboy.comash接入Kafka并推送给ES集群
  4. 通过Kibana进行日志展示

二、项目结构


Kafka

package kafka

import (
	"fmt"
	"github.com/Shopify/sarama"
)

// 专门往kafka写日志的模块
var (
	client sarama.SyncProducer //声明一个全局的连接kafka的生产者

)

// Init 初始化
func Init(addrs []string) (err error) {
	config := sarama.NewConfig()
	//tailf 包使用
	config.Producer.RequiredAcks = sarama.WaitForAll        //WaitForAll等待所有节点响应
	config.Producer.Partitioner = sarama.NewHashPartitioner //新选出一个partition
	config.Producer.Return.Successes = true                 //成功发送的消息将在succcess channel返回

	//连接kafka
	client, err = sarama.NewSyncProducer(addrs, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	return
}

func SendToKafka(topic, data string) {
	//构造一个消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = topic                      //topic名称
	msg.Value = sarama.StringEncoder(data) //使用sarama.StringEncoder序列化

	//发送到kafka
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("sed msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

taillog

package taillog

import (
	"fmt"
	"github.com/hpcloud/tail"
)

var (
	tailObj *tail.Tail
	LogChan chan string
)

// Init 专门从日志文件收集日志的模块
func Init(fileName string) (err error) {
	config := tail.Config{
		ReOpen:    true,                                 //重新打开,日志切割
		Follow:    true,                                 //是否跟随上次关闭的文件
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, //从文件哪个地方开始读
		MustExist: false,                                //文件不存在不报错
		Poll:      true,
	}
	tailObj, err = tail.TailFile(fileName, config)
	if err != nil {
		fmt.Println("tail file failed, err:", err)
		return
	}
	return
}

func ReadChan() <-chan *tail.Line {
	return tailObj.Lines
}

入口

package main

import (
	"fmt"
	"logagent/kafka"
	"logagent/taillog"
	"time"
)

func run() {
	//1、读取日志
	for {
		select {
		case line := <-taillog.ReadChan():
			//2、发送kafka
			kafka.SendToKafka("web_log", line.Text)
		default:
			time.Sleep(time.Second)
		}
	}
}

//logAgent入口
func main() {
	//1、初始化kafka连接
	err := kafka.Init([]string{"192.168.66.10:9092"})
	if err != nil {
		fmt.Printf("Kafka initialization failed, err:%v\n", err)
		return
	}
	fmt.Println("Kafka initialization succeeded")
	//2、收集日志
	err = taillog.Init("./my.log")
	if err != nil {
		fmt.Printf("taillog initialization failed,err:%v\n", err)
		return
	}
	fmt.Println("tailLog initialization succeeded")
	run()
}

三、项目测试


go mod logagent
go mod download
go build

# 运行程序
$ ./logagent.exe
使用Go写一个LogAgent-浅时光博客



关注本站官方微信公众号『精彩程序人生』

扫描左侧二维码关注我们的微信公众帐号,在微信公众帐号中回复【资料】即可获取各类互联网技术学习资料。
关注博主不迷路~


本文作者:浅时光
原文链接:https://www.dqzboy.com/10345.html
版权声明:知识共享署名-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)协议进行许可
转载时请以超链接形式标明文章原始出处和作者信息
免责声明:本站发布的内容(图片、视频和文字)以及一切破解补丁、注册激活和软件解密分析文章仅限用于学习和研究目的;不得将上述内容用于商业或者非法用途,否则,一切后果请用户自负。访问和下载本站内容,说明您已同意上述条款!


1 条回应

必须 注册 为本站用户, 登录 后才可以发表评论!

  1. 流年山西·长治2022-5-31 · 14:16

    文档写的非常详细,非常好的分享

本站已稳定运行: | 耗时 0.283 秒 | 查询 13 次 | 内存 12.38 MB