1 package reaper
2
3 import (
4 "context"
5 "os"
6 "path/filepath"
7 "regexp"
8 "testing"
9 "time"
10
11 "github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
12 "github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
13 "github.com/cybertec-postgresql/pgwatch/v5/internal/testutil"
14 "github.com/pashagolub/pgxmock/v4"
15 "github.com/stretchr/testify/assert"
16 "github.com/stretchr/testify/require"
17 )
18
19 func TestNewLogParser(t *testing.T) {
20 tempDir := t.TempDir()
21
22 mock, err := pgxmock.NewPool()
23 require.NoError(t, err)
24 defer mock.Close()
25
26 sourceConn := &sources.SourceConn{
27 Source: sources.Source{
28 Name: "test-source",
29 Metrics: map[string]float64{specialMetricServerLogEventCounts: 60.0},
30 },
31 Conn: mock,
32 }
33 storeCh := make(chan metrics.MeasurementEnvelope, 10)
34
35 t.Run("success", func(t *testing.T) {
36 mock.ExpectQuery(`select
37 current_setting\('data_directory'\) as dd,
38 current_setting\('log_directory'\) as ld,
39 current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
40 current_setting\('log_truncate_on_rotation'\) as log_trunc`).
41 WillReturnRows(pgxmock.NewRows([]string{"dd", "ld", "lc_messages", "log_trunc"}).
42 AddRow("", tempDir, "en", "off"))
43
44 lp, err := NewLogParser(testutil.TestContext, sourceConn, storeCh)
45 assert.NoError(t, err)
46 assert.NotNil(t, lp)
47 assert.Equal(t, tempDir, lp.LogFolder)
48 assert.Equal(t, "en", lp.ServerMessagesLang)
49 assert.Equal(t, "off", lp.LogTruncOnRotation)
50 assert.Equal(t, 60.0, lp.Interval)
51 assert.NotNil(t, lp.LogsMatchRegex)
52 assert.NoError(t, mock.ExpectationsWereMet())
53 })
54
55 t.Run("tryDetermineLogSettings error", func(t *testing.T) {
56 mock.ExpectQuery(`select
57 current_setting\('data_directory'\) as dd,
58 current_setting\('log_directory'\) as ld,
59 current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
60 current_setting\('log_truncate_on_rotation'\) as log_trunc`).
61 WillReturnError(assert.AnError)
62
63 lp, err := NewLogParser(testutil.TestContext, sourceConn, storeCh)
64 assert.Error(t, err)
65 assert.Nil(t, lp)
66 assert.NoError(t, mock.ExpectationsWereMet())
67 })
68
69 t.Run("unknown language defaults to en", func(t *testing.T) {
70 mock.ExpectQuery(`select
71 current_setting\('data_directory'\) as dd,
72 current_setting\('log_directory'\) as ld,
73 current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
74 current_setting\('log_truncate_on_rotation'\) as log_trunc`).
75 WillReturnRows(pgxmock.NewRows([]string{"dd", "ld", "lc_messages", "log_trunc"}).
76 AddRow("", tempDir, "zz", "off"))
77
78 lp, err := NewLogParser(testutil.TestContext, sourceConn, storeCh)
79 assert.NoError(t, err)
80 assert.NotNil(t, lp)
81 assert.Equal(t, "en", lp.ServerMessagesLang)
82 assert.NoError(t, mock.ExpectationsWereMet())
83 })
84
85 t.Run("relative log directory", func(t *testing.T) {
86 mock.ExpectQuery(`select
87 current_setting\('data_directory'\) as dd,
88 current_setting\('log_directory'\) as ld,
89 current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
90 current_setting\('log_truncate_on_rotation'\) as log_trunc`).
91 WillReturnRows(pgxmock.NewRows([]string{"dd", "ld", "lc_messages", "log_trunc"}).
92 AddRow("/data", "pg_log", "de", "on"))
93
94 lp, err := NewLogParser(testutil.TestContext, sourceConn, storeCh)
95 assert.NoError(t, err)
96 assert.NotNil(t, lp)
97 assert.Equal(t, "/data/pg_log", lp.LogFolder)
98 assert.Equal(t, "de", lp.ServerMessagesLang)
99 assert.Equal(t, "on", lp.LogTruncOnRotation)
100 assert.NoError(t, mock.ExpectationsWereMet())
101 })
102 }
103
104 func TestTryDetermineLogSettings(t *testing.T) {
105 t.Run("absolute log directory - known lang", func(t *testing.T) {
106 mock, err := pgxmock.NewPool()
107 require.NoError(t, err)
108 defer mock.Close()
109
110 mock.ExpectQuery(`select
111 current_setting\('data_directory'\) as dd,
112 current_setting\('log_directory'\) as ld,
113 current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
114 current_setting\('log_truncate_on_rotation'\) as log_trunc`).
115 WillReturnRows(pgxmock.NewRows([]string{"dd", "ld", "lc_messages", "log_trunc"}).
116 AddRow("/data", "/var/log/postgresql", "de", "off"))
117
118 logPath, lang, logTrunc, err := tryDetermineLogSettings(testutil.TestContext, mock)
119 assert.NoError(t, err)
120 assert.Equal(t, "/var/log/postgresql", logPath)
121 assert.Equal(t, "de", lang)
122 assert.Equal(t, "off", logTrunc)
123 assert.NoError(t, mock.ExpectationsWereMet())
124 })
125
126 t.Run("relative log directory - unknown lang", func(t *testing.T) {
127 mock, err := pgxmock.NewPool()
128 require.NoError(t, err)
129 defer mock.Close()
130
131 mock.ExpectQuery(`select
132 current_setting\('data_directory'\) as dd,
133 current_setting\('log_directory'\) as ld,
134 current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
135 current_setting\('log_truncate_on_rotation'\) as log_trunc`).
136 WillReturnRows(pgxmock.NewRows([]string{"dd", "ld", "lc_messages", "log_trunc"}).
137 AddRow("/data", "log", "xx", "off"))
138
139 logPath, lang, logTrunc, err := tryDetermineLogSettings(testutil.TestContext, mock)
140 assert.NoError(t, err)
141 assert.Equal(t, "/data/log", logPath)
142 assert.Equal(t, "en", lang)
143 assert.Equal(t, "off", logTrunc)
144 assert.NoError(t, mock.ExpectationsWereMet())
145 })
146
147 t.Run("query error", func(t *testing.T) {
148 mock, err := pgxmock.NewPool()
149 require.NoError(t, err)
150 defer mock.Close()
151
152 mock.ExpectQuery(`select
153 current_setting\('data_directory'\) as dd,
154 current_setting\('log_directory'\) as ld,
155 current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
156 current_setting\('log_truncate_on_rotation'\) as log_trunc`).
157 WillReturnError(assert.AnError)
158
159 logPath, lang, logTrunc, err := tryDetermineLogSettings(testutil.TestContext, mock)
160 assert.Error(t, err)
161 assert.Equal(t, "", logPath)
162 assert.Equal(t, "", lang)
163 assert.Equal(t, "", logTrunc)
164 assert.NoError(t, mock.ExpectationsWereMet())
165 })
166 }
167
168 func TestCheckHasPrivileges(t *testing.T) {
169 tempDir := t.TempDir()
170
171 names := [2]string{"pg_ls_logdir() fails", "pg_read_file() permission denied"}
172 for _, name := range names {
173 t.Run("checkHasRemotePrivileges fails - "+name, func(t *testing.T) {
174 mock, err := pgxmock.NewPool()
175 require.NoError(t, err)
176 defer mock.Close()
177
178 mock.ExpectQuery(`select
179 current_setting\('data_directory'\) as dd,
180 current_setting\('log_directory'\) as ld,
181 current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
182 current_setting\('log_truncate_on_rotation'\) as log_trunc`).
183 WillReturnRows(pgxmock.NewRows([]string{"dd", "ld", "lc_messages", "log_trunc"}).
184 AddRow("", tempDir, "en", "off"))
185
186
187 mock.ExpectQuery(`SELECT COALESCE`).WillReturnRows(
188 pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(false))
189
190 if name == "pg_ls_logdir() fails" {
191
192 mock.ExpectQuery(`select name from pg_ls_logdir\(\) limit 1`).
193 WillReturnError(assert.AnError)
194 } else {
195
196 mock.ExpectQuery(`select name from pg_ls_logdir\(\) limit 1`).
197 WillReturnRows(pgxmock.NewRows([]string{"name"}).AddRow("log.csv"))
198
199
200 mock.ExpectQuery(`select pg_read_file\(\$1, 0, 0\)`).
201 WithArgs(filepath.Join(tempDir, "log.csv")).
202 WillReturnError(assert.AnError)
203 }
204
205 sourceConn := &sources.SourceConn{
206 Source: sources.Source{
207 Name: "test-source",
208 },
209 Conn: mock,
210 }
211
212 storeCh := make(chan metrics.MeasurementEnvelope, 10)
213
214 lp, err := NewLogParser(testutil.TestContext, sourceConn, storeCh)
215 require.NoError(t, err)
216
217 err = lp.ParseLogs()
218 assert.Error(t, err)
219
220
221 assert.NoError(t, mock.ExpectationsWereMet())
222
223
224 select {
225 case measurement := <-storeCh:
226 t.Errorf("Expected no data, but got: %+v", measurement)
227 case <-time.After(time.Second):
228
229 }
230 })
231 }
232 }
233
234 func TestEventCountsToMetricStoreMessages(t *testing.T) {
235 mdb := &sources.SourceConn{
236 Source: sources.Source{
237 Name: "test-db",
238 Kind: sources.SourcePostgres,
239 CustomTags: map[string]string{"env": "test"},
240 },
241 }
242 lp := &LogParser{
243 SourceConn: mdb,
244 eventCounts: map[string]int64{
245 "ERROR": 5,
246 "WARNING": 10,
247 },
248 eventCountsTotal: map[string]int64{
249 "ERROR": 15,
250 "WARNING": 25,
251 "INFO": 50,
252 },
253 }
254 result := lp.GetMeasurementEnvelope()
255
256 assert.Equal(t, "test-db", result.DBName)
257 assert.Equal(t, specialMetricServerLogEventCounts, result.MetricName)
258 assert.Equal(t, map[string]string{"env": "test"}, result.CustomTags)
259
260
261 assert.Len(t, result.Data, 1)
262 measurement := result.Data[0]
263
264
265 assert.Equal(t, int64(5), measurement["error"])
266 assert.Equal(t, int64(10), measurement["warning"])
267 assert.Equal(t, int64(0), measurement["info"])
268 assert.Equal(t, int64(0), measurement["debug"])
269
270
271 assert.Equal(t, int64(15), measurement["error_total"])
272 assert.Equal(t, int64(25), measurement["warning_total"])
273 assert.Equal(t, int64(50), measurement["info_total"])
274 assert.Equal(t, int64(0), measurement["debug_total"])
275 }
276
277 func TestSeverityToEnglish(t *testing.T) {
278 tests := []struct {
279 serverLang string
280 errorSeverity string
281 expected string
282 }{
283 {"en", "ERROR", "ERROR"},
284 {"de", "FEHLER", "ERROR"},
285 {"fr", "ERREUR", "ERROR"},
286 {"de", "WARNUNG", "WARNING"},
287 {"ru", "ОШИБКА", "ERROR"},
288 {"zh", "错误", "ERROR"},
289 {"unknown", "ERROR", "ERROR"},
290 {"de", "UNKNOWN_SEVERITY", "UNKNOWN_SEVERITY"},
291 }
292
293 for _, tt := range tests {
294 t.Run(tt.serverLang+"_"+tt.errorSeverity, func(t *testing.T) {
295 result := severityToEnglish(tt.serverLang, tt.errorSeverity)
296 assert.Equal(t, tt.expected, result)
297 })
298 }
299 }
300
301 func TestZeroEventCounts(t *testing.T) {
302 eventCounts := map[string]int64{
303 "ERROR": 5,
304 "WARNING": 10,
305 "INFO": 15,
306 }
307
308 zeroEventCounts(eventCounts)
309
310
311 for _, severity := range pgSeverities {
312 assert.Equal(t, int64(0), eventCounts[severity])
313 }
314 }
315
316 func TestRegexMatchesToMap(t *testing.T) {
317 t.Run("successful match", func(t *testing.T) {
318 lp := &LogParser{
319 LogsMatchRegex: regexp.MustCompile(`(?P<severity>\w+): (?P<message>.+)`),
320 }
321 matches := []string{"ERROR: Something went wrong", "ERROR", "Something went wrong"}
322
323 result := lp.regexMatchesToMap(matches)
324 expected := map[string]string{
325 "severity": "ERROR",
326 "message": "Something went wrong",
327 }
328
329 assert.Equal(t, expected, result)
330 })
331
332 t.Run("no matches", func(t *testing.T) {
333 lp := &LogParser{
334 LogsMatchRegex: regexp.MustCompile(`(?P<severity>\w+): (?P<message>.+)`),
335 }
336 matches := []string{}
337
338 result := lp.regexMatchesToMap(matches)
339 assert.Empty(t, result)
340 })
341
342 t.Run("nil regex", func(t *testing.T) {
343 lp := &LogParser{}
344 matches := []string{"test"}
345
346 result := lp.regexMatchesToMap(matches)
347 assert.Empty(t, result)
348 })
349 }
350
351 func TestCSVLogRegex(t *testing.T) {
352
353 lp := &LogParser{
354 LogsMatchRegex: regexp.MustCompile(csvLogDefaultRegEx),
355 }
356
357 testLines := []struct {
358 line string
359 expected map[string]string
360 }{
361 {
362 line: `2023-12-01 10:30:45.123 UTC,"postgres","testdb",12345,"127.0.0.1:54321",session123,1,"SELECT",2023-12-01 10:30:00 UTC,1/234,567,ERROR,`,
363 expected: map[string]string{
364 "log_time": "2023-12-01 10:30:45.123 UTC",
365 "user_name": "postgres",
366 "database_name": "testdb",
367 "process_id": "12345",
368 "connection_from": "127.0.0.1:54321",
369 "session_id": "session123",
370 "session_line_num": "1",
371 "command_tag": "SELECT",
372 "error_severity": "ERROR",
373 },
374 },
375 {
376 line: `2023-12-01 10:30:45.123 UTC,postgres,testdb,12345,127.0.0.1:54321,session123,1,SELECT,2023-12-01 10:30:00 UTC,1/234,567,WARNING,`,
377 expected: map[string]string{
378 "log_time": "2023-12-01 10:30:45.123 UTC",
379 "user_name": "postgres",
380 "database_name": "testdb",
381 "process_id": "12345",
382 "connection_from": "127.0.0.1:54321",
383 "session_id": "session123",
384 "session_line_num": "1",
385 "command_tag": "SELECT",
386 "error_severity": "WARNING",
387 },
388 },
389 }
390
391 for i, tt := range testLines {
392 t.Run(string(rune('A'+i)), func(t *testing.T) {
393 matches := lp.LogsMatchRegex.FindStringSubmatch(tt.line)
394 assert.NotEmpty(t, matches, "regex should match the log line")
395
396 result := lp.regexMatchesToMap(matches)
397 for key, expected := range tt.expected {
398 assert.Equal(t, expected, result[key], "mismatch for key %s", key)
399 }
400 })
401 }
402 }
403
404 func TestLogParseLocal(t *testing.T) {
405 tempDir := t.TempDir()
406 logFile := filepath.Join(tempDir, "test.csv")
407
408
409 logContent := `2023-12-01 10:30:45.123 UTC,"postgres","testdb",12345,"127.0.0.1:54321",session123,1,"SELECT",2023-12-01 10:30:00 UTC,1/234,567,ERROR,"duplicate key value violates unique constraint"
410 2023-12-01 10:30:46.124 UTC,"postgres","testdb",12345,"127.0.0.1:54321",session123,2,"SELECT",2023-12-01 10:30:00 UTC,1/234,567,WARNING,"this is a warning message"
411 2023-12-01 10:30:47.125 UTC,"postgres","otherdb",12346,"127.0.0.1:54322",session124,1,"INSERT",2023-12-01 10:30:00 UTC,1/235,568,ERROR,"another error message"
412 `
413
414 err := os.WriteFile(logFile, []byte(logContent), 0644)
415 require.NoError(t, err)
416
417
418 mock, err := pgxmock.NewPool()
419 require.NoError(t, err)
420 defer mock.Close()
421
422 mock.ExpectQuery(`select
423 current_setting\('data_directory'\) as dd,
424 current_setting\('log_directory'\) as ld,
425 current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
426 current_setting\('log_truncate_on_rotation'\) as log_trunc`).
427 WillReturnRows(pgxmock.NewRows([]string{"dd", "ld", "lc_messages", "log_trunc"}).
428 AddRow("", tempDir, "en", "off"))
429
430 mock.ExpectQuery(`SELECT COALESCE`).WillReturnRows(
431 pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(true))
432
433
434 sourceConn := &sources.SourceConn{
435 Source: sources.Source{
436 Name: "test-source",
437 },
438 Conn: mock,
439 }
440
441
442 ctx, cancel := context.WithTimeout(testutil.TestContext, 2*time.Second)
443 defer cancel()
444
445
446 storeCh := make(chan metrics.MeasurementEnvelope, 10)
447
448 lp, err := NewLogParser(ctx, sourceConn, storeCh)
449 require.NoError(t, err)
450 err = lp.ParseLogs()
451 assert.NoError(t, err)
452
453
454 assert.NoError(t, mock.ExpectationsWereMet())
455
456
457 var measurement metrics.MeasurementEnvelope
458 select {
459 case measurement = <-storeCh:
460 assert.NotEmpty(t, measurement.Data, "Measurement data should not be empty")
461 case <-time.After(2 * time.Second):
462 }
463
464 assert.Equal(t, "test-source", measurement.DBName)
465 assert.Equal(t, specialMetricServerLogEventCounts, measurement.MetricName)
466
467
468 data := measurement.Data[0]
469
470 _, hasError := data["error"]
471 _, hasWarning := data["warning"]
472 assert.True(t, hasError && hasWarning, "Should have at least error and warning")
473 }
474
475 func TestGetFileWithLatestTimestamp(t *testing.T) {
476
477 tempDir := t.TempDir()
478
479 t.Run("single file", func(t *testing.T) {
480 file1 := filepath.Join(tempDir, "test1.log")
481 err := os.WriteFile(file1, []byte("test"), 0644)
482 require.NoError(t, err)
483
484 latest, err := getFileWithLatestTimestamp([]string{file1})
485 assert.NoError(t, err)
486 assert.Equal(t, file1, latest)
487 })
488
489 t.Run("multiple files with different timestamps", func(t *testing.T) {
490 file1 := filepath.Join(tempDir, "old.log")
491 file2 := filepath.Join(tempDir, "new.log")
492
493
494 err := os.WriteFile(file1, []byte("old"), 0644)
495 require.NoError(t, err)
496
497
498 time.Sleep(10 * time.Millisecond)
499
500
501 err = os.WriteFile(file2, []byte("new"), 0644)
502 require.NoError(t, err)
503
504 latest, err := getFileWithLatestTimestamp([]string{file1, file2})
505 assert.NoError(t, err)
506 assert.Equal(t, file2, latest)
507 })
508
509 t.Run("empty file list", func(t *testing.T) {
510 latest, err := getFileWithLatestTimestamp([]string{})
511 assert.NoError(t, err)
512 assert.Equal(t, "", latest)
513 })
514
515 t.Run("non-existent file", func(t *testing.T) {
516 nonExistent := filepath.Join(tempDir, "nonexistent.log")
517 latest, err := getFileWithLatestTimestamp([]string{nonExistent})
518 assert.Error(t, err)
519 assert.Equal(t, "", latest)
520 })
521 }
522
523 func TestGetFileWithNextModTimestamp(t *testing.T) {
524 tempDir := t.TempDir()
525
526 t.Run("finds next file", func(t *testing.T) {
527 file1 := filepath.Join(tempDir, "first.log")
528 file2 := filepath.Join(tempDir, "second.log")
529 file3 := filepath.Join(tempDir, "third.log")
530
531
532 err := os.WriteFile(file1, []byte("first"), 0644)
533 require.NoError(t, err)
534
535 time.Sleep(10 * time.Millisecond)
536 err = os.WriteFile(file2, []byte("second"), 0644)
537 require.NoError(t, err)
538
539 time.Sleep(10 * time.Millisecond)
540 err = os.WriteFile(file3, []byte("third"), 0644)
541 require.NoError(t, err)
542
543 globPattern := filepath.Join(tempDir, "*.log")
544 next, err := getFileWithNextModTimestamp(globPattern, file1)
545 assert.NoError(t, err)
546 assert.Equal(t, file2, next)
547 })
548
549 t.Run("no next file", func(t *testing.T) {
550 file1 := filepath.Join(tempDir, "only.log")
551 err := os.WriteFile(file1, []byte("only"), 0644)
552 require.NoError(t, err)
553
554 globPattern := filepath.Join(tempDir, "*.log")
555 next, err := getFileWithNextModTimestamp(globPattern, file1)
556 assert.NoError(t, err)
557 assert.Equal(t, "", next)
558 })
559
560 t.Run("invalid glob pattern", func(t *testing.T) {
561 invalidGlob := "["
562 file1 := filepath.Join(tempDir, "test.log")
563 next, err := getFileWithNextModTimestamp(invalidGlob, file1)
564 assert.Error(t, err)
565 assert.Equal(t, "", next)
566 })
567 }
568
569 func TestLogParseRemote(t *testing.T) {
570 const (
571 testTimeout = 3 * time.Second
572 channelBufferSize = 10
573 logFileName = "postgresql.csv"
574 testDbName = "testdb"
575 )
576
577
578 logContent := `2023-12-01 10:30:45.123 UTC,"postgres","testdb",12345,"127.0.0.1:54321",session123,1,"SELECT",2023-12-01 10:30:00 UTC,1/234,567,ERROR,"duplicate key value violates unique constraint"
579 2023-12-01 10:30:46.124 UTC,"postgres","testdb",12345,"127.0.0.1:54321",session123,2,"SELECT",2023-12-01 10:30:00 UTC,1/234,567,WARNING,"this is a warning message"
580 2023-12-01 10:30:47.125 UTC,"postgres","otherdb",12346,"127.0.0.1:54322",session124,1,"INSERT",2023-12-01 10:30:00 UTC,1/235,568,ERROR,"another error message"
581 `
582
583 t.Run("success - parses CSV logs with correct counts", func(t *testing.T) {
584 tempDir := t.TempDir()
585
586 mock, err := pgxmock.NewPool()
587 require.NoError(t, err)
588 defer mock.Close()
589
590 mock.ExpectQuery(`select
591 current_setting\('data_directory'\) as dd,
592 current_setting\('log_directory'\) as ld,
593 current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
594 current_setting\('log_truncate_on_rotation'\) as log_trunc`).
595 WillReturnRows(pgxmock.NewRows([]string{"dd", "ld", "lc_messages", "log_trunc"}).
596 AddRow("", tempDir, "en", "off"))
597
598
599 mock.ExpectQuery(`SELECT COALESCE`).
600 WillReturnRows(pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(false))
601
602
603 mock.ExpectQuery(`select name from pg_ls_logdir\(\) limit 1`).
604 WillReturnRows(pgxmock.NewRows([]string{"name"}).AddRow(logFileName))
605 mock.ExpectQuery(`select pg_read_file\(\$1, 0, 0\)`).
606 WithArgs(filepath.Join(tempDir, logFileName)).
607 WillReturnRows(pgxmock.NewRows([]string{"pg_read_file"}).AddRow(""))
608
609
610
611 mock.ExpectQuery(`select name, size, modification from pg_ls_logdir\(\) where name like '%csv' order by modification desc limit 1;`).
612 WillReturnRows(pgxmock.NewRows([]string{"name", "size", "modification"}).
613 AddRow(logFileName, int32(len(logContent)), time.Now()))
614
615 sourceConn := &sources.SourceConn{
616 Source: sources.Source{
617 Name: "test-source",
618 Metrics: map[string]float64{specialMetricServerLogEventCounts: 60},
619 },
620 Conn: mock,
621 }
622 sourceConn.RealDbname = testDbName
623
624 ctx, cancel := context.WithTimeout(testutil.TestContext, 500*time.Millisecond)
625 defer cancel()
626
627 storeCh := make(chan metrics.MeasurementEnvelope, channelBufferSize)
628
629 lp, err := NewLogParser(ctx, sourceConn, storeCh)
630 require.NoError(t, err)
631
632
633 go func() {
634 _ = lp.ParseLogs()
635 }()
636
637
638
639
640 <-ctx.Done()
641 time.Sleep(100 * time.Millisecond)
642
643
644 assert.NoError(t, mock.ExpectationsWereMet(), "All mock expectations should be met")
645
646 cancel()
647 })
648
649 t.Run("handles empty log directory gracefully", func(t *testing.T) {
650 tempDir := t.TempDir()
651 mock, err := pgxmock.NewPool()
652 require.NoError(t, err)
653 defer mock.Close()
654
655
656 mock.ExpectQuery(`select
657 current_setting\('data_directory'\) as dd,
658 current_setting\('log_directory'\) as ld,
659 current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
660 current_setting\('log_truncate_on_rotation'\) as log_trunc`).
661 WillReturnRows(pgxmock.NewRows([]string{"dd", "ld", "lc_messages", "log_trunc"}).
662 AddRow("", tempDir, "en", "off"))
663 mock.ExpectQuery(`SELECT COALESCE`).
664 WillReturnRows(pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(false))
665
666
667 mock.ExpectQuery(`select name from pg_ls_logdir\(\) limit 1`).
668 WillReturnRows(pgxmock.NewRows([]string{"name"}).AddRow(logFileName))
669 mock.ExpectQuery(`select pg_read_file\(\$1, 0, 0\)`).
670 WithArgs(filepath.Join(tempDir, logFileName)).
671 WillReturnRows(pgxmock.NewRows([]string{"pg_read_file"}).AddRow(""))
672
673
674 mock.ExpectQuery(`select name, size, modification from pg_ls_logdir\(\) where name like '%csv' order by modification desc limit 1;`).
675 WillReturnError(assert.AnError)
676
677 mock.ExpectQuery(`select name, size, modification from pg_ls_logdir\(\) where name like '%csv' order by modification desc limit 1;`).
678 WillReturnError(assert.AnError)
679
680 sourceConn := &sources.SourceConn{
681 Source: sources.Source{
682 Name: "test-source",
683 Metrics: map[string]float64{specialMetricServerLogEventCounts: 0.1},
684 },
685 Conn: mock,
686 }
687
688 ctx, cancel := context.WithTimeout(testutil.TestContext, 500*time.Millisecond)
689 defer cancel()
690
691 storeCh := make(chan metrics.MeasurementEnvelope, channelBufferSize)
692
693 lp, err := NewLogParser(ctx, sourceConn, storeCh)
694 require.NoError(t, err)
695
696
697 go func() {
698 _ = lp.ParseLogs()
699 }()
700
701
702 <-ctx.Done()
703 time.Sleep(100 * time.Millisecond)
704
705
706 select {
707 case m := <-storeCh:
708 t.Errorf("Expected no measurements, but received: %+v", m)
709 default:
710
711 }
712 })
713
714 t.Run("malformed CSV entries are skipped gracefully", func(t *testing.T) {
715 tempDir := t.TempDir()
716
717 malformedContent := `2023-12-01 10:30:45.123 UTC,"postgres","testdb",12345,"127.0.0.1:54321",session123,1,"SELECT",2023-12-01 10:30:00 UTC,1/234,567,ERROR,"valid entry"
718 this is not a valid CSV line at all
719 incomplete line without proper fields
720 2023-12-01 10:30:47.125 UTC,"postgres","testdb",12346,"127.0.0.1:54322",session124,1,"INSERT",2023-12-01 10:30:00 UTC,1/235,568,WARNING,"another valid entry"
721 `
722
723 mock, err := pgxmock.NewPool()
724 require.NoError(t, err)
725 defer mock.Close()
726
727
728 mock.ExpectQuery(`select
729 current_setting\('data_directory'\) as dd,
730 current_setting\('log_directory'\) as ld,
731 current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
732 current_setting\('log_truncate_on_rotation'\) as log_trunc`).
733 WillReturnRows(pgxmock.NewRows([]string{"dd", "ld", "lc_messages", "log_trunc"}).
734 AddRow("", tempDir, "en", "off"))
735 mock.ExpectQuery(`SELECT COALESCE`).
736 WillReturnRows(pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(false))
737 mock.ExpectQuery(`select name from pg_ls_logdir\(\) limit 1`).
738 WillReturnRows(pgxmock.NewRows([]string{"name"}).AddRow(logFileName))
739 mock.ExpectQuery(`select pg_read_file\(\$1, 0, 0\)`).
740 WithArgs(filepath.Join(tempDir, logFileName)).
741 WillReturnRows(pgxmock.NewRows([]string{"pg_read_file"}).AddRow(""))
742
743
744 mock.ExpectQuery(`select name, size, modification from pg_ls_logdir\(\) where name like '%csv' order by modification desc limit 1;`).
745 WillReturnRows(pgxmock.NewRows([]string{"name", "size", "modification"}).
746 AddRow(logFileName, int32(len(malformedContent)), time.Now()))
747
748 sourceConn := &sources.SourceConn{
749 Source: sources.Source{
750 Name: "test-source",
751 Metrics: map[string]float64{specialMetricServerLogEventCounts: 60},
752 },
753 Conn: mock,
754 }
755 sourceConn.RealDbname = testDbName
756
757 ctx, cancel := context.WithTimeout(testutil.TestContext, 500*time.Millisecond)
758 defer cancel()
759
760 storeCh := make(chan metrics.MeasurementEnvelope, channelBufferSize)
761
762 lp, err := NewLogParser(ctx, sourceConn, storeCh)
763 require.NoError(t, err)
764
765
766 go func() {
767 _ = lp.ParseLogs()
768 }()
769
770
771 <-ctx.Done()
772 time.Sleep(100 * time.Millisecond)
773
774
775
776
777 assert.NoError(t, mock.ExpectationsWereMet())
778
779 cancel()
780 })
781
782 t.Run("file read permission denied during parse", func(t *testing.T) {
783 tempDir := t.TempDir()
784
785 mock, err := pgxmock.NewPool()
786 require.NoError(t, err)
787 defer mock.Close()
788
789
790 mock.ExpectQuery(`select
791 current_setting\('data_directory'\) as dd,
792 current_setting\('log_directory'\) as ld,
793 current_setting\('lc_messages'\)::varchar\(2\) as lc_messages,
794 current_setting\('log_truncate_on_rotation'\) as log_trunc`).
795 WillReturnRows(pgxmock.NewRows([]string{"dd", "ld", "lc_messages", "log_trunc"}).
796 AddRow("", tempDir, "en", "off"))
797 mock.ExpectQuery(`SELECT COALESCE`).
798 WillReturnRows(pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(false))
799 mock.ExpectQuery(`select name from pg_ls_logdir\(\) limit 1`).
800 WillReturnRows(pgxmock.NewRows([]string{"name"}).AddRow(logFileName))
801 mock.ExpectQuery(`select pg_read_file\(\$1, 0, 0\)`).
802 WithArgs(filepath.Join(tempDir, logFileName)).
803 WillReturnRows(pgxmock.NewRows([]string{"pg_read_file"}).AddRow(""))
804
805
806 mock.ExpectQuery(`select name, size, modification from pg_ls_logdir\(\) where name like '%csv' order by modification desc limit 1;`).
807 WillReturnRows(pgxmock.NewRows([]string{"name", "size", "modification"}).
808 AddRow(logFileName, int32(0), time.Now()))
809
810
811 mock.ExpectQuery(`select size, modification from pg_ls_logdir\(\) where name = \$1;`).
812 WithArgs(logFileName).
813 WillReturnRows(pgxmock.NewRows([]string{"size", "modification"}).
814 AddRow(int32(len(logContent)), time.Now()))
815
816
817 mock.ExpectQuery(`select pg_read_file\(\$1, \$2, \$3\)`).
818 WithArgs(filepath.Join(tempDir, logFileName), int32(0), int32(len(logContent))).
819 WillReturnError(assert.AnError)
820
821 sourceConn := &sources.SourceConn{
822 Source: sources.Source{
823 Name: "test-source",
824 Metrics: map[string]float64{specialMetricServerLogEventCounts: 0.1},
825 },
826 Conn: mock,
827 }
828
829 ctx, cancel := context.WithTimeout(testutil.TestContext, 500*time.Millisecond)
830 defer cancel()
831
832 storeCh := make(chan metrics.MeasurementEnvelope, channelBufferSize)
833
834 lp, err := NewLogParser(ctx, sourceConn, storeCh)
835 require.NoError(t, err)
836
837
838 go func() {
839 _ = lp.ParseLogs()
840 }()
841
842
843 <-ctx.Done()
844 time.Sleep(100 * time.Millisecond)
845
846
847 select {
848 case m := <-storeCh:
849
850
851 data := m.Data[0]
852 assert.Equal(t, int64(0), data["error"], "Should have 0 errors since read failed")
853 assert.Equal(t, int64(0), data["warning"], "Should have 0 warnings since read failed")
854 default:
855
856 }
857 })
858 }
859