1 package sinks
2
3 import (
4 "context"
5 "fmt"
6 "net"
7 "net/http"
8 "slices"
9 "strconv"
10 "strings"
11 "sync"
12 "time"
13
14 "github.com/cybertec-postgresql/pgwatch/v3/internal/log"
15 "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
16 "github.com/prometheus/client_golang/prometheus"
17 "github.com/prometheus/client_golang/prometheus/promhttp"
18 )
19
20
21
// PrometheusWriter is a pull-model measurements sink: instead of pushing data
// anywhere, it caches the most recently written measurements (see the
// package-level promAsyncMetricCache) and serves them over an HTTP endpoint
// when Prometheus scrapes. It also implements prometheus.Collector to expose
// the exporter's own self-metrics alongside the cached data.
type PrometheusWriter struct {
	// ctx carries the sink-scoped logger (set up in NewPrometheusWriter).
	ctx context.Context
	// lastScrapeErrors reports the error count of the most recent scrape.
	lastScrapeErrors prometheus.Gauge
	// totalScrapes / totalScrapeFailures are lifetime exporter counters.
	totalScrapes, totalScrapeFailures prometheus.Counter
	// PrometheusNamespace is prefixed to every exported metric name.
	PrometheusNamespace string
}
28
// promInstanceUpStateMetric is the name of the synthetic per-database
// liveness gauge emitted on every scrape (see setInstanceUpDownState).
const promInstanceUpStateMetric = "instance_up"

