...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/reaper/logparser_remote.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"path/filepath"
     5  	"strings"
     6  	"time"
     7  
     8  	"github.com/cybertec-postgresql/pgwatch/v5/internal/log"
     9  )
    10  
    11  func (lp *LogParser) parseLogsRemote() error {
    12  	var latestLogFile string
    13  	var linesRead int // to skip over already parsed lines on Postgres server restart for example
    14  	var firstRun = true
    15  	var currInterval time.Duration
    16  	var size uint64
    17  	var offset uint64
    18  	var modification time.Time
    19  	var chunk string
    20  	var lines []string
    21  	var numOfLines int
    22  
    23  	logger := log.GetLogger(lp.ctx)
    24  
    25  	for { // detect current log file. read new chunks. re-start in case of errors
    26  		select {
    27  		case <-lp.ctx.Done():
    28  			return nil
    29  		case <-time.After(currInterval):
    30  			if currInterval == 0 {
    31  				currInterval = time.Second * time.Duration(lp.Interval)
    32  			}
    33  		}
    34  
    35  		if latestLogFile == "" || firstRun {
    36  			sql := "select name, size, modification from pg_ls_logdir() where name like '%csv' order by modification desc limit 1;"
    37  			err := lp.SourceConn.Conn.QueryRow(lp.ctx, sql).Scan(&latestLogFile, &size, &modification)
    38  			if err != nil {
    39  				logger.Infof("No logfiles found in log dir: '%s'", lp.LogFolder)
    40  				continue
    41  			}
    42  			offset = size // Seek to an end
    43  			firstRun = false
    44  			logger.Infof("Starting to parse logfile: '%s'", latestLogFile)
    45  		}
    46  
    47  		if linesRead == numOfLines && size != offset {
    48  			logFilePath := filepath.Join(lp.LogFolder, latestLogFile)
    49  			sizeToRead := min(maxChunkSize, size-offset)
    50  			err := lp.SourceConn.Conn.QueryRow(lp.ctx, "select pg_read_file($1, $2, $3)", logFilePath, offset, sizeToRead).Scan(&chunk)
    51  			offset += sizeToRead
    52  			if err != nil {
    53  				logger.Warningf("Failed to read logfile '%s': %s", latestLogFile, err)
    54  				continue
    55  			}
    56  			lines = strings.Split(chunk, "\n")
    57  			if sizeToRead == maxChunkSize {
    58  				// last line may be incomplete, re-read it next time
    59  				offset -= uint64(len(lines[len(lines)-1]))
    60  			}
    61  			numOfLines = len(lines)
    62  			linesRead = 0
    63  			logger.WithField("lines", len(lines)).Info("logs fetched")
    64  		}
    65  
    66  		for {
    67  			if linesRead == numOfLines {
    68  				select {
    69  				case <-lp.ctx.Done():
    70  					return nil
    71  				case <-time.After(currInterval):
    72  				}
    73  
    74  				var latestSize uint64
    75  				err := lp.SourceConn.Conn.QueryRow(lp.ctx, "select size, modification from pg_ls_logdir() where name = $1;", latestLogFile).Scan(&latestSize, &modification)
    76  				if err != nil {
    77  					logger.Warnf("Failed to read state info of logfile: '%s'", latestLogFile)
    78  				}
    79  
    80  				var fileName string
    81  				if size == latestSize && offset == size || err != nil {
    82  					sql := "select name, size from pg_ls_logdir() where modification > $1 and name like '%csv' order by modification, name limit 1;"
    83  					err := lp.SourceConn.Conn.QueryRow(lp.ctx, sql, modification).Scan(&fileName, &latestSize)
    84  					if err == nil && latestLogFile != fileName {
    85  						if lp.LogTruncOnRotation == "off" {
    86  							lp.fileOffsets[latestLogFile] = size
    87  							if len(lp.fileOffsets) > maxTrackedFiles {
    88  								clear(lp.fileOffsets) // To avoid unbounded growth
    89  							}
    90  						}
    91  						latestLogFile = fileName
    92  						size = latestSize
    93  						if lastOffset, ok := lp.fileOffsets[latestLogFile]; ok && lp.LogTruncOnRotation == "off" {
    94  							offset = lastOffset
    95  						} else {
    96  							offset = 0
    97  						}
    98  						logger.Infof("Switching to new logfile: '%s'", fileName)
    99  						currInterval = 0 // We already slept. It will be resetted.
   100  						break
   101  					}
   102  				} else {
   103  					size = latestSize
   104  					currInterval = 0 // We already slept. It will be resetted.
   105  					break
   106  				}
   107  			}
   108  
   109  			if linesRead < numOfLines {
   110  				line := lines[linesRead]
   111  				linesRead++
   112  
   113  				matches := lp.LogsMatchRegex.FindStringSubmatch(line)
   114  				if len(matches) != 0 {
   115  					result := lp.regexMatchesToMap(matches)
   116  					errorSeverity := result["error_severity"]
   117  					if lp.ServerMessagesLang != "en" {
   118  						errorSeverity = severityToEnglish(lp.ServerMessagesLang, errorSeverity)
   119  					}
   120  
   121  					databaseName := result["database_name"]
   122  					if lp.SourceConn.RealDbname == databaseName {
   123  						lp.eventCounts[errorSeverity]++
   124  					}
   125  					lp.eventCountsTotal[errorSeverity]++
   126  				}
   127  			}
   128  
   129  			if lp.HasSendIntervalElapsed() {
   130  				select {
   131  				case <-lp.ctx.Done():
   132  					return nil
   133  				case lp.StoreCh <- lp.GetMeasurementEnvelope():
   134  					zeroEventCounts(lp.eventCounts)
   135  					zeroEventCounts(lp.eventCountsTotal)
   136  					lp.lastSendTime = time.Now()
   137  				}
   138  			}
   139  
   140  		} // line read loop
   141  	} // chunk read loop
   142  
   143  }
   144