...

Package sources

import "github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
Overview
Index

Overview ▾

Provides functionality to read monitored data from different sources.

Sources defines how to get the information for the monitored databases. At the moment, sources definitions support two storages: * PostgreSQL database * YAML file

* `postgres.go` files cover the functionality for the PostgreSQL database. * `yaml.go` files cover the functionality for the YAML file. * `resolver.go` implements continuous discovery from patroni and postgres cluster. * `types.go` defines the types and interfaces. * `sample.sources.yaml` is a sample configuration file.

Index ▾

Constants
Variables
func getTransport(conf HostConfigAttrs) (*tls.Config, error)
func jsonTextToStringMap(jsonText string) (map[string]string, error)
func parseHostAndPortFromJdbcConnStr(connStr string) (string, string, error)
type CmdOpts
type HostConfigAttrs
type HostConfigPerMetricDisabledTimes
type Kind
    func (k Kind) IsValid() bool
type PatroniClusterMember
    func extractEtcdScopeMembers(ctx context.Context, s Source, scope string, kapi client.KV, addScopeToName bool) ([]PatroniClusterMember, error)
    func getConsulClusterMembers(Source) ([]PatroniClusterMember, error)
    func getEtcdClusterMembers(s Source) ([]PatroniClusterMember, error)
    func getZookeeperClusterMembers(Source) ([]PatroniClusterMember, error)
type Reader
type ReaderWriter
    func NewPostgresSourcesReaderWriter(ctx context.Context, connstr string) (ReaderWriter, error)
    func NewPostgresSourcesReaderWriterConn(ctx context.Context, conn db.PgxPoolIface) (ReaderWriter, error)
    func NewYAMLSourcesReaderWriter(ctx context.Context, path string) (ReaderWriter, error)
type Source
    func (s *Source) Clone() *Source
    func (s *Source) GetDatabaseName() string
    func (s Source) ResolveDatabases() (SourceConns, error)
type SourceConn
    func ResolveDatabasesFromPatroni(ce Source) ([]*SourceConn, error)
    func (md *SourceConn) Connect(ctx context.Context, opts CmdOpts) (err error)
    func (md *SourceConn) DiscoverPlatform(ctx context.Context) (platform string)
    func (md *SourceConn) FunctionExists(ctx context.Context, functionName string) (exists bool)
    func (md *SourceConn) GetApproxSize(ctx context.Context) (size int64, err error)
    func (md *SourceConn) GetDatabaseName() string
    func (md *SourceConn) IsPostgresSource() bool
    func (md *SourceConn) ParseConfig() (err error)
    func (md *SourceConn) Ping(ctx context.Context) (err error)
    func (md *SourceConn) SetDatabaseName(name string)
type SourceConns
    func ResolveDatabasesFromPostgres(s Source) (resolvedDbs SourceConns, err error)
    func (mds SourceConns) GetMonitoredDatabase(DBUniqueName string) *SourceConn
    func (mds SourceConns) SyncFromReader(r Reader) (newmds SourceConns, err error)
type Sources
    func (srcs Sources) ResolveDatabases() (_ SourceConns, err error)
    func (srcs Sources) Validate() (Sources, error)
type Writer
type dbSourcesReaderWriter
    func (r *dbSourcesReaderWriter) DeleteSource(name string) error
    func (r *dbSourcesReaderWriter) GetSources() (dbs Sources, err error)
    func (r *dbSourcesReaderWriter) UpdateSource(md Source) error
    func (r *dbSourcesReaderWriter) WriteSources(dbs Sources) error
    func (r *dbSourcesReaderWriter) updateDatabase(conn db.PgxIface, md Source) (err error)
type fileSourcesReaderWriter
    func (fcr *fileSourcesReaderWriter) DeleteSource(name string) error
    func (fcr *fileSourcesReaderWriter) GetSources() (dbs Sources, err error)
    func (fcr *fileSourcesReaderWriter) UpdateSource(md Source) error
    func (fcr *fileSourcesReaderWriter) WriteSources(mds Sources) error
    func (fcr *fileSourcesReaderWriter) expandEnvVars(md Source) Source
    func (fcr *fileSourcesReaderWriter) getSources(configFilePath string) (dbs Sources, err error)

Package files

cmdopts.go conn.go doc.go postgres.go resolver.go types.go yaml.go

Constants

