...
1 package sinks
2
3 import (
4 "context"
5 "net/rpc"
6
7 "github.com/cybertec-postgresql/pgwatch/v3/internal/log"
8 "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
9 )
10
11
12
13
14
// RPCWriter is a measurement sink that forwards data to a remote receiver
// through a net/rpc client.
type RPCWriter struct {
	// ctx is the context supplied to NewRPCWriter, enriched with a logger
	// tagged for this sink. Its cancellation triggers closing of client.
	ctx context.Context

	// address is the "host:port" string the client was dialed with.
	address string

	// client is the RPC-over-HTTP connection used for every remote call.
	client *rpc.Client
}
20
21 func NewRPCWriter(ctx context.Context, address string) (*RPCWriter, error) {
22 client, err := rpc.DialHTTP("tcp", address)
23 if err != nil {
24 return nil, err
25 }
26 l := log.GetLogger(ctx).WithField("sink", "rpc").WithField("address", address)
27 ctx = log.WithLogger(ctx, l)
28 rw := &RPCWriter{
29 ctx: ctx,
30 address: address,
31 client: client,
32 }
33 go rw.watchCtx()
34 return rw, nil
35 }
36
37
38 func (rw *RPCWriter) Write(msgs []metrics.MeasurementEnvelope) error {
39 if rw.ctx.Err() != nil {
40 return rw.ctx.Err()
41 }
42 if len(msgs) == 0 {
43 return nil
44 }
45 for _, msg := range msgs {
46 var logMsg string
47 if err := rw.client.Call("Receiver.UpdateMeasurements", &msg, &logMsg); err != nil {
48 return err
49 }
50 if len(logMsg) > 0 {
51 log.GetLogger(rw.ctx).Info(logMsg)
52 }
53 }
54 return nil
55 }
56
// SyncReq is the argument sent with the "Receiver.SyncMetric" remote call.
type SyncReq struct {
	// DbName is filled from SyncMetric's dbUnique argument.
	DbName string

	// MetricName is filled from SyncMetric's metricName argument.
	MetricName string

	// Operation is filled from SyncMetric's op argument.
	Operation string
}
62
63 func (rw *RPCWriter) SyncMetric(dbUnique string, metricName string, op string) error {
64 var logMsg string
65 if err := rw.client.Call("Receiver.SyncMetric", &SyncReq{
66 Operation: op,
67 DbName: dbUnique,
68 MetricName: metricName,
69 }, &logMsg); err != nil {
70 return err
71 }
72 if len(logMsg) > 0 {
73 log.GetLogger(rw.ctx).Info(logMsg)
74 }
75 return nil
76 }
77
78 func (rw *RPCWriter) watchCtx() {
79 <-rw.ctx.Done()
80 rw.client.Close()
81 }
82