...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/sources/resolver_test.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/sources

     1  package sources_test
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"strings"
     7  	"testing"
     8  	"time"
     9  
    10  	"github.com/stretchr/testify/assert"
    11  	"github.com/stretchr/testify/require"
    12  	client "go.etcd.io/etcd/client/v3"
    13  
    14  	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
    15  	"github.com/cybertec-postgresql/pgwatch/v3/internal/testutil"
    16  )
    17  
    18  func TestMonitoredDatabase_ResolveDatabasesFromPostgres(t *testing.T) {
    19  	pgContainer, pgTeardown, err := testutil.SetupPostgresContainer()
    20  	require.NoError(t, err)
    21  	defer pgTeardown()
    22  
    23  	// Create a new MonitoredDatabase instance
    24  	md := sources.Source{}
    25  	md.Name = "continuous"
    26  	md.Kind = sources.SourcePostgresContinuous
    27  	md.ConnStr, err = pgContainer.ConnectionString(ctx, "sslmode=disable")
    28  	assert.NoError(t, err)
    29  
    30  	// Call the ResolveDatabasesFromPostgres method
    31  	dbs, err := md.ResolveDatabases()
    32  	assert.NoError(t, err)
    33  	assert.True(t, len(dbs) == 2) //postgres and mydatabase
    34  
    35  	// check the "continuous_mydatabase"
    36  	db := dbs.GetMonitoredDatabase(md.Name + "_mydatabase")
    37  	assert.NotNil(t, db)
    38  	assert.Equal(t, "mydatabase", db.GetDatabaseName())
    39  
    40  	//check unexpected database
    41  	db = dbs.GetMonitoredDatabase(md.Name + "_unexpected")
    42  	assert.Nil(t, db)
    43  }
    44  
    45  func TestMonitoredDatabase_ResolveDatabasesFromPatroni(t *testing.T) {
    46  	etcdContainer, etcdTeardown, err := testutil.SetupEtcdContainer()
    47  	require.NoError(t, err)
    48  	defer etcdTeardown()
    49  
    50  	endpoint, err := etcdContainer.ClientEndpoint(ctx)
    51  	require.NoError(t, err)
    52  
    53  	cli, err := client.New(client.Config{
    54  		Endpoints:   []string{endpoint},
    55  		DialTimeout: 10 * time.Second,
    56  	})
    57  	require.NoError(t, err, "failed to create etcd client")
    58  	defer cli.Close()
    59  
    60  	// Start postgres server for testing
    61  	pgContainer, pgTeardown, err := testutil.SetupPostgresContainerWithInitScripts("../../docker/bootstrap/create_role_db.sql")
    62  	require.NoError(t, err)
    63  	defer pgTeardown()
    64  
    65  	pgConnStr, err := pgContainer.ConnectionString(ctx, "sslmode=disable")
    66  	require.NoError(t, err)
    67  	// Put values to etcd server
    68  	kv := map[string]string{
    69  		`/service/demo/config`:           `{"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":1048576,"postgresql":{"use_pg_rewind":true,"pg_hba":["local all all trust","host replication replicator all md5","host all all all md5"],"parameters":{"max_connections":100}}}`,
    70  		`/service/demo/initialize`:       `7553211779477532695`,
    71  		`/service/demo/leader`:           `patroni3`,
    72  		`/service/demo/members/patroni1`: `{"conn_url":"postgres://172.18.0.8:5432/postgres","api_url":"http://172.18.0.8:8008/patroni","state":"running","role":"replica","version":"4.0.7","xlog_location":67108960,"replay_lsn":67108960,"receive_lsn":67108960,"replication_state":"streaming","timeline":1}`,
    73  		`/service/demo/members/patroni2`: `{"conn_url":"postgres://172.18.0.4:5432/postgres","api_url":"http://172.18.0.4:8008/patroni","state":"running","role":"replica","version":"4.0.7","xlog_location":67108960,"replay_lsn":67108960,"receive_lsn":67108960,"replication_state":"streaming","timeline":1}`,
    74  		`/service/demo/members/patroni3`: `{"conn_url":"` + pgConnStr + `","api_url":"http://172.18.0.3:8008/patroni","state":"running","role":"primary","version":"4.0.7","xlog_location":67108960,"timeline":1}`,
    75  		`/service/demo/status`:           `{"optime":67108960,"slots":{"patroni1":67108960,"patroni2":67108960,"patroni3":67108960},"retain_slots":["patroni1","patroni2","patroni3"]}}`}
    76  
    77  	cancelCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
    78  	for k, v := range kv {
    79  		_, err = cli.Put(cancelCtx, k, v)
    80  		require.NoError(t, err, "failed to put key %s to etcd", k)
    81  	}
    82  	cancel()
    83  
    84  	md := sources.Source{}
    85  	md.Name = "continuous"
    86  	md.OnlyIfMaster = true
    87  
    88  	t.Run("simple patroni discovery", func(t *testing.T) {
    89  		md.Kind = sources.SourcePatroni
    90  		md.ConnStr = "etcd://" + strings.TrimPrefix(endpoint, "http://")
    91  		md.ConnStr += "/service"
    92  		md.ConnStr += "/demo"
    93  
    94  		// Run ResolveDatabasesFromPatroni
    95  		dbs, err := md.ResolveDatabases()
    96  		assert.NoError(t, err)
    97  		assert.NotNil(t, dbs)
    98  		assert.Len(t, dbs, 4) // postgres, mydatrabase, pgwatch, pgwatch_metrics}
    99  	})
   100  
   101  	t.Run("several endpoints patroni discovery", func(t *testing.T) {
   102  		md.Kind = sources.SourcePatroni
   103  		e := strings.TrimPrefix(endpoint, "http://")
   104  		md.ConnStr = "etcd://" + strings.Join([]string{e, e, e}, ",")
   105  		md.ConnStr += "/service"
   106  		md.ConnStr += "/demo"
   107  
   108  		// Run ResolveDatabasesFromPatroni
   109  		dbs, err := md.ResolveDatabases()
   110  		assert.NoError(t, err)
   111  		assert.NotNil(t, dbs)
   112  		assert.Len(t, dbs, 4) // postgres, mydatrabase, pgwatch, pgwatch_metrics}
   113  	})
   114  
   115  	t.Run("namespace patroni discovery", func(t *testing.T) {
   116  		md.Kind = sources.SourcePatroni
   117  		md.ConnStr = "etcd://" + strings.TrimPrefix(endpoint, "http://")
   118  
   119  		// Run ResolveDatabasesFromPatroni
   120  		dbs, err := md.ResolveDatabases()
   121  		assert.NoError(t, err)
   122  		assert.NotNil(t, dbs)
   123  		assert.Len(t, dbs, 4) // postgres, mydatrabase, pgwatch, pgwatch_metrics}
   124  	})
   125  }
   126  
   127  func TestMonitoredDatabase_UnsupportedDCS(t *testing.T) {
   128  	md := sources.Source{}
   129  	md.Name = "continuous"
   130  	md.Kind = sources.SourcePatroni
   131  
   132  	md.ConnStr = "consul://foo"
   133  	_, err := md.ResolveDatabases()
   134  	assert.ErrorIs(t, err, errors.ErrUnsupported)
   135  
   136  	md.ConnStr = "zookeeper://foo"
   137  	_, err = md.ResolveDatabases()
   138  	assert.ErrorIs(t, err, errors.ErrUnsupported)
   139  
   140  	md.ConnStr = "unknown://foo"
   141  	_, err = md.ResolveDatabases()
   142  	assert.EqualError(t, err, "unsupported DCS type: unknown")
   143  
   144  }
   145