// promScrapingStalenessHardDropLimit is the maximum age of a cached
// measurement set; older sets are dropped (and purged from the cache) rather
// than served, so Prometheus never ingests stale samples.
const promScrapingStalenessHardDropLimit = time.Minute * time.Duration(10)
33
34 func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error) {
35 addr, namespace, found := strings.Cut(connstr, "/")
36 if !found {
37 namespace = "pgwatch"
38 }
39 l := log.GetLogger(ctx).WithField("sink", "prometheus").WithField("address", addr)
40 ctx = log.WithLogger(ctx, l)
41 promw = &PrometheusWriter{
42 ctx: ctx,
43 PrometheusNamespace: namespace,
44 lastScrapeErrors: prometheus.NewGauge(prometheus.GaugeOpts{
45 Namespace: namespace,
46 Name: "exporter_last_scrape_errors",
47 Help: "Last scrape error count for all monitored hosts / metrics",
48 }),
49 totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
50 Namespace: namespace,
51 Name: "exporter_total_scrapes",
52 Help: "Total scrape attempts.",
53 }),
54 totalScrapeFailures: prometheus.NewCounter(prometheus.CounterOpts{
55 Namespace: namespace,
56 Name: "exporter_total_scrape_failures",
57 Help: "Number of errors while executing metric queries",
58 }),
59 }
60
61 if err = prometheus.Register(promw); err != nil {
62 return
63 }
64 promServer := &http.Server{
65 Addr: addr,
66 Handler: promhttp.Handler(),
67 }
68
69 ln, err := net.Listen("tcp", promServer.Addr)
70 if err != nil {
71 return nil, err
72 }
73
74 go func() { log.GetLogger(ctx).Error(promServer.Serve(ln)) }()
75
76 l.Info(`measurements sink is activated`)
77 return
78 }
79
80 func (promw *PrometheusWriter) Write(msgs []metrics.MeasurementEnvelope) error {
81 if len(msgs) == 0 || len(msgs[0].Data) == 0 {
82 return nil
83 }
84 msg := msgs[0]
85 promw.PromAsyncCacheAddMetricData(msg.DBName, msg.MetricName, msgs)
86 return nil
87 }
88
89
// promAsyncMetricCache maps database name -> metric name -> latest measurement
// batch; it is filled by Write and drained read-only by Collect on scrape.
// All access must go through promAsyncMetricCacheLock.
var promAsyncMetricCache = make(map[string]map[string][]metrics.MeasurementEnvelope)
var promAsyncMetricCacheLock = sync.RWMutex{}
92
93 func (promw *PrometheusWriter) PromAsyncCacheAddMetricData(dbUnique, metric string, msgArr []metrics.MeasurementEnvelope) {
94 promAsyncMetricCacheLock.Lock()
95 defer promAsyncMetricCacheLock.Unlock()
96 if _, ok := promAsyncMetricCache[dbUnique]; ok {
97 promAsyncMetricCache[dbUnique][metric] = msgArr
98 }
99 }
100
101 func (promw *PrometheusWriter) PromAsyncCacheInitIfRequired(dbUnique, _ string) {
102 promAsyncMetricCacheLock.Lock()
103 defer promAsyncMetricCacheLock.Unlock()
104 if _, ok := promAsyncMetricCache[dbUnique]; !ok {
105 metricMap := make(map[string][]metrics.MeasurementEnvelope)
106 promAsyncMetricCache[dbUnique] = metricMap
107 }
108 }
109
110 func (promw *PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metric string) {
111 promAsyncMetricCacheLock.Lock()
112 defer promAsyncMetricCacheLock.Unlock()
113
114 if metric == "" {
115 delete(promAsyncMetricCache, dbUnique)
116 } else {
117 delete(promAsyncMetricCache[dbUnique], metric)
118 }
119 }
120
121 func (promw *PrometheusWriter) SyncMetric(dbUnique, metricName, op string) error {
122 switch op {
123 case "remove":
124 promw.PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metricName)
125 case "add":
126 promw.PromAsyncCacheInitIfRequired(dbUnique, metricName)
127 }
128 return nil
129 }
130
// Describe implements prometheus.Collector. Sending no descriptors makes this
// an "unchecked" collector, which lets Collect emit a metric set that varies
// between scrapes (the cache contents are dynamic).
func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc) {
}
133
134 func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric) {
135 var lastScrapeErrors float64
136 logger := log.GetLogger(promw.ctx)
137 promw.totalScrapes.Add(1)
138 ch <- promw.totalScrapes
139
140 if len(promAsyncMetricCache) == 0 {
141 logger.Warning("No dbs configured for monitoring. Check config")
142 ch <- promw.totalScrapeFailures
143 promw.lastScrapeErrors.Set(0)
144 ch <- promw.lastScrapeErrors
145 return
146 }
147
148 for dbname, metricsMessages := range promAsyncMetricCache {
149 promw.setInstanceUpDownState(ch, dbname)
150 for metric, metricMessages := range metricsMessages {
151 if metric == "change_events" {
152 continue
153 }
154 if len(metricMessages) > 0 {
155 promMetrics := promw.MetricStoreMessageToPromMetrics(metricMessages[0])
156 for _, pm := range promMetrics {
157 ch <- pm
158 }
159 }
160 }
161
162 }
163
164 ch <- promw.totalScrapeFailures
165 promw.lastScrapeErrors.Set(lastScrapeErrors)
166 ch <- promw.lastScrapeErrors
167
168
169 }
170
171 func (promw *PrometheusWriter) setInstanceUpDownState(ch chan<- prometheus.Metric, dbName string) {
172 logger := log.GetLogger(promw.ctx)
173 data := metrics.NewMeasurement(time.Now().UnixNano())
174 data[promInstanceUpStateMetric] = 1
175
176 pm := promw.MetricStoreMessageToPromMetrics(metrics.MeasurementEnvelope{
177 DBName: dbName,
178 SourceType: "postgres",
179 MetricName: promInstanceUpStateMetric,
180 CustomTags: nil,
181 Data: metrics.Measurements{data},
182 RealDbname: dbName,
183 SystemIdentifier: dbName,
184 })
185
186 if len(pm) > 0 {
187 ch <- pm[0]
188 } else {
189 logger.Error("Could not formulate an instance state report - should not happen")
190 }
191 }
192
193 func (promw *PrometheusWriter) MetricStoreMessageToPromMetrics(msg metrics.MeasurementEnvelope) []prometheus.Metric {
194 promMetrics := make([]prometheus.Metric, 0)
195 logger := log.GetLogger(promw.ctx)
196 var epochTime time.Time
197
198 if len(msg.Data) == 0 {
199 return promMetrics
200 }
201
202 epochTime = time.Unix(0, msg.Data.GetEpoch())
203
204 if epochTime.Before(time.Now().Add(-promScrapingStalenessHardDropLimit)) {
205 logger.Warningf("[%s][%s] Dropping metric set due to staleness (>%v) ...", msg.DBName, msg.MetricName, promScrapingStalenessHardDropLimit)
206 promw.PurgeMetricsFromPromAsyncCacheIfAny(msg.DBName, msg.MetricName)
207 return promMetrics
208 }
209
210 for _, dr := range msg.Data {
211 labels := make(map[string]string)
212 fields := make(map[string]float64)
213 labels["dbname"] = msg.DBName
214
215 for k, v := range dr {
216 if v == nil || v == "" || k == metrics.EpochColumnName {
217 continue
218 }
219
220 if strings.HasPrefix(k, "tag_") {
221 tag := k[4:]
222 labels[tag] = fmt.Sprintf("%v", v)
223 } else {
224 switch t := v.(type) {
225 case int, int32, int64, float32, float64:
226 f, err := strconv.ParseFloat(fmt.Sprintf("%v", v), 64)
227 if err != nil {
228 logger.Debugf("skipping scraping column %s of [%s:%s]: %v", k, msg.DBName, msg.MetricName, err)
229 }
230 fields[k] = f
231 case bool:
232 fields[k] = map[bool]float64{true: 1, false: 0}[t]
233 default:
234 logger.Debugf("skipping scraping column %s of [%s:%s], unsupported datatype: %v", k, msg.DBName, msg.MetricName, t)
235 continue
236 }
237 }
238 }
239 if msg.CustomTags != nil {
240 for k, v := range msg.CustomTags {
241 labels[k] = fmt.Sprintf("%v", v)
242 }
243 }
244
245 labelKeys := make([]string, 0)
246 labelValues := make([]string, 0)
247 for k, v := range labels {
248 labelKeys = append(labelKeys, k)
249 labelValues = append(labelValues, v)
250 }
251
252 for field, value := range fields {
253 fieldPromDataType := prometheus.CounterValue
254 if msg.MetricName == promInstanceUpStateMetric ||
255 len(msg.MetricDef.Gauges) > 0 &&
256 (msg.MetricDef.Gauges[0] == "*" || slices.Contains(msg.MetricDef.Gauges, field)) {
257 fieldPromDataType = prometheus.GaugeValue
258 }
259 var desc *prometheus.Desc
260 if promw.PrometheusNamespace != "" {
261 if msg.MetricName == promInstanceUpStateMetric {
262 desc = prometheus.NewDesc(fmt.Sprintf("%s_%s", promw.PrometheusNamespace, msg.MetricName),
263 msg.MetricName, labelKeys, nil)
264 } else {
265 desc = prometheus.NewDesc(fmt.Sprintf("%s_%s_%s", promw.PrometheusNamespace, msg.MetricName, field),
266 msg.MetricName, labelKeys, nil)
267 }
268 } else {
269 if msg.MetricName == promInstanceUpStateMetric {
270 desc = prometheus.NewDesc(field, msg.MetricName, labelKeys, nil)
271 } else {
272 desc = prometheus.NewDesc(fmt.Sprintf("%s_%s", msg.MetricName, field), msg.MetricName, labelKeys, nil)
273 }
274 }
275 m := prometheus.MustNewConstMetric(desc, fieldPromDataType, value, labelValues...)
276 promMetrics = append(promMetrics, prometheus.NewMetricWithTimestamp(epochTime, m))
277 }
278 }
279 return promMetrics
280 }
281