...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/sinks/postgres_test.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/sinks

     1  package sinks
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"testing"
     7  	"time"
     8  
     9  	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
    10  	"github.com/pashagolub/pgxmock/v4"
    11  	"github.com/stretchr/testify/assert"
    12  )
    13  
    14  var ctx = context.Background()
    15  
    16  func TestReadMetricSchemaType(t *testing.T) {
    17  	conn, err := pgxmock.NewPool()
    18  	assert.NoError(t, err)
    19  
    20  	pgw := PostgresWriter{
    21  		ctx:    ctx,
    22  		sinkDb: conn,
    23  	}
    24  
    25  	conn.ExpectQuery("SELECT schema_type").
    26  		WillReturnError(errors.New("expected"))
    27  	assert.Error(t, pgw.ReadMetricSchemaType())
    28  
    29  	conn.ExpectQuery("SELECT schema_type").
    30  		WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true))
    31  	assert.NoError(t, pgw.ReadMetricSchemaType())
    32  	assert.Equal(t, DbStorageSchemaTimescale, pgw.metricSchema)
    33  }
    34  
    35  func TestNewWriterFromPostgresConn(t *testing.T) {
    36  	conn, err := pgxmock.NewPool()
    37  	assert.NoError(t, err)
    38  
    39  	conn.ExpectPing()
    40  	conn.ExpectQuery("SELECT EXISTS").WithArgs("admin").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true))
    41  	conn.ExpectQuery("SELECT schema_type").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true))
    42  	for _, m := range metrics.GetDefaultBuiltInMetrics() {
    43  		conn.ExpectExec("select admin.ensure_dummy_metrics_table").WithArgs(m).WillReturnResult(pgxmock.NewResult("EXECUTE", 1))
    44  	}
    45  
    46  	opts := &CmdOpts{BatchingDelay: time.Hour, Retention: 356}
    47  	pgw, err := NewWriterFromPostgresConn(ctx, conn, opts, metrics.GetDefaultMetrics())
    48  	assert.NoError(t, err)
    49  	assert.NotNil(t, pgw)
    50  
    51  	assert.NoError(t, conn.ExpectationsWereMet())
    52  }
    53  
    54  func TestSyncMetric(t *testing.T) {
    55  	conn, err := pgxmock.NewPool()
    56  	assert.NoError(t, err)
    57  	pgw := PostgresWriter{
    58  		ctx:    ctx,
    59  		sinkDb: conn,
    60  	}
    61  	dbUnique := "mydb"
    62  	metricName := "mymetric"
    63  	op := "add"
    64  	conn.ExpectExec("insert into admin\\.all_distinct_dbname_metrics").WithArgs(dbUnique, metricName).WillReturnResult(pgxmock.NewResult("EXECUTE", 1))
    65  	conn.ExpectExec("select admin\\.ensure_dummy_metrics_table").WithArgs(metricName).WillReturnResult(pgxmock.NewResult("EXECUTE", 1))
    66  	err = pgw.SyncMetric(dbUnique, metricName, op)
    67  	assert.NoError(t, err)
    68  	assert.NoError(t, conn.ExpectationsWereMet())
    69  
    70  	op = "foo"
    71  	err = pgw.SyncMetric(dbUnique, metricName, op)
    72  	assert.NoError(t, err, "ignore unknown operation")
    73  }
    74  
    75  func TestWrite(t *testing.T) {
    76  	conn, err := pgxmock.NewPool()
    77  	assert.NoError(t, err)
    78  	ctx, cancel := context.WithCancel(ctx)
    79  	pgw := PostgresWriter{
    80  		ctx:    ctx,
    81  		sinkDb: conn,
    82  	}
    83  	messages := []metrics.MeasurementEnvelope{
    84  		{
    85  			MetricName: "test_metric",
    86  			Data: metrics.Measurements{
    87  				{"number": 1, "string": "test_data"},
    88  			},
    89  			DBName:     "test_db",
    90  			CustomTags: map[string]string{"foo": "boo"},
    91  		},
    92  	}
    93  
    94  	highLoadTimeout = 0
    95  	err = pgw.Write(messages)
    96  	assert.NoError(t, err, "messages skipped due to high load")
    97  
    98  	highLoadTimeout = time.Second * 5
    99  	pgw.input = make(chan []metrics.MeasurementEnvelope, cacheLimit)
   100  	err = pgw.Write(messages)
   101  	assert.NoError(t, err, "write successful")
   102  
   103  	cancel()
   104  	err = pgw.Write(messages)
   105  	assert.Error(t, err, "context canceled")
   106  }
   107  
   108  func TestPostgresWriter_EnsureMetricTime(t *testing.T) {
   109  	conn, err := pgxmock.NewPool()
   110  	assert.NoError(t, err)
   111  	pgw := PostgresWriter{
   112  		ctx:    ctx,
   113  		sinkDb: conn,
   114  	}
   115  
   116  	TestPartBounds := map[string]ExistingPartitionInfo{"test_metric_realtime": {time.Now(), time.Now()}}
   117  	conn.ExpectQuery(`select part_available_from, part_available_to`).
   118  		WithArgs("test_metric_realtime", TestPartBounds["test_metric_realtime"].StartTime).
   119  		WillReturnRows(pgxmock.NewRows([]string{"part_available_from", "part_available_to"}).
   120  			AddRow(time.Now(), time.Now()))
   121  
   122  	conn.ExpectQuery(`select part_available_from, part_available_to`).
   123  		WithArgs("test_metric_realtime", TestPartBounds["test_metric_realtime"].EndTime).
   124  		WillReturnRows(pgxmock.NewRows([]string{"part_available_from", "part_available_to"}).
   125  			AddRow(time.Now(), time.Now()))
   126  
   127  	err = pgw.EnsureMetricTime(TestPartBounds, true)
   128  	assert.NoError(t, err)
   129  	assert.NoError(t, conn.ExpectationsWereMet())
   130  }
   131