1 package reaper
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "maps"
8 "strings"
9 "time"
10
11 "github.com/cybertec-postgresql/pgwatch/v3/internal/db"
12 "github.com/cybertec-postgresql/pgwatch/v3/internal/log"
13 "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
14 "github.com/cybertec-postgresql/pgwatch/v3/internal/sinks"
15 "github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
16 "github.com/jackc/pgx/v5"
17 )
18
19 func QueryMeasurements(ctx context.Context, md *sources.SourceConn, sql string, args ...any) (metrics.Measurements, error) {
20 var conn db.PgxIface
21 var err error
22 var tx pgx.Tx
23 if strings.TrimSpace(sql) == "" {
24 return nil, errors.New("empty SQL")
25 }
26
27 conn = md.Conn
28 if md.IsPostgresSource() {
29
30 if tx, err = conn.Begin(ctx); err != nil {
31 return nil, err
32 }
33 defer func() { _ = tx.Commit(ctx) }()
34 _, err = tx.Exec(ctx, "SET LOCAL lock_timeout TO '100ms'")
35 if err != nil {
36 return nil, err
37 }
38 conn = tx
39 } else {
40
41 args = append([]any{pgx.QueryExecModeSimpleProtocol}, args...)
42 }
43 rows, err := conn.Query(ctx, sql, args...)
44 if err == nil {
45 return pgx.CollectRows(rows, metrics.RowToMeasurement)
46 }
47 return nil, err
48 }
49
// DetectSprocChanges compares md5 hashes of stored procedure definitions
// against the snapshot kept in hostState["sproc_hashes"] and emits
// create/alter/drop events to storageCh for any differences. The first run
// for a source only seeds the snapshot without emitting events. Returns
// counts of created/altered/dropped sprocs.
func DetectSprocChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking for sproc changes...", md.Name, specialMetricChangeEvents)
	// First invocation for this source: create the hash cache and remember
	// to suppress "create" events for everything seen in this pass.
	if _, ok := hostState["sproc_hashes"]; !ok {
		firstRun = true
		hostState["sproc_hashes"] = make(map[string]string)
	}

	mvp, ok := metricDefs.GetMetricDef("sproc_hashes")
	if !ok {
		log.GetLogger(ctx).Error("could not get sproc_hashes sql")
		return changeCounts
	}

	data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
	if err != nil {
		log.GetLogger(ctx).Error("could not read sproc_hashes from monitored host: ", md.Name, ", err:", err)
		return changeCounts
	}

	// Pass 1: detect altered and newly created sprocs, updating the cache.
	for _, dr := range data {
		// Identity is name + oid so overloaded sprocs are tracked separately.
		objIdent := dr["tag_sproc"].(string) + dbMetricJoinStr + dr["tag_oid"].(string)
		prevHash, ok := hostState["sproc_hashes"][objIdent]
		if ok {
			if prevHash != dr["md5"].(string) {
				log.GetLogger(ctx).Info("detected change in sproc:", dr["tag_sproc"], ", oid:", dr["tag_oid"])
				dr["event"] = "alter"
				detectedChanges = append(detectedChanges, dr)
				hostState["sproc_hashes"][objIdent] = dr["md5"].(string)
				changeCounts.Altered++
			}
		} else {
			// Unknown ident: a genuine "create" unless this is the seeding run.
			if !firstRun {
				log.GetLogger(ctx).Info("detected new sproc:", dr["tag_sproc"], ", oid:", dr["tag_oid"])
				dr["event"] = "create"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
			}
			hostState["sproc_hashes"][objIdent] = dr["md5"].(string)
		}
	}

	// Pass 2: after pass 1 the cache is a superset of the current sprocs, so
	// a length mismatch means some cached entries no longer exist (= drops).
	if !firstRun && len(hostState["sproc_hashes"]) != len(data) {
		deletedSProcs := make([]string, 0)

		currentOidMap := make(map[string]bool)
		for _, dr := range data {
			currentOidMap[dr["tag_sproc"].(string)+dbMetricJoinStr+dr["tag_oid"].(string)] = true
		}
		for sprocIdent := range hostState["sproc_hashes"] {
			_, ok := currentOidMap[sprocIdent]
			if !ok {
				splits := strings.Split(sprocIdent, dbMetricJoinStr)
				log.GetLogger(ctx).Info("detected delete of sproc:", splits[0], ", oid:", splits[1])
				// Synthesize a drop event; only the identity tags are known.
				influxEntry := metrics.NewMeasurement(data.GetEpoch())
				influxEntry["event"] = "drop"
				influxEntry["tag_sproc"] = splits[0]
				influxEntry["tag_oid"] = splits[1]
				detectedChanges = append(detectedChanges, influxEntry)
				deletedSProcs = append(deletedSProcs, sprocIdent)
				changeCounts.Dropped++
			}
		}
		// Remove dropped sprocs from the cache after iteration.
		for _, deletedSProc := range deletedSProcs {
			delete(hostState["sproc_hashes"], deletedSProc)
		}
	}
	log.GetLogger(ctx).Debugf("[%s][%s] detected %d sproc changes", md.Name, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		storageCh <- metrics.MeasurementEnvelope{
			DBName:     md.Name,
			MetricName: "sproc_changes",
			Data:       detectedChanges,
			CustomTags: md.CustomTags,
		}
	}

	return changeCounts
}
132
133 func DetectTableChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
134 detectedChanges := make(metrics.Measurements, 0)
135 var firstRun bool
136 var changeCounts ChangeDetectionResults
137
138 log.GetLogger(ctx).Debugf("[%s][%s] checking for table changes...", md.Name, specialMetricChangeEvents)
139 if _, ok := hostState["table_hashes"]; !ok {
140 firstRun = true
141 hostState["table_hashes"] = make(map[string]string)
142 }
143
144 mvp, ok := metricDefs.GetMetricDef("table_hashes")
145 if !ok {
146 log.GetLogger(ctx).Error("could not get table_hashes sql")
147 return changeCounts
148 }
149
150 data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
151 if err != nil {
152 log.GetLogger(ctx).Error("could not read table_hashes from monitored host:", md.Name, ", err:", err)
153 return changeCounts
154 }
155
156 for _, dr := range data {
157 objIdent := dr["tag_table"].(string)
158 prevHash, ok := hostState["table_hashes"][objIdent]
159 if ok {
160 if prevHash != dr["md5"].(string) {
161 log.GetLogger(ctx).Info("detected DDL change in table:", dr["tag_table"])
162 dr["event"] = "alter"
163 detectedChanges = append(detectedChanges, dr)
164 hostState["table_hashes"][objIdent] = dr["md5"].(string)
165 changeCounts.Altered++
166 }
167 } else {
168 if !firstRun {
169 log.GetLogger(ctx).Info("detected new table:", dr["tag_table"])
170 dr["event"] = "create"
171 detectedChanges = append(detectedChanges, dr)
172 changeCounts.Created++
173 }
174 hostState["table_hashes"][objIdent] = dr["md5"].(string)
175 }
176 }
177
178 if !firstRun && len(hostState["table_hashes"]) != len(data) {
179 deletedTables := make([]string, 0)
180
181 currentTableMap := make(map[string]bool)
182 for _, dr := range data {
183 currentTableMap[dr["tag_table"].(string)] = true
184 }
185 for table := range hostState["table_hashes"] {
186 _, ok := currentTableMap[table]
187 if !ok {
188 log.GetLogger(ctx).Info("detected drop of table:", table)
189 influxEntry := metrics.NewMeasurement(data.GetEpoch())
190 influxEntry["event"] = "drop"
191 influxEntry["tag_table"] = table
192 detectedChanges = append(detectedChanges, influxEntry)
193 deletedTables = append(deletedTables, table)
194 changeCounts.Dropped++
195 }
196 }
197 for _, deletedTable := range deletedTables {
198 delete(hostState["table_hashes"], deletedTable)
199 }
200 }
201
202 log.GetLogger(ctx).Debugf("[%s][%s] detected %d table changes", md.Name, specialMetricChangeEvents, len(detectedChanges))
203 if len(detectedChanges) > 0 {
204 storageCh <- metrics.MeasurementEnvelope{
205 DBName: md.Name,
206 MetricName: "table_changes",
207 Data: detectedChanges,
208 CustomTags: md.CustomTags,
209 }
210 }
211
212 return changeCounts
213 }
214
// DetectIndexChanges compares per-index hashes against the snapshot kept in
// hostState["index_hashes"] and emits create/alter/drop events to storageCh
// for any differences. The cached value is the concatenation of the md5 hash
// and the index's is_valid flag, so validity flips (e.g. a failed
// REINDEX CONCURRENTLY) are also reported as "alter". The first run for a
// source only seeds the snapshot. Returns counts of changes by kind.
func DetectIndexChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking for index changes...", md.Name, specialMetricChangeEvents)
	// First invocation for this source: create the hash cache and suppress
	// "create" events for everything seen in this pass.
	if _, ok := hostState["index_hashes"]; !ok {
		firstRun = true
		hostState["index_hashes"] = make(map[string]string)
	}

	mvp, ok := metricDefs.GetMetricDef("index_hashes")
	if !ok {
		log.GetLogger(ctx).Error("could not get index_hashes sql")
		return changeCounts
	}

	data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
	if err != nil {
		log.GetLogger(ctx).Error("could not read index_hashes from monitored host:", md.Name, ", err:", err)
		return changeCounts
	}

	// Pass 1: detect altered and newly created indexes, updating the cache.
	for _, dr := range data {
		objIdent := dr["tag_index"].(string)
		prevHash, ok := hostState["index_hashes"][objIdent]
		if ok {
			// Compare against md5 + is_valid so validity changes count too.
			if prevHash != (dr["md5"].(string) + dr["is_valid"].(string)) {
				log.GetLogger(ctx).Info("detected index change:", dr["tag_index"], ", table:", dr["table"])
				dr["event"] = "alter"
				detectedChanges = append(detectedChanges, dr)
				hostState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
				changeCounts.Altered++
			}
		} else {
			if !firstRun {
				log.GetLogger(ctx).Info("detected new index:", dr["tag_index"])
				dr["event"] = "create"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
			}
			hostState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
		}
	}

	// Pass 2: the cache is now a superset of current indexes; a length
	// mismatch means some cached entries were dropped on the server.
	if !firstRun && len(hostState["index_hashes"]) != len(data) {
		deletedIndexes := make([]string, 0)

		currentIndexMap := make(map[string]bool)
		for _, dr := range data {
			currentIndexMap[dr["tag_index"].(string)] = true
		}
		for indexName := range hostState["index_hashes"] {
			_, ok := currentIndexMap[indexName]
			if !ok {
				log.GetLogger(ctx).Info("detected drop of index_name:", indexName)
				// Synthesize a drop event; only the index name is known.
				influxEntry := metrics.NewMeasurement(data.GetEpoch())
				influxEntry["event"] = "drop"
				influxEntry["tag_index"] = indexName
				detectedChanges = append(detectedChanges, influxEntry)
				deletedIndexes = append(deletedIndexes, indexName)
				changeCounts.Dropped++
			}
		}
		// Remove dropped indexes from the cache after iteration.
		for _, deletedIndex := range deletedIndexes {
			delete(hostState["index_hashes"], deletedIndex)
		}
	}
	log.GetLogger(ctx).Debugf("[%s][%s] detected %d index changes", md.Name, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		storageCh <- metrics.MeasurementEnvelope{
			DBName:     md.Name,
			MetricName: "index_changes",
			Data:       detectedChanges,
			CustomTags: md.CustomTags,
		}
	}

	return changeCounts
}
295
296 func DetectPrivilegeChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
297 detectedChanges := make(metrics.Measurements, 0)
298 var firstRun bool
299 var changeCounts ChangeDetectionResults
300
301 log.GetLogger(ctx).Debugf("[%s][%s] checking object privilege changes...", md.Name, specialMetricChangeEvents)
302 if _, ok := hostState["object_privileges"]; !ok {
303 firstRun = true
304 hostState["object_privileges"] = make(map[string]string)
305 }
306
307 mvp, ok := metricDefs.GetMetricDef("privilege_changes")
308 if !ok || mvp.GetSQL(int(md.Version)) == "" {
309 log.GetLogger(ctx).Warningf("[%s][%s] could not get SQL for 'privilege_changes'. cannot detect privilege changes", md.Name, specialMetricChangeEvents)
310 return changeCounts
311 }
312
313
314 data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
315 if err != nil {
316 log.GetLogger(ctx).Errorf("[%s][%s] failed to fetch object privileges info: %v", md.Name, specialMetricChangeEvents, err)
317 return changeCounts
318 }
319
320 currentState := make(map[string]bool)
321 for _, dr := range data {
322 objIdent := fmt.Sprintf("%s#:#%s#:#%s#:#%s", dr["object_type"], dr["tag_role"], dr["tag_object"], dr["privilege_type"])
323 if firstRun {
324 hostState["object_privileges"][objIdent] = ""
325 } else {
326 _, ok := hostState["object_privileges"][objIdent]
327 if !ok {
328 log.GetLogger(ctx).Infof("[%s][%s] detected new object privileges: role=%s, object_type=%s, object=%s, privilege_type=%s",
329 md.Name, specialMetricChangeEvents, dr["tag_role"], dr["object_type"], dr["tag_object"], dr["privilege_type"])
330 dr["event"] = "GRANT"
331 detectedChanges = append(detectedChanges, dr)
332 changeCounts.Created++
333 hostState["object_privileges"][objIdent] = ""
334 }
335 currentState[objIdent] = true
336 }
337 }
338
339 if !firstRun && len(currentState) > 0 {
340 for objPrevRun := range hostState["object_privileges"] {
341 if _, ok := currentState[objPrevRun]; !ok {
342 splits := strings.Split(objPrevRun, "#:#")
343 log.GetLogger(ctx).Infof("[%s][%s] detected removed object privileges: role=%s, object_type=%s, object=%s, privilege_type=%s",
344 md.Name, specialMetricChangeEvents, splits[1], splits[0], splits[2], splits[3])
345 revokeEntry := metrics.NewMeasurement(data.GetEpoch())
346 revokeEntry["object_type"] = splits[0]
347 revokeEntry["tag_role"] = splits[1]
348 revokeEntry["tag_object"] = splits[2]
349 revokeEntry["privilege_type"] = splits[3]
350 revokeEntry["event"] = "REVOKE"
351 detectedChanges = append(detectedChanges, revokeEntry)
352 changeCounts.Dropped++
353 delete(hostState["object_privileges"], objPrevRun)
354 }
355 }
356 }
357
358 log.GetLogger(ctx).Debugf("[%s][%s] detected %d object privilege changes...", md.Name, specialMetricChangeEvents, len(detectedChanges))
359 if len(detectedChanges) > 0 {
360 storageCh <- metrics.MeasurementEnvelope{
361 DBName: md.Name,
362 MetricName: "privilege_changes",
363 Data: detectedChanges,
364 CustomTags: md.CustomTags,
365 }
366 }
367
368 return changeCounts
369 }
370
371 func DetectConfigurationChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
372 detectedChanges := make(metrics.Measurements, 0)
373 var firstRun bool
374 var changeCounts ChangeDetectionResults
375
376 logger := log.GetLogger(ctx).WithField("source", md.Name).WithField("metric", specialMetricChangeEvents)
377
378 if _, ok := hostState["configuration_hashes"]; !ok {
379 firstRun = true
380 hostState["configuration_hashes"] = make(map[string]string)
381 }
382
383 mvp, ok := metricDefs.GetMetricDef("configuration_hashes")
384 if !ok {
385 logger.Error("could not get configuration_hashes sql")
386 return changeCounts
387 }
388
389 rows, err := md.Conn.Query(ctx, mvp.GetSQL(md.Version))
390 if err != nil {
391 logger.Error("could not read configuration_hashes from monitored host: ", err)
392 return changeCounts
393 }
394 defer rows.Close()
395 var (
396 objIdent, objValue string
397 epoch int64
398 )
399 for rows.Next() {
400 if rows.Scan(&epoch, &objIdent, &objValue) != nil {
401 return changeCounts
402 }
403 prevРash, ok := hostState["configuration_hashes"][objIdent]
404 if ok {
405 if prevРash != objValue {
406 logger.Warningf("detected settings change: %s = %s (prev: %s)", objIdent, objValue, prevРash)
407 detectedChanges = append(detectedChanges, metrics.Measurement{
408 metrics.EpochColumnName: epoch,
409 "tag_setting": objIdent,
410 "value": objValue,
411 "event": "alter"})
412 hostState["configuration_hashes"][objIdent] = objValue
413 changeCounts.Altered++
414 }
415 } else {
416 hostState["configuration_hashes"][objIdent] = objValue
417 if firstRun {
418 continue
419 }
420 logger.Warning("detected new setting: ", objIdent)
421 detectedChanges = append(detectedChanges, metrics.Measurement{
422 metrics.EpochColumnName: epoch,
423 "tag_setting": objIdent,
424 "value": objValue,
425 "event": "create"})
426 changeCounts.Created++
427 }
428 }
429
430 if len(detectedChanges) > 0 {
431 storageCh <- metrics.MeasurementEnvelope{
432 DBName: md.Name,
433 MetricName: "configuration_changes",
434 Data: detectedChanges,
435 CustomTags: md.CustomTags,
436 }
437 }
438 return changeCounts
439 }
440
441
442
443 func (r *Reaper) GetInstanceUpMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error) {
444 err := md.Conn.Ping(ctx)
445 return metrics.Measurements{
446 metrics.Measurement{
447 metrics.EpochColumnName: time.Now().UnixNano(),
448 "instance_up": func() int {
449 if err == nil {
450 return 1
451 }
452 return 0
453 }(),
454 },
455 }, err
456 }
457
458 func (r *Reaper) CheckForPGObjectChangesAndStore(ctx context.Context, dbUnique string, md *sources.SourceConn, hostState map[string]map[string]string) {
459 storageCh := r.measurementCh
460 sprocCounts := DetectSprocChanges(ctx, md, storageCh, hostState)
461 tableCounts := DetectTableChanges(ctx, md, storageCh, hostState)
462 indexCounts := DetectIndexChanges(ctx, md, storageCh, hostState)
463 confCounts := DetectConfigurationChanges(ctx, md, storageCh, hostState)
464 privChangeCounts := DetectPrivilegeChanges(ctx, md, storageCh, hostState)
465
466
467 message := ""
468 if sprocCounts.Altered > 0 || sprocCounts.Created > 0 || sprocCounts.Dropped > 0 {
469 message += fmt.Sprintf(" sprocs %d/%d/%d", sprocCounts.Created, sprocCounts.Altered, sprocCounts.Dropped)
470 }
471 if tableCounts.Altered > 0 || tableCounts.Created > 0 || tableCounts.Dropped > 0 {
472 message += fmt.Sprintf(" tables/views %d/%d/%d", tableCounts.Created, tableCounts.Altered, tableCounts.Dropped)
473 }
474 if indexCounts.Altered > 0 || indexCounts.Created > 0 || indexCounts.Dropped > 0 {
475 message += fmt.Sprintf(" indexes %d/%d/%d", indexCounts.Created, indexCounts.Altered, indexCounts.Dropped)
476 }
477 if confCounts.Altered > 0 || confCounts.Created > 0 {
478 message += fmt.Sprintf(" configuration %d/%d/%d", confCounts.Created, confCounts.Altered, confCounts.Dropped)
479 }
480 if privChangeCounts.Dropped > 0 || privChangeCounts.Created > 0 {
481 message += fmt.Sprintf(" privileges %d/%d/%d", privChangeCounts.Created, privChangeCounts.Altered, privChangeCounts.Dropped)
482 }
483
484 if message > "" {
485 message = "Detected changes for \"" + dbUnique + "\" [Created/Altered/Dropped]:" + message
486 log.GetLogger(ctx).Info(message)
487 detectedChangesSummary := make(metrics.Measurements, 0)
488 influxEntry := metrics.NewMeasurement(time.Now().UnixNano())
489 influxEntry["details"] = message
490 detectedChangesSummary = append(detectedChangesSummary, influxEntry)
491 storageCh <- metrics.MeasurementEnvelope{
492 DBName: dbUnique,
493 MetricName: "object_changes",
494 Data: detectedChangesSummary,
495 CustomTags: md.CustomTags,
496 }
497
498 }
499 }
500
501
502
503
504 func TryCreateMissingExtensions(ctx context.Context, md *sources.SourceConn, extensionNames []string, existingExtensions map[string]int) []string {
505
506 sqlAvailable := `select name::text from pg_available_extensions`
507 extsCreated := make([]string, 0)
508
509
510 data, err := QueryMeasurements(ctx, md, sqlAvailable)
511 if err != nil {
512 log.GetLogger(ctx).Infof("[%s] Failed to get a list of available extensions: %v", md, err)
513 return extsCreated
514 }
515
516 availableExts := make(map[string]bool)
517 for _, row := range data {
518 availableExts[row["name"].(string)] = true
519 }
520
521 for _, extToCreate := range extensionNames {
522 if _, ok := existingExtensions[extToCreate]; ok {
523 continue
524 }
525 _, ok := availableExts[extToCreate]
526 if !ok {
527 log.GetLogger(ctx).Errorf("[%s] Requested extension %s not available on instance, cannot try to create...", md, extToCreate)
528 } else {
529 sqlCreateExt := `create extension ` + extToCreate
530 _, err := QueryMeasurements(ctx, md, sqlCreateExt)
531 if err != nil {
532 log.GetLogger(ctx).Errorf("[%s] Failed to create extension %s (based on --try-create-listed-exts-if-missing input): %v", md, extToCreate, err)
533 }
534 extsCreated = append(extsCreated, extToCreate)
535 }
536 }
537
538 return extsCreated
539 }
540
541
542 func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.SourceConn) (err error) {
543 sl := log.GetLogger(ctx).WithField("source", md.Name)
544 metrics := maps.Clone(md.Metrics)
545 maps.Insert(metrics, maps.All(md.MetricsStandby))
546 for metricName := range metrics {
547 metric, ok := metricDefs.GetMetricDef(metricName)
548 if !ok {
549 continue
550 }
551 if _, err = md.Conn.Exec(ctx, metric.InitSQL); err != nil {
552 return
553 }
554 sl.WithField("metric", metricName).Info("Successfully created metric fetching helpers")
555 }
556 return
557 }
558
559 func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(shutDownDueToRoleChange map[string]bool) {
560 for _, prevDB := range r.prevLoopMonitoredDBs {
561 if r.monitoredSources.GetMonitoredDatabase(prevDB.Name) == nil {
562 prevDB.Conn.Close()
563 _ = r.SinksWriter.SyncMetric(prevDB.Name, "", sinks.DeleteOp)
564 }
565 }
566
567 for roleChangedDB := range shutDownDueToRoleChange {
568 if db := r.monitoredSources.GetMonitoredDatabase(roleChangedDB); db != nil {
569 db.Conn.Close()
570 }
571 _ = r.SinksWriter.SyncMetric(roleChangedDB, "", sinks.DeleteOp)
572 }
573 }
574