1 package reaper
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "maps"
8 "strings"
9 "time"
10
11 "github.com/cybertec-postgresql/pgwatch/v3/internal/log"
12 "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
13 "github.com/cybertec-postgresql/pgwatch/v3/internal/sinks"
14 "github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
15 "github.com/jackc/pgx/v5"
16 )
17
18 func QueryMeasurements(ctx context.Context, md *sources.SourceConn, sql string, args ...any) (metrics.Measurements, error) {
19 if strings.TrimSpace(sql) == "" {
20 return nil, errors.New("empty SQL")
21 }
22
23
24 if !md.IsPostgresSource() {
25 args = append([]any{pgx.QueryExecModeSimpleProtocol}, args...)
26 }
27
28 rows, err := md.Conn.Query(ctx, sql, args...)
29 if err == nil {
30 return pgx.CollectRows(rows, metrics.RowToMeasurement)
31 }
32 return nil, err
33 }
34
35 func (r *Reaper) DetectSprocChanges(ctx context.Context, md *sources.SourceConn) (changeCounts ChangeDetectionResults) {
36 detectedChanges := make(metrics.Measurements, 0)
37 var firstRun bool
38 l := log.GetLogger(ctx)
39 changeCounts.Target = "functions"
40 l.Debug("checking for sproc changes...")
41 if _, ok := md.ChangeState["sproc_hashes"]; !ok {
42 firstRun = true
43 md.ChangeState["sproc_hashes"] = make(map[string]string)
44 }
45
46 mvp, ok := metricDefs.GetMetricDef("sproc_hashes")
47 if !ok {
48 l.Error("could not get sproc_hashes sql")
49 return
50 }
51
52 data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
53 if err != nil {
54 l.Error(err)
55 return
56 }
57
58 for _, dr := range data {
59 objIdent := dr["tag_sproc"].(string) + dbMetricJoinStr + dr["tag_oid"].(string)
60 prevHash, ok := md.ChangeState["sproc_hashes"][objIdent]
61 ll := l.WithField("sproc", dr["tag_sproc"]).WithField("oid", dr["tag_oid"])
62 if ok {
63 if prevHash != dr["md5"].(string) {
64 ll.Debug("change detected")
65 dr["event"] = "alter"
66 detectedChanges = append(detectedChanges, dr)
67 md.ChangeState["sproc_hashes"][objIdent] = dr["md5"].(string)
68 changeCounts.Altered++
69 }
70 } else {
71 if !firstRun {
72 ll.Debug("new sproc detected")
73 dr["event"] = "create"
74 detectedChanges = append(detectedChanges, dr)
75 changeCounts.Created++
76 }
77 md.ChangeState["sproc_hashes"][objIdent] = dr["md5"].(string)
78 }
79 }
80
81 if !firstRun && len(md.ChangeState["sproc_hashes"]) != len(data) {
82
83 currentOidMap := make(map[string]bool)
84 for _, dr := range data {
85 currentOidMap[dr["tag_sproc"].(string)+dbMetricJoinStr+dr["tag_oid"].(string)] = true
86 }
87 for sprocIdent := range md.ChangeState["sproc_hashes"] {
88 _, ok := currentOidMap[sprocIdent]
89 if !ok {
90 splits := strings.Split(sprocIdent, dbMetricJoinStr)
91 l.WithField("sproc", splits[0]).WithField("oid", splits[1]).Debug("deleted sproc detected")
92 m := metrics.NewMeasurement(data.GetEpoch())
93 m["event"] = "drop"
94 m["tag_sproc"] = splits[0]
95 m["tag_oid"] = splits[1]
96 detectedChanges = append(detectedChanges, m)
97 delete(md.ChangeState["sproc_hashes"], sprocIdent)
98 changeCounts.Dropped++
99 }
100 }
101 }
102 l.Debugf("sproc changes detected: %d", len(detectedChanges))
103 if len(detectedChanges) > 0 {
104 r.measurementCh <- metrics.MeasurementEnvelope{
105 DBName: md.Name,
106 MetricName: "sproc_changes",
107 Data: detectedChanges,
108 CustomTags: md.CustomTags,
109 }
110 }
111
112 return changeCounts
113 }
114
115 func (r *Reaper) DetectTableChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
116 detectedChanges := make(metrics.Measurements, 0)
117 var firstRun bool
118 var changeCounts ChangeDetectionResults
119 l := log.GetLogger(ctx)
120 changeCounts.Target = "tables"
121 l.Debug("checking for table changes...")
122 if _, ok := md.ChangeState["table_hashes"]; !ok {
123 firstRun = true
124 md.ChangeState["table_hashes"] = make(map[string]string)
125 }
126
127 mvp, ok := metricDefs.GetMetricDef("table_hashes")
128 if !ok {
129 l.Error("could not get table_hashes sql")
130 return changeCounts
131 }
132
133 data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
134 if err != nil {
135 l.Error(err)
136 return changeCounts
137 }
138
139 for _, dr := range data {
140 objIdent := dr["tag_table"].(string)
141 prevHash, ok := md.ChangeState["table_hashes"][objIdent]
142 ll := l.WithField("table", dr["tag_table"])
143 if ok {
144 if prevHash != dr["md5"].(string) {
145 ll.Debug("change detected")
146 dr["event"] = "alter"
147 detectedChanges = append(detectedChanges, dr)
148 md.ChangeState["table_hashes"][objIdent] = dr["md5"].(string)
149 changeCounts.Altered++
150 }
151 } else {
152 if !firstRun {
153 ll.Debug("new table detected")
154 dr["event"] = "create"
155 detectedChanges = append(detectedChanges, dr)
156 changeCounts.Created++
157 }
158 md.ChangeState["table_hashes"][objIdent] = dr["md5"].(string)
159 }
160 }
161
162 if !firstRun && len(md.ChangeState["table_hashes"]) != len(data) {
163 deletedTables := make([]string, 0)
164
165 currentTableMap := make(map[string]bool)
166 for _, dr := range data {
167 currentTableMap[dr["tag_table"].(string)] = true
168 }
169 for table := range md.ChangeState["table_hashes"] {
170 _, ok := currentTableMap[table]
171 if !ok {
172 l.WithField("table", table).Debug("deleted table detected")
173 influxEntry := metrics.NewMeasurement(data.GetEpoch())
174 influxEntry["event"] = "drop"
175 influxEntry["tag_table"] = table
176 detectedChanges = append(detectedChanges, influxEntry)
177 deletedTables = append(deletedTables, table)
178 changeCounts.Dropped++
179 }
180 }
181 for _, deletedTable := range deletedTables {
182 delete(md.ChangeState["table_hashes"], deletedTable)
183 }
184 }
185
186 l.Debugf("table changes detected: %d", len(detectedChanges))
187 if len(detectedChanges) > 0 {
188 r.measurementCh <- metrics.MeasurementEnvelope{
189 DBName: md.Name,
190 MetricName: "table_changes",
191 Data: detectedChanges,
192 CustomTags: md.CustomTags,
193 }
194 }
195
196 return changeCounts
197 }
198
199 func (r *Reaper) DetectIndexChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
200 detectedChanges := make(metrics.Measurements, 0)
201 var firstRun bool
202 var changeCounts ChangeDetectionResults
203 l := log.GetLogger(ctx)
204 changeCounts.Target = "indexes"
205 l.Debug("checking for index changes...")
206 if _, ok := md.ChangeState["index_hashes"]; !ok {
207 firstRun = true
208 md.ChangeState["index_hashes"] = make(map[string]string)
209 }
210
211 mvp, ok := metricDefs.GetMetricDef("index_hashes")
212 if !ok {
213 l.Error("could not get index_hashes sql")
214 return changeCounts
215 }
216
217 data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
218 if err != nil {
219 l.Error(err)
220 return changeCounts
221 }
222
223 for _, dr := range data {
224 objIdent := dr["tag_index"].(string)
225 prevHash, ok := md.ChangeState["index_hashes"][objIdent]
226 ll := l.WithField("index", dr["tag_index"]).WithField("table", dr["table"])
227 if ok {
228 if prevHash != (dr["md5"].(string) + dr["is_valid"].(string)) {
229 ll.Debug("change detected")
230 dr["event"] = "alter"
231 detectedChanges = append(detectedChanges, dr)
232 md.ChangeState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
233 changeCounts.Altered++
234 }
235 } else {
236 if !firstRun {
237 ll.Debug("new index detected")
238 dr["event"] = "create"
239 detectedChanges = append(detectedChanges, dr)
240 changeCounts.Created++
241 }
242 md.ChangeState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
243 }
244 }
245
246 if !firstRun && len(md.ChangeState["index_hashes"]) != len(data) {
247 deletedIndexes := make([]string, 0)
248
249 currentIndexMap := make(map[string]bool)
250 for _, dr := range data {
251 currentIndexMap[dr["tag_index"].(string)] = true
252 }
253 for indexName := range md.ChangeState["index_hashes"] {
254 _, ok := currentIndexMap[indexName]
255 if !ok {
256 l.WithField("index", indexName).Debug("deleted index detected")
257 influxEntry := metrics.NewMeasurement(data.GetEpoch())
258 influxEntry["event"] = "drop"
259 influxEntry["tag_index"] = indexName
260 detectedChanges = append(detectedChanges, influxEntry)
261 deletedIndexes = append(deletedIndexes, indexName)
262 changeCounts.Dropped++
263 }
264 }
265 for _, deletedIndex := range deletedIndexes {
266 delete(md.ChangeState["index_hashes"], deletedIndex)
267 }
268 }
269 l.Debugf("index changes detected: %d", len(detectedChanges))
270 if len(detectedChanges) > 0 {
271 r.measurementCh <- metrics.MeasurementEnvelope{
272 DBName: md.Name,
273 MetricName: "index_changes",
274 Data: detectedChanges,
275 CustomTags: md.CustomTags,
276 }
277 }
278
279 return changeCounts
280 }
281
282 func (r *Reaper) DetectPrivilegeChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
283 detectedChanges := make(metrics.Measurements, 0)
284 var firstRun bool
285 var changeCounts ChangeDetectionResults
286 l := log.GetLogger(ctx)
287 changeCounts.Target = "privileges"
288 l.Debug("checking object privilege changes...")
289 if _, ok := md.ChangeState["object_privileges"]; !ok {
290 firstRun = true
291 md.ChangeState["object_privileges"] = make(map[string]string)
292 }
293
294 mvp, ok := metricDefs.GetMetricDef("privilege_changes")
295 if !ok || mvp.GetSQL(int(md.Version)) == "" {
296 l.Warning("could not get SQL for 'privilege_changes'. cannot detect privilege changes")
297 return changeCounts
298 }
299
300
301 data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
302 if err != nil {
303 l.Error(err)
304 return changeCounts
305 }
306
307 currentState := make(map[string]bool)
308 for _, dr := range data {
309 objIdent := fmt.Sprintf("%s#:#%s#:#%s#:#%s", dr["object_type"], dr["tag_role"], dr["tag_object"], dr["privilege_type"])
310 ll := l.WithField("role", dr["tag_role"]).
311 WithField("object_type", dr["object_type"]).
312 WithField("object", dr["tag_object"]).
313 WithField("privilege_type", dr["privilege_type"])
314 if firstRun {
315 md.ChangeState["object_privileges"][objIdent] = ""
316 } else {
317 _, ok := md.ChangeState["object_privileges"][objIdent]
318 if !ok {
319 ll.Debug("new object privileges detected")
320 dr["event"] = "GRANT"
321 detectedChanges = append(detectedChanges, dr)
322 changeCounts.Created++
323 md.ChangeState["object_privileges"][objIdent] = ""
324 }
325 currentState[objIdent] = true
326 }
327 }
328
329 if !firstRun && len(currentState) > 0 {
330 for objPrevRun := range md.ChangeState["object_privileges"] {
331 if _, ok := currentState[objPrevRun]; !ok {
332 splits := strings.Split(objPrevRun, "#:#")
333 l.WithField("role", splits[1]).
334 WithField("object_type", splits[0]).
335 WithField("object", splits[2]).
336 WithField("privilege_type", splits[3]).
337 Debug("removed object privileges detected")
338 revokeEntry := metrics.NewMeasurement(data.GetEpoch())
339 revokeEntry["object_type"] = splits[0]
340 revokeEntry["tag_role"] = splits[1]
341 revokeEntry["tag_object"] = splits[2]
342 revokeEntry["privilege_type"] = splits[3]
343 revokeEntry["event"] = "REVOKE"
344 detectedChanges = append(detectedChanges, revokeEntry)
345 changeCounts.Dropped++
346 delete(md.ChangeState["object_privileges"], objPrevRun)
347 }
348 }
349 }
350
351 l.Debugf("object privilege changes detected: %d", len(detectedChanges))
352 if len(detectedChanges) > 0 {
353 r.measurementCh <- metrics.MeasurementEnvelope{
354 DBName: md.Name,
355 MetricName: "privilege_changes",
356 Data: detectedChanges,
357 CustomTags: md.CustomTags,
358 }
359 }
360
361 return changeCounts
362 }
363
364 func (r *Reaper) DetectConfigurationChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
365 detectedChanges := make(metrics.Measurements, 0)
366 var firstRun bool
367 var changeCounts ChangeDetectionResults
368 l := log.GetLogger(ctx)
369 changeCounts.Target = "settings"
370 l.Debug("checking for configuration changes...")
371 if _, ok := md.ChangeState["configuration_hashes"]; !ok {
372 firstRun = true
373 md.ChangeState["configuration_hashes"] = make(map[string]string)
374 }
375
376 mvp, ok := metricDefs.GetMetricDef("configuration_hashes")
377 if !ok {
378 l.Error("could not get configuration_hashes sql")
379 return changeCounts
380 }
381
382 rows, err := md.Conn.Query(ctx, mvp.GetSQL(md.Version))
383 if err != nil {
384 l.Error(err)
385 return changeCounts
386 }
387 defer rows.Close()
388 var (
389 objIdent, objValue string
390 epoch int64
391 )
392 for rows.Next() {
393 if rows.Scan(&epoch, &objIdent, &objValue) != nil {
394 return changeCounts
395 }
396 prevРash, ok := md.ChangeState["configuration_hashes"][objIdent]
397 ll := l.WithField("setting", objIdent)
398 if ok {
399 if prevРash != objValue {
400 ll.Warningf("settings change detected: %s = %s (prev: %s)", objIdent, objValue, prevРash)
401 detectedChanges = append(detectedChanges, metrics.Measurement{
402 metrics.EpochColumnName: epoch,
403 "tag_setting": objIdent,
404 "value": objValue,
405 "event": "alter"})
406 md.ChangeState["configuration_hashes"][objIdent] = objValue
407 changeCounts.Altered++
408 }
409 } else {
410 md.ChangeState["configuration_hashes"][objIdent] = objValue
411 if firstRun {
412 continue
413 }
414 ll.Debug("new setting detected")
415 detectedChanges = append(detectedChanges, metrics.Measurement{
416 metrics.EpochColumnName: epoch,
417 "tag_setting": objIdent,
418 "value": objValue,
419 "event": "create"})
420 changeCounts.Created++
421 }
422 }
423
424 l.Debugf("configuration changes detected: %d", len(detectedChanges))
425 if len(detectedChanges) > 0 {
426 r.measurementCh <- metrics.MeasurementEnvelope{
427 DBName: md.Name,
428 MetricName: "configuration_changes",
429 Data: detectedChanges,
430 CustomTags: md.CustomTags,
431 }
432 }
433 return changeCounts
434 }
435
436
437
438 func (r *Reaper) GetInstanceUpMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error) {
439 err := md.Conn.Ping(ctx)
440 return metrics.Measurements{
441 metrics.Measurement{
442 metrics.EpochColumnName: time.Now().UnixNano(),
443 "instance_up": func() int {
444 if err == nil {
445 return 1
446 }
447 return 0
448 }(),
449 },
450 }, err
451 }
452
453 func (r *Reaper) GetObjectChangesMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error) {
454 md.Lock()
455 defer md.Unlock()
456
457 spN := r.DetectSprocChanges(ctx, md)
458 tblN := r.DetectTableChanges(ctx, md)
459 idxN := r.DetectIndexChanges(ctx, md)
460 cnfN := r.DetectConfigurationChanges(ctx, md)
461 privN := r.DetectPrivilegeChanges(ctx, md)
462
463 if spN.Total()+tblN.Total()+idxN.Total()+cnfN.Total()+privN.Total() == 0 {
464 return nil, nil
465 }
466
467 m := metrics.NewMeasurement(time.Now().UnixNano())
468 m["details"] = strings.Join([]string{spN.String(), tblN.String(), idxN.String(), cnfN.String(), privN.String()}, " ")
469 return metrics.Measurements{m}, nil
470 }
471
472
473
474
475 func TryCreateMissingExtensions(ctx context.Context, md *sources.SourceConn, extensionNames []string, existingExtensions map[string]int) []string {
476
477 sqlAvailable := `select name::text from pg_available_extensions`
478 extsCreated := make([]string, 0)
479
480
481 data, err := QueryMeasurements(ctx, md, sqlAvailable)
482 if err != nil {
483 log.GetLogger(ctx).Infof("[%s] Failed to get a list of available extensions: %v", md, err)
484 return extsCreated
485 }
486
487 availableExts := make(map[string]bool)
488 for _, row := range data {
489 availableExts[row["name"].(string)] = true
490 }
491
492 for _, extToCreate := range extensionNames {
493 if _, ok := existingExtensions[extToCreate]; ok {
494 continue
495 }
496 _, ok := availableExts[extToCreate]
497 if !ok {
498 log.GetLogger(ctx).Errorf("[%s] Requested extension %s not available on instance, cannot try to create...", md, extToCreate)
499 } else {
500 sqlCreateExt := `create extension ` + extToCreate
501 _, err := QueryMeasurements(ctx, md, sqlCreateExt)
502 if err != nil {
503 log.GetLogger(ctx).Errorf("[%s] Failed to create extension %s (based on --try-create-listed-exts-if-missing input): %v", md, extToCreate, err)
504 }
505 extsCreated = append(extsCreated, extToCreate)
506 }
507 }
508
509 return extsCreated
510 }
511
512
513 func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.SourceConn) (err error) {
514 sl := log.GetLogger(ctx).WithField("source", md.Name)
515 metrics := maps.Clone(md.Metrics)
516 maps.Insert(metrics, maps.All(md.MetricsStandby))
517 for metricName := range metrics {
518 metric, ok := metricDefs.GetMetricDef(metricName)
519 if !ok {
520 continue
521 }
522 if _, err = md.Conn.Exec(ctx, metric.InitSQL); err != nil {
523 return
524 }
525 sl.WithField("metric", metricName).Info("Successfully created metric fetching helpers")
526 }
527 return
528 }
529
530 func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(shutDownDueToRoleChange map[string]bool) {
531 for _, prevDB := range r.prevLoopMonitoredDBs {
532 if r.monitoredSources.GetMonitoredDatabase(prevDB.Name) == nil {
533 prevDB.Conn.Close()
534 _ = r.SinksWriter.SyncMetric(prevDB.Name, "", sinks.DeleteOp)
535 }
536 }
537
538 for roleChangedDB := range shutDownDueToRoleChange {
539 if db := r.monitoredSources.GetMonitoredDatabase(roleChangedDB); db != nil {
540 db.Conn.Close()
541 }
542 _ = r.SinksWriter.SyncMetric(roleChangedDB, "", sinks.DeleteOp)
543 }
544 }
545