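ByteStream is not defined by the Remote APIs; it falls under the Google APIs instead.
The ByteStream API lets clients read a stream of bytes from a resource and write a stream of bytes to one. Resources have names, and those names are supplied in the API calls below to identify the resource being read or written.
Every implementation of the ByteStream API exports the interface defined here:
- Read(): reads the contents of a resource.
- Write(): writes the contents of a resource. A client can call Write() multiple times with the same resource name, and can check the status of the write by calling QueryWriteStatus().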
message
Read
```protobuf
message ReadRequest {
  string resource_name = 1;
  int64 read_offset = 2;
  int64 read_limit = 3;
}
```
```protobuf
message ReadResponse {
  bytes data = 10;
}
```
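To make the roles of read_offset and read_limit concrete, here is a minimal sketch of how a server might slice a resource into a sequence of ReadResponse messages. It uses local stand-in structs rather than the generated types, and serveRead and chunkSize are hypothetical names introduced only for illustration. The boundary rules (an offset outside the resource is rejected, a limit of zero means "no limit", a negative limit is an error) follow the comments in the upstream bytestream.proto; a real server would return gRPC status codes instead of plain errors.

```go
package main

import (
	"errors"
	"fmt"
)

// ReadRequest and ReadResponse are local stand-ins for the generated types.
type ReadRequest struct {
	ResourceName string
	ReadOffset   int64
	ReadLimit    int64
}

type ReadResponse struct {
	Data []byte
}

var (
	errOutOfRange   = errors.New("read_offset out of range")
	errInvalidLimit = errors.New("read_limit must not be negative")
	errInvalidChunk = errors.New("chunkSize must be positive")
)

// serveRead (hypothetical helper) returns the responses a server could stream
// back for req, splitting the selected byte range into chunks of chunkSize.
func serveRead(content []byte, req ReadRequest, chunkSize int) ([]ReadResponse, error) {
	size := int64(len(content))
	if req.ReadOffset < 0 || req.ReadOffset > size {
		return nil, errOutOfRange
	}
	if req.ReadLimit < 0 {
		return nil, errInvalidLimit
	}
	if chunkSize <= 0 {
		return nil, errInvalidChunk
	}
	// A limit of zero means "read to the end of the resource".
	end := size
	if req.ReadLimit > 0 && req.ReadLimit < size-req.ReadOffset {
		end = req.ReadOffset + req.ReadLimit
	}
	var out []ReadResponse
	for pos := req.ReadOffset; pos < end; pos += int64(chunkSize) {
		stop := pos + int64(chunkSize)
		if stop > end {
			stop = end
		}
		out = append(out, ReadResponse{Data: content[pos:stop]})
	}
	return out, nil
}

func main() {
	req := ReadRequest{ResourceName: "demo", ReadOffset: 7, ReadLimit: 4}
	resps, err := serveRead([]byte("hello, bytestream"), req, 3)
	if err != nil {
		panic(err)
	}
	for _, r := range resps {
		fmt.Printf("%q\n", r.Data) // prints "byt" and then "e"
	}
}
```

Splitting the payload across several responses is the reason Read is declared as a server-streaming RPC: the client reassembles the resource by concatenating the data fields in order.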
Write
```protobuf
message WriteRequest {
  string resource_name = 1;
  int64 write_offset = 2;
  bool finish_write = 3;
  bytes data = 10;
}
```
```protobuf
message WriteResponse {
  int64 committed_size = 1;
}
```
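The fields of WriteRequest are easier to follow with an example. The sketch below splits a payload into a sequence of requests the way a client might before streaming them: resource_name is set on the first request, write_offset advances by the number of bytes already sent, and finish_write is set only on the last one. The struct is a local stand-in for the generated type, and splitWrite and commit are hypothetical helpers that are not part of any library; commit imitates a server that appends the chunks and reports the committed_size.

```go
package main

import (
	"errors"
	"fmt"
)

// WriteRequest is a local stand-in for the generated type.
type WriteRequest struct {
	ResourceName string
	WriteOffset  int64
	FinishWrite  bool
	Data         []byte
}

// splitWrite (hypothetical helper) turns data into the requests a client
// could send on a single Write() stream.
func splitWrite(name string, data []byte, chunkSize int) ([]WriteRequest, error) {
	if chunkSize <= 0 {
		return nil, errors.New("chunkSize must be positive")
	}
	var reqs []WriteRequest
	for off := 0; off < len(data); off += chunkSize {
		end := off + chunkSize
		if end > len(data) {
			end = len(data)
		}
		req := WriteRequest{
			WriteOffset: int64(off),
			Data:        data[off:end],
			FinishWrite: end == len(data),
		}
		if off == 0 {
			req.ResourceName = name // only the first request names the resource here
		}
		reqs = append(reqs, req)
	}
	if len(reqs) == 0 {
		// An empty payload still needs one request to finish the write.
		reqs = append(reqs, WriteRequest{ResourceName: name, FinishWrite: true})
	}
	return reqs, nil
}

// commit (hypothetical helper) imitates the server side: it appends each chunk
// and returns the total number of bytes committed.
func commit(reqs []WriteRequest) (int64, error) {
	var buf []byte
	for _, r := range reqs {
		if r.WriteOffset != int64(len(buf)) {
			return int64(len(buf)), fmt.Errorf("offset %d does not match committed size %d", r.WriteOffset, len(buf))
		}
		buf = append(buf, r.Data...)
		if r.FinishWrite {
			break
		}
	}
	return int64(len(buf)), nil
}

func main() {
	reqs, err := splitWrite("uploads/demo", []byte("abcdefg"), 3)
	if err != nil {
		panic(err)
	}
	for _, r := range reqs {
		fmt.Printf("offset=%d finish=%v data=%q\n", r.WriteOffset, r.FinishWrite, r.Data)
	}
	size, err := commit(reqs)
	fmt.Println(size, err)
}
```

Checking each write_offset against the bytes already received is what lets a server detect gaps or duplicated chunks before it reports a committed_size.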
service
```protobuf
service ByteStream {
  rpc Read(ReadRequest) returns (stream ReadResponse);
  rpc Write(stream WriteRequest) returns (WriteResponse);
  rpc QueryWriteStatus(QueryWriteStatusRequest) returns (QueryWriteStatusResponse);
}
```
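The three RPCs combine naturally into a resumable upload: if a Write() stream breaks, the client can ask QueryWriteStatus how many bytes were committed and continue from that offset. The following is a rough in-memory sketch of that flow, not the real service. memStore and its methods are hypothetical, and the returned "committed size" and "complete" values are modeled on what the upstream QueryWriteStatusResponse reports; the message definitions themselves are not shown in this post.

```go
package main

import "fmt"

// upload tracks the bytes received so far for one resource.
type upload struct {
	data     []byte
	complete bool
}

// memStore is a hypothetical in-memory stand-in for a ByteStream server.
type memStore struct {
	uploads map[string]*upload
}

func newMemStore() *memStore {
	return &memStore{uploads: make(map[string]*upload)}
}

// write appends data at offset and returns the committed size afterwards.
func (s *memStore) write(name string, offset int64, data []byte, finish bool) (int64, error) {
	u, ok := s.uploads[name]
	if !ok {
		u = &upload{}
		s.uploads[name] = u
	}
	committed := int64(len(u.data))
	if u.complete {
		return committed, fmt.Errorf("resource %q is already complete", name)
	}
	if offset != committed {
		return committed, fmt.Errorf("offset %d does not match committed size %d", offset, committed)
	}
	u.data = append(u.data, data...)
	u.complete = finish
	return int64(len(u.data)), nil
}

// queryWriteStatus reports how much of the resource has been committed.
func (s *memStore) queryWriteStatus(name string) (committed int64, complete bool) {
	u, ok := s.uploads[name]
	if !ok {
		return 0, false
	}
	return int64(len(u.data)), u.complete
}

func main() {
	store := newMemStore()
	payload := []byte("resumable upload")

	// First attempt: only six bytes arrive before the stream "breaks".
	if _, err := store.write("uploads/demo", 0, payload[:6], false); err != nil {
		panic(err)
	}

	// Resume: ask where the server left off and send the remainder.
	committed, complete := store.queryWriteStatus("uploads/demo")
	fmt.Println("committed:", committed, "complete:", complete)
	total, err := store.write("uploads/demo", committed, payload[committed:], true)
	fmt.Println("total:", total, "err:", err)
}
```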
Go implementation
In the generated bytestream.pb.go, the ByteStream service corresponds to the following interface, which the server side has to implement:
```go
type ByteStreamServer interface {
	Read(*ReadRequest, ByteStream_ReadServer) error
	Write(ByteStream_WriteServer) error
	QueryWriteStatus(context.Context, *QueryWriteStatusRequest) (*QueryWriteStatusResponse, error)
}
```
Each of the interface's three methods is wrapped by a handler:
```go
func _ByteStream_Read_Handler(srv interface{}, stream grpc.ServerStream) error {
	m := new(ReadRequest)
	if err := stream.RecvMsg(m); err != nil {
		return err
	}
	return srv.(ByteStreamServer).Read(m, &byteStreamReadServer{stream})
}

func _ByteStream_Write_Handler(srv interface{}, stream grpc.ServerStream) error {
	return srv.(ByteStreamServer).Write(&byteStreamWriteServer{stream})
}

func _ByteStream_QueryWriteStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(QueryWriteStatusRequest)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(ByteStreamServer).QueryWriteStatus(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/google.bytestream.ByteStream/QueryWriteStatus",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(ByteStreamServer).QueryWriteStatus(ctx, req.(*QueryWriteStatusRequest))
	}
	return interceptor(ctx, in, info, handler)
}
```
These three handlers are registered in a grpc.ServiceDesc struct:
```go
var _ByteStream_serviceDesc = grpc.ServiceDesc{
	ServiceName: "google.bytestream.ByteStream",
	HandlerType: (*ByteStreamServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "QueryWriteStatus",
			Handler:    _ByteStream_QueryWriteStatus_Handler,
		},
	},
	Streams: []grpc.StreamDesc{
		{
			StreamName:    "Read",
			Handler:       _ByteStream_Read_Handler,
			ServerStreams: true,
		},
		{
			StreamName:    "Write",
			Handler:       _ByteStream_Write_Handler,
			ClientStreams: true,
		},
	},
	Metadata: "google/bytestream/bytestream.proto",
}
```
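The pattern behind the handlers and the service descriptor can be reproduced in a few lines of plain Go. The sketch below is a deliberately simplified model, not gRPC's actual implementation: statusServer, serviceDesc, miniServer, and fixedStatus are all hypothetical names, and only a single unary method is modeled. What it illustrates is the same idea as the generated code: the handler receives the service as an untyped value, asserts it to the service interface, and is looked up by a "/service/method" path.

```go
package main

import "fmt"

// statusServer is a hypothetical stand-in for ByteStreamServer with one method.
type statusServer interface {
	QueryWriteStatus(resource string) (int64, error)
}

// methodHandler mirrors the shape of a generated handler: it receives the
// service as an untyped value and asserts the concrete interface itself.
type methodHandler func(srv interface{}, resource string) (int64, error)

type methodDesc struct {
	MethodName string
	Handler    methodHandler
}

// serviceDesc is a simplified model of a service descriptor.
type serviceDesc struct {
	ServiceName string
	Methods     []methodDesc
}

func queryWriteStatusHandler(srv interface{}, resource string) (int64, error) {
	return srv.(statusServer).QueryWriteStatus(resource)
}

var statusServiceDesc = serviceDesc{
	ServiceName: "demo.ByteStream",
	Methods: []methodDesc{
		{MethodName: "QueryWriteStatus", Handler: queryWriteStatusHandler},
	},
}

type route struct {
	handler methodHandler
	srv     interface{}
}

// miniServer dispatches calls by full method name, "/service/method".
type miniServer struct {
	routes map[string]route
}

func (s *miniServer) registerService(desc *serviceDesc, srv interface{}) {
	if s.routes == nil {
		s.routes = make(map[string]route)
	}
	for _, m := range desc.Methods {
		s.routes["/"+desc.ServiceName+"/"+m.MethodName] = route{handler: m.Handler, srv: srv}
	}
}

func (s *miniServer) invoke(fullMethod, resource string) (int64, error) {
	r, ok := s.routes[fullMethod]
	if !ok {
		return 0, fmt.Errorf("unknown method %s", fullMethod)
	}
	return r.handler(r.srv, resource)
}

// fixedStatus is a toy implementation that reports sizes from a map.
type fixedStatus map[string]int64

func (f fixedStatus) QueryWriteStatus(resource string) (int64, error) {
	size, ok := f[resource]
	if !ok {
		return 0, fmt.Errorf("resource %q not found", resource)
	}
	return size, nil
}

func main() {
	var s miniServer
	s.registerService(&statusServiceDesc, fixedStatus{"uploads/demo": 42})
	size, err := s.invoke("/demo.ByteStream/QueryWriteStatus", "uploads/demo")
	fmt.Println(size, err)
}
```

Keeping the descriptor as plain data is what allows the server to stay generic: it needs no compile-time knowledge of the service, only a table of names and handlers.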
Finally, the server only needs to call RegisterByteStreamServer to register its ByteStream implementation on the grpc.Server it holds.
```go
func RegisterByteStreamServer(s *grpc.Server, srv ByteStreamServer) {
	s.RegisterService(&_ByteStream_serviceDesc, srv)
}
```