1 package sources
2
3
4
5
6 import (
7 "context"
8 "errors"
9
10 "github.com/cybertec-postgresql/pgwatch/v3/internal/db"
11 pgx "github.com/jackc/pgx/v5"
12 "github.com/jackc/pgx/v5/pgconn"
13 )
14
15 func NewPostgresSourcesReaderWriter(ctx context.Context, connstr string) (ReaderWriter, error) {
16 conn, err := db.New(ctx, connstr)
17 if err != nil {
18 return nil, err
19 }
20 return NewPostgresSourcesReaderWriterConn(ctx, conn)
21 }
22
23 func NewPostgresSourcesReaderWriterConn(ctx context.Context, conn db.PgxPoolIface) (ReaderWriter, error) {
24 return &dbSourcesReaderWriter{
25 ctx: ctx,
26 configDb: conn,
27 }, conn.Ping(ctx)
28
29 }
30
31 type dbSourcesReaderWriter struct {
32 ctx context.Context
33 configDb db.PgxIface
34 }
35
36 func (r *dbSourcesReaderWriter) WriteSources(dbs Sources) error {
37 tx, err := r.configDb.Begin(context.Background())
38 if err != nil {
39 return err
40 }
41 if _, err = tx.Exec(context.Background(), `truncate pgwatch.source`); err != nil {
42 return err
43 }
44 defer func() { _ = tx.Rollback(context.Background()) }()
45 for _, md := range dbs {
46 if err = r.updateSource(tx, md); err != nil {
47 return err
48 }
49 }
50 return tx.Commit(context.Background())
51 }
52
53 func (r *dbSourcesReaderWriter) updateSource(conn db.PgxIface, md Source) (err error) {
54 m := db.MarshallParamToJSONB
55 sql := `insert into pgwatch.source(
56 name,
57 "group",
58 dbtype,
59 connstr,
60 config,
61 config_standby,
62 preset_config,
63 preset_config_standby,
64 include_pattern,
65 exclude_pattern,
66 custom_tags,
67 only_if_master,
68 is_enabled)
69 values
70 ($1, $2, $3, $4, $5, $6, NULLIF($7, ''), NULLIF($8, ''), $9, $10, $11, $12, $13)
71 on conflict (name) do update set
72 "group" = $2,
73 dbtype = $3,
74 connstr = $4,
75 config = $5,
76 config_standby = $6,
77 preset_config = NULLIF($7, ''),
78 preset_config_standby = NULLIF($8, ''),
79 include_pattern = $9,
80 exclude_pattern = $10,
81 custom_tags = $11,
82 only_if_master = $12,
83 is_enabled = $13`
84 _, err = conn.Exec(context.Background(), sql,
85 md.Name, md.Group, md.Kind,
86 md.ConnStr, m(md.Metrics), m(md.MetricsStandby), md.PresetMetrics, md.PresetMetricsStandby,
87 md.IncludePattern, md.ExcludePattern, m(md.CustomTags),
88 md.OnlyIfMaster, md.IsEnabled)
89 return err
90 }
91
92 func (r *dbSourcesReaderWriter) createSource(conn db.PgxIface, md Source) (err error) {
93 m := db.MarshallParamToJSONB
94 sql := `insert into pgwatch.source(
95 name,
96 "group",
97 dbtype,
98 connstr,
99 config,
100 config_standby,
101 preset_config,
102 preset_config_standby,
103 include_pattern,
104 exclude_pattern,
105 custom_tags,
106 only_if_master,
107 is_enabled)
108 values
109 ($1, $2, $3, $4, $5, $6, NULLIF($7, ''), NULLIF($8, ''), $9, $10, $11, $12, $13)`
110 _, err = conn.Exec(context.Background(), sql,
111 md.Name, md.Group, md.Kind,
112 md.ConnStr, m(md.Metrics), m(md.MetricsStandby), md.PresetMetrics, md.PresetMetricsStandby,
113 md.IncludePattern, md.ExcludePattern, m(md.CustomTags),
114 md.OnlyIfMaster, md.IsEnabled)
115 if err != nil {
116
117 var pgErr *pgconn.PgError
118 if errors.As(err, &pgErr) && pgErr.SQLState() == "23505" {
119 return ErrSourceExists
120 }
121 }
122 return err
123 }
124
125 func (r *dbSourcesReaderWriter) UpdateSource(md Source) error {
126 return r.updateSource(r.configDb, md)
127 }
128
129 func (r *dbSourcesReaderWriter) CreateSource(md Source) error {
130 return r.createSource(r.configDb, md)
131 }
132
133 func (r *dbSourcesReaderWriter) DeleteSource(name string) error {
134 _, err := r.configDb.Exec(context.Background(), `delete from pgwatch.source where name = $1`, name)
135 return err
136 }
137
138 func (r *dbSourcesReaderWriter) GetSources() (Sources, error) {
139 sqlLatest := `select /* pgwatch_generated */
140 name,
141 "group",
142 dbtype,
143 connstr,
144 coalesce(config, '{}'::jsonb) as config,
145 coalesce(config_standby, '{}'::jsonb) as config_standby,
146 coalesce(preset_config, '') as preset_config,
147 coalesce(preset_config_standby, '') as preset_config_standby,
148 coalesce(include_pattern, '') as include_pattern,
149 coalesce(exclude_pattern, '') as exclude_pattern,
150 coalesce(custom_tags, '{}'::jsonb) as custom_tags,
151 only_if_master,
152 is_enabled
153 from
154 pgwatch.source`
155 rows, err := r.configDb.Query(context.Background(), sqlLatest)
156 if err != nil {
157 return nil, err
158 }
159 return pgx.CollectRows[Source](rows, pgx.RowToStructByNameLax)
160 }
161