1 package sinks
2
3 import (
4 "context"
5 "fmt"
6 "maps"
7 "net"
8 "net/http"
9 "slices"
10 "strconv"
11 "strings"
12 "sync"
13 "time"
14
15 "github.com/cybertec-postgresql/pgwatch/v5/internal/log"
16 "github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
17 "github.com/prometheus/client_golang/prometheus"
18 "github.com/prometheus/client_golang/prometheus/promhttp"
19 )
20
// PromMetricCache maps database name -> metric name -> last received
// measurement envelope, i.e. the data served on the next scrape.
type PromMetricCache = map[string]map[string]metrics.MeasurementEnvelope
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// PrometheusWriter is a measurements sink that caches the latest envelope per
// (database, metric) pair and serves it from an embedded HTTP scrape endpoint.
// It implements the prometheus.Collector interface (see Describe/Collect).
type PrometheusWriter struct {
	sync.RWMutex // guards gauges and Cache
	logger       log.Logger
	ctx          context.Context
	gauges       map[string]([]string) // metric name -> columns exported as gauges ("*" means all)
	Namespace    string                // prefix for every exported metric name
	Cache        PromMetricCache       // last received envelope per db/metric

	// exporter self-monitoring metrics, emitted on every scrape
	lastScrapeErrors    prometheus.Gauge
	totalScrapes        prometheus.Counter
	totalScrapeFailures prometheus.Counter
}
57
// promInstanceUpStateMetric is a special metric name: it is always exported as
// a gauge and its fully-qualified name is just "<namespace>_instance_up"
// (no field suffix appended).
const promInstanceUpStateMetric = "instance_up"

