...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/reaper/database.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/reaper

package reaper

import (
	"context"
	"errors"
	"fmt"
	"math"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/sinks"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
	"github.com/jackc/pgx/v5"
)

// QueryMeasurements runs the given SQL against the monitored source identified
// by dbUnique and returns the resulting rows as measurements.
func QueryMeasurements(ctx context.Context, dbUnique string, sql string, args ...any) (metrics.Measurements, error) {
	var conn db.PgxIface
	var md *sources.SourceConn
	var err error
	var tx pgx.Tx
	if strings.TrimSpace(sql) == "" {
		return nil, errors.New("empty SQL")
	}
	if md, err = GetMonitoredDatabaseByUniqueName(dbUnique); err != nil {
		return nil, err
	}
	conn = md.Conn
	if md.IsPostgresSource() {
		// wrap Postgres queries in a transaction so a local lock_timeout can be applied;
		// non-Postgres sources (e.g. PgBouncer) don't support transactions
		if tx, err = conn.Begin(ctx); err != nil {
			return nil, err
		}
		defer func() { _ = tx.Commit(ctx) }()
		if _, err = tx.Exec(ctx, "SET LOCAL lock_timeout TO '100ms'"); err != nil {
			return nil, err
		}
		conn = tx
	}
	rows, err := conn.Query(ctx, sql, args...)
	if err == nil {
		return pgx.CollectRows(rows, pgx.RowToMap)
	}
	return nil, err
}
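
// Usage sketch (illustrative only; "mydb" is a hypothetical source name, and
// the concrete value types in each row map depend on the query and the driver
// type mapping):
//
//	data, err := QueryMeasurements(ctx, "mydb",
//		"select datname, numbackends from pg_stat_database where datname = $1", "mydb")
//	if err == nil && len(data) > 0 {
//		backends := data[0]["numbackends"] // each row is a map[string]any
//		_ = backends
//	}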

const (
	execEnvUnknown       = "UNKNOWN"
	execEnvAzureSingle   = "AZURE_SINGLE"
	execEnvAzureFlexible = "AZURE_FLEXIBLE"
	execEnvGoogle        = "GOOGLE"
)

// DBGetSizeMB returns the size of the monitored database in megabytes,
// caching the value for dbSizeCachingInterval to avoid hammering slow storage.
func DBGetSizeMB(ctx context.Context, dbUnique string) (int64, error) {
	sqlDbSize := `select /* pgwatch_generated */ pg_database_size(current_database());`
	var sizeMB int64

	lastDBSizeCheckLock.RLock()
	lastDBSizeCheckTime := lastDBSizeFetchTime[dbUnique]
	lastDBSize, ok := lastDBSizeMB[dbUnique]
	lastDBSizeCheckLock.RUnlock()

	if !ok || lastDBSizeCheckTime.Add(dbSizeCachingInterval).Before(time.Now()) {
		md, err := GetMonitoredDatabaseByUniqueName(dbUnique)
		if err != nil {
			return 0, err
		}
		ver, err := GetMonitoredDatabaseSettings(ctx, md, false)
		// use the exact (possibly slow) pg_database_size() call unless running on
		// Azure Single Server with a usable approximate size (>= ~1 TB) already known
		if err != nil || ver.ExecEnv != execEnvAzureSingle || ver.ApproxDBSizeB < 1e12 {
			log.GetLogger(ctx).Debugf("[%s] determining DB size ...", dbUnique)

			data, err := QueryMeasurements(ctx, dbUnique, sqlDbSize) // can take some time on ancient FS, use 300s stmt timeout
			if err != nil {
				log.GetLogger(ctx).Errorf("[%s] failed to determine DB size, cannot apply --min-db-size-mb flag: %v", dbUnique, err)
				return 0, err
			}
			sizeMB = data[0]["pg_database_size"].(int64) / 1048576 // bytes -> MB
		} else {
			log.GetLogger(ctx).Debugf("[%s] using approx DB size for the --min-db-size-mb filter ...", dbUnique)
			sizeMB = ver.ApproxDBSizeB / 1048576
		}

		log.GetLogger(ctx).Debugf("[%s] DB size = %d MB, caching for %v ...", dbUnique, sizeMB, dbSizeCachingInterval)

		lastDBSizeCheckLock.Lock()
		lastDBSizeFetchTime[dbUnique] = time.Now()
		lastDBSizeMB[dbUnique] = sizeMB
		lastDBSizeCheckLock.Unlock()

		return sizeMB, nil
	}
	log.GetLogger(ctx).Debugf("[%s] using cached DB size %d MB for the --min-db-size-mb filter check", dbUnique, lastDBSize)
	return lastDBSize, nil
}

var regVer = regexp.MustCompile(`(\d+)\.?(\d*)\.?(\d*)`)

// VersionToInt parses up to three numeric components out of a version string
// and packs them into a single integer, two decimal digits per minor / patch
// component; it returns 0 if no number can be extracted.
// Examples: v1.2 -> 01_02_00, v9.6.3 -> 09_06_03, v11 -> 11_00_00
func VersionToInt(version string) (v int) {
	if matches := regVer.FindStringSubmatch(version); len(matches) > 1 {
		for i, match := range matches[1:] {
			m, _ := strconv.Atoi(match) // empty / non-numeric components count as 0
			v += m * int(math.Pow10(4-i*2))
		}
	}
	return
}
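
// Illustrative values (following the encoding described above; not from the
// original file):
//
//	VersionToInt("v9.6.3")  // 90603
//	VersionToInt("11.1")    // 110100
//	VersionToInt("garbage") // 0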

var rBouncerAndPgpoolVerMatch = regexp.MustCompile(`\d+\.\d+`) // extract $major.$minor from "4.1.2 (karasukiboshi)" or "PgBouncer 1.12.0"
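
// Illustrative extraction (values taken from the comment above):
//
//	rBouncerAndPgpoolVerMatch.FindString("4.1.2 (karasukiboshi)") // "4.1"  -> VersionToInt: 40100
//	rBouncerAndPgpoolVerMatch.FindString("PgBouncer 1.12.0")      // "1.12" -> VersionToInt: 11200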

// GetMonitoredDatabaseSettings determines the version, recovery status,
// execution environment, and installed extensions of a monitored source,
// caching the result for a couple of minutes unless noCache is set.
func GetMonitoredDatabaseSettings(ctx context.Context, md *sources.SourceConn, noCache bool) (MonitoredDatabaseSettings, error) {
	var dbSettings MonitoredDatabaseSettings
	var dbNewSettings MonitoredDatabaseSettings
	var ok bool

	l := log.GetLogger(ctx).WithField("source", md.Name).WithField("kind", md.Kind)

	sqlExtensions := `select /* pgwatch_generated */ extname::text, (regexp_matches(extversion, $$\d+\.?\d+?$$))[1]::text as extversion from pg_extension order by 1;`

	MonitoredDatabasesSettingsLock.Lock()
	getVerLock, ok := MonitoredDatabasesSettingsGetLock[md.Name]
	if !ok {
		MonitoredDatabasesSettingsGetLock[md.Name] = &sync.RWMutex{}
		getVerLock = MonitoredDatabasesSettingsGetLock[md.Name]
	}
	dbSettings, ok = MonitoredDatabasesSettings[md.Name]
	MonitoredDatabasesSettingsLock.Unlock()

	if !noCache && ok && dbSettings.LastCheckedOn.After(time.Now().Add(time.Minute*-2)) { // use cached version for 2 min
		return dbSettings, nil
	}
	getVerLock.Lock() // limit to 1 concurrent version info fetch per DB
	defer getVerLock.Unlock()
	l.Debug("determining DB version and recovery status...")

	if dbNewSettings.Extensions == nil {
		dbNewSettings.Extensions = make(map[string]int)
	}

	switch md.Kind {
	case sources.SourcePgBouncer:
		if err := md.Conn.QueryRow(ctx, "SHOW VERSION").Scan(&dbNewSettings.VersionStr); err != nil {
			return dbNewSettings, err
		}
		matches := rBouncerAndPgpoolVerMatch.FindStringSubmatch(dbNewSettings.VersionStr)
		if len(matches) != 1 {
			return dbSettings, fmt.Errorf("unexpected PgBouncer version input: %s", dbNewSettings.VersionStr)
		}
		dbNewSettings.Version = VersionToInt(matches[0])
	case sources.SourcePgPool:
		if err := md.Conn.QueryRow(ctx, "SHOW POOL_VERSION").Scan(&dbNewSettings.VersionStr); err != nil {
			return dbNewSettings, err
		}

		matches := rBouncerAndPgpoolVerMatch.FindStringSubmatch(dbNewSettings.VersionStr)
		if len(matches) != 1 {
			return dbSettings, fmt.Errorf("unexpected PgPool version input: %s", dbNewSettings.VersionStr)
		}
		dbNewSettings.Version = VersionToInt(matches[0])
	default:
		sql := `select /* pgwatch_generated */
	div(current_setting('server_version_num')::int, 10000) as ver,
	version(),
	pg_is_in_recovery(),
	current_database()::TEXT,
	system_identifier,
	current_setting('is_superuser')::bool
FROM
	pg_control_system()`

		err := md.Conn.QueryRow(ctx, sql).
			Scan(&dbNewSettings.Version, &dbNewSettings.VersionStr,
				&dbNewSettings.IsInRecovery, &dbNewSettings.RealDbname,
				&dbNewSettings.SystemIdentifier, &dbNewSettings.IsSuperuser)
		if err != nil {
			if noCache {
				return dbSettings, err
			}
			l.Error("GetMonitoredDatabaseSettings failed, using old cached value: ", err)
			return dbSettings, nil
		}

		if dbSettings.ExecEnv != "" {
			dbNewSettings.ExecEnv = dbSettings.ExecEnv // carry over, as the execution env is not likely to ever change
		} else {
			l.Debugf("determining the execution env...")
			dbNewSettings.ExecEnv = md.DiscoverPlatform(ctx)
		}

		// work around the poor filesystem-function performance of Azure Single Server
		// for some metrics and for the --min-db-size-mb filter
		if dbNewSettings.ExecEnv == execEnvAzureSingle {
			if approxSize, err := md.GetApproxSize(ctx); err == nil {
				dbNewSettings.ApproxDBSizeB = approxSize
			} else {
				dbNewSettings.ApproxDBSizeB = dbSettings.ApproxDBSizeB
			}
		}

		l.Debugf("[%s] determining installed extensions info...", md.Name)
		data, err := QueryMeasurements(ctx, md.Name, sqlExtensions)
		if err != nil {
			l.Errorf("[%s] failed to determine installed extensions info: %v", md.Name, err)
		} else {
			for _, dr := range data {
				extver := VersionToInt(dr["extversion"].(string))
				if extver == 0 {
					l.Error("failed to determine extension version info for extension: ", dr["extname"])
					continue
				}
				dbNewSettings.Extensions[dr["extname"].(string)] = extver
			}
			l.Debugf("[%s] installed extensions: %+v", md.Name, dbNewSettings.Extensions)
		}
	}

	dbNewSettings.LastCheckedOn = time.Now()
	MonitoredDatabasesSettingsLock.Lock()
	MonitoredDatabasesSettings[md.Name] = dbNewSettings
	MonitoredDatabasesSettingsLock.Unlock()

	return dbNewSettings, nil
}
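
// Illustrative outcomes (hypothetical values): for a PgBouncer source,
// "SHOW VERSION" might return "PgBouncer 1.12.0", stored as Version 11200;
// for a plain Postgres source, server_version_num 160002 yields ver = 16
// via div(..., 10000).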

// DetectSprocChanges diffs the current stored-procedure hashes against the
// previous run kept in hostState and emits create/alter/drop events.
func DetectSprocChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking for sproc changes...", dbUnique, specialMetricChangeEvents)
	if _, ok := hostState["sproc_hashes"]; !ok {
		firstRun = true
		hostState["sproc_hashes"] = make(map[string]string)
	}

	mvp, ok := metricDefs.GetMetricDef("sproc_hashes")
	if !ok {
		log.GetLogger(ctx).Error("could not get sproc_hashes sql")
		return changeCounts
	}

	data, err := QueryMeasurements(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
	if err != nil {
		log.GetLogger(ctx).Error("could not read sproc_hashes from monitored host: ", dbUnique, ", err:", err)
		return changeCounts
	}

	for _, dr := range data {
		objIdent := dr["tag_sproc"].(string) + dbMetricJoinStr + dr["tag_oid"].(string)
		prevHash, ok := hostState["sproc_hashes"][objIdent]
		if ok { // we have existing state
			if prevHash != dr["md5"].(string) {
				log.GetLogger(ctx).Info("detected change in sproc:", dr["tag_sproc"], ", oid:", dr["tag_oid"])
				dr["event"] = "alter"
				detectedChanges = append(detectedChanges, dr)
				hostState["sproc_hashes"][objIdent] = dr["md5"].(string)
				changeCounts.Altered++
			}
		} else { // check for new / delete
			if !firstRun {
				log.GetLogger(ctx).Info("detected new sproc:", dr["tag_sproc"], ", oid:", dr["tag_oid"])
				dr["event"] = "create"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
			}
			hostState["sproc_hashes"][objIdent] = dr["md5"].(string)
		}
	}
	// detect deletes
	if !firstRun && len(hostState["sproc_hashes"]) != len(data) {
		deletedSProcs := make([]string, 0)
		// turn resultset to map => [oid]=true for faster checks
		currentOidMap := make(map[string]bool)
		for _, dr := range data {
			currentOidMap[dr["tag_sproc"].(string)+dbMetricJoinStr+dr["tag_oid"].(string)] = true
		}
		for sprocIdent := range hostState["sproc_hashes"] {
			_, ok := currentOidMap[sprocIdent]
			if !ok {
				splits := strings.Split(sprocIdent, dbMetricJoinStr)
				log.GetLogger(ctx).Info("detected delete of sproc:", splits[0], ", oid:", splits[1])
				influxEntry := metrics.NewMeasurement(data.GetEpoch())
				influxEntry["event"] = "drop"
				influxEntry["tag_sproc"] = splits[0]
				influxEntry["tag_oid"] = splits[1]
				detectedChanges = append(detectedChanges, influxEntry)
				deletedSProcs = append(deletedSProcs, sprocIdent)
				changeCounts.Dropped++
			}
		}
		for _, deletedSProc := range deletedSProcs {
			delete(hostState["sproc_hashes"], deletedSProc)
		}
	}
	log.GetLogger(ctx).Debugf("[%s][%s] detected %d sproc changes", dbUnique, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
		storageCh <- []metrics.MeasurementEnvelope{{DBName: dbUnique, MetricName: "sproc_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
	}

	return changeCounts
}
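
// All Detect*Changes functions below follow the same state-diff pattern; a
// minimal generic sketch of it (illustrative only, not part of the original
// code; diffHashes is a hypothetical helper):
//
//	func diffHashes(prev map[string]string, cur metrics.Measurements, key string) (created, altered, dropped int) {
//		seen := make(map[string]bool)
//		for _, row := range cur {
//			ident := row[key].(string)
//			seen[ident] = true
//			if old, ok := prev[ident]; !ok {
//				created++ // first appearance => "create"
//			} else if old != row["md5"].(string) {
//				altered++ // hash changed => "alter"
//			}
//			prev[ident] = row["md5"].(string)
//		}
//		for ident := range prev {
//			if !seen[ident] {
//				dropped++ // vanished since last run => "drop"
//				delete(prev, ident)
//			}
//		}
//		return
//	}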

// DetectTableChanges diffs the current table DDL hashes against the previous
// run kept in hostState and emits create/alter/drop events.
func DetectTableChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking for table changes...", dbUnique, specialMetricChangeEvents)
	if _, ok := hostState["table_hashes"]; !ok {
		firstRun = true
		hostState["table_hashes"] = make(map[string]string)
	}

	mvp, ok := metricDefs.GetMetricDef("table_hashes")
	if !ok {
		log.GetLogger(ctx).Error("could not get table_hashes sql")
		return changeCounts
	}

	data, err := QueryMeasurements(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
	if err != nil {
		log.GetLogger(ctx).Error("could not read table_hashes from monitored host:", dbUnique, ", err:", err)
		return changeCounts
	}

	for _, dr := range data {
		objIdent := dr["tag_table"].(string)
		prevHash, ok := hostState["table_hashes"][objIdent]
		// log.Debug("inspecting table:", objIdent, "hash:", prevHash)
		if ok { // we have existing state
			if prevHash != dr["md5"].(string) {
				log.GetLogger(ctx).Info("detected DDL change in table:", dr["tag_table"])
				dr["event"] = "alter"
				detectedChanges = append(detectedChanges, dr)
				hostState["table_hashes"][objIdent] = dr["md5"].(string)
				changeCounts.Altered++
			}
		} else { // check for new / delete
			if !firstRun {
				log.GetLogger(ctx).Info("detected new table:", dr["tag_table"])
				dr["event"] = "create"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
			}
			hostState["table_hashes"][objIdent] = dr["md5"].(string)
		}
	}
	// detect deletes
	if !firstRun && len(hostState["table_hashes"]) != len(data) {
		deletedTables := make([]string, 0)
		// turn resultset to map => [table]=true for faster checks
		currentTableMap := make(map[string]bool)
		for _, dr := range data {
			currentTableMap[dr["tag_table"].(string)] = true
		}
		for table := range hostState["table_hashes"] {
			_, ok := currentTableMap[table]
			if !ok {
				log.GetLogger(ctx).Info("detected drop of table:", table)
				influxEntry := metrics.NewMeasurement(data.GetEpoch())
				influxEntry["event"] = "drop"
				influxEntry["tag_table"] = table
				detectedChanges = append(detectedChanges, influxEntry)
				deletedTables = append(deletedTables, table)
				changeCounts.Dropped++
			}
		}
		for _, deletedTable := range deletedTables {
			delete(hostState["table_hashes"], deletedTable)
		}
	}

	log.GetLogger(ctx).Debugf("[%s][%s] detected %d table changes", dbUnique, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
		storageCh <- []metrics.MeasurementEnvelope{{DBName: dbUnique, MetricName: "table_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
	}

	return changeCounts
}

// DetectIndexChanges diffs the current index hashes (including validity)
// against the previous run kept in hostState and emits create/alter/drop events.
func DetectIndexChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking for index changes...", dbUnique, specialMetricChangeEvents)
	if _, ok := hostState["index_hashes"]; !ok {
		firstRun = true
		hostState["index_hashes"] = make(map[string]string)
	}

	mvp, ok := metricDefs.GetMetricDef("index_hashes")
	if !ok {
		log.GetLogger(ctx).Error("could not get index_hashes sql")
		return changeCounts
	}

	data, err := QueryMeasurements(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
	if err != nil {
		log.GetLogger(ctx).Error("could not read index_hashes from monitored host:", dbUnique, ", err:", err)
		return changeCounts
	}

	for _, dr := range data {
		objIdent := dr["tag_index"].(string)
		prevHash, ok := hostState["index_hashes"][objIdent]
		if ok { // we have existing state
			if prevHash != (dr["md5"].(string) + dr["is_valid"].(string)) {
				log.GetLogger(ctx).Info("detected index change:", dr["tag_index"], ", table:", dr["table"])
				dr["event"] = "alter"
				detectedChanges = append(detectedChanges, dr)
				hostState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
				changeCounts.Altered++
			}
		} else { // check for new / delete
			if !firstRun {
				log.GetLogger(ctx).Info("detected new index:", dr["tag_index"])
				dr["event"] = "create"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
			}
			hostState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
		}
	}
	// detect deletes
	if !firstRun && len(hostState["index_hashes"]) != len(data) {
		deletedIndexes := make([]string, 0)
		// turn resultset to map => [index]=true for faster checks
		currentIndexMap := make(map[string]bool)
		for _, dr := range data {
			currentIndexMap[dr["tag_index"].(string)] = true
		}
		for indexName := range hostState["index_hashes"] {
			_, ok := currentIndexMap[indexName]
			if !ok {
				log.GetLogger(ctx).Info("detected drop of index_name:", indexName)
				influxEntry := metrics.NewMeasurement(data.GetEpoch())
				influxEntry["event"] = "drop"
				influxEntry["tag_index"] = indexName
				detectedChanges = append(detectedChanges, influxEntry)
				deletedIndexes = append(deletedIndexes, indexName)
				changeCounts.Dropped++
			}
		}
		for _, deletedIndex := range deletedIndexes {
			delete(hostState["index_hashes"], deletedIndex)
		}
	}
	log.GetLogger(ctx).Debugf("[%s][%s] detected %d index changes", dbUnique, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
		storageCh <- []metrics.MeasurementEnvelope{{DBName: dbUnique, MetricName: "index_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
	}

	return changeCounts
}

// DetectPrivilegeChanges diffs the current object privileges against the
// previous run kept in hostState and emits GRANT/REVOKE events.
func DetectPrivilegeChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking object privilege changes...", dbUnique, specialMetricChangeEvents)
	if _, ok := hostState["object_privileges"]; !ok {
		firstRun = true
		hostState["object_privileges"] = make(map[string]string)
	}

	mvp, ok := metricDefs.GetMetricDef("privilege_changes")
	if !ok || mvp.GetSQL(int(vme.Version)) == "" {
		log.GetLogger(ctx).Warningf("[%s][%s] could not get SQL for 'privilege_changes', cannot detect privilege changes", dbUnique, specialMetricChangeEvents)
		return changeCounts
	}

	// returns rows of: object_type, tag_role, tag_object, privilege_type
	data, err := QueryMeasurements(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
	if err != nil {
		log.GetLogger(ctx).Errorf("[%s][%s] failed to fetch object privileges info: %v", dbUnique, specialMetricChangeEvents, err)
		return changeCounts
	}

	currentState := make(map[string]bool)
	for _, dr := range data {
		objIdent := fmt.Sprintf("%s#:#%s#:#%s#:#%s", dr["object_type"], dr["tag_role"], dr["tag_object"], dr["privilege_type"])
		if firstRun {
			hostState["object_privileges"][objIdent] = ""
		} else {
			_, ok := hostState["object_privileges"][objIdent]
			if !ok {
				log.GetLogger(ctx).Infof("[%s][%s] detected new object privileges: role=%s, object_type=%s, object=%s, privilege_type=%s",
					dbUnique, specialMetricChangeEvents, dr["tag_role"], dr["object_type"], dr["tag_object"], dr["privilege_type"])
				dr["event"] = "GRANT"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
				hostState["object_privileges"][objIdent] = ""
			}
			currentState[objIdent] = true
		}
	}
	// check revokes - exist in the old state only
	if !firstRun && len(currentState) > 0 {
		for objPrevRun := range hostState["object_privileges"] {
			if _, ok := currentState[objPrevRun]; !ok {
				splits := strings.Split(objPrevRun, "#:#")
				log.GetLogger(ctx).Infof("[%s][%s] detected removed object privileges: role=%s, object_type=%s, object=%s, privilege_type=%s",
					dbUnique, specialMetricChangeEvents, splits[1], splits[0], splits[2], splits[3])
				revokeEntry := metrics.NewMeasurement(data.GetEpoch())
				revokeEntry["object_type"] = splits[0]
				revokeEntry["tag_role"] = splits[1]
				revokeEntry["tag_object"] = splits[2]
				revokeEntry["privilege_type"] = splits[3]
				revokeEntry["event"] = "REVOKE"
				detectedChanges = append(detectedChanges, revokeEntry)
				changeCounts.Dropped++
				delete(hostState["object_privileges"], objPrevRun)
			}
		}
	}

	log.GetLogger(ctx).Debugf("[%s][%s] detected %d object privilege changes...", dbUnique, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
		storageCh <- []metrics.MeasurementEnvelope{
			{
				DBName:     dbUnique,
				MetricName: "privilege_changes",
				Data:       detectedChanges,
				CustomTags: md.CustomTags,
			}}
	}

	return changeCounts
}

// DetectConfigurationChanges diffs the current server settings against the
// previous run kept in hostState and emits create/alter events.
func DetectConfigurationChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking for configuration changes...", dbUnique, specialMetricChangeEvents)
	if _, ok := hostState["configuration_hashes"]; !ok {
		firstRun = true
		hostState["configuration_hashes"] = make(map[string]string)
	}

	mvp, ok := metricDefs.GetMetricDef("configuration_hashes")
	if !ok {
		log.GetLogger(ctx).Errorf("[%s][%s] could not get configuration_hashes sql", dbUnique, specialMetricChangeEvents)
		return changeCounts
	}

	data, err := QueryMeasurements(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
	if err != nil {
		log.GetLogger(ctx).Errorf("[%s][%s] could not read configuration_hashes from monitored host: %v", dbUnique, specialMetricChangeEvents, err)
		return changeCounts
	}

	for _, dr := range data {
		objIdent := dr["tag_setting"].(string)
		objValue := dr["value"].(string)
		prevHash, ok := hostState["configuration_hashes"][objIdent]
		if ok { // we have existing state
			if prevHash != objValue {
				if objIdent == "connection_ID" {
					continue // ignore a weird Azure managed PG service setting
				}
				log.GetLogger(ctx).Warningf("[%s][%s] detected settings change: %s = %s (prev: %s)",
					dbUnique, specialMetricChangeEvents, objIdent, objValue, prevHash)
				dr["event"] = "alter"
				detectedChanges = append(detectedChanges, dr)
				hostState["configuration_hashes"][objIdent] = objValue
				changeCounts.Altered++
			}
		} else { // check for new; deletes are not relevant here (pg_upgrade)
			if !firstRun {
				log.GetLogger(ctx).Warningf("[%s][%s] detected new setting: %s", dbUnique, specialMetricChangeEvents, objIdent)
				dr["event"] = "create"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
			}
			hostState["configuration_hashes"][objIdent] = objValue
		}
	}

	log.GetLogger(ctx).Debugf("[%s][%s] detected %d configuration changes", dbUnique, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
		storageCh <- []metrics.MeasurementEnvelope{{
			DBName:     dbUnique,
			MetricName: "configuration_changes",
			Data:       detectedChanges,
			CustomTags: md.CustomTags,
		}}
	}

	return changeCounts
}

// CheckForPGObjectChangesAndStore runs all change detectors and, if anything
// changed, pushes a one-line summary measurement to the sink.
func (r *Reaper) CheckForPGObjectChangesAndStore(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, hostState map[string]map[string]string) {
	storageCh := r.measurementCh
	sprocCounts := DetectSprocChanges(ctx, dbUnique, vme, storageCh, hostState) // TODO some of Detect*() code could be unified...
	tableCounts := DetectTableChanges(ctx, dbUnique, vme, storageCh, hostState)
	indexCounts := DetectIndexChanges(ctx, dbUnique, vme, storageCh, hostState)
	confCounts := DetectConfigurationChanges(ctx, dbUnique, vme, storageCh, hostState)
	privChangeCounts := DetectPrivilegeChanges(ctx, dbUnique, vme, storageCh, hostState)

	// info on all object changes needs to go out as a single message, as Grafana applies "last wins" to annotations with similar timestamps
	message := ""
	if sprocCounts.Altered > 0 || sprocCounts.Created > 0 || sprocCounts.Dropped > 0 {
		message += fmt.Sprintf(" sprocs %d/%d/%d", sprocCounts.Created, sprocCounts.Altered, sprocCounts.Dropped)
	}
	if tableCounts.Altered > 0 || tableCounts.Created > 0 || tableCounts.Dropped > 0 {
		message += fmt.Sprintf(" tables/views %d/%d/%d", tableCounts.Created, tableCounts.Altered, tableCounts.Dropped)
	}
	if indexCounts.Altered > 0 || indexCounts.Created > 0 || indexCounts.Dropped > 0 {
		message += fmt.Sprintf(" indexes %d/%d/%d", indexCounts.Created, indexCounts.Altered, indexCounts.Dropped)
	}
	if confCounts.Altered > 0 || confCounts.Created > 0 {
		message += fmt.Sprintf(" configuration %d/%d/%d", confCounts.Created, confCounts.Altered, confCounts.Dropped)
	}
	if privChangeCounts.Dropped > 0 || privChangeCounts.Created > 0 {
		message += fmt.Sprintf(" privileges %d/%d/%d", privChangeCounts.Created, privChangeCounts.Altered, privChangeCounts.Dropped)
	}

	if message != "" {
		message = "Detected changes for \"" + dbUnique + "\" [Created/Altered/Dropped]:" + message
		log.GetLogger(ctx).Info(message)
		detectedChangesSummary := make(metrics.Measurements, 0)
		influxEntry := metrics.NewMeasurement(time.Now().UnixNano())
		influxEntry["details"] = message
		detectedChangesSummary = append(detectedChangesSummary, influxEntry)
		md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
		storageCh <- []metrics.MeasurementEnvelope{{DBName: dbUnique,
			SourceType: string(md.Kind),
			MetricName: "object_changes",
			Data:       detectedChangesSummary,
			CustomTags: md.CustomTags,
		}}
	}
}
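
// An illustrative summary line as assembled above (hypothetical source "mydb"
// and counts):
//
//	Detected changes for "mydb" [Created/Altered/Dropped]: tables/views 1/0/0 indexes 2/0/1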

// FetchMetricsPgpool handles pgpool sources; some extra work is needed, as
// pgpool's SHOW commands don't declare result data types, so everything
// arrives as text and has to be converted manually.
func FetchMetricsPgpool(ctx context.Context, msg MetricFetchConfig, vme MonitoredDatabaseSettings, mvp metrics.Metric) (metrics.Measurements, error) {
	var retData = make(metrics.Measurements, 0)
	epochNs := time.Now().UnixNano()

	// upper-case the metric SQL so the SHOW prefix checks below match
	sqlLines := strings.Split(strings.ToUpper(mvp.GetSQL(int(vme.Version))), "\n")

	for _, sql := range sqlLines {
		if strings.HasPrefix(sql, "SHOW POOL_NODES") {
			data, err := QueryMeasurements(ctx, msg.DBUniqueName, sql)
			if err != nil {
				log.GetLogger(ctx).Errorf("[%s][%s] Could not fetch PgPool statistics: %v", msg.DBUniqueName, msg.MetricName, err)
				return data, err
			}

			for _, row := range data {
				retRow := metrics.NewMeasurement(epochNs)
				for k, v := range row {
					vs := string(v.([]byte))
					// we need at least one tag so that Influx doesn't merge rows
					if k == "node_id" {
						retRow["tag_node_id"] = vs
						continue
					}

					retRow[k] = vs
					if k == "status" { // was changed from numeric to string in some pgpool version, so keep the string
						// but also add a numeric "status_num" field
						switch vs {
						case "up":
							retRow["status_num"] = 1
						case "down":
							retRow["status_num"] = 0
						default:
							i, err := strconv.ParseInt(vs, 10, 64)
							if err == nil {
								retRow["status_num"] = i
							}
						}
						continue
					}
					// everything is returned as text, so try to convert all numerics into ints / floats
					if k != "lb_weight" {
						i, err := strconv.ParseInt(vs, 10, 64)
						if err == nil {
							retRow[k] = i
							continue
						}
					}
					f, err := strconv.ParseFloat(vs, 64)
					if err == nil {
						retRow[k] = f
						continue
					}
				}
				retData = append(retData, retRow)
			}
		} else if strings.HasPrefix(sql, "SHOW POOL_PROCESSES") {
			if len(retData) == 0 {
				log.GetLogger(ctx).Warningf("[%s][%s] SHOW POOL_NODES needs to be placed before SHOW POOL_PROCESSES, ignoring SHOW POOL_PROCESSES", msg.DBUniqueName, msg.MetricName)
				continue
			}

			data, err := QueryMeasurements(ctx, msg.DBUniqueName, sql)
			if err != nil {
				log.GetLogger(ctx).Errorf("[%s][%s] Could not fetch PgPool statistics: %v", msg.DBUniqueName, msg.MetricName, err)
				continue
			}

			// summarize processes_total / processes_active over all rows
			processesTotal := 0
			processesActive := 0
			for _, row := range data {
				processesTotal++
				v, ok := row["database"]
				if !ok {
					log.GetLogger(ctx).Infof("[%s][%s] column 'database' not found in data returned by SHOW POOL_PROCESSES, check pool version / SQL definition", msg.DBUniqueName, msg.MetricName)
					continue
				}
				if len(v.([]byte)) > 0 { // a non-empty database name means the process is serving a client
					processesActive++
				}
			}

			for _, retRow := range retData {
				retRow["processes_total"] = processesTotal
				retRow["processes_active"] = processesActive
			}
		}
	}
	return retData, nil
}
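
// Illustrative conversion of a hypothetical POOL_NODES row under the rules
// above (column names depend on the pgpool version):
//
//	in:  node_id="0", status="up", lb_weight="0.500000", select_cnt="42"
//	out: tag_node_id="0", status="up", status_num=1, lb_weight=0.5, select_cnt=42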

// TryCreateMissingExtensions is called once on daemon startup if some commonly
// wanted extension (most notably pg_stat_statements) is missing. With newer
// Postgres versions it can succeed even for non-superusers thanks to
// cloud-specific whitelisting or "trusted extensions" (a v13 feature).
// Errors are logged and otherwise ignored.
func TryCreateMissingExtensions(ctx context.Context, dbUnique string, extensionNames []string, existingExtensions map[string]int) []string {
	sqlAvailable := `select name::text from pg_available_extensions`
	extsCreated := make([]string, 0)

	// for security reasons, don't execute arbitrary input; verify that the name is a known, available extension
	data, err := QueryMeasurements(ctx, dbUnique, sqlAvailable)
	if err != nil {
		log.GetLogger(ctx).Infof("[%s] Failed to get a list of available extensions: %v", dbUnique, err)
		return extsCreated
	}

	availableExts := make(map[string]bool)
	for _, row := range data {
		availableExts[row["name"].(string)] = true
	}

	for _, extToCreate := range extensionNames {
		if _, ok := existingExtensions[extToCreate]; ok {
			continue
		}
		if !availableExts[extToCreate] {
			log.GetLogger(ctx).Errorf("[%s] Requested extension %s not available on instance, cannot try to create...", dbUnique, extToCreate)
			continue
		}
		sqlCreateExt := `create extension ` + extToCreate
		if _, err := QueryMeasurements(ctx, dbUnique, sqlCreateExt); err != nil {
			log.GetLogger(ctx).Errorf("[%s] Failed to create extension %s (based on --try-create-listed-exts-if-missing input): %v", dbUnique, extToCreate, err)
			continue // don't report the extension as created when the attempt failed
		}
		extsCreated = append(extsCreated, extToCreate)
	}

	return extsCreated
}
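
// Usage sketch (illustrative; "mydb" and settings are hypothetical, the flag
// name is taken from the log message above):
//
//	created := TryCreateMissingExtensions(ctx, "mydb",
//		[]string{"pg_stat_statements", "pgstattuple"}, settings.Extensions)
//	// created lists only the extensions whose CREATE EXTENSION succeeded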

// TryCreateMetricsFetchingHelpers is called once on daemon startup to try to
// create the "metric fetching helper" functions automatically.
func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.SourceConn) (err error) {
	metricConfig := func() map[string]float64 {
		if len(md.Metrics) > 0 {
			return md.Metrics
		}
		if md.PresetMetrics != "" {
			return metricDefs.GetPresetMetrics(md.PresetMetrics)
		}
		return nil
	}()
	conf, err := pgx.ParseConfig(md.ConnStr)
	if err != nil {
		return err
	}
	conf.DefaultQueryExecMode = pgx.QueryExecModeExec
	c, err := pgx.ConnectConfig(ctx, conf)
	if err != nil {
		return err
	}
	defer c.Close(ctx)

	for metricName := range metricConfig {
		metric, ok := metricDefs.GetMetricDef(metricName)
		if !ok {
			continue
		}

		if _, err = c.Exec(ctx, metric.InitSQL); err != nil {
			log.GetLogger(ctx).Warningf("Failed to create a metric fetching helper for %s in %s: %v", metricName, md.Name, err)
		} else {
			log.GetLogger(ctx).Info("Successfully created metric fetching helper for ", metricName, " in ", md.Name)
		}
	}
	return nil
}

// CloseResourcesForRemovedMonitoredDBs closes connections and syncs sinks for
// sources that were removed from the config or need to be shut down after a
// role change of the monitored instance.
func CloseResourcesForRemovedMonitoredDBs(metricsWriter sinks.Writer, currentDBs, prevLoopDBs sources.SourceConns, shutDownDueToRoleChange map[string]bool) {
	var curDBsMap = make(map[string]bool)

	for _, curDB := range currentDBs {
		curDBsMap[curDB.Name] = true
	}

	for _, prevDB := range prevLoopDBs {
		if _, ok := curDBsMap[prevDB.Name]; !ok { // removed from config
			prevDB.Conn.Close()
			_ = metricsWriter.SyncMetric(prevDB.Name, "", "remove")
		}
	}

	// also clean up sources to be ignored due to the current instance state (e.g. primary/standby role change)
	for roleChangedDB := range shutDownDueToRoleChange {
		if db := currentDBs.GetMonitoredDatabase(roleChangedDB); db != nil {
			db.Conn.Close()
		}
		_ = metricsWriter.SyncMetric(roleChangedDB, "", "remove")
	}
}