...
1 package sinks
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "sync"
8 "strings"
9
10 "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
11 )
12
13
14 type Writer interface {
15 SyncMetric(dbUnique, metricName string, op SyncOp) error
16 Write(msgs metrics.MeasurementEnvelope) error
17 }
18
19
20 type MetricsDefiner interface {
21 DefineMetrics(metric *metrics.Metrics) error
22 }
23
24
25 type MultiWriter struct {
26 writers []Writer
27 sync.Mutex
28 }
29
30
31 func NewSinkWriter(ctx context.Context, opts *CmdOpts) (w Writer, err error) {
32 if len(opts.Sinks) == 0 {
33 return nil, errors.New("no sinks specified for measurements")
34 }
35 mw := &MultiWriter{}
36 for _, s := range opts.Sinks {
37 scheme, path, found := strings.Cut(s, "://")
38 if !found || scheme == "" || path == "" {
39 return nil, fmt.Errorf("malformed sink URI %s", s)
40 }
41 switch scheme {
42 case "jsonfile":
43 w, err = NewJSONWriter(ctx, path)
44 case "postgres", "postgresql":
45 w, err = NewPostgresWriter(ctx, s, opts)
46 case "prometheus":
47 w, err = NewPrometheusWriter(ctx, path)
48 case "rpc":
49 w, err = NewRPCWriter(ctx, s)
50 default:
51 return nil, fmt.Errorf("unknown schema %s in sink URI %s", scheme, s)
52 }
53 if err != nil {
54 return nil, err
55 }
56 mw.AddWriter(w)
57 }
58 if len(mw.writers) == 1 {
59 return mw.writers[0], nil
60 }
61 return mw, nil
62 }
63
64 func (mw *MultiWriter) AddWriter(w Writer) {
65 mw.Lock()
66 mw.writers = append(mw.writers, w)
67 mw.Unlock()
68 }
69
70 func (mw *MultiWriter) DefineMetrics(metrics *metrics.Metrics) (err error) {
71 for _, w := range mw.writers {
72 if definer, ok := w.(MetricsDefiner); ok {
73 err = errors.Join(err, definer.DefineMetrics(metrics))
74 }
75 }
76 return nil
77 }
78
79 func (mw *MultiWriter) SyncMetric(dbUnique, metricName string, op SyncOp) (err error) {
80 for _, w := range mw.writers {
81 err = errors.Join(err, w.SyncMetric(dbUnique, metricName, op))
82 }
83 return
84 }
85
86 func (mw *MultiWriter) Write(msg metrics.MeasurementEnvelope) (err error) {
87 for _, w := range mw.writers {
88 err = errors.Join(err, w.Write(msg))
89 }
90 return
91 }
92