...

Package sinks

import "github.com/cybertec-postgresql/pgwatch/v3/internal/sinks"
Overview
Index

Overview ▾

Package sinks provides functionality to store monitored data in different ways.

At the moment we provide sink connectors for

  • PostgreSQL and flavours,
  • Prometheus,
  • plain JSON files,
  • and RPC servers.

To ensure the simultaneous storage of data in several storages, the `MultiWriter` class is implemented.

Index ▾

Constants
Variables
type CmdOpts
type DbStorageSchemaType
type ExistingPartitionInfo
type JSONWriter
    func NewJSONWriter(ctx context.Context, fname string) (*JSONWriter, error)
    func (jw *JSONWriter) SyncMetric(_, _, _ string) error
    func (jw *JSONWriter) Write(msgs []metrics.MeasurementEnvelope) error
    func (jw *JSONWriter) watchCtx()
type MeasurementMessagePostgres
type MultiWriter
    func NewMultiWriter(ctx context.Context, opts *CmdOpts, metricDefs *metrics.Metrics) (mw *MultiWriter, err error)
    func (mw *MultiWriter) AddWriter(w Writer)
    func (mw *MultiWriter) SyncMetrics(dbUnique, metricName, op string) (err error)
    func (mw *MultiWriter) WriteMeasurements(ctx context.Context, storageCh <-chan []metrics.MeasurementEnvelope)
type PostgresWriter
    func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error)
    func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error)
    func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error
    func (pgw *PostgresWriter) DropOldTimePartitions(metricAgeDaysThreshold int) (res int, err error)
    func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error)
    func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error)
    func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error)
    func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPartitionInfo, force bool) error
    func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error)
    func (pgw *PostgresWriter) GetOldTimePartitions(metricAgeDaysThreshold int) ([]string, error)
    func (pgw *PostgresWriter) ReadMetricSchemaType() (err error)
    func (pgw *PostgresWriter) SyncMetric(dbUnique, metricName, op string) error
    func (pgw *PostgresWriter) Write(msgs []metrics.MeasurementEnvelope) error
    func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration)
    func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope)
    func (pgw *PostgresWriter) maintainUniqueSources()
    func (pgw *PostgresWriter) poll()
type PrometheusWriter
    func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error)
    func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric)
    func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc)
    func (promw *PrometheusWriter) MetricStoreMessageToPromMetrics(msg metrics.MeasurementEnvelope) []prometheus.Metric
    func (promw *PrometheusWriter) PromAsyncCacheAddMetricData(dbUnique, metric string, msgArr []metrics.MeasurementEnvelope)
    func (promw *PrometheusWriter) PromAsyncCacheInitIfRequired(dbUnique, _ string)
    func (promw *PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metric string)
    func (promw *PrometheusWriter) SyncMetric(dbUnique, metricName, op string) error
    func (promw *PrometheusWriter) Write(msgs []metrics.MeasurementEnvelope) error
    func (promw *PrometheusWriter) setInstanceUpDownState(ch chan<- prometheus.Metric, dbName string)
type RPCWriter
    func NewRPCWriter(ctx context.Context, address string) (*RPCWriter, error)
    func (rw *RPCWriter) SyncMetric(dbUnique string, metricName string, op string) error
    func (rw *RPCWriter) Write(msgs []metrics.MeasurementEnvelope) error
    func (rw *RPCWriter) watchCtx()
type SyncReq
type Writer

Package files

cmdopts.go doc.go json.go multiwriter.go postgres.go prometheus.go rpc.go

Constants

const (
    epochColumnName string = "epoch_ns" // this column (epoch in nanoseconds) is expected in every metric query
    tagPrefix       string = "tag_"
)
const promInstanceUpStateMetric = "instance_up"

timestamps older than that will be ignored on the Prom scraper side anyway, so better don't emit at all and just log a notice

const promScrapingStalenessHardDropLimit = time.Minute * time.Duration(10)
const specialMetricPgbouncer = "^pgbouncer_(stats|pools)$"

Variables

var (
    cacheLimit      = 512
    highLoadTimeout = time.Second * 5
    deleterDelay    = time.Hour
)
var (
    regexIsPgbouncerMetrics         = regexp.MustCompile(specialMetricPgbouncer)
    forceRecreatePGMetricPartitions = false                                             // to signal override PG metrics storage cache
    partitionMapMetric              = make(map[string]ExistingPartitionInfo)            // metric = min/max bounds
    partitionMapMetricDbname        = make(map[string]map[string]ExistingPartitionInfo) // metric[dbname = min/max bounds]
)
var (
    metricSchemaSQLs = []string{
        sqlMetricAdminSchema,
        sqlMetricAdminFunctions,
        sqlMetricEnsurePartitionPostgres,
        sqlMetricEnsurePartitionTimescale,
        sqlMetricChangeChunkIntervalTimescale,
        sqlMetricChangeCompressionIntervalTimescale,
    }
)

