...

Package sinks

import "github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
Overview
Index

Overview ▾

Package sinks provides functionality to store monitored data in different ways.

At the moment we provide sink connectors for

  • PostgreSQL and flavours,
  • Prometheus,
  • plain JSON files,
  • and RPC servers.

To store data in several sinks simultaneously, the `MultiWriter` type is provided.

Index ▾

Constants
Variables
func LoadTLSCredentials(CAFile string) (credentials.TransportCredentials, error)
func NewPostgresSinkMigrator(ctx context.Context, connStr string) (db.Migrator, error)
func convertSyncOp(op SyncOp) pb.SyncOp
type CmdOpts
type DbStorageSchemaType
type ExistingPartitionInfo
type JSONWriter
    func NewJSONWriter(ctx context.Context, fname string) (*JSONWriter, error)
    func (jw *JSONWriter) SyncMetric(_, _ string, _ SyncOp) error
    func (jw *JSONWriter) Write(msg metrics.MeasurementEnvelope) error
    func (jw *JSONWriter) watchCtx()
type MeasurementMessagePostgres
type MetricsDefiner
type MultiWriter
    func (mw *MultiWriter) AddWriter(w Writer)
    func (mw *MultiWriter) Count() int
    func (mw *MultiWriter) DefineMetrics(metrics *metrics.Metrics) (err error)
    func (mw *MultiWriter) Migrate() (err error)
    func (mw *MultiWriter) NeedsMigration() (bool, error)
    func (mw *MultiWriter) SyncMetric(sourceName, metricName string, op SyncOp) (err error)
    func (mw *MultiWriter) Write(msg metrics.MeasurementEnvelope) (err error)
type PostgresWriter
    func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts) (pgw *PostgresWriter, err error)
    func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts) (pgw *PostgresWriter, err error)
    func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error
    func (pgw *PostgresWriter) DeleteOldPartitions()
    func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error)
    func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo) (err error)
    func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error)
    func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo) (err error)
    func (pgw *PostgresWriter) MaintainUniqueSources()
    func (pgw *PostgresWriter) Migrate() error
    func (pgw *PostgresWriter) NeedsMigration() (bool, error)
    func (pgw *PostgresWriter) ReadMetricSchemaType() (err error)
    func (pgw *PostgresWriter) SyncMetric(sourceName, metricName string, op SyncOp) error
    func (pgw *PostgresWriter) Write(msg metrics.MeasurementEnvelope) error
    func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope)
    func (pgw *PostgresWriter) init() (err error)
    func (pgw *PostgresWriter) poll()
    func (pgw *PostgresWriter) scheduleJob(interval time.Duration, job func())
type PromMetricCache
type PrometheusWriter
    func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error)
    func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric)
    func (promw *PrometheusWriter) DefineMetrics(metrics *metrics.Metrics) (err error)
    func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc)
    func (promw *PrometheusWriter) MetricStoreMessageToPromMetrics(msg metrics.MeasurementEnvelope) []prometheus.Metric
    func (promw *PrometheusWriter) Println(v ...any)
    func (promw *PrometheusWriter) PromAsyncCacheAddMetricData(dbUnique, metric string, msgArr metrics.MeasurementEnvelope)
    func (promw *PrometheusWriter) PromAsyncCacheInitIfRequired(dbUnique, _ string)
    func (promw *PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metric string)
    func (promw *PrometheusWriter) SyncMetric(sourceName, metricName string, op SyncOp) error
    func (promw *PrometheusWriter) Write(msg metrics.MeasurementEnvelope) error
type RPCWriter
    func NewRPCWriter(ctx context.Context, connStr string) (*RPCWriter, error)
    func (rw *RPCWriter) DefineMetrics(metrics *metrics.Metrics) error
    func (rw *RPCWriter) Ping() error
    func (rw *RPCWriter) SyncMetric(sourceName, metricName string, op SyncOp) error
    func (rw *RPCWriter) Write(msg metrics.MeasurementEnvelope) error
    func (rw *RPCWriter) watchCtx()
