...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/reaper/database_test.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"context"
     5  	"testing"
     6  	"time"
     7  
     8  	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
     9  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
    10  	pgxmock "github.com/pashagolub/pgxmock/v4"
    11  	"github.com/stretchr/testify/assert"
    12  	"github.com/stretchr/testify/require"
    13  )
    14  
    15  // Helper function to create a test SourceConn with pgxmock
    16  func createTestSourceConn(t *testing.T) (*sources.SourceConn, pgxmock.PgxPoolIface) {
    17  	mock, err := pgxmock.NewPool()
    18  	require.NoError(t, err)
    19  
    20  	md := &sources.SourceConn{
    21  		Conn:   mock,
    22  		Source: sources.Source{Name: "testdb"},
    23  		RuntimeInfo: sources.RuntimeInfo{
    24  			Version:     120000,
    25  			ChangeState: make(map[string]map[string]string),
    26  		},
    27  	}
    28  	return md, mock
    29  }
    30  
    31  func TestTryCreateMetricsFetchingHelpers(t *testing.T) {
    32  	ctx := context.Background()
    33  	mock, err := pgxmock.NewPool()
    34  	assert.NoError(t, err)
    35  	defer mock.Close()
    36  
    37  	metricDefs.MetricDefs["metric1"] = metrics.Metric{
    38  		InitSQL: "CREATE FUNCTION metric1",
    39  	}
    40  
    41  	md := &sources.SourceConn{
    42  		Conn: mock,
    43  		Source: sources.Source{
    44  			Name:           "testdb",
    45  			Metrics:        map[string]float64{"metric1": 42, "nonexistent": 0},
    46  			MetricsStandby: map[string]float64{"metric1": 42},
    47  		},
    48  	}
    49  
    50  	t.Run("success", func(t *testing.T) {
    51  		mock.ExpectExec("CREATE FUNCTION metric1").WillReturnResult(pgxmock.NewResult("CREATE", 1))
    52  
    53  		err = TryCreateMetricsFetchingHelpers(ctx, md)
    54  		assert.NoError(t, err)
    55  		assert.NoError(t, mock.ExpectationsWereMet())
    56  	})
    57  
    58  	t.Run("error on exec", func(t *testing.T) {
    59  		mock.ExpectExec("CREATE FUNCTION metric1").WillReturnError(assert.AnError)
    60  
    61  		err = TryCreateMetricsFetchingHelpers(ctx, md)
    62  		assert.Error(t, err)
    63  		assert.NoError(t, mock.ExpectationsWereMet())
    64  	})
    65  
    66  }
    67  
    68  func TestDetectSprocChanges(t *testing.T) {
    69  	ctx := context.Background()
    70  
    71  	// Set up simple test metric (instead of complex real one)
    72  	metricDefs.MetricDefs["sproc_hashes"] = metrics.Metric{
    73  		SQLs: map[int]string{
    74  			120000: "SELECT",
    75  		},
    76  	}
    77  
    78  	// Create single connection and reaper to maintain state across calls
    79  	md, mock := createTestSourceConn(t)
    80  	defer mock.Close()
    81  
    82  	reaper := &Reaper{
    83  		measurementCh: make(chan metrics.MeasurementEnvelope, 10),
    84  	}
    85  
    86  	// First run - establish baseline
    87  	initialRows := pgxmock.NewRows([]string{"tag_sproc", "tag_oid", "md5", "epoch_ns"}).
    88  		AddRow("func1", "123", "hash1", time.Now().UnixNano()).
    89  		AddRow("func2", "456", "hash2", time.Now().UnixNano())
    90  	mock.ExpectQuery("SELECT").WillReturnRows(initialRows)
    91  
    92  	result := reaper.DetectSprocChanges(ctx, md)
    93  	assert.Equal(t, 0, result.Created) // First run should not count anything as created
    94  	assert.Equal(t, 0, result.Altered)
    95  	assert.Equal(t, 0, result.Dropped)
    96  
    97  	// State should now be populated
    98  	assert.NotEmpty(t, md.ChangeState["sproc_hashes"])
    99  
   100  	// Second run - detect altered sproc (different hash for func1)
   101  	modifiedRows := pgxmock.NewRows([]string{"tag_sproc", "tag_oid", "md5", "epoch_ns"}).
   102  		AddRow("func1", "123", "new_hash", time.Now().UnixNano()).
   103  		AddRow("func2", "456", "hash2", time.Now().UnixNano())
   104  	mock.ExpectQuery("SELECT").WillReturnRows(modifiedRows)
   105  
   106  	result = reaper.DetectSprocChanges(ctx, md)
   107  	assert.Equal(t, 0, result.Created)
   108  	assert.Equal(t, 1, result.Altered) // func1 was altered
   109  	assert.Equal(t, 0, result.Dropped)
   110  
   111  	// Third run - detect new sproc (func3 added)
   112  	newSprocRows := pgxmock.NewRows([]string{"tag_sproc", "tag_oid", "md5", "epoch_ns"}).
   113  		AddRow("func1", "123", "new_hash", time.Now().UnixNano()).
   114  		AddRow("func2", "456", "hash2", time.Now().UnixNano()).
   115  		AddRow("func3", "789", "hash3", time.Now().UnixNano()) // new sproc
   116  	mock.ExpectQuery("SELECT").WillReturnRows(newSprocRows)
   117  
   118  	result = reaper.DetectSprocChanges(ctx, md)
   119  	assert.Equal(t, 1, result.Created) // func3 was created
   120  	assert.Equal(t, 0, result.Altered)
   121  	assert.Equal(t, 0, result.Dropped)
   122  
   123  	// Verify measurement is sent
   124  	select {
   125  	case <-reaper.measurementCh:
   126  		// Good, measurement was sent
   127  	default:
   128  		t.Error("Expected measurement to be sent")
   129  	}
   130  
   131  	// Fourth run - detect dropped sproc (func2 removed)
   132  	droppedSprocRows := pgxmock.NewRows([]string{"tag_sproc", "tag_oid", "md5", "epoch_ns"}).
   133  		AddRow("func1", "123", "new_hash", time.Now().UnixNano()).
   134  		AddRow("func3", "789", "hash3", time.Now().UnixNano()) // func2 dropped
   135  	mock.ExpectQuery("SELECT").WillReturnRows(droppedSprocRows)
   136  
   137  	result = reaper.DetectSprocChanges(ctx, md)
   138  	assert.Equal(t, 0, result.Created)
   139  	assert.Equal(t, 0, result.Altered)
   140  	assert.Equal(t, 1, result.Dropped) // func2 was dropped
   141  
   142  	assert.NoError(t, mock.ExpectationsWereMet())
   143  }
   144  
   145  func TestDetectTableChanges(t *testing.T) {
   146  	ctx := context.Background()
   147  
   148  	// Set up simple test metric
   149  	metricDefs.MetricDefs["table_hashes"] = metrics.Metric{
   150  		SQLs: map[int]string{
   151  			120000: "SELECT",
   152  		},
   153  	}
   154  
   155  	// Create single connection and reaper to maintain state across calls
   156  	md, mock := createTestSourceConn(t)
   157  	defer mock.Close()
   158  
   159  	reaper := &Reaper{
   160  		measurementCh: make(chan metrics.MeasurementEnvelope, 10),
   161  	}
   162  
   163  	// First run - establish baseline
   164  	initialRows := pgxmock.NewRows([]string{"tag_table", "tag_oid", "md5", "epoch_ns"}).
   165  		AddRow("table1", "123", "hash1", time.Now().UnixNano()).
   166  		AddRow("table2", "456", "hash2", time.Now().UnixNano())
   167  	mock.ExpectQuery("SELECT").WillReturnRows(initialRows)
   168  
   169  	result := reaper.DetectTableChanges(ctx, md)
   170  	assert.Equal(t, 0, result.Created) // First run should not count anything as created
   171  	assert.Equal(t, 0, result.Altered)
   172  	assert.Equal(t, 0, result.Dropped)
   173  	assert.NotEmpty(t, md.ChangeState["table_hashes"])
   174  
   175  	// Second run - detect altered table (different hash for table1)
   176  	modifiedRows := pgxmock.NewRows([]string{"tag_table", "tag_oid", "md5", "epoch_ns"}).
   177  		AddRow("table1", "123", "new_hash", time.Now().UnixNano()).
   178  		AddRow("table2", "456", "hash2", time.Now().UnixNano())
   179  	mock.ExpectQuery("SELECT").WillReturnRows(modifiedRows)
   180  
   181  	result = reaper.DetectTableChanges(ctx, md)
   182  	assert.Equal(t, 0, result.Created)
   183  	assert.Equal(t, 1, result.Altered) // table1 was altered
   184  	assert.Equal(t, 0, result.Dropped)
   185  
   186  	// Third run - detect new table (table3 added)
   187  	newTableRows := pgxmock.NewRows([]string{"tag_table", "tag_oid", "md5", "epoch_ns"}).
   188  		AddRow("table1", "123", "new_hash", time.Now().UnixNano()).
   189  		AddRow("table2", "456", "hash2", time.Now().UnixNano()).
   190  		AddRow("table3", "789", "hash3", time.Now().UnixNano()) // new table
   191  	mock.ExpectQuery("SELECT").WillReturnRows(newTableRows)
   192  
   193  	result = reaper.DetectTableChanges(ctx, md)
   194  	assert.Equal(t, 1, result.Created) // table3 was created
   195  	assert.Equal(t, 0, result.Altered)
   196  	assert.Equal(t, 0, result.Dropped)
   197  
   198  	// Verify measurement is sent
   199  	select {
   200  	case msg := <-reaper.measurementCh:
   201  		assert.Equal(t, "table_changes", msg.MetricName)
   202  		assert.Equal(t, "testdb", msg.DBName)
   203  	default:
   204  		t.Error("Expected measurement to be sent")
   205  	}
   206  
   207  	// Fourth run - detect dropped table (table2 removed)
   208  	droppedTableRows := pgxmock.NewRows([]string{"tag_table", "tag_oid", "md5", "epoch_ns"}).
   209  		AddRow("table1", "123", "new_hash", time.Now().UnixNano()).
   210  		AddRow("table3", "789", "hash3", time.Now().UnixNano()) // table2 dropped
   211  	mock.ExpectQuery("SELECT").WillReturnRows(droppedTableRows)
   212  
   213  	result = reaper.DetectTableChanges(ctx, md)
   214  	assert.Equal(t, 0, result.Created)
   215  	assert.Equal(t, 0, result.Altered)
   216  	assert.Equal(t, 1, result.Dropped) // table2 was dropped
   217  
   218  	// Check that table2 was removed from state
   219  	_, exists := md.ChangeState["table_hashes"]["table2"]
   220  	assert.False(t, exists)
   221  
   222  	assert.NoError(t, mock.ExpectationsWereMet())
   223  }
   224  
   225  func TestDetectIndexChanges(t *testing.T) {
   226  	ctx := context.Background()
   227  
   228  	// Set up simple test metric
   229  	metricDefs.MetricDefs["index_hashes"] = metrics.Metric{
   230  		SQLs: map[int]string{
   231  			120000: "SELECT",
   232  		},
   233  	}
   234  
   235  	// Create single connection and reaper to maintain state across calls
   236  	md, mock := createTestSourceConn(t)
   237  	defer mock.Close()
   238  
   239  	reaper := &Reaper{
   240  		measurementCh: make(chan metrics.MeasurementEnvelope, 10),
   241  	}
   242  
   243  	// First run - establish baseline
   244  	initialRows := pgxmock.NewRows([]string{"tag_index", "table", "md5", "is_valid", "epoch_ns"}).
   245  		AddRow("idx1", "table1", "hash1", "t", time.Now().UnixNano()).
   246  		AddRow("idx2", "table1", "hash2", "t", time.Now().UnixNano())
   247  	mock.ExpectQuery("SELECT").WillReturnRows(initialRows)
   248  
   249  	result := reaper.DetectIndexChanges(ctx, md)
   250  	assert.Equal(t, 0, result.Created) // First run should not count anything as created
   251  	assert.Equal(t, 0, result.Altered)
   252  	assert.Equal(t, 0, result.Dropped)
   253  	assert.NotEmpty(t, md.ChangeState["index_hashes"])
   254  
   255  	// Second run - detect altered index (is_valid changed for idx1)
   256  	modifiedRows := pgxmock.NewRows([]string{"tag_index", "table", "md5", "is_valid", "epoch_ns"}).
   257  		AddRow("idx1", "table1", "hash1", "f", time.Now().UnixNano()). // now invalid
   258  		AddRow("idx2", "table1", "hash2", "t", time.Now().UnixNano())
   259  	mock.ExpectQuery("SELECT").WillReturnRows(modifiedRows)
   260  
   261  	result = reaper.DetectIndexChanges(ctx, md)
   262  	assert.Equal(t, 0, result.Created)
   263  	assert.Equal(t, 1, result.Altered) // idx1 was altered
   264  	assert.Equal(t, 0, result.Dropped)
   265  
   266  	// Third run - detect new index (idx3 added)
   267  	newIndexRows := pgxmock.NewRows([]string{"tag_index", "table", "md5", "is_valid", "epoch_ns"}).
   268  		AddRow("idx1", "table1", "hash1", "f", time.Now().UnixNano()).
   269  		AddRow("idx2", "table1", "hash2", "t", time.Now().UnixNano()).
   270  		AddRow("idx3", "table2", "hash3", "t", time.Now().UnixNano()) // new index
   271  	mock.ExpectQuery("SELECT").WillReturnRows(newIndexRows)
   272  
   273  	result = reaper.DetectIndexChanges(ctx, md)
   274  	assert.Equal(t, 1, result.Created) // idx3 was created
   275  	assert.Equal(t, 0, result.Altered)
   276  	assert.Equal(t, 0, result.Dropped)
   277  
   278  	// Verify measurement is sent
   279  	select {
   280  	case msg := <-reaper.measurementCh:
   281  		assert.Equal(t, "index_changes", msg.MetricName)
   282  		assert.Equal(t, "testdb", msg.DBName)
   283  	default:
   284  		t.Error("Expected measurement to be sent")
   285  	}
   286  
   287  	// Fourth run - detect dropped index (idx2 removed)
   288  	droppedIndexRows := pgxmock.NewRows([]string{"tag_index", "table", "md5", "is_valid", "epoch_ns"}).
   289  		AddRow("idx1", "table1", "hash1", "f", time.Now().UnixNano()).
   290  		AddRow("idx3", "table2", "hash3", "t", time.Now().UnixNano()) // idx2 dropped
   291  	mock.ExpectQuery("SELECT").WillReturnRows(droppedIndexRows)
   292  
   293  	result = reaper.DetectIndexChanges(ctx, md)
   294  	assert.Equal(t, 0, result.Created)
   295  	assert.Equal(t, 0, result.Altered)
   296  	assert.Equal(t, 1, result.Dropped) // idx2 was dropped
   297  
   298  	// Check that idx2 was removed from state
   299  	_, exists := md.ChangeState["index_hashes"]["idx2"]
   300  	assert.False(t, exists)
   301  
   302  	assert.NoError(t, mock.ExpectationsWereMet())
   303  }
   304  
   305  func TestDetectPrivilegeChanges(t *testing.T) {
   306  	ctx := context.Background()
   307  
   308  	// Set up simple test metric
   309  	metricDefs.MetricDefs["privilege_changes"] = metrics.Metric{
   310  		SQLs: map[int]string{
   311  			120000: "SELECT",
   312  		},
   313  	}
   314  
   315  	// Create single connection and reaper to maintain state across calls
   316  	md, mock := createTestSourceConn(t)
   317  	defer mock.Close()
   318  
   319  	reaper := &Reaper{
   320  		measurementCh: make(chan metrics.MeasurementEnvelope, 10),
   321  	}
   322  
   323  	// First run - establish baseline
   324  	initialRows := pgxmock.NewRows([]string{"object_type", "tag_role", "tag_object", "privilege_type", "epoch_ns"}).
   325  		AddRow("table", "user1", "table1", "SELECT", time.Now().UnixNano()).
   326  		AddRow("table", "user2", "table2", "INSERT", time.Now().UnixNano())
   327  	mock.ExpectQuery("SELECT").WillReturnRows(initialRows)
   328  
   329  	result := reaper.DetectPrivilegeChanges(ctx, md)
   330  	assert.Equal(t, 0, result.Created) // First run should not count anything as created
   331  	assert.Equal(t, 0, result.Altered)
   332  	assert.Equal(t, 0, result.Dropped)
   333  	assert.NotEmpty(t, md.ChangeState["object_privileges"])
   334  
   335  	// Second run - detect new privilege grant (user1 gets INSERT privilege on table1)
   336  	newPrivilegeRows := pgxmock.NewRows([]string{"object_type", "tag_role", "tag_object", "privilege_type", "epoch_ns"}).
   337  		AddRow("table", "user1", "table1", "SELECT", time.Now().UnixNano()).
   338  		AddRow("table", "user1", "table1", "INSERT", time.Now().UnixNano()). // new privilege
   339  		AddRow("table", "user2", "table2", "INSERT", time.Now().UnixNano())
   340  	mock.ExpectQuery("SELECT").WillReturnRows(newPrivilegeRows)
   341  
   342  	result = reaper.DetectPrivilegeChanges(ctx, md)
   343  	assert.Equal(t, 1, result.Created) // new privilege was granted
   344  	assert.Equal(t, 0, result.Altered)
   345  	assert.Equal(t, 0, result.Dropped)
   346  
   347  	// Verify measurement is sent
   348  	select {
   349  	case msg := <-reaper.measurementCh:
   350  		assert.Equal(t, "privilege_changes", msg.MetricName)
   351  		assert.Equal(t, "testdb", msg.DBName)
   352  	default:
   353  		t.Error("Expected measurement to be sent")
   354  	}
   355  
   356  	// Third run - detect privilege revoke (user1 loses INSERT privilege on table1)
   357  	revokedPrivilegeRows := pgxmock.NewRows([]string{"object_type", "tag_role", "tag_object", "privilege_type", "epoch_ns"}).
   358  		AddRow("table", "user1", "table1", "SELECT", time.Now().UnixNano()).
   359  		AddRow("table", "user2", "table2", "INSERT", time.Now().UnixNano()) // user1 INSERT privilege revoked
   360  	mock.ExpectQuery("SELECT").WillReturnRows(revokedPrivilegeRows)
   361  
   362  	result = reaper.DetectPrivilegeChanges(ctx, md)
   363  	assert.Equal(t, 0, result.Created)
   364  	assert.Equal(t, 0, result.Altered)
   365  	assert.Equal(t, 1, result.Dropped) // privilege was revoked
   366  
   367  	// Check that revoked privilege was removed from state
   368  	_, exists := md.ChangeState["object_privileges"]["table#:#user1#:#table1#:#INSERT"]
   369  	assert.False(t, exists)
   370  
   371  	assert.NoError(t, mock.ExpectationsWereMet())
   372  }
   373  
   374  func TestDetectConfigurationChanges(t *testing.T) {
   375  	ctx := context.Background()
   376  
   377  	// Set up simple test metric
   378  	metricDefs.MetricDefs["configuration_hashes"] = metrics.Metric{
   379  		SQLs: map[int]string{
   380  			120000: "SELECT",
   381  		},
   382  	}
   383  
   384  	// Create direct mock connection without transaction expectations
   385  	mock, err := pgxmock.NewPool()
   386  	require.NoError(t, err)
   387  	defer mock.Close()
   388  
   389  	md := &sources.SourceConn{
   390  		Conn:   mock,
   391  		Source: sources.Source{Name: "testdb"},
   392  		RuntimeInfo: sources.RuntimeInfo{
   393  			Version:     120000,
   394  			ChangeState: make(map[string]map[string]string),
   395  		},
   396  	}
   397  
   398  	reaper := &Reaper{
   399  		measurementCh: make(chan metrics.MeasurementEnvelope, 10),
   400  	}
   401  
   402  	// First run - establish baseline configuration
   403  	initialRows := pgxmock.NewRows([]string{"epoch_ns", "setting", "value"}).
   404  		AddRow(time.Now().UnixNano(), "max_connections", "100").
   405  		AddRow(time.Now().UnixNano(), "shared_buffers", "128MB")
   406  	mock.ExpectQuery("SELECT").WillReturnRows(initialRows)
   407  
   408  	result := reaper.DetectConfigurationChanges(ctx, md)
   409  	assert.Equal(t, 0, result.Created) // First run should not count anything as created
   410  	assert.Equal(t, 0, result.Altered)
   411  	assert.Equal(t, 0, result.Dropped)
   412  	assert.NotEmpty(t, md.ChangeState["configuration_hashes"])
   413  
   414  	// Second run - detect new configuration setting
   415  	newSettingRows := pgxmock.NewRows([]string{"epoch_ns", "setting", "value"}).
   416  		AddRow(time.Now().UnixNano(), "max_connections", "100").
   417  		AddRow(time.Now().UnixNano(), "shared_buffers", "128MB").
   418  		AddRow(time.Now().UnixNano(), "work_mem", "4MB") // new setting
   419  	mock.ExpectQuery("SELECT").WillReturnRows(newSettingRows)
   420  
   421  	result = reaper.DetectConfigurationChanges(ctx, md)
   422  	assert.Equal(t, 1, result.Created) // new setting was added
   423  	assert.Equal(t, 0, result.Altered)
   424  	assert.Equal(t, 0, result.Dropped)
   425  
   426  	// Verify measurement is sent
   427  	select {
   428  	case msg := <-reaper.measurementCh:
   429  		assert.Equal(t, "configuration_changes", msg.MetricName)
   430  		assert.Equal(t, "testdb", msg.DBName)
   431  	default:
   432  		t.Error("Expected measurement to be sent")
   433  	}
   434  
   435  	// Third run - detect configuration change
   436  	changedValueRows := pgxmock.NewRows([]string{"epoch_ns", "setting", "value"}).
   437  		AddRow(time.Now().UnixNano(), "max_connections", "200").  // changed value
   438  		AddRow(time.Now().UnixNano(), "shared_buffers", "256MB"). // changed value
   439  		AddRow(time.Now().UnixNano(), "work_mem", "4MB")
   440  	mock.ExpectQuery("SELECT").WillReturnRows(changedValueRows)
   441  
   442  	result = reaper.DetectConfigurationChanges(ctx, md)
   443  	assert.Equal(t, 0, result.Created)
   444  	assert.Equal(t, 2, result.Altered) // two settings were changed
   445  	assert.Equal(t, 0, result.Dropped)
   446  
   447  	// Check that state was updated
   448  	assert.Equal(t, "200", md.ChangeState["configuration_hashes"]["max_connections"])
   449  	assert.Equal(t, "256MB", md.ChangeState["configuration_hashes"]["shared_buffers"])
   450  
   451  	assert.NoError(t, mock.ExpectationsWereMet())
   452  }
   453  
   454  func TestGetInstanceUpMeasurement(t *testing.T) {
   455  	ctx := context.Background()
   456  	reaper := &Reaper{}
   457  
   458  	testCases := []struct {
   459  		name            string
   460  		pingError       error
   461  		expectedUpValue int
   462  	}{
   463  		{
   464  			name:            "connection is up",
   465  			pingError:       nil,
   466  			expectedUpValue: 1,
   467  		},
   468  		{
   469  			name:            "connection is down",
   470  			pingError:       assert.AnError,
   471  			expectedUpValue: 0,
   472  		},
   473  		{
   474  			name:            "connection timeout",
   475  			pingError:       context.DeadlineExceeded,
   476  			expectedUpValue: 0,
   477  		},
   478  	}
   479  
   480  	for _, tc := range testCases {
   481  		t.Run(tc.name, func(t *testing.T) {
   482  			md, mock := createTestSourceConn(t)
   483  			defer mock.Close()
   484  
   485  			// Setup ping expectation
   486  			if tc.pingError == nil {
   487  				mock.ExpectPing()
   488  			} else {
   489  				mock.ExpectPing().WillReturnError(tc.pingError)
   490  			}
   491  
   492  			measurements, err := reaper.GetInstanceUpMeasurement(ctx, md)
   493  
   494  			// Should never return an error
   495  			assert.NoError(t, err)
   496  			require.NotNil(t, measurements)
   497  			require.Len(t, measurements, 1)
   498  
   499  			// Check instance_up metric value
   500  			measurement := measurements[0]
   501  			assert.Contains(t, measurement, "instance_up")
   502  			assert.Equal(t, tc.expectedUpValue, measurement["instance_up"])
   503  
   504  			// Verify epoch is set and valid
   505  			assert.Contains(t, measurement, metrics.EpochColumnName)
   506  			assert.Greater(t, measurement[metrics.EpochColumnName].(int64), int64(0))
   507  			assert.LessOrEqual(t, measurement[metrics.EpochColumnName].(int64), time.Now().UnixNano())
   508  
   509  			assert.NoError(t, mock.ExpectationsWereMet())
   510  		})
   511  	}
   512  }
   513