1 package reaper
2
3 import (
4 "bufio"
5 "io"
6 "os"
7 "path/filepath"
8 "time"
9
10 "github.com/cybertec-postgresql/pgwatch/v5/internal/log"
11 )
12
13 func (lp *LogParser) parseLogsLocal() error {
14 var latest, previous string
15 var latestHandle *os.File
16 var reader *bufio.Reader
17 var linesRead int
18 var err error
19 var firstRun = true
20 var currInterval time.Duration
21
22 logger := log.GetLogger(lp.ctx)
23 logsGlobPath := filepath.Join(lp.LogFolder, csvLogDefaultGlobSuffix)
24
25 for {
26 select {
27 case <-lp.ctx.Done():
28 return nil
29 case <-time.After(currInterval):
30 if currInterval == 0 {
31 currInterval = time.Second * time.Duration(lp.Interval)
32 }
33 }
34
35 if latest == "" || firstRun {
36 globMatches, err := filepath.Glob(logsGlobPath)
37 if err != nil || len(globMatches) == 0 {
38 logger.Infof("No logfiles found to parse from glob '%s'", logsGlobPath)
39 continue
40 }
41
42 logger.Debugf("Found %v logfiles from glob pattern, picking the latest", len(globMatches))
43 if len(globMatches) > 1 {
44
45 latest, _ = getFileWithLatestTimestamp(globMatches)
46 if latest == "" {
47 logger.Warningf("Could not determine the latest logfile")
48 continue
49 }
50
51 } else if len(globMatches) == 1 {
52 latest = globMatches[0]
53 }
54 logger.Infof("Starting to parse logfile: %s ", latest)
55 }
56
57 if latestHandle == nil {
58 latestHandle, err = os.Open(latest)
59 if err != nil {
60 logger.Warningf("Failed to open logfile %s: %s", latest, err)
61 continue
62 }
63 defer latestHandle.Close()
64 reader = bufio.NewReader(latestHandle)
65
66 linesOffset, ok := lp.fileOffsets[latest]
67 if ok && lp.LogTruncOnRotation == "off" {
68 linesRead = int(linesOffset)
69 }
70 if (ok || previous == latest) && linesRead > 0 {
71 i := 1
72 for i <= linesRead {
73 _, err = reader.ReadString('\n')
74 if err == io.EOF && i < linesRead {
75 logger.Warningf("Failed to open logfile %s: %s", latest, err)
76 linesRead = 0
77 break
78 } else if err != nil {
79 logger.Warningf("Failed to skip %d logfile lines for %s, there might be duplicates reported. Error: %s", linesRead, latest, err)
80 linesRead = i
81 break
82 }
83 i++
84 }
85 logger.Debugf("Skipped %d already processed lines from %s", linesRead, latest)
86 } else if firstRun {
87 _, _ = latestHandle.Seek(0, 2)
88 firstRun = false
89 }
90 }
91
92 for {
93 line, err := reader.ReadString('\n')
94 if err != nil && err != io.EOF {
95 logger.Warningf("Failed to read logfile %s: %s", latest, err)
96 _ = latestHandle.Close()
97 latestHandle = nil
98 break
99 }
100
101 if err == io.EOF {
102
103 select {
104 case <-lp.ctx.Done():
105 return nil
106 case <-time.After(currInterval):
107 }
108
109 file, _ := getFileWithNextModTimestamp(logsGlobPath, latest)
110 if file != "" && file != latest {
111 if lp.LogTruncOnRotation == "off" {
112 lp.fileOffsets[latest] = uint64(linesRead)
113 if len(lp.fileOffsets) > maxTrackedFiles {
114 clear(lp.fileOffsets)
115 }
116 }
117 previous = latest
118 latest = file
119 _ = latestHandle.Close()
120 latestHandle = nil
121 logger.Infof("Switching to new logfile: %s", file)
122 linesRead = 0
123 break
124 }
125 } else {
126 linesRead++
127 }
128
129 if err == nil && line != "" {
130 matches := lp.LogsMatchRegex.FindStringSubmatch(line)
131 if len(matches) == 0 {
132 goto send_to_storage_if_needed
133 }
134 result := lp.regexMatchesToMap(matches)
135 errorSeverity, ok := result["error_severity"]
136 if !ok {
137 logger.Error("error_severity group must be defined in parse regex:", lp.LogsMatchRegex)
138 time.Sleep(time.Minute)
139 break
140 }
141 if lp.ServerMessagesLang != "en" {
142 errorSeverity = severityToEnglish(lp.ServerMessagesLang, errorSeverity)
143 }
144
145 databaseName, ok := result["database_name"]
146 if !ok {
147 logger.Error("database_name group must be defined in parse regex:", lp.LogsMatchRegex)
148 time.Sleep(time.Minute)
149 break
150 }
151 if lp.SourceConn.RealDbname == databaseName {
152 lp.eventCounts[errorSeverity]++
153 }
154 lp.eventCountsTotal[errorSeverity]++
155 }
156
157 send_to_storage_if_needed:
158 if lp.HasSendIntervalElapsed() {
159 logger.Debugf("Sending log event counts for last interval to storage channel. Local eventcounts: %+v, global eventcounts: %+v", lp.eventCounts, lp.eventCountsTotal)
160 select {
161 case <-lp.ctx.Done():
162 return nil
163 case lp.StoreCh <- lp.GetMeasurementEnvelope():
164 zeroEventCounts(lp.eventCounts)
165 zeroEventCounts(lp.eventCountsTotal)
166 lp.lastSendTime = time.Now()
167 }
168 }
169
170 }
171 }
172
173 }
174
175 func getFileWithLatestTimestamp(files []string) (string, error) {
176 var maxDate time.Time
177 var latest string
178
179 for _, f := range files {
180 fi, err := os.Stat(f)
181 if err != nil {
182 return "", err
183 }
184 if fi.ModTime().After(maxDate) {
185 latest = f
186 maxDate = fi.ModTime()
187 }
188 }
189 return latest, nil
190 }
191
192 func getFileWithNextModTimestamp(logsGlobPath, currentFile string) (string, error) {
193 var nextFile string
194 var nextMod time.Time
195
196 files, err := filepath.Glob(logsGlobPath)
197 if err != nil {
198 return "", err
199 }
200
201 fiCurrent, err := os.Stat(currentFile)
202 if err != nil {
203 return "", err
204 }
205
206 for _, f := range files {
207 if f == currentFile {
208 continue
209 }
210 fi, err := os.Stat(f)
211 if err != nil {
212 continue
213 }
214 if (nextMod.IsZero() || fi.ModTime().Before(nextMod)) && fi.ModTime().After(fiCurrent.ModTime()) {
215 nextMod = fi.ModTime()
216 nextFile = f
217 }
218 }
219 return nextFile, nil
220 }
221