文件内容采集-go

发布时间:2024年01月01日

需求背景

1、多个协程(goroutine)并发写文件,文件大小达到上限时进行绕接(滚动切分),绕接后的历史文件不压缩

2、另有一个独立的采集流程读取这些文件的内容,并将采集到的内容统一归档到指定路径

写文件客户端

package collectFile

import (
	"bufio"
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"time"
)

// WriteClientRun launches ten writer goroutines, one for each simulated
// service "svc-0" through "svc-9", and returns immediately without waiting
// for them.
func WriteClientRun() {
	const namePrefix = "svc-"
	for n := 0; n < 10; n++ {
		name := namePrefix + strconv.Itoa(n)
		go writeFile(name)
	}
}

// writeFile keeps producing log records for serviceName by calling
// doWriteFile in an endless loop; it never returns.
func writeFile(serviceName string) {
	for {
		doWriteFile(serviceName)
	}
}

// doWriteFile appends one record of the form
// "<<<serviceName-->HH:MM:SS>>>\r\n" to the active log of serviceName, which
// lives at D:\tmp\alarm_log\<serviceName>\<BaseParent>\<serviceName>.wlog.
// The directory and the file are created when they do not exist yet.
//
// If the active log is already larger than 1024 bytes it is rolled first: the
// handle is closed, the file is renamed to
// <serviceName>_<yyyyMMddHHmmss>.wlog (it is not compressed) and a fresh
// active log is opened to receive the new record.
//
// Errors are printed to stdout and end the current attempt. Every attempt,
// successful or not, is followed by a one second pause so that a persistent
// failure cannot turn the caller's endless loop into a busy spin.
func doWriteFile(serviceName string) {
	// Registered first, therefore executed last: the file is already closed
	// again while this goroutine is pausing.
	defer time.Sleep(time.Second)

	dirPath := filepath.Join("D:\\tmp\\alarm_log", serviceName, BaseParent)
	// MkdirAll succeeds without doing anything when the directory exists, so
	// no separate existence check is required.
	if err := os.MkdirAll(dirPath, 0777); err != nil {
		fmt.Println("dirCreateStatErr ==> ", err.Error())
		return
	}

	filePath := filepath.Join(dirPath, serviceName+".wlog")
	file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
	if err != nil {
		fmt.Println("OpenFileErr ==> ", err.Error())
		return
	}
	// Deferred right after the open so that no return path can leak the
	// handle. The closure reads the variable on exit: after a roll it closes
	// the new handle, and it does nothing while file is nil.
	defer func() {
		if file == nil {
			return
		}
		if deferCloseErr := file.Close(); deferCloseErr != nil {
			fmt.Println("deferCloseErr ==> ", deferCloseErr.Error())
		}
	}()

	stat, err := file.Stat()
	if err != nil {
		fmt.Println("fileStatErr ==> ", err.Error())
		return
	}

	if stat.Size() > 1024 {
		// The handle has to be closed before the rename, otherwise the open
		// handle can make the rename fail.
		fileCloseErr := file.Close()
		file = nil
		if fileCloseErr != nil {
			fmt.Println("fileCloseErr when rolling ==> ", fileCloseErr.Error())
			return
		}

		// Roll the full file aside and start a new active log.
		formatTime := time.Now().Format("20060102150405")
		rollingFilePath := filepath.Join(dirPath, serviceName+"_"+formatTime+".wlog")
		if renameErr := os.Rename(filePath, rollingFilePath); renameErr != nil {
			fmt.Println("renameErr when rolling ==> ", renameErr.Error())
			return
		}
		file, err = os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0666)
		if err != nil {
			file = nil
			fmt.Println("OpenFileErr when rolling ==> ", err.Error())
			return
		}
	}

	writer := bufio.NewWriter(file)
	record := "<<<" + serviceName + "-->" + time.Now().Format("15:04:05") + ">>>" + "\r\n"
	if _, writeStringErr := writer.WriteString(record); writeStringErr != nil {
		fmt.Println("writeStringErr ==> ", writeStringErr.Error())
		return
	}
	if flushErr := writer.Flush(); flushErr != nil {
		fmt.Println("flushErr ==> ", flushErr.Error())
		return
	}
}

采集文件内容

采集时的缓存记录

package collectFile

import "sync"

