...

Package reaper

import "github.com/cybertec-postgresql/pgwatch/v3/internal/reaper"
Overview
Index

Overview ▾

Index ▾

Constants
Variables
func AddDbnameSysinfoIfNotExistsToQueryResultData(data metrics.Measurements, ver MonitoredDatabaseSettings, opts *cmdopts.Options) metrics.Measurements
func CheckForPGObjectChangesAndStore(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string)
func ClearDBUnreachableStateIfAny(dbUnique string)
func CloseResourcesForRemovedMonitoredDBs(metricsWriter *sinks.MultiWriter, currentDBs, prevLoopDBs sources.MonitoredDatabases, shutDownDueToRoleChange map[string]bool)
func DBExecRead(ctx context.Context, conn db.PgxIface, sql string, args ...any) (metrics.Measurements, error)
func DBExecReadByDbUniqueName(ctx context.Context, dbUnique string, sql string, args ...any) (metrics.Measurements, error)
func DBGetSizeMB(ctx context.Context, dbUnique string) (int64, error)
func DatarowsToMetricstoreMessage(data metrics.Measurements, msg MetricFetchConfig, vme MonitoredDatabaseSettings, mvp metrics.Metric) (metrics.MeasurementEnvelope, error)
func DoesEmergencyTriggerfileExist(fname string) bool
func DoesFunctionExists(ctx context.Context, dbUnique, functionName string) bool
func FetchMetrics(ctx context.Context, msg MetricFetchConfig, hostState map[string]map[string]string, storageCh chan<- []metrics.MeasurementEnvelope, context string, opts *cmdopts.Options) ([]metrics.MeasurementEnvelope, error)
func FetchMetricsPgpool(ctx context.Context, msg MetricFetchConfig, vme MonitoredDatabaseSettings, mvp metrics.Metric) (metrics.Measurements, error)
func FetchStatsDirectlyFromOS(ctx context.Context, msg MetricFetchConfig, vme MonitoredDatabaseSettings, mvp metrics.Metric) ([]metrics.MeasurementEnvelope, error)
func FilterPgbouncerData(ctx context.Context, data metrics.Measurements, databaseToKeep string, vme MonitoredDatabaseSettings) metrics.Measurements
func GetAllRecoMetricsForVersion(vme MonitoredDatabaseSettings) (map[string]metrics.Metric, error)
func GetConnByUniqueName(dbUnique string) db.PgxIface
func GetDBTotalApproxSize(ctx context.Context, dbUnique string) (int64, error)
func GetFromInstanceCacheIfNotOlderThanSeconds(msg MetricFetchConfig, maxAgeSeconds int64) metrics.Measurements
func GetGoPsutilDiskPG(ctx context.Context, dbUnique string) (metrics.Measurements, error)
func GetMetricVersionProperties(metric string, _ MonitoredDatabaseSettings, metricDefMap *metrics.Metrics) (metrics.Metric, error)
func GetMonitoredDatabaseByUniqueName(name string) (*sources.MonitoredDatabase, error)
func GetRecommendations(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings) (metrics.Measurements, error)
func InitPGVersionInfoFetchingLockIfNil(md *sources.MonitoredDatabase)
func InitSQLConnPoolForMonitoredDBIfNil(ctx context.Context, md *sources.MonitoredDatabase, maxConns int) (err error)
func IsCacheableMetric(msg MetricFetchConfig, mvp metrics.Metric) bool
func IsDBDormant(dbUnique string) bool
func IsDBIgnoredBasedOnRecoveryState(dbUnique string) bool
func IsDBUndersized(dbUnique string) bool
func IsDirectlyFetchableMetric(metric string) bool
func LoadMetricDefs(r metrics.Reader) (err error)
func PutToInstanceCache(msg MetricFetchConfig, data metrics.Measurements)
func SetDBUnreachableState(dbUnique string)
func SetRecoveryIgnoredDBState(dbUnique string, state bool)
func SetUndersizedDBState(dbUnique string, state bool)
func StoreMetrics(metrics []metrics.MeasurementEnvelope, storageCh chan<- []metrics.MeasurementEnvelope) (int, error)
func SyncMetricDefs(ctx context.Context, r metrics.Reader)
func SyncMonitoredDBsToDatastore(ctx context.Context, monitoredDbs []*sources.MonitoredDatabase, persistenceChannel chan []metrics.MeasurementEnvelope)
func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.MonitoredDatabase) (err error)
func TryCreateMissingExtensions(ctx context.Context, dbUnique string, extensionNames []string, existingExtensions map[string]int) []string
func TryDiscoverExecutionEnv(ctx context.Context, dbUnique string) (execEnv string)
func UpdateMonitoredDBCache(data sources.MonitoredDatabases)
func VersionToInt(version string) (v int)
func deepCopyMetricData(data metrics.Measurements) metrics.Measurements
type ChangeDetectionResults
    func DetectConfigurationChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults
    func DetectIndexChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults
    func DetectPrivilegeChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults
    func DetectSprocChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults
    func DetectTableChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults
