diff --git a/apier/v1/callsetup.go b/apier/v1/callsetup.go index 0e8f96202..40c88e833 100644 --- a/apier/v1/callsetup.go +++ b/apier/v1/callsetup.go @@ -20,10 +20,11 @@ package v1 import ( "fmt" - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" "strconv" "time" + + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" ) // Returns MaxSessionTime in seconds, -1 for no limit @@ -60,7 +61,7 @@ func (self *ApierV1) GetMaxSessionTime(auth engine.MaxUsageReq, maxSessionTime * return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) } var maxDur float64 - if err := self.Responder.GetDerivedMaxSessionTime(*storedCdr, &maxDur); err != nil { + if err := self.Responder.GetDerivedMaxSessionTime(storedCdr, &maxDur); err != nil { return err } if maxDur == -1.0 { diff --git a/apier/v1/costs.go b/apier/v1/costs.go index 1f61a0282..8cf756d59 100644 --- a/apier/v1/costs.go +++ b/apier/v1/costs.go @@ -36,7 +36,7 @@ type AttrGetDataCost struct { func (apier *ApierV1) GetDataCost(attrs AttrGetDataCost, reply *engine.DataCost) error { usageAsDuration := time.Duration(attrs.Usage) * time.Second // Convert to seconds to match the loaded rates - cd := engine.CallDescriptor{ + cd := &engine.CallDescriptor{ Direction: attrs.Direction, Category: attrs.Category, Tenant: attrs.Tenant, diff --git a/apier/v1/derivedcharging.go b/apier/v1/derivedcharging.go index 7486a2264..438ee4cbe 100644 --- a/apier/v1/derivedcharging.go +++ b/apier/v1/derivedcharging.go @@ -30,7 +30,7 @@ func (self *ApierV1) GetDerivedChargers(attrs utils.AttrDerivedChargers, reply * if missing := utils.MissingStructFields(&attrs, []string{"Tenant", "Direction", "Account", "Subject"}); len(missing) != 0 { return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing) } - if hDc, err := engine.HandleGetDerivedChargers(self.AccountDb, attrs); err != nil { + if hDc, err := engine.HandleGetDerivedChargers(self.AccountDb, &attrs); err != nil { return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) } else if hDc != nil { *reply = hDc diff --git a/engine/cdrs.go b/engine/cdrs.go index 51dfe5573..e8c3018f6 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -19,6 +19,7 @@ along with this program. If not, see package engine import ( + "errors" "fmt" "io/ioutil" "net/http" @@ -99,14 +100,24 @@ func (self *CdrServer) ProcessExternalCdr(cdr *ExternalCdr) error { } type CallCostLog struct { - CgrId string - Source string - RunId string - CallCost *CallCost + CgrId string + Source string + RunId string + CallCost *CallCost + CheckDuplicate bool } // RPC method, used to log callcosts to db func (self *CdrServer) LogCallCost(ccl *CallCostLog) error { + if ccl.CheckDuplicate { + cc, err := self.cdrDb.GetCallCostLog(ccl.CgrId, ccl.Source, ccl.RunId) + if err != nil { + return err + } + if cc != nil { + return errors.New(utils.ERR_EXISTS) + } + } return self.cdrDb.LogCallCost(ccl.CgrId, ccl.Source, ccl.RunId, ccl.CallCost) } @@ -221,7 +232,7 @@ func (self *CdrServer) getCostFromRater(storedCdr *StoredCdr) (*CallCost, error) //} cc := new(CallCost) var err error - cd := CallDescriptor{ + cd := &CallDescriptor{ TOR: storedCdr.TOR, Direction: storedCdr.Direction, Tenant: storedCdr.Tenant, @@ -254,7 +265,7 @@ func (self *CdrServer) deriveCdrs(storedCdr *StoredCdr) ([]*StoredCdr, error) { if storedCdr.Rated { // Do not derive already rated CDRs since they should be already derived return cdrRuns, nil } - attrsDC := utils.AttrDerivedChargers{Tenant: storedCdr.Tenant, Category: storedCdr.Category, Direction: storedCdr.Direction, + attrsDC := &utils.AttrDerivedChargers{Tenant: storedCdr.Tenant, Category: storedCdr.Category, Direction: storedCdr.Direction, Account: storedCdr.Account, Subject: storedCdr.Subject} var dcs utils.DerivedChargers if err := self.rater.GetDerivedChargers(attrsDC, &dcs); err != nil { diff --git a/engine/handler_derivedcharging.go b/engine/handler_derivedcharging.go index 865b3ae81..a0fc1bcaa 100644 --- a/engine/handler_derivedcharging.go +++ b/engine/handler_derivedcharging.go @@ -23,7 +23,7 @@ import ( ) // Handles retrieving of DerivedChargers profile based on longest match from AccountingDb -func HandleGetDerivedChargers(acntStorage AccountingStorage, attrs utils.AttrDerivedChargers) (utils.DerivedChargers, error) { +func HandleGetDerivedChargers(acntStorage AccountingStorage, attrs *utils.AttrDerivedChargers) (utils.DerivedChargers, error) { var dcs utils.DerivedChargers strictKey := utils.DerivedChargersKey(attrs.Direction, attrs.Tenant, attrs.Category, attrs.Account, attrs.Subject) anySubjKey := utils.DerivedChargersKey(attrs.Direction, attrs.Tenant, attrs.Category, attrs.Account, utils.ANY) diff --git a/engine/responder.go b/engine/responder.go index 603f23134..ef3cace25 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -50,9 +50,9 @@ type Responder struct { /* RPC method thet provides the external RPC interface for getting the rating information. */ -func (rs *Responder) GetCost(arg CallDescriptor, reply *CallCost) (err error) { +func (rs *Responder) GetCost(arg *CallDescriptor, reply *CallCost) (err error) { if rs.Bal != nil { - r, e := rs.getCallCost(&arg, "Responder.GetCost") + r, e := rs.getCallCost(arg, "Responder.GetCost") *reply, err = *r, e } else { r, e := AccLock.Guard(func() (interface{}, error) { @@ -67,9 +67,9 @@ func (rs *Responder) GetCost(arg CallDescriptor, reply *CallCost) (err error) { return } -func (rs *Responder) Debit(arg CallDescriptor, reply *CallCost) (err error) { +func (rs *Responder) Debit(arg *CallDescriptor, reply *CallCost) (err error) { if rs.Bal != nil { - r, e := rs.getCallCost(&arg, "Responder.Debit") + r, e := rs.getCallCost(arg, "Responder.Debit") *reply, err = *r, e } else { r, e := arg.Debit() @@ -82,9 +82,9 @@ func (rs *Responder) Debit(arg CallDescriptor, reply *CallCost) (err error) { return } -func (rs *Responder) MaxDebit(arg CallDescriptor, reply *CallCost) (err error) { +func (rs *Responder) MaxDebit(arg *CallDescriptor, reply *CallCost) (err error) { if rs.Bal != nil { - r, e := rs.getCallCost(&arg, "Responder.MaxDebit") + r, e := rs.getCallCost(arg, "Responder.MaxDebit") *reply, err = *r, e } else { r, e := arg.MaxDebit() @@ -97,9 +97,9 @@ func (rs *Responder) MaxDebit(arg CallDescriptor, reply *CallCost) (err error) { return } -func (rs *Responder) RefundIncrements(arg CallDescriptor, reply *float64) (err error) { +func (rs *Responder) RefundIncrements(arg *CallDescriptor, reply *float64) (err error) { if rs.Bal != nil { - *reply, err = rs.callMethod(&arg, "Responder.RefundIncrements") + *reply, err = rs.callMethod(arg, "Responder.RefundIncrements") } else { r, e := AccLock.Guard(func() (interface{}, error) { return arg.RefundIncrements() @@ -109,9 +109,9 @@ func (rs *Responder) RefundIncrements(arg CallDescriptor, reply *float64) (err e return } -func (rs *Responder) GetMaxSessionTime(arg CallDescriptor, reply *float64) (err error) { +func (rs *Responder) GetMaxSessionTime(arg *CallDescriptor, reply *float64) (err error) { if rs.Bal != nil { - *reply, err = rs.callMethod(&arg, "Responder.GetMaxSessionTime") + *reply, err = rs.callMethod(arg, "Responder.GetMaxSessionTime") } else { r, e := arg.GetMaxSessionDuration() *reply, err = float64(r), e @@ -120,12 +120,12 @@ func (rs *Responder) GetMaxSessionTime(arg CallDescriptor, reply *float64) (err } // Returns MaxSessionTime for an event received in SessionManager, considering DerivedCharging for it -func (rs *Responder) GetDerivedMaxSessionTime(ev StoredCdr, reply *float64) error { +func (rs *Responder) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) error { if rs.Bal != nil { return errors.New("unsupported method on the balancer") } maxCallDuration := -1.0 - attrsDC := utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT), + attrsDC := &utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT), Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)} var dcs utils.DerivedChargers if err := rs.GetDerivedChargers(attrsDC, &dcs); err != nil { @@ -151,7 +151,7 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev StoredCdr, reply *float64) erro if err != nil { return err } - cd := CallDescriptor{ + cd := &CallDescriptor{ Direction: ev.GetDirection(dc.DirectionField), Tenant: ev.GetTenant(dc.TenantField), Category: ev.GetCategory(dc.CategoryField), @@ -179,11 +179,11 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev StoredCdr, reply *float64) erro } // Used by SM to get all the prepaid CallDescriptors attached to a session -func (rs *Responder) GetSessionRuns(ev StoredCdr, sRuns *[]*SessionRun) error { +func (rs *Responder) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error { if rs.Bal != nil { return errors.New("Unsupported method on the balancer") } - attrsDC := utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT), + attrsDC := &utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT), Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)} var dcs utils.DerivedChargers if err := rs.GetDerivedChargers(attrsDC, &dcs); err != nil { @@ -213,7 +213,7 @@ func (rs *Responder) GetSessionRuns(ev StoredCdr, sRuns *[]*SessionRun) error { return nil } -func (rs *Responder) GetDerivedChargers(attrs utils.AttrDerivedChargers, dcs *utils.DerivedChargers) error { +func (rs *Responder) GetDerivedChargers(attrs *utils.AttrDerivedChargers, dcs *utils.DerivedChargers) error { if rs.Bal != nil { return errors.New("BALANCER_UNSUPPORTED_METHOD") } @@ -236,14 +236,14 @@ func (rs *Responder) ProcessCdr(cdr *StoredCdr, reply *string) error { return nil } -func (rs *Responder) LogCallCost(ccl *CallCostLog, reply *int) error { +func (rs *Responder) LogCallCost(ccl *CallCostLog, reply *string) error { if rs.CdrSrv == nil { return errors.New("CDR_SERVER_NOT_RUNNING") } if err := rs.CdrSrv.LogCallCost(ccl); err != nil { return err } - *reply = 0 + *reply = utils.OK return nil } @@ -256,9 +256,9 @@ func (rs *Responder) GetLCR(cd *CallDescriptor, reply *LCRCost) error { return nil } -func (rs *Responder) FlushCache(arg CallDescriptor, reply *float64) (err error) { +func (rs *Responder) FlushCache(arg *CallDescriptor, reply *float64) (err error) { if rs.Bal != nil { - *reply, err = rs.callMethod(&arg, "Responder.FlushCache") + *reply, err = rs.callMethod(arg, "Responder.FlushCache") } else { r, e := AccLock.Guard(func() (interface{}, error) { return 0, arg.FlushCache() @@ -405,16 +405,16 @@ func (rw *ResponderWorker) Close() error { } type Connector interface { - GetCost(CallDescriptor, *CallCost) error - Debit(CallDescriptor, *CallCost) error - MaxDebit(CallDescriptor, *CallCost) error - RefundIncrements(CallDescriptor, *float64) error - GetMaxSessionTime(CallDescriptor, *float64) error - GetDerivedChargers(utils.AttrDerivedChargers, *utils.DerivedChargers) error - GetDerivedMaxSessionTime(StoredCdr, *float64) error - GetSessionRuns(StoredCdr, *[]*SessionRun) error + GetCost(*CallDescriptor, *CallCost) error + Debit(*CallDescriptor, *CallCost) error + MaxDebit(*CallDescriptor, *CallCost) error + RefundIncrements(*CallDescriptor, *float64) error + GetMaxSessionTime(*CallDescriptor, *float64) error + GetDerivedChargers(*utils.AttrDerivedChargers, *utils.DerivedChargers) error + GetDerivedMaxSessionTime(*StoredCdr, *float64) error + GetSessionRuns(*StoredCdr, *[]*SessionRun) error ProcessCdr(*StoredCdr, *string) error - LogCallCost(*CallCostLog, *int) error + LogCallCost(*CallCostLog, *string) error GetLCR(*CallDescriptor, *LCRCost) error } @@ -422,35 +422,35 @@ type RPCClientConnector struct { Client *rpcclient.RpcClient } -func (rcc *RPCClientConnector) GetCost(cd CallDescriptor, cc *CallCost) error { +func (rcc *RPCClientConnector) GetCost(cd *CallDescriptor, cc *CallCost) error { return rcc.Client.Call("Responder.GetCost", cd, cc) } -func (rcc *RPCClientConnector) Debit(cd CallDescriptor, cc *CallCost) error { +func (rcc *RPCClientConnector) Debit(cd *CallDescriptor, cc *CallCost) error { return rcc.Client.Call("Responder.Debit", cd, cc) } -func (rcc *RPCClientConnector) MaxDebit(cd CallDescriptor, cc *CallCost) error { +func (rcc *RPCClientConnector) MaxDebit(cd *CallDescriptor, cc *CallCost) error { return rcc.Client.Call("Responder.MaxDebit", cd, cc) } -func (rcc *RPCClientConnector) RefundIncrements(cd CallDescriptor, resp *float64) error { +func (rcc *RPCClientConnector) RefundIncrements(cd *CallDescriptor, resp *float64) error { return rcc.Client.Call("Responder.RefundIncrements", cd, resp) } -func (rcc *RPCClientConnector) GetMaxSessionTime(cd CallDescriptor, resp *float64) error { +func (rcc *RPCClientConnector) GetMaxSessionTime(cd *CallDescriptor, resp *float64) error { return rcc.Client.Call("Responder.GetMaxSessionTime", cd, resp) } -func (rcc *RPCClientConnector) GetDerivedMaxSessionTime(ev StoredCdr, reply *float64) error { +func (rcc *RPCClientConnector) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) error { return rcc.Client.Call("Responder.GetDerivedMaxSessionTime", ev, reply) } -func (rcc *RPCClientConnector) GetSessionRuns(ev StoredCdr, sRuns *[]*SessionRun) error { +func (rcc *RPCClientConnector) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error { return rcc.Client.Call("Responder.GetSessionRuns", ev, sRuns) } -func (rcc *RPCClientConnector) GetDerivedChargers(attrs utils.AttrDerivedChargers, dcs *utils.DerivedChargers) error { +func (rcc *RPCClientConnector) GetDerivedChargers(attrs *utils.AttrDerivedChargers, dcs *utils.DerivedChargers) error { return rcc.Client.Call("ApierV1.GetDerivedChargers", attrs, dcs) } @@ -458,7 +458,7 @@ func (rcc *RPCClientConnector) ProcessCdr(cdr *StoredCdr, reply *string) error { return rcc.Client.Call("CDRSV1.ProcessCdr", cdr, reply) } -func (rcc *RPCClientConnector) LogCallCost(ccl *CallCostLog, reply *int) error { +func (rcc *RPCClientConnector) LogCallCost(ccl *CallCostLog, reply *string) error { return rcc.Client.Call("CDRSV1.LogCallCost", ccl, reply) } diff --git a/engine/responder_test.go b/engine/responder_test.go index 7cd82ca55..eed6f8111 100644 --- a/engine/responder_test.go +++ b/engine/responder_test.go @@ -40,7 +40,7 @@ func TestResponderGetDerivedChargers(t *testing.T) { cfgedDC := utils.DerivedChargers{&utils.DerivedCharger{RunId: "responder1", ReqTypeField: "test", DirectionField: "test", TenantField: "test", CategoryField: "test", AccountField: "test", SubjectField: "test", DestinationField: "test", SetupTimeField: "test", AnswerTimeField: "test", UsageField: "test"}} rsponder = &Responder{} - attrs := utils.AttrDerivedChargers{Tenant: "cgrates.org", Category: "call", Direction: "*out", Account: "responder_test", Subject: "responder_test"} + attrs := &utils.AttrDerivedChargers{Tenant: "cgrates.org", Category: "call", Direction: "*out", Account: "responder_test", Subject: "responder_test"} if err := accountingStorage.SetDerivedChargers(utils.DerivedChargersKey(utils.OUT, utils.ANY, utils.ANY, utils.ANY, utils.ANY), cfgedDC); err != nil { t.Error(err) } @@ -57,7 +57,7 @@ func TestResponderGetDerivedChargers(t *testing.T) { func TestGetDerivedMaxSessionTime(t *testing.T) { testTenant := "vdf" - cdr := StoredCdr{CgrId: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf", + cdr := &StoredCdr{CgrId: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf", CdrHost: "192.168.1.1", CdrSource: "test", ReqType: utils.META_RATED, Direction: "*out", Tenant: testTenant, Category: "call", Account: "dan", Subject: "dan", Destination: "1002", SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), MediationRunId: utils.DEFAULT_RUNID, Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, @@ -109,7 +109,7 @@ func TestGetDerivedMaxSessionTime(t *testing.T) { t.Error("BalanceValue: ", danStoredAcnt.BalanceMap[utils.VOICE+OUTBOUND][0].Value) } var dcs utils.DerivedChargers - attrs := utils.AttrDerivedChargers{Tenant: testTenant, Category: "call", Direction: "*out", Account: "dan", Subject: "dan"} + attrs := &utils.AttrDerivedChargers{Tenant: testTenant, Category: "call", Direction: "*out", Account: "dan", Subject: "dan"} if err := rsponder.GetDerivedChargers(attrs, &dcs); err != nil { t.Error("Unexpected error", err.Error()) } else if !reflect.DeepEqual(dcs, charger1) { @@ -125,7 +125,7 @@ func TestGetDerivedMaxSessionTime(t *testing.T) { func TestGetSessionRuns(t *testing.T) { testTenant := "vdf" - cdr := StoredCdr{CgrId: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf", + cdr := &StoredCdr{CgrId: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf", CdrHost: "192.168.1.1", CdrSource: "test", ReqType: utils.META_PREPAID, Direction: "*out", Tenant: testTenant, Category: "call", Account: "dan2", Subject: "dan2", Destination: "1002", SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), Pdd: 3 * time.Second, AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), Supplier: "suppl1", MediationRunId: utils.DEFAULT_RUNID, Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 5dcf21d68..c2b65a5f4 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -37,15 +37,15 @@ type FSSessionManager struct { conns map[string]*fsock.FSock // Keep the list here for connection management purposes sessions []*Session rater engine.Connector - cdrs engine.Connector + cdrsrv engine.Connector } func NewFSSessionManager(smFsConfig *config.SmFsConfig, rater, cdrs engine.Connector) *FSSessionManager { return &FSSessionManager{ - cfg: smFsConfig, - conns: make(map[string]*fsock.FSock), - rater: rater, - cdrs: cdrs, + cfg: smFsConfig, + conns: make(map[string]*fsock.FSock), + rater: rater, + cdrsrv: cdrs, } } @@ -204,7 +204,7 @@ func (sm *FSSessionManager) onChannelPark(ev engine.Event, connId string) { return } var maxCallDuration float64 // This will be the maximum duration this channel will be allowed to last - if err := sm.rater.GetDerivedMaxSessionTime(*ev.AsStoredCdr(), &maxCallDuration); err != nil { + if err := sm.rater.GetDerivedMaxSessionTime(ev.AsStoredCdr(), &maxCallDuration); err != nil { engine.Logger.Err(fmt.Sprintf(" Could not get max session time for %s, error: %s", ev.GetUUID(), err.Error())) } maxCallDur := time.Duration(maxCallDuration) @@ -265,7 +265,7 @@ func (sm *FSSessionManager) onChannelHangupComplete(ev engine.Event) { func (sm *FSSessionManager) ProcessCdr(storedCdr *engine.StoredCdr) error { var reply string - if err := sm.cdrs.ProcessCdr(storedCdr, &reply); err != nil { + if err := sm.cdrsrv.ProcessCdr(storedCdr, &reply); err != nil { engine.Logger.Err(fmt.Sprintf(" Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", storedCdr.CgrId, storedCdr.AccId, err.Error())) } return nil @@ -275,7 +275,7 @@ func (sm *FSSessionManager) DebitInterval() time.Duration { return sm.cfg.DebitInterval } func (sm *FSSessionManager) CdrSrv() engine.Connector { - return sm.cdrs + return sm.cdrsrv } func (sm *FSSessionManager) Rater() engine.Connector { diff --git a/sessionmanager/kamailiosm.go b/sessionmanager/kamailiosm.go index 760017ed8..920eb6835 100644 --- a/sessionmanager/kamailiosm.go +++ b/sessionmanager/kamailiosm.go @@ -63,7 +63,7 @@ func (self *KamailioSessionManager) onCgrAuth(evData []byte, connId string) { } var remainingDuration float64 var errMaxSession error - if errMaxSession = self.rater.GetDerivedMaxSessionTime(*kev.AsStoredCdr(), &remainingDuration); errMaxSession != nil { + if errMaxSession = self.rater.GetDerivedMaxSessionTime(kev.AsStoredCdr(), &remainingDuration); errMaxSession != nil { engine.Logger.Err(fmt.Sprintf(" Could not get max session time, error: %s", errMaxSession.Error())) } var supplStr string diff --git a/sessionmanager/session.go b/sessionmanager/session.go index 90d65b6f4..e2454c445 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -20,10 +20,12 @@ package sessionmanager import ( "encoding/json" + "errors" "fmt" "time" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" ) // Session type holding the call information fields, a session delegate for specific @@ -57,7 +59,7 @@ func NewSession(ev engine.Event, connId string, sm SessionManager) *Session { sessionManager: sm, connId: connId, } - if err := sm.Rater().GetSessionRuns(*ev.AsStoredCdr(), &s.sessionRuns); err != nil || len(s.sessionRuns) == 0 { + if err := sm.Rater().GetSessionRuns(ev.AsStoredCdr(), &s.sessionRuns); err != nil || len(s.sessionRuns) == 0 { return nil } for runIdx := range s.sessionRuns { @@ -68,7 +70,7 @@ func NewSession(ev engine.Event, connId string, sm SessionManager) *Session { // the debit loop method (to be stoped by sending somenthing on stopDebit channel) func (s *Session) debitLoop(runIdx int) { - nextCd := *s.sessionRuns[runIdx].CallDescriptor + nextCd := s.sessionRuns[runIdx].CallDescriptor index := 0.0 debitPeriod := s.sessionManager.DebitInterval() for { @@ -135,59 +137,73 @@ func (s *Session) Close(ev engine.Event) error { return err } hangupTime := startTime.Add(duration) - end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd - refundDuration := end.Sub(hangupTime) - var refundIncrements engine.Increments - for i := len(lastCC.Timespans) - 1; i >= 0; i-- { - ts := lastCC.Timespans[i] - tsDuration := ts.GetDuration() - if refundDuration <= tsDuration { - lastRefundedIncrementIndex := 0 - for j := len(ts.Increments) - 1; j >= 0; j-- { - increment := ts.Increments[j] - if increment.Duration <= refundDuration { - refundIncrements = append(refundIncrements, increment) - refundDuration -= increment.Duration - lastRefundedIncrementIndex = j - } - } - ts.SplitByIncrement(lastRefundedIncrementIndex) - break // do not go to other timespans - } else { - refundIncrements = append(refundIncrements, ts.Increments...) - // remove the timespan entirely - lastCC.Timespans[i] = nil - lastCC.Timespans = lastCC.Timespans[:i] - // continue to the next timespan with what is left to refund - refundDuration -= tsDuration - } + err = s.Refund(lastCC, hangupTime) + if err != nil { + return err } - // show only what was actualy refunded (stopped in timespan) - // engine.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration)) - if len(refundIncrements) > 0 { - cd := &engine.CallDescriptor{ - Direction: lastCC.Direction, - Tenant: lastCC.Tenant, - Category: lastCC.Category, - Subject: lastCC.Subject, - Account: lastCC.Account, - Destination: lastCC.Destination, - Increments: refundIncrements, - } - var response float64 - err := s.sessionManager.Rater().RefundIncrements(*cd, &response) - if err != nil { - return err - } - } - cost := refundIncrements.GetTotalCost() - lastCC.Cost -= cost - lastCC.Timespans.Compress() } go s.SaveOperations() return nil } +func (s *Session) Refund(lastCC *engine.CallCost, hangupTime time.Time) error { + end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd + refundDuration := end.Sub(hangupTime) + var refundIncrements engine.Increments + for i := len(lastCC.Timespans) - 1; i >= 0; i-- { + ts := lastCC.Timespans[i] + tsDuration := ts.GetDuration() + if refundDuration <= tsDuration { + + lastRefundedIncrementIndex := 0 + for j := len(ts.Increments) - 1; j >= 0; j-- { + increment := ts.Increments[j] + if increment.Duration <= refundDuration { + refundIncrements = append(refundIncrements, increment) + refundDuration -= increment.Duration + lastRefundedIncrementIndex = j + } + } + if lastRefundedIncrementIndex == 0 { + lastCC.Timespans[i] = nil + lastCC.Timespans = lastCC.Timespans[:i] + } else { + ts.SplitByIncrement(lastRefundedIncrementIndex) + } + break // do not go to other timespans + } else { + refundIncrements = append(refundIncrements, ts.Increments...) + // remove the timespan entirely + lastCC.Timespans[i] = nil + lastCC.Timespans = lastCC.Timespans[:i] + // continue to the next timespan with what is left to refund + refundDuration -= tsDuration + } + } + // show only what was actualy refunded (stopped in timespan) + // engine.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration)) + if len(refundIncrements) > 0 { + cd := &engine.CallDescriptor{ + Direction: lastCC.Direction, + Tenant: lastCC.Tenant, + Category: lastCC.Category, + Subject: lastCC.Subject, + Account: lastCC.Account, + Destination: lastCC.Destination, + Increments: refundIncrements, + } + var response float64 + err := s.sessionManager.Rater().RefundIncrements(cd, &response) + if err != nil { + return err + } + } + cost := refundIncrements.GetTotalCost() + lastCC.Cost -= cost + lastCC.Timespans.Compress() + return nil +} + // Nice print for session func (s *Session) String() string { sDump, _ := json.Marshal(s) @@ -204,14 +220,23 @@ func (s *Session) SaveOperations() { for _, cc := range sr.CallCosts[1:] { firstCC.Merge(cc) } - var existingDuration int - s.sessionManager.CdrSrv().LogCallCost(&engine.CallCostLog{ - CgrId: s.eventStart.GetCgrId(), - Source: engine.SESSION_MANAGER_SOURCE, - RunId: sr.DerivedCharger.RunId, - CallCost: firstCC, - }, &existingDuration) - // on duplicate error refound extra from existing database - + var reply string + err := s.sessionManager.CdrSrv().LogCallCost(&engine.CallCostLog{ + CgrId: s.eventStart.GetCgrId(), + Source: engine.SESSION_MANAGER_SOURCE, + RunId: sr.DerivedCharger.RunId, + CallCost: firstCC, + CheckDuplicate: true, + }, &reply) + // this is a protection against the case when the close event is missed for some reason + // when the cdr arrives to cdrserver because our callcost is not there it will be rated + // as postpaid. When the close event finally arives we have to refund everything + if err != nil { + if err == errors.New(utils.ERR_EXISTS) { + s.Refund(firstCC, firstCC.Timespans[0].TimeStart) + } else { + engine.Logger.Err(fmt.Sprintf("failed to log call cost: %v", err)) + } + } } } diff --git a/sessionmanager/session_test.go b/sessionmanager/session_test.go index b88e65bc0..7af0fa453 100644 --- a/sessionmanager/session_test.go +++ b/sessionmanager/session_test.go @@ -18,6 +18,14 @@ along with this program. If not, see package sessionmanager +import ( + "testing" + "time" + + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + //"github.com/cgrates/cgrates/config" //"testing" @@ -71,3 +79,64 @@ func TestSessionNilSession(t *testing.T) { } } */ + +type MockConnector struct { + refundCd *engine.CallDescriptor +} + +func (mc *MockConnector) GetCost(*engine.CallDescriptor, *engine.CallCost) error { return nil } +func (mc *MockConnector) Debit(*engine.CallDescriptor, *engine.CallCost) error { return nil } +func (mc *MockConnector) MaxDebit(*engine.CallDescriptor, *engine.CallCost) error { return nil } +func (mc *MockConnector) RefundIncrements(cd *engine.CallDescriptor, reply *float64) error { + mc.refundCd = cd + return nil +} +func (mc *MockConnector) GetMaxSessionTime(*engine.CallDescriptor, *float64) error { return nil } +func (mc *MockConnector) GetDerivedChargers(*utils.AttrDerivedChargers, *utils.DerivedChargers) error { + return nil +} +func (mc *MockConnector) GetDerivedMaxSessionTime(*engine.StoredCdr, *float64) error { return nil } +func (mc *MockConnector) GetSessionRuns(*engine.StoredCdr, *[]*engine.SessionRun) error { return nil } +func (mc *MockConnector) ProcessCdr(*engine.StoredCdr, *string) error { return nil } +func (mc *MockConnector) LogCallCost(*engine.CallCostLog, *string) error { return nil } +func (mc *MockConnector) GetLCR(*engine.CallDescriptor, *engine.LCRCost) error { return nil } + +func TestSessionRefund(t *testing.T) { + mc := &MockConnector{} + s := &Session{sessionManager: &FSSessionManager{rater: mc}} + ts := &engine.TimeSpan{ + TimeStart: time.Date(2015, 6, 10, 14, 07, 0, 0, time.UTC), + TimeEnd: time.Date(2015, 6, 10, 14, 07, 30, 0, time.UTC), + } + // add increments + for i := 0; i < 30; i++ { + ts.AddIncrement(&engine.Increment{Duration: time.Second, Cost: 1.0}) + } + + cc := &engine.CallCost{Timespans: engine.TimeSpans{ts}} + hangupTime := time.Date(2015, 6, 10, 14, 07, 20, 0, time.UTC) + s.Refund(cc, hangupTime) + if len(mc.refundCd.Increments) != 10 || len(cc.Timespans) != 1 || cc.Timespans[0].TimeEnd != hangupTime { + t.Errorf("Error refunding: %+v, %+v", mc.refundCd.Increments, cc.Timespans[0]) + } +} + +func TestSessionRefundAll(t *testing.T) { + mc := &MockConnector{} + s := &Session{sessionManager: &FSSessionManager{rater: mc}} + ts := &engine.TimeSpan{ + TimeStart: time.Date(2015, 6, 10, 14, 07, 0, 0, time.UTC), + TimeEnd: time.Date(2015, 6, 10, 14, 07, 30, 0, time.UTC), + } + // add increments + for i := 0; i < 30; i++ { + ts.AddIncrement(&engine.Increment{Duration: time.Second, Cost: 1.0}) + } + + cc := &engine.CallCost{Timespans: engine.TimeSpans{ts}} + hangupTime := time.Date(2015, 6, 10, 14, 07, 0, 0, time.UTC) + s.Refund(cc, hangupTime) + if len(mc.refundCd.Increments) != 30 || len(cc.Timespans) != 0 { + t.Errorf("Error refunding: %+v, %+v", len(mc.refundCd.Increments), cc.Timespans[0]) + } +}