MyEnigma

とある自律移動システムエンジニアのブログです。#Robotics #Programing #C++ #Python #MATLAB #Vim #Mathematics #Book #Movie #Traveling #Mac #iPhone

ロボットエンジニアのためのgRPC入門


WEB+DB PRESS Vol.110

目次

はじめに

先日、データのシリアライズフォーマットである

Protocol buffersを紹介しましたが、

myenigma.hatenablog.com

このProtocol buffersでシリアライズされたデータは、

以前の記事のようにバイナリのファイルとして保存することもできますが、

gRPCと言われる通信方法で、

複数のプロセス間を通信させることが多いようです。

 

今回の記事では、このgRPCの概要と簡単なサンプルコードを

紹介したいと思います。

gRPCとは?

gRPCは2015年にGoogleが発表した、

サーバとクライアント間の通信プロトコルです。

gRPC

gRPCって何? - Qiita

今から学ぶgRPCの基礎 - Qiita

 

元々は、Googleの中のStubbyというプロジェクトで開発されていた、

Google内部のプロセス間通信ツールでしたが、

BSDライセンスのOSSとして公開されました。

同社ではgRPCを利用して毎秒100億単位のリクエストを処理しているとのことです。

 

RPCとは、Remote Procedure Callという

プロセス(アドレス)外に定義された関数(Procedure)を遠隔から呼び出す仕組みのことで、

ja.wikipedia.org

gPRCでは、サーバ側に定義された関数に、クライアントが引数を送信し、

その結果をサーバが返す形でRPCを実現できます。

 

WebのREST APIの代表例として、Http + JSONがありますが、

JSONがProtocol buffersに対応すると考えると、

Http が gRPCに対応すると考えることもできます。

(RPCなので若干レイヤーが異なりますが。。)

 

gPRCの特徴

gRPCは、他の通信方法と比べて、

下記のような特徴があります。

Protocol buffersでRPCを定義することで、様々な言語のサーバ・クライアント実装が自動生成可能

Protocol buffersは下記のブログの記事の通り、

データのシリアライズフォーマットですが、

myenigma.hatenablog.com

元々gRPCと組み合わせて使用されることを想定しているため、

Protocol buffersのprotoファイルに、gRPCの通信定義を書くことで、

構造化されたデータのコードだけでなく、

サーバとクライアントのコードを自動生成することができます。

 

このサーバとクライアントの実装コードの生成は、

protocコンパイラのプラグインとして実装されることが多いようです。

 

現状、Googleの公式gPRCプラグインとしては、

  • Objective-C

  • C++

  • Basic

  • Android

  • C#

  • C

  • Dart

  • Go

  • Java

  • Node

  • PHP

  • Python

  • Ruby

  • Web

などが対応しています。

grpc.io

 

通信プロトコルにHTTP/2を使うことで高速化、双方向通信、streaming等を実現

gRPCでは、通信にHTTP/2を使うことで、

通常のHTTPベースの通信と比べて、

高速な通信が可能です。

 

また、通常のリクエスト・レスポンス型のRPCである

  • 1 Unary RPC

だけでなく、複数のレスポンスを受け取る

  • 2 Server streaming

複数のリクエストを送信する

  • 3 Client streaming

リクエストもレスポンスも複数返す

  • 4 Bidirectional streaming RPC

の計4つのRPCを利用することが可能です。

 

通信のデータ量が小さく、データ解釈時の計算時間も短い

下記の記事で紹介したProtocol buffersの特徴は、

myenigma.hatenablog.com

gRPCで通信するときにも、通信のデータ量を小さくし、

データ解釈時の計算時間も短くすることを実現しています。

 

ProtoファイルでのRPCの宣言

各4種類のRPCをProtoファイルで宣言する方法は

下記の通りです。

基本的には、RPCで呼ぶ関数の名前(下記ではSayHello)と、

入力の型(下記ではHelloRequest), 入力の型(下記ではHello Response)

を指定し、複数回データを送受信するときには、

入力と出力の型の前にstreamをつけるだけです。

入力と出力の型もprotoファイルに定義します。

Unary RPC

rpc SayHello(HelloRequest) returns (HelloResponse){}

