相关代码: grpc-simple

rpc 简介

单体应用微服务化之后,之间会有调用需求,可以通过 http or rpc 方式完成

http 是和数据传输没关系的,数据传输是 tcp/ip 层做的, 而 http 负责定义好 clientserver 交互数据的格式, 定义好后就可以有四层进行传输了。

RPC 则是远程过程调用,它不是一种协议,而是一种使用习惯/方式,所以和 HTTP 是两个维度的东西,RPC 也涉及到交互,需要约定好格式。它的实现可以基于http,也可以自定义格式,再由 tcp 四层传输,通常底层是http/2实现.

so why rpc?

http 是可行么? 答案时肯定的,考虑使用习惯和效率而选用 rpc

  • 习惯: RPC 调用是因为服务的拆分,服务被拆分后非常期望代码上还和之前本地调用相同,因此需要 RPC 框架,来屏蔽这些底层调用细节
  • 效率:
    • http是通用规范,所以第三方库都使用http作为通信协议。对于专用内部组件,完全可以改成自定义的 rpc 这种协议
    • http的json格式的数据是冗余,序列成二进制后许多重复的字段,浪费带宽,这其实是为了用户的可读性设计而rpc使用的是proto buf格式,放弃了可读性而追求效率。进行两个方面优化,
      • Protobuf 选用了 VarInts 对数字进行编码,解决了效率问题
      • 给每个字段指定一个整数编号,传输的时候只传字段编号,解决了冗余问题

csi 中的应用

image

csi的各个sidecar之间的调用都是通过grpc, 本次着重分析使用的接口文件.
csi中只使用了csi/csi.pb.go,而官方还会有另外一个csi_grpc.pb.go, 官方给出了这样的解释。
从1.30.x开始,protoc会生成两个单独的文件,将入参、出参、序列化代码存于pb.go,而grpc.pb.go存放接口

1
2
3
4
5
6
From 1.30.x gRPC, gRPC protoc generates *.pb.go and *_grpc.pb.go separately:

*.pb.go, which contains all the protocol buffer code to populate, serialize, and retrieve request and response message types.
*_grpc.pb.go, which contains the following:
An interface type (or stub) for clients to call with the methods defined in the service.
An interface type for servers to implement, also with the methods defined in the service.

自己动手,丰衣足食

Prerequisites

  • 安装Go
  • protoc
    • protocol compile
    • protoc是用c++编写,用于编译.proto文件
  • protoc-gen-go
    • 安装Go plugins for protocol compiler
    • 除了安装protoc之外,还需要安装各个语言对应的编译插件。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13

      # PB_REL="https://github.com/protocolbuffers/protobuf/releases"
      # curl -LO $PB_REL/download/v3.15.8/protoc-3.15.8-linux-x86_64.zip
      # unzip protoc-3.15.8-linux-x86_64.zip -d $HOME/.local
      # export PATH="$PATH:$HOME/.local/bin"

      1. install
      $ export GOPROXY=https://goproxy.cn,direct
      $ go get google.golang.org/protobuf/cmd/protoc-gen-go@v1.26
      $ go get google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.1

      2.Update your PATH so that the protoc compiler can find the plugins:
      $ export PATH="$PATH:$(go env GOPATH)/bin"

      protoc 中原生包含了部分语言(java、php、python、ruby等等)的编译插件,但是没有 Go 语言的,所以需要额外安装一个插件。https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/compiler/main.cc

Demo

创建.proto 文件

syntax = "proto3";
option go_package = "google.golang.org/grpc/examples/helloworld/helloworld";

package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

编译proto

1
2
# Syntax: protoc [OPTION] PROTO_FILES
$ protoc --proto_path=IMPORT_PATH --go_out=OUT_DIR --go_opt=paths=source_relative path/to/file.proto

