...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/sinks/prometheus.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/sinks

     1  package sinks
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"maps"
     7  	"net"
     8  	"net/http"
     9  	"slices"
    10  	"strconv"
    11  	"strings"
    12  	"sync"
    13  	"time"
    14  
    15  	"github.com/cybertec-postgresql/pgwatch/v5/internal/log"
    16  	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
    17  	"github.com/prometheus/client_golang/prometheus"
    18  	"github.com/prometheus/client_golang/prometheus/promhttp"
    19  )
    20  
// PromMetricCache holds the most recently fetched measurement envelope per
// monitored database and metric name.
type PromMetricCache = map[string]map[string]metrics.MeasurementEnvelope // [dbUnique][metric]lastly_fetched_data

// PrometheusWriter is a Prometheus exporter that implements the prometheus.Collector
// interface using the "unchecked collector" pattern (empty Describe method).
//
// Design decisions based on Prometheus exporter guidelines
// (https://prometheus.io/docs/instrumenting/writing_exporters/#collectors):
//
//   - Metrics are collected periodically by reaper and cached in-memory.
//     On scrape, the collector reads a snapshot of the cache
//     and emits fresh NewConstMetric values. The cache is NOT consumed on
//     scrape, so parallel or back-to-back scrapes see the same data until the
//     next Write() updates arrive.
//
//   - This is an "unchecked collector": Describe() sends no descriptors, which
//     tells the Prometheus registry to skip consistency checks. This is necessary
//     because the set of metrics is dynamic (driven by monitored databases and
//     their query results). Safety is ensured by deduplicating metric identities
//     within each Collect() call.
//
//   - Label keys are always sorted lexicographically before building descriptors
//     and label value slices. This guarantees deterministic descriptor identity
//     regardless of Go map iteration order.
type PrometheusWriter struct {
	sync.RWMutex                        // guards gauges and Cache (writers Lock, scrapers RLock)
	logger    log.Logger                // sink-scoped logger (fields: sink, address)
	ctx       context.Context           // lifetime context the writer was created with
	gauges    map[string]([]string) // map of metric names to their gauge column names
	Namespace string                    // prefix for all exported metric names
	Cache     PromMetricCache // [dbUnique][metric]lastly_fetched_data

	// Self-instrumentation metrics
	lastScrapeErrors    prometheus.Gauge
	totalScrapes        prometheus.Counter
	totalScrapeFailures prometheus.Counter
}
    57  
// promInstanceUpStateMetric is the synthetic up/down metric name; it is
// treated specially on export (gauge type, no per-field name suffix).
const promInstanceUpStateMetric = "instance_up"

// timestamps older than that will be ignored on the Prom scraper side anyway, so better don't emit at all and just log a notice
const promCacheTTL = time.Minute * time.Duration(10)
    62  
    63  func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error) {
    64  	addr, namespace, found := strings.Cut(connstr, "/")
    65  	if !found || namespace == "" {
    66  		namespace = "pgwatch"
    67  	}
    68  	l := log.GetLogger(ctx).WithField("sink", "prometheus").WithField("address", addr)
    69  	ctx = log.WithLogger(ctx, l)
    70  
    71  	promw = &PrometheusWriter{
    72  		ctx:       ctx,
    73  		logger:    l,
    74  		Namespace: namespace,
    75  		Cache:     make(PromMetricCache),
    76  		lastScrapeErrors: prometheus.NewGauge(prometheus.GaugeOpts{
    77  			Namespace: namespace,
    78  			Name:      "exporter_last_scrape_errors",
    79  			Help:      "Last scrape error count for all monitored hosts / metrics",
    80  		}),
    81  		totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
    82  			Namespace: namespace,
    83  			Name:      "exporter_total_scrapes",
    84  			Help:      "Total scrape attempts.",
    85  		}),
    86  		totalScrapeFailures: prometheus.NewCounter(prometheus.CounterOpts{
    87  			Namespace: namespace,
    88  			Name:      "exporter_total_scrape_failures",
    89  			Help:      "Number of errors while executing metric queries",
    90  		}),
    91  	}
    92  
    93  	if err = prometheus.Register(promw); err != nil {
    94  		return
    95  	}
    96  
    97  	promServer := &http.Server{
    98  		Addr: addr,
    99  		Handler: promhttp.HandlerFor(
   100  			prometheus.DefaultGatherer,
   101  			promhttp.HandlerOpts{
   102  				ErrorLog:      promw,
   103  				ErrorHandling: promhttp.ContinueOnError,
   104  			},
   105  		),
   106  	}
   107  
   108  	ln, err := net.Listen("tcp", promServer.Addr)
   109  	if err != nil {
   110  		return nil, err
   111  	}
   112  
   113  	go func() { log.GetLogger(ctx).Error(promServer.Serve(ln)) }()
   114  
   115  	l.Info(`measurements sink is activated`)
   116  	return
   117  }
   118  