// CursorInfo records the read progress the collector keeps for one service.
// NOTE: collectFileInfo builds values with a positional literal, so the field
// order must not be changed.
type CursorInfo struct {
	// Position up to which the content has already been read.
	offset int64
	// File that was read.
	source string
	// Update time; collectFileInfo stores a Unix timestamp in milliseconds.
	updateTime int64
}

// cursorInfoMap caches the read progress per service.
// key: service name; val: the CursorInfo describing what has been read of
// that service's files.
var cursorInfoMap sync.Map

// updateCache copies every entry of newCursorInfoMap into the shared cursor
// cache, replacing whatever was stored for the same service name before.
func updateCache(newCursorInfoMap map[string]CursorInfo) {
	for name := range newCursorInfoMap {
		cursorInfoMap.Store(name, newCursorInfoMap[name])
	}
}

// getCursorInfo returns the cursor cached for serviceName, or the zero
// CursorInfo when nothing has been cached for that service yet.
func getCursorInfo(serviceName string) CursorInfo {
	if cached, found := cursorInfoMap.Load(serviceName); found {
		return cached.(CursorInfo)
	}
	return CursorInfo{}
}

// syncCursorInfo is an unfinished placeholder: at the moment it only gathers
// the service names found on disk and does nothing with them.
//
// TODO: start a goroutine that synchronises and persists the cache.
func syncCursorInfo() {
	// Scan the disk for the names of all services that currently have logs.
	logDirs := scanTargetDir()
	serviceNames := make([]string, 0, len(logDirs))
	for i := range logDirs {
		_, name := scanFilePath(logDirs[i])
		serviceNames = append(serviceNames, name)
	}
	// Services that no longer exist should be removed from the cache.
	//cursorInfoMap.LoadAndDelete()

	// Back up the cached information to the local disk.

}

工具类

package collectFile

import (
	"bufio"
	"fmt"
	"io"
	"io/fs"
	"os"
	"path/filepath"
	"regexp"
	"sort"
	"strings"
)

const (
	// BaseDir is the base path that is scanned for files.
	BaseDir = "D:\\tmp"
	// BaseParent is the name of the parent directory that holds the logs.
	BaseParent = "warning"
	// SuffixName is the glob pattern for the file suffix.
	SuffixName = "*.wlog"
	// maxLen is the maximum size of a single record.
	maxLen = 40 * 1024
	// prefix marks the start of the content to read.
	prefix = "<<<"
	// suffix marks the end of the content to read.
	suffix = ">>>"
)

var (
	// rollingFileCompile matches the name of a rolled log,
	// "<service>_<14 digit timestamp>.wlog"; group 1 is the service name.
	// The dot is escaped so that only a literal ".wlog" extension matches.
	rollingFileCompile = regexp.MustCompile(`^([\s\S]+)_[0-9]{14}\.wlog$`)
	// fileCompile matches any "<name>.wlog" file name; group 1 is everything
	// in front of the extension.
	fileCompile = regexp.MustCompile(`^([\s\S]+)\.wlog$`)
)

// scanTargetDir walks the tree below BaseDir and returns the path of every
// directory whose name equals BaseParent.
//
// When BaseDir itself cannot be inspected the error is printed and an empty
// slice is returned. A sub-directory whose content cannot be listed is
// skipped and the walk continues.
func scanTargetDir() (dirs []string) {
	err := filepath.WalkDir(BaseDir, func(path string, d fs.DirEntry, err error) error {
		if err != nil {
			if d == nil {
				// The root could not be stat'ed: there is no entry to
				// inspect, so abort the walk with that error.
				return err
			}
			// Listing this directory failed. It was already visited once
			// with a nil error, so only skip it here; handling it again
			// would record it twice.
			return nil
		}
		if !d.IsDir() {
			return nil
		}
		_, name := filepath.Split(path)
		if name == BaseParent {
			dirs = append(dirs, path)
		}
		return nil
	})
	if err != nil {
		fmt.Println("scanTargetDir err ==> ", err.Error())
		return []string{}
	}
	return dirs
}