这里简单介绍编译参数:

  • –proto_path或者-I:指定 import 的其他proto路径,可以指定多个参数,编译时按顺序查找,不指定时默认查找当前目录。
    • .proto 文件中也可以引入其他 .proto 文件,这里主要用于指定被引入文件的位置。
  • –go_out:golang编译支持,指定输出文件路径
    其他语言则替换即可,比如 –java_out 等等
  • –go_opt:可选参数,比如–go_opt=paths=source_relative就是表明生成文件输出使用相对路径。
  • path/to/file.proto :被编译的 .proto 文件放在最后面
$ helloworld.proto

# protoc --go_out=. \
       --go_opt=paths=source_relative \
       --go-grpc_out=. \
       --go-grpc_opt=paths=source_relative \
       ./helloworld.proto

编译后会生成两个文件。

helloworld.pb.go
helloworld_grpc.pb.go

编译过程可以分成两步

  • protoc解析.protoc文件,编译成 protobuf 的原生数据结构保存在内存中;
  • 调用特定的语言插件,渲染输出为特定语言的模板。

编写client和server侧

两侧的main.go,可以参考官方例子 helloworld

1
2
3
4
5
6
7
8
9
10
11
12
13
# git clone -b v1.52.0 --depth 1 https://github.com/grpc/grpc-go
# cd grpc-go/examples/helloworld
# tree
.
├── greeter_client
│ └── main.go // client
├── greeter_server
│ └── main.go // server
└── helloworld
├── helloworld_grpc.pb.go //
├── helloworld.pb.go //
└── helloworld.proto // protocol buffers协议规范

grpc 高级特性

拦截器实现原理分析

拦截器分为服务端拦截器和客户端拦截器,所以一共有以下4种类型:

  • grpc.UnaryServerInterceptor
  • grpc.StreamServerInterceptor
  • grpc.UnaryClientInterceptor
  • grpc.StreamClientInterceptor

client 侧的拦截器为例进行分析

1
2
3
4
5
6
# client
conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(creds), grpc.WithStreamInterceptor(streamInterceptor))

# server
s := grpc.NewServer(grpc.Creds(creds), grpc.StreamInterceptor(streamInterceptor))

多个形如 grpc.WithXXXInterceptor 方法作为函数入参,其实则是动态参数的DialOption对象, 语法糖。

with 传入的拦截器作为 Dial() 的参数,分析 Dial 呼叫的流程

func Dial(target string, opts ...DialOption) {
    return DialContext(context.Background(), target, opts...) // 一个opts表示一个Withxxx参数,这里有多个,通过加三个点,告知下一层
}

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {

    cc := &ClientConn{
            ...
            dopts:             defaultDialOptions(),
            ...
    }
   // opts 是with传入的可选拦截器
    for _, opt := range opts {
        opt.apply(&cc.dopts)
    }

}

执行每一个 DialOption 对象的 apply() 方法,入参是 default dialoption 对象

主要流程结束,再返回头来分析 grpc.WithXXXInterceptor, 以 grpc.WithUnaryInterceptor(unaryInterceptor) 为例子。

// 闭包的用法
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
    return newFuncDialOption(func(o *dialOptions) {
        // 将拦截器方法 赋值给dialOptions的unaryInt field
        o.unaryInt = f
    })
}

func newFuncDialOption(f func(*dialOptions)) *funcDialOption {
    return &funcDialOption{
        f: f,
    }
}

将外部拦截器 f,传入函数对象且作为入参。
这里直接返回匿名函数,内部是f(o)执行不就可以了吗?

实际是将匿名函数传入NewFuncXXX, 在newFuncXXX中,实例化funcDialOption对象,匿名函数作为其属性而已。

该类型对象及其方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
type funcDialOption struct {
f func(*dialOptions)
}

func (fdo *funcDialOption) apply(do *dialOptions) {
fdo.f(do)
}

func newFuncDialOption(f func(*dialOptions)) *funcDialOption {
return &funcDialOption{
f: f,
}
}

再回到执行处

1
2
3
   for _, opt := range opts {
opt.apply(&cc.dopts)
}

调用 funcDialOption 对象的 apply 方法。
实际是执行外部匿名函数

1
2
3
func(o *dialOptions) {
o.unaryInt = f
}

