1 package reaper
2
3 import (
4 "context"
5 "os"
6 "path/filepath"
7 "testing"
8
9 "github.com/cybertec-postgresql/pgwatch/v5/internal/cmdopts"
10 "github.com/cybertec-postgresql/pgwatch/v5/internal/log"
11 "github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
12 "github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
13 "github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
14 "github.com/cybertec-postgresql/pgwatch/v5/internal/testutil"
15 "github.com/pashagolub/pgxmock/v4"
16 "github.com/stretchr/testify/assert"
17 "github.com/stretchr/testify/require"
18 )
19
20 func TestReaper_LoadSources(t *testing.T) {
21 ctx := log.WithLogger(context.Background(), log.NewNoopLogger())
22
23 t.Run("Test pause trigger file", func(t *testing.T) {
24 pausefile := filepath.Join(t.TempDir(), "pausefile")
25 require.NoError(t, os.WriteFile(pausefile, []byte("foo"), 0644))
26 r := NewReaper(ctx, &cmdopts.Options{Metrics: metrics.CmdOpts{EmergencyPauseTriggerfile: pausefile}})
27 assert.NoError(t, r.LoadSources(ctx))
28 assert.True(t, len(r.monitoredSources) == 0, "Expected no monitored sources when pause trigger file exists")
29 })
30
31 t.Run("Test SyncFromReader errror", func(t *testing.T) {
32 reader := &testutil.MockSourcesReaderWriter{
33 GetSourcesFunc: func() (sources.Sources, error) {
34 return nil, assert.AnError
35 },
36 }
37 r := NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: reader})
38 assert.Error(t, r.LoadSources(ctx))
39 assert.Equal(t, 0, len(r.monitoredSources), "Expected no monitored sources after error")
40 })
41
42 t.Run("Test SyncFromReader success", func(t *testing.T) {
43 source1 := sources.Source{Name: "Source 1", IsEnabled: true, Kind: sources.SourcePostgres}
44 source2 := sources.Source{Name: "Source 2", IsEnabled: true, Kind: sources.SourcePostgres}
45 reader := &testutil.MockSourcesReaderWriter{
46 GetSourcesFunc: func() (sources.Sources, error) {
47 return sources.Sources{source1, source2}, nil
48 },
49 }
50
51 r := NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: reader})
52 assert.NoError(t, r.LoadSources(ctx))
53 assert.Equal(t, 2, len(r.monitoredSources), "Expected two monitored sources after successful load")
54 assert.NotNil(t, r.monitoredSources.GetMonitoredDatabase(source1.Name))
55 assert.NotNil(t, r.monitoredSources.GetMonitoredDatabase(source2.Name))
56 })
57
58 t.Run("Test repeated load", func(t *testing.T) {
59 source1 := sources.Source{Name: "Source 1", IsEnabled: true, Kind: sources.SourcePostgres}
60 source2 := sources.Source{Name: "Source 2", IsEnabled: true, Kind: sources.SourcePostgres}
61 reader := &testutil.MockSourcesReaderWriter{
62 GetSourcesFunc: func() (sources.Sources, error) {
63 return sources.Sources{source1, source2}, nil
64 },
65 }
66
67 r := NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: reader})
68 assert.NoError(t, r.LoadSources(ctx))
69 assert.Equal(t, 2, len(r.monitoredSources), "Expected two monitored sources after first load")
70
71
72 assert.NoError(t, r.LoadSources(ctx))
73 assert.Equal(t, 2, len(r.monitoredSources), "Expected still two monitored sources after second load")
74 })
75
76 t.Run("Test group limited sources", func(t *testing.T) {
77 source1 := sources.Source{Name: "Source 1", IsEnabled: true, Kind: sources.SourcePostgres, Group: ""}
78 source2 := sources.Source{Name: "Source 2", IsEnabled: true, Kind: sources.SourcePostgres, Group: "group1"}
79 source3 := sources.Source{Name: "Source 3", IsEnabled: true, Kind: sources.SourcePostgres, Group: "group1"}
80 source4 := sources.Source{Name: "Source 4", IsEnabled: true, Kind: sources.SourcePostgres, Group: "group2"}
81 source5 := sources.Source{Name: "Source 5", IsEnabled: true, Kind: sources.SourcePostgres, Group: "default"}
82 newReader := &testutil.MockSourcesReaderWriter{
83 GetSourcesFunc: func() (sources.Sources, error) {
84 return sources.Sources{source1, source2, source3, source4, source5}, nil
85 },
86 }
87
88 r := NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: newReader, Sources: sources.CmdOpts{Groups: []string{"group1", "group2"}}})
89 assert.NoError(t, r.LoadSources(ctx))
90 assert.Equal(t, 3, len(r.monitoredSources), "Expected three monitored sources after load")
91
92 r = NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: newReader, Sources: sources.CmdOpts{Groups: []string{"group1"}}})
93 assert.NoError(t, r.LoadSources(ctx))
94 assert.Equal(t, 2, len(r.monitoredSources), "Expected two monitored source after group filtering")
95
96 r = NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: newReader})
97 assert.NoError(t, r.LoadSources(ctx))
98 assert.Equal(t, 5, len(r.monitoredSources), "Expected five monitored sources after resetting groups")
99 })
100
101 t.Run("Test source config changes trigger restart", func(t *testing.T) {
102 baseSource := sources.Source{
103 Name: "TestSource",
104 IsEnabled: true,
105 Kind: sources.SourcePostgres,
106 ConnStr: "postgres://localhost:5432/testdb",
107 Metrics: map[string]float64{"cpu": 10, "memory": 20},
108 MetricsStandby: map[string]float64{"cpu": 30},
109 CustomTags: map[string]string{"env": "test"},
110 Group: "default",
111 }
112
113 testCases := []struct {
114 name string
115 modifySource func(s *sources.Source)
116 expectCancel bool
117 }{
118 {
119 name: "custom tags change",
120 modifySource: func(s *sources.Source) {
121 s.CustomTags = map[string]string{"env": "production"}
122 },
123 expectCancel: true,
124 },
125 {
126 name: "custom tags add new tag",
127 modifySource: func(s *sources.Source) {
128 s.CustomTags = map[string]string{"env": "test", "region": "us-east"}
129 },
130 expectCancel: true,
131 },
132 {
133 name: "custom tags remove tag",
134 modifySource: func(s *sources.Source) {
135 s.CustomTags = map[string]string{}
136 },
137 expectCancel: true,
138 },
139 {
140 name: "preset metrics change",
141 modifySource: func(s *sources.Source) {
142 s.PresetMetrics = "exhaustive"
143 },
144 expectCancel: true,
145 },
146 {
147 name: "preset standby metrics change",
148 modifySource: func(s *sources.Source) {
149 s.PresetMetricsStandby = "standby-preset"
150 },
151 expectCancel: true,
152 },
153 {
154 name: "connection string change",
155 modifySource: func(s *sources.Source) {
156 s.ConnStr = "postgres://localhost:5433/newdb"
157 },
158 expectCancel: true,
159 },
160 {
161 name: "custom metrics change interval",
162 modifySource: func(s *sources.Source) {
163 s.Metrics = map[string]float64{"cpu": 15, "memory": 20}
164 },
165 expectCancel: true,
166 },
167 {
168 name: "custom metrics add new metric",
169 modifySource: func(s *sources.Source) {
170 s.Metrics = map[string]float64{"cpu": 10, "memory": 20, "disk": 30}
171 },
172 expectCancel: true,
173 },
174 {
175 name: "custom metrics remove metric",
176 modifySource: func(s *sources.Source) {
177 s.Metrics = map[string]float64{"cpu": 10}
178 },
179 expectCancel: true,
180 },
181 {
182 name: "standby metrics change",
183 modifySource: func(s *sources.Source) {
184 s.MetricsStandby = map[string]float64{"cpu": 60}
185 },
186 expectCancel: true,
187 },
188 {
189 name: "group change",
190 modifySource: func(s *sources.Source) {
191 s.Group = "new-group"
192 },
193 expectCancel: true,
194 },
195 {
196 name: "kind change",
197 modifySource: func(s *sources.Source) {
198 s.Kind = sources.SourcePgBouncer
199 },
200 expectCancel: true,
201 },
202 {
203 name: "only if master change",
204 modifySource: func(s *sources.Source) {
205 s.OnlyIfMaster = true
206 },
207 expectCancel: true,
208 },
209 {
210 name: "no change - same config",
211 modifySource: func(_ *sources.Source) {
212
213 },
214 expectCancel: false,
215 },
216 }
217
218 for _, tc := range testCases {
219 t.Run(tc.name, func(t *testing.T) {
220 initialSource := *baseSource.Clone()
221 initialReader := &testutil.MockSourcesReaderWriter{
222 GetSourcesFunc: func() (sources.Sources, error) {
223 return sources.Sources{initialSource}, nil
224 },
225 }
226
227 r := NewReaper(ctx, &cmdopts.Options{
228 SourcesReaderWriter: initialReader,
229 SinksWriter: &sinks.MultiWriter{},
230 })
231 assert.NoError(t, r.LoadSources(ctx))
232 assert.Equal(t, 1, len(r.monitoredSources), "Expected one monitored source after initial load")
233
234 mockConn, err := pgxmock.NewPool()
235 require.NoError(t, err)
236 mockConn.ExpectClose()
237 r.monitoredSources[0].Conn = mockConn
238
239
240 cancelCalled := make(map[string]bool)
241 for metric := range initialSource.Metrics {
242 dbMetric := initialSource.Name + "¤¤¤" + metric
243 r.cancelFuncs[dbMetric] = func() {
244 cancelCalled[dbMetric] = true
245 }
246 }
247
248
249 modifiedSource := *baseSource.Clone()
250 tc.modifySource(&modifiedSource)
251
252 modifiedReader := &testutil.MockSourcesReaderWriter{
253 GetSourcesFunc: func() (sources.Sources, error) {
254 return sources.Sources{modifiedSource}, nil
255 },
256 }
257 r.SourcesReaderWriter = modifiedReader
258
259
260 assert.NoError(t, r.LoadSources(ctx))
261 assert.Equal(t, 1, len(r.monitoredSources), "Expected one monitored source after reload")
262 assert.Equal(t, modifiedSource, r.monitoredSources[0].Source)
263
264 for metric := range initialSource.Metrics {
265 dbMetric := initialSource.Name + "¤¤¤" + metric
266 assert.Equal(t, tc.expectCancel, cancelCalled[dbMetric])
267 if tc.expectCancel {
268 assert.Nil(t, mockConn.ExpectationsWereMet(), "Expected all mock expectations to be met")
269 _, exists := r.cancelFuncs[dbMetric]
270 assert.False(t, exists, "Expected cancel func to be removed from map after cancellation")
271 }
272 }
273 })
274 }
275 })
276
277 t.Run("Test only changed source cancelled in multi-source setup", func(t *testing.T) {
278 source1 := sources.Source{
279 Name: "Source1",
280 IsEnabled: true,
281 Kind: sources.SourcePostgres,
282 ConnStr: "postgres://localhost:5432/db1",
283 Metrics: map[string]float64{"cpu": 10},
284 }
285 source2 := sources.Source{
286 Name: "Source2",
287 IsEnabled: true,
288 Kind: sources.SourcePostgres,
289 ConnStr: "postgres://localhost:5432/db2",
290 Metrics: map[string]float64{"memory": 20},
291 }
292
293 initialReader := &testutil.MockSourcesReaderWriter{
294 GetSourcesFunc: func() (sources.Sources, error) {
295 return sources.Sources{source1, source2}, nil
296 },
297 }
298
299 r := NewReaper(ctx, &cmdopts.Options{
300 SourcesReaderWriter: initialReader,
301 SinksWriter: &sinks.MultiWriter{},
302 })
303 assert.NoError(t, r.LoadSources(ctx))
304
305
306 mockConn1, err := pgxmock.NewPool()
307 require.NoError(t, err)
308 mockConn1.ExpectClose()
309 r.monitoredSources[0].Conn = mockConn1
310
311 source1Cancelled := false
312 source2Cancelled := false
313 r.cancelFuncs[source1.Name+"¤¤¤"+"cpu"] = func() { source1Cancelled = true }
314 r.cancelFuncs[source2.Name+"¤¤¤"+"memory"] = func() { source2Cancelled = true }
315
316
317 modifiedSource1 := *source1.Clone()
318 modifiedSource1.ConnStr = "postgres://localhost:5433/db1_new"
319
320 modifiedReader := &testutil.MockSourcesReaderWriter{
321 GetSourcesFunc: func() (sources.Sources, error) {
322 return sources.Sources{modifiedSource1, source2}, nil
323 },
324 }
325 r.SourcesReaderWriter = modifiedReader
326
327 assert.NoError(t, r.LoadSources(ctx))
328
329 assert.True(t, source1Cancelled, "Source1 should be cancelled due to config change")
330 assert.False(t, source2Cancelled, "Source2 should NOT be cancelled as it was not modified")
331 assert.Nil(t, mockConn1.ExpectationsWereMet(), "Expected all mock expectations to be met")
332 })
333 }
334