...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/metrics/logparse.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/metrics

     1  package metrics
     2  
     3  import (
     4  	"bufio"
     5  	"cmp"
     6  	"context"
     7  	"io"
     8  	"os"
     9  	"path"
    10  	"path/filepath"
    11  	"regexp"
    12  	"strings"
    13  	"time"
    14  
    15  	"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
    16  	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
    17  	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
    18  )
    19  
// specialMetricServerLogEventCounts is the reserved metric name under which the
// parsed per-severity log event counts are shipped to the storage channel.
const specialMetricServerLogEventCounts = "server_log_event_counts"

// PgSeverities lists the canonical (English) Postgres log severity levels that
// are counted and reported, both as "<severity>" and "<severity>_total" fields.
var PgSeverities = [...]string{"DEBUG", "INFO", "NOTICE", "WARNING", "ERROR", "LOG", "FATAL", "PANIC"}

// PgSeveritiesLocale maps a two-character lc_messages prefix (plus "C.") to a
// table translating localized severity strings back to the canonical English
// ones above. Languages without an entry here are treated as English.
var PgSeveritiesLocale = map[string]map[string]string{
	"C.": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "WARNING": "WARNING", "ERROR": "ERROR", "FATAL": "FATAL", "PANIC": "PANIC"},
	"de": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "HINWEIS": "NOTICE", "WARNUNG": "WARNING", "FEHLER": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
	"fr": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "ATTENTION": "WARNING", "ERREUR": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
	"it": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTIFICA": "NOTICE", "ATTENZIONE": "WARNING", "ERRORE": "ERROR", "FATALE": "FATAL", "PANICO": "PANIC"},
	"ko": {"디버그": "DEBUG", "로그": "LOG", "정보": "INFO", "알림": "NOTICE", "경고": "WARNING", "오류": "ERROR", "치명적오류": "FATAL", "손상": "PANIC"},
	"pl": {"DEBUG": "DEBUG", "DZIENNIK": "LOG", "INFORMACJA": "INFO", "UWAGA": "NOTICE", "OSTRZEŻENIE": "WARNING", "BŁĄD": "ERROR", "KATASTROFALNY": "FATAL", "PANIKA": "PANIC"},
	"ru": {"ОТЛАДКА": "DEBUG", "СООБЩЕНИЕ": "LOG", "ИНФОРМАЦИЯ": "INFO", "ЗАМЕЧАНИЕ": "NOTICE", "ПРЕДУПРЕЖДЕНИЕ": "WARNING", "ОШИБКА": "ERROR", "ВАЖНО": "FATAL", "ПАНИКА": "PANIC"},
	"sv": {"DEBUG": "DEBUG", "LOGG": "LOG", "INFO": "INFO", "NOTIS": "NOTICE", "VARNING": "WARNING", "FEL": "ERROR", "FATALT": "FATAL", "PANIK": "PANIC"},
	"tr": {"DEBUG": "DEBUG", "LOG": "LOG", "BİLGİ": "INFO", "NOT": "NOTICE", "UYARI": "WARNING", "HATA": "ERROR", "ÖLÜMCÜL (FATAL)": "FATAL", "KRİTİK": "PANIC"},
	"zh": {"调试": "DEBUG", "日志": "LOG", "信息": "INFO", "注意": "NOTICE", "警告": "WARNING", "错误": "ERROR", "致命错误": "FATAL", "比致命错误还过分的错误": "PANIC"},
}

// CSVLogDefaultRegEx captures the leading fields of Postgres' csvlog line
// format up to error_severity. The named groups "error_severity" and
// "database_name" are required by ParseLogs. (The leading "^^" is redundant
// but harmless — a single anchor would match identically.)
const CSVLogDefaultRegEx = `^^(?P<log_time>.*?),"?(?P<user_name>.*?)"?,"?(?P<database_name>.*?)"?,(?P<process_id>\d+),"?(?P<connection_from>.*?)"?,(?P<session_id>.*?),(?P<session_line_num>\d+),"?(?P<command_tag>.*?)"?,(?P<session_start_time>.*?),(?P<virtual_transaction_id>.*?),(?P<transaction_id>.*?),(?P<error_severity>\w+),`

