1 package reaper
2
3 import (
4 "cmp"
5 "context"
6 "fmt"
7 "runtime"
8 "slices"
9 "strings"
10 "time"
11
12 "sync/atomic"
13
14 "github.com/cybertec-postgresql/pgwatch/v5/internal/cmdopts"
15 "github.com/cybertec-postgresql/pgwatch/v5/internal/log"
16 "github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
17 "github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
18 "github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
19 )
20
// Metric names that receive special, non-SQL handling in the reaper.
const (
	specialMetricChangeEvents         = "change_events"
	specialMetricServerLogEventCounts = "server_log_event_counts"
	specialMetricInstanceUp           = "instance_up"
)

// specialMetrics lists the metrics that always keep their own name in
// storage, even when the metric definition declares a storage alias
// (see the gatherer start-up logic in Reap).
var specialMetrics = map[string]bool{specialMetricChangeEvents: true, specialMetricServerLogEventCounts: true}

// hostLastKnownStatusInRecovery remembers each source's recovery state from
// the previous Reap loop so primary<->standby role flips can be detected.
var hostLastKnownStatusInRecovery = make(map[string]bool)

// metricsConfig holds the effective metric->interval set of the source
// currently being processed by Reap's loop.
// NOTE(review): package-level mutable state written per-source inside the
// loop — confirm nothing reads it concurrently with Reap.
var metricsConfig metrics.MetricIntervals

