...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/reaper/logparser.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"context"
     5  	"os"
     6  	"path"
     7  	"path/filepath"
     8  	"regexp"
     9  	"strings"
    10  	"time"
    11  
    12  	"github.com/cybertec-postgresql/pgwatch/v5/internal/db"
    13  	"github.com/cybertec-postgresql/pgwatch/v5/internal/log"
    14  	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
    15  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
    16  	"github.com/jackc/pgx/v5"
    17  )
    18  
    19  // Constants and types
    20  var pgSeverities = [...]string{"DEBUG", "INFO", "NOTICE", "WARNING", "ERROR", "LOG", "FATAL", "PANIC"}
    21  var pgSeveritiesLocale = map[string]map[string]string{
    22  	"C.": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "WARNING": "WARNING", "ERROR": "ERROR", "FATAL": "FATAL", "PANIC": "PANIC"},
    23  	"de": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "HINWEIS": "NOTICE", "WARNUNG": "WARNING", "FEHLER": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
    24  	"fr": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "ATTENTION": "WARNING", "ERREUR": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
    25  	"it": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTIFICA": "NOTICE", "ATTENZIONE": "WARNING", "ERRORE": "ERROR", "FATALE": "FATAL", "PANICO": "PANIC"},
    26  	"ko": {"디버그": "DEBUG", "로그": "LOG", "정보": "INFO", "알림": "NOTICE", "경고": "WARNING", "오류": "ERROR", "치명적오류": "FATAL", "손상": "PANIC"},
    27  	"pl": {"DEBUG": "DEBUG", "DZIENNIK": "LOG", "INFORMACJA": "INFO", "UWAGA": "NOTICE", "OSTRZEŻENIE": "WARNING", "BŁĄD": "ERROR", "KATASTROFALNY": "FATAL", "PANIKA": "PANIC"},
    28  	"ru": {"ОТЛАДКА": "DEBUG", "СООБЩЕНИЕ": "LOG", "ИНФОРМАЦИЯ": "INFO", "ЗАМЕЧАНИЕ": "NOTICE", "ПРЕДУПРЕЖДЕНИЕ": "WARNING", "ОШИБКА": "ERROR", "ВАЖНО": "FATAL", "ПАНИКА": "PANIC"},
    29  	"sv": {"DEBUG": "DEBUG", "LOGG": "LOG", "INFO": "INFO", "NOTIS": "NOTICE", "VARNING": "WARNING", "FEL": "ERROR", "FATALT": "FATAL", "PANIK": "PANIC"},
    30  	"tr": {"DEBUG": "DEBUG", "LOG": "LOG", "BİLGİ": "INFO", "NOT": "NOTICE", "UYARI": "WARNING", "HATA": "ERROR", "ÖLÜMCÜL (FATAL)": "FATAL", "KRİTİK": "PANIC"},
    31  	"zh": {"调试": "DEBUG", "日志": "LOG", "信息": "INFO", "注意": "NOTICE", "警告": "WARNING", "错误": "ERROR", "致命错误": "FATAL", "比致命错误还过分的错误": "PANIC"},
    32  }
    33  
    34  const csvLogDefaultRegEx = `^^(?P<log_time>.*?),"?(?P<user_name>.*?)"?,"?(?P<database_name>.*?)"?,(?P<process_id>\d+),"?(?P<connection_from>.*?)"?,(?P<session_id>.*?),(?P<session_line_num>\d+),"?(?P<command_tag>.*?)"?,(?P<session_start_time>.*?),(?P<virtual_transaction_id>.*?),(?P<transaction_id>.*?),(?P<error_severity>\w+),`
    35  const csvLogDefaultGlobSuffix = "*.csv"
    36  
    37  const maxChunkSize uint64 = 10 * 1024 * 1024 // 10 MB
    38  const maxTrackedFiles = 2500
    39  
    40  type LogParser struct {
    41  	ctx                context.Context
    42  	LogsMatchRegex     *regexp.Regexp
    43  	LogFolder          string
    44  	ServerMessagesLang string
    45  	LogTruncOnRotation string
    46  	SourceConn         *sources.SourceConn
    47  	Interval           float64
    48  	StoreCh            chan<- metrics.MeasurementEnvelope
    49  	eventCounts        map[string]int64 // for the specific DB. [WARNING: 34, ERROR: 10, ...], zeroed on storage send
    50  	eventCountsTotal   map[string]int64 // for the whole instance
    51  	lastSendTime       time.Time
    52  	fileOffsets        map[string]uint64 // map of log file paths to last read offsets
    53  }
    54  
    55  func NewLogParser(ctx context.Context, mdb *sources.SourceConn, storeCh chan<- metrics.MeasurementEnvelope) (*LogParser, error) {
    56  
    57  	logger := log.GetLogger(ctx).WithField("source", mdb.Name).WithField("metric", specialMetricServerLogEventCounts)
    58  	ctx = log.WithLogger(ctx, logger)
    59  
    60  	logsRegex, err := regexp.Compile(csvLogDefaultRegEx)
    61  	if err != nil {
    62  		logger.WithError(err).Error("Invalid log parsing regex")
    63  		return nil, err
    64  	}
    65  	logger.Debugf("Using %s as log parsing regex", logsRegex)
    66  
    67  	var logFolder, serverMessagesLang, logTrunc string
    68  	if logFolder, serverMessagesLang, logTrunc, err = tryDetermineLogSettings(ctx, mdb.Conn); err != nil {
    69  		logger.WithError(err).Error("Could not determine Postgres logs settings")
    70  		return nil, err
    71  	}
    72  	logger.Debugf("Considering log files in folder: %s", logFolder)
    73  
    74  	return &LogParser{
    75  		ctx:                ctx,
    76  		LogsMatchRegex:     logsRegex,
    77  		LogFolder:          logFolder,
    78  		ServerMessagesLang: serverMessagesLang,
    79  		LogTruncOnRotation: logTrunc,
    80  		SourceConn:         mdb,
    81  		Interval:           mdb.GetMetricInterval(specialMetricServerLogEventCounts),
    82  		StoreCh:            storeCh,
    83  		eventCounts:        make(map[string]int64),
    84  		eventCountsTotal:   make(map[string]int64),
    85  		fileOffsets:        make(map[string]uint64),
    86  	}, nil
    87  }
    88  
    89  func (lp *LogParser) HasSendIntervalElapsed() bool {
    90  	return lp.lastSendTime.IsZero() || lp.lastSendTime.Before(time.Now().Add(-time.Second*time.Duration(lp.Interval)))
    91  }
    92  
    93  func (lp *LogParser) ParseLogs() error {
    94  	l := log.GetLogger(lp.ctx)
    95  	if ok, err := db.IsClientOnSameHost(lp.SourceConn.Conn); ok && err == nil {
    96  		l.Info("DB is on the same host. parsing logs locally")
    97  		if err = checkHasLocalPrivileges(lp.LogFolder); err == nil {
    98  			return lp.parseLogsLocal()
    99  		}
   100  		l.WithError(err).Error("Could't parse logs locally. lacking required privileges")
   101  	}
   102  
   103  	l.Info("DB is not detected to be on the same host. parsing logs remotely")
   104  	if err := checkHasRemotePrivileges(lp.ctx, lp.SourceConn, lp.LogFolder); err != nil {
   105  		l.WithError(err).Error("Could't parse logs remotely. lacking required privileges")
   106  		return err
   107  	}
   108  	return lp.parseLogsRemote()
   109  }
   110  
   111  func tryDetermineLogSettings(ctx context.Context, conn db.PgxIface) (string, string, string, error) {
   112  	sql := `select 
   113  			current_setting('data_directory') as dd, 
   114  			current_setting('log_directory') as ld,
   115  			current_setting('lc_messages')::varchar(2) as lc_messages,
   116  			current_setting('log_truncate_on_rotation') as log_trunc`
   117  	var dd, ld, lc, logTrunc string
   118  	err := conn.QueryRow(ctx, sql).Scan(&dd, &ld, &lc, &logTrunc)
   119  	if err != nil {
   120  		return "", "", "", err
   121  	}
   122  	if !strings.HasPrefix(ld, "/") {
   123  		ld = path.Join(dd, ld)
   124  	}
   125  	if _, ok := pgSeveritiesLocale[lc]; !ok {
   126  		lc = "en"
   127  	}
   128  	return ld, lc, logTrunc, nil
   129  }
   130  
   131  func checkHasRemotePrivileges(ctx context.Context, mdb *sources.SourceConn, logsDirPath string) error {
   132  	var logFile string
   133  	err := mdb.Conn.QueryRow(ctx, "select name from pg_ls_logdir() limit 1").Scan(&logFile)
   134  	if err != nil && err != pgx.ErrNoRows {
   135  		return err
   136  	}
   137  
   138  	var dummy string
   139  	err = mdb.Conn.QueryRow(ctx, "select pg_read_file($1, 0, 0)", filepath.Join(logsDirPath, logFile)).Scan(&dummy)
   140  	return err
   141  }
   142  
   143  func checkHasLocalPrivileges(logsDirPath string) error {
   144  	_, err := os.ReadDir(logsDirPath)
   145  	if err != nil {
   146  		return err
   147  	}
   148  	return nil
   149  }
   150  
   151  func severityToEnglish(serverLang, errorSeverity string) string {
   152  	if serverLang == "en" {
   153  		return errorSeverity
   154  	}
   155  	severityMap := pgSeveritiesLocale[serverLang]
   156  	severityEn, ok := severityMap[errorSeverity]
   157  	if !ok {
   158  		return errorSeverity
   159  	}
   160  	return severityEn
   161  }
   162  
   163  func (lp *LogParser) regexMatchesToMap(matches []string) map[string]string {
   164  	result := make(map[string]string)
   165  	if len(matches) == 0 || lp.LogsMatchRegex == nil {
   166  		return result
   167  	}
   168  	for i, name := range lp.LogsMatchRegex.SubexpNames() {
   169  		if i != 0 && name != "" {
   170  			result[name] = matches[i]
   171  		}
   172  	}
   173  	return result
   174  }
   175  
   176  // GetMeasurementEnvelope converts current event counts to a MeasurementEnvelope
   177  func (lp *LogParser) GetMeasurementEnvelope() metrics.MeasurementEnvelope {
   178  	allSeverityCounts := metrics.NewMeasurement(time.Now().UnixNano())
   179  	for _, s := range pgSeverities {
   180  		parsedCount, ok := lp.eventCounts[s]
   181  		if ok {
   182  			allSeverityCounts[strings.ToLower(s)] = parsedCount
   183  		} else {
   184  			allSeverityCounts[strings.ToLower(s)] = int64(0)
   185  		}
   186  		parsedCount, ok = lp.eventCountsTotal[s]
   187  		if ok {
   188  			allSeverityCounts[strings.ToLower(s)+"_total"] = parsedCount
   189  		} else {
   190  			allSeverityCounts[strings.ToLower(s)+"_total"] = int64(0)
   191  		}
   192  	}
   193  	return metrics.MeasurementEnvelope{
   194  		DBName:     lp.SourceConn.Name,
   195  		MetricName: specialMetricServerLogEventCounts,
   196  		Data:       metrics.Measurements{allSeverityCounts},
   197  		CustomTags: lp.SourceConn.CustomTags,
   198  	}
   199  }
   200  
   201  func zeroEventCounts(eventCounts map[string]int64) {
   202  	for _, severity := range pgSeverities {
   203  		eventCounts[severity] = 0
   204  	}
   205  }
   206