...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/metrics/postgres.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/metrics

     1  package metrics
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  
     7  	"github.com/jackc/pgx/v5/pgconn"
     8  
     9  	"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
    10  )
    11  
    12  func NewPostgresMetricReaderWriter(ctx context.Context, connstr string) (ReaderWriter, error) {
    13  	conn, err := db.New(ctx, connstr)
    14  	if err != nil {
    15  		return nil, err
    16  	}
    17  	return NewPostgresMetricReaderWriterConn(ctx, conn)
    18  }
    19  
    20  func NewPostgresMetricReaderWriterConn(ctx context.Context, conn db.PgxPoolIface) (ReaderWriter, error) {
    21  	if err := initSchema(ctx, conn); err != nil {
    22  		return nil, err
    23  	}
    24  	return &dbMetricReaderWriter{
    25  		ctx:      ctx,
    26  		configDb: conn,
    27  	}, conn.Ping(ctx)
    28  }
    29  
// dbMetricReaderWriter is the receiver for the metric and preset read/write
// methods in this file, which operate on the pgwatch.metric and
// pgwatch.preset tables.
type dbMetricReaderWriter struct {
	ctx      context.Context // context passed to every statement the methods execute
	configDb db.PgxIface     // database handle the methods run their statements on
}
    34  
// Sentinel errors for metric and preset operations. Within this file,
// ErrMetricNotFound and ErrPresetNotFound are returned by UpdateMetric and
// UpdatePreset when the executed statement affects no rows, and
// ErrMetricExists and ErrPresetExists are returned by CreateMetric and
// CreatePreset on a unique-constraint violation (SQLSTATE 23505).
// ErrInvalidMetric and ErrInvalidPreset are not referenced in the visible
// part of this file.
var (
	ErrMetricNotFound = errors.New("metric not found")
	ErrPresetNotFound = errors.New("preset not found")
	ErrInvalidMetric  = errors.New("invalid metric")
	ErrInvalidPreset  = errors.New("invalid preset")
	ErrMetricExists   = errors.New("metric already exists")
	ErrPresetExists   = errors.New("preset already exists")
)
    43  
