1 package sources
2
3
4
5
6 import (
7 "context"
8
9 "github.com/cybertec-postgresql/pgwatch/v3/internal/db"
10 pgx "github.com/jackc/pgx/v5"
11 )
12
13 func NewPostgresSourcesReaderWriter(ctx context.Context, connstr string) (ReaderWriter, error) {
14 conn, err := db.New(ctx, connstr)
15 if err != nil {
16 return nil, err
17 }
18 return NewPostgresSourcesReaderWriterConn(ctx, conn)
19 }
20
21 func NewPostgresSourcesReaderWriterConn(ctx context.Context, conn db.PgxPoolIface) (ReaderWriter, error) {
22 return &dbSourcesReaderWriter{
23 ctx: ctx,
24 configDb: conn,
25 }, conn.Ping(ctx)
26
27 }
28
29 type dbSourcesReaderWriter struct {
30 ctx context.Context
31 configDb db.PgxIface
32 }
33
34 func (r *dbSourcesReaderWriter) WriteSources(dbs Sources) error {
35 tx, err := r.configDb.Begin(context.Background())
36 if err != nil {
37 return err
38 }
39 if _, err = tx.Exec(context.Background(), `truncate pgwatch.source`); err != nil {
40 return err
41 }
42 defer func() { _ = tx.Rollback(context.Background()) }()
43 for _, md := range dbs {
44 if err = r.updateDatabase(tx, md); err != nil {
45 return err
46 }
47 }
48 return tx.Commit(context.Background())
49 }
50
51 func (r *dbSourcesReaderWriter) updateDatabase(conn db.PgxIface, md Source) (err error) {
52 m := db.MarshallParamToJSONB
53 sql := `insert into pgwatch.source(
54 name,
55 "group",
56 dbtype,
57 connstr,
58 config,
59 config_standby,
60 preset_config,
61 preset_config_standby,
62 include_pattern,
63 exclude_pattern,
64 custom_tags,
65 host_config,
66 only_if_master,
67 is_enabled)
68 values
69 ($1, $2, $3, $4, $5, $6, NULLIF($7, ''), NULLIF($8, ''), $9, $10, $11, $12, $13, $14)
70 on conflict (name) do update set
71 "group" = $2,
72 dbtype = $3,
73 connstr = $4,
74 config = $5,
75 config_standby = $6,
76 preset_config = NULLIF($7, ''),
77 preset_config_standby = NULLIF($8, ''),
78 include_pattern = $9,
79 exclude_pattern = $10,
80 custom_tags = $11,
81 host_config = $12,
82 only_if_master = $13,
83 is_enabled = $14`
84 _, err = conn.Exec(context.Background(), sql,
85 md.Name, md.Group, md.Kind,
86 md.ConnStr, m(md.Metrics), m(md.MetricsStandby), md.PresetMetrics, md.PresetMetricsStandby,
87 md.IncludePattern, md.ExcludePattern, m(md.CustomTags),
88 m(md.HostConfig), md.OnlyIfMaster, md.IsEnabled)
89 return err
90 }
91
92 func (r *dbSourcesReaderWriter) UpdateSource(md Source) error {
93 return r.updateDatabase(r.configDb, md)
94 }
95
96 func (r *dbSourcesReaderWriter) DeleteSource(name string) error {
97 _, err := r.configDb.Exec(context.Background(), `delete from pgwatch.source where name = $1`, name)
98 return err
99 }
100
101 func (r *dbSourcesReaderWriter) GetSources() (Sources, error) {
102 sqlLatest := `select /* pgwatch_generated */
103 name,
104 "group",
105 dbtype,
106 connstr,
107 coalesce(config, '{}'::jsonb) as config,
108 coalesce(config_standby, '{}'::jsonb) as config_standby,
109 coalesce(preset_config, '') as preset_config,
110 coalesce(preset_config_standby, '') as preset_config_standby,
111 coalesce(include_pattern, '') as include_pattern,
112 coalesce(exclude_pattern, '') as exclude_pattern,
113 coalesce(custom_tags, '{}'::jsonb) as custom_tags,
114 coalesce(host_config, '{}') as host_config,
115 only_if_master,
116 is_enabled
117 from
118 pgwatch.source`
119 rows, err := r.configDb.Query(context.Background(), sqlLatest)
120 if err != nil {
121 return nil, err
122 }
123 return pgx.CollectRows[Source](rows, pgx.RowToStructByNameLax)
124 }
125