1 package sources_test
2
3 import (
4 "context"
5 "errors"
6 "strings"
7 "testing"
8 "time"
9
10 "github.com/stretchr/testify/assert"
11 "github.com/stretchr/testify/require"
12 client "go.etcd.io/etcd/client/v3"
13
14 "github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
15 "github.com/cybertec-postgresql/pgwatch/v3/internal/testutil"
16 )
17
18 func TestMonitoredDatabase_ResolveDatabasesFromPostgres(t *testing.T) {
19 pgContainer, pgTeardown, err := testutil.SetupPostgresContainer()
20 require.NoError(t, err)
21 defer pgTeardown()
22
23
24 md := sources.Source{}
25 md.Name = "continuous"
26 md.Kind = sources.SourcePostgresContinuous
27 md.ConnStr, err = pgContainer.ConnectionString(ctx, "sslmode=disable")
28 assert.NoError(t, err)
29
30
31 dbs, err := md.ResolveDatabases()
32 assert.NoError(t, err)
33 assert.True(t, len(dbs) == 2)
34
35
36 db := dbs.GetMonitoredDatabase(md.Name + "_mydatabase")
37 assert.NotNil(t, db)
38 assert.Equal(t, "mydatabase", db.GetDatabaseName())
39
40
41 db = dbs.GetMonitoredDatabase(md.Name + "_unexpected")
42 assert.Nil(t, db)
43 }
44
45 func TestMonitoredDatabase_ResolveDatabasesFromPatroni(t *testing.T) {
46 etcdContainer, etcdTeardown, err := testutil.SetupEtcdContainer()
47 require.NoError(t, err)
48 defer etcdTeardown()
49
50 endpoint, err := etcdContainer.ClientEndpoint(ctx)
51 require.NoError(t, err)
52
53 cli, err := client.New(client.Config{
54 Endpoints: []string{endpoint},
55 DialTimeout: 10 * time.Second,
56 })
57 require.NoError(t, err, "failed to create etcd client")
58 defer cli.Close()
59
60
61 pgContainer, pgTeardown, err := testutil.SetupPostgresContainerWithInitScripts("../../docker/bootstrap/create_role_db.sql")
62 require.NoError(t, err)
63 defer pgTeardown()
64
65 pgConnStr, err := pgContainer.ConnectionString(ctx, "sslmode=disable")
66 require.NoError(t, err)
67
68 kv := map[string]string{
69 `/service/demo/config`: `{"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":1048576,"postgresql":{"use_pg_rewind":true,"pg_hba":["local all all trust","host replication replicator all md5","host all all all md5"],"parameters":{"max_connections":100}}}`,
70 `/service/demo/initialize`: `7553211779477532695`,
71 `/service/demo/leader`: `patroni3`,
72 `/service/demo/members/patroni1`: `{"conn_url":"postgres://172.18.0.8:5432/postgres","api_url":"http://172.18.0.8:8008/patroni","state":"running","role":"replica","version":"4.0.7","xlog_location":67108960,"replay_lsn":67108960,"receive_lsn":67108960,"replication_state":"streaming","timeline":1}`,
73 `/service/demo/members/patroni2`: `{"conn_url":"postgres://172.18.0.4:5432/postgres","api_url":"http://172.18.0.4:8008/patroni","state":"running","role":"replica","version":"4.0.7","xlog_location":67108960,"replay_lsn":67108960,"receive_lsn":67108960,"replication_state":"streaming","timeline":1}`,
74 `/service/demo/members/patroni3`: `{"conn_url":"` + pgConnStr + `","api_url":"http://172.18.0.3:8008/patroni","state":"running","role":"primary","version":"4.0.7","xlog_location":67108960,"timeline":1}`,
75 `/service/demo/status`: `{"optime":67108960,"slots":{"patroni1":67108960,"patroni2":67108960,"patroni3":67108960},"retain_slots":["patroni1","patroni2","patroni3"]}}`}
76
77 cancelCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
78 for k, v := range kv {
79 _, err = cli.Put(cancelCtx, k, v)
80 require.NoError(t, err, "failed to put key %s to etcd", k)
81 }
82 cancel()
83
84 md := sources.Source{}
85 md.Name = "continuous"
86 md.OnlyIfMaster = true
87
88 t.Run("simple patroni discovery", func(t *testing.T) {
89 md.Kind = sources.SourcePatroni
90 md.ConnStr = "etcd://" + strings.TrimPrefix(endpoint, "http://")
91 md.ConnStr += "/service"
92 md.ConnStr += "/demo"
93
94
95 dbs, err := md.ResolveDatabases()
96 assert.NoError(t, err)
97 assert.NotNil(t, dbs)
98 assert.Len(t, dbs, 4)
99 })
100
101 t.Run("several endpoints patroni discovery", func(t *testing.T) {
102 md.Kind = sources.SourcePatroni
103 e := strings.TrimPrefix(endpoint, "http://")
104 md.ConnStr = "etcd://" + strings.Join([]string{e, e, e}, ",")
105 md.ConnStr += "/service"
106 md.ConnStr += "/demo"
107
108
109 dbs, err := md.ResolveDatabases()
110 assert.NoError(t, err)
111 assert.NotNil(t, dbs)
112 assert.Len(t, dbs, 4)
113 })
114
115 t.Run("namespace patroni discovery", func(t *testing.T) {
116 md.Kind = sources.SourcePatroni
117 md.ConnStr = "etcd://" + strings.TrimPrefix(endpoint, "http://")
118
119
120 dbs, err := md.ResolveDatabases()
121 assert.NoError(t, err)
122 assert.NotNil(t, dbs)
123 assert.Len(t, dbs, 4)
124 })
125 }
126
127 func TestMonitoredDatabase_UnsupportedDCS(t *testing.T) {
128 md := sources.Source{}
129 md.Name = "continuous"
130 md.Kind = sources.SourcePatroni
131
132 md.ConnStr = "consul://foo"
133 _, err := md.ResolveDatabases()
134 assert.ErrorIs(t, err, errors.ErrUnsupported)
135
136 md.ConnStr = "zookeeper://foo"
137 _, err = md.ResolveDatabases()
138 assert.ErrorIs(t, err, errors.ErrUnsupported)
139
140 md.ConnStr = "unknown://foo"
141 _, err = md.ResolveDatabases()
142 assert.EqualError(t, err, "unsupported DCS type: unknown")
143
144 }
145