mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Add CustomJSONCoded in case that Apier is active on the same engine with dispatcher
This commit is contained in:
committed by
Dan Christian Bogos
parent
b86d83b2e8
commit
fbc9f2332d
@@ -66,9 +66,9 @@ func newAgentRequest(req config.DataProvider,
|
||||
type AgentRequest struct {
|
||||
Request config.DataProvider // request
|
||||
Vars *config.NavigableMap // shared data
|
||||
CGRRequest *config.NavigableMap
|
||||
CGRRequest *config.NavigableMap // Used in reply to acced the request that was send
|
||||
CGRReply *config.NavigableMap
|
||||
CGRAReq *config.NavigableMap // active request
|
||||
CGRAReq *config.NavigableMap // Used to acces live build in request; both available as active request and active reply
|
||||
Reply *config.NavigableMap
|
||||
tenant,
|
||||
timezone string
|
||||
|
||||
@@ -687,6 +687,10 @@ func (dSv1 DispatcherSv1) GetProfileForEvent(ev *dispatchers.DispatcherEvent,
|
||||
return dSv1.dS.V1GetProfileForEvent(ev, dPrfl)
|
||||
}
|
||||
|
||||
func (dSv1 DispatcherSv1) Apier(args *utils.MethodParameters, reply *interface{}) (err error) {
|
||||
return dSv1.dS.V1Apier(new(ApierV1), args, reply)
|
||||
}
|
||||
|
||||
func NewDispatcherSCDRsV1(dps *dispatchers.DispatcherService) *DispatcherSCDRsV1 {
|
||||
return &DispatcherSCDRsV1{dS: dps}
|
||||
}
|
||||
|
||||
@@ -1202,6 +1202,8 @@ func startDispatcherService(internalDispatcherSChan, internalAttributeSChan chan
|
||||
return
|
||||
}()
|
||||
|
||||
server.SetDispatched()
|
||||
|
||||
server.RpcRegister(v1.NewDispatcherSv1(dspS))
|
||||
|
||||
server.RpcRegisterName(utils.ThresholdSv1,
|
||||
|
||||
@@ -18,3 +18,4 @@ cgrates.org,ATTR_API_CDRS_AUTH,*auth,*string:~APIKey:cdrs12345,,,APIMethods,*con
|
||||
cgrates.org,ATTR_API_DSP_AUTH,*auth,*string:~APIKey:dsp12345,,,APIMethods,*constant,DispatcherSv1.Ping&DispatcherSv1.GetProfileForEvent,false,20
|
||||
cgrates.org,ATTR_API_PSE_AUTH,*auth,*string:~APIKey:pse12345,,,APIMethods,*constant,SessionSv1.Ping&SessionSv1.AuthorizeEvent&SessionSv1.AuthorizeEventWithDigest&SessionSv1.InitiateSession&SessionSv1.InitiateSessionWithDigest&SessionSv1.UpdateSession&SessionSv1.SyncSessions&SessionSv1.TerminateSession&SessionSv1.ProcessCDR&SessionSv1.ProcessEvent&SessionSv1.GetActiveSessions&SessionSv1.GetActiveSessionsCount&SessionSv1.ForceDisconnect&SessionSv1.GetPassiveSessions&SessionSv1.GetPassiveSessionsCount&SessionSv1.ReplicateSessions&SessionSv1.SetPassiveSession&AttributeSv1.ProcessEvent&Responder.Debit&ResourceSv1.AllocateResources&ChargerSv1.ProcessEvent&Responder.MaxDebit,false,20
|
||||
cgrates.org,ATTR_API_CFG_AUTH,*auth,*string:~APIKey:cfg12345,,,APIMethods,*constant,ConfigSv1.GetJSONSection,false,20
|
||||
cgrates.org,ATTR_API_APIER_AUTH,*auth,*string:~APIKey:apier12345,,,APIMethods,*constant,ApierV1.GetAttributeProfile&ApierV1.SetAttributeProfile,false,20
|
||||
|
||||
|
@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package dispatchers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
@@ -193,6 +194,96 @@ func (dS *DispatcherService) V1GetProfileForEvent(ev *DispatcherEvent,
|
||||
return
|
||||
}
|
||||
|
||||
// V1Apier is a generic way to cover all APIer methods
|
||||
func (dS *DispatcherService) V1Apier(apier interface{}, args *utils.MethodParameters, reply *interface{}) (err error) {
|
||||
|
||||
parameters, canCast := args.Parameters.(map[string]interface{})
|
||||
if !canCast {
|
||||
return utils.NewErrMandatoryIeMissing("ArgDispatcher")
|
||||
}
|
||||
|
||||
var argD *utils.ArgDispatcher
|
||||
//check if we have APIKey in event and in case it has add it in ArgDispatcher
|
||||
apiKeyIface, hasApiKey := parameters[utils.APIKey]
|
||||
if hasApiKey {
|
||||
argD = &utils.ArgDispatcher{
|
||||
APIKey: utils.StringPointer(apiKeyIface.(string)),
|
||||
}
|
||||
}
|
||||
//check if we have RouteID in event and in case it has add it in ArgDispatcher
|
||||
routeIDIface, hasRouteID := parameters[utils.RouteID]
|
||||
if hasRouteID {
|
||||
if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct
|
||||
argD = &utils.ArgDispatcher{
|
||||
RouteID: utils.StringPointer(routeIDIface.(string)),
|
||||
}
|
||||
} else {
|
||||
argD.RouteID = utils.StringPointer(routeIDIface.(string))
|
||||
}
|
||||
}
|
||||
|
||||
if argD == nil {
|
||||
return utils.NewErrMandatoryIeMissing("ArgDispatcher")
|
||||
}
|
||||
|
||||
tenant, _ := utils.IfaceAsString(parameters[utils.Tenant])
|
||||
tenant = utils.FirstNonEmpty(tenant, config.CgrConfig().GeneralCfg().DefaultTenant)
|
||||
if dS.attrS != nil {
|
||||
if err = dS.authorize(args.Method,
|
||||
tenant,
|
||||
argD.APIKey, utils.TimePointer(time.Now())); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
// split the method
|
||||
methodSplit := strings.Split(args.Method, ".")
|
||||
if len(methodSplit) != 2 {
|
||||
return rpcclient.ErrUnsupporteServiceMethod
|
||||
}
|
||||
method := reflect.ValueOf(apier).MethodByName(methodSplit[1])
|
||||
if !method.IsValid() {
|
||||
return rpcclient.ErrUnsupporteServiceMethod
|
||||
}
|
||||
// take the arguments (args + reply)
|
||||
methodType := method.Type()
|
||||
if methodType.NumIn() != 2 {
|
||||
return rpcclient.ErrUnsupporteServiceMethod
|
||||
}
|
||||
// convert type of reply to the right one based on method
|
||||
realReplyType := methodType.In(1)
|
||||
|
||||
var realReply interface{}
|
||||
if realReplyType.Kind() == reflect.Ptr {
|
||||
trply := reflect.New(realReplyType.Elem()).Elem().Interface()
|
||||
realReply = &trply
|
||||
} else {
|
||||
realReply = reflect.New(realReplyType).Elem().Interface()
|
||||
}
|
||||
//convert parameters so we can unmarshal the informations into to right struct
|
||||
argsByte, err := json.Marshal(parameters)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// find the type for arg
|
||||
realArgsType := methodType.In(0)
|
||||
// create the arg with the right type for method
|
||||
var realArgs interface{} = reflect.New(realArgsType).Interface()
|
||||
// populate realArgs with data
|
||||
if err := json.Unmarshal(argsByte, &realArgs); err != nil {
|
||||
return err
|
||||
}
|
||||
if realArgsType.Kind() != reflect.Ptr {
|
||||
realArgs = reflect.ValueOf(realArgs).Elem().Interface()
|
||||
}
|
||||
if err := dS.Dispatch(&utils.CGREvent{Tenant: tenant, Event: parameters}, utils.MetaApier, argD.RouteID,
|
||||
args.Method, realArgs, realReply); err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = realReply
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
// Call implements rpcclient.RpcClientConnection interface for internal RPC
|
||||
func (dS *DispatcherService) Call(serviceMethod string, // all API fuction must be of type: SubsystemMethod
|
||||
args interface{}, reply interface{}) error {
|
||||
|
||||
21
dispatchers/dispatchers_it_test.go
Normal file
21
dispatchers/dispatchers_it_test.go
Normal file
@@ -0,0 +1,21 @@
|
||||
// +build integration
|
||||
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package dispatchers
|
||||
@@ -582,6 +582,8 @@ const (
|
||||
MetaSuppliersOffset = "*suppliers_offset"
|
||||
ActiveSessionPrefix = "act"
|
||||
PasiveSessionPrefix = "psv"
|
||||
ApierV = "ApierV"
|
||||
MetaApier = "*apier"
|
||||
)
|
||||
|
||||
// Migrator Action
|
||||
@@ -713,6 +715,7 @@ const (
|
||||
ChargerSv1 = "ChargerSv1"
|
||||
MetaAuth = "*auth"
|
||||
APIKey = "APIKey"
|
||||
RouteID = "RouteID"
|
||||
APIMethods = "APIMethods"
|
||||
APIMethod = "APIMethod"
|
||||
NestingSep = "."
|
||||
@@ -886,9 +889,9 @@ const (
|
||||
|
||||
// DispatcherS APIs
|
||||
const (
|
||||
DispatcherSv1Ping = "DispatcherSv1.Ping"
|
||||
DispatcherSv1GetProfileForEvent = "DispatcherSv1.GetProfileForEvent"
|
||||
DispatcherSv1SwitchApierVRequest = "DispatcherSv1.SwitchApierVRequest"
|
||||
DispatcherSv1Ping = "DispatcherSv1.Ping"
|
||||
DispatcherSv1GetProfileForEvent = "DispatcherSv1.GetProfileForEvent"
|
||||
DispatcherSv1Apier = "DispatcherSv1.Apier"
|
||||
)
|
||||
|
||||
// AnalyzerS APIs
|
||||
|
||||
19
utils/gob_codec.go
Normal file
19
utils/gob_codec.go
Normal file
@@ -0,0 +1,19 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package utils
|
||||
@@ -53,7 +53,7 @@ type jsonServerCodec struct {
|
||||
pending map[uint64]*json.RawMessage
|
||||
}
|
||||
|
||||
// NewServerCodec returns a new rpc.ServerCodec using JSON-RPC on conn.
|
||||
// NewCustomJSONServerCodec is used only when DispatcherS is active to handle APIer methods generically
|
||||
func NewCustomJSONServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec {
|
||||
return &jsonServerCodec{
|
||||
dec: json.NewDecoder(conn),
|
||||
@@ -64,9 +64,10 @@ func NewCustomJSONServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec {
|
||||
}
|
||||
|
||||
type serverRequest struct {
|
||||
Method string `json:"method"`
|
||||
Params *json.RawMessage `json:"params"`
|
||||
Id *json.RawMessage `json:"id"`
|
||||
Method string `json:"method"`
|
||||
Params *json.RawMessage `json:"params"`
|
||||
Id *json.RawMessage `json:"id"`
|
||||
isApier bool
|
||||
}
|
||||
|
||||
func (r *serverRequest) reset() {
|
||||
@@ -88,8 +89,8 @@ func (c *jsonServerCodec) ReadRequestHeader(r *rpc.Request) error {
|
||||
}
|
||||
// in case we get a request with ApierV1 or ApierV2 we redirect
|
||||
// to Dispatcher to send it according to ArgDispatcher
|
||||
if strings.HasPrefix(c.req.Method, "ApierV") {
|
||||
r.ServiceMethod = DispatcherSv1SwitchApierVRequest
|
||||
if c.req.isApier = strings.HasPrefix(c.req.Method, ApierV); c.req.isApier {
|
||||
r.ServiceMethod = DispatcherSv1Apier
|
||||
} else {
|
||||
r.ServiceMethod = c.req.Method
|
||||
}
|
||||
@@ -116,7 +117,7 @@ func (c *jsonServerCodec) ReadRequestBody(x interface{}) error {
|
||||
}
|
||||
// following example from ReadRequestHeader in case we get ApierV1
|
||||
// or ApierV2 we compose the parameters
|
||||
if strings.HasPrefix(c.req.Method, "ApierV") {
|
||||
if c.req.isApier {
|
||||
cx := x.(*MethodParameters)
|
||||
cx.Method = c.req.Method
|
||||
var params [1]interface{}
|
||||
|
||||
@@ -262,7 +262,11 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string,
|
||||
s.Unlock()
|
||||
Logger.Info("<HTTP> enabling handler for WebSocket connections")
|
||||
wsHandler := websocket.Handler(func(ws *websocket.Conn) {
|
||||
jsonrpc.ServeConn(ws)
|
||||
if s.isDispatched {
|
||||
rpc.ServeCodec(NewCustomJSONServerCodec(ws))
|
||||
} else {
|
||||
jsonrpc.ServeConn(ws)
|
||||
}
|
||||
})
|
||||
if useBasicAuth {
|
||||
s.httpMux.HandleFunc(wsRPCURL, use(func(w http.ResponseWriter, r *http.Request) {
|
||||
@@ -458,7 +462,11 @@ func (s *Server) ServeJSONTLS(addr, serverCrt, serverKey, caCert string,
|
||||
}
|
||||
continue
|
||||
}
|
||||
go jsonrpc.ServeConn(conn)
|
||||
if s.isDispatched {
|
||||
go rpc.ServeCodec(NewCustomJSONServerCodec(conn))
|
||||
} else {
|
||||
go jsonrpc.ServeConn(conn)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user