...

Package sinks

import "github.com/cybertec-postgresql/pgwatch/v3/internal/sinks"
Overview
Index

Overview ▾

Package sinks rovides functionality to store monitored data in different ways.

At the moment we provide sink connectors for

  • PostgreSQL and flavours,
  • Prometheus,
  • plain JSON files,
  • and RPC servers.

To ensure the simultaneous storage of data in several storages, the `MultiWriter` class is implemented.

Index ▾

Constants
Variables
func connectViaTLS(address, RootCA string) (*rpc.Client, error)
type CmdOpts
type DbStorageSchemaType
type ExistingPartitionInfo
type JSONWriter
    func NewJSONWriter(ctx context.Context, fname string) (*JSONWriter, error)
    func (jw *JSONWriter) SyncMetric(_, _ string, _ SyncOp) error
    func (jw *JSONWriter) Write(msg metrics.MeasurementEnvelope) error
    func (jw *JSONWriter) watchCtx()
type MeasurementMessagePostgres
type MetricsDefiner
type MultiWriter
    func (mw *MultiWriter) AddWriter(w Writer)
    func (mw *MultiWriter) DefineMetrics(metrics *metrics.Metrics) (err error)
    func (mw *MultiWriter) SyncMetric(dbUnique, metricName string, op SyncOp) (err error)
    func (mw *MultiWriter) Write(msg metrics.MeasurementEnvelope) (err error)
type PostgresWriter
    func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts) (pgw *PostgresWriter, err error)
    func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts) (pgw *PostgresWriter, err error)
    func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error
    func (pgw *PostgresWriter) DropOldTimePartitions(metricAgeDaysThreshold int) (res int, err error)
    func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error)
    func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error)
    func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error)
    func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPartitionInfo, force bool) error
    func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error)
    func (pgw *PostgresWriter) GetOldTimePartitions(metricAgeDaysThreshold int) ([]string, error)
    func (pgw *PostgresWriter) ReadMetricSchemaType() (err error)
    func (pgw *PostgresWriter) SyncMetric(dbUnique, metricName string, op SyncOp) error
    func (pgw *PostgresWriter) Write(msg metrics.MeasurementEnvelope) error
    func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration)
    func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope)
    func (pgw *PostgresWriter) maintainUniqueSources()
    func (pgw *PostgresWriter) poll()
type PrometheusWriter
    func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error)
    func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric)
    func (promw *PrometheusWriter) DefineMetrics(metrics *metrics.Metrics) (err error)
    func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc)
    func (promw *PrometheusWriter) MetricStoreMessageToPromMetrics(msg metrics.MeasurementEnvelope) []prometheus.Metric
    func (promw *PrometheusWriter) Println(v ...any)
    func (promw *PrometheusWriter) PromAsyncCacheAddMetricData(dbUnique, metric string, msgArr metrics.MeasurementEnvelope)
    func (promw *PrometheusWriter) PromAsyncCacheInitIfRequired(dbUnique, _ string)
    func (promw *PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metric string)
    func (promw *PrometheusWriter) SyncMetric(dbUnique, metricName string, op SyncOp) error
    func (promw *PrometheusWriter) Write(msg metrics.MeasurementEnvelope) error
type RPCWriter
    func NewRPCWriter(ctx context.Context, ConnStr string) (*RPCWriter, error)
    func (rw *RPCWriter) SyncMetric(dbUnique, metricName string, op SyncOp) error
    func (rw *RPCWriter) Write(msg metrics.MeasurementEnvelope) error
    func (rw *RPCWriter) watchCtx()
type SyncOp
type SyncReq
type Writer
    func NewSinkWriter(ctx context.Context, opts *CmdOpts) (w Writer, err error)
type copyFromMeasurements
    func newCopyFromMeasurements(rows []metrics.MeasurementEnvelope) *copyFromMeasurements
    func (c *copyFromMeasurements) EOF() bool
    func (c *copyFromMeasurements) Err() error
    func (c *copyFromMeasurements) MetricName() pgx.Identifier
    func (c *copyFromMeasurements) Next() bool
    func (c *copyFromMeasurements) Values() ([]any, error)

