const ( dcsTypeEtcd = "etcd" dcsTypeZookeeper = "zookeeper" dcsTypeConsul = "consul" )
NewConn and NewConnWithConfig are wrappers to allow testing
var ( NewConn = db.New NewConnWithConfig = db.NewWithConfig )
var Kinds = []Kind{ SourcePostgres, SourcePostgresContinuous, SourcePgBouncer, SourcePgPool, SourcePatroni, SourcePatroniContinuous, SourcePatroniNamespace, }
var lastFoundClusterMembers = make(map[string][]PatroniClusterMember) // needed for cases where DCS is temporarily down
var logger log.LoggerIface = log.FallbackLogger
func getTransport(conf HostConfigAttrs) (*tls.Config, error)
func jsonTextToStringMap(jsonText string) (map[string]string, error)
func parseHostAndPortFromJdbcConnStr(connStr string) (string, string, error)
SourceOpts specifies the sources related command-line options
type CmdOpts struct { Sources string `short:"s" long:"sources" mapstructure:"config" description:"Postgres URI, file or folder of YAML files containing info on which DBs to monitor" env:"PW_SOURCES"` Refresh int `long:"refresh" mapstructure:"refresh" description:"How frequently to resync sources and metrics" env:"PW_REFRESH" default:"120"` Groups []string `short:"g" long:"group" mapstructure:"group" description:"Groups for filtering which databases to monitor. By default all are monitored" env:"PW_GROUP"` MinDbSizeMB int64 `long:"min-db-size-mb" mapstructure:"min-db-size-mb" description:"Smaller size DBs will be ignored and not monitored until they reach the threshold." env:"PW_MIN_DB_SIZE_MB" default:"0"` MaxParallelConnectionsPerDb int `long:"max-parallel-connections-per-db" mapstructure:"max-parallel-connections-per-db" description:"Max parallel metric fetches per DB. Note the multiplication effect on multi-DB instances" env:"PW_MAX_PARALLEL_CONNECTIONS_PER_DB" default:"4"` TryCreateListedExtsIfMissing string `long:"try-create-listed-exts-if-missing" mapstructure:"try-create-listed-exts-if-missing" description:"Try creating the listed extensions (comma sep.) on first connect for all monitored DBs when missing. Main usage - pg_stat_statements" env:"PW_TRY_CREATE_LISTED_EXTS_IF_MISSING" default:""` }
type HostConfigAttrs struct { DcsType string `yaml:"dcs_type"` DcsEndpoints []string `yaml:"dcs_endpoints"` Scope string Namespace string Username string Password string CAFile string `yaml:"ca_file"` CertFile string `yaml:"cert_file"` KeyFile string `yaml:"key_file"` LogsGlobPath string `yaml:"logs_glob_path"` // default $data_directory / $log_directory / *.csvlog LogsMatchRegex string `yaml:"logs_match_regex"` // default is for CSVLOG format. needs to capture following named groups: log_time, user_name, database_name and error_severity PerMetricDisabledTimes []HostConfigPerMetricDisabledTimes `yaml:"per_metric_disabled_intervals"` }
type HostConfigPerMetricDisabledTimes struct { Metrics []string `yaml:"metrics"` DisabledTimes []string `yaml:"disabled_times"` DisabledDays string `yaml:"disabled_days"` }
type Kind string
const ( SourcePostgres Kind = "postgres" SourcePostgresContinuous Kind = "postgres-continuous-discovery" SourcePgBouncer Kind = "pgbouncer" SourcePgPool Kind = "pgpool" SourcePatroni Kind = "patroni" SourcePatroniContinuous Kind = "patroni-continuous-discovery" SourcePatroniNamespace Kind = "patroni-namespace-discovery" )
func (k Kind) IsValid() bool
type PatroniClusterMember struct { Scope string Name string ConnURL string `yaml:"conn_url"` Role string }
func extractEtcdScopeMembers(ctx context.Context, s Source, scope string, kapi client.KV, addScopeToName bool) ([]PatroniClusterMember, error)
func getConsulClusterMembers(Source) ([]PatroniClusterMember, error)
func getEtcdClusterMembers(s Source) ([]PatroniClusterMember, error)
func getZookeeperClusterMembers(Source) ([]PatroniClusterMember, error)
type Reader interface { GetSources() (Sources, error) }
type ReaderWriter interface { Reader Writer }
func NewPostgresSourcesReaderWriter(ctx context.Context, connstr string) (ReaderWriter, error)
func NewPostgresSourcesReaderWriterConn(ctx context.Context, conn db.PgxPoolIface) (ReaderWriter, error)
func NewYAMLSourcesReaderWriter(ctx context.Context, path string) (ReaderWriter, error)
Source represents a configuration how to get databases to monitor. It can be a single database, a group of databases in postgres cluster, a group of databases in HA patroni cluster. pgbouncer and pgpool kinds are purely to indicate that the monitored database connection is made through a connection pooler, which supports its own additional metrics. If one is not interested in those additional metrics, it is ok to specify the connection details as a regular postgres source.
type Source struct { Name string `yaml:"name" db:"name"` Group string `yaml:"group" db:"group"` ConnStr string `yaml:"conn_str" db:"connstr"` Metrics map[string]float64 `yaml:"custom_metrics" db:"config"` MetricsStandby map[string]float64 `yaml:"custom_metrics_standby" db:"config_standby"` Kind Kind `yaml:"kind" db:"dbtype"` IncludePattern string `yaml:"include_pattern" db:"include_pattern"` ExcludePattern string `yaml:"exclude_pattern" db:"exclude_pattern"` PresetMetrics string `yaml:"preset_metrics" db:"preset_config"` PresetMetricsStandby string `yaml:"preset_metrics_standby" db:"preset_config_standby"` IsEnabled bool `yaml:"is_enabled" db:"is_enabled"` CustomTags map[string]string `yaml:"custom_tags" db:"custom_tags"` HostConfig HostConfigAttrs `yaml:"host_config" db:"host_config"` OnlyIfMaster bool `yaml:"only_if_master" db:"only_if_master"` }
func (s *Source) Clone() *Source
func (s *Source) GetDatabaseName() string
func (s Source) ResolveDatabases() (SourceConns, error)
ResolveDatabases() return a slice of found databases for continuous monitoring sources, e.g. patroni
SourceConn represents a single connection to monitor. Unlike source, it contains a database connection. Continuous discovery sources (postgres-continuous-discovery, patroni-continuous-discovery, patroni-namespace-discovery) will produce multiple monitored databases structs based on the discovered databases.
type SourceConn struct { Source Conn db.PgxPoolIface ConnConfig *pgxpool.Config }
func ResolveDatabasesFromPatroni(ce Source) ([]*SourceConn, error)
func (md *SourceConn) Connect(ctx context.Context, opts CmdOpts) (err error)
Connect will establish a connection to the database if it's not already connected. If the connection is already established, it pings the server to ensure it's still alive.
func (md *SourceConn) DiscoverPlatform(ctx context.Context) (platform string)
TryDiscoverPlatform tries to discover the platform based on the database version string and some special settings that are only available on certain platforms. Returns the platform name or "UNKNOWN" if not sure.
func (md *SourceConn) FunctionExists(ctx context.Context, functionName string) (exists bool)
FunctionExists checks if a function exists in the database
func (md *SourceConn) GetApproxSize(ctx context.Context) (size int64, err error)
GetApproxSize returns the approximate size of the database in bytes
func (md *SourceConn) GetDatabaseName() string
GetDatabaseName returns the database name from the connection string
func (md *SourceConn) IsPostgresSource() bool
func (md *SourceConn) ParseConfig() (err error)
ParseConfig will parse the connection string and store the result in the connection config
func (md *SourceConn) Ping(ctx context.Context) (err error)
Ping will try to ping the server to ensure the connection is still alive
func (md *SourceConn) SetDatabaseName(name string)
SetDatabaseName sets the database name in the connection config for resolved databases
SourceConn represents a single connection to monitor. Unlike source, it contains a database connection. Continuous discovery sources (postgres-continuous-discovery, patroni-continuous-discovery, patroni-namespace-discovery) will produce multiple monitored databases structs based on the discovered databases.
type SourceConns []*SourceConn
func ResolveDatabasesFromPostgres(s Source) (resolvedDbs SourceConns, err error)
ResolveDatabasesFromPostgres reads all the databases from the given cluster, additionally matching/not matching specified regex patterns
func (mds SourceConns) GetMonitoredDatabase(DBUniqueName string) *SourceConn
func (mds SourceConns) SyncFromReader(r Reader) (newmds SourceConns, err error)
SyncFromReader will update the monitored databases with the latest configuration from the reader. Any resolution errors will be returned, e.g. etcd unavailability. It's up to the caller to proceed with the databases available or stop the execution due to errors.
type Sources []Source
func (srcs Sources) ResolveDatabases() (_ SourceConns, err error)
ResolveDatabases() updates list of monitored objects from continuous monitoring sources, e.g. patroni
func (srcs Sources) Validate() (Sources, error)
type Writer interface { WriteSources(Sources) error DeleteSource(string) error UpdateSource(md Source) error }
type dbSourcesReaderWriter struct { ctx context.Context configDb db.PgxIface }
func (r *dbSourcesReaderWriter) DeleteSource(name string) error
func (r *dbSourcesReaderWriter) GetSources() (dbs Sources, err error)
func (r *dbSourcesReaderWriter) UpdateSource(md Source) error
func (r *dbSourcesReaderWriter) WriteSources(dbs Sources) error
func (r *dbSourcesReaderWriter) updateDatabase(conn db.PgxIface, md Source) (err error)
type fileSourcesReaderWriter struct { ctx context.Context path string }
func (fcr *fileSourcesReaderWriter) DeleteSource(name string) error
func (fcr *fileSourcesReaderWriter) GetSources() (dbs Sources, err error)
func (fcr *fileSourcesReaderWriter) UpdateSource(md Source) error
func (fcr *fileSourcesReaderWriter) WriteSources(mds Sources) error
func (fcr *fileSourcesReaderWriter) expandEnvVars(md Source) Source
func (fcr *fileSourcesReaderWriter) getSources(configFilePath string) (dbs Sources, err error)