MigrationsCount is the total number of migrations in admin.migration table
// MigrationsCount is the total number of migrations expected in the
// admin.migration table once the schema is fully up to date.
// NOTE(review): migrations() defines two migrations ("01110" and "01180"),
// so the previous value of 1 under-counted; keep this constant in sync
// whenever a migration is added to migrations().
const MigrationsCount = 2
Timestamps older than promCacheTTL will be ignored on the Prometheus scraper side anyway, so it is better not to emit them at all and just log a notice.
const promCacheTTL = time.Minute * time.Duration(10)
const promInstanceUpStateMetric = "instance_up"
var (
	// cacheLimit is likely the max number of buffered measurement envelopes
	// before a flush is forced (see flush/poll) — TODO confirm against poll().
	cacheLimit = 256
	// highLoadTimeout presumably bounds how long a Write may block before
	// giving up under load — confirm against Write().
	highLoadTimeout = time.Second * 5
	// targetColumns is the column list for bulk-copying measurements into
	// metric tables; mirrors MeasurementMessagePostgres fields.
	targetColumns = [...]string{"time", "dbname", "data", "tag_data"}
)
// ErrNeedsMigration signals that the sink database schema is outdated and
// must be upgraded via `pgwatch config upgrade` before metrics can be stored.
var ErrNeedsMigration = errors.New("sink database schema is outdated, please run migrations using `pgwatch config upgrade` command")
make sure *PostgresWriter implements the db.Migrator interface
var _ db.Migrator = (*PostgresWriter)(nil) // compile-time check: *PostgresWriter satisfies db.Migrator
// initMigrator builds the schema migrator for the sink database, tracking
// applied migrations in the admin.migration table and routing migrator
// notices to the writer's logger. Declared as a variable (rather than a
// plain func) so it can be substituted in tests.
// Reformatted to be gofmt-clean; previously a single line.
var initMigrator = func(pgw *PostgresWriter) (*migrator.Migrator, error) {
	return migrator.New(
		migrator.TableName("admin.migration"),
		migrator.SetNotice(func(s string) {
			log.GetLogger(pgw.ctx).Info(s)
		}),
		migrations(),
	)
}
var (
	// metricSchemaSQLs lists the bundled SQL scripts used to bootstrap the
	// metrics storage schema — presumably executed in this order (schema,
	// then functions, then partitioning helpers) during init; confirm
	// against the writer's init() before relying on ordering.
	metricSchemaSQLs = []string{
		sqlMetricAdminSchema,
		sqlMetricAdminFunctions,
		sqlMetricEnsurePartitionPostgres,
		sqlMetricEnsurePartitionTimescale,
		sqlMetricChangeChunkIntervalTimescale,
		sqlMetricChangeCompressionIntervalTimescale,
	}
)
migrations holds function returning all upgrade migrations needed
var migrations func() migrator.Option = func() migrator.Option { return migrator.Migrations( &migrator.Migration{ Name: "01110 Apply postgres sink schema migrations", Func: func(context.Context, pgx.Tx) error { return nil }, }, &migrator.Migration{ Name: "01180 Apply admin functions migrations for v5", Func: func(ctx context.Context, tx pgx.Tx) error { _, err := tx.Exec(ctx, ` DROP FUNCTION IF EXISTS admin.ensure_partition_metric_dbname_time; DROP FUNCTION IF EXISTS admin.ensure_partition_metric_time; DROP FUNCTION IF EXISTS admin.get_old_time_partitions(integer, text); DROP FUNCTION IF EXISTS admin.drop_old_time_partitions(integer, boolean, text); `) if err != nil { return err } _, err = tx.Exec(ctx, sqlMetricEnsurePartitionPostgres) if err != nil { return err } _, err = tx.Exec(ctx, sqlMetricAdminFunctions) return err }, }, ) }
// notSupportedMetrics is a membership set (struct{} values carry no data) of
// metric names this writer skips — presumably because they do not map onto
// the sink's storage/export model; confirm which writer consults it.
// Reformatted to be gofmt-clean; previously a single line.
var notSupportedMetrics = map[string]struct{}{
	"change_events":     {},
	"pgbouncer_stats":   {},
	"pgbouncer_clients": {},
	"pgpool_processes":  {},
	"pgpool_stats":      {},
}
// The following variables hold the bundled SQL script contents referenced by
// metricSchemaSQLs and migrations(). In the original source they are
// presumably populated at build time via //go:embed directives (not visible
// in this view) — TODO confirm the embedded file paths.
var sqlMetricAdminFunctions string
var sqlMetricAdminSchema string
var sqlMetricChangeChunkIntervalTimescale string
var sqlMetricChangeCompressionIntervalTimescale string
var sqlMetricEnsurePartitionPostgres string
var sqlMetricEnsurePartitionTimescale string
func LoadTLSCredentials(CAFile string) (credentials.TransportCredentials, error)
func NewPostgresSinkMigrator(ctx context.Context, connStr string) (db.Migrator, error)
func convertSyncOp(op SyncOp) pb.SyncOp
convertSyncOp converts sinks.SyncOp to pb.SyncOp
CmdOpts specifies the storage configuration to store metrics measurements
// CmdOpts specifies the storage configuration for metric measurements.
// Defaults, CLI flags and environment bindings are declared via field tags;
// the *Interval string fields must be valid PostgreSQL interval literals.
type CmdOpts struct {
	Sinks         []string      `long:"sink" mapstructure:"sink" description:"URI where metrics will be stored, can be used multiple times" env:"PW_SINK"`
	BatchingDelay time.Duration `long:"batching-delay" mapstructure:"batching-delay" description:"Sink-specific batching flush delay; may be ignored by some sinks" default:"950ms" env:"PW_BATCHING_DELAY"`
	// Partitioning / retention options apply to the PostgreSQL sink.
	PartitionInterval   string `long:"partition-interval" mapstructure:"partition-interval" description:"Time range for PostgreSQL sink time partitions. Must be a valid PostgreSQL interval." default:"1 week" env:"PW_PARTITION_INTERVAL"`
	RetentionInterval   string `long:"retention" mapstructure:"retention" description:"Delete metrics older than this. Must be a valid PostgreSQL interval." default:"14 days" env:"PW_RETENTION"`
	MaintenanceInterval string `long:"maintenance-interval" mapstructure:"maintenance-interval" description:"Run pgwatch maintenance tasks on sinks with this interval e.g., deleting old metrics; Set to zero to disable. Must be a valid PostgreSQL interval." default:"12 hours" env:"PW_MAINTENANCE_INTERVAL"`
	// Tag-key overrides for enriching stored measurements.
	RealDbnameField       string `long:"real-dbname-field" mapstructure:"real-dbname-field" description:"Tag key for real database name" env:"PW_REAL_DBNAME_FIELD" default:"real_dbname"`
	SystemIdentifierField string `long:"system-identifier-field" mapstructure:"system-identifier-field" description:"Tag key for system identifier value" env:"PW_SYSTEM_IDENTIFIER_FIELD" default:"sys_id"`
}
// DbStorageSchemaType discriminates the flavor of the sink database schema
// (see PostgresWriter.ReadMetricSchemaType).
type DbStorageSchemaType int

