SMGeneric with cached responses

This commit is contained in:
DanB
2016-11-04 19:42:56 +01:00
parent b0df6520b6
commit 6e84123b8b
2 changed files with 135 additions and 74 deletions

View File

@@ -5,7 +5,6 @@
// Starts rater, scheduler
"general": {
"response_cache_ttl": "1s",
"log_level": 8,
},

View File

@@ -25,6 +25,7 @@ import (
"sync"
"time"
"github.com/cgrates/cgrates/cache"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -53,7 +54,8 @@ func NewSMGeneric(cgrCfg *config.CGRConfig, rater rpcclient.RpcClientConnection,
activeSessions: make(map[string][]*SMGSession),
aSessionsIndex: make(map[string]map[string]utils.StringMap),
passiveSessions: make(map[string][]*SMGSession),
sessionTerminators: make(map[string]*smgSessionTerminator)}
sessionTerminators: make(map[string]*smgSessionTerminator),
responseCache: cache.NewResponseCache(cgrCfg.ResponseCacheTTL)}
}
type SMGeneric struct {
@@ -69,7 +71,7 @@ type SMGeneric struct {
passiveSessions map[string][]*SMGSession // group passive sessions
pSessionsMux sync.RWMutex
sessionTerminators map[string]*smgSessionTerminator // terminate and cleanup the session if timer expires
responseCache *cache.ResponseCache // cache replies here
}
type smgSessionTerminator struct {
timer *time.Timer
@@ -268,7 +270,7 @@ func (smg *SMGeneric) getASession(uuid string) []*SMGSession {
return smg.activeSessions[uuid]
}
// Handle a new session, pass the connectionId so we can communicate on disconnect request
// sessionStart will handle a new session, pass the connectionId so we can communicate on disconnect request
func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, clntConn rpcclient.RpcClientConnection) error {
sessionId := evStart.GetUUID()
processed, err := engine.Guardian.Guard(func() (interface{}, error) { // Lock it on UUID level
@@ -298,7 +300,7 @@ func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, clntConn rpcclient.Rp
return err
}
// End a session from outside
// sessionEnd will end a session from outside
func (smg *SMGeneric) sessionEnd(sessionId string, usage time.Duration) error {
_, err := engine.Guardian.Guard(func() (interface{}, error) { // Lock it on UUID level
ss := smg.getASession(sessionId)
@@ -331,7 +333,7 @@ func (smg *SMGeneric) sessionEnd(sessionId string, usage time.Duration) error {
return err
}
// Used when an update will relocate an initial session (eg multiple data streams)
// sessionRelocate is used when an update will relocate an initial session (eg multiple data streams)
func (smg *SMGeneric) sessionRelocate(sessionID, initialID string) error {
_, err := engine.Guardian.Guard(func() (interface{}, error) { // Lock it on initialID level
if utils.IsSliceMember([]string{sessionID, initialID}, "") { // Not allowed empty params here
@@ -357,7 +359,7 @@ func (smg *SMGeneric) sessionRelocate(sessionID, initialID string) error {
return err
}
// replicateSessionsForEvent will replicate session based on configuration
// replicateSessions will replicate session based on configuration
func (smg *SMGeneric) replicateSessions(originID string) (err error) {
if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 {
return
@@ -383,129 +385,176 @@ func (smg *SMGeneric) replicateSessions(originID string) (err error) {
return
}
// sessionActiveToPassive is a mechanism to transit a session from active to passive state
func (smg *SMGeneric) sessionActiveToPassive(originID string) (err error) {
return
}
// sessionPassiveToActive is a mechanism to transit a session from passive to active state
func (smg *SMGeneric) sessionPassiveToActive(originID string) (err error) {
return
}
// Methods to apply on sessions, mostly exported through RPC/Bi-RPC
//Calculates maximum usage allowed for gevent
func (smg *SMGeneric) MaxUsage(gev SMGenericEvent) (time.Duration, error) {
// MaxUsage calculates maximum usage allowed for given gevent
func (smg *SMGeneric) MaxUsage(gev SMGenericEvent) (maxUsage time.Duration, err error) {
cacheKey := "MaxUsage" + gev.GetCgrId(smg.timezone)
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return (item.Value.(time.Duration)), item.Err
}
defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Value: maxUsage, Err: err})
gev[utils.EVENT_NAME] = utils.CGR_AUTHORIZATION
storedCdr := gev.AsStoredCdr(config.CgrConfig(), smg.timezone)
var maxDur float64
if err := smg.rater.Call("Responder.GetDerivedMaxSessionTime", storedCdr, &maxDur); err != nil {
return time.Duration(0), err
if err = smg.rater.Call("Responder.GetDerivedMaxSessionTime", storedCdr, &maxDur); err != nil {
return
}
return time.Duration(maxDur), nil
maxUsage = time.Duration(maxDur)
return
}
func (smg *SMGeneric) LCRSuppliers(gev SMGenericEvent) ([]string, error) {
func (smg *SMGeneric) LCRSuppliers(gev SMGenericEvent) (suppls []string, err error) {
cacheKey := "LCRSuppliers" + gev.GetCgrId(smg.timezone) + gev.GetAccount(utils.META_DEFAULT) + gev.GetDestination(utils.META_DEFAULT)
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
if item.Value != nil {
suppls = (item.Value.([]string))
}
err = item.Err
return suppls, err
}
defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Value: suppls, Err: err})
gev[utils.EVENT_NAME] = utils.CGR_LCR_REQUEST
cd, err := gev.AsLcrRequest().AsCallDescriptor(smg.timezone)
var cd *engine.CallDescriptor
cd, err = gev.AsLcrRequest().AsCallDescriptor(smg.timezone)
cd.CgrID = gev.GetCgrId(smg.timezone)
if err != nil {
return nil, err
return
}
var lcr engine.LCRCost
if err = smg.rater.Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil {
return nil, err
return
}
if lcr.HasErrors() {
lcr.LogErrors()
return nil, errors.New("LCR_COMPUTE_ERROR")
err = errors.New("LCR_COMPUTE_ERROR")
return
}
return lcr.SuppliersSlice()
suppls, err = lcr.SuppliersSlice()
return
}
// Called on session start
func (smg *SMGeneric) InitiateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) (time.Duration, error) {
if err := smg.sessionStart(gev, clnt); err != nil {
func (smg *SMGeneric) InitiateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) (maxUsage time.Duration, err error) {
cacheKey := "InitiateSession" + gev.GetCgrId(smg.timezone)
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return item.Value.(time.Duration), item.Err
}
defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Value: maxUsage, Err: err}) // schedule response caching
if err = smg.sessionStart(gev, clnt); err != nil {
smg.sessionEnd(gev.GetUUID(), 0)
return nilDuration, err
return
}
if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { // Session handled by debit loop
return -1, nil
maxUsage = -1
return
}
d, err := smg.UpdateSession(gev, clnt)
if err != nil || d == 0 {
maxUsage, err = smg.UpdateSession(gev, clnt)
if err != nil || maxUsage == 0 {
smg.sessionEnd(gev.GetUUID(), 0)
}
return d, err
return
}
// Execute debits for usage/maxUsage
func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) (time.Duration, error) {
func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) (maxUsage time.Duration, err error) {
cacheKey := "UpdateSession" + gev.GetCgrId(smg.timezone)
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return item.Value.(time.Duration), item.Err
}
defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Value: maxUsage, Err: err})
if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { // Not possible to update a session with debit loop active
return 0, errors.New("ACTIVE_DEBIT_LOOP")
err = errors.New("ACTIVE_DEBIT_LOOP")
return
}
defer smg.replicateSessions(gev.GetUUID())
if initialID, err := gev.GetFieldAsString(utils.InitialOriginID); err == nil {
if initialID, errGet := gev.GetFieldAsString(utils.InitialOriginID); errGet == nil {
defer smg.replicateSessions(initialID)
err := smg.sessionRelocate(gev.GetUUID(), initialID)
err = smg.sessionRelocate(gev.GetUUID(), initialID)
if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update
err = smg.sessionStart(gev, clnt)
}
if err != nil {
return nilDuration, err
return
}
}
smg.resetTerminatorTimer(gev.GetUUID(), gev.GetSessionTTL(), gev.GetSessionTTLLastUsed(), gev.GetSessionTTLUsage())
var lastUsed *time.Duration
if evLastUsed, err := gev.GetLastUsed(utils.META_DEFAULT); err == nil {
var evLastUsed time.Duration
if evLastUsed, err = gev.GetLastUsed(utils.META_DEFAULT); err == nil {
lastUsed = &evLastUsed
} else if err != utils.ErrNotFound {
return nilDuration, err
return
}
evMaxUsage, err := gev.GetMaxUsage(utils.META_DEFAULT, smg.cgrCfg.SmGenericConfig.MaxCallDuration)
if err != nil {
if maxUsage, err = gev.GetMaxUsage(utils.META_DEFAULT, smg.cgrCfg.SmGenericConfig.MaxCallDuration); err != nil {
if err == utils.ErrNotFound {
err = utils.ErrMandatoryIeMissing
}
return nilDuration, err
return
}
aSessions := smg.getASession(gev.GetUUID())
if len(aSessions) == 0 {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> SessionUpdate with no active sessions for event: <%s>", gev.GetUUID()))
return nilDuration, utils.ErrServerError
err = utils.ErrServerError
return
}
for _, s := range aSessions {
if maxDur, err := s.debit(evMaxUsage, lastUsed); err != nil {
return nilDuration, err
} else if maxDur < evMaxUsage {
evMaxUsage = maxDur
var maxDur time.Duration
if maxDur, err = s.debit(maxUsage, lastUsed); err != nil {
return
} else if maxDur < maxUsage {
maxUsage = maxDur
}
}
return evMaxUsage, nil
return
}
// Called on session end, should stop debit loop
func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) error {
if initialID, err := gev.GetFieldAsString(utils.InitialOriginID); err == nil {
err := smg.sessionRelocate(gev.GetUUID(), initialID)
func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) (err error) {
cacheKey := "TerminateSession" + gev.GetCgrId(smg.timezone)
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return item.Err
}
defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Err: err})
if initialID, errGet := gev.GetFieldAsString(utils.InitialOriginID); errGet == nil {
err = smg.sessionRelocate(gev.GetUUID(), initialID)
defer smg.replicateSessions(initialID)
if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update
err = smg.sessionStart(gev, clnt)
}
if err != nil && err != utils.ErrMandatoryIeMissing {
return err
return
}
}
sessionIDs := []string{gev.GetUUID()}
if sessionIDPrefix, err := gev.GetFieldAsString(utils.OriginIDPrefix); err == nil { // OriginIDPrefix is present, OriginID will not be anymore considered
if sessionIDPrefix, errPrefix := gev.GetFieldAsString(utils.OriginIDPrefix); errPrefix == nil { // OriginIDPrefix is present, OriginID will not be anymore considered
sessionIDs = smg.getSessionIDsForPrefix(sessionIDPrefix)
}
usage, errUsage := gev.GetUsage(utils.META_DEFAULT)
var lastUsed time.Duration
if errUsage != nil {
if errUsage != utils.ErrNotFound {
return errUsage
err = errUsage
return
}
var err error
lastUsed, err = gev.GetLastUsed(utils.META_DEFAULT)
if err != nil {
if err == utils.ErrNotFound {
err = utils.ErrMandatoryIeMissing
}
return err
return
}
}
var interimError error
var hasActiveSession bool
for _, sessionID := range sessionIDs {
defer smg.replicateSessions(sessionID)
@@ -520,23 +569,29 @@ func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, clnt rpcclient.RpcCli
if errUsage != nil {
usage = s.TotalUsage - s.LastUsage + lastUsed
}
if err := smg.sessionEnd(sessionID, usage); err != nil {
interimError = err // Last error will be the one returned as API result
if errSEnd := smg.sessionEnd(sessionID, usage); errSEnd != nil {
err = errSEnd // Last error will be the one returned as API result
}
}
if !hasActiveSession {
return utils.ErrNoActiveSession
err = utils.ErrNoActiveSession
return
}
return interimError
return
}
// Processes one time events (eg: SMS)
func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxDur time.Duration, err error) {
func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxUsage time.Duration, err error) {
cacheKey := "ChargeEvent" + gev.GetCgrId(smg.timezone)
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return item.Value.(time.Duration), item.Err
}
defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Value: maxUsage, Err: err})
var sessionRuns []*engine.SessionRun
if err := smg.rater.Call("Responder.GetSessionRuns", gev.AsStoredCdr(smg.cgrCfg, smg.timezone), &sessionRuns); err != nil {
return nilDuration, err
if err = smg.rater.Call("Responder.GetSessionRuns", gev.AsStoredCdr(smg.cgrCfg, smg.timezone), &sessionRuns); err != nil {
return
} else if len(sessionRuns) == 0 {
return nilDuration, nil
return
}
var maxDurInit bool // Avoid differences between default 0 and received 0
for _, sR := range sessionRuns {
@@ -549,8 +604,8 @@ func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxDur time.Duration, err
if ccDur := cc.GetDuration(); ccDur == 0 {
err = utils.ErrInsufficientCredit
break
} else if !maxDurInit || ccDur < maxDur {
maxDur = ccDur
} else if !maxDurInit || ccDur < maxUsage {
maxUsage = ccDur
}
}
if err != nil { // Refund the ones already taken since we have error on one of the debits
@@ -579,13 +634,13 @@ func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxDur time.Duration, err
cd.Increments.Compress()
//utils.Logger.Info(fmt.Sprintf("Refunding session run callcost: %s", utils.ToJSON(cd)))
var response float64
err := smg.rater.Call("Responder.RefundIncrements", cd, &response)
err = smg.rater.Call("Responder.RefundIncrements", cd, &response)
if err != nil {
return nilDuration, err
return
}
}
}
return nilDuration, err
return
}
var withErrors bool
for _, sR := range sessionRuns {
@@ -604,8 +659,8 @@ func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxDur time.Duration, err
cd := cc.CreateCallDescriptor()
cd.Increments = roundIncrements
var response float64
if err := smg.rater.Call("Responder.RefundRounding", cd, &response); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM> ERROR failed to refund rounding: %v", err))
if errRefund := smg.rater.Call("Responder.RefundRounding", cd, &response); errRefund != nil {
utils.Logger.Err(fmt.Sprintf("<SM> ERROR failed to refund rounding: %v", errRefund))
}
}
var reply string
@@ -617,23 +672,30 @@ func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxDur time.Duration, err
OriginID: gev.GetUUID(),
CostDetails: cc,
}
if err := smg.cdrsrv.Call("CdrsV1.StoreSMCost", engine.AttrCDRSStoreSMCost{Cost: smCost, CheckDuplicate: true}, &reply); err != nil && !strings.HasSuffix(err.Error(), utils.ErrExists.Error()) {
if errStore := smg.cdrsrv.Call("CdrsV1.StoreSMCost", engine.AttrCDRSStoreSMCost{Cost: smCost,
CheckDuplicate: true}, &reply); errStore != nil && !strings.HasSuffix(errStore.Error(), utils.ErrExists.Error()) {
withErrors = true
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not save CC: %+v, RunID: %s error: %s", cc, sR.DerivedCharger.RunID, err.Error()))
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not save CC: %+v, RunID: %s error: %s", cc, sR.DerivedCharger.RunID, errStore.Error()))
}
}
if withErrors {
return nilDuration, ErrPartiallyExecuted
err = ErrPartiallyExecuted
return
}
return maxDur, nil
return
}
func (smg *SMGeneric) ProcessCDR(gev SMGenericEvent) error {
var reply string
if err := smg.cdrsrv.Call("CdrsV1.ProcessCDR", gev.AsStoredCdr(smg.cgrCfg, smg.timezone), &reply); err != nil {
return err
func (smg *SMGeneric) ProcessCDR(gev SMGenericEvent) (err error) {
cacheKey := "ProcessCDR" + gev.GetCgrId(smg.timezone)
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return item.Err
}
return nil
defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Err: err})
var reply string
if err = smg.cdrsrv.Call("CdrsV1.ProcessCDR", gev.AsStoredCdr(smg.cgrCfg, smg.timezone), &reply); err != nil {
return
}
return
}
func (smg *SMGeneric) Connect() error {