...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/sources/resolver_test.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/sources

     1  package sources_test
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"testing"
     8  	"time"
     9  
    10  	"github.com/stretchr/testify/assert"
    11  	"github.com/stretchr/testify/require"
    12  	client "go.etcd.io/etcd/client/v3"
    13  	"go.etcd.io/etcd/server/v3/embed"
    14  
    15  	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
    16  	testcontainers "github.com/testcontainers/testcontainers-go"
    17  	"github.com/testcontainers/testcontainers-go/modules/postgres"
    18  	"github.com/testcontainers/testcontainers-go/wait"
    19  )
    20  
    21  func TestMonitoredDatabase_ResolveDatabasesFromPostgres(t *testing.T) {
    22  	pgContainer, err := postgres.Run(ctx,
    23  		ImageName,
    24  		postgres.WithDatabase("mydatabase"),
    25  		testcontainers.WithWaitStrategy(
    26  			wait.ForLog("database system is ready to accept connections").
    27  				WithOccurrence(2).
    28  				WithStartupTimeout(5*time.Second)),
    29  	)
    30  	require.NoError(t, err)
    31  	defer func() { assert.NoError(t, pgContainer.Terminate(ctx)) }()
    32  
    33  	// Create a new MonitoredDatabase instance
    34  	md := sources.Source{}
    35  	md.Name = "continuous"
    36  	md.Kind = sources.SourcePostgresContinuous
    37  	md.ConnStr, err = pgContainer.ConnectionString(ctx)
    38  	assert.NoError(t, err)
    39  
    40  	// Call the ResolveDatabasesFromPostgres method
    41  	dbs, err := md.ResolveDatabases()
    42  	assert.NoError(t, err)
    43  	assert.True(t, len(dbs) == 2) //postgres and mydatabase
    44  
    45  	// check the "continuous_mydatabase"
    46  	db := dbs.GetMonitoredDatabase(md.Name + "_mydatabase")
    47  	assert.NotNil(t, db)
    48  	assert.Equal(t, "mydatabase", db.GetDatabaseName())
    49  
    50  	//check unexpected database
    51  	db = dbs.GetMonitoredDatabase(md.Name + "_unexpected")
    52  	assert.Nil(t, db)
    53  }
    54  
    55  func TestMonitoredDatabase_ResolveDatabasesFromPatroni(t *testing.T) {
    56  	// Start embedded etcd server
    57  	cfg := embed.NewConfig()
    58  	cfg.Dir = t.TempDir()
    59  	cfg.LogLevel = "error"
    60  	e, err := embed.StartEtcd(cfg)
    61  	require.NoError(t, err)
    62  	defer e.Close()
    63  
    64  	select {
    65  	case <-e.Server.ReadyNotify():
    66  		// ready
    67  	case <-time.After(5 * time.Second):
    68  		t.Fatal("etcd server took too long to start")
    69  	}
    70  
    71  	endpoint := e.Clients[0].Addr().String()
    72  
    73  	cli, err := client.New(client.Config{
    74  		Endpoints:   []string{"http://" + endpoint},
    75  		DialTimeout: 2 * time.Second,
    76  	})
    77  	require.NoError(t, err, "failed to create etcd client")
    78  	defer cli.Close()
    79  
    80  	// Start postgres server for testing
    81  	pgContainer, err := postgres.Run(ctx,
    82  		ImageName,
    83  		postgres.WithDatabase("mydatabase"),
    84  		postgres.WithInitScripts("../../docker/bootstrap/create_role_db.sql"),
    85  		testcontainers.WithWaitStrategy(
    86  			wait.ForLog("database system is ready to accept connections").
    87  				WithOccurrence(2).
    88  				WithStartupTimeout(5*time.Second)),
    89  	)
    90  	require.NoError(t, err)
    91  	defer func() { assert.NoError(t, pgContainer.Terminate(ctx)) }()
    92  
    93  	// Put values to etcd server
    94  	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    95  	connStr, err := pgContainer.ConnectionString(ctx)
    96  	require.NoError(t, err)
    97  	_, err = cli.Put(ctx, "/service/batman/members/pg1",
    98  		fmt.Sprintf(`{"role":"master","conn_url":"%s"}`, connStr))
    99  	require.NoError(t, err)
   100  	_, err = cli.Put(ctx, "/service/batman/members/pg2",
   101  		`{"role":"standby","conn_url":"must_be_skipped"}`)
   102  	cancel()
   103  	require.NoError(t, err)
   104  
   105  	md := sources.Source{}
   106  	md.Name = "continuous"
   107  	md.OnlyIfMaster = true
   108  	md.HostConfig.DcsType = "etcd"
   109  	md.HostConfig.DcsEndpoints = []string{"http://" + endpoint}
   110  
   111  	t.Run("simple patroni discovery", func(t *testing.T) {
   112  		md.Kind = sources.SourcePatroni
   113  		md.HostConfig.Scope = "/batman/"
   114  		md.HostConfig.Namespace = "/service"
   115  
   116  		// Run ResolveDatabasesFromPatroni
   117  		dbs, err := md.ResolveDatabases()
   118  		assert.NoError(t, err)
   119  		assert.NotNil(t, dbs)
   120  		assert.Len(t, dbs, 4) // postgres, mydatrabase, pgwatch, pgwatch_metrics}
   121  	})
   122  
   123  	t.Run("namespace patroni discovery", func(t *testing.T) {
   124  		md.Kind = sources.SourcePatroniNamespace
   125  		md.HostConfig.Scope = ""
   126  		md.HostConfig.Namespace = ""
   127  
   128  		// Run ResolveDatabasesFromPatroni
   129  		dbs, err := md.ResolveDatabases()
   130  		assert.NoError(t, err)
   131  		assert.NotNil(t, dbs)
   132  		assert.Len(t, dbs, 4) // postgres, mydatrabase, pgwatch, pgwatch_metrics}
   133  	})
   134  }
   135  
   136  func TestMonitoredDatabase_UnsupportedDCS(t *testing.T) {
   137  	md := sources.Source{}
   138  	md.Name = "continuous"
   139  	md.Kind = sources.SourcePatroni
   140  
   141  	md.HostConfig.DcsType = "consul"
   142  	_, err := md.ResolveDatabases()
   143  	assert.ErrorIs(t, err, errors.ErrUnsupported)
   144  
   145  	md.HostConfig.DcsType = "zookeeper"
   146  	_, err = md.ResolveDatabases()
   147  	assert.ErrorIs(t, err, errors.ErrUnsupported)
   148  
   149  	md.HostConfig.DcsType = "unknown"
   150  	_, err = md.ResolveDatabases()
   151  	assert.EqualError(t, err, "unknown DCS")
   152  
   153  }
   154