1 package sinks
2
3 import (
4 "context"
5 _ "embed"
6 "errors"
7 "fmt"
8 "maps"
9 "slices"
10 "strings"
11 "time"
12
13 jsoniter "github.com/json-iterator/go"
14
15 "github.com/cybertec-postgresql/pgwatch/v3/internal/db"
16 "github.com/cybertec-postgresql/pgwatch/v3/internal/log"
17 "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
18 "github.com/jackc/pgx/v5"
19 "github.com/jackc/pgx/v5/pgconn"
20 )
21
// Tuning knobs of the Postgres sink. They are variables rather than constants,
// so they can be reassigned at run time (NOTE(review): presumably by tests — confirm).
var (
	// cacheLimit is the capacity of the writer's input channel and the number
	// of cached envelopes at which poll flushes without waiting for the ticker.
	cacheLimit = 256
	// highLoadTimeout bounds how long Write waits for room in the input channel.
	highLoadTimeout = time.Second * 5
	// deleterDelay is the start-up delay handed to deleteOldPartitions.
	deleterDelay = time.Hour
	// targetColumns lists the destination columns used by the CopyFrom call in flush.
	targetColumns = [...]string{"time", "dbname", "data", "tag_data"}
)
28
29 func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts) (pgw *PostgresWriter, err error) {
30 var conn db.PgxPoolIface
31 if conn, err = db.New(ctx, connstr); err != nil {
32 return
33 }
34 return NewWriterFromPostgresConn(ctx, conn, opts)
35 }
36
37 func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts) (pgw *PostgresWriter, err error) {
38 l := log.GetLogger(ctx).WithField("sink", "postgres").WithField("db", conn.Config().ConnConfig.Database)
39 ctx = log.WithLogger(ctx, l)
40 pgw = &PostgresWriter{
41 ctx: ctx,
42 opts: opts,
43 input: make(chan metrics.MeasurementEnvelope, cacheLimit),
44 lastError: make(chan error),
45 sinkDb: conn,
46 }
47 if err = db.Init(ctx, pgw.sinkDb, func(ctx context.Context, conn db.PgxIface) error {
48 l.Info("initialising measurements database...")
49 exists, err := db.DoesSchemaExist(ctx, conn, "admin")
50 if err != nil || exists {
51 return err
52 }
53 for _, sql := range metricSchemaSQLs {
54 if _, err = conn.Exec(ctx, sql); err != nil {
55 return err
56 }
57 }
58 return nil
59 }); err != nil {
60 return
61 }
62 if err = pgw.ReadMetricSchemaType(); err != nil {
63 return
64 }
65 if err = pgw.EnsureBuiltinMetricDummies(); err != nil {
66 return
67 }
68 go pgw.deleteOldPartitions(deleterDelay)
69 go pgw.maintainUniqueSources()
70 go pgw.poll()
71 l.Info(`measurements sink is activated`)
72 return
73 }
74
75
// NOTE(review): the SQL variables below have no initialiser in this view, so
// as written they hold empty strings. The blank "embed" import suggests each
// one is expected to be filled by an embed directive placed directly above its
// declaration; those directives are not visible here — confirm against the
// repository before relying on the contents.

// sqlMetricAdminSchema is the first statement executed on a fresh database.
var sqlMetricAdminSchema string

// sqlMetricAdminFunctions is the second statement executed on a fresh database.
var sqlMetricAdminFunctions string

// sqlMetricEnsurePartitionPostgres is part of the schema bootstrap
// (presumably the partition helper for the plain Postgres layout — confirm).
var sqlMetricEnsurePartitionPostgres string

// sqlMetricEnsurePartitionTimescale is part of the schema bootstrap
// (presumably the partition helper for the TimescaleDB layout — confirm).
var sqlMetricEnsurePartitionTimescale string

// sqlMetricChangeChunkIntervalTimescale is part of the schema bootstrap.
var sqlMetricChangeChunkIntervalTimescale string