// CSVLogDefaultGlobSuffix is appended to the detected log directory to glob csvlog files.
const CSVLogDefaultGlobSuffix = "*.csv"
    38  
    39  func getFileWithLatestTimestamp(files []string) (string, error) {
    40  	var maxDate time.Time
    41  	var latest string
    42  
    43  	for _, f := range files {
    44  		fi, err := os.Stat(f)
    45  		if err != nil {
    46  			return "", err
    47  		}
    48  		if fi.ModTime().After(maxDate) {
    49  			latest = f
    50  			maxDate = fi.ModTime()
    51  		}
    52  	}
    53  	return latest, nil
    54  }
    55  
    56  func getFileWithNextModTimestamp(logsGlobPath, currentFile string) (string, error) {
    57  	var nextFile string
    58  	var nextMod time.Time
    59  
    60  	files, err := filepath.Glob(logsGlobPath)
    61  	if err != nil {
    62  		return "", err
    63  	}
    64  
    65  	fiCurrent, err := os.Stat(currentFile)
    66  	if err != nil {
    67  		return "", err
    68  	}
    69  
    70  	for _, f := range files {
    71  		if f == currentFile {
    72  			continue
    73  		}
    74  		fi, err := os.Stat(f)
    75  		if err != nil {
    76  			continue
    77  		}
    78  		if (nextMod.IsZero() || fi.ModTime().Before(nextMod)) && fi.ModTime().After(fiCurrent.ModTime()) {
    79  			nextMod = fi.ModTime()
    80  			nextFile = f
    81  		}
    82  	}
    83  	return nextFile, nil
    84  }
    85  
    86  // 1. add zero counts for severity levels that didn't have any occurrences in the log
    87  func eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal map[string]int64, mdb *sources.SourceConn) MeasurementEnvelope {
    88  	allSeverityCounts := NewMeasurement(time.Now().UnixNano())
    89  	for _, s := range PgSeverities {
    90  		parsedCount, ok := eventCounts[s]
    91  		if ok {
    92  			allSeverityCounts[strings.ToLower(s)] = parsedCount
    93  		} else {
    94  			allSeverityCounts[strings.ToLower(s)] = int64(0)
    95  		}
    96  		parsedCount, ok = eventCountsTotal[s]
    97  		if ok {
    98  			allSeverityCounts[strings.ToLower(s)+"_total"] = parsedCount
    99  		} else {
   100  			allSeverityCounts[strings.ToLower(s)+"_total"] = int64(0)
   101  		}
   102  	}
   103  	return MeasurementEnvelope{
   104  		DBName:     mdb.Name,
   105  		MetricName: specialMetricServerLogEventCounts,
   106  		Data:       Measurements{allSeverityCounts},
   107  		CustomTags: mdb.CustomTags,
   108  	}
   109  }
   110  
   111  func ParseLogs(ctx context.Context,
   112  	mdb *sources.SourceConn,
   113  	realDbname string,
   114  	interval float64,
   115  	storeCh chan<- MeasurementEnvelope,
   116  	LogsMatchRegex string,
   117  	LogsGlobPath string) {
   118  
   119  	var latest, previous, serverMessagesLang string
   120  	var latestHandle *os.File
   121  	var reader *bufio.Reader
   122  	var linesRead int // to skip over already parsed lines on Postgres server restart for example
   123  	var logsMatchRegex, logsGlobPath string
   124  	var lastSendTime time.Time                    // to storage channel
   125  	var eventCounts = make(map[string]int64)      // for the specific DB. [WARNING: 34, ERROR: 10, ...], zeroed on storage send
   126  	var eventCountsTotal = make(map[string]int64) // for the whole instance
   127  	var err error
   128  	var firstRun = true
   129  	var csvlogRegex *regexp.Regexp
   130  	var currInterval time.Duration
   131  
   132  	logger := log.GetLogger(ctx).WithField("source", mdb.Name).WithField("metric", specialMetricServerLogEventCounts)
   133  
   134  	if ok, err := db.IsClientOnSameHost(mdb.Conn); !ok || err != nil {
   135  		if err != nil {
   136  			logger = logger.WithError(err)
   137  		}
   138  		logger.Warning("Cannot parse logs, client is not on the same host as the Postgres server")
   139  		return
   140  	}
   141  
   142  	csvlogRegex, err = regexp.Compile(cmp.Or(LogsMatchRegex, CSVLogDefaultRegEx))
   143  	if err != nil {
   144  		logger.WithError(err).Print("Invalid regex: ", logsMatchRegex)
   145  		return
   146  	}
   147  	logger.Debugf("Changing logs parsing regex to: %s", logsMatchRegex)
   148  
   149  	if LogsGlobPath != "" {
   150  		logsGlobPath = LogsGlobPath
   151  	} else {
   152  		if logsGlobPath, err = tryDetermineLogFolder(ctx, mdb.Conn); err != nil {
   153  			logger.WithError(err).Print("Could not determine Postgres logs parsing folder in ", logsGlobPath)
   154  			return
   155  		}
   156  	}
   157  	logger.Debugf("Considering log files determined by glob pattern: %s", logsGlobPath)
   158  
   159  	if serverMessagesLang, err = tryDetermineLogMessagesLanguage(ctx, mdb.Conn); err != nil {
   160  		logger.WithError(err).Print("Could not determine language (lc_collate) used for server logs")
   161  		return
   162  	}
   163  
   164  	for { // re-try loop. re-start in case of FS errors or just to refresh host config
   165  		select {
   166  		case <-ctx.Done():
   167  			return
   168  		case <-time.After(currInterval):
   169  			if currInterval == 0 {
   170  				currInterval = time.Second * time.Duration(interval)
   171  			}
   172  		}
   173  
   174  		if latest == "" || firstRun {
   175  			globMatches, err := filepath.Glob(logsGlobPath)
   176  			if err != nil || len(globMatches) == 0 {
   177  				logger.Infof("No logfiles found to parse from glob '%s'", logsGlobPath)
   178  				continue
   179  			}
   180  
   181  			logger.Debugf("Found %v logfiles from glob pattern, picking the latest", len(globMatches))
   182  			if len(globMatches) > 1 {
   183  				// find latest timestamp
   184  				latest, _ = getFileWithLatestTimestamp(globMatches)
   185  				if latest == "" {
   186  					logger.Warningf("Could not determine the latest logfile")
   187  					continue
   188  				}
   189  
   190  			} else if len(globMatches) == 1 {
   191  				latest = globMatches[0]
   192  			}
   193  			logger.Infof("Starting to parse logfile: %s ", latest)
   194  		}
   195  
   196  		if latestHandle == nil {
   197  			latestHandle, err = os.Open(latest)
   198  			if err != nil {
   199  				logger.Warningf("Failed to open logfile %s: %s", latest, err)
   200  				continue
   201  			}
   202  			defer latestHandle.Close()
   203  			reader = bufio.NewReader(latestHandle)
   204  			if previous == latest && linesRead > 0 { // handle postmaster restarts
   205  				i := 1
   206  				for i <= linesRead {
   207  					_, err = reader.ReadString('\n')
   208  					if err == io.EOF && i < linesRead {
   209  						logger.Warningf("Failed to open logfile %s: %s", latest, err)
   210  						linesRead = 0
   211  						break
   212  					} else if err != nil {
   213  						logger.Warningf("Failed to skip %d logfile lines for %s, there might be duplicates reported. Error: %s", linesRead, latest, err)
   214  						linesRead = i
   215  						break
   216  					}
   217  					i++
   218  				}
   219  				logger.Debugf("Skipped %d already processed lines from %s", linesRead, latest)
   220  			} else if firstRun { // seek to end
   221  				_, _ = latestHandle.Seek(0, 2)
   222  				firstRun = false
   223  			}
   224  		}
   225  
   226  		for {
   227  			line, err := reader.ReadString('\n')
   228  			if err != nil && err != io.EOF {
   229  				logger.Warningf("Failed to read logfile %s: %s", latest, err)
   230  				_ = latestHandle.Close()
   231  				latestHandle = nil
   232  				break
   233  			}
   234  
   235  			if err == io.EOF {
   236  				// // EOF reached, wait for new files to be added
   237  				select {
   238  				case <-ctx.Done():
   239  					return
   240  				case <-time.After(currInterval):
   241  				}
   242  				// check for newly opened logfiles
   243  				file, _ := getFileWithNextModTimestamp(logsGlobPath, latest)
   244  				if file != "" {
   245  					previous = latest
   246  					latest = file
   247  					_ = latestHandle.Close()
   248  					latestHandle = nil
   249  					logger.Infof("Switching to new logfile: %s", file)
   250  					linesRead = 0
   251  					break
   252  				}
   253  			} else {
   254  				linesRead++
   255  			}
   256  
   257  			if err == nil && line != "" {
   258  				matches := csvlogRegex.FindStringSubmatch(line)
   259  				if len(matches) == 0 {
   260  					goto send_to_storage_if_needed
   261  				}
   262  				result := regexMatchesToMap(csvlogRegex, matches)
   263  				errorSeverity, ok := result["error_severity"]
   264  				if !ok {
   265  					logger.Error("error_severity group must be defined in parse regex:", csvlogRegex)
   266  					time.Sleep(time.Minute)
   267  					break
   268  				}
   269  				if serverMessagesLang != "en" {
   270  					errorSeverity = severityToEnglish(serverMessagesLang, errorSeverity)
   271  				}
   272  
   273  				databaseName, ok := result["database_name"]
   274  				if !ok {
   275  					logger.Error("database_name group must be defined in parse regex:", csvlogRegex)
   276  					time.Sleep(time.Minute)
   277  					break
   278  				}
   279  				if realDbname == databaseName {
   280  					eventCounts[errorSeverity]++
   281  				}
   282  				eventCountsTotal[errorSeverity]++
   283  			}
   284  
   285  		send_to_storage_if_needed:
   286  			if lastSendTime.IsZero() || lastSendTime.Before(time.Now().Add(-time.Second*time.Duration(interval))) {
   287  				logger.Debugf("Sending log event counts for last interval to storage channel. Local eventcounts: %+v, global eventcounts: %+v", eventCounts, eventCountsTotal)
   288  				select {
   289  				case <-ctx.Done():
   290  					return
   291  				case storeCh <- eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal, mdb):
   292  					zeroEventCounts(eventCounts)
   293  					zeroEventCounts(eventCountsTotal)
   294  					lastSendTime = time.Now()
   295  				}
   296  			}
   297  
   298  		} // file read loop
   299  	} // config loop
   300  
   301  }
   302  
   303  func severityToEnglish(serverLang, errorSeverity string) string {
   304  	//log.Debug("severityToEnglish", serverLang, errorSeverity)
   305  	if serverLang == "en" {
   306  		return errorSeverity
   307  	}
   308  	severityMap := PgSeveritiesLocale[serverLang]
   309  	severityEn, ok := severityMap[errorSeverity]
   310  	if !ok {
   311  		return errorSeverity
   312  	}
   313  	return severityEn
   314  }
   315  
   316  func zeroEventCounts(eventCounts map[string]int64) {
   317  	for _, severity := range PgSeverities {
   318  		eventCounts[severity] = 0
   319  	}
   320  }
   321  
   322  func tryDetermineLogFolder(ctx context.Context, conn db.PgxIface) (string, error) {
   323  	sql := `select current_setting('data_directory') as dd, current_setting('log_directory') as ld`
   324  	var dd, ld string
   325  	err := conn.QueryRow(ctx, sql).Scan(&dd, &ld)
   326  	if err != nil {
   327  		return "", err
   328  	}
   329  	if strings.HasPrefix(ld, "/") {
   330  		// we have a full path we can use
   331  		return path.Join(ld, CSVLogDefaultGlobSuffix), nil
   332  	}
   333  	return path.Join(dd, ld, CSVLogDefaultGlobSuffix), nil
   334  }
   335  
   336  func tryDetermineLogMessagesLanguage(ctx context.Context, conn db.PgxIface) (string, error) {
   337  	sql := `select current_setting('lc_messages')::varchar(2) as lc_messages;`
   338  	var lc string
   339  	err := conn.QueryRow(ctx, sql).Scan(&lc)
   340  	if err != nil {
   341  		return "", err
   342  	}
   343  	if _, ok := PgSeveritiesLocale[lc]; !ok {
   344  		return "en", nil
   345  	}
   346  	return lc, nil
   347  }
   348  
   349  func regexMatchesToMap(csvlogRegex *regexp.Regexp, matches []string) map[string]string {
   350  	result := make(map[string]string)
   351  	if len(matches) == 0 || csvlogRegex == nil {
   352  		return result
   353  	}
   354  	for i, name := range csvlogRegex.SubexpNames() {
   355  		if i != 0 && name != "" {
   356  			result[name] = matches[i]
   357  		}
   358  	}
   359  	return result
   360  }
   361