// metricDefs caches all known metric definitions; refreshed by LoadMetrics.
var metricDefs = NewConcurrentMetricDefs()
32
33
// Reaper is the central metrics collector: it periodically refreshes the
// monitored sources and metric definitions, runs one gatherer goroutine per
// (source, metric) pair, and funnels all measurements through a single
// channel to the configured sinks.
type Reaper struct {
	*cmdopts.Options                                       // embedded global CLI/config options
	ready                atomic.Bool                       // true once Reap's main loop has started
	measurementCh        chan metrics.MeasurementEnvelope  // buffered hand-off to WriteMeasurements
	measurementCache     *InstanceMetricCache              // cache for instance-level metric results
	logger               log.Logger                        // root logger; per-source loggers derive from it
	monitoredSources     sources.SourceConns               // sources active in the current loop
	prevLoopMonitoredDBs sources.SourceConns               // snapshot of the previous loop's sources
	cancelFuncs          map[string]context.CancelFunc     // gatherer cancellation, keyed by source+dbMetricJoinStr+metric
}
44
45
46 func NewReaper(ctx context.Context, opts *cmdopts.Options) (r *Reaper) {
47 return &Reaper{
48 Options: opts,
49 measurementCh: make(chan metrics.MeasurementEnvelope, 256),
50 measurementCache: NewInstanceMetricCache(),
51 logger: log.GetLogger(ctx),
52 monitoredSources: make(sources.SourceConns, 0),
53 prevLoopMonitoredDBs: make(sources.SourceConns, 0),
54 cancelFuncs: make(map[string]context.CancelFunc),
55 }
56 }
57
58
// Ready reports whether the reaper's main loop has started
// (Reap flips the flag once its background writer is running).
func (r *Reaper) Ready() bool {
	return r.ready.Load()
}
62
63 func (r *Reaper) PrintMemStats() {
64 var m runtime.MemStats
65 runtime.ReadMemStats(&m)
66
67 bToKb := func(b uint64) uint64 {
68 return b / 1024
69 }
70 r.logger.Debugf("Alloc: %d Kb, TotalAlloc: %d Kb, Sys: %d Kb, NumGC: %d, HeapAlloc: %d Kb, HeapSys: %d Kb",
71 bToKb(m.Alloc), bToKb(m.TotalAlloc), bToKb(m.Sys), m.NumGC, bToKb(m.HeapAlloc), bToKb(m.HeapSys))
72 }
73
74
75
76
77
// Reap is the main monitoring loop. Each cycle it refreshes sources and
// metric definitions, (re)connects every monitored source, reconciles the
// set of running per-metric gatherer goroutines with the configuration,
// shuts down obsolete workers, and then sleeps until the next refresh
// interval or until ctx is cancelled.
func (r *Reaper) Reap(ctx context.Context) {
	var err error
	logger := r.logger

	// Background writer draining r.measurementCh into the sinks.
	go r.WriteMeasurements(ctx)

	r.ready.Store(true)

	for {
		if r.Logging.LogLevel == "debug" {
			r.PrintMemStats()
		}
		// Both loaders keep their last valid cache on failure, so errors
		// here are logged but do not stop the loop.
		if err = r.LoadSources(ctx); err != nil {
			logger.WithError(err).Error("could not refresh active sources, using last valid cache")
		}
		if err = r.LoadMetrics(); err != nil {
			logger.WithError(err).Error("could not refresh metric definitions, using last valid cache")
		}

		// Sources whose gatherers must all be stopped this cycle (role
		// change, size filter, ...); consumed by ShutdownOldWorkers below.
		hostsToShutDownDueToRoleChange := make(map[string]bool)
		for _, monitoredSource := range r.monitoredSources {
			srcL := logger.WithField("source", monitoredSource.Name)
			// NOTE(review): this rebinds the loop's ctx, so each iteration
			// wraps the previous iteration's logger-carrying context rather
			// than the original one — confirm the nesting is intended.
			ctx = log.WithLogger(ctx, srcL)

			if monitoredSource.Connect(ctx, r.Sources) != nil {
				r.WriteInstanceDown(monitoredSource)
				srcL.Warning("could not init connection, retrying on next iteration")
				continue
			}

			if err = monitoredSource.FetchRuntimeInfo(ctx, true); err != nil {
				srcL.WithError(err).Error("could not start metric gathering")
				continue
			}
			srcL.WithField("recovery", monitoredSource.IsInRecovery).Infof("Connect OK. Version: %s", monitoredSource.VersionStr)
			// "Master only" sources are skipped (and their gatherers shut
			// down) while the server reports being in recovery.
			if monitoredSource.IsInRecovery && monitoredSource.OnlyIfMaster {
				srcL.Info("not added to monitoring due to 'master only' property")
				if monitoredSource.IsPostgresSource() {
					srcL.Info("to be removed from monitoring due to 'master only' property and status change")
					hostsToShutDownDueToRoleChange[monitoredSource.Name] = true
				}
				continue
			}

			// Effective interval set: the standby override applies only
			// while in recovery and only if standby metrics are configured.
			if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
				metricsConfig = monitoredSource.MetricsStandby
			} else {
				metricsConfig = monitoredSource.Metrics
			}

			// One-shot extension/helper creation for sources that were not
			// present in the previous loop.
			r.CreateSourceHelpers(ctx, srcL, monitoredSource)

			if monitoredSource.IsPostgresSource() {
				DBSizeMB := monitoredSource.ApproxDbSize / 1048576 // bytes -> MB
				if DBSizeMB != 0 && DBSizeMB < r.Sources.MinDbSizeMB {
					srcL.Infof("ignored due to the --min-db-size-mb filter, current size %d MB", DBSizeMB)
					hostsToShutDownDueToRoleChange[monitoredSource.Name] = true
					continue
				}

				// Detect primary<->standby role flips since the last loop
				// and switch the interval set accordingly.
				lastKnownStatusInRecovery := hostLastKnownStatusInRecovery[monitoredSource.Name]
				if lastKnownStatusInRecovery != monitoredSource.IsInRecovery {
					if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
						srcL.Warning("Switching metrics collection to standby config...")
						metricsConfig = monitoredSource.MetricsStandby
					} else if !monitoredSource.IsInRecovery {
						srcL.Warning("Switching metrics collection to primary config...")
						metricsConfig = monitoredSource.Metrics
					}

				}
			}
			hostLastKnownStatusInRecovery[monitoredSource.Name] = monitoredSource.IsInRecovery

			// Reconcile gatherer goroutines with the effective interval set.
			for metricName, interval := range metricsConfig {
				metricDefExists := false
				var mvp metrics.Metric

				mvp, metricDefExists = metricDefs.GetMetricDef(metricName)

				dbMetric := monitoredSource.Name + dbMetricJoinStr + metricName
				_, cancelFuncExists := r.cancelFuncs[dbMetric]

				if metricDefExists && !cancelFuncExists {
					// Known metric without a running gatherer: start one if
					// the interval is positive.
					if interval > 0 {
						srcL.WithField("metric", metricName).WithField("interval", interval).Info("starting gatherer")
						metricCtx, cancelFunc := context.WithCancel(ctx)
						r.cancelFuncs[dbMetric] = cancelFunc

						// Special metrics keep their own name in storage
						// even when the definition has a storage alias.
						metricNameForStorage := metricName
						if _, isSpecialMetric := specialMetrics[metricName]; !isSpecialMetric && mvp.StorageName > "" {
							metricNameForStorage = mvp.StorageName
						}

						if err := r.SinksWriter.SyncMetric(monitoredSource.Name, metricNameForStorage, sinks.AddOp); err != nil {
							srcL.Error(err)
						}

						go r.reapMetricMeasurements(metricCtx, monitoredSource, metricName)
					}
				} else if (!metricDefExists && cancelFuncExists) || interval <= 0 {
					// Definition disappeared or metric disabled: stop the
					// gatherer if one is running.
					if cancelFunc, isOk := r.cancelFuncs[dbMetric]; isOk {
						cancelFunc()
					}
					srcL.WithField("metric", metricName).Warning("shutting down gatherer...")
					delete(r.cancelFuncs, dbMetric)
				} else if !metricDefExists {
					// Unknown metric and no gatherer: warn at most once per
					// hour per metric name.
					epoch, ok := lastSQLFetchError.Load(metricName)
					if !ok || ((time.Now().Unix() - epoch.(int64)) > 3600) {
						srcL.WithField("metric", metricName).Warning("metric definition not found")
						lastSQLFetchError.Store(metricName, time.Now().Unix())
					}
				}
			}
		}

		r.ShutdownOldWorkers(ctx, hostsToShutDownDueToRoleChange)

		// Remember this loop's sources so CreateSourceHelpers can spot
		// newly added ones next time around.
		r.prevLoopMonitoredDBs = slices.Clone(r.monitoredSources)
		select {
		case <-time.After(time.Second * time.Duration(r.Sources.Refresh)):
			logger.Debugf("wake up after %d seconds", r.Sources.Refresh)
		case <-ctx.Done():
			return
		}
	}
}
207
208
// CreateSourceHelpers optionally creates missing extensions and metric
// helper objects on a source that is newly monitored (absent from the
// previous Reap loop). Only primary Postgres sources are eligible;
// failures are logged as warnings and do not abort monitoring.
func (r *Reaper) CreateSourceHelpers(ctx context.Context, srcL log.Logger, monitoredSource *sources.SourceConn) {
	// Already seen in the previous loop — helpers were handled back then.
	if r.prevLoopMonitoredDBs.GetMonitoredDatabase(monitoredSource.Name) != nil {
		return
	}
	if !monitoredSource.IsPostgresSource() || monitoredSource.IsInRecovery {
		return
	}

	if r.Sources.TryCreateListedExtsIfMissing > "" {
		srcL.Info("trying to create extensions if missing")
		// The option value is a comma-separated extension list.
		extsToCreate := strings.Split(r.Sources.TryCreateListedExtsIfMissing, ",")
		extsCreated, err := monitoredSource.TryCreateMissingExtensions(ctx, extsToCreate)
		if err != nil {
			srcL.Warning(err)
		}
		if extsCreated != "" {
			// NOTE(review): extsCreated is a string, so len(extsCreated)
			// counts characters, not extensions — the "%d/%d" output looks
			// wrong; confirm the intended count against
			// TryCreateMissingExtensions' return format.
			srcL.Infof("%d/%d extensions created: %s", len(extsCreated), len(extsToCreate), extsCreated)
		}
	}

	if r.Sources.CreateHelpers {
		srcL.Info("trying to create helper objects if missing")
		// Helper DDL is taken from each metric definition's InitSQL.
		if err := monitoredSource.TryCreateMetricsHelpers(ctx, func(metric string) string {
			if m, ok := metricDefs.GetMetricDef(metric); ok {
				return m.InitSQL
			}
			return ""
		}); err != nil {
			srcL.Warning(err)
		}
	}

}
242
243 func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDown map[string]bool) {
244 logger := r.logger
245
246
247 logger.Debug("checking if any workers need to be shut down...")
248 for dbMetric, cancelFunc := range r.cancelFuncs {
249 var currentMetricConfig metrics.MetricIntervals
250 var md *sources.SourceConn
251 var dbRemovedFromConfig bool
252 var metricRemovedFromPreset bool
253 splits := strings.Split(dbMetric, dbMetricJoinStr)
254 db := splits[0]
255 metric := splits[1]
256
257 _, wholeDbShutDown := hostsToShutDown[db]
258 if !wholeDbShutDown {
259 md = r.monitoredSources.GetMonitoredDatabase(db)
260 if md == nil {
261 dbRemovedFromConfig = true
262 logger.Debugf("DB %s removed from config, shutting down all metric worker processes...", db)
263 }
264 }
265
266
267
268
269
270
271 if !(wholeDbShutDown || dbRemovedFromConfig) {
272 if md.IsInRecovery && len(md.MetricsStandby) > 0 {
273 currentMetricConfig = md.MetricsStandby
274 } else {
275 currentMetricConfig = md.Metrics
276 }
277 interval, isMetricActive := currentMetricConfig[metric]
278 metricRemovedFromPreset = !isMetricActive || interval <= 0
279 }
280
281 if ctx.Err() != nil || wholeDbShutDown || dbRemovedFromConfig || metricRemovedFromPreset {
282 logger.WithField("source", db).WithField("metric", metric).Info("stopping gatherer...")
283 cancelFunc()
284 delete(r.cancelFuncs, dbMetric)
285 if err := r.SinksWriter.SyncMetric(db, metric, sinks.DeleteOp); err != nil {
286 logger.Error(err)
287 }
288 }
289 }
290
291
292 r.CloseResourcesForRemovedMonitoredDBs(hostsToShutDown)
293 }
294
// reapMetricMeasurements is the per-(source, metric) gatherer loop: it
// fetches the metric on its configured interval, pushes results to the
// measurement channel, and detects server restarts via db_stats postmaster
// uptime. It runs until ctx is cancelled or the metric definition
// disappears. The server-log-event metric is delegated to a dedicated log
// parser instead of the SQL fetch loop.
func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceConn, metricName string) {
	var lastUptimeS int64 = -1 // -1 = no previous uptime observed yet
	var lastErrorNotificationTime time.Time
	var err error
	var ok bool

	failedFetches := 0
	lastDBVersionFetchTime := time.Unix(0, 0) // forces a refresh on the first iteration

	l := log.GetLogger(ctx).WithField("metric", metricName)
	ctx = log.WithLogger(ctx, l)

	// Log-event counting parses the server log files directly; it never
	// enters the generic fetch loop below.
	if metricName == specialMetricServerLogEventCounts {
		lp, err := NewLogParser(ctx, md, r.measurementCh)
		if err != nil {
			l.WithError(err).Error("Failed to init log parser")
			return
		}
		err = lp.ParseLogs()
		if err != nil {
			l.WithError(err).Error("Error parsing logs")
		}
		return
	}

	for {
		interval := md.GetMetricInterval(metricName)
		if lastDBVersionFetchTime.Add(time.Minute * time.Duration(5)).Before(time.Now()) {

			// NOTE(review): the timestamp is only advanced when
			// FetchRuntimeInfo FAILS, so after a success the runtime info is
			// re-fetched on every iteration — confirm this is intentional
			// (the inverse, backing off for 5 minutes after success, seems
			// more likely to be the intent).
			if err = md.FetchRuntimeInfo(ctx, false); err != nil {
				lastDBVersionFetchTime = time.Now()
			}

			// Stop the gatherer entirely if the definition has vanished.
			if _, ok = metricDefs.GetMetricDef(metricName); !ok {
				l.WithField("source", md.Name).Error("metric definition not found")
				return
			}
		}

		var metricStoreMessages *metrics.MeasurementEnvelope

		t1 := time.Now()

		// Some metrics can be read straight from the OS instead of via SQL;
		// fall back to the SQL path if that yields nothing.
		if IsDirectlyFetchableMetric(md, metricName) {
			if metricStoreMessages, err = r.FetchStatsDirectlyFromOS(ctx, md, metricName); err != nil {
				l.WithError(err).Errorf("Could not read metric directly from OS")
			}
		}
		if metricStoreMessages == nil {
			metricStoreMessages, err = r.FetchMetric(ctx, md, metricName)
		}

		if time.Since(t1) > interval {
			l.Warningf("Total fetching time of %v bigger than %v interval", time.Since(t1), interval)
		}

		if err != nil {
			failedFetches++
			// Throttle error logging to once per 10 minutes per gatherer.
			if time.Since(lastErrorNotificationTime) > time.Minute*10 {
				l.WithError(err).WithField("count", failedFetches).Error("failed to fetch metric data")
				lastErrorNotificationTime = time.Now()
			}
		} else if metricStoreMessages != nil && len(metricStoreMessages.Data) > 0 {
			r.measurementCh <- *metricStoreMessages

			// A drop in postmaster uptime between fetches means the server
			// restarted (or failed over); emit a synthetic object_changes
			// event describing it.
			if metricName == "db_stats" {
				postmasterUptimeS, ok := (metricStoreMessages.Data)[0]["postmaster_uptime_s"]
				if ok {
					if lastUptimeS != -1 {
						if postmasterUptimeS.(int64) < lastUptimeS {
							message := "Detected server restart (or failover)"
							l.Warning(message)
							detectedChangesSummary := make(metrics.Measurements, 0)
							entry := metrics.NewMeasurement(metricStoreMessages.Data.GetEpoch())
							entry["details"] = message
							detectedChangesSummary = append(detectedChangesSummary, entry)
							r.measurementCh <- metrics.MeasurementEnvelope{
								DBName:     md.Name,
								MetricName: "object_changes",
								Data:       detectedChangesSummary,
								CustomTags: metricStoreMessages.CustomTags,
							}
						}
					}
					lastUptimeS = postmasterUptimeS.(int64)
				}
			}
		}

		select {
		case <-ctx.Done():
			return
		case <-time.After(interval):
			// next fetch cycle
		}
	}
}
393
394
395 func (r *Reaper) LoadSources(ctx context.Context) (err error) {
396 if DoesEmergencyTriggerfileExist(r.Metrics.EmergencyPauseTriggerfile) {
397 r.logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", r.Metrics.EmergencyPauseTriggerfile)
398 r.monitoredSources = make(sources.SourceConns, 0)
399 return nil
400 }
401
402 var newSrcs sources.SourceConns
403 srcs, err := r.SourcesReaderWriter.GetSources()
404 if err != nil {
405 return err
406 }
407 srcs = slices.DeleteFunc(srcs, func(s sources.Source) bool {
408 return !s.IsEnabled || len(r.Sources.Groups) > 0 && !slices.Contains(r.Sources.Groups, s.Group)
409 })
410 if newSrcs, err = srcs.ResolveDatabases(); err != nil {
411 r.logger.WithError(err).Error("could not resolve databases from sources")
412 }
413
414 for i, newMD := range newSrcs {
415 md := r.monitoredSources.GetMonitoredDatabase(newMD.Name)
416 if md == nil {
417 continue
418 }
419 if md.Equal(newMD.Source) {
420
421 newSrcs[i] = md
422 continue
423 }
424
425
426 r.logger.WithField("source", md.Name).Info("Source configs changed, restarting all gatherers...")
427 r.ShutdownOldWorkers(ctx, map[string]bool{md.Name: true})
428 }
429 r.monitoredSources = newSrcs
430 r.logger.WithField("sources", len(r.monitoredSources)).Info("sources refreshed")
431 return nil
432 }
433
434
435 func (r *Reaper) WriteInstanceDown(md *sources.SourceConn) {
436 r.measurementCh <- metrics.MeasurementEnvelope{
437 DBName: md.Name,
438 MetricName: specialMetricInstanceUp,
439 Data: metrics.Measurements{metrics.Measurement{
440 metrics.EpochColumnName: time.Now().UnixNano(),
441 specialMetricInstanceUp: 0},
442 },
443 }
444 }
445
446
447 func (r *Reaper) WriteMeasurements(ctx context.Context) {
448 var err error
449 for {
450 select {
451 case <-ctx.Done():
452 return
453 case msg := <-r.measurementCh:
454 if err = r.SinksWriter.Write(msg); err != nil {
455 r.logger.Error(err)
456 }
457 }
458 }
459 }
460
461 func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, md *sources.SourceConn) {
462 for _, dr := range data {
463 if r.Sinks.RealDbnameField > "" && md.RealDbname > "" {
464 dr[r.Sinks.RealDbnameField] = md.RealDbname
465 }
466 if r.Sinks.SystemIdentifierField > "" && md.SystemIdentifier > "" {
467 dr[r.Sinks.SystemIdentifierField] = md.SystemIdentifier
468 }
469 }
470 }
471
// FetchMetric resolves the metric definition for metricName, serves the
// data from the instance-level cache when eligible, and otherwise executes
// the metric's SQL (or one of the special in-process collectors) against
// the source, caching the fresh result. Returns (nil, nil) when the metric
// is legitimately skipped: wrong replication role, no SQL for this server
// version, or an empty result set.
func (r *Reaper) FetchMetric(ctx context.Context, md *sources.SourceConn, metricName string) (_ *metrics.MeasurementEnvelope, err error) {
	var sql string
	var data metrics.Measurements
	var metric metrics.Metric
	var fromCache bool
	var cacheKey string
	var ok bool

	if metric, ok = metricDefs.GetMetricDef(metricName); !ok {
		return nil, metrics.ErrMetricNotFound
	}
	l := log.GetLogger(ctx)

	// Instance-level metrics fetched more often than the cache age can be
	// shared between databases of the same cluster. When the conditions are
	// not met, cacheKey stays "" — presumably an empty key disables both
	// Get and Put in the cache; confirm in InstanceMetricCache.
	if metric.IsInstanceLevel && r.Metrics.InstanceLevelCacheMaxSeconds > 0 && md.GetMetricInterval(metricName) < r.Metrics.CacheAge() {
		cacheKey = fmt.Sprintf("%s:%s", md.GetClusterIdentifier(), metricName)
	}
	data = r.measurementCache.Get(cacheKey, r.Metrics.CacheAge())
	fromCache = len(data) > 0
	if !fromCache {
		// Skip metrics that only apply to the other replication role.
		if (metric.PrimaryOnly() && md.IsInRecovery) || (metric.StandbyOnly() && !md.IsInRecovery) {
			l.Debug("Skipping fetching of as server in wrong IsInRecovery: ", md.IsInRecovery)
			return nil, nil
		}
		switch metricName {
		case specialMetricChangeEvents:
			data, err = r.GetObjectChangesMeasurement(ctx, md)
		case specialMetricInstanceUp:
			data, err = r.GetInstanceUpMeasurement(ctx, md)
		default:
			sql = metric.GetSQL(md.Version)
			if sql == "" {
				// No SQL variant for this server version — not an error.
				l.WithField("source", md.Name).WithField("version", md.Version).Warning("no SQL found for metric version")
				return nil, nil
			}
			data, err = QueryMeasurements(ctx, md, sql)
		}
		if err != nil || len(data) == 0 {
			return nil, err
		}
		r.measurementCache.Put(cacheKey, data)
	}
	r.AddSysinfoToMeasurements(data, md)
	l.WithField("cache", fromCache).WithField("rows", len(data)).Info("measurements fetched")
	// Store under the definition's storage alias when one is set.
	return &metrics.MeasurementEnvelope{
		DBName:     md.Name,
		MetricName: cmp.Or(metric.StorageName, metricName),
		Data:       data,
		CustomTags: md.CustomTags}, nil
}
521