package metrics

import (
	"bufio"
	"cmp"
	"context"
	"io"
	"os"
	"path"
	"path/filepath"
	"regexp"
	"strings"
	"time"

	"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
)

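// specialMetricServerLogEventCounts is the name of the synthetic metric emitted by ParseLogs.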
const specialMetricServerLogEventCounts = "server_log_event_counts"

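// PgSeverities lists the PostgreSQL log message severity levels that are counted.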
var PgSeverities = [...]string{"DEBUG", "INFO", "NOTICE", "WARNING", "ERROR", "LOG", "FATAL", "PANIC"}
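
// PgSeveritiesLocale maps localized severity labels (keyed by the two-letter lc_messages prefix)
// back to their English equivalents.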
var PgSeveritiesLocale = map[string]map[string]string{
	"C.": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "WARNING": "WARNING", "ERROR": "ERROR", "FATAL": "FATAL", "PANIC": "PANIC"},
	"de": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "HINWEIS": "NOTICE", "WARNUNG": "WARNING", "FEHLER": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
	"fr": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "ATTENTION": "WARNING", "ERREUR": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
	"it": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTIFICA": "NOTICE", "ATTENZIONE": "WARNING", "ERRORE": "ERROR", "FATALE": "FATAL", "PANICO": "PANIC"},
	"ko": {"디버그": "DEBUG", "로그": "LOG", "정보": "INFO", "알림": "NOTICE", "경고": "WARNING", "오류": "ERROR", "치명적오류": "FATAL", "손상": "PANIC"},
	"pl": {"DEBUG": "DEBUG", "DZIENNIK": "LOG", "INFORMACJA": "INFO", "UWAGA": "NOTICE", "OSTRZEŻENIE": "WARNING", "BŁĄD": "ERROR", "KATASTROFALNY": "FATAL", "PANIKA": "PANIC"},
	"ru": {"ОТЛАДКА": "DEBUG", "СООБЩЕНИЕ": "LOG", "ИНФОРМАЦИЯ": "INFO", "ЗАМЕЧАНИЕ": "NOTICE", "ПРЕДУПРЕЖДЕНИЕ": "WARNING", "ОШИБКА": "ERROR", "ВАЖНО": "FATAL", "ПАНИКА": "PANIC"},
	"sv": {"DEBUG": "DEBUG", "LOGG": "LOG", "INFO": "INFO", "NOTIS": "NOTICE", "VARNING": "WARNING", "FEL": "ERROR", "FATALT": "FATAL", "PANIK": "PANIC"},
	"tr": {"DEBUG": "DEBUG", "LOG": "LOG", "BİLGİ": "INFO", "NOT": "NOTICE", "UYARI": "WARNING", "HATA": "ERROR", "ÖLÜMCÜL (FATAL)": "FATAL", "KRİTİK": "PANIC"},
	"zh": {"调试": "DEBUG", "日志": "LOG", "信息": "INFO", "注意": "NOTICE", "警告": "WARNING", "错误": "ERROR", "致命错误": "FATAL", "比致命错误还过分的错误": "PANIC"},
}

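// CSVLogDefaultRegEx is the default pattern for csvlog-format log lines; the named capture
// groups (notably error_severity and database_name) are looked up by ParseLogs.
// CSVLogDefaultGlobSuffix matches the CSV files produced in the server log directory.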
const CSVLogDefaultRegEx = `^(?P<log_time>.*?),"?(?P<user_name>.*?)"?,"?(?P<database_name>.*?)"?,(?P<process_id>\d+),"?(?P<connection_from>.*?)"?,(?P<session_id>.*?),(?P<session_line_num>\d+),"?(?P<command_tag>.*?)"?,(?P<session_start_time>.*?),(?P<virtual_transaction_id>.*?),(?P<transaction_id>.*?),(?P<error_severity>\w+),`
const CSVLogDefaultGlobSuffix = "*.csv"

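// getFileWithLatestTimestamp returns the file with the most recent modification time
// from the given list, or an error if any of the files cannot be stat'ed.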
func getFileWithLatestTimestamp(files []string) (string, error) {
	var maxDate time.Time
	var latest string

	for _, f := range files {
		fi, err := os.Stat(f)
		if err != nil {
			return "", err
		}
		if fi.ModTime().After(maxDate) {
			latest = f
			maxDate = fi.ModTime()
		}
	}
	return latest, nil
}

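// getFileWithNextModTimestamp returns the file matching logsGlobPath whose modification
// time most closely follows that of currentFile, i.e. the next logfile to switch to.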
func getFileWithNextModTimestamp(logsGlobPath, currentFile string) (string, error) {
	var nextFile string
	var nextMod time.Time

	files, err := filepath.Glob(logsGlobPath)
	if err != nil {
		return "", err
	}

	fiCurrent, err := os.Stat(currentFile)
	if err != nil {
		return "", err
	}

	for _, f := range files {
		if f == currentFile {
			continue
		}
		fi, err := os.Stat(f)
		if err != nil {
			continue
		}
		if (nextMod.IsZero() || fi.ModTime().Before(nextMod)) && fi.ModTime().After(fiCurrent.ModTime()) {
			nextMod = fi.ModTime()
			nextFile = f
		}
	}
	return nextFile, nil
}

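// eventCountsToMetricStoreMessages converts the per-database and instance-wide severity
// counters into a single MeasurementEnvelope for the server_log_event_counts metric.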
func eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal map[string]int64, mdb *sources.SourceConn) MeasurementEnvelope {
	allSeverityCounts := NewMeasurement(time.Now().UnixNano())
	for _, s := range PgSeverities {
		// a missing map key yields int64(0), so absent severities are reported as zero
		allSeverityCounts[strings.ToLower(s)] = eventCounts[s]
		allSeverityCounts[strings.ToLower(s)+"_total"] = eventCountsTotal[s]
	}
	return MeasurementEnvelope{
		DBName:     mdb.Name,
		MetricName: specialMetricServerLogEventCounts,
		Data:       Measurements{allSeverityCounts},
		CustomTags: mdb.CustomTags,
	}
}

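// ParseLogs tails the PostgreSQL logfiles matching LogsGlobPath (or the auto-detected log
// folder), counts log events per severity for the monitored database and for the whole
// instance, and pushes the counts to storeCh once per interval. It only works when the
// monitoring client runs on the same host as the Postgres server.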
func ParseLogs(ctx context.Context,
	mdb *sources.SourceConn,
	realDbname string,
	interval float64,
	storeCh chan<- MeasurementEnvelope,
	LogsMatchRegex string,
	LogsGlobPath string) {

	var latest, previous, serverMessagesLang string
	var latestHandle *os.File
	var reader *bufio.Reader
	var linesRead int
	var logsMatchRegex, logsGlobPath string
	var lastSendTime time.Time
	var eventCounts = make(map[string]int64)
	var eventCountsTotal = make(map[string]int64)
	var err error
	var firstRun = true
	var csvlogRegex *regexp.Regexp
	var currInterval time.Duration

	logger := log.GetLogger(ctx).WithField("source", mdb.Name).WithField("metric", specialMetricServerLogEventCounts)

	if ok, err := db.IsClientOnSameHost(mdb.Conn); !ok || err != nil {
		if err != nil {
			logger = logger.WithError(err)
		}
		logger.Warning("Cannot parse logs, client is not on the same host as the Postgres server")
		return
	}

	logsMatchRegex = cmp.Or(LogsMatchRegex, CSVLogDefaultRegEx)
	csvlogRegex, err = regexp.Compile(logsMatchRegex)
	if err != nil {
		logger.WithError(err).Print("Invalid regex: ", logsMatchRegex)
		return
	}
	logger.Debugf("Changing logs parsing regex to: %s", logsMatchRegex)

	if LogsGlobPath != "" {
		logsGlobPath = LogsGlobPath
	} else {
		if logsGlobPath, err = tryDetermineLogFolder(ctx, mdb.Conn); err != nil {
			logger.WithError(err).Print("Could not determine the Postgres log folder")
			return
		}
	}
	logger.Debugf("Considering log files determined by glob pattern: %s", logsGlobPath)

	if serverMessagesLang, err = tryDetermineLogMessagesLanguage(ctx, mdb.Conn); err != nil {
		logger.WithError(err).Print("Could not determine language (lc_messages) used for server logs")
		return
	}

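	// Main loop: wait out the interval, pick the newest logfile matching the glob,
	// tail it line by line and periodically flush the per-severity counters.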
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(currInterval):
			if currInterval == 0 {
				currInterval = time.Second * time.Duration(interval)
			}
		}

		if latest == "" || firstRun {
			globMatches, err := filepath.Glob(logsGlobPath)
			if err != nil || len(globMatches) == 0 {
				logger.Infof("No logfiles found to parse from glob '%s'", logsGlobPath)
				continue
			}

			logger.Debugf("Found %v logfiles from glob pattern, picking the latest", len(globMatches))
			if len(globMatches) > 1 {
				latest, _ = getFileWithLatestTimestamp(globMatches)
				if latest == "" {
					logger.Warningf("Could not determine the latest logfile")
					continue
				}
			} else if len(globMatches) == 1 {
				latest = globMatches[0]
			}
			logger.Infof("Starting to parse logfile: %s", latest)
		}

		if latestHandle == nil {
			latestHandle, err = os.Open(latest)
			if err != nil {
				logger.Warningf("Failed to open logfile %s: %s", latest, err)
				continue
			}
			defer latestHandle.Close()
			reader = bufio.NewReader(latestHandle)
			if previous == latest && linesRead > 0 {
				// re-opened the same file: skip over the lines already processed
				i := 1
				for i <= linesRead {
					_, err = reader.ReadString('\n')
					if err == io.EOF && i < linesRead {
						logger.Warningf("Logfile %s is shorter than the %d lines already processed, resetting line counter", latest, linesRead)
						linesRead = 0
						break
					} else if err != nil {
						logger.Warningf("Failed to skip %d logfile lines for %s, there might be duplicates reported. Error: %s", linesRead, latest, err)
						linesRead = i
						break
					}
					i++
				}
				logger.Debugf("Skipped %d already processed lines from %s", linesRead, latest)
			} else if firstRun {
				// on the very first run start tailing from the end of the file
				_, _ = latestHandle.Seek(0, io.SeekEnd)
				firstRun = false
			}
		}

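		// Read the current logfile line by line; on EOF wait one interval and check
		// whether a newer logfile has appeared before continuing.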
		for {
			line, err := reader.ReadString('\n')
			if err != nil && err != io.EOF {
				logger.Warningf("Failed to read logfile %s: %s", latest, err)
				_ = latestHandle.Close()
				latestHandle = nil
				break
			}

			if err == io.EOF {
				select {
				case <-ctx.Done():
					return
				case <-time.After(currInterval):
				}

				file, _ := getFileWithNextModTimestamp(logsGlobPath, latest)
				if file != "" {
					previous = latest
					latest = file
					_ = latestHandle.Close()
					latestHandle = nil
					logger.Infof("Switching to new logfile: %s", file)
					linesRead = 0
					break
				}
			} else {
				linesRead++
			}

			if err == nil && line != "" {
				matches := csvlogRegex.FindStringSubmatch(line)
				if len(matches) == 0 {
					goto send_to_storage_if_needed
				}
				result := regexMatchesToMap(csvlogRegex, matches)
				errorSeverity, ok := result["error_severity"]
				if !ok {
					logger.Error("error_severity group must be defined in parse regex: ", csvlogRegex)
					time.Sleep(time.Minute)
					break
				}
				if serverMessagesLang != "en" {
					errorSeverity = severityToEnglish(serverMessagesLang, errorSeverity)
				}

				databaseName, ok := result["database_name"]
				if !ok {
					logger.Error("database_name group must be defined in parse regex: ", csvlogRegex)
					time.Sleep(time.Minute)
					break
				}
				if realDbname == databaseName {
					eventCounts[errorSeverity]++
				}
				eventCountsTotal[errorSeverity]++
			}

		send_to_storage_if_needed:
			if lastSendTime.IsZero() || lastSendTime.Before(time.Now().Add(-time.Second*time.Duration(interval))) {
				logger.Debugf("Sending log event counts for last interval to storage channel. Local eventcounts: %+v, global eventcounts: %+v", eventCounts, eventCountsTotal)
				select {
				case <-ctx.Done():
					return
				case storeCh <- eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal, mdb):
					zeroEventCounts(eventCounts)
					zeroEventCounts(eventCountsTotal)
					lastSendTime = time.Now()
				}
			}
		}
	}
}

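// severityToEnglish maps a localized severity label back to its English counterpart using
// PgSeveritiesLocale; unknown languages or labels are returned unchanged.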
func severityToEnglish(serverLang, errorSeverity string) string {
	if serverLang == "en" {
		return errorSeverity
	}
	severityMap := PgSeveritiesLocale[serverLang]
	severityEn, ok := severityMap[errorSeverity]
	if !ok {
		return errorSeverity
	}
	return severityEn
}

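// zeroEventCounts resets all severity counters to zero after they have been sent to storage.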
func zeroEventCounts(eventCounts map[string]int64) {
	for _, severity := range PgSeverities {
		eventCounts[severity] = 0
	}
}

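// tryDetermineLogFolder asks the server for data_directory and log_directory and returns
// a glob pattern matching the CSV logfiles in the resulting folder.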
func tryDetermineLogFolder(ctx context.Context, conn db.PgxIface) (string, error) {
	sql := `select current_setting('data_directory') as dd, current_setting('log_directory') as ld`
	var dd, ld string
	err := conn.QueryRow(ctx, sql).Scan(&dd, &ld)
	if err != nil {
		return "", err
	}
	if strings.HasPrefix(ld, "/") {
		// log_directory can be an absolute path outside of the data directory
		return path.Join(ld, CSVLogDefaultGlobSuffix), nil
	}
	return path.Join(dd, ld, CSVLogDefaultGlobSuffix), nil
}

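// tryDetermineLogMessagesLanguage returns the two-letter language prefix of lc_messages,
// falling back to "en" when no translation table exists for that language.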
func tryDetermineLogMessagesLanguage(ctx context.Context, conn db.PgxIface) (string, error) {
	sql := `select current_setting('lc_messages')::varchar(2) as lc_messages;`
	var lc string
	err := conn.QueryRow(ctx, sql).Scan(&lc)
	if err != nil {
		return "", err
	}
	if _, ok := PgSeveritiesLocale[lc]; !ok {
		return "en", nil
	}
	return lc, nil
}

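// regexMatchesToMap converts a FindStringSubmatch result into a map keyed by the regex's
// named capture groups.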
func regexMatchesToMap(csvlogRegex *regexp.Regexp, matches []string) map[string]string {
	result := make(map[string]string)
	if len(matches) == 0 || csvlogRegex == nil {
		return result
	}
	for i, name := range csvlogRegex.SubexpNames() {
		if i != 0 && name != "" {
			result[name] = matches[i]
		}
	}
	return result
}
