1 package reaper
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "os"
8 "path/filepath"
9 "regexp"
10 "strings"
11 "time"
12
13 "github.com/cybertec-postgresql/pgwatch/v5/internal/db"
14 "github.com/cybertec-postgresql/pgwatch/v5/internal/log"
15 "github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
16 "github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
17 "github.com/jackc/pgx/v5"
18 )
19
20
// pgSeverities lists every Postgres message severity in its English
// spelling; these strings are the canonical keys of the event-count maps.
var pgSeverities = [...]string{"DEBUG", "INFO", "NOTICE", "WARNING", "ERROR", "LOG", "FATAL", "PANIC"}

// pgSeveritiesLocale maps localized severity words back to their English
// equivalents. The outer key is the first two characters of the server's
// lc_messages setting (see the varchar(2) cast in tryDetermineLogSettings);
// "C." covers C.UTF-8-style locales. Languages not listed here fall back
// to "en", i.e. severities are passed through unchanged.
var pgSeveritiesLocale = map[string]map[string]string{
	"C.": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "WARNING": "WARNING", "ERROR": "ERROR", "FATAL": "FATAL", "PANIC": "PANIC"},
	"de": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "HINWEIS": "NOTICE", "WARNUNG": "WARNING", "FEHLER": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
	"fr": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "ATTENTION": "WARNING", "ERREUR": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
	"it": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTIFICA": "NOTICE", "ATTENZIONE": "WARNING", "ERRORE": "ERROR", "FATALE": "FATAL", "PANICO": "PANIC"},
	"ko": {"디버그": "DEBUG", "로그": "LOG", "정보": "INFO", "알림": "NOTICE", "경고": "WARNING", "오류": "ERROR", "치명적오류": "FATAL", "손상": "PANIC"},
	"pl": {"DEBUG": "DEBUG", "DZIENNIK": "LOG", "INFORMACJA": "INFO", "UWAGA": "NOTICE", "OSTRZEŻENIE": "WARNING", "BŁĄD": "ERROR", "KATASTROFALNY": "FATAL", "PANIKA": "PANIC"},
	"ru": {"ОТЛАДКА": "DEBUG", "СООБЩЕНИЕ": "LOG", "ИНФОРМАЦИЯ": "INFO", "ЗАМЕЧАНИЕ": "NOTICE", "ПРЕДУПРЕЖДЕНИЕ": "WARNING", "ОШИБКА": "ERROR", "ВАЖНО": "FATAL", "ПАНИКА": "PANIC"},
	"sv": {"DEBUG": "DEBUG", "LOGG": "LOG", "INFO": "INFO", "NOTIS": "NOTICE", "VARNING": "WARNING", "FEL": "ERROR", "FATALT": "FATAL", "PANIK": "PANIC"},
	"tr": {"DEBUG": "DEBUG", "LOG": "LOG", "BİLGİ": "INFO", "NOT": "NOTICE", "UYARI": "WARNING", "HATA": "ERROR", "ÖLÜMCÜL (FATAL)": "FATAL", "KRİTİK": "PANIC"},
	"zh": {"调试": "DEBUG", "日志": "LOG", "信息": "INFO", "注意": "NOTICE", "警告": "WARNING", "错误": "ERROR", "致命错误": "FATAL", "比致命错误还过分的错误": "PANIC"},
}
34
// csvLogDefaultRegEx matches the fixed leading columns of a Postgres CSV
// log line up to and including error_severity; the named capture groups
// become field names via regexMatchesToMap.
// NOTE(review): the leading "^^" is redundant (both carets anchor at the
// start of input) but harmless.
const csvLogDefaultRegEx = `^^(?P<log_time>.*?),"?(?P<user_name>.*?)"?,"?(?P<database_name>.*?)"?,(?P<process_id>\d+),"?(?P<connection_from>.*?)"?,(?P<session_id>.*?),(?P<session_line_num>\d+),"?(?P<command_tag>.*?)"?,(?P<session_start_time>.*?),(?P<virtual_transaction_id>.*?),(?P<transaction_id>.*?),(?P<error_severity>\w+),`

// csvLogDefaultGlobSuffix selects the CSV log files inside log_directory.
const csvLogDefaultGlobSuffix = "*.csv"

// maxChunkSize presumably caps the number of log bytes read in a single
// pass — used by parser code outside this chunk; confirm there.
const maxChunkSize uint64 = 10 * 1024 * 1024

