...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/sinks/postgres.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/sinks

     1  package sinks
     2  
     3  import (
     4  	"context"
     5  	_ "embed"
     6  	"errors"
     7  	"fmt"
     8  	"maps"
     9  	"slices"
    10  	"strings"
    11  	"time"
    12  
    13  	jsoniter "github.com/json-iterator/go"
    14  
    15  	"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
    16  	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
    17  	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
    18  	"github.com/jackc/pgx/v5"
    19  	"github.com/jackc/pgx/v5/pgconn"
    20  )
    21  
    22  var (
    23  	cacheLimit      = 256
    24  	highLoadTimeout = time.Second * 5
    25  	targetColumns   = [...]string{"time", "dbname", "data", "tag_data"}
    26  )
    27  
    28  func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts) (pgw *PostgresWriter, err error) {
    29  	var conn db.PgxPoolIface
    30  	if conn, err = db.New(ctx, connstr); err != nil {
    31  		return
    32  	}
    33  	return NewWriterFromPostgresConn(ctx, conn, opts)
    34  }
    35  
    36  func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts) (pgw *PostgresWriter, err error) {
    37  	l := log.GetLogger(ctx).WithField("sink", "postgres").WithField("db", conn.Config().ConnConfig.Database)
    38  	ctx = log.WithLogger(ctx, l)
    39  	pgw = &PostgresWriter{
    40  		ctx:       ctx,
    41  		opts:      opts,
    42  		input:     make(chan metrics.MeasurementEnvelope, cacheLimit),
    43  		lastError: make(chan error),
    44  		sinkDb:    conn,
    45  	}
    46  	var runDeleteOldPartitions bool
    47  	if err = db.Init(ctx, pgw.sinkDb, func(ctx context.Context, conn db.PgxIface) error {
    48  		if err = conn.QueryRow(ctx, "SELECT $1::interval > '0'::interval", opts.Retention).Scan(&runDeleteOldPartitions); err != nil {
    49  			return err
    50  		}
    51  		l.Info("initialising measurements database...")
    52  		exists, err := db.DoesSchemaExist(ctx, conn, "admin")
    53  		if err != nil || exists {
    54  			return err
    55  		}
    56  		for _, sql := range metricSchemaSQLs {
    57  			if _, err = conn.Exec(ctx, sql); err != nil {
    58  				return err
    59  			}
    60  		}
    61  		return nil
    62  	}); err != nil {
    63  		return
    64  	}
    65  	if err = pgw.ReadMetricSchemaType(); err != nil {
    66  		return
    67  	}
    68  	if err = pgw.EnsureBuiltinMetricDummies(); err != nil {
    69  		return
    70  	}
    71  	if runDeleteOldPartitions {
    72  		go pgw.deleteOldPartitions()
    73  	}
    74  	go pgw.maintainUniqueSources()
    75  	go pgw.poll()
    76  	l.Info(`measurements sink is activated`)
    77  	return
    78  }
    79  
    80  //go:embed sql/admin_schema.sql
    81  var sqlMetricAdminSchema string
    82  
    83  //go:embed sql/admin_functions.sql
    84  var sqlMetricAdminFunctions string
    85  
    86  //go:embed sql/ensure_partition_postgres.sql
    87  var sqlMetricEnsurePartitionPostgres string
    88  
    89  //go:embed sql/ensure_partition_timescale.sql
    90  var sqlMetricEnsurePartitionTimescale string
    91  
    92  //go:embed sql/change_chunk_interval.sql
    93  var sqlMetricChangeChunkIntervalTimescale string
    94  
    95  //go:embed sql/change_compression_interval.sql
    96  var sqlMetricChangeCompressionIntervalTimescale string
    97  
    98  var (
    99  	metricSchemaSQLs = []string{
   100  		sqlMetricAdminSchema,
   101  		sqlMetricAdminFunctions,
   102  		sqlMetricEnsurePartitionPostgres,
   103  		sqlMetricEnsurePartitionTimescale,
   104  		sqlMetricChangeChunkIntervalTimescale,
   105  		sqlMetricChangeCompressionIntervalTimescale,
   106  	}
   107  )
   108  
   109  // PostgresWriter is a sink that writes metric measurements to a Postgres database.
   110  // At the moment, it supports both Postgres and TimescaleDB as a storage backend.
   111  // However, one is able to use any Postgres-compatible database as a storage backend,
   112  // e.g. PGEE, Citus, Greenplum, CockroachDB, etc.
   113  type PostgresWriter struct {
   114  	ctx          context.Context
   115  	sinkDb       db.PgxPoolIface
   116  	metricSchema DbStorageSchemaType
   117  	opts         *CmdOpts
   118  	input        chan metrics.MeasurementEnvelope
   119  	lastError    chan error
   120  }
   121  
   122  type ExistingPartitionInfo struct {
   123  	StartTime time.Time
   124  	EndTime   time.Time
   125  }
   126  
   127  type MeasurementMessagePostgres struct {
   128  	Time    time.Time
   129  	DBName  string
   130  	Metric  string
   131  	Data    map[string]any
   132  	TagData map[string]string
   133  }
   134  
   135  type DbStorageSchemaType int
   136  
   137  const (
   138  	DbStorageSchemaPostgres DbStorageSchemaType = iota
   139  	DbStorageSchemaTimescale
   140  )
   141  
   142  func (pgw *PostgresWriter) ReadMetricSchemaType() (err error) {
   143  	var isTs bool
   144  	pgw.metricSchema = DbStorageSchemaPostgres
   145  	sqlSchemaType := `SELECT schema_type = 'timescale' FROM admin.storage_schema_type`
   146  	if err = pgw.sinkDb.QueryRow(pgw.ctx, sqlSchemaType).Scan(&isTs); err == nil && isTs {
   147  		pgw.metricSchema = DbStorageSchemaTimescale
   148  	}
   149  	return
   150  }
   151  
   152  var (
   153  	forceRecreatePartitions  = false                                             // to signal override PG metrics storage cache
   154  	partitionMapMetric       = make(map[string]ExistingPartitionInfo)            // metric = min/max bounds
   155  	partitionMapMetricDbname = make(map[string]map[string]ExistingPartitionInfo) // metric[dbname = min/max bounds]
   156  )
   157  
   158  // SyncMetric ensures that tables exist for newly added metrics and/or sources
   159  func (pgw *PostgresWriter) SyncMetric(sourceName, metricName string, op SyncOp) error {
   160  	if op == AddOp {
   161  		return errors.Join(
   162  			pgw.AddDBUniqueMetricToListingTable(sourceName, metricName),
   163  			pgw.EnsureMetricDummy(metricName), // ensure that there is at least an empty top-level table not to get ugly Grafana notifications
   164  		)
   165  	}
   166  	return nil
   167  }
   168  
   169  // EnsureBuiltinMetricDummies creates empty tables for all built-in metrics if they don't exist
   170  func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error) {
   171  	for _, name := range metrics.GetDefaultBuiltInMetrics() {
   172  		err = errors.Join(err, pgw.EnsureMetricDummy(name))
   173  	}
   174  	return
   175  }
   176  
   177  // EnsureMetricDummy creates an empty table for a metric measurements if it doesn't exist
   178  func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error) {
   179  	_, err = pgw.sinkDb.Exec(pgw.ctx, "SELECT admin.ensure_dummy_metrics_table($1)", metric)
   180  	return
   181  }
   182  
   183  // Write sends the measurements to the cache channel
   184  func (pgw *PostgresWriter) Write(msg metrics.MeasurementEnvelope) error {
   185  	if pgw.ctx.Err() != nil {
   186  		return pgw.ctx.Err()
   187  	}
   188  	select {
   189  	case pgw.input <- msg:
   190  		// msgs sent
   191  	case <-time.After(highLoadTimeout):
   192  		// msgs dropped due to a huge load, check stdout or file for detailed log
   193  	}
   194  	select {
   195  	case err := <-pgw.lastError:
   196  		return err
   197  	default:
   198  		return nil
   199  	}
   200  }
   201  
   202  // poll is the main loop that reads from the input channel and flushes the data to the database
   203  func (pgw *PostgresWriter) poll() {
   204  	cache := make([]metrics.MeasurementEnvelope, 0, cacheLimit)
   205  	cacheTimeout := pgw.opts.BatchingDelay
   206  	tick := time.NewTicker(cacheTimeout)
   207  	for {
   208  		select {
   209  		case <-pgw.ctx.Done(): //check context with high priority
   210  			return
   211  		default:
   212  			select {
   213  			case entry := <-pgw.input:
   214  				cache = append(cache, entry)
   215  				if len(cache) < cacheLimit {
   216  					break
   217  				}
   218  				tick.Stop()
   219  				pgw.flush(cache)
   220  				cache = cache[:0]
   221  				tick = time.NewTicker(cacheTimeout)
   222  			case <-tick.C:
   223  				pgw.flush(cache)
   224  				cache = cache[:0]
   225  			case <-pgw.ctx.Done():
   226  				return
   227  			}
   228  		}
   229  	}
   230  }
   231  
   232  func newCopyFromMeasurements(rows []metrics.MeasurementEnvelope) *copyFromMeasurements {
   233  	return &copyFromMeasurements{envelopes: rows, envelopeIdx: -1, measurementIdx: -1}
   234  }
   235  
   236  type copyFromMeasurements struct {
   237  	envelopes      []metrics.MeasurementEnvelope
   238  	envelopeIdx    int
   239  	measurementIdx int // index of the current measurement in the envelope
   240  	metricName     string
   241  }
   242  
   243  func (c *copyFromMeasurements) NextEnvelope() bool {
   244  	c.envelopeIdx++
   245  	c.measurementIdx = -1
   246  	return c.envelopeIdx < len(c.envelopes)
   247  }
   248  
   249  func (c *copyFromMeasurements) Next() bool {
   250  	for {
   251  		// Check if we need to advance to the next envelope
   252  		if c.envelopeIdx < 0 || c.measurementIdx+1 >= len(c.envelopes[c.envelopeIdx].Data) {
   253  			// Advance to next envelope
   254  			if ok := c.NextEnvelope(); !ok {
   255  				return false // No more envelopes
   256  			}
   257  			// Set metric name from first envelope, or detect metric boundary
   258  			if c.metricName == "" {
   259  				c.metricName = c.envelopes[c.envelopeIdx].MetricName
   260  			} else if c.metricName != c.envelopes[c.envelopeIdx].MetricName {
   261  				// We've hit a different metric - we're done with current metric
   262  				// Reset position to process this envelope on next call
   263  				c.envelopeIdx--
   264  				c.measurementIdx = len(c.envelopes[c.envelopeIdx].Data) // Set to length so we've "finished" this envelope
   265  				c.metricName = ""                                       // Reset for next metric
   266  				return false
   267  			}
   268  		}
   269  
   270  		// Advance to next measurement in current envelope
   271  		c.measurementIdx++
   272  		if c.measurementIdx < len(c.envelopes[c.envelopeIdx].Data) {
   273  			return true // Found valid measurement
   274  		}
   275  		// If we reach here, we've exhausted current envelope, loop will advance to next envelope
   276  	}
   277  }
   278  
   279  func (c *copyFromMeasurements) EOF() bool {
   280  	return c.envelopeIdx >= len(c.envelopes)
   281  }
   282  
   283  func (c *copyFromMeasurements) Values() ([]any, error) {
   284  	row := maps.Clone(c.envelopes[c.envelopeIdx].Data[c.measurementIdx])
   285  	tagRow := maps.Clone(c.envelopes[c.envelopeIdx].CustomTags)
   286  	if tagRow == nil {
   287  		tagRow = make(map[string]string)
   288  	}
   289  	for k, v := range row {
   290  		if after, ok := strings.CutPrefix(k, metrics.TagPrefix); ok {
   291  			tagRow[after] = fmt.Sprintf("%v", v)
   292  			delete(row, k)
   293  		}
   294  	}
   295  	jsonTags, terr := jsoniter.ConfigFastest.MarshalToString(tagRow)
   296  	json, err := jsoniter.ConfigFastest.MarshalToString(row)
   297  	if err != nil || terr != nil {
   298  		return nil, errors.Join(err, terr)
   299  	}
   300  	return []any{time.Unix(0, c.envelopes[c.envelopeIdx].Data.GetEpoch()), c.envelopes[c.envelopeIdx].DBName, json, jsonTags}, nil
   301  }
   302  
   303  func (c *copyFromMeasurements) Err() error {
   304  	return nil
   305  }
   306  
   307  func (c *copyFromMeasurements) MetricName() (ident pgx.Identifier) {
   308  	if c.envelopeIdx+1 < len(c.envelopes) {
   309  		// Metric name is taken from the next envelope
   310  		ident = pgx.Identifier{c.envelopes[c.envelopeIdx+1].MetricName}
   311  	}
   312  	return
   313  }
   314  
   315  // flush sends the cached measurements to the database
   316  func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
   317  	if len(msgs) == 0 {
   318  		return
   319  	}
   320  	logger := log.GetLogger(pgw.ctx)
   321  	// metricsToStorePerMetric := make(map[string][]MeasurementMessagePostgres)
   322  	pgPartBounds := make(map[string]ExistingPartitionInfo)                  // metric=min/max
   323  	pgPartBoundsDbName := make(map[string]map[string]ExistingPartitionInfo) // metric=[dbname=min/max]
   324  	var err error
   325  
   326  	slices.SortFunc(msgs, func(a, b metrics.MeasurementEnvelope) int {
   327  		if a.MetricName < b.MetricName {
   328  			return -1
   329  		} else if a.MetricName > b.MetricName {
   330  			return 1
   331  		}
   332  		return 0
   333  	})
   334  
   335  	for _, msg := range msgs {
   336  		for _, dataRow := range msg.Data {
   337  			epochTime := time.Unix(0, metrics.Measurement(dataRow).GetEpoch())
   338  			switch pgw.metricSchema {
   339  			case DbStorageSchemaTimescale:
   340  				// set min/max timestamps to check/create partitions
   341  				bounds, ok := pgPartBounds[msg.MetricName]
   342  				if !ok || (ok && epochTime.Before(bounds.StartTime)) {
   343  					bounds.StartTime = epochTime
   344  					pgPartBounds[msg.MetricName] = bounds
   345  				}
   346  				if !ok || (ok && epochTime.After(bounds.EndTime)) {
   347  					bounds.EndTime = epochTime
   348  					pgPartBounds[msg.MetricName] = bounds
   349  				}
   350  			case DbStorageSchemaPostgres:
   351  				_, ok := pgPartBoundsDbName[msg.MetricName]
   352  				if !ok {
   353  					pgPartBoundsDbName[msg.MetricName] = make(map[string]ExistingPartitionInfo)
   354  				}
   355  				bounds, ok := pgPartBoundsDbName[msg.MetricName][msg.DBName]
   356  				if !ok || (ok && epochTime.Before(bounds.StartTime)) {
   357  					bounds.StartTime = epochTime
   358  					pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
   359  				}
   360  				if !ok || (ok && epochTime.After(bounds.EndTime)) {
   361  					bounds.EndTime = epochTime
   362  					pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
   363  				}
   364  			default:
   365  				logger.Fatal("unknown storage schema...")
   366  			}
   367  		}
   368  	}
   369  
   370  	switch pgw.metricSchema {
   371  	case DbStorageSchemaPostgres:
   372  		err = pgw.EnsureMetricDbnameTime(pgPartBoundsDbName, forceRecreatePartitions)
   373  	case DbStorageSchemaTimescale:
   374  		err = pgw.EnsureMetricTimescale(pgPartBounds)
   375  	default:
   376  		logger.Fatal("unknown storage schema...")
   377  	}
   378  	forceRecreatePartitions = false
   379  	if err != nil {
   380  		pgw.lastError <- err
   381  	}
   382  
   383  	var rowsBatched, n int64
   384  	t1 := time.Now()
   385  	cfm := newCopyFromMeasurements(msgs)
   386  	for !cfm.EOF() {
   387  		n, err = pgw.sinkDb.CopyFrom(context.Background(), cfm.MetricName(), targetColumns[:], cfm)
   388  		rowsBatched += n
   389  		if err != nil {
   390  			logger.Error(err)
   391  			if PgError, ok := err.(*pgconn.PgError); ok {
   392  				forceRecreatePartitions = PgError.Code == "23514"
   393  			}
   394  			if forceRecreatePartitions {
   395  				logger.Warning("Some metric partitions might have been removed, halting all metric storage. Trying to re-create all needed partitions on next run")
   396  			}
   397  		}
   398  	}
   399  	diff := time.Since(t1)
   400  	if err == nil {
   401  		logger.WithField("rows", rowsBatched).WithField("elapsed", diff).Info("measurements written")
   402  		return
   403  	}
   404  	pgw.lastError <- err
   405  }
   406  
   407  func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo) (err error) {
   408  	logger := log.GetLogger(pgw.ctx)
   409  	sqlEnsure := `select * from admin.ensure_partition_timescale($1)`
   410  	for metric := range pgPartBounds {
   411  		if _, ok := partitionMapMetric[metric]; !ok {
   412  			if _, err = pgw.sinkDb.Exec(pgw.ctx, sqlEnsure, metric); err != nil {
   413  				logger.Errorf("Failed to create a TimescaleDB table for metric '%s': %v", metric, err)
   414  				return err
   415  			}
   416  			partitionMapMetric[metric] = ExistingPartitionInfo{}
   417  		}
   418  	}
   419  	return
   420  }
   421  
   422  func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error) {
   423  	var rows pgx.Rows
   424  	sqlEnsure := `select * from admin.ensure_partition_metric_dbname_time($1, $2, $3)`
   425  	for metric, dbnameTimestampMap := range metricDbnamePartBounds {
   426  		_, ok := partitionMapMetricDbname[metric]
   427  		if !ok {
   428  			partitionMapMetricDbname[metric] = make(map[string]ExistingPartitionInfo)
   429  		}
   430  
   431  		for dbname, pb := range dbnameTimestampMap {
   432  			if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
   433  				return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
   434  			}
   435  			partInfo, ok := partitionMapMetricDbname[metric][dbname]
   436  			if !ok || (ok && (pb.StartTime.Before(partInfo.StartTime))) || force {
   437  				if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
   438  					return
   439  				}
   440  				if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
   441  					return err
   442  				}
   443  				partitionMapMetricDbname[metric][dbname] = partInfo
   444  			}
   445  			if pb.EndTime.After(partInfo.EndTime) || pb.EndTime.Equal(partInfo.EndTime) || force {
   446  				if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
   447  					return
   448  				}
   449  				if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
   450  					return err
   451  				}
   452  				partitionMapMetricDbname[metric][dbname] = partInfo
   453  			}
   454  		}
   455  	}
   456  	return nil
   457  }
   458  
   459  // deleteOldPartitions is a background task that deletes old partitions from the measurements DB
   460  func (pgw *PostgresWriter) deleteOldPartitions() {
   461  	l := log.GetLogger(pgw.ctx)
   462  	for {
   463  		var partsDropped int
   464  		err := pgw.sinkDb.QueryRow(pgw.ctx, `SELECT admin.drop_old_time_partitions(older_than => $1::interval)`,
   465  			pgw.opts.Retention).Scan(&partsDropped)
   466  		if err != nil {
   467  			l.Error("Could not drop old time partitions:", err)
   468  		} else if partsDropped > 0 {
   469  			l.Infof("Dropped %d old time partitions", partsDropped)
   470  		}
   471  		select {
   472  		case <-pgw.ctx.Done():
   473  			return
   474  		case <-time.After(time.Hour * 12):
   475  		}
   476  	}
   477  }
   478  
   479  // maintainUniqueSources is a background task that maintains a listing of unique sources for each metric.
   480  // This is used to avoid listing the same source multiple times in Grafana dropdowns.
   481  func (pgw *PostgresWriter) maintainUniqueSources() {
   482  	logger := log.GetLogger(pgw.ctx)
   483  	// due to metrics deletion the listing can go out of sync (a trigger not really wanted)
   484  	sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock` // 1571543679778230000 is just a random bigint
   485  	sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
   486  	sqlDistinct := `
   487  	WITH RECURSIVE t(dbname) AS (
   488  		SELECT MIN(dbname) AS dbname FROM %s
   489  		UNION
   490  		SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t )
   491  	SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
   492  	sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2`
   493  	sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1`
   494  	sqlAdd := `
   495  		INSERT INTO admin.all_distinct_dbname_metrics SELECT u, $2 FROM (select unnest($1::text[]) as u) x
   496  		WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2)
   497  		RETURNING *`
   498  
   499  	for {
   500  		select {
   501  		case <-pgw.ctx.Done():
   502  			return
   503  		case <-time.After(time.Hour * 24):
   504  		}
   505  		var lock bool
   506  		logger.Infof("Trying to get metricsDb listing maintainer advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly
   507  		if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
   508  			logger.Error("Getting metricsDb listing maintainer advisory lock failed:", err)
   509  			continue
   510  		}
   511  		if !lock {
   512  			logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...")
   513  			continue
   514  		}
   515  
   516  		logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
   517  		rows, _ := pgw.sinkDb.Query(pgw.ctx, sqlTopLevelMetrics)
   518  		allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
   519  		if err != nil {
   520  			logger.Error(err)
   521  			continue
   522  		}
   523  
   524  		for _, tableName := range allDistinctMetricTables {
   525  			foundDbnamesMap := make(map[string]bool)
   526  			foundDbnamesArr := make([]string, 0)
   527  			metricName := strings.Replace(tableName, "public.", "", 1)
   528  
   529  			logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
   530  			rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
   531  			ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
   532  			// ret, err := DBExecRead(mainContext, metricDb, fmt.Sprintf(sqlDistinct, tableName, tableName))
   533  			if err != nil {
   534  				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for '%s': %s", metricName, err)
   535  				break
   536  			}
   537  			for _, drDbname := range ret {
   538  				foundDbnamesMap[drDbname] = true // "set" behaviour, don't want duplicates
   539  			}
   540  
   541  			// delete all that are not known and add all that are not there
   542  			for k := range foundDbnamesMap {
   543  				foundDbnamesArr = append(foundDbnamesArr, k)
   544  			}
   545  			if len(foundDbnamesArr) == 0 { // delete all entries for given metric
   546  				logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName)
   547  
   548  				_, err = pgw.sinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName)
   549  				if err != nil {
   550  					logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
   551  				}
   552  				continue
   553  			}
   554  			cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName)
   555  			if err != nil {
   556  				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
   557  			} else if cmdTag.RowsAffected() > 0 {
   558  				logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
   559  			}
   560  			cmdTag, err = pgw.sinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName)
   561  			if err != nil {
   562  				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
   563  			} else if cmdTag.RowsAffected() > 0 {
   564  				logger.Infof("Added %d entry to the Postgres all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
   565  			}
   566  			time.Sleep(time.Minute)
   567  		}
   568  
   569  	}
   570  }
   571  
   572  func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error {
   573  	sql := `INSERT INTO admin.all_distinct_dbname_metrics
   574  			SELECT $1, $2
   575  			WHERE NOT EXISTS (
   576  				SELECT * FROM admin.all_distinct_dbname_metrics WHERE dbname = $1 AND metric = $2
   577  			)`
   578  	_, err := pgw.sinkDb.Exec(pgw.ctx, sql, dbUnique, metric)
   579  	return err
   580  }
   581