...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/sinks/prometheus.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/sinks

     1  package sinks
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"maps"
     7  	"net"
     8  	"net/http"
     9  	"slices"
    10  	"strconv"
    11  	"strings"
    12  	"sync"
    13  	"time"
    14  
    15  	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
    16  	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
    17  	"github.com/prometheus/client_golang/prometheus"
    18  	"github.com/prometheus/client_golang/prometheus/promhttp"
    19  )
    20  
    21  // PrometheusWriter is a sink that allows to expose metric measurements to Prometheus scrapper.
    22  // Prometheus collects metrics data from pgwatch by scraping metrics HTTP endpoints.
    23  type PrometheusWriter struct {
    24  	logger                            log.Logger
    25  	ctx                               context.Context
    26  	lastScrapeErrors                  prometheus.Gauge
    27  	totalScrapes, totalScrapeFailures prometheus.Counter
    28  	PrometheusNamespace               string
    29  	gauges                            map[string]([]string) // map of metric names to their gauge names, used for Prometheus gauge metrics
    30  }
    31  
    32  const promInstanceUpStateMetric = "instance_up"
    33  
    34  // timestamps older than that will be ignored on the Prom scraper side anyway, so better don't emit at all and just log a notice
    35  const promScrapingStalenessHardDropLimit = time.Minute * time.Duration(10)
    36  
// Println implements the promhttp.Logger interface so that errors reported by
// the scrape HTTP handler are forwarded to the sink's logger at error level.
func (promw *PrometheusWriter) Println(v ...any) {
	promw.logger.Errorln(v...)
}
    40  
    41  func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error) {
    42  	addr, namespace, found := strings.Cut(connstr, "/")
    43  	if !found {
    44  		namespace = "pgwatch"
    45  	}
    46  	l := log.GetLogger(ctx).WithField("sink", "prometheus").WithField("address", addr)
    47  	ctx = log.WithLogger(ctx, l)
    48  	promw = &PrometheusWriter{
    49  		ctx:                 ctx,
    50  		logger:              l,
    51  		PrometheusNamespace: namespace,
    52  		lastScrapeErrors: prometheus.NewGauge(prometheus.GaugeOpts{
    53  			Namespace: namespace,
    54  			Name:      "exporter_last_scrape_errors",
    55  			Help:      "Last scrape error count for all monitored hosts / metrics",
    56  		}),
    57  		totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
    58  			Namespace: namespace,
    59  			Name:      "exporter_total_scrapes",
    60  			Help:      "Total scrape attempts.",
    61  		}),
    62  		totalScrapeFailures: prometheus.NewCounter(prometheus.CounterOpts{
    63  			Namespace: namespace,
    64  			Name:      "exporter_total_scrape_failures",
    65  			Help:      "Number of errors while executing metric queries",
    66  		}),
    67  	}
    68  
    69  	if err = prometheus.Register(promw); err != nil {
    70  		return
    71  	}
    72  
    73  	promServer := &http.Server{
    74  		Addr: addr,
    75  		Handler: promhttp.HandlerFor(
    76  			prometheus.DefaultGatherer,
    77  			promhttp.HandlerOpts{
    78  				ErrorLog:      promw,
    79  				ErrorHandling: promhttp.ContinueOnError,
    80  			},
    81  		),
    82  	}
    83  
    84  	ln, err := net.Listen("tcp", promServer.Addr)
    85  	if err != nil {
    86  		return nil, err
    87  	}
    88  
    89  	go func() { log.GetLogger(ctx).Error(promServer.Serve(ln)) }()
    90  
    91  	l.Info(`measurements sink is activated`)
    92  	return
    93  }
    94  
    95  func (promw *PrometheusWriter) DefineMetrics(metrics *metrics.Metrics) (err error) {
    96  	promw.gauges = make(map[string]([]string))
    97  	for name, m := range metrics.MetricDefs {
    98  		promw.gauges[name] = m.Gauges
    99  	}
   100  	return nil
   101  }
   102  
   103  func (promw *PrometheusWriter) Write(msg metrics.MeasurementEnvelope) error {
   104  	if len(msg.Data) == 0 { // no batching in async prom mode, so using 0 indexing ok
   105  		return nil
   106  	}
   107  	promw.PromAsyncCacheAddMetricData(msg.DBName, msg.MetricName, msg)
   108  	return nil
   109  }
   110  
   111  // Async Prom cache
   112  var promAsyncMetricCache = make(map[string]map[string]metrics.MeasurementEnvelope) // [dbUnique][metric]lastly_fetched_data
   113  var promAsyncMetricCacheLock = sync.RWMutex{}
   114  
   115  func (promw *PrometheusWriter) PromAsyncCacheAddMetricData(dbUnique, metric string, msgArr metrics.MeasurementEnvelope) { // cache structure: [dbUnique][metric]lastly_fetched_data
   116  	promAsyncMetricCacheLock.Lock()
   117  	defer promAsyncMetricCacheLock.Unlock()
   118  	if _, ok := promAsyncMetricCache[dbUnique]; ok {
   119  		promAsyncMetricCache[dbUnique][metric] = msgArr
   120  	}
   121  }
   122  
   123  func (promw *PrometheusWriter) PromAsyncCacheInitIfRequired(dbUnique, _ string) { // cache structure: [dbUnique][metric]lastly_fetched_data
   124  	promAsyncMetricCacheLock.Lock()
   125  	defer promAsyncMetricCacheLock.Unlock()
   126  	if _, ok := promAsyncMetricCache[dbUnique]; !ok {
   127  		metricMap := make(map[string]metrics.MeasurementEnvelope)
   128  		promAsyncMetricCache[dbUnique] = metricMap
   129  	}
   130  }
   131  
   132  func (promw *PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metric string) {
   133  	promAsyncMetricCacheLock.Lock()
   134  	defer promAsyncMetricCacheLock.Unlock()
   135  
   136  	if metric == "" {
   137  		delete(promAsyncMetricCache, dbUnique) // whole host removed from config
   138  	} else {
   139  		delete(promAsyncMetricCache[dbUnique], metric)
   140  	}
   141  }
   142  
   143  func (promw *PrometheusWriter) SyncMetric(dbUnique, metricName string, op SyncOp) error {
   144  	switch op {
   145  	case DeleteOp:
   146  		promw.PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metricName)
   147  	case AddOp:
   148  		promw.PromAsyncCacheInitIfRequired(dbUnique, metricName)
   149  	}
   150  	return nil
   151  }
   152  
