...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/sources/postgres.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/sources

     1  package sources
     2  
     3  // This file contains the implementation of the ReaderWriter interface for the PostgreSQL database.
     4  // Monitored sources are stored in the `pgwatch.source` table in the configuration database.
     5  
     6  import (
     7  	"context"
     8  
     9  	"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
    10  	pgx "github.com/jackc/pgx/v5"
    11  )
    12  
    13  func NewPostgresSourcesReaderWriter(ctx context.Context, connstr string) (ReaderWriter, error) {
    14  	conn, err := db.New(ctx, connstr)
    15  	if err != nil {
    16  		return nil, err
    17  	}
    18  	return NewPostgresSourcesReaderWriterConn(ctx, conn)
    19  }
    20  
    21  func NewPostgresSourcesReaderWriterConn(ctx context.Context, conn db.PgxPoolIface) (ReaderWriter, error) {
    22  	return &dbSourcesReaderWriter{
    23  		ctx:      ctx,
    24  		configDb: conn,
    25  	}, conn.Ping(ctx)
    26  
    27  }
    28  
// dbSourcesReaderWriter is the value returned (as a ReaderWriter) by the
// NewPostgresSourcesReaderWriter* constructors in this file.
//
// ctx holds the context passed to the constructor; the methods defined in this
// file issue their statements with context.Background() rather than ctx.
// configDb is the handle every statement against pgwatch.source is executed on.
type dbSourcesReaderWriter struct {
	ctx      context.Context
	configDb db.PgxIface
}
    33  
    34  func (r *dbSourcesReaderWriter) WriteSources(dbs Sources) error {
    35  	tx, err := r.configDb.Begin(context.Background())
    36  	if err != nil {
    37  		return err
    38  	}
    39  	if _, err = tx.Exec(context.Background(), `truncate pgwatch.source`); err != nil {
    40  		return err
    41  	}
    42  	defer func() { _ = tx.Rollback(context.Background()) }()
    43  	for _, md := range dbs {
    44  		if err = r.updateDatabase(tx, md); err != nil {
    45  			return err
    46  		}
    47  	}
    48  	return tx.Commit(context.Background())
    49  }
    50  
    51  func (r *dbSourcesReaderWriter) updateDatabase(conn db.PgxIface, md Source) (err error) {
    52  	m := db.MarshallParamToJSONB
    53  	sql := `insert into pgwatch.source(
    54  	name, 
    55  	"group", 
    56  	dbtype, 
    57  	connstr, 
    58  	config, 
    59  	config_standby, 
    60  	preset_config, 
    61  	preset_config_standby, 
    62  	include_pattern, 
    63  	exclude_pattern, 
    64  	custom_tags, 
    65  	host_config, 
    66  	only_if_master) 
    67  values 
    68  	($1, $2, $3, $4, $5, $6, NULLIF($7, ''), NULLIF($8, ''), $9, $10, $11, $12, $13) 
    69  on conflict (name) do update set
    70  	"group" = $2, 
    71  	dbtype = $3, 
    72  	connstr = $4, 
    73  	config = $5, 
    74  	config_standby = $6, 
    75  	preset_config = NULLIF($7, ''),
    76  	preset_config_standby = NULLIF($8, ''), 
    77  	include_pattern = $9, 
    78  	exclude_pattern = $10, 
    79  	custom_tags = $11, 
    80  	host_config = $12,
    81  	only_if_master = $13`
    82  	_, err = conn.Exec(context.Background(), sql,
    83  		md.Name, md.Group, md.Kind,
    84  		md.ConnStr, m(md.Metrics), m(md.MetricsStandby), md.PresetMetrics, md.PresetMetricsStandby,
    85  		md.IncludePattern, md.ExcludePattern, m(md.CustomTags),
    86  		m(md.HostConfig), md.OnlyIfMaster)
    87  	return err
    88  }
    89  
// UpdateSource inserts md into pgwatch.source, or overwrites the existing row
// with the same name, by delegating to updateDatabase on the configuration
// database handle. No explicit transaction is opened here.
func (r *dbSourcesReaderWriter) UpdateSource(md Source) error {
	return r.updateDatabase(r.configDb, md)
}
    93  
    94  func (r *dbSourcesReaderWriter) DeleteSource(name string) error {
    95  	_, err := r.configDb.Exec(context.Background(), `delete from pgwatch.source where name = $1`, name)
    96  	return err
    97  }
    98  
    99  func (r *dbSourcesReaderWriter) GetSources() (dbs Sources, err error) {
   100  	sqlLatest := `select /* pgwatch_generated */
   101  	name, 
   102  	"group", 
   103  	dbtype, 
   104  	connstr,
   105  	coalesce(config, '{}'::jsonb) as config, 
   106  	coalesce(config_standby, '{}'::jsonb) as config_standby,
   107  	coalesce(preset_config, '') as preset_config,
   108  	coalesce(preset_config_standby, '') as preset_config_standby,
   109  	coalesce(include_pattern, '') as include_pattern, 
   110  	coalesce(exclude_pattern, '') as exclude_pattern,
   111  	coalesce(custom_tags, '{}'::jsonb) as custom_tags, 
   112  	coalesce(host_config, '{}') as host_config, 
   113  	only_if_master,
   114  	is_enabled
   115  from
   116  	pgwatch.source`
   117  	rows, err := r.configDb.Query(context.Background(), sqlLatest)
   118  	if err != nil {
   119  		return nil, err
   120  	}
   121  	dbs, err = pgx.CollectRows[Source](rows, pgx.RowToStructByNameLax)
   122  	return
   123  }
   124