Server streaming RPC

rpc SayHello(HelloRequest) returns (stream HelloResponse){}

Client streaming RPC

rpc SayHello(stream HelloRequest) returns (HelloResponse){}

Bidirectional streaming RPC

rpc SayHello(stream HelloRequest) returns (stream HelloResponse){}

 

gRPCのサンプルコード

下記は代表的な言語における

gRPC通信のサンプルコードです。

下記のコードはすべてこちらのリポジトリでも公開しています。

github.com

 

Python

pythonで、gRPCのサーバ、クライアントコードを生成するには、

下記のコマンドでgrpcip-toolsというライブラリをインストールする必要があります。

$ pip install grpcio-tools

protocコマンドのプラグインとすることもできますが、

今回はこのモジュールを下記のように直接利用します。

$ python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./addressbook.proto

このコマンドを実行することで、

データ格納用コードのhoge.pb2.pyというファイルと、

grpc通信用コードのhoge.pb2.grpc.pyというファイルが

生成されるはずです。

 

下記の4つのRPCを使った例では、

すべて共通の下記のprotoファイルを利用しました。

syntax = "proto3";

package tutorial;

service RequestAddressBookWithUnaryRPC {
    rpc Request (AddressBookRequest) returns (AddressBook) {
    }
}

service RequestAddressBookWithServerStreamingRPC {
    rpc Request (AddressBookRequest) returns (stream AddressBook) {
    }
}

service RequestAddressBookWithClientStreamingRPC {
    rpc Request (stream AddressBookRequest) returns (AddressBook) {
    }
}

service RequestAddressBookWithBidirectionalStreamingRPC {
    rpc Request (stream AddressBookRequest) returns (AddressBook) {
    }
}


message AddressBookRequest {
    int64 person_number = 1;
}


message Person {
    string name = 1;
    int32 id = 2;
    string email = 3;

    enum PhoneType {
        MOBILE = 0;
        HOME = 1;
        WORK = 2;
    }

    message PhoneNumber {
        string number = 1;
        PhoneType type = 2;
    }

    repeated PhoneNumber phones = 4;
}

message AddressBook {
    repeated Person people = 1;
}

Unary RPC

下記は簡単なUnary RPCの実装例です。

(Clientのperson_numberのデータなどは送っていますが、

特に使っていないですが。。)  

サーバの実装は下記の通りです。

"""
gRPC server for unary PRC sample in Python
author: Atsushi Sakai(@Atsushi_twi)
"""

import time
from concurrent import futures

import addressbook_pb2
import addressbook_pb2_grpc
import grpc


class AddressBookResponder(addressbook_pb2_grpc.RequestAddressBookServicer):

    def Request(self, request, context):
        print(request)
        print(context)

        address_book = addressbook_pb2.AddressBook()

        person1 = address_book.people.add()
        person1.id = 1234
        person1.name = "John Doe"
        person1.email = "jdoe@example.com"

        phone = person1.phones.add()
        phone.number = "555-4321"
        phone.type = addressbook_pb2.Person.HOME

        person2 = address_book.people.add()
        person2.id = 4321
        person2.name = "Tom Ranger"
        person2.email = "tranger@example.com"
        phone = person2.phones.add()
        phone.number = "555-4322"
        phone.type = addressbook_pb2.Person.WORK

        print(address_book)  # Human readable print

        return address_book


def main():
    print("start!!")

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    addressbook_pb2_grpc.add_RequestAddressBookServicer_to_server(
        AddressBookResponder(),
        server)
    server.add_insecure_port('[::]:50051')
    server.start()

    _ONE_DAY_IN_SECONDS = 60 * 60 * 24
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)

    print("done!!")


if __name__ == '__main__':
    main()

クライアントの実装は下記の通りです。

"""
gRPC server for unary PRC sample in Python
author: Atsushi Sakai
"""


import addressbook_pb2
import addressbook_pb2_grpc
import grpc


def main():
    print("start!!")

    with grpc.insecure_channel('localhost:50051') as channel:
        stub = addressbook_pb2_grpc.RequestAddressBookStub(channel)
        response = stub.Request(
            addressbook_pb2.AddressBookRequest(person_number=2))
        print("response: ", response)

    print("done!!")