type SyncOp
    func (s SyncOp) String() string
type Writer
    func NewSinkWriter(ctx context.Context, opts *CmdOpts) (w Writer, err error)
type copyFromMeasurements
    func newCopyFromMeasurements(rows []metrics.MeasurementEnvelope) *copyFromMeasurements
    func (c *copyFromMeasurements) EOF() bool
    func (c *copyFromMeasurements) Err() error
    func (c *copyFromMeasurements) MetricName() (ident pgx.Identifier)
    func (c *copyFromMeasurements) Next() bool
    func (c *copyFromMeasurements) NextEnvelope() bool
    func (c *copyFromMeasurements) Values() ([]any, error)

Package files

cmdopts.go doc.go json.go multiwriter.go postgres.go prometheus.go rpc.go types.go

Constants

MigrationsCount is the total number of migrations in admin.migration table

// NOTE(review): the migrations function in this package registers two
// migrations ("01110 ..." and "01180 ...") while this constant is 1 — confirm
// which value is intended.
const MigrationsCount = 1
// promInstanceUpStateMetric is the metric name "instance_up".
// NOTE(review): presumably the metric reporting whether a monitored instance is reachable — confirm in prometheus.go.
const promInstanceUpStateMetric = "instance_up"

Timestamps older than this are ignored on the Prometheus scraper side anyway, so it is better not to emit them at all and just log a notice.

// promScrapingStalenessHardDropLimit is a ten-minute age limit expressed as a time.Duration constant.
const promScrapingStalenessHardDropLimit = 10 * time.Minute

Variables

var (
    // cacheLimit is a size limit of 256.
    // NOTE(review): presumably the number of cached measurement envelopes that triggers a flush — confirm in postgres.go.
    cacheLimit = 256

    // highLoadTimeout is a five-second timeout.
    // NOTE(review): presumably how long Write waits on a busy input channel — confirm in postgres.go.
    highLoadTimeout = 5 * time.Second

    // targetColumns lists the four column names used for measurement rows.
    targetColumns = [4]string{
        "time",
        "dbname",
        "data",
        "tag_data",
    }
)
var ErrNeedsMigration = errors.New("sink database schema is outdated, please run migrations using `pgwatch config upgrade` command")

make sure *PostgresWriter implements the Migrator interface

var _ db.Migrator = (*PostgresWriter)(nil)
// initMigrator builds a migrator that tracks applied migrations in the
// "admin.migration" table, forwards migrator notices to the logger obtained
// from pgw.ctx at Info level, and is loaded with the package migrations.
var initMigrator = func(pgw *PostgresWriter) (*migrator.Migrator, error) {
    // Notices emitted by the migrator are logged rather than printed.
    logNotice := func(s string) {
        log.GetLogger(pgw.ctx).Info(s)
    }
    return migrator.New(
        migrator.TableName("admin.migration"),
        migrator.SetNotice(logNotice),
        migrations(),
    )
}
// metricSchemaSQLs collects the SQL scripts that make up the metric storage schema.
// NOTE(review): presumably executed in this order when the sink schema is initialised — confirm in postgres.go.
var metricSchemaSQLs = []string{
    sqlMetricAdminSchema,
    sqlMetricAdminFunctions,
    sqlMetricEnsurePartitionPostgres,
    sqlMetricEnsurePartitionTimescale,
    sqlMetricChangeChunkIntervalTimescale,
    sqlMetricChangeCompressionIntervalTimescale,
}

migrations holds a function returning all the upgrade migrations needed

