1 package reaper
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "maps"
8 "strings"
9 "time"
10
11 "github.com/cybertec-postgresql/pgwatch/v3/internal/db"
12 "github.com/cybertec-postgresql/pgwatch/v3/internal/log"
13 "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
14 "github.com/cybertec-postgresql/pgwatch/v3/internal/sinks"
15 "github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
16 "github.com/jackc/pgx/v5"
17 )
18
19 func QueryMeasurements(ctx context.Context, md *sources.SourceConn, sql string, args ...any) (metrics.Measurements, error) {
20 var conn db.PgxIface
21 var err error
22 var tx pgx.Tx
23 if strings.TrimSpace(sql) == "" {
24 return nil, errors.New("empty SQL")
25 }
26
27 conn = md.Conn
28 if md.IsPostgresSource() {
29
30 if tx, err = conn.Begin(ctx); err != nil {
31 return nil, err
32 }
33 defer func() { _ = tx.Commit(ctx) }()
34 _, err = tx.Exec(ctx, "SET LOCAL lock_timeout TO '100ms'")
35 if err != nil {
36 return nil, err
37 }
38 conn = tx
39 } else {
40
41 args = append([]any{pgx.QueryExecModeSimpleProtocol}, args...)
42 }
43 rows, err := conn.Query(ctx, sql, args...)
44 if err == nil {
45 return pgx.CollectRows(rows, metrics.RowToMeasurement)
46 }
47 return nil, err
48 }
49
50 func (r *Reaper) DetectSprocChanges(ctx context.Context, md *sources.SourceConn) (changeCounts ChangeDetectionResults) {
51 detectedChanges := make(metrics.Measurements, 0)
52 var firstRun bool
53 l := log.GetLogger(ctx)
54 changeCounts.Target = "functions"
55 l.Debug("checking for sproc changes...")
56 if _, ok := md.ChangeState["sproc_hashes"]; !ok {
57 firstRun = true
58 md.ChangeState["sproc_hashes"] = make(map[string]string)
59 }
60
61 mvp, ok := metricDefs.GetMetricDef("sproc_hashes")
62 if !ok {
63 l.Error("could not get sproc_hashes sql")
64 return
65 }
66
67 data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
68 if err != nil {
69 l.Error(err)
70 return
71 }
72
73 for _, dr := range data {
74 objIdent := dr["tag_sproc"].(string) + dbMetricJoinStr + dr["tag_oid"].(string)
75 prevHash, ok := md.ChangeState["sproc_hashes"][objIdent]
76 ll := l.WithField("sproc", dr["tag_sproc"]).WithField("oid", dr["tag_oid"])
77 if ok {
78 if prevHash != dr["md5"].(string) {
79 ll.Debug("change detected")
80 dr["event"] = "alter"
81 detectedChanges = append(detectedChanges, dr)
82 md.ChangeState["sproc_hashes"][objIdent] = dr["md5"].(string)
83 changeCounts.Altered++
84 }
85 } else {
86 if !firstRun {
87 ll.Debug("new sproc detected")
88 dr["event"] = "create"
89 detectedChanges = append(detectedChanges, dr)
90 changeCounts.Created++
91 }
92 md.ChangeState["sproc_hashes"][objIdent] = dr["md5"].(string)
93 }
94 }
95
96 if !firstRun && len(md.ChangeState["sproc_hashes"]) != len(data) {
97
98 currentOidMap := make(map[string]bool)
99 for _, dr := range data {
100 currentOidMap[dr["tag_sproc"].(string)+dbMetricJoinStr+dr["tag_oid"].(string)] = true
101 }
102 for sprocIdent := range md.ChangeState["sproc_hashes"] {
103 _, ok := currentOidMap[sprocIdent]
104 if !ok {
105 splits := strings.Split(sprocIdent, dbMetricJoinStr)
106 l.WithField("sproc", splits[0]).WithField("oid", splits[1]).Debug("deleted sproc detected")
107 m := metrics.NewMeasurement(data.GetEpoch())
108 m["event"] = "drop"
109 m["tag_sproc"] = splits[0]
110 m["tag_oid"] = splits[1]
111 detectedChanges = append(detectedChanges, m)
112 delete(md.ChangeState["sproc_hashes"], sprocIdent)
113 changeCounts.Dropped++
114 }
115 }
116 }
117 l.Debugf("sproc changes detected: %d", len(detectedChanges))
118 if len(detectedChanges) > 0 {
119 r.measurementCh <- metrics.MeasurementEnvelope{
120 DBName: md.Name,
121 MetricName: "sproc_changes",
122 Data: detectedChanges,
123 CustomTags: md.CustomTags,
124 }
125 }
126
127 return changeCounts
128 }
129
130 func (r *Reaper) DetectTableChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
131 detectedChanges := make(metrics.Measurements, 0)
132 var firstRun bool
133 var changeCounts ChangeDetectionResults
134 l := log.GetLogger(ctx)
135 changeCounts.Target = "tables"
136 l.Debug("checking for table changes...")
137 if _, ok := md.ChangeState["table_hashes"]; !ok {
138 firstRun = true
139 md.ChangeState["table_hashes"] = make(map[string]string)
140 }
141
142 mvp, ok := metricDefs.GetMetricDef("table_hashes")
143 if !ok {
144 l.Error("could not get table_hashes sql")
145 return changeCounts
146 }
147
148 data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
149 if err != nil {
150 l.Error(err)
151 return changeCounts
152 }
153
154 for _, dr := range data {
155 objIdent := dr["tag_table"].(string)
156 prevHash, ok := md.ChangeState["table_hashes"][objIdent]
157 ll := l.WithField("table", dr["tag_table"])
158 if ok {
159 if prevHash != dr["md5"].(string) {
160 ll.Debug("change detected")
161 dr["event"] = "alter"
162 detectedChanges = append(detectedChanges, dr)
163 md.ChangeState["table_hashes"][objIdent] = dr["md5"].(string)
164 changeCounts.Altered++
165 }
166 } else {
167 if !firstRun {
168 ll.Debug("new table detected")
169 dr["event"] = "create"
170 detectedChanges = append(detectedChanges, dr)
171 changeCounts.Created++
172 }
173 md.ChangeState["table_hashes"][objIdent] = dr["md5"].(string)
174 }
175 }
176
177 if !firstRun && len(md.ChangeState["table_hashes"]) != len(data) {
178 deletedTables := make([]string, 0)
179
180 currentTableMap := make(map[string]bool)
181 for _, dr := range data {
182 currentTableMap[dr["tag_table"].(string)] = true
183 }
184 for table := range md.ChangeState["table_hashes"] {
185 _, ok := currentTableMap[table]
186 if !ok {
187 l.WithField("table", table).Debug("deleted table detected")
188 influxEntry := metrics.NewMeasurement(data.GetEpoch())
189 influxEntry["event"] = "drop"
190 influxEntry["tag_table"] = table
191 detectedChanges = append(detectedChanges, influxEntry)
192 deletedTables = append(deletedTables, table)
193 changeCounts.Dropped++
194 }
195 }
196 for _, deletedTable := range deletedTables {
197 delete(md.ChangeState["table_hashes"], deletedTable)
198 }
199 }
200
201 l.Debugf("table changes detected: %d", len(detectedChanges))
202 if len(detectedChanges) > 0 {
203 r.measurementCh <- metrics.MeasurementEnvelope{
204 DBName: md.Name,
205 MetricName: "table_changes",
206 Data: detectedChanges,
207 CustomTags: md.CustomTags,
208 }
209 }
210
211 return changeCounts
212 }
213
214 func (r *Reaper) DetectIndexChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
215 detectedChanges := make(metrics.Measurements, 0)
216 var firstRun bool
217 var changeCounts ChangeDetectionResults
218 l := log.GetLogger(ctx)
219 changeCounts.Target = "indexes"
220 l.Debug("checking for index changes...")
221 if _, ok := md.ChangeState["index_hashes"]; !ok {
222 firstRun = true
223 md.ChangeState["index_hashes"] = make(map[string]string)
224 }
225
226 mvp, ok := metricDefs.GetMetricDef("index_hashes")
227 if !ok {
228 l.Error("could not get index_hashes sql")
229 return changeCounts
230 }
231
232 data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
233 if err != nil {
234 l.Error(err)
235 return changeCounts
236 }
237
238 for _, dr := range data {
239 objIdent := dr["tag_index"].(string)
240 prevHash, ok := md.ChangeState["index_hashes"][objIdent]
241 ll := l.WithField("index", dr["tag_index"]).WithField("table", dr["table"])
242 if ok {
243 if prevHash != (dr["md5"].(string) + dr["is_valid"].(string)) {
244 ll.Debug("change detected")
245 dr["event"] = "alter"
246 detectedChanges = append(detectedChanges, dr)
247 md.ChangeState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
248 changeCounts.Altered++
249 }
250 } else {
251 if !firstRun {
252 ll.Debug("new index detected")
253 dr["event"] = "create"
254 detectedChanges = append(detectedChanges, dr)
255 changeCounts.Created++
256 }
257 md.ChangeState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
258 }
259 }
260
261 if !firstRun && len(md.ChangeState["index_hashes"]) != len(data) {
262 deletedIndexes := make([]string, 0)
263
264 currentIndexMap := make(map[string]bool)
265 for _, dr := range data {
266 currentIndexMap[dr["tag_index"].(string)] = true
267 }
268 for indexName := range md.ChangeState["index_hashes"] {
269 _, ok := currentIndexMap[indexName]
270 if !ok {
271 l.WithField("index", indexName).Debug("deleted index detected")
272 influxEntry := metrics.NewMeasurement(data.GetEpoch())
273 influxEntry["event"] = "drop"
274 influxEntry["tag_index"] = indexName
275 detectedChanges = append(detectedChanges, influxEntry)
276 deletedIndexes = append(deletedIndexes, indexName)
277 changeCounts.Dropped++
278 }
279 }
280 for _, deletedIndex := range deletedIndexes {
281 delete(md.ChangeState["index_hashes"], deletedIndex)
282 }
283 }
284 l.Debugf("index changes detected: %d", len(detectedChanges))
285 if len(detectedChanges) > 0 {
286 r.measurementCh <- metrics.MeasurementEnvelope{
287 DBName: md.Name,
288 MetricName: "index_changes",
289 Data: detectedChanges,
290 CustomTags: md.CustomTags,
291 }
292 }
293
294 return changeCounts
295 }
296
297 func (r *Reaper) DetectPrivilegeChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
298 detectedChanges := make(metrics.Measurements, 0)
299 var firstRun bool
300 var changeCounts ChangeDetectionResults
301 l := log.GetLogger(ctx)
302 changeCounts.Target = "privileges"
303 l.Debug("checking object privilege changes...")
304 if _, ok := md.ChangeState["object_privileges"]; !ok {
305 firstRun = true
306 md.ChangeState["object_privileges"] = make(map[string]string)
307 }
308
309 mvp, ok := metricDefs.GetMetricDef("privilege_changes")
310 if !ok || mvp.GetSQL(int(md.Version)) == "" {
311 l.Warning("could not get SQL for 'privilege_changes'. cannot detect privilege changes")
312 return changeCounts
313 }
314
315
316 data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
317 if err != nil {
318 l.Error(err)
319 return changeCounts
320 }
321
322 currentState := make(map[string]bool)
323 for _, dr := range data {
324 objIdent := fmt.Sprintf("%s#:#%s#:#%s#:#%s", dr["object_type"], dr["tag_role"], dr["tag_object"], dr["privilege_type"])
325 ll := l.WithField("role", dr["tag_role"]).
326 WithField("object_type", dr["object_type"]).
327 WithField("object", dr["tag_object"]).
328 WithField("privilege_type", dr["privilege_type"])
329 if firstRun {
330 md.ChangeState["object_privileges"][objIdent] = ""
331 } else {
332 _, ok := md.ChangeState["object_privileges"][objIdent]
333 if !ok {
334 ll.Debug("new object privileges detected")
335 dr["event"] = "GRANT"
336 detectedChanges = append(detectedChanges, dr)
337 changeCounts.Created++
338 md.ChangeState["object_privileges"][objIdent] = ""
339 }
340 currentState[objIdent] = true
341 }
342 }
343
344 if !firstRun && len(currentState) > 0 {
345 for objPrevRun := range md.ChangeState["object_privileges"] {
346 if _, ok := currentState[objPrevRun]; !ok {
347 splits := strings.Split(objPrevRun, "#:#")
348 l.WithField("role", splits[1]).
349 WithField("object_type", splits[0]).
350 WithField("object", splits[2]).
351 WithField("privilege_type", splits[3]).
352 Debug("removed object privileges detected")
353 revokeEntry := metrics.NewMeasurement(data.GetEpoch())
354 revokeEntry["object_type"] = splits[0]
355 revokeEntry["tag_role"] = splits[1]
356 revokeEntry["tag_object"] = splits[2]
357 revokeEntry["privilege_type"] = splits[3]
358 revokeEntry["event"] = "REVOKE"
359 detectedChanges = append(detectedChanges, revokeEntry)
360 changeCounts.Dropped++
361 delete(md.ChangeState["object_privileges"], objPrevRun)
362 }
363 }
364 }
365
366 l.Debugf("object privilege changes detected: %d", len(detectedChanges))
367 if len(detectedChanges) > 0 {
368 r.measurementCh <- metrics.MeasurementEnvelope{
369 DBName: md.Name,
370 MetricName: "privilege_changes",
371 Data: detectedChanges,
372 CustomTags: md.CustomTags,
373 }
374 }
375
376 return changeCounts
377 }
378
379 func (r *Reaper) DetectConfigurationChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
380 detectedChanges := make(metrics.Measurements, 0)
381 var firstRun bool
382 var changeCounts ChangeDetectionResults
383 l := log.GetLogger(ctx)
384 changeCounts.Target = "settings"
385 l.Debug("checking for configuration changes...")
386 if _, ok := md.ChangeState["configuration_hashes"]; !ok {
387 firstRun = true
388 md.ChangeState["configuration_hashes"] = make(map[string]string)
389 }
390
391 mvp, ok := metricDefs.GetMetricDef("configuration_hashes")
392 if !ok {
393 l.Error("could not get configuration_hashes sql")
394 return changeCounts
395 }
396
397 rows, err := md.Conn.Query(ctx, mvp.GetSQL(md.Version))
398 if err != nil {
399 l.Error(err)
400 return changeCounts
401 }
402 defer rows.Close()
403 var (
404 objIdent, objValue string
405 epoch int64
406 )
407 for rows.Next() {
408 if rows.Scan(&epoch, &objIdent, &objValue) != nil {
409 return changeCounts
410 }
411 prevРash, ok := md.ChangeState["configuration_hashes"][objIdent]
412 ll := l.WithField("setting", objIdent)
413 if ok {
414 if prevРash != objValue {
415 ll.Warningf("settings change detected: %s = %s (prev: %s)", objIdent, objValue, prevРash)
416 detectedChanges = append(detectedChanges, metrics.Measurement{
417 metrics.EpochColumnName: epoch,
418 "tag_setting": objIdent,
419 "value": objValue,
420 "event": "alter"})
421 md.ChangeState["configuration_hashes"][objIdent] = objValue
422 changeCounts.Altered++
423 }
424 } else {
425 md.ChangeState["configuration_hashes"][objIdent] = objValue
426 if firstRun {
427 continue
428 }
429 ll.Debug("new setting detected")
430 detectedChanges = append(detectedChanges, metrics.Measurement{
431 metrics.EpochColumnName: epoch,
432 "tag_setting": objIdent,
433 "value": objValue,
434 "event": "create"})
435 changeCounts.Created++
436 }
437 }
438
439 l.Debugf("configuration changes detected: %d", len(detectedChanges))
440 if len(detectedChanges) > 0 {
441 r.measurementCh <- metrics.MeasurementEnvelope{
442 DBName: md.Name,
443 MetricName: "configuration_changes",
444 Data: detectedChanges,
445 CustomTags: md.CustomTags,
446 }
447 }
448 return changeCounts
449 }
450
451
452
453 func (r *Reaper) GetInstanceUpMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error) {
454 err := md.Conn.Ping(ctx)
455 return metrics.Measurements{
456 metrics.Measurement{
457 metrics.EpochColumnName: time.Now().UnixNano(),
458 "instance_up": func() int {
459 if err == nil {
460 return 1
461 }
462 return 0
463 }(),
464 },
465 }, err
466 }
467
468 func (r *Reaper) GetObjectChangesMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error) {
469 md.Lock()
470 defer md.Unlock()
471
472 spN := r.DetectSprocChanges(ctx, md)
473 tblN := r.DetectTableChanges(ctx, md)
474 idxN := r.DetectIndexChanges(ctx, md)
475 cnfN := r.DetectConfigurationChanges(ctx, md)
476 privN := r.DetectPrivilegeChanges(ctx, md)
477
478 if spN.Total()+tblN.Total()+idxN.Total()+cnfN.Total()+privN.Total() == 0 {
479 return nil, nil
480 }
481
482 m := metrics.NewMeasurement(time.Now().UnixNano())
483 m["details"] = strings.Join([]string{spN.String(), tblN.String(), idxN.String(), cnfN.String(), privN.String()}, " ")
484 return metrics.Measurements{m}, nil
485 }
486
487
488
489
490 func TryCreateMissingExtensions(ctx context.Context, md *sources.SourceConn, extensionNames []string, existingExtensions map[string]int) []string {
491
492 sqlAvailable := `select name::text from pg_available_extensions`
493 extsCreated := make([]string, 0)
494
495
496 data, err := QueryMeasurements(ctx, md, sqlAvailable)
497 if err != nil {
498 log.GetLogger(ctx).Infof("[%s] Failed to get a list of available extensions: %v", md, err)
499 return extsCreated
500 }
501
502 availableExts := make(map[string]bool)
503 for _, row := range data {
504 availableExts[row["name"].(string)] = true
505 }
506
507 for _, extToCreate := range extensionNames {
508 if _, ok := existingExtensions[extToCreate]; ok {
509 continue
510 }
511 _, ok := availableExts[extToCreate]
512 if !ok {
513 log.GetLogger(ctx).Errorf("[%s] Requested extension %s not available on instance, cannot try to create...", md, extToCreate)
514 } else {
515 sqlCreateExt := `create extension ` + extToCreate
516 _, err := QueryMeasurements(ctx, md, sqlCreateExt)
517 if err != nil {
518 log.GetLogger(ctx).Errorf("[%s] Failed to create extension %s (based on --try-create-listed-exts-if-missing input): %v", md, extToCreate, err)
519 }
520 extsCreated = append(extsCreated, extToCreate)
521 }
522 }
523
524 return extsCreated
525 }
526
527
528 func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.SourceConn) (err error) {
529 sl := log.GetLogger(ctx).WithField("source", md.Name)
530 metrics := maps.Clone(md.Metrics)
531 maps.Insert(metrics, maps.All(md.MetricsStandby))
532 for metricName := range metrics {
533 metric, ok := metricDefs.GetMetricDef(metricName)
534 if !ok {
535 continue
536 }
537 if _, err = md.Conn.Exec(ctx, metric.InitSQL); err != nil {
538 return
539 }
540 sl.WithField("metric", metricName).Info("Successfully created metric fetching helpers")
541 }
542 return
543 }
544
545 func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(shutDownDueToRoleChange map[string]bool) {
546 for _, prevDB := range r.prevLoopMonitoredDBs {
547 if r.monitoredSources.GetMonitoredDatabase(prevDB.Name) == nil {
548 prevDB.Conn.Close()
549 _ = r.SinksWriter.SyncMetric(prevDB.Name, "", sinks.DeleteOp)
550 }
551 }
552
553 for roleChangedDB := range shutDownDueToRoleChange {
554 if db := r.monitoredSources.GetMonitoredDatabase(roleChangedDB); db != nil {
555 db.Conn.Close()
556 }
557 _ = r.SinksWriter.SyncMetric(roleChangedDB, "", sinks.DeleteOp)
558 }
559 }
560