前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >grpc-go之负载均衡(七)

grpc-go之负载均衡(七)

原创
作者头像
Johns
修改2022-10-12 17:16:23
9390
修改2022-10-12 17:16:23
举报
文章被收录于专栏:代码工具

介绍

gRPC 中的负载平衡基于每个调用而不是每个连接发生。即使所有请求都来自单个客户端,我们仍然希望它们在所有服务器之间进行负载平衡。

gRPC 负载均衡包括客户端负载均衡和服务端负载均衡两种方向, gRPC 的客户端负载均衡的流程如下

  • 域名解析 启动时,gRPC 客户端会针对服务器名称发出名称解析请求。该名称将解析为 IP 地址列表、指示使用哪个客户端负载平衡策略 关于gRPC-go域名解析的内容可以参考《grpc-go之NameResolver(六)
  • 实例化负载平衡策略 客户端实例化负载平衡策略, 负载平衡策略为服务器的 IP 地址创建一组子通道。并监视子通道的连接状态并决定每个子通道何时应尝试连接。对于每个发送的 RPC,负载平衡策略决定 RPC 应该发送到哪个子通道。

负载平衡策略

gRPC-Go 中内置了pick_first和round_robin两种算法。

  • pick_first 尝试连接到第一个地址,如果连接成功,则将其用于所有RPC,如果连接失败,则尝试下一个地址(并继续这样做,直到一个连接成功)。
  • round_robin 连接到所有地址,并依次向每个后端发送一个RPC。例如,第一个RPC将发送到backend-1,第二个RPC将发送到backend-2,第三个RPC将再次发送到backend-1。

案例说明

name_reslover/grpc_reslover.go

代码语言:go
复制
package grpc_resolver

import (
	"context"
	"fmt"
	"google.golang.org/grpc/resolver"
	"sync"
	"time"
)

const (
	ExampleScheme      = "ns"
	ExampleServiceName = "resolver.example.grpc.io"
	backendAddr1       = "localhost:50051"
	backendAddr2       = "localhost:50052"
	minNSResRate       = 10 * time.Second
)

type exampleResolverBuilder struct{}

func (*exampleResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	r := &exampleResolver{
		target: target,
		cc:     cc,
		addrsStore: map[string][]string{
			ExampleServiceName: { backendAddr2},
		},
		rn:     make(chan struct{}, 1),
		ctx:    ctx,
		cancel: cancel,
	}
	r.wg.Add(1)
	// 开启监听, 一般注册中心有变化, 则及时更新地址信息
	go r.watcher()
	r.ResolveNow(resolver.ResolveNowOptions{})
	return r, nil
}
func (*exampleResolverBuilder) Scheme() string { return ExampleScheme }

// exampleResolver is a
// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver).
type exampleResolver struct {
	target     resolver.Target
	cc         resolver.ClientConn
	addrsStore map[string][]string
	rn         chan struct{}
	ctx        context.Context
	cancel     context.CancelFunc
	wg         sync.WaitGroup
}

func (r *exampleResolver) ResolveNow(o resolver.ResolveNowOptions) {
	select {
	case r.rn <- struct{}{}:
	default:
	}
}
func (r *exampleResolver) Close() {
	r.cancel()
	r.wg.Wait()
}

// 全局 resolverBuild 都存放一个 map 中,key 为 scheme,value 为对应的 resolverBuilder。
// init的时候会把自定义的exampleResolverBuilder注册进去
func init() {
	// Register the example ResolverBuilder. This is usually done in a package's
	// init() function.
	resolver.Register(&exampleResolverBuilder{})
}

// ServiceInstance 定义服务注册中心返回的服务实例
type ServiceInstance struct {
	// 服务名称
	ServiceName string
	// 服务地址(包括了ip和端口)
	Addr string
}

// GetAllInstance 从服务注册中心获取可用的服务实例列表
// 这里为了演示, 简单的直接返回了一个列表,真实场景是需要调服务注册中心的API的
func GetAllInstance() ([]ServiceInstance, error) {
	return []ServiceInstance{
		{ExampleServiceName, backendAddr1},
		{ExampleServiceName, backendAddr2},
	}, nil
}

func (r *exampleResolver) lookup() (*resolver.State, error) {
	instances, err := GetAllInstance()
	addresses := make([]resolver.Address, len(instances))
	for i, instance := range instances {
		addresses[i] = resolver.Address{Addr: instance.Addr, ServerName: instance.ServiceName}
	}
	state := &resolver.State{Addresses: addresses}
	return state, err
}

//watcher backend svr change
func (r *exampleResolver) watcher() {
	defer r.wg.Done()
	for {
		select {
		case <-r.ctx.Done():
			fmt.Println("关闭了")
			return
		case <-r.rn:
		}
		state, err := r.lookup()
		if err != nil {
			r.cc.ReportError(err)
		} else {
			r.cc.UpdateState(*state)
		}

		// 第二个select 用一个 timer 来限制dns更新频率
		t := time.NewTimer(minNSResRate)
		select {
		case <-t.C:
			r.rn <- struct{}{}
		case <-r.ctx.Done():
			t.Stop()
			return
		}
	}
}

client/main.go

代码语言:go
复制
package main