var migrations func() migrator.Option = func() migrator.Option {
    // The first migration performs no work: its Func only reports success.
    schemaMigration := &migrator.Migration{
        Name: "01110 Apply postgres sink schema migrations",
        Func: func(context.Context, pgx.Tx) error { return nil },
    }

    // The second migration drops the listed admin functions and then runs the
    // partition and admin function scripts inside the supplied transaction.
    adminFunctionsMigration := &migrator.Migration{
        Name: "01180 Apply admin functions migrations for v5",
        Func: func(ctx context.Context, tx pgx.Tx) error {
            statements := [...]string{
                `
                    DROP FUNCTION IF EXISTS admin.ensure_partition_metric_dbname_time;
                    DROP FUNCTION IF EXISTS admin.ensure_partition_metric_time;
                    DROP FUNCTION IF EXISTS admin.get_old_time_partitions(integer, text);
                    DROP FUNCTION IF EXISTS admin.drop_old_time_partitions(integer, boolean, text);
                `,
                sqlMetricEnsurePartitionPostgres,
                sqlMetricAdminFunctions,
            }
            // Statements run in order; the first failure aborts the migration.
            for _, stmt := range statements {
                if _, err := tx.Exec(ctx, stmt); err != nil {
                    return err
                }
            }
            return nil
        },
    }

    return migrator.Migrations(schemaMigration, adminFunctionsMigration)
}
var sqlMetricAdminFunctions string
var sqlMetricAdminSchema string
var sqlMetricChangeChunkIntervalTimescale string
var sqlMetricChangeCompressionIntervalTimescale string
var sqlMetricEnsurePartitionPostgres string
var sqlMetricEnsurePartitionTimescale string

func LoadTLSCredentials

func LoadTLSCredentials(CAFile string) (credentials.TransportCredentials, error)

func NewPostgresSinkMigrator

func NewPostgresSinkMigrator(ctx context.Context, connStr string) (db.Migrator, error)

func convertSyncOp

func convertSyncOp(op SyncOp) pb.SyncOp

convertSyncOp converts sinks.SyncOp to pb.SyncOp

type CmdOpts

CmdOpts specifies the storage configuration to store metrics measurements

type CmdOpts struct {
    // Sinks lists the URIs where metrics will be stored; the option may be given multiple times.
    Sinks                 []string      `long:"sink" mapstructure:"sink" description:"URI where metrics will be stored, can be used multiple times" env:"PW_SINK"`
    // BatchingDelay is the sink-specific batching flush delay (default 950ms); some sinks may ignore it.
    BatchingDelay         time.Duration `long:"batching-delay" mapstructure:"batching-delay" description:"Sink-specific batching flush delay; may be ignored by some sinks" default:"950ms" env:"PW_BATCHING_DELAY"`
    // PartitionInterval is the time range of PostgreSQL sink time partitions, given as a PostgreSQL interval (default "1 week").
    PartitionInterval     string        `long:"partition-interval" mapstructure:"partition-interval" description:"Time range for PostgreSQL sink time partitions. Must be a valid PostgreSQL interval." default:"1 week" env:"PW_PARTITION_INTERVAL"`
    // RetentionInterval is the age beyond which metrics are deleted, given as a PostgreSQL interval (default "14 days").
    RetentionInterval     string        `long:"retention" mapstructure:"retention" description:"Delete metrics older than this. Must be a valid PostgreSQL interval." default:"14 days" env:"PW_RETENTION"`
    // MaintenanceInterval is how often maintenance tasks run on sinks, given as a PostgreSQL interval (default "12 hours"); zero disables them.
    MaintenanceInterval   string        `long:"maintenance-interval" mapstructure:"maintenance-interval" description:"Run pgwatch maintenance tasks on sinks with this interval e.g., deleting old metrics; Set to zero to disable. Must be a valid PostgreSQL interval." default:"12 hours" env:"PW_MAINTENANCE_INTERVAL"`
    // RealDbnameField is the tag key holding the real database name (default "real_dbname").
    RealDbnameField       string        `long:"real-dbname-field" mapstructure:"real-dbname-field" description:"Tag key for real database name" env:"PW_REAL_DBNAME_FIELD" default:"real_dbname"`
    // SystemIdentifierField is the tag key holding the system identifier value (default "sys_id").
    SystemIdentifierField string        `long:"system-identifier-field" mapstructure:"system-identifier-field" description:"Tag key for system identifier value" env:"PW_SYSTEM_IDENTIFIER_FIELD" default:"sys_id"`
}