Async Prom cache

var promAsyncMetricCache = make(map[string]map[string][]metrics.MeasurementEnvelope) // [dbUnique][metric]lastly_fetched_data
var promAsyncMetricCacheLock = sync.RWMutex{}
var sqlMetricAdminFunctions string
var sqlMetricAdminSchema string
var sqlMetricChangeChunkIntervalTimescale string
var sqlMetricChangeCompressionIntervalTimescale string
var sqlMetricEnsurePartitionPostgres string
var sqlMetricEnsurePartitionTimescale string

type CmdOpts

CmdOpts specifies the storage configuration to store metrics measurements

type CmdOpts struct {
    Sinks                 []string      `long:"sink" mapstructure:"sink" description:"URI where metrics will be stored, can be used multiple times" env:"PW_SINK"`
    BatchingDelay         time.Duration `long:"batching-delay" mapstructure:"batching-delay" description:"Max milliseconds to wait for a batched metrics flush. [Default: 250ms]" default:"250ms" env:"PW_BATCHING_MAX_DELAY"`
    Retention             int           `long:"retention" mapstructure:"retention" description:"If set, metrics older than that will be deleted" default:"14" env:"PW_RETENTION"`
    RealDbnameField       string        `long:"real-dbname-field" mapstructure:"real-dbname-field" description:"Tag key for real database name" env:"PW_REAL_DBNAME_FIELD" default:"real_dbname"`
    SystemIdentifierField string        `long:"system-identifier-field" mapstructure:"system-identifier-field" description:"Tag key for system identifier value" env:"PW_SYSTEM_IDENTIFIER_FIELD" default:"sys_id"`
}

type DbStorageSchemaType

type DbStorageSchemaType int
const (
    DbStorageSchemaPostgres DbStorageSchemaType = iota
    DbStorageSchemaTimescale
)

type ExistingPartitionInfo

type ExistingPartitionInfo struct {
    StartTime time.Time
    EndTime   time.Time
}

type JSONWriter

JSONWriter is a sink that writes metric measurements to a file in JSON format. It supports compression and rotation of output files. The default rotation is based on the file size (100Mb). JSONWriter is useful for debugging and testing purposes, as well as for integration with other systems, such as log aggregators, analytics systems, and data processing pipelines, ML models, etc.

type JSONWriter struct {
    ctx context.Context
    lw  *lumberjack.Logger
}

func NewJSONWriter

func NewJSONWriter(ctx context.Context, fname string) (*JSONWriter, error)

func (*JSONWriter) SyncMetric

func (jw *JSONWriter) SyncMetric(_, _, _ string) error

func (*JSONWriter) Write

func (jw *JSONWriter) Write(msgs []metrics.MeasurementEnvelope) error

func (*JSONWriter) watchCtx

func (jw *JSONWriter) watchCtx()

type MeasurementMessagePostgres

type MeasurementMessagePostgres struct {
    Time    time.Time
    DBName  string
    Metric  string
    Data    map[string]any
    TagData map[string]any
}

type MultiWriter

MultiWriter ensures the simultaneous storage of data in several storages.

type MultiWriter struct {
    writers []Writer
    sync.Mutex
}

func NewMultiWriter

func NewMultiWriter(ctx context.Context, opts *CmdOpts, metricDefs *metrics.Metrics) (mw *MultiWriter, err error)

NewMultiWriter creates and returns new instance of MultiWriter struct.

func (*MultiWriter) AddWriter

func (mw *MultiWriter) AddWriter(w Writer)

func (*MultiWriter) SyncMetrics

func (mw *MultiWriter) SyncMetrics(dbUnique, metricName, op string) (err error)

func (*MultiWriter) WriteMeasurements

func (mw *MultiWriter) WriteMeasurements(ctx context.Context, storageCh <-chan []metrics.MeasurementEnvelope)

type PostgresWriter

PostgresWriter is a sink that writes metric measurements to a Postgres database. At the moment, it supports both Postgres and TimescaleDB as a storage backend. However, one is able to use any Postgres-compatible database as a storage backend, e.g. PGEE, Citus, Greenplum, CockroachDB, etc.

type PostgresWriter struct {
    ctx          context.Context
    sinkDb       db.PgxPoolIface
    metricSchema DbStorageSchemaType
    metricDefs   *metrics.Metrics
    opts         *CmdOpts
    input        chan []metrics.MeasurementEnvelope
    lastError    chan error
}

func NewPostgresWriter

func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error)

func NewWriterFromPostgresConn

func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error)

func (*PostgresWriter) AddDBUniqueMetricToListingTable

func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error

func (*PostgresWriter) DropOldTimePartitions

func (pgw *PostgresWriter) DropOldTimePartitions(metricAgeDaysThreshold int) (res int, err error)

