...

Package reaper

import "github.com/cybertec-postgresql/pgwatch/v3/internal/reaper"
Overview
Index

Overview ▾

Package reaper is responsible for querying metrics from monitored sources and sending the measurements to sinks.

Index ▾

Constants
Variables
func CheckFolderExistsAndReadable(path string) bool
func CloseResourcesForRemovedMonitoredDBs(metricsWriter sinks.Writer, currentDBs, prevLoopDBs sources.SourceConns, shutDownDueToRoleChange map[string]bool)
func DBGetSizeMB(ctx context.Context, dbUnique string) (int64, error)
func DataRowsToMeasurementEnvelope(data metrics.Measurements, msg MetricFetchConfig, vme MonitoredDatabaseSettings, mvp metrics.Metric) (metrics.MeasurementEnvelope, error)
func DoesEmergencyTriggerfileExist(fname string) bool
func FetchMetricsPgpool(ctx context.Context, msg MetricFetchConfig, vme MonitoredDatabaseSettings, mvp metrics.Metric) (metrics.Measurements, error)
func FetchStatsDirectlyFromOS(ctx context.Context, msg MetricFetchConfig, vme MonitoredDatabaseSettings, mvp metrics.Metric) (*metrics.MeasurementEnvelope, error)
func GetAllRecoMetricsForVersion() (metrics.MetricDefs, error)
func GetGoPsutilCPU(interval time.Duration) ([]map[string]any, error)
func GetGoPsutilDiskPG(DataDirs, TblspaceDirs []map[string]any) ([]map[string]any, error)
func GetGoPsutilDiskTotals() ([]map[string]any, error)
func GetGoPsutilMem() ([]map[string]any, error)
func GetLoadAvgLocal() ([]map[string]any, error)
func GetMonitoredDatabaseByUniqueName(name string) (*sources.SourceConn, error)
func GetPathUnderlyingDeviceID(path string) (uint64, error)
func GetRecommendations(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings) (metrics.Measurements, error)
func InitPGVersionInfoFetchingLockIfNil(md *sources.SourceConn)
func IsDirectlyFetchableMetric(metric string) bool
func QueryMeasurements(ctx context.Context, dbUnique string, sql string, args ...any) (metrics.Measurements, error)
func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.SourceConn) (err error)
func TryCreateMissingExtensions(ctx context.Context, dbUnique string, extensionNames []string, existingExtensions map[string]int) []string
func UpdateMonitoredDBCache(data sources.SourceConns)
func VersionToInt(version string) (v int)
func goPsutilCalcCPUUtilization(probe0, probe1 cpu.TimesStat) float64
type ChangeDetectionResults
    func DetectConfigurationChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults
    func DetectIndexChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults
    func DetectPrivilegeChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults
    func DetectSprocChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults
    func DetectTableChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults
type ConcurrentMetricDefs
    func NewConcurrentMetricDefs() *ConcurrentMetricDefs
    func (cmd *ConcurrentMetricDefs) Assign(newDefs *metrics.Metrics)
    func (cmd *ConcurrentMetricDefs) GetMetricDef(name string) (m metrics.Metric, ok bool)
    func (cmd *ConcurrentMetricDefs) GetPresetDef(name string) (m metrics.Preset, ok bool)
    func (cmd *ConcurrentMetricDefs) GetPresetMetrics(name string) (m map[string]float64)
type ExistingPartitionInfo
type InstanceMetricCache
    func NewInstanceMetricCache() *InstanceMetricCache
    func (imc *InstanceMetricCache) Get(key string, age time.Duration) metrics.Measurements
    func (imc *InstanceMetricCache) Put(key string, data metrics.Measurements)
type MetricFetchConfig
type MonitoredDatabaseSettings
    func GetMonitoredDatabaseSettings(ctx context.Context, md *sources.SourceConn, noCache bool) (MonitoredDatabaseSettings, error)
type Reaper
    func NewReaper(ctx context.Context, opts *cmdopts.Options) (r *Reaper, err error)
    func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, ver MonitoredDatabaseSettings) metrics.Measurements
    func (r *Reaper) CheckForPGObjectChangesAndStore(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, hostState map[string]map[string]string)
    func (r *Reaper) FetchMetric(ctx context.Context, msg MetricFetchConfig, hostState map[string]map[string]string) (*metrics.MeasurementEnvelope, error)
    func (r *Reaper) LoadMetrics() (err error)
    func (r *Reaper) LoadSources() (err error)
    func (r *Reaper) Ready() bool
    func (r *Reaper) Reap(ctx context.Context) (err error)
    func (r *Reaper) WriteMeasurements(ctx context.Context)
    func (r *Reaper) WriteMonitoredSources(ctx context.Context)
    func (r *Reaper) reapMetricMeasurements(ctx context.Context, mdb *sources.SourceConn, metricName string, interval float64)

