// Names of the OS-level metrics. Every constant in this group is listed in
// directlyFetchableOSMetrics.
const (
metricCPULoad = "cpu_load"
metricPsutilCPU = "psutil_cpu"
metricPsutilDisk = "psutil_disk"
metricPsutilDiskIoTotal = "psutil_disk_io_total"
metricPsutilMem = "psutil_mem"
)
// Names of the metrics that carry the specialMetric prefix.
// specialMetricChangeEvents and specialMetricServerLogEventCounts are the keys
// of the specialMetrics map; specialMetricInstanceUp is not part of that map.
const (
specialMetricChangeEvents = "change_events"
specialMetricServerLogEventCounts = "server_log_event_counts"
specialMetricInstanceUp = "instance_up"
)
// csvLogDefaultGlobSuffix is the default glob suffix; it matches names ending in ".csv".
const csvLogDefaultGlobSuffix = "*.csv"
// csvLogDefaultRegEx is the default pattern for one CSV log line. Its named
// groups capture the leading comma-separated columns, from log_time through
// error_severity.
// NOTE(review): the pattern starts with a doubled "^^"; the second anchor is
// redundant (it does not change what matches) and is probably a typo.
const csvLogDefaultRegEx = `^^(?P<log_time>.*?),"?(?P<user_name>.*?)"?,"?(?P<database_name>.*?)"?,(?P<process_id>\d+),"?(?P<connection_from>.*?)"?,(?P<session_id>.*?),(?P<session_line_num>\d+),"?(?P<command_tag>.*?)"?,(?P<session_start_time>.*?),(?P<virtual_transaction_id>.*?),(?P<transaction_id>.*?),(?P<error_severity>\w+),`
const dbMetricJoinStr = "¤¤¤" // just some unlikely string for a DB name to avoid using maps of maps for DB+metric data
// NOTE(review): presumably the upper bound on bytes processed per log chunk — confirm against the log parser.
const maxChunkSize uint64 = 10 * 1024 * 1024 // 10 MB
// NOTE(review): presumably caps the number of entries kept in LogParser.fileOffsets — confirm.
const maxTrackedFiles = 2500
const minTickInterval = 1 // seconds - floor for GCD to help handle zero/negative intervals
// sqlPgDirs is a query returning (name, path) rows for the server's
// directories: data_directory, pg_wal (data_directory || '/pg_wal') and
// log_directory. The log_directory setting is used as-is when it matches
// '^(\w:)?\/.+' (an absolute path, optionally with a drive letter); otherwise
// it is appended to data_directory. The second branch of the UNION ALL adds
// one row per tablespace whose name does not match the regex 'pg_.+'.
const sqlPgDirs = `select name, path from
(values
('data_directory', current_setting('data_directory')),
('pg_wal', current_setting('data_directory')||'/pg_wal'),
('log_directory', case
when current_setting('log_directory') ~ '^(\w:)?\/.+' then current_setting('log_directory')
else current_setting('data_directory') || '/' || current_setting('log_directory')
end)) as d(name, path)
union all
select spcname::text, pg_catalog.pg_tablespace_location(oid)
from pg_catalog.pg_tablespace where spcname !~ 'pg_.+'`
// directlyFetchableOSMetrics lists the names of the five OS-level metrics
// (the metricPsutil* constants plus metricCPULoad).
var directlyFetchableOSMetrics = []string{metricPsutilCPU, metricPsutilDisk, metricPsutilDiskIoTotal, metricPsutilMem, metricCPULoad}
// NOTE(review): the map key is presumably the host/source name — confirm against the writers.
var hostLastKnownStatusInRecovery = make(map[string]bool) // isInRecovery
// NOTE(review): key and value types of this sync.Map are not visible here — confirm against its users.
var lastSQLFetchError sync.Map
// metricDefs is the package-level ConcurrentMetricDefs instance created at initialization.
var metricDefs = NewConcurrentMetricDefs()
var metricsConfig metrics.MetricIntervals // set to host.Metrics or host.MetricsStandby (in case the optional config is defined and the host is in recovery state)
Constants and types
// pgSeverities is the fixed list of English PostgreSQL severity labels; these
// are the same labels that appear as values in the pgSeveritiesLocale tables.
var pgSeverities = [...]string{"DEBUG", "INFO", "NOTICE", "WARNING", "ERROR", "LOG", "FATAL", "PANIC"}
// pgSeveritiesLocale maps a server message language key to a lookup table that
// translates the localized PostgreSQL severity label found in the log into its
// English equivalent (one of the labels listed in pgSeverities).
//
// The "fr" table accepts "PANIC" in addition to the historical "PANIK" key.
// "PANIK" is the German spelling and appears to have been copied from the "de"
// row, while the French PostgreSQL message catalog leaves PANIC untranslated.
// The old key is kept so that any existing lookup keeps working.
var pgSeveritiesLocale = map[string]map[string]string{
	"C.": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "WARNING": "WARNING", "ERROR": "ERROR", "FATAL": "FATAL", "PANIC": "PANIC"},
	"de": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "HINWEIS": "NOTICE", "WARNUNG": "WARNING", "FEHLER": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
	"fr": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "ATTENTION": "WARNING", "ERREUR": "ERROR", "FATAL": "FATAL", "PANIC": "PANIC", "PANIK": "PANIC"},
	"it": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTIFICA": "NOTICE", "ATTENZIONE": "WARNING", "ERRORE": "ERROR", "FATALE": "FATAL", "PANICO": "PANIC"},
	"ko": {"디버그": "DEBUG", "로그": "LOG", "정보": "INFO", "알림": "NOTICE", "경고": "WARNING", "오류": "ERROR", "치명적오류": "FATAL", "손상": "PANIC"},
	"pl": {"DEBUG": "DEBUG", "DZIENNIK": "LOG", "INFORMACJA": "INFO", "UWAGA": "NOTICE", "OSTRZEŻENIE": "WARNING", "BŁĄD": "ERROR", "KATASTROFALNY": "FATAL", "PANIKA": "PANIC"},
	"ru": {"ОТЛАДКА": "DEBUG", "СООБЩЕНИЕ": "LOG", "ИНФОРМАЦИЯ": "INFO", "ЗАМЕЧАНИЕ": "NOTICE", "ПРЕДУПРЕЖДЕНИЕ": "WARNING", "ОШИБКА": "ERROR", "ВАЖНО": "FATAL", "ПАНИКА": "PANIC"},
	"sv": {"DEBUG": "DEBUG", "LOGG": "LOG", "INFO": "INFO", "NOTIS": "NOTICE", "VARNING": "WARNING", "FEL": "ERROR", "FATALT": "FATAL", "PANIK": "PANIC"},
	"tr": {"DEBUG": "DEBUG", "LOG": "LOG", "BİLGİ": "INFO", "NOT": "NOTICE", "UYARI": "WARNING", "HATA": "ERROR", "ÖLÜMCÜL (FATAL)": "FATAL", "KRİTİK": "PANIC"},
	"zh": {"调试": "DEBUG", "日志": "LOG", "信息": "INFO", "注意": "NOTICE", "警告": "WARNING", "错误": "ERROR", "致命错误": "FATAL", "比致命错误还过分的错误": "PANIC"},
}
// prevCPULoadTimeStats keeps the last CPU times sample for GetGoPsutilCPU.
var prevCPULoadTimeStats cpu.TimesStat
"cache" of last CPU utilization stats for GetGoPsutilCPU to get more exact results and not having to sleep
// NOTE(review): presumably guards prevCPULoadTimeStats and prevCPULoadTimestamp — confirm in GetGoPsutilCPU.
var prevCPULoadTimeStatsLock sync.RWMutex
// NOTE(review): presumably the time at which prevCPULoadTimeStats was sampled — confirm.
var prevCPULoadTimestamp time.Time
// specialMetrics is the set of special metric names; it contains
// specialMetricChangeEvents and specialMetricServerLogEventCounts, but not
// specialMetricInstanceUp.
var specialMetrics = map[string]bool{specialMetricChangeEvents: true, specialMetricServerLogEventCounts: true}
func CheckFolderExistsAndReadable(path string) bool
func DoesEmergencyTriggerfileExist(fname string) bool
func GCDSlice(vals []int) int
GCDSlice computes GCD across a slice. Returns 0 for empty input.
func GetGoPsutilCPU(interval time.Duration) (metrics.Measurements, error)
GetGoPsutilCPU simulates "psutil" metric output. It assumes the result from the previous call as its input.
func GetGoPsutilDiskPG(pgDirs metrics.Measurements) (metrics.Measurements, error)
func GetGoPsutilDiskTotals() (metrics.Measurements, error)
func GetGoPsutilMem() (metrics.Measurements, error)
func GetLoadAvgLocal() (metrics.Measurements, error)
func GetPathUnderlyingDeviceID(path string) (uint64, error)
func IsDirectlyFetchableMetric(md *sources.SourceConn, metric string) bool
func QueryMeasurements(ctx context.Context, md *sources.SourceConn, sql string, args ...any) (metrics.Measurements, error)
func checkHasLocalPrivileges(logsDirPath string) error
func checkHasRemotePrivileges(ctx context.Context, mdb *sources.SourceConn, logsDirPath string) error
func cpuTotal(c cpu.TimesStat) float64
cpuTotal returns the total number of seconds across all CPU states. Guest and GuestNice are intentionally excluded because on Linux they are already counted within User and Nice respectively (/proc/stat semantics), so including them would double-count and skew percentage calculations.
func getFileWithLatestTimestamp(files []string) (string, error)
func getFileWithNextModTimestamp(logsGlobPath, currentFile string) (string, error)
func goPsutilCalcCPUUtilization(probe0, probe1 cpu.TimesStat) float64
func init()
func severityToEnglish(serverLang, errorSeverity string) string
func zeroEventCounts(eventCounts map[string]int64)
// ChangeDetectionResults carries a target label together with three counters:
// how many objects were created, altered and dropped.
type ChangeDetectionResults struct {
	Target                    string
	Created, Altered, Dropped int
}
func (cdr *ChangeDetectionResults) String() string
func (cdr *ChangeDetectionResults) Total() int
// ConcurrentMetricDefs embeds a *metrics.Metrics together with a sync.RWMutex,
// so the lock methods are promoted onto the type itself.
// NOTE(review): presumably the mutex guards every access to the embedded
// definitions — confirm in Assign and the Get* methods.
type ConcurrentMetricDefs struct {
*metrics.Metrics
sync.RWMutex
}
func NewConcurrentMetricDefs() *ConcurrentMetricDefs
func (cmd *ConcurrentMetricDefs) Assign(newDefs *metrics.Metrics)
func (cmd *ConcurrentMetricDefs) GetMetricDef(name string) (m metrics.Metric, ok bool)
func (cmd *ConcurrentMetricDefs) GetPresetDef(name string) (m metrics.Preset, ok bool)
func (cmd *ConcurrentMetricDefs) GetPresetMetrics(name string) (m metrics.MetricIntervals)
// ExistingPartitionInfo records a pair of timestamps, start first and end
// second, describing a partition.
type ExistingPartitionInfo struct {
	StartTime, EndTime time.Time
}
// InstanceMetricCache pairs a string-keyed map of measurements with an
// embedded sync.RWMutex.
// NOTE(review): presumably the mutex guards the cache map — confirm in Get and Put.
type InstanceMetricCache struct {
cache map[string](metrics.Measurements) // [dbUnique+metric]lastly_fetched_data
sync.RWMutex
}
func NewInstanceMetricCache() *InstanceMetricCache
func (imc *InstanceMetricCache) Get(key string, age time.Duration) metrics.Measurements
func (imc *InstanceMetricCache) Put(key string, data metrics.Measurements)
// LogConfig groups the server logging settings: three boolean flags, followed
// by the log directory and the language of the server messages.
type LogConfig struct {
	CollectorEnabled, CSVDestination, TruncateOnRotation bool
	Directory, ServerMessagesLang                        string
}
func tryDetermineLogSettings(ctx context.Context, conn db.PgxIface) (cfg *LogConfig, err error)
// LogParser embeds *LogConfig and holds the state used while parsing server
// logs: the line-matching regex, the source connection, the send interval, the
// outgoing measurement channel, the event counters and per-file read offsets.
type LogParser struct {
*LogConfig
// NOTE(review): a context is stored in the struct here; Go guidance generally
// prefers passing ctx as the first parameter of each call.
ctx context.Context
LogsMatchRegex *regexp.Regexp
SourceConn *sources.SourceConn
Interval time.Duration
// StoreCh is send-only: the parser can only write envelopes to it.
StoreCh chan<- metrics.MeasurementEnvelope
eventCounts map[string]int64 // for the specific DB. [WARNING: 34, ERROR: 10, ...], zeroed on storage send
eventCountsTotal map[string]int64 // for the whole instance
// NOTE(review): presumably compared with Interval by HasSendIntervalElapsed — confirm.
lastSendTime time.Time
fileOffsets map[string]uint64 // map of log file paths to last read offsets
}
func NewLogParser(ctx context.Context, mdb *sources.SourceConn, storeCh chan<- metrics.MeasurementEnvelope) (lp *LogParser, err error)
func (lp *LogParser) GetMeasurementEnvelope() metrics.MeasurementEnvelope
GetMeasurementEnvelope converts current event counts to a MeasurementEnvelope
func (lp *LogParser) HasSendIntervalElapsed() bool
func (lp *LogParser) ParseLogs() error
func (lp *LogParser) parseLogsLocal() error
func (lp *LogParser) parseLogsRemote() error
func (lp *LogParser) regexMatchesToMap(matches []string) map[string]string
Reaper is the struct that is responsible for fetching metric measurements from the sources and storing them to the sinks
// Reaper embeds *cmdopts.Options and holds the collector's shared state: the
// readiness flag, the measurement channel and cache, the logger, the current
// and previous source lists, and the per-source cancel functions and
// SourceReaper instances.
type Reaper struct {
*cmdopts.Options
// NOTE(review): presumably the flag reported by Ready() — confirm.
ready atomic.Bool
measurementCh chan metrics.MeasurementEnvelope
measurementCache *InstanceMetricCache
logger log.Logger
monitoredSources sources.SourceConns
// NOTE(review): presumably the source list from the previous loop iteration, kept for diffing — confirm in Reap.
prevLoopMonitoredDBs sources.SourceConns
cancelFuncs map[string]context.CancelFunc // [sourceName]cancel() — one per source
sourceReapers map[string]*SourceReaper // [sourceName] — active SourceReaper instances
}
func NewReaper(ctx context.Context, opts *cmdopts.Options) (r *Reaper)
NewReaper creates a new Reaper instance
func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, md *sources.SourceConn)
func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(hostsToShutDown map[string]bool)
func (r *Reaper) CreateSourceHelpers(ctx context.Context, srcL log.Logger, monitoredSource *sources.SourceConn)
CreateSourceHelpers creates the extensions and metric helpers for the monitored source
func (r *Reaper) DetectConfigurationChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults
func (r *Reaper) DetectIndexChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults
func (r *Reaper) DetectPrivilegeChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults
func (r *Reaper) DetectSprocChanges(ctx context.Context, md *sources.SourceConn) (changeCounts ChangeDetectionResults)
func (r *Reaper) DetectTableChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults
func (r *Reaper) FetchStatsDirectlyFromOS(ctx context.Context, md *sources.SourceConn, metricName string) (*metrics.MeasurementEnvelope, error)
func (r *Reaper) GetInstanceUpMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error)
GetInstanceUpMeasurement returns a single measurement with "instance_up" metric used to detect if the instance is up or down
func (r *Reaper) GetMeasurementCache(key string) metrics.Measurements
GetMeasurementCache returns the measurements held in the instance-level metric cache for the given key
func (r *Reaper) GetObjectChangesMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error)
func (r *Reaper) LoadMetrics() (err error)
LoadMetrics loads metric definitions from the reader
func (r *Reaper) LoadSources(ctx context.Context) (err error)
LoadSources loads sources from the reader
func (r *Reaper) PrintMemStats()
func (r *Reaper) Ready() bool
Ready() returns true if the service is healthy and operating correctly
func (r *Reaper) Reap(ctx context.Context)
Reap() starts the main monitoring loop. It is responsible for fetching metrics measurements from the sources and storing them to the sinks. It also manages the lifecycle of the metric gatherers. In case of a source or metric definition change, it will start or stop the gatherers accordingly.
func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDown map[string]bool)
func (r *Reaper) WriteInstanceDown(name string)
WriteInstanceDown writes instance_up = 0 metric to sinks for the given source
func (r *Reaper) WriteMeasurements(ctx context.Context)
WriteMeasurements() writes the metrics to the sinks
SourceReaper manages metric collection for a single monitored source. Instead of one goroutine per metric it runs a single GCD-based tick loop and batches SQL queries via pgx.Batch when the source is a real Postgres connection (non-pgbouncer, non-pgpool).
// SourceReaper holds the per-source collection state: a back-reference to the
// owning Reaper, the source connection, fetch timestamps, the last seen
// postmaster uptime and the set of degraded metrics.
type SourceReaper struct {
reaper *Reaper
md *sources.SourceConn
// NOTE(review): presumably [metricName]time of the last fetch — confirm in Run.
lastFetch map[string]time.Time
lastUptimeS int64 // last seen postmaster_uptime_s for restart detection
degradedMetrics map[string]struct{} // metrics that failed individual retry; executed via fetchMetric until they recover
}
func NewSourceReaper(r *Reaper, md *sources.SourceConn) *SourceReaper
NewSourceReaper creates a SourceReaper for the given source connection.
func (sr *SourceReaper) CollectAndDispatch(ctx context.Context, rows pgx.Rows, name string, metric metrics.Metric) error
CollectAndDispatch is a helper that collects rows from a pgx.Rows and dispatches them.
func (sr *SourceReaper) Run(ctx context.Context)
Run is the main loop for a single source. It replaces N per-metric goroutines with one goroutine that batches SQL queries at GCD-aligned ticks.
func (sr *SourceReaper) activeMetrics() map[string]time.Duration
activeMetrics returns a snapshot copy of the currently active metric intervals based on the source's recovery state. Copying under the lock prevents data races when the caller iterates after the lock is released.
func (sr *SourceReaper) cacheKey(m metrics.Metric, name string) string
cacheKey returns the instance-level cache key for the given metric.
func (sr *SourceReaper) calcTickInterval() time.Duration
calcTickInterval computes GCD of all metric intervals with a minimum floor.
func (sr *SourceReaper) detectServerRestart(ctx context.Context, data metrics.Measurements)
detectServerRestart checks for PostgreSQL server restarts via postmaster_uptime_s in db_stats metric data and emits an object_changes measurement if detected.
func (sr *SourceReaper) dispatchMetricData(ctx context.Context, name string, metric metrics.Metric, data metrics.Measurements)
dispatchMetricData handles the post-fetch workflow for a collected metric: caching, sysinfo enrichment, sending, and restart detection.
func (sr *SourceReaper) executeBatch(ctx context.Context, entries []batchEntry) error
executeBatch sends all SQLs in a single pgx.Batch round-trip, dispatching each result immediately as it arrives. If any query fails, PostgreSQL's extended protocol aborts all subsequent queries in the same sync boundary (cascade failure). Any entry that returns an error from the batch is retried individually via fetchMetric to isolate real failures from cascade failures. Entries that fail even after the individual retry are marked as degraded so that subsequent runs use fetchMetric for them until they recover.
func (sr *SourceReaper) fetchMetric(ctx context.Context, entry batchEntry) error
fetchMetric executes a single SQL query for the given batch entry. It returns only an error; the resulting measurements are not returned to the caller.
func (sr *SourceReaper) fetchOSMetric(ctx context.Context, name string) error
fetchOSMetric handles gopsutil-based OS metrics.
func (sr *SourceReaper) fetchSpecialMetric(ctx context.Context, name, storageName string) error
fetchSpecialMetric handles change_events and instance_up metrics.
func (sr *SourceReaper) isRoleExcluded(m metrics.Metric) bool
isRoleExcluded returns true if the metric should be skipped based on the source's recovery state (e.g. primary-only metric on a standby).
func (sr *SourceReaper) runLogParser(ctx context.Context) error
runLogParser launches the server log event counts parser.
func (sr *SourceReaper) sendEnvelope(ctx context.Context, name, storageName string, data metrics.Measurements)
sendEnvelope adds sysinfo and dispatches a MeasurementEnvelope to the measurement channel.
batchEntry holds the minimum info needed to execute and dispatch a metric query.
// batchEntry bundles what is needed to run one metric query: a name, the
// metric definition and the SQL text.
type batchEntry struct {
// NOTE(review): presumably the metric name used when dispatching results — confirm.
name string
metric metrics.Metric
// NOTE(review): presumably the SQL text selected from metric for this source — confirm.
sql string
}