Updated SessionSTerminate

This commit is contained in:
Trial97
2021-08-24 13:49:52 +03:00
committed by Dan Christian Bogos
parent 56e9e8ca0f
commit eecaf84cf5
23 changed files with 95 additions and 428 deletions

View File

@@ -300,17 +300,14 @@ func (sma *AsteriskAgent) handleChannelDestroyed(ev *SMAsteriskEvent) {
return
}
// populate terminate session args
tsArgs := ev.V1TerminateSessionArgs(*cgrEvDisp)
if tsArgs == nil {
utils.Logger.Err(fmt.Sprintf("<%s> event: %s cannot generate terminate session arguments",
utils.AsteriskAgent, chID))
return
if cgrEvDisp.APIOpts == nil {
cgrEvDisp.APIOpts = map[string]interface{}{utils.OptsSesTerminate: true}
}
var reply string
if err := sma.connMgr.Call(sma.ctx, sma.cgrCfg.AsteriskAgentCfg().SessionSConns,
utils.SessionSv1TerminateSession,
tsArgs, &reply); err != nil {
cgrEvDisp, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when attempting to terminate session for channelID: %s",
utils.AsteriskAgent, err.Error(), chID))
}

View File

@@ -297,18 +297,3 @@ func (smaEv *SMAsteriskEvent) V1AuthorizeArgs() (args *sessions.V1AuthorizeArgs)
args.ParseFlags(smaEv.Subsystems(), utils.PlusChar)
return
}
func (smaEv *SMAsteriskEvent) V1TerminateSessionArgs(cgrEvDisp utils.CGREvent) (args *sessions.V1TerminateSessionArgs) {
args = &sessions.V1TerminateSessionArgs{ // defaults
CGREvent: &cgrEvDisp,
}
subsystems, err := cgrEvDisp.FieldAsString(utils.CGRFlags)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> event: %s don't have %s variable",
utils.AsteriskAgent, utils.ToJSON(cgrEvDisp), utils.CGRFlags))
args.TerminateSession = true
return
}
args.ParseFlags(subsystems, utils.PlusChar)
return
}

View File

