...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/reaper/reaper.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"cmp"
     5  	"context"
     6  	"fmt"
     7  	"runtime"
     8  	"slices"
     9  	"strings"
    10  	"time"
    11  
    12  	"sync/atomic"
    13  
    14  	"github.com/cybertec-postgresql/pgwatch/v5/internal/cmdopts"
    15  	"github.com/cybertec-postgresql/pgwatch/v5/internal/log"
    16  	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
    17  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
    18  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
    19  )
    20  
// Metric names that receive special, hard-coded handling in the reaper.
const (
	specialMetricChangeEvents         = "change_events"
	specialMetricServerLogEventCounts = "server_log_event_counts"
	specialMetricInstanceUp           = "instance_up"
)

// specialMetrics marks metrics that always keep their own name when synced to
// storage, even if the metric definition carries a StorageName override.
var specialMetrics = map[string]bool{specialMetricChangeEvents: true, specialMetricServerLogEventCounts: true}

var hostLastKnownStatusInRecovery = make(map[string]bool) // last observed isInRecovery per source name, used for role-change detection
var metricsConfig metrics.MetricIntervals                 // set to host.Metrics or host.MetricsStandby (in case optional config is defined and host is in recovery state); NOTE(review): package-level mutable state shared across sources
var metricDefs = NewConcurrentMetricDefs()                // shared metric definition registry
    32  
// Reaper is the struct that is responsible for fetching metrics measurements
// from the sources and storing them to the sinks.
type Reaper struct {
	*cmdopts.Options
	ready                atomic.Bool                      // true once Reap's main loop is running
	measurementCh        chan metrics.MeasurementEnvelope // buffered pipe from gatherers to WriteMeasurements
	measurementCache     *InstanceMetricCache             // cache used by FetchMetric for instance-level metrics
	logger               log.Logger
	monitoredSources     sources.SourceConns // sources currently being monitored
	prevLoopMonitoredDBs sources.SourceConns // snapshot of the previous loop's sources
	cancelFuncs          map[string]context.CancelFunc // per db+metric gatherer cancel functions
}
    44  
    45  // NewReaper creates a new Reaper instance
    46  func NewReaper(ctx context.Context, opts *cmdopts.Options) (r *Reaper) {
    47  	return &Reaper{
    48  		Options:              opts,
    49  		measurementCh:        make(chan metrics.MeasurementEnvelope, 256),
    50  		measurementCache:     NewInstanceMetricCache(),
    51  		logger:               log.GetLogger(ctx),
    52  		monitoredSources:     make(sources.SourceConns, 0),
    53  		prevLoopMonitoredDBs: make(sources.SourceConns, 0),
    54  		cancelFuncs:          make(map[string]context.CancelFunc), // [db1+metric1]cancel()
    55  	}
    56  }
    57  
// Ready returns true if the service is healthy and operating correctly
// (set once the main Reap loop has started).
func (r *Reaper) Ready() bool {
	return r.ready.Load()
}
    62  
    63  func (r *Reaper) PrintMemStats() {
    64  	var m runtime.MemStats
    65  	runtime.ReadMemStats(&m)
    66  
    67  	bToKb := func(b uint64) uint64 {
    68  		return b / 1024
    69  	}
    70  	r.logger.Debugf("Alloc: %d Kb, TotalAlloc: %d Kb, Sys: %d Kb, NumGC: %d, HeapAlloc: %d Kb, HeapSys: %d Kb",
    71  		bToKb(m.Alloc), bToKb(m.TotalAlloc), bToKb(m.Sys), m.NumGC, bToKb(m.HeapAlloc), bToKb(m.HeapSys))
    72  }
    73  
