...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/reaper/source_reaper_test.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"context"
     5  	"testing"
     6  	"testing/synctest"
     7  	"time"
     8  
     9  	"github.com/cybertec-postgresql/pgwatch/v5/internal/cmdopts"
    10  	"github.com/cybertec-postgresql/pgwatch/v5/internal/log"
    11  	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
    12  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
    13  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
    14  	"github.com/jackc/pgx/v5"
    15  	pgxmock "github.com/pashagolub/pgxmock/v4"
    16  	"github.com/stretchr/testify/assert"
    17  	"github.com/stretchr/testify/require"
    18  )
    19  
    20  func TestGCDSlice(t *testing.T) {
    21  	tests := []struct {
    22  		name string
    23  		vals []int
    24  		want int
    25  	}{
    26  		{"empty", nil, 0},
    27  		{"single", []int{30}, 30},
    28  		{"exhaustive preset intervals", []int{30, 60, 120, 180, 300, 600, 900, 3600, 7200}, 30},
    29  		{"coprime", []int{7, 11, 13}, 1},
    30  		{"all same", []int{60, 60, 60}, 60},
    31  		{"basic preset", []int{60, 120}, 60},
    32  	}
    33  	for _, tc := range tests {
    34  		t.Run(tc.name, func(t *testing.T) {
    35  			assert.Equal(t, tc.want, GCDSlice(tc.vals))
    36  		})
    37  	}
    38  }
    39  
    40  func TestCalcTickInterval(t *testing.T) {
    41  	t.Run("exhaustive preset GCD is 30s", func(t *testing.T) {
    42  		sr := &SourceReaper{
    43  			md: &sources.SourceConn{
    44  				Source: sources.Source{
    45  					Metrics: metrics.MetricIntervals{"m1": 30, "m2": 60, "m3": 120, "m4": 300},
    46  				},
    47  			},
    48  		}
    49  		assert.Equal(t, 30*time.Second, sr.calcTickInterval())
    50  	})
    51  
    52  	t.Run("GCD floors to minimum 1s", func(t *testing.T) {
    53  		sr := &SourceReaper{
    54  			md: &sources.SourceConn{
    55  				Source: sources.Source{
    56  					Metrics: metrics.MetricIntervals{"m1": 3, "m2": 7},
    57  				},
    58  			},
    59  		}
    60  		assert.Equal(t, time.Second, sr.calcTickInterval())
    61  	})
    62  
    63  	t.Run("single metric", func(t *testing.T) {
    64  		sr := &SourceReaper{
    65  			md: &sources.SourceConn{
    66  				Source: sources.Source{
    67  					Metrics: metrics.MetricIntervals{"m1": 60},
    68  				},
    69  			},
    70  		}
    71  		assert.Equal(t, 60*time.Second, sr.calcTickInterval())
    72  	})
    73  
    74  	t.Run("empty metrics", func(t *testing.T) {
    75  		sr := &SourceReaper{
    76  			md: &sources.SourceConn{
    77  				Source: sources.Source{
    78  					Metrics: metrics.MetricIntervals{},
    79  				},
    80  			},
    81  		}
    82  		assert.Equal(t, time.Second, sr.calcTickInterval())
    83  	})
    84  
    85  	t.Run("standby metrics when in recovery", func(t *testing.T) {
    86  		sr := &SourceReaper{
    87  			md: &sources.SourceConn{
    88  				Source: sources.Source{
    89  					Metrics:        metrics.MetricIntervals{"m1": 30, "m2": 60},
    90  					MetricsStandby: metrics.MetricIntervals{"m1": 120},
    91  				},
    92  				RuntimeInfo: sources.RuntimeInfo{IsInRecovery: true},
    93  			},
    94  		}
    95  		assert.Equal(t, 120*time.Second, sr.calcTickInterval())
    96  	})
    97  }
    98  
    99  func TestNewSourceReaper(t *testing.T) {
   100  	r := &Reaper{
   101  		measurementCh:    make(chan metrics.MeasurementEnvelope, 10),
   102  		measurementCache: NewInstanceMetricCache(),
   103  	}
   104  	md := &sources.SourceConn{
   105  		Source: sources.Source{
   106  			Name:    "testdb",
   107  			Kind:    sources.SourcePostgres,
   108  			Metrics: metrics.MetricIntervals{"cpu": 30, "mem": 60, "disk": 120},
   109  		},
   110  	}
   111  	sr := NewSourceReaper(r, md)
   112  
   113  	assert.NotNil(t, sr.lastFetch)
   114  	assert.Empty(t, sr.lastFetch)
   115  	assert.Equal(t, r, sr.reaper)
   116  	assert.Equal(t, md, sr.md)
   117  }
   118  
   119  func TestSourceReaper_ExecuteBatch(t *testing.T) {
   120  	ctx := log.WithLogger(context.Background(), log.NewNoopLogger())
   121  
   122  	metricDefs.MetricDefs["batch_metric_1"] = metrics.Metric{
   123  		SQLs: metrics.SQLs{0: "SELECT 1 as value, 100::bigint as epoch_ns"},
   124  	}
   125  	metricDefs.MetricDefs["batch_metric_2"] = metrics.Metric{
   126  		SQLs: metrics.SQLs{0: "SELECT 2 as value, 200::bigint as epoch_ns"},
   127  	}
   128  
   129  	mock, err := pgxmock.NewPool()
   130  	require.NoError(t, err)
   131  	defer mock.Close()
   132  
   133  	md := &sources.SourceConn{
   134  		Source: sources.Source{
   135  			Name:    "test_source",
   136  			Kind:    sources.SourcePostgres,
   137  			Metrics: metrics.MetricIntervals{"batch_metric_1": 30, "batch_metric_2": 30},
   138  		},
   139  		Conn: mock,
   140  		RuntimeInfo: sources.RuntimeInfo{
   141  			Version:     120000,
   142  			ChangeState: make(map[string]map[string]string),
   143  		},
   144  	}
   145  
   146  	r := &Reaper{
   147  		Options: &cmdopts.Options{
   148  			Metrics: metrics.CmdOpts{},
   149  			Sinks:   sinks.CmdOpts{},
   150  		},
   151  		measurementCh:    make(chan metrics.MeasurementEnvelope, 10),
   152  		measurementCache: NewInstanceMetricCache(),
   153  	}
   154  	sr := NewSourceReaper(r, md)
   155  
   156  	rows1 := pgxmock.NewRows([]string{"epoch_ns", "value"}).
   157  		AddRow(time.Now().UnixNano(), int64(100))
   158  	rows2 := pgxmock.NewRows([]string{"epoch_ns", "value"}).
   159  		AddRow(time.Now().UnixNano(), int64(200))
   160  	eb := mock.ExpectBatch()
   161  	eb.ExpectQuery("SELECT 1").WillReturnRows(rows1)
   162  	eb.ExpectQuery("SELECT 2").WillReturnRows(rows2)
   163  
   164  	err = sr.executeBatch(ctx, []batchEntry{
   165  		{name: "batch_metric_1", metric: metricDefs.MetricDefs["batch_metric_1"], sql: "SELECT 1 as value, 100::bigint as epoch_ns"},
   166  		{name: "batch_metric_2", metric: metricDefs.MetricDefs["batch_metric_2"], sql: "SELECT 2 as value, 200::bigint as epoch_ns"},
   167  	})
   168  	assert.NoError(t, err)
   169  
   170  	received := 0
   171  	for {
   172  		select {
   173  		case msg := <-r.measurementCh:
   174  			assert.Equal(t, "test_source", msg.DBName)
   175  			assert.True(t, msg.MetricName == "batch_metric_1" || msg.MetricName == "batch_metric_2")
   176  			received++
   177  		default:
   178  			goto done
   179  		}
   180  	}
   181  done:
   182  	assert.Equal(t, 2, received, "should have received 2 measurement envelopes")
   183  	assert.NoError(t, mock.ExpectationsWereMet())
   184  }
   185  
   186  func TestSourceReaper_RunOneIteration(t *testing.T) {
   187  	ctx, cancel := context.WithCancel(log.WithLogger(context.Background(), log.NewNoopLogger()))
   188  
   189  	metricDefs.MetricDefs["run_test_metric"] = metrics.Metric{
   190  		SQLs: metrics.SQLs{0: "SELECT run_test"},
   191  	}
   192  
   193  	mock, err := pgxmock.NewPool()
   194  	require.NoError(t, err)
   195  	defer mock.Close()
   196  
   197  	md := &sources.SourceConn{
   198  		Source: sources.Source{
   199  			Name:    "run_source",
   200  			Kind:    sources.SourcePostgres,
   201  			Metrics: metrics.MetricIntervals{"run_test_metric": 5},
   202  		},
   203  		Conn: mock,
   204  		RuntimeInfo: sources.RuntimeInfo{
   205  			Version:     120000,
   206  			ChangeState: make(map[string]map[string]string),
   207  		},
   208  	}
   209  
   210  	r := &Reaper{
   211  		Options: &cmdopts.Options{
   212  			Metrics: metrics.CmdOpts{},
   213  			Sinks:   sinks.CmdOpts{},
   214  		},
   215  		measurementCh:    make(chan metrics.MeasurementEnvelope, 10),
   216  		measurementCache: NewInstanceMetricCache(),
   217  	}
   218  	sr := NewSourceReaper(r, md)
   219  
   220  	// FetchRuntimeInfo sends a query
   221  	mock.ExpectQuery("select /\\* pgwatch_generated \\*/").
   222  		WillReturnError(assert.AnError)
   223  
   224  	rows := pgxmock.NewRows([]string{"epoch_ns", "value"}).
   225  		AddRow(time.Now().UnixNano(), int64(42))
   226  	eb := mock.ExpectBatch()
   227  	eb.ExpectQuery("SELECT run_test").WillReturnRows(rows)
   228  
   229  	go func() {
   230  		time.Sleep(200 * time.Millisecond)
   231  		cancel()
   232  	}()
   233  
   234  	sr.Run(ctx)
   235  
   236  	select {
   237  	case msg := <-r.measurementCh:
   238  		assert.Equal(t, "run_source", msg.DBName)
   239  		assert.Equal(t, "run_test_metric", msg.MetricName)
   240  	case <-time.After(time.Second):
   241  		t.Error("Expected measurement but timed out")
   242  	}
   243  }
   244  
   245  func TestSourceReaper_DetectServerRestart(t *testing.T) {
   246  	sr := &SourceReaper{
   247  		reaper: &Reaper{
   248  			measurementCh: make(chan metrics.MeasurementEnvelope, 10),
   249  		},
   250  		md: &sources.SourceConn{
   251  			Source: sources.Source{Name: "restart_test"},
   252  		},
   253  	}
   254  
   255  	// First observation — establish baseline
   256  	data := metrics.Measurements{
   257  		{"epoch_ns": time.Now().UnixNano(), "postmaster_uptime_s": int64(1000)},
   258  	}
   259  	sr.detectServerRestart(t.Context(), data)
   260  	assert.Equal(t, int64(1000), sr.lastUptimeS)
   261  	select {
   262  	case <-sr.reaper.measurementCh:
   263  		t.Error("should not emit restart event on first observation")
   264  	default:
   265  	}
   266  
   267  	// Second observation — uptime increased (normal)
   268  	data = metrics.Measurements{
   269  		{"epoch_ns": time.Now().UnixNano(), "postmaster_uptime_s": int64(2000)},
   270  	}
   271  	sr.detectServerRestart(t.Context(), data)
   272  	assert.Equal(t, int64(2000), sr.lastUptimeS)
   273  	select {
   274  	case <-sr.reaper.measurementCh:
   275  		t.Error("should not emit restart event when uptime increases")
   276  	default:
   277  	}
   278  
   279  	// Third observation — uptime decreased (restart!)
   280  	data = metrics.Measurements{
   281  		{"epoch_ns": time.Now().UnixNano(), "postmaster_uptime_s": int64(10)},
   282  	}
   283  	sr.detectServerRestart(t.Context(), data)
   284  	assert.Equal(t, int64(10), sr.lastUptimeS)
   285  	select {
   286  	case msg := <-sr.reaper.measurementCh:
   287  		assert.Equal(t, "object_changes", msg.MetricName)
   288  		assert.Contains(t, msg.Data[0]["details"], "restart")
   289  	default:
   290  		t.Error("expected restart event")
   291  	}
   292  }
   293  
   294  func TestSourceReaper_FetchSpecialMetric(t *testing.T) {
   295  	ctx := log.WithLogger(context.Background(), log.NewNoopLogger())
   296  
   297  	newSR := func(t *testing.T) (*SourceReaper, *sources.SourceConn, pgxmock.PgxPoolIface) {
   298  		t.Helper()
   299  		md, mock := createTestSourceConn(t)
   300  		r := &Reaper{
   301  			Options: &cmdopts.Options{
   302  				Metrics: metrics.CmdOpts{},
   303  				Sinks:   sinks.CmdOpts{},
   304  			},
   305  			measurementCh:    make(chan metrics.MeasurementEnvelope, 10),
   306  			measurementCache: NewInstanceMetricCache(),
   307  		}
   308  		return NewSourceReaper(r, md), md, mock
   309  	}
   310  
   311  	sr, _, mock := newSR(t)
   312  	defer mock.Close()
   313  
   314  	t.Run("instance_up dispatches measurement on ping success", func(t *testing.T) {
   315  		mock.ExpectPing()
   316  		assert.NoError(t, sr.fetchSpecialMetric(ctx, specialMetricInstanceUp, ""))
   317  		select {
   318  		case msg := <-sr.reaper.measurementCh:
   319  			assert.Equal(t, specialMetricInstanceUp, msg.MetricName)
   320  			assert.Len(t, msg.Data, 1)
   321  			assert.Equal(t, 1, msg.Data[0][specialMetricInstanceUp])
   322  		default:
   323  			t.Error("expected measurement for instance_up")
   324  		}
   325  		assert.NoError(t, mock.ExpectationsWereMet())
   326  	})
   327  
   328  	t.Run("instance_up uses storage name when set", func(t *testing.T) {
   329  		mock.ExpectPing()
   330  		assert.NoError(t, sr.fetchSpecialMetric(ctx, specialMetricInstanceUp, "infra_up"))
   331  		select {
   332  		case msg := <-sr.reaper.measurementCh:
   333  			assert.Equal(t, "infra_up", msg.MetricName)
   334  		default:
   335  			t.Error("expected measurement")
   336  		}
   337  		assert.NoError(t, mock.ExpectationsWereMet())
   338  	})
   339  
   340  	t.Run("change_events dispatches no measurement when no hash defs present", func(t *testing.T) {
   341  		// Doesn't contain additional defs for any of {"sproc_hashes", "table_hashes", "index_hashes", "configuration_hashes", "privilege_hashes"}
   342  		metricDefs.MetricDefs[specialMetricChangeEvents] = metrics.Metric{}
   343  		assert.NoError(t, sr.fetchSpecialMetric(ctx, specialMetricChangeEvents, ""))
   344  		select {
   345  		case <-sr.reaper.measurementCh:
   346  			t.Error("expected no measurement when no changes detected")
   347  		default:
   348  		}
   349  		assert.NoError(t, mock.ExpectationsWereMet())
   350  	})
   351  }
   352  
   353  func TestSourceReaper_ExecuteBatch_DegradedOnPersistentFailure(t *testing.T) {
   354  	ctx := log.WithLogger(context.Background(), log.NewNoopLogger())
   355  
   356  	metricDefs.MetricDefs["good_metric"] = metrics.Metric{
   357  		SQLs: metrics.SQLs{0: "SELECT 1 as value, 100::bigint as epoch_ns"},
   358  	}
   359  	metricDefs.MetricDefs["bad_metric"] = metrics.Metric{
   360  		SQLs: metrics.SQLs{0: "SELECT bad"},
   361  	}
   362  
   363  	mock, err := pgxmock.NewPool()
   364  	require.NoError(t, err)
   365  	defer mock.Close()
   366  
   367  	md := &sources.SourceConn{
   368  		Source: sources.Source{
   369  			Name:    "degrade_test",
   370  			Kind:    sources.SourcePostgres,
   371  			Metrics: metrics.MetricIntervals{"good_metric": 30, "bad_metric": 30},
   372  		},
   373  		Conn: mock,
   374  		RuntimeInfo: sources.RuntimeInfo{
   375  			Version:     120000,
   376  			ChangeState: make(map[string]map[string]string),
   377  		},
   378  	}
   379  	r := &Reaper{
   380  		Options: &cmdopts.Options{
   381  			Metrics: metrics.CmdOpts{},
   382  			Sinks:   sinks.CmdOpts{},
   383  		},
   384  		measurementCh:    make(chan metrics.MeasurementEnvelope, 10),
   385  		measurementCache: NewInstanceMetricCache(),
   386  	}
   387  	sr := NewSourceReaper(r, md)
   388  
   389  	entries := []batchEntry{
   390  		{name: "good_metric", metric: metricDefs.MetricDefs["good_metric"], sql: "SELECT 1 as value, 100::bigint as epoch_ns"},
   391  		{name: "bad_metric", metric: metricDefs.MetricDefs["bad_metric"], sql: "SELECT bad"},
   392  	}
   393  
   394  	// batch: good_metric succeeds, bad_metric cascades → retry bad_metric individually → still fails
   395  	rows1 := pgxmock.NewRows([]string{"epoch_ns", "value"}).AddRow(time.Now().UnixNano(), int64(1))
   396  	eb := mock.ExpectBatch()
   397  	eb.ExpectQuery("SELECT 1").WillReturnRows(rows1)
   398  	eb.ExpectQuery("SELECT bad").WillReturnError(assert.AnError) // cascade
   399  	// individual retry of bad_metric
   400  	mock.ExpectQuery("SELECT bad").WithArgs(pgx.QueryExecModeSimpleProtocol).WillReturnError(assert.AnError)
   401  
   402  	err = sr.executeBatch(ctx, entries)
   403  	assert.Error(t, err)
   404  	assert.Contains(t, sr.degradedMetrics, "bad_metric", "bad_metric should be degraded after persistent failure")
   405  	assert.NotContains(t, sr.degradedMetrics, "good_metric", "good_metric should not be degraded")
   406  	assert.NoError(t, mock.ExpectationsWereMet())
   407  }
   408  
   409  func TestSourceReaper_ExecuteBatch_CascadeRecovery(t *testing.T) {
   410  	// A metric that errors in the batch but succeeds on individual retry must NOT be marked degraded.
   411  	ctx := log.WithLogger(context.Background(), log.NewNoopLogger())
   412  
   413  	metricDefs.MetricDefs["cascade_victim"] = metrics.Metric{
   414  		SQLs: metrics.SQLs{0: "SELECT 3 as value, 300::bigint as epoch_ns"},
   415  	}
   416  	metricDefs.MetricDefs["cascade_trigger"] = metrics.Metric{
   417  		SQLs: metrics.SQLs{0: "SELECT fail"},
   418  	}
   419  
   420  	mock, err := pgxmock.NewPool()
   421  	require.NoError(t, err)
   422  	defer mock.Close()
   423  
   424  	md := &sources.SourceConn{
   425  		Source: sources.Source{
   426  			Name:    "cascade_test",
   427  			Kind:    sources.SourcePostgres,
   428  			Metrics: metrics.MetricIntervals{"cascade_trigger": 30, "cascade_victim": 30},
   429  		},
   430  		Conn: mock,
   431  		RuntimeInfo: sources.RuntimeInfo{
   432  			Version:     120000,
   433  			ChangeState: make(map[string]map[string]string),
   434  		},
   435  	}
   436  	r := &Reaper{
   437  		Options: &cmdopts.Options{
   438  			Metrics: metrics.CmdOpts{},
   439  			Sinks:   sinks.CmdOpts{},
   440  		},
   441  		measurementCh:    make(chan metrics.MeasurementEnvelope, 10),
   442  		measurementCache: NewInstanceMetricCache(),
   443  	}
   444  	sr := NewSourceReaper(r, md)
   445  
   446  	entries := []batchEntry{
   447  		{name: "cascade_trigger", metric: metricDefs.MetricDefs["cascade_trigger"], sql: "SELECT fail"},
   448  		{name: "cascade_victim", metric: metricDefs.MetricDefs["cascade_victim"], sql: "SELECT 3 as value, 300::bigint as epoch_ns"},
   449  	}
   450  
   451  	// batch: trigger fails, victim cascades → both retry individually
   452  	// trigger fails individually (real error), victim succeeds individually (was only a cascade)
   453  	eb := mock.ExpectBatch()
   454  	eb.ExpectQuery("SELECT fail").WillReturnError(assert.AnError)
   455  	eb.ExpectQuery("SELECT 3").WillReturnError(assert.AnError) // cascade in batch
   456  	// individual retries
   457  	mock.ExpectQuery("SELECT fail").WithArgs(pgx.QueryExecModeSimpleProtocol).WillReturnError(assert.AnError)
   458  	mock.ExpectQuery("SELECT 3").WithArgs(pgx.QueryExecModeSimpleProtocol).
   459  		WillReturnRows(pgxmock.NewRows([]string{"epoch_ns", "value"}).AddRow(time.Now().UnixNano(), int64(3)))
   460  
   461  	err = sr.executeBatch(ctx, entries)
   462  	assert.Error(t, err, "cascade_trigger error should propagate")
   463  	assert.Contains(t, sr.degradedMetrics, "cascade_trigger", "real-failure metric should be degraded")
   464  	assert.NotContains(t, sr.degradedMetrics, "cascade_victim", "cascade-only victim must not be degraded")
   465  	assert.NoError(t, mock.ExpectationsWereMet())
   466  }
   467  
   468  func TestSourceReaper_DegradedMetricRecovery(t *testing.T) {
   469  	// Uses the real Run loop (via synctest fake clock) to verify the full degraded→recovered
   470  	// lifecycle: iteration 1 the degraded metric fails individually (stays degraded),
   471  	// iteration 2 it succeeds (removed from degradedMetrics).
   472  	synctest.Test(t, func(t *testing.T) {
   473  		const (
   474  			metricName     = "recovering_metric_real"
   475  			metricInterval = 30
   476  		)
   477  
   478  		metricDefs.MetricDefs[metricName] = metrics.Metric{
   479  			SQLs: metrics.SQLs{0: "SELECT 7 as value, 700::bigint as epoch_ns"},
   480  		}
   481  
   482  		mock, err := pgxmock.NewPool()
   483  		require.NoError(t, err)
   484  		defer mock.Close()
   485  
   486  		md := &sources.SourceConn{
   487  			Source: sources.Source{
   488  				Name:    "recovery_src",
   489  				Kind:    sources.SourcePostgres,
   490  				Metrics: metrics.MetricIntervals{metricName: metricInterval},
   491  			},
   492  			Conn: mock,
   493  			RuntimeInfo: sources.RuntimeInfo{
   494  				Version:     120000,
   495  				ChangeState: make(map[string]map[string]string),
   496  			},
   497  		}
   498  		r := &Reaper{
   499  			Options: &cmdopts.Options{
   500  				Metrics: metrics.CmdOpts{},
   501  				Sinks:   sinks.CmdOpts{},
   502  			},
   503  			measurementCh:    make(chan metrics.MeasurementEnvelope, 10),
   504  			measurementCache: NewInstanceMetricCache(),
   505  		}
   506  		ctx := log.WithLogger(t.Context(), log.NewNoopLogger())
   507  		sr := NewSourceReaper(r, md)
   508  		sr.degradedMetrics[metricName] = struct{}{} // pre-seed: metric already degraded
   509  
   510  		// Iteration 1: FetchRuntimeInfo + degraded individual fetch → fails → stays degraded
   511  		mock.ExpectQuery("select /\\* pgwatch_generated \\*/").WillReturnError(assert.AnError)
   512  		mock.ExpectQuery("SELECT 7").WithArgs(pgx.QueryExecModeSimpleProtocol).WillReturnError(assert.AnError)
   513  
   514  		// Iteration 2: FetchRuntimeInfo + degraded individual fetch → succeeds → recovered
   515  		mock.ExpectQuery("select /\\* pgwatch_generated \\*/").WillReturnError(assert.AnError)
   516  		mock.ExpectQuery("SELECT 7").WithArgs(pgx.QueryExecModeSimpleProtocol).
   517  			WillReturnRows(pgxmock.NewRows([]string{"epoch_ns", "value"}).AddRow(int64(700_000_000_000), int64(7)))
   518  
   519  		go sr.Run(ctx)
   520  
   521  		// Run goroutine completes iteration 1 (pgxmock is in-memory, no real I/O) then
   522  		// blocks on time.After — the only durably-blocking operation in the loop.
   523  		synctest.Wait()
   524  		assert.Contains(t, sr.degradedMetrics, metricName, "should still be degraded after first failure")
   525  
   526  		// Advance the fake clock past the interval to trigger iteration 2.
   527  		// The Run goroutine's time.After(30s) fires first; it runs iteration 2 and
   528  		// blocks again before the test goroutine's sleep finishes.
   529  		time.Sleep(time.Duration(metricInterval)*time.Second + time.Millisecond)
   530  		synctest.Wait()
   531  		assert.NotContains(t, sr.degradedMetrics, metricName, "should recover after successful fetchMetric")
   532  
   533  		assert.NoError(t, mock.ExpectationsWereMet())
   534  	})
   535  }
   536  
   537  func TestSourceReaper_NonPostgresSequential(t *testing.T) {
   538  	ctx := log.WithLogger(context.Background(), log.NewNoopLogger())
   539  
   540  	metricDefs.MetricDefs["seq_metric"] = metrics.Metric{
   541  		SQLs: metrics.SQLs{0: "SELECT seq_value"},
   542  	}
   543  
   544  	mock, err := pgxmock.NewPool()
   545  	require.NoError(t, err)
   546  	defer mock.Close()
   547  
   548  	md := &sources.SourceConn{
   549  		Source: sources.Source{
   550  			Name:    "seq_test_src",
   551  			Kind:    sources.SourcePostgres,
   552  			Metrics: metrics.MetricIntervals{"seq_metric": 30},
   553  		},
   554  		Conn: mock,
   555  		RuntimeInfo: sources.RuntimeInfo{
   556  			Version:     120000,
   557  			ChangeState: make(map[string]map[string]string),
   558  		},
   559  	}
   560  
   561  	r := &Reaper{
   562  		Options: &cmdopts.Options{
   563  			Metrics: metrics.CmdOpts{},
   564  			Sinks:   sinks.CmdOpts{},
   565  		},
   566  		measurementCh:    make(chan metrics.MeasurementEnvelope, 10),
   567  		measurementCache: NewInstanceMetricCache(),
   568  	}
   569  	sr := NewSourceReaper(r, md)
   570  
   571  	rows := pgxmock.NewRows([]string{"epoch_ns", "value"}).
   572  		AddRow(time.Now().UnixNano(), int64(42))
   573  	mock.ExpectQuery("SELECT seq_value").WithArgs(pgx.QueryExecModeSimpleProtocol).WillReturnRows(rows)
   574  
   575  	err = sr.fetchMetric(ctx, batchEntry{name: "seq_metric", metric: metricDefs.MetricDefs["seq_metric"], sql: "SELECT seq_value"})
   576  	assert.NoError(t, err)
   577  	assert.NoError(t, mock.ExpectationsWereMet())
   578  }
   579