type DbStorageSchemaType

// DbStorageSchemaType identifies the storage schema flavour of the sink database.
type DbStorageSchemaType int
const (
    // DbStorageSchemaPostgres is the zero value.
    // NOTE(review): presumably selects the plain PostgreSQL partitioning schema — confirm.
    DbStorageSchemaPostgres DbStorageSchemaType = iota
    // DbStorageSchemaTimescale has the value 1.
    // NOTE(review): presumably selects the TimescaleDB schema — confirm.
    DbStorageSchemaTimescale
)

type ExistingPartitionInfo

// ExistingPartitionInfo holds a pair of time bounds.
// NOTE(review): presumably the time range covered by already created partitions — confirm.
type ExistingPartitionInfo struct {
    // StartTime is the lower time bound.
    StartTime time.Time
    // EndTime is the upper time bound.
    // NOTE(review): inclusive vs. exclusive is not visible here — confirm.
    EndTime   time.Time
}

type JSONWriter

JSONWriter is a sink that writes metric measurements to a file in JSON format. It supports compression and rotation of output files. The default rotation is based on the file size (100 MB). JSONWriter is useful for debugging and testing purposes, as well as for integration with other systems such as log aggregators, analytics systems, data processing pipelines, ML models, etc.

type JSONWriter struct {
    // ctx is the context associated with the writer.
    // NOTE(review): presumably observed by watchCtx to shut the writer down — confirm.
    ctx context.Context
    // lw is the lumberjack logger.
    // NOTE(review): presumably the rotating output file the JSON is written to — confirm in NewJSONWriter.
    lw  *lumberjack.Logger
    // enc is the jsoniter encoder.
    // NOTE(review): presumably encodes measurements into lw — confirm in NewJSONWriter.
    enc *jsoniter.Encoder
}

func NewJSONWriter

func NewJSONWriter(ctx context.Context, fname string) (*JSONWriter, error)

func (*JSONWriter) SyncMetric

func (jw *JSONWriter) SyncMetric(_, _ string, _ SyncOp) error

func (*JSONWriter) Write

func (jw *JSONWriter) Write(msg metrics.MeasurementEnvelope) error

func (*JSONWriter) watchCtx

func (jw *JSONWriter) watchCtx()

type MeasurementMessagePostgres

// MeasurementMessagePostgres groups a timestamp, database name, metric name,
// data map and tag map.
// NOTE(review): presumably one measurement row as stored by the Postgres sink — confirm usage in postgres.go.
type MeasurementMessagePostgres struct {
    // Time is the timestamp of the measurement.
    Time    time.Time
    // DBName is the database name.
    DBName  string
    // Metric is the metric name.
    Metric  string
    // Data holds values keyed by string.
    Data    map[string]any
    // TagData holds string tags keyed by string.
    TagData map[string]string
}

type MetricsDefiner

MetricsDefiner is an interface for passing metric definitions to a sink.

type MetricsDefiner interface {
    // DefineMetrics passes the given metric definitions to the implementation
    // and reports an error if they could not be accepted.
    DefineMetrics(metrics *metrics.Metrics) error
}

type MultiWriter

MultiWriter ensures the simultaneous storage of data in several storages.

type MultiWriter struct {
    // writers holds the underlying Writer implementations.
    writers []Writer
    // NOTE(review): the embedded mutex presumably guards writers — confirm in multiwriter.go.
    sync.Mutex
}

func (*MultiWriter) AddWriter

func (mw *MultiWriter) AddWriter(w Writer)

func (*MultiWriter) Count

func (mw *MultiWriter) Count() int

func (*MultiWriter) DefineMetrics

func (mw *MultiWriter) DefineMetrics(metrics *metrics.Metrics) (err error)

