1 package sinks
2
3 import (
4 "testing"
5 "time"
6
7 "github.com/cybertec-postgresql/pgwatch/v5/internal/log"
8 "github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
9 "github.com/cybertec-postgresql/pgwatch/v5/internal/testutil"
10 "github.com/prometheus/client_golang/prometheus"
11 "github.com/stretchr/testify/assert"
12 "github.com/stretchr/testify/require"
13 )
14
15 func newTestPrometheusWriter(namespace string) *PrometheusWriter {
16 return &PrometheusWriter{
17 ctx: testutil.TestContext,
18 logger: log.GetLogger(testutil.TestContext),
19 Namespace: namespace,
20 Cache: make(PromMetricCache),
21 lastScrapeErrors: prometheus.NewGauge(prometheus.GaugeOpts{
22 Namespace: namespace,
23 Name: "test_last_scrape_errors",
24 }),
25 totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
26 Namespace: namespace,
27 Name: "test_total_scrapes",
28 }),
29 totalScrapeFailures: prometheus.NewCounter(prometheus.CounterOpts{
30 Namespace: namespace,
31 Name: "test_total_scrape_failures",
32 }),
33 }
34 }
35
36
37
38
39 func TestWriteAfterCollect(t *testing.T) {
40 promw := newTestPrometheusWriter("test")
41
42
43 msg := metrics.MeasurementEnvelope{
44 DBName: "db1",
45 MetricName: "metric1",
46 Data: metrics.Measurements{
47 {metrics.EpochColumnName: time.Now().UnixNano(), "value": int64(100)},
48 },
49 }
50 require.NoError(t, promw.Write(msg))
51
52
53 ch := make(chan prometheus.Metric, 100)
54 promw.Collect(ch)
55 assert.NotEmpty(t, promw.Cache, "cache should still contain data after Collect (snapshot-based)")
56
57
58 msg.Data[0]["value"] = int64(200)
59 require.NoError(t, promw.Write(msg))
60
61 assert.Contains(t, promw.Cache, "db1")
62 assert.Equal(t, int64(200), promw.Cache["db1"]["metric1"].Data[0]["value"])
63 }
64
65
66
67 func TestCollect_CachePreserved(t *testing.T) {
68 promw := newTestPrometheusWriter("test")
69
70
71 for _, db := range []string{"db1", "db2", "db3", "db4", "db5"} {
72 promw.Cache[db] = map[string]metrics.MeasurementEnvelope{
73 "metric": {
74 DBName: db,
75 MetricName: "metric",
76 Data: metrics.Measurements{
77 {metrics.EpochColumnName: time.Now().UnixNano(), "value": int64(1)},
78 },
79 },
80 }
81 }
82 assert.Len(t, promw.Cache, 5)
83
84
85 ch := make(chan prometheus.Metric, 100)
86 promw.Collect(ch)
87
88
89 assert.Len(t, promw.Cache, 5, "cache should be preserved after Collect")
90
91
92 ch2 := make(chan prometheus.Metric, 100)
93 promw.Collect(ch2)
94 assert.Len(t, promw.Cache, 5, "cache should be preserved after second Collect")
95 }
96
97
98
99
100 func TestCollect_DeterministicLabelOrdering(t *testing.T) {
101 promw := newTestPrometheusWriter("test")
102 promw.gauges = map[string][]string{promInstanceUpStateMetric: {"*"}}
103
104
105
106 promw.Cache["db1"] = map[string]metrics.MeasurementEnvelope{
107 "metric1": {
108 DBName: "db1",
109 MetricName: promInstanceUpStateMetric,
110 Data: metrics.Measurements{
111 {
112 metrics.EpochColumnName: time.Now().UnixNano(),
113 "tag_host": "server1",
114 "tag_port": "5432",
115 "tag_region": "us-east-1",
116 "value": int64(42),
117 },
118 },
119 },
120 }
121
122
123
124 const metaMetrics = 3
125 for i := range 100 {
126 ch := make(chan prometheus.Metric, 100)
127 promw.Collect(ch)
128 close(ch)
129
130 var collected []prometheus.Metric
131 for m := range ch {
132 collected = append(collected, m)
133 }
134
135 assert.Len(t, collected, 1+metaMetrics, "iteration %d: expected 1 data + 3 meta metrics", i)
136 }
137 }
138
139
140
141 func TestCollect_DeduplicateMetrics(t *testing.T) {
142 promw := newTestPrometheusWriter("test")
143 promw.gauges = map[string][]string{"metric1": {"*"}}
144
145
146 promw.Cache["db1"] = map[string]metrics.MeasurementEnvelope{
147 "metric1": {
148 DBName: "db1",
149 MetricName: "metric1",
150 CustomTags: map[string]string{"sys_id": "42"},
151 Data: metrics.Measurements{
152 {
153 metrics.EpochColumnName: time.Now().UnixNano(),
154 "tag_host": "server1",
155 "value": int64(42),
156 "bool_val": false,
157 "extra_field1": "ignored",
158 },
159 {
160 metrics.EpochColumnName: time.Now().UnixNano(),
161 "tag_host": "server1",
162 "value": int64(99),
163 "bool_val": true,
164 "extra_field1": "ignored",
165 },
166 },
167 },
168 }
169
170 ch := make(chan prometheus.Metric, 100)
171 promw.Collect(ch)
172 close(ch)
173
174 var count int
175 for c := range ch {
176 t.Log(c.Desc())
177 count++
178 }
179
180
181 assert.Equal(t, 2+3, count, "duplicate metric identity should be deduplicated (1 data + 3 meta)")
182 }
183
184
185
186
187 func TestCollect_InvalidMetricDoesNotPanic(t *testing.T) {
188 promw := newTestPrometheusWriter("test")
189
190
191 assert.NotPanics(t, func() {
192 ch := make(chan prometheus.Metric, 100)
193 promw.Collect(ch)
194 })
195 }
196
197
198
199 func TestCollect_EmptyCache(t *testing.T) {
200 promw := newTestPrometheusWriter("test")
201
202 ch := make(chan prometheus.Metric, 100)
203 promw.Collect(ch)
204 close(ch)
205
206 var count int
207 for range ch {
208 count++
209 }
210
211 assert.Equal(t, 3, count, "empty cache should produce only 3 meta-metrics")
212 }
213
214
215
216 func TestCollect_StaleMetricsDropped(t *testing.T) {
217 promw := newTestPrometheusWriter("test")
218 promw.gauges = map[string][]string{"metric1": {"*"}}
219
220 staleEpoch := time.Now().Add(-promCacheTTL - time.Minute).UnixNano()
221 promw.Cache["db1"] = map[string]metrics.MeasurementEnvelope{
222 "metric1": {
223 DBName: "db1",
224 MetricName: "metric1",
225 Data: metrics.Measurements{
226 {
227 metrics.EpochColumnName: staleEpoch,
228 "value": int64(42),
229 },
230 },
231 },
232 }
233
234 ch := make(chan prometheus.Metric, 100)
235 promw.Collect(ch)
236 close(ch)
237
238 var count int
239 for range ch {
240 count++
241 }
242
243 assert.Equal(t, 3, count, "stale metrics should be dropped, only meta-metrics remain")
244 }
245
246 func TestPrometheusWriteEmpty(t *testing.T) {
247 promw := newTestPrometheusWriter("test")
248 assert.NoError(t, promw.Write(metrics.MeasurementEnvelope{}))
249 ch := make(chan prometheus.Metric, 100)
250 written, errCount := promw.WritePromMetrics(metrics.MeasurementEnvelope{}, ch)
251 assert.Zero(t, errCount)
252 assert.Zero(t, written)
253 close(ch)
254 }
255
256 func TestPrometheusWRiteUnsupportedMetric(t *testing.T) {
257 promw := newTestPrometheusWriter("test")
258 assert.NoError(t, promw.Write(metrics.MeasurementEnvelope{
259 DBName: "db1",
260 MetricName: "change_events",
261 Data: metrics.Measurements{
262 {metrics.EpochColumnName: time.Now().UnixNano(), "value": int64(100)},
263 },
264 }))
265 }
266