1 package reaper
2
3 import (
4 "cmp"
5 "context"
6 "fmt"
7 "runtime"
8 "slices"
9 "strings"
10 "time"
11
12 "sync/atomic"
13
14 "github.com/cybertec-postgresql/pgwatch/v3/internal/cmdopts"
15 "github.com/cybertec-postgresql/pgwatch/v3/internal/log"
16 "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
17 "github.com/cybertec-postgresql/pgwatch/v3/internal/sinks"
18 "github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
19 )
20
// Names of metrics that this file treats specially instead of only running
// the SQL from their metric definition.
const (
	// specialMetricChangeEvents is fetched through GetObjectChangesMeasurement
	// in FetchMetric.
	specialMetricChangeEvents = "change_events"
	// specialMetricServerLogEventCounts is handed to metrics.ParseLogs by
	// reapMetricMeasurements instead of entering the periodic fetch loop.
	specialMetricServerLogEventCounts = "server_log_event_counts"
	// specialMetricInstanceUp is fetched through GetInstanceUpMeasurement
	// in FetchMetric.
	specialMetricInstanceUp = "instance_up"
)
26
27 var specialMetrics = map[string]bool{specialMetricChangeEvents: true, specialMetricServerLogEventCounts: true}
28
29 var hostLastKnownStatusInRecovery = make(map[string]bool)
30 var metricsConfig map[string]float64
31 var metricDefs = NewConcurrentMetricDefs()
32
33
// Reaper fetches metric measurements from the monitored sources and hands
// them over to the configured sinks.
type Reaper struct {
	// Options carries the command line configuration the reaper runs with.
	*cmdopts.Options
	// ready is set to true by Reap once the writer goroutines have been started.
	ready atomic.Bool
	// measurementCh transports measurement envelopes from the gatherers to
	// WriteMeasurements.
	measurementCh chan metrics.MeasurementEnvelope
	// measurementCache is consulted and filled by FetchMetric.
	measurementCache *InstanceMetricCache
	// logger is the base logger taken from the context passed to NewReaper.
	logger log.Logger
	// monitoredSources is the current set of sources, replaced by LoadSources.
	monitoredSources sources.SourceConns
	// prevLoopMonitoredDBs is a clone of monitoredSources taken at the end of
	// the previous Reap iteration.
	prevLoopMonitoredDBs sources.SourceConns
	// cancelFuncs maps "<source name><dbMetricJoinStr><metric name>" to the
	// cancel function of the gatherer running for that pair.
	cancelFuncs map[string]context.CancelFunc
}
44
45
46 func NewReaper(ctx context.Context, opts *cmdopts.Options) (r *Reaper) {
47 return &Reaper{
48 Options: opts,
49 measurementCh: make(chan metrics.MeasurementEnvelope, 256),
50 measurementCache: NewInstanceMetricCache(),
51 logger: log.GetLogger(ctx),
52 monitoredSources: make(sources.SourceConns, 0),
53 prevLoopMonitoredDBs: make(sources.SourceConns, 0),
54 cancelFuncs: make(map[string]context.CancelFunc),
55 }
56 }
57
58
59 func (r *Reaper) Ready() bool {
60 return r.ready.Load()
61 }
62
63 func (r *Reaper) PrintMemStats() {
64 var m runtime.MemStats
65 runtime.ReadMemStats(&m)
66
67 bToKb := func(b uint64) uint64 {
68 return b / 1024
69 }
70 r.logger.Debugf("Alloc: %d Kb, TotalAlloc: %d Kb, Sys: %d Kb, NumGC: %d, HeapAlloc: %d Kb, HeapSys: %d Kb",
71 bToKb(m.Alloc), bToKb(m.TotalAlloc), bToKb(m.Sys), m.NumGC, bToKb(m.HeapAlloc), bToKb(m.HeapSys))
72 }
73
74
75
76
77
78 func (r *Reaper) Reap(ctx context.Context) {
79 var err error
80 logger := r.logger
81
82 go r.WriteMeasurements(ctx)
83 go r.WriteMonitoredSources(ctx)
84
85 r.ready.Store(true)
86
87 for {
88 if r.Logging.LogLevel == "debug" {
89 r.PrintMemStats()
90 }
91 if err = r.LoadSources(); err != nil {
92 logger.WithError(err).Error("could not refresh active sources, using last valid cache")
93 }
94 if err = r.LoadMetrics(); err != nil {
95 logger.WithError(err).Error("could not refresh metric definitions, using last valid cache")
96 }
97
98
99 hostsToShutDownDueToRoleChange := make(map[string]bool)
100 for _, monitoredSource := range r.monitoredSources {
101 srcL := logger.WithField("source", monitoredSource.Name)
102 ctx = log.WithLogger(ctx, srcL)
103
104 if monitoredSource.Connect(ctx, r.Sources) != nil {
105 srcL.Warning("could not init connection, retrying on next iteration")
106 continue
107 }
108
109 if err = monitoredSource.FetchRuntimeInfo(ctx, true); err != nil {
110 srcL.WithError(err).Error("could not start metric gathering")
111 continue
112 }
113 srcL.WithField("recovery", monitoredSource.IsInRecovery).Infof("Connect OK. Version: %s", monitoredSource.VersionStr)
114 if monitoredSource.IsInRecovery && monitoredSource.OnlyIfMaster {
115 srcL.Info("not added to monitoring due to 'master only' property")
116 if monitoredSource.IsPostgresSource() {
117 srcL.Info("to be removed from monitoring due to 'master only' property and status change")
118 hostsToShutDownDueToRoleChange[monitoredSource.Name] = true
119 }
120 continue
121 }
122
123 if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
124 metricsConfig = monitoredSource.MetricsStandby
125 } else {
126 metricsConfig = monitoredSource.Metrics
127 }
128
129 r.CreateSourceHelpers(ctx, srcL, monitoredSource)
130
131 if monitoredSource.IsPostgresSource() {
132 DBSizeMB := monitoredSource.ApproxDbSize / 1048576
133 if DBSizeMB != 0 && DBSizeMB < r.Sources.MinDbSizeMB {
134 srcL.Infof("ignored due to the --min-db-size-mb filter, current size %d MB", DBSizeMB)
135 hostsToShutDownDueToRoleChange[monitoredSource.Name] = true
136 continue
137 }
138
139 lastKnownStatusInRecovery := hostLastKnownStatusInRecovery[monitoredSource.Name]
140 if lastKnownStatusInRecovery != monitoredSource.IsInRecovery {
141 if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
142 srcL.Warning("Switching metrics collection to standby config...")
143 metricsConfig = monitoredSource.MetricsStandby
144 } else if !monitoredSource.IsInRecovery {
145 srcL.Warning("Switching metrics collection to primary config...")
146 metricsConfig = monitoredSource.Metrics
147 }
148
149 }
150 }
151 hostLastKnownStatusInRecovery[monitoredSource.Name] = monitoredSource.IsInRecovery
152
153 for metricName, interval := range metricsConfig {
154 metricDefExists := false
155 var mvp metrics.Metric
156
157 mvp, metricDefExists = metricDefs.GetMetricDef(metricName)
158
159 dbMetric := monitoredSource.Name + dbMetricJoinStr + metricName
160 _, cancelFuncExists := r.cancelFuncs[dbMetric]
161
162 if metricDefExists && !cancelFuncExists {
163 if interval > 0 {
164 srcL.WithField("metric", metricName).WithField("interval", interval).Info("starting gatherer")
165 metricCtx, cancelFunc := context.WithCancel(ctx)
166 r.cancelFuncs[dbMetric] = cancelFunc
167
168 metricNameForStorage := metricName
169 if _, isSpecialMetric := specialMetrics[metricName]; !isSpecialMetric && mvp.StorageName > "" {
170 metricNameForStorage = mvp.StorageName
171 }
172
173 if err := r.SinksWriter.SyncMetric(monitoredSource.Name, metricNameForStorage, sinks.AddOp); err != nil {
174 srcL.Error(err)
175 }
176
177 go r.reapMetricMeasurements(metricCtx, monitoredSource, metricName)
178 }
179 } else if (!metricDefExists && cancelFuncExists) || interval <= 0 {
180
181 if cancelFunc, isOk := r.cancelFuncs[dbMetric]; isOk {
182 cancelFunc()
183 }
184 srcL.WithField("metric", metricName).Warning("shutting down gatherer...")
185 delete(r.cancelFuncs, dbMetric)
186 } else if !metricDefExists {
187 epoch, ok := lastSQLFetchError.Load(metricName)
188 if !ok || ((time.Now().Unix() - epoch.(int64)) > 3600) {
189 srcL.WithField("metric", metricName).Warning("metric definition not found")
190 lastSQLFetchError.Store(metricName, time.Now().Unix())
191 }
192 }
193 }
194 }
195
196 r.ShutdownOldWorkers(ctx, hostsToShutDownDueToRoleChange)
197
198 r.prevLoopMonitoredDBs = slices.Clone(r.monitoredSources)
199 select {
200 case <-time.After(time.Second * time.Duration(r.Sources.Refresh)):
201 logger.Debugf("wake up after %d seconds", r.Sources.Refresh)
202 case <-ctx.Done():
203 return
204 }
205 }
206 }
207
208
209 func (r *Reaper) CreateSourceHelpers(ctx context.Context, srcL log.Logger, monitoredSource *sources.SourceConn) {
210 if r.prevLoopMonitoredDBs.GetMonitoredDatabase(monitoredSource.Name) != nil {
211 return
212 }
213 if !monitoredSource.IsPostgresSource() || monitoredSource.IsInRecovery {
214 return
215 }
216
217 if r.Sources.TryCreateListedExtsIfMissing > "" {
218 srcL.Info("trying to create extensions if missing")
219 extsToCreate := strings.Split(r.Sources.TryCreateListedExtsIfMissing, ",")
220 monitoredSource.RLock()
221 extsCreated := TryCreateMissingExtensions(ctx, monitoredSource, extsToCreate, monitoredSource.Extensions)
222 monitoredSource.RUnlock()
223 srcL.Infof("%d/%d extensions created based on --try-create-listed-exts-if-missing input %v", len(extsCreated), len(extsToCreate), extsCreated)
224 }
225
226 if r.Sources.CreateHelpers {
227 srcL.Info("trying to create helper objects if missing")
228 if err := TryCreateMetricsFetchingHelpers(ctx, monitoredSource); err != nil {
229 srcL.WithError(err).Warning("failed to create helper functions")
230 }
231 }
232
233 }
234
235 func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDownDueToRoleChange map[string]bool) {
236 logger := r.logger
237
238
239 logger.Debug("checking if any workers need to be shut down...")
240 for dbMetric, cancelFunc := range r.cancelFuncs {
241 var currentMetricConfig map[string]float64
242 var md *sources.SourceConn
243 var dbRemovedFromConfig bool
244 singleMetricDisabled := false
245 splits := strings.Split(dbMetric, dbMetricJoinStr)
246 db := splits[0]
247 metric := splits[1]
248
249 _, wholeDbShutDownDueToRoleChange := hostsToShutDownDueToRoleChange[db]
250 if !wholeDbShutDownDueToRoleChange {
251 md = r.monitoredSources.GetMonitoredDatabase(db)
252 if md == nil {
253 dbRemovedFromConfig = true
254 logger.Debugf("DB %s removed from config, shutting down all metric worker processes...", db)
255 }
256 }
257
258 if !(wholeDbShutDownDueToRoleChange || dbRemovedFromConfig) {
259 if md.IsInRecovery && len(md.MetricsStandby) > 0 {
260 currentMetricConfig = md.MetricsStandby
261 } else {
262 currentMetricConfig = md.Metrics
263 }
264 interval, isMetricActive := currentMetricConfig[metric]
265 singleMetricDisabled = !isMetricActive || interval <= 0
266 }
267
268 if ctx.Err() != nil || wholeDbShutDownDueToRoleChange || dbRemovedFromConfig || singleMetricDisabled {
269 logger.WithField("source", db).WithField("metric", metric).Info("stopping gatherer...")
270 cancelFunc()
271 delete(r.cancelFuncs, dbMetric)
272 if err := r.SinksWriter.SyncMetric(db, metric, sinks.DeleteOp); err != nil {
273 logger.Error(err)
274 }
275 }
276 }
277
278
279 r.CloseResourcesForRemovedMonitoredDBs(hostsToShutDownDueToRoleChange)
280 }
281
282 func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceConn, metricName string) {
283 var lastUptimeS int64 = -1
284 var lastErrorNotificationTime time.Time
285 var err error
286 var ok bool
287
288 failedFetches := 0
289 lastDBVersionFetchTime := time.Unix(0, 0)
290
291 l := log.GetLogger(ctx).WithField("metric", metricName)
292 ctx = log.WithLogger(ctx, l)
293
294 if metricName == specialMetricServerLogEventCounts {
295 metrics.ParseLogs(ctx, md, md.RealDbname, md.GetMetricInterval(metricName), r.measurementCh, "", "")
296 return
297 }
298
299 for {
300 interval := md.GetMetricInterval(metricName)
301 if lastDBVersionFetchTime.Add(time.Minute * time.Duration(5)).Before(time.Now()) {
302
303 if err = md.FetchRuntimeInfo(ctx, false); err != nil {
304 lastDBVersionFetchTime = time.Now()
305 }
306
307 if _, ok = metricDefs.GetMetricDef(metricName); !ok {
308 l.Error("Could not get metric version properties")
309 return
310 }
311 }
312
313 var metricStoreMessages *metrics.MeasurementEnvelope
314
315
316 if r.Metrics.DirectOSStats && IsDirectlyFetchableMetric(metricName) {
317 metricStoreMessages, err = r.FetchStatsDirectlyFromOS(ctx, md, metricName)
318 if err != nil {
319 l.WithError(err).Errorf("Could not read metric directly from OS")
320 }
321 }
322 t1 := time.Now()
323 if metricStoreMessages == nil {
324 metricStoreMessages, err = r.FetchMetric(ctx, md, metricName)
325 }
326
327 if time.Since(t1) > (time.Second * time.Duration(interval)) {
328 l.Warningf("Total fetching time of %v bigger than %vs interval", time.Since(t1), interval)
329 }
330
331 if err != nil {
332 failedFetches++
333
334 if time.Since(lastErrorNotificationTime) > time.Minute*10 {
335 l.WithError(err).WithField("count", failedFetches).Error("failed to fetch metric data")
336 lastErrorNotificationTime = time.Now()
337 }
338 } else if metricStoreMessages != nil && len(metricStoreMessages.Data) > 0 {
339 r.measurementCh <- *metricStoreMessages
340
341 if metricName == "db_stats" {
342 postmasterUptimeS, ok := (metricStoreMessages.Data)[0]["postmaster_uptime_s"]
343 if ok {
344 if lastUptimeS != -1 {
345 if postmasterUptimeS.(int64) < lastUptimeS {
346 message := "Detected server restart (or failover)"
347 l.Warning(message)
348 detectedChangesSummary := make(metrics.Measurements, 0)
349 entry := metrics.NewMeasurement(metricStoreMessages.Data.GetEpoch())
350 entry["details"] = message
351 detectedChangesSummary = append(detectedChangesSummary, entry)
352 r.measurementCh <- metrics.MeasurementEnvelope{
353 DBName: md.Name,
354 MetricName: "object_changes",
355 Data: detectedChangesSummary,
356 CustomTags: metricStoreMessages.CustomTags,
357 }
358 }
359 }
360 lastUptimeS = postmasterUptimeS.(int64)
361 }
362 }
363 }
364
365 select {
366 case <-ctx.Done():
367 return
368 case <-time.After(time.Second * time.Duration(interval)):
369
370 }
371 }
372 }
373
374
375 func (r *Reaper) LoadSources() (err error) {
376 if DoesEmergencyTriggerfileExist(r.Metrics.EmergencyPauseTriggerfile) {
377 r.logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", r.Metrics.EmergencyPauseTriggerfile)
378 r.monitoredSources = make(sources.SourceConns, 0)
379 return nil
380 }
381
382 var newSrcs sources.SourceConns
383 srcs, err := r.SourcesReaderWriter.GetSources()
384 if err != nil {
385 return err
386 }
387 srcs = slices.DeleteFunc(srcs, func(s sources.Source) bool {
388 return !s.IsEnabled || len(r.Sources.Groups) > 0 && !s.IsDefaultGroup() && !slices.Contains(r.Sources.Groups, s.Group)
389 })
390 if newSrcs, err = srcs.ResolveDatabases(); err != nil {
391 r.logger.WithError(err).Error("could not resolve databases from sources")
392 }
393
394 for i, newMD := range newSrcs {
395 md := r.monitoredSources.GetMonitoredDatabase(newMD.Name)
396 if md == nil {
397 continue
398 }
399 if md.Equal(newMD.Source) {
400
401 newSrcs[i] = md
402 continue
403 }
404 }
405 r.monitoredSources = newSrcs
406 r.logger.WithField("sources", len(r.monitoredSources)).Info("sources refreshed")
407 return nil
408 }
409
410
411
412 func (r *Reaper) WriteMonitoredSources(ctx context.Context) {
413 for {
414 if len(r.monitoredSources) > 0 {
415 now := time.Now().UnixNano()
416 for _, mdb := range r.monitoredSources {
417 db := metrics.NewMeasurement(now)
418 db["tag_group"] = mdb.Group
419 db["master_only"] = mdb.OnlyIfMaster
420 for k, v := range mdb.CustomTags {
421 db[metrics.TagPrefix+k] = v
422 }
423 r.measurementCh <- metrics.MeasurementEnvelope{
424 DBName: mdb.Name,
425 MetricName: monitoredDbsDatastoreSyncMetricName,
426 Data: metrics.Measurements{db},
427 }
428 }
429 }
430 select {
431 case <-time.After(time.Second * monitoredDbsDatastoreSyncIntervalSeconds):
432
433 case <-ctx.Done():
434 return
435 }
436 }
437 }
438
439
440 func (r *Reaper) WriteMeasurements(ctx context.Context) {
441 var err error
442 for {
443 select {
444 case <-ctx.Done():
445 return
446 case msg := <-r.measurementCh:
447 if err = r.SinksWriter.Write(msg); err != nil {
448 r.logger.Error(err)
449 }
450 }
451 }
452 }
453
454 func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, md *sources.SourceConn) {
455 for _, dr := range data {
456 if r.Sinks.RealDbnameField > "" && md.RealDbname > "" {
457 dr[r.Sinks.RealDbnameField] = md.RealDbname
458 }
459 if r.Sinks.SystemIdentifierField > "" && md.SystemIdentifier > "" {
460 dr[r.Sinks.SystemIdentifierField] = md.SystemIdentifier
461 }
462 }
463 }
464
465 func (r *Reaper) FetchMetric(ctx context.Context, md *sources.SourceConn, metricName string) (_ *metrics.MeasurementEnvelope, err error) {
466 var sql string
467 var data metrics.Measurements
468 var metric metrics.Metric
469 var fromCache bool
470 var cacheKey string
471 var ok bool
472
473 if metric, ok = metricDefs.GetMetricDef(metricName); !ok {
474 return nil, metrics.ErrMetricNotFound
475 }
476 l := log.GetLogger(ctx)
477
478 if metric.IsInstanceLevel && r.Metrics.InstanceLevelCacheMaxSeconds > 0 && time.Second*time.Duration(md.GetMetricInterval(metricName)) < r.Metrics.CacheAge() {
479 cacheKey = fmt.Sprintf("%s:%s", md.GetClusterIdentifier(), metricName)
480 }
481 data = r.measurementCache.Get(cacheKey, r.Metrics.CacheAge())
482 fromCache = len(data) > 0
483 if !fromCache {
484 if (metric.PrimaryOnly() && md.IsInRecovery) || (metric.StandbyOnly() && !md.IsInRecovery) {
485 l.Debug("Skipping fetching of as server in wrong IsInRecovery: ", md.IsInRecovery)
486 return nil, nil
487 }
488 switch metricName {
489 case specialMetricChangeEvents:
490 data, err = r.GetObjectChangesMeasurement(ctx, md)
491 case specialMetricInstanceUp:
492 data, err = r.GetInstanceUpMeasurement(ctx, md)
493 default:
494 sql = metric.GetSQL(md.Version)
495 if sql == "" {
496 l.Warning("Ignoring fetching because of empty SQL")
497 return nil, nil
498 }
499 data, err = QueryMeasurements(ctx, md, sql)
500 }
501 if err != nil || len(data) == 0 {
502 return nil, err
503 }
504 r.measurementCache.Put(cacheKey, data)
505 }
506 r.AddSysinfoToMeasurements(data, md)
507 l.WithField("cache", fromCache).WithField("rows", len(data)).Info("measurements fetched")
508 return &metrics.MeasurementEnvelope{
509 DBName: md.Name,
510 MetricName: cmp.Or(metric.StorageName, metricName),
511 Data: data,
512 CustomTags: md.CustomTags}, nil
513 }
514