所以最后实现的功能是将拦截器方法赋值给 dialOptions 对象的 unaryInt field, 拦截器 f 是没有执行的。

拦截器何时执行呢?

1
2
3
4
5
6
7
8
9
10
11
12
// call.go
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
opts = combine(cc.dopts.callOptions, opts)
// 有拦截器, 通过拦截器执行invoke
if cc.dopts.unaryInt != nil {
return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
}
// 直接执行 invoke
return invoke(ctx, method, args, reply, cc, opts...)
}

没有拦截器则直接执行invoke
有拦截器,则由 UnaryInterceptor 拦截器去执行, invoke作为参数。

client 侧经常看到的c.cc.Invoke(ctx, "/helloworld.Hello/UnaryHello", in, out, opts...) 执行的就是该方法.

clientConn 对象就是 conn, err := grpc.Dial(address,...) 呼叫后返回conn链接对象.

UnaryInterceptor 拦截器的实现

1
2
3
4
5
6
7
8
9
func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// pre-processing, 拦截器之前
start := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...) // invoking RPC method
// post-processing,拦截器之后
end := time.Now()
log.Printf("RPC: %s, req:%v start time: %s, end time: %s, err: %v", method, req, start.Format(time.RFC3339), end.Format(time.RFC3339), err)
return err
}

可以看到,最后执行的还是传入的 invoke 方法, 在 invoke 之前或之后可以定义一些逻辑,从而实现了 python 装饰器的功能,比如动态添加一些逻辑。

不管是普通rpc, 还是拦截器,最后调用的是invoke方法

1
2
3
4
5
6
7
8
9
10
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
if err := cs.SendMsg(req); err != nil {
return err
}
return cs.RecvMsg(reply)
}

依赖底层的clientStream对象,通过SendMsg, RecvMsg方法实现请求。

函数名作为变量的理解

  1. 入参是UnaryClientInterceptor, 函数名表示一种类型,类似int.
  2. func(o *dialOptions)也是一种类型,等价于函数名,只是匿名而已,后面括号表示实例化,整体
    表示一个函数实例化对象。
    1
    2
    3
    func(o *dialOptions) {
    o.unaryInt = f
    }
    个人认为没必要再封装一层funcDialOption
1
2
3
4
5
6
7
8
9
10
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
return func(o *dialOptions) {
o.unaryInt = f
}
}

调用时
for _, opt := range opts {
opt(&cc.dopts)
}

gRPC-Gateway

这种 protoc 插件使 rpc server 能对外提供 RESTful HTTP APIgrpc service

安装插件

用于将protobuf内容生成http方法
go get github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway

整体流程

大致就是以 .proto 文件为基础,编写插件对 protoc 进行扩展,编译出不同语言不同模块的源文件。

1)首先定义 .proto 文件;
2)然后由 protoc.proto 文件编译成 protobuf 格式的数据;
3)将 2 中编译后的数据传递到各个插件,生成对应语言、对应模块的源代码。

  • Go Plugins 用于生成 .pb.go 文件
  • gRPC Plugins 用于生成 _grpc.pb.go
  • gRPC-Gateway 则是 pb.gw.go, proxy功能的实现

原理图

image.png

client 发起 http req, 将七层的req类比ip层,req先到gateway, 由gateway转发请求到具体的grpc service.

为了让grpc service支持restful请求
1.修改 .proto 文件, 在原先 rpc 方法添加其他调用选项 option, 声明使用 google.api.http, methoh, url, body等参数, 形如

1
2
3
4
5
6
7
8
9
service Hello {
// UnaryAPI
rpc UnaryHello(HelloRequest) returns (HelloReply) {
option (google.api.http) = {
post: "/v1/UnaryHello"
body: "*"
};
}
}

gRPC-Gateway 插件才能生成对应 http 方法

测试

运行server

1
2
3
 # go run main.go
2023/05/30 11:21:44 Serving gRPC on 0.0.0.0:50051
2023/05/30 11:21:44 Serving gRPC-Gateway on http://0.0.0.0:8080

