go注册rpc接口
目录
go注册rpc接口
1.定义proto文件:
syntax = "proto3";
package pb;
service Service {
rpc RPC (Request) returns (Reply) {}
}
message Request {
string Action = 1;
int64 TraceID = 2;
string Payload = 3;
}
message Reply {
int32 Code = 1;
int64 TraceID = 2;
string Payload = 3;
}
生成 gRPC 代码
使用 protoc 编译器生成 Go 代码:
protoc –go_out=. –go-grpc_out=. my.proto
这将生成两个文件:my.pb.go 和 my_grpc.pb.go
2.定义server:
makeGRPCEndpoint 函数的目的是将一个 gRPC 服务方法封装为一个 endpoint.Endpoint 类型的函数。这个函数主要用于将 gRPC 请求转换为内部的服务调用,并处理相关的上下文和错误。
import (
"context"
"github.com/go-kit/kit/endpoint"
grpctransport "github.com/go-kit/kit/transport/grpc"
"sync"
)
type GRPCServer struct {
RPC *grpctransport.Server
Mutex *sync.Mutex
Controllers map[string]Controller
}
type GRPCService interface {
GRPC(*Request) (*Reply, error)
}
func decodeRequest(_ context.Context, req interface{}) (interface{}, error) {
greq := req.(*pb.Request)
return &Request{Action: greq.Action, TraceID: greq.TraceID, Payload: greq.Payload}, nil
}
//makeGRPCEndpoint 函数的主要作用是将 gRPC 请求转换为内部的服务调用,并处理相关的上下文和错误。它使用 defer 和 recover 机制来捕获和处理异常。这个函数通常用于将 gRPC 请求处理逻辑封装为一个 endpoint.Endpoint,以便在服务中统一处理
func makeGRPCEndpoint(srv GRPCService) endpoint.Endpoint {
return func(_ context.Context, request interface{}) (rep interface{}, err error) {
defer func() {
if utils.ExceptionHandler(recover(), "Transport::makeGRPCEndpoint") {
rep = makeSystemErrorReply(FAIL, 0)
err = nil
}
}()
req := request.(*Request)
rep, err = srv.GRPC(req)
if err != nil {
return makeSystemErrorReply(FAIL, req.TraceID), nil
}
return rep, nil
}
}
func encodeReply(_ context.Context, grep interface{}) (interface{}, error) {
rep := grep.(*Reply)
rp := (rep.Payload).(string)
return &pb.Reply{Code: rep.Code, TraceID: rep.TraceID, Payload: rp}, nil
}
对注册controller的方法:
type Request struct {
Action string `json:"Action"`
RequestId string `json:"RequestId,omitempty"`
TraceID int64 `json:"TraceID,omitempty"`
Payload interface{} `json:"Payload,omitempty"`
}
type Reply struct {
Code int32 `json:"Code"`
TraceID int64 `json:"TraceID,omitempty"`
Payload interface{} `json:"Payload,omitempty"`
}
type Transport struct {
HTTPServer *HTTPServer
GRPCServer *GRPCServer
HTTPAddress string
GRPCAddress string
EnableHTTP bool
EnableGRPC bool
Debug *Debug
Otel *module.OtelModule
}
func (this *Request) GetLoggerFields() (traceId int64, requestId string) {
return this.TraceID, this.RequestId
}
func (this *Reply) GetLoggerFields() (traceId int64, requestId string) {
traceId = this.TraceID
if v3response, ok := this.Payload.(*V3ResponseWrapper); ok {
switch res := v3response.Response.(type) {
case *V3ResponseSuc:
requestId = res.RequestId
case *V3ResponseERR:
requestId = res.RequestId
default:
// Do nothing
}
}
return traceId, requestId
}
func NewTransport(
HTTPEnable bool,
HTTPAddress string,
GRPCEnable bool,
GRPCAddress string,
Exec *common_service.ConcurrentExecutorService,
) *Transport {
Trans := &Transport{
HTTPAddress: HTTPAddress,
GRPCAddress: GRPCAddress,
EnableHTTP: HTTPEnable,
EnableGRPC: GRPCEnable,
Debug: nil,
}
if Trans.EnableHTTP {
Trans.Debug = &Debug{ExecService: Exec}
Trans.Debug.Register()
Trans.registerHTTP(
TRANSPORT_DEFAULT_HTTP_PATH,
makeHTTPEndpoint(Trans),
decodeHTTPRequest,
defaultEncodeHTTPReply,
)
}
if Trans.EnableGRPC {
Trans.registerGRPC(
TRANSPORT_DEFAULT_GRPC_NAME,
makeGRPCEndpoint(Trans),
decodeRequest,
encodeReply,
)
}
return Trans
}
func (o *Transport) GetOtelModule() *module.OtelModule {
if o.Otel == nil {
om, err := module.GetModuleManager().GetModule("module_otel")
if err != nil {
return nil
}
o.Otel = om.(*module.OtelModule)
}
return o.Otel
}
func (o *Transport) Run() error {
if o.EnableHTTP {
go func() {
defer func() { utils.ExceptionHandler(recover(), "Transport::Run::HTTPListener") }()
listener, err := net.Listen("tcp4", o.HTTPAddress)
if err != nil {
log.Errorf("Transport::Start::Listen(HTTP) %s fail", o.HTTPAddress)
os.Exit(1)
}
log.Infof("HTTPRPC serve on %s successful", o.HTTPAddress)
http.Serve(listener, nil)
defer listener.Close()
}()
}
if o.EnableGRPC {
go func() {
defer func() { utils.ExceptionHandler(recover(), "Transport::Run::GRPCListener") }()
listener, err := net.Listen("tcp", o.GRPCAddress)
if err != nil {
log.Errorf("Transport::Start::Listen(GRPC) %s fail", o.GRPCAddress)
os.Exit(1)
}
s := grpc.NewServer()
reflection.Register(s)
pb.RegisterEMSServiceServer(s, o)
log.Infof("GRPC serve on %s successful", o.GRPCAddress)
s.Serve(listener)
defer listener.Close()
}()
}
return nil
}
func (o *Transport) registerHTTP(
Name string,
MakeEndpoint endpoint.Endpoint,
DecodeRequest func(context.Context, *http.Request) (interface{}, error),
EncodeReply func(context.Context, http.ResponseWriter, interface{}) error,
) {
defer func() {
utils.ExceptionHandler(recover(), "Transport::registerHTTP")
}()
Encoder := EncodeReply
if Encoder == nil {
Encoder = defaultEncodeHTTPReply
}
o.HTTPServer = &HTTPServer{
RPC: httptransport.NewServer(
MakeEndpoint,
DecodeRequest,
Encoder,
),
Controllers: map[string]Controller{},
Mutex: &sync.Mutex{},
}
options := []otelhttp.Option{otelhttp.WithSpanNameFormatter(func(operation string, r *http.Request) string {
return operation + r.RequestURI
})}
http.Handle(Name, otelhttp.NewHandler(o.HTTPServer.RPC, "woodpecker-ems", options...))
}
func (o *Transport) RPC(ctx context.Context, req *pb.Request) (*pb.Reply, error) {
defer func() {
utils.ExceptionHandler(recover(), "Transport::RPC")
}()
_, rep, err := o.GRPCServer.RPC.ServeGRPC(ctx, req)
if err != nil {
log.Errorf("RPC.ServeGRPC Error: %s", err.Error())
return nil, err
}
return rep.(*pb.Reply), nil
}
func (o *Transport) GRPC(req *Request) (*Reply, error) {
defer func() {
utils.ExceptionHandler(recover(), "Transport::GRPC")
}()
// 1. Find the controller by action name.
controller, exists := o.GRPCServer.Controllers[req.Action]
if exists == false {
log.Errorf("[%d] Recieved (GRPC) request: %v with unknown action: %s", req.TraceID, req, req.Action)
return makeSystemErrorReply(UNKNOWN_ACTION, req.TraceID), nil
}
// 2. Decode request.
UserRequest := controller.CreateRequest()
rp := (req.Payload).(string)
err := json.Unmarshal([]byte(rp), UserRequest)
if err != nil {
return makeSystemErrorReply(FAIL, req.TraceID), nil
}
// 3. Process with the request.
code, rep := controller.Process(UserRequest, req.TraceID)
// 4. Encode reply to payload.
payload, err := json.Marshal(rep)
if err != nil {
return makeSystemErrorReply(FAIL, req.TraceID), nil
}
reply := &Reply{Code: code, TraceID: req.TraceID, Payload: string(payload)}
log.Debugf("[%d] Recieved (GRPC) request: %v, Reply: %v", req.TraceID, req, reply)
return reply, nil
}
func (o *Transport) HTTPRPC(req *Request) (rep *Reply, err error) {
defer func() {
if utils.ExceptionHandler(recover(), "Transport::HTTPRPC") {
rep = makeSystemErrorReply(FAIL, req.TraceID)
err = nil
}
}()
requestLogger := NewRequestLogger(req)
// 1. Find the controller by action name.
controller, exists := o.HTTPServer.Controllers[req.Action]
if exists == false {
requestLogger.Errorf("Recieved (HTTP) request: with unknown action")
return makeSystemErrorReply(UNKNOWN_ACTION, req.TraceID), nil
}
// 2. Decode request.
UserRequest := controller.CreateRequest()
rp := (req.Payload).(string)
err = json.Unmarshal([]byte(rp), UserRequest)
if err != nil {
log.Errorf("decode unmarshal err: %+v", err)
return makeSystemErrorReply(FAIL, req.TraceID), nil
}
// 3. Process with the request.
code, irep := controller.Process(UserRequest, req.TraceID)
reply := &Reply{Code: code, TraceID: req.TraceID, Payload: irep}
if log.GetLevel() >= log.DebugLevel {
if s, err := json.Marshal(reply); err != nil {
requestLogger.Debugf("Recieved (HTTP) request: %+v, Reply: %+v", req, err)
} else {
requestLogger.Debugf("Recieved (HTTP) request: %+v, Reply: %v", req, string(s))
}
}
return reply, nil
}
func makeSystemErrorReply(code int32, traceId int64) *Reply {
return &Reply{Code: code, TraceID: traceId, Payload: "{}"}
}
func (o *Transport) registerGRPC(
Name string,
MakeEndpoint endpoint.Endpoint,
DecodeRequest func(context.Context, interface{}) (interface{}, error),
EncodeReply func(context.Context, interface{}) (interface{}, error),
) {
defer func() {
utils.ExceptionHandler(recover(), "Transport::registerGRPC")
}()
o.GRPCServer = &GRPCServer{
RPC: grpctransport.NewServer(
MakeEndpoint,
DecodeRequest,
EncodeReply,
),
Controllers: map[string]Controller{},
Mutex: &sync.Mutex{},
}
}
func (o *Transport) RegisterHTTPController(actionName string, controller Controller) error {
defer func() {
utils.ExceptionHandler(recover(), "Transport::RegisterHTTPController")
}()
if o.EnableHTTP == false {
return errors.New("RegisterHTTPController:ERROR: HTTP is not enable")
}
o.HTTPServer.Mutex.Lock()
defer func() { o.HTTPServer.Mutex.Unlock() }()
_, Exists := o.HTTPServer.Controllers[actionName]
if Exists {
return errors.New(fmt.Sprintf("Action(HTTPRPC) %s has registered", actionName))
}
if log.GetLevel() >= log.DebugLevel {
log.Debugf("Register action: %s successful", actionName)
}
o.HTTPServer.Controllers[actionName] = controller
return nil
}
func (o *Transport) RegisterGRPCController(actionName string, controller Controller) error {
defer func() {
utils.ExceptionHandler(recover(), "Transport::RegisterGRPCController")
}()
if o.EnableGRPC == false {
return errors.New("RegisterGRPCController:ERROR: GRPC is not enable")
}
o.GRPCServer.Mutex.Lock()
defer func() { o.GRPCServer.Mutex.Unlock() }()
_, Exists := o.GRPCServer.Controllers[actionName]
if Exists {
return errors.New(fmt.Sprintf("Action(GRPC) %s has registered", actionName))
}
o.GRPCServer.Controllers[actionName] = controller
return nil
}
创建这个transport:
func (o *TransportModule) Create(cfg *config.Config) error {
if o.EnableHTTP {
o.HTTPAddress = cfg.GetString(service.SectionServer, "http.address", "0.0.0.0:4022")
}
if o.EnableGRPC {
o.GRPCAddress = cfg.GetString(service.SectionServer, "grpc.address", "0.0.0.0:3022")
}
log.Infof("enable.http: %t, http.address: %s, enable.grpc: %t, "+
"grpc.address: %s", o.EnableHTTP, o.HTTPAddress, o.EnableGRPC, o.GRPCAddress)
Exec, Err := o.Exec.GetExecutorService()
if Err != nil {
return Err
}
o.Transport = service.NewTransport(o.EnableHTTP, o.HTTPAddress, o.EnableGRPC, o.GRPCAddress, Exec)
return nil
}
启动:
func (o *TransportModule) Start() error {
o.Transport.Run()
return nil
}
之后启动这个rpc模块:
for _, moduleinfo := range this.moduleList {
log.Info(fmt.Sprintf("run module:%s ...", moduleinfo.name))
go func(minfo *moduleInfo) {
this.errorChan <- &moduleErrInfo{
err: minfo.module.Start(),
module: minfo.module,
}
}(moduleinfo)
}
注册rpc接口:
_ = o.Transport.GetTransportService().RegisterGRPCController(model.ActionQueryInstanceID,
&controllers.QueryInstanceIDController{DBSrv: dbService})