Package files

cmdopts.go doc.go json.go multiwriter.go postgres.go prometheus.go rpc.go types.go

Constants

const promInstanceUpStateMetric = "instance_up"

timestamps older than that will be ignored on the Prom scraper side anyway, so better don't emit at all and just log a notice

const promScrapingStalenessHardDropLimit = time.Minute * time.Duration(10)

Variables

var (
    cacheLimit      = 256
    highLoadTimeout = time.Second * 5
    deleterDelay    = time.Hour
    targetColumns   = [...]string{"time", "dbname", "data", "tag_data"}
)
var (
    forceRecreatePartitions  = false                                             // to signal override PG metrics storage cache
    partitionMapMetric       = make(map[string]ExistingPartitionInfo)            // metric = min/max bounds
    partitionMapMetricDbname = make(map[string]map[string]ExistingPartitionInfo) // metric[dbname = min/max bounds]
)
var (
    metricSchemaSQLs = []string{
        sqlMetricAdminSchema,
        sqlMetricAdminFunctions,
        sqlMetricEnsurePartitionPostgres,
        sqlMetricEnsurePartitionTimescale,
        sqlMetricChangeChunkIntervalTimescale,
        sqlMetricChangeCompressionIntervalTimescale,
    }
)

Async Prom cache

var promAsyncMetricCache = make(map[string]map[string]metrics.MeasurementEnvelope) // [dbUnique][metric]lastly_fetched_data
var promAsyncMetricCacheLock = sync.RWMutex{}
var sqlMetricAdminFunctions string
var sqlMetricAdminSchema string
var sqlMetricChangeChunkIntervalTimescale string
var sqlMetricChangeCompressionIntervalTimescale string
var sqlMetricEnsurePartitionPostgres string
var sqlMetricEnsurePartitionTimescale string

func connectViaTLS

func connectViaTLS(address, RootCA string) (*rpc.Client, error)

type CmdOpts

CmdOpts specifies the storage configuration to store metrics measurements

type CmdOpts struct {
    Sinks                 []string      `long:"sink" mapstructure:"sink" description:"URI where metrics will be stored, can be used multiple times" env:"PW_SINK"`
    BatchingDelay         time.Duration `long:"batching-delay" mapstructure:"batching-delay" description:"Sink-specific batching flush delay; may be ignored by some sinks" default:"950ms" env:"PW_BATCHING_DELAY"`
    Retention             int           `long:"retention" mapstructure:"retention" description:"If set, metrics older than that will be deleted" default:"14" env:"PW_RETENTION"`
    RealDbnameField       string        `long:"real-dbname-field" mapstructure:"real-dbname-field" description:"Tag key for real database name" env:"PW_REAL_DBNAME_FIELD" default:"real_dbname"`
    SystemIdentifierField string        `long:"system-identifier-field" mapstructure:"system-identifier-field" description:"Tag key for system identifier value" env:"PW_SYSTEM_IDENTIFIER_FIELD" default:"sys_id"`
}

type DbStorageSchemaType

type DbStorageSchemaType int
const (
    DbStorageSchemaPostgres DbStorageSchemaType = iota
    DbStorageSchemaTimescale
)

type ExistingPartitionInfo

type ExistingPartitionInfo struct {
    StartTime time.Time
    EndTime   time.Time
}

type JSONWriter

JSONWriter is a sink that writes metric measurements to a file in JSON format. It supports compression and rotation of output files. The default rotation is based on the file size (100Mb). JSONWriter is useful for debugging and testing purposes, as well as for integration with other systems, such as log aggregators, analytics systems, and data processing pipelines, ML models, etc.

type JSONWriter struct {
    ctx context.Context
    lw  *lumberjack.Logger
    enc *jsoniter.Encoder
}

func NewJSONWriter

func NewJSONWriter(ctx context.Context, fname string) (*JSONWriter, error)

func (*JSONWriter) SyncMetric

func (jw *JSONWriter) SyncMetric(_, _ string, _ SyncOp) error

func (*JSONWriter) Write

func (jw *JSONWriter) Write(msg metrics.MeasurementEnvelope) error

func (*JSONWriter) watchCtx

