...
1 package webserver
2
3 import (
4 "net/http"
5 "time"
6
7 "github.com/cybertec-postgresql/pgwatch/v3/internal/log"
8 "github.com/gorilla/websocket"
9 )
10
11 const (
12
13 writeWait = 10 * time.Second
14
15
16 pongWait = 60 * time.Second
17
18
19 pingPeriod = (pongWait * 9) / 10
20 )
21
22 var (
23 upgrader = websocket.Upgrader{
24 ReadBufferSize: 1024,
25 WriteBufferSize: 1024,
26 }
27 )
28
29 func reader(ws *websocket.Conn, done chan struct{}) {
30 defer ws.Close()
31 ws.SetReadLimit(512)
32 err := ws.SetReadDeadline(time.Now().Add(pongWait))
33 if err != nil {
34 return
35 }
36 ws.SetPongHandler(func(string) error { return ws.SetReadDeadline(time.Now().Add(pongWait)) })
37 for {
38 _, _, err = ws.ReadMessage()
39 if err != nil {
40 close(done)
41 break
42 }
43 }
44 }
45
46 func writer(ws *websocket.Conn, l log.LoggerHooker, done <-chan struct{}) {
47 pingTicker := time.NewTicker(pingPeriod)
48 defer func() {
49 pingTicker.Stop()
50 ws.Close()
51 }()
52 msgChan := make(log.MessageChanType)
53 l.AddSubscriber(msgChan)
54 defer l.RemoveSubscriber(msgChan)
55 for {
56 select {
57 case msg := <-msgChan:
58 if ws.SetWriteDeadline(time.Now().Add(writeWait)) != nil ||
59 ws.WriteMessage(websocket.TextMessage, []byte(msg)) != nil {
60 return
61 }
62 case <-pingTicker.C:
63 if ws.SetWriteDeadline(time.Now().Add(writeWait)) != nil ||
64 ws.WriteMessage(websocket.PingMessage, []byte{}) != nil {
65 return
66 }
67 case <-done:
68 return
69 }
70 }
71 }
72
73 func (Server *WebUIServer) serveWsLog(w http.ResponseWriter, r *http.Request) {
74 ws, err := upgrader.Upgrade(w, r, nil)
75 if err != nil {
76 Server.Error(err)
77 http.Error(w, err.Error(), http.StatusBadRequest)
78 return
79 }
80 Server.WithField("reguest", r.URL.String()).Debugf("established websocket connection")
81 l, _ := Server.Logger.(log.LoggerHooker)
82 done := make(chan struct{})
83 go writer(ws, l, done)
84 go reader(ws, done)
85 }
86