...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/reaper/database.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"maps"
     8  	"strings"
     9  	"time"
    10  
    11  	"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
    12  	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
    13  	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
    14  	"github.com/cybertec-postgresql/pgwatch/v3/internal/sinks"
    15  	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
    16  	"github.com/jackc/pgx/v5"
    17  )
    18  
    19  func QueryMeasurements(ctx context.Context, md *sources.SourceConn, sql string, args ...any) (metrics.Measurements, error) {
    20  	var conn db.PgxIface
    21  	var err error
    22  	var tx pgx.Tx
    23  	if strings.TrimSpace(sql) == "" {
    24  		return nil, errors.New("empty SQL")
    25  	}
    26  
    27  	conn = md.Conn
    28  	if md.IsPostgresSource() {
    29  		// we don't want transaction for non-postgres sources, e.g. pgbouncer
    30  		if tx, err = conn.Begin(ctx); err != nil {
    31  			return nil, err
    32  		}
    33  		defer func() { _ = tx.Commit(ctx) }()
    34  		_, err = tx.Exec(ctx, "SET LOCAL lock_timeout TO '100ms'")
    35  		if err != nil {
    36  			return nil, err
    37  		}
    38  		conn = tx
    39  	} else {
    40  		// we want simple protocol for non-postgres connections, e.g. pgpool
    41  		args = append([]any{pgx.QueryExecModeSimpleProtocol}, args...)
    42  	}
    43  	rows, err := conn.Query(ctx, sql, args...)
    44  	if err == nil {
    45  		return pgx.CollectRows(rows, metrics.RowToMeasurement)
    46  	}
    47  	return nil, err
    48  }
    49  
// DetectSprocChanges compares the current set of stored-procedure definition
// hashes (metric "sproc_hashes") against the previous snapshot kept in
// hostState["sproc_hashes"] and emits create/alter/drop events to storageCh
// as a "sproc_changes" envelope. On the very first run it only seeds the
// state and reports nothing. Returns per-category change counts.
func DetectSprocChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking for sproc changes...", md.Name, specialMetricChangeEvents)
	// No prior state for this host means this is the initial snapshot run.
	if _, ok := hostState["sproc_hashes"]; !ok {
		firstRun = true
		hostState["sproc_hashes"] = make(map[string]string)
	}

	mvp, ok := metricDefs.GetMetricDef("sproc_hashes")
	if !ok {
		log.GetLogger(ctx).Error("could not get sproc_hashes sql")
		return changeCounts
	}

	data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
	if err != nil {
		log.GetLogger(ctx).Error("could not read sproc_hashes from monitored host: ", md.Name, ", err:", err)
		return changeCounts
	}

	for _, dr := range data {
		// Identity is name+oid so overloaded sprocs don't collide.
		objIdent := dr["tag_sproc"].(string) + dbMetricJoinStr + dr["tag_oid"].(string)
		prevHash, ok := hostState["sproc_hashes"][objIdent]
		if ok { // we have existing state
			if prevHash != dr["md5"].(string) {
				log.GetLogger(ctx).Info("detected change in sproc:", dr["tag_sproc"], ", oid:", dr["tag_oid"])
				dr["event"] = "alter"
				detectedChanges = append(detectedChanges, dr)
				hostState["sproc_hashes"][objIdent] = dr["md5"].(string)
				changeCounts.Altered++
			}
		} else { // check for new / delete
			// On first run everything is "new" — record state silently.
			if !firstRun {
				log.GetLogger(ctx).Info("detected new sproc:", dr["tag_sproc"], ", oid:", dr["tag_oid"])
				dr["event"] = "create"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
			}
			hostState["sproc_hashes"][objIdent] = dr["md5"].(string)
		}
	}
	// detect deletes
	// NOTE(review): drop detection triggers only on a count mismatch, so an
	// equal number of simultaneous creates and drops between two runs would
	// miss the drops — confirm this trade-off is acceptable.
	if !firstRun && len(hostState["sproc_hashes"]) != len(data) {
		deletedSProcs := make([]string, 0)
		// turn resultset to map => [oid]=true for faster checks
		currentOidMap := make(map[string]bool)
		for _, dr := range data {
			currentOidMap[dr["tag_sproc"].(string)+dbMetricJoinStr+dr["tag_oid"].(string)] = true
		}
		for sprocIdent := range hostState["sproc_hashes"] {
			_, ok := currentOidMap[sprocIdent]
			if !ok {
				splits := strings.Split(sprocIdent, dbMetricJoinStr)
				log.GetLogger(ctx).Info("detected delete of sproc:", splits[0], ", oid:", splits[1])
				influxEntry := metrics.NewMeasurement(data.GetEpoch())
				influxEntry["event"] = "drop"
				influxEntry["tag_sproc"] = splits[0]
				influxEntry["tag_oid"] = splits[1]
				detectedChanges = append(detectedChanges, influxEntry)
				deletedSProcs = append(deletedSProcs, sprocIdent)
				changeCounts.Dropped++
			}
		}
		// Deletion is deferred to after the scan to avoid mutating the map
		// while ranging over it.
		for _, deletedSProc := range deletedSProcs {
			delete(hostState["sproc_hashes"], deletedSProc)
		}
	}
	log.GetLogger(ctx).Debugf("[%s][%s] detected %d sproc changes", md.Name, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		storageCh <- metrics.MeasurementEnvelope{
			DBName:     md.Name,
			MetricName: "sproc_changes",
			Data:       detectedChanges,
			CustomTags: md.CustomTags,
		}
	}

	return changeCounts
}
   132  
