...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/sources/postgres.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/sources

     1  package sources
     2  
     3  // This file contains the implementation of the ReaderWriter interface for the PostgreSQL database.
     4  // Monitored sources are stored in the `pgwatch.source` table in the configuration database.
     5  
     6  import (
     7  	"context"
     8  	"errors"
     9  
    10  	"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
    11  	pgx "github.com/jackc/pgx/v5"
    12  	"github.com/jackc/pgx/v5/pgconn"
    13  )
    14  
    15  func NewPostgresSourcesReaderWriter(ctx context.Context, connstr string) (ReaderWriter, error) {
    16  	conn, err := db.New(ctx, connstr)
    17  	if err != nil {
    18  		return nil, err
    19  	}
    20  	return NewPostgresSourcesReaderWriterConn(ctx, conn)
    21  }
    22  
    23  func NewPostgresSourcesReaderWriterConn(ctx context.Context, conn db.PgxPoolIface) (ReaderWriter, error) {
    24  	return &dbSourcesReaderWriter{
    25  		ctx:      ctx,
    26  		configDb: conn,
    27  	}, conn.Ping(ctx)
    28  
    29  }
    30  
// dbSourcesReaderWriter is the ReaderWriter implementation that keeps monitored
// sources in the `pgwatch.source` table of the configuration database.
type dbSourcesReaderWriter struct {
	// ctx is the context handed to the constructor. The methods visible in
	// this file issue their queries with context.Background() rather than ctx.
	ctx context.Context
	// configDb is the handle to the configuration database used by every query.
	configDb db.PgxIface
}
    35  
    36  func (r *dbSourcesReaderWriter) WriteSources(dbs Sources) error {
    37  	tx, err := r.configDb.Begin(context.Background())
    38  	if err != nil {
    39  		return err
    40  	}
    41  	if _, err = tx.Exec(context.Background(), `truncate pgwatch.source`); err != nil {
    42  		return err
    43  	}
    44  	defer func() { _ = tx.Rollback(context.Background()) }()
    45  	for _, md := range dbs {
    46  		if err = r.updateSource(tx, md); err != nil {
    47  			return err
    48  		}
    49  	}
    50  	return tx.Commit(context.Background())
    51  }
    52  
    53  func (r *dbSourcesReaderWriter) updateSource(conn db.PgxIface, md Source) (err error) {
    54  	m := db.MarshallParamToJSONB
    55  	sql := `insert into pgwatch.source(
    56  	name, 
    57  	"group", 
    58  	dbtype, 
    59  	connstr, 
    60  	config, 
    61  	config_standby, 
    62  	preset_config, 
    63  	preset_config_standby, 
    64  	include_pattern, 
    65  	exclude_pattern, 
    66  	custom_tags, 
    67  	only_if_master,
    68  	is_enabled) 
    69  values 
    70  	($1, $2, $3, $4, $5, $6, NULLIF($7, ''), NULLIF($8, ''), $9, $10, $11, $12, $13)
    71  on conflict (name) do update set
    72  	"group" = $2, 
    73  	dbtype = $3, 
    74  	connstr = $4, 
    75  	config = $5, 
    76  	config_standby = $6, 
    77  	preset_config = NULLIF($7, ''),
    78  	preset_config_standby = NULLIF($8, ''), 
    79  	include_pattern = $9, 
    80  	exclude_pattern = $10, 
    81  	custom_tags = $11, 
    82  	only_if_master = $12,
    83  	is_enabled = $13`
    84  	_, err = conn.Exec(context.Background(), sql,
    85  		md.Name, md.Group, md.Kind,
    86  		md.ConnStr, m(md.Metrics), m(md.MetricsStandby), md.PresetMetrics, md.PresetMetricsStandby,
    87  		md.IncludePattern, md.ExcludePattern, m(md.CustomTags),
    88  		md.OnlyIfMaster, md.IsEnabled)
    89  	return err
    90  }
    91  
    92  func (r *dbSourcesReaderWriter) createSource(conn db.PgxIface, md Source) (err error) {
    93  	m := db.MarshallParamToJSONB
    94  	sql := `insert into pgwatch.source(
    95  	name, 
    96  	"group", 
    97  	dbtype, 
    98  	connstr, 
    99  	config, 
   100  	config_standby, 
   101  	preset_config, 
   102  	preset_config_standby, 
   103  	include_pattern, 
   104  	exclude_pattern, 
   105  	custom_tags, 
   106  	only_if_master,
   107  	is_enabled) 
   108  values 
   109  	($1, $2, $3, $4, $5, $6, NULLIF($7, ''), NULLIF($8, ''), $9, $10, $11, $12, $13)`
   110  	_, err = conn.Exec(context.Background(), sql,
   111  		md.Name, md.Group, md.Kind,
   112  		md.ConnStr, m(md.Metrics), m(md.MetricsStandby), md.PresetMetrics, md.PresetMetricsStandby,
   113  		md.IncludePattern, md.ExcludePattern, m(md.CustomTags),
   114  		md.OnlyIfMaster, md.IsEnabled)
   115  	if err != nil {
   116  		// Check for unique constraint violation using PostgreSQL error code
   117  		var pgErr *pgconn.PgError
   118  		if errors.As(err, &pgErr) && pgErr.SQLState() == "23505" {
   119  			return ErrSourceExists
   120  		}
   121  	}
   122  	return err
   123  }
   124  
   125  func (r *dbSourcesReaderWriter) UpdateSource(md Source) error {
   126  	return r.updateSource(r.configDb, md)
   127  }
   128  
   129  func (r *dbSourcesReaderWriter) CreateSource(md Source) error {
   130  	return r.createSource(r.configDb, md)
   131  }
   132  
   133  func (r *dbSourcesReaderWriter) DeleteSource(name string) error {
   134  	_, err := r.configDb.Exec(context.Background(), `delete from pgwatch.source where name = $1`, name)
   135  	return err
   136  }
   137  
   138  func (r *dbSourcesReaderWriter) GetSources() (Sources, error) {
   139  	sqlLatest := `select /* pgwatch_generated */
   140  	name, 
   141  	"group", 
   142  	dbtype, 
   143  	connstr,
   144  	coalesce(config, '{}'::jsonb) as config, 
   145  	coalesce(config_standby, '{}'::jsonb) as config_standby,
   146  	coalesce(preset_config, '') as preset_config,
   147  	coalesce(preset_config_standby, '') as preset_config_standby,
   148  	coalesce(include_pattern, '') as include_pattern, 
   149  	coalesce(exclude_pattern, '') as exclude_pattern,
   150  	coalesce(custom_tags, '{}'::jsonb) as custom_tags, 
   151  	only_if_master,
   152  	is_enabled
   153  from
   154  	pgwatch.source`
   155  	rows, err := r.configDb.Query(context.Background(), sqlLatest)
   156  	if err != nil {
   157  		return nil, err
   158  	}
   159  	return pgx.CollectRows[Source](rows, pgx.RowToStructByNameLax)
   160  }
   161