diff --git a/agents/dnsagent.go b/agents/dnsagent.go index 0eaad2708..ad30cba6f 100644 --- a/agents/dnsagent.go +++ b/agents/dnsagent.go @@ -92,7 +92,6 @@ func (da *DNSAgent) handleMessage(w dns.ResponseWriter, req *dns.Msg) { } reqVars[E164Address] = e164 reqVars[DomainName] = domainNameFromNAPTR(req.Question[0].Name) - } cgrRplyNM := config.NewNavigableMap(nil) rplyNM := config.NewNavigableMap(nil) // share it among different processors diff --git a/data/conf/samples/dnsagent/attributes.json b/data/conf/samples/dnsagent/attributes.json index 4a7629a6c..78f9449e1 100644 --- a/data/conf/samples/dnsagent/attributes.json +++ b/data/conf/samples/dnsagent/attributes.json @@ -21,7 +21,7 @@ "type": "*constant", "value": "U"}, {"tag": "NAPTRService", "field_id": "Service", "type": "*constant", "value": "E2U+SIP"}, - {"tag": "NAPTRReplacement", "field_id": "Regexp", + {"tag": "NAPTRRegex", "field_id": "Regexp", "type": "*variable", "value": "~*cgrep.Attributes.NAPTRAddress"}, ], }, diff --git a/data/conf/samples/dnsagent/dryrun.json b/data/conf/samples/dnsagent/dryrun.json index ebdfb2c32..e485a7997 100644 --- a/data/conf/samples/dnsagent/dryrun.json +++ b/data/conf/samples/dnsagent/dryrun.json @@ -14,8 +14,8 @@ {"tag": "NAPTRPreference", "field_id": "Preference", "type": "*constant", "value": "10"}, {"tag": "NAPTRFlags", "field_id": "Flags", "type": "*constant", "value": "U"}, {"tag": "NAPTRService", "field_id": "Service", "type": "*constant", "value": "E2U+SIP"}, - {"tag": "NAPTRRegexp", "field_id": "Regexp", "type": "*constant", "value": "^.*$"}, - {"tag": "NAPTRReplacement", "field_id": "Replacement", "type": "*constant", "value": "sip:\\1@172.16.1.10."}, + {"tag": "NAPTRRegexp", "field_id": "Regexp", "type": "*constant", "value": "!^(.*)$!sip:\\1@172.16.1.10.!"}, + {"tag": "NAPTRReplacement", "field_id": "Replacement", "type": "*constant", "value": "."}, ], }, ], diff --git a/data/conf/samples/dnsagent/suppliers.json b/data/conf/samples/dnsagent/suppliers.json index ab4e007ba..abf662d3c 100644 --- a/data/conf/samples/dnsagent/suppliers.json +++ b/data/conf/samples/dnsagent/suppliers.json @@ -33,6 +33,8 @@ "type": "*constant", "value": "E2U+SIP"}, {"tag": "NAPTRRegexp", "field_id": "Regexp", "type": "*variable", "value": "~*cgrep.Suppliers.SortedSuppliers[0].SupplierParameters"}, + {"tag": "NAPTRReplacement", "field_id": "Replacement", + "type": "*constant", "value": "."}, ], "continue": true, }, @@ -53,6 +55,8 @@ "type": "*constant", "value": "E2U+SIP"}, {"tag": "NAPTRRegexp", "field_id": "Regexp", "type": "*variable", "value": "~*cgrep.Suppliers.SortedSuppliers[1].SupplierParameters"}, + {"tag": "NAPTRReplacement", "field_id": "Replacement", + "type": "*constant", "value": "."}, ], "continue": true, }, diff --git a/sessions/sessions.go b/sessions/sessions.go index 75921e554..ff03f4ddd 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -3009,7 +3009,298 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, fmt.Sprintf("<%s> error: %s processing event %+v with StatS.", utils.SessionS, err.Error(), args.CGREvent)) } - rply.ThresholdIDs = &statReply + rply.StatQueueIDs = &statReply + } + return nil +} + +// V2ProcessEventArgs are the options passed to ProcessEvent API +type V2ProcessEventArgs struct { + Flags []string + *utils.CGREvent + utils.Paginator + *utils.ArgDispatcher +} + +// V2ProcessEventReply is the reply for the ProcessEvent API +type V2ProcessEventReply struct { + MaxUsage *time.Duration + ResourceAuthorization *string + ResourceAllocation *string + Attributes *engine.AttrSProcessEventReply + Suppliers *engine.SortedSuppliers + ThresholdIDs *[]string + StatQueueIDs *[]string +} + +// BiRPCv1ProcessEvent processes one event with the right subsystems based on arguments received +func (sS *SessionS) BiRPCv2ProcessEvent(clnt rpcclient.RpcClientConnection, + args *V2ProcessEventArgs, rply *V2ProcessEventReply) (err error) { + if args.CGREvent.ID == "" { + args.CGREvent.ID = utils.GenUUID() + } + + // RPC caching + if sS.cgrCfg.CacheCfg()[utils.CacheRPCResponses].Limit != 0 { + cacheKey := utils.ConcatenatedKey(utils.SessionSv2ProcessEvent, args.CGREvent.ID) + refID := guardian.Guardian.GuardIDs("", + sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) + + if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has { + cachedResp := itm.(*utils.CachedRPCResponse) + if cachedResp.Error == nil { + *rply = *cachedResp.Result.(*V1ProcessEventReply) + } + return cachedResp.Error + } + defer engine.Cache.Set(utils.CacheRPCResponses, cacheKey, + &utils.CachedRPCResponse{Result: rply, Error: err}, + nil, true, utils.NonTransactional) + } + // end of RPC caching + + if args.CGREvent.Tenant == "" { + args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant + } + me := engine.NewMapEvent(args.CGREvent.Event) + originID := me.GetStringIgnoreErrors(utils.OriginID) + + //convert from Flags []string to utils.FlagsWithParams + argsFlagsWithParams, err := utils.FlagsWithParamsFromSlice(args.Flags) + if err != nil { + return + } + + // check for *attribute + if argsFlagsWithParams.HasKey(utils.MetaAttributes) { + if sS.attrS == nil { + return utils.NewErrNotConnected(utils.AttributeS) + } + attrArgs := &engine.AttrArgsProcessEvent{ + Context: utils.StringPointer(utils.MetaSessionS), + CGREvent: args.CGREvent, + ArgDispatcher: args.ArgDispatcher, + } + if attrIDs := argsFlagsWithParams.ParamsSlice(utils.MetaAttributes); len(attrIDs) != 0 { + attrArgs.AttributeIDs = attrIDs + } + var rplyEv engine.AttrSProcessEventReply + if err := sS.attrS.Call(utils.AttributeSv1ProcessEvent, + attrArgs, &rplyEv); err == nil { + args.CGREvent = rplyEv.CGREvent + if tntIface, has := args.CGREvent.Event[utils.MetaTenant]; has { + // special case when we want to overwrite the tenant + args.CGREvent.Tenant = tntIface.(string) + delete(args.CGREvent.Event, utils.MetaTenant) + } + rply.Attributes = &rplyEv + } else if err.Error() != utils.ErrNotFound.Error() { + return utils.NewErrAttributeS(err) + } + } + //check for auth session + if argsFlagsWithParams.HasKey(utils.MetaAuth) { + maxUsage, err := sS.authSession(args.CGREvent.Tenant, + engine.NewSafEvent(args.CGREvent.Event)) + if err != nil { + return utils.NewErrRALs(err) + } + rply.MaxUsage = &maxUsage + } + // check for *resources + if argsFlagsWithParams.HasKey(utils.MetaResources) { + if sS.resS == nil { + return utils.NewErrNotConnected(utils.ResourceS) + } + if originID == "" { + return utils.NewErrMandatoryIeMissing(utils.OriginID) + } + attrRU := utils.ArgRSv1ResourceUsage{ + CGREvent: args.CGREvent, + UsageID: originID, + Units: 1, + ArgDispatcher: args.ArgDispatcher, + } + var resMessage string + // check what we need to do for resources (*authorization/*allocation) + if resOpt := argsFlagsWithParams.ParamsSlice(utils.MetaResources); len(resOpt) != 0 { + if resOpt[0] == utils.MetaAuthorize { + if err = sS.resS.Call(utils.ResourceSv1AuthorizeResources, + attrRU, &resMessage); err != nil { + return utils.NewErrResourceS(err) + } + rply.ResourceAuthorization = &resMessage + } + if resOpt[0] == utils.MetaAllocate { + if err = sS.resS.Call(utils.ResourceSv1AllocateResources, + attrRU, &resMessage); err != nil { + return utils.NewErrResourceS(err) + } + rply.ResourceAllocation = &resMessage + } + + } + } + // check for init session + if argsFlagsWithParams.HasKey(utils.MetaInitiate) { + var err error + ev := engine.NewSafEvent(args.CGREvent.Event) + dbtItvl := sS.cgrCfg.SessionSCfg().DebitInterval + if ev.HasField(utils.CGRDebitInterval) { // dynamic DebitInterval via CGRDebitInterval + if dbtItvl, err = ev.GetDuration(utils.CGRDebitInterval); err != nil { + return utils.NewErrRALs(err) + } + } + s, err := sS.initSession(args.CGREvent.Tenant, ev, + sS.biJClntID(clnt), originID, dbtItvl, args.ArgDispatcher) + if err != nil { + return utils.NewErrRALs(err) + } + if dbtItvl > 0 { //active debit + rply.MaxUsage = utils.DurationPointer(time.Duration(-1)) + } else { + if maxUsage, err := sS.updateSession(s, nil); err != nil { + return utils.NewErrRALs(err) + } else { + rply.MaxUsage = &maxUsage + } + } + } + // check for terminate session + if argsFlagsWithParams.HasKey(utils.MetaTerminate) { + if originID == "" { + return utils.NewErrMandatoryIeMissing(utils.OriginID) + } + dbtItvl := sS.cgrCfg.SessionSCfg().DebitInterval + if ev.HasField(utils.CGRDebitInterval) { // dynamic DebitInterval via CGRDebitInterval + if dbtItvl, err = ev.GetDuration(utils.CGRDebitInterval); err != nil { + return utils.NewErrRALs(err) + } + } + ss := sS.getRelocateSessions(cgrID, + me.GetStringIgnoreErrors(utils.InitialOriginID), + me.GetStringIgnoreErrors(utils.OriginID), + me.GetStringIgnoreErrors(utils.OriginHost)) + var s *Session + if len(ss) == 0 { + if s, err = sS.initSession(args.CGREvent.Tenant, + ev, sS.biJClntID(clnt), + me.GetStringIgnoreErrors(utils.OriginID), dbtItvl, args.ArgDispatcher); err != nil { + return utils.NewErrRALs(err) + } + } else { + s = ss[0] + } + if err = sS.endSession(s, + me.GetDurationPtrIgnoreErrors(utils.Usage), + me.GetDurationPtrIgnoreErrors(utils.LastUsed), + utils.TimePointer(me.GetTimeIgnoreErrors(utils.AnswerTime, utils.EmptyString))); err != nil { + return utils.NewErrRALs(err) + } + } + // in case we need to release a resource we do this after we close the session + if argsFlagsWithParams.HasKey(utils.MetaResources) { + if resOpt := argsFlagsWithParams.ParamsSlice(utils.MetaResources); len(resOpt) != 0 && + resOpt[0] == utils.MetaRelease { + if sS.resS == nil { + return utils.NewErrNotConnected(utils.ResourceS) + } + if originID == "" { + return utils.NewErrMandatoryIeMissing(utils.OriginID) + } + var reply string + argsRU := utils.ArgRSv1ResourceUsage{ + CGREvent: args.CGREvent, + UsageID: originID, // same ID should be accepted by first group since the previous resource should be expired + Units: 1, + ArgDispatcher: args.ArgDispatcher, + } + if err = sS.resS.Call(utils.ResourceSv1ReleaseResources, + argsRU, &reply); err != nil { + return utils.NewErrResourceS(err) + } + } + } + // get suppliers if required + if argsFlagsWithParams.HasKey(utils.MetaSuppliers) { + if sS.splS == nil { + return utils.NewErrNotConnected(utils.SupplierS) + } + cgrEv := args.CGREvent.Clone() + if acd, has := cgrEv.Event[utils.ACD]; has { + cgrEv.Event[utils.Usage] = acd + } + var splsReply engine.SortedSuppliers + sArgs := &engine.ArgsGetSuppliers{ + CGREvent: cgrEv, + Paginator: args.Paginator, + ArgDispatcher: args.ArgDispatcher, + } + // check in case we have options for suppliers + if splOpts := argsFlagsWithParams.ParamsSlice(utils.MetaSuppliers); len(splOpts) != 0 { + for _, splOpt := range splOpts { + if splOpt == utils.MetaIgnoreErrors { + sArgs.IgnoreErrors = true + } + if splOpt == utils.MetaEventCost { + sArgs.MaxCost = utils.MetaSuppliersEventCost + } + } + } + if err = sS.splS.Call(utils.SupplierSv1GetSuppliers, + sArgs, &splsReply); err != nil { + return utils.NewErrSupplierS(err) + } + if splsReply.SortedSuppliers != nil { + rply.Suppliers = &splsReply + } + } + // process thresholds if required + if argsFlagsWithParams.HasKey(utils.MetaThresholds) { + if sS.thdS == nil { + return utils.NewErrNotConnected(utils.ThresholdS) + } + var tIDs []string + thEv := &engine.ArgsProcessEvent{ + CGREvent: args.CGREvent, + ArgDispatcher: args.ArgDispatcher, + } + // check in case we have ThresholdIDs inside flags + if thIDs := argsFlagsWithParams.ParamsSlice(utils.MetaThresholds); len(thIDs) != 0 { + thEv.ThresholdIDs = thIDs + } + if err := sS.thdS.Call(utils.ThresholdSv1ProcessEvent, + thEv, &tIDs); err != nil && + err.Error() != utils.ErrNotFound.Error() { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.", + utils.SessionS, err.Error(), thEv)) + } + rply.ThresholdIDs = &tIDs + } + // process stats if required + if argsFlagsWithParams.HasKey(utils.MetaStats) { + if sS.statS == nil { + return utils.NewErrNotConnected(utils.StatS) + } + var statReply []string + statArgs := &engine.StatsArgsProcessEvent{ + CGREvent: args.CGREvent, + ArgDispatcher: args.ArgDispatcher, + } + // check in case we have StatIDs inside flags + if stsIDs := argsFlagsWithParams.ParamsSlice(utils.MetaStats); len(stsIDs) != 0 { + statArgs.StatIDs = stsIDs + } + if err := sS.statS.Call(utils.StatSv1ProcessEvent, + statArgs, &statReply); err != nil && + err.Error() != utils.ErrNotFound.Error() { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s processing event %+v with StatS.", + utils.SessionS, err.Error(), args.CGREvent)) + } + rply.StatQueueIDs = &statReply } return nil } diff --git a/utils/consts.go b/utils/consts.go index 013117da4..8c2540034 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -609,6 +609,9 @@ const ( CGREventString = "CGREvent" MetaPing = "*ping" MetaTextPlain = "*text_plain" + MetaIgnoreErrors = "*ignore_errors" + MetaRelease = "*release" + MetaAllocate = "*allocate" ) // Migrator Action diff --git a/utils/map.go b/utils/map.go index c4e6c514d..68d471972 100644 --- a/utils/map.go +++ b/utils/map.go @@ -19,6 +19,7 @@ along with this program. If not, see package utils import ( + "reflect" "strconv" "strings" ) @@ -275,3 +276,15 @@ func (fWp FlagsWithParams) ParamsSlice(subs string) (ps []string) { } return ps } + +//func to convert from FlagsWithParams back to []string +func (fWp FlagsWithParams) SliceFlags() (sls []string) { + for key, _ := range fWp { + if prmSlice := fWp.ParamsSlice(key); !reflect.DeepEqual(prmSlice, []string{}) { + sls = append(sls, ConcatenatedKey(key, strings.Join(prmSlice, INFIELD_SEP))) + } else { + sls = append(sls, key) + } + } + return +} diff --git a/utils/map_test.go b/utils/map_test.go index b925c087e..65d8bf14f 100644 --- a/utils/map_test.go +++ b/utils/map_test.go @@ -19,6 +19,7 @@ package utils import ( "reflect" + "sort" "testing" ) @@ -161,7 +162,6 @@ func TestMapSubsystemIDsHasKey(t *testing.T) { if has := mp.HasKey("*resources"); has { t.Errorf("Expecting: false, received: %+v", has) } - } func TestMapSubsystemIDsGetIDs(t *testing.T) { @@ -190,5 +190,26 @@ func TestMapSubsystemIDsGetIDs(t *testing.T) { if ids := mp.ParamsSlice("*test"); !reflect.DeepEqual(ids, eIDs) { t.Errorf("Expecting: %+v, received: %+v", eIDs, ids) } - +} + +func TestFlagsToSlice(t *testing.T) { + sls := []string{"*event", "*thresholds:ID1;ID2;ID3", "*attributes", "*stats:ID"} + eMp := FlagsWithParams{ + "*event": []string{}, + "*thresholds": []string{"ID1", "ID2", "ID3"}, + "*attributes": []string{}, + "*stats": []string{"ID"}, + } + mp, err := FlagsWithParamsFromSlice(sls) + if err != nil { + t.Error(err) + } else if !reflect.DeepEqual(mp, eMp) { + t.Errorf("Expecting: %+v, received: %+v", eMp, mp) + } + sort.Strings(sls) + flgSls := mp.SliceFlags() + sort.Strings(flgSls) + if !reflect.DeepEqual(flgSls, sls) { + t.Errorf("Expecting: %+v, received: %+v", sls, flgSls) + } }