...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/sinks/postgres.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/sinks

package sinks

import (
	"context"
	_ "embed"
	"errors"
	"fmt"
	"maps"
	"slices"
	"strings"
	"time"

	jsoniter "github.com/json-iterator/go"

	"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgconn"
)

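// cacheLimit caps both the input channel buffer and the flush batch size;
// highLoadTimeout is how long Write blocks before dropping a measurement;
// deleterDelay postpones partition cleanup after startup;
// targetColumns lists the columns every measurement table is copied into.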
var (
	cacheLimit      = 256
	highLoadTimeout = time.Second * 5
	deleterDelay    = time.Hour
	targetColumns   = [...]string{"time", "dbname", "data", "tag_data"}
)

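// NewPostgresWriter connects to the measurements database identified by
// connstr and returns a fully initialized writer.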
func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts) (pgw *PostgresWriter, err error) {
	var conn db.PgxPoolIface
	if conn, err = db.New(ctx, connstr); err != nil {
		return
	}
	return NewWriterFromPostgresConn(ctx, conn, opts)
}

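// NewWriterFromPostgresConn creates the admin schema on first use, detects the
// storage schema type, starts the background maintenance goroutines and
// returns a ready-to-use writer built on the given connection pool.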
func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts) (pgw *PostgresWriter, err error) {
	l := log.GetLogger(ctx).WithField("sink", "postgres").WithField("db", conn.Config().ConnConfig.Database)
	ctx = log.WithLogger(ctx, l)
	pgw = &PostgresWriter{
		ctx:       ctx,
		opts:      opts,
		input:     make(chan metrics.MeasurementEnvelope, cacheLimit),
		lastError: make(chan error),
		sinkDb:    conn,
	}
	if err = db.Init(ctx, pgw.sinkDb, func(ctx context.Context, conn db.PgxIface) error {
		l.Info("initialising measurements database...")
		exists, err := db.DoesSchemaExist(ctx, conn, "admin")
		if err != nil || exists {
			return err
		}
		for _, sql := range metricSchemaSQLs {
			if _, err = conn.Exec(ctx, sql); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		return
	}
	if err = pgw.ReadMetricSchemaType(); err != nil {
		return
	}
	if err = pgw.EnsureBuiltinMetricDummies(); err != nil {
		return
	}
	go pgw.deleteOldPartitions(deleterDelay)
	go pgw.maintainUniqueSources()
	go pgw.poll()
	l.Info(`measurements sink is activated`)
	return
}

//go:embed sql/admin_schema.sql
var sqlMetricAdminSchema string

//go:embed sql/admin_functions.sql
var sqlMetricAdminFunctions string

//go:embed sql/ensure_partition_postgres.sql
var sqlMetricEnsurePartitionPostgres string

//go:embed sql/ensure_partition_timescale.sql
var sqlMetricEnsurePartitionTimescale string

//go:embed sql/change_chunk_interval.sql
var sqlMetricChangeChunkIntervalTimescale string

//go:embed sql/change_compression_interval.sql
var sqlMetricChangeCompressionIntervalTimescale string

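// metricSchemaSQLs is executed in order when bootstrapping a fresh
// measurements database: the admin schema and functions first, then the
// partitioning helpers for both supported storage schemas.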
var (
	metricSchemaSQLs = []string{
		sqlMetricAdminSchema,
		sqlMetricAdminFunctions,
		sqlMetricEnsurePartitionPostgres,
		sqlMetricEnsurePartitionTimescale,
		sqlMetricChangeChunkIntervalTimescale,
		sqlMetricChangeCompressionIntervalTimescale,
	}
)

// PostgresWriter is a sink that writes metric measurements to a Postgres database.
// It supports both plain Postgres and TimescaleDB as storage backends, and any
// Postgres-compatible database (e.g. PGEE, Citus, Greenplum, CockroachDB) should
// work as well.
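//
// A minimal usage sketch (assuming a valid connection string and configured
// CmdOpts; error handling elided):
//
//	w, _ := NewPostgresWriter(ctx, "postgres://user@host/measurements", opts)
//	_ = w.Write(envelope) // envelope is a metrics.MeasurementEnvelope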
type PostgresWriter struct {
	ctx          context.Context
	sinkDb       db.PgxPoolIface
	metricSchema DbStorageSchemaType
	opts         *CmdOpts
	input        chan metrics.MeasurementEnvelope
	lastError    chan error
}

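// ExistingPartitionInfo holds the time bounds of an already existing partition.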
type ExistingPartitionInfo struct {
	StartTime time.Time
	EndTime   time.Time
}

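// MeasurementMessagePostgres is a single measurement in the shape it is
// stored in Postgres.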
type MeasurementMessagePostgres struct {
	Time    time.Time
	DBName  string
	Metric  string
	Data    map[string]any
	TagData map[string]string
}

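// DbStorageSchemaType denotes the storage schema of the measurements
// database: native Postgres partitioning or TimescaleDB hypertables.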
type DbStorageSchemaType int

const (
	DbStorageSchemaPostgres DbStorageSchemaType = iota
	DbStorageSchemaTimescale
)

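// ReadMetricSchemaType queries admin.storage_schema_type to find out whether
// the sink database uses the plain Postgres or the TimescaleDB storage schema.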
func (pgw *PostgresWriter) ReadMetricSchemaType() (err error) {
	var isTs bool
	pgw.metricSchema = DbStorageSchemaPostgres
	sqlSchemaType := `SELECT schema_type = 'timescale' FROM admin.storage_schema_type`
	if err = pgw.sinkDb.QueryRow(pgw.ctx, sqlSchemaType).Scan(&isTs); err == nil && isTs {
		pgw.metricSchema = DbStorageSchemaTimescale
	}
	return
}

var (
	forceRecreatePartitions  = false                                             // signals that the cached partition info is stale and partitions must be re-created
	partitionMapMetric       = make(map[string]ExistingPartitionInfo)            // metric → min/max bounds
	partitionMapMetricDbname = make(map[string]map[string]ExistingPartitionInfo) // metric → dbname → min/max bounds
)

// SyncMetric ensures that tables exist for newly added metrics and/or sources
func (pgw *PostgresWriter) SyncMetric(sourceName, metricName string, op SyncOp) error {
	if op == AddOp {
		return errors.Join(
			pgw.AddDBUniqueMetricToListingTable(sourceName, metricName),
			pgw.EnsureMetricDummy(metricName), // ensure that at least an empty top-level table exists, so Grafana doesn't show errors for missing tables
		)
	}
	return nil
}

// EnsureBuiltinMetricDummies creates empty tables for all built-in metrics if they don't exist
func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error) {
	for _, name := range metrics.GetDefaultBuiltInMetrics() {
		err = errors.Join(err, pgw.EnsureMetricDummy(name))
	}
	return
}

// EnsureMetricDummy creates an empty table for a metric's measurements if it doesn't exist
func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error) {
	_, err = pgw.sinkDb.Exec(pgw.ctx, "select admin.ensure_dummy_metrics_table($1)", metric)
	return
}

// Write queues the measurement for batching and returns the last flush error,
// if any; under sustained overload the measurement may be dropped after
// highLoadTimeout.
func (pgw *PostgresWriter) Write(msg metrics.MeasurementEnvelope) error {
	if pgw.ctx.Err() != nil {
		return pgw.ctx.Err()
	}
	select {
	case pgw.input <- msg:
		// message queued for the next batch
	case <-time.After(highLoadTimeout):
		// message dropped: the writer cannot keep up with the load
	}
	select {
	case err := <-pgw.lastError:
		return err
	default:
		return nil
	}
}

// poll is the main loop that reads from the input channel and flushes the data to the database
func (pgw *PostgresWriter) poll() {
	cache := make([]metrics.MeasurementEnvelope, 0, cacheLimit)
	cacheTimeout := pgw.opts.BatchingDelay
	tick := time.NewTicker(cacheTimeout)
	defer tick.Stop()
	for {
		select {
		case <-pgw.ctx.Done(): // check context with high priority
			return
		default:
			select {
			case entry := <-pgw.input:
				cache = append(cache, entry)
				if len(cache) < cacheLimit {
					break
				}
				pgw.flush(cache)
				cache = cache[:0]
				tick.Reset(cacheTimeout)
			case <-tick.C:
				pgw.flush(cache)
				cache = cache[:0]
			case <-pgw.ctx.Done():
				return
			}
		}
	}
}

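// newCopyFromMeasurements wraps a metric-sorted slice of envelopes in a
// pgx.CopyFromSource.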
func newCopyFromMeasurements(rows []metrics.MeasurementEnvelope) *copyFromMeasurements {
	return &copyFromMeasurements{envelopes: rows, envelopeIdx: -1, measurementIdx: -1}
}

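// copyFromMeasurements implements pgx.CopyFromSource over a slice of
// measurement envelopes. Next deliberately stops at metric-name boundaries,
// so one CopyFrom call writes only to the table of a single metric.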
type copyFromMeasurements struct {
	envelopes      []metrics.MeasurementEnvelope
	envelopeIdx    int
	measurementIdx int // index of the current measurement in the envelope
	metricName     string
}

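// NextEnvelope advances to the next envelope and resets the measurement index.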
func (c *copyFromMeasurements) NextEnvelope() bool {
	c.envelopeIdx++
	c.measurementIdx = -1
	return c.envelopeIdx < len(c.envelopes)
}

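// Next advances to the next measurement, returning false when the input is
// exhausted or when the next envelope belongs to a different metric.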
func (c *copyFromMeasurements) Next() bool {
	for {
		// Check if we need to advance to the next envelope
		if c.envelopeIdx < 0 || c.measurementIdx+1 >= len(c.envelopes[c.envelopeIdx].Data) {
			// Advance to next envelope
			if ok := c.NextEnvelope(); !ok {
				return false // No more envelopes
			}
			// Set metric name from first envelope, or detect metric boundary
			if c.metricName == "" {
				c.metricName = c.envelopes[c.envelopeIdx].MetricName
			} else if c.metricName != c.envelopes[c.envelopeIdx].MetricName {
				// We've hit a different metric - we're done with the current one.
				// Reset position so this envelope is processed on the next call.
				c.envelopeIdx--
				c.measurementIdx = len(c.envelopes[c.envelopeIdx].Data) // mark this envelope as finished
				c.metricName = ""                                       // reset for the next metric
				return false
			}
		}

		// Advance to next measurement in current envelope
		c.measurementIdx++
		if c.measurementIdx < len(c.envelopes[c.envelopeIdx].Data) {
			return true // Found valid measurement
		}
		// If we reach here, we've exhausted the current envelope; loop to the next one
	}
}

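// EOF reports whether all envelopes have been consumed.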
func (c *copyFromMeasurements) EOF() bool {
	return c.envelopeIdx >= len(c.envelopes)
}

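// Values renders the current measurement as a CopyFrom row: keys carrying the
// tag prefix are moved into the tag map, then both maps are marshalled to JSON.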
func (c *copyFromMeasurements) Values() ([]any, error) {
	row := maps.Clone(c.envelopes[c.envelopeIdx].Data[c.measurementIdx])
	tagRow := maps.Clone(c.envelopes[c.envelopeIdx].CustomTags)
	if tagRow == nil {
		tagRow = make(map[string]string)
	}
	for k, v := range row {
		if after, ok := strings.CutPrefix(k, metrics.TagPrefix); ok {
			tagRow[after] = fmt.Sprintf("%v", v)
			delete(row, k)
		}
	}
	jsonTags, terr := jsoniter.ConfigFastest.MarshalToString(tagRow)
	json, err := jsoniter.ConfigFastest.MarshalToString(row)
	if err != nil || terr != nil {
		return nil, errors.Join(err, terr)
	}
	return []any{time.Unix(0, c.envelopes[c.envelopeIdx].Data.GetEpoch()), c.envelopes[c.envelopeIdx].DBName, json, jsonTags}, nil
}

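// Err implements pgx.CopyFromSource; this source never fails mid-iteration.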
func (c *copyFromMeasurements) Err() error {
	return nil
}

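// MetricName returns the table identifier of the metric the next CopyFrom
// batch will be written to.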
func (c *copyFromMeasurements) MetricName() (ident pgx.Identifier) {
	if c.envelopeIdx+1 < len(c.envelopes) {
		// Metric name is taken from the next envelope
		ident = pgx.Identifier{c.envelopes[c.envelopeIdx+1].MetricName}
	}
	return
}

// flush sends the cached measurements to the database
func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
	if len(msgs) == 0 {
		return
	}
	logger := log.GetLogger(pgw.ctx)
	pgPartBounds := make(map[string]ExistingPartitionInfo)                  // metric → min/max bounds
	pgPartBoundsDbName := make(map[string]map[string]ExistingPartitionInfo) // metric → dbname → min/max bounds
	var err error

	slices.SortFunc(msgs, func(a, b metrics.MeasurementEnvelope) int {
		return strings.Compare(a.MetricName, b.MetricName)
	})

	for _, msg := range msgs {
		for _, dataRow := range msg.Data {
			epochTime := time.Unix(0, metrics.Measurement(dataRow).GetEpoch())
			switch pgw.metricSchema {
			case DbStorageSchemaTimescale:
				// set min/max timestamps to check/create partitions
				bounds, ok := pgPartBounds[msg.MetricName]
				if !ok || epochTime.Before(bounds.StartTime) {
					bounds.StartTime = epochTime
					pgPartBounds[msg.MetricName] = bounds
				}
				if !ok || epochTime.After(bounds.EndTime) {
					bounds.EndTime = epochTime
					pgPartBounds[msg.MetricName] = bounds
				}
			case DbStorageSchemaPostgres:
				if _, ok := pgPartBoundsDbName[msg.MetricName]; !ok {
					pgPartBoundsDbName[msg.MetricName] = make(map[string]ExistingPartitionInfo)
				}
				bounds, ok := pgPartBoundsDbName[msg.MetricName][msg.DBName]
				if !ok || epochTime.Before(bounds.StartTime) {
					bounds.StartTime = epochTime
					pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
				}
				if !ok || epochTime.After(bounds.EndTime) {
					bounds.EndTime = epochTime
					pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
				}
			default:
				logger.Fatal("unknown storage schema...")
			}
		}
	}

	switch pgw.metricSchema {
	case DbStorageSchemaPostgres:
		err = pgw.EnsureMetricDbnameTime(pgPartBoundsDbName, forceRecreatePartitions)
	case DbStorageSchemaTimescale:
		err = pgw.EnsureMetricTimescale(pgPartBounds, forceRecreatePartitions)
	default:
		logger.Fatal("unknown storage schema...")
	}
	forceRecreatePartitions = false
	if err != nil {
		pgw.lastError <- err
	}

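	// Note: CopyFrom below runs on context.Background(), so a batch that is
	// already in flight is not cut short when pgw.ctx gets cancelled.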
	var rowsBatched, n int64
	t1 := time.Now()
	cfm := newCopyFromMeasurements(msgs)
	for !cfm.EOF() {
		n, err = pgw.sinkDb.CopyFrom(context.Background(), cfm.MetricName(), targetColumns[:], cfm)
		rowsBatched += n
		if err != nil {
			logger.Error(err)
			var pgErr *pgconn.PgError
			if errors.As(err, &pgErr) {
				forceRecreatePartitions = pgErr.Code == "23514" // check_violation, e.g. a partition was dropped under us
			}
			if forceRecreatePartitions {
				logger.Warning("Some metric partitions might have been removed, halting all metric storage. Trying to re-create all needed partitions on next run")
			}
		}
	}
	diff := time.Since(t1)
	if err == nil {
		logger.WithField("rows", rowsBatched).WithField("elapsed", diff).Info("measurements written")
		return
	}
	pgw.lastError <- err
}

// EnsureMetricTime creates the special "realtime" metric partitions when TimescaleDB is used
func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPartitionInfo, force bool) error {
	logger := log.GetLogger(pgw.ctx)
	sqlEnsure := `select part_available_from, part_available_to from admin.ensure_partition_metric_time($1, $2)`
	for metric, pb := range pgPartBounds {
		if !strings.HasSuffix(metric, "_realtime") {
			continue
		}
		if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
			return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
		}

		partInfo, ok := partitionMapMetric[metric]
		if !ok || pb.StartTime.Before(partInfo.StartTime) || force {
			err := pgw.sinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.StartTime).
				Scan(&partInfo.StartTime, &partInfo.EndTime)
			if err != nil {
				logger.Error("Failed to create partition on 'metrics': ", err)
				return err
			}
			partitionMapMetric[metric] = partInfo
		}
		if pb.EndTime.After(partInfo.EndTime) || force {
			err := pgw.sinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.EndTime).Scan(nil, &partInfo.EndTime)
			if err != nil {
				logger.Error("Failed to create partition on 'metrics': ", err)
				return err
			}
			partitionMapMetric[metric] = partInfo
		}
	}
	return nil
}

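// EnsureMetricTimescale creates a TimescaleDB hypertable for each new metric
// and then delegates the "_realtime" metrics to EnsureMetricTime.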
func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error) {
	logger := log.GetLogger(pgw.ctx)
	sqlEnsure := `select * from admin.ensure_partition_timescale($1)`
	for metric := range pgPartBounds {
		if strings.HasSuffix(metric, "_realtime") {
			continue
		}
		if _, ok := partitionMapMetric[metric]; !ok {
			if _, err = pgw.sinkDb.Exec(pgw.ctx, sqlEnsure, metric); err != nil {
				logger.Errorf("Failed to create a TimescaleDB table for metric '%s': %v", metric, err)
				return err
			}
			partitionMapMetric[metric] = ExistingPartitionInfo{}
		}
	}
	return pgw.EnsureMetricTime(pgPartBounds, force)
}

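// EnsureMetricDbnameTime creates the metric/dbname/time sub-partitions needed
// to cover the given time bounds when the plain Postgres schema is used.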
func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error) {
	var rows pgx.Rows
	sqlEnsure := `select * from admin.ensure_partition_metric_dbname_time($1, $2, $3)`
	for metric, dbnameTimestampMap := range metricDbnamePartBounds {
		if _, ok := partitionMapMetricDbname[metric]; !ok {
			partitionMapMetricDbname[metric] = make(map[string]ExistingPartitionInfo)
		}

		for dbname, pb := range dbnameTimestampMap {
			if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
				return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
			}
			partInfo, ok := partitionMapMetricDbname[metric][dbname]
			if !ok || pb.StartTime.Before(partInfo.StartTime) || force {
				if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
					return
				}
				if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
					return err
				}
				partitionMapMetricDbname[metric][dbname] = partInfo
			}
			if !pb.EndTime.Before(partInfo.EndTime) || force {
				// note: the upper bound must be ensured with pb.EndTime, not pb.StartTime
				if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.EndTime); err != nil {
					return
				}
				if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
					return err
				}
				partitionMapMetricDbname[metric][dbname] = partInfo
			}
		}
	}
	return nil
}

// deleteOldPartitions is a background task that deletes old partitions from the measurements DB
func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) {
	metricAgeDaysThreshold := pgw.opts.Retention
	if metricAgeDaysThreshold <= 0 {
		return
	}
	logger := log.GetLogger(pgw.ctx)
	select {
	case <-pgw.ctx.Done():
		return
	case <-time.After(delay):
		// wait out the startup phase to reduce distracting log messages
	}

	for {
		if pgw.metricSchema == DbStorageSchemaTimescale {
			partsDropped, err := pgw.DropOldTimePartitions(metricAgeDaysThreshold)
			if err != nil {
				logger.Errorf("Failed to drop old partitions (>%d days) from Postgres: %v", metricAgeDaysThreshold, err)
				time.Sleep(time.Second * 300) // back off before retrying so a persistent error can't busy-loop
				continue
			}
			logger.Infof("Dropped %d old metric partitions...", partsDropped)
		} else if pgw.metricSchema == DbStorageSchemaPostgres {
			partsToDrop, err := pgw.GetOldTimePartitions(metricAgeDaysThreshold)
			if err != nil {
				logger.Errorf("Failed to get a listing of old (>%d days) time partitions from Postgres metrics DB - check that the admin.get_old_time_partitions() function is rolled out: %v", metricAgeDaysThreshold, err)
				time.Sleep(time.Second * 300)
				continue
			}
			if len(partsToDrop) > 0 {
				logger.Infof("Dropping %d old metric partitions one by one...", len(partsToDrop))
				for _, toDrop := range partsToDrop {
					sqlDropTable := `DROP TABLE IF EXISTS ` + toDrop

					if _, err := pgw.sinkDb.Exec(pgw.ctx, sqlDropTable); err != nil {
						logger.Errorf("Failed to drop old partition %s from Postgres metrics DB: %v", toDrop, err)
						time.Sleep(time.Second * 300)
					} else {
						time.Sleep(time.Second * 5)
					}
				}
			} else {
				logger.Infof("No old metric partitions found to drop...")
			}
		}
		select {
		case <-pgw.ctx.Done():
			return
		case <-time.After(time.Hour * 12):
		}
	}
}

// maintainUniqueSources is a background task that maintains a listing of unique sources for each metric.
// This is used to avoid listing the same source multiple times in Grafana dropdowns.
func (pgw *PostgresWriter) maintainUniqueSources() {
	logger := log.GetLogger(pgw.ctx)
	// the listing can get out of sync due to metric deletions (a trigger is not really wanted here)
	sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock` // 1571543679778230000 is just a random bigint
	sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
	sqlDistinct := `
	WITH RECURSIVE t(dbname) AS (
		SELECT MIN(dbname) AS dbname FROM %s
		UNION
		SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t )
	SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
	sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2`
	sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1`
	sqlAdd := `
		INSERT INTO admin.all_distinct_dbname_metrics SELECT u, $2 FROM (select unnest($1::text[]) as u) x
		WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2)
		RETURNING *`

	for {
		select {
		case <-pgw.ctx.Done():
			return
		case <-time.After(time.Hour * 24):
		}
		var lock bool
		logger.Infof("Trying to get metricsDb listing maintainer advisory lock...") // only one "maintainer" is wanted in a multi-instance "push" setup, as the refresh can get costly
		if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
			logger.Error("Getting metricsDb listing maintainer advisory lock failed:", err)
			continue
		}
		if !lock {
			logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...")
			continue
		}

		logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
		rows, _ := pgw.sinkDb.Query(pgw.ctx, sqlTopLevelMetrics)
		allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
		if err != nil {
			logger.Error(err)
			continue
		}

		for _, tableName := range allDistinctMetricTables {
			foundDbnamesMap := make(map[string]bool)
			foundDbnamesArr := make([]string, 0)
			metricName := strings.Replace(tableName, "public.", "", 1)

			logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
			rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
			ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
			if err != nil {
				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for '%s': %s", metricName, err)
				break
			}
			for _, drDbname := range ret {
				foundDbnamesMap[drDbname] = true // "set" behaviour, don't want duplicates
			}

			// delete all entries that are no longer known and add all that are missing
			for k := range foundDbnamesMap {
				foundDbnamesArr = append(foundDbnamesArr, k)
			}
			if len(foundDbnamesArr) == 0 { // delete all entries for given metric
				logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName)

				_, err = pgw.sinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName)
				if err != nil {
					logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
				}
				continue
			}
			cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName)
			if err != nil {
				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
			} else if cmdTag.RowsAffected() > 0 {
				logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
			}
			cmdTag, err = pgw.sinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName)
			if err != nil {
				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
			} else if cmdTag.RowsAffected() > 0 {
				logger.Infof("Added %d new entries to the Postgres all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
			}
			time.Sleep(time.Minute)
		}
	}
}

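// DropOldTimePartitions drops partitions older than the given number of days
// via admin.drop_old_time_partitions and returns how many were dropped.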
func (pgw *PostgresWriter) DropOldTimePartitions(metricAgeDaysThreshold int) (res int, err error) {
	sqlOldPart := `select admin.drop_old_time_partitions($1, $2)`
	err = pgw.sinkDb.QueryRow(pgw.ctx, sqlOldPart, metricAgeDaysThreshold, false).Scan(&res)
	return
}

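// GetOldTimePartitions returns the names of partitions older than the given
// number of days, as reported by admin.get_old_time_partitions.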
func (pgw *PostgresWriter) GetOldTimePartitions(metricAgeDaysThreshold int) ([]string, error) {
	sqlGetOldParts := `select admin.get_old_time_partitions($1)`
	rows, err := pgw.sinkDb.Query(pgw.ctx, sqlGetOldParts, metricAgeDaysThreshold)
	if err != nil {
		return nil, err
	}
	return pgx.CollectRows(rows, pgx.RowTo[string])
}

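// AddDBUniqueMetricToListingTable registers a (dbname, metric) pair in
// admin.all_distinct_dbname_metrics unless it is already present.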
func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error {
	sql := `insert into admin.all_distinct_dbname_metrics
			select $1, $2
			where not exists (
				select * from admin.all_distinct_dbname_metrics where dbname = $1 and metric = $2
			)`
	_, err := pgw.sinkDb.Exec(pgw.ctx, sql, dbUnique, metric)
	return err
}