package reaper

import (
	"cmp"
	"context"
	"fmt"
	"runtime"
	"slices"
	"strings"
	"sync/atomic"
	"time"

	"github.com/cybertec-postgresql/pgwatch/v3/internal/cmdopts"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/sinks"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
)

const (
	specialMetricChangeEvents         = "change_events"
	specialMetricServerLogEventCounts = "server_log_event_counts"
	specialMetricInstanceUp           = "instance_up"
)

var specialMetrics = map[string]bool{specialMetricChangeEvents: true, specialMetricServerLogEventCounts: true}

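// Package-level reaper state: the last observed recovery status per source,
// the metric-to-interval configuration applied to the source currently being
// processed by the Reap loop, and the shared metric definitions.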
var hostLastKnownStatusInRecovery = make(map[string]bool)
var metricsConfig map[string]float64
var metricDefs = NewConcurrentMetricDefs()

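// Reaper coordinates the monitoring work: it keeps track of the monitored
// sources, starts and stops per-metric gatherer goroutines, and forwards the
// collected measurements to the configured sinks.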
type Reaper struct {
	*cmdopts.Options
	ready                atomic.Bool
	measurementCh        chan metrics.MeasurementEnvelope
	measurementCache     *InstanceMetricCache
	logger               log.Logger
	monitoredSources     sources.SourceConns
	prevLoopMonitoredDBs sources.SourceConns
	cancelFuncs          map[string]context.CancelFunc
}

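// NewReaper returns a new Reaper wired to the given command-line options, with
// a buffered measurement channel and an empty instance-level metric cache.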
func NewReaper(ctx context.Context, opts *cmdopts.Options) (r *Reaper) {
	return &Reaper{
		Options:              opts,
		measurementCh:        make(chan metrics.MeasurementEnvelope, 256),
		measurementCache:     NewInstanceMetricCache(),
		logger:               log.GetLogger(ctx),
		monitoredSources:     make(sources.SourceConns, 0),
		prevLoopMonitoredDBs: make(sources.SourceConns, 0),
		cancelFuncs:          make(map[string]context.CancelFunc),
	}
}

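// Ready reports whether the reaper has started its main monitoring loop.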
func (r *Reaper) Ready() bool {
	return r.ready.Load()
}

func (r *Reaper) PrintMemStats() {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)

	bToKb := func(b uint64) uint64 {
		return b / 1024
	}
	r.logger.Debugf("Alloc: %d Kb, TotalAlloc: %d Kb, Sys: %d Kb, NumGC: %d, HeapAlloc: %d Kb, HeapSys: %d Kb",
		bToKb(m.Alloc), bToKb(m.TotalAlloc), bToKb(m.Sys), m.NumGC, bToKb(m.HeapAlloc), bToKb(m.HeapSys))
}

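// Reap is the main monitoring loop: on every refresh interval it reloads the
// sources and metric definitions, (re)connects to the sources, starts gatherer
// goroutines for newly enabled metrics and shuts down the ones no longer
// needed, until the context is cancelled.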
func (r *Reaper) Reap(ctx context.Context) {
	var err error
	logger := r.logger

	go r.WriteMeasurements(ctx)
	go r.WriteMonitoredSources(ctx)

	r.ready.Store(true)

	for {
		if r.Logging.LogLevel == "debug" {
			r.PrintMemStats()
		}
		if err = r.LoadSources(); err != nil {
			logger.WithError(err).Error("could not refresh active sources, using last valid cache")
		}
		if err = r.LoadMetrics(); err != nil {
			logger.WithError(err).Error("could not refresh metric definitions, using last valid cache")
		}

		hostsToShutDownDueToRoleChange := make(map[string]bool)
		for _, monitoredSource := range r.monitoredSources {
			srcL := logger.WithField("source", monitoredSource.Name)

			if err = monitoredSource.Connect(ctx, r.Sources); err != nil {
				srcL.WithError(err).Warning("could not init connection, retrying on next iteration")
				continue
			}

			if err = monitoredSource.FetchRuntimeInfo(ctx, true); err != nil {
				srcL.WithError(err).Error("could not start metric gathering")
				continue
			}
			srcL.WithField("recovery", monitoredSource.IsInRecovery).Infof("Connect OK. Version: %s", monitoredSource.VersionStr)
			if monitoredSource.IsInRecovery && monitoredSource.OnlyIfMaster {
				srcL.Info("not added to monitoring due to 'master only' property")
				if monitoredSource.IsPostgresSource() {
					srcL.Info("to be removed from monitoring due to 'master only' property and status change")
					hostsToShutDownDueToRoleChange[monitoredSource.Name] = true
				}
				continue
			}

			if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
				metricsConfig = monitoredSource.MetricsStandby
			} else {
				metricsConfig = monitoredSource.Metrics
			}

			r.CreateSourceHelpers(ctx, srcL, monitoredSource)

			if monitoredSource.IsPostgresSource() {
				DBSizeMB := monitoredSource.ApproxDbSize / 1048576
				if DBSizeMB != 0 && DBSizeMB < r.Sources.MinDbSizeMB {
					srcL.Infof("ignored due to the --min-db-size-mb filter, current size %d MB", DBSizeMB)
					hostsToShutDownDueToRoleChange[monitoredSource.Name] = true
					continue
				}

				lastKnownStatusInRecovery := hostLastKnownStatusInRecovery[monitoredSource.Name]
				if lastKnownStatusInRecovery != monitoredSource.IsInRecovery {
					if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
						srcL.Warning("Switching metrics collection to standby config...")
						metricsConfig = monitoredSource.MetricsStandby
					} else if !monitoredSource.IsInRecovery {
						srcL.Warning("Switching metrics collection to primary config...")
						metricsConfig = monitoredSource.Metrics
					}
				}
			}
			hostLastKnownStatusInRecovery[monitoredSource.Name] = monitoredSource.IsInRecovery

			for metricName, interval := range metricsConfig {
				mvp, metricDefExists := metricDefs.GetMetricDef(metricName)

				dbMetric := monitoredSource.Name + dbMetricJoinStr + metricName
				_, cancelFuncExists := r.cancelFuncs[dbMetric]

				if metricDefExists && !cancelFuncExists {
					if interval > 0 {
						srcL.WithField("metric", metricName).WithField("interval", interval).Info("starting gatherer")
						metricCtx, cancelFunc := context.WithCancel(ctx)
						r.cancelFuncs[dbMetric] = cancelFunc

						metricNameForStorage := metricName
						if _, isSpecialMetric := specialMetrics[metricName]; !isSpecialMetric && mvp.StorageName > "" {
							metricNameForStorage = mvp.StorageName
						}

						if err := r.SinksWriter.SyncMetric(monitoredSource.Name, metricNameForStorage, sinks.AddOp); err != nil {
							srcL.Error(err)
						}

						go r.reapMetricMeasurements(metricCtx, monitoredSource, metricName)
					}
				} else if (!metricDefExists && cancelFuncExists) || interval <= 0 {
					if cancelFunc, isOk := r.cancelFuncs[dbMetric]; isOk {
						cancelFunc()
					}
					srcL.WithField("metric", metricName).Warning("shutting down gatherer...")
					delete(r.cancelFuncs, dbMetric)
				} else if !metricDefExists {
					epoch, ok := lastSQLFetchError.Load(metricName)
					if !ok || ((time.Now().Unix() - epoch.(int64)) > 3600) {
						srcL.WithField("metric", metricName).Warning("metric definition not found")
						lastSQLFetchError.Store(metricName, time.Now().Unix())
					}
				}
			}
		}

		r.ShutdownOldWorkers(ctx, hostsToShutDownDueToRoleChange)

		r.prevLoopMonitoredDBs = slices.Clone(r.monitoredSources)
		select {
		case <-time.After(time.Second * time.Duration(r.Sources.Refresh)):
			logger.Debugf("wake up after %d seconds", r.Sources.Refresh)
		case <-ctx.Done():
			return
		}
	}
}

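// CreateSourceHelpers creates the extensions listed in
// --try-create-listed-exts-if-missing and the metrics fetching helper
// functions for sources that appeared since the previous loop iteration;
// non-Postgres sources and sources in recovery are skipped.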
func (r *Reaper) CreateSourceHelpers(ctx context.Context, srcL log.Logger, monitoredSource *sources.SourceConn) {
	if r.prevLoopMonitoredDBs.GetMonitoredDatabase(monitoredSource.Name) != nil {
		return
	}
	if !monitoredSource.IsPostgresSource() || monitoredSource.IsInRecovery {
		return
	}

	if r.Sources.TryCreateListedExtsIfMissing > "" {
		srcL.Info("trying to create extensions if missing")
		extsToCreate := strings.Split(r.Sources.TryCreateListedExtsIfMissing, ",")
		extsCreated := TryCreateMissingExtensions(ctx, monitoredSource, extsToCreate, monitoredSource.Extensions)
		srcL.Infof("%d/%d extensions created based on --try-create-listed-exts-if-missing input %v", len(extsCreated), len(extsToCreate), extsCreated)
	}

	if r.Sources.CreateHelpers {
		srcL.Info("trying to create helper objects if missing")
		if err := TryCreateMetricsFetchingHelpers(ctx, monitoredSource); err != nil {
			srcL.WithError(err).Warning("failed to create helper functions")
		}
	}
}

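// ShutdownOldWorkers cancels gatherers for sources that were removed from the
// configuration or changed their recovery role, and for metrics that were
// disabled; the corresponding metric streams are also deregistered from the sinks.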
func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDownDueToRoleChange map[string]bool) {
	logger := r.logger

	logger.Debug("checking if any workers need to be shut down...")
	for dbMetric, cancelFunc := range r.cancelFuncs {
		var currentMetricConfig map[string]float64
		var md *sources.SourceConn
		var dbRemovedFromConfig bool
		singleMetricDisabled := false
		splits := strings.Split(dbMetric, dbMetricJoinStr)
		db := splits[0]
		metric := splits[1]

		_, wholeDbShutDownDueToRoleChange := hostsToShutDownDueToRoleChange[db]
		if !wholeDbShutDownDueToRoleChange {
			md = r.monitoredSources.GetMonitoredDatabase(db)
			if md == nil {
				dbRemovedFromConfig = true
				logger.Debugf("DB %s removed from config, shutting down all metric worker processes...", db)
			}
		}

		if !(wholeDbShutDownDueToRoleChange || dbRemovedFromConfig) {
			if md.IsInRecovery && len(md.MetricsStandby) > 0 {
				currentMetricConfig = md.MetricsStandby
			} else {
				currentMetricConfig = md.Metrics
			}
			interval, isMetricActive := currentMetricConfig[metric]
			singleMetricDisabled = !isMetricActive || interval <= 0
		}

		if ctx.Err() != nil || wholeDbShutDownDueToRoleChange || dbRemovedFromConfig || singleMetricDisabled {
			logger.WithField("source", db).WithField("metric", metric).Info("stopping gatherer...")
			cancelFunc()
			delete(r.cancelFuncs, dbMetric)
			if err := r.SinksWriter.SyncMetric(db, metric, sinks.DeleteOp); err != nil {
				logger.Error(err)
			}
		}
	}

	r.CloseResourcesForRemovedMonitoredDBs(hostsToShutDownDueToRoleChange)
}

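// reapMetricMeasurements is the gatherer goroutine for a single source/metric
// pair: it fetches the metric on its configured interval, reports fetch errors
// at most once every ten minutes, detects server restarts via postmaster uptime
// for db_stats, and pushes results to the measurement channel until cancelled.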
func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceConn, metricName string) {
	hostState := make(map[string]map[string]string)
	var lastUptimeS int64 = -1
	var lastErrorNotificationTime time.Time
	var err error
	var ok bool

	failedFetches := 0
	lastDBVersionFetchTime := time.Unix(0, 0)

	l := r.logger.WithField("source", md.Name).WithField("metric", metricName)
	if metricName == specialMetricServerLogEventCounts {
		metrics.ParseLogs(ctx, md, md.RealDbname, md.GetMetricInterval(metricName), r.measurementCh, "", "")
		return
	}

	for {
		interval := md.GetMetricInterval(metricName)
		if lastDBVersionFetchTime.Add(5 * time.Minute).Before(time.Now()) {
			if err = md.FetchRuntimeInfo(ctx, false); err != nil {
				lastDBVersionFetchTime = time.Now()
			}

			if _, ok = metricDefs.GetMetricDef(metricName); !ok {
				l.Error("Could not get metric version properties")
				return
			}
		}

		var metricStoreMessages *metrics.MeasurementEnvelope

		if r.Metrics.DirectOSStats && IsDirectlyFetchableMetric(metricName) {
			metricStoreMessages, err = r.FetchStatsDirectlyFromOS(ctx, md, metricName)
			if err != nil {
				l.WithError(err).Errorf("Could not read metric directly from OS")
			}
		}
		t1 := time.Now()
		if metricStoreMessages == nil {
			metricStoreMessages, err = r.FetchMetric(ctx, md, metricName, hostState)
		}

		if time.Since(t1) > (time.Second * time.Duration(interval)) {
			l.Warningf("Total fetch time of %v exceeds the %vs interval", time.Since(t1), interval)
		}

		if err != nil {
			failedFetches++
			if time.Since(lastErrorNotificationTime) > time.Minute*10 {
				l.WithError(err).WithField("count", failedFetches).Error("failed to fetch metric data")
				lastErrorNotificationTime = time.Now()
			}
		} else if metricStoreMessages != nil && len(metricStoreMessages.Data) > 0 {
			r.measurementCh <- *metricStoreMessages

			if metricName == "db_stats" {
				postmasterUptimeS, ok := (metricStoreMessages.Data)[0]["postmaster_uptime_s"]
				if ok {
					if lastUptimeS != -1 {
						if postmasterUptimeS.(int64) < lastUptimeS {
							message := "Detected server restart (or failover)"
							l.Warning(message)
							detectedChangesSummary := make(metrics.Measurements, 0)
							entry := metrics.NewMeasurement(metricStoreMessages.Data.GetEpoch())
							entry["details"] = message
							detectedChangesSummary = append(detectedChangesSummary, entry)
							r.measurementCh <- metrics.MeasurementEnvelope{
								DBName:     md.Name,
								MetricName: "object_changes",
								Data:       detectedChangesSummary,
								CustomTags: metricStoreMessages.CustomTags,
							}
						}
					}
					lastUptimeS = postmasterUptimeS.(int64)
				}
			}
		}

		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Second * time.Duration(interval)):
		}
	}
}

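// LoadSources refreshes the monitored sources from the configured reader,
// dropping disabled sources and those outside the configured groups, and
// reuses established connections for sources whose definition has not changed.
// If the emergency pause triggerfile exists, the list is emptied.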
func (r *Reaper) LoadSources() (err error) {
	if DoesEmergencyTriggerfileExist(r.Metrics.EmergencyPauseTriggerfile) {
		r.logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", r.Metrics.EmergencyPauseTriggerfile)
		r.monitoredSources = make(sources.SourceConns, 0)
		return nil
	}

	var newSrcs sources.SourceConns
	srcs, err := r.SourcesReaderWriter.GetSources()
	if err != nil {
		return err
	}
	srcs = slices.DeleteFunc(srcs, func(s sources.Source) bool {
		return !s.IsEnabled || len(r.Sources.Groups) > 0 && !s.IsDefaultGroup() && !slices.Contains(r.Sources.Groups, s.Group)
	})
	if newSrcs, err = srcs.ResolveDatabases(); err != nil {
		r.logger.WithError(err).Error("could not resolve databases from sources")
	}

	for i, newMD := range newSrcs {
		md := r.monitoredSources.GetMonitoredDatabase(newMD.Name)
		if md == nil {
			continue
		}
		if md.Equal(newMD.Source) {
			// source definition unchanged: reuse the already established connection
			newSrcs[i] = md
			continue
		}
	}
	r.monitoredSources = newSrcs
	r.logger.WithField("sources", len(r.monitoredSources)).Info("sources refreshed")
	return nil
}

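// WriteMonitoredSources periodically publishes the list of actively monitored
// databases to the sinks as a synthetic measurement, until the context is cancelled.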
func (r *Reaper) WriteMonitoredSources(ctx context.Context) {
	for {
		if len(r.monitoredSources) > 0 {
			now := time.Now().UnixNano()
			for _, mdb := range r.monitoredSources {
				db := metrics.NewMeasurement(now)
				db["tag_group"] = mdb.Group
				db["master_only"] = mdb.OnlyIfMaster
				for k, v := range mdb.CustomTags {
					db[metrics.TagPrefix+k] = v
				}
				r.measurementCh <- metrics.MeasurementEnvelope{
					DBName:     mdb.Name,
					MetricName: monitoredDbsDatastoreSyncMetricName,
					Data:       metrics.Measurements{db},
				}
			}
		}
		select {
		case <-time.After(time.Second * monitoredDbsDatastoreSyncIntervalSeconds):
		case <-ctx.Done():
			return
		}
	}
}

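// WriteMeasurements drains the measurement channel and writes every envelope
// to the sinks until the context is cancelled.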
func (r *Reaper) WriteMeasurements(ctx context.Context) {
	var err error
	for {
		select {
		case <-ctx.Done():
			return
		case msg := <-r.measurementCh:
			if err = r.SinksWriter.Write(msg); err != nil {
				r.logger.Error(err)
			}
		}
	}
}

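// AddSysinfoToMeasurements decorates measurement rows with the real database
// name and system identifier when those fields are enabled in the sink options.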
func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, md *sources.SourceConn) {
	for _, dr := range data {
		if r.Sinks.RealDbnameField > "" && md.RealDbname > "" {
			dr[r.Sinks.RealDbnameField] = md.RealDbname
		}
		if r.Sinks.SystemIdentifierField > "" && md.SystemIdentifier > "" {
			dr[r.Sinks.SystemIdentifierField] = md.SystemIdentifier
		}
	}
}

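// FetchMetric fetches a single metric for a source, serving instance-level
// metrics from the measurement cache when allowed, handling the special
// change_events and instance_up metrics separately, and wrapping the resulting
// rows into a MeasurementEnvelope.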
func (r *Reaper) FetchMetric(ctx context.Context, md *sources.SourceConn, metricName string, hostState map[string]map[string]string) (_ *metrics.MeasurementEnvelope, err error) {
	var sql string
	var data metrics.Measurements
	var metric metrics.Metric
	var fromCache bool
	var cacheKey string
	var ok bool

	if metric, ok = metricDefs.GetMetricDef(metricName); !ok {
		return nil, metrics.ErrMetricNotFound
	}
	l := r.logger.WithField("source", md.Name).WithField("metric", metricName)

	if metric.IsInstanceLevel && r.Metrics.InstanceLevelCacheMaxSeconds > 0 && time.Second*time.Duration(md.GetMetricInterval(metricName)) < r.Metrics.CacheAge() {
		cacheKey = fmt.Sprintf("%s:%s", md.GetClusterIdentifier(), metricName)
	}
	data = r.measurementCache.Get(cacheKey, r.Metrics.CacheAge())
	fromCache = len(data) > 0
	if !fromCache {
		if (metric.PrimaryOnly() && md.IsInRecovery) || (metric.StandbyOnly() && !md.IsInRecovery) {
			l.Debug("Skipping fetch as server is in the wrong recovery state, IsInRecovery: ", md.IsInRecovery)
			return nil, nil
		}
		switch metricName {
		case specialMetricChangeEvents:
			r.CheckForPGObjectChangesAndStore(ctx, md.Name, md, hostState)
			return nil, nil
		case specialMetricInstanceUp:
			data, err = r.GetInstanceUpMeasurement(ctx, md)
		default:
			sql = metric.GetSQL(md.Version)
			if sql == "" {
				l.Warning("Ignoring fetching because of empty SQL")
				return nil, nil
			}
			data, err = QueryMeasurements(ctx, md, sql)
		}
		if err != nil {
			return nil, err
		}
		r.measurementCache.Put(cacheKey, data)
	}
	r.AddSysinfoToMeasurements(data, md)
	l.WithField("cache", fromCache).WithField("rows", len(data)).Info("measurements fetched")
	return &metrics.MeasurementEnvelope{
		DBName:     md.Name,
		MetricName: cmp.Or(metric.StorageName, metricName),
		Data:       data,
		CustomTags: md.CustomTags}, nil
}