目录

grpc

最近使用了一下 grpc 通信,现在整理一下,防止遗忘。

安装

grpc 可以支持不同的语言,详细可查看 github
这次我主要使用的是 go 和 java,
golang 因为墙的关系,有几个包需要从镜像下载:

# Clone grpc-go and the golang.org/x packages from their GitHub mirrors,
# placing each repository at the import path it is referenced by under $GOPATH/src.
git clone https://github.com/grpc/grpc-go.git $GOPATH/src/google.golang.org/grpc
git clone https://github.com/golang/net.git $GOPATH/src/golang.org/x/net
git clone https://github.com/golang/text.git $GOPATH/src/golang.org/x/text
git clone https://github.com/golang/sys.git $GOPATH/src/golang.org/x/sys
# Fetch the protobuf runtime package and the protoc-gen-go plugin.
go get -u github.com/golang/protobuf/{proto,protoc-gen-go}
git clone https://github.com/google/go-genproto.git $GOPATH/src/google.golang.org/genproto
# Install the grpc package from the sources cloned above.
cd $GOPATH/src/
go install google.golang.org/grpc

我看官方也详细写了使用 go mod 的安装方法。

protoc

这是一个通过 proto 文件,自动生成不同语言代码的工具。
github 上可以直接下载 release,地址:https://github.com/protocolbuffers/protobuf/releases
比较省事的就是下载编译好的程序,比如我系统是 linux 64,就下载 protoc-x.x.x-linux-x86_64.zip,直接解压放到 PATH 下就行,protoc --version 查看版本。

protoc-gen-go

配合 protoc 使用的 golang 插件

go get -u github.com/golang/protobuf/{proto,protoc-gen-go}

使用时注意 protoc-gen-go 必须在系统 PATH 中,例如:
export PATH=$PATH:$GOPATH/bin

运行以下命令可以自动生成 go 文件

protoc --go_out=plugins=grpc:. *.proto

protoc-gen-grpc-java

官方提供的 java 插件,安装下载地址 https://github.com/grpc/grpc-java/tree/master/compiler

貌似系统 java 1.7 无法编译,应该需要更高的版本,或者下载已经编译好的工具,如 1.19

运行以下命令自动生成 java 文件

protoc --java_out=. *.proto
protoc --plugin=protoc-gen-grpc-java=/xxx/xxx/protoc-gen-grpc-java --grpc-java_out=. *.proto

插件路径根据实际情况修改。java 至少需要生成 2 个文件,所以以上两条命令都要运行。

proto

官方都有给简易例子,以下添加一些说明:

syntax = "proto3";

option java_multiple_files = false;              // Whether Java output is split into multiple files; disabled here to keep git handling simple.
option java_package = "com.wishlily.helloworld"; // Java package of the generated code.
option java_outer_classname = "HelloWorldProto"; // Name of the generated Java outer class.

package grpc;                                    // Also used as the Go package name.

// Greeter is the RPC interface shared by client and server.
service Greeter {
    rpc SayHello(Hello) returns (Null) {}       // Unary call: the client sends a message to the server.
    rpc SayBye(Null) returns (stream Bye) {}    // Server-streaming call: usable as a callback channel from server to client.
}

message Null {}             // Even an empty payload needs a message type; google.protobuf.Empty from the official package is an alternative.

// Hello is the payload the client sends with SayHello.
message Hello {
    string name = 1;
    string msg = 2;
    int64 num = 3;
}

// Bye is the payload the server streams back on SayBye.
message Bye {
    enum Number {
        ONE = 0;
        TWO = 1;
    }
    Number num = 1;
    string msg = 2;
}

go

根据官方例子修改如下:

server

package main

import (
	"context"
	"fmt"
	"net"
	"sync"
	"time"

	pb "sayhi/grpc"          // protoc-gen-go 生成文件包

	"google.golang.org/grpc"
)

// server implements the Greeter service. It remembers the most recent Hello so
// that an open SayBye stream can push a reply back to the client.
type server struct {
	// mu guards every field below. grpc-go serves RPCs concurrently, so
	// SayHello and SayBye touch this state from different goroutines.
	mu sync.Mutex

	hi bool // true while a Hello is waiting to be answered on a SayBye stream

	number int64
	name   string
	msg    string
}