// Reap starts the main monitoring loop. It is responsible for fetching metrics measurements
// from the sources and storing them to the sinks. It also manages the lifecycle of
// the metric gatherers. In case of a source or metric definition change, it will
// start or stop the gatherers accordingly. Blocks until ctx is cancelled.
func (r *Reaper) Reap(ctx context.Context) {
	var err error
	logger := r.logger

	// Single background goroutine drains r.measurementCh to the sinks.
	go r.WriteMeasurements(ctx)

	r.ready.Store(true)

	for { //main loop
		if r.Logging.LogLevel == "debug" {
			r.PrintMemStats()
		}
		// Refresh sources and metric definitions; on failure keep using the
		// last successfully loaded values instead of aborting the loop.
		if err = r.LoadSources(ctx); err != nil {
			logger.WithError(err).Error("could not refresh active sources, using last valid cache")
		}
		if err = r.LoadMetrics(); err != nil {
			logger.WithError(err).Error("could not refresh metric definitions, using last valid cache")
		}

		// UpdateMonitoredDBCache(r.monitoredSources)
		hostsToShutDownDueToRoleChange := make(map[string]bool) // hosts went from master to standby and have "only if master" set
		for _, monitoredSource := range r.monitoredSources {
			srcL := logger.WithField("source", monitoredSource.Name)
			// NOTE(review): this rebinds the loop-level ctx, so each iteration's ctx
			// wraps the previous one with the current source's logger.
			ctx = log.WithLogger(ctx, srcL)

			// A failed connect marks the instance down and retries next cycle.
			if monitoredSource.Connect(ctx, r.Sources) != nil {
				r.WriteInstanceDown(monitoredSource)
				srcL.Warning("could not init connection, retrying on next iteration")
				continue
			}

			if err = monitoredSource.FetchRuntimeInfo(ctx, true); err != nil {
				srcL.WithError(err).Error("could not start metric gathering")
				continue
			}
			srcL.WithField("recovery", monitoredSource.IsInRecovery).Infof("Connect OK. Version: %s", monitoredSource.VersionStr)
			// Standby sources flagged "only if master" are skipped and queued for worker shutdown.
			if monitoredSource.IsInRecovery && monitoredSource.OnlyIfMaster {
				srcL.Info("not added to monitoring due to 'master only' property")
				if monitoredSource.IsPostgresSource() {
					srcL.Info("to be removed from monitoring due to 'master only' property and status change")
					hostsToShutDownDueToRoleChange[monitoredSource.Name] = true
				}
				continue
			}

			// Choose the standby metric set when in recovery and one is configured,
			// otherwise the primary set. metricsConfig is package-level shared state.
			if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
				metricsConfig = monitoredSource.MetricsStandby
			} else {
				metricsConfig = monitoredSource.Metrics
			}

			r.CreateSourceHelpers(ctx, srcL, monitoredSource)

			if monitoredSource.IsPostgresSource() {
				DBSizeMB := monitoredSource.ApproxDbSize / 1048576 // only remove from monitoring when we're certain it's under the threshold
				if DBSizeMB != 0 && DBSizeMB < r.Sources.MinDbSizeMB {
					srcL.Infof("ignored due to the --min-db-size-mb filter, current size %d MB", DBSizeMB)
					hostsToShutDownDueToRoleChange[monitoredSource.Name] = true // for the case when DB size was previosly above the threshold
					continue
				}

				// Detect primary<->standby role flips and switch the metric config accordingly.
				lastKnownStatusInRecovery := hostLastKnownStatusInRecovery[monitoredSource.Name]
				if lastKnownStatusInRecovery != monitoredSource.IsInRecovery {
					if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
						srcL.Warning("Switching metrics collection to standby config...")
						metricsConfig = monitoredSource.MetricsStandby
					} else if !monitoredSource.IsInRecovery {
						srcL.Warning("Switching metrics collection to primary config...")
						metricsConfig = monitoredSource.Metrics
					}
					// else: it already has primary config do nothing + no warn
				}
			}
			hostLastKnownStatusInRecovery[monitoredSource.Name] = monitoredSource.IsInRecovery

			// Reconcile the desired metric set with the currently running gatherers.
			for metricName, interval := range metricsConfig {
				metricDefExists := false
				var mvp metrics.Metric

				mvp, metricDefExists = metricDefs.GetMetricDef(metricName)

				dbMetric := monitoredSource.Name + dbMetricJoinStr + metricName
				_, cancelFuncExists := r.cancelFuncs[dbMetric]

				if metricDefExists && !cancelFuncExists { // initialize a new per db/per metric control channel
					if interval > 0 {
						srcL.WithField("metric", metricName).WithField("interval", interval).Info("starting gatherer")
						metricCtx, cancelFunc := context.WithCancel(ctx)
						r.cancelFuncs[dbMetric] = cancelFunc

						// Special metrics keep their own name; others may map to a storage name.
						metricNameForStorage := metricName
						if _, isSpecialMetric := specialMetrics[metricName]; !isSpecialMetric && mvp.StorageName > "" {
							metricNameForStorage = mvp.StorageName
						}

						if err := r.SinksWriter.SyncMetric(monitoredSource.Name, metricNameForStorage, sinks.AddOp); err != nil {
							srcL.Error(err)
						}

						go r.reapMetricMeasurements(metricCtx, monitoredSource, metricName)
					}
				} else if (!metricDefExists && cancelFuncExists) || interval <= 0 {
					// metric definition files were recently removed or interval set to zero
					if cancelFunc, isOk := r.cancelFuncs[dbMetric]; isOk {
						cancelFunc()
					}
					srcL.WithField("metric", metricName).Warning("shutting down gatherer...")
					delete(r.cancelFuncs, dbMetric)
				} else if !metricDefExists {
					// Unknown metric with no gatherer running: warn throttled.
					epoch, ok := lastSQLFetchError.Load(metricName)
					if !ok || ((time.Now().Unix() - epoch.(int64)) > 3600) { // complain only 1x per hour
						srcL.WithField("metric", metricName).Warning("metric definition not found")
						lastSQLFetchError.Store(metricName, time.Now().Unix())
					}
				}
			}
		}

		// Stop workers for sources removed from config or disqualified above.
		r.ShutdownOldWorkers(ctx, hostsToShutDownDueToRoleChange)

		r.prevLoopMonitoredDBs = slices.Clone(r.monitoredSources)
		// Sleep until the next refresh cycle or exit on cancellation.
		select {
		case <-time.After(time.Second * time.Duration(r.Sources.Refresh)):
			logger.Debugf("wake up after %d seconds", r.Sources.Refresh)
		case <-ctx.Done():
			return
		}
	}
}
   207  
