1 package reaper
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "strings"
8 "time"
9
10 "github.com/cybertec-postgresql/pgwatch/v5/internal/log"
11 "github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
12 "github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
13 "github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
14 "github.com/jackc/pgx/v5"
15 )
16
17 func QueryMeasurements(ctx context.Context, md *sources.SourceConn, sql string, args ...any) (metrics.Measurements, error) {
18 if strings.TrimSpace(sql) == "" {
19 return nil, errors.New("empty SQL")
20 }
21
22 if !md.IsPostgresSource() {
23 args = append([]any{pgx.QueryExecModeSimpleProtocol}, args...)
24 }
25
26 rows, err := md.Conn.Query(ctx, sql, args...)
27 if err == nil {
28 return pgx.CollectRows(rows, metrics.RowToMeasurement)
29 }
30 return nil, err
31 }
32
33 func (r *Reaper) DetectSprocChanges(ctx context.Context, md *sources.SourceConn) (changeCounts ChangeDetectionResults) {
34 detectedChanges := make(metrics.Measurements, 0)
35 var firstRun bool
36 l := log.GetLogger(ctx)
37 changeCounts.Target = "functions"
38 l.Debug("checking for sproc changes...")
39 if _, ok := md.ChangeState["sproc_hashes"]; !ok {
40 firstRun = true
41 md.ChangeState["sproc_hashes"] = make(map[string]string)
42 }
43 mvp, ok := metricDefs.GetMetricDef("sproc_hashes")
44 if !ok {
45 l.Error("could not get sproc_hashes sql")
46 return
47 }
48 data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
49 if err != nil {
50 l.Error(err)
51 return
52 }
53 for _, dr := range data {
54 objIdent := dr["tag_sproc"].(string) + dbMetricJoinStr + dr["tag_oid"].(string)
55 prevHash, ok := md.ChangeState["sproc_hashes"][objIdent]
56 ll := l.WithField("sproc", dr["tag_sproc"]).WithField("oid", dr["tag_oid"])
57 if ok {
58 if prevHash != dr["md5"].(string) {
59 ll.Debug("change detected")
60 dr["event"] = "alter"
61 detectedChanges = append(detectedChanges, dr)
62 md.ChangeState["sproc_hashes"][objIdent] = dr["md5"].(string)
63 changeCounts.Altered++
64 }
65 } else {
66 if !firstRun {
67 ll.Debug("new sproc detected")
68 dr["event"] = "create"
69 detectedChanges = append(detectedChanges, dr)
70 changeCounts.Created++
71 }
72 md.ChangeState["sproc_hashes"][objIdent] = dr["md5"].(string)
73 }
74 }
75
76 if !firstRun && len(md.ChangeState["sproc_hashes"]) != len(data) {
77
78 currentOidMap := make(map[string]bool)
79 for _, dr := range data {
80 currentOidMap[dr["tag_sproc"].(string)+dbMetricJoinStr+dr["tag_oid"].(string)] = true
81 }
82 for sprocIdent := range md.ChangeState["sproc_hashes"] {
83 _, ok := currentOidMap[sprocIdent]
84 if !ok {
85 splits := strings.Split(sprocIdent, dbMetricJoinStr)
86 l.WithField("sproc", splits[0]).WithField("oid", splits[1]).Debug("deleted sproc detected")
87 m := metrics.NewMeasurement(data.GetEpoch())
88 m["event"] = "drop"
89 m["tag_sproc"] = splits[0]
90 m["tag_oid"] = splits[1]
91 detectedChanges = append(detectedChanges, m)
92 delete(md.ChangeState["sproc_hashes"], sprocIdent)
93 changeCounts.Dropped++
94 }
95 }
96 }
97 l.Debugf("sproc changes detected: %d", len(detectedChanges))
98 if len(detectedChanges) > 0 {
99 r.measurementCh <- metrics.MeasurementEnvelope{
100 DBName: md.Name,
101 MetricName: "sproc_changes",
102 Data: detectedChanges,
103 CustomTags: md.CustomTags,
104 }
105 }
106 return changeCounts
107 }
108
109 func (r *Reaper) DetectTableChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
110 detectedChanges := make(metrics.Measurements, 0)
111 var firstRun bool
112 var changeCounts ChangeDetectionResults
113 l := log.GetLogger(ctx)
114 changeCounts.Target = "tables"
115 l.Debug("checking for table changes...")
116 if _, ok := md.ChangeState["table_hashes"]; !ok {
117 firstRun = true
118 md.ChangeState["table_hashes"] = make(map[string]string)
119 }
120 mvp, ok := metricDefs.GetMetricDef("table_hashes")
121 if !ok {
122 l.Error("could not get table_hashes sql")
123 return changeCounts
124 }
125 data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
126 if err != nil {
127 l.Error(err)
128 return changeCounts
129 }
130 for _, dr := range data {
131 objIdent := dr["tag_table"].(string)
132 prevHash, ok := md.ChangeState["table_hashes"][objIdent]
133 ll := l.WithField("table", dr["tag_table"])
134 if ok {
135 if prevHash != dr["md5"].(string) {
136 ll.Debug("change detected")
137 dr["event"] = "alter"
138 detectedChanges = append(detectedChanges, dr)
139 md.ChangeState["table_hashes"][objIdent] = dr["md5"].(string)
140 changeCounts.Altered++
141 }
142 } else {
143 if !firstRun {
144 ll.Debug("new table detected")
145 dr["event"] = "create"
146 detectedChanges = append(detectedChanges, dr)
147 changeCounts.Created++
148 }
149 md.ChangeState["table_hashes"][objIdent] = dr["md5"].(string)
150 }
151 }
152
153 if !firstRun && len(md.ChangeState["table_hashes"]) != len(data) {
154 deletedTables := make([]string, 0)
155
156 currentTableMap := make(map[string]bool)
157 for _, dr := range data {
158 currentTableMap[dr["tag_table"].(string)] = true
159 }
160 for table := range md.ChangeState["table_hashes"] {
161 _, ok := currentTableMap[table]
162 if !ok {
163 l.WithField("table", table).Debug("deleted table detected")
164 influxEntry := metrics.NewMeasurement(data.GetEpoch())
165 influxEntry["event"] = "drop"
166 influxEntry["tag_table"] = table
167 detectedChanges = append(detectedChanges, influxEntry)
168 deletedTables = append(deletedTables, table)
169 changeCounts.Dropped++
170 }
171 }
172 for _, deletedTable := range deletedTables {
173 delete(md.ChangeState["table_hashes"], deletedTable)
174 }
175 }
176 l.Debugf("table changes detected: %d", len(detectedChanges))
177 if len(detectedChanges) > 0 {
178 r.measurementCh <- metrics.MeasurementEnvelope{
179 DBName: md.Name,
180 MetricName: "table_changes",
181 Data: detectedChanges,
182 CustomTags: md.CustomTags,
183 }
184 }
185 return changeCounts
186 }
187
188 func (r *Reaper) DetectIndexChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
189 detectedChanges := make(metrics.Measurements, 0)
190 var firstRun bool
191 var changeCounts ChangeDetectionResults
192 l := log.GetLogger(ctx)
193 changeCounts.Target = "indexes"
194 l.Debug("checking for index changes...")
195 if _, ok := md.ChangeState["index_hashes"]; !ok {
196 firstRun = true
197 md.ChangeState["index_hashes"] = make(map[string]string)
198 }
199 mvp, ok := metricDefs.GetMetricDef("index_hashes")
200 if !ok {
201 l.Error("could not get index_hashes sql")
202 return changeCounts
203 }
204 data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
205 if err != nil {
206 l.Error(err)
207 return changeCounts
208 }
209 for _, dr := range data {
210 objIdent := dr["tag_index"].(string)
211 prevHash, ok := md.ChangeState["index_hashes"][objIdent]
212 ll := l.WithField("index", dr["tag_index"]).WithField("table", dr["table"])
213 if ok {
214 if prevHash != (dr["md5"].(string) + dr["is_valid"].(string)) {
215 ll.Debug("change detected")
216 dr["event"] = "alter"
217 detectedChanges = append(detectedChanges, dr)
218 md.ChangeState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
219 changeCounts.Altered++
220 }
221 } else {
222 if !firstRun {
223 ll.Debug("new index detected")
224 dr["event"] = "create"
225 detectedChanges = append(detectedChanges, dr)
226 changeCounts.Created++
227 }
228 md.ChangeState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
229 }
230 }
231
232 if !firstRun && len(md.ChangeState["index_hashes"]) != len(data) {
233 deletedIndexes := make([]string, 0)
234
235 currentIndexMap := make(map[string]bool)
236 for _, dr := range data {
237 currentIndexMap[dr["tag_index"].(string)] = true
238 }
239 for indexName := range md.ChangeState["index_hashes"] {
240 _, ok := currentIndexMap[indexName]
241 if !ok {
242 l.WithField("index", indexName).Debug("deleted index detected")
243 influxEntry := metrics.NewMeasurement(data.GetEpoch())
244 influxEntry["event"] = "drop"
245 influxEntry["tag_index"] = indexName
246 detectedChanges = append(detectedChanges, influxEntry)
247 deletedIndexes = append(deletedIndexes, indexName)
248 changeCounts.Dropped++
249 }
250 }
251 for _, deletedIndex := range deletedIndexes {
252 delete(md.ChangeState["index_hashes"], deletedIndex)
253 }
254 }
255 l.Debugf("index changes detected: %d", len(detectedChanges))
256 if len(detectedChanges) > 0 {
257 r.measurementCh <- metrics.MeasurementEnvelope{
258 DBName: md.Name,
259 MetricName: "index_changes",
260 Data: detectedChanges,
261 CustomTags: md.CustomTags,
262 }
263 }
264 return changeCounts
265 }
266
267 func (r *Reaper) DetectPrivilegeChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
268 detectedChanges := make(metrics.Measurements, 0)
269 var firstRun bool
270 var changeCounts ChangeDetectionResults
271 l := log.GetLogger(ctx)
272 changeCounts.Target = "privileges"
273 l.Debug("checking object privilege changes...")
274 if _, ok := md.ChangeState["object_privileges"]; !ok {
275 firstRun = true
276 md.ChangeState["object_privileges"] = make(map[string]string)
277 }
278 mvp, ok := metricDefs.GetMetricDef("privilege_changes")
279 if !ok || mvp.GetSQL(int(md.Version)) == "" {
280 l.Warning("could not get SQL for 'privilege_changes'. cannot detect privilege changes")
281 return changeCounts
282 }
283
284 data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
285 if err != nil {
286 l.Error(err)
287 return changeCounts
288 }
289 currentState := make(map[string]bool)
290 for _, dr := range data {
291 objIdent := fmt.Sprintf("%s#:#%s#:#%s#:#%s", dr["object_type"], dr["tag_role"], dr["tag_object"], dr["privilege_type"])
292 ll := l.WithField("role", dr["tag_role"]).
293 WithField("object_type", dr["object_type"]).
294 WithField("object", dr["tag_object"]).
295 WithField("privilege_type", dr["privilege_type"])
296 if firstRun {
297 md.ChangeState["object_privileges"][objIdent] = ""
298 } else {
299 _, ok := md.ChangeState["object_privileges"][objIdent]
300 if !ok {
301 ll.Debug("new object privileges detected")
302 dr["event"] = "GRANT"
303 detectedChanges = append(detectedChanges, dr)
304 changeCounts.Created++
305 md.ChangeState["object_privileges"][objIdent] = ""
306 }
307 currentState[objIdent] = true
308 }
309 }
310
311 if !firstRun && len(currentState) > 0 {
312 for objPrevRun := range md.ChangeState["object_privileges"] {
313 if _, ok := currentState[objPrevRun]; !ok {
314 splits := strings.Split(objPrevRun, "#:#")
315 l.WithField("role", splits[1]).
316 WithField("object_type", splits[0]).
317 WithField("object", splits[2]).
318 WithField("privilege_type", splits[3]).
319 Debug("removed object privileges detected")
320 revokeEntry := metrics.NewMeasurement(data.GetEpoch())
321 revokeEntry["object_type"] = splits[0]
322 revokeEntry["tag_role"] = splits[1]
323 revokeEntry["tag_object"] = splits[2]
324 revokeEntry["privilege_type"] = splits[3]
325 revokeEntry["event"] = "REVOKE"
326 detectedChanges = append(detectedChanges, revokeEntry)
327 changeCounts.Dropped++
328 delete(md.ChangeState["object_privileges"], objPrevRun)
329 }
330 }
331 }
332 l.Debugf("object privilege changes detected: %d", len(detectedChanges))
333 if len(detectedChanges) > 0 {
334 r.measurementCh <- metrics.MeasurementEnvelope{
335 DBName: md.Name,
336 MetricName: "privilege_changes",
337 Data: detectedChanges,
338 CustomTags: md.CustomTags,
339 }
340 }
341 return changeCounts
342 }
343
344 func (r *Reaper) DetectConfigurationChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
345 detectedChanges := make(metrics.Measurements, 0)
346 var firstRun bool
347 var changeCounts ChangeDetectionResults
348 l := log.GetLogger(ctx)
349 changeCounts.Target = "settings"
350 l.Debug("checking for configuration changes...")
351 if _, ok := md.ChangeState["configuration_hashes"]; !ok {
352 firstRun = true
353 md.ChangeState["configuration_hashes"] = make(map[string]string)
354 }
355 mvp, ok := metricDefs.GetMetricDef("configuration_hashes")
356 if !ok {
357 l.Error("could not get configuration_hashes sql")
358 return changeCounts
359 }
360 rows, err := md.Conn.Query(ctx, mvp.GetSQL(md.Version))
361 if err != nil {
362 l.Error(err)
363 return changeCounts
364 }
365 defer rows.Close()
366 var (
367 objIdent, objValue string
368 epoch int64
369 )
370 for rows.Next() {
371 if rows.Scan(&epoch, &objIdent, &objValue) != nil {
372 return changeCounts
373 }
374 prevРash, ok := md.ChangeState["configuration_hashes"][objIdent]
375 ll := l.WithField("setting", objIdent)
376 if ok {
377 if prevРash != objValue {
378 ll.Warningf("settings change detected: %s = %s (prev: %s)", objIdent, objValue, prevРash)
379 detectedChanges = append(detectedChanges, metrics.Measurement{
380 metrics.EpochColumnName: epoch,
381 "tag_setting": objIdent,
382 "value": objValue,
383 "event": "alter"})
384 md.ChangeState["configuration_hashes"][objIdent] = objValue
385 changeCounts.Altered++
386 }
387 } else {
388 md.ChangeState["configuration_hashes"][objIdent] = objValue
389 if firstRun {
390 continue
391 }
392 ll.Debug("new setting detected")
393 detectedChanges = append(detectedChanges, metrics.Measurement{
394 metrics.EpochColumnName: epoch,
395 "tag_setting": objIdent,
396 "value": objValue,
397 "event": "create"})
398 changeCounts.Created++
399 }
400 }
401 l.Debugf("configuration changes detected: %d", len(detectedChanges))
402 if len(detectedChanges) > 0 {
403 r.measurementCh <- metrics.MeasurementEnvelope{
404 DBName: md.Name,
405 MetricName: "configuration_changes",
406 Data: detectedChanges,
407 CustomTags: md.CustomTags,
408 }
409 }
410 return changeCounts
411 }
412
413
414
415 func (r *Reaper) GetInstanceUpMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error) {
416 return metrics.Measurements{
417 metrics.Measurement{
418 metrics.EpochColumnName: time.Now().UnixNano(),
419 "instance_up": func() int {
420 if md.Conn.Ping(ctx) == nil {
421 return 1
422 }
423 return 0
424 }(),
425 },
426 }, nil
427 }
428
429 func (r *Reaper) GetObjectChangesMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error) {
430 md.Lock()
431 defer md.Unlock()
432 spN := r.DetectSprocChanges(ctx, md)
433 tblN := r.DetectTableChanges(ctx, md)
434 idxN := r.DetectIndexChanges(ctx, md)
435 cnfN := r.DetectConfigurationChanges(ctx, md)
436 privN := r.DetectPrivilegeChanges(ctx, md)
437 if spN.Total()+tblN.Total()+idxN.Total()+cnfN.Total()+privN.Total() == 0 {
438 return nil, nil
439 }
440 m := metrics.NewMeasurement(time.Now().UnixNano())
441 m["details"] = strings.Join([]string{spN.String(), tblN.String(), idxN.String(), cnfN.String(), privN.String()}, " ")
442 return metrics.Measurements{m}, nil
443 }
444 func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(hostsToShutDown map[string]bool) {
445 for _, prevDB := range r.prevLoopMonitoredDBs {
446 if r.monitoredSources.GetMonitoredDatabase(prevDB.Name) == nil {
447 prevDB.Conn.Close()
448 _ = r.SinksWriter.SyncMetric(prevDB.Name, "", sinks.DeleteOp)
449 }
450 }
451 for toShutDownDB := range hostsToShutDown {
452 if db := r.monitoredSources.GetMonitoredDatabase(toShutDownDB); db != nil {
453 db.Conn.Close()
454 }
455 _ = r.SinksWriter.SyncMetric(toShutDownDB, "", sinks.DeleteOp)
456 }
457 }
458