type ExistingPartitionInfo
type MetricFetchConfig
type MonitoredDatabaseSettings
    func GetMonitoredDatabaseSettings(ctx context.Context, dbUnique string, srcType sources.Kind, noCache bool) (MonitoredDatabaseSettings, error)
type Reaper
    func NewReaper(opts *cmdopts.Options, sourcesReaderWriter sources.ReaderWriter, metricsReaderWriter metrics.ReaderWriter) *Reaper
    func (r *Reaper) Reap(mainContext context.Context) (err error)
    func (r *Reaper) reapMetricMeasurementsFromSource(ctx context.Context, dbUniqueName, dbUniqueNameOrig string, srcType sources.Kind, metricName string, configMap map[string]float64)

Package files

cache.go database.go file.go reaper.go recommendations.go types.go

Constants

const (
    execEnvUnknown       = "UNKNOWN"
    execEnvAzureSingle   = "AZURE_SINGLE"
    execEnvAzureFlexible = "AZURE_FLEXIBLE"
    execEnvGoogle        = "GOOGLE"
)
const (
    metricCPULoad           = "cpu_load"
    metricPsutilCPU         = "psutil_cpu"
    metricPsutilDisk        = "psutil_disk"
    metricPsutilDiskIoTotal = "psutil_disk_io_total"
    metricPsutilMem         = "psutil_mem"
)
const (
    recoPrefix                        = "reco_" // special handling for metrics with such prefix, data stored in RECO_METRIC_NAME
    recoMetricName                    = "recommendations"
    specialMetricChangeEvents         = "change_events"
    specialMetricServerLogEventCounts = "server_log_event_counts"
    specialMetricPgbouncer            = "^pgbouncer_(stats|pools)$"
    specialMetricPgpoolStats          = "pgpool_stats"
    specialMetricInstanceUp           = "instance_up"
    specialMetricDbSize               = "db_size"     // can be transparently switched to db_size_approx on instances with very slow FS access (Azure Single Server)
    specialMetricTableStats           = "table_stats" // can be transparently switched to table_stats_approx on instances with very slow FS (Azure Single Server)

)
const (
    epochColumnName     string = "epoch_ns" // this column (epoch in nanoseconds) is expected in every metric query
    tagPrefix           string = "tag_"
    persistQueueMaxSize        = 10000 // storage queue max elements. when reaching the limit, older metrics will be dropped.

    gathererStatusStart     = "START"
    gathererStatusStop      = "STOP"
    metricdbIdent           = "metricDb"
    configdbIdent           = "configDb"
    contextPrometheusScrape = "prometheus-scrape"

    monitoredDbsDatastoreSyncIntervalSeconds = 600              // write actively monitored DBs listing to metrics store after so many seconds
    monitoredDbsDatastoreSyncMetricName      = "configured_dbs" // FYI - for Postgres datastore there's also the admin.all_unique_dbnames table with all recent DB unique names with some metric data

    dbSizeCachingInterval = 30 * time.Minute
    dbMetricJoinStr       = "¤¤¤" // just some unlikely string for a DB name to avoid using maps of maps for DB+metric data

)
const metricDefinitionRefreshInterval time.Duration = time.Minute * 2 // min time before checking for new/changed metric definitions

Variables

