package sinks

import (
	"context"
	_ "embed"
	"errors"
	"fmt"
	"maps"
	"slices"
	"strings"
	"time"

	jsoniter "github.com/json-iterator/go"

	"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgconn"
)

var (
	cacheLimit      = 256
	highLoadTimeout = time.Second * 5
	deleterDelay    = time.Hour
	targetColumns   = [...]string{"time", "dbname", "data", "tag_data"}
)

// NewPostgresWriter creates a PostgresWriter that connects to the measurements
// database using the given connection string.
func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts) (pgw *PostgresWriter, err error) {
	var conn db.PgxPoolIface
	if conn, err = db.New(ctx, connstr); err != nil {
		return
	}
	return NewWriterFromPostgresConn(ctx, conn, opts)
}
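
// A minimal usage sketch (illustrative only; the DSN and option values below
// are hypothetical, not taken from this package):
//
//	opts := &CmdOpts{BatchingDelay: time.Second, Retention: 14}
//	w, err := NewPostgresWriter(ctx, "postgresql://user:pass@localhost/measurements", opts)
//	if err != nil {
//		return err
//	}
//	err = w.Write(envelope) // queues the measurements; see Write for the error semantics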

// NewWriterFromPostgresConn creates a PostgresWriter on top of an existing
// connection pool, rolling out the admin schema on first use and starting the
// background partition-maintenance and polling goroutines.
func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts) (pgw *PostgresWriter, err error) {
	l := log.GetLogger(ctx).WithField("sink", "postgres").WithField("db", conn.Config().ConnConfig.Database)
	ctx = log.WithLogger(ctx, l)
	pgw = &PostgresWriter{
		ctx:       ctx,
		opts:      opts,
		input:     make(chan metrics.MeasurementEnvelope, cacheLimit),
		lastError: make(chan error),
		sinkDb:    conn,
	}
	if err = db.Init(ctx, pgw.sinkDb, func(ctx context.Context, conn db.PgxIface) error {
		l.Info("initialising measurements database...")
		exists, err := db.DoesSchemaExist(ctx, conn, "admin")
		if err != nil || exists {
			return err
		}
		for _, sql := range metricSchemaSQLs {
			if _, err = conn.Exec(ctx, sql); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		return
	}
	if err = pgw.ReadMetricSchemaType(); err != nil {
		return
	}
	if err = pgw.EnsureBuiltinMetricDummies(); err != nil {
		return
	}
	go pgw.deleteOldPartitions(deleterDelay)
	go pgw.maintainUniqueSources()
	go pgw.poll()
	l.Info(`measurements sink is activated`)
	return
}

// NOTE: the SQL file paths in the embed directives below are assumptions
// derived from the variable names; they must match the files shipped
// alongside this package.

//go:embed sql/admin_schema.sql
var sqlMetricAdminSchema string

//go:embed sql/admin_functions.sql
var sqlMetricAdminFunctions string

//go:embed sql/ensure_partition_postgres.sql
var sqlMetricEnsurePartitionPostgres string

//go:embed sql/ensure_partition_timescale.sql
var sqlMetricEnsurePartitionTimescale string

//go:embed sql/change_chunk_interval.sql
var sqlMetricChangeChunkIntervalTimescale string

//go:embed sql/change_compression_interval.sql
var sqlMetricChangeCompressionIntervalTimescale string

var metricSchemaSQLs = []string{
	sqlMetricAdminSchema,
	sqlMetricAdminFunctions,
	sqlMetricEnsurePartitionPostgres,
	sqlMetricEnsurePartitionTimescale,
	sqlMetricChangeChunkIntervalTimescale,
	sqlMetricChangeCompressionIntervalTimescale,
}

// PostgresWriter is a sink that writes measurements to a Postgres-compatible
// database, batching them and creating the needed partitions on the fly.
type PostgresWriter struct {
	ctx          context.Context
	sinkDb       db.PgxPoolIface
	metricSchema DbStorageSchemaType
	opts         *CmdOpts
	input        chan metrics.MeasurementEnvelope
	lastError    chan error
}

// ExistingPartitionInfo holds the time bounds of an already created partition.
type ExistingPartitionInfo struct {
	StartTime time.Time
	EndTime   time.Time
}

// MeasurementMessagePostgres mirrors the stored form of a measurement: plain
// values go to the "data" JSON document, tags to "tag_data".
type MeasurementMessagePostgres struct {
	Time    time.Time
	DBName  string
	Metric  string
	Data    map[string]any
	TagData map[string]string
}
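
// For illustration (hypothetical field names and values): a measurement from
// source "mydb" such as
//
//	{"epoch_ns": 1700000000000000000, "numbackends": 42, "tag_role": "primary"}
//
// ends up split across the target columns roughly as
//
//	time     = 2023-11-14 22:13:20 UTC  (from the epoch value)
//	dbname   = "mydb"
//	data     = {"numbackends": 42}
//	tag_data = {"role": "primary"}
//
// (see copyFromMeasurements.Values below for the actual mapping).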

// DbStorageSchemaType denotes the storage layout of the measurements database:
// native Postgres partitions or TimescaleDB hypertables.
type DbStorageSchemaType int

const (
	DbStorageSchemaPostgres DbStorageSchemaType = iota
	DbStorageSchemaTimescale
)

// ReadMetricSchemaType reads the schema type from admin.storage_schema_type,
// defaulting to the plain Postgres schema.
func (pgw *PostgresWriter) ReadMetricSchemaType() (err error) {
	var isTs bool
	pgw.metricSchema = DbStorageSchemaPostgres
	sqlSchemaType := `SELECT schema_type = 'timescale' FROM admin.storage_schema_type`
	if err = pgw.sinkDb.QueryRow(pgw.ctx, sqlSchemaType).Scan(&isTs); err == nil && isTs {
		pgw.metricSchema = DbStorageSchemaTimescale
	}
	return
}

var (
	forceRecreatePartitions  = false                                             // set on partition errors to force re-creation on the next flush
	partitionMapMetric       = make(map[string]ExistingPartitionInfo)            // metric => known min/max bounds
	partitionMapMetricDbname = make(map[string]map[string]ExistingPartitionInfo) // metric[dbname] => known min/max bounds
)

// SyncMetric ensures that tables are in place for a newly added source/metric
// pair; other operations are no-ops for this sink.
func (pgw *PostgresWriter) SyncMetric(dbUnique, metricName string, op SyncOp) error {
	if op == AddOp {
		return errors.Join(
			pgw.AddDBUniqueMetricToListingTable(dbUnique, metricName),
			pgw.EnsureMetricDummy(metricName),
		)
	}
	return nil
}

// EnsureBuiltinMetricDummies creates empty tables for all built-in metrics if needed.
func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error) {
	for _, name := range metrics.GetDefaultBuiltInMetrics() {
		err = errors.Join(err, pgw.EnsureMetricDummy(name))
	}
	return
}

// EnsureMetricDummy creates an empty table for a metric's measurements if it doesn't exist yet.
func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error) {
	_, err = pgw.sinkDb.Exec(pgw.ctx, "select admin.ensure_dummy_metrics_table($1)", metric)
	return
}

// Write queues the measurements for storage. Under high load, when the cache
// channel stays full for longer than highLoadTimeout, the message is dropped.
func (pgw *PostgresWriter) Write(msg metrics.MeasurementEnvelope) error {
	if pgw.ctx.Err() != nil {
		return pgw.ctx.Err()
	}
	select {
	case pgw.input <- msg:
		// message queued
	case <-time.After(highLoadTimeout):
		// message dropped due to a full cache
	}
	select {
	case err := <-pgw.lastError:
		return err
	default:
		return nil
	}
}
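
// Note the design choice above: Write never blocks on the database itself, so
// a flush failure surfaces only on a later call, e.g. (illustrative sketch):
//
//	if err := w.Write(envelope); err != nil {
//		// err relates to an earlier batch, not necessarily this envelope
//	}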

// poll is the main loop: it collects incoming measurements into a cache and
// flushes it either when the cache is full or when the batching delay elapses.
func (pgw *PostgresWriter) poll() {
	cache := make([]metrics.MeasurementEnvelope, 0, cacheLimit)
	cacheTimeout := pgw.opts.BatchingDelay
	tick := time.NewTicker(cacheTimeout)
	for {
		select {
		case <-pgw.ctx.Done():
			return
		default:
			select {
			case entry := <-pgw.input:
				cache = append(cache, entry)
				if len(cache) < cacheLimit {
					break
				}
				// cache is full: flush early and restart the batching timer
				tick.Stop()
				pgw.flush(cache)
				cache = cache[:0]
				tick = time.NewTicker(cacheTimeout)
			case <-tick.C:
				pgw.flush(cache)
				cache = cache[:0]
			case <-pgw.ctx.Done():
				return
			}
		}
	}
}

func newCopyFromMeasurements(rows []metrics.MeasurementEnvelope) *copyFromMeasurements {
	return &copyFromMeasurements{envelopes: rows, envelopeIdx: -1, measurementIdx: -1}
}
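
// copyFromMeasurements adapts a slice of envelopes (pre-sorted by metric name)
// to pgx's CopyFromSource interface (Next/Values/Err), yielding the rows of
// one metric table at a time. Consumption sketch, mirroring flush below:
//
//	cfm := newCopyFromMeasurements(msgs)
//	for !cfm.EOF() {
//		// each CopyFrom call consumes rows until the metric name changes
//		n, err := pgw.sinkDb.CopyFrom(ctx, cfm.MetricName(), targetColumns[:], cfm)
//		...
//	}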
type copyFromMeasurements struct {
	envelopes      []metrics.MeasurementEnvelope
	envelopeIdx    int    // index of the envelope currently being consumed
	measurementIdx int    // index within the current envelope's Data
	metricName     string // metric currently being copied; empty before the first row
}

// Next advances the iterator to the next measurement of the current metric,
// reporting false when the metric (or the input) is exhausted.
func (c *copyFromMeasurements) Next() bool {
	for {
		// Check whether we need to advance to the next envelope
		if c.envelopeIdx < 0 || c.measurementIdx+1 >= len(c.envelopes[c.envelopeIdx].Data) {
			c.envelopeIdx++
			if c.envelopeIdx >= len(c.envelopes) {
				return false // no more envelopes
			}
			c.measurementIdx = -1 // reset the measurement index for the new envelope

			// Remember the metric of the first envelope; stop at a metric boundary
			if c.metricName == "" {
				c.metricName = c.envelopes[c.envelopeIdx].MetricName
			} else if c.metricName != c.envelopes[c.envelopeIdx].MetricName {
				c.envelopeIdx--                                         // back up to stay on the current envelope
				c.measurementIdx = len(c.envelopes[c.envelopeIdx].Data) // mark it as finished
				c.metricName = ""                                       // reset for the next metric
				return false
			}
		}

		// Advance within the current envelope; loop around if it is exhausted
		c.measurementIdx++
		if c.measurementIdx < len(c.envelopes[c.envelopeIdx].Data) {
			return true
		}
	}
}

// EOF reports whether all envelopes have been consumed.
func (c *copyFromMeasurements) EOF() bool {
	return c.envelopeIdx >= len(c.envelopes)
}

// Values renders the current measurement as a row for the target columns:
// tag_-prefixed keys go to the tag_data JSON document, the rest to data.
func (c *copyFromMeasurements) Values() ([]any, error) {
	row := maps.Clone(c.envelopes[c.envelopeIdx].Data[c.measurementIdx])
	tagRow := maps.Clone(c.envelopes[c.envelopeIdx].CustomTags)
	if tagRow == nil {
		tagRow = make(map[string]string)
	}
	for k, v := range row {
		if strings.HasPrefix(k, metrics.TagPrefix) {
			tagRow[strings.TrimPrefix(k, metrics.TagPrefix)] = fmt.Sprintf("%v", v)
			delete(row, k)
		}
	}
	jsonTags, terr := jsoniter.ConfigFastest.MarshalToString(tagRow)
	json, err := jsoniter.ConfigFastest.MarshalToString(row)
	if err != nil || terr != nil {
		return nil, errors.Join(err, terr)
	}
	return []any{time.Unix(0, c.envelopes[c.envelopeIdx].Data.GetEpoch()), c.envelopes[c.envelopeIdx].DBName, json, jsonTags}, nil
}

func (c *copyFromMeasurements) Err() error {
	return nil
}

// MetricName returns the table name the upcoming batch of rows belongs to.
func (c *copyFromMeasurements) MetricName() pgx.Identifier {
	// Next has not consumed the upcoming envelope yet, hence envelopeIdx+1.
	return pgx.Identifier{c.envelopes[c.envelopeIdx+1].MetricName}
}

// flush writes the cached measurements to the database using COPY, ensuring
// that the needed partitions exist first.
func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
	if len(msgs) == 0 {
		return
	}
	logger := log.GetLogger(pgw.ctx)
	pgPartBounds := make(map[string]ExistingPartitionInfo)                  // metric => min/max
	pgPartBoundsDbName := make(map[string]map[string]ExistingPartitionInfo) // metric[dbname] => min/max
	var err error

	// Sort by metric name so that all measurements of a metric are contiguous
	// and can each be written with a single CopyFrom call.
	slices.SortFunc(msgs, func(a, b metrics.MeasurementEnvelope) int {
		return strings.Compare(a.MetricName, b.MetricName)
	})

	// Collect the min/max time bounds per metric (and per source for the
	// native Postgres schema) to know which partitions must exist.
	for _, msg := range msgs {
		for _, dataRow := range msg.Data {
			epochTime := time.Unix(0, metrics.Measurement(dataRow).GetEpoch())
			switch pgw.metricSchema {
			case DbStorageSchemaTimescale:
				bounds, ok := pgPartBounds[msg.MetricName]
				if !ok || epochTime.Before(bounds.StartTime) {
					bounds.StartTime = epochTime
					pgPartBounds[msg.MetricName] = bounds
				}
				if !ok || epochTime.After(bounds.EndTime) {
					bounds.EndTime = epochTime
					pgPartBounds[msg.MetricName] = bounds
				}
			case DbStorageSchemaPostgres:
				if _, ok := pgPartBoundsDbName[msg.MetricName]; !ok {
					pgPartBoundsDbName[msg.MetricName] = make(map[string]ExistingPartitionInfo)
				}
				bounds, ok := pgPartBoundsDbName[msg.MetricName][msg.DBName]
				if !ok || epochTime.Before(bounds.StartTime) {
					bounds.StartTime = epochTime
					pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
				}
				if !ok || epochTime.After(bounds.EndTime) {
					bounds.EndTime = epochTime
					pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
				}
			default:
				logger.Fatal("unknown storage schema...")
			}
		}
	}

	switch pgw.metricSchema {
	case DbStorageSchemaPostgres:
		err = pgw.EnsureMetricDbnameTime(pgPartBoundsDbName, forceRecreatePartitions)
	case DbStorageSchemaTimescale:
		err = pgw.EnsureMetricTimescale(pgPartBounds, forceRecreatePartitions)
	default:
		logger.Fatal("unknown storage schema...")
	}
	forceRecreatePartitions = false
	if err != nil {
		pgw.lastError <- err
	}

	var rowsBatched, n int64
	t1 := time.Now()
	cfm := newCopyFromMeasurements(msgs)
	for !cfm.EOF() {
		n, err = pgw.sinkDb.CopyFrom(context.Background(), cfm.MetricName(), targetColumns[:], cfm)
		rowsBatched += n
		if err != nil {
			logger.Error(err)
			if PgError, ok := err.(*pgconn.PgError); ok {
				forceRecreatePartitions = PgError.Code == "23514" // check_violation: rows fell outside existing partitions
			}
			if forceRecreatePartitions {
				logger.Warning("Some metric partitions might have been removed, halting all metric storage. Trying to re-create all needed partitions on next run")
			}
		}
	}
	diff := time.Since(t1)
	if err == nil {
		logger.WithField("rows", rowsBatched).WithField("elapsed", diff).Info("measurements written")
		return
	}
	pgw.lastError <- err
}

// EnsureMetricTime creates the time partitions needed for "realtime" metrics,
// caching the known partition bounds to avoid redundant round trips.
func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPartitionInfo, force bool) error {
	logger := log.GetLogger(pgw.ctx)
	sqlEnsure := `select part_available_from, part_available_to from admin.ensure_partition_metric_time($1, $2)`
	for metric, pb := range pgPartBounds {
		if !strings.HasSuffix(metric, "_realtime") {
			continue
		}
		if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
			return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
		}
		partInfo, ok := partitionMapMetric[metric]
		if !ok || pb.StartTime.Before(partInfo.StartTime) || force {
			err := pgw.sinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.StartTime).
				Scan(&partInfo.StartTime, &partInfo.EndTime)
			if err != nil {
				logger.Error("Failed to create partition on 'metrics': ", err)
				return err
			}
			partitionMapMetric[metric] = partInfo
		}
		if pb.EndTime.After(partInfo.EndTime) || force {
			err := pgw.sinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.EndTime).Scan(nil, &partInfo.EndTime)
			if err != nil {
				logger.Error("Failed to create partition on 'metrics': ", err)
				return err
			}
			partitionMapMetric[metric] = partInfo
		}
	}
	return nil
}

// EnsureMetricTimescale creates a TimescaleDB table for each new metric and
// then delegates the special "_realtime" metrics to EnsureMetricTime.
func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error) {
	logger := log.GetLogger(pgw.ctx)
	sqlEnsure := `select * from admin.ensure_partition_timescale($1)`
	for metric := range pgPartBounds {
		if strings.HasSuffix(metric, "_realtime") {
			continue
		}
		if _, ok := partitionMapMetric[metric]; !ok {
			if _, err = pgw.sinkDb.Exec(pgw.ctx, sqlEnsure, metric); err != nil {
				logger.Errorf("Failed to create a TimescaleDB table for metric '%s': %v", metric, err)
				return err
			}
			partitionMapMetric[metric] = ExistingPartitionInfo{}
		}
	}
	return pgw.EnsureMetricTime(pgPartBounds, force)
}

// EnsureMetricDbnameTime creates the metric/dbname/time subpartitions needed
// for the native Postgres schema, again caching the known bounds.
func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error) {
	var rows pgx.Rows
	sqlEnsure := `select * from admin.ensure_partition_metric_dbname_time($1, $2, $3)`
	for metric, dbnameTimestampMap := range metricDbnamePartBounds {
		if _, ok := partitionMapMetricDbname[metric]; !ok {
			partitionMapMetricDbname[metric] = make(map[string]ExistingPartitionInfo)
		}
		for dbname, pb := range dbnameTimestampMap {
			if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
				return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
			}
			partInfo, ok := partitionMapMetricDbname[metric][dbname]
			if !ok || pb.StartTime.Before(partInfo.StartTime) || force {
				if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
					return
				}
				if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
					return err
				}
				partitionMapMetricDbname[metric][dbname] = partInfo
			}
			if pb.EndTime.After(partInfo.EndTime) || pb.EndTime.Equal(partInfo.EndTime) || force {
				// ensure the partition covering the upper bound as well
				if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.EndTime); err != nil {
					return
				}
				if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
					return err
				}
				partitionMapMetricDbname[metric][dbname] = partInfo
			}
		}
	}
	return nil
}

// deleteOldPartitions is a background task that periodically drops partitions
// older than the configured retention, after an initial delay.
func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) {
	metricAgeDaysThreshold := pgw.opts.Retention
	if metricAgeDaysThreshold <= 0 {
		return
	}
	logger := log.GetLogger(pgw.ctx)
	// initial delay before the first cleanup run
	select {
	case <-pgw.ctx.Done():
		return
	case <-time.After(delay):
	}

	for {
		if pgw.metricSchema == DbStorageSchemaTimescale {
			partsDropped, err := pgw.DropOldTimePartitions(metricAgeDaysThreshold)
			if err != nil {
				logger.Errorf("Failed to drop old partitions (>%d days) from Postgres: %v", metricAgeDaysThreshold, err)
				continue
			}
			logger.Infof("Dropped %d old metric partitions...", partsDropped)
		} else if pgw.metricSchema == DbStorageSchemaPostgres {
			partsToDrop, err := pgw.GetOldTimePartitions(metricAgeDaysThreshold)
			if err != nil {
				logger.Errorf("Failed to get a listing of old (>%d days) time partitions from Postgres metrics DB - check that the admin.get_old_time_partitions() function is rolled out: %v", metricAgeDaysThreshold, err)
				time.Sleep(time.Second * 300)
				continue
			}
			if len(partsToDrop) > 0 {
				logger.Infof("Dropping %d old metric partitions one by one...", len(partsToDrop))
				for _, toDrop := range partsToDrop {
					sqlDropTable := `DROP TABLE IF EXISTS ` + toDrop
					if _, err := pgw.sinkDb.Exec(pgw.ctx, sqlDropTable); err != nil {
						logger.Errorf("Failed to drop old partition %s from Postgres metrics DB: %v", toDrop, err)
						time.Sleep(time.Second * 300)
					} else {
						time.Sleep(time.Second * 5)
					}
				}
			} else {
				logger.Infof("No old metric partitions found to drop...")
			}
		}
		select {
		case <-pgw.ctx.Done():
			return
		case <-time.After(time.Hour * 12):
		}
	}
}

// maintainUniqueSources is a background task that keeps the
// admin.all_distinct_dbname_metrics listing in sync with the stored data,
// guarded by an advisory lock so that only one pgwatch instance does the work.
func (pgw *PostgresWriter) maintainUniqueSources() {
	logger := log.GetLogger(pgw.ctx)
	// the bigint lock key is an arbitrary constant shared by all instances
	sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock`
	sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
	sqlDistinct := `
	WITH RECURSIVE t(dbname) AS (
		SELECT MIN(dbname) AS dbname FROM %s
		UNION
		SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t )
	SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
	sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2`
	sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1`
	sqlAdd := `
	INSERT INTO admin.all_distinct_dbname_metrics SELECT u, $2 FROM (select unnest($1::text[]) as u) x
	WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2)
	RETURNING *`

	for {
		select {
		case <-pgw.ctx.Done():
			return
		case <-time.After(time.Hour * 24):
		}
		var lock bool
		logger.Infof("Trying to get metricsDb listing maintainer advisory lock...")
		if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
			logger.Error("Getting metricsDb listing maintainer advisory lock failed:", err)
			continue
		}
		if !lock {
			logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...")
			continue
		}

		logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
		rows, _ := pgw.sinkDb.Query(pgw.ctx, sqlTopLevelMetrics)
		allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
		if err != nil {
			logger.Error(err)
			continue
		}

		for _, tableName := range allDistinctMetricTables {
			foundDbnamesMap := make(map[string]bool)
			foundDbnamesArr := make([]string, 0)
			metricName := strings.Replace(tableName, "public.", "", 1)

			logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
			rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
			ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
			if err != nil {
				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for '%s': %s", metricName, err)
				break
			}
			for _, drDbname := range ret {
				foundDbnamesMap[drDbname] = true
			}

			// Remove stale entries and add any that are missing
			for k := range foundDbnamesMap {
				foundDbnamesArr = append(foundDbnamesArr, k)
			}
			if len(foundDbnamesArr) == 0 { // no data, so drop all entries for the metric
				logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName)
				_, err = pgw.sinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName)
				if err != nil {
					logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
				}
				continue
			}
			cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName)
			if err != nil {
				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
			} else if cmdTag.RowsAffected() > 0 {
				logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
			}
			cmdTag, err = pgw.sinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName)
			if err != nil {
				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
			} else if cmdTag.RowsAffected() > 0 {
				logger.Infof("Added %d entries to the Postgres all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
			}
			time.Sleep(time.Minute) // stagger the work to keep the load low
		}
	}
}

// DropOldTimePartitions drops partitions older than the given threshold in
// days, returning the number of partitions dropped.
func (pgw *PostgresWriter) DropOldTimePartitions(metricAgeDaysThreshold int) (res int, err error) {
	sqlOldPart := `select admin.drop_old_time_partitions($1, $2)`
	err = pgw.sinkDb.QueryRow(pgw.ctx, sqlOldPart, metricAgeDaysThreshold, false).Scan(&res)
	return
}

// GetOldTimePartitions returns the names of partitions older than the given threshold in days.
func (pgw *PostgresWriter) GetOldTimePartitions(metricAgeDaysThreshold int) ([]string, error) {
	sqlGetOldParts := `select admin.get_old_time_partitions($1)`
	rows, err := pgw.sinkDb.Query(pgw.ctx, sqlGetOldParts, metricAgeDaysThreshold)
	if err == nil {
		return pgx.CollectRows(rows, pgx.RowTo[string])
	}
	return nil, err
}

// AddDBUniqueMetricToListingTable adds a new source/metric pair to the listing
// table if it is not already present.
func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error {
	sql := `insert into admin.all_distinct_dbname_metrics
		select $1, $2
		where not exists (
			select * from admin.all_distinct_dbname_metrics where dbname = $1 and metric = $2
		)`
	_, err := pgw.sinkDb.Exec(pgw.ctx, sql, dbUnique, metric)
	return err
}