...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/sinks/prometheus_test.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/sinks

     1  package sinks
     2  
     3  import (
     4  	"testing"
     5  	"time"
     6  
     7  	"github.com/cybertec-postgresql/pgwatch/v5/internal/log"
     8  	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
     9  	"github.com/cybertec-postgresql/pgwatch/v5/internal/testutil"
    10  	"github.com/prometheus/client_golang/prometheus"
    11  	"github.com/stretchr/testify/assert"
    12  	"github.com/stretchr/testify/require"
    13  )
    14  
    15  func newTestPrometheusWriter(namespace string) *PrometheusWriter {
    16  	return &PrometheusWriter{
    17  		ctx:       testutil.TestContext,
    18  		logger:    log.GetLogger(testutil.TestContext),
    19  		Namespace: namespace,
    20  		Cache:     make(PromMetricCache),
    21  		lastScrapeErrors: prometheus.NewGauge(prometheus.GaugeOpts{
    22  			Namespace: namespace,
    23  			Name:      "test_last_scrape_errors",
    24  		}),
    25  		totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
    26  			Namespace: namespace,
    27  			Name:      "test_total_scrapes",
    28  		}),
    29  		totalScrapeFailures: prometheus.NewCounter(prometheus.CounterOpts{
    30  			Namespace: namespace,
    31  			Name:      "test_total_scrape_failures",
    32  		}),
    33  	}
    34  }
    35  
    36  // TestWriteAfterCollect verifies that Write() works after Collect().
    37  // Since Collect() now reads a snapshot without clearing the cache,
    38  // the cache should still contain the original data after collect.
    39  func TestWriteAfterCollect(t *testing.T) {
    40  	promw := newTestPrometheusWriter("test")
    41  
    42  	// Write initial data
    43  	msg := metrics.MeasurementEnvelope{
    44  		DBName:     "db1",
    45  		MetricName: "metric1",
    46  		Data: metrics.Measurements{
    47  			{metrics.EpochColumnName: time.Now().UnixNano(), "value": int64(100)},
    48  		},
    49  	}
    50  	require.NoError(t, promw.Write(msg))
    51  
    52  	// Collect reads a snapshot — cache is NOT cleared
    53  	ch := make(chan prometheus.Metric, 100)
    54  	promw.Collect(ch)
    55  	assert.NotEmpty(t, promw.Cache, "cache should still contain data after Collect (snapshot-based)")
    56  
    57  	// Write after Collect — must work
    58  	msg.Data[0]["value"] = int64(200)
    59  	require.NoError(t, promw.Write(msg))
    60  
    61  	assert.Contains(t, promw.Cache, "db1")
    62  	assert.Equal(t, int64(200), promw.Cache["db1"]["metric1"].Data[0]["value"])
    63  }
    64  
    65  // TestCollect_CachePreserved verifies Collect() does not consume the cache.
    66  // Parallel scrapes and back-to-back scrapes should see the same data.
    67  func TestCollect_CachePreserved(t *testing.T) {
    68  	promw := newTestPrometheusWriter("test")
    69  
    70  	// Populate cache with multiple databases
    71  	for _, db := range []string{"db1", "db2", "db3", "db4", "db5"} {
    72  		promw.Cache[db] = map[string]metrics.MeasurementEnvelope{
    73  			"metric": {
    74  				DBName:     db,
    75  				MetricName: "metric",
    76  				Data: metrics.Measurements{
    77  					{metrics.EpochColumnName: time.Now().UnixNano(), "value": int64(1)},
    78  				},
    79  			},
    80  		}
    81  	}
    82  	assert.Len(t, promw.Cache, 5)
    83  
    84  	// First Collect
    85  	ch := make(chan prometheus.Metric, 100)
    86  	promw.Collect(ch)
    87  
    88  	// Cache should still have all 5 databases
    89  	assert.Len(t, promw.Cache, 5, "cache should be preserved after Collect")
    90  
    91  	// Second Collect should also work (back-to-back scrapes)
    92  	ch2 := make(chan prometheus.Metric, 100)
    93  	promw.Collect(ch2)
    94  	assert.Len(t, promw.Cache, 5, "cache should be preserved after second Collect")
    95  }
    96  
    97  // TestCollect_DeterministicLabelOrdering verifies that metrics with the same
    98  // labels produce consistent output regardless of map insertion order.
    99  // This is the fix for the "collected metric was collected before" error.
   100  func TestCollect_DeterministicLabelOrdering(t *testing.T) {
   101  	promw := newTestPrometheusWriter("test")
   102  	promw.gauges = map[string][]string{promInstanceUpStateMetric: {"*"}}
   103  
   104  	// Create two data rows with the same labels but potentially different
   105  	// map iteration orders (Go randomizes map iteration).
   106  	promw.Cache["db1"] = map[string]metrics.MeasurementEnvelope{
   107  		"metric1": {
   108  			DBName:     "db1",
   109  			MetricName: promInstanceUpStateMetric,
   110  			Data: metrics.Measurements{
   111  				{
   112  					metrics.EpochColumnName: time.Now().UnixNano(),
   113  					"tag_host":              "server1",
   114  					"tag_port":              "5432",
   115  					"tag_region":            "us-east-1",
   116  					"value":                 int64(42),
   117  				},
   118  			},
   119  		},
   120  	}
   121  
   122  	// Collect multiple times — should never produce duplicate errors.
   123  	// Each Collect emits 3 self-instrumentation metrics + 1 data metric = 4 total.
   124  	const metaMetrics = 3 // totalScrapes + totalScrapeFailures + lastScrapeErrors
   125  	for i := range 100 {
   126  		ch := make(chan prometheus.Metric, 100)
   127  		promw.Collect(ch)
   128  		close(ch)
   129  
   130  		var collected []prometheus.Metric
   131  		for m := range ch {
   132  			collected = append(collected, m)
   133  		}
   134  		// 1 data metric + 3 meta-metrics
   135  		assert.Len(t, collected, 1+metaMetrics, "iteration %d: expected 1 data + 3 meta metrics", i)
   136  	}
   137  }
   138  
   139  // TestCollect_DeduplicateMetrics verifies that duplicate row data
   140  // (same metric name + same label values) is emitted only once per scrape.
   141  func TestCollect_DeduplicateMetrics(t *testing.T) {
   142  	promw := newTestPrometheusWriter("test")
   143  	promw.gauges = map[string][]string{"metric1": {"*"}}
   144  
   145  	// Two identical rows — same labels, same field name
   146  	promw.Cache["db1"] = map[string]metrics.MeasurementEnvelope{
   147  		"metric1": {
   148  			DBName:     "db1",
   149  			MetricName: "metric1",
   150  			CustomTags: map[string]string{"sys_id": "42"}, // custom tags should not affect identity
   151  			Data: metrics.Measurements{
   152  				{
   153  					metrics.EpochColumnName: time.Now().UnixNano(),
   154  					"tag_host":              "server1",
   155  					"value":                 int64(42),
   156  					"bool_val":              false,
   157  					"extra_field1":          "ignored", // extra fields should not affect identity
   158  				},
   159  				{
   160  					metrics.EpochColumnName: time.Now().UnixNano(),
   161  					"tag_host":              "server1",
   162  					"value":                 int64(99), // different values, same identity
   163  					"bool_val":              true,
   164  					"extra_field1":          "ignored", // extra fields should not affect identity
   165  				},
   166  			},
   167  		},
   168  	}
   169  
   170  	ch := make(chan prometheus.Metric, 100)
   171  	promw.Collect(ch)
   172  	close(ch)
   173  
   174  	var count int
   175  	for c := range ch {
   176  		t.Log(c.Desc())
   177  		count++
   178  	}
   179  	// Should deduplicate — only the first occurrence is emitted.
   180  	// 2 data metrics + 3 meta-metrics = 5 total.
   181  	assert.Equal(t, 2+3, count, "duplicate metric identity should be deduplicated (1 data + 3 meta)")
   182  }
   183  
   184  // TestCollect_InvalidMetricDoesNotPanic verifies that a malformed metric
   185  // (label count mismatch) does not cause a panic. This tests the
   186  // NewConstMetric error handling path.
   187  func TestCollect_InvalidMetricDoesNotPanic(t *testing.T) {
   188  	promw := newTestPrometheusWriter("test")
   189  
   190  	// This should not panic under any circumstances
   191  	assert.NotPanics(t, func() {
   192  		ch := make(chan prometheus.Metric, 100)
   193  		promw.Collect(ch)
   194  	})
   195  }
   196  
   197  // TestCollect_EmptyCache verifies that collecting from an empty cache
   198  // produces only the self-instrumentation metrics.
   199  func TestCollect_EmptyCache(t *testing.T) {
   200  	promw := newTestPrometheusWriter("test")
   201  
   202  	ch := make(chan prometheus.Metric, 100)
   203  	promw.Collect(ch)
   204  	close(ch)
   205  
   206  	var count int
   207  	for range ch {
   208  		count++
   209  	}
   210  	// 0 data metrics + 3 meta-metrics (totalScrapes, totalScrapeFailures, lastScrapeErrors)
   211  	assert.Equal(t, 3, count, "empty cache should produce only 3 meta-metrics")
   212  }
   213  
   214  // TestCollect_StaleMetricsDropped verifies that metrics older than
   215  // promScrapingStalenessHardDropLimit are dropped.
   216  func TestCollect_StaleMetricsDropped(t *testing.T) {
   217  	promw := newTestPrometheusWriter("test")
   218  	promw.gauges = map[string][]string{"metric1": {"*"}}
   219  
   220  	staleEpoch := time.Now().Add(-promCacheTTL - time.Minute).UnixNano()
   221  	promw.Cache["db1"] = map[string]metrics.MeasurementEnvelope{
   222  		"metric1": {
   223  			DBName:     "db1",
   224  			MetricName: "metric1",
   225  			Data: metrics.Measurements{
   226  				{
   227  					metrics.EpochColumnName: staleEpoch,
   228  					"value":                 int64(42),
   229  				},
   230  			},
   231  		},
   232  	}
   233  
   234  	ch := make(chan prometheus.Metric, 100)
   235  	promw.Collect(ch)
   236  	close(ch)
   237  
   238  	var count int
   239  	for range ch {
   240  		count++
   241  	}
   242  	// 0 data metrics + 3 meta-metrics
   243  	assert.Equal(t, 3, count, "stale metrics should be dropped, only meta-metrics remain")
   244  }
   245  
   246  func TestPrometheusWriteEmpty(t *testing.T) {
   247  	promw := newTestPrometheusWriter("test")
   248  	assert.NoError(t, promw.Write(metrics.MeasurementEnvelope{}))
   249  	ch := make(chan prometheus.Metric, 100)
   250  	written, errCount := promw.WritePromMetrics(metrics.MeasurementEnvelope{}, ch)
   251  	assert.Zero(t, errCount)
   252  	assert.Zero(t, written)
   253  	close(ch)
   254  }
   255  
   256  func TestPrometheusWRiteUnsupportedMetric(t *testing.T) {
   257  	promw := newTestPrometheusWriter("test")
   258  	assert.NoError(t, promw.Write(metrics.MeasurementEnvelope{
   259  		DBName:     "db1",
   260  		MetricName: "change_events", // unsupported metric
   261  		Data: metrics.Measurements{
   262  			{metrics.EpochColumnName: time.Now().UnixNano(), "value": int64(100)},
   263  		},
   264  	}))
   265  }
   266