...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/sinks/json.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/sinks

     1  package sinks
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"time"
     7  
     8  	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
     9  	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
    10  	"gopkg.in/natefinch/lumberjack.v2"
    11  )
    12  
// JSONWriter is a sink that writes metric measurements to a file in JSON format.
// It supports compression and rotation of output files. The default rotation is based on the file size (100 MB).
// JSONWriter is useful for debugging and testing purposes, as well as for integration with other systems,
// such as log aggregators, analytics systems, data processing pipelines, ML models, etc.
type JSONWriter struct {
	// ctx is checked for cancellation before every operation and carries
	// the logger used for reporting.
	ctx context.Context
	// lw is the lumberjack logger that receives the encoded JSON output.
	lw *lumberjack.Logger
}
    21  
    22  func NewJSONWriter(ctx context.Context, fname string) (*JSONWriter, error) {
    23  	l := log.GetLogger(ctx).WithField("sink", "jsonfile").WithField("filename", fname)
    24  	ctx = log.WithLogger(ctx, l)
    25  	jw := &JSONWriter{
    26  		ctx: ctx,
    27  		lw:  &lumberjack.Logger{Filename: fname, Compress: true},
    28  	}
    29  	go jw.watchCtx()
    30  	return jw, nil
    31  }
    32  
    33  func (jw *JSONWriter) Write(msgs []metrics.MeasurementEnvelope) error {
    34  	if jw.ctx.Err() != nil {
    35  		return jw.ctx.Err()
    36  	}
    37  	if len(msgs) == 0 {
    38  		return nil
    39  	}
    40  	enc := json.NewEncoder(jw.lw)
    41  	t1 := time.Now()
    42  	for _, msg := range msgs {
    43  		dataRow := map[string]any{
    44  			"metric":      msg.MetricName,
    45  			"data":        msg.Data,
    46  			"dbname":      msg.DBName,
    47  			"custom_tags": msg.CustomTags,
    48  		}
    49  		if err := enc.Encode(dataRow); err != nil {
    50  			return err
    51  		}
    52  	}
    53  	diff := time.Since(t1)
    54  	log.GetLogger(jw.ctx).WithField("rows", len(msgs)).WithField("elapsed", diff).Info("measurements written")
    55  	return nil
    56  }
    57  
    58  func (jw *JSONWriter) watchCtx() {
    59  	<-jw.ctx.Done()
    60  	jw.lw.Close()
    61  }
    62  
    63  func (jw *JSONWriter) SyncMetric(_, _, _ string) error {
    64  	if jw.ctx.Err() != nil {
    65  		return jw.ctx.Err()
    66  	}
    67  	// do nothing, we don't care
    68  	return nil
    69  }
    70