...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/reaper/database.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"maps"
     8  	"strings"
     9  	"time"
    10  
    11  	"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
    12  	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
    13  	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
    14  	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
    15  	"github.com/jackc/pgx/v5"
    16  )
    17  
    18  func QueryMeasurements(ctx context.Context, dbUnique string, sql string, args ...any) (metrics.Measurements, error) {
    19  	// TODO: move to sources package and use direct pgx connection
    20  	var conn db.PgxIface
    21  	var md *sources.SourceConn
    22  	var err error
    23  	var tx pgx.Tx
    24  	if strings.TrimSpace(sql) == "" {
    25  		return nil, errors.New("empty SQL")
    26  	}
    27  	if md, err = GetMonitoredDatabaseByUniqueName(ctx, dbUnique); err != nil {
    28  		return nil, err
    29  	}
    30  	conn = md.Conn
    31  	if md.IsPostgresSource() {
    32  		// we don't want transaction for non-postgres sources, e.g. pgbouncer
    33  		if tx, err = conn.Begin(ctx); err != nil {
    34  			return nil, err
    35  		}
    36  		defer func() { _ = tx.Commit(ctx) }()
    37  		_, err = tx.Exec(ctx, "SET LOCAL lock_timeout TO '100ms'")
    38  		if err != nil {
    39  			return nil, err
    40  		}
    41  		conn = tx
    42  	} else {
    43  		// we want simple protocol for non-postgres connections, e.g. pgpool
    44  		args = append([]any{pgx.QueryExecModeSimpleProtocol}, args...)
    45  	}
    46  	rows, err := conn.Query(ctx, sql, args...)
    47  	if err == nil {
    48  		return pgx.CollectRows(rows, metrics.RowToMeasurement)
    49  	}
    50  	return nil, err
    51  }
    52  
    53  func DetectSprocChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
    54  	detectedChanges := make(metrics.Measurements, 0)
    55  	var firstRun bool
    56  	var changeCounts ChangeDetectionResults
    57  
    58  	log.GetLogger(ctx).Debugf("[%s][%s] checking for sproc changes...", md.Name, specialMetricChangeEvents)
    59  	if _, ok := hostState["sproc_hashes"]; !ok {
    60  		firstRun = true
    61  		hostState["sproc_hashes"] = make(map[string]string)
    62  	}
    63  
    64  	mvp, ok := metricDefs.GetMetricDef("sproc_hashes")
    65  	if !ok {
    66  		log.GetLogger(ctx).Error("could not get sproc_hashes sql")
    67  		return changeCounts
    68  	}
    69  
    70  	data, err := QueryMeasurements(ctx, md.Name, mvp.GetSQL(int(md.Version)))
    71  	if err != nil {
    72  		log.GetLogger(ctx).Error("could not read sproc_hashes from monitored host: ", md.Name, ", err:", err)
    73  		return changeCounts
    74  	}
    75  
    76  	for _, dr := range data {
    77  		objIdent := dr["tag_sproc"].(string) + dbMetricJoinStr + dr["tag_oid"].(string)
    78  		prevHash, ok := hostState["sproc_hashes"][objIdent]
    79  		if ok { // we have existing state
    80  			if prevHash != dr["md5"].(string) {
    81  				log.GetLogger(ctx).Info("detected change in sproc:", dr["tag_sproc"], ", oid:", dr["tag_oid"])
    82  				dr["event"] = "alter"
    83  				detectedChanges = append(detectedChanges, dr)
    84  				hostState["sproc_hashes"][objIdent] = dr["md5"].(string)
    85  				changeCounts.Altered++
    86  			}
    87  		} else { // check for new / delete
    88  			if !firstRun {
    89  				log.GetLogger(ctx).Info("detected new sproc:", dr["tag_sproc"], ", oid:", dr["tag_oid"])
    90  				dr["event"] = "create"
    91  				detectedChanges = append(detectedChanges, dr)
    92  				changeCounts.Created++
    93  			}
    94  			hostState["sproc_hashes"][objIdent] = dr["md5"].(string)
    95  		}
    96  	}
    97  	// detect deletes
    98  	if !firstRun && len(hostState["sproc_hashes"]) != len(data) {
    99  		deletedSProcs := make([]string, 0)
   100  		// turn resultset to map => [oid]=true for faster checks
   101  		currentOidMap := make(map[string]bool)
   102  		for _, dr := range data {
   103  			currentOidMap[dr["tag_sproc"].(string)+dbMetricJoinStr+dr["tag_oid"].(string)] = true
   104  		}
   105  		for sprocIdent := range hostState["sproc_hashes"] {
   106  			_, ok := currentOidMap[sprocIdent]
   107  			if !ok {
   108  				splits := strings.Split(sprocIdent, dbMetricJoinStr)
   109  				log.GetLogger(ctx).Info("detected delete of sproc:", splits[0], ", oid:", splits[1])
   110  				influxEntry := metrics.NewMeasurement(data.GetEpoch())
   111  				influxEntry["event"] = "drop"
   112  				influxEntry["tag_sproc"] = splits[0]
   113  				influxEntry["tag_oid"] = splits[1]
   114  				detectedChanges = append(detectedChanges, influxEntry)
   115  				deletedSProcs = append(deletedSProcs, sprocIdent)
   116  				changeCounts.Dropped++
   117  			}
   118  		}
   119  		for _, deletedSProc := range deletedSProcs {
   120  			delete(hostState["sproc_hashes"], deletedSProc)
   121  		}
   122  	}
   123  	log.GetLogger(ctx).Debugf("[%s][%s] detected %d sproc changes", md.Name, specialMetricChangeEvents, len(detectedChanges))
   124  	if len(detectedChanges) > 0 {
   125  		storageCh <- []metrics.MeasurementEnvelope{{DBName: md.Name, MetricName: "sproc_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
   126  	}
   127  
   128  	return changeCounts
   129  }
   130  
   131  func DetectTableChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
   132  	detectedChanges := make(metrics.Measurements, 0)
   133  	var firstRun bool
   134  	var changeCounts ChangeDetectionResults
   135  
   136  	log.GetLogger(ctx).Debugf("[%s][%s] checking for table changes...", md.Name, specialMetricChangeEvents)
   137  	if _, ok := hostState["table_hashes"]; !ok {
   138  		firstRun = true
   139  		hostState["table_hashes"] = make(map[string]string)
   140  	}
   141  
   142  	mvp, ok := metricDefs.GetMetricDef("table_hashes")
   143  	if !ok {
   144  		log.GetLogger(ctx).Error("could not get table_hashes sql")
   145  		return changeCounts
   146  	}
   147  
   148  	data, err := QueryMeasurements(ctx, md.Name, mvp.GetSQL(int(md.Version)))
   149  	if err != nil {
   150  		log.GetLogger(ctx).Error("could not read table_hashes from monitored host:", md.Name, ", err:", err)
   151  		return changeCounts
   152  	}
   153  
   154  	for _, dr := range data {
   155  		objIdent := dr["tag_table"].(string)
   156  		prevHash, ok := hostState["table_hashes"][objIdent]
   157  		//log.Debug("inspecting table:", objIdent, "hash:", prev_hash)
   158  		if ok { // we have existing state
   159  			if prevHash != dr["md5"].(string) {
   160  				log.GetLogger(ctx).Info("detected DDL change in table:", dr["tag_table"])
   161  				dr["event"] = "alter"
   162  				detectedChanges = append(detectedChanges, dr)
   163  				hostState["table_hashes"][objIdent] = dr["md5"].(string)
   164  				changeCounts.Altered++
   165  			}
   166  		} else { // check for new / delete
   167  			if !firstRun {
   168  				log.GetLogger(ctx).Info("detected new table:", dr["tag_table"])
   169  				dr["event"] = "create"
   170  				detectedChanges = append(detectedChanges, dr)
   171  				changeCounts.Created++
   172  			}
   173  			hostState["table_hashes"][objIdent] = dr["md5"].(string)
   174  		}
   175  	}
   176  	// detect deletes
   177  	if !firstRun && len(hostState["table_hashes"]) != len(data) {
   178  		deletedTables := make([]string, 0)
   179  		// turn resultset to map => [table]=true for faster checks
   180  		currentTableMap := make(map[string]bool)
   181  		for _, dr := range data {
   182  			currentTableMap[dr["tag_table"].(string)] = true
   183  		}
   184  		for table := range hostState["table_hashes"] {
   185  			_, ok := currentTableMap[table]
   186  			if !ok {
   187  				log.GetLogger(ctx).Info("detected drop of table:", table)
   188  				influxEntry := metrics.NewMeasurement(data.GetEpoch())
   189  				influxEntry["event"] = "drop"
   190  				influxEntry["tag_table"] = table
   191  				detectedChanges = append(detectedChanges, influxEntry)
   192  				deletedTables = append(deletedTables, table)
   193  				changeCounts.Dropped++
   194  			}
   195  		}
   196  		for _, deletedTable := range deletedTables {
   197  			delete(hostState["table_hashes"], deletedTable)
   198  		}
   199  	}
   200  
   201  	log.GetLogger(ctx).Debugf("[%s][%s] detected %d table changes", md.Name, specialMetricChangeEvents, len(detectedChanges))
   202  	if len(detectedChanges) > 0 {
   203  		storageCh <- []metrics.MeasurementEnvelope{{DBName: md.Name, MetricName: "table_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
   204  	}
   205  
   206  	return changeCounts
   207  }
   208  
   209  func DetectIndexChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
   210  	detectedChanges := make(metrics.Measurements, 0)
   211  	var firstRun bool
   212  	var changeCounts ChangeDetectionResults
   213  
   214  	log.GetLogger(ctx).Debugf("[%s][%s] checking for index changes...", md.Name, specialMetricChangeEvents)
   215  	if _, ok := hostState["index_hashes"]; !ok {
   216  		firstRun = true
   217  		hostState["index_hashes"] = make(map[string]string)
   218  	}
   219  
   220  	mvp, ok := metricDefs.GetMetricDef("index_hashes")
   221  	if !ok {
   222  		log.GetLogger(ctx).Error("could not get index_hashes sql")
   223  		return changeCounts
   224  	}
   225  
   226  	data, err := QueryMeasurements(ctx, md.Name, mvp.GetSQL(int(md.Version)))
   227  	if err != nil {
   228  		log.GetLogger(ctx).Error("could not read index_hashes from monitored host:", md.Name, ", err:", err)
   229  		return changeCounts
   230  	}
   231  
   232  	for _, dr := range data {
   233  		objIdent := dr["tag_index"].(string)
   234  		prevHash, ok := hostState["index_hashes"][objIdent]
   235  		if ok { // we have existing state
   236  			if prevHash != (dr["md5"].(string) + dr["is_valid"].(string)) {
   237  				log.GetLogger(ctx).Info("detected index change:", dr["tag_index"], ", table:", dr["table"])
   238  				dr["event"] = "alter"
   239  				detectedChanges = append(detectedChanges, dr)
   240  				hostState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
   241  				changeCounts.Altered++
   242  			}
   243  		} else { // check for new / delete
   244  			if !firstRun {
   245  				log.GetLogger(ctx).Info("detected new index:", dr["tag_index"])
   246  				dr["event"] = "create"
   247  				detectedChanges = append(detectedChanges, dr)
   248  				changeCounts.Created++
   249  			}
   250  			hostState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
   251  		}
   252  	}
   253  	// detect deletes
   254  	if !firstRun && len(hostState["index_hashes"]) != len(data) {
   255  		deletedIndexes := make([]string, 0)
   256  		// turn resultset to map => [table]=true for faster checks
   257  		currentIndexMap := make(map[string]bool)
   258  		for _, dr := range data {
   259  			currentIndexMap[dr["tag_index"].(string)] = true
   260  		}
   261  		for indexName := range hostState["index_hashes"] {
   262  			_, ok := currentIndexMap[indexName]
   263  			if !ok {
   264  				log.GetLogger(ctx).Info("detected drop of index_name:", indexName)
   265  				influxEntry := metrics.NewMeasurement(data.GetEpoch())
   266  				influxEntry["event"] = "drop"
   267  				influxEntry["tag_index"] = indexName
   268  				detectedChanges = append(detectedChanges, influxEntry)
   269  				deletedIndexes = append(deletedIndexes, indexName)
   270  				changeCounts.Dropped++
   271  			}
   272  		}
   273  		for _, deletedIndex := range deletedIndexes {
   274  			delete(hostState["index_hashes"], deletedIndex)
   275  		}
   276  	}
   277  	log.GetLogger(ctx).Debugf("[%s][%s] detected %d index changes", md.Name, specialMetricChangeEvents, len(detectedChanges))
   278  	if len(detectedChanges) > 0 {
   279  		storageCh <- []metrics.MeasurementEnvelope{{DBName: md.Name, MetricName: "index_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
   280  	}
   281  
   282  	return changeCounts
   283  }
   284  
   285  func DetectPrivilegeChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
   286  	detectedChanges := make(metrics.Measurements, 0)
   287  	var firstRun bool
   288  	var changeCounts ChangeDetectionResults
   289  
   290  	log.GetLogger(ctx).Debugf("[%s][%s] checking object privilege changes...", md.Name, specialMetricChangeEvents)
   291  	if _, ok := hostState["object_privileges"]; !ok {
   292  		firstRun = true
   293  		hostState["object_privileges"] = make(map[string]string)
   294  	}
   295  
   296  	mvp, ok := metricDefs.GetMetricDef("privilege_changes")
   297  	if !ok || mvp.GetSQL(int(md.Version)) == "" {
   298  		log.GetLogger(ctx).Warningf("[%s][%s] could not get SQL for 'privilege_changes'. cannot detect privilege changes", md.Name, specialMetricChangeEvents)
   299  		return changeCounts
   300  	}
   301  
   302  	// returns rows of: object_type, tag_role, tag_object, privilege_type
   303  	data, err := QueryMeasurements(ctx, md.Name, mvp.GetSQL(int(md.Version)))
   304  	if err != nil {
   305  		log.GetLogger(ctx).Errorf("[%s][%s] failed to fetch object privileges info: %v", md.Name, specialMetricChangeEvents, err)
   306  		return changeCounts
   307  	}
   308  
   309  	currentState := make(map[string]bool)
   310  	for _, dr := range data {
   311  		objIdent := fmt.Sprintf("%s#:#%s#:#%s#:#%s", dr["object_type"], dr["tag_role"], dr["tag_object"], dr["privilege_type"])
   312  		if firstRun {
   313  			hostState["object_privileges"][objIdent] = ""
   314  		} else {
   315  			_, ok := hostState["object_privileges"][objIdent]
   316  			if !ok {
   317  				log.GetLogger(ctx).Infof("[%s][%s] detected new object privileges: role=%s, object_type=%s, object=%s, privilege_type=%s",
   318  					md.Name, specialMetricChangeEvents, dr["tag_role"], dr["object_type"], dr["tag_object"], dr["privilege_type"])
   319  				dr["event"] = "GRANT"
   320  				detectedChanges = append(detectedChanges, dr)
   321  				changeCounts.Created++
   322  				hostState["object_privileges"][objIdent] = ""
   323  			}
   324  			currentState[objIdent] = true
   325  		}
   326  	}
   327  	// check revokes - exists in old state only
   328  	if !firstRun && len(currentState) > 0 {
   329  		for objPrevRun := range hostState["object_privileges"] {
   330  			if _, ok := currentState[objPrevRun]; !ok {
   331  				splits := strings.Split(objPrevRun, "#:#")
   332  				log.GetLogger(ctx).Infof("[%s][%s] detected removed object privileges: role=%s, object_type=%s, object=%s, privilege_type=%s",
   333  					md.Name, specialMetricChangeEvents, splits[1], splits[0], splits[2], splits[3])
   334  				revokeEntry := metrics.NewMeasurement(data.GetEpoch())
   335  				revokeEntry["object_type"] = splits[0]
   336  				revokeEntry["tag_role"] = splits[1]
   337  				revokeEntry["tag_object"] = splits[2]
   338  				revokeEntry["privilege_type"] = splits[3]
   339  				revokeEntry["event"] = "REVOKE"
   340  				detectedChanges = append(detectedChanges, revokeEntry)
   341  				changeCounts.Dropped++
   342  				delete(hostState["object_privileges"], objPrevRun)
   343  			}
   344  		}
   345  	}
   346  
   347  	log.GetLogger(ctx).Debugf("[%s][%s] detected %d object privilege changes...", md.Name, specialMetricChangeEvents, len(detectedChanges))
   348  	if len(detectedChanges) > 0 {
   349  		storageCh <- []metrics.MeasurementEnvelope{
   350  			{
   351  				DBName:     md.Name,
   352  				MetricName: "privilege_changes",
   353  				Data:       detectedChanges,
   354  				CustomTags: md.CustomTags,
   355  			}}
   356  	}
   357  
   358  	return changeCounts
   359  }
   360  
   361  func DetectConfigurationChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
   362  	detectedChanges := make(metrics.Measurements, 0)
   363  	var firstRun bool
   364  	var changeCounts ChangeDetectionResults
   365  
   366  	log.GetLogger(ctx).Debugf("[%s][%s] checking for configuration changes...", md.Name, specialMetricChangeEvents)
   367  	if _, ok := hostState["configuration_hashes"]; !ok {
   368  		firstRun = true
   369  		hostState["configuration_hashes"] = make(map[string]string)
   370  	}
   371  
   372  	mvp, ok := metricDefs.GetMetricDef("configuration_hashes")
   373  	if !ok {
   374  		log.GetLogger(ctx).Errorf("[%s][%s] could not get configuration_hashes sql", md.Name, specialMetricChangeEvents)
   375  		return changeCounts
   376  	}
   377  
   378  	data, err := QueryMeasurements(ctx, md.Name, mvp.GetSQL(int(md.Version)))
   379  	if err != nil {
   380  		log.GetLogger(ctx).Errorf("[%s][%s] could not read configuration_hashes from monitored host: %v", md.Name, specialMetricChangeEvents, err)
   381  		return changeCounts
   382  	}
   383  
   384  	for _, dr := range data {
   385  		objIdent := dr["tag_setting"].(string)
   386  		objValue := dr["value"].(string)
   387  		prevРash, ok := hostState["configuration_hashes"][objIdent]
   388  		if ok { // we have existing state
   389  			if prevРash != objValue {
   390  				if objIdent == "connection_ID" {
   391  					continue // ignore some weird Azure managed PG service setting
   392  				}
   393  				log.GetLogger(ctx).Warningf("[%s][%s] detected settings change: %s = %s (prev: %s)",
   394  					md.Name, specialMetricChangeEvents, objIdent, objValue, prevРash)
   395  				dr["event"] = "alter"
   396  				detectedChanges = append(detectedChanges, dr)
   397  				hostState["configuration_hashes"][objIdent] = objValue
   398  				changeCounts.Altered++
   399  			}
   400  		} else { // check for new, delete not relevant here (pg_upgrade)
   401  			if !firstRun {
   402  				log.GetLogger(ctx).Warningf("[%s][%s] detected new setting: %s", md.Name, specialMetricChangeEvents, objIdent)
   403  				dr["event"] = "create"
   404  				detectedChanges = append(detectedChanges, dr)
   405  				changeCounts.Created++
   406  			}
   407  			hostState["configuration_hashes"][objIdent] = objValue
   408  		}
   409  	}
   410  
   411  	log.GetLogger(ctx).Debugf("[%s][%s] detected %d configuration changes", md.Name, specialMetricChangeEvents, len(detectedChanges))
   412  	if len(detectedChanges) > 0 {
   413  		storageCh <- []metrics.MeasurementEnvelope{{
   414  			DBName:     md.Name,
   415  			MetricName: "configuration_changes",
   416  			Data:       detectedChanges,
   417  			CustomTags: md.CustomTags,
   418  		}}
   419  	}
   420  
   421  	return changeCounts
   422  }
   423  
   424  func (r *Reaper) CheckForPGObjectChangesAndStore(ctx context.Context, dbUnique string, md *sources.SourceConn, hostState map[string]map[string]string) {
   425  	storageCh := r.measurementCh
   426  	sprocCounts := DetectSprocChanges(ctx, md, storageCh, hostState) // TODO some of Detect*() code could be unified...
   427  	tableCounts := DetectTableChanges(ctx, md, storageCh, hostState)
   428  	indexCounts := DetectIndexChanges(ctx, md, storageCh, hostState)
   429  	confCounts := DetectConfigurationChanges(ctx, md, storageCh, hostState)
   430  	privChangeCounts := DetectPrivilegeChanges(ctx, md, storageCh, hostState)
   431  
   432  	// need to send info on all object changes as one message as Grafana applies "last wins" for annotations with similar timestamp
   433  	message := ""
   434  	if sprocCounts.Altered > 0 || sprocCounts.Created > 0 || sprocCounts.Dropped > 0 {
   435  		message += fmt.Sprintf(" sprocs %d/%d/%d", sprocCounts.Created, sprocCounts.Altered, sprocCounts.Dropped)
   436  	}
   437  	if tableCounts.Altered > 0 || tableCounts.Created > 0 || tableCounts.Dropped > 0 {
   438  		message += fmt.Sprintf(" tables/views %d/%d/%d", tableCounts.Created, tableCounts.Altered, tableCounts.Dropped)
   439  	}
   440  	if indexCounts.Altered > 0 || indexCounts.Created > 0 || indexCounts.Dropped > 0 {
   441  		message += fmt.Sprintf(" indexes %d/%d/%d", indexCounts.Created, indexCounts.Altered, indexCounts.Dropped)
   442  	}
   443  	if confCounts.Altered > 0 || confCounts.Created > 0 {
   444  		message += fmt.Sprintf(" configuration %d/%d/%d", confCounts.Created, confCounts.Altered, confCounts.Dropped)
   445  	}
   446  	if privChangeCounts.Dropped > 0 || privChangeCounts.Created > 0 {
   447  		message += fmt.Sprintf(" privileges %d/%d/%d", privChangeCounts.Created, privChangeCounts.Altered, privChangeCounts.Dropped)
   448  	}
   449  
   450  	if message > "" {
   451  		message = "Detected changes for \"" + dbUnique + "\" [Created/Altered/Dropped]:" + message
   452  		log.GetLogger(ctx).Info(message)
   453  		detectedChangesSummary := make(metrics.Measurements, 0)
   454  		influxEntry := metrics.NewMeasurement(time.Now().UnixNano())
   455  		influxEntry["details"] = message
   456  		detectedChangesSummary = append(detectedChangesSummary, influxEntry)
   457  		md, _ := GetMonitoredDatabaseByUniqueName(ctx, dbUnique)
   458  		storageCh <- []metrics.MeasurementEnvelope{{DBName: dbUnique,
   459  			SourceType: string(md.Kind),
   460  			MetricName: "object_changes",
   461  			Data:       detectedChangesSummary,
   462  			CustomTags: md.CustomTags,
   463  		}}
   464  
   465  	}
   466  }
   467  
   468  // Called once on daemon startup if some commonly wanted extension (most notably pg_stat_statements) is missing.
   469  // With newer Postgres version can even succeed if the user is not a real superuser due to some cloud-specific
   470  // whitelisting or "trusted extensions" (a feature from v13). Ignores errors.
   471  func TryCreateMissingExtensions(ctx context.Context, dbUnique string, extensionNames []string, existingExtensions map[string]int) []string {
   472  	// TODO: move to sources package and use direct pgx connection
   473  	sqlAvailable := `select name::text from pg_available_extensions`
   474  	extsCreated := make([]string, 0)
   475  
   476  	// For security reasons don't allow to execute random strings but check that it's an existing extension
   477  	data, err := QueryMeasurements(ctx, dbUnique, sqlAvailable)
   478  	if err != nil {
   479  		log.GetLogger(ctx).Infof("[%s] Failed to get a list of available extensions: %v", dbUnique, err)
   480  		return extsCreated
   481  	}
   482  
   483  	availableExts := make(map[string]bool)
   484  	for _, row := range data {
   485  		availableExts[row["name"].(string)] = true
   486  	}
   487  
   488  	for _, extToCreate := range extensionNames {
   489  		if _, ok := existingExtensions[extToCreate]; ok {
   490  			continue
   491  		}
   492  		_, ok := availableExts[extToCreate]
   493  		if !ok {
   494  			log.GetLogger(ctx).Errorf("[%s] Requested extension %s not available on instance, cannot try to create...", dbUnique, extToCreate)
   495  		} else {
   496  			sqlCreateExt := `create extension ` + extToCreate
   497  			_, err := QueryMeasurements(ctx, dbUnique, sqlCreateExt)
   498  			if err != nil {
   499  				log.GetLogger(ctx).Errorf("[%s] Failed to create extension %s (based on --try-create-listed-exts-if-missing input): %v", dbUnique, extToCreate, err)
   500  			}
   501  			extsCreated = append(extsCreated, extToCreate)
   502  		}
   503  	}
   504  
   505  	return extsCreated
   506  }
   507  
   508  // Called once on daemon startup to try to create "metric fething helper" functions automatically
   509  func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.SourceConn) (err error) {
   510  	sl := log.GetLogger(ctx).WithField("source", md.Name)
   511  	metrics := maps.Clone(md.Metrics)
   512  	maps.Insert(metrics, maps.All(md.MetricsStandby))
   513  	for metricName := range metrics {
   514  		metric, ok := metricDefs.GetMetricDef(metricName)
   515  		if !ok {
   516  			continue
   517  		}
   518  		if _, err = md.Conn.Exec(ctx, metric.InitSQL); err != nil {
   519  			return
   520  		}
   521  		sl.WithField("metric", metricName).Info("Successfully created metric fetching helpers")
   522  	}
   523  	return
   524  }
   525  
   526  func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(shutDownDueToRoleChange map[string]bool) {
   527  	for _, prevDB := range r.prevLoopMonitoredDBs {
   528  		if r.monitoredSources.GetMonitoredDatabase(prevDB.Name) == nil { // removed from config
   529  			prevDB.Conn.Close()
   530  			_ = r.SinksWriter.SyncMetric(prevDB.Name, "", "remove")
   531  		}
   532  	}
   533  
   534  	for roleChangedDB := range shutDownDueToRoleChange {
   535  		if db := r.monitoredSources.GetMonitoredDatabase(roleChangedDB); db != nil {
   536  			db.Conn.Close()
   537  		}
   538  		_ = r.SinksWriter.SyncMetric(roleChangedDB, "", "remove")
   539  	}
   540  }
   541