...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/metrics/logparse.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/metrics

     1  package metrics
     2  
     3  import (
     4  	"bufio"
     5  	"context"
     6  	"io"
     7  	"os"
     8  	"path"
     9  	"path/filepath"
    10  	"regexp"
    11  	"strings"
    12  	"time"
    13  
    14  	"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
    15  	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
    16  	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
    17  )
    18  
// specialMetricServerLogEventCounts is the reserved metric name under which
// per-severity log event counts are pushed to the storage channel.
const specialMetricServerLogEventCounts = "server_log_event_counts"

// PgSeverities lists every PostgreSQL log severity level (English spelling)
// that is counted and reported.
var PgSeverities = [...]string{"DEBUG", "INFO", "NOTICE", "WARNING", "ERROR", "LOG", "FATAL", "PANIC"}

// PgSeveritiesLocale maps localized severity tokens back to their English
// equivalents, keyed by the two-character lc_messages prefix (e.g. "de", "ru").
// Locales missing from this map are treated as English by the callers.
var PgSeveritiesLocale = map[string]map[string]string{
	"C.": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "WARNING": "WARNING", "ERROR": "ERROR", "FATAL": "FATAL", "PANIC": "PANIC"},
	"de": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "HINWEIS": "NOTICE", "WARNUNG": "WARNING", "FEHLER": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
	"fr": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "ATTENTION": "WARNING", "ERREUR": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
	"it": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTIFICA": "NOTICE", "ATTENZIONE": "WARNING", "ERRORE": "ERROR", "FATALE": "FATAL", "PANICO": "PANIC"},
	"ko": {"디버그": "DEBUG", "로그": "LOG", "정보": "INFO", "알림": "NOTICE", "경고": "WARNING", "오류": "ERROR", "치명적오류": "FATAL", "손상": "PANIC"},
	"pl": {"DEBUG": "DEBUG", "DZIENNIK": "LOG", "INFORMACJA": "INFO", "UWAGA": "NOTICE", "OSTRZEŻENIE": "WARNING", "BŁĄD": "ERROR", "KATASTROFALNY": "FATAL", "PANIKA": "PANIC"},
	"ru": {"ОТЛАДКА": "DEBUG", "СООБЩЕНИЕ": "LOG", "ИНФОРМАЦИЯ": "INFO", "ЗАМЕЧАНИЕ": "NOTICE", "ПРЕДУПРЕЖДЕНИЕ": "WARNING", "ОШИБКА": "ERROR", "ВАЖНО": "FATAL", "ПАНИКА": "PANIC"},
	"sv": {"DEBUG": "DEBUG", "LOGG": "LOG", "INFO": "INFO", "NOTIS": "NOTICE", "VARNING": "WARNING", "FEL": "ERROR", "FATALT": "FATAL", "PANIK": "PANIC"},
	"tr": {"DEBUG": "DEBUG", "LOG": "LOG", "BİLGİ": "INFO", "NOT": "NOTICE", "UYARI": "WARNING", "HATA": "ERROR", "ÖLÜMCÜL (FATAL)": "FATAL", "KRİTİK": "PANIC"},
	"zh": {"调试": "DEBUG", "日志": "LOG", "信息": "INFO", "注意": "NOTICE", "警告": "WARNING", "错误": "ERROR", "致命错误": "FATAL", "比致命错误还过分的错误": "PANIC"},
}

// CSVLogDefaultRegEx extracts the leading fixed columns of PostgreSQL CSVLOG
// lines as named groups; downstream code requires at least the
// "error_severity" and "database_name" groups.
// NOTE(review): the doubled `^^` anchor is redundant (harmless to the regexp
// engine) — likely a typo for a single `^`; confirm before changing the
// exported value.
const CSVLogDefaultRegEx = `^^(?P<log_time>.*?),"?(?P<user_name>.*?)"?,"?(?P<database_name>.*?)"?,(?P<process_id>\d+),"?(?P<connection_from>.*?)"?,(?P<session_id>.*?),(?P<session_line_num>\d+),"?(?P<command_tag>.*?)"?,(?P<session_start_time>.*?),(?P<virtual_transaction_id>.*?),(?P<transaction_id>.*?),(?P<error_severity>\w+),`

