cache all calls from session manager

This commit is contained in:
Radu Ioan Fericean
2015-11-27 14:46:51 +02:00
parent 5cd3165036
commit ffdd960d39
8 changed files with 75 additions and 44 deletions

View File

@@ -18,7 +18,7 @@ type CacheItem struct {
type ResponseCache struct {
ttl time.Duration
cache map[string]*CacheItem
semaphore map[string]chan bool
semaphore map[string]chan bool // used for waiting till the first goroutine processes the response
mu sync.RWMutex
}

View File

@@ -24,6 +24,7 @@ import (
"net/rpc"
"reflect"
"runtime"
"strconv"
"strings"
"time"
@@ -149,7 +150,8 @@ func (rs *Responder) Debit(arg *CallDescriptor, reply *CallCost) (err error) {
}
func (rs *Responder) MaxDebit(arg *CallDescriptor, reply *CallCost) (err error) {
if item, err := rs.getCache().Get(utils.MAX_DEBIT_CACHE_PREFIX + arg.CgrId); err == nil && item != nil {
cacheKey := "MaxDebit" + arg.CgrId + strconv.FormatFloat(arg.LoopIndex, 'f', -1, 64)
if item, err := rs.getCache().Get(cacheKey); err == nil && item != nil {
*reply = *(item.Value.(*CallCost))
return item.Err
}
@@ -167,10 +169,12 @@ func (rs *Responder) MaxDebit(arg *CallDescriptor, reply *CallCost) (err error)
Subject: arg.Subject,
Context: utils.ALIAS_CONTEXT_RATING,
}, arg, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound {
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return err
}
// replace user profile fields
if err := LoadUserProfile(arg, utils.EXTRA_FIELDS); err != nil {
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return err
}
if rs.Bal != nil {
@@ -179,23 +183,19 @@ func (rs *Responder) MaxDebit(arg *CallDescriptor, reply *CallCost) (err error)
} else {
r, e := arg.MaxDebit()
if e != nil {
rs.getCache().Cache(utils.MAX_DEBIT_CACHE_PREFIX+arg.CgrId, &cache2go.CacheItem{
Err: e,
})
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: e})
return e
} else if r != nil {
*reply = *r
}
}
rs.getCache().Cache(utils.MAX_DEBIT_CACHE_PREFIX+arg.CgrId, &cache2go.CacheItem{
Value: reply,
Err: err,
})
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Value: reply, Err: err})
return
}
func (rs *Responder) RefundIncrements(arg *CallDescriptor, reply *float64) (err error) {
if item, err := rs.getCache().Get(utils.REFUND_INCR_CACHE_PREFIX + arg.CgrId); err == nil && item != nil {
cacheKey := "RefundIncrements" + arg.CgrId
if item, err := rs.getCache().Get(cacheKey); err == nil && item != nil {
*reply = *(item.Value.(*float64))
return item.Err
}
@@ -213,10 +213,12 @@ func (rs *Responder) RefundIncrements(arg *CallDescriptor, reply *float64) (err
Subject: arg.Subject,
Context: utils.ALIAS_CONTEXT_RATING,
}, arg, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound {
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return err
}
// replace user profile fields
if err := LoadUserProfile(arg, utils.EXTRA_FIELDS); err != nil {
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return err
}
if rs.Bal != nil {
@@ -227,10 +229,7 @@ func (rs *Responder) RefundIncrements(arg *CallDescriptor, reply *float64) (err
}, 0, arg.GetAccountKey())
*reply, err = r.(float64), e
}
rs.getCache().Cache(utils.REFUND_INCR_CACHE_PREFIX+arg.CgrId, &cache2go.CacheItem{
Value: reply,
Err: err,
})
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Value: reply, Err: err})
return
}
@@ -266,8 +265,15 @@ func (rs *Responder) GetMaxSessionTime(arg *CallDescriptor, reply *float64) (err
// Returns MaxSessionTime for an event received in SessionManager, considering DerivedCharging for it
func (rs *Responder) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) error {
cacheKey := "GetDerivedMaxSessionTime" + ev.CgrId
if item, err := rs.getCache().Get(cacheKey); err == nil && item != nil {
*reply = *(item.Value.(*float64))
return item.Err
}
if rs.Bal != nil {
return errors.New("unsupported method on the balancer")
err := errors.New("unsupported method on the balancer")
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return err
}
if ev.Subject == "" {
ev.Subject = ev.Account
@@ -283,10 +289,12 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) err
Subject: ev.Subject,
Context: utils.ALIAS_CONTEXT_RATING,
}, ev, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound {
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return err
}
// replace user profile fields
if err := LoadUserProfile(ev, utils.EXTRA_FIELDS); err != nil {
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return err
}
maxCallDuration := -1.0
@@ -294,6 +302,7 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) err
Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)}
dcs := &utils.DerivedChargers{}
if err := rs.GetDerivedChargers(attrsDC, dcs); err != nil {
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return err
}
dcs, _ = dcs.AppendDefaultRun()
@@ -314,10 +323,12 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) err
}
startTime, err := ev.GetSetupTime(utils.META_DEFAULT, rs.Timezone)
if err != nil {
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return err
}
usage, err := ev.GetDuration(utils.META_DEFAULT)
if err != nil {
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return err
}
if usage == 0 {
@@ -337,6 +348,7 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) err
err = rs.GetMaxSessionTime(cd, &remainingDuration)
if err != nil {
*reply = 0
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return err
}
if utils.IsSliceMember([]string{utils.META_POSTPAID, utils.POSTPAID}, ev.GetReqType(dc.ReqTypeField)) { // Only consider prepaid and pseudoprepaid for MaxSessionTime
@@ -349,14 +361,22 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) err
maxCallDuration = remainingDuration
}
}
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Value: maxCallDuration})
*reply = maxCallDuration
return nil
}
// Used by SM to get all the prepaid CallDescriptors attached to a session
func (rs *Responder) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error {
cacheKey := "GetSessionRuns" + ev.CgrId
if item, err := rs.getCache().Get(cacheKey); err == nil && item != nil {
*sRuns = *(item.Value.(*[]*SessionRun))
return item.Err
}
if rs.Bal != nil {
return errors.New("Unsupported method on the balancer")
err := errors.New("Unsupported method on the balancer")
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return err
}
if ev.Subject == "" {
ev.Subject = ev.Account
@@ -372,19 +392,19 @@ func (rs *Responder) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error {
Subject: ev.Subject,
Context: utils.ALIAS_CONTEXT_RATING,
}, ev, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound {
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return err
}
// replace user profile fields
if err := LoadUserProfile(ev, utils.EXTRA_FIELDS); err != nil {
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return err
}
attrsDC := &utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT),
Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)}
dcs := &utils.DerivedChargers{}
if err := rs.GetDerivedChargers(attrsDC, dcs); err != nil {
rs.getCache().Cache(utils.GET_SESS_RUNS_CACHE_PREFIX+ev.CgrId, &cache2go.CacheItem{
Err: err,
})
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return err
}
dcs, _ = dcs.AppendDefaultRun()
@@ -395,10 +415,9 @@ func (rs *Responder) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error {
}
startTime, err := ev.GetAnswerTime(dc.AnswerTimeField, rs.Timezone)
if err != nil {
rs.getCache().Cache(utils.GET_SESS_RUNS_CACHE_PREFIX+ev.CgrId, &cache2go.CacheItem{
Err: err,
})
return errors.New("Error parsing answer event start time")
err := errors.New("Error parsing answer event start time")
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return err
}
cd := &CallDescriptor{
Direction: ev.GetDirection(dc.DirectionField),
@@ -411,9 +430,7 @@ func (rs *Responder) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error {
sesRuns = append(sesRuns, &SessionRun{DerivedCharger: dc, CallDescriptor: cd})
}
*sRuns = sesRuns
rs.getCache().Cache(utils.GET_SESS_RUNS_CACHE_PREFIX+ev.CgrId, &cache2go.CacheItem{
Value: sRuns,
})
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Value: sRuns})
return nil
}
@@ -430,42 +447,51 @@ func (rs *Responder) GetDerivedChargers(attrs *utils.AttrDerivedChargers, dcs *u
}
func (rs *Responder) ProcessCdr(cdr *StoredCdr, reply *string) error {
if rs.CdrSrv == nil {
return errors.New("CDR_SERVER_NOT_RUNNING")
cacheKey := "ProcessCdr" + cdr.CgrId
if item, err := rs.getCache().Get(cacheKey); err == nil && item != nil {
*reply = *(item.Value.(*string))
return item.Err
}
if err := rs.CdrSrv.ProcessCdr(cdr); err != nil {
if rs.CdrSrv == nil {
err := errors.New("CDR_SERVER_NOT_RUNNING")
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return err
}
if err := rs.CdrSrv.ProcessCdr(cdr); err != nil {
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return err
}
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Value: utils.OK})
*reply = utils.OK
return nil
}
func (rs *Responder) LogCallCost(ccl *CallCostLog, reply *string) error {
if item, err := rs.getCache().Get(utils.LOG_CALL_COST_CACHE_PREFIX + ccl.CgrId); err == nil && item != nil {
cacheKey := "LogCallCost" + ccl.CgrId
if item, err := rs.getCache().Get(cacheKey); err == nil && item != nil {
*reply = item.Value.(string)
return item.Err
}
if rs.CdrSrv == nil {
err := errors.New("CDR_SERVER_NOT_RUNNING")
rs.getCache().Cache(utils.LOG_CALL_COST_CACHE_PREFIX+ccl.CgrId, &cache2go.CacheItem{
Err: err,
})
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return err
}
if err := rs.CdrSrv.LogCallCost(ccl); err != nil {
rs.getCache().Cache(utils.LOG_CALL_COST_CACHE_PREFIX+ccl.CgrId, &cache2go.CacheItem{
Err: err,
})
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return err
}
*reply = utils.OK
rs.getCache().Cache(utils.LOG_CALL_COST_CACHE_PREFIX+ccl.CgrId, &cache2go.CacheItem{
Value: utils.OK,
})
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Value: utils.OK})
return nil
}
func (rs *Responder) GetLCR(attrs *AttrGetLcr, reply *LCRCost) error {
cacheKey := "GetLCR" + attrs.CgrId
if item, err := rs.getCache().Get(cacheKey); err == nil && item != nil {
*reply = *(item.Value.(*LCRCost))
return item.Err
}
if attrs.CallDescriptor.Subject == "" {
attrs.CallDescriptor.Subject = attrs.CallDescriptor.Account
}
@@ -481,6 +507,7 @@ func (rs *Responder) GetLCR(attrs *AttrGetLcr, reply *LCRCost) error {
Subject: cd.Subject,
Context: utils.ALIAS_CONTEXT_RATING,
}, cd, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound {
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return err
}
// replace user profile fields
@@ -489,6 +516,7 @@ func (rs *Responder) GetLCR(attrs *AttrGetLcr, reply *LCRCost) error {
}
lcrCost, err := attrs.CallDescriptor.GetLCR(rs.Stats, attrs.Paginator)
if err != nil {
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return err
}
if lcrCost.Entry.Strategy == LCR_STRATEGY_LOAD {
@@ -496,6 +524,7 @@ func (rs *Responder) GetLCR(attrs *AttrGetLcr, reply *LCRCost) error {
suppl.Cost = -1 // In case of load distribution we don't calculate costs
}
}
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Value: lcrCost})
*reply = *lcrCost
return nil
}

