...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/reaper/reaper_test.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"context"
     5  	"os"
     6  	"path/filepath"
     7  	"testing"
     8  
     9  	"github.com/cybertec-postgresql/pgwatch/v5/internal/cmdopts"
    10  	"github.com/cybertec-postgresql/pgwatch/v5/internal/log"
    11  	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
    12  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
    13  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
    14  	"github.com/cybertec-postgresql/pgwatch/v5/internal/testutil"
    15    "github.com/pashagolub/pgxmock/v4"
    16  	"github.com/stretchr/testify/assert"
    17  	"github.com/stretchr/testify/require"
    18  )
    19  
    20  func TestReaper_LoadSources(t *testing.T) {
    21  	ctx := log.WithLogger(context.Background(), log.NewNoopLogger())
    22  
    23  	t.Run("Test pause trigger file", func(t *testing.T) {
    24  		pausefile := filepath.Join(t.TempDir(), "pausefile")
    25  		require.NoError(t, os.WriteFile(pausefile, []byte("foo"), 0644))
    26  		r := NewReaper(ctx, &cmdopts.Options{Metrics: metrics.CmdOpts{EmergencyPauseTriggerfile: pausefile}})
    27  		assert.NoError(t, r.LoadSources(ctx))
    28  		assert.True(t, len(r.monitoredSources) == 0, "Expected no monitored sources when pause trigger file exists")
    29  	})
    30  
    31  	t.Run("Test SyncFromReader errror", func(t *testing.T) {
    32  		reader := &testutil.MockSourcesReaderWriter{
    33  			GetSourcesFunc: func() (sources.Sources, error) {
    34  				return nil, assert.AnError
    35  			},
    36  		}
    37  		r := NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: reader})
    38  		assert.Error(t, r.LoadSources(ctx))
    39  		assert.Equal(t, 0, len(r.monitoredSources), "Expected no monitored sources after error")
    40  	})
    41  
    42  	t.Run("Test SyncFromReader success", func(t *testing.T) {
    43  		source1 := sources.Source{Name: "Source 1", IsEnabled: true, Kind: sources.SourcePostgres}
    44  		source2 := sources.Source{Name: "Source 2", IsEnabled: true, Kind: sources.SourcePostgres}
    45  		reader := &testutil.MockSourcesReaderWriter{
    46  			GetSourcesFunc: func() (sources.Sources, error) {
    47  				return sources.Sources{source1, source2}, nil
    48  			},
    49  		}
    50  
    51  		r := NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: reader})
    52  		assert.NoError(t, r.LoadSources(ctx))
    53  		assert.Equal(t, 2, len(r.monitoredSources), "Expected two monitored sources after successful load")
    54  		assert.NotNil(t, r.monitoredSources.GetMonitoredDatabase(source1.Name))
    55  		assert.NotNil(t, r.monitoredSources.GetMonitoredDatabase(source2.Name))
    56  	})
    57  
    58  	t.Run("Test repeated load", func(t *testing.T) {
    59  		source1 := sources.Source{Name: "Source 1", IsEnabled: true, Kind: sources.SourcePostgres}
    60  		source2 := sources.Source{Name: "Source 2", IsEnabled: true, Kind: sources.SourcePostgres}
    61  		reader := &testutil.MockSourcesReaderWriter{
    62  			GetSourcesFunc: func() (sources.Sources, error) {
    63  				return sources.Sources{source1, source2}, nil
    64  			},
    65  		}
    66  
    67  		r := NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: reader})
    68  		assert.NoError(t, r.LoadSources(ctx))
    69  		assert.Equal(t, 2, len(r.monitoredSources), "Expected two monitored sources after first load")
    70  
    71  		// Load again with the same sources
    72  		assert.NoError(t, r.LoadSources(ctx))
    73  		assert.Equal(t, 2, len(r.monitoredSources), "Expected still two monitored sources after second load")
    74  	})
    75  
    76  	t.Run("Test group limited sources", func(t *testing.T) {
    77  		source1 := sources.Source{Name: "Source 1", IsEnabled: true, Kind: sources.SourcePostgres, Group: ""}
    78  		source2 := sources.Source{Name: "Source 2", IsEnabled: true, Kind: sources.SourcePostgres, Group: "group1"}
    79  		source3 := sources.Source{Name: "Source 3", IsEnabled: true, Kind: sources.SourcePostgres, Group: "group1"}
    80  		source4 := sources.Source{Name: "Source 4", IsEnabled: true, Kind: sources.SourcePostgres, Group: "group2"}
    81  		source5 := sources.Source{Name: "Source 5", IsEnabled: true, Kind: sources.SourcePostgres, Group: "default"}
    82  		newReader := &testutil.MockSourcesReaderWriter{
    83  			GetSourcesFunc: func() (sources.Sources, error) {
    84  				return sources.Sources{source1, source2, source3, source4, source5}, nil
    85  			},
    86  		}
    87  
    88  		r := NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: newReader, Sources: sources.CmdOpts{Groups: []string{"group1", "group2"}}})
    89  		assert.NoError(t, r.LoadSources(ctx))
    90  		assert.Equal(t, 3, len(r.monitoredSources), "Expected three monitored sources after load")
    91  
    92  		r = NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: newReader, Sources: sources.CmdOpts{Groups: []string{"group1"}}})
    93  		assert.NoError(t, r.LoadSources(ctx))
    94  		assert.Equal(t, 2, len(r.monitoredSources), "Expected two monitored source after group filtering")
    95  
    96  		r = NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: newReader})
    97  		assert.NoError(t, r.LoadSources(ctx))
    98  		assert.Equal(t, 5, len(r.monitoredSources), "Expected five monitored sources after resetting groups")
    99  	})
   100  
   101  	t.Run("Test source config changes trigger restart", func(t *testing.T) {
   102  		baseSource := sources.Source{
   103  			Name:                 "TestSource",
   104  			IsEnabled:            true,
   105  			Kind:                 sources.SourcePostgres,
   106  			ConnStr:              "postgres://localhost:5432/testdb",
   107  			Metrics:              map[string]float64{"cpu": 10, "memory": 20},
   108  			MetricsStandby:       map[string]float64{"cpu": 30},
   109  			CustomTags:           map[string]string{"env": "test"},
   110  			Group:                "default",
   111  		}
   112  
   113  		testCases := []struct {
   114  			name         string
   115  			modifySource func(s *sources.Source)
   116  			expectCancel bool
   117  		}{
   118  			{
   119  				name: "custom tags change",
   120  				modifySource: func(s *sources.Source) {
   121  					s.CustomTags = map[string]string{"env": "production"}
   122  				},
   123  				expectCancel: true,
   124  			},
   125  			{
   126  				name: "custom tags add new tag",
   127  				modifySource: func(s *sources.Source) {
   128  					s.CustomTags = map[string]string{"env": "test", "region": "us-east"}
   129  				},
   130  				expectCancel: true,
   131  			},
   132  			{
   133  				name: "custom tags remove tag",
   134  				modifySource: func(s *sources.Source) {
   135  					s.CustomTags = map[string]string{}
   136  				},
   137  				expectCancel: true,
   138  			},
   139  			{
   140  				name: "preset metrics change",
   141  				modifySource: func(s *sources.Source) {
   142  					s.PresetMetrics = "exhaustive"
   143  				},
   144  				expectCancel: true,
   145  			},
   146  			{
   147  				name: "preset standby metrics change",
   148  				modifySource: func(s *sources.Source) {
   149  					s.PresetMetricsStandby = "standby-preset"
   150  				},
   151  				expectCancel: true,
   152  			},
   153  			{
   154  				name: "connection string change",
   155  				modifySource: func(s *sources.Source) {
   156  					s.ConnStr = "postgres://localhost:5433/newdb"
   157  				},
   158  				expectCancel: true,
   159  			},
   160  			{
   161  				name: "custom metrics change interval",
   162  				modifySource: func(s *sources.Source) {
   163  					s.Metrics = map[string]float64{"cpu": 15, "memory": 20}
   164  				},
   165  				expectCancel: true,
   166  			},
   167  			{
   168  				name: "custom metrics add new metric",
   169  				modifySource: func(s *sources.Source) {
   170  					s.Metrics = map[string]float64{"cpu": 10, "memory": 20, "disk": 30}
   171  				},
   172  				expectCancel: true,
   173  			},
   174  			{
   175  				name: "custom metrics remove metric",
   176  				modifySource: func(s *sources.Source) {
   177  					s.Metrics = map[string]float64{"cpu": 10}
   178  				},
   179  				expectCancel: true,
   180  			},
   181  			{
   182  				name: "standby metrics change",
   183  				modifySource: func(s *sources.Source) {
   184  					s.MetricsStandby = map[string]float64{"cpu": 60}
   185  				},
   186  				expectCancel: true,
   187  			},
   188  			{
   189  				name: "group change",
   190  				modifySource: func(s *sources.Source) {
   191  					s.Group = "new-group"
   192  				},
   193  				expectCancel: true,
   194  			},
   195  			{
   196  				name: "kind change",
   197  				modifySource: func(s *sources.Source) {
   198  					s.Kind = sources.SourcePgBouncer
   199  				},
   200  				expectCancel: true,
   201  			},
   202  			{
   203  				name: "only if master change",
   204  				modifySource: func(s *sources.Source) {
   205  					s.OnlyIfMaster = true
   206  				},
   207  				expectCancel: true,
   208  			},
   209  			{
   210  				name: "no change - same config",
   211  				modifySource: func(_ *sources.Source) {
   212  					// No modifications - source stays the same
   213  				},
   214  				expectCancel: false,
   215  			},
   216  		}
   217  
   218  		for _, tc := range testCases {
   219  			t.Run(tc.name, func(t *testing.T) {
   220  				initialSource := *baseSource.Clone()
   221  				initialReader := &testutil.MockSourcesReaderWriter{
   222  					GetSourcesFunc: func() (sources.Sources, error) {
   223  						return sources.Sources{initialSource}, nil
   224  					},
   225  				}
   226  
   227  				r := NewReaper(ctx, &cmdopts.Options{
   228  					SourcesReaderWriter: initialReader,
   229  					SinksWriter:         &sinks.MultiWriter{},
   230  				})
   231  				assert.NoError(t, r.LoadSources(ctx))
   232  				assert.Equal(t, 1, len(r.monitoredSources), "Expected one monitored source after initial load")
   233  
   234  				mockConn, err := pgxmock.NewPool()
   235  				require.NoError(t, err)
   236  				mockConn.ExpectClose()
   237  				r.monitoredSources[0].Conn = mockConn
   238  
   239  				// Add a mock cancel function for a metric gatherer
   240  				cancelCalled := make(map[string]bool)
   241  				for metric := range initialSource.Metrics {
   242  					dbMetric := initialSource.Name + "¤¤¤" + metric
   243  					r.cancelFuncs[dbMetric] = func() {
   244  						cancelCalled[dbMetric] = true
   245  					}
   246  				}
   247  
   248  				// Create modified source
   249  				modifiedSource := *baseSource.Clone()
   250  				tc.modifySource(&modifiedSource)
   251  
   252  				modifiedReader := &testutil.MockSourcesReaderWriter{
   253  					GetSourcesFunc: func() (sources.Sources, error) {
   254  						return sources.Sources{modifiedSource}, nil
   255  					},
   256  				}
   257  				r.SourcesReaderWriter = modifiedReader
   258  
   259  				// Reload sources
   260  				assert.NoError(t, r.LoadSources(ctx))
   261  				assert.Equal(t, 1, len(r.monitoredSources), "Expected one monitored source after reload")
   262  				assert.Equal(t, modifiedSource, r.monitoredSources[0].Source)
   263  
   264  				for metric := range initialSource.Metrics {
   265  					dbMetric := initialSource.Name + "¤¤¤" + metric
   266  					assert.Equal(t, tc.expectCancel, cancelCalled[dbMetric])
   267  					if tc.expectCancel {
   268  						assert.Nil(t, mockConn.ExpectationsWereMet(), "Expected all mock expectations to be met")
   269  						_, exists := r.cancelFuncs[dbMetric]
   270  						assert.False(t, exists, "Expected cancel func to be removed from map after cancellation")
   271  					}
   272  				}
   273  			})
   274  		}
   275  	})
   276  
   277  	t.Run("Test only changed source cancelled in multi-source setup", func(t *testing.T) {
   278  		source1 := sources.Source{
   279  			Name:      "Source1",
   280  			IsEnabled: true,
   281  			Kind:      sources.SourcePostgres,
   282  			ConnStr:   "postgres://localhost:5432/db1",
   283  			Metrics:   map[string]float64{"cpu": 10},
   284  		}
   285  		source2 := sources.Source{
   286  			Name:      "Source2",
   287  			IsEnabled: true,
   288  			Kind:      sources.SourcePostgres,
   289  			ConnStr:   "postgres://localhost:5432/db2",
   290  			Metrics:   map[string]float64{"memory": 20},
   291  		}
   292  
   293  		initialReader := &testutil.MockSourcesReaderWriter{
   294  			GetSourcesFunc: func() (sources.Sources, error) {
   295  				return sources.Sources{source1, source2}, nil
   296  			},
   297  		}
   298  
   299  		r := NewReaper(ctx, &cmdopts.Options{
   300  			SourcesReaderWriter: initialReader,
   301  			SinksWriter:         &sinks.MultiWriter{},
   302  		})
   303  		assert.NoError(t, r.LoadSources(ctx))
   304  
   305  		// Set mock connections for both sources to avoid nil pointer on Close()
   306  		mockConn1, err := pgxmock.NewPool()
   307  		require.NoError(t, err)
   308  		mockConn1.ExpectClose()
   309  		r.monitoredSources[0].Conn = mockConn1
   310  
   311  		source1Cancelled := false
   312  		source2Cancelled := false
   313  		r.cancelFuncs[source1.Name+"¤¤¤"+"cpu"] = func() { source1Cancelled = true }
   314  		r.cancelFuncs[source2.Name+"¤¤¤"+"memory"] = func() { source2Cancelled = true }
   315  
   316  		// Only modify source1
   317  		modifiedSource1 := *source1.Clone()
   318  		modifiedSource1.ConnStr = "postgres://localhost:5433/db1_new"
   319  
   320  		modifiedReader := &testutil.MockSourcesReaderWriter{
   321  			GetSourcesFunc: func() (sources.Sources, error) {
   322  				return sources.Sources{modifiedSource1, source2}, nil
   323  			},
   324  		}
   325  		r.SourcesReaderWriter = modifiedReader
   326  
   327  		assert.NoError(t, r.LoadSources(ctx))
   328  
   329  		assert.True(t, source1Cancelled, "Source1 should be cancelled due to config change")
   330  		assert.False(t, source2Cancelled, "Source2 should NOT be cancelled as it was not modified")
   331  		assert.Nil(t, mockConn1.ExpectationsWereMet(), "Expected all mock expectations to be met")
   332  	})
   333  }
   334