...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/cmdopts/cmdconfig.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/cmdopts

     1  package cmdopts
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  
     8  	"github.com/cybertec-postgresql/pgwatch/v5/internal/db"
     9  	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
    10  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
    11  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
    12  )
    13  
    14  type ConfigCommand struct {
    15  	owner   *Options
    16  	Init    ConfigInitCommand    `command:"init" description:"Initialize configuration"`
    17  	Upgrade ConfigUpgradeCommand `command:"upgrade" description:"Upgrade configuration schema"`
    18  }
    19  
    20  func NewConfigCommand(owner *Options) *ConfigCommand {
    21  	return &ConfigCommand{
    22  		owner:   owner,
    23  		Init:    ConfigInitCommand{owner: owner},
    24  		Upgrade: ConfigUpgradeCommand{owner: owner},
    25  	}
    26  }
    27  
    28  type ConfigInitCommand struct {
    29  	owner *Options
    30  }
    31  
    32  // Execute initializes the configuration.
    33  func (cmd *ConfigInitCommand) Execute([]string) (err error) {
    34  	if err = cmd.owner.ValidateConfig(); err != nil {
    35  		return
    36  	}
    37  	if cmd.owner.Metrics.Metrics > "" {
    38  		err = cmd.InitMetrics()
    39  	}
    40  	if cmd.owner.Sources.Sources > "" && cmd.owner.Metrics.Metrics != cmd.owner.Sources.Sources {
    41  		err = errors.Join(err, cmd.InitSources())
    42  	}
    43  	if len(cmd.owner.Sinks.Sinks) > 0 {
    44  		err = errors.Join(err, cmd.InitSinks())
    45  	}
    46  	cmd.owner.CompleteCommand(map[bool]int32{
    47  		true:  ExitCodeOK,
    48  		false: ExitCodeConfigError,
    49  	}[err == nil])
    50  	return
    51  }
    52  
    53  // InitSources initializes the sources configuration.
    54  func (cmd *ConfigInitCommand) InitSources() (err error) {
    55  	ctx := context.Background()
    56  	opts := cmd.owner
    57  	if opts.IsPgConnStr(opts.Sources.Sources) {
    58  		return opts.InitSourceReader(ctx)
    59  	}
    60  	rw, _ := sources.NewYAMLSourcesReaderWriter(ctx, opts.Sources.Sources)
    61  	return rw.WriteSources(sources.Sources{sources.Source{}})
    62  }
    63  
    64  // InitMetrics initializes the metrics configuration.
    65  func (cmd *ConfigInitCommand) InitMetrics() (err error) {
    66  	ctx := context.Background()
    67  	opts := cmd.owner
    68  	err = opts.InitMetricReader(ctx)
    69  	if err != nil || opts.IsPgConnStr(opts.Metrics.Metrics) {
    70  		return // nothing to do, database initialized automatically
    71  	}
    72  	reader, _ := metrics.NewYAMLMetricReaderWriter(ctx, "")
    73  	defMetrics, _ := reader.GetMetrics()
    74  	return opts.MetricsReaderWriter.WriteMetrics(defMetrics)
    75  }
    76  
    77  // InitSinks initializes the sinks configuration.
    78  func (cmd *ConfigInitCommand) InitSinks() (err error) {
    79  	ctx := context.Background()
    80  	opts := cmd.owner
    81  	return opts.InitSinkWriter(ctx)
    82  }
    83  
    84  type ConfigUpgradeCommand struct {
    85  	owner *Options
    86  }
    87  
    88  // Execute upgrades the configuration schema.
    89  func (cmd *ConfigUpgradeCommand) Execute([]string) (err error) {
    90  	opts := cmd.owner
    91  	// For upgrade command, validate that at least one component is specified
    92  	if len(opts.Sources.Sources)+len(opts.Metrics.Metrics)+len(opts.Sinks.Sinks) == 0 {
    93  		opts.CompleteCommand(ExitCodeConfigError)
    94  		return errors.New("at least one of --sources, --metrics, or --sink must be specified")
    95  	}
    96  
    97  	ctx := context.Background()
    98  
    99  	f := func(uri string, newMigratorFunc func() (any, error)) error {
   100  		if uri == "" {
   101  			return nil
   102  		}
   103  		if !opts.IsPgConnStr(uri) {
   104  			return fmt.Errorf("cannot upgrade storage %s: %w", uri, errors.ErrUnsupported)
   105  		}
   106  		m, initErr := newMigratorFunc()
   107  		if initErr != nil {
   108  			return initErr
   109  		}
   110  		return m.(db.Migrator).Migrate()
   111  
   112  	}
   113  
   114  	err = f(opts.Sources.Sources, func() (any, error) {
   115  		return sources.NewPostgresSourcesReaderWriter(ctx, opts.Sources.Sources)
   116  	})
   117  
   118  	err = errors.Join(err, f(opts.Metrics.Metrics, func() (any, error) {
   119  		return metrics.NewPostgresMetricReaderWriter(ctx, opts.Metrics.Metrics)
   120  	}))
   121  
   122  	for _, uri := range opts.Sinks.Sinks {
   123  		err = errors.Join(err, f(uri, func() (any, error) {
   124  			return sinks.NewPostgresSinkMigrator(ctx, uri)
   125  		}))
   126  	}
   127  
   128  	if err == nil {
   129  		opts.CompleteCommand(ExitCodeOK)
   130  		return nil
   131  	}
   132  
   133  	// Check if all errors are ErrUnsupported
   134  	allUnsupported := true
   135  	for _, e := range err.(interface{ Unwrap() []error }).Unwrap() {
   136  		if !errors.Is(e, errors.ErrUnsupported) {
   137  			allUnsupported = false
   138  			break
   139  		}
   140  		fmt.Fprintln(opts.OutputWriter, e)
   141  	}
   142  
   143  	if allUnsupported {
   144  		opts.CompleteCommand(ExitCodeOK)
   145  		return nil
   146  	}
   147  	opts.CompleteCommand(ExitCodeConfigError)
   148  	return err
   149  }
   150