func (*MultiWriter) Migrate

func (mw *MultiWriter) Migrate() (err error)

Migrate runs migrations on all writers that support it

func (*MultiWriter) NeedsMigration

func (mw *MultiWriter) NeedsMigration() (bool, error)

NeedsMigration checks if any writer needs migration

func (*MultiWriter) SyncMetric

func (mw *MultiWriter) SyncMetric(sourceName, metricName string, op SyncOp) (err error)

func (*MultiWriter) Write

func (mw *MultiWriter) Write(msg metrics.MeasurementEnvelope) (err error)

type PostgresWriter

PostgresWriter is a sink that writes metric measurements to a Postgres database. At the moment, it supports both Postgres and TimescaleDB as a storage backend. However, one is able to use any Postgres-compatible database as a storage backend, e.g. PGEE, Citus, Greenplum, CockroachDB, etc.

type PostgresWriter struct {
    // ctx is the context associated with the writer.
    ctx                      context.Context
    // sinkDb is the connection pool of the sink database.
    sinkDb                   db.PgxPoolIface
    // metricSchema is the storage schema flavour in use.
    metricSchema             DbStorageSchemaType
    // opts is the sink configuration.
    opts                     *CmdOpts
    // NOTE(review): presumably parsed from opts.RetentionInterval — confirm.
    retentionInterval        time.Duration
    // NOTE(review): presumably parsed from opts.MaintenanceInterval — confirm.
    maintenanceInterval      time.Duration
    // input is the channel carrying measurement envelopes.
    input                    chan metrics.MeasurementEnvelope
    // lastError is a channel of errors.
    // NOTE(review): presumably used to report flush errors back to Write — confirm.
    lastError                chan error
    forceRecreatePartitions  bool                                        // to signal override PG metrics storage cache
    partitionMapMetric       map[string]ExistingPartitionInfo            // metric -> min/max bounds
    partitionMapMetricDbname map[string]map[string]ExistingPartitionInfo // metric -> dbname -> min/max bounds
}

func NewPostgresWriter

func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts) (pgw *PostgresWriter, err error)

func NewWriterFromPostgresConn

func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts) (pgw *PostgresWriter, err error)

func (*PostgresWriter) AddDBUniqueMetricToListingTable

func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error

func (*PostgresWriter) DeleteOldPartitions

func (pgw *PostgresWriter) DeleteOldPartitions()

DeleteOldPartitions is a background task that deletes old partitions from the measurements DB

func (*PostgresWriter) EnsureBuiltinMetricDummies

func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error)

EnsureBuiltinMetricDummies creates empty tables for all built-in metrics if they don't exist

func (*PostgresWriter) EnsureMetricDbnameTime

func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo) (err error)

func (*PostgresWriter) EnsureMetricDummy

func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error)

EnsureMetricDummy creates an empty table for a metric's measurements if it doesn't exist

func (*PostgresWriter) EnsureMetricTimescale

func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo) (err error)

func (*PostgresWriter) MaintainUniqueSources

func (pgw *PostgresWriter) MaintainUniqueSources()

MaintainUniqueSources is a background task that maintains a mapping of unique sources in each metric table in admin.all_distinct_dbname_metrics. This is used to avoid listing the same source multiple times in Grafana dropdowns.

func (*PostgresWriter) Migrate

func (pgw *PostgresWriter) Migrate() error

Migrate upgrades database with all migrations

func (*PostgresWriter) NeedsMigration

func (pgw *PostgresWriter) NeedsMigration() (bool, error)

NeedsMigration checks if database needs migration

func (*PostgresWriter) ReadMetricSchemaType

func (pgw *PostgresWriter) ReadMetricSchemaType() (err error)

func (*PostgresWriter) SyncMetric

func (pgw *PostgresWriter) SyncMetric(sourceName, metricName string, op SyncOp) error

SyncMetric ensures that tables exist for newly added metrics and/or sources

