...
1 package sinks
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "strings"
8 "sync"
9
10 "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
11 )
12
13
// Writer is the interface every measurement sink returned by NewSinkWriter
// satisfies.
type Writer interface {
	// SyncMetric informs the sink about an operation op concerning the metric
	// metricName of the monitored source dbUnique.
	// NOTE(review): the set of valid op values is defined by the
	// implementations and is not visible here — confirm before relying on it.
	SyncMetric(dbUnique, metricName, op string) error
	// Write hands a batch of measurement envelopes to the sink.
	Write(msgs []metrics.MeasurementEnvelope) error
}
18
19
// MultiWriter is a Writer that forwards every SyncMetric and Write call to
// each of the writers registered with it and joins the errors they return.
type MultiWriter struct {
	// writers holds the registered sinks in the order they were added.
	writers []Writer
	// Mutex is held by AddWriter while it appends to writers. Because it is
	// embedded, Lock and Unlock are also part of MultiWriter's method set.
	sync.Mutex
}
24
25
26 func NewSinkWriter(ctx context.Context, opts *CmdOpts, metricDefs *metrics.Metrics) (w Writer, err error) {
27 if len(opts.Sinks) == 0 {
28 return nil, errors.New("no sinks specified for measurements")
29 }
30 mw := &MultiWriter{}
31 for _, s := range opts.Sinks {
32 scheme, path, found := strings.Cut(s, "://")
33 if !found || scheme == "" || path == "" {
34 return nil, fmt.Errorf("malformed sink URI %s", s)
35 }
36 switch scheme {
37 case "jsonfile":
38 w, err = NewJSONWriter(ctx, path)
39 case "postgres", "postgresql":
40 w, err = NewPostgresWriter(ctx, s, opts, metricDefs)
41 case "prometheus":
42 w, err = NewPrometheusWriter(ctx, path)
43 case "rpc":
44 w, err = NewRPCWriter(ctx, path)
45 default:
46 return nil, fmt.Errorf("unknown schema %s in sink URI %s", scheme, s)
47 }
48 if err != nil {
49 return nil, err
50 }
51 mw.AddWriter(w)
52 }
53 if len(mw.writers) == 1 {
54 return mw.writers[0], nil
55 }
56 return mw, nil
57 }
58
59 func (mw *MultiWriter) AddWriter(w Writer) {
60 mw.Lock()
61 mw.writers = append(mw.writers, w)
62 mw.Unlock()
63 }
64
65 func (mw *MultiWriter) SyncMetric(dbUnique, metricName, op string) (err error) {
66 for _, w := range mw.writers {
67 err = errors.Join(err, w.SyncMetric(dbUnique, metricName, op))
68 }
69 return
70 }
71
72 func (mw *MultiWriter) Write(msgs []metrics.MeasurementEnvelope) (err error) {
73 for _, w := range mw.writers {
74 err = errors.Join(err, w.Write(msgs))
75 }
76 return
77 }
78