目录

go注册rpc接口

目录

go注册rpc接口

1.定义proto文件:

syntax = "proto3";

package pb;

// Service exposes a single unary RPC that carries every action.
// NOTE(review): the Go code in this document registers the server with
// pb.RegisterEMSServiceServer, which would be generated from a service named
// EMSService rather than Service — confirm which name is the real one.
service Service {
// RPC takes a Request and returns a Reply.
rpc RPC (Request) returns (Reply) {}
}

// Request is the wire-level request message of the RPC method.
message Request {
	// Action is the name the Go server uses to look up a registered controller.
	string Action  = 1;
	// TraceID is copied into the Reply by the Go server.
	int64  TraceID = 2;
	// Payload carries the action-specific body; the Go server decodes it as JSON.
	string Payload = 3;
}

// Reply is the wire-level response message of the RPC method.
message Reply {
	// Code is the result code produced by the controller or by the transport
	// itself when it reports a system error.
	int32  Code    = 1;
	// TraceID is the TraceID of the request this reply answers.
	int64  TraceID = 2;
	// Payload carries the action-specific result; the Go server fills it with a JSON string.
	string Payload = 3;
}

生成 gRPC 代码

使用 protoc 编译器生成 Go 代码:

protoc --go_out=. --go-grpc_out=. my.proto

这将生成两个文件:my.pb.go 和 my_grpc.pb.go

2.定义server:

makeGRPCEndpoint 函数的目的是将一个 gRPC 服务方法封装为一个 endpoint.Endpoint 类型的函数。这个函数主要用于将 gRPC 请求转换为内部的服务调用,并处理相关的上下文和错误。

import (
	"context"
	"fmt"
	"sync"

	"github.com/go-kit/kit/endpoint"
	grpctransport "github.com/go-kit/kit/transport/grpc"
)
// GRPCServer bundles the go-kit gRPC transport server with the set of
// controllers that requests are dispatched to.
type GRPCServer struct {
	// RPC is the go-kit gRPC server that runs decode -> endpoint -> encode.
	RPC         *grpctransport.Server
	// Mutex is held by RegisterGRPCController while it modifies Controllers.
	Mutex       *sync.Mutex
	// Controllers maps an action name to the controller that handles it.
	Controllers map[string]Controller
}

// GRPCService is the service-side contract wrapped by makeGRPCEndpoint: it
// handles one decoded Request and produces the Reply to send back.
type GRPCService interface {
	// GRPC processes req and returns the reply or an error.
	GRPC(*Request) (*Reply, error)
}

// decodeRequest converts the wire-level *pb.Request handed over by the gRPC
// transport into the internal *Request consumed by the endpoint.
//
// It returns an error when req is nil or is not a *pb.Request instead of
// panicking on the type assertion or on the field access.
func decodeRequest(_ context.Context, req interface{}) (interface{}, error) {
	greq, ok := req.(*pb.Request)
	if !ok || greq == nil {
		return nil, fmt.Errorf("decodeRequest: unexpected request type %T", req)
	}
	return &Request{Action: greq.Action, TraceID: greq.TraceID, Payload: greq.Payload}, nil
}
// makeGRPCEndpoint wraps srv.GRPC in a go-kit endpoint.Endpoint so the gRPC
// transport can invoke the service through the usual decode -> endpoint ->
// encode pipeline.
//
// The returned endpoint never reports an error to the transport: a failed
// call, a nil reply, a request of the wrong type, or a recovered panic are all
// turned into a system-error reply with code FAIL.
func makeGRPCEndpoint(srv GRPCService) endpoint.Endpoint {
	return func(_ context.Context, request interface{}) (rep interface{}, err error) {
		// traceID is captured as soon as the request is known so that a reply
		// built after a panic still carries it.
		var traceID int64
		defer func() {
			// The boolean result of utils.ExceptionHandler (defined elsewhere)
			// is used as "a panic was recovered" — confirm against the helper.
			if utils.ExceptionHandler(recover(), "Transport::makeGRPCEndpoint") {
				rep = makeSystemErrorReply(FAIL, traceID)
				err = nil
			}
		}()

		req, ok := request.(*Request)
		if !ok || req == nil {
			return makeSystemErrorReply(FAIL, traceID), nil
		}
		traceID = req.TraceID

		reply, callErr := srv.GRPC(req)
		// A nil *Reply must not be returned as-is: stored in the interface{}
		// result it would be a non-nil interface holding a nil pointer, and the
		// encoder would dereference it.
		if callErr != nil || reply == nil {
			return makeSystemErrorReply(FAIL, traceID), nil
		}
		return reply, nil
	}
}