// DetectTableChanges compares current table DDL hashes (metric "table_hashes")
// against the snapshot in hostState["table_hashes"] and emits
// create/alter/drop events to storageCh as a "table_changes" envelope.
// The first run only seeds the state and reports nothing. Returns
// per-category change counts.
func DetectTableChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking for table changes...", md.Name, specialMetricChangeEvents)
	// No prior state for this host means this is the initial snapshot run.
	if _, ok := hostState["table_hashes"]; !ok {
		firstRun = true
		hostState["table_hashes"] = make(map[string]string)
	}

	mvp, ok := metricDefs.GetMetricDef("table_hashes")
	if !ok {
		log.GetLogger(ctx).Error("could not get table_hashes sql")
		return changeCounts
	}

	data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
	if err != nil {
		log.GetLogger(ctx).Error("could not read table_hashes from monitored host:", md.Name, ", err:", err)
		return changeCounts
	}

	for _, dr := range data {
		objIdent := dr["tag_table"].(string)
		prevHash, ok := hostState["table_hashes"][objIdent]
		if ok { // we have existing state
			if prevHash != dr["md5"].(string) {
				log.GetLogger(ctx).Info("detected DDL change in table:", dr["tag_table"])
				dr["event"] = "alter"
				detectedChanges = append(detectedChanges, dr)
				hostState["table_hashes"][objIdent] = dr["md5"].(string)
				changeCounts.Altered++
			}
		} else { // check for new / delete
			// On first run everything is "new" — record state silently.
			if !firstRun {
				log.GetLogger(ctx).Info("detected new table:", dr["tag_table"])
				dr["event"] = "create"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
			}
			hostState["table_hashes"][objIdent] = dr["md5"].(string)
		}
	}
	// detect deletes
	// NOTE(review): drop detection relies on a count mismatch; equal numbers
	// of creates and drops between runs would hide the drops — confirm.
	if !firstRun && len(hostState["table_hashes"]) != len(data) {
		deletedTables := make([]string, 0)
		// turn resultset to map => [table]=true for faster checks
		currentTableMap := make(map[string]bool)
		for _, dr := range data {
			currentTableMap[dr["tag_table"].(string)] = true
		}
		for table := range hostState["table_hashes"] {
			_, ok := currentTableMap[table]
			if !ok {
				log.GetLogger(ctx).Info("detected drop of table:", table)
				influxEntry := metrics.NewMeasurement(data.GetEpoch())
				influxEntry["event"] = "drop"
				influxEntry["tag_table"] = table
				detectedChanges = append(detectedChanges, influxEntry)
				deletedTables = append(deletedTables, table)
				changeCounts.Dropped++
			}
		}
		// Deletion deferred until after the range to avoid concurrent mutation.
		for _, deletedTable := range deletedTables {
			delete(hostState["table_hashes"], deletedTable)
		}
	}

	log.GetLogger(ctx).Debugf("[%s][%s] detected %d table changes", md.Name, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		storageCh <- metrics.MeasurementEnvelope{
			DBName:     md.Name,
			MetricName: "table_changes",
			Data:       detectedChanges,
			CustomTags: md.CustomTags,
		}
	}

	return changeCounts
}
   214  
