Correctly send SessionSv1.ProcessEvent from Agents to SessionS

This commit is contained in:
TeoV
2019-07-23 11:20:06 +03:00
committed by Dan Christian Bogos
parent b9c4be142f
commit 59c0258750
8 changed files with 152 additions and 152 deletions

View File

@@ -261,7 +261,7 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.RequestProcessor,
utils.MetaDryRun, utils.MetaAuth,
utils.MetaInitiate, utils.MetaUpdate,
utils.MetaTerminate, utils.MetaMessage,
utils.MetaCDRs, utils.META_NONE} {
utils.MetaCDRs, utils.MetaEvent, utils.META_NONE} {
if reqProcessor.Flags.HasKey(typ) { // request type is identified through flags
reqType = typ
break

View File

@@ -163,7 +163,7 @@ func (da *DNSAgent) processRequest(reqProcessor *config.RequestProcessor,
utils.MetaDryRun, utils.MetaAuth,
utils.MetaInitiate, utils.MetaUpdate,
utils.MetaTerminate, utils.MetaMessage,
utils.MetaCDRs, utils.META_NONE} {
utils.MetaCDRs, utils.MetaEvent, utils.META_NONE} {
if reqProcessor.Flags.HasKey(typ) { // request type is identified through flags
reqType = typ
break

View File

@@ -111,7 +111,7 @@ func (ha *HTTPAgent) processRequest(reqProcessor *config.RequestProcessor,
utils.MetaDryRun, utils.MetaAuth,
utils.MetaInitiate, utils.MetaUpdate,
utils.MetaTerminate, utils.MetaMessage,
utils.MetaCDRs} {
utils.MetaCDRs, utils.MetaEvent, utils.MetaEmpty} {
if reqProcessor.Flags.HasKey(typ) { // request type is identified through flags
reqType = typ
break
@@ -126,6 +126,7 @@ func (ha *HTTPAgent) processRequest(reqProcessor *config.RequestProcessor,
switch reqType {
default:
return false, fmt.Errorf("unknown request type: <%s>", reqType)
case utils.META_NONE: // do nothing on CGRateS side
case utils.MetaDryRun:
utils.Logger.Info(
fmt.Sprintf("<%s> DRY_RUN, processorID: %s, CGREvent: %s",

View File

@@ -157,7 +157,7 @@ func (ra *RadiusAgent) processRequest(reqProcessor *config.RequestProcessor,
utils.MetaDryRun, utils.MetaAuth,
utils.MetaInitiate, utils.MetaUpdate,
utils.MetaTerminate, utils.MetaMessage,
utils.MetaCDRs} {
utils.MetaCDRs, utils.MetaEvent, utils.META_NONE} {
if reqProcessor.Flags.HasKey(typ) { // request type is identified through flags
reqType = typ
break
@@ -172,6 +172,7 @@ func (ra *RadiusAgent) processRequest(reqProcessor *config.RequestProcessor,
switch reqType {
default:
return false, fmt.Errorf("unknown request type: <%s>", reqType)
case utils.META_NONE: // do nothing on CGRateS side
case utils.MetaDryRun:
utils.Logger.Info(
fmt.Sprintf("<%s> DRY_RUN, processorID: %s, CGREvent: %s",

View File

@@ -83,7 +83,7 @@ func TestSSv1ItProcessEventWithPseudoPrepaid(t *testing.T) {
func testSSv1ItProcessEventAuth(t *testing.T) {
authUsage := 5 * time.Minute
args := &sessions.V1ProcessEventArgs{
Flags: []string{"*resources:*authorize", "*auth", "*suppliers", "*attributes"},
Flags: []string{"*resources:*authorize", "*rals:*auth", "*suppliers", "*attributes"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testSSv1ItProcessEventAuth",
@@ -107,8 +107,8 @@ func testSSv1ItProcessEventAuth(t *testing.T) {
if *rply.MaxUsage != authUsage {
t.Errorf("Unexpected MaxUsage: %v", rply.MaxUsage)
}
if *rply.ResourceAuthorization == "" {
t.Errorf("Unexpected ResourceAllocation: %s", *rply.ResourceAuthorization)
if *rply.ResourceMessage == "" {
t.Errorf("Unexpected ResourceMessage: %s", *rply.ResourceMessage)
}
eSplrs := &engine.SortedSuppliers{
ProfileID: "SPL_ACNT_1001",
@@ -162,7 +162,7 @@ func testSSv1ItProcessEventAuth(t *testing.T) {
func testSSv1ItProcessEventInitiateSession(t *testing.T) {
initUsage := 5 * time.Minute
args := &sessions.V1ProcessEventArgs{
Flags: []string{utils.MetaInitiate, "*resources:*allocate", "*attributes"},
Flags: []string{"*rals:*init", "*resources:*allocate", "*attributes"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testSSv1ItProcessEventInitiateSession",
@@ -193,8 +193,8 @@ func testSSv1ItProcessEventInitiateSession(t *testing.T) {
sSV1RequestType == utils.META_RATED) && *rply.MaxUsage != -1) {
t.Errorf("Unexpected MaxUsage: %v", rply.MaxUsage)
}
if *rply.ResourceAllocation != "RES_ACNT_1001" {
t.Errorf("Unexpected ResourceAllocation: %s", *rply.ResourceAllocation)
if *rply.ResourceMessage != "RES_ACNT_1001" {
t.Errorf("Unexpected ResourceMessage: %s", *rply.ResourceMessage)
}
eAttrs := &engine.AttrSProcessEventReply{
MatchedProfiles: []string{"ATTR_ACNT_1001"},
@@ -232,7 +232,7 @@ func testSSv1ItProcessEventInitiateSession(t *testing.T) {
func testSSv1ItProcessEventUpdateSession(t *testing.T) {
reqUsage := 5 * time.Minute
args := &sessions.V1ProcessEventArgs{
Flags: []string{utils.MetaUpdate, "*attributes"},
Flags: []string{"*rals:*update", "*attributes"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testSSv1ItProcessEventUpdateSession",
@@ -299,7 +299,7 @@ func testSSv1ItProcessEventUpdateSession(t *testing.T) {
func testSSv1ItProcessEventTerminateSession(t *testing.T) {
args := &sessions.V1ProcessEventArgs{
Flags: []string{utils.MetaTerminate, "*resources:*release"},
Flags: []string{"*rals:*terminate", "*resources:*release"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testSSv1ItProcessEventTerminateSession",

View File

@@ -852,7 +852,7 @@ func testDspSessionForceDisconect(t *testing.T) {
func testDspSessionProcessEvent3(t *testing.T) {
args := &sessions.V1ProcessEventArgs{
Flags: []string{utils.MetaTerminate, "*resources:*release"},
Flags: []string{"*rals:*terminate", "*resources:*release"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testSSv1ItProcessEventTerminateSession",

View File

@@ -2868,14 +2868,12 @@ type V1ProcessEventArgs struct {
// V1ProcessEventReply is the reply for the ProcessEvent API
type V1ProcessEventReply struct {
MaxUsage *time.Duration
ResourceAuthorization *string
ResourceAllocation *string
ResourceRelease *string
Attributes *engine.AttrSProcessEventReply
Suppliers *engine.SortedSuppliers
ThresholdIDs *[]string
StatQueueIDs *[]string
MaxUsage *time.Duration
ResourceMessage *string
Attributes *engine.AttrSProcessEventReply
Suppliers *engine.SortedSuppliers
ThresholdIDs *[]string
StatQueueIDs *[]string
}
// AsNavigableMap is part of engine.NavigableMapper interface
@@ -2886,14 +2884,8 @@ func (v1Rply *V1ProcessEventReply) AsNavigableMap(
if v1Rply.MaxUsage != nil {
cgrReply[utils.CapMaxUsage] = *v1Rply.MaxUsage
}
if v1Rply.ResourceAuthorization != nil {
cgrReply[utils.CapResourceAuthorization] = *v1Rply.ResourceAuthorization
}
if v1Rply.ResourceAllocation != nil {
cgrReply[utils.CapResourceAllocation] = *v1Rply.ResourceAllocation
}
if v1Rply.ResourceRelease != nil {
cgrReply[utils.CapResourceRelease] = *v1Rply.ResourceRelease
if v1Rply.ResourceMessage != nil {
cgrReply[utils.CapResourceMessage] = *v1Rply.ResourceMessage
}
if v1Rply.Attributes != nil {
attrs := make(map[string]interface{})
@@ -3000,112 +2992,118 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection,
attrRU, &resMessage); err != nil {
return utils.NewErrResourceS(err)
}
rply.ResourceAuthorization = &resMessage
rply.ResourceMessage = &resMessage
}
if resourceFlagsWithParams.HasKey(utils.MetaAllocate) {
if err = sS.resS.Call(utils.ResourceSv1AllocateResources,
attrRU, &resMessage); err != nil {
return utils.NewErrResourceS(err)
}
rply.ResourceAllocation = &resMessage
rply.ResourceMessage = &resMessage
}
if resourceFlagsWithParams.HasKey(utils.MetaRelease) {
if err = sS.resS.Call(utils.ResourceSv1ReleaseResources,
attrRU, &resMessage); err != nil {
return utils.NewErrResourceS(err)
}
rply.ResourceRelease = &resMessage
rply.ResourceMessage = &resMessage
}
}
}
//check for *auth/*init/*update/*terminate flags
//only one of them can be executed
switch {
//check for auth session
case argsFlagsWithParams.HasKey(utils.MetaAuth):
maxUsage, err := sS.authSession(args.CGREvent.Tenant,
engine.NewSafEvent(args.CGREvent.Event))
if err != nil {
return utils.NewErrRALs(err)
}
rply.MaxUsage = &maxUsage
// check for init session
case argsFlagsWithParams.HasKey(utils.MetaInitiate):
if ev.HasField(utils.CGRDebitInterval) { // dynamic DebitInterval via CGRDebitInterval
if dbtItvl, err = ev.GetDuration(utils.CGRDebitInterval); err != nil {
return utils.NewErrRALs(err)
}
}
s, err := sS.initSession(args.CGREvent.Tenant, ev,
sS.biJClntID(clnt), originID, dbtItvl, args.ArgDispatcher)
if err != nil {
return utils.NewErrRALs(err)
}
if dbtItvl > 0 { //active debit
rply.MaxUsage = utils.DurationPointer(time.Duration(-1))
} else {
if maxUsage, err := sS.updateSession(s, nil); err != nil {
return utils.NewErrRALs(err)
} else {
// check what we need to do for RALs (*auth/*init/*update/*terminate)
if argsFlagsWithParams.HasKey(utils.MetaRALs) {
if ralsOpts := argsFlagsWithParams.ParamsSlice(utils.MetaRALs); len(ralsOpts) != 0 {
//check for subflags and convert them into utils.FlagsWithParams
ralsFlagsWithParams, err := utils.FlagsWithParamsFromSlice(ralsOpts)
//for the moment only the the flag will be executed
switch {
//check for auth session
case ralsFlagsWithParams.HasKey(utils.MetaAuth):
maxUsage, err := sS.authSession(args.CGREvent.Tenant,
engine.NewSafEvent(args.CGREvent.Event))
if err != nil {
return utils.NewErrRALs(err)
}
rply.MaxUsage = &maxUsage
// check for init session
case ralsFlagsWithParams.HasKey(utils.MetaInit):
if ev.HasField(utils.CGRDebitInterval) { // dynamic DebitInterval via CGRDebitInterval
if dbtItvl, err = ev.GetDuration(utils.CGRDebitInterval); err != nil {
return utils.NewErrRALs(err)
}
}
s, err := sS.initSession(args.CGREvent.Tenant, ev,
sS.biJClntID(clnt), originID, dbtItvl, args.ArgDispatcher)
if err != nil {
return utils.NewErrRALs(err)
}
if dbtItvl > 0 { //active debit
rply.MaxUsage = utils.DurationPointer(time.Duration(-1))
} else {
if maxUsage, err := sS.updateSession(s, nil); err != nil {
return utils.NewErrRALs(err)
} else {
rply.MaxUsage = &maxUsage
}
}
//check for update session
case ralsFlagsWithParams.HasKey(utils.MetaUpdate):
if me.HasField(utils.CGRDebitInterval) { // dynamic DebitInterval via CGRDebitInterval
if dbtItvl, err = me.GetDuration(utils.CGRDebitInterval); err != nil {
return utils.NewErrRALs(err)
}
}
ev := engine.NewSafEvent(args.CGREvent.Event)
cgrID := GetSetCGRID(ev)
ss := sS.getRelocateSessions(cgrID,
me.GetStringIgnoreErrors(utils.InitialOriginID),
me.GetStringIgnoreErrors(utils.OriginID),
me.GetStringIgnoreErrors(utils.OriginHost))
var s *Session
if len(ss) == 0 {
if s, err = sS.initSession(args.CGREvent.Tenant,
ev, sS.biJClntID(clnt),
me.GetStringIgnoreErrors(utils.OriginID), dbtItvl, args.ArgDispatcher); err != nil {
return utils.NewErrRALs(err)
}
} else {
s = ss[0]
}
if maxUsage, err := sS.updateSession(s, ev.AsMapInterface()); err != nil {
return utils.NewErrRALs(err)
} else {
rply.MaxUsage = &maxUsage
}
// check for terminate session
case ralsFlagsWithParams.HasKey(utils.MetaTerminate):
if ev.HasField(utils.CGRDebitInterval) { // dynamic DebitInterval via CGRDebitInterval
if dbtItvl, err = ev.GetDuration(utils.CGRDebitInterval); err != nil {
return utils.NewErrRALs(err)
}
}
cgrID := GetSetCGRID(ev)
ss := sS.getRelocateSessions(cgrID,
me.GetStringIgnoreErrors(utils.InitialOriginID),
me.GetStringIgnoreErrors(utils.OriginID),
me.GetStringIgnoreErrors(utils.OriginHost))
var s *Session
if len(ss) == 0 {
if s, err = sS.initSession(args.CGREvent.Tenant,
ev, sS.biJClntID(clnt),
me.GetStringIgnoreErrors(utils.OriginID), dbtItvl, args.ArgDispatcher); err != nil {
return utils.NewErrRALs(err)
}
} else {
s = ss[0]
}
if err = sS.endSession(s,
me.GetDurationPtrIgnoreErrors(utils.Usage),
me.GetDurationPtrIgnoreErrors(utils.LastUsed),
utils.TimePointer(me.GetTimeIgnoreErrors(utils.AnswerTime, utils.EmptyString))); err != nil {
return utils.NewErrRALs(err)
}
}
}
//check for update session
case argsFlagsWithParams.HasKey(utils.MetaUpdate):
if me.HasField(utils.CGRDebitInterval) { // dynamic DebitInterval via CGRDebitInterval
if dbtItvl, err = me.GetDuration(utils.CGRDebitInterval); err != nil {
return utils.NewErrRALs(err)
}
}
ev := engine.NewSafEvent(args.CGREvent.Event)
cgrID := GetSetCGRID(ev)
ss := sS.getRelocateSessions(cgrID,
me.GetStringIgnoreErrors(utils.InitialOriginID),
me.GetStringIgnoreErrors(utils.OriginID),
me.GetStringIgnoreErrors(utils.OriginHost))
var s *Session
if len(ss) == 0 {
if s, err = sS.initSession(args.CGREvent.Tenant,
ev, sS.biJClntID(clnt),
me.GetStringIgnoreErrors(utils.OriginID), dbtItvl, args.ArgDispatcher); err != nil {
return utils.NewErrRALs(err)
}
} else {
s = ss[0]
}
if maxUsage, err := sS.updateSession(s, ev.AsMapInterface()); err != nil {
return utils.NewErrRALs(err)
} else {
rply.MaxUsage = &maxUsage
}
// check for terminate session
case argsFlagsWithParams.HasKey(utils.MetaTerminate):
if ev.HasField(utils.CGRDebitInterval) { // dynamic DebitInterval via CGRDebitInterval
if dbtItvl, err = ev.GetDuration(utils.CGRDebitInterval); err != nil {
return utils.NewErrRALs(err)
}
}
cgrID := GetSetCGRID(ev)
ss := sS.getRelocateSessions(cgrID,
me.GetStringIgnoreErrors(utils.InitialOriginID),
me.GetStringIgnoreErrors(utils.OriginID),
me.GetStringIgnoreErrors(utils.OriginHost))
var s *Session
if len(ss) == 0 {
if s, err = sS.initSession(args.CGREvent.Tenant,
ev, sS.biJClntID(clnt),
me.GetStringIgnoreErrors(utils.OriginID), dbtItvl, args.ArgDispatcher); err != nil {
return utils.NewErrRALs(err)
}
} else {
s = ss[0]
}
if err = sS.endSession(s,
me.GetDurationPtrIgnoreErrors(utils.Usage),
me.GetDurationPtrIgnoreErrors(utils.LastUsed),
utils.TimePointer(me.GetTimeIgnoreErrors(utils.AnswerTime, utils.EmptyString))); err != nil {
return utils.NewErrRALs(err)
}
}
// get suppliers if required
if argsFlagsWithParams.HasKey(utils.MetaSuppliers) {

View File

@@ -614,6 +614,7 @@ const (
MetaRelease = "*release"
MetaAllocate = "*allocate"
MetaAuthorize = "*authorize"
MetaInit = "*init"
)
// Migrator Action
@@ -669,41 +670,40 @@ const (
// Migrator Metas
const (
MetaSetVersions = "*set_versions"
MetaEnsureIndexes = "*ensure_indexes"
MetaTpRatingPlans = "*tp_rating_plans"
MetaTpFilters = "*tp_filters"
MetaTpDestinationRates = "*tp_destination_rates"
MetaTpActionTriggers = "*tp_action_triggers"
MetaTpAccountActions = "*tp_account_actions"
MetaTpActionPlans = "*tp_action_plans"
MetaTpActions = "*tp_actions"
MetaTpThresholds = "*tp_thresholds"
MetaTpSuppliers = "*tp_suppliers"
MetaTpStats = "*tp_stats"
MetaTpSharedGroups = "*tp_shared_groups"
MetaTpRatingProfiles = "*tp_rating_profiles"
MetaTpResources = "*tp_resources"
MetaTpRates = "*tp_rates"
MetaTpTimings = "*tp_timings"
MetaTpResource = "*tp_resources"
MetaTpCdrStats = "*tp_cdrstats"
MetaTpDestinations = "*tp_destinations"
MetaTpRatingPlan = "*tp_rating_plans"
MetaTpRatingProfile = "*tp_rating_profiles"
MetaTpChargers = "*tp_chargers"
MetaTpDispatchers = "*tp_dispatchers"
MetaDurationSeconds = "*duration_seconds"
MetaDurationNanoseconds = "*duration_nanoseconds"
CapAttributes = "Attributes"
CapResourceAllocation = "ResourceAllocation"
CapResourceAuthorization = "ResourceAuthorization"
CapResourceRelease = "ResourceRelease"
CapMaxUsage = "MaxUsage"
CapSuppliers = "Suppliers"
CapThresholdHits = "ThresholdHits"
CapThresholds = "Thresholds"
CapStatQueues = "StatQueues"
MetaSetVersions = "*set_versions"
MetaEnsureIndexes = "*ensure_indexes"
MetaTpRatingPlans = "*tp_rating_plans"
MetaTpFilters = "*tp_filters"
MetaTpDestinationRates = "*tp_destination_rates"
MetaTpActionTriggers = "*tp_action_triggers"
MetaTpAccountActions = "*tp_account_actions"
MetaTpActionPlans = "*tp_action_plans"
MetaTpActions = "*tp_actions"
MetaTpThresholds = "*tp_thresholds"
MetaTpSuppliers = "*tp_suppliers"
MetaTpStats = "*tp_stats"
MetaTpSharedGroups = "*tp_shared_groups"
MetaTpRatingProfiles = "*tp_rating_profiles"
MetaTpResources = "*tp_resources"
MetaTpRates = "*tp_rates"
MetaTpTimings = "*tp_timings"
MetaTpResource = "*tp_resources"
MetaTpCdrStats = "*tp_cdrstats"
MetaTpDestinations = "*tp_destinations"
MetaTpRatingPlan = "*tp_rating_plans"
MetaTpRatingProfile = "*tp_rating_profiles"
MetaTpChargers = "*tp_chargers"
MetaTpDispatchers = "*tp_dispatchers"
MetaDurationSeconds = "*duration_seconds"
MetaDurationNanoseconds = "*duration_nanoseconds"
CapAttributes = "Attributes"
CapResourceMessage = "ResourceMessage"
CapResourceAllocation = "ResourceAllocation"
CapMaxUsage = "MaxUsage"
CapSuppliers = "Suppliers"
CapThresholdHits = "ThresholdHits"
CapThresholds = "Thresholds"
CapStatQueues = "StatQueues"
)
const (