心跳最典型的应用场景是是探测服务是否存活,比如在 Zookeeper 中,会使用心跳探测服务是否存货,如果服务已经死亡,会将服务从注册表中删除,避免服务请求路由到一个已经宕机的服务中。
Go 中实现心跳机制可以通过 time.NewTimeTicker()
, 配合 channel
使用,就可以实现一个简单的心跳程序:
import (
"code.byted.org/gopkg/logs"
"context"
"fmt"
"testing"
"time"
)
func sendHeartbeat(heartbeatChan chan<- time.Time) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case t := <-ticker.C:
heartbeatChan <- t
}
}
}
func TestHeartbeat(t *testing.T) {
heartbeatChan := make(chan time.Time)
go doWork([]int{1, 2, 3, 5, 5}, heartbeatChan)
for t := range heartbeatChan {
// 上报心跳包的逻辑,可以在这里实现
fmt.Println("Received heartbeat at", t)
}
fmt.Println("heartbeat finished")
}
func doWork(nums []int, heartbeatChan chan time.Time) {
go sendHeartbeat(heartbeatChan)
defer func() {
close(heartbeatChan)
}()
for num := range nums {
time.Sleep(1 * time.Second)
fmt.Println(num)
}
}
执行结果如下:
=== RUN TestHeartbeat
0
Received heartbeat at 2024-01-31 11:30:46.3363 +0800 CST m=+1.002830252
Received heartbeat at 2024-01-31 11:30:47.335975 +0800 CST m=+2.002513164
1
Received heartbeat at 2024-01-31 11:30:48.336252 +0800 CST m=+3.002795914
2
Received heartbeat at 2024-01-31 11:30:49.33622 +0800 CST m=+4.002768883
3
Received heartbeat at 2024-01-31 11:30:50.336222 +0800 CST m=+5.002775378
4
heartbeat finished
--- PASS: TestHeartbeat (5.00s)
PASS
客户端发送心跳请求, 并通过重试机制。判断重试X次失败认为服务离线 服务端响应心跳请求,通过超时机制。超时X秒未收到心跳则判断客户端离线
package main
import (
"fmt"
"net"
"time"
)
func main() {
// 启动服务端
go startServer()
// 启动客户端
go startClient()
// 保持 main goroutine 活跃,避免程序退出
select {}
}
func startServer() {
ln, err := net.Listen("tcp", ":8080")
if err != nil {
fmt.Println(err)
return
}
defer ln.Close()
for {
conn, err := ln.Accept()
if err != nil {
fmt.Println(err)
continue
}
go handleConnection(conn)
}
}
func handleConnection(conn net.Conn) {
// 使用 bufio 包中的 ReadWriter 类型,方便读写字符串
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
// 启动一个 goroutine,定时发送心跳消息
go func() {
for {
time.Sleep(time.Second)
if _, err := rw.WriteString("heartbeat\n"); err != nil {
fmt.Println(err)
return
}
if err := rw.Flush(); err != nil {
fmt.Println(err)
return
}
}
}()
// 循环读取客户端发送的消息
for {
line, err := rw.ReadString('\n')
if err != nil {
fmt.Println(err)
return
}
fmt.Println("received:", line)
}
}
func startClient() {
conn, err := net.Dial("tcp", "localhost:8080")
if err != nil {
fmt.Println(err)
return
}
defer conn.Close()
//
}