// Package reaper implements the main monitoring loop of pgwatch: it keeps
// track of monitored sources, schedules per-metric gatherer goroutines and
// forwards the collected measurements to the configured sinks.
package reaper

import (
	"cmp"
	"context"
	"fmt"
	"runtime"
	"slices"
	"strings"
	"sync/atomic"
	"time"

	"github.com/cybertec-postgresql/pgwatch/v5/internal/cmdopts"
	"github.com/cybertec-postgresql/pgwatch/v5/internal/log"
	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
	"github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
	"github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
)

// Names of metrics that are not fetched with plain SQL but handled specially.
const (
	specialMetricChangeEvents         = "change_events"
	specialMetricServerLogEventCounts = "server_log_event_counts"
	specialMetricInstanceUp           = "instance_up"
)

// specialMetrics keep their own name for storage even if the metric definition specifies a storage name.
var specialMetrics = map[string]bool{specialMetricChangeEvents: true, specialMetricServerLogEventCounts: true}

// hostLastKnownStatusInRecovery remembers the recovery state last observed for each source
// so that a primary/standby role change can be detected on the next loop.
var hostLastKnownStatusInRecovery = make(map[string]bool)

// metricsConfig holds the metric name to interval mapping of the source currently being processed.
var metricsConfig map[string]float64

var metricDefs = NewConcurrentMetricDefs()

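// Reaper is the core worker of pgwatch: it maintains the set of monitored
// sources, starts and stops one gatherer goroutine per source/metric pair and
// forwards all collected measurements to the configured sink.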
type Reaper struct {
	*cmdopts.Options
	ready                atomic.Bool                      // set once the main loop has started
	measurementCh        chan metrics.MeasurementEnvelope // gatherers push measurements here, WriteMeasurements drains it
	measurementCache     *InstanceMetricCache
	logger               log.Logger
	monitoredSources     sources.SourceConns
	prevLoopMonitoredDBs sources.SourceConns           // sources seen in the previous refresh loop
	cancelFuncs          map[string]context.CancelFunc // one cancel func per "source+metric" gatherer
}

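// NewReaper returns a Reaper wired to the given options; the context is only
// used to obtain the logger. A minimal usage sketch (assuming opts was already
// built by the cmdopts package):
//
//	r := NewReaper(ctx, opts)
//	r.Reap(ctx) // blocks until ctx is cancelled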
func NewReaper(ctx context.Context, opts *cmdopts.Options) *Reaper {
	return &Reaper{
		Options:              opts,
		measurementCh:        make(chan metrics.MeasurementEnvelope, 256),
		measurementCache:     NewInstanceMetricCache(),
		logger:               log.GetLogger(ctx),
		monitoredSources:     make(sources.SourceConns, 0),
		prevLoopMonitoredDBs: make(sources.SourceConns, 0),
		cancelFuncs:          make(map[string]context.CancelFunc),
	}
}

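// Ready reports whether the main monitoring loop has been started.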
func (r *Reaper) Ready() bool {
	return r.ready.Load()
}

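// PrintMemStats logs the current Go runtime memory statistics at debug level.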
func (r *Reaper) PrintMemStats() {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)

	bToKiB := func(b uint64) uint64 {
		return b / 1024
	}
	r.logger.Debugf("Alloc: %d KiB, TotalAlloc: %d KiB, Sys: %d KiB, NumGC: %d, HeapAlloc: %d KiB, HeapSys: %d KiB",
		bToKiB(m.Alloc), bToKiB(m.TotalAlloc), bToKiB(m.Sys), m.NumGC, bToKiB(m.HeapAlloc), bToKiB(m.HeapSys))
}

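// Reap is the main monitoring loop. On every refresh interval it reloads the
// source and metric definitions, (re)connects to each monitored source,
// starts gatherer goroutines for newly configured metrics and shuts down the
// gatherers that are no longer needed.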
func (r *Reaper) Reap(ctx context.Context) {
	var err error
	logger := r.logger

	go r.WriteMeasurements(ctx)
	go r.WriteMonitoredSources(ctx)

	r.ready.Store(true)

	for {
		if r.Logging.LogLevel == "debug" {
			r.PrintMemStats()
		}
		if err = r.LoadSources(ctx); err != nil {
			logger.WithError(err).Error("could not refresh active sources, using last valid cache")
		}
		if err = r.LoadMetrics(); err != nil {
			logger.WithError(err).Error("could not refresh metric definitions, using last valid cache")
		}

		hostsToShutDownDueToRoleChange := make(map[string]bool)
		for _, monitoredSource := range r.monitoredSources {
			srcL := logger.WithField("source", monitoredSource.Name)
			ctx = log.WithLogger(ctx, srcL)

			if err = monitoredSource.Connect(ctx, r.Sources); err != nil {
				srcL.WithError(err).Warning("could not init connection, retrying on next iteration")
				continue
			}

			if err = monitoredSource.FetchRuntimeInfo(ctx, true); err != nil {
				srcL.WithError(err).Error("could not start metric gathering")
				continue
			}
			srcL.WithField("recovery", monitoredSource.IsInRecovery).Infof("Connect OK. Version: %s", monitoredSource.VersionStr)
			if monitoredSource.IsInRecovery && monitoredSource.OnlyIfMaster {
				srcL.Info("not added to monitoring due to 'master only' property")
				if monitoredSource.IsPostgresSource() {
					srcL.Info("to be removed from monitoring due to 'master only' property and status change")
					hostsToShutDownDueToRoleChange[monitoredSource.Name] = true
				}
				continue
			}

			// pick the standby metric set when the source is in recovery and has one configured
			if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
				metricsConfig = monitoredSource.MetricsStandby
			} else {
				metricsConfig = monitoredSource.Metrics
			}

			r.CreateSourceHelpers(ctx, srcL, monitoredSource)

			if monitoredSource.IsPostgresSource() {
				DBSizeMB := monitoredSource.ApproxDbSize / 1048576
				if DBSizeMB != 0 && DBSizeMB < r.Sources.MinDbSizeMB {
					srcL.Infof("ignored due to the --min-db-size-mb filter, current size %d MB", DBSizeMB)
					hostsToShutDownDueToRoleChange[monitoredSource.Name] = true
					continue
				}

				lastKnownStatusInRecovery := hostLastKnownStatusInRecovery[monitoredSource.Name]
				if lastKnownStatusInRecovery != monitoredSource.IsInRecovery {
					if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
						srcL.Warning("Switching metrics collection to standby config...")
						metricsConfig = monitoredSource.MetricsStandby
					} else if !monitoredSource.IsInRecovery {
						srcL.Warning("Switching metrics collection to primary config...")
						metricsConfig = monitoredSource.Metrics
					}
				}
			}
			hostLastKnownStatusInRecovery[monitoredSource.Name] = monitoredSource.IsInRecovery

			for metricName, interval := range metricsConfig {
				mvp, metricDefExists := metricDefs.GetMetricDef(metricName)

				dbMetric := monitoredSource.Name + dbMetricJoinStr + metricName
				_, cancelFuncExists := r.cancelFuncs[dbMetric]

				if metricDefExists && !cancelFuncExists {
					if interval > 0 {
						srcL.WithField("metric", metricName).WithField("interval", interval).Info("starting gatherer")
						metricCtx, cancelFunc := context.WithCancel(ctx)
						r.cancelFuncs[dbMetric] = cancelFunc

						metricNameForStorage := metricName
						if _, isSpecialMetric := specialMetrics[metricName]; !isSpecialMetric && mvp.StorageName > "" {
							metricNameForStorage = mvp.StorageName
						}

						if err := r.SinksWriter.SyncMetric(monitoredSource.Name, metricNameForStorage, sinks.AddOp); err != nil {
							srcL.Error(err)
						}

						go r.reapMetricMeasurements(metricCtx, monitoredSource, metricName)
					}
				} else if (!metricDefExists && cancelFuncExists) || interval <= 0 {
					// the metric definition disappeared or the interval was set to non-positive: stop any running gatherer
					if cancelFunc, isOk := r.cancelFuncs[dbMetric]; isOk {
						cancelFunc()
					}
					srcL.WithField("metric", metricName).Warning("shutting down gatherer...")
					delete(r.cancelFuncs, dbMetric)
				} else if !metricDefExists {
					epoch, ok := lastSQLFetchError.Load(metricName)
					if !ok || ((time.Now().Unix() - epoch.(int64)) > 3600) { // complain at most once per hour
						srcL.WithField("metric", metricName).Warning("metric definition not found")
						lastSQLFetchError.Store(metricName, time.Now().Unix())
					}
				}
			}
		}

		r.ShutdownOldWorkers(ctx, hostsToShutDownDueToRoleChange)

		r.prevLoopMonitoredDBs = slices.Clone(r.monitoredSources)
		select {
		case <-time.After(time.Second * time.Duration(r.Sources.Refresh)):
			logger.Debugf("wake up after %d seconds", r.Sources.Refresh)
		case <-ctx.Done():
			return
		}
	}
}

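// CreateSourceHelpers creates the configured extensions and metric fetching
// helper objects on a Postgres source that was not monitored in the previous
// loop; standbys and non-Postgres sources are skipped.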
func (r *Reaper) CreateSourceHelpers(ctx context.Context, srcL log.Logger, monitoredSource *sources.SourceConn) {
	if r.prevLoopMonitoredDBs.GetMonitoredDatabase(monitoredSource.Name) != nil {
		return // already monitored in the previous loop, helpers were handled then
	}
	if !monitoredSource.IsPostgresSource() || monitoredSource.IsInRecovery {
		return // helpers can only be created on a writable Postgres instance
	}

	if r.Sources.TryCreateListedExtsIfMissing > "" {
		srcL.Info("trying to create extensions if missing")
		extsToCreate := strings.Split(r.Sources.TryCreateListedExtsIfMissing, ",")
		monitoredSource.RLock()
		extsCreated := TryCreateMissingExtensions(ctx, monitoredSource, extsToCreate, monitoredSource.Extensions)
		monitoredSource.RUnlock()
		srcL.Infof("%d/%d extensions created based on --try-create-listed-exts-if-missing input %v", len(extsCreated), len(extsToCreate), extsCreated)
	}

	if r.Sources.CreateHelpers {
		srcL.Info("trying to create helper objects if missing")
		if err := TryCreateMetricsFetchingHelpers(ctx, monitoredSource); err != nil {
			srcL.WithError(err).Warning("failed to create helper functions")
		}
	}
}

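// ShutdownOldWorkers stops gatherers whose source was removed from the
// configuration (or is listed in hostsToShutDown) or whose metric is no
// longer part of the source's active metric set, and notifies the sink
// about each removal.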
func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDown map[string]bool) {
	logger := r.logger

	logger.Debug("checking if any workers need to be shut down...")
	for dbMetric, cancelFunc := range r.cancelFuncs {
		var currentMetricConfig map[string]float64
		var md *sources.SourceConn
		var dbRemovedFromConfig bool
		var metricRemovedFromPreset bool
		splits := strings.Split(dbMetric, dbMetricJoinStr)
		db := splits[0]
		metric := splits[1]

		_, wholeDbShutDown := hostsToShutDown[db]
		if !wholeDbShutDown {
			md = r.monitoredSources.GetMonitoredDatabase(db)
			if md == nil {
				dbRemovedFromConfig = true
				logger.Debugf("DB %s removed from config, shutting down all metric worker processes...", db)
			}
		}

		if !(wholeDbShutDown || dbRemovedFromConfig) {
			// the source is still monitored, check whether the metric itself is still active
			if md.IsInRecovery && len(md.MetricsStandby) > 0 {
				currentMetricConfig = md.MetricsStandby
			} else {
				currentMetricConfig = md.Metrics
			}
			interval, isMetricActive := currentMetricConfig[metric]
			metricRemovedFromPreset = !isMetricActive || interval <= 0
		}

		if ctx.Err() != nil || wholeDbShutDown || dbRemovedFromConfig || metricRemovedFromPreset {
			logger.WithField("source", db).WithField("metric", metric).Info("stopping gatherer...")
			cancelFunc()
			delete(r.cancelFuncs, dbMetric)
			if err := r.SinksWriter.SyncMetric(db, metric, sinks.DeleteOp); err != nil {
				logger.Error(err)
			}
		}
	}

	r.CloseResourcesForRemovedMonitoredDBs(hostsToShutDown)
}

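// reapMetricMeasurements runs as one goroutine per source/metric pair: it
// fetches the metric at its configured interval, pushes the results onto the
// measurement channel and, for db_stats, reports a server restart when the
// postmaster uptime decreases.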
func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceConn, metricName string) {
	var lastUptimeS int64 = -1
	var lastErrorNotificationTime time.Time
	var err error
	var ok bool

	failedFetches := 0
	lastDBVersionFetchTime := time.Unix(0, 0) // zero value forces a runtime info refresh on the first iteration

	l := log.GetLogger(ctx).WithField("metric", metricName)
	ctx = log.WithLogger(ctx, l)

	if metricName == specialMetricServerLogEventCounts {
		// server log event counts come from a dedicated log parser instead of SQL polling
		lp, err := NewLogParser(ctx, md, r.measurementCh)
		if err != nil {
			l.WithError(err).Error("Failed to init log parser")
			return
		}
		if err = lp.ParseLogs(); err != nil {
			l.WithError(err).Error("Error parsing logs")
		}
		return
	}

	for {
		interval := md.GetMetricInterval(metricName)
		if lastDBVersionFetchTime.Add(time.Minute * time.Duration(5)).Before(time.Now()) {
			// on failure, back off for 5 minutes before attempting the runtime info fetch again
			if err = md.FetchRuntimeInfo(ctx, false); err != nil {
				lastDBVersionFetchTime = time.Now()
			}

			if _, ok = metricDefs.GetMetricDef(metricName); !ok {
				l.Error("Could not get metric version properties")
				return
			}
		}

		var metricStoreMessages *metrics.MeasurementEnvelope

		t1 := time.Now()

		if IsDirectlyFetchableMetric(md, metricName) {
			if metricStoreMessages, err = r.FetchStatsDirectlyFromOS(ctx, md, metricName); err != nil {
				l.WithError(err).Error("Could not read metric directly from OS")
			}
		}
		if metricStoreMessages == nil {
			metricStoreMessages, err = r.FetchMetric(ctx, md, metricName)
		}

		if time.Since(t1) > (time.Second * time.Duration(interval)) {
			l.Warningf("Total fetching time of %v exceeds the %vs interval", time.Since(t1), interval)
		}

		if err != nil {
			failedFetches++
			// complain only once per 10 minutes to avoid filling the logs
			if time.Since(lastErrorNotificationTime) > time.Minute*10 {
				l.WithError(err).WithField("count", failedFetches).Error("failed to fetch metric data")
				lastErrorNotificationTime = time.Now()
			}
		} else if metricStoreMessages != nil && len(metricStoreMessages.Data) > 0 {
			r.measurementCh <- *metricStoreMessages

			// a drop in the postmaster uptime is a good indicator of a restart or failover
			if metricName == "db_stats" {
				postmasterUptimeS, ok := (metricStoreMessages.Data)[0]["postmaster_uptime_s"]
				if ok {
					if lastUptimeS != -1 {
						if postmasterUptimeS.(int64) < lastUptimeS {
							message := "Detected server restart (or failover)"
							l.Warning(message)
							detectedChangesSummary := make(metrics.Measurements, 0)
							entry := metrics.NewMeasurement(metricStoreMessages.Data.GetEpoch())
							entry["details"] = message
							detectedChangesSummary = append(detectedChangesSummary, entry)
							r.measurementCh <- metrics.MeasurementEnvelope{
								DBName:     md.Name,
								MetricName: "object_changes",
								Data:       detectedChangesSummary,
								CustomTags: metricStoreMessages.CustomTags,
							}
						}
					}
					lastUptimeS = postmasterUptimeS.(int64)
				}
			}
		}

		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Second * time.Duration(interval)):
			// sleep until the next scheduled fetch
		}
	}
}

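// LoadSources refreshes the list of monitored sources from the configured
// reader, keeping only enabled sources in the configured groups, and restarts
// the gatherers of sources whose configuration has changed. An existing
// emergency pause triggerfile empties the list entirely.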
func (r *Reaper) LoadSources(ctx context.Context) (err error) {
	if DoesEmergencyTriggerfileExist(r.Metrics.EmergencyPauseTriggerfile) {
		r.logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", r.Metrics.EmergencyPauseTriggerfile)
		r.monitoredSources = make(sources.SourceConns, 0)
		return nil
	}

	var newSrcs sources.SourceConns
	srcs, err := r.SourcesReaderWriter.GetSources()
	if err != nil {
		return err
	}
	// keep only enabled sources that belong to the configured groups (if any)
	srcs = slices.DeleteFunc(srcs, func(s sources.Source) bool {
		return !s.IsEnabled || len(r.Sources.Groups) > 0 && !slices.Contains(r.Sources.Groups, s.Group)
	})
	if newSrcs, err = srcs.ResolveDatabases(); err != nil {
		r.logger.WithError(err).Error("could not resolve databases from sources")
	}

	for i, newMD := range newSrcs {
		md := r.monitoredSources.GetMonitoredDatabase(newMD.Name)
		if md == nil {
			continue
		}
		if md.Equal(newMD.Source) {
			// config unchanged, reuse the existing connection
			newSrcs[i] = md
			continue
		}
		r.logger.WithField("source", md.Name).Info("Source configs changed, restarting all gatherers...")
		r.ShutdownOldWorkers(ctx, map[string]bool{md.Name: true})
	}
	r.monitoredSources = newSrcs
	r.logger.WithField("sources", len(r.monitoredSources)).Info("sources refreshed")
	return nil
}

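// WriteMonitoredSources periodically emits one measurement per monitored
// source so that the sink always has an up-to-date list of monitored databases.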
func (r *Reaper) WriteMonitoredSources(ctx context.Context) {
	for {
		if len(r.monitoredSources) > 0 {
			now := time.Now().UnixNano()
			for _, mdb := range r.monitoredSources {
				db := metrics.NewMeasurement(now)
				db["tag_group"] = mdb.Group
				db["master_only"] = mdb.OnlyIfMaster
				for k, v := range mdb.CustomTags {
					db[metrics.TagPrefix+k] = v
				}
				r.measurementCh <- metrics.MeasurementEnvelope{
					DBName:     mdb.Name,
					MetricName: monitoredDbsDatastoreSyncMetricName,
					Data:       metrics.Measurements{db},
				}
			}
		}
		select {
		case <-time.After(time.Second * monitoredDbsDatastoreSyncIntervalSeconds):
		case <-ctx.Done():
			return
		}
	}
}

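// WriteMeasurements drains the measurement channel and writes every envelope
// to the configured sink until the context is cancelled.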
func (r *Reaper) WriteMeasurements(ctx context.Context) {
	var err error
	for {
		select {
		case <-ctx.Done():
			return
		case msg := <-r.measurementCh:
			if err = r.SinksWriter.Write(msg); err != nil {
				r.logger.Error(err)
			}
		}
	}
}

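// AddSysinfoToMeasurements adds the real database name and system identifier
// columns to every measurement row, if the respective sink fields are configured.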
func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, md *sources.SourceConn) {
	for _, dr := range data {
		if r.Sinks.RealDbnameField > "" && md.RealDbname > "" {
			dr[r.Sinks.RealDbnameField] = md.RealDbname
		}
		if r.Sinks.SystemIdentifierField > "" && md.SystemIdentifier > "" {
			dr[r.Sinks.SystemIdentifierField] = md.SystemIdentifier
		}
	}
}

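// FetchMetric performs a single fetch of the given metric for a source,
// serving instance-level metrics from the shared cache when allowed, and
// returns the resulting measurement envelope.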
func (r *Reaper) FetchMetric(ctx context.Context, md *sources.SourceConn, metricName string) (_ *metrics.MeasurementEnvelope, err error) {
	var sql string
	var data metrics.Measurements
	var metric metrics.Metric
	var fromCache bool
	var cacheKey string
	var ok bool

	if metric, ok = metricDefs.GetMetricDef(metricName); !ok {
		return nil, metrics.ErrMetricNotFound
	}
	l := log.GetLogger(ctx)

	// use the instance-level cache only when the metric interval is shorter than the configured cache age
	if metric.IsInstanceLevel && r.Metrics.InstanceLevelCacheMaxSeconds > 0 && time.Second*time.Duration(md.GetMetricInterval(metricName)) < r.Metrics.CacheAge() {
		cacheKey = fmt.Sprintf("%s:%s", md.GetClusterIdentifier(), metricName)
	}
	data = r.measurementCache.Get(cacheKey, r.Metrics.CacheAge())
	fromCache = len(data) > 0
	if !fromCache {
		if (metric.PrimaryOnly() && md.IsInRecovery) || (metric.StandbyOnly() && !md.IsInRecovery) {
			l.Debug("skipping fetch, server is in the wrong recovery state: ", md.IsInRecovery)
			return nil, nil
		}
		switch metricName {
		case specialMetricChangeEvents:
			data, err = r.GetObjectChangesMeasurement(ctx, md)
		case specialMetricInstanceUp:
			data, err = r.GetInstanceUpMeasurement(ctx, md)
		default:
			sql = metric.GetSQL(md.Version)
			if sql == "" {
				l.Warning("Ignoring fetching because of empty SQL")
				return nil, nil
			}
			data, err = QueryMeasurements(ctx, md, sql)
		}
		if err != nil || len(data) == 0 {
			return nil, err
		}
		r.measurementCache.Put(cacheKey, data)
	}
	r.AddSysinfoToMeasurements(data, md)
	l.WithField("cache", fromCache).WithField("rows", len(data)).Info("measurements fetched")
	return &metrics.MeasurementEnvelope{
		DBName:     md.Name,
		MetricName: cmp.Or(metric.StorageName, metricName),
		Data:       data,
		CustomTags: md.CustomTags}, nil
}