1 package reaper
2
3 import (
4 "context"
5 "runtime"
6 "slices"
7 "strings"
8 "time"
9
10 "sync/atomic"
11
12 "github.com/cybertec-postgresql/pgwatch/v5/internal/cmdopts"
13 "github.com/cybertec-postgresql/pgwatch/v5/internal/log"
14 "github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
15 "github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
16 "github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
17 )
18
// Metric names that the reaper treats specially.
const (
	specialMetricChangeEvents         = "change_events"
	specialMetricServerLogEventCounts = "server_log_event_counts"
	// specialMetricInstanceUp is the metric/column written by WriteInstanceDown.
	specialMetricInstanceUp = "instance_up"
)

// specialMetrics is the set of metrics that are always stored under their own
// name, i.e. a StorageName from the metric definition is never applied to them
// (see the storage-name selection in Reap). instance_up is intentionally absent.
var specialMetrics = map[string]bool{
	specialMetricChangeEvents:         true,
	specialMetricServerLogEventCounts: true,
}
26
27 var hostLastKnownStatusInRecovery = make(map[string]bool)
28 var metricsConfig metrics.MetricIntervals
29 var metricDefs = NewConcurrentMetricDefs()
30
31
// Reaper coordinates metric gathering: it embeds *cmdopts.Options (accessed as
// r.Sources, r.Metrics, r.Sinks, ...), tracks the monitored sources and runs
// one SourceReaper goroutine per source (see Reap).
type Reaper struct {
	*cmdopts.Options
	// ready is set once Reap has started; exposed via Ready.
	ready atomic.Bool
	// measurementCh queues measurement envelopes; WriteMeasurements forwards
	// them to the sinks writer.
	measurementCh chan metrics.MeasurementEnvelope
	// measurementCache backs GetMeasurementCache.
	measurementCache *InstanceMetricCache
	logger           log.Logger
	// monitoredSources is the current set of sources, refreshed by LoadSources.
	monitoredSources sources.SourceConns
	// prevLoopMonitoredDBs is a snapshot of monitoredSources taken at the end of
	// the previous Reap iteration; CreateSourceHelpers uses it to skip sources
	// that were already seen.
	prevLoopMonitoredDBs sources.SourceConns
	// cancelFuncs cancels the context of each running SourceReaper, keyed by
	// source name.
	cancelFuncs map[string]context.CancelFunc
	// sourceReapers holds the running SourceReaper of each source, keyed by
	// source name.
	sourceReapers map[string]*SourceReaper
}
43
44
45 func NewReaper(ctx context.Context, opts *cmdopts.Options) (r *Reaper) {
46 return &Reaper{
47 Options: opts,
48 measurementCh: make(chan metrics.MeasurementEnvelope, 256),
49 measurementCache: NewInstanceMetricCache(),
50 logger: log.GetLogger(ctx),
51 monitoredSources: make(sources.SourceConns, 0),
52 prevLoopMonitoredDBs: make(sources.SourceConns, 0),
53 cancelFuncs: make(map[string]context.CancelFunc),
54 sourceReapers: make(map[string]*SourceReaper),
55 }
56 }
57
58
59 func (r *Reaper) Ready() bool {
60 return r.ready.Load()
61 }
62
63 func (r *Reaper) PrintMemStats() {
64 var m runtime.MemStats
65 runtime.ReadMemStats(&m)
66
67 bToKb := func(b uint64) uint64 {
68 return b / 1024
69 }
70 r.logger.Debugf("Alloc: %d Kb, TotalAlloc: %d Kb, Sys: %d Kb, NumGC: %d, HeapAlloc: %d Kb, HeapSys: %d Kb",
71 bToKb(m.Alloc), bToKb(m.TotalAlloc), bToKb(m.Sys), m.NumGC, bToKb(m.HeapAlloc), bToKb(m.HeapSys))
72 }
73
74
75
76
77
78 func (r *Reaper) Reap(ctx context.Context) {
79 var err error
80 logger := r.logger
81
82 go r.WriteMeasurements(ctx)
83
84 r.ready.Store(true)
85
86 for {
87 if r.Logging.LogLevel == "debug" {
88 r.PrintMemStats()
89 }
90 if err = r.LoadSources(ctx); err != nil {
91 logger.WithError(err).Error("could not refresh active sources, using last valid cache")
92 }
93 if err = r.LoadMetrics(); err != nil {
94 logger.WithError(err).Error("could not refresh metric definitions, using last valid cache")
95 }
96
97
98 hostsToShutDownDueToRoleChange := make(map[string]bool)
99 for _, monitoredSource := range r.monitoredSources {
100 srcL := logger.WithField("source", monitoredSource.Name)
101 ctx = log.WithLogger(ctx, srcL)
102
103 if monitoredSource.Connect(ctx, r.Sources) != nil {
104 r.WriteInstanceDown(monitoredSource.Name)
105 srcL.Warning("could not init connection, retrying on next iteration")
106 continue
107 }
108
109 if err = monitoredSource.FetchRuntimeInfo(ctx, true); err != nil {
110 srcL.WithError(err).Error("could not start metric gathering")
111 continue
112 }
113 srcL.WithField("recovery", monitoredSource.IsInRecovery).Infof("Connect OK. Version: %s", monitoredSource.VersionStr)
114 if monitoredSource.IsInRecovery && monitoredSource.OnlyIfMaster {
115 srcL.Info("not added to monitoring due to 'master only' property")
116 if monitoredSource.IsPostgresSource() {
117 srcL.Info("to be removed from monitoring due to 'master only' property and status change")
118 hostsToShutDownDueToRoleChange[monitoredSource.Name] = true
119 }
120 continue
121 }
122
123 if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
124 metricsConfig = monitoredSource.MetricsStandby
125 } else {
126 metricsConfig = monitoredSource.Metrics
127 }
128
129 r.CreateSourceHelpers(ctx, srcL, monitoredSource)
130
131 if monitoredSource.IsPostgresSource() {
132 DBSizeMB := monitoredSource.ApproxDbSize / 1048576
133 if DBSizeMB != 0 && DBSizeMB < r.Sources.MinDbSizeMB {
134 srcL.Infof("ignored due to the --min-db-size-mb filter, current size %d MB", DBSizeMB)
135 hostsToShutDownDueToRoleChange[monitoredSource.Name] = true
136 continue
137 }
138
139 lastKnownStatusInRecovery := hostLastKnownStatusInRecovery[monitoredSource.Name]
140 if lastKnownStatusInRecovery != monitoredSource.IsInRecovery {
141 if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
142 srcL.Warning("Switching metrics collection to standby config...")
143 metricsConfig = monitoredSource.MetricsStandby
144 } else if !monitoredSource.IsInRecovery {
145 srcL.Warning("Switching metrics collection to primary config...")
146 metricsConfig = monitoredSource.Metrics
147 }
148
149 }
150 }
151 hostLastKnownStatusInRecovery[monitoredSource.Name] = monitoredSource.IsInRecovery
152
153
154 for metricName := range metricsConfig {
155 mvp, metricDefExists := metricDefs.GetMetricDef(metricName)
156 if !metricDefExists {
157 epoch, ok := lastSQLFetchError.Load(metricName)
158 if !ok || ((time.Now().Unix() - epoch.(int64)) > 3600) {
159 srcL.WithField("metric", metricName).Warning("metric definition not found")
160 lastSQLFetchError.Store(metricName, time.Now().Unix())
161 }
162 continue
163 }
164 metricNameForStorage := metricName
165 if _, isSpecialMetric := specialMetrics[metricName]; !isSpecialMetric && mvp.StorageName > "" {
166 metricNameForStorage = mvp.StorageName
167 }
168 if err := r.SinksWriter.SyncMetric(monitoredSource.Name, metricNameForStorage, sinks.AddOp); err != nil {
169 srcL.Error(err)
170 }
171 }
172
173
174 if _, exists := r.sourceReapers[monitoredSource.Name]; !exists {
175 srcL.Info("starting source reaper")
176 sr := NewSourceReaper(r, monitoredSource)
177 sourceCtx, cancelFunc := context.WithCancel(ctx)
178 r.cancelFuncs[monitoredSource.Name] = cancelFunc
179 r.sourceReapers[monitoredSource.Name] = sr
180 go sr.Run(sourceCtx)
181 }
182 }
183
184 r.ShutdownOldWorkers(ctx, hostsToShutDownDueToRoleChange)
185
186 r.prevLoopMonitoredDBs = slices.Clone(r.monitoredSources)
187 select {
188 case <-time.After(time.Second * time.Duration(r.Sources.Refresh)):
189 logger.Debugf("wake up after %d seconds", r.Sources.Refresh)
190 case <-ctx.Done():
191 return
192 }
193 }
194 }
195
196
197 func (r *Reaper) CreateSourceHelpers(ctx context.Context, srcL log.Logger, monitoredSource *sources.SourceConn) {
198 if r.prevLoopMonitoredDBs.GetMonitoredDatabase(monitoredSource.Name) != nil {
199 return
200 }
201 if !monitoredSource.IsPostgresSource() || monitoredSource.IsInRecovery {
202 return
203 }
204
205 if r.Sources.TryCreateListedExtsIfMissing > "" {
206 srcL.Info("trying to create extensions if missing")
207 extsToCreate := strings.Split(r.Sources.TryCreateListedExtsIfMissing, ",")
208 extsCreated, err := monitoredSource.TryCreateMissingExtensions(ctx, extsToCreate)
209 if err != nil {
210 srcL.Warning(err)
211 }
212 if extsCreated != "" {
213 srcL.Infof("%d/%d extensions created: %s", len(extsCreated), len(extsToCreate), extsCreated)
214 }
215 }
216
217 if r.Sources.CreateHelpers {
218 srcL.Info("trying to create helper objects if missing")
219 if err := monitoredSource.TryCreateMetricsHelpers(ctx, func(metric string) string {
220 if m, ok := metricDefs.GetMetricDef(metric); ok {
221 return m.InitSQL
222 }
223 return ""
224 }); err != nil {
225 srcL.Warning(err)
226 }
227 }
228
229 }
230
231 func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDown map[string]bool) {
232 logger := r.logger
233
234
235 logger.Debug("checking if any workers need to be shut down...")
236 for sourceName, cancelFunc := range r.cancelFuncs {
237 var dbRemovedFromConfig bool
238
239 _, wholeDbShutDown := hostsToShutDown[sourceName]
240 if !wholeDbShutDown {
241 md := r.monitoredSources.GetMonitoredDatabase(sourceName)
242 if md == nil {
243 dbRemovedFromConfig = true
244 logger.Debugf("DB %s removed from config, shutting down source reaper...", sourceName)
245 }
246 }
247
248 if ctx.Err() != nil || wholeDbShutDown || dbRemovedFromConfig {
249 logger.WithField("source", sourceName).Info("stopping source reaper...")
250 cancelFunc()
251 delete(r.cancelFuncs, sourceName)
252 delete(r.sourceReapers, sourceName)
253 if err := r.SinksWriter.SyncMetric(sourceName, "", sinks.DeleteOp); err != nil {
254 logger.Error(err)
255 }
256 }
257 }
258
259
260 r.CloseResourcesForRemovedMonitoredDBs(hostsToShutDown)
261 }
262
263
264 func (r *Reaper) LoadSources(ctx context.Context) (err error) {
265 if DoesEmergencyTriggerfileExist(r.Metrics.EmergencyPauseTriggerfile) {
266 r.logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", r.Metrics.EmergencyPauseTriggerfile)
267 r.monitoredSources = make(sources.SourceConns, 0)
268 return nil
269 }
270
271 var newSrcs sources.SourceConns
272 srcs, err := r.SourcesReaderWriter.GetSources()
273 if err != nil {
274 return err
275 }
276 srcs = slices.DeleteFunc(srcs, func(s sources.Source) bool {
277
278 return !s.IsEnabled || len(r.Sources.Groups) > 0 && !slices.Contains(r.Sources.Groups, s.Group)
279 })
280
281 if newSrcs, err = srcs.ResolveDatabases(r.WriteInstanceDown); err != nil {
282
283 r.logger.WithError(err).Error("could not resolve databases from sources")
284 }
285
286 for i, newMD := range newSrcs {
287 md := r.monitoredSources.GetMonitoredDatabase(newMD.Name)
288 if md == nil {
289 continue
290 }
291 if md.Equal(newMD.Source) {
292
293 newSrcs[i] = md
294 continue
295 }
296
297
298 r.logger.WithField("source", md.Name).Info("Source configs changed, restarting all gatherers...")
299 r.ShutdownOldWorkers(ctx, map[string]bool{md.Name: true})
300 }
301 r.monitoredSources = newSrcs
302 r.logger.WithField("sources", len(r.monitoredSources)).Info("sources refreshed")
303 return nil
304 }
305
306
307 func (r *Reaper) WriteInstanceDown(name string) {
308 r.measurementCh <- metrics.MeasurementEnvelope{
309 DBName: name,
310 MetricName: specialMetricInstanceUp,
311 Data: metrics.Measurements{metrics.Measurement{
312 metrics.EpochColumnName: time.Now().UnixNano(),
313 specialMetricInstanceUp: 0},
314 },
315 }
316 }
317
318
319 func (r *Reaper) GetMeasurementCache(key string) metrics.Measurements {
320 return r.measurementCache.Get(key, r.Metrics.CacheAge())
321 }
322
323
324 func (r *Reaper) WriteMeasurements(ctx context.Context) {
325 var err error
326 for {
327 select {
328 case <-ctx.Done():
329 return
330 case msg := <-r.measurementCh:
331 if err = r.SinksWriter.Write(msg); err != nil {
332 r.logger.Error(err)
333 }
334 }
335 }
336 }
337
338 func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, md *sources.SourceConn) {
339 for _, dr := range data {
340 if r.Sinks.RealDbnameField > "" && md.RealDbname > "" {
341 dr[r.Sinks.RealDbnameField] = md.RealDbname
342 }
343 if r.Sinks.SystemIdentifierField > "" && md.SystemIdentifier > "" {
344 dr[r.Sinks.SystemIdentifierField] = md.SystemIdentifier
345 }
346 }
347 }
348