...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/sinks/multiwriter.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/sinks

     1  package sinks
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"sync"
     8  	"strings"
     9  
    10  	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
    11  )
    12  
// Writer is the interface every measurement sink implements. MultiWriter in
// this file both implements it and fans calls out to a list of Writers.
type Writer interface {
	// SyncMetric informs the sink about operation op concerning metric
	// metricName of the monitored source dbUnique.
	// NOTE(review): the meaning of the individual SyncOp values is defined
	// outside this file — confirm against the SyncOp declaration.
	SyncMetric(dbUnique, metricName string, op SyncOp) error
	// Write hands one measurement envelope to the sink and reports whether
	// the sink accepted it.
	Write(msgs metrics.MeasurementEnvelope) error
}
    18  
// MetricsDefiner is an optional interface for passing metric definitions to a
// sink. MultiWriter.DefineMetrics checks each of its writers for it with a
// type assertion and calls only those that implement it.
type MetricsDefiner interface {
	// DefineMetrics receives the metric definitions and returns an error if
	// the sink could not process them.
	DefineMetrics(metric *metrics.Metrics) error
}
    23  
// MultiWriter is a Writer that holds several underlying writers and forwards
// every call to each of them, so the same data is stored in several sinks.
type MultiWriter struct {
	// writers are the sinks that receive every forwarded call, in the order
	// they were added.
	writers []Writer
	// Mutex is held by AddWriter while it appends to writers. The forwarding
	// methods in this file iterate over writers without taking it.
	sync.Mutex
}
    29  
    30  // NewSinkWriter creates and returns new instance of MultiWriter struct.
    31  func NewSinkWriter(ctx context.Context, opts *CmdOpts) (w Writer, err error) {
    32  	if len(opts.Sinks) == 0 {
    33  		return nil, errors.New("no sinks specified for measurements")
    34  	}
    35  	mw := &MultiWriter{}
    36  	for _, s := range opts.Sinks {
    37  		scheme, path, found := strings.Cut(s, "://")
    38  		if !found || scheme == "" || path == "" {
    39  			return nil, fmt.Errorf("malformed sink URI %s", s)
    40  		}
    41  		switch scheme {
    42  		case "jsonfile":
    43  			w, err = NewJSONWriter(ctx, path)
    44  		case "postgres", "postgresql":
    45  			w, err = NewPostgresWriter(ctx, s, opts)
    46  		case "prometheus":
    47  			w, err = NewPrometheusWriter(ctx, path)
    48  		case "rpc":
    49  			w, err = NewRPCWriter(ctx, s)
    50  		default:
    51  			return nil, fmt.Errorf("unknown schema %s in sink URI %s", scheme, s)
    52  		}
    53  		if err != nil {
    54  			return nil, err
    55  		}
    56  		mw.AddWriter(w)
    57  	}
    58  	if len(mw.writers) == 1 {
    59  		return mw.writers[0], nil
    60  	}
    61  	return mw, nil
    62  }
    63  
    64  func (mw *MultiWriter) AddWriter(w Writer) {
    65  	mw.Lock()
    66  	mw.writers = append(mw.writers, w)
    67  	mw.Unlock()
    68  }
    69  
    70  func (mw *MultiWriter) DefineMetrics(metrics *metrics.Metrics) (err error) {
    71  	for _, w := range mw.writers {
    72  		if definer, ok := w.(MetricsDefiner); ok {
    73  			err = errors.Join(err, definer.DefineMetrics(metrics))
    74  		}
    75  	}
    76  	return nil
    77  }
    78  
    79  func (mw *MultiWriter) SyncMetric(dbUnique, metricName string, op SyncOp) (err error) {
    80  	for _, w := range mw.writers {
    81  		err = errors.Join(err, w.SyncMetric(dbUnique, metricName, op))
    82  	}
    83  	return
    84  }
    85  
    86  func (mw *MultiWriter) Write(msg metrics.MeasurementEnvelope) (err error) {
    87  	for _, w := range mw.writers {
    88  		err = errors.Join(err, w.Write(msg))
    89  	}
    90  	return
    91  }
    92