...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/sinks/postgres.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/sinks

     1  package sinks
     2  
     3  import (
     4  	"context"
     5  	_ "embed"
     6  	"errors"
     7  	"fmt"
     8  	"maps"
     9  	"slices"
    10  	"strings"
    11  	"time"
    12  
    13  	jsoniter "github.com/json-iterator/go"
    14  
    15  	"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
    16  	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
    17  	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
    18  	"github.com/jackc/pgx/v5"
    19  	"github.com/jackc/pgx/v5/pgconn"
    20  )
    21  
// Tunables shared by all PostgresWriter instances.
var (
	// cacheLimit is both the capacity of the input channel and the number
	// of buffered envelopes that triggers an early flush in poll().
	cacheLimit      = 256
	// highLoadTimeout caps how long Write blocks on a full input channel
	// before dropping the measurement.
	highLoadTimeout = time.Second * 5
	// targetColumns is the COPY column list of every measurement table.
	targetColumns   = [...]string{"time", "dbname", "data", "tag_data"}
)
    27  
// PostgresWriter is a sink that writes metric measurements to a Postgres database.
// At the moment, it supports both Postgres and TimescaleDB as a storage backend.
// However, one is able to use any Postgres-compatible database as a storage backend,
// e.g. PGEE, Citus, Greenplum, CockroachDB, etc.
type PostgresWriter struct {
	ctx                 context.Context // writer-scoped context; cancelling it stops poll() and scheduled jobs
	sinkDb              db.PgxPoolIface // connection pool to the measurements database
	metricSchema        DbStorageSchemaType // plain Postgres vs. TimescaleDB layout, detected at startup
	opts                *CmdOpts
	retentionInterval   time.Duration // how long measurements are kept; 0 disables retention
	maintenanceInterval time.Duration // period of MaintainUniqueSources; 0 disables it
	input               chan metrics.MeasurementEnvelope // buffered hand-off from Write to poll
	lastError           chan error // unbuffered; flush reports errors, Write drains opportunistically
}
    42  
    43  func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts) (pgw *PostgresWriter, err error) {
    44  	var conn db.PgxPoolIface
    45  	if conn, err = db.New(ctx, connstr); err != nil {
    46  		return
    47  	}
    48  	return NewWriterFromPostgresConn(ctx, conn, opts)
    49  }
    50  