View File

@@ -373,6 +373,7 @@ func (fsev FSEvent) ComputeLcr() bool {
// Converts into CallDescriptor due to responder interface needs
func (fsev FSEvent) AsCallDescriptor() (*engine.CallDescriptor, error) {
lcrReq := &engine.LcrRequest{
Direction: fsev.GetDirection(utils.META_DEFAULT),
Tenant: fsev.GetTenant(utils.META_DEFAULT),
Category: fsev.GetCategory(utils.META_DEFAULT),

View File

@@ -100,6 +100,7 @@ func (sm *FSSessionManager) setCgrLcr(ev engine.Event, connId string) error {
return err
}
cd := &engine.CallDescriptor{
CgrId: ev.GetCgrId(sm.Timezone()),
Direction: ev.GetDirection(utils.META_DEFAULT),
Tenant: ev.GetTenant(utils.META_DEFAULT),
Category: ev.GetCategory(utils.META_DEFAULT),
@@ -148,6 +149,7 @@ func (sm *FSSessionManager) onChannelPark(ev engine.Event, connId string) {
// ComputeLcr
if ev.ComputeLcr() {
cd, err := fsev.AsCallDescriptor()
cd.CgrId = fsev.GetCgrId(sm.Timezone())
if err != nil {
utils.Logger.Info(fmt.Sprintf("<SM-FreeSWITCH> LCR_PREPROCESS_ERROR: %s", err.Error()))
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR)

View File

@@ -103,6 +103,7 @@ func (self *KamailioSessionManager) onCgrLcrReq(evData []byte, connId string) {
func (self *KamailioSessionManager) getSuppliers(kev KamEvent) (string, error) {
cd, err := kev.AsCallDescriptor()
cd.CgrId = kev.GetCgrId(self.timezone)
if err != nil {
utils.Logger.Info(fmt.Sprintf("<SM-Kamailio> LCR_PREPROCESS_ERROR error: %s", err.Error()))
return "", errors.New("LCR_PREPROCESS_ERROR")

View File

@@ -70,7 +70,7 @@ func NewSession(ev engine.Event, connId string, sm SessionManager) *Session {
// the debit loop method (to be stoped by sending somenthing on stopDebit channel)
func (s *Session) debitLoop(runIdx int) {
nextCd := s.sessionRuns[runIdx].CallDescriptor
nextCd.CgrId = s.eventStart.GetCgrId("")
nextCd.CgrId = s.eventStart.GetCgrId(s.sessionManager.Timezone())
index := 0.0
debitPeriod := s.sessionManager.DebitInterval()
for {
@@ -195,6 +195,7 @@ func (s *Session) Refund(lastCC *engine.CallCost, hangupTime time.Time) error {
// utils.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration))
if len(refundIncrements) > 0 {
cd := &engine.CallDescriptor{
CgrId: s.eventStart.GetCgrId(s.sessionManager.Timezone()),
Direction: lastCC.Direction,
Tenant: lastCC.Tenant,
Category: lastCC.Category,

View File

@@ -145,6 +145,7 @@ func (self *SMGeneric) GetMaxUsage(gev SMGenericEvent, clnt *rpc2.Client) (time.
func (self *SMGeneric) GetLcrSuppliers(gev SMGenericEvent, clnt *rpc2.Client) ([]string, error) {
gev[utils.EVENT_NAME] = utils.CGR_LCR_REQUEST
cd, err := gev.AsLcrRequest().AsCallDescriptor(self.timezone)
cd.CgrId = gev.GetCgrId(self.timezone)
if err != nil {
return nil, err
}

View File

@@ -231,10 +231,6 @@ const (
CGR_SUPPLIERS = "cgr_suppliers"
KAM_FLATSTORE = "kamailio_flatstore"
OSIPS_FLATSTORE = "opensips_flatstore"
MAX_DEBIT_CACHE_PREFIX = "MAX_DEBIT_"
REFUND_INCR_CACHE_PREFIX = "REFUND_INCR_"
GET_SESS_RUNS_CACHE_PREFIX = "GET_SESS_RUNS_"
LOG_CALL_COST_CACHE_PREFIX = "LOG_CALL_COSTS_"
ALIAS_CONTEXT_RATING = "*rating"
NOT_AVAILABLE = "N/A"
CALL = "call"