...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/metrics/logparse_test.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/metrics

     1  package metrics
     2  
     3  import (
     4  	"context"
     5  	"os"
     6  	"path/filepath"
     7  	"regexp"
     8  	"testing"
     9  	"time"
    10  
    11  	"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
    12  	"github.com/pashagolub/pgxmock/v4"
    13  	"github.com/stretchr/testify/assert"
    14  	"github.com/stretchr/testify/require"
    15  )
    16  
// testCtx is the background context handed to the functions under test in this
// file; TestLogParse derives its timeout context from it.
var testCtx = context.Background()
    18  
    19  func TestGetFileWithLatestTimestamp(t *testing.T) {
    20  	// Create temporary test files
    21  	tempDir := t.TempDir()
    22  
    23  	t.Run("single file", func(t *testing.T) {
    24  		file1 := filepath.Join(tempDir, "test1.log")
    25  		err := os.WriteFile(file1, []byte("test"), 0644)
    26  		require.NoError(t, err)
    27  
    28  		latest, err := getFileWithLatestTimestamp([]string{file1})
    29  		assert.NoError(t, err)
    30  		assert.Equal(t, file1, latest)
    31  	})
    32  
    33  	t.Run("multiple files with different timestamps", func(t *testing.T) {
    34  		file1 := filepath.Join(tempDir, "old.log")
    35  		file2 := filepath.Join(tempDir, "new.log")
    36  
    37  		// Create first file
    38  		err := os.WriteFile(file1, []byte("old"), 0644)
    39  		require.NoError(t, err)
    40  
    41  		// Wait to ensure different timestamps
    42  		time.Sleep(10 * time.Millisecond)
    43  
    44  		// Create second file (newer)
    45  		err = os.WriteFile(file2, []byte("new"), 0644)
    46  		require.NoError(t, err)
    47  
    48  		latest, err := getFileWithLatestTimestamp([]string{file1, file2})
    49  		assert.NoError(t, err)
    50  		assert.Equal(t, file2, latest)
    51  	})
    52  
    53  	t.Run("empty file list", func(t *testing.T) {
    54  		latest, err := getFileWithLatestTimestamp([]string{})
    55  		assert.NoError(t, err)
    56  		assert.Equal(t, "", latest)
    57  	})
    58  
    59  	t.Run("non-existent file", func(t *testing.T) {
    60  		nonExistent := filepath.Join(tempDir, "nonexistent.log")
    61  		latest, err := getFileWithLatestTimestamp([]string{nonExistent})
    62  		assert.Error(t, err)
    63  		assert.Equal(t, "", latest)
    64  	})
    65  }
    66  
    67  func TestGetFileWithNextModTimestamp(t *testing.T) {
    68  	tempDir := t.TempDir()
    69  
    70  	t.Run("finds next file", func(t *testing.T) {
    71  		file1 := filepath.Join(tempDir, "first.log")
    72  		file2 := filepath.Join(tempDir, "second.log")
    73  		file3 := filepath.Join(tempDir, "third.log")
    74  
    75  		// Create files with increasing timestamps
    76  		err := os.WriteFile(file1, []byte("first"), 0644)
    77  		require.NoError(t, err)
    78  
    79  		time.Sleep(10 * time.Millisecond)
    80  		err = os.WriteFile(file2, []byte("second"), 0644)
    81  		require.NoError(t, err)
    82  
    83  		time.Sleep(10 * time.Millisecond)
    84  		err = os.WriteFile(file3, []byte("third"), 0644)
    85  		require.NoError(t, err)
    86  
    87  		globPattern := filepath.Join(tempDir, "*.log")
    88  		next, err := getFileWithNextModTimestamp(globPattern, file1)
    89  		assert.NoError(t, err)
    90  		assert.Equal(t, file2, next)
    91  	})
    92  
    93  	t.Run("no next file", func(t *testing.T) {
    94  		file1 := filepath.Join(tempDir, "only.log")
    95  		err := os.WriteFile(file1, []byte("only"), 0644)
    96  		require.NoError(t, err)
    97  
    98  		globPattern := filepath.Join(tempDir, "*.log")
    99  		next, err := getFileWithNextModTimestamp(globPattern, file1)
   100  		assert.NoError(t, err)
   101  		assert.Equal(t, "", next)
   102  	})
   103  
   104  	t.Run("invalid glob pattern", func(t *testing.T) {
   105  		invalidGlob := "["
   106  		file1 := filepath.Join(tempDir, "test.log")
   107  		next, err := getFileWithNextModTimestamp(invalidGlob, file1)
   108  		assert.Error(t, err)
   109  		assert.Equal(t, "", next)
   110  	})
   111  }
   112  
   113  func TestEventCountsToMetricStoreMessages(t *testing.T) {
   114  	mdb := &sources.SourceConn{
   115  		Source: sources.Source{
   116  			Name:       "test-db",
   117  			Kind:       sources.SourcePostgres,
   118  			CustomTags: map[string]string{"env": "test"},
   119  		},
   120  	}
   121  
   122  	eventCounts := map[string]int64{
   123  		"ERROR":   5,
   124  		"WARNING": 10,
   125  	}
   126  
   127  	eventCountsTotal := map[string]int64{
   128  		"ERROR":   15,
   129  		"WARNING": 25,
   130  		"INFO":    50,
   131  	}
   132  
   133  	result := eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal, mdb)
   134  
   135  	assert.Equal(t, "test-db", result.DBName)
   136  	assert.Equal(t, specialMetricServerLogEventCounts, result.MetricName)
   137  	assert.Equal(t, map[string]string{"env": "test"}, result.CustomTags)
   138  
   139  	// Check that all severities are present in the measurement
   140  	assert.Len(t, result.Data, 1)
   141  	measurement := result.Data[0]
   142  
   143  	// Check individual severities
   144  	assert.Equal(t, int64(5), measurement["error"])
   145  	assert.Equal(t, int64(10), measurement["warning"])
   146  	assert.Equal(t, int64(0), measurement["info"])  // Not in eventCounts
   147  	assert.Equal(t, int64(0), measurement["debug"]) // Not in either map
   148  
   149  	// Check total counts
   150  	assert.Equal(t, int64(15), measurement["error_total"])
   151  	assert.Equal(t, int64(25), measurement["warning_total"])
   152  	assert.Equal(t, int64(50), measurement["info_total"])
   153  	assert.Equal(t, int64(0), measurement["debug_total"])
   154  }
   155  
   156  func TestSeverityToEnglish(t *testing.T) {
   157  	tests := []struct {
   158  		serverLang    string
   159  		errorSeverity string
   160  		expected      string
   161  	}{
   162  		{"en", "ERROR", "ERROR"},
   163  		{"de", "FEHLER", "ERROR"},
   164  		{"fr", "ERREUR", "ERROR"},
   165  		{"de", "WARNUNG", "WARNING"},
   166  		{"ru", "ОШИБКА", "ERROR"},
   167  		{"zh", "错误", "ERROR"},
   168  		{"unknown", "ERROR", "ERROR"},                  // Unknown language, return as-is
   169  		{"de", "UNKNOWN_SEVERITY", "UNKNOWN_SEVERITY"}, // Unknown severity in known language
   170  	}
   171  
   172  	for _, tt := range tests {
   173  		t.Run(tt.serverLang+"_"+tt.errorSeverity, func(t *testing.T) {
   174  			result := severityToEnglish(tt.serverLang, tt.errorSeverity)
   175  			assert.Equal(t, tt.expected, result)
   176  		})
   177  	}
   178  }
   179  
   180  func TestZeroEventCounts(t *testing.T) {
   181  	eventCounts := map[string]int64{
   182  		"ERROR":   5,
   183  		"WARNING": 10,
   184  		"INFO":    15,
   185  	}
   186  
   187  	zeroEventCounts(eventCounts)
   188  
   189  	// Check that all PgSeverities are zeroed
   190  	for _, severity := range PgSeverities {
   191  		assert.Equal(t, int64(0), eventCounts[severity])
   192  	}
   193  }
   194  
   195  func TestTryDetermineLogFolder(t *testing.T) {
   196  	t.Run("absolute log directory", func(t *testing.T) {
   197  		mock, err := pgxmock.NewPool()
   198  		require.NoError(t, err)
   199  		defer mock.Close()
   200  
   201  		mock.ExpectQuery(`select current_setting\('data_directory'\) as dd, current_setting\('log_directory'\) as ld`).
   202  			WillReturnRows(pgxmock.NewRows([]string{"dd", "ld"}).
   203  				AddRow("/data", "/var/log/postgresql"))
   204  
   205  		logPath, err := tryDetermineLogFolder(testCtx, mock)
   206  		assert.NoError(t, err)
   207  		assert.Equal(t, "/var/log/postgresql/*.csv", logPath)
   208  		assert.NoError(t, mock.ExpectationsWereMet())
   209  	})
   210  
   211  	t.Run("relative log directory", func(t *testing.T) {
   212  		mock, err := pgxmock.NewPool()
   213  		require.NoError(t, err)
   214  		defer mock.Close()
   215  
   216  		mock.ExpectQuery(`select current_setting\('data_directory'\) as dd, current_setting\('log_directory'\) as ld`).
   217  			WillReturnRows(pgxmock.NewRows([]string{"dd", "ld"}).
   218  				AddRow("/data", "log"))
   219  
   220  		logPath, err := tryDetermineLogFolder(testCtx, mock)
   221  		assert.NoError(t, err)
   222  		assert.Equal(t, "/data/log/*.csv", logPath)
   223  		assert.NoError(t, mock.ExpectationsWereMet())
   224  	})
   225  
   226  	t.Run("query error", func(t *testing.T) {
   227  		mock, err := pgxmock.NewPool()
   228  		require.NoError(t, err)
   229  		defer mock.Close()
   230  
   231  		mock.ExpectQuery(`select current_setting\('data_directory'\) as dd, current_setting\('log_directory'\) as ld`).
   232  			WillReturnError(assert.AnError)
   233  
   234  		logPath, err := tryDetermineLogFolder(testCtx, mock)
   235  		assert.Error(t, err)
   236  		assert.Equal(t, "", logPath)
   237  		assert.NoError(t, mock.ExpectationsWereMet())
   238  	})
   239  }
   240  
   241  func TestTryDetermineLogMessagesLanguage(t *testing.T) {
   242  	t.Run("known language", func(t *testing.T) {
   243  		mock, err := pgxmock.NewPool()
   244  		require.NoError(t, err)
   245  		defer mock.Close()
   246  
   247  		mock.ExpectQuery(`select current_setting\('lc_messages'\)::varchar\(2\) as lc_messages;`).
   248  			WillReturnRows(pgxmock.NewRows([]string{"lc_messages"}).AddRow("de"))
   249  
   250  		lang, err := tryDetermineLogMessagesLanguage(testCtx, mock)
   251  		assert.NoError(t, err)
   252  		assert.Equal(t, "de", lang)
   253  		assert.NoError(t, mock.ExpectationsWereMet())
   254  	})
   255  
   256  	t.Run("unknown language defaults to en", func(t *testing.T) {
   257  		mock, err := pgxmock.NewPool()
   258  		require.NoError(t, err)
   259  		defer mock.Close()
   260  
   261  		mock.ExpectQuery(`select current_setting\('lc_messages'\)::varchar\(2\) as lc_messages;`).
   262  			WillReturnRows(pgxmock.NewRows([]string{"lc_messages"}).AddRow("xx"))
   263  
   264  		lang, err := tryDetermineLogMessagesLanguage(testCtx, mock)
   265  		assert.NoError(t, err)
   266  		assert.Equal(t, "en", lang)
   267  		assert.NoError(t, mock.ExpectationsWereMet())
   268  	})
   269  
   270  	t.Run("query error", func(t *testing.T) {
   271  		mock, err := pgxmock.NewPool()
   272  		require.NoError(t, err)
   273  		defer mock.Close()
   274  
   275  		mock.ExpectQuery(`select current_setting\('lc_messages'\)::varchar\(2\) as lc_messages;`).
   276  			WillReturnError(assert.AnError)
   277  
   278  		lang, err := tryDetermineLogMessagesLanguage(testCtx, mock)
   279  		assert.Error(t, err)
   280  		assert.Equal(t, "", lang)
   281  		assert.NoError(t, mock.ExpectationsWereMet())
   282  	})
   283  }
   284  
   285  func TestRegexMatchesToMap(t *testing.T) {
   286  	t.Run("successful match", func(t *testing.T) {
   287  		regex := regexp.MustCompile(`(?P<severity>\w+): (?P<message>.+)`)
   288  		matches := []string{"ERROR: Something went wrong", "ERROR", "Something went wrong"}
   289  
   290  		result := regexMatchesToMap(regex, matches)
   291  		expected := map[string]string{
   292  			"severity": "ERROR",
   293  			"message":  "Something went wrong",
   294  		}
   295  
   296  		assert.Equal(t, expected, result)
   297  	})
   298  
   299  	t.Run("no matches", func(t *testing.T) {
   300  		regex := regexp.MustCompile(`(?P<severity>\w+): (?P<message>.+)`)
   301  		matches := []string{}
   302  
   303  		result := regexMatchesToMap(regex, matches)
   304  		assert.Empty(t, result)
   305  	})
   306  
   307  	t.Run("nil regex", func(t *testing.T) {
   308  		matches := []string{"test"}
   309  
   310  		result := regexMatchesToMap(nil, matches)
   311  		assert.Empty(t, result)
   312  	})
   313  }
   314  
   315  func TestCSVLogRegex(t *testing.T) {
   316  	// Test the default CSV log regex with sample log lines
   317  	regex, err := regexp.Compile(CSVLogDefaultRegEx)
   318  	require.NoError(t, err)
   319  
   320  	testLines := []struct {
   321  		line     string
   322  		expected map[string]string
   323  	}{
   324  		{
   325  			line: `2023-12-01 10:30:45.123 UTC,"postgres","testdb",12345,"127.0.0.1:54321",session123,1,"SELECT",2023-12-01 10:30:00 UTC,1/234,567,ERROR,`,
   326  			expected: map[string]string{
   327  				"log_time":         "2023-12-01 10:30:45.123 UTC",
   328  				"user_name":        "postgres",
   329  				"database_name":    "testdb",
   330  				"process_id":       "12345",
   331  				"connection_from":  "127.0.0.1:54321",
   332  				"session_id":       "session123",
   333  				"session_line_num": "1",
   334  				"command_tag":      "SELECT",
   335  				"error_severity":   "ERROR",
   336  			},
   337  		},
   338  		{
   339  			line: `2023-12-01 10:30:45.123 UTC,postgres,testdb,12345,127.0.0.1:54321,session123,1,SELECT,2023-12-01 10:30:00 UTC,1/234,567,WARNING,`,
   340  			expected: map[string]string{
   341  				"log_time":         "2023-12-01 10:30:45.123 UTC",
   342  				"user_name":        "postgres",
   343  				"database_name":    "testdb",
   344  				"process_id":       "12345",
   345  				"connection_from":  "127.0.0.1:54321",
   346  				"session_id":       "session123",
   347  				"session_line_num": "1",
   348  				"command_tag":      "SELECT",
   349  				"error_severity":   "WARNING",
   350  			},
   351  		},
   352  	}
   353  
   354  	for i, tt := range testLines {
   355  		t.Run(string(rune('A'+i)), func(t *testing.T) {
   356  			matches := regex.FindStringSubmatch(tt.line)
   357  			assert.NotEmpty(t, matches, "regex should match the log line")
   358  
   359  			result := regexMatchesToMap(regex, matches)
   360  			for key, expected := range tt.expected {
   361  				assert.Equal(t, expected, result[key], "mismatch for key %s", key)
   362  			}
   363  		})
   364  	}
   365  }
   366  
   367  // Integration test that creates actual log files and tests the ParseLogs function
   368  func TestLogParse(t *testing.T) {
   369  	tempDir := t.TempDir()
   370  	logFile := filepath.Join(tempDir, "test.csv")
   371  
   372  	// Create a test log file with CSV format entries
   373  	logContent := `2023-12-01 10:30:45.123 UTC,"postgres","testdb",12345,"127.0.0.1:54321",session123,1,"SELECT",2023-12-01 10:30:00 UTC,1/234,567,ERROR,"duplicate key value violates unique constraint"
   374  2023-12-01 10:30:46.124 UTC,"postgres","testdb",12345,"127.0.0.1:54321",session123,2,"SELECT",2023-12-01 10:30:00 UTC,1/234,567,WARNING,"this is a warning message"
   375  2023-12-01 10:30:47.125 UTC,"postgres","otherdb",12346,"127.0.0.1:54322",session124,1,"INSERT",2023-12-01 10:30:00 UTC,1/235,568,ERROR,"another error message"
   376  `
   377  
   378  	err := os.WriteFile(logFile, []byte(logContent), 0644)
   379  	require.NoError(t, err)
   380  
   381  	// Create a mock database connection
   382  	mock, err := pgxmock.NewPool()
   383  	require.NoError(t, err)
   384  	defer mock.Close()
   385  
   386  	// pretend we're connected via UNIX socket
   387  	mock.ExpectQuery(`SELECT COALESCE`).WillReturnRows(
   388  		pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(true))
   389  	// Mock the language detection query
   390  	mock.ExpectQuery(`select current_setting\('lc_messages'\)::varchar\(2\) as lc_messages;`).
   391  		WillReturnRows(pgxmock.NewRows([]string{"lc_messages"}).AddRow("en"))
   392  
   393  	// Create a SourceConn for testing
   394  	sourceConn := &sources.SourceConn{
   395  		Source: sources.Source{
   396  			Name: "test-source",
   397  			HostConfig: sources.HostConfigAttrs{
   398  				LogsGlobPath: filepath.Join(tempDir, "*.csv"),
   399  				// Use default regex by leaving LogsMatchRegex empty
   400  			},
   401  		},
   402  		Conn: mock,
   403  	}
   404  
   405  	// Create a context with timeout to prevent test from hanging
   406  	ctx, cancel := context.WithTimeout(testCtx, 3*time.Second)
   407  	defer cancel()
   408  
   409  	// Create a channel to receive measurement envelopes
   410  	storeCh := make(chan MeasurementEnvelope, 10)
   411  
   412  	// Use a short interval for testing (0.5 seconds)
   413  	ParseLogs(ctx, sourceConn, "testdb", 0.5, storeCh)
   414  
   415  	// Wait for measurements to be sent or timeout
   416  	var measurement MeasurementEnvelope
   417  	select {
   418  	case measurement = <-storeCh:
   419  		assert.NotEmpty(t, measurement.Data, "Measurement data should not be empty")
   420  	case <-time.After(2 * time.Second):
   421  		break
   422  	}
   423  
   424  	assert.Equal(t, "test-source", measurement.DBName)
   425  	assert.Equal(t, specialMetricServerLogEventCounts, measurement.MetricName)
   426  
   427  	// Verify the data contains expected fields for both local and total counts
   428  	data := measurement.Data[0]
   429  	// Check that severity counts are present
   430  	_, hasError := data["error"]
   431  	_, hasWarning := data["warning"]
   432  	assert.True(t, hasError && hasWarning, "Should have at least error and warning")
   433  
   434  	// Ensure mock expectations were met
   435  	assert.NoError(t, mock.ExpectationsWereMet())
   436  }
   437