Add Wrapper for CDR and ExternalCDR and use it in methods

This commit is contained in:
TeoV
2019-04-08 11:33:04 -04:00
committed by Dan Christian Bogos
parent 476b3b16a2
commit 22250fe9cc
18 changed files with 264 additions and 213 deletions

View File

@@ -354,8 +354,30 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.DARequestProcessor,
if reqProcessor.Flags.HasKey(utils.MetaCDRs) &&
!reqProcessor.Flags.HasKey(utils.MetaDryRun) {
var rplyCDRs string
//compose the arguments for SessionSv1ProcessCDR
argProcessCDR := &utils.CGREventWithArgDispatcher{
CGREvent: cgrEv,
}
//check if we have APIKey in event and in case it has add it in ArgDispatcher
apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey]
if hasApiKey {
argProcessCDR.ArgDispatcher = &utils.ArgDispatcher{
APIKey: utils.StringPointer(apiKeyIface.(string)),
}
}
//check if we have RouteID in event and in case it has add it in ArgDispatcher
routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID]
if hasRouteID {
if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct
argProcessCDR.ArgDispatcher = &utils.ArgDispatcher{
RouteID: utils.StringPointer(routeIDIface.(string)),
}
} else {
argProcessCDR.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string))
}
}
if err = da.sS.Call(utils.SessionSv1ProcessCDR,
cgrEv, &rplyCDRs); err != nil {
argProcessCDR, &rplyCDRs); err != nil {
agReq.CGRReply.Set([]string{utils.Error}, err.Error(), false, false)
}
}

View File

