...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/sinks/rpc.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/sinks

     1  package sinks
     2  
     3  import (
     4  	"context"
     5  	"net/rpc"
     6  
     7  	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
     8  	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
     9  )
    10  
// RPCWriter is a sink that sends metric measurements to a remote server using the RPC protocol.
// The remote server should implement the Receiver interface. It is up to the implementer to define
// the behavior of the server: it can be a simple logger, external storage, an alerting system,
// or an analytics system.
type RPCWriter struct {
	// ctx is the context stored by NewRPCWriter (the result of log.WithLogger).
	// Write refuses to send once it is done, and watchCtx closes client when it is done.
	ctx     context.Context
	// address is the server address that NewRPCWriter dialed.
	address string
	// client is the RPC connection opened by NewRPCWriter with rpc.DialHTTP.
	client  *rpc.Client
}
    20  
    21  func NewRPCWriter(ctx context.Context, address string) (*RPCWriter, error) {
    22  	client, err := rpc.DialHTTP("tcp", address)
    23  	if err != nil {
    24  		return nil, err
    25  	}
    26  	l := log.GetLogger(ctx).WithField("sink", "rpc").WithField("address", address)
    27  	ctx = log.WithLogger(ctx, l)
    28  	rw := &RPCWriter{
    29  		ctx:     ctx,
    30  		address: address,
    31  		client:  client,
    32  	}
    33  	go rw.watchCtx()
    34  	return rw, nil
    35  }
    36  
    37  // Sends Measurement Message to RPC Sink
    38  func (rw *RPCWriter) Write(msgs []metrics.MeasurementEnvelope) error {
    39  	if rw.ctx.Err() != nil {
    40  		return rw.ctx.Err()
    41  	}
    42  	if len(msgs) == 0 {
    43  		return nil
    44  	}
    45  	for _, msg := range msgs {
    46  		var logMsg string
    47  		if err := rw.client.Call("Receiver.UpdateMeasurements", &msg, &logMsg); err != nil {
    48  			return err
    49  		}
    50  		if len(logMsg) > 0 {
    51  			log.GetLogger(rw.ctx).Info(logMsg)
    52  		}
    53  	}
    54  	return nil
    55  }
    56  
// SyncReq is the argument that RPCWriter.SyncMetric sends to the remote
// "Receiver.SyncMetric" RPC method.
type SyncReq struct {
	// DbName is filled from SyncMetric's dbUnique argument.
	DbName     string
	// MetricName is filled from SyncMetric's metricName argument.
	MetricName string
	// Operation is filled from SyncMetric's op argument; the set of accepted
	// values is not defined in this file.
	Operation  string
}
    62  
    63  func (rw *RPCWriter) SyncMetric(dbUnique string, metricName string, op string) error {
    64  	var logMsg string
    65  	if err := rw.client.Call("Receiver.SyncMetric", &SyncReq{
    66  		Operation:  op,
    67  		DbName:     dbUnique,
    68  		MetricName: metricName,
    69  	}, &logMsg); err != nil {
    70  		return err
    71  	}
    72  	if len(logMsg) > 0 {
    73  		log.GetLogger(rw.ctx).Info(logMsg)
    74  	}
    75  	return nil
    76  }
    77  
    78  func (rw *RPCWriter) watchCtx() {
    79  	<-rw.ctx.Done()
    80  	rw.client.Close()
    81  }
    82