...

Source file src/github.com/cybertec-postgresql/pgwatch/v3/internal/log/log_broker_hook.go

Documentation: github.com/cybertec-postgresql/pgwatch/v3/internal/log

     1  package log
     2  
     3  import (
     4  	"context"
     5  	"slices"
     6  	"sync"
     7  	"time"
     8  
     9  	"github.com/sirupsen/logrus"
    10  )
    11  
    12  // MessageType represents the format of the message
    13  type MessageType string
    14  
    15  // MessageChanType represents the format of the message channel
    16  type MessageChanType chan MessageType
    17  
    18  // BrokerHook is the implementation of the logrus hook for publicating logs to subscribers
    19  type BrokerHook struct {
    20  	highLoadTimeout time.Duration     // wait this amount of time before skip log entry
    21  	subscribers     []MessageChanType //
    22  	input           chan *logrus.Entry
    23  	ctx             context.Context
    24  	lastError       chan error
    25  	level           string
    26  	mu              *sync.Mutex
    27  	formatter       logrus.Formatter
    28  }
    29  
    30  const cacheLimit = 512
    31  const highLoadLimit = 200 * time.Millisecond
    32  
    33  // NewBrokerHook creates a LogHook to be added to an instance of logger
    34  func NewBrokerHook(ctx context.Context, level string) *BrokerHook {
    35  	l := &BrokerHook{
    36  		highLoadTimeout: highLoadLimit,
    37  		input:           make(chan *logrus.Entry, cacheLimit),
    38  		lastError:       make(chan error),
    39  		ctx:             ctx,
    40  		level:           level,
    41  		mu:              new(sync.Mutex),
    42  		formatter:       defaultFormatter,
    43  	}
    44  	go l.poll(l.input)
    45  	return l
    46  }
    47  
    48  // AddSubscriber adds receiving channel to the subscription
    49  func (hook *BrokerHook) AddSubscriber(msgCh MessageChanType) {
    50  	hook.mu.Lock()
    51  	defer hook.mu.Unlock()
    52  	hook.subscribers = append(hook.subscribers, msgCh)
    53  }
    54  
    55  // RemoveSubscriber deletes receiving channel from the subscription
    56  func (hook *BrokerHook) RemoveSubscriber(msgCh MessageChanType) {
    57  	hook.mu.Lock()
    58  	defer hook.mu.Unlock()
    59  	hook.subscribers = slices.DeleteFunc(hook.subscribers, func(E MessageChanType) bool {
    60  		return E == msgCh
    61  	})
    62  }
    63  
    64  var defaultFormatter = &logrus.TextFormatter{DisableColors: true}
    65  
    66  // SetBrokerFormatter sets the format that will be used by hook.
    67  func (hook *BrokerHook) SetBrokerFormatter(formatter logrus.Formatter) {
    68  	hook.mu.Lock()
    69  	defer hook.mu.Unlock()
    70  	if formatter == nil {
    71  		hook.formatter = defaultFormatter
    72  	} else {
    73  		hook.formatter = formatter
    74  	}
    75  }
    76  
    77  // Fire adds logrus log message to the internal queue for processing
    78  func (hook *BrokerHook) Fire(entry *logrus.Entry) error {
    79  	if hook.ctx.Err() != nil {
    80  		return nil
    81  	}
    82  	select {
    83  	case hook.input <- entry:
    84  		// entry sent
    85  	case <-time.After(hook.highLoadTimeout):
    86  		// entry dropped due to a huge load, check stdout or file for detailed log
    87  	}
    88  	select {
    89  	case err := <-hook.lastError:
    90  		return err
    91  	default:
    92  		return nil
    93  	}
    94  }
    95  
    96  // Levels returns the available logging levels
    97  func (hook *BrokerHook) Levels() []logrus.Level {
    98  	switch hook.level {
    99  	case "none":
   100  		return []logrus.Level{}
   101  	case "debug":
   102  		return logrus.AllLevels
   103  	case "info":
   104  		return []logrus.Level{
   105  			logrus.PanicLevel,
   106  			logrus.FatalLevel,
   107  			logrus.ErrorLevel,
   108  			logrus.WarnLevel,
   109  			logrus.InfoLevel,
   110  		}
   111  	default:
   112  		return []logrus.Level{
   113  			logrus.PanicLevel,
   114  			logrus.FatalLevel,
   115  			logrus.ErrorLevel,
   116  		}
   117  	}
   118  }
   119  
   120  // poll checks for incoming messages and caches them internally
   121  // until either a maximum amount is reached, or a timeout occurs.
   122  func (hook *BrokerHook) poll(input <-chan *logrus.Entry) {
   123  	for {
   124  		select {
   125  		case <-hook.ctx.Done(): //check context with high priority
   126  			return
   127  		case entry := <-input:
   128  			hook.send(entry)
   129  		}
   130  	}
   131  }
   132  
   133  // send sends cached messages to the postgres server
   134  func (hook *BrokerHook) send(entry *logrus.Entry) {
   135  	if len(hook.subscribers) == 0 {
   136  		return // Nothing to do here.
   137  	}
   138  	hook.mu.Lock()
   139  	defer hook.mu.Unlock()
   140  	msg, err := hook.formatter.Format(entry)
   141  	for _, subscriber := range hook.subscribers {
   142  		select {
   143  		case subscriber <- MessageType(msg):
   144  		default:
   145  			//no time to wait
   146  		}
   147  	}
   148  	if err != nil {
   149  		select {
   150  		case hook.lastError <- err:
   151  			//error sent to the logger
   152  		default:
   153  			//there is unprocessed error already
   154  		}
   155  	}
   156  }
   157