I recently used gRPC for communication, so I am writing up my notes here before I forget them.
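Installation

grpc

gRPC supports many languages; see GitHub for details. This time I mainly used Go and Java. For Go, because of the firewall, several packages have to be fetched from mirrors: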
    git clone https://github.com/grpc/grpc-go.git $GOPATH/src/google.golang.org/grpc
    git clone https://github.com/golang/net.git $GOPATH/src/golang.org/x/net
    git clone https://github.com/golang/text.git $GOPATH/src/golang.org/x/text
    git clone https://github.com/golang/sys.git $GOPATH/src/golang.org/x/sys
    go get -u github.com/golang/protobuf/{proto,protoc-gen-go}
    git clone https://github.com/google/go-genproto.git $GOPATH/src/google.golang.org/genproto
    cd $GOPATH/src/
    go install google.golang.org/grpc
The official documentation also describes in detail how to install with go mod.
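protoc

A tool that generates code for different languages from proto files. Releases can be downloaded directly from GitHub: https://github.com/protocolbuffers/protobuf/releases . The easiest route is to download a prebuilt binary. For example, my system is 64-bit Linux, so I downloaded protoc-x.x.x-linux-x86_64.zip, unpacked it, and put it on the PATH. Run protoc --version to check the version.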
protoc-gen-go

The plugin that protoc uses to generate Go code:

    go get -u github.com/golang/protobuf/{proto,protoc-gen-go}

Note that protoc-gen-go must be on the system PATH when you use it:

    export PATH=$PATH:$GOPATH/bin
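Since protoc resolves `--go_out` to an executable named `protoc-gen-go` on the PATH, a quick check before running it can save some confusion. The following is a minimal sketch using only the Go standard library; the helper name `missingTools` is hypothetical and not part of any gRPC tooling.

```go
package main

import (
	"fmt"
	"os/exec"
)

// missingTools returns the names from tools that exec.LookPath cannot
// resolve to an executable on the current PATH.
func missingTools(tools []string) []string {
	var missing []string
	for _, name := range tools {
		if _, err := exec.LookPath(name); err != nil {
			missing = append(missing, name)
		}
	}
	return missing
}

func main() {
	// Both the compiler and the Go plugin must be discoverable.
	missing := missingTools([]string{"protoc", "protoc-gen-go"})
	if len(missing) == 0 {
		fmt.Println("all tools found on PATH")
		return
	}
	for _, name := range missing {
		fmt.Printf("not found on PATH: %s\n", name)
	}
}
```

If `protoc-gen-go` is reported as missing, re-check that `$GOPATH/bin` was added to the PATH in the current shell.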
Run the following command to generate the Go files automatically:

    protoc --go_out=plugins=grpc:. *.proto
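protoc-gen-grpc-java

The official Java plugin. Build instructions and downloads: https://github.com/grpc/grpc-java/tree/master/compiler

It does not seem to compile with the system's Java 1.7, so a newer version is probably required. Alternatively, download a prebuilt binary, for example 1.19.

Run the following commands to generate the Java files automatically:

    protoc --java_out=. *.proto
    protoc --plugin=protoc-gen-grpc-java=/xxx/xxx/protoc-gen-grpc-java --grpc-java_out=. *.proto

Adjust the paths to match your environment. Java needs at least two files.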
proto

The official documentation provides simple examples. The one below adds a few notes:

    syntax = "proto3";

    option java_multiple_files = false; // whether to generate multiple Java files; I chose false to keep git operations simple
    option java_package = "com.wishlily.helloworld"; // Java package
    option java_outer_classname = "HelloWorldProto"; // Java class name

    package grpc; // Go package name

    // RPC interface
    service Greeter {
        rpc SayHello(Hello) returns (Null) {} // the client sends a message to the server
        rpc SayBye(Null) returns (stream Bye) {} // can serve as a callback from the server
    }

    message Null {} // even an empty payload needs a message type; alternatively, import google.protobuf.Empty from the official package

    message Hello {
        string name = 1;
        string msg = 2;
        int64 num = 3;
    }

    message Bye {
        enum Number {
            ONE = 0;
            TWO = 1;
        }
        Number num = 1;
        string msg = 2;
    }
go

Modified from the official example as follows:

server

    package main

    import (
        "context"
        "fmt"
        "net"
        "time"

        pb "sayhi/grpc"

        "google.golang.org/grpc"
    )

    // server remembers the most recent Hello so that SayBye can answer it.
    type server struct {
        hi     bool
        number int64
        name   string
        msg    string
    }

    // SayHello stores the request and flags that a reply is pending.
    func (s *server) SayHello(ctx context.Context, in *pb.Hello) (*pb.Null, error) {
        reply := &pb.Null{}
        s.number = in.GetNum()
        s.name = in.GetName()
        s.msg = in.GetMsg()
        fmt.Printf("%d: %s Say %s\n", s.number, s.name, s.msg)
        s.hi = true
        return reply, nil
    }

    // SayBye keeps the stream open and sends a Bye whenever a Hello has arrived.
    func (s *server) SayBye(in *pb.Null, stream pb.Greeter_SayByeServer) error {
        for {
            if s.hi {
                bye := &pb.Bye{
                    Msg: "Receive: " + s.msg,
                }
                if (s.number % 2) == 0 {
                    bye.Num = pb.Bye_ONE
                } else {
                    bye.Num = pb.Bye_TWO
                }
                if err := stream.Send(bye); err != nil {
                    return err
                }
                s.hi = false
            } else {
                time.Sleep(100 * time.Millisecond)
            }
        }
    }

    func main() {
        lis, err := net.Listen("tcp", "localhost:10086")
        if err != nil {
            panic(err)
        }
        s := grpc.NewServer()
        pb.RegisterGreeterServer(s, new(server))
        fmt.Println("Server start ...")
        if err := s.Serve(lis); err != nil {
            panic(err)
        }
    }
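When I first started with gRPC, I could not work out how a server could send a message on its own initiative. It turns out you simply use a stream: for one-directional pushes, keep the stream open and never close it. The connection itself still has to be initiated by the client, of course.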
client

    package main

    import (
        "context"
        "fmt"
        "io"
        "os"
        "strconv"
        "time"

        pb "sayhi/grpc"

        "google.golang.org/grpc"
    )

    func main() {
        wait := make(chan int, 1)

        conn, err := grpc.Dial("localhost:10086", grpc.WithInsecure())
        if err != nil {
            panic(err)
        }
        defer conn.Close()
        c := pb.NewGreeterClient(conn)

        // Optional arguments: name, message, number.
        name := "Robyn"
        if len(os.Args) > 1 {
            name = os.Args[1]
        }
        msg := "Hello"
        if len(os.Args) > 2 {
            msg = os.Args[2]
        }
        num := int64(120)
        if len(os.Args) > 3 {
            n, err := strconv.Atoi(os.Args[3])
            if err != nil {
                panic(err)
            }
            num = int64(n)
        }

        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()

        _, err = c.SayHello(ctx, &pb.Hello{Name: name, Msg: msg, Num: num})
        if err != nil {
            panic(err)
        }
        fmt.Printf("%d: %s Say %s\n", num, name, msg)

        // Open the server stream and wait for the first Bye.
        go func() {
            ctx, cancel := context.WithTimeout(context.Background(), time.Second)
            defer cancel()
            stream, err := c.SayBye(ctx, &pb.Null{})
            if err != nil {
                panic(err)
            }
            for {
                bye, err := stream.Recv()
                if err == io.EOF {
                    break
                }
                if err != nil {
                    panic(err)
                }
                fmt.Printf("Say Bye %v %v\n", bye.GetNum(), bye.GetMsg())
                break
            }
            wait <- 0
        }()

        <-wait
    }
demo

The test results are as follows:

client

    # go run main.go
    Say Bye ONE:Receive: Hello
    120: Robyn Say Hello
    # go run main.go Emily Hi 23
    Say Bye TWO Receive: Hi
    23: Emily Say Hi

server

    # go run main.go
    Server start ...
    120: Robyn Say Hello
    23: Emily Say Hi
java

First, packaging. There are two options: maven or ant.

maven

This part is copied from the official example:
    <properties>
        ...
        <grpc.version>1.19.0</grpc.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.grpc</groupId>
                <artifactId>grpc-bom</artifactId>
                <version>${grpc.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty-shaded</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
        </dependency>
    </dependencies>
ant

With ant you need to download the jar files in advance and bundle them into your own jar:
    <target name="client" depends="compile">
        <mkdir dir="${prepjar}" />
        <copy todir="${prepjar}">
            <fileset dir="${classes}" />
        </copy>
        <jar jarfile="${clientjarfilename}" basedir="${prepjar}">
            <manifest>
                <attribute name="Main-Class" value="com.xxx.MainClass" />
            </manifest>
            <exclude name="**/xxx/**" />
            <zipgroupfileset dir="${grpc}" includes="**/*.jar" />
        </jar>
        <delete dir="${prepjar}" />
    </target>
server

For the server, first use protoc to generate two files: GreeterGrpc.java and HelloWorldProto.java.
    import java.io.IOException;

    import io.grpc.ServerBuilder;
    import io.grpc.StatusRuntimeException;
    import io.grpc.stub.StreamObserver;

    // Generated classes; the package and outer class names follow the options in the .proto file.
    import com.wishlily.helloworld.GreeterGrpc.GreeterImplBase;
    import com.wishlily.helloworld.HelloWorldProto.Bye;
    import com.wishlily.helloworld.HelloWorldProto.Bye.Number;
    import com.wishlily.helloworld.HelloWorldProto.Hello;
    import com.wishlily.helloworld.HelloWorldProto.Null;

    public class Server {
        private io.grpc.Server server;

        private void start() throws IOException {
            int port = 10086;
            server = ServerBuilder.forPort(port).addService(new GreeterImpl()).build().start();
            System.out.println("Server started, listening on 10086");
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    System.err.println("*** shutting down gRPC server since JVM is shutting down");
                    Server.this.stop();
                    System.err.println("*** server shut down");
                }
            });
        }

        private void stop() {
            if (server != null) {
                server.shutdown();
            }
        }

        private void blockUntilShutdown() throws InterruptedException {
            if (server != null) {
                server.awaitTermination();
            }
        }

        static class GreeterImpl extends GreeterImplBase {
            private boolean sayHi = false;
            private Bye bye;

            // Stores the reply for sayBye and acknowledges the request.
            public void sayHello(Hello req, StreamObserver<Null> responseObserver) {
                Null reply = Null.newBuilder().build();
                Number num = Number.ONE;
                if (req.getNum() % 2 == 1) {
                    num = Number.TWO;
                }
                String name = req.getName();
                String msg = req.getMsg();
                System.out.printf("%d: %s Say %s\n", req.getNum(), name, msg);
                bye = Bye.newBuilder().setNum(num).setMsg("Get: " + msg).build();
                responseObserver.onNext(reply);
                responseObserver.onCompleted();
                sayHi = true;
            }

            // Keeps the stream open and sends a Bye whenever a Hello has arrived.
            public void sayBye(Null req, StreamObserver<Bye> responseObserver) {
                while (true) {
                    if (sayHi) {
                        sayHi = false;
                        System.out.printf("%s Bye %s\n", bye.getNum(), bye.getMsg());
                        try {
                            responseObserver.onNext(bye);
                        } catch (StatusRuntimeException e) {
                            System.out.println(e.toString());
                            responseObserver.onError(e);
                            break;
                        }
                    } else {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                        }
                    }
                }
            }
        }

        public static void main(String[] args) throws IOException, InterruptedException {
            final Server server = new Server();
            server.start();
            server.blockUntilShutdown();
        }
    }
client

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.Iterator;
    import java.util.concurrent.TimeUnit;

    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;

    // Generated classes; the package name follows java_package in the .proto file.
    import com.wishlily.helloworld.GreeterGrpc;
    import com.wishlily.helloworld.HelloWorldProto;

    public class Client {
        private final ManagedChannel channel;
        private final GreeterGrpc.GreeterBlockingStub blockingStub;
        private ByeListenThread byeListenThread;

        public Client(String host, int port) {
            this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
        }

        public Client(ManagedChannelBuilder<?> channelBuilder) {
            channel = channelBuilder.build();
            blockingStub = GreeterGrpc.newBlockingStub(channel);
            byeListenThread = new ByeListenThread();
            byeListenThread.setName("Client_ByeListenThread");
            byeListenThread.setDaemon(true);
            byeListenThread.start();
        }

        public void shutdown() throws InterruptedException {
            channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
            byeListenThread.cancel();
        }

        public void sayHello(String name, String msg, int num) {
            HelloWorldProto.Hello request = HelloWorldProto.Hello.newBuilder().setName(name).setMsg(msg).setNum(num)
                    .build();
            blockingStub.sayHello(request);
            System.out.printf("%d: %s Say %s\n", num, name, msg);
        }

        // Background thread that keeps the SayBye stream open and prints each Bye.
        private class ByeListenThread extends Thread {
            private volatile boolean isAlive = false;
            HelloWorldProto.Null request = HelloWorldProto.Null.newBuilder().build();

            public void run() {
                Iterator<HelloWorldProto.Bye> bye;
                while (isAlive) {
                    bye = blockingStub.sayBye(request);
                    while (bye.hasNext()) {
                        HelloWorldProto.Bye data = bye.next();
                        System.out.printf("Bye %s: %s\n", data.getNum(), data.getMsg());
                    }
                }
            }

            public synchronized void start() {
                isAlive = true;
                super.start();
            }

            public synchronized void cancel() {
                isAlive = false;
            }
        }

        public static void main(String[] args) throws IOException, InterruptedException {
            Client c = new Client("127.0.0.1", 10086);
            String name = "Java";
            if (args.length > 0) {
                name = args[0];
            }
            String msg = "hello";
            if (args.length > 1) {
                msg = args[1];
            }
            int num = 2046;
            if (args.length > 2) {
                num = Integer.parseInt(args[2]);
            }
            c.sayHello(name, msg, num);
            System.out.println("wait ...");
            // Block until a line is read from stdin, then shut down.
            InputStreamReader is_reader = new InputStreamReader(System.in);
            String str = new BufferedReader(is_reader).readLine();
            c.shutdown();
        }
    }
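demo

The results are the same as with Go, and the two implementations can be tested against each other. However, the Go client with the Java server produces a timeout error; I have not looked into it in detail.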