...
1 package cmdopts
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7
8 "github.com/cybertec-postgresql/pgwatch/v5/internal/db"
9 "github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
10 "github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
11 "github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
12 )
13
14 type ConfigCommand struct {
15 owner *Options
16 Init ConfigInitCommand `command:"init" description:"Initialize configuration"`
17 Upgrade ConfigUpgradeCommand `command:"upgrade" description:"Upgrade configuration schema"`
18 }
19
20 func NewConfigCommand(owner *Options) *ConfigCommand {
21 return &ConfigCommand{
22 owner: owner,
23 Init: ConfigInitCommand{owner: owner},
24 Upgrade: ConfigUpgradeCommand{owner: owner},
25 }
26 }
27
28 type ConfigInitCommand struct {
29 owner *Options
30 }
31
32
33 func (cmd *ConfigInitCommand) Execute([]string) (err error) {
34 if err = cmd.owner.ValidateConfig(); err != nil {
35 return
36 }
37 if cmd.owner.Metrics.Metrics > "" {
38 err = cmd.InitMetrics()
39 }
40 if cmd.owner.Sources.Sources > "" && cmd.owner.Metrics.Metrics != cmd.owner.Sources.Sources {
41 err = errors.Join(err, cmd.InitSources())
42 }
43 if len(cmd.owner.Sinks.Sinks) > 0 {
44 err = errors.Join(err, cmd.InitSinks())
45 }
46 cmd.owner.CompleteCommand(map[bool]int32{
47 true: ExitCodeOK,
48 false: ExitCodeConfigError,
49 }[err == nil])
50 return
51 }
52
53
54 func (cmd *ConfigInitCommand) InitSources() (err error) {
55 ctx := context.Background()
56 opts := cmd.owner
57 if opts.IsPgConnStr(opts.Sources.Sources) {
58 return opts.InitSourceReader(ctx)
59 }
60 rw, _ := sources.NewYAMLSourcesReaderWriter(ctx, opts.Sources.Sources)
61 return rw.WriteSources(sources.Sources{sources.Source{}})
62 }
63
64
65 func (cmd *ConfigInitCommand) InitMetrics() (err error) {
66 ctx := context.Background()
67 opts := cmd.owner
68 err = opts.InitMetricReader(ctx)
69 if err != nil || opts.IsPgConnStr(opts.Metrics.Metrics) {
70 return
71 }
72 reader, _ := metrics.NewYAMLMetricReaderWriter(ctx, "")
73 defMetrics, _ := reader.GetMetrics()
74 return opts.MetricsReaderWriter.WriteMetrics(defMetrics)
75 }
76
77
78 func (cmd *ConfigInitCommand) InitSinks() (err error) {
79 ctx := context.Background()
80 opts := cmd.owner
81 return opts.InitSinkWriter(ctx)
82 }
83
84 type ConfigUpgradeCommand struct {
85 owner *Options
86 }
87
88
89 func (cmd *ConfigUpgradeCommand) Execute([]string) (err error) {
90 opts := cmd.owner
91
92 if len(opts.Sources.Sources)+len(opts.Metrics.Metrics)+len(opts.Sinks.Sinks) == 0 {
93 opts.CompleteCommand(ExitCodeConfigError)
94 return errors.New("at least one of --sources, --metrics, or --sink must be specified")
95 }
96
97 ctx := context.Background()
98
99 f := func(uri string, newMigratorFunc func() (any, error)) error {
100 if uri == "" {
101 return nil
102 }
103 if !opts.IsPgConnStr(uri) {
104 return fmt.Errorf("cannot upgrade storage %s: %w", uri, errors.ErrUnsupported)
105 }
106 m, initErr := newMigratorFunc()
107 if initErr != nil {
108 return initErr
109 }
110 return m.(db.Migrator).Migrate()
111
112 }
113
114 err = f(opts.Sources.Sources, func() (any, error) {
115 return sources.NewPostgresSourcesReaderWriter(ctx, opts.Sources.Sources)
116 })
117
118 err = errors.Join(err, f(opts.Metrics.Metrics, func() (any, error) {
119 return metrics.NewPostgresMetricReaderWriter(ctx, opts.Metrics.Metrics)
120 }))
121
122 for _, uri := range opts.Sinks.Sinks {
123 err = errors.Join(err, f(uri, func() (any, error) {
124 return sinks.NewPostgresSinkMigrator(ctx, uri)
125 }))
126 }
127
128 if err == nil {
129 opts.CompleteCommand(ExitCodeOK)
130 return nil
131 }
132
133
134 allUnsupported := true
135 for _, e := range err.(interface{ Unwrap() []error }).Unwrap() {
136 if !errors.Is(e, errors.ErrUnsupported) {
137 allUnsupported = false
138 break
139 }
140 fmt.Fprintln(opts.OutputWriter, e)
141 }
142
143 if allUnsupported {
144 opts.CompleteCommand(ExitCodeOK)
145 return nil
146 }
147 opts.CompleteCommand(ExitCodeConfigError)
148 return err
149 }
150