...
1 package sinks
2
3 import (
4 "context"
5 "testing"
6
7 migrator "github.com/cybertec-postgresql/pgx-migrator"
8 "github.com/jackc/pgx/v5"
9 "github.com/pashagolub/pgxmock/v4"
10 "github.com/stretchr/testify/assert"
11 )
12
13 func TestPostgresWriterMigrate(t *testing.T) {
14 oldInitMigrator := initMigrator
15 t.Cleanup(func() {
16 initMigrator = oldInitMigrator
17 })
18
19 a := assert.New(t)
20 conn, err := pgxmock.NewPool()
21 a.NoError(err)
22
23
24 initMigrator = func(_ *PostgresWriter) (*migrator.Migrator, error) {
25 return migrator.New(
26 migrator.TableName("admin.migration"),
27 migrator.Migrations(
28 &migrator.Migration{
29 Name: "Test migration 1",
30 Func: func(ctx context.Context, tx pgx.Tx) error {
31 _, err := tx.Query(ctx, "SELECT 1 AS col1")
32 return err
33 },
34 },
35 &migrator.Migration{
36 Name: "Test migration 2",
37 Func: func(ctx context.Context, tx pgx.Tx) error {
38 _, err := tx.Query(ctx, "SELECT 2 AS col2")
39 return err
40 },
41 },
42 ),
43 )
44 }
45
46 conn.ExpectExec(`CREATE TABLE IF NOT EXISTS admin\.migration`).WillReturnResult(pgxmock.NewResult("CREATE", 1))
47 conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(0))
48
49
50 conn.ExpectBegin()
51 conn.ExpectQuery(`SELECT 1 AS col1`).WillReturnRows(pgxmock.NewRows([]string{"col1"}).AddRow(1))
52 conn.ExpectExec(`INSERT INTO`).WillReturnResult(pgxmock.NewResult("INSERT", 1))
53 conn.ExpectCommit()
54
55
56 conn.ExpectBegin()
57 conn.ExpectQuery(`SELECT 2 AS col2`).WillReturnRows(pgxmock.NewRows([]string{"col2"}).AddRow(2))
58 conn.ExpectExec(`INSERT INTO`).WillReturnResult(pgxmock.NewResult("INSERT", 1))
59 conn.ExpectCommit()
60
61 pgw := &PostgresWriter{ctx: ctx, sinkDb: conn}
62 err = pgw.Migrate()
63 a.NoError(err)
64 a.NoError(conn.ExpectationsWereMet())
65 }
66
67 func TestPostgresWriterNeedsMigration(t *testing.T) {
68 a := assert.New(t)
69 conn, err := pgxmock.NewPool()
70 a.NoError(err)
71
72
73 conn.ExpectQuery(`SELECT to_regclass`).
74 WithArgs("admin.migration").
75 WillReturnRows(pgxmock.NewRows([]string{"to_regclass"}).AddRow(true))
76 conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(0))
77
78 pgw := &PostgresWriter{ctx: ctx, sinkDb: conn}
79 needs, err := pgw.NeedsMigration()
80 a.NoError(err)
81 a.True(needs)
82 a.NoError(conn.ExpectationsWereMet())
83 }
84
85 func TestPostgresWriterNeedsMigrationNoMigrationNeeded(t *testing.T) {
86 a := assert.New(t)
87 conn, err := pgxmock.NewPool()
88 a.NoError(err)
89
90
91 conn.ExpectQuery(`SELECT to_regclass`).
92 WithArgs("admin.migration").
93 WillReturnRows(pgxmock.NewRows([]string{"to_regclass"}).AddRow(true))
94 conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(2))
95
96 pgw := &PostgresWriter{ctx: ctx, sinkDb: conn}
97 needs, err := pgw.NeedsMigration()
98 a.NoError(err)
99 a.False(needs)
100 a.NoError(conn.ExpectationsWereMet())
101 }
102
103 func TestPostgresWriterMigrateFail(t *testing.T) {
104 oldInitMigrator := initMigrator
105 t.Cleanup(func() {
106 initMigrator = oldInitMigrator
107 })
108 a := assert.New(t)
109 pgw := &PostgresWriter{ctx: ctx}
110 initMigrator = func(*PostgresWriter) (*migrator.Migrator, error) {
111 return nil, assert.AnError
112 }
113 err := pgw.Migrate()
114 a.Error(err)
115 a.Contains(err.Error(), "cannot initialize migration")
116 }
117
118 func TestPostgresWriterNeedsMigrationFail(t *testing.T) {
119 oldInitMigrator := initMigrator
120 t.Cleanup(func() {
121 initMigrator = oldInitMigrator
122 })
123 a := assert.New(t)
124 pgw := &PostgresWriter{ctx: ctx}
125 initMigrator = func(*PostgresWriter) (*migrator.Migrator, error) {
126 return nil, assert.AnError
127 }
128 _, err := pgw.NeedsMigration()
129 a.Error(err)
130 }
131