...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/sinks/prometheus_race_test.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/sinks

     1  package sinks
     2  
     3  import (
     4  	"sync"
     5  	"testing"
     6  	"time"
     7  
     8  	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
     9  	"github.com/cybertec-postgresql/pgwatch/v5/internal/testutil"
    10  	"github.com/prometheus/client_golang/prometheus"
    11  )
    12  
    13  func TestCollect_RaceCondition_Real(_ *testing.T) {
    14  	// 1. Initialize the real PrometheusWriter
    15  	promw, _ := NewPrometheusWriter(testutil.TestContext, "127.0.0.1:0/pgwatch")
    16  
    17  	// 2. Register a metric so Write() actually puts data into the map
    18  	_ = promw.SyncMetric("race_db", "test_metric", AddOp)
    19  
    20  	var wg sync.WaitGroup
    21  	done := make(chan struct{})
    22  
    23  	// --- The Writer (Simulating Database Updates) ---
    24  	wg.Go(func() {
    25  		for {
    26  			select {
    27  			case <-done:
    28  				return
    29  			default:
    30  				// Call the REAL Write method
    31  				_ = promw.Write(metrics.MeasurementEnvelope{
    32  					DBName:     "race_db",
    33  					MetricName: "test_metric",
    34  					Data: metrics.Measurements{
    35  						{
    36  							metrics.EpochColumnName: time.Now().UnixNano(),
    37  							"value":                 int64(100),
    38  						},
    39  					},
    40  				})
    41  				// No sleep here -> hammer the map as fast as possible
    42  			}
    43  		}
    44  	})
    45  
    46  	// --- The Collector (Simulating Prometheus Scrapes) ---
    47  	wg.Go(func() {
    48  		// Prometheus provides a channel to receive metrics
    49  		ch := make(chan prometheus.Metric, 10000)
    50  
    51  		// Scrape 50 times (more than enough to trigger a race in a tight loop)
    52  		for range 50 {
    53  			// Call the REAL Collect method
    54  			promw.Collect(ch)
    55  
    56  			// Drain the channel so it doesn't block
    57  		drainLoop:
    58  			for {
    59  				select {
    60  				case <-ch:
    61  				default:
    62  					break drainLoop
    63  				}
    64  			}
    65  		}
    66  		close(done) // Tell the writer to stop
    67  	})
    68  
    69  	wg.Wait()
    70  }
    71