// Describe implements prometheus.Collector. It intentionally sends no
// descriptors, which makes this an "unchecked" collector: the set of exported
// metrics may vary from scrape to scrape depending on the cached measurements.
func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc) {
}
   155  
   156  func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric) {
   157  	var rows int
   158  	var lastScrapeErrors float64
   159  	promw.totalScrapes.Add(1)
   160  	ch <- promw.totalScrapes
   161  
   162  	if len(promAsyncMetricCache) == 0 {
   163  		promw.logger.Warning("No dbs configured for monitoring. Check config")
   164  		ch <- promw.totalScrapeFailures
   165  		promw.lastScrapeErrors.Set(0)
   166  		ch <- promw.lastScrapeErrors
   167  		return
   168  	}
   169  	t1 := time.Now()
   170  	for dbname, metricsMessages := range promAsyncMetricCache {
   171  		for metric, metricMessages := range metricsMessages {
   172  			if metric == "change_events" {
   173  				continue // not supported
   174  			}
   175  			promMetrics := promw.MetricStoreMessageToPromMetrics(metricMessages)
   176  			rows += len(promMetrics)
   177  			for _, pm := range promMetrics { // collect & send later in batch? capMetricChan = 1000 limit in prometheus code
   178  				ch <- pm
   179  			}
   180  		}
   181  		promAsyncMetricCacheLock.Lock()
   182  		promAsyncMetricCache[dbname] = make(map[string]metrics.MeasurementEnvelope) // clear the cache for this db after metrics are collected
   183  		promAsyncMetricCacheLock.Unlock()
   184  	}
   185  	promw.logger.WithField("count", rows).WithField("elapsed", time.Since(t1)).Info("measurements written")
   186  	ch <- promw.totalScrapeFailures
   187  	promw.lastScrapeErrors.Set(lastScrapeErrors)
   188  	ch <- promw.lastScrapeErrors
   189  }
   190  
   191  func (promw *PrometheusWriter) MetricStoreMessageToPromMetrics(msg metrics.MeasurementEnvelope) []prometheus.Metric {
   192  	promMetrics := make([]prometheus.Metric, 0)
   193  	var epochTime time.Time
   194  	if len(msg.Data) == 0 {
   195  		return promMetrics
   196  	}
   197  
   198  	gauges := promw.gauges[msg.MetricName]
   199  
   200  	epochTime = time.Unix(0, msg.Data.GetEpoch())
   201  
   202  	if epochTime.Before(time.Now().Add(-promScrapingStalenessHardDropLimit)) {
   203  		promw.logger.Warningf("Dropping metric %s:%s cache set due to staleness (>%v)...", msg.DBName, msg.MetricName, promScrapingStalenessHardDropLimit)
   204  		promw.PurgeMetricsFromPromAsyncCacheIfAny(msg.DBName, msg.MetricName)
   205  		return promMetrics
   206  	}
   207  
   208  	for _, dr := range msg.Data {
   209  		labels := make(map[string]string)
   210  		fields := make(map[string]float64)
   211  		if msg.CustomTags != nil {
   212  			labels = maps.Clone(msg.CustomTags)
   213  		}
   214  		labels["dbname"] = msg.DBName
   215  
   216  		for k, v := range dr {
   217  			if v == nil || v == "" || k == metrics.EpochColumnName {
   218  				continue // not storing NULLs. epoch checked/assigned once
   219  			}
   220  
   221  			if strings.HasPrefix(k, "tag_") {
   222  				tag := k[4:]
   223  				labels[tag] = fmt.Sprintf("%v", v)
   224  			} else {
   225  				switch t := v.(type) {
   226  				case string:
   227  					labels[k] = t
   228  				case int, int32, int64, float32, float64:
   229  					f, err := strconv.ParseFloat(fmt.Sprintf("%v", v), 64)
   230  					if err != nil {
   231  						promw.logger.Debugf("skipping scraping column %s of [%s:%s]: %v", k, msg.DBName, msg.MetricName, err)
   232  					}
   233  					fields[k] = f
   234  				case bool:
   235  					fields[k] = map[bool]float64{true: 1, false: 0}[t]
   236  				default:
   237  					promw.logger.Debugf("skipping scraping column %s of [%s:%s], unsupported datatype: %v", k, msg.DBName, msg.MetricName, t)
   238  					continue
   239  				}
   240  			}
   241  		}
   242  
   243  		labelKeys := make([]string, 0)
   244  		labelValues := make([]string, 0)
   245  		for k, v := range labels {
   246  			labelKeys = append(labelKeys, k)
   247  			labelValues = append(labelValues, v)
   248  		}
   249  
   250  		for field, value := range fields {
   251  			fieldPromDataType := prometheus.CounterValue
   252  			if msg.MetricName == promInstanceUpStateMetric ||
   253  				len(gauges) > 0 && (gauges[0] == "*" || slices.Contains(gauges, field)) {
   254  				fieldPromDataType = prometheus.GaugeValue
   255  			}
   256  			var desc *prometheus.Desc
   257  			if promw.PrometheusNamespace != "" {
   258  				if msg.MetricName == promInstanceUpStateMetric { // handle the special "instance_up" check
   259  					desc = prometheus.NewDesc(fmt.Sprintf("%s_%s", promw.PrometheusNamespace, msg.MetricName),
   260  						msg.MetricName, labelKeys, nil)
   261  				} else {
   262  					desc = prometheus.NewDesc(fmt.Sprintf("%s_%s_%s", promw.PrometheusNamespace, msg.MetricName, field),
   263  						msg.MetricName, labelKeys, nil)
   264  				}
   265  			} else {
   266  				if msg.MetricName == promInstanceUpStateMetric { // handle the special "instance_up" check
   267  					desc = prometheus.NewDesc(field, msg.MetricName, labelKeys, nil)
   268  				} else {
   269  					desc = prometheus.NewDesc(fmt.Sprintf("%s_%s", msg.MetricName, field), msg.MetricName, labelKeys, nil)
   270  				}
   271  			}
   272  			m := prometheus.MustNewConstMetric(desc, fieldPromDataType, value, labelValues...)
   273  			promMetrics = append(promMetrics, prometheus.NewMetricWithTimestamp(epochTime, m))
   274  		}
   275  	}
   276  	return promMetrics
   277  }
   278