package reaper

import (
	"context"
	"errors"
	"fmt"
	"maps"
	"strings"
	"time"

	"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
	"github.com/jackc/pgx/v5"
)
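// QueryMeasurements runs the given SQL against the monitored source identified
// by dbUnique and collects the result rows into metrics.Measurements.
// Postgres sources are queried inside a transaction with a 100ms lock_timeout;
// all other source types are queried over the simple query protocol.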
func QueryMeasurements(ctx context.Context, dbUnique string, sql string, args ...any) (metrics.Measurements, error) {
	var conn db.PgxIface
	var md *sources.SourceConn
	var err error
	var tx pgx.Tx
	if strings.TrimSpace(sql) == "" {
		return nil, errors.New("empty SQL")
	}
	if md, err = GetMonitoredDatabaseByUniqueName(ctx, dbUnique); err != nil {
		return nil, err
	}
	conn = md.Conn
	if md.IsPostgresSource() {
		// Run the query inside a transaction with a short lock_timeout so that
		// metric queries give up quickly instead of queueing behind locks.
		if tx, err = conn.Begin(ctx); err != nil {
			return nil, err
		}
		defer func() { _ = tx.Commit(ctx) }()
		_, err = tx.Exec(ctx, "SET LOCAL lock_timeout TO '100ms'")
		if err != nil {
			return nil, err
		}
		conn = tx
	} else {
		// Non-Postgres sources (e.g. connection poolers) may not support the
		// extended query protocol, so force the simple protocol.
		args = append([]any{pgx.QueryExecModeSimpleProtocol}, args...)
	}
	rows, err := conn.Query(ctx, sql, args...)
	if err == nil {
		return pgx.CollectRows(rows, metrics.RowToMeasurement)
	}
	return nil, err
}
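// DetectSprocChanges compares stored procedure definition hashes with the state
// from the previous run (kept in hostState), sends create/alter/drop events to
// storageCh and returns the change counts. The first run only primes the cache.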
func DetectSprocChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking for sproc changes...", md.Name, specialMetricChangeEvents)
	if _, ok := hostState["sproc_hashes"]; !ok {
		firstRun = true
		hostState["sproc_hashes"] = make(map[string]string)
	}

	mvp, ok := metricDefs.GetMetricDef("sproc_hashes")
	if !ok {
		log.GetLogger(ctx).Error("could not get sproc_hashes sql")
		return changeCounts
	}

	data, err := QueryMeasurements(ctx, md.Name, mvp.GetSQL(int(md.Version)))
	if err != nil {
		log.GetLogger(ctx).Error("could not read sproc_hashes from monitored host: ", md.Name, ", err:", err)
		return changeCounts
	}

	for _, dr := range data {
		objIdent := dr["tag_sproc"].(string) + dbMetricJoinStr + dr["tag_oid"].(string)
		prevHash, ok := hostState["sproc_hashes"][objIdent]
		if ok {
			if prevHash != dr["md5"].(string) {
				log.GetLogger(ctx).Info("detected change in sproc:", dr["tag_sproc"], ", oid:", dr["tag_oid"])
				dr["event"] = "alter"
				detectedChanges = append(detectedChanges, dr)
				hostState["sproc_hashes"][objIdent] = dr["md5"].(string)
				changeCounts.Altered++
			}
		} else {
			if !firstRun {
				log.GetLogger(ctx).Info("detected new sproc:", dr["tag_sproc"], ", oid:", dr["tag_oid"])
				dr["event"] = "create"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
			}
			hostState["sproc_hashes"][objIdent] = dr["md5"].(string)
		}
	}

	if !firstRun && len(hostState["sproc_hashes"]) != len(data) {
		deletedSProcs := make([]string, 0)
		// build a lookup of currently existing objects and diff it against the previous state to find drops
		currentOidMap := make(map[string]bool)
		for _, dr := range data {
			currentOidMap[dr["tag_sproc"].(string)+dbMetricJoinStr+dr["tag_oid"].(string)] = true
		}
		for sprocIdent := range hostState["sproc_hashes"] {
			_, ok := currentOidMap[sprocIdent]
			if !ok {
				splits := strings.Split(sprocIdent, dbMetricJoinStr)
				log.GetLogger(ctx).Info("detected delete of sproc:", splits[0], ", oid:", splits[1])
				influxEntry := metrics.NewMeasurement(data.GetEpoch())
				influxEntry["event"] = "drop"
				influxEntry["tag_sproc"] = splits[0]
				influxEntry["tag_oid"] = splits[1]
				detectedChanges = append(detectedChanges, influxEntry)
				deletedSProcs = append(deletedSProcs, sprocIdent)
				changeCounts.Dropped++
			}
		}
		for _, deletedSProc := range deletedSProcs {
			delete(hostState["sproc_hashes"], deletedSProc)
		}
	}
	log.GetLogger(ctx).Debugf("[%s][%s] detected %d sproc changes", md.Name, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		storageCh <- []metrics.MeasurementEnvelope{{DBName: md.Name, MetricName: "sproc_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
	}

	return changeCounts
}
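// DetectTableChanges compares table DDL hashes with the previous run's state in
// hostState, sends create/alter/drop events to storageCh and returns the change
// counts. The first run only primes the cache.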
func DetectTableChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking for table changes...", md.Name, specialMetricChangeEvents)
	if _, ok := hostState["table_hashes"]; !ok {
		firstRun = true
		hostState["table_hashes"] = make(map[string]string)
	}

	mvp, ok := metricDefs.GetMetricDef("table_hashes")
	if !ok {
		log.GetLogger(ctx).Error("could not get table_hashes sql")
		return changeCounts
	}

	data, err := QueryMeasurements(ctx, md.Name, mvp.GetSQL(int(md.Version)))
	if err != nil {
		log.GetLogger(ctx).Error("could not read table_hashes from monitored host:", md.Name, ", err:", err)
		return changeCounts
	}

	for _, dr := range data {
		objIdent := dr["tag_table"].(string)
		prevHash, ok := hostState["table_hashes"][objIdent]

		if ok {
			if prevHash != dr["md5"].(string) {
				log.GetLogger(ctx).Info("detected DDL change in table:", dr["tag_table"])
				dr["event"] = "alter"
				detectedChanges = append(detectedChanges, dr)
				hostState["table_hashes"][objIdent] = dr["md5"].(string)
				changeCounts.Altered++
			}
		} else {
			if !firstRun {
				log.GetLogger(ctx).Info("detected new table:", dr["tag_table"])
				dr["event"] = "create"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
			}
			hostState["table_hashes"][objIdent] = dr["md5"].(string)
		}
	}

	if !firstRun && len(hostState["table_hashes"]) != len(data) {
		deletedTables := make([]string, 0)

		currentTableMap := make(map[string]bool)
		for _, dr := range data {
			currentTableMap[dr["tag_table"].(string)] = true
		}
		for table := range hostState["table_hashes"] {
			_, ok := currentTableMap[table]
			if !ok {
				log.GetLogger(ctx).Info("detected drop of table:", table)
				influxEntry := metrics.NewMeasurement(data.GetEpoch())
				influxEntry["event"] = "drop"
				influxEntry["tag_table"] = table
				detectedChanges = append(detectedChanges, influxEntry)
				deletedTables = append(deletedTables, table)
				changeCounts.Dropped++
			}
		}
		for _, deletedTable := range deletedTables {
			delete(hostState["table_hashes"], deletedTable)
		}
	}

	log.GetLogger(ctx).Debugf("[%s][%s] detected %d table changes", md.Name, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		storageCh <- []metrics.MeasurementEnvelope{{DBName: md.Name, MetricName: "table_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
	}

	return changeCounts
}
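// DetectIndexChanges compares index definition hashes (including validity) with
// the previous run's state in hostState, sends create/alter/drop events to
// storageCh and returns the change counts. The first run only primes the cache.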
func DetectIndexChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking for index changes...", md.Name, specialMetricChangeEvents)
	if _, ok := hostState["index_hashes"]; !ok {
		firstRun = true
		hostState["index_hashes"] = make(map[string]string)
	}

	mvp, ok := metricDefs.GetMetricDef("index_hashes")
	if !ok {
		log.GetLogger(ctx).Error("could not get index_hashes sql")
		return changeCounts
	}

	data, err := QueryMeasurements(ctx, md.Name, mvp.GetSQL(int(md.Version)))
	if err != nil {
		log.GetLogger(ctx).Error("could not read index_hashes from monitored host:", md.Name, ", err:", err)
		return changeCounts
	}

	for _, dr := range data {
		objIdent := dr["tag_index"].(string)
		prevHash, ok := hostState["index_hashes"][objIdent]
		if ok {
			if prevHash != (dr["md5"].(string) + dr["is_valid"].(string)) {
				log.GetLogger(ctx).Info("detected index change:", dr["tag_index"], ", table:", dr["table"])
				dr["event"] = "alter"
				detectedChanges = append(detectedChanges, dr)
				hostState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
				changeCounts.Altered++
			}
		} else {
			if !firstRun {
				log.GetLogger(ctx).Info("detected new index:", dr["tag_index"])
				dr["event"] = "create"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
			}
			hostState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
		}
	}

	if !firstRun && len(hostState["index_hashes"]) != len(data) {
		deletedIndexes := make([]string, 0)

		currentIndexMap := make(map[string]bool)
		for _, dr := range data {
			currentIndexMap[dr["tag_index"].(string)] = true
		}
		for indexName := range hostState["index_hashes"] {
			_, ok := currentIndexMap[indexName]
			if !ok {
				log.GetLogger(ctx).Info("detected drop of index_name:", indexName)
				influxEntry := metrics.NewMeasurement(data.GetEpoch())
				influxEntry["event"] = "drop"
				influxEntry["tag_index"] = indexName
				detectedChanges = append(detectedChanges, influxEntry)
				deletedIndexes = append(deletedIndexes, indexName)
				changeCounts.Dropped++
			}
		}
		for _, deletedIndex := range deletedIndexes {
			delete(hostState["index_hashes"], deletedIndex)
		}
	}
	log.GetLogger(ctx).Debugf("[%s][%s] detected %d index changes", md.Name, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		storageCh <- []metrics.MeasurementEnvelope{{DBName: md.Name, MetricName: "index_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
	}

	return changeCounts
}
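// DetectPrivilegeChanges compares object privileges (role/object/privilege
// triplets) with the previous run's state in hostState, sends GRANT/REVOKE
// events to storageCh and returns the change counts.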
func DetectPrivilegeChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking object privilege changes...", md.Name, specialMetricChangeEvents)
	if _, ok := hostState["object_privileges"]; !ok {
		firstRun = true
		hostState["object_privileges"] = make(map[string]string)
	}

	mvp, ok := metricDefs.GetMetricDef("privilege_changes")
	if !ok || mvp.GetSQL(int(md.Version)) == "" {
		log.GetLogger(ctx).Warningf("[%s][%s] could not get SQL for 'privilege_changes'. cannot detect privilege changes", md.Name, specialMetricChangeEvents)
		return changeCounts
	}

	data, err := QueryMeasurements(ctx, md.Name, mvp.GetSQL(int(md.Version)))
	if err != nil {
		log.GetLogger(ctx).Errorf("[%s][%s] failed to fetch object privileges info: %v", md.Name, specialMetricChangeEvents, err)
		return changeCounts
	}

	currentState := make(map[string]bool)
	for _, dr := range data {
		objIdent := fmt.Sprintf("%s#:#%s#:#%s#:#%s", dr["object_type"], dr["tag_role"], dr["tag_object"], dr["privilege_type"])
		if firstRun {
			hostState["object_privileges"][objIdent] = ""
		} else {
			_, ok := hostState["object_privileges"][objIdent]
			if !ok {
				log.GetLogger(ctx).Infof("[%s][%s] detected new object privileges: role=%s, object_type=%s, object=%s, privilege_type=%s",
					md.Name, specialMetricChangeEvents, dr["tag_role"], dr["object_type"], dr["tag_object"], dr["privilege_type"])
				dr["event"] = "GRANT"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
				hostState["object_privileges"][objIdent] = ""
			}
			currentState[objIdent] = true
		}
	}

	if !firstRun && len(currentState) > 0 {
		for objPrevRun := range hostState["object_privileges"] {
			if _, ok := currentState[objPrevRun]; !ok {
				splits := strings.Split(objPrevRun, "#:#")
				log.GetLogger(ctx).Infof("[%s][%s] detected removed object privileges: role=%s, object_type=%s, object=%s, privilege_type=%s",
					md.Name, specialMetricChangeEvents, splits[1], splits[0], splits[2], splits[3])
				revokeEntry := metrics.NewMeasurement(data.GetEpoch())
				revokeEntry["object_type"] = splits[0]
				revokeEntry["tag_role"] = splits[1]
				revokeEntry["tag_object"] = splits[2]
				revokeEntry["privilege_type"] = splits[3]
				revokeEntry["event"] = "REVOKE"
				detectedChanges = append(detectedChanges, revokeEntry)
				changeCounts.Dropped++
				delete(hostState["object_privileges"], objPrevRun)
			}
		}
	}

	log.GetLogger(ctx).Debugf("[%s][%s] detected %d object privilege changes...", md.Name, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		storageCh <- []metrics.MeasurementEnvelope{
			{
				DBName:     md.Name,
				MetricName: "privilege_changes",
				Data:       detectedChanges,
				CustomTags: md.CustomTags,
			}}
	}

	return changeCounts
}
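// DetectConfigurationChanges compares server settings with the previous run's
// state in hostState, sends create/alter events to storageCh and returns the
// change counts.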
func DetectConfigurationChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking for configuration changes...", md.Name, specialMetricChangeEvents)
	if _, ok := hostState["configuration_hashes"]; !ok {
		firstRun = true
		hostState["configuration_hashes"] = make(map[string]string)
	}

	mvp, ok := metricDefs.GetMetricDef("configuration_hashes")
	if !ok {
		log.GetLogger(ctx).Errorf("[%s][%s] could not get configuration_hashes sql", md.Name, specialMetricChangeEvents)
		return changeCounts
	}

	data, err := QueryMeasurements(ctx, md.Name, mvp.GetSQL(int(md.Version)))
	if err != nil {
		log.GetLogger(ctx).Errorf("[%s][%s] could not read configuration_hashes from monitored host: %v", md.Name, specialMetricChangeEvents, err)
		return changeCounts
	}

	for _, dr := range data {
		objIdent := dr["tag_setting"].(string)
		objValue := dr["value"].(string)
		prevHash, ok := hostState["configuration_hashes"][objIdent]
		if ok {
			if prevHash != objValue {
				if objIdent == "connection_ID" {
					continue
				}
				log.GetLogger(ctx).Warningf("[%s][%s] detected settings change: %s = %s (prev: %s)",
					md.Name, specialMetricChangeEvents, objIdent, objValue, prevHash)
				dr["event"] = "alter"
				detectedChanges = append(detectedChanges, dr)
				hostState["configuration_hashes"][objIdent] = objValue
				changeCounts.Altered++
			}
		} else {
			if !firstRun {
				log.GetLogger(ctx).Warningf("[%s][%s] detected new setting: %s", md.Name, specialMetricChangeEvents, objIdent)
				dr["event"] = "create"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
			}
			hostState["configuration_hashes"][objIdent] = objValue
		}
	}

	log.GetLogger(ctx).Debugf("[%s][%s] detected %d configuration changes", md.Name, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		storageCh <- []metrics.MeasurementEnvelope{{
			DBName:     md.Name,
			MetricName: "configuration_changes",
			Data:       detectedChanges,
			CustomTags: md.CustomTags,
		}}
	}

	return changeCounts
}
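// CheckForPGObjectChangesAndStore runs all change detectors against a source
// and, if anything changed, logs and stores a human-readable summary as an
// "object_changes" measurement.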
func (r *Reaper) CheckForPGObjectChangesAndStore(ctx context.Context, dbUnique string, md *sources.SourceConn, hostState map[string]map[string]string) {
	storageCh := r.measurementCh
	sprocCounts := DetectSprocChanges(ctx, md, storageCh, hostState)
	tableCounts := DetectTableChanges(ctx, md, storageCh, hostState)
	indexCounts := DetectIndexChanges(ctx, md, storageCh, hostState)
	confCounts := DetectConfigurationChanges(ctx, md, storageCh, hostState)
	privChangeCounts := DetectPrivilegeChanges(ctx, md, storageCh, hostState)

	// assemble a human-readable summary of everything that changed in this run
	message := ""
	if sprocCounts.Altered > 0 || sprocCounts.Created > 0 || sprocCounts.Dropped > 0 {
		message += fmt.Sprintf(" sprocs %d/%d/%d", sprocCounts.Created, sprocCounts.Altered, sprocCounts.Dropped)
	}
	if tableCounts.Altered > 0 || tableCounts.Created > 0 || tableCounts.Dropped > 0 {
		message += fmt.Sprintf(" tables/views %d/%d/%d", tableCounts.Created, tableCounts.Altered, tableCounts.Dropped)
	}
	if indexCounts.Altered > 0 || indexCounts.Created > 0 || indexCounts.Dropped > 0 {
		message += fmt.Sprintf(" indexes %d/%d/%d", indexCounts.Created, indexCounts.Altered, indexCounts.Dropped)
	}
	if confCounts.Altered > 0 || confCounts.Created > 0 {
		message += fmt.Sprintf(" configuration %d/%d/%d", confCounts.Created, confCounts.Altered, confCounts.Dropped)
	}
	if privChangeCounts.Dropped > 0 || privChangeCounts.Created > 0 {
		message += fmt.Sprintf(" privileges %d/%d/%d", privChangeCounts.Created, privChangeCounts.Altered, privChangeCounts.Dropped)
	}

	if message != "" {
		message = "Detected changes for \"" + dbUnique + "\" [Created/Altered/Dropped]:" + message
		log.GetLogger(ctx).Info(message)
		detectedChangesSummary := make(metrics.Measurements, 0)
		influxEntry := metrics.NewMeasurement(time.Now().UnixNano())
		influxEntry["details"] = message
		detectedChangesSummary = append(detectedChangesSummary, influxEntry)
		storageCh <- []metrics.MeasurementEnvelope{{
			DBName:     dbUnique,
			SourceType: string(md.Kind),
			MetricName: "object_changes",
			Data:       detectedChangesSummary,
			CustomTags: md.CustomTags,
		}}
	}
}
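// TryCreateMissingExtensions attempts CREATE EXTENSION for every requested
// extension that is not yet installed but is available on the instance,
// returning the names of the extensions that were created.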
func TryCreateMissingExtensions(ctx context.Context, dbUnique string, extensionNames []string, existingExtensions map[string]int) []string {
	sqlAvailable := `select name::text from pg_available_extensions`
	extsCreated := make([]string, 0)

	// check which extensions are available on the instance at all
	data, err := QueryMeasurements(ctx, dbUnique, sqlAvailable)
	if err != nil {
		log.GetLogger(ctx).Infof("[%s] Failed to get a list of available extensions: %v", dbUnique, err)
		return extsCreated
	}

	availableExts := make(map[string]bool)
	for _, row := range data {
		availableExts[row["name"].(string)] = true
	}

	for _, extToCreate := range extensionNames {
		if _, ok := existingExtensions[extToCreate]; ok {
			continue
		}
		_, ok := availableExts[extToCreate]
		if !ok {
			log.GetLogger(ctx).Errorf("[%s] Requested extension %s not available on instance, cannot try to create...", dbUnique, extToCreate)
		} else {
			sqlCreateExt := `create extension ` + extToCreate
			if _, err := QueryMeasurements(ctx, dbUnique, sqlCreateExt); err != nil {
				log.GetLogger(ctx).Errorf("[%s] Failed to create extension %s (based on --try-create-listed-exts-if-missing input): %v", dbUnique, extToCreate, err)
				continue
			}
			extsCreated = append(extsCreated, extToCreate)
		}
	}

	return extsCreated
}
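// TryCreateMetricsFetchingHelpers executes the InitSQL of every metric
// configured for the source (including standby-only metrics) to set up the
// helper objects those metrics rely on.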
func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.SourceConn) (err error) {
	sl := log.GetLogger(ctx).WithField("source", md.Name)
	metricsToInit := maps.Clone(md.Metrics)
	maps.Insert(metricsToInit, maps.All(md.MetricsStandby))
	for metricName := range metricsToInit {
		metric, ok := metricDefs.GetMetricDef(metricName)
		if !ok {
			continue
		}
		if _, err = md.Conn.Exec(ctx, metric.InitSQL); err != nil {
			return
		}
		sl.WithField("metric", metricName).Info("Successfully created metric fetching helpers")
	}
	return
}
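// CloseResourcesForRemovedMonitoredDBs closes connections and removes sink
// entries for sources that disappeared since the previous loop or were shut
// down due to a role change.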
func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(shutDownDueToRoleChange map[string]bool) {
	for _, prevDB := range r.prevLoopMonitoredDBs {
		if r.monitoredSources.GetMonitoredDatabase(prevDB.Name) == nil {
			prevDB.Conn.Close()
			_ = r.SinksWriter.SyncMetric(prevDB.Name, "", "remove")
		}
	}

	for roleChangedDB := range shutDownDueToRoleChange {
		if db := r.monitoredSources.GetMonitoredDatabase(roleChangedDB); db != nil {
			db.Conn.Close()
		}
		_ = r.SinksWriter.SyncMetric(roleChangedDB, "", "remove")
	}
}