...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/sources/conn.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/sources

     1  package sources
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"math"
     7  	"regexp"
     8  	"strconv"
     9  	"sync"
    10  	"time"
    11  
    12  	"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
    13  	"github.com/jackc/pgx/v5"
    14  	"github.com/jackc/pgx/v5/pgxpool"
    15  )
    16  
    17  // NewConn and NewConnWithConfig are wrappers to allow testing
    18  var (
    19  	NewConn           = db.New
    20  	NewConnWithConfig = db.NewWithConfig
    21  )
    22  
    23  const (
    24  	EnvUnknown       = "UNKNOWN"
    25  	EnvAzureSingle   = "AZURE_SINGLE" //discontinued
    26  	EnvAzureFlexible = "AZURE_FLEXIBLE"
    27  	EnvGoogle        = "GOOGLE"
    28  )
    29  
    30  type RuntimeInfo struct {
    31  	LastCheckedOn    time.Time
    32  	IsInRecovery     bool
    33  	VersionStr       string
    34  	Version          int
    35  	RealDbname       string
    36  	SystemIdentifier string
    37  	IsSuperuser      bool
    38  	Extensions       map[string]int
    39  	ExecEnv          string
    40  	ApproxDbSize     int64
    41  	ChangeState      map[string]map[string]string // ["category"][object_identifier] = state
    42  }
    43  
    44  // SourceConn represents a single connection to monitor. Unlike source, it contains a database connection.
    45  // Continuous discovery sources (postgres-continuous-discovery, patroni-continuous-discovery, patroni-namespace-discovery)
    46  // will produce multiple monitored databases structs based on the discovered databases.
    47  type (
    48  	SourceConn struct {
    49  		Source
    50  		Conn       db.PgxPoolIface
    51  		ConnConfig *pgxpool.Config
    52  		RuntimeInfo
    53  		sync.RWMutex
    54  	}
    55  
    56  	SourceConns []*SourceConn
    57  )
    58  
    59  func NewSourceConn(s Source) *SourceConn {
    60  	return &SourceConn{
    61  		Source: s,
    62  		RuntimeInfo: RuntimeInfo{
    63  			Extensions:  make(map[string]int),
    64  			ChangeState: make(map[string]map[string]string),
    65  		},
    66  	}
    67  }
    68  
    69  // Ping will try to ping the server to ensure the connection is still alive
    70  func (md *SourceConn) Ping(ctx context.Context) (err error) {
    71  	if md.Kind == SourcePgBouncer {
    72  		// pgbouncer is very picky about the queries it accepts
    73  		_, err = md.Conn.Exec(ctx, "SHOW VERSION")
    74  		return
    75  	}
    76  	return md.Conn.Ping(ctx)
    77  }
    78  
    79  // Connect will establish a connection to the database if it's not already connected.
    80  // If the connection is already established, it pings the server to ensure it's still alive.
    81  func (md *SourceConn) Connect(ctx context.Context, opts CmdOpts) (err error) {
    82  	if md.Conn == nil {
    83  		if err = md.ParseConfig(); err != nil {
    84  			return err
    85  		}
    86  		if md.Kind == SourcePgBouncer {
    87  			md.ConnConfig.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeSimpleProtocol
    88  		}
    89  		if opts.MaxParallelConnectionsPerDb > 0 {
    90  			md.ConnConfig.MaxConns = int32(opts.MaxParallelConnectionsPerDb)
    91  		}
    92  		md.Conn, err = NewConnWithConfig(ctx, md.ConnConfig)
    93  		if err != nil {
    94  			return err
    95  		}
    96  	}
    97  	return md.Ping(ctx)
    98  }
    99  
   100  // ParseConfig will parse the connection string and store the result in the connection config
   101  func (md *SourceConn) ParseConfig() (err error) {
   102  	if md.ConnConfig == nil {
   103  		md.ConnConfig, err = pgxpool.ParseConfig(md.ConnStr)
   104  		return
   105  	}
   106  	return
   107  }
   108  
   109  // GetUniqueIdentifier returns a unique identifier for the host assuming SysId is the same for
   110  // primary and all replicas but connection information is different
   111  func (md *SourceConn) GetClusterIdentifier() string {
   112  	if err := md.ParseConfig(); err != nil {
   113  		return ""
   114  	}
   115  	return fmt.Sprintf("%s:%s:%d", md.SystemIdentifier, md.ConnConfig.ConnConfig.Host, md.ConnConfig.ConnConfig.Port)
   116  }
   117  
   118  // GetDatabaseName returns the database name from the connection string
   119  func (md *SourceConn) GetDatabaseName() string {
   120  	if err := md.ParseConfig(); err != nil {
   121  		return ""
   122  	}
   123  	return md.ConnConfig.ConnConfig.Database
   124  }
   125  
   126  // GetMetricInterval returns the metric interval for the connection
   127  func (md *SourceConn) GetMetricInterval(name string) float64 {
   128  	md.RLock()
   129  	defer md.RUnlock()
   130  	if md.IsInRecovery && len(md.MetricsStandby) > 0 {
   131  		return md.MetricsStandby[name]
   132  	}
   133  	return md.Metrics[name]
   134  }
   135  
   136  // SetDatabaseName sets the database name in the connection config for resolved databases
   137  func (md *SourceConn) SetDatabaseName(name string) {
   138  	if err := md.ParseConfig(); err != nil {
   139  		return
   140  	}
   141  	md.ConnStr = "" // unset the connection string to force conn config usage
   142  	md.ConnConfig.ConnConfig.Database = name
   143  }
   144  
   145  func (md *SourceConn) IsPostgresSource() bool {
   146  	return md.Kind != SourcePgBouncer && md.Kind != SourcePgPool
   147  }
   148  
   149  // VersionToInt parses a given version and returns an integer  or
   150  // an error if unable to parse the version. Only parses valid semantic versions.
   151  // Performs checking that can find errors within the version.
   152  // Examples: v1.2 -> 01_02_00, v9.6.3 -> 09_06_03, v11 -> 11_00_00
   153  var regVer = regexp.MustCompile(`(\d+).?(\d*).?(\d*)`)
   154  
   155  func VersionToInt(version string) (v int) {
   156  	if matches := regVer.FindStringSubmatch(version); len(matches) > 1 {
   157  		for i, match := range matches[1:] {
   158  			v += func() (m int) { m, _ = strconv.Atoi(match); return }() * int(math.Pow10(4-i*2))
   159  		}
   160  	}
   161  	return
   162  }
   163  
   164  func (md *SourceConn) FetchRuntimeInfo(ctx context.Context, forceRefetch bool) (err error) {
   165  	md.Lock()
   166  	defer md.Unlock()
   167  	if ctx.Err() != nil {
   168  		return ctx.Err()
   169  	}
   170  
   171  	if !forceRefetch && md.LastCheckedOn.After(time.Now().Add(time.Minute*-2)) { // use cached version for 2 min
   172  		return nil
   173  	}
   174  	switch md.Kind {
   175  	case SourcePgBouncer, SourcePgPool:
   176  		if md.VersionStr, md.Version, err = md.FetchVersion(ctx, func() string {
   177  			if md.Kind == SourcePgBouncer {
   178  				return "SHOW VERSION"
   179  			}
   180  			return "SHOW POOL_VERSION"
   181  		}()); err != nil {
   182  			return
   183  		}
   184  	default:
   185  		sql := `select /* pgwatch_generated */ 
   186  	div(current_setting('server_version_num')::int, 10000) as ver, 
   187  	version(), 
   188  	pg_is_in_recovery(), 
   189  	current_database()::TEXT,
   190  	system_identifier,
   191  	current_setting('is_superuser')::bool
   192  FROM
   193  	pg_control_system()`
   194  
   195  		err = md.Conn.QueryRow(ctx, sql).
   196  			Scan(&md.Version, &md.VersionStr,
   197  				&md.IsInRecovery, &md.RealDbname,
   198  				&md.SystemIdentifier, &md.IsSuperuser)
   199  		if err != nil {
   200  			return err
   201  		}
   202  
   203  		md.ExecEnv = md.DiscoverPlatform(ctx)
   204  		md.ApproxDbSize = md.FetchApproxSize(ctx)
   205  
   206  		sqlExtensions := `select /* pgwatch_generated */ extname::text, (regexp_matches(extversion, $$\d+\.?\d+?$$))[1]::text as extversion from pg_extension order by 1;`
   207  		var res pgx.Rows
   208  		res, err = md.Conn.Query(ctx, sqlExtensions)
   209  		if err == nil {
   210  			var ext string
   211  			var ver string
   212  			_, err = pgx.ForEachRow(res, []any{&ext, &ver}, func() error {
   213  				extver := VersionToInt(ver)
   214  				if extver == 0 {
   215  					return fmt.Errorf("unexpected extension %s version input: %s", ext, ver)
   216  				}
   217  				md.Extensions[ext] = extver
   218  				return nil
   219  			})
   220  		}
   221  
   222  	}
   223  	md.LastCheckedOn = time.Now()
   224  	return err
   225  }
   226  
   227  func (md *SourceConn) FetchVersion(ctx context.Context, sql string) (version string, ver int, err error) {
   228  	if err = md.Conn.QueryRow(ctx, sql, pgx.QueryExecModeSimpleProtocol).Scan(&version); err != nil {
   229  		return
   230  	}
   231  	ver = VersionToInt(version)
   232  	return
   233  }
   234  
   235  // TryDiscoverPlatform tries to discover the platform based on the database version string and some special settings
   236  // that are only available on certain platforms. Returns the platform name or "UNKNOWN" if not sure.
   237  func (md *SourceConn) DiscoverPlatform(ctx context.Context) (platform string) {
   238  	if md.ExecEnv != "" {
   239  		return md.ExecEnv // carry over as not likely to change ever
   240  	}
   241  	sql := `select /* pgwatch_generated */
   242  	case
   243  	  when exists (select * from pg_settings where name = 'pg_qs.host_database' and setting = 'azure_sys') and version() ~* 'compiled by Visual C' then 'AZURE_SINGLE'
   244  	  when exists (select * from pg_settings where name = 'pg_qs.host_database' and setting = 'azure_sys') and version() ~* 'compiled by gcc' then 'AZURE_FLEXIBLE'
   245  	  when exists (select * from pg_settings where name = 'cloudsql.supported_extensions') then 'GOOGLE'
   246  	else
   247  	  'UNKNOWN'
   248  	end as exec_env`
   249  	_ = md.Conn.QueryRow(ctx, sql).Scan(&platform)
   250  	return
   251  }
   252  
   253  // FetchApproxSize returns the approximate size of the database in bytes
   254  func (md *SourceConn) FetchApproxSize(ctx context.Context) (size int64) {
   255  	sqlApproxDBSize := `select /* pgwatch_generated */ current_setting('block_size')::int8 * sum(relpages) from pg_class c where c.relpersistence != 't'`
   256  	_ = md.Conn.QueryRow(ctx, sqlApproxDBSize).Scan(&size)
   257  	return
   258  }
   259  
   260  // FunctionExists checks if a function exists in the database
   261  func (md *SourceConn) FunctionExists(ctx context.Context, functionName string) (exists bool) {
   262  	sql := `select /* pgwatch_generated */ true 
   263  from 
   264  	pg_proc join pg_namespace n on pronamespace = n.oid 
   265  where 
   266  	proname = $1 and n.nspname = 'public'`
   267  	_ = md.Conn.QueryRow(ctx, sql, functionName).Scan(&exists)
   268  	return
   269  }
   270  
   271  func (mds SourceConns) GetMonitoredDatabase(DBUniqueName string) *SourceConn {
   272  	for _, md := range mds {
   273  		if md.Name == DBUniqueName {
   274  			return md
   275  		}
   276  	}
   277  	return nil
   278  }
   279