func (*PostgresWriter) Write

func (pgw *PostgresWriter) Write(msg metrics.MeasurementEnvelope) error

Write sends the measurements to the cache channel

func (*PostgresWriter) flush

func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope)

flush sends the cached measurements to the database

func (*PostgresWriter) init

func (pgw *PostgresWriter) init() (err error)

func (*PostgresWriter) poll

func (pgw *PostgresWriter) poll()

poll is the main loop that reads from the input channel and flushes the data to the database

func (*PostgresWriter) scheduleJob

func (pgw *PostgresWriter) scheduleJob(interval time.Duration, job func())

type PromMetricCache

// PromMetricCache is an alias for a two-level map of measurement envelopes,
// keyed first by dbUnique and then by metric name.
type PromMetricCache = map[string]map[string]metrics.MeasurementEnvelope // [dbUnique][metric]lastly_fetched_data

type PrometheusWriter

PrometheusWriter is a sink that exposes metric measurements to the Prometheus scraper. Prometheus collects metrics data from pgwatch by scraping metrics HTTP endpoints.

type PrometheusWriter struct {
    // NOTE(review): the embedded RWMutex presumably guards Cache — confirm in prometheus.go.
    sync.RWMutex
    // logger is the logger used by the writer.
    logger              log.Logger
    // ctx is the context associated with the writer.
    ctx                 context.Context
    // lastScrapeErrors is a Prometheus gauge.
    // NOTE(review): presumably the error count of the most recent scrape — confirm.
    lastScrapeErrors    prometheus.Gauge
    // totalScrapes is a Prometheus counter.
    // NOTE(review): presumably incremented on every scrape — confirm.
    totalScrapes        prometheus.Counter
    // totalScrapeFailures is a Prometheus counter.
    // NOTE(review): presumably incremented on failed scrapes — confirm.
    totalScrapeFailures prometheus.Counter
    gauges              map[string]([]string) // map of metric names to their gauge names, used for Prometheus gauge metrics
    // Namespace is a string namespace.
    // NOTE(review): presumably prefixed to exposed metric names — confirm.
    Namespace           string
    Cache               PromMetricCache // [dbUnique][metric]lastly_fetched_data
}

func NewPrometheusWriter

func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error)

func (*PrometheusWriter) Collect

func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric)

func (*PrometheusWriter) DefineMetrics

func (promw *PrometheusWriter) DefineMetrics(metrics *metrics.Metrics) (err error)

func (*PrometheusWriter) Describe

func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc)

func (*PrometheusWriter) MetricStoreMessageToPromMetrics

func (promw *PrometheusWriter) MetricStoreMessageToPromMetrics(msg metrics.MeasurementEnvelope) []prometheus.Metric

func (*PrometheusWriter) Println

func (promw *PrometheusWriter) Println(v ...any)

func (*PrometheusWriter) PromAsyncCacheAddMetricData

func (promw *PrometheusWriter) PromAsyncCacheAddMetricData(dbUnique, metric string, msgArr metrics.MeasurementEnvelope)

func (*PrometheusWriter) PromAsyncCacheInitIfRequired

func (promw *PrometheusWriter) PromAsyncCacheInitIfRequired(dbUnique, _ string)

