1 package sinks
2
3 import (
4 "context"
5 _ "embed"
6 "errors"
7 "fmt"
8 "maps"
9 "slices"
10 "strings"
11 "time"
12
13 jsoniter "github.com/json-iterator/go"
14
15 "github.com/cybertec-postgresql/pgwatch/v3/internal/db"
16 "github.com/cybertec-postgresql/pgwatch/v3/internal/log"
17 "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
18 "github.com/jackc/pgx/v5"
19 "github.com/jackc/pgx/v5/pgconn"
20 )
21
// Tuning knobs for the Postgres sink.
var (
	cacheLimit = 256 // max measurements buffered in poll() before a forced flush; also the input channel capacity
	highLoadTimeout = time.Second * 5 // how long Write blocks on a full input channel before dropping the message
	targetColumns = [...]string{"time", "dbname", "data", "tag_data"} // column order used for CopyFrom into metric tables
)
27
28
29
30
31
// PostgresWriter batches measurement envelopes and stores them into a
// Postgres (or TimescaleDB) measurements database, maintaining the required
// partitions and the admin.all_distinct_dbname_metrics listing table.
type PostgresWriter struct {
	ctx context.Context // writer lifetime; cancellation stops poll() and scheduled jobs
	sinkDb db.PgxPoolIface // connection pool to the measurements database
	metricSchema DbStorageSchemaType // detected storage layout (plain Postgres vs TimescaleDB)
	opts *CmdOpts // CLI options (batching delay, partition/retention intervals, ...)
	retentionInterval time.Duration // parsed --retention; 0 disables partition dropping
	maintenanceInterval time.Duration // parsed --maintenance-interval; 0 disables listing-table maintenance
	input chan metrics.MeasurementEnvelope // buffered handoff from Write() to poll()
	lastError chan error // unbuffered; poll/flush report errors, Write drains them
}
42
43 func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts) (pgw *PostgresWriter, err error) {
44 var conn db.PgxPoolIface
45 if conn, err = db.New(ctx, connstr); err != nil {
46 return
47 }
48 return NewWriterFromPostgresConn(ctx, conn, opts)
49 }
50
// NewWriterFromPostgresConn builds a PostgresWriter on an existing connection
// pool: it validates the interval options server-side, bootstraps the
// measurements schema when the "admin" schema is absent, detects the storage
// schema type, schedules maintenance/retention jobs and starts the batching
// goroutine. Returns a ready-to-use writer or the first error encountered.
func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts) (pgw *PostgresWriter, err error) {
	l := log.GetLogger(ctx).WithField("sink", "postgres").WithField("db", conn.Config().ConnConfig.Database)
	ctx = log.WithLogger(ctx, l)
	pgw = &PostgresWriter{
		ctx: ctx,
		opts: opts,
		input: make(chan metrics.MeasurementEnvelope, cacheLimit),
		lastError: make(chan error),
		sinkDb: conn,
	}
	if err = db.Init(ctx, pgw.sinkDb, func(ctx context.Context, conn db.PgxIface) error {
		var isValidPartitionInterval bool
		// Let the server parse and validate the user-supplied intervals;
		// extract(epoch ...) yields seconds, scanned into Duration fields.
		if err = conn.QueryRow(ctx,
			"SELECT extract(epoch from $1::interval), extract(epoch from $2::interval), $3::interval >= '1h'::interval",
			opts.RetentionInterval, opts.MaintenanceInterval, opts.PartitionInterval,
		).Scan(&pgw.retentionInterval, &pgw.maintenanceInterval, &isValidPartitionInterval); err != nil {
			return err
		}

		// Scan stored epoch seconds as raw Duration ticks (nanoseconds);
		// multiply to convert them into proper second-based durations.
		pgw.retentionInterval *= time.Second
		pgw.maintenanceInterval *= time.Second

		if !isValidPartitionInterval {
			return fmt.Errorf("--partition-interval must be at least 1 hour, got: %s", opts.PartitionInterval)
		}
		if pgw.maintenanceInterval < 0 {
			return errors.New("--maintenance-interval must be a positive PostgreSQL interval or 0 to disable it")
		}
		if pgw.retentionInterval < time.Hour && pgw.retentionInterval != 0 {
			return errors.New("--retention must be at least 1 hour PostgreSQL interval or 0 to disable it")
		}

		l.Info("initialising measurements database...")
		// If the admin schema already exists the database is considered
		// initialised and the bootstrap scripts are skipped.
		exists, err := db.DoesSchemaExist(ctx, conn, "admin")
		if err != nil || exists {
			return err
		}
		for _, sql := range metricSchemaSQLs {
			if _, err = conn.Exec(ctx, sql); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		return
	}
	if err = pgw.ReadMetricSchemaType(); err != nil {
		return
	}
	if err = pgw.EnsureBuiltinMetricDummies(); err != nil {
		return
	}

	// Background jobs are no-ops when their interval is zero.
	pgw.scheduleJob(pgw.maintenanceInterval, pgw.MaintainUniqueSources)
	pgw.scheduleJob(pgw.retentionInterval, pgw.DeleteOldPartitions)

	go pgw.poll()
	l.Info(`measurements sink is activated`)
	return
}
112
113
114 var sqlMetricAdminSchema string
115
116
117 var sqlMetricAdminFunctions string
118
119
120 var sqlMetricEnsurePartitionPostgres string
121
122
123 var sqlMetricEnsurePartitionTimescale string
124
125
126 var sqlMetricChangeChunkIntervalTimescale string
127
128
129 var sqlMetricChangeCompressionIntervalTimescale string
130
var (
	// metricSchemaSQLs lists the bootstrap scripts executed, in order, when
	// the measurements database is initialised for the first time (i.e. when
	// the "admin" schema does not exist yet).
	metricSchemaSQLs = []string{
		sqlMetricAdminSchema,
		sqlMetricAdminFunctions,
		sqlMetricEnsurePartitionPostgres,
		sqlMetricEnsurePartitionTimescale,
		sqlMetricChangeChunkIntervalTimescale,
		sqlMetricChangeCompressionIntervalTimescale,
	}
)
141
// ExistingPartitionInfo records the time range covered by an already-ensured
// partition, used to decide whether new measurements need fresh partitions.
type ExistingPartitionInfo struct {
	StartTime time.Time // inclusive lower bound of the covered range
	EndTime time.Time // upper bound of the covered range
}
146
// MeasurementMessagePostgres is a flattened single-measurement representation
// (one data row plus its tags) for the Postgres sink.
// NOTE(review): not referenced anywhere in this file — possibly used by
// callers outside this view or a leftover; verify before removing.
type MeasurementMessagePostgres struct {
	Time time.Time // measurement timestamp
	DBName string // monitored source database name
	Metric string // metric (and target table) name
	Data map[string]any // measurement key/value payload
	TagData map[string]string // tag key/value payload
}
154
// DbStorageSchemaType identifies the physical storage layout of the
// measurements database.
type DbStorageSchemaType int

const (
	// DbStorageSchemaPostgres is the plain Postgres layout with per-metric,
	// per-dbname time partitions.
	DbStorageSchemaPostgres DbStorageSchemaType = iota
	// DbStorageSchemaTimescale is the TimescaleDB hypertable layout.
	DbStorageSchemaTimescale
)
161
162 func (pgw *PostgresWriter) scheduleJob(interval time.Duration, job func()) {
163 if interval > 0 {
164 go func() {
165 for {
166 select {
167 case <-pgw.ctx.Done():
168 return
169 case <-time.After(interval):
170 job()
171 }
172 }
173 }()
174 }
175 }
176
// ReadMetricSchemaType determines whether the sink database uses the
// TimescaleDB or the plain Postgres storage layout by consulting
// admin.storage_schema_type, defaulting to plain Postgres.
// Note: err is a named return, so a failed query is reported to the caller
// while pgw.metricSchema keeps the Postgres default.
func (pgw *PostgresWriter) ReadMetricSchemaType() (err error) {
	var isTs bool
	pgw.metricSchema = DbStorageSchemaPostgres
	sqlSchemaType := `SELECT schema_type = 'timescale' FROM admin.storage_schema_type`
	if err = pgw.sinkDb.QueryRow(pgw.ctx, sqlSchemaType).Scan(&isTs); err == nil && isTs {
		pgw.metricSchema = DbStorageSchemaTimescale
	}
	return
}
186
// Package-level partition bookkeeping. NOTE(review): shared mutable state —
// accessed without locking from flush(); fine for a single writer goroutine,
// but not safe if multiple PostgresWriter instances live in one process.
var (
	// forceRecreatePartitions is raised when CopyFrom fails with a
	// check-constraint violation (partitions likely dropped externally) so
	// the next flush re-ensures every needed partition.
	forceRecreatePartitions = false
	// partitionMapMetric caches which TimescaleDB metric tables were ensured.
	partitionMapMetric = make(map[string]ExistingPartitionInfo)
	// partitionMapMetricDbname caches ensured partitions and their covered
	// time bounds, keyed by metric then dbname.
	partitionMapMetricDbname = make(map[string]map[string]ExistingPartitionInfo)
)
192
193
194 func (pgw *PostgresWriter) SyncMetric(sourceName, metricName string, op SyncOp) error {
195 if op == AddOp {
196 return errors.Join(
197 pgw.AddDBUniqueMetricToListingTable(sourceName, metricName),
198 pgw.EnsureMetricDummy(metricName),
199 )
200 }
201 return nil
202 }
203
204
205 func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error) {
206 for _, name := range metrics.GetDefaultBuiltInMetrics() {
207 err = errors.Join(err, pgw.EnsureMetricDummy(name))
208 }
209 return
210 }
211
212
// EnsureMetricDummy creates the metric's storage table via the admin helper
// if it does not exist yet; the call is idempotent on the server side.
func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error) {
	_, err = pgw.sinkDb.Exec(pgw.ctx, "SELECT admin.ensure_dummy_metrics_table($1)", metric)
	return
}
217
218
// Write queues a measurement envelope for asynchronous storage. Under
// sustained high load (input buffer full for highLoadTimeout) the message is
// deliberately dropped rather than blocking the monitoring pipeline. It also
// drains and returns the most recent asynchronous flush error, if any — so a
// returned error refers to an earlier batch, not necessarily this message.
func (pgw *PostgresWriter) Write(msg metrics.MeasurementEnvelope) error {
	if pgw.ctx.Err() != nil {
		return pgw.ctx.Err()
	}
	select {
	case pgw.input <- msg:
		// handed off to the poll() goroutine
	case <-time.After(highLoadTimeout):
		// buffer stayed full too long: drop the message (best-effort sink)
	}
	// Non-blocking check for a pending error from a previous flush.
	select {
	case err := <-pgw.lastError:
		return err
	default:
		return nil
	}
}
236
237
238 func (pgw *PostgresWriter) poll() {
239 cache := make([]metrics.MeasurementEnvelope, 0, cacheLimit)
240 cacheTimeout := pgw.opts.BatchingDelay
241 tick := time.NewTicker(cacheTimeout)
242 for {
243 select {
244 case <-pgw.ctx.Done():
245 return
246 default:
247 select {
248 case entry := <-pgw.input:
249 cache = append(cache, entry)
250 if len(cache) < cacheLimit {
251 break
252 }
253 tick.Stop()
254 pgw.flush(cache)
255 cache = cache[:0]
256 tick = time.NewTicker(cacheTimeout)
257 case <-tick.C:
258 pgw.flush(cache)
259 cache = cache[:0]
260 case <-pgw.ctx.Done():
261 return
262 }
263 }
264 }
265 }
266
267 func newCopyFromMeasurements(rows []metrics.MeasurementEnvelope) *copyFromMeasurements {
268 return ©FromMeasurements{envelopes: rows, envelopeIdx: -1, measurementIdx: -1}
269 }
270
// copyFromMeasurements adapts a metric-name-sorted slice of envelopes to the
// pgx.CopyFromSource interface, yielding rows for one metric at a time so
// each CopyFrom call targets a single metric table.
type copyFromMeasurements struct {
	envelopes []metrics.MeasurementEnvelope // must be sorted by MetricName (see flush)
	envelopeIdx int // index of the current envelope; -1 before the first Next
	measurementIdx int // index of the current row within the envelope's Data; -1 before first
	metricName string // metric name of the run currently being streamed ("" between runs)
}
277
// NextEnvelope advances the cursor to the next envelope, resetting the
// per-envelope measurement index. Returns false when envelopes are exhausted.
func (c *copyFromMeasurements) NextEnvelope() bool {
	c.envelopeIdx++
	c.measurementIdx = -1
	return c.envelopeIdx < len(c.envelopes)
}
283
// Next advances to the next measurement row of the current metric run. It
// returns false both when all envelopes are exhausted and when the next
// envelope carries a different metric name — the latter ends the current
// CopyFrom batch while leaving the cursor positioned so a subsequent
// Next/MetricName pair starts the new metric's run.
func (c *copyFromMeasurements) Next() bool {
	for {
		// Move to the next envelope when there is none yet or the current
		// envelope's rows are exhausted.
		if c.envelopeIdx < 0 || c.measurementIdx+1 >= len(c.envelopes[c.envelopeIdx].Data) {
			// Attempt to advance; no more envelopes means end of input.
			if ok := c.NextEnvelope(); !ok {
				return false
			}
			// Track the metric of the run in progress.
			if c.metricName == "" {
				c.metricName = c.envelopes[c.envelopeIdx].MetricName
			} else if c.metricName != c.envelopes[c.envelopeIdx].MetricName {
				// Metric boundary: rewind to the last envelope of the finished
				// run and mark it fully consumed, so EOF stays false and the
				// next batch resumes from the new metric.
				c.envelopeIdx--
				c.measurementIdx = len(c.envelopes[c.envelopeIdx].Data)
				c.metricName = ""
				return false
			}
		}

		// Advance within the current envelope; empty envelopes loop around.
		c.measurementIdx++
		if c.measurementIdx < len(c.envelopes[c.envelopeIdx].Data) {
			return true
		}

	}
}
313
// EOF reports whether every envelope has been consumed. Note that Next
// returning false at a metric boundary does NOT imply EOF.
func (c *copyFromMeasurements) EOF() bool {
	return c.envelopeIdx >= len(c.envelopes)
}
317
// Values renders the current measurement as a CopyFrom row matching
// targetColumns: (time, dbname, data, tag_data). Keys carrying the tag
// prefix are moved from the data document into the tag document, and both
// documents are serialised to JSON strings.
func (c *copyFromMeasurements) Values() ([]any, error) {
	// Clone so stripping tag-prefixed keys does not mutate the cached envelope.
	row := maps.Clone(c.envelopes[c.envelopeIdx].Data[c.measurementIdx])
	tagRow := maps.Clone(c.envelopes[c.envelopeIdx].CustomTags)
	if tagRow == nil {
		tagRow = make(map[string]string)
	}
	for k, v := range row {
		if after, ok := strings.CutPrefix(k, metrics.TagPrefix); ok {
			tagRow[after] = fmt.Sprintf("%v", v)
			delete(row, k) // deleting the key being visited during range is safe in Go
		}
	}
	jsonTags, terr := jsoniter.ConfigFastest.MarshalToString(tagRow)
	json, err := jsoniter.ConfigFastest.MarshalToString(row)
	if err != nil || terr != nil {
		return nil, errors.Join(err, terr)
	}
	// time.Unix(0, ...) interprets the envelope-level epoch as nanoseconds.
	return []any{time.Unix(0, c.envelopes[c.envelopeIdx].Data.GetEpoch()), c.envelopes[c.envelopeIdx].DBName, json, jsonTags}, nil
}
337
// Err satisfies pgx.CopyFromSource; iteration itself can never fail here.
func (c *copyFromMeasurements) Err() error {
	return nil
}
341
// MetricName returns the table identifier of the NEXT metric run to be
// copied. It peeks one envelope ahead because Next leaves the cursor parked
// on the last (exhausted) envelope of the previous run at a metric boundary.
// At true EOF it returns the zero identifier.
func (c *copyFromMeasurements) MetricName() (ident pgx.Identifier) {
	if c.envelopeIdx+1 < len(c.envelopes) {
		// The upcoming envelope begins the next metric's run.
		ident = pgx.Identifier{c.envelopes[c.envelopeIdx+1].MetricName}
	}
	return
}
349
350
351 func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
352 if len(msgs) == 0 {
353 return
354 }
355 logger := log.GetLogger(pgw.ctx)
356 pgPartBounds := make(map[string]ExistingPartitionInfo)
357 pgPartBoundsDbName := make(map[string]map[string]ExistingPartitionInfo)
358 var err error
359
360 slices.SortFunc(msgs, func(a, b metrics.MeasurementEnvelope) int {
361 if a.MetricName < b.MetricName {
362 return -1
363 } else if a.MetricName > b.MetricName {
364 return 1
365 }
366 return 0
367 })
368
369 for _, msg := range msgs {
370 for _, dataRow := range msg.Data {
371 epochTime := time.Unix(0, metrics.Measurement(dataRow).GetEpoch())
372 switch pgw.metricSchema {
373 case DbStorageSchemaTimescale:
374
375 bounds, ok := pgPartBounds[msg.MetricName]
376 if !ok || (ok && epochTime.Before(bounds.StartTime)) {
377 bounds.StartTime = epochTime
378 pgPartBounds[msg.MetricName] = bounds
379 }
380 if !ok || (ok && epochTime.After(bounds.EndTime)) {
381 bounds.EndTime = epochTime
382 pgPartBounds[msg.MetricName] = bounds
383 }
384 case DbStorageSchemaPostgres:
385 _, ok := pgPartBoundsDbName[msg.MetricName]
386 if !ok {
387 pgPartBoundsDbName[msg.MetricName] = make(map[string]ExistingPartitionInfo)
388 }
389 bounds, ok := pgPartBoundsDbName[msg.MetricName][msg.DBName]
390 if !ok || (ok && epochTime.Before(bounds.StartTime)) {
391 bounds.StartTime = epochTime
392 pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
393 }
394 if !ok || (ok && epochTime.After(bounds.EndTime)) {
395 bounds.EndTime = epochTime
396 pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
397 }
398 default:
399 logger.Fatal("unknown storage schema...")
400 }
401 }
402 }
403
404 switch pgw.metricSchema {
405 case DbStorageSchemaPostgres:
406 err = pgw.EnsureMetricDbnameTime(pgPartBoundsDbName, forceRecreatePartitions)
407 case DbStorageSchemaTimescale:
408 err = pgw.EnsureMetricTimescale(pgPartBounds)
409 default:
410 logger.Fatal("unknown storage schema...")
411 }
412 forceRecreatePartitions = false
413 if err != nil {
414 pgw.lastError <- err
415 }
416
417 var rowsBatched, n int64
418 t1 := time.Now()
419 cfm := newCopyFromMeasurements(msgs)
420 for !cfm.EOF() {
421 n, err = pgw.sinkDb.CopyFrom(context.Background(), cfm.MetricName(), targetColumns[:], cfm)
422 rowsBatched += n
423 if err != nil {
424 logger.Error(err)
425 if PgError, ok := err.(*pgconn.PgError); ok {
426 forceRecreatePartitions = PgError.Code == "23514"
427 }
428 if forceRecreatePartitions {
429 logger.Warning("Some metric partitions might have been removed, halting all metric storage. Trying to re-create all needed partitions on next run")
430 }
431 }
432 }
433 diff := time.Since(t1)
434 if err == nil {
435 logger.WithField("rows", rowsBatched).WithField("elapsed", diff).Info("measurements written")
436 return
437 }
438 pgw.lastError <- err
439 }
440
441 func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo) (err error) {
442 logger := log.GetLogger(pgw.ctx)
443 sqlEnsure := `select * from admin.ensure_partition_timescale($1)`
444 for metric := range pgPartBounds {
445 if _, ok := partitionMapMetric[metric]; !ok {
446 if _, err = pgw.sinkDb.Exec(pgw.ctx, sqlEnsure, metric); err != nil {
447 logger.Errorf("Failed to create a TimescaleDB table for metric '%s': %v", metric, err)
448 return err
449 }
450 partitionMapMetric[metric] = ExistingPartitionInfo{}
451 }
452 }
453 return
454 }
455
// EnsureMetricDbnameTime makes sure partitions exist covering the given time
// bounds for every (metric, dbname) pair, creating them through
// admin.ensure_partition_metric_dbname_time and caching the covered ranges in
// partitionMapMetricDbname. force=true re-ensures everything regardless of
// the cache (used after a suspected external partition drop).
func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error) {
	var rows pgx.Rows
	sqlEnsure := `select * from admin.ensure_partition_metric_dbname_time($1, $2, $3, $4)`
	for metric, dbnameTimestampMap := range metricDbnamePartBounds {
		_, ok := partitionMapMetricDbname[metric]
		if !ok {
			partitionMapMetricDbname[metric] = make(map[string]ExistingPartitionInfo)
		}

		for dbname, pb := range dbnameTimestampMap {
			if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
				return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
			}
			partInfo, ok := partitionMapMetricDbname[metric][dbname]
			// Ensure coverage for the earliest timestamp of the batch.
			if !ok || (ok && (pb.StartTime.Before(partInfo.StartTime))) || force {
				if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime, pgw.opts.PartitionInterval); err != nil {
					return
				}
				if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
					return err
				}
				partitionMapMetricDbname[metric][dbname] = partInfo
			}
			// Ensure coverage for the newest timestamp of the batch.
			// NOTE(review): this branch compares pb.EndTime against the cached
			// partition but still passes pb.StartTime to the ensure function —
			// looks like a copy-paste of the branch above; confirm whether
			// pb.EndTime was intended as the third query argument.
			if pb.EndTime.After(partInfo.EndTime) || pb.EndTime.Equal(partInfo.EndTime) || force {
				if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime, pgw.opts.PartitionInterval); err != nil {
					return
				}
				if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
					return err
				}
				partitionMapMetricDbname[metric][dbname] = partInfo
			}
		}
	}
	return nil
}
492
493
494 func (pgw *PostgresWriter) DeleteOldPartitions() {
495 l := log.GetLogger(pgw.ctx)
496 var partsDropped int
497 err := pgw.sinkDb.QueryRow(pgw.ctx, `SELECT admin.drop_old_time_partitions(older_than => $1::interval)`,
498 pgw.opts.RetentionInterval).Scan(&partsDropped)
499 if err != nil {
500 l.Error("Could not drop old time partitions:", err)
501 } else if partsDropped > 0 {
502 l.Infof("Dropped %d old time partitions", partsDropped)
503 }
504 }
505
506
507
508
// MaintainUniqueSources refreshes the admin.all_distinct_dbname_metrics
// listing table so it mirrors the dbnames actually present in every
// top-level metric table: stale entries are deleted, missing ones inserted,
// and entries for dropped metric tables removed. A Postgres advisory lock
// ensures only one pgwatch instance performs the refresh at a time.
// Intended to run periodically from scheduleJob.
func (pgw *PostgresWriter) MaintainUniqueSources() {
	logger := log.GetLogger(pgw.ctx)
	// Arbitrary constant lock key shared by all pgwatch instances.
	sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock`
	sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
	// Recursive CTE emulating a loose index scan: enumerates distinct dbnames
	// without scanning every row of the (potentially huge) metric table.
	sqlDistinct := `
	WITH RECURSIVE t(dbname) AS (
		SELECT MIN(dbname) AS dbname FROM %s
		UNION
		SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t
	)
	SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
	sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2`
	sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1`
	sqlDroppedTables := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric != ALL($1)`
	sqlAdd := `
	INSERT INTO admin.all_distinct_dbname_metrics
	SELECT u, $2 FROM (select unnest($1::text[]) as u) x
	WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2)
	RETURNING *`

	var lock bool
	logger.Infof("Trying to get admin.all_distinct_dbname_metrics maintainer advisory lock...")
	if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
		logger.Error("Getting admin.all_distinct_dbname_metrics maintainer advisory lock failed:", err)
		return
	}
	if !lock {
		// Another instance is doing the maintenance; nothing to do here.
		logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...")
		return
	}

	logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
	// Query error is deliberately ignored: pgx returns a usable (errored)
	// rows object and CollectRows surfaces the error below.
	rows, _ := pgw.sinkDb.Query(pgw.ctx, sqlTopLevelMetrics)
	allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
	if err != nil {
		logger.Error(err)
		return
	}

	for i, tableName := range allDistinctMetricTables {
		foundDbnamesMap := make(map[string]bool)
		foundDbnamesArr := make([]string, 0)

		// Strip the schema qualifier to obtain the bare metric name.
		metricName := strings.Replace(tableName, "public.", "", 1)

		// Rewrite in place so the dropped-tables cleanup below compares
		// metric names, not qualified table names.
		allDistinctMetricTables[i] = metricName

		logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
		rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
		ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
		if err != nil {
			logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
			continue
		}
		for _, drDbname := range ret {
			foundDbnamesMap[drDbname] = true
		}

		// Flatten the deduplicated dbname set for the array-typed SQL params.
		for k := range foundDbnamesMap {
			foundDbnamesArr = append(foundDbnamesArr, k)
		}
		if len(foundDbnamesArr) == 0 {
			logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName)

			_, err = pgw.sinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName)
			if err != nil {
				logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
			}
			continue
		}
		// Remove listing entries whose dbname no longer appears in the table...
		cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName)
		if err != nil {
			logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
		} else if cmdTag.RowsAffected() > 0 {
			logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
		}
		// ...and insert any that are present in the table but missing here.
		cmdTag, err = pgw.sinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName)
		if err != nil {
			logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
		} else if cmdTag.RowsAffected() > 0 {
			logger.Infof("Added %d entry to the Postgres all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
		}
		// Spread the per-table work out to keep load on the sink DB low.
		time.Sleep(time.Minute)
	}

	// Finally drop listing entries for metric tables that no longer exist.
	cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDroppedTables, allDistinctMetricTables)
	if err != nil {
		logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for dropped metric tables: %s", err)
	} else if cmdTag.RowsAffected() > 0 {
		logger.Infof("Removed %d stale entries for dropped tables from all_distinct_dbname_metrics listing table", cmdTag.RowsAffected())
	}
}
603
604 func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error {
605 sql := `INSERT INTO admin.all_distinct_dbname_metrics
606 SELECT $1, $2
607 WHERE NOT EXISTS (
608 SELECT * FROM admin.all_distinct_dbname_metrics WHERE dbname = $1 AND metric = $2
609 )`
610 _, err := pgw.sinkDb.Exec(pgw.ctx, sql, dbUnique, metric)
611 return err
612 }
613