// scanFilePath lists the files in dir that match SuffixName and derives the
// service name from the first file name after sorting.
//
// The paths are sorted with pathCompare. The service name is capture group 1
// of rollingFileCompile or, failing that, of fileCompile; it is empty when
// neither pattern matches.
//
// An empty slice and an empty name are returned when the glob pattern is
// malformed or when no file matches.
func scanFilePath(dir string) (filePaths []string, serviceName string) {
	filePaths, err := filepath.Glob(filepath.Join(dir, SuffixName))
	if err != nil {
		fmt.Println("scanFilePath ==> ", err.Error())
		return []string{}, ""
	}
	// A glob without matches is not an error: err is nil here and must not
	// be dereferenced.
	if len(filePaths) == 0 {
		fmt.Println("scanFilePath ==> ", "no file matches", SuffixName, "in", dir)
		return []string{}, ""
	}
	sort.Slice(filePaths, func(i, j int) bool {
		return pathCompare(filePaths[i], filePaths[j]) < 0
	})

	// Extract the service name.
	_, fileName := filepath.Split(filePaths[0])
	if m := rollingFileCompile.FindStringSubmatch(fileName); m != nil {
		return filePaths, m[1]
	}
	if m := fileCompile.FindStringSubmatch(fileName); m != nil {
		return filePaths, m[1]
	}
	return filePaths, ""
}

// isRollingFile reports whether the file name component of filePath matches
// rollingFileCompile, i.e. whether it is named like a rolled log.
func isRollingFile(filePath string) bool {
	_, base := filepath.Split(filePath)
	matched := rollingFileCompile.MatchString(base)
	return matched
}

// pathCompare orders two log paths by their names.
//
// When both paths have the same length, or either one is empty, the result
// is strings.Compare(firstPath, secPath). Otherwise the longer path sorts
// first: -1 is returned when firstPath is longer and 1 when it is shorter.
//
// The ordering depends entirely on the naming scheme. For the logs of one
// service a rolled file (name plus a 14 digit timestamp) is longer than the
// active file, and rolled files compare by their timestamps, so a negative
// result means firstPath is the older file. Sorted ascending this gives:
// aad\warning\aad_20231130114550.wlog, aad\warning\aad_20231131124550.wlog,
// aad\warning\aad.wlog
func pathCompare(firstPath string, secPath string) int {
	lenFirst, lenSec := len(firstPath), len(secPath)
	switch {
	case lenFirst == 0, lenSec == 0, lenFirst == lenSec:
		return strings.Compare(firstPath, secPath)
	case lenFirst < lenSec:
		return 1
	default:
		return -1
	}
}

// readFileInfo reads filePath from byte position startOffSet and returns the
// records that parseFileInfo extracts, together with the position up to which
// the file has been consumed.
//
// newOffset never points past an unfinished record, so such a record is read
// again on the next call. When the file cannot be opened or positioned, the
// error is printed and startOffSet is returned unchanged: nothing was
// consumed, and reporting 0 would make the caller start over from the
// beginning of the file. A scan error (for example a record that exceeds
// bufio.Scanner's token size limit) is printed; whatever was parsed before it
// is still returned.
func readFileInfo(filePath string, startOffSet int64) (fileInfos []string, newOffset int64) {
	// Nothing has been consumed yet.
	newOffset = startOffSet

	file, fileOpenErr := os.Open(filePath)
	if fileOpenErr != nil {
		fmt.Println("readFileInfo fileOpenErr ===> ", fileOpenErr.Error())
		return fileInfos, newOffset
	}
	defer file.Close()

	// The offset is an absolute position in the file.
	if _, seekErr := file.Seek(startOffSet, io.SeekStart); seekErr != nil {
		fmt.Println("readFileInfo seekErr ===> ", seekErr.Error())
		return fileInfos, newOffset
	}

	scanner := bufio.NewScanner(file)
	// The split function never emits a token. It is only used to have the
	// scanner feed buffered data to parseFileInfo and to drop the bytes that
	// parseFileInfo reports as consumed.
	scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) {
		var parsed []string
		advance, parsed = parseFileInfo(data)
		fileInfos = append(fileInfos, parsed...)
		newOffset += int64(advance)
		return advance, nil, nil
	})
	for scanner.Scan() {
	}
	if scanErr := scanner.Err(); scanErr != nil {
		fmt.Println("readFileInfo scanErr ===> ", scanErr.Error())
	}

	return fileInfos, newOffset
}

