1 package sinks 2 3 import ( 4 "context" 5 "errors" 6 "testing" 7 "time" 8 9 "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics" 10 "github.com/pashagolub/pgxmock/v4" 11 "github.com/stretchr/testify/assert" 12 ) 13 14 var ctx = context.Background() 15 16 func TestReadMetricSchemaType(t *testing.T) { 17 conn, err := pgxmock.NewPool() 18 assert.NoError(t, err) 19 20 pgw := PostgresWriter{ 21 ctx: ctx, 22 sinkDb: conn, 23 } 24 25 conn.ExpectQuery("SELECT schema_type"). 26 WillReturnError(errors.New("expected")) 27 assert.Error(t, pgw.ReadMetricSchemaType()) 28 29 conn.ExpectQuery("SELECT schema_type"). 30 WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true)) 31 assert.NoError(t, pgw.ReadMetricSchemaType()) 32 assert.Equal(t, DbStorageSchemaTimescale, pgw.metricSchema) 33 } 34 35 func TestNewWriterFromPostgresConn(t *testing.T) { 36 conn, err := pgxmock.NewPool() 37 assert.NoError(t, err) 38 39 conn.ExpectPing() 40 conn.ExpectQuery("SELECT EXISTS").WithArgs("admin").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true)) 41 conn.ExpectQuery("SELECT schema_type").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true)) 42 for _, m := range metrics.GetDefaultBuiltInMetrics() { 43 conn.ExpectExec("select admin.ensure_dummy_metrics_table").WithArgs(m).WillReturnResult(pgxmock.NewResult("EXECUTE", 1)) 44 } 45 46 opts := &CmdOpts{BatchingDelay: time.Hour, Retention: 356} 47 pgw, err := NewWriterFromPostgresConn(ctx, conn, opts, metrics.GetDefaultMetrics()) 48 assert.NoError(t, err) 49 assert.NotNil(t, pgw) 50 51 assert.NoError(t, conn.ExpectationsWereMet()) 52 } 53 54 func TestSyncMetric(t *testing.T) { 55 conn, err := pgxmock.NewPool() 56 assert.NoError(t, err) 57 pgw := PostgresWriter{ 58 ctx: ctx, 59 sinkDb: conn, 60 } 61 dbUnique := "mydb" 62 metricName := "mymetric" 63 op := "add" 64 conn.ExpectExec("insert into admin\\.all_distinct_dbname_metrics").WithArgs(dbUnique, metricName).WillReturnResult(pgxmock.NewResult("EXECUTE", 1)) 65 conn.ExpectExec("select admin\\.ensure_dummy_metrics_table").WithArgs(metricName).WillReturnResult(pgxmock.NewResult("EXECUTE", 1)) 66 err = pgw.SyncMetric(dbUnique, metricName, op) 67 assert.NoError(t, err) 68 assert.NoError(t, conn.ExpectationsWereMet()) 69 70 op = "foo" 71 err = pgw.SyncMetric(dbUnique, metricName, op) 72 assert.NoError(t, err, "ignore unknown operation") 73 } 74 75 func TestWrite(t *testing.T) { 76 conn, err := pgxmock.NewPool() 77 assert.NoError(t, err) 78 ctx, cancel := context.WithCancel(ctx) 79 pgw := PostgresWriter{ 80 ctx: ctx, 81 sinkDb: conn, 82 } 83 messages := []metrics.MeasurementEnvelope{ 84 { 85 MetricName: "test_metric", 86 Data: metrics.Measurements{ 87 {"number": 1, "string": "test_data"}, 88 }, 89 DBName: "test_db", 90 CustomTags: map[string]string{"foo": "boo"}, 91 }, 92 } 93 94 highLoadTimeout = 0 95 err = pgw.Write(messages) 96 assert.NoError(t, err, "messages skipped due to high load") 97 98 highLoadTimeout = time.Second * 5 99 pgw.input = make(chan []metrics.MeasurementEnvelope, cacheLimit) 100 err = pgw.Write(messages) 101 assert.NoError(t, err, "write successful") 102 103 cancel() 104 err = pgw.Write(messages) 105 assert.Error(t, err, "context canceled") 106 } 107 108 func TestPostgresWriter_EnsureMetricTime(t *testing.T) { 109 conn, err := pgxmock.NewPool() 110 assert.NoError(t, err) 111 pgw := PostgresWriter{ 112 ctx: ctx, 113 sinkDb: conn, 114 } 115 116 TestPartBounds := map[string]ExistingPartitionInfo{"test_metric_realtime": {time.Now(), time.Now()}} 117 conn.ExpectQuery(`select part_available_from, part_available_to`). 118 WithArgs("test_metric_realtime", TestPartBounds["test_metric_realtime"].StartTime). 119 WillReturnRows(pgxmock.NewRows([]string{"part_available_from", "part_available_to"}). 120 AddRow(time.Now(), time.Now())) 121 122 conn.ExpectQuery(`select part_available_from, part_available_to`). 123 WithArgs("test_metric_realtime", TestPartBounds["test_metric_realtime"].EndTime). 124 WillReturnRows(pgxmock.NewRows([]string{"part_available_from", "part_available_to"}). 125 AddRow(time.Now(), time.Now())) 126 127 err = pgw.EnsureMetricTime(TestPartBounds, true) 128 assert.NoError(t, err) 129 assert.NoError(t, conn.ExpectationsWereMet()) 130 } 131