// Column-name conventions used when reading metric query results.
const (
	// epochColumnName is the column (epoch in nanoseconds) expected in every metric query.
	epochColumnName string = "epoch_ns"
	// tagPrefix is the literal prefix "tag_".
	// NOTE(review): presumably it marks result columns that are stored as tags — confirm at the use site.
	tagPrefix string = "tag_"
)
// promInstanceUpStateMetric is the metric name "instance_up".
// NOTE(review): presumably the metric emitted by setInstanceUpDownState — confirm against its body.
const promInstanceUpStateMetric = "instance_up"
Timestamps older than this limit will be ignored on the Prometheus scraper side anyway, so it is better not to emit them at all and just log a notice.
const promScrapingStalenessHardDropLimit = 10 * time.Minute // hard staleness cut-off of ten minutes
// specialMetricPgbouncer is a regular-expression source that matches exactly the
// metric names "pgbouncer_stats" and "pgbouncer_pools" (anchored at both ends).
const specialMetricPgbouncer = "^pgbouncer_(stats|pools)$"
// Package-level tunables. They are variables rather than constants, so they can be
// reassigned at run time.
var (
	// cacheLimit is 512.
	// NOTE(review): presumably the maximum number of cached measurement batches — confirm at the use site.
	cacheLimit = 512
	// highLoadTimeout is five seconds.
	// NOTE(review): presumably how long a write may wait under load — confirm at the use site.
	highLoadTimeout = 5 * time.Second
	// deleterDelay is one hour.
	// NOTE(review): presumably the delay handed to deleteOldPartitions — confirm at the call site.
	deleterDelay = time.Hour
)
var ( regexIsPgbouncerMetrics = regexp.MustCompile(specialMetricPgbouncer) forceRecreatePGMetricPartitions = false // to signal override PG metrics storage cache partitionMapMetric = make(map[string]ExistingPartitionInfo) // metric = min/max bounds partitionMapMetricDbname = make(map[string]map[string]ExistingPartitionInfo) // metric[dbname = min/max bounds] )
var (
	// metricSchemaSQLs collects the six schema SQL strings in a fixed order:
	// admin schema, admin functions, the Postgres and Timescale partition helpers,
	// then the Timescale chunk-interval and compression-interval helpers.
	// NOTE(review): presumably executed in this order when the sink schema is set up — confirm.
	metricSchemaSQLs = []string{
		sqlMetricAdminSchema,
		sqlMetricAdminFunctions,
		sqlMetricEnsurePartitionPostgres,
		sqlMetricEnsurePartitionTimescale,
		sqlMetricChangeChunkIntervalTimescale,
		sqlMetricChangeCompressionIntervalTimescale,
	}
)
Async Prom cache
var (
	// promAsyncMetricCache holds the most recently fetched data, keyed as [dbUnique][metric].
	promAsyncMetricCache = make(map[string]map[string][]metrics.MeasurementEnvelope)
	// promAsyncMetricCacheLock is a read/write mutex; its zero value is ready to use.
	// NOTE(review): presumably guards promAsyncMetricCache — confirm at the use sites.
	promAsyncMetricCacheLock sync.RWMutex
)
// SQL sources referenced by metricSchemaSQLs. They are declared without an initial
// value here.
// NOTE(review): presumably populated elsewhere (e.g. embedded files) — confirm.
var (
	sqlMetricAdminFunctions                     string
	sqlMetricAdminSchema                        string
	sqlMetricChangeChunkIntervalTimescale       string
	sqlMetricChangeCompressionIntervalTimescale string
	sqlMetricEnsurePartitionPostgres            string
	sqlMetricEnsurePartitionTimescale           string
)
CmdOpts specifies the storage configuration to store metrics measurements
type CmdOpts struct {
	// Sinks lists the URIs where metrics will be stored; the option may be given multiple times.
	Sinks []string `long:"sink" mapstructure:"sink" description:"URI where metrics will be stored, can be used multiple times" env:"PW_SINK"`
	// BatchingDelay is the maximum time to wait for a batched metrics flush (tag default "250ms").
	// NOTE(review): the description tag says "milliseconds" although the field is a time.Duration.
	BatchingDelay time.Duration `long:"batching-delay" mapstructure:"batching-delay" description:"Max milliseconds to wait for a batched metrics flush. [Default: 250ms]" default:"250ms" env:"PW_BATCHING_MAX_DELAY"`
	// Retention: if set, metrics older than this are deleted (tag default "14").
	// NOTE(review): the unit is presumably days — confirm against DropOldTimePartitions.
	Retention int `long:"retention" mapstructure:"retention" description:"If set, metrics older than that will be deleted" default:"14" env:"PW_RETENTION"`
	// RealDbnameField is the tag key for the real database name (tag default "real_dbname").
	RealDbnameField string `long:"real-dbname-field" mapstructure:"real-dbname-field" description:"Tag key for real database name" env:"PW_REAL_DBNAME_FIELD" default:"real_dbname"`
	// SystemIdentifierField is the tag key for the system identifier value (tag default "sys_id").
	SystemIdentifierField string `long:"system-identifier-field" mapstructure:"system-identifier-field" description:"Tag key for system identifier value" env:"PW_SYSTEM_IDENTIFIER_FIELD" default:"sys_id"`
}
// DbStorageSchemaType identifies the kind of metric storage schema in the sink database.
type DbStorageSchemaType int

const (
	// DbStorageSchemaPostgres is the first value (0).
	DbStorageSchemaPostgres DbStorageSchemaType = iota
	// DbStorageSchemaTimescale is the second value (1).
	DbStorageSchemaTimescale
)
// ExistingPartitionInfo holds a pair of timestamps. It is used as the value type of
// the partition maps, which describe it as min/max bounds.
type ExistingPartitionInfo struct {
	StartTime time.Time
	EndTime   time.Time
}
JSONWriter is a sink that writes metric measurements to a file in JSON format. It supports compression and rotation of output files. The default rotation is based on the file size (100 MB). JSONWriter is useful for debugging and testing purposes, as well as for integration with other systems, such as log aggregators, analytics systems, data processing pipelines, ML models, etc.
type JSONWriter struct {
	// ctx is the context supplied to NewJSONWriter.
	// NOTE(review): presumably observed by watchCtx — confirm.
	ctx context.Context
	// lw is the lumberjack logger.
	// NOTE(review): presumably the rotating output file the measurements are written to — confirm in Write.
	lw *lumberjack.Logger
}
func NewJSONWriter(ctx context.Context, fname string) (*JSONWriter, error)
func (jw *JSONWriter) SyncMetric(_, _, _ string) error
func (jw *JSONWriter) Write(msgs []metrics.MeasurementEnvelope) error
func (jw *JSONWriter) watchCtx()
// MeasurementMessagePostgres carries one measurement: a timestamp, a database name,
// a metric name and two key/value maps.
// NOTE(review): presumably the row shape written by the Postgres sink, with tag
// columns split out into TagData — confirm in flush.
type MeasurementMessagePostgres struct {
	Time    time.Time
	DBName  string
	Metric  string
	Data    map[string]any
	TagData map[string]any
}
MultiWriter ensures the simultaneous storage of data in several storages.
type MultiWriter struct {
	// writers holds the registered sinks.
	// NOTE(review): presumably appended to by AddWriter — confirm.
	writers []Writer
	// The mutex is embedded, so Lock and Unlock are promoted onto MultiWriter and a
	// MultiWriter value must not be copied after first use.
	sync.Mutex
}
func NewMultiWriter(ctx context.Context, opts *CmdOpts, metricDefs *metrics.Metrics) (mw *MultiWriter, err error)
NewMultiWriter creates and returns a new instance of the MultiWriter struct.
func (mw *MultiWriter) AddWriter(w Writer)
func (mw *MultiWriter) SyncMetrics(dbUnique, metricName, op string) (err error)
func (mw *MultiWriter) WriteMeasurements(ctx context.Context, storageCh <-chan []metrics.MeasurementEnvelope)
PostgresWriter is a sink that writes metric measurements to a Postgres database. At the moment, it supports both Postgres and TimescaleDB as a storage backend. However, one is able to use any Postgres-compatible database as a storage backend, e.g. PGEE, Citus, Greenplum, CockroachDB, etc.
type PostgresWriter struct {
	// ctx is the context supplied to the constructor.
	ctx context.Context
	// sinkDb is the connection pool of the sink database.
	sinkDb db.PgxPoolIface
	// metricSchema is the storage schema kind (Postgres or Timescale).
	// NOTE(review): presumably set by ReadMetricSchemaType — confirm.
	metricSchema DbStorageSchemaType
	// metricDefs are the metric definitions supplied to the constructor.
	metricDefs *metrics.Metrics
	// opts is the sink configuration supplied to the constructor.
	opts *CmdOpts
	// input carries batches of measurement envelopes.
	// NOTE(review): presumably fed by Write and drained by poll — confirm.
	input chan []metrics.MeasurementEnvelope
	// lastError carries errors.
	// NOTE(review): presumably reports flush failures back to Write — confirm.
	lastError chan error
}
func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error)
func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error)
func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error
func (pgw *PostgresWriter) DropOldTimePartitions(metricAgeDaysThreshold int) (res int, err error)
func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error)
EnsureBuiltinMetricDummies creates empty tables for all built-in metrics if they don't exist
func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error)
func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error)
EnsureMetricDummy creates an empty table for a metric's measurements if it doesn't exist
func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPartitionInfo, force bool) error
EnsureMetricTime creates special partitions if Timescale is used for realtime metrics
func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error)
func (pgw *PostgresWriter) GetOldTimePartitions(metricAgeDaysThreshold int) ([]string, error)
func (pgw *PostgresWriter) ReadMetricSchemaType() (err error)
func (pgw *PostgresWriter) SyncMetric(dbUnique, metricName, op string) error
SyncMetric ensures that tables exist for newly added metrics and/or sources
func (pgw *PostgresWriter) Write(msgs []metrics.MeasurementEnvelope) error
Write sends the measurements to the cache channel
func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration)
deleteOldPartitions is a background task that deletes old partitions from the measurements DB
func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope)
flush sends the cached measurements to the database
func (pgw *PostgresWriter) maintainUniqueSources()
maintainUniqueSources is a background task that maintains a listing of unique sources for each metric. This is used to avoid listing the same source multiple times in Grafana dropdowns.
func (pgw *PostgresWriter) poll()
poll is the main loop that reads from the input channel and flushes the data to the database
PrometheusWriter is a sink that exposes metric measurements to the Prometheus scraper. Prometheus collects metrics data from pgwatch by scraping metrics HTTP endpoints.
type PrometheusWriter struct {
	// ctx is the context supplied to NewPrometheusWriter.
	ctx context.Context
	// lastScrapeErrors is a gauge.
	// NOTE(review): presumably the error count of the most recent scrape — confirm in Collect.
	lastScrapeErrors prometheus.Gauge
	// totalScrapes and totalScrapeFailures are counters.
	// NOTE(review): presumably incremented per scrape and per failed scrape — confirm in Collect.
	totalScrapes, totalScrapeFailures prometheus.Counter
	// PrometheusNamespace is the only exported field.
	// NOTE(review): presumably the namespace prefix for exposed metric names — confirm.
	PrometheusNamespace string
}
func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error)
func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric)
func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc)
func (promw *PrometheusWriter) MetricStoreMessageToPromMetrics(msg metrics.MeasurementEnvelope) []prometheus.Metric
func (promw *PrometheusWriter) PromAsyncCacheAddMetricData(dbUnique, metric string, msgArr []metrics.MeasurementEnvelope)
func (promw *PrometheusWriter) PromAsyncCacheInitIfRequired(dbUnique, _ string)
func (promw *PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metric string)
func (promw *PrometheusWriter) SyncMetric(dbUnique, metricName, op string) error
func (promw *PrometheusWriter) Write(msgs []metrics.MeasurementEnvelope) error
func (promw *PrometheusWriter) setInstanceUpDownState(ch chan<- prometheus.Metric, dbName string)
RPCWriter is a sink that sends metric measurements to a remote server using the RPC protocol. Remote server should implement the Receiver interface. It's up to the implementer to define the behavior of the server. It can be a simple logger, external storage, alerting system, or an analytics system.
type RPCWriter struct {
	// ctx is the context supplied to NewRPCWriter.
	// NOTE(review): presumably observed by watchCtx — confirm.
	ctx context.Context
	// address is the remote server address supplied to NewRPCWriter.
	address string
	// client is the RPC client.
	// NOTE(review): presumably connected to address — confirm in NewRPCWriter.
	client *rpc.Client
}
func NewRPCWriter(ctx context.Context, address string) (*RPCWriter, error)
func (rw *RPCWriter) SyncMetric(dbUnique string, metricName string, op string) error
func (rw *RPCWriter) Write(msgs []metrics.MeasurementEnvelope) error
Write sends the measurement messages to the RPC sink
func (rw *RPCWriter) watchCtx()
// SyncReq bundles a database name, a metric name and an operation; the fields mirror
// the three string parameters of SyncMetric.
// NOTE(review): presumably the request payload RPCWriter.SyncMetric sends to the
// remote server — confirm.
type SyncReq struct {
	DbName     string
	MetricName string
	Operation  string
}
Writer is an interface for sinks that write metric values
type Writer interface {
	// SyncMetric receives a source name (dbUnique), a metric name and an operation
	// and returns an error on failure.
	SyncMetric(dbUnique, metricName, op string) error
	// Write receives a batch of measurement envelopes and returns an error on failure.
	Write(msgs []metrics.MeasurementEnvelope) error
}