Rename SessionSv2.ProcessEvent to SessionSv1.ProcessEvent

This commit is contained in:
TeoV
2019-07-15 16:20:48 +03:00
committed by Dan Christian Bogos
parent 561175e0b3
commit d0bc5fba9b
2 changed files with 122 additions and 70 deletions

View File

@@ -3015,7 +3015,7 @@ func (sS *SessionS) BiRPCv1ProcessMessage(clnt rpcclient.RpcClientConnection,
}
// V2ProcessEventArgs are the options passed to ProcessEvent API
type V2ProcessEventArgs struct {
type V1ProcessEventArgs struct {
Flags []string
*utils.CGREvent
utils.Paginator
@@ -3023,10 +3023,11 @@ type V2ProcessEventArgs struct {
}
// V2ProcessEventReply is the reply for the ProcessEvent API
type V2ProcessEventReply struct {
type V1ProcessEventReply struct {
MaxUsage *time.Duration
ResourceAuthorization *string
ResourceAllocation *string
ResourceRelease *string
Attributes *engine.AttrSProcessEventReply
Suppliers *engine.SortedSuppliers
ThresholdIDs *[]string
@@ -3034,15 +3035,15 @@ type V2ProcessEventReply struct {
}
// BiRPCv2ProcessEvent processes one event with the right subsystems based on arguments received
func (sS *SessionS) BiRPCv2ProcessEvent(clnt rpcclient.RpcClientConnection,
args *V2ProcessEventArgs, rply *V2ProcessEventReply) (err error) {
func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection,
args *V1ProcessEventArgs, rply *V1ProcessEventReply) (err error) {
if args.CGREvent.ID == "" {
args.CGREvent.ID = utils.GenUUID()
}
// RPC caching
if sS.cgrCfg.CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.SessionSv2ProcessEvent, args.CGREvent.ID)
cacheKey := utils.ConcatenatedKey(utils.SessionSv1ProcessEvent, args.CGREvent.ID)
refID := guardian.Guardian.GuardIDs("",
sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
@@ -3050,7 +3051,7 @@ func (sS *SessionS) BiRPCv2ProcessEvent(clnt rpcclient.RpcClientConnection,
if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*rply = *cachedResp.Result.(*V2ProcessEventReply)
*rply = *cachedResp.Result.(*V1ProcessEventReply)
}
return cachedResp.Error
}
@@ -3064,14 +3065,15 @@ func (sS *SessionS) BiRPCv2ProcessEvent(clnt rpcclient.RpcClientConnection,
args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant
}
me := engine.NewMapEvent(args.CGREvent.Event)
ev := engine.NewSafEvent(args.CGREvent.Event)
originID := me.GetStringIgnoreErrors(utils.OriginID)
dbtItvl := sS.cgrCfg.SessionSCfg().DebitInterval
//convert from Flags []string to utils.FlagsWithParams
argsFlagsWithParams, err := utils.FlagsWithParamsFromSlice(args.Flags)
if err != nil {
return
}
// check for *attribute
if argsFlagsWithParams.HasKey(utils.MetaAttributes) {
if sS.attrS == nil {
@@ -3099,54 +3101,69 @@ func (sS *SessionS) BiRPCv2ProcessEvent(clnt rpcclient.RpcClientConnection,
return utils.NewErrAttributeS(err)
}
}
//we create a resFunc because in case of authorize we need to authorize or allocate
//a resource after the session was authorized; in case of init/update/terminate
//we need to do this after the session was initiated/updated/terminated
resFunc := func() (err error) {
// check for *resources
if argsFlagsWithParams.HasKey(utils.MetaResources) {
if sS.resS == nil {
return utils.NewErrNotConnected(utils.ResourceS)
}
if originID == "" {
return utils.NewErrMandatoryIeMissing(utils.OriginID)
}
attrRU := utils.ArgRSv1ResourceUsage{
CGREvent: args.CGREvent,
UsageID: originID,
Units: 1,
ArgDispatcher: args.ArgDispatcher,
}
var resMessage string
// check what we need to do for resources (*authorization/*allocation)
if resOpt := argsFlagsWithParams.ParamsSlice(utils.MetaResources); len(resOpt) != 0 {
//check for subflags and convert them into utils.FlagsWithParams
resourceFlagsWithParams, err := utils.FlagsWithParamsFromSlice(resOpt)
if err != nil {
return err
}
if resourceFlagsWithParams.HasKey(utils.MetaAuthorize) {
if err = sS.resS.Call(utils.ResourceSv1AuthorizeResources,
attrRU, &resMessage); err != nil {
return utils.NewErrResourceS(err)
}
rply.ResourceAuthorization = &resMessage
}
if resourceFlagsWithParams.HasKey(utils.MetaAllocate) {
if err = sS.resS.Call(utils.ResourceSv1AllocateResources,
attrRU, &resMessage); err != nil {
return utils.NewErrResourceS(err)
}
rply.ResourceAllocation = &resMessage
}
}
}
return
}
//check for *auth/*init/*update/*terminate flags
//only one of them can be executed
switch {
//check for auth session
if argsFlagsWithParams.HasKey(utils.MetaAuth) {
case argsFlagsWithParams.HasKey(utils.MetaAuth):
maxUsage, err := sS.authSession(args.CGREvent.Tenant,
engine.NewSafEvent(args.CGREvent.Event))
if err != nil {
return utils.NewErrRALs(err)
}
rply.MaxUsage = &maxUsage
}
// check for *resources
if argsFlagsWithParams.HasKey(utils.MetaResources) {
if sS.resS == nil {
return utils.NewErrNotConnected(utils.ResourceS)
if err = resFunc(); err != nil {
return err
}
if originID == "" {
return utils.NewErrMandatoryIeMissing(utils.OriginID)
}
attrRU := utils.ArgRSv1ResourceUsage{
CGREvent: args.CGREvent,
UsageID: originID,
Units: 1,
ArgDispatcher: args.ArgDispatcher,
}
var resMessage string
// check what we need to do for resources (*authorization/*allocation)
if resOpt := argsFlagsWithParams.ParamsSlice(utils.MetaResources); len(resOpt) != 0 {
if resOpt[0] == utils.MetaAuthorize {
if err = sS.resS.Call(utils.ResourceSv1AuthorizeResources,
attrRU, &resMessage); err != nil {
return utils.NewErrResourceS(err)
}
rply.ResourceAuthorization = &resMessage
}
if resOpt[0] == utils.MetaAllocate {
if err = sS.resS.Call(utils.ResourceSv1AllocateResources,
attrRU, &resMessage); err != nil {
return utils.NewErrResourceS(err)
}
rply.ResourceAllocation = &resMessage
}
}
}
// check for init session
if argsFlagsWithParams.HasKey(utils.MetaInitiate) {
var err error
ev := engine.NewSafEvent(args.CGREvent.Event)
dbtItvl := sS.cgrCfg.SessionSCfg().DebitInterval
case argsFlagsWithParams.HasKey(utils.MetaInitiate):
if err = resFunc(); err != nil {
return err
}
if ev.HasField(utils.CGRDebitInterval) { // dynamic DebitInterval via CGRDebitInterval
if dbtItvl, err = ev.GetDuration(utils.CGRDebitInterval); err != nil {
return utils.NewErrRALs(err)
@@ -3166,14 +3183,42 @@ func (sS *SessionS) BiRPCv2ProcessEvent(clnt rpcclient.RpcClientConnection,
rply.MaxUsage = &maxUsage
}
}
}
// check for terminate session
if argsFlagsWithParams.HasKey(utils.MetaTerminate) {
if originID == "" {
return utils.NewErrMandatoryIeMissing(utils.OriginID)
//check for update session
case argsFlagsWithParams.HasKey(utils.MetaUpdate):
if err = resFunc(); err != nil {
return err
}
if me.HasField(utils.CGRDebitInterval) { // dynamic DebitInterval via CGRDebitInterval
if dbtItvl, err = me.GetDuration(utils.CGRDebitInterval); err != nil {
return utils.NewErrRALs(err)
}
}
ev := engine.NewSafEvent(args.CGREvent.Event)
dbtItvl := sS.cgrCfg.SessionSCfg().DebitInterval
cgrID := GetSetCGRID(ev)
ss := sS.getRelocateSessions(cgrID,
me.GetStringIgnoreErrors(utils.InitialOriginID),
me.GetStringIgnoreErrors(utils.OriginID),
me.GetStringIgnoreErrors(utils.OriginHost))
var s *Session
if len(ss) == 0 {
if s, err = sS.initSession(args.CGREvent.Tenant,
ev, sS.biJClntID(clnt),
me.GetStringIgnoreErrors(utils.OriginID), dbtItvl, args.ArgDispatcher); err != nil {
return utils.NewErrRALs(err)
}
} else {
s = ss[0]
}
if maxUsage, err := sS.updateSession(s, ev.AsMapInterface()); err != nil {
return utils.NewErrRALs(err)
} else {
rply.MaxUsage = &maxUsage
}
// check for terminate session
case argsFlagsWithParams.HasKey(utils.MetaTerminate):
if err = resFunc(); err != nil {
return err
}
if ev.HasField(utils.CGRDebitInterval) { // dynamic DebitInterval via CGRDebitInterval
if dbtItvl, err = ev.GetDuration(utils.CGRDebitInterval); err != nil {
return utils.NewErrRALs(err)
@@ -3203,25 +3248,32 @@ func (sS *SessionS) BiRPCv2ProcessEvent(clnt rpcclient.RpcClientConnection,
}
// in case we need to release a resource we do this after we close the session
if argsFlagsWithParams.HasKey(utils.MetaResources) {
if resOpt := argsFlagsWithParams.ParamsSlice(utils.MetaResources); len(resOpt) != 0 &&
resOpt[0] == utils.MetaRelease {
if sS.resS == nil {
return utils.NewErrNotConnected(utils.ResourceS)
if sS.resS == nil {
return utils.NewErrNotConnected(utils.ResourceS)
}
if originID == "" {
return utils.NewErrMandatoryIeMissing(utils.OriginID)
}
var reply string
argsRU := utils.ArgRSv1ResourceUsage{
CGREvent: args.CGREvent,
UsageID: originID, // same ID should be accepted by first group since the previous resource should be expired
Units: 1,
ArgDispatcher: args.ArgDispatcher,
}
if resOpt := argsFlagsWithParams.ParamsSlice(utils.MetaResources); len(resOpt) != 0 {
//check for subflags and convert them into utils.FlagsWithParams
resourceFlagsWithParams, err := utils.FlagsWithParamsFromSlice(resOpt)
if err != nil {
return err
}
if originID == "" {
return utils.NewErrMandatoryIeMissing(utils.OriginID)
}
var reply string
argsRU := utils.ArgRSv1ResourceUsage{
CGREvent: args.CGREvent,
UsageID: originID, // same ID should be accepted by first group since the previous resource should be expired
Units: 1,
ArgDispatcher: args.ArgDispatcher,
}
if err = sS.resS.Call(utils.ResourceSv1ReleaseResources,
argsRU, &reply); err != nil {
return utils.NewErrResourceS(err)
if resourceFlagsWithParams.HasKey(utils.MetaRelease) {
if err = sS.resS.Call(utils.ResourceSv1ReleaseResources,
argsRU, &reply); err != nil {
return utils.NewErrResourceS(err)
}
}
rply.ResourceRelease = &reply
}
}
// get suppliers if required

View File

@@ -895,7 +895,7 @@ const (
SessionSv1TerminateSession = "SessionSv1.TerminateSession"
SessionSv1ProcessCDR = "SessionSv1.ProcessCDR"
SessionSv1ProcessMessage = "SessionSv1.ProcessMessage"
SessionSv2ProcessEvent = "SessionSv2.ProcessEvent"
SessionSv1ProcessEvent = "SessionSv1.ProcessEvent"
SessionSv1DisconnectSession = "SessionSv1.DisconnectSession"
SessionSv1GetActiveSessions = "SessionSv1.GetActiveSessions"
SessionSv1GetActiveSessionsCount = "SessionSv1.GetActiveSessionsCount"