1 package metrics
2
3 import (
4 "bufio"
5 "context"
6 "io"
7 "os"
8 "path"
9 "path/filepath"
10 "regexp"
11 "strings"
12 "time"
13
14 "github.com/cybertec-postgresql/pgwatch/v3/internal/db"
15 "github.com/cybertec-postgresql/pgwatch/v3/internal/log"
16 "github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
17 )
18
// specialMetricServerLogEventCounts is the MetricName given to the log event
// count measurements built by eventCountsToMetricStoreMessages.
const (
	specialMetricServerLogEventCounts = "server_log_event_counts"
)
20
var (
	// PgSeverities lists the (English) Postgres message severities for which
	// log event counts are reported.
	PgSeverities = [...]string{"DEBUG", "INFO", "NOTICE", "WARNING", "ERROR", "LOG", "FATAL", "PANIC"}

	// PgSeveritiesLocale maps the first two characters of the server's
	// lc_messages setting to a table translating localized severity labels
	// into their English names. The "C." key covers C.* locales such as
	// C.UTF-8, whose labels are already English.
	PgSeveritiesLocale = map[string]map[string]string{
		"C.": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "WARNING": "WARNING", "ERROR": "ERROR", "FATAL": "FATAL", "PANIC": "PANIC"},
		"de": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "HINWEIS": "NOTICE", "WARNUNG": "WARNING", "FEHLER": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
		"fr": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "ATTENTION": "WARNING", "ERREUR": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
		"it": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTIFICA": "NOTICE", "ATTENZIONE": "WARNING", "ERRORE": "ERROR", "FATALE": "FATAL", "PANICO": "PANIC"},
		"ko": {"디버그": "DEBUG", "로그": "LOG", "정보": "INFO", "알림": "NOTICE", "경고": "WARNING", "오류": "ERROR", "치명적오류": "FATAL", "손상": "PANIC"},
		"pl": {"DEBUG": "DEBUG", "DZIENNIK": "LOG", "INFORMACJA": "INFO", "UWAGA": "NOTICE", "OSTRZEŻENIE": "WARNING", "BŁĄD": "ERROR", "KATASTROFALNY": "FATAL", "PANIKA": "PANIC"},
		"ru": {"ОТЛАДКА": "DEBUG", "СООБЩЕНИЕ": "LOG", "ИНФОРМАЦИЯ": "INFO", "ЗАМЕЧАНИЕ": "NOTICE", "ПРЕДУПРЕЖДЕНИЕ": "WARNING", "ОШИБКА": "ERROR", "ВАЖНО": "FATAL", "ПАНИКА": "PANIC"},
		"sv": {"DEBUG": "DEBUG", "LOGG": "LOG", "INFO": "INFO", "NOTIS": "NOTICE", "VARNING": "WARNING", "FEL": "ERROR", "FATALT": "FATAL", "PANIK": "PANIC"},
		"tr": {"DEBUG": "DEBUG", "LOG": "LOG", "BİLGİ": "INFO", "NOT": "NOTICE", "UYARI": "WARNING", "HATA": "ERROR", "ÖLÜMCÜL (FATAL)": "FATAL", "KRİTİK": "PANIC"},
		"zh": {"调试": "DEBUG", "日志": "LOG", "信息": "INFO", "注意": "NOTICE", "警告": "WARNING", "错误": "ERROR", "致命错误": "FATAL", "比致命错误还过分的错误": "PANIC"},
	}
)
34
// CSVLogDefaultRegEx matches the leading columns of a Postgres csvlog line, up
// to and including the severity. It defines the named groups database_name and
// error_severity that ParseLogs requires, plus the other leading csvlog
// columns (log_time, user_name, process_id, ...).
const CSVLogDefaultRegEx = `^(?P<log_time>.*?),"?(?P<user_name>.*?)"?,"?(?P<database_name>.*?)"?,(?P<process_id>\d+),"?(?P<connection_from>.*?)"?,(?P<session_id>.*?),(?P<session_line_num>\d+),"?(?P<command_tag>.*?)"?,(?P<session_start_time>.*?),(?P<virtual_transaction_id>.*?),(?P<transaction_id>.*?),(?P<error_severity>\w+),`