const (
    dcsTypeEtcd      = "etcd"
    dcsTypeZookeeper = "zookeeper"
    dcsTypeConsul    = "consul"
)

Variables

NewConn and NewConnWithConfig are wrappers to allow testing

var (
    NewConn           = db.New
    NewConnWithConfig = db.NewWithConfig
)
var Kinds = []Kind{
    SourcePostgres,
    SourcePostgresContinuous,
    SourcePgBouncer,
    SourcePgPool,
    SourcePatroni,
    SourcePatroniContinuous,
    SourcePatroniNamespace,
}
var lastFoundClusterMembers = make(map[string][]PatroniClusterMember) // needed for cases where DCS is temporarily down
var logger log.LoggerIface = log.FallbackLogger

func getTransport

func getTransport(conf HostConfigAttrs) (*tls.Config, error)

func jsonTextToStringMap

func jsonTextToStringMap(jsonText string) (map[string]string, error)

func parseHostAndPortFromJdbcConnStr

func parseHostAndPortFromJdbcConnStr(connStr string) (string, string, error)

type CmdOpts

SourceOpts specifies the sources related command-line options

type CmdOpts struct {
    Sources                      string   `short:"s" long:"sources" mapstructure:"config" description:"Postgres URI, file or folder of YAML files containing info on which DBs to monitor" env:"PW_SOURCES"`
    Refresh                      int      `long:"refresh" mapstructure:"refresh" description:"How frequently to resync sources and metrics" env:"PW_REFRESH" default:"120"`
    Groups                       []string `short:"g" long:"group" mapstructure:"group" description:"Groups for filtering which databases to monitor. By default all are monitored" env:"PW_GROUP"`
    MinDbSizeMB                  int64    `long:"min-db-size-mb" mapstructure:"min-db-size-mb" description:"Smaller size DBs will be ignored and not monitored until they reach the threshold." env:"PW_MIN_DB_SIZE_MB" default:"0"`
    MaxParallelConnectionsPerDb  int      `long:"max-parallel-connections-per-db" mapstructure:"max-parallel-connections-per-db" description:"Max parallel metric fetches per DB. Note the multiplication effect on multi-DB instances" env:"PW_MAX_PARALLEL_CONNECTIONS_PER_DB" default:"4"`
    TryCreateListedExtsIfMissing string   `long:"try-create-listed-exts-if-missing" mapstructure:"try-create-listed-exts-if-missing" description:"Try creating the listed extensions (comma sep.) on first connect for all monitored DBs when missing. Main usage - pg_stat_statements" env:"PW_TRY_CREATE_LISTED_EXTS_IF_MISSING" default:""`
}

type HostConfigAttrs

type HostConfigAttrs struct {
    DcsType                string   `yaml:"dcs_type"`
    DcsEndpoints           []string `yaml:"dcs_endpoints"`
    Scope                  string
    Namespace              string
    Username               string
    Password               string
    CAFile                 string                             `yaml:"ca_file"`
    CertFile               string                             `yaml:"cert_file"`
    KeyFile                string                             `yaml:"key_file"`
    LogsGlobPath           string                             `yaml:"logs_glob_path"`   // default $data_directory / $log_directory / *.csvlog
    LogsMatchRegex         string                             `yaml:"logs_match_regex"` // default is for CSVLOG format. needs to capture following named groups: log_time, user_name, database_name and error_severity
    PerMetricDisabledTimes []HostConfigPerMetricDisabledTimes `yaml:"per_metric_disabled_intervals"`
}

type HostConfigPerMetricDisabledTimes

type HostConfigPerMetricDisabledTimes struct {
    Metrics       []string `yaml:"metrics"`
    DisabledTimes []string `yaml:"disabled_times"`
    DisabledDays  string   `yaml:"disabled_days"`
}

type Kind

type Kind string
const (
    SourcePostgres           Kind = "postgres"
    SourcePostgresContinuous Kind = "postgres-continuous-discovery"
    SourcePgBouncer          Kind = "pgbouncer"
    SourcePgPool             Kind = "pgpool"
    SourcePatroni            Kind = "patroni"
    SourcePatroniContinuous  Kind = "patroni-continuous-discovery"
    SourcePatroniNamespace   Kind = "patroni-namespace-discovery"
)

func (Kind) IsValid

func (k Kind) IsValid() bool

type PatroniClusterMember

