package metrics

import (
	"bufio"
	"cmp"
	"context"
	"io"
	"os"
	"path"
	"path/filepath"
	"regexp"
	"strings"
	"time"

	"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
)

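// specialMetricServerLogEventCounts is the name of the built-in metric that
// reports per-severity counts of entries parsed from the server log files.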
const specialMetricServerLogEventCounts = "server_log_event_counts"

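// PgSeverities lists the canonical (English) PostgreSQL log severities tracked by
// the log parser. PgSeveritiesLocale maps the translated severity labels emitted
// under various lc_messages settings back to their English equivalents.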
var PgSeverities = [...]string{"DEBUG", "INFO", "NOTICE", "WARNING", "ERROR", "LOG", "FATAL", "PANIC"}
var PgSeveritiesLocale = map[string]map[string]string{
	"C.": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "WARNING": "WARNING", "ERROR": "ERROR", "FATAL": "FATAL", "PANIC": "PANIC"},
	"de": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "HINWEIS": "NOTICE", "WARNUNG": "WARNING", "FEHLER": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
	"fr": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "ATTENTION": "WARNING", "ERREUR": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
	"it": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTIFICA": "NOTICE", "ATTENZIONE": "WARNING", "ERRORE": "ERROR", "FATALE": "FATAL", "PANICO": "PANIC"},
	"ko": {"디버그": "DEBUG", "로그": "LOG", "정보": "INFO", "알림": "NOTICE", "경고": "WARNING", "오류": "ERROR", "치명적오류": "FATAL", "손상": "PANIC"},
	"pl": {"DEBUG": "DEBUG", "DZIENNIK": "LOG", "INFORMACJA": "INFO", "UWAGA": "NOTICE", "OSTRZEŻENIE": "WARNING", "BŁĄD": "ERROR", "KATASTROFALNY": "FATAL", "PANIKA": "PANIC"},
	"ru": {"ОТЛАДКА": "DEBUG", "СООБЩЕНИЕ": "LOG", "ИНФОРМАЦИЯ": "INFO", "ЗАМЕЧАНИЕ": "NOTICE", "ПРЕДУПРЕЖДЕНИЕ": "WARNING", "ОШИБКА": "ERROR", "ВАЖНО": "FATAL", "ПАНИКА": "PANIC"},
	"sv": {"DEBUG": "DEBUG", "LOGG": "LOG", "INFO": "INFO", "NOTIS": "NOTICE", "VARNING": "WARNING", "FEL": "ERROR", "FATALT": "FATAL", "PANIK": "PANIC"},
	"tr": {"DEBUG": "DEBUG", "LOG": "LOG", "BİLGİ": "INFO", "NOT": "NOTICE", "UYARI": "WARNING", "HATA": "ERROR", "ÖLÜMCÜL (FATAL)": "FATAL", "KRİTİK": "PANIC"},
	"zh": {"调试": "DEBUG", "日志": "LOG", "信息": "INFO", "注意": "NOTICE", "警告": "WARNING", "错误": "ERROR", "致命错误": "FATAL", "比致命错误还过分的错误": "PANIC"},
}

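// CSVLogDefaultRegEx matches the leading columns of PostgreSQL's csvlog format and
// captures, among others, the database_name and error_severity fields the parser
// relies on. CSVLogDefaultGlobSuffix selects the .csv files inside the log folder.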
const CSVLogDefaultRegEx = `^(?P<log_time>.*?),"?(?P<user_name>.*?)"?,"?(?P<database_name>.*?)"?,(?P<process_id>\d+),"?(?P<connection_from>.*?)"?,(?P<session_id>.*?),(?P<session_line_num>\d+),"?(?P<command_tag>.*?)"?,(?P<session_start_time>.*?),(?P<virtual_transaction_id>.*?),(?P<transaction_id>.*?),(?P<error_severity>\w+),`
const CSVLogDefaultGlobSuffix = "*.csv"

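// getFileWithLatestTimestamp returns the file from the given list with the most
// recent modification time.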
func getFileWithLatestTimestamp(files []string) (string, error) {
	var maxDate time.Time
	var latest string

	for _, f := range files {
		fi, err := os.Stat(f)
		if err != nil {
			return "", err
		}
		if fi.ModTime().After(maxDate) {
			latest = f
			maxDate = fi.ModTime()
		}
	}
	return latest, nil
}

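// getFileWithNextModTimestamp returns the file matching logsGlobPath whose
// modification time is the earliest one still newer than that of currentFile,
// i.e. the next log file in rotation order. An empty string means there is none yet.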
func getFileWithNextModTimestamp(logsGlobPath, currentFile string) (string, error) {
	var nextFile string
	var nextMod time.Time

	files, err := filepath.Glob(logsGlobPath)
	if err != nil {
		return "", err
	}

	fiCurrent, err := os.Stat(currentFile)
	if err != nil {
		return "", err
	}

	for _, f := range files {
		if f == currentFile {
			continue
		}
		fi, err := os.Stat(f)
		if err != nil {
			continue
		}
		if (nextMod.IsZero() || fi.ModTime().Before(nextMod)) && fi.ModTime().After(fiCurrent.ModTime()) {
			nextMod = fi.ModTime()
			nextFile = f
		}
	}
	return nextFile, nil
}

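// eventCountsToMetricStoreMessages converts the per-database and instance-wide
// severity counters into a single MeasurementEnvelope for the
// server_log_event_counts metric, reporting a zero for every severity not seen.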
func eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal map[string]int64, mdb *sources.SourceConn) MeasurementEnvelope {
	allSeverityCounts := NewMeasurement(time.Now().UnixNano())
	for _, s := range PgSeverities {
		// a missing key yields the int64 zero value, so absent severities are reported as 0
		allSeverityCounts[strings.ToLower(s)] = eventCounts[s]
		allSeverityCounts[strings.ToLower(s)+"_total"] = eventCountsTotal[s]
	}
	return MeasurementEnvelope{
		DBName:     mdb.Name,
		MetricName: specialMetricServerLogEventCounts,
		Data:       Measurements{allSeverityCounts},
		CustomTags: mdb.CustomTags,
	}
}

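// ParseLogs tails the PostgreSQL csvlog files of the given source, counts log
// entries per severity (both for realDbname and for the whole instance), and
// pushes the counts to storeCh roughly every interval seconds. It returns
// immediately if the gatherer does not run on the same host as the server.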
func ParseLogs(ctx context.Context, mdb *sources.SourceConn, realDbname string, interval float64, storeCh chan<- MeasurementEnvelope) {
	var latest, previous, serverMessagesLang string
	var latestHandle *os.File
	var reader *bufio.Reader
	var linesRead int
	var logsMatchRegex, logsGlobPath string
	var lastSendTime time.Time
	var eventCounts = make(map[string]int64)
	var eventCountsTotal = make(map[string]int64)
	var err error
	var firstRun = true
	var csvlogRegex *regexp.Regexp
	var currInterval time.Duration

	logger := log.GetLogger(ctx).WithField("source", mdb.Name).WithField("metric", specialMetricServerLogEventCounts)

	if ok, err := db.IsClientOnSameHost(mdb.Conn); !ok || err != nil {
		if err != nil {
			logger = logger.WithError(err)
		}
		logger.Warning("Cannot parse logs, client is not on the same host as the Postgres server")
		return
	}

	logsMatchRegex = cmp.Or(mdb.HostConfig.LogsMatchRegex, CSVLogDefaultRegEx)
	csvlogRegex, err = regexp.Compile(logsMatchRegex)
	if err != nil {
		logger.WithError(err).Print("Invalid logs parsing regex: ", logsMatchRegex)
		return
	}
	logger.Debugf("Using logs parsing regex: %s", logsMatchRegex)

	if mdb.HostConfig.LogsGlobPath != "" {
		logsGlobPath = mdb.HostConfig.LogsGlobPath
	} else {
		if logsGlobPath, err = tryDetermineLogFolder(ctx, mdb.Conn); err != nil {
			logger.WithError(err).Print("Could not determine the Postgres log folder to parse")
			return
		}
	}
	logger.Debugf("Considering log files determined by glob pattern: %s", logsGlobPath)

	if serverMessagesLang, err = tryDetermineLogMessagesLanguage(ctx, mdb.Conn); err != nil {
		logger.WithError(err).Print("Could not determine language (lc_messages) used for server logs")
		return
	}

	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(currInterval):
			// the first iteration fires immediately; subsequent ones wait a full interval
			if currInterval == 0 {
				currInterval = time.Second * time.Duration(interval)
			}
		}

		if latest == "" || firstRun {
			globMatches, err := filepath.Glob(logsGlobPath)
			if err != nil || len(globMatches) == 0 {
				logger.Infof("No logfiles found to parse from glob '%s'", logsGlobPath)
				continue
			}

			logger.Debugf("Found %v logfiles from glob pattern, picking the latest", len(globMatches))
			if len(globMatches) > 1 {
				latest, _ = getFileWithLatestTimestamp(globMatches)
				if latest == "" {
					logger.Warningf("Could not determine the latest logfile")
					continue
				}
			} else {
				latest = globMatches[0]
			}
			logger.Infof("Starting to parse logfile: %s", latest)
		}

		if latestHandle == nil {
			latestHandle, err = os.Open(latest)
			if err != nil {
				logger.Warningf("Failed to open logfile %s: %s", latest, err)
				continue
			}
			defer latestHandle.Close()
			reader = bufio.NewReader(latestHandle)
			if previous == latest && linesRead > 0 {
				// re-opened the same file, skip the lines already processed in the previous round
				i := 1
				for i <= linesRead {
					_, err = reader.ReadString('\n')
					if err == io.EOF && i < linesRead {
						logger.Warningf("Reached EOF in logfile %s before skipping all %d previously read lines", latest, linesRead)
						linesRead = 0
						break
					} else if err != nil {
						logger.Warningf("Failed to skip %d logfile lines for %s, there might be duplicates reported. Error: %s", linesRead, latest, err)
						linesRead = i
						break
					}
					i++
				}
				logger.Debugf("Skipped %d already processed lines from %s", linesRead, latest)
			} else if firstRun {
				// very first open: start tailing from the end of the file
				_, _ = latestHandle.Seek(0, io.SeekEnd)
				firstRun = false
			}
		}

		for {
			line, err := reader.ReadString('\n')
			if err != nil && err != io.EOF {
				logger.Warningf("Failed to read logfile %s: %s", latest, err)
				_ = latestHandle.Close()
				latestHandle = nil
				break
			}

			if err == io.EOF {
				// caught up with the current file; wait one interval and check whether a newer logfile has appeared
				select {
				case <-ctx.Done():
					return
				case <-time.After(currInterval):
				}

				file, _ := getFileWithNextModTimestamp(logsGlobPath, latest)
				if file != "" {
					previous = latest
					latest = file
					_ = latestHandle.Close()
					latestHandle = nil
					logger.Infof("Switching to new logfile: %s", file)
					linesRead = 0
					break
				}
			} else {
				linesRead++
			}

			if err == nil && line != "" {
				matches := csvlogRegex.FindStringSubmatch(line)
				if len(matches) == 0 {
					goto send_to_storage_if_needed
				}
				result := regexMatchesToMap(csvlogRegex, matches)
				errorSeverity, ok := result["error_severity"]
				if !ok {
					logger.Error("error_severity group must be defined in parse regex:", csvlogRegex)
					time.Sleep(time.Minute)
					break
				}
				if serverMessagesLang != "en" {
					errorSeverity = severityToEnglish(serverMessagesLang, errorSeverity)
				}

				databaseName, ok := result["database_name"]
				if !ok {
					logger.Error("database_name group must be defined in parse regex:", csvlogRegex)
					time.Sleep(time.Minute)
					break
				}
				if realDbname == databaseName {
					eventCounts[errorSeverity]++
				}
				eventCountsTotal[errorSeverity]++
			}

		send_to_storage_if_needed:
			// flush the accumulated counts once per interval
			if lastSendTime.IsZero() || lastSendTime.Before(time.Now().Add(-time.Second*time.Duration(interval))) {
				logger.Debugf("Sending log event counts for last interval to storage channel. Local eventcounts: %+v, global eventcounts: %+v", eventCounts, eventCountsTotal)
				select {
				case <-ctx.Done():
					return
				case storeCh <- eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal, mdb):
					zeroEventCounts(eventCounts)
					zeroEventCounts(eventCountsTotal)
					lastSendTime = time.Now()
				}
			}
		}
	}
}

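// severityToEnglish translates a localized severity label to its English
// equivalent using PgSeveritiesLocale, returning the input unchanged when no
// mapping is known.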
func severityToEnglish(serverLang, errorSeverity string) string {
	if serverLang == "en" {
		return errorSeverity
	}
	severityMap := PgSeveritiesLocale[serverLang]
	severityEn, ok := severityMap[errorSeverity]
	if !ok {
		return errorSeverity
	}
	return severityEn
}

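// zeroEventCounts resets the counters for all known severities back to zero.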
func zeroEventCounts(eventCounts map[string]int64) {
	for _, severity := range PgSeverities {
		eventCounts[severity] = 0
	}
}

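// tryDetermineLogFolder derives a glob pattern for the server's csvlog files
// from the data_directory and log_directory settings, handling both absolute
// and data-directory-relative log_directory values.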
func tryDetermineLogFolder(ctx context.Context, conn db.PgxIface) (string, error) {
	sql := `select current_setting('data_directory') as dd, current_setting('log_directory') as ld`
	var dd, ld string
	err := conn.QueryRow(ctx, sql).Scan(&dd, &ld)
	if err != nil {
		return "", err
	}
	if strings.HasPrefix(ld, "/") {
		// log_directory is an absolute path
		return path.Join(ld, CSVLogDefaultGlobSuffix), nil
	}
	return path.Join(dd, ld, CSVLogDefaultGlobSuffix), nil
}

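// tryDetermineLogMessagesLanguage reads the two-letter language code from
// lc_messages and falls back to "en" when no translation table exists for it
// in PgSeveritiesLocale.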
func tryDetermineLogMessagesLanguage(ctx context.Context, conn db.PgxIface) (string, error) {
	sql := `select current_setting('lc_messages')::varchar(2) as lc_messages;`
	var lc string
	err := conn.QueryRow(ctx, sql).Scan(&lc)
	if err != nil {
		return "", err
	}
	if _, ok := PgSeveritiesLocale[lc]; !ok {
		return "en", nil
	}
	return lc, nil
}

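// regexMatchesToMap converts the positional submatches returned by
// FindStringSubmatch into a map keyed by the regex's named capture groups.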
func regexMatchesToMap(csvlogRegex *regexp.Regexp, matches []string) map[string]string {
	result := make(map[string]string)
	if len(matches) == 0 || csvlogRegex == nil {
		return result
	}
	for i, name := range csvlogRegex.SubexpNames() {
		if i != 0 && name != "" {
			result[name] = matches[i]
		}
	}
	return result
}