...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/sinks/postgres_schema_test.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/sinks

     1  package sinks
     2  
     3  import (
     4  	"context"
     5  	"testing"
     6  
     7  	migrator "github.com/cybertec-postgresql/pgx-migrator"
     8  	"github.com/jackc/pgx/v5"
     9  	"github.com/pashagolub/pgxmock/v4"
    10  	"github.com/stretchr/testify/assert"
    11  )
    12  
    13  func TestPostgresWriterMigrate(t *testing.T) {
    14  	oldInitMigrator := initMigrator
    15  	t.Cleanup(func() {
    16  		initMigrator = oldInitMigrator
    17  	})
    18  
    19  	a := assert.New(t)
    20  	conn, err := pgxmock.NewPool()
    21  	a.NoError(err)
    22  
    23  	// Mock the migrator to use simple migrations for testing
    24  	initMigrator = func(_ *PostgresWriter) (*migrator.Migrator, error) {
    25  		return migrator.New(
    26  			migrator.TableName("admin.migration"),
    27  			migrator.Migrations(
    28  				&migrator.Migration{
    29  					Name: "Test migration 1",
    30  					Func: func(ctx context.Context, tx pgx.Tx) error {
    31  						_, err := tx.Query(ctx, "SELECT 1 AS col1")
    32  						return err
    33  					},
    34  				},
    35  				&migrator.Migration{
    36  					Name: "Test migration 2",
    37  					Func: func(ctx context.Context, tx pgx.Tx) error {
    38  						_, err := tx.Query(ctx, "SELECT 2 AS col2")
    39  						return err
    40  					},
    41  				},
    42  			),
    43  		)
    44  	}
    45  
    46  	conn.ExpectExec(`CREATE TABLE IF NOT EXISTS admin\.migration`).WillReturnResult(pgxmock.NewResult("CREATE", 1))
    47  	conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(0))
    48  
    49  	// Expect transaction for migration 1 execution
    50  	conn.ExpectBegin()
    51  	conn.ExpectQuery(`SELECT 1 AS col1`).WillReturnRows(pgxmock.NewRows([]string{"col1"}).AddRow(1))
    52  	conn.ExpectExec(`INSERT INTO`).WillReturnResult(pgxmock.NewResult("INSERT", 1))
    53  	conn.ExpectCommit()
    54  
    55  	// Expect transaction for migration 2 execution
    56  	conn.ExpectBegin()
    57  	conn.ExpectQuery(`SELECT 2 AS col2`).WillReturnRows(pgxmock.NewRows([]string{"col2"}).AddRow(2))
    58  	conn.ExpectExec(`INSERT INTO`).WillReturnResult(pgxmock.NewResult("INSERT", 1))
    59  	conn.ExpectCommit()
    60  
    61  	pgw := &PostgresWriter{ctx: ctx, sinkDb: conn}
    62  	err = pgw.Migrate()
    63  	a.NoError(err)
    64  	a.NoError(conn.ExpectationsWereMet())
    65  }
    66  
    67  func TestPostgresWriterNeedsMigration(t *testing.T) {
    68  	a := assert.New(t)
    69  	conn, err := pgxmock.NewPool()
    70  	a.NoError(err)
    71  
    72  	// Expect checks for migration table existence and pending migrations
    73  	conn.ExpectQuery(`SELECT to_regclass`).
    74  		WithArgs("admin.migration").
    75  		WillReturnRows(pgxmock.NewRows([]string{"to_regclass"}).AddRow(true))
    76  	conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(0))
    77  
    78  	pgw := &PostgresWriter{ctx: ctx, sinkDb: conn}
    79  	needs, err := pgw.NeedsMigration()
    80  	a.NoError(err)
    81  	a.True(needs)
    82  	a.NoError(conn.ExpectationsWereMet())
    83  }
    84  
    85  func TestPostgresWriterNeedsMigrationNoMigrationNeeded(t *testing.T) {
    86  	a := assert.New(t)
    87  	conn, err := pgxmock.NewPool()
    88  	a.NoError(err)
    89  
    90  	// Expect checks for migration table existence and no pending migrations
    91  	conn.ExpectQuery(`SELECT to_regclass`).
    92  		WithArgs("admin.migration").
    93  		WillReturnRows(pgxmock.NewRows([]string{"to_regclass"}).AddRow(true))
    94  	conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(2))
    95  
    96  	pgw := &PostgresWriter{ctx: ctx, sinkDb: conn}
    97  	needs, err := pgw.NeedsMigration()
    98  	a.NoError(err)
    99  	a.False(needs)
   100  	a.NoError(conn.ExpectationsWereMet())
   101  }
   102  
   103  func TestPostgresWriterMigrateFail(t *testing.T) {
   104  	oldInitMigrator := initMigrator
   105  	t.Cleanup(func() {
   106  		initMigrator = oldInitMigrator
   107  	})
   108  	a := assert.New(t)
   109  	pgw := &PostgresWriter{ctx: ctx}
   110  	initMigrator = func(*PostgresWriter) (*migrator.Migrator, error) {
   111  		return nil, assert.AnError
   112  	}
   113  	err := pgw.Migrate()
   114  	a.Error(err)
   115  	a.Contains(err.Error(), "cannot initialize migration")
   116  }
   117  
   118  func TestPostgresWriterNeedsMigrationFail(t *testing.T) {
   119  	oldInitMigrator := initMigrator
   120  	t.Cleanup(func() {
   121  		initMigrator = oldInitMigrator
   122  	})
   123  	a := assert.New(t)
   124  	pgw := &PostgresWriter{ctx: ctx}
   125  	initMigrator = func(*PostgresWriter) (*migrator.Migrator, error) {
   126  		return nil, assert.AnError
   127  	}
   128  	_, err := pgw.NeedsMigration()
   129  	a.Error(err)
   130  }
   131