...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/reaper/logparser_local.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/reaper

     1  package reaper
     2  
     3  import (
     4  	"bufio"
     5  	"io"
     6  	"os"
     7  	"path/filepath"
     8  	"time"
     9  
    10  	"github.com/cybertec-postgresql/pgwatch/v5/internal/log"
    11  )
    12  
    13  func (lp *LogParser) parseLogsLocal() error {
    14  	var latest, previous string
    15  	var latestHandle *os.File
    16  	var reader *bufio.Reader
    17  	var linesRead int // to skip over already parsed lines on Postgres server restart for example
    18  	var err error
    19  	var firstRun = true
    20  	var currInterval time.Duration
    21  
    22  	logger := log.GetLogger(lp.ctx)
    23  	logsGlobPath := filepath.Join(lp.LogFolder, csvLogDefaultGlobSuffix)
    24  
    25  	for { // re-try loop. re-start in case of FS errors or just to refresh host config
    26  		select {
    27  		case <-lp.ctx.Done():
    28  			return nil
    29  		case <-time.After(currInterval):
    30  			if currInterval == 0 {
    31  				currInterval = time.Second * time.Duration(lp.Interval)
    32  			}
    33  		}
    34  
    35  		if latest == "" || firstRun {
    36  			globMatches, err := filepath.Glob(logsGlobPath)
    37  			if err != nil || len(globMatches) == 0 {
    38  				logger.Infof("No logfiles found to parse from glob '%s'", logsGlobPath)
    39  				continue
    40  			}
    41  
    42  			logger.Debugf("Found %v logfiles from glob pattern, picking the latest", len(globMatches))
    43  			if len(globMatches) > 1 {
    44  				// find latest timestamp
    45  				latest, _ = getFileWithLatestTimestamp(globMatches)
    46  				if latest == "" {
    47  					logger.Warningf("Could not determine the latest logfile")
    48  					continue
    49  				}
    50  
    51  			} else if len(globMatches) == 1 {
    52  				latest = globMatches[0]
    53  			}
    54  			logger.Infof("Starting to parse logfile: %s ", latest)
    55  		}
    56  
    57  		if latestHandle == nil {
    58  			latestHandle, err = os.Open(latest)
    59  			if err != nil {
    60  				logger.Warningf("Failed to open logfile %s: %s", latest, err)
    61  				continue
    62  			}
    63  			defer latestHandle.Close()
    64  			reader = bufio.NewReader(latestHandle)
    65  
    66  			linesOffset, ok := lp.fileOffsets[latest]
    67  			if ok && lp.LogTruncOnRotation == "off" {
    68  				linesRead = int(linesOffset)
    69  			}
    70  			if (ok || previous == latest) && linesRead > 0 { // skip already read lines
    71  				i := 1
    72  				for i <= linesRead {
    73  					_, err = reader.ReadString('\n')
    74  					if err == io.EOF && i < linesRead {
    75  						logger.Warningf("Failed to open logfile %s: %s", latest, err)
    76  						linesRead = 0
    77  						break
    78  					} else if err != nil {
    79  						logger.Warningf("Failed to skip %d logfile lines for %s, there might be duplicates reported. Error: %s", linesRead, latest, err)
    80  						linesRead = i
    81  						break
    82  					}
    83  					i++
    84  				}
    85  				logger.Debugf("Skipped %d already processed lines from %s", linesRead, latest)
    86  			} else if firstRun { // seek to end
    87  				_, _ = latestHandle.Seek(0, 2)
    88  				firstRun = false
    89  			}
    90  		}
    91  
    92  		for {
    93  			line, err := reader.ReadString('\n')
    94  			if err != nil && err != io.EOF {
    95  				logger.Warningf("Failed to read logfile %s: %s", latest, err)
    96  				_ = latestHandle.Close()
    97  				latestHandle = nil
    98  				break
    99  			}
   100  
   101  			if err == io.EOF {
   102  				// EOF reached, wait for new files to be added
   103  				select {
   104  				case <-lp.ctx.Done():
   105  					return nil
   106  				case <-time.After(currInterval):
   107  				}
   108  				// check for newly opened logfiles
   109  				file, _ := getFileWithNextModTimestamp(logsGlobPath, latest)
   110  				if file != "" && file != latest {
   111  					if lp.LogTruncOnRotation == "off" {
   112  						lp.fileOffsets[latest] = uint64(linesRead)
   113  						if len(lp.fileOffsets) > maxTrackedFiles {
   114  							clear(lp.fileOffsets) // To avoid unbounded growth
   115  						}
   116  					}
   117  					previous = latest
   118  					latest = file
   119  					_ = latestHandle.Close()
   120  					latestHandle = nil
   121  					logger.Infof("Switching to new logfile: %s", file)
   122  					linesRead = 0
   123  					break
   124  				}
   125  			} else {
   126  				linesRead++
   127  			}
   128  
   129  			if err == nil && line != "" {
   130  				matches := lp.LogsMatchRegex.FindStringSubmatch(line)
   131  				if len(matches) == 0 {
   132  					goto send_to_storage_if_needed
   133  				}
   134  				result := lp.regexMatchesToMap(matches)
   135  				errorSeverity, ok := result["error_severity"]
   136  				if !ok {
   137  					logger.Error("error_severity group must be defined in parse regex:", lp.LogsMatchRegex)
   138  					time.Sleep(time.Minute)
   139  					break
   140  				}
   141  				if lp.ServerMessagesLang != "en" {
   142  					errorSeverity = severityToEnglish(lp.ServerMessagesLang, errorSeverity)
   143  				}
   144  
   145  				databaseName, ok := result["database_name"]
   146  				if !ok {
   147  					logger.Error("database_name group must be defined in parse regex:", lp.LogsMatchRegex)
   148  					time.Sleep(time.Minute)
   149  					break
   150  				}
   151  				if lp.SourceConn.RealDbname == databaseName {
   152  					lp.eventCounts[errorSeverity]++
   153  				}
   154  				lp.eventCountsTotal[errorSeverity]++
   155  			}
   156  
   157  		send_to_storage_if_needed:
   158  			if lp.HasSendIntervalElapsed() {
   159  				logger.Debugf("Sending log event counts for last interval to storage channel. Local eventcounts: %+v, global eventcounts: %+v", lp.eventCounts, lp.eventCountsTotal)
   160  				select {
   161  				case <-lp.ctx.Done():
   162  					return nil
   163  				case lp.StoreCh <- lp.GetMeasurementEnvelope():
   164  					zeroEventCounts(lp.eventCounts)
   165  					zeroEventCounts(lp.eventCountsTotal)
   166  					lp.lastSendTime = time.Now()
   167  				}
   168  			}
   169  
   170  		} // file read loop
   171  	} // config loop
   172  
   173  }
   174  
   175  func getFileWithLatestTimestamp(files []string) (string, error) {
   176  	var maxDate time.Time
   177  	var latest string
   178  
   179  	for _, f := range files {
   180  		fi, err := os.Stat(f)
   181  		if err != nil {
   182  			return "", err
   183  		}
   184  		if fi.ModTime().After(maxDate) {
   185  			latest = f
   186  			maxDate = fi.ModTime()
   187  		}
   188  	}
   189  	return latest, nil
   190  }
   191  
   192  func getFileWithNextModTimestamp(logsGlobPath, currentFile string) (string, error) {
   193  	var nextFile string
   194  	var nextMod time.Time
   195  
   196  	files, err := filepath.Glob(logsGlobPath)
   197  	if err != nil {
   198  		return "", err
   199  	}
   200  
   201  	fiCurrent, err := os.Stat(currentFile)
   202  	if err != nil {
   203  		return "", err
   204  	}
   205  
   206  	for _, f := range files {
   207  		if f == currentFile {
   208  			continue
   209  		}
   210  		fi, err := os.Stat(f)
   211  		if err != nil {
   212  			continue
   213  		}
   214  		if (nextMod.IsZero() || fi.ModTime().Before(nextMod)) && fi.ModTime().After(fiCurrent.ModTime()) {
   215  			nextMod = fi.ModTime()
   216  			nextFile = f
   217  		}
   218  	}
   219  	return nextFile, nil
   220  }
   221