const (
	// DbStorageSchemaPostgres is the plain PostgreSQL schema.
	DbStorageSchemaPostgres DbStorageSchemaType = iota
	// DbStorageSchemaTimescale is the TimescaleDB-backed schema.
	DbStorageSchemaTimescale
)
// ExistingPartitionInfo records the time bounds of an already-created
// partition, so partition creation can be skipped for covered ranges.
type ExistingPartitionInfo struct {
	StartTime time.Time // lower bound of the partition's time range
	EndTime   time.Time // upper bound — presumably exclusive, per PG partition convention; confirm
}
JSONWriter is a sink that writes metric measurements to a file in JSON format. It supports compression and rotation of output files. The default rotation is based on the file size (100 MB). JSONWriter is useful for debugging and testing purposes, as well as for integration with other systems, such as log aggregators, analytics systems, data processing pipelines, ML models, etc.
type JSONWriter struct {
	ctx context.Context    // lifetime; watchCtx() presumably closes the writer on cancellation — confirm
	lw  *lumberjack.Logger // rotating file writer (size-based rotation)
	enc *jsoniter.Encoder  // JSON encoder streaming into lw
}
func NewJSONWriter(ctx context.Context, fname string) (*JSONWriter, error)
func (jw *JSONWriter) SyncMetric(_, _ string, _ SyncOp) error
func (jw *JSONWriter) Write(msg metrics.MeasurementEnvelope) error
func (jw *JSONWriter) watchCtx()
// MeasurementMessagePostgres is the flattened row shape stored by the
// Postgres sink; fields correspond to targetColumns plus the metric name.
type MeasurementMessagePostgres struct {
	Time    time.Time         // measurement timestamp ("time" column)
	DBName  string            // monitored database name ("dbname" column)
	Metric  string            // metric name — presumably selects the destination table; confirm
	Data    map[string]any    // measurement values ("data" column)
	TagData map[string]string // tag key/value pairs ("tag_data" column)
}
MetricsDefiner is an interface for passing metric definitions to a sink.
type MetricsDefiner interface {
	// DefineMetrics passes the full set of metric definitions to the sink.
	DefineMetrics(metrics *metrics.Metrics) error
}
MultiWriter ensures the simultaneous storage of data in several storages.
// MultiWriter fans every operation out to all registered writers.
type MultiWriter struct {
	writers []Writer // fan-out targets; see AddWriter
	sync.Mutex       // presumably guards writers against concurrent AddWriter/Write — confirm
}
func (mw *MultiWriter) AddWriter(w Writer)
func (mw *MultiWriter) Count() int
func (mw *MultiWriter) DefineMetrics(metrics *metrics.Metrics) (err error)
func (mw *MultiWriter) Migrate() (err error)
Migrate runs migrations on all writers that support it
func (mw *MultiWriter) NeedsMigration() (bool, error)
NeedsMigration checks if any writer needs migration
func (mw *MultiWriter) SyncMetric(sourceName, metricName string, op SyncOp) (err error)
func (mw *MultiWriter) Write(msg metrics.MeasurementEnvelope) (err error)
PostgresWriter is a sink that writes metric measurements to a Postgres database. At the moment, it supports both Postgres and TimescaleDB as a storage backend. However, one is able to use any Postgres-compatible database as a storage backend, e.g. PGEE, Citus, Greenplum, CockroachDB, etc.
type PostgresWriter struct {
	ctx                 context.Context                  // lifetime for poll loop and scheduled maintenance jobs
	sinkDb              db.PgxPoolIface                  // connection pool to the sink database
	metricSchema        DbStorageSchemaType              // Postgres vs Timescale schema, set by ReadMetricSchemaType
	opts                *CmdOpts                         // storage configuration
	retentionInterval   time.Duration                    // parsed from opts.RetentionInterval — confirm
	maintenanceInterval time.Duration                    // parsed from opts.MaintenanceInterval — confirm
	input               chan metrics.MeasurementEnvelope // buffered intake consumed by poll()
	lastError           chan error                       // carries flush errors back to Write() callers — confirm
	forceRecreatePartitions bool                         // to signal override PG metrics storage cache
	// partition caches avoid re-issuing ensure-partition DDL for covered ranges
	partitionMapMetric       map[string]ExistingPartitionInfo            // metric = min/max bounds
	partitionMapMetricDbname map[string]map[string]ExistingPartitionInfo // metric[dbname = min/max bounds]
}
func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts) (pgw *PostgresWriter, err error)
func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts) (pgw *PostgresWriter, err error)
func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error
func (pgw *PostgresWriter) DeleteOldPartitions()
DeleteOldPartitions is a background task that deletes old partitions from the measurements DB
func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error)
EnsureBuiltinMetricDummies creates empty tables for all built-in metrics if they don't exist
func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo) (err error)
func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error)
EnsureMetricDummy creates an empty table for a metric measurements if it doesn't exist
func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo) (err error)
func (pgw *PostgresWriter) MaintainUniqueSources()
MaintainUniqueSources is a background task that maintains a mapping of unique sources in each metric table in admin.all_distinct_dbname_metrics. This is used to avoid listing the same source multiple times in Grafana dropdowns.
func (pgw *PostgresWriter) Migrate() error
Migrate upgrades database with all migrations
func (pgw *PostgresWriter) NeedsMigration() (bool, error)
NeedsMigration checks if database needs migration
func (pgw *PostgresWriter) ReadMetricSchemaType() (err error)
func (pgw *PostgresWriter) SyncMetric(sourceName, metricName string, op SyncOp) error
SyncMetric ensures that tables exist for newly added metrics and/or sources
func (pgw *PostgresWriter) Write(msg metrics.MeasurementEnvelope) error
Write sends the measurements to the cache channel
func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope)
flush sends the cached measurements to the database
func (pgw *PostgresWriter) init() (err error)
func (pgw *PostgresWriter) poll()
poll is the main loop that reads from the input channel and flushes the data to the database
func (pgw *PostgresWriter) scheduleJob(interval time.Duration, job func())
// PromMetricCache caches the most recently fetched measurements per source and metric.
type PromMetricCache = map[string]map[string]metrics.MeasurementEnvelope // [dbUnique][metric]lastly_fetched_data
PrometheusWriter is a Prometheus exporter that implements the prometheus.Collector interface using the "unchecked collector" pattern (empty Describe method).
Design decisions based on Prometheus exporter guidelines (https://prometheus.io/docs/instrumenting/writing_exporters/#collectors):
Metrics are collected periodically by reaper and cached in-memory. On scrape, the collector reads a snapshot of the cache and emits fresh NewConstMetric values. The cache is NOT consumed on scrape, so parallel or back-to-back scrapes see the same data until the next Write() updates arrive.
This is an "unchecked collector": Describe() sends no descriptors, which tells the Prometheus registry to skip consistency checks. This is necessary because the set of metrics is dynamic (driven by monitored databases and their query results). Safety is ensured by deduplicating metric identities within each Collect() call.
Label keys are always sorted lexicographically before building descriptors and label value slices. This guarantees deterministic descriptor identity regardless of Go map iteration order.
type PrometheusWriter struct {
	sync.RWMutex                       // guards Cache; snapshotCache must be called under at least RLock
	logger       log.Logger            // structured logger for this writer
	ctx          context.Context       // writer lifetime
	gauges       map[string]([]string) // map of metric names to their gauge column names
	Namespace    string                // presumably prefixes exported metric names — confirm
	Cache        PromMetricCache       // [dbUnique][metric]lastly_fetched_data
	// Self-instrumentation metrics
	lastScrapeErrors    prometheus.Gauge   // error count of the most recent scrape
	totalScrapes        prometheus.Counter // scrapes served since startup
	totalScrapeFailures prometheus.Counter // scrapes that encountered errors
}
func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error)
func (promw *PrometheusWriter) AddCacheEntry(dbUnique, metric string, msgArr metrics.MeasurementEnvelope)
func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric)
Collect implements prometheus.Collector. It reads a snapshot of the metric cache and emits const metrics. Parallel scrapes see the same data until background Write() calls update it
func (promw *PrometheusWriter) DefineMetrics(metrics *metrics.Metrics) (err error)
DefineMetrics is called by reaper on startup and whenever metric definitions change
func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc)
Describe is intentionally empty to make PrometheusWriter an "unchecked collector" per the prometheus.Collector contract
func (promw *PrometheusWriter) InitCacheEntry(dbUnique string)
func (promw *PrometheusWriter) Println(v ...any)
Println implements promhttp.Logger
func (promw *PrometheusWriter) PurgeCacheEntry(dbUnique, metric string)
func (promw *PrometheusWriter) SyncMetric(sourceName, metricName string, op SyncOp) error
SyncMetric is called by reaper when a metric or monitored source is removed or added, allowing the writer to purge or initialize cache entries as needed
func (promw *PrometheusWriter) Write(msg metrics.MeasurementEnvelope) error
Write is called by reaper whenever new measurement data arrives
func (promw *PrometheusWriter) WritePromMetrics(msg metrics.MeasurementEnvelope, ch chan<- prometheus.Metric) (written int, errorCount int)
WritePromMetrics converts a MeasurementEnvelope into Prometheus const metrics and sends them directly to ch. Returns the count of metrics written and errors encountered.
func (promw *PrometheusWriter) snapshotCache() PromMetricCache
snapshotCache creates a shallow copy of the cache map hierarchy. Must be called under at least promw.RLock(). The MeasurementEnvelope values are not deep-copied because writers always replace entire envelopes (never mutate them in place).
RPCWriter sends metric measurements to a remote server using gRPC. Remote servers should make use of the .proto file under api/pb/ to integrate with it. It's up to the implementer to define the behavior of the server. It can be a simple logger, external storage, alerting system, or an analytics system.
type RPCWriter struct {
	ctx    context.Context   // lifetime; watchCtx() presumably tears the connection down on cancel — confirm
	conn   *grpc.ClientConn  // underlying gRPC connection
	client pb.ReceiverClient // generated client stub for the Receiver service
}
func NewRPCWriter(ctx context.Context, connStr string) (*RPCWriter, error)
func (rw *RPCWriter) DefineMetrics(metrics *metrics.Metrics) error
DefineMetrics sends metric definitions to the remote server
func (rw *RPCWriter) Ping() error
func (rw *RPCWriter) SyncMetric(sourceName, metricName string, op SyncOp) error
SyncMetric synchronizes a metric and monitored source with the remote server
func (rw *RPCWriter) Write(msg metrics.MeasurementEnvelope) error
Write sends the measurement envelope to the remote RPC sink.
func (rw *RPCWriter) watchCtx()
SyncOp represents synchronization operations for metrics. These constants are used both in Go code and protobuf definitions.
// SyncOp is int32 so its values map one-to-one onto the protobuf enum
// (see convertSyncOp and pb.SyncOp).
type SyncOp int32