func (*PostgresWriter) EnsureBuiltinMetricDummies

func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error)

EnsureBuiltinMetricDummies creates empty tables for all built-in metrics if they don't exist

func (*PostgresWriter) EnsureMetricDbnameTime

func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error)

func (*PostgresWriter) EnsureMetricDummy

func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error)

EnsureMetricDummy creates an empty table for a metric measurements if it doesn't exist

func (*PostgresWriter) EnsureMetricTime

func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPartitionInfo, force bool) error

EnsureMetricTime creates special partitions if Timescale used for realtime metrics

func (*PostgresWriter) EnsureMetricTimescale

func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error)

func (*PostgresWriter) GetOldTimePartitions

func (pgw *PostgresWriter) GetOldTimePartitions(metricAgeDaysThreshold int) ([]string, error)

func (*PostgresWriter) ReadMetricSchemaType

func (pgw *PostgresWriter) ReadMetricSchemaType() (err error)

func (*PostgresWriter) SyncMetric

func (pgw *PostgresWriter) SyncMetric(dbUnique, metricName, op string) error

SyncMetric ensures that tables exist for newly added metrics and/or sources

func (*PostgresWriter) Write

func (pgw *PostgresWriter) Write(msgs []metrics.MeasurementEnvelope) error

Write sends the measurements to the cache channel

func (*PostgresWriter) deleteOldPartitions

func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration)

deleteOldPartitions is a background task that deletes old partitions from the measurements DB

func (*PostgresWriter) flush

func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope)

flush sends the cached measurements to the database

func (*PostgresWriter) maintainUniqueSources

func (pgw *PostgresWriter) maintainUniqueSources()

maintainUniqueSources is a background task that maintains a listing of unique sources for each metric. This is used to avoid listing the same source multiple times in Grafana dropdowns.

func (*PostgresWriter) poll

func (pgw *PostgresWriter) poll()

poll is the main loop that reads from the input channel and flushes the data to the database

type PrometheusWriter

PrometheusWriter is a sink that allows to expose metric measurements to Prometheus scrapper. Prometheus collects metrics data from pgwatch by scraping metrics HTTP endpoints.

type PrometheusWriter struct {
    ctx                               context.Context
    lastScrapeErrors                  prometheus.Gauge
    totalScrapes, totalScrapeFailures prometheus.Counter
    PrometheusNamespace               string
}

func NewPrometheusWriter

func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error)

func (*PrometheusWriter) Collect

func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric)

func (*PrometheusWriter) Describe

func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc)

func (*PrometheusWriter) MetricStoreMessageToPromMetrics

func (promw *PrometheusWriter) MetricStoreMessageToPromMetrics(msg metrics.MeasurementEnvelope) []prometheus.Metric

func (*PrometheusWriter) PromAsyncCacheAddMetricData

func (promw *PrometheusWriter) PromAsyncCacheAddMetricData(dbUnique, metric string, msgArr []metrics.MeasurementEnvelope)

func (*PrometheusWriter) PromAsyncCacheInitIfRequired

func (promw *PrometheusWriter) PromAsyncCacheInitIfRequired(dbUnique, _ string)

func (*PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny

func (promw *PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metric string)

func (*PrometheusWriter) SyncMetric

func (promw *PrometheusWriter) SyncMetric(dbUnique, metricName, op string) error

func (*PrometheusWriter) Write

func (promw *PrometheusWriter) Write(msgs []metrics.MeasurementEnvelope) error

func (*PrometheusWriter) setInstanceUpDownState

func (promw *PrometheusWriter) setInstanceUpDownState(ch chan<- prometheus.Metric, dbName string)

type RPCWriter

RPCWriter is a sink that sends metric measurements to a remote server using the RPC protocol. Remote server should implement the Receiver interface. It's up to the implementer to define the behavior of the server. It can be a simple logger, external storage, alerting system, or an analytics system.

type RPCWriter struct {
    ctx     context.Context
    address string
    client  *rpc.Client
}

func NewRPCWriter

func NewRPCWriter(ctx context.Context, address string) (*RPCWriter, error)

func (*RPCWriter) SyncMetric

func (rw *RPCWriter) SyncMetric(dbUnique string, metricName string, op string) error

func (*RPCWriter) Write

func (rw *RPCWriter) Write(msgs []metrics.MeasurementEnvelope) error

Sends Measurement Message to RPC Sink

func (*RPCWriter) watchCtx

func (rw *RPCWriter) watchCtx()

type SyncReq

type SyncReq struct {
    DbName     string
    MetricName string
    Operation  string
}

type Writer

Writer is an interface that writes metrics values

type Writer interface {
    SyncMetric(dbUnique, metricName, op string) error
    Write(msgs []metrics.MeasurementEnvelope) error
}