1 package metrics
2
3 import (
4 "context"
5 "errors"
6
7 "github.com/cybertec-postgresql/pgwatch/v3/internal/db"
8 )
9
10 func NewPostgresMetricReaderWriter(ctx context.Context, connstr string) (ReaderWriter, error) {
11 conn, err := db.New(ctx, connstr)
12 if err != nil {
13 return nil, err
14 }
15 return NewPostgresMetricReaderWriterConn(ctx, conn)
16 }
17
18 func NewPostgresMetricReaderWriterConn(ctx context.Context, conn db.PgxPoolIface) (ReaderWriter, error) {
19 if err := initSchema(ctx, conn); err != nil {
20 return nil, err
21 }
22 return &dbMetricReaderWriter{
23 ctx: ctx,
24 configDb: conn,
25 }, conn.Ping(ctx)
26 }
27
28 type dbMetricReaderWriter struct {
29 ctx context.Context
30 configDb db.PgxIface
31 }
32
33 var (
34 ErrMetricNotFound = errors.New("metric not found")
35 ErrPresetNotFound = errors.New("preset not found")
36 ErrInvalidMetric = errors.New("invalid metric")
37 ErrInvalidPreset = errors.New("invalid preset")
38 )
39
40
41 var _ Migrator = (*dbMetricReaderWriter)(nil)
42
43
44
45 func writeMetricsToPostgres(ctx context.Context, conn db.PgxIface, metricDefs *Metrics) error {
46 tx, err := conn.Begin(ctx)
47 if err != nil {
48 return err
49 }
50 defer func() { _ = tx.Rollback(ctx) }()
51 for metricName, metric := range metricDefs.MetricDefs {
52 _, err = tx.Exec(ctx, `INSERT INTO pgwatch.metric (name, sqls, init_sql, description, node_status, gauges, is_instance_level, storage_name)
53 values ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (name)
54 DO UPDATE SET sqls = $2, init_sql = $3, description = $4, node_status = $5,
55 gauges = $6, is_instance_level = $7, storage_name = $8`,
56 metricName, metric.SQLs, metric.InitSQL, metric.Description, metric.NodeStatus, metric.Gauges, metric.IsInstanceLevel, metric.StorageName)
57 if err != nil {
58 return err
59 }
60 }
61 for presetName, preset := range metricDefs.PresetDefs {
62 _, err = tx.Exec(ctx, `INSERT INTO pgwatch.preset (name, description, metrics)
63 VALUES ($1, $2, $3) ON CONFLICT (name) DO UPDATE SET description = $2, metrics = $3;`,
64 presetName, preset.Description, preset.Metrics)
65 if err != nil {
66 return err
67 }
68 }
69 return tx.Commit(ctx)
70 }
71
72
73 func (dmrw *dbMetricReaderWriter) GetMetrics() (metricDefMapNew *Metrics, err error) {
74 ctx := dmrw.ctx
75 conn := dmrw.configDb
76 metricDefMapNew = &Metrics{MetricDefs{}, PresetDefs{}}
77 rows, err := conn.Query(ctx, `SELECT name, sqls, init_sql, description, node_status, gauges, is_instance_level, storage_name FROM pgwatch.metric`)
78 if err != nil {
79 return nil, err
80 }
81 defer rows.Close()
82 for rows.Next() {
83 metric := Metric{}
84 var name string
85 err = rows.Scan(&name, &metric.SQLs, &metric.InitSQL, &metric.Description, &metric.NodeStatus, &metric.Gauges, &metric.IsInstanceLevel, &metric.StorageName)
86 if err != nil {
87 return nil, err
88 }
89 metricDefMapNew.MetricDefs[name] = metric
90 }
91 rows, err = conn.Query(ctx, `SELECT name, description, metrics FROM pgwatch.preset`)
92 if err != nil {
93 return nil, err
94 }
95 defer rows.Close()
96 for rows.Next() {
97 preset := Preset{}
98 var name string
99 err = rows.Scan(&name, &preset.Description, &preset.Metrics)
100 if err != nil {
101 return nil, err
102 }
103 metricDefMapNew.PresetDefs[name] = preset
104 }
105 return metricDefMapNew, nil
106 }
107
108 func (dmrw *dbMetricReaderWriter) WriteMetrics(metricDefs *Metrics) error {
109 return writeMetricsToPostgres(dmrw.ctx, dmrw.configDb, metricDefs)
110 }
111
112 func (dmrw *dbMetricReaderWriter) DeleteMetric(metricName string) error {
113 _, err := dmrw.configDb.Exec(dmrw.ctx, `DELETE FROM pgwatch.metric WHERE name = $1`, metricName)
114 return err
115 }
116
117 func (dmrw *dbMetricReaderWriter) UpdateMetric(metricName string, metric Metric) error {
118 ct, err := dmrw.configDb.Exec(dmrw.ctx, `INSERT INTO pgwatch.metric
119 (name, sqls, init_sql, description, node_status, gauges, is_instance_level, storage_name)
120 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
121 ON CONFLICT (name) DO UPDATE SET
122 sqls = $2, init_sql = $3, description = $4, node_status = $5, gauges = $6, is_instance_level = $7, storage_name = $8`,
123 metricName, db.MarshallParamToJSONB(metric.SQLs), metric.InitSQL, metric.Description,
124 metric.NodeStatus, metric.Gauges, metric.IsInstanceLevel, metric.StorageName)
125 if err == nil && ct.RowsAffected() == 0 {
126 return ErrMetricNotFound
127 }
128 return err
129 }
130
131 func (dmrw *dbMetricReaderWriter) DeletePreset(presetName string) error {
132 _, err := dmrw.configDb.Exec(dmrw.ctx, `DELETE FROM pgwatch.preset WHERE name = $1`, presetName)
133 return err
134 }
135
136 func (dmrw *dbMetricReaderWriter) UpdatePreset(presetName string, preset Preset) error {
137 sql := `INSERT INTO pgwatch.preset(name, description, metrics) VALUES ($1, $2, $3)
138 ON CONFLICT (name) DO UPDATE SET description = $2, metrics = $3`
139 ct, err := dmrw.configDb.Exec(dmrw.ctx, sql, presetName, preset.Description, db.MarshallParamToJSONB(preset.Metrics))
140 if err == nil && ct.RowsAffected() == 0 {
141 return ErrPresetNotFound
142 }
143 return err
144 }
145