...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/sinks/prometheus.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/sinks

     1  package sinks
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"maps"
     7  	"net"
     8  	"net/http"
     9  	"slices"
    10  	"strconv"
    11  	"strings"
    12  	"sync"
    13  	"time"
    14  
    15  	"github.com/cybertec-postgresql/pgwatch/v5/internal/log"
    16  	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
    17  	"github.com/prometheus/client_golang/prometheus"
    18  	"github.com/prometheus/client_golang/prometheus/promhttp"
    19  )
    20  
    21  type PromMetricCache = map[string]map[string]metrics.MeasurementEnvelope // [dbUnique][metric]lastly_fetched_data
    22  
    23  // PrometheusWriter is a sink that allows to expose metric measurements to Prometheus scrapper.
    24  // Prometheus collects metrics data from pgwatch by scraping metrics HTTP endpoints.
    25  type PrometheusWriter struct {
    26  	sync.RWMutex
    27  	logger              log.Logger
    28  	ctx                 context.Context
    29  	lastScrapeErrors    prometheus.Gauge
    30  	totalScrapes        prometheus.Counter
    31  	totalScrapeFailures prometheus.Counter
    32  	gauges              map[string]([]string) // map of metric names to their gauge names, used for Prometheus gauge metrics
    33  	Namespace           string
    34  	Cache               PromMetricCache // [dbUnique][metric]lastly_fetched_data
    35  }
    36  
    37  const promInstanceUpStateMetric = "instance_up"
    38  
    39  // timestamps older than that will be ignored on the Prom scraper side anyway, so better don't emit at all and just log a notice
    40  const promScrapingStalenessHardDropLimit = time.Minute * time.Duration(10)
    41  
    42  func (promw *PrometheusWriter) Println(v ...any) {
    43  	promw.logger.Errorln(v...)
    44  }
    45  
    46  func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error) {
    47  	addr, namespace, found := strings.Cut(connstr, "/")
    48  	if !found {
    49  		namespace = "pgwatch"
    50  	}
    51  	l := log.GetLogger(ctx).WithField("sink", "prometheus").WithField("address", addr)
    52  	ctx = log.WithLogger(ctx, l)
    53  	promw = &PrometheusWriter{
    54  		ctx:       ctx,
    55  		logger:    l,
    56  		Namespace: namespace,
    57  		Cache:     make(PromMetricCache),
    58  		lastScrapeErrors: prometheus.NewGauge(prometheus.GaugeOpts{
    59  			Namespace: namespace,
    60  			Name:      "exporter_last_scrape_errors",
    61  			Help:      "Last scrape error count for all monitored hosts / metrics",
    62  		}),
    63  		totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
    64  			Namespace: namespace,
    65  			Name:      "exporter_total_scrapes",
    66  			Help:      "Total scrape attempts.",
    67  		}),
    68  		totalScrapeFailures: prometheus.NewCounter(prometheus.CounterOpts{
    69  			Namespace: namespace,
    70  			Name:      "exporter_total_scrape_failures",
    71  			Help:      "Number of errors while executing metric queries",
    72  		}),
    73  	}
    74  
    75  	if err = prometheus.Register(promw); err != nil {
    76  		return
    77  	}
    78  
    79  	promServer := &http.Server{
    80  		Addr: addr,
    81  		Handler: promhttp.HandlerFor(
    82  			prometheus.DefaultGatherer,
    83  			promhttp.HandlerOpts{
    84  				ErrorLog:      promw,
    85  				ErrorHandling: promhttp.ContinueOnError,
    86  			},
    87  		),
    88  	}
    89  
    90  	ln, err := net.Listen("tcp", promServer.Addr)
    91  	if err != nil {
    92  		return nil, err
    93  	}
    94  
    95  	go func() { log.GetLogger(ctx).Error(promServer.Serve(ln)) }()
    96  
    97  	l.Info(`measurements sink is activated`)
    98  	return
    99  }
   100  
   101  func (promw *PrometheusWriter) DefineMetrics(metrics *metrics.Metrics) (err error) {
   102  	promw.gauges = make(map[string]([]string))
   103  	for name, m := range metrics.MetricDefs {
   104  		promw.gauges[name] = m.Gauges
   105  	}
   106  	return nil
   107  }
   108  
   109  func (promw *PrometheusWriter) Write(msg metrics.MeasurementEnvelope) error {
   110  	if len(msg.Data) == 0 { // no batching in async prom mode, so using 0 indexing ok
   111  		return nil
   112  	}
   113  	promw.PromAsyncCacheAddMetricData(msg.DBName, msg.MetricName, msg)
   114  	return nil
   115  }
   116  
   117  func (promw *PrometheusWriter) PromAsyncCacheAddMetricData(dbUnique, metric string, msgArr metrics.MeasurementEnvelope) { // cache structure: [dbUnique][metric]lastly_fetched_data
   118  	promw.Lock()
   119  	defer promw.Unlock()
   120  	if _, ok := promw.Cache[dbUnique]; ok {
   121  		promw.Cache[dbUnique][metric] = msgArr
   122  	}
   123  }
   124  
   125  func (promw *PrometheusWriter) PromAsyncCacheInitIfRequired(dbUnique, _ string) { // cache structure: [dbUnique][metric]lastly_fetched_data
   126  	promw.Lock()
   127  	defer promw.Unlock()
   128  	if _, ok := promw.Cache[dbUnique]; !ok {
   129  		promw.Cache[dbUnique] = make(map[string]metrics.MeasurementEnvelope)
   130  	}
   131  }
   132  
   133  func (promw *PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metric string) {
   134  	promw.Lock()
   135  	defer promw.Unlock()
   136  
   137  	if metric == "" {
   138  		delete(promw.Cache, dbUnique) // whole host removed from config
   139  	} else {
   140  		delete(promw.Cache[dbUnique], metric)
   141  	}
   142  }
   143  
   144  func (promw *PrometheusWriter) SyncMetric(sourceName, metricName string, op SyncOp) error {
   145  	switch op {
   146  	case DeleteOp:
   147  		promw.PurgeMetricsFromPromAsyncCacheIfAny(sourceName, metricName)
   148  	case AddOp:
   149  		promw.PromAsyncCacheInitIfRequired(sourceName, metricName)
   150  	}
   151  	return nil
   152  }
   153  
   154  func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc) {
   155  }
   156  
   157  func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric) {
   158  	var rows int
   159  	var lastScrapeErrors float64
   160  	promw.totalScrapes.Add(1)
   161  	ch <- promw.totalScrapes
   162  
   163  	promw.Lock()
   164  	if len(promw.Cache) == 0 {
   165  		promw.Unlock()
   166  		promw.logger.Warning("No dbs configured for monitoring. Check config")
   167  		ch <- promw.totalScrapeFailures
   168  		promw.lastScrapeErrors.Set(0)
   169  		ch <- promw.lastScrapeErrors
   170  		return
   171  	}
   172  	snapshot := promw.Cache
   173  	promw.Cache = make(PromMetricCache, len(snapshot))
   174  	for dbUnique := range snapshot {
   175  		promw.Cache[dbUnique] = make(map[string]metrics.MeasurementEnvelope)
   176  	}
   177  	promw.Unlock()
   178  
   179  	t1 := time.Now()
   180  	for _, metricsMessages := range snapshot {
   181  		for metric, metricMessages := range metricsMessages {
   182  			if metric == "change_events" {
   183  				continue // not supported
   184  			}
   185  			promMetrics := promw.MetricStoreMessageToPromMetrics(metricMessages)
   186  			rows += len(promMetrics)
   187  			for _, pm := range promMetrics { // collect & send later in batch? capMetricChan = 1000 limit in prometheus code
   188  				ch <- pm
   189  			}
   190  		}
   191  	}
   192  	promw.logger.WithField("count", rows).WithField("elapsed", time.Since(t1)).Info("measurements written")
   193  	ch <- promw.totalScrapeFailures
   194  	promw.lastScrapeErrors.Set(lastScrapeErrors)
   195  	ch <- promw.lastScrapeErrors
   196  }
   197  
   198  func (promw *PrometheusWriter) MetricStoreMessageToPromMetrics(msg metrics.MeasurementEnvelope) []prometheus.Metric {
   199  	promMetrics := make([]prometheus.Metric, 0)
   200  	var epochTime time.Time
   201  	if len(msg.Data) == 0 {
   202  		return promMetrics
   203  	}
   204  
   205  	gauges := promw.gauges[msg.MetricName]
   206  
   207  	epochTime = time.Unix(0, msg.Data.GetEpoch())
   208  
   209  	if epochTime.Before(time.Now().Add(-promScrapingStalenessHardDropLimit)) {
   210  		promw.logger.Warningf("Dropping metric %s:%s cache set due to staleness (>%v)...", msg.DBName, msg.MetricName, promScrapingStalenessHardDropLimit)
   211  		promw.PurgeMetricsFromPromAsyncCacheIfAny(msg.DBName, msg.MetricName)
   212  		return promMetrics
   213  	}
   214  
   215  	for _, dr := range msg.Data {
   216  		labels := make(map[string]string)
   217  		fields := make(map[string]float64)
   218  		if msg.CustomTags != nil {
   219  			labels = maps.Clone(msg.CustomTags)
   220  		}
   221  		labels["dbname"] = msg.DBName
   222  
   223  		for k, v := range dr {
   224  			if v == nil || v == "" || k == metrics.EpochColumnName {
   225  				continue // not storing NULLs. epoch checked/assigned once
   226  			}
   227  
   228  			if strings.HasPrefix(k, "tag_") {
   229  				tag := k[4:]
   230  				labels[tag] = fmt.Sprintf("%v", v)
   231  			} else {
   232  				switch t := v.(type) {
   233  				case string:
   234  					labels[k] = t
   235  				case int, int32, int64, float32, float64:
   236  					f, err := strconv.ParseFloat(fmt.Sprintf("%v", v), 64)
   237  					if err != nil {
   238  						promw.logger.Debugf("skipping scraping column %s of [%s:%s]: %v", k, msg.DBName, msg.MetricName, err)
   239  					}
   240  					fields[k] = f
   241  				case bool:
   242  					fields[k] = map[bool]float64{true: 1, false: 0}[t]
   243  				default:
   244  					promw.logger.Debugf("skipping scraping column %s of [%s:%s], unsupported datatype: %v", k, msg.DBName, msg.MetricName, t)
   245  					continue
   246  				}
   247  			}
   248  		}
   249  
   250  		labelKeys := make([]string, 0)
   251  		labelValues := make([]string, 0)
   252  		for k, v := range labels {
   253  			labelKeys = append(labelKeys, k)
   254  			labelValues = append(labelValues, v)
   255  		}
   256  
   257  		for field, value := range fields {
   258  			fieldPromDataType := prometheus.CounterValue
   259  			if msg.MetricName == promInstanceUpStateMetric ||
   260  				len(gauges) > 0 && (gauges[0] == "*" || slices.Contains(gauges, field)) {
   261  				fieldPromDataType = prometheus.GaugeValue
   262  			}
   263  			var desc *prometheus.Desc
   264  			if promw.Namespace != "" {
   265  				if msg.MetricName == promInstanceUpStateMetric { // handle the special "instance_up" check
   266  					desc = prometheus.NewDesc(fmt.Sprintf("%s_%s", promw.Namespace, msg.MetricName),
   267  						msg.MetricName, labelKeys, nil)
   268  				} else {
   269  					desc = prometheus.NewDesc(fmt.Sprintf("%s_%s_%s", promw.Namespace, msg.MetricName, field),
   270  						msg.MetricName, labelKeys, nil)
   271  				}
   272  			} else {
   273  				if msg.MetricName == promInstanceUpStateMetric { // handle the special "instance_up" check
   274  					desc = prometheus.NewDesc(field, msg.MetricName, labelKeys, nil)
   275  				} else {
   276  					desc = prometheus.NewDesc(fmt.Sprintf("%s_%s", msg.MetricName, field), msg.MetricName, labelKeys, nil)
   277  				}
   278  			}
   279  			m := prometheus.MustNewConstMetric(desc, fieldPromDataType, value, labelValues...)
   280  			promMetrics = append(promMetrics, prometheus.NewMetricWithTimestamp(epochTime, m))
   281  		}
   282  	}
   283  	return promMetrics
   284  }
   285