// Println implements promhttp.Logger so the writer itself can serve as the
// promhttp handler's ErrorLog; messages are forwarded at error level.
func (promw *PrometheusWriter) Println(v ...any) {
	promw.logger.Errorln(v...)
}
   123  
   124  // DefineMetrics is called by reaper on startup and whenever metric definitions change
   125  func (promw *PrometheusWriter) DefineMetrics(metrics *metrics.Metrics) (err error) {
   126  	promw.Lock()
   127  	defer promw.Unlock()
   128  	promw.gauges = make(map[string]([]string))
   129  	for name, m := range metrics.MetricDefs {
   130  		promw.gauges[name] = m.Gauges
   131  	}
   132  	return nil
   133  }
   134  
   135  // Write is called by reaper whenever new measurement data arrives
   136  func (promw *PrometheusWriter) Write(msg metrics.MeasurementEnvelope) error {
   137  	if len(msg.Data) == 0 {
   138  		return nil
   139  	}
   140  	promw.AddCacheEntry(msg.DBName, msg.MetricName, msg)
   141  	return nil
   142  }
   143  
   144  // SyncMetric is called by reaper when a metric or monitored source is removed or added,
   145  // allowing the writer to purge or initialize cache entries as needed
   146  func (promw *PrometheusWriter) SyncMetric(sourceName, metricName string, op SyncOp) error {
   147  	switch op {
   148  	case DeleteOp:
   149  		promw.PurgeCacheEntry(sourceName, metricName)
   150  	case AddOp:
   151  		promw.InitCacheEntry(sourceName)
   152  	}
   153  	return nil
   154  }
   155  
// notSupportedMetrics lists metric names that are never exported to
// Prometheus; AddCacheEntry drops envelopes for these up front.
var notSupportedMetrics = map[string]struct{}{
	"change_events":     {}, // fully consist of text columns
	"pgbouncer_stats":   {}, // this and below metrics column names cannot be renamed with tag_ prefix
	"pgbouncer_clients": {},
	"pgpool_processes":  {},
	"pgpool_stats":      {},
}
   163  
   164  func (promw *PrometheusWriter) AddCacheEntry(dbUnique, metric string, msgArr metrics.MeasurementEnvelope) { // cache structure: [dbUnique][metric]lastly_fetched_data
   165  	if _, ok := notSupportedMetrics[metric]; ok {
   166  		return // not supported
   167  	}
   168  	promw.Lock()
   169  	defer promw.Unlock()
   170  	if _, ok := promw.Cache[dbUnique]; !ok {
   171  		promw.Cache[dbUnique] = make(map[string]metrics.MeasurementEnvelope)
   172  	}
   173  	promw.Cache[dbUnique][metric] = msgArr
   174  }
   175  
   176  func (promw *PrometheusWriter) InitCacheEntry(dbUnique string) {
   177  	promw.Lock()
   178  	defer promw.Unlock()
   179  	if _, ok := promw.Cache[dbUnique]; !ok {
   180  		promw.Cache[dbUnique] = make(map[string]metrics.MeasurementEnvelope)
   181  	}
   182  }
   183  
   184  func (promw *PrometheusWriter) PurgeCacheEntry(dbUnique, metric string) {
   185  	promw.Lock()
   186  	defer promw.Unlock()
   187  	if metric == "" {
   188  		delete(promw.Cache, dbUnique) // whole host removed from config
   189  		return
   190  	}
   191  	delete(promw.Cache[dbUnique], metric)
   192  }
   193  