const (
	// InvalidOp represents an invalid or unrecognized operation
	InvalidOp SyncOp = iota // 0
	// AddOp represents adding a new metric
	AddOp // 1
	// DeleteOp represents deleting an existing metric or entire source
	DeleteOp // 2
	// DefineOp represents defining metric definitions
	DefineOp // 3
)
func (s SyncOp) String() string
String returns the string representation of the SyncOp
Writer is an interface that writes metrics values
// Writer is an interface that writes metrics values.
type Writer interface {
	// SyncMetric notifies the sink that a source/metric pairing changed (see SyncOp).
	SyncMetric(sourceName, metricName string, op SyncOp) error
	// Write stores or forwards a single measurement envelope.
	Write(msgs metrics.MeasurementEnvelope) error
}
func NewSinkWriter(ctx context.Context, opts *CmdOpts) (w Writer, err error)
NewSinkWriter creates and returns new instance of MultiWriter struct.
// copyFromMeasurements iterates the measurements of a batch of envelopes for
// bulk insertion; its Next/Values/Err methods match the shape of the
// pgx.CopyFromSource interface — confirm it is passed to CopyFrom.
type copyFromMeasurements struct {
	envelopes      []metrics.MeasurementEnvelope // batch being copied
	envelopeIdx    int                           // index of the envelope currently being read
	measurementIdx int                           // index of the current measurement in the envelope
	metricName     string                        // presumably the metric of the current run, used by MetricName/NextEnvelope — confirm
}
func newCopyFromMeasurements(rows []metrics.MeasurementEnvelope) *copyFromMeasurements
func (c *copyFromMeasurements) EOF() bool
func (c *copyFromMeasurements) Err() error
func (c *copyFromMeasurements) MetricName() (ident pgx.Identifier)
func (c *copyFromMeasurements) Next() bool
func (c *copyFromMeasurements) NextEnvelope() bool
func (c *copyFromMeasurements) Values() ([]any, error)