// parseFileInfo extracts the records enclosed in "<<<" and ">>>" from data.
//
// fileInfos holds the content of every complete, non-empty record in order of
// appearance. offset is the number of leading bytes of data that are fully
// processed and may be discarded:
//   - if data ends inside a record, offset is the position of that record's
//     "<<<", so the whole record is parsed again once more data is available;
//   - otherwise everything is consumed except a trailing "<" or "<<", which
//     may be the beginning of a marker that continues in the next chunk.
//
// Bytes outside of records (for example the line breaks between them) are
// skipped.
func parseFileInfo(data []byte) (offset int, fileInfos []string) {
	const markerLen = 3

	pos := 0
	for {
		open := indexRun(data, pos, '<')
		if open < 0 {
			break
		}
		bodyStart := open + markerLen
		end := indexRun(data, bodyStart, '>')
		if end < 0 {
			// Unfinished record: keep it, including its opening marker.
			return open, fileInfos
		}
		if end > bodyStart {
			fileInfos = append(fileInfos, string(data[bodyStart:end]))
		}
		pos = end + markerLen
	}

	// No record is open. Keep at most markerLen-1 trailing '<' bytes so that
	// a marker split across two chunks is still recognised.
	offset = len(data)
	for kept := 0; kept < markerLen-1 && offset > pos && data[offset-1] == '<'; kept++ {
		offset--
	}
	return offset, fileInfos
}

// indexRun returns the index of the first run of three consecutive c bytes in
// data that starts at or after from, or -1 when there is none.
func indexRun(data []byte, from int, c byte) int {
	for i := from; i+2 < len(data); i++ {
		if data[i] == c && data[i+1] == c && data[i+2] == c {
			return i
		}
	}
	return -1
}

采集文件

package collectFile

import (
	"bufio"
	"fmt"
	"os"
	"time"
)

// ScanFile runs the collector forever. Each round looks up the log
// directories and hands them to collectFileInfo; when none are found it waits
// five seconds before looking again.
func ScanFile() {
	for {
		if targets := scanTargetDir(); len(targets) != 0 {
			collectFileInfo(targets)
			continue
		}
		time.Sleep(5 * time.Second)
	}
}

// collectFileInfo performs one collection round over dirs.
//
// For every directory it looks at no more than the last three paths returned
// by scanFilePath, reads the new records of each file starting at the cached
// cursor of the service, and remembers the new cursor. The collected records
// are archived with recordInfo and the cursor cache is updated either as soon
// as at least 100 records have accumulated (after which the function sleeps
// five seconds and moves on to the next directory) or at the end of the
// round. The round always ends with a five second sleep.
func collectFileInfo(dirs []string) {
	newCursorInfoCache := make(map[string]CursorInfo)
	var fileInfos []string
	for _, dir := range dirs {
		// Get all alarm log files of the current directory, rolled files and
		// the active file alike.
		filePaths, serviceName := scanFilePath(dir)
		filePathsLen := len(filePaths)
		if filePathsLen == 0 {
			fmt.Println("collectFileInfo filePaths is empty ==> ", filePaths)
			continue
		}
		// Only the last three paths of the sorted list are processed.
		if len(filePaths) > 2 {
			filePaths = filePaths[filePathsLen-3:]
		}
		cursorInfo := getCursorInfo(serviceName)
		for _, filePath := range filePaths {
			// Taken before the file is inspected; becomes the updateTime of
			// the new cursor.
			unixMilli := time.Now().UnixMilli()
			fileStat, fileStatErr := os.Stat(filePath)
			if fileStatErr != nil {
				fmt.Println("collectFileInfo fileStateError ==> ", fileStatErr.Error())
				break
			}
			if fileStat.ModTime().UnixMilli() < cursorInfo.updateTime {
				// Not modified since the cached cursor was taken, so it has
				// already been read: skip it.
				continue
			}
			// The cached offset is at or past the end of the very file it was
			// recorded for: start from the beginning.
			// NOTE(review): presumably this detects an active log that was
			// rolled and recreated - confirm.
			if cursorInfo.offset >= fileStat.Size() && cursorInfo.source == filePath {
				cursorInfo.offset = 0
			}
			newFileInfos, newOffSet := readFileInfo(filePath, cursorInfo.offset)
			// Cache the cursor information.
			newCursorInfo := CursorInfo{newOffSet, filePath, unixMilli}
			fileInfos = append(fileInfos, newFileInfos...)
			newCursorInfoCache[serviceName] = newCursorInfo
			// Reset the cursor.
			// NOTE(review): the offset reached in this file is also used as
			// the start offset of the next file of the same service - verify
			// that this is intended.
			cursorInfo.offset = newOffSet
			if len(fileInfos) >= 100 {
				// Archive the scanned content into a new file; this simulates
				// archiving the collected data.
				recordInfo(fileInfos)
				// Clear the temporary buffer once archiving is done.
				fileInfos = []string{}
				// Flush the cursor information into the cache.
				updateCache(newCursorInfoCache)
				newCursorInfoCache = make(map[string]CursorInfo)
				// Wait 5s before scanning further.
				time.Sleep(time.Second * 5)
				// Skip the remaining files of this service: more rolled files
				// may have appeared meanwhile, and the following services
				// must not starve.
				break
			}
		}
	}
	if len(fileInfos) > 0 {
		// Archive the scanned content into a new file; this simulates
		// archiving the collected data.
		recordInfo(fileInfos)
		// Flush the cursor information into the cache.
		updateCache(newCursorInfoCache)
	}
	// Wait 5s before the next scan.
	time.Sleep(time.Second * 5)
}

