温馨提示:本文需要结合上一篇 gRPC 文章 一起食用,否则可能看不懂。
前面一篇文章松哥和大家聊了 gRPC 的基本用法,今天我们再来稍微深入一点点,来看下 gRPC 中四种不同的通信模式。
gRPC 中四种不同的通信模式分别是:
一元 RPC
服务端流 RPC
客户端流 RPC
双向流 RPC
接下来松哥就通过四个完整的案例,来分别和向伙伴们演示这四种不同的通信模式。
1. 准备工作 关于 gRPC 的基础知识我们就不啰嗦了,咱们直接来看我今天的 proto 文件,如下:
这次我新建了一个名为 book.proto 的文件,这里主要定义了一些图书相关的方法,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 syntax = "proto3" ; option java_multiple_files = true ;option java_package = "org.javaboy.grpc.demo" ;option java_outer_classname = "BookServiceProto" ;import "google/protobuf/wrappers.proto" ;package book;service BookService { rpc addBook(Book) returns (google.protobuf.StringValue) ; rpc getBook(google.protobuf.StringValue) returns (Book) ; rpc searchBooks(google.protobuf.StringValue) returns (stream Book) ; rpc updateBooks(stream Book) returns (google.protobuf.StringValue) ; rpc processBooks(stream google.protobuf.StringValue) returns (stream BookSet) ; } message Book { string id = 1 ; repeated string tags = 2 ; string name = 3 ; float price = 4 ; string author = 5 ; } message BookSet { string id = 1 ; repeated Book bookList = 3 ; }
这个文件中,有一些内容我们在上篇文章 中都讲过了,讲过的我就不再重复了,我说一些上篇文章 没有涉及到的东西:
由于我们在这个文件中,引用了 Google 提供的 StringValue(google.protobuf.StringValue),所以这个文件上面我们首先用 import 导入相关的文件,导入之后,才可以使用。
在方法参数和返回值中出现的 stream,就表示这个方法的参数或者返回值是流的形式(其实就是数据可以多次传输)。
message 中出现了一个上篇文章 没有的关键字 repeated,这个表示这个字段可以重复,可以简单理解为这就是我们 Java 中的数组。
好了,和上篇文章 相比,本文主要就是这几个地方不一样。
proto 文件写好之后,按照上篇文章 介绍的方法进行编译,生成对应的代码,这里就不再重复了。
2. 一元 RPC 一元 RPC 是一种比较简单的 RPC 模式,其实说白了我们上篇文章 和大家介绍的就是一种一元 RPC,也就是客户端发起一个请求,服务端给出一个响应,然后请求结束。
上面我们定义的五个方法中,addBook 和 getBook 都算是一种一元 RPC。
2.1 addBook 先来看 addBook 方法,这个方法的逻辑很简单,我们提前在服务端准备一个 Map 用来保存 Book,addBook 调用的时候,就把 book 对象存入到 Map 中,并且将 book 的 ID 返回,大家就这样一件事,来看看服务端的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class BookServiceImpl extends BookServiceGrpc .BookServiceImplBase { private Map<String, Book> bookMap = new HashMap <>(); public BookServiceImpl () { Book b1 = Book.newBuilder().setId("1" ).setName("三国演义" ).setAuthor("罗贯中" ).setPrice(30 ).addTags("明清小说" ).addTags("通俗小说" ).build(); Book b2 = Book.newBuilder().setId("2" ).setName("西游记" ).setAuthor("吴承恩" ).setPrice(40 ).addTags("志怪小说" ).addTags("通俗小说" ).build(); Book b3 = Book.newBuilder().setId("3" ).setName("水浒传" ).setAuthor("施耐庵" ).setPrice(50 ).addTags("明清小说" ).addTags("通俗小说" ).build(); bookMap.put("1" , b1); bookMap.put("2" , b2); bookMap.put("3" , b3); } @Override public void addBook (Book request, StreamObserver<StringValue> responseObserver) { bookMap.put(request.getId(), request); responseObserver.onNext(StringValue.newBuilder().setValue(request.getId()).build()); responseObserver.onCompleted(); } }
看过上篇文章 的小伙伴,我觉得这段代码应该很好理解。
客户端调用方式如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class BookServiceClient { public static void main (String[] args) throws InterruptedException { ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost" , 50051 ) .usePlaintext() .build(); BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel); addBook(stub); } private static void addBook (BookServiceGrpc.BookServiceStub stub) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch (1 ); stub.addBook(Book.newBuilder().setPrice(99 ).setId("100" ).setName("java" ).setAuthor("javaboy" ).build(), new StreamObserver <StringValue>() { @Override public void onNext (StringValue stringValue) { System.out.println("stringValue.getValue() = " + stringValue.getValue()); } @Override public void onError (Throwable throwable) { } @Override public void onCompleted () { countDownLatch.countDown(); System.out.println("添加完毕" ); } }); countDownLatch.await(); } }
这里我使用了 CountDownLatch 来实现线程等待,等服务端给出响应之后,客户端再结束。这里在回调的 onNext 方法中,我们就可以拿到服务端的返回值。
2.2 getBook getBook 跟上面的 addBook 类似,先来看服务端代码,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class BookServiceImpl extends BookServiceGrpc .BookServiceImplBase { private Map<String, Book> bookMap = new HashMap <>(); public BookServiceImpl () { Book b1 = Book.newBuilder().setId("1" ).setName("三国演义" ).setAuthor("罗贯中" ).setPrice(30 ).addTags("明清小说" ).addTags("通俗小说" ).build(); Book b2 = Book.newBuilder().setId("2" ).setName("西游记" ).setAuthor("吴承恩" ).setPrice(40 ).addTags("志怪小说" ).addTags("通俗小说" ).build(); Book b3 = Book.newBuilder().setId("3" ).setName("水浒传" ).setAuthor("施耐庵" ).setPrice(50 ).addTags("明清小说" ).addTags("通俗小说" ).build(); bookMap.put("1" , b1); bookMap.put("2" , b2); bookMap.put("3" , b3); } @Override public void getBook (StringValue request, StreamObserver<Book> responseObserver) { String id = request.getValue(); Book book = bookMap.get(id); if (book != null ) { responseObserver.onNext(book); responseObserver.onCompleted(); } else { responseObserver.onCompleted(); } } }
这个 getBook 就是根据客户端传来的 id,从 Map 中查询到一个 Book 并返回。
客户端调用代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class BookServiceClient { public static void main (String[] args) throws InterruptedException { ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost" , 50051 ) .usePlaintext() .build(); BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel); getBook(stub); } private static void getBook (BookServiceGrpc.BookServiceStub stub) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch (1 ); stub.getBook(StringValue.newBuilder().setValue("2" ).build(), new StreamObserver <Book>() { @Override public void onNext (Book book) { System.out.println("book = " + book); } @Override public void onError (Throwable throwable) { } @Override public void onCompleted () { countDownLatch.countDown(); System.out.println("查询完毕" ); } }); countDownLatch.await(); } }
小伙伴们大概也能看出来,addBook 和 getBook 基本上操作套路是一模一样的。
3. 服务端流 RPC 前面的一元 RPC,客户端发起一个请求,服务端给出一个响应,请求就结束了。服务端流则是客户端发起一个请求,服务端给一个响应序列,这个响应序列组成一个流。
上面我们给出的 searchBook 就是这样一个例子,searchBook 是传递图书的 tags 参数,然后在服务端查询哪些书的 tags 满足条件,将满足条件的书全部都返回去。
我们来看下服务端的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class BookServiceImpl extends BookServiceGrpc .BookServiceImplBase { private Map<String, Book> bookMap = new HashMap <>(); public BookServiceImpl () { Book b1 = Book.newBuilder().setId("1" ).setName("三国演义" ).setAuthor("罗贯中" ).setPrice(30 ).addTags("明清小说" ).addTags("通俗小说" ).build(); Book b2 = Book.newBuilder().setId("2" ).setName("西游记" ).setAuthor("吴承恩" ).setPrice(40 ).addTags("志怪小说" ).addTags("通俗小说" ).build(); Book b3 = Book.newBuilder().setId("3" ).setName("水浒传" ).setAuthor("施耐庵" ).setPrice(50 ).addTags("明清小说" ).addTags("通俗小说" ).build(); bookMap.put("1" , b1); bookMap.put("2" , b2); bookMap.put("3" , b3); } @Override public void searchBooks (StringValue request, StreamObserver<Book> responseObserver) { Set<String> keySet = bookMap.keySet(); String tags = request.getValue(); for (String key : keySet) { Book book = bookMap.get(key); int tagsCount = book.getTagsCount(); for (int i = 0 ; i < tagsCount; i++) { String t = book.getTags(i); if (t.equals(tags)) { responseObserver.onNext(book); break ; } } } responseObserver.onCompleted(); } }
小伙伴们看下,这段 Java 代码应该很好理解:
首先从 request 中提取客户端传来的 tags 参数。
遍历 bookMap,查看每一本书的 tags 是否等于客户端传来的 tags,如果相等,说明添加匹配,则通过 responseObserver.onNext(book); 将这本书写回到客户端。
等所有操作都完成后,执行 responseObserver.onCompleted();,表示服务端的响应序列结束了,这样客户端也就知道请求结束了。
我们来看看客户端的代码,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class BookServiceClient { public static void main (String[] args) throws InterruptedException { ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost" , 50051 ) .usePlaintext() .build(); BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel); searchBook(stub); } private static void searchBook (BookServiceGrpc.BookServiceStub stub) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch (1 ); stub.searchBooks(StringValue.newBuilder().setValue("明清小说" ).build(), new StreamObserver <Book>() { @Override public void onNext (Book book) { System.out.println(book); } @Override public void onError (Throwable throwable) { } @Override public void onCompleted () { countDownLatch.countDown(); System.out.println("查询完毕!" ); } }); countDownLatch.await(); } }
客户端的代码好理解,搜索的关键字是 明清小说,每当服务端返回一次数据的时候,客户端回调的 onNext 方法就会被触发一次,当服务端之行了 responseObserver.onCompleted(); 之后,客户端的 onCompleted 方法也会被触发。
这个就是服务端流,客户端发起一个请求,服务端通过 onNext 可以多次写回数据。
4. 客户端流 RPC 客户端流则是客户端发起多个请求,服务端只给出一个响应。
上面的 updateBooks 就是一个客户端流的案例,客户端想要修改图书,可以发起多个请求修改多本书,服务端则收集多次修改的结果,将之汇总然后一次性返回给客户端。
我们先来看看服务端的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class BookServiceImpl extends BookServiceGrpc .BookServiceImplBase { private Map<String, Book> bookMap = new HashMap <>(); public BookServiceImpl () { Book b1 = Book.newBuilder().setId("1" ).setName("三国演义" ).setAuthor("罗贯中" ).setPrice(30 ).addTags("明清小说" ).addTags("通俗小说" ).build(); Book b2 = Book.newBuilder().setId("2" ).setName("西游记" ).setAuthor("吴承恩" ).setPrice(40 ).addTags("志怪小说" ).addTags("通俗小说" ).build(); Book b3 = Book.newBuilder().setId("3" ).setName("水浒传" ).setAuthor("施耐庵" ).setPrice(50 ).addTags("明清小说" ).addTags("通俗小说" ).build(); bookMap.put("1" , b1); bookMap.put("2" , b2); bookMap.put("3" , b3); } @Override public StreamObserver<Book> updateBooks (StreamObserver<StringValue> responseObserver) { StringBuilder sb = new StringBuilder ("更新的图书 ID 为:" ); return new StreamObserver <Book>() { @Override public void onNext (Book book) { bookMap.put(book.getId(), book); sb.append(book.getId()) .append("," ); } @Override public void onError (Throwable throwable) { } @Override public void onCompleted () { responseObserver.onNext(StringValue.newBuilder().setValue(sb.toString()).build()); responseObserver.onCompleted(); } }; } }
客户端每发送一本书来,就会触发服务端的 onNext 方法,然后我们在这方法中进行图书的更新操作,并记录更新结果。最后,我们在 onCompleted 方法中,将更新结果汇总返回给客户端,基本上就是这样一个流程。
我们再来看看客户端的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class BookServiceClient { public static void main (String[] args) throws InterruptedException { ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost" , 50051 ) .usePlaintext() .build(); BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel); updateBook(stub); } private static void updateBook (BookServiceGrpc.BookServiceStub stub) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch (1 ); StreamObserver<Book> request = stub.updateBooks(new StreamObserver <StringValue>() { @Override public void onNext (StringValue stringValue) { System.out.println("stringValue.getValue() = " + stringValue.getValue()); } @Override public void onError (Throwable throwable) { } @Override public void onCompleted () { System.out.println("更新完毕" ); countDownLatch.countDown(); } }); request.onNext(Book.newBuilder().setId("1" ).setName("a" ).setAuthor("b" ).build()); request.onNext(Book.newBuilder().setId("2" ).setName("c" ).setAuthor("d" ).build()); request.onCompleted(); countDownLatch.await(); } }
在客户端这块,updateBooks 方法会返回一个 StreamObserver 对象,调用该对象的 onNext 方法就是给服务端传递数据了,可以传递多个数据,调用该对象的 onCompleted 方法就是告诉服务端数据传递结束了,此时也会触发服务端的 onCompleted 方法,服务端的 onCompleted 方法执行之后,进而触发了客户端的 onCompleted 方法。
5. 双向流 RPC 双向流其实就是 3、4 小节的合体。即客户端多次发送数据,服务端也多次响应数据。
我们先来看下服务端的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public class BookServiceImpl extends BookServiceGrpc .BookServiceImplBase { private Map<String, Book> bookMap = new HashMap <>(); private List<Book> books = new ArrayList <>(); public BookServiceImpl () { Book b1 = Book.newBuilder().setId("1" ).setName("三国演义" ).setAuthor("罗贯中" ).setPrice(30 ).addTags("明清小说" ).addTags("通俗小说" ).build(); Book b2 = Book.newBuilder().setId("2" ).setName("西游记" ).setAuthor("吴承恩" ).setPrice(40 ).addTags("志怪小说" ).addTags("通俗小说" ).build(); Book b3 = Book.newBuilder().setId("3" ).setName("水浒传" ).setAuthor("施耐庵" ).setPrice(50 ).addTags("明清小说" ).addTags("通俗小说" ).build(); bookMap.put("1" , b1); bookMap.put("2" , b2); bookMap.put("3" , b3); } @Override public StreamObserver<StringValue> processBooks (StreamObserver<BookSet> responseObserver) { return new StreamObserver <StringValue>() { @Override public void onNext (StringValue stringValue) { Book b = Book.newBuilder().setId(stringValue.getValue()).build(); books.add(b); if (books.size() == 3 ) { BookSet bookSet = BookSet.newBuilder().addAllBookList(books).build(); responseObserver.onNext(bookSet); books.clear(); } } @Override public void onError (Throwable throwable) { } @Override public void onCompleted () { BookSet bookSet = BookSet.newBuilder().addAllBookList(books).build(); responseObserver.onNext(bookSet); books.clear(); responseObserver.onCompleted(); } }; } }
这段代码没有实际意义,单纯为了给小伙伴们演示双向流,我的操作逻辑是客户端传递多个 ID 到服务端,然后服务端根据这些 ID 构建对应的 Book 对象,然后三个三个一组,再返回给客户端。客户端每次发送一个请求,都会触发服务端的 onNext 方法,我们在这个方法中对请求分组返回。最后如果还有剩余的请求,我们在 onCompleted() 方法中返回。
再来看看客户端的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class BookServiceClient { public static void main (String[] args) throws InterruptedException { ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost" , 50051 ) .usePlaintext() .build(); BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel); processBook(stub); } private static void processBook (BookServiceGrpc.BookServiceStub stub) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch (1 ); StreamObserver<StringValue> request = stub.processBooks(new StreamObserver <BookSet>() { @Override public void onNext (BookSet bookSet) { System.out.println("bookSet = " + bookSet); System.out.println("=============" ); } @Override public void onError (Throwable throwable) { } @Override public void onCompleted () { System.out.println("处理完毕!" ); countDownLatch.countDown(); } }); request.onNext(StringValue.newBuilder().setValue("a" ).build()); request.onNext(StringValue.newBuilder().setValue("b" ).build()); request.onNext(StringValue.newBuilder().setValue("c" ).build()); request.onNext(StringValue.newBuilder().setValue("d" ).build()); request.onCompleted(); countDownLatch.await(); } }
这个客户端的代码跟第四小节一模一样,不再赘述了。
好啦,这就是松哥和小伙伴们介绍的 gRPC 的四种不同的通信模式,文章中只给出了一些关键代码,如果小伙伴们没看明白,建议结合上篇文章 一起阅读就懂啦~