1、多个协程(goroutine)并发写文件,每个协程写各自服务的日志文件;当文件大小达到上限时进行绕接(重命名为带时间戳的文件),绕接后的文件不压缩
2、另有一个采集流程循环扫描并读取这些文件的内容,然后将内容统一归档到指定路径
写文件客户端
package collectFile
import (
"bufio"
"fmt"
"os"
"path/filepath"
"strconv"
"time"
)
// WriteClientRun launches ten background writer goroutines, one per simulated
// service named "svc-0" through "svc-9", and returns immediately without
// waiting for any of them.
func WriteClientRun() {
	const writerCount = 10
	for idx := 0; idx < writerCount; idx++ {
		name := "svc-" + strconv.Itoa(idx)
		go writeFile(name)
	}
}
// writeFile is the body of one writer goroutine: it calls doWriteFile for
// serviceName over and over and never returns.
func writeFile(serviceName string) {
// Endless loop; there is no stop condition in this function.
for {
doWriteFile(serviceName)
}
}
// doWriteFile appends one timestamped record of the form
// "<<<serviceName-->HH:MM:SS>>>\r\n" to the service's active log file
// D:\tmp\alarm_log\<serviceName>\<BaseParent>\<serviceName>.wlog, creating the
// directory and file when they do not exist yet.
//
// If the active file is already larger than 1024 bytes it is rolled first: the
// file is closed, renamed to <serviceName>_<yyyyMMddHHmmss>.wlog (not
// compressed) and a fresh active file is opened for the new record.
//
// Every call pauses for one second before returning, on the success path and
// on every error path, so the endless caller loop cannot spin when something
// fails. Errors are printed to standard output and are not returned.
func doWriteFile(serviceName string) {
	// Registered first so it runs last: the file is closed before the pause.
	defer time.Sleep(time.Second)

	dirPath := filepath.Join("D:\\tmp\\alarm_log", serviceName, BaseParent)
	if _, dirStatErr := os.Stat(dirPath); os.IsNotExist(dirStatErr) {
		if dirCreateErr := os.MkdirAll(dirPath, 0777); dirCreateErr != nil {
			fmt.Println("dirCreateStatErr ==> ", dirCreateErr.Error())
			return
		}
	}

	filePath := filepath.Join(dirPath, serviceName+".wlog")
	file, openFileErr := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
	if openFileErr != nil {
		fmt.Println("OpenFileErr ==> ", openFileErr.Error())
		return
	}

	stat, fileStatErr := file.Stat()
	if fileStatErr != nil {
		fmt.Println("fileStatErr ==> ", fileStatErr.Error())
		// The deferred close below is not registered yet, so release the
		// handle here instead of leaking it.
		if closeErr := file.Close(); closeErr != nil {
			fmt.Println("fileCloseErr ==> ", closeErr.Error())
		}
		return
	}

	if stat.Size() > 1024 {
		// The handle must be closed before the rename, otherwise the open
		// handle can conflict with rolling the file.
		if fileCloseErr := file.Close(); fileCloseErr != nil {
			fmt.Println("fileCloseErr when rolling ==> ", fileCloseErr.Error())
			return
		}
		// Roll the full file aside and start a new active file.
		formatTime := time.Now().Format("20060102150405")
		rollingFilePath := filepath.Join(dirPath, serviceName+"_"+formatTime+".wlog")
		if renameErr := os.Rename(filePath, rollingFilePath); renameErr != nil {
			fmt.Println("renameErr when rolling ==> ", renameErr.Error())
			return
		}
		file, openFileErr = os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
		if openFileErr != nil {
			fmt.Println("OpenFileErr when rolling ==> ", openFileErr.Error())
			return
		}
	}
	defer func() {
		if deferCloseErr := file.Close(); deferCloseErr != nil {
			fmt.Println("deferCloseErr ==> ", deferCloseErr.Error())
		}
	}()

	writer := bufio.NewWriter(file)
	record := "<<<" + serviceName + "-->" + time.Now().Format("15:04:05") + ">>>" + "\r\n"
	if _, writeStringErr := writer.WriteString(record); writeStringErr != nil {
		fmt.Println("writeStringErr ==> ", writeStringErr.Error())
		return
	}
	if flushErr := writer.Flush(); flushErr != nil {
		fmt.Println("flushErr ==> ", flushErr.Error())
		return
	}
}
package collectFile
import "sync"
// CursorInfo records how far the collector has read for one service.
// NOTE: collectFileInfo builds this struct with an unkeyed literal
// (offset, source, updateTime), so the field order must not be changed.
type CursorInfo struct {
// offset is the position up to which the file has already been read.
offset int64
// source is the path of the file that was read.
source string
// updateTime is the time of the update, stored as Unix milliseconds.
updateTime int64
}
// cursorInfoMap caches the read progress per service.
// key: service name; value: the CursorInfo of the file read for that service.
var cursorInfoMap sync.Map
// updateCache copies every entry of newCursorInfoMap into the shared cursor
// cache, overwriting the entry previously stored for the same service name.
func updateCache(newCursorInfoMap map[string]CursorInfo) {
	for name := range newCursorInfoMap {
		cursorInfoMap.Store(name, newCursorInfoMap[name])
	}
}
// getCursorInfo looks up the cached cursor of serviceName. When nothing has
// been stored for that service it returns the zero CursorInfo.
func getCursorInfo(serviceName string) CursorInfo {
	if cached, found := cursorInfoMap.Load(serviceName); found {
		return cached.(CursorInfo)
	}
	return CursorInfo{}
}
// syncCursorInfo is an unfinished stub.
// TODO: run in a goroutine that synchronises the cursor cache and persists it.
// At present it only gathers the service names that have log directories on
// disk and does nothing with them.
func syncCursorInfo() {
	// Collect the names of the services whose log directories exist on disk.
	names := []string{}
	for _, logDir := range scanTargetDir() {
		_, name := scanFilePath(logDir)
		names = append(names, name)
	}
	// TODO: evict services that no longer exist from the cache
	// (cursorInfoMap.LoadAndDelete).
	// TODO: back up the cached cursor information to the local disk.
}
package collectFile
import (
"bufio"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"regexp"
"sort"
"strings"
)
const (
// BaseDir is the root path from which scanTargetDir starts walking.
BaseDir = "D:\\tmp"
// BaseParent is the name of the parent directory that holds the log files.
BaseParent = "warning"
// SuffixName is the glob pattern (file suffix) used to find log files.
SuffixName = "*.wlog"
// maxLen is the maximum size of a single record (40 KiB).
// NOTE(review): not referenced in the visible code.
maxLen = 40 * 1024
// prefix is the marker that opens a record to be read.
// NOTE(review): not referenced in the visible code; parseFileInfo hard-codes '<'.
prefix = "<<<"
// suffix is the marker that closes a record to be read.
// NOTE(review): not referenced in the visible code; parseFileInfo hard-codes '>'.
suffix = ">>>"
)
var (
	// rollingFileCompile matches the name of a rolled log file,
	// "<service>_<14 digit timestamp>.wlog"; capture group 1 is the service name.
	// MustCompile is used so an invalid pattern fails loudly at start-up instead
	// of leaving a nil *regexp.Regexp behind, and the dot is escaped so that only
	// a literal ".wlog" extension matches.
	rollingFileCompile = regexp.MustCompile(`^([\s\S]+)_[0-9]{14}\.wlog$`)
	// fileCompile matches any "<name>.wlog" file name; capture group 1 is
	// everything before the extension.
	fileCompile = regexp.MustCompile(`^([\s\S]+)\.wlog$`)
)
// scanTargetDir walks the tree rooted at BaseDir and returns the path of every
// directory whose name equals BaseParent.
//
// A directory that cannot be read is logged and skipped. If the root itself
// cannot be inspected, or the walk fails, the error is logged and an empty
// slice is returned.
func scanTargetDir() (dirs []string) {
	walkErr := filepath.WalkDir(BaseDir, func(path string, d fs.DirEntry, err error) error {
		if err != nil {
			// d is nil when the root could not be stat'ed; nothing can be
			// walked in that case, so abort.
			if d == nil {
				return err
			}
			// WalkDir reports a directory a second time when reading it
			// failed. It was already examined on the first call, so only log.
			fmt.Println("scanTargetDir walk err ==> ", path, err.Error())
			return nil
		}
		if d.IsDir() && d.Name() == BaseParent {
			dirs = append(dirs, path)
		}
		return nil
	})
	if walkErr != nil {
		fmt.Println("scanTargetDir err ==> ", walkErr.Error())
		return []string{}
	}
	return dirs
}
// scanFilePath lists the log files (SuffixName pattern) directly inside dir and
// derives the service name from their file names.
//
// The returned paths are sorted with pathCompare: longer paths come first and
// paths of equal length are ordered lexicographically, so rolled files
// (ascending timestamp) precede the shorter active file.
//
// serviceName is taken from the first sorted file name: the part before
// "_<timestamp>.wlog" for a rolled file, otherwise the part before ".wlog".
// It is "" when the name matches neither pattern.
//
// An empty slice and "" are returned when the glob pattern is invalid (the
// error is logged) or when dir contains no matching file.
func scanFilePath(dir string) (filePaths []string, serviceName string) {
	matches, err := filepath.Glob(filepath.Join(dir, SuffixName))
	if err != nil {
		fmt.Println("scanFilePath ==> ", err.Error())
		return []string{}, ""
	}
	// An empty directory is not an error; err is nil here and must not be
	// dereferenced.
	if len(matches) == 0 {
		return []string{}, ""
	}
	sort.Slice(matches, func(i, j int) bool {
		return pathCompare(matches[i], matches[j]) < 0
	})

	// Extract the service name from the first file name.
	fileName := filepath.Base(matches[0])
	if m := rollingFileCompile.FindStringSubmatch(fileName); m != nil {
		return matches, m[1]
	}
	if m := fileCompile.FindStringSubmatch(fileName); m != nil {
		return matches, m[1]
	}
	return matches, ""
}
// isRollingFile reports whether the file-name part of filePath matches the
// rolled-file pattern rollingFileCompile.
func isRollingFile(filePath string) bool {
	_, base := filepath.Split(filePath)
	matched := rollingFileCompile.MatchString(base)
	return matched
}
// pathCompare orders two log file paths by name only.
//
// It returns -1 when firstPath sorts before secPath, 1 when it sorts after and
// 0 when both are equal:
//   - if either path is empty, or both have the same length, the result is the
//     plain lexicographic comparison of the two strings;
//   - otherwise the longer path sorts first.
//
// For files of one service this puts rolled files (longer names, ascending
// timestamp) ahead of the active file, for example:
// aad\warning\aad_20231130114550.wlog, aad\warning\aad_20231201124550.wlog,
// aad\warning\aad.wlog.
func pathCompare(firstPath string, secPath string) int {
	switch lenFirst, lenSec := len(firstPath), len(secPath); {
	case lenFirst == 0 || lenSec == 0 || lenFirst == lenSec:
		return strings.Compare(firstPath, secPath)
	case lenFirst < lenSec:
		return 1
	default:
		return -1
	}
}
// readFileInfo reads the records stored in filePath, starting at byte offset
// startOffSet from the beginning of the file.
//
// fileInfos holds the records found, as produced by parseFileInfo. newOffset is
// startOffSet plus the number of bytes parseFileInfo reported as consumed, so
// the next call can resume from there.
//
// If the file cannot be opened or positioned, the error is logged and
// newOffset equals startOffSet, so a failed read never moves the caller's
// cursor. A scanner error (for example unconsumed data exceeding the scanner's
// buffer limit) is logged, and whatever was read before it is returned.
func readFileInfo(filePath string, startOffSet int64) (fileInfos []string, newOffset int64) {
	// Until bytes are actually consumed, the cursor stays where it was.
	newOffset = startOffSet

	file, fileOpenErr := os.Open(filePath)
	if fileOpenErr != nil {
		fmt.Println("readFileInfo fileOpenErr ===> ", fileOpenErr.Error())
		return fileInfos, newOffset
	}
	defer file.Close()

	// startOffSet is an absolute position, so seek from the start of the file.
	if _, seekErr := file.Seek(startOffSet, io.SeekStart); seekErr != nil {
		fmt.Println("readFileInfo seekErr ===> ", seekErr.Error())
		return fileInfos, newOffset
	}

	scanner := bufio.NewScanner(file)
	// The split function never hands a token to the scanner. It collects the
	// parsed records itself and only tells the scanner how many bytes to drop;
	// unconsumed bytes are offered again together with the next read.
	scanner.Split(func(data []byte, atEOF bool) (int, []byte, error) {
		consumed, records := parseFileInfo(data)
		fileInfos = append(fileInfos, records...)
		newOffset += int64(consumed)
		return consumed, nil, nil
	})
	for scanner.Scan() {
	}
	if scanErr := scanner.Err(); scanErr != nil {
		fmt.Println("readFileInfo scanErr ===> ", scanErr.Error())
	}
	return fileInfos, newOffset
}
// parseFileInfo extracts every complete record from data. A record is the text
// between an opening "<<<" and the next closing ">>>"; records with empty
// content are dropped.
//
// offset is the number of leading bytes of data that are fully processed and
// may be discarded by the caller:
//   - when data ends inside an unfinished record, offset is the index of that
//     record's opening "<<<", so the whole record is parsed again once more
//     data has arrived;
//   - when data ends with "<" or "<<", which may be the beginning of an
//     opening marker, those bytes are kept back as well;
//   - otherwise offset is len(data).
func parseFileInfo(data []byte) (offset int, fileInfos []string) {
	const (
		openMark  = "<<<"
		closeMark = ">>>"
	)
	pos := 0
	for {
		start := indexMark(data, openMark, pos)
		if start < 0 {
			break
		}
		bodyStart := start + len(openMark)
		end := indexMark(data, closeMark, bodyStart)
		if end < 0 {
			// Unfinished record: keep everything from its opening marker.
			return start, fileInfos
		}
		if end > bodyStart {
			fileInfos = append(fileInfos, string(data[bodyStart:end]))
		}
		pos = end + len(closeMark)
	}
	// No record is pending; only a partial opening marker at the very end of
	// data has to be kept for the next round.
	return len(data) - pendingPrefixLen(data[pos:], openMark), fileInfos
}

