1 package sources_test
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "testing"
8 "time"
9
10 "github.com/stretchr/testify/assert"
11 "github.com/stretchr/testify/require"
12 client "go.etcd.io/etcd/client/v3"
13 "go.etcd.io/etcd/server/v3/embed"
14
15 "github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
16 testcontainers "github.com/testcontainers/testcontainers-go"
17 "github.com/testcontainers/testcontainers-go/modules/postgres"
18 "github.com/testcontainers/testcontainers-go/wait"
19 )
20
21 func TestMonitoredDatabase_ResolveDatabasesFromPostgres(t *testing.T) {
22 pgContainer, err := postgres.Run(ctx,
23 ImageName,
24 postgres.WithDatabase("mydatabase"),
25 testcontainers.WithWaitStrategy(
26 wait.ForLog("database system is ready to accept connections").
27 WithOccurrence(2).
28 WithStartupTimeout(5*time.Second)),
29 )
30 require.NoError(t, err)
31 defer func() { assert.NoError(t, pgContainer.Terminate(ctx)) }()
32
33
34 md := sources.Source{}
35 md.Name = "continuous"
36 md.Kind = sources.SourcePostgresContinuous
37 md.ConnStr, err = pgContainer.ConnectionString(ctx)
38 assert.NoError(t, err)
39
40
41 dbs, err := md.ResolveDatabases()
42 assert.NoError(t, err)
43 assert.True(t, len(dbs) == 2)
44
45
46 db := dbs.GetMonitoredDatabase(md.Name + "_mydatabase")
47 assert.NotNil(t, db)
48 assert.Equal(t, "mydatabase", db.GetDatabaseName())
49
50
51 db = dbs.GetMonitoredDatabase(md.Name + "_unexpected")
52 assert.Nil(t, db)
53 }
54
55 func TestMonitoredDatabase_ResolveDatabasesFromPatroni(t *testing.T) {
56
57 cfg := embed.NewConfig()
58 cfg.Dir = t.TempDir()
59 cfg.LogLevel = "error"
60 e, err := embed.StartEtcd(cfg)
61 require.NoError(t, err)
62 defer e.Close()
63
64 select {
65 case <-e.Server.ReadyNotify():
66
67 case <-time.After(5 * time.Second):
68 t.Fatal("etcd server took too long to start")
69 }
70
71 endpoint := e.Clients[0].Addr().String()
72
73 cli, err := client.New(client.Config{
74 Endpoints: []string{"http://" + endpoint},
75 DialTimeout: 2 * time.Second,
76 })
77 require.NoError(t, err, "failed to create etcd client")
78 defer cli.Close()
79
80
81 pgContainer, err := postgres.Run(ctx,
82 ImageName,
83 postgres.WithDatabase("mydatabase"),
84 postgres.WithInitScripts("../../docker/bootstrap/create_role_db.sql"),
85 testcontainers.WithWaitStrategy(
86 wait.ForLog("database system is ready to accept connections").
87 WithOccurrence(2).
88 WithStartupTimeout(5*time.Second)),
89 )
90 require.NoError(t, err)
91 defer func() { assert.NoError(t, pgContainer.Terminate(ctx)) }()
92
93
94 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
95 connStr, err := pgContainer.ConnectionString(ctx)
96 require.NoError(t, err)
97 _, err = cli.Put(ctx, "/service/batman/members/pg1",
98 fmt.Sprintf(`{"role":"master","conn_url":"%s"}`, connStr))
99 require.NoError(t, err)
100 _, err = cli.Put(ctx, "/service/batman/members/pg2",
101 `{"role":"standby","conn_url":"must_be_skipped"}`)
102 cancel()
103 require.NoError(t, err)
104
105 md := sources.Source{}
106 md.Name = "continuous"
107 md.OnlyIfMaster = true
108 md.HostConfig.DcsType = "etcd"
109 md.HostConfig.DcsEndpoints = []string{"http://" + endpoint}
110
111 t.Run("simple patroni discovery", func(t *testing.T) {
112 md.Kind = sources.SourcePatroni
113 md.HostConfig.Scope = "/batman/"
114 md.HostConfig.Namespace = "/service"
115
116
117 dbs, err := md.ResolveDatabases()
118 assert.NoError(t, err)
119 assert.NotNil(t, dbs)
120 assert.Len(t, dbs, 4)
121 })
122
123 t.Run("namespace patroni discovery", func(t *testing.T) {
124 md.Kind = sources.SourcePatroniNamespace
125 md.HostConfig.Scope = ""
126 md.HostConfig.Namespace = ""
127
128
129 dbs, err := md.ResolveDatabases()
130 assert.NoError(t, err)
131 assert.NotNil(t, dbs)
132 assert.Len(t, dbs, 4)
133 })
134 }
135
136 func TestMonitoredDatabase_UnsupportedDCS(t *testing.T) {
137 md := sources.Source{}
138 md.Name = "continuous"
139 md.Kind = sources.SourcePatroni
140
141 md.HostConfig.DcsType = "consul"
142 _, err := md.ResolveDatabases()
143 assert.ErrorIs(t, err, errors.ErrUnsupported)
144
145 md.HostConfig.DcsType = "zookeeper"
146 _, err = md.ResolveDatabases()
147 assert.ErrorIs(t, err, errors.ErrUnsupported)
148
149 md.HostConfig.DcsType = "unknown"
150 _, err = md.ResolveDatabases()
151 assert.EqualError(t, err, "unknown DCS")
152
153 }
154