...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/sinks/rpc_test.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/sinks

     1  package sinks_test
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"os"
     7  	"testing"
     8  
     9  	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
    10  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
    11  	"github.com/cybertec-postgresql/pgwatch/v5/internal/testutil"
    12  	"github.com/stretchr/testify/assert"
    13  	"google.golang.org/grpc/codes"
    14  	"google.golang.org/grpc/status"
    15  )
    16  
    17  var ctx = testutil.TestContext
    18  
    19  func TestMain(m *testing.M) {
    20  	// Setup
    21  	rpcTeardown, err := testutil.SetupRPCServers()
    22  	if err != nil {
    23  		rpcTeardown()
    24  		panic(err)
    25  	}
    26  
    27  	// Execute all tests
    28  	exitCode := m.Run()
    29  
    30  	// Teardown
    31  	rpcTeardown()
    32  	os.Exit(exitCode)
    33  }
    34  
    35  // Tests begin from here ---------------------------------------------------------
    36  
    37  func TestCACertParamValidation(t *testing.T) {
    38  	a := assert.New(t)
    39  	_, err := sinks.NewRPCWriter(ctx, testutil.TLSConnStr)
    40  	a.NoError(err)
    41  
    42  	err = os.WriteFile("badca.crt", []byte(""), 0644)
    43  	a.NoError(err)
    44  	defer func() { _ = os.Remove("badca.crt") }()
    45  
    46  	BadRPCParams := map[string]string{
    47  		"?sslrootca=file.txt":  "error loading CA file",
    48  		"?sslrootca=":          "error loading CA file",
    49  		"?sslrootca=badca.crt": "invalid CA file",
    50  	}
    51  
    52  	for param, errMsg := range BadRPCParams {
    53  		_, err = sinks.NewRPCWriter(ctx, fmt.Sprintf("grpc://%s%s", testutil.TLSServerAddress, param))
    54  		a.ErrorContains(err, errMsg)
    55  	}
    56  }
    57  
    58  func TestRPCTLSWriter(t *testing.T) {
    59  	a := assert.New(t)
    60  
    61  	rw, err := sinks.NewRPCWriter(ctx, testutil.TLSConnStr)
    62  	a.NoError(err)
    63  
    64  	// no error for valid messages
    65  	msgs := metrics.MeasurementEnvelope{
    66  		DBName: "Db",
    67  		Data:   metrics.Measurements{{"test": 1}},
    68  	}
    69  	err = rw.Write(msgs)
    70  	a.NoError(err)
    71  }
    72  
    73  func TestRPCWrite(t *testing.T) {
    74  	a := assert.New(t)
    75  
    76  	rw, err := sinks.NewRPCWriter(ctx, testutil.PlainConnStr)
    77  	a.NoError(err)
    78  
    79  	// no error for valid messages
    80  	msgs := metrics.MeasurementEnvelope{
    81  		DBName: "Db",
    82  		Data:   metrics.Measurements{{"test": 1}},
    83  	}
    84  	err = rw.Write(msgs)
    85  	a.NoError(err)
    86  
    87  	// error for invalid messages
    88  	msgs.DBName = "invalid"
    89  	err = rw.Write(msgs)
    90  	a.ErrorIs(err, status.Error(codes.Unknown, "invalid message"))
    91  
    92  	// error for empty messages
    93  	err = rw.Write(metrics.MeasurementEnvelope{})
    94  	a.ErrorIs(err, status.Error(codes.Unknown, "empty message"))
    95  
    96  	// error for cancelled context
    97  	ctx, cancel := context.WithCancel(ctx)
    98  	rw, err = sinks.NewRPCWriter(ctx, testutil.PlainConnStr)
    99  	a.NoError(err)
   100  	cancel()
   101  	err = rw.Write(msgs)
   102  	a.Error(err)
   103  }
   104  
   105  func TestRPCSyncMetric(t *testing.T) {
   106  	a := assert.New(t)
   107  
   108  	rw, err := sinks.NewRPCWriter(ctx, testutil.PlainConnStr)
   109  	a.NoError(err)
   110  
   111  	// no error for valid Sync requests
   112  	err = rw.SyncMetric("Test-DB", "DB-Metric", sinks.AddOp)
   113  	a.NoError(err)
   114  
   115  	// error for invalid Sync requests
   116  	err = rw.SyncMetric("", "", sinks.InvalidOp)
   117  	a.ErrorIs(err, status.Error(codes.Unknown, "invalid sync request"))
   118  
   119  	// error for cancelled context
   120  	ctx, cancel := context.WithCancel(ctx)
   121  	rw, err = sinks.NewRPCWriter(ctx, testutil.PlainConnStr)
   122  	a.NoError(err)
   123  	cancel()
   124  	err = rw.SyncMetric("Test-DB", "DB-Metric", sinks.AddOp)
   125  	a.Error(err)
   126  }
   127  
   128  func TestRPCDefineMetric(t *testing.T) {
   129  	a := assert.New(t)
   130  
   131  	rw, err := sinks.NewRPCWriter(ctx, testutil.PlainConnStr)
   132  	a.NoError(err)
   133  
   134  	// Test that RPCWriter implements MetricsDefiner interface
   135  	var writer sinks.Writer = rw
   136  	definer, ok := writer.(sinks.MetricsDefiner)
   137  	a.True(ok, "RPCWriter should implement MetricsDefiner interface")
   138  
   139  	// Test with valid metrics
   140  	testMetrics := &metrics.Metrics{
   141  		MetricDefs: metrics.MetricDefs{
   142  			"test_metric": metrics.Metric{
   143  				SQLs: metrics.SQLs{
   144  					11: "SELECT 1 as test_column",
   145  					12: "SELECT 2 as test_column",
   146  				},
   147  				Description: "Test metric",
   148  				Gauges:      []string{"test_column"},
   149  			},
   150  		},
   151  		PresetDefs: metrics.PresetDefs{
   152  			"test_preset": metrics.Preset{
   153  				Description: "Test preset",
   154  				Metrics:     map[string]float64{"test_metric": 30.0},
   155  			},
   156  		},
   157  	}
   158  
   159  	err = definer.DefineMetrics(testMetrics)
   160  	a.NoError(err)
   161  
   162  	// Test with empty metrics (should still work)
   163  	emptyMetrics := &metrics.Metrics{
   164  		MetricDefs: make(metrics.MetricDefs),
   165  		PresetDefs: make(metrics.PresetDefs),
   166  	}
   167  
   168  	err = definer.DefineMetrics(emptyMetrics)
   169  	a.NoError(err)
   170  
   171  	// Test with cancelled context
   172  	ctx, cancel := context.WithCancel(ctx)
   173  	rw, err = sinks.NewRPCWriter(ctx, testutil.PlainConnStr)
   174  	a.NoError(err)
   175  	cancel()
   176  
   177  	writer = rw
   178  	definer = writer.(sinks.MetricsDefiner)
   179  	err = definer.DefineMetrics(testMetrics)
   180  	a.Error(err)
   181  }
   182  
   183  func TestAuthCredsSending(t *testing.T) {
   184  	a := assert.New(t)
   185  
   186  	unauthenticatedConnStr := "grpc://notpgwatch:notpgwatch@localhost:6060"
   187  	rw, err := sinks.NewRPCWriter(ctx, unauthenticatedConnStr)
   188  	a.NoError(err)
   189  
   190  	err = rw.Write(metrics.MeasurementEnvelope{})
   191  	a.Equal(err, status.Error(codes.Unauthenticated, "unauthenticated"))
   192  }
   193