Package files

cache.go database.go doc.go file.go metric.go psutil.go psutil_linux.go reaper.go recommendations.go

Constants

const (
    execEnvUnknown       = "UNKNOWN"
    execEnvAzureSingle   = "AZURE_SINGLE"
    execEnvAzureFlexible = "AZURE_FLEXIBLE"
    execEnvGoogle        = "GOOGLE"
)
const (
    metricCPULoad           = "cpu_load"
    metricPsutilCPU         = "psutil_cpu"
    metricPsutilDisk        = "psutil_disk"
    metricPsutilDiskIoTotal = "psutil_disk_io_total"
    metricPsutilMem         = "psutil_mem"
)
const (
    sqlPgDirs = `select 
current_setting('data_directory') as dd, 
current_setting('log_directory') as ld, 
current_setting('server_version_num')::int as pgver`
    sqlTsDirs = `select 
spcname::text as name, 
pg_catalog.pg_tablespace_location(oid) as location 
from pg_catalog.pg_tablespace 
where not spcname like any(array[E'pg\\_%'])`
)
const (
    monitoredDbsDatastoreSyncIntervalSeconds = 600              // write actively monitored DBs listing to metrics store after so many seconds
    monitoredDbsDatastoreSyncMetricName      = "configured_dbs" // FYI - for Postgres datastore there's also the admin.all_unique_dbnames table with all recent DB unique names with some metric data

    dbSizeCachingInterval = 30 * time.Minute
    dbMetricJoinStr       = "¤¤¤" // just some unlikely string for a DB name to avoid using maps of maps for DB+metric data

)
const (
    recoPrefix                        = "reco_" // special handling for metrics with this prefix; data is stored under recoMetricName
    recoMetricName                    = "recommendations"
    specialMetricChangeEvents         = "change_events"
    specialMetricServerLogEventCounts = "server_log_event_counts"
    specialMetricPgpoolStats          = "pgpool_stats"
    specialMetricInstanceUp           = "instance_up"
    specialMetricDbSize               = "db_size"     // can be transparently switched to db_size_approx on instances with very slow FS access (Azure Single Server)
    specialMetricTableStats           = "table_stats" // can be transparently switched to table_stats_approx on instances with very slow FS (Azure Single Server)

)

Variables

var MonitoredDatabasesSettings = make(map[string]MonitoredDatabaseSettings)
var MonitoredDatabasesSettingsGetLock = make(map[string]*sync.RWMutex) // synchronize initial PG version detection to 1 instance for each defined host
var MonitoredDatabasesSettingsLock = sync.RWMutex{}
var directlyFetchableOSMetrics = map[string]bool{metricPsutilCPU: true, metricPsutilDisk: true, metricPsutilDiskIoTotal: true, metricPsutilMem: true, metricCPULoad: true}
var hostLastKnownStatusInRecovery = make(map[string]bool) // isInRecovery
var lastDBSizeCheckLock sync.RWMutex
var lastDBSizeFetchTime = make(map[string]time.Time) // cached for dbSizeCachingInterval
var lastDBSizeMB = make(map[string]int64)
var lastSQLFetchError sync.Map
var metricConfig map[string]float64 // set to host.Metrics or host.MetricsStandby (in case the optional config is defined and the host is in recovery state)
var metricDefs = NewConcurrentMetricDefs()
var monitoredDbCache map[string]*sources.SourceConn
var monitoredDbCacheLock sync.RWMutex
var monitoredSources = make(sources.SourceConns, 0)
var prevCPULoadTimeStats cpu.TimesStat

"cache" of last CPU utilization stats for GetGoPsutilCPU to get more exact results and not having to sleep

var prevCPULoadTimeStatsLock sync.RWMutex
var prevCPULoadTimestamp time.Time
var prevLoopMonitoredDBs sources.SourceConns // to be able to detect DBs removed from config
var rBouncerAndPgpoolVerMatch = regexp.MustCompile(`\d+\.+\d+`) // extract $major.minor from "4.1.2 (karasukiboshi)" or "PgBouncer 1.12.0"

VersionToInt parses a given version and returns an integer or an error if unable to parse the version. Only parses valid semantic versions. Performs checking that can find errors within the version. Examples: v1.2 -> 01_02_00, v9.6.3 -> 09_06_03, v11 -> 11_00_00

