...
1 package sinks
2
3 import (
4 "context"
5 "time"
6
7 jsoniter "github.com/json-iterator/go"
8
9 "github.com/cybertec-postgresql/pgwatch/v3/internal/log"
10 "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
11 "gopkg.in/natefinch/lumberjack.v2"
12 )
13
14
15
16
17
18 type JSONWriter struct {
19 ctx context.Context
20 lw *lumberjack.Logger
21 enc *jsoniter.Encoder
22 }
23
24 func NewJSONWriter(ctx context.Context, fname string) (*JSONWriter, error) {
25 l := log.GetLogger(ctx).WithField("sink", "jsonfile").WithField("filename", fname)
26 ctx = log.WithLogger(ctx, l)
27 jw := &JSONWriter{
28 ctx: ctx,
29 lw: &lumberjack.Logger{Filename: fname, Compress: true},
30 }
31 jw.enc = jsoniter.ConfigFastest.NewEncoder(jw.lw)
32 go jw.watchCtx()
33 return jw, nil
34 }
35
36 func (jw *JSONWriter) Write(msg metrics.MeasurementEnvelope) error {
37 if jw.ctx.Err() != nil {
38 return jw.ctx.Err()
39 }
40 if len(msg.Data) == 0 {
41 return nil
42 }
43 t1 := time.Now()
44 written := 0
45
46 dataRow := map[string]any{
47 "metric": msg.MetricName,
48 "data": msg.Data,
49 "dbname": msg.DBName,
50 "custom_tags": msg.CustomTags,
51 }
52 if err := jw.enc.Encode(dataRow); err != nil {
53 return err
54 }
55 written += len(msg.Data)
56
57 diff := time.Since(t1)
58 log.GetLogger(jw.ctx).WithField("rows", written).WithField("elapsed", diff).Info("measurements written")
59 return nil
60 }
61
62 func (jw *JSONWriter) watchCtx() {
63 <-jw.ctx.Done()
64 jw.lw.Close()
65 }
66
67 func (jw *JSONWriter) SyncMetric(_, _ string, _ SyncOp) error {
68 if jw.ctx.Err() != nil {
69 return jw.ctx.Err()
70 }
71
72 return nil
73 }
74