// indexMark returns the index of the first occurrence of mark in data at or
// after from, or -1 when there is none.
func indexMark(data []byte, mark string, from int) int {
	for i := from; i+len(mark) <= len(data); i++ {
		if string(data[i:i+len(mark)]) == mark {
			return i
		}
	}
	return -1
}

// pendingPrefixLen returns the length of the longest proper prefix of mark that
// data ends with, or 0 when data does not end with any such prefix.
func pendingPrefixLen(data []byte, mark string) int {
	longest := len(mark) - 1
	if longest > len(data) {
		longest = len(data)
	}
	for k := longest; k > 0; k-- {
		if string(data[len(data)-k:]) == mark[:k] {
			return k
		}
	}
	return 0
}
package collectFile
import (
"bufio"
"fmt"
"os"
"time"
)
// ScanFile is the collector's endless main loop. On each pass it scans for log
// directories; when some are found they are handed to collectFileInfo,
// otherwise it waits five seconds before scanning again. It never returns.
func ScanFile() {
	for {
		if found := scanTargetDir(); len(found) > 0 {
			collectFileInfo(found)
			continue
		}
		// Nothing to collect yet; wait before the next scan.
		time.Sleep(5 * time.Second)
	}
}
// collectFileInfo reads new records from the log files found in each of dirs,
// archives them through recordInfo and stores the updated read cursors with
// updateCache. It always sleeps five seconds before returning.
func collectFileInfo(dirs []string) {
// Cursors produced in this pass, keyed by service name; they are only
// written to the shared cache after the records have been archived.
newCursorInfoCache := make(map[string]CursorInfo)
// Records gathered so far and not yet archived.
var fileInfos []string
for _, dir := range dirs {
// List every alarm log file in this directory, rolled and active alike.
filePaths, serviceName := scanFilePath(dir)
filePathsLen := len(filePaths)
if filePathsLen == 0 {
fmt.Println("collectFileInfo filePaths is empty ==> ", filePaths)
continue
}
// Keep only the last three paths of the sorted list.
if len(filePaths) > 2 {
filePaths = filePaths[filePathsLen-3:]
}
cursorInfo := getCursorInfo(serviceName)
for _, filePath := range filePaths {
// Taken before the file is inspected; stored as the cursor's updateTime.
unixMilli := time.Now().UnixMilli()
fileStat, fileStatErr := os.Stat(filePath)
if fileStatErr != nil {
fmt.Println("collectFileInfo fileStateError ==> ", fileStatErr.Error())
// Give up on the remaining files of this service for this pass.
break
}
if fileStat.ModTime().UnixMilli() < cursorInfo.updateTime {
// Not modified since the cursor was last updated: already read, skip it.
continue
}
// The stored offset is not inside this file although the cursor points
// at the same path, so read from the beginning.
// NOTE(review): presumably the file was rolled and recreated — confirm.
if cursorInfo.offset >= fileStat.Size() && cursorInfo.source == filePath {
cursorInfo.offset = 0
}
newFileInfos, newOffSet := readFileInfo(filePath, cursorInfo.offset)
// Remember the cursor for this service (offset, source, updateTime).
newCursorInfo := CursorInfo{newOffSet, filePath, unixMilli}
fileInfos = append(fileInfos, newFileInfos...)
newCursorInfoCache[serviceName] = newCursorInfo
// Carry the offset forward.
// NOTE(review): this offset is also used as the start offset of the next
// file in this directory — confirm that is intended.
cursorInfo.offset = newOffSet
if len(fileInfos) >= 100 {
// Archive the collected records into a new file; this simulates the
// archiving of collected data.
// NOTE(review): recordInfo returns nothing, so the cursors below are
// stored even if archiving failed.
recordInfo(fileInfos)
// Archiving is done; clear the temporary buffer.
fileInfos = []string{}
// Publish the new cursors to the cache and start a fresh batch.
updateCache(newCursorInfoCache)
newCursorInfoCache = make(map[string]CursorInfo)
// Wait 5s before scanning further.
time.Sleep(time.Second * 5)
// Skip the remaining files of this service: more rolled files may have
// appeared meanwhile, and later services must not starve.
break
}
}
}
if len(fileInfos) > 0 {
// Archive the remaining records; this simulates the archiving of
// collected data.
recordInfo(fileInfos)
// Publish the new cursors to the cache.
updateCache(newCursorInfoCache)
}
// Wait 5s before the next scan.
time.Sleep(time.Second * 5)
}
// recordInfo appends every entry of infos, each followed by "\r\n", to the
// archive file D:\tmp\alarm_log\all.txt, creating the file when needed.
//
// Open, write, flush and close failures are printed to standard output; the
// function returns nothing, so callers cannot tell whether archiving worked.
func recordInfo(infos []string) {
	file, openFileErr := os.OpenFile("D:\\tmp\\alarm_log\\all.txt", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
	if openFileErr != nil {
		fmt.Println("recordInfo OpenFileErr ==> ", openFileErr.Error())
		return
	}
	defer func() {
		// A failed close can mean that written data did not reach the disk.
		if closeErr := file.Close(); closeErr != nil {
			fmt.Println("recordInfo closeErr ==> ", closeErr.Error())
		}
	}()

	writer := bufio.NewWriter(file)
	for _, info := range infos {
		if _, writeErr := writer.WriteString(info + "\r\n"); writeErr != nil {
			fmt.Println("recordInfo writeStringErr ==> ", writeErr.Error())
			return
		}
	}
	if flushErr := writer.Flush(); flushErr != nil {
		fmt.Println("recordInfo flushErr ==> ", flushErr.Error())
	}
}
package collectFile
import (
"bufio"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
)
// CheckFileInfo verifies that the content of every line found in the log files
// of the services svc-0 .. svc-9 is also present in the archive file
// D:\tmp\alarm_log\all.txt. Each missing entry is printed to standard output.
//
// The first and last three bytes of a log line (the "<<<" / ">>>" markers) are
// stripped before the lookup. Lines shorter than six bytes cannot carry both
// markers and are skipped.
func CheckFileInfo() {
	allMap := getInfo("D:\\tmp\\alarm_log\\all.txt")
	for i := 0; i < 10; i++ {
		pattern := filepath.Join("D:\\tmp\\alarm_log\\svc-"+strconv.Itoa(i), "warning", "*.wlog")
		matchedPaths, globErr := filepath.Glob(pattern)
		if globErr != nil {
			fmt.Println("CheckFileInfo globErr ==> ", globErr.Error())
			continue
		}
		for _, path := range matchedPaths {
			// Check that every entry of this log file exists in the archive.
			for line := range getInfo(path) {
				if len(line) < 6 {
					// Too short to hold both markers; slicing would panic.
					continue
				}
				key := line[3 : len(line)-3]
				if allMap[key] != key {
					fmt.Println("allMap not exist : ", key)
				}
			}
		}
	}
}
// getInfo reads the file at path line by line and returns a map in which every
// distinct line (without its line terminator) is both key and value.
//
// An empty map is returned when the file cannot be opened. Reading stops at
// end of file or at the first read error; a read error other than io.EOF is
// logged and the lines read so far are returned. Lines longer than the
// reader's buffer are reassembled into a single entry.
func getInfo(path string) map[string]string {
	lines := make(map[string]string)
	file, openFileErr := os.Open(path)
	if openFileErr != nil {
		return lines
	}
	defer file.Close()

	reader := bufio.NewReader(file)
	// pending accumulates the pieces of a line that ReadLine had to split.
	var pending []byte
	for {
		chunk, isPrefix, readErr := reader.ReadLine()
		if readErr != nil {
			// Stop on every error, not only io.EOF; a persistent read error
			// would otherwise keep this loop spinning forever.
			if readErr != io.EOF {
				fmt.Println("getInfo readErr ==> ", readErr.Error())
			}
			break
		}
		pending = append(pending, chunk...)
		if isPrefix {
			continue
		}
		text := string(pending)
		lines[text] = text
		pending = pending[:0]
	}
	// Keep an over-long final line that ended without a terminator.
	if len(pending) > 0 {
		text := string(pending)
		lines[text] = text
	}
	return lines
}
collectFile.WriteClientRun() // write files: starts the writer goroutines and returns
collectFile.ScanFile() // collect files: loops forever and never returns
// collectFile.CheckFileInfo() verifies the collected file contents