package sinks

import (
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgconn"
)

var (
	cacheLimit      = 512
	highLoadTimeout = time.Second * 5
	deleterDelay    = time.Hour
)

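// NewPostgresWriter connects to the measurements database using the given
// connection string and returns a ready-to-use PostgresWriter.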
func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error) {
	var conn db.PgxPoolIface
	if conn, err = db.New(ctx, connstr); err != nil {
		return
	}
	return NewWriterFromPostgresConn(ctx, conn, opts, metricDefs)
}

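// NewWriterFromPostgresConn initialises the measurements schema on an existing
// connection pool, starts the background maintenance goroutines and returns the writer.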
func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error) {
	l := log.GetLogger(ctx).WithField("sink", "postgres").WithField("db", conn.Config().ConnConfig.Database)
	ctx = log.WithLogger(ctx, l)
	pgw = &PostgresWriter{
		ctx:        ctx,
		metricDefs: metricDefs,
		opts:       opts,
		input:      make(chan []metrics.MeasurementEnvelope, cacheLimit),
		lastError:  make(chan error),
		sinkDb:     conn,
	}
	if err = db.Init(ctx, pgw.sinkDb, func(ctx context.Context, conn db.PgxIface) error {
		l.Info("initialising measurements database...")
		exists, err := db.DoesSchemaExist(ctx, conn, "admin")
		if err != nil || exists {
			return err
		}
		for _, sql := range metricSchemaSQLs {
			if _, err = conn.Exec(ctx, sql); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		return
	}
	if err = pgw.ReadMetricSchemaType(); err != nil {
		return
	}
	if err = pgw.EnsureBuiltinMetricDummies(); err != nil {
		return
	}
	go pgw.deleteOldPartitions(deleterDelay)
	go pgw.maintainUniqueSources()
	go pgw.poll()
	l.Info(`measurements sink is activated`)
	return
}

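// Each of the following variables is expected to be populated at build time by a
// //go:embed directive referencing the corresponding admin/partitioning SQL file.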
var sqlMetricAdminSchema string

var sqlMetricAdminFunctions string

var sqlMetricEnsurePartitionPostgres string

var sqlMetricEnsurePartitionTimescale string

var sqlMetricChangeChunkIntervalTimescale string

var sqlMetricChangeCompressionIntervalTimescale string

var (
	metricSchemaSQLs = []string{
		sqlMetricAdminSchema,
		sqlMetricAdminFunctions,
		sqlMetricEnsurePartitionPostgres,
		sqlMetricEnsurePartitionTimescale,
		sqlMetricChangeChunkIntervalTimescale,
		sqlMetricChangeCompressionIntervalTimescale,
	}
)

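// PostgresWriter is a sink that writes metric measurements to a Postgres (or
// TimescaleDB) database, batching incoming envelopes and maintaining the
// partitioned storage schema in the background.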
type PostgresWriter struct {
	ctx          context.Context
	sinkDb       db.PgxPoolIface
	metricSchema DbStorageSchemaType
	metricDefs   *metrics.Metrics
	opts         *CmdOpts
	input        chan []metrics.MeasurementEnvelope
	lastError    chan error
}

type ExistingPartitionInfo struct {
	StartTime time.Time
	EndTime   time.Time
}

type MeasurementMessagePostgres struct {
	Time    time.Time
	DBName  string
	Metric  string
	Data    map[string]any
	TagData map[string]any
}

type DbStorageSchemaType int

const (
	DbStorageSchemaPostgres DbStorageSchemaType = iota
	DbStorageSchemaTimescale
)

func (pgw *PostgresWriter) ReadMetricSchemaType() (err error) {
	var isTs bool
	pgw.metricSchema = DbStorageSchemaPostgres
	sqlSchemaType := `SELECT schema_type = 'timescale' FROM admin.storage_schema_type`
	if err = pgw.sinkDb.QueryRow(pgw.ctx, sqlSchemaType).Scan(&isTs); err == nil && isTs {
		pgw.metricSchema = DbStorageSchemaTimescale
	}
	return
}

var (
	forceRecreatePartitions  = false
	partitionMapMetric       = make(map[string]ExistingPartitionInfo)
	partitionMapMetricDbname = make(map[string]map[string]ExistingPartitionInfo)
)

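// SyncMetric ensures that a dummy table and a listing entry exist for a newly
// added source/metric pair; other operations are currently no-ops for this sink.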
func (pgw *PostgresWriter) SyncMetric(dbUnique, metricName, op string) error {
	if op == "add" {
		return errors.Join(
			pgw.AddDBUniqueMetricToListingTable(dbUnique, metricName),
			pgw.EnsureMetricDummy(metricName),
		)
	}
	return nil
}

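// EnsureBuiltinMetricDummies creates empty tables for all built-in metrics if they do not exist yet.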
func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error) {
	for _, name := range metrics.GetDefaultBuiltInMetrics() {
		err = errors.Join(err, pgw.EnsureMetricDummy(name))
	}
	return
}

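// EnsureMetricDummy creates an empty measurements table for a single metric if it does not exist yet.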
func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error) {
	_, err = pgw.sinkDb.Exec(pgw.ctx, "select admin.ensure_dummy_metrics_table($1)", metric)
	return
}

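// Write queues the measurements for the background poller; if the cache channel
// stays full for longer than highLoadTimeout the batch is dropped. It also
// surfaces the last asynchronous write error, if any.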
func (pgw *PostgresWriter) Write(msgs []metrics.MeasurementEnvelope) error {
	if pgw.ctx.Err() != nil {
		return pgw.ctx.Err()
	}
	select {
	case pgw.input <- msgs:
		// queued for the poller
	case <-time.After(highLoadTimeout):
		// sink is overloaded, drop the batch
	}
	select {
	case err := <-pgw.lastError:
		return err
	default:
		return nil
	}
}

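// poll is the writer's main loop: it collects incoming envelopes into a cache and
// flushes it either when cacheLimit is reached or when the batching delay expires.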
func (pgw *PostgresWriter) poll() {
	cache := make([]metrics.MeasurementEnvelope, 0, cacheLimit)
	cacheTimeout := pgw.opts.BatchingDelay
	tick := time.NewTicker(cacheTimeout)
	for {
		select {
		case <-pgw.ctx.Done():
			return
		default:
			select {
			case entry := <-pgw.input:
				cache = append(cache, entry...)
				if len(cache) < cacheLimit {
					break
				}
				tick.Stop()
				pgw.flush(cache)
				cache = cache[:0]
				tick = time.NewTicker(cacheTimeout)
			case <-tick.C:
				pgw.flush(cache)
				cache = cache[:0]
			case <-pgw.ctx.Done():
				return
			}
		}
	}
}

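// flush writes the cached measurements to the database, ensuring that the needed
// partitions exist first and COPY-ing the rows metric by metric.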
func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
	if len(msgs) == 0 {
		return
	}
	logger := log.GetLogger(pgw.ctx)
	metricsToStorePerMetric := make(map[string][]MeasurementMessagePostgres)
	rowsBatched := 0
	totalRows := 0
	pgPartBounds := make(map[string]ExistingPartitionInfo)
	pgPartBoundsDbName := make(map[string]map[string]ExistingPartitionInfo)
	var err error

	for _, msg := range msgs {
		if len(msg.Data) == 0 {
			continue
		}
		logger.WithField("data", msg.Data).WithField("len", len(msg.Data)).Debug("sending to postgres")

		for _, dataRow := range msg.Data {
			var epochTime time.Time

			tags := make(map[string]any)
			fields := make(map[string]any)

			totalRows++

			if msg.CustomTags != nil {
				for k, v := range msg.CustomTags {
					tags[k] = fmt.Sprintf("%v", v)
				}
			}
			epochTime = time.Unix(0, metrics.Measurement(dataRow).GetEpoch())
			for k, v := range dataRow {
				if v == nil || v == "" || k == metrics.EpochColumnName {
					continue
				}
				if strings.HasPrefix(k, metrics.TagPrefix) {
					tag := k[4:] // strip the 4-character tag prefix
					tags[tag] = fmt.Sprintf("%v", v)
				} else {
					fields[k] = v
				}
			}

			var metricsArr []MeasurementMessagePostgres
			var ok bool

			metricNameTemp := msg.MetricName

			metricsArr, ok = metricsToStorePerMetric[metricNameTemp]
			if !ok {
				metricsToStorePerMetric[metricNameTemp] = make([]MeasurementMessagePostgres, 0)
			}
			metricsArr = append(metricsArr, MeasurementMessagePostgres{Time: epochTime, DBName: msg.DBName,
				Metric: msg.MetricName, Data: fields, TagData: tags})
			metricsToStorePerMetric[metricNameTemp] = metricsArr

			rowsBatched++

			switch pgw.metricSchema {
			case DbStorageSchemaTimescale:
				// track min/max timestamps per metric to check/create partitions
				bounds, ok := pgPartBounds[msg.MetricName]
				if !ok || epochTime.Before(bounds.StartTime) {
					bounds.StartTime = epochTime
					pgPartBounds[msg.MetricName] = bounds
				}
				if !ok || epochTime.After(bounds.EndTime) {
					bounds.EndTime = epochTime
					pgPartBounds[msg.MetricName] = bounds
				}
			case DbStorageSchemaPostgres:
				_, ok := pgPartBoundsDbName[msg.MetricName]
				if !ok {
					pgPartBoundsDbName[msg.MetricName] = make(map[string]ExistingPartitionInfo)
				}
				bounds, ok := pgPartBoundsDbName[msg.MetricName][msg.DBName]
				if !ok || epochTime.Before(bounds.StartTime) {
					bounds.StartTime = epochTime
					pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
				}
				if !ok || epochTime.After(bounds.EndTime) {
					bounds.EndTime = epochTime
					pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
				}
			default:
				logger.Fatal("unknown storage schema...")
			}
		}
	}

	switch pgw.metricSchema {
	case DbStorageSchemaPostgres:
		err = pgw.EnsureMetricDbnameTime(pgPartBoundsDbName, forceRecreatePartitions)
	case DbStorageSchemaTimescale:
		err = pgw.EnsureMetricTimescale(pgPartBounds, forceRecreatePartitions)
	default:
		logger.Fatal("unknown storage schema...")
	}
	if forceRecreatePartitions {
		forceRecreatePartitions = false
	}
	if err != nil {
		pgw.lastError <- err
	}

	logger.Debugf("COPY-ing %d metrics to Postgres metricsDB...", rowsBatched)
	t1 := time.Now()

	for metricName, measurements := range metricsToStorePerMetric {

		getTargetTable := func() pgx.Identifier {
			return pgx.Identifier{metricName}
		}

		getTargetColumns := func() []string {
			return []string{"time", "dbname", "data", "tag_data"}
		}

		for _, m := range measurements {
			l := logger.WithField("db", m.DBName).WithField("metric", m.Metric)
			jsonBytes, err := json.Marshal(m.Data)
			if err != nil {
				logger.Errorf("Skipping 1 metric for [%s:%s] due to JSON conversion error: %s", m.DBName, m.Metric, err)
				continue
			}

			getTagData := func() any {
				if len(m.TagData) > 0 {
					jsonBytesTags, err := json.Marshal(m.TagData)
					if err != nil {
						l.Error(err)
						return nil
					}
					return string(jsonBytesTags)
				}
				return nil
			}

			rows := [][]any{{m.Time, m.DBName, string(jsonBytes), getTagData()}}

			if _, err = pgw.sinkDb.CopyFrom(context.Background(), getTargetTable(), getTargetColumns(), pgx.CopyFromRows(rows)); err != nil {
				l.Error(err)
				if PgError, ok := err.(*pgconn.PgError); ok {
					forceRecreatePartitions = PgError.Code == "23514"
				}
				if forceRecreatePartitions {
					logger.Warning("Some metric partitions might have been removed, halting all metric storage. Trying to re-create all needed partitions on next run")
				}
			}
		}
	}

	diff := time.Since(t1)
	if err == nil {
		logger.WithField("rows", rowsBatched).WithField("elapsed", diff).Info("measurements written")
		return
	}
	pgw.lastError <- err
}

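// EnsureMetricTime creates or extends time partitions for "realtime" metrics
// (tables suffixed with _realtime) and keeps track of the covered time range.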
func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPartitionInfo, force bool) error {
	logger := log.GetLogger(pgw.ctx)
	sqlEnsure := `select part_available_from, part_available_to from admin.ensure_partition_metric_time($1, $2)`
	for metric, pb := range pgPartBounds {
		if !strings.HasSuffix(metric, "_realtime") {
			continue
		}
		if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
			return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
		}

		partInfo, ok := partitionMapMetric[metric]
		if !ok || pb.StartTime.Before(partInfo.StartTime) || force {
			err := pgw.sinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.StartTime).
				Scan(&partInfo.StartTime, &partInfo.EndTime)
			if err != nil {
				logger.Error("Failed to create partition on 'metrics': ", err)
				return err
			}
			partitionMapMetric[metric] = partInfo
		}
		if pb.EndTime.After(partInfo.EndTime) || force {
			err := pgw.sinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.EndTime).Scan(nil, &partInfo.EndTime)
			if err != nil {
				logger.Error("Failed to create partition on 'metrics': ", err)
				return err
			}
			partitionMapMetric[metric] = partInfo
		}
	}
	return nil
}

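// EnsureMetricTimescale creates the backing TimescaleDB table for each metric if it
// does not exist yet, then delegates the special "_realtime" tables to EnsureMetricTime.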
func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error) {
	logger := log.GetLogger(pgw.ctx)
	sqlEnsure := `select * from admin.ensure_partition_timescale($1)`
	for metric := range pgPartBounds {
		if strings.HasSuffix(metric, "_realtime") {
			continue
		}
		if _, ok := partitionMapMetric[metric]; !ok {
			if _, err = pgw.sinkDb.Exec(pgw.ctx, sqlEnsure, metric); err != nil {
				logger.Errorf("Failed to create a TimescaleDB table for metric '%s': %v", metric, err)
				return err
			}
			partitionMapMetric[metric] = ExistingPartitionInfo{}
		}
	}
	return pgw.EnsureMetricTime(pgPartBounds, force)
}

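// EnsureMetricDbnameTime creates the metric/dbname/time sub-partitions needed to
// cover the given time bounds when the plain Postgres storage schema is used.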
func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error) {
	var rows pgx.Rows
	sqlEnsure := `select * from admin.ensure_partition_metric_dbname_time($1, $2, $3)`
	for metric, dbnameTimestampMap := range metricDbnamePartBounds {
		_, ok := partitionMapMetricDbname[metric]
		if !ok {
			partitionMapMetricDbname[metric] = make(map[string]ExistingPartitionInfo)
		}

		for dbname, pb := range dbnameTimestampMap {
			if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
				return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
			}
			partInfo, ok := partitionMapMetricDbname[metric][dbname]
			if !ok || pb.StartTime.Before(partInfo.StartTime) || force {
				if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
					return
				}
				if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
					return err
				}
				partitionMapMetricDbname[metric][dbname] = partInfo
			}
			if pb.EndTime.After(partInfo.EndTime) || pb.EndTime.Equal(partInfo.EndTime) || force {
				if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.EndTime); err != nil {
					return
				}
				if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
					return err
				}
				partitionMapMetricDbname[metric][dbname] = partInfo
			}
		}
	}
	return nil
}

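// deleteOldPartitions is a background task that, after an initial delay, periodically
// drops measurement partitions older than the configured retention (in days).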
func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) {
	metricAgeDaysThreshold := pgw.opts.Retention
	if metricAgeDaysThreshold <= 0 {
		return
	}
	logger := log.GetLogger(pgw.ctx)
	select {
	case <-pgw.ctx.Done():
		return
	case <-time.After(delay):
		// initial delay before the first cleanup run
	}

	for {
		if pgw.metricSchema == DbStorageSchemaTimescale {
			partsDropped, err := pgw.DropOldTimePartitions(metricAgeDaysThreshold)
			if err != nil {
				logger.Errorf("Failed to drop old partitions (>%d days) from Postgres: %v", metricAgeDaysThreshold, err)
				continue
			}
			logger.Infof("Dropped %d old metric partitions...", partsDropped)
		} else if pgw.metricSchema == DbStorageSchemaPostgres {
			partsToDrop, err := pgw.GetOldTimePartitions(metricAgeDaysThreshold)
			if err != nil {
				logger.Errorf("Failed to get a listing of old (>%d days) time partitions from Postgres metrics DB - check that the admin.get_old_time_partitions() function is rolled out: %v", metricAgeDaysThreshold, err)
				time.Sleep(time.Second * 300)
				continue
			}
			if len(partsToDrop) > 0 {
				logger.Infof("Dropping %d old metric partitions one by one...", len(partsToDrop))
				for _, toDrop := range partsToDrop {
					sqlDropTable := `DROP TABLE IF EXISTS ` + toDrop
					logger.Debugf("Dropping old metric data partition: %s", toDrop)

					if _, err := pgw.sinkDb.Exec(pgw.ctx, sqlDropTable); err != nil {
						logger.Errorf("Failed to drop old partition %s from Postgres metrics DB: %v", toDrop, err)
						time.Sleep(time.Second * 300)
					} else {
						time.Sleep(time.Second * 5)
					}
				}
			} else {
				logger.Infof("No old metric partitions found to drop...")
			}
		}
		select {
		case <-pgw.ctx.Done():
			return
		case <-time.After(time.Hour * 12):
		}
	}
}

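// maintainUniqueSources is a background task that refreshes the
// admin.all_distinct_dbname_metrics listing of source names per metric,
// guarded by an advisory lock so that only one instance does the work.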
func (pgw *PostgresWriter) maintainUniqueSources() {
	logger := log.GetLogger(pgw.ctx)

	sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock`
	sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
	sqlDistinct := `
	WITH RECURSIVE t(dbname) AS (
		SELECT MIN(dbname) AS dbname FROM %s
		UNION
		SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t )
	SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
	sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2`
	sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1`
	sqlAdd := `
	INSERT INTO admin.all_distinct_dbname_metrics SELECT u, $2 FROM (select unnest($1::text[]) as u) x
	WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2)
	RETURNING *`

	for {
		select {
		case <-pgw.ctx.Done():
			return
		case <-time.After(time.Hour * 24):
		}
		var lock bool
		logger.Infof("Trying to get metricsDb listing maintainer advisory lock...")
		if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
			logger.Error("Getting metricsDb listing maintainer advisory lock failed:", err)
			continue
		}
		if !lock {
			logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...")
			continue
		}

		logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
		rows, _ := pgw.sinkDb.Query(pgw.ctx, sqlTopLevelMetrics)
		allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
		if err != nil {
			logger.Error(err)
			continue
		}

		for _, tableName := range allDistinctMetricTables {
			foundDbnamesMap := make(map[string]bool)
			foundDbnamesArr := make([]string, 0)
			metricName := strings.Replace(tableName, "public.", "", 1)

			logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
			rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
			ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
			if err != nil {
				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for '%s': %s", metricName, err)
				break
			}
			for _, drDbname := range ret {
				foundDbnamesMap[drDbname] = true
			}

			for k := range foundDbnamesMap {
				foundDbnamesArr = append(foundDbnamesArr, k)
			}
			if len(foundDbnamesArr) == 0 {
				logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName)

				_, err = pgw.sinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName)
				if err != nil {
					logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
				}
				continue
			}
			cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName)
			if err != nil {
				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
			} else if cmdTag.RowsAffected() > 0 {
				logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
			}
			cmdTag, err = pgw.sinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName)
			if err != nil {
				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
			} else if cmdTag.RowsAffected() > 0 {
				logger.Infof("Added %d entries to the Postgres all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
			}
			time.Sleep(time.Minute)
		}
	}
}

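// DropOldTimePartitions drops partitions older than the given age in days via the
// admin.drop_old_time_partitions() helper and returns the number of partitions dropped.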
func (pgw *PostgresWriter) DropOldTimePartitions(metricAgeDaysThreshold int) (res int, err error) {
	sqlOldPart := `select admin.drop_old_time_partitions($1, $2)`
	err = pgw.sinkDb.QueryRow(pgw.ctx, sqlOldPart, metricAgeDaysThreshold, false).Scan(&res)
	return
}

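// GetOldTimePartitions returns the names of partitions older than the given age in days,
// as reported by the admin.get_old_time_partitions() helper.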
func (pgw *PostgresWriter) GetOldTimePartitions(metricAgeDaysThreshold int) ([]string, error) {
	sqlGetOldParts := `select admin.get_old_time_partitions($1)`
	rows, err := pgw.sinkDb.Query(pgw.ctx, sqlGetOldParts, metricAgeDaysThreshold)
	if err == nil {
		return pgx.CollectRows(rows, pgx.RowTo[string])
	}
	return nil, err
}

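// AddDBUniqueMetricToListingTable adds a source/metric pair to the
// admin.all_distinct_dbname_metrics listing table if it is not there yet.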
func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error {
	sql := `insert into admin.all_distinct_dbname_metrics
		select $1, $2
		where not exists (
			select * from admin.all_distinct_dbname_metrics where dbname = $1 and metric = $2
		)`
	_, err := pgw.sinkDb.Exec(pgw.ctx, sql, dbUnique, metric)
	return err
}