package reaper

import (
	"cmp"
	"context"
	"fmt"
	"slices"
	"strings"
	"sync/atomic"
	"time"

	"github.com/cybertec-postgresql/pgwatch/v3/internal/cmdopts"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
)

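// Shared state used across refresh loops: the last observed recovery (standby) status
// per source, the metric interval configuration currently applied to a source, and the
// concurrently accessible set of metric definitions.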
var hostLastKnownStatusInRecovery = make(map[string]bool)
var metricsConfig map[string]float64
var metricDefs = NewConcurrentMetricDefs()

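// Reaper coordinates metric gathering for all monitored sources: it keeps source and
// metric definitions refreshed, starts and stops per-metric gatherer goroutines, and
// forwards collected measurements to the configured sinks.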
type Reaper struct {
	*cmdopts.Options
	ready                atomic.Bool
	measurementCh        chan []metrics.MeasurementEnvelope
	measurementCache     *InstanceMetricCache
	logger               log.Logger
	monitoredSources     sources.SourceConns
	prevLoopMonitoredDBs sources.SourceConns
	cancelFuncs          map[string]context.CancelFunc
}

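// NewReaper returns a new Reaper instance wired to the given options. A minimal usage
// sketch, assuming opts has already been parsed and validated elsewhere:
//
//	ctx := context.Background()
//	r := NewReaper(ctx, opts)
//	go r.Reap(ctx)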
func NewReaper(ctx context.Context, opts *cmdopts.Options) (r *Reaper) {
	return &Reaper{
		Options:              opts,
		measurementCh:        make(chan []metrics.MeasurementEnvelope, 10000),
		measurementCache:     NewInstanceMetricCache(),
		logger:               log.GetLogger(ctx),
		monitoredSources:     make(sources.SourceConns, 0),
		prevLoopMonitoredDBs: make(sources.SourceConns, 0),
		cancelFuncs:          make(map[string]context.CancelFunc),
	}
}

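// Ready reports whether the reaper has finished its initial setup and entered the main
// gathering loop.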
func (r *Reaper) Ready() bool {
	return r.ready.Load()
}

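// Reap is the main loop of the reaper: it refreshes sources and metric definitions,
// (re)starts a gatherer goroutine per enabled source/metric pair, shuts down workers
// that are no longer needed, and then sleeps until the next refresh or until the
// context is cancelled.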
func (r *Reaper) Reap(ctx context.Context) {
	var err error
	logger := r.logger

	go r.WriteMeasurements(ctx)
	go r.WriteMonitoredSources(ctx)

	r.ready.Store(true)

	for {
		if err = r.LoadSources(); err != nil {
			logger.WithError(err).Error("could not refresh active sources, using last valid cache")
		}
		if err = r.LoadMetrics(); err != nil {
			logger.WithError(err).Error("could not refresh metric definitions, using last valid cache")
		}

		UpdateMonitoredDBCache(r.monitoredSources)
		hostsToShutDownDueToRoleChange := make(map[string]bool)
		for _, monitoredSource := range r.monitoredSources {
			srcL := logger.WithField("source", monitoredSource.Name)

			// assign the error so the warning below logs the actual connection failure
			if err = monitoredSource.Connect(ctx, r.Sources); err != nil {
				srcL.WithError(err).Warning("could not init connection, retrying on next iteration")
				continue
			}

			if err = monitoredSource.FetchRuntimeInfo(ctx, true); err != nil {
				srcL.WithError(err).Error("could not start metric gathering")
				continue
			}
			srcL.WithField("recovery", monitoredSource.IsInRecovery).Infof("Connect OK. Version: %s", monitoredSource.VersionStr)
			if monitoredSource.IsInRecovery && monitoredSource.OnlyIfMaster {
				srcL.Info("not added to monitoring due to 'master only' property")
				continue
			}

			if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
				metricsConfig = monitoredSource.MetricsStandby
			} else {
				metricsConfig = monitoredSource.Metrics
			}
			hostLastKnownStatusInRecovery[monitoredSource.Name] = monitoredSource.IsInRecovery

			r.CreateSourceHelpers(ctx, srcL, monitoredSource)

			if monitoredSource.IsPostgresSource() {
				if r.Sources.MinDbSizeMB >= 8 {
					DBSizeMB := monitoredSource.ApproxDbSize / 1048576
					if DBSizeMB != 0 && DBSizeMB < r.Sources.MinDbSizeMB {
						srcL.Infof("ignored due to the --min-db-size-mb filter, current size %d MB", DBSizeMB)
						hostsToShutDownDueToRoleChange[monitoredSource.Name] = true
						continue
					}
				}

				lastKnownStatusInRecovery := hostLastKnownStatusInRecovery[monitoredSource.Name]
				if monitoredSource.IsInRecovery && monitoredSource.OnlyIfMaster {
					srcL.Info("to be removed from monitoring due to 'master only' property and status change")
					hostsToShutDownDueToRoleChange[monitoredSource.Name] = true
					continue
				} else if lastKnownStatusInRecovery != monitoredSource.IsInRecovery {
					if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
						srcL.Warning("Switching metrics collection to standby config...")
						metricsConfig = monitoredSource.MetricsStandby
						hostLastKnownStatusInRecovery[monitoredSource.Name] = true
					} else {
						srcL.Warning("Switching metrics collection to primary config...")
						metricsConfig = monitoredSource.Metrics
						hostLastKnownStatusInRecovery[monitoredSource.Name] = false
					}
				}
			}

			for metricName, interval := range metricsConfig {
				metric := metricName
				metricDefExists := false
				var mvp metrics.Metric

				if strings.HasPrefix(metric, recoPrefix) {
					metric = recoMetricName
					metricDefExists = true
				} else {
					mvp, metricDefExists = metricDefs.GetMetricDef(metric)
				}

				dbMetric := monitoredSource.Name + dbMetricJoinStr + metric
				_, cancelFuncExists := r.cancelFuncs[dbMetric]

				if metricDefExists && !cancelFuncExists {
					if interval > 0 {
						srcL.WithField("metric", metric).WithField("interval", interval).Info("starting gatherer")
						metricCtx, cancelFunc := context.WithCancel(ctx)
						r.cancelFuncs[dbMetric] = cancelFunc

						metricNameForStorage := metricName
						if _, isSpecialMetric := specialMetrics[metricName]; !isSpecialMetric && mvp.StorageName > "" {
							metricNameForStorage = mvp.StorageName
						}

						if err := r.SinksWriter.SyncMetric(monitoredSource.Name, metricNameForStorage, "add"); err != nil {
							srcL.Error(err)
						}

						go r.reapMetricMeasurements(metricCtx, monitoredSource, metric)
					}
				} else if (!metricDefExists && cancelFuncExists) || interval <= 0 {
					if cancelFunc, isOk := r.cancelFuncs[dbMetric]; isOk {
						cancelFunc()
					}
					srcL.WithField("metric", metric).Warning("shutting down gatherer...")
					delete(r.cancelFuncs, dbMetric)
				} else if !metricDefExists {
					epoch, ok := lastSQLFetchError.Load(metric)
					if !ok || ((time.Now().Unix() - epoch.(int64)) > 3600) {
						srcL.WithField("metric", metric).Warning("metric definition not found")
						lastSQLFetchError.Store(metric, time.Now().Unix())
					}
				}
			}
		}

		r.ShutdownOldWorkers(ctx, hostsToShutDownDueToRoleChange)

		r.prevLoopMonitoredDBs = slices.Clone(r.monitoredSources)
		select {
		case <-time.After(time.Second * time.Duration(r.Sources.Refresh)):
			logger.Debugf("wake up after %d seconds", r.Sources.Refresh)
		case <-ctx.Done():
			return
		}
	}
}

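// CreateSourceHelpers optionally creates missing extensions and metric-fetching helper
// objects on a newly discovered primary Postgres source, controlled by the
// --try-create-listed-exts-if-missing and --create-helpers options. Sources already
// seen in the previous loop are skipped.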
func (r *Reaper) CreateSourceHelpers(ctx context.Context, srcL log.Logger, monitoredSource *sources.SourceConn) {
	if r.prevLoopMonitoredDBs.GetMonitoredDatabase(monitoredSource.Name) != nil {
		return
	}
	if !monitoredSource.IsPostgresSource() || monitoredSource.IsInRecovery {
		return
	}

	if r.Sources.TryCreateListedExtsIfMissing > "" {
		srcL.Info("trying to create extensions if missing")
		extsToCreate := strings.Split(r.Sources.TryCreateListedExtsIfMissing, ",")
		extsCreated := TryCreateMissingExtensions(ctx, monitoredSource.Name, extsToCreate, monitoredSource.Extensions)
		srcL.Infof("%d/%d extensions created based on --try-create-listed-exts-if-missing input %v", len(extsCreated), len(extsToCreate), extsCreated)
	}

	if r.Sources.CreateHelpers {
		srcL.Info("trying to create helper objects if missing")
		if err := TryCreateMetricsFetchingHelpers(ctx, monitoredSource); err != nil {
			srcL.WithError(err).Warning("failed to create helper functions")
		}
	}
}

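// ShutdownOldWorkers stops gatherer goroutines whose source has been removed from the
// configuration, has changed its recovery role, or no longer has the metric enabled,
// and tells the sink writer to stop syncing the corresponding metric.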
func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDownDueToRoleChange map[string]bool) {
	logger := r.logger

	logger.Debug("checking if any workers need to be shut down...")
	for dbMetric, cancelFunc := range r.cancelFuncs {
		var currentMetricConfig map[string]float64
		var md *sources.SourceConn
		var ok, dbRemovedFromConfig bool
		singleMetricDisabled := false
		splits := strings.Split(dbMetric, dbMetricJoinStr)
		db := splits[0]
		metric := splits[1]

		_, wholeDbShutDownDueToRoleChange := hostsToShutDownDueToRoleChange[db]
		if !wholeDbShutDownDueToRoleChange {
			monitoredDbCacheLock.RLock()
			md, ok = monitoredDbCache[db]
			monitoredDbCacheLock.RUnlock()
			if !ok {
				dbRemovedFromConfig = true
				logger.Debugf("DB %s removed from config, shutting down all metric worker processes...", db)
			}
		}

		if !(wholeDbShutDownDueToRoleChange || dbRemovedFromConfig) {
			if md.IsInRecovery && len(md.MetricsStandby) > 0 {
				currentMetricConfig = md.MetricsStandby
			} else {
				currentMetricConfig = md.Metrics
			}
			interval, isMetricActive := currentMetricConfig[metric]
			singleMetricDisabled = !isMetricActive || interval <= 0
		}

		if ctx.Err() != nil || wholeDbShutDownDueToRoleChange || dbRemovedFromConfig || singleMetricDisabled {
			logger.WithField("source", db).WithField("metric", metric).Info("stopping gatherer...")
			cancelFunc()
			delete(r.cancelFuncs, dbMetric)
			if err := r.SinksWriter.SyncMetric(db, metric, "remove"); err != nil {
				logger.Error(err)
			}
		}
	}

	r.CloseResourcesForRemovedMonitoredDBs(hostsToShutDownDueToRoleChange)
}

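// reapMetricMeasurements is the per-gatherer loop for a single source/metric pair: it
// periodically fetches the metric (directly from the OS where supported), detects
// server restarts via postmaster uptime, and pushes the resulting envelopes onto the
// measurement channel until its context is cancelled.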
func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceConn, metricName string) {
	hostState := make(map[string]map[string]string)
	var lastUptimeS int64 = -1
	var lastErrorNotificationTime time.Time
	var err error
	var ok bool
	var envelopes []metrics.MeasurementEnvelope

	failedFetches := 0
	lastDBVersionFetchTime := time.Unix(0, 0)

	l := r.logger.WithField("source", md.Name).WithField("metric", metricName)
	if metricName == specialMetricServerLogEventCounts {
		metrics.ParseLogs(ctx, md, md.RealDbname, md.GetMetricInterval(metricName), r.measurementCh)
		return
	}

	for {
		interval := md.GetMetricInterval(metricName)
		if lastDBVersionFetchTime.Add(time.Minute * time.Duration(5)).Before(time.Now()) {
			if err = md.FetchRuntimeInfo(ctx, false); err != nil {
				lastDBVersionFetchTime = time.Now()
			}

			if _, ok = metricDefs.GetMetricDef(metricName); !ok {
				l.Error("Could not get metric version properties")
				return
			}
		}

		var metricStoreMessages *metrics.MeasurementEnvelope

		if r.Metrics.DirectOSStats && IsDirectlyFetchableMetric(metricName) {
			metricStoreMessages, err = r.FetchStatsDirectlyFromOS(ctx, md, metricName)
			if err != nil {
				l.WithError(err).Error("Could not read metric directly from OS")
			}
		}
		t1 := time.Now()
		if metricStoreMessages == nil {
			metricStoreMessages, err = r.FetchMetric(ctx, md, metricName, hostState)
		}

		if time.Since(t1) > (time.Second * time.Duration(interval)) {
			l.Warningf("Total fetch time %v exceeded the %vs interval", time.Since(t1), interval)
		}

		if err != nil {
			failedFetches++
			if time.Since(lastErrorNotificationTime) > time.Minute*10 {
				l.WithError(err).WithField("count", failedFetches).Error("failed to fetch metric data")
				lastErrorNotificationTime = time.Now()
			}
		} else if metricStoreMessages != nil && len(metricStoreMessages.Data) > 0 {
			envelopes = []metrics.MeasurementEnvelope{*metricStoreMessages}

			if metricName == "db_stats" {
				postmasterUptimeS, ok := (metricStoreMessages.Data)[0]["postmaster_uptime_s"]
				if ok {
					if lastUptimeS != -1 {
						if postmasterUptimeS.(int64) < lastUptimeS {
							message := "Detected server restart (or failover)"
							l.Warning(message)
							detectedChangesSummary := make(metrics.Measurements, 0)
							entry := metrics.NewMeasurement(metricStoreMessages.Data.GetEpoch())
							entry["details"] = message
							detectedChangesSummary = append(detectedChangesSummary, entry)
							envelopes = append(envelopes,
								metrics.MeasurementEnvelope{
									DBName:     md.Name,
									SourceType: string(md.Kind),
									MetricName: "object_changes",
									Data:       detectedChangesSummary,
									CustomTags: metricStoreMessages.CustomTags,
								})
						}
					}
					lastUptimeS = postmasterUptimeS.(int64)
				}
			}
			r.measurementCh <- envelopes
		}

		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Second * time.Duration(interval)):
		}
	}
}

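// LoadSources refreshes the list of monitored sources from the configured reader. If
// the emergency pause triggerfile exists, all sources are dropped from monitoring
// instead.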
func (r *Reaper) LoadSources() (err error) {
	if DoesEmergencyTriggerfileExist(r.Metrics.EmergencyPauseTriggerfile) {
		r.logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", r.Metrics.EmergencyPauseTriggerfile)
		r.monitoredSources = make([]*sources.SourceConn, 0)
		return nil
	}
	if r.monitoredSources, err = r.monitoredSources.SyncFromReader(r.SourcesReaderWriter); err != nil {
		return err
	}
	r.logger.WithField("sources", len(r.monitoredSources)).Info("sources refreshed")
	return nil
}

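// WriteMonitoredSources periodically emits one synthetic measurement per monitored
// source so that the datastore always has an up-to-date list of what is being
// monitored, including group, master-only flag and custom tags.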
func (r *Reaper) WriteMonitoredSources(ctx context.Context) {
	for {
		if len(monitoredDbCache) > 0 {
			// allocate capacity only; entries are appended below
			msms := make([]metrics.MeasurementEnvelope, 0, len(monitoredDbCache))
			now := time.Now().UnixNano()

			monitoredDbCacheLock.RLock()
			for _, mdb := range monitoredDbCache {
				db := metrics.NewMeasurement(now)
				db["tag_group"] = mdb.Group
				db["master_only"] = mdb.OnlyIfMaster
				for k, v := range mdb.CustomTags {
					db[metrics.TagPrefix+k] = v
				}
				msms = append(msms, metrics.MeasurementEnvelope{
					DBName:     mdb.Name,
					MetricName: monitoredDbsDatastoreSyncMetricName,
					Data:       metrics.Measurements{db},
				})
			}
			monitoredDbCacheLock.RUnlock()
			r.measurementCh <- msms
		}
		select {
		case <-time.After(time.Second * monitoredDbsDatastoreSyncIntervalSeconds):
		case <-ctx.Done():
			return
		}
	}
}

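// WriteMeasurements drains the measurement channel and forwards each batch to the
// configured sink writer until the context is cancelled.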
func (r *Reaper) WriteMeasurements(ctx context.Context) {
	var err error
	for {
		select {
		case <-ctx.Done():
			return
		case msg := <-r.measurementCh:
			if err = r.SinksWriter.Write(msg); err != nil {
				r.logger.Error(err)
			}
		}
	}
}

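// AddSysinfoToMeasurements enriches every measurement row with the real database name
// and the cluster's system identifier, if the sink is configured with field names for
// them.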
func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, md *sources.SourceConn) {
	for _, dr := range data {
		if r.Sinks.RealDbnameField > "" && md.RealDbname > "" {
			dr[r.Sinks.RealDbnameField] = md.RealDbname
		}
		if r.Sinks.SystemIdentifierField > "" && md.SystemIdentifier > "" {
			dr[r.Sinks.SystemIdentifierField] = md.SystemIdentifier
		}
	}
}

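// FetchMetric performs a single fetch of the named metric for the given source: it
// resolves the metric definition, serves instance-level metrics from the cache when
// they are fresh enough, otherwise runs the metric SQL (or a special handler for
// change events and recommendations), and wraps the rows into a MeasurementEnvelope.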
func (r *Reaper) FetchMetric(ctx context.Context, md *sources.SourceConn, metricName string, hostState map[string]map[string]string) (_ *metrics.MeasurementEnvelope, err error) {
	var sql string
	var data metrics.Measurements
	var metric metrics.Metric
	var fromCache bool
	var cacheKey string
	var ok bool

	if metric, ok = metricDefs.GetMetricDef(metricName); !ok {
		return nil, metrics.ErrMetricNotFound
	}
	l := r.logger.WithField("source", md.Name).WithField("metric", metricName)

	if metric.IsInstanceLevel && r.Metrics.InstanceLevelCacheMaxSeconds > 0 && time.Second*time.Duration(md.GetMetricInterval(metricName)) < r.Metrics.CacheAge() {
		cacheKey = fmt.Sprintf("%s:%s", md.GetClusterIdentifier(), metricName)
	}
	if data = r.measurementCache.Get(cacheKey, r.Metrics.CacheAge()); len(data) > 0 {
		fromCache = true
		goto send_to_storageChannel
	}

	sql = metric.GetSQL(md.Version)
	if sql == "" && !(metricName == specialMetricChangeEvents || metricName == recoMetricName) {
		l.Warning("Ignoring fetching because of empty SQL")
		return nil, nil
	}

	if (metric.PrimaryOnly() && md.IsInRecovery) || (metric.StandbyOnly() && !md.IsInRecovery) {
		l.Debug("Skipping fetch as the server is not in the required recovery state, IsInRecovery: ", md.IsInRecovery)
		return nil, nil
	}

	switch metricName {
	case specialMetricChangeEvents:
		r.CheckForPGObjectChangesAndStore(ctx, md.Name, md, hostState)
		return nil, nil
	case recoMetricName:
		if data, err = GetRecommendations(ctx, md.Name, md); err != nil {
			return nil, err
		}
	default:
		if data, err = QueryMeasurements(ctx, md.Name, sql); err != nil {
			if strings.Contains(err.Error(), "recovery is in progress") && md.IsInRecovery {
				l.Debugf("[%s:%s] failed to fetch metrics: %s", md.Name, metricName, err)
				return nil, err
			}
			if metricName == specialMetricInstanceUp {
				l.WithError(err).Debugf("[%s:%s] failed to fetch metrics. marking instance as not up", md.Name, metricName)
				data = make(metrics.Measurements, 1)
				data[0] = metrics.NewMeasurement(time.Now().UnixNano())
				data[0]["is_up"] = 0
				goto send_to_storageChannel
			}
			l.WithFields(map[string]any{"source": md.Name, "metric": metricName}).
				WithError(err).Error("failed to fetch metrics")
			return nil, err
		}
	}
	r.measurementCache.Put(cacheKey, data)

send_to_storageChannel:
	r.AddSysinfoToMeasurements(data, md)
	l.WithField("cache", fromCache).WithField("rows", len(data)).Info("measurements fetched")
	return &metrics.MeasurementEnvelope{
		DBName:           md.Name,
		MetricName:       cmp.Or(metric.StorageName, metricName),
		Data:             data,
		CustomTags:       md.CustomTags,
		MetricDef:        metric,
		RealDbname:       md.RealDbname,
		SystemIdentifier: md.SystemIdentifier}, nil
}