1 package reaper
2
3 import (
4 "context"
5 "testing"
6 "time"
7
8 "github.com/cybertec-postgresql/pgwatch/v5/internal/cmdopts"
9 "github.com/cybertec-postgresql/pgwatch/v5/internal/db"
10 "github.com/cybertec-postgresql/pgwatch/v5/internal/log"
11 "github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
12 "github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
13 "github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
14 "github.com/cybertec-postgresql/pgwatch/v5/internal/testutil"
15 "github.com/stretchr/testify/assert"
16 "github.com/stretchr/testify/require"
17 )
18
19
20
21 func setupIntegrationDB(t *testing.T) (*sources.SourceConn, func()) {
22 t.Helper()
23 if testing.Short() {
24 t.Skip("skipping integration test in short mode")
25 }
26
27 pgContainer, tearDown, err := testutil.SetupPostgresContainer()
28 require.NoError(t, err, "failed to start postgres container")
29
30 connStr, err := pgContainer.ConnectionString(testutil.TestContext, "sslmode=disable")
31 require.NoError(t, err, "failed to get connection string")
32
33 pool, err := db.New(testutil.TestContext, connStr)
34 require.NoError(t, err, "failed to create connection pool")
35
36 md := sources.NewSourceConn(sources.Source{
37 Name: "integration_test",
38 Kind: sources.SourcePostgres,
39 })
40 md.Conn = pool
41 err = md.FetchRuntimeInfo(testutil.TestContext, true)
42 require.NoError(t, err, "failed to fetch runtime info")
43
44 return md, func() {
45 pool.Close()
46 tearDown()
47 }
48 }
49
50
51
52
53 func TestIntegration_ExecuteBatch(t *testing.T) {
54 md, tearDown := setupIntegrationDB(t)
55 defer tearDown()
56
57 ctx := log.WithLogger(context.Background(), log.NewNoopLogger())
58
59 metricDefs.MetricDefs["integ_version"] = metrics.Metric{
60 SQLs: metrics.SQLs{0: "SELECT version() AS pg_version"},
61 }
62 metricDefs.MetricDefs["integ_uptime"] = metrics.Metric{
63 SQLs: metrics.SQLs{0: "SELECT extract(epoch from now() - pg_postmaster_start_time())::int8 AS uptime_seconds"},
64 }
65 defer func() {
66 delete(metricDefs.MetricDefs, "integ_version")
67 delete(metricDefs.MetricDefs, "integ_uptime")
68 }()
69
70 md.Metrics = metrics.MetricIntervals{
71 "integ_version": 30,
72 "integ_uptime": 60,
73 }
74
75 r := &Reaper{
76 Options: &cmdopts.Options{
77 Metrics: metrics.CmdOpts{},
78 Sinks: sinks.CmdOpts{},
79 },
80 measurementCh: make(chan metrics.MeasurementEnvelope, 10),
81 measurementCache: NewInstanceMetricCache(),
82 }
83 sr := NewSourceReaper(r, md)
84
85 err := sr.executeBatch(ctx, []batchEntry{
86 {name: "integ_version", metric: metricDefs.MetricDefs["integ_version"], sql: "SELECT version() AS pg_version"},
87 {name: "integ_uptime", metric: metricDefs.MetricDefs["integ_uptime"], sql: "SELECT extract(epoch from now() - pg_postmaster_start_time())::int8 AS uptime_seconds"},
88 })
89 require.NoError(t, err)
90
91 received := make(map[string]metrics.MeasurementEnvelope)
92 for range 10 {
93 select {
94 case msg := <-r.measurementCh:
95 received[msg.MetricName] = msg
96 default:
97 }
98 }
99
100 assert.Contains(t, received, "integ_version")
101 assert.Contains(t, received, "integ_uptime")
102
103 if msg, ok := received["integ_version"]; ok {
104 assert.Equal(t, "integration_test", msg.DBName)
105 assert.NotEmpty(t, msg.Data)
106 assert.Contains(t, msg.Data[0]["pg_version"], "PostgreSQL")
107 }
108
109 if msg, ok := received["integ_uptime"]; ok {
110 assert.Equal(t, "integration_test", msg.DBName)
111 assert.NotEmpty(t, msg.Data)
112 assert.NotNil(t, msg.Data[0]["uptime_seconds"])
113 }
114 }
115
116
117
118
119 func TestIntegration_SourceReaper_RunCollectsMetrics(t *testing.T) {
120 md, tearDown := setupIntegrationDB(t)
121 defer tearDown()
122
123 metricDefs.MetricDefs["integ_run_version"] = metrics.Metric{
124 SQLs: metrics.SQLs{0: "SELECT version() AS pg_version"},
125 }
126 metricDefs.MetricDefs["integ_run_size"] = metrics.Metric{
127 SQLs: metrics.SQLs{0: "SELECT pg_database_size(current_database()) AS db_size_bytes"},
128 }
129 defer func() {
130 delete(metricDefs.MetricDefs, "integ_run_version")
131 delete(metricDefs.MetricDefs, "integ_run_size")
132 }()
133
134 md.Metrics = metrics.MetricIntervals{
135 "integ_run_version": 5,
136 "integ_run_size": 5,
137 }
138
139 r := &Reaper{
140 Options: &cmdopts.Options{
141 Metrics: metrics.CmdOpts{},
142 Sinks: sinks.CmdOpts{},
143 },
144 measurementCh: make(chan metrics.MeasurementEnvelope, 20),
145 measurementCache: NewInstanceMetricCache(),
146 }
147 sr := NewSourceReaper(r, md)
148
149 ctx, cancel := context.WithCancel(log.WithLogger(context.Background(), log.NewNoopLogger()))
150
151 done := make(chan struct{})
152 go func() {
153 sr.Run(ctx)
154 close(done)
155 }()
156
157 received := make(map[string]metrics.MeasurementEnvelope)
158 deadline := time.After(15 * time.Second)
159 for len(received) < 2 {
160 select {
161 case msg := <-r.measurementCh:
162 received[msg.MetricName] = msg
163 case <-deadline:
164 t.Fatal("timed out waiting for measurements")
165 }
166 }
167
168 cancel()
169 <-done
170
171 assert.Contains(t, received, "integ_run_version")
172 assert.Contains(t, received, "integ_run_size")
173
174 vMsg := received["integ_run_version"]
175 assert.Equal(t, "integration_test", vMsg.DBName)
176 assert.NotEmpty(t, vMsg.Data)
177 assert.Contains(t, vMsg.Data[0]["pg_version"], "PostgreSQL")
178
179 sMsg := received["integ_run_size"]
180 assert.Equal(t, "integration_test", sMsg.DBName)
181 assert.NotEmpty(t, sMsg.Data)
182 }
183
184 func TestIntegration_SourceReaper_RunExcludesMetricsByNodeStatus(t *testing.T) {
185 md, tearDown := setupIntegrationDB(t)
186 defer tearDown()
187
188 helperSetNodeStatus := func(status string) {
189 metricDefs.MetricDefs["test_metric"] = metrics.Metric{
190 SQLs: metrics.SQLs{0: "SELECT 1 AS value"},
191 NodeStatus: status,
192 }
193 metricDefs.MetricDefs["server_log_event_counts"] = metrics.Metric{
194 SQLs: metrics.SQLs{0: "SELECT 1 AS value"},
195 NodeStatus: status,
196 }
197 metricDefs.MetricDefs["psutil_cpu"] = metrics.Metric{
198 SQLs: metrics.SQLs{0: "SELECT 1 AS value"},
199 NodeStatus: status,
200 }
201 metricDefs.MetricDefs[specialMetricInstanceUp] = metrics.Metric{
202 SQLs: metrics.SQLs{0: "SELECT 1 AS value"},
203 NodeStatus: status,
204 }
205 }
206
207 r := &Reaper{
208 Options: &cmdopts.Options{
209 Metrics: metrics.CmdOpts{},
210 Sinks: sinks.CmdOpts{},
211 },
212 measurementCh: make(chan metrics.MeasurementEnvelope, 10),
213 measurementCache: NewInstanceMetricCache(),
214 }
215
216
217
218 md.Metrics = metrics.MetricIntervals{
219 "test_metric": 5,
220 "server_log_event_counts": 5,
221 "psutil_cpu": 5,
222 specialMetricInstanceUp: 5,
223 }
224
225 t.Run("primary-only/standby-only metrics get excluded when node is standby/primary", func(t *testing.T) {
226 states := []string{"primary", "standby"}
227 for _, state := range states {
228 ctx, cancel := context.WithCancel(log.WithLogger(context.Background(), log.NewNoopLogger()))
229
230 md.IsInRecovery = true
231 if state == "standby" {
232 md.IsInRecovery = false
233 }
234
235 helperSetNodeStatus(state)
236
237 sr := NewSourceReaper(r, md)
238 go func() {
239 sr.Run(ctx)
240 }()
241
242 select {
243 case msg := <-r.measurementCh:
244 t.Errorf("Expected no measurement for primary-only metrics on standby, but got: %s", msg.MetricName)
245 case <-time.After(2 * time.Second):
246 }
247
248 cancel()
249 }
250 })
251
252 t.Run("primary-only/standby-only metrics get executed when node is primary/standby", func(t *testing.T) {
253 states := []string{"primary", "standby", ""}
254 for _, state := range states {
255 ctx, cancel := context.WithCancel(log.WithLogger(context.Background(), log.NewNoopLogger()))
256
257 md.IsInRecovery = false
258 if state == "standby" {
259 md.IsInRecovery = true
260 }
261
262 helperSetNodeStatus(state)
263
264 sr := NewSourceReaper(r, md)
265 go func() {
266 sr.Run(ctx)
267 }()
268
269 time.Sleep(2 * time.Second)
270 assert.GreaterOrEqual(t, len(r.measurementCh), 3)
271 cancel()
272
273 for range len(r.measurementCh) {
274
275 select {
276 case <-r.measurementCh:
277 default:
278 }
279 }
280 }
281 })
282 }
283