波波技术栈
article

Go 语言实现 SSE (Server-Sent Events) 完整教程

本文详细介绍了如何使用 Go 语言构建基于 SSE(Server-Sent Events)协议的服务端推送系统。教程涵盖项目结构、后端实现(包括客户端管理、消息广播、连接处理)以及前端测试页面。通过 Go 标准库 net/http 实现高性能、支持多客户端并发连接的实时数据推送服务。

SSE(Server-Sent Events)是一种允许服务器向浏览器推送更新的技术。与 WebSocket 不同,SSE 是单向的(仅服务器到客户端),基于标准的 HTTP 协议,实现简单且原生支持自动重连。

本教程将使用 Go 语言构建一个高性能、支持多客户端广播的 SSE 服务,并提供一个现代化的前端测试页面。

1. 项目结构

我们将创建一个简单的 Go 模块,包含后端服务和前端页面。

text
go-sse-demo/
├── main.go          # Go 后端主程序
├── go.mod           # Go 模块依赖文件
└── index.html       # 前端测试页面

2. 后端实现 (Go)

Go 标准库 net/http 对 SSE 的支持非常友好。我们需要做的是:

  1. 设置正确的响应头 (Content-Type: text/event-stream)。
  2. 保持连接打开。
  3. 按照 SSE 格式 (data: ...\n\n) 写入数据并刷新缓冲区。

package main

import (
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"
)

// Client 代表一个连接的客户端
type Client struct {
	ID     string
	Events chan string
}

// Server 管理所有客户端连接
type Server struct {
	clients    map[string]*Client
	mu         sync.RWMutex
	register   chan *Client
	unregister chan *Client
	broadcast  chan string
}

// NewServer 创建一个新的SSE服务器实例
func NewServer() *Server {
	return &Server{
		clients:    make(map[string]*Client),
		register:   make(chan *Client),
		unregister: make(chan *Client),
		broadcast:  make(chan string, 256),
	}
}

// Run 启动服务器后台协程,处理连接管理和消息广播
func (s *Server) Run() {
	for {
		select {
		case client := <-s.register:
			s.mu.Lock()
			s.clients[client.ID] = client
			s.mu.Unlock()
			log.Printf("Client connected: %s", client.ID)

		case client := <-s.unregister:
			s.mu.Lock()
			if _, ok := s.clients[client.ID]; ok {
				delete(s.clients, client.ID)
				close(client.Events)
			}
			s.mu.Unlock()
			log.Printf("Client disconnected: %s", client.ID)

		case message := <-s.broadcast:
			s.mu.RLock()
			for _, client := range s.clients {
				// 非阻塞发送,防止慢客户端阻塞广播
				select {
				case client.Events <- message:
				default:
					log.Printf("Dropped message for client %s", client.ID)
				}
			}
			s.mu.RUnlock()
		}
	}
}

// SSEHandler 处理SSE连接的HTTP Handler
func (s *Server) SSEHandler(w http.ResponseWriter, r *http.Request) {
	// 1. 设置SSE必要的响应头
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	// 2. 获取Flusher接口,用于强制刷新缓冲区
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
		return
	}

	// 3. 注册新客户端
	client := &Client{
		ID:     fmt.Sprintf("%d", time.Now().UnixNano()),
		Events: make(chan string, 10),
	}
	s.register <- client

	// 4. 确保断开连接时清理资源
	defer func() {
		s.unregister <- client
	}()

	// 5. 通知客户端连接已建立
	fmt.Fprintf(w, ": connected\n\n")
	flusher.Flush()

	// 6. 监听客户端事件通道和请求上下文
	ctx := r.Context()
	for {
		select {
		case msg, ok := <-client.Events:
			if !ok {
				// 通道关闭,退出
				return
			}
			// 格式化SSE数据: data: <message>\n\n
			fmt.Fprintf(w, "data: %s\n\n", msg)
			flusher.Flush()

		case <-ctx.Done():
			// 客户端断开连接
			return
		}
	}
}

// BroadcastHandler 模拟触发广播消息的接口
func (s *Server) BroadcastHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	message := fmt.Sprintf("Message from server at %s", time.Now().Format(time.RFC3339))
	s.broadcast <- message

	w.WriteHeader(http.StatusOK)
	w.Write([]byte("Event broadcasted"))
}