var MonitoredDatabasesSettings = make(map[string]MonitoredDatabaseSettings)
var MonitoredDatabasesSettingsGetLock = make(map[string]*sync.RWMutex) // synchronize initial PG version detection to 1 instance for each defined host
var MonitoredDatabasesSettingsLock = sync.RWMutex{}
var directlyFetchableOSMetrics = map[string]bool{metricPsutilCPU: true, metricPsutilDisk: true, metricPsutilDiskIoTotal: true, metricPsutilMem: true, metricCPULoad: true}
var hostLastKnownStatusInRecovery = make(map[string]bool) // isInRecovery
var hostMetricIntervalMap = make(map[string]float64) // [db1_metric] = 30
var instanceMetricCache = make(map[string](metrics.Measurements)) // [dbUnique+metric]lastly_fetched_data
var instanceMetricCacheLock = sync.RWMutex{}
var instanceMetricCacheTimestamp = make(map[string]time.Time) // [dbUnique+metric]last_fetch_time
var instanceMetricCacheTimestampLock = sync.RWMutex{}
var lastDBSizeCheckLock sync.RWMutex
var lastDBSizeFetchTime = make(map[string]time.Time) // cached for DB_SIZE_CACHING_INTERVAL
var lastDBSizeMB = make(map[string]int64)
var lastMonitoredDBsUpdate time.Time
var lastSQLFetchError sync.Map
var metricConfig map[string]float64 // set to host.Metrics or host.MetricsStandby (in case optional config defined and in recovery state
var metricDefMapLock = sync.RWMutex{}
var metricDefinitionMap *metrics.Metrics = &metrics.Metrics{}
var monitoredDbCache map[string]*sources.MonitoredDatabase
var monitoredDbCacheLock sync.RWMutex
var monitoredDbs = make(sources.MonitoredDatabases, 0)
var pgBouncerNumericCountersStartVersion = 01_12_00 // pgBouncer changed internal counters data type in v1.12
var prevLoopMonitoredDBs sources.MonitoredDatabases // to be able to detect DBs removed from config
var rBouncerAndPgpoolVerMatch = regexp.MustCompile(`\d+\.+\d+`) // extract $major.minor from "4.1.2 (karasukiboshi)" or "PgBouncer 1.12.0"
var recoveryIgnoredDBs = make(map[string]bool) // DBs in recovery state and OnlyIfMaster specified in config
var recoveryIgnoredDBsLock = sync.RWMutex{}

VersionToInt parses a given version and returns an integer or an error if unable to parse the version. Only parses valid semantic versions. Performs checking that can find errors within the version. Examples: v1.2 -> 01_02_00, v9.6.3 -> 09_06_03, v11 -> 11_00_00

var regVer = regexp.MustCompile(`(\d+).?(\d*).?(\d*)`)
var regexIsPgbouncerMetrics = regexp.MustCompile(specialMetricPgbouncer)
var specialMetrics = map[string]bool{recoMetricName: true, specialMetricChangeEvents: true, specialMetricServerLogEventCounts: true}
var undersizedDBs = make(map[string]bool) // DBs below the --min-db-size-mb limit, if set
var undersizedDBsLock = sync.RWMutex{}
var unreachableDB = make(map[string]time.Time)
var unreachableDBsLock sync.RWMutex

func AddDbnameSysinfoIfNotExistsToQueryResultData

func AddDbnameSysinfoIfNotExistsToQueryResultData(data metrics.Measurements, ver MonitoredDatabaseSettings, opts *cmdopts.Options) metrics.Measurements

func CheckForPGObjectChangesAndStore

func CheckForPGObjectChangesAndStore(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string)

func ClearDBUnreachableStateIfAny

func ClearDBUnreachableStateIfAny(dbUnique string)

func CloseResourcesForRemovedMonitoredDBs

func CloseResourcesForRemovedMonitoredDBs(metricsWriter *sinks.MultiWriter, currentDBs, prevLoopDBs sources.MonitoredDatabases, shutDownDueToRoleChange map[string]bool)

func DBExecRead

func DBExecRead(ctx context.Context, conn db.PgxIface, sql string, args ...any) (metrics.Measurements, error)

func DBExecReadByDbUniqueName

func DBExecReadByDbUniqueName(ctx context.Context, dbUnique string, sql string, args ...any) (metrics.Measurements, error)

