...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/reaper/reaper.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"cmp"
     5  	"context"
     6  	"fmt"
     7  	"slices"
     8  	"strings"
     9  	"time"
    10  
    11  	"sync/atomic"
    12  
    13  	"github.com/cybertec-postgresql/pgwatch/v3/internal/cmdopts"
    14  	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
    15  	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
    16  	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
    17  )
    18  
    19  var hostLastKnownStatusInRecovery = make(map[string]bool) // isInRecovery
    20  var metricsConfig map[string]float64                      // set to host.Metrics or host.MetricsStandby (in case optional config defined and in recovery state
    21  var metricDefs = NewConcurrentMetricDefs()
    22  
// Reaper is responsible for fetching metric measurements from the monitored
// sources and handing them over to the sinks.
type Reaper struct {
	*cmdopts.Options
	// ready is set by Reap once the background writers have been launched and is reported by Ready.
	ready                atomic.Bool
	// measurementCh carries batches of measurement envelopes to WriteMeasurements.
	measurementCh        chan []metrics.MeasurementEnvelope
	// measurementCache is consulted and filled by FetchMetric.
	measurementCache     *InstanceMetricCache
	logger               log.Logger
	// monitoredSources is the source list refreshed by LoadSources.
	monitoredSources     sources.SourceConns
	// prevLoopMonitoredDBs is a clone of monitoredSources taken at the end of the previous main-loop pass.
	prevLoopMonitoredDBs sources.SourceConns
	// cancelFuncs maps "<source><dbMetricJoinStr><metric>" to the cancel function of the running gatherer.
	cancelFuncs          map[string]context.CancelFunc
}
    34  
    35  // NewReaper creates a new Reaper instance
    36  func NewReaper(ctx context.Context, opts *cmdopts.Options) (r *Reaper) {
    37  	return &Reaper{
    38  		Options:              opts,
    39  		measurementCh:        make(chan []metrics.MeasurementEnvelope, 10000),
    40  		measurementCache:     NewInstanceMetricCache(),
    41  		logger:               log.GetLogger(ctx),
    42  		monitoredSources:     make(sources.SourceConns, 0),
    43  		prevLoopMonitoredDBs: make(sources.SourceConns, 0),
    44  		cancelFuncs:          make(map[string]context.CancelFunc), // [db1+metric1]cancel()
    45  	}
    46  }
    47  
    48  // Ready() returns true if the service is healthy and operating correctly
    49  func (r *Reaper) Ready() bool {
    50  	return r.ready.Load()
    51  }
    52  
    53  // Reap() starts the main monitoring loop. It is responsible for fetching metrics measurements
    54  // from the sources and storing them to the sinks. It also manages the lifecycle of
    55  // the metric gatherers. In case of a source or metric definition change, it will
    56  // start or stop the gatherers accordingly.
    57  func (r *Reaper) Reap(ctx context.Context) {
    58  	var err error
    59  	logger := r.logger
    60  
    61  	go r.WriteMeasurements(ctx)
    62  	go r.WriteMonitoredSources(ctx)
    63  
    64  	r.ready.Store(true)
    65  
    66  	for { //main loop
    67  		if err = r.LoadSources(); err != nil {
    68  			logger.WithError(err).Error("could not refresh active sources, using last valid cache")
    69  		}
    70  		if err = r.LoadMetrics(); err != nil {
    71  			logger.WithError(err).Error("could not refresh metric definitions, using last valid cache")
    72  		}
    73  
    74  		UpdateMonitoredDBCache(r.monitoredSources)
    75  		hostsToShutDownDueToRoleChange := make(map[string]bool) // hosts went from master to standby and have "only if master" set
    76  		for _, monitoredSource := range r.monitoredSources {
    77  			srcL := logger.WithField("source", monitoredSource.Name)
    78  
    79  			if monitoredSource.Connect(ctx, r.Sources) != nil {
    80  				srcL.WithError(err).Warning("could not init connection, retrying on next iteration")
    81  				continue
    82  			}
    83  
    84  			if err = monitoredSource.FetchRuntimeInfo(ctx, true); err != nil {
    85  				srcL.WithError(err).Error("could not start metric gathering")
    86  				continue
    87  			}
    88  			srcL.WithField("recovery", monitoredSource.IsInRecovery).Infof("Connect OK. Version: %s", monitoredSource.VersionStr)
    89  			if monitoredSource.IsInRecovery && monitoredSource.OnlyIfMaster {
    90  				srcL.Info("not added to monitoring due to 'master only' property")
    91  				continue
    92  			}
    93  
    94  			if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
    95  				metricsConfig = monitoredSource.MetricsStandby
    96  			} else {
    97  				metricsConfig = monitoredSource.Metrics
    98  			}
    99  			hostLastKnownStatusInRecovery[monitoredSource.Name] = monitoredSource.IsInRecovery
   100  
   101  			r.CreateSourceHelpers(ctx, srcL, monitoredSource)
   102  
   103  			if monitoredSource.IsPostgresSource() {
   104  				if r.Sources.MinDbSizeMB >= 8 { // an empty DB is a bit less than 8MB
   105  					DBSizeMB := monitoredSource.ApproxDbSize / 1048576 // only remove from monitoring when we're certain it's under the threshold
   106  					if DBSizeMB != 0 && DBSizeMB < r.Sources.MinDbSizeMB {
   107  						srcL.Infof("ignored due to the --min-db-size-mb filter, current size %d MB", DBSizeMB)
   108  						hostsToShutDownDueToRoleChange[monitoredSource.Name] = true // for the case when DB size was previosly above the threshold
   109  						continue
   110  					}
   111  				}
   112  
   113  				lastKnownStatusInRecovery := hostLastKnownStatusInRecovery[monitoredSource.Name]
   114  				if monitoredSource.IsInRecovery && monitoredSource.OnlyIfMaster {
   115  					srcL.Info("to be removed from monitoring due to 'master only' property and status change")
   116  					hostsToShutDownDueToRoleChange[monitoredSource.Name] = true
   117  					continue
   118  				} else if lastKnownStatusInRecovery != monitoredSource.IsInRecovery {
   119  					if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
   120  						srcL.Warning("Switching metrics collection to standby config...")
   121  						metricsConfig = monitoredSource.MetricsStandby
   122  						hostLastKnownStatusInRecovery[monitoredSource.Name] = true
   123  					} else {
   124  						srcL.Warning("Switching metrics collection to primary config...")
   125  						metricsConfig = monitoredSource.Metrics
   126  						hostLastKnownStatusInRecovery[monitoredSource.Name] = false
   127  					}
   128  				}
   129  
   130  			}
   131  
   132  			for metricName, interval := range metricsConfig {
   133  				metric := metricName
   134  				metricDefExists := false
   135  				var mvp metrics.Metric
   136  
   137  				if strings.HasPrefix(metric, recoPrefix) {
   138  					metric = recoMetricName
   139  					metricDefExists = true
   140  				} else {
   141  					mvp, metricDefExists = metricDefs.GetMetricDef(metric)
   142  				}
   143  
   144  				dbMetric := monitoredSource.Name + dbMetricJoinStr + metric
   145  				_, cancelFuncExists := r.cancelFuncs[dbMetric]
   146  
   147  				if metricDefExists && !cancelFuncExists { // initialize a new per db/per metric control channel
   148  					if interval > 0 {
   149  						srcL.WithField("metric", metric).WithField("interval", interval).Info("starting gatherer")
   150  						metricCtx, cancelFunc := context.WithCancel(ctx)
   151  						r.cancelFuncs[dbMetric] = cancelFunc
   152  
   153  						metricNameForStorage := metricName
   154  						if _, isSpecialMetric := specialMetrics[metricName]; !isSpecialMetric && mvp.StorageName > "" {
   155  							metricNameForStorage = mvp.StorageName
   156  						}
   157  
   158  						if err := r.SinksWriter.SyncMetric(monitoredSource.Name, metricNameForStorage, "add"); err != nil {
   159  							srcL.Error(err)
   160  						}
   161  
   162  						go r.reapMetricMeasurements(metricCtx, monitoredSource, metric)
   163  					}
   164  				} else if (!metricDefExists && cancelFuncExists) || interval <= 0 {
   165  					// metric definition files were recently removed or interval set to zero
   166  					if cancelFunc, isOk := r.cancelFuncs[dbMetric]; isOk {
   167  						cancelFunc()
   168  					}
   169  					srcL.WithField("metric", metric).Warning("shutting down gatherer...")
   170  					delete(r.cancelFuncs, dbMetric)
   171  				} else if !metricDefExists {
   172  					epoch, ok := lastSQLFetchError.Load(metric)
   173  					if !ok || ((time.Now().Unix() - epoch.(int64)) > 3600) { // complain only 1x per hour
   174  						srcL.WithField("metric", metric).Warning("metric definition not found")
   175  						lastSQLFetchError.Store(metric, time.Now().Unix())
   176  					}
   177  				}
   178  			}
   179  		}
   180  
   181  		r.ShutdownOldWorkers(ctx, hostsToShutDownDueToRoleChange)
   182  
   183  		r.prevLoopMonitoredDBs = slices.Clone(r.monitoredSources)
   184  		select {
   185  		case <-time.After(time.Second * time.Duration(r.Sources.Refresh)):
   186  			logger.Debugf("wake up after %d seconds", r.Sources.Refresh)
   187  		case <-ctx.Done():
   188  			return
   189  		}
   190  	}
   191  }
   192  
   193  // CreateSourceHelpers creates the extensions and metric helpers for the monitored source
   194  func (r *Reaper) CreateSourceHelpers(ctx context.Context, srcL log.Logger, monitoredSource *sources.SourceConn) {
   195  	if r.prevLoopMonitoredDBs.GetMonitoredDatabase(monitoredSource.Name) != nil {
   196  		return // already created
   197  	}
   198  	if !monitoredSource.IsPostgresSource() || monitoredSource.IsInRecovery {
   199  		return // no need to create anything for non-postgres sources
   200  	}
   201  
   202  	if r.Sources.TryCreateListedExtsIfMissing > "" {
   203  		srcL.Info("trying to create extensions if missing")
   204  		extsToCreate := strings.Split(r.Sources.TryCreateListedExtsIfMissing, ",")
   205  		extsCreated := TryCreateMissingExtensions(ctx, monitoredSource.Name, extsToCreate, monitoredSource.Extensions)
   206  		srcL.Infof("%d/%d extensions created based on --try-create-listed-exts-if-missing input %v", len(extsCreated), len(extsToCreate), extsCreated)
   207  	}
   208  
   209  	if r.Sources.CreateHelpers {
   210  		srcL.Info("trying to create helper objects if missing")
   211  		if err := TryCreateMetricsFetchingHelpers(ctx, monitoredSource); err != nil {
   212  			srcL.WithError(err).Warning("failed to create helper functions")
   213  		}
   214  	}
   215  
   216  }
   217  
   218  func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDownDueToRoleChange map[string]bool) {
   219  	logger := r.logger
   220  	// loop over existing channels and stop workers if DB or metric removed from config
   221  	// or state change makes it uninteresting
   222  	logger.Debug("checking if any workers need to be shut down...")
   223  	for dbMetric, cancelFunc := range r.cancelFuncs {
   224  		var currentMetricConfig map[string]float64
   225  		var md *sources.SourceConn
   226  		var ok, dbRemovedFromConfig bool
   227  		singleMetricDisabled := false
   228  		splits := strings.Split(dbMetric, dbMetricJoinStr)
   229  		db := splits[0]
   230  		metric := splits[1]
   231  
   232  		_, wholeDbShutDownDueToRoleChange := hostsToShutDownDueToRoleChange[db]
   233  		if !wholeDbShutDownDueToRoleChange {
   234  			monitoredDbCacheLock.RLock()
   235  			md, ok = monitoredDbCache[db]
   236  			monitoredDbCacheLock.RUnlock()
   237  			if !ok { // normal removing of DB from config
   238  				dbRemovedFromConfig = true
   239  				logger.Debugf("DB %s removed from config, shutting down all metric worker processes...", db)
   240  			}
   241  		}
   242  
   243  		if !(wholeDbShutDownDueToRoleChange || dbRemovedFromConfig) { // maybe some single metric was disabled
   244  			if md.IsInRecovery && len(md.MetricsStandby) > 0 {
   245  				currentMetricConfig = md.MetricsStandby
   246  			} else {
   247  				currentMetricConfig = md.Metrics
   248  			}
   249  			interval, isMetricActive := currentMetricConfig[metric]
   250  			singleMetricDisabled = !isMetricActive || interval <= 0
   251  		}
   252  
   253  		if ctx.Err() != nil || wholeDbShutDownDueToRoleChange || dbRemovedFromConfig || singleMetricDisabled {
   254  			logger.WithField("source", db).WithField("metric", metric).Info("stopping gatherer...")
   255  			cancelFunc()
   256  			delete(r.cancelFuncs, dbMetric)
   257  			if err := r.SinksWriter.SyncMetric(db, metric, "remove"); err != nil {
   258  				logger.Error(err)
   259  			}
   260  		}
   261  	}
   262  
   263  	// Destroy conn pools and metric writers
   264  	r.CloseResourcesForRemovedMonitoredDBs(hostsToShutDownDueToRoleChange)
   265  }
   266  
   267  // metrics.ControlMessage notifies of shutdown + interval change
   268  func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceConn, metricName string) {
   269  	hostState := make(map[string]map[string]string)
   270  	var lastUptimeS int64 = -1 // used for "server restarted" event detection
   271  	var lastErrorNotificationTime time.Time
   272  	var err error
   273  	var ok bool
   274  	var envelopes []metrics.MeasurementEnvelope
   275  
   276  	failedFetches := 0
   277  	lastDBVersionFetchTime := time.Unix(0, 0) // check DB ver. ev. 5 min
   278  
   279  	l := r.logger.WithField("source", md.Name).WithField("metric", metricName)
   280  	if metricName == specialMetricServerLogEventCounts {
   281  		metrics.ParseLogs(ctx, md, md.RealDbname, md.GetMetricInterval(metricName), r.measurementCh) // no return
   282  		return
   283  	}
   284  
   285  	for {
   286  		interval := md.GetMetricInterval(metricName)
   287  		if lastDBVersionFetchTime.Add(time.Minute * time.Duration(5)).Before(time.Now()) {
   288  			// in case of errors just ignore metric "disabled" time ranges
   289  			if err = md.FetchRuntimeInfo(ctx, false); err != nil {
   290  				lastDBVersionFetchTime = time.Now()
   291  			}
   292  
   293  			if _, ok = metricDefs.GetMetricDef(metricName); !ok {
   294  				l.Error("Could not get metric version properties")
   295  				return
   296  			}
   297  		}
   298  
   299  		var metricStoreMessages *metrics.MeasurementEnvelope
   300  
   301  		// 1st try local overrides for some metrics if operating in push mode
   302  		if r.Metrics.DirectOSStats && IsDirectlyFetchableMetric(metricName) {
   303  			metricStoreMessages, err = r.FetchStatsDirectlyFromOS(ctx, md, metricName)
   304  			if err != nil {
   305  				l.WithError(err).Errorf("Could not reader metric directly from OS")
   306  			}
   307  		}
   308  		t1 := time.Now()
   309  		if metricStoreMessages == nil {
   310  			metricStoreMessages, err = r.FetchMetric(ctx, md, metricName, hostState)
   311  		}
   312  
   313  		if time.Since(t1) > (time.Second * time.Duration(interval)) {
   314  			l.Warningf("Total fetching time of %v bigger than %vs interval", time.Since(t1), interval)
   315  		}
   316  
   317  		if err != nil {
   318  			failedFetches++
   319  			// complain only 1x per 10min per host/metric...
   320  			if time.Since(lastErrorNotificationTime) > time.Minute*10 {
   321  				l.WithError(err).WithField("count", failedFetches).Error("failed to fetch metric data")
   322  				lastErrorNotificationTime = time.Now()
   323  			}
   324  		} else if metricStoreMessages != nil && len(metricStoreMessages.Data) > 0 {
   325  			envelopes = []metrics.MeasurementEnvelope{*metricStoreMessages}
   326  			// pick up "server restarted" events here to avoid doing extra selects from CheckForPGObjectChangesAndStore code
   327  			if metricName == "db_stats" {
   328  				postmasterUptimeS, ok := (metricStoreMessages.Data)[0]["postmaster_uptime_s"]
   329  				if ok {
   330  					if lastUptimeS != -1 {
   331  						if postmasterUptimeS.(int64) < lastUptimeS { // restart (or possibly also failover when host is routed) happened
   332  							message := "Detected server restart (or failover)"
   333  							l.Warning(message)
   334  							detectedChangesSummary := make(metrics.Measurements, 0)
   335  							entry := metrics.NewMeasurement(metricStoreMessages.Data.GetEpoch())
   336  							entry["details"] = message
   337  							detectedChangesSummary = append(detectedChangesSummary, entry)
   338  							envelopes = append(envelopes,
   339  								metrics.MeasurementEnvelope{
   340  									DBName:     md.Name,
   341  									SourceType: string(md.Kind),
   342  									MetricName: "object_changes",
   343  									Data:       detectedChangesSummary,
   344  									CustomTags: metricStoreMessages.CustomTags,
   345  								})
   346  						}
   347  					}
   348  					lastUptimeS = postmasterUptimeS.(int64)
   349  				}
   350  			}
   351  			r.measurementCh <- envelopes
   352  		}
   353  
   354  		select {
   355  		case <-ctx.Done():
   356  			return
   357  		case <-time.After(time.Second * time.Duration(interval)):
   358  			// continue
   359  		}
   360  	}
   361  }
   362  
   363  // LoadSources loads sources from the reader
   364  func (r *Reaper) LoadSources() (err error) {
   365  	if DoesEmergencyTriggerfileExist(r.Metrics.EmergencyPauseTriggerfile) {
   366  		r.logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", r.Metrics.EmergencyPauseTriggerfile)
   367  		r.monitoredSources = make([]*sources.SourceConn, 0)
   368  		return nil
   369  	}
   370  	if r.monitoredSources, err = r.monitoredSources.SyncFromReader(r.SourcesReaderWriter); err != nil {
   371  		return err
   372  	}
   373  	r.logger.WithField("sources", len(r.monitoredSources)).Info("sources refreshed")
   374  	return nil
   375  }
   376  
   377  // WriteMonitoredSources writes actively monitored DBs listing to sinks
   378  // every monitoredDbsDatastoreSyncIntervalSeconds (default 10min)
   379  func (r *Reaper) WriteMonitoredSources(ctx context.Context) {
   380  	for {
   381  		if len(monitoredDbCache) > 0 {
   382  			msms := make([]metrics.MeasurementEnvelope, len(monitoredDbCache))
   383  			now := time.Now().UnixNano()
   384  
   385  			monitoredDbCacheLock.RLock()
   386  			for _, mdb := range monitoredDbCache {
   387  				db := metrics.NewMeasurement(now)
   388  				db["tag_group"] = mdb.Group
   389  				db["master_only"] = mdb.OnlyIfMaster
   390  				for k, v := range mdb.CustomTags {
   391  					db[metrics.TagPrefix+k] = v
   392  				}
   393  				msms = append(msms, metrics.MeasurementEnvelope{
   394  					DBName:     mdb.Name,
   395  					MetricName: monitoredDbsDatastoreSyncMetricName,
   396  					Data:       metrics.Measurements{db},
   397  				})
   398  			}
   399  			monitoredDbCacheLock.RUnlock()
   400  			r.measurementCh <- msms
   401  		}
   402  		select {
   403  		case <-time.After(time.Second * monitoredDbsDatastoreSyncIntervalSeconds):
   404  			// continue
   405  		case <-ctx.Done():
   406  			return
   407  		}
   408  	}
   409  }
   410  
   411  // WriteMeasurements() writes the metrics to the sinks
   412  func (r *Reaper) WriteMeasurements(ctx context.Context) {
   413  	var err error
   414  	for {
   415  		select {
   416  		case <-ctx.Done():
   417  			return
   418  		case msg := <-r.measurementCh:
   419  			if err = r.SinksWriter.Write(msg); err != nil {
   420  				r.logger.Error(err)
   421  			}
   422  		}
   423  	}
   424  }
   425  
   426  func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, md *sources.SourceConn) {
   427  	for _, dr := range data {
   428  		if r.Sinks.RealDbnameField > "" && md.RealDbname > "" {
   429  			dr[r.Sinks.RealDbnameField] = md.RealDbname
   430  		}
   431  		if r.Sinks.SystemIdentifierField > "" && md.SystemIdentifier > "" {
   432  			dr[r.Sinks.SystemIdentifierField] = md.SystemIdentifier
   433  		}
   434  	}
   435  }
   436  
   437  func (r *Reaper) FetchMetric(ctx context.Context, md *sources.SourceConn, metricName string, hostState map[string]map[string]string) (_ *metrics.MeasurementEnvelope, err error) {
   438  	var sql string
   439  	var data metrics.Measurements
   440  	var metric metrics.Metric
   441  	var fromCache bool
   442  	var cacheKey string
   443  	var ok bool
   444  
   445  	if metric, ok = metricDefs.GetMetricDef(metricName); !ok {
   446  		return nil, metrics.ErrMetricNotFound
   447  	}
   448  	l := r.logger.WithField("source", md.Name).WithField("metric", metricName)
   449  
   450  	if metric.IsInstanceLevel && r.Metrics.InstanceLevelCacheMaxSeconds > 0 && time.Second*time.Duration(md.GetMetricInterval(metricName)) < r.Metrics.CacheAge() {
   451  		cacheKey = fmt.Sprintf("%s:%s", md.GetClusterIdentifier(), metricName)
   452  	}
   453  	if data = r.measurementCache.Get(cacheKey, r.Metrics.CacheAge()); len(data) > 0 {
   454  		fromCache = true
   455  		goto send_to_storageChannel
   456  	}
   457  
   458  	sql = metric.GetSQL(md.Version)
   459  	if sql == "" && !(metricName == specialMetricChangeEvents || metricName == recoMetricName) {
   460  		l.Warning("Ignoring fetching because of empty SQL")
   461  		return nil, nil
   462  	}
   463  
   464  	if (metric.PrimaryOnly() && md.IsInRecovery) || (metric.StandbyOnly() && !md.IsInRecovery) {
   465  		l.Debug("Skipping fetching of as server in wrong IsInRecovery: ", md.IsInRecovery)
   466  		return nil, nil
   467  	}
   468  
   469  	switch metricName {
   470  	case specialMetricChangeEvents:
   471  		r.CheckForPGObjectChangesAndStore(ctx, md.Name, md, hostState) // TODO no hostState for Prometheus currently
   472  		return nil, nil
   473  	case recoMetricName:
   474  		if data, err = GetRecommendations(ctx, md.Name, md); err != nil {
   475  			return nil, err
   476  		}
   477  	default:
   478  		if data, err = QueryMeasurements(ctx, md.Name, sql); err != nil {
   479  			// let's soften errors to "info" from functions that expect the server to be a primary to reduce noise
   480  			if strings.Contains(err.Error(), "recovery is in progress") && md.IsInRecovery {
   481  				l.Debugf("[%s:%s] failed to fetch metrics: %s", md.Name, metricName, err)
   482  				return nil, err
   483  			}
   484  			if metricName == specialMetricInstanceUp {
   485  				l.WithError(err).Debugf("[%s:%s] failed to fetch metrics. marking instance as not up", md.Name, metricName)
   486  				data = make(metrics.Measurements, 1)
   487  				data[0] = metrics.NewMeasurement(time.Now().UnixNano())
   488  				data[0]["is_up"] = 0 // should be updated if the "instance_up" metric definition is changed
   489  				goto send_to_storageChannel
   490  			}
   491  			l.
   492  				WithFields(map[string]any{"source": md.Name, "metric": metricName}).
   493  				WithError(err).Error("failed to fetch metrics")
   494  
   495  			return nil, err
   496  		}
   497  	}
   498  	r.measurementCache.Put(cacheKey, data)
   499  
   500  send_to_storageChannel:
   501  	r.AddSysinfoToMeasurements(data, md)
   502  	l.WithField("cache", fromCache).WithField("rows", len(data)).Info("measurements fetched")
   503  	return &metrics.MeasurementEnvelope{
   504  		DBName:           md.Name,
   505  		MetricName:       cmp.Or(metric.StorageName, metricName),
   506  		Data:             data,
   507  		CustomTags:       md.CustomTags,
   508  		MetricDef:        metric,
   509  		RealDbname:       md.RealDbname,
   510  		SystemIdentifier: md.SystemIdentifier}, nil
   511  }
   512