...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/sources/resolver_test.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/sources

     1  package sources_test
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"strings"
     7  	"testing"
     8  	"time"
     9  
    10  	"github.com/stretchr/testify/assert"
    11  	"github.com/stretchr/testify/require"
    12  	client "go.etcd.io/etcd/client/v3"
    13  
    14  	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
    15  	testcontainers "github.com/testcontainers/testcontainers-go"
    16  	"github.com/testcontainers/testcontainers-go/modules/etcd"
    17  	"github.com/testcontainers/testcontainers-go/modules/postgres"
    18  	"github.com/testcontainers/testcontainers-go/wait"
    19  )
    20  
    21  func TestMonitoredDatabase_ResolveDatabasesFromPostgres(t *testing.T) {
    22  	pgContainer, err := postgres.Run(ctx,
    23  		ImageName,
    24  		postgres.WithDatabase("mydatabase"),
    25  		testcontainers.WithWaitStrategy(
    26  			wait.ForLog("database system is ready to accept connections").
    27  				WithOccurrence(2).
    28  				WithStartupTimeout(5*time.Second)),
    29  	)
    30  	require.NoError(t, err)
    31  	defer func() { assert.NoError(t, pgContainer.Terminate(ctx)) }()
    32  
    33  	// Create a new MonitoredDatabase instance
    34  	md := sources.Source{}
    35  	md.Name = "continuous"
    36  	md.Kind = sources.SourcePostgresContinuous
    37  	md.ConnStr, err = pgContainer.ConnectionString(ctx, "sslmode=disable")
    38  	assert.NoError(t, err)
    39  
    40  	// Call the ResolveDatabasesFromPostgres method
    41  	dbs, err := md.ResolveDatabases()
    42  	assert.NoError(t, err)
    43  	assert.True(t, len(dbs) == 2) //postgres and mydatabase
    44  
    45  	// check the "continuous_mydatabase"
    46  	db := dbs.GetMonitoredDatabase(md.Name + "_mydatabase")
    47  	assert.NotNil(t, db)
    48  	assert.Equal(t, "mydatabase", db.GetDatabaseName())
    49  
    50  	//check unexpected database
    51  	db = dbs.GetMonitoredDatabase(md.Name + "_unexpected")
    52  	assert.Nil(t, db)
    53  }
    54  
    55  func TestMonitoredDatabase_ResolveDatabasesFromPatroni(t *testing.T) {
    56  
    57  	etcdContainer, err := etcd.Run(ctx, "gcr.io/etcd-development/etcd:v3.5.14",
    58  		testcontainers.WithWaitStrategy(wait.ForLog("ready to serve client requests").
    59  			WithStartupTimeout(15*time.Second)))
    60  	require.NoError(t, err)
    61  	defer func() { assert.NoError(t, etcdContainer.Terminate(ctx)) }()
    62  
    63  	endpoint, err := etcdContainer.ClientEndpoint(ctx)
    64  	require.NoError(t, err)
    65  
    66  	cli, err := client.New(client.Config{
    67  		Endpoints:   []string{endpoint},
    68  		DialTimeout: 10 * time.Second,
    69  	})
    70  	require.NoError(t, err, "failed to create etcd client")
    71  	defer cli.Close()
    72  
    73  	// Start postgres server for testing
    74  	pgContainer, err := postgres.Run(ctx,
    75  		ImageName,
    76  		postgres.WithDatabase("mydatabase"),
    77  		postgres.WithInitScripts("../../docker/bootstrap/create_role_db.sql"),
    78  		testcontainers.WithWaitStrategy(
    79  			wait.ForLog("database system is ready to accept connections").
    80  				WithOccurrence(2).
    81  				WithStartupTimeout(5*time.Second)),
    82  	)
    83  	require.NoError(t, err)
    84  	defer func() { assert.NoError(t, pgContainer.Terminate(ctx)) }()
    85  	pgConnStr, err := pgContainer.ConnectionString(ctx, "sslmode=disable")
    86  	require.NoError(t, err)
    87  	// Put values to etcd server
    88  	kv := map[string]string{
    89  		`/service/demo/config`:           `{"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":1048576,"postgresql":{"use_pg_rewind":true,"pg_hba":["local all all trust","host replication replicator all md5","host all all all md5"],"parameters":{"max_connections":100}}}`,
    90  		`/service/demo/initialize`:       `7553211779477532695`,
    91  		`/service/demo/leader`:           `patroni3`,
    92  		`/service/demo/members/patroni1`: `{"conn_url":"postgres://172.18.0.8:5432/postgres","api_url":"http://172.18.0.8:8008/patroni","state":"running","role":"replica","version":"4.0.7","xlog_location":67108960,"replay_lsn":67108960,"receive_lsn":67108960,"replication_state":"streaming","timeline":1}`,
    93  		`/service/demo/members/patroni2`: `{"conn_url":"postgres://172.18.0.4:5432/postgres","api_url":"http://172.18.0.4:8008/patroni","state":"running","role":"replica","version":"4.0.7","xlog_location":67108960,"replay_lsn":67108960,"receive_lsn":67108960,"replication_state":"streaming","timeline":1}`,
    94  		`/service/demo/members/patroni3`: `{"conn_url":"` + pgConnStr + `","api_url":"http://172.18.0.3:8008/patroni","state":"running","role":"primary","version":"4.0.7","xlog_location":67108960,"timeline":1}`,
    95  		`/service/demo/status`:           `{"optime":67108960,"slots":{"patroni1":67108960,"patroni2":67108960,"patroni3":67108960},"retain_slots":["patroni1","patroni2","patroni3"]}}`}
    96  
    97  	cancelCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
    98  	for k, v := range kv {
    99  		_, err = cli.Put(cancelCtx, k, v)
   100  		require.NoError(t, err, "failed to put key %s to etcd", k)
   101  	}
   102  	cancel()
   103  
   104  	md := sources.Source{}
   105  	md.Name = "continuous"
   106  	md.OnlyIfMaster = true
   107  
   108  	t.Run("simple patroni discovery", func(t *testing.T) {
   109  		md.Kind = sources.SourcePatroni
   110  		md.ConnStr = "etcd://" + strings.TrimPrefix(endpoint, "http://")
   111  		md.ConnStr += "/service"
   112  		md.ConnStr += "/demo"
   113  
   114  		// Run ResolveDatabasesFromPatroni
   115  		dbs, err := md.ResolveDatabases()
   116  		assert.NoError(t, err)
   117  		assert.NotNil(t, dbs)
   118  		assert.Len(t, dbs, 4) // postgres, mydatrabase, pgwatch, pgwatch_metrics}
   119  	})
   120  
   121  	t.Run("several endpoints patroni discovery", func(t *testing.T) {
   122  		md.Kind = sources.SourcePatroni
   123  		e := strings.TrimPrefix(endpoint, "http://")
   124  		md.ConnStr = "etcd://" + strings.Join([]string{e, e, e}, ",")
   125  		md.ConnStr += "/service"
   126  		md.ConnStr += "/demo"
   127  
   128  		// Run ResolveDatabasesFromPatroni
   129  		dbs, err := md.ResolveDatabases()
   130  		assert.NoError(t, err)
   131  		assert.NotNil(t, dbs)
   132  		assert.Len(t, dbs, 4) // postgres, mydatrabase, pgwatch, pgwatch_metrics}
   133  	})
   134  
   135  	t.Run("namespace patroni discovery", func(t *testing.T) {
   136  		md.Kind = sources.SourcePatroni
   137  		md.ConnStr = "etcd://" + strings.TrimPrefix(endpoint, "http://")
   138  
   139  		// Run ResolveDatabasesFromPatroni
   140  		dbs, err := md.ResolveDatabases()
   141  		assert.NoError(t, err)
   142  		assert.NotNil(t, dbs)
   143  		assert.Len(t, dbs, 4) // postgres, mydatrabase, pgwatch, pgwatch_metrics}
   144  	})
   145  }
   146  
   147  func TestMonitoredDatabase_UnsupportedDCS(t *testing.T) {
   148  	md := sources.Source{}
   149  	md.Name = "continuous"
   150  	md.Kind = sources.SourcePatroni
   151  
   152  	md.ConnStr = "consul://foo"
   153  	_, err := md.ResolveDatabases()
   154  	assert.ErrorIs(t, err, errors.ErrUnsupported)
   155  
   156  	md.ConnStr = "zookeeper://foo"
   157  	_, err = md.ResolveDatabases()
   158  	assert.ErrorIs(t, err, errors.ErrUnsupported)
   159  
   160  	md.ConnStr = "unknown://foo"
   161  	_, err = md.ResolveDatabases()
   162  	assert.EqualError(t, err, "unsupported DCS type: unknown")
   163  
   164  }
   165