type Option struct { MagicNumber int// MagicNumber marks this's a geerpc request CodecType codec.Type // client may choose different Codec to encode body }
var DefaultOption = &Option{ MagicNumber: MagicNumber, CodecType: codec.GobType, }
// Server represents an RPC Server. type Server struct{}
// NewServer returns a new Server. funcNewServer() *Server { return &Server{} }
// DefaultServer is the default instance of *Server. var DefaultServer = NewServer()
// Accept accepts connections on the listener and serves requests // for each incoming connection. func(server *Server) Accept(lis net.Listener) { for { conn, err := lis.Accept() if err != nil { log.Println("rpc server: accept error:", err) return } go server.ServeConn(conn) } }
// Accept accepts connections on the listener and serves requests // for each incoming connection. funcAccept(lis net.Listener) { DefaultServer.Accept(lis) }
// ServeConn runs the server on a single connection. // ServeConn blocks, serving the connection until the client hangs up. func(server *Server) ServeConn(conn io.ReadWriteCloser) { deferfunc() { _ = conn.Close() }() var opt Option if err := json.NewDecoder(conn).Decode(&opt); err != nil { log.Println("rpc server: options error: ", err) return } if opt.MagicNumber != MagicNumber { log.Printf("rpc server: invalid magic number %x", opt.MagicNumber) return } f := codec.NewCodecFuncMap[opt.CodecType] if f == nil { log.Printf("rpc server: invalid codec type %s", opt.CodecType) return } server.serveCodec(f(conn)) }
// invalidRequest is a placeholder for response argv when error occurs var invalidRequest = struct{}{}
func(server *Server) serveCodec(cc codec.Codec) { sending := new(sync.Mutex) // make sure to send a complete response wg := new(sync.WaitGroup) // wait until all request are handled for { req, err := server.readRequest(cc) if err != nil { if req == nil { break// it's not possible to recover, so close the connection } req.h.Error = err.Error() server.sendResponse(cc, req.h, invalidRequest, sending) continue } wg.Add(1) go server.handleRequest(cc, req, sending, wg) } wg.Wait() _ = cc.Close() }
serveCodec 的过程非常简单。主要包含三个阶段
读取请求 readRequest
处理请求 handleRequest
回复请求 sendResponse
之前提到过,在一次连接中,允许接收多个请求,即多个 request header 和 request body,因此这里使用了 for 无限制地等待请求的到来,直到发生错误(例如连接被关闭,接收到的报文有问题等),这里需要注意的点有三个:
// request stores all information of a call type request struct { h *codec.Header // header of request argv, replyv reflect.Value // argv and replyv of request }
func(server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) { // TODO, should call registered rpc methods to get the right replyv // day 1, just print argv and send a hello message defer wg.Done() log.Println(req.h, req.argv.Elem()) req.replyv = reflect.ValueOf(fmt.Sprintf("geerpc resp %d", req.h.Seq)) server.sendResponse(cc, req.h, req.replyv.Interface(), sending) }
目前还不能判断 body 的类型,因此在 readRequest 和 handleRequest 中,day1 将 body 作为字符串处理。接收到请求,打印 header,并回复 geerpc resp ${req.h.Seq}。这一部分后续再实现。