// NewWriterFromPostgresConn initializes the sink schema on an existing
// connection pool, validates the interval options, schedules background
// maintenance/retention jobs and starts the write-behind poller goroutine.
func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts) (pgw *PostgresWriter, err error) {
	l := log.GetLogger(ctx).WithField("sink", "postgres").WithField("db", conn.Config().ConnConfig.Database)
	ctx = log.WithLogger(ctx, l)
	pgw = &PostgresWriter{
		ctx:       ctx,
		opts:      opts,
		input:     make(chan metrics.MeasurementEnvelope, cacheLimit),
		lastError: make(chan error),
		sinkDb:    conn,
	}
	if err = db.Init(ctx, pgw.sinkDb, func(ctx context.Context, conn db.PgxIface) error {
		var isValidPartitionInterval bool
		// parse and validate the user-supplied intervals server-side, so any
		// PostgreSQL interval syntax is accepted
		if err = conn.QueryRow(ctx,
			"SELECT extract(epoch from $1::interval), extract(epoch from $2::interval), $3::interval >= '1h'::interval",
			opts.RetentionInterval, opts.MaintenanceInterval, opts.PartitionInterval,
		).Scan(&pgw.retentionInterval, &pgw.maintenanceInterval, &isValidPartitionInterval); err != nil {
			return err
		}

		// epoch returns seconds but time.Duration represents nanoseconds
		pgw.retentionInterval *= time.Second
		pgw.maintenanceInterval *= time.Second

		if !isValidPartitionInterval {
			return fmt.Errorf("--partition-interval must be at least 1 hour, got: %s", opts.PartitionInterval)
		}
		if pgw.maintenanceInterval < 0 {
			return errors.New("--maintenance-interval must be a positive PostgreSQL interval or 0 to disable it")
		}
		if pgw.retentionInterval < time.Hour && pgw.retentionInterval != 0 {
			return errors.New("--retention must be at least 1 hour PostgreSQL interval or 0 to disable it")
		}

		l.Info("initialising measurements database...")
		// an existing "admin" schema is taken to mean the sink database has
		// already been initialized; skip running the bootstrap scripts
		exists, err := db.DoesSchemaExist(ctx, conn, "admin")
		if err != nil || exists {
			return err
		}
		for _, sql := range metricSchemaSQLs {
			if _, err = conn.Exec(ctx, sql); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		return
	}
	if err = pgw.ReadMetricSchemaType(); err != nil {
		return
	}
	// make sure built-in metrics have at least empty top-level tables
	if err = pgw.EnsureBuiltinMetricDummies(); err != nil {
		return
	}

	// intervals of 0 disable the corresponding background job (see scheduleJob)
	pgw.scheduleJob(pgw.maintenanceInterval, pgw.MaintainUniqueSources)
	pgw.scheduleJob(pgw.retentionInterval, pgw.DeleteOldPartitions)

	go pgw.poll()
	l.Info(`measurements sink is activated`)
	return
}
   112  
// SQL bootstrap scripts embedded at build time. They are executed in the
// order listed in metricSchemaSQLs by NewWriterFromPostgresConn when the
// "admin" schema is missing from the sink database.

//go:embed sql/admin_schema.sql
var sqlMetricAdminSchema string

//go:embed sql/admin_functions.sql
var sqlMetricAdminFunctions string

//go:embed sql/ensure_partition_postgres.sql
var sqlMetricEnsurePartitionPostgres string

//go:embed sql/ensure_partition_timescale.sql
var sqlMetricEnsurePartitionTimescale string

//go:embed sql/change_chunk_interval.sql
var sqlMetricChangeChunkIntervalTimescale string

//go:embed sql/change_compression_interval.sql
var sqlMetricChangeCompressionIntervalTimescale string

// metricSchemaSQLs lists the bootstrap scripts in execution order:
// schema first, then functions, then the partition helpers.
var (
	metricSchemaSQLs = []string{
		sqlMetricAdminSchema,
		sqlMetricAdminFunctions,
		sqlMetricEnsurePartitionPostgres,
		sqlMetricEnsurePartitionTimescale,
		sqlMetricChangeChunkIntervalTimescale,
		sqlMetricChangeCompressionIntervalTimescale,
	}
)
   141  
// ExistingPartitionInfo holds the time range covered by an already ensured
// partition; the same shape is reused to carry min/max bounds of a batch.
type ExistingPartitionInfo struct {
	StartTime time.Time
	EndTime   time.Time
}
   146  
// MeasurementMessagePostgres models a single measurement row as stored in
// the sink database: timestamp, source database, metric name, the JSON
// payload and its tags.
// NOTE(review): not referenced anywhere in this file — confirm it is still
// used elsewhere before relying on it.
type MeasurementMessagePostgres struct {
	Time    time.Time
	DBName  string
	Metric  string
	Data    map[string]any
	TagData map[string]string
}
   154  
// DbStorageSchemaType enumerates the supported measurement storage layouts.
type DbStorageSchemaType int

const (
	// DbStorageSchemaPostgres is the plain PostgreSQL partitioned layout.
	DbStorageSchemaPostgres DbStorageSchemaType = iota
	// DbStorageSchemaTimescale is the TimescaleDB hypertable layout.
	DbStorageSchemaTimescale
)
   161  
   162  func (pgw *PostgresWriter) scheduleJob(interval time.Duration, job func()) {
   163  	if interval > 0 {
   164  		go func() {
   165  			for {
   166  				select {
   167  				case <-pgw.ctx.Done():
   168  					return
   169  				case <-time.After(interval):
   170  					job()
   171  				}
   172  			}
   173  		}()
   174  	}
   175  }
   176  
   177  func (pgw *PostgresWriter) ReadMetricSchemaType() (err error) {
   178  	var isTs bool
   179  	pgw.metricSchema = DbStorageSchemaPostgres
   180  	sqlSchemaType := `SELECT schema_type = 'timescale' FROM admin.storage_schema_type`
   181  	if err = pgw.sinkDb.QueryRow(pgw.ctx, sqlSchemaType).Scan(&isTs); err == nil && isTs {
   182  		pgw.metricSchema = DbStorageSchemaTimescale
   183  	}
   184  	return
   185  }
   186  
// Package-level partition bookkeeping. These are only touched from the
// flush path (poll goroutine) in this file; note they are shared across
// writer instances within the process.
var (
	forceRecreatePartitions  = false                                             // to signal override PG metrics storage cache
	partitionMapMetric       = make(map[string]ExistingPartitionInfo)            // metric = min/max bounds
	partitionMapMetricDbname = make(map[string]map[string]ExistingPartitionInfo) // metric[dbname = min/max bounds]
)
   192  
   193  // SyncMetric ensures that tables exist for newly added metrics and/or sources
   194  func (pgw *PostgresWriter) SyncMetric(sourceName, metricName string, op SyncOp) error {
   195  	if op == AddOp {
   196  		return errors.Join(
   197  			pgw.AddDBUniqueMetricToListingTable(sourceName, metricName),
   198  			pgw.EnsureMetricDummy(metricName), // ensure that there is at least an empty top-level table not to get ugly Grafana notifications
   199  		)
   200  	}
   201  	return nil
   202  }
   203  
   204  // EnsureBuiltinMetricDummies creates empty tables for all built-in metrics if they don't exist
   205  func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error) {
   206  	for _, name := range metrics.GetDefaultBuiltInMetrics() {
   207  		err = errors.Join(err, pgw.EnsureMetricDummy(name))
   208  	}
   209  	return
   210  }
   211  
   212  // EnsureMetricDummy creates an empty table for a metric measurements if it doesn't exist
   213  func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error) {
   214  	_, err = pgw.sinkDb.Exec(pgw.ctx, "SELECT admin.ensure_dummy_metrics_table($1)", metric)
   215  	return
   216  }
   217  
   218  // Write sends the measurements to the cache channel
   219  func (pgw *PostgresWriter) Write(msg metrics.MeasurementEnvelope) error {
   220  	if pgw.ctx.Err() != nil {
   221  		return pgw.ctx.Err()
   222  	}
   223  	select {
   224  	case pgw.input <- msg:
   225  		// msgs sent
   226  	case <-time.After(highLoadTimeout):
   227  		// msgs dropped due to a huge load, check stdout or file for detailed log
   228  	}
   229  	select {
   230  	case err := <-pgw.lastError:
   231  		return err
   232  	default:
   233  		return nil
   234  	}
   235  }
   236  
   237  // poll is the main loop that reads from the input channel and flushes the data to the database
   238  func (pgw *PostgresWriter) poll() {
   239  	cache := make([]metrics.MeasurementEnvelope, 0, cacheLimit)
   240  	cacheTimeout := pgw.opts.BatchingDelay
   241  	tick := time.NewTicker(cacheTimeout)
   242  	for {
   243  		select {
   244  		case <-pgw.ctx.Done(): //check context with high priority
   245  			return
   246  		default:
   247  			select {
   248  			case entry := <-pgw.input:
   249  				cache = append(cache, entry)
   250  				if len(cache) < cacheLimit {
   251  					break
   252  				}
   253  				tick.Stop()
   254  				pgw.flush(cache)
   255  				cache = cache[:0]
   256  				tick = time.NewTicker(cacheTimeout)
   257  			case <-tick.C:
   258  				pgw.flush(cache)
   259  				cache = cache[:0]
   260  			case <-pgw.ctx.Done():
   261  				return
   262  			}
   263  		}
   264  	}
   265  }
   266  
   267  func newCopyFromMeasurements(rows []metrics.MeasurementEnvelope) *copyFromMeasurements {
   268  	return &copyFromMeasurements{envelopes: rows, envelopeIdx: -1, measurementIdx: -1}
   269  }
   270  
