1 package sinks_test
2
3 import (
4 "context"
5 "fmt"
6 "os"
7 "testing"
8
9 "github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
10 "github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
11 "github.com/cybertec-postgresql/pgwatch/v5/internal/testutil"
12 "github.com/stretchr/testify/assert"
13 "google.golang.org/grpc/codes"
14 "google.golang.org/grpc/status"
15 )
16
// ctx is the base context shared by every test in this package; it aliases
// testutil.TestContext.
var ctx = testutil.TestContext
18
19 func TestMain(m *testing.M) {
20
21 rpcTeardown, err := testutil.SetupRPCServers()
22 if err != nil {
23 rpcTeardown()
24 panic(err)
25 }
26
27
28 exitCode := m.Run()
29
30
31 rpcTeardown()
32 os.Exit(exitCode)
33 }
34
35
36
37 func TestCACertParamValidation(t *testing.T) {
38 a := assert.New(t)
39 _, err := sinks.NewRPCWriter(ctx, testutil.TLSConnStr)
40 a.NoError(err)
41
42 err = os.WriteFile("badca.crt", []byte(""), 0644)
43 a.NoError(err)
44 defer func() { _ = os.Remove("badca.crt") }()
45
46 BadRPCParams := map[string]string{
47 "?sslrootca=file.txt": "error loading CA file",
48 "?sslrootca=": "error loading CA file",
49 "?sslrootca=badca.crt": "invalid CA file",
50 }
51
52 for param, errMsg := range BadRPCParams {
53 _, err = sinks.NewRPCWriter(ctx, fmt.Sprintf("grpc://%s%s", testutil.TLSServerAddress, param))
54 a.ErrorContains(err, errMsg)
55 }
56 }
57
58 func TestRPCTLSWriter(t *testing.T) {
59 a := assert.New(t)
60
61 rw, err := sinks.NewRPCWriter(ctx, testutil.TLSConnStr)
62 a.NoError(err)
63
64
65 msgs := metrics.MeasurementEnvelope{
66 DBName: "Db",
67 Data: metrics.Measurements{{"test": 1}},
68 }
69 err = rw.Write(msgs)
70 a.NoError(err)
71 }
72
73 func TestRPCWrite(t *testing.T) {
74 a := assert.New(t)
75
76 rw, err := sinks.NewRPCWriter(ctx, testutil.PlainConnStr)
77 a.NoError(err)
78
79
80 msgs := metrics.MeasurementEnvelope{
81 DBName: "Db",
82 Data: metrics.Measurements{{"test": 1}},
83 }
84 err = rw.Write(msgs)
85 a.NoError(err)
86
87
88 msgs.DBName = "invalid"
89 err = rw.Write(msgs)
90 a.ErrorIs(err, status.Error(codes.Unknown, "invalid message"))
91
92
93 err = rw.Write(metrics.MeasurementEnvelope{})
94 a.ErrorIs(err, status.Error(codes.Unknown, "empty message"))
95
96
97 ctx, cancel := context.WithCancel(ctx)
98 rw, err = sinks.NewRPCWriter(ctx, testutil.PlainConnStr)
99 a.NoError(err)
100 cancel()
101 err = rw.Write(msgs)
102 a.Error(err)
103 }
104
105 func TestRPCSyncMetric(t *testing.T) {
106 a := assert.New(t)
107
108 rw, err := sinks.NewRPCWriter(ctx, testutil.PlainConnStr)
109 a.NoError(err)
110
111
112 err = rw.SyncMetric("Test-DB", "DB-Metric", sinks.AddOp)
113 a.NoError(err)
114
115
116 err = rw.SyncMetric("", "", sinks.InvalidOp)
117 a.ErrorIs(err, status.Error(codes.Unknown, "invalid sync request"))
118
119
120 ctx, cancel := context.WithCancel(ctx)
121 rw, err = sinks.NewRPCWriter(ctx, testutil.PlainConnStr)
122 a.NoError(err)
123 cancel()
124 err = rw.SyncMetric("Test-DB", "DB-Metric", sinks.AddOp)
125 a.Error(err)
126 }
127
128 func TestRPCDefineMetric(t *testing.T) {
129 a := assert.New(t)
130
131 rw, err := sinks.NewRPCWriter(ctx, testutil.PlainConnStr)
132 a.NoError(err)
133
134
135 var writer sinks.Writer = rw
136 definer, ok := writer.(sinks.MetricsDefiner)
137 a.True(ok, "RPCWriter should implement MetricsDefiner interface")
138
139
140 testMetrics := &metrics.Metrics{
141 MetricDefs: metrics.MetricDefs{
142 "test_metric": metrics.Metric{
143 SQLs: metrics.SQLs{
144 11: "SELECT 1 as test_column",
145 12: "SELECT 2 as test_column",
146 },
147 Description: "Test metric",
148 Gauges: []string{"test_column"},
149 },
150 },
151 PresetDefs: metrics.PresetDefs{
152 "test_preset": metrics.Preset{
153 Description: "Test preset",
154 Metrics: map[string]float64{"test_metric": 30.0},
155 },
156 },
157 }
158
159 err = definer.DefineMetrics(testMetrics)
160 a.NoError(err)
161
162
163 emptyMetrics := &metrics.Metrics{
164 MetricDefs: make(metrics.MetricDefs),
165 PresetDefs: make(metrics.PresetDefs),
166 }
167
168 err = definer.DefineMetrics(emptyMetrics)
169 a.NoError(err)
170
171
172 ctx, cancel := context.WithCancel(ctx)
173 rw, err = sinks.NewRPCWriter(ctx, testutil.PlainConnStr)
174 a.NoError(err)
175 cancel()
176
177 writer = rw
178 definer = writer.(sinks.MetricsDefiner)
179 err = definer.DefineMetrics(testMetrics)
180 a.Error(err)
181 }
182
183 func TestAuthCredsSending(t *testing.T) {
184 a := assert.New(t)
185
186 unauthenticatedConnStr := "grpc://notpgwatch:notpgwatch@localhost:6060"
187 rw, err := sinks.NewRPCWriter(ctx, unauthenticatedConnStr)
188 a.NoError(err)
189
190 err = rw.Write(metrics.MeasurementEnvelope{})
191 a.Equal(err, status.Error(codes.Unauthenticated, "unauthenticated"))
192 }
193