package sinks

import (
	"context"
	"fmt"
	"maps"
	"net"
	"net/http"
	"slices"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/cybertec-postgresql/pgwatch/v5/internal/log"
	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

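// PromMetricCache is a two-level map of the latest cached measurements:
// database name -> metric name -> measurement envelope.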
type PromMetricCache = map[string]map[string]metrics.MeasurementEnvelope

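// PrometheusWriter acts as a measurements sink: Write only caches the data,
// and the registered collector serves it when Prometheus scrapes the endpoint.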
type PrometheusWriter struct {
	sync.RWMutex
	logger              log.Logger
	ctx                 context.Context
	lastScrapeErrors    prometheus.Gauge
	totalScrapes        prometheus.Counter
	totalScrapeFailures prometheus.Counter
	gauges              map[string][]string
	Namespace           string
	Cache               PromMetricCache
}

const promInstanceUpStateMetric = "instance_up"

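// Cached measurements older than this limit are dropped at scrape time
// instead of being served.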
const promScrapingStalenessHardDropLimit = time.Minute * 10

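// Println implements the promhttp.Logger interface so that the writer itself
// can serve as the HTTP handler's error log.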
func (promw *PrometheusWriter) Println(v ...any) {
	promw.logger.Errorln(v...)
}

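// NewPrometheusWriter creates the sink and starts an HTTP listener serving
// scrapes on the given address. The connection string has the form
// "address/namespace", e.g. "0.0.0.0:9187/pgwatch" (the port here is only
// illustrative); the namespace defaults to "pgwatch" when none is given.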
func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error) {
	addr, namespace, found := strings.Cut(connstr, "/")
	if !found {
		namespace = "pgwatch"
	}
	l := log.GetLogger(ctx).WithField("sink", "prometheus").WithField("address", addr)
	ctx = log.WithLogger(ctx, l)
	promw = &PrometheusWriter{
		ctx:       ctx,
		logger:    l,
		Namespace: namespace,
		Cache:     make(PromMetricCache),
		lastScrapeErrors: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Name:      "exporter_last_scrape_errors",
			Help:      "Last scrape error count for all monitored hosts / metrics",
		}),
		totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Name:      "exporter_total_scrapes",
			Help:      "Total scrape attempts.",
		}),
		totalScrapeFailures: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Name:      "exporter_total_scrape_failures",
			Help:      "Number of errors while executing metric queries",
		}),
	}

	if err = prometheus.Register(promw); err != nil {
		return
	}

	promServer := &http.Server{
		Addr: addr,
		Handler: promhttp.HandlerFor(
			prometheus.DefaultGatherer,
			promhttp.HandlerOpts{
				ErrorLog:      promw,
				ErrorHandling: promhttp.ContinueOnError,
			},
		),
	}

	ln, err := net.Listen("tcp", promServer.Addr)
	if err != nil {
		return nil, err
	}

	go func() { log.GetLogger(ctx).Error(promServer.Serve(ln)) }()

	l.Info(`measurements sink is activated`)
	return
}

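// DefineMetrics records which columns of every metric definition are gauges,
// so that conversion can choose between counter and gauge value types per field.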
func (promw *PrometheusWriter) DefineMetrics(metrics *metrics.Metrics) (err error) {
	promw.gauges = make(map[string][]string)
	for name, m := range metrics.MetricDefs {
		promw.gauges[name] = m.Gauges
	}
	return nil
}

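// Write caches the measurements for the next scrape; nothing is pushed to
// Prometheus, which pulls the data on its own schedule.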
func (promw *PrometheusWriter) Write(msg metrics.MeasurementEnvelope) error {
	if len(msg.Data) == 0 {
		return nil
	}
	promw.PromAsyncCacheAddMetricData(msg.DBName, msg.MetricName, msg)
	return nil
}

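// PromAsyncCacheAddMetricData stores the envelope for a database that is
// already known to the cache; data for unregistered databases is discarded.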
func (promw *PrometheusWriter) PromAsyncCacheAddMetricData(dbUnique, metric string, msgArr metrics.MeasurementEnvelope) {
	promw.Lock()
	defer promw.Unlock()
	if _, ok := promw.Cache[dbUnique]; ok {
		promw.Cache[dbUnique][metric] = msgArr
	}
}

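// PromAsyncCacheInitIfRequired creates the per-database cache map on first use.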
func (promw *PrometheusWriter) PromAsyncCacheInitIfRequired(dbUnique, _ string) {
	promw.Lock()
	defer promw.Unlock()
	if _, ok := promw.Cache[dbUnique]; !ok {
		promw.Cache[dbUnique] = make(map[string]metrics.MeasurementEnvelope)
	}
}

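// PurgeMetricsFromPromAsyncCacheIfAny drops a single cached metric, or all
// cached metrics of a database when metric is an empty string.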
func (promw *PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metric string) {
	promw.Lock()
	defer promw.Unlock()

	if metric == "" {
		delete(promw.Cache, dbUnique)
	} else {
		delete(promw.Cache[dbUnique], metric)
	}
}

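// SyncMetric initializes or purges cache entries when a monitored source or
// metric is added or removed.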
func (promw *PrometheusWriter) SyncMetric(sourceName, metricName string, op SyncOp) error {
	switch op {
	case DeleteOp:
		promw.PurgeMetricsFromPromAsyncCacheIfAny(sourceName, metricName)
	case AddOp:
		promw.PromAsyncCacheInitIfRequired(sourceName, metricName)
	}
	return nil
}

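// Describe intentionally sends no descriptors, making this an unchecked
// collector: the set of exposed metrics may vary from scrape to scrape.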
func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc) {
}

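// Collect implements prometheus.Collector. It swaps the cache for a fresh one
// while holding the lock, then converts and emits the snapshot outside of it,
// so every cached measurement is delivered to exactly one scrape.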
func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric) {
	var rows int
	var lastScrapeErrors float64
	promw.totalScrapes.Add(1)
	ch <- promw.totalScrapes

	promw.Lock()
	if len(promw.Cache) == 0 {
		promw.Unlock()
		promw.logger.Warning("no databases are configured for monitoring, check the config")
		ch <- promw.totalScrapeFailures
		promw.lastScrapeErrors.Set(0)
		ch <- promw.lastScrapeErrors
		return
	}
	snapshot := promw.Cache
	promw.Cache = make(PromMetricCache, len(snapshot))
	for dbUnique := range snapshot {
		promw.Cache[dbUnique] = make(map[string]metrics.MeasurementEnvelope)
	}
	promw.Unlock()

	t1 := time.Now()
	for _, metricsMessages := range snapshot {
		for metric, metricMessages := range metricsMessages {
			if metric == "change_events" {
				continue
			}
			promMetrics := promw.MetricStoreMessageToPromMetrics(metricMessages)
			rows += len(promMetrics)
			for _, pm := range promMetrics {
				ch <- pm
			}
		}
	}
	promw.logger.WithField("count", rows).WithField("elapsed", time.Since(t1)).Info("measurements written")
	ch <- promw.totalScrapeFailures
	promw.lastScrapeErrors.Set(lastScrapeErrors)
	ch <- promw.lastScrapeErrors
}

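// MetricStoreMessageToPromMetrics converts one measurement envelope into
// Prometheus metrics: "tag_"-prefixed and string columns become labels,
// numeric and boolean columns become samples, and an envelope past the
// staleness limit is purged from the cache instead of being converted.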
func (promw *PrometheusWriter) MetricStoreMessageToPromMetrics(msg metrics.MeasurementEnvelope) []prometheus.Metric {
	promMetrics := make([]prometheus.Metric, 0)
	if len(msg.Data) == 0 {
		return promMetrics
	}

	gauges := promw.gauges[msg.MetricName]
	epochTime := time.Unix(0, msg.Data.GetEpoch())

	if epochTime.Before(time.Now().Add(-promScrapingStalenessHardDropLimit)) {
		promw.logger.Warningf("Dropping metric %s:%s cache set due to staleness (>%v)...", msg.DBName, msg.MetricName, promScrapingStalenessHardDropLimit)
		promw.PurgeMetricsFromPromAsyncCacheIfAny(msg.DBName, msg.MetricName)
		return promMetrics
	}

	for _, dr := range msg.Data {
		labels := make(map[string]string)
		fields := make(map[string]float64)
		if msg.CustomTags != nil {
			labels = maps.Clone(msg.CustomTags)
		}
		labels["dbname"] = msg.DBName

		for k, v := range dr {
			if v == nil || v == "" || k == metrics.EpochColumnName {
				continue
			}

			if strings.HasPrefix(k, "tag_") {
				tag := strings.TrimPrefix(k, "tag_")
				labels[tag] = fmt.Sprintf("%v", v)
			} else {
				switch t := v.(type) {
				case string:
					labels[k] = t
				case int, int32, int64, float32, float64:
					f, err := strconv.ParseFloat(fmt.Sprintf("%v", v), 64)
					if err != nil {
						promw.logger.Debugf("skipping scraping column %s of [%s:%s]: %v", k, msg.DBName, msg.MetricName, err)
						continue
					}
					fields[k] = f
				case bool:
					fields[k] = map[bool]float64{true: 1, false: 0}[t]
				default:
					promw.logger.Debugf("skipping scraping column %s of [%s:%s], unsupported datatype: %v", k, msg.DBName, msg.MetricName, t)
					continue
				}
			}
		}

		labelKeys := make([]string, 0)
		labelValues := make([]string, 0)
		for k, v := range labels {
			labelKeys = append(labelKeys, k)
			labelValues = append(labelValues, v)
		}

		for field, value := range fields {
			fieldPromDataType := prometheus.CounterValue
			if msg.MetricName == promInstanceUpStateMetric ||
				len(gauges) > 0 && (gauges[0] == "*" || slices.Contains(gauges, field)) {
				fieldPromDataType = prometheus.GaugeValue
			}
			var desc *prometheus.Desc
			if promw.Namespace != "" {
				if msg.MetricName == promInstanceUpStateMetric {
					desc = prometheus.NewDesc(fmt.Sprintf("%s_%s", promw.Namespace, msg.MetricName),
						msg.MetricName, labelKeys, nil)
				} else {
					desc = prometheus.NewDesc(fmt.Sprintf("%s_%s_%s", promw.Namespace, msg.MetricName, field),
						msg.MetricName, labelKeys, nil)
				}
			} else {
				if msg.MetricName == promInstanceUpStateMetric {
					desc = prometheus.NewDesc(field, msg.MetricName, labelKeys, nil)
				} else {
					desc = prometheus.NewDesc(fmt.Sprintf("%s_%s", msg.MetricName, field), msg.MetricName, labelKeys, nil)
				}
			}
			m := prometheus.MustNewConstMetric(desc, fieldPromDataType, value, labelValues...)
			promMetrics = append(promMetrics, prometheus.NewMetricWithTimestamp(epochTime, m))
		}
	}
	return promMetrics
}