// Compile-time check that *dbMetricReaderWriter implements the Migrator
// interface; the build fails here if a required method is missing.
var _ Migrator = (*dbMetricReaderWriter)(nil)
    46  
    47  // writeMetricsToPostgres writes the metrics and presets definitions to the
    48  // pgwatch.metric and pgwatch.preset tables in the ConfigDB.
    49  func writeMetricsToPostgres(ctx context.Context, conn db.PgxIface, metricDefs *Metrics) error {
    50  	tx, err := conn.Begin(ctx)
    51  	if err != nil {
    52  		return err
    53  	}
    54  	defer func() { _ = tx.Rollback(ctx) }()
    55  	for metricName, metric := range metricDefs.MetricDefs {
    56  		_, err = tx.Exec(ctx, `INSERT INTO pgwatch.metric (name, sqls, init_sql, description, node_status, gauges, is_instance_level, storage_name)
    57  		values ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (name) 
    58  		DO UPDATE SET sqls = $2, init_sql = $3, description = $4, node_status = $5, 
    59  		gauges = $6, is_instance_level = $7, storage_name = $8`,
    60  			metricName, metric.SQLs, metric.InitSQL, metric.Description, metric.NodeStatus, metric.Gauges, metric.IsInstanceLevel, metric.StorageName)
    61  		if err != nil {
    62  			return err
    63  		}
    64  	}
    65  	for presetName, preset := range metricDefs.PresetDefs {
    66  		_, err = tx.Exec(ctx, `INSERT INTO pgwatch.preset (name, description, metrics) 
    67  		VALUES ($1, $2, $3) ON CONFLICT (name) DO UPDATE SET description = $2, metrics = $3;`,
    68  			presetName, preset.Description, preset.Metrics)
    69  		if err != nil {
    70  			return err
    71  		}
    72  	}
    73  	return tx.Commit(ctx)
    74  }
    75  
    76  // ReadMetricsFromPostgres reads the metrics and presets definitions from the pgwatch.metric and pgwatch.preset tables from the ConfigDB.
    77  func (dmrw *dbMetricReaderWriter) GetMetrics() (metricDefMapNew *Metrics, err error) {
    78  	ctx := dmrw.ctx
    79  	conn := dmrw.configDb
    80  	metricDefMapNew = &Metrics{MetricDefs{}, PresetDefs{}}
    81  	rows, err := conn.Query(ctx, `SELECT name, sqls, init_sql, description, node_status, gauges, is_instance_level, storage_name FROM pgwatch.metric`)
    82  	if err != nil {
    83  		return nil, err
    84  	}
    85  	defer rows.Close()
    86  	for rows.Next() {
    87  		metric := Metric{}
    88  		var name string
    89  		err = rows.Scan(&name, &metric.SQLs, &metric.InitSQL, &metric.Description, &metric.NodeStatus, &metric.Gauges, &metric.IsInstanceLevel, &metric.StorageName)
    90  		if err != nil {
    91  			return nil, err
    92  		}
    93  		metricDefMapNew.MetricDefs[name] = metric
    94  	}
    95  	rows, err = conn.Query(ctx, `SELECT name, description, metrics FROM pgwatch.preset`)
    96  	if err != nil {
    97  		return nil, err
    98  	}
    99  	defer rows.Close()
   100  	for rows.Next() {
   101  		preset := Preset{}
   102  		var name string
   103  		err = rows.Scan(&name, &preset.Description, &preset.Metrics)
   104  		if err != nil {
   105  			return nil, err
   106  		}
   107  		metricDefMapNew.PresetDefs[name] = preset
   108  	}
   109  	return metricDefMapNew, nil
   110  }
   111  
   112  func (dmrw *dbMetricReaderWriter) WriteMetrics(metricDefs *Metrics) error {
   113  	return writeMetricsToPostgres(dmrw.ctx, dmrw.configDb, metricDefs)
   114  }
   115  
   116  func (dmrw *dbMetricReaderWriter) DeleteMetric(metricName string) error {
   117  	_, err := dmrw.configDb.Exec(dmrw.ctx, `DELETE FROM pgwatch.metric WHERE name = $1`, metricName)
   118  	return err
   119  }
   120  
   121  func (dmrw *dbMetricReaderWriter) UpdateMetric(metricName string, metric Metric) error {
   122  	ct, err := dmrw.configDb.Exec(dmrw.ctx, `INSERT INTO pgwatch.metric
   123  (name, sqls, init_sql, description, node_status, gauges, is_instance_level, storage_name)
   124  VALUES ($1, $2, $3, $4, $5, $6, $7, $8) 
   125  ON CONFLICT (name) DO UPDATE SET 
   126  sqls = $2, init_sql = $3, description = $4, node_status = $5, gauges = $6, is_instance_level = $7, storage_name = $8`,
   127  		metricName, db.MarshallParamToJSONB(metric.SQLs), metric.InitSQL, metric.Description,
   128  		metric.NodeStatus, metric.Gauges, metric.IsInstanceLevel, metric.StorageName)
   129  	if err == nil && ct.RowsAffected() == 0 {
   130  		return ErrMetricNotFound
   131  	}
   132  	return err
   133  }
   134  
   135  func (dmrw *dbMetricReaderWriter) CreateMetric(metricName string, metric Metric) error {
   136  	_, err := dmrw.configDb.Exec(dmrw.ctx, `INSERT INTO pgwatch.metric
   137  (name, sqls, init_sql, description, node_status, gauges, is_instance_level, storage_name)
   138  VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
   139  		metricName, db.MarshallParamToJSONB(metric.SQLs), metric.InitSQL, metric.Description,
   140  		metric.NodeStatus, metric.Gauges, metric.IsInstanceLevel, metric.StorageName)
   141  	if err != nil {
   142  		// Check for unique constraint violation using PostgreSQL error code
   143  		var pgErr *pgconn.PgError
   144  		if errors.As(err, &pgErr) && pgErr.SQLState() == "23505" {
   145  			return ErrMetricExists
   146  		}
   147  	}
   148  	return err
   149  }
   150  
   151  func (dmrw *dbMetricReaderWriter) DeletePreset(presetName string) error {
   152  	_, err := dmrw.configDb.Exec(dmrw.ctx, `DELETE FROM pgwatch.preset WHERE name = $1`, presetName)
   153  	return err
   154  }
   155  
   156  func (dmrw *dbMetricReaderWriter) UpdatePreset(presetName string, preset Preset) error {
   157  	sql := `INSERT INTO pgwatch.preset(name, description, metrics) VALUES ($1, $2, $3)
   158  	ON CONFLICT (name) DO UPDATE SET description = $2, metrics = $3`
   159  	ct, err := dmrw.configDb.Exec(dmrw.ctx, sql, presetName, preset.Description, db.MarshallParamToJSONB(preset.Metrics))
   160  	if err == nil && ct.RowsAffected() == 0 {
   161  		return ErrPresetNotFound
   162  	}
   163  	return err
   164  }
   165  
   166  func (dmrw *dbMetricReaderWriter) CreatePreset(presetName string, preset Preset) error {
   167  	sql := `INSERT INTO pgwatch.preset(name, description, metrics) VALUES ($1, $2, $3)`
   168  	_, err := dmrw.configDb.Exec(dmrw.ctx, sql, presetName, preset.Description, db.MarshallParamToJSONB(preset.Metrics))
   169  	if err != nil {
   170  		// Check for unique constraint violation using PostgreSQL error code
   171  		var pgErr *pgconn.PgError
   172  		if errors.As(err, &pgErr) && pgErr.Code == "23505" {
   173  			return ErrPresetExists
   174  		}
   175  	}
   176  	return err
   177  }
   178