// Call represents an active RPC. type Call struct { Seq uint64 ServiceMethod string// format "<service>.<method>" Args interface{} // arguments to the function Reply interface{} // reply from the function Error error // if error occurs, it will be set Done chan *Call // Strobes when call is complete. }
// Client represents an RPC Client. // There may be multiple outstanding Calls associated // with a single Client, and a Client may be used by // multiple goroutines simultaneously. type Client struct { cc codec.Codec opt *Option sending sync.Mutex // protect following header codec.Header mu sync.Mutex // protect following seq uint64 pending map[uint64]*Call closing bool// user has called Close shutdown bool// server has told us to stop }
var _ io.Closer = (*Client)(nil)
var ErrShutdown = errors.New("connection is shut down")
// Close the connection func(client *Client)Close()error { client.mu.Lock() defer client.mu.Unlock() if client.closing { return ErrShutdown } client.closing = true return client.cc.Close() }
// IsAvailable return true if the client does work func(client *Client)IsAvailable()bool { client.mu.Lock() defer client.mu.Unlock() return !client.shutdown && !client.closing }
// encode and send the request if err := client.cc.Write(&client.header, call.Args); err != nil { call := client.removeCall(seq) // call may be nil, it usually means that Write partially failed, // client has received the response and handled if call != nil { call.Error = err call.done() } } }
// Go invokes the function asynchronously. // It returns the Call structure representing the invocation. func(client *Client)Go(serviceMethod string, args, reply interface{}, done chan *Call) *Call { if done == nil { done = make(chan *Call, 10) } elseifcap(done) == 0 { log.Panic("rpc client: done channel is unbuffered") } call := &Call{ ServiceMethod: serviceMethod, Args: args, Reply: reply, Done: done, } client.send(call) return call }
// Call invokes the named function, waits for it to complete, // and returns its error status. func(client *Client)Call(serviceMethod string, args, reply interface{})error { call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done return call.Error }
Go 和 Call 是客户端暴露给用户的两个 RPC 服务调用接口,Go 是一个异步接口,返回 call 实例。
Call 是对 Go 的封装,阻塞 call.Done,等待响应返回,是一个同步接口。
至此,一个支持异步和并发的 GeeRPC 客户端已经完成。
Demo
第一天 GeeRPC 只实现了服务端,因此我们在 main 函数中手动模拟了整个通信过程,今天我们就将 main 函数中通信部分替换为今天的客户端吧。