package sinks

import (
    "context"
    _ "embed"
    "errors"
    "fmt"
    "maps"
    "slices"
    "strings"
    "time"

    jsoniter "github.com/json-iterator/go"

    "github.com/cybertec-postgresql/pgwatch/v3/internal/db"
    "github.com/cybertec-postgresql/pgwatch/v3/internal/log"
    "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgconn"
)

var (
    cacheLimit      = 256
    highLoadTimeout = time.Second * 5
    targetColumns   = [...]string{"time", "dbname", "data", "tag_data"}
)

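// NewPostgresWriter opens a connection pool to the measurements database and
// hands it off to NewWriterFromPostgresConn.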
func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts) (pgw *PostgresWriter, err error) {
    var conn db.PgxPoolIface
    if conn, err = db.New(ctx, connstr); err != nil {
        return
    }
    return NewWriterFromPostgresConn(ctx, conn, opts)
}

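// NewWriterFromPostgresConn initialises the measurements database over an existing
// connection pool (creating the admin schema on first run), then starts the
// background maintenance and polling goroutines.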
func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts) (pgw *PostgresWriter, err error) {
    l := log.GetLogger(ctx).WithField("sink", "postgres").WithField("db", conn.Config().ConnConfig.Database)
    ctx = log.WithLogger(ctx, l)
    pgw = &PostgresWriter{
        ctx:       ctx,
        opts:      opts,
        input:     make(chan metrics.MeasurementEnvelope, cacheLimit),
        lastError: make(chan error),
        sinkDb:    conn,
    }
    var runDeleteOldPartitions bool
    if err = db.Init(ctx, pgw.sinkDb, func(ctx context.Context, conn db.PgxIface) error {
        if err = conn.QueryRow(ctx, "SELECT $1::interval > '0'::interval", opts.Retention).Scan(&runDeleteOldPartitions); err != nil {
            return err
        }
        l.Info("initialising measurements database...")
        exists, err := db.DoesSchemaExist(ctx, conn, "admin")
        if err != nil || exists { // already initialised, or the check failed
            return err
        }
        for _, sql := range metricSchemaSQLs {
            if _, err = conn.Exec(ctx, sql); err != nil {
                return err
            }
        }
        return nil
    }); err != nil {
        return
    }
    if err = pgw.ReadMetricSchemaType(); err != nil {
        return
    }
    if err = pgw.EnsureBuiltinMetricDummies(); err != nil {
        return
    }
    if runDeleteOldPartitions {
        go pgw.deleteOldPartitions()
    }
    go pgw.maintainUniqueSources()
    go pgw.poll()
    l.Info(`measurements sink is activated`)
    return
}

// Bootstrap SQL for the measurements database, embedded at build time.
// (The go:embed directives were missing here; the sql/ file names below are assumed.)

//go:embed sql/admin_schema.sql
var sqlMetricAdminSchema string

//go:embed sql/admin_functions.sql
var sqlMetricAdminFunctions string

//go:embed sql/ensure_partition_postgres.sql
var sqlMetricEnsurePartitionPostgres string

//go:embed sql/ensure_partition_timescale.sql
var sqlMetricEnsurePartitionTimescale string

//go:embed sql/change_chunk_interval.sql
var sqlMetricChangeChunkIntervalTimescale string

//go:embed sql/change_compression_interval.sql
var sqlMetricChangeCompressionIntervalTimescale string

var metricSchemaSQLs = []string{
    sqlMetricAdminSchema,
    sqlMetricAdminFunctions,
    sqlMetricEnsurePartitionPostgres,
    sqlMetricEnsurePartitionTimescale,
    sqlMetricChangeChunkIntervalTimescale,
    sqlMetricChangeCompressionIntervalTimescale,
}

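// PostgresWriter is a sink that buffers incoming measurements and writes them
// in batches to a PostgreSQL (or TimescaleDB) measurements database.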
type PostgresWriter struct {
    ctx          context.Context
    sinkDb       db.PgxPoolIface
    metricSchema DbStorageSchemaType
    opts         *CmdOpts
    input        chan metrics.MeasurementEnvelope
    lastError    chan error
}

// ExistingPartitionInfo holds the time bounds of an already created partition.
type ExistingPartitionInfo struct {
    StartTime time.Time
    EndTime   time.Time
}

type MeasurementMessagePostgres struct {
    Time    time.Time
    DBName  string
    Metric  string
    Data    map[string]any
    TagData map[string]string
}

// DbStorageSchemaType enumerates the supported measurements storage schemas.
type DbStorageSchemaType int

const (
    DbStorageSchemaPostgres DbStorageSchemaType = iota
    DbStorageSchemaTimescale
)

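// ReadMetricSchemaType detects whether the sink database uses the plain Postgres
// or the TimescaleDB storage schema.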
func (pgw *PostgresWriter) ReadMetricSchemaType() (err error) {
    var isTs bool
    pgw.metricSchema = DbStorageSchemaPostgres
    sqlSchemaType := `SELECT schema_type = 'timescale' FROM admin.storage_schema_type`
    if err = pgw.sinkDb.QueryRow(pgw.ctx, sqlSchemaType).Scan(&isTs); err == nil && isTs {
        pgw.metricSchema = DbStorageSchemaTimescale
    }
    return
}

var (
    forceRecreatePartitions  = false                                               // set after a partition error; forces re-creation on next flush
    partitionMapMetric       = make(map[string]ExistingPartitionInfo)              // metric -> known partition bounds
    partitionMapMetricDbname = make(map[string]map[string]ExistingPartitionInfo)   // metric -> dbname -> known partition bounds
)

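// SyncMetric ensures the necessary storage objects exist when a new source/metric
// pair is added; removals require no action on the Postgres sink.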
func (pgw *PostgresWriter) SyncMetric(sourceName, metricName string, op SyncOp) error {
    if op == AddOp {
        return errors.Join(
            pgw.AddDBUniqueMetricToListingTable(sourceName, metricName),
            pgw.EnsureMetricDummy(metricName),
        )
    }
    return nil
}

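// EnsureBuiltinMetricDummies creates empty tables for all built-in metrics if they do not exist.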
func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error) {
    for _, name := range metrics.GetDefaultBuiltInMetrics() {
        err = errors.Join(err, pgw.EnsureMetricDummy(name))
    }
    return
}

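// EnsureMetricDummy creates an empty table for a metric's measurements if it does not exist.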
func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error) {
    _, err = pgw.sinkDb.Exec(pgw.ctx, "SELECT admin.ensure_dummy_metrics_table($1)", metric)
    return
}

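// Write queues the measurements for the next batch write. Under sustained high
// load the message is dropped after highLoadTimeout; the last write error, if
// any, is reported back to the caller.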
func (pgw *PostgresWriter) Write(msg metrics.MeasurementEnvelope) error {
    if pgw.ctx.Err() != nil {
        return pgw.ctx.Err()
    }
    select {
    case pgw.input <- msg:
        // message queued for the next batch
    case <-time.After(highLoadTimeout):
        // cache is full, message dropped
    }
    select {
    case err := <-pgw.lastError:
        return err
    default:
        return nil
    }
}

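// poll is the writer's main loop: it drains the input channel into a cache and
// flushes the cache when it fills up or the batching delay elapses.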
func (pgw *PostgresWriter) poll() {
    cache := make([]metrics.MeasurementEnvelope, 0, cacheLimit)
    cacheTimeout := pgw.opts.BatchingDelay
    tick := time.NewTicker(cacheTimeout)
    for {
        select {
        case <-pgw.ctx.Done():
            return
        default:
            select {
            case entry := <-pgw.input:
                cache = append(cache, entry)
                if len(cache) < cacheLimit {
                    break
                }
                // cache is full: flush now and restart the batching timer
                tick.Stop()
                pgw.flush(cache)
                cache = cache[:0]
                tick = time.NewTicker(cacheTimeout)
            case <-tick.C:
                // batching delay elapsed: flush whatever has accumulated
                pgw.flush(cache)
                cache = cache[:0]
            case <-pgw.ctx.Done():
                return
            }
        }
    }
}

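// newCopyFromMeasurements wraps a metric-sorted batch of envelopes in a
// pgx.CopyFromSource that yields the rows of one metric (one target table) at a time.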
func newCopyFromMeasurements(rows []metrics.MeasurementEnvelope) *copyFromMeasurements {
    return &copyFromMeasurements{envelopes: rows, envelopeIdx: -1, measurementIdx: -1}
}

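// copyFromMeasurements iterates over the measurements of consecutive envelopes
// sharing the same metric name, implementing the pgx.CopyFromSource interface.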
type copyFromMeasurements struct {
    envelopes      []metrics.MeasurementEnvelope
    envelopeIdx    int
    measurementIdx int
    metricName     string // metric of the group currently being copied
}

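// NextEnvelope advances to the next envelope and reports whether one exists.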
func (c *copyFromMeasurements) NextEnvelope() bool {
    c.envelopeIdx++
    c.measurementIdx = -1
    return c.envelopeIdx < len(c.envelopes)
}

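// Next advances to the next measurement within the current metric group,
// returning false at a metric boundary or at the end of the batch.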
func (c *copyFromMeasurements) Next() bool {
    for {
        // Advance to the next envelope if we haven't started yet or the current one is exhausted
        if c.envelopeIdx < 0 || c.measurementIdx+1 >= len(c.envelopes[c.envelopeIdx].Data) {
            if ok := c.NextEnvelope(); !ok {
                return false // no more envelopes
            }
            // Stop at metric boundaries so each CopyFrom call targets a single table
            if c.metricName == "" {
                c.metricName = c.envelopes[c.envelopeIdx].MetricName
            } else if c.metricName != c.envelopes[c.envelopeIdx].MetricName {
                // Different metric: back up so the next CopyFrom call starts with
                // this envelope, and report the end of the current group
                c.envelopeIdx--
                c.measurementIdx = len(c.envelopes[c.envelopeIdx].Data)
                c.metricName = ""
                return false
            }
        }
        // Advance within the current envelope; loop again if it turned out to be empty
        c.measurementIdx++
        if c.measurementIdx < len(c.envelopes[c.envelopeIdx].Data) {
            return true
        }
    }
}

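// EOF reports whether the whole batch has been consumed.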
func (c *copyFromMeasurements) EOF() bool {
    return c.envelopeIdx >= len(c.envelopes)
}

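// Values renders the current measurement as a COPY row: timestamp, source name,
// JSON data and JSON tags.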
func (c *copyFromMeasurements) Values() ([]any, error) {
    row := maps.Clone(c.envelopes[c.envelopeIdx].Data[c.measurementIdx])
    tagRow := maps.Clone(c.envelopes[c.envelopeIdx].CustomTags)
    if tagRow == nil {
        tagRow = make(map[string]string)
    }
    // Move tag-prefixed fields from the data document into the tag document
    for k, v := range row {
        if after, ok := strings.CutPrefix(k, metrics.TagPrefix); ok {
            tagRow[after] = fmt.Sprintf("%v", v)
            delete(row, k)
        }
    }
    jsonTags, terr := jsoniter.ConfigFastest.MarshalToString(tagRow)
    json, err := jsoniter.ConfigFastest.MarshalToString(row)
    if err != nil || terr != nil {
        return nil, errors.Join(err, terr)
    }
    return []any{time.Unix(0, c.envelopes[c.envelopeIdx].Data.GetEpoch()), c.envelopes[c.envelopeIdx].DBName, json, jsonTags}, nil
}

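// Err implements pgx.CopyFromSource; row-building errors are surfaced from Values.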
func (c *copyFromMeasurements) Err() error {
    return nil
}

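// MetricName returns the table identifier for the metric group that the next
// CopyFrom call will consume.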
func (c *copyFromMeasurements) MetricName() (ident pgx.Identifier) {
    if c.envelopeIdx+1 < len(c.envelopes) {
        // Next() leaves envelopeIdx on the last envelope of the finished group,
        // so the upcoming group's metric name is in the following envelope
        ident = pgx.Identifier{c.envelopes[c.envelopeIdx+1].MetricName}
    }
    return
}

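// flush ensures the needed partitions exist and bulk-copies the cached
// measurements into their metric tables.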
func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
    if len(msgs) == 0 {
        return
    }
    logger := log.GetLogger(pgw.ctx)
    // metric (and, for the Postgres schema, source) -> min/max timestamps seen in this batch
    pgPartBounds := make(map[string]ExistingPartitionInfo)
    pgPartBoundsDbName := make(map[string]map[string]ExistingPartitionInfo)
    var err error

    // Sort by metric name so each metric's rows form one contiguous COPY group
    slices.SortFunc(msgs, func(a, b metrics.MeasurementEnvelope) int {
        return strings.Compare(a.MetricName, b.MetricName)
    })

    // Collect the time bounds needed to check/create partitions
    for _, msg := range msgs {
        for _, dataRow := range msg.Data {
            epochTime := time.Unix(0, metrics.Measurement(dataRow).GetEpoch())
            switch pgw.metricSchema {
            case DbStorageSchemaTimescale:
                bounds, ok := pgPartBounds[msg.MetricName]
                if !ok || epochTime.Before(bounds.StartTime) {
                    bounds.StartTime = epochTime
                    pgPartBounds[msg.MetricName] = bounds
                }
                if !ok || epochTime.After(bounds.EndTime) {
                    bounds.EndTime = epochTime
                    pgPartBounds[msg.MetricName] = bounds
                }
            case DbStorageSchemaPostgres:
                if _, ok := pgPartBoundsDbName[msg.MetricName]; !ok {
                    pgPartBoundsDbName[msg.MetricName] = make(map[string]ExistingPartitionInfo)
                }
                bounds, ok := pgPartBoundsDbName[msg.MetricName][msg.DBName]
                if !ok || epochTime.Before(bounds.StartTime) {
                    bounds.StartTime = epochTime
                    pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
                }
                if !ok || epochTime.After(bounds.EndTime) {
                    bounds.EndTime = epochTime
                    pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
                }
            default:
                logger.Fatal("unknown storage schema...")
            }
        }
    }

    switch pgw.metricSchema {
    case DbStorageSchemaPostgres:
        err = pgw.EnsureMetricDbnameTime(pgPartBoundsDbName, forceRecreatePartitions)
    case DbStorageSchemaTimescale:
        err = pgw.EnsureMetricTimescale(pgPartBounds)
    default:
        logger.Fatal("unknown storage schema...")
    }
    forceRecreatePartitions = false
    if err != nil {
        pgw.lastError <- err
    }

    // Copy the batch, one metric table at a time
    var rowsBatched, n int64
    t1 := time.Now()
    cfm := newCopyFromMeasurements(msgs)
    for !cfm.EOF() {
        n, err = pgw.sinkDb.CopyFrom(context.Background(), cfm.MetricName(), targetColumns[:], cfm)
        rowsBatched += n
        if err != nil {
            logger.Error(err)
            if pgErr, ok := err.(*pgconn.PgError); ok {
                // 23514 = check_violation: a row fell outside the existing partitions
                forceRecreatePartitions = pgErr.Code == "23514"
            }
            if forceRecreatePartitions {
                logger.Warning("Some metric partitions might have been removed, halting all metric storage. Trying to re-create all needed partitions on next run")
            }
        }
    }
    diff := time.Since(t1)
    if err == nil {
        logger.WithField("rows", rowsBatched).WithField("elapsed", diff).Info("measurements written")
        return
    }
    pgw.lastError <- err
}

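// EnsureMetricTimescale creates a TimescaleDB hypertable for each metric not seen before.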
func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo) (err error) {
    logger := log.GetLogger(pgw.ctx)
    sqlEnsure := `select * from admin.ensure_partition_timescale($1)`
    for metric := range pgPartBounds {
        if _, ok := partitionMapMetric[metric]; !ok {
            if _, err = pgw.sinkDb.Exec(pgw.ctx, sqlEnsure, metric); err != nil {
                logger.Errorf("Failed to create a TimescaleDB table for metric '%s': %v", metric, err)
                return err
            }
            partitionMapMetric[metric] = ExistingPartitionInfo{}
        }
    }
    return
}

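// EnsureMetricDbnameTime creates the metric/dbname/time sub-partitions covering
// the given time bounds under the plain Postgres storage schema.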
func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error) {
    var rows pgx.Rows
    sqlEnsure := `select * from admin.ensure_partition_metric_dbname_time($1, $2, $3)`
    for metric, dbnameTimestampMap := range metricDbnamePartBounds {
        if _, ok := partitionMapMetricDbname[metric]; !ok {
            partitionMapMetricDbname[metric] = make(map[string]ExistingPartitionInfo)
        }
        for dbname, pb := range dbnameTimestampMap {
            if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
                return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
            }
            partInfo, ok := partitionMapMetricDbname[metric][dbname]
            // Extend the covered range downwards if the batch starts before the known bounds
            if !ok || pb.StartTime.Before(partInfo.StartTime) || force {
                if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
                    return
                }
                if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
                    return err
                }
                partitionMapMetricDbname[metric][dbname] = partInfo
            }
            // Extend the covered range upwards if the batch ends at or after the known bounds
            if !pb.EndTime.Before(partInfo.EndTime) || force {
                if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.EndTime); err != nil {
                    return
                }
                if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
                    return err
                }
                partitionMapMetricDbname[metric][dbname] = partInfo
            }
        }
    }
    return nil
}

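// deleteOldPartitions drops partitions older than the configured retention
// every 12 hours until the context is cancelled.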
func (pgw *PostgresWriter) deleteOldPartitions() {
    l := log.GetLogger(pgw.ctx)
    for {
        var partsDropped int
        err := pgw.sinkDb.QueryRow(pgw.ctx, `SELECT admin.drop_old_time_partitions(older_than => $1::interval)`,
            pgw.opts.Retention).Scan(&partsDropped)
        if err != nil {
            l.Error("Could not drop old time partitions: ", err)
        } else if partsDropped > 0 {
            l.Infof("Dropped %d old time partitions", partsDropped)
        }
        select {
        case <-pgw.ctx.Done():
            return
        case <-time.After(time.Hour * 12):
        }
    }
}

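// maintainUniqueSources periodically refreshes admin.all_distinct_dbname_metrics,
// the listing of which sources have data for which metric, since the measurement
// tables themselves carry no such bookkeeping.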
func (pgw *PostgresWriter) maintainUniqueSources() {
    logger := log.GetLogger(pgw.ctx)
    // only one instance should do the maintenance, hence the advisory lock
    sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock`
    sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
    // recursive CTE emulates a loose index scan to list distinct dbnames cheaply
    sqlDistinct := `
    WITH RECURSIVE t(dbname) AS (
        SELECT MIN(dbname) AS dbname FROM %s
        UNION
        SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t )
    SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
    sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2`
    sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1`
    sqlAdd := `
    INSERT INTO admin.all_distinct_dbname_metrics SELECT u, $2 FROM (select unnest($1::text[]) as u) x
    WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2)
    RETURNING *`

    for {
        select {
        case <-pgw.ctx.Done():
            return
        case <-time.After(time.Hour * 24):
        }
        var lock bool
        logger.Info("Trying to get metricsDb listing maintainer advisory lock...")
        if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
            logger.Error("Getting metricsDb listing maintainer advisory lock failed: ", err)
            continue
        }
        if !lock {
            logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...")
            continue
        }

        logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
        rows, _ := pgw.sinkDb.Query(pgw.ctx, sqlTopLevelMetrics)
        allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
        if err != nil {
            logger.Error(err)
            continue
        }

        for _, tableName := range allDistinctMetricTables {
            foundDbnamesMap := make(map[string]bool)
            foundDbnamesArr := make([]string, 0)
            metricName := strings.Replace(tableName, "public.", "", 1)

            logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
            rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
            ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
            if err != nil {
                logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for '%s': %s", metricName, err)
                break
            }
            for _, drDbname := range ret {
                foundDbnamesMap[drDbname] = true // de-duplicate across partitions
            }

            // remove stale entries, then add newly seen ones
            for k := range foundDbnamesMap {
                foundDbnamesArr = append(foundDbnamesArr, k)
            }
            if len(foundDbnamesArr) == 0 {
                logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s'", metricName)
                _, err = pgw.sinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName)
                if err != nil {
                    logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
                }
                continue
            }
            cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName)
            if err != nil {
                logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
            } else if cmdTag.RowsAffected() > 0 {
                logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
            }
            cmdTag, err = pgw.sinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName)
            if err != nil {
                logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
            } else if cmdTag.RowsAffected() > 0 {
                logger.Infof("Added %d entries to the Postgres all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
            }
            time.Sleep(time.Minute) // throttle to keep the maintenance load negligible
        }
    }
}

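// AddDBUniqueMetricToListingTable registers a source/metric pair in the
// admin.all_distinct_dbname_metrics listing table.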
func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error {
    sql := `INSERT INTO admin.all_distinct_dbname_metrics
        SELECT $1, $2
        WHERE NOT EXISTS (
            SELECT * FROM admin.all_distinct_dbname_metrics WHERE dbname = $1 AND metric = $2
        )`
    _, err := pgw.sinkDb.Exec(pgw.ctx, sql, dbUnique, metric)
    return err
}