
Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/sinks/postgres.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/sinks

package sinks

import (
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgconn"
)

var (
	cacheLimit      = 512             // max measurements buffered before a forced flush
	highLoadTimeout = time.Second * 5 // longest Write() will block before dropping a batch
	deleterDelay    = time.Hour       // initial delay before old-partition cleanup starts
)

// NewPostgresWriter connects to the measurements database using the given
// connection string and returns a ready-to-use PostgresWriter.
func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error) {
	var conn db.PgxPoolIface
	if conn, err = db.New(ctx, connstr); err != nil {
		return
	}
	return NewWriterFromPostgresConn(ctx, conn, opts, metricDefs)
}

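// A minimal usage sketch (illustrative only; assumes an initialized *CmdOpts
// and *metrics.Metrics as used throughout this file):
//
//	w, err := NewPostgresWriter(ctx, "postgresql://pgwatch@localhost/measurements", opts, metricDefs)
//	if err != nil {
//		log.GetLogger(ctx).Fatal(err)
//	}
//	if err := w.Write(envelopes); err != nil { // envelopes: []metrics.MeasurementEnvelope
//		log.GetLogger(ctx).Error(err)
//	}
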
// NewWriterFromPostgresConn wraps an existing connection pool into a
// PostgresWriter, initializing the admin schema and starting the background
// maintenance goroutines along the way.
func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error) {
	l := log.GetLogger(ctx).WithField("sink", "postgres").WithField("db", conn.Config().ConnConfig.Database)
	ctx = log.WithLogger(ctx, l)
	pgw = &PostgresWriter{
		ctx:        ctx,
		metricDefs: metricDefs,
		opts:       opts,
		input:      make(chan []metrics.MeasurementEnvelope, cacheLimit),
		lastError:  make(chan error),
		sinkDb:     conn,
	}
	if err = db.Init(ctx, pgw.sinkDb, func(ctx context.Context, conn db.PgxIface) error {
		l.Info("initialising measurements database...")
		exists, err := db.DoesSchemaExist(ctx, conn, "admin")
		if err != nil || exists {
			return err
		}
		for _, sql := range metricSchemaSQLs {
			if _, err = conn.Exec(ctx, sql); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		return
	}
	if err = pgw.ReadMetricSchemaType(); err != nil {
		return
	}
	if err = pgw.EnsureBuiltinMetricDummies(); err != nil {
		return
	}
	go pgw.deleteOldPartitions(deleterDelay)
	go pgw.maintainUniqueSources()
	go pgw.poll()
	l.Info(`measurements sink is activated`)
	return
}

//go:embed sql/admin_schema.sql
var sqlMetricAdminSchema string

//go:embed sql/admin_functions.sql
var sqlMetricAdminFunctions string

//go:embed sql/ensure_partition_postgres.sql
var sqlMetricEnsurePartitionPostgres string

//go:embed sql/ensure_partition_timescale.sql
var sqlMetricEnsurePartitionTimescale string

//go:embed sql/change_chunk_interval.sql
var sqlMetricChangeChunkIntervalTimescale string

//go:embed sql/change_compression_interval.sql
var sqlMetricChangeCompressionIntervalTimescale string

var (
	metricSchemaSQLs = []string{
		sqlMetricAdminSchema,
		sqlMetricAdminFunctions,
		sqlMetricEnsurePartitionPostgres,
		sqlMetricEnsurePartitionTimescale,
		sqlMetricChangeChunkIntervalTimescale,
		sqlMetricChangeCompressionIntervalTimescale,
	}
)

// PostgresWriter is a sink that writes metric measurements to a Postgres database.
// It currently supports plain Postgres and TimescaleDB as storage backends,
// but any Postgres-compatible database (e.g. PGEE, Citus, Greenplum, or
// CockroachDB) can be used as well.
type PostgresWriter struct {
	ctx          context.Context
	sinkDb       db.PgxPoolIface
	metricSchema DbStorageSchemaType
	metricDefs   *metrics.Metrics
	opts         *CmdOpts
	input        chan []metrics.MeasurementEnvelope
	lastError    chan error
}

// ExistingPartitionInfo holds the time bounds of an already created partition.
type ExistingPartitionInfo struct {
	StartTime time.Time
	EndTime   time.Time
}

// MeasurementMessagePostgres is a single measurement row ready to be written.
type MeasurementMessagePostgres struct {
	Time    time.Time
	DBName  string
	Metric  string
	Data    map[string]any
	TagData map[string]any
}

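// Each measurement becomes one row in a table named after its metric; the
// columns below mirror the COPY target used in flush. This is only an
// illustrative sketch: the authoritative DDL lives in the embedded sql/
// files and in the admin.ensure_* functions:
//
//	CREATE TABLE "some_metric" (
//		time     timestamptz NOT NULL,
//		dbname   text        NOT NULL,
//		data     jsonb       NOT NULL,
//		tag_data jsonb
//	);
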
// DbStorageSchemaType denotes the storage schema of the measurements DB:
// plain Postgres partitions or TimescaleDB hypertables.
type DbStorageSchemaType int

const (
	DbStorageSchemaPostgres DbStorageSchemaType = iota
	DbStorageSchemaTimescale
)

// ReadMetricSchemaType determines whether the measurements DB uses the
// plain Postgres or the TimescaleDB storage schema.
func (pgw *PostgresWriter) ReadMetricSchemaType() (err error) {
	var isTs bool
	pgw.metricSchema = DbStorageSchemaPostgres
	sqlSchemaType := `SELECT schema_type = 'timescale' FROM admin.storage_schema_type`
	if err = pgw.sinkDb.QueryRow(pgw.ctx, sqlSchemaType).Scan(&isTs); err == nil && isTs {
		pgw.metricSchema = DbStorageSchemaTimescale
	}
	return
}

var (
	forceRecreatePartitions  = false                                             // signals flush() to bypass the partition cache and re-create partitions
	partitionMapMetric       = make(map[string]ExistingPartitionInfo)            // metric => min/max bounds
	partitionMapMetricDbname = make(map[string]map[string]ExistingPartitionInfo) // metric => dbname => min/max bounds
)

// SyncMetric ensures that tables exist for newly added metrics and/or sources
func (pgw *PostgresWriter) SyncMetric(dbUnique, metricName, op string) error {
	if op == "add" {
		return errors.Join(
			pgw.AddDBUniqueMetricToListingTable(dbUnique, metricName),
			pgw.EnsureMetricDummy(metricName), // ensure at least an empty top-level table exists, so Grafana doesn't show errors for missing tables
		)
	}
	return nil
}

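// For example, when a new source "mydb" is configured to report the "wal"
// metric, the caller is expected to invoke (a hypothetical invocation):
//
//	if err := pgw.SyncMetric("mydb", "wal", "add"); err != nil {
//		// handle error
//	}
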
// EnsureBuiltinMetricDummies creates empty tables for all built-in metrics if they don't exist
func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error) {
	for _, name := range metrics.GetDefaultBuiltInMetrics() {
		err = errors.Join(err, pgw.EnsureMetricDummy(name))
	}
	return
}

// EnsureMetricDummy creates an empty table for a metric's measurements if it doesn't exist
func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error) {
	_, err = pgw.sinkDb.Exec(pgw.ctx, "select admin.ensure_dummy_metrics_table($1)", metric)
	return
}

// Write sends the measurements to the cache channel
func (pgw *PostgresWriter) Write(msgs []metrics.MeasurementEnvelope) error {
	if pgw.ctx.Err() != nil {
		return pgw.ctx.Err()
	}
	select {
	case pgw.input <- msgs:
		// msgs queued for the poll loop
	case <-time.After(highLoadTimeout):
		// msgs dropped because the writer cannot keep up with the load
	}
	select {
	case err := <-pgw.lastError:
		return err
	default:
		return nil
	}
}

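// Note that Write is asynchronous: a nil result only means the batch was
// queued (or silently dropped after highLoadTimeout under sustained
// overload), while a non-nil error originates from an earlier flush. For
// example:
//
//	if err := w.Write(batch); err != nil {
//		// err was produced by a previous batch, not necessarily this one
//	}
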
// poll is the main loop that reads from the input channel and flushes the data to the database
func (pgw *PostgresWriter) poll() {
	cache := make([]metrics.MeasurementEnvelope, 0, cacheLimit)
	cacheTimeout := pgw.opts.BatchingDelay
	tick := time.NewTicker(cacheTimeout)
	for {
		select {
		case <-pgw.ctx.Done(): // check context with high priority
			return
		default:
			select {
			case entry := <-pgw.input:
				cache = append(cache, entry...)
				if len(cache) < cacheLimit {
					break
				}
				tick.Stop()
				pgw.flush(cache)
				cache = cache[:0]
				tick = time.NewTicker(cacheTimeout)
			case <-tick.C:
				pgw.flush(cache)
				cache = cache[:0]
			case <-pgw.ctx.Done():
				return
			}
		}
	}
}

// flush sends the cached measurements to the database
func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
	if len(msgs) == 0 {
		return
	}
	logger := log.GetLogger(pgw.ctx)
	metricsToStorePerMetric := make(map[string][]MeasurementMessagePostgres)
	rowsBatched := 0
	pgPartBounds := make(map[string]ExistingPartitionInfo)                  // metric => min/max bounds
	pgPartBoundsDbName := make(map[string]map[string]ExistingPartitionInfo) // metric => dbname => min/max bounds
	var err error

	for _, msg := range msgs {
		if len(msg.Data) == 0 {
			continue
		}
		logger.WithField("data", msg.Data).WithField("len", len(msg.Data)).Debug("sending to postgres")

		for _, dataRow := range msg.Data {
			tags := make(map[string]any)
			fields := make(map[string]any)

			if msg.CustomTags != nil {
				for k, v := range msg.CustomTags {
					tags[k] = fmt.Sprintf("%v", v)
				}
			}
			epochTime := time.Unix(0, metrics.Measurement(dataRow).GetEpoch())
			for k, v := range dataRow {
				if v == nil || v == "" || k == metrics.EpochColumnName {
					continue // not storing NULLs
				}
				if strings.HasPrefix(k, metrics.TagPrefix) {
					tags[strings.TrimPrefix(k, metrics.TagPrefix)] = fmt.Sprintf("%v", v)
				} else {
					fields[k] = v
				}
			}

			metricsToStorePerMetric[msg.MetricName] = append(metricsToStorePerMetric[msg.MetricName],
				MeasurementMessagePostgres{Time: epochTime, DBName: msg.DBName,
					Metric: msg.MetricName, Data: fields, TagData: tags})

			rowsBatched++

			switch pgw.metricSchema {
			case DbStorageSchemaTimescale:
				// track min/max timestamps per metric to check/create partitions
				bounds, ok := pgPartBounds[msg.MetricName]
				if !ok || epochTime.Before(bounds.StartTime) {
					bounds.StartTime = epochTime
					pgPartBounds[msg.MetricName] = bounds
				}
				if !ok || epochTime.After(bounds.EndTime) {
					bounds.EndTime = epochTime
					pgPartBounds[msg.MetricName] = bounds
				}
			case DbStorageSchemaPostgres:
				if _, ok := pgPartBoundsDbName[msg.MetricName]; !ok {
					pgPartBoundsDbName[msg.MetricName] = make(map[string]ExistingPartitionInfo)
				}
				bounds, ok := pgPartBoundsDbName[msg.MetricName][msg.DBName]
				if !ok || epochTime.Before(bounds.StartTime) {
					bounds.StartTime = epochTime
					pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
				}
				if !ok || epochTime.After(bounds.EndTime) {
					bounds.EndTime = epochTime
					pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
				}
			default:
				logger.Fatal("unknown storage schema...")
			}
		}
	}

	switch pgw.metricSchema {
	case DbStorageSchemaPostgres:
		err = pgw.EnsureMetricDbnameTime(pgPartBoundsDbName, forceRecreatePartitions)
	case DbStorageSchemaTimescale:
		err = pgw.EnsureMetricTimescale(pgPartBounds, forceRecreatePartitions)
	default:
		logger.Fatal("unknown storage schema...")
	}
	forceRecreatePartitions = false
	if err != nil {
		pgw.lastError <- err
	}

	// send the data to PG; currently a separate COPY per measurement row
	logger.Debugf("COPY-ing %d measurements to Postgres metricsDB...", rowsBatched)
	t1 := time.Now()

	for metricName, measurements := range metricsToStorePerMetric {
		targetTable := pgx.Identifier{metricName}
		targetColumns := []string{"time", "dbname", "data", "tag_data"}

		for _, m := range measurements {
			l := logger.WithField("db", m.DBName).WithField("metric", m.Metric)
			jsonBytes, err := json.Marshal(m.Data)
			if err != nil {
				logger.Errorf("Skipping 1 metric for [%s:%s] due to JSON conversion error: %s", m.DBName, m.Metric, err)
				continue
			}

			getTagData := func() any {
				if len(m.TagData) > 0 {
					jsonBytesTags, err := json.Marshal(m.TagData)
					if err != nil {
						l.Error(err)
						return nil
					}
					return string(jsonBytesTags)
				}
				return nil
			}

			rows := [][]any{{m.Time, m.DBName, string(jsonBytes), getTagData()}}

			// background context: a canceled pgw.ctx won't abort an in-flight COPY
			if _, err = pgw.sinkDb.CopyFrom(context.Background(), targetTable, targetColumns, pgx.CopyFromRows(rows)); err != nil {
				l.Error(err)
				var pgErr *pgconn.PgError
				if errors.As(err, &pgErr) {
					forceRecreatePartitions = pgErr.Code == "23514" // check_violation, i.e. the row fell outside the existing partition bounds
				}
				if forceRecreatePartitions {
					logger.Warning("Some metric partitions might have been removed, halting all metric storage. Trying to re-create all needed partitions on next run")
				}
			}
		}
	}

	diff := time.Since(t1)
	if err == nil {
		logger.WithField("rows", rowsBatched).WithField("elapsed", diff).Info("measurements written")
		return
	}
	pgw.lastError <- err
}

// EnsureMetricTime creates the special time-only partitions used for "realtime"
// metrics when the Timescale schema is in place
func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPartitionInfo, force bool) error {
	logger := log.GetLogger(pgw.ctx)
	sqlEnsure := `select part_available_from, part_available_to from admin.ensure_partition_metric_time($1, $2)`
	for metric, pb := range pgPartBounds {
		if !strings.HasSuffix(metric, "_realtime") {
			continue
		}
		if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
			return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
		}

		partInfo, ok := partitionMapMetric[metric]
		if !ok || pb.StartTime.Before(partInfo.StartTime) || force {
			err := pgw.sinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.StartTime).
				Scan(&partInfo.StartTime, &partInfo.EndTime)
			if err != nil {
				logger.Error("Failed to create partition on 'metrics': ", err)
				return err
			}
			partitionMapMetric[metric] = partInfo
		}
		if pb.EndTime.After(partInfo.EndTime) || force {
			err := pgw.sinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.EndTime).Scan(nil, &partInfo.EndTime)
			if err != nil {
				logger.Error("Failed to create partition on 'metrics': ", err)
				return err
			}
			partitionMapMetric[metric] = partInfo
		}
	}
	return nil
}

// EnsureMetricTimescale creates a TimescaleDB hypertable for each metric that
// doesn't have one yet, then delegates the "realtime" metrics to EnsureMetricTime
func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error) {
	logger := log.GetLogger(pgw.ctx)
	sqlEnsure := `select * from admin.ensure_partition_timescale($1)`
	for metric := range pgPartBounds {
		if strings.HasSuffix(metric, "_realtime") {
			continue
		}
		if _, ok := partitionMapMetric[metric]; !ok {
			if _, err = pgw.sinkDb.Exec(pgw.ctx, sqlEnsure, metric); err != nil {
				logger.Errorf("Failed to create a TimescaleDB table for metric '%s': %v", metric, err)
				return err
			}
			partitionMapMetric[metric] = ExistingPartitionInfo{}
		}
	}
	return pgw.EnsureMetricTime(pgPartBounds, force)
}

// EnsureMetricDbnameTime creates partitions for the plain Postgres schema,
// subpartitioned by source database name and time
func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error) {
	var rows pgx.Rows
	sqlEnsure := `select * from admin.ensure_partition_metric_dbname_time($1, $2, $3)`
	for metric, dbnameTimestampMap := range metricDbnamePartBounds {
		if _, ok := partitionMapMetricDbname[metric]; !ok {
			partitionMapMetricDbname[metric] = make(map[string]ExistingPartitionInfo)
		}

		for dbname, pb := range dbnameTimestampMap {
			if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
				return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
			}
			partInfo, ok := partitionMapMetricDbname[metric][dbname]
			if !ok || pb.StartTime.Before(partInfo.StartTime) || force {
				if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
					return
				}
				if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
					return err
				}
				partitionMapMetricDbname[metric][dbname] = partInfo
			}
			if !pb.EndTime.Before(partInfo.EndTime) || force {
				if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.EndTime); err != nil {
					return
				}
				if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
					return err
				}
				partitionMapMetricDbname[metric][dbname] = partInfo
			}
		}
	}
	return nil
}

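// To illustrate the flow above with the plain Postgres schema: a flushed
// batch holding "wal" rows from sources "db1" and "db2" leads to calls along
// the lines of (an illustrative sketch with pseudo-parameters; the function
// itself lives in the embedded admin SQL):
//
//	select * from admin.ensure_partition_metric_dbname_time('wal', 'db1', $min_time)
//	select * from admin.ensure_partition_metric_dbname_time('wal', 'db2', $min_time)
//
// Subsequent flushes skip these calls while the cached bounds still cover
// the incoming timestamps.
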
// deleteOldPartitions is a background task that deletes old partitions from the measurements DB
func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) {
	metricAgeDaysThreshold := pgw.opts.Retention
	if metricAgeDaysThreshold <= 0 {
		return
	}
	logger := log.GetLogger(pgw.ctx)
	select {
	case <-pgw.ctx.Done():
		return
	case <-time.After(delay):
		// to reduce distracting log messages at startup
	}

	for {
		if pgw.metricSchema == DbStorageSchemaTimescale {
			partsDropped, err := pgw.DropOldTimePartitions(metricAgeDaysThreshold)
			if err != nil {
				logger.Errorf("Failed to drop old partitions (>%d days) from Postgres: %v", metricAgeDaysThreshold, err)
				time.Sleep(time.Second * 300) // back off before retrying instead of spinning
				continue
			}
			logger.Infof("Dropped %d old metric partitions...", partsDropped)
		} else if pgw.metricSchema == DbStorageSchemaPostgres {
			partsToDrop, err := pgw.GetOldTimePartitions(metricAgeDaysThreshold)
			if err != nil {
				logger.Errorf("Failed to get a listing of old (>%d days) time partitions from Postgres metrics DB - check that the admin.get_old_time_partitions() function is rolled out: %v", metricAgeDaysThreshold, err)
				time.Sleep(time.Second * 300)
				continue
			}
			if len(partsToDrop) > 0 {
				logger.Infof("Dropping %d old metric partitions one by one...", len(partsToDrop))
				for _, toDrop := range partsToDrop {
					sqlDropTable := `DROP TABLE IF EXISTS ` + toDrop
					logger.Debugf("Dropping old metric data partition: %s", toDrop)

					if _, err := pgw.sinkDb.Exec(pgw.ctx, sqlDropTable); err != nil {
						logger.Errorf("Failed to drop old partition %s from Postgres metrics DB: %v", toDrop, err)
						time.Sleep(time.Second * 300)
					} else {
						time.Sleep(time.Second * 5)
					}
				}
			} else {
				logger.Infof("No old metric partitions found to drop...")
			}
		}
		select {
		case <-pgw.ctx.Done():
			return
		case <-time.After(time.Hour * 12):
		}
	}
}

// maintainUniqueSources is a background task that maintains a listing of unique sources for each metric.
// This is used to avoid listing the same source multiple times in Grafana dropdowns.
func (pgw *PostgresWriter) maintainUniqueSources() {
	logger := log.GetLogger(pgw.ctx)
	// due to metrics deletion the listing can go out of sync (a trigger not really wanted)
	sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock` // 1571543679778230000 is just a random bigint
	sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
	sqlDistinct := `
	WITH RECURSIVE t(dbname) AS (
		SELECT MIN(dbname) AS dbname FROM %s
		UNION
		SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t )
	SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
	sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2`
	sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1`
	sqlAdd := `
		INSERT INTO admin.all_distinct_dbname_metrics SELECT u, $2 FROM (select unnest($1::text[]) as u) x
		WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2)
		RETURNING *`

	for {
		select {
		case <-pgw.ctx.Done():
			return
		case <-time.After(time.Hour * 24):
		}
		var lock bool
		logger.Infof("Trying to get metricsDb listing maintainer advisory lock...") // ensures only one "maintainer" is active in a "push" setup, as the refresh can get costly
		if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
			logger.Error("Getting metricsDb listing maintainer advisory lock failed:", err)
			continue
		}
		if !lock {
			logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...")
			continue
		}

		logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
		rows, _ := pgw.sinkDb.Query(pgw.ctx, sqlTopLevelMetrics)
		allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
		if err != nil {
			logger.Error(err)
			continue
		}

		for _, tableName := range allDistinctMetricTables {
			foundDbnamesMap := make(map[string]bool)
			foundDbnamesArr := make([]string, 0)
			metricName := strings.Replace(tableName, "public.", "", 1)

			logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
			rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
			ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
			if err != nil {
				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for '%s': %s", metricName, err)
				break
			}
			for _, drDbname := range ret {
				foundDbnamesMap[drDbname] = true // "set" behaviour, don't want duplicates
			}

			// delete all that are not known and add all that are not there
			for k := range foundDbnamesMap {
				foundDbnamesArr = append(foundDbnamesArr, k)
			}
			if len(foundDbnamesArr) == 0 { // delete all entries for the given metric
				logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName)

				_, err = pgw.sinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName)
				if err != nil {
					logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
				}
				continue
			}
			cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName)
			if err != nil {
				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
			} else if cmdTag.RowsAffected() > 0 {
				logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
			}
			cmdTag, err = pgw.sinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName)
			if err != nil {
				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
			} else if cmdTag.RowsAffected() > 0 {
				logger.Infof("Added %d entries to the Postgres all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
			}
			time.Sleep(time.Minute)
		}
	}
}

// DropOldTimePartitions drops partitions older than the given threshold (in days)
// via the admin helper function and reports how many were dropped
func (pgw *PostgresWriter) DropOldTimePartitions(metricAgeDaysThreshold int) (res int, err error) {
	sqlOldPart := `select admin.drop_old_time_partitions($1, $2)`
	err = pgw.sinkDb.QueryRow(pgw.ctx, sqlOldPart, metricAgeDaysThreshold, false).Scan(&res)
	return
}

// GetOldTimePartitions returns the names of partitions older than the given threshold (in days)
func (pgw *PostgresWriter) GetOldTimePartitions(metricAgeDaysThreshold int) ([]string, error) {
	sqlGetOldParts := `select admin.get_old_time_partitions($1)`
	rows, err := pgw.sinkDb.Query(pgw.ctx, sqlGetOldParts, metricAgeDaysThreshold)
	if err != nil {
		return nil, err
	}
	return pgx.CollectRows(rows, pgx.RowTo[string])
}

// AddDBUniqueMetricToListingTable adds a (dbname, metric) pair to the listing
// table used for Grafana dropdowns, unless it is already present
func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error {
	sql := `insert into admin.all_distinct_dbname_metrics
			select $1, $2
			where not exists (
				select * from admin.all_distinct_dbname_metrics where dbname = $1 and metric = $2
			)`
	_, err := pgw.sinkDb.Exec(pgw.ctx, sql, dbUnique, metric)
	return err
}
   648