// CreateSourceHelpers creates the extensions and metric helpers for the monitored source.
// It is a no-op for sources already present in the previous loop, for non-Postgres
// sources, and for sources currently in recovery.
func (r *Reaper) CreateSourceHelpers(ctx context.Context, srcL log.Logger, monitoredSource *sources.SourceConn) {
	if r.prevLoopMonitoredDBs.GetMonitoredDatabase(monitoredSource.Name) != nil {
		return // already created
	}
	if !monitoredSource.IsPostgresSource() || monitoredSource.IsInRecovery {
		return // no need to create anything for non-postgres sources
	}

	// Optionally create the configured comma-separated list of extensions if missing.
	if r.Sources.TryCreateListedExtsIfMissing > "" {
		srcL.Info("trying to create extensions if missing")
		extsToCreate := strings.Split(r.Sources.TryCreateListedExtsIfMissing, ",")
		extsCreated, err := monitoredSource.TryCreateMissingExtensions(ctx, extsToCreate)
		if err != nil {
			srcL.Warning(err)
		}
		if extsCreated != "" {
			// NOTE(review): extsCreated is a string, so len(extsCreated) is its character
			// count, not the number of extensions created — the "%d/%d" ratio below looks
			// wrong; verify the return format of TryCreateMissingExtensions.
			srcL.Infof("%d/%d extensions created: %s", len(extsCreated), len(extsToCreate), extsCreated)
		}
	}

	// Optionally install per-metric helper objects using each metric's InitSQL.
	if r.Sources.CreateHelpers {
		srcL.Info("trying to create helper objects if missing")
		if err := monitoredSource.TryCreateMetricsHelpers(ctx, func(metric string) string {
			if m, ok := metricDefs.GetMetricDef(metric); ok {
				return m.InitSQL
			}
			return ""
		}); err != nil {
			srcL.Warning(err)
		}
	}

}
   242  
