1 package sources
2
3 import (
4 "context"
5 "fmt"
6 "math"
7 "regexp"
8 "strconv"
9 "sync"
10 "time"
11
12 "github.com/cybertec-postgresql/pgwatch/v3/internal/db"
13 "github.com/jackc/pgx/v5"
14 "github.com/jackc/pgx/v5/pgxpool"
15 )
16
17
18 var (
19 NewConn = db.New
20 NewConnWithConfig = db.NewWithConfig
21 )
22
23 const (
24 EnvUnknown = "UNKNOWN"
25 EnvAzureSingle = "AZURE_SINGLE"
26 EnvAzureFlexible = "AZURE_FLEXIBLE"
27 EnvGoogle = "GOOGLE"
28 )
29
30 type RuntimeInfo struct {
31 LastCheckedOn time.Time
32 IsInRecovery bool
33 VersionStr string
34 Version int
35 RealDbname string
36 SystemIdentifier string
37 IsSuperuser bool
38 Extensions map[string]int
39 ExecEnv string
40 ApproxDbSize int64
41 ChangeState map[string]map[string]string
42 }
43
44
45
46
47 type (
48 SourceConn struct {
49 Source
50 Conn db.PgxPoolIface
51 ConnConfig *pgxpool.Config
52 RuntimeInfo
53 sync.RWMutex
54 }
55
56 SourceConns []*SourceConn
57 )
58
59 func NewSourceConn(s Source) *SourceConn {
60 return &SourceConn{
61 Source: s,
62 RuntimeInfo: RuntimeInfo{
63 Extensions: make(map[string]int),
64 ChangeState: make(map[string]map[string]string),
65 },
66 }
67 }
68
69
70 func (md *SourceConn) Ping(ctx context.Context) (err error) {
71 if md.Kind == SourcePgBouncer {
72
73 _, err = md.Conn.Exec(ctx, "SHOW VERSION")
74 return
75 }
76 return md.Conn.Ping(ctx)
77 }
78
79
80
81 func (md *SourceConn) Connect(ctx context.Context, opts CmdOpts) (err error) {
82 if md.Conn == nil {
83 if err = md.ParseConfig(); err != nil {
84 return err
85 }
86 if md.Kind == SourcePgBouncer {
87 md.ConnConfig.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeSimpleProtocol
88 }
89 if opts.MaxParallelConnectionsPerDb > 0 {
90 md.ConnConfig.MaxConns = int32(opts.MaxParallelConnectionsPerDb)
91 }
92 md.Conn, err = NewConnWithConfig(ctx, md.ConnConfig)
93 if err != nil {
94 return err
95 }
96 }
97 return md.Ping(ctx)
98 }
99
100
101 func (md *SourceConn) ParseConfig() (err error) {
102 if md.ConnConfig == nil {
103 md.ConnConfig, err = pgxpool.ParseConfig(md.ConnStr)
104 return
105 }
106 return
107 }
108
109
110
111 func (md *SourceConn) GetClusterIdentifier() string {
112 if err := md.ParseConfig(); err != nil {
113 return ""
114 }
115 return fmt.Sprintf("%s:%s:%d", md.SystemIdentifier, md.ConnConfig.ConnConfig.Host, md.ConnConfig.ConnConfig.Port)
116 }
117
118
119 func (md *SourceConn) GetDatabaseName() string {
120 if err := md.ParseConfig(); err != nil {
121 return ""
122 }
123 return md.ConnConfig.ConnConfig.Database
124 }
125
126
127 func (md *SourceConn) GetMetricInterval(name string) float64 {
128 md.RLock()
129 defer md.RUnlock()
130 if md.IsInRecovery && len(md.MetricsStandby) > 0 {
131 return md.MetricsStandby[name]
132 }
133 return md.Metrics[name]
134 }
135
136
137 func (md *SourceConn) SetDatabaseName(name string) {
138 if err := md.ParseConfig(); err != nil {
139 return
140 }
141 md.ConnStr = ""
142 md.ConnConfig.ConnConfig.Database = name
143 }
144
145 func (md *SourceConn) IsPostgresSource() bool {
146 return md.Kind != SourcePgBouncer && md.Kind != SourcePgPool
147 }
148
149
150
151
152
153 var regVer = regexp.MustCompile(`(\d+).?(\d*).?(\d*)`)
154
155 func VersionToInt(version string) (v int) {
156 if matches := regVer.FindStringSubmatch(version); len(matches) > 1 {
157 for i, match := range matches[1:] {
158 v += func() (m int) { m, _ = strconv.Atoi(match); return }() * int(math.Pow10(4-i*2))
159 }
160 }
161 return
162 }
163
164 func (md *SourceConn) FetchRuntimeInfo(ctx context.Context, forceRefetch bool) (err error) {
165 md.Lock()
166 defer md.Unlock()
167 if ctx.Err() != nil {
168 return ctx.Err()
169 }
170
171 if !forceRefetch && md.LastCheckedOn.After(time.Now().Add(time.Minute*-2)) {
172 return nil
173 }
174 switch md.Kind {
175 case SourcePgBouncer, SourcePgPool:
176 if md.VersionStr, md.Version, err = md.FetchVersion(ctx, func() string {
177 if md.Kind == SourcePgBouncer {
178 return "SHOW VERSION"
179 }
180 return "SHOW POOL_VERSION"
181 }()); err != nil {
182 return
183 }
184 default:
185 sql := `select /* pgwatch_generated */
186 div(current_setting('server_version_num')::int, 10000) as ver,
187 version(),
188 pg_is_in_recovery(),
189 current_database()::TEXT,
190 system_identifier,
191 current_setting('is_superuser')::bool
192 FROM
193 pg_control_system()`
194
195 err = md.Conn.QueryRow(ctx, sql).
196 Scan(&md.Version, &md.VersionStr,
197 &md.IsInRecovery, &md.RealDbname,
198 &md.SystemIdentifier, &md.IsSuperuser)
199 if err != nil {
200 return err
201 }
202
203 md.ExecEnv = md.DiscoverPlatform(ctx)
204 md.ApproxDbSize = md.FetchApproxSize(ctx)
205
206 sqlExtensions := `select /* pgwatch_generated */ extname::text, (regexp_matches(extversion, $$\d+\.?\d+?$$))[1]::text as extversion from pg_extension order by 1;`
207 var res pgx.Rows
208 res, err = md.Conn.Query(ctx, sqlExtensions)
209 if err == nil {
210 var ext string
211 var ver string
212 _, err = pgx.ForEachRow(res, []any{&ext, &ver}, func() error {
213 extver := VersionToInt(ver)
214 if extver == 0 {
215 return fmt.Errorf("unexpected extension %s version input: %s", ext, ver)
216 }
217 md.Extensions[ext] = extver
218 return nil
219 })
220 }
221
222 }
223 md.LastCheckedOn = time.Now()
224 return err
225 }
226
227 func (md *SourceConn) FetchVersion(ctx context.Context, sql string) (version string, ver int, err error) {
228 if err = md.Conn.QueryRow(ctx, sql, pgx.QueryExecModeSimpleProtocol).Scan(&version); err != nil {
229 return
230 }
231 ver = VersionToInt(version)
232 return
233 }
234
235
236
237 func (md *SourceConn) DiscoverPlatform(ctx context.Context) (platform string) {
238 if md.ExecEnv != "" {
239 return md.ExecEnv
240 }
241 sql := `select /* pgwatch_generated */
242 case
243 when exists (select * from pg_settings where name = 'pg_qs.host_database' and setting = 'azure_sys') and version() ~* 'compiled by Visual C' then 'AZURE_SINGLE'
244 when exists (select * from pg_settings where name = 'pg_qs.host_database' and setting = 'azure_sys') and version() ~* 'compiled by gcc' then 'AZURE_FLEXIBLE'
245 when exists (select * from pg_settings where name = 'cloudsql.supported_extensions') then 'GOOGLE'
246 else
247 'UNKNOWN'
248 end as exec_env`
249 _ = md.Conn.QueryRow(ctx, sql).Scan(&platform)
250 return
251 }
252
253
254 func (md *SourceConn) FetchApproxSize(ctx context.Context) (size int64) {
255 sqlApproxDBSize := `select /* pgwatch_generated */ current_setting('block_size')::int8 * sum(relpages) from pg_class c where c.relpersistence != 't'`
256 _ = md.Conn.QueryRow(ctx, sqlApproxDBSize).Scan(&size)
257 return
258 }
259
260
261 func (md *SourceConn) FunctionExists(ctx context.Context, functionName string) (exists bool) {
262 sql := `select /* pgwatch_generated */ true
263 from
264 pg_proc join pg_namespace n on pronamespace = n.oid
265 where
266 proname = $1 and n.nspname = 'public'`
267 _ = md.Conn.QueryRow(ctx, sql, functionName).Scan(&exists)
268 return
269 }
270
271 func (mds SourceConns) GetMonitoredDatabase(DBUniqueName string) *SourceConn {
272 for _, md := range mds {
273 if md.Name == DBUniqueName {
274 return md
275 }
276 }
277 return nil
278 }
279