var regVer = regexp.MustCompile(`(\d+).?(\d*).?(\d*)`)
var specialMetrics = map[string]bool{recoMetricName: true, specialMetricChangeEvents: true, specialMetricServerLogEventCounts: true}

func CheckFolderExistsAndReadable

func CheckFolderExistsAndReadable(path string) bool

func CloseResourcesForRemovedMonitoredDBs

func CloseResourcesForRemovedMonitoredDBs(metricsWriter sinks.Writer, currentDBs, prevLoopDBs sources.SourceConns, shutDownDueToRoleChange map[string]bool)

func DBGetSizeMB

func DBGetSizeMB(ctx context.Context, dbUnique string) (int64, error)

func DataRowsToMeasurementEnvelope

func DataRowsToMeasurementEnvelope(data metrics.Measurements, msg MetricFetchConfig, vme MonitoredDatabaseSettings, mvp metrics.Metric) (metrics.MeasurementEnvelope, error)

data + custom tags + counters

func DoesEmergencyTriggerfileExist

func DoesEmergencyTriggerfileExist(fname string) bool

func FetchMetricsPgpool

func FetchMetricsPgpool(ctx context.Context, msg MetricFetchConfig, vme MonitoredDatabaseSettings, mvp metrics.Metric) (metrics.Measurements, error)

Some extra work is needed, as pgpool SHOW commands don't specify the return data types for some reason.

func FetchStatsDirectlyFromOS

func FetchStatsDirectlyFromOS(ctx context.Context, msg MetricFetchConfig, vme MonitoredDatabaseSettings, mvp metrics.Metric) (*metrics.MeasurementEnvelope, error)

func GetAllRecoMetricsForVersion

func GetAllRecoMetricsForVersion() (metrics.MetricDefs, error)

func GetGoPsutilCPU

func GetGoPsutilCPU(interval time.Duration) ([]map[string]any, error)

Simulates "psutil" metric output. Assumes the result from the last call as input; otherwise uses a 1s measurement.

func GetGoPsutilDiskPG

func GetGoPsutilDiskPG(DataDirs, TblspaceDirs []map[string]any) ([]map[string]any, error)

func GetGoPsutilDiskTotals

func GetGoPsutilDiskTotals() ([]map[string]any, error)

func GetGoPsutilMem

func GetGoPsutilMem() ([]map[string]any, error)

func GetLoadAvgLocal

func GetLoadAvgLocal() ([]map[string]any, error)

func GetMonitoredDatabaseByUniqueName

func GetMonitoredDatabaseByUniqueName(name string) (*sources.SourceConn, error)

func GetPathUnderlyingDeviceID

func GetPathUnderlyingDeviceID(path string) (uint64, error)

func GetRecommendations

func GetRecommendations(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings) (metrics.Measurements, error)

func InitPGVersionInfoFetchingLockIfNil

func InitPGVersionInfoFetchingLockIfNil(md *sources.SourceConn)

func IsDirectlyFetchableMetric

func IsDirectlyFetchableMetric(metric string) bool

func QueryMeasurements

func QueryMeasurements(ctx context.Context, dbUnique string, sql string, args ...any) (metrics.Measurements, error)

func TryCreateMetricsFetchingHelpers

func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.SourceConn) (err error)

Called once on daemon startup to try to create "metric fetching helper" functions automatically.

func TryCreateMissingExtensions

func TryCreateMissingExtensions(ctx context.Context, dbUnique string, extensionNames []string, existingExtensions map[string]int) []string

Called once on daemon startup if some commonly wanted extension (most notably pg_stat_statements) is missing. With newer Postgres versions this can even succeed if the user is not a real superuser, due to some cloud-specific whitelisting or "trusted extensions" (a feature from v13). Ignores errors.

func UpdateMonitoredDBCache

func UpdateMonitoredDBCache(data sources.SourceConns)

func VersionToInt

func VersionToInt(version string) (v int)

func goPsutilCalcCPUUtilization

func goPsutilCalcCPUUtilization(probe0, probe1 cpu.TimesStat) float64

type ChangeDetectionResults

type ChangeDetectionResults struct {
    Created int
    Altered int
    Dropped int
}

func DetectConfigurationChanges

func DetectConfigurationChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults

func DetectIndexChanges

func DetectIndexChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults

func DetectPrivilegeChanges

func DetectPrivilegeChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults

func DetectSprocChanges

func DetectSprocChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults

func DetectTableChanges

func DetectTableChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults

type ConcurrentMetricDefs

type ConcurrentMetricDefs struct {
    *metrics.Metrics
    sync.RWMutex
}

func NewConcurrentMetricDefs

func NewConcurrentMetricDefs() *ConcurrentMetricDefs