// recordInfo appends every entry of infos, each terminated by "\r\n", to
// D:\tmp\alarm_log\all.txt, creating the file when it does not exist.
//
// Open, write, flush and close errors are printed to stdout; writing stops at
// the first write error.
func recordInfo(infos []string) {
	file, OpenFileErr := os.OpenFile("D:\\tmp\\alarm_log\\all.txt", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
	if OpenFileErr != nil {
		fmt.Println("recordInfo OpenFileErr ==> ", OpenFileErr.Error())
		return
	}
	defer func() {
		if closeErr := file.Close(); closeErr != nil {
			fmt.Println("recordInfo closeErr ==> ", closeErr.Error())
		}
	}()

	writer := bufio.NewWriter(file)
	for _, info := range infos {
		if _, writeErr := writer.WriteString(info + "\r\n"); writeErr != nil {
			fmt.Println("recordInfo writeStringErr ==> ", writeErr.Error())
			return
		}
	}
	if flushErr := writer.Flush(); flushErr != nil {
		fmt.Println("recordInfo flushErr ==> ", flushErr.Error())
	}
}

文件内容采集结果正确性校验

package collectFile

import (
	"bufio"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strconv"
)

// CheckFileInfo verifies the collection result: for the services svc-0 to
// svc-9 it reads every *.wlog file, strips the three byte markers from both
// ends of each line and prints "allMap not exist : <content>" for every
// content that is not present as a line of D:\tmp\alarm_log\all.txt.
//
// Lines shorter than six bytes cannot carry both markers and are ignored.
func CheckFileInfo() {
	allMap := getInfo("D:\\tmp\\alarm_log\\all.txt")
	for i := 0; i < 10; i++ {
		join := filepath.Join("D:\\tmp\\alarm_log\\svc-"+strconv.Itoa(i), "warning", "*.wlog")
		matchePaths, globErr := filepath.Glob(join)
		if globErr != nil {
			fmt.Println("CheckFileInfo globErr ==> ", globErr.Error())
			continue
		}
		for _, path := range matchePaths {
			// Check that allMap contains everything found in this file.
			for line := range getInfo(path) {
				if len(line) < 6 {
					// Slicing the markers off would panic.
					continue
				}
				key := line[3 : len(line)-3]
				if allMap[key] != key {
					fmt.Println("allMap not exist : ", key)
				}
			}
		}
	}
}

// getInfo reads the file at path line by line and returns a map that holds
// every line (without its line ending) as both key and value.
//
// An empty map is returned when the file cannot be opened. Reading stops at
// the end of the file or at the first read error; an error other than io.EOF
// is printed, and the lines read so far are returned.
func getInfo(path string) map[string]string {
	allMap := make(map[string]string)
	file, openFileErr := os.Open(path)
	if openFileErr != nil {
		return allMap
	}
	defer file.Close()

	reader := bufio.NewReader(file)
	// Collects the pieces of a line that is longer than the reader's buffer.
	var pending []byte
	for {
		chunk, isPrefix, err := reader.ReadLine()
		if err != nil {
			// Stop on every error, not only on io.EOF; otherwise a failing
			// read would keep this loop spinning forever.
			if err != io.EOF {
				fmt.Println("getInfo readErr ==> ", err.Error())
			}
			break
		}
		pending = append(pending, chunk...)
		if isPrefix {
			// The line continues in the next chunk.
			continue
		}
		line := string(pending)
		allMap[line] = line
		pending = pending[:0]
	}
	return allMap
}

调用代码

	collectFile.WriteClientRun() // write the files
	collectFile.ScanFile() // collect the files

// 	collectFile.CheckFileInfo() verifies the collected file content

文章来源:https://blog.csdn.net/weixin_43317111/article/details/135326516
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。