// SayHello handles the client -> server call: it records the received Hello,
// marks it as pending for SayBye, logs it and returns an empty reply.
func (s *server) SayHello(ctx context.Context, in *pb.Hello) (*pb.Null, error) {
	number, name, msg := in.GetNum(), in.GetName(), in.GetMsg()

	s.mu.Lock()
	s.number, s.name, s.msg = number, name, msg
	s.hi = true
	s.mu.Unlock()

	fmt.Printf("%d: %s Say %s\n", number, name, msg)
	return &pb.Null{}, nil
}

// takePending atomically claims the pending Hello, if there is one, and builds
// the Bye that answers it. The second result is false when nothing is pending.
func (s *server) takePending() (*pb.Bye, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if !s.hi {
		return nil, false
	}
	s.hi = false

	bye := &pb.Bye{
		Msg: "Receive: " + s.msg,
		Num: pb.Bye_TWO,
	}
	if s.number%2 == 0 {
		bye.Num = pb.Bye_ONE
	}
	return bye, true
}

// requeue marks the stored Hello as pending again. It is called after a failed
// Send so that the reply is not lost and another stream can deliver it.
func (s *server) requeue() {
	s.mu.Lock()
	s.hi = true
	s.mu.Unlock()
}

// SayBye handles the server -> client direction: it keeps the stream open and
// sends a Bye each time a Hello becomes pending.
//
// It returns the Send error when the client can no longer be reached, or the
// stream context's error once the call is cancelled or its deadline passes, so
// the handler never keeps polling for a client that has gone away.
func (s *server) SayBye(in *pb.Null, stream pb.Greeter_SayByeServer) error {
	ctx := stream.Context()
	poll := time.NewTicker(100 * time.Millisecond)
	defer poll.Stop()

	for {
		if bye, ok := s.takePending(); ok {
			// Server-initiated message; the Send result must be checked
			// because it is how a broken connection is detected.
			if err := stream.Send(bye); err != nil {
				s.requeue()
				return err // Disconnect
			}
			continue
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-poll.C:
		}
	}
}

// main listens on localhost:10086, registers the Greeter implementation and
// serves requests until Serve returns. Listen and serve errors panic.
func main() {
	listener, listenErr := net.Listen("tcp", "localhost:10086")
	if listenErr != nil {
		panic(listenErr)
	}

	grpcServer := grpc.NewServer()
	pb.RegisterGreeterServer(grpcServer, new(server))

	fmt.Println("Server start ...")
	serveErr := grpcServer.Serve(listener)
	if serveErr != nil {
		panic(serveErr)
	}
}

开始接触 grpc 时一直想不明白,服务器怎么主动发起消息,后来明白其实就使用 stream,单方向就一直保持这个 stream 不关闭就行,当然这个连接还是要 client 先发起才行。

client