// sqlMetricChangeCompressionIntervalTimescale is part of the schema bootstrap.
var sqlMetricChangeCompressionIntervalTimescale string

var (
	// metricSchemaSQLs is the ordered list of statements that
	// NewWriterFromPostgresConn executes when the "admin" schema is missing.
	metricSchemaSQLs = []string{
		sqlMetricAdminSchema,
		sqlMetricAdminFunctions,
		sqlMetricEnsurePartitionPostgres,
		sqlMetricEnsurePartitionTimescale,
		sqlMetricChangeChunkIntervalTimescale,
		sqlMetricChangeCompressionIntervalTimescale,
	}
)
103
104
105
106
107
// PostgresWriter is a measurements sink that stores envelopes in a Postgres
// database. Envelopes handed to Write are queued on input, batched by poll and
// written by flush.
type PostgresWriter struct {
	// ctx is the writer's lifetime context; it also carries the sink logger.
	ctx context.Context
	// sinkDb is the connection pool of the measurements database.
	sinkDb db.PgxPoolIface
	// metricSchema is the storage layout detected by ReadMetricSchemaType.
	metricSchema DbStorageSchemaType
	// opts supplies BatchingDelay (poll) and Retention (deleteOldPartitions).
	opts *CmdOpts
	// input queues envelopes from Write to poll; its capacity is cacheLimit.
	input chan metrics.MeasurementEnvelope
	// lastError is an unbuffered channel on which flush hands errors to Write.
	lastError chan error
}
116
// ExistingPartitionInfo is a pair of time bounds. It is used both for the
// min/max measurement times collected by flush and for the partition bounds
// returned by the admin.ensure_partition_* functions and cached per metric.
type ExistingPartitionInfo struct {
	StartTime time.Time // lower bound
	EndTime   time.Time // upper bound
}
121
// MeasurementMessagePostgres describes a single measurement row together with
// its metric name and tags.
// NOTE(review): nothing in the visible part of this file references the type;
// verify whether other files still use it.
type MeasurementMessagePostgres struct {
	Time    time.Time
	DBName  string
	Metric  string
	Data    map[string]any
	TagData map[string]string
}
129
// DbStorageSchemaType identifies the storage layout of the measurements
// database; flush and deleteOldPartitions branch on it.
type DbStorageSchemaType int

