...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/reaper/cache.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"fmt"
     5  	"sync"
     6  	"time"
     7  
     8  	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
     9  	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
    10  )
    11  
    12  var monitoredDbCache map[string]*sources.SourceConn
    13  var monitoredDbCacheLock sync.RWMutex
    14  var MonitoredDatabasesSettings = make(map[string]MonitoredDatabaseSettings)
    15  var MonitoredDatabasesSettingsLock = sync.RWMutex{}
    16  var MonitoredDatabasesSettingsGetLock = make(map[string]*sync.RWMutex) // synchronize initial PG version detection to 1 instance for each defined host
    17  
    18  var lastDBSizeMB = make(map[string]int64)
    19  var lastDBSizeFetchTime = make(map[string]time.Time) // cached for DB_SIZE_CACHING_INTERVAL
    20  var lastDBSizeCheckLock sync.RWMutex
    21  
    22  var prevLoopMonitoredDBs sources.SourceConns // to be able to detect DBs removed from config
    23  
    24  var lastSQLFetchError sync.Map
    25  
    26  func InitPGVersionInfoFetchingLockIfNil(md *sources.SourceConn) {
    27  	MonitoredDatabasesSettingsLock.Lock()
    28  	if _, ok := MonitoredDatabasesSettingsGetLock[md.Name]; !ok {
    29  		MonitoredDatabasesSettingsGetLock[md.Name] = &sync.RWMutex{}
    30  	}
    31  	MonitoredDatabasesSettingsLock.Unlock()
    32  }
    33  
    34  func UpdateMonitoredDBCache(data sources.SourceConns) {
    35  	monitoredDbCacheNew := make(map[string]*sources.SourceConn)
    36  	for _, row := range data {
    37  		monitoredDbCacheNew[row.Name] = row
    38  	}
    39  	monitoredDbCacheLock.Lock()
    40  	monitoredDbCache = monitoredDbCacheNew
    41  	monitoredDbCacheLock.Unlock()
    42  }
    43  
    44  func GetMonitoredDatabaseByUniqueName(name string) (*sources.SourceConn, error) {
    45  	monitoredDbCacheLock.RLock()
    46  	defer monitoredDbCacheLock.RUnlock()
    47  	md, exists := monitoredDbCache[name]
    48  	if !exists || md == nil {
    49  		return nil, fmt.Errorf("database %s not found in cache", name)
    50  	}
    51  	return md, nil
    52  }
    53  
    54  type InstanceMetricCache struct {
    55  	cache map[string](metrics.Measurements) // [dbUnique+metric]lastly_fetched_data
    56  	sync.RWMutex
    57  }
    58  
    59  func NewInstanceMetricCache() *InstanceMetricCache {
    60  	return &InstanceMetricCache{
    61  		cache: make(map[string](metrics.Measurements)),
    62  	}
    63  }
    64  
    65  func (imc *InstanceMetricCache) Get(key string, age time.Duration) metrics.Measurements {
    66  	if key == "" {
    67  		return nil
    68  	}
    69  	imc.RLock()
    70  	defer imc.RUnlock()
    71  	instanceMetricEpochNs := (imc.cache[key]).GetEpoch()
    72  
    73  	if time.Now().UnixNano()-instanceMetricEpochNs > age.Nanoseconds() {
    74  		return nil
    75  	}
    76  	instanceMetricData, ok := imc.cache[key]
    77  	if !ok {
    78  		return nil
    79  	}
    80  	return instanceMetricData.DeepCopy()
    81  }
    82  
    83  func (imc *InstanceMetricCache) Put(key string, data metrics.Measurements) {
    84  	if len(data) == 0 || key == "" {
    85  		return
    86  	}
    87  	imc.Lock()
    88  	defer imc.Unlock()
    89  	m := data.DeepCopy()
    90  	if !m.IsEpochSet() {
    91  		m.Touch()
    92  	}
    93  	imc.cache[key] = m
    94  }
    95