...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/sinks/multiwriter.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/sinks

     1  package sinks
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"strings"
     8  	"sync"
     9  
    10  	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
    11  )
    12  
    13  // Writer is an interface that writes metrics values
    14  type Writer interface {
    15  	SyncMetric(sourceName, metricName string, op SyncOp) error
    16  	Write(msgs metrics.MeasurementEnvelope) error
    17  }
    18  
    19  // MetricDefiner is an interface for passing metric definitions to a sink.
    20  type MetricsDefiner interface {
    21  	DefineMetrics(metrics *metrics.Metrics) error
    22  }
    23  
    24  // MultiWriter ensures the simultaneous storage of data in several storages.
    25  type MultiWriter struct {
    26  	writers []Writer
    27  	sync.Mutex
    28  }
    29  
    30  // NewSinkWriter creates and returns new instance of MultiWriter struct.
    31  func NewSinkWriter(ctx context.Context, opts *CmdOpts) (w Writer, err error) {
    32  	if len(opts.Sinks) == 0 {
    33  		return nil, errors.New("no sinks specified for measurements")
    34  	}
    35  	mw := &MultiWriter{}
    36  	for _, sinkConnStr := range opts.Sinks {
    37  		scheme, target, found := strings.Cut(sinkConnStr, "://")
    38  		if !found || scheme == "" || target == "" {
    39  			return nil, fmt.Errorf("malformed sink URI %s", sinkConnStr)
    40  		}
    41  		switch scheme {
    42  		case "jsonfile":
    43  			w, err = NewJSONWriter(ctx, target)
    44  		case "postgres", "postgresql":
    45  			w, err = NewPostgresWriter(ctx, sinkConnStr, opts)
    46  		case "prometheus":
    47  			w, err = NewPrometheusWriter(ctx, target)
    48  		case "rpc", "grpc":
    49  			w, err = NewRPCWriter(ctx, sinkConnStr)
    50  		default:
    51  			return nil, fmt.Errorf("unknown schema %s in sink URI %s", scheme, sinkConnStr)
    52  		}
    53  		if err != nil {
    54  			return nil, err
    55  		}
    56  		mw.AddWriter(w)
    57  	}
    58  	if len(mw.writers) == 1 {
    59  		return mw.writers[0], nil
    60  	}
    61  	return mw, nil
    62  }
    63  
    64  func (mw *MultiWriter) Count() int {
    65  	return len(mw.writers)
    66  }
    67  
    68  func (mw *MultiWriter) AddWriter(w Writer) {
    69  	mw.Lock()
    70  	mw.writers = append(mw.writers, w)
    71  	mw.Unlock()
    72  }
    73  
    74  func (mw *MultiWriter) DefineMetrics(metrics *metrics.Metrics) (err error) {
    75  	for _, w := range mw.writers {
    76  		if definer, ok := w.(MetricsDefiner); ok {
    77  			err = errors.Join(err, definer.DefineMetrics(metrics))
    78  		}
    79  	}
    80  	return nil
    81  }
    82  
    83  func (mw *MultiWriter) SyncMetric(sourceName, metricName string, op SyncOp) (err error) {
    84  	for _, w := range mw.writers {
    85  		err = errors.Join(err, w.SyncMetric(sourceName, metricName, op))
    86  	}
    87  	return
    88  }
    89  
    90  func (mw *MultiWriter) Write(msg metrics.MeasurementEnvelope) (err error) {
    91  	for _, w := range mw.writers {
    92  		err = errors.Join(err, w.Write(msg))
    93  	}
    94  	return
    95  }
    96  
    97  // Migrator interface implementation for MultiWriter
    98  
    99  // Migrate runs migrations on all writers that support it
   100  func (mw *MultiWriter) Migrate() (err error) {
   101  	for _, w := range mw.writers {
   102  		if m, ok := w.(interface {
   103  			Migrate() error
   104  		}); ok {
   105  			err = errors.Join(err, m.Migrate())
   106  		}
   107  	}
   108  	return
   109  }
   110  
   111  // NeedsMigration checks if any writer needs migration
   112  func (mw *MultiWriter) NeedsMigration() (bool, error) {
   113  	for _, w := range mw.writers {
   114  		if m, ok := w.(interface {
   115  			NeedsMigration() (bool, error)
   116  		}); ok {
   117  			if needs, err := m.NeedsMigration(); err != nil {
   118  				return false, err
   119  			} else if needs {
   120  				return true, nil
   121  			}
   122  		}
   123  	}
   124  	return false, nil
   125  }
   126