...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/reaper/reaper.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"cmp"
     5  	"context"
     6  	"fmt"
     7  	"runtime"
     8  	"slices"
     9  	"strings"
    10  	"time"
    11  
    12  	"sync/atomic"
    13  
    14  	"github.com/cybertec-postgresql/pgwatch/v3/internal/cmdopts"
    15  	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
    16  	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
    17  	"github.com/cybertec-postgresql/pgwatch/v3/internal/sinks"
    18  	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
    19  )
    20  
    21  const (
    22  	specialMetricChangeEvents         = "change_events"
    23  	specialMetricServerLogEventCounts = "server_log_event_counts"
    24  	specialMetricInstanceUp           = "instance_up"
    25  )
    26  
    27  var specialMetrics = map[string]bool{specialMetricChangeEvents: true, specialMetricServerLogEventCounts: true}
    28  
    29  var hostLastKnownStatusInRecovery = make(map[string]bool) // isInRecovery
    30  var metricsConfig map[string]float64                      // set to host.Metrics or host.MetricsStandby (in case optional config defined and in recovery state
    31  var metricDefs = NewConcurrentMetricDefs()
    32  
    33  // Reaper is the struct that responsible for fetching metrics measurements from the sources and storing them to the sinks
    34  type Reaper struct {
    35  	*cmdopts.Options
    36  	ready                atomic.Bool
    37  	measurementCh        chan metrics.MeasurementEnvelope
    38  	measurementCache     *InstanceMetricCache
    39  	logger               log.Logger
    40  	monitoredSources     sources.SourceConns
    41  	prevLoopMonitoredDBs sources.SourceConns
    42  	cancelFuncs          map[string]context.CancelFunc
    43  }
    44  
    45  // NewReaper creates a new Reaper instance
    46  func NewReaper(ctx context.Context, opts *cmdopts.Options) (r *Reaper) {
    47  	return &Reaper{
    48  		Options:              opts,
    49  		measurementCh:        make(chan metrics.MeasurementEnvelope, 256),
    50  		measurementCache:     NewInstanceMetricCache(),
    51  		logger:               log.GetLogger(ctx),
    52  		monitoredSources:     make(sources.SourceConns, 0),
    53  		prevLoopMonitoredDBs: make(sources.SourceConns, 0),
    54  		cancelFuncs:          make(map[string]context.CancelFunc), // [db1+metric1]cancel()
    55  	}
    56  }
    57  
    58  // Ready() returns true if the service is healthy and operating correctly
    59  func (r *Reaper) Ready() bool {
    60  	return r.ready.Load()
    61  }
    62  
    63  func (r *Reaper) PrintMemStats() {
    64  	var m runtime.MemStats
    65  	runtime.ReadMemStats(&m)
    66  
    67  	bToKb := func(b uint64) uint64 {
    68  		return b / 1024
    69  	}
    70  	r.logger.Debugf("Alloc: %d Kb, TotalAlloc: %d Kb, Sys: %d Kb, NumGC: %d, HeapAlloc: %d Kb, HeapSys: %d Kb",
    71  		bToKb(m.Alloc), bToKb(m.TotalAlloc), bToKb(m.Sys), m.NumGC, bToKb(m.HeapAlloc), bToKb(m.HeapSys))
    72  }
    73  
    74  // Reap() starts the main monitoring loop. It is responsible for fetching metrics measurements
    75  // from the sources and storing them to the sinks. It also manages the lifecycle of
    76  // the metric gatherers. In case of a source or metric definition change, it will
    77  // start or stop the gatherers accordingly.
    78  func (r *Reaper) Reap(ctx context.Context) {
    79  	var err error
    80  	logger := r.logger
    81  
    82  	go r.WriteMeasurements(ctx)
    83  	go r.WriteMonitoredSources(ctx)
    84  
    85  	r.ready.Store(true)
    86  
    87  	for { //main loop
    88  		if r.Logging.LogLevel == "debug" {
    89  			r.PrintMemStats()
    90  		}
    91  		if err = r.LoadSources(); err != nil {
    92  			logger.WithError(err).Error("could not refresh active sources, using last valid cache")
    93  		}
    94  		if err = r.LoadMetrics(); err != nil {
    95  			logger.WithError(err).Error("could not refresh metric definitions, using last valid cache")
    96  		}
    97  
    98  		// UpdateMonitoredDBCache(r.monitoredSources)
    99  		hostsToShutDownDueToRoleChange := make(map[string]bool) // hosts went from master to standby and have "only if master" set
   100  		for _, monitoredSource := range r.monitoredSources {
   101  			srcL := logger.WithField("source", monitoredSource.Name)
   102  			ctx = log.WithLogger(ctx, srcL)
   103  
   104  			if monitoredSource.Connect(ctx, r.Sources) != nil {
   105  				srcL.Warning("could not init connection, retrying on next iteration")
   106  				continue
   107  			}
   108  
   109  			if err = monitoredSource.FetchRuntimeInfo(ctx, true); err != nil {
   110  				srcL.WithError(err).Error("could not start metric gathering")
   111  				continue
   112  			}
   113  			srcL.WithField("recovery", monitoredSource.IsInRecovery).Infof("Connect OK. Version: %s", monitoredSource.VersionStr)
   114  			if monitoredSource.IsInRecovery && monitoredSource.OnlyIfMaster {
   115  				srcL.Info("not added to monitoring due to 'master only' property")
   116  				if monitoredSource.IsPostgresSource() {
   117  					srcL.Info("to be removed from monitoring due to 'master only' property and status change")
   118  					hostsToShutDownDueToRoleChange[monitoredSource.Name] = true
   119  				}
   120  				continue
   121  			}
   122  
   123  			if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
   124  				metricsConfig = monitoredSource.MetricsStandby
   125  			} else {
   126  				metricsConfig = monitoredSource.Metrics
   127  			}
   128  
   129  			r.CreateSourceHelpers(ctx, srcL, monitoredSource)
   130  
   131  			if monitoredSource.IsPostgresSource() {
   132  				DBSizeMB := monitoredSource.ApproxDbSize / 1048576 // only remove from monitoring when we're certain it's under the threshold
   133  				if DBSizeMB != 0 && DBSizeMB < r.Sources.MinDbSizeMB {
   134  					srcL.Infof("ignored due to the --min-db-size-mb filter, current size %d MB", DBSizeMB)
   135  					hostsToShutDownDueToRoleChange[monitoredSource.Name] = true // for the case when DB size was previosly above the threshold
   136  					continue
   137  				}
   138  
   139  				lastKnownStatusInRecovery := hostLastKnownStatusInRecovery[monitoredSource.Name]
   140  				if lastKnownStatusInRecovery != monitoredSource.IsInRecovery {
   141  					if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
   142  						srcL.Warning("Switching metrics collection to standby config...")
   143  						metricsConfig = monitoredSource.MetricsStandby
   144  					} else if !monitoredSource.IsInRecovery {
   145  						srcL.Warning("Switching metrics collection to primary config...")
   146  						metricsConfig = monitoredSource.Metrics
   147  					}
   148  					// else: it already has primary config do nothing + no warn
   149  				}
   150  			}
   151  			hostLastKnownStatusInRecovery[monitoredSource.Name] = monitoredSource.IsInRecovery
   152  
   153  			for metricName, interval := range metricsConfig {
   154  				metricDefExists := false
   155  				var mvp metrics.Metric
   156  
   157  				mvp, metricDefExists = metricDefs.GetMetricDef(metricName)
   158  
   159  				dbMetric := monitoredSource.Name + dbMetricJoinStr + metricName
   160  				_, cancelFuncExists := r.cancelFuncs[dbMetric]
   161  
   162  				if metricDefExists && !cancelFuncExists { // initialize a new per db/per metric control channel
   163  					if interval > 0 {
   164  						srcL.WithField("metric", metricName).WithField("interval", interval).Info("starting gatherer")
   165  						metricCtx, cancelFunc := context.WithCancel(ctx)
   166  						r.cancelFuncs[dbMetric] = cancelFunc
   167  
   168  						metricNameForStorage := metricName
   169  						if _, isSpecialMetric := specialMetrics[metricName]; !isSpecialMetric && mvp.StorageName > "" {
   170  							metricNameForStorage = mvp.StorageName
   171  						}
   172  
   173  						if err := r.SinksWriter.SyncMetric(monitoredSource.Name, metricNameForStorage, sinks.AddOp); err != nil {
   174  							srcL.Error(err)
   175  						}
   176  
   177  						go r.reapMetricMeasurements(metricCtx, monitoredSource, metricName)
   178  					}
   179  				} else if (!metricDefExists && cancelFuncExists) || interval <= 0 {
   180  					// metric definition files were recently removed or interval set to zero
   181  					if cancelFunc, isOk := r.cancelFuncs[dbMetric]; isOk {
   182  						cancelFunc()
   183  					}
   184  					srcL.WithField("metric", metricName).Warning("shutting down gatherer...")
   185  					delete(r.cancelFuncs, dbMetric)
   186  				} else if !metricDefExists {
   187  					epoch, ok := lastSQLFetchError.Load(metricName)
   188  					if !ok || ((time.Now().Unix() - epoch.(int64)) > 3600) { // complain only 1x per hour
   189  						srcL.WithField("metric", metricName).Warning("metric definition not found")
   190  						lastSQLFetchError.Store(metricName, time.Now().Unix())
   191  					}
   192  				}
   193  			}
   194  		}
   195  
   196  		r.ShutdownOldWorkers(ctx, hostsToShutDownDueToRoleChange)
   197  
   198  		r.prevLoopMonitoredDBs = slices.Clone(r.monitoredSources)
   199  		select {
   200  		case <-time.After(time.Second * time.Duration(r.Sources.Refresh)):
   201  			logger.Debugf("wake up after %d seconds", r.Sources.Refresh)
   202  		case <-ctx.Done():
   203  			return
   204  		}
   205  	}
   206  }
   207  
   208  // CreateSourceHelpers creates the extensions and metric helpers for the monitored source
   209  func (r *Reaper) CreateSourceHelpers(ctx context.Context, srcL log.Logger, monitoredSource *sources.SourceConn) {
   210  	if r.prevLoopMonitoredDBs.GetMonitoredDatabase(monitoredSource.Name) != nil {
   211  		return // already created
   212  	}
   213  	if !monitoredSource.IsPostgresSource() || monitoredSource.IsInRecovery {
   214  		return // no need to create anything for non-postgres sources
   215  	}
   216  
   217  	if r.Sources.TryCreateListedExtsIfMissing > "" {
   218  		srcL.Info("trying to create extensions if missing")
   219  		extsToCreate := strings.Split(r.Sources.TryCreateListedExtsIfMissing, ",")
   220  		monitoredSource.RLock()
   221  		extsCreated := TryCreateMissingExtensions(ctx, monitoredSource, extsToCreate, monitoredSource.Extensions)
   222  		monitoredSource.RUnlock()
   223  		srcL.Infof("%d/%d extensions created based on --try-create-listed-exts-if-missing input %v", len(extsCreated), len(extsToCreate), extsCreated)
   224  	}
   225  
   226  	if r.Sources.CreateHelpers {
   227  		srcL.Info("trying to create helper objects if missing")
   228  		if err := TryCreateMetricsFetchingHelpers(ctx, monitoredSource); err != nil {
   229  			srcL.WithError(err).Warning("failed to create helper functions")
   230  		}
   231  	}
   232  
   233  }
   234  
   235  func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDownDueToRoleChange map[string]bool) {
   236  	logger := r.logger
   237  	// loop over existing channels and stop workers if DB or metric removed from config
   238  	// or state change makes it uninteresting
   239  	logger.Debug("checking if any workers need to be shut down...")
   240  	for dbMetric, cancelFunc := range r.cancelFuncs {
   241  		var currentMetricConfig map[string]float64
   242  		var md *sources.SourceConn
   243  		var dbRemovedFromConfig bool
   244  		singleMetricDisabled := false
   245  		splits := strings.Split(dbMetric, dbMetricJoinStr)
   246  		db := splits[0]
   247  		metric := splits[1]
   248  
   249  		_, wholeDbShutDownDueToRoleChange := hostsToShutDownDueToRoleChange[db]
   250  		if !wholeDbShutDownDueToRoleChange {
   251  			md = r.monitoredSources.GetMonitoredDatabase(db)
   252  			if md == nil { // normal removing of DB from config
   253  				dbRemovedFromConfig = true
   254  				logger.Debugf("DB %s removed from config, shutting down all metric worker processes...", db)
   255  			}
   256  		}
   257  
   258  		if !(wholeDbShutDownDueToRoleChange || dbRemovedFromConfig) { // maybe some single metric was disabled
   259  			if md.IsInRecovery && len(md.MetricsStandby) > 0 {
   260  				currentMetricConfig = md.MetricsStandby
   261  			} else {
   262  				currentMetricConfig = md.Metrics
   263  			}
   264  			interval, isMetricActive := currentMetricConfig[metric]
   265  			singleMetricDisabled = !isMetricActive || interval <= 0
   266  		}
   267  
   268  		if ctx.Err() != nil || wholeDbShutDownDueToRoleChange || dbRemovedFromConfig || singleMetricDisabled {
   269  			logger.WithField("source", db).WithField("metric", metric).Info("stopping gatherer...")
   270  			cancelFunc()
   271  			delete(r.cancelFuncs, dbMetric)
   272  			if err := r.SinksWriter.SyncMetric(db, metric, sinks.DeleteOp); err != nil {
   273  				logger.Error(err)
   274  			}
   275  		}
   276  	}
   277  
   278  	// Destroy conn pools and metric writers
   279  	r.CloseResourcesForRemovedMonitoredDBs(hostsToShutDownDueToRoleChange)
   280  }
   281  
   282  func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceConn, metricName string) {
   283  	var lastUptimeS int64 = -1 // used for "server restarted" event detection
   284  	var lastErrorNotificationTime time.Time
   285  	var err error
   286  	var ok bool
   287  
   288  	failedFetches := 0
   289  	lastDBVersionFetchTime := time.Unix(0, 0) // check DB ver. ev. 5 min
   290  
   291  	l := log.GetLogger(ctx).WithField("metric", metricName)
   292  	ctx = log.WithLogger(ctx, l)
   293  
   294  	if metricName == specialMetricServerLogEventCounts {
   295  		metrics.ParseLogs(ctx, md, md.RealDbname, md.GetMetricInterval(metricName), r.measurementCh, "", "")
   296  		return
   297  	}
   298  
   299  	for {
   300  		interval := md.GetMetricInterval(metricName)
   301  		if lastDBVersionFetchTime.Add(time.Minute * time.Duration(5)).Before(time.Now()) {
   302  			// in case of errors just ignore metric "disabled" time ranges
   303  			if err = md.FetchRuntimeInfo(ctx, false); err != nil {
   304  				lastDBVersionFetchTime = time.Now()
   305  			}
   306  
   307  			if _, ok = metricDefs.GetMetricDef(metricName); !ok {
   308  				l.Error("Could not get metric version properties")
   309  				return
   310  			}
   311  		}
   312  
   313  		var metricStoreMessages *metrics.MeasurementEnvelope
   314  
   315  		// 1st try local overrides for some metrics if operating in push mode
   316  		if r.Metrics.DirectOSStats && IsDirectlyFetchableMetric(metricName) {
   317  			metricStoreMessages, err = r.FetchStatsDirectlyFromOS(ctx, md, metricName)
   318  			if err != nil {
   319  				l.WithError(err).Errorf("Could not read metric directly from OS")
   320  			}
   321  		}
   322  		t1 := time.Now()
   323  		if metricStoreMessages == nil {
   324  			metricStoreMessages, err = r.FetchMetric(ctx, md, metricName)
   325  		}
   326  
   327  		if time.Since(t1) > (time.Second * time.Duration(interval)) {
   328  			l.Warningf("Total fetching time of %v bigger than %vs interval", time.Since(t1), interval)
   329  		}
   330  
   331  		if err != nil {
   332  			failedFetches++
   333  			// complain only 1x per 10min per host/metric...
   334  			if time.Since(lastErrorNotificationTime) > time.Minute*10 {
   335  				l.WithError(err).WithField("count", failedFetches).Error("failed to fetch metric data")
   336  				lastErrorNotificationTime = time.Now()
   337  			}
   338  		} else if metricStoreMessages != nil && len(metricStoreMessages.Data) > 0 {
   339  			r.measurementCh <- *metricStoreMessages
   340  			// pick up "server restarted" events here to avoid doing extra selects from CheckForPGObjectChangesAndStore code
   341  			if metricName == "db_stats" {
   342  				postmasterUptimeS, ok := (metricStoreMessages.Data)[0]["postmaster_uptime_s"]
   343  				if ok {
   344  					if lastUptimeS != -1 {
   345  						if postmasterUptimeS.(int64) < lastUptimeS { // restart (or possibly also failover when host is routed) happened
   346  							message := "Detected server restart (or failover)"
   347  							l.Warning(message)
   348  							detectedChangesSummary := make(metrics.Measurements, 0)
   349  							entry := metrics.NewMeasurement(metricStoreMessages.Data.GetEpoch())
   350  							entry["details"] = message
   351  							detectedChangesSummary = append(detectedChangesSummary, entry)
   352  							r.measurementCh <- metrics.MeasurementEnvelope{
   353  								DBName:     md.Name,
   354  								MetricName: "object_changes",
   355  								Data:       detectedChangesSummary,
   356  								CustomTags: metricStoreMessages.CustomTags,
   357  							}
   358  						}
   359  					}
   360  					lastUptimeS = postmasterUptimeS.(int64)
   361  				}
   362  			}
   363  		}
   364  
   365  		select {
   366  		case <-ctx.Done():
   367  			return
   368  		case <-time.After(time.Second * time.Duration(interval)):
   369  			// continue
   370  		}
   371  	}
   372  }
   373  
   374  // LoadSources loads sources from the reader
   375  func (r *Reaper) LoadSources() (err error) {
   376  	if DoesEmergencyTriggerfileExist(r.Metrics.EmergencyPauseTriggerfile) {
   377  		r.logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", r.Metrics.EmergencyPauseTriggerfile)
   378  		r.monitoredSources = make(sources.SourceConns, 0)
   379  		return nil
   380  	}
   381  
   382  	var newSrcs sources.SourceConns
   383  	srcs, err := r.SourcesReaderWriter.GetSources()
   384  	if err != nil {
   385  		return err
   386  	}
   387  	srcs = slices.DeleteFunc(srcs, func(s sources.Source) bool {
   388  		return !s.IsEnabled || len(r.Sources.Groups) > 0 && !s.IsDefaultGroup() && !slices.Contains(r.Sources.Groups, s.Group)
   389  	})
   390  	if newSrcs, err = srcs.ResolveDatabases(); err != nil {
   391  		r.logger.WithError(err).Error("could not resolve databases from sources")
   392  	}
   393  
   394  	for i, newMD := range newSrcs {
   395  		md := r.monitoredSources.GetMonitoredDatabase(newMD.Name)
   396  		if md == nil {
   397  			continue
   398  		}
   399  		if md.Equal(newMD.Source) {
   400  			// replace with the existing connection if the source is the same
   401  			newSrcs[i] = md
   402  			continue
   403  		}
   404  	}
   405  	r.monitoredSources = newSrcs
   406  	r.logger.WithField("sources", len(r.monitoredSources)).Info("sources refreshed")
   407  	return nil
   408  }
   409  
   410  // WriteMonitoredSources writes actively monitored DBs listing to sinks
   411  // every monitoredDbsDatastoreSyncIntervalSeconds (default 10min)
   412  func (r *Reaper) WriteMonitoredSources(ctx context.Context) {
   413  	for {
   414  		if len(r.monitoredSources) > 0 {
   415  			now := time.Now().UnixNano()
   416  			for _, mdb := range r.monitoredSources {
   417  				db := metrics.NewMeasurement(now)
   418  				db["tag_group"] = mdb.Group
   419  				db["master_only"] = mdb.OnlyIfMaster
   420  				for k, v := range mdb.CustomTags {
   421  					db[metrics.TagPrefix+k] = v
   422  				}
   423  				r.measurementCh <- metrics.MeasurementEnvelope{
   424  					DBName:     mdb.Name,
   425  					MetricName: monitoredDbsDatastoreSyncMetricName,
   426  					Data:       metrics.Measurements{db},
   427  				}
   428  			}
   429  		}
   430  		select {
   431  		case <-time.After(time.Second * monitoredDbsDatastoreSyncIntervalSeconds):
   432  			// continue
   433  		case <-ctx.Done():
   434  			return
   435  		}
   436  	}
   437  }
   438  
   439  // WriteMeasurements() writes the metrics to the sinks
   440  func (r *Reaper) WriteMeasurements(ctx context.Context) {
   441  	var err error
   442  	for {
   443  		select {
   444  		case <-ctx.Done():
   445  			return
   446  		case msg := <-r.measurementCh:
   447  			if err = r.SinksWriter.Write(msg); err != nil {
   448  				r.logger.Error(err)
   449  			}
   450  		}
   451  	}
   452  }
   453  
   454  func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, md *sources.SourceConn) {
   455  	for _, dr := range data {
   456  		if r.Sinks.RealDbnameField > "" && md.RealDbname > "" {
   457  			dr[r.Sinks.RealDbnameField] = md.RealDbname
   458  		}
   459  		if r.Sinks.SystemIdentifierField > "" && md.SystemIdentifier > "" {
   460  			dr[r.Sinks.SystemIdentifierField] = md.SystemIdentifier
   461  		}
   462  	}
   463  }
   464  
   465  func (r *Reaper) FetchMetric(ctx context.Context, md *sources.SourceConn, metricName string) (_ *metrics.MeasurementEnvelope, err error) {
   466  	var sql string
   467  	var data metrics.Measurements
   468  	var metric metrics.Metric
   469  	var fromCache bool
   470  	var cacheKey string
   471  	var ok bool
   472  
   473  	if metric, ok = metricDefs.GetMetricDef(metricName); !ok {
   474  		return nil, metrics.ErrMetricNotFound
   475  	}
   476  	l := log.GetLogger(ctx)
   477  
   478  	if metric.IsInstanceLevel && r.Metrics.InstanceLevelCacheMaxSeconds > 0 && time.Second*time.Duration(md.GetMetricInterval(metricName)) < r.Metrics.CacheAge() {
   479  		cacheKey = fmt.Sprintf("%s:%s", md.GetClusterIdentifier(), metricName)
   480  	}
   481  	data = r.measurementCache.Get(cacheKey, r.Metrics.CacheAge())
   482  	fromCache = len(data) > 0
   483  	if !fromCache {
   484  		if (metric.PrimaryOnly() && md.IsInRecovery) || (metric.StandbyOnly() && !md.IsInRecovery) {
   485  			l.Debug("Skipping fetching of as server in wrong IsInRecovery: ", md.IsInRecovery)
   486  			return nil, nil
   487  		}
   488  		switch metricName {
   489  		case specialMetricChangeEvents:
   490  			data, err = r.GetObjectChangesMeasurement(ctx, md)
   491  		case specialMetricInstanceUp:
   492  			data, err = r.GetInstanceUpMeasurement(ctx, md)
   493  		default:
   494  			sql = metric.GetSQL(md.Version)
   495  			if sql == "" {
   496  				l.Warning("Ignoring fetching because of empty SQL")
   497  				return nil, nil
   498  			}
   499  			data, err = QueryMeasurements(ctx, md, sql)
   500  		}
   501  		if err != nil || len(data) == 0 {
   502  			return nil, err
   503  		}
   504  		r.measurementCache.Put(cacheKey, data)
   505  	}
   506  	r.AddSysinfoToMeasurements(data, md)
   507  	l.WithField("cache", fromCache).WithField("rows", len(data)).Info("measurements fetched")
   508  	return &metrics.MeasurementEnvelope{
   509  		DBName:     md.Name,
   510  		MetricName: cmp.Or(metric.StorageName, metricName),
   511  		Data:       data,
   512  		CustomTags: md.CustomTags}, nil
   513  }
   514