func DBGetSizeMB

func DBGetSizeMB(ctx context.Context, dbUnique string) (int64, error)

func DatarowsToMetricstoreMessage

func DatarowsToMetricstoreMessage(data metrics.Measurements, msg MetricFetchConfig, vme MonitoredDatabaseSettings, mvp metrics.Metric) (metrics.MeasurementEnvelope, error)

data + custom tags + counters

func DoesEmergencyTriggerfileExist

func DoesEmergencyTriggerfileExist(fname string) bool

func DoesFunctionExists

func DoesFunctionExists(ctx context.Context, dbUnique, functionName string) bool

func FetchMetrics

func FetchMetrics(ctx context.Context,
    msg MetricFetchConfig,
    hostState map[string]map[string]string,
    storageCh chan<- []metrics.MeasurementEnvelope,
    context string,
    opts *cmdopts.Options) ([]metrics.MeasurementEnvelope, error)

func FetchMetricsPgpool

func FetchMetricsPgpool(ctx context.Context, msg MetricFetchConfig, vme MonitoredDatabaseSettings, mvp metrics.Metric) (metrics.Measurements, error)

some extra work needed as pgpool SHOW commands don't specify the return data types for some reason

func FetchStatsDirectlyFromOS

func FetchStatsDirectlyFromOS(ctx context.Context, msg MetricFetchConfig, vme MonitoredDatabaseSettings, mvp metrics.Metric) ([]metrics.MeasurementEnvelope, error)

func FilterPgbouncerData

func FilterPgbouncerData(ctx context.Context, data metrics.Measurements, databaseToKeep string, vme MonitoredDatabaseSettings) metrics.Measurements

func GetAllRecoMetricsForVersion

func GetAllRecoMetricsForVersion(vme MonitoredDatabaseSettings) (map[string]metrics.Metric, error)

func GetConnByUniqueName

func GetConnByUniqueName(dbUnique string) db.PgxIface

func GetDBTotalApproxSize

func GetDBTotalApproxSize(ctx context.Context, dbUnique string) (int64, error)

func GetFromInstanceCacheIfNotOlderThanSeconds

func GetFromInstanceCacheIfNotOlderThanSeconds(msg MetricFetchConfig, maxAgeSeconds int64) metrics.Measurements

func GetGoPsutilDiskPG

func GetGoPsutilDiskPG(ctx context.Context, dbUnique string) (metrics.Measurements, error)

connects actually to the instance to determine PG relevant disk paths / mounts

func GetMetricVersionProperties

func GetMetricVersionProperties(metric string, _ MonitoredDatabaseSettings, metricDefMap *metrics.Metrics) (metrics.Metric, error)

assumes upwards compatibility for versions

func GetMonitoredDatabaseByUniqueName

func GetMonitoredDatabaseByUniqueName(name string) (*sources.MonitoredDatabase, error)

func GetRecommendations

func GetRecommendations(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings) (metrics.Measurements, error)

func InitPGVersionInfoFetchingLockIfNil

func InitPGVersionInfoFetchingLockIfNil(md *sources.MonitoredDatabase)

func InitSQLConnPoolForMonitoredDBIfNil

func InitSQLConnPoolForMonitoredDBIfNil(ctx context.Context, md *sources.MonitoredDatabase, maxConns int) (err error)

every DB under monitoring should have exactly 1 sql.DB connection assigned, that will internally limit parallel access

func IsCacheableMetric

func IsCacheableMetric(msg MetricFetchConfig, mvp metrics.Metric) bool

func IsDBDormant

func IsDBDormant(dbUnique string) bool

func IsDBIgnoredBasedOnRecoveryState

func IsDBIgnoredBasedOnRecoveryState(dbUnique string) bool

func IsDBUndersized

func IsDBUndersized(dbUnique string) bool

func IsDirectlyFetchableMetric

func IsDirectlyFetchableMetric(metric string) bool

func LoadMetricDefs

func LoadMetricDefs(r metrics.Reader) (err error)

LoadMetricDefs loads metric definitions from the reader

func PutToInstanceCache

func PutToInstanceCache(msg MetricFetchConfig, data metrics.Measurements)

