1 package reaper
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "math"
8 "regexp"
9 "strconv"
10 "strings"
11 "sync"
12 "time"
13
14 "github.com/cybertec-postgresql/pgwatch/v3/internal/db"
15 "github.com/cybertec-postgresql/pgwatch/v3/internal/log"
16 "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
17 "github.com/cybertec-postgresql/pgwatch/v3/internal/sinks"
18 "github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
19 "github.com/jackc/pgx/v5"
20 )
21
22 func QueryMeasurements(ctx context.Context, dbUnique string, sql string, args ...any) (metrics.Measurements, error) {
23 var conn db.PgxIface
24 var md *sources.SourceConn
25 var err error
26 var tx pgx.Tx
27 if strings.TrimSpace(sql) == "" {
28 return nil, errors.New("empty SQL")
29 }
30 if md, err = GetMonitoredDatabaseByUniqueName(dbUnique); err != nil {
31 return nil, err
32 }
33 conn = md.Conn
34 if md.IsPostgresSource() {
35
36 if tx, err = conn.Begin(ctx); err != nil {
37 return nil, err
38 }
39 defer func() { _ = tx.Commit(ctx) }()
40 _, err = tx.Exec(ctx, "SET LOCAL lock_timeout TO '100ms'")
41 if err != nil {
42 return nil, err
43 }
44 conn = tx
45 }
46 rows, err := conn.Query(ctx, sql, args...)
47 if err == nil {
48 return pgx.CollectRows(rows, pgx.RowToMap)
49 }
50 return nil, err
51 }
52
// Known execution environments (hosting platforms) of a monitored instance;
// the value is produced by platform discovery and checked e.g. when deciding
// how to determine the DB size.
const (
	execEnvUnknown       = "UNKNOWN"
	execEnvAzureSingle   = "AZURE_SINGLE"
	execEnvAzureFlexible = "AZURE_FLEXIBLE"
	execEnvGoogle        = "GOOGLE"
)
59
60 func DBGetSizeMB(ctx context.Context, dbUnique string) (int64, error) {
61 sqlDbSize := `select /* pgwatch_generated */ pg_database_size(current_database());`
62 var sizeMB int64
63
64 lastDBSizeCheckLock.RLock()
65 lastDBSizeCheckTime := lastDBSizeFetchTime[dbUnique]
66 lastDBSize, ok := lastDBSizeMB[dbUnique]
67 lastDBSizeCheckLock.RUnlock()
68
69 if !ok || lastDBSizeCheckTime.Add(dbSizeCachingInterval).Before(time.Now()) {
70 md, err := GetMonitoredDatabaseByUniqueName(dbUnique)
71 if err != nil {
72 return 0, err
73 }
74 ver, err := GetMonitoredDatabaseSettings(ctx, md, false)
75 if err != nil || (ver.ExecEnv != execEnvAzureSingle) || (ver.ExecEnv == execEnvAzureSingle && ver.ApproxDBSizeB < 1e12) {
76 log.GetLogger(ctx).Debugf("[%s] determining DB size ...", dbUnique)
77
78 data, err := QueryMeasurements(ctx, dbUnique, sqlDbSize)
79 if err != nil {
80 log.GetLogger(ctx).Errorf("[%s] failed to determine DB size...cannot apply --min-db-size-mb flag. err: %v ...", dbUnique, err)
81 return 0, err
82 }
83 sizeMB = data[0]["pg_database_size"].(int64) / 1048576
84 } else {
85 log.GetLogger(ctx).Debugf("[%s] Using approx DB size for the --min-db-size-mb filter ...", dbUnique)
86 sizeMB = ver.ApproxDBSizeB / 1048576
87 }
88
89 log.GetLogger(ctx).Debugf("[%s] DB size = %d MB, caching for %v ...", dbUnique, sizeMB, dbSizeCachingInterval)
90
91 lastDBSizeCheckLock.Lock()
92 lastDBSizeFetchTime[dbUnique] = time.Now()
93 lastDBSizeMB[dbUnique] = sizeMB
94 lastDBSizeCheckLock.Unlock()
95
96 return sizeMB, nil
97
98 }
99 log.GetLogger(ctx).Debugf("[%s] using cached DBsize %d MB for the --min-db-size-mb filter check", dbUnique, lastDBSize)
100 return lastDBSize, nil
101 }
102
103
104
105
106
// regVer captures up to three dotted numeric components of a version string.
var regVer = regexp.MustCompile(`(\d+).?(\d*).?(\d*)`)

