1 package reaper
2
3 import (
4 "context"
5 "testing"
6 "time"
7
8 "github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
9 "github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
10 pgxmock "github.com/pashagolub/pgxmock/v4"
11 "github.com/stretchr/testify/assert"
12 "github.com/stretchr/testify/require"
13 )
14
15
16 func createTestSourceConn(t *testing.T) (*sources.SourceConn, pgxmock.PgxPoolIface) {
17 mock, err := pgxmock.NewPool()
18 require.NoError(t, err)
19
20 md := &sources.SourceConn{
21 Conn: mock,
22 Source: sources.Source{Name: "testdb"},
23 RuntimeInfo: sources.RuntimeInfo{
24 Version: 120000,
25 ChangeState: make(map[string]map[string]string),
26 },
27 }
28 return md, mock
29 }
30
31 func TestTryCreateMetricsFetchingHelpers(t *testing.T) {
32 ctx := context.Background()
33 mock, err := pgxmock.NewPool()
34 assert.NoError(t, err)
35 defer mock.Close()
36
37 metricDefs.MetricDefs["metric1"] = metrics.Metric{
38 InitSQL: "CREATE FUNCTION metric1",
39 }
40
41 md := &sources.SourceConn{
42 Conn: mock,
43 Source: sources.Source{
44 Name: "testdb",
45 Metrics: map[string]float64{"metric1": 42, "nonexistent": 0},
46 MetricsStandby: map[string]float64{"metric1": 42},
47 },
48 }
49
50 t.Run("success", func(t *testing.T) {
51 mock.ExpectExec("CREATE FUNCTION metric1").WillReturnResult(pgxmock.NewResult("CREATE", 1))
52
53 err = TryCreateMetricsFetchingHelpers(ctx, md)
54 assert.NoError(t, err)
55 assert.NoError(t, mock.ExpectationsWereMet())
56 })
57
58 t.Run("error on exec", func(t *testing.T) {
59 mock.ExpectExec("CREATE FUNCTION metric1").WillReturnError(assert.AnError)
60
61 err = TryCreateMetricsFetchingHelpers(ctx, md)
62 assert.Error(t, err)
63 assert.NoError(t, mock.ExpectationsWereMet())
64 })
65
66 }
67
68 func TestDetectSprocChanges(t *testing.T) {
69 ctx := context.Background()
70
71
72 metricDefs.MetricDefs["sproc_hashes"] = metrics.Metric{
73 SQLs: map[int]string{
74 120000: "SELECT",
75 },
76 }
77
78
79 md, mock := createTestSourceConn(t)
80 defer mock.Close()
81
82 reaper := &Reaper{
83 measurementCh: make(chan metrics.MeasurementEnvelope, 10),
84 }
85
86
87 initialRows := pgxmock.NewRows([]string{"tag_sproc", "tag_oid", "md5", "epoch_ns"}).
88 AddRow("func1", "123", "hash1", time.Now().UnixNano()).
89 AddRow("func2", "456", "hash2", time.Now().UnixNano())
90 mock.ExpectQuery("SELECT").WillReturnRows(initialRows)
91
92 result := reaper.DetectSprocChanges(ctx, md)
93 assert.Equal(t, 0, result.Created)
94 assert.Equal(t, 0, result.Altered)
95 assert.Equal(t, 0, result.Dropped)
96
97
98 assert.NotEmpty(t, md.ChangeState["sproc_hashes"])
99
100
101 modifiedRows := pgxmock.NewRows([]string{"tag_sproc", "tag_oid", "md5", "epoch_ns"}).
102 AddRow("func1", "123", "new_hash", time.Now().UnixNano()).
103 AddRow("func2", "456", "hash2", time.Now().UnixNano())
104 mock.ExpectQuery("SELECT").WillReturnRows(modifiedRows)
105
106 result = reaper.DetectSprocChanges(ctx, md)
107 assert.Equal(t, 0, result.Created)
108 assert.Equal(t, 1, result.Altered)
109 assert.Equal(t, 0, result.Dropped)
110
111
112 newSprocRows := pgxmock.NewRows([]string{"tag_sproc", "tag_oid", "md5", "epoch_ns"}).
113 AddRow("func1", "123", "new_hash", time.Now().UnixNano()).
114 AddRow("func2", "456", "hash2", time.Now().UnixNano()).
115 AddRow("func3", "789", "hash3", time.Now().UnixNano())
116 mock.ExpectQuery("SELECT").WillReturnRows(newSprocRows)
117
118 result = reaper.DetectSprocChanges(ctx, md)
119 assert.Equal(t, 1, result.Created)
120 assert.Equal(t, 0, result.Altered)
121 assert.Equal(t, 0, result.Dropped)
122
123
124 select {
125 case <-reaper.measurementCh:
126
127 default:
128 t.Error("Expected measurement to be sent")
129 }
130
131
132 droppedSprocRows := pgxmock.NewRows([]string{"tag_sproc", "tag_oid", "md5", "epoch_ns"}).
133 AddRow("func1", "123", "new_hash", time.Now().UnixNano()).
134 AddRow("func3", "789", "hash3", time.Now().UnixNano())
135 mock.ExpectQuery("SELECT").WillReturnRows(droppedSprocRows)
136
137 result = reaper.DetectSprocChanges(ctx, md)
138 assert.Equal(t, 0, result.Created)
139 assert.Equal(t, 0, result.Altered)
140 assert.Equal(t, 1, result.Dropped)
141
142 assert.NoError(t, mock.ExpectationsWereMet())
143 }
144
145 func TestDetectTableChanges(t *testing.T) {
146 ctx := context.Background()
147
148
149 metricDefs.MetricDefs["table_hashes"] = metrics.Metric{
150 SQLs: map[int]string{
151 120000: "SELECT",
152 },
153 }
154
155
156 md, mock := createTestSourceConn(t)
157 defer mock.Close()
158
159 reaper := &Reaper{
160 measurementCh: make(chan metrics.MeasurementEnvelope, 10),
161 }
162
163
164 initialRows := pgxmock.NewRows([]string{"tag_table", "tag_oid", "md5", "epoch_ns"}).
165 AddRow("table1", "123", "hash1", time.Now().UnixNano()).
166 AddRow("table2", "456", "hash2", time.Now().UnixNano())
167 mock.ExpectQuery("SELECT").WillReturnRows(initialRows)
168
169 result := reaper.DetectTableChanges(ctx, md)
170 assert.Equal(t, 0, result.Created)
171 assert.Equal(t, 0, result.Altered)
172 assert.Equal(t, 0, result.Dropped)
173 assert.NotEmpty(t, md.ChangeState["table_hashes"])
174
175
176 modifiedRows := pgxmock.NewRows([]string{"tag_table", "tag_oid", "md5", "epoch_ns"}).
177 AddRow("table1", "123", "new_hash", time.Now().UnixNano()).
178 AddRow("table2", "456", "hash2", time.Now().UnixNano())
179 mock.ExpectQuery("SELECT").WillReturnRows(modifiedRows)
180
181 result = reaper.DetectTableChanges(ctx, md)
182 assert.Equal(t, 0, result.Created)
183 assert.Equal(t, 1, result.Altered)
184 assert.Equal(t, 0, result.Dropped)
185
186
187 newTableRows := pgxmock.NewRows([]string{"tag_table", "tag_oid", "md5", "epoch_ns"}).
188 AddRow("table1", "123", "new_hash", time.Now().UnixNano()).
189 AddRow("table2", "456", "hash2", time.Now().UnixNano()).
190 AddRow("table3", "789", "hash3", time.Now().UnixNano())
191 mock.ExpectQuery("SELECT").WillReturnRows(newTableRows)
192
193 result = reaper.DetectTableChanges(ctx, md)
194 assert.Equal(t, 1, result.Created)
195 assert.Equal(t, 0, result.Altered)
196 assert.Equal(t, 0, result.Dropped)
197
198
199 select {
200 case msg := <-reaper.measurementCh:
201 assert.Equal(t, "table_changes", msg.MetricName)
202 assert.Equal(t, "testdb", msg.DBName)
203 default:
204 t.Error("Expected measurement to be sent")
205 }
206
207
208 droppedTableRows := pgxmock.NewRows([]string{"tag_table", "tag_oid", "md5", "epoch_ns"}).
209 AddRow("table1", "123", "new_hash", time.Now().UnixNano()).
210 AddRow("table3", "789", "hash3", time.Now().UnixNano())
211 mock.ExpectQuery("SELECT").WillReturnRows(droppedTableRows)
212
213 result = reaper.DetectTableChanges(ctx, md)
214 assert.Equal(t, 0, result.Created)
215 assert.Equal(t, 0, result.Altered)
216 assert.Equal(t, 1, result.Dropped)
217
218
219 _, exists := md.ChangeState["table_hashes"]["table2"]
220 assert.False(t, exists)
221
222 assert.NoError(t, mock.ExpectationsWereMet())
223 }
224
225 func TestDetectIndexChanges(t *testing.T) {
226 ctx := context.Background()
227
228
229 metricDefs.MetricDefs["index_hashes"] = metrics.Metric{
230 SQLs: map[int]string{
231 120000: "SELECT",
232 },
233 }
234
235
236 md, mock := createTestSourceConn(t)
237 defer mock.Close()
238
239 reaper := &Reaper{
240 measurementCh: make(chan metrics.MeasurementEnvelope, 10),
241 }
242
243
244 initialRows := pgxmock.NewRows([]string{"tag_index", "table", "md5", "is_valid", "epoch_ns"}).
245 AddRow("idx1", "table1", "hash1", "t", time.Now().UnixNano()).
246 AddRow("idx2", "table1", "hash2", "t", time.Now().UnixNano())
247 mock.ExpectQuery("SELECT").WillReturnRows(initialRows)
248
249 result := reaper.DetectIndexChanges(ctx, md)
250 assert.Equal(t, 0, result.Created)
251 assert.Equal(t, 0, result.Altered)
252 assert.Equal(t, 0, result.Dropped)
253 assert.NotEmpty(t, md.ChangeState["index_hashes"])
254
255
256 modifiedRows := pgxmock.NewRows([]string{"tag_index", "table", "md5", "is_valid", "epoch_ns"}).
257 AddRow("idx1", "table1", "hash1", "f", time.Now().UnixNano()).
258 AddRow("idx2", "table1", "hash2", "t", time.Now().UnixNano())
259 mock.ExpectQuery("SELECT").WillReturnRows(modifiedRows)
260
261 result = reaper.DetectIndexChanges(ctx, md)
262 assert.Equal(t, 0, result.Created)
263 assert.Equal(t, 1, result.Altered)
264 assert.Equal(t, 0, result.Dropped)
265
266
267 newIndexRows := pgxmock.NewRows([]string{"tag_index", "table", "md5", "is_valid", "epoch_ns"}).
268 AddRow("idx1", "table1", "hash1", "f", time.Now().UnixNano()).
269 AddRow("idx2", "table1", "hash2", "t", time.Now().UnixNano()).
270 AddRow("idx3", "table2", "hash3", "t", time.Now().UnixNano())
271 mock.ExpectQuery("SELECT").WillReturnRows(newIndexRows)
272
273 result = reaper.DetectIndexChanges(ctx, md)
274 assert.Equal(t, 1, result.Created)
275 assert.Equal(t, 0, result.Altered)
276 assert.Equal(t, 0, result.Dropped)
277
278
279 select {
280 case msg := <-reaper.measurementCh:
281 assert.Equal(t, "index_changes", msg.MetricName)
282 assert.Equal(t, "testdb", msg.DBName)
283 default:
284 t.Error("Expected measurement to be sent")
285 }
286
287
288 droppedIndexRows := pgxmock.NewRows([]string{"tag_index", "table", "md5", "is_valid", "epoch_ns"}).
289 AddRow("idx1", "table1", "hash1", "f", time.Now().UnixNano()).
290 AddRow("idx3", "table2", "hash3", "t", time.Now().UnixNano())
291 mock.ExpectQuery("SELECT").WillReturnRows(droppedIndexRows)
292
293 result = reaper.DetectIndexChanges(ctx, md)
294 assert.Equal(t, 0, result.Created)
295 assert.Equal(t, 0, result.Altered)
296 assert.Equal(t, 1, result.Dropped)
297
298
299 _, exists := md.ChangeState["index_hashes"]["idx2"]
300 assert.False(t, exists)
301
302 assert.NoError(t, mock.ExpectationsWereMet())
303 }
304
305 func TestDetectPrivilegeChanges(t *testing.T) {
306 ctx := context.Background()
307
308
309 metricDefs.MetricDefs["privilege_changes"] = metrics.Metric{
310 SQLs: map[int]string{
311 120000: "SELECT",
312 },
313 }
314
315
316 md, mock := createTestSourceConn(t)
317 defer mock.Close()
318
319 reaper := &Reaper{
320 measurementCh: make(chan metrics.MeasurementEnvelope, 10),
321 }
322
323
324 initialRows := pgxmock.NewRows([]string{"object_type", "tag_role", "tag_object", "privilege_type", "epoch_ns"}).
325 AddRow("table", "user1", "table1", "SELECT", time.Now().UnixNano()).
326 AddRow("table", "user2", "table2", "INSERT", time.Now().UnixNano())
327 mock.ExpectQuery("SELECT").WillReturnRows(initialRows)
328
329 result := reaper.DetectPrivilegeChanges(ctx, md)
330 assert.Equal(t, 0, result.Created)
331 assert.Equal(t, 0, result.Altered)
332 assert.Equal(t, 0, result.Dropped)
333 assert.NotEmpty(t, md.ChangeState["object_privileges"])
334
335
336 newPrivilegeRows := pgxmock.NewRows([]string{"object_type", "tag_role", "tag_object", "privilege_type", "epoch_ns"}).
337 AddRow("table", "user1", "table1", "SELECT", time.Now().UnixNano()).
338 AddRow("table", "user1", "table1", "INSERT", time.Now().UnixNano()).
339 AddRow("table", "user2", "table2", "INSERT", time.Now().UnixNano())
340 mock.ExpectQuery("SELECT").WillReturnRows(newPrivilegeRows)
341
342 result = reaper.DetectPrivilegeChanges(ctx, md)
343 assert.Equal(t, 1, result.Created)
344 assert.Equal(t, 0, result.Altered)
345 assert.Equal(t, 0, result.Dropped)
346
347
348 select {
349 case msg := <-reaper.measurementCh:
350 assert.Equal(t, "privilege_changes", msg.MetricName)
351 assert.Equal(t, "testdb", msg.DBName)
352 default:
353 t.Error("Expected measurement to be sent")
354 }
355
356
357 revokedPrivilegeRows := pgxmock.NewRows([]string{"object_type", "tag_role", "tag_object", "privilege_type", "epoch_ns"}).
358 AddRow("table", "user1", "table1", "SELECT", time.Now().UnixNano()).
359 AddRow("table", "user2", "table2", "INSERT", time.Now().UnixNano())
360 mock.ExpectQuery("SELECT").WillReturnRows(revokedPrivilegeRows)
361
362 result = reaper.DetectPrivilegeChanges(ctx, md)
363 assert.Equal(t, 0, result.Created)
364 assert.Equal(t, 0, result.Altered)
365 assert.Equal(t, 1, result.Dropped)
366
367
368 _, exists := md.ChangeState["object_privileges"]["table#:#user1#:#table1#:#INSERT"]
369 assert.False(t, exists)
370
371 assert.NoError(t, mock.ExpectationsWereMet())
372 }
373
374 func TestDetectConfigurationChanges(t *testing.T) {
375 ctx := context.Background()
376
377
378 metricDefs.MetricDefs["configuration_hashes"] = metrics.Metric{
379 SQLs: map[int]string{
380 120000: "SELECT",
381 },
382 }
383
384
385 mock, err := pgxmock.NewPool()
386 require.NoError(t, err)
387 defer mock.Close()
388
389 md := &sources.SourceConn{
390 Conn: mock,
391 Source: sources.Source{Name: "testdb"},
392 RuntimeInfo: sources.RuntimeInfo{
393 Version: 120000,
394 ChangeState: make(map[string]map[string]string),
395 },
396 }
397
398 reaper := &Reaper{
399 measurementCh: make(chan metrics.MeasurementEnvelope, 10),
400 }
401
402
403 initialRows := pgxmock.NewRows([]string{"epoch_ns", "setting", "value"}).
404 AddRow(time.Now().UnixNano(), "max_connections", "100").
405 AddRow(time.Now().UnixNano(), "shared_buffers", "128MB")
406 mock.ExpectQuery("SELECT").WillReturnRows(initialRows)
407
408 result := reaper.DetectConfigurationChanges(ctx, md)
409 assert.Equal(t, 0, result.Created)
410 assert.Equal(t, 0, result.Altered)
411 assert.Equal(t, 0, result.Dropped)
412 assert.NotEmpty(t, md.ChangeState["configuration_hashes"])
413
414
415 newSettingRows := pgxmock.NewRows([]string{"epoch_ns", "setting", "value"}).
416 AddRow(time.Now().UnixNano(), "max_connections", "100").
417 AddRow(time.Now().UnixNano(), "shared_buffers", "128MB").
418 AddRow(time.Now().UnixNano(), "work_mem", "4MB")
419 mock.ExpectQuery("SELECT").WillReturnRows(newSettingRows)
420
421 result = reaper.DetectConfigurationChanges(ctx, md)
422 assert.Equal(t, 1, result.Created)
423 assert.Equal(t, 0, result.Altered)
424 assert.Equal(t, 0, result.Dropped)
425
426
427 select {
428 case msg := <-reaper.measurementCh:
429 assert.Equal(t, "configuration_changes", msg.MetricName)
430 assert.Equal(t, "testdb", msg.DBName)
431 default:
432 t.Error("Expected measurement to be sent")
433 }
434
435
436 changedValueRows := pgxmock.NewRows([]string{"epoch_ns", "setting", "value"}).
437 AddRow(time.Now().UnixNano(), "max_connections", "200").
438 AddRow(time.Now().UnixNano(), "shared_buffers", "256MB").
439 AddRow(time.Now().UnixNano(), "work_mem", "4MB")
440 mock.ExpectQuery("SELECT").WillReturnRows(changedValueRows)
441
442 result = reaper.DetectConfigurationChanges(ctx, md)
443 assert.Equal(t, 0, result.Created)
444 assert.Equal(t, 2, result.Altered)
445 assert.Equal(t, 0, result.Dropped)
446
447
448 assert.Equal(t, "200", md.ChangeState["configuration_hashes"]["max_connections"])
449 assert.Equal(t, "256MB", md.ChangeState["configuration_hashes"]["shared_buffers"])
450
451 assert.NoError(t, mock.ExpectationsWereMet())
452 }
453
454 func TestGetInstanceUpMeasurement(t *testing.T) {
455 ctx := context.Background()
456 reaper := &Reaper{}
457
458 testCases := []struct {
459 name string
460 pingError error
461 expectedUpValue int
462 }{
463 {
464 name: "connection is up",
465 pingError: nil,
466 expectedUpValue: 1,
467 },
468 {
469 name: "connection is down",
470 pingError: assert.AnError,
471 expectedUpValue: 0,
472 },
473 {
474 name: "connection timeout",
475 pingError: context.DeadlineExceeded,
476 expectedUpValue: 0,
477 },
478 }
479
480 for _, tc := range testCases {
481 t.Run(tc.name, func(t *testing.T) {
482 md, mock := createTestSourceConn(t)
483 defer mock.Close()
484
485
486 if tc.pingError == nil {
487 mock.ExpectPing()
488 } else {
489 mock.ExpectPing().WillReturnError(tc.pingError)
490 }
491
492 measurements, err := reaper.GetInstanceUpMeasurement(ctx, md)
493
494
495 assert.NoError(t, err)
496 require.NotNil(t, measurements)
497 require.Len(t, measurements, 1)
498
499
500 measurement := measurements[0]
501 assert.Contains(t, measurement, "instance_up")
502 assert.Equal(t, tc.expectedUpValue, measurement["instance_up"])
503
504
505 assert.Contains(t, measurement, metrics.EpochColumnName)
506 assert.Greater(t, measurement[metrics.EpochColumnName].(int64), int64(0))
507 assert.LessOrEqual(t, measurement[metrics.EpochColumnName].(int64), time.Now().UnixNano())
508
509 assert.NoError(t, mock.ExpectationsWereMet())
510 })
511 }
512 }
513