// CSVLogDefaultGlobSuffix is the file pattern appended to the log directory
// when the logs glob path is derived from the server settings.
const CSVLogDefaultGlobSuffix = "*.csv"
37
// getFileWithLatestTimestamp returns the entry of files with the newest
// modification time; on equal timestamps the earlier entry wins. An empty list
// yields "". If any file cannot be stat'ed, "" and that error are returned.
func getFileWithLatestTimestamp(files []string) (string, error) {
	var (
		newest     string
		newestTime time.Time
	)
	for i := range files {
		info, statErr := os.Stat(files[i])
		if statErr != nil {
			return "", statErr
		}
		if mt := info.ModTime(); mt.After(newestTime) {
			newest, newestTime = files[i], mt
		}
	}
	return newest, nil
}
54
// getFileWithNextModTimestamp returns the file matching logsGlobPath whose
// modification time is the earliest one strictly after that of currentFile.
// ParseLogs uses it to follow log rotation. It returns "" when there is no
// such file. Candidates that cannot be stat'ed are skipped. An invalid glob
// pattern or an unreadable currentFile is returned as an error.
func getFileWithNextModTimestamp(logsGlobPath, currentFile string) (string, error) {
	candidates, err := filepath.Glob(logsGlobPath)
	if err != nil {
		return "", err
	}
	current, err := os.Stat(currentFile)
	if err != nil {
		return "", err
	}
	threshold := current.ModTime()

	var (
		best     string
		bestTime time.Time
	)
	for _, candidate := range candidates {
		if candidate == currentFile {
			continue
		}
		info, statErr := os.Stat(candidate)
		if statErr != nil {
			// e.g. removed between Glob and Stat; simply not a candidate
			continue
		}
		mt := info.ModTime()
		if !mt.After(threshold) {
			continue
		}
		if best == "" || mt.Before(bestTime) {
			best, bestTime = candidate, mt
		}
	}
	return best, nil
}
85
86
87 func eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal map[string]int64, mdb *sources.SourceConn) []MeasurementEnvelope {
88 allSeverityCounts := NewMeasurement(time.Now().UnixNano())
89 for _, s := range PgSeverities {
90 parsedCount, ok := eventCounts[s]
91 if ok {
92 allSeverityCounts[strings.ToLower(s)] = parsedCount
93 } else {
94 allSeverityCounts[strings.ToLower(s)] = 0
95 }
96 parsedCount, ok = eventCountsTotal[s]
97 if ok {
98 allSeverityCounts[strings.ToLower(s)+"_total"] = parsedCount
99 } else {
100 allSeverityCounts[strings.ToLower(s)+"_total"] = 0
101 }
102 }
103 return []MeasurementEnvelope{{
104 DBName: mdb.Name,
105 SourceType: string(mdb.Kind),
106 MetricName: specialMetricServerLogEventCounts,
107 Data: Measurements{allSeverityCounts},
108 CustomTags: mdb.CustomTags,
109 }}
110 }
111
112 func ParseLogs(ctx context.Context, mdb *sources.SourceConn, realDbname string, interval float64, storeCh chan<- []MeasurementEnvelope) {
113
114 var latest, previous, serverMessagesLang string
115 var latestHandle *os.File
116 var reader *bufio.Reader
117 var linesRead = 0
118 var logsMatchRegex, logsMatchRegexPrev, logsGlobPath string
119 var lastSendTime time.Time
120 var eventCounts = make(map[string]int64)
121 var eventCountsTotal = make(map[string]int64)
122 var hostConfig sources.HostConfigAttrs
123 var err error
124 var firstRun = true
125 var csvlogRegex *regexp.Regexp
126 logger := log.GetLogger(ctx).WithField("source", mdb.Name)
127 for {
128 select {
129 case <-ctx.Done():
130 return
131 default:
132 }
133
134 if hostConfig.LogsMatchRegex != "" {
135 logsMatchRegex = hostConfig.LogsMatchRegex
136 }
137 if logsMatchRegex == "" {
138 logger.Debug("Log parsing enabled with default CSVLOG regex")
139 logsMatchRegex = CSVLogDefaultRegEx
140 }
141 if hostConfig.LogsGlobPath != "" {
142 logsGlobPath = hostConfig.LogsGlobPath
143 }
144 if logsGlobPath == "" {
145 logsGlobPath, err = tryDetermineLogFolder(ctx, mdb.Conn)
146 if err != nil {
147 logger.WithError(err).Print("Could not determine Postgres logs parsing folder. Configured logs_glob_path = ", logsGlobPath)
148 time.Sleep(60 * time.Second)
149 continue
150 }
151 }
152 serverMessagesLang, err = tryDetermineLogMessagesLanguage(ctx, mdb.Conn)
153 if err != nil {
154 logger.WithError(err).Warning("Could not determine language (lc_collate) used for server logs, cannot parse logs...")
155 time.Sleep(60 * time.Second)
156 continue
157 }
158
159 if logsMatchRegexPrev != logsMatchRegex {
160 csvlogRegex, err = regexp.Compile(logsMatchRegex)
161 if err != nil {
162 logger.WithError(err).Print("Invalid regex: ", logsMatchRegex)
163 time.Sleep(60 * time.Second)
164 continue
165 }
166 logger.Infof("Changing logs parsing regex to: %s", logsMatchRegex)
167 logsMatchRegexPrev = logsMatchRegex
168 }
169
170 logger.Debugf("Considering log files determined by glob pattern: %s", logsGlobPath)
171
172 if latest == "" || firstRun {
173 globMatches, err := filepath.Glob(logsGlobPath)
174 if err != nil || len(globMatches) == 0 {
175 logger.Infof("No logfiles found to parse from glob '%s'. Sleeping 60s...", logsGlobPath)
176 time.Sleep(60 * time.Second)
177 continue
178 }
179
180 logger.Debugf("Found %v logfiles from glob pattern, picking the latest", len(globMatches))
181 if len(globMatches) > 1 {
182
183 latest, _ = getFileWithLatestTimestamp(globMatches)
184 if latest == "" {
185 logger.Warningf("Could not determine the latest logfile. Sleeping 60s...")
186 time.Sleep(60 * time.Second)
187 continue
188 }
189
190 } else if len(globMatches) == 1 {
191 latest = globMatches[0]
192 }
193 logger.Infof("Starting to parse logfile: %s ", latest)
194 }
195
196 if latestHandle == nil {
197 latestHandle, err = os.Open(latest)
198 if err != nil {
199 logger.Warningf("Failed to open logfile %s: %s. Sleeping 60s...", latest, err)
200 time.Sleep(60 * time.Second)
201 continue
202 }
203 reader = bufio.NewReader(latestHandle)
204 if previous == latest && linesRead > 0 {
205 i := 1
206 for i <= linesRead {
207 _, err = reader.ReadString('\n')
208 if err == io.EOF && i < linesRead {
209 logger.Warningf("Failed to open logfile %s: %s. Sleeping 60s...", latest, err)
210 linesRead = 0
211 break
212 } else if err != nil {
213 logger.Warningf("Failed to skip %d logfile lines for %s, there might be duplicates reported. Error: %s", linesRead, latest, err)
214 time.Sleep(60 * time.Second)
215 linesRead = i
216 break
217 }
218 i++
219 }
220 logger.Debugf("Skipped %d already processed lines from %s", linesRead, latest)
221 } else if firstRun {
222 _, _ = latestHandle.Seek(0, 2)
223 firstRun = false
224 }
225 }
226
227 var eofSleepMillis = 0
228
229
230 for {
231
232
233
234 line, err := reader.ReadString('\n')
235 if err != nil && err != io.EOF {
236 logger.Warningf("Failed to read logfile %s: %s. Sleeping 60s...", latest, err)
237 err = latestHandle.Close()
238 if err != nil {
239 logger.Warningf("Failed to close logfile %s properly: %s", latest, err)
240 }
241 latestHandle = nil
242 time.Sleep(60 * time.Second)
243 break
244 }
245
246 if err == io.EOF {
247
248 if eofSleepMillis < 5000 && float64(eofSleepMillis) < interval*1000 {
249 eofSleepMillis += 100
250 }
251 time.Sleep(time.Millisecond * time.Duration(eofSleepMillis))
252
253
254 file, _ := getFileWithNextModTimestamp(logsGlobPath, latest)
255 if file != "" {
256 previous = latest
257 latest = file
258 err = latestHandle.Close()
259 latestHandle = nil
260 if err != nil {
261 logger.Warningf("Failed to close logfile %s properly: %s", latest, err)
262 }
263 logger.Infof("Switching to new logfile: %s", file)
264 linesRead = 0
265 break
266 }
267 } else {
268 eofSleepMillis = 0
269 linesRead++
270 }
271
272 if err == nil && line != "" {
273
274 matches := csvlogRegex.FindStringSubmatch(line)
275 if len(matches) == 0 {
276
277
278 goto send_to_storage_if_needed
279 }
280
281 result := regexMatchesToMap(csvlogRegex, matches)
282
283 errorSeverity, ok := result["error_severity"]
284 if !ok {
285 logger.Error("error_severity group must be defined in parse regex:", csvlogRegex)
286 time.Sleep(time.Minute)
287 break
288 }
289 if serverMessagesLang != "en" {
290 errorSeverity = severityToEnglish(serverMessagesLang, errorSeverity)
291 }
292
293 databaseName, ok := result["database_name"]
294 if !ok {
295 logger.Error("database_name group must be defined in parse regex:", csvlogRegex)
296 time.Sleep(time.Minute)
297 break
298 }
299 if realDbname == databaseName {
300 eventCounts[errorSeverity]++
301 }
302 eventCountsTotal[errorSeverity]++
303 }
304
305 send_to_storage_if_needed:
306 if lastSendTime.IsZero() || lastSendTime.Before(time.Now().Add(-1*time.Second*time.Duration(interval))) {
307 logger.Debugf("Sending log event counts for last interval to storage channel. Local eventcounts: %+v, global eventcounts: %+v", eventCounts, eventCountsTotal)
308 metricStoreMessages := eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal, mdb)
309 storeCh <- metricStoreMessages
310 zeroEventCounts(eventCounts)
311 zeroEventCounts(eventCountsTotal)
312 lastSendTime = time.Now()
313 }
314
315 }
316 }
317
318 }
319
320 func severityToEnglish(serverLang, errorSeverity string) string {
321
322 if serverLang == "en" {
323 return errorSeverity
324 }
325 severityMap := PgSeveritiesLocale[serverLang]
326 severityEn, ok := severityMap[errorSeverity]
327 if !ok {
328 return errorSeverity
329 }
330 return severityEn
331 }
332
333 func zeroEventCounts(eventCounts map[string]int64) {
334 for _, severity := range PgSeverities {
335 eventCounts[severity] = 0
336 }
337 }
338
339 func tryDetermineLogFolder(ctx context.Context, conn db.PgxIface) (string, error) {
340 sql := `select current_setting('data_directory') as dd, current_setting('log_directory') as ld`
341 var dd, ld string
342 err := conn.QueryRow(ctx, sql).Scan(&dd, &ld)
343 if err != nil {
344 return "", err
345 }
346 if strings.HasPrefix(ld, "/") {
347
348 return path.Join(ld, CSVLogDefaultGlobSuffix), nil
349 }
350 return path.Join(dd, ld, CSVLogDefaultGlobSuffix), nil
351 }
352
353 func tryDetermineLogMessagesLanguage(ctx context.Context, conn db.PgxIface) (string, error) {
354 sql := `select current_setting('lc_messages')::varchar(2) as lc_messages;`
355 var lc string
356 err := conn.QueryRow(ctx, sql).Scan(&lc)
357 if err != nil {
358 return "", err
359 }
360 if _, ok := PgSeveritiesLocale[lc]; !ok {
361 return "en", nil
362 }
363 return lc, nil
364 }
365
// regexMatchesToMap converts submatches produced by csvlogRegex (as returned
// by FindStringSubmatch) into a map keyed by capture-group name; unnamed
// groups are skipped. A nil regex or empty matches yields an empty, non-nil
// map. matches is assumed to come from csvlogRegex, i.e. to have one entry per
// subexpression.
func regexMatchesToMap(csvlogRegex *regexp.Regexp, matches []string) map[string]string {
	named := make(map[string]string)
	if csvlogRegex == nil || len(matches) == 0 {
		return named
	}
	names := csvlogRegex.SubexpNames()
	for idx := 1; idx < len(names); idx++ {
		if group := names[idx]; group != "" {
			named[group] = matches[idx]
		}
	}
	return named
}
378