// VersionToInt packs a textual version like "9.6.2" into a single sortable
// integer: major*10000 + minor*100 + patch (missing components count as 0).
// Returns 0 when no numeric component can be found at all.
func VersionToInt(version string) int {
	matches := regVer.FindStringSubmatch(version)
	if len(matches) < 2 {
		return 0
	}
	v, weight := 0, 10000
	for _, part := range matches[1:] {
		n, _ := strconv.Atoi(part) // empty capture -> 0
		v += n * weight
		weight /= 100
	}
	return v
}
117
// rBouncerAndPgpoolVerMatch extracts a "major.minor" version token from
// PgBouncer "SHOW VERSION" / PgPool "SHOW POOL_VERSION" output.
var rBouncerAndPgpoolVerMatch = regexp.MustCompile(`\d+\.+\d+`)
119
// GetMonitoredDatabaseSettings determines version, recovery status, execution
// environment and installed extensions of a monitored source, caching the
// result for ~2 minutes. With noCache=true the cache is bypassed and a fetch
// error is returned to the caller instead of falling back to the stale value.
func GetMonitoredDatabaseSettings(ctx context.Context, md *sources.SourceConn, noCache bool) (MonitoredDatabaseSettings, error) {
	var dbSettings MonitoredDatabaseSettings
	var dbNewSettings MonitoredDatabaseSettings
	var ok bool

	l := log.GetLogger(ctx).WithField("source", md.Name).WithField("kind", md.Kind)

	sqlExtensions := `select /* pgwatch_generated */ extname::text, (regexp_matches(extversion, $$\d+\.?\d+?$$))[1]::text as extversion from pg_extension order by 1;`

	// Look up (or lazily create) the per-source refresh lock and grab the
	// cached settings under the global map lock.
	MonitoredDatabasesSettingsLock.Lock()
	getVerLock, ok := MonitoredDatabasesSettingsGetLock[md.Name]
	if !ok {
		MonitoredDatabasesSettingsGetLock[md.Name] = &sync.RWMutex{}
		getVerLock = MonitoredDatabasesSettingsGetLock[md.Name]
	}
	dbSettings, ok = MonitoredDatabasesSettings[md.Name]
	MonitoredDatabasesSettingsLock.Unlock()

	// Serve from cache while it is younger than 2 minutes.
	if !noCache && ok && dbSettings.LastCheckedOn.After(time.Now().Add(time.Minute*-2)) {
		return dbSettings, nil
	}
	// Only one goroutine at a time refreshes a given source's settings.
	getVerLock.Lock()
	defer getVerLock.Unlock()
	l.Debug("determining DB version and recovery status...")

	if dbNewSettings.Extensions == nil {
		dbNewSettings.Extensions = make(map[string]int)
	}

	switch md.Kind {
	case sources.SourcePgBouncer:
		// PgBouncer is not a real Postgres; version comes from SHOW VERSION.
		if err := md.Conn.QueryRow(ctx, "SHOW VERSION").Scan(&dbNewSettings.VersionStr); err != nil {
			return dbNewSettings, err
		}
		matches := rBouncerAndPgpoolVerMatch.FindStringSubmatch(dbNewSettings.VersionStr)
		if len(matches) != 1 {
			return dbSettings, fmt.Errorf("unexpected PgBouncer version input: %s", dbNewSettings.VersionStr)
		}
		dbNewSettings.Version = VersionToInt(matches[0])
	case sources.SourcePgPool:
		if err := md.Conn.QueryRow(ctx, "SHOW POOL_VERSION").Scan(&dbNewSettings.VersionStr); err != nil {
			return dbNewSettings, err
		}

		matches := rBouncerAndPgpoolVerMatch.FindStringSubmatch(dbNewSettings.VersionStr)
		if len(matches) != 1 {
			return dbSettings, fmt.Errorf("unexpected PgPool version input: %s", dbNewSettings.VersionStr)
		}
		dbNewSettings.Version = VersionToInt(matches[0])
	default:
		// Regular Postgres source: fetch everything in a single round-trip.
		sql := `select /* pgwatch_generated */
	div(current_setting('server_version_num')::int, 10000) as ver,
	version(),
	pg_is_in_recovery(),
	current_database()::TEXT,
	system_identifier,
	current_setting('is_superuser')::bool
FROM
	pg_control_system()`

		err := md.Conn.QueryRow(ctx, sql).
			Scan(&dbNewSettings.Version, &dbNewSettings.VersionStr,
				&dbNewSettings.IsInRecovery, &dbNewSettings.RealDbname,
				&dbNewSettings.SystemIdentifier, &dbNewSettings.IsSuperuser)
		if err != nil {
			if noCache {
				return dbSettings, err
			}
			// Best-effort mode: keep serving the stale cached settings.
			l.Error("DBGetPGVersion failed, using old cached value: ", err)
			return dbSettings, nil
		}

		// Platform discovery is costly; reuse a previously detected value.
		if dbSettings.ExecEnv != "" {
			dbNewSettings.ExecEnv = dbSettings.ExecEnv
		} else {
			l.Debugf("determining the execution env...")
			dbNewSettings.ExecEnv = md.DiscoverPlatform(ctx)
		}

		// Azure Single Server: also cache an approximate size (keeping the old
		// value on error), used later to avoid slow pg_database_size() calls.
		if dbNewSettings.ExecEnv == execEnvAzureSingle {
			if approxSize, err := md.GetApproxSize(ctx); err == nil {
				dbNewSettings.ApproxDBSizeB = approxSize
			} else {
				dbNewSettings.ApproxDBSizeB = dbSettings.ApproxDBSizeB
			}
		}

		l.Debugf("[%s] determining installed extensions info...", md.Name)
		data, err := QueryMeasurements(ctx, md.Name, sqlExtensions)
		if err != nil {
			l.Errorf("[%s] failed to determine installed extensions info: %v", md.Name, err)
		} else {
			for _, dr := range data {
				extver := VersionToInt(dr["extversion"].(string))
				if extver == 0 {
					l.Error("failed to determine extension version info for extension: ", dr["extname"])
					continue
				}
				dbNewSettings.Extensions[dr["extname"].(string)] = extver
			}
			l.Debugf("[%s] installed extensions: %+v", md.Name, dbNewSettings.Extensions)
		}

	}

	dbNewSettings.LastCheckedOn = time.Now()
	MonitoredDatabasesSettingsLock.Lock()
	MonitoredDatabasesSettings[md.Name] = dbNewSettings
	MonitoredDatabasesSettingsLock.Unlock()

	return dbNewSettings, nil
}
233
234 func DetectSprocChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
235 detectedChanges := make(metrics.Measurements, 0)
236 var firstRun bool
237 var changeCounts ChangeDetectionResults
238
239 log.GetLogger(ctx).Debugf("[%s][%s] checking for sproc changes...", dbUnique, specialMetricChangeEvents)
240 if _, ok := hostState["sproc_hashes"]; !ok {
241 firstRun = true
242 hostState["sproc_hashes"] = make(map[string]string)
243 }
244
245 mvp, ok := metricDefs.GetMetricDef("sproc_hashes")
246 if !ok {
247 log.GetLogger(ctx).Error("could not get sproc_hashes sql")
248 return changeCounts
249 }
250
251 data, err := QueryMeasurements(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
252 if err != nil {
253 log.GetLogger(ctx).Error("could not read sproc_hashes from monitored host: ", dbUnique, ", err:", err)
254 return changeCounts
255 }
256
257 for _, dr := range data {
258 objIdent := dr["tag_sproc"].(string) + dbMetricJoinStr + dr["tag_oid"].(string)
259 prevHash, ok := hostState["sproc_hashes"][objIdent]
260 if ok {
261 if prevHash != dr["md5"].(string) {
262 log.GetLogger(ctx).Info("detected change in sproc:", dr["tag_sproc"], ", oid:", dr["tag_oid"])
263 dr["event"] = "alter"
264 detectedChanges = append(detectedChanges, dr)
265 hostState["sproc_hashes"][objIdent] = dr["md5"].(string)
266 changeCounts.Altered++
267 }
268 } else {
269 if !firstRun {
270 log.GetLogger(ctx).Info("detected new sproc:", dr["tag_sproc"], ", oid:", dr["tag_oid"])
271 dr["event"] = "create"
272 detectedChanges = append(detectedChanges, dr)
273 changeCounts.Created++
274 }
275 hostState["sproc_hashes"][objIdent] = dr["md5"].(string)
276 }
277 }
278
279 if !firstRun && len(hostState["sproc_hashes"]) != len(data) {
280 deletedSProcs := make([]string, 0)
281
282 currentOidMap := make(map[string]bool)
283 for _, dr := range data {
284 currentOidMap[dr["tag_sproc"].(string)+dbMetricJoinStr+dr["tag_oid"].(string)] = true
285 }
286 for sprocIdent := range hostState["sproc_hashes"] {
287 _, ok := currentOidMap[sprocIdent]
288 if !ok {
289 splits := strings.Split(sprocIdent, dbMetricJoinStr)
290 log.GetLogger(ctx).Info("detected delete of sproc:", splits[0], ", oid:", splits[1])
291 influxEntry := metrics.NewMeasurement(data.GetEpoch())
292 influxEntry["event"] = "drop"
293 influxEntry["tag_sproc"] = splits[0]
294 influxEntry["tag_oid"] = splits[1]
295 detectedChanges = append(detectedChanges, influxEntry)
296 deletedSProcs = append(deletedSProcs, sprocIdent)
297 changeCounts.Dropped++
298 }
299 }
300 for _, deletedSProc := range deletedSProcs {
301 delete(hostState["sproc_hashes"], deletedSProc)
302 }
303 }
304 log.GetLogger(ctx).Debugf("[%s][%s] detected %d sproc changes", dbUnique, specialMetricChangeEvents, len(detectedChanges))
305 if len(detectedChanges) > 0 {
306 md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
307 storageCh <- []metrics.MeasurementEnvelope{{DBName: dbUnique, MetricName: "sproc_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
308 }
309
310 return changeCounts
311 }
312
313 func DetectTableChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
314 detectedChanges := make(metrics.Measurements, 0)
315 var firstRun bool
316 var changeCounts ChangeDetectionResults
317
318 log.GetLogger(ctx).Debugf("[%s][%s] checking for table changes...", dbUnique, specialMetricChangeEvents)
319 if _, ok := hostState["table_hashes"]; !ok {
320 firstRun = true
321 hostState["table_hashes"] = make(map[string]string)
322 }
323
324 mvp, ok := metricDefs.GetMetricDef("table_hashes")
325 if !ok {
326 log.GetLogger(ctx).Error("could not get table_hashes sql")
327 return changeCounts
328 }
329
330 data, err := QueryMeasurements(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
331 if err != nil {
332 log.GetLogger(ctx).Error("could not read table_hashes from monitored host:", dbUnique, ", err:", err)
333 return changeCounts
334 }
335
336 for _, dr := range data {
337 objIdent := dr["tag_table"].(string)
338 prevHash, ok := hostState["table_hashes"][objIdent]
339
340 if ok {
341 if prevHash != dr["md5"].(string) {
342 log.GetLogger(ctx).Info("detected DDL change in table:", dr["tag_table"])
343 dr["event"] = "alter"
344 detectedChanges = append(detectedChanges, dr)
345 hostState["table_hashes"][objIdent] = dr["md5"].(string)
346 changeCounts.Altered++
347 }
348 } else {
349 if !firstRun {
350 log.GetLogger(ctx).Info("detected new table:", dr["tag_table"])
351 dr["event"] = "create"
352 detectedChanges = append(detectedChanges, dr)
353 changeCounts.Created++
354 }
355 hostState["table_hashes"][objIdent] = dr["md5"].(string)
356 }
357 }
358
359 if !firstRun && len(hostState["table_hashes"]) != len(data) {
360 deletedTables := make([]string, 0)
361
362 currentTableMap := make(map[string]bool)
363 for _, dr := range data {
364 currentTableMap[dr["tag_table"].(string)] = true
365 }
366 for table := range hostState["table_hashes"] {
367 _, ok := currentTableMap[table]
368 if !ok {
369 log.GetLogger(ctx).Info("detected drop of table:", table)
370 influxEntry := metrics.NewMeasurement(data.GetEpoch())
371 influxEntry["event"] = "drop"
372 influxEntry["tag_table"] = table
373 detectedChanges = append(detectedChanges, influxEntry)
374 deletedTables = append(deletedTables, table)
375 changeCounts.Dropped++
376 }
377 }
378 for _, deletedTable := range deletedTables {
379 delete(hostState["table_hashes"], deletedTable)
380 }
381 }
382
383 log.GetLogger(ctx).Debugf("[%s][%s] detected %d table changes", dbUnique, specialMetricChangeEvents, len(detectedChanges))
384 if len(detectedChanges) > 0 {
385 md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
386 storageCh <- []metrics.MeasurementEnvelope{{DBName: dbUnique, MetricName: "table_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
387 }
388
389 return changeCounts
390 }
391
392 func DetectIndexChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
393 detectedChanges := make(metrics.Measurements, 0)
394 var firstRun bool
395 var changeCounts ChangeDetectionResults
396
397 log.GetLogger(ctx).Debugf("[%s][%s] checking for index changes...", dbUnique, specialMetricChangeEvents)
398 if _, ok := hostState["index_hashes"]; !ok {
399 firstRun = true
400 hostState["index_hashes"] = make(map[string]string)
401 }
402
403 mvp, ok := metricDefs.GetMetricDef("index_hashes")
404 if !ok {
405 log.GetLogger(ctx).Error("could not get index_hashes sql")
406 return changeCounts
407 }
408
409 data, err := QueryMeasurements(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
410 if err != nil {
411 log.GetLogger(ctx).Error("could not read index_hashes from monitored host:", dbUnique, ", err:", err)
412 return changeCounts
413 }
414
415 for _, dr := range data {
416 objIdent := dr["tag_index"].(string)
417 prevHash, ok := hostState["index_hashes"][objIdent]
418 if ok {
419 if prevHash != (dr["md5"].(string) + dr["is_valid"].(string)) {
420 log.GetLogger(ctx).Info("detected index change:", dr["tag_index"], ", table:", dr["table"])
421 dr["event"] = "alter"
422 detectedChanges = append(detectedChanges, dr)
423 hostState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
424 changeCounts.Altered++
425 }
426 } else {
427 if !firstRun {
428 log.GetLogger(ctx).Info("detected new index:", dr["tag_index"])
429 dr["event"] = "create"
430 detectedChanges = append(detectedChanges, dr)
431 changeCounts.Created++
432 }
433 hostState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
434 }
435 }
436
437 if !firstRun && len(hostState["index_hashes"]) != len(data) {
438 deletedIndexes := make([]string, 0)
439
440 currentIndexMap := make(map[string]bool)
441 for _, dr := range data {
442 currentIndexMap[dr["tag_index"].(string)] = true
443 }
444 for indexName := range hostState["index_hashes"] {
445 _, ok := currentIndexMap[indexName]
446 if !ok {
447 log.GetLogger(ctx).Info("detected drop of index_name:", indexName)
448 influxEntry := metrics.NewMeasurement(data.GetEpoch())
449 influxEntry["event"] = "drop"
450 influxEntry["tag_index"] = indexName
451 detectedChanges = append(detectedChanges, influxEntry)
452 deletedIndexes = append(deletedIndexes, indexName)
453 changeCounts.Dropped++
454 }
455 }
456 for _, deletedIndex := range deletedIndexes {
457 delete(hostState["index_hashes"], deletedIndex)
458 }
459 }
460 log.GetLogger(ctx).Debugf("[%s][%s] detected %d index changes", dbUnique, specialMetricChangeEvents, len(detectedChanges))
461 if len(detectedChanges) > 0 {
462 md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
463 storageCh <- []metrics.MeasurementEnvelope{{DBName: dbUnique, MetricName: "index_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
464 }
465
466 return changeCounts
467 }
468
469 func DetectPrivilegeChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
470 detectedChanges := make(metrics.Measurements, 0)
471 var firstRun bool
472 var changeCounts ChangeDetectionResults
473
474 log.GetLogger(ctx).Debugf("[%s][%s] checking object privilege changes...", dbUnique, specialMetricChangeEvents)
475 if _, ok := hostState["object_privileges"]; !ok {
476 firstRun = true
477 hostState["object_privileges"] = make(map[string]string)
478 }
479
480 mvp, ok := metricDefs.GetMetricDef("privilege_changes")
481 if !ok || mvp.GetSQL(int(vme.Version)) == "" {
482 log.GetLogger(ctx).Warningf("[%s][%s] could not get SQL for 'privilege_changes'. cannot detect privilege changes", dbUnique, specialMetricChangeEvents)
483 return changeCounts
484 }
485
486
487 data, err := QueryMeasurements(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
488 if err != nil {
489 log.GetLogger(ctx).Errorf("[%s][%s] failed to fetch object privileges info: %v", dbUnique, specialMetricChangeEvents, err)
490 return changeCounts
491 }
492
493 currentState := make(map[string]bool)
494 for _, dr := range data {
495 objIdent := fmt.Sprintf("%s#:#%s#:#%s#:#%s", dr["object_type"], dr["tag_role"], dr["tag_object"], dr["privilege_type"])
496 if firstRun {
497 hostState["object_privileges"][objIdent] = ""
498 } else {
499 _, ok := hostState["object_privileges"][objIdent]
500 if !ok {
501 log.GetLogger(ctx).Infof("[%s][%s] detected new object privileges: role=%s, object_type=%s, object=%s, privilege_type=%s",
502 dbUnique, specialMetricChangeEvents, dr["tag_role"], dr["object_type"], dr["tag_object"], dr["privilege_type"])
503 dr["event"] = "GRANT"
504 detectedChanges = append(detectedChanges, dr)
505 changeCounts.Created++
506 hostState["object_privileges"][objIdent] = ""
507 }
508 currentState[objIdent] = true
509 }
510 }
511
512 if !firstRun && len(currentState) > 0 {
513 for objPrevRun := range hostState["object_privileges"] {
514 if _, ok := currentState[objPrevRun]; !ok {
515 splits := strings.Split(objPrevRun, "#:#")
516 log.GetLogger(ctx).Infof("[%s][%s] detected removed object privileges: role=%s, object_type=%s, object=%s, privilege_type=%s",
517 dbUnique, specialMetricChangeEvents, splits[1], splits[0], splits[2], splits[3])
518 revokeEntry := metrics.NewMeasurement(data.GetEpoch())
519 revokeEntry["object_type"] = splits[0]
520 revokeEntry["tag_role"] = splits[1]
521 revokeEntry["tag_object"] = splits[2]
522 revokeEntry["privilege_type"] = splits[3]
523 revokeEntry["event"] = "REVOKE"
524 detectedChanges = append(detectedChanges, revokeEntry)
525 changeCounts.Dropped++
526 delete(hostState["object_privileges"], objPrevRun)
527 }
528 }
529 }
530
531 log.GetLogger(ctx).Debugf("[%s][%s] detected %d object privilege changes...", dbUnique, specialMetricChangeEvents, len(detectedChanges))
532 if len(detectedChanges) > 0 {
533 md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
534 storageCh <- []metrics.MeasurementEnvelope{
535 {
536 DBName: dbUnique,
537 MetricName: "privilege_changes",
538 Data: detectedChanges,
539 CustomTags: md.CustomTags,
540 }}
541 }
542
543 return changeCounts
544 }
545
546 func DetectConfigurationChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
547 detectedChanges := make(metrics.Measurements, 0)
548 var firstRun bool
549 var changeCounts ChangeDetectionResults
550
551 log.GetLogger(ctx).Debugf("[%s][%s] checking for configuration changes...", dbUnique, specialMetricChangeEvents)
552 if _, ok := hostState["configuration_hashes"]; !ok {
553 firstRun = true
554 hostState["configuration_hashes"] = make(map[string]string)
555 }
556
557 mvp, ok := metricDefs.GetMetricDef("configuration_hashes")
558 if !ok {
559 log.GetLogger(ctx).Errorf("[%s][%s] could not get configuration_hashes sql", dbUnique, specialMetricChangeEvents)
560 return changeCounts
561 }
562
563 data, err := QueryMeasurements(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
564 if err != nil {
565 log.GetLogger(ctx).Errorf("[%s][%s] could not read configuration_hashes from monitored host: %v", dbUnique, specialMetricChangeEvents, err)
566 return changeCounts
567 }
568
569 for _, dr := range data {
570 objIdent := dr["tag_setting"].(string)
571 objValue := dr["value"].(string)
572 prevРash, ok := hostState["configuration_hashes"][objIdent]
573 if ok {
574 if prevРash != objValue {
575 if objIdent == "connection_ID" {
576 continue
577 }
578 log.GetLogger(ctx).Warningf("[%s][%s] detected settings change: %s = %s (prev: %s)",
579 dbUnique, specialMetricChangeEvents, objIdent, objValue, prevРash)
580 dr["event"] = "alter"
581 detectedChanges = append(detectedChanges, dr)
582 hostState["configuration_hashes"][objIdent] = objValue
583 changeCounts.Altered++
584 }
585 } else {
586 if !firstRun {
587 log.GetLogger(ctx).Warningf("[%s][%s] detected new setting: %s", dbUnique, specialMetricChangeEvents, objIdent)
588 dr["event"] = "create"
589 detectedChanges = append(detectedChanges, dr)
590 changeCounts.Created++
591 }
592 hostState["configuration_hashes"][objIdent] = objValue
593 }
594 }
595
596 log.GetLogger(ctx).Debugf("[%s][%s] detected %d configuration changes", dbUnique, specialMetricChangeEvents, len(detectedChanges))
597 if len(detectedChanges) > 0 {
598 md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
599 storageCh <- []metrics.MeasurementEnvelope{{
600 DBName: dbUnique,
601 MetricName: "configuration_changes",
602 Data: detectedChanges,
603 CustomTags: md.CustomTags,
604 }}
605 }
606
607 return changeCounts
608 }
609
610 func (r *Reaper) CheckForPGObjectChangesAndStore(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, hostState map[string]map[string]string) {
611 storageCh := r.measurementCh
612 sprocCounts := DetectSprocChanges(ctx, dbUnique, vme, storageCh, hostState)
613 tableCounts := DetectTableChanges(ctx, dbUnique, vme, storageCh, hostState)
614 indexCounts := DetectIndexChanges(ctx, dbUnique, vme, storageCh, hostState)
615 confCounts := DetectConfigurationChanges(ctx, dbUnique, vme, storageCh, hostState)
616 privChangeCounts := DetectPrivilegeChanges(ctx, dbUnique, vme, storageCh, hostState)
617
618
619 message := ""
620 if sprocCounts.Altered > 0 || sprocCounts.Created > 0 || sprocCounts.Dropped > 0 {
621 message += fmt.Sprintf(" sprocs %d/%d/%d", sprocCounts.Created, sprocCounts.Altered, sprocCounts.Dropped)
622 }
623 if tableCounts.Altered > 0 || tableCounts.Created > 0 || tableCounts.Dropped > 0 {
624 message += fmt.Sprintf(" tables/views %d/%d/%d", tableCounts.Created, tableCounts.Altered, tableCounts.Dropped)
625 }
626 if indexCounts.Altered > 0 || indexCounts.Created > 0 || indexCounts.Dropped > 0 {
627 message += fmt.Sprintf(" indexes %d/%d/%d", indexCounts.Created, indexCounts.Altered, indexCounts.Dropped)
628 }
629 if confCounts.Altered > 0 || confCounts.Created > 0 {
630 message += fmt.Sprintf(" configuration %d/%d/%d", confCounts.Created, confCounts.Altered, confCounts.Dropped)
631 }
632 if privChangeCounts.Dropped > 0 || privChangeCounts.Created > 0 {
633 message += fmt.Sprintf(" privileges %d/%d/%d", privChangeCounts.Created, privChangeCounts.Altered, privChangeCounts.Dropped)
634 }
635
636 if message > "" {
637 message = "Detected changes for \"" + dbUnique + "\" [Created/Altered/Dropped]:" + message
638 log.GetLogger(ctx).Info(message)
639 detectedChangesSummary := make(metrics.Measurements, 0)
640 influxEntry := metrics.NewMeasurement(time.Now().UnixNano())
641 influxEntry["details"] = message
642 detectedChangesSummary = append(detectedChangesSummary, influxEntry)
643 md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
644 storageCh <- []metrics.MeasurementEnvelope{{DBName: dbUnique,
645 SourceType: string(md.Kind),
646 MetricName: "object_changes",
647 Data: detectedChangesSummary,
648 CustomTags: md.CustomTags,
649 }}
650
651 }
652 }
653
654
// FetchMetricsPgpool executes the PgPool-specific SHOW commands found in the
// metric definition (one command per line; SHOW POOL_NODES must precede SHOW
// POOL_PROCESSES) and converts the raw textual output into typed measurements.
func FetchMetricsPgpool(ctx context.Context, msg MetricFetchConfig, vme MonitoredDatabaseSettings, mvp metrics.Metric) (metrics.Measurements, error) {
	var retData = make(metrics.Measurements, 0)
	epochNs := time.Now().UnixNano()

	// PgPool "SQL" is really a list of SHOW commands, one per line.
	sqlLines := strings.Split(strings.ToUpper(mvp.GetSQL(int(vme.Version))), "\n")

	for _, sql := range sqlLines {
		if strings.HasPrefix(sql, "SHOW POOL_NODES") {
			data, err := QueryMeasurements(ctx, msg.DBUniqueName, sql)
			if err != nil {
				log.GetLogger(ctx).Errorf("[%s][%s] Could not fetch PgPool statistics: %v", msg.DBUniqueName, msg.MetricName, err)
				return data, err
			}

			for _, row := range data {
				retRow := metrics.NewMeasurement(epochNs)
				for k, v := range row {
					// PgPool returns columns as raw bytes; start from the string form.
					vs := string(v.([]byte))

					if k == "node_id" {
						retRow["tag_node_id"] = vs
						continue
					}

					retRow[k] = vs
					if k == "status" {
						// Map textual node status to a numeric gauge; fall back
						// to a raw integer status if PgPool returns one.
						switch vs {
						case "up":
							retRow["status_num"] = 1
						case "down":
							retRow["status_num"] = 0
						default:
							i, err := strconv.ParseInt(vs, 10, 64)
							if err == nil {
								retRow["status_num"] = i
							}
						}
						continue
					}

					// Best-effort numeric coercion: try int first (except for
					// lb_weight, which is always fractional), then float;
					// otherwise the string value set above is kept.
					if k != "lb_weight" {
						i, err := strconv.ParseInt(vs, 10, 64)
						if err == nil {
							retRow[k] = i
							continue
						}
					}
					f, err := strconv.ParseFloat(vs, 64)
					if err == nil {
						retRow[k] = f
						continue
					}
				}
				retData = append(retData, retRow)
			}
		} else if strings.HasPrefix(sql, "SHOW POOL_PROCESSES") {
			// Process counters are merged into the node rows, so POOL_NODES
			// output must already be present.
			if len(retData) == 0 {
				log.GetLogger(ctx).Warningf("[%s][%s] SHOW POOL_NODES needs to be placed before SHOW POOL_PROCESSES. ignoring SHOW POOL_PROCESSES", msg.DBUniqueName, msg.MetricName)
				continue
			}

			data, err := QueryMeasurements(ctx, msg.DBUniqueName, sql)
			if err != nil {
				log.GetLogger(ctx).Errorf("[%s][%s] Could not fetch PgPool statistics: %v", msg.DBUniqueName, msg.MetricName, err)
				continue
			}

			// Count total/active backend processes; a non-empty "database"
			// column marks a process as active.
			processesTotal := 0
			processesActive := 0
			for _, row := range data {
				processesTotal++
				v, ok := row["database"]
				if !ok {
					log.GetLogger(ctx).Infof("[%s][%s] column 'database' not found from data returned by SHOW POOL_PROCESSES, check pool version / SQL definition", msg.DBUniqueName, msg.MetricName)
					continue
				}
				if len(v.([]byte)) > 0 {
					processesActive++
				}
			}

			// Attach the process counters to every previously collected node row.
			for _, retRow := range retData {
				retRow["processes_total"] = processesTotal
				retRow["processes_active"] = processesActive
			}
		}
	}
	return retData, nil
}
746
747
748
749
750 func TryCreateMissingExtensions(ctx context.Context, dbUnique string, extensionNames []string, existingExtensions map[string]int) []string {
751 sqlAvailable := `select name::text from pg_available_extensions`
752 extsCreated := make([]string, 0)
753
754
755 data, err := QueryMeasurements(ctx, dbUnique, sqlAvailable)
756 if err != nil {
757 log.GetLogger(ctx).Infof("[%s] Failed to get a list of available extensions: %v", dbUnique, err)
758 return extsCreated
759 }
760
761 availableExts := make(map[string]bool)
762 for _, row := range data {
763 availableExts[row["name"].(string)] = true
764 }
765
766 for _, extToCreate := range extensionNames {
767 if _, ok := existingExtensions[extToCreate]; ok {
768 continue
769 }
770 _, ok := availableExts[extToCreate]
771 if !ok {
772 log.GetLogger(ctx).Errorf("[%s] Requested extension %s not available on instance, cannot try to create...", dbUnique, extToCreate)
773 } else {
774 sqlCreateExt := `create extension ` + extToCreate
775 _, err := QueryMeasurements(ctx, dbUnique, sqlCreateExt)
776 if err != nil {
777 log.GetLogger(ctx).Errorf("[%s] Failed to create extension %s (based on --try-create-listed-exts-if-missing input): %v", dbUnique, extToCreate, err)
778 }
779 extsCreated = append(extsCreated, extToCreate)
780 }
781 }
782
783 return extsCreated
784 }
785
786
787 func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.SourceConn) (err error) {
788 metricConfig := func() map[string]float64 {
789 if len(md.Metrics) > 0 {
790 return md.Metrics
791 }
792 if md.PresetMetrics > "" {
793 return metricDefs.GetPresetMetrics(md.PresetMetrics)
794 }
795 return nil
796 }()
797 conf, err := pgx.ParseConfig(md.ConnStr)
798 if err != nil {
799 return err
800 }
801 conf.DefaultQueryExecMode = pgx.QueryExecModeExec
802 c, err := pgx.ConnectConfig(ctx, conf)
803 if err != nil {
804 return nil
805 }
806 defer c.Close(ctx)
807
808 for metricName := range metricConfig {
809 Metric, ok := metricDefs.GetMetricDef(metricName)
810 if !ok {
811 continue
812 }
813
814 _, err = c.Exec(ctx, Metric.InitSQL)
815 if err != nil {
816 log.GetLogger(ctx).Warningf("Failed to create a metric fetching helper for %s in %s: %v", md.Name, metricName, err)
817 } else {
818 log.GetLogger(ctx).Info("Successfully created metric fetching helper for", md.Name, metricName)
819 }
820 }
821 return nil
822 }
823
824 func CloseResourcesForRemovedMonitoredDBs(metricsWriter sinks.Writer, currentDBs, prevLoopDBs sources.SourceConns, shutDownDueToRoleChange map[string]bool) {
825 var curDBsMap = make(map[string]bool)
826
827 for _, curDB := range currentDBs {
828 curDBsMap[curDB.Name] = true
829 }
830
831 for _, prevDB := range prevLoopDBs {
832 if _, ok := curDBsMap[prevDB.Name]; !ok {
833 prevDB.Conn.Close()
834 _ = metricsWriter.SyncMetric(prevDB.Name, "", "remove")
835 }
836 }
837
838
839 for roleChangedDB := range shutDownDueToRoleChange {
840 if db := currentDBs.GetMonitoredDatabase(roleChangedDB); db != nil {
841 db.Conn.Close()
842 }
843 _ = metricsWriter.SyncMetric(roleChangedDB, "", "remove")
844 }
845 }
846