...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/reaper/reaper.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"context"
     5  	"runtime"
     6  	"slices"
     7  	"strings"
     8  	"time"
     9  
    10  	"sync/atomic"
    11  
    12  	"github.com/cybertec-postgresql/pgwatch/v5/internal/cmdopts"
    13  	"github.com/cybertec-postgresql/pgwatch/v5/internal/log"
    14  	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
    15  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
    16  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
    17  )
    18  
    19  const (
    20  	specialMetricChangeEvents         = "change_events"
    21  	specialMetricServerLogEventCounts = "server_log_event_counts"
    22  	specialMetricInstanceUp           = "instance_up"
    23  )
    24  
    25  var specialMetrics = map[string]bool{specialMetricChangeEvents: true, specialMetricServerLogEventCounts: true}
    26  
    27  var hostLastKnownStatusInRecovery = make(map[string]bool) // isInRecovery
    28  var metricsConfig metrics.MetricIntervals                 // set to host.Metrics or host.MetricsStandby (in case optional config defined and in recovery state
    29  var metricDefs = NewConcurrentMetricDefs()
    30  
// Reaper is the struct that is responsible for fetching metrics measurements
// from the sources and storing them to the sinks.
//
// Field overview:
//   - Options: the embedded command line options (Sources, Metrics, Sinks, Logging, ...).
//   - ready: set to true once Reap has started; reported by Ready.
//   - measurementCh: buffered channel of measurement envelopes, drained by WriteMeasurements.
//   - measurementCache: instance-level metric cache read through GetMeasurementCache.
//   - logger: logger obtained from the context passed to NewReaper.
//   - monitoredSources: the sources selected by the latest LoadSources call.
//   - prevLoopMonitoredDBs: copy of monitoredSources taken at the end of the previous main-loop pass.
//   - cancelFuncs / sourceReapers: per-source bookkeeping for running SourceReaper instances.
type Reaper struct {
	*cmdopts.Options
	ready                atomic.Bool
	measurementCh        chan metrics.MeasurementEnvelope
	measurementCache     *InstanceMetricCache
	logger               log.Logger
	monitoredSources     sources.SourceConns
	prevLoopMonitoredDBs sources.SourceConns
	cancelFuncs          map[string]context.CancelFunc // [sourceName]cancel() — one per source
	sourceReapers        map[string]*SourceReaper      // [sourceName] — active SourceReaper instances
}
    43  
    44  // NewReaper creates a new Reaper instance
    45  func NewReaper(ctx context.Context, opts *cmdopts.Options) (r *Reaper) {
    46  	return &Reaper{
    47  		Options:              opts,
    48  		measurementCh:        make(chan metrics.MeasurementEnvelope, 256),
    49  		measurementCache:     NewInstanceMetricCache(),
    50  		logger:               log.GetLogger(ctx),
    51  		monitoredSources:     make(sources.SourceConns, 0),
    52  		prevLoopMonitoredDBs: make(sources.SourceConns, 0),
    53  		cancelFuncs:          make(map[string]context.CancelFunc), // [sourceName]cancel()
    54  		sourceReapers:        make(map[string]*SourceReaper),
    55  	}
    56  }
    57  
    58  // Ready() returns true if the service is healthy and operating correctly
    59  func (r *Reaper) Ready() bool {
    60  	return r.ready.Load()
    61  }
    62  
    63  func (r *Reaper) PrintMemStats() {
    64  	var m runtime.MemStats
    65  	runtime.ReadMemStats(&m)
    66  
    67  	bToKb := func(b uint64) uint64 {
    68  		return b / 1024
    69  	}
    70  	r.logger.Debugf("Alloc: %d Kb, TotalAlloc: %d Kb, Sys: %d Kb, NumGC: %d, HeapAlloc: %d Kb, HeapSys: %d Kb",
    71  		bToKb(m.Alloc), bToKb(m.TotalAlloc), bToKb(m.Sys), m.NumGC, bToKb(m.HeapAlloc), bToKb(m.HeapSys))
    72  }
    73  
    74  // Reap() starts the main monitoring loop. It is responsible for fetching metrics measurements
    75  // from the sources and storing them to the sinks. It also manages the lifecycle of
    76  // the metric gatherers. In case of a source or metric definition change, it will
    77  // start or stop the gatherers accordingly.
    78  func (r *Reaper) Reap(ctx context.Context) {
    79  	var err error
    80  	logger := r.logger
    81  
    82  	go r.WriteMeasurements(ctx)
    83  
    84  	r.ready.Store(true)
    85  
    86  	for { //main loop
    87  		if r.Logging.LogLevel == "debug" {
    88  			r.PrintMemStats()
    89  		}
    90  		if err = r.LoadSources(ctx); err != nil {
    91  			logger.WithError(err).Error("could not refresh active sources, using last valid cache")
    92  		}
    93  		if err = r.LoadMetrics(); err != nil {
    94  			logger.WithError(err).Error("could not refresh metric definitions, using last valid cache")
    95  		}
    96  
    97  		// UpdateMonitoredDBCache(r.monitoredSources)
    98  		hostsToShutDownDueToRoleChange := make(map[string]bool) // hosts went from master to standby and have "only if master" set
    99  		for _, monitoredSource := range r.monitoredSources {
   100  			srcL := logger.WithField("source", monitoredSource.Name)
   101  			ctx = log.WithLogger(ctx, srcL)
   102  
   103  			if monitoredSource.Connect(ctx, r.Sources) != nil {
   104  				r.WriteInstanceDown(monitoredSource.Name)
   105  				srcL.Warning("could not init connection, retrying on next iteration")
   106  				continue
   107  			}
   108  
   109  			if err = monitoredSource.FetchRuntimeInfo(ctx, true); err != nil {
   110  				srcL.WithError(err).Error("could not start metric gathering")
   111  				continue
   112  			}
   113  			srcL.WithField("recovery", monitoredSource.IsInRecovery).Infof("Connect OK. Version: %s", monitoredSource.VersionStr)
   114  			if monitoredSource.IsInRecovery && monitoredSource.OnlyIfMaster {
   115  				srcL.Info("not added to monitoring due to 'master only' property")
   116  				if monitoredSource.IsPostgresSource() {
   117  					srcL.Info("to be removed from monitoring due to 'master only' property and status change")
   118  					hostsToShutDownDueToRoleChange[monitoredSource.Name] = true
   119  				}
   120  				continue
   121  			}
   122  
   123  			if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
   124  				metricsConfig = monitoredSource.MetricsStandby
   125  			} else {
   126  				metricsConfig = monitoredSource.Metrics
   127  			}
   128  
   129  			r.CreateSourceHelpers(ctx, srcL, monitoredSource)
   130  
   131  			if monitoredSource.IsPostgresSource() {
   132  				DBSizeMB := monitoredSource.ApproxDbSize / 1048576 // only remove from monitoring when we're certain it's under the threshold
   133  				if DBSizeMB != 0 && DBSizeMB < r.Sources.MinDbSizeMB {
   134  					srcL.Infof("ignored due to the --min-db-size-mb filter, current size %d MB", DBSizeMB)
   135  					hostsToShutDownDueToRoleChange[monitoredSource.Name] = true // for the case when DB size was previously above the threshold
   136  					continue
   137  				}
   138  
   139  				lastKnownStatusInRecovery := hostLastKnownStatusInRecovery[monitoredSource.Name]
   140  				if lastKnownStatusInRecovery != monitoredSource.IsInRecovery {
   141  					if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
   142  						srcL.Warning("Switching metrics collection to standby config...")
   143  						metricsConfig = monitoredSource.MetricsStandby
   144  					} else if !monitoredSource.IsInRecovery {
   145  						srcL.Warning("Switching metrics collection to primary config...")
   146  						metricsConfig = monitoredSource.Metrics
   147  					}
   148  					// else: it already has primary config do nothing + no warn
   149  				}
   150  			}
   151  			hostLastKnownStatusInRecovery[monitoredSource.Name] = monitoredSource.IsInRecovery
   152  
   153  			// Sync metric names with sinks for the active config
   154  			for metricName := range metricsConfig {
   155  				mvp, metricDefExists := metricDefs.GetMetricDef(metricName)
   156  				if !metricDefExists {
   157  					epoch, ok := lastSQLFetchError.Load(metricName)
   158  					if !ok || ((time.Now().Unix() - epoch.(int64)) > 3600) {
   159  						srcL.WithField("metric", metricName).Warning("metric definition not found")
   160  						lastSQLFetchError.Store(metricName, time.Now().Unix())
   161  					}
   162  					continue
   163  				}
   164  				metricNameForStorage := metricName
   165  				if _, isSpecialMetric := specialMetrics[metricName]; !isSpecialMetric && mvp.StorageName > "" {
   166  					metricNameForStorage = mvp.StorageName
   167  				}
   168  				if err := r.SinksWriter.SyncMetric(monitoredSource.Name, metricNameForStorage, sinks.AddOp); err != nil {
   169  					srcL.Error(err)
   170  				}
   171  			}
   172  
   173  			// Start SourceReaper for this source if not already running
   174  			if _, exists := r.sourceReapers[monitoredSource.Name]; !exists {
   175  				srcL.Info("starting source reaper")
   176  				sr := NewSourceReaper(r, monitoredSource)
   177  				sourceCtx, cancelFunc := context.WithCancel(ctx)
   178  				r.cancelFuncs[monitoredSource.Name] = cancelFunc
   179  				r.sourceReapers[monitoredSource.Name] = sr
   180  				go sr.Run(sourceCtx)
   181  			}
   182  		}
   183  
   184  		r.ShutdownOldWorkers(ctx, hostsToShutDownDueToRoleChange)
   185  
   186  		r.prevLoopMonitoredDBs = slices.Clone(r.monitoredSources)
   187  		select {
   188  		case <-time.After(time.Second * time.Duration(r.Sources.Refresh)):
   189  			logger.Debugf("wake up after %d seconds", r.Sources.Refresh)
   190  		case <-ctx.Done():
   191  			return
   192  		}
   193  	}
   194  }
   195  
   196  // CreateSourceHelpers creates the extensions and metric helpers for the monitored source
   197  func (r *Reaper) CreateSourceHelpers(ctx context.Context, srcL log.Logger, monitoredSource *sources.SourceConn) {
   198  	if r.prevLoopMonitoredDBs.GetMonitoredDatabase(monitoredSource.Name) != nil {
   199  		return // already created
   200  	}
   201  	if !monitoredSource.IsPostgresSource() || monitoredSource.IsInRecovery {
   202  		return // no need to create anything for non-postgres sources
   203  	}
   204  
   205  	if r.Sources.TryCreateListedExtsIfMissing > "" {
   206  		srcL.Info("trying to create extensions if missing")
   207  		extsToCreate := strings.Split(r.Sources.TryCreateListedExtsIfMissing, ",")
   208  		extsCreated, err := monitoredSource.TryCreateMissingExtensions(ctx, extsToCreate)
   209  		if err != nil {
   210  			srcL.Warning(err)
   211  		}
   212  		if extsCreated != "" {
   213  			srcL.Infof("%d/%d extensions created: %s", len(extsCreated), len(extsToCreate), extsCreated)
   214  		}
   215  	}
   216  
   217  	if r.Sources.CreateHelpers {
   218  		srcL.Info("trying to create helper objects if missing")
   219  		if err := monitoredSource.TryCreateMetricsHelpers(ctx, func(metric string) string {
   220  			if m, ok := metricDefs.GetMetricDef(metric); ok {
   221  				return m.InitSQL
   222  			}
   223  			return ""
   224  		}); err != nil {
   225  			srcL.Warning(err)
   226  		}
   227  	}
   228  
   229  }
   230  
   231  func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDown map[string]bool) {
   232  	logger := r.logger
   233  	// loop over existing source reapers and stop if DB removed from config
   234  	// or state change makes it uninteresting
   235  	logger.Debug("checking if any workers need to be shut down...")
   236  	for sourceName, cancelFunc := range r.cancelFuncs {
   237  		var dbRemovedFromConfig bool
   238  
   239  		_, wholeDbShutDown := hostsToShutDown[sourceName]
   240  		if !wholeDbShutDown {
   241  			md := r.monitoredSources.GetMonitoredDatabase(sourceName)
   242  			if md == nil { // normal removing of DB from config
   243  				dbRemovedFromConfig = true
   244  				logger.Debugf("DB %s removed from config, shutting down source reaper...", sourceName)
   245  			}
   246  		}
   247  
   248  		if ctx.Err() != nil || wholeDbShutDown || dbRemovedFromConfig {
   249  			logger.WithField("source", sourceName).Info("stopping source reaper...")
   250  			cancelFunc()
   251  			delete(r.cancelFuncs, sourceName)
   252  			delete(r.sourceReapers, sourceName)
   253  			if err := r.SinksWriter.SyncMetric(sourceName, "", sinks.DeleteOp); err != nil {
   254  				logger.Error(err)
   255  			}
   256  		}
   257  	}
   258  
   259  	// Destroy conn pools and metric writers
   260  	r.CloseResourcesForRemovedMonitoredDBs(hostsToShutDown)
   261  }
   262  
   263  // LoadSources loads sources from the reader
   264  func (r *Reaper) LoadSources(ctx context.Context) (err error) {
   265  	if DoesEmergencyTriggerfileExist(r.Metrics.EmergencyPauseTriggerfile) {
   266  		r.logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", r.Metrics.EmergencyPauseTriggerfile)
   267  		r.monitoredSources = make(sources.SourceConns, 0)
   268  		return nil
   269  	}
   270  
   271  	var newSrcs sources.SourceConns
   272  	srcs, err := r.SourcesReaderWriter.GetSources()
   273  	if err != nil {
   274  		return err
   275  	}
   276  	srcs = slices.DeleteFunc(srcs, func(s sources.Source) bool {
   277  		// filter out disabled sources and sources with group not in the list of groups to monitor
   278  		return !s.IsEnabled || len(r.Sources.Groups) > 0 && !slices.Contains(r.Sources.Groups, s.Group)
   279  	})
   280  
   281  	if newSrcs, err = srcs.ResolveDatabases(r.WriteInstanceDown); err != nil {
   282  		// discover dtabases for continuous monitoring sources
   283  		r.logger.WithError(err).Error("could not resolve databases from sources")
   284  	}
   285  
   286  	for i, newMD := range newSrcs {
   287  		md := r.monitoredSources.GetMonitoredDatabase(newMD.Name)
   288  		if md == nil {
   289  			continue
   290  		}
   291  		if md.Equal(newMD.Source) {
   292  			// replace with the existing connection if the source is the same
   293  			newSrcs[i] = md
   294  			continue
   295  		}
   296  		// Source configs changed, stop all running gatherers to trigger a restart
   297  		// TODO: Optimize this for single metric addition/deletion/interval-change cases to not do a full restart
   298  		r.logger.WithField("source", md.Name).Info("Source configs changed, restarting all gatherers...")
   299  		r.ShutdownOldWorkers(ctx, map[string]bool{md.Name: true})
   300  	}
   301  	r.monitoredSources = newSrcs
   302  	r.logger.WithField("sources", len(r.monitoredSources)).Info("sources refreshed")
   303  	return nil
   304  }
   305  
   306  // WriteInstanceDown writes instance_up = 0 metric to sinks for the given source
   307  func (r *Reaper) WriteInstanceDown(name string) {
   308  	r.measurementCh <- metrics.MeasurementEnvelope{
   309  		DBName:     name,
   310  		MetricName: specialMetricInstanceUp,
   311  		Data: metrics.Measurements{metrics.Measurement{
   312  			metrics.EpochColumnName: time.Now().UnixNano(),
   313  			specialMetricInstanceUp: 0},
   314  		},
   315  	}
   316  }
   317  
   318  // GetMeasurementCache returns the instance-level metric cache
   319  func (r *Reaper) GetMeasurementCache(key string) metrics.Measurements {
   320  	return r.measurementCache.Get(key, r.Metrics.CacheAge())
   321  }
   322  
   323  // WriteMeasurements() writes the metrics to the sinks
   324  func (r *Reaper) WriteMeasurements(ctx context.Context) {
   325  	var err error
   326  	for {
   327  		select {
   328  		case <-ctx.Done():
   329  			return
   330  		case msg := <-r.measurementCh:
   331  			if err = r.SinksWriter.Write(msg); err != nil {
   332  				r.logger.Error(err)
   333  			}
   334  		}
   335  	}
   336  }
   337  
   338  func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, md *sources.SourceConn) {
   339  	for _, dr := range data {
   340  		if r.Sinks.RealDbnameField > "" && md.RealDbname > "" {
   341  			dr[r.Sinks.RealDbnameField] = md.RealDbname
   342  		}
   343  		if r.Sinks.SystemIdentifierField > "" && md.SystemIdentifier > "" {
   344  			dr[r.Sinks.SystemIdentifierField] = md.SystemIdentifier
   345  		}
   346  	}
   347  }
   348