package sinks

import (
	"context"
	_ "embed"
	"errors"
	"fmt"
	"maps"
	"slices"
	"strings"
	"time"

	jsoniter "github.com/json-iterator/go"

	"github.com/cybertec-postgresql/pgwatch/v5/internal/db"
	"github.com/cybertec-postgresql/pgwatch/v5/internal/log"
	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
	migrator "github.com/cybertec-postgresql/pgx-migrator"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgconn"
	"github.com/jackc/pgx/v5/pgxpool"
)

var (
	cacheLimit      = 256
	highLoadTimeout = time.Second * 5
	targetColumns   = [...]string{"time", "dbname", "data", "tag_data"}
)

//go:embed sql/admin_schema.sql
var sqlMetricAdminSchema string

//go:embed sql/admin_functions.sql
var sqlMetricAdminFunctions string

//go:embed sql/ensure_partition_postgres.sql
var sqlMetricEnsurePartitionPostgres string

//go:embed sql/ensure_partition_timescale.sql
var sqlMetricEnsurePartitionTimescale string

//go:embed sql/change_chunk_interval.sql
var sqlMetricChangeChunkIntervalTimescale string

//go:embed sql/change_compression_interval.sql
var sqlMetricChangeCompressionIntervalTimescale string

var metricSchemaSQLs = []string{
	sqlMetricAdminSchema,
	sqlMetricAdminFunctions,
	sqlMetricEnsurePartitionPostgres,
	sqlMetricEnsurePartitionTimescale,
	sqlMetricChangeChunkIntervalTimescale,
	sqlMetricChangeCompressionIntervalTimescale,
}
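
// PostgresWriter is a sink that writes measurement batches to a PostgreSQL
// (or TimescaleDB) database, creating the necessary partitions on the fly
// and dropping expired ones in the background.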
type PostgresWriter struct {
	ctx                      context.Context
	sinkDb                   db.PgxPoolIface
	metricSchema             DbStorageSchemaType
	opts                     *CmdOpts
	retentionInterval        time.Duration
	maintenanceInterval      time.Duration
	input                    chan metrics.MeasurementEnvelope
	lastError                chan error
	forceRecreatePartitions  bool
	partitionMapMetric       map[string]ExistingPartitionInfo
	partitionMapMetricDbname map[string]map[string]ExistingPartitionInfo
}
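
// PostgresWriter implements the db.Migrator interface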
var _ db.Migrator = (*PostgresWriter)(nil)
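
// NewPostgresWriter opens a connection pool for the given connection string
// and hands it to NewWriterFromPostgresConn.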
func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts) (pgw *PostgresWriter, err error) {
	var conn db.PgxPoolIface
	if conn, err = db.New(ctx, connstr); err != nil {
		return
	}
	return NewWriterFromPostgresConn(ctx, conn, opts)
}
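
// ErrNeedsMigration is returned when the sink database schema is not up to date.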
var ErrNeedsMigration = errors.New("sink database schema is outdated, please run migrations using `pgwatch config upgrade` command")
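
// NewWriterFromPostgresConn initialises the measurements database over an
// existing connection, schedules the maintenance job and starts the
// background poller.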
func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts) (pgw *PostgresWriter, err error) {
	l := log.GetLogger(ctx).WithField("sink", "postgres").WithField("db", conn.Config().ConnConfig.Database)
	ctx = log.WithLogger(ctx, l)
	pgw = &PostgresWriter{
		ctx:                      ctx,
		opts:                     opts,
		input:                    make(chan metrics.MeasurementEnvelope, cacheLimit),
		lastError:                make(chan error),
		sinkDb:                   conn,
		forceRecreatePartitions:  false,
		partitionMapMetric:       make(map[string]ExistingPartitionInfo),
		partitionMapMetricDbname: make(map[string]map[string]ExistingPartitionInfo),
	}
	l.Info("initialising measurements database...")
	if err = pgw.init(); err != nil {
		return nil, err
	}
	if err = pgw.ReadMetricSchemaType(); err != nil {
		return nil, err
	}
	if err = pgw.EnsureBuiltinMetricDummies(); err != nil {
		return nil, err
	}
	pgw.scheduleJob(pgw.maintenanceInterval, func() {
		pgw.DeleteOldPartitions()
		pgw.MaintainUniqueSources()
	})
	go pgw.poll()
	l.Info(`measurements sink is activated`)
	return
}
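
// init validates the retention, maintenance and partition interval options
// and bootstraps the measurements schema when the admin schema is missing.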
func (pgw *PostgresWriter) init() (err error) {
	return db.Init(pgw.ctx, pgw.sinkDb, func(ctx context.Context, conn db.PgxIface) error {
		var isValidPartitionInterval bool
		if err = conn.QueryRow(ctx,
			"SELECT extract(epoch from $1::interval), extract(epoch from $2::interval), $3::interval >= '1h'::interval",
			pgw.opts.RetentionInterval, pgw.opts.MaintenanceInterval, pgw.opts.PartitionInterval,
		).Scan(&pgw.retentionInterval, &pgw.maintenanceInterval, &isValidPartitionInterval); err != nil {
			return err
		}
		// extract(epoch ...) yields seconds, but scanning into time.Duration
		// treats the value as nanoseconds, so scale the results up to seconds
		pgw.retentionInterval *= time.Second
		pgw.maintenanceInterval *= time.Second

		if !isValidPartitionInterval {
			return fmt.Errorf("--partition-interval must be at least 1 hour, got: %s", pgw.opts.PartitionInterval)
		}
		if pgw.maintenanceInterval < 0 {
			return errors.New("--maintenance-interval must be a positive PostgreSQL interval or 0 to disable it")
		}
		if pgw.retentionInterval != 0 && pgw.retentionInterval < time.Hour {
			return errors.New("--retention must be a PostgreSQL interval of at least 1 hour, or 0 to disable it")
		}

		exists, err := db.DoesSchemaExist(ctx, conn, "admin")
		if err != nil || exists {
			return err
		}
		// the admin schema is missing, bootstrap the measurements database
		for _, sql := range metricSchemaSQLs {
			if _, err = conn.Exec(ctx, sql); err != nil {
				return err
			}
		}
		return nil
	})
}
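
// ExistingPartitionInfo holds the time bounds of an already created partition.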
type ExistingPartitionInfo struct {
	StartTime time.Time
	EndTime   time.Time
}
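
// MeasurementMessagePostgres represents a single measurement row in the sink database.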
type MeasurementMessagePostgres struct {
	Time    time.Time
	DBName  string
	Metric  string
	Data    map[string]any
	TagData map[string]string
}
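
// DbStorageSchemaType denotes the layout of the sink database: native
// PostgreSQL partitions or TimescaleDB hypertables.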
type DbStorageSchemaType int

const (
	DbStorageSchemaPostgres DbStorageSchemaType = iota
	DbStorageSchemaTimescale
)
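
// scheduleJob runs the given job periodically until the context is
// cancelled; a non-positive interval disables the job.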
func (pgw *PostgresWriter) scheduleJob(interval time.Duration, job func()) {
	if interval > 0 {
		go func() {
			for {
				select {
				case <-pgw.ctx.Done():
					return
				case <-time.After(interval):
					job()
				}
			}
		}()
	}
}
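
// ReadMetricSchemaType detects whether the sink database uses the
// TimescaleDB storage schema.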
func (pgw *PostgresWriter) ReadMetricSchemaType() (err error) {
	var isTs bool
	pgw.metricSchema = DbStorageSchemaPostgres
	sqlSchemaType := `SELECT schema_type = 'timescale' FROM admin.storage_schema_type`
	if err = pgw.sinkDb.QueryRow(pgw.ctx, sqlSchemaType).Scan(&isTs); err == nil && isTs {
		pgw.metricSchema = DbStorageSchemaTimescale
	}
	return
}
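
// SyncMetric ensures the listing table entry and the dummy table exist for a
// newly added source/metric pair; other operations are a no-op.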
func (pgw *PostgresWriter) SyncMetric(sourceName, metricName string, op SyncOp) error {
	if op == AddOp {
		return errors.Join(
			pgw.AddDBUniqueMetricToListingTable(sourceName, metricName),
			pgw.EnsureMetricDummy(metricName),
		)
	}
	return nil
}
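
// EnsureBuiltinMetricDummies creates empty tables for all built-in metrics.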
func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error) {
	for _, name := range metrics.GetDefaultBuiltInMetrics() {
		err = errors.Join(err, pgw.EnsureMetricDummy(name))
	}
	return
}
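
// EnsureMetricDummy creates an empty table for a metric if it does not exist yet.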
func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error) {
	_, err = pgw.sinkDb.Exec(pgw.ctx, "SELECT admin.ensure_dummy_metrics_table($1)", metric)
	return
}
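
// Write queues the measurements for the background flusher. If the input
// channel stays full for highLoadTimeout, the message is dropped. The last
// error reported by the flusher, if any, is returned.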
func (pgw *PostgresWriter) Write(msg metrics.MeasurementEnvelope) error {
	if pgw.ctx.Err() != nil {
		return pgw.ctx.Err()
	}
	select {
	case pgw.input <- msg:
		// message queued for the next batch
	case <-time.After(highLoadTimeout):
		// input channel is full, drop the message rather than block the caller
	}
	select {
	case err := <-pgw.lastError:
		return err
	default:
		return nil
	}
}
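
// poll reads measurements from the input channel and flushes them to the
// database whenever the cache fills up or the batching delay expires.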
func (pgw *PostgresWriter) poll() {
	cache := make([]metrics.MeasurementEnvelope, 0, cacheLimit)
	cacheTimeout := pgw.opts.BatchingDelay
	tick := time.NewTicker(cacheTimeout)
	for {
		select {
		case <-pgw.ctx.Done(): // check context cancellation with high priority
			return
		default:
			select {
			case entry := <-pgw.input:
				cache = append(cache, entry)
				if len(cache) < cacheLimit {
					break
				}
				// the cache is full: flush immediately and restart the timer
				tick.Stop()
				pgw.flush(cache)
				cache = cache[:0]
				tick = time.NewTicker(cacheTimeout)
			case <-tick.C:
				// batching delay expired, flush whatever has accumulated
				pgw.flush(cache)
				cache = cache[:0]
			case <-pgw.ctx.Done():
				return
			}
		}
	}
}
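
// newCopyFromMeasurements wraps a sorted batch of envelopes in an iterator
// that feeds CopyFrom one metric at a time.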
func newCopyFromMeasurements(rows []metrics.MeasurementEnvelope) *copyFromMeasurements {
	return &copyFromMeasurements{envelopes: rows, envelopeIdx: -1, measurementIdx: -1}
}

type copyFromMeasurements struct {
	envelopes      []metrics.MeasurementEnvelope
	envelopeIdx    int    // index of the current envelope, -1 before the first call
	measurementIdx int    // index of the current measurement within the envelope
	metricName     string // metric currently being copied, used to detect table boundaries
}
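
// Compile-time check (added for clarity) that the iterator satisfies
// pgx.CopyFromSource (Next, Values, Err), the contract CopyFrom consumes below.
var _ pgx.CopyFromSource = (*copyFromMeasurements)(nil)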

// NextEnvelope advances to the next envelope and resets the measurement index.
func (c *copyFromMeasurements) NextEnvelope() bool {
	c.envelopeIdx++
	c.measurementIdx = -1
	return c.envelopeIdx < len(c.envelopes)
}

// Next reports whether another measurement of the current metric is
// available. It returns false at a metric boundary so that each metric is
// copied into its own table, resetting state so the following CopyFrom call
// resumes with the next metric.
func (c *copyFromMeasurements) Next() bool {
	for {
		// check whether we need to advance to the next envelope
		if c.envelopeIdx < 0 || c.measurementIdx+1 >= len(c.envelopes[c.envelopeIdx].Data) {
			if ok := c.NextEnvelope(); !ok {
				return false // no more envelopes
			}
			if c.metricName == "" {
				c.metricName = c.envelopes[c.envelopeIdx].MetricName
			} else if c.metricName != c.envelopes[c.envelopeIdx].MetricName {
				// metric boundary reached: step back so MetricName() picks up
				// the new metric and mark the previous envelope as exhausted
				c.envelopeIdx--
				c.measurementIdx = len(c.envelopes[c.envelopeIdx].Data)
				c.metricName = ""
				return false
			}
		}
		// advance within the current envelope
		c.measurementIdx++
		if c.measurementIdx < len(c.envelopes[c.envelopeIdx].Data) {
			return true
		}
		// envelope exhausted, loop around to advance to the next one
	}
}

// EOF reports whether all envelopes have been consumed.
func (c *copyFromMeasurements) EOF() bool {
	return c.envelopeIdx >= len(c.envelopes)
}

// Values renders the current measurement as a CopyFrom row: tag-prefixed keys
// are split out of the data map, merged with the custom tags, and both maps
// are marshalled to JSON.
func (c *copyFromMeasurements) Values() ([]any, error) {
	row := maps.Clone(c.envelopes[c.envelopeIdx].Data[c.measurementIdx])
	tagRow := maps.Clone(c.envelopes[c.envelopeIdx].CustomTags)
	if tagRow == nil {
		tagRow = make(map[string]string)
	}
	for k, v := range row {
		if after, ok := strings.CutPrefix(k, metrics.TagPrefix); ok {
			tagRow[after] = fmt.Sprintf("%v", v)
			delete(row, k)
		}
	}
	jsonTags, terr := jsoniter.ConfigFastest.MarshalToString(tagRow)
	json, err := jsoniter.ConfigFastest.MarshalToString(row)
	if err != nil || terr != nil {
		return nil, errors.Join(err, terr)
	}
	return []any{time.Unix(0, c.envelopes[c.envelopeIdx].Data.GetEpoch()), c.envelopes[c.envelopeIdx].DBName, json, jsonTags}, nil
}

// Err implements pgx.CopyFromSource; this iterator itself never fails.
func (c *copyFromMeasurements) Err() error {
	return nil
}

// MetricName returns the identifier of the table the next batch should be
// copied into. The iterator always parks one envelope before the next metric,
// both at construction time and after each metric boundary.
func (c *copyFromMeasurements) MetricName() (ident pgx.Identifier) {
	if c.envelopeIdx+1 < len(c.envelopes) {
		ident = pgx.Identifier{c.envelopes[c.envelopeIdx+1].MetricName}
	}
	return
}
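
// flush sorts the batch by metric name, makes sure the target partitions
// exist and bulk-copies each metric's measurements into its own table.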
func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
	if len(msgs) == 0 {
		return
	}
	logger := log.GetLogger(pgw.ctx)
	pgPartBounds := make(map[string]ExistingPartitionInfo)
	pgPartBoundsDbName := make(map[string]map[string]ExistingPartitionInfo)
	var err error

	// sort by metric name so CopyFrom can batch consecutive envelopes per table
	slices.SortFunc(msgs, func(a, b metrics.MeasurementEnvelope) int {
		return strings.Compare(a.MetricName, b.MetricName)
	})

	// collect min/max timestamps per metric (and per dbname for the plain
	// Postgres schema) to check and create the needed partitions
	for _, msg := range msgs {
		for _, dataRow := range msg.Data {
			epochTime := time.Unix(0, metrics.Measurement(dataRow).GetEpoch())
			switch pgw.metricSchema {
			case DbStorageSchemaTimescale:
				bounds, ok := pgPartBounds[msg.MetricName]
				if !ok || epochTime.Before(bounds.StartTime) {
					bounds.StartTime = epochTime
					pgPartBounds[msg.MetricName] = bounds
				}
				if !ok || epochTime.After(bounds.EndTime) {
					bounds.EndTime = epochTime
					pgPartBounds[msg.MetricName] = bounds
				}
			case DbStorageSchemaPostgres:
				if _, ok := pgPartBoundsDbName[msg.MetricName]; !ok {
					pgPartBoundsDbName[msg.MetricName] = make(map[string]ExistingPartitionInfo)
				}
				bounds, ok := pgPartBoundsDbName[msg.MetricName][msg.DBName]
				if !ok || epochTime.Before(bounds.StartTime) {
					bounds.StartTime = epochTime
					pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
				}
				if !ok || epochTime.After(bounds.EndTime) {
					bounds.EndTime = epochTime
					pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
				}
			default:
				logger.Fatal("unknown storage schema...")
			}
		}
	}

	switch pgw.metricSchema {
	case DbStorageSchemaPostgres:
		err = pgw.EnsureMetricDbnameTime(pgPartBoundsDbName)
	case DbStorageSchemaTimescale:
		err = pgw.EnsureMetricTimescale(pgPartBounds)
	default:
		logger.Fatal("unknown storage schema...")
	}
	pgw.forceRecreatePartitions = false
	if err != nil {
		// report the error without blocking if nobody is listening
		select {
		case pgw.lastError <- err:
		default:
		}
	}

	var rowsBatched, n int64
	t1 := time.Now()
	cfm := newCopyFromMeasurements(msgs)
	for !cfm.EOF() {
		// a background context is used, so an in-flight copy is not aborted by pgw.ctx cancellation
		n, err = pgw.sinkDb.CopyFrom(context.Background(), cfm.MetricName(), targetColumns[:], cfm)
		rowsBatched += n
		if err != nil {
			logger.Error(err)
			if pgErr, ok := err.(*pgconn.PgError); ok {
				// 23514 (check_violation) means a row fell outside the existing partitions
				pgw.forceRecreatePartitions = pgErr.Code == "23514"
			}
			if pgw.forceRecreatePartitions {
				logger.Warning("Some metric partitions might have been removed, halting all metric storage. Trying to re-create all needed partitions on next run")
			}
		}
	}
	diff := time.Since(t1)
	if err == nil {
		logger.WithField("rows", rowsBatched).WithField("elapsed", diff).Info("measurements written")
		return
	}
	select {
	case pgw.lastError <- err:
	default:
	}
}
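
// EnsureMetricTimescale creates a hypertable for every metric not seen before.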
func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo) (err error) {
	logger := log.GetLogger(pgw.ctx)
	sqlEnsure := `select * from admin.ensure_partition_timescale($1)`
	for metric := range pgPartBounds {
		if _, ok := pgw.partitionMapMetric[metric]; !ok {
			if _, err = pgw.sinkDb.Exec(pgw.ctx, sqlEnsure, metric); err != nil {
				logger.Errorf("Failed to create a TimescaleDB table for metric '%s': %v", metric, err)
				return err
			}
			pgw.partitionMapMetric[metric] = ExistingPartitionInfo{}
		}
	}
	return
}
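
// EnsureMetricDbnameTime creates the metric/dbname/time partitions required
// to cover the given time bounds, caching what already exists to avoid
// needless round trips.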
func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo) (err error) {
	var rows pgx.Rows
	sqlEnsure := `select * from admin.ensure_partition_metric_dbname_time($1, $2, $3, $4)`
	for metric, dbnameTimestampMap := range metricDbnamePartBounds {
		if _, ok := pgw.partitionMapMetricDbname[metric]; !ok {
			pgw.partitionMapMetricDbname[metric] = make(map[string]ExistingPartitionInfo)
		}

		for dbname, pb := range dbnameTimestampMap {
			if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
				return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
			}
			partInfo, ok := pgw.partitionMapMetricDbname[metric][dbname]
			// extend the partition range downwards if needed
			if !ok || pb.StartTime.Before(partInfo.StartTime) || pgw.forceRecreatePartitions {
				if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime, pgw.opts.PartitionInterval); err != nil {
					return
				}
				if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
					return err
				}
				pgw.partitionMapMetricDbname[metric][dbname] = partInfo
			}
			// extend the partition range upwards if needed
			if !pb.EndTime.Before(partInfo.EndTime) || pgw.forceRecreatePartitions {
				if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.EndTime, pgw.opts.PartitionInterval); err != nil {
					return
				}
				if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
					return err
				}
				pgw.partitionMapMetricDbname[metric][dbname] = partInfo
			}
		}
	}
	return nil
}
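
// DeleteOldPartitions drops partitions that are older than the configured
// retention interval.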
func (pgw *PostgresWriter) DeleteOldPartitions() {
	l := log.GetLogger(pgw.ctx)
	var partsDropped int
	err := pgw.sinkDb.QueryRow(pgw.ctx, `SELECT admin.drop_old_time_partitions(older_than => $1::interval)`,
		pgw.opts.RetentionInterval).Scan(&partsDropped)
	if err != nil {
		l.Error("Could not drop old time partitions:", err)
	} else if partsDropped > 0 {
		l.Infof("Dropped %d old time partitions", partsDropped)
	}
}
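
// MaintainUniqueSources runs the admin.maintain_unique_sources() maintenance
// over the admin.all_distinct_dbname_metrics listing table.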
func (pgw *PostgresWriter) MaintainUniqueSources() {
	logger := log.GetLogger(pgw.ctx)
	var rowsAffected int
	if err := pgw.sinkDb.QueryRow(pgw.ctx, `SELECT admin.maintain_unique_sources()`).Scan(&rowsAffected); err != nil {
		logger.Error("Failed to run admin.all_distinct_dbname_metrics maintenance:", err)
		return
	}
	logger.WithField("rows", rowsAffected).Info("Successfully processed admin.all_distinct_dbname_metrics")
}
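
// AddDBUniqueMetricToListingTable registers a source/metric pair in the
// admin.all_distinct_dbname_metrics listing table unless already present.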
func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error {
	sql := `INSERT INTO admin.all_distinct_dbname_metrics
		SELECT $1, $2
		WHERE NOT EXISTS (
			SELECT * FROM admin.all_distinct_dbname_metrics WHERE dbname = $1 AND metric = $2
		)`
	_, err := pgw.sinkDb.Exec(pgw.ctx, sql, dbUnique, metric)
	return err
}
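
// NewPostgresSinkMigrator returns a migrator for the sink database, creating
// the measurements schema first when the admin schema does not exist yet.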
func NewPostgresSinkMigrator(ctx context.Context, connStr string) (db.Migrator, error) {
	conn, err := pgxpool.New(ctx, connStr)
	if err != nil {
		return nil, err
	}
	pgw := &PostgresWriter{
		ctx:    ctx,
		sinkDb: conn,
	}
	exists, err := db.DoesSchemaExist(ctx, conn, "admin")
	if err != nil {
		return nil, err
	}
	if exists {
		return pgw, nil
	}
	// fresh database: create the full measurements schema before migrating
	for _, sql := range metricSchemaSQLs {
		if _, err = conn.Exec(ctx, sql); err != nil {
			return nil, err
		}
	}
	return pgw, nil
}
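
// initMigrator builds the migrator that tracks applied migrations in the
// admin.migration table.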
var initMigrator = func(pgw *PostgresWriter) (*migrator.Migrator, error) {
	return migrator.New(
		migrator.TableName("admin.migration"),
		migrator.SetNotice(func(s string) {
			log.GetLogger(pgw.ctx).Info(s)
		}),
		migrations(),
	)
}
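
// Migrate upgrades the sink database schema to the latest version.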
func (pgw *PostgresWriter) Migrate() error {
	m, err := initMigrator(pgw)
	if err != nil {
		return fmt.Errorf("cannot initialize migration: %w", err)
	}
	return m.Migrate(pgw.ctx, pgw.sinkDb)
}
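
// NeedsMigration reports whether the sink database schema is outdated.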
func (pgw *PostgresWriter) NeedsMigration() (bool, error) {
	m, err := initMigrator(pgw)
	if err != nil {
		return false, err
	}
	return m.NeedUpgrade(pgw.ctx, pgw.sinkDb)
}
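
// MigrationsCount is used to verify the expected number of applied sink
// schema migrations.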
const MigrationsCount = 1
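
// migrations returns the ordered list of sink schema migrations to apply.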
var migrations func() migrator.Option = func() migrator.Option {
	return migrator.Migrations(
		&migrator.Migration{
			Name: "01110 Apply postgres sink schema migrations",
			Func: func(context.Context, pgx.Tx) error {
				// no-op marker: the embedded schema scripts already create
				// the up-to-date layout for fresh installations
				return nil
			},
		},

		&migrator.Migration{
			Name: "01180 Apply admin functions migrations for v5",
			Func: func(ctx context.Context, tx pgx.Tx) error {
				// drop the outdated function signatures first...
				_, err := tx.Exec(ctx, `
					DROP FUNCTION IF EXISTS admin.ensure_partition_metric_dbname_time;
					DROP FUNCTION IF EXISTS admin.ensure_partition_metric_time;
					DROP FUNCTION IF EXISTS admin.get_old_time_partitions(integer, text);
					DROP FUNCTION IF EXISTS admin.drop_old_time_partitions(integer, boolean, text);
				`)
				if err != nil {
					return err
				}
				// ...then recreate them from the embedded definitions
				_, err = tx.Exec(ctx, sqlMetricEnsurePartitionPostgres)
				if err != nil {
					return err
				}
				_, err = tx.Exec(ctx, sqlMetricAdminFunctions)
				return err
			},
		},

		// when adding a new migration here, remember to update MigrationsCount above
	)
}