...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/reaper/reaper.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"cmp"
     5  	"context"
     6  	"fmt"
     7  	"runtime"
     8  	"slices"
     9  	"strings"
    10  	"time"
    11  
    12  	"sync/atomic"
    13  
    14  	"github.com/cybertec-postgresql/pgwatch/v3/internal/cmdopts"
    15  	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
    16  	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
    17  	"github.com/cybertec-postgresql/pgwatch/v3/internal/sinks"
    18  	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
    19  )
    20  
    21  const (
    22  	specialMetricChangeEvents         = "change_events"
    23  	specialMetricServerLogEventCounts = "server_log_event_counts"
    24  	specialMetricInstanceUp           = "instance_up"
    25  )
    26  
    27  var specialMetrics = map[string]bool{specialMetricChangeEvents: true, specialMetricServerLogEventCounts: true}
    28  
    29  var hostLastKnownStatusInRecovery = make(map[string]bool) // isInRecovery
    30  var metricsConfig map[string]float64                      // set to host.Metrics or host.MetricsStandby (in case optional config defined and in recovery state
    31  var metricDefs = NewConcurrentMetricDefs()
    32  
    33  // Reaper is the struct that responsible for fetching metrics measurements from the sources and storing them to the sinks
    34  type Reaper struct {
    35  	*cmdopts.Options
    36  	ready                atomic.Bool
    37  	measurementCh        chan metrics.MeasurementEnvelope
    38  	measurementCache     *InstanceMetricCache
    39  	logger               log.Logger
    40  	monitoredSources     sources.SourceConns
    41  	prevLoopMonitoredDBs sources.SourceConns
    42  	cancelFuncs          map[string]context.CancelFunc
    43  }
    44  
    45  // NewReaper creates a new Reaper instance
    46  func NewReaper(ctx context.Context, opts *cmdopts.Options) (r *Reaper) {
    47  	return &Reaper{
    48  		Options:              opts,
    49  		measurementCh:        make(chan metrics.MeasurementEnvelope, 256),
    50  		measurementCache:     NewInstanceMetricCache(),
    51  		logger:               log.GetLogger(ctx),
    52  		monitoredSources:     make(sources.SourceConns, 0),
    53  		prevLoopMonitoredDBs: make(sources.SourceConns, 0),
    54  		cancelFuncs:          make(map[string]context.CancelFunc), // [db1+metric1]cancel()
    55  	}
    56  }
    57  
    58  // Ready() returns true if the service is healthy and operating correctly
    59  func (r *Reaper) Ready() bool {
    60  	return r.ready.Load()
    61  }
    62  
    63  func (r *Reaper) PrintMemStats() {
    64  	var m runtime.MemStats
    65  	runtime.ReadMemStats(&m)
    66  
    67  	bToKb := func(b uint64) uint64 {
    68  		return b / 1024
    69  	}
    70  	r.logger.Debugf("Alloc: %d Kb, TotalAlloc: %d Kb, Sys: %d Kb, NumGC: %d, HeapAlloc: %d Kb, HeapSys: %d Kb",
    71  		bToKb(m.Alloc), bToKb(m.TotalAlloc), bToKb(m.Sys), m.NumGC, bToKb(m.HeapAlloc), bToKb(m.HeapSys))
    72  }
    73  
    74  // Reap() starts the main monitoring loop. It is responsible for fetching metrics measurements
    75  // from the sources and storing them to the sinks. It also manages the lifecycle of
    76  // the metric gatherers. In case of a source or metric definition change, it will
    77  // start or stop the gatherers accordingly.
    78  func (r *Reaper) Reap(ctx context.Context) {
    79  	var err error
    80  	logger := r.logger
    81  
    82  	go r.WriteMeasurements(ctx)
    83  	go r.WriteMonitoredSources(ctx)
    84  
    85  	r.ready.Store(true)
    86  
    87  	for { //main loop
    88  		if r.Logging.LogLevel == "debug" {
    89  			r.PrintMemStats()
    90  		}
    91  		if err = r.LoadSources(); err != nil {
    92  			logger.WithError(err).Error("could not refresh active sources, using last valid cache")
    93  		}
    94  		if err = r.LoadMetrics(); err != nil {
    95  			logger.WithError(err).Error("could not refresh metric definitions, using last valid cache")
    96  		}
    97  
    98  		// UpdateMonitoredDBCache(r.monitoredSources)
    99  		hostsToShutDownDueToRoleChange := make(map[string]bool) // hosts went from master to standby and have "only if master" set
   100  		for _, monitoredSource := range r.monitoredSources {
   101  			srcL := logger.WithField("source", monitoredSource.Name)
   102  
   103  			if monitoredSource.Connect(ctx, r.Sources) != nil {
   104  				srcL.Warning("could not init connection, retrying on next iteration")
   105  				continue
   106  			}
   107  
   108  			if err = monitoredSource.FetchRuntimeInfo(ctx, true); err != nil {
   109  				srcL.WithError(err).Error("could not start metric gathering")
   110  				continue
   111  			}
   112  			srcL.WithField("recovery", monitoredSource.IsInRecovery).Infof("Connect OK. Version: %s", monitoredSource.VersionStr)
   113  			if monitoredSource.IsInRecovery && monitoredSource.OnlyIfMaster {
   114  				srcL.Info("not added to monitoring due to 'master only' property")
   115  				if monitoredSource.IsPostgresSource() {
   116  					srcL.Info("to be removed from monitoring due to 'master only' property and status change")
   117  					hostsToShutDownDueToRoleChange[monitoredSource.Name] = true
   118  				}
   119  				continue
   120  			}
   121  
   122  			if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
   123  				metricsConfig = monitoredSource.MetricsStandby
   124  			} else {
   125  				metricsConfig = monitoredSource.Metrics
   126  			}
   127  
   128  			r.CreateSourceHelpers(ctx, srcL, monitoredSource)
   129  
   130  			if monitoredSource.IsPostgresSource() {
   131  				DBSizeMB := monitoredSource.ApproxDbSize / 1048576 // only remove from monitoring when we're certain it's under the threshold
   132  				if DBSizeMB != 0 && DBSizeMB < r.Sources.MinDbSizeMB {
   133  					srcL.Infof("ignored due to the --min-db-size-mb filter, current size %d MB", DBSizeMB)
   134  					hostsToShutDownDueToRoleChange[monitoredSource.Name] = true // for the case when DB size was previosly above the threshold
   135  					continue
   136  				}
   137  
   138  				lastKnownStatusInRecovery := hostLastKnownStatusInRecovery[monitoredSource.Name]
   139  				if lastKnownStatusInRecovery != monitoredSource.IsInRecovery {
   140  					if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
   141  						srcL.Warning("Switching metrics collection to standby config...")
   142  						metricsConfig = monitoredSource.MetricsStandby
   143  					} else if !monitoredSource.IsInRecovery {
   144  						srcL.Warning("Switching metrics collection to primary config...")
   145  						metricsConfig = monitoredSource.Metrics
   146  					}
   147  					// else: it already has primary config do nothing + no warn
   148  				}
   149  			}
   150  			hostLastKnownStatusInRecovery[monitoredSource.Name] = monitoredSource.IsInRecovery
   151  
   152  			for metricName, interval := range metricsConfig {
   153  				metricDefExists := false
   154  				var mvp metrics.Metric
   155  
   156  				mvp, metricDefExists = metricDefs.GetMetricDef(metricName)
   157  
   158  				dbMetric := monitoredSource.Name + dbMetricJoinStr + metricName
   159  				_, cancelFuncExists := r.cancelFuncs[dbMetric]
   160  
   161  				if metricDefExists && !cancelFuncExists { // initialize a new per db/per metric control channel
   162  					if interval > 0 {
   163  						srcL.WithField("metric", metricName).WithField("interval", interval).Info("starting gatherer")
   164  						metricCtx, cancelFunc := context.WithCancel(ctx)
   165  						r.cancelFuncs[dbMetric] = cancelFunc
   166  
   167  						metricNameForStorage := metricName
   168  						if _, isSpecialMetric := specialMetrics[metricName]; !isSpecialMetric && mvp.StorageName > "" {
   169  							metricNameForStorage = mvp.StorageName
   170  						}
   171  
   172  						if err := r.SinksWriter.SyncMetric(monitoredSource.Name, metricNameForStorage, sinks.AddOp); err != nil {
   173  							srcL.Error(err)
   174  						}
   175  
   176  						go r.reapMetricMeasurements(metricCtx, monitoredSource, metricName)
   177  					}
   178  				} else if (!metricDefExists && cancelFuncExists) || interval <= 0 {
   179  					// metric definition files were recently removed or interval set to zero
   180  					if cancelFunc, isOk := r.cancelFuncs[dbMetric]; isOk {
   181  						cancelFunc()
   182  					}
   183  					srcL.WithField("metric", metricName).Warning("shutting down gatherer...")
   184  					delete(r.cancelFuncs, dbMetric)
   185  				} else if !metricDefExists {
   186  					epoch, ok := lastSQLFetchError.Load(metricName)
   187  					if !ok || ((time.Now().Unix() - epoch.(int64)) > 3600) { // complain only 1x per hour
   188  						srcL.WithField("metric", metricName).Warning("metric definition not found")
   189  						lastSQLFetchError.Store(metricName, time.Now().Unix())
   190  					}
   191  				}
   192  			}
   193  		}
   194  
   195  		r.ShutdownOldWorkers(ctx, hostsToShutDownDueToRoleChange)
   196  
   197  		r.prevLoopMonitoredDBs = slices.Clone(r.monitoredSources)
   198  		select {
   199  		case <-time.After(time.Second * time.Duration(r.Sources.Refresh)):
   200  			logger.Debugf("wake up after %d seconds", r.Sources.Refresh)
   201  		case <-ctx.Done():
   202  			return
   203  		}
   204  	}
   205  }
   206  
   207  // CreateSourceHelpers creates the extensions and metric helpers for the monitored source
   208  func (r *Reaper) CreateSourceHelpers(ctx context.Context, srcL log.Logger, monitoredSource *sources.SourceConn) {
   209  	if r.prevLoopMonitoredDBs.GetMonitoredDatabase(monitoredSource.Name) != nil {
   210  		return // already created
   211  	}
   212  	if !monitoredSource.IsPostgresSource() || monitoredSource.IsInRecovery {
   213  		return // no need to create anything for non-postgres sources
   214  	}
   215  
   216  	if r.Sources.TryCreateListedExtsIfMissing > "" {
   217  		srcL.Info("trying to create extensions if missing")
   218  		extsToCreate := strings.Split(r.Sources.TryCreateListedExtsIfMissing, ",")
   219  		extsCreated := TryCreateMissingExtensions(ctx, monitoredSource, extsToCreate, monitoredSource.Extensions)
   220  		srcL.Infof("%d/%d extensions created based on --try-create-listed-exts-if-missing input %v", len(extsCreated), len(extsToCreate), extsCreated)
   221  	}
   222  
   223  	if r.Sources.CreateHelpers {
   224  		srcL.Info("trying to create helper objects if missing")
   225  		if err := TryCreateMetricsFetchingHelpers(ctx, monitoredSource); err != nil {
   226  			srcL.WithError(err).Warning("failed to create helper functions")
   227  		}
   228  	}
   229  
   230  }
   231  
   232  func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDownDueToRoleChange map[string]bool) {
   233  	logger := r.logger
   234  	// loop over existing channels and stop workers if DB or metric removed from config
   235  	// or state change makes it uninteresting
   236  	logger.Debug("checking if any workers need to be shut down...")
   237  	for dbMetric, cancelFunc := range r.cancelFuncs {
   238  		var currentMetricConfig map[string]float64
   239  		var md *sources.SourceConn
   240  		var dbRemovedFromConfig bool
   241  		singleMetricDisabled := false
   242  		splits := strings.Split(dbMetric, dbMetricJoinStr)
   243  		db := splits[0]
   244  		metric := splits[1]
   245  
   246  		_, wholeDbShutDownDueToRoleChange := hostsToShutDownDueToRoleChange[db]
   247  		if !wholeDbShutDownDueToRoleChange {
   248  			md = r.monitoredSources.GetMonitoredDatabase(db)
   249  			if md == nil { // normal removing of DB from config
   250  				dbRemovedFromConfig = true
   251  				logger.Debugf("DB %s removed from config, shutting down all metric worker processes...", db)
   252  			}
   253  		}
   254  
   255  		if !(wholeDbShutDownDueToRoleChange || dbRemovedFromConfig) { // maybe some single metric was disabled
   256  			if md.IsInRecovery && len(md.MetricsStandby) > 0 {
   257  				currentMetricConfig = md.MetricsStandby
   258  			} else {
   259  				currentMetricConfig = md.Metrics
   260  			}
   261  			interval, isMetricActive := currentMetricConfig[metric]
   262  			singleMetricDisabled = !isMetricActive || interval <= 0
   263  		}
   264  
   265  		if ctx.Err() != nil || wholeDbShutDownDueToRoleChange || dbRemovedFromConfig || singleMetricDisabled {
   266  			logger.WithField("source", db).WithField("metric", metric).Info("stopping gatherer...")
   267  			cancelFunc()
   268  			delete(r.cancelFuncs, dbMetric)
   269  			if err := r.SinksWriter.SyncMetric(db, metric, sinks.DeleteOp); err != nil {
   270  				logger.Error(err)
   271  			}
   272  		}
   273  	}
   274  
   275  	// Destroy conn pools and metric writers
   276  	r.CloseResourcesForRemovedMonitoredDBs(hostsToShutDownDueToRoleChange)
   277  }
   278  
   279  func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceConn, metricName string) {
   280  	hostState := make(map[string]map[string]string)
   281  	var lastUptimeS int64 = -1 // used for "server restarted" event detection
   282  	var lastErrorNotificationTime time.Time
   283  	var err error
   284  	var ok bool
   285  
   286  	failedFetches := 0
   287  	lastDBVersionFetchTime := time.Unix(0, 0) // check DB ver. ev. 5 min
   288  
   289  	l := r.logger.WithField("source", md.Name).WithField("metric", metricName)
   290  	if metricName == specialMetricServerLogEventCounts {
   291  		metrics.ParseLogs(ctx, md, md.RealDbname, md.GetMetricInterval(metricName), r.measurementCh, "", "")
   292  		return
   293  	}
   294  
   295  	for {
   296  		interval := md.GetMetricInterval(metricName)
   297  		if lastDBVersionFetchTime.Add(time.Minute * time.Duration(5)).Before(time.Now()) {
   298  			// in case of errors just ignore metric "disabled" time ranges
   299  			if err = md.FetchRuntimeInfo(ctx, false); err != nil {
   300  				lastDBVersionFetchTime = time.Now()
   301  			}
   302  
   303  			if _, ok = metricDefs.GetMetricDef(metricName); !ok {
   304  				l.Error("Could not get metric version properties")
   305  				return
   306  			}
   307  		}
   308  
   309  		var metricStoreMessages *metrics.MeasurementEnvelope
   310  
   311  		// 1st try local overrides for some metrics if operating in push mode
   312  		if r.Metrics.DirectOSStats && IsDirectlyFetchableMetric(metricName) {
   313  			metricStoreMessages, err = r.FetchStatsDirectlyFromOS(ctx, md, metricName)
   314  			if err != nil {
   315  				l.WithError(err).Errorf("Could not read metric directly from OS")
   316  			}
   317  		}
   318  		t1 := time.Now()
   319  		if metricStoreMessages == nil {
   320  			metricStoreMessages, err = r.FetchMetric(ctx, md, metricName, hostState)
   321  		}
   322  
   323  		if time.Since(t1) > (time.Second * time.Duration(interval)) {
   324  			l.Warningf("Total fetching time of %v bigger than %vs interval", time.Since(t1), interval)
   325  		}
   326  
   327  		if err != nil {
   328  			failedFetches++
   329  			// complain only 1x per 10min per host/metric...
   330  			if time.Since(lastErrorNotificationTime) > time.Minute*10 {
   331  				l.WithError(err).WithField("count", failedFetches).Error("failed to fetch metric data")
   332  				lastErrorNotificationTime = time.Now()
   333  			}
   334  		} else if metricStoreMessages != nil && len(metricStoreMessages.Data) > 0 {
   335  			r.measurementCh <- *metricStoreMessages
   336  			// pick up "server restarted" events here to avoid doing extra selects from CheckForPGObjectChangesAndStore code
   337  			if metricName == "db_stats" {
   338  				postmasterUptimeS, ok := (metricStoreMessages.Data)[0]["postmaster_uptime_s"]
   339  				if ok {
   340  					if lastUptimeS != -1 {
   341  						if postmasterUptimeS.(int64) < lastUptimeS { // restart (or possibly also failover when host is routed) happened
   342  							message := "Detected server restart (or failover)"
   343  							l.Warning(message)
   344  							detectedChangesSummary := make(metrics.Measurements, 0)
   345  							entry := metrics.NewMeasurement(metricStoreMessages.Data.GetEpoch())
   346  							entry["details"] = message
   347  							detectedChangesSummary = append(detectedChangesSummary, entry)
   348  							r.measurementCh <- metrics.MeasurementEnvelope{
   349  								DBName:     md.Name,
   350  								MetricName: "object_changes",
   351  								Data:       detectedChangesSummary,
   352  								CustomTags: metricStoreMessages.CustomTags,
   353  							}
   354  						}
   355  					}
   356  					lastUptimeS = postmasterUptimeS.(int64)
   357  				}
   358  			}
   359  		}
   360  
   361  		select {
   362  		case <-ctx.Done():
   363  			return
   364  		case <-time.After(time.Second * time.Duration(interval)):
   365  			// continue
   366  		}
   367  	}
   368  }
   369  
   370  // LoadSources loads sources from the reader
   371  func (r *Reaper) LoadSources() (err error) {
   372  	if DoesEmergencyTriggerfileExist(r.Metrics.EmergencyPauseTriggerfile) {
   373  		r.logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", r.Metrics.EmergencyPauseTriggerfile)
   374  		r.monitoredSources = make(sources.SourceConns, 0)
   375  		return nil
   376  	}
   377  
   378  	var newSrcs sources.SourceConns
   379  	srcs, err := r.SourcesReaderWriter.GetSources()
   380  	if err != nil {
   381  		return err
   382  	}
   383  	srcs = slices.DeleteFunc(srcs, func(s sources.Source) bool {
   384  		return !s.IsEnabled || len(r.Sources.Groups) > 0 && !s.IsDefaultGroup() && !slices.Contains(r.Sources.Groups, s.Group)
   385  	})
   386  	if newSrcs, err = srcs.ResolveDatabases(); err != nil {
   387  		r.logger.WithError(err).Error("could not resolve databases from sources")
   388  	}
   389  
   390  	for i, newMD := range newSrcs {
   391  		md := r.monitoredSources.GetMonitoredDatabase(newMD.Name)
   392  		if md == nil {
   393  			continue
   394  		}
   395  		if md.Equal(newMD.Source) {
   396  			// replace with the existing connection if the source is the same
   397  			newSrcs[i] = md
   398  			continue
   399  		}
   400  	}
   401  	r.monitoredSources = newSrcs
   402  	r.logger.WithField("sources", len(r.monitoredSources)).Info("sources refreshed")
   403  	return nil
   404  }
   405  
   406  // WriteMonitoredSources writes actively monitored DBs listing to sinks
   407  // every monitoredDbsDatastoreSyncIntervalSeconds (default 10min)
   408  func (r *Reaper) WriteMonitoredSources(ctx context.Context) {
   409  	for {
   410  		if len(r.monitoredSources) > 0 {
   411  			now := time.Now().UnixNano()
   412  			for _, mdb := range r.monitoredSources {
   413  				db := metrics.NewMeasurement(now)
   414  				db["tag_group"] = mdb.Group
   415  				db["master_only"] = mdb.OnlyIfMaster
   416  				for k, v := range mdb.CustomTags {
   417  					db[metrics.TagPrefix+k] = v
   418  				}
   419  				r.measurementCh <- metrics.MeasurementEnvelope{
   420  					DBName:     mdb.Name,
   421  					MetricName: monitoredDbsDatastoreSyncMetricName,
   422  					Data:       metrics.Measurements{db},
   423  				}
   424  			}
   425  		}
   426  		select {
   427  		case <-time.After(time.Second * monitoredDbsDatastoreSyncIntervalSeconds):
   428  			// continue
   429  		case <-ctx.Done():
   430  			return
   431  		}
   432  	}
   433  }
   434  
   435  // WriteMeasurements() writes the metrics to the sinks
   436  func (r *Reaper) WriteMeasurements(ctx context.Context) {
   437  	var err error
   438  	for {
   439  		select {
   440  		case <-ctx.Done():
   441  			return
   442  		case msg := <-r.measurementCh:
   443  			if err = r.SinksWriter.Write(msg); err != nil {
   444  				r.logger.Error(err)
   445  			}
   446  		}
   447  	}
   448  }
   449  
   450  func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, md *sources.SourceConn) {
   451  	for _, dr := range data {
   452  		if r.Sinks.RealDbnameField > "" && md.RealDbname > "" {
   453  			dr[r.Sinks.RealDbnameField] = md.RealDbname
   454  		}
   455  		if r.Sinks.SystemIdentifierField > "" && md.SystemIdentifier > "" {
   456  			dr[r.Sinks.SystemIdentifierField] = md.SystemIdentifier
   457  		}
   458  	}
   459  }
   460  
   461  func (r *Reaper) FetchMetric(ctx context.Context, md *sources.SourceConn, metricName string, hostState map[string]map[string]string) (_ *metrics.MeasurementEnvelope, err error) {
   462  	var sql string
   463  	var data metrics.Measurements
   464  	var metric metrics.Metric
   465  	var fromCache bool
   466  	var cacheKey string
   467  	var ok bool
   468  
   469  	if metric, ok = metricDefs.GetMetricDef(metricName); !ok {
   470  		return nil, metrics.ErrMetricNotFound
   471  	}
   472  	l := r.logger.WithField("source", md.Name).WithField("metric", metricName)
   473  
   474  	if metric.IsInstanceLevel && r.Metrics.InstanceLevelCacheMaxSeconds > 0 && time.Second*time.Duration(md.GetMetricInterval(metricName)) < r.Metrics.CacheAge() {
   475  		cacheKey = fmt.Sprintf("%s:%s", md.GetClusterIdentifier(), metricName)
   476  	}
   477  	data = r.measurementCache.Get(cacheKey, r.Metrics.CacheAge())
   478  	fromCache = len(data) > 0
   479  	if !fromCache {
   480  		if (metric.PrimaryOnly() && md.IsInRecovery) || (metric.StandbyOnly() && !md.IsInRecovery) {
   481  			l.Debug("Skipping fetching of as server in wrong IsInRecovery: ", md.IsInRecovery)
   482  			return nil, nil
   483  		}
   484  		switch metricName {
   485  		case specialMetricChangeEvents:
   486  			r.CheckForPGObjectChangesAndStore(ctx, md.Name, md, hostState)
   487  			return nil, nil
   488  		case specialMetricInstanceUp:
   489  			data, err = r.GetInstanceUpMeasurement(ctx, md)
   490  		default:
   491  			sql = metric.GetSQL(md.Version)
   492  			if sql == "" {
   493  				l.Warning("Ignoring fetching because of empty SQL")
   494  				return nil, nil
   495  			}
   496  			data, err = QueryMeasurements(ctx, md, sql)
   497  		}
   498  		if err != nil {
   499  			return nil, err
   500  		}
   501  		r.measurementCache.Put(cacheKey, data)
   502  	}
   503  	r.AddSysinfoToMeasurements(data, md)
   504  	l.WithField("cache", fromCache).WithField("rows", len(data)).Info("measurements fetched")
   505  	return &metrics.MeasurementEnvelope{
   506  		DBName:     md.Name,
   507  		MetricName: cmp.Or(metric.StorageName, metricName),
   508  		Data:       data,
   509  		CustomTags: md.CustomTags}, nil
   510  }
   511