...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/reaper/logparser.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"os"
     8  	"path/filepath"
     9  	"regexp"
    10  	"strings"
    11  	"time"
    12  
    13  	"github.com/cybertec-postgresql/pgwatch/v5/internal/db"
    14  	"github.com/cybertec-postgresql/pgwatch/v5/internal/log"
    15  	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
    16  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
    17  	"github.com/jackc/pgx/v5"
    18  )
    19  
    20  // Constants and types
    21  var pgSeverities = [...]string{"DEBUG", "INFO", "NOTICE", "WARNING", "ERROR", "LOG", "FATAL", "PANIC"}
    22  var pgSeveritiesLocale = map[string]map[string]string{
    23  	"C.": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "WARNING": "WARNING", "ERROR": "ERROR", "FATAL": "FATAL", "PANIC": "PANIC"},
    24  	"de": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "HINWEIS": "NOTICE", "WARNUNG": "WARNING", "FEHLER": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
    25  	"fr": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "ATTENTION": "WARNING", "ERREUR": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
    26  	"it": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTIFICA": "NOTICE", "ATTENZIONE": "WARNING", "ERRORE": "ERROR", "FATALE": "FATAL", "PANICO": "PANIC"},
    27  	"ko": {"디버그": "DEBUG", "로그": "LOG", "정보": "INFO", "알림": "NOTICE", "경고": "WARNING", "오류": "ERROR", "치명적오류": "FATAL", "손상": "PANIC"},
    28  	"pl": {"DEBUG": "DEBUG", "DZIENNIK": "LOG", "INFORMACJA": "INFO", "UWAGA": "NOTICE", "OSTRZEŻENIE": "WARNING", "BŁĄD": "ERROR", "KATASTROFALNY": "FATAL", "PANIKA": "PANIC"},
    29  	"ru": {"ОТЛАДКА": "DEBUG", "СООБЩЕНИЕ": "LOG", "ИНФОРМАЦИЯ": "INFO", "ЗАМЕЧАНИЕ": "NOTICE", "ПРЕДУПРЕЖДЕНИЕ": "WARNING", "ОШИБКА": "ERROR", "ВАЖНО": "FATAL", "ПАНИКА": "PANIC"},
    30  	"sv": {"DEBUG": "DEBUG", "LOGG": "LOG", "INFO": "INFO", "NOTIS": "NOTICE", "VARNING": "WARNING", "FEL": "ERROR", "FATALT": "FATAL", "PANIK": "PANIC"},
    31  	"tr": {"DEBUG": "DEBUG", "LOG": "LOG", "BİLGİ": "INFO", "NOT": "NOTICE", "UYARI": "WARNING", "HATA": "ERROR", "ÖLÜMCÜL (FATAL)": "FATAL", "KRİTİK": "PANIC"},
    32  	"zh": {"调试": "DEBUG", "日志": "LOG", "信息": "INFO", "注意": "NOTICE", "警告": "WARNING", "错误": "ERROR", "致命错误": "FATAL", "比致命错误还过分的错误": "PANIC"},
    33  }
    34  
    35  const csvLogDefaultRegEx = `^^(?P<log_time>.*?),"?(?P<user_name>.*?)"?,"?(?P<database_name>.*?)"?,(?P<process_id>\d+),"?(?P<connection_from>.*?)"?,(?P<session_id>.*?),(?P<session_line_num>\d+),"?(?P<command_tag>.*?)"?,(?P<session_start_time>.*?),(?P<virtual_transaction_id>.*?),(?P<transaction_id>.*?),(?P<error_severity>\w+),`
    36  const csvLogDefaultGlobSuffix = "*.csv"
    37  
    38  const maxChunkSize uint64 = 10 * 1024 * 1024 // 10 MB
    39  const maxTrackedFiles = 2500
    40  
