diff --git a/general_tests/tutorial_calls_test.go b/general_tests/tutorial_calls_test.go index 5742d61eb..644d6d828 100755 --- a/general_tests/tutorial_calls_test.go +++ b/general_tests/tutorial_calls_test.go @@ -594,6 +594,29 @@ func testCallSyncSessions(t *testing.T) { } else if len(*reply) != 2 { t.Errorf("expecting 2, received reply: %+v", utils.ToJSON(reply)) } + //check if resource was allocated for 2 calls(1001->1002;1001->1003) + var rs *engine.Resources + args := &utils.ArgRSv1ResourceUsage{ + CGREvent: utils.CGREvent{ + Tenant: "cgrates.org", + ID: "ResourceAllocation", + Event: map[string]interface{}{ + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002"}, + }, + Units: 1, + } + if err := tutorialCallsRpc.Call(utils.ResourceSv1GetResourcesForEvent, args, &rs); err != nil { + t.Error(err) + } else if len(*rs) != 2 { + t.Errorf("Resources: %+v", utils.ToJSON(rs)) + } + for _, r := range *rs { + if r.ID == "ResGroup1" && (len(r.Usages) != 2 || len(r.TTLIdx) != 2) { + t.Errorf("Unexpected resource: %+v", utils.ToJSON(r)) + } + } time.Sleep(3 * time.Second) //stop the FS @@ -656,6 +679,19 @@ func testCallSyncSessions(t *testing.T) { } } + //check if resource was released + var rsAfter *engine.Resources + if err := tutorialCallsRpc.Call(utils.ResourceSv1GetResourcesForEvent, args, &rsAfter); err != nil { + t.Error(err) + } else if len(*rsAfter) != 2 { + t.Errorf("Resources: %+v", rsAfter) + } + for _, r := range *rsAfter { + if r.ID == "ResGroup1" && + (len(r.Usages) != 0 || len(r.TTLIdx) != 0) { + t.Errorf("Unexpected resource: %+v", utils.ToJSON(r)) + } + } } func testCallStopPjsuaListener(t *testing.T) { diff --git a/sessions/session.go b/sessions/session.go index 763b03dbe..9112347da 100644 --- a/sessions/session.go +++ b/sessions/session.go @@ -44,6 +44,7 @@ type SMGSession struct { Timezone string EventStart SMGenericEvent // Event which started the session CD *engine.CallDescriptor // initial CD used for debits, updated on each debit + ResourceID string EventCost *engine.EventCost ExtraDuration time.Duration // keeps the current duration debited on top of what heas been asked diff --git a/sessions/sessions.go b/sessions/sessions.go index 50625d808..f046f0999 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -225,6 +225,23 @@ func (smg *SMGeneric) ttlTerminate(s *SMGSession, tmtr *smgSessionTerminator) { cdr.Usage = s.TotalUsage var reply string smg.cdrsrv.Call("CdrsV1.ProcessCDR", cdr, &reply) + if smg.resS != nil && s.ResourceID != "" { + var reply string + argsRU := utils.ArgRSv1ResourceUsage{ + CGREvent: utils.CGREvent{ + Tenant: s.EventStart.GetTenant(utils.META_DEFAULT), + Event: s.EventStart, + }, + UsageID: s.ResourceID, + Units: 1, + } + if err := smg.resS.Call(utils.ResourceSv1ReleaseResources, + argsRU, &reply); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s could not release resource with resourceID: %s", + utils.SessionS, err.Error(), s.ResourceID)) + } + } smg.replicateSessionsWithID(s.CGRID, false, smg.smgReplConns) } @@ -398,7 +415,7 @@ func (smg *SMGeneric) getSessionIDsForPrefix(prefix string, passiveSessions bool // sessionStart will handle a new session, pass the connectionId so we can communicate on disconnect request func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, - clntConn rpcclient.RpcClientConnection) (err error) { + clntConn rpcclient.RpcClientConnection, resourceID string) (err error) { cgrID := evStart.GetCGRID(utils.META_DEFAULT) _, err = guardian.Guardian.Guard(func() (interface{}, error) { // Lock it on CGRID level if pSS := smg.passiveToActive(cgrID); len(pSS) != 0 { @@ -409,7 +426,7 @@ func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, evStart.AsCDR(smg.cgrCfg, smg.Timezone), &sessionRuns); err != nil { return nil, err } else if len(sessionRuns) == 0 { - s := &SMGSession{CGRID: cgrID, EventStart: evStart, + s := &SMGSession{CGRID: cgrID, ResourceID: resourceID, EventStart: evStart, RunID: utils.META_NONE, Timezone: smg.Timezone, rals: smg.rals, cdrsrv: smg.cdrsrv, clntConn: clntConn} @@ -418,7 +435,7 @@ func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, } stopDebitChan := make(chan struct{}) for _, sessionRun := range sessionRuns { - s := &SMGSession{CGRID: cgrID, EventStart: evStart, + s := &SMGSession{CGRID: cgrID, ResourceID: resourceID, EventStart: evStart, RunID: sessionRun.DerivedCharger.RunID, Timezone: smg.Timezone, rals: smg.rals, cdrsrv: smg.cdrsrv, CD: sessionRun.CallDescriptor, clntConn: clntConn, @@ -722,7 +739,7 @@ func (smg *SMGeneric) GetMaxUsage(gev SMGenericEvent) (maxUsage time.Duration, e // Called on session start func (smg *SMGeneric) InitiateSession(gev SMGenericEvent, - clnt rpcclient.RpcClientConnection) (maxUsage time.Duration, err error) { + clnt rpcclient.RpcClientConnection, resourceID string) (maxUsage time.Duration, err error) { cgrID := gev.GetCGRID(utils.META_DEFAULT) cacheKey := "InitiateSession" + cgrID if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { @@ -731,7 +748,7 @@ func (smg *SMGeneric) InitiateSession(gev SMGenericEvent, defer smg.responseCache.Cache(cacheKey, &utils.ResponseCacheItem{Value: maxUsage, Err: err}) // schedule response caching smg.deletePassiveSessions(cgrID) - if err = smg.sessionStart(gev, clnt); err != nil { + if err = smg.sessionStart(gev, clnt, resourceID); err != nil { smg.sessionEnd(cgrID, 0) return } @@ -739,7 +756,7 @@ func (smg *SMGeneric) InitiateSession(gev SMGenericEvent, maxUsage = time.Duration(-1) return } - maxUsage, err = smg.UpdateSession(gev, clnt) + maxUsage, err = smg.UpdateSession(gev, clnt, resourceID) if err != nil || maxUsage == 0 { smg.sessionEnd(cgrID, 0) } @@ -748,7 +765,7 @@ func (smg *SMGeneric) InitiateSession(gev SMGenericEvent, // Execute debits for usage/maxUsage func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, - clnt rpcclient.RpcClientConnection) (maxUsage time.Duration, err error) { + clnt rpcclient.RpcClientConnection, resourceID string) (maxUsage time.Duration, err error) { cgrID := gev.GetCGRID(utils.META_DEFAULT) cacheKey := "UpdateSession" + cgrID if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { @@ -765,7 +782,7 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, err = smg.sessionRelocate(initialCGRID, cgrID, gev.GetOriginID(utils.META_DEFAULT)) if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update - err = smg.sessionStart(gev, clnt) + err = smg.sessionStart(gev, clnt, resourceID) } if err != nil { return @@ -820,7 +837,7 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, // Called on session end, should stop debit loop func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, - clnt rpcclient.RpcClientConnection) (err error) { + clnt rpcclient.RpcClientConnection, resourceID string) (err error) { cgrID := gev.GetCGRID(utils.META_DEFAULT) cacheKey := "TerminateSession" + cgrID if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { @@ -832,7 +849,7 @@ func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, err = smg.sessionRelocate(initialCGRID, cgrID, gev.GetOriginID(utils.META_DEFAULT)) if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update - err = smg.sessionStart(gev, clnt) + err = smg.sessionStart(gev, clnt, resourceID) } if err != nil && err != utils.ErrMandatoryIeMissing { return @@ -1104,7 +1121,7 @@ func (smg *SMGeneric) BiRPCV2GetMaxUsage(clnt rpcclient.RpcClientConnection, func (smg *SMGeneric) BiRPCV1InitiateSession(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, maxUsage *float64) (err error) { var minMaxUsage time.Duration - if minMaxUsage, err = smg.InitiateSession(ev, clnt); err != nil { + if minMaxUsage, err = smg.InitiateSession(ev, clnt, ev.GetOriginID(utils.META_DEFAULT)); err != nil { if err != rpcclient.ErrSessionNotFound { err = utils.NewErrServerError(err) } @@ -1122,7 +1139,7 @@ func (smg *SMGeneric) BiRPCV1InitiateSession(clnt rpcclient.RpcClientConnection, func (smg *SMGeneric) BiRPCV2InitiateSession(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, maxUsage *time.Duration) (err error) { var minMaxUsage time.Duration - if minMaxUsage, err = smg.InitiateSession(ev, clnt); err != nil { + if minMaxUsage, err = smg.InitiateSession(ev, clnt, ev.GetOriginID(utils.META_DEFAULT)); err != nil { if err != rpcclient.ErrSessionNotFound { err = utils.NewErrServerError(err) } @@ -1137,7 +1154,7 @@ func (smg *SMGeneric) BiRPCV2InitiateSession(clnt rpcclient.RpcClientConnection, func (smg *SMGeneric) BiRPCV1UpdateSession(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, maxUsage *float64) (err error) { var minMaxUsage time.Duration - if minMaxUsage, err = smg.UpdateSession(ev, clnt); err != nil { + if minMaxUsage, err = smg.UpdateSession(ev, clnt, ev.GetOriginID(utils.META_DEFAULT)); err != nil { if err != rpcclient.ErrSessionNotFound { err = utils.NewErrServerError(err) } @@ -1155,7 +1172,7 @@ func (smg *SMGeneric) BiRPCV1UpdateSession(clnt rpcclient.RpcClientConnection, func (smg *SMGeneric) BiRPCV2UpdateSession(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, maxUsage *time.Duration) (err error) { var minMaxUsage time.Duration - if minMaxUsage, err = smg.UpdateSession(ev, clnt); err != nil { + if minMaxUsage, err = smg.UpdateSession(ev, clnt, ev.GetOriginID(utils.META_DEFAULT)); err != nil { if err != rpcclient.ErrSessionNotFound { err = utils.NewErrServerError(err) } @@ -1168,7 +1185,7 @@ func (smg *SMGeneric) BiRPCV2UpdateSession(clnt rpcclient.RpcClientConnection, // Called on session end, should stop debit loop func (smg *SMGeneric) BiRPCV1TerminateSession(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, reply *string) (err error) { - if err = smg.TerminateSession(ev, clnt); err != nil { + if err = smg.TerminateSession(ev, clnt, ev.GetOriginID(utils.META_DEFAULT)); err != nil { if err != rpcclient.ErrSessionNotFound { err = utils.NewErrServerError(err) } @@ -1677,7 +1694,11 @@ func (smg *SMGeneric) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection, if smg.rals == nil { return utils.NewErrNotConnected(utils.RALService) } - if maxUsage, err := smg.InitiateSession(args.CGREvent.Event, clnt); err != nil { + originID, err := args.CGREvent.FieldAsString(utils.OriginID) + if err != nil { + return utils.NewErrMandatoryIeMissing(utils.OriginID) + } + if maxUsage, err := smg.InitiateSession(args.CGREvent.Event, clnt, originID); err != nil { return utils.NewErrRALs(err) } else { rply.MaxUsage = &maxUsage @@ -1826,7 +1847,11 @@ func (smg *SMGeneric) BiRPCv1UpdateSession(clnt rpcclient.RpcClientConnection, if smg.rals == nil { return utils.NewErrNotConnected(utils.RALService) } - if maxUsage, err := smg.UpdateSession(args.CGREvent.Event, clnt); err != nil { + originID, err := args.CGREvent.FieldAsString(utils.OriginID) + if err != nil { + return utils.NewErrMandatoryIeMissing(utils.OriginID) + } + if maxUsage, err := smg.UpdateSession(args.CGREvent.Event, clnt, originID); err != nil { return utils.NewErrRALs(err) } else { rply.MaxUsage = &maxUsage @@ -1869,7 +1894,11 @@ func (smg *SMGeneric) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection if smg.rals == nil { return utils.NewErrNotConnected(utils.RALService) } - if err = smg.TerminateSession(args.CGREvent.Event, clnt); err != nil { + originID, err := args.CGREvent.FieldAsString(utils.OriginID) + if err != nil { + return utils.NewErrMandatoryIeMissing(utils.OriginID) + } + if err = smg.TerminateSession(args.CGREvent.Event, clnt, originID); err != nil { return utils.NewErrRALs(err) } }