import (
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/oauth"
	"google.golang.org/grpc/examples/data"
	"grpc-demo/helloworld/auth"
	"grpc-demo/helloworld/grpc_validator"
	grpc_resolver "grpc-demo/helloworld/name_reslover"
	"grpc-demo/helloworld/pb"
	"log"
)

func main() {

	// 更多配置信息查看官方文档: https://github.com/grpc/grpc/blob/master/doc/service_config.md
	// service这里语法为<package>.<service> package就是proto文件中指定的package,service也是proto文件中指定的 Service Name。
	// method 可以不指定 即当前service下的所以方法都使用该配置。
	serverPolicy1 := `{
		"methodConfig": [{
		  "name": [{"service": "pb.Greeter","method":"SayHello"}],
		  "retryPolicy": {
			  "MaxAttempts": 5,
			  "InitialBackoff": ".01s",
			  "MaxBackoff": ".01s",
			  "BackoffMultiplier": 1.0,
			  "RetryableStatusCodes": [ "UNAVAILABLE", "DEADLINE_EXCEEDED" ]
		  }
		}]}`

	// 构建一个 PerRPCCredentials。
	// 使用内置的Oauth2
	oauthAuth := oauth.NewOauthAccess(auth.FetchToken())

	// 使用自定一的的身份验证
	userPwdAuth := auth.NewUserPwdAuth()

	// 使用自定一的的身份验证
	jwtAuth := auth.NewJWTAuthToken()

	cred, err := credentials.NewClientTLSFromFile(data.Path("/Users/guirong/go/src/grpc-demo/helloworld/client/ca.crt"),
		"www.ggr.com")
	if err != nil {
		log.Fatalf("failed to load credentials: %v", err)
	}

	fmt.Printf("call BattleService.Battle %s:///%s, use loadBalancingPolicy=pickFirst \n", grpc_resolver.ExampleScheme,
		grpc_resolver.ExampleServiceName)
	pickFirstConn, err := grpc.Dial(
		fmt.Sprintf("%s:///%s", grpc_resolver.ExampleScheme, grpc_resolver.ExampleServiceName),
		grpc.WithDefaultServiceConfig(serverPolicy1),
		grpc.WithTransportCredentials(cred),
		grpc.WithPerRPCCredentials(userPwdAuth),
		grpc.WithPerRPCCredentials(oauthAuth),
		grpc.WithPerRPCCredentials(jwtAuth),
		grpc.WithChainUnaryInterceptor(grpc_validator.UnaryClientInterceptor()),
		grpc.WithStreamInterceptor(grpc_validator.ClientStreamInterceptor()),
	)
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer pickFirstConn.Close()

	client1 := pb.NewBattleServiceClient(pickFirstConn)
	for i := 0; i < 10; i++ {
		bidirectionalStreamBattle(client1)
	}

	serverPolicy2 := `{
		"loadBalancingConfig": [ { "round_robin": {} } ],
		"methodConfig": [{
		  "name": [{"service": "pb.Greeter","method":"SayHello"}],
		  "retryPolicy": {
			  "MaxAttempts": 5,
			  "InitialBackoff": ".01s",
			  "MaxBackoff": ".01s",
			  "BackoffMultiplier": 1.0,
			  "RetryableStatusCodes": [ "UNAVAILABLE", "DEADLINE_EXCEEDED" ]
		  }
		}]}`
	fmt.Printf("call BattleService.Battle %s:///%s, use loadBalancingPolicy=round_robin\n",
		grpc_resolver.ExampleScheme,
		grpc_resolver.ExampleServiceName)
	//ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
	//defer cancel()
	roundRobinConn, err := grpc.Dial(
		fmt.Sprintf("%s:///%s", grpc_resolver.ExampleScheme, grpc_resolver.ExampleServiceName),
		// 配置负载均衡策略, 默认是pickFirst
		grpc.WithDefaultServiceConfig(serverPolicy2),
		grpc.WithTransportCredentials(cred),
		grpc.WithPerRPCCredentials(userPwdAuth),
		grpc.WithPerRPCCredentials(oauthAuth),
		grpc.WithPerRPCCredentials(jwtAuth),
		grpc.WithChainUnaryInterceptor(grpc_validator.UnaryClientInterceptor()),
		grpc.WithStreamInterceptor(grpc_validator.ClientStreamInterceptor()),
	)

	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer roundRobinConn.Close()

	client2 := pb.NewBattleServiceClient(roundRobinConn)
	for i := 0; i < 10; i++ {
		bidirectionalStreamBattle(client2)
	}
}

其中loadBalancingConfig": [ { "round_robin": {} } ]配置就是用来执行负载均衡策略的.

image.png
image.png

输出结果

代码语言:txt
复制
拦截器
拦截器
拦截器
拦截器
2022/10/12 16:50:35 Serving gRPC on 0.0.0.0:50052
2022/10/12 16:50:35 Serving gRPC on 0.0.0.0:50051
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50052
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50052
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50052
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50052
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50052
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50052
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50052
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50052
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50052
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50052
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>

参考

https://github.com/grpc/grpc/blob/master/doc/load-balancing.md

https://www.lixueduan.com/posts/grpc/12-client-side-loadbalance/

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 介绍
  • 负载平衡策略
  • 案例说明
  • 输出结果
  • 参考
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档