// encodeReply converts the internal *Reply produced by the endpoint into the
// wire-level *pb.Reply.
//
// It returns an error when grep is nil or is not a *Reply, or when the reply
// payload is not a string, instead of panicking on a failed type assertion.
func encodeReply(_ context.Context, grep interface{}) (interface{}, error) {
	rep, ok := grep.(*Reply)
	if !ok || rep == nil {
		return nil, fmt.Errorf("encodeReply: unexpected reply type %T", grep)
	}
	payload, ok := rep.Payload.(string)
	if !ok {
		return nil, fmt.Errorf("encodeReply: reply payload must be a string, got %T", rep.Payload)
	}
	return &pb.Reply{Code: rep.Code, TraceID: rep.TraceID, Payload: payload}, nil
}

注册controller相关的类型和方法:

// Request is the transport-independent request passed to the endpoints.
type Request struct {
	// Action is the key used to look up the controller that handles the request.
	Action    string      `json:"Action"`
	// RequestId is returned by GetLoggerFields alongside TraceID.
	RequestId string      `json:"RequestId,omitempty"`
	// TraceID is echoed back in the Reply.
	TraceID   int64       `json:"TraceID,omitempty"`
	// Payload is the action-specific body. Both Transport.GRPC and
	// Transport.HTTPRPC assert it to a string and JSON-decode that string.
	Payload   interface{} `json:"Payload,omitempty"`
}

// Reply is the transport-independent response produced by the endpoints.
type Reply struct {
	// Code is the result code returned by the controller, or a system code
	// such as FAIL / UNKNOWN_ACTION set by the transport.
	Code    int32       `json:"Code"`
	// TraceID is copied from the request being answered.
	TraceID int64       `json:"TraceID,omitempty"`
	// Payload is the action-specific result. The gRPC path stores a JSON
	// string here; the HTTP path stores the controller's result value as-is.
	Payload interface{} `json:"Payload,omitempty"`
}

// Transport owns the HTTP and gRPC entry points of the service and the
// controllers registered on each of them.
type Transport struct {
	// HTTPServer is populated by registerHTTP.
	HTTPServer  *HTTPServer
	// GRPCServer is populated by registerGRPC.
	GRPCServer  *GRPCServer
	// HTTPAddress is the address Run listens on for HTTP.
	HTTPAddress string
	// GRPCAddress is the address Run listens on for gRPC.
	GRPCAddress string
	// EnableHTTP turns the HTTP listener and HTTP controller registration on.
	EnableHTTP  bool
	// EnableGRPC turns the gRPC listener and gRPC controller registration on.
	EnableGRPC  bool
	// Debug is created by NewTransport only when HTTP is enabled; otherwise nil.
	Debug       *Debug
	// Otel caches the module resolved by GetOtelModule; nil until first resolved.
	Otel        *module.OtelModule
}

// GetLoggerFields returns the request's TraceID and RequestId, in that order.
func (r *Request) GetLoggerFields() (traceId int64, requestId string) {
	traceId, requestId = r.TraceID, r.RequestId
	return traceId, requestId
}

// GetLoggerFields returns the reply's TraceID together with the RequestId
// found inside the payload. The RequestId is only available when the payload
// is a *V3ResponseWrapper whose Response is a *V3ResponseSuc or a
// *V3ResponseERR; in every other case it is the empty string.
func (r *Reply) GetLoggerFields() (traceId int64, requestId string) {
	wrapper, isV3 := r.Payload.(*V3ResponseWrapper)
	if !isV3 {
		return r.TraceID, ""
	}

	switch inner := wrapper.Response.(type) {
	case *V3ResponseSuc:
		return r.TraceID, inner.RequestId
	case *V3ResponseERR:
		return r.TraceID, inner.RequestId
	}
	// Any other response type carries no request id.
	return r.TraceID, ""
}