type PatroniClusterMember struct {
    Scope   string
    Name    string
    ConnURL string `yaml:"conn_url"`
    Role    string
}

func extractEtcdScopeMembers

func extractEtcdScopeMembers(ctx context.Context, s Source, scope string, kapi client.KV, addScopeToName bool) ([]PatroniClusterMember, error)

func getConsulClusterMembers

func getConsulClusterMembers(Source) ([]PatroniClusterMember, error)

func getEtcdClusterMembers

func getEtcdClusterMembers(s Source) ([]PatroniClusterMember, error)

func getZookeeperClusterMembers

func getZookeeperClusterMembers(Source) ([]PatroniClusterMember, error)

type Reader

type Reader interface {
    GetSources() (Sources, error)
}

type ReaderWriter

type ReaderWriter interface {
    Reader
    Writer
}

func NewPostgresSourcesReaderWriter

func NewPostgresSourcesReaderWriter(ctx context.Context, connstr string) (ReaderWriter, error)

func NewPostgresSourcesReaderWriterConn

func NewPostgresSourcesReaderWriterConn(ctx context.Context, conn db.PgxPoolIface) (ReaderWriter, error)

func NewYAMLSourcesReaderWriter

func NewYAMLSourcesReaderWriter(ctx context.Context, path string) (ReaderWriter, error)

type Source

Source represents a configuration how to get databases to monitor. It can be a single database, a group of databases in postgres cluster, a group of databases in HA patroni cluster. pgbouncer and pgpool kinds are purely to indicate that the monitored database connection is made through a connection pooler, which supports its own additional metrics. If one is not interested in those additional metrics, it is ok to specify the connection details as a regular postgres source.

type Source struct {
    Name                 string             `yaml:"name" db:"name"`
    Group                string             `yaml:"group" db:"group"`
    ConnStr              string             `yaml:"conn_str" db:"connstr"`
    Metrics              map[string]float64 `yaml:"custom_metrics" db:"config"`
    MetricsStandby       map[string]float64 `yaml:"custom_metrics_standby" db:"config_standby"`
    Kind                 Kind               `yaml:"kind" db:"dbtype"`
    IncludePattern       string             `yaml:"include_pattern" db:"include_pattern"`
    ExcludePattern       string             `yaml:"exclude_pattern" db:"exclude_pattern"`
    PresetMetrics        string             `yaml:"preset_metrics" db:"preset_config"`
    PresetMetricsStandby string             `yaml:"preset_metrics_standby" db:"preset_config_standby"`
    IsEnabled            bool               `yaml:"is_enabled" db:"is_enabled"`
    CustomTags           map[string]string  `yaml:"custom_tags" db:"custom_tags"`
    HostConfig           HostConfigAttrs    `yaml:"host_config" db:"host_config"`
    OnlyIfMaster         bool               `yaml:"only_if_master" db:"only_if_master"`
}

func (*Source) Clone

func (s *Source) Clone() *Source

func (*Source) GetDatabaseName

func (s *Source) GetDatabaseName() string

func (Source) ResolveDatabases

func (s Source) ResolveDatabases() (SourceConns, error)

ResolveDatabases() return a slice of found databases for continuous monitoring sources, e.g. patroni

type SourceConn

SourceConn represents a single connection to monitor. Unlike source, it contains a database connection. Continuous discovery sources (postgres-continuous-discovery, patroni-continuous-discovery, patroni-namespace-discovery) will produce multiple monitored databases structs based on the discovered databases.

type SourceConn struct {
    Source
    Conn       db.PgxPoolIface
    ConnConfig *pgxpool.Config
}

func ResolveDatabasesFromPatroni

func ResolveDatabasesFromPatroni(ce Source) ([]*SourceConn, error)

func (*SourceConn) Connect

func (md *SourceConn) Connect(ctx context.Context, opts CmdOpts) (err error)

Connect will establish a connection to the database if it's not already connected. If the connection is already established, it pings the server to ensure it's still alive.

func (*SourceConn) DiscoverPlatform

func (md *SourceConn) DiscoverPlatform(ctx context.Context) (platform string)

TryDiscoverPlatform tries to discover the platform based on the database version string and some special settings that are only available on certain platforms. Returns the platform name or "UNKNOWN" if not sure.

func (*SourceConn) FunctionExists

