...
1 package sinks
2
3 import (
4 "context"
5 "encoding/json"
6 "time"
7
8 "github.com/cybertec-postgresql/pgwatch/v3/internal/log"
9 "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
10 "gopkg.in/natefinch/lumberjack.v2"
11 )
12
13
14
15
16
17 type JSONWriter struct {
18 ctx context.Context
19 lw *lumberjack.Logger
20 }
21
22 func NewJSONWriter(ctx context.Context, fname string) (*JSONWriter, error) {
23 l := log.GetLogger(ctx).WithField("sink", "jsonfile").WithField("filename", fname)
24 ctx = log.WithLogger(ctx, l)
25 jw := &JSONWriter{
26 ctx: ctx,
27 lw: &lumberjack.Logger{Filename: fname, Compress: true},
28 }
29 go jw.watchCtx()
30 return jw, nil
31 }
32
33 func (jw *JSONWriter) Write(msgs []metrics.MeasurementEnvelope) error {
34 if jw.ctx.Err() != nil {
35 return jw.ctx.Err()
36 }
37 if len(msgs) == 0 {
38 return nil
39 }
40 enc := json.NewEncoder(jw.lw)
41 t1 := time.Now()
42 for _, msg := range msgs {
43 dataRow := map[string]any{
44 "metric": msg.MetricName,
45 "data": msg.Data,
46 "dbname": msg.DBName,
47 "custom_tags": msg.CustomTags,
48 }
49 if err := enc.Encode(dataRow); err != nil {
50 return err
51 }
52 }
53 diff := time.Since(t1)
54 log.GetLogger(jw.ctx).WithField("rows", len(msgs)).WithField("elapsed", diff).Info("measurements written")
55 return nil
56 }
57
58 func (jw *JSONWriter) watchCtx() {
59 <-jw.ctx.Done()
60 jw.lw.Close()
61 }
62
63 func (jw *JSONWriter) SyncMetric(_, _, _ string) error {
64 if jw.ctx.Err() != nil {
65 return jw.ctx.Err()
66 }
67
68 return nil
69 }
70