...
1 package reaper
2
3 import (
4 "path/filepath"
5 "strings"
6 "time"
7
8 "github.com/cybertec-postgresql/pgwatch/v5/internal/log"
9 )
10
11 func (lp *LogParser) parseLogsRemote() error {
12 var latestLogFile string
13 var linesRead int
14 var firstRun = true
15 var currInterval time.Duration
16 var size uint64
17 var offset uint64
18 var modification time.Time
19 var chunk string
20 var lines []string
21 var numOfLines int
22
23 logger := log.GetLogger(lp.ctx)
24
25 for {
26 select {
27 case <-lp.ctx.Done():
28 return nil
29 case <-time.After(currInterval):
30 if currInterval == 0 {
31 currInterval = time.Second * time.Duration(lp.Interval)
32 }
33 }
34
35 if latestLogFile == "" || firstRun {
36 sql := "select name, size, modification from pg_ls_logdir() where name like '%csv' order by modification desc limit 1;"
37 err := lp.SourceConn.Conn.QueryRow(lp.ctx, sql).Scan(&latestLogFile, &size, &modification)
38 if err != nil {
39 logger.Infof("No logfiles found in log dir: '%s'", lp.LogFolder)
40 continue
41 }
42 offset = size
43 firstRun = false
44 logger.Infof("Starting to parse logfile: '%s'", latestLogFile)
45 }
46
47 if linesRead == numOfLines && size != offset {
48 logFilePath := filepath.Join(lp.LogFolder, latestLogFile)
49 sizeToRead := min(maxChunkSize, size-offset)
50 err := lp.SourceConn.Conn.QueryRow(lp.ctx, "select pg_read_file($1, $2, $3)", logFilePath, offset, sizeToRead).Scan(&chunk)
51 offset += sizeToRead
52 if err != nil {
53 logger.Warningf("Failed to read logfile '%s': %s", latestLogFile, err)
54 continue
55 }
56 lines = strings.Split(chunk, "\n")
57 if sizeToRead == maxChunkSize {
58
59 offset -= uint64(len(lines[len(lines)-1]))
60 }
61 numOfLines = len(lines)
62 linesRead = 0
63 logger.WithField("lines", len(lines)).Info("logs fetched")
64 }
65
66 for {
67 if linesRead == numOfLines {
68 select {
69 case <-lp.ctx.Done():
70 return nil
71 case <-time.After(currInterval):
72 }
73
74 var latestSize uint64
75 err := lp.SourceConn.Conn.QueryRow(lp.ctx, "select size, modification from pg_ls_logdir() where name = $1;", latestLogFile).Scan(&latestSize, &modification)
76 if err != nil {
77 logger.Warnf("Failed to read state info of logfile: '%s'", latestLogFile)
78 }
79
80 var fileName string
81 if size == latestSize && offset == size || err != nil {
82 sql := "select name, size from pg_ls_logdir() where modification > $1 and name like '%csv' order by modification, name limit 1;"
83 err := lp.SourceConn.Conn.QueryRow(lp.ctx, sql, modification).Scan(&fileName, &latestSize)
84 if err == nil && latestLogFile != fileName {
85 if lp.LogTruncOnRotation == "off" {
86 lp.fileOffsets[latestLogFile] = size
87 if len(lp.fileOffsets) > maxTrackedFiles {
88 clear(lp.fileOffsets)
89 }
90 }
91 latestLogFile = fileName
92 size = latestSize
93 if lastOffset, ok := lp.fileOffsets[latestLogFile]; ok && lp.LogTruncOnRotation == "off" {
94 offset = lastOffset
95 } else {
96 offset = 0
97 }
98 logger.Infof("Switching to new logfile: '%s'", fileName)
99 currInterval = 0
100 break
101 }
102 } else {
103 size = latestSize
104 currInterval = 0
105 break
106 }
107 }
108
109 if linesRead < numOfLines {
110 line := lines[linesRead]
111 linesRead++
112
113 matches := lp.LogsMatchRegex.FindStringSubmatch(line)
114 if len(matches) != 0 {
115 result := lp.regexMatchesToMap(matches)
116 errorSeverity := result["error_severity"]
117 if lp.ServerMessagesLang != "en" {
118 errorSeverity = severityToEnglish(lp.ServerMessagesLang, errorSeverity)
119 }
120
121 databaseName := result["database_name"]
122 if lp.SourceConn.RealDbname == databaseName {
123 lp.eventCounts[errorSeverity]++
124 }
125 lp.eventCountsTotal[errorSeverity]++
126 }
127 }
128
129 if lp.HasSendIntervalElapsed() {
130 select {
131 case <-lp.ctx.Done():
132 return nil
133 case lp.StoreCh <- lp.GetMeasurementEnvelope():
134 zeroEventCounts(lp.eventCounts)
135 zeroEventCounts(lp.eventCountsTotal)
136 lp.lastSendTime = time.Now()
137 }
138 }
139
140 }
141 }
142
143 }
144