// ShutdownOldWorkers cancels gatherer goroutines whose host appears in
// hostsToShutDown, whose source was removed from the configuration, or whose
// metric is no longer active (missing or interval <= 0) in the source's current
// metric set. Matching sink entries are deleted and removed DBs' resources closed.
func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDown map[string]bool) {
	logger := r.logger
	// loop over existing channels and stop workers if DB or metric removed from config
	// or state change makes it uninteresting
	logger.Debug("checking if any workers need to be shut down...")
	// Deleting entries from r.cancelFuncs while ranging over it is safe in Go.
	for dbMetric, cancelFunc := range r.cancelFuncs {
		var currentMetricConfig metrics.MetricIntervals
		var md *sources.SourceConn
		var dbRemovedFromConfig bool
		var metricRemovedFromPreset bool
		// Keys have the form "<db><dbMetricJoinStr><metric>", as built in Reap.
		splits := strings.Split(dbMetric, dbMetricJoinStr)
		db := splits[0]
		metric := splits[1]

		_, wholeDbShutDown := hostsToShutDown[db]
		if !wholeDbShutDown {
			md = r.monitoredSources.GetMonitoredDatabase(db)
			if md == nil { // normal removing of DB from config
				dbRemovedFromConfig = true
				logger.Debugf("DB %s removed from config, shutting down all metric worker processes...", db)
			}
		}

		// Detects metrics removed from a preset definition.
		//
		// If not using presets, a metric removed from configs will
		// be detected earlier by `LoadSources()` as configs change that
		// triggers a restart and get passed in `hostsToShutDown`.
		if !(wholeDbShutDown || dbRemovedFromConfig) {
			if md.IsInRecovery && len(md.MetricsStandby) > 0 {
				currentMetricConfig = md.MetricsStandby
			} else {
				currentMetricConfig = md.Metrics
			}
			interval, isMetricActive := currentMetricConfig[metric]
			metricRemovedFromPreset = !isMetricActive || interval <= 0
		}

		if ctx.Err() != nil || wholeDbShutDown || dbRemovedFromConfig || metricRemovedFromPreset {
			logger.WithField("source", db).WithField("metric", metric).Info("stopping gatherer...")
			cancelFunc()
			delete(r.cancelFuncs, dbMetric)
			// Tell the sink this DB/metric series is gone.
			if err := r.SinksWriter.SyncMetric(db, metric, sinks.DeleteOp); err != nil {
				logger.Error(err)
			}
		}
	}

	// Destroy conn pools and metric writers
	r.CloseResourcesForRemovedMonitoredDBs(hostsToShutDown)
}
   294  
