grpc

I recently used gRPC for communication and am writing up my notes so I don't forget them.

Installation

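gRPC supports many languages; see the project's GitHub page for details.
This time I mainly used Go and Java.
For Go, several packages are blocked by the firewall and have to be fetched from mirrors: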

git clone https://github.com/grpc/grpc-go.git $GOPATH/src/google.golang.org/grpc
git clone https://github.com/golang/net.git $GOPATH/src/golang.org/x/net
git clone https://github.com/golang/text.git $GOPATH/src/golang.org/x/text
git clone https://github.com/golang/sys.git $GOPATH/src/golang.org/x/sys
go get -u github.com/golang/protobuf/{proto,protoc-gen-go}
git clone https://github.com/google/go-genproto.git $GOPATH/src/google.golang.org/genproto
cd $GOPATH/src/
go install google.golang.org/grpc

The official documentation also describes in detail how to install with go mod.

protoc

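protoc is a tool that generates code for different languages from proto files.
Releases can be downloaded directly from GitHub: https://github.com/protocolbuffers/protobuf/releases
The easiest route is a prebuilt binary. My system is 64-bit Linux, so I download protoc-x.x.x-linux-x86_64.zip, unpack it, put it somewhere on PATH, and run protoc --version to check the version.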

protoc-gen-go

The plugin protoc uses to generate Go code.

go get -u github.com/golang/protobuf/{proto,protoc-gen-go}

Note that protoc-gen-go must be on the system PATH when you use it:
export PATH=$PATH:$GOPATH/bin

The following command generates the Go file:

protoc --go_out=plugins=grpc:. *.proto

protoc-gen-grpc-java

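The official Java plugin; build and download instructions are at https://github.com/grpc/grpc-java/tree/master/compiler

It did not seem to build with the system's Java 1.7, so a newer version is probably required. Alternatively, download a prebuilt binary, e.g. 1.19.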

The following commands generate the Java files:

protoc --java_out=. *.proto
protoc --plugin=protoc-gen-grpc-java=/xxx/xxx/protoc-gen-grpc-java --grpc-java_out=. *.proto

Adjust the paths to your setup. Java needs at least two generated files.

proto

The official repositories all provide simple examples; here is one with some notes added:

syntax = "proto3";

option java_multiple_files = false; // whether to generate multiple Java files; I chose false to keep git operations simple
option java_package = "com.wishlily.helloworld"; // Java package
option java_outer_classname = "HelloWorldProto"; // Java outer class name

package grpc; // Go package name

// RPC interface
service Greeter {
    rpc SayHello(Hello) returns (Null) {} // client sends a message to the server
    rpc SayBye(Null) returns (stream Bye) {} // can serve as a server-to-client callback channel
}

message Null {} // even an empty payload needs a message type; alternatively import google.protobuf.Empty from the official package

message Hello {
    string name = 1;
    string msg = 2;
    int64 num = 3;
}

message Bye {
    enum Number {
        ONE = 0;
        TWO = 1;
    }
    Number num = 1;
    string msg = 2;
}
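
To make the later Go code easier to read, here is a hand-written, simplified imitation of how protoc-gen-go names the nested enum: Number inside Bye becomes the type Bye_Number with constants Bye_ONE and Bye_TWO, and its String method returns the proto name. The real generated file contains much more; the Bye struct and the byeNumberName map below are stripped-down stand-ins, not the generated code itself.

```go
package main

import "fmt"

// Bye_Number mirrors the naming protoc-gen-go uses for the enum Number
// nested in message Bye. Hand-written simplification, not generated code.
type Bye_Number int32

const (
	Bye_ONE Bye_Number = 0
	Bye_TWO Bye_Number = 1
)

// byeNumberName maps enum values back to their proto names.
var byeNumberName = map[Bye_Number]string{
	Bye_ONE: "ONE",
	Bye_TWO: "TWO",
}

// String returns the proto name, or the number for unknown values.
func (n Bye_Number) String() string {
	if s, ok := byeNumberName[n]; ok {
		return s
	}
	return fmt.Sprintf("%d", int32(n))
}

// Bye is a stripped-down stand-in for the generated message struct.
type Bye struct {
	Num Bye_Number
	Msg string
}

func main() {
	var unset Bye
	fmt.Printf("%v\n", unset.Num)             // ONE
	fmt.Printf("%v\n", Bye{Num: Bye_TWO}.Num) // TWO
}
```

Because ONE is declared as 0, a Bye whose num was never set reads back as ONE. In proto3 the first enum value must be 0, and it doubles as the default.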

go

Adapted from the official example:

server

package main

import (
	"context"
	"fmt"
	"net"
	"time"

	pb "sayhi/grpc" // package generated by protoc-gen-go

	"google.golang.org/grpc"
)

// server holds the last Hello received.
// Note: these fields are shared between handlers without locking,
// which is acceptable for a demo only.
type server struct {
	hi bool

	number int64
	name   string
	msg    string
}

// client -> server
func (s *server) SayHello(ctx context.Context, in *pb.Hello) (*pb.Null, error) {
	reply := &pb.Null{}
	s.number = in.GetNum()
	s.name = in.GetName()
	s.msg = in.GetMsg()
	fmt.Printf("%d: %s Say %s\n", s.number, s.name, s.msg)
	s.hi = true
	return reply, nil
}

// server -> client
func (s *server) SayBye(in *pb.Null, stream pb.Greeter_SayByeServer) error {
	// This loop never exits on its own and keeps sending on this stream
	// even after the connection is gone, so always check the result of Send.
	for {
		if s.hi {
			bye := &pb.Bye{
				Msg: "Receive: " + s.msg,
			}
			if (s.number % 2) == 0 {
				bye.Num = pb.Bye_ONE
			} else {
				bye.Num = pb.Bye_TWO
			}
			if err := stream.Send(bye); err != nil { // server-initiated message
				return err // disconnected
			}
			s.hi = false
		} else {
			time.Sleep(100 * time.Millisecond)
		}
	}
}

func main() {
	lis, err := net.Listen("tcp", "localhost:10086")
	if err != nil {
		panic(err)
	}
	s := grpc.NewServer()
	pb.RegisterGreeterServer(s, new(server))
	fmt.Println("Server start ...")
	if err := s.Serve(lis); err != nil {
		panic(err)
	}
}

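When I first started with gRPC I could not figure out how the server could initiate a message. It turns out you simply use a stream: for one-way push, keep that stream open and the server can send whenever it wants. The connection itself still has to be initiated by the client, of course.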

client

package main

import (
	"context"
	"fmt"
	"io"
	"os"
	"strconv"
	"time"

	pb "sayhi/grpc" // package generated by protoc-gen-go

	"google.golang.org/grpc"
)

func main() {
	wait := make(chan int, 1)

	conn, err := grpc.Dial("localhost:10086", grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	c := pb.NewGreeterClient(conn)

	// Optional command-line arguments: name, msg, num.
	name := "Robyn"
	if len(os.Args) > 1 {
		name = os.Args[1]
	}
	msg := "Hello"
	if len(os.Args) > 2 {
		msg = os.Args[2]
	}
	num := int64(120)
	if len(os.Args) > 3 {
		n, _ := strconv.Atoi(os.Args[3])
		num = int64(n)
	}

	// Unary call: client -> server.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	_, err = c.SayHello(ctx, &pb.Hello{Name: name, Msg: msg, Num: num})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d: %s Say %s\n", num, name, msg)

	// Server-streaming call: wait for one Bye pushed by the server.
	go func() {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		defer cancel()
		stream, err := c.SayBye(ctx, &pb.Null{})
		if err != nil {
			panic(err)
		}
		for {
			bye, err := stream.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				panic(err)
			}
			fmt.Printf("Say Bye %v %v\n", bye.GetNum(), bye.GetMsg())
			break // one message is enough for this demo
		}
		wait <- 0
	}()

	<-wait
}

demo

Test results:

client

[root@localhost client]# go run main.go
Say Bye ONE:Receive: Hello
120: Robyn Say Hello
[root@localhost client]# go run main.go Emily Hi 23
Say Bye TWO Receive: Hi
23: Emily Say Hi

server

[root@localhost server]# go run main.go
Server start ...
120: Robyn Say Hello
23: Emily Say Hi

java

First, packaging: one option is maven, the other is ant.

The maven part is copied from the official example:

<properties>
    ...
    <grpc.version>1.19.0</grpc.version>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-bom</artifactId>
            <version>${grpc.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-netty-shaded</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-protobuf</artifactId>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-stub</artifactId>
    </dependency>
</dependencies>

With ant you need to download the jars beforehand and bundle them all into your own jar:

<target name="client" depends="compile">
    <mkdir dir="${prepjar}" />
    <copy todir="${prepjar}">
        <fileset dir="${classes}"/>
    </copy>
    <jar jarfile="${clientjarfilename}" basedir="${prepjar}">
        <manifest>
            <attribute name="Main-Class" value="com.xxx.MainClass" />
        </manifest>
        <exclude name="**/xxx/**"/>
        <zipgroupfileset dir="${grpc}" includes="**/*.jar" /> <!-- the key line: merges every jar under this path -->
    </jar>
    <delete dir="${prepjar}" />
</target>

server

For the server, first use protoc to generate two files: GreeterGrpc.java and HelloWorldProto.java

import java.io.IOException;

import io.grpc.ServerBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

// Generated classes; the package comes from java_package in the proto file.
import com.wishlily.helloworld.GreeterGrpc.GreeterImplBase;
import com.wishlily.helloworld.HelloWorldProto.Bye;
import com.wishlily.helloworld.HelloWorldProto.Bye.Number;
import com.wishlily.helloworld.HelloWorldProto.Hello;
import com.wishlily.helloworld.HelloWorldProto.Null;

public class Server {
    private io.grpc.Server server;

    private void start() throws IOException {
        int port = 10086;
        server = ServerBuilder.forPort(port).addService(new GreeterImpl()).build().start();
        System.out.println("Server started, listening on " + port);
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                // Use stderr here since the logger may have been reset by its JVM shutdown
                // hook.
                System.err.println("*** shutting down gRPC server since JVM is shutting down");
                Server.this.stop();
                System.err.println("*** server shut down");
            }
        });
    }

    private void stop() {
        if (server != null) {
            server.shutdown();
        }
    }

    private void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

    static class GreeterImpl extends GreeterImplBase {
        // Both fields are written by sayHello and read by sayBye on another thread.
        private volatile boolean sayHi = false;
        private volatile Bye bye;

        // client -> server
        public void sayHello(Hello req, StreamObserver<Null> responseObserver) {
            Null reply = Null.newBuilder().build();

            Number num = Number.ONE;
            if (req.getNum() % 2 == 1) {
                num = Number.TWO;
            }
            String name = req.getName();
            String msg = req.getMsg();
            System.out.printf("%d: %s Say %s\n", req.getNum(), name, msg);
            bye = Bye.newBuilder().setNum(num).setMsg("Get: " + msg).build();
            responseObserver.onNext(reply);
            responseObserver.onCompleted();
            sayHi = true;
        }

        // server -> client
        public void sayBye(Null req, StreamObserver<Bye> responseObserver) {
            while (true) {
                if (sayHi) {
                    sayHi = false;
                    System.out.printf("%s Bye %s\n", bye.getNum(), bye.getMsg());
                    try {
                        responseObserver.onNext(bye);
                    } catch (StatusRuntimeException e) { // thrown when gRPC hits an error
                        System.out.println(e.toString());
                        // In my tests, without onError or onCompleted here the next connection misbehaves.
                        responseObserver.onError(e);
                        break;
                    }
                } else {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        final Server server = new Server();
        server.start();
        server.blockUntilShutdown();
    }
}

client

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

// Generated classes; the package comes from java_package in the proto file.
import com.wishlily.helloworld.GreeterGrpc;
import com.wishlily.helloworld.HelloWorldProto;

public class Client {
    private final ManagedChannel channel;
    private final GreeterGrpc.GreeterBlockingStub blockingStub;

    private ByeListenThread byeListenThread;

    /** Construct client for accessing the Greeter server at {@code host:port}. */
    public Client(String host, int port) {
        this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
    }

    /**
     * Construct client for accessing the Greeter server using the given channel builder.
     */
    public Client(ManagedChannelBuilder<?> channelBuilder) {
        channel = channelBuilder.build();
        blockingStub = GreeterGrpc.newBlockingStub(channel);

        byeListenThread = new ByeListenThread();
        byeListenThread.setName("Client_ByeListenThread");
        byeListenThread.setDaemon(true);
        byeListenThread.start();
    }

    public void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
        byeListenThread.cancel();
    }

    // client -> server
    public void sayHello(String name, String msg, int num) {
        HelloWorldProto.Hello request = HelloWorldProto.Hello.newBuilder().setName(name).setMsg(msg).setNum(num)
                .build();
        blockingStub.sayHello(request);
        System.out.printf("%d: %s Say %s\n", num, name, msg);
    }

    // Background thread that listens for Bye messages pushed by the server.
    private class ByeListenThread extends Thread {
        private volatile boolean isAlive = false;
        HelloWorldProto.Null request = HelloWorldProto.Null.newBuilder().build();

        public void run() {
            Iterator<HelloWorldProto.Bye> bye;
            while (isAlive) {
                bye = blockingStub.sayBye(request);
                while (bye.hasNext()) { // blocks until the next message arrives
                    HelloWorldProto.Bye data = bye.next();
                    // if a field is itself a message type, check hasXxx() before reading it
                    System.out.printf("Bye %s: %s\n", data.getNum(), data.getMsg());
                }
            }
        }

        public synchronized void start() {
            isAlive = true;
            super.start();
        }

        public synchronized void cancel() {
            isAlive = false;
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        Client c = new Client("127.0.0.1", 10086);

        // Optional command-line arguments: name, msg, num.
        String name = "Java";
        if (args.length > 0) {
            name = args[0];
        }
        String msg = "hello";
        if (args.length > 1) {
            msg = args[1];
        }
        int num = 2046;
        if (args.length > 2) {
            num = Integer.parseInt(args[2]);
        }

        c.sayHello(name, msg, num);
        // Wait for Enter on stdin before shutting down.
        System.out.println("wait ...");
        InputStreamReader is_reader = new InputStreamReader(System.in);
        String str = new BufferedReader(is_reader).readLine();
        c.shutdown();
    }
}

demo

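The behavior matches the Go version, and the two can be cross-tested. However, the Go client against the Java server produces a timeout error, which I have not investigated in detail.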


grpc
https://wishlily.github.io/article/bus/2019/06/09/grpc/
Author: Wishlily
Published: June 9, 2019