// promCacheTTL is the maximum age of a cached measurement; older entries are
// purged during a scrape instead of being served.
const promCacheTTL = time.Minute * time.Duration(10)
62
63 func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error) {
64 addr, namespace, found := strings.Cut(connstr, "/")
65 if !found || namespace == "" {
66 namespace = "pgwatch"
67 }
68 l := log.GetLogger(ctx).WithField("sink", "prometheus").WithField("address", addr)
69 ctx = log.WithLogger(ctx, l)
70
71 promw = &PrometheusWriter{
72 ctx: ctx,
73 logger: l,
74 Namespace: namespace,
75 Cache: make(PromMetricCache),
76 lastScrapeErrors: prometheus.NewGauge(prometheus.GaugeOpts{
77 Namespace: namespace,
78 Name: "exporter_last_scrape_errors",
79 Help: "Last scrape error count for all monitored hosts / metrics",
80 }),
81 totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
82 Namespace: namespace,
83 Name: "exporter_total_scrapes",
84 Help: "Total scrape attempts.",
85 }),
86 totalScrapeFailures: prometheus.NewCounter(prometheus.CounterOpts{
87 Namespace: namespace,
88 Name: "exporter_total_scrape_failures",
89 Help: "Number of errors while executing metric queries",
90 }),
91 }
92
93 if err = prometheus.Register(promw); err != nil {
94 return
95 }
96
97 promServer := &http.Server{
98 Addr: addr,
99 Handler: promhttp.HandlerFor(
100 prometheus.DefaultGatherer,
101 promhttp.HandlerOpts{
102 ErrorLog: promw,
103 ErrorHandling: promhttp.ContinueOnError,
104 },
105 ),
106 }
107
108 ln, err := net.Listen("tcp", promServer.Addr)
109 if err != nil {
110 return nil, err
111 }
112
113 go func() { log.GetLogger(ctx).Error(promServer.Serve(ln)) }()
114
115 l.Info(`measurements sink is activated`)
116 return
117 }
118
119
// Println implements the promhttp.Logger interface so the writer itself can
// serve as the scrape handler's ErrorLog; messages are forwarded at error
// level.
func (promw *PrometheusWriter) Println(v ...any) {
	promw.logger.Errorln(v...)
}
123
124
125 func (promw *PrometheusWriter) DefineMetrics(metrics *metrics.Metrics) (err error) {
126 promw.Lock()
127 defer promw.Unlock()
128 promw.gauges = make(map[string]([]string))
129 for name, m := range metrics.MetricDefs {
130 promw.gauges[name] = m.Gauges
131 }
132 return nil
133 }
134
135
136 func (promw *PrometheusWriter) Write(msg metrics.MeasurementEnvelope) error {
137 if len(msg.Data) == 0 {
138 return nil
139 }
140 promw.AddCacheEntry(msg.DBName, msg.MetricName, msg)
141 return nil
142 }
143
144
145
146 func (promw *PrometheusWriter) SyncMetric(sourceName, metricName string, op SyncOp) error {
147 switch op {
148 case DeleteOp:
149 promw.PurgeCacheEntry(sourceName, metricName)
150 case AddOp:
151 promw.InitCacheEntry(sourceName)
152 }
153 return nil
154 }
155
// notSupportedMetrics lists metric names that are excluded from the
// prometheus sink; AddCacheEntry silently drops envelopes for them.
var notSupportedMetrics = map[string]struct{}{
	"change_events":     {},
	"pgbouncer_stats":   {},
	"pgbouncer_clients": {},
	"pgpool_processes":  {},
	"pgpool_stats":      {},
}
163
164 func (promw *PrometheusWriter) AddCacheEntry(dbUnique, metric string, msgArr metrics.MeasurementEnvelope) {
165 if _, ok := notSupportedMetrics[metric]; ok {
166 return
167 }
168 promw.Lock()
169 defer promw.Unlock()
170 if _, ok := promw.Cache[dbUnique]; !ok {
171 promw.Cache[dbUnique] = make(map[string]metrics.MeasurementEnvelope)
172 }
173 promw.Cache[dbUnique][metric] = msgArr
174 }
175
176 func (promw *PrometheusWriter) InitCacheEntry(dbUnique string) {
177 promw.Lock()
178 defer promw.Unlock()
179 if _, ok := promw.Cache[dbUnique]; !ok {
180 promw.Cache[dbUnique] = make(map[string]metrics.MeasurementEnvelope)
181 }
182 }
183
184 func (promw *PrometheusWriter) PurgeCacheEntry(dbUnique, metric string) {
185 promw.Lock()
186 defer promw.Unlock()
187 if metric == "" {
188 delete(promw.Cache, dbUnique)
189 return
190 }
191 delete(promw.Cache[dbUnique], metric)
192 }
193
194
195
// Describe sends no descriptors on purpose, which makes this an "unchecked"
// collector in prometheus terms: metric descriptors are built on the fly in
// Collect from whatever is currently cached.
func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc) {
}
198
199
200
201
202 func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric) {
203 promw.totalScrapes.Add(1)
204 ch <- promw.totalScrapes
205
206 promw.RLock()
207 if len(promw.Cache) == 0 {
208 promw.RUnlock()
209 promw.logger.Warning("No dbs configured for monitoring. Check config")
210 ch <- promw.totalScrapeFailures
211 promw.lastScrapeErrors.Set(0)
212 ch <- promw.lastScrapeErrors
213 return
214 }
215 snapshot := promw.snapshotCache()
216 promw.RUnlock()
217
218 var rows int
219 var lastScrapeErrors float64
220
221 t1 := time.Now()
222 for _, metricsMessages := range snapshot {
223 for _, envelope := range metricsMessages {
224 written, errors := promw.WritePromMetrics(envelope, ch)
225 lastScrapeErrors += float64(errors)
226 rows += written
227 }
228 }
229 promw.logger.WithField("count", rows).WithField("elapsed", time.Since(t1)).Info("measurements written")
230 ch <- promw.totalScrapeFailures
231 promw.lastScrapeErrors.Set(lastScrapeErrors)
232 ch <- promw.lastScrapeErrors
233 }
234
235
236
237
238
239 func (promw *PrometheusWriter) snapshotCache() PromMetricCache {
240 snapshot := make(PromMetricCache, len(promw.Cache))
241 for db, metricMap := range promw.Cache {
242 snapshot[db] = maps.Clone(metricMap)
243 }
244 return snapshot
245 }
246
247
248
249 func (promw *PrometheusWriter) WritePromMetrics(msg metrics.MeasurementEnvelope, ch chan<- prometheus.Metric) (written int, errorCount int) {
250 if len(msg.Data) == 0 {
251 return
252 }
253
254 promw.RLock()
255 gauges := promw.gauges[msg.MetricName]
256 promw.RUnlock()
257
258 epochTime := time.Unix(0, msg.Data.GetEpoch())
259
260 if epochTime.Before(time.Now().Add(-promCacheTTL)) {
261 promw.logger.Debugf("Dropping metric %s:%s cache set due to staleness (>%v)...", msg.DBName, msg.MetricName, promCacheTTL)
262 promw.PurgeCacheEntry(msg.DBName, msg.MetricName)
263 return
264 }
265
266 seen := make(map[string]any)
267
268 for _, measurement := range msg.Data {
269 labels := make(map[string]string)
270 fields := make(map[string]float64)
271 if msg.CustomTags != nil {
272 labels = maps.Clone(msg.CustomTags)
273 }
274 labels["dbname"] = msg.DBName
275
276 for k, v := range measurement {
277 if k == metrics.EpochColumnName || v == nil || v == "" {
278 continue
279 }
280
281 tag, found := strings.CutPrefix(k, "tag_")
282 if found {
283 labels[tag] = fmt.Sprintf("%v", v)
284 } else {
285 switch t := v.(type) {
286 case int, int32, int64, float32, float64:
287 fields[k], _ = strconv.ParseFloat(fmt.Sprintf("%v", v), 64)
288 case bool:
289 fields[k] = map[bool]float64{true: 1, false: 0}[t]
290 default:
291
292
293
294 promw.logger.Debugf("skipping scraping column %s of [%s:%s], unsupported datatype: %v", k, msg.DBName, msg.MetricName, t)
295 continue
296 }
297 }
298 }
299
300
301
302
303
304 labelKeys := slices.Sorted(maps.Keys(labels))
305 labelValues := make([]string, len(labelKeys))
306 for i, k := range labelKeys {
307 labelValues[i] = labels[k]
308 }
309
310 for field, value := range fields {
311 fieldPromDataType := prometheus.CounterValue
312 if msg.MetricName == promInstanceUpStateMetric ||
313 len(gauges) > 0 && (gauges[0] == "*" || slices.Contains(gauges, field)) {
314 fieldPromDataType = prometheus.GaugeValue
315 }
316
317 var fqName string
318 if msg.MetricName == promInstanceUpStateMetric {
319 fqName = fmt.Sprintf("%s_%s", promw.Namespace, msg.MetricName)
320 } else {
321 fqName = fmt.Sprintf("%s_%s_%s", promw.Namespace, msg.MetricName, field)
322 }
323
324
325 identity := fqName + "_" + strings.Join(labelValues, "_")
326 if _, dup := seen[identity]; dup {
327 promw.logger.
328 WithField("metric", msg.MetricName).
329 Warning("duplicate metric identity dropped, prefix differentiating string columns with tag_")
330 errorCount++
331 continue
332 }
333 seen[identity] = struct{}{}
334
335 desc := prometheus.NewDesc(fqName, msg.MetricName, labelKeys, nil)
336 m, err := prometheus.NewConstMetric(desc, fieldPromDataType, value, labelValues...)
337 if err != nil {
338 promw.logger.Warningf("skipping metric %s of [%s:%s]: %v", fqName, msg.DBName, msg.MetricName, err)
339 errorCount++
340 continue
341 }
342 ch <- prometheus.NewMetricWithTimestamp(epochTime, m)
343 written++
344 }
345 }
346 return
347 }
348