1 package reaper
2
3 import (
4 "cmp"
5 "context"
6 "errors"
7 "fmt"
8 "time"
9
10 "github.com/cybertec-postgresql/pgwatch/v5/internal/log"
11 "github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
12 "github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
13 "github.com/jackc/pgx/v5"
14 )
15
// minTickInterval is the lower bound, in seconds, both for a single metric's
// interval when computing the tick and for the polling loop's tick interval.
const minTickInterval = 1
17
18
19
20
21
// SourceReaper drives metric collection for a single monitored source.
type SourceReaper struct {
	// reaper is the parent Reaper; it provides the measurement channel, the
	// instance-level measurement cache and the metrics configuration.
	reaper *Reaper
	// md is the monitored source's connection plus runtime info; its mutable
	// fields are read under md's read lock in this file.
	md *sources.SourceConn
	// lastFetch records, per metric name, when Run last handled the metric;
	// Run compares it against the metric interval to decide if it is due.
	lastFetch map[string]time.Time
	// lastUptimeS is the most recent postmaster_uptime_s value seen in
	// db_stats; detectServerRestart compares new values against it.
	lastUptimeS int64
	// degradedMetrics is the set of metric names that failed even when
	// retried individually; Run fetches them one by one instead of batching.
	degradedMetrics map[string]struct{}
}
29
30
31 func NewSourceReaper(r *Reaper, md *sources.SourceConn) *SourceReaper {
32 sr := &SourceReaper{
33 reaper: r,
34 md: md,
35 lastFetch: make(map[string]time.Time),
36 degradedMetrics: make(map[string]struct{}),
37 }
38 return sr
39 }
40
41
42
43
44 func (sr *SourceReaper) activeMetrics() map[string]time.Duration {
45 sr.md.RLock()
46 defer sr.md.RUnlock()
47 am := sr.md.Metrics
48 if sr.md.IsInRecovery && len(sr.md.MetricsStandby) > 0 {
49 am = sr.md.MetricsStandby
50 }
51 c := make(map[string]time.Duration, len(am))
52 for k, v := range am {
53 c[k] = time.Duration(v) * time.Second
54 }
55 return c
56 }
57
58
// GCDSlice returns the greatest common divisor of all values in vals using
// Euclid's algorithm. Zeros do not affect the result (gcd(x, 0) == x).
// It returns 0 for an empty slice or a slice of zeros, and the result is
// always non-negative regardless of the signs of the inputs.
func GCDSlice(vals []int) int {
	// Seeding with 0 works because gcd(0, v) == v.
	g := 0
	for _, v := range vals {
		for v != 0 {
			g, v = v, g%v
		}
	}
	// Go's % keeps the dividend's sign, so Euclid can end on a negative value.
	if g < 0 {
		return -g
	}
	return g
}
71
72
73 func (sr *SourceReaper) calcTickInterval() time.Duration {
74 am := sr.activeMetrics()
75 intervals := make([]int, 0, len(am))
76 for _, d := range am {
77 intervals = append(intervals, max(int(d.Seconds()), minTickInterval))
78 }
79 return time.Duration(max(GCDSlice(intervals), minTickInterval)) * time.Second
80 }
81
82
83 func (sr *SourceReaper) cacheKey(m metrics.Metric, name string) string {
84 age := sr.reaper.Metrics.CacheAge()
85 if m.IsInstanceLevel && age > 0 && sr.md.GetMetricInterval(name) < age {
86 return fmt.Sprintf("%s:%s", sr.md.GetClusterIdentifier(), name)
87 }
88 return ""
89 }
90
91
92
93 func (sr *SourceReaper) isRoleExcluded(m metrics.Metric) bool {
94 sr.md.RLock()
95 defer sr.md.RUnlock()
96 return (m.PrimaryOnly() && sr.md.IsInRecovery) || (m.StandbyOnly() && !sr.md.IsInRecovery)
97 }
98
99
100
101 func (sr *SourceReaper) sendEnvelope(ctx context.Context, name, storageName string, data metrics.Measurements) {
102 log.GetLogger(ctx).WithField("metric", name).WithField("rows", len(data)).Info("measurements fetched")
103 sr.reaper.AddSysinfoToMeasurements(data, sr.md)
104 sr.reaper.measurementCh <- metrics.MeasurementEnvelope{
105 DBName: sr.md.Name,
106 MetricName: cmp.Or(storageName, name),
107 Data: data,
108 CustomTags: sr.md.CustomTags,
109 }
110 }
111
112
113
114 func (sr *SourceReaper) dispatchMetricData(ctx context.Context, name string, metric metrics.Metric, data metrics.Measurements) {
115 if key := sr.cacheKey(metric, name); key != "" {
116 sr.reaper.measurementCache.Put(key, data)
117 }
118 sr.sendEnvelope(ctx, name, metric.StorageName, data)
119 if name == "db_stats" {
120 sr.detectServerRestart(ctx, data)
121 }
122 }
123
124
// batchEntry is one metric queued for SQL execution, either in a pgx batch
// or as an individual query.
type batchEntry struct {
	// name is the metric name as configured for the source.
	name string
	// metric is the metric definition looked up for name.
	metric metrics.Metric
	// sql is the query text chosen for the source's server version.
	sql string
}
130
131
132
133 func (sr *SourceReaper) Run(ctx context.Context) {
134 l := log.GetLogger(ctx).WithField("source", sr.md.Name)
135 ctx = log.WithLogger(ctx, l)
136 var err error
137 for {
138 if err = sr.md.FetchRuntimeInfo(ctx, false); err != nil {
139 l.WithError(err).Warning("could not refresh runtime info")
140 }
141
142 now := time.Now()
143 var batch []batchEntry
144
145 for name, interval := range sr.activeMetrics() {
146 if interval <= 0 {
147 continue
148 }
149 if lf := sr.lastFetch[name]; !lf.IsZero() && now.Sub(lf) < interval {
150 continue
151 }
152
153 metric, ok := metricDefs.GetMetricDef(name)
154 if !ok || sr.isRoleExcluded(metric) {
155 continue
156 }
157
158 switch {
159 case name == specialMetricServerLogEventCounts:
160 if sr.lastFetch[name].IsZero() {
161 go func() {
162 if e := sr.runLogParser(ctx); e != nil {
163 l.WithError(e).Error("log parser error")
164 }
165 }()
166 }
167 case IsDirectlyFetchableMetric(sr.md, name):
168 err = sr.fetchOSMetric(ctx, name)
169 sr.lastFetch[name] = time.Now()
170 case name == specialMetricChangeEvents || name == specialMetricInstanceUp:
171 err = sr.fetchSpecialMetric(ctx, name, metric.StorageName)
172 sr.lastFetch[name] = time.Now()
173 default:
174 if cached := sr.reaper.GetMeasurementCache(sr.cacheKey(metric, name)); len(cached) > 0 {
175 l.WithField("metric", name).Info("instance level cache hit")
176 sr.sendEnvelope(ctx, name, metric.StorageName, cached)
177 sr.lastFetch[name] = time.Now()
178 break
179 }
180 sr.md.RLock()
181 version := sr.md.Version
182 sr.md.RUnlock()
183 sql := metric.GetSQL(version)
184 if sql == "" {
185 l.WithField("metric", name).WithField("version", version).Warning("no SQL found for metric version")
186 sr.lastFetch[name] = time.Now()
187 break
188 }
189 if _, degraded := sr.degradedMetrics[name]; degraded {
190 if err = sr.fetchMetric(ctx, batchEntry{name: name, metric: metric, sql: sql}); err != nil {
191 l.WithError(err).WithField("metric", name).Error("degraded metric fetch failed")
192 } else {
193 l.WithField("metric", name).Info("degraded metric recovered, returning to batch execution")
194 delete(sr.degradedMetrics, name)
195 }
196 sr.lastFetch[name] = time.Now()
197 break
198 }
199 batch = append(batch, batchEntry{name: name, metric: metric, sql: sql})
200 }
201 if err != nil {
202 l.WithError(err).WithField("metric", name).Error("failed to fetch metric")
203 }
204 }
205
206 if len(batch) > 0 {
207 if sr.md.IsPostgresSource() {
208 err = sr.executeBatch(ctx, batch)
209 } else {
210 for _, e := range batch {
211 err = sr.fetchMetric(ctx, e)
212 }
213 }
214 if err != nil {
215 l.WithError(err).Error("failed to fetch metrics")
216 }
217
218 now := time.Now()
219 for _, e := range batch {
220 sr.lastFetch[e.name] = now
221 }
222 }
223 select {
224 case <-ctx.Done():
225 return
226 case <-time.After(sr.calcTickInterval()):
227 }
228 }
229 }
230
231
232
233
234
235
236
237
238 func (sr *SourceReaper) executeBatch(ctx context.Context, entries []batchEntry) error {
239 batch := &pgx.Batch{}
240 for _, e := range entries {
241 batch.Queue(e.sql)
242 }
243
244 br := sr.md.Conn.SendBatch(ctx, batch)
245 defer func() { _ = br.Close() }()
246
247 var (
248 errs []error
249 retries []batchEntry
250 )
251 for _, e := range entries {
252 rows, err := br.Query()
253 if err != nil {
254
255 retries = append(retries, e)
256 continue
257 }
258 errs = append(errs, sr.CollectAndDispatch(ctx, rows, e.name, e.metric))
259 }
260
261 for _, e := range retries {
262 if err := sr.fetchMetric(ctx, e); err != nil {
263 errs = append(errs, fmt.Errorf("failed to fetch metric %s: %v", e.name, err))
264 log.GetLogger(ctx).WithField("metric", e.name).Warning("metric degraded after repeated failures, switching to individual fetch")
265 sr.degradedMetrics[e.name] = struct{}{}
266 }
267 }
268 return errors.Join(errs...)
269 }
270
271
272 func (sr *SourceReaper) fetchMetric(ctx context.Context, entry batchEntry) error {
273 rows, err := sr.md.Conn.Query(ctx, entry.sql, pgx.QueryExecModeSimpleProtocol)
274 if err != nil {
275 return err
276 }
277 return sr.CollectAndDispatch(ctx, rows, entry.name, entry.metric)
278 }
279
280
281 func (sr *SourceReaper) CollectAndDispatch(ctx context.Context, rows pgx.Rows, name string, metric metrics.Metric) error {
282 data, err := pgx.CollectRows(rows, metrics.RowToMeasurement)
283 if err != nil {
284 return err
285 }
286 if len(data) > 0 {
287 sr.dispatchMetricData(ctx, name, metric, data)
288 }
289 return nil
290 }
291
292
293 func (sr *SourceReaper) fetchOSMetric(ctx context.Context, name string) error {
294 msg, err := sr.reaper.FetchStatsDirectlyFromOS(ctx, sr.md, name)
295 if err != nil {
296 return fmt.Errorf("could not read metric from OS: %v", err)
297 }
298 if msg != nil && len(msg.Data) > 0 {
299 log.GetLogger(ctx).WithField("metric", name).WithField("rows", len(msg.Data)).Info("measurements fetched")
300 sr.reaper.measurementCh <- *msg
301 }
302 return nil
303 }
304
305
306 func (sr *SourceReaper) fetchSpecialMetric(ctx context.Context, name, storageName string) error {
307 var (
308 data metrics.Measurements
309 err error
310 )
311 switch name {
312 case specialMetricChangeEvents:
313 data, err = sr.reaper.GetObjectChangesMeasurement(ctx, sr.md)
314 case specialMetricInstanceUp:
315 data, err = sr.reaper.GetInstanceUpMeasurement(ctx, sr.md)
316 }
317 if err != nil {
318 return fmt.Errorf("failed to fetch special metric: %v", err)
319 }
320 if len(data) > 0 {
321 sr.sendEnvelope(ctx, name, storageName, data)
322 }
323 return err
324 }
325
326
327 func (sr *SourceReaper) runLogParser(ctx context.Context) error {
328 lp, err := NewLogParser(ctx, sr.md, sr.reaper.measurementCh)
329 if err != nil {
330 return fmt.Errorf("failed to initialize log parser: %v", err)
331 }
332 if err := lp.ParseLogs(); err != nil {
333 return fmt.Errorf("log parser error: %v", err)
334 }
335 return nil
336 }
337
338
339
340 func (sr *SourceReaper) detectServerRestart(ctx context.Context, data metrics.Measurements) {
341 if len(data) == 0 {
342 return
343 }
344 uptimeS, ok := data[0]["postmaster_uptime_s"].(int64)
345 if !ok {
346 return
347 }
348 prev := sr.lastUptimeS
349 sr.lastUptimeS = uptimeS
350 if prev > 0 && uptimeS < prev {
351 l := log.GetLogger(ctx)
352 l.Warning("Detected server restart (or failover)")
353 entry := metrics.NewMeasurement(data.GetEpoch())
354 entry["details"] = "Detected server restart (or failover)"
355 sr.reaper.measurementCh <- metrics.MeasurementEnvelope{
356 DBName: sr.md.Name,
357 MetricName: "object_changes",
358 Data: metrics.Measurements{entry},
359 CustomTags: sr.md.CustomTags,
360 }
361 }
362 }
363