1 package metrics
2
3 import (
4 "context"
5 "errors"
6
7 "github.com/jackc/pgx/v5/pgconn"
8
9 "github.com/cybertec-postgresql/pgwatch/v3/internal/db"
10 )
11
12 func NewPostgresMetricReaderWriter(ctx context.Context, connstr string) (ReaderWriter, error) {
13 conn, err := db.New(ctx, connstr)
14 if err != nil {
15 return nil, err
16 }
17 return NewPostgresMetricReaderWriterConn(ctx, conn)
18 }
19
20 func NewPostgresMetricReaderWriterConn(ctx context.Context, conn db.PgxPoolIface) (ReaderWriter, error) {
21 if err := initSchema(ctx, conn); err != nil {
22 return nil, err
23 }
24 return &dbMetricReaderWriter{
25 ctx: ctx,
26 configDb: conn,
27 }, conn.Ping(ctx)
28 }
29
30 type dbMetricReaderWriter struct {
31 ctx context.Context
32 configDb db.PgxIface
33 }
34
// Sentinel errors describing the outcome of metric operations.
var (
	// ErrMetricNotFound reports that the requested metric does not exist.
	ErrMetricNotFound = errors.New("metric not found")
	// ErrInvalidMetric reports that a metric definition is not valid.
	ErrInvalidMetric = errors.New("invalid metric")
	// ErrMetricExists reports that a metric with the same name already exists.
	ErrMetricExists = errors.New("metric already exists")
)

