...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/sinks/multiwriter.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/sinks

     1  package sinks
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"strings"
     8  	"sync"
     9  
    10  	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
    11  )
    12  
    13  // Writer is an interface that writes metrics values
    14  type Writer interface {
    15  	SyncMetric(dbUnique, metricName, op string) error
    16  	Write(msgs []metrics.MeasurementEnvelope) error
    17  }
    18  
    19  // MultiWriter ensures the simultaneous storage of data in several storages.
    20  type MultiWriter struct {
    21  	writers []Writer
    22  	sync.Mutex
    23  }
    24  
    25  // NewSinkWriter creates and returns new instance of MultiWriter struct.
    26  func NewSinkWriter(ctx context.Context, opts *CmdOpts, metricDefs *metrics.Metrics) (w Writer, err error) {
    27  	if len(opts.Sinks) == 0 {
    28  		return nil, errors.New("no sinks specified for measurements")
    29  	}
    30  	mw := &MultiWriter{}
    31  	for _, s := range opts.Sinks {
    32  		scheme, path, found := strings.Cut(s, "://")
    33  		if !found || scheme == "" || path == "" {
    34  			return nil, fmt.Errorf("malformed sink URI %s", s)
    35  		}
    36  		switch scheme {
    37  		case "jsonfile":
    38  			w, err = NewJSONWriter(ctx, path)
    39  		case "postgres", "postgresql":
    40  			w, err = NewPostgresWriter(ctx, s, opts, metricDefs)
    41  		case "prometheus":
    42  			w, err = NewPrometheusWriter(ctx, path)
    43  		case "rpc":
    44  			w, err = NewRPCWriter(ctx, path)
    45  		default:
    46  			return nil, fmt.Errorf("unknown schema %s in sink URI %s", scheme, s)
    47  		}
    48  		if err != nil {
    49  			return nil, err
    50  		}
    51  		mw.AddWriter(w)
    52  	}
    53  	if len(mw.writers) == 1 {
    54  		return mw.writers[0], nil
    55  	}
    56  	return mw, nil
    57  }
    58  
    59  func (mw *MultiWriter) AddWriter(w Writer) {
    60  	mw.Lock()
    61  	mw.writers = append(mw.writers, w)
    62  	mw.Unlock()
    63  }
    64  
    65  func (mw *MultiWriter) SyncMetric(dbUnique, metricName, op string) (err error) {
    66  	for _, w := range mw.writers {
    67  		err = errors.Join(err, w.SyncMetric(dbUnique, metricName, op))
    68  	}
    69  	return
    70  }
    71  
    72  func (mw *MultiWriter) Write(msgs []metrics.MeasurementEnvelope) (err error) {
    73  	for _, w := range mw.writers {
    74  		err = errors.Join(err, w.Write(msgs))
    75  	}
    76  	return
    77  }
    78