func (jw *JSONWriter) watchCtx()

type MeasurementMessagePostgres

type MeasurementMessagePostgres struct {
    Time    time.Time
    DBName  string
    Metric  string
    Data    map[string]any
    TagData map[string]string
}

type MetricsDefiner

MetricDefiner is an interface for passing metric definitions to a sink.

type MetricsDefiner interface {
    DefineMetrics(metric *metrics.Metrics) error
}

type MultiWriter

MultiWriter ensures the simultaneous storage of data in several storages.

type MultiWriter struct {
    writers []Writer
    sync.Mutex
}

func (*MultiWriter) AddWriter

func (mw *MultiWriter) AddWriter(w Writer)

func (*MultiWriter) DefineMetrics

func (mw *MultiWriter) DefineMetrics(metrics *metrics.Metrics) (err error)

func (*MultiWriter) SyncMetric

func (mw *MultiWriter) SyncMetric(dbUnique, metricName string, op SyncOp) (err error)

func (*MultiWriter) Write

func (mw *MultiWriter) Write(msg metrics.MeasurementEnvelope) (err error)

type PostgresWriter

PostgresWriter is a sink that writes metric measurements to a Postgres database. At the moment, it supports both Postgres and TimescaleDB as a storage backend. However, one is able to use any Postgres-compatible database as a storage backend, e.g. PGEE, Citus, Greenplum, CockroachDB, etc.

type PostgresWriter struct {
    ctx          context.Context
    sinkDb       db.PgxPoolIface
    metricSchema DbStorageSchemaType
    opts         *CmdOpts
    input        chan metrics.MeasurementEnvelope
    lastError    chan error
}

func NewPostgresWriter

func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts) (pgw *PostgresWriter, err error)

func NewWriterFromPostgresConn

func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts) (pgw *PostgresWriter, err error)

func (*PostgresWriter) AddDBUniqueMetricToListingTable

func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error

func (*PostgresWriter) DropOldTimePartitions

func (pgw *PostgresWriter) DropOldTimePartitions(metricAgeDaysThreshold int) (res int, err error)

func (*PostgresWriter) EnsureBuiltinMetricDummies

func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error)

EnsureBuiltinMetricDummies creates empty tables for all built-in metrics if they don't exist

func (*PostgresWriter) EnsureMetricDbnameTime

func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error)

func (*PostgresWriter) EnsureMetricDummy

func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error)

EnsureMetricDummy creates an empty table for a metric measurements if it doesn't exist

func (*PostgresWriter) EnsureMetricTime

func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPartitionInfo, force bool) error

EnsureMetricTime creates special partitions if Timescale used for realtime metrics

func (*PostgresWriter) EnsureMetricTimescale

func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error)

func (*PostgresWriter) GetOldTimePartitions

func (pgw *PostgresWriter) GetOldTimePartitions(metricAgeDaysThreshold int) ([]string, error)

func (*PostgresWriter) ReadMetricSchemaType

func (pgw *PostgresWriter) ReadMetricSchemaType() (err error)

func (*PostgresWriter) SyncMetric

func (pgw *PostgresWriter) SyncMetric(dbUnique, metricName string, op SyncOp) error

SyncMetric ensures that tables exist for newly added metrics and/or sources

func (*PostgresWriter) Write

func (pgw *PostgresWriter) Write(msg metrics.MeasurementEnvelope) error

Write sends the measurements to the cache channel

func (*PostgresWriter) deleteOldPartitions

func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration)

deleteOldPartitions is a background task that deletes old partitions from the measurements DB

func (*PostgresWriter) flush

func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope)

flush sends the cached measurements to the database

func (*PostgresWriter) maintainUniqueSources

func (pgw *PostgresWriter) maintainUniqueSources()

maintainUniqueSources is a background task that maintains a listing of unique sources for each metric. This is used to avoid listing the same source multiple times in Grafana dropdowns.

func (*PostgresWriter) poll

func (pgw *PostgresWriter) poll()

poll is the main loop that reads from the input channel and flushes the data to the database

type PrometheusWriter