func main() {
	server := NewServer()

	// 启动后台管理协程
	go server.Run()

	// 模拟定期自动发送消息
	go func() {
		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()
		counter := 0
		for range ticker.C {
			counter++
			server.broadcast <- fmt.Sprintf("Auto heartbeat #%d", counter)
		}
	}()

	// 注册路由
	http.HandleFunc("/stream", server.SSEHandler)
	http.HandleFunc("/broadcast", server.BroadcastHandler)

	// 提供前端页面
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		http.ServeFile(w, r, "index.html")
	})

	log.Println("SSE Server starting on :8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}

3. 前端实现 (HTML + JS)

为了提供优秀的用户体验,我们使用 TailwindCSS 构建一个现代化的界面,并通过原生 JavaScript 的 EventSource API 连接后端。


<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Go SSE 实时演示</title>
    <!-- 引入 TailwindCSS -->
    <script src="https://cdn.tailwindcss.com"></script>
    <!-- 引入 Font Awesome -->
    <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.4.0/css/all.min.css">
    <style>
        /* 自定义滚动条样式 */
        ::-webkit-scrollbar {
            width: 8px;
        }
        ::-webkit-scrollbar-track {
            background: #f1f1f1; 
        }
        ::-webkit-scrollbar-thumb {
            background: #888; 
            border-radius: 4px;
        }
        ::-webkit-scrollbar-thumb:hover {
            background: #555; 
        }
        .fade-in {
            animation: fadeIn 0.3s ease-in-out;
        }
        @keyframes fadeIn {
            from { opacity: 0; transform: translateY(10px); }
            to { opacity: 1; transform: translateY(0); }
        }
    </style>
</head>
<body class="bg-gray-100 min-h-screen flex items-center justify-center p-4 font-sans">

    <div class="bg-white rounded-xl shadow-2xl w-full max-w-2xl overflow-hidden">
        <!-- 头部 -->
        <div class="bg-gradient-to-r from-blue-600 to-indigo-700 p-6 text-white flex justify-between items-center">
            <div>
                <h1 class="text-2xl font-bold"><i class="fas fa-server mr-2"></i>Go SSE 实时推送</h1>
                <p class="text-blue-100 text-sm mt-1">基于 Go net/http 的高性能单向通信示例</p>
            </div>
            <div id="status-badge" class="px-3 py-1 rounded-full text-xs font-semibold bg-red-500 text-white flex items-center">
                <span class="w-2 h-2 bg-white rounded-full mr-2 animate-pulse"></span>
                未连接
            </div>
        </div>

        <!-- 消息区域 -->
        <div class="p-6">
            <div class="flex justify-between items-center mb-4">
                <h2 class="text-lg font-semibold text-gray-700">实时消息流</h2>
                <button onclick="clearMessages()" class="text-sm text-gray-500 hover:text-red-500 transition-colors">
                    <i class="fas fa-trash-alt mr-1"></i> 清空
                </button>
            </div>
        
            <div id="message-container" class="h-64 overflow-y-auto border border-gray-200 rounded-lg p-4 bg-gray-50 space-y-3">
                <div class="text-center text-gray-400 mt-20 italic" id="empty-state">
                    等待连接...
                </div>
            </div>

            <!-- 控制按钮 -->
            <div class="mt-6 flex gap-4">
                <button id="btn-connect" onclick="connectSSE()" class="flex-1 bg-blue-600 hover:bg-blue-700 text-white font-bold py-3 px-4 rounded-lg transition duration-300 shadow-md flex justify-center items-center">
                    <i class="fas fa-plug mr-2"></i> 连接服务
                </button>
                <button id="btn-disconnect" onclick="disconnectSSE()" disabled class="flex-1 bg-gray-300 text-gray-500 font-bold py-3 px-4 rounded-lg transition duration-300 cursor-not-allowed flex justify-center items-center">
                    <i class="fas fa-times-circle mr-2"></i> 断开连接
                </button>
                <button onclick="triggerBroadcast()" class="flex-1 bg-indigo-600 hover:bg-indigo-700 text-white font-bold py-3 px-4 rounded-lg transition duration-300 shadow-md flex justify-center items-center">
                    <i class="fas fa-bullhorn mr-2"></i> 手动广播
                </button>
            </div>
        </div>

        <!-- 页脚 -->
        <div class="bg-gray-50 p-4 text-center text-xs text-gray-500 border-t">
            Powered by Go & Server-Sent Events
        </div>
    </div>

    <script>
        let eventSource = null;
        const container = document.getElementById('message-container');
        const statusBadge = document.getElementById('status-badge');
        const btnConnect = document.getElementById('btn-connect');
        const btnDisconnect = document.getElementById('btn-disconnect');
        const emptyState = document.getElementById('empty-state');

        function updateStatus(connected) {
            if (connected) {
                statusBadge.className = "px-3 py-1 rounded-full text-xs font-semibold bg-green-500 text-white flex items-center";
                statusBadge.innerHTML = '<span class="w-2 h-2 bg-white rounded-full mr-2"></span> 已连接';
                btnConnect.disabled = true;
                btnConnect.classList.add('opacity-50', 'cursor-not-allowed');
                btnDisconnect.disabled = false;
                btnDisconnect.classList.remove('bg-gray-300', 'text-gray-500', 'cursor-not-allowed');
                btnDisconnect.classList.add('bg-red-500', 'hover:bg-red-600', 'text-white', 'shadow-md');
                if(emptyState) emptyState.style.display = 'none';
            } else {
                statusBadge.className = "px-3 py-1 rounded-full text-xs font-semibold bg-red-500 text-white flex items-center";
                statusBadge.innerHTML = '<span class="w-2 h-2 bg-white rounded-full mr-2 animate-pulse"></span> 未连接';
                btnConnect.disabled = false;
                btnConnect.classList.remove('opacity-50', 'cursor-not-allowed');
                btnDisconnect.disabled = true;
                btnDisconnect.classList.add('bg-gray-300', 'text-gray-500', 'cursor-not-allowed');
                btnDisconnect.classList.remove('bg-red-500', 'hover:bg-red-600', 'text-white', 'shadow-md');
            }
        }

        function addMessage(text, type = 'info') {
            const div = document.createElement('div');
            div.className = "fade-in p-3 rounded-lg text-sm shadow-sm border-l-4 " + 
                (type === 'error' ? "bg-red-50 border-red-500 text-red-700" : 
                 type === 'system' ? "bg-gray-100 border-gray-400 text-gray-600 italic" :
                 "bg-white border-blue-500 text-gray-800");
        
            const time = new Date().toLocaleTimeString();
            div.innerHTML = `<span class="text-xs opacity-50 mr-2">[${time}]</span> ${text}`;
        
            container.appendChild(div);
            container.scrollTop = container.scrollHeight;
        }

        function connectSSE() {
            if (eventSource) return;

            addMessage("正在建立连接...", "system");
        
            try {
                eventSource = new EventSource('/stream');

                eventSource.onopen = () => {
                    updateStatus(true);
                    addMessage("连接成功!等待服务器推送...", "system");
                };

                eventSource.onmessage = (e) => {
                    addMessage(e.data);
                };

                eventSource.onerror = (err) => {
                    console.error("SSE Error:", err);
                    // EventSource 会自动重连,除非调用 close()
                    if (eventSource.readyState === EventSource.CLOSED) {
                        updateStatus(false);
                        addMessage("连接已关闭", "error");
                    } else {
                        addMessage("连接错误,尝试重连...", "error");
                    }
                };
            } catch (e) {
                addMessage("创建连接失败: " + e.message, "error");
            }
        }

        function disconnectSSE() {
            if (eventSource) {
                eventSource.close();
                eventSource = null;
                updateStatus(false);
                addMessage("用户主动断开连接", "system");
            }
        }

        function triggerBroadcast() {
            fetch('/broadcast', { method: 'POST' })
                .then(res => {
                    if(res.ok) addMessage("广播请求已发送", "system");
                    else addMessage("广播请求失败", "error");
                })
                .catch(err => addMessage("网络错误: " + err, "error"));
        }

        function clearMessages() {
            container.innerHTML = '<div class="text-center text-gray-400 mt-20 italic" id="empty-state">等待连接...</div>';
        }
    </script>
</body>
</html>

4. 代码功能与特点说明

后端 (Go)

  • 并发安全‌:使用 sync.RWMutex 保护客户端映射表,确保在高并发连接下的数据安全。
  • Channel 通信‌:利用 Go 的 Channel 机制实现客户端注册、注销和消息广播的解耦,符合 "Share memory by communicating" 的最佳实践。
  • 非阻塞广播‌:在向客户端发送消息时使用 select 默认分支,防止某个慢客户端阻塞整个广播循环,提升系统稳定性。
  • 标准 SSE 格式‌:严格遵循 data: ...\n\n 格式,并设置 Content-Type: text/event-stream 等关键 Header。
  • 资源清理‌:通过 deferr.Context().Done() 确保客户端断开时及时清理内存,防止泄漏。

前端 (HTML/JS)

  • 现代化 UI‌:使用 TailwindCSS 构建响应式界面,包含渐变色彩、阴影和圆角设计,视觉体验优雅。
  • 实时状态反馈‌:顶部状态栏实时显示连接状态(已连接/未连接),并有相应的颜色变化。
  • 交互优化‌:消息列表自动滚动到底部,新消息带有淡入动画,提供清空消息和手动触发广播的功能。
  • 原生支持‌:直接使用浏览器原生的 EventSource API,无需引入额外的重型库,轻量且高效。

5. 如何运行

  1. 确保你已安装 Go 1.21 或更高版本。
  2. 将上述代码保存为对应的文件 (main.go, go.mod, index.html)。
  3. 在终端进入项目目录,运行:
    bash
    go run main.go
    
  4. 打开浏览器访问 http://localhost:8080
  5. 点击“连接服务”按钮,你将看到每 5 秒自动收到的心跳消息,也可以点击“手动广播”立即触发一条消息。