1 package metrics
2
3 import (
4 "context"
5 "errors"
6
7 "github.com/jackc/pgx/v5/pgconn"
8
9 "github.com/cybertec-postgresql/pgwatch/v5/internal/db"
10 )
11
12 func NewPostgresMetricReaderWriter(ctx context.Context, connstr string) (ReaderWriter, error) {
13 conn, err := db.New(ctx, connstr)
14 if err != nil {
15 return nil, err
16 }
17 return NewPostgresMetricReaderWriterConn(ctx, conn)
18 }
19
20 func NewPostgresMetricReaderWriterConn(ctx context.Context, conn db.PgxPoolIface) (ReaderWriter, error) {
21 if err := initSchema(ctx, conn); err != nil {
22 return nil, err
23 }
24 dmrw := &dbMetricReaderWriter{
25 ctx: ctx,
26 configDb: conn,
27 }
28 return dmrw, conn.Ping(ctx)
29 }
30
// dbMetricReaderWriter reads and writes metric and preset definitions kept in
// the pgwatch.metric and pgwatch.preset tables.
type dbMetricReaderWriter struct {
	// ctx is the context handed to every database call made by the methods.
	ctx context.Context
	// configDb is the database handle all statements are executed on.
	configDb db.PgxIface
}
35
// Sentinel errors of the metrics storage layer. Compare with errors.Is.
var (
	// ErrNeedsMigration signals that the config database schema is outdated.
	ErrNeedsMigration = errors.New("config database schema is outdated, please run migrations using `pgwatch config upgrade` command")

	// ErrMetricNotFound and ErrPresetNotFound are returned by the update
	// methods when the statement succeeds without affecting any row.
	ErrMetricNotFound = errors.New("metric not found")
	ErrPresetNotFound = errors.New("preset not found")

	// ErrInvalidMetric and ErrInvalidPreset mark definitions that are not valid.
	ErrInvalidMetric = errors.New("invalid metric")
	ErrInvalidPreset = errors.New("invalid preset")

	// ErrMetricExists and ErrPresetExists are returned by the create methods
	// when the insert fails with a unique violation.
	ErrMetricExists = errors.New("metric already exists")
	ErrPresetExists = errors.New("preset already exists")
)
45
46
// Compile-time assertion that *dbMetricReaderWriter satisfies db.Migrator.
var _ db.Migrator = (*dbMetricReaderWriter)(nil)
48
49
50
51 func writeMetricsToPostgres(ctx context.Context, conn db.PgxIface, metricDefs *Metrics) error {
52 tx, err := conn.Begin(ctx)
53 if err != nil {
54 return err
55 }
56 defer func() { _ = tx.Rollback(ctx) }()
57 for metricName, metric := range metricDefs.MetricDefs {
58 _, err = tx.Exec(ctx, `INSERT INTO pgwatch.metric (name, sqls, init_sql, description, node_status, gauges, is_instance_level, storage_name)
59 values ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (name)
60 DO UPDATE SET sqls = $2, init_sql = $3, description = $4, node_status = $5,
61 gauges = $6, is_instance_level = $7, storage_name = $8`,
62 metricName, metric.SQLs, metric.InitSQL, metric.Description, metric.NodeStatus, metric.Gauges, metric.IsInstanceLevel, metric.StorageName)
63 if err != nil {
64 return err
65 }
66 }
67 for presetName, preset := range metricDefs.PresetDefs {
68 _, err = tx.Exec(ctx, `INSERT INTO pgwatch.preset (name, description, metrics)
69 VALUES ($1, $2, $3) ON CONFLICT (name) DO UPDATE SET description = $2, metrics = $3;`,
70 presetName, preset.Description, preset.Metrics)
71 if err != nil {
72 return err
73 }
74 }
75 return tx.Commit(ctx)
76 }
77
78
79 func (dmrw *dbMetricReaderWriter) GetMetrics() (metricDefMapNew *Metrics, err error) {
80 ctx := dmrw.ctx
81 conn := dmrw.configDb
82 metricDefMapNew = &Metrics{MetricDefs{}, PresetDefs{}}
83 rows, err := conn.Query(ctx, `SELECT name, sqls, init_sql, description, node_status, gauges, is_instance_level, storage_name FROM pgwatch.metric`)
84 if err != nil {
85 return nil, err
86 }
87 defer rows.Close()
88 for rows.Next() {
89 metric := Metric{}
90 var name string
91 err = rows.Scan(&name, &metric.SQLs, &metric.InitSQL, &metric.Description, &metric.NodeStatus, &metric.Gauges, &metric.IsInstanceLevel, &metric.StorageName)
92 if err != nil {
93 return nil, err
94 }
95 metricDefMapNew.MetricDefs[name] = metric
96 }
97 rows, err = conn.Query(ctx, `SELECT name, description, metrics FROM pgwatch.preset`)
98 if err != nil {
99 return nil, err
100 }
101 defer rows.Close()
102 for rows.Next() {
103 preset := Preset{}
104 var name string
105 err = rows.Scan(&name, &preset.Description, &preset.Metrics)
106 if err != nil {
107 return nil, err
108 }
109 metricDefMapNew.PresetDefs[name] = preset
110 }
111 return metricDefMapNew, nil
112 }
113
114 func (dmrw *dbMetricReaderWriter) WriteMetrics(metricDefs *Metrics) error {
115 return writeMetricsToPostgres(dmrw.ctx, dmrw.configDb, metricDefs)
116 }
117
118 func (dmrw *dbMetricReaderWriter) DeleteMetric(metricName string) error {
119 _, err := dmrw.configDb.Exec(dmrw.ctx, `DELETE FROM pgwatch.metric WHERE name = $1`, metricName)
120 return err
121 }
122
123 func (dmrw *dbMetricReaderWriter) UpdateMetric(metricName string, metric Metric) error {
124 ct, err := dmrw.configDb.Exec(dmrw.ctx, `INSERT INTO pgwatch.metric
125 (name, sqls, init_sql, description, node_status, gauges, is_instance_level, storage_name)
126 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
127 ON CONFLICT (name) DO UPDATE SET
128 sqls = $2, init_sql = $3, description = $4, node_status = $5, gauges = $6, is_instance_level = $7, storage_name = $8`,
129 metricName, db.MarshallParamToJSONB(metric.SQLs), metric.InitSQL, metric.Description,
130 metric.NodeStatus, metric.Gauges, metric.IsInstanceLevel, metric.StorageName)
131 if err == nil && ct.RowsAffected() == 0 {
132 return ErrMetricNotFound
133 }
134 return err
135 }
136
137 func (dmrw *dbMetricReaderWriter) CreateMetric(metricName string, metric Metric) error {
138 _, err := dmrw.configDb.Exec(dmrw.ctx, `INSERT INTO pgwatch.metric
139 (name, sqls, init_sql, description, node_status, gauges, is_instance_level, storage_name)
140 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
141 metricName, db.MarshallParamToJSONB(metric.SQLs), metric.InitSQL, metric.Description,
142 metric.NodeStatus, metric.Gauges, metric.IsInstanceLevel, metric.StorageName)
143 if err != nil {
144
145 var pgErr *pgconn.PgError
146 if errors.As(err, &pgErr) && pgErr.SQLState() == "23505" {
147 return ErrMetricExists
148 }
149 }
150 return err
151 }
152
153 func (dmrw *dbMetricReaderWriter) DeletePreset(presetName string) error {
154 _, err := dmrw.configDb.Exec(dmrw.ctx, `DELETE FROM pgwatch.preset WHERE name = $1`, presetName)
155 return err
156 }
157
158 func (dmrw *dbMetricReaderWriter) UpdatePreset(presetName string, preset Preset) error {
159 sql := `INSERT INTO pgwatch.preset(name, description, metrics) VALUES ($1, $2, $3)
160 ON CONFLICT (name) DO UPDATE SET description = $2, metrics = $3`
161 ct, err := dmrw.configDb.Exec(dmrw.ctx, sql, presetName, preset.Description, db.MarshallParamToJSONB(preset.Metrics))
162 if err == nil && ct.RowsAffected() == 0 {
163 return ErrPresetNotFound
164 }
165 return err
166 }
167
168 func (dmrw *dbMetricReaderWriter) CreatePreset(presetName string, preset Preset) error {
169 sql := `INSERT INTO pgwatch.preset(name, description, metrics) VALUES ($1, $2, $3)`
170 _, err := dmrw.configDb.Exec(dmrw.ctx, sql, presetName, preset.Description, db.MarshallParamToJSONB(preset.Metrics))
171 if err != nil {
172
173 var pgErr *pgconn.PgError
174 if errors.As(err, &pgErr) && pgErr.Code == "23505" {
175 return ErrPresetExists
176 }
177 }
178 return err
179 }
180