...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/metrics/postgres.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/metrics

     1  package metrics
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  
     7  	"github.com/jackc/pgx/v5/pgconn"
     8  
     9  	"github.com/cybertec-postgresql/pgwatch/v5/internal/db"
    10  )
    11  
    12  func NewPostgresMetricReaderWriter(ctx context.Context, connstr string) (ReaderWriter, error) {
    13  	conn, err := db.New(ctx, connstr)
    14  	if err != nil {
    15  		return nil, err
    16  	}
    17  	return NewPostgresMetricReaderWriterConn(ctx, conn)
    18  }
    19  
    20  func NewPostgresMetricReaderWriterConn(ctx context.Context, conn db.PgxPoolIface) (ReaderWriter, error) {
    21  	if err := initSchema(ctx, conn); err != nil {
    22  		return nil, err
    23  	}
    24  	dmrw := &dbMetricReaderWriter{
    25  		ctx:      ctx,
    26  		configDb: conn,
    27  	}
    28  	return dmrw, conn.Ping(ctx)
    29  }
    30  
    31  type dbMetricReaderWriter struct {
    32  	ctx      context.Context
    33  	configDb db.PgxIface
    34  }
    35  
    36  var (
    37  	ErrNeedsMigration = errors.New("config database schema is outdated, please run migrations using `pgwatch config upgrade` command")
    38  	ErrMetricNotFound = errors.New("metric not found")
    39  	ErrPresetNotFound = errors.New("preset not found")
    40  	ErrInvalidMetric  = errors.New("invalid metric")
    41  	ErrInvalidPreset  = errors.New("invalid preset")
    42  	ErrMetricExists   = errors.New("metric already exists")
    43  	ErrPresetExists   = errors.New("preset already exists")
    44  )
    45  
    46  // make sure *dbMetricReaderWriter implements the Migrator interface
    47  var _ db.Migrator = (*dbMetricReaderWriter)(nil)
    48  
    49  // writeMetricsToPostgres writes the metrics and presets definitions to the
    50  // pgwatch.metric and pgwatch.preset tables in the ConfigDB.
    51  func writeMetricsToPostgres(ctx context.Context, conn db.PgxIface, metricDefs *Metrics) error {
    52  	tx, err := conn.Begin(ctx)
    53  	if err != nil {
    54  		return err
    55  	}
    56  	defer func() { _ = tx.Rollback(ctx) }()
    57  	for metricName, metric := range metricDefs.MetricDefs {
    58  		_, err = tx.Exec(ctx, `INSERT INTO pgwatch.metric (name, sqls, init_sql, description, node_status, gauges, is_instance_level, storage_name)
    59  		values ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (name) 
    60  		DO UPDATE SET sqls = $2, init_sql = $3, description = $4, node_status = $5, 
    61  		gauges = $6, is_instance_level = $7, storage_name = $8`,
    62  			metricName, metric.SQLs, metric.InitSQL, metric.Description, metric.NodeStatus, metric.Gauges, metric.IsInstanceLevel, metric.StorageName)
    63  		if err != nil {
    64  			return err
    65  		}
    66  	}
    67  	for presetName, preset := range metricDefs.PresetDefs {
    68  		_, err = tx.Exec(ctx, `INSERT INTO pgwatch.preset (name, description, metrics) 
    69  		VALUES ($1, $2, $3) ON CONFLICT (name) DO UPDATE SET description = $2, metrics = $3;`,
    70  			presetName, preset.Description, preset.Metrics)
    71  		if err != nil {
    72  			return err
    73  		}
    74  	}
    75  	return tx.Commit(ctx)
    76  }
    77  
    78  // ReadMetricsFromPostgres reads the metrics and presets definitions from the pgwatch.metric and pgwatch.preset tables from the ConfigDB.
    79  func (dmrw *dbMetricReaderWriter) GetMetrics() (metricDefMapNew *Metrics, err error) {
    80  	ctx := dmrw.ctx
    81  	conn := dmrw.configDb
    82  	metricDefMapNew = &Metrics{MetricDefs{}, PresetDefs{}}
    83  	rows, err := conn.Query(ctx, `SELECT name, sqls, init_sql, description, node_status, gauges, is_instance_level, storage_name FROM pgwatch.metric`)
    84  	if err != nil {
    85  		return nil, err
    86  	}
    87  	defer rows.Close()
    88  	for rows.Next() {
    89  		metric := Metric{}
    90  		var name string
    91  		err = rows.Scan(&name, &metric.SQLs, &metric.InitSQL, &metric.Description, &metric.NodeStatus, &metric.Gauges, &metric.IsInstanceLevel, &metric.StorageName)
    92  		if err != nil {
    93  			return nil, err
    94  		}
    95  		metricDefMapNew.MetricDefs[name] = metric
    96  	}
    97  	rows, err = conn.Query(ctx, `SELECT name, description, metrics FROM pgwatch.preset`)
    98  	if err != nil {
    99  		return nil, err
   100  	}
   101  	defer rows.Close()
   102  	for rows.Next() {
   103  		preset := Preset{}
   104  		var name string
   105  		err = rows.Scan(&name, &preset.Description, &preset.Metrics)
   106  		if err != nil {
   107  			return nil, err
   108  		}
   109  		metricDefMapNew.PresetDefs[name] = preset
   110  	}
   111  	return metricDefMapNew, nil
   112  }
   113  
   114  func (dmrw *dbMetricReaderWriter) WriteMetrics(metricDefs *Metrics) error {
   115  	return writeMetricsToPostgres(dmrw.ctx, dmrw.configDb, metricDefs)
   116  }
   117  
   118  func (dmrw *dbMetricReaderWriter) DeleteMetric(metricName string) error {
   119  	_, err := dmrw.configDb.Exec(dmrw.ctx, `DELETE FROM pgwatch.metric WHERE name = $1`, metricName)
   120  	return err
   121  }
   122  
   123  func (dmrw *dbMetricReaderWriter) UpdateMetric(metricName string, metric Metric) error {
   124  	ct, err := dmrw.configDb.Exec(dmrw.ctx, `INSERT INTO pgwatch.metric
   125  (name, sqls, init_sql, description, node_status, gauges, is_instance_level, storage_name)
   126  VALUES ($1, $2, $3, $4, $5, $6, $7, $8) 
   127  ON CONFLICT (name) DO UPDATE SET 
   128  sqls = $2, init_sql = $3, description = $4, node_status = $5, gauges = $6, is_instance_level = $7, storage_name = $8`,
   129  		metricName, db.MarshallParamToJSONB(metric.SQLs), metric.InitSQL, metric.Description,
   130  		metric.NodeStatus, metric.Gauges, metric.IsInstanceLevel, metric.StorageName)
   131  	if err == nil && ct.RowsAffected() == 0 {
   132  		return ErrMetricNotFound
   133  	}
   134  	return err
   135  }
   136  
   137  func (dmrw *dbMetricReaderWriter) CreateMetric(metricName string, metric Metric) error {
   138  	_, err := dmrw.configDb.Exec(dmrw.ctx, `INSERT INTO pgwatch.metric
   139  (name, sqls, init_sql, description, node_status, gauges, is_instance_level, storage_name)
   140  VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
   141  		metricName, db.MarshallParamToJSONB(metric.SQLs), metric.InitSQL, metric.Description,
   142  		metric.NodeStatus, metric.Gauges, metric.IsInstanceLevel, metric.StorageName)
   143  	if err != nil {
   144  		// Check for unique constraint violation using PostgreSQL error code
   145  		var pgErr *pgconn.PgError
   146  		if errors.As(err, &pgErr) && pgErr.SQLState() == "23505" {
   147  			return ErrMetricExists
   148  		}
   149  	}
   150  	return err
   151  }
   152  
   153  func (dmrw *dbMetricReaderWriter) DeletePreset(presetName string) error {
   154  	_, err := dmrw.configDb.Exec(dmrw.ctx, `DELETE FROM pgwatch.preset WHERE name = $1`, presetName)
   155  	return err
   156  }
   157  
   158  func (dmrw *dbMetricReaderWriter) UpdatePreset(presetName string, preset Preset) error {
   159  	sql := `INSERT INTO pgwatch.preset(name, description, metrics) VALUES ($1, $2, $3)
   160  	ON CONFLICT (name) DO UPDATE SET description = $2, metrics = $3`
   161  	ct, err := dmrw.configDb.Exec(dmrw.ctx, sql, presetName, preset.Description, db.MarshallParamToJSONB(preset.Metrics))
   162  	if err == nil && ct.RowsAffected() == 0 {
   163  		return ErrPresetNotFound
   164  	}
   165  	return err
   166  }
   167  
   168  func (dmrw *dbMetricReaderWriter) CreatePreset(presetName string, preset Preset) error {
   169  	sql := `INSERT INTO pgwatch.preset(name, description, metrics) VALUES ($1, $2, $3)`
   170  	_, err := dmrw.configDb.Exec(dmrw.ctx, sql, presetName, preset.Description, db.MarshallParamToJSONB(preset.Metrics))
   171  	if err != nil {
   172  		// Check for unique constraint violation using PostgreSQL error code
   173  		var pgErr *pgconn.PgError
   174  		if errors.As(err, &pgErr) && pgErr.Code == "23505" {
   175  			return ErrPresetExists
   176  		}
   177  	}
   178  	return err
   179  }
   180