// Names of metrics that can be gathered directly from the local OS via
// gopsutil instead of an SQL query (see directlyFetchableOSMetrics).
const (
metricCPULoad = "cpu_load"
metricPsutilCPU = "psutil_cpu"
metricPsutilDisk = "psutil_disk"
metricPsutilDiskIoTotal = "psutil_disk_io_total"
metricPsutilMem = "psutil_mem"
)
// Names of metrics that are not fetched with a plain metric SQL query but
// get dedicated handling (see specialMetrics, GetObjectChangesMeasurement,
// LogParser and GetInstanceUpMeasurement).
const (
specialMetricChangeEvents = "change_events"
specialMetricServerLogEventCounts = "server_log_event_counts"
specialMetricInstanceUp = "instance_up"
)
// csvLogDefaultGlobSuffix is the default filename glob used to discover CSV log files.
const csvLogDefaultGlobSuffix = "*.csv"

// csvLogDefaultRegEx parses the fixed leading columns of a PostgreSQL csvlog
// line up to and including error_severity; quotes around string columns are
// optional. NOTE: the original pattern began with a duplicated start anchor
// ("^^"); the second "^" was a redundant zero-width assertion and has been
// removed — matching behavior is unchanged.
const csvLogDefaultRegEx = `^(?P<log_time>.*?),"?(?P<user_name>.*?)"?,"?(?P<database_name>.*?)"?,(?P<process_id>\d+),"?(?P<connection_from>.*?)"?,(?P<session_id>.*?),(?P<session_line_num>\d+),"?(?P<command_tag>.*?)"?,(?P<session_start_time>.*?),(?P<virtual_transaction_id>.*?),(?P<transaction_id>.*?),(?P<error_severity>\w+),`

const dbMetricJoinStr = "¤¤¤" // just some unlikely string for a DB name to avoid using maps of maps for DB+metric data

// maxChunkSize limits how much of a log file is read in one pass.
const maxChunkSize uint64 = 10 * 1024 * 1024 // 10 MB

// maxTrackedFiles caps the number of log file offsets kept in memory.
const maxTrackedFiles = 2500
// sqlPgDirs returns (name, path) pairs for the server's important directories:
// data_directory, pg_wal, log_directory (resolved to an absolute path when
// the setting is relative; the regex also accepts Windows drive prefixes),
// plus the location of every tablespace whose name does not match 'pg_.+'.
const sqlPgDirs = `select name, path from
(values
('data_directory', current_setting('data_directory')),
('pg_wal', current_setting('data_directory')||'/pg_wal'),
('log_directory', case
when current_setting('log_directory') ~ '^(\w:)?\/.+' then current_setting('log_directory')
else current_setting('data_directory') || '/' || current_setting('log_directory')
end)) as d(name, path)
union all
select spcname::text, pg_catalog.pg_tablespace_location(oid)
from pg_catalog.pg_tablespace where spcname !~ 'pg_.+'`
// directlyFetchableOSMetrics lists the metrics served from the local OS via
// gopsutil rather than over SQL (checked by IsDirectlyFetchableMetric).
var directlyFetchableOSMetrics = []string{metricPsutilCPU, metricPsutilDisk, metricPsutilDiskIoTotal, metricPsutilMem, metricCPULoad}

// hostLastKnownStatusInRecovery remembers, per host, whether it was last seen
// in recovery (standby) mode.
var hostLastKnownStatusInRecovery = make(map[string]bool) // isInRecovery

// lastSQLFetchError holds the most recent SQL fetch error; presumably keyed by
// metric or host identifier — TODO confirm against the writers of this map.
var lastSQLFetchError sync.Map

// metricDefs is the shared, concurrency-safe set of metric definitions.
var metricDefs = NewConcurrentMetricDefs()

var metricsConfig metrics.MetricIntervals // set to host.Metrics, or host.MetricsStandby when the optional standby config is defined and the host is in recovery
Constants and types:
// pgSeverities enumerates the PostgreSQL log severity levels in English.
var pgSeverities = [...]string{"DEBUG", "INFO", "NOTICE", "WARNING", "ERROR", "LOG", "FATAL", "PANIC"}