// DetectIndexChanges compares current index hashes (metric "index_hashes",
// combined with the index validity flag so CREATE INDEX CONCURRENTLY
// failures surface as changes) against hostState["index_hashes"] and emits
// create/alter/drop events to storageCh as an "index_changes" envelope.
// First run only seeds the state. Returns per-category change counts.
func DetectIndexChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking for index changes...", md.Name, specialMetricChangeEvents)
	// No prior state for this host means this is the initial snapshot run.
	if _, ok := hostState["index_hashes"]; !ok {
		firstRun = true
		hostState["index_hashes"] = make(map[string]string)
	}

	mvp, ok := metricDefs.GetMetricDef("index_hashes")
	if !ok {
		log.GetLogger(ctx).Error("could not get index_hashes sql")
		return changeCounts
	}

	data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
	if err != nil {
		log.GetLogger(ctx).Error("could not read index_hashes from monitored host:", md.Name, ", err:", err)
		return changeCounts
	}

	for _, dr := range data {
		objIdent := dr["tag_index"].(string)
		prevHash, ok := hostState["index_hashes"][objIdent]
		if ok { // we have existing state
			// State value is md5 + is_valid so validity flips count as changes.
			if prevHash != (dr["md5"].(string) + dr["is_valid"].(string)) {
				// NOTE(review): log reads dr["table"] while other keys are
				// "tag_"-prefixed — verify the metric SQL actually returns
				// a "table" column here.
				log.GetLogger(ctx).Info("detected index change:", dr["tag_index"], ", table:", dr["table"])
				dr["event"] = "alter"
				detectedChanges = append(detectedChanges, dr)
				hostState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
				changeCounts.Altered++
			}
		} else { // check for new / delete
			// On first run everything is "new" — record state silently.
			if !firstRun {
				log.GetLogger(ctx).Info("detected new index:", dr["tag_index"])
				dr["event"] = "create"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
			}
			hostState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
		}
	}
	// detect deletes
	// NOTE(review): count-mismatch based; simultaneous equal creates/drops
	// between runs would hide the drops — confirm acceptable.
	if !firstRun && len(hostState["index_hashes"]) != len(data) {
		deletedIndexes := make([]string, 0)
		// turn resultset to map => [table]=true for faster checks
		currentIndexMap := make(map[string]bool)
		for _, dr := range data {
			currentIndexMap[dr["tag_index"].(string)] = true
		}
		for indexName := range hostState["index_hashes"] {
			_, ok := currentIndexMap[indexName]
			if !ok {
				log.GetLogger(ctx).Info("detected drop of index_name:", indexName)
				influxEntry := metrics.NewMeasurement(data.GetEpoch())
				influxEntry["event"] = "drop"
				influxEntry["tag_index"] = indexName
				detectedChanges = append(detectedChanges, influxEntry)
				deletedIndexes = append(deletedIndexes, indexName)
				changeCounts.Dropped++
			}
		}
		// Deletion deferred until after the range to avoid concurrent mutation.
		for _, deletedIndex := range deletedIndexes {
			delete(hostState["index_hashes"], deletedIndex)
		}
	}
	log.GetLogger(ctx).Debugf("[%s][%s] detected %d index changes", md.Name, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		storageCh <- metrics.MeasurementEnvelope{
			DBName:     md.Name,
			MetricName: "index_changes",
			Data:       detectedChanges,
			CustomTags: md.CustomTags,
		}
	}

	return changeCounts
}
   295  