func (*ConcurrentMetricDefs) Assign

func (cmd *ConcurrentMetricDefs) Assign(newDefs *metrics.Metrics)

func (*ConcurrentMetricDefs) GetMetricDef

func (cmd *ConcurrentMetricDefs) GetMetricDef(name string) (m metrics.Metric, ok bool)

func (*ConcurrentMetricDefs) GetPresetDef

func (cmd *ConcurrentMetricDefs) GetPresetDef(name string) (m metrics.Preset, ok bool)

func (*ConcurrentMetricDefs) GetPresetMetrics

func (cmd *ConcurrentMetricDefs) GetPresetMetrics(name string) (m map[string]float64)

type ExistingPartitionInfo

type ExistingPartitionInfo struct {
    StartTime time.Time
    EndTime   time.Time
}

type InstanceMetricCache

type InstanceMetricCache struct {
    cache map[string](metrics.Measurements) // [dbUnique+metric]lastly_fetched_data
    sync.RWMutex
}

func NewInstanceMetricCache

func NewInstanceMetricCache() *InstanceMetricCache

func (*InstanceMetricCache) Get

func (imc *InstanceMetricCache) Get(key string, age time.Duration) metrics.Measurements

func (*InstanceMetricCache) Put

func (imc *InstanceMetricCache) Put(key string, data metrics.Measurements)

type MetricFetchConfig

type MetricFetchConfig struct {
    DBUniqueName        string
    DBUniqueNameOrig    string
    MetricName          string
    Source              sources.Kind
    Interval            time.Duration
    CreatedOn           time.Time
    StmtTimeoutOverride int64
}

type MonitoredDatabaseSettings

type MonitoredDatabaseSettings struct {
    LastCheckedOn    time.Time
    IsInRecovery     bool
    VersionStr       string
    Version          int
    RealDbname       string
    SystemIdentifier string
    IsSuperuser      bool // if true and no helpers are installed, use superuser SQL version of metric if available
    Extensions       map[string]int
    ExecEnv          string
    ApproxDBSizeB    int64
}

func GetMonitoredDatabaseSettings

func GetMonitoredDatabaseSettings(ctx context.Context, md *sources.SourceConn, noCache bool) (MonitoredDatabaseSettings, error)

type Reaper

Reaper is the struct that is responsible for fetching metrics measurements from the sources and storing them to the sinks.

type Reaper struct {
    *cmdopts.Options
    ready            atomic.Bool
    measurementCh    chan []metrics.MeasurementEnvelope
    measurementCache *InstanceMetricCache
    logger           log.LoggerIface
}

func NewReaper

func NewReaper(ctx context.Context, opts *cmdopts.Options) (r *Reaper, err error)

NewReaper creates a new Reaper instance

func (*Reaper) AddSysinfoToMeasurements

func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, ver MonitoredDatabaseSettings) metrics.Measurements

func (*Reaper) CheckForPGObjectChangesAndStore

func (r *Reaper) CheckForPGObjectChangesAndStore(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, hostState map[string]map[string]string)

func (*Reaper) FetchMetric

func (r *Reaper) FetchMetric(ctx context.Context, msg MetricFetchConfig, hostState map[string]map[string]string) (*metrics.MeasurementEnvelope, error)

func (*Reaper) LoadMetrics

func (r *Reaper) LoadMetrics() (err error)

LoadMetrics loads metric definitions from the reader

func (*Reaper) LoadSources

func (r *Reaper) LoadSources() (err error)

LoadSources loads sources from the reader

func (*Reaper) Ready

func (r *Reaper) Ready() bool

Ready() returns true if the service is healthy and operating correctly

func (*Reaper) Reap

func (r *Reaper) Reap(ctx context.Context) (err error)

Reap() starts the main monitoring loop. It is responsible for fetching metrics measurements from the sources and storing them to the sinks. It also manages the lifecycle of the metric gatherers. In case of a source or metric definition change, it will start or stop the gatherers accordingly.

func (*Reaper) WriteMeasurements

func (r *Reaper) WriteMeasurements(ctx context.Context)

WriteMeasurements() writes the metrics to the sinks

func (*Reaper) WriteMonitoredSources

func (r *Reaper) WriteMonitoredSources(ctx context.Context)

WriteMonitoredSources writes actively monitored DBs listing to sinks every monitoredDbsDatastoreSyncIntervalSeconds (default 10min)

func (*Reaper) reapMetricMeasurements

func (r *Reaper) reapMetricMeasurements(ctx context.Context, mdb *sources.SourceConn, metricName string, interval float64)

metrics.ControlMessage notifies of shutdown + interval change