1 package reaper
2
3 import (
4 "context"
5 "os"
6 "path"
7 "path/filepath"
8 "regexp"
9 "strings"
10 "time"
11
12 "github.com/cybertec-postgresql/pgwatch/v5/internal/db"
13 "github.com/cybertec-postgresql/pgwatch/v5/internal/log"
14 "github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
15 "github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
16 "github.com/jackc/pgx/v5"
17 )
18
19
var (
	// pgSeverities lists the English severity names for which event counts
	// are reported and reset.
	pgSeverities = [...]string{"DEBUG", "INFO", "NOTICE", "WARNING", "ERROR", "LOG", "FATAL", "PANIC"}

	// pgSeveritiesLocale translates localized severity labels back to their
	// English names. The outer key is the first two characters of the server's
	// lc_messages setting; the inner map goes from the localized label to the
	// English severity.
	pgSeveritiesLocale = map[string]map[string]string{
		"C.": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "WARNING": "WARNING", "ERROR": "ERROR", "FATAL": "FATAL", "PANIC": "PANIC"},
		"de": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "HINWEIS": "NOTICE", "WARNUNG": "WARNING", "FEHLER": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
		"fr": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "ATTENTION": "WARNING", "ERREUR": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
		"it": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTIFICA": "NOTICE", "ATTENZIONE": "WARNING", "ERRORE": "ERROR", "FATALE": "FATAL", "PANICO": "PANIC"},
		"ko": {"디버그": "DEBUG", "로그": "LOG", "정보": "INFO", "알림": "NOTICE", "경고": "WARNING", "오류": "ERROR", "치명적오류": "FATAL", "손상": "PANIC"},
		"pl": {"DEBUG": "DEBUG", "DZIENNIK": "LOG", "INFORMACJA": "INFO", "UWAGA": "NOTICE", "OSTRZEŻENIE": "WARNING", "BŁĄD": "ERROR", "KATASTROFALNY": "FATAL", "PANIKA": "PANIC"},
		"ru": {"ОТЛАДКА": "DEBUG", "СООБЩЕНИЕ": "LOG", "ИНФОРМАЦИЯ": "INFO", "ЗАМЕЧАНИЕ": "NOTICE", "ПРЕДУПРЕЖДЕНИЕ": "WARNING", "ОШИБКА": "ERROR", "ВАЖНО": "FATAL", "ПАНИКА": "PANIC"},
		"sv": {"DEBUG": "DEBUG", "LOGG": "LOG", "INFO": "INFO", "NOTIS": "NOTICE", "VARNING": "WARNING", "FEL": "ERROR", "FATALT": "FATAL", "PANIK": "PANIC"},
		"tr": {"DEBUG": "DEBUG", "LOG": "LOG", "BİLGİ": "INFO", "NOT": "NOTICE", "UYARI": "WARNING", "HATA": "ERROR", "ÖLÜMCÜL (FATAL)": "FATAL", "KRİTİK": "PANIC"},
		"zh": {"调试": "DEBUG", "日志": "LOG", "信息": "INFO", "注意": "NOTICE", "警告": "WARNING", "错误": "ERROR", "致命错误": "FATAL", "比致命错误还过分的错误": "PANIC"},
	}
)
33
const (
	// csvLogDefaultRegEx is the pattern compiled by NewLogParser; it names the
	// leading comma-separated fields of a log line up to and including
	// error_severity.
	csvLogDefaultRegEx = `^^(?P<log_time>.*?),"?(?P<user_name>.*?)"?,"?(?P<database_name>.*?)"?,(?P<process_id>\d+),"?(?P<connection_from>.*?)"?,(?P<session_id>.*?),(?P<session_line_num>\d+),"?(?P<command_tag>.*?)"?,(?P<session_start_time>.*?),(?P<virtual_transaction_id>.*?),(?P<transaction_id>.*?),(?P<error_severity>\w+),`

	// csvLogDefaultGlobSuffix is presumably the glob used to pick up CSV log
	// files; it is not referenced in this part of the file (TODO confirm).
	csvLogDefaultGlobSuffix = "*.csv"

	// maxChunkSize equals 10 MiB. NOTE(review): presumably caps how much log
	// data is read at once; not referenced in this part of the file.
	maxChunkSize uint64 = 10 << 20

	// maxTrackedFiles is presumably the upper bound on tracked log files; not
	// referenced in this part of the file (TODO confirm).
	maxTrackedFiles = 2500
)
39
40 type LogParser struct {
41 ctx context.Context
42 LogsMatchRegex *regexp.Regexp
43 LogFolder string
44 ServerMessagesLang string
45 LogTruncOnRotation string
46 SourceConn *sources.SourceConn
47 Interval float64
48 StoreCh chan<- metrics.MeasurementEnvelope
49 eventCounts map[string]int64
50 eventCountsTotal map[string]int64
51 lastSendTime time.Time
52 fileOffsets map[string]uint64
53 }
54
55 func NewLogParser(ctx context.Context, mdb *sources.SourceConn, storeCh chan<- metrics.MeasurementEnvelope) (*LogParser, error) {
56
57 logger := log.GetLogger(ctx).WithField("source", mdb.Name).WithField("metric", specialMetricServerLogEventCounts)
58 ctx = log.WithLogger(ctx, logger)
59
60 logsRegex, err := regexp.Compile(csvLogDefaultRegEx)
61 if err != nil {
62 logger.WithError(err).Error("Invalid log parsing regex")
63 return nil, err
64 }
65 logger.Debugf("Using %s as log parsing regex", logsRegex)
66
67 var logFolder, serverMessagesLang, logTrunc string
68 if logFolder, serverMessagesLang, logTrunc, err = tryDetermineLogSettings(ctx, mdb.Conn); err != nil {
69 logger.WithError(err).Error("Could not determine Postgres logs settings")
70 return nil, err
71 }
72 logger.Debugf("Considering log files in folder: %s", logFolder)
73
74 return &LogParser{
75 ctx: ctx,
76 LogsMatchRegex: logsRegex,
77 LogFolder: logFolder,
78 ServerMessagesLang: serverMessagesLang,
79 LogTruncOnRotation: logTrunc,
80 SourceConn: mdb,
81 Interval: mdb.GetMetricInterval(specialMetricServerLogEventCounts),
82 StoreCh: storeCh,
83 eventCounts: make(map[string]int64),
84 eventCountsTotal: make(map[string]int64),
85 fileOffsets: make(map[string]uint64),
86 }, nil
87 }
88
89 func (lp *LogParser) HasSendIntervalElapsed() bool {
90 return lp.lastSendTime.IsZero() || lp.lastSendTime.Before(time.Now().Add(-time.Second*time.Duration(lp.Interval)))
91 }
92
93 func (lp *LogParser) ParseLogs() error {
94 l := log.GetLogger(lp.ctx)
95 if ok, err := db.IsClientOnSameHost(lp.SourceConn.Conn); ok && err == nil {
96 l.Info("DB is on the same host. parsing logs locally")
97 if err = checkHasLocalPrivileges(lp.LogFolder); err == nil {
98 return lp.parseLogsLocal()
99 }
100 l.WithError(err).Error("Could't parse logs locally. lacking required privileges")
101 }
102
103 l.Info("DB is not detected to be on the same host. parsing logs remotely")
104 if err := checkHasRemotePrivileges(lp.ctx, lp.SourceConn, lp.LogFolder); err != nil {
105 l.WithError(err).Error("Could't parse logs remotely. lacking required privileges")
106 return err
107 }
108 return lp.parseLogsRemote()
109 }
110
111 func tryDetermineLogSettings(ctx context.Context, conn db.PgxIface) (string, string, string, error) {
112 sql := `select
113 current_setting('data_directory') as dd,
114 current_setting('log_directory') as ld,
115 current_setting('lc_messages')::varchar(2) as lc_messages,
116 current_setting('log_truncate_on_rotation') as log_trunc`
117 var dd, ld, lc, logTrunc string
118 err := conn.QueryRow(ctx, sql).Scan(&dd, &ld, &lc, &logTrunc)
119 if err != nil {
120 return "", "", "", err
121 }
122 if !strings.HasPrefix(ld, "/") {
123 ld = path.Join(dd, ld)
124 }
125 if _, ok := pgSeveritiesLocale[lc]; !ok {
126 lc = "en"
127 }
128 return ld, lc, logTrunc, nil
129 }
130
131 func checkHasRemotePrivileges(ctx context.Context, mdb *sources.SourceConn, logsDirPath string) error {
132 var logFile string
133 err := mdb.Conn.QueryRow(ctx, "select name from pg_ls_logdir() limit 1").Scan(&logFile)
134 if err != nil && err != pgx.ErrNoRows {
135 return err
136 }
137
138 var dummy string
139 err = mdb.Conn.QueryRow(ctx, "select pg_read_file($1, 0, 0)", filepath.Join(logsDirPath, logFile)).Scan(&dummy)
140 return err
141 }
142
// checkHasLocalPrivileges reports whether logsDirPath can be listed from this
// process, returning the os.ReadDir error if it cannot.
func checkHasLocalPrivileges(logsDirPath string) error {
	_, readErr := os.ReadDir(logsDirPath)
	return readErr
}
150
151 func severityToEnglish(serverLang, errorSeverity string) string {
152 if serverLang == "en" {
153 return errorSeverity
154 }
155 severityMap := pgSeveritiesLocale[serverLang]
156 severityEn, ok := severityMap[errorSeverity]
157 if !ok {
158 return errorSeverity
159 }
160 return severityEn
161 }
162
163 func (lp *LogParser) regexMatchesToMap(matches []string) map[string]string {
164 result := make(map[string]string)
165 if len(matches) == 0 || lp.LogsMatchRegex == nil {
166 return result
167 }
168 for i, name := range lp.LogsMatchRegex.SubexpNames() {
169 if i != 0 && name != "" {
170 result[name] = matches[i]
171 }
172 }
173 return result
174 }
175
176
177 func (lp *LogParser) GetMeasurementEnvelope() metrics.MeasurementEnvelope {
178 allSeverityCounts := metrics.NewMeasurement(time.Now().UnixNano())
179 for _, s := range pgSeverities {
180 parsedCount, ok := lp.eventCounts[s]
181 if ok {
182 allSeverityCounts[strings.ToLower(s)] = parsedCount
183 } else {
184 allSeverityCounts[strings.ToLower(s)] = int64(0)
185 }
186 parsedCount, ok = lp.eventCountsTotal[s]
187 if ok {
188 allSeverityCounts[strings.ToLower(s)+"_total"] = parsedCount
189 } else {
190 allSeverityCounts[strings.ToLower(s)+"_total"] = int64(0)
191 }
192 }
193 return metrics.MeasurementEnvelope{
194 DBName: lp.SourceConn.Name,
195 MetricName: specialMetricServerLogEventCounts,
196 Data: metrics.Measurements{allSeverityCounts},
197 CustomTags: lp.SourceConn.CustomTags,
198 }
199 }
200
201 func zeroEventCounts(eventCounts map[string]int64) {
202 for _, severity := range pgSeverities {
203 eventCounts[severity] = 0
204 }
205 }
206