// DetectPrivilegeChanges compares the current set of object privileges
// (metric "privilege_changes", rows keyed by object_type/role/object/
// privilege_type) against hostState["object_privileges"] and emits
// GRANT/REVOKE events to storageCh as a "privilege_changes" envelope.
// First run only seeds the state. Returns per-category change counts
// (Created = grants, Dropped = revokes).
func DetectPrivilegeChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking object privilege changes...", md.Name, specialMetricChangeEvents)
	// No prior state for this host means this is the initial snapshot run.
	if _, ok := hostState["object_privileges"]; !ok {
		firstRun = true
		hostState["object_privileges"] = make(map[string]string)
	}

	mvp, ok := metricDefs.GetMetricDef("privilege_changes")
	if !ok || mvp.GetSQL(int(md.Version)) == "" {
		log.GetLogger(ctx).Warningf("[%s][%s] could not get SQL for 'privilege_changes'. cannot detect privilege changes", md.Name, specialMetricChangeEvents)
		return changeCounts
	}

	// returns rows of: object_type, tag_role, tag_object, privilege_type
	data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
	if err != nil {
		log.GetLogger(ctx).Errorf("[%s][%s] failed to fetch object privileges info: %v", md.Name, specialMetricChangeEvents, err)
		return changeCounts
	}

	currentState := make(map[string]bool)
	for _, dr := range data {
		// Composite key; "#:#" is the field separator used when splitting back below.
		objIdent := fmt.Sprintf("%s#:#%s#:#%s#:#%s", dr["object_type"], dr["tag_role"], dr["tag_object"], dr["privilege_type"])
		if firstRun {
			hostState["object_privileges"][objIdent] = ""
		} else {
			_, ok := hostState["object_privileges"][objIdent]
			if !ok {
				log.GetLogger(ctx).Infof("[%s][%s] detected new object privileges: role=%s, object_type=%s, object=%s, privilege_type=%s",
					md.Name, specialMetricChangeEvents, dr["tag_role"], dr["object_type"], dr["tag_object"], dr["privilege_type"])
				dr["event"] = "GRANT"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
				hostState["object_privileges"][objIdent] = ""
			}
			currentState[objIdent] = true
		}
	}
	// check revokes - exists in old state only
	// NOTE(review): the len(currentState) > 0 guard means that if ALL
	// privileges were revoked since the last run (empty resultset), no
	// REVOKE events are emitted — confirm this is intentional.
	if !firstRun && len(currentState) > 0 {
		for objPrevRun := range hostState["object_privileges"] {
			if _, ok := currentState[objPrevRun]; !ok {
				splits := strings.Split(objPrevRun, "#:#")
				log.GetLogger(ctx).Infof("[%s][%s] detected removed object privileges: role=%s, object_type=%s, object=%s, privilege_type=%s",
					md.Name, specialMetricChangeEvents, splits[1], splits[0], splits[2], splits[3])
				revokeEntry := metrics.NewMeasurement(data.GetEpoch())
				revokeEntry["object_type"] = splits[0]
				revokeEntry["tag_role"] = splits[1]
				revokeEntry["tag_object"] = splits[2]
				revokeEntry["privilege_type"] = splits[3]
				revokeEntry["event"] = "REVOKE"
				detectedChanges = append(detectedChanges, revokeEntry)
				changeCounts.Dropped++
				// Deleting during range is safe in Go for the key being iterated.
				delete(hostState["object_privileges"], objPrevRun)
			}
		}
	}

	log.GetLogger(ctx).Debugf("[%s][%s] detected %d object privilege changes...", md.Name, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		storageCh <- metrics.MeasurementEnvelope{
			DBName:     md.Name,
			MetricName: "privilege_changes",
			Data:       detectedChanges,
			CustomTags: md.CustomTags,
		}
	}

	return changeCounts
}
   370  
   371  func DetectConfigurationChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
   372  	detectedChanges := make(metrics.Measurements, 0)
   373  	var firstRun bool
   374  	var changeCounts ChangeDetectionResults
   375  
   376  	logger := log.GetLogger(ctx).WithField("source", md.Name).WithField("metric", specialMetricChangeEvents)
   377  
   378  	if _, ok := hostState["configuration_hashes"]; !ok {
   379  		firstRun = true
   380  		hostState["configuration_hashes"] = make(map[string]string)
   381  	}
   382  
   383  	mvp, ok := metricDefs.GetMetricDef("configuration_hashes")
   384  	if !ok {
   385  		logger.Error("could not get configuration_hashes sql")
   386  		return changeCounts
   387  	}
   388  
   389  	rows, err := md.Conn.Query(ctx, mvp.GetSQL(md.Version))
   390  	if err != nil {
   391  		logger.Error("could not read configuration_hashes from monitored host: ", err)
   392  		return changeCounts
   393  	}
   394  	defer rows.Close()
   395  	var (
   396  		objIdent, objValue string
   397  		epoch              int64
   398  	)
   399  	for rows.Next() {
   400  		if rows.Scan(&epoch, &objIdent, &objValue) != nil {
   401  			return changeCounts
   402  		}
   403  		prevРash, ok := hostState["configuration_hashes"][objIdent]
   404  		if ok { // we have existing state
   405  			if prevРash != objValue {
   406  				logger.Warningf("detected settings change: %s = %s (prev: %s)", objIdent, objValue, prevРash)
   407  				detectedChanges = append(detectedChanges, metrics.Measurement{
   408  					metrics.EpochColumnName: epoch,
   409  					"tag_setting":           objIdent,
   410  					"value":                 objValue,
   411  					"event":                 "alter"})
   412  				hostState["configuration_hashes"][objIdent] = objValue
   413  				changeCounts.Altered++
   414  			}
   415  		} else { // check for new, delete not relevant here (pg_upgrade)
   416  			hostState["configuration_hashes"][objIdent] = objValue
   417  			if firstRun {
   418  				continue
   419  			}
   420  			logger.Warning("detected new setting: ", objIdent)
   421  			detectedChanges = append(detectedChanges, metrics.Measurement{
   422  				metrics.EpochColumnName: epoch,
   423  				"tag_setting":           objIdent,
   424  				"value":                 objValue,
   425  				"event":                 "create"})
   426  			changeCounts.Created++
   427  		}
   428  	}
   429  
   430  	if len(detectedChanges) > 0 {
   431  		storageCh <- metrics.MeasurementEnvelope{
   432  			DBName:     md.Name,
   433  			MetricName: "configuration_changes",
   434  			Data:       detectedChanges,
   435  			CustomTags: md.CustomTags,
   436  		}
   437  	}
   438  	return changeCounts
   439  }
   440  
   441  // GetInstanceUpMeasurement returns a single measurement with "instance_up" metric
   442  // used to detect if the instance is up or down
   443  func (r *Reaper) GetInstanceUpMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error) {
   444  	err := md.Conn.Ping(ctx)
   445  	return metrics.Measurements{
   446  		metrics.Measurement{
   447  			metrics.EpochColumnName: time.Now().UnixNano(),
   448  			"instance_up": func() int {
   449  				if err == nil {
   450  					return 1
   451  				}
   452  				return 0
   453  			}(), // true if connection is up
   454  		},
   455  	}, err
   456  }
   457  
   458  func (r *Reaper) CheckForPGObjectChangesAndStore(ctx context.Context, dbUnique string, md *sources.SourceConn, hostState map[string]map[string]string) {
   459  	storageCh := r.measurementCh
   460  	sprocCounts := DetectSprocChanges(ctx, md, storageCh, hostState) // TODO some of Detect*() code could be unified...
   461  	tableCounts := DetectTableChanges(ctx, md, storageCh, hostState)
   462  	indexCounts := DetectIndexChanges(ctx, md, storageCh, hostState)
   463  	confCounts := DetectConfigurationChanges(ctx, md, storageCh, hostState)
   464  	privChangeCounts := DetectPrivilegeChanges(ctx, md, storageCh, hostState)
   465  
   466  	// need to send info on all object changes as one message as Grafana applies "last wins" for annotations with similar timestamp
   467  	message := ""
   468  	if sprocCounts.Altered > 0 || sprocCounts.Created > 0 || sprocCounts.Dropped > 0 {
   469  		message += fmt.Sprintf(" sprocs %d/%d/%d", sprocCounts.Created, sprocCounts.Altered, sprocCounts.Dropped)
   470  	}
   471  	if tableCounts.Altered > 0 || tableCounts.Created > 0 || tableCounts.Dropped > 0 {
   472  		message += fmt.Sprintf(" tables/views %d/%d/%d", tableCounts.Created, tableCounts.Altered, tableCounts.Dropped)
   473  	}
   474  	if indexCounts.Altered > 0 || indexCounts.Created > 0 || indexCounts.Dropped > 0 {
   475  		message += fmt.Sprintf(" indexes %d/%d/%d", indexCounts.Created, indexCounts.Altered, indexCounts.Dropped)
   476  	}
   477  	if confCounts.Altered > 0 || confCounts.Created > 0 {
   478  		message += fmt.Sprintf(" configuration %d/%d/%d", confCounts.Created, confCounts.Altered, confCounts.Dropped)
   479  	}
   480  	if privChangeCounts.Dropped > 0 || privChangeCounts.Created > 0 {
   481  		message += fmt.Sprintf(" privileges %d/%d/%d", privChangeCounts.Created, privChangeCounts.Altered, privChangeCounts.Dropped)
   482  	}
   483  
   484  	if message > "" {
   485  		message = "Detected changes for \"" + dbUnique + "\" [Created/Altered/Dropped]:" + message
   486  		log.GetLogger(ctx).Info(message)
   487  		detectedChangesSummary := make(metrics.Measurements, 0)
   488  		influxEntry := metrics.NewMeasurement(time.Now().UnixNano())
   489  		influxEntry["details"] = message
   490  		detectedChangesSummary = append(detectedChangesSummary, influxEntry)
   491  		storageCh <- metrics.MeasurementEnvelope{
   492  			DBName:     dbUnique,
   493  			MetricName: "object_changes",
   494  			Data:       detectedChangesSummary,
   495  			CustomTags: md.CustomTags,
   496  		}
   497  
   498  	}
   499  }
   500  
   501  // Called once on daemon startup if some commonly wanted extension (most notably pg_stat_statements) is missing.
   502  // With newer Postgres version can even succeed if the user is not a real superuser due to some cloud-specific
   503  // whitelisting or "trusted extensions" (a feature from v13). Ignores errors.
   504  func TryCreateMissingExtensions(ctx context.Context, md *sources.SourceConn, extensionNames []string, existingExtensions map[string]int) []string {
   505  	// TODO: move to sources package and use direct pgx connection
   506  	sqlAvailable := `select name::text from pg_available_extensions`
   507  	extsCreated := make([]string, 0)
   508  
   509  	// For security reasons don't allow to execute random strings but check that it's an existing extension
   510  	data, err := QueryMeasurements(ctx, md, sqlAvailable)
   511  	if err != nil {
   512  		log.GetLogger(ctx).Infof("[%s] Failed to get a list of available extensions: %v", md, err)
   513  		return extsCreated
   514  	}
   515  
   516  	availableExts := make(map[string]bool)
   517  	for _, row := range data {
   518  		availableExts[row["name"].(string)] = true
   519  	}
   520  
   521  	for _, extToCreate := range extensionNames {
   522  		if _, ok := existingExtensions[extToCreate]; ok {
   523  			continue
   524  		}
   525  		_, ok := availableExts[extToCreate]
   526  		if !ok {
   527  			log.GetLogger(ctx).Errorf("[%s] Requested extension %s not available on instance, cannot try to create...", md, extToCreate)
   528  		} else {
   529  			sqlCreateExt := `create extension ` + extToCreate
   530  			_, err := QueryMeasurements(ctx, md, sqlCreateExt)
   531  			if err != nil {
   532  				log.GetLogger(ctx).Errorf("[%s] Failed to create extension %s (based on --try-create-listed-exts-if-missing input): %v", md, extToCreate, err)
   533  			}
   534  			extsCreated = append(extsCreated, extToCreate)
   535  		}
   536  	}
   537  
   538  	return extsCreated
   539  }
   540  
   541  // Called once on daemon startup to try to create "metric fething helper" functions automatically
   542  func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.SourceConn) (err error) {
   543  	sl := log.GetLogger(ctx).WithField("source", md.Name)
   544  	metrics := maps.Clone(md.Metrics)
   545  	maps.Insert(metrics, maps.All(md.MetricsStandby))
   546  	for metricName := range metrics {
   547  		metric, ok := metricDefs.GetMetricDef(metricName)
   548  		if !ok {
   549  			continue
   550  		}
   551  		if _, err = md.Conn.Exec(ctx, metric.InitSQL); err != nil {
   552  			return
   553  		}
   554  		sl.WithField("metric", metricName).Info("Successfully created metric fetching helpers")
   555  	}
   556  	return
   557  }
   558  
   559  func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(shutDownDueToRoleChange map[string]bool) {
   560  	for _, prevDB := range r.prevLoopMonitoredDBs {
   561  		if r.monitoredSources.GetMonitoredDatabase(prevDB.Name) == nil { // removed from config
   562  			prevDB.Conn.Close()
   563  			_ = r.SinksWriter.SyncMetric(prevDB.Name, "", sinks.DeleteOp)
   564  		}
   565  	}
   566  
   567  	for roleChangedDB := range shutDownDueToRoleChange {
   568  		if db := r.monitoredSources.GetMonitoredDatabase(roleChangedDB); db != nil {
   569  			db.Conn.Close()
   570  		}
   571  		_ = r.SinksWriter.SyncMetric(roleChangedDB, "", sinks.DeleteOp)
   572  	}
   573  }
   574