// NewTransport builds a Transport for the given addresses and wires up the
// default endpoint of every enabled protocol.
//
// When HTTP is enabled a Debug helper backed by Exec is created and
// registered, and the default HTTP endpoint is installed. When gRPC is enabled
// the default gRPC endpoint is installed. No listener is opened here.
func NewTransport(
	HTTPEnable bool,
	HTTPAddress string,
	GRPCEnable bool,
	GRPCAddress string,
	Exec *common_service.ConcurrentExecutorService,
) *Transport {
	transport := new(Transport)
	transport.HTTPAddress = HTTPAddress
	transport.GRPCAddress = GRPCAddress
	transport.EnableHTTP = HTTPEnable
	transport.EnableGRPC = GRPCEnable

	if transport.EnableHTTP {
		debug := &Debug{ExecService: Exec}
		transport.Debug = debug
		debug.Register()
		transport.registerHTTP(
			TRANSPORT_DEFAULT_HTTP_PATH,
			makeHTTPEndpoint(transport),
			decodeHTTPRequest,
			defaultEncodeHTTPReply,
		)
	}

	if transport.EnableGRPC {
		transport.registerGRPC(
			TRANSPORT_DEFAULT_GRPC_NAME,
			makeGRPCEndpoint(transport),
			decodeRequest,
			encodeReply,
		)
	}

	return transport
}

// GetOtelModule returns the OpenTelemetry module, resolving it from the module
// manager under the name "module_otel" on first use and caching it in o.Otel.
//
// It returns nil when the module cannot be resolved or when the resolved
// module is not a *module.OtelModule; nothing is cached in that case, so the
// lookup is retried on the next call.
func (o *Transport) GetOtelModule() *module.OtelModule {
	if o.Otel != nil {
		return o.Otel
	}
	om, err := module.GetModuleManager().GetModule("module_otel")
	if err != nil {
		return nil
	}
	// Checked assertion: an unexpected module type yields nil instead of a panic.
	otelModule, ok := om.(*module.OtelModule)
	if !ok {
		return nil
	}
	o.Otel = otelModule
	return o.Otel
}

// Run starts one background goroutine per enabled protocol and returns
// immediately; the returned error is always nil.
//
// Each goroutine opens its listener and then blocks serving on it. A failure
// to open the listener is logged together with its cause and terminates the
// process with exit code 1. An error returned by the serve call is logged.
func (o *Transport) Run() error {
	if o.EnableHTTP {
		go func() {
			defer func() { utils.ExceptionHandler(recover(), "Transport::Run::HTTPListener") }()
			listener, err := net.Listen("tcp4", o.HTTPAddress)
			if err != nil {
				log.Errorf("Transport::Start::Listen(HTTP) %s fail: %v", o.HTTPAddress, err)
				os.Exit(1)
			}
			// Registered before the blocking Serve call so the listener is
			// released even if serving panics.
			defer listener.Close()
			log.Infof("HTTPRPC serve on %s successful", o.HTTPAddress)
			if serveErr := http.Serve(listener, nil); serveErr != nil {
				log.Errorf("Transport::Run::Serve(HTTP) %s stopped: %v", o.HTTPAddress, serveErr)
			}
		}()
	}
	if o.EnableGRPC {
		go func() {
			defer func() { utils.ExceptionHandler(recover(), "Transport::Run::GRPCListener") }()
			listener, err := net.Listen("tcp", o.GRPCAddress)
			if err != nil {
				log.Errorf("Transport::Start::Listen(GRPC) %s fail: %v", o.GRPCAddress, err)
				os.Exit(1)
			}
			// Registered before the blocking Serve call so the listener is
			// released even if serving panics.
			defer listener.Close()
			s := grpc.NewServer()
			reflection.Register(s)
			pb.RegisterEMSServiceServer(s, o)
			log.Infof("GRPC serve on %s successful", o.GRPCAddress)
			if serveErr := s.Serve(listener); serveErr != nil {
				log.Errorf("Transport::Run::Serve(GRPC) %s stopped: %v", o.GRPCAddress, serveErr)
			}
		}()
	}
	return nil
}

