...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/reaper/reaper.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"slices"
     7  	"strings"
     8  	"time"
     9  
    10  	"sync/atomic"
    11  
    12  	"github.com/cybertec-postgresql/pgwatch/v3/internal/cmdopts"
    13  	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
    14  	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
    15  	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
    16  )
    17  
    18  var monitoredSources = make(sources.SourceConns, 0)
    19  var hostLastKnownStatusInRecovery = make(map[string]bool) // isInRecovery
    20  var metricConfig map[string]float64                       // set to host.Metrics, or to host.MetricsStandby when a standby config is defined and the host is in recovery
    21  var metricDefs = NewConcurrentMetricDefs()
    22  
    23  // Reaper is the struct responsible for fetching metric measurements from the sources and storing them in the sinks
    24  type Reaper struct {
    25  	*cmdopts.Options
    26  	ready            atomic.Bool
    27  	measurementCh    chan []metrics.MeasurementEnvelope
    28  	measurementCache *InstanceMetricCache
    29  	logger           log.LoggerIface
    30  }
    31  
    32  // NewReaper creates a new Reaper instance
    33  func NewReaper(ctx context.Context, opts *cmdopts.Options) (r *Reaper, err error) {
    34  	r = &Reaper{
    35  		Options:          opts,
    36  		measurementCh:    make(chan []metrics.MeasurementEnvelope, 10000),
    37  		measurementCache: NewInstanceMetricCache(),
    38  		logger:           log.GetLogger(ctx),
    39  	}
    40  	return r, nil
    41  }
    42  
    43  // Ready() returns true if the service is healthy and operating correctly
    44  func (r *Reaper) Ready() bool {
    45  	return r.ready.Load()
    46  }
    47  
    48  // Reap() starts the main monitoring loop. It is responsible for fetching metric measurements
    49  // from the sources and storing them in the sinks. It also manages the lifecycle of
    50  // the metric gatherers. In case of a source or metric definition change, it will
    51  // start or stop the gatherers accordingly.
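        // An illustrative call sequence (a sketch; assumes the caller supplies a
        // cancellable ctx and a *cmdopts.Options value named opts):
        //
        //	r, _ := NewReaper(ctx, opts)
        //	go func() { _ = r.Reap(ctx) }() // runs until ctx is cancelled
        //	_ = r.Ready()                   // true once Reap has started its loop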
    52  func (r *Reaper) Reap(ctx context.Context) (err error) {
    53  	cancelFuncs := make(map[string]context.CancelFunc) // [db1+metric1]=cancel func
    54  
    55  	mainLoopCount := 0
    56  	logger := r.logger
    57  
    58  	go r.WriteMeasurements(ctx)
    59  	go r.WriteMonitoredSources(ctx)
    60  
    61  	r.ready.Store(true)
    62  
    63  	for { //main loop
    64  		if err = r.LoadSources(); err != nil {
    65  			logger.WithError(err).Error("could not refresh active sources, using last valid cache")
    66  		}
    67  		if err = r.LoadMetrics(); err != nil {
    68  			logger.WithError(err).Error("could not refresh metric definitions, using last valid cache")
    69  		}
    70  
    71  		UpdateMonitoredDBCache(monitoredSources)
    72  		hostsToShutDownDueToRoleChange := make(map[string]bool) // hosts went from master to standby and have "only if master" set
    73  		for _, monitoredSource := range monitoredSources {
    74  			srcL := logger.WithField("source", monitoredSource.Name)
    75  
    76  			if err = monitoredSource.Connect(ctx, r.Sources); err != nil {
    77  				srcL.WithError(err).Warning("could not init connection, retrying on next iteration")
    78  				continue
    79  			}
    80  
    81  			InitPGVersionInfoFetchingLockIfNil(monitoredSource)
    82  
    83  			var dbSettings MonitoredDatabaseSettings
    84  
    85  			dbSettings, err = GetMonitoredDatabaseSettings(ctx, monitoredSource, true)
    86  			if err != nil {
    87  				srcL.WithError(err).Error("could not start metric gathering")
    88  				continue
    89  			}
    90  			srcL.WithField("recovery", dbSettings.IsInRecovery).Infof("Connect OK. Version: %s", dbSettings.VersionStr)
    91  			if dbSettings.IsInRecovery && monitoredSource.OnlyIfMaster {
    92  				srcL.Info("not added to monitoring due to 'master only' property")
    93  				continue
    94  			}
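        			// effective metric config: an explicit per-source Metrics map takes precedence,
        			// otherwise the named preset (if any) is expanded from the metric definitions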
    95  			metricConfig = func() map[string]float64 {
    96  				if len(monitoredSource.Metrics) > 0 {
    97  					return monitoredSource.Metrics
    98  				}
    99  				if monitoredSource.PresetMetrics > "" {
   100  					return metricDefs.GetPresetMetrics(monitoredSource.PresetMetrics)
   101  				}
   102  				return nil
   103  			}()
   104  			hostLastKnownStatusInRecovery[monitoredSource.Name] = dbSettings.IsInRecovery
   105  			if dbSettings.IsInRecovery {
   106  				metricConfig = func() map[string]float64 {
   107  					if len(monitoredSource.MetricsStandby) > 0 {
   108  						return monitoredSource.MetricsStandby
   109  					}
   110  					if monitoredSource.PresetMetricsStandby > "" {
   111  						return metricDefs.GetPresetMetrics(monitoredSource.PresetMetricsStandby)
   112  					}
   113  					return nil
   114  				}()
   115  			}
   116  
   117  			if monitoredSource.IsPostgresSource() && !dbSettings.IsInRecovery && r.Metrics.CreateHelpers {
   118  				srcL.Info("trying to create helper objects if missing")
   119  				if err = TryCreateMetricsFetchingHelpers(ctx, monitoredSource); err != nil {
   120  					srcL.WithError(err).Warning("failed to create helper functions")
   121  				}
   122  			}
   123  
   124  			if monitoredSource.IsPostgresSource() {
   125  				var DBSizeMB int64
   126  
   127  				if r.Sources.MinDbSizeMB >= 8 { // an empty DB is a bit less than 8MB
   128  					DBSizeMB, _ = DBGetSizeMB(ctx, monitoredSource.Name) // ignore errors, i.e. only remove from monitoring when we're certain it's under the threshold
   129  					if DBSizeMB != 0 {
   130  						if DBSizeMB < r.Sources.MinDbSizeMB {
   131  							srcL.Infof("ignored due to the --min-db-size-mb filter, current size %d MB", DBSizeMB)
   132  							hostsToShutDownDueToRoleChange[monitoredSource.Name] = true // for the case when DB size was previously above the threshold
   133  							continue
   134  						}
   135  					}
   136  				}
   137  				ver, err := GetMonitoredDatabaseSettings(ctx, monitoredSource, false)
   138  				if err == nil { // ok to ignore error, re-tried on next loop
   139  					lastKnownStatusInRecovery := hostLastKnownStatusInRecovery[monitoredSource.Name]
   140  					if ver.IsInRecovery && monitoredSource.OnlyIfMaster {
   141  						srcL.Info("to be removed from monitoring due to 'master only' property and status change")
   142  						hostsToShutDownDueToRoleChange[monitoredSource.Name] = true
   143  						continue
   144  					} else if lastKnownStatusInRecovery != ver.IsInRecovery {
   145  						if ver.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
   146  							srcL.Warning("Switching metrics collection to standby config...")
   147  							metricConfig = monitoredSource.MetricsStandby
   148  							hostLastKnownStatusInRecovery[monitoredSource.Name] = true
   149  						} else {
   150  							srcL.Warning("Switching metrics collection to primary config...")
   151  							metricConfig = monitoredSource.Metrics
   152  							hostLastKnownStatusInRecovery[monitoredSource.Name] = false
   153  						}
   154  					}
   155  				}
   156  
   157  				if mainLoopCount == 0 && r.Sources.TryCreateListedExtsIfMissing != "" && !ver.IsInRecovery {
   158  					extsToCreate := strings.Split(r.Sources.TryCreateListedExtsIfMissing, ",")
   159  					extsCreated := TryCreateMissingExtensions(ctx, monitoredSource.Name, extsToCreate, ver.Extensions)
   160  					srcL.Infof("%d/%d extensions created based on --try-create-listed-exts-if-missing input %v", len(extsCreated), len(extsToCreate), extsCreated)
   161  				}
   162  			}
   163  
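        			// reconcile gatherers with the effective config: start one for every metric with a
        			// positive interval, stop ones whose definition was removed or whose interval is zero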
   164  			for metricName, interval := range metricConfig {
   165  				metric := metricName
   166  				metricDefOk := false
   167  				var mvp metrics.Metric
   168  
   169  				if strings.HasPrefix(metric, recoPrefix) {
   170  					metric = recoMetricName
   171  					metricDefOk = true
   172  				} else {
   173  					mvp, metricDefOk = metricDefs.GetMetricDef(metric)
   174  				}
   175  
   176  				dbMetric := monitoredSource.Name + dbMetricJoinStr + metric
   177  				_, chOk := cancelFuncs[dbMetric]
   178  
   179  				if metricDefOk && !chOk { // start a new per-DB/per-metric gatherer if not yet running
   180  					if interval > 0 {
   181  						srcL.WithField("metric", metric).WithField("interval", interval).Info("starting gatherer")
   182  						metricCtx, cancelFunc := context.WithCancel(ctx)
   183  						cancelFuncs[dbMetric] = cancelFunc
   184  
   185  						metricNameForStorage := metricName
   186  						if _, isSpecialMetric := specialMetrics[metricName]; !isSpecialMetric {
   187  							metricNameForStorage = mvp.StorageName
   188  						}
   189  
   190  						if err := r.SinksWriter.SyncMetric(monitoredSource.Name, metricNameForStorage, "add"); err != nil {
   191  							srcL.Error(err)
   192  						}
   193  
   194  						go r.reapMetricMeasurements(metricCtx, monitoredSource, metric, interval)
   195  					}
   196  				} else if (!metricDefOk && chOk) || interval <= 0 {
   197  					// metric definition files were recently removed or interval set to zero
   198  					if cancelFunc, isOk := cancelFuncs[dbMetric]; isOk {
   199  						cancelFunc()
   200  					}
   201  					srcL.WithField("metric", metric).Warning("shutting down gatherer...")
   202  					delete(cancelFuncs, dbMetric)
   203  				} else if !metricDefOk {
   204  					epoch, ok := lastSQLFetchError.Load(metric)
   205  					if !ok || ((time.Now().Unix() - epoch.(int64)) > 3600) { // complain only 1x per hour
   206  						srcL.WithField("metric", metric).Warning("metric definition not found")
   207  						lastSQLFetchError.Store(metric, time.Now().Unix())
   208  					}
   209  				}
   210  			}
   211  		}
   212  
   213  		if mainLoopCount == 0 {
   214  			goto MainLoopSleep
   215  		}
   216  
   217  		// loop over existing channels and stop workers if DB or metric removed from config
   218  		// or state change makes it uninteresting
   219  		logger.Debug("checking if any workers need to be shut down...")
   220  		for dbMetric, cancelFunc := range cancelFuncs {
   221  			var currentMetricConfig map[string]float64
   222  			var dbInfo *sources.SourceConn
   223  			var ok, dbRemovedFromConfig bool
   224  			singleMetricDisabled := false
   225  			splits := strings.Split(dbMetric, dbMetricJoinStr)
   226  			db := splits[0]
   227  			metric := splits[1]
   228  
   229  			_, wholeDbShutDownDueToRoleChange := hostsToShutDownDueToRoleChange[db]
   230  			if !wholeDbShutDownDueToRoleChange {
   231  				monitoredDbCacheLock.RLock()
   232  				dbInfo, ok = monitoredDbCache[db]
   233  				monitoredDbCacheLock.RUnlock()
   234  				if !ok { // normal removing of DB from config
   235  					dbRemovedFromConfig = true
   236  					logger.Debugf("DB %s removed from config, shutting down all metric worker processes...", db)
   237  				}
   238  			}
   239  
   240  			if !(wholeDbShutDownDueToRoleChange || dbRemovedFromConfig) { // maybe some single metric was disabled
   241  				MonitoredDatabasesSettingsLock.RLock()
   242  				verInfo, ok := MonitoredDatabasesSettings[db]
   243  				MonitoredDatabasesSettingsLock.RUnlock()
   244  				if !ok {
   245  					logger.Warningf("Could not find PG version info for DB %s, skipping shutdown check of metric worker process for %s", db, metric)
   246  					continue
   247  				}
   248  				if verInfo.IsInRecovery && dbInfo.PresetMetricsStandby > "" || !verInfo.IsInRecovery && dbInfo.PresetMetrics > "" {
   249  					continue // no need to check presets for single metric disabling
   250  				}
   251  				if verInfo.IsInRecovery && len(dbInfo.MetricsStandby) > 0 {
   252  					currentMetricConfig = dbInfo.MetricsStandby
   253  				} else {
   254  					currentMetricConfig = dbInfo.Metrics
   255  				}
   256  
   257  				interval, isMetricActive := currentMetricConfig[metric]
   258  				if !isMetricActive || interval <= 0 {
   259  					singleMetricDisabled = true
   260  				}
   261  			}
   262  
   263  			if ctx.Err() != nil || wholeDbShutDownDueToRoleChange || dbRemovedFromConfig || singleMetricDisabled {
   264  				logger.WithField("source", db).WithField("metric", metric).Info("stopping gatherer...")
   265  				cancelFunc()
   266  				delete(cancelFuncs, dbMetric)
   267  				if err := r.SinksWriter.SyncMetric(db, metric, "remove"); err != nil {
   268  					logger.Error(err)
   269  				}
   270  			}
   271  		}
   272  
   273  		// Destroy conn pools and metric writers
   274  		CloseResourcesForRemovedMonitoredDBs(r.SinksWriter, monitoredSources, prevLoopMonitoredDBs, hostsToShutDownDueToRoleChange)
   275  
   276  	MainLoopSleep:
   277  		mainLoopCount++
   278  		prevLoopMonitoredDBs = slices.Clone(monitoredSources)
   279  
   280  		logger.Debugf("main sleeping %ds...", r.Sources.Refresh)
   281  		select {
   282  		case <-time.After(time.Second * time.Duration(r.Sources.Refresh)):
   283  			// pass
   284  		case <-ctx.Done():
   285  			return
   286  		}
   287  	}
   288  }
   289  
   290  // reapMetricMeasurements runs the gathering loop for a single source/metric pair until its context is cancelled
   291  func (r *Reaper) reapMetricMeasurements(ctx context.Context, mdb *sources.SourceConn, metricName string, interval float64) {
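        	// hostState carries mutable per-source state for stateful fetchers (e.g. change-event detection)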
   292  	hostState := make(map[string]map[string]string)
   293  	var lastUptimeS int64 = -1 // used for "server restarted" event detection
   294  	var lastErrorNotificationTime time.Time
   295  	var vme MonitoredDatabaseSettings
   296  	var mvp metrics.Metric
   297  	var err error
   298  	var ok bool
   299  	var envelopes []metrics.MeasurementEnvelope
   300  
   301  	failedFetches := 0
   302  	lastDBVersionFetchTime := time.Unix(0, 0) // DB version/settings are re-checked every 5 min
   303  
   304  	l := r.logger.WithField("source", mdb.Name).WithField("metric", metricName)
   305  	if metricName == specialMetricServerLogEventCounts {
   306  		MonitoredDatabasesSettingsLock.RLock()
   307  		realDbname := MonitoredDatabasesSettings[mdb.Name].RealDbname // to manage 2 sets of event counts - monitored DB + global
   308  		MonitoredDatabasesSettingsLock.RUnlock()
   309  		metrics.ParseLogs(ctx, mdb, realDbname, interval, r.measurementCh) // no return
   310  		return
   311  	}
   312  
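        	// gathering loop: periodically re-check DB settings, fetch the metric, forward
        	// non-empty results to the measurement channel, then sleep for the configured interval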
   313  	for {
   314  		if lastDBVersionFetchTime.Add(time.Minute * time.Duration(5)).Before(time.Now()) {
   315  			vme, err = GetMonitoredDatabaseSettings(ctx, mdb, false) // in case of errors just ignore metric "disabled" time ranges
   316  			// record the attempt time regardless of outcome so that settings are
   317  			// re-checked at most once per 5 minutes
   318  			lastDBVersionFetchTime = time.Now()
   319  
   320  			mvp, ok = metricDefs.GetMetricDef(metricName)
   321  			if !ok {
   322  				l.Errorf("Could not get metric version properties: %s", metricName)
   323  				return
   324  			}
   325  		}
   326  
   327  		var metricStoreMessages *metrics.MeasurementEnvelope
   328  		mfm := MetricFetchConfig{
   329  			DBUniqueName:        mdb.Name,
   330  			DBUniqueNameOrig:    mdb.GetDatabaseName(),
   331  			MetricName:          metricName,
   332  			Source:              mdb.Kind,
   333  			Interval:            time.Second * time.Duration(interval),
   334  			StmtTimeoutOverride: 0,
   335  		}
   336  
   337  		// 1st try local overrides for some metrics if operating in push mode
   338  		if r.Metrics.DirectOSStats && IsDirectlyFetchableMetric(metricName) {
   339  			metricStoreMessages, err = FetchStatsDirectlyFromOS(ctx, mfm, vme, mvp)
   340  			if err != nil {
   341  				l.WithError(err).Error("Could not read metric directly from OS")
   342  			}
   343  		}
   344  		t1 := time.Now()
   345  		if metricStoreMessages == nil {
   346  			metricStoreMessages, err = r.FetchMetric(ctx, mfm, hostState)
   347  		}
   348  		t2 := time.Now()
   349  
   350  		if t2.Sub(t1) > (time.Second * time.Duration(interval)) {
   351  			l.Warningf("Total fetch time of %vs exceeds the %vs interval", t2.Sub(t1).Truncate(time.Millisecond*100).Seconds(), interval)
   352  		}
   353  
   354  		if err != nil {
   355  			failedFetches++
   356  			// complain only 1x per 10min per host/metric...
   357  			if lastErrorNotificationTime.IsZero() || lastErrorNotificationTime.Add(time.Second*time.Duration(600)).Before(time.Now()) {
   358  				l.WithError(err).Error("failed to fetch metric data")
   359  				if failedFetches > 1 {
   360  					l.Errorf("Total failed fetches: %d", failedFetches)
   361  				}
   362  				lastErrorNotificationTime = time.Now()
   363  			}
   364  		} else if metricStoreMessages != nil && len(metricStoreMessages.Data) > 0 {
   365  			envelopes = append(envelopes, *metricStoreMessages)
   366  			// pick up "server restarted" events here to avoid doing extra selects from CheckForPGObjectChangesAndStore code
   367  			if metricName == "db_stats" {
   368  				postmasterUptimeS, ok := (metricStoreMessages.Data)[0]["postmaster_uptime_s"]
   369  				if ok {
   370  					if lastUptimeS != -1 {
   371  						if postmasterUptimeS.(int64) < lastUptimeS { // restart (or possibly also failover when host is routed) happened
   372  							message := "Detected server restart (or failover) of \"" + mdb.Name + "\""
   373  							l.Warning(message)
   374  							detectedChangesSummary := make(metrics.Measurements, 0)
   375  							entry := metrics.NewMeasurement(metricStoreMessages.Data.GetEpoch())
   376  							entry["details"] = message
   377  							detectedChangesSummary = append(detectedChangesSummary, entry)
   378  							envelopes = append(envelopes,
   379  								metrics.MeasurementEnvelope{
   380  									DBName:     mdb.Name,
   381  									SourceType: string(mdb.Kind),
   382  									MetricName: "object_changes",
   383  									Data:       detectedChangesSummary,
   384  									CustomTags: metricStoreMessages.CustomTags,
   385  								})
   386  						}
   387  					}
   388  					lastUptimeS = postmasterUptimeS.(int64)
   389  				}
   390  			}
   391  			r.measurementCh <- envelopes
   392  		}
   393  
   394  		select {
   395  		case <-ctx.Done():
   396  			return
   397  		case <-time.After(time.Second * time.Duration(interval)):
   398  			// continue
   399  		}
   400  	}
   401  }
   402  
   403  // LoadSources loads sources from the reader
   404  func (r *Reaper) LoadSources() (err error) {
   405  	if DoesEmergencyTriggerfileExist(r.Metrics.EmergencyPauseTriggerfile) {
   406  		r.logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", r.Metrics.EmergencyPauseTriggerfile)
   407  		monitoredSources = make([]*sources.SourceConn, 0)
   408  		return nil
   409  	}
   410  	if monitoredSources, err = monitoredSources.SyncFromReader(r.SourcesReaderWriter); err != nil {
   411  		return err
   412  	}
   413  	r.logger.WithField("sources", len(monitoredSources)).Info("sources refreshed")
   414  	return nil
   415  }
   416  
   417  // WriteMonitoredSources writes actively monitored DBs listing to sinks
   418  // every monitoredDbsDatastoreSyncIntervalSeconds (default 10min)
   419  func (r *Reaper) WriteMonitoredSources(ctx context.Context) {
   420  	for {
   421  		if len(monitoredDbCache) > 0 {
   422  			msms := make([]metrics.MeasurementEnvelope, 0, len(monitoredDbCache))
   423  			now := time.Now().UnixNano()
   424  
   425  			monitoredDbCacheLock.RLock()
   426  			for _, mdb := range monitoredDbCache {
   427  				db := metrics.NewMeasurement(now)
   428  				db["tag_group"] = mdb.Group
   429  				db["master_only"] = mdb.OnlyIfMaster
   430  				for k, v := range mdb.CustomTags {
   431  					db[metrics.TagPrefix+k] = v
   432  				}
   433  				msms = append(msms, metrics.MeasurementEnvelope{
   434  					DBName:     mdb.Name,
   435  					MetricName: monitoredDbsDatastoreSyncMetricName,
   436  					Data:       metrics.Measurements{db},
   437  				})
   438  			}
   439  			monitoredDbCacheLock.RUnlock()
   440  			r.measurementCh <- msms
   441  		}
   442  		select {
   443  		case <-time.After(time.Second * monitoredDbsDatastoreSyncIntervalSeconds):
   444  			// continue
   445  		case <-ctx.Done():
   446  			return
   447  		}
   448  	}
   449  }
   450  
   451  // WriteMeasurements() writes the metrics to the sinks
   452  func (r *Reaper) WriteMeasurements(ctx context.Context) {
   453  	var err error
   454  	for {
   455  		select {
   456  		case <-ctx.Done():
   457  			return
   458  		case msg := <-r.measurementCh:
   459  			if err = r.SinksWriter.Write(msg); err != nil {
   460  				r.logger.Error(err)
   461  			}
   462  		}
   463  	}
   464  }
   465  
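        // AddSysinfoToMeasurements enriches each measurement with the real database name and
        // system identifier when the corresponding sink field names are configured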
   466  func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, ver MonitoredDatabaseSettings) metrics.Measurements {
   467  	enrichedData := make(metrics.Measurements, 0)
   468  	for _, dr := range data {
   469  		if r.Sinks.RealDbnameField > "" && ver.RealDbname > "" {
   470  			old, ok := dr[r.Sinks.RealDbnameField]
   471  			if !ok || old == "" {
   472  				dr[r.Sinks.RealDbnameField] = ver.RealDbname
   473  			}
   474  		}
   475  		if r.Sinks.SystemIdentifierField > "" && ver.SystemIdentifier > "" {
   476  			old, ok := dr[r.Sinks.SystemIdentifierField]
   477  			if !ok || old == "" {
   478  				dr[r.Sinks.SystemIdentifierField] = ver.SystemIdentifier
   479  			}
   480  		}
   481  		enrichedData = append(enrichedData, dr)
   482  	}
   483  	return enrichedData
   484  }
   485  
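        // FetchMetric looks up the metric definition for the given source, executes the
        // version-appropriate SQL (or a special-purpose fetcher), and wraps the result in a
        // MeasurementEnvelope; instance-level metrics may be served from a short-lived cache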
   486  func (r *Reaper) FetchMetric(ctx context.Context, msg MetricFetchConfig, hostState map[string]map[string]string) (*metrics.MeasurementEnvelope, error) {
   487  
   488  	var dbSettings MonitoredDatabaseSettings
   489  	var dbVersion int
   490  	var err error
   491  	var sql string
   492  	var data, cachedData metrics.Measurements
   493  	var md *sources.SourceConn
   494  	var metric metrics.Metric
   495  	var fromCache bool
   496  	var cacheKey string
   497  	var ok bool
   498  
   499  	if md, err = GetMonitoredDatabaseByUniqueName(msg.DBUniqueName); err != nil {
   500  		return nil, err
   501  	}
   502  
   503  	if metric, ok = metricDefs.GetMetricDef(msg.MetricName); !ok {
   504  		return nil, metrics.ErrMetricNotFound
   505  	}
   506  
   507  	dbSettings, err = GetMonitoredDatabaseSettings(ctx, md, false)
   508  	if err != nil {
   509  		log.GetLogger(ctx).Error("failed to fetch pg version for ", msg.DBUniqueName, msg.MetricName, err)
   510  		return nil, err
   511  	}
   512  	if msg.MetricName == specialMetricDbSize || msg.MetricName == specialMetricTableStats {
   513  		if dbSettings.ExecEnv == execEnvAzureSingle && dbSettings.ApproxDBSizeB > 1e12 { // 1TB
   514  			subsMetricName := msg.MetricName + "_approx"
   515  			mvpApprox, ok := metricDefs.GetMetricDef(subsMetricName)
   516  			if ok && mvpApprox.StorageName == msg.MetricName {
   517  				log.GetLogger(ctx).Infof("[%s:%s] Transparently swapping metric to %s due to hard-coded rules...", msg.DBUniqueName, msg.MetricName, subsMetricName)
   518  				msg.MetricName = subsMetricName
   519  			}
   520  		}
   521  	}
   522  	dbVersion = dbSettings.Version
   523  
   524  	if msg.Source == sources.SourcePgBouncer {
   525  		dbVersion = 0 // by convention all pgbouncer SQL uses version 0
   526  	}
   527  
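        	// instance-level metrics can be shared between databases of the same instance,
        	// so the cache key is built from system identifier, host, port and metric name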
   528  	if metric.IsInstanceLevel && r.Metrics.InstanceLevelCacheMaxSeconds > 0 && msg.Interval < r.Metrics.CacheAge() {
   529  		cacheKey = fmt.Sprintf("%s:%s:%d:%s",
   530  			dbSettings.SystemIdentifier,
   531  			md.ConnConfig.ConnConfig.Host,
   532  			md.ConnConfig.ConnConfig.Port,
   533  			msg.MetricName)
   534  	}
   535  	if cachedData = r.measurementCache.Get(cacheKey, r.Metrics.CacheAge()); len(cachedData) > 0 {
   536  		fromCache = true
   537  		goto send_to_storageChannel
   538  	}
   539  
   540  	sql = metric.GetSQL(dbVersion)
   541  	if sql == "" && !(msg.MetricName == specialMetricChangeEvents || msg.MetricName == recoMetricName) {
   542  		// let's ignore dummy SQLs
   543  		log.GetLogger(ctx).Debugf("[%s:%s] Ignoring fetch message - got an empty/dummy SQL string", msg.DBUniqueName, msg.MetricName)
   544  		return nil, nil
   545  	}
   546  
   547  	if (metric.PrimaryOnly() && dbSettings.IsInRecovery) || (metric.StandbyOnly() && !dbSettings.IsInRecovery) {
   548  		log.GetLogger(ctx).Debugf("[%s:%s] Skipping fetch as server is not in the required state (IsInRecovery=%v)", msg.DBUniqueName, msg.MetricName, dbSettings.IsInRecovery)
   549  		return nil, nil
   550  	}
   551  
   552  	if msg.MetricName == specialMetricChangeEvents { // special handling, multiple queries + stateful
   553  		r.CheckForPGObjectChangesAndStore(ctx, msg.DBUniqueName, dbSettings, hostState) // TODO no hostState for Prometheus currently
   554  	} else if msg.MetricName == recoMetricName {
   555  		if data, err = GetRecommendations(ctx, msg.DBUniqueName, dbSettings); err != nil {
   556  			return nil, err
   557  		}
   558  	} else if msg.Source == sources.SourcePgPool {
   559  		if data, err = FetchMetricsPgpool(ctx, msg, dbSettings, metric); err != nil {
   560  			return nil, err
   561  		}
   562  	} else {
   563  		data, err = QueryMeasurements(ctx, msg.DBUniqueName, sql)
   564  
   565  		if err != nil {
   566  			// soften errors from metrics that expect a primary server to debug level to reduce noise
   567  			if strings.Contains(err.Error(), "recovery is in progress") {
   568  				MonitoredDatabasesSettingsLock.RLock()
   569  				ver := MonitoredDatabasesSettings[msg.DBUniqueName]
   570  				MonitoredDatabasesSettingsLock.RUnlock()
   571  				if ver.IsInRecovery {
   572  					log.GetLogger(ctx).Debugf("[%s:%s] failed to fetch metrics: %s", msg.DBUniqueName, msg.MetricName, err)
   573  					return nil, err
   574  				}
   575  			}
   576  
   577  			if msg.MetricName == specialMetricInstanceUp {
   578  				log.GetLogger(ctx).WithError(err).Debugf("[%s:%s] failed to fetch metrics. marking instance as not up", msg.DBUniqueName, msg.MetricName)
   579  				data = make(metrics.Measurements, 1)
   580  				data[0] = metrics.NewMeasurement(time.Now().UnixNano())
   581  				data[0]["is_up"] = 0 // should be updated if the "instance_up" metric definition is changed
   582  				goto send_to_storageChannel
   583  			}
   584  
   585  			log.GetLogger(ctx).
   586  				WithFields(map[string]any{"source": msg.DBUniqueName, "metric": msg.MetricName}).
   587  				WithError(err).Error("failed to fetch metrics")
   588  
   589  			return nil, err
   590  		}
   591  
   592  		log.GetLogger(ctx).WithFields(map[string]any{"source": msg.DBUniqueName, "metric": msg.MetricName, "rows": len(data)}).Info("measurements fetched")
   593  	}
   594  
   595  	r.measurementCache.Put(cacheKey, data)
   596  
   597  send_to_storageChannel:
   598  
   599  	if (r.Sinks.RealDbnameField > "" || r.Sinks.SystemIdentifierField > "") && msg.Source == sources.SourcePostgres {
   600  		MonitoredDatabasesSettingsLock.RLock()
   601  		ver := MonitoredDatabasesSettings[msg.DBUniqueName]
   602  		MonitoredDatabasesSettingsLock.RUnlock()
   603  		data = r.AddSysinfoToMeasurements(data, ver)
   604  	}
   605  
   606  	if metric.StorageName != "" {
   607  		log.GetLogger(ctx).Debugf("[%s] rerouting metric %s data to %s based on metric attributes", msg.DBUniqueName, msg.MetricName, metric.StorageName)
   608  		msg.MetricName = metric.StorageName
   609  	}
   610  	if fromCache {
   611  		log.GetLogger(ctx).Infof("[%s:%s] loaded %d rows from the instance cache", msg.DBUniqueName, msg.MetricName, len(cachedData))
   612  		return &metrics.MeasurementEnvelope{DBName: msg.DBUniqueName, MetricName: msg.MetricName, Data: cachedData, CustomTags: md.CustomTags,
   613  			MetricDef: metric, RealDbname: dbSettings.RealDbname, SystemIdentifier: dbSettings.SystemIdentifier}, nil
   614  	}
   615  	return &metrics.MeasurementEnvelope{DBName: msg.DBUniqueName, MetricName: msg.MetricName, Data: data, CustomTags: md.CustomTags,
   616  		MetricDef: metric, RealDbname: dbSettings.RealDbname, SystemIdentifier: dbSettings.SystemIdentifier}, nil
   617  
   618  }
   619