1 package reaper
2
3 import (
4 "context"
5 "testing"
6 "testing/synctest"
7 "time"
8
9 "github.com/cybertec-postgresql/pgwatch/v5/internal/cmdopts"
10 "github.com/cybertec-postgresql/pgwatch/v5/internal/log"
11 "github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
12 "github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
13 "github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
14 "github.com/jackc/pgx/v5"
15 pgxmock "github.com/pashagolub/pgxmock/v4"
16 "github.com/stretchr/testify/assert"
17 "github.com/stretchr/testify/require"
18 )
19
20 func TestGCDSlice(t *testing.T) {
21 tests := []struct {
22 name string
23 vals []int
24 want int
25 }{
26 {"empty", nil, 0},
27 {"single", []int{30}, 30},
28 {"exhaustive preset intervals", []int{30, 60, 120, 180, 300, 600, 900, 3600, 7200}, 30},
29 {"coprime", []int{7, 11, 13}, 1},
30 {"all same", []int{60, 60, 60}, 60},
31 {"basic preset", []int{60, 120}, 60},
32 }
33 for _, tc := range tests {
34 t.Run(tc.name, func(t *testing.T) {
35 assert.Equal(t, tc.want, GCDSlice(tc.vals))
36 })
37 }
38 }
39
40 func TestCalcTickInterval(t *testing.T) {
41 t.Run("exhaustive preset GCD is 30s", func(t *testing.T) {
42 sr := &SourceReaper{
43 md: &sources.SourceConn{
44 Source: sources.Source{
45 Metrics: metrics.MetricIntervals{"m1": 30, "m2": 60, "m3": 120, "m4": 300},
46 },
47 },
48 }
49 assert.Equal(t, 30*time.Second, sr.calcTickInterval())
50 })
51
52 t.Run("GCD floors to minimum 1s", func(t *testing.T) {
53 sr := &SourceReaper{
54 md: &sources.SourceConn{
55 Source: sources.Source{
56 Metrics: metrics.MetricIntervals{"m1": 3, "m2": 7},
57 },
58 },
59 }
60 assert.Equal(t, time.Second, sr.calcTickInterval())
61 })
62
63 t.Run("single metric", func(t *testing.T) {
64 sr := &SourceReaper{
65 md: &sources.SourceConn{
66 Source: sources.Source{
67 Metrics: metrics.MetricIntervals{"m1": 60},
68 },
69 },
70 }
71 assert.Equal(t, 60*time.Second, sr.calcTickInterval())
72 })
73
74 t.Run("empty metrics", func(t *testing.T) {
75 sr := &SourceReaper{
76 md: &sources.SourceConn{
77 Source: sources.Source{
78 Metrics: metrics.MetricIntervals{},
79 },
80 },
81 }
82 assert.Equal(t, time.Second, sr.calcTickInterval())
83 })
84
85 t.Run("standby metrics when in recovery", func(t *testing.T) {
86 sr := &SourceReaper{
87 md: &sources.SourceConn{
88 Source: sources.Source{
89 Metrics: metrics.MetricIntervals{"m1": 30, "m2": 60},
90 MetricsStandby: metrics.MetricIntervals{"m1": 120},
91 },
92 RuntimeInfo: sources.RuntimeInfo{IsInRecovery: true},
93 },
94 }
95 assert.Equal(t, 120*time.Second, sr.calcTickInterval())
96 })
97 }
98
99 func TestNewSourceReaper(t *testing.T) {
100 r := &Reaper{
101 measurementCh: make(chan metrics.MeasurementEnvelope, 10),
102 measurementCache: NewInstanceMetricCache(),
103 }
104 md := &sources.SourceConn{
105 Source: sources.Source{
106 Name: "testdb",
107 Kind: sources.SourcePostgres,
108 Metrics: metrics.MetricIntervals{"cpu": 30, "mem": 60, "disk": 120},
109 },
110 }
111 sr := NewSourceReaper(r, md)
112
113 assert.NotNil(t, sr.lastFetch)
114 assert.Empty(t, sr.lastFetch)
115 assert.Equal(t, r, sr.reaper)
116 assert.Equal(t, md, sr.md)
117 }
118
119 func TestSourceReaper_ExecuteBatch(t *testing.T) {
120 ctx := log.WithLogger(context.Background(), log.NewNoopLogger())
121
122 metricDefs.MetricDefs["batch_metric_1"] = metrics.Metric{
123 SQLs: metrics.SQLs{0: "SELECT 1 as value, 100::bigint as epoch_ns"},
124 }
125 metricDefs.MetricDefs["batch_metric_2"] = metrics.Metric{
126 SQLs: metrics.SQLs{0: "SELECT 2 as value, 200::bigint as epoch_ns"},
127 }
128
129 mock, err := pgxmock.NewPool()
130 require.NoError(t, err)
131 defer mock.Close()
132
133 md := &sources.SourceConn{
134 Source: sources.Source{
135 Name: "test_source",
136 Kind: sources.SourcePostgres,
137 Metrics: metrics.MetricIntervals{"batch_metric_1": 30, "batch_metric_2": 30},
138 },
139 Conn: mock,
140 RuntimeInfo: sources.RuntimeInfo{
141 Version: 120000,
142 ChangeState: make(map[string]map[string]string),
143 },
144 }
145
146 r := &Reaper{
147 Options: &cmdopts.Options{
148 Metrics: metrics.CmdOpts{},
149 Sinks: sinks.CmdOpts{},
150 },
151 measurementCh: make(chan metrics.MeasurementEnvelope, 10),
152 measurementCache: NewInstanceMetricCache(),
153 }
154 sr := NewSourceReaper(r, md)
155
156 rows1 := pgxmock.NewRows([]string{"epoch_ns", "value"}).
157 AddRow(time.Now().UnixNano(), int64(100))
158 rows2 := pgxmock.NewRows([]string{"epoch_ns", "value"}).
159 AddRow(time.Now().UnixNano(), int64(200))
160 eb := mock.ExpectBatch()
161 eb.ExpectQuery("SELECT 1").WillReturnRows(rows1)
162 eb.ExpectQuery("SELECT 2").WillReturnRows(rows2)
163
164 err = sr.executeBatch(ctx, []batchEntry{
165 {name: "batch_metric_1", metric: metricDefs.MetricDefs["batch_metric_1"], sql: "SELECT 1 as value, 100::bigint as epoch_ns"},
166 {name: "batch_metric_2", metric: metricDefs.MetricDefs["batch_metric_2"], sql: "SELECT 2 as value, 200::bigint as epoch_ns"},
167 })
168 assert.NoError(t, err)
169
170 received := 0
171 for {
172 select {
173 case msg := <-r.measurementCh:
174 assert.Equal(t, "test_source", msg.DBName)
175 assert.True(t, msg.MetricName == "batch_metric_1" || msg.MetricName == "batch_metric_2")
176 received++
177 default:
178 goto done
179 }
180 }
181 done:
182 assert.Equal(t, 2, received, "should have received 2 measurement envelopes")
183 assert.NoError(t, mock.ExpectationsWereMet())
184 }
185
186 func TestSourceReaper_RunOneIteration(t *testing.T) {
187 ctx, cancel := context.WithCancel(log.WithLogger(context.Background(), log.NewNoopLogger()))
188
189 metricDefs.MetricDefs["run_test_metric"] = metrics.Metric{
190 SQLs: metrics.SQLs{0: "SELECT run_test"},
191 }
192
193 mock, err := pgxmock.NewPool()
194 require.NoError(t, err)
195 defer mock.Close()
196
197 md := &sources.SourceConn{
198 Source: sources.Source{
199 Name: "run_source",
200 Kind: sources.SourcePostgres,
201 Metrics: metrics.MetricIntervals{"run_test_metric": 5},
202 },
203 Conn: mock,
204 RuntimeInfo: sources.RuntimeInfo{
205 Version: 120000,
206 ChangeState: make(map[string]map[string]string),
207 },
208 }
209
210 r := &Reaper{
211 Options: &cmdopts.Options{
212 Metrics: metrics.CmdOpts{},
213 Sinks: sinks.CmdOpts{},
214 },
215 measurementCh: make(chan metrics.MeasurementEnvelope, 10),
216 measurementCache: NewInstanceMetricCache(),
217 }
218 sr := NewSourceReaper(r, md)
219
220
221 mock.ExpectQuery("select /\\* pgwatch_generated \\*/").
222 WillReturnError(assert.AnError)
223
224 rows := pgxmock.NewRows([]string{"epoch_ns", "value"}).
225 AddRow(time.Now().UnixNano(), int64(42))
226 eb := mock.ExpectBatch()
227 eb.ExpectQuery("SELECT run_test").WillReturnRows(rows)
228
229 go func() {
230 time.Sleep(200 * time.Millisecond)
231 cancel()
232 }()
233
234 sr.Run(ctx)
235
236 select {
237 case msg := <-r.measurementCh:
238 assert.Equal(t, "run_source", msg.DBName)
239 assert.Equal(t, "run_test_metric", msg.MetricName)
240 case <-time.After(time.Second):
241 t.Error("Expected measurement but timed out")
242 }
243 }
244
245 func TestSourceReaper_DetectServerRestart(t *testing.T) {
246 sr := &SourceReaper{
247 reaper: &Reaper{
248 measurementCh: make(chan metrics.MeasurementEnvelope, 10),
249 },
250 md: &sources.SourceConn{
251 Source: sources.Source{Name: "restart_test"},
252 },
253 }
254
255
256 data := metrics.Measurements{
257 {"epoch_ns": time.Now().UnixNano(), "postmaster_uptime_s": int64(1000)},
258 }
259 sr.detectServerRestart(t.Context(), data)
260 assert.Equal(t, int64(1000), sr.lastUptimeS)
261 select {
262 case <-sr.reaper.measurementCh:
263 t.Error("should not emit restart event on first observation")
264 default:
265 }
266
267
268 data = metrics.Measurements{
269 {"epoch_ns": time.Now().UnixNano(), "postmaster_uptime_s": int64(2000)},
270 }
271 sr.detectServerRestart(t.Context(), data)
272 assert.Equal(t, int64(2000), sr.lastUptimeS)
273 select {
274 case <-sr.reaper.measurementCh:
275 t.Error("should not emit restart event when uptime increases")
276 default:
277 }
278
279
280 data = metrics.Measurements{
281 {"epoch_ns": time.Now().UnixNano(), "postmaster_uptime_s": int64(10)},
282 }
283 sr.detectServerRestart(t.Context(), data)
284 assert.Equal(t, int64(10), sr.lastUptimeS)
285 select {
286 case msg := <-sr.reaper.measurementCh:
287 assert.Equal(t, "object_changes", msg.MetricName)
288 assert.Contains(t, msg.Data[0]["details"], "restart")
289 default:
290 t.Error("expected restart event")
291 }
292 }
293
294 func TestSourceReaper_FetchSpecialMetric(t *testing.T) {
295 ctx := log.WithLogger(context.Background(), log.NewNoopLogger())
296
297 newSR := func(t *testing.T) (*SourceReaper, *sources.SourceConn, pgxmock.PgxPoolIface) {
298 t.Helper()
299 md, mock := createTestSourceConn(t)
300 r := &Reaper{
301 Options: &cmdopts.Options{
302 Metrics: metrics.CmdOpts{},
303 Sinks: sinks.CmdOpts{},
304 },
305 measurementCh: make(chan metrics.MeasurementEnvelope, 10),
306 measurementCache: NewInstanceMetricCache(),
307 }
308 return NewSourceReaper(r, md), md, mock
309 }
310
311 sr, _, mock := newSR(t)
312 defer mock.Close()
313
314 t.Run("instance_up dispatches measurement on ping success", func(t *testing.T) {
315 mock.ExpectPing()
316 assert.NoError(t, sr.fetchSpecialMetric(ctx, specialMetricInstanceUp, ""))
317 select {
318 case msg := <-sr.reaper.measurementCh:
319 assert.Equal(t, specialMetricInstanceUp, msg.MetricName)
320 assert.Len(t, msg.Data, 1)
321 assert.Equal(t, 1, msg.Data[0][specialMetricInstanceUp])
322 default:
323 t.Error("expected measurement for instance_up")
324 }
325 assert.NoError(t, mock.ExpectationsWereMet())
326 })
327
328 t.Run("instance_up uses storage name when set", func(t *testing.T) {
329 mock.ExpectPing()
330 assert.NoError(t, sr.fetchSpecialMetric(ctx, specialMetricInstanceUp, "infra_up"))
331 select {
332 case msg := <-sr.reaper.measurementCh:
333 assert.Equal(t, "infra_up", msg.MetricName)
334 default:
335 t.Error("expected measurement")
336 }
337 assert.NoError(t, mock.ExpectationsWereMet())
338 })
339
340 t.Run("change_events dispatches no measurement when no hash defs present", func(t *testing.T) {
341
342 metricDefs.MetricDefs[specialMetricChangeEvents] = metrics.Metric{}
343 assert.NoError(t, sr.fetchSpecialMetric(ctx, specialMetricChangeEvents, ""))
344 select {
345 case <-sr.reaper.measurementCh:
346 t.Error("expected no measurement when no changes detected")
347 default:
348 }
349 assert.NoError(t, mock.ExpectationsWereMet())
350 })
351 }
352
353 func TestSourceReaper_ExecuteBatch_DegradedOnPersistentFailure(t *testing.T) {
354 ctx := log.WithLogger(context.Background(), log.NewNoopLogger())
355
356 metricDefs.MetricDefs["good_metric"] = metrics.Metric{
357 SQLs: metrics.SQLs{0: "SELECT 1 as value, 100::bigint as epoch_ns"},
358 }
359 metricDefs.MetricDefs["bad_metric"] = metrics.Metric{
360 SQLs: metrics.SQLs{0: "SELECT bad"},
361 }
362
363 mock, err := pgxmock.NewPool()
364 require.NoError(t, err)
365 defer mock.Close()
366
367 md := &sources.SourceConn{
368 Source: sources.Source{
369 Name: "degrade_test",
370 Kind: sources.SourcePostgres,
371 Metrics: metrics.MetricIntervals{"good_metric": 30, "bad_metric": 30},
372 },
373 Conn: mock,
374 RuntimeInfo: sources.RuntimeInfo{
375 Version: 120000,
376 ChangeState: make(map[string]map[string]string),
377 },
378 }
379 r := &Reaper{
380 Options: &cmdopts.Options{
381 Metrics: metrics.CmdOpts{},
382 Sinks: sinks.CmdOpts{},
383 },
384 measurementCh: make(chan metrics.MeasurementEnvelope, 10),
385 measurementCache: NewInstanceMetricCache(),
386 }
387 sr := NewSourceReaper(r, md)
388
389 entries := []batchEntry{
390 {name: "good_metric", metric: metricDefs.MetricDefs["good_metric"], sql: "SELECT 1 as value, 100::bigint as epoch_ns"},
391 {name: "bad_metric", metric: metricDefs.MetricDefs["bad_metric"], sql: "SELECT bad"},
392 }
393
394
395 rows1 := pgxmock.NewRows([]string{"epoch_ns", "value"}).AddRow(time.Now().UnixNano(), int64(1))
396 eb := mock.ExpectBatch()
397 eb.ExpectQuery("SELECT 1").WillReturnRows(rows1)
398 eb.ExpectQuery("SELECT bad").WillReturnError(assert.AnError)
399
400 mock.ExpectQuery("SELECT bad").WithArgs(pgx.QueryExecModeSimpleProtocol).WillReturnError(assert.AnError)
401
402 err = sr.executeBatch(ctx, entries)
403 assert.Error(t, err)
404 assert.Contains(t, sr.degradedMetrics, "bad_metric", "bad_metric should be degraded after persistent failure")
405 assert.NotContains(t, sr.degradedMetrics, "good_metric", "good_metric should not be degraded")
406 assert.NoError(t, mock.ExpectationsWereMet())
407 }
408
409 func TestSourceReaper_ExecuteBatch_CascadeRecovery(t *testing.T) {
410
411 ctx := log.WithLogger(context.Background(), log.NewNoopLogger())
412
413 metricDefs.MetricDefs["cascade_victim"] = metrics.Metric{
414 SQLs: metrics.SQLs{0: "SELECT 3 as value, 300::bigint as epoch_ns"},
415 }
416 metricDefs.MetricDefs["cascade_trigger"] = metrics.Metric{
417 SQLs: metrics.SQLs{0: "SELECT fail"},
418 }
419
420 mock, err := pgxmock.NewPool()
421 require.NoError(t, err)
422 defer mock.Close()
423
424 md := &sources.SourceConn{
425 Source: sources.Source{
426 Name: "cascade_test",
427 Kind: sources.SourcePostgres,
428 Metrics: metrics.MetricIntervals{"cascade_trigger": 30, "cascade_victim": 30},
429 },
430 Conn: mock,
431 RuntimeInfo: sources.RuntimeInfo{
432 Version: 120000,
433 ChangeState: make(map[string]map[string]string),
434 },
435 }
436 r := &Reaper{
437 Options: &cmdopts.Options{
438 Metrics: metrics.CmdOpts{},
439 Sinks: sinks.CmdOpts{},
440 },
441 measurementCh: make(chan metrics.MeasurementEnvelope, 10),
442 measurementCache: NewInstanceMetricCache(),
443 }
444 sr := NewSourceReaper(r, md)
445
446 entries := []batchEntry{
447 {name: "cascade_trigger", metric: metricDefs.MetricDefs["cascade_trigger"], sql: "SELECT fail"},
448 {name: "cascade_victim", metric: metricDefs.MetricDefs["cascade_victim"], sql: "SELECT 3 as value, 300::bigint as epoch_ns"},
449 }
450
451
452
453 eb := mock.ExpectBatch()
454 eb.ExpectQuery("SELECT fail").WillReturnError(assert.AnError)
455 eb.ExpectQuery("SELECT 3").WillReturnError(assert.AnError)
456
457 mock.ExpectQuery("SELECT fail").WithArgs(pgx.QueryExecModeSimpleProtocol).WillReturnError(assert.AnError)
458 mock.ExpectQuery("SELECT 3").WithArgs(pgx.QueryExecModeSimpleProtocol).
459 WillReturnRows(pgxmock.NewRows([]string{"epoch_ns", "value"}).AddRow(time.Now().UnixNano(), int64(3)))
460
461 err = sr.executeBatch(ctx, entries)
462 assert.Error(t, err, "cascade_trigger error should propagate")
463 assert.Contains(t, sr.degradedMetrics, "cascade_trigger", "real-failure metric should be degraded")
464 assert.NotContains(t, sr.degradedMetrics, "cascade_victim", "cascade-only victim must not be degraded")
465 assert.NoError(t, mock.ExpectationsWereMet())
466 }
467
// TestSourceReaper_DegradedMetricRecovery starts Run with a metric that is
// already marked as degraded and checks that the mark is still present after a
// failing fetch and is gone after a later successful one.
//
// The test body runs inside a synctest bubble, so the time.Sleep below advances
// the bubble's fake clock rather than waiting in real time.
func TestSourceReaper_DegradedMetricRecovery(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		const (
			metricName     = "recovering_metric_real"
			metricInterval = 30
		)

		metricDefs.MetricDefs[metricName] = metrics.Metric{
			SQLs: metrics.SQLs{0: "SELECT 7 as value, 700::bigint as epoch_ns"},
		}

		mock, err := pgxmock.NewPool()
		require.NoError(t, err)
		defer mock.Close()

		md := &sources.SourceConn{
			Source: sources.Source{
				Name:    "recovery_src",
				Kind:    sources.SourcePostgres,
				Metrics: metrics.MetricIntervals{metricName: metricInterval},
			},
			Conn: mock,
			RuntimeInfo: sources.RuntimeInfo{
				Version:     120000,
				ChangeState: make(map[string]map[string]string),
			},
		}
		r := &Reaper{
			Options: &cmdopts.Options{
				Metrics: metrics.CmdOpts{},
				Sinks:   sinks.CmdOpts{},
			},
			measurementCh:    make(chan metrics.MeasurementEnvelope, 10),
			measurementCache: NewInstanceMetricCache(),
		}
		ctx := log.WithLogger(t.Context(), log.NewNoopLogger())
		sr := NewSourceReaper(r, md)
		// Pre-mark the metric as degraded before Run starts.
		sr.degradedMetrics[metricName] = struct{}{}

		// First expected pair of queries: both return an error.
		// NOTE(review): the pairing suggests Run issues a pgwatch_generated
		// query before each standalone fetch of a degraded metric — confirm
		// against Run.
		mock.ExpectQuery("select /\\* pgwatch_generated \\*/").WillReturnError(assert.AnError)
		mock.ExpectQuery("SELECT 7").WithArgs(pgx.QueryExecModeSimpleProtocol).WillReturnError(assert.AnError)

		// Second expected pair: the pgwatch_generated query still errors, but
		// the metric query now returns one row.
		mock.ExpectQuery("select /\\* pgwatch_generated \\*/").WillReturnError(assert.AnError)
		mock.ExpectQuery("SELECT 7").WithArgs(pgx.QueryExecModeSimpleProtocol).
			WillReturnRows(pgxmock.NewRows([]string{"epoch_ns", "value"}).AddRow(int64(700_000_000_000), int64(7)))

		go sr.Run(ctx)

		// Wait until the Run goroutine is durably blocked, so it is safe to
		// inspect its state from here.
		synctest.Wait()
		assert.Contains(t, sr.degradedMetrics, metricName, "should still be degraded after first failure")

		// Advance the fake clock just past one metric interval, then let the
		// Run goroutine settle again before checking the outcome.
		time.Sleep(time.Duration(metricInterval)*time.Second + time.Millisecond)
		synctest.Wait()
		assert.NotContains(t, sr.degradedMetrics, metricName, "should recover after successful fetchMetric")

		assert.NoError(t, mock.ExpectationsWereMet())
	})
}
536
537 func TestSourceReaper_NonPostgresSequential(t *testing.T) {
538 ctx := log.WithLogger(context.Background(), log.NewNoopLogger())
539
540 metricDefs.MetricDefs["seq_metric"] = metrics.Metric{
541 SQLs: metrics.SQLs{0: "SELECT seq_value"},
542 }
543
544 mock, err := pgxmock.NewPool()
545 require.NoError(t, err)
546 defer mock.Close()
547
548 md := &sources.SourceConn{
549 Source: sources.Source{
550 Name: "seq_test_src",
551 Kind: sources.SourcePostgres,
552 Metrics: metrics.MetricIntervals{"seq_metric": 30},
553 },
554 Conn: mock,
555 RuntimeInfo: sources.RuntimeInfo{
556 Version: 120000,
557 ChangeState: make(map[string]map[string]string),
558 },
559 }
560
561 r := &Reaper{
562 Options: &cmdopts.Options{
563 Metrics: metrics.CmdOpts{},
564 Sinks: sinks.CmdOpts{},
565 },
566 measurementCh: make(chan metrics.MeasurementEnvelope, 10),
567 measurementCache: NewInstanceMetricCache(),
568 }
569 sr := NewSourceReaper(r, md)
570
571 rows := pgxmock.NewRows([]string{"epoch_ns", "value"}).
572 AddRow(time.Now().UnixNano(), int64(42))
573 mock.ExpectQuery("SELECT seq_value").WithArgs(pgx.QueryExecModeSimpleProtocol).WillReturnRows(rows)
574
575 err = sr.fetchMetric(ctx, batchEntry{name: "seq_metric", metric: metricDefs.MetricDefs["seq_metric"], sql: "SELECT seq_value"})
576 assert.NoError(t, err)
577 assert.NoError(t, mock.ExpectationsWereMet())
578 }
579