// copyFromMeasurements adapts a metric-name-sorted slice of measurement
// envelopes to pgx's CopyFrom source interface. Next stops at metric
// boundaries, so each CopyFrom call consumes exactly one contiguous
// per-metric batch (see flush).
type copyFromMeasurements struct {
	envelopes      []metrics.MeasurementEnvelope
	envelopeIdx    int // index of the envelope being consumed; -1 before the first Next
	measurementIdx int // index of the current measurement in the envelope
	metricName     string // metric currently being streamed; "" marks a fresh batch
}
   277  
   278  func (c *copyFromMeasurements) NextEnvelope() bool {
   279  	c.envelopeIdx++
   280  	c.measurementIdx = -1
   281  	return c.envelopeIdx < len(c.envelopes)
   282  }
   283  
// Next advances to the next measurement, implementing the CopyFrom source
// contract. It returns false both when all envelopes are exhausted and when
// the upcoming envelope belongs to a different metric — in the latter case
// the cursor is rewound so the next CopyFrom call picks up the new metric's
// batch from its first row. Relies on the envelopes being pre-sorted by
// metric name (done in flush).
func (c *copyFromMeasurements) Next() bool {
	for {
		// Check if we need to advance to the next envelope
		if c.envelopeIdx < 0 || c.measurementIdx+1 >= len(c.envelopes[c.envelopeIdx].Data) {
			// Advance to next envelope
			if ok := c.NextEnvelope(); !ok {
				return false // No more envelopes
			}
			// Set metric name from first envelope, or detect metric boundary
			if c.metricName == "" {
				c.metricName = c.envelopes[c.envelopeIdx].MetricName
			} else if c.metricName != c.envelopes[c.envelopeIdx].MetricName {
				// We've hit a different metric - we're done with current metric
				// Reset position to process this envelope on next call
				c.envelopeIdx--
				c.measurementIdx = len(c.envelopes[c.envelopeIdx].Data) // Set to length so we've "finished" this envelope
				c.metricName = ""                                       // Reset for next metric
				return false
			}
		}

		// Advance to next measurement in current envelope
		c.measurementIdx++
		if c.measurementIdx < len(c.envelopes[c.envelopeIdx].Data) {
			return true // Found valid measurement
		}
		// If we reach here, we've exhausted current envelope, loop will advance to next envelope
	}
}
   313  
   314  func (c *copyFromMeasurements) EOF() bool {
   315  	return c.envelopeIdx >= len(c.envelopes)
   316  }
   317  
   318  func (c *copyFromMeasurements) Values() ([]any, error) {
   319  	row := maps.Clone(c.envelopes[c.envelopeIdx].Data[c.measurementIdx])
   320  	tagRow := maps.Clone(c.envelopes[c.envelopeIdx].CustomTags)
   321  	if tagRow == nil {
   322  		tagRow = make(map[string]string)
   323  	}
   324  	for k, v := range row {
   325  		if after, ok := strings.CutPrefix(k, metrics.TagPrefix); ok {
   326  			tagRow[after] = fmt.Sprintf("%v", v)
   327  			delete(row, k)
   328  		}
   329  	}
   330  	jsonTags, terr := jsoniter.ConfigFastest.MarshalToString(tagRow)
   331  	json, err := jsoniter.ConfigFastest.MarshalToString(row)
   332  	if err != nil || terr != nil {
   333  		return nil, errors.Join(err, terr)
   334  	}
   335  	return []any{time.Unix(0, c.envelopes[c.envelopeIdx].Data.GetEpoch()), c.envelopes[c.envelopeIdx].DBName, json, jsonTags}, nil
   336  }
   337  
   338  func (c *copyFromMeasurements) Err() error {
   339  	return nil
   340  }
   341  
