...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/sinks/rpc_test.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/sinks

     1  package sinks_test
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"net"
     8  	"net/http"
     9  	"net/rpc"
    10  	"testing"
    11  
    12  	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
    13  	"github.com/cybertec-postgresql/pgwatch/v3/internal/sinks"
    14  	"github.com/stretchr/testify/assert"
    15  )
    16  
    17  type Receiver struct {
    18  }
    19  
    20  var ctxt = context.Background()
    21  
    22  func (receiver *Receiver) UpdateMeasurements(msg *metrics.MeasurementEnvelope, logMsg *string) error {
    23  	if msg == nil {
    24  		return errors.New("msgs is nil")
    25  	}
    26  	if msg.DBName != "Db" {
    27  		return errors.New("invalid message")
    28  	}
    29  	*logMsg = fmt.Sprintf("Received: %+v", *msg)
    30  	return nil
    31  }
    32  
    33  func (receiver *Receiver) SyncMetric(syncReq *sinks.SyncReq, logMsg *string) error {
    34  	if syncReq == nil {
    35  		return errors.New("msgs is nil")
    36  	}
    37  	if syncReq.Operation == "invalid" {
    38  		return errors.New("invalid message")
    39  	}
    40  	*logMsg = fmt.Sprintf("Received: %+v", *syncReq)
    41  	return nil
    42  }
    43  
    44  func init() {
    45  	recv := new(Receiver)
    46  	if err := rpc.Register(recv); err != nil {
    47  		panic(err)
    48  	}
    49  	rpc.HandleHTTP()
    50  	if listener, err := net.Listen("tcp", "0.0.0.0:5050"); err == nil {
    51  		go func() {
    52  			_ = http.Serve(listener, nil)
    53  		}()
    54  	} else {
    55  		panic(err)
    56  	}
    57  }
    58  
    59  // Test begin from here ---------------------------------------------------------
    60  func TestNewRPCWriter(t *testing.T) {
    61  	a := assert.New(t)
    62  	_, err := sinks.NewRPCWriter(ctxt, "foo")
    63  	a.Error(err)
    64  }
    65  
    66  func TestRPCWrite(t *testing.T) {
    67  	a := assert.New(t)
    68  	rw, err := sinks.NewRPCWriter(ctxt, "0.0.0.0:5050")
    69  	a.NoError(err)
    70  
    71  	// no error for valid messages
    72  	msgs := []metrics.MeasurementEnvelope{
    73  		{
    74  			DBName: "Db",
    75  		},
    76  	}
    77  	err = rw.Write(msgs)
    78  	a.NoError(err)
    79  
    80  	// error for invalid messages
    81  	msgs = []metrics.MeasurementEnvelope{
    82  		{
    83  			DBName: "invalid",
    84  		},
    85  	}
    86  	err = rw.Write(msgs)
    87  	a.Error(err)
    88  
    89  	// no error for empty messages
    90  	err = rw.Write([]metrics.MeasurementEnvelope{})
    91  	a.NoError(err)
    92  
    93  	// error for cancelled context
    94  	ctx, cancel := context.WithCancel(ctxt)
    95  	rw, err = sinks.NewRPCWriter(ctx, "0.0.0.0:5050")
    96  	a.NoError(err)
    97  	cancel()
    98  	err = rw.Write(msgs)
    99  	a.Error(err)
   100  }
   101  
   102  func TestRPCSyncMetric(t *testing.T) {
   103  	port := 5050
   104  	a := assert.New(t)
   105  	rw, err := sinks.NewRPCWriter(ctxt, "0.0.0.0:"+fmt.Sprint(port))
   106  	if err != nil {
   107  		t.Error("Unable to send sync metric signal")
   108  	}
   109  
   110  	// no error for valid messages
   111  	err = rw.SyncMetric("Test-DB", "DB-Metric", "Add")
   112  	a.NoError(err)
   113  
   114  	// error for invalid messages
   115  	err = rw.SyncMetric("", "", "invalid")
   116  	a.Error(err)
   117  
   118  	// error for cancelled context
   119  	ctx, cancel := context.WithCancel(ctxt)
   120  	rw, err = sinks.NewRPCWriter(ctx, "0.0.0.0:5050")
   121  	a.NoError(err)
   122  	cancel()
   123  	err = rw.SyncMetric("Test-DB", "DB-Metric", "Add")
   124  	a.Error(err)
   125  }
   126