...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/sinks/prometheus.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/sinks

package sinks

import (
	"context"
	"fmt"
	"net"
	"net/http"
	"slices"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// PrometheusWriter is a sink that exposes metric measurements to a Prometheus scraper.
// Prometheus collects metrics data from pgwatch by scraping a metrics HTTP endpoint.
type PrometheusWriter struct {
	ctx                               context.Context
	lastScrapeErrors                  prometheus.Gauge
	totalScrapes, totalScrapeFailures prometheus.Counter
	PrometheusNamespace               string
}

const promInstanceUpStateMetric = "instance_up"

// Timestamps older than this are ignored on the Prometheus scraper side anyway,
// so it is better not to emit them at all and just log a notice.
const promScrapingStalenessHardDropLimit = time.Minute * time.Duration(10)

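// NewPrometheusWriter expects a connection string of the form "address/namespace";
// when no "/" is present, the whole string is taken as the listen address and the
// namespace defaults to "pgwatch". It registers the writer as a Prometheus
// collector and starts an HTTP server exposing the scrape endpoint. A minimal
// sketch of a valid connstr (the port is illustrative only):
//
//	NewPrometheusWriter(ctx, "0.0.0.0:9187/pgwatch")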
func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error) {
	addr, namespace, found := strings.Cut(connstr, "/")
	if !found {
		namespace = "pgwatch"
	}
	l := log.GetLogger(ctx).WithField("sink", "prometheus").WithField("address", addr)
	ctx = log.WithLogger(ctx, l)
	promw = &PrometheusWriter{
		ctx:                 ctx,
		PrometheusNamespace: namespace,
		lastScrapeErrors: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Name:      "exporter_last_scrape_errors",
			Help:      "Last scrape error count for all monitored hosts / metrics",
		}),
		totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Name:      "exporter_total_scrapes",
			Help:      "Total scrape attempts.",
		}),
		totalScrapeFailures: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Name:      "exporter_total_scrape_failures",
			Help:      "Number of errors while executing metric queries",
		}),
	}

	if err = prometheus.Register(promw); err != nil {
		return
	}
	promServer := &http.Server{
		Addr:    addr,
		Handler: promhttp.Handler(),
	}

	ln, err := net.Listen("tcp", promServer.Addr)
	if err != nil {
		return nil, err
	}

	go func() { log.GetLogger(ctx).Error(promServer.Serve(ln)) }()

	l.Info(`measurements sink is activated`)
	return
}

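// Write caches the given measurements under the DB/metric pair of the first
// envelope; nothing is pushed anywhere. The cached data is served on the next
// Prometheus scrape via Collect.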
func (promw *PrometheusWriter) Write(msgs []metrics.MeasurementEnvelope) error {
	if len(msgs) == 0 || len(msgs[0].Data) == 0 { // no batching in async prom mode, so using 0 indexing ok
		return nil
	}
	msg := msgs[0]
	promw.PromAsyncCacheAddMetricData(msg.DBName, msg.MetricName, msgs)
	return nil
}

// Async Prometheus cache: the most recently fetched data per database and metric
var promAsyncMetricCache = make(map[string]map[string][]metrics.MeasurementEnvelope) // [dbUnique][metric]lastly_fetched_data
var promAsyncMetricCacheLock = sync.RWMutex{}

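// PromAsyncCacheAddMetricData stores the latest fetched data for a database/metric
// pair. Data is silently discarded for databases that were not first registered
// via PromAsyncCacheInitIfRequired (see SyncMetric with the "add" op).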
func (promw *PrometheusWriter) PromAsyncCacheAddMetricData(dbUnique, metric string, msgArr []metrics.MeasurementEnvelope) { // cache structure: [dbUnique][metric]lastly_fetched_data
	promAsyncMetricCacheLock.Lock()
	defer promAsyncMetricCacheLock.Unlock()
	if _, ok := promAsyncMetricCache[dbUnique]; ok {
		promAsyncMetricCache[dbUnique][metric] = msgArr
	}
}

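// PromAsyncCacheInitIfRequired creates the per-database cache slot if it does
// not exist yet; the metric name argument is unused, as slots are keyed by
// database only.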
func (promw *PrometheusWriter) PromAsyncCacheInitIfRequired(dbUnique, _ string) { // cache structure: [dbUnique][metric]lastly_fetched_data
	promAsyncMetricCacheLock.Lock()
	defer promAsyncMetricCacheLock.Unlock()
	if _, ok := promAsyncMetricCache[dbUnique]; !ok {
		metricMap := make(map[string][]metrics.MeasurementEnvelope)
		promAsyncMetricCache[dbUnique] = metricMap
	}
}

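// PurgeMetricsFromPromAsyncCacheIfAny drops the cached data for a single
// database/metric pair, or for the whole database when metric is empty.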
func (promw *PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metric string) {
	promAsyncMetricCacheLock.Lock()
	defer promAsyncMetricCacheLock.Unlock()

	if metric == "" {
		delete(promAsyncMetricCache, dbUnique) // whole host removed from config
	} else {
		delete(promAsyncMetricCache[dbUnique], metric)
	}
}

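// SyncMetric reacts to monitoring configuration changes: "add" prepares the
// cache for a database, "remove" purges any cached data; other op values are
// ignored.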
func (promw *PrometheusWriter) SyncMetric(dbUnique, metricName, op string) error {
	switch op {
	case "remove":
		promw.PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metricName)
	case "add":
		promw.PromAsyncCacheInitIfRequired(dbUnique, metricName)
	}
	return nil
}

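// Describe is part of the prometheus.Collector interface. Sending no
// descriptors makes this an "unchecked" collector in client_golang terms,
// i.e. emitted metrics are not validated against a predeclared set.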
func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc) {
}

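// Collect is part of the prometheus.Collector interface and is invoked on
// every scrape: it emits the exporter's own counters, a synthetic
// "instance_up" gauge per monitored database, and the last cached
// measurements for each database/metric pair.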
func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric) {
	var lastScrapeErrors float64
	logger := log.GetLogger(promw.ctx)
	promw.totalScrapes.Add(1)
	ch <- promw.totalScrapes

	if len(promAsyncMetricCache) == 0 {
		logger.Warning("No dbs configured for monitoring. Check config")
		ch <- promw.totalScrapeFailures
		promw.lastScrapeErrors.Set(0)
		ch <- promw.lastScrapeErrors
		return
	}

	for dbname, metricsMessages := range promAsyncMetricCache {
		promw.setInstanceUpDownState(ch, dbname)
		for metric, metricMessages := range metricsMessages {
			if metric == "change_events" {
				continue // not supported
			}
			if len(metricMessages) > 0 {
				promMetrics := promw.MetricStoreMessageToPromMetrics(metricMessages[0])
				for _, pm := range promMetrics { // collect & send later in batch? capMetricChan = 1000 limit in prometheus code
					ch <- pm
				}
			}
		}
	}

	ch <- promw.totalScrapeFailures
	promw.lastScrapeErrors.Set(lastScrapeErrors)
	ch <- promw.lastScrapeErrors

	// atomic.StoreInt64(&lastSuccessfulDatastoreWriteTimeEpoch, time.Now().Unix())
}

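// setInstanceUpDownState emits the synthetic "instance_up" gauge (always 1 on
// this code path) for the given database, timestamped with the current time.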
func (promw *PrometheusWriter) setInstanceUpDownState(ch chan<- prometheus.Metric, dbName string) {
	logger := log.GetLogger(promw.ctx)
	data := metrics.NewMeasurement(time.Now().UnixNano())
	data[promInstanceUpStateMetric] = 1

	pm := promw.MetricStoreMessageToPromMetrics(metrics.MeasurementEnvelope{
		DBName:           dbName,
		SourceType:       "postgres",
		MetricName:       promInstanceUpStateMetric,
		CustomTags:       nil, //md.CustomTags,
		Data:             metrics.Measurements{data},
		RealDbname:       dbName, //vme.RealDbname,
		SystemIdentifier: dbName, //vme.SystemIdentifier,
	})

	if len(pm) > 0 {
		ch <- pm[0]
	} else {
		logger.Error("Could not formulate an instance state report - should not happen")
	}
}

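// MetricStoreMessageToPromMetrics converts one measurement envelope into
// Prometheus metrics: "tag_"-prefixed columns become labels, numeric and
// boolean columns become samples, NULLs and the epoch column are skipped.
// With a namespace set, series are named "<namespace>_<metric>_<column>";
// e.g. a hypothetical "xact_commit" column of a "db_stats" metric would be
// exported as "pgwatch_db_stats_xact_commit". Measurements older than
// promScrapingStalenessHardDropLimit are purged from the cache instead of
// being emitted.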
func (promw *PrometheusWriter) MetricStoreMessageToPromMetrics(msg metrics.MeasurementEnvelope) []prometheus.Metric {
	promMetrics := make([]prometheus.Metric, 0)
	logger := log.GetLogger(promw.ctx)

	if len(msg.Data) == 0 {
		return promMetrics
	}

	epochTime := time.Unix(0, msg.Data.GetEpoch())

	if epochTime.Before(time.Now().Add(-promScrapingStalenessHardDropLimit)) {
		logger.Warningf("[%s][%s] Dropping metric set due to staleness (>%v) ...", msg.DBName, msg.MetricName, promScrapingStalenessHardDropLimit)
		promw.PurgeMetricsFromPromAsyncCacheIfAny(msg.DBName, msg.MetricName)
		return promMetrics
	}

	for _, dr := range msg.Data {
		labels := make(map[string]string)
		fields := make(map[string]float64)
		labels["dbname"] = msg.DBName

		for k, v := range dr {
			if v == nil || v == "" || k == metrics.EpochColumnName {
				continue // not storing NULLs; epoch checked/assigned once
			}

			if strings.HasPrefix(k, "tag_") {
				tag := k[4:]
				labels[tag] = fmt.Sprintf("%v", v)
			} else {
				switch t := v.(type) {
				case int, int32, int64, float32, float64:
					f, err := strconv.ParseFloat(fmt.Sprintf("%v", v), 64)
					if err != nil {
						logger.Debugf("skipping scraping column %s of [%s:%s]: %v", k, msg.DBName, msg.MetricName, err)
						continue // do not emit a zero value for an unparsable number
					}
					fields[k] = f
				case bool:
					fields[k] = map[bool]float64{true: 1, false: 0}[t]
				default:
					logger.Debugf("skipping scraping column %s of [%s:%s], unsupported datatype: %v", k, msg.DBName, msg.MetricName, t)
					continue
				}
			}
		}
		if msg.CustomTags != nil {
			for k, v := range msg.CustomTags {
				labels[k] = fmt.Sprintf("%v", v)
			}
		}

		labelKeys := make([]string, 0)
		labelValues := make([]string, 0)
		for k, v := range labels {
			labelKeys = append(labelKeys, k)
			labelValues = append(labelValues, v)
		}

		for field, value := range fields {
			fieldPromDataType := prometheus.CounterValue
			if msg.MetricName == promInstanceUpStateMetric ||
				len(msg.MetricDef.Gauges) > 0 &&
					(msg.MetricDef.Gauges[0] == "*" || slices.Contains(msg.MetricDef.Gauges, field)) {
				fieldPromDataType = prometheus.GaugeValue
			}
			var desc *prometheus.Desc
			if promw.PrometheusNamespace != "" {
				if msg.MetricName == promInstanceUpStateMetric { // handle the special "instance_up" check
					desc = prometheus.NewDesc(fmt.Sprintf("%s_%s", promw.PrometheusNamespace, msg.MetricName),
						msg.MetricName, labelKeys, nil)
				} else {
					desc = prometheus.NewDesc(fmt.Sprintf("%s_%s_%s", promw.PrometheusNamespace, msg.MetricName, field),
						msg.MetricName, labelKeys, nil)
				}
			} else {
				if msg.MetricName == promInstanceUpStateMetric { // handle the special "instance_up" check
					desc = prometheus.NewDesc(field, msg.MetricName, labelKeys, nil)
				} else {
					desc = prometheus.NewDesc(fmt.Sprintf("%s_%s", msg.MetricName, field), msg.MetricName, labelKeys, nil)
				}
			}
			m := prometheus.MustNewConstMetric(desc, fieldPromDataType, value, labelValues...)
			promMetrics = append(promMetrics, prometheus.NewMetricWithTimestamp(epochTime, m))
		}
	}
	return promMetrics
}
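
A minimal sketch of wiring the sink up from the outside, assuming the surrounding pgwatch module (the sinks and metrics packages are internal, so this only compiles within it); the listen address, database name, metric name and the "numbackends" column are illustrative only:

	package main

	import (
		"context"
		"time"

		"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
		"github.com/cybertec-postgresql/pgwatch/v3/internal/sinks"
	)

	func main() {
		ctx := context.Background()
		// listen on :9187 and prefix all exported series with "pgwatch_"
		promw, err := sinks.NewPrometheusWriter(ctx, "0.0.0.0:9187/pgwatch")
		if err != nil {
			panic(err)
		}
		// allow the cache to accept data for this database, then feed the latest batch
		_ = promw.SyncMetric("mydb", "db_stats", "add")
		m := metrics.NewMeasurement(time.Now().UnixNano())
		m["numbackends"] = 42
		_ = promw.Write([]metrics.MeasurementEnvelope{{
			DBName:     "mydb",
			MetricName: "db_stats",
			Data:       metrics.Measurements{m},
		}})
		select {} // keep serving scrapes
	}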