...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/reaper/reaper.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"cmp"
     5  	"context"
     6  	"fmt"
     7  	"runtime"
     8  	"slices"
     9  	"strings"
    10  	"time"
    11  
    12  	"sync/atomic"
    13  
    14  	"github.com/cybertec-postgresql/pgwatch/v5/internal/cmdopts"
    15  	"github.com/cybertec-postgresql/pgwatch/v5/internal/log"
    16  	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
    17  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
    18  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
    19  )
    20  
// Names of the metrics that are gathered by dedicated code paths in this
// file instead of (only) running the SQL of a metric definition.
const (
	specialMetricChangeEvents         = "change_events"
	specialMetricServerLogEventCounts = "server_log_event_counts"
	specialMetricInstanceUp           = "instance_up"
)

// specialMetrics lists the metrics that are always registered with the sinks
// under their own name, i.e. a configured StorageName is not applied to them.
var specialMetrics = map[string]bool{specialMetricChangeEvents: true, specialMetricServerLogEventCounts: true}

// hostLastKnownStatusInRecovery remembers, per source name, the recovery
// state observed in the previous iteration of the main loop.
var hostLastKnownStatusInRecovery = make(map[string]bool) // isInRecovery
// metricsConfig holds the metric name -> interval mapping of the source that
// is currently being processed by the main loop.
var metricsConfig map[string]float64 // set to host.Metrics or host.MetricsStandby (in case the optional standby config is defined and the host is in recovery state)
// metricDefs is the package-wide store of metric definitions.
var metricDefs = NewConcurrentMetricDefs()
    32  
