...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/metrics/logparse.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/metrics

package metrics

import (
	"bufio"
	"cmp"
	"context"
	"io"
	"os"
	"path"
	"path/filepath"
	"regexp"
	"strings"
	"time"

	"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
)

const specialMetricServerLogEventCounts = "server_log_event_counts"

var PgSeverities = [...]string{"DEBUG", "INFO", "NOTICE", "WARNING", "ERROR", "LOG", "FATAL", "PANIC"}
var PgSeveritiesLocale = map[string]map[string]string{
	"C.": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "WARNING": "WARNING", "ERROR": "ERROR", "FATAL": "FATAL", "PANIC": "PANIC"},
	"de": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "HINWEIS": "NOTICE", "WARNUNG": "WARNING", "FEHLER": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
	"fr": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "ATTENTION": "WARNING", "ERREUR": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
	"it": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTIFICA": "NOTICE", "ATTENZIONE": "WARNING", "ERRORE": "ERROR", "FATALE": "FATAL", "PANICO": "PANIC"},
	"ko": {"디버그": "DEBUG", "로그": "LOG", "정보": "INFO", "알림": "NOTICE", "경고": "WARNING", "오류": "ERROR", "치명적오류": "FATAL", "손상": "PANIC"},
	"pl": {"DEBUG": "DEBUG", "DZIENNIK": "LOG", "INFORMACJA": "INFO", "UWAGA": "NOTICE", "OSTRZEŻENIE": "WARNING", "BŁĄD": "ERROR", "KATASTROFALNY": "FATAL", "PANIKA": "PANIC"},
	"ru": {"ОТЛАДКА": "DEBUG", "СООБЩЕНИЕ": "LOG", "ИНФОРМАЦИЯ": "INFO", "ЗАМЕЧАНИЕ": "NOTICE", "ПРЕДУПРЕЖДЕНИЕ": "WARNING", "ОШИБКА": "ERROR", "ВАЖНО": "FATAL", "ПАНИКА": "PANIC"},
	"sv": {"DEBUG": "DEBUG", "LOGG": "LOG", "INFO": "INFO", "NOTIS": "NOTICE", "VARNING": "WARNING", "FEL": "ERROR", "FATALT": "FATAL", "PANIK": "PANIC"},
	"tr": {"DEBUG": "DEBUG", "LOG": "LOG", "BİLGİ": "INFO", "NOT": "NOTICE", "UYARI": "WARNING", "HATA": "ERROR", "ÖLÜMCÜL (FATAL)": "FATAL", "KRİTİK": "PANIC"},
	"zh": {"调试": "DEBUG", "日志": "LOG", "信息": "INFO", "注意": "NOTICE", "警告": "WARNING", "错误": "ERROR", "致命错误": "FATAL", "比致命错误还过分的错误": "PANIC"},
}

const CSVLogDefaultRegEx = `^^(?P<log_time>.*?),"?(?P<user_name>.*?)"?,"?(?P<database_name>.*?)"?,(?P<process_id>\d+),"?(?P<connection_from>.*?)"?,(?P<session_id>.*?),(?P<session_line_num>\d+),"?(?P<command_tag>.*?)"?,(?P<session_start_time>.*?),(?P<virtual_transaction_id>.*?),(?P<transaction_id>.*?),(?P<error_severity>\w+),`
const CSVLogDefaultGlobSuffix = "*.csv"

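// getFileWithLatestTimestamp returns the file from the given list with the
// most recent modification time, or an error if any file cannot be stat'ed.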
func getFileWithLatestTimestamp(files []string) (string, error) {
	var maxDate time.Time
	var latest string

	for _, f := range files {
		fi, err := os.Stat(f)
		if err != nil {
			return "", err
		}
		if fi.ModTime().After(maxDate) {
			latest = f
			maxDate = fi.ModTime()
		}
	}
	return latest, nil
}

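// getFileWithNextModTimestamp returns the file matching logsGlobPath whose
// modification time is the earliest one still after that of currentFile,
// i.e. the next logfile to switch to. An empty string is returned when
// there is no newer file.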
func getFileWithNextModTimestamp(logsGlobPath, currentFile string) (string, error) {
	var nextFile string
	var nextMod time.Time

	files, err := filepath.Glob(logsGlobPath)
	if err != nil {
		return "", err
	}

	fiCurrent, err := os.Stat(currentFile)
	if err != nil {
		return "", err
	}

	for _, f := range files {
		if f == currentFile {
			continue
		}
		fi, err := os.Stat(f)
		if err != nil {
			continue
		}
		if (nextMod.IsZero() || fi.ModTime().Before(nextMod)) && fi.ModTime().After(fiCurrent.ModTime()) {
			nextMod = fi.ModTime()
			nextFile = f
		}
	}
	return nextFile, nil
}

// eventCountsToMetricStoreMessages wraps the collected event counts into a
// MeasurementEnvelope, adding zero counts for severity levels that had no
// occurrences in the log.
func eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal map[string]int64, mdb *sources.SourceConn) MeasurementEnvelope {
	allSeverityCounts := NewMeasurement(time.Now().UnixNano())
	for _, s := range PgSeverities {
		parsedCount, ok := eventCounts[s]
		if ok {
			allSeverityCounts[strings.ToLower(s)] = parsedCount
		} else {
			allSeverityCounts[strings.ToLower(s)] = int64(0)
		}
		parsedCount, ok = eventCountsTotal[s]
		if ok {
			allSeverityCounts[strings.ToLower(s)+"_total"] = parsedCount
		} else {
			allSeverityCounts[strings.ToLower(s)+"_total"] = int64(0)
		}
	}
	return MeasurementEnvelope{
		DBName:     mdb.Name,
		MetricName: specialMetricServerLogEventCounts,
		Data:       Measurements{allSeverityCounts},
		CustomTags: mdb.CustomTags,
	}
}

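// ParseLogs tails the CSV logfiles of the given source, counts log events per
// severity level and periodically sends the counts as the
// server_log_event_counts metric to the storage channel. It requires the
// collector to run on the same host as the Postgres server and returns when
// the context is cancelled or when parsing cannot be (re)started.
//
// It is meant to be run as a long-lived goroutine per monitored source,
// roughly like this (a sketch; the caller-side variable names are illustrative):
//
//	go ParseLogs(ctx, sourceConn, realDbname, 60, storeCh)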
func ParseLogs(ctx context.Context, mdb *sources.SourceConn, realDbname string, interval float64, storeCh chan<- MeasurementEnvelope) {

	var latest, previous, serverMessagesLang string
	var latestHandle *os.File
	var reader *bufio.Reader
	var linesRead int // used to skip over already parsed lines, e.g. after a Postgres server restart
	var logsMatchRegex, logsGlobPath string
	var lastSendTime time.Time                    // time of the last send to the storage channel
	var eventCounts = make(map[string]int64)      // for the specific DB, e.g. [WARNING: 34, ERROR: 10, ...], zeroed on storage send
	var eventCountsTotal = make(map[string]int64) // for the whole instance
	var err error
	var firstRun = true
	var csvlogRegex *regexp.Regexp
	var currInterval time.Duration

	logger := log.GetLogger(ctx).WithField("source", mdb.Name).WithField("metric", specialMetricServerLogEventCounts)

	if ok, err := db.IsClientOnSameHost(mdb.Conn); !ok || err != nil {
		if err != nil {
			logger = logger.WithError(err)
		}
		logger.Warning("Cannot parse logs, client is not on the same host as the Postgres server")
		return
	}

	logsMatchRegex = cmp.Or(mdb.HostConfig.LogsMatchRegex, CSVLogDefaultRegEx)
	csvlogRegex, err = regexp.Compile(logsMatchRegex)
	if err != nil {
		logger.WithError(err).Print("Invalid regex: ", logsMatchRegex)
		return
	}
	logger.Debugf("Changing logs parsing regex to: %s", logsMatchRegex)

	if mdb.HostConfig.LogsGlobPath != "" {
		logsGlobPath = mdb.HostConfig.LogsGlobPath
	} else {
		if logsGlobPath, err = tryDetermineLogFolder(ctx, mdb.Conn); err != nil {
			logger.WithError(err).Print("Could not determine the Postgres logs folder")
			return
		}
	}
	logger.Debugf("Considering log files determined by glob pattern: %s", logsGlobPath)

	if serverMessagesLang, err = tryDetermineLogMessagesLanguage(ctx, mdb.Conn); err != nil {
		logger.WithError(err).Print("Could not determine language (lc_messages) used for server logs")
		return
	}

	for { // retry loop; restart in case of FS errors or just to refresh the host config
		select {
		case <-ctx.Done():
			return
		case <-time.After(currInterval):
			if currInterval == 0 {
				currInterval = time.Second * time.Duration(interval)
			}
		}

		if latest == "" || firstRun {
			globMatches, err := filepath.Glob(logsGlobPath)
			if err != nil || len(globMatches) == 0 {
				logger.Infof("No logfiles found to parse from glob '%s'", logsGlobPath)
				continue
			}

			logger.Debugf("Found %v logfiles from glob pattern, picking the latest", len(globMatches))
			if len(globMatches) > 1 {
				// find the file with the latest modification timestamp
				latest, _ = getFileWithLatestTimestamp(globMatches)
				if latest == "" {
					logger.Warningf("Could not determine the latest logfile")
					continue
				}

			} else if len(globMatches) == 1 {
				latest = globMatches[0]
			}
			logger.Infof("Starting to parse logfile: %s", latest)
		}

		if latestHandle == nil {
			latestHandle, err = os.Open(latest)
			if err != nil {
				logger.Warningf("Failed to open logfile %s: %s", latest, err)
				continue
			}
			defer latestHandle.Close()
			reader = bufio.NewReader(latestHandle)
			if previous == latest && linesRead > 0 { // handle postmaster restarts
				i := 1
				for i <= linesRead {
					_, err = reader.ReadString('\n')
					if err == io.EOF && i < linesRead {
						logger.Warningf("Unexpected EOF in logfile %s while skipping %d already processed lines: %s", latest, linesRead, err)
						linesRead = 0
						break
					} else if err != nil {
						logger.Warningf("Failed to skip %d logfile lines for %s, there might be duplicates reported. Error: %s", linesRead, latest, err)
						linesRead = i
						break
					}
					i++
				}
				logger.Debugf("Skipped %d already processed lines from %s", linesRead, latest)
			} else if firstRun { // seek to end
				_, _ = latestHandle.Seek(0, io.SeekEnd)
				firstRun = false
			}
		}

		for {
			line, err := reader.ReadString('\n')
			if err != nil && err != io.EOF {
				logger.Warningf("Failed to read logfile %s: %s", latest, err)
				_ = latestHandle.Close()
				latestHandle = nil
				break
			}

			if err == io.EOF {
				// EOF reached, wait for new files to be added
				select {
				case <-ctx.Done():
					return
				case <-time.After(currInterval):
				}
				// check for newly opened logfiles
				file, _ := getFileWithNextModTimestamp(logsGlobPath, latest)
				if file != "" {
					previous = latest
					latest = file
					_ = latestHandle.Close()
					latestHandle = nil
					logger.Infof("Switching to new logfile: %s", file)
					linesRead = 0
					break
				}
			} else {
				linesRead++
			}

			if err == nil && line != "" {
				matches := csvlogRegex.FindStringSubmatch(line)
				if len(matches) == 0 {
					goto send_to_storage_if_needed
				}
				result := regexMatchesToMap(csvlogRegex, matches)
				errorSeverity, ok := result["error_severity"]
				if !ok {
					logger.Error("error_severity group must be defined in parse regex:", csvlogRegex)
					time.Sleep(time.Minute)
					break
				}
				if serverMessagesLang != "en" {
					errorSeverity = severityToEnglish(serverMessagesLang, errorSeverity)
				}

				databaseName, ok := result["database_name"]
				if !ok {
					logger.Error("database_name group must be defined in parse regex:", csvlogRegex)
					time.Sleep(time.Minute)
					break
				}
				if realDbname == databaseName {
					eventCounts[errorSeverity]++
				}
				eventCountsTotal[errorSeverity]++
			}

		send_to_storage_if_needed:
			if lastSendTime.IsZero() || lastSendTime.Before(time.Now().Add(-time.Second*time.Duration(interval))) {
				logger.Debugf("Sending log event counts for last interval to storage channel. Local eventcounts: %+v, global eventcounts: %+v", eventCounts, eventCountsTotal)
				select {
				case <-ctx.Done():
					return
				case storeCh <- eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal, mdb):
					zeroEventCounts(eventCounts)
					zeroEventCounts(eventCountsTotal)
					lastSendTime = time.Now()
				}
			}

		} // file read loop
	} // config loop

}

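// severityToEnglish maps a localized severity keyword back to the canonical
// English severity name, falling back to the input when no mapping is known.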
func severityToEnglish(serverLang, errorSeverity string) string {
	//log.Debug("severityToEnglish", serverLang, errorSeverity)
	if serverLang == "en" {
		return errorSeverity
	}
	severityMap := PgSeveritiesLocale[serverLang]
	severityEn, ok := severityMap[errorSeverity]
	if !ok {
		return errorSeverity
	}
	return severityEn
}

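// zeroEventCounts resets the counter of every known severity level to zero.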
func zeroEventCounts(eventCounts map[string]int64) {
	for _, severity := range PgSeverities {
		eventCounts[severity] = 0
	}
}

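// tryDetermineLogFolder builds a glob pattern for the server's CSV logfiles
// from the data_directory and log_directory settings of the monitored
// instance; an absolute log_directory is used as-is.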
func tryDetermineLogFolder(ctx context.Context, conn db.PgxIface) (string, error) {
	sql := `select current_setting('data_directory') as dd, current_setting('log_directory') as ld`
	var dd, ld string
	err := conn.QueryRow(ctx, sql).Scan(&dd, &ld)
	if err != nil {
		return "", err
	}
	if strings.HasPrefix(ld, "/") {
		// we have a full path we can use
		return path.Join(ld, CSVLogDefaultGlobSuffix), nil
	}
	return path.Join(dd, ld, CSVLogDefaultGlobSuffix), nil
}

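// tryDetermineLogMessagesLanguage reads the first two characters of the
// lc_messages setting and returns them when a severity translation map exists
// for that language, defaulting to "en" otherwise.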
func tryDetermineLogMessagesLanguage(ctx context.Context, conn db.PgxIface) (string, error) {
	sql := `select current_setting('lc_messages')::varchar(2) as lc_messages;`
	var lc string
	err := conn.QueryRow(ctx, sql).Scan(&lc)
	if err != nil {
		return "", err
	}
	if _, ok := PgSeveritiesLocale[lc]; !ok {
		return "en", nil
	}
	return lc, nil
}

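// regexMatchesToMap converts the submatches of a regex match into a map keyed
// by the regex's named capture groups.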
func regexMatchesToMap(csvlogRegex *regexp.Regexp, matches []string) map[string]string {
	result := make(map[string]string)
	if len(matches) == 0 || csvlogRegex == nil {
		return result
	}
	for i, name := range csvlogRegex.SubexpNames() {
		if i != 0 && name != "" {
			result[name] = matches[i]
		}
	}
	return result
}