From 6e84123b8b252c06c232de69bb52cff19f8fe60c Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 4 Nov 2016 19:42:56 +0100 Subject: [PATCH] SMGeneric with cached responses --- data/conf/samples/smg/cgrates.json | 1 - sessionmanager/smgeneric.go | 208 +++++++++++++++++++---------- 2 files changed, 135 insertions(+), 74 deletions(-) diff --git a/data/conf/samples/smg/cgrates.json b/data/conf/samples/smg/cgrates.json index acf5ecda4..7cd20ad35 100644 --- a/data/conf/samples/smg/cgrates.json +++ b/data/conf/samples/smg/cgrates.json @@ -5,7 +5,6 @@ // Starts rater, scheduler "general": { - "response_cache_ttl": "1s", "log_level": 8, }, diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index b2f103372..173e92953 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -25,6 +25,7 @@ import ( "sync" "time" + "github.com/cgrates/cgrates/cache" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -53,7 +54,8 @@ func NewSMGeneric(cgrCfg *config.CGRConfig, rater rpcclient.RpcClientConnection, activeSessions: make(map[string][]*SMGSession), aSessionsIndex: make(map[string]map[string]utils.StringMap), passiveSessions: make(map[string][]*SMGSession), - sessionTerminators: make(map[string]*smgSessionTerminator)} + sessionTerminators: make(map[string]*smgSessionTerminator), + responseCache: cache.NewResponseCache(cgrCfg.ResponseCacheTTL)} } type SMGeneric struct { @@ -69,7 +71,7 @@ type SMGeneric struct { passiveSessions map[string][]*SMGSession // group passive sessions pSessionsMux sync.RWMutex sessionTerminators map[string]*smgSessionTerminator // terminate and cleanup the session if timer expires - + responseCache *cache.ResponseCache // cache replies here } type smgSessionTerminator struct { timer *time.Timer @@ -268,7 +270,7 @@ func (smg *SMGeneric) getASession(uuid string) []*SMGSession { return smg.activeSessions[uuid] } -// Handle a new session, pass the connectionId so we can communicate on disconnect request +// sessionStart will handle a new session, pass the connectionId so we can communicate on disconnect request func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, clntConn rpcclient.RpcClientConnection) error { sessionId := evStart.GetUUID() processed, err := engine.Guardian.Guard(func() (interface{}, error) { // Lock it on UUID level @@ -298,7 +300,7 @@ func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, clntConn rpcclient.Rp return err } -// End a session from outside +// sessionEnd will end a session from outside func (smg *SMGeneric) sessionEnd(sessionId string, usage time.Duration) error { _, err := engine.Guardian.Guard(func() (interface{}, error) { // Lock it on UUID level ss := smg.getASession(sessionId) @@ -331,7 +333,7 @@ func (smg *SMGeneric) sessionEnd(sessionId string, usage time.Duration) error { return err } -// Used when an update will relocate an initial session (eg multiple data streams) +// sessionRelocate is used when an update will relocate an initial session (eg multiple data streams) func (smg *SMGeneric) sessionRelocate(sessionID, initialID string) error { _, err := engine.Guardian.Guard(func() (interface{}, error) { // Lock it on initialID level if utils.IsSliceMember([]string{sessionID, initialID}, "") { // Not allowed empty params here @@ -357,7 +359,7 @@ func (smg *SMGeneric) sessionRelocate(sessionID, initialID string) error { return err } -// replicateSessionsForEvent will replicate session based on configuration +// replicateSessions will replicate session based on configuration func (smg *SMGeneric) replicateSessions(originID string) (err error) { if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { return @@ -383,129 +385,176 @@ func (smg *SMGeneric) replicateSessions(originID string) (err error) { return } +// sessionActiveToPassive is a mechanism to transit a session from active to passive state +func (smg *SMGeneric) sessionActiveToPassive(originID string) (err error) { + return +} + +// sessionPassiveToActive is a mechanism to transit a session from passive to active state +func (smg *SMGeneric) sessionPassiveToActive(originID string) (err error) { + return +} + // Methods to apply on sessions, mostly exported through RPC/Bi-RPC -//Calculates maximum usage allowed for gevent -func (smg *SMGeneric) MaxUsage(gev SMGenericEvent) (time.Duration, error) { + +// MaxUsage calculates maximum usage allowed for given gevent +func (smg *SMGeneric) MaxUsage(gev SMGenericEvent) (maxUsage time.Duration, err error) { + cacheKey := "MaxUsage" + gev.GetCgrId(smg.timezone) + if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { + return (item.Value.(time.Duration)), item.Err + } + defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Value: maxUsage, Err: err}) gev[utils.EVENT_NAME] = utils.CGR_AUTHORIZATION storedCdr := gev.AsStoredCdr(config.CgrConfig(), smg.timezone) var maxDur float64 - if err := smg.rater.Call("Responder.GetDerivedMaxSessionTime", storedCdr, &maxDur); err != nil { - return time.Duration(0), err + if err = smg.rater.Call("Responder.GetDerivedMaxSessionTime", storedCdr, &maxDur); err != nil { + return } - return time.Duration(maxDur), nil + maxUsage = time.Duration(maxDur) + return } -func (smg *SMGeneric) LCRSuppliers(gev SMGenericEvent) ([]string, error) { +func (smg *SMGeneric) LCRSuppliers(gev SMGenericEvent) (suppls []string, err error) { + cacheKey := "LCRSuppliers" + gev.GetCgrId(smg.timezone) + gev.GetAccount(utils.META_DEFAULT) + gev.GetDestination(utils.META_DEFAULT) + if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { + if item.Value != nil { + suppls = (item.Value.([]string)) + } + err = item.Err + return suppls, err + } + defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Value: suppls, Err: err}) gev[utils.EVENT_NAME] = utils.CGR_LCR_REQUEST - cd, err := gev.AsLcrRequest().AsCallDescriptor(smg.timezone) + var cd *engine.CallDescriptor + cd, err = gev.AsLcrRequest().AsCallDescriptor(smg.timezone) cd.CgrID = gev.GetCgrId(smg.timezone) if err != nil { - return nil, err + return } var lcr engine.LCRCost if err = smg.rater.Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil { - return nil, err + return } if lcr.HasErrors() { lcr.LogErrors() - return nil, errors.New("LCR_COMPUTE_ERROR") + err = errors.New("LCR_COMPUTE_ERROR") + return } - return lcr.SuppliersSlice() + suppls, err = lcr.SuppliersSlice() + return } // Called on session start -func (smg *SMGeneric) InitiateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) (time.Duration, error) { - if err := smg.sessionStart(gev, clnt); err != nil { +func (smg *SMGeneric) InitiateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) (maxUsage time.Duration, err error) { + cacheKey := "InitiateSession" + gev.GetCgrId(smg.timezone) + if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { + return item.Value.(time.Duration), item.Err + } + defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Value: maxUsage, Err: err}) // schedule response caching + if err = smg.sessionStart(gev, clnt); err != nil { smg.sessionEnd(gev.GetUUID(), 0) - return nilDuration, err + return } if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { // Session handled by debit loop - return -1, nil + maxUsage = -1 + return } - d, err := smg.UpdateSession(gev, clnt) - if err != nil || d == 0 { + maxUsage, err = smg.UpdateSession(gev, clnt) + if err != nil || maxUsage == 0 { smg.sessionEnd(gev.GetUUID(), 0) } - return d, err + return } // Execute debits for usage/maxUsage -func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) (time.Duration, error) { +func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) (maxUsage time.Duration, err error) { + cacheKey := "UpdateSession" + gev.GetCgrId(smg.timezone) + if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { + return item.Value.(time.Duration), item.Err + } + defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Value: maxUsage, Err: err}) if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { // Not possible to update a session with debit loop active - return 0, errors.New("ACTIVE_DEBIT_LOOP") + err = errors.New("ACTIVE_DEBIT_LOOP") + return } defer smg.replicateSessions(gev.GetUUID()) - if initialID, err := gev.GetFieldAsString(utils.InitialOriginID); err == nil { + if initialID, errGet := gev.GetFieldAsString(utils.InitialOriginID); errGet == nil { defer smg.replicateSessions(initialID) - err := smg.sessionRelocate(gev.GetUUID(), initialID) + err = smg.sessionRelocate(gev.GetUUID(), initialID) if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update err = smg.sessionStart(gev, clnt) } if err != nil { - return nilDuration, err + return } } smg.resetTerminatorTimer(gev.GetUUID(), gev.GetSessionTTL(), gev.GetSessionTTLLastUsed(), gev.GetSessionTTLUsage()) var lastUsed *time.Duration - if evLastUsed, err := gev.GetLastUsed(utils.META_DEFAULT); err == nil { + var evLastUsed time.Duration + if evLastUsed, err = gev.GetLastUsed(utils.META_DEFAULT); err == nil { lastUsed = &evLastUsed } else if err != utils.ErrNotFound { - return nilDuration, err + return } - evMaxUsage, err := gev.GetMaxUsage(utils.META_DEFAULT, smg.cgrCfg.SmGenericConfig.MaxCallDuration) - if err != nil { + if maxUsage, err = gev.GetMaxUsage(utils.META_DEFAULT, smg.cgrCfg.SmGenericConfig.MaxCallDuration); err != nil { if err == utils.ErrNotFound { err = utils.ErrMandatoryIeMissing } - return nilDuration, err + return } aSessions := smg.getASession(gev.GetUUID()) if len(aSessions) == 0 { utils.Logger.Err(fmt.Sprintf(" SessionUpdate with no active sessions for event: <%s>", gev.GetUUID())) - return nilDuration, utils.ErrServerError + err = utils.ErrServerError + return } for _, s := range aSessions { - if maxDur, err := s.debit(evMaxUsage, lastUsed); err != nil { - return nilDuration, err - } else if maxDur < evMaxUsage { - evMaxUsage = maxDur + var maxDur time.Duration + if maxDur, err = s.debit(maxUsage, lastUsed); err != nil { + return + } else if maxDur < maxUsage { + maxUsage = maxDur } } - return evMaxUsage, nil + return } // Called on session end, should stop debit loop -func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) error { - if initialID, err := gev.GetFieldAsString(utils.InitialOriginID); err == nil { - err := smg.sessionRelocate(gev.GetUUID(), initialID) +func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) (err error) { + cacheKey := "TerminateSession" + gev.GetCgrId(smg.timezone) + if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { + return item.Err + } + defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Err: err}) + if initialID, errGet := gev.GetFieldAsString(utils.InitialOriginID); errGet == nil { + err = smg.sessionRelocate(gev.GetUUID(), initialID) defer smg.replicateSessions(initialID) if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update err = smg.sessionStart(gev, clnt) } if err != nil && err != utils.ErrMandatoryIeMissing { - return err + return } } sessionIDs := []string{gev.GetUUID()} - if sessionIDPrefix, err := gev.GetFieldAsString(utils.OriginIDPrefix); err == nil { // OriginIDPrefix is present, OriginID will not be anymore considered + if sessionIDPrefix, errPrefix := gev.GetFieldAsString(utils.OriginIDPrefix); errPrefix == nil { // OriginIDPrefix is present, OriginID will not be anymore considered sessionIDs = smg.getSessionIDsForPrefix(sessionIDPrefix) } usage, errUsage := gev.GetUsage(utils.META_DEFAULT) var lastUsed time.Duration if errUsage != nil { if errUsage != utils.ErrNotFound { - return errUsage + err = errUsage + return } - var err error lastUsed, err = gev.GetLastUsed(utils.META_DEFAULT) if err != nil { if err == utils.ErrNotFound { err = utils.ErrMandatoryIeMissing } - return err + return } } - var interimError error var hasActiveSession bool for _, sessionID := range sessionIDs { defer smg.replicateSessions(sessionID) @@ -520,23 +569,29 @@ func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, clnt rpcclient.RpcCli if errUsage != nil { usage = s.TotalUsage - s.LastUsage + lastUsed } - if err := smg.sessionEnd(sessionID, usage); err != nil { - interimError = err // Last error will be the one returned as API result + if errSEnd := smg.sessionEnd(sessionID, usage); errSEnd != nil { + err = errSEnd // Last error will be the one returned as API result } } if !hasActiveSession { - return utils.ErrNoActiveSession + err = utils.ErrNoActiveSession + return } - return interimError + return } // Processes one time events (eg: SMS) -func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxDur time.Duration, err error) { +func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxUsage time.Duration, err error) { + cacheKey := "ChargeEvent" + gev.GetCgrId(smg.timezone) + if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { + return item.Value.(time.Duration), item.Err + } + defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Value: maxUsage, Err: err}) var sessionRuns []*engine.SessionRun - if err := smg.rater.Call("Responder.GetSessionRuns", gev.AsStoredCdr(smg.cgrCfg, smg.timezone), &sessionRuns); err != nil { - return nilDuration, err + if err = smg.rater.Call("Responder.GetSessionRuns", gev.AsStoredCdr(smg.cgrCfg, smg.timezone), &sessionRuns); err != nil { + return } else if len(sessionRuns) == 0 { - return nilDuration, nil + return } var maxDurInit bool // Avoid differences between default 0 and received 0 for _, sR := range sessionRuns { @@ -549,8 +604,8 @@ func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxDur time.Duration, err if ccDur := cc.GetDuration(); ccDur == 0 { err = utils.ErrInsufficientCredit break - } else if !maxDurInit || ccDur < maxDur { - maxDur = ccDur + } else if !maxDurInit || ccDur < maxUsage { + maxUsage = ccDur } } if err != nil { // Refund the ones already taken since we have error on one of the debits @@ -579,13 +634,13 @@ func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxDur time.Duration, err cd.Increments.Compress() //utils.Logger.Info(fmt.Sprintf("Refunding session run callcost: %s", utils.ToJSON(cd))) var response float64 - err := smg.rater.Call("Responder.RefundIncrements", cd, &response) + err = smg.rater.Call("Responder.RefundIncrements", cd, &response) if err != nil { - return nilDuration, err + return } } } - return nilDuration, err + return } var withErrors bool for _, sR := range sessionRuns { @@ -604,8 +659,8 @@ func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxDur time.Duration, err cd := cc.CreateCallDescriptor() cd.Increments = roundIncrements var response float64 - if err := smg.rater.Call("Responder.RefundRounding", cd, &response); err != nil { - utils.Logger.Err(fmt.Sprintf(" ERROR failed to refund rounding: %v", err)) + if errRefund := smg.rater.Call("Responder.RefundRounding", cd, &response); errRefund != nil { + utils.Logger.Err(fmt.Sprintf(" ERROR failed to refund rounding: %v", errRefund)) } } var reply string @@ -617,23 +672,30 @@ func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxDur time.Duration, err OriginID: gev.GetUUID(), CostDetails: cc, } - if err := smg.cdrsrv.Call("CdrsV1.StoreSMCost", engine.AttrCDRSStoreSMCost{Cost: smCost, CheckDuplicate: true}, &reply); err != nil && !strings.HasSuffix(err.Error(), utils.ErrExists.Error()) { + if errStore := smg.cdrsrv.Call("CdrsV1.StoreSMCost", engine.AttrCDRSStoreSMCost{Cost: smCost, + CheckDuplicate: true}, &reply); errStore != nil && !strings.HasSuffix(errStore.Error(), utils.ErrExists.Error()) { withErrors = true - utils.Logger.Err(fmt.Sprintf(" Could not save CC: %+v, RunID: %s error: %s", cc, sR.DerivedCharger.RunID, err.Error())) + utils.Logger.Err(fmt.Sprintf(" Could not save CC: %+v, RunID: %s error: %s", cc, sR.DerivedCharger.RunID, errStore.Error())) } } if withErrors { - return nilDuration, ErrPartiallyExecuted + err = ErrPartiallyExecuted + return } - return maxDur, nil + return } -func (smg *SMGeneric) ProcessCDR(gev SMGenericEvent) error { - var reply string - if err := smg.cdrsrv.Call("CdrsV1.ProcessCDR", gev.AsStoredCdr(smg.cgrCfg, smg.timezone), &reply); err != nil { - return err +func (smg *SMGeneric) ProcessCDR(gev SMGenericEvent) (err error) { + cacheKey := "ProcessCDR" + gev.GetCgrId(smg.timezone) + if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { + return item.Err } - return nil + defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Err: err}) + var reply string + if err = smg.cdrsrv.Call("CdrsV1.ProcessCDR", gev.AsStoredCdr(smg.cgrCfg, smg.timezone), &reply); err != nil { + return + } + return } func (smg *SMGeneric) Connect() error {