func (md *SourceConn) FunctionExists(ctx context.Context, functionName string) (exists bool)

FunctionExists checks if a function exists in the database

func (*SourceConn) GetApproxSize

func (md *SourceConn) GetApproxSize(ctx context.Context) (size int64, err error)

GetApproxSize returns the approximate size of the database in bytes

func (*SourceConn) GetDatabaseName

func (md *SourceConn) GetDatabaseName() string

GetDatabaseName returns the database name from the connection string

func (*SourceConn) IsPostgresSource

func (md *SourceConn) IsPostgresSource() bool

func (*SourceConn) ParseConfig

func (md *SourceConn) ParseConfig() (err error)

ParseConfig will parse the connection string and store the result in the connection config

func (*SourceConn) Ping

func (md *SourceConn) Ping(ctx context.Context) (err error)

Ping will try to ping the server to ensure the connection is still alive

func (*SourceConn) SetDatabaseName

func (md *SourceConn) SetDatabaseName(name string)

SetDatabaseName sets the database name in the connection config for resolved databases

type SourceConns

SourceConn represents a single connection to monitor. Unlike source, it contains a database connection. Continuous discovery sources (postgres-continuous-discovery, patroni-continuous-discovery, patroni-namespace-discovery) will produce multiple monitored databases structs based on the discovered databases.

type SourceConns []*SourceConn

func ResolveDatabasesFromPostgres

func ResolveDatabasesFromPostgres(s Source) (resolvedDbs SourceConns, err error)

ResolveDatabasesFromPostgres reads all the databases from the given cluster, additionally matching/not matching specified regex patterns

func (SourceConns) GetMonitoredDatabase

func (mds SourceConns) GetMonitoredDatabase(DBUniqueName string) *SourceConn

func (SourceConns) SyncFromReader

func (mds SourceConns) SyncFromReader(r Reader) (newmds SourceConns, err error)

SyncFromReader will update the monitored databases with the latest configuration from the reader. Any resolution errors will be returned, e.g. etcd unavailability. It's up to the caller to proceed with the databases available or stop the execution due to errors.

type Sources

type Sources []Source

func (Sources) ResolveDatabases

func (srcs Sources) ResolveDatabases() (_ SourceConns, err error)

ResolveDatabases() updates list of monitored objects from continuous monitoring sources, e.g. patroni

func (Sources) Validate

func (srcs Sources) Validate() (Sources, error)

type Writer

type Writer interface {
    WriteSources(Sources) error
    DeleteSource(string) error
    UpdateSource(md Source) error
}

type dbSourcesReaderWriter

type dbSourcesReaderWriter struct {
    ctx      context.Context
    configDb db.PgxIface
}

func (*dbSourcesReaderWriter) DeleteSource

func (r *dbSourcesReaderWriter) DeleteSource(name string) error

func (*dbSourcesReaderWriter) GetSources

func (r *dbSourcesReaderWriter) GetSources() (dbs Sources, err error)

func (*dbSourcesReaderWriter) UpdateSource

func (r *dbSourcesReaderWriter) UpdateSource(md Source) error

func (*dbSourcesReaderWriter) WriteSources

func (r *dbSourcesReaderWriter) WriteSources(dbs Sources) error

func (*dbSourcesReaderWriter) updateDatabase

func (r *dbSourcesReaderWriter) updateDatabase(conn db.PgxIface, md Source) (err error)

type fileSourcesReaderWriter

type fileSourcesReaderWriter struct {
    ctx  context.Context
    path string
}

func (*fileSourcesReaderWriter) DeleteSource

func (fcr *fileSourcesReaderWriter) DeleteSource(name string) error

func (*fileSourcesReaderWriter) GetSources

func (fcr *fileSourcesReaderWriter) GetSources() (dbs Sources, err error)

func (*fileSourcesReaderWriter) UpdateSource

func (fcr *fileSourcesReaderWriter) UpdateSource(md Source) error

func (*fileSourcesReaderWriter) WriteSources

func (fcr *fileSourcesReaderWriter) WriteSources(mds Sources) error

func (*fileSourcesReaderWriter) expandEnvVars

func (fcr *fileSourcesReaderWriter) expandEnvVars(md Source) Source

func (*fileSourcesReaderWriter) getSources

func (fcr *fileSourcesReaderWriter) getSources(configFilePath string) (dbs Sources, err error)