type Option struct { MagicNumber int// MagicNumber marks this's a geerpc request CodecType codec.Type // client may choose different Codec to encode body }
// DefaultOption is a ready-made Option that carries the package MagicNumber
// and selects the gob codec.
var DefaultOption = &Option{
	MagicNumber: MagicNumber,
	CodecType:   codec.GobType,
}
// Server represents an RPC Server.
// It currently holds no state; its behaviour is provided entirely by its methods.
type Server struct{}
// NewServer returns a new Server. funcNewServer() *Server { return &Server{} }
// DefaultServer is the default instance of *Server, created with NewServer
// when the package is initialised.
var DefaultServer = NewServer()
// Accept accepts connections on the listener and serves requests // for each incoming connection. func(server *Server)Accept(lis net.Listener) { for { conn, err := lis.Accept() if err != nil { log.Println("rpc server: accept error:", err) return } go server.ServeConn(conn) } }
// Accept accepts connections on the listener and serves requests
// for each incoming connection.
// It is a convenience wrapper that forwards to DefaultServer.Accept.
func Accept(lis net.Listener) {
	DefaultServer.Accept(lis)
}
// ServeConn runs the server on a single connection. // ServeConn blocks, serving the connection until the client hangs up. func(server *Server)ServeConn(conn io.ReadWriteCloser) { deferfunc() { _ = conn.Close() }() var opt Option if err := json.NewDecoder(conn).Decode(&opt); err != nil { log.Println("rpc server: options error: ", err) return } if opt.MagicNumber != MagicNumber { log.Printf("rpc server: invalid magic number %x", opt.MagicNumber) return } f := codec.NewCodecFuncMap[opt.CodecType] if f == nil { log.Printf("rpc server: invalid codec type %s", opt.CodecType) return } server.serveCodec(f(conn)) }
// invalidRequest is the empty placeholder sent as the response body
// when a request could not be processed and only the error is meaningful.
var invalidRequest struct{}
func(server *Server)serveCodec(cc codec.Codec) { sending := new(sync.Mutex) // make sure to send a complete response wg := new(sync.WaitGroup) // wait until all request are handled for { req, err := server.readRequest(cc) if err != nil { if req == nil { break// it's not possible to recover, so close the connection } req.h.Error = err.Error() server.sendResponse(cc, req.h, invalidRequest, sending) continue } wg.Add(1) go server.handleRequest(cc, req, sending, wg) } wg.Wait() _ = cc.Close() }
serveCodec 的过程非常简单。主要包含三个阶段
读取请求 readRequest
处理请求 handleRequest
回复请求 sendResponse
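之前提到过,在一次连接中,允许接收多个请求,即多个 request header 和 request body,因此这里使用了 for 无限制地等待请求的到来,直到发生错误(例如连接被关闭,接收到的报文有问题等)。这里需要注意的点有三个:
1. handleRequest 使用了协程(goroutine)并发处理请求。
2. 处理请求是并发的,但回复必须逐个发送,否则多个回复报文可能交织在一起,客户端无法解析;这里使用互斥锁 sending 来保证。
3. 尽力而为:只有在 readRequest 返回的 req 为 nil(即 header 读取失败)时,才终止循环。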
// request stores all information of a call type request struct { h *codec.Header // header of request argv, replyv reflect.Value // argv and replyv of request }
func(server *Server)readRequestHeader(cc codec.Codec)(*codec.Header, error) { var h codec.Header if err := cc.ReadHeader(&h); err != nil { if err != io.EOF && err != io.ErrUnexpectedEOF { log.Println("rpc server: read header error:", err) } returnnil, err } return &h, nil }
func(server *Server)readRequest(cc codec.Codec)(*request, error) { h, err := server.readRequestHeader(cc) if err != nil { returnnil, err } req := &request{h: h} // TODO: now we don't know the type of request argv // day 1, just suppose it's string req.argv = reflect.New(reflect.TypeOf("")) if err = cc.ReadBody(req.argv.Interface()); err != nil { log.Println("rpc server: read argv err:", err) } return req, nil }
func(server *Server)sendResponse(cc codec.Codec, h *codec.Header, body interface{}, sending *sync.Mutex) { sending.Lock() defer sending.Unlock() if err := cc.Write(h, body); err != nil { log.Println("rpc server: write response error:", err) } }
func(server *Server)handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) { // TODO, should call registered rpc methods to get the right replyv // day 1, just print argv and send a hello message defer wg.Done() log.Println(req.h, req.argv.Elem()) req.replyv = reflect.ValueOf(fmt.Sprintf("geerpc resp %d", req.h.Seq)) server.sendResponse(cc, req.h, req.replyv.Interface(), sending) }
目前还不能判断 body 的类型,因此在 readRequest 和 handleRequest 中,day1 将 body 作为字符串处理。接收到请求,打印 header,并回复 geerpc resp ${req.h.Seq}。这一部分后续再实现。