// LogParser counts PostgreSQL server log events per severity for a single
// monitored source and turns the counters into measurements.
//
// It embeds the *LogConfig detected from the server, so fields such as
// Directory and ServerMessagesLang are reachable directly on the parser.
//
// Fields set by NewLogParser:
//   - ctx: the construction context, re-wrapped with a logger tagged with the
//     source name and the metric name.
//   - LogsMatchRegex: the compiled csvLogDefaultRegEx.
//   - SourceConn: the monitored source the parser belongs to.
//   - Interval: the metric interval of the source for this metric; compared
//     against lastSendTime by HasSendIntervalElapsed.
//   - StoreCh: send-only channel of measurement envelopes.
//   - eventCounts, eventCountsTotal, fileOffsets: initialised as empty maps.
//
// lastSendTime is left at its zero value by NewLogParser;
// HasSendIntervalElapsed reports true while it is zero.
type LogParser struct {
	*LogConfig
	ctx              context.Context
	LogsMatchRegex   *regexp.Regexp
	SourceConn       *sources.SourceConn
	Interval         time.Duration
	StoreCh          chan<- metrics.MeasurementEnvelope
	eventCounts      map[string]int64 // for the specific DB. [WARNING: 34, ERROR: 10, ...], zeroed on storage send
	eventCountsTotal map[string]int64 // for the whole instance
	lastSendTime     time.Time
	fileOffsets      map[string]uint64 // map of log file paths to last read offsets
}
    53  
