diff --git a/cache2go/response_cache.go b/cache2go/response_cache.go index 4ac57e3f7..c27fdecbf 100644 --- a/cache2go/response_cache.go +++ b/cache2go/response_cache.go @@ -18,7 +18,7 @@ type CacheItem struct { type ResponseCache struct { ttl time.Duration cache map[string]*CacheItem - semaphore map[string]chan bool + semaphore map[string]chan bool // used for waiting till the first goroutine processes the response mu sync.RWMutex } diff --git a/engine/responder.go b/engine/responder.go index 76b9a9b9d..e769a0058 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -24,6 +24,7 @@ import ( "net/rpc" "reflect" "runtime" + "strconv" "strings" "time" @@ -149,7 +150,8 @@ func (rs *Responder) Debit(arg *CallDescriptor, reply *CallCost) (err error) { } func (rs *Responder) MaxDebit(arg *CallDescriptor, reply *CallCost) (err error) { - if item, err := rs.getCache().Get(utils.MAX_DEBIT_CACHE_PREFIX + arg.CgrId); err == nil && item != nil { + cacheKey := "MaxDebit" + arg.CgrId + strconv.FormatFloat(arg.LoopIndex, 'f', -1, 64) + if item, err := rs.getCache().Get(cacheKey); err == nil && item != nil { *reply = *(item.Value.(*CallCost)) return item.Err } @@ -167,10 +169,12 @@ func (rs *Responder) MaxDebit(arg *CallDescriptor, reply *CallCost) (err error) Subject: arg.Subject, Context: utils.ALIAS_CONTEXT_RATING, }, arg, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound { + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) return err } // replace user profile fields if err := LoadUserProfile(arg, utils.EXTRA_FIELDS); err != nil { + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) return err } if rs.Bal != nil { @@ -179,23 +183,19 @@ func (rs *Responder) MaxDebit(arg *CallDescriptor, reply *CallCost) (err error) } else { r, e := arg.MaxDebit() if e != nil { - rs.getCache().Cache(utils.MAX_DEBIT_CACHE_PREFIX+arg.CgrId, &cache2go.CacheItem{ - Err: e, - }) + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: e}) return e } else if r != nil { *reply = *r } } - rs.getCache().Cache(utils.MAX_DEBIT_CACHE_PREFIX+arg.CgrId, &cache2go.CacheItem{ - Value: reply, - Err: err, - }) + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Value: reply, Err: err}) return } func (rs *Responder) RefundIncrements(arg *CallDescriptor, reply *float64) (err error) { - if item, err := rs.getCache().Get(utils.REFUND_INCR_CACHE_PREFIX + arg.CgrId); err == nil && item != nil { + cacheKey := "RefundIncrements" + arg.CgrId + if item, err := rs.getCache().Get(cacheKey); err == nil && item != nil { *reply = *(item.Value.(*float64)) return item.Err } @@ -213,10 +213,12 @@ func (rs *Responder) RefundIncrements(arg *CallDescriptor, reply *float64) (err Subject: arg.Subject, Context: utils.ALIAS_CONTEXT_RATING, }, arg, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound { + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) return err } // replace user profile fields if err := LoadUserProfile(arg, utils.EXTRA_FIELDS); err != nil { + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) return err } if rs.Bal != nil { @@ -227,10 +229,7 @@ func (rs *Responder) RefundIncrements(arg *CallDescriptor, reply *float64) (err }, 0, arg.GetAccountKey()) *reply, err = r.(float64), e } - rs.getCache().Cache(utils.REFUND_INCR_CACHE_PREFIX+arg.CgrId, &cache2go.CacheItem{ - Value: reply, - Err: err, - }) + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Value: reply, Err: err}) return } @@ -266,8 +265,15 @@ func (rs *Responder) GetMaxSessionTime(arg *CallDescriptor, reply *float64) (err // Returns MaxSessionTime for an event received in SessionManager, considering DerivedCharging for it func (rs *Responder) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) error { + cacheKey := "GetDerivedMaxSessionTime" + ev.CgrId + if item, err := rs.getCache().Get(cacheKey); err == nil && item != nil { + *reply = *(item.Value.(*float64)) + return item.Err + } if rs.Bal != nil { - return errors.New("unsupported method on the balancer") + err := errors.New("unsupported method on the balancer") + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) + return err } if ev.Subject == "" { ev.Subject = ev.Account @@ -283,10 +289,12 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) err Subject: ev.Subject, Context: utils.ALIAS_CONTEXT_RATING, }, ev, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound { + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) return err } // replace user profile fields if err := LoadUserProfile(ev, utils.EXTRA_FIELDS); err != nil { + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) return err } maxCallDuration := -1.0 @@ -294,6 +302,7 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) err Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)} dcs := &utils.DerivedChargers{} if err := rs.GetDerivedChargers(attrsDC, dcs); err != nil { + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) return err } dcs, _ = dcs.AppendDefaultRun() @@ -314,10 +323,12 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) err } startTime, err := ev.GetSetupTime(utils.META_DEFAULT, rs.Timezone) if err != nil { + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) return err } usage, err := ev.GetDuration(utils.META_DEFAULT) if err != nil { + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) return err } if usage == 0 { @@ -337,6 +348,7 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) err err = rs.GetMaxSessionTime(cd, &remainingDuration) if err != nil { *reply = 0 + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) return err } if utils.IsSliceMember([]string{utils.META_POSTPAID, utils.POSTPAID}, ev.GetReqType(dc.ReqTypeField)) { // Only consider prepaid and pseudoprepaid for MaxSessionTime @@ -349,14 +361,22 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) err maxCallDuration = remainingDuration } } + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Value: maxCallDuration}) *reply = maxCallDuration return nil } // Used by SM to get all the prepaid CallDescriptors attached to a session func (rs *Responder) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error { + cacheKey := "GetSessionRuns" + ev.CgrId + if item, err := rs.getCache().Get(cacheKey); err == nil && item != nil { + *sRuns = *(item.Value.(*[]*SessionRun)) + return item.Err + } if rs.Bal != nil { - return errors.New("Unsupported method on the balancer") + err := errors.New("Unsupported method on the balancer") + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) + return err } if ev.Subject == "" { ev.Subject = ev.Account @@ -372,19 +392,19 @@ func (rs *Responder) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error { Subject: ev.Subject, Context: utils.ALIAS_CONTEXT_RATING, }, ev, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound { + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) return err } // replace user profile fields if err := LoadUserProfile(ev, utils.EXTRA_FIELDS); err != nil { + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) return err } attrsDC := &utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT), Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)} dcs := &utils.DerivedChargers{} if err := rs.GetDerivedChargers(attrsDC, dcs); err != nil { - rs.getCache().Cache(utils.GET_SESS_RUNS_CACHE_PREFIX+ev.CgrId, &cache2go.CacheItem{ - Err: err, - }) + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) return err } dcs, _ = dcs.AppendDefaultRun() @@ -395,10 +415,9 @@ func (rs *Responder) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error { } startTime, err := ev.GetAnswerTime(dc.AnswerTimeField, rs.Timezone) if err != nil { - rs.getCache().Cache(utils.GET_SESS_RUNS_CACHE_PREFIX+ev.CgrId, &cache2go.CacheItem{ - Err: err, - }) - return errors.New("Error parsing answer event start time") + err := errors.New("Error parsing answer event start time") + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) + return err } cd := &CallDescriptor{ Direction: ev.GetDirection(dc.DirectionField), @@ -411,9 +430,7 @@ func (rs *Responder) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error { sesRuns = append(sesRuns, &SessionRun{DerivedCharger: dc, CallDescriptor: cd}) } *sRuns = sesRuns - rs.getCache().Cache(utils.GET_SESS_RUNS_CACHE_PREFIX+ev.CgrId, &cache2go.CacheItem{ - Value: sRuns, - }) + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Value: sRuns}) return nil } @@ -430,42 +447,51 @@ func (rs *Responder) GetDerivedChargers(attrs *utils.AttrDerivedChargers, dcs *u } func (rs *Responder) ProcessCdr(cdr *StoredCdr, reply *string) error { - if rs.CdrSrv == nil { - return errors.New("CDR_SERVER_NOT_RUNNING") + cacheKey := "ProcessCdr" + cdr.CgrId + if item, err := rs.getCache().Get(cacheKey); err == nil && item != nil { + *reply = *(item.Value.(*string)) + return item.Err } - if err := rs.CdrSrv.ProcessCdr(cdr); err != nil { + if rs.CdrSrv == nil { + err := errors.New("CDR_SERVER_NOT_RUNNING") + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) return err } + if err := rs.CdrSrv.ProcessCdr(cdr); err != nil { + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) + return err + } + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Value: utils.OK}) *reply = utils.OK return nil } func (rs *Responder) LogCallCost(ccl *CallCostLog, reply *string) error { - if item, err := rs.getCache().Get(utils.LOG_CALL_COST_CACHE_PREFIX + ccl.CgrId); err == nil && item != nil { + cacheKey := "LogCallCost" + ccl.CgrId + if item, err := rs.getCache().Get(cacheKey); err == nil && item != nil { *reply = item.Value.(string) return item.Err } if rs.CdrSrv == nil { err := errors.New("CDR_SERVER_NOT_RUNNING") - rs.getCache().Cache(utils.LOG_CALL_COST_CACHE_PREFIX+ccl.CgrId, &cache2go.CacheItem{ - Err: err, - }) + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) return err } if err := rs.CdrSrv.LogCallCost(ccl); err != nil { - rs.getCache().Cache(utils.LOG_CALL_COST_CACHE_PREFIX+ccl.CgrId, &cache2go.CacheItem{ - Err: err, - }) + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) return err } *reply = utils.OK - rs.getCache().Cache(utils.LOG_CALL_COST_CACHE_PREFIX+ccl.CgrId, &cache2go.CacheItem{ - Value: utils.OK, - }) + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Value: utils.OK}) return nil } func (rs *Responder) GetLCR(attrs *AttrGetLcr, reply *LCRCost) error { + cacheKey := "GetLCR" + attrs.CgrId + if item, err := rs.getCache().Get(cacheKey); err == nil && item != nil { + *reply = *(item.Value.(*LCRCost)) + return item.Err + } if attrs.CallDescriptor.Subject == "" { attrs.CallDescriptor.Subject = attrs.CallDescriptor.Account } @@ -481,6 +507,7 @@ func (rs *Responder) GetLCR(attrs *AttrGetLcr, reply *LCRCost) error { Subject: cd.Subject, Context: utils.ALIAS_CONTEXT_RATING, }, cd, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound { + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) return err } // replace user profile fields @@ -489,6 +516,7 @@ func (rs *Responder) GetLCR(attrs *AttrGetLcr, reply *LCRCost) error { } lcrCost, err := attrs.CallDescriptor.GetLCR(rs.Stats, attrs.Paginator) if err != nil { + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) return err } if lcrCost.Entry.Strategy == LCR_STRATEGY_LOAD { @@ -496,6 +524,7 @@ func (rs *Responder) GetLCR(attrs *AttrGetLcr, reply *LCRCost) error { suppl.Cost = -1 // In case of load distribution we don't calculate costs } } + rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Value: lcrCost}) *reply = *lcrCost return nil } diff --git a/sessionmanager/fsevent.go b/sessionmanager/fsevent.go index 2b46a8129..905239d70 100644 --- a/sessionmanager/fsevent.go +++ b/sessionmanager/fsevent.go @@ -373,6 +373,7 @@ func (fsev FSEvent) ComputeLcr() bool { // Converts into CallDescriptor due to responder interface needs func (fsev FSEvent) AsCallDescriptor() (*engine.CallDescriptor, error) { lcrReq := &engine.LcrRequest{ + Direction: fsev.GetDirection(utils.META_DEFAULT), Tenant: fsev.GetTenant(utils.META_DEFAULT), Category: fsev.GetCategory(utils.META_DEFAULT), diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 969dacf4e..11af53e00 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -100,6 +100,7 @@ func (sm *FSSessionManager) setCgrLcr(ev engine.Event, connId string) error { return err } cd := &engine.CallDescriptor{ + CgrId: ev.GetCgrId(sm.Timezone()), Direction: ev.GetDirection(utils.META_DEFAULT), Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), @@ -148,6 +149,7 @@ func (sm *FSSessionManager) onChannelPark(ev engine.Event, connId string) { // ComputeLcr if ev.ComputeLcr() { cd, err := fsev.AsCallDescriptor() + cd.CgrId = fsev.GetCgrId(sm.Timezone()) if err != nil { utils.Logger.Info(fmt.Sprintf(" LCR_PREPROCESS_ERROR: %s", err.Error())) sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) diff --git a/sessionmanager/kamailiosm.go b/sessionmanager/kamailiosm.go index e195bfe8c..26754a200 100644 --- a/sessionmanager/kamailiosm.go +++ b/sessionmanager/kamailiosm.go @@ -103,6 +103,7 @@ func (self *KamailioSessionManager) onCgrLcrReq(evData []byte, connId string) { func (self *KamailioSessionManager) getSuppliers(kev KamEvent) (string, error) { cd, err := kev.AsCallDescriptor() + cd.CgrId = kev.GetCgrId(self.timezone) if err != nil { utils.Logger.Info(fmt.Sprintf(" LCR_PREPROCESS_ERROR error: %s", err.Error())) return "", errors.New("LCR_PREPROCESS_ERROR") diff --git a/sessionmanager/session.go b/sessionmanager/session.go index 82347807c..31ba88eeb 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -70,7 +70,7 @@ func NewSession(ev engine.Event, connId string, sm SessionManager) *Session { // the debit loop method (to be stoped by sending somenthing on stopDebit channel) func (s *Session) debitLoop(runIdx int) { nextCd := s.sessionRuns[runIdx].CallDescriptor - nextCd.CgrId = s.eventStart.GetCgrId("") + nextCd.CgrId = s.eventStart.GetCgrId(s.sessionManager.Timezone()) index := 0.0 debitPeriod := s.sessionManager.DebitInterval() for { @@ -195,6 +195,7 @@ func (s *Session) Refund(lastCC *engine.CallCost, hangupTime time.Time) error { // utils.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration)) if len(refundIncrements) > 0 { cd := &engine.CallDescriptor{ + CgrId: s.eventStart.GetCgrId(s.sessionManager.Timezone()), Direction: lastCC.Direction, Tenant: lastCC.Tenant, Category: lastCC.Category, diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index 6546991a3..7d019dcef 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -145,6 +145,7 @@ func (self *SMGeneric) GetMaxUsage(gev SMGenericEvent, clnt *rpc2.Client) (time. func (self *SMGeneric) GetLcrSuppliers(gev SMGenericEvent, clnt *rpc2.Client) ([]string, error) { gev[utils.EVENT_NAME] = utils.CGR_LCR_REQUEST cd, err := gev.AsLcrRequest().AsCallDescriptor(self.timezone) + cd.CgrId = gev.GetCgrId(self.timezone) if err != nil { return nil, err } diff --git a/utils/consts.go b/utils/consts.go index 7024a73e5..8293abda4 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -231,10 +231,6 @@ const ( CGR_SUPPLIERS = "cgr_suppliers" KAM_FLATSTORE = "kamailio_flatstore" OSIPS_FLATSTORE = "opensips_flatstore" - MAX_DEBIT_CACHE_PREFIX = "MAX_DEBIT_" - REFUND_INCR_CACHE_PREFIX = "REFUND_INCR_" - GET_SESS_RUNS_CACHE_PREFIX = "GET_SESS_RUNS_" - LOG_CALL_COST_CACHE_PREFIX = "LOG_CALL_COSTS_" ALIAS_CONTEXT_RATING = "*rating" NOT_AVAILABLE = "N/A" CALL = "call"