Release resource for sync sessions

This commit is contained in:
TeoV
2018-06-27 09:46:46 -04:00
committed by Dan Christian Bogos
parent d50812e3a7
commit 120b104f85
3 changed files with 84 additions and 18 deletions

View File

@@ -594,6 +594,29 @@ func testCallSyncSessions(t *testing.T) {
} else if len(*reply) != 2 {
t.Errorf("expecting 2, received reply: %+v", utils.ToJSON(reply))
}
//check if resource was allocated for 2 calls(1001->1002;1001->1003)
var rs *engine.Resources
args := &utils.ArgRSv1ResourceUsage{
CGREvent: utils.CGREvent{
Tenant: "cgrates.org",
ID: "ResourceAllocation",
Event: map[string]interface{}{
utils.Account: "1001",
utils.Subject: "1001",
utils.Destination: "1002"},
},
Units: 1,
}
if err := tutorialCallsRpc.Call(utils.ResourceSv1GetResourcesForEvent, args, &rs); err != nil {
t.Error(err)
} else if len(*rs) != 2 {
t.Errorf("Resources: %+v", utils.ToJSON(rs))
}
for _, r := range *rs {
if r.ID == "ResGroup1" && (len(r.Usages) != 2 || len(r.TTLIdx) != 2) {
t.Errorf("Unexpected resource: %+v", utils.ToJSON(r))
}
}
time.Sleep(3 * time.Second)
//stop the FS
@@ -656,6 +679,19 @@ func testCallSyncSessions(t *testing.T) {
}
}
//check if resource was released
var rsAfter *engine.Resources
if err := tutorialCallsRpc.Call(utils.ResourceSv1GetResourcesForEvent, args, &rsAfter); err != nil {
t.Error(err)
} else if len(*rsAfter) != 2 {
t.Errorf("Resources: %+v", rsAfter)
}
for _, r := range *rsAfter {
if r.ID == "ResGroup1" &&
(len(r.Usages) != 0 || len(r.TTLIdx) != 0) {
t.Errorf("Unexpected resource: %+v", utils.ToJSON(r))
}
}
}
func testCallStopPjsuaListener(t *testing.T) {

View File

@@ -44,6 +44,7 @@ type SMGSession struct {
Timezone string
EventStart SMGenericEvent // Event which started the session
CD *engine.CallDescriptor // initial CD used for debits, updated on each debit
ResourceID string
EventCost *engine.EventCost
ExtraDuration time.Duration // keeps the current duration debited on top of what heas been asked

View File

@@ -225,6 +225,23 @@ func (smg *SMGeneric) ttlTerminate(s *SMGSession, tmtr *smgSessionTerminator) {
cdr.Usage = s.TotalUsage
var reply string
smg.cdrsrv.Call("CdrsV1.ProcessCDR", cdr, &reply)
if smg.resS != nil && s.ResourceID != "" {
var reply string
argsRU := utils.ArgRSv1ResourceUsage{
CGREvent: utils.CGREvent{
Tenant: s.EventStart.GetTenant(utils.META_DEFAULT),
Event: s.EventStart,
},
UsageID: s.ResourceID,
Units: 1,
}
if err := smg.resS.Call(utils.ResourceSv1ReleaseResources,
argsRU, &reply); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s could not release resource with resourceID: %s",
utils.SessionS, err.Error(), s.ResourceID))
}
}
smg.replicateSessionsWithID(s.CGRID, false, smg.smgReplConns)
}
@@ -398,7 +415,7 @@ func (smg *SMGeneric) getSessionIDsForPrefix(prefix string, passiveSessions bool
// sessionStart will handle a new session, pass the connectionId so we can communicate on disconnect request
func (smg *SMGeneric) sessionStart(evStart SMGenericEvent,
clntConn rpcclient.RpcClientConnection) (err error) {
clntConn rpcclient.RpcClientConnection, resourceID string) (err error) {
cgrID := evStart.GetCGRID(utils.META_DEFAULT)
_, err = guardian.Guardian.Guard(func() (interface{}, error) { // Lock it on CGRID level
if pSS := smg.passiveToActive(cgrID); len(pSS) != 0 {
@@ -409,7 +426,7 @@ func (smg *SMGeneric) sessionStart(evStart SMGenericEvent,
evStart.AsCDR(smg.cgrCfg, smg.Timezone), &sessionRuns); err != nil {
return nil, err
} else if len(sessionRuns) == 0 {
s := &SMGSession{CGRID: cgrID, EventStart: evStart,
s := &SMGSession{CGRID: cgrID, ResourceID: resourceID, EventStart: evStart,
RunID: utils.META_NONE, Timezone: smg.Timezone,
rals: smg.rals, cdrsrv: smg.cdrsrv,
clntConn: clntConn}
@@ -418,7 +435,7 @@ func (smg *SMGeneric) sessionStart(evStart SMGenericEvent,
}
stopDebitChan := make(chan struct{})
for _, sessionRun := range sessionRuns {
s := &SMGSession{CGRID: cgrID, EventStart: evStart,
s := &SMGSession{CGRID: cgrID, ResourceID: resourceID, EventStart: evStart,
RunID: sessionRun.DerivedCharger.RunID, Timezone: smg.Timezone,
rals: smg.rals, cdrsrv: smg.cdrsrv,
CD: sessionRun.CallDescriptor, clntConn: clntConn,
@@ -722,7 +739,7 @@ func (smg *SMGeneric) GetMaxUsage(gev SMGenericEvent) (maxUsage time.Duration, e
// Called on session start
func (smg *SMGeneric) InitiateSession(gev SMGenericEvent,
clnt rpcclient.RpcClientConnection) (maxUsage time.Duration, err error) {
clnt rpcclient.RpcClientConnection, resourceID string) (maxUsage time.Duration, err error) {
cgrID := gev.GetCGRID(utils.META_DEFAULT)
cacheKey := "InitiateSession" + cgrID
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
@@ -731,7 +748,7 @@ func (smg *SMGeneric) InitiateSession(gev SMGenericEvent,
defer smg.responseCache.Cache(cacheKey,
&utils.ResponseCacheItem{Value: maxUsage, Err: err}) // schedule response caching
smg.deletePassiveSessions(cgrID)
if err = smg.sessionStart(gev, clnt); err != nil {
if err = smg.sessionStart(gev, clnt, resourceID); err != nil {
smg.sessionEnd(cgrID, 0)
return
}
@@ -739,7 +756,7 @@ func (smg *SMGeneric) InitiateSession(gev SMGenericEvent,
maxUsage = time.Duration(-1)
return
}
maxUsage, err = smg.UpdateSession(gev, clnt)
maxUsage, err = smg.UpdateSession(gev, clnt, resourceID)
if err != nil || maxUsage == 0 {
smg.sessionEnd(cgrID, 0)
}
@@ -748,7 +765,7 @@ func (smg *SMGeneric) InitiateSession(gev SMGenericEvent,
// Execute debits for usage/maxUsage
func (smg *SMGeneric) UpdateSession(gev SMGenericEvent,
clnt rpcclient.RpcClientConnection) (maxUsage time.Duration, err error) {
clnt rpcclient.RpcClientConnection, resourceID string) (maxUsage time.Duration, err error) {
cgrID := gev.GetCGRID(utils.META_DEFAULT)
cacheKey := "UpdateSession" + cgrID
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
@@ -765,7 +782,7 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent,
err = smg.sessionRelocate(initialCGRID,
cgrID, gev.GetOriginID(utils.META_DEFAULT))
if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update
err = smg.sessionStart(gev, clnt)
err = smg.sessionStart(gev, clnt, resourceID)
}
if err != nil {
return
@@ -820,7 +837,7 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent,
// Called on session end, should stop debit loop
func (smg *SMGeneric) TerminateSession(gev SMGenericEvent,
clnt rpcclient.RpcClientConnection) (err error) {
clnt rpcclient.RpcClientConnection, resourceID string) (err error) {
cgrID := gev.GetCGRID(utils.META_DEFAULT)
cacheKey := "TerminateSession" + cgrID
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
@@ -832,7 +849,7 @@ func (smg *SMGeneric) TerminateSession(gev SMGenericEvent,
err = smg.sessionRelocate(initialCGRID, cgrID,
gev.GetOriginID(utils.META_DEFAULT))
if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update
err = smg.sessionStart(gev, clnt)
err = smg.sessionStart(gev, clnt, resourceID)
}
if err != nil && err != utils.ErrMandatoryIeMissing {
return
@@ -1104,7 +1121,7 @@ func (smg *SMGeneric) BiRPCV2GetMaxUsage(clnt rpcclient.RpcClientConnection,
func (smg *SMGeneric) BiRPCV1InitiateSession(clnt rpcclient.RpcClientConnection,
ev SMGenericEvent, maxUsage *float64) (err error) {
var minMaxUsage time.Duration
if minMaxUsage, err = smg.InitiateSession(ev, clnt); err != nil {
if minMaxUsage, err = smg.InitiateSession(ev, clnt, ev.GetOriginID(utils.META_DEFAULT)); err != nil {
if err != rpcclient.ErrSessionNotFound {
err = utils.NewErrServerError(err)
}
@@ -1122,7 +1139,7 @@ func (smg *SMGeneric) BiRPCV1InitiateSession(clnt rpcclient.RpcClientConnection,
func (smg *SMGeneric) BiRPCV2InitiateSession(clnt rpcclient.RpcClientConnection,
ev SMGenericEvent, maxUsage *time.Duration) (err error) {
var minMaxUsage time.Duration
if minMaxUsage, err = smg.InitiateSession(ev, clnt); err != nil {
if minMaxUsage, err = smg.InitiateSession(ev, clnt, ev.GetOriginID(utils.META_DEFAULT)); err != nil {
if err != rpcclient.ErrSessionNotFound {
err = utils.NewErrServerError(err)
}
@@ -1137,7 +1154,7 @@ func (smg *SMGeneric) BiRPCV2InitiateSession(clnt rpcclient.RpcClientConnection,
func (smg *SMGeneric) BiRPCV1UpdateSession(clnt rpcclient.RpcClientConnection,
ev SMGenericEvent, maxUsage *float64) (err error) {
var minMaxUsage time.Duration
if minMaxUsage, err = smg.UpdateSession(ev, clnt); err != nil {
if minMaxUsage, err = smg.UpdateSession(ev, clnt, ev.GetOriginID(utils.META_DEFAULT)); err != nil {
if err != rpcclient.ErrSessionNotFound {
err = utils.NewErrServerError(err)
}
@@ -1155,7 +1172,7 @@ func (smg *SMGeneric) BiRPCV1UpdateSession(clnt rpcclient.RpcClientConnection,
func (smg *SMGeneric) BiRPCV2UpdateSession(clnt rpcclient.RpcClientConnection,
ev SMGenericEvent, maxUsage *time.Duration) (err error) {
var minMaxUsage time.Duration
if minMaxUsage, err = smg.UpdateSession(ev, clnt); err != nil {
if minMaxUsage, err = smg.UpdateSession(ev, clnt, ev.GetOriginID(utils.META_DEFAULT)); err != nil {
if err != rpcclient.ErrSessionNotFound {
err = utils.NewErrServerError(err)
}
@@ -1168,7 +1185,7 @@ func (smg *SMGeneric) BiRPCV2UpdateSession(clnt rpcclient.RpcClientConnection,
// Called on session end, should stop debit loop
func (smg *SMGeneric) BiRPCV1TerminateSession(clnt rpcclient.RpcClientConnection,
ev SMGenericEvent, reply *string) (err error) {
if err = smg.TerminateSession(ev, clnt); err != nil {
if err = smg.TerminateSession(ev, clnt, ev.GetOriginID(utils.META_DEFAULT)); err != nil {
if err != rpcclient.ErrSessionNotFound {
err = utils.NewErrServerError(err)
}
@@ -1677,7 +1694,11 @@ func (smg *SMGeneric) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection,
if smg.rals == nil {
return utils.NewErrNotConnected(utils.RALService)
}
if maxUsage, err := smg.InitiateSession(args.CGREvent.Event, clnt); err != nil {
originID, err := args.CGREvent.FieldAsString(utils.OriginID)
if err != nil {
return utils.NewErrMandatoryIeMissing(utils.OriginID)
}
if maxUsage, err := smg.InitiateSession(args.CGREvent.Event, clnt, originID); err != nil {
return utils.NewErrRALs(err)
} else {
rply.MaxUsage = &maxUsage
@@ -1826,7 +1847,11 @@ func (smg *SMGeneric) BiRPCv1UpdateSession(clnt rpcclient.RpcClientConnection,
if smg.rals == nil {
return utils.NewErrNotConnected(utils.RALService)
}
if maxUsage, err := smg.UpdateSession(args.CGREvent.Event, clnt); err != nil {
originID, err := args.CGREvent.FieldAsString(utils.OriginID)
if err != nil {
return utils.NewErrMandatoryIeMissing(utils.OriginID)
}
if maxUsage, err := smg.UpdateSession(args.CGREvent.Event, clnt, originID); err != nil {
return utils.NewErrRALs(err)
} else {
rply.MaxUsage = &maxUsage
@@ -1869,7 +1894,11 @@ func (smg *SMGeneric) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection
if smg.rals == nil {
return utils.NewErrNotConnected(utils.RALService)
}
if err = smg.TerminateSession(args.CGREvent.Event, clnt); err != nil {
originID, err := args.CGREvent.FieldAsString(utils.OriginID)
if err != nil {
return utils.NewErrMandatoryIeMissing(utils.OriginID)
}
if err = smg.TerminateSession(args.CGREvent.Event, clnt, originID); err != nil {
return utils.NewErrRALs(err)
}
}