const promInstanceUpStateMetric = "instance_up"
Timestamps older than this limit would be ignored on the Prometheus scraper side anyway, so it's better not to emit them at all and just log a notice
const promScrapingStalenessHardDropLimit = time.Minute * time.Duration(10)
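A minimal sketch of how such a cutoff could be applied before emitting, assuming measurement timestamps are available as time.Time values (the helper name is hypothetical):

func isFreshEnough(ts, now time.Time) bool {
	// Samples older than the hard drop limit are not worth emitting;
	// the caller should log a notice and skip them instead.
	return now.Sub(ts) <= promScrapingStalenessHardDropLimit
}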
var (
	cacheLimit      = 256
	highLoadTimeout = time.Second * 5
	deleterDelay    = time.Hour
	targetColumns   = [...]string{"time", "dbname", "data", "tag_data"}
)
var (
	forceRecreatePartitions  = false                                             // to signal an override of the PG metrics storage cache
	partitionMapMetric       = make(map[string]ExistingPartitionInfo)            // metric = min/max bounds
	partitionMapMetricDbname = make(map[string]map[string]ExistingPartitionInfo) // metric[dbname] = min/max bounds
)
var (
	metricSchemaSQLs = []string{
		sqlMetricAdminSchema,
		sqlMetricAdminFunctions,
		sqlMetricEnsurePartitionPostgres,
		sqlMetricEnsurePartitionTimescale,
		sqlMetricChangeChunkIntervalTimescale,
		sqlMetricChangeCompressionIntervalTimescale,
	}
)
Async Prom cache
var promAsyncMetricCache = make(map[string]map[string]metrics.MeasurementEnvelope) // [dbUnique][metric]lastly_fetched_data
var promAsyncMetricCacheLock = sync.RWMutex{}
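A sketch of a lock-guarded read of this cache (the helper is hypothetical; write paths such as PromAsyncCacheAddMetricData would take the exclusive lock via Lock/Unlock instead):

func promAsyncCacheGet(dbUnique, metric string) (metrics.MeasurementEnvelope, bool) {
	promAsyncMetricCacheLock.RLock()
	defer promAsyncMetricCacheLock.RUnlock()
	byMetric, ok := promAsyncMetricCache[dbUnique]
	if !ok {
		return metrics.MeasurementEnvelope{}, false
	}
	env, ok := byMetric[metric]
	return env, ok
}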
var sqlMetricAdminFunctions string
var sqlMetricAdminSchema string
var sqlMetricChangeChunkIntervalTimescale string
var sqlMetricChangeCompressionIntervalTimescale string
var sqlMetricEnsurePartitionPostgres string
var sqlMetricEnsurePartitionTimescale string
func connectViaTLS(address, RootCA string) (*rpc.Client, error)
CmdOpts specifies the storage configuration for metric measurements
type CmdOpts struct {
	Sinks                 []string      `long:"sink" mapstructure:"sink" description:"URI where metrics will be stored, can be used multiple times" env:"PW_SINK"`
	BatchingDelay         time.Duration `long:"batching-delay" mapstructure:"batching-delay" description:"Sink-specific batching flush delay; may be ignored by some sinks" default:"950ms" env:"PW_BATCHING_DELAY"`
	Retention             int           `long:"retention" mapstructure:"retention" description:"If set, metrics older than that will be deleted" default:"14" env:"PW_RETENTION"`
	RealDbnameField       string        `long:"real-dbname-field" mapstructure:"real-dbname-field" description:"Tag key for real database name" env:"PW_REAL_DBNAME_FIELD" default:"real_dbname"`
	SystemIdentifierField string        `long:"system-identifier-field" mapstructure:"system-identifier-field" description:"Tag key for system identifier value" env:"PW_SYSTEM_IDENTIFIER_FIELD" default:"sys_id"`
}
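A usage sketch wiring CmdOpts into NewSinkWriter (documented below); the sink URI is a placeholder, and note that constructing the struct by hand bypasses the flag/env defaults:

func exampleOpenSink(ctx context.Context) (Writer, error) {
	opts := &CmdOpts{
		Sinks:         []string{"postgresql://pgwatch@localhost:5432/measurements"}, // placeholder URI
		BatchingDelay: 950 * time.Millisecond,
		Retention:     14,
	}
	return NewSinkWriter(ctx, opts)
}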
type DbStorageSchemaType int
const (
	DbStorageSchemaPostgres DbStorageSchemaType = iota
	DbStorageSchemaTimescale
)
type ExistingPartitionInfo struct {
	StartTime time.Time
	EndTime   time.Time
}
JSONWriter is a sink that writes metric measurements to a file in JSON format. It supports compression and rotation of output files; by default, files are rotated when they reach 100 MB. JSONWriter is useful for debugging and testing, as well as for integration with other systems such as log aggregators, analytics systems, data processing pipelines, and ML models.
type JSONWriter struct {
	ctx context.Context
	lw  *lumberjack.Logger
	enc *jsoniter.Encoder
}
func NewJSONWriter(ctx context.Context, fname string) (*JSONWriter, error)
func (jw *JSONWriter) SyncMetric(_, _ string, _ SyncOp) error
func (jw *JSONWriter) Write(msg metrics.MeasurementEnvelope) error
func (jw *JSONWriter) watchCtx()
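A minimal usage sketch; the file path is a placeholder and the envelope is assumed to be populated elsewhere:

func exampleJSONSink(ctx context.Context, env metrics.MeasurementEnvelope) error {
	jw, err := NewJSONWriter(ctx, "/tmp/measurements.json") // placeholder path
	if err != nil {
		return err
	}
	return jw.Write(env)
}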
type MeasurementMessagePostgres struct {
	Time    time.Time
	DBName  string
	Metric  string
	Data    map[string]any
	TagData map[string]string
}
MetricsDefiner is an interface for passing metric definitions to a sink.
type MetricsDefiner interface {
	DefineMetrics(metric *metrics.Metrics) error
}
MultiWriter stores data simultaneously in several sinks.
type MultiWriter struct {
	writers []Writer
	sync.Mutex
}
func (mw *MultiWriter) AddWriter(w Writer)
func (mw *MultiWriter) DefineMetrics(metrics *metrics.Metrics) (err error)
func (mw *MultiWriter) SyncMetric(dbUnique, metricName string, op SyncOp) (err error)
func (mw *MultiWriter) Write(msg metrics.MeasurementEnvelope) (err error)
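A sketch of manual fan-out; whether a zero-value MultiWriter is the intended construction path is an assumption, since NewSinkWriter (below) is the documented entry point:

func exampleFanOut(ctx context.Context, env metrics.MeasurementEnvelope) error {
	mw := &MultiWriter{}
	jw, err := NewJSONWriter(ctx, "/tmp/measurements.json") // placeholder path
	if err != nil {
		return err
	}
	mw.AddWriter(jw) // any other Writer can be attached the same way
	return mw.Write(env)
}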
PostgresWriter is a sink that writes metric measurements to a Postgres database. At the moment, it supports both Postgres and TimescaleDB as storage backends. In practice, any Postgres-compatible database can be used as well, e.g. PGEE, Citus, Greenplum, or CockroachDB.
type PostgresWriter struct {
	ctx          context.Context
	sinkDb       db.PgxPoolIface
	metricSchema DbStorageSchemaType
	opts         *CmdOpts
	input        chan metrics.MeasurementEnvelope
	lastError    chan error
}
func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts) (pgw *PostgresWriter, err error)
func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts) (pgw *PostgresWriter, err error)
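A usage sketch for the Postgres sink; the connection string is a placeholder:

func examplePostgresSink(ctx context.Context, opts *CmdOpts, env metrics.MeasurementEnvelope) error {
	pgw, err := NewPostgresWriter(ctx, "postgresql://pgwatch@localhost:5432/measurements", opts) // placeholder connstr
	if err != nil {
		return err
	}
	return pgw.Write(env) // queued on the input channel, flushed by the poll loop
}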
func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error
func (pgw *PostgresWriter) DropOldTimePartitions(metricAgeDaysThreshold int) (res int, err error)
func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error)
EnsureBuiltinMetricDummies creates empty tables for all built-in metrics if they don't exist
func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error)
func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error)
EnsureMetricDummy creates an empty table for a metric's measurements if it doesn't exist
func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPartitionInfo, force bool) error
EnsureMetricTime creates special partitions if TimescaleDB is used for realtime metrics
func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error)
func (pgw *PostgresWriter) GetOldTimePartitions(metricAgeDaysThreshold int) ([]string, error)
func (pgw *PostgresWriter) ReadMetricSchemaType() (err error)
func (pgw *PostgresWriter) SyncMetric(dbUnique, metricName string, op SyncOp) error
SyncMetric ensures that tables exist for newly added metrics and/or sources
func (pgw *PostgresWriter) Write(msg metrics.MeasurementEnvelope) error
Write sends the measurements to the cache channel
func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration)
deleteOldPartitions is a background task that deletes old partitions from the measurements DB
func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope)
flush sends the cached measurements to the database
func (pgw *PostgresWriter) maintainUniqueSources()
maintainUniqueSources is a background task that maintains a listing of unique sources for each metric. This is used to avoid listing the same source multiple times in Grafana dropdowns.
func (pgw *PostgresWriter) poll()
poll is the main loop that reads from the input channel and flushes the data to the database
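The poll/flush pairing follows the usual channel-draining batch pattern; a simplified sketch (not the actual implementation), reusing cacheLimit as the batch cap:

func batchLoop(ctx context.Context, in <-chan metrics.MeasurementEnvelope, delay time.Duration, flush func([]metrics.MeasurementEnvelope)) {
	batch := make([]metrics.MeasurementEnvelope, 0, cacheLimit)
	ticker := time.NewTicker(delay)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return // context canceled: stop draining
		case msg := <-in:
			if batch = append(batch, msg); len(batch) >= cacheLimit {
				flush(batch)
				batch = batch[:0]
			}
		case <-ticker.C:
			if len(batch) > 0 { // flush whatever accumulated during the delay
				flush(batch)
				batch = batch[:0]
			}
		}
	}
}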
PrometheusWriter is a sink that exposes metric measurements to a Prometheus scraper. Prometheus collects metrics data from pgwatch by scraping its metrics HTTP endpoints.
type PrometheusWriter struct {
	logger                             log.Logger
	ctx                                context.Context
	lastScrapeErrors                   prometheus.Gauge
	totalScrapes, totalScrapeFailures  prometheus.Counter
	PrometheusNamespace                string
	gauges                             map[string][]string // map of metric names to their gauge names, used for Prometheus gauge metrics
}
func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error)
func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric)
func (promw *PrometheusWriter) DefineMetrics(metrics *metrics.Metrics) (err error)
func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc)
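Because both Describe and Collect are implemented, PrometheusWriter satisfies prometheus.Collector and can be registered like any other collector (a sketch; NewPrometheusWriter may already wire this up internally):

func exampleRegister(promw *PrometheusWriter) error {
	// Register the writer so its cached measurements are exported on scrape.
	return prometheus.Register(promw)
}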
func (promw *PrometheusWriter) MetricStoreMessageToPromMetrics(msg metrics.MeasurementEnvelope) []prometheus.Metric
func (promw *PrometheusWriter) Println(v ...any)
func (promw *PrometheusWriter) PromAsyncCacheAddMetricData(dbUnique, metric string, msgArr metrics.MeasurementEnvelope)
func (promw *PrometheusWriter) PromAsyncCacheInitIfRequired(dbUnique, _ string)
func (promw *PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metric string)
func (promw *PrometheusWriter) SyncMetric(dbUnique, metricName string, op SyncOp) error
func (promw *PrometheusWriter) Write(msg metrics.MeasurementEnvelope) error
RPCWriter is a sink that sends metric measurements to a remote server over the RPC protocol. The remote server should implement the Receiver interface; it is up to the implementer to define the server's behavior. It can be a simple logger, external storage, an alerting system, or an analytics system.
type RPCWriter struct {
	ctx    context.Context
	client *rpc.Client
}
func NewRPCWriter(ctx context.Context, ConnStr string) (*RPCWriter, error)
func (rw *RPCWriter) SyncMetric(dbUnique, metricName string, op SyncOp) error
func (rw *RPCWriter) Write(msg metrics.MeasurementEnvelope) error
Write sends a measurement envelope to the RPC sink
func (rw *RPCWriter) watchCtx()
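A hypothetical receiver-side sketch built on net/rpc (imports net and net/rpc); the method name and signature here are assumptions, not the actual contract expected by RPCWriter:

type ExampleReceiver struct{}

// UpdateMeasurements is a hypothetical handler: store, forward, or alert on msg here.
func (r *ExampleReceiver) UpdateMeasurements(msg *metrics.MeasurementEnvelope, reply *string) error {
	*reply = "ok"
	return nil
}

func serveExampleReceiver(addr string) error {
	srv := rpc.NewServer()
	if err := srv.Register(new(ExampleReceiver)); err != nil {
		return err
	}
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}
	go srv.Accept(ln) // serve connections until the listener is closed
	return nil
}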
type SyncOp int
const (
	AddOp SyncOp = iota
	DeleteOp
	InvalidOp
)
type SyncReq struct {
	DbName     string
	MetricName string
	Operation  SyncOp
}
Writer is an interface for sinks that store metric measurements
type Writer interface {
	SyncMetric(dbUnique, metricName string, op SyncOp) error
	Write(msgs metrics.MeasurementEnvelope) error
}
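Any type implementing these two methods is a valid sink; a trivial sketch:

// countingWriter is a sketch of a custom sink that discards sync
// requests and just counts written envelopes.
type countingWriter struct{ n int }

func (cw *countingWriter) SyncMetric(_, _ string, _ SyncOp) error { return nil }

func (cw *countingWriter) Write(_ metrics.MeasurementEnvelope) error {
	cw.n++
	return nil
}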
func NewSinkWriter(ctx context.Context, opts *CmdOpts) (w Writer, err error)
NewSinkWriter creates and returns a new instance of the MultiWriter struct.
type copyFromMeasurements struct {
	envelopes      []metrics.MeasurementEnvelope
	envelopeIdx    int
	measurementIdx int // index of the current measurement in the envelope
	metricName     string
}
func newCopyFromMeasurements(rows []metrics.MeasurementEnvelope) *copyFromMeasurements
func (c *copyFromMeasurements) EOF() bool
func (c *copyFromMeasurements) Err() error
func (c *copyFromMeasurements) MetricName() pgx.Identifier
func (c *copyFromMeasurements) Next() bool
func (c *copyFromMeasurements) Values() ([]any, error)
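copyFromMeasurements implements pgx.CopyFromSource (Next/Values/Err), which suggests flushing proceeds one metric table at a time, roughly as in this sketch (it assumes db.PgxPoolIface exposes pgx's CopyFrom and that EOF reports full exhaustion):

func exampleCopyFlush(ctx context.Context, sinkDb db.PgxPoolIface, msgs []metrics.MeasurementEnvelope) error {
	src := newCopyFromMeasurements(msgs)
	for !src.EOF() {
		// Each CopyFrom call consumes rows until Next reports a metric
		// boundary; MetricName names the current target table.
		if _, err := sinkDb.CopyFrom(ctx, src.MetricName(), targetColumns[:], src); err != nil {
			return err
		}
	}
	return nil
}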