const (
    execEnvUnknown       = "UNKNOWN"
    execEnvAzureSingle   = "AZURE_SINGLE"
    execEnvAzureFlexible = "AZURE_FLEXIBLE"
    execEnvGoogle        = "GOOGLE"
)
const (
    metricCPULoad           = "cpu_load"
    metricPsutilCPU         = "psutil_cpu"
    metricPsutilDisk        = "psutil_disk"
    metricPsutilDiskIoTotal = "psutil_disk_io_total"
    metricPsutilMem         = "psutil_mem"
)
const (
    sqlPgDirs = `select current_setting('data_directory') as dd, current_setting('log_directory') as ld, current_setting('server_version_num')::int as pgver`
    sqlTsDirs = `select spcname::text as name, pg_catalog.pg_tablespace_location(oid) as location from pg_catalog.pg_tablespace where not spcname like any(array[E'pg\\_%'])`
)
const (
    monitoredDbsDatastoreSyncIntervalSeconds = 600              // write actively monitored DBs listing to metrics store after so many seconds
    monitoredDbsDatastoreSyncMetricName      = "configured_dbs" // FYI - for the Postgres datastore there's also the admin.all_unique_dbnames table with all recent DB unique names and some metric data
    dbSizeCachingInterval                    = 30 * time.Minute
    dbMetricJoinStr                          = "¤¤¤" // just some unlikely string for a DB name, to avoid using maps of maps for DB+metric data
)
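The dbMetricJoinStr separator exists so that per-DB, per-metric state can live in one flat map instead of nested maps. A tiny illustration of that keying pattern (the map and helper names below are hypothetical):

    import "time"

    // Hypothetical illustration of the flat-key pattern enabled by dbMetricJoinStr.
    var lastFetchTimes = make(map[string]time.Time) // instead of map[string]map[string]time.Time

    func dbMetricKey(dbUnique, metric string) string {
        return dbUnique + dbMetricJoinStr + metric // e.g. "prod1¤¤¤db_size"
    }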
const (
    recoPrefix                        = "reco_" // special handling for metrics with this prefix; data is stored under recoMetricName
    recoMetricName                    = "recommendations"
    specialMetricChangeEvents         = "change_events"
    specialMetricServerLogEventCounts = "server_log_event_counts"
    specialMetricPgpoolStats          = "pgpool_stats"
    specialMetricInstanceUp           = "instance_up"
    specialMetricDbSize               = "db_size"     // can be transparently switched to db_size_approx on instances with very slow FS access (Azure Single Server)
    specialMetricTableStats           = "table_stats" // can be transparently switched to table_stats_approx on instances with very slow FS (Azure Single Server)
)
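The comments on specialMetricDbSize and specialMetricTableStats describe a transparent fallback to the *_approx metric variants on execution environments with very slow filesystem access. A hedged sketch of how such a switch could look; the function itself is illustrative, and the _approx metric names are taken from the comments above:

    // Illustrative only: swap in the approximate variant of a metric on
    // environments known for very slow FS access (e.g. Azure Single Server).
    func effectiveMetricName(metricName, execEnv string) string {
        if execEnv != execEnvAzureSingle {
            return metricName
        }
        switch metricName {
        case specialMetricDbSize:
            return "db_size_approx"
        case specialMetricTableStats:
            return "table_stats_approx"
        }
        return metricName
    }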
var MonitoredDatabasesSettings = make(map[string]MonitoredDatabaseSettings)
var MonitoredDatabasesSettingsGetLock = make(map[string]*sync.RWMutex) // synchronize initial PG version detection to 1 instance for each defined host
var MonitoredDatabasesSettingsLock = sync.RWMutex{}
var directlyFetchableOSMetrics = map[string]bool{metricPsutilCPU: true, metricPsutilDisk: true, metricPsutilDiskIoTotal: true, metricPsutilMem: true, metricCPULoad: true}
var hostLastKnownStatusInRecovery = make(map[string]bool) // isInRecovery
var lastDBSizeCheckLock sync.RWMutex
var lastDBSizeFetchTime = make(map[string]time.Time) // cached for dbSizeCachingInterval
var lastDBSizeMB = make(map[string]int64)
var lastSQLFetchError sync.Map
var metricConfig map[string]float64 // set to host.Metrics, or to host.MetricsStandby when the optional standby config is defined and the host is in recovery
var metricDefs = NewConcurrentMetricDefs()
var monitoredDbCache map[string]*sources.SourceConn
var monitoredDbCacheLock sync.RWMutex
var monitoredSources = make(sources.SourceConns, 0)
var prevCPULoadTimeStats cpu.TimesStat
"cache" of last CPU utilization stats for GetGoPsutilCPU to get more exact results and not having to sleep
var prevCPULoadTimeStatsLock sync.RWMutex
var prevCPULoadTimestamp time.Time
var prevLoopMonitoredDBs sources.SourceConns // to be able to detect DBs removed from config
var rBouncerAndPgpoolVerMatch = regexp.MustCompile(`\d+\.+\d+`) // extract the major.minor part from e.g. "4.1.2 (karasukiboshi)" or "PgBouncer 1.12.0"
VersionToInt parses a given version string and returns it as a single comparable integer. Only valid semantic versions are parsed. Examples: v1.2 -> 01_02_00, v9.6.3 -> 09_06_03, v11 -> 11_00_00
var regVer = regexp.MustCompile(`(\d+).?(\d*).?(\d*)`)
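A minimal sketch of the mapping described above, using regVer's three capture groups; this is illustrative and not necessarily how VersionToInt is actually implemented:

    import "strconv"

    // Sketch: "v9.6.3" -> 90603, "v1.2" -> 10200, "v11" -> 110000.
    // Missing minor/patch components match as empty strings and stay 0.
    func versionToIntSketch(version string) int {
        m := regVer.FindStringSubmatch(version)
        if len(m) == 0 {
            return 0
        }
        var parts [3]int
        for i := 1; i <= 3; i++ {
            if m[i] != "" {
                parts[i-1], _ = strconv.Atoi(m[i])
            }
        }
        return parts[0]*10000 + parts[1]*100 + parts[2]
    }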
var specialMetrics = map[string]bool{recoMetricName: true, specialMetricChangeEvents: true, specialMetricServerLogEventCounts: true}
func CheckFolderExistsAndReadable(path string) bool
func CloseResourcesForRemovedMonitoredDBs(metricsWriter sinks.Writer, currentDBs, prevLoopDBs sources.SourceConns, shutDownDueToRoleChange map[string]bool)
func DBGetSizeMB(ctx context.Context, dbUnique string) (int64, error)
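DBGetSizeMB keeps per-database results in lastDBSizeMB / lastDBSizeFetchTime (guarded by lastDBSizeCheckLock) so that the potentially expensive size lookup runs at most once per dbSizeCachingInterval. A rough sketch of that cache check only; the real function of course also runs the actual query when the cache is stale:

    import "time"

    // Sketch: return the cached size if it is younger than dbSizeCachingInterval.
    func cachedDBSizeMB(dbUnique string) (sizeMB int64, fresh bool) {
        lastDBSizeCheckLock.RLock()
        defer lastDBSizeCheckLock.RUnlock()
        if t, ok := lastDBSizeFetchTime[dbUnique]; ok && time.Since(t) < dbSizeCachingInterval {
            return lastDBSizeMB[dbUnique], true
        }
        return 0, false // caller should query the size and update both maps under the write lock
    }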
func DataRowsToMeasurementEnvelope(data metrics.Measurements, msg MetricFetchConfig, vme MonitoredDatabaseSettings, mvp metrics.Metric) (metrics.MeasurementEnvelope, error)
Wraps the fetched data rows, together with custom tags and counters, into a MeasurementEnvelope.
func DoesEmergencyTriggerfileExist(fname string) bool
func FetchMetricsPgpool(ctx context.Context, msg MetricFetchConfig, vme MonitoredDatabaseSettings, mvp metrics.Metric) (metrics.Measurements, error)
Some extra work is needed, as pgpool SHOW commands don't specify the return data types for some reason.
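Because pgpool's SHOW output is effectively untyped text, the values usually have to be coerced into numbers where possible and kept as strings otherwise. A hedged sketch of such a coercion step (the helper name is hypothetical):

    import "strconv"

    // Sketch: try integer, then float, otherwise keep the raw string value.
    func coercePgpoolValue(raw string) any {
        if i, err := strconv.ParseInt(raw, 10, 64); err == nil {
            return i
        }
        if f, err := strconv.ParseFloat(raw, 64); err == nil {
            return f
        }
        return raw
    }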
func FetchStatsDirectlyFromOS(ctx context.Context, msg MetricFetchConfig, vme MonitoredDatabaseSettings, mvp metrics.Metric) (*metrics.MeasurementEnvelope, error)
func GetAllRecoMetricsForVersion() (metrics.MetricDefs, error)
func GetGoPsutilCPU(interval time.Duration) ([]map[string]any, error)
Simulates "psutil" metric output. Assumes the result from last call as input, otherwise uses a 1s measurement
func GetGoPsutilDiskPG(DataDirs, TblspaceDirs []map[string]any) ([]map[string]any, error)
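GetGoPsutilDiskPG combines the directory lists produced by sqlPgDirs / sqlTsDirs with local filesystem usage. A hedged sketch of the per-directory measurement, assuming gopsutil's disk package (the import path, v3 here, and the exact result keys may differ from the real code):

    import "github.com/shirou/gopsutil/v3/disk"

    // Sketch: usage numbers for one PostgreSQL directory (data dir, log dir or tablespace).
    func dirUsage(path string) (map[string]any, error) {
        u, err := disk.Usage(path)
        if err != nil {
            return nil, err
        }
        return map[string]any{
            "path":         path,
            "total":        u.Total,
            "used":         u.Used,
            "percent_used": u.UsedPercent,
        }, nil
    }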
func GetGoPsutilDiskTotals() ([]map[string]any, error)
func GetGoPsutilMem() ([]map[string]any, error)
func GetLoadAvgLocal() ([]map[string]any, error)
func GetMonitoredDatabaseByUniqueName(name string) (*sources.SourceConn, error)
func GetPathUnderlyingDeviceID(path string) (uint64, error)
func GetRecommendations(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings) (metrics.Measurements, error)
func InitPGVersionInfoFetchingLockIfNil(md *sources.SourceConn)
func IsDirectlyFetchableMetric(metric string) bool
func QueryMeasurements(ctx context.Context, dbUnique string, sql string, args ...any) (metrics.Measurements, error)
func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.SourceConn) (err error)
Called once on daemon startup to try to create "metric fetching helper" functions automatically.
func TryCreateMissingExtensions(ctx context.Context, dbUnique string, extensionNames []string, existingExtensions map[string]int) []string
Called once on daemon startup if some commonly wanted extension (most notably pg_stat_statements) is missing. With newer Postgres versions it can even succeed when the user is not a real superuser, thanks to cloud-specific whitelisting or "trusted extensions" (a feature from v13). Errors are ignored.
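A rough sketch of the behaviour described above: extensions already present are skipped, CREATE EXTENSION IF NOT EXISTS is attempted for the rest, and failures are silently ignored. The execer interface is a hypothetical stand-in for whatever connection handle the real function uses:

    import "context"

    // Hypothetical stand-in for the real DB handle.
    type execer interface {
        Exec(ctx context.Context, sql string, args ...any) error
    }

    // Sketch of the documented behaviour; errors are ignored on purpose.
    func tryCreateMissingExtensionsSketch(ctx context.Context, db execer, extensionNames []string, existing map[string]int) []string {
        created := make([]string, 0, len(extensionNames))
        for _, ext := range extensionNames {
            if _, ok := existing[ext]; ok {
                continue // already installed
            }
            if err := db.Exec(ctx, "CREATE EXTENSION IF NOT EXISTS "+ext); err == nil {
                created = append(created, ext)
            }
        }
        return created
    }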
func UpdateMonitoredDBCache(data sources.SourceConns)
func VersionToInt(version string) (v int)
func goPsutilCalcCPUUtilization(probe0, probe1 cpu.TimesStat) float64
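goPsutilCalcCPUUtilization derives a utilization percentage from two cumulative cpu.TimesStat probes (the cached previous one and a fresh one), which is how GetGoPsutilCPU avoids sleeping on every call. A hedged sketch of that delta calculation, assuming the usual gopsutil counter fields:

    import "github.com/shirou/gopsutil/v3/cpu"

    // Sketch: share of non-idle time between two cumulative CPU-time probes, in percent.
    func cpuUtilizationSketch(prev, cur cpu.TimesStat) float64 {
        total := func(t cpu.TimesStat) float64 {
            return t.User + t.System + t.Idle + t.Nice + t.Iowait + t.Irq + t.Softirq + t.Steal
        }
        idle := func(t cpu.TimesStat) float64 { return t.Idle + t.Iowait }
        dTotal := total(cur) - total(prev)
        if dTotal <= 0 {
            return 0
        }
        return 100 * (dTotal - (idle(cur) - idle(prev))) / dTotal
    }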
type ChangeDetectionResults struct {
    Created int
    Altered int
    Dropped int
}
func DetectConfigurationChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults
func DetectIndexChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults
func DetectPrivilegeChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults
func DetectSprocChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults
func DetectTableChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults
type ConcurrentMetricDefs struct {
    *metrics.Metrics
    sync.RWMutex
}
func NewConcurrentMetricDefs() *ConcurrentMetricDefs
func (cmd *ConcurrentMetricDefs) Assign(newDefs *metrics.Metrics)
func (cmd *ConcurrentMetricDefs) GetMetricDef(name string) (m metrics.Metric, ok bool)
func (cmd *ConcurrentMetricDefs) GetPresetDef(name string) (m metrics.Preset, ok bool)
func (cmd *ConcurrentMetricDefs) GetPresetMetrics(name string) (m map[string]float64)
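ConcurrentMetricDefs wraps the loaded metrics.Metrics with an embedded RWMutex so that gatherers can look up definitions while a reload Assigns a fresh set. An illustrative lookup (the metric name is just an example):

    // Illustrative usage of the concurrency-safe metric definition store.
    func exampleMetricLookup(defs *ConcurrentMetricDefs) {
        if m, ok := defs.GetMetricDef("db_size"); ok {
            _ = m // e.g. pick the right SQL for the monitored server's version
        }
    }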
type ExistingPartitionInfo struct {
    StartTime time.Time
    EndTime   time.Time
}
type InstanceMetricCache struct {
    cache map[string](metrics.Measurements) // [dbUnique+metric]lastly_fetched_data
    sync.RWMutex
}
func NewInstanceMetricCache() *InstanceMetricCache
func (imc *InstanceMetricCache) Get(key string, age time.Duration) metrics.Measurements
func (imc *InstanceMetricCache) Put(key string, data metrics.Measurements)
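Instance-level results are cached under a dbUnique+metric key (presumably joined with dbMetricJoinStr), and Get takes a maximum acceptable age so stale data is simply not returned. An illustrative round trip; the one-minute age is arbitrary:

    import "time"

    func exampleInstanceCache(imc *InstanceMetricCache, dbUnique, metric string, data metrics.Measurements) {
        key := dbUnique + dbMetricJoinStr + metric
        imc.Put(key, data)
        if cached := imc.Get(key, time.Minute); len(cached) > 0 {
            _ = cached // reuse the instance-level result instead of re-querying
        }
    }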
type MetricFetchConfig struct {
    DBUniqueName        string
    DBUniqueNameOrig    string
    MetricName          string
    Source              sources.Kind
    Interval            time.Duration
    CreatedOn           time.Time
    StmtTimeoutOverride int64
}
type MonitoredDatabaseSettings struct {
    LastCheckedOn    time.Time
    IsInRecovery     bool
    VersionStr       string
    Version          int
    RealDbname       string
    SystemIdentifier string
    IsSuperuser      bool // if true and no helpers are installed, use the superuser SQL version of a metric if available
    Extensions       map[string]int
    ExecEnv          string
    ApproxDBSizeB    int64
}
func GetMonitoredDatabaseSettings(ctx context.Context, md *sources.SourceConn, noCache bool) (MonitoredDatabaseSettings, error)
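GetMonitoredDatabaseSettings works against the MonitoredDatabasesSettings cache above, and the per-host locks in MonitoredDatabasesSettingsGetLock keep the initial version detection to one run per host. A heavily simplified sketch of that pattern; the detect callback is hypothetical, and the real function performs the actual detection queries and honours noCache:

    // Sketch of the per-host synchronization the vars above suggest; assumes the
    // per-host lock was set up via InitPGVersionInfoFetchingLockIfNil.
    func getSettingsSketch(name string, detect func() MonitoredDatabaseSettings) MonitoredDatabaseSettings {
        MonitoredDatabasesSettingsLock.RLock()
        settings, ok := MonitoredDatabasesSettings[name]
        MonitoredDatabasesSettingsLock.RUnlock()
        if ok {
            return settings
        }
        hostLock := MonitoredDatabasesSettingsGetLock[name]
        hostLock.Lock() // only one initial detection per host
        defer hostLock.Unlock()
        settings = detect()
        MonitoredDatabasesSettingsLock.Lock()
        MonitoredDatabasesSettings[name] = settings
        MonitoredDatabasesSettingsLock.Unlock()
        return settings
    }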
Reaper is the struct responsible for fetching metric measurements from the sources and storing them in the sinks.
type Reaper struct {
    *cmdopts.Options
    ready            atomic.Bool
    measurementCh    chan []metrics.MeasurementEnvelope
    measurementCache *InstanceMetricCache
    logger           log.LoggerIface
}
func NewReaper(ctx context.Context, opts *cmdopts.Options) (r *Reaper, err error)
NewReaper creates a new Reaper instance
func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, ver MonitoredDatabaseSettings) metrics.Measurements
func (r *Reaper) CheckForPGObjectChangesAndStore(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, hostState map[string]map[string]string)
func (r *Reaper) FetchMetric(ctx context.Context, msg MetricFetchConfig, hostState map[string]map[string]string) (*metrics.MeasurementEnvelope, error)
func (r *Reaper) LoadMetrics() (err error)
LoadMetrics loads metric definitions from the reader
func (r *Reaper) LoadSources() (err error)
LoadSources loads sources from the reader
func (r *Reaper) Ready() bool
Ready() returns true if the service is healthy and operating correctly
func (r *Reaper) Reap(ctx context.Context) (err error)
Reap() starts the main monitoring loop. It is responsible for fetching metric measurements from the sources and storing them in the sinks, and it manages the lifecycle of the metric gatherers: when a source or metric definition changes, it starts or stops gatherers accordingly.
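A heavily simplified sketch of the gatherer lifecycle management Reap performs: each source+metric pair gets its own cancellable context, and pairs that drop out of the configuration are cancelled. All names below are hypothetical, and the real loop additionally handles interval changes and feeds measurements to the sinks:

    import "context"

    // Hypothetical per-gatherer lifecycle tracking, keyed by dbUnique + dbMetricJoinStr + metric.
    type gathererSet struct {
        cancels map[string]context.CancelFunc
    }

    func (g *gathererSet) ensure(ctx context.Context, key string, run func(context.Context)) {
        if _, running := g.cancels[key]; running {
            return
        }
        gctx, cancel := context.WithCancel(ctx)
        g.cancels[key] = cancel
        go run(gctx)
    }

    func (g *gathererSet) stopRemoved(stillConfigured map[string]bool) {
        for key, cancel := range g.cancels {
            if !stillConfigured[key] {
                cancel() // source or metric no longer configured
                delete(g.cancels, key)
            }
        }
    }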
func (r *Reaper) WriteMeasurements(ctx context.Context)
WriteMeasurements() writes the metrics to the sinks
func (r *Reaper) WriteMonitoredSources(ctx context.Context)
WriteMonitoredSources writes the listing of actively monitored DBs to the sinks every monitoredDbsDatastoreSyncIntervalSeconds (default 10 min).
func (r *Reaper) reapMetricMeasurements(ctx context.Context, mdb *sources.SourceConn, metricName string, interval float64)
A metrics.ControlMessage notifies the gatherer of shutdown or of an interval change.