...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/reaper/database.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"maps"
     8  	"strings"
     9  	"time"
    10  
    11  	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
    12  	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
    13  	"github.com/cybertec-postgresql/pgwatch/v3/internal/sinks"
    14  	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
    15  	"github.com/jackc/pgx/v5"
    16  )
    17  
    18  func QueryMeasurements(ctx context.Context, md *sources.SourceConn, sql string, args ...any) (metrics.Measurements, error) {
    19  	if strings.TrimSpace(sql) == "" {
    20  		return nil, errors.New("empty SQL")
    21  	}
    22  
    23  	// For non-postgres connections (e.g. pgbouncer, pgpool), use simple protocol
    24  	if !md.IsPostgresSource() {
    25  		args = append([]any{pgx.QueryExecModeSimpleProtocol}, args...)
    26  	}
    27  	// lock_timeout is set at connection level via RuntimeParams, no need for transaction wrapper
    28  	rows, err := md.Conn.Query(ctx, sql, args...)
    29  	if err == nil {
    30  		return pgx.CollectRows(rows, metrics.RowToMeasurement)
    31  	}
    32  	return nil, err
    33  }
    34  
    35  func (r *Reaper) DetectSprocChanges(ctx context.Context, md *sources.SourceConn) (changeCounts ChangeDetectionResults) {
    36  	detectedChanges := make(metrics.Measurements, 0)
    37  	var firstRun bool
    38  	l := log.GetLogger(ctx)
    39  	changeCounts.Target = "functions"
    40  	l.Debug("checking for sproc changes...")
    41  	if _, ok := md.ChangeState["sproc_hashes"]; !ok {
    42  		firstRun = true
    43  		md.ChangeState["sproc_hashes"] = make(map[string]string)
    44  	}
    45  
    46  	mvp, ok := metricDefs.GetMetricDef("sproc_hashes")
    47  	if !ok {
    48  		l.Error("could not get sproc_hashes sql")
    49  		return
    50  	}
    51  
    52  	data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
    53  	if err != nil {
    54  		l.Error(err)
    55  		return
    56  	}
    57  
    58  	for _, dr := range data {
    59  		objIdent := dr["tag_sproc"].(string) + dbMetricJoinStr + dr["tag_oid"].(string)
    60  		prevHash, ok := md.ChangeState["sproc_hashes"][objIdent]
    61  		ll := l.WithField("sproc", dr["tag_sproc"]).WithField("oid", dr["tag_oid"])
    62  		if ok { // we have existing state
    63  			if prevHash != dr["md5"].(string) {
    64  				ll.Debug("change detected")
    65  				dr["event"] = "alter"
    66  				detectedChanges = append(detectedChanges, dr)
    67  				md.ChangeState["sproc_hashes"][objIdent] = dr["md5"].(string)
    68  				changeCounts.Altered++
    69  			}
    70  		} else { // check for new / delete
    71  			if !firstRun {
    72  				ll.Debug("new sproc detected")
    73  				dr["event"] = "create"
    74  				detectedChanges = append(detectedChanges, dr)
    75  				changeCounts.Created++
    76  			}
    77  			md.ChangeState["sproc_hashes"][objIdent] = dr["md5"].(string)
    78  		}
    79  	}
    80  	// detect deletes
    81  	if !firstRun && len(md.ChangeState["sproc_hashes"]) != len(data) {
    82  		// turn resultset to map => [oid]=true for faster checks
    83  		currentOidMap := make(map[string]bool)
    84  		for _, dr := range data {
    85  			currentOidMap[dr["tag_sproc"].(string)+dbMetricJoinStr+dr["tag_oid"].(string)] = true
    86  		}
    87  		for sprocIdent := range md.ChangeState["sproc_hashes"] {
    88  			_, ok := currentOidMap[sprocIdent]
    89  			if !ok {
    90  				splits := strings.Split(sprocIdent, dbMetricJoinStr)
    91  				l.WithField("sproc", splits[0]).WithField("oid", splits[1]).Debug("deleted sproc detected")
    92  				m := metrics.NewMeasurement(data.GetEpoch())
    93  				m["event"] = "drop"
    94  				m["tag_sproc"] = splits[0]
    95  				m["tag_oid"] = splits[1]
    96  				detectedChanges = append(detectedChanges, m)
    97  				delete(md.ChangeState["sproc_hashes"], sprocIdent)
    98  				changeCounts.Dropped++
    99  			}
   100  		}
   101  	}
   102  	l.Debugf("sproc changes detected: %d", len(detectedChanges))
   103  	if len(detectedChanges) > 0 {
   104  		r.measurementCh <- metrics.MeasurementEnvelope{
   105  			DBName:     md.Name,
   106  			MetricName: "sproc_changes",
   107  			Data:       detectedChanges,
   108  			CustomTags: md.CustomTags,
   109  		}
   110  	}
   111  
   112  	return changeCounts
   113  }
   114  
   115  func (r *Reaper) DetectTableChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
   116  	detectedChanges := make(metrics.Measurements, 0)
   117  	var firstRun bool
   118  	var changeCounts ChangeDetectionResults
   119  	l := log.GetLogger(ctx)
   120  	changeCounts.Target = "tables"
   121  	l.Debug("checking for table changes...")
   122  	if _, ok := md.ChangeState["table_hashes"]; !ok {
   123  		firstRun = true
   124  		md.ChangeState["table_hashes"] = make(map[string]string)
   125  	}
   126  
   127  	mvp, ok := metricDefs.GetMetricDef("table_hashes")
   128  	if !ok {
   129  		l.Error("could not get table_hashes sql")
   130  		return changeCounts
   131  	}
   132  
   133  	data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
   134  	if err != nil {
   135  		l.Error(err)
   136  		return changeCounts
   137  	}
   138  
   139  	for _, dr := range data {
   140  		objIdent := dr["tag_table"].(string)
   141  		prevHash, ok := md.ChangeState["table_hashes"][objIdent]
   142  		ll := l.WithField("table", dr["tag_table"])
   143  		if ok { // we have existing state
   144  			if prevHash != dr["md5"].(string) {
   145  				ll.Debug("change detected")
   146  				dr["event"] = "alter"
   147  				detectedChanges = append(detectedChanges, dr)
   148  				md.ChangeState["table_hashes"][objIdent] = dr["md5"].(string)
   149  				changeCounts.Altered++
   150  			}
   151  		} else { // check for new / delete
   152  			if !firstRun {
   153  				ll.Debug("new table detected")
   154  				dr["event"] = "create"
   155  				detectedChanges = append(detectedChanges, dr)
   156  				changeCounts.Created++
   157  			}
   158  			md.ChangeState["table_hashes"][objIdent] = dr["md5"].(string)
   159  		}
   160  	}
   161  	// detect deletes
   162  	if !firstRun && len(md.ChangeState["table_hashes"]) != len(data) {
   163  		deletedTables := make([]string, 0)
   164  		// turn resultset to map => [table]=true for faster checks
   165  		currentTableMap := make(map[string]bool)
   166  		for _, dr := range data {
   167  			currentTableMap[dr["tag_table"].(string)] = true
   168  		}
   169  		for table := range md.ChangeState["table_hashes"] {
   170  			_, ok := currentTableMap[table]
   171  			if !ok {
   172  				l.WithField("table", table).Debug("deleted table detected")
   173  				influxEntry := metrics.NewMeasurement(data.GetEpoch())
   174  				influxEntry["event"] = "drop"
   175  				influxEntry["tag_table"] = table
   176  				detectedChanges = append(detectedChanges, influxEntry)
   177  				deletedTables = append(deletedTables, table)
   178  				changeCounts.Dropped++
   179  			}
   180  		}
   181  		for _, deletedTable := range deletedTables {
   182  			delete(md.ChangeState["table_hashes"], deletedTable)
   183  		}
   184  	}
   185  
   186  	l.Debugf("table changes detected: %d", len(detectedChanges))
   187  	if len(detectedChanges) > 0 {
   188  		r.measurementCh <- metrics.MeasurementEnvelope{
   189  			DBName:     md.Name,
   190  			MetricName: "table_changes",
   191  			Data:       detectedChanges,
   192  			CustomTags: md.CustomTags,
   193  		}
   194  	}
   195  
   196  	return changeCounts
   197  }
   198  
   199  func (r *Reaper) DetectIndexChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
   200  	detectedChanges := make(metrics.Measurements, 0)
   201  	var firstRun bool
   202  	var changeCounts ChangeDetectionResults
   203  	l := log.GetLogger(ctx)
   204  	changeCounts.Target = "indexes"
   205  	l.Debug("checking for index changes...")
   206  	if _, ok := md.ChangeState["index_hashes"]; !ok {
   207  		firstRun = true
   208  		md.ChangeState["index_hashes"] = make(map[string]string)
   209  	}
   210  
   211  	mvp, ok := metricDefs.GetMetricDef("index_hashes")
   212  	if !ok {
   213  		l.Error("could not get index_hashes sql")
   214  		return changeCounts
   215  	}
   216  
   217  	data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
   218  	if err != nil {
   219  		l.Error(err)
   220  		return changeCounts
   221  	}
   222  
   223  	for _, dr := range data {
   224  		objIdent := dr["tag_index"].(string)
   225  		prevHash, ok := md.ChangeState["index_hashes"][objIdent]
   226  		ll := l.WithField("index", dr["tag_index"]).WithField("table", dr["table"])
   227  		if ok { // we have existing state
   228  			if prevHash != (dr["md5"].(string) + dr["is_valid"].(string)) {
   229  				ll.Debug("change detected")
   230  				dr["event"] = "alter"
   231  				detectedChanges = append(detectedChanges, dr)
   232  				md.ChangeState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
   233  				changeCounts.Altered++
   234  			}
   235  		} else { // check for new / delete
   236  			if !firstRun {
   237  				ll.Debug("new index detected")
   238  				dr["event"] = "create"
   239  				detectedChanges = append(detectedChanges, dr)
   240  				changeCounts.Created++
   241  			}
   242  			md.ChangeState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
   243  		}
   244  	}
   245  	// detect deletes
   246  	if !firstRun && len(md.ChangeState["index_hashes"]) != len(data) {
   247  		deletedIndexes := make([]string, 0)
   248  		// turn resultset to map => [table]=true for faster checks
   249  		currentIndexMap := make(map[string]bool)
   250  		for _, dr := range data {
   251  			currentIndexMap[dr["tag_index"].(string)] = true
   252  		}
   253  		for indexName := range md.ChangeState["index_hashes"] {
   254  			_, ok := currentIndexMap[indexName]
   255  			if !ok {
   256  				l.WithField("index", indexName).Debug("deleted index detected")
   257  				influxEntry := metrics.NewMeasurement(data.GetEpoch())
   258  				influxEntry["event"] = "drop"
   259  				influxEntry["tag_index"] = indexName
   260  				detectedChanges = append(detectedChanges, influxEntry)
   261  				deletedIndexes = append(deletedIndexes, indexName)
   262  				changeCounts.Dropped++
   263  			}
   264  		}
   265  		for _, deletedIndex := range deletedIndexes {
   266  			delete(md.ChangeState["index_hashes"], deletedIndex)
   267  		}
   268  	}
   269  	l.Debugf("index changes detected: %d", len(detectedChanges))
   270  	if len(detectedChanges) > 0 {
   271  		r.measurementCh <- metrics.MeasurementEnvelope{
   272  			DBName:     md.Name,
   273  			MetricName: "index_changes",
   274  			Data:       detectedChanges,
   275  			CustomTags: md.CustomTags,
   276  		}
   277  	}
   278  
   279  	return changeCounts
   280  }
   281  
   282  func (r *Reaper) DetectPrivilegeChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
   283  	detectedChanges := make(metrics.Measurements, 0)
   284  	var firstRun bool
   285  	var changeCounts ChangeDetectionResults
   286  	l := log.GetLogger(ctx)
   287  	changeCounts.Target = "privileges"
   288  	l.Debug("checking object privilege changes...")
   289  	if _, ok := md.ChangeState["object_privileges"]; !ok {
   290  		firstRun = true
   291  		md.ChangeState["object_privileges"] = make(map[string]string)
   292  	}
   293  
   294  	mvp, ok := metricDefs.GetMetricDef("privilege_changes")
   295  	if !ok || mvp.GetSQL(int(md.Version)) == "" {
   296  		l.Warning("could not get SQL for 'privilege_changes'. cannot detect privilege changes")
   297  		return changeCounts
   298  	}
   299  
   300  	// returns rows of: object_type, tag_role, tag_object, privilege_type
   301  	data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
   302  	if err != nil {
   303  		l.Error(err)
   304  		return changeCounts
   305  	}
   306  
   307  	currentState := make(map[string]bool)
   308  	for _, dr := range data {
   309  		objIdent := fmt.Sprintf("%s#:#%s#:#%s#:#%s", dr["object_type"], dr["tag_role"], dr["tag_object"], dr["privilege_type"])
   310  		ll := l.WithField("role", dr["tag_role"]).
   311  			WithField("object_type", dr["object_type"]).
   312  			WithField("object", dr["tag_object"]).
   313  			WithField("privilege_type", dr["privilege_type"])
   314  		if firstRun {
   315  			md.ChangeState["object_privileges"][objIdent] = ""
   316  		} else {
   317  			_, ok := md.ChangeState["object_privileges"][objIdent]
   318  			if !ok {
   319  				ll.Debug("new object privileges detected")
   320  				dr["event"] = "GRANT"
   321  				detectedChanges = append(detectedChanges, dr)
   322  				changeCounts.Created++
   323  				md.ChangeState["object_privileges"][objIdent] = ""
   324  			}
   325  			currentState[objIdent] = true
   326  		}
   327  	}
   328  	// check revokes - exists in old state only
   329  	if !firstRun && len(currentState) > 0 {
   330  		for objPrevRun := range md.ChangeState["object_privileges"] {
   331  			if _, ok := currentState[objPrevRun]; !ok {
   332  				splits := strings.Split(objPrevRun, "#:#")
   333  				l.WithField("role", splits[1]).
   334  					WithField("object_type", splits[0]).
   335  					WithField("object", splits[2]).
   336  					WithField("privilege_type", splits[3]).
   337  					Debug("removed object privileges detected")
   338  				revokeEntry := metrics.NewMeasurement(data.GetEpoch())
   339  				revokeEntry["object_type"] = splits[0]
   340  				revokeEntry["tag_role"] = splits[1]
   341  				revokeEntry["tag_object"] = splits[2]
   342  				revokeEntry["privilege_type"] = splits[3]
   343  				revokeEntry["event"] = "REVOKE"
   344  				detectedChanges = append(detectedChanges, revokeEntry)
   345  				changeCounts.Dropped++
   346  				delete(md.ChangeState["object_privileges"], objPrevRun)
   347  			}
   348  		}
   349  	}
   350  
   351  	l.Debugf("object privilege changes detected: %d", len(detectedChanges))
   352  	if len(detectedChanges) > 0 {
   353  		r.measurementCh <- metrics.MeasurementEnvelope{
   354  			DBName:     md.Name,
   355  			MetricName: "privilege_changes",
   356  			Data:       detectedChanges,
   357  			CustomTags: md.CustomTags,
   358  		}
   359  	}
   360  
   361  	return changeCounts
   362  }
   363  
   364  func (r *Reaper) DetectConfigurationChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
   365  	detectedChanges := make(metrics.Measurements, 0)
   366  	var firstRun bool
   367  	var changeCounts ChangeDetectionResults
   368  	l := log.GetLogger(ctx)
   369  	changeCounts.Target = "settings"
   370  	l.Debug("checking for configuration changes...")
   371  	if _, ok := md.ChangeState["configuration_hashes"]; !ok {
   372  		firstRun = true
   373  		md.ChangeState["configuration_hashes"] = make(map[string]string)
   374  	}
   375  
   376  	mvp, ok := metricDefs.GetMetricDef("configuration_hashes")
   377  	if !ok {
   378  		l.Error("could not get configuration_hashes sql")
   379  		return changeCounts
   380  	}
   381  
   382  	rows, err := md.Conn.Query(ctx, mvp.GetSQL(md.Version))
   383  	if err != nil {
   384  		l.Error(err)
   385  		return changeCounts
   386  	}
   387  	defer rows.Close()
   388  	var (
   389  		objIdent, objValue string
   390  		epoch              int64
   391  	)
   392  	for rows.Next() {
   393  		if rows.Scan(&epoch, &objIdent, &objValue) != nil {
   394  			return changeCounts
   395  		}
   396  		prevРash, ok := md.ChangeState["configuration_hashes"][objIdent]
   397  		ll := l.WithField("setting", objIdent)
   398  		if ok { // we have existing state
   399  			if prevРash != objValue {
   400  				ll.Warningf("settings change detected: %s = %s (prev: %s)", objIdent, objValue, prevРash)
   401  				detectedChanges = append(detectedChanges, metrics.Measurement{
   402  					metrics.EpochColumnName: epoch,
   403  					"tag_setting":           objIdent,
   404  					"value":                 objValue,
   405  					"event":                 "alter"})
   406  				md.ChangeState["configuration_hashes"][objIdent] = objValue
   407  				changeCounts.Altered++
   408  			}
   409  		} else { // check for new, delete not relevant here (pg_upgrade)
   410  			md.ChangeState["configuration_hashes"][objIdent] = objValue
   411  			if firstRun {
   412  				continue
   413  			}
   414  			ll.Debug("new setting detected")
   415  			detectedChanges = append(detectedChanges, metrics.Measurement{
   416  				metrics.EpochColumnName: epoch,
   417  				"tag_setting":           objIdent,
   418  				"value":                 objValue,
   419  				"event":                 "create"})
   420  			changeCounts.Created++
   421  		}
   422  	}
   423  
   424  	l.Debugf("configuration changes detected: %d", len(detectedChanges))
   425  	if len(detectedChanges) > 0 {
   426  		r.measurementCh <- metrics.MeasurementEnvelope{
   427  			DBName:     md.Name,
   428  			MetricName: "configuration_changes",
   429  			Data:       detectedChanges,
   430  			CustomTags: md.CustomTags,
   431  		}
   432  	}
   433  	return changeCounts
   434  }
   435  
   436  // GetInstanceUpMeasurement returns a single measurement with "instance_up" metric
   437  // used to detect if the instance is up or down
   438  func (r *Reaper) GetInstanceUpMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error) {
   439  	err := md.Conn.Ping(ctx)
   440  	return metrics.Measurements{
   441  		metrics.Measurement{
   442  			metrics.EpochColumnName: time.Now().UnixNano(),
   443  			"instance_up": func() int {
   444  				if err == nil {
   445  					return 1
   446  				}
   447  				return 0
   448  			}(), // true if connection is up
   449  		},
   450  	}, err
   451  }
   452  
   453  func (r *Reaper) GetObjectChangesMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error) {
   454  	md.Lock()
   455  	defer md.Unlock()
   456  
   457  	spN := r.DetectSprocChanges(ctx, md)
   458  	tblN := r.DetectTableChanges(ctx, md)
   459  	idxN := r.DetectIndexChanges(ctx, md)
   460  	cnfN := r.DetectConfigurationChanges(ctx, md)
   461  	privN := r.DetectPrivilegeChanges(ctx, md)
   462  
   463  	if spN.Total()+tblN.Total()+idxN.Total()+cnfN.Total()+privN.Total() == 0 {
   464  		return nil, nil
   465  	}
   466  
   467  	m := metrics.NewMeasurement(time.Now().UnixNano())
   468  	m["details"] = strings.Join([]string{spN.String(), tblN.String(), idxN.String(), cnfN.String(), privN.String()}, " ")
   469  	return metrics.Measurements{m}, nil
   470  }
   471  
   472  // Called once on daemon startup if some commonly wanted extension (most notably pg_stat_statements) is missing.
   473  // With newer Postgres version can even succeed if the user is not a real superuser due to some cloud-specific
   474  // whitelisting or "trusted extensions" (a feature from v13). Ignores errors.
   475  func TryCreateMissingExtensions(ctx context.Context, md *sources.SourceConn, extensionNames []string, existingExtensions map[string]int) []string {
   476  	// TODO: move to sources package and use direct pgx connection
   477  	sqlAvailable := `select name::text from pg_available_extensions`
   478  	extsCreated := make([]string, 0)
   479  
   480  	// For security reasons don't allow to execute random strings but check that it's an existing extension
   481  	data, err := QueryMeasurements(ctx, md, sqlAvailable)
   482  	if err != nil {
   483  		log.GetLogger(ctx).Infof("[%s] Failed to get a list of available extensions: %v", md, err)
   484  		return extsCreated
   485  	}
   486  
   487  	availableExts := make(map[string]bool)
   488  	for _, row := range data {
   489  		availableExts[row["name"].(string)] = true
   490  	}
   491  
   492  	for _, extToCreate := range extensionNames {
   493  		if _, ok := existingExtensions[extToCreate]; ok {
   494  			continue
   495  		}
   496  		_, ok := availableExts[extToCreate]
   497  		if !ok {
   498  			log.GetLogger(ctx).Errorf("[%s] Requested extension %s not available on instance, cannot try to create...", md, extToCreate)
   499  		} else {
   500  			sqlCreateExt := `create extension ` + extToCreate
   501  			_, err := QueryMeasurements(ctx, md, sqlCreateExt)
   502  			if err != nil {
   503  				log.GetLogger(ctx).Errorf("[%s] Failed to create extension %s (based on --try-create-listed-exts-if-missing input): %v", md, extToCreate, err)
   504  			}
   505  			extsCreated = append(extsCreated, extToCreate)
   506  		}
   507  	}
   508  
   509  	return extsCreated
   510  }
   511  
   512  // Called once on daemon startup to try to create "metric fething helper" functions automatically
   513  func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.SourceConn) (err error) {
   514  	sl := log.GetLogger(ctx).WithField("source", md.Name)
   515  	metrics := maps.Clone(md.Metrics)
   516  	maps.Insert(metrics, maps.All(md.MetricsStandby))
   517  	for metricName := range metrics {
   518  		metric, ok := metricDefs.GetMetricDef(metricName)
   519  		if !ok {
   520  			continue
   521  		}
   522  		if _, err = md.Conn.Exec(ctx, metric.InitSQL); err != nil {
   523  			return
   524  		}
   525  		sl.WithField("metric", metricName).Info("Successfully created metric fetching helpers")
   526  	}
   527  	return
   528  }
   529  
   530  func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(shutDownDueToRoleChange map[string]bool) {
   531  	for _, prevDB := range r.prevLoopMonitoredDBs {
   532  		if r.monitoredSources.GetMonitoredDatabase(prevDB.Name) == nil { // removed from config
   533  			prevDB.Conn.Close()
   534  			_ = r.SinksWriter.SyncMetric(prevDB.Name, "", sinks.DeleteOp)
   535  		}
   536  	}
   537  
   538  	for roleChangedDB := range shutDownDueToRoleChange {
   539  		if db := r.monitoredSources.GetMonitoredDatabase(roleChangedDB); db != nil {
   540  			db.Conn.Close()
   541  		}
   542  		_ = r.SinksWriter.SyncMetric(roleChangedDB, "", sinks.DeleteOp)
   543  	}
   544  }
   545