// registerHTTP builds the go-kit HTTP server for the given endpoint, stores it
// in o.HTTPServer with an empty controller table, and mounts it on the default
// HTTP mux under Name, wrapped in an otelhttp handler.
//
// A nil EncodeReply falls back to defaultEncodeHTTPReply. Panics raised while
// registering are passed to utils.ExceptionHandler.
func (o *Transport) registerHTTP(
	Name string,
	MakeEndpoint endpoint.Endpoint,
	DecodeRequest func(context.Context, *http.Request) (interface{}, error),
	EncodeReply func(context.Context, http.ResponseWriter, interface{}) error,
) {
	defer func() {
		utils.ExceptionHandler(recover(), "Transport::registerHTTP")
	}()

	if EncodeReply == nil {
		EncodeReply = defaultEncodeHTTPReply
	}

	rpcServer := httptransport.NewServer(MakeEndpoint, DecodeRequest, EncodeReply)
	o.HTTPServer = &HTTPServer{
		RPC:         rpcServer,
		Controllers: make(map[string]Controller),
		Mutex:       new(sync.Mutex),
	}

	// Span names are the operation name followed by the request URI.
	spanName := func(operation string, r *http.Request) string {
		return operation + r.RequestURI
	}
	traced := otelhttp.NewHandler(
		o.HTTPServer.RPC,
		"woodpecker-ems",
		otelhttp.WithSpanNameFormatter(spanName),
	)
	http.Handle(Name, traced)
}

// RPC is the gRPC handler registered with the gRPC server. It hands the
// request to the go-kit gRPC server and returns the encoded *pb.Reply.
//
// It returns an error when ServeGRPC fails, when the encoded reply is not a
// *pb.Reply, or when a panic is recovered while serving. Without the named
// results a recovered panic would return a nil reply together with a nil
// error.
func (o *Transport) RPC(ctx context.Context, req *pb.Request) (reply *pb.Reply, err error) {
	defer func() {
		// The boolean result of utils.ExceptionHandler (defined elsewhere) is
		// used as "a panic was recovered" — confirm against the helper.
		if utils.ExceptionHandler(recover(), "Transport::RPC") {
			reply = nil
			err = fmt.Errorf("Transport::RPC: internal error while serving request")
		}
	}()

	_, rep, serveErr := o.GRPCServer.RPC.ServeGRPC(ctx, req)
	if serveErr != nil {
		log.Errorf("RPC.ServeGRPC Error: %s", serveErr.Error())
		return nil, serveErr
	}

	out, ok := rep.(*pb.Reply)
	if !ok {
		return nil, fmt.Errorf("Transport::RPC: unexpected reply type %T", rep)
	}
	return out, nil
}

// GRPC dispatches one decoded gRPC request to the controller registered for
// req.Action and returns the controller's result as a JSON string payload.
//
// The returned error is always nil; failures are reported through the reply
// code instead:
//   - UNKNOWN_ACTION when no controller is registered for the action;
//   - FAIL when req is nil, the payload is not a string, the payload cannot be
//     decoded, the result cannot be encoded, or a panic is recovered.
//
// Named results let the deferred recover replace the reply, matching HTTPRPC.
func (o *Transport) GRPC(req *Request) (rep *Reply, err error) {
	// traceID is captured up front so the recovery path can use it even when
	// req itself is nil.
	var traceID int64
	defer func() {
		if utils.ExceptionHandler(recover(), "Transport::GRPC") {
			rep = makeSystemErrorReply(FAIL, traceID)
			err = nil
		}
	}()

	if req == nil {
		return makeSystemErrorReply(FAIL, traceID), nil
	}
	traceID = req.TraceID

	// 1. Find the controller by action name.
	controller, exists := o.GRPCServer.Controllers[req.Action]
	if !exists {
		log.Errorf("[%d] Recieved (GRPC) request: %v with unknown action: %s", req.TraceID, req, req.Action)
		return makeSystemErrorReply(UNKNOWN_ACTION, req.TraceID), nil
	}

	// 2. Decode request.
	UserRequest := controller.CreateRequest()
	rp, ok := req.Payload.(string)
	if !ok {
		log.Errorf("[%d] (GRPC) request payload has type %T, expected string", req.TraceID, req.Payload)
		return makeSystemErrorReply(FAIL, req.TraceID), nil
	}
	if decodeErr := json.Unmarshal([]byte(rp), UserRequest); decodeErr != nil {
		return makeSystemErrorReply(FAIL, req.TraceID), nil
	}

	// 3. Process with the request.
	code, result := controller.Process(UserRequest, req.TraceID)

	// 4. Encode reply to payload.
	payload, encodeErr := json.Marshal(result)
	if encodeErr != nil {
		return makeSystemErrorReply(FAIL, req.TraceID), nil
	}
	reply := &Reply{Code: code, TraceID: req.TraceID, Payload: string(payload)}
	log.Debugf("[%d] Recieved (GRPC) request: %v, Reply: %v", req.TraceID, req, reply)

	return reply, nil
}

