...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/sinks/postgres.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/sinks

package sinks

import (
	"context"
	_ "embed"
	"errors"
	"fmt"
	"maps"
	"slices"
	"strings"
	"time"

	jsoniter "github.com/json-iterator/go"

	"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgconn"
)

var (
	cacheLimit      = 256                                               // flush batch size and input channel capacity
	highLoadTimeout = time.Second * 5                                   // how long Write may block before dropping a measurement
	deleterDelay    = time.Hour                                         // initial delay before the old-partition deleter starts
	targetColumns   = [...]string{"time", "dbname", "data", "tag_data"} // column order used for CopyFrom
)

func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts) (pgw *PostgresWriter, err error) {
	var conn db.PgxPoolIface
	if conn, err = db.New(ctx, connstr); err != nil {
		return
	}
	return NewWriterFromPostgresConn(ctx, conn, opts)
}

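// Usage sketch (illustrative, not part of the original source): create a
// writer and push one measurement. The connection string and CmdOpts values
// are placeholders; only the option fields referenced in this file
// (BatchingDelay, Retention) are assumed, and the "epoch_ns" key follows the
// Measurements.GetEpoch convention relied on by flush below.
//
//	func exampleUsage(ctx context.Context) error {
//		w, err := NewPostgresWriter(ctx, "postgresql://pgwatch@localhost/measurements", &CmdOpts{
//			BatchingDelay: time.Second, // flush cached measurements at least once per second
//			Retention:     14,          // drop partitions older than 14 days
//		})
//		if err != nil {
//			return err
//		}
//		return w.Write(metrics.MeasurementEnvelope{
//			DBName:     "mydb",
//			MetricName: "db_stats",
//			Data:       metrics.Measurements{{"epoch_ns": time.Now().UnixNano(), "numbackends": 42}},
//		})
//	}
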
func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts) (pgw *PostgresWriter, err error) {
	l := log.GetLogger(ctx).WithField("sink", "postgres").WithField("db", conn.Config().ConnConfig.Database)
	ctx = log.WithLogger(ctx, l)
	pgw = &PostgresWriter{
		ctx:       ctx,
		opts:      opts,
		input:     make(chan metrics.MeasurementEnvelope, cacheLimit),
		lastError: make(chan error),
		sinkDb:    conn,
	}
	if err = db.Init(ctx, pgw.sinkDb, func(ctx context.Context, conn db.PgxIface) error {
		l.Info("initialising measurements database...")
		exists, err := db.DoesSchemaExist(ctx, conn, "admin")
		if err != nil || exists {
			return err
		}
		for _, sql := range metricSchemaSQLs {
			if _, err = conn.Exec(ctx, sql); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		return
	}
	if err = pgw.ReadMetricSchemaType(); err != nil {
		return
	}
	if err = pgw.EnsureBuiltinMetricDummies(); err != nil {
		return
	}
	go pgw.deleteOldPartitions(deleterDelay)
	go pgw.maintainUniqueSources()
	go pgw.poll()
	l.Info(`measurements sink is activated`)
	return
}

//go:embed sql/admin_schema.sql
var sqlMetricAdminSchema string

//go:embed sql/admin_functions.sql
var sqlMetricAdminFunctions string

//go:embed sql/ensure_partition_postgres.sql
var sqlMetricEnsurePartitionPostgres string

//go:embed sql/ensure_partition_timescale.sql
var sqlMetricEnsurePartitionTimescale string

//go:embed sql/change_chunk_interval.sql
var sqlMetricChangeChunkIntervalTimescale string

//go:embed sql/change_compression_interval.sql
var sqlMetricChangeCompressionIntervalTimescale string

var (
	metricSchemaSQLs = []string{
		sqlMetricAdminSchema,
		sqlMetricAdminFunctions,
		sqlMetricEnsurePartitionPostgres,
		sqlMetricEnsurePartitionTimescale,
		sqlMetricChangeChunkIntervalTimescale,
		sqlMetricChangeCompressionIntervalTimescale,
	}
)

// PostgresWriter is a sink that writes metric measurements to a Postgres database.
// It currently supports both plain Postgres and TimescaleDB as storage backends.
// Any Postgres-compatible database, e.g. PGEE, Citus, Greenplum, or CockroachDB,
// can also be used as a storage backend.
type PostgresWriter struct {
	ctx          context.Context
	sinkDb       db.PgxPoolIface
	metricSchema DbStorageSchemaType
	opts         *CmdOpts
	input        chan metrics.MeasurementEnvelope
	lastError    chan error
}

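// Data flow sketch (added for illustration, not part of the original source):
//
//	Write() -> input channel (buffered, cap cacheLimit) -> poll() batches by size or BatchingDelay -> flush() COPYs into per-metric tables
//	flush() errors -> lastError -> returned by the next Write() call
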
type ExistingPartitionInfo struct {
	StartTime time.Time
	EndTime   time.Time
}

type MeasurementMessagePostgres struct {
	Time    time.Time
	DBName  string
	Metric  string
	Data    map[string]any
	TagData map[string]string
}

type DbStorageSchemaType int

const (
	DbStorageSchemaPostgres DbStorageSchemaType = iota
	DbStorageSchemaTimescale
)

func (pgw *PostgresWriter) ReadMetricSchemaType() (err error) {
	var isTs bool
	pgw.metricSchema = DbStorageSchemaPostgres
	sqlSchemaType := `SELECT schema_type = 'timescale' FROM admin.storage_schema_type`
	if err = pgw.sinkDb.QueryRow(pgw.ctx, sqlSchemaType).Scan(&isTs); err == nil && isTs {
		pgw.metricSchema = DbStorageSchemaTimescale
	}
	return
}

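// ReadMetricSchemaType above assumes the admin schema (created from the
// embedded sql/admin_schema.sql) provides a one-row admin.storage_schema_type
// table. A minimal sketch of the shape it relies on; the authoritative DDL
// lives in the embedded SQL file:
//
//	CREATE TABLE admin.storage_schema_type (
//	    schema_type text NOT NULL CHECK (schema_type IN ('postgres', 'timescale'))
//	);
//	INSERT INTO admin.storage_schema_type VALUES ('timescale');
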
var (
	forceRecreatePartitions  = false                                             // signals flush to bypass the partition caches below and re-create partitions
	partitionMapMetric       = make(map[string]ExistingPartitionInfo)            // metric => min/max partition bounds
	partitionMapMetricDbname = make(map[string]map[string]ExistingPartitionInfo) // metric => dbname => min/max partition bounds
)

// SyncMetric ensures that tables exist for newly added metrics and/or sources
func (pgw *PostgresWriter) SyncMetric(dbUnique, metricName string, op SyncOp) error {
	if op == AddOp {
		return errors.Join(
			pgw.AddDBUniqueMetricToListingTable(dbUnique, metricName),
			pgw.EnsureMetricDummy(metricName), // ensure there is at least an empty top-level table, so Grafana doesn't show ugly errors for missing tables
		)
	}
	return nil
}

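// Usage sketch (illustrative): registering a new source/metric pair up front,
// so the listing table entry and the dummy top-level table exist before the
// first measurements arrive. AddOp is the SyncOp constant handled above.
//
//	if err := w.SyncMetric("prod-db", "wal", AddOp); err != nil {
//		log.GetLogger(ctx).Error(err)
//	}
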
// EnsureBuiltinMetricDummies creates empty tables for all built-in metrics if they don't exist
func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error) {
	for _, name := range metrics.GetDefaultBuiltInMetrics() {
		err = errors.Join(err, pgw.EnsureMetricDummy(name))
	}
	return
}

// EnsureMetricDummy creates an empty table for a metric's measurements if it doesn't exist
func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error) {
	_, err = pgw.sinkDb.Exec(pgw.ctx, "select admin.ensure_dummy_metrics_table($1)", metric)
	return
}

// Write sends the measurements to the cache channel
func (pgw *PostgresWriter) Write(msg metrics.MeasurementEnvelope) error {
	if pgw.ctx.Err() != nil {
		return pgw.ctx.Err()
	}
	select {
	case pgw.input <- msg:
		// message queued for the next flush
	case <-time.After(highLoadTimeout):
		// message dropped due to high load; check the log output for details
	}
	select {
	case err := <-pgw.lastError:
		return err
	default:
		return nil
	}
}

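// Note on Write's error semantics (added for illustration): Write never waits
// for the database, so any error it returns stems from an earlier flush, not
// necessarily from msg itself. A caller sketch:
//
//	if err := w.Write(envelope); err != nil {
//		// a previous batch failed to flush; envelope itself may still be queued
//		log.GetLogger(ctx).Error("sink reported a flush error: ", err)
//	}
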
// poll is the main loop that reads from the input channel and flushes the data to the database
func (pgw *PostgresWriter) poll() {
	cache := make([]metrics.MeasurementEnvelope, 0, cacheLimit)
	cacheTimeout := pgw.opts.BatchingDelay
	tick := time.NewTicker(cacheTimeout)
	for {
		select {
		case <-pgw.ctx.Done(): // check context with high priority
			return
		default:
			select {
			case entry := <-pgw.input:
				cache = append(cache, entry)
				if len(cache) < cacheLimit {
					break
				}
				tick.Stop()
				pgw.flush(cache)
				cache = cache[:0]
				tick = time.NewTicker(cacheTimeout)
			case <-tick.C:
				pgw.flush(cache)
				cache = cache[:0]
			case <-pgw.ctx.Done():
				return
			}
		}
	}
}

func newCopyFromMeasurements(rows []metrics.MeasurementEnvelope) *copyFromMeasurements {
	return &copyFromMeasurements{envelopes: rows, envelopeIdx: -1, measurementIdx: -1}
}

type copyFromMeasurements struct {
	envelopes      []metrics.MeasurementEnvelope
	envelopeIdx    int
	measurementIdx int    // index of the current measurement in the envelope
	metricName     string // metric currently being streamed; empty marks a fresh start or a metric boundary
}

func (c *copyFromMeasurements) Next() bool {
	for {
		// Check if we need to advance to the next envelope
		if c.envelopeIdx < 0 || c.measurementIdx+1 >= len(c.envelopes[c.envelopeIdx].Data) {
			// Advance to next envelope
			c.envelopeIdx++
			if c.envelopeIdx >= len(c.envelopes) {
				return false // No more envelopes
			}
			c.measurementIdx = -1 // Reset measurement index for new envelope

			// Set metric name from first envelope, or detect metric boundary
			if c.metricName == "" {
				c.metricName = c.envelopes[c.envelopeIdx].MetricName
			} else if c.metricName != c.envelopes[c.envelopeIdx].MetricName {
				// We've hit a different metric - we're done with current metric
				// Reset position to process this envelope on next call
				c.envelopeIdx--
				c.measurementIdx = len(c.envelopes[c.envelopeIdx].Data) // Set to length so we've "finished" this envelope
				c.metricName = ""                                       // Reset for next metric
				return false
			}
		}

		// Advance to next measurement in current envelope
		c.measurementIdx++
		if c.measurementIdx < len(c.envelopes[c.envelopeIdx].Data) {
			return true // Found valid measurement
		}
		// If we reach here, we've exhausted current envelope, loop will advance to next envelope
	}
}

func (c *copyFromMeasurements) EOF() bool {
	return c.envelopeIdx >= len(c.envelopes)
}

func (c *copyFromMeasurements) Values() ([]any, error) {
	row := maps.Clone(c.envelopes[c.envelopeIdx].Data[c.measurementIdx])
	tagRow := maps.Clone(c.envelopes[c.envelopeIdx].CustomTags)
	if tagRow == nil {
		tagRow = make(map[string]string)
	}
	for k, v := range row {
		if strings.HasPrefix(k, metrics.TagPrefix) {
			tagRow[strings.TrimPrefix(k, metrics.TagPrefix)] = fmt.Sprintf("%v", v)
			delete(row, k)
		}
	}
	jsonTags, terr := jsoniter.ConfigFastest.MarshalToString(tagRow)
	json, err := jsoniter.ConfigFastest.MarshalToString(row)
	if err != nil || terr != nil {
		return nil, errors.Join(err, terr)
	}
	return []any{time.Unix(0, c.envelopes[c.envelopeIdx].Data.GetEpoch()), c.envelopes[c.envelopeIdx].DBName, json, jsonTags}, nil
}

func (c *copyFromMeasurements) Err() error {
	return nil
}

func (c *copyFromMeasurements) MetricName() pgx.Identifier {
	return pgx.Identifier{c.envelopes[c.envelopeIdx+1].MetricName} // Metric name is taken from the next envelope
}

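// Worked example (illustrative, not part of the original source): given three
// envelopes sorted by metric name, the iterator acts as one CopyFrom source
// per metric, so flush can COPY each group into its own table.
//
//	msgs := []metrics.MeasurementEnvelope{
//		{MetricName: "db_stats", DBName: "a", Data: metrics.Measurements{{"epoch_ns": 1}}},
//		{MetricName: "db_stats", DBName: "b", Data: metrics.Measurements{{"epoch_ns": 2}}},
//		{MetricName: "wal", DBName: "a", Data: metrics.Measurements{{"epoch_ns": 3}}},
//	}
//	cfm := newCopyFromMeasurements(msgs)
//	// 1st CopyFrom: MetricName() == {"db_stats"}, Next() yields two rows, then false
//	// 2nd CopyFrom: MetricName() == {"wal"}, Next() yields one row, then false
//	// cfm.EOF() is now true and the flush loop stops
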
// flush sends the cached measurements to the database
func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
	if len(msgs) == 0 {
		return
	}
	logger := log.GetLogger(pgw.ctx)
	pgPartBounds := make(map[string]ExistingPartitionInfo)                  // metric => min/max bounds
	pgPartBoundsDbName := make(map[string]map[string]ExistingPartitionInfo) // metric => dbname => min/max bounds
	var err error

	// sort by metric name so copyFromMeasurements can stream one metric (= one target table) at a time
	slices.SortFunc(msgs, func(a, b metrics.MeasurementEnvelope) int {
		return strings.Compare(a.MetricName, b.MetricName)
	})

	for _, msg := range msgs {
		for _, dataRow := range msg.Data {
			epochTime := time.Unix(0, metrics.Measurement(dataRow).GetEpoch())
			switch pgw.metricSchema {
			case DbStorageSchemaTimescale:
				// set min/max timestamps to check/create partitions
				bounds, ok := pgPartBounds[msg.MetricName]
				if !ok || epochTime.Before(bounds.StartTime) {
					bounds.StartTime = epochTime
					pgPartBounds[msg.MetricName] = bounds
				}
				if !ok || epochTime.After(bounds.EndTime) {
					bounds.EndTime = epochTime
					pgPartBounds[msg.MetricName] = bounds
				}
			case DbStorageSchemaPostgres:
				if _, ok := pgPartBoundsDbName[msg.MetricName]; !ok {
					pgPartBoundsDbName[msg.MetricName] = make(map[string]ExistingPartitionInfo)
				}
				bounds, ok := pgPartBoundsDbName[msg.MetricName][msg.DBName]
				if !ok || epochTime.Before(bounds.StartTime) {
					bounds.StartTime = epochTime
					pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
				}
				if !ok || epochTime.After(bounds.EndTime) {
					bounds.EndTime = epochTime
					pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
				}
			default:
				logger.Fatal("unknown storage schema...")
			}
		}
	}

	switch pgw.metricSchema {
	case DbStorageSchemaPostgres:
		err = pgw.EnsureMetricDbnameTime(pgPartBoundsDbName, forceRecreatePartitions)
	case DbStorageSchemaTimescale:
		err = pgw.EnsureMetricTimescale(pgPartBounds, forceRecreatePartitions)
	default:
		logger.Fatal("unknown storage schema...")
	}
	forceRecreatePartitions = false
	if err != nil {
		pgw.lastError <- err
	}

	var rowsBatched, n int64
	t1 := time.Now()
	cfm := newCopyFromMeasurements(msgs)
	for !cfm.EOF() {
		// context.Background() keeps an in-flight COPY from being cancelled via pgw.ctx mid-batch
		n, err = pgw.sinkDb.CopyFrom(context.Background(), cfm.MetricName(), targetColumns[:], cfm)
		rowsBatched += n
		if err != nil {
			logger.Error(err)
			if PgError, ok := err.(*pgconn.PgError); ok {
				forceRecreatePartitions = PgError.Code == "23514" // check_violation: a row fell outside the existing partition bounds
			}
			if forceRecreatePartitions {
				logger.Warning("Some metric partitions might have been removed, halting all metric storage. Trying to re-create all needed partitions on next run")
			}
		}
	}
	diff := time.Since(t1)
	if err == nil {
		logger.WithField("rows", rowsBatched).WithField("elapsed", diff).Info("measurements written")
		return
	}
	pgw.lastError <- err
}

// EnsureMetricTime creates special partitions for realtime metrics when TimescaleDB is used
func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPartitionInfo, force bool) error {
	logger := log.GetLogger(pgw.ctx)
	sqlEnsure := `select part_available_from, part_available_to from admin.ensure_partition_metric_time($1, $2)`
	for metric, pb := range pgPartBounds {
		if !strings.HasSuffix(metric, "_realtime") {
			continue
		}
		if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
			return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
		}

		partInfo, ok := partitionMapMetric[metric]
		if !ok || pb.StartTime.Before(partInfo.StartTime) || force {
			err := pgw.sinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.StartTime).
				Scan(&partInfo.StartTime, &partInfo.EndTime)
			if err != nil {
				logger.Error("Failed to create partition on 'metrics': ", err)
				return err
			}
			partitionMapMetric[metric] = partInfo
		}
		if pb.EndTime.After(partInfo.EndTime) || force {
			err := pgw.sinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.EndTime).Scan(nil, &partInfo.EndTime)
			if err != nil {
				logger.Error("Failed to create partition on 'metrics': ", err)
				return err
			}
			partitionMapMetric[metric] = partInfo
		}
	}
	return nil
}

func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error) {
	logger := log.GetLogger(pgw.ctx)
	sqlEnsure := `select * from admin.ensure_partition_timescale($1)`
	for metric := range pgPartBounds {
		if strings.HasSuffix(metric, "_realtime") {
			continue
		}
		if _, ok := partitionMapMetric[metric]; !ok {
			if _, err = pgw.sinkDb.Exec(pgw.ctx, sqlEnsure, metric); err != nil {
				logger.Errorf("Failed to create a TimescaleDB table for metric '%s': %v", metric, err)
				return err
			}
			partitionMapMetric[metric] = ExistingPartitionInfo{}
		}
	}
	return pgw.EnsureMetricTime(pgPartBounds, force)
}

func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error) {
	var rows pgx.Rows
	sqlEnsure := `select * from admin.ensure_partition_metric_dbname_time($1, $2, $3)`
	for metric, dbnameTimestampMap := range metricDbnamePartBounds {
		if _, ok := partitionMapMetricDbname[metric]; !ok {
			partitionMapMetricDbname[metric] = make(map[string]ExistingPartitionInfo)
		}

		for dbname, pb := range dbnameTimestampMap {
			if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
				return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
			}
			partInfo, ok := partitionMapMetricDbname[metric][dbname]
			if !ok || pb.StartTime.Before(partInfo.StartTime) || force {
				if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
					return
				}
				if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
					return err
				}
				partitionMapMetricDbname[metric][dbname] = partInfo
			}
			if pb.EndTime.After(partInfo.EndTime) || pb.EndTime.Equal(partInfo.EndTime) || force {
				// pb.EndTime (not pb.StartTime) is the relevant bound here, mirroring EnsureMetricTime above
				if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.EndTime); err != nil {
					return
				}
				if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
					return err
				}
				partitionMapMetricDbname[metric][dbname] = partInfo
			}
		}
	}
	return nil
}

// deleteOldPartitions is a background task that deletes old partitions from the measurements DB
func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) {
	metricAgeDaysThreshold := pgw.opts.Retention
	if metricAgeDaysThreshold <= 0 {
		return
	}
	logger := log.GetLogger(pgw.ctx)
	select {
	case <-pgw.ctx.Done():
		return
	case <-time.After(delay):
		// initial delay reduces distracting log messages at startup
	}

	for {
		if pgw.metricSchema == DbStorageSchemaTimescale {
			partsDropped, err := pgw.DropOldTimePartitions(metricAgeDaysThreshold)
			if err != nil {
				logger.Errorf("Failed to drop old partitions (>%d days) from Postgres: %v", metricAgeDaysThreshold, err)
				time.Sleep(time.Second * 300) // back off, otherwise a persistent error would retry in a tight loop
				continue
			}
			logger.Infof("Dropped %d old metric partitions...", partsDropped)
		} else if pgw.metricSchema == DbStorageSchemaPostgres {
			partsToDrop, err := pgw.GetOldTimePartitions(metricAgeDaysThreshold)
			if err != nil {
				logger.Errorf("Failed to get a listing of old (>%d days) time partitions from Postgres metrics DB - check that the admin.get_old_time_partitions() function is rolled out: %v", metricAgeDaysThreshold, err)
				time.Sleep(time.Second * 300)
				continue
			}
			if len(partsToDrop) > 0 {
				logger.Infof("Dropping %d old metric partitions one by one...", len(partsToDrop))
				for _, toDrop := range partsToDrop {
					sqlDropTable := `DROP TABLE IF EXISTS ` + toDrop

					if _, err := pgw.sinkDb.Exec(pgw.ctx, sqlDropTable); err != nil {
						logger.Errorf("Failed to drop old partition %s from Postgres metrics DB: %v", toDrop, err)
						time.Sleep(time.Second * 300)
					} else {
						time.Sleep(time.Second * 5)
					}
				}
			} else {
				logger.Infof("No old metric partitions found to drop...")
			}
		}
		select {
		case <-pgw.ctx.Done():
			return
		case <-time.After(time.Hour * 12):
		}
	}
}

// maintainUniqueSources is a background task that maintains a listing of unique sources for each metric.
// This is used to avoid listing the same source multiple times in Grafana dropdowns.
func (pgw *PostgresWriter) maintainUniqueSources() {
	logger := log.GetLogger(pgw.ctx)
	// the listing can go out of sync due to metric deletions, so it is refreshed periodically (a trigger-based approach is not really wanted)
	sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock` // 1571543679778230000 is just a random bigint
	sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
	sqlDistinct := `
	WITH RECURSIVE t(dbname) AS (
		SELECT MIN(dbname) AS dbname FROM %s
		UNION
		SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t )
	SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
	sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2`
	sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1`
	sqlAdd := `
		INSERT INTO admin.all_distinct_dbname_metrics SELECT u, $2 FROM (select unnest($1::text[]) as u) x
		WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2)
		RETURNING *`

	for {
		select {
		case <-pgw.ctx.Done():
			return
		case <-time.After(time.Hour * 24):
		}
		var lock bool
		logger.Infof("Trying to get metricsDb listing maintainer advisory lock...") // ensures a single "maintainer" in a "push" setup, as the refresh can get costly
		if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
			logger.Error("Getting metricsDb listing maintainer advisory lock failed:", err)
			continue
		}
		if !lock {
			logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...")
			continue
		}

		logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
		rows, _ := pgw.sinkDb.Query(pgw.ctx, sqlTopLevelMetrics)
		allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
		if err != nil {
			logger.Error(err)
			continue
		}

		for _, tableName := range allDistinctMetricTables {
			foundDbnamesMap := make(map[string]bool)
			foundDbnamesArr := make([]string, 0)
			metricName := strings.Replace(tableName, "public.", "", 1)

			logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
			rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
			ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
			if err != nil {
				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for '%s': %s", metricName, err)
				break
			}
			for _, drDbname := range ret {
				foundDbnamesMap[drDbname] = true // "set" behaviour, don't want duplicates
			}

			// delete all entries that are no longer known and add the ones that are missing
			for k := range foundDbnamesMap {
				foundDbnamesArr = append(foundDbnamesArr, k)
			}
			if len(foundDbnamesArr) == 0 { // delete all entries for the given metric
				logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName)

				_, err = pgw.sinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName)
				if err != nil {
					logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
				}
				continue
			}
			cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName)
			if err != nil {
				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
			} else if cmdTag.RowsAffected() > 0 {
				logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
			}
			cmdTag, err = pgw.sinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName)
			if err != nil {
				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
			} else if cmdTag.RowsAffected() > 0 {
				logger.Infof("Added %d entries to the Postgres all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
			}
			time.Sleep(time.Minute)
		}
	}
}

func (pgw *PostgresWriter) DropOldTimePartitions(metricAgeDaysThreshold int) (res int, err error) {
	sqlOldPart := `select admin.drop_old_time_partitions($1, $2)`
	err = pgw.sinkDb.QueryRow(pgw.ctx, sqlOldPart, metricAgeDaysThreshold, false).Scan(&res)
	return
}

func (pgw *PostgresWriter) GetOldTimePartitions(metricAgeDaysThreshold int) ([]string, error) {
	sqlGetOldParts := `select admin.get_old_time_partitions($1)`
	rows, err := pgw.sinkDb.Query(pgw.ctx, sqlGetOldParts, metricAgeDaysThreshold)
	if err == nil {
		return pgx.CollectRows(rows, pgx.RowTo[string])
	}
	return nil, err
}

func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error {
	sql := `insert into admin.all_distinct_dbname_metrics
			select $1, $2
			where not exists (
				select * from admin.all_distinct_dbname_metrics where dbname = $1 and metric = $2
			)`
	_, err := pgw.sinkDb.Exec(pgw.ctx, sql, dbUnique, metric)
	return err
}