...

Package reaper

import "github.com/cybertec-postgresql/pgwatch/v5/internal/reaper"
Overview
Index

Overview ▾

Package reaper is responsible for querying metrics from monitored sources and sending the resulting measurements to sinks.

Index ▾

Constants
Variables
func CheckFolderExistsAndReadable(path string) bool
func DoesEmergencyTriggerfileExist(fname string) bool
func GCDSlice(vals []int) int
func GetGoPsutilCPU(interval time.Duration) (metrics.Measurements, error)
func GetGoPsutilDiskPG(pgDirs metrics.Measurements) (metrics.Measurements, error)
func GetGoPsutilDiskTotals() (metrics.Measurements, error)
func GetGoPsutilMem() (metrics.Measurements, error)
func GetLoadAvgLocal() (metrics.Measurements, error)
func GetPathUnderlyingDeviceID(path string) (uint64, error)
func IsDirectlyFetchableMetric(md *sources.SourceConn, metric string) bool
func QueryMeasurements(ctx context.Context, md *sources.SourceConn, sql string, args ...any) (metrics.Measurements, error)
func checkHasLocalPrivileges(logsDirPath string) error
func checkHasRemotePrivileges(ctx context.Context, mdb *sources.SourceConn, logsDirPath string) error
func cpuTotal(c cpu.TimesStat) float64
func getFileWithLatestTimestamp(files []string) (string, error)
func getFileWithNextModTimestamp(logsGlobPath, currentFile string) (string, error)
func goPsutilCalcCPUUtilization(probe0, probe1 cpu.TimesStat) float64
func init()
func severityToEnglish(serverLang, errorSeverity string) string
func zeroEventCounts(eventCounts map[string]int64)
type ChangeDetectionResults
    func (cdr *ChangeDetectionResults) String() string
    func (cdr *ChangeDetectionResults) Total() int
type ConcurrentMetricDefs
    func NewConcurrentMetricDefs() *ConcurrentMetricDefs
    func (cmd *ConcurrentMetricDefs) Assign(newDefs *metrics.Metrics)
    func (cmd *ConcurrentMetricDefs) GetMetricDef(name string) (m metrics.Metric, ok bool)
    func (cmd *ConcurrentMetricDefs) GetPresetDef(name string) (m metrics.Preset, ok bool)
    func (cmd *ConcurrentMetricDefs) GetPresetMetrics(name string) (m metrics.MetricIntervals)
type ExistingPartitionInfo
type InstanceMetricCache
    func NewInstanceMetricCache() *InstanceMetricCache
    func (imc *InstanceMetricCache) Get(key string, age time.Duration) metrics.Measurements
    func (imc *InstanceMetricCache) Put(key string, data metrics.Measurements)
type LogConfig
    func tryDetermineLogSettings(ctx context.Context, conn db.PgxIface) (cfg *LogConfig, err error)
type LogParser
    func NewLogParser(ctx context.Context, mdb *sources.SourceConn, storeCh chan<- metrics.MeasurementEnvelope) (lp *LogParser, err error)
    func (lp *LogParser) GetMeasurementEnvelope() metrics.MeasurementEnvelope
    func (lp *LogParser) HasSendIntervalElapsed() bool
    func (lp *LogParser) ParseLogs() error
    func (lp *LogParser) parseLogsLocal() error
    func (lp *LogParser) parseLogsRemote() error
    func (lp *LogParser) regexMatchesToMap(matches []string) map[string]string
type Reaper
    func NewReaper(ctx context.Context, opts *cmdopts.Options) (r *Reaper)
    func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, md *sources.SourceConn)
    func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(hostsToShutDown map[string]bool)
    func (r *Reaper) CreateSourceHelpers(ctx context.Context, srcL log.Logger, monitoredSource *sources.SourceConn)
    func (r *Reaper) DetectConfigurationChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults
    func (r *Reaper) DetectIndexChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults
    func (r *Reaper) DetectPrivilegeChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults
    func (r *Reaper) DetectSprocChanges(ctx context.Context, md *sources.SourceConn) (changeCounts ChangeDetectionResults)
    func (r *Reaper) DetectTableChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults
    func (r *Reaper) FetchStatsDirectlyFromOS(ctx context.Context, md *sources.SourceConn, metricName string) (*metrics.MeasurementEnvelope, error)
    func (r *Reaper) GetInstanceUpMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error)
    func (r *Reaper) GetMeasurementCache(key string) metrics.Measurements
    func (r *Reaper) GetObjectChangesMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error)
    func (r *Reaper) LoadMetrics() (err error)
    func (r *Reaper) LoadSources(ctx context.Context) (err error)
    func (r *Reaper) PrintMemStats()
    func (r *Reaper) Ready() bool
    func (r *Reaper) Reap(ctx context.Context)
    func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDown map[string]bool)
    func (r *Reaper) WriteInstanceDown(name string)
    func (r *Reaper) WriteMeasurements(ctx context.Context)
type SourceReaper
    func NewSourceReaper(r *Reaper, md *sources.SourceConn) *SourceReaper
    func (sr *SourceReaper) CollectAndDispatch(ctx context.Context, rows pgx.Rows, name string, metric metrics.Metric) error
    func (sr *SourceReaper) Run(ctx context.Context)
    func (sr *SourceReaper) activeMetrics() map[string]time.Duration
    func (sr *SourceReaper) cacheKey(m metrics.Metric, name string) string
    func (sr *SourceReaper) calcTickInterval() time.Duration
    func (sr *SourceReaper) detectServerRestart(ctx context.Context, data metrics.Measurements)
    func (sr *SourceReaper) dispatchMetricData(ctx context.Context, name string, metric metrics.Metric, data metrics.Measurements)
    func (sr *SourceReaper) executeBatch(ctx context.Context, entries []batchEntry) error
    func (sr *SourceReaper) fetchMetric(ctx context.Context, entry batchEntry) error
    func (sr *SourceReaper) fetchOSMetric(ctx context.Context, name string) error
    func (sr *SourceReaper) fetchSpecialMetric(ctx context.Context, name, storageName string) error
    func (sr *SourceReaper) isRoleExcluded(m metrics.Metric) bool
    func (sr *SourceReaper) runLogParser(ctx context.Context) error
    func (sr *SourceReaper) sendEnvelope(ctx context.Context, name, storageName string, data metrics.Measurements)
type batchEntry

Package files

cache.go database.go doc.go file.go logparser.go logparser_local.go logparser_remote.go metric.go psutil.go psutil_linux.go reaper.go source_reaper.go

Constants

const (
    metricCPULoad           = "cpu_load"
    metricPsutilCPU         = "psutil_cpu"
    metricPsutilDisk        = "psutil_disk"
    metricPsutilDiskIoTotal = "psutil_disk_io_total"
    metricPsutilMem         = "psutil_mem"
)
const (
    specialMetricChangeEvents         = "change_events"
    specialMetricServerLogEventCounts = "server_log_event_counts"
    specialMetricInstanceUp           = "instance_up"
)
const csvLogDefaultGlobSuffix = "*.csv"
const csvLogDefaultRegEx = `^^(?P<log_time>.*?),"?(?P<user_name>.*?)"?,"?(?P<database_name>.*?)"?,(?P<process_id>\d+),"?(?P<connection_from>.*?)"?,(?P<session_id>.*?),(?P<session_line_num>\d+),"?(?P<command_tag>.*?)"?,(?P<session_start_time>.*?),(?P<virtual_transaction_id>.*?),(?P<transaction_id>.*?),(?P<error_severity>\w+),`
const dbMetricJoinStr = "¤¤¤" // just some unlikely string for a DB name to avoid using maps of maps for DB+metric data
const maxChunkSize uint64 = 10 * 1024 * 1024 // 10 MB
const maxTrackedFiles = 2500
const minTickInterval = 1 // seconds - floor for GCD to help handle zero/negative intervals
const sqlPgDirs = `select name, path from 
(values 
    ('data_directory', current_setting('data_directory')),
    ('pg_wal', current_setting('data_directory')||'/pg_wal'),
    ('log_directory', case 
        when current_setting('log_directory') ~ '^(\w:)?\/.+' then current_setting('log_directory') 
        else current_setting('data_directory') || '/' || current_setting('log_directory') 
    end)) as d(name, path)
union all
select spcname::text, pg_catalog.pg_tablespace_location(oid)
from pg_catalog.pg_tablespace where spcname !~ 'pg_.+'`

Variables

var directlyFetchableOSMetrics = []string{metricPsutilCPU, metricPsutilDisk, metricPsutilDiskIoTotal, metricPsutilMem, metricCPULoad}
var hostLastKnownStatusInRecovery = make(map[string]bool) // isInRecovery
var lastSQLFetchError sync.Map
var metricDefs = NewConcurrentMetricDefs()
var metricsConfig metrics.MetricIntervals // set to host.Metrics or host.MetricsStandby (in case the optional config is defined and the host is in recovery state)

Constants and types

var pgSeverities = [...]string{"DEBUG", "INFO", "NOTICE", "WARNING", "ERROR", "LOG", "FATAL", "PANIC"}
var pgSeveritiesLocale = map[string]map[string]string{
    "C.": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "WARNING": "WARNING", "ERROR": "ERROR", "FATAL": "FATAL", "PANIC": "PANIC"},
    "de": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "HINWEIS": "NOTICE", "WARNUNG": "WARNING", "FEHLER": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
    "fr": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "ATTENTION": "WARNING", "ERREUR": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"},
    "it": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTIFICA": "NOTICE", "ATTENZIONE": "WARNING", "ERRORE": "ERROR", "FATALE": "FATAL", "PANICO": "PANIC"},
    "ko": {"디버그": "DEBUG", "로그": "LOG", "정보": "INFO", "알림": "NOTICE", "경고": "WARNING", "오류": "ERROR", "치명적오류": "FATAL", "손상": "PANIC"},
    "pl": {"DEBUG": "DEBUG", "DZIENNIK": "LOG", "INFORMACJA": "INFO", "UWAGA": "NOTICE", "OSTRZEŻENIE": "WARNING", "BŁĄD": "ERROR", "KATASTROFALNY": "FATAL", "PANIKA": "PANIC"},
    "ru": {"ОТЛАДКА": "DEBUG", "СООБЩЕНИЕ": "LOG", "ИНФОРМАЦИЯ": "INFO", "ЗАМЕЧАНИЕ": "NOTICE", "ПРЕДУПРЕЖДЕНИЕ": "WARNING", "ОШИБКА": "ERROR", "ВАЖНО": "FATAL", "ПАНИКА": "PANIC"},
    "sv": {"DEBUG": "DEBUG", "LOGG": "LOG", "INFO": "INFO", "NOTIS": "NOTICE", "VARNING": "WARNING", "FEL": "ERROR", "FATALT": "FATAL", "PANIK": "PANIC"},
    "tr": {"DEBUG": "DEBUG", "LOG": "LOG", "BİLGİ": "INFO", "NOT": "NOTICE", "UYARI": "WARNING", "HATA": "ERROR", "ÖLÜMCÜL (FATAL)": "FATAL", "KRİTİK": "PANIC"},
    "zh": {"调试": "DEBUG", "日志": "LOG", "信息": "INFO", "注意": "NOTICE", "警告": "WARNING", "错误": "ERROR", "致命错误": "FATAL", "比致命错误还过分的错误": "PANIC"},
}
var prevCPULoadTimeStats cpu.TimesStat

"cache" of the last CPU utilization stats, used by GetGoPsutilCPU to get more exact results without having to sleep

var prevCPULoadTimeStatsLock sync.RWMutex
var prevCPULoadTimestamp time.Time
var specialMetrics = map[string]bool{specialMetricChangeEvents: true, specialMetricServerLogEventCounts: true}

func CheckFolderExistsAndReadable

func CheckFolderExistsAndReadable(path string) bool

func DoesEmergencyTriggerfileExist

func DoesEmergencyTriggerfileExist(fname string) bool

func GCDSlice

func GCDSlice(vals []int) int

GCDSlice computes GCD across a slice. Returns 0 for empty input.

func GetGoPsutilCPU

func GetGoPsutilCPU(interval time.Duration) (metrics.Measurements, error)

GetGoPsutilCPU simulates "psutil" metric output. It assumes the result of the previous call as its input.

func GetGoPsutilDiskPG

func GetGoPsutilDiskPG(pgDirs metrics.Measurements) (metrics.Measurements, error)

func GetGoPsutilDiskTotals

func GetGoPsutilDiskTotals() (metrics.Measurements, error)

func GetGoPsutilMem

func GetGoPsutilMem() (metrics.Measurements, error)

func GetLoadAvgLocal

func GetLoadAvgLocal() (metrics.Measurements, error)

func GetPathUnderlyingDeviceID

func GetPathUnderlyingDeviceID(path string) (uint64, error)

func IsDirectlyFetchableMetric

func IsDirectlyFetchableMetric(md *sources.SourceConn, metric string) bool

func QueryMeasurements

func QueryMeasurements(ctx context.Context, md *sources.SourceConn, sql string, args ...any) (metrics.Measurements, error)

func checkHasLocalPrivileges

func checkHasLocalPrivileges(logsDirPath string) error

func checkHasRemotePrivileges

func checkHasRemotePrivileges(ctx context.Context, mdb *sources.SourceConn, logsDirPath string) error

func cpuTotal

func cpuTotal(c cpu.TimesStat) float64

cpuTotal returns the total number of seconds across all CPU states. Guest and GuestNice are intentionally excluded because on Linux they are already counted within User and Nice respectively (/proc/stat semantics), so including them would double-count and skew percentage calculations.

func getFileWithLatestTimestamp

func getFileWithLatestTimestamp(files []string) (string, error)

func getFileWithNextModTimestamp

func getFileWithNextModTimestamp(logsGlobPath, currentFile string) (string, error)

func goPsutilCalcCPUUtilization

func goPsutilCalcCPUUtilization(probe0, probe1 cpu.TimesStat) float64

func init

func init()

func severityToEnglish

func severityToEnglish(serverLang, errorSeverity string) string

func zeroEventCounts

func zeroEventCounts(eventCounts map[string]int64)

type ChangeDetectionResults

type ChangeDetectionResults struct {
    Target  string
    Created int
    Altered int
    Dropped int
}

func (*ChangeDetectionResults) String

func (cdr *ChangeDetectionResults) String() string

func (*ChangeDetectionResults) Total

func (cdr *ChangeDetectionResults) Total() int

type ConcurrentMetricDefs

type ConcurrentMetricDefs struct {
    *metrics.Metrics
    sync.RWMutex
}

func NewConcurrentMetricDefs

func NewConcurrentMetricDefs() *ConcurrentMetricDefs

func (*ConcurrentMetricDefs) Assign

func (cmd *ConcurrentMetricDefs) Assign(newDefs *metrics.Metrics)

func (*ConcurrentMetricDefs) GetMetricDef

func (cmd *ConcurrentMetricDefs) GetMetricDef(name string) (m metrics.Metric, ok bool)

func (*ConcurrentMetricDefs) GetPresetDef

func (cmd *ConcurrentMetricDefs) GetPresetDef(name string) (m metrics.Preset, ok bool)

func (*ConcurrentMetricDefs) GetPresetMetrics

func (cmd *ConcurrentMetricDefs) GetPresetMetrics(name string) (m metrics.MetricIntervals)

type ExistingPartitionInfo

type ExistingPartitionInfo struct {
    StartTime time.Time
    EndTime   time.Time
}

type InstanceMetricCache

type InstanceMetricCache struct {
    cache map[string](metrics.Measurements) // [dbUnique+metric]lastly_fetched_data
    sync.RWMutex
}

func NewInstanceMetricCache

func NewInstanceMetricCache() *InstanceMetricCache

func (*InstanceMetricCache) Get

func (imc *InstanceMetricCache) Get(key string, age time.Duration) metrics.Measurements

func (*InstanceMetricCache) Put

func (imc *InstanceMetricCache) Put(key string, data metrics.Measurements)

type LogConfig

type LogConfig struct {
    CollectorEnabled   bool
    CSVDestination     bool
    TruncateOnRotation bool
    Directory          string
    ServerMessagesLang string
}

func tryDetermineLogSettings

func tryDetermineLogSettings(ctx context.Context, conn db.PgxIface) (cfg *LogConfig, err error)

type LogParser

type LogParser struct {
    *LogConfig
    ctx              context.Context
    LogsMatchRegex   *regexp.Regexp
    SourceConn       *sources.SourceConn
    Interval         time.Duration
    StoreCh          chan<- metrics.MeasurementEnvelope
    eventCounts      map[string]int64 // for the specific DB. [WARNING: 34, ERROR: 10, ...], zeroed on storage send
    eventCountsTotal map[string]int64 // for the whole instance
    lastSendTime     time.Time
    fileOffsets      map[string]uint64 // map of log file paths to last read offsets
}

func NewLogParser

func NewLogParser(ctx context.Context, mdb *sources.SourceConn, storeCh chan<- metrics.MeasurementEnvelope) (lp *LogParser, err error)

func (*LogParser) GetMeasurementEnvelope

func (lp *LogParser) GetMeasurementEnvelope() metrics.MeasurementEnvelope

GetMeasurementEnvelope converts current event counts to a MeasurementEnvelope

func (*LogParser) HasSendIntervalElapsed

func (lp *LogParser) HasSendIntervalElapsed() bool

func (*LogParser) ParseLogs

func (lp *LogParser) ParseLogs() error

func (*LogParser) parseLogsLocal

func (lp *LogParser) parseLogsLocal() error

func (*LogParser) parseLogsRemote

func (lp *LogParser) parseLogsRemote() error

func (*LogParser) regexMatchesToMap

func (lp *LogParser) regexMatchesToMap(matches []string) map[string]string

type Reaper

Reaper is the struct that is responsible for fetching metric measurements from the sources and storing them to the sinks.

type Reaper struct {
    *cmdopts.Options
    ready                atomic.Bool
    measurementCh        chan metrics.MeasurementEnvelope
    measurementCache     *InstanceMetricCache
    logger               log.Logger
    monitoredSources     sources.SourceConns
    prevLoopMonitoredDBs sources.SourceConns
    cancelFuncs          map[string]context.CancelFunc // [sourceName]cancel() — one per source
    sourceReapers        map[string]*SourceReaper      // [sourceName] — active SourceReaper instances
}

func NewReaper

func NewReaper(ctx context.Context, opts *cmdopts.Options) (r *Reaper)

NewReaper creates a new Reaper instance

func (*Reaper) AddSysinfoToMeasurements

func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, md *sources.SourceConn)

func (*Reaper) CloseResourcesForRemovedMonitoredDBs

func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(hostsToShutDown map[string]bool)

func (*Reaper) CreateSourceHelpers

func (r *Reaper) CreateSourceHelpers(ctx context.Context, srcL log.Logger, monitoredSource *sources.SourceConn)

CreateSourceHelpers creates the extensions and metric helpers for the monitored source

func (*Reaper) DetectConfigurationChanges

func (r *Reaper) DetectConfigurationChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults

func (*Reaper) DetectIndexChanges

func (r *Reaper) DetectIndexChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults

func (*Reaper) DetectPrivilegeChanges

func (r *Reaper) DetectPrivilegeChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults

func (*Reaper) DetectSprocChanges

func (r *Reaper) DetectSprocChanges(ctx context.Context, md *sources.SourceConn) (changeCounts ChangeDetectionResults)

func (*Reaper) DetectTableChanges

func (r *Reaper) DetectTableChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults

func (*Reaper) FetchStatsDirectlyFromOS

func (r *Reaper) FetchStatsDirectlyFromOS(ctx context.Context, md *sources.SourceConn, metricName string) (*metrics.MeasurementEnvelope, error)

func (*Reaper) GetInstanceUpMeasurement

func (r *Reaper) GetInstanceUpMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error)

GetInstanceUpMeasurement returns a single measurement with "instance_up" metric used to detect if the instance is up or down

func (*Reaper) GetMeasurementCache

func (r *Reaper) GetMeasurementCache(key string) metrics.Measurements

GetMeasurementCache returns the measurements stored in the instance-level metric cache under the given key.

func (*Reaper) GetObjectChangesMeasurement

func (r *Reaper) GetObjectChangesMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error)

func (*Reaper) LoadMetrics

func (r *Reaper) LoadMetrics() (err error)

LoadMetrics loads metric definitions from the reader

func (*Reaper) LoadSources

func (r *Reaper) LoadSources(ctx context.Context) (err error)

LoadSources loads sources from the reader

func (*Reaper) PrintMemStats

func (r *Reaper) PrintMemStats()

func (*Reaper) Ready

func (r *Reaper) Ready() bool

Ready() returns true if the service is healthy and operating correctly

func (*Reaper) Reap

func (r *Reaper) Reap(ctx context.Context)

Reap starts the main monitoring loop. It is responsible for fetching metric measurements from the sources and storing them to the sinks. It also manages the lifecycle of the metric gatherers: when a source or metric definition changes, it starts or stops the gatherers accordingly.

func (*Reaper) ShutdownOldWorkers

func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDown map[string]bool)

func (*Reaper) WriteInstanceDown

func (r *Reaper) WriteInstanceDown(name string)

WriteInstanceDown writes instance_up = 0 metric to sinks for the given source

func (*Reaper) WriteMeasurements

func (r *Reaper) WriteMeasurements(ctx context.Context)

WriteMeasurements() writes the metrics to the sinks

type SourceReaper

SourceReaper manages metric collection for a single monitored source. Instead of one goroutine per metric it runs a single GCD-based tick loop and batches SQL queries via pgx.Batch when the source is a real Postgres connection (non-pgbouncer, non-pgpool).

type SourceReaper struct {
    reaper          *Reaper
    md              *sources.SourceConn
    lastFetch       map[string]time.Time
    lastUptimeS     int64               // last seen postmaster_uptime_s for restart detection
    degradedMetrics map[string]struct{} // metrics that failed individual retry; executed via fetchMetric until they recover
}

func NewSourceReaper

func NewSourceReaper(r *Reaper, md *sources.SourceConn) *SourceReaper

NewSourceReaper creates a SourceReaper for the given source connection.

func (*SourceReaper) CollectAndDispatch

func (sr *SourceReaper) CollectAndDispatch(ctx context.Context, rows pgx.Rows, name string, metric metrics.Metric) error

CollectAndDispatch is a helper that collects rows from a pgx.Rows and dispatches them.

func (*SourceReaper) Run

func (sr *SourceReaper) Run(ctx context.Context)

Run is the main loop for a single source. It replaces N per-metric goroutines with one goroutine that batches SQL queries at GCD-aligned ticks.

func (*SourceReaper) activeMetrics

func (sr *SourceReaper) activeMetrics() map[string]time.Duration

activeMetrics returns a snapshot copy of the currently active metric intervals based on the source's recovery state. Copying under the lock prevents data races when the caller iterates after the lock is released.

func (*SourceReaper) cacheKey

func (sr *SourceReaper) cacheKey(m metrics.Metric, name string) string

cacheKey returns the instance-level cache key for the given metric.

func (*SourceReaper) calcTickInterval

func (sr *SourceReaper) calcTickInterval() time.Duration

calcTickInterval computes GCD of all metric intervals with a minimum floor.

func (*SourceReaper) detectServerRestart

func (sr *SourceReaper) detectServerRestart(ctx context.Context, data metrics.Measurements)

detectServerRestart checks for PostgreSQL server restarts via postmaster_uptime_s in db_stats metric data and emits an object_changes measurement if detected.

func (*SourceReaper) dispatchMetricData

func (sr *SourceReaper) dispatchMetricData(ctx context.Context, name string, metric metrics.Metric, data metrics.Measurements)

dispatchMetricData handles the post-fetch workflow for a collected metric: caching, sysinfo enrichment, sending, and restart detection.

func (*SourceReaper) executeBatch

func (sr *SourceReaper) executeBatch(ctx context.Context, entries []batchEntry) error

executeBatch sends all SQLs in a single pgx.Batch round-trip, dispatching each result immediately as it arrives. If any query fails, PostgreSQL's extended protocol aborts all subsequent queries in the same sync boundary (cascade failure). Any entry that returns an error from the batch is retried individually via fetchMetric to isolate real failures from cascade failures. Entries that fail even after the individual retry are marked as degraded so that subsequent runs use fetchMetric for them until they recover.

func (*SourceReaper) fetchMetric

func (sr *SourceReaper) fetchMetric(ctx context.Context, entry batchEntry) error

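fetchMetric executes a single SQL query for the given batch entry. It returns only an error; the resulting measurements are not returned to the caller (presumably they are dispatched, as in executeBatch; verify against the implementation).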

func (*SourceReaper) fetchOSMetric

func (sr *SourceReaper) fetchOSMetric(ctx context.Context, name string) error

fetchOSMetric handles gopsutil-based OS metrics.

func (*SourceReaper) fetchSpecialMetric

func (sr *SourceReaper) fetchSpecialMetric(ctx context.Context, name, storageName string) error

fetchSpecialMetric handles change_events and instance_up metrics.

func (*SourceReaper) isRoleExcluded

func (sr *SourceReaper) isRoleExcluded(m metrics.Metric) bool

isRoleExcluded returns true if the metric should be skipped based on the source's recovery state (e.g. primary-only metric on a standby).

func (*SourceReaper) runLogParser

func (sr *SourceReaper) runLogParser(ctx context.Context) error

runLogParser launches the server log event counts parser.

func (*SourceReaper) sendEnvelope

func (sr *SourceReaper) sendEnvelope(ctx context.Context, name, storageName string, data metrics.Measurements)

sendEnvelope adds sysinfo and dispatches a MeasurementEnvelope to the measurement channel.

type batchEntry

batchEntry holds the minimum info needed to execute and dispatch a metric query.

type batchEntry struct {
    name   string
    metric metrics.Metric
    sql    string
}