func SetDBUnreachableState

func SetDBUnreachableState(dbUnique string)

func SetRecoveryIgnoredDBState

func SetRecoveryIgnoredDBState(dbUnique string, state bool)

func SetUndersizedDBState

func SetUndersizedDBState(dbUnique string, state bool)

func StoreMetrics

func StoreMetrics(metrics []metrics.MeasurementEnvelope, storageCh chan<- []metrics.MeasurementEnvelope) (int, error)

func SyncMetricDefs

func SyncMetricDefs(ctx context.Context, r metrics.Reader)

SyncMetricDefs refreshes metric definitions at regular intervals

func SyncMonitoredDBsToDatastore

func SyncMonitoredDBsToDatastore(ctx context.Context, monitoredDbs []*sources.MonitoredDatabase, persistenceChannel chan []metrics.MeasurementEnvelope)

func TryCreateMetricsFetchingHelpers

func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.MonitoredDatabase) (err error)

Called once on daemon startup to try to create "metric fething helper" functions automatically

func TryCreateMissingExtensions

func TryCreateMissingExtensions(ctx context.Context, dbUnique string, extensionNames []string, existingExtensions map[string]int) []string

Called once on daemon startup if some commonly wanted extension (most notably pg_stat_statements) is missing. With newer Postgres version can even succeed if the user is not a real superuser due to some cloud-specific whitelisting or "trusted extensions" (a feature from v13). Ignores errors.

func TryDiscoverExecutionEnv

func TryDiscoverExecutionEnv(ctx context.Context, dbUnique string) (execEnv string)

func UpdateMonitoredDBCache

func UpdateMonitoredDBCache(data sources.MonitoredDatabases)

func VersionToInt

func VersionToInt(version string) (v int)

func deepCopyMetricData

func deepCopyMetricData(data metrics.Measurements) metrics.Measurements

type ChangeDetectionResults

type ChangeDetectionResults struct {
    Created int
    Altered int
    Dropped int
}

func DetectConfigurationChanges

func DetectConfigurationChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults

func DetectIndexChanges

func DetectIndexChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults

func DetectPrivilegeChanges

func DetectPrivilegeChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults

func DetectSprocChanges

func DetectSprocChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults

func DetectTableChanges

func DetectTableChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults

type ExistingPartitionInfo

type ExistingPartitionInfo struct {
    StartTime time.Time
    EndTime   time.Time
}

type MetricFetchConfig

type MetricFetchConfig struct {
    DBUniqueName        string
    DBUniqueNameOrig    string
    MetricName          string
    Source              sources.Kind
    Interval            time.Duration
    CreatedOn           time.Time
    StmtTimeoutOverride int64
}

type MonitoredDatabaseSettings

type MonitoredDatabaseSettings struct {
    LastCheckedOn    time.Time
    IsInRecovery     bool
    VersionStr       string
    Version          int
    RealDbname       string
    SystemIdentifier string
    IsSuperuser      bool // if true and no helpers are installed, use superuser SQL version of metric if available
    Extensions       map[string]int
    ExecEnv          string
    ApproxDBSizeB    int64
}

func GetMonitoredDatabaseSettings

func GetMonitoredDatabaseSettings(ctx context.Context, dbUnique string, srcType sources.Kind, noCache bool) (MonitoredDatabaseSettings, error)

type Reaper

type Reaper struct {
    opts                *cmdopts.Options
    sourcesReaderWriter sources.ReaderWriter
    metricsReaderWriter metrics.ReaderWriter
    measurementCh       chan []metrics.MeasurementEnvelope
}

func NewReaper

func NewReaper(opts *cmdopts.Options, sourcesReaderWriter sources.ReaderWriter, metricsReaderWriter metrics.ReaderWriter) *Reaper

func (*Reaper) Reap

func (r *Reaper) Reap(mainContext context.Context) (err error)

func (*Reaper) reapMetricMeasurementsFromSource

func (r *Reaper) reapMetricMeasurementsFromSource(ctx context.Context,
    dbUniqueName, dbUniqueNameOrig string,
    srcType sources.Kind,
    metricName string,
    configMap map[string]float64)

metrics.ControlMessage notifies of shutdown + interval change