@@ -86,7 +86,7 @@ type CDRsV1 struct {
}
// ProcessCDR will process a CDR in CGRateS internal format
func (cdrSv1 *CDRsV1) ProcessCDR(cdr *engine.CDR, reply *string) error {
func (cdrSv1 *CDRsV1) ProcessCDR(cdr *engine.CDRWithArgDispatcher, reply *string) error {
return cdrSv1.CDRs.V1ProcessCDR(cdr, reply)
}
@@ -96,7 +96,7 @@ func (cdrSv1 *CDRsV1) ProcessEvent(arg *engine.ArgV1ProcessEvent, reply *string)
}
// ProcessExternalCDR will process a CDR in external format
func (cdrSv1 *CDRsV1) ProcessExternalCDR(cdr *engine.ExternalCDR, reply *string) error {
func (cdrSv1 *CDRsV1) ProcessExternalCDR(cdr *engine.ExternalCDRWithArgDispatcher, reply *string) error {
return cdrSv1.CDRs.V1ProcessExternalCDR(cdr, reply)
}
@@ -110,11 +110,11 @@ func (cdrSv1 *CDRsV1) StoreSessionCost(attr *engine.AttrCDRSStoreSMCost, reply *
return cdrSv1.CDRs.V1StoreSessionCost(attr, reply)
}
func (cdrSv1 *CDRsV1) CountCDRs(args *utils.RPCCDRsFilter, reply *int64) error {
func (cdrSv1 *CDRsV1) CountCDRs(args *utils.RPCCDRsFilterWithArgDispatcher, reply *int64) error {
return cdrSv1.CDRs.V1CountCDRs(args, reply)
}
func (cdrSv1 *CDRsV1) GetCDRs(args utils.RPCCDRsFilter, reply *[]*engine.CDR) error {
func (cdrSv1 *CDRsV1) GetCDRs(args utils.RPCCDRsFilterWithArgDispatcher, reply *[]*engine.CDR) error {
return cdrSv1.CDRs.V1GetCDRs(args, reply)
}

View File

@@ -686,11 +686,11 @@ func (dS *DispatcherSCDRsV1) Ping(args *utils.CGREventWithArgDispatcher, reply *
return dS.dS.CDRsV1Ping(args, reply)
}
func (dS *DispatcherSCDRsV1) GetCDRs(args utils.RPCCDRsFilter, reply *[]*engine.CDR) error {
func (dS *DispatcherSCDRsV1) GetCDRs(args utils.RPCCDRsFilterWithArgDispatcher, reply *[]*engine.CDR) error {
return dS.dS.CDRsV1GetCDRs(args, reply)
}
func (dS *DispatcherSCDRsV1) CountCDRs(args *utils.RPCCDRsFilter, reply *int64) error {
func (dS *DispatcherSCDRsV1) CountCDRs(args *utils.RPCCDRsFilterWithArgDispatcher, reply *int64) error {
return dS.dS.CDRsV1CountCDRs(args, reply)
}
@@ -702,7 +702,7 @@ func (dS *DispatcherSCDRsV1) RateCDRs(args *engine.ArgRateCDRs, reply *string) e
return dS.dS.CDRsV1RateCDRs(args, reply)
}
func (dS *DispatcherSCDRsV1) ProcessExternalCDR(args *engine.ExternalCDR, reply *string) error {
func (dS *DispatcherSCDRsV1) ProcessExternalCDR(args *engine.ExternalCDRWithArgDispatcher, reply *string) error {
return dS.dS.CDRsV1ProcessExternalCDR(args, reply)
}
@@ -710,6 +710,6 @@ func (dS *DispatcherSCDRsV1) ProcessEvent(args *engine.ArgV1ProcessEvent, reply
return dS.dS.CDRsV1ProcessEvent(args, reply)
}
func (dS *DispatcherSCDRsV1) ProcessCDR(args *engine.CDR, reply *string) error {
func (dS *DispatcherSCDRsV1) ProcessCDR(args *engine.CDRWithArgDispatcher, reply *string) error {
return dS.dS.CDRsV1ProcessCDR(args, reply)
}

View File

@@ -67,7 +67,7 @@ func (ssv1 *SessionSv1) TerminateSession(args *sessions.V1TerminateSessionArgs,
return ssv1.Ss.BiRPCv1TerminateSession(nil, args, rply)
}
func (ssv1 *SessionSv1) ProcessCDR(cgrEv *utils.CGREvent, rply *string) error {
func (ssv1 *SessionSv1) ProcessCDR(cgrEv *utils.CGREventWithArgDispatcher, rply *string) error {
return ssv1.Ss.BiRPCv1ProcessCDR(nil, cgrEv, rply)
}

View File

@@ -86,7 +86,7 @@ func (ssv1 *SessionSv1) BiRPCv1TerminateSession(clnt *rpc2.Client, args *session
return ssv1.Ss.BiRPCv1TerminateSession(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1ProcessCDR(clnt *rpc2.Client, cgrEv *utils.CGREvent, rply *string) error {
func (ssv1 *SessionSv1) BiRPCv1ProcessCDR(clnt *rpc2.Client, cgrEv *utils.CGREventWithArgDispatcher, rply *string) error {
return ssv1.Ss.BiRPCv1ProcessCDR(clnt, cgrEv, rply)
}

View File

@@ -12,3 +12,4 @@ cgrates.org,ATTR_API_RSP_AUTH,*auth,*string:~APIKey:rsp12345,,,APIMethods,*const
cgrates.org,ATTR_API_CHC_AUTH,*auth,*string:~APIKey:chc12345,,,APIMethods,*constant,CacheSv1.Ping&CacheSv1.GetCacheStats&CacheSv1.LoadCache&CacheSv1.PrecacheStatus&CacheSv1.GetItemIDs&CacheSv1.HasItem&CacheSv1.GetItemExpiryTime&CacheSv1.ReloadCache&CacheSv1.RemoveItem&CacheSv1.FlushCache&CacheSv1.Clear,false,20
cgrates.org,ATTR_API_GRD_AUTH,*auth,*string:~APIKey:grd12345,,,APIMethods,*constant,GuardianSv1.Ping&GuardianSv1.RemoteLock&GuardianSv1.RemoteUnlock,false,20
cgrates.org,ATTR_API_SCHD_AUTH,*auth,*string:~APIKey:sched12345,,,APIMethods,*constant,SchedulerSv1.Ping,false,20
cgrates.org,ATTR_API_CDRS_AUTH,*auth,*string:~APIKey:cdrs12345,,,APIMethods,*constant,CDRsV1.Ping,false,20
1 #Tenant ID Contexts FilterIDs ActivationInterval AttributeFilterIDs FieldName Type Value Blocker Weight
12 cgrates.org ATTR_API_CHC_AUTH *auth *string:~APIKey:chc12345 APIMethods *constant CacheSv1.Ping&CacheSv1.GetCacheStats&CacheSv1.LoadCache&CacheSv1.PrecacheStatus&CacheSv1.GetItemIDs&CacheSv1.HasItem&CacheSv1.GetItemExpiryTime&CacheSv1.ReloadCache&CacheSv1.RemoveItem&CacheSv1.FlushCache&CacheSv1.Clear false 20
13 cgrates.org ATTR_API_GRD_AUTH *auth *string:~APIKey:grd12345 APIMethods *constant GuardianSv1.Ping&GuardianSv1.RemoteLock&GuardianSv1.RemoteUnlock false 20
14 cgrates.org ATTR_API_SCHD_AUTH *auth *string:~APIKey:sched12345 APIMethods *constant SchedulerSv1.Ping false 20
15 cgrates.org ATTR_API_CDRS_AUTH *auth *string:~APIKey:cdrs12345 APIMethods *constant CDRsV1.Ping false 20

View File

@@ -42,7 +42,7 @@ func (dS *DispatcherService) CDRsV1Ping(args *utils.CGREventWithArgDispatcher,
utils.CDRsV1Ping, args.CGREvent, reply)
}
func (dS *DispatcherService) CDRsV1GetCDRs(args utils.RPCCDRsFilter, reply *[]*engine.CDR) (err error) {
func (dS *DispatcherService) CDRsV1GetCDRs(args utils.RPCCDRsFilterWithArgDispatcher, reply *[]*engine.CDR) (err error) {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing("ArgDispatcher")
}
@@ -57,7 +57,7 @@ func (dS *DispatcherService) CDRsV1GetCDRs(args utils.RPCCDRsFilter, reply *[]*e
utils.CDRsV1GetCDRs, args, reply)
}
func (dS *DispatcherService) CDRsV1CountCDRs(args *utils.RPCCDRsFilter, reply *int64) (err error) {
func (dS *DispatcherService) CDRsV1CountCDRs(args *utils.RPCCDRsFilterWithArgDispatcher, reply *int64) (err error) {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing("ArgDispatcher")
}
@@ -93,7 +93,7 @@ func (dS *DispatcherService) CDRsV1RateCDRs(args *engine.ArgRateCDRs, reply *str
}
if dS.attrS != nil {
if err = dS.authorize(utils.CDRsV1RateCDRs,
args.Tenant,
args.TenantArg.Tenant,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
@@ -102,7 +102,7 @@ func (dS *DispatcherService) CDRsV1RateCDRs(args *engine.ArgRateCDRs, reply *str
utils.CDRsV1RateCDRs, args, reply)
}
func (dS *DispatcherService) CDRsV1ProcessExternalCDR(args *engine.ExternalCDR, reply *string) (err error) {
func (dS *DispatcherService) CDRsV1ProcessExternalCDR(args *engine.ExternalCDRWithArgDispatcher, reply *string) (err error) {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing("ArgDispatcher")
}
@@ -132,7 +132,7 @@ func (dS *DispatcherService) CDRsV1ProcessEvent(args *engine.ArgV1ProcessEvent,
utils.CDRsV1ProcessEvent, args, reply)
}
func (dS *DispatcherService) CDRsV1ProcessCDR(args *engine.CDR, reply *string) (err error) {
func (dS *DispatcherService) CDRsV1ProcessCDR(args *engine.CDRWithArgDispatcher, reply *string) (err error) {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing("ArgDispatcher")
}

View File

@@ -0,0 +1,61 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package dispatchers
import (
"testing"
"github.com/cgrates/cgrates/utils"
)
var sTestsDspCDRs = []func(t *testing.T){
testDspCDRsPing,
}
//Test start here
func TestDspCDRsITMySQL(t *testing.T) {
testDsp(t, sTestsDspCDRs, "TestDspCDRs", "all", "all2", "attributes", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
}
func TestDspCDRsITMongo(t *testing.T) {
testDsp(t, sTestsDspCDRs, "TestDspCDRs", "all", "all2", "attributes_mongo", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers")
}
func testDspCDRsPing(t *testing.T) {
var reply string
if err := allEngine.RCP.Call(utils.CDRsV1Ping, new(utils.CGREvent), &reply); err != nil {
t.Error(err)
} else if reply != utils.Pong {
t.Errorf("Received: %s", reply)
}
if err := dispEngine.RCP.Call(utils.CDRsV1Ping, &utils.CGREventWithArgDispatcher{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
},
ArgDispatcher: &utils.ArgDispatcher{
APIKey: utils.StringPointer("cdrs12345"),
},
}, &reply); err != nil {
t.Error(err)
} else if reply != utils.Pong {
t.Errorf("Received: %s", reply)
}
}

View File

@@ -215,9 +215,11 @@ func (_ *singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeI
hostIDs []string, serviceMethod string, args interface{}, reply interface{}) (err error) {
var dH *engine.DispatcherHost
if routeID != nil && *routeID != "" {
// overwrite routeID with RouteID:Subsystem
*routeID = utils.ConcatenatedKey(*routeID, subsystem)
// use previously discovered route
if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes,
utils.ConcatenatedKey(subsystem, *routeID)); ok && x != nil {
*routeID); ok && x != nil {
dH = x.(*engine.DispatcherHost)
if err = dH.Call(serviceMethod, args, reply); !utils.IsNetworkError(err) {
return
@@ -233,7 +235,7 @@ func (_ *singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeI
continue
}
if routeID != nil && *routeID != "" { // cache the discovered route
engine.Cache.Set(utils.CacheDispatcherRoutes, utils.ConcatenatedKey(subsystem, *routeID), dH,
engine.Cache.Set(utils.CacheDispatcherRoutes, *routeID, dH,
nil, true, utils.EmptyString)
}
break

View File

@@ -104,7 +104,6 @@ type CDR struct {
CostSource string // The source of this cost
Cost float64 //
CostDetails *EventCost // Attach the cost details to CDR when possible
*utils.ArgDispatcher
}
// AddDefaults will add missing information based on other fields
@@ -805,7 +804,6 @@ type ExternalCDR struct {
CostDetails string
ExtraInfo string
PreRated bool // Mark the CDR as rated so we do not process it during mediation
*utils.ArgDispatcher
}
// Used when authorizing requests from outside, eg ApierV1.GetMaxUsage
@@ -881,3 +879,13 @@ func (self *UsageRecord) AsCallDescriptor(timezone string, denyNegative bool) (*
func (self *UsageRecord) GetId() string {
return utils.Sha1(self.ToR, self.RequestType, self.Tenant, self.Category, self.Account, self.Subject, self.Destination, self.SetupTime, self.AnswerTime, self.Usage)
}
type CDRWithArgDispatcher struct {
*CDR
*utils.ArgDispatcher
}
type ExternalCDRWithArgDispatcher struct {
*ExternalCDR
*utils.ArgDispatcher
}

View File

@@ -43,7 +43,7 @@ func cgrCdrHandler(w http.ResponseWriter, r *http.Request) {
}
cdr := cgrCdr.AsCDR(cdrServer.cgrCfg.GeneralCfg().DefaultTimezone)
var ignored string
if err := cdrServer.V1ProcessCDR(cdr, &ignored); err != nil {
if err := cdrServer.V1ProcessCDR(&CDRWithArgDispatcher{CDR: cdr}, &ignored); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> processing CDR: %s, err: <%s>",
utils.CDRs, cdr, err.Error()))
@@ -60,7 +60,7 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
}
cdr := fsCdr.AsCDR(cdrServer.cgrCfg.GeneralCfg().DefaultTimezone)
var ignored string
if err := cdrServer.V1ProcessCDR(cdr, &ignored); err != nil {
if err := cdrServer.V1ProcessCDR(&CDRWithArgDispatcher{CDR: cdr}, &ignored); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> processing CDR: %s, err: <%s>",
utils.CDRs, cdr, err.Error()))
@@ -137,7 +137,7 @@ func (cdrS *CDRServer) storeSMCost(smCost *SMCost, checkDuplicate bool) error {
// rateCDR will populate cost field
// Returns more than one rated CDR in case of SMCost retrieved based on prefix
func (cdrS *CDRServer) rateCDR(cdr *CDR) ([]*CDR, error) {
func (cdrS *CDRServer) rateCDR(cdr *CDRWithArgDispatcher) ([]*CDR, error) {
var qryCC *CallCost
var err error
if cdr.RequestType == utils.META_NONE {
@@ -197,11 +197,11 @@ func (cdrS *CDRServer) rateCDR(cdr *CDR) ([]*CDR, error) {
cdr.CostDetails = NewEventCostFromCallCost(qryCC, cdr.CGRID, cdr.RunID)
}
cdr.CostDetails.Compute()
return []*CDR{cdr}, nil
return []*CDR{cdr.CDR}, nil
}
// getCostFromRater will retrieve the cost from RALs
func (cdrS *CDRServer) getCostFromRater(cdr *CDR) (*CallCost, error) {
func (cdrS *CDRServer) getCostFromRater(cdr *CDRWithArgDispatcher) (*CallCost, error) {
cc := new(CallCost)
var err error
timeStart := cdr.AnswerTime
@@ -266,13 +266,13 @@ func (cdrS *CDRServer) attrStoExpThdStat(cgrEv *utils.CGREventWithArgDispatcher,
return
}
func (cdrS *CDRServer) rateCDRWithErr(cdr *CDR) (ratedCDRs []*CDR) {
func (cdrS *CDRServer) rateCDRWithErr(cdr *CDRWithArgDispatcher) (ratedCDRs []*CDR) {
var err error
ratedCDRs, err = cdrS.rateCDR(cdr)
if err != nil {
cdr.Cost = -1.0 // If there was an error, mark the CDR
cdr.ExtraInfo = err.Error()
ratedCDRs = []*CDR{cdr}
ratedCDRs = []*CDR{cdr.CDR}
}
return
}
@@ -300,12 +300,10 @@ func (cdrS *CDRServer) chrgProcessEvent(cgrEv *utils.CGREventWithArgDispatcher,
partExec = true
continue
}
for _, rtCDR := range cdrS.rateCDRWithErr(cdr) {
for _, rtCDR := range cdrS.rateCDRWithErr(&CDRWithArgDispatcher{CDR: cdr, ArgDispatcher: cgrEv.ArgDispatcher}) {
arg := &utils.CGREventWithArgDispatcher{
CGREvent: rtCDR.AsCGREvent(),
}
if cgrEv.ArgDispatcher != nil {
arg.ArgDispatcher = cgrEv.ArgDispatcher
CGREvent: rtCDR.AsCGREvent(),
ArgDispatcher: cgrEv.ArgDispatcher,
}
if errProc := cdrS.attrStoExpThdStat(arg,
attrS, store, export, thdS, statS); errProc != nil {
@@ -426,7 +424,7 @@ func (cdrS *CDRServer) Call(serviceMethod string, args interface{}, reply interf
}
// V1ProcessCDR processes a CDR
func (cdrS *CDRServer) V1ProcessCDR(cdr *CDR, reply *string) (err error) {
func (cdrS *CDRServer) V1ProcessCDR(cdr *CDRWithArgDispatcher, reply *string) (err error) {
if cdr.CGRID == utils.EmptyString { // Populate CGRID if not present
cdr.ComputeCGRID()
}
@@ -473,9 +471,7 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDR, reply *string) (err error) {
ID: utils.UUIDSha1Prefix(),
Event: cdr.AsMapStringIface(),
},
}
if cdr.ArgDispatcher != nil {
cgrEv.ArgDispatcher = cdr.ArgDispatcher
ArgDispatcher: cdr.ArgDispatcher,
}
if cdrS.attrS != nil {
if err = cdrS.attrSProcessEvent(cgrEv); err != nil {
@@ -484,7 +480,7 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDR, reply *string) (err error) {
}
}
if cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs { // Store *raw CDR
if err = cdrS.cdrDb.SetCDR(cdr, false); err != nil {
if err = cdrS.cdrDb.SetCDR(cdr.CDR, false); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> storing primary CDR %+v, got error: %s",
utils.CDRs, cdr, err.Error()))
@@ -493,7 +489,7 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDR, reply *string) (err error) {
}
}
if len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0 {
cdrS.exportCDRs([]*CDR{cdr}) // Replicate raw CDR
cdrS.exportCDRs([]*CDR{cdr.CDR}) // Replicate raw CDR
}
if cdrS.thdS != nil {
go cdrS.thdSProcessEvent(cgrEv)
@@ -600,7 +596,8 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er
err = utils.ErrPartiallyExecuted
return
}
for _, rtCDR := range cdrS.rateCDRWithErr(cdr) {
for _, rtCDR := range cdrS.rateCDRWithErr(&CDRWithArgDispatcher{CDR: cdr,
ArgDispatcher: arg.ArgDispatcher}) {
cgrEv := &utils.CGREventWithArgDispatcher{
CGREvent: rtCDR.AsCGREvent(),
ArgDispatcher: arg.ArgDispatcher,
@@ -728,6 +725,7 @@ type ArgRateCDRs struct {
ThresholdS *bool
StatS *bool // Set to true if the CDRs should be sent to stats server
*utils.ArgDispatcher
*utils.TenantArg
}
// V1RateCDRs is used for re-/rate CDRs which are already stored within StorDB
@@ -777,16 +775,18 @@ func (cdrS *CDRServer) V1RateCDRs(arg *ArgRateCDRs, reply *string) (err error) {
}
// Used to process external CDRs
func (cdrS *CDRServer) V1ProcessExternalCDR(eCDR *ExternalCDR, reply *string) error {
cdr, err := NewCDRFromExternalCDR(eCDR, cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
func (cdrS *CDRServer) V1ProcessExternalCDR(eCDR *ExternalCDRWithArgDispatcher, reply *string) error {
cdr, err := NewCDRFromExternalCDR(eCDR.ExternalCDR,
cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
if err != nil {
return err
}
return cdrS.V1ProcessCDR(cdr, reply)
return cdrS.V1ProcessCDR(&CDRWithArgDispatcher{CDR: cdr,
ArgDispatcher: eCDR.ArgDispatcher}, reply)
}
// V1GetCDRs returns CDRs from DB
func (cdrS *CDRServer) V1GetCDRs(args utils.RPCCDRsFilter, cdrs *[]*CDR) error {
func (cdrS *CDRServer) V1GetCDRs(args utils.RPCCDRsFilterWithArgDispatcher, cdrs *[]*CDR) error {
cdrsFltr, err := args.AsCDRsFilter(cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
if err != nil {
if err.Error() != utils.NotFoundCaps {
@@ -803,7 +803,7 @@ func (cdrS *CDRServer) V1GetCDRs(args utils.RPCCDRsFilter, cdrs *[]*CDR) error {
}
// V1CountCDRs counts CDRs from DB
func (cdrS *CDRServer) V1CountCDRs(args *utils.RPCCDRsFilter, cnt *int64) error {
func (cdrS *CDRServer) V1CountCDRs(args *utils.RPCCDRsFilterWithArgDispatcher, cnt *int64) error {
cdrsFltr, err := args.AsCDRsFilter(cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
if err != nil {
if err.Error() != utils.NotFoundCaps {

View File

@@ -129,12 +129,11 @@ func (cS *ChargerService) processEvent(cgrEv *utils.CGREventWithArgDispatcher) (
}
args := &AttrArgsProcessEvent{
AttributeIDs: cP.AttributeIDs,
Context: utils.StringPointer(utils.MetaChargers),
ProcessRuns: nil,
CGREvent: *clonedEv.CGREvent}
if clonedEv.ArgDispatcher != nil {
args.ArgDispatcher = clonedEv.ArgDispatcher
AttributeIDs: cP.AttributeIDs,
Context: utils.StringPointer(utils.MetaChargers),
ProcessRuns: nil,
CGREvent: *clonedEv.CGREvent,
ArgDispatcher: clonedEv.ArgDispatcher,
}
var evReply AttrSProcessEventReply
if err = cS.attrS.Call(utils.AttributeSv1ProcessEvent,

View File

@@ -526,10 +526,10 @@ func (rS *ResourceService) processThresholds(r *Resource, argDispatcher *utils.A
Event: map[string]interface{}{
utils.EventType: utils.ResourceUpdate,
utils.ResourceID: r.ID,
utils.Usage: r.totalUsage()}}}
// in case we receive ArgDispatcher we add it to be used by DispatcherS
if argDispatcher != nil {
thEv.ArgDispatcher = argDispatcher
utils.Usage: r.totalUsage(),
},
},
ArgDispatcher: argDispatcher,
}
var tIDs []string
if err = rS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&

View File

@@ -281,10 +281,10 @@ func (sS *StatService) processEvent(args *StatsArgsProcessEvent) (statQueueIDs [
ID: utils.GenUUID(),
Event: map[string]interface{}{
utils.EventType: utils.StatUpdate,
utils.StatID: sq.ID}}}
// in case we receive ArgDispatcher we add it to be used by DispatcherS
if args.ArgDispatcher != nil {
thEv.ArgDispatcher = args.ArgDispatcher
utils.StatID: sq.ID,
},
},
ArgDispatcher: args.ArgDispatcher,
}
for metricID, metric := range sq.SQMetrics {
thEv.Event[metricID] = metric.GetValue()

View File

@@ -169,13 +169,15 @@ type SMCost struct {
type AttrCDRSStoreSMCost struct {
Cost *SMCost
CheckDuplicate bool
*utils.TenantWithArgDispatcher
*utils.ArgDispatcher
*utils.TenantArg
}
type ArgsV2CDRSStoreSMCost struct {
Cost *V2SMCost
CheckDuplicate bool
*utils.TenantWithArgDispatcher
*utils.ArgDispatcher
*utils.TenantArg
}
type V2SMCost struct {

View File

@@ -478,12 +478,9 @@ func (spS *SupplierService) V1GetSuppliers(args *ArgsGetSuppliers, reply *Sorted
}
if spS.attributeS != nil {
attrArgs := &AttrArgsProcessEvent{
Context: utils.StringPointer(utils.MetaSuppliers),
CGREvent: args.CGREvent,
}
// in case we receive ArgDispatcher we add it to be used by DispatcherS
if args.ArgDispatcher != nil {
attrArgs.ArgDispatcher = args.ArgDispatcher
Context: utils.StringPointer(utils.MetaSuppliers),
CGREvent: args.CGREvent,
ArgDispatcher: args.ArgDispatcher,
}
var rplyEv AttrSProcessEventReply
if err := spS.attributeS.Call(utils.AttributeSv1ProcessEvent,

View File

@@ -391,17 +391,15 @@ func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUs
var reply string
for _, cgrEv := range cgrEvs {
argsProc := &engine.ArgV1ProcessEvent{
CGREvent: *cgrEv,
ChargerS: utils.BoolPointer(false),
AttributeS: utils.BoolPointer(false)}
CGREvent: *cgrEv,
ChargerS: utils.BoolPointer(false),
AttributeS: utils.BoolPointer(false),
ArgDispatcher: s.ArgDispatcher,
}
if unratedReqs.HasField( // order additional rating for unrated request types
engine.NewMapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RequestType)) {
argsProc.RALs = utils.BoolPointer(true)
}
if s.ArgDispatcher != nil {
argsProc.ArgDispatcher = s.ArgDispatcher
}
if err = sS.cdrS.Call(utils.CDRsV1ProcessEvent, argsProc, &reply); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
@@ -415,12 +413,10 @@ func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUs
if sS.resS != nil && s.ResourceID != "" {
var reply string
argsRU := utils.ArgRSv1ResourceUsage{
CGREvent: cgrEv,
UsageID: s.ResourceID,
Units: 1,
}
if s.ArgDispatcher != nil {
argsRU.ArgDispatcher = s.ArgDispatcher
CGREvent: cgrEv,
UsageID: s.ResourceID,
Units: 1,
ArgDispatcher: s.ArgDispatcher,
}
if err := sS.resS.Call(utils.ResourceSv1ReleaseResources,
argsRU, &reply); err != nil {
@@ -600,18 +596,16 @@ func (sS *SessionS) refundSession(s *Session, sRunIdx int, rUsage time.Duration)
}
}
cd := &engine.CallDescriptor{
CgrID: s.CGRID,
RunID: sr.Event.GetStringIgnoreErrors(utils.RunID),
Category: sr.CD.Category,
Tenant: sr.CD.Tenant,
Subject: sr.CD.Subject,
Account: sr.CD.Account,
Destination: sr.CD.Destination,
TOR: sr.CD.TOR,
Increments: incrmts,
}
if s.ArgDispatcher != nil {
cd.ArgDispatcher = s.ArgDispatcher
CgrID: s.CGRID,
RunID: sr.Event.GetStringIgnoreErrors(utils.RunID),
Category: sr.CD.Category,
Tenant: sr.CD.Tenant,
Subject: sr.CD.Subject,
Account: sr.CD.Account,
Destination: sr.CD.Destination,
TOR: sr.CD.TOR,
Increments: incrmts,
ArgDispatcher: s.ArgDispatcher,
}
var acnt engine.Account
if err = sS.ralS.Call(utils.ResponderRefundIncrements, cd, &acnt); err != nil {
@@ -645,9 +639,10 @@ func (sS *SessionS) storeSCost(s *Session, sRunIdx int) (err error) {
argSmCost := &engine.ArgsV2CDRSStoreSMCost{
Cost: smCost,
CheckDuplicate: true,
}
if s.ArgDispatcher != nil {
argSmCost.ArgDispatcher = s.ArgDispatcher
ArgDispatcher: s.ArgDispatcher,
TenantArg: &utils.TenantArg{
Tenant: s.Tenant,
},
}
var reply string
if err := sS.cdrS.Call(utils.CDRsV2StoreSessionCost,
@@ -974,11 +969,7 @@ func (sS *SessionS) forkSession(s *Session) (err error) {
ID: utils.UUIDSha1Prefix(),
Event: s.EventStart.AsMapInterface(),
},
}
// in case we have ArgDispatcher in session we populate CGREvent
// so DispatcherS can verify the APIKey/RouteID if it's necessary
if s.ArgDispatcher != nil {
cgrEv.ArgDispatcher = s.ArgDispatcher
ArgDispatcher: s.ArgDispatcher,
}
var chrgrs []*engine.ChrgSProcessEventReply
if err = sS.chargerS.Call(utils.ChargerSv1ProcessEvent,
@@ -1710,12 +1701,9 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection,
return utils.NewErrNotConnected(utils.AttributeS)
}
attrArgs := &engine.AttrArgsProcessEvent{
Context: utils.StringPointer(utils.MetaSessionS),
CGREvent: args.CGREvent,
}
// in case we receive ArgDispatcher we add it to be used by DispatcherS
if args.ArgDispatcher != nil {
attrArgs.ArgDispatcher = args.ArgDispatcher
Context: utils.StringPointer(utils.MetaSessionS),
CGREvent: args.CGREvent,
ArgDispatcher: args.ArgDispatcher,
}
var rplyEv engine.AttrSProcessEventReply
if err := sS.attrS.Call(utils.AttributeSv1ProcessEvent,
@@ -1749,13 +1737,10 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection,
}
var allocMsg string
attrRU := utils.ArgRSv1ResourceUsage{
CGREvent: args.CGREvent,
UsageID: originID,
Units: 1,
}
// in case we receive ArgDispatcher we add it to be used by DispatcherS
if args.ArgDispatcher != nil {
attrRU.ArgDispatcher = args.ArgDispatcher
CGREvent: args.CGREvent,
UsageID: originID,
Units: 1,
ArgDispatcher: args.ArgDispatcher,
}
if err = sS.resS.Call(utils.ResourceSv1AuthorizeResources,
attrRU, &allocMsg); err != nil {
@@ -1773,14 +1758,11 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection,
}
var splsReply engine.SortedSuppliers
sArgs := &engine.ArgsGetSuppliers{
IgnoreErrors: args.SuppliersIgnoreErrors,
MaxCost: args.SuppliersMaxCost,
CGREvent: *cgrEv,
Paginator: args.Paginator,
}
// in case we receive ArgDispatcher we add it to be used by DispatcherS
if args.ArgDispatcher != nil {
sArgs.ArgDispatcher = args.ArgDispatcher
IgnoreErrors: args.SuppliersIgnoreErrors,
MaxCost: args.SuppliersMaxCost,
CGREvent: *cgrEv,
Paginator: args.Paginator,
ArgDispatcher: args.ArgDispatcher,
}
if err = sS.splS.Call(utils.SupplierSv1GetSuppliers,
sArgs, &splsReply); err != nil {
@@ -1796,11 +1778,8 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection,
}
var tIDs []string
thEv := &engine.ArgsProcessEvent{
CGREvent: args.CGREvent,
}
// in case we receive ArgDispatcher we add it to be used by DispatcherS
if args.ArgDispatcher != nil {
thEv.ArgDispatcher = args.ArgDispatcher
CGREvent: args.CGREvent,
ArgDispatcher: args.ArgDispatcher,
}
if err := sS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
@@ -1814,10 +1793,9 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection,
if sS.statS == nil {
return utils.NewErrNotConnected(utils.StatService)
}
statArgs := &engine.StatsArgsProcessEvent{CGREvent: args.CGREvent}
// in case we receive ArgDispatcher we add it to be used by DispatcherS
if args.ArgDispatcher != nil {
statArgs.ArgDispatcher = args.ArgDispatcher
statArgs := &engine.StatsArgsProcessEvent{
CGREvent: args.CGREvent,
ArgDispatcher: args.ArgDispatcher,
}
var statReply []string
if err := sS.statS.Call(utils.StatSv1ProcessEvent,
@@ -1997,12 +1975,9 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection,
return utils.NewErrNotConnected(utils.AttributeS)
}
attrArgs := &engine.AttrArgsProcessEvent{
Context: utils.StringPointer(utils.MetaSessionS),
CGREvent: args.CGREvent,
}
// in case we receive ArgDispatcher we add it to be used by DispatcherS
if args.ArgDispatcher != nil {
attrArgs.ArgDispatcher = args.ArgDispatcher
Context: utils.StringPointer(utils.MetaSessionS),
CGREvent: args.CGREvent,
ArgDispatcher: args.ArgDispatcher,
}
var rplyEv engine.AttrSProcessEventReply
if err := sS.attrS.Call(utils.AttributeSv1ProcessEvent,
@@ -2026,13 +2001,10 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection,
return utils.NewErrMandatoryIeMissing(utils.OriginID)
}
attrRU := utils.ArgRSv1ResourceUsage{
CGREvent: args.CGREvent,
UsageID: originID,
Units: 1,
}
// in case we receive ArgDispatcher we add it to be used by DispatcherS
if args.ArgDispatcher != nil {
attrRU.ArgDispatcher = args.ArgDispatcher
CGREvent: args.CGREvent,
UsageID: originID,
Units: 1,
ArgDispatcher: args.ArgDispatcher,
}
var allocMessage string
if err = sS.resS.Call(utils.ResourceSv1AllocateResources,
@@ -2071,11 +2043,8 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection,
}
var tIDs []string
thEv := &engine.ArgsProcessEvent{
CGREvent: args.CGREvent,
}
// in case we receive ArgDispatcher we add it to be used by DispatcherS
if args.ArgDispatcher != nil {
thEv.ArgDispatcher = args.ArgDispatcher
CGREvent: args.CGREvent,
ArgDispatcher: args.ArgDispatcher,
}
if err := sS.thdS.Call(utils.ThresholdSv1ProcessEvent,
thEv, &tIDs); err != nil &&
@@ -2091,10 +2060,9 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection,
return utils.NewErrNotConnected(utils.StatService)
}
var statReply []string
statArgs := &engine.StatsArgsProcessEvent{CGREvent: args.CGREvent}
// in case we receive ArgDispatcher we add it to be used by DispatcherS
if args.ArgDispatcher != nil {
statArgs.ArgDispatcher = args.ArgDispatcher
statArgs := &engine.StatsArgsProcessEvent{
CGREvent: args.CGREvent,
ArgDispatcher: args.ArgDispatcher,
}
if err := sS.statS.Call(utils.StatSv1ProcessEvent,
statArgs, &statReply); err != nil &&
@@ -2153,8 +2121,11 @@ func (sS *SessionS) BiRPCv1InitiateSessionWithDigest(clnt rpcclient.RpcClientCon
// NewV1UpdateSessionArgs is a constructor for update session arguments
func NewV1UpdateSessionArgs(attrs, acnts bool,
cgrEv utils.CGREvent) (args *V1UpdateSessionArgs) {
args = &V1UpdateSessionArgs{GetAttributes: attrs,
UpdateSession: acnts, CGREvent: cgrEv}
args = &V1UpdateSessionArgs{
GetAttributes: attrs,
UpdateSession: acnts,
CGREvent: cgrEv,
}
//check if we have APIKey in event and in case it has add it in ArgDispatcher
apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey]
if hasApiKey {
@@ -2249,12 +2220,9 @@ func (sS *SessionS) BiRPCv1UpdateSession(clnt rpcclient.RpcClientConnection,
return utils.NewErrNotConnected(utils.AttributeS)
}
attrArgs := &engine.AttrArgsProcessEvent{
Context: utils.StringPointer(utils.MetaSessionS),
CGREvent: args.CGREvent,
}
// in case we receive ArgDispatcher we add it to be used by DispatcherS
if args.ArgDispatcher != nil {
attrArgs.ArgDispatcher = args.ArgDispatcher
Context: utils.StringPointer(utils.MetaSessionS),
CGREvent: args.CGREvent,
ArgDispatcher: args.ArgDispatcher,
}
var rplyEv engine.AttrSProcessEventReply
if err := sS.attrS.Call(utils.AttributeSv1ProcessEvent,
@@ -2417,13 +2385,10 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection,
}
var reply string
argsRU := utils.ArgRSv1ResourceUsage{
CGREvent: args.CGREvent,
UsageID: originID, // same ID should be accepted by first group since the previous resource should be expired
Units: 1,
}
// in case we receive ArgDispatcher we add it to be used by DispatcherS
if args.ArgDispatcher != nil {
argsRU.ArgDispatcher = args.ArgDispatcher
CGREvent: args.CGREvent,
UsageID: originID, // same ID should be accepted by first group since the previous resource should be expired
Units: 1,
ArgDispatcher: args.ArgDispatcher,
}
if err = sS.resS.Call(utils.ResourceSv1ReleaseResources,
argsRU, &reply); err != nil {
@@ -2436,11 +2401,8 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection,
}
var tIDs []string
thEv := &engine.ArgsProcessEvent{
CGREvent: args.CGREvent,
}
// in case we receive ArgDispatcher we add it to be used by DispatcherS
if args.ArgDispatcher != nil {
thEv.ArgDispatcher = args.ArgDispatcher
CGREvent: args.CGREvent,
ArgDispatcher: args.ArgDispatcher,
}
if err := sS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
@@ -2454,10 +2416,9 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection,
return utils.NewErrNotConnected(utils.StatS)
}
var statReply []string
statArgs := &engine.StatsArgsProcessEvent{CGREvent: args.CGREvent}
// in case we receive ArgDispatcher we add it to be used by DispatcherS
if args.ArgDispatcher != nil {
statArgs.ArgDispatcher = args.ArgDispatcher
statArgs := &engine.StatsArgsProcessEvent{
CGREvent: args.CGREvent,
ArgDispatcher: args.ArgDispatcher,
}
if err := sS.statS.Call(utils.StatSv1ProcessEvent,
statArgs, &statReply); err != nil &&
@@ -2473,14 +2434,14 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection,
// BiRPCv1ProcessCDR sends the CDR to CDRs
func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection,
cgrEv *utils.CGREvent, rply *string) (err error) {
if cgrEv.ID == "" {
cgrEv.ID = utils.GenUUID()
cgrEvWithArgDisp *utils.CGREventWithArgDispatcher, rply *string) (err error) {
if cgrEvWithArgDisp.ID == "" {
cgrEvWithArgDisp.ID = utils.GenUUID()
}
// RPC caching
if sS.cgrCfg.CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.SessionSv1ProcessCDR, cgrEv.ID)
cacheKey := utils.ConcatenatedKey(utils.SessionSv1ProcessCDR, cgrEvWithArgDisp.ID)
refID := guardian.Guardian.GuardIDs("",
sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
@@ -2498,7 +2459,7 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection,
}
// end of RPC caching
ev := engine.NewSafEvent(cgrEv.Event)
ev := engine.NewSafEvent(cgrEvWithArgDisp.Event)
cgrID := GetSetCGRID(ev)
ss := sS.getRelocateSessions(cgrID,
ev.GetStringIgnoreErrors(utils.InitialOriginID),
@@ -2517,7 +2478,8 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection,
}
if s == nil { // no cached session, CDR will be handled by CDRs
return sS.cdrS.Call(utils.CDRsV1ProcessEvent,
&engine.ArgV1ProcessEvent{CGREvent: *cgrEv}, rply)
&engine.ArgV1ProcessEvent{CGREvent: *cgrEvWithArgDisp.CGREvent,
ArgDispatcher: cgrEvWithArgDisp.ArgDispatcher}, rply)
}
// Use previously stored Session to generate CDRs
@@ -2538,9 +2500,11 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection,
var withErrors bool
for _, cgrEv := range cgrEvs {
argsProc := &engine.ArgV1ProcessEvent{
CGREvent: *cgrEv,
ChargerS: utils.BoolPointer(false),
AttributeS: utils.BoolPointer(false)}
CGREvent: *cgrEv,
ChargerS: utils.BoolPointer(false),
AttributeS: utils.BoolPointer(false),
ArgDispatcher: cgrEvWithArgDisp.ArgDispatcher,
}
if unratedReqs.HasField( // order additional rating for unrated request types
engine.NewMapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RequestType)) {
argsProc.RALs = utils.BoolPointer(true)
@@ -2671,12 +2635,9 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection,
return utils.NewErrNotConnected(utils.AttributeS)
}
attrArgs := &engine.AttrArgsProcessEvent{
Context: utils.StringPointer(utils.MetaSessionS),
CGREvent: args.CGREvent,
}
// in case we receive ArgDispatcher we add it to be used by DispatcherS
if args.ArgDispatcher != nil {
attrArgs.ArgDispatcher = args.ArgDispatcher
Context: utils.StringPointer(utils.MetaSessionS),
CGREvent: args.CGREvent,
ArgDispatcher: args.ArgDispatcher,
}
var rplyEv engine.AttrSProcessEventReply
if err := sS.attrS.Call(utils.AttributeSv1ProcessEvent,
@@ -2700,13 +2661,10 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection,
return utils.NewErrMandatoryIeMissing(utils.OriginID)
}
attrRU := utils.ArgRSv1ResourceUsage{
CGREvent: args.CGREvent,
UsageID: originID,
Units: 1,
}
// in case we receive ArgDispatcher we add it to be used by DispatcherS
if args.ArgDispatcher != nil {
attrRU.ArgDispatcher = args.ArgDispatcher
CGREvent: args.CGREvent,
UsageID: originID,
Units: 1,
ArgDispatcher: args.ArgDispatcher,
}
var allocMessage string
if err = sS.resS.Call(utils.ResourceSv1AllocateResources,
@@ -2729,11 +2687,8 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection,
}
var tIDs []string
thEv := &engine.ArgsProcessEvent{
CGREvent: args.CGREvent,
}
// in case we receive ArgDispatcher we add it to be used by DispatcherS
if args.ArgDispatcher != nil {
thEv.ArgDispatcher = args.ArgDispatcher
CGREvent: args.CGREvent,
ArgDispatcher: args.ArgDispatcher,
}
if err := sS.thdS.Call(utils.ThresholdSv1ProcessEvent,
thEv, &tIDs); err != nil &&
@@ -2748,10 +2703,9 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection,
return utils.NewErrNotConnected(utils.StatS)
}
var statReply []string
statArgs := &engine.StatsArgsProcessEvent{CGREvent: args.CGREvent}
// in case we receive ArgDispatcher we add it to be used by DispatcherS
if args.ArgDispatcher != nil {
statArgs.ArgDispatcher = args.ArgDispatcher
statArgs := &engine.StatsArgsProcessEvent{
CGREvent: args.CGREvent,
ArgDispatcher: args.ArgDispatcher,
}
if err := sS.statS.Call(utils.StatSv1ProcessEvent,
statArgs, &statReply); err != nil &&
@@ -2919,11 +2873,12 @@ func (sS *SessionS) BiRPCV1ProcessCDR(clnt rpcclient.RpcClientConnection,
ev engine.MapEvent, rply *string) (err error) {
return sS.BiRPCv1ProcessCDR(
clnt,
&utils.CGREvent{
Tenant: utils.FirstNonEmpty(
ev.GetStringIgnoreErrors(utils.Tenant),
sS.cgrCfg.GeneralCfg().DefaultTenant),
ID: utils.UUIDSha1Prefix(),
Event: ev},
&utils.CGREventWithArgDispatcher{
CGREvent: &utils.CGREvent{
Tenant: utils.FirstNonEmpty(
ev.GetStringIgnoreErrors(utils.Tenant),
sS.cgrCfg.GeneralCfg().DefaultTenant),
ID: utils.UUIDSha1Prefix(),
Event: ev}},
rply)
}

View File

@@ -828,7 +828,6 @@ type RPCCDRsFilter struct {
MaxCost *float64 // End of the usage interval (<)
OrderBy string // Ascendent/Descendent
Paginator // Add pagination
*TenantWithArgDispatcher
}
func (self *RPCCDRsFilter) AsCDRsFilter(timezone string) (*CDRsFilter, error) {
@@ -1358,3 +1357,8 @@ func AppendToSMCostFilter(smcFilter *SMCostFilter, fieldType, fieldName string,
}
return smcFilter, err
}
type RPCCDRsFilterWithArgDispatcher struct {
*RPCCDRsFilter
*TenantWithArgDispatcher
}