...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/reaper/source_reaper.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"cmp"
     5  	"context"
     6  	"errors"
     7  	"fmt"
     8  	"time"
     9  
    10  	"github.com/cybertec-postgresql/pgwatch/v5/internal/log"
    11  	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
    12  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
    13  	"github.com/jackc/pgx/v5"
    14  )
    15  
    16  const minTickInterval = 1 // seconds - floor for GCD to help handle zero/negative intervals
    17  
    18  // SourceReaper manages metric collection for a single monitored source.
    19  // Instead of one goroutine per metric it runs a single GCD-based tick loop
    20  // and batches SQL queries via pgx.Batch when the source is a real Postgres
    21  // connection (non-pgbouncer, non-pgpool).
    22  type SourceReaper struct {
    23  	reaper          *Reaper
    24  	md              *sources.SourceConn
    25  	lastFetch       map[string]time.Time
    26  	lastUptimeS     int64               // last seen postmaster_uptime_s for restart detection
    27  	degradedMetrics map[string]struct{} // metrics that failed individual retry; executed via fetchMetric until they recover
    28  }
    29  
    30  // NewSourceReaper creates a SourceReaper for the given source connection.
    31  func NewSourceReaper(r *Reaper, md *sources.SourceConn) *SourceReaper {
    32  	sr := &SourceReaper{
    33  		reaper:          r,
    34  		md:              md,
    35  		lastFetch:       make(map[string]time.Time),
    36  		degradedMetrics: make(map[string]struct{}),
    37  	}
    38  	return sr
    39  }
    40  
    41  // activeMetrics returns a snapshot copy of the currently active metric intervals
    42  // based on the source's recovery state. Copying under the lock prevents data
    43  // races when the caller iterates after the lock is released.
    44  func (sr *SourceReaper) activeMetrics() map[string]time.Duration {
    45  	sr.md.RLock()
    46  	defer sr.md.RUnlock()
    47  	am := sr.md.Metrics
    48  	if sr.md.IsInRecovery && len(sr.md.MetricsStandby) > 0 {
    49  		am = sr.md.MetricsStandby
    50  	}
    51  	c := make(map[string]time.Duration, len(am))
    52  	for k, v := range am {
    53  		c[k] = time.Duration(v) * time.Second
    54  	}
    55  	return c
    56  }
    57  
    58  // GCDSlice computes GCD across a slice. Returns 0 for empty input.
    59  func GCDSlice(vals []int) int {
    60  	if len(vals) == 0 {
    61  		return 0
    62  	}
    63  	g := vals[0]
    64  	for _, v := range vals[1:] {
    65  		for v != 0 {
    66  			g, v = v, g%v
    67  		}
    68  	}
    69  	return g
    70  }
    71  
    72  // calcTickInterval computes GCD of all metric intervals with a minimum floor.
    73  func (sr *SourceReaper) calcTickInterval() time.Duration {
    74  	am := sr.activeMetrics()
    75  	intervals := make([]int, 0, len(am))
    76  	for _, d := range am {
    77  		intervals = append(intervals, max(int(d.Seconds()), minTickInterval))
    78  	}
    79  	return time.Duration(max(GCDSlice(intervals), minTickInterval)) * time.Second
    80  }
    81  
    82  // cacheKey returns the instance-level cache key for the given metric.
    83  func (sr *SourceReaper) cacheKey(m metrics.Metric, name string) string {
    84  	age := sr.reaper.Metrics.CacheAge()
    85  	if m.IsInstanceLevel && age > 0 && sr.md.GetMetricInterval(name) < age {
    86  		return fmt.Sprintf("%s:%s", sr.md.GetClusterIdentifier(), name)
    87  	}
    88  	return ""
    89  }
    90  
    91  // isRoleExcluded returns true if the metric should be skipped based on the
    92  // source's recovery state (e.g. primary-only metric on a standby).
    93  func (sr *SourceReaper) isRoleExcluded(m metrics.Metric) bool {
    94  	sr.md.RLock()
    95  	defer sr.md.RUnlock()
    96  	return (m.PrimaryOnly() && sr.md.IsInRecovery) || (m.StandbyOnly() && !sr.md.IsInRecovery)
    97  }
    98  
    99  // sendEnvelope adds sysinfo and dispatches a MeasurementEnvelope to the
   100  // measurement channel.
   101  func (sr *SourceReaper) sendEnvelope(ctx context.Context, name, storageName string, data metrics.Measurements) {
   102  	log.GetLogger(ctx).WithField("metric", name).WithField("rows", len(data)).Info("measurements fetched")
   103  	sr.reaper.AddSysinfoToMeasurements(data, sr.md)
   104  	sr.reaper.measurementCh <- metrics.MeasurementEnvelope{
   105  		DBName:     sr.md.Name,
   106  		MetricName: cmp.Or(storageName, name),
   107  		Data:       data,
   108  		CustomTags: sr.md.CustomTags,
   109  	}
   110  }
   111  
   112  // dispatchMetricData handles the post-fetch workflow for a collected metric:
   113  // caching, sysinfo enrichment, sending, and restart detection.
   114  func (sr *SourceReaper) dispatchMetricData(ctx context.Context, name string, metric metrics.Metric, data metrics.Measurements) {
   115  	if key := sr.cacheKey(metric, name); key != "" {
   116  		sr.reaper.measurementCache.Put(key, data)
   117  	}
   118  	sr.sendEnvelope(ctx, name, metric.StorageName, data)
   119  	if name == "db_stats" {
   120  		sr.detectServerRestart(ctx, data)
   121  	}
   122  }
   123  
   124  // batchEntry holds the minimum info needed to execute and dispatch a metric query.
   125  type batchEntry struct {
   126  	name   string
   127  	metric metrics.Metric
   128  	sql    string
   129  }
   130  
   131  // Run is the main loop for a single source. It replaces N per-metric goroutines
   132  // with one goroutine that batches SQL queries at GCD-aligned ticks.
   133  func (sr *SourceReaper) Run(ctx context.Context) {
   134  	l := log.GetLogger(ctx).WithField("source", sr.md.Name)
   135  	ctx = log.WithLogger(ctx, l)
   136  	var err error
   137  	for {
   138  		if err = sr.md.FetchRuntimeInfo(ctx, false); err != nil {
   139  			l.WithError(err).Warning("could not refresh runtime info")
   140  		}
   141  
   142  		now := time.Now()
   143  		var batch []batchEntry
   144  
   145  		for name, interval := range sr.activeMetrics() {
   146  			if interval <= 0 {
   147  				continue
   148  			}
   149  			if lf := sr.lastFetch[name]; !lf.IsZero() && now.Sub(lf) < interval {
   150  				continue
   151  			}
   152  
   153  			metric, ok := metricDefs.GetMetricDef(name)
   154  			if !ok || sr.isRoleExcluded(metric) {
   155  				continue
   156  			}
   157  
   158  			switch {
   159  			case name == specialMetricServerLogEventCounts:
   160  				if sr.lastFetch[name].IsZero() {
   161  					go func() {
   162  						if e := sr.runLogParser(ctx); e != nil {
   163  							l.WithError(e).Error("log parser error")
   164  						}
   165  					}()
   166  				}
   167  			case IsDirectlyFetchableMetric(sr.md, name):
   168  				err = sr.fetchOSMetric(ctx, name)
   169  				sr.lastFetch[name] = time.Now()
   170  			case name == specialMetricChangeEvents || name == specialMetricInstanceUp:
   171  				err = sr.fetchSpecialMetric(ctx, name, metric.StorageName)
   172  				sr.lastFetch[name] = time.Now()
   173  			default:
   174  				if cached := sr.reaper.GetMeasurementCache(sr.cacheKey(metric, name)); len(cached) > 0 {
   175  					l.WithField("metric", name).Info("instance level cache hit")
   176  					sr.sendEnvelope(ctx, name, metric.StorageName, cached)
   177  					sr.lastFetch[name] = time.Now()
   178  					break
   179  				}
   180  				sr.md.RLock()
   181  				version := sr.md.Version
   182  				sr.md.RUnlock()
   183  				sql := metric.GetSQL(version)
   184  				if sql == "" {
   185  					l.WithField("metric", name).WithField("version", version).Warning("no SQL found for metric version")
   186  					sr.lastFetch[name] = time.Now()
   187  					break
   188  				}
   189  				if _, degraded := sr.degradedMetrics[name]; degraded {
   190  					if err = sr.fetchMetric(ctx, batchEntry{name: name, metric: metric, sql: sql}); err != nil {
   191  						l.WithError(err).WithField("metric", name).Error("degraded metric fetch failed")
   192  					} else {
   193  						l.WithField("metric", name).Info("degraded metric recovered, returning to batch execution")
   194  						delete(sr.degradedMetrics, name)
   195  					}
   196  					sr.lastFetch[name] = time.Now()
   197  					break
   198  				}
   199  				batch = append(batch, batchEntry{name: name, metric: metric, sql: sql})
   200  			}
   201  			if err != nil {
   202  				l.WithError(err).WithField("metric", name).Error("failed to fetch metric")
   203  			}
   204  		}
   205  
   206  		if len(batch) > 0 {
   207  			if sr.md.IsPostgresSource() {
   208  				err = sr.executeBatch(ctx, batch)
   209  			} else {
   210  				for _, e := range batch {
   211  					err = sr.fetchMetric(ctx, e)
   212  				}
   213  			}
   214  			if err != nil {
   215  				l.WithError(err).Error("failed to fetch metrics")
   216  			}
   217  
   218  			now := time.Now()
   219  			for _, e := range batch {
   220  				sr.lastFetch[e.name] = now
   221  			}
   222  		}
   223  		select {
   224  		case <-ctx.Done():
   225  			return
   226  		case <-time.After(sr.calcTickInterval()):
   227  		}
   228  	}
   229  }
   230  
   231  // executeBatch sends all SQLs in a single pgx.Batch round-trip, dispatching
   232  // each result immediately as it arrives. If any query fails, PostgreSQL's
   233  // extended protocol aborts all subsequent queries in the same sync boundary
   234  // (cascade failure). Any entry that returns an error from the batch is retried
   235  // individually via fetchMetric to isolate real failures from cascade failures.
   236  // Entries that fail even after the individual retry are marked as degraded
   237  // so that subsequent runs use fetchMetric for them until they recover.
   238  func (sr *SourceReaper) executeBatch(ctx context.Context, entries []batchEntry) error {
   239  	batch := &pgx.Batch{}
   240  	for _, e := range entries {
   241  		batch.Queue(e.sql)
   242  	}
   243  
   244  	br := sr.md.Conn.SendBatch(ctx, batch)
   245  	defer func() { _ = br.Close() }()
   246  
   247  	var (
   248  		errs    []error
   249  		retries []batchEntry
   250  	)
   251  	for _, e := range entries {
   252  		rows, err := br.Query()
   253  		if err != nil {
   254  			// May be a real error or a cascade from an earlier failure; retry individually.
   255  			retries = append(retries, e)
   256  			continue
   257  		}
   258  		errs = append(errs, sr.CollectAndDispatch(ctx, rows, e.name, e.metric))
   259  	}
   260  
   261  	for _, e := range retries {
   262  		if err := sr.fetchMetric(ctx, e); err != nil {
   263  			errs = append(errs, fmt.Errorf("failed to fetch metric %s: %v", e.name, err))
   264  			log.GetLogger(ctx).WithField("metric", e.name).Warning("metric degraded after repeated failures, switching to individual fetch")
   265  			sr.degradedMetrics[e.name] = struct{}{}
   266  		}
   267  	}
   268  	return errors.Join(errs...)
   269  }
   270  
   271  // fetchMetric executes a single SQL query and returns the resulting measurements.
   272  func (sr *SourceReaper) fetchMetric(ctx context.Context, entry batchEntry) error {
   273  	rows, err := sr.md.Conn.Query(ctx, entry.sql, pgx.QueryExecModeSimpleProtocol)
   274  	if err != nil {
   275  		return err
   276  	}
   277  	return sr.CollectAndDispatch(ctx, rows, entry.name, entry.metric)
   278  }
   279  
   280  // CollectAndDispatch is a helper that collects rows from a pgx.Rows and dispatches them.
   281  func (sr *SourceReaper) CollectAndDispatch(ctx context.Context, rows pgx.Rows, name string, metric metrics.Metric) error {
   282  	data, err := pgx.CollectRows(rows, metrics.RowToMeasurement)
   283  	if err != nil {
   284  		return err
   285  	}
   286  	if len(data) > 0 {
   287  		sr.dispatchMetricData(ctx, name, metric, data)
   288  	}
   289  	return nil
   290  }
   291  
   292  // fetchOSMetric handles gopsutil-based OS metrics.
   293  func (sr *SourceReaper) fetchOSMetric(ctx context.Context, name string) error {
   294  	msg, err := sr.reaper.FetchStatsDirectlyFromOS(ctx, sr.md, name)
   295  	if err != nil {
   296  		return fmt.Errorf("could not read metric from OS: %v", err)
   297  	}
   298  	if msg != nil && len(msg.Data) > 0 {
   299  		log.GetLogger(ctx).WithField("metric", name).WithField("rows", len(msg.Data)).Info("measurements fetched")
   300  		sr.reaper.measurementCh <- *msg
   301  	}
   302  	return nil
   303  }
   304  
   305  // fetchSpecialMetric handles change_events and instance_up metrics.
   306  func (sr *SourceReaper) fetchSpecialMetric(ctx context.Context, name, storageName string) error {
   307  	var (
   308  		data metrics.Measurements
   309  		err  error
   310  	)
   311  	switch name {
   312  	case specialMetricChangeEvents:
   313  		data, err = sr.reaper.GetObjectChangesMeasurement(ctx, sr.md)
   314  	case specialMetricInstanceUp:
   315  		data, err = sr.reaper.GetInstanceUpMeasurement(ctx, sr.md)
   316  	}
   317  	if err != nil {
   318  		return fmt.Errorf("failed to fetch special metric: %v", err)
   319  	}
   320  	if len(data) > 0 {
   321  		sr.sendEnvelope(ctx, name, storageName, data)
   322  	}
   323  	return err
   324  }
   325  
   326  // runLogParser launches the server log event counts parser.
   327  func (sr *SourceReaper) runLogParser(ctx context.Context) error {
   328  	lp, err := NewLogParser(ctx, sr.md, sr.reaper.measurementCh)
   329  	if err != nil {
   330  		return fmt.Errorf("failed to initialize log parser: %v", err)
   331  	}
   332  	if err := lp.ParseLogs(); err != nil {
   333  		return fmt.Errorf("log parser error: %v", err)
   334  	}
   335  	return nil
   336  }
   337  
   338  // detectServerRestart checks for PostgreSQL server restarts via postmaster_uptime_s
   339  // in db_stats metric data and emits an object_changes measurement if detected.
   340  func (sr *SourceReaper) detectServerRestart(ctx context.Context, data metrics.Measurements) {
   341  	if len(data) == 0 {
   342  		return
   343  	}
   344  	uptimeS, ok := data[0]["postmaster_uptime_s"].(int64)
   345  	if !ok {
   346  		return
   347  	}
   348  	prev := sr.lastUptimeS
   349  	sr.lastUptimeS = uptimeS
   350  	if prev > 0 && uptimeS < prev {
   351  		l := log.GetLogger(ctx)
   352  		l.Warning("Detected server restart (or failover)")
   353  		entry := metrics.NewMeasurement(data.GetEpoch())
   354  		entry["details"] = "Detected server restart (or failover)"
   355  		sr.reaper.measurementCh <- metrics.MeasurementEnvelope{
   356  			DBName:     sr.md.Name,
   357  			MetricName: "object_changes",
   358  			Data:       metrics.Measurements{entry},
   359  			CustomTags: sr.md.CustomTags,
   360  		}
   361  	}
   362  }
   363