// maxTrackedFiles presumably bounds the per-file offset bookkeeping
// (fileOffsets) — used by parser code outside this chunk; confirm there.
const maxTrackedFiles = 2500
40
// LogParser follows the CSV log files of a single monitored source and
// periodically ships per-severity event counts to the measurement store
// channel.
type LogParser struct {
	*LogConfig                                          // server-side logging settings detected at construction
	ctx              context.Context                    // carries the source/metric-scoped logger
	LogsMatchRegex   *regexp.Regexp                     // compiled csvLogDefaultRegEx with named capture groups
	SourceConn       *sources.SourceConn                // the monitored database this parser belongs to
	Interval         time.Duration                      // send interval for the log-event-counts metric
	StoreCh          chan<- metrics.MeasurementEnvelope // sink for finished measurement envelopes
	eventCounts      map[string]int64                   // per-severity counts (English keys); reset via zeroEventCounts — presumably per send interval
	eventCountsTotal map[string]int64                   // per-severity counts (English keys); reported with a "_total" suffix, so presumably cumulative
	lastSendTime     time.Time                          // zero until the first envelope has been sent
	fileOffsets      map[string]uint64                  // read offset per log file path
}
53
// LogConfig holds the server-side logging settings relevant to log parsing.
//
// IMPORTANT: instances are populated positionally via
// pgx.RowToAddrOfStructByPos in tryDetermineLogSettings, so the field
// order below must stay in sync with the column order of that query.
type LogConfig struct {
	CollectorEnabled   bool   // logging_collector = 'on'
	CSVDestination     bool   // log_destination contains 'csvlog'
	TruncateOnRotation bool   // log_truncate_on_rotation = 'on'
	Directory          string // absolute path of log_directory (resolved against data_directory if relative)
	ServerMessagesLang string // first two chars of lc_messages, or "en" if unknown
}
61
62 func NewLogParser(ctx context.Context, mdb *sources.SourceConn, storeCh chan<- metrics.MeasurementEnvelope) (lp *LogParser, err error) {
63
64 logger := log.GetLogger(ctx).WithField("source", mdb.Name).WithField("metric", specialMetricServerLogEventCounts)
65 ctx = log.WithLogger(ctx, logger)
66
67 logsRegex := regexp.MustCompile(csvLogDefaultRegEx)
68
69 logger.Debugf("Using %s as log parsing regex", logsRegex)
70
71 var cfg *LogConfig
72 if cfg, err = tryDetermineLogSettings(ctx, mdb.Conn); err != nil {
73 return nil, fmt.Errorf("could not determine Postgres logs settings: %w", err)
74 }
75
76 if !cfg.CollectorEnabled {
77 return nil, errors.New("logging_collector is not enabled on the db server")
78 }
79
80 if !cfg.CSVDestination {
81 return nil, errors.New("log_destination must contain 'csvlog' for log parsing to work")
82 }
83
84 logger.Debugf("Considering log files in folder: %s", cfg.Directory)
85
86 return &LogParser{
87 ctx: ctx,
88 LogsMatchRegex: logsRegex,
89 SourceConn: mdb,
90 Interval: mdb.GetMetricInterval(specialMetricServerLogEventCounts),
91 StoreCh: storeCh,
92 LogConfig: cfg,
93 eventCounts: make(map[string]int64),
94 eventCountsTotal: make(map[string]int64),
95 fileOffsets: make(map[string]uint64),
96 }, nil
97 }
98
99 func (lp *LogParser) HasSendIntervalElapsed() bool {
100 return lp.lastSendTime.IsZero() || lp.lastSendTime.Before(time.Now().Add(-lp.Interval))
101 }
102
103 func (lp *LogParser) ParseLogs() error {
104 l := log.GetLogger(lp.ctx)
105 if ok, err := db.IsClientOnSameHost(lp.SourceConn.Conn); ok && err == nil {
106 l.Info("DB is on the same host, parsing logs locally")
107 if err = checkHasLocalPrivileges(lp.Directory); err == nil {
108 return lp.parseLogsLocal()
109 }
110 l.WithError(err).Error("Couldn't parse logs locally, lacking required privileges")
111 }
112
113 l.Info("DB is not detected to be on the same host, parsing logs remotely")
114 if err := checkHasRemotePrivileges(lp.ctx, lp.SourceConn, lp.Directory); err != nil {
115 l.WithError(err).Error("couldn't parse logs remotely, lacking required privileges")
116 return err
117 }
118 return lp.parseLogsRemote()
119 }
120
// tryDetermineLogSettings reads the server-side logging configuration
// needed for log parsing in a single query: collector on/off, csvlog
// destination, truncate-on-rotation, the absolute log directory
// (resolved against data_directory when relative), and a two-character
// message-language code.
//
// NOTE: the row is scanned positionally via RowToAddrOfStructByPos, so
// the column order of the query must stay in sync with the field order
// of the LogConfig struct.
func tryDetermineLogSettings(ctx context.Context, conn db.PgxIface) (cfg *LogConfig, err error) {
	sql := `select
	current_setting('logging_collector') = 'on' as is_enabled,
	strpos(current_setting('log_destination'), 'csvlog') > 0 as csvlog_dest,
	current_setting('log_truncate_on_rotation') = 'on' as log_trunc,
	case
		when current_setting('log_directory') ~ '^(\w:)?\/.+' then current_setting('log_directory')
		else current_setting('data_directory') || '/' || current_setting('log_directory')
	end as log_dir,
	current_setting('lc_messages')::varchar(2) as lc_messages`
	var res pgx.Rows
	if res, err = conn.Query(ctx, sql); err == nil {
		if cfg, err = pgx.CollectOneRow(res, pgx.RowToAddrOfStructByPos[LogConfig]); err == nil {
			// Unknown locales fall back to "en": severityToEnglish then
			// passes severities through unchanged.
			if _, ok := pgSeveritiesLocale[cfg.ServerMessagesLang]; !ok {
				cfg.ServerMessagesLang = "en"
			}
			return cfg, nil
		}
	}
	return nil, err
}
142
143 func checkHasRemotePrivileges(ctx context.Context, mdb *sources.SourceConn, logsDirPath string) error {
144 var logFile string
145 err := mdb.Conn.QueryRow(ctx, "select name from pg_ls_logdir() limit 1").Scan(&logFile)
146 if err != nil && err != pgx.ErrNoRows {
147 return err
148 }
149
150 var dummy string
151 err = mdb.Conn.QueryRow(ctx, "select pg_read_file($1, 0, 0)", filepath.Join(logsDirPath, logFile)).Scan(&dummy)
152 return err
153 }
154
// checkHasLocalPrivileges probes whether the collector process can read
// the given log directory; listing it is the cheapest check that covers
// both existence and read permission.
func checkHasLocalPrivileges(logsDirPath string) error {
	_, err := os.ReadDir(logsDirPath)
	return err
}
162
163 func severityToEnglish(serverLang, errorSeverity string) string {
164 if serverLang == "en" {
165 return errorSeverity
166 }
167 severityMap := pgSeveritiesLocale[serverLang]
168 severityEn, ok := severityMap[errorSeverity]
169 if !ok {
170 return errorSeverity
171 }
172 return severityEn
173 }
174
175 func (lp *LogParser) regexMatchesToMap(matches []string) map[string]string {
176 result := make(map[string]string)
177 if len(matches) == 0 || lp.LogsMatchRegex == nil {
178 return result
179 }
180 for i, name := range lp.LogsMatchRegex.SubexpNames() {
181 if i != 0 && name != "" {
182 result[name] = matches[i]
183 }
184 }
185 return result
186 }
187
188
189 func (lp *LogParser) GetMeasurementEnvelope() metrics.MeasurementEnvelope {
190 allSeverityCounts := metrics.NewMeasurement(time.Now().UnixNano())
191 for _, s := range pgSeverities {
192 parsedCount, ok := lp.eventCounts[s]
193 if ok {
194 allSeverityCounts[strings.ToLower(s)] = parsedCount
195 } else {
196 allSeverityCounts[strings.ToLower(s)] = int64(0)
197 }
198 parsedCount, ok = lp.eventCountsTotal[s]
199 if ok {
200 allSeverityCounts[strings.ToLower(s)+"_total"] = parsedCount
201 } else {
202 allSeverityCounts[strings.ToLower(s)+"_total"] = int64(0)
203 }
204 }
205 return metrics.MeasurementEnvelope{
206 DBName: lp.SourceConn.Name,
207 MetricName: specialMetricServerLogEventCounts,
208 Data: metrics.Measurements{allSeverityCounts},
209 CustomTags: lp.SourceConn.CustomTags,
210 }
211 }
212
213 func zeroEventCounts(eventCounts map[string]int64) {
214 for _, severity := range pgSeverities {
215 eventCounts[severity] = 0
216 }
217 }
218