if __name__ == '__main__':
    main()

Server streaming RPC

下記のコードはServer streaming RPCの実装例です。

今回はサーバから二回に分けて、addressbookのデータを送信します。

クライアント側のコードでは、レスポンスのデータを受け取って、

forループでデータを表示していますが、

このループはrpcのメッセージを受信し次第、実行されるので

今回の例のように、Serverのレスポンスのデータの間に

時間がかかっても、クライアント側はすべてのデータを待たずに、

逐次的に処理をすることができます。

 

サーバの側で逐次的にデータを送信するのは、

yield文で実現できます。

 

サーバの実装は下記の通りです。

"""
gRPC server for Server streaming PRC sample in Python
author: Atsushi Sakai(@Atsushi_twi)
"""

import time
from concurrent import futures

import addressbook_pb2
import addressbook_pb2_grpc
import grpc


class AddressBookResponder(addressbook_pb2_grpc.RequestAddressBookServicer):

    def Request(self, request, context):
        print(request)
        print(context)

        address_book = addressbook_pb2.AddressBook()

        person1 = address_book.people.add()
        person1.id = 1234
        person1.name = "John Doe"
        person1.email = "jdoe@example.com"

        phone = person1.phones.add()
        phone.number = "555-4321"
        phone.type = addressbook_pb2.Person.HOME

        print(address_book)  # Human readable print

        yield address_book  # send first message

        time.sleep(5)  # wait 5 sec

        address_book = addressbook_pb2.AddressBook()

        person2 = address_book.people.add()
        person2.id = 4321
        person2.name = "Tom Ranger"
        person2.email = "tranger@example.com"
        phone = person2.phones.add()
        phone.number = "555-4322"
        phone.type = addressbook_pb2.Person.WORK

        print(address_book)  # Human readable print

        yield address_book  # send second message


def main():
    print("start!!")

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    addressbook_pb2_grpc.add_RequestAddressBookServicer_to_server(
        AddressBookResponder(),
        server)
    server.add_insecure_port('[::]:50051')
    server.start()

    _ONE_DAY_IN_SECONDS = 60 * 60 * 24
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)

    print("done!!")


if __name__ == '__main__':
    main()

クライアントの実装は下記の通りです。

"""
gRPC client for Server streaming PRC sample in Python
author: Atsushi Sakai
"""

import addressbook_pb2
import addressbook_pb2_grpc
import grpc


def main():
    print("start!!")

    with grpc.insecure_channel('localhost:50051') as channel:
        stub = addressbook_pb2_grpc.RequestAddressBookStub(channel)
        responses = stub.Request(
            addressbook_pb2.AddressBookRequest(person_number=2))

        for r in responses:
            print("response: ", r)

    print("done!!")


if __name__ == '__main__':
    main()

Client streaming RPC

続いて、Client streaming RPCは下記のように実装します。

Clientのリクエストを、listに格納し、

そのイテレータを送信してリクエストする形になります。

サーバ側でもそれぞれのリクエストにアクセスできるのがわかります。

 

下記はサーバの実装です

"""
gRPC server for Client streaming PRC sample in Python
author: Atsushi Sakai(@Atsushi_twi)
"""

import time
from concurrent import futures

import addressbook_pb2
import addressbook_pb2_grpc
import grpc


class AddressBookResponder(addressbook_pb2_grpc.RequestAddressBookWithClientStreamingRPCServicer):

    def Request(self, request, context):
        for r in request:
            print(r)

        address_book = addressbook_pb2.AddressBook()

        person1 = address_book.people.add()
        person1.id = 1234
        person1.name = "John Doe"
        person1.email = "jdoe@example.com"

        phone = person1.phones.add()
        phone.number = "555-4321"
        phone.type = addressbook_pb2.Person.HOME

        address_book = addressbook_pb2.AddressBook()

        person2 = address_book.people.add()
        person2.id = 4321
        person2.name = "Tom Ranger"
        person2.email = "tranger@example.com"
        phone = person2.phones.add()
        phone.number = "555-4322"
        phone.type = addressbook_pb2.Person.WORK

        print(address_book)  # Human readable print

        return address_book  # send second message


