...
1 package log
2
3 import (
4 "context"
5 "slices"
6 "sync"
7 "time"
8
9 "github.com/sirupsen/logrus"
10 )
11
12
type (
	// MessageType is a single formatted log message as delivered to
	// subscribers of a BrokerHook.
	MessageType string

	// MessageChanType is the channel type on which a subscriber receives
	// MessageType values.
	MessageChanType chan MessageType
)
17
18
19 type BrokerHook struct {
20 highLoadTimeout time.Duration
21 subscribers []MessageChanType
22 input chan *logrus.Entry
23 ctx context.Context
24 lastError chan error
25 level string
26 mu *sync.Mutex
27 formatter logrus.Formatter
28 }
29
const (
	// cacheLimit is the capacity of a BrokerHook's input queue.
	cacheLimit = 512
	// highLoadLimit is the default time Fire waits for room in the input
	// queue before giving up on an entry.
	highLoadLimit = 200 * time.Millisecond
)
32
33
34 func NewBrokerHook(ctx context.Context, level string) *BrokerHook {
35 l := &BrokerHook{
36 highLoadTimeout: highLoadLimit,
37 input: make(chan *logrus.Entry, cacheLimit),
38 lastError: make(chan error),
39 ctx: ctx,
40 level: level,
41 mu: new(sync.Mutex),
42 formatter: defaultFormatter,
43 }
44 go l.poll(l.input)
45 return l
46 }
47
48
49 func (hook *BrokerHook) AddSubscriber(msgCh MessageChanType) {
50 hook.mu.Lock()
51 defer hook.mu.Unlock()
52 hook.subscribers = append(hook.subscribers, msgCh)
53 }
54
55
56 func (hook *BrokerHook) RemoveSubscriber(msgCh MessageChanType) {
57 hook.mu.Lock()
58 defer hook.mu.Unlock()
59 hook.subscribers = slices.DeleteFunc(hook.subscribers, func(E MessageChanType) bool {
60 return E == msgCh
61 })
62 }
63
64 var defaultFormatter = &logrus.TextFormatter{DisableColors: true}
65
66
67 func (hook *BrokerHook) SetBrokerFormatter(formatter logrus.Formatter) {
68 hook.mu.Lock()
69 defer hook.mu.Unlock()
70 if formatter == nil {
71 hook.formatter = defaultFormatter
72 } else {
73 hook.formatter = formatter
74 }
75 }
76
77
78 func (hook *BrokerHook) Fire(entry *logrus.Entry) error {
79 if hook.ctx.Err() != nil {
80 return nil
81 }
82 select {
83 case hook.input <- entry:
84
85 case <-time.After(hook.highLoadTimeout):
86
87 }
88 select {
89 case err := <-hook.lastError:
90 return err
91 default:
92 return nil
93 }
94 }
95
96
97 func (hook *BrokerHook) Levels() []logrus.Level {
98 switch hook.level {
99 case "none":
100 return []logrus.Level{}
101 case "debug":
102 return logrus.AllLevels
103 case "info":
104 return []logrus.Level{
105 logrus.PanicLevel,
106 logrus.FatalLevel,
107 logrus.ErrorLevel,
108 logrus.WarnLevel,
109 logrus.InfoLevel,
110 }
111 default:
112 return []logrus.Level{
113 logrus.PanicLevel,
114 logrus.FatalLevel,
115 logrus.ErrorLevel,
116 }
117 }
118 }
119
120
121
122 func (hook *BrokerHook) poll(input <-chan *logrus.Entry) {
123 for {
124 select {
125 case <-hook.ctx.Done():
126 return
127 case entry := <-input:
128 hook.send(entry)
129 }
130 }
131 }
132
133
134 func (hook *BrokerHook) send(entry *logrus.Entry) {
135 if len(hook.subscribers) == 0 {
136 return
137 }
138 hook.mu.Lock()
139 defer hook.mu.Unlock()
140 msg, err := hook.formatter.Format(entry)
141 for _, subscriber := range hook.subscribers {
142 select {
143 case subscriber <- MessageType(msg):
144 default:
145
146 }
147 }
148 if err != nil {
149 select {
150 case hook.lastError <- err:
151
152 default:
153
154 }
155 }
156 }
157