// CSVLogDefaultGlobSuffix is appended to the detected log directory to match
// CSV-format log files.
const CSVLogDefaultGlobSuffix = "*.csv"
    37  
    38  func getFileWithLatestTimestamp(files []string) (string, error) {
    39  	var maxDate time.Time
    40  	var latest string
    41  
    42  	for _, f := range files {
    43  		fi, err := os.Stat(f)
    44  		if err != nil {
    45  			return "", err
    46  		}
    47  		if fi.ModTime().After(maxDate) {
    48  			latest = f
    49  			maxDate = fi.ModTime()
    50  		}
    51  	}
    52  	return latest, nil
    53  }
    54  
    55  func getFileWithNextModTimestamp(logsGlobPath, currentFile string) (string, error) {
    56  	var nextFile string
    57  	var nextMod time.Time
    58  
    59  	files, err := filepath.Glob(logsGlobPath)
    60  	if err != nil {
    61  		return "", err
    62  	}
    63  
    64  	fiCurrent, err := os.Stat(currentFile)
    65  	if err != nil {
    66  		return "", err
    67  	}
    68  
    69  	for _, f := range files {
    70  		if f == currentFile {
    71  			continue
    72  		}
    73  		fi, err := os.Stat(f)
    74  		if err != nil {
    75  			continue
    76  		}
    77  		//log.Debugf("Stat().ModTime() for %s: %v", f, fi.ModTime())
    78  		if (nextMod.IsZero() || fi.ModTime().Before(nextMod)) && fi.ModTime().After(fiCurrent.ModTime()) {
    79  			nextMod = fi.ModTime()
    80  			nextFile = f
    81  		}
    82  	}
    83  	return nextFile, nil
    84  }
    85  
    86  // 1. add zero counts for severity levels that didn't have any occurrences in the log
    87  func eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal map[string]int64, mdb *sources.SourceConn) []MeasurementEnvelope {
    88  	allSeverityCounts := NewMeasurement(time.Now().UnixNano())
    89  	for _, s := range PgSeverities {
    90  		parsedCount, ok := eventCounts[s]
    91  		if ok {
    92  			allSeverityCounts[strings.ToLower(s)] = parsedCount
    93  		} else {
    94  			allSeverityCounts[strings.ToLower(s)] = 0
    95  		}
    96  		parsedCount, ok = eventCountsTotal[s]
    97  		if ok {
    98  			allSeverityCounts[strings.ToLower(s)+"_total"] = parsedCount
    99  		} else {
   100  			allSeverityCounts[strings.ToLower(s)+"_total"] = 0
   101  		}
   102  	}
   103  	return []MeasurementEnvelope{{
   104  		DBName:     mdb.Name,
   105  		SourceType: string(mdb.Kind),
   106  		MetricName: specialMetricServerLogEventCounts,
   107  		Data:       Measurements{allSeverityCounts},
   108  		CustomTags: mdb.CustomTags,
   109  	}}
   110  }
   111  
   112  func ParseLogs(ctx context.Context, mdb *sources.SourceConn, realDbname string, interval float64, storeCh chan<- []MeasurementEnvelope) {
   113  
   114  	var latest, previous, serverMessagesLang string
   115  	var latestHandle *os.File
   116  	var reader *bufio.Reader
   117  	var linesRead = 0 // to skip over already parsed lines on Postgres server restart for example
   118  	var logsMatchRegex, logsMatchRegexPrev, logsGlobPath string
   119  	var lastSendTime time.Time                    // to storage channel
   120  	var eventCounts = make(map[string]int64)      // for the specific DB. [WARNING: 34, ERROR: 10, ...], zeroed on storage send
   121  	var eventCountsTotal = make(map[string]int64) // for the whole instance
   122  	var hostConfig sources.HostConfigAttrs
   123  	var err error
   124  	var firstRun = true
   125  	var csvlogRegex *regexp.Regexp
   126  	logger := log.GetLogger(ctx).WithField("source", mdb.Name)
   127  	for { // re-try loop. re-start in case of FS errors or just to refresh host config
   128  		select {
   129  		case <-ctx.Done():
   130  			return
   131  		default:
   132  		}
   133  
   134  		if hostConfig.LogsMatchRegex != "" {
   135  			logsMatchRegex = hostConfig.LogsMatchRegex
   136  		}
   137  		if logsMatchRegex == "" {
   138  			logger.Debug("Log parsing enabled with default CSVLOG regex")
   139  			logsMatchRegex = CSVLogDefaultRegEx
   140  		}
   141  		if hostConfig.LogsGlobPath != "" {
   142  			logsGlobPath = hostConfig.LogsGlobPath
   143  		}
   144  		if logsGlobPath == "" {
   145  			logsGlobPath, err = tryDetermineLogFolder(ctx, mdb.Conn)
   146  			if err != nil {
   147  				logger.WithError(err).Print("Could not determine Postgres logs parsing folder. Configured logs_glob_path = ", logsGlobPath)
   148  				time.Sleep(60 * time.Second)
   149  				continue
   150  			}
   151  		}
   152  		serverMessagesLang, err = tryDetermineLogMessagesLanguage(ctx, mdb.Conn)
   153  		if err != nil {
   154  			logger.WithError(err).Warning("Could not determine language (lc_collate) used for server logs, cannot parse logs...")
   155  			time.Sleep(60 * time.Second)
   156  			continue
   157  		}
   158  
   159  		if logsMatchRegexPrev != logsMatchRegex { // avoid regex recompile if no changes
   160  			csvlogRegex, err = regexp.Compile(logsMatchRegex)
   161  			if err != nil {
   162  				logger.WithError(err).Print("Invalid regex: ", logsMatchRegex)
   163  				time.Sleep(60 * time.Second)
   164  				continue
   165  			}
   166  			logger.Infof("Changing logs parsing regex to: %s", logsMatchRegex)
   167  			logsMatchRegexPrev = logsMatchRegex
   168  		}
   169  
   170  		logger.Debugf("Considering log files determined by glob pattern: %s", logsGlobPath)
   171  
   172  		if latest == "" || firstRun {
   173  			globMatches, err := filepath.Glob(logsGlobPath)
   174  			if err != nil || len(globMatches) == 0 {
   175  				logger.Infof("No logfiles found to parse from glob '%s'. Sleeping 60s...", logsGlobPath)
   176  				time.Sleep(60 * time.Second)
   177  				continue
   178  			}
   179  
   180  			logger.Debugf("Found %v logfiles from glob pattern, picking the latest", len(globMatches))
   181  			if len(globMatches) > 1 {
   182  				// find latest timestamp
   183  				latest, _ = getFileWithLatestTimestamp(globMatches)
   184  				if latest == "" {
   185  					logger.Warningf("Could not determine the latest logfile. Sleeping 60s...")
   186  					time.Sleep(60 * time.Second)
   187  					continue
   188  				}
   189  
   190  			} else if len(globMatches) == 1 {
   191  				latest = globMatches[0]
   192  			}
   193  			logger.Infof("Starting to parse logfile: %s ", latest)
   194  		}
   195  
   196  		if latestHandle == nil {
   197  			latestHandle, err = os.Open(latest)
   198  			if err != nil {
   199  				logger.Warningf("Failed to open logfile %s: %s. Sleeping 60s...", latest, err)
   200  				time.Sleep(60 * time.Second)
   201  				continue
   202  			}
   203  			reader = bufio.NewReader(latestHandle)
   204  			if previous == latest && linesRead > 0 { // handle postmaster restarts
   205  				i := 1
   206  				for i <= linesRead {
   207  					_, err = reader.ReadString('\n')
   208  					if err == io.EOF && i < linesRead {
   209  						logger.Warningf("Failed to open logfile %s: %s. Sleeping 60s...", latest, err)
   210  						linesRead = 0
   211  						break
   212  					} else if err != nil {
   213  						logger.Warningf("Failed to skip %d logfile lines for %s, there might be duplicates reported. Error: %s", linesRead, latest, err)
   214  						time.Sleep(60 * time.Second)
   215  						linesRead = i
   216  						break
   217  					}
   218  					i++
   219  				}
   220  				logger.Debugf("Skipped %d already processed lines from %s", linesRead, latest)
   221  			} else if firstRun { // seek to end
   222  				_, _ = latestHandle.Seek(0, 2)
   223  				firstRun = false
   224  			}
   225  		}
   226  
   227  		var eofSleepMillis = 0
   228  		// readLoopStart := time.Now()
   229  
   230  		for {
   231  			// if readLoopStart.Add(time.Second * time.Duration(opts.Source.Refresh)).Before(time.Now()) {
   232  			// 	break // refresh config
   233  			// }
   234  			line, err := reader.ReadString('\n')
   235  			if err != nil && err != io.EOF {
   236  				logger.Warningf("Failed to read logfile %s: %s. Sleeping 60s...", latest, err)
   237  				err = latestHandle.Close()
   238  				if err != nil {
   239  					logger.Warningf("Failed to close logfile %s properly: %s", latest, err)
   240  				}
   241  				latestHandle = nil
   242  				time.Sleep(60 * time.Second)
   243  				break
   244  			}
   245  
   246  			if err == io.EOF {
   247  				//log.Debugf("EOF reached for logfile %s", latest)
   248  				if eofSleepMillis < 5000 && float64(eofSleepMillis) < interval*1000 {
   249  					eofSleepMillis += 100 // progressively sleep more if nothing going on but not more that 5s or metric interval
   250  				}
   251  				time.Sleep(time.Millisecond * time.Duration(eofSleepMillis))
   252  
   253  				// check for newly opened logfiles
   254  				file, _ := getFileWithNextModTimestamp(logsGlobPath, latest)
   255  				if file != "" {
   256  					previous = latest
   257  					latest = file
   258  					err = latestHandle.Close()
   259  					latestHandle = nil
   260  					if err != nil {
   261  						logger.Warningf("Failed to close logfile %s properly: %s", latest, err)
   262  					}
   263  					logger.Infof("Switching to new logfile: %s", file)
   264  					linesRead = 0
   265  					break
   266  				}
   267  			} else {
   268  				eofSleepMillis = 0
   269  				linesRead++
   270  			}
   271  
   272  			if err == nil && line != "" {
   273  
   274  				matches := csvlogRegex.FindStringSubmatch(line)
   275  				if len(matches) == 0 {
   276  					//log.Debugf("No logline regex match for line:") // normal case actually for queries spanning multiple loglines
   277  					//log.Debugf(line)
   278  					goto send_to_storage_if_needed
   279  				}
   280  
   281  				result := regexMatchesToMap(csvlogRegex, matches)
   282  				//log.Debugf("RegexMatchesToMap: %+v", result)
   283  				errorSeverity, ok := result["error_severity"]
   284  				if !ok {
   285  					logger.Error("error_severity group must be defined in parse regex:", csvlogRegex)
   286  					time.Sleep(time.Minute)
   287  					break
   288  				}
   289  				if serverMessagesLang != "en" {
   290  					errorSeverity = severityToEnglish(serverMessagesLang, errorSeverity)
   291  				}
   292  
   293  				databaseName, ok := result["database_name"]
   294  				if !ok {
   295  					logger.Error("database_name group must be defined in parse regex:", csvlogRegex)
   296  					time.Sleep(time.Minute)
   297  					break
   298  				}
   299  				if realDbname == databaseName {
   300  					eventCounts[errorSeverity]++
   301  				}
   302  				eventCountsTotal[errorSeverity]++
   303  			}
   304  
   305  		send_to_storage_if_needed:
   306  			if lastSendTime.IsZero() || lastSendTime.Before(time.Now().Add(-1*time.Second*time.Duration(interval))) {
   307  				logger.Debugf("Sending log event counts for last interval to storage channel. Local eventcounts: %+v, global eventcounts: %+v", eventCounts, eventCountsTotal)
   308  				metricStoreMessages := eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal, mdb)
   309  				storeCh <- metricStoreMessages
   310  				zeroEventCounts(eventCounts)
   311  				zeroEventCounts(eventCountsTotal)
   312  				lastSendTime = time.Now()
   313  			}
   314  
   315  		} // file read loop
   316  	} // config loop
   317  
   318  }
   319  
   320  func severityToEnglish(serverLang, errorSeverity string) string {
   321  	//log.Debug("severityToEnglish", serverLang, errorSeverity)
   322  	if serverLang == "en" {
   323  		return errorSeverity
   324  	}
   325  	severityMap := PgSeveritiesLocale[serverLang]
   326  	severityEn, ok := severityMap[errorSeverity]
   327  	if !ok {
   328  		return errorSeverity
   329  	}
   330  	return severityEn
   331  }
   332  
   333  func zeroEventCounts(eventCounts map[string]int64) {
   334  	for _, severity := range PgSeverities {
   335  		eventCounts[severity] = 0
   336  	}
   337  }
   338  
   339  func tryDetermineLogFolder(ctx context.Context, conn db.PgxIface) (string, error) {
   340  	sql := `select current_setting('data_directory') as dd, current_setting('log_directory') as ld`
   341  	var dd, ld string
   342  	err := conn.QueryRow(ctx, sql).Scan(&dd, &ld)
   343  	if err != nil {
   344  		return "", err
   345  	}
   346  	if strings.HasPrefix(ld, "/") {
   347  		// we have a full path we can use
   348  		return path.Join(ld, CSVLogDefaultGlobSuffix), nil
   349  	}
   350  	return path.Join(dd, ld, CSVLogDefaultGlobSuffix), nil
   351  }
   352  
   353  func tryDetermineLogMessagesLanguage(ctx context.Context, conn db.PgxIface) (string, error) {
   354  	sql := `select current_setting('lc_messages')::varchar(2) as lc_messages;`
   355  	var lc string
   356  	err := conn.QueryRow(ctx, sql).Scan(&lc)
   357  	if err != nil {
   358  		return "", err
   359  	}
   360  	if _, ok := PgSeveritiesLocale[lc]; !ok {
   361  		return "en", nil
   362  	}
   363  	return lc, nil
   364  }
   365  
   366  func regexMatchesToMap(csvlogRegex *regexp.Regexp, matches []string) map[string]string {
   367  	result := make(map[string]string)
   368  	if len(matches) == 0 || csvlogRegex == nil {
   369  		return result
   370  	}
   371  	for i, name := range csvlogRegex.SubexpNames() {
   372  		if i != 0 && name != "" {
   373  			result[name] = matches[i]
   374  		}
   375  	}
   376  	return result
   377  }
   378