// reapMetricMeasurements is the per-source/per-metric gatherer loop: it
// periodically fetches one metric from one source and pushes the results onto
// r.measurementCh until ctx is cancelled or the metric definition disappears.
func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceConn, metricName string) {
	var lastUptimeS int64 = -1 // used for "server restarted" event detection
	var lastErrorNotificationTime time.Time
	var err error
	var ok bool

	failedFetches := 0
	lastDBVersionFetchTime := time.Unix(0, 0) // check DB ver. ev. 5 min

	l := log.GetLogger(ctx).WithField("metric", metricName)
	ctx = log.WithLogger(ctx, l)

	// Log-event counting is handled by a dedicated log parser, not a query loop.
	if metricName == specialMetricServerLogEventCounts {
		lp, err := NewLogParser(ctx, md, r.measurementCh)
		if err != nil {
			l.WithError(err).Error("Failed to init log parser")
			return
		}
		err = lp.ParseLogs()
		if err != nil {
			l.WithError(err).Error("Error parsing logs")
		}
		return
	}

	for {
		interval := md.GetMetricInterval(metricName)
		if lastDBVersionFetchTime.Add(time.Minute * time.Duration(5)).Before(time.Now()) {
			// in case of errors just ignore metric "disabled" time ranges
			// NOTE(review): lastDBVersionFetchTime only advances on error, so on
			// success the runtime info is re-fetched every iteration — confirm
			// whether `err == nil` was intended here.
			if err = md.FetchRuntimeInfo(ctx, false); err != nil {
				lastDBVersionFetchTime = time.Now()
			}

			// Stop the gatherer if the metric definition has been removed meanwhile.
			if _, ok = metricDefs.GetMetricDef(metricName); !ok {
				l.WithField("source", md.Name).Error("metric definition not found")
				return
			}
		}

		var metricStoreMessages *metrics.MeasurementEnvelope

		t1 := time.Now()
		// 1st try local overrides for system metrics if operating on the same host
		if IsDirectlyFetchableMetric(md, metricName) {
			if metricStoreMessages, err = r.FetchStatsDirectlyFromOS(ctx, md, metricName); err != nil {
				l.WithError(err).Errorf("Could not read metric directly from OS")
			}
		}
		// Fall back to the regular (SQL) fetch path when OS-level fetch did not apply.
		if metricStoreMessages == nil {
			metricStoreMessages, err = r.FetchMetric(ctx, md, metricName)
		}

		if time.Since(t1) > interval {
			l.Warningf("Total fetching time of %v bigger than %v interval", time.Since(t1), interval)
		}

		if err != nil {
			failedFetches++
			// complain only 1x per 10min per host/metric...
			if time.Since(lastErrorNotificationTime) > time.Minute*10 {
				l.WithError(err).WithField("count", failedFetches).Error("failed to fetch metric data")
				lastErrorNotificationTime = time.Now()
			}
		} else if metricStoreMessages != nil && len(metricStoreMessages.Data) > 0 {
			r.measurementCh <- *metricStoreMessages
			// pick up "server restarted" events here to avoid doing extra selects from CheckForPGObjectChangesAndStore code
			if metricName == "db_stats" {
				postmasterUptimeS, ok := (metricStoreMessages.Data)[0]["postmaster_uptime_s"]
				if ok {
					if lastUptimeS != -1 {
						if postmasterUptimeS.(int64) < lastUptimeS { // restart (or possibly also failover when host is routed) happened
							message := "Detected server restart (or failover)"
							l.Warning(message)
							// Emit a synthetic "object_changes" measurement describing the restart.
							detectedChangesSummary := make(metrics.Measurements, 0)
							entry := metrics.NewMeasurement(metricStoreMessages.Data.GetEpoch())
							entry["details"] = message
							detectedChangesSummary = append(detectedChangesSummary, entry)
							r.measurementCh <- metrics.MeasurementEnvelope{
								DBName:     md.Name,
								MetricName: "object_changes",
								Data:       detectedChangesSummary,
								CustomTags: metricStoreMessages.CustomTags,
							}
						}
					}
					lastUptimeS = postmasterUptimeS.(int64)
				}
			}
		}

		// Sleep one interval or exit on cancellation.
		select {
		case <-ctx.Done():
			return
		case <-time.After(interval):
			// continue
		}
	}
}
   393  
   394  // LoadSources loads sources from the reader
   395  func (r *Reaper) LoadSources(ctx context.Context) (err error) {
   396  	if DoesEmergencyTriggerfileExist(r.Metrics.EmergencyPauseTriggerfile) {
   397  		r.logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", r.Metrics.EmergencyPauseTriggerfile)
   398  		r.monitoredSources = make(sources.SourceConns, 0)
   399  		return nil
   400  	}
   401  
   402  	var newSrcs sources.SourceConns
   403  	srcs, err := r.SourcesReaderWriter.GetSources()
   404  	if err != nil {
   405  		return err
   406  	}
   407  	srcs = slices.DeleteFunc(srcs, func(s sources.Source) bool {
   408  		return !s.IsEnabled || len(r.Sources.Groups) > 0 && !slices.Contains(r.Sources.Groups, s.Group)
   409  	})
   410  	if newSrcs, err = srcs.ResolveDatabases(); err != nil {
   411  		r.logger.WithError(err).Error("could not resolve databases from sources")
   412  	}
   413  
   414  	for i, newMD := range newSrcs {
   415  		md := r.monitoredSources.GetMonitoredDatabase(newMD.Name)
   416  		if md == nil {
   417  			continue
   418  		}
   419  		if md.Equal(newMD.Source) {
   420  			// replace with the existing connection if the source is the same
   421  			newSrcs[i] = md
   422  			continue
   423  		}
   424  		// Source configs changed, stop all running gatherers to trigger a restart
   425  		// TODO: Optimize this for single metric addition/deletion/interval-change cases to not do a full restart
   426  		r.logger.WithField("source", md.Name).Info("Source configs changed, restarting all gatherers...")
   427  		r.ShutdownOldWorkers(ctx, map[string]bool{md.Name: true})
   428  	}
   429  	r.monitoredSources = newSrcs
   430  	r.logger.WithField("sources", len(r.monitoredSources)).Info("sources refreshed")
   431  	return nil
   432  }
   433  
   434  // WriteInstanceDown writes instance_up = 0 metric to sinks for the given source
   435  func (r *Reaper) WriteInstanceDown(md *sources.SourceConn) {
   436  	r.measurementCh <- metrics.MeasurementEnvelope{
   437  		DBName:     md.Name,
   438  		MetricName: specialMetricInstanceUp,
   439  		Data: metrics.Measurements{metrics.Measurement{
   440  			metrics.EpochColumnName: time.Now().UnixNano(),
   441  			specialMetricInstanceUp: 0},
   442  		},
   443  	}
   444  }
   445  
   446  // WriteMeasurements() writes the metrics to the sinks
   447  func (r *Reaper) WriteMeasurements(ctx context.Context) {
   448  	var err error
   449  	for {
   450  		select {
   451  		case <-ctx.Done():
   452  			return
   453  		case msg := <-r.measurementCh:
   454  			if err = r.SinksWriter.Write(msg); err != nil {
   455  				r.logger.Error(err)
   456  			}
   457  		}
   458  	}
   459  }
   460  
   461  func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, md *sources.SourceConn) {
   462  	for _, dr := range data {
   463  		if r.Sinks.RealDbnameField > "" && md.RealDbname > "" {
   464  			dr[r.Sinks.RealDbnameField] = md.RealDbname
   465  		}
   466  		if r.Sinks.SystemIdentifierField > "" && md.SystemIdentifier > "" {
   467  			dr[r.Sinks.SystemIdentifierField] = md.SystemIdentifier
   468  		}
   469  	}
   470  }
   471  
