package sinks

import (
	"context"
	"fmt"
	"maps"
	"net"
	"net/http"
	"slices"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

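// PrometheusWriter is a pull-based sink: instead of pushing measurements to a
// database, it caches them and exposes them to Prometheus scrapers over an
// HTTP endpoint.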
type PrometheusWriter struct {
	logger                            log.Logger
	ctx                               context.Context
	lastScrapeErrors                  prometheus.Gauge
	totalScrapes, totalScrapeFailures prometheus.Counter
	PrometheusNamespace               string
	gauges                            map[string][]string
}

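// promInstanceUpStateMetric is the synthetic metric reporting whether a
// monitored instance was reachable.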
const promInstanceUpStateMetric = "instance_up"

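// Cached measurements older than this limit are dropped with a warning
// instead of being emitted, since the scraper would treat them as stale.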
const promScrapingStalenessHardDropLimit = 10 * time.Minute

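// Println implements the promhttp.Logger interface so that HTTP handler
// errors are routed to the sink's own logger.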
func (promw *PrometheusWriter) Println(v ...any) {
	promw.logger.Errorln(v...)
}

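// NewPrometheusWriter creates a PrometheusWriter, registers it as a
// Prometheus collector and starts an HTTP server listening on the given
// address. The connection string has the form "host:port[/namespace]";
// the namespace defaults to "pgwatch".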
func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error) {
	addr, namespace, found := strings.Cut(connstr, "/")
	if !found {
		namespace = "pgwatch"
	}
	l := log.GetLogger(ctx).WithField("sink", "prometheus").WithField("address", addr)
	ctx = log.WithLogger(ctx, l)
	promw = &PrometheusWriter{
		ctx:                 ctx,
		logger:              l,
		PrometheusNamespace: namespace,
		lastScrapeErrors: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Name:      "exporter_last_scrape_errors",
			Help:      "Last scrape error count for all monitored hosts / metrics",
		}),
		totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Name:      "exporter_total_scrapes",
			Help:      "Total scrape attempts.",
		}),
		totalScrapeFailures: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Name:      "exporter_total_scrape_failures",
			Help:      "Number of errors while executing metric queries",
		}),
	}

	if err = prometheus.Register(promw); err != nil {
		return
	}

	promServer := &http.Server{
		Addr: addr,
		Handler: promhttp.HandlerFor(
			prometheus.DefaultGatherer,
			promhttp.HandlerOpts{
				ErrorLog:      promw,
				ErrorHandling: promhttp.ContinueOnError,
			},
		),
	}

	ln, err := net.Listen("tcp", promServer.Addr)
	if err != nil {
		return nil, err
	}

	go func() { log.GetLogger(ctx).Error(promServer.Serve(ln)) }()

	l.Info(`measurements sink is activated`)
	return
}

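// DefineMetrics remembers which columns of each metric definition are gauges,
// so that Collect can report them with the proper Prometheus value type.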
func (promw *PrometheusWriter) DefineMetrics(metrics *metrics.Metrics) (err error) {
	promw.gauges = make(map[string][]string)
	for name, m := range metrics.MetricDefs {
		promw.gauges[name] = m.Gauges
	}
	return nil
}

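// Write caches the measurements until the next scrape; Prometheus pulls the
// data itself, so nothing is sent out at this point.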
func (promw *PrometheusWriter) Write(msg metrics.MeasurementEnvelope) error {
	if len(msg.Data) == 0 {
		return nil
	}
	promw.PromAsyncCacheAddMetricData(msg.DBName, msg.MetricName, msg)
	return nil
}

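// promAsyncMetricCache holds the latest measurement envelope per monitored
// database and metric until the next scrape picks it up.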
var promAsyncMetricCache = make(map[string]map[string]metrics.MeasurementEnvelope)
var promAsyncMetricCacheLock = sync.RWMutex{}

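// PromAsyncCacheAddMetricData stores the latest measurement envelope for a
// database and metric; data is silently dropped for databases that were never
// announced via SyncMetric.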
func (promw *PrometheusWriter) PromAsyncCacheAddMetricData(dbUnique, metric string, msg metrics.MeasurementEnvelope) {
	promAsyncMetricCacheLock.Lock()
	defer promAsyncMetricCacheLock.Unlock()
	if _, ok := promAsyncMetricCache[dbUnique]; ok {
		promAsyncMetricCache[dbUnique][metric] = msg
	}
}

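// PromAsyncCacheInitIfRequired creates an empty cache entry for a newly
// monitored database unless one already exists.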
func (promw *PrometheusWriter) PromAsyncCacheInitIfRequired(dbUnique, _ string) {
	promAsyncMetricCacheLock.Lock()
	defer promAsyncMetricCacheLock.Unlock()
	if _, ok := promAsyncMetricCache[dbUnique]; !ok {
		promAsyncMetricCache[dbUnique] = make(map[string]metrics.MeasurementEnvelope)
	}
}

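// PurgeMetricsFromPromAsyncCacheIfAny removes cached measurements for a
// single metric, or for the whole database when metric is empty.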
func (promw *PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metric string) {
	promAsyncMetricCacheLock.Lock()
	defer promAsyncMetricCacheLock.Unlock()

	if metric == "" {
		delete(promAsyncMetricCache, dbUnique)
	} else {
		delete(promAsyncMetricCache[dbUnique], metric)
	}
}

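// SyncMetric initializes or purges the cache entry for the given database
// and metric, depending on the requested operation.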
func (promw *PrometheusWriter) SyncMetric(dbUnique, metricName string, op SyncOp) error {
	switch op {
	case DeleteOp:
		promw.PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metricName)
	case AddOp:
		promw.PromAsyncCacheInitIfRequired(dbUnique, metricName)
	}
	return nil
}

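// Describe intentionally sends no descriptors: all metrics are built
// dynamically at scrape time, making this an "unchecked" collector.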
func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc) {
}

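// Collect is called by the Prometheus registry on every scrape. It converts
// all cached measurements into Prometheus metrics, sends them to the channel
// and resets each database's cache afterwards.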
func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric) {
	var rows int
	var lastScrapeErrors float64
	promw.totalScrapes.Add(1)
	ch <- promw.totalScrapes

	// snapshot the configured database names under the read lock so that
	// concurrent writers cannot mutate the map while we iterate over it
	promAsyncMetricCacheLock.RLock()
	dbNames := make([]string, 0, len(promAsyncMetricCache))
	for dbname := range promAsyncMetricCache {
		dbNames = append(dbNames, dbname)
	}
	promAsyncMetricCacheLock.RUnlock()

	if len(dbNames) == 0 {
		promw.logger.Warning("No dbs configured for monitoring. Check config")
		ch <- promw.totalScrapeFailures
		promw.lastScrapeErrors.Set(0)
		ch <- promw.lastScrapeErrors
		return
	}
	t1 := time.Now()
	for _, dbname := range dbNames {
		// swap the accumulated envelopes out under the lock, then convert
		// them without blocking concurrent writers
		promAsyncMetricCacheLock.Lock()
		metricsMessages := promAsyncMetricCache[dbname]
		promAsyncMetricCache[dbname] = make(map[string]metrics.MeasurementEnvelope)
		promAsyncMetricCacheLock.Unlock()
		for metric, metricMessages := range metricsMessages {
			if metric == "change_events" {
				continue // event-style data has no numeric values to export
			}
			promMetrics := promw.MetricStoreMessageToPromMetrics(metricMessages)
			rows += len(promMetrics)
			for _, pm := range promMetrics {
				ch <- pm
			}
		}
	}
	promw.logger.WithField("count", rows).WithField("elapsed", time.Since(t1)).Info("measurements written")
	ch <- promw.totalScrapeFailures
	promw.lastScrapeErrors.Set(lastScrapeErrors)
	ch <- promw.lastScrapeErrors
}

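// MetricStoreMessageToPromMetrics converts one measurement envelope into
// Prometheus metrics: "tag_"-prefixed and string columns become labels,
// numeric and boolean columns become values, and the whole envelope is
// dropped if its data exceeds promScrapingStalenessHardDropLimit.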
func (promw *PrometheusWriter) MetricStoreMessageToPromMetrics(msg metrics.MeasurementEnvelope) []prometheus.Metric {
	promMetrics := make([]prometheus.Metric, 0)
	var epochTime time.Time
	if len(msg.Data) == 0 {
		return promMetrics
	}

	gauges := promw.gauges[msg.MetricName]

	epochTime = time.Unix(0, msg.Data.GetEpoch())

	if epochTime.Before(time.Now().Add(-promScrapingStalenessHardDropLimit)) {
		promw.logger.Warningf("Dropping metric %s:%s cache set due to staleness (>%v)...", msg.DBName, msg.MetricName, promScrapingStalenessHardDropLimit)
		promw.PurgeMetricsFromPromAsyncCacheIfAny(msg.DBName, msg.MetricName)
		return promMetrics
	}

	for _, dr := range msg.Data {
		labels := make(map[string]string)
		fields := make(map[string]float64)
		if msg.CustomTags != nil {
			labels = maps.Clone(msg.CustomTags)
		}
		labels["dbname"] = msg.DBName

		for k, v := range dr {
			if v == nil || v == "" || k == metrics.EpochColumnName {
				continue
			}

			if strings.HasPrefix(k, "tag_") {
				tag := strings.TrimPrefix(k, "tag_")
				labels[tag] = fmt.Sprintf("%v", v)
			} else {
				switch t := v.(type) {
				case string:
					labels[k] = t
				case int, int32, int64, float32, float64:
					f, err := strconv.ParseFloat(fmt.Sprintf("%v", v), 64)
					if err != nil {
						promw.logger.Debugf("skipping scraping column %s of [%s:%s]: %v", k, msg.DBName, msg.MetricName, err)
						continue // actually skip the column instead of storing a zero value
					}
					fields[k] = f
				case bool:
					fields[k] = map[bool]float64{true: 1, false: 0}[t]
				default:
					promw.logger.Debugf("skipping scraping column %s of [%s:%s], unsupported datatype: %v", k, msg.DBName, msg.MetricName, t)
					continue
				}
			}
		}

		labelKeys := make([]string, 0)
		labelValues := make([]string, 0)
		for k, v := range labels {
			labelKeys = append(labelKeys, k)
			labelValues = append(labelValues, v)
		}

		for field, value := range fields {
			fieldPromDataType := prometheus.CounterValue
			if msg.MetricName == promInstanceUpStateMetric ||
				len(gauges) > 0 && (gauges[0] == "*" || slices.Contains(gauges, field)) {
				fieldPromDataType = prometheus.GaugeValue
			}
			var desc *prometheus.Desc
			if promw.PrometheusNamespace != "" {
				if msg.MetricName == promInstanceUpStateMetric {
					desc = prometheus.NewDesc(fmt.Sprintf("%s_%s", promw.PrometheusNamespace, msg.MetricName),
						msg.MetricName, labelKeys, nil)
				} else {
					desc = prometheus.NewDesc(fmt.Sprintf("%s_%s_%s", promw.PrometheusNamespace, msg.MetricName, field),
						msg.MetricName, labelKeys, nil)
				}
			} else {
				if msg.MetricName == promInstanceUpStateMetric {
					desc = prometheus.NewDesc(field, msg.MetricName, labelKeys, nil)
				} else {
					desc = prometheus.NewDesc(fmt.Sprintf("%s_%s", msg.MetricName, field), msg.MetricName, labelKeys, nil)
				}
			}
			m := prometheus.MustNewConstMetric(desc, fieldPromDataType, value, labelValues...)
			promMetrics = append(promMetrics, prometheus.NewMetricWithTimestamp(epochTime, m))
		}
	}
	return promMetrics
}