Go进阶55:Go语言SSE server handler

Go进阶55:Go语言SSE server handler

1. Server Sent Event Handler

`stream` 是对 SSE handler 的封装(wrapper):它负责设置 SSE 响应头、每 15 秒发送一次 ping 事件,并把业务函数写入 channel 的事件以流的方式输出给客户端。


import (
	"bytes"
	"github.com/gin-contrib/sse"
	"github.com/gin-gonic/gin"
	"io"
	"log/slog"
	"time"
)


// serviceHandler is the signature of a business-logic function that can be
// wrapped by stream. It receives the request's gin context and a send-only
// channel on which it publishes SSE events; a non-nil returned error is logged
// and forwarded to the client as an "err" event by doServiceFn.
type serviceHandler func(c *gin.Context, eventsChan chan<- sse.Event) error

// stream adapts a serviceHandler into a gin.HandlerFunc that answers with a
// Server-Sent Events (text/event-stream) response.
//
// Two producer goroutines feed a single unbuffered events channel: doSsePing
// (periodic "ping" events) and doServiceFn (the events emitted by serviceFn).
// The handler goroutine is the only consumer. The channel is closed exactly
// once, and only after both producers have finished, so no producer can ever
// send on a closed channel.
//
// The handler does not return until the events channel has been closed. That
// guarantees the producers never stay blocked on a send after the client has
// gone away, and that c is not used by a goroutine after the handler returned.
// A serviceFn that wants to stop early on disconnect should watch
// c.Request.Context().
func stream(serviceFn serviceHandler) gin.HandlerFunc {
	return func(c *gin.Context) {
		defer c.Request.Body.Close()

		// Read the whole body up front and replace it with an in-memory copy,
		// so anything that reads c.Request.Body later gets the buffered bytes.
		bs, err := io.ReadAll(c.Request.Body)
		if err != nil {
			slog.ErrorContext(c, "stream", "err", err.Error())
			c.AbortWithStatus(400) // 400 Bad Request: the body could not be read
			return
		}
		c.Request.Body = io.NopCloser(bytes.NewBuffer(bs))

		c.Writer.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
		c.Writer.Header().Set("Cache-Control", "no-cache")
		c.Writer.Header().Set("Connection", "keep-alive")
		c.Writer.Header().Set("Transfer-Encoding", "chunked")

		eventsChan := make(chan sse.Event)
		stopSigChan := make(chan struct{}, 1) // buffered: the stop signal never blocks its sender
		pingDone := make(chan struct{})       // closed once doSsePing has returned

		go func() {
			defer close(pingDone)
			doSsePing(eventsChan, stopSigChan)
		}()
		go func() {
			// Close only after BOTH senders are done: doServiceFn has returned
			// (and signalled the ping goroutine), and the ping goroutine has
			// acknowledged by exiting.
			defer close(eventsChan)
			doServiceFn(c, serviceFn, eventsChan, stopSigChan)
			<-pingDone
		}()

		// Keep consuming until the channel is closed, even if streaming stops
		// early (client gone, or a panic while rendering), so the producers
		// can always make progress and terminate.
		defer func() {
			for range eventsChan {
			}
		}()

		clientGone := c.Request.Context().Done()
		c.Stream(func(w io.Writer) bool {
			select {
			case ev, ok := <-eventsChan:
				if !ok {
					return false // both producers finished
				}
				c.Render(-1, ev)
				return true
			case <-clientGone:
				// Stop right away instead of waiting for the next event.
				return false
			}
		})
	}
}

// doServiceFn runs serviceFn and reports its outcome.
//
// A non-nil error returned by serviceFn is logged and sent to the client as an
// "err" event. A panic inside serviceFn is recovered and logged. In every case
// a value is sent on stopSigChan on exit to tell doSsePing to stop.
// doServiceFn does not close eventsChan; the caller owns the channel.
func doServiceFn(c *gin.Context, serviceFn serviceHandler, eventsChan chan<- sse.Event, stopSigChan chan<- struct{}) {
	defer func() {
		if r := recover(); r != nil {
			slog.ErrorContext(c, "serviceFn", "panic", r)
		}
		stopSigChan <- struct{}{} // notify the ping goroutine (doSsePing) to stop
	}()

	// This is the key step: the business-logic handler is invoked here.
	if err := serviceFn(c, eventsChan); err != nil {
		msg := err.Error()
		slog.ErrorContext(c, "serviceFn", "err", msg)
		eventsChan <- sse.Event{Event: "err", Data: msg}
	}
}

// doSsePing sends a "ping" event on msgChan immediately and then once every
// 15 seconds, until a value arrives on stopPingChan. Both the send and the
// wait for the next tick watch stopPingChan, so the function returns promptly
// once it is told to stop.
func doSsePing(msgChan chan<- sse.Event, stopPingChan <-chan struct{}) {
	tk := time.NewTicker(time.Second * 15)
	defer tk.Stop()

	ping := sse.Event{Event: "ping", Data: ""}
	for {
		select {
		case msgChan <- ping:
		case <-stopPingChan:
			return
		}
		select {
		case <-tk.C:
		case <-stopPingChan:
			return
		}
	}
}

2. Demo SSE Handler


import (
	"github.com/gin-contrib/sse"
	"github.com/gin-gonic/gin"
	"time"
)

// demo is a sample serviceHandler. After an initial 5-second delay it emits
// 50 rounds of two "message" events (a plain string and a JSON-encodable map),
// pausing one second after each round.
//
// Every wait and every send also watches the request context, so the handler
// returns as soon as the request is cancelled (for example when the client
// disconnects) instead of sleeping on or blocking on a channel nobody reads.
// Cancellation is not treated as a failure: demo returns nil in that case.
var demo serviceHandler = func(c *gin.Context, eventsChan chan<- sse.Event) error {
	// Capture the channel once so c is not touched again later.
	done := c.Request.Context().Done()

	// wait pauses for d; it reports false if the request was cancelled first.
	wait := func(d time.Duration) bool {
		t := time.NewTimer(d)
		defer t.Stop()
		select {
		case <-t.C:
			return true
		case <-done:
			return false
		}
	}
	// emit delivers one event; it reports false if the request was cancelled
	// before the event could be handed over.
	emit := func(ev sse.Event) bool {
		select {
		case eventsChan <- ev:
			return true
		case <-done:
			return false
		}
	}

	if !wait(5 * time.Second) {
		return nil
	}
	for i := 0; i < 50; i++ {
		if !emit(sse.Event{Event: "message", Data: "hello 世界"}) {
			return nil
		}
		if !emit(sse.Event{Event: "message", Data: map[string]any{"a": "w", "i": 0.1, "b": true, "zh": "是的"}}) {
			return nil
		}
		if !wait(time.Second) {
			return nil
		}
	}
	return nil
}

3. Usage

// LoadRoutes creates a gin engine with gin's default middleware and registers
// the demo SSE handler, wrapped by stream, for GET requests on the "demo" path.
func LoadRoutes() *gin.Engine {
	engine := gin.Default()
	sseDemo := stream(demo)
	engine.GET("demo", sseDemo)
	return engine
}
目录