// Reaper is the struct that is responsible for fetching metrics measurements
// from the sources and storing them to the sinks.
type Reaper struct {
	*cmdopts.Options
	// ready is set to true by Reap once the writer goroutines have been started
	ready atomic.Bool
	// measurementCh carries measurement envelopes from the gatherers to WriteMeasurements
	measurementCh chan metrics.MeasurementEnvelope
	// measurementCache is consulted by FetchMetric before querying the source
	measurementCache *InstanceMetricCache
	logger           log.Logger
	// monitoredSources is the list of sources produced by the latest LoadSources call
	monitoredSources sources.SourceConns
	// prevLoopMonitoredDBs is a copy of monitoredSources taken at the end of the previous main loop iteration
	prevLoopMonitoredDBs sources.SourceConns
	// cancelFuncs stops running gatherers; keys are source name + dbMetricJoinStr + metric name
	cancelFuncs map[string]context.CancelFunc
}
    44  
    45  // NewReaper creates a new Reaper instance
    46  func NewReaper(ctx context.Context, opts *cmdopts.Options) (r *Reaper) {
    47  	return &Reaper{
    48  		Options:              opts,
    49  		measurementCh:        make(chan metrics.MeasurementEnvelope, 256),
    50  		measurementCache:     NewInstanceMetricCache(),
    51  		logger:               log.GetLogger(ctx),
    52  		monitoredSources:     make(sources.SourceConns, 0),
    53  		prevLoopMonitoredDBs: make(sources.SourceConns, 0),
    54  		cancelFuncs:          make(map[string]context.CancelFunc), // [db1+metric1]cancel()
    55  	}
    56  }
    57  
    58  // Ready() returns true if the service is healthy and operating correctly
    59  func (r *Reaper) Ready() bool {
    60  	return r.ready.Load()
    61  }
    62  
    63  func (r *Reaper) PrintMemStats() {
    64  	var m runtime.MemStats
    65  	runtime.ReadMemStats(&m)
    66  
    67  	bToKb := func(b uint64) uint64 {
    68  		return b / 1024
    69  	}
    70  	r.logger.Debugf("Alloc: %d Kb, TotalAlloc: %d Kb, Sys: %d Kb, NumGC: %d, HeapAlloc: %d Kb, HeapSys: %d Kb",
    71  		bToKb(m.Alloc), bToKb(m.TotalAlloc), bToKb(m.Sys), m.NumGC, bToKb(m.HeapAlloc), bToKb(m.HeapSys))
    72  }
    73  
    74  // Reap() starts the main monitoring loop. It is responsible for fetching metrics measurements
    75  // from the sources and storing them to the sinks. It also manages the lifecycle of
    76  // the metric gatherers. In case of a source or metric definition change, it will
    77  // start or stop the gatherers accordingly.
    78  func (r *Reaper) Reap(ctx context.Context) {
    79  	var err error
    80  	logger := r.logger
    81  
    82  	go r.WriteMeasurements(ctx)
    83  	go r.WriteMonitoredSources(ctx)
    84  
    85  	r.ready.Store(true)
    86  
    87  	for { //main loop
    88  		if r.Logging.LogLevel == "debug" {
    89  			r.PrintMemStats()
    90  		}
    91  		if err = r.LoadSources(ctx); err != nil {
    92  			logger.WithError(err).Error("could not refresh active sources, using last valid cache")
    93  		}
    94  		if err = r.LoadMetrics(); err != nil {
    95  			logger.WithError(err).Error("could not refresh metric definitions, using last valid cache")
    96  		}
    97  
    98  		// UpdateMonitoredDBCache(r.monitoredSources)
    99  		hostsToShutDownDueToRoleChange := make(map[string]bool) // hosts went from master to standby and have "only if master" set
   100  		for _, monitoredSource := range r.monitoredSources {
   101  			srcL := logger.WithField("source", monitoredSource.Name)
   102  			ctx = log.WithLogger(ctx, srcL)
   103  
   104  			if monitoredSource.Connect(ctx, r.Sources) != nil {
   105  				srcL.Warning("could not init connection, retrying on next iteration")
   106  				continue
   107  			}
   108  
   109  			if err = monitoredSource.FetchRuntimeInfo(ctx, true); err != nil {
   110  				srcL.WithError(err).Error("could not start metric gathering")
   111  				continue
   112  			}
   113  			srcL.WithField("recovery", monitoredSource.IsInRecovery).Infof("Connect OK. Version: %s", monitoredSource.VersionStr)
   114  			if monitoredSource.IsInRecovery && monitoredSource.OnlyIfMaster {
   115  				srcL.Info("not added to monitoring due to 'master only' property")
   116  				if monitoredSource.IsPostgresSource() {
   117  					srcL.Info("to be removed from monitoring due to 'master only' property and status change")
   118  					hostsToShutDownDueToRoleChange[monitoredSource.Name] = true
   119  				}
   120  				continue
   121  			}
   122  
   123  			if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
   124  				metricsConfig = monitoredSource.MetricsStandby
   125  			} else {
   126  				metricsConfig = monitoredSource.Metrics
   127  			}
   128  
   129  			r.CreateSourceHelpers(ctx, srcL, monitoredSource)
   130  
   131  			if monitoredSource.IsPostgresSource() {
   132  				DBSizeMB := monitoredSource.ApproxDbSize / 1048576 // only remove from monitoring when we're certain it's under the threshold
   133  				if DBSizeMB != 0 && DBSizeMB < r.Sources.MinDbSizeMB {
   134  					srcL.Infof("ignored due to the --min-db-size-mb filter, current size %d MB", DBSizeMB)
   135  					hostsToShutDownDueToRoleChange[monitoredSource.Name] = true // for the case when DB size was previosly above the threshold
   136  					continue
   137  				}
   138  
   139  				lastKnownStatusInRecovery := hostLastKnownStatusInRecovery[monitoredSource.Name]
   140  				if lastKnownStatusInRecovery != monitoredSource.IsInRecovery {
   141  					if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
   142  						srcL.Warning("Switching metrics collection to standby config...")
   143  						metricsConfig = monitoredSource.MetricsStandby
   144  					} else if !monitoredSource.IsInRecovery {
   145  						srcL.Warning("Switching metrics collection to primary config...")
   146  						metricsConfig = monitoredSource.Metrics
   147  					}
   148  					// else: it already has primary config do nothing + no warn
   149  				}
   150  			}
   151  			hostLastKnownStatusInRecovery[monitoredSource.Name] = monitoredSource.IsInRecovery
   152  
   153  			for metricName, interval := range metricsConfig {
   154  				metricDefExists := false
   155  				var mvp metrics.Metric
   156  
   157  				mvp, metricDefExists = metricDefs.GetMetricDef(metricName)
   158  
   159  				dbMetric := monitoredSource.Name + dbMetricJoinStr + metricName
   160  				_, cancelFuncExists := r.cancelFuncs[dbMetric]
   161  
   162  				if metricDefExists && !cancelFuncExists { // initialize a new per db/per metric control channel
   163  					if interval > 0 {
   164  						srcL.WithField("metric", metricName).WithField("interval", interval).Info("starting gatherer")
   165  						metricCtx, cancelFunc := context.WithCancel(ctx)
   166  						r.cancelFuncs[dbMetric] = cancelFunc
   167  
   168  						metricNameForStorage := metricName
   169  						if _, isSpecialMetric := specialMetrics[metricName]; !isSpecialMetric && mvp.StorageName > "" {
   170  							metricNameForStorage = mvp.StorageName
   171  						}
   172  
   173  						if err := r.SinksWriter.SyncMetric(monitoredSource.Name, metricNameForStorage, sinks.AddOp); err != nil {
   174  							srcL.Error(err)
   175  						}
   176  
   177  						go r.reapMetricMeasurements(metricCtx, monitoredSource, metricName)
   178  					}
   179  				} else if (!metricDefExists && cancelFuncExists) || interval <= 0 {
   180  					// metric definition files were recently removed or interval set to zero
   181  					if cancelFunc, isOk := r.cancelFuncs[dbMetric]; isOk {
   182  						cancelFunc()
   183  					}
   184  					srcL.WithField("metric", metricName).Warning("shutting down gatherer...")
   185  					delete(r.cancelFuncs, dbMetric)
   186  				} else if !metricDefExists {
   187  					epoch, ok := lastSQLFetchError.Load(metricName)
   188  					if !ok || ((time.Now().Unix() - epoch.(int64)) > 3600) { // complain only 1x per hour
   189  						srcL.WithField("metric", metricName).Warning("metric definition not found")
   190  						lastSQLFetchError.Store(metricName, time.Now().Unix())
   191  					}
   192  				}
   193  			}
   194  		}
   195  
   196  		r.ShutdownOldWorkers(ctx, hostsToShutDownDueToRoleChange)
   197  
   198  		r.prevLoopMonitoredDBs = slices.Clone(r.monitoredSources)
   199  		select {
   200  		case <-time.After(time.Second * time.Duration(r.Sources.Refresh)):
   201  			logger.Debugf("wake up after %d seconds", r.Sources.Refresh)
   202  		case <-ctx.Done():
   203  			return
   204  		}
   205  	}
   206  }
   207  
   208  // CreateSourceHelpers creates the extensions and metric helpers for the monitored source
   209  func (r *Reaper) CreateSourceHelpers(ctx context.Context, srcL log.Logger, monitoredSource *sources.SourceConn) {
   210  	if r.prevLoopMonitoredDBs.GetMonitoredDatabase(monitoredSource.Name) != nil {
   211  		return // already created
   212  	}
   213  	if !monitoredSource.IsPostgresSource() || monitoredSource.IsInRecovery {
   214  		return // no need to create anything for non-postgres sources
   215  	}
   216  
   217  	if r.Sources.TryCreateListedExtsIfMissing > "" {
   218  		srcL.Info("trying to create extensions if missing")
   219  		extsToCreate := strings.Split(r.Sources.TryCreateListedExtsIfMissing, ",")
   220  		monitoredSource.RLock()
   221  		extsCreated := TryCreateMissingExtensions(ctx, monitoredSource, extsToCreate, monitoredSource.Extensions)
   222  		monitoredSource.RUnlock()
   223  		srcL.Infof("%d/%d extensions created based on --try-create-listed-exts-if-missing input %v", len(extsCreated), len(extsToCreate), extsCreated)
   224  	}
   225  
   226  	if r.Sources.CreateHelpers {
   227  		srcL.Info("trying to create helper objects if missing")
   228  		if err := TryCreateMetricsFetchingHelpers(ctx, monitoredSource); err != nil {
   229  			srcL.WithError(err).Warning("failed to create helper functions")
   230  		}
   231  	}
   232  
   233  }
   234  
   235  func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDown map[string]bool) {
   236  	logger := r.logger
   237  	// loop over existing channels and stop workers if DB or metric removed from config
   238  	// or state change makes it uninteresting
   239  	logger.Debug("checking if any workers need to be shut down...")
   240  	for dbMetric, cancelFunc := range r.cancelFuncs {
   241  		var currentMetricConfig map[string]float64
   242  		var md *sources.SourceConn
   243  		var dbRemovedFromConfig bool
   244  		var metricRemovedFromPreset bool
   245  		splits := strings.Split(dbMetric, dbMetricJoinStr)
   246  		db := splits[0]
   247  		metric := splits[1]
   248  
   249  		_, wholeDbShutDown := hostsToShutDown[db]
   250  		if !wholeDbShutDown {
   251  			md = r.monitoredSources.GetMonitoredDatabase(db)
   252  			if md == nil { // normal removing of DB from config
   253  				dbRemovedFromConfig = true
   254  				logger.Debugf("DB %s removed from config, shutting down all metric worker processes...", db)
   255  			}
   256  		}
   257  
   258  		// Detects metrics removed from a preset definition.
   259  		//
   260  		// If not using presets, a metric removed from configs will
   261  		// be detected earlier by `LoadSources()` as configs change that 
   262  		// triggers a restart and get passed in `hostsToShutDown`.
   263  		if !(wholeDbShutDown || dbRemovedFromConfig) {
   264  			if md.IsInRecovery && len(md.MetricsStandby) > 0 {
   265  				currentMetricConfig = md.MetricsStandby
   266  			} else {
   267  				currentMetricConfig = md.Metrics
   268  			}
   269  			interval, isMetricActive := currentMetricConfig[metric]
   270  			metricRemovedFromPreset = !isMetricActive || interval <= 0
   271  		}
   272  
   273  		if ctx.Err() != nil || wholeDbShutDown || dbRemovedFromConfig || metricRemovedFromPreset {
   274  			logger.WithField("source", db).WithField("metric", metric).Info("stopping gatherer...")
   275  			cancelFunc()
   276  			delete(r.cancelFuncs, dbMetric)
   277  			if err := r.SinksWriter.SyncMetric(db, metric, sinks.DeleteOp); err != nil {
   278  				logger.Error(err)
   279  			}
   280  		}
   281  	}
   282  
   283  	// Destroy conn pools and metric writers
   284  	r.CloseResourcesForRemovedMonitoredDBs(hostsToShutDown)
   285  }
   286  
   287  func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceConn, metricName string) {
   288  	var lastUptimeS int64 = -1 // used for "server restarted" event detection
   289  	var lastErrorNotificationTime time.Time
   290  	var err error
   291  	var ok bool
   292  
   293  	failedFetches := 0
   294  	lastDBVersionFetchTime := time.Unix(0, 0) // check DB ver. ev. 5 min
   295  
   296  	l := log.GetLogger(ctx).WithField("metric", metricName)
   297  	ctx = log.WithLogger(ctx, l)
   298  
   299  	if metricName == specialMetricServerLogEventCounts {
   300  		lp, err := NewLogParser(ctx, md, r.measurementCh)
   301  		if err != nil {
   302  			l.WithError(err).Error("Failed to init log parser")
   303  			return
   304  		}
   305  		err = lp.ParseLogs()
   306  		if err != nil {
   307  			l.WithError(err).Error("Error parsing logs")
   308  		}
   309  		return
   310  	}
   311  
   312  	for {
   313  		interval := md.GetMetricInterval(metricName)
   314  		if lastDBVersionFetchTime.Add(time.Minute * time.Duration(5)).Before(time.Now()) {
   315  			// in case of errors just ignore metric "disabled" time ranges
   316  			if err = md.FetchRuntimeInfo(ctx, false); err != nil {
   317  				lastDBVersionFetchTime = time.Now()
   318  			}
   319  
   320  			if _, ok = metricDefs.GetMetricDef(metricName); !ok {
   321  				l.Error("Could not get metric version properties")
   322  				return
   323  			}
   324  		}
   325  
   326  		var metricStoreMessages *metrics.MeasurementEnvelope
   327  
   328  		t1 := time.Now()
   329  		// 1st try local overrides for system metrics if operating on the same host
   330  		if IsDirectlyFetchableMetric(md, metricName) {
   331  			if metricStoreMessages, err = r.FetchStatsDirectlyFromOS(ctx, md, metricName); err != nil {
   332  				l.WithError(err).Errorf("Could not read metric directly from OS")
   333  			}
   334  		}
   335  		if metricStoreMessages == nil {
   336  			metricStoreMessages, err = r.FetchMetric(ctx, md, metricName)
   337  		}
   338  
   339  		if time.Since(t1) > (time.Second * time.Duration(interval)) {
   340  			l.Warningf("Total fetching time of %v bigger than %vs interval", time.Since(t1), interval)
   341  		}
   342  
   343  		if err != nil {
   344  			failedFetches++
   345  			// complain only 1x per 10min per host/metric...
   346  			if time.Since(lastErrorNotificationTime) > time.Minute*10 {
   347  				l.WithError(err).WithField("count", failedFetches).Error("failed to fetch metric data")
   348  				lastErrorNotificationTime = time.Now()
   349  			}
   350  		} else if metricStoreMessages != nil && len(metricStoreMessages.Data) > 0 {
   351  			r.measurementCh <- *metricStoreMessages
   352  			// pick up "server restarted" events here to avoid doing extra selects from CheckForPGObjectChangesAndStore code
   353  			if metricName == "db_stats" {
   354  				postmasterUptimeS, ok := (metricStoreMessages.Data)[0]["postmaster_uptime_s"]
   355  				if ok {
   356  					if lastUptimeS != -1 {
   357  						if postmasterUptimeS.(int64) < lastUptimeS { // restart (or possibly also failover when host is routed) happened
   358  							message := "Detected server restart (or failover)"
   359  							l.Warning(message)
   360  							detectedChangesSummary := make(metrics.Measurements, 0)
   361  							entry := metrics.NewMeasurement(metricStoreMessages.Data.GetEpoch())
   362  							entry["details"] = message
   363  							detectedChangesSummary = append(detectedChangesSummary, entry)
   364  							r.measurementCh <- metrics.MeasurementEnvelope{
   365  								DBName:     md.Name,
   366  								MetricName: "object_changes",
   367  								Data:       detectedChangesSummary,
   368  								CustomTags: metricStoreMessages.CustomTags,
   369  							}
   370  						}
   371  					}
   372  					lastUptimeS = postmasterUptimeS.(int64)
   373  				}
   374  			}
   375  		}
   376  
   377  		select {
   378  		case <-ctx.Done():
   379  			return
   380  		case <-time.After(time.Second * time.Duration(interval)):
   381  			// continue
   382  		}
   383  	}
   384  }
   385  
   386  // LoadSources loads sources from the reader
   387  func (r *Reaper) LoadSources(ctx context.Context) (err error) {
   388  	if DoesEmergencyTriggerfileExist(r.Metrics.EmergencyPauseTriggerfile) {
   389  		r.logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", r.Metrics.EmergencyPauseTriggerfile)
   390  		r.monitoredSources = make(sources.SourceConns, 0)
   391  		return nil
   392  	}
   393  
   394  	var newSrcs sources.SourceConns
   395  	srcs, err := r.SourcesReaderWriter.GetSources()
   396  	if err != nil {
   397  		return err
   398  	}
   399  	srcs = slices.DeleteFunc(srcs, func(s sources.Source) bool {
   400  		return !s.IsEnabled || len(r.Sources.Groups) > 0 && !slices.Contains(r.Sources.Groups, s.Group)
   401  	})
   402  	if newSrcs, err = srcs.ResolveDatabases(); err != nil {
   403  		r.logger.WithError(err).Error("could not resolve databases from sources")
   404  	}
   405  
   406  	for i, newMD := range newSrcs {
   407  		md := r.monitoredSources.GetMonitoredDatabase(newMD.Name)
   408  		if md == nil {
   409  			continue
   410  		}
   411  		if md.Equal(newMD.Source) {
   412  			// replace with the existing connection if the source is the same
   413  			newSrcs[i] = md
   414  			continue
   415  		}
   416  		// Source configs changed, stop all running gatherers to trigger a restart
   417  		// TODO: Optimize this for single metric addition/deletion/interval-change cases to not do a full restart
   418  		r.logger.WithField("source", md.Name).Info("Source configs changed, restarting all gatherers...")
   419  		r.ShutdownOldWorkers(ctx, map[string]bool{md.Name: true})
   420  	}
   421  	r.monitoredSources = newSrcs
   422  	r.logger.WithField("sources", len(r.monitoredSources)).Info("sources refreshed")
   423  	return nil
   424  }
   425  
   426  // WriteMonitoredSources writes actively monitored DBs listing to sinks
   427  // every monitoredDbsDatastoreSyncIntervalSeconds (default 10min)
   428  func (r *Reaper) WriteMonitoredSources(ctx context.Context) {
   429  	for {
   430  		if len(r.monitoredSources) > 0 {
   431  			now := time.Now().UnixNano()
   432  			for _, mdb := range r.monitoredSources {
   433  				db := metrics.NewMeasurement(now)
   434  				db["tag_group"] = mdb.Group
   435  				db["master_only"] = mdb.OnlyIfMaster
   436  				for k, v := range mdb.CustomTags {
   437  					db[metrics.TagPrefix+k] = v
   438  				}
   439  				r.measurementCh <- metrics.MeasurementEnvelope{
   440  					DBName:     mdb.Name,
   441  					MetricName: monitoredDbsDatastoreSyncMetricName,
   442  					Data:       metrics.Measurements{db},
   443  				}
   444  			}
   445  		}
   446  		select {
   447  		case <-time.After(time.Second * monitoredDbsDatastoreSyncIntervalSeconds):
   448  			// continue
   449  		case <-ctx.Done():
   450  			return
   451  		}
   452  	}
   453  }
   454  
   455  // WriteMeasurements() writes the metrics to the sinks
   456  func (r *Reaper) WriteMeasurements(ctx context.Context) {
   457  	var err error
   458  	for {
   459  		select {
   460  		case <-ctx.Done():
   461  			return
   462  		case msg := <-r.measurementCh:
   463  			if err = r.SinksWriter.Write(msg); err != nil {
   464  				r.logger.Error(err)
   465  			}
   466  		}
   467  	}
   468  }
   469  
   470  func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, md *sources.SourceConn) {
   471  	for _, dr := range data {
   472  		if r.Sinks.RealDbnameField > "" && md.RealDbname > "" {
   473  			dr[r.Sinks.RealDbnameField] = md.RealDbname
   474  		}
   475  		if r.Sinks.SystemIdentifierField > "" && md.SystemIdentifier > "" {
   476  			dr[r.Sinks.SystemIdentifierField] = md.SystemIdentifier
   477  		}
   478  	}
   479  }
   480  
   481  func (r *Reaper) FetchMetric(ctx context.Context, md *sources.SourceConn, metricName string) (_ *metrics.MeasurementEnvelope, err error) {
   482  	var sql string
   483  	var data metrics.Measurements
   484  	var metric metrics.Metric
   485  	var fromCache bool
   486  	var cacheKey string
   487  	var ok bool
   488  
   489  	if metric, ok = metricDefs.GetMetricDef(metricName); !ok {
   490  		return nil, metrics.ErrMetricNotFound
   491  	}
   492  	l := log.GetLogger(ctx)
   493  
   494  	if metric.IsInstanceLevel && r.Metrics.InstanceLevelCacheMaxSeconds > 0 && time.Second*time.Duration(md.GetMetricInterval(metricName)) < r.Metrics.CacheAge() {
   495  		cacheKey = fmt.Sprintf("%s:%s", md.GetClusterIdentifier(), metricName)
   496  	}
   497  	data = r.measurementCache.Get(cacheKey, r.Metrics.CacheAge())
   498  	fromCache = len(data) > 0
   499  	if !fromCache {
   500  		if (metric.PrimaryOnly() && md.IsInRecovery) || (metric.StandbyOnly() && !md.IsInRecovery) {
   501  			l.Debug("Skipping fetching of as server in wrong IsInRecovery: ", md.IsInRecovery)
   502  			return nil, nil
   503  		}
   504  		switch metricName {
   505  		case specialMetricChangeEvents:
   506  			data, err = r.GetObjectChangesMeasurement(ctx, md)
   507  		case specialMetricInstanceUp:
   508  			data, err = r.GetInstanceUpMeasurement(ctx, md)
   509  		default:
   510  			sql = metric.GetSQL(md.Version)
   511  			if sql == "" {
   512  				l.Warning("Ignoring fetching because of empty SQL")
   513  				return nil, nil
   514  			}
   515  			data, err = QueryMeasurements(ctx, md, sql)
   516  		}
   517  		if err != nil || len(data) == 0 {
   518  			return nil, err
   519  		}
   520  		r.measurementCache.Put(cacheKey, data)
   521  	}
   522  	r.AddSysinfoToMeasurements(data, md)
   523  	l.WithField("cache", fromCache).WithField("rows", len(data)).Info("measurements fetched")
   524  	return &metrics.MeasurementEnvelope{
   525  		DBName:     md.Name,
   526  		MetricName: cmp.Or(metric.StorageName, metricName),
   527  		Data:       data,
   528  		CustomTags: md.CustomTags}, nil
   529  }
   530