...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/reaper/source_reaper_integration_test.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"context"
     5  	"testing"
     6  	"time"
     7  
     8  	"github.com/cybertec-postgresql/pgwatch/v5/internal/cmdopts"
     9  	"github.com/cybertec-postgresql/pgwatch/v5/internal/db"
    10  	"github.com/cybertec-postgresql/pgwatch/v5/internal/log"
    11  	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
    12  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
    13  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
    14  	"github.com/cybertec-postgresql/pgwatch/v5/internal/testutil"
    15  	"github.com/stretchr/testify/assert"
    16  	"github.com/stretchr/testify/require"
    17  )
    18  
    19  // setupIntegrationDB starts a real Postgres container and returns a SourceConn
    20  // with a live pgxpool connection. The caller must call tearDown when done.
    21  func setupIntegrationDB(t *testing.T) (*sources.SourceConn, func()) {
    22  	t.Helper()
    23  	if testing.Short() {
    24  		t.Skip("skipping integration test in short mode")
    25  	}
    26  
    27  	pgContainer, tearDown, err := testutil.SetupPostgresContainer()
    28  	require.NoError(t, err, "failed to start postgres container")
    29  
    30  	connStr, err := pgContainer.ConnectionString(testutil.TestContext, "sslmode=disable")
    31  	require.NoError(t, err, "failed to get connection string")
    32  
    33  	pool, err := db.New(testutil.TestContext, connStr)
    34  	require.NoError(t, err, "failed to create connection pool")
    35  
    36  	md := sources.NewSourceConn(sources.Source{
    37  		Name: "integration_test",
    38  		Kind: sources.SourcePostgres,
    39  	})
    40  	md.Conn = pool
    41  	err = md.FetchRuntimeInfo(testutil.TestContext, true)
    42  	require.NoError(t, err, "failed to fetch runtime info")
    43  
    44  	return md, func() {
    45  		pool.Close()
    46  		tearDown()
    47  	}
    48  }
    49  
    50  // TestIntegration_ExecuteBatch verifies the full executeBatch path against a real
    51  // Postgres instance: builds a pgx.Batch from metric definitions, sends it, and
    52  // receives MeasurementEnvelopes on the measurement channel.
    53  func TestIntegration_ExecuteBatch(t *testing.T) {
    54  	md, tearDown := setupIntegrationDB(t)
    55  	defer tearDown()
    56  
    57  	ctx := log.WithLogger(context.Background(), log.NewNoopLogger())
    58  
    59  	metricDefs.MetricDefs["integ_version"] = metrics.Metric{
    60  		SQLs: metrics.SQLs{0: "SELECT version() AS pg_version"},
    61  	}
    62  	metricDefs.MetricDefs["integ_uptime"] = metrics.Metric{
    63  		SQLs: metrics.SQLs{0: "SELECT extract(epoch from now() - pg_postmaster_start_time())::int8 AS uptime_seconds"},
    64  	}
    65  	defer func() {
    66  		delete(metricDefs.MetricDefs, "integ_version")
    67  		delete(metricDefs.MetricDefs, "integ_uptime")
    68  	}()
    69  
    70  	md.Metrics = metrics.MetricIntervals{
    71  		"integ_version": 30,
    72  		"integ_uptime":  60,
    73  	}
    74  
    75  	r := &Reaper{
    76  		Options: &cmdopts.Options{
    77  			Metrics: metrics.CmdOpts{},
    78  			Sinks:   sinks.CmdOpts{},
    79  		},
    80  		measurementCh:    make(chan metrics.MeasurementEnvelope, 10),
    81  		measurementCache: NewInstanceMetricCache(),
    82  	}
    83  	sr := NewSourceReaper(r, md)
    84  
    85  	err := sr.executeBatch(ctx, []batchEntry{
    86  		{name: "integ_version", metric: metricDefs.MetricDefs["integ_version"], sql: "SELECT version() AS pg_version"},
    87  		{name: "integ_uptime", metric: metricDefs.MetricDefs["integ_uptime"], sql: "SELECT extract(epoch from now() - pg_postmaster_start_time())::int8 AS uptime_seconds"},
    88  	})
    89  	require.NoError(t, err)
    90  
    91  	received := make(map[string]metrics.MeasurementEnvelope)
    92  	for range 10 {
    93  		select {
    94  		case msg := <-r.measurementCh:
    95  			received[msg.MetricName] = msg
    96  		default:
    97  		}
    98  	}
    99  
   100  	assert.Contains(t, received, "integ_version")
   101  	assert.Contains(t, received, "integ_uptime")
   102  
   103  	if msg, ok := received["integ_version"]; ok {
   104  		assert.Equal(t, "integration_test", msg.DBName)
   105  		assert.NotEmpty(t, msg.Data)
   106  		assert.Contains(t, msg.Data[0]["pg_version"], "PostgreSQL")
   107  	}
   108  
   109  	if msg, ok := received["integ_uptime"]; ok {
   110  		assert.Equal(t, "integration_test", msg.DBName)
   111  		assert.NotEmpty(t, msg.Data)
   112  		assert.NotNil(t, msg.Data[0]["uptime_seconds"])
   113  	}
   114  }
   115  
   116  // TestIntegration_SourceReaper_RunCollectsMetrics verifies the full Run loop:
   117  // creates a SourceReaper with two SQL metrics, lets it run one tick against
   118  // a real Postgres container, and verifies that both metric envelopes arrive.
   119  func TestIntegration_SourceReaper_RunCollectsMetrics(t *testing.T) {
   120  	md, tearDown := setupIntegrationDB(t)
   121  	defer tearDown()
   122  
   123  	metricDefs.MetricDefs["integ_run_version"] = metrics.Metric{
   124  		SQLs: metrics.SQLs{0: "SELECT version() AS pg_version"},
   125  	}
   126  	metricDefs.MetricDefs["integ_run_size"] = metrics.Metric{
   127  		SQLs: metrics.SQLs{0: "SELECT pg_database_size(current_database()) AS db_size_bytes"},
   128  	}
   129  	defer func() {
   130  		delete(metricDefs.MetricDefs, "integ_run_version")
   131  		delete(metricDefs.MetricDefs, "integ_run_size")
   132  	}()
   133  
   134  	md.Metrics = metrics.MetricIntervals{
   135  		"integ_run_version": 5,
   136  		"integ_run_size":    5,
   137  	}
   138  
   139  	r := &Reaper{
   140  		Options: &cmdopts.Options{
   141  			Metrics: metrics.CmdOpts{},
   142  			Sinks:   sinks.CmdOpts{},
   143  		},
   144  		measurementCh:    make(chan metrics.MeasurementEnvelope, 20),
   145  		measurementCache: NewInstanceMetricCache(),
   146  	}
   147  	sr := NewSourceReaper(r, md)
   148  
   149  	ctx, cancel := context.WithCancel(log.WithLogger(context.Background(), log.NewNoopLogger()))
   150  
   151  	done := make(chan struct{})
   152  	go func() {
   153  		sr.Run(ctx)
   154  		close(done)
   155  	}()
   156  
   157  	received := make(map[string]metrics.MeasurementEnvelope)
   158  	deadline := time.After(15 * time.Second)
   159  	for len(received) < 2 {
   160  		select {
   161  		case msg := <-r.measurementCh:
   162  			received[msg.MetricName] = msg
   163  		case <-deadline:
   164  			t.Fatal("timed out waiting for measurements")
   165  		}
   166  	}
   167  
   168  	cancel()
   169  	<-done
   170  
   171  	assert.Contains(t, received, "integ_run_version")
   172  	assert.Contains(t, received, "integ_run_size")
   173  
   174  	vMsg := received["integ_run_version"]
   175  	assert.Equal(t, "integration_test", vMsg.DBName)
   176  	assert.NotEmpty(t, vMsg.Data)
   177  	assert.Contains(t, vMsg.Data[0]["pg_version"], "PostgreSQL")
   178  
   179  	sMsg := received["integ_run_size"]
   180  	assert.Equal(t, "integration_test", sMsg.DBName)
   181  	assert.NotEmpty(t, sMsg.Data)
   182  }
   183  
   184  func TestIntegration_SourceReaper_RunExcludesMetricsByNodeStatus(t *testing.T) {
   185  	md, tearDown := setupIntegrationDB(t)
   186  	defer tearDown()
   187  
   188  	helperSetNodeStatus := func(status string) {
   189  		metricDefs.MetricDefs["test_metric"] = metrics.Metric{
   190  			SQLs: metrics.SQLs{0: "SELECT 1 AS value"},
   191  			NodeStatus: status,
   192  		}
   193  		metricDefs.MetricDefs["server_log_event_counts"] = metrics.Metric{
   194  			SQLs: metrics.SQLs{0: "SELECT 1 AS value"},
   195  			NodeStatus: status,
   196  		}
   197  		metricDefs.MetricDefs["psutil_cpu"] = metrics.Metric{
   198  			SQLs: metrics.SQLs{0: "SELECT 1 AS value"},
   199  			NodeStatus: status,
   200  		}
   201  		metricDefs.MetricDefs[specialMetricInstanceUp] = metrics.Metric{
   202  			SQLs: metrics.SQLs{0: "SELECT 1 AS value"},
   203  			NodeStatus: status,
   204  		}
   205  	}
   206  	
   207  	r := &Reaper{
   208  		Options: &cmdopts.Options{
   209  			Metrics: metrics.CmdOpts{},
   210  			Sinks:   sinks.CmdOpts{},
   211  		},
   212  		measurementCh:    make(chan metrics.MeasurementEnvelope, 10),
   213  		measurementCache: NewInstanceMetricCache(),
   214  	}
   215  
   216  	// using psutil_*, server_log_event_counts, instance_up
   217  	// to ensure specially-handled metrics have the same behaviour
   218  	md.Metrics = metrics.MetricIntervals{
   219  		"test_metric": 5,
   220  		"server_log_event_counts": 5,
   221  		"psutil_cpu": 5,
   222  		specialMetricInstanceUp: 5,
   223  	}
   224  
   225  	t.Run("primary-only/standby-only metrics get excluded when node is standby/primary", func(t *testing.T) {
   226  		states := []string{"primary", "standby"}
   227  		for _, state := range states {
   228  			ctx, cancel := context.WithCancel(log.WithLogger(context.Background(), log.NewNoopLogger()))
   229  
   230  			md.IsInRecovery = true
   231  			if state == "standby" {
   232  				md.IsInRecovery = false
   233  			}
   234  
   235  			helperSetNodeStatus(state)
   236  
   237  			sr := NewSourceReaper(r, md)
   238  			go func() {
   239  				sr.Run(ctx)
   240  			}()
   241  
   242  			select {
   243  			case msg := <-r.measurementCh:
   244  				t.Errorf("Expected no measurement for primary-only metrics on standby, but got: %s", msg.MetricName)
   245  			case <-time.After(2 * time.Second):
   246  			}
   247  
   248  			cancel()
   249  		}
   250  	})
   251  
   252  	t.Run("primary-only/standby-only metrics get executed when node is primary/standby", func(t *testing.T) {
   253  		states := []string{"primary", "standby", ""} // "" => should fetch all as well
   254  		for _, state := range states {
   255  			ctx, cancel := context.WithCancel(log.WithLogger(context.Background(), log.NewNoopLogger()))
   256  
   257  			md.IsInRecovery = false
   258  			if state == "standby" {
   259  				md.IsInRecovery = true
   260  			}
   261  
   262  			helperSetNodeStatus(state)
   263  
   264  			sr := NewSourceReaper(r, md)
   265  			go func() {
   266  				sr.Run(ctx)
   267  			}()
   268  
   269  			time.Sleep(2 * time.Second)
   270  			assert.GreaterOrEqual(t, len(r.measurementCh), 3)
   271  			cancel()
   272  
   273  			for range len(r.measurementCh) {
   274  				// empty channel to ensure correctness in subsequent runs
   275  				select {
   276  				case <-r.measurementCh:
   277  				default:
   278  				}
   279  			}
   280  		}
   281  	})
   282  }
   283