diff --git a/agents/agentreq.go b/agents/agentreq.go index 2a8f5197b..052bea03a 100644 --- a/agents/agentreq.go +++ b/agents/agentreq.go @@ -66,9 +66,9 @@ func newAgentRequest(req config.DataProvider, type AgentRequest struct { Request config.DataProvider // request Vars *config.NavigableMap // shared data - CGRRequest *config.NavigableMap + CGRRequest *config.NavigableMap // Used in reply to acced the request that was send CGRReply *config.NavigableMap - CGRAReq *config.NavigableMap // active request + CGRAReq *config.NavigableMap // Used to acces live build in request; both available as active request and active reply Reply *config.NavigableMap tenant, timezone string diff --git a/apier/v1/dispatcher.go b/apier/v1/dispatcher.go index 4129877aa..0065270bc 100755 --- a/apier/v1/dispatcher.go +++ b/apier/v1/dispatcher.go @@ -687,6 +687,10 @@ func (dSv1 DispatcherSv1) GetProfileForEvent(ev *dispatchers.DispatcherEvent, return dSv1.dS.V1GetProfileForEvent(ev, dPrfl) } +func (dSv1 DispatcherSv1) Apier(args *utils.MethodParameters, reply *interface{}) (err error) { + return dSv1.dS.V1Apier(new(ApierV1), args, reply) +} + func NewDispatcherSCDRsV1(dps *dispatchers.DispatcherService) *DispatcherSCDRsV1 { return &DispatcherSCDRsV1{dS: dps} } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 313fd6cd8..7cff26e22 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -1202,6 +1202,8 @@ func startDispatcherService(internalDispatcherSChan, internalAttributeSChan chan return }() + server.SetDispatched() + server.RpcRegister(v1.NewDispatcherSv1(dspS)) server.RpcRegisterName(utils.ThresholdSv1, diff --git a/data/tariffplans/dispatchers/Attributes.csv b/data/tariffplans/dispatchers/Attributes.csv index ad5f48308..92ba4d49d 100644 --- a/data/tariffplans/dispatchers/Attributes.csv +++ b/data/tariffplans/dispatchers/Attributes.csv @@ -18,3 +18,4 @@ cgrates.org,ATTR_API_CDRS_AUTH,*auth,*string:~APIKey:cdrs12345,,,APIMethods,*con cgrates.org,ATTR_API_DSP_AUTH,*auth,*string:~APIKey:dsp12345,,,APIMethods,*constant,DispatcherSv1.Ping&DispatcherSv1.GetProfileForEvent,false,20 cgrates.org,ATTR_API_PSE_AUTH,*auth,*string:~APIKey:pse12345,,,APIMethods,*constant,SessionSv1.Ping&SessionSv1.AuthorizeEvent&SessionSv1.AuthorizeEventWithDigest&SessionSv1.InitiateSession&SessionSv1.InitiateSessionWithDigest&SessionSv1.UpdateSession&SessionSv1.SyncSessions&SessionSv1.TerminateSession&SessionSv1.ProcessCDR&SessionSv1.ProcessEvent&SessionSv1.GetActiveSessions&SessionSv1.GetActiveSessionsCount&SessionSv1.ForceDisconnect&SessionSv1.GetPassiveSessions&SessionSv1.GetPassiveSessionsCount&SessionSv1.ReplicateSessions&SessionSv1.SetPassiveSession&AttributeSv1.ProcessEvent&Responder.Debit&ResourceSv1.AllocateResources&ChargerSv1.ProcessEvent&Responder.MaxDebit,false,20 cgrates.org,ATTR_API_CFG_AUTH,*auth,*string:~APIKey:cfg12345,,,APIMethods,*constant,ConfigSv1.GetJSONSection,false,20 +cgrates.org,ATTR_API_APIER_AUTH,*auth,*string:~APIKey:apier12345,,,APIMethods,*constant,ApierV1.GetAttributeProfile&ApierV1.SetAttributeProfile,false,20 diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go index 0f9b21b51..434afb578 100755 --- a/dispatchers/dispatchers.go +++ b/dispatchers/dispatchers.go @@ -19,6 +19,7 @@ along with this program. If not, see package dispatchers import ( + "encoding/json" "fmt" "reflect" "strings" @@ -193,6 +194,96 @@ func (dS *DispatcherService) V1GetProfileForEvent(ev *DispatcherEvent, return } +// V1Apier is a generic way to cover all APIer methods +func (dS *DispatcherService) V1Apier(apier interface{}, args *utils.MethodParameters, reply *interface{}) (err error) { + + parameters, canCast := args.Parameters.(map[string]interface{}) + if !canCast { + return utils.NewErrMandatoryIeMissing("ArgDispatcher") + } + + var argD *utils.ArgDispatcher + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, hasApiKey := parameters[utils.APIKey] + if hasApiKey { + argD = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface.(string)), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, hasRouteID := parameters[utils.RouteID] + if hasRouteID { + if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + argD = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface.(string)), + } + } else { + argD.RouteID = utils.StringPointer(routeIDIface.(string)) + } + } + + if argD == nil { + return utils.NewErrMandatoryIeMissing("ArgDispatcher") + } + + tenant, _ := utils.IfaceAsString(parameters[utils.Tenant]) + tenant = utils.FirstNonEmpty(tenant, config.CgrConfig().GeneralCfg().DefaultTenant) + if dS.attrS != nil { + if err = dS.authorize(args.Method, + tenant, + argD.APIKey, utils.TimePointer(time.Now())); err != nil { + return + } + } + // split the method + methodSplit := strings.Split(args.Method, ".") + if len(methodSplit) != 2 { + return rpcclient.ErrUnsupporteServiceMethod + } + method := reflect.ValueOf(apier).MethodByName(methodSplit[1]) + if !method.IsValid() { + return rpcclient.ErrUnsupporteServiceMethod + } + // take the arguments (args + reply) + methodType := method.Type() + if methodType.NumIn() != 2 { + return rpcclient.ErrUnsupporteServiceMethod + } + // convert type of reply to the right one based on method + realReplyType := methodType.In(1) + + var realReply interface{} + if realReplyType.Kind() == reflect.Ptr { + trply := reflect.New(realReplyType.Elem()).Elem().Interface() + realReply = &trply + } else { + realReply = reflect.New(realReplyType).Elem().Interface() + } + //convert parameters so we can unmarshal the informations into to right struct + argsByte, err := json.Marshal(parameters) + if err != nil { + return err + } + // find the type for arg + realArgsType := methodType.In(0) + // create the arg with the right type for method + var realArgs interface{} = reflect.New(realArgsType).Interface() + // populate realArgs with data + if err := json.Unmarshal(argsByte, &realArgs); err != nil { + return err + } + if realArgsType.Kind() != reflect.Ptr { + realArgs = reflect.ValueOf(realArgs).Elem().Interface() + } + if err := dS.Dispatch(&utils.CGREvent{Tenant: tenant, Event: parameters}, utils.MetaApier, argD.RouteID, + args.Method, realArgs, realReply); err != nil { + return err + } + *reply = realReply + return nil + +} + // Call implements rpcclient.RpcClientConnection interface for internal RPC func (dS *DispatcherService) Call(serviceMethod string, // all API fuction must be of type: SubsystemMethod args interface{}, reply interface{}) error { diff --git a/dispatchers/dispatchers_it_test.go b/dispatchers/dispatchers_it_test.go new file mode 100644 index 000000000..20b0034f7 --- /dev/null +++ b/dispatchers/dispatchers_it_test.go @@ -0,0 +1,21 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package dispatchers diff --git a/utils/consts.go b/utils/consts.go index fcf922e3d..aff4dc0c5 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -582,6 +582,8 @@ const ( MetaSuppliersOffset = "*suppliers_offset" ActiveSessionPrefix = "act" PasiveSessionPrefix = "psv" + ApierV = "ApierV" + MetaApier = "*apier" ) // Migrator Action @@ -713,6 +715,7 @@ const ( ChargerSv1 = "ChargerSv1" MetaAuth = "*auth" APIKey = "APIKey" + RouteID = "RouteID" APIMethods = "APIMethods" APIMethod = "APIMethod" NestingSep = "." @@ -886,9 +889,9 @@ const ( // DispatcherS APIs const ( - DispatcherSv1Ping = "DispatcherSv1.Ping" - DispatcherSv1GetProfileForEvent = "DispatcherSv1.GetProfileForEvent" - DispatcherSv1SwitchApierVRequest = "DispatcherSv1.SwitchApierVRequest" + DispatcherSv1Ping = "DispatcherSv1.Ping" + DispatcherSv1GetProfileForEvent = "DispatcherSv1.GetProfileForEvent" + DispatcherSv1Apier = "DispatcherSv1.Apier" ) // AnalyzerS APIs diff --git a/utils/gob_codec.go b/utils/gob_codec.go new file mode 100644 index 000000000..97bba67d6 --- /dev/null +++ b/utils/gob_codec.go @@ -0,0 +1,19 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package utils diff --git a/utils/json_codec.go b/utils/json_codec.go index 21b019b62..db4219835 100644 --- a/utils/json_codec.go +++ b/utils/json_codec.go @@ -53,7 +53,7 @@ type jsonServerCodec struct { pending map[uint64]*json.RawMessage } -// NewServerCodec returns a new rpc.ServerCodec using JSON-RPC on conn. +// NewCustomJSONServerCodec is used only when DispatcherS is active to handle APIer methods generically func NewCustomJSONServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec { return &jsonServerCodec{ dec: json.NewDecoder(conn), @@ -64,9 +64,10 @@ func NewCustomJSONServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec { } type serverRequest struct { - Method string `json:"method"` - Params *json.RawMessage `json:"params"` - Id *json.RawMessage `json:"id"` + Method string `json:"method"` + Params *json.RawMessage `json:"params"` + Id *json.RawMessage `json:"id"` + isApier bool } func (r *serverRequest) reset() { @@ -88,8 +89,8 @@ func (c *jsonServerCodec) ReadRequestHeader(r *rpc.Request) error { } // in case we get a request with ApierV1 or ApierV2 we redirect // to Dispatcher to send it according to ArgDispatcher - if strings.HasPrefix(c.req.Method, "ApierV") { - r.ServiceMethod = DispatcherSv1SwitchApierVRequest + if c.req.isApier = strings.HasPrefix(c.req.Method, ApierV); c.req.isApier { + r.ServiceMethod = DispatcherSv1Apier } else { r.ServiceMethod = c.req.Method } @@ -116,7 +117,7 @@ func (c *jsonServerCodec) ReadRequestBody(x interface{}) error { } // following example from ReadRequestHeader in case we get ApierV1 // or ApierV2 we compose the parameters - if strings.HasPrefix(c.req.Method, "ApierV") { + if c.req.isApier { cx := x.(*MethodParameters) cx.Method = c.req.Method var params [1]interface{} diff --git a/utils/server.go b/utils/server.go index 39a5222e0..6574a5df7 100644 --- a/utils/server.go +++ b/utils/server.go @@ -262,7 +262,11 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, s.Unlock() Logger.Info(" enabling handler for WebSocket connections") wsHandler := websocket.Handler(func(ws *websocket.Conn) { - jsonrpc.ServeConn(ws) + if s.isDispatched { + rpc.ServeCodec(NewCustomJSONServerCodec(ws)) + } else { + jsonrpc.ServeConn(ws) + } }) if useBasicAuth { s.httpMux.HandleFunc(wsRPCURL, use(func(w http.ResponseWriter, r *http.Request) { @@ -458,7 +462,11 @@ func (s *Server) ServeJSONTLS(addr, serverCrt, serverKey, caCert string, } continue } - go jsonrpc.ServeConn(conn) + if s.isDispatched { + go rpc.ServeCodec(NewCustomJSONServerCodec(conn)) + } else { + go jsonrpc.ServeConn(conn) + } } }