...
1 package sinks_test
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "net"
8 "net/http"
9 "net/rpc"
10 "testing"
11
12 "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
13 "github.com/cybertec-postgresql/pgwatch/v3/internal/sinks"
14 "github.com/stretchr/testify/assert"
15 )
16
17 type Receiver struct {
18 }
19
20 var ctxt = context.Background()
21
22 func (receiver *Receiver) UpdateMeasurements(msg *metrics.MeasurementEnvelope, logMsg *string) error {
23 if msg == nil {
24 return errors.New("msgs is nil")
25 }
26 if msg.DBName != "Db" {
27 return errors.New("invalid message")
28 }
29 *logMsg = fmt.Sprintf("Received: %+v", *msg)
30 return nil
31 }
32
33 func (receiver *Receiver) SyncMetric(syncReq *sinks.SyncReq, logMsg *string) error {
34 if syncReq == nil {
35 return errors.New("msgs is nil")
36 }
37 if syncReq.Operation == "invalid" {
38 return errors.New("invalid message")
39 }
40 *logMsg = fmt.Sprintf("Received: %+v", *syncReq)
41 return nil
42 }
43
44 func init() {
45 recv := new(Receiver)
46 if err := rpc.Register(recv); err != nil {
47 panic(err)
48 }
49 rpc.HandleHTTP()
50 if listener, err := net.Listen("tcp", "0.0.0.0:5050"); err == nil {
51 go func() {
52 _ = http.Serve(listener, nil)
53 }()
54 } else {
55 panic(err)
56 }
57 }
58
59
60 func TestNewRPCWriter(t *testing.T) {
61 a := assert.New(t)
62 _, err := sinks.NewRPCWriter(ctxt, "foo")
63 a.Error(err)
64 }
65
66 func TestRPCWrite(t *testing.T) {
67 a := assert.New(t)
68 rw, err := sinks.NewRPCWriter(ctxt, "0.0.0.0:5050")
69 a.NoError(err)
70
71
72 msgs := []metrics.MeasurementEnvelope{
73 {
74 DBName: "Db",
75 },
76 }
77 err = rw.Write(msgs)
78 a.NoError(err)
79
80
81 msgs = []metrics.MeasurementEnvelope{
82 {
83 DBName: "invalid",
84 },
85 }
86 err = rw.Write(msgs)
87 a.Error(err)
88
89
90 err = rw.Write([]metrics.MeasurementEnvelope{})
91 a.NoError(err)
92
93
94 ctx, cancel := context.WithCancel(ctxt)
95 rw, err = sinks.NewRPCWriter(ctx, "0.0.0.0:5050")
96 a.NoError(err)
97 cancel()
98 err = rw.Write(msgs)
99 a.Error(err)
100 }
101
102 func TestRPCSyncMetric(t *testing.T) {
103 port := 5050
104 a := assert.New(t)
105 rw, err := sinks.NewRPCWriter(ctxt, "0.0.0.0:"+fmt.Sprint(port))
106 if err != nil {
107 t.Error("Unable to send sync metric signal")
108 }
109
110
111 err = rw.SyncMetric("Test-DB", "DB-Metric", "Add")
112 a.NoError(err)
113
114
115 err = rw.SyncMetric("", "", "invalid")
116 a.Error(err)
117
118
119 ctx, cancel := context.WithCancel(ctxt)
120 rw, err = sinks.NewRPCWriter(ctx, "0.0.0.0:5050")
121 a.NoError(err)
122 cancel()
123 err = rw.SyncMetric("Test-DB", "DB-Metric", "Add")
124 a.Error(err)
125 }
126