// HTTPRPC dispatches one decoded HTTP request to the controller registered for
// req.Action and returns the controller's result as the reply payload.
//
// The returned error is always nil; an unknown action yields UNKNOWN_ACTION,
// and a payload that cannot be decoded or a recovered panic yields FAIL.
func (o *Transport) HTTPRPC(req *Request) (rep *Reply, err error) {
	defer func() {
		if !utils.ExceptionHandler(recover(), "Transport::HTTPRPC") {
			return
		}
		rep, err = makeSystemErrorReply(FAIL, req.TraceID), nil
	}()

	reqLog := NewRequestLogger(req)

	// Step 1: look up the controller for the requested action.
	handler, found := o.HTTPServer.Controllers[req.Action]
	if !found {
		reqLog.Errorf("Recieved (HTTP) request: with unknown action")
		return makeSystemErrorReply(UNKNOWN_ACTION, req.TraceID), nil
	}

	// Step 2: decode the JSON payload into the controller's request value.
	userReq := handler.CreateRequest()
	raw := req.Payload.(string)
	if decodeErr := json.Unmarshal([]byte(raw), userReq); decodeErr != nil {
		log.Errorf("decode unmarshal err: %+v", decodeErr)
		return makeSystemErrorReply(FAIL, req.TraceID), nil
	}

	// Step 3: run the controller and wrap its result.
	code, body := handler.Process(userReq, req.TraceID)
	reply := &Reply{Code: code, TraceID: req.TraceID, Payload: body}

	// The reply is only serialized for logging when debug logging is on.
	if log.GetLevel() < log.DebugLevel {
		return reply, nil
	}
	encoded, marshalErr := json.Marshal(reply)
	if marshalErr != nil {
		reqLog.Debugf("Recieved (HTTP) request: %+v, Reply: %+v", req, marshalErr)
		return reply, nil
	}
	reqLog.Debugf("Recieved (HTTP) request: %+v, Reply: %v", req, string(encoded))
	return reply, nil
}

// makeSystemErrorReply builds a Reply carrying the given code and trace id
// with an empty JSON object ("{}") as its payload.
func makeSystemErrorReply(code int32, traceId int64) *Reply {
	reply := new(Reply)
	reply.Code = code
	reply.TraceID = traceId
	reply.Payload = "{}"
	return reply
}

// registerGRPC builds the go-kit gRPC server from the given endpoint, decoder
// and encoder, and stores it in o.GRPCServer with an empty controller table
// and a fresh mutex. Name is not used by this function. Panics raised while
// registering are passed to utils.ExceptionHandler.
func (o *Transport) registerGRPC(
	Name string,
	MakeEndpoint endpoint.Endpoint,
	DecodeRequest func(context.Context, interface{}) (interface{}, error),
	EncodeReply func(context.Context, interface{}) (interface{}, error),
) {
	defer func() {
		utils.ExceptionHandler(recover(), "Transport::registerGRPC")
	}()

	rpcServer := grpctransport.NewServer(MakeEndpoint, DecodeRequest, EncodeReply)
	o.GRPCServer = &GRPCServer{
		RPC:         rpcServer,
		Controllers: make(map[string]Controller),
		Mutex:       new(sync.Mutex),
	}
}

