...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/sinks/json.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/sinks

     1  package sinks
     2  
     3  import (
     4  	"context"
     5  	"time"
     6  
     7  	jsoniter "github.com/json-iterator/go"
     8  
     9  	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
    10  	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
    11  	"gopkg.in/natefinch/lumberjack.v2"
    12  )
    13  
    14  // JSONWriter is a sink that writes metric measurements to a file in JSON format.
    15  // It supports compression and rotation of output files. The default rotation is based on the file size (100Mb).
    16  // JSONWriter is useful for debugging and testing purposes, as well as for integration with other systems,
    17  // such as log aggregators, analytics systems, and data processing pipelines, ML models, etc.
    18  type JSONWriter struct {
    19  	ctx context.Context
    20  	lw  *lumberjack.Logger
    21  	enc *jsoniter.Encoder
    22  }
    23  
    24  func NewJSONWriter(ctx context.Context, fname string) (*JSONWriter, error) {
    25  	l := log.GetLogger(ctx).WithField("sink", "jsonfile").WithField("filename", fname)
    26  	ctx = log.WithLogger(ctx, l)
    27  	jw := &JSONWriter{
    28  		ctx: ctx,
    29  		lw:  &lumberjack.Logger{Filename: fname, Compress: true},
    30  	}
    31  	jw.enc = jsoniter.ConfigFastest.NewEncoder(jw.lw)
    32  	go jw.watchCtx()
    33  	return jw, nil
    34  }
    35  
    36  func (jw *JSONWriter) Write(msg metrics.MeasurementEnvelope) error {
    37  	if jw.ctx.Err() != nil {
    38  		return jw.ctx.Err()
    39  	}
    40  	if len(msg.Data) == 0 {
    41  		return nil
    42  	}
    43  	t1 := time.Now()
    44  	written := 0
    45  
    46  	dataRow := map[string]any{
    47  		"metric":      msg.MetricName,
    48  		"data":        msg.Data,
    49  		"dbname":      msg.DBName,
    50  		"custom_tags": msg.CustomTags,
    51  	}
    52  	if err := jw.enc.Encode(dataRow); err != nil {
    53  		return err
    54  	}
    55  	written += len(msg.Data)
    56  
    57  	diff := time.Since(t1)
    58  	log.GetLogger(jw.ctx).WithField("rows", written).WithField("elapsed", diff).Info("measurements written")
    59  	return nil
    60  }
    61  
    62  func (jw *JSONWriter) watchCtx() {
    63  	<-jw.ctx.Done()
    64  	jw.lw.Close()
    65  }
    66  
    67  func (jw *JSONWriter) SyncMetric(_, _ string, _ SyncOp) error {
    68  	if jw.ctx.Err() != nil {
    69  		return jw.ctx.Err()
    70  	}
    71  	// do nothing, we don't care
    72  	return nil
    73  }
    74