@@ -24,7 +24,6 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
)
var (
@@ -393,39 +392,6 @@ func TestSMAEventV1AuthorizeArgs(t *testing.T) {
}
}
func TestSMAEventV1TerminateSessionArgs(t *testing.T) {
cgrEv := &utils.CGREvent{
Tenant: "cgrates.org",
ID: "AsteriskEvent",
Event: map[string]interface{}{
"MissingCGRSubsustems": "",
},
}
exp := &sessions.V1TerminateSessionArgs{
TerminateSession: true,
CGREvent: cgrEv,
}
var ev map[string]interface{}
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1", "")
if rcv := smaEv.V1TerminateSessionArgs(*cgrEv); !reflect.DeepEqual(exp, rcv) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(exp), utils.ToJSON(rcv))
}
exp2 := &sessions.V1TerminateSessionArgs{
TerminateSession: true,
ReleaseResources: true,
ProcessStats: true,
CGREvent: cgrEv,
}
cgrEv.Event[utils.CGRFlags] = "*resources+*accounts+*stats"
if rcv := smaEv.V1TerminateSessionArgs(*cgrEv); !reflect.DeepEqual(exp2, rcv) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(exp2), utils.ToJSON(rcv))
}
}
func TestRequestType(t *testing.T) {
var ev map[string]interface{}
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {

View File

@@ -391,17 +391,9 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.RequestProcessor,
cgrEv, rply)
agReq.setCGRReply(rply, err)
case utils.MetaTerminate:
terminateArgs := sessions.NewV1TerminateSessionArgs(
reqProcessor.Flags.Has(utils.MetaAccounts),
reqProcessor.Flags.GetBool(utils.MetaResources),
reqProcessor.Flags.GetBool(utils.MetaThresholds),
reqProcessor.Flags.ParamsSlice(utils.MetaThresholds, utils.MetaIDs),
reqProcessor.Flags.GetBool(utils.MetaStats),
reqProcessor.Flags.ParamsSlice(utils.MetaStats, utils.MetaIDs),
cgrEv, reqProcessor.Flags.Has(utils.MetaFD))
var rply string
err = da.connMgr.Call(da.ctx, da.cgrCfg.DiameterAgentCfg().SessionSConns, utils.SessionSv1TerminateSession,
terminateArgs, &rply)
cgrEv, &rply)
agReq.setCGRReply(nil, err)
case utils.MetaMessage:
msgArgs := sessions.NewV1ProcessMessageArgs(

View File

@@ -306,7 +306,9 @@ func TestProcessRequest(t *testing.T) {
"ToR": "*voice",
"Usage": "10s",
},
APIOpts: make(map[string]interface{}),
APIOpts: map[string]interface{}{
utils.OptsSesTerminate: "true",
},
}
if !reflect.DeepEqual(expargs, arg) {
t.Errorf("Expected:%s ,received: %s", utils.ToJSON(expargs), utils.ToJSON(arg))
@@ -323,27 +325,25 @@ func TestProcessRequest(t *testing.T) {
var id string
if arg == nil {
t.Errorf("args is nil")
} else if rargs, can := arg.(*sessions.V1TerminateSessionArgs); !can {
} else if rargs, can := arg.(*utils.CGREvent); !can {
t.Errorf("args is not of sessions.V1TerminateSessionArgs type")
} else {
id = rargs.ID
}
expargs := &sessions.V1TerminateSessionArgs{
TerminateSession: true,
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: id,
Event: map[string]interface{}{
"Account": "1001",
"Category": "call",
"Destination": "1003",
"OriginHost": "local",
"OriginID": "123456",
"ToR": "*voice",
"Usage": "10s",
},
APIOpts: map[string]interface{}{},
expargs := &utils.CGREvent{
Tenant: "cgrates.org",
ID: id,
Event: map[string]interface{}{
"Account": "1001",
"Category": "call",
"Destination": "1003",
"OriginHost": "local",
"OriginID": "123456",
"ToR": "*voice",
"Usage": "10s",
},
APIOpts: map[string]interface{}{
utils.OptsSesTerminate: "true",
},
}
if !reflect.DeepEqual(expargs, arg) {
@@ -506,12 +506,21 @@ func TestProcessRequest(t *testing.T) {
}
reqProcessor.Flags = utils.FlagsWithParamsFromSlice([]string{utils.MetaTerminate, utils.MetaAccounts, utils.MetaAttributes, utils.MetaCDRs})
tmpls = []*config.FCTemplate{
{Type: utils.MetaConstant, Path: utils.MetaOpts + utils.NestingSep + utils.OptsSesTerminate,
Value: config.NewRSRParsersMustCompile("true", utils.InfieldSep)},
}
for _, v := range tmpls {
v.ComputePath()
}
reqProcessor.ReplyFields = []*config.FCTemplate{{Tag: "ResultCode",
Type: utils.MetaConstant, Path: utils.MetaRep + utils.NestingSep + "ResultCode",
Value: config.NewRSRParsersMustCompile("2001", utils.InfieldSep)}}
for _, v := range reqProcessor.ReplyFields {
v.ComputePath()
}
clnReq = reqProcessor.Clone()
clnReq.RequestFields = append(clnReq.RequestFields, tmpls...)
cgrRplyNM = &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{}}
rply = utils.NewOrderedNavigableMap()
@@ -519,7 +528,7 @@ func TestProcessRequest(t *testing.T) {
reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant,
config.CgrConfig().GeneralCfg().DefaultTimezone, filters, nil)
pr, err = da.processRequest(reqProcessor, agReq)
pr, err = da.processRequest(clnReq, agReq)
if err != nil {
t.Error(err)
} else if !pr {

View File

@@ -252,18 +252,10 @@ func (da *DNSAgent) processRequest(reqProcessor *config.RequestProcessor,
rply.SetMaxUsageNeeded(utils.OptAsBool(cgrEv.APIOpts, utils.OptsSesUpdate))
agReq.setCGRReply(rply, err)
case utils.MetaTerminate:
terminateArgs := sessions.NewV1TerminateSessionArgs(
reqProcessor.Flags.Has(utils.MetaAccounts),
reqProcessor.Flags.GetBool(utils.MetaResources),
reqProcessor.Flags.GetBool(utils.MetaThresholds),
reqProcessor.Flags.ParamsSlice(utils.MetaThresholds, utils.MetaIDs),
reqProcessor.Flags.GetBool(utils.MetaStats),
reqProcessor.Flags.ParamsSlice(utils.MetaStats, utils.MetaIDs),
cgrEv, reqProcessor.Flags.Has(utils.MetaFD))
var rply string
err = da.connMgr.Call(context.TODO(), da.cgrCfg.DNSAgentCfg().SessionSConns,
utils.SessionSv1TerminateSession,
terminateArgs, &rply)
cgrEv, &rply)
agReq.setCGRReply(nil, err)
case utils.MetaMessage:
evArgs := sessions.NewV1ProcessMessageArgs(

View File

@@ -268,10 +268,13 @@ func (fsa *FSsessions) onChannelHangupComplete(fsev FSEvent, connIdx int) {
var reply string
fsev[VarCGROriginHost] = utils.FirstNonEmpty(fsev[VarCGROriginHost], fsa.cfg.EventSocketConns[connIdx].Alias) // rewrite the OriginHost variable if it is empty
if fsev[VarAnswerEpoch] != "0" { // call was answered
terminateSessionArgs := fsev.V1TerminateSessionArgs()
terminateSessionArgs.CGREvent.Event[FsConnID] = connIdx // Attach the connection ID in case we need to create a session and disconnect it
cgrEv := fsev.AsCGREvent(config.CgrConfig().GeneralCfg().DefaultTimezone)
if cgrEv.APIOpts == nil {
cgrEv.APIOpts = map[string]interface{}{utils.OptsSesTerminate: true}
}
cgrEv.Event[FsConnID] = connIdx // Attach the connection ID in case we need to create a session and disconnect it
if err := fsa.connMgr.Call(fsa.ctx, fsa.cfg.SessionSConns, utils.SessionSv1TerminateSession,
terminateSessionArgs, &reply); err != nil {
cgrEv, &reply); err != nil {
utils.Logger.Err(
fmt.Sprintf("<%s> Could not terminate session with event %s, error: %s",
utils.FreeSWITCHAgent, fsev.GetUUID(), err.Error()))

View File

@@ -409,23 +409,6 @@ func (fsev FSEvent) V1AuthorizeArgs() (args *sessions.V1AuthorizeArgs) {
return
}
// V1TerminateSessionArgs returns the arguments used in SMGv1.TerminateSession
func (fsev FSEvent) V1TerminateSessionArgs() (args *sessions.V1TerminateSessionArgs) {
cgrEv := fsev.AsCGREvent(config.CgrConfig().GeneralCfg().DefaultTimezone)
args = &sessions.V1TerminateSessionArgs{ // defaults
CGREvent: cgrEv,
}
subsystems, has := fsev[VarCGRFlags]
if !has {
utils.Logger.Warning(fmt.Sprintf("<%s> cgr_flags variable is not set, using defaults",
utils.FreeSWITCHAgent))
args.TerminateSession = true
return
}
args.ParseFlags(subsystems, utils.InfieldSep)
return
}
// SliceAsFsArray Converts a slice of strings into a FS array string, contains len(array) at first index since FS does not support len(ARRAY::) for now
func SliceAsFsArray(slc []string) (arry string) {
if len(slc) == 0 {

View File

@@ -1040,26 +1040,3 @@ func TestFsEvV1AuthorizeArgs(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", expected.RoutesIgnoreErrors, rcv.RoutesIgnoreErrors)
}
}
func TestFsEvV1TerminateSessionArgs(t *testing.T) {
timezone := config.CgrConfig().GeneralCfg().DefaultTimezone
ev := NewFSEvent(hangupEv)
expected := &sessions.V1TerminateSessionArgs{
TerminateSession: true,
CGREvent: &utils.CGREvent{
Tenant: ev.GetTenant(utils.MetaDefault),
ID: utils.UUIDSha1Prefix(),
Event: ev.AsMapStringInterface(timezone),
},
}
rcv := ev.V1TerminateSessionArgs()
if !reflect.DeepEqual(expected.CGREvent.Tenant, rcv.CGREvent.Tenant) {
t.Errorf("Expecting: %+v, received: %+v", expected.CGREvent.Tenant, rcv.CGREvent.Tenant)
} else if !reflect.DeepEqual(expected.CGREvent.Event, rcv.CGREvent.Event) {
t.Errorf("Expecting: %+v, received: %+v", expected.CGREvent.Event, rcv.CGREvent.Event)
} else if !reflect.DeepEqual(expected.CGREvent.Event, rcv.CGREvent.Event) {
t.Errorf("Expecting: %+v, received: %+v", expected.CGREvent.Event, rcv.CGREvent.Event)
} else if !reflect.DeepEqual(expected.TerminateSession, rcv.TerminateSession) {
t.Errorf("Expecting: %+v, received: %+v", expected.TerminateSession, rcv.TerminateSession)
}
}

View File

@@ -182,17 +182,9 @@ func (ha *HTTPAgent) processRequest(reqProcessor *config.RequestProcessor,
rply.SetMaxUsageNeeded(utils.OptAsBool(cgrEv.APIOpts, utils.OptsSesUpdate))
agReq.setCGRReply(rply, err)
case utils.MetaTerminate:
terminateArgs := sessions.NewV1TerminateSessionArgs(
reqProcessor.Flags.Has(utils.MetaAccounts),
reqProcessor.Flags.GetBool(utils.MetaResources),
reqProcessor.Flags.GetBool(utils.MetaThresholds),
reqProcessor.Flags.ParamsSlice(utils.MetaThresholds, utils.MetaIDs),
reqProcessor.Flags.GetBool(utils.MetaStats),
reqProcessor.Flags.ParamsSlice(utils.MetaStats, utils.MetaIDs),
cgrEv, reqProcessor.Flags.Has(utils.MetaFD))
var rply string
err = ha.connMgr.Call(context.TODO(), ha.sessionConns, utils.SessionSv1TerminateSession,
terminateArgs, &rply)
cgrEv, &rply)
agReq.setCGRReply(nil, err)
case utils.MetaMessage:
evArgs := sessions.NewV1ProcessMessageArgs(

View File

@@ -214,16 +214,14 @@ func (ka *KamailioAgent) onCallEnd(evData []byte, connIdx int) {
utils.KamailioAgent, kev[utils.OriginID]))
return
}
tsArgs := kev.V1TerminateSessionArgs()
if tsArgs == nil {
utils.Logger.Err(fmt.Sprintf("<%s> event: %s cannot generate terminate session arguments",
utils.KamailioAgent, kev[utils.OriginID]))
return
cgrEv := kev.AsCGREvent(config.CgrConfig().GeneralCfg().DefaultTimezone)
if cgrEv.APIOpts == nil {
cgrEv.APIOpts = map[string]interface{}{utils.OptsSesTerminate: true}
}
var reply string
tsArgs.CGREvent.Event[EvapiConnID] = connIdx // Attach the connection ID in case we need to create a session and disconnect it
cgrEv.Event[EvapiConnID] = connIdx // Attach the connection ID in case we need to create a session and disconnect it
if err := ka.connMgr.Call(ka.ctx, ka.cfg.SessionSConns, utils.SessionSv1TerminateSession,
tsArgs, &reply); err != nil {
cgrEv, &reply); err != nil {
utils.Logger.Err(
fmt.Sprintf("<%s> could not terminate session with event %s, error: %s",
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
@@ -231,9 +229,9 @@ func (ka *KamailioAgent) onCallEnd(evData []byte, connIdx int) {
}
if ka.cfg.CreateCdr || strings.Index(kev[utils.CGRFlags], utils.MetaCDRs) != -1 {
if err := ka.connMgr.Call(ka.ctx, ka.cfg.SessionSConns, utils.SessionSv1ProcessCDR,
tsArgs.CGREvent, &reply); err != nil {
cgrEv, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("%s> failed processing CGREvent: %s, error: %s",
utils.KamailioAgent, utils.ToJSON(tsArgs.CGREvent), err.Error()))
utils.KamailioAgent, utils.ToJSON(cgrEv), err.Error()))
}
}
}

View File

@@ -310,23 +310,6 @@ func (kev KamEvent) AsKamProcessMessageEmptyReply() (kar *KamReply) {
return
}
// V1TerminateSessionArgs returns the arguments used in SMGv1.TerminateSession
func (kev KamEvent) V1TerminateSessionArgs() (args *sessions.V1TerminateSessionArgs) {
cgrEv := kev.AsCGREvent(utils.FirstNonEmpty(
config.CgrConfig().KamAgentCfg().Timezone,
config.CgrConfig().GeneralCfg().DefaultTimezone))
args = &sessions.V1TerminateSessionArgs{ // defaults
TerminateSession: true,
CGREvent: cgrEv,
}
subsystems, has := kev[utils.CGRFlags]
if !has {
return
}
args.ParseFlags(subsystems, utils.InfieldSep)
return
}
//KamReply will be used to send back to kamailio from
//Authrization,ProcessEvent and ProcessEvent empty (pingPong)
type KamReply struct {

View File

@@ -286,36 +286,6 @@ func TestKamEvAsKamAuthReply(t *testing.T) {
}
}
func TestKamEvV1TerminateSessionArgs(t *testing.T) {
kamEv := KamEvent{"event": "CGR_CALL_END",
"callid": "46c01a5c249b469e76333fc6bfa87f6a@0:0:0:0:0:0:0:0",
"from_tag": "bf71ad59", "to_tag": "7351fecf",
"cgr_reqtype": utils.MetaPostpaid, "cgr_account": "1001",
"cgr_destination": "1002", "cgr_answertime": "1419839310",
"cgr_duration": "3", "cgr_pdd": "4",
utils.CGRRoute: "supplier2",
utils.CGRDisconnectCause: "200"}
expected := &sessions.V1TerminateSessionArgs{
TerminateSession: true,
CGREvent: &utils.CGREvent{
Tenant: utils.FirstNonEmpty(kamEv[utils.Tenant],
config.CgrConfig().GeneralCfg().DefaultTenant),
ID: utils.UUIDSha1Prefix(),
Event: kamEv.AsMapStringInterface(),
},
}
rcv := kamEv.V1TerminateSessionArgs()
if !reflect.DeepEqual(expected.CGREvent.Tenant, rcv.CGREvent.Tenant) {
t.Errorf("Expecting: %+v, received: %+v", expected.CGREvent.Tenant, rcv.CGREvent.Tenant)
} else if !reflect.DeepEqual(expected.CGREvent.Event, rcv.CGREvent.Event) {
t.Errorf("Expecting: %+v, received: %+v", expected.CGREvent.Event, rcv.CGREvent.Event)
} else if !reflect.DeepEqual(expected.CGREvent.Event, rcv.CGREvent.Event) {
t.Errorf("Expecting: %+v, received: %+v", expected.CGREvent.Event, rcv.CGREvent.Event)
} else if !reflect.DeepEqual(expected.TerminateSession, rcv.TerminateSession) {
t.Errorf("Expecting: %+v, received: %+v", expected.TerminateSession, rcv.TerminateSession)
}
}
func TestKamEvV1ProcessMessageArgs(t *testing.T) {
kamEv := KamEvent{"event": "CGR_PROCESS_MESSAGE",
"callid": "46c01a5c249b469e76333fc6bfa87f6a@0:0:0:0:0:0:0:0",

View File

@@ -240,17 +240,9 @@ func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.R
rply.SetMaxUsageNeeded(utils.OptAsBool(cgrEv.APIOpts, utils.OptsSesUpdate))
agReq.setCGRReply(rply, err)
case utils.MetaTerminate:
terminateArgs := sessions.NewV1TerminateSessionArgs(
reqProcessor.Flags.Has(utils.MetaAccounts),
reqProcessor.Flags.GetBool(utils.MetaResources),
reqProcessor.Flags.GetBool(utils.MetaThresholds),
reqProcessor.Flags.ParamsSlice(utils.MetaThresholds, utils.MetaIDs),
reqProcessor.Flags.GetBool(utils.MetaStats),
reqProcessor.Flags.ParamsSlice(utils.MetaStats, utils.MetaIDs),
cgrEv, reqProcessor.Flags.Has(utils.MetaFD))
var rply string
err = ra.connMgr.Call(context.TODO(), ra.cgrCfg.RadiusAgentCfg().SessionSConns, utils.SessionSv1TerminateSession,
terminateArgs, &rply)
cgrEv, &rply)
agReq.setCGRReply(nil, err)
case utils.MetaMessage:
evArgs := sessions.NewV1ProcessMessageArgs(

View File

@@ -67,7 +67,7 @@ func (ssv1 *SessionSv1) SyncSessions(ctx *context.Context, args *utils.TenantWit
return ssv1.sS.BiRPCv1SyncSessions(ctx, &utils.TenantWithAPIOpts{}, rply)
}
func (ssv1 *SessionSv1) TerminateSession(ctx *context.Context, args *sessions.V1TerminateSessionArgs,
func (ssv1 *SessionSv1) TerminateSession(ctx *context.Context, args *utils.CGREvent,
rply *string) error {
return ssv1.sS.BiRPCv1TerminateSession(ctx, args, rply)
}

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package console
import (
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
)
@@ -27,7 +26,7 @@ func init() {
c := &CmdSessionsTerminate{
name: "session_terminate",
rpcMethod: utils.SessionSv1TerminateSession,
rpcParams: &sessions.V1TerminateSessionArgs{},
rpcParams: &utils.CGREvent{},
}
commands[c.Name()] = c
c.CommandExecuter = &CommandExecuter{c}
@@ -36,7 +35,7 @@ func init() {
type CmdSessionsTerminate struct {
name string
rpcMethod string
rpcParams *sessions.V1TerminateSessionArgs
rpcParams *utils.CGREvent
*CommandExecuter
}
@@ -50,9 +49,7 @@ func (self *CmdSessionsTerminate) RpcMethod() string {
func (self *CmdSessionsTerminate) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
self.rpcParams = &sessions.V1TerminateSessionArgs{
CGREvent: new(utils.CGREvent),
}
self.rpcParams = new(utils.CGREvent)
}
return self.rpcParams
}

View File

@@ -113,16 +113,16 @@ func (dS *DispatcherService) SessionSv1SyncSessions(args *utils.TenantWithAPIOpt
}, utils.MetaSessionS, utils.SessionSv1SyncSessions, args, reply)
}
func (dS *DispatcherService) SessionSv1TerminateSession(args *sessions.V1TerminateSessionArgs,
func (dS *DispatcherService) SessionSv1TerminateSession(args *utils.CGREvent,
reply *string) (err error) {
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
args.Tenant = utils.FirstNonEmpty(args.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if err = dS.authorize(utils.SessionSv1TerminateSession, args.CGREvent.Tenant,
if err = dS.authorize(utils.SessionSv1TerminateSession, args.Tenant,
utils.IfaceAsString(args.APIOpts[utils.OptsAPIKey])); err != nil {
return
}
}
return dS.Dispatch(context.TODO(), args.CGREvent, utils.MetaSessionS, utils.SessionSv1TerminateSession, args, reply)
return dS.Dispatch(context.TODO(), args, utils.MetaSessionS, utils.SessionSv1TerminateSession, args, reply)
}
func (dS *DispatcherService) SessionSv1ProcessCDR(args *utils.CGREvent,

View File

@@ -150,39 +150,6 @@ func TestDspSessionSv1SyncSessionsErrorNil(t *testing.T) {
}
}
func TestDspSessionSv1TerminateSessionNil(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
CGREvent := &sessions.V1TerminateSessionArgs{
CGREvent: &utils.CGREvent{
Tenant: "tenant",
},
}
var reply *string
result := dspSrv.SessionSv1TerminateSession(CGREvent, reply)
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
if result == nil || result.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
}
}
func TestDspSessionSv1TerminateSessionErrorNil(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
cgrCfg.DispatcherSCfg().AttributeSConns = []string{"test"}
CGREvent := &sessions.V1TerminateSessionArgs{
CGREvent: &utils.CGREvent{
Tenant: "tenant",
},
}
var reply *string
result := dspSrv.SessionSv1TerminateSession(CGREvent, reply)
expected := "MANDATORY_IE_MISSING: [ApiKey]"
if result == nil || result.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
}
}
func TestDspSessionSv1ProcessCDRNil(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)

View File

@@ -237,17 +237,9 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent,
err = erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1UpdateSession,
cgrEv, rply)
case utils.MetaTerminate:
terminateArgs := sessions.NewV1TerminateSessionArgs(
rdrCfg.Flags.Has(utils.MetaAccounts),
rdrCfg.Flags.Has(utils.MetaResources),
rdrCfg.Flags.Has(utils.MetaThresholds),
rdrCfg.Flags.ParamsSlice(utils.MetaThresholds, utils.MetaIDs),
rdrCfg.Flags.Has(utils.MetaStats),
rdrCfg.Flags.ParamsSlice(utils.MetaStats, utils.MetaIDs),
cgrEv, rdrCfg.Flags.Has(utils.MetaFD))
rply := utils.StringPointer("")
err = erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1TerminateSession,
terminateArgs, rply)
cgrEv, rply)
case utils.MetaMessage:
evArgs := sessions.NewV1ProcessMessageArgs(
rdrCfg.Flags.Has(utils.MetaAttributes),

View File

@@ -780,59 +780,6 @@ func (v1Rply *V1UpdateSessionReply) AsNavigableMap() map[string]*utils.DataNode
return cgrReply
}
// NewV1TerminateSessionArgs creates a new V1TerminateSessionArgs using the given arguments
func NewV1TerminateSessionArgs(acnts, resrc,
thrds bool, thresholdIDs []string, stats bool,
statIDs []string, cgrEv *utils.CGREvent, forceDuration bool) (args *V1TerminateSessionArgs) {
args = &V1TerminateSessionArgs{
TerminateSession: acnts,
ReleaseResources: resrc,
ProcessThresholds: thrds,
ProcessStats: stats,
CGREvent: cgrEv,
ForceDuration: forceDuration,
}
if len(thresholdIDs) != 0 {
args.ThresholdIDs = thresholdIDs
}
if len(statIDs) != 0 {
args.StatIDs = statIDs
}
return
}
// V1TerminateSessionArgs is used as argumen for TerminateSession
type V1TerminateSessionArgs struct {
TerminateSession bool
ForceDuration bool
ReleaseResources bool
ProcessThresholds bool
ProcessStats bool
ThresholdIDs []string
StatIDs []string
*utils.CGREvent
}
// ParseFlags will populate the V1TerminateSessionArgs flags
func (args *V1TerminateSessionArgs) ParseFlags(flags, sep string) {
for _, subsystem := range strings.Split(flags, sep) {
switch {
case subsystem == utils.MetaAccounts:
args.TerminateSession = true
case subsystem == utils.MetaResources:
args.ReleaseResources = true
case strings.Index(subsystem, utils.MetaThresholds) != -1:
args.ProcessThresholds = true
args.ThresholdIDs = getFlagIDs(subsystem)
case strings.Index(subsystem, utils.MetaStats) != -1:
args.ProcessStats = true
args.StatIDs = getFlagIDs(subsystem)
case subsystem == utils.MetaFD:
args.ForceDuration = true
}
}
}
// ArgsReplicateSessions used to specify wich Session to replicate over the given connections
type ArgsReplicateSessions struct {
CGRID string

View File

@@ -1553,7 +1553,7 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(ctx *context.Context,
if !(args.GetAttributes || utils.OptAsBool(args.APIOpts, utils.OptsSesAttributeS) ||
args.GetMaxUsage || utils.OptAsBool(args.APIOpts, utils.OptsSesMaxUsage) ||
args.AuthorizeResources || utils.OptAsBool(args.APIOpts, utils.OptsSesResourceSAuth) ||
args.AuthorizeResources || utils.OptAsBool(args.APIOpts, utils.OptsSesResourceSAuthorize) ||
args.GetRoutes || utils.OptAsBool(args.APIOpts, utils.OptsSesRouteS)) {
return // Nothing to do
}
@@ -1594,7 +1594,7 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(ctx *context.Context,
authReply.MaxUsage = &maxUsage
}
if args.AuthorizeResources ||
utils.OptAsBool(args.APIOpts, utils.OptsSesResourceSAuth) {
utils.OptAsBool(args.APIOpts, utils.OptsSesResourceSAuthorize) {
if len(sS.cgrCfg.SessionSCfg().ResSConns) == 0 {
return utils.NewErrNotConnected(utils.ResourceS)
}
@@ -1683,7 +1683,7 @@ func (sS *SessionS) BiRPCv1AuthorizeEventWithDigest(ctx *context.Context,
authReply.AttributesDigest = utils.StringPointer(initAuthRply.Attributes.Digest())
}
if args.AuthorizeResources ||
utils.OptAsBool(args.APIOpts, utils.OptsSesResourceSAuth) {
utils.OptAsBool(args.APIOpts, utils.OptsSesResourceSAuthorize) {
authReply.ResourceAllocation = initAuthRply.ResourceAllocation
}
if args.GetMaxUsage ||
@@ -1743,7 +1743,7 @@ func (sS *SessionS) BiRPCv1InitiateSession(ctx *context.Context,
attrS := utils.OptAsBool(args.APIOpts, utils.OptsSesAttributeS)
initS := utils.OptAsBool(args.APIOpts, utils.OptsSesInitiate)
resS := utils.OptAsBool(args.APIOpts, utils.OptsSesResourceSAloc)
resS := utils.OptAsBool(args.APIOpts, utils.OptsSesResourceSAlocate)
if !(attrS || initS || resS) {
return // nothing to do
}
@@ -1970,20 +1970,20 @@ func (sS *SessionS) BiRPCv1UpdateSession(ctx *context.Context,
// BiRPCv1TerminateSession will stop debit loops as well as release any used resources
func (sS *SessionS) BiRPCv1TerminateSession(ctx *context.Context,
args *V1TerminateSessionArgs, rply *string) (err error) {
if args.CGREvent == nil {
args *utils.CGREvent, rply *string) (err error) {
if args == nil {
return utils.NewErrMandatoryIeMissing(utils.CGREventString)
}
var withErrors bool
if args.CGREvent.ID == "" {
args.CGREvent.ID = utils.GenUUID()
if args.ID == "" {
args.ID = utils.GenUUID()
}
if args.CGREvent.Tenant == "" {
args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant
if args.Tenant == "" {
args.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant
}
// RPC caching
if sS.cgrCfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.SessionSv1TerminateSession, args.CGREvent.ID)
cacheKey := utils.ConcatenatedKey(utils.SessionSv1TerminateSession, args.ID)
refID := guardian.Guardian.GuardIDs("",
sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
@@ -2000,15 +2000,17 @@ func (sS *SessionS) BiRPCv1TerminateSession(ctx *context.Context,
nil, true, utils.NonTransactional)
}
// end of RPC caching
if !args.TerminateSession && !args.ReleaseResources {
attrS := utils.OptAsBool(args.APIOpts, utils.OptsSesAttributeS)
termS := utils.OptAsBool(args.APIOpts, utils.OptsSesTerminate)
if !(attrS || termS) {
return // nothing to do
}
ev := engine.MapEvent(args.CGREvent.Event)
ev := engine.MapEvent(args.Event)
opts := engine.MapEvent(args.APIOpts)
cgrID := GetSetCGRID(ev)
originID := ev.GetStringIgnoreErrors(utils.OriginID)
if args.TerminateSession {
if termS {
if originID == "" {
return utils.NewErrMandatoryIeMissing(utils.OriginID)
}
@@ -2033,8 +2035,8 @@ func (sS *SessionS) BiRPCv1TerminateSession(ctx *context.Context,
continue
}
isMsg = true
if s, err = sS.initSession(ctx, args.CGREvent, sS.biJClntID(ctx.Client), ev.GetStringIgnoreErrors(utils.OriginID),
dbtItvl, isMsg, args.ForceDuration); err != nil {
if s, err = sS.initSession(ctx, args, sS.biJClntID(ctx.Client), ev.GetStringIgnoreErrors(utils.OriginID),
dbtItvl, isMsg, utils.OptAsBool(args.APIOpts, utils.OptsSesForceDuration)); err != nil {
return err //utils.NewErrRALs(err)
}
if _, err = sS.updateSession(ctx, s, ev, opts, isMsg); err != nil {
@@ -2056,7 +2058,7 @@ func (sS *SessionS) BiRPCv1TerminateSession(ctx *context.Context,
return err //utils.NewErrRALs(err)
}
}
if args.ReleaseResources {
if utils.OptAsBool(args.APIOpts, utils.OptsSesResourceSRelease) {
if len(sS.cgrCfg.SessionSCfg().ResSConns) == 0 {
return utils.NewErrNotConnected(utils.ResourceS)
}
@@ -2065,7 +2067,7 @@ func (sS *SessionS) BiRPCv1TerminateSession(ctx *context.Context,
}
var reply string
argsRU := &utils.ArgRSv1ResourceUsage{
CGREvent: args.CGREvent,
CGREvent: args,
UsageID: originID, // same ID should be accepted by first group since the previous resource should be expired
Units: 1,
}
@@ -2074,23 +2076,31 @@ func (sS *SessionS) BiRPCv1TerminateSession(ctx *context.Context,
return utils.NewErrResourceS(err)
}
}
if args.ProcessThresholds {
_, err := sS.processThreshold(ctx, args.CGREvent, args.ThresholdIDs, true)
if utils.OptAsBool(args.APIOpts, utils.OptsSesThresholdS) {
var thIDs []string
if thIDs, err = utils.OptAsStringSlice(args.APIOpts, utils.OptsSesThresholdIDs); err != nil {
return
}
_, err := sS.processThreshold(ctx, args, thIDs, true)
if err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.",
utils.SessionS, err.Error(), args.CGREvent))
utils.SessionS, err.Error(), args))
withErrors = true
}
}
if args.ProcessStats {
_, err := sS.processStats(ctx, args.CGREvent, args.StatIDs, false)
if utils.OptAsBool(args.APIOpts, utils.OptsSesStatS) {
var statIDs []string
if statIDs, err = utils.OptAsStringSlice(args.APIOpts, utils.OptsSesStatIDs); err != nil {
return
}
_, err := sS.processStats(ctx, args, statIDs, false)
if err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing event %+v with StatS.",
utils.SessionS, err.Error(), args.CGREvent))
utils.SessionS, err.Error(), args))
withErrors = true
}
}

View File

@@ -1031,49 +1031,6 @@ func TestV1AuthorizeArgsParseFlags1(t *testing.T) {
}
}
func TestSessionSNewV1TerminateSessionArgs(t *testing.T) {
cgrEv := &utils.CGREvent{
Tenant: "cgrates.org",
ID: "Event",
Event: map[string]interface{}{
utils.AccountField: "1001",
utils.Destination: "1002",
},
}
expected := &V1TerminateSessionArgs{
TerminateSession: true,
ProcessThresholds: true,
CGREvent: cgrEv,
ForceDuration: true,
}
rply := NewV1TerminateSessionArgs(true, false, true, nil, false, nil, cgrEv, true)
if !reflect.DeepEqual(expected, rply) {
t.Errorf("Expecting %+v, received: %+v", expected, rply)
}
expected = &V1TerminateSessionArgs{
CGREvent: cgrEv,
ForceDuration: true,
}
rply = NewV1TerminateSessionArgs(false, false, false, nil, false, nil, cgrEv, true)
if !reflect.DeepEqual(expected, rply) {
t.Errorf("Expecting %+v, received: %+v", expected, rply)
}
//test with len(thresholdIDs) != 0 && len(StatIDs) != 0
thresholdIDs := []string{"ID1", "ID2"}
statIDs := []string{"test1", "test2"}
expected = &V1TerminateSessionArgs{
CGREvent: cgrEv,
ThresholdIDs: []string{"ID1", "ID2"},
StatIDs: []string{"test1", "test2"},
ForceDuration: true,
}
rply = NewV1TerminateSessionArgs(false, false, false, thresholdIDs, false, statIDs, cgrEv, true)
if !reflect.DeepEqual(expected, rply) {
t.Errorf("Expecting %+v, received: %+v", expected, rply)
}
}
func TestSessionSNewV1ProcessMessageArgs(t *testing.T) {
cgrEv := &utils.CGREvent{
Tenant: "cgrates.org",
@@ -1861,50 +1818,6 @@ func TestV1InitSessionArgsParseFlags(t *testing.T) {
}
*/
func TestV1TerminateSessionArgsParseFlags(t *testing.T) {
v1TerminateSsArgs := new(V1TerminateSessionArgs)
eOut := new(V1TerminateSessionArgs)
//empty check
strArg := ""
v1TerminateSsArgs.ParseFlags(strArg, utils.InfieldSep)
if !reflect.DeepEqual(eOut, v1TerminateSsArgs) {
t.Errorf("Expecting %+v,\n received: %+v", eOut, v1TerminateSsArgs)
}
//normal check -> without *dispatchers
eOut = &V1TerminateSessionArgs{
TerminateSession: true,
ReleaseResources: true,
ProcessThresholds: true,
ThresholdIDs: []string{"tr1", "tr2", "tr3"},
ProcessStats: true,
StatIDs: []string{"st1", "st2", "st3"},
ForceDuration: true,
}
strArg = "*accounts;*resources;*routes;*thresholds:tr1&tr2&tr3;*stats:st1&st2&st3;*fd"
v1TerminateSsArgs.ParseFlags(strArg, utils.InfieldSep)
if !reflect.DeepEqual(eOut, v1TerminateSsArgs) {
t.Errorf("Expecting %+v,\n received: %+v\n", utils.ToJSON(eOut), utils.ToJSON(v1TerminateSsArgs))
}
// //normal check -> with *dispatchers
eOut = &V1TerminateSessionArgs{
TerminateSession: true,
ReleaseResources: true,
ProcessThresholds: true,
ThresholdIDs: []string{"tr1", "tr2", "tr3"},
ProcessStats: true,
StatIDs: []string{"st1", "st2", "st3"},
ForceDuration: true,
}
strArg = "*accounts;*resources;;*dispatchers;*thresholds:tr1&tr2&tr3;*stats:st1&st2&st3;*fd"
v1TerminateSsArgs.ParseFlags(strArg, utils.InfieldSep)
if !reflect.DeepEqual(eOut, v1TerminateSsArgs) {
t.Errorf("Expecting %+v,\n received: %+v\n", utils.ToJSON(eOut), utils.ToJSON(v1TerminateSsArgs))
}
}
func TestV1ProcessMessageArgsParseFlags(t *testing.T) {
v1ProcessMsgArgs := new(V1ProcessMessageArgs)
v1ProcessMsgArgs.CGREvent = new(utils.CGREvent)

View File

@@ -2108,8 +2108,9 @@ const (
OptsSesCDRsDerivedReply = "*sesCDRsDerivedReply"
OptsSesChargerS = "*sesChargerS"
OptsSesResourceS = "*sesResourceS"
OptsSesResourceSAuth = "*sesResourceSAuth"
OptsSesResourceSAloc = "*sesResourceSAloc"
OptsSesResourceSAuthorize = "*sesResourceSAuthorize"
OptsSesResourceSAlocate = "*sesResourceSAlocate"
OptsSesResourceSRelease = "*sesResourceSRelease"
OptsSesResourceSDerivedReply = "*sesResourceSDerivedReply"
OptsSesRouteS = "*sesRouteS"
OptsSesRouteSDerivedReply = "*sesRouteSDerivedReply"
@@ -2128,6 +2129,7 @@ const (
OptsSesForceDuration = "*sesForceDuration"
OptsSesInitiate = "*sesInitiate"
OptsSesUpdate = "*sesUpdate"
OptsSesTerminate = "*sesTerminate"
OptsCDRsAttributeS = "*cdrsAttributeS"
OptsCDRsChargerS = "*cdrsChargerS"