def main():
    print("start!!")

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    addressbook_pb2_grpc.add_RequestAddressBookWithClientStreamingRPCServicer_to_server(
        AddressBookResponder(),
        server)
    server.add_insecure_port('[::]:50051')
    server.start()

    _ONE_DAY_IN_SECONDS = 60 * 60 * 24
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)

    print("done!!")


if __name__ == '__main__':
    main()

 

下記はクライアントの実装です

"""
gRPC client for client streaming PRC sample in Python
author: Atsushi Sakai
"""

import addressbook_pb2
import addressbook_pb2_grpc
import grpc


def main():
    print("start!!")

    with grpc.insecure_channel('localhost:50051') as channel:
        stub = addressbook_pb2_grpc.RequestAddressBookWithClientStreamingRPCStub(channel)
        messages = [addressbook_pb2.AddressBookRequest(person_number=2),
                    addressbook_pb2.AddressBookRequest(person_number=3)]
        responses = stub.Request(iter(messages))

    print("done!!")


if __name__ == '__main__':
    main()

Bidirectional streaming RPC

基本的には、

前述のClient stream RPCとServer stream RPCを組み合わせて、

Bidirectional streaming RPCを実現できます。

 

下記はサーバの実装です

"""
gRPC server for bidirectional streaming PRC sample in Python
author: Atsushi Sakai(@Atsushi_twi)
"""

import time
from concurrent import futures

import grpc

import addressbook_pb2
import addressbook_pb2_grpc


class AddressBookResponder(addressbook_pb2_grpc.RequestAddressBookWithBidirectionalStreamingRPCServicer):

    def Request(self, request, context):
        for r in request:
            print(r)

        address_book = addressbook_pb2.AddressBook()

        person1 = address_book.people.add()
        person1.id = 1234
        person1.name = "John Doe"
        person1.email = "jdoe@example.com"

        phone = person1.phones.add()
        phone.number = "555-4321"
        phone.type = addressbook_pb2.Person.HOME

        print(address_book)  # Human readable print

        yield address_book  # send first message

        time.sleep(5)  # wait 5 sec

        address_book = addressbook_pb2.AddressBook()

        person2 = address_book.people.add()
        person2.id = 4321
        person2.name = "Tom Ranger"
        person2.email = "tranger@example.com"
        phone = person2.phones.add()
        phone.number = "555-4322"
        phone.type = addressbook_pb2.Person.WORK

        print(address_book)  # Human readable print

        yield address_book  # send second message


def main():
    print("start!!")

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    addressbook_pb2_grpc.add_RequestAddressBookWithBidirectionalStreamingRPCServicer_to_server(
        AddressBookResponder(),
        server)
    server.add_insecure_port('[::]:50051')
    server.start()

    _ONE_DAY_IN_SECONDS = 60 * 60 * 24
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)

    print("done!!")


if __name__ == '__main__':
    main()

 

下記はクライアントの実装です

"""
gRPC client for bidirectional streaming PRC sample in Python
author: Atsushi Sakai
"""

import grpc

import addressbook_pb2
import addressbook_pb2_grpc


def main():
    print("start!!")

    with grpc.insecure_channel('localhost:50051') as channel:
        stub = addressbook_pb2_grpc.RequestAddressBookWithBidirectionalStreamingRPCStub(channel)
        messages = [addressbook_pb2.AddressBookRequest(person_number=2),
                    addressbook_pb2.AddressBookRequest(person_number=3)]
        responses = stub.Request(iter(messages))

        for r in responses:
            print("response: ", r)

    print("done!!")


if __name__ == '__main__':
    main()

gPRCのgの意味

下記のように、gRPCのバージョンによって、

異なるらしいです。

grpc/g_stands_for.md at master · grpc/grpc

参考資料

myenigma.hatenablog.com

myenigma.hatenablog.com

myenigma.hatenablog.com

myenigma.hatenablog.com

myenigma.hatenablog.com


WEB+DB PRESS Vol.110

MyEnigma Supporters

もしこの記事が参考になり、

ブログをサポートしたいと思われた方は、

こちらからよろしくお願いします。

myenigma.hatenablog.com