// LogConfig holds the server logging settings read by tryDetermineLogSettings.
//
// The struct is populated positionally (pgx.RowToAddrOfStructByPos), so the
// field order must stay in sync with the column order of that query:
//
//	CollectorEnabled   <- logging_collector = 'on'
//	CSVDestination     <- log_destination contains 'csvlog'
//	TruncateOnRotation <- log_truncate_on_rotation = 'on'
//	Directory          <- log_directory, prefixed with data_directory when it
//	                      does not look like an absolute path
//	ServerMessagesLang <- first two characters of lc_messages, replaced by
//	                      "en" when no translation table exists for it
type LogConfig struct {
	CollectorEnabled   bool
	CSVDestination     bool
	TruncateOnRotation bool
	Directory          string
	ServerMessagesLang string
}
    61  
    62  func NewLogParser(ctx context.Context, mdb *sources.SourceConn, storeCh chan<- metrics.MeasurementEnvelope) (lp *LogParser, err error) {
    63  
    64  	logger := log.GetLogger(ctx).WithField("source", mdb.Name).WithField("metric", specialMetricServerLogEventCounts)
    65  	ctx = log.WithLogger(ctx, logger)
    66  
    67  	logsRegex := regexp.MustCompile(csvLogDefaultRegEx)
    68  
    69  	logger.Debugf("Using %s as log parsing regex", logsRegex)
    70  
    71  	var cfg *LogConfig
    72  	if cfg, err = tryDetermineLogSettings(ctx, mdb.Conn); err != nil {
    73  		return nil, fmt.Errorf("could not determine Postgres logs settings: %w", err)
    74  	}
    75  
    76  	if !cfg.CollectorEnabled {
    77  		return nil, errors.New("logging_collector is not enabled on the db server")
    78  	}
    79  
    80  	if !cfg.CSVDestination {
    81  		return nil, errors.New("log_destination must contain 'csvlog' for log parsing to work")
    82  	}
    83  
    84  	logger.Debugf("Considering log files in folder: %s", cfg.Directory)
    85  
    86  	return &LogParser{
    87  		ctx:              ctx,
    88  		LogsMatchRegex:   logsRegex,
    89  		SourceConn:       mdb,
    90  		Interval:         mdb.GetMetricInterval(specialMetricServerLogEventCounts),
    91  		StoreCh:          storeCh,
    92  		LogConfig:        cfg,
    93  		eventCounts:      make(map[string]int64),
    94  		eventCountsTotal: make(map[string]int64),
    95  		fileOffsets:      make(map[string]uint64),
    96  	}, nil
    97  }
    98  
    99  func (lp *LogParser) HasSendIntervalElapsed() bool {
   100  	return lp.lastSendTime.IsZero() || lp.lastSendTime.Before(time.Now().Add(-lp.Interval))
   101  }
   102  
   103  func (lp *LogParser) ParseLogs() error {
   104  	l := log.GetLogger(lp.ctx)
   105  	if ok, err := db.IsClientOnSameHost(lp.SourceConn.Conn); ok && err == nil {
   106  		l.Info("DB is on the same host, parsing logs locally")
   107  		if err = checkHasLocalPrivileges(lp.Directory); err == nil {
   108  			return lp.parseLogsLocal()
   109  		}
   110  		l.WithError(err).Error("Couldn't parse logs locally, lacking required privileges")
   111  	}
   112  
   113  	l.Info("DB is not detected to be on the same host, parsing logs remotely")
   114  	if err := checkHasRemotePrivileges(lp.ctx, lp.SourceConn, lp.Directory); err != nil {
   115  		l.WithError(err).Error("couldn't parse logs remotely, lacking required privileges")
   116  		return err
   117  	}
   118  	return lp.parseLogsRemote()
   119  }
   120  
   121  func tryDetermineLogSettings(ctx context.Context, conn db.PgxIface) (cfg *LogConfig, err error) {
   122  	sql := `select 
   123  	current_setting('logging_collector') = 'on' as is_enabled,
   124  	strpos(current_setting('log_destination'), 'csvlog') > 0 as csvlog_dest,
   125  	current_setting('log_truncate_on_rotation') = 'on' as log_trunc,
   126  	case 
   127  		when current_setting('log_directory') ~ '^(\w:)?\/.+' then current_setting('log_directory') 
   128  		else current_setting('data_directory') || '/' || current_setting('log_directory') 
   129  	end as log_dir,
   130  	current_setting('lc_messages')::varchar(2) as lc_messages`
   131  	var res pgx.Rows
   132  	if res, err = conn.Query(ctx, sql); err == nil {
   133  		if cfg, err = pgx.CollectOneRow(res, pgx.RowToAddrOfStructByPos[LogConfig]); err == nil {
   134  			if _, ok := pgSeveritiesLocale[cfg.ServerMessagesLang]; !ok {
   135  				cfg.ServerMessagesLang = "en"
   136  			}
   137  			return cfg, nil
   138  		}
   139  	}
   140  	return nil, err
   141  }
   142  
   143  func checkHasRemotePrivileges(ctx context.Context, mdb *sources.SourceConn, logsDirPath string) error {
   144  	var logFile string
   145  	err := mdb.Conn.QueryRow(ctx, "select name from pg_ls_logdir() limit 1").Scan(&logFile)
   146  	if err != nil && err != pgx.ErrNoRows {
   147  		return err
   148  	}
   149  
   150  	var dummy string
   151  	err = mdb.Conn.QueryRow(ctx, "select pg_read_file($1, 0, 0)", filepath.Join(logsDirPath, logFile)).Scan(&dummy)
   152  	return err
   153  }
   154  
   155  func checkHasLocalPrivileges(logsDirPath string) error {
   156  	_, err := os.ReadDir(logsDirPath)
   157  	if err != nil {
   158  		return err
   159  	}
   160  	return nil
   161  }
   162  
   163  func severityToEnglish(serverLang, errorSeverity string) string {
   164  	if serverLang == "en" {
   165  		return errorSeverity
   166  	}
   167  	severityMap := pgSeveritiesLocale[serverLang]
   168  	severityEn, ok := severityMap[errorSeverity]
   169  	if !ok {
   170  		return errorSeverity
   171  	}
   172  	return severityEn
   173  }
   174  
   175  func (lp *LogParser) regexMatchesToMap(matches []string) map[string]string {
   176  	result := make(map[string]string)
   177  	if len(matches) == 0 || lp.LogsMatchRegex == nil {
   178  		return result
   179  	}
   180  	for i, name := range lp.LogsMatchRegex.SubexpNames() {
   181  		if i != 0 && name != "" {
   182  			result[name] = matches[i]
   183  		}
   184  	}
   185  	return result
   186  }
   187  
   188  // GetMeasurementEnvelope converts current event counts to a MeasurementEnvelope
   189  func (lp *LogParser) GetMeasurementEnvelope() metrics.MeasurementEnvelope {
   190  	allSeverityCounts := metrics.NewMeasurement(time.Now().UnixNano())
   191  	for _, s := range pgSeverities {
   192  		parsedCount, ok := lp.eventCounts[s]
   193  		if ok {
   194  			allSeverityCounts[strings.ToLower(s)] = parsedCount
   195  		} else {
   196  			allSeverityCounts[strings.ToLower(s)] = int64(0)
   197  		}
   198  		parsedCount, ok = lp.eventCountsTotal[s]
   199  		if ok {
   200  			allSeverityCounts[strings.ToLower(s)+"_total"] = parsedCount
   201  		} else {
   202  			allSeverityCounts[strings.ToLower(s)+"_total"] = int64(0)
   203  		}
   204  	}
   205  	return metrics.MeasurementEnvelope{
   206  		DBName:     lp.SourceConn.Name,
   207  		MetricName: specialMetricServerLogEventCounts,
   208  		Data:       metrics.Measurements{allSeverityCounts},
   209  		CustomTags: lp.SourceConn.CustomTags,
   210  	}
   211  }
   212  
   213  func zeroEventCounts(eventCounts map[string]int64) {
   214  	for _, severity := range pgSeverities {
   215  		eventCounts[severity] = 0
   216  	}
   217  }
   218