// pgSeveritiesLocale maps localized severity strings back to their canonical
// English form, keyed by server message language (see severityToEnglish).
var pgSeveritiesLocale = map[string]map[string]string{ "C.": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "WARNING": "WARNING", "ERROR": "ERROR", "FATAL": "FATAL", "PANIC": "PANIC"}, "de": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "HINWEIS": "NOTICE", "WARNUNG": "WARNING", "FEHLER": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"}, "fr": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTICE": "NOTICE", "ATTENTION": "WARNING", "ERREUR": "ERROR", "FATAL": "FATAL", "PANIK": "PANIC"}, "it": {"DEBUG": "DEBUG", "LOG": "LOG", "INFO": "INFO", "NOTIFICA": "NOTICE", "ATTENZIONE": "WARNING", "ERRORE": "ERROR", "FATALE": "FATAL", "PANICO": "PANIC"}, "ko": {"디버그": "DEBUG", "로그": "LOG", "정보": "INFO", "알림": "NOTICE", "경고": "WARNING", "오류": "ERROR", "치명적오류": "FATAL", "손상": "PANIC"}, "pl": {"DEBUG": "DEBUG", "DZIENNIK": "LOG", "INFORMACJA": "INFO", "UWAGA": "NOTICE", "OSTRZEŻENIE": "WARNING", "BŁĄD": "ERROR", "KATASTROFALNY": "FATAL", "PANIKA": "PANIC"}, "ru": {"ОТЛАДКА": "DEBUG", "СООБЩЕНИЕ": "LOG", "ИНФОРМАЦИЯ": "INFO", "ЗАМЕЧАНИЕ": "NOTICE", "ПРЕДУПРЕЖДЕНИЕ": "WARNING", "ОШИБКА": "ERROR", "ВАЖНО": "FATAL", "ПАНИКА": "PANIC"}, "sv": {"DEBUG": "DEBUG", "LOGG": "LOG", "INFO": "INFO", "NOTIS": "NOTICE", "VARNING": "WARNING", "FEL": "ERROR", "FATALT": "FATAL", "PANIK": "PANIC"}, "tr": {"DEBUG": "DEBUG", "LOG": "LOG", "BİLGİ": "INFO", "NOT": "NOTICE", "UYARI": "WARNING", "HATA": "ERROR", "ÖLÜMCÜL (FATAL)": "FATAL", "KRİTİK": "PANIC"}, "zh": {"调试": "DEBUG", "日志": "LOG", "信息": "INFO", "注意": "NOTICE", "警告": "WARNING", "错误": "ERROR", "致命错误": "FATAL", "比致命错误还过分的错误": "PANIC"}, }

// prevCPULoadTimeStats caches the CPU times from the previous probe; presumably
// guarded by prevCPULoadTimeStatsLock — confirm against GetGoPsutilCPU.
var prevCPULoadTimeStats cpu.TimesStat
"cache" of the last CPU utilization stats, so that GetGoPsutilCPU can compute more exact results without having to sleep
// prevCPULoadTimeStatsLock guards prevCPULoadTimeStats and prevCPULoadTimestamp.
var prevCPULoadTimeStatsLock sync.RWMutex

// prevCPULoadTimestamp records when prevCPULoadTimeStats was last refreshed.
var prevCPULoadTimestamp time.Time

