1 package sources_test
2
3 import (
4 "context"
5 "errors"
6 "strings"
7 "testing"
8 "time"
9
10 "github.com/stretchr/testify/assert"
11 "github.com/stretchr/testify/require"
12 client "go.etcd.io/etcd/client/v3"
13
14 "github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
15 testcontainers "github.com/testcontainers/testcontainers-go"
16 "github.com/testcontainers/testcontainers-go/modules/etcd"
17 "github.com/testcontainers/testcontainers-go/modules/postgres"
18 "github.com/testcontainers/testcontainers-go/wait"
19 )
20
21 func TestMonitoredDatabase_ResolveDatabasesFromPostgres(t *testing.T) {
22 pgContainer, err := postgres.Run(ctx,
23 ImageName,
24 postgres.WithDatabase("mydatabase"),
25 testcontainers.WithWaitStrategy(
26 wait.ForLog("database system is ready to accept connections").
27 WithOccurrence(2).
28 WithStartupTimeout(5*time.Second)),
29 )
30 require.NoError(t, err)
31 defer func() { assert.NoError(t, pgContainer.Terminate(ctx)) }()
32
33
34 md := sources.Source{}
35 md.Name = "continuous"
36 md.Kind = sources.SourcePostgresContinuous
37 md.ConnStr, err = pgContainer.ConnectionString(ctx, "sslmode=disable")
38 assert.NoError(t, err)
39
40
41 dbs, err := md.ResolveDatabases()
42 assert.NoError(t, err)
43 assert.True(t, len(dbs) == 2)
44
45
46 db := dbs.GetMonitoredDatabase(md.Name + "_mydatabase")
47 assert.NotNil(t, db)
48 assert.Equal(t, "mydatabase", db.GetDatabaseName())
49
50
51 db = dbs.GetMonitoredDatabase(md.Name + "_unexpected")
52 assert.Nil(t, db)
53 }
54
55 func TestMonitoredDatabase_ResolveDatabasesFromPatroni(t *testing.T) {
56
57 etcdContainer, err := etcd.Run(ctx, "gcr.io/etcd-development/etcd:v3.5.14",
58 testcontainers.WithWaitStrategy(wait.ForLog("ready to serve client requests").
59 WithStartupTimeout(15*time.Second)))
60 require.NoError(t, err)
61 defer func() { assert.NoError(t, etcdContainer.Terminate(ctx)) }()
62
63 endpoint, err := etcdContainer.ClientEndpoint(ctx)
64 require.NoError(t, err)
65
66 cli, err := client.New(client.Config{
67 Endpoints: []string{endpoint},
68 DialTimeout: 10 * time.Second,
69 })
70 require.NoError(t, err, "failed to create etcd client")
71 defer cli.Close()
72
73
74 pgContainer, err := postgres.Run(ctx,
75 ImageName,
76 postgres.WithDatabase("mydatabase"),
77 postgres.WithInitScripts("../../docker/bootstrap/create_role_db.sql"),
78 testcontainers.WithWaitStrategy(
79 wait.ForLog("database system is ready to accept connections").
80 WithOccurrence(2).
81 WithStartupTimeout(5*time.Second)),
82 )
83 require.NoError(t, err)
84 defer func() { assert.NoError(t, pgContainer.Terminate(ctx)) }()
85 pgConnStr, err := pgContainer.ConnectionString(ctx, "sslmode=disable")
86 require.NoError(t, err)
87
88 kv := map[string]string{
89 `/service/demo/config`: `{"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":1048576,"postgresql":{"use_pg_rewind":true,"pg_hba":["local all all trust","host replication replicator all md5","host all all all md5"],"parameters":{"max_connections":100}}}`,
90 `/service/demo/initialize`: `7553211779477532695`,
91 `/service/demo/leader`: `patroni3`,
92 `/service/demo/members/patroni1`: `{"conn_url":"postgres://172.18.0.8:5432/postgres","api_url":"http://172.18.0.8:8008/patroni","state":"running","role":"replica","version":"4.0.7","xlog_location":67108960,"replay_lsn":67108960,"receive_lsn":67108960,"replication_state":"streaming","timeline":1}`,
93 `/service/demo/members/patroni2`: `{"conn_url":"postgres://172.18.0.4:5432/postgres","api_url":"http://172.18.0.4:8008/patroni","state":"running","role":"replica","version":"4.0.7","xlog_location":67108960,"replay_lsn":67108960,"receive_lsn":67108960,"replication_state":"streaming","timeline":1}`,
94 `/service/demo/members/patroni3`: `{"conn_url":"` + pgConnStr + `","api_url":"http://172.18.0.3:8008/patroni","state":"running","role":"primary","version":"4.0.7","xlog_location":67108960,"timeline":1}`,
95 `/service/demo/status`: `{"optime":67108960,"slots":{"patroni1":67108960,"patroni2":67108960,"patroni3":67108960},"retain_slots":["patroni1","patroni2","patroni3"]}}`}
96
97 cancelCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
98 for k, v := range kv {
99 _, err = cli.Put(cancelCtx, k, v)
100 require.NoError(t, err, "failed to put key %s to etcd", k)
101 }
102 cancel()
103
104 md := sources.Source{}
105 md.Name = "continuous"
106 md.OnlyIfMaster = true
107
108 t.Run("simple patroni discovery", func(t *testing.T) {
109 md.Kind = sources.SourcePatroni
110 md.ConnStr = "etcd://" + strings.TrimPrefix(endpoint, "http://")
111 md.ConnStr += "/service"
112 md.ConnStr += "/demo"
113
114
115 dbs, err := md.ResolveDatabases()
116 assert.NoError(t, err)
117 assert.NotNil(t, dbs)
118 assert.Len(t, dbs, 4)
119 })
120
121 t.Run("several endpoints patroni discovery", func(t *testing.T) {
122 md.Kind = sources.SourcePatroni
123 e := strings.TrimPrefix(endpoint, "http://")
124 md.ConnStr = "etcd://" + strings.Join([]string{e, e, e}, ",")
125 md.ConnStr += "/service"
126 md.ConnStr += "/demo"
127
128
129 dbs, err := md.ResolveDatabases()
130 assert.NoError(t, err)
131 assert.NotNil(t, dbs)
132 assert.Len(t, dbs, 4)
133 })
134
135 t.Run("namespace patroni discovery", func(t *testing.T) {
136 md.Kind = sources.SourcePatroni
137 md.ConnStr = "etcd://" + strings.TrimPrefix(endpoint, "http://")
138
139
140 dbs, err := md.ResolveDatabases()
141 assert.NoError(t, err)
142 assert.NotNil(t, dbs)
143 assert.Len(t, dbs, 4)
144 })
145 }
146
147 func TestMonitoredDatabase_UnsupportedDCS(t *testing.T) {
148 md := sources.Source{}
149 md.Name = "continuous"
150 md.Kind = sources.SourcePatroni
151
152 md.ConnStr = "consul://foo"
153 _, err := md.ResolveDatabases()
154 assert.ErrorIs(t, err, errors.ErrUnsupported)
155
156 md.ConnStr = "zookeeper://foo"
157 _, err = md.ResolveDatabases()
158 assert.ErrorIs(t, err, errors.ErrUnsupported)
159
160 md.ConnStr = "unknown://foo"
161 _, err = md.ResolveDatabases()
162 assert.EqualError(t, err, "unsupported DCS type: unknown")
163
164 }
165