// Describe is intentionally empty to make PrometheusWriter an "unchecked
// collector" per the prometheus.Collector contract: announcing no descriptors
// tells the registry to skip consistency checks, which is required because
// the exported metric set is dynamic.
func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc) {
}
   198  
   199  // Collect implements prometheus.Collector. It reads a snapshot of the metric
   200  // cache and emits const metrics. Parallel scrapes see the same data until
   201  // background Write() calls update it
   202  func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric) {
   203  	promw.totalScrapes.Add(1)
   204  	ch <- promw.totalScrapes
   205  
   206  	promw.RLock()
   207  	if len(promw.Cache) == 0 {
   208  		promw.RUnlock()
   209  		promw.logger.Warning("No dbs configured for monitoring. Check config")
   210  		ch <- promw.totalScrapeFailures
   211  		promw.lastScrapeErrors.Set(0)
   212  		ch <- promw.lastScrapeErrors
   213  		return
   214  	}
   215  	snapshot := promw.snapshotCache()
   216  	promw.RUnlock()
   217  
   218  	var rows int
   219  	var lastScrapeErrors float64
   220  
   221  	t1 := time.Now()
   222  	for _, metricsMessages := range snapshot {
   223  		for _, envelope := range metricsMessages {
   224  			written, errors := promw.WritePromMetrics(envelope, ch)
   225  			lastScrapeErrors += float64(errors)
   226  			rows += written
   227  		}
   228  	}
   229  	promw.logger.WithField("count", rows).WithField("elapsed", time.Since(t1)).Info("measurements written")
   230  	ch <- promw.totalScrapeFailures
   231  	promw.lastScrapeErrors.Set(lastScrapeErrors)
   232  	ch <- promw.lastScrapeErrors
   233  }
   234  
   235  // snapshotCache creates a shallow copy of the cache map hierarchy.
   236  // Must be called under at least promw.RLock().
   237  // The MeasurementEnvelope values are not deep-copied because writers always
   238  // replace entire envelopes (never mutate them in place).
   239  func (promw *PrometheusWriter) snapshotCache() PromMetricCache {
   240  	snapshot := make(PromMetricCache, len(promw.Cache))
   241  	for db, metricMap := range promw.Cache {
   242  		snapshot[db] = maps.Clone(metricMap)
   243  	}
   244  	return snapshot
   245  }
   246  
   247  // WritePromMetrics converts a MeasurementEnvelope into Prometheus const metrics
   248  // and sends them directly to ch. Returns the count of metrics written and errors encountered.
   249  func (promw *PrometheusWriter) WritePromMetrics(msg metrics.MeasurementEnvelope, ch chan<- prometheus.Metric) (written int, errorCount int) {
   250  	if len(msg.Data) == 0 {
   251  		return
   252  	}
   253  
   254  	promw.RLock()
   255  	gauges := promw.gauges[msg.MetricName]
   256  	promw.RUnlock()
   257  
   258  	epochTime := time.Unix(0, msg.Data.GetEpoch())
   259  
   260  	if epochTime.Before(time.Now().Add(-promCacheTTL)) {
   261  		promw.logger.Debugf("Dropping metric %s:%s cache set due to staleness (>%v)...", msg.DBName, msg.MetricName, promCacheTTL)
   262  		promw.PurgeCacheEntry(msg.DBName, msg.MetricName)
   263  		return
   264  	}
   265  
   266  	seen := make(map[string]any)
   267  
   268  	for _, measurement := range msg.Data {
   269  		labels := make(map[string]string)
   270  		fields := make(map[string]float64)
   271  		if msg.CustomTags != nil {
   272  			labels = maps.Clone(msg.CustomTags)
   273  		}
   274  		labels["dbname"] = msg.DBName
   275  
   276  		for k, v := range measurement {
   277  			if k == metrics.EpochColumnName || v == nil || v == "" {
   278  				continue // epoch checked/assigned once
   279  			}
   280  
   281  			tag, found := strings.CutPrefix(k, "tag_")
   282  			if found {
   283  				labels[tag] = fmt.Sprintf("%v", v)
   284  			} else {
   285  				switch t := v.(type) {
   286  				case int, int32, int64, float32, float64:
   287  					fields[k], _ = strconv.ParseFloat(fmt.Sprintf("%v", v), 64)
   288  				case bool:
   289  					fields[k] = map[bool]float64{true: 1, false: 0}[t]
   290  				default:
   291  					// Only "tag_" prefixed columns become labels (handled above).
   292  					// Plain string columns (e.g. data_dir, version_str) are not
   293  					// numeric values and must not be promoted to labels
   294  					promw.logger.Debugf("skipping scraping column %s of [%s:%s], unsupported datatype: %v", k, msg.DBName, msg.MetricName, t)
   295  					continue
   296  				}
   297  			}
   298  		}
   299  
   300  		// Sort label keys for deterministic descriptor identity.
   301  		// Since Go maps iterate in random order, the same label set
   302  		// could produce different prometheus.Desc objects across rows or scrapes,
   303  		// leading to "duplicate metric" errors
   304  		labelKeys := slices.Sorted(maps.Keys(labels))
   305  		labelValues := make([]string, len(labelKeys))
   306  		for i, k := range labelKeys {
   307  			labelValues[i] = labels[k]
   308  		}
   309  
   310  		for field, value := range fields {
   311  			fieldPromDataType := prometheus.CounterValue
   312  			if msg.MetricName == promInstanceUpStateMetric ||
   313  				len(gauges) > 0 && (gauges[0] == "*" || slices.Contains(gauges, field)) {
   314  				fieldPromDataType = prometheus.GaugeValue
   315  			}
   316  
   317  			var fqName string
   318  			if msg.MetricName == promInstanceUpStateMetric {
   319  				fqName = fmt.Sprintf("%s_%s", promw.Namespace, msg.MetricName)
   320  			} else {
   321  				fqName = fmt.Sprintf("%s_%s_%s", promw.Namespace, msg.MetricName, field)
   322  			}
   323  
   324  			// skip if this exact identity was already emitted in this scrape
   325  			identity := fqName + "_" + strings.Join(labelValues, "_")
   326  			if _, dup := seen[identity]; dup {
   327  				promw.logger.
   328  					WithField("metric", msg.MetricName).
   329  					Warning("duplicate metric identity dropped, prefix differentiating string columns with tag_")
   330  				errorCount++
   331  				continue
   332  			}
   333  			seen[identity] = struct{}{}
   334  
   335  			desc := prometheus.NewDesc(fqName, msg.MetricName, labelKeys, nil)
   336  			m, err := prometheus.NewConstMetric(desc, fieldPromDataType, value, labelValues...)
   337  			if err != nil {
   338  				promw.logger.Warningf("skipping metric %s of [%s:%s]: %v", fqName, msg.DBName, msg.MetricName, err)
   339  				errorCount++
   340  				continue
   341  			}
   342  			ch <- prometheus.NewMetricWithTimestamp(epochTime, m)
   343  			written++
   344  		}
   345  	}
   346  	return
   347  }
   348