// MetricName returns the identifier of the table targeted by the upcoming
// COPY batch. Next parks envelopeIdx on the envelope *before* the next
// metric (and at -1 initially), hence the +1 lookahead; past the last
// envelope it returns the zero identifier.
func (c *copyFromMeasurements) MetricName() (ident pgx.Identifier) {
	if c.envelopeIdx+1 < len(c.envelopes) {
		// Metric name is taken from the next envelope
		ident = pgx.Identifier{c.envelopes[c.envelopeIdx+1].MetricName}
	}
	return
}
   349  
   350  // flush sends the cached measurements to the database
   351  func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
   352  	if len(msgs) == 0 {
   353  		return
   354  	}
   355  	logger := log.GetLogger(pgw.ctx)
   356  	pgPartBounds := make(map[string]ExistingPartitionInfo)                  // metric=min/max
   357  	pgPartBoundsDbName := make(map[string]map[string]ExistingPartitionInfo) // metric=[dbname=min/max]
   358  	var err error
   359  
   360  	slices.SortFunc(msgs, func(a, b metrics.MeasurementEnvelope) int {
   361  		if a.MetricName < b.MetricName {
   362  			return -1
   363  		} else if a.MetricName > b.MetricName {
   364  			return 1
   365  		}
   366  		return 0
   367  	})
   368  
   369  	for _, msg := range msgs {
   370  		for _, dataRow := range msg.Data {
   371  			epochTime := time.Unix(0, metrics.Measurement(dataRow).GetEpoch())
   372  			switch pgw.metricSchema {
   373  			case DbStorageSchemaTimescale:
   374  				// set min/max timestamps to check/create partitions
   375  				bounds, ok := pgPartBounds[msg.MetricName]
   376  				if !ok || (ok && epochTime.Before(bounds.StartTime)) {
   377  					bounds.StartTime = epochTime
   378  					pgPartBounds[msg.MetricName] = bounds
   379  				}
   380  				if !ok || (ok && epochTime.After(bounds.EndTime)) {
   381  					bounds.EndTime = epochTime
   382  					pgPartBounds[msg.MetricName] = bounds
   383  				}
   384  			case DbStorageSchemaPostgres:
   385  				_, ok := pgPartBoundsDbName[msg.MetricName]
   386  				if !ok {
   387  					pgPartBoundsDbName[msg.MetricName] = make(map[string]ExistingPartitionInfo)
   388  				}
   389  				bounds, ok := pgPartBoundsDbName[msg.MetricName][msg.DBName]
   390  				if !ok || (ok && epochTime.Before(bounds.StartTime)) {
   391  					bounds.StartTime = epochTime
   392  					pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
   393  				}
   394  				if !ok || (ok && epochTime.After(bounds.EndTime)) {
   395  					bounds.EndTime = epochTime
   396  					pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
   397  				}
   398  			default:
   399  				logger.Fatal("unknown storage schema...")
   400  			}
   401  		}
   402  	}
   403  
   404  	switch pgw.metricSchema {
   405  	case DbStorageSchemaPostgres:
   406  		err = pgw.EnsureMetricDbnameTime(pgPartBoundsDbName, forceRecreatePartitions)
   407  	case DbStorageSchemaTimescale:
   408  		err = pgw.EnsureMetricTimescale(pgPartBounds)
   409  	default:
   410  		logger.Fatal("unknown storage schema...")
   411  	}
   412  	forceRecreatePartitions = false
   413  	if err != nil {
   414  		pgw.lastError <- err
   415  	}
   416  
   417  	var rowsBatched, n int64
   418  	t1 := time.Now()
   419  	cfm := newCopyFromMeasurements(msgs)
   420  	for !cfm.EOF() {
   421  		n, err = pgw.sinkDb.CopyFrom(context.Background(), cfm.MetricName(), targetColumns[:], cfm)
   422  		rowsBatched += n
   423  		if err != nil {
   424  			logger.Error(err)
   425  			if PgError, ok := err.(*pgconn.PgError); ok {
   426  				forceRecreatePartitions = PgError.Code == "23514"
   427  			}
   428  			if forceRecreatePartitions {
   429  				logger.Warning("Some metric partitions might have been removed, halting all metric storage. Trying to re-create all needed partitions on next run")
   430  			}
   431  		}
   432  	}
   433  	diff := time.Since(t1)
   434  	if err == nil {
   435  		logger.WithField("rows", rowsBatched).WithField("elapsed", diff).Info("measurements written")
   436  		return
   437  	}
   438  	pgw.lastError <- err
   439  }
   440  
   441  func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo) (err error) {
   442  	logger := log.GetLogger(pgw.ctx)
   443  	sqlEnsure := `select * from admin.ensure_partition_timescale($1)`
   444  	for metric := range pgPartBounds {
   445  		if _, ok := partitionMapMetric[metric]; !ok {
   446  			if _, err = pgw.sinkDb.Exec(pgw.ctx, sqlEnsure, metric); err != nil {
   447  				logger.Errorf("Failed to create a TimescaleDB table for metric '%s': %v", metric, err)
   448  				return err
   449  			}
   450  			partitionMapMetric[metric] = ExistingPartitionInfo{}
   451  		}
   452  	}
   453  	return
   454  }
   455  
   456  func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error) {
   457  	var rows pgx.Rows
   458  	sqlEnsure := `select * from admin.ensure_partition_metric_dbname_time($1, $2, $3, $4)`
   459  	for metric, dbnameTimestampMap := range metricDbnamePartBounds {
   460  		_, ok := partitionMapMetricDbname[metric]
   461  		if !ok {
   462  			partitionMapMetricDbname[metric] = make(map[string]ExistingPartitionInfo)
   463  		}
   464  
   465  		for dbname, pb := range dbnameTimestampMap {
   466  			if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
   467  				return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
   468  			}
   469  			partInfo, ok := partitionMapMetricDbname[metric][dbname]
   470  			if !ok || (ok && (pb.StartTime.Before(partInfo.StartTime))) || force {
   471  				if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime, pgw.opts.PartitionInterval); err != nil {
   472  					return
   473  				}
   474  				if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
   475  					return err
   476  				}
   477  				partitionMapMetricDbname[metric][dbname] = partInfo
   478  			}
   479  			if pb.EndTime.After(partInfo.EndTime) || pb.EndTime.Equal(partInfo.EndTime) || force {
   480  				if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime, pgw.opts.PartitionInterval); err != nil {
   481  					return
   482  				}
   483  				if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
   484  					return err
   485  				}
   486  				partitionMapMetricDbname[metric][dbname] = partInfo
   487  			}
   488  		}
   489  	}
   490  	return nil
   491  }
   492  
   493  // DeleteOldPartitions is a background task that deletes old partitions from the measurements DB
   494  func (pgw *PostgresWriter) DeleteOldPartitions() {
   495  	l := log.GetLogger(pgw.ctx)
   496  	var partsDropped int
   497  	err := pgw.sinkDb.QueryRow(pgw.ctx, `SELECT admin.drop_old_time_partitions(older_than => $1::interval)`,
   498  		pgw.opts.RetentionInterval).Scan(&partsDropped)
   499  	if err != nil {
   500  		l.Error("Could not drop old time partitions:", err)
   501  	} else if partsDropped > 0 {
   502  		l.Infof("Dropped %d old time partitions", partsDropped)
   503  	}
   504  }
   505  
   506  // MaintainUniqueSources is a background task that maintains a mapping of unique sources
   507  // in each metric table in admin.all_distinct_dbname_metrics.
   508  // This is used to avoid listing the same source multiple times in Grafana dropdowns.
   509  func (pgw *PostgresWriter) MaintainUniqueSources() {
   510  	logger := log.GetLogger(pgw.ctx)
   511  
   512  	sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock` // 1571543679778230000 is just a random bigint
   513  	sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
   514  	sqlDistinct := `
   515  	WITH RECURSIVE t(dbname) AS (
   516  		SELECT MIN(dbname) AS dbname FROM %s
   517  		UNION
   518  		SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t 
   519  	)
   520  	SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
   521  	sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2`
   522  	sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1`
   523  	sqlDroppedTables := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric != ALL($1)`
   524  	sqlAdd := `
   525  		INSERT INTO admin.all_distinct_dbname_metrics 
   526  		SELECT u, $2 FROM (select unnest($1::text[]) as u) x
   527  		WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2)
   528  		RETURNING *`
   529  
   530  	var lock bool
   531  	logger.Infof("Trying to get admin.all_distinct_dbname_metrics maintainer advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly
   532  	if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
   533  		logger.Error("Getting admin.all_distinct_dbname_metrics maintainer advisory lock failed:", err)
   534  		return
   535  	}
   536  	if !lock {
   537  		logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...")
   538  		return
   539  	}
   540  
   541  	logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
   542  	rows, _ := pgw.sinkDb.Query(pgw.ctx, sqlTopLevelMetrics)
   543  	allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
   544  	if err != nil {
   545  		logger.Error(err)
   546  		return
   547  	}
   548  
   549  	for i, tableName := range allDistinctMetricTables {
   550  		foundDbnamesMap := make(map[string]bool)
   551  		foundDbnamesArr := make([]string, 0)
   552  
   553  		metricName := strings.Replace(tableName, "public.", "", 1)
   554  		// later usage in sqlDroppedTables requires no "public." prefix
   555  		allDistinctMetricTables[i] = metricName
   556  
   557  		logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
   558  		rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
   559  		ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
   560  		if err != nil {
   561  			logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
   562  			continue
   563  		}
   564  		for _, drDbname := range ret {
   565  			foundDbnamesMap[drDbname] = true // "set" behaviour, don't want duplicates
   566  		}
   567  
   568  		// delete all that are not known and add all that are not there
   569  		for k := range foundDbnamesMap {
   570  			foundDbnamesArr = append(foundDbnamesArr, k)
   571  		}
   572  		if len(foundDbnamesArr) == 0 { // delete all entries for given metric
   573  			logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName)
   574  
   575  			_, err = pgw.sinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName)
   576  			if err != nil {
   577  				logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
   578  			}
   579  			continue
   580  		}
   581  		cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName)
   582  		if err != nil {
   583  			logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
   584  		} else if cmdTag.RowsAffected() > 0 {
   585  			logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
   586  		}
   587  		cmdTag, err = pgw.sinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName)
   588  		if err != nil {
   589  			logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
   590  		} else if cmdTag.RowsAffected() > 0 {
   591  			logger.Infof("Added %d entry to the Postgres all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
   592  		}
   593  		time.Sleep(time.Minute)
   594  	}
   595  
   596  	cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDroppedTables, allDistinctMetricTables)
   597  	if err != nil {
   598  		logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for dropped metric tables: %s", err)
   599  	} else if cmdTag.RowsAffected() > 0 {
   600  		logger.Infof("Removed %d stale entries for dropped tables from all_distinct_dbname_metrics listing table", cmdTag.RowsAffected())
   601  	}
   602  }
   603  
   604  func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error {
   605  	sql := `INSERT INTO admin.all_distinct_dbname_metrics
   606  			SELECT $1, $2
   607  			WHERE NOT EXISTS (
   608  				SELECT * FROM admin.all_distinct_dbname_metrics WHERE dbname = $1 AND metric = $2
   609  			)`
   610  	_, err := pgw.sinkDb.Exec(pgw.ctx, sql, dbUnique, metric)
   611  	return err
   612  }
   613