1 package metrics
2
3 import (
4 "context"
5 "os"
6 "path/filepath"
7 "regexp"
8 "testing"
9 "time"
10
11 "github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
12 "github.com/pashagolub/pgxmock/v4"
13 "github.com/stretchr/testify/assert"
14 "github.com/stretchr/testify/require"
15 )
16
17 var testCtx = context.Background()
18
19 func TestGetFileWithLatestTimestamp(t *testing.T) {
20
21 tempDir := t.TempDir()
22
23 t.Run("single file", func(t *testing.T) {
24 file1 := filepath.Join(tempDir, "test1.log")
25 err := os.WriteFile(file1, []byte("test"), 0644)
26 require.NoError(t, err)
27
28 latest, err := getFileWithLatestTimestamp([]string{file1})
29 assert.NoError(t, err)
30 assert.Equal(t, file1, latest)
31 })
32
33 t.Run("multiple files with different timestamps", func(t *testing.T) {
34 file1 := filepath.Join(tempDir, "old.log")
35 file2 := filepath.Join(tempDir, "new.log")
36
37
38 err := os.WriteFile(file1, []byte("old"), 0644)
39 require.NoError(t, err)
40
41
42 time.Sleep(10 * time.Millisecond)
43
44
45 err = os.WriteFile(file2, []byte("new"), 0644)
46 require.NoError(t, err)
47
48 latest, err := getFileWithLatestTimestamp([]string{file1, file2})
49 assert.NoError(t, err)
50 assert.Equal(t, file2, latest)
51 })
52
53 t.Run("empty file list", func(t *testing.T) {
54 latest, err := getFileWithLatestTimestamp([]string{})
55 assert.NoError(t, err)
56 assert.Equal(t, "", latest)
57 })
58
59 t.Run("non-existent file", func(t *testing.T) {
60 nonExistent := filepath.Join(tempDir, "nonexistent.log")
61 latest, err := getFileWithLatestTimestamp([]string{nonExistent})
62 assert.Error(t, err)
63 assert.Equal(t, "", latest)
64 })
65 }
66
67 func TestGetFileWithNextModTimestamp(t *testing.T) {
68 tempDir := t.TempDir()
69
70 t.Run("finds next file", func(t *testing.T) {
71 file1 := filepath.Join(tempDir, "first.log")
72 file2 := filepath.Join(tempDir, "second.log")
73 file3 := filepath.Join(tempDir, "third.log")
74
75
76 err := os.WriteFile(file1, []byte("first"), 0644)
77 require.NoError(t, err)
78
79 time.Sleep(10 * time.Millisecond)
80 err = os.WriteFile(file2, []byte("second"), 0644)
81 require.NoError(t, err)
82
83 time.Sleep(10 * time.Millisecond)
84 err = os.WriteFile(file3, []byte("third"), 0644)
85 require.NoError(t, err)
86
87 globPattern := filepath.Join(tempDir, "*.log")
88 next, err := getFileWithNextModTimestamp(globPattern, file1)
89 assert.NoError(t, err)
90 assert.Equal(t, file2, next)
91 })
92
93 t.Run("no next file", func(t *testing.T) {
94 file1 := filepath.Join(tempDir, "only.log")
95 err := os.WriteFile(file1, []byte("only"), 0644)
96 require.NoError(t, err)
97
98 globPattern := filepath.Join(tempDir, "*.log")
99 next, err := getFileWithNextModTimestamp(globPattern, file1)
100 assert.NoError(t, err)
101 assert.Equal(t, "", next)
102 })
103
104 t.Run("invalid glob pattern", func(t *testing.T) {
105 invalidGlob := "["
106 file1 := filepath.Join(tempDir, "test.log")
107 next, err := getFileWithNextModTimestamp(invalidGlob, file1)
108 assert.Error(t, err)
109 assert.Equal(t, "", next)
110 })
111 }
112
113 func TestEventCountsToMetricStoreMessages(t *testing.T) {
114 mdb := &sources.SourceConn{
115 Source: sources.Source{
116 Name: "test-db",
117 Kind: sources.SourcePostgres,
118 CustomTags: map[string]string{"env": "test"},
119 },
120 }
121
122 eventCounts := map[string]int64{
123 "ERROR": 5,
124 "WARNING": 10,
125 }
126
127 eventCountsTotal := map[string]int64{
128 "ERROR": 15,
129 "WARNING": 25,
130 "INFO": 50,
131 }
132
133 result := eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal, mdb)
134
135 assert.Equal(t, "test-db", result.DBName)
136 assert.Equal(t, specialMetricServerLogEventCounts, result.MetricName)
137 assert.Equal(t, map[string]string{"env": "test"}, result.CustomTags)
138
139
140 assert.Len(t, result.Data, 1)
141 measurement := result.Data[0]
142
143
144 assert.Equal(t, int64(5), measurement["error"])
145 assert.Equal(t, int64(10), measurement["warning"])
146 assert.Equal(t, int64(0), measurement["info"])
147 assert.Equal(t, int64(0), measurement["debug"])
148
149
150 assert.Equal(t, int64(15), measurement["error_total"])
151 assert.Equal(t, int64(25), measurement["warning_total"])
152 assert.Equal(t, int64(50), measurement["info_total"])
153 assert.Equal(t, int64(0), measurement["debug_total"])
154 }
155
156 func TestSeverityToEnglish(t *testing.T) {
157 tests := []struct {
158 serverLang string
159 errorSeverity string
160 expected string
161 }{
162 {"en", "ERROR", "ERROR"},
163 {"de", "FEHLER", "ERROR"},
164 {"fr", "ERREUR", "ERROR"},
165 {"de", "WARNUNG", "WARNING"},
166 {"ru", "ОШИБКА", "ERROR"},
167 {"zh", "错误", "ERROR"},
168 {"unknown", "ERROR", "ERROR"},
169 {"de", "UNKNOWN_SEVERITY", "UNKNOWN_SEVERITY"},
170 }
171
172 for _, tt := range tests {
173 t.Run(tt.serverLang+"_"+tt.errorSeverity, func(t *testing.T) {
174 result := severityToEnglish(tt.serverLang, tt.errorSeverity)
175 assert.Equal(t, tt.expected, result)
176 })
177 }
178 }
179
180 func TestZeroEventCounts(t *testing.T) {
181 eventCounts := map[string]int64{
182 "ERROR": 5,
183 "WARNING": 10,
184 "INFO": 15,
185 }
186
187 zeroEventCounts(eventCounts)
188
189
190 for _, severity := range PgSeverities {
191 assert.Equal(t, int64(0), eventCounts[severity])
192 }
193 }
194
195 func TestTryDetermineLogFolder(t *testing.T) {
196 t.Run("absolute log directory", func(t *testing.T) {
197 mock, err := pgxmock.NewPool()
198 require.NoError(t, err)
199 defer mock.Close()
200
201 mock.ExpectQuery(`select current_setting\('data_directory'\) as dd, current_setting\('log_directory'\) as ld`).
202 WillReturnRows(pgxmock.NewRows([]string{"dd", "ld"}).
203 AddRow("/data", "/var/log/postgresql"))
204
205 logPath, err := tryDetermineLogFolder(testCtx, mock)
206 assert.NoError(t, err)
207 assert.Equal(t, "/var/log/postgresql/*.csv", logPath)
208 assert.NoError(t, mock.ExpectationsWereMet())
209 })
210
211 t.Run("relative log directory", func(t *testing.T) {
212 mock, err := pgxmock.NewPool()
213 require.NoError(t, err)
214 defer mock.Close()
215
216 mock.ExpectQuery(`select current_setting\('data_directory'\) as dd, current_setting\('log_directory'\) as ld`).
217 WillReturnRows(pgxmock.NewRows([]string{"dd", "ld"}).
218 AddRow("/data", "log"))
219
220 logPath, err := tryDetermineLogFolder(testCtx, mock)
221 assert.NoError(t, err)
222 assert.Equal(t, "/data/log/*.csv", logPath)
223 assert.NoError(t, mock.ExpectationsWereMet())
224 })
225
226 t.Run("query error", func(t *testing.T) {
227 mock, err := pgxmock.NewPool()
228 require.NoError(t, err)
229 defer mock.Close()
230
231 mock.ExpectQuery(`select current_setting\('data_directory'\) as dd, current_setting\('log_directory'\) as ld`).
232 WillReturnError(assert.AnError)
233
234 logPath, err := tryDetermineLogFolder(testCtx, mock)
235 assert.Error(t, err)
236 assert.Equal(t, "", logPath)
237 assert.NoError(t, mock.ExpectationsWereMet())
238 })
239 }
240
241 func TestTryDetermineLogMessagesLanguage(t *testing.T) {
242 t.Run("known language", func(t *testing.T) {
243 mock, err := pgxmock.NewPool()
244 require.NoError(t, err)
245 defer mock.Close()
246
247 mock.ExpectQuery(`select current_setting\('lc_messages'\)::varchar\(2\) as lc_messages;`).
248 WillReturnRows(pgxmock.NewRows([]string{"lc_messages"}).AddRow("de"))
249
250 lang, err := tryDetermineLogMessagesLanguage(testCtx, mock)
251 assert.NoError(t, err)
252 assert.Equal(t, "de", lang)
253 assert.NoError(t, mock.ExpectationsWereMet())
254 })
255
256 t.Run("unknown language defaults to en", func(t *testing.T) {
257 mock, err := pgxmock.NewPool()
258 require.NoError(t, err)
259 defer mock.Close()
260
261 mock.ExpectQuery(`select current_setting\('lc_messages'\)::varchar\(2\) as lc_messages;`).
262 WillReturnRows(pgxmock.NewRows([]string{"lc_messages"}).AddRow("xx"))
263
264 lang, err := tryDetermineLogMessagesLanguage(testCtx, mock)
265 assert.NoError(t, err)
266 assert.Equal(t, "en", lang)
267 assert.NoError(t, mock.ExpectationsWereMet())
268 })
269
270 t.Run("query error", func(t *testing.T) {
271 mock, err := pgxmock.NewPool()
272 require.NoError(t, err)
273 defer mock.Close()
274
275 mock.ExpectQuery(`select current_setting\('lc_messages'\)::varchar\(2\) as lc_messages;`).
276 WillReturnError(assert.AnError)
277
278 lang, err := tryDetermineLogMessagesLanguage(testCtx, mock)
279 assert.Error(t, err)
280 assert.Equal(t, "", lang)
281 assert.NoError(t, mock.ExpectationsWereMet())
282 })
283 }
284
285 func TestRegexMatchesToMap(t *testing.T) {
286 t.Run("successful match", func(t *testing.T) {
287 regex := regexp.MustCompile(`(?P<severity>\w+): (?P<message>.+)`)
288 matches := []string{"ERROR: Something went wrong", "ERROR", "Something went wrong"}
289
290 result := regexMatchesToMap(regex, matches)
291 expected := map[string]string{
292 "severity": "ERROR",
293 "message": "Something went wrong",
294 }
295
296 assert.Equal(t, expected, result)
297 })
298
299 t.Run("no matches", func(t *testing.T) {
300 regex := regexp.MustCompile(`(?P<severity>\w+): (?P<message>.+)`)
301 matches := []string{}
302
303 result := regexMatchesToMap(regex, matches)
304 assert.Empty(t, result)
305 })
306
307 t.Run("nil regex", func(t *testing.T) {
308 matches := []string{"test"}
309
310 result := regexMatchesToMap(nil, matches)
311 assert.Empty(t, result)
312 })
313 }
314
315 func TestCSVLogRegex(t *testing.T) {
316
317 regex, err := regexp.Compile(CSVLogDefaultRegEx)
318 require.NoError(t, err)
319
320 testLines := []struct {
321 line string
322 expected map[string]string
323 }{
324 {
325 line: `2023-12-01 10:30:45.123 UTC,"postgres","testdb",12345,"127.0.0.1:54321",session123,1,"SELECT",2023-12-01 10:30:00 UTC,1/234,567,ERROR,`,
326 expected: map[string]string{
327 "log_time": "2023-12-01 10:30:45.123 UTC",
328 "user_name": "postgres",
329 "database_name": "testdb",
330 "process_id": "12345",
331 "connection_from": "127.0.0.1:54321",
332 "session_id": "session123",
333 "session_line_num": "1",
334 "command_tag": "SELECT",
335 "error_severity": "ERROR",
336 },
337 },
338 {
339 line: `2023-12-01 10:30:45.123 UTC,postgres,testdb,12345,127.0.0.1:54321,session123,1,SELECT,2023-12-01 10:30:00 UTC,1/234,567,WARNING,`,
340 expected: map[string]string{
341 "log_time": "2023-12-01 10:30:45.123 UTC",
342 "user_name": "postgres",
343 "database_name": "testdb",
344 "process_id": "12345",
345 "connection_from": "127.0.0.1:54321",
346 "session_id": "session123",
347 "session_line_num": "1",
348 "command_tag": "SELECT",
349 "error_severity": "WARNING",
350 },
351 },
352 }
353
354 for i, tt := range testLines {
355 t.Run(string(rune('A'+i)), func(t *testing.T) {
356 matches := regex.FindStringSubmatch(tt.line)
357 assert.NotEmpty(t, matches, "regex should match the log line")
358
359 result := regexMatchesToMap(regex, matches)
360 for key, expected := range tt.expected {
361 assert.Equal(t, expected, result[key], "mismatch for key %s", key)
362 }
363 })
364 }
365 }
366
367
368 func TestLogParse(t *testing.T) {
369 tempDir := t.TempDir()
370 logFile := filepath.Join(tempDir, "test.csv")
371
372
373 logContent := `2023-12-01 10:30:45.123 UTC,"postgres","testdb",12345,"127.0.0.1:54321",session123,1,"SELECT",2023-12-01 10:30:00 UTC,1/234,567,ERROR,"duplicate key value violates unique constraint"
374 2023-12-01 10:30:46.124 UTC,"postgres","testdb",12345,"127.0.0.1:54321",session123,2,"SELECT",2023-12-01 10:30:00 UTC,1/234,567,WARNING,"this is a warning message"
375 2023-12-01 10:30:47.125 UTC,"postgres","otherdb",12346,"127.0.0.1:54322",session124,1,"INSERT",2023-12-01 10:30:00 UTC,1/235,568,ERROR,"another error message"
376 `
377
378 err := os.WriteFile(logFile, []byte(logContent), 0644)
379 require.NoError(t, err)
380
381
382 mock, err := pgxmock.NewPool()
383 require.NoError(t, err)
384 defer mock.Close()
385
386
387 mock.ExpectQuery(`SELECT COALESCE`).WillReturnRows(
388 pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(true))
389
390 mock.ExpectQuery(`select current_setting\('lc_messages'\)::varchar\(2\) as lc_messages;`).
391 WillReturnRows(pgxmock.NewRows([]string{"lc_messages"}).AddRow("en"))
392
393
394 sourceConn := &sources.SourceConn{
395 Source: sources.Source{
396 Name: "test-source",
397 HostConfig: sources.HostConfigAttrs{
398 LogsGlobPath: filepath.Join(tempDir, "*.csv"),
399
400 },
401 },
402 Conn: mock,
403 }
404
405
406 ctx, cancel := context.WithTimeout(testCtx, 3*time.Second)
407 defer cancel()
408
409
410 storeCh := make(chan MeasurementEnvelope, 10)
411
412
413 ParseLogs(ctx, sourceConn, "testdb", 0.5, storeCh)
414
415
416 var measurement MeasurementEnvelope
417 select {
418 case measurement = <-storeCh:
419 assert.NotEmpty(t, measurement.Data, "Measurement data should not be empty")
420 case <-time.After(2 * time.Second):
421 break
422 }
423
424 assert.Equal(t, "test-source", measurement.DBName)
425 assert.Equal(t, specialMetricServerLogEventCounts, measurement.MetricName)
426
427
428 data := measurement.Data[0]
429
430 _, hasError := data["error"]
431 _, hasWarning := data["warning"]
432 assert.True(t, hasError && hasWarning, "Should have at least error and warning")
433
434
435 assert.NoError(t, mock.ExpectationsWereMet())
436 }
437