// main sends one Hello to the Greeter server on localhost:10086 and then waits
// for the first Bye the server pushes back on the SayBye stream.
//
// Usage: client [name [msg [num]]]. The defaults are "Robyn", "Hello" and 120.
// Any connection, argument or RPC error panics.
func main() {
	conn, err := grpc.Dial("localhost:10086", grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	c := pb.NewGreeterClient(conn)

	name := "Robyn"
	if len(os.Args) > 1 {
		name = os.Args[1]
	}
	msg := "Hello"
	if len(os.Args) > 2 {
		msg = os.Args[2]
	}
	num := int64(120)
	if len(os.Args) > 3 {
		// Parse directly into int64 and reject a malformed argument instead
		// of silently sending 0.
		num, err = strconv.ParseInt(os.Args[3], 10, 64)
		if err != nil {
			panic(fmt.Errorf("invalid num %q: %v", os.Args[3], err))
		}
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	_, err = c.SayHello(ctx, &pb.Hello{Name: name, Msg: msg, Num: num})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d: %s Say %s\n", num, name, msg)

	// Only the first Bye is needed, so the stream is read once, inline; there
	// is nothing else to run while waiting for it.
	streamCtx, cancelStream := context.WithTimeout(context.Background(), time.Second)
	defer cancelStream()
	stream, err := c.SayBye(streamCtx, &pb.Null{})
	if err != nil {
		panic(err)
	}

	bye, err := stream.Recv()
	switch {
	case err == io.EOF:
		// The server closed the stream without sending anything.
	case err != nil:
		panic(err)
	default:
		fmt.Printf("Say Bye %v %v\n", bye.GetNum(), bye.GetMsg())
	}
}

demo

测试结果如下:

client

[root@localhost client]## go run main.go
Say Bye ONE Receive: Hello
120: Robyn Say Hello
[root@localhost client]## go run main.go Emily Hi 23
Say Bye TWO Receive: Hi
23: Emily Say Hi

server

[root@localhost server]## go run main.go
Server start ...
120: Robyn Say Hello
23: Emily Say Hi

java

首先介绍打包方式:一种是使用 maven,另一种是使用 ant。

maven 这部分根据官方例子摘抄

<properties>
	...
	<grpc.version>1.19.0</grpc.version>
</properties>

<dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>io.grpc</groupId>
			<artifactId>grpc-bom</artifactId>
			<version>${grpc.version}</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
	</dependencies>
</dependencyManagement>

<dependencies>
	<dependency>
		<groupId>io.grpc</groupId>
		<artifactId>grpc-netty-shaded</artifactId>
		<scope>runtime</scope>
	</dependency>
	<dependency>
		<groupId>io.grpc</groupId>
		<artifactId>grpc-protobuf</artifactId>
	</dependency>
	<dependency>
		<groupId>io.grpc</groupId>
		<artifactId>grpc-stub</artifactId>
	</dependency>
</dependencies>

ant 就需要提前下载好 jar 包,将这些一起打包到你的包里:

<!-- Builds the client jar: stages the compiled classes in ${prepjar}, packs them
     into ${clientjarfilename} with the jars found under ${grpc}, then removes
     the staging directory. -->
<target name="client" depends="compile">
	<mkdir dir="${prepjar}" />
	<copy todir="${prepjar}">
		<fileset dir="${classes}"/>
	</copy>
	<jar jarfile="${clientjarfilename}" basedir="${prepjar}">
		<manifest>
			<attribute name="Main-Class" value="com.xxx.MainClass" />
		</manifest>
		<exclude name="**/xxx/**"/>
		<zipgroupfileset dir="${grpc}" includes="**/*.jar" /> <!-- Key line: pulls in every jar found under this path -->
	</jar>
	<delete dir="${prepjar}" />
</target>

server

服务器首先使用 protoc 工具生成两个文件:GreeterGrpc.java & HelloWorldProto.java

public class Server {
    private io.grpc.Server server;

    private void start() throws IOException {
        int port = 10086;
        server = ServerBuilder.forPort(port).addService(new GreeterImpl()).build().start();
        System.out.println("Server started, listening on 10086");
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                // Use stderr here since the logger may have been reset by its JVM shutdown
                // hook.
                System.err.println("*** shutting down gRPC server since JVM is shutting down");
                Server.this.stop();
                System.err.println("*** server shut down");
            }
        });
    }

    private void stop() {
        if (server != null) {
            server.shutdown();
        }
    }

    private void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

    static class GreeterImpl extends GreeterImplBase {
        private boolean sayHi = false;
        private Bye bye;

        public void sayHello(Hello req, StreamObserver<Null> responseObserver) {
            Null reply = Null.newBuilder().build();

            Number num = Number.ONE;
            if (req.getNum() % 2 == 1) {
                num = Number.TWO;
            }
            String name = req.getName();
            String msg = req.getMsg();
            System.out.printf("%d: %s Say %s\n", req.getNum(), name, msg);
            bye = Bye.newBuilder().setNum(num).setMsg("Get: " + msg).build();
            responseObserver.onNext(reply);
            responseObserver.onCompleted();
            sayHi = true;
        }

        public void sayBye(Null req, StreamObserver<Bye> responseObserver) {
            while (true) {
                if (sayHi) {
                    sayHi = false;
                    System.out.printf("%s Bye %s\n", bye.getNum(), bye.getMsg());
                    try {
                        responseObserver.onNext(bye);
                    } catch (StatusRuntimeException e) { // 是 grpc 出现错误时抛出异常
                        System.out.println(e.toString());
                        responseObserver.onError(e);     // 测试中如果没有 onError | onCompleted,则下一次连接会出现问题
                        break;
                    }
                } else {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        final Server server = new Server();
        server.start();
        server.blockUntilShutdown();
    }
}

client

/**
 * Greeter client: sends Hello messages over a blocking stub and prints every
 * Bye the server streams back on a background thread.
 */
public class Client {
    private final ManagedChannel channel;
    private final GreeterGrpc.GreeterBlockingStub blockingStub;

    private ByeListenThread byeListenThread;

    /** Construct client for accessing the Greeter server at {@code host:port}. */
    public Client(String host, int port) {
        this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
    }

    /**
     * Construct client for accessing the Greeter server using the given channel
     * builder, and start the daemon thread that listens for Bye messages.
     */
    public Client(ManagedChannelBuilder<?> channelBuilder) {
        channel = channelBuilder.build();
        blockingStub = GreeterGrpc.newBlockingStub(channel);

        byeListenThread = new ByeListenThread();
        byeListenThread.setName("Client_ByeListenThread");
        byeListenThread.setDaemon(true);
        byeListenThread.start();
    }

    /**
     * Stops the Bye listener and shuts the channel down, waiting up to five
     * seconds for it to terminate.
     */
    public void shutdown() throws InterruptedException {
        // Flag the listener first, so the cancellation triggered below is
        // treated as an orderly exit rather than reported as a failure.
        byeListenThread.cancel();
        // shutdownNow() cancels the still-open SayBye stream. A plain shutdown()
        // leaves that call running, so awaitTermination would always sit out
        // the full timeout.
        channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }

    /** Sends one Hello built from the given values and logs it. */
    public void sayHello(String name, String msg, int num) {
        HelloWorldProto.Hello request = HelloWorldProto.Hello.newBuilder().setName(name).setMsg(msg).setNum(num)
                .build();
        blockingStub.sayHello(request);
        System.out.printf("%d: %s Say %s\n", num, name, msg);
    }

    /** Background thread that subscribes to SayBye and prints each Bye received. */
    private class ByeListenThread extends Thread {
        // Named "running" rather than "isAlive" so that it cannot be confused
        // with Thread.isAlive().
        private volatile boolean running = false;
        HelloWorldProto.Null request = HelloWorldProto.Null.newBuilder().build();

        public void run() {
            while (running) {
                try {
                    Iterator<HelloWorldProto.Bye> bye = blockingStub.sayBye(request);
                    while (bye.hasNext()) { // block
                        HelloWorldProto.Bye data = bye.next();
                        // if data have ptr part, hasXXX check
                        System.out.printf("Bye %s: %s\n", data.getNum(), data.getMsg());
                    }
                } catch (RuntimeException e) {
                    // The blocking iterator reports RPC failures as unchecked
                    // exceptions. Stay quiet when this is our own shutdown.
                    if (running) {
                        System.err.println("Bye stream failed: " + e);
                    }
                    return;
                }
            }
        }

        public synchronized void start() {
            running = true;
            super.start();
        }

        /** Asks the listener loop to stop; does not by itself unblock a pending read. */
        public synchronized void cancel() {
            running = false;
        }
    }

    /**
     * Usage: Client [name [msg [num]]]. Sends one Hello, then waits for a line
     * on stdin before shutting down.
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        Client c = new Client("127.0.0.1", 10086);

        String name = "Java";
        if (args.length > 0) {
            name = args[0];
        }
        String msg = "hello";
        if (args.length > 1) {
            msg = args[1];
        }
        int num = 2046;
        if (args.length > 2) {
            num = Integer.parseInt(args[2]);
        }

        c.sayHello(name, msg, num);
        // wait
        System.out.println("wait ...");
        // Block until the user presses Enter; the line itself is not used.
        new BufferedReader(new InputStreamReader(System.in)).readLine();
        c.shutdown();
    }
}

demo

和 golang 一样而且可以交叉测试,不过 go client & java server 会有超时错误,没有详细查看。