...

Source file src/github.com/cybertec-postgresql/pgwatch/v5/internal/log/log_broker_hook.go

Documentation: github.com/cybertec-postgresql/pgwatch/v5/internal/log

     1  package log
     2  
     3  import (
     4  	"context"
     5  	"slices"
     6  	"sync"
     7  	"time"
     8  
     9  	"github.com/sirupsen/logrus"
    10  )
    11  
    12  // MessageType represents the format of the message
    13  type MessageType string
    14  
    15  // MessageChanType represents the format of the message channel
    16  type MessageChanType chan MessageType
    17  
    18  // BrokerHook is the implementation of the logrus hook for publishing logs to subscribers
    19  type BrokerHook struct {
    20  	highLoadTimeout time.Duration     // wait this amount of time before skip log entry
    21  	subscribers     []MessageChanType //
    22  	input           chan MessageType
    23  	ctx             context.Context
    24  	level           string
    25  	mu              *sync.Mutex
    26  	formatter       logrus.Formatter
    27  }
    28  
    29  const cacheLimit = 512
    30  const highLoadLimit = 200 * time.Millisecond
    31  
    32  // NewBrokerHook creates a LogHook to be added to an instance of logger
    33  func NewBrokerHook(ctx context.Context, level string) *BrokerHook {
    34  	l := &BrokerHook{
    35  		highLoadTimeout: highLoadLimit,
    36  		input:           make(chan MessageType, cacheLimit),
    37  		ctx:             ctx,
    38  		level:           level,
    39  		mu:              new(sync.Mutex),
    40  		formatter:       defaultFormatter,
    41  	}
    42  	go l.poll(l.input)
    43  	return l
    44  }
    45  
    46  // AddSubscriber adds receiving channel to the subscription
    47  func (hook *BrokerHook) AddSubscriber(msgCh MessageChanType) {
    48  	hook.mu.Lock()
    49  	defer hook.mu.Unlock()
    50  	hook.subscribers = append(hook.subscribers, msgCh)
    51  }
    52  
    53  // RemoveSubscriber deletes receiving channel from the subscription
    54  func (hook *BrokerHook) RemoveSubscriber(msgCh MessageChanType) {
    55  	hook.mu.Lock()
    56  	defer hook.mu.Unlock()
    57  	hook.subscribers = slices.DeleteFunc(hook.subscribers, func(E MessageChanType) bool {
    58  		return E == msgCh
    59  	})
    60  }
    61  
    62  var defaultFormatter = &logrus.TextFormatter{DisableColors: true}
    63  
    64  // SetBrokerFormatter sets the format that will be used by hook.
    65  func (hook *BrokerHook) SetBrokerFormatter(formatter logrus.Formatter) {
    66  	hook.mu.Lock()
    67  	defer hook.mu.Unlock()
    68  	if formatter == nil {
    69  		hook.formatter = defaultFormatter
    70  	} else {
    71  		hook.formatter = formatter
    72  	}
    73  }
    74  
    75  // Fire adds logrus log message to the internal queue for processing
    76  func (hook *BrokerHook) Fire(entry *logrus.Entry) error {
    77  	if hook.ctx.Err() != nil {
    78  		return nil
    79  	}
    80  	hook.mu.Lock()
    81  	f := hook.formatter
    82  	hook.mu.Unlock()
    83  	raw, err := f.Format(entry)
    84  	if err != nil {
    85  		return err
    86  	}
    87  	select {
    88  	case hook.input <- MessageType(raw):
    89  		// entry sent
    90  	case <-time.After(hook.highLoadTimeout):
    91  		// entry dropped due to a huge load, check stdout or file for detailed log
    92  	}
    93  	return nil
    94  }
    95  
    96  // Levels returns the available logging levels
    97  func (hook *BrokerHook) Levels() []logrus.Level {
    98  	switch hook.level {
    99  	case "none":
   100  		return []logrus.Level{}
   101  	case "debug":
   102  		return logrus.AllLevels
   103  	case "info":
   104  		return []logrus.Level{
   105  			logrus.PanicLevel,
   106  			logrus.FatalLevel,
   107  			logrus.ErrorLevel,
   108  			logrus.WarnLevel,
   109  			logrus.InfoLevel,
   110  		}
   111  	default:
   112  		return []logrus.Level{
   113  			logrus.PanicLevel,
   114  			logrus.FatalLevel,
   115  			logrus.ErrorLevel,
   116  		}
   117  	}
   118  }
   119  
   120  // poll checks for incoming messages and caches them internally
   121  // until either a maximum amount is reached, or a timeout occurs.
   122  func (hook *BrokerHook) poll(input <-chan MessageType) {
   123  	for {
   124  		select {
   125  		case <-hook.ctx.Done(): //check context with high priority
   126  			return
   127  		case msg := <-input:
   128  			hook.send(msg)
   129  		}
   130  	}
   131  }
   132  
   133  // send sends out a pre-formatted message to all subscribers
   134  func (hook *BrokerHook) send(msg MessageType) {
   135  	hook.mu.Lock()
   136  	defer hook.mu.Unlock()
   137  	if len(hook.subscribers) == 0 {
   138  		return
   139  	}
   140  	for _, subscriber := range hook.subscribers {
   141  		select {
   142  		case subscriber <- msg:
   143  		default:
   144  			//no time to wait
   145  		}
   146  	}
   147  }
   148