...
1 package log
2
3 import (
4 "context"
5 "slices"
6 "sync"
7 "time"
8
9 "github.com/sirupsen/logrus"
10 )
11
12
13 type MessageType string
14
15
16 type MessageChanType chan MessageType
17
18
19 type BrokerHook struct {
20 highLoadTimeout time.Duration
21 subscribers []MessageChanType
22 input chan MessageType
23 ctx context.Context
24 level string
25 mu *sync.Mutex
26 formatter logrus.Formatter
27 }
28
29 const cacheLimit = 512
30 const highLoadLimit = 200 * time.Millisecond
31
32
33 func NewBrokerHook(ctx context.Context, level string) *BrokerHook {
34 l := &BrokerHook{
35 highLoadTimeout: highLoadLimit,
36 input: make(chan MessageType, cacheLimit),
37 ctx: ctx,
38 level: level,
39 mu: new(sync.Mutex),
40 formatter: defaultFormatter,
41 }
42 go l.poll(l.input)
43 return l
44 }
45
46
47 func (hook *BrokerHook) AddSubscriber(msgCh MessageChanType) {
48 hook.mu.Lock()
49 defer hook.mu.Unlock()
50 hook.subscribers = append(hook.subscribers, msgCh)
51 }
52
53
54 func (hook *BrokerHook) RemoveSubscriber(msgCh MessageChanType) {
55 hook.mu.Lock()
56 defer hook.mu.Unlock()
57 hook.subscribers = slices.DeleteFunc(hook.subscribers, func(E MessageChanType) bool {
58 return E == msgCh
59 })
60 }
61
62 var defaultFormatter = &logrus.TextFormatter{DisableColors: true}
63
64
65 func (hook *BrokerHook) SetBrokerFormatter(formatter logrus.Formatter) {
66 hook.mu.Lock()
67 defer hook.mu.Unlock()
68 if formatter == nil {
69 hook.formatter = defaultFormatter
70 } else {
71 hook.formatter = formatter
72 }
73 }
74
75
76 func (hook *BrokerHook) Fire(entry *logrus.Entry) error {
77 if hook.ctx.Err() != nil {
78 return nil
79 }
80 hook.mu.Lock()
81 f := hook.formatter
82 hook.mu.Unlock()
83 raw, err := f.Format(entry)
84 if err != nil {
85 return err
86 }
87 select {
88 case hook.input <- MessageType(raw):
89
90 case <-time.After(hook.highLoadTimeout):
91
92 }
93 return nil
94 }
95
96
97 func (hook *BrokerHook) Levels() []logrus.Level {
98 switch hook.level {
99 case "none":
100 return []logrus.Level{}
101 case "debug":
102 return logrus.AllLevels
103 case "info":
104 return []logrus.Level{
105 logrus.PanicLevel,
106 logrus.FatalLevel,
107 logrus.ErrorLevel,
108 logrus.WarnLevel,
109 logrus.InfoLevel,
110 }
111 default:
112 return []logrus.Level{
113 logrus.PanicLevel,
114 logrus.FatalLevel,
115 logrus.ErrorLevel,
116 }
117 }
118 }
119
120
121
122 func (hook *BrokerHook) poll(input <-chan MessageType) {
123 for {
124 select {
125 case <-hook.ctx.Done():
126 return
127 case msg := <-input:
128 hook.send(msg)
129 }
130 }
131 }
132
133
134 func (hook *BrokerHook) send(msg MessageType) {
135 hook.mu.Lock()
136 defer hook.mu.Unlock()
137 if len(hook.subscribers) == 0 {
138 return
139 }
140 for _, subscriber := range hook.subscribers {
141 select {
142 case subscriber <- msg:
143 default:
144
145 }
146 }
147 }
148