func (*PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny

func (promw *PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metric string)

func (*PrometheusWriter) SyncMetric

func (promw *PrometheusWriter) SyncMetric(sourceName, metricName string, op SyncOp) error

func (*PrometheusWriter) Write

func (promw *PrometheusWriter) Write(msg metrics.MeasurementEnvelope) error

type RPCWriter

RPCWriter sends metric measurements to a remote server using gRPC. Remote servers should make use of the .proto file under api/pb/ to integrate with it. It's up to the implementer to define the behavior of the server. It can be a simple logger, external storage, alerting system, or an analytics system.

type RPCWriter struct {
    // ctx is the context associated with the writer.
    // NOTE(review): presumably observed by watchCtx to close the connection — confirm.
    ctx    context.Context
    // conn is the gRPC client connection.
    conn   *grpc.ClientConn
    // client is the generated pb.ReceiverClient.
    // NOTE(review): presumably created on top of conn — confirm in NewRPCWriter.
    client pb.ReceiverClient
}

func NewRPCWriter

func NewRPCWriter(ctx context.Context, connStr string) (*RPCWriter, error)

func (*RPCWriter) DefineMetrics

func (rw *RPCWriter) DefineMetrics(metrics *metrics.Metrics) error

DefineMetrics sends metric definitions to the remote server

func (*RPCWriter) Ping

func (rw *RPCWriter) Ping() error

func (*RPCWriter) SyncMetric

func (rw *RPCWriter) SyncMetric(sourceName, metricName string, op SyncOp) error

SyncMetric synchronizes a metric and monitored source with the remote server

func (*RPCWriter) Write

func (rw *RPCWriter) Write(msg metrics.MeasurementEnvelope) error

Write sends the measurement message to the RPC sink

func (*RPCWriter) watchCtx

func (rw *RPCWriter) watchCtx()

type SyncOp

SyncOp represents synchronization operations for metrics. These constants are used both in Go code and protobuf definitions.

type SyncOp int32
// Values are assigned sequentially via iota, starting with InvalidOp = 0.
// NOTE(review): the numeric values presumably have to match pb.SyncOp (see convertSyncOp) — confirm before reordering.
const (
    // InvalidOp represents an invalid or unrecognized operation
    InvalidOp SyncOp = iota // 0
    // AddOp represents adding a new metric
    AddOp // 1
    // DeleteOp represents deleting an existing metric or entire source
    DeleteOp // 2
    // DefineOp represents defining metric definitions
    DefineOp // 3
)

func (SyncOp) String

func (s SyncOp) String() string

String returns the string representation of the SyncOp

type Writer

Writer is an interface that writes metrics values

type Writer interface {
    // SyncMetric informs the sink about operation op for the metric
    // metricName of the source sourceName.
    SyncMetric(sourceName, metricName string, op SyncOp) error
    // Write hands a measurement envelope to the sink.
    // NOTE(review): the parameter is named msgs although a single envelope is passed.
    Write(msgs metrics.MeasurementEnvelope) error
}

func NewSinkWriter

func NewSinkWriter(ctx context.Context, opts *CmdOpts) (w Writer, err error)

NewSinkWriter creates and returns a new instance of the MultiWriter struct.

type copyFromMeasurements

// copyFromMeasurements is a cursor over a slice of measurement envelopes.
// NOTE(review): its Next/Values/Err method set looks like pgx.CopyFromSource — confirm.
type copyFromMeasurements struct {
    // envelopes is the slice of envelopes being iterated.
    envelopes      []metrics.MeasurementEnvelope
    // envelopeIdx is an index into envelopes.
    // NOTE(review): presumably the envelope currently being read — confirm.
    envelopeIdx    int
    measurementIdx int // index of the current measurement in the envelope
    // metricName is a metric name.
    // NOTE(review): presumably the metric of the current envelope, see MetricName — confirm.
    metricName     string
}

func newCopyFromMeasurements

func newCopyFromMeasurements(rows []metrics.MeasurementEnvelope) *copyFromMeasurements

func (*copyFromMeasurements) EOF

func (c *copyFromMeasurements) EOF() bool

func (*copyFromMeasurements) Err

func (c *copyFromMeasurements) Err() error

func (*copyFromMeasurements) MetricName

func (c *copyFromMeasurements) MetricName() (ident pgx.Identifier)

func (*copyFromMeasurements) Next

func (c *copyFromMeasurements) Next() bool

func (*copyFromMeasurements) NextEnvelope

func (c *copyFromMeasurements) NextEnvelope() bool

func (*copyFromMeasurements) Values

func (c *copyFromMeasurements) Values() ([]any, error)