1 package reaper
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "strings"
8 "time"
9
10 "github.com/cybertec-postgresql/pgwatch/v5/internal/log"
11 "github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
12 "github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
13 "github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
14 "github.com/jackc/pgx/v5"
15 )
16
17 func QueryMeasurements(ctx context.Context, md *sources.SourceConn, sql string, args ...any) (metrics.Measurements, error) {
18 if strings.TrimSpace(sql) == "" {
19 return nil, errors.New("empty SQL")
20 }
21
22
23 if !md.IsPostgresSource() {
24 args = append([]any{pgx.QueryExecModeSimpleProtocol}, args...)
25 }
26
27 rows, err := md.Conn.Query(ctx, sql, args...)
28 if err == nil {
29 return pgx.CollectRows(rows, metrics.RowToMeasurement)
30 }
31 return nil, err
32 }
33
34 func (r *Reaper) DetectSprocChanges(ctx context.Context, md *sources.SourceConn) (changeCounts ChangeDetectionResults) {
35 detectedChanges := make(metrics.Measurements, 0)
36 var firstRun bool
37 l := log.GetLogger(ctx)
38 changeCounts.Target = "functions"
39 l.Debug("checking for sproc changes...")
40 if _, ok := md.ChangeState["sproc_hashes"]; !ok {
41 firstRun = true
42 md.ChangeState["sproc_hashes"] = make(map[string]string)
43 }
44
45 mvp, ok := metricDefs.GetMetricDef("sproc_hashes")
46 if !ok {
47 l.Error("could not get sproc_hashes sql")
48 return
49 }
50
51 data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
52 if err != nil {
53 l.Error(err)
54 return
55 }
56
57 for _, dr := range data {
58 objIdent := dr["tag_sproc"].(string) + dbMetricJoinStr + dr["tag_oid"].(string)
59 prevHash, ok := md.ChangeState["sproc_hashes"][objIdent]
60 ll := l.WithField("sproc", dr["tag_sproc"]).WithField("oid", dr["tag_oid"])
61 if ok {
62 if prevHash != dr["md5"].(string) {
63 ll.Debug("change detected")
64 dr["event"] = "alter"
65 detectedChanges = append(detectedChanges, dr)
66 md.ChangeState["sproc_hashes"][objIdent] = dr["md5"].(string)
67 changeCounts.Altered++
68 }
69 } else {
70 if !firstRun {
71 ll.Debug("new sproc detected")
72 dr["event"] = "create"
73 detectedChanges = append(detectedChanges, dr)
74 changeCounts.Created++
75 }
76 md.ChangeState["sproc_hashes"][objIdent] = dr["md5"].(string)
77 }
78 }
79
80 if !firstRun && len(md.ChangeState["sproc_hashes"]) != len(data) {
81
82 currentOidMap := make(map[string]bool)
83 for _, dr := range data {
84 currentOidMap[dr["tag_sproc"].(string)+dbMetricJoinStr+dr["tag_oid"].(string)] = true
85 }
86 for sprocIdent := range md.ChangeState["sproc_hashes"] {
87 _, ok := currentOidMap[sprocIdent]
88 if !ok {
89 splits := strings.Split(sprocIdent, dbMetricJoinStr)
90 l.WithField("sproc", splits[0]).WithField("oid", splits[1]).Debug("deleted sproc detected")
91 m := metrics.NewMeasurement(data.GetEpoch())
92 m["event"] = "drop"
93 m["tag_sproc"] = splits[0]
94 m["tag_oid"] = splits[1]
95 detectedChanges = append(detectedChanges, m)
96 delete(md.ChangeState["sproc_hashes"], sprocIdent)
97 changeCounts.Dropped++
98 }
99 }
100 }
101 l.Debugf("sproc changes detected: %d", len(detectedChanges))
102 if len(detectedChanges) > 0 {
103 r.measurementCh <- metrics.MeasurementEnvelope{
104 DBName: md.Name,
105 MetricName: "sproc_changes",
106 Data: detectedChanges,
107 CustomTags: md.CustomTags,
108 }
109 }
110
111 return changeCounts
112 }
113
114 func (r *Reaper) DetectTableChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
115 detectedChanges := make(metrics.Measurements, 0)
116 var firstRun bool
117 var changeCounts ChangeDetectionResults
118 l := log.GetLogger(ctx)
119 changeCounts.Target = "tables"
120 l.Debug("checking for table changes...")
121 if _, ok := md.ChangeState["table_hashes"]; !ok {
122 firstRun = true
123 md.ChangeState["table_hashes"] = make(map[string]string)
124 }
125
126 mvp, ok := metricDefs.GetMetricDef("table_hashes")
127 if !ok {
128 l.Error("could not get table_hashes sql")
129 return changeCounts
130 }
131
132 data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
133 if err != nil {
134 l.Error(err)
135 return changeCounts
136 }
137
138 for _, dr := range data {
139 objIdent := dr["tag_table"].(string)
140 prevHash, ok := md.ChangeState["table_hashes"][objIdent]
141 ll := l.WithField("table", dr["tag_table"])
142 if ok {
143 if prevHash != dr["md5"].(string) {
144 ll.Debug("change detected")
145 dr["event"] = "alter"
146 detectedChanges = append(detectedChanges, dr)
147 md.ChangeState["table_hashes"][objIdent] = dr["md5"].(string)
148 changeCounts.Altered++
149 }
150 } else {
151 if !firstRun {
152 ll.Debug("new table detected")
153 dr["event"] = "create"
154 detectedChanges = append(detectedChanges, dr)
155 changeCounts.Created++
156 }
157 md.ChangeState["table_hashes"][objIdent] = dr["md5"].(string)
158 }
159 }
160
161 if !firstRun && len(md.ChangeState["table_hashes"]) != len(data) {
162 deletedTables := make([]string, 0)
163
164 currentTableMap := make(map[string]bool)
165 for _, dr := range data {
166 currentTableMap[dr["tag_table"].(string)] = true
167 }
168 for table := range md.ChangeState["table_hashes"] {
169 _, ok := currentTableMap[table]
170 if !ok {
171 l.WithField("table", table).Debug("deleted table detected")
172 influxEntry := metrics.NewMeasurement(data.GetEpoch())
173 influxEntry["event"] = "drop"
174 influxEntry["tag_table"] = table
175 detectedChanges = append(detectedChanges, influxEntry)
176 deletedTables = append(deletedTables, table)
177 changeCounts.Dropped++
178 }
179 }
180 for _, deletedTable := range deletedTables {
181 delete(md.ChangeState["table_hashes"], deletedTable)
182 }
183 }
184
185 l.Debugf("table changes detected: %d", len(detectedChanges))
186 if len(detectedChanges) > 0 {
187 r.measurementCh <- metrics.MeasurementEnvelope{
188 DBName: md.Name,
189 MetricName: "table_changes",
190 Data: detectedChanges,
191 CustomTags: md.CustomTags,
192 }
193 }
194
195 return changeCounts
196 }
197
198 func (r *Reaper) DetectIndexChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
199 detectedChanges := make(metrics.Measurements, 0)
200 var firstRun bool
201 var changeCounts ChangeDetectionResults
202 l := log.GetLogger(ctx)
203 changeCounts.Target = "indexes"
204 l.Debug("checking for index changes...")
205 if _, ok := md.ChangeState["index_hashes"]; !ok {
206 firstRun = true
207 md.ChangeState["index_hashes"] = make(map[string]string)
208 }
209
210 mvp, ok := metricDefs.GetMetricDef("index_hashes")
211 if !ok {
212 l.Error("could not get index_hashes sql")
213 return changeCounts
214 }
215
216 data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
217 if err != nil {
218 l.Error(err)
219 return changeCounts
220 }
221
222 for _, dr := range data {
223 objIdent := dr["tag_index"].(string)
224 prevHash, ok := md.ChangeState["index_hashes"][objIdent]
225 ll := l.WithField("index", dr["tag_index"]).WithField("table", dr["table"])
226 if ok {
227 if prevHash != (dr["md5"].(string) + dr["is_valid"].(string)) {
228 ll.Debug("change detected")
229 dr["event"] = "alter"
230 detectedChanges = append(detectedChanges, dr)
231 md.ChangeState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
232 changeCounts.Altered++
233 }
234 } else {
235 if !firstRun {
236 ll.Debug("new index detected")
237 dr["event"] = "create"
238 detectedChanges = append(detectedChanges, dr)
239 changeCounts.Created++
240 }
241 md.ChangeState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
242 }
243 }
244
245 if !firstRun && len(md.ChangeState["index_hashes"]) != len(data) {
246 deletedIndexes := make([]string, 0)
247
248 currentIndexMap := make(map[string]bool)
249 for _, dr := range data {
250 currentIndexMap[dr["tag_index"].(string)] = true
251 }
252 for indexName := range md.ChangeState["index_hashes"] {
253 _, ok := currentIndexMap[indexName]
254 if !ok {
255 l.WithField("index", indexName).Debug("deleted index detected")
256 influxEntry := metrics.NewMeasurement(data.GetEpoch())
257 influxEntry["event"] = "drop"
258 influxEntry["tag_index"] = indexName
259 detectedChanges = append(detectedChanges, influxEntry)
260 deletedIndexes = append(deletedIndexes, indexName)
261 changeCounts.Dropped++
262 }
263 }
264 for _, deletedIndex := range deletedIndexes {
265 delete(md.ChangeState["index_hashes"], deletedIndex)
266 }
267 }
268 l.Debugf("index changes detected: %d", len(detectedChanges))
269 if len(detectedChanges) > 0 {
270 r.measurementCh <- metrics.MeasurementEnvelope{
271 DBName: md.Name,
272 MetricName: "index_changes",
273 Data: detectedChanges,
274 CustomTags: md.CustomTags,
275 }
276 }
277
278 return changeCounts
279 }
280
281 func (r *Reaper) DetectPrivilegeChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
282 detectedChanges := make(metrics.Measurements, 0)
283 var firstRun bool
284 var changeCounts ChangeDetectionResults
285 l := log.GetLogger(ctx)
286 changeCounts.Target = "privileges"
287 l.Debug("checking object privilege changes...")
288 if _, ok := md.ChangeState["object_privileges"]; !ok {
289 firstRun = true
290 md.ChangeState["object_privileges"] = make(map[string]string)
291 }
292
293 mvp, ok := metricDefs.GetMetricDef("privilege_changes")
294 if !ok || mvp.GetSQL(int(md.Version)) == "" {
295 l.Warning("could not get SQL for 'privilege_changes'. cannot detect privilege changes")
296 return changeCounts
297 }
298
299
300 data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
301 if err != nil {
302 l.Error(err)
303 return changeCounts
304 }
305
306 currentState := make(map[string]bool)
307 for _, dr := range data {
308 objIdent := fmt.Sprintf("%s#:#%s#:#%s#:#%s", dr["object_type"], dr["tag_role"], dr["tag_object"], dr["privilege_type"])
309 ll := l.WithField("role", dr["tag_role"]).
310 WithField("object_type", dr["object_type"]).
311 WithField("object", dr["tag_object"]).
312 WithField("privilege_type", dr["privilege_type"])
313 if firstRun {
314 md.ChangeState["object_privileges"][objIdent] = ""
315 } else {
316 _, ok := md.ChangeState["object_privileges"][objIdent]
317 if !ok {
318 ll.Debug("new object privileges detected")
319 dr["event"] = "GRANT"
320 detectedChanges = append(detectedChanges, dr)
321 changeCounts.Created++
322 md.ChangeState["object_privileges"][objIdent] = ""
323 }
324 currentState[objIdent] = true
325 }
326 }
327
328 if !firstRun && len(currentState) > 0 {
329 for objPrevRun := range md.ChangeState["object_privileges"] {
330 if _, ok := currentState[objPrevRun]; !ok {
331 splits := strings.Split(objPrevRun, "#:#")
332 l.WithField("role", splits[1]).
333 WithField("object_type", splits[0]).
334 WithField("object", splits[2]).
335 WithField("privilege_type", splits[3]).
336 Debug("removed object privileges detected")
337 revokeEntry := metrics.NewMeasurement(data.GetEpoch())
338 revokeEntry["object_type"] = splits[0]
339 revokeEntry["tag_role"] = splits[1]
340 revokeEntry["tag_object"] = splits[2]
341 revokeEntry["privilege_type"] = splits[3]
342 revokeEntry["event"] = "REVOKE"
343 detectedChanges = append(detectedChanges, revokeEntry)
344 changeCounts.Dropped++
345 delete(md.ChangeState["object_privileges"], objPrevRun)
346 }
347 }
348 }
349
350 l.Debugf("object privilege changes detected: %d", len(detectedChanges))
351 if len(detectedChanges) > 0 {
352 r.measurementCh <- metrics.MeasurementEnvelope{
353 DBName: md.Name,
354 MetricName: "privilege_changes",
355 Data: detectedChanges,
356 CustomTags: md.CustomTags,
357 }
358 }
359
360 return changeCounts
361 }
362
363 func (r *Reaper) DetectConfigurationChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
364 detectedChanges := make(metrics.Measurements, 0)
365 var firstRun bool
366 var changeCounts ChangeDetectionResults
367 l := log.GetLogger(ctx)
368 changeCounts.Target = "settings"
369 l.Debug("checking for configuration changes...")
370 if _, ok := md.ChangeState["configuration_hashes"]; !ok {
371 firstRun = true
372 md.ChangeState["configuration_hashes"] = make(map[string]string)
373 }
374
375 mvp, ok := metricDefs.GetMetricDef("configuration_hashes")
376 if !ok {
377 l.Error("could not get configuration_hashes sql")
378 return changeCounts
379 }
380
381 rows, err := md.Conn.Query(ctx, mvp.GetSQL(md.Version))
382 if err != nil {
383 l.Error(err)
384 return changeCounts
385 }
386 defer rows.Close()
387 var (
388 objIdent, objValue string
389 epoch int64
390 )
391 for rows.Next() {
392 if rows.Scan(&epoch, &objIdent, &objValue) != nil {
393 return changeCounts
394 }
395 prevРash, ok := md.ChangeState["configuration_hashes"][objIdent]
396 ll := l.WithField("setting", objIdent)
397 if ok {
398 if prevРash != objValue {
399 ll.Warningf("settings change detected: %s = %s (prev: %s)", objIdent, objValue, prevРash)
400 detectedChanges = append(detectedChanges, metrics.Measurement{
401 metrics.EpochColumnName: epoch,
402 "tag_setting": objIdent,
403 "value": objValue,
404 "event": "alter"})
405 md.ChangeState["configuration_hashes"][objIdent] = objValue
406 changeCounts.Altered++
407 }
408 } else {
409 md.ChangeState["configuration_hashes"][objIdent] = objValue
410 if firstRun {
411 continue
412 }
413 ll.Debug("new setting detected")
414 detectedChanges = append(detectedChanges, metrics.Measurement{
415 metrics.EpochColumnName: epoch,
416 "tag_setting": objIdent,
417 "value": objValue,
418 "event": "create"})
419 changeCounts.Created++
420 }
421 }
422
423 l.Debugf("configuration changes detected: %d", len(detectedChanges))
424 if len(detectedChanges) > 0 {
425 r.measurementCh <- metrics.MeasurementEnvelope{
426 DBName: md.Name,
427 MetricName: "configuration_changes",
428 Data: detectedChanges,
429 CustomTags: md.CustomTags,
430 }
431 }
432 return changeCounts
433 }
434
435
436
437 func (r *Reaper) GetInstanceUpMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error) {
438 return metrics.Measurements{
439 metrics.Measurement{
440 metrics.EpochColumnName: time.Now().UnixNano(),
441 "instance_up": func() int {
442 if md.Conn.Ping(ctx) == nil {
443 return 1
444 }
445 return 0
446 }(),
447 },
448 }, nil
449 }
450
451 func (r *Reaper) GetObjectChangesMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error) {
452 md.Lock()
453 defer md.Unlock()
454
455 spN := r.DetectSprocChanges(ctx, md)
456 tblN := r.DetectTableChanges(ctx, md)
457 idxN := r.DetectIndexChanges(ctx, md)
458 cnfN := r.DetectConfigurationChanges(ctx, md)
459 privN := r.DetectPrivilegeChanges(ctx, md)
460
461 if spN.Total()+tblN.Total()+idxN.Total()+cnfN.Total()+privN.Total() == 0 {
462 return nil, nil
463 }
464
465 m := metrics.NewMeasurement(time.Now().UnixNano())
466 m["details"] = strings.Join([]string{spN.String(), tblN.String(), idxN.String(), cnfN.String(), privN.String()}, " ")
467 return metrics.Measurements{m}, nil
468 }
469
470 func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(hostsToShutDown map[string]bool) {
471 for _, prevDB := range r.prevLoopMonitoredDBs {
472 if r.monitoredSources.GetMonitoredDatabase(prevDB.Name) == nil {
473 prevDB.Conn.Close()
474 _ = r.SinksWriter.SyncMetric(prevDB.Name, "", sinks.DeleteOp)
475 }
476 }
477
478 for toShutDownDB := range hostsToShutDown {
479 if db := r.monitoredSources.GetMonitoredDatabase(toShutDownDB); db != nil {
480 db.Conn.Close()
481 }
482 _ = r.SinksWriter.SyncMetric(toShutDownDB, "", sinks.DeleteOp)
483 }
484 }
485