// specialMetrics flags metric names that receive dedicated handling instead of
// a regular metric SQL query.
var specialMetrics = map[string]bool{specialMetricChangeEvents: true, specialMetricServerLogEventCounts: true}
func CheckFolderExistsAndReadable(path string) bool
func DoesEmergencyTriggerfileExist(fname string) bool
func GetGoPsutilCPU(interval time.Duration) (metrics.Measurements, error)
GetGoPsutilCPU simulates "psutil" metric output, using the result of the previous call as its input.
func GetGoPsutilDiskPG(pgDirs metrics.Measurements) (metrics.Measurements, error)
func GetGoPsutilDiskTotals() (metrics.Measurements, error)
func GetGoPsutilMem() (metrics.Measurements, error)
func GetLoadAvgLocal() (metrics.Measurements, error)
func GetPathUnderlyingDeviceID(path string) (uint64, error)
func IsDirectlyFetchableMetric(md *sources.SourceConn, metric string) bool
func QueryMeasurements(ctx context.Context, md *sources.SourceConn, sql string, args ...any) (metrics.Measurements, error)
func checkHasLocalPrivileges(logsDirPath string) error
func checkHasRemotePrivileges(ctx context.Context, mdb *sources.SourceConn, logsDirPath string) error
func cpuTotal(c cpu.TimesStat) float64
cpuTotal returns the total number of seconds across all CPU states. Guest and GuestNice are intentionally excluded because on Linux they are already counted within User and Nice respectively (/proc/stat semantics), so including them would double-count and skew percentage calculations.
func getFileWithLatestTimestamp(files []string) (string, error)
func getFileWithNextModTimestamp(logsGlobPath, currentFile string) (string, error)
func goPsutilCalcCPUUtilization(probe0, probe1 cpu.TimesStat) float64
func init()
func severityToEnglish(serverLang, errorSeverity string) string
func zeroEventCounts(eventCounts map[string]int64)
// ChangeDetectionResults holds per-target counts of detected DDL object
// changes (produced by the Detect*Changes methods on Reaper).
type ChangeDetectionResults struct {
Target string // what kind of objects the counts refer to
Created int // objects newly detected since the previous check
Altered int // objects whose definition changed
Dropped int // objects no longer present
}
func (cdr *ChangeDetectionResults) String() string
func (cdr *ChangeDetectionResults) Total() int
// ConcurrentMetricDefs wraps metrics.Metrics with an RWMutex so that metric
// definitions can be read concurrently and swapped atomically (see Assign).
type ConcurrentMetricDefs struct {
*metrics.Metrics // the wrapped definitions; access only under the lock
sync.RWMutex
}
func NewConcurrentMetricDefs() *ConcurrentMetricDefs
func (cmd *ConcurrentMetricDefs) Assign(newDefs *metrics.Metrics)
func (cmd *ConcurrentMetricDefs) GetMetricDef(name string) (m metrics.Metric, ok bool)
func (cmd *ConcurrentMetricDefs) GetPresetDef(name string) (m metrics.Preset, ok bool)
func (cmd *ConcurrentMetricDefs) GetPresetMetrics(name string) (m metrics.MetricIntervals)
// ExistingPartitionInfo describes the time range covered by an existing
// partition.
type ExistingPartitionInfo struct {
StartTime time.Time // inclusive lower bound — TODO confirm inclusivity at call sites
EndTime time.Time
}
// InstanceMetricCache is a concurrency-safe cache of the most recently fetched
// measurements per DB+metric combination (see Get / Put).
type InstanceMetricCache struct {
cache map[string](metrics.Measurements) // [dbUnique+metric]lastly_fetched_data
sync.RWMutex // guards cache
}
func NewInstanceMetricCache() *InstanceMetricCache
func (imc *InstanceMetricCache) Get(key string, age time.Duration) metrics.Measurements
func (imc *InstanceMetricCache) Put(key string, data metrics.Measurements)
// LogConfig captures the server-log-related settings detected for a monitored
// source (see tryDetermineLogSettings).
type LogConfig struct {
CollectorEnabled bool // presumably mirrors logging_collector — confirm in tryDetermineLogSettings
CSVDestination bool // whether logs are written in csvlog format — TODO confirm
TruncateOnRotation bool // presumably mirrors log_truncate_on_rotation — confirm
Directory string // log directory path
ServerMessagesLang string // language of server messages, used to map severities back to English
}
func tryDetermineLogSettings(ctx context.Context, conn db.PgxIface) (cfg *LogConfig, err error)
// LogParser tails and parses the server logs of a single monitored source,
// aggregating event counts by severity and emitting them to StoreCh.
type LogParser struct {
*LogConfig // detected log settings for the source
ctx context.Context // controls the parser's lifetime
LogsMatchRegex *regexp.Regexp // regex for splitting a log line into named columns
SourceConn *sources.SourceConn // the monitored source whose logs are parsed
Interval time.Duration // how often aggregated counts are sent
StoreCh chan<- metrics.MeasurementEnvelope // sink for the produced measurements
eventCounts map[string]int64 // for the specific DB, e.g. [WARNING: 34, ERROR: 10, ...]; zeroed on storage send
eventCountsTotal map[string]int64 // same counts for the whole instance
lastSendTime time.Time // when counts were last pushed to StoreCh
fileOffsets map[string]uint64 // map of log file paths to last read offsets
}
func NewLogParser(ctx context.Context, mdb *sources.SourceConn, storeCh chan<- metrics.MeasurementEnvelope) (lp *LogParser, err error)
func (lp *LogParser) GetMeasurementEnvelope() metrics.MeasurementEnvelope
GetMeasurementEnvelope converts current event counts to a MeasurementEnvelope
func (lp *LogParser) HasSendIntervalElapsed() bool
func (lp *LogParser) ParseLogs() error
func (lp *LogParser) parseLogsLocal() error
func (lp *LogParser) parseLogsRemote() error
func (lp *LogParser) regexMatchesToMap(matches []string) map[string]string
Reaper is the struct responsible for fetching metric measurements from the sources and storing them to the sinks
type Reaper struct {
*cmdopts.Options // embedded global configuration/CLI options
ready atomic.Bool // true while the service is healthy and operating (see Ready)
measurementCh chan metrics.MeasurementEnvelope // fetched measurements on their way to the sinks
measurementCache *InstanceMetricCache // last fetched data per DB+metric
logger log.Logger
monitoredSources sources.SourceConns // sources currently being monitored
prevLoopMonitoredDBs sources.SourceConns // snapshot from the previous loop; presumably used to detect added/removed sources — confirm in Reap
cancelFuncs map[string]context.CancelFunc // per-gatherer cancel functions, used when shutting workers down
}
func NewReaper(ctx context.Context, opts *cmdopts.Options) (r *Reaper)
NewReaper creates a new Reaper instance
func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, md *sources.SourceConn)
func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(hostsToShutDown map[string]bool)
func (r *Reaper) CreateSourceHelpers(ctx context.Context, srcL log.Logger, monitoredSource *sources.SourceConn)
CreateSourceHelpers creates the extensions and metric helpers for the monitored source
func (r *Reaper) DetectConfigurationChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults
func (r *Reaper) DetectIndexChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults
func (r *Reaper) DetectPrivilegeChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults
func (r *Reaper) DetectSprocChanges(ctx context.Context, md *sources.SourceConn) (changeCounts ChangeDetectionResults)
func (r *Reaper) DetectTableChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults
func (r *Reaper) FetchMetric(ctx context.Context, md *sources.SourceConn, metricName string) (_ *metrics.MeasurementEnvelope, err error)
func (r *Reaper) FetchStatsDirectlyFromOS(ctx context.Context, md *sources.SourceConn, metricName string) (*metrics.MeasurementEnvelope, error)
func (r *Reaper) GetInstanceUpMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error)
GetInstanceUpMeasurement returns a single measurement with "instance_up" metric used to detect if the instance is up or down
func (r *Reaper) GetObjectChangesMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error)
func (r *Reaper) LoadMetrics() (err error)
LoadMetrics loads metric definitions from the reader
func (r *Reaper) LoadSources(ctx context.Context) (err error)
LoadSources loads sources from the reader
func (r *Reaper) PrintMemStats()
func (r *Reaper) Ready() bool
Ready() returns true if the service is healthy and operating correctly
func (r *Reaper) Reap(ctx context.Context)
Reap() starts the main monitoring loop. It is responsible for fetching metrics measurements from the sources and storing them to the sinks. It also manages the lifecycle of the metric gatherers. In case of a source or metric definition change, it will start or stop the gatherers accordingly.
func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDown map[string]bool)
func (r *Reaper) WriteInstanceDown(md *sources.SourceConn)
WriteInstanceDown writes instance_up = 0 metric to sinks for the given source
func (r *Reaper) WriteMeasurements(ctx context.Context)
WriteMeasurements() writes the metrics to the sinks
func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceConn, metricName string)