// RegisterHTTPController registers controller as the handler of actionName on
// the HTTP transport.
//
// It returns an error when HTTP is disabled, when actionName is already
// registered, or when a panic is recovered during registration. Without the
// named result a recovered panic would return a nil error even though nothing
// was registered.
func (o *Transport) RegisterHTTPController(actionName string, controller Controller) (err error) {
	defer func() {
		if utils.ExceptionHandler(recover(), "Transport::RegisterHTTPController") {
			err = fmt.Errorf("RegisterHTTPController:ERROR: panic while registering action %s", actionName)
		}
	}()
	if !o.EnableHTTP {
		return errors.New("RegisterHTTPController:ERROR: HTTP is not enable")
	}

	o.HTTPServer.Mutex.Lock()
	defer o.HTTPServer.Mutex.Unlock()

	if _, exists := o.HTTPServer.Controllers[actionName]; exists {
		return fmt.Errorf("Action(HTTPRPC) %s has registered", actionName)
	}
	if log.GetLevel() >= log.DebugLevel {
		log.Debugf("Register action: %s successful", actionName)
	}
	o.HTTPServer.Controllers[actionName] = controller
	return nil
}

// RegisterGRPCController registers controller as the handler of actionName on
// the gRPC transport.
//
// It returns an error when gRPC is disabled, when actionName is already
// registered, or when a panic is recovered during registration. Without the
// named result a recovered panic would return a nil error even though nothing
// was registered.
func (o *Transport) RegisterGRPCController(actionName string, controller Controller) (err error) {
	defer func() {
		if utils.ExceptionHandler(recover(), "Transport::RegisterGRPCController") {
			err = fmt.Errorf("RegisterGRPCController:ERROR: panic while registering action %s", actionName)
		}
	}()
	if !o.EnableGRPC {
		return errors.New("RegisterGRPCController:ERROR: GRPC is not enable")
	}

	o.GRPCServer.Mutex.Lock()
	defer o.GRPCServer.Mutex.Unlock()

	if _, exists := o.GRPCServer.Controllers[actionName]; exists {
		return fmt.Errorf("Action(GRPC) %s has registered", actionName)
	}
	o.GRPCServer.Controllers[actionName] = controller
	return nil
}

创建这个transport:

// Create reads the listen addresses of the enabled protocols from cfg, logs
// the resulting settings, and builds the Transport. The HTTP address defaults
// to "0.0.0.0:4022" and the gRPC address to "0.0.0.0:3022". It returns the
// error from GetExecutorService when the executor service is unavailable.
func (o *TransportModule) Create(cfg *config.Config) error {
	if o.EnableHTTP {
		o.HTTPAddress = cfg.GetString(service.SectionServer, "http.address", "0.0.0.0:4022")
	}
	if o.EnableGRPC {
		o.GRPCAddress = cfg.GetString(service.SectionServer, "grpc.address", "0.0.0.0:3022")
	}
	log.Infof(
		"enable.http: %t, http.address: %s, enable.grpc: %t, grpc.address: %s",
		o.EnableHTTP, o.HTTPAddress, o.EnableGRPC, o.GRPCAddress,
	)

	executor, err := o.Exec.GetExecutorService()
	if err != nil {
		return err
	}

	o.Transport = service.NewTransport(
		o.EnableHTTP,
		o.HTTPAddress,
		o.EnableGRPC,
		o.GRPCAddress,
		executor,
	)
	return nil
}

启动:

// Start launches the transport listeners and returns the error reported by
// Transport.Run instead of discarding it.
func (o *TransportModule) Start() error {
	return o.Transport.Run()
}

之后启动这个rpc模块:

	// Start every module in moduleList in its own goroutine. Each goroutine
	// sends the module together with the result of its Start call to errorChan.
	for _, moduleinfo := range this.moduleList {
		log.Info(fmt.Sprintf("run module:%s ...", moduleinfo.name))
		// moduleinfo is passed as an argument so that each goroutine works on
		// the value of its own iteration.
		go func(minfo *moduleInfo) {
			this.errorChan <- &moduleErrInfo{
				err:    minfo.module.Start(),
				module: minfo.module,
			}
		}(moduleinfo)
	}

注册rpc接口:

// Register the QueryInstanceID controller for the action model.ActionQueryInstanceID.
// NOTE(review): the error returned by RegisterGRPCController (gRPC disabled,
// or action already registered) is discarded here — confirm this is intended.
_ = o.Transport.GetTransportService().RegisterGRPCController(model.ActionQueryInstanceID,
			&controllers.QueryInstanceIDController{DBSrv: dbService})