...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/reaper/database.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"strings"
     8  	"time"
     9  
    10  	"github.com/cybertec-postgresql/pgwatch/v5/internal/log"
    11  	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
    12  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
    13  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
    14  	"github.com/jackc/pgx/v5"
    15  )
    16  
    17  func QueryMeasurements(ctx context.Context, md *sources.SourceConn, sql string, args ...any) (metrics.Measurements, error) {
    18  	if strings.TrimSpace(sql) == "" {
    19  		return nil, errors.New("empty SQL")
    20  	}
    21  	// For non-postgres connections (e.g. pgbouncer, pgpool), use simple protocol
    22  	if !md.IsPostgresSource() {
    23  		args = append([]any{pgx.QueryExecModeSimpleProtocol}, args...)
    24  	}
    25  	// lock_timeout is set at connection level via RuntimeParams, no need for transaction wrapper
    26  	rows, err := md.Conn.Query(ctx, sql, args...)
    27  	if err == nil {
    28  		return pgx.CollectRows(rows, metrics.RowToMeasurement)
    29  	}
    30  	return nil, err
    31  }
    32  
    33  func (r *Reaper) DetectSprocChanges(ctx context.Context, md *sources.SourceConn) (changeCounts ChangeDetectionResults) {
    34  	detectedChanges := make(metrics.Measurements, 0)
    35  	var firstRun bool
    36  	l := log.GetLogger(ctx)
    37  	changeCounts.Target = "functions"
    38  	l.Debug("checking for sproc changes...")
    39  	if _, ok := md.ChangeState["sproc_hashes"]; !ok {
    40  		firstRun = true
    41  		md.ChangeState["sproc_hashes"] = make(map[string]string)
    42  	}
    43  	mvp, ok := metricDefs.GetMetricDef("sproc_hashes")
    44  	if !ok {
    45  		l.Error("could not get sproc_hashes sql")
    46  		return
    47  	}
    48  	data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
    49  	if err != nil {
    50  		l.Error(err)
    51  		return
    52  	}
    53  	for _, dr := range data {
    54  		objIdent := dr["tag_sproc"].(string) + dbMetricJoinStr + dr["tag_oid"].(string)
    55  		prevHash, ok := md.ChangeState["sproc_hashes"][objIdent]
    56  		ll := l.WithField("sproc", dr["tag_sproc"]).WithField("oid", dr["tag_oid"])
    57  		if ok { // we have existing state
    58  			if prevHash != dr["md5"].(string) {
    59  				ll.Debug("change detected")
    60  				dr["event"] = "alter"
    61  				detectedChanges = append(detectedChanges, dr)
    62  				md.ChangeState["sproc_hashes"][objIdent] = dr["md5"].(string)
    63  				changeCounts.Altered++
    64  			}
    65  		} else { // check for new / delete
    66  			if !firstRun {
    67  				ll.Debug("new sproc detected")
    68  				dr["event"] = "create"
    69  				detectedChanges = append(detectedChanges, dr)
    70  				changeCounts.Created++
    71  			}
    72  			md.ChangeState["sproc_hashes"][objIdent] = dr["md5"].(string)
    73  		}
    74  	}
    75  	// detect deletes
    76  	if !firstRun && len(md.ChangeState["sproc_hashes"]) != len(data) {
    77  		// turn resultset to map => [oid]=true for faster checks
    78  		currentOidMap := make(map[string]bool)
    79  		for _, dr := range data {
    80  			currentOidMap[dr["tag_sproc"].(string)+dbMetricJoinStr+dr["tag_oid"].(string)] = true
    81  		}
    82  		for sprocIdent := range md.ChangeState["sproc_hashes"] {
    83  			_, ok := currentOidMap[sprocIdent]
    84  			if !ok {
    85  				splits := strings.Split(sprocIdent, dbMetricJoinStr)
    86  				l.WithField("sproc", splits[0]).WithField("oid", splits[1]).Debug("deleted sproc detected")
    87  				m := metrics.NewMeasurement(data.GetEpoch())
    88  				m["event"] = "drop"
    89  				m["tag_sproc"] = splits[0]
    90  				m["tag_oid"] = splits[1]
    91  				detectedChanges = append(detectedChanges, m)
    92  				delete(md.ChangeState["sproc_hashes"], sprocIdent)
    93  				changeCounts.Dropped++
    94  			}
    95  		}
    96  	}
    97  	l.Debugf("sproc changes detected: %d", len(detectedChanges))
    98  	if len(detectedChanges) > 0 {
    99  		r.measurementCh <- metrics.MeasurementEnvelope{
   100  			DBName:     md.Name,
   101  			MetricName: "sproc_changes",
   102  			Data:       detectedChanges,
   103  			CustomTags: md.CustomTags,
   104  		}
   105  	}
   106  	return changeCounts
   107  }
   108  
   109  func (r *Reaper) DetectTableChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
   110  	detectedChanges := make(metrics.Measurements, 0)
   111  	var firstRun bool
   112  	var changeCounts ChangeDetectionResults
   113  	l := log.GetLogger(ctx)
   114  	changeCounts.Target = "tables"
   115  	l.Debug("checking for table changes...")
   116  	if _, ok := md.ChangeState["table_hashes"]; !ok {
   117  		firstRun = true
   118  		md.ChangeState["table_hashes"] = make(map[string]string)
   119  	}
   120  	mvp, ok := metricDefs.GetMetricDef("table_hashes")
   121  	if !ok {
   122  		l.Error("could not get table_hashes sql")
   123  		return changeCounts
   124  	}
   125  	data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
   126  	if err != nil {
   127  		l.Error(err)
   128  		return changeCounts
   129  	}
   130  	for _, dr := range data {
   131  		objIdent := dr["tag_table"].(string)
   132  		prevHash, ok := md.ChangeState["table_hashes"][objIdent]
   133  		ll := l.WithField("table", dr["tag_table"])
   134  		if ok { // we have existing state
   135  			if prevHash != dr["md5"].(string) {
   136  				ll.Debug("change detected")
   137  				dr["event"] = "alter"
   138  				detectedChanges = append(detectedChanges, dr)
   139  				md.ChangeState["table_hashes"][objIdent] = dr["md5"].(string)
   140  				changeCounts.Altered++
   141  			}
   142  		} else { // check for new / delete
   143  			if !firstRun {
   144  				ll.Debug("new table detected")
   145  				dr["event"] = "create"
   146  				detectedChanges = append(detectedChanges, dr)
   147  				changeCounts.Created++
   148  			}
   149  			md.ChangeState["table_hashes"][objIdent] = dr["md5"].(string)
   150  		}
   151  	}
   152  	// detect deletes
   153  	if !firstRun && len(md.ChangeState["table_hashes"]) != len(data) {
   154  		deletedTables := make([]string, 0)
   155  		// turn resultset to map => [table]=true for faster checks
   156  		currentTableMap := make(map[string]bool)
   157  		for _, dr := range data {
   158  			currentTableMap[dr["tag_table"].(string)] = true
   159  		}
   160  		for table := range md.ChangeState["table_hashes"] {
   161  			_, ok := currentTableMap[table]
   162  			if !ok {
   163  				l.WithField("table", table).Debug("deleted table detected")
   164  				influxEntry := metrics.NewMeasurement(data.GetEpoch())
   165  				influxEntry["event"] = "drop"
   166  				influxEntry["tag_table"] = table
   167  				detectedChanges = append(detectedChanges, influxEntry)
   168  				deletedTables = append(deletedTables, table)
   169  				changeCounts.Dropped++
   170  			}
   171  		}
   172  		for _, deletedTable := range deletedTables {
   173  			delete(md.ChangeState["table_hashes"], deletedTable)
   174  		}
   175  	}
   176  	l.Debugf("table changes detected: %d", len(detectedChanges))
   177  	if len(detectedChanges) > 0 {
   178  		r.measurementCh <- metrics.MeasurementEnvelope{
   179  			DBName:     md.Name,
   180  			MetricName: "table_changes",
   181  			Data:       detectedChanges,
   182  			CustomTags: md.CustomTags,
   183  		}
   184  	}
   185  	return changeCounts
   186  }
   187  
   188  func (r *Reaper) DetectIndexChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
   189  	detectedChanges := make(metrics.Measurements, 0)
   190  	var firstRun bool
   191  	var changeCounts ChangeDetectionResults
   192  	l := log.GetLogger(ctx)
   193  	changeCounts.Target = "indexes"
   194  	l.Debug("checking for index changes...")
   195  	if _, ok := md.ChangeState["index_hashes"]; !ok {
   196  		firstRun = true
   197  		md.ChangeState["index_hashes"] = make(map[string]string)
   198  	}
   199  	mvp, ok := metricDefs.GetMetricDef("index_hashes")
   200  	if !ok {
   201  		l.Error("could not get index_hashes sql")
   202  		return changeCounts
   203  	}
   204  	data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
   205  	if err != nil {
   206  		l.Error(err)
   207  		return changeCounts
   208  	}
   209  	for _, dr := range data {
   210  		objIdent := dr["tag_index"].(string)
   211  		prevHash, ok := md.ChangeState["index_hashes"][objIdent]
   212  		ll := l.WithField("index", dr["tag_index"]).WithField("table", dr["table"])
   213  		if ok { // we have existing state
   214  			if prevHash != (dr["md5"].(string) + dr["is_valid"].(string)) {
   215  				ll.Debug("change detected")
   216  				dr["event"] = "alter"
   217  				detectedChanges = append(detectedChanges, dr)
   218  				md.ChangeState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
   219  				changeCounts.Altered++
   220  			}
   221  		} else { // check for new / delete
   222  			if !firstRun {
   223  				ll.Debug("new index detected")
   224  				dr["event"] = "create"
   225  				detectedChanges = append(detectedChanges, dr)
   226  				changeCounts.Created++
   227  			}
   228  			md.ChangeState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
   229  		}
   230  	}
   231  	// detect deletes
   232  	if !firstRun && len(md.ChangeState["index_hashes"]) != len(data) {
   233  		deletedIndexes := make([]string, 0)
   234  		// turn resultset to map => [table]=true for faster checks
   235  		currentIndexMap := make(map[string]bool)
   236  		for _, dr := range data {
   237  			currentIndexMap[dr["tag_index"].(string)] = true
   238  		}
   239  		for indexName := range md.ChangeState["index_hashes"] {
   240  			_, ok := currentIndexMap[indexName]
   241  			if !ok {
   242  				l.WithField("index", indexName).Debug("deleted index detected")
   243  				influxEntry := metrics.NewMeasurement(data.GetEpoch())
   244  				influxEntry["event"] = "drop"
   245  				influxEntry["tag_index"] = indexName
   246  				detectedChanges = append(detectedChanges, influxEntry)
   247  				deletedIndexes = append(deletedIndexes, indexName)
   248  				changeCounts.Dropped++
   249  			}
   250  		}
   251  		for _, deletedIndex := range deletedIndexes {
   252  			delete(md.ChangeState["index_hashes"], deletedIndex)
   253  		}
   254  	}
   255  	l.Debugf("index changes detected: %d", len(detectedChanges))
   256  	if len(detectedChanges) > 0 {
   257  		r.measurementCh <- metrics.MeasurementEnvelope{
   258  			DBName:     md.Name,
   259  			MetricName: "index_changes",
   260  			Data:       detectedChanges,
   261  			CustomTags: md.CustomTags,
   262  		}
   263  	}
   264  	return changeCounts
   265  }
   266  
   267  func (r *Reaper) DetectPrivilegeChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
   268  	detectedChanges := make(metrics.Measurements, 0)
   269  	var firstRun bool
   270  	var changeCounts ChangeDetectionResults
   271  	l := log.GetLogger(ctx)
   272  	changeCounts.Target = "privileges"
   273  	l.Debug("checking object privilege changes...")
   274  	if _, ok := md.ChangeState["object_privileges"]; !ok {
   275  		firstRun = true
   276  		md.ChangeState["object_privileges"] = make(map[string]string)
   277  	}
   278  	mvp, ok := metricDefs.GetMetricDef("privilege_changes")
   279  	if !ok || mvp.GetSQL(int(md.Version)) == "" {
   280  		l.Warning("could not get SQL for 'privilege_changes'. cannot detect privilege changes")
   281  		return changeCounts
   282  	}
   283  	// returns rows of: object_type, tag_role, tag_object, privilege_type
   284  	data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
   285  	if err != nil {
   286  		l.Error(err)
   287  		return changeCounts
   288  	}
   289  	currentState := make(map[string]bool)
   290  	for _, dr := range data {
   291  		objIdent := fmt.Sprintf("%s#:#%s#:#%s#:#%s", dr["object_type"], dr["tag_role"], dr["tag_object"], dr["privilege_type"])
   292  		ll := l.WithField("role", dr["tag_role"]).
   293  			WithField("object_type", dr["object_type"]).
   294  			WithField("object", dr["tag_object"]).
   295  			WithField("privilege_type", dr["privilege_type"])
   296  		if firstRun {
   297  			md.ChangeState["object_privileges"][objIdent] = ""
   298  		} else {
   299  			_, ok := md.ChangeState["object_privileges"][objIdent]
   300  			if !ok {
   301  				ll.Debug("new object privileges detected")
   302  				dr["event"] = "GRANT"
   303  				detectedChanges = append(detectedChanges, dr)
   304  				changeCounts.Created++
   305  				md.ChangeState["object_privileges"][objIdent] = ""
   306  			}
   307  			currentState[objIdent] = true
   308  		}
   309  	}
   310  	// check revokes - exists in old state only
   311  	if !firstRun && len(currentState) > 0 {
   312  		for objPrevRun := range md.ChangeState["object_privileges"] {
   313  			if _, ok := currentState[objPrevRun]; !ok {
   314  				splits := strings.Split(objPrevRun, "#:#")
   315  				l.WithField("role", splits[1]).
   316  					WithField("object_type", splits[0]).
   317  					WithField("object", splits[2]).
   318  					WithField("privilege_type", splits[3]).
   319  					Debug("removed object privileges detected")
   320  				revokeEntry := metrics.NewMeasurement(data.GetEpoch())
   321  				revokeEntry["object_type"] = splits[0]
   322  				revokeEntry["tag_role"] = splits[1]
   323  				revokeEntry["tag_object"] = splits[2]
   324  				revokeEntry["privilege_type"] = splits[3]
   325  				revokeEntry["event"] = "REVOKE"
   326  				detectedChanges = append(detectedChanges, revokeEntry)
   327  				changeCounts.Dropped++
   328  				delete(md.ChangeState["object_privileges"], objPrevRun)
   329  			}
   330  		}
   331  	}
   332  	l.Debugf("object privilege changes detected: %d", len(detectedChanges))
   333  	if len(detectedChanges) > 0 {
   334  		r.measurementCh <- metrics.MeasurementEnvelope{
   335  			DBName:     md.Name,
   336  			MetricName: "privilege_changes",
   337  			Data:       detectedChanges,
   338  			CustomTags: md.CustomTags,
   339  		}
   340  	}
   341  	return changeCounts
   342  }
   343  
   344  func (r *Reaper) DetectConfigurationChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
   345  	detectedChanges := make(metrics.Measurements, 0)
   346  	var firstRun bool
   347  	var changeCounts ChangeDetectionResults
   348  	l := log.GetLogger(ctx)
   349  	changeCounts.Target = "settings"
   350  	l.Debug("checking for configuration changes...")
   351  	if _, ok := md.ChangeState["configuration_hashes"]; !ok {
   352  		firstRun = true
   353  		md.ChangeState["configuration_hashes"] = make(map[string]string)
   354  	}
   355  	mvp, ok := metricDefs.GetMetricDef("configuration_hashes")
   356  	if !ok {
   357  		l.Error("could not get configuration_hashes sql")
   358  		return changeCounts
   359  	}
   360  	rows, err := md.Conn.Query(ctx, mvp.GetSQL(md.Version))
   361  	if err != nil {
   362  		l.Error(err)
   363  		return changeCounts
   364  	}
   365  	defer rows.Close()
   366  	var (
   367  		objIdent, objValue string
   368  		epoch              int64
   369  	)
   370  	for rows.Next() {
   371  		if rows.Scan(&epoch, &objIdent, &objValue) != nil {
   372  			return changeCounts
   373  		}
   374  		prevРash, ok := md.ChangeState["configuration_hashes"][objIdent]
   375  		ll := l.WithField("setting", objIdent)
   376  		if ok { // we have existing state
   377  			if prevРash != objValue {
   378  				ll.Warningf("settings change detected: %s = %s (prev: %s)", objIdent, objValue, prevРash)
   379  				detectedChanges = append(detectedChanges, metrics.Measurement{
   380  					metrics.EpochColumnName: epoch,
   381  					"tag_setting":           objIdent,
   382  					"value":                 objValue,
   383  					"event":                 "alter"})
   384  				md.ChangeState["configuration_hashes"][objIdent] = objValue
   385  				changeCounts.Altered++
   386  			}
   387  		} else { // check for new, delete not relevant here (pg_upgrade)
   388  			md.ChangeState["configuration_hashes"][objIdent] = objValue
   389  			if firstRun {
   390  				continue
   391  			}
   392  			ll.Debug("new setting detected")
   393  			detectedChanges = append(detectedChanges, metrics.Measurement{
   394  				metrics.EpochColumnName: epoch,
   395  				"tag_setting":           objIdent,
   396  				"value":                 objValue,
   397  				"event":                 "create"})
   398  			changeCounts.Created++
   399  		}
   400  	}
   401  	l.Debugf("configuration changes detected: %d", len(detectedChanges))
   402  	if len(detectedChanges) > 0 {
   403  		r.measurementCh <- metrics.MeasurementEnvelope{
   404  			DBName:     md.Name,
   405  			MetricName: "configuration_changes",
   406  			Data:       detectedChanges,
   407  			CustomTags: md.CustomTags,
   408  		}
   409  	}
   410  	return changeCounts
   411  }
   412  
   413  // GetInstanceUpMeasurement returns a single measurement with "instance_up" metric
   414  // used to detect if the instance is up or down
   415  func (r *Reaper) GetInstanceUpMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error) {
   416  	return metrics.Measurements{
   417  		metrics.Measurement{
   418  			metrics.EpochColumnName: time.Now().UnixNano(),
   419  			"instance_up": func() int {
   420  				if md.Conn.Ping(ctx) == nil {
   421  					return 1
   422  				}
   423  				return 0
   424  			}(), // true if connection is up
   425  		},
   426  	}, nil // always return nil error for the status metric
   427  }
   428  
   429  func (r *Reaper) GetObjectChangesMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error) {
   430  	md.Lock()
   431  	defer md.Unlock()
   432  	spN := r.DetectSprocChanges(ctx, md)
   433  	tblN := r.DetectTableChanges(ctx, md)
   434  	idxN := r.DetectIndexChanges(ctx, md)
   435  	cnfN := r.DetectConfigurationChanges(ctx, md)
   436  	privN := r.DetectPrivilegeChanges(ctx, md)
   437  	if spN.Total()+tblN.Total()+idxN.Total()+cnfN.Total()+privN.Total() == 0 {
   438  		return nil, nil
   439  	}
   440  	m := metrics.NewMeasurement(time.Now().UnixNano())
   441  	m["details"] = strings.Join([]string{spN.String(), tblN.String(), idxN.String(), cnfN.String(), privN.String()}, " ")
   442  	return metrics.Measurements{m}, nil
   443  }
   444  func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(hostsToShutDown map[string]bool) {
   445  	for _, prevDB := range r.prevLoopMonitoredDBs {
   446  		if r.monitoredSources.GetMonitoredDatabase(prevDB.Name) == nil { // removed from config
   447  			prevDB.Conn.Close()
   448  			_ = r.SinksWriter.SyncMetric(prevDB.Name, "", sinks.DeleteOp)
   449  		}
   450  	}
   451  	for toShutDownDB := range hostsToShutDown {
   452  		if db := r.monitoredSources.GetMonitoredDatabase(toShutDownDB); db != nil {
   453  			db.Conn.Close()
   454  		}
   455  		_ = r.SinksWriter.SyncMetric(toShutDownDB, "", sinks.DeleteOp)
   456  	}
   457  }
   458