package reaper

import (
	"context"
	"fmt"
	"slices"
	"strings"
	"sync/atomic"
	"time"

	"github.com/cybertec-postgresql/pgwatch/v3/internal/cmdopts"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
)

var monitoredSources = make(sources.SourceConns, 0)
var hostLastKnownStatusInRecovery = make(map[string]bool)
var metricConfig map[string]float64
var metricDefs = NewConcurrentMetricDefs()

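// Reaper coordinates the monitoring work: it keeps a gatherer goroutine
// running per active source/metric pair and forwards the collected
// measurements to the configured sink writer.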
type Reaper struct {
	*cmdopts.Options
	ready            atomic.Bool
	measurementCh    chan []metrics.MeasurementEnvelope
	measurementCache *InstanceMetricCache
	logger           log.LoggerIface
}

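// NewReaper returns a Reaper wired to the given options, with a buffered
// measurement channel and an empty instance-level cache. A minimal usage
// sketch (the exact wiring lives in the caller and may differ):
//
//	r, err := NewReaper(ctx, opts)
//	if err != nil {
//		return err
//	}
//	go func() { _ = r.Reap(ctx) }()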
func NewReaper(ctx context.Context, opts *cmdopts.Options) (r *Reaper, err error) {
	r = &Reaper{
		Options:          opts,
		measurementCh:    make(chan []metrics.MeasurementEnvelope, 10000),
		measurementCache: NewInstanceMetricCache(),
		logger:           log.GetLogger(ctx),
	}
	return r, nil
}

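// Ready reports whether the main Reap loop has been started.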
func (r *Reaper) Ready() bool {
	return r.ready.Load()
}

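// Reap is the main loop. On every iteration it refreshes source and metric
// definitions, (re)connects to sources, starts a gatherer goroutine per
// active source/metric pair, and stops gatherers whose source was removed,
// changed its primary/standby role, or had the metric disabled. It only
// returns once ctx is cancelled.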
func (r *Reaper) Reap(ctx context.Context) (err error) {
	cancelFuncs := make(map[string]context.CancelFunc)

	mainLoopCount := 0
	logger := r.logger

	go r.WriteMeasurements(ctx)
	go r.WriteMonitoredSources(ctx)

	r.ready.Store(true)

	for {
		if err = r.LoadSources(); err != nil {
			logger.WithError(err).Error("could not refresh active sources, using last valid cache")
		}
		if err = r.LoadMetrics(); err != nil {
			logger.WithError(err).Error("could not refresh metric definitions, using last valid cache")
		}

		UpdateMonitoredDBCache(monitoredSources)
		hostsToShutDownDueToRoleChange := make(map[string]bool)
		for _, monitoredSource := range monitoredSources {
			srcL := logger.WithField("source", monitoredSource.Name)

			if err = monitoredSource.Connect(ctx, r.Sources); err != nil {
				srcL.WithError(err).Warning("could not init connection, retrying on next iteration")
				continue
			}

			InitPGVersionInfoFetchingLockIfNil(monitoredSource)

			var dbSettings MonitoredDatabaseSettings

			dbSettings, err = GetMonitoredDatabaseSettings(ctx, monitoredSource, true)
			if err != nil {
				srcL.WithError(err).Error("could not start metric gathering")
				continue
			}
			srcL.WithField("recovery", dbSettings.IsInRecovery).Infof("Connect OK. Version: %s", dbSettings.VersionStr)
			if dbSettings.IsInRecovery && monitoredSource.OnlyIfMaster {
				srcL.Info("not added to monitoring due to 'master only' property")
				continue
			}
			metricConfig = func() map[string]float64 {
				if len(monitoredSource.Metrics) > 0 {
					return monitoredSource.Metrics
				}
				if monitoredSource.PresetMetrics > "" {
					return metricDefs.GetPresetMetrics(monitoredSource.PresetMetrics)
				}
				return nil
			}()
			hostLastKnownStatusInRecovery[monitoredSource.Name] = dbSettings.IsInRecovery
			if dbSettings.IsInRecovery {
				metricConfig = func() map[string]float64 {
					if len(monitoredSource.MetricsStandby) > 0 {
						return monitoredSource.MetricsStandby
					}
					if monitoredSource.PresetMetricsStandby > "" {
						return metricDefs.GetPresetMetrics(monitoredSource.PresetMetricsStandby)
					}
					return nil
				}()
			}

			if monitoredSource.IsPostgresSource() && !dbSettings.IsInRecovery && r.Metrics.CreateHelpers {
				srcL.Info("trying to create helper objects if missing")
				if err = TryCreateMetricsFetchingHelpers(ctx, monitoredSource); err != nil {
					srcL.WithError(err).Warning("failed to create helper functions")
				}
			}

			if monitoredSource.IsPostgresSource() {
				var DBSizeMB int64

				if r.Sources.MinDbSizeMB >= 8 { // an empty PostgreSQL database is already ~8 MB
					DBSizeMB, _ = DBGetSizeMB(ctx, monitoredSource.Name)
					if DBSizeMB != 0 {
						if DBSizeMB < r.Sources.MinDbSizeMB {
							srcL.Infof("ignored due to the --min-db-size-mb filter, current size %d MB", DBSizeMB)
							hostsToShutDownDueToRoleChange[monitoredSource.Name] = true // reuse the shutdown path to stop any running gatherers
							continue
						}
					}
				}
				ver, err := GetMonitoredDatabaseSettings(ctx, monitoredSource, false)
				if err == nil {
					lastKnownStatusInRecovery := hostLastKnownStatusInRecovery[monitoredSource.Name]
					if ver.IsInRecovery && monitoredSource.OnlyIfMaster {
						srcL.Info("to be removed from monitoring due to 'master only' property and status change")
						hostsToShutDownDueToRoleChange[monitoredSource.Name] = true
						continue
					} else if lastKnownStatusInRecovery != ver.IsInRecovery {
						if ver.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
							srcL.Warning("Switching metrics collection to standby config...")
							metricConfig = monitoredSource.MetricsStandby
							hostLastKnownStatusInRecovery[monitoredSource.Name] = true
						} else {
							srcL.Warning("Switching metrics collection to primary config...")
							metricConfig = monitoredSource.Metrics
							hostLastKnownStatusInRecovery[monitoredSource.Name] = false
						}
					}
				}

				if mainLoopCount == 0 && r.Sources.TryCreateListedExtsIfMissing != "" && !ver.IsInRecovery {
					extsToCreate := strings.Split(r.Sources.TryCreateListedExtsIfMissing, ",")
					extsCreated := TryCreateMissingExtensions(ctx, monitoredSource.Name, extsToCreate, ver.Extensions)
					srcL.Infof("%d/%d extensions created based on --try-create-listed-exts-if-missing input %v", len(extsCreated), len(extsToCreate), extsCreated)
				}
			}

			for metricName, interval := range metricConfig {
				metric := metricName
				metricDefOk := false
				var mvp metrics.Metric

				if strings.HasPrefix(metric, recoPrefix) {
					metric = recoMetricName
					metricDefOk = true
				} else {
					mvp, metricDefOk = metricDefs.GetMetricDef(metric)
				}

				dbMetric := monitoredSource.Name + dbMetricJoinStr + metric
				_, chOk := cancelFuncs[dbMetric]

				if metricDefOk && !chOk {
					if interval > 0 {
						srcL.WithField("metric", metric).WithField("interval", interval).Info("starting gatherer")
						metricCtx, cancelFunc := context.WithCancel(ctx)
						cancelFuncs[dbMetric] = cancelFunc

						metricNameForStorage := metricName
						if _, isSpecialMetric := specialMetrics[metricName]; !isSpecialMetric {
							metricNameForStorage = mvp.StorageName
						}

						if err := r.SinksWriter.SyncMetric(monitoredSource.Name, metricNameForStorage, "add"); err != nil {
							srcL.Error(err)
						}

						// pass the ranged interval - for "reco*" metrics the config key differs from the normalized metric name
						go r.reapMetricMeasurements(metricCtx, monitoredSource, metric, interval)
					}
				} else if (!metricDefOk && chOk) || interval <= 0 {
					// metric definition disappeared or the metric was disabled - stop the gatherer
					if cancelFunc, isOk := cancelFuncs[dbMetric]; isOk {
						cancelFunc()
					}
					srcL.WithField("metric", metric).Warning("shutting down gatherer...")
					delete(cancelFuncs, dbMetric)
				} else if !metricDefOk {
					epoch, ok := lastSQLFetchError.Load(metric)
					if !ok || ((time.Now().Unix() - epoch.(int64)) > 3600) { // complain at most once an hour
						srcL.WithField("metric", metric).Warning("metric definition not found")
						lastSQLFetchError.Store(metric, time.Now().Unix())
					}
				}
			}
		}

		if mainLoopCount == 0 {
			goto MainLoopSleep
		}

		logger.Debug("checking if any workers need to be shut down...")
		for dbMetric, cancelFunc := range cancelFuncs {
			var currentMetricConfig map[string]float64
			var dbInfo *sources.SourceConn
			var ok, dbRemovedFromConfig bool
			singleMetricDisabled := false
			splits := strings.Split(dbMetric, dbMetricJoinStr)
			db := splits[0]
			metric := splits[1]

			_, wholeDbShutDownDueToRoleChange := hostsToShutDownDueToRoleChange[db]
			if !wholeDbShutDownDueToRoleChange {
				monitoredDbCacheLock.RLock()
				dbInfo, ok = monitoredDbCache[db]
				monitoredDbCacheLock.RUnlock()
				if !ok {
					dbRemovedFromConfig = true
					logger.Debugf("DB %s removed from config, shutting down all metric worker processes...", db)
				}
			}

			if !(wholeDbShutDownDueToRoleChange || dbRemovedFromConfig) {
				MonitoredDatabasesSettingsLock.RLock()
				verInfo, ok := MonitoredDatabasesSettings[db]
				MonitoredDatabasesSettingsLock.RUnlock()
				if !ok {
					logger.Warningf("Could not find PG version info for DB %s, skipping shutdown check of metric worker process for %s", db, metric)
					continue
				}
				if (verInfo.IsInRecovery && dbInfo.PresetMetricsStandby > "") || (!verInfo.IsInRecovery && dbInfo.PresetMetrics > "") {
					continue
				}
				if verInfo.IsInRecovery && len(dbInfo.MetricsStandby) > 0 {
					currentMetricConfig = dbInfo.MetricsStandby
				} else {
					currentMetricConfig = dbInfo.Metrics
				}

				interval, isMetricActive := currentMetricConfig[metric]
				if !isMetricActive || interval <= 0 {
					singleMetricDisabled = true
				}
			}

			if ctx.Err() != nil || wholeDbShutDownDueToRoleChange || dbRemovedFromConfig || singleMetricDisabled {
				logger.WithField("source", db).WithField("metric", metric).Info("stopping gatherer...")
				cancelFunc()
				delete(cancelFuncs, dbMetric)
				if err := r.SinksWriter.SyncMetric(db, metric, "remove"); err != nil {
					logger.Error(err)
				}
			}
		}

		CloseResourcesForRemovedMonitoredDBs(r.SinksWriter, monitoredSources, prevLoopMonitoredDBs, hostsToShutDownDueToRoleChange)

	MainLoopSleep:
		mainLoopCount++
		prevLoopMonitoredDBs = slices.Clone(monitoredSources)

		logger.Debugf("main sleeping %ds...", r.Sources.Refresh)
		select {
		case <-time.After(time.Second * time.Duration(r.Sources.Refresh)):
			// continue with the next refresh cycle
		case <-ctx.Done():
			return
		}
	}
}
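// reapMetricMeasurements is the per-source/per-metric gatherer goroutine: it
// fetches the metric every interval seconds and pushes the results onto the
// measurement channel. For "db_stats" it additionally emits an
// "object_changes" event when a drop in postmaster_uptime_s indicates a
// server restart or failover. It exits when ctx is cancelled or the metric
// definition disappears.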
func (r *Reaper) reapMetricMeasurements(ctx context.Context, mdb *sources.SourceConn, metricName string, interval float64) {
	hostState := make(map[string]map[string]string)
	var lastUptimeS int64 = -1
	var lastErrorNotificationTime time.Time
	var vme MonitoredDatabaseSettings
	var mvp metrics.Metric
	var err error
	var ok bool
	var envelopes []metrics.MeasurementEnvelope

	failedFetches := 0
	lastDBVersionFetchTime := time.Unix(0, 0)

	l := r.logger.WithField("source", mdb.Name).WithField("metric", metricName)
	if metricName == specialMetricServerLogEventCounts {
		MonitoredDatabasesSettingsLock.RLock()
		realDbname := MonitoredDatabasesSettings[mdb.Name].RealDbname
		MonitoredDatabasesSettingsLock.RUnlock()
		metrics.ParseLogs(ctx, mdb, realDbname, interval, r.measurementCh)
		return
	}

	for {
		if lastDBVersionFetchTime.Add(time.Minute * time.Duration(5)).Before(time.Now()) {
			vme, err = GetMonitoredDatabaseSettings(ctx, mdb, false)
			if err != nil {
				l.WithError(err).Error("failed to refresh database settings, using cached values")
			}
			lastDBVersionFetchTime = time.Now() // throttle settings lookups to once per 5 minutes

			mvp, ok = metricDefs.GetMetricDef(metricName)
			if !ok {
				l.Errorf("Could not get metric version properties: %s", metricName)
				return
			}
		}

		var metricStoreMessages *metrics.MeasurementEnvelope
		mfm := MetricFetchConfig{
			DBUniqueName:        mdb.Name,
			DBUniqueNameOrig:    mdb.GetDatabaseName(),
			MetricName:          metricName,
			Source:              mdb.Kind,
			Interval:            time.Second * time.Duration(interval),
			StmtTimeoutOverride: 0,
		}

		if r.Metrics.DirectOSStats && IsDirectlyFetchableMetric(metricName) {
			metricStoreMessages, err = FetchStatsDirectlyFromOS(ctx, mfm, vme, mvp)
			if err != nil {
				l.WithError(err).Error("could not fetch metric directly from OS")
			}
		}
		t1 := time.Now()
		if metricStoreMessages == nil {
			metricStoreMessages, err = r.FetchMetric(ctx, mfm, hostState)
		}
		t2 := time.Now()

		if t2.Sub(t1) > (time.Second * time.Duration(interval)) {
			l.Warningf("Total fetching time of %vs bigger than %vs interval", t2.Sub(t1).Truncate(time.Millisecond*100).Seconds(), interval)
		}

		if err != nil {
			failedFetches++
			// complain only every 10 minutes to avoid log spam on persistent failures
			if lastErrorNotificationTime.IsZero() || lastErrorNotificationTime.Add(time.Second*time.Duration(600)).Before(time.Now()) {
				l.WithError(err).Error("failed to fetch metric data")
				if failedFetches > 1 {
					l.Errorf("Total failed fetches: %d", failedFetches)
				}
				lastErrorNotificationTime = time.Now()
			}
		} else if metricStoreMessages != nil && len(metricStoreMessages.Data) > 0 {
			envelopes = append(envelopes, *metricStoreMessages)
			// "db_stats" carries postmaster uptime - a drop means a restart or failover happened
			if metricName == "db_stats" {
				postmasterUptimeS, ok := (metricStoreMessages.Data)[0]["postmaster_uptime_s"]
				if ok {
					if lastUptimeS != -1 {
						if postmasterUptimeS.(int64) < lastUptimeS {
							message := "Detected server restart (or failover) of \"" + mdb.Name + "\""
							l.Warning(message)
							detectedChangesSummary := make(metrics.Measurements, 0)
							entry := metrics.NewMeasurement(metricStoreMessages.Data.GetEpoch())
							entry["details"] = message
							detectedChangesSummary = append(detectedChangesSummary, entry)
							envelopes = append(envelopes,
								metrics.MeasurementEnvelope{
									DBName:     mdb.Name,
									SourceType: string(mdb.Kind),
									MetricName: "object_changes",
									Data:       detectedChangesSummary,
									CustomTags: metricStoreMessages.CustomTags,
								})
						}
					}
					lastUptimeS = postmasterUptimeS.(int64)
				}
			}
			r.measurementCh <- envelopes
			envelopes = nil // reset the batch so already-sent envelopes are not re-sent next iteration
		}

		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Second * time.Duration(interval)):
			// next fetch cycle
		}
	}
}
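// LoadSources refreshes the global list of monitored sources from the
// configured reader. If an emergency pause triggerfile is detected, the list
// is emptied instead, effectively suspending all monitoring.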
func (r *Reaper) LoadSources() (err error) {
	if DoesEmergencyTriggerfileExist(r.Metrics.EmergencyPauseTriggerfile) {
		r.logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", r.Metrics.EmergencyPauseTriggerfile)
		monitoredSources = make([]*sources.SourceConn, 0)
		return nil
	}
	if monitoredSources, err = monitoredSources.SyncFromReader(r.SourcesReaderWriter); err != nil {
		return err
	}
	r.logger.WithField("sources", len(monitoredSources)).Info("sources refreshed")
	return nil
}
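// WriteMonitoredSources periodically publishes the currently monitored
// sources themselves as measurements, so the sinks always know what is being
// monitored.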
func (r *Reaper) WriteMonitoredSources(ctx context.Context) {
	for {
		now := time.Now().UnixNano()
		monitoredDbCacheLock.RLock()
		msms := make([]metrics.MeasurementEnvelope, 0, len(monitoredDbCache))
		for _, mdb := range monitoredDbCache {
			db := metrics.NewMeasurement(now)
			db["tag_group"] = mdb.Group
			db["master_only"] = mdb.OnlyIfMaster
			for k, v := range mdb.CustomTags {
				db[metrics.TagPrefix+k] = v
			}
			msms = append(msms, metrics.MeasurementEnvelope{
				DBName:     mdb.Name,
				MetricName: monitoredDbsDatastoreSyncMetricName,
				Data:       metrics.Measurements{db},
			})
		}
		monitoredDbCacheLock.RUnlock()
		// send outside the lock to avoid holding it while the channel blocks
		if len(msms) > 0 {
			r.measurementCh <- msms
		}
		select {
		case <-time.After(time.Second * monitoredDbsDatastoreSyncIntervalSeconds):
			// next sync cycle
		case <-ctx.Done():
			return
		}
	}
}
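// WriteMeasurements drains the measurement channel into the sink writer
// until ctx is cancelled.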
func (r *Reaper) WriteMeasurements(ctx context.Context) {
	var err error
	for {
		select {
		case <-ctx.Done():
			return
		case msg := <-r.measurementCh:
			if err = r.SinksWriter.Write(msg); err != nil {
				r.logger.Error(err)
			}
		}
	}
}

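// AddSysinfoToMeasurements enriches measurement rows with the real database
// name and/or system identifier when the corresponding sink fields are
// configured and not yet present in the row.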
func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, ver MonitoredDatabaseSettings) metrics.Measurements {
	enrichedData := make(metrics.Measurements, 0)
	for _, dr := range data {
		if r.Sinks.RealDbnameField > "" && ver.RealDbname > "" {
			old, ok := dr[r.Sinks.RealDbnameField]
			if !ok || old == "" {
				dr[r.Sinks.RealDbnameField] = ver.RealDbname
			}
		}
		if r.Sinks.SystemIdentifierField > "" && ver.SystemIdentifier > "" {
			old, ok := dr[r.Sinks.SystemIdentifierField]
			if !ok || old == "" {
				dr[r.Sinks.SystemIdentifierField] = ver.SystemIdentifier
			}
		}
		enrichedData = append(enrichedData, dr)
	}
	return enrichedData
}

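// FetchMetric fetches one round of measurements for the given source/metric
// pair: from the instance-level cache if fresh enough, directly from the OS,
// via a special handler (change events, recommendations, pgpool), or by
// running the metric's SQL against the source database.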
func (r *Reaper) FetchMetric(ctx context.Context, msg MetricFetchConfig, hostState map[string]map[string]string) (*metrics.MeasurementEnvelope, error) {
	var dbSettings MonitoredDatabaseSettings
	var dbVersion int
	var err error
	var sql string
	var data, cachedData metrics.Measurements
	var md *sources.SourceConn
	var metric metrics.Metric
	var fromCache bool
	var cacheKey string
	var ok bool

	if md, err = GetMonitoredDatabaseByUniqueName(msg.DBUniqueName); err != nil {
		return nil, err
	}

	if metric, ok = metricDefs.GetMetricDef(msg.MetricName); !ok {
		return nil, metrics.ErrMetricNotFound
	}

	dbSettings, err = GetMonitoredDatabaseSettings(ctx, md, false)
	if err != nil {
		log.GetLogger(ctx).Error("failed to fetch pg version for ", msg.DBUniqueName, msg.MetricName, err)
		return nil, err
	}
	if msg.MetricName == specialMetricDbSize || msg.MetricName == specialMetricTableStats {
		if dbSettings.ExecEnv == execEnvAzureSingle && dbSettings.ApproxDBSizeB > 1e12 { // ~1 TB
			subsMetricName := msg.MetricName + "_approx"
			mvpApprox, ok := metricDefs.GetMetricDef(subsMetricName)
			if ok && mvpApprox.StorageName == msg.MetricName {
				log.GetLogger(ctx).Infof("[%s:%s] Transparently swapping metric to %s due to hard-coded rules...", msg.DBUniqueName, msg.MetricName, subsMetricName)
				msg.MetricName = subsMetricName
			}
		}
	}
	dbVersion = dbSettings.Version

	if msg.Source == sources.SourcePgBouncer {
		dbVersion = 0 // PgBouncer metrics are not version-specific
	}

	if metric.IsInstanceLevel && r.Metrics.InstanceLevelCacheMaxSeconds > 0 && msg.Interval < r.Metrics.CacheAge() {
		cacheKey = fmt.Sprintf("%s:%s:%d:%s",
			dbSettings.SystemIdentifier,
			md.ConnConfig.ConnConfig.Host,
			md.ConnConfig.ConnConfig.Port,
			msg.MetricName)
	}
	if cachedData = r.measurementCache.Get(cacheKey, r.Metrics.CacheAge()); len(cachedData) > 0 {
		fromCache = true
		goto send_to_storageChannel
	}

	sql = metric.GetSQL(dbVersion)
	if sql == "" && !(msg.MetricName == specialMetricChangeEvents || msg.MetricName == recoMetricName) {
		log.GetLogger(ctx).Debugf("[%s:%s] Ignoring fetch message - got an empty/dummy SQL string", msg.DBUniqueName, msg.MetricName)
		return nil, nil
	}

	if (metric.PrimaryOnly() && dbSettings.IsInRecovery) || (metric.StandbyOnly() && !dbSettings.IsInRecovery) {
		log.GetLogger(ctx).Debugf("[%s:%s] Skipping fetch as server is not in the required recovery state (IsInRecovery=%v)", msg.DBUniqueName, msg.MetricName, dbSettings.IsInRecovery)
		return nil, nil
	}

	if msg.MetricName == specialMetricChangeEvents {
		r.CheckForPGObjectChangesAndStore(ctx, msg.DBUniqueName, dbSettings, hostState)
	} else if msg.MetricName == recoMetricName {
		if data, err = GetRecommendations(ctx, msg.DBUniqueName, dbSettings); err != nil {
			return nil, err
		}
	} else if msg.Source == sources.SourcePgPool {
		if data, err = FetchMetricsPgpool(ctx, msg, dbSettings, metric); err != nil {
			return nil, err
		}
	} else {
		data, err = QueryMeasurements(ctx, msg.DBUniqueName, sql)

		if err != nil {
			// "recovery is in progress" is expected on standbys - log at debug level only
			if strings.Contains(err.Error(), "recovery is in progress") {
				MonitoredDatabasesSettingsLock.RLock()
				ver := MonitoredDatabasesSettings[msg.DBUniqueName]
				MonitoredDatabasesSettingsLock.RUnlock()
				if ver.IsInRecovery {
					log.GetLogger(ctx).Debugf("[%s:%s] failed to fetch metrics: %s", msg.DBUniqueName, msg.MetricName, err)
					return nil, err
				}
			}

			if msg.MetricName == specialMetricInstanceUp {
				log.GetLogger(ctx).WithError(err).Debugf("[%s:%s] failed to fetch metrics. marking instance as not up", msg.DBUniqueName, msg.MetricName)
				data = make(metrics.Measurements, 1)
				data[0] = metrics.NewMeasurement(time.Now().UnixNano())
				data[0]["is_up"] = 0
				goto send_to_storageChannel
			}

			log.GetLogger(ctx).
				WithFields(map[string]any{"source": msg.DBUniqueName, "metric": msg.MetricName}).
				WithError(err).Error("failed to fetch metrics")

			return nil, err
		}

		log.GetLogger(ctx).WithFields(map[string]any{"source": msg.DBUniqueName, "metric": msg.MetricName, "rows": len(data)}).Info("measurements fetched")
	}

	r.measurementCache.Put(cacheKey, data)

send_to_storageChannel:

	if (r.Sinks.RealDbnameField > "" || r.Sinks.SystemIdentifierField > "") && msg.Source == sources.SourcePostgres {
		MonitoredDatabasesSettingsLock.RLock()
		ver := MonitoredDatabasesSettings[msg.DBUniqueName]
		MonitoredDatabasesSettingsLock.RUnlock()
		data = r.AddSysinfoToMeasurements(data, ver)
	}

	if metric.StorageName != "" {
		log.GetLogger(ctx).Debugf("[%s] rerouting metric %s data to %s based on metric attributes", msg.DBUniqueName, msg.MetricName, metric.StorageName)
		msg.MetricName = metric.StorageName
	}
	if fromCache {
		log.GetLogger(ctx).Infof("[%s:%s] loaded %d rows from the instance cache", msg.DBUniqueName, msg.MetricName, len(cachedData))
		return &metrics.MeasurementEnvelope{DBName: msg.DBUniqueName, MetricName: msg.MetricName, Data: cachedData, CustomTags: md.CustomTags,
			MetricDef: metric, RealDbname: dbSettings.RealDbname, SystemIdentifier: dbSettings.SystemIdentifier}, nil
	}
	return &metrics.MeasurementEnvelope{DBName: msg.DBUniqueName, MetricName: msg.MetricName, Data: data, CustomTags: md.CustomTags,
		MetricDef: metric, RealDbname: dbSettings.RealDbname, SystemIdentifier: dbSettings.SystemIdentifier}, nil
}