...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/reaper/logparser_test.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"context"
     5  	"os"
     6  	"path/filepath"
     7  	"regexp"
     8  	"testing"
     9  	"time"
    10  
    11  	"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
    12  	"github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
    13  	"github.com/cybertec-postgresql/pgwatch/v5/internal/testutil"
    14  	"github.com/pashagolub/pgxmock/v4"
    15  	"github.com/stretchr/testify/assert"
    16  	"github.com/stretchr/testify/require"
    17  )
    18  
    19  func TestNewLogParser(t *testing.T) {
    20  	tempDir := t.TempDir()
    21  
    22  	mock, err := pgxmock.NewPool()
    23  	require.NoError(t, err)
    24  	defer mock.Close()
    25  
    26  	sourceConn := &sources.SourceConn{
    27  		Source: sources.Source{
    28  			Name:    "test-source",
    29  			Metrics: map[string]float64{specialMetricServerLogEventCounts: 60.0},
    30  		},
    31  		Conn: mock,
    32  	}
    33  	storeCh := make(chan metrics.MeasurementEnvelope, 10)
    34  
    35  	t.Run("success", func(t *testing.T) {
    36  		mock.ExpectQuery(`select 
    37  			current_setting\('data_directory'\) as dd, 
    38  			current_setting\('log_directory'\) as ld,
    39  			current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
    40  			current_setting\('log_truncate_on_rotation'\) as log_trunc`).
    41  			WillReturnRows(pgxmock.NewRows([]string{"dd", "ld", "lc_messages", "log_trunc"}).
    42  				AddRow("", tempDir, "en", "off"))
    43  
    44  		lp, err := NewLogParser(testutil.TestContext, sourceConn, storeCh)
    45  		assert.NoError(t, err)
    46  		assert.NotNil(t, lp)
    47  		assert.Equal(t, tempDir, lp.LogFolder)
    48  		assert.Equal(t, "en", lp.ServerMessagesLang)
    49  		assert.Equal(t, "off", lp.LogTruncOnRotation)
    50  		assert.Equal(t, 60.0, lp.Interval)
    51  		assert.NotNil(t, lp.LogsMatchRegex)
    52  		assert.NoError(t, mock.ExpectationsWereMet())
    53  	})
    54  
    55  	t.Run("tryDetermineLogSettings error", func(t *testing.T) {
    56  		mock.ExpectQuery(`select 
    57  			current_setting\('data_directory'\) as dd, 
    58  			current_setting\('log_directory'\) as ld,
    59  			current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
    60  			current_setting\('log_truncate_on_rotation'\) as log_trunc`).
    61  			WillReturnError(assert.AnError)
    62  
    63  		lp, err := NewLogParser(testutil.TestContext, sourceConn, storeCh)
    64  		assert.Error(t, err)
    65  		assert.Nil(t, lp)
    66  		assert.NoError(t, mock.ExpectationsWereMet())
    67  	})
    68  
    69  	t.Run("unknown language defaults to en", func(t *testing.T) {
    70  		mock.ExpectQuery(`select 
    71  			current_setting\('data_directory'\) as dd, 
    72  			current_setting\('log_directory'\) as ld,
    73  			current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
    74  			current_setting\('log_truncate_on_rotation'\) as log_trunc`).
    75  			WillReturnRows(pgxmock.NewRows([]string{"dd", "ld", "lc_messages", "log_trunc"}).
    76  				AddRow("", tempDir, "zz", "off"))
    77  
    78  		lp, err := NewLogParser(testutil.TestContext, sourceConn, storeCh)
    79  		assert.NoError(t, err)
    80  		assert.NotNil(t, lp)
    81  		assert.Equal(t, "en", lp.ServerMessagesLang)
    82  		assert.NoError(t, mock.ExpectationsWereMet())
    83  	})
    84  
    85  	t.Run("relative log directory", func(t *testing.T) {
    86  		mock.ExpectQuery(`select 
    87  			current_setting\('data_directory'\) as dd, 
    88  			current_setting\('log_directory'\) as ld,
    89  			current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
    90  			current_setting\('log_truncate_on_rotation'\) as log_trunc`).
    91  			WillReturnRows(pgxmock.NewRows([]string{"dd", "ld", "lc_messages", "log_trunc"}).
    92  				AddRow("/data", "pg_log", "de", "on"))
    93  
    94  		lp, err := NewLogParser(testutil.TestContext, sourceConn, storeCh)
    95  		assert.NoError(t, err)
    96  		assert.NotNil(t, lp)
    97  		assert.Equal(t, "/data/pg_log", lp.LogFolder)
    98  		assert.Equal(t, "de", lp.ServerMessagesLang)
    99  		assert.Equal(t, "on", lp.LogTruncOnRotation)
   100  		assert.NoError(t, mock.ExpectationsWereMet())
   101  	})
   102  }
   103  
   104  func TestTryDetermineLogSettings(t *testing.T) {
   105  	t.Run("absolute log directory - known lang", func(t *testing.T) {
   106  		mock, err := pgxmock.NewPool()
   107  		require.NoError(t, err)
   108  		defer mock.Close()
   109  
   110  		mock.ExpectQuery(`select 
   111  			current_setting\('data_directory'\) as dd, 
   112  			current_setting\('log_directory'\) as ld,
   113  			current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
   114  			current_setting\('log_truncate_on_rotation'\) as log_trunc`).
   115  			WillReturnRows(pgxmock.NewRows([]string{"dd", "ld", "lc_messages", "log_trunc"}).
   116  				AddRow("/data", "/var/log/postgresql", "de", "off"))
   117  
   118  		logPath, lang, logTrunc, err := tryDetermineLogSettings(testutil.TestContext, mock)
   119  		assert.NoError(t, err)
   120  		assert.Equal(t, "/var/log/postgresql", logPath)
   121  		assert.Equal(t, "de", lang)
   122  		assert.Equal(t, "off", logTrunc)
   123  		assert.NoError(t, mock.ExpectationsWereMet())
   124  	})
   125  
   126  	t.Run("relative log directory - unknown lang", func(t *testing.T) {
   127  		mock, err := pgxmock.NewPool()
   128  		require.NoError(t, err)
   129  		defer mock.Close()
   130  
   131  		mock.ExpectQuery(`select 
   132  			current_setting\('data_directory'\) as dd, 
   133  			current_setting\('log_directory'\) as ld,
   134  			current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
   135  			current_setting\('log_truncate_on_rotation'\) as log_trunc`).
   136  			WillReturnRows(pgxmock.NewRows([]string{"dd", "ld", "lc_messages", "log_trunc"}).
   137  				AddRow("/data", "log", "xx", "off"))
   138  
   139  		logPath, lang, logTrunc, err := tryDetermineLogSettings(testutil.TestContext, mock)
   140  		assert.NoError(t, err)
   141  		assert.Equal(t, "/data/log", logPath)
   142  		assert.Equal(t, "en", lang)
   143  		assert.Equal(t, "off", logTrunc)
   144  		assert.NoError(t, mock.ExpectationsWereMet())
   145  	})
   146  
   147  	t.Run("query error", func(t *testing.T) {
   148  		mock, err := pgxmock.NewPool()
   149  		require.NoError(t, err)
   150  		defer mock.Close()
   151  
   152  		mock.ExpectQuery(`select 
   153  			current_setting\('data_directory'\) as dd, 
   154  			current_setting\('log_directory'\) as ld,
   155  			current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
   156  			current_setting\('log_truncate_on_rotation'\) as log_trunc`).
   157  			WillReturnError(assert.AnError)
   158  
   159  		logPath, lang, logTrunc, err := tryDetermineLogSettings(testutil.TestContext, mock)
   160  		assert.Error(t, err)
   161  		assert.Equal(t, "", logPath)
   162  		assert.Equal(t, "", lang)
   163  		assert.Equal(t, "", logTrunc)
   164  		assert.NoError(t, mock.ExpectationsWereMet())
   165  	})
   166  }
   167  
   168  func TestCheckHasPrivileges(t *testing.T) {
   169  	tempDir := t.TempDir()
   170  
   171  	names := [2]string{"pg_ls_logdir() fails", "pg_read_file() permission denied"}
   172  	for _, name := range names {
   173  		t.Run("checkHasRemotePrivileges fails - "+name, func(t *testing.T) {
   174  			mock, err := pgxmock.NewPool()
   175  			require.NoError(t, err)
   176  			defer mock.Close()
   177  
   178  			mock.ExpectQuery(`select 
   179  				current_setting\('data_directory'\) as dd, 
   180  				current_setting\('log_directory'\) as ld,
   181  				current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
   182  				current_setting\('log_truncate_on_rotation'\) as log_trunc`).
   183  				WillReturnRows(pgxmock.NewRows([]string{"dd", "ld", "lc_messages", "log_trunc"}).
   184  					AddRow("", tempDir, "en", "off"))
   185  
   186  			// Mock IsClientOnSameHost to return false (remote)
   187  			mock.ExpectQuery(`SELECT COALESCE`).WillReturnRows(
   188  				pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(false))
   189  
   190  			if name == "pg_ls_logdir() fails" {
   191  				// Mock pg_ls_logdir() to fail (permission denied)
   192  				mock.ExpectQuery(`select name from pg_ls_logdir\(\) limit 1`).
   193  					WillReturnError(assert.AnError)
   194  			} else {
   195  				// Mock pg_ls_logdir() to return a log file
   196  				mock.ExpectQuery(`select name from pg_ls_logdir\(\) limit 1`).
   197  					WillReturnRows(pgxmock.NewRows([]string{"name"}).AddRow("log.csv"))
   198  
   199  				// Mock pg_read_file() to fail with permission denied error
   200  				mock.ExpectQuery(`select pg_read_file\(\$1, 0, 0\)`).
   201  					WithArgs(filepath.Join(tempDir, "log.csv")).
   202  					WillReturnError(assert.AnError)
   203  			}
   204  
   205  			sourceConn := &sources.SourceConn{
   206  				Source: sources.Source{
   207  					Name: "test-source",
   208  				},
   209  				Conn: mock,
   210  			}
   211  
   212  			storeCh := make(chan metrics.MeasurementEnvelope, 10)
   213  
   214  			lp, err := NewLogParser(testutil.TestContext, sourceConn, storeCh)
   215  			require.NoError(t, err)
   216  			// Parse logs should stop the worker and return due to privilege errors.
   217  			err = lp.ParseLogs()
   218  			assert.Error(t, err)
   219  
   220  			// Ensure mock expectations were met
   221  			assert.NoError(t, mock.ExpectationsWereMet())
   222  
   223  			// No data should be received since checkHasPrivileges should fail
   224  			select {
   225  			case measurement := <-storeCh:
   226  				t.Errorf("Expected no data, but got: %+v", measurement)
   227  			case <-time.After(time.Second):
   228  				// Expected: no data received
   229  			}
   230  		})
   231  	}
   232  }
   233  
   234  func TestEventCountsToMetricStoreMessages(t *testing.T) {
   235  	mdb := &sources.SourceConn{
   236  		Source: sources.Source{
   237  			Name:       "test-db",
   238  			Kind:       sources.SourcePostgres,
   239  			CustomTags: map[string]string{"env": "test"},
   240  		},
   241  	}
   242  	lp := &LogParser{
   243  		SourceConn: mdb,
   244  		eventCounts: map[string]int64{
   245  			"ERROR":   5,
   246  			"WARNING": 10,
   247  		},
   248  		eventCountsTotal: map[string]int64{
   249  			"ERROR":   15,
   250  			"WARNING": 25,
   251  			"INFO":    50,
   252  		},
   253  	}
   254  	result := lp.GetMeasurementEnvelope()
   255  
   256  	assert.Equal(t, "test-db", result.DBName)
   257  	assert.Equal(t, specialMetricServerLogEventCounts, result.MetricName)
   258  	assert.Equal(t, map[string]string{"env": "test"}, result.CustomTags)
   259  
   260  	// Check that all severities are present in the measurement
   261  	assert.Len(t, result.Data, 1)
   262  	measurement := result.Data[0]
   263  
   264  	// Check individual severities
   265  	assert.Equal(t, int64(5), measurement["error"])
   266  	assert.Equal(t, int64(10), measurement["warning"])
   267  	assert.Equal(t, int64(0), measurement["info"])  // Not in eventCounts
   268  	assert.Equal(t, int64(0), measurement["debug"]) // Not in either map
   269  
   270  	// Check total counts
   271  	assert.Equal(t, int64(15), measurement["error_total"])
   272  	assert.Equal(t, int64(25), measurement["warning_total"])
   273  	assert.Equal(t, int64(50), measurement["info_total"])
   274  	assert.Equal(t, int64(0), measurement["debug_total"])
   275  }
   276  
   277  func TestSeverityToEnglish(t *testing.T) {
   278  	tests := []struct {
   279  		serverLang    string
   280  		errorSeverity string
   281  		expected      string
   282  	}{
   283  		{"en", "ERROR", "ERROR"},
   284  		{"de", "FEHLER", "ERROR"},
   285  		{"fr", "ERREUR", "ERROR"},
   286  		{"de", "WARNUNG", "WARNING"},
   287  		{"ru", "ОШИБКА", "ERROR"},
   288  		{"zh", "错误", "ERROR"},
   289  		{"unknown", "ERROR", "ERROR"},                  // Unknown language, return as-is
   290  		{"de", "UNKNOWN_SEVERITY", "UNKNOWN_SEVERITY"}, // Unknown severity in known language
   291  	}
   292  
   293  	for _, tt := range tests {
   294  		t.Run(tt.serverLang+"_"+tt.errorSeverity, func(t *testing.T) {
   295  			result := severityToEnglish(tt.serverLang, tt.errorSeverity)
   296  			assert.Equal(t, tt.expected, result)
   297  		})
   298  	}
   299  }
   300  
   301  func TestZeroEventCounts(t *testing.T) {
   302  	eventCounts := map[string]int64{
   303  		"ERROR":   5,
   304  		"WARNING": 10,
   305  		"INFO":    15,
   306  	}
   307  
   308  	zeroEventCounts(eventCounts)
   309  
   310  	// Check that all pgSeverities are zeroed
   311  	for _, severity := range pgSeverities {
   312  		assert.Equal(t, int64(0), eventCounts[severity])
   313  	}
   314  }
   315  
   316  func TestRegexMatchesToMap(t *testing.T) {
   317  	t.Run("successful match", func(t *testing.T) {
   318  		lp := &LogParser{
   319  			LogsMatchRegex: regexp.MustCompile(`(?P<severity>\w+): (?P<message>.+)`),
   320  		}
   321  		matches := []string{"ERROR: Something went wrong", "ERROR", "Something went wrong"}
   322  
   323  		result := lp.regexMatchesToMap(matches)
   324  		expected := map[string]string{
   325  			"severity": "ERROR",
   326  			"message":  "Something went wrong",
   327  		}
   328  
   329  		assert.Equal(t, expected, result)
   330  	})
   331  
   332  	t.Run("no matches", func(t *testing.T) {
   333  		lp := &LogParser{
   334  			LogsMatchRegex: regexp.MustCompile(`(?P<severity>\w+): (?P<message>.+)`),
   335  		}
   336  		matches := []string{}
   337  
   338  		result := lp.regexMatchesToMap(matches)
   339  		assert.Empty(t, result)
   340  	})
   341  
   342  	t.Run("nil regex", func(t *testing.T) {
   343  		lp := &LogParser{}
   344  		matches := []string{"test"}
   345  
   346  		result := lp.regexMatchesToMap(matches)
   347  		assert.Empty(t, result)
   348  	})
   349  }
   350  
   351  func TestCSVLogRegex(t *testing.T) {
   352  	// Test the default CSV log regex with sample log lines
   353  	lp := &LogParser{
   354  		LogsMatchRegex: regexp.MustCompile(csvLogDefaultRegEx),
   355  	}
   356  
   357  	testLines := []struct {
   358  		line     string
   359  		expected map[string]string
   360  	}{
   361  		{
   362  			line: `2023-12-01 10:30:45.123 UTC,"postgres","testdb",12345,"127.0.0.1:54321",session123,1,"SELECT",2023-12-01 10:30:00 UTC,1/234,567,ERROR,`,
   363  			expected: map[string]string{
   364  				"log_time":         "2023-12-01 10:30:45.123 UTC",
   365  				"user_name":        "postgres",
   366  				"database_name":    "testdb",
   367  				"process_id":       "12345",
   368  				"connection_from":  "127.0.0.1:54321",
   369  				"session_id":       "session123",
   370  				"session_line_num": "1",
   371  				"command_tag":      "SELECT",
   372  				"error_severity":   "ERROR",
   373  			},
   374  		},
   375  		{
   376  			line: `2023-12-01 10:30:45.123 UTC,postgres,testdb,12345,127.0.0.1:54321,session123,1,SELECT,2023-12-01 10:30:00 UTC,1/234,567,WARNING,`,
   377  			expected: map[string]string{
   378  				"log_time":         "2023-12-01 10:30:45.123 UTC",
   379  				"user_name":        "postgres",
   380  				"database_name":    "testdb",
   381  				"process_id":       "12345",
   382  				"connection_from":  "127.0.0.1:54321",
   383  				"session_id":       "session123",
   384  				"session_line_num": "1",
   385  				"command_tag":      "SELECT",
   386  				"error_severity":   "WARNING",
   387  			},
   388  		},
   389  	}
   390  
   391  	for i, tt := range testLines {
   392  		t.Run(string(rune('A'+i)), func(t *testing.T) {
   393  			matches := lp.LogsMatchRegex.FindStringSubmatch(tt.line)
   394  			assert.NotEmpty(t, matches, "regex should match the log line")
   395  
   396  			result := lp.regexMatchesToMap(matches)
   397  			for key, expected := range tt.expected {
   398  				assert.Equal(t, expected, result[key], "mismatch for key %s", key)
   399  			}
   400  		})
   401  	}
   402  }
   403  
   404  func TestLogParseLocal(t *testing.T) {
   405  	tempDir := t.TempDir()
   406  	logFile := filepath.Join(tempDir, "test.csv")
   407  
   408  	// Create a test log file with CSV format entries
   409  	logContent := `2023-12-01 10:30:45.123 UTC,"postgres","testdb",12345,"127.0.0.1:54321",session123,1,"SELECT",2023-12-01 10:30:00 UTC,1/234,567,ERROR,"duplicate key value violates unique constraint"
   410  	2023-12-01 10:30:46.124 UTC,"postgres","testdb",12345,"127.0.0.1:54321",session123,2,"SELECT",2023-12-01 10:30:00 UTC,1/234,567,WARNING,"this is a warning message"
   411  	2023-12-01 10:30:47.125 UTC,"postgres","otherdb",12346,"127.0.0.1:54322",session124,1,"INSERT",2023-12-01 10:30:00 UTC,1/235,568,ERROR,"another error message"
   412  	`
   413  
   414  	err := os.WriteFile(logFile, []byte(logContent), 0644)
   415  	require.NoError(t, err)
   416  
   417  	// Create a mock database connection
   418  	mock, err := pgxmock.NewPool()
   419  	require.NoError(t, err)
   420  	defer mock.Close()
   421  
   422  	mock.ExpectQuery(`select 
   423  		current_setting\('data_directory'\) as dd, 
   424  		current_setting\('log_directory'\) as ld,
   425  		current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
   426  		current_setting\('log_truncate_on_rotation'\) as log_trunc`).
   427  		WillReturnRows(pgxmock.NewRows([]string{"dd", "ld", "lc_messages", "log_trunc"}).
   428  			AddRow("", tempDir, "en", "off"))
   429  
   430  	mock.ExpectQuery(`SELECT COALESCE`).WillReturnRows(
   431  		pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(true))
   432  
   433  	// Create a SourceConn for testing
   434  	sourceConn := &sources.SourceConn{
   435  		Source: sources.Source{
   436  			Name: "test-source",
   437  		},
   438  		Conn: mock,
   439  	}
   440  
   441  	// Create a context with timeout to prevent test from hanging
   442  	ctx, cancel := context.WithTimeout(testutil.TestContext, 2*time.Second)
   443  	defer cancel()
   444  
   445  	// Create a channel to receive measurement envelopes
   446  	storeCh := make(chan metrics.MeasurementEnvelope, 10)
   447  
   448  	lp, err := NewLogParser(ctx, sourceConn, storeCh)
   449  	require.NoError(t, err)
   450  	err = lp.ParseLogs()
   451  	assert.NoError(t, err)
   452  
   453  	// Ensure mock expectations were met.
   454  	assert.NoError(t, mock.ExpectationsWereMet())
   455  
   456  	// Wait for measurements to be sent or timeout
   457  	var measurement metrics.MeasurementEnvelope
   458  	select {
   459  	case measurement = <-storeCh:
   460  		assert.NotEmpty(t, measurement.Data, "Measurement data should not be empty")
   461  	case <-time.After(2 * time.Second):
   462  	}
   463  
   464  	assert.Equal(t, "test-source", measurement.DBName)
   465  	assert.Equal(t, specialMetricServerLogEventCounts, measurement.MetricName)
   466  
   467  	// Verify the data contains expected fields for both local and total counts
   468  	data := measurement.Data[0]
   469  	// Check that severity counts are present
   470  	_, hasError := data["error"]
   471  	_, hasWarning := data["warning"]
   472  	assert.True(t, hasError && hasWarning, "Should have at least error and warning")
   473  }
   474  
   475  func TestGetFileWithLatestTimestamp(t *testing.T) {
   476  	// Create temporary test files
   477  	tempDir := t.TempDir()
   478  
   479  	t.Run("single file", func(t *testing.T) {
   480  		file1 := filepath.Join(tempDir, "test1.log")
   481  		err := os.WriteFile(file1, []byte("test"), 0644)
   482  		require.NoError(t, err)
   483  
   484  		latest, err := getFileWithLatestTimestamp([]string{file1})
   485  		assert.NoError(t, err)
   486  		assert.Equal(t, file1, latest)
   487  	})
   488  
   489  	t.Run("multiple files with different timestamps", func(t *testing.T) {
   490  		file1 := filepath.Join(tempDir, "old.log")
   491  		file2 := filepath.Join(tempDir, "new.log")
   492  
   493  		// Create first file
   494  		err := os.WriteFile(file1, []byte("old"), 0644)
   495  		require.NoError(t, err)
   496  
   497  		// Wait to ensure different timestamps
   498  		time.Sleep(10 * time.Millisecond)
   499  
   500  		// Create second file (newer)
   501  		err = os.WriteFile(file2, []byte("new"), 0644)
   502  		require.NoError(t, err)
   503  
   504  		latest, err := getFileWithLatestTimestamp([]string{file1, file2})
   505  		assert.NoError(t, err)
   506  		assert.Equal(t, file2, latest)
   507  	})
   508  
   509  	t.Run("empty file list", func(t *testing.T) {
   510  		latest, err := getFileWithLatestTimestamp([]string{})
   511  		assert.NoError(t, err)
   512  		assert.Equal(t, "", latest)
   513  	})
   514  
   515  	t.Run("non-existent file", func(t *testing.T) {
   516  		nonExistent := filepath.Join(tempDir, "nonexistent.log")
   517  		latest, err := getFileWithLatestTimestamp([]string{nonExistent})
   518  		assert.Error(t, err)
   519  		assert.Equal(t, "", latest)
   520  	})
   521  }
   522  
   523  func TestGetFileWithNextModTimestamp(t *testing.T) {
   524  	tempDir := t.TempDir()
   525  
   526  	t.Run("finds next file", func(t *testing.T) {
   527  		file1 := filepath.Join(tempDir, "first.log")
   528  		file2 := filepath.Join(tempDir, "second.log")
   529  		file3 := filepath.Join(tempDir, "third.log")
   530  
   531  		// Create files with increasing timestamps
   532  		err := os.WriteFile(file1, []byte("first"), 0644)
   533  		require.NoError(t, err)
   534  
   535  		time.Sleep(10 * time.Millisecond)
   536  		err = os.WriteFile(file2, []byte("second"), 0644)
   537  		require.NoError(t, err)
   538  
   539  		time.Sleep(10 * time.Millisecond)
   540  		err = os.WriteFile(file3, []byte("third"), 0644)
   541  		require.NoError(t, err)
   542  
   543  		globPattern := filepath.Join(tempDir, "*.log")
   544  		next, err := getFileWithNextModTimestamp(globPattern, file1)
   545  		assert.NoError(t, err)
   546  		assert.Equal(t, file2, next)
   547  	})
   548  
   549  	t.Run("no next file", func(t *testing.T) {
   550  		file1 := filepath.Join(tempDir, "only.log")
   551  		err := os.WriteFile(file1, []byte("only"), 0644)
   552  		require.NoError(t, err)
   553  
   554  		globPattern := filepath.Join(tempDir, "*.log")
   555  		next, err := getFileWithNextModTimestamp(globPattern, file1)
   556  		assert.NoError(t, err)
   557  		assert.Equal(t, "", next)
   558  	})
   559  
   560  	t.Run("invalid glob pattern", func(t *testing.T) {
   561  		invalidGlob := "["
   562  		file1 := filepath.Join(tempDir, "test.log")
   563  		next, err := getFileWithNextModTimestamp(invalidGlob, file1)
   564  		assert.Error(t, err)
   565  		assert.Equal(t, "", next)
   566  	})
   567  }
   568  
   569  func TestLogParseRemote(t *testing.T) {
   570  	const (
   571  		testTimeout       = 3 * time.Second
   572  		channelBufferSize = 10
   573  		logFileName       = "postgresql.csv"
   574  		testDbName        = "testdb"
   575  	)
   576  
   577  	// Sample log content with 3 entries: 2 ERRORs in different DBs, 1 WARNING
   578  	logContent := `2023-12-01 10:30:45.123 UTC,"postgres","testdb",12345,"127.0.0.1:54321",session123,1,"SELECT",2023-12-01 10:30:00 UTC,1/234,567,ERROR,"duplicate key value violates unique constraint"
   579  2023-12-01 10:30:46.124 UTC,"postgres","testdb",12345,"127.0.0.1:54321",session123,2,"SELECT",2023-12-01 10:30:00 UTC,1/234,567,WARNING,"this is a warning message"
   580  2023-12-01 10:30:47.125 UTC,"postgres","otherdb",12346,"127.0.0.1:54322",session124,1,"INSERT",2023-12-01 10:30:00 UTC,1/235,568,ERROR,"another error message"
   581  `
   582  
   583  	t.Run("success - parses CSV logs with correct counts", func(t *testing.T) {
   584  		tempDir := t.TempDir()
   585  
   586  		mock, err := pgxmock.NewPool()
   587  		require.NoError(t, err)
   588  		defer mock.Close()
   589  
   590  		mock.ExpectQuery(`select 
   591  			current_setting\('data_directory'\) as dd, 
   592  			current_setting\('log_directory'\) as ld,
   593  			current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
   594  			current_setting\('log_truncate_on_rotation'\) as log_trunc`).
   595  			WillReturnRows(pgxmock.NewRows([]string{"dd", "ld", "lc_messages", "log_trunc"}).
   596  				AddRow("", tempDir, "en", "off"))
   597  
   598  		// Phase 2: Mode detection - returns false to trigger remote mode
   599  		mock.ExpectQuery(`SELECT COALESCE`).
   600  			WillReturnRows(pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(false))
   601  
   602  		// Phase 3: Privilege check - verifies pg_ls_logdir() and pg_read_file() permissions
   603  		mock.ExpectQuery(`select name from pg_ls_logdir\(\) limit 1`).
   604  			WillReturnRows(pgxmock.NewRows([]string{"name"}).AddRow(logFileName))
   605  		mock.ExpectQuery(`select pg_read_file\(\$1, 0, 0\)`).
   606  			WithArgs(filepath.Join(tempDir, logFileName)).
   607  			WillReturnRows(pgxmock.NewRows([]string{"pg_read_file"}).AddRow("")) // 0 bytes read = permission test
   608  
   609  		// Phase 4: Log file discovery - finds the most recent CSV log file with existing content
   610  		// Note: parseLogsRemote sets offset = size on first run, so it starts at EOF and only reads new data
   611  		mock.ExpectQuery(`select name, size, modification from pg_ls_logdir\(\) where name like '%csv' order by modification desc limit 1;`).
   612  			WillReturnRows(pgxmock.NewRows([]string{"name", "size", "modification"}).
   613  				AddRow(logFileName, int32(len(logContent)), time.Now()))
   614  
   615  		sourceConn := &sources.SourceConn{
   616  			Source: sources.Source{
   617  				Name:    "test-source",
   618  				Metrics: map[string]float64{specialMetricServerLogEventCounts: 60}, // 60s interval - won't trigger during test
   619  			},
   620  			Conn: mock,
   621  		}
   622  		sourceConn.RealDbname = testDbName
   623  
   624  		ctx, cancel := context.WithTimeout(testutil.TestContext, 500*time.Millisecond)
   625  		defer cancel()
   626  
   627  		storeCh := make(chan metrics.MeasurementEnvelope, channelBufferSize)
   628  
   629  		lp, err := NewLogParser(ctx, sourceConn, storeCh)
   630  		require.NoError(t, err)
   631  
   632  		// Run ParseLogs in a goroutine since it runs infinitely until context cancels
   633  		go func() {
   634  			_ = lp.ParseLogs()
   635  		}()
   636  
   637  		// Wait for context to timeout
   638  		// Note: parseLogsRemote starts reading from EOF, so existing log content isn't parsed
   639  		// This test verifies the initialization and setup flow
   640  		<-ctx.Done()
   641  		time.Sleep(100 * time.Millisecond)
   642  
   643  		// Verify mock expectations were met (privilege check + file discovery)
   644  		assert.NoError(t, mock.ExpectationsWereMet(), "All mock expectations should be met")
   645  
   646  		cancel()
   647  	})
   648  
   649  	t.Run("handles empty log directory gracefully", func(t *testing.T) {
   650  		tempDir := t.TempDir()
   651  		mock, err := pgxmock.NewPool()
   652  		require.NoError(t, err)
   653  		defer mock.Close()
   654  
   655  		// Setup mocks for initialization
   656  		mock.ExpectQuery(`select 
   657  			current_setting\('data_directory'\) as dd, 
   658  			current_setting\('log_directory'\) as ld,
   659  			current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
   660  			current_setting\('log_truncate_on_rotation'\) as log_trunc`).
   661  			WillReturnRows(pgxmock.NewRows([]string{"dd", "ld", "lc_messages", "log_trunc"}).
   662  				AddRow("", tempDir, "en", "off"))
   663  		mock.ExpectQuery(`SELECT COALESCE`).
   664  			WillReturnRows(pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(false))
   665  
   666  		// Privilege check passes
   667  		mock.ExpectQuery(`select name from pg_ls_logdir\(\) limit 1`).
   668  			WillReturnRows(pgxmock.NewRows([]string{"name"}).AddRow(logFileName))
   669  		mock.ExpectQuery(`select pg_read_file\(\$1, 0, 0\)`).
   670  			WithArgs(filepath.Join(tempDir, logFileName)).
   671  			WillReturnRows(pgxmock.NewRows([]string{"pg_read_file"}).AddRow(""))
   672  
   673  		// No CSV files found initially - parseLogsRemote will keep retrying
   674  		mock.ExpectQuery(`select name, size, modification from pg_ls_logdir\(\) where name like '%csv' order by modification desc limit 1;`).
   675  			WillReturnError(assert.AnError)
   676  		// Expect it to retry
   677  		mock.ExpectQuery(`select name, size, modification from pg_ls_logdir\(\) where name like '%csv' order by modification desc limit 1;`).
   678  			WillReturnError(assert.AnError)
   679  
   680  		sourceConn := &sources.SourceConn{
   681  			Source: sources.Source{
   682  				Name:    "test-source",
   683  				Metrics: map[string]float64{specialMetricServerLogEventCounts: 0.1},
   684  			},
   685  			Conn: mock,
   686  		}
   687  
   688  		ctx, cancel := context.WithTimeout(testutil.TestContext, 500*time.Millisecond)
   689  		defer cancel()
   690  
   691  		storeCh := make(chan metrics.MeasurementEnvelope, channelBufferSize)
   692  
   693  		lp, err := NewLogParser(ctx, sourceConn, storeCh)
   694  		require.NoError(t, err)
   695  
   696  		// Run in goroutine since it runs infinitely until context cancels
   697  		go func() {
   698  			_ = lp.ParseLogs()
   699  		}()
   700  
   701  		// Wait for context to timeout
   702  		<-ctx.Done()
   703  		time.Sleep(100 * time.Millisecond)
   704  
   705  		// No measurements should be received since no files were found
   706  		select {
   707  		case m := <-storeCh:
   708  			t.Errorf("Expected no measurements, but received: %+v", m)
   709  		default:
   710  			// Expected: no measurements
   711  		}
   712  	})
   713  
   714  	t.Run("malformed CSV entries are skipped gracefully", func(t *testing.T) {
   715  		tempDir := t.TempDir()
   716  		// Mix of valid and malformed log entries
   717  		malformedContent := `2023-12-01 10:30:45.123 UTC,"postgres","testdb",12345,"127.0.0.1:54321",session123,1,"SELECT",2023-12-01 10:30:00 UTC,1/234,567,ERROR,"valid entry"
   718  this is not a valid CSV line at all
   719  incomplete line without proper fields
   720  2023-12-01 10:30:47.125 UTC,"postgres","testdb",12346,"127.0.0.1:54322",session124,1,"INSERT",2023-12-01 10:30:00 UTC,1/235,568,WARNING,"another valid entry"
   721  `
   722  
   723  		mock, err := pgxmock.NewPool()
   724  		require.NoError(t, err)
   725  		defer mock.Close()
   726  
   727  		// Setup all required mocks
   728  		mock.ExpectQuery(`select 
   729  			current_setting\('data_directory'\) as dd, 
   730  			current_setting\('log_directory'\) as ld,
   731  			current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
   732  			current_setting\('log_truncate_on_rotation'\) as log_trunc`).
   733  			WillReturnRows(pgxmock.NewRows([]string{"dd", "ld", "lc_messages", "log_trunc"}).
   734  				AddRow("", tempDir, "en", "off"))
   735  		mock.ExpectQuery(`SELECT COALESCE`).
   736  			WillReturnRows(pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(false))
   737  		mock.ExpectQuery(`select name from pg_ls_logdir\(\) limit 1`).
   738  			WillReturnRows(pgxmock.NewRows([]string{"name"}).AddRow(logFileName))
   739  		mock.ExpectQuery(`select pg_read_file\(\$1, 0, 0\)`).
   740  			WithArgs(filepath.Join(tempDir, logFileName)).
   741  			WillReturnRows(pgxmock.NewRows([]string{"pg_read_file"}).AddRow(""))
   742  
   743  		// Start at EOF (existing content won't be parsed initially)
   744  		mock.ExpectQuery(`select name, size, modification from pg_ls_logdir\(\) where name like '%csv' order by modification desc limit 1;`).
   745  			WillReturnRows(pgxmock.NewRows([]string{"name", "size", "modification"}).
   746  				AddRow(logFileName, int32(len(malformedContent)), time.Now()))
   747  
   748  		sourceConn := &sources.SourceConn{
   749  			Source: sources.Source{
   750  				Name:    "test-source",
   751  				Metrics: map[string]float64{specialMetricServerLogEventCounts: 60}, // Long interval
   752  			},
   753  			Conn: mock,
   754  		}
   755  		sourceConn.RealDbname = testDbName
   756  
   757  		ctx, cancel := context.WithTimeout(testutil.TestContext, 500*time.Millisecond)
   758  		defer cancel()
   759  
   760  		storeCh := make(chan metrics.MeasurementEnvelope, channelBufferSize)
   761  
   762  		lp, err := NewLogParser(ctx, sourceConn, storeCh)
   763  		require.NoError(t, err)
   764  
   765  		// Run in goroutine
   766  		go func() {
   767  			_ = lp.ParseLogs()
   768  		}()
   769  
   770  		// Wait for context to finish
   771  		<-ctx.Done()
   772  		time.Sleep(100 * time.Millisecond)
   773  
   774  		// This test verifies the parser doesn't crash on malformed entries
   775  		// Since we start at EOF and use a long interval, no parsing happens during the test
   776  		// The real test is that initialization succeeds without errors
   777  		assert.NoError(t, mock.ExpectationsWereMet())
   778  
   779  		cancel()
   780  	})
   781  
   782  	t.Run("file read permission denied during parse", func(t *testing.T) {
   783  		tempDir := t.TempDir()
   784  
   785  		mock, err := pgxmock.NewPool()
   786  		require.NoError(t, err)
   787  		defer mock.Close()
   788  
   789  		// Setup mocks - privilege check passes initially
   790  		mock.ExpectQuery(`select 
   791  			current_setting\('data_directory'\) as dd, 
   792  			current_setting\('log_directory'\) as ld,
   793  			current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
   794  			current_setting\('log_truncate_on_rotation'\) as log_trunc`).
   795  			WillReturnRows(pgxmock.NewRows([]string{"dd", "ld", "lc_messages", "log_trunc"}).
   796  				AddRow("", tempDir, "en", "off"))
   797  		mock.ExpectQuery(`SELECT COALESCE`).
   798  			WillReturnRows(pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(false))
   799  		mock.ExpectQuery(`select name from pg_ls_logdir\(\) limit 1`).
   800  			WillReturnRows(pgxmock.NewRows([]string{"name"}).AddRow(logFileName))
   801  		mock.ExpectQuery(`select pg_read_file\(\$1, 0, 0\)`).
   802  			WithArgs(filepath.Join(tempDir, logFileName)).
   803  			WillReturnRows(pgxmock.NewRows([]string{"pg_read_file"}).AddRow(""))
   804  
   805  		// File discovery succeeds
   806  		mock.ExpectQuery(`select name, size, modification from pg_ls_logdir\(\) where name like '%csv' order by modification desc limit 1;`).
   807  			WillReturnRows(pgxmock.NewRows([]string{"name", "size", "modification"}).
   808  				AddRow(logFileName, int32(0), time.Now()))
   809  
   810  		// File state shows it has grown
   811  		mock.ExpectQuery(`select size, modification from pg_ls_logdir\(\) where name = \$1;`).
   812  			WithArgs(logFileName).
   813  			WillReturnRows(pgxmock.NewRows([]string{"size", "modification"}).
   814  				AddRow(int32(len(logContent)), time.Now()))
   815  
   816  		// But pg_read_file fails with permission error during actual read
   817  		mock.ExpectQuery(`select pg_read_file\(\$1, \$2, \$3\)`).
   818  			WithArgs(filepath.Join(tempDir, logFileName), int32(0), int32(len(logContent))).
   819  			WillReturnError(assert.AnError)
   820  
   821  		sourceConn := &sources.SourceConn{
   822  			Source: sources.Source{
   823  				Name:    "test-source",
   824  				Metrics: map[string]float64{specialMetricServerLogEventCounts: 0.1},
   825  			},
   826  			Conn: mock,
   827  		}
   828  
   829  		ctx, cancel := context.WithTimeout(testutil.TestContext, 500*time.Millisecond)
   830  		defer cancel()
   831  
   832  		storeCh := make(chan metrics.MeasurementEnvelope, channelBufferSize)
   833  
   834  		lp, err := NewLogParser(ctx, sourceConn, storeCh)
   835  		require.NoError(t, err)
   836  
   837  		// Run in goroutine
   838  		go func() {
   839  			_ = lp.ParseLogs() // It will log a warning and continue retrying
   840  		}()
   841  
   842  		// Wait for context to finish
   843  		<-ctx.Done()
   844  		time.Sleep(100 * time.Millisecond)
   845  
   846  		// No measurements should be sent since read failed
   847  		select {
   848  		case m := <-storeCh:
   849  			// The parser might send an empty measurement before the error
   850  			// Verify it's zeroed
   851  			data := m.Data[0]
   852  			assert.Equal(t, int64(0), data["error"], "Should have 0 errors since read failed")
   853  			assert.Equal(t, int64(0), data["warning"], "Should have 0 warnings since read failed")
   854  		default:
   855  			// Also acceptable: no measurement sent at all
   856  		}
   857  	})
   858  }
   859