diff --git a/agents/diamagent.go b/agents/diamagent.go index a4aacd3ae..e6ecaa015 100644 --- a/agents/diamagent.go +++ b/agents/diamagent.go @@ -347,7 +347,7 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.RequestProcessor, return } case utils.MetaMessage: - evArgs := sessions.NewV1ProcessMessageArgs( + msgArgs := sessions.NewV1ProcessMessageArgs( reqProcessor.Flags.HasKey(utils.MetaAttributes), reqProcessor.Flags.ParamsSlice(utils.MetaAttributes), reqProcessor.Flags.HasKey(utils.MetaThresholds), @@ -362,6 +362,24 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.RequestProcessor, cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator) rply := new(sessions.V1ProcessMessageReply) err = da.sS.Call(utils.SessionSv1ProcessMessage, + msgArgs, rply) + if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) { + cgrEv.Event[utils.Usage] = 0 // avoid further debits + } else if rply.MaxUsage != nil { + cgrEv.Event[utils.Usage] = *rply.MaxUsage // make sure the CDR reflects the debit + } + if err = agReq.setCGRReply(rply, err); err != nil { + return + } + case utils.MetaEvent: + evArgs := &sessions.V1ProcessEventArgs{ + Flags: reqProcessor.Flags.SliceFlags(), + CGREvent: cgrEv, + ArgDispatcher: cgrArgs.ArgDispatcher, + Paginator: *cgrArgs.SupplierPaginator, + } + rply := new(sessions.V1ProcessEventReply) + err = da.sS.Call(utils.SessionSv1ProcessEvent, evArgs, rply) if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) { cgrEv.Event[utils.Usage] = 0 // avoid further debits diff --git a/agents/dnsagent.go b/agents/dnsagent.go index eb295de16..707408809 100644 --- a/agents/dnsagent.go +++ b/agents/dnsagent.go @@ -274,6 +274,24 @@ func (da *DNSAgent) processRequest(reqProcessor *config.RequestProcessor, if err = agReq.setCGRReply(rply, err); err != nil { return } + case utils.MetaEvent: + evArgs := &sessions.V1ProcessEventArgs{ + Flags: reqProcessor.Flags.SliceFlags(), + CGREvent: cgrEv, + ArgDispatcher: cgrArgs.ArgDispatcher, + Paginator: *cgrArgs.SupplierPaginator, + } + rply := new(sessions.V1ProcessEventReply) + err = da.sS.Call(utils.SessionSv1ProcessEvent, + evArgs, rply) + if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) { + cgrEv.Event[utils.Usage] = 0 // avoid further debits + } else if rply.MaxUsage != nil { + cgrEv.Event[utils.Usage] = *rply.MaxUsage // make sure the CDR reflects the debit + } + if err = agReq.setCGRReply(rply, err); err != nil { + return + } case utils.MetaCDRs: // allow CDR processing } // separate request so we can capture the Terminate/Event also here diff --git a/agents/httpagent.go b/agents/httpagent.go index 6be780312..72c940067 100644 --- a/agents/httpagent.go +++ b/agents/httpagent.go @@ -220,6 +220,24 @@ func (ha *HTTPAgent) processRequest(reqProcessor *config.RequestProcessor, if err = agReq.setCGRReply(nil, err); err != nil { return } + case utils.MetaEvent: + evArgs := &sessions.V1ProcessEventArgs{ + Flags: reqProcessor.Flags.SliceFlags(), + CGREvent: cgrEv, + ArgDispatcher: cgrArgs.ArgDispatcher, + Paginator: *cgrArgs.SupplierPaginator, + } + rply := new(sessions.V1ProcessEventReply) + err = ha.sessionS.Call(utils.SessionSv1ProcessEvent, + evArgs, rply) + if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) { + cgrEv.Event[utils.Usage] = 0 // avoid further debits + } else if rply.MaxUsage != nil { + cgrEv.Event[utils.Usage] = *rply.MaxUsage // make sure the CDR reflects the debit + } + if err = agReq.setCGRReply(rply, err); err != nil { + return + } case utils.MetaCDRs: // allow CDR processing } // separate request so we can capture the Terminate/Event also here diff --git a/agents/radagent.go b/agents/radagent.go index bbfd65228..d5fbce701 100644 --- a/agents/radagent.go +++ b/agents/radagent.go @@ -265,6 +265,24 @@ func (ra *RadiusAgent) processRequest(reqProcessor *config.RequestProcessor, if err = agReq.setCGRReply(rply, err); err != nil { return } + case utils.MetaEvent: + evArgs := &sessions.V1ProcessEventArgs{ + Flags: reqProcessor.Flags.SliceFlags(), + CGREvent: cgrEv, + ArgDispatcher: cgrArgs.ArgDispatcher, + Paginator: *cgrArgs.SupplierPaginator, + } + rply := new(sessions.V1ProcessEventReply) + err = ra.sessionS.Call(utils.SessionSv1ProcessEvent, + evArgs, rply) + if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) { + cgrEv.Event[utils.Usage] = 0 // avoid further debits + } else if rply.MaxUsage != nil { + cgrEv.Event[utils.Usage] = *rply.MaxUsage // make sure the CDR reflects the debit + } + if err = agReq.setCGRReply(rply, err); err != nil { + return + } case utils.MetaCDRs: // allow this method } // separate request so we can capture the Terminate/Event also here diff --git a/apier/v1/dispatcher.go b/apier/v1/dispatcher.go index bce1b090d..d2786c031 100755 --- a/apier/v1/dispatcher.go +++ b/apier/v1/dispatcher.go @@ -439,12 +439,18 @@ func (dS *DispatcherSessionSv1) ProcessCDR(args *utils.CGREventWithArgDispatcher return dS.dS.SessionSv1ProcessCDR(args, reply) } -// ProcessEvent implements SessionSv1ProcessEvent +// ProcessMessage implements SessionSv1ProcessMessage func (dS *DispatcherSessionSv1) ProcessMessage(args *sessions.V1ProcessMessageArgs, reply *sessions.V1ProcessMessageReply) (err error) { return dS.dS.SessionSv1ProcessMessage(args, reply) } +// ProcessMessage implements SessionSv1ProcessMessage +func (dS *DispatcherSessionSv1) ProcessEvent(args *sessions.V1ProcessEventArgs, + reply *sessions.V1ProcessEventReply) (err error) { + return dS.dS.SessionSv1ProcessEvent(args, reply) +} + // TerminateSession implements SessionSv1TerminateSession func (dS *DispatcherSessionSv1) TerminateSession(args *sessions.V1TerminateSessionArgs, reply *string) (err error) { diff --git a/apier/v1/dispatcher_interface.go b/apier/v1/dispatcher_interface.go index a6e933309..c391ef2d8 100644 --- a/apier/v1/dispatcher_interface.go +++ b/apier/v1/dispatcher_interface.go @@ -81,6 +81,7 @@ type SessionSv1Interface interface { TerminateSession(args *sessions.V1TerminateSessionArgs, rply *string) error ProcessCDR(cgrEv *utils.CGREventWithArgDispatcher, rply *string) error ProcessMessage(args *sessions.V1ProcessMessageArgs, rply *sessions.V1ProcessMessageReply) error + ProcessEvent(args *sessions.V1ProcessEventArgs, rply *sessions.V1ProcessEventReply) error GetActiveSessions(args *utils.SessionFilter, rply *[]*sessions.ExternalSession) error GetActiveSessionsCount(args *utils.SessionFilter, rply *int) error ForceDisconnect(args *utils.SessionFilter, rply *string) error diff --git a/apier/v1/sessions.go b/apier/v1/sessions.go index 5b8f45455..64a5512a6 100644 --- a/apier/v1/sessions.go +++ b/apier/v1/sessions.go @@ -77,6 +77,11 @@ func (ssv1 *SessionSv1) ProcessMessage(args *sessions.V1ProcessMessageArgs, return ssv1.Ss.BiRPCv1ProcessMessage(nil, args, rply) } +func (ssv1 *SessionSv1) ProcessEvent(args *sessions.V1ProcessEventArgs, + rply *sessions.V1ProcessEventReply) error { + return ssv1.Ss.BiRPCv1ProcessEvent(nil, args, rply) +} + func (ssv1 *SessionSv1) GetActiveSessions(args *utils.SessionFilter, rply *[]*sessions.ExternalSession) error { return ssv1.Ss.BiRPCv1GetActiveSessions(nil, args, rply) diff --git a/apier/v1/sessions_process_event_it_test.go b/apier/v1/sessions_process_event_it_test.go new file mode 100644 index 000000000..8f71c7631 --- /dev/null +++ b/apier/v1/sessions_process_event_it_test.go @@ -0,0 +1,402 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package v1 + +import ( + "reflect" + "testing" + "time" + + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/sessions" + "github.com/cgrates/cgrates/utils" +) + +//Use from sessionsv1_it_test.go +//functions insted of duplicate them here +// eg: initCfg,ResetDB,StopEngine,etc... +var sTestSessionSv1ProcessEvent = []func(t *testing.T){ + testSSv1ItInitCfg, + testSSv1ItResetDataDb, + testSSv1ItResetStorDb, + testSSv1ItStartEngine, + testSSv1ItRpcConn, + testSSv1ItPing, + testSSv1ItTPFromFolder, + testSSv1ItProcessEventAuth, + testSSv1ItProcessEventInitiateSession, + testSSv1ItProcessEventUpdateSession, + testSSv1ItProcessEventTerminateSession, + testSSv1ItProcessCDRForSessionFromProcessEvent, + testSSv1ItGetCDRs, + testSSv1ItStopCgrEngine, +} + +func TestSSv1ItProcessEventWithPrepaid(t *testing.T) { + sSV1RequestType = utils.META_PREPAID + for _, stest := range sTestSessionSv1ProcessEvent { + t.Run(sSV1RequestType, stest) + } +} + +func TestSSv1ItProcessEventWithPostPaid(t *testing.T) { + sSV1RequestType = utils.META_POSTPAID + sTestSessionSv1ProcessEvent = append(sTestSessionSv1ProcessEvent[:len(sTestSessionSv1ProcessEvent)-3], testSSv1ItStopCgrEngine) + for _, stest := range sTestSessionSv1ProcessEvent { + t.Run(sSV1RequestType, stest) + } +} + +func TestSSv1ItProcessEventWithRated(t *testing.T) { + sSV1RequestType = utils.META_RATED + sTestSessionSv1ProcessEvent = append(sTestSessionSv1ProcessEvent[:len(sTestSessionSv1ProcessEvent)-3], testSSv1ItStopCgrEngine) + for _, stest := range sTestSessionSv1ProcessEvent { + t.Run(sSV1RequestType, stest) + } +} + +func TestSSv1ItProcessEventWithPseudoPrepaid(t *testing.T) { + sSV1RequestType = utils.META_PSEUDOPREPAID + for _, stest := range sTestSessionSv1ProcessEvent { + t.Run(sSV1RequestType, stest) + } +} + +func testSSv1ItProcessEventAuth(t *testing.T) { + authUsage := 5 * time.Minute + args := &sessions.V1ProcessEventArgs{ + Flags: []string{"*resources:*authorize", "*auth", "*suppliers", "*attributes"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testSSv1ItProcessEventAuth", + Event: map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.ToR: utils.VOICE, + utils.OriginID: "testSSv1ItProcessEvent", + utils.RequestType: sSV1RequestType, + utils.Account: "1001", + utils.Subject: "ANY2CNT", + utils.Destination: "1002", + utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + utils.Usage: authUsage, + }, + }, + } + var rply sessions.V1ProcessEventReply + if err := sSv1BiRpc.Call(utils.SessionSv1ProcessEvent, args, &rply); err != nil { + t.Fatal(err) + } + if *rply.MaxUsage != authUsage { + t.Errorf("Unexpected MaxUsage: %v", rply.MaxUsage) + } + if *rply.ResourceAuthorization == "" { + t.Errorf("Unexpected ResourceAllocation: %s", *rply.ResourceAuthorization) + } + eSplrs := &engine.SortedSuppliers{ + ProfileID: "SPL_ACNT_1001", + Sorting: utils.MetaWeight, + Count: 2, + SortedSuppliers: []*engine.SortedSupplier{ + { + SupplierID: "supplier1", + SortingData: map[string]interface{}{ + "Weight": 20.0, + }, + }, + { + SupplierID: "supplier2", + SortingData: map[string]interface{}{ + "Weight": 10.0, + }, + }, + }, + } + if !reflect.DeepEqual(eSplrs, rply.Suppliers) { + t.Errorf("expecting: %+v,\n received: %+v", utils.ToJSON(eSplrs), utils.ToJSON(rply.Suppliers)) + } + eAttrs := &engine.AttrSProcessEventReply{ + MatchedProfiles: []string{"ATTR_ACNT_1001"}, + AlteredFields: []string{"OfficeGroup"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testSSv1ItProcessEventAuth", + Event: map[string]interface{}{ + utils.CGRID: "4be779c004d9f784e836db9ffd41b50319d71fe8", + utils.Tenant: "cgrates.org", + utils.ToR: utils.VOICE, + utils.Account: "1001", + utils.Subject: "ANY2CNT", + utils.Destination: "1002", + "OfficeGroup": "Marketing", + utils.OriginID: "testSSv1ItProcessEvent", + utils.RequestType: sSV1RequestType, + utils.SetupTime: "2018-01-07T17:00:00Z", + utils.Usage: 300000000000.0, + }, + }, + } + if !reflect.DeepEqual(eAttrs, rply.Attributes) { + t.Errorf("expecting: %+v, received: %+v", + utils.ToJSON(eAttrs), utils.ToJSON(rply.Attributes)) + } +} + +func testSSv1ItProcessEventInitiateSession(t *testing.T) { + initUsage := 5 * time.Minute + args := &sessions.V1ProcessEventArgs{ + Flags: []string{utils.MetaInitiate, "*resources:*allocate", "*attributes"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testSSv1ItProcessEventInitiateSession", + Event: map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.ToR: utils.VOICE, + utils.OriginID: "testSSv1ItProcessEvent", + utils.RequestType: sSV1RequestType, + utils.Account: "1001", + utils.Subject: "ANY2CNT", + utils.Destination: "1002", + utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), + utils.Usage: initUsage, + }, + }, + } + var rply sessions.V1ProcessEventReply + if err := sSv1BiRpc.Call(utils.SessionSv1ProcessEvent, + args, &rply); err != nil { + t.Error(err) + } + // in case of prepaid and pseudoprepade we expect a MaxUsage of 5min + // and in case of postpaid and rated we expect -1 + if ((sSV1RequestType == utils.META_PREPAID || + sSV1RequestType == utils.META_PSEUDOPREPAID) && *rply.MaxUsage != initUsage) || + ((sSV1RequestType == utils.META_POSTPAID || + sSV1RequestType == utils.META_RATED) && *rply.MaxUsage != -1) { + t.Errorf("Unexpected MaxUsage: %v", rply.MaxUsage) + } + if *rply.ResourceAllocation != "RES_ACNT_1001" { + t.Errorf("Unexpected ResourceAllocation: %s", *rply.ResourceAllocation) + } + eAttrs := &engine.AttrSProcessEventReply{ + MatchedProfiles: []string{"ATTR_ACNT_1001"}, + AlteredFields: []string{"OfficeGroup"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testSSv1ItProcessEventInitiateSession", + Event: map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.ToR: utils.VOICE, + utils.Account: "1001", + utils.Subject: "ANY2CNT", + utils.Destination: "1002", + "OfficeGroup": "Marketing", + utils.OriginID: "testSSv1ItProcessEvent", + utils.RequestType: sSV1RequestType, + utils.SetupTime: "2018-01-07T17:00:00Z", + utils.AnswerTime: "2018-01-07T17:00:10Z", + utils.Usage: 300000000000.0, + }, + }, + } + if !reflect.DeepEqual(eAttrs, rply.Attributes) { + t.Errorf("expecting: %+v, received: %+v", + utils.ToJSON(eAttrs), utils.ToJSON(rply.Attributes)) + } + aSessions := make([]*sessions.ExternalSession, 0) + if err := sSv1BiRpc.Call(utils.SessionSv1GetActiveSessions, &utils.SessionFilter{}, &aSessions); err != nil { + t.Error(err) + } else if len(aSessions) != 2 { + t.Errorf("wrong active sessions: %s \n , and len(aSessions) %+v", utils.ToJSON(aSessions), len(aSessions)) + } +} + +func testSSv1ItProcessEventUpdateSession(t *testing.T) { + reqUsage := 5 * time.Minute + args := &sessions.V1ProcessEventArgs{ + Flags: []string{utils.MetaUpdate, "*attributes"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testSSv1ItProcessEventUpdateSession", + Event: map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.ToR: utils.VOICE, + utils.OriginID: "testSSv1ItProcessEvent", + utils.RequestType: sSV1RequestType, + utils.Account: "1001", + utils.Subject: "ANY2CNT", + utils.Destination: "1002", + utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), + utils.Usage: reqUsage, + }, + }, + } + var rply sessions.V1ProcessEventReply + if err := sSv1BiRpc.Call(utils.SessionSv1ProcessEvent, + args, &rply); err != nil { + t.Error(err) + } + eAttrs := &engine.AttrSProcessEventReply{ + MatchedProfiles: []string{"ATTR_ACNT_1001"}, + AlteredFields: []string{"OfficeGroup"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testSSv1ItProcessEventUpdateSession", + Event: map[string]interface{}{ + utils.CGRID: "4be779c004d9f784e836db9ffd41b50319d71fe8", + utils.Tenant: "cgrates.org", + utils.ToR: utils.VOICE, + utils.Account: "1001", + utils.Subject: "ANY2CNT", + utils.Destination: "1002", + "OfficeGroup": "Marketing", + utils.OriginID: "testSSv1ItProcessEvent", + utils.RequestType: sSV1RequestType, + utils.SetupTime: "2018-01-07T17:00:00Z", + utils.AnswerTime: "2018-01-07T17:00:10Z", + utils.Usage: 300000000000.0, + }, + }, + } + if !reflect.DeepEqual(eAttrs, rply.Attributes) { + t.Errorf("expecting: %+v, received: %+v", + utils.ToJSON(eAttrs), utils.ToJSON(rply.Attributes)) + } + // in case of prepaid and pseudoprepade we expect a MaxUsage of 5min + // and in case of postpaid and rated we expect -1 + if ((sSV1RequestType == utils.META_PREPAID || + sSV1RequestType == utils.META_PSEUDOPREPAID) && *rply.MaxUsage != reqUsage) || + ((sSV1RequestType == utils.META_POSTPAID || + sSV1RequestType == utils.META_RATED) && *rply.MaxUsage != -1) { + t.Errorf("Unexpected MaxUsage: %v", rply.MaxUsage) + } + aSessions := make([]*sessions.ExternalSession, 0) + if err := sSv1BiRpc.Call(utils.SessionSv1GetActiveSessions, &utils.SessionFilter{}, &aSessions); err != nil { + t.Error(err) + } else if len(aSessions) != 2 { + t.Errorf("wrong active sessions: %s", utils.ToJSON(aSessions)) + } +} + +func testSSv1ItProcessEventTerminateSession(t *testing.T) { + args := &sessions.V1ProcessEventArgs{ + Flags: []string{utils.MetaTerminate, "*resources:*release"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testSSv1ItProcessEventTerminateSession", + Event: map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.ToR: utils.VOICE, + utils.OriginID: "testSSv1ItProcessEvent", + utils.RequestType: sSV1RequestType, + utils.Account: "1001", + utils.Subject: "ANY2CNT", + utils.Destination: "1002", + utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), + utils.Usage: 10 * time.Minute, + }, + }, + } + var rply sessions.V1ProcessEventReply + if err := sSv1BiRpc.Call(utils.SessionSv1ProcessEvent, + args, &rply); err != nil { + t.Error(err) + } + aSessions := make([]*sessions.ExternalSession, 0) + if err := sSv1BiRpc.Call(utils.SessionSv1GetActiveSessions, &utils.SessionFilter{}, &aSessions); err == nil || + err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } +} + +func testSSv1ItProcessCDRForSessionFromProcessEvent(t *testing.T) { + args := utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testSSv1ItProcessCDRForSessionFromProcessEvent", + Event: map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.ToR: utils.VOICE, + utils.OriginID: "testSSv1ItProcessEvent", + utils.RequestType: sSV1RequestType, + utils.Account: "1001", + utils.Subject: "ANY2CNT", + utils.Destination: "1002", + utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), + utils.Usage: 10 * time.Minute, + }, + } + var rply string + if err := sSv1BiRpc.Call(utils.SessionSv1ProcessCDR, + args, &rply); err != nil { + t.Error(err) + } + if rply != utils.OK { + t.Errorf("Unexpected reply: %s", rply) + } + time.Sleep(100 * time.Millisecond) +} + +func testSSv1ItGetCDRs(t *testing.T) { + var cdrCnt int64 + req := utils.AttrGetCdrs{} + if err := sSApierRpc.Call(utils.CDRsV1CountCDRs, req, &cdrCnt); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if cdrCnt != 3 { // 3 for each CDR + t.Error("Unexpected number of CDRs returned: ", cdrCnt) + } + + var cdrs []*engine.CDR + args := utils.RPCCDRsFilter{RunIDs: []string{utils.MetaRaw}} + if err := sSApierRpc.Call(utils.CDRsV1GetCDRs, args, &cdrs); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(cdrs) != 1 { + t.Error("Unexpected number of CDRs returned: ", len(cdrs)) + } else { + if cdrs[0].Cost != -1.0 { + t.Errorf("Unexpected cost for CDR: %f", cdrs[0].Cost) + } + } + args = utils.RPCCDRsFilter{RunIDs: []string{"CustomerCharges"}, + OriginIDs: []string{"testSSv1ItProcessEvent"}} + if err := sSApierRpc.Call(utils.CDRsV1GetCDRs, args, &cdrs); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(cdrs) != 1 { + t.Error("Unexpected number of CDRs returned: ", len(cdrs)) + } else { + if cdrs[0].Cost != 0.198 { + t.Errorf("Unexpected cost for CDR: %f", cdrs[0].Cost) + } + } + args = utils.RPCCDRsFilter{RunIDs: []string{"SupplierCharges"}, + OriginIDs: []string{"testSSv1ItProcessEvent"}} + if err := sSApierRpc.Call(utils.CDRsV1GetCDRs, args, &cdrs); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(cdrs) != 1 { + t.Error("Unexpected number of CDRs returned: ", len(cdrs)) + } else { + if cdrs[0].Cost != 0.102 { + t.Errorf("Unexpected cost for CDR: %f", cdrs[0].Cost) + } + } +} diff --git a/apier/v1/sessionsbirpc.go b/apier/v1/sessionsbirpc.go index c298f2c71..9f4f4aa36 100644 --- a/apier/v1/sessionsbirpc.go +++ b/apier/v1/sessionsbirpc.go @@ -41,6 +41,7 @@ func (ssv1 *SessionSv1) Handlers() map[string]interface{} { utils.SessionSv1TerminateSession: ssv1.BiRPCv1TerminateSession, utils.SessionSv1ProcessCDR: ssv1.BiRPCv1ProcessCDR, utils.SessionSv1ProcessMessage: ssv1.BiRPCv1ProcessMessage, + utils.SessionSv1ProcessEvent: ssv1.BiRPCv1ProcessEvent, utils.SessionSv1ForceDisconnect: ssv1.BiRPCv1ForceDisconnect, utils.SessionSv1RegisterInternalBiJSONConn: ssv1.BiRPCv1RegisterInternalBiJSONConn, @@ -95,6 +96,11 @@ func (ssv1 *SessionSv1) BiRPCv1ProcessMessage(clnt *rpc2.Client, args *sessions. return ssv1.Ss.BiRPCv1ProcessMessage(clnt, args, rply) } +func (ssv1 *SessionSv1) BiRPCv1ProcessEvent(clnt *rpc2.Client, args *sessions.V1ProcessEventArgs, + rply *sessions.V1ProcessEventReply) error { + return ssv1.Ss.BiRPCv1ProcessEvent(clnt, args, rply) +} + func (ssv1 *SessionSv1) BiRPCv1GetActiveSessions(clnt *rpc2.Client, args *utils.SessionFilter, rply *[]*sessions.ExternalSession) error { return ssv1.Ss.BiRPCv1GetActiveSessions(clnt, args, rply) diff --git a/dispatchers/sessions.go b/dispatchers/sessions.go index 54b664151..88ddf5a9f 100755 --- a/dispatchers/sessions.go +++ b/dispatchers/sessions.go @@ -225,6 +225,26 @@ func (dS *DispatcherService) SessionSv1ProcessMessage(args *sessions.V1ProcessMe utils.SessionSv1ProcessMessage, args, reply) } +func (dS *DispatcherService) SessionSv1ProcessEvent(args *sessions.V1ProcessEventArgs, + reply *sessions.V1ProcessEventReply) (err error) { + if dS.attrS != nil { + if args.ArgDispatcher == nil { + return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) + } + if err = dS.authorize(utils.SessionSv1ProcessEvent, + args.CGREvent.Tenant, + args.APIKey, args.CGREvent.Time); err != nil { + return + } + } + var routeID *string + if args.ArgDispatcher != nil { + routeID = args.ArgDispatcher.RouteID + } + return dS.Dispatch(args.CGREvent, utils.MetaSessionS, routeID, + utils.SessionSv1ProcessEvent, args, reply) +} + func (dS *DispatcherService) SessionSv1GetActiveSessions(args *utils.SessionFilter, reply *[]*sessions.ExternalSession) (err error) { if dS.attrS != nil { diff --git a/sessions/sessions.go b/sessions/sessions.go index 14a4b5f14..70bcf49dd 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -2866,6 +2866,45 @@ type V1ProcessEventReply struct { StatQueueIDs *[]string } +// AsNavigableMap is part of engine.NavigableMapper interface +func (v1Rply *V1ProcessEventReply) AsNavigableMap( + ignr []*config.FCTemplate) (*config.NavigableMap, error) { + cgrReply := make(map[string]interface{}) + if v1Rply != nil { + if v1Rply.MaxUsage != nil { + cgrReply[utils.CapMaxUsage] = *v1Rply.MaxUsage + } + if v1Rply.ResourceAuthorization != nil { + cgrReply[utils.CapResourceAuthorization] = *v1Rply.ResourceAuthorization + } + if v1Rply.ResourceAllocation != nil { + cgrReply[utils.CapResourceAllocation] = *v1Rply.ResourceAllocation + } + if v1Rply.ResourceRelease != nil { + cgrReply[utils.CapResourceRelease] = *v1Rply.ResourceRelease + } + if v1Rply.Attributes != nil { + attrs := make(map[string]interface{}) + for _, fldName := range v1Rply.Attributes.AlteredFields { + if v1Rply.Attributes.CGREvent.HasField(fldName) { + attrs[fldName] = v1Rply.Attributes.CGREvent.Event[fldName] + } + } + cgrReply[utils.CapAttributes] = attrs + } + if v1Rply.Suppliers != nil { + cgrReply[utils.CapSuppliers] = v1Rply.Suppliers.AsNavigableMap() + } + if v1Rply.ThresholdIDs != nil { + cgrReply[utils.CapThresholds] = *v1Rply.ThresholdIDs + } + if v1Rply.StatQueueIDs != nil { + cgrReply[utils.CapStatQueues] = *v1Rply.StatQueueIDs + } + } + return config.NewNavigableMap(cgrReply), nil +} + // BiRPCv1ProcessEvent processes one event with the right subsystems based on arguments received func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, args *V1ProcessEventArgs, rply *V1ProcessEventReply) (err error) { diff --git a/utils/consts.go b/utils/consts.go index 39b24a261..b38b760ad 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -669,39 +669,41 @@ const ( // Migrator Metas const ( - MetaSetVersions = "*set_versions" - MetaEnsureIndexes = "*ensure_indexes" - MetaTpRatingPlans = "*tp_rating_plans" - MetaTpFilters = "*tp_filters" - MetaTpDestinationRates = "*tp_destination_rates" - MetaTpActionTriggers = "*tp_action_triggers" - MetaTpAccountActions = "*tp_account_actions" - MetaTpActionPlans = "*tp_action_plans" - MetaTpActions = "*tp_actions" - MetaTpThresholds = "*tp_thresholds" - MetaTpSuppliers = "*tp_suppliers" - MetaTpStats = "*tp_stats" - MetaTpSharedGroups = "*tp_shared_groups" - MetaTpRatingProfiles = "*tp_rating_profiles" - MetaTpResources = "*tp_resources" - MetaTpRates = "*tp_rates" - MetaTpTimings = "*tp_timings" - MetaTpResource = "*tp_resources" - MetaTpCdrStats = "*tp_cdrstats" - MetaTpDestinations = "*tp_destinations" - MetaTpRatingPlan = "*tp_rating_plans" - MetaTpRatingProfile = "*tp_rating_profiles" - MetaTpChargers = "*tp_chargers" - MetaTpDispatchers = "*tp_dispatchers" - MetaDurationSeconds = "*duration_seconds" - MetaDurationNanoseconds = "*duration_nanoseconds" - CapAttributes = "Attributes" - CapResourceAllocation = "ResourceAllocation" - CapMaxUsage = "MaxUsage" - CapSuppliers = "Suppliers" - CapThresholdHits = "ThresholdHits" - CapThresholds = "Thresholds" - CapStatQueues = "StatQueues" + MetaSetVersions = "*set_versions" + MetaEnsureIndexes = "*ensure_indexes" + MetaTpRatingPlans = "*tp_rating_plans" + MetaTpFilters = "*tp_filters" + MetaTpDestinationRates = "*tp_destination_rates" + MetaTpActionTriggers = "*tp_action_triggers" + MetaTpAccountActions = "*tp_account_actions" + MetaTpActionPlans = "*tp_action_plans" + MetaTpActions = "*tp_actions" + MetaTpThresholds = "*tp_thresholds" + MetaTpSuppliers = "*tp_suppliers" + MetaTpStats = "*tp_stats" + MetaTpSharedGroups = "*tp_shared_groups" + MetaTpRatingProfiles = "*tp_rating_profiles" + MetaTpResources = "*tp_resources" + MetaTpRates = "*tp_rates" + MetaTpTimings = "*tp_timings" + MetaTpResource = "*tp_resources" + MetaTpCdrStats = "*tp_cdrstats" + MetaTpDestinations = "*tp_destinations" + MetaTpRatingPlan = "*tp_rating_plans" + MetaTpRatingProfile = "*tp_rating_profiles" + MetaTpChargers = "*tp_chargers" + MetaTpDispatchers = "*tp_dispatchers" + MetaDurationSeconds = "*duration_seconds" + MetaDurationNanoseconds = "*duration_nanoseconds" + CapAttributes = "Attributes" + CapResourceAllocation = "ResourceAllocation" + CapResourceAuthorization = "ResourceAuthorization" + CapResourceRelease = "ResourceRelease" + CapMaxUsage = "MaxUsage" + CapSuppliers = "Suppliers" + CapThresholdHits = "ThresholdHits" + CapThresholds = "Thresholds" + CapStatQueues = "StatQueues" ) const (