// Sentinel errors describing the outcome of preset operations.
var (
	// ErrPresetNotFound reports that the requested preset does not exist.
	ErrPresetNotFound = errors.New("preset not found")
	// ErrInvalidPreset reports that a preset definition is not valid.
	ErrInvalidPreset = errors.New("invalid preset")
	// ErrPresetExists reports that a preset with the same name already exists.
	ErrPresetExists = errors.New("preset already exists")
)
43
44
45 var _ Migrator = (*dbMetricReaderWriter)(nil)
46
47
48
49 func writeMetricsToPostgres(ctx context.Context, conn db.PgxIface, metricDefs *Metrics) error {
50 tx, err := conn.Begin(ctx)
51 if err != nil {
52 return err
53 }
54 defer func() { _ = tx.Rollback(ctx) }()
55 for metricName, metric := range metricDefs.MetricDefs {
56 _, err = tx.Exec(ctx, `INSERT INTO pgwatch.metric (name, sqls, init_sql, description, node_status, gauges, is_instance_level, storage_name)
57 values ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (name)
58 DO UPDATE SET sqls = $2, init_sql = $3, description = $4, node_status = $5,
59 gauges = $6, is_instance_level = $7, storage_name = $8`,
60 metricName, metric.SQLs, metric.InitSQL, metric.Description, metric.NodeStatus, metric.Gauges, metric.IsInstanceLevel, metric.StorageName)
61 if err != nil {
62 return err
63 }
64 }
65 for presetName, preset := range metricDefs.PresetDefs {
66 _, err = tx.Exec(ctx, `INSERT INTO pgwatch.preset (name, description, metrics)
67 VALUES ($1, $2, $3) ON CONFLICT (name) DO UPDATE SET description = $2, metrics = $3;`,
68 presetName, preset.Description, preset.Metrics)
69 if err != nil {
70 return err
71 }
72 }
73 return tx.Commit(ctx)
74 }
75
76
77 func (dmrw *dbMetricReaderWriter) GetMetrics() (metricDefMapNew *Metrics, err error) {
78 ctx := dmrw.ctx
79 conn := dmrw.configDb
80 metricDefMapNew = &Metrics{MetricDefs{}, PresetDefs{}}
81 rows, err := conn.Query(ctx, `SELECT name, sqls, init_sql, description, node_status, gauges, is_instance_level, storage_name FROM pgwatch.metric`)
82 if err != nil {
83 return nil, err
84 }
85 defer rows.Close()
86 for rows.Next() {
87 metric := Metric{}
88 var name string
89 err = rows.Scan(&name, &metric.SQLs, &metric.InitSQL, &metric.Description, &metric.NodeStatus, &metric.Gauges, &metric.IsInstanceLevel, &metric.StorageName)
90 if err != nil {
91 return nil, err
92 }
93 metricDefMapNew.MetricDefs[name] = metric
94 }
95 rows, err = conn.Query(ctx, `SELECT name, description, metrics FROM pgwatch.preset`)
96 if err != nil {
97 return nil, err
98 }
99 defer rows.Close()
100 for rows.Next() {
101 preset := Preset{}
102 var name string
103 err = rows.Scan(&name, &preset.Description, &preset.Metrics)
104 if err != nil {
105 return nil, err
106 }
107 metricDefMapNew.PresetDefs[name] = preset
108 }
109 return metricDefMapNew, nil
110 }
111
112 func (dmrw *dbMetricReaderWriter) WriteMetrics(metricDefs *Metrics) error {
113 return writeMetricsToPostgres(dmrw.ctx, dmrw.configDb, metricDefs)
114 }
115
116 func (dmrw *dbMetricReaderWriter) DeleteMetric(metricName string) error {
117 _, err := dmrw.configDb.Exec(dmrw.ctx, `DELETE FROM pgwatch.metric WHERE name = $1`, metricName)
118 return err
119 }
120
121 func (dmrw *dbMetricReaderWriter) UpdateMetric(metricName string, metric Metric) error {
122 ct, err := dmrw.configDb.Exec(dmrw.ctx, `INSERT INTO pgwatch.metric
123 (name, sqls, init_sql, description, node_status, gauges, is_instance_level, storage_name)
124 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
125 ON CONFLICT (name) DO UPDATE SET
126 sqls = $2, init_sql = $3, description = $4, node_status = $5, gauges = $6, is_instance_level = $7, storage_name = $8`,
127 metricName, db.MarshallParamToJSONB(metric.SQLs), metric.InitSQL, metric.Description,
128 metric.NodeStatus, metric.Gauges, metric.IsInstanceLevel, metric.StorageName)
129 if err == nil && ct.RowsAffected() == 0 {
130 return ErrMetricNotFound
131 }
132 return err
133 }
134
135 func (dmrw *dbMetricReaderWriter) CreateMetric(metricName string, metric Metric) error {
136 _, err := dmrw.configDb.Exec(dmrw.ctx, `INSERT INTO pgwatch.metric
137 (name, sqls, init_sql, description, node_status, gauges, is_instance_level, storage_name)
138 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
139 metricName, db.MarshallParamToJSONB(metric.SQLs), metric.InitSQL, metric.Description,
140 metric.NodeStatus, metric.Gauges, metric.IsInstanceLevel, metric.StorageName)
141 if err != nil {
142
143 var pgErr *pgconn.PgError
144 if errors.As(err, &pgErr) && pgErr.SQLState() == "23505" {
145 return ErrMetricExists
146 }
147 }
148 return err
149 }
150
151 func (dmrw *dbMetricReaderWriter) DeletePreset(presetName string) error {
152 _, err := dmrw.configDb.Exec(dmrw.ctx, `DELETE FROM pgwatch.preset WHERE name = $1`, presetName)
153 return err
154 }
155
156 func (dmrw *dbMetricReaderWriter) UpdatePreset(presetName string, preset Preset) error {
157 sql := `INSERT INTO pgwatch.preset(name, description, metrics) VALUES ($1, $2, $3)
158 ON CONFLICT (name) DO UPDATE SET description = $2, metrics = $3`
159 ct, err := dmrw.configDb.Exec(dmrw.ctx, sql, presetName, preset.Description, db.MarshallParamToJSONB(preset.Metrics))
160 if err == nil && ct.RowsAffected() == 0 {
161 return ErrPresetNotFound
162 }
163 return err
164 }
165
166 func (dmrw *dbMetricReaderWriter) CreatePreset(presetName string, preset Preset) error {
167 sql := `INSERT INTO pgwatch.preset(name, description, metrics) VALUES ($1, $2, $3)`
168 _, err := dmrw.configDb.Exec(dmrw.ctx, sql, presetName, preset.Description, db.MarshallParamToJSONB(preset.Metrics))
169 if err != nil {
170
171 var pgErr *pgconn.PgError
172 if errors.As(err, &pgErr) && pgErr.Code == "23505" {
173 return ErrPresetExists
174 }
175 }
176 return err
177 }
178