PrometheusWriter is a sink that allows to expose metric measurements to Prometheus scrapper. Prometheus collects metrics data from pgwatch by scraping metrics HTTP endpoints.

type PrometheusWriter struct {
    logger                            log.Logger
    ctx                               context.Context
    lastScrapeErrors                  prometheus.Gauge
    totalScrapes, totalScrapeFailures prometheus.Counter
    PrometheusNamespace               string
    gauges                            map[string]([]string) // map of metric names to their gauge names, used for Prometheus gauge metrics
}

func NewPrometheusWriter

func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error)

func (*PrometheusWriter) Collect

func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric)

func (*PrometheusWriter) DefineMetrics

func (promw *PrometheusWriter) DefineMetrics(metrics *metrics.Metrics) (err error)

func (*PrometheusWriter) Describe

func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc)

func (*PrometheusWriter) MetricStoreMessageToPromMetrics

func (promw *PrometheusWriter) MetricStoreMessageToPromMetrics(msg metrics.MeasurementEnvelope) []prometheus.Metric

func (*PrometheusWriter) Println

func (promw *PrometheusWriter) Println(v ...any)

func (*PrometheusWriter) PromAsyncCacheAddMetricData

func (promw *PrometheusWriter) PromAsyncCacheAddMetricData(dbUnique, metric string, msgArr metrics.MeasurementEnvelope)

func (*PrometheusWriter) PromAsyncCacheInitIfRequired

func (promw *PrometheusWriter) PromAsyncCacheInitIfRequired(dbUnique, _ string)

func (*PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny

func (promw *PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metric string)

func (*PrometheusWriter) SyncMetric

func (promw *PrometheusWriter) SyncMetric(dbUnique, metricName string, op SyncOp) error

func (*PrometheusWriter) Write

func (promw *PrometheusWriter) Write(msg metrics.MeasurementEnvelope) error

type RPCWriter

RPCWriter is a sink that sends metric measurements to a remote server using the RPC protocol. Remote server should implement the Receiver interface. It's up to the implementer to define the behavior of the server. It can be a simple logger, external storage, alerting system, or an analytics system.

type RPCWriter struct {
    ctx    context.Context
    client *rpc.Client
}

func NewRPCWriter

func NewRPCWriter(ctx context.Context, ConnStr string) (*RPCWriter, error)

func (*RPCWriter) SyncMetric

func (rw *RPCWriter) SyncMetric(dbUnique, metricName string, op SyncOp) error

func (*RPCWriter) Write

func (rw *RPCWriter) Write(msg metrics.MeasurementEnvelope) error

Sends Measurement Message to RPC Sink

func (*RPCWriter) watchCtx

func (rw *RPCWriter) watchCtx()

type SyncOp

type SyncOp int
const (
    AddOp SyncOp = iota
    DeleteOp
    InvalidOp
)

type SyncReq

type SyncReq struct {
    DbName     string
    MetricName string
    Operation  SyncOp
}

type Writer

Writer is an interface that writes metrics values

type Writer interface {
    SyncMetric(dbUnique, metricName string, op SyncOp) error
    Write(msgs metrics.MeasurementEnvelope) error
}

func NewSinkWriter

func NewSinkWriter(ctx context.Context, opts *CmdOpts) (w Writer, err error)

NewSinkWriter creates and returns new instance of MultiWriter struct.

type copyFromMeasurements

type copyFromMeasurements struct {
    envelopes      []metrics.MeasurementEnvelope
    envelopeIdx    int
    measurementIdx int // index of the current measurement in the envelope
    metricName     string
}

func newCopyFromMeasurements

func newCopyFromMeasurements(rows []metrics.MeasurementEnvelope) *copyFromMeasurements

func (*copyFromMeasurements) EOF

func (c *copyFromMeasurements) EOF() bool

func (*copyFromMeasurements) Err

func (c *copyFromMeasurements) Err() error

func (*copyFromMeasurements) MetricName

func (c *copyFromMeasurements) MetricName() pgx.Identifier

func (*copyFromMeasurements) Next

func (c *copyFromMeasurements) Next() bool

func (*copyFromMeasurements) Values

func (c *copyFromMeasurements) Values() ([]any, error)