const (
	// DbStorageSchemaPostgres is the default layout, assumed unless
	// admin.storage_schema_type reports 'timescale'.
	DbStorageSchemaPostgres DbStorageSchemaType = iota
	// DbStorageSchemaTimescale is selected when admin.storage_schema_type
	// reports 'timescale'.
	DbStorageSchemaTimescale
)
136
137 func (pgw *PostgresWriter) ReadMetricSchemaType() (err error) {
138 var isTs bool
139 pgw.metricSchema = DbStorageSchemaPostgres
140 sqlSchemaType := `SELECT schema_type = 'timescale' FROM admin.storage_schema_type`
141 if err = pgw.sinkDb.QueryRow(pgw.ctx, sqlSchemaType).Scan(&isTs); err == nil && isTs {
142 pgw.metricSchema = DbStorageSchemaTimescale
143 }
144 return
145 }
146
// Package-level partition bookkeeping used by flush and the EnsureMetric*
// methods. None of it is guarded by a lock.
var (
	// forceRecreatePartitions is set by flush when a CopyFrom fails with
	// SQLSTATE 23514 and is passed as the force flag to the next ensure call.
	forceRecreatePartitions = false
	// partitionMapMetric caches known partition bounds per metric.
	partitionMapMetric = make(map[string]ExistingPartitionInfo)
	// partitionMapMetricDbname caches known partition bounds per metric and
	// database name (metric -> dbname -> bounds).
	partitionMapMetricDbname = make(map[string]map[string]ExistingPartitionInfo)
)
152
153
154 func (pgw *PostgresWriter) SyncMetric(sourceName, metricName string, op SyncOp) error {
155 if op == AddOp {
156 return errors.Join(
157 pgw.AddDBUniqueMetricToListingTable(sourceName, metricName),
158 pgw.EnsureMetricDummy(metricName),
159 )
160 }
161 return nil
162 }
163
164
165 func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error) {
166 for _, name := range metrics.GetDefaultBuiltInMetrics() {
167 err = errors.Join(err, pgw.EnsureMetricDummy(name))
168 }
169 return
170 }
171
172
173 func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error) {
174 _, err = pgw.sinkDb.Exec(pgw.ctx, "select admin.ensure_dummy_metrics_table($1)", metric)
175 return
176 }
177
178
179 func (pgw *PostgresWriter) Write(msg metrics.MeasurementEnvelope) error {
180 if pgw.ctx.Err() != nil {
181 return pgw.ctx.Err()
182 }
183 select {
184 case pgw.input <- msg:
185
186 case <-time.After(highLoadTimeout):
187
188 }
189 select {
190 case err := <-pgw.lastError:
191 return err
192 default:
193 return nil
194 }
195 }
196
197
198 func (pgw *PostgresWriter) poll() {
199 cache := make([]metrics.MeasurementEnvelope, 0, cacheLimit)
200 cacheTimeout := pgw.opts.BatchingDelay
201 tick := time.NewTicker(cacheTimeout)
202 for {
203 select {
204 case <-pgw.ctx.Done():
205 return
206 default:
207 select {
208 case entry := <-pgw.input:
209 cache = append(cache, entry)
210 if len(cache) < cacheLimit {
211 break
212 }
213 tick.Stop()
214 pgw.flush(cache)
215 cache = cache[:0]
216 tick = time.NewTicker(cacheTimeout)
217 case <-tick.C:
218 pgw.flush(cache)
219 cache = cache[:0]
220 case <-pgw.ctx.Done():
221 return
222 }
223 }
224 }
225 }
226
227 func newCopyFromMeasurements(rows []metrics.MeasurementEnvelope) *copyFromMeasurements {
228 return ©FromMeasurements{envelopes: rows, envelopeIdx: -1, measurementIdx: -1}
229 }
230
// copyFromMeasurements iterates over the measurements of a slice of envelopes
// and is passed to CopyFrom as the row source in flush. One pass of Next
// stops at the boundary between two different metric names, so each CopyFrom
// call receives the rows of a single run of equally named envelopes.
type copyFromMeasurements struct {
	envelopes      []metrics.MeasurementEnvelope // envelopes being iterated
	envelopeIdx    int                           // index of the current envelope, -1 before the first
	measurementIdx int                           // index of the current measurement within the envelope, -1 before the first
	metricName     string                        // metric of the run in progress, "" when no run is in progress
}
237
238 func (c *copyFromMeasurements) NextEnvelope() bool {
239 c.envelopeIdx++
240 c.measurementIdx = -1
241 return c.envelopeIdx < len(c.envelopes)
242 }
243
// Next advances to the next measurement of the current metric run and reports
// whether one is available. It returns false either when all envelopes are
// consumed (EOF then reports true) or when the next envelope belongs to a
// different metric; in the latter case the cursor is parked so that the next
// call starts a new run with that envelope.
func (c *copyFromMeasurements) Next() bool {
	for {
		// Move to another envelope when iteration has not started yet or the
		// current envelope has no further measurement.
		if c.envelopeIdx < 0 || c.measurementIdx+1 >= len(c.envelopes[c.envelopeIdx].Data) {
			// No envelope left: iteration is over.
			if ok := c.NextEnvelope(); !ok {
				return false
			}
			if c.metricName == "" {
				// First envelope of a run: remember its metric name.
				c.metricName = c.envelopes[c.envelopeIdx].MetricName
			} else if c.metricName != c.envelopes[c.envelopeIdx].MetricName {
				// A different metric starts here: end the current run.
				// Step back to the previous envelope and mark it as exhausted
				// so the next call re-enters the branch above and lands on
				// this envelope again, then clear the run's metric name.
				c.envelopeIdx--
				c.measurementIdx = len(c.envelopes[c.envelopeIdx].Data)
				c.metricName = ""
				return false
			}
		}

		// Step to the next measurement of the current envelope.
		c.measurementIdx++
		if c.measurementIdx < len(c.envelopes[c.envelopeIdx].Data) {
			return true
		}
		// The envelope holds no measurement at this index (e.g. empty Data):
		// loop again to advance to the following envelope.
	}
}
273
274 func (c *copyFromMeasurements) EOF() bool {
275 return c.envelopeIdx >= len(c.envelopes)
276 }
277
278 func (c *copyFromMeasurements) Values() ([]any, error) {
279 row := maps.Clone(c.envelopes[c.envelopeIdx].Data[c.measurementIdx])
280 tagRow := maps.Clone(c.envelopes[c.envelopeIdx].CustomTags)
281 if tagRow == nil {
282 tagRow = make(map[string]string)
283 }
284 for k, v := range row {
285 if after, ok := strings.CutPrefix(k, metrics.TagPrefix); ok {
286 tagRow[after] = fmt.Sprintf("%v", v)
287 delete(row, k)
288 }
289 }
290 jsonTags, terr := jsoniter.ConfigFastest.MarshalToString(tagRow)
291 json, err := jsoniter.ConfigFastest.MarshalToString(row)
292 if err != nil || terr != nil {
293 return nil, errors.Join(err, terr)
294 }
295 return []any{time.Unix(0, c.envelopes[c.envelopeIdx].Data.GetEpoch()), c.envelopes[c.envelopeIdx].DBName, json, jsonTags}, nil
296 }
297
// Err always returns nil: the iterator itself never records an error.
// Serialisation failures are reported by Values instead.
func (c *copyFromMeasurements) Err() error {
	return nil
}
301
302 func (c *copyFromMeasurements) MetricName() (ident pgx.Identifier) {
303 if c.envelopeIdx+1 < len(c.envelopes) {
304
305 ident = pgx.Identifier{c.envelopes[c.envelopeIdx+1].MetricName}
306 }
307 return
308 }
309
310
311 func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
312 if len(msgs) == 0 {
313 return
314 }
315 logger := log.GetLogger(pgw.ctx)
316
317 pgPartBounds := make(map[string]ExistingPartitionInfo)
318 pgPartBoundsDbName := make(map[string]map[string]ExistingPartitionInfo)
319 var err error
320
321 slices.SortFunc(msgs, func(a, b metrics.MeasurementEnvelope) int {
322 if a.MetricName < b.MetricName {
323 return -1
324 } else if a.MetricName > b.MetricName {
325 return 1
326 }
327 return 0
328 })
329
330 for _, msg := range msgs {
331 for _, dataRow := range msg.Data {
332 epochTime := time.Unix(0, metrics.Measurement(dataRow).GetEpoch())
333 switch pgw.metricSchema {
334 case DbStorageSchemaTimescale:
335
336 bounds, ok := pgPartBounds[msg.MetricName]
337 if !ok || (ok && epochTime.Before(bounds.StartTime)) {
338 bounds.StartTime = epochTime
339 pgPartBounds[msg.MetricName] = bounds
340 }
341 if !ok || (ok && epochTime.After(bounds.EndTime)) {
342 bounds.EndTime = epochTime
343 pgPartBounds[msg.MetricName] = bounds
344 }
345 case DbStorageSchemaPostgres:
346 _, ok := pgPartBoundsDbName[msg.MetricName]
347 if !ok {
348 pgPartBoundsDbName[msg.MetricName] = make(map[string]ExistingPartitionInfo)
349 }
350 bounds, ok := pgPartBoundsDbName[msg.MetricName][msg.DBName]
351 if !ok || (ok && epochTime.Before(bounds.StartTime)) {
352 bounds.StartTime = epochTime
353 pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
354 }
355 if !ok || (ok && epochTime.After(bounds.EndTime)) {
356 bounds.EndTime = epochTime
357 pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
358 }
359 default:
360 logger.Fatal("unknown storage schema...")
361 }
362 }
363 }
364
365 switch pgw.metricSchema {
366 case DbStorageSchemaPostgres:
367 err = pgw.EnsureMetricDbnameTime(pgPartBoundsDbName, forceRecreatePartitions)
368 case DbStorageSchemaTimescale:
369 err = pgw.EnsureMetricTimescale(pgPartBounds, forceRecreatePartitions)
370 default:
371 logger.Fatal("unknown storage schema...")
372 }
373 forceRecreatePartitions = false
374 if err != nil {
375 pgw.lastError <- err
376 }
377
378 var rowsBatched, n int64
379 t1 := time.Now()
380 cfm := newCopyFromMeasurements(msgs)
381 for !cfm.EOF() {
382 n, err = pgw.sinkDb.CopyFrom(context.Background(), cfm.MetricName(), targetColumns[:], cfm)
383 rowsBatched += n
384 if err != nil {
385 logger.Error(err)
386 if PgError, ok := err.(*pgconn.PgError); ok {
387 forceRecreatePartitions = PgError.Code == "23514"
388 }
389 if forceRecreatePartitions {
390 logger.Warning("Some metric partitions might have been removed, halting all metric storage. Trying to re-create all needed partitions on next run")
391 }
392 }
393 }
394 diff := time.Since(t1)
395 if err == nil {
396 logger.WithField("rows", rowsBatched).WithField("elapsed", diff).Info("measurements written")
397 return
398 }
399 pgw.lastError <- err
400 }
401
402
403 func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPartitionInfo, force bool) error {
404 logger := log.GetLogger(pgw.ctx)
405 sqlEnsure := `select part_available_from, part_available_to from admin.ensure_partition_metric_time($1, $2)`
406 for metric, pb := range pgPartBounds {
407 if !strings.HasSuffix(metric, "_realtime") {
408 continue
409 }
410 if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
411 return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
412 }
413
414 partInfo, ok := partitionMapMetric[metric]
415 if !ok || (ok && (pb.StartTime.Before(partInfo.StartTime))) || force {
416 err := pgw.sinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.StartTime).
417 Scan(&partInfo.StartTime, &partInfo.EndTime)
418 if err != nil {
419 logger.Error("Failed to create partition on 'metrics': ", err)
420 return err
421 }
422 partitionMapMetric[metric] = partInfo
423 }
424 if pb.EndTime.After(partInfo.EndTime) || force {
425 err := pgw.sinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.EndTime).Scan(nil, &partInfo.EndTime)
426 if err != nil {
427 logger.Error("Failed to create partition on 'metrics': ", err)
428 return err
429 }
430 partitionMapMetric[metric] = partInfo
431 }
432 }
433 return nil
434 }
435
436 func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error) {
437 logger := log.GetLogger(pgw.ctx)
438 sqlEnsure := `select * from admin.ensure_partition_timescale($1)`
439 for metric := range pgPartBounds {
440 if strings.HasSuffix(metric, "_realtime") {
441 continue
442 }
443 if _, ok := partitionMapMetric[metric]; !ok {
444 if _, err = pgw.sinkDb.Exec(pgw.ctx, sqlEnsure, metric); err != nil {
445 logger.Errorf("Failed to create a TimescaleDB table for metric '%s': %v", metric, err)
446 return err
447 }
448 partitionMapMetric[metric] = ExistingPartitionInfo{}
449 }
450 }
451 return pgw.EnsureMetricTime(pgPartBounds, force)
452 }
453
454 func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error) {
455 var rows pgx.Rows
456 sqlEnsure := `select * from admin.ensure_partition_metric_dbname_time($1, $2, $3)`
457 for metric, dbnameTimestampMap := range metricDbnamePartBounds {
458 _, ok := partitionMapMetricDbname[metric]
459 if !ok {
460 partitionMapMetricDbname[metric] = make(map[string]ExistingPartitionInfo)
461 }
462
463 for dbname, pb := range dbnameTimestampMap {
464 if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
465 return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
466 }
467 partInfo, ok := partitionMapMetricDbname[metric][dbname]
468 if !ok || (ok && (pb.StartTime.Before(partInfo.StartTime))) || force {
469 if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
470 return
471 }
472 if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
473 return err
474 }
475 partitionMapMetricDbname[metric][dbname] = partInfo
476 }
477 if pb.EndTime.After(partInfo.EndTime) || pb.EndTime.Equal(partInfo.EndTime) || force {
478 if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
479 return
480 }
481 if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
482 return err
483 }
484 partitionMapMetricDbname[metric][dbname] = partInfo
485 }
486 }
487 }
488 return nil
489 }
490
491
492 func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) {
493 metricAgeDaysThreshold := pgw.opts.Retention
494 if metricAgeDaysThreshold <= 0 {
495 return
496 }
497 logger := log.GetLogger(pgw.ctx)
498 select {
499 case <-pgw.ctx.Done():
500 return
501 case <-time.After(delay):
502
503 }
504
505 for {
506 if pgw.metricSchema == DbStorageSchemaTimescale {
507 partsDropped, err := pgw.DropOldTimePartitions(metricAgeDaysThreshold)
508 if err != nil {
509 logger.Errorf("Failed to drop old partitions (>%d days) from Postgres: %v", metricAgeDaysThreshold, err)
510 continue
511 }
512 logger.Infof("Dropped %d old metric partitions...", partsDropped)
513 } else if pgw.metricSchema == DbStorageSchemaPostgres {
514 partsToDrop, err := pgw.GetOldTimePartitions(metricAgeDaysThreshold)
515 if err != nil {
516 logger.Errorf("Failed to get a listing of old (>%d days) time partitions from Postgres metrics DB - check that the admin.get_old_time_partitions() function is rolled out: %v", metricAgeDaysThreshold, err)
517 time.Sleep(time.Second * 300)
518 continue
519 }
520 if len(partsToDrop) > 0 {
521 logger.Infof("Dropping %d old metric partitions one by one...", len(partsToDrop))
522 for _, toDrop := range partsToDrop {
523 sqlDropTable := `DROP TABLE IF EXISTS ` + toDrop
524
525 if _, err := pgw.sinkDb.Exec(pgw.ctx, sqlDropTable); err != nil {
526 logger.Errorf("Failed to drop old partition %s from Postgres metrics DB: %w", toDrop, err)
527 time.Sleep(time.Second * 300)
528 } else {
529 time.Sleep(time.Second * 5)
530 }
531 }
532 } else {
533 logger.Infof("No old metric partitions found to drop...")
534 }
535 }
536 select {
537 case <-pgw.ctx.Done():
538 return
539 case <-time.After(time.Hour * 12):
540 }
541 }
542 }
543
544
545
// maintainUniqueSources keeps the admin.all_distinct_dbname_metrics listing
// table in sync with the dbnames actually present in the metric tables.
// It is meant to run as a goroutine: every round starts after a 24 hour wait
// and the function returns when the writer's context is done.
//
// A round only proceeds when pg_try_advisory_lock succeeds, so that a single
// instance does the maintenance. For each table reported by
// admin.get_top_level_metric_tables() it collects the distinct dbnames,
// deletes listing rows whose dbname no longer occurs and inserts the missing
// ones, pausing one minute between tables.
func (pgw *PostgresWriter) maintainUniqueSources() {
	logger := log.GetLogger(pgw.ctx)

	sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock`
	sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
	// sqlDistinct lists the distinct dbname values of a table through a
	// recursive CTE of repeated MIN(dbname) lookups; both %s take the table name.
	sqlDistinct := `
WITH RECURSIVE t(dbname) AS (
SELECT MIN(dbname) AS dbname FROM %s
UNION
SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t )
SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
	// sqlDelete removes listing rows of a metric whose dbname is not in $1.
	sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2`
	// sqlDeleteAll removes every listing row of a metric.
	sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1`
	// sqlAdd inserts the (dbname, metric) pairs from $1 that are not listed yet.
	sqlAdd := `
INSERT INTO admin.all_distinct_dbname_metrics SELECT u, $2 FROM (select unnest($1::text[]) as u) x
WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2)
RETURNING *`

	for {
		// Wait a day before every round, ending early on cancellation.
		select {
		case <-pgw.ctx.Done():
			return
		case <-time.After(time.Hour * 24):
		}
		var lock bool
		logger.Infof("Trying to get metricsDb listing maintainer advisory lock...")
		if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
			logger.Error("Getting metricsDb listing maintainer advisory lock failed:", err)
			continue
		}
		if !lock {
			logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...")
			continue
		}

		logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
		// The Query error is dropped here; the code relies on CollectRows to
		// report a failed query (NOTE(review): holds for pgx rows — confirm for
		// other db.PgxPoolIface implementations).
		rows, _ := pgw.sinkDb.Query(pgw.ctx, sqlTopLevelMetrics)
		allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
		if err != nil {
			logger.Error(err)
			continue
		}

		for _, tableName := range allDistinctMetricTables {
			foundDbnamesMap := make(map[string]bool)
			foundDbnamesArr := make([]string, 0)
			// The metric name is the table name without the "public." prefix.
			metricName := strings.Replace(tableName, "public.", "", 1)

			logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
			rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
			ret, err := pgx.CollectRows(rows, pgx.RowTo[string])

			if err != nil {
				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for '%s': %s", metricName, err)
				// Abandons the remaining tables of this round.
				break
			}
			for _, drDbname := range ret {
				foundDbnamesMap[drDbname] = true
			}

			// Turn the de-duplicated map keys into a slice for the queries below.
			for k := range foundDbnamesMap {
				foundDbnamesArr = append(foundDbnamesArr, k)
			}
			if len(foundDbnamesArr) == 0 {
				// The table holds no dbname at all: drop every listing row of the metric.
				logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName)

				_, err = pgw.sinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName)
				if err != nil {
					logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
				}
				continue
			}
			// Remove stale listing rows, then add the missing ones.
			cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName)
			if err != nil {
				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
			} else if cmdTag.RowsAffected() > 0 {
				logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
			}
			cmdTag, err = pgw.sinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName)
			if err != nil {
				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
			} else if cmdTag.RowsAffected() > 0 {
				logger.Infof("Added %d entry to the Postgres all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
			}
			// Pause between tables; this sleep does not observe the context.
			time.Sleep(time.Minute)
		}

	}
}
636
637 func (pgw *PostgresWriter) DropOldTimePartitions(metricAgeDaysThreshold int) (res int, err error) {
638 sqlOldPart := `select admin.drop_old_time_partitions($1, $2)`
639 err = pgw.sinkDb.QueryRow(pgw.ctx, sqlOldPart, metricAgeDaysThreshold, false).Scan(&res)
640 return
641 }
642
643 func (pgw *PostgresWriter) GetOldTimePartitions(metricAgeDaysThreshold int) ([]string, error) {
644 sqlGetOldParts := `select admin.get_old_time_partitions($1)`
645 rows, err := pgw.sinkDb.Query(pgw.ctx, sqlGetOldParts, metricAgeDaysThreshold)
646 if err == nil {
647 return pgx.CollectRows(rows, pgx.RowTo[string])
648 }
649 return nil, err
650 }
651
652 func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error {
653 sql := `insert into admin.all_distinct_dbname_metrics
654 select $1, $2
655 where not exists (
656 select * from admin.all_distinct_dbname_metrics where dbname = $1 and metric = $2
657 )`
658 _, err := pgw.sinkDb.Exec(pgw.ctx, sql, dbUnique, metric)
659 return err
660 }
661