...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/reaper/database.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"strings"
     8  	"time"
     9  
    10  	"github.com/cybertec-postgresql/pgwatch/v5/internal/log"
    11  	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
    12  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
    13  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
    14  	"github.com/jackc/pgx/v5"
    15  )
    16  
    17  func QueryMeasurements(ctx context.Context, md *sources.SourceConn, sql string, args ...any) (metrics.Measurements, error) {
    18  	if strings.TrimSpace(sql) == "" {
    19  		return nil, errors.New("empty SQL")
    20  	}
    21  
    22  	// For non-postgres connections (e.g. pgbouncer, pgpool), use simple protocol
    23  	if !md.IsPostgresSource() {
    24  		args = append([]any{pgx.QueryExecModeSimpleProtocol}, args...)
    25  	}
    26  	// lock_timeout is set at connection level via RuntimeParams, no need for transaction wrapper
    27  	rows, err := md.Conn.Query(ctx, sql, args...)
    28  	if err == nil {
    29  		return pgx.CollectRows(rows, metrics.RowToMeasurement)
    30  	}
    31  	return nil, err
    32  }
    33  
    34  func (r *Reaper) DetectSprocChanges(ctx context.Context, md *sources.SourceConn) (changeCounts ChangeDetectionResults) {
    35  	detectedChanges := make(metrics.Measurements, 0)
    36  	var firstRun bool
    37  	l := log.GetLogger(ctx)
    38  	changeCounts.Target = "functions"
    39  	l.Debug("checking for sproc changes...")
    40  	if _, ok := md.ChangeState["sproc_hashes"]; !ok {
    41  		firstRun = true
    42  		md.ChangeState["sproc_hashes"] = make(map[string]string)
    43  	}
    44  
    45  	mvp, ok := metricDefs.GetMetricDef("sproc_hashes")
    46  	if !ok {
    47  		l.Error("could not get sproc_hashes sql")
    48  		return
    49  	}
    50  
    51  	data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
    52  	if err != nil {
    53  		l.Error(err)
    54  		return
    55  	}
    56  
    57  	for _, dr := range data {
    58  		objIdent := dr["tag_sproc"].(string) + dbMetricJoinStr + dr["tag_oid"].(string)
    59  		prevHash, ok := md.ChangeState["sproc_hashes"][objIdent]
    60  		ll := l.WithField("sproc", dr["tag_sproc"]).WithField("oid", dr["tag_oid"])
    61  		if ok { // we have existing state
    62  			if prevHash != dr["md5"].(string) {
    63  				ll.Debug("change detected")
    64  				dr["event"] = "alter"
    65  				detectedChanges = append(detectedChanges, dr)
    66  				md.ChangeState["sproc_hashes"][objIdent] = dr["md5"].(string)
    67  				changeCounts.Altered++
    68  			}
    69  		} else { // check for new / delete
    70  			if !firstRun {
    71  				ll.Debug("new sproc detected")
    72  				dr["event"] = "create"
    73  				detectedChanges = append(detectedChanges, dr)
    74  				changeCounts.Created++
    75  			}
    76  			md.ChangeState["sproc_hashes"][objIdent] = dr["md5"].(string)
    77  		}
    78  	}
    79  	// detect deletes
    80  	if !firstRun && len(md.ChangeState["sproc_hashes"]) != len(data) {
    81  		// turn resultset to map => [oid]=true for faster checks
    82  		currentOidMap := make(map[string]bool)
    83  		for _, dr := range data {
    84  			currentOidMap[dr["tag_sproc"].(string)+dbMetricJoinStr+dr["tag_oid"].(string)] = true
    85  		}
    86  		for sprocIdent := range md.ChangeState["sproc_hashes"] {
    87  			_, ok := currentOidMap[sprocIdent]
    88  			if !ok {
    89  				splits := strings.Split(sprocIdent, dbMetricJoinStr)
    90  				l.WithField("sproc", splits[0]).WithField("oid", splits[1]).Debug("deleted sproc detected")
    91  				m := metrics.NewMeasurement(data.GetEpoch())
    92  				m["event"] = "drop"
    93  				m["tag_sproc"] = splits[0]
    94  				m["tag_oid"] = splits[1]
    95  				detectedChanges = append(detectedChanges, m)
    96  				delete(md.ChangeState["sproc_hashes"], sprocIdent)
    97  				changeCounts.Dropped++
    98  			}
    99  		}
   100  	}
   101  	l.Debugf("sproc changes detected: %d", len(detectedChanges))
   102  	if len(detectedChanges) > 0 {
   103  		r.measurementCh <- metrics.MeasurementEnvelope{
   104  			DBName:     md.Name,
   105  			MetricName: "sproc_changes",
   106  			Data:       detectedChanges,
   107  			CustomTags: md.CustomTags,
   108  		}
   109  	}
   110  
   111  	return changeCounts
   112  }
   113  
   114  func (r *Reaper) DetectTableChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
   115  	detectedChanges := make(metrics.Measurements, 0)
   116  	var firstRun bool
   117  	var changeCounts ChangeDetectionResults
   118  	l := log.GetLogger(ctx)
   119  	changeCounts.Target = "tables"
   120  	l.Debug("checking for table changes...")
   121  	if _, ok := md.ChangeState["table_hashes"]; !ok {
   122  		firstRun = true
   123  		md.ChangeState["table_hashes"] = make(map[string]string)
   124  	}
   125  
   126  	mvp, ok := metricDefs.GetMetricDef("table_hashes")
   127  	if !ok {
   128  		l.Error("could not get table_hashes sql")
   129  		return changeCounts
   130  	}
   131  
   132  	data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
   133  	if err != nil {
   134  		l.Error(err)
   135  		return changeCounts
   136  	}
   137  
   138  	for _, dr := range data {
   139  		objIdent := dr["tag_table"].(string)
   140  		prevHash, ok := md.ChangeState["table_hashes"][objIdent]
   141  		ll := l.WithField("table", dr["tag_table"])
   142  		if ok { // we have existing state
   143  			if prevHash != dr["md5"].(string) {
   144  				ll.Debug("change detected")
   145  				dr["event"] = "alter"
   146  				detectedChanges = append(detectedChanges, dr)
   147  				md.ChangeState["table_hashes"][objIdent] = dr["md5"].(string)
   148  				changeCounts.Altered++
   149  			}
   150  		} else { // check for new / delete
   151  			if !firstRun {
   152  				ll.Debug("new table detected")
   153  				dr["event"] = "create"
   154  				detectedChanges = append(detectedChanges, dr)
   155  				changeCounts.Created++
   156  			}
   157  			md.ChangeState["table_hashes"][objIdent] = dr["md5"].(string)
   158  		}
   159  	}
   160  	// detect deletes
   161  	if !firstRun && len(md.ChangeState["table_hashes"]) != len(data) {
   162  		deletedTables := make([]string, 0)
   163  		// turn resultset to map => [table]=true for faster checks
   164  		currentTableMap := make(map[string]bool)
   165  		for _, dr := range data {
   166  			currentTableMap[dr["tag_table"].(string)] = true
   167  		}
   168  		for table := range md.ChangeState["table_hashes"] {
   169  			_, ok := currentTableMap[table]
   170  			if !ok {
   171  				l.WithField("table", table).Debug("deleted table detected")
   172  				influxEntry := metrics.NewMeasurement(data.GetEpoch())
   173  				influxEntry["event"] = "drop"
   174  				influxEntry["tag_table"] = table
   175  				detectedChanges = append(detectedChanges, influxEntry)
   176  				deletedTables = append(deletedTables, table)
   177  				changeCounts.Dropped++
   178  			}
   179  		}
   180  		for _, deletedTable := range deletedTables {
   181  			delete(md.ChangeState["table_hashes"], deletedTable)
   182  		}
   183  	}
   184  
   185  	l.Debugf("table changes detected: %d", len(detectedChanges))
   186  	if len(detectedChanges) > 0 {
   187  		r.measurementCh <- metrics.MeasurementEnvelope{
   188  			DBName:     md.Name,
   189  			MetricName: "table_changes",
   190  			Data:       detectedChanges,
   191  			CustomTags: md.CustomTags,
   192  		}
   193  	}
   194  
   195  	return changeCounts
   196  }
   197  
   198  func (r *Reaper) DetectIndexChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
   199  	detectedChanges := make(metrics.Measurements, 0)
   200  	var firstRun bool
   201  	var changeCounts ChangeDetectionResults
   202  	l := log.GetLogger(ctx)
   203  	changeCounts.Target = "indexes"
   204  	l.Debug("checking for index changes...")
   205  	if _, ok := md.ChangeState["index_hashes"]; !ok {
   206  		firstRun = true
   207  		md.ChangeState["index_hashes"] = make(map[string]string)
   208  	}
   209  
   210  	mvp, ok := metricDefs.GetMetricDef("index_hashes")
   211  	if !ok {
   212  		l.Error("could not get index_hashes sql")
   213  		return changeCounts
   214  	}
   215  
   216  	data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
   217  	if err != nil {
   218  		l.Error(err)
   219  		return changeCounts
   220  	}
   221  
   222  	for _, dr := range data {
   223  		objIdent := dr["tag_index"].(string)
   224  		prevHash, ok := md.ChangeState["index_hashes"][objIdent]
   225  		ll := l.WithField("index", dr["tag_index"]).WithField("table", dr["table"])
   226  		if ok { // we have existing state
   227  			if prevHash != (dr["md5"].(string) + dr["is_valid"].(string)) {
   228  				ll.Debug("change detected")
   229  				dr["event"] = "alter"
   230  				detectedChanges = append(detectedChanges, dr)
   231  				md.ChangeState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
   232  				changeCounts.Altered++
   233  			}
   234  		} else { // check for new / delete
   235  			if !firstRun {
   236  				ll.Debug("new index detected")
   237  				dr["event"] = "create"
   238  				detectedChanges = append(detectedChanges, dr)
   239  				changeCounts.Created++
   240  			}
   241  			md.ChangeState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
   242  		}
   243  	}
   244  	// detect deletes
   245  	if !firstRun && len(md.ChangeState["index_hashes"]) != len(data) {
   246  		deletedIndexes := make([]string, 0)
   247  		// turn resultset to map => [table]=true for faster checks
   248  		currentIndexMap := make(map[string]bool)
   249  		for _, dr := range data {
   250  			currentIndexMap[dr["tag_index"].(string)] = true
   251  		}
   252  		for indexName := range md.ChangeState["index_hashes"] {
   253  			_, ok := currentIndexMap[indexName]
   254  			if !ok {
   255  				l.WithField("index", indexName).Debug("deleted index detected")
   256  				influxEntry := metrics.NewMeasurement(data.GetEpoch())
   257  				influxEntry["event"] = "drop"
   258  				influxEntry["tag_index"] = indexName
   259  				detectedChanges = append(detectedChanges, influxEntry)
   260  				deletedIndexes = append(deletedIndexes, indexName)
   261  				changeCounts.Dropped++
   262  			}
   263  		}
   264  		for _, deletedIndex := range deletedIndexes {
   265  			delete(md.ChangeState["index_hashes"], deletedIndex)
   266  		}
   267  	}
   268  	l.Debugf("index changes detected: %d", len(detectedChanges))
   269  	if len(detectedChanges) > 0 {
   270  		r.measurementCh <- metrics.MeasurementEnvelope{
   271  			DBName:     md.Name,
   272  			MetricName: "index_changes",
   273  			Data:       detectedChanges,
   274  			CustomTags: md.CustomTags,
   275  		}
   276  	}
   277  
   278  	return changeCounts
   279  }
   280  
   281  func (r *Reaper) DetectPrivilegeChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
   282  	detectedChanges := make(metrics.Measurements, 0)
   283  	var firstRun bool
   284  	var changeCounts ChangeDetectionResults
   285  	l := log.GetLogger(ctx)
   286  	changeCounts.Target = "privileges"
   287  	l.Debug("checking object privilege changes...")
   288  	if _, ok := md.ChangeState["object_privileges"]; !ok {
   289  		firstRun = true
   290  		md.ChangeState["object_privileges"] = make(map[string]string)
   291  	}
   292  
   293  	mvp, ok := metricDefs.GetMetricDef("privilege_changes")
   294  	if !ok || mvp.GetSQL(int(md.Version)) == "" {
   295  		l.Warning("could not get SQL for 'privilege_changes'. cannot detect privilege changes")
   296  		return changeCounts
   297  	}
   298  
   299  	// returns rows of: object_type, tag_role, tag_object, privilege_type
   300  	data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
   301  	if err != nil {
   302  		l.Error(err)
   303  		return changeCounts
   304  	}
   305  
   306  	currentState := make(map[string]bool)
   307  	for _, dr := range data {
   308  		objIdent := fmt.Sprintf("%s#:#%s#:#%s#:#%s", dr["object_type"], dr["tag_role"], dr["tag_object"], dr["privilege_type"])
   309  		ll := l.WithField("role", dr["tag_role"]).
   310  			WithField("object_type", dr["object_type"]).
   311  			WithField("object", dr["tag_object"]).
   312  			WithField("privilege_type", dr["privilege_type"])
   313  		if firstRun {
   314  			md.ChangeState["object_privileges"][objIdent] = ""
   315  		} else {
   316  			_, ok := md.ChangeState["object_privileges"][objIdent]
   317  			if !ok {
   318  				ll.Debug("new object privileges detected")
   319  				dr["event"] = "GRANT"
   320  				detectedChanges = append(detectedChanges, dr)
   321  				changeCounts.Created++
   322  				md.ChangeState["object_privileges"][objIdent] = ""
   323  			}
   324  			currentState[objIdent] = true
   325  		}
   326  	}
   327  	// check revokes - exists in old state only
   328  	if !firstRun && len(currentState) > 0 {
   329  		for objPrevRun := range md.ChangeState["object_privileges"] {
   330  			if _, ok := currentState[objPrevRun]; !ok {
   331  				splits := strings.Split(objPrevRun, "#:#")
   332  				l.WithField("role", splits[1]).
   333  					WithField("object_type", splits[0]).
   334  					WithField("object", splits[2]).
   335  					WithField("privilege_type", splits[3]).
   336  					Debug("removed object privileges detected")
   337  				revokeEntry := metrics.NewMeasurement(data.GetEpoch())
   338  				revokeEntry["object_type"] = splits[0]
   339  				revokeEntry["tag_role"] = splits[1]
   340  				revokeEntry["tag_object"] = splits[2]
   341  				revokeEntry["privilege_type"] = splits[3]
   342  				revokeEntry["event"] = "REVOKE"
   343  				detectedChanges = append(detectedChanges, revokeEntry)
   344  				changeCounts.Dropped++
   345  				delete(md.ChangeState["object_privileges"], objPrevRun)
   346  			}
   347  		}
   348  	}
   349  
   350  	l.Debugf("object privilege changes detected: %d", len(detectedChanges))
   351  	if len(detectedChanges) > 0 {
   352  		r.measurementCh <- metrics.MeasurementEnvelope{
   353  			DBName:     md.Name,
   354  			MetricName: "privilege_changes",
   355  			Data:       detectedChanges,
   356  			CustomTags: md.CustomTags,
   357  		}
   358  	}
   359  
   360  	return changeCounts
   361  }
   362  
   363  func (r *Reaper) DetectConfigurationChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults {
   364  	detectedChanges := make(metrics.Measurements, 0)
   365  	var firstRun bool
   366  	var changeCounts ChangeDetectionResults
   367  	l := log.GetLogger(ctx)
   368  	changeCounts.Target = "settings"
   369  	l.Debug("checking for configuration changes...")
   370  	if _, ok := md.ChangeState["configuration_hashes"]; !ok {
   371  		firstRun = true
   372  		md.ChangeState["configuration_hashes"] = make(map[string]string)
   373  	}
   374  
   375  	mvp, ok := metricDefs.GetMetricDef("configuration_hashes")
   376  	if !ok {
   377  		l.Error("could not get configuration_hashes sql")
   378  		return changeCounts
   379  	}
   380  
   381  	rows, err := md.Conn.Query(ctx, mvp.GetSQL(md.Version))
   382  	if err != nil {
   383  		l.Error(err)
   384  		return changeCounts
   385  	}
   386  	defer rows.Close()
   387  	var (
   388  		objIdent, objValue string
   389  		epoch              int64
   390  	)
   391  	for rows.Next() {
   392  		if rows.Scan(&epoch, &objIdent, &objValue) != nil {
   393  			return changeCounts
   394  		}
   395  		prevРash, ok := md.ChangeState["configuration_hashes"][objIdent]
   396  		ll := l.WithField("setting", objIdent)
   397  		if ok { // we have existing state
   398  			if prevРash != objValue {
   399  				ll.Warningf("settings change detected: %s = %s (prev: %s)", objIdent, objValue, prevРash)
   400  				detectedChanges = append(detectedChanges, metrics.Measurement{
   401  					metrics.EpochColumnName: epoch,
   402  					"tag_setting":           objIdent,
   403  					"value":                 objValue,
   404  					"event":                 "alter"})
   405  				md.ChangeState["configuration_hashes"][objIdent] = objValue
   406  				changeCounts.Altered++
   407  			}
   408  		} else { // check for new, delete not relevant here (pg_upgrade)
   409  			md.ChangeState["configuration_hashes"][objIdent] = objValue
   410  			if firstRun {
   411  				continue
   412  			}
   413  			ll.Debug("new setting detected")
   414  			detectedChanges = append(detectedChanges, metrics.Measurement{
   415  				metrics.EpochColumnName: epoch,
   416  				"tag_setting":           objIdent,
   417  				"value":                 objValue,
   418  				"event":                 "create"})
   419  			changeCounts.Created++
   420  		}
   421  	}
   422  
   423  	l.Debugf("configuration changes detected: %d", len(detectedChanges))
   424  	if len(detectedChanges) > 0 {
   425  		r.measurementCh <- metrics.MeasurementEnvelope{
   426  			DBName:     md.Name,
   427  			MetricName: "configuration_changes",
   428  			Data:       detectedChanges,
   429  			CustomTags: md.CustomTags,
   430  		}
   431  	}
   432  	return changeCounts
   433  }
   434  
   435  // GetInstanceUpMeasurement returns a single measurement with "instance_up" metric
   436  // used to detect if the instance is up or down
   437  func (r *Reaper) GetInstanceUpMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error) {
   438  	return metrics.Measurements{
   439  		metrics.Measurement{
   440  			metrics.EpochColumnName: time.Now().UnixNano(),
   441  			"instance_up": func() int {
   442  				if md.Conn.Ping(ctx) == nil {
   443  					return 1
   444  				}
   445  				return 0
   446  			}(), // true if connection is up
   447  		},
   448  	}, nil // always return nil error for the status metric
   449  }
   450  
   451  func (r *Reaper) GetObjectChangesMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error) {
   452  	md.Lock()
   453  	defer md.Unlock()
   454  
   455  	spN := r.DetectSprocChanges(ctx, md)
   456  	tblN := r.DetectTableChanges(ctx, md)
   457  	idxN := r.DetectIndexChanges(ctx, md)
   458  	cnfN := r.DetectConfigurationChanges(ctx, md)
   459  	privN := r.DetectPrivilegeChanges(ctx, md)
   460  
   461  	if spN.Total()+tblN.Total()+idxN.Total()+cnfN.Total()+privN.Total() == 0 {
   462  		return nil, nil
   463  	}
   464  
   465  	m := metrics.NewMeasurement(time.Now().UnixNano())
   466  	m["details"] = strings.Join([]string{spN.String(), tblN.String(), idxN.String(), cnfN.String(), privN.String()}, " ")
   467  	return metrics.Measurements{m}, nil
   468  }
   469  
   470  func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(hostsToShutDown map[string]bool) {
   471  	for _, prevDB := range r.prevLoopMonitoredDBs {
   472  		if r.monitoredSources.GetMonitoredDatabase(prevDB.Name) == nil { // removed from config
   473  			prevDB.Conn.Close()
   474  			_ = r.SinksWriter.SyncMetric(prevDB.Name, "", sinks.DeleteOp)
   475  		}
   476  	}
   477  
   478  	for toShutDownDB := range hostsToShutDown {
   479  		if db := r.monitoredSources.GetMonitoredDatabase(toShutDownDB); db != nil {
   480  			db.Conn.Close()
   481  		}
   482  		_ = r.SinksWriter.SyncMetric(toShutDownDB, "", sinks.DeleteOp)
   483  	}
   484  }
   485