// FetchMetric performs one fetch of metricName for source md and returns the
// resulting measurement envelope (nil, nil when the metric does not apply).
// Instance-level metrics may be served from the shared measurement cache so
// multiple databases of the same cluster reuse one query result.
func (r *Reaper) FetchMetric(ctx context.Context, md *sources.SourceConn, metricName string) (_ *metrics.MeasurementEnvelope, err error) {
	var sql string
	var data metrics.Measurements
	var metric metrics.Metric
	var fromCache bool
	var cacheKey string
	var ok bool

	if metric, ok = metricDefs.GetMetricDef(metricName); !ok {
		return nil, metrics.ErrMetricNotFound
	}
	l := log.GetLogger(ctx)

	// Build a cache key only for instance-level metrics when caching is enabled
	// and the metric interval is shorter than the configured cache age; otherwise
	// cacheKey stays "" (presumably a cache miss — verify InstanceMetricCache.Get).
	if metric.IsInstanceLevel && r.Metrics.InstanceLevelCacheMaxSeconds > 0 && md.GetMetricInterval(metricName) < r.Metrics.CacheAge() {
		cacheKey = fmt.Sprintf("%s:%s", md.GetClusterIdentifier(), metricName)
	}
	data = r.measurementCache.Get(cacheKey, r.Metrics.CacheAge())
	fromCache = len(data) > 0
	if !fromCache {
		// Respect primary-only / standby-only metric restrictions.
		// NOTE(review): the debug message below reads oddly ("fetching of as
		// server") — likely a typo in the original text.
		if (metric.PrimaryOnly() && md.IsInRecovery) || (metric.StandbyOnly() && !md.IsInRecovery) {
			l.Debug("Skipping fetching of as server in wrong IsInRecovery: ", md.IsInRecovery)
			return nil, nil
		}
		switch metricName {
		case specialMetricChangeEvents:
			data, err = r.GetObjectChangesMeasurement(ctx, md)
		case specialMetricInstanceUp:
			data, err = r.GetInstanceUpMeasurement(ctx, md)
		default:
			// Regular metric: pick the SQL matching the server version and run it.
			sql = metric.GetSQL(md.Version)
			if sql == "" {
				l.WithField("source", md.Name).WithField("version", md.Version).Warning("no SQL found for metric version")
				return nil, nil
			}
			data, err = QueryMeasurements(ctx, md, sql)
		}
		if err != nil || len(data) == 0 {
			return nil, err
		}
		r.measurementCache.Put(cacheKey, data)
	}
	r.AddSysinfoToMeasurements(data, md)
	l.WithField("cache", fromCache).WithField("rows", len(data)).Info("measurements fetched")
	// StorageName (when set) overrides the metric name used for storage.
	return &metrics.MeasurementEnvelope{
		DBName:     md.Name,
		MetricName: cmp.Or(metric.StorageName, metricName),
		Data:       data,
		CustomTags: md.CustomTags}, nil
}
   521