1 package cmdopts
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "io"
8 "os"
9 "time"
10
11 "github.com/cybertec-postgresql/pgwatch/v3/internal/log"
12 "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
13 "github.com/cybertec-postgresql/pgwatch/v3/internal/sinks"
14 "github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
15 "github.com/cybertec-postgresql/pgwatch/v3/internal/webserver"
16 "github.com/jackc/pgx/v5"
17 flags "github.com/jessevdk/go-flags"
18 )
19
20 const (
21 ExitCodeOK int32 = iota
22 ExitCodeConfigError
23 ExitCodeCmdError
24 ExitCodeWebUIError
25 ExitCodeUpgradeError
26 ExitCodeUserCancel
27 ExitCodeShutdownCommand
28 ExitCodeFatalError
29 )
30
31 type Kind int
32
33 const (
34 ConfigPgURL Kind = iota
35 ConfigFile
36 ConfigFolder
37 ConfigError
38 )
39
40
41 type Options struct {
42 Sources sources.CmdOpts `group:"Sources"`
43 Metrics metrics.CmdOpts `group:"Metrics"`
44 Sinks sinks.CmdOpts `group:"Sinks"`
45 Logging log.CmdOpts `group:"Logging"`
46 WebUI webserver.CmdOpts `group:"WebUI"`
47 Help bool
48
49 SourcesReaderWriter sources.ReaderWriter
50 MetricsReaderWriter metrics.ReaderWriter
51 SinksWriter sinks.Writer
52
53 ExitCode int32
54 CommandCompleted bool
55
56 OutputWriter io.Writer
57 }
58
59 func addCommands(parser *flags.Parser, opts *Options) {
60 _, _ = parser.AddCommand("metric", "Manage metrics", "", NewMetricCommand(opts))
61 _, _ = parser.AddCommand("source", "Manage sources", "", NewSourceCommand(opts))
62 _, _ = parser.AddCommand("config", "Manage configurations", "", NewConfigCommand(opts))
63 }
64
65
66
67
68
69
70 func New(writer io.Writer) (cmdOpts *Options, err error) {
71 cmdOpts = new(Options)
72 parser := flags.NewParser(cmdOpts, flags.HelpFlag)
73 parser.SubcommandsOptional = true
74 cmdOpts.OutputWriter = writer
75 addCommands(parser, cmdOpts)
76 nonParsedArgs, err := parser.Parse()
77 if err != nil {
78 if flagsErr, ok := err.(*flags.Error); ok && flagsErr.Type == flags.ErrHelp {
79 cmdOpts.Help = true
80 }
81 if !flags.WroteHelp(err) && !cmdOpts.CommandCompleted {
82 parser.WriteHelp(writer)
83 }
84 return cmdOpts, err
85 }
86 if cmdOpts.CommandCompleted {
87 return
88 }
89 if len(nonParsedArgs) > 0 {
90 return cmdOpts, fmt.Errorf("unknown argument(s): %v", nonParsedArgs)
91 }
92 err = cmdOpts.ValidateConfig()
93 return
94 }
95
96 func (c *Options) CompleteCommand(code int32) {
97 c.CommandCompleted = true
98 c.ExitCode = code
99 }
100
101
102 func (c *Options) Verbose() bool {
103 return c.Logging.LogLevel == "debug"
104 }
105
106 func (c *Options) GetConfigKind(arg string) (_ Kind, err error) {
107 if arg == "" {
108 return Kind(ConfigError), errors.New("no configuration provided")
109 }
110 if c.IsPgConnStr(arg) {
111 return Kind(ConfigPgURL), nil
112 }
113 var fi os.FileInfo
114 if fi, err = os.Stat(arg); err == nil {
115 if fi.IsDir() {
116 return Kind(ConfigFolder), nil
117 }
118 return Kind(ConfigFile), nil
119 }
120 return Kind(ConfigError), err
121 }
122
123 func (c *Options) IsPgConnStr(arg string) bool {
124 _, err := pgx.ParseConfig(arg)
125 return err == nil
126 }
127
128
129 func (c *Options) InitMetricReader(ctx context.Context) (err error) {
130 if c.Metrics.Metrics == "" {
131 c.MetricsReaderWriter, err = metrics.NewYAMLMetricReaderWriter(ctx, "")
132 return
133 }
134 if c.IsPgConnStr(c.Metrics.Metrics) {
135 c.MetricsReaderWriter, err = metrics.NewPostgresMetricReaderWriter(ctx, c.Metrics.Metrics)
136 } else {
137 c.MetricsReaderWriter, err = metrics.NewYAMLMetricReaderWriter(ctx, c.Metrics.Metrics)
138 }
139 return
140 }
141
142
143 func (c *Options) InitSourceReader(ctx context.Context) (err error) {
144 var configKind Kind
145 if configKind, err = c.GetConfigKind(c.Sources.Sources); err != nil {
146 return
147 }
148 switch configKind {
149 case ConfigPgURL:
150 c.SourcesReaderWriter, err = sources.NewPostgresSourcesReaderWriter(ctx, c.Sources.Sources)
151 default:
152 c.SourcesReaderWriter, err = sources.NewYAMLSourcesReaderWriter(ctx, c.Sources.Sources)
153 }
154 return
155 }
156
157
158 func (c *Options) InitConfigReaders(ctx context.Context) error {
159 return errors.Join(c.InitMetricReader(ctx), c.InitSourceReader(ctx))
160 }
161
162
163 func (c *Options) InitSinkWriter(ctx context.Context) (err error) {
164 metricDefs, err := c.MetricsReaderWriter.GetMetrics()
165 if err != nil {
166 return err
167 }
168 c.SinksWriter, err = sinks.NewSinkWriter(ctx, &c.Sinks, metricDefs)
169 return
170 }
171
172
173 func (c *Options) NeedsSchemaUpgrade() (upgrade bool, err error) {
174 if m, ok := c.SourcesReaderWriter.(metrics.Migrator); ok {
175 upgrade, err = m.NeedsMigration()
176 }
177 if upgrade || err != nil {
178 return
179 }
180 if m, ok := c.MetricsReaderWriter.(metrics.Migrator); ok {
181 return m.NeedsMigration()
182 }
183 return
184 }
185
186
187
188
189 func (c *Options) ValidateConfig() error {
190 if len(c.Sources.Sources)+len(c.Metrics.Metrics) == 0 {
191 return errors.New("both --sources and --metrics are empty")
192 }
193 switch {
194 case c.Sources.Sources == "" && c.IsPgConnStr(c.Metrics.Metrics):
195 c.Sources.Sources = c.Metrics.Metrics
196 case c.Metrics.Metrics == "" && c.IsPgConnStr(c.Sources.Sources):
197 c.Metrics.Metrics = c.Sources.Sources
198 }
199 if c.Sources.Refresh <= 1 {
200 return errors.New("--servers-refresh-loop-seconds must be greater than 1")
201 }
202 if c.Sources.MaxParallelConnectionsPerDb < 1 {
203 return errors.New("--max-parallel-connections-per-db must be >= 1")
204 }
205
206
207 if c.Sinks.BatchingDelay <= 0 || c.Sinks.BatchingDelay > time.Hour {
208 return errors.New("--batching-delay-ms must be between 0 and 3600000")
209 }
210
211 return nil
212 }
213