...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/reaper/database.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"maps"
     8  	"strings"
     9  	"time"
    10  
    11  	"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
    12  	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
    13  	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
    14  	"github.com/cybertec-postgresql/pgwatch/v3/internal/sinks"
    15  	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
    16  	"github.com/jackc/pgx/v5"
    17  )
    18  
    19  func QueryMeasurements(ctx context.Context, md *sources.SourceConn, sql string, args ...any) (metrics.Measurements, error) {
    20  	var conn db.PgxIface
    21  	var err error
    22  	var tx pgx.Tx
    23  	if strings.TrimSpace(sql) == "" {
    24  		return nil, errors.New("empty SQL")
    25  	}
    26  
    27  	conn = md.Conn
    28  	if md.IsPostgresSource() {
    29  		// we don't want transaction for non-postgres sources, e.g. pgbouncer
    30  		if tx, err = conn.Begin(ctx); err != nil {
    31  			return nil, err
    32  		}
    33  		defer func() { _ = tx.Commit(ctx) }()
    34  		_, err = tx.Exec(ctx, "SET LOCAL lock_timeout TO '100ms'")
    35  		if err != nil {
    36  			return nil, err
    37  		}
    38  		conn = tx
    39  	} else {
    40  		// we want simple protocol for non-postgres connections, e.g. pgpool
    41  		args = append([]any{pgx.QueryExecModeSimpleProtocol}, args...)
    42  	}
    43  	rows, err := conn.Query(ctx, sql, args...)
    44  	if err == nil {
    45  		return pgx.CollectRows(rows, metrics.RowToMeasurement)
    46  	}
    47  	return nil, err
    48  }
    49  
    50  func (r *Reaper) DetectSprocChanges(ctx context.Context, md *sources.SourceConn) (changeCounts ChangeDetectionResults) {
    51  	detectedChanges := make(metrics.Measurements, 0)
    52  	var firstRun bool
    53  	l := log.GetLogger(ctx)
    54  	changeCounts.Target = "functions"
    55  	l.Debug("checking for sproc changes...")
    56  	if _, ok := md.ChangeState["sproc_hashes"]; !ok {
    57  		firstRun = true
    58  		md.ChangeState["sproc_hashes"] = make(map[string]string)
    59  	}
    60  
    61  	mvp, ok := metricDefs.GetMetricDef("sproc_hashes")
    62  	if !ok {
    63  		l.Error("could not get sproc_hashes sql")
    64  		return
    65  	}
    66  
    67  	data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
    68  	if err != nil {
    69  		l.Error(err)
    70  		return
    71  	}
    72  
    73  	for _, dr := range data {
    74  		objIdent := dr["tag_sproc"].(string) + dbMetricJoinStr + dr["tag_oid"].(string)
    75  		prevHash, ok := md.ChangeState["sproc_hashes"][objIdent]
    76  		ll := l.WithField("sproc", dr["tag_sproc"]).WithField("oid", dr["tag_oid"])
    77  		if ok { // we have existing state
    78  			if prevHash != dr["md5"].(string) {
    79  				ll.Debug("change detected")
    80  				dr["event"] = "alter"
    81  				detectedChanges = append(detectedChanges, dr)
    82  				md.ChangeState["sproc_hashes"][objIdent] = dr["md5"].(string)
    83  				changeCounts.Altered++
    84  			}
    85  		} else { // check for new / delete
    86  			if !firstRun {
    87  				ll.Debug("new sproc detected")
    88  				dr["event"] = "create"
    89  				detectedChanges = append(detectedChanges, dr)
    90  				changeCounts.Created++
    91  			}
    92  			md.ChangeState["sproc_hashes"][objIdent] = dr["md5"].(string)
    93  		}
    94  	}
    95  	// detect deletes
    96  	if !firstRun && len(md.ChangeState["sproc_hashes"]) != len(data) {
    97  		// turn resultset to map => [oid]=true for faster checks
    98  		currentOidMap := make(map[string]bool)
    99  		for _, dr := range data {
   100  			currentOidMap[dr["tag_sproc"].(string)+dbMetricJoinStr+dr["tag_oid"].(string)] = true
   101  		}
   102  		for sprocIdent := range md.ChangeState["sproc_hashes"] {
   103  			_, ok := currentOidMap[sprocIdent]
   104  			if !ok {
   105  				splits := strings.Split(sprocIdent, dbMetricJoinStr)
   106  				l.WithField("sproc", splits[0]).WithField("oid", splits[1]).Debug("deleted sproc detected")
   107  				m := metrics.NewMeasurement(data.GetEpoch())
   108  				m["event"] = "drop"
   109  				m["tag_sproc"] = splits[0]
   110  				m["tag_oid"] = splits[1]
   111  				detectedChanges = append(detectedChanges, m)
   112  				delete(md.ChangeState["sproc_hashes"], sprocIdent)
   113  				changeCounts.Dropped++
   114  			}
   115  		}
   116  	}
   117  	l.Debugf("sproc changes detected: %d", len(detectedChanges))
   118  	if len(detectedChanges) > 0 {
   119  		r.measurementCh <- metrics.MeasurementEnvelope{
   120  			DBName:     md.Name,
   121  			MetricName: "sproc_changes",
   122  			Data:       detectedChanges,
   123  			CustomTags: md.CustomTags,
   124  		}
   125  	}
   126  
   127  	return changeCounts
   128  }
   129  
   130  func (r *Reaper) DetectTableChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
   131  	detectedChanges := make(metrics.Measurements, 0)
   132  	var firstRun bool
   133  	var changeCounts ChangeDetectionResults
   134  	l := log.GetLogger(ctx)
   135  	changeCounts.Target = "tables"
   136  	l.Debug("checking for table changes...")
   137  	if _, ok := md.ChangeState["table_hashes"]; !ok {
   138  		firstRun = true
   139  		md.ChangeState["table_hashes"] = make(map[string]string)
   140  	}
   141  
   142  	mvp, ok := metricDefs.GetMetricDef("table_hashes")
   143  	if !ok {
   144  		l.Error("could not get table_hashes sql")
   145  		return changeCounts
   146  	}
   147  
   148  	data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
   149  	if err != nil {
   150  		l.Error(err)
   151  		return changeCounts
   152  	}
   153  
   154  	for _, dr := range data {
   155  		objIdent := dr["tag_table"].(string)
   156  		prevHash, ok := md.ChangeState["table_hashes"][objIdent]
   157  		ll := l.WithField("table", dr["tag_table"])
   158  		if ok { // we have existing state
   159  			if prevHash != dr["md5"].(string) {
   160  				ll.Debug("change detected")
   161  				dr["event"] = "alter"
   162  				detectedChanges = append(detectedChanges, dr)
   163  				md.ChangeState["table_hashes"][objIdent] = dr["md5"].(string)
   164  				changeCounts.Altered++
   165  			}
   166  		} else { // check for new / delete
   167  			if !firstRun {
   168  				ll.Debug("new table detected")
   169  				dr["event"] = "create"
   170  				detectedChanges = append(detectedChanges, dr)
   171  				changeCounts.Created++
   172  			}
   173  			md.ChangeState["table_hashes"][objIdent] = dr["md5"].(string)
   174  		}
   175  	}
   176  	// detect deletes
   177  	if !firstRun && len(md.ChangeState["table_hashes"]) != len(data) {
   178  		deletedTables := make([]string, 0)
   179  		// turn resultset to map => [table]=true for faster checks
   180  		currentTableMap := make(map[string]bool)
   181  		for _, dr := range data {
   182  			currentTableMap[dr["tag_table"].(string)] = true
   183  		}
   184  		for table := range md.ChangeState["table_hashes"] {
   185  			_, ok := currentTableMap[table]
   186  			if !ok {
   187  				l.WithField("table", table).Debug("deleted table detected")
   188  				influxEntry := metrics.NewMeasurement(data.GetEpoch())
   189  				influxEntry["event"] = "drop"
   190  				influxEntry["tag_table"] = table
   191  				detectedChanges = append(detectedChanges, influxEntry)
   192  				deletedTables = append(deletedTables, table)
   193  				changeCounts.Dropped++
   194  			}
   195  		}
   196  		for _, deletedTable := range deletedTables {
   197  			delete(md.ChangeState["table_hashes"], deletedTable)
   198  		}
   199  	}
   200  
   201  	l.Debugf("table changes detected: %d", len(detectedChanges))
   202  	if len(detectedChanges) > 0 {
   203  		r.measurementCh <- metrics.MeasurementEnvelope{
   204  			DBName:     md.Name,
   205  			MetricName: "table_changes",
   206  			Data:       detectedChanges,
   207  			CustomTags: md.CustomTags,
   208  		}
   209  	}
   210  
   211  	return changeCounts
   212  }
   213  
   214  func (r *Reaper) DetectIndexChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
   215  	detectedChanges := make(metrics.Measurements, 0)
   216  	var firstRun bool
   217  	var changeCounts ChangeDetectionResults
   218  	l := log.GetLogger(ctx)
   219  	changeCounts.Target = "indexes"
   220  	l.Debug("checking for index changes...")
   221  	if _, ok := md.ChangeState["index_hashes"]; !ok {
   222  		firstRun = true
   223  		md.ChangeState["index_hashes"] = make(map[string]string)
   224  	}
   225  
   226  	mvp, ok := metricDefs.GetMetricDef("index_hashes")
   227  	if !ok {
   228  		l.Error("could not get index_hashes sql")
   229  		return changeCounts
   230  	}
   231  
   232  	data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
   233  	if err != nil {
   234  		l.Error(err)
   235  		return changeCounts
   236  	}
   237  
   238  	for _, dr := range data {
   239  		objIdent := dr["tag_index"].(string)
   240  		prevHash, ok := md.ChangeState["index_hashes"][objIdent]
   241  		ll := l.WithField("index", dr["tag_index"]).WithField("table", dr["table"])
   242  		if ok { // we have existing state
   243  			if prevHash != (dr["md5"].(string) + dr["is_valid"].(string)) {
   244  				ll.Debug("change detected")
   245  				dr["event"] = "alter"
   246  				detectedChanges = append(detectedChanges, dr)
   247  				md.ChangeState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
   248  				changeCounts.Altered++
   249  			}
   250  		} else { // check for new / delete
   251  			if !firstRun {
   252  				ll.Debug("new index detected")
   253  				dr["event"] = "create"
   254  				detectedChanges = append(detectedChanges, dr)
   255  				changeCounts.Created++
   256  			}
   257  			md.ChangeState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
   258  		}
   259  	}
   260  	// detect deletes
   261  	if !firstRun && len(md.ChangeState["index_hashes"]) != len(data) {
   262  		deletedIndexes := make([]string, 0)
   263  		// turn resultset to map => [table]=true for faster checks
   264  		currentIndexMap := make(map[string]bool)
   265  		for _, dr := range data {
   266  			currentIndexMap[dr["tag_index"].(string)] = true
   267  		}
   268  		for indexName := range md.ChangeState["index_hashes"] {
   269  			_, ok := currentIndexMap[indexName]
   270  			if !ok {
   271  				l.WithField("index", indexName).Debug("deleted index detected")
   272  				influxEntry := metrics.NewMeasurement(data.GetEpoch())
   273  				influxEntry["event"] = "drop"
   274  				influxEntry["tag_index"] = indexName
   275  				detectedChanges = append(detectedChanges, influxEntry)
   276  				deletedIndexes = append(deletedIndexes, indexName)
   277  				changeCounts.Dropped++
   278  			}
   279  		}
   280  		for _, deletedIndex := range deletedIndexes {
   281  			delete(md.ChangeState["index_hashes"], deletedIndex)
   282  		}
   283  	}
   284  	l.Debugf("index changes detected: %d", len(detectedChanges))
   285  	if len(detectedChanges) > 0 {
   286  		r.measurementCh <- metrics.MeasurementEnvelope{
   287  			DBName:     md.Name,
   288  			MetricName: "index_changes",
   289  			Data:       detectedChanges,
   290  			CustomTags: md.CustomTags,
   291  		}
   292  	}
   293  
   294  	return changeCounts
   295  }
   296  
   297  func (r *Reaper) DetectPrivilegeChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
   298  	detectedChanges := make(metrics.Measurements, 0)
   299  	var firstRun bool
   300  	var changeCounts ChangeDetectionResults
   301  	l := log.GetLogger(ctx)
   302  	changeCounts.Target = "privileges"
   303  	l.Debug("checking object privilege changes...")
   304  	if _, ok := md.ChangeState["object_privileges"]; !ok {
   305  		firstRun = true
   306  		md.ChangeState["object_privileges"] = make(map[string]string)
   307  	}
   308  
   309  	mvp, ok := metricDefs.GetMetricDef("privilege_changes")
   310  	if !ok || mvp.GetSQL(int(md.Version)) == "" {
   311  		l.Warning("could not get SQL for 'privilege_changes'. cannot detect privilege changes")
   312  		return changeCounts
   313  	}
   314  
   315  	// returns rows of: object_type, tag_role, tag_object, privilege_type
   316  	data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
   317  	if err != nil {
   318  		l.Error(err)
   319  		return changeCounts
   320  	}
   321  
   322  	currentState := make(map[string]bool)
   323  	for _, dr := range data {
   324  		objIdent := fmt.Sprintf("%s#:#%s#:#%s#:#%s", dr["object_type"], dr["tag_role"], dr["tag_object"], dr["privilege_type"])
   325  		ll := l.WithField("role", dr["tag_role"]).
   326  			WithField("object_type", dr["object_type"]).
   327  			WithField("object", dr["tag_object"]).
   328  			WithField("privilege_type", dr["privilege_type"])
   329  		if firstRun {
   330  			md.ChangeState["object_privileges"][objIdent] = ""
   331  		} else {
   332  			_, ok := md.ChangeState["object_privileges"][objIdent]
   333  			if !ok {
   334  				ll.Debug("new object privileges detected")
   335  				dr["event"] = "GRANT"
   336  				detectedChanges = append(detectedChanges, dr)
   337  				changeCounts.Created++
   338  				md.ChangeState["object_privileges"][objIdent] = ""
   339  			}
   340  			currentState[objIdent] = true
   341  		}
   342  	}
   343  	// check revokes - exists in old state only
   344  	if !firstRun && len(currentState) > 0 {
   345  		for objPrevRun := range md.ChangeState["object_privileges"] {
   346  			if _, ok := currentState[objPrevRun]; !ok {
   347  				splits := strings.Split(objPrevRun, "#:#")
   348  				l.WithField("role", splits[1]).
   349  					WithField("object_type", splits[0]).
   350  					WithField("object", splits[2]).
   351  					WithField("privilege_type", splits[3]).
   352  					Debug("removed object privileges detected")
   353  				revokeEntry := metrics.NewMeasurement(data.GetEpoch())
   354  				revokeEntry["object_type"] = splits[0]
   355  				revokeEntry["tag_role"] = splits[1]
   356  				revokeEntry["tag_object"] = splits[2]
   357  				revokeEntry["privilege_type"] = splits[3]
   358  				revokeEntry["event"] = "REVOKE"
   359  				detectedChanges = append(detectedChanges, revokeEntry)
   360  				changeCounts.Dropped++
   361  				delete(md.ChangeState["object_privileges"], objPrevRun)
   362  			}
   363  		}
   364  	}
   365  
   366  	l.Debugf("object privilege changes detected: %d", len(detectedChanges))
   367  	if len(detectedChanges) > 0 {
   368  		r.measurementCh <- metrics.MeasurementEnvelope{
   369  			DBName:     md.Name,
   370  			MetricName: "privilege_changes",
   371  			Data:       detectedChanges,
   372  			CustomTags: md.CustomTags,
   373  		}
   374  	}
   375  
   376  	return changeCounts
   377  }
   378  
   379  func (r *Reaper) DetectConfigurationChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
   380  	detectedChanges := make(metrics.Measurements, 0)
   381  	var firstRun bool
   382  	var changeCounts ChangeDetectionResults
   383  	l := log.GetLogger(ctx)
   384  	changeCounts.Target = "settings"
   385  	l.Debug("checking for configuration changes...")
   386  	if _, ok := md.ChangeState["configuration_hashes"]; !ok {
   387  		firstRun = true
   388  		md.ChangeState["configuration_hashes"] = make(map[string]string)
   389  	}
   390  
   391  	mvp, ok := metricDefs.GetMetricDef("configuration_hashes")
   392  	if !ok {
   393  		l.Error("could not get configuration_hashes sql")
   394  		return changeCounts
   395  	}
   396  
   397  	rows, err := md.Conn.Query(ctx, mvp.GetSQL(md.Version))
   398  	if err != nil {
   399  		l.Error(err)
   400  		return changeCounts
   401  	}
   402  	defer rows.Close()
   403  	var (
   404  		objIdent, objValue string
   405  		epoch              int64
   406  	)
   407  	for rows.Next() {
   408  		if rows.Scan(&epoch, &objIdent, &objValue) != nil {
   409  			return changeCounts
   410  		}
   411  		prevРash, ok := md.ChangeState["configuration_hashes"][objIdent]
   412  		ll := l.WithField("setting", objIdent)
   413  		if ok { // we have existing state
   414  			if prevРash != objValue {
   415  				ll.Warningf("settings change detected: %s = %s (prev: %s)", objIdent, objValue, prevРash)
   416  				detectedChanges = append(detectedChanges, metrics.Measurement{
   417  					metrics.EpochColumnName: epoch,
   418  					"tag_setting":           objIdent,
   419  					"value":                 objValue,
   420  					"event":                 "alter"})
   421  				md.ChangeState["configuration_hashes"][objIdent] = objValue
   422  				changeCounts.Altered++
   423  			}
   424  		} else { // check for new, delete not relevant here (pg_upgrade)
   425  			md.ChangeState["configuration_hashes"][objIdent] = objValue
   426  			if firstRun {
   427  				continue
   428  			}
   429  			ll.Debug("new setting detected")
   430  			detectedChanges = append(detectedChanges, metrics.Measurement{
   431  				metrics.EpochColumnName: epoch,
   432  				"tag_setting":           objIdent,
   433  				"value":                 objValue,
   434  				"event":                 "create"})
   435  			changeCounts.Created++
   436  		}
   437  	}
   438  
   439  	l.Debugf("configuration changes detected: %d", len(detectedChanges))
   440  	if len(detectedChanges) > 0 {
   441  		r.measurementCh <- metrics.MeasurementEnvelope{
   442  			DBName:     md.Name,
   443  			MetricName: "configuration_changes",
   444  			Data:       detectedChanges,
   445  			CustomTags: md.CustomTags,
   446  		}
   447  	}
   448  	return changeCounts
   449  }
   450  
   451  // GetInstanceUpMeasurement returns a single measurement with "instance_up" metric
   452  // used to detect if the instance is up or down
   453  func (r *Reaper) GetInstanceUpMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error) {
   454  	err := md.Conn.Ping(ctx)
   455  	return metrics.Measurements{
   456  		metrics.Measurement{
   457  			metrics.EpochColumnName: time.Now().UnixNano(),
   458  			"instance_up": func() int {
   459  				if err == nil {
   460  					return 1
   461  				}
   462  				return 0
   463  			}(), // true if connection is up
   464  		},
   465  	}, err
   466  }
   467  
   468  func (r *Reaper) GetObjectChangesMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error) {
   469  	md.Lock()
   470  	defer md.Unlock()
   471  
   472  	spN := r.DetectSprocChanges(ctx, md)
   473  	tblN := r.DetectTableChanges(ctx, md)
   474  	idxN := r.DetectIndexChanges(ctx, md)
   475  	cnfN := r.DetectConfigurationChanges(ctx, md)
   476  	privN := r.DetectPrivilegeChanges(ctx, md)
   477  
   478  	if spN.Total()+tblN.Total()+idxN.Total()+cnfN.Total()+privN.Total() == 0 {
   479  		return nil, nil
   480  	}
   481  
   482  	m := metrics.NewMeasurement(time.Now().UnixNano())
   483  	m["details"] = strings.Join([]string{spN.String(), tblN.String(), idxN.String(), cnfN.String(), privN.String()}, " ")
   484  	return metrics.Measurements{m}, nil
   485  }
   486  
   487  // Called once on daemon startup if some commonly wanted extension (most notably pg_stat_statements) is missing.
   488  // With newer Postgres version can even succeed if the user is not a real superuser due to some cloud-specific
   489  // whitelisting or "trusted extensions" (a feature from v13). Ignores errors.
   490  func TryCreateMissingExtensions(ctx context.Context, md *sources.SourceConn, extensionNames []string, existingExtensions map[string]int) []string {
   491  	// TODO: move to sources package and use direct pgx connection
   492  	sqlAvailable := `select name::text from pg_available_extensions`
   493  	extsCreated := make([]string, 0)
   494  
   495  	// For security reasons don't allow to execute random strings but check that it's an existing extension
   496  	data, err := QueryMeasurements(ctx, md, sqlAvailable)
   497  	if err != nil {
   498  		log.GetLogger(ctx).Infof("[%s] Failed to get a list of available extensions: %v", md, err)
   499  		return extsCreated
   500  	}
   501  
   502  	availableExts := make(map[string]bool)
   503  	for _, row := range data {
   504  		availableExts[row["name"].(string)] = true
   505  	}
   506  
   507  	for _, extToCreate := range extensionNames {
   508  		if _, ok := existingExtensions[extToCreate]; ok {
   509  			continue
   510  		}
   511  		_, ok := availableExts[extToCreate]
   512  		if !ok {
   513  			log.GetLogger(ctx).Errorf("[%s] Requested extension %s not available on instance, cannot try to create...", md, extToCreate)
   514  		} else {
   515  			sqlCreateExt := `create extension ` + extToCreate
   516  			_, err := QueryMeasurements(ctx, md, sqlCreateExt)
   517  			if err != nil {
   518  				log.GetLogger(ctx).Errorf("[%s] Failed to create extension %s (based on --try-create-listed-exts-if-missing input): %v", md, extToCreate, err)
   519  			}
   520  			extsCreated = append(extsCreated, extToCreate)
   521  		}
   522  	}
   523  
   524  	return extsCreated
   525  }
   526  
   527  // Called once on daemon startup to try to create "metric fething helper" functions automatically
   528  func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.SourceConn) (err error) {
   529  	sl := log.GetLogger(ctx).WithField("source", md.Name)
   530  	metrics := maps.Clone(md.Metrics)
   531  	maps.Insert(metrics, maps.All(md.MetricsStandby))
   532  	for metricName := range metrics {
   533  		metric, ok := metricDefs.GetMetricDef(metricName)
   534  		if !ok {
   535  			continue
   536  		}
   537  		if _, err = md.Conn.Exec(ctx, metric.InitSQL); err != nil {
   538  			return
   539  		}
   540  		sl.WithField("metric", metricName).Info("Successfully created metric fetching helpers")
   541  	}
   542  	return
   543  }
   544  
   545  func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(shutDownDueToRoleChange map[string]bool) {
   546  	for _, prevDB := range r.prevLoopMonitoredDBs {
   547  		if r.monitoredSources.GetMonitoredDatabase(prevDB.Name) == nil { // removed from config
   548  			prevDB.Conn.Close()
   549  			_ = r.SinksWriter.SyncMetric(prevDB.Name, "", sinks.DeleteOp)
   550  		}
   551  	}
   552  
   553  	for roleChangedDB := range shutDownDueToRoleChange {
   554  		if db := r.monitoredSources.GetMonitoredDatabase(roleChangedDB); db != nil {
   555  			db.Conn.Close()
   556  		}
   557  		_ = r.SinksWriter.SyncMetric(roleChangedDB, "", sinks.DeleteOp)
   558  	}
   559  }
   560