1 package sinks
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "strings"
8 "sync"
9
10 "github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
11 )
12
13
// Writer is the sink abstraction used in this file: NewSinkWriter returns one,
// and MultiWriter forwards both methods to every Writer it holds.
type Writer interface {
	// SyncMetric hands the sink a sync operation op for the given source and
	// metric names. The meaning of individual SyncOp values is defined outside
	// this file.
	SyncMetric(sourceName, metricName string, op SyncOp) error
	// Write passes one measurement envelope to the sink.
	Write(msgs metrics.MeasurementEnvelope) error
}
18
19
// MetricsDefiner is an optional interface that a Writer may implement in
// addition to Writer. MultiWriter.DefineMetrics probes each of its writers for
// it with a type assertion and calls DefineMetrics only on those that match.
type MetricsDefiner interface {
	// DefineMetrics receives the metric definitions. What an implementation
	// does with them is not visible in this file.
	DefineMetrics(metrics *metrics.Metrics) error
}
23
24
// MultiWriter holds several Writers and forwards each Writer call to all of
// them, combining the errors they return.
type MultiWriter struct {
	// writers holds the sinks in the order they were added via AddWriter.
	writers []Writer
	// The embedded mutex is taken by AddWriter while it appends to writers.
	// NOTE(review): the methods in this file that iterate over writers do not
	// take the lock — confirm that writers are only added before concurrent use.
	sync.Mutex
}
29
30
31 func NewSinkWriter(ctx context.Context, opts *CmdOpts) (w Writer, err error) {
32 if len(opts.Sinks) == 0 {
33 return nil, errors.New("no sinks specified for measurements")
34 }
35 mw := &MultiWriter{}
36 for _, sinkConnStr := range opts.Sinks {
37 scheme, target, found := strings.Cut(sinkConnStr, "://")
38 if !found || scheme == "" || target == "" {
39 return nil, fmt.Errorf("malformed sink URI %s", sinkConnStr)
40 }
41 switch scheme {
42 case "jsonfile":
43 w, err = NewJSONWriter(ctx, target)
44 case "postgres", "postgresql":
45 w, err = NewPostgresWriter(ctx, sinkConnStr, opts)
46 case "prometheus":
47 w, err = NewPrometheusWriter(ctx, target)
48 case "rpc", "grpc":
49 w, err = NewRPCWriter(ctx, sinkConnStr)
50 default:
51 return nil, fmt.Errorf("unknown schema %s in sink URI %s", scheme, sinkConnStr)
52 }
53 if err != nil {
54 return nil, err
55 }
56 mw.AddWriter(w)
57 }
58 if len(mw.writers) == 1 {
59 return mw.writers[0], nil
60 }
61 return mw, nil
62 }
63
64 func (mw *MultiWriter) Count() int {
65 return len(mw.writers)
66 }
67
68 func (mw *MultiWriter) AddWriter(w Writer) {
69 mw.Lock()
70 mw.writers = append(mw.writers, w)
71 mw.Unlock()
72 }
73
74 func (mw *MultiWriter) DefineMetrics(metrics *metrics.Metrics) (err error) {
75 for _, w := range mw.writers {
76 if definer, ok := w.(MetricsDefiner); ok {
77 err = errors.Join(err, definer.DefineMetrics(metrics))
78 }
79 }
80 return nil
81 }
82
83 func (mw *MultiWriter) SyncMetric(sourceName, metricName string, op SyncOp) (err error) {
84 for _, w := range mw.writers {
85 err = errors.Join(err, w.SyncMetric(sourceName, metricName, op))
86 }
87 return
88 }
89
90 func (mw *MultiWriter) Write(msg metrics.MeasurementEnvelope) (err error) {
91 for _, w := range mw.writers {
92 err = errors.Join(err, w.Write(msg))
93 }
94 return
95 }
96
97
98
99
100 func (mw *MultiWriter) Migrate() (err error) {
101 for _, w := range mw.writers {
102 if m, ok := w.(interface {
103 Migrate() error
104 }); ok {
105 err = errors.Join(err, m.Migrate())
106 }
107 }
108 return
109 }
110
111
112 func (mw *MultiWriter) NeedsMigration() (bool, error) {
113 for _, w := range mw.writers {
114 if m, ok := w.(interface {
115 NeedsMigration() (bool, error)
116 }); ok {
117 if needs, err := m.NeedsMigration(); err != nil {
118 return false, err
119 } else if needs {
120 return true, nil
121 }
122 }
123 }
124 return false, nil
125 }
126