client通过grpc方式

1
2
% go run main.go
2023/05/30 11:21:47 Greeting: hello world

client通过http方式

1
2
% curl -X POST -k http://0.0.0.0:8080/v1/UnaryHello -d '{"name": " world"}'
{"message":"hello world"}

grpc server同时提供 gPRC 服务和 HTTP 服务

原理分析

grpc server 仍然运行,因此 rpc 请求依然可以给予 service, 额外增加了一个http server, 由其向 rpc server 发起 rpc 请求, 具体方法是

  1. runtime.NewServeMux() New了一个 servermux
  2. 通过pb.RegisterHelloHandler 注册pathhandler.
  3. servermux配置为http serverhandler
1
2
3
4
5
6
7
8
9
10
11
12
13
func RegisterHelloHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error {
return RegisterHelloHandlerClient(ctx, mux, NewHelloClient(conn))
}

func RegisterHelloHandlerClient(ctx context.Context, mux *runtime.ServeMux, client HelloClient) error {

mux.Handle("POST", pattern_Hello_UnaryHello_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
resp, md, err := request_Hello_UnaryHello_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
forward_Hello_UnaryHello_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
1
2
3
4
5
6
7
8
9
10
# 路由path, 用于curl  /v1/UnaryHello
var (
pattern_Hello_UnaryHello_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "UnaryHello"}, ""))
)

func request_Hello_UnaryHello_0(ctx context.Context, marshaler runtime.Marshaler, client HelloClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
....
msg, err := client.UnaryHello(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}

restful 请求先到http server, 由其 handler 通过 grpc client发起 rpc 请求

使用 context 进行超时控制

超时控制可以分成两种

  1. deadline: server侧执行超时,client被迫关闭。
  2. cancel: client发现错误主动关闭,不需要等server侧返回错误。

client 调用时传入

1
2
3
4
5
6
7
// 1秒超时
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

// call时传入ctx
_, err := c.UnaryHello(ctx, req)
got := status.Code(err)

只要 server 侧接口返回超过1s, client call 就返回 DeadlineExceeded error

第二种方式是不放在 defer 里,而是 client 直接调用 cancel()

小结:

  1. 类似 goroutinectx, context 方式也适用于 grpc, 其内部已经做了 context 的处理
  2. ctx 可以使用context.WithDeadline()或者context.WithTimeout(),二者效果类似,只是传递的参数不一样。 前者是在指定时刻超时,后者是过多久会超时。

grpc负载均衡(TODO)

grpc 服务发现

总结

  1. 编写 x.proto 定义文件
  2. 安装特定语言的插件,生成 pb.go, gprc.pb.go, 使用的也是这两个文件,不包括 x.proto, 这两个文件通常作为 lib,托管在github 上,里面定义好了 clientserver intrface. client 已经实现好了,可以直接用,需要自己实现 server. 三者名字关系一致,不受里面配置的影响。x.proto, x.pb.go, x_grpc.pb.go,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type helloClient struct {
cc grpc.ClientConnInterface
}

func NewHelloClient(cc grpc.ClientConnInterface) HelloClient {
return &helloClient{cc}
}

func (c *helloClient) UnaryHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
out := new(HelloReply)
err := c.cc.Invoke(ctx, "/helloworld.Hello/UnaryHello", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
  1. 实现 client/main.go, 主要步骤是 grpc.Dial(addr), pb.NewClient, client.callMethod()

  2. 实现 server/main.go,主要步骤是s:=grpc.NewServer()pb.RegisterHelloServer(s, &Hello{})注册服务Hello对象实现grpc接口,net.Listen(),rpcserver.Serve(listen)

  3. 根据 x.proto中定义,可以确定请求路径是 ·/${包名}.${服务名}/${接口名}

Ref

  1. https://www.lixueduan.com/posts/grpc/01-protobuf/#3-go-plugins
  2. https://www.51cto.com/article/706308.html
  3. https://www.jianshu.com/p/fe5ccfc5d7bd
  4. https://taoshu.in/grpc.html