Go进阶55:Go语言SSE server handler
1. Server Sent Event Handler
stream
是SSE handler Wrapper
import (
"bytes"
"github.com/gin-contrib/sse"
"github.com/gin-gonic/gin"
"io"
"log/slog"
"time"
)
type serviceHandler func(c *gin.Context, eventsChan chan<- sse.Event) error
// stream SSE 流式输出的帮助函数
func stream(serviceFn serviceHandler) gin.HandlerFunc {
return func(c *gin.Context) {
defer c.Request.Body.Close()
bs, _ := io.ReadAll(c.Request.Body)
c.Request.Body = io.NopCloser(bytes.NewBuffer(bs))
c.Writer.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
c.Writer.Header().Set("Transfer-Encoding", "chunked")
eventsChan := make(chan sse.Event)
stopSigChan := make(chan struct{}, 1)
go doSsePing(eventsChan, stopSigChan)
go doServiceFn(c, serviceFn, eventsChan, stopSigChan)
c.Stream(func(w io.Writer) bool {
if ev, ok := <-eventsChan; ok {
c.Render(-1, ev)
return true
}
return false
})
}
}
func doServiceFn(c *gin.Context, serviceFn serviceHandler, eventsChan chan<- sse.Event, stopSigChan chan<- struct{}) {
defer func() {
if r := recover(); r != nil {
slog.ErrorContext(c, "serviceFn", "panic", r)
}
stopSigChan <- struct{}{} // notify ping goroutine( doSsePing ) to stop
close(eventsChan) // notify sse goroutine(c.Stream) to stop
}()
err := serviceFn(c, eventsChan) //这里是关键,将业务逻辑的处理函数传入
if err != nil {
msg := err.Error()
slog.ErrorContext(c, "serviceFn", "err", msg)
eventsChan <- sse.Event{Event: "err", Data: msg}
}
}
func doSsePing(msgChan chan<- sse.Event, stopPingChan <-chan struct{}) {
tk := time.NewTicker(time.Second * 15)
defer tk.Stop()
msgChan <- sse.Event{Event: "ping", Data: ""}
for {
select {
case <-tk.C:
msgChan <- sse.Event{Event: "ping", Data: ""}
case <-stopPingChan:
return
}
}
}
2. Demo Sse Handler
import (
"github.com/gin-contrib/sse"
"github.com/gin-gonic/gin"
"time"
)
var demo serviceHandler = func(c *gin.Context, eventsChan chan<- sse.Event) error {
time.Sleep(5 * time.Second)
for i := 0; i < 50; i++ {
eventsChan <- sse.Event{Event: "message", Data: "hello 世界"}
eventsChan <- sse.Event{Event: "message", Data: map[string]any{"a": "w", "i": 0.1, "b": true, "zh": "是的"}}
time.Sleep(time.Second)
}
return nil
}
3. Usage
func LoadRoutes() *gin.Engine {
root := gin.Default()
root.GET("demo", stream(demo))
return root
}