Update template for DNSAgent and add new method V2ProcessEvent in SessionS

This commit is contained in:
TeoV
2019-07-14 18:00:59 +03:00
committed by Dan Christian Bogos
parent ae5f2e1d6b
commit 4e2164b741
8 changed files with 338 additions and 7 deletions

View File

@@ -92,7 +92,6 @@ func (da *DNSAgent) handleMessage(w dns.ResponseWriter, req *dns.Msg) {
}
reqVars[E164Address] = e164
reqVars[DomainName] = domainNameFromNAPTR(req.Question[0].Name)
}
cgrRplyNM := config.NewNavigableMap(nil)
rplyNM := config.NewNavigableMap(nil) // share it among different processors

View File

@@ -21,7 +21,7 @@
"type": "*constant", "value": "U"},
{"tag": "NAPTRService", "field_id": "Service",
"type": "*constant", "value": "E2U+SIP"},
{"tag": "NAPTRReplacement", "field_id": "Regexp",
{"tag": "NAPTRRegex", "field_id": "Regexp",
"type": "*variable", "value": "~*cgrep.Attributes.NAPTRAddress"},
],
},

View File

@@ -14,8 +14,8 @@
{"tag": "NAPTRPreference", "field_id": "Preference", "type": "*constant", "value": "10"},
{"tag": "NAPTRFlags", "field_id": "Flags", "type": "*constant", "value": "U"},
{"tag": "NAPTRService", "field_id": "Service", "type": "*constant", "value": "E2U+SIP"},
{"tag": "NAPTRRegexp", "field_id": "Regexp", "type": "*constant", "value": "^.*$"},
{"tag": "NAPTRReplacement", "field_id": "Replacement", "type": "*constant", "value": "sip:\\1@172.16.1.10."},
{"tag": "NAPTRRegexp", "field_id": "Regexp", "type": "*constant", "value": "!^(.*)$!sip:\\1@172.16.1.10.!"},
{"tag": "NAPTRReplacement", "field_id": "Replacement", "type": "*constant", "value": "."},
],
},
],

View File

@@ -33,6 +33,8 @@
"type": "*constant", "value": "E2U+SIP"},
{"tag": "NAPTRRegexp", "field_id": "Regexp", "type": "*variable",
"value": "~*cgrep.Suppliers.SortedSuppliers[0].SupplierParameters"},
{"tag": "NAPTRReplacement", "field_id": "Replacement",
"type": "*constant", "value": "."},
],
"continue": true,
},
@@ -53,6 +55,8 @@
"type": "*constant", "value": "E2U+SIP"},
{"tag": "NAPTRRegexp", "field_id": "Regexp", "type": "*variable",
"value": "~*cgrep.Suppliers.SortedSuppliers[1].SupplierParameters"},
{"tag": "NAPTRReplacement", "field_id": "Replacement",
"type": "*constant", "value": "."},
],
"continue": true,
},

View File

@@ -3009,7 +3009,298 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection,
fmt.Sprintf("<%s> error: %s processing event %+v with StatS.",
utils.SessionS, err.Error(), args.CGREvent))
}
rply.ThresholdIDs = &statReply
rply.StatQueueIDs = &statReply
}
return nil
}
// V2ProcessEventArgs are the options passed to ProcessEvent API
type V2ProcessEventArgs struct {
Flags []string
*utils.CGREvent
utils.Paginator
*utils.ArgDispatcher
}
// V2ProcessEventReply is the reply for the ProcessEvent API
type V2ProcessEventReply struct {
MaxUsage *time.Duration
ResourceAuthorization *string
ResourceAllocation *string
Attributes *engine.AttrSProcessEventReply
Suppliers *engine.SortedSuppliers
ThresholdIDs *[]string
StatQueueIDs *[]string
}
// BiRPCv1ProcessEvent processes one event with the right subsystems based on arguments received
func (sS *SessionS) BiRPCv2ProcessEvent(clnt rpcclient.RpcClientConnection,
args *V2ProcessEventArgs, rply *V2ProcessEventReply) (err error) {
if args.CGREvent.ID == "" {
args.CGREvent.ID = utils.GenUUID()
}
// RPC caching
if sS.cgrCfg.CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.SessionSv2ProcessEvent, args.CGREvent.ID)
refID := guardian.Guardian.GuardIDs("",
sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*rply = *cachedResp.Result.(*V1ProcessEventReply)
}
return cachedResp.Error
}
defer engine.Cache.Set(utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: rply, Error: err},
nil, true, utils.NonTransactional)
}
// end of RPC caching
if args.CGREvent.Tenant == "" {
args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant
}
me := engine.NewMapEvent(args.CGREvent.Event)
originID := me.GetStringIgnoreErrors(utils.OriginID)
//convert from Flags []string to utils.FlagsWithParams
argsFlagsWithParams, err := utils.FlagsWithParamsFromSlice(args.Flags)
if err != nil {
return
}
// check for *attribute
if argsFlagsWithParams.HasKey(utils.MetaAttributes) {
if sS.attrS == nil {
return utils.NewErrNotConnected(utils.AttributeS)
}
attrArgs := &engine.AttrArgsProcessEvent{
Context: utils.StringPointer(utils.MetaSessionS),
CGREvent: args.CGREvent,
ArgDispatcher: args.ArgDispatcher,
}
if attrIDs := argsFlagsWithParams.ParamsSlice(utils.MetaAttributes); len(attrIDs) != 0 {
attrArgs.AttributeIDs = attrIDs
}
var rplyEv engine.AttrSProcessEventReply
if err := sS.attrS.Call(utils.AttributeSv1ProcessEvent,
attrArgs, &rplyEv); err == nil {
args.CGREvent = rplyEv.CGREvent
if tntIface, has := args.CGREvent.Event[utils.MetaTenant]; has {
// special case when we want to overwrite the tenant
args.CGREvent.Tenant = tntIface.(string)
delete(args.CGREvent.Event, utils.MetaTenant)
}
rply.Attributes = &rplyEv
} else if err.Error() != utils.ErrNotFound.Error() {
return utils.NewErrAttributeS(err)
}
}
//check for auth session
if argsFlagsWithParams.HasKey(utils.MetaAuth) {
maxUsage, err := sS.authSession(args.CGREvent.Tenant,
engine.NewSafEvent(args.CGREvent.Event))
if err != nil {
return utils.NewErrRALs(err)
}
rply.MaxUsage = &maxUsage
}
// check for *resources
if argsFlagsWithParams.HasKey(utils.MetaResources) {
if sS.resS == nil {
return utils.NewErrNotConnected(utils.ResourceS)
}
if originID == "" {
return utils.NewErrMandatoryIeMissing(utils.OriginID)
}
attrRU := utils.ArgRSv1ResourceUsage{
CGREvent: args.CGREvent,
UsageID: originID,
Units: 1,
ArgDispatcher: args.ArgDispatcher,
}
var resMessage string
// check what we need to do for resources (*authorization/*allocation)
if resOpt := argsFlagsWithParams.ParamsSlice(utils.MetaResources); len(resOpt) != 0 {
if resOpt[0] == utils.MetaAuthorize {
if err = sS.resS.Call(utils.ResourceSv1AuthorizeResources,
attrRU, &resMessage); err != nil {
return utils.NewErrResourceS(err)
}
rply.ResourceAuthorization = &resMessage
}
if resOpt[0] == utils.MetaAllocate {
if err = sS.resS.Call(utils.ResourceSv1AllocateResources,
attrRU, &resMessage); err != nil {
return utils.NewErrResourceS(err)
}
rply.ResourceAllocation = &resMessage
}
}
}
// check for init session
if argsFlagsWithParams.HasKey(utils.MetaInitiate) {
var err error
ev := engine.NewSafEvent(args.CGREvent.Event)
dbtItvl := sS.cgrCfg.SessionSCfg().DebitInterval
if ev.HasField(utils.CGRDebitInterval) { // dynamic DebitInterval via CGRDebitInterval
if dbtItvl, err = ev.GetDuration(utils.CGRDebitInterval); err != nil {
return utils.NewErrRALs(err)
}
}
s, err := sS.initSession(args.CGREvent.Tenant, ev,
sS.biJClntID(clnt), originID, dbtItvl, args.ArgDispatcher)
if err != nil {
return utils.NewErrRALs(err)
}
if dbtItvl > 0 { //active debit
rply.MaxUsage = utils.DurationPointer(time.Duration(-1))
} else {
if maxUsage, err := sS.updateSession(s, nil); err != nil {
return utils.NewErrRALs(err)
} else {
rply.MaxUsage = &maxUsage
}
}
}
// check for terminate session
if argsFlagsWithParams.HasKey(utils.MetaTerminate) {
if originID == "" {
return utils.NewErrMandatoryIeMissing(utils.OriginID)
}
dbtItvl := sS.cgrCfg.SessionSCfg().DebitInterval
if ev.HasField(utils.CGRDebitInterval) { // dynamic DebitInterval via CGRDebitInterval
if dbtItvl, err = ev.GetDuration(utils.CGRDebitInterval); err != nil {
return utils.NewErrRALs(err)
}
}
ss := sS.getRelocateSessions(cgrID,
me.GetStringIgnoreErrors(utils.InitialOriginID),
me.GetStringIgnoreErrors(utils.OriginID),
me.GetStringIgnoreErrors(utils.OriginHost))
var s *Session
if len(ss) == 0 {
if s, err = sS.initSession(args.CGREvent.Tenant,
ev, sS.biJClntID(clnt),
me.GetStringIgnoreErrors(utils.OriginID), dbtItvl, args.ArgDispatcher); err != nil {
return utils.NewErrRALs(err)
}
} else {
s = ss[0]
}
if err = sS.endSession(s,
me.GetDurationPtrIgnoreErrors(utils.Usage),
me.GetDurationPtrIgnoreErrors(utils.LastUsed),
utils.TimePointer(me.GetTimeIgnoreErrors(utils.AnswerTime, utils.EmptyString))); err != nil {
return utils.NewErrRALs(err)
}
}
// in case we need to release a resource we do this after we close the session
if argsFlagsWithParams.HasKey(utils.MetaResources) {
if resOpt := argsFlagsWithParams.ParamsSlice(utils.MetaResources); len(resOpt) != 0 &&
resOpt[0] == utils.MetaRelease {
if sS.resS == nil {
return utils.NewErrNotConnected(utils.ResourceS)
}
if originID == "" {
return utils.NewErrMandatoryIeMissing(utils.OriginID)
}
var reply string
argsRU := utils.ArgRSv1ResourceUsage{
CGREvent: args.CGREvent,
UsageID: originID, // same ID should be accepted by first group since the previous resource should be expired
Units: 1,
ArgDispatcher: args.ArgDispatcher,
}
if err = sS.resS.Call(utils.ResourceSv1ReleaseResources,
argsRU, &reply); err != nil {
return utils.NewErrResourceS(err)
}
}
}
// get suppliers if required
if argsFlagsWithParams.HasKey(utils.MetaSuppliers) {
if sS.splS == nil {
return utils.NewErrNotConnected(utils.SupplierS)
}
cgrEv := args.CGREvent.Clone()
if acd, has := cgrEv.Event[utils.ACD]; has {
cgrEv.Event[utils.Usage] = acd
}
var splsReply engine.SortedSuppliers
sArgs := &engine.ArgsGetSuppliers{
CGREvent: cgrEv,
Paginator: args.Paginator,
ArgDispatcher: args.ArgDispatcher,
}
// check in case we have options for suppliers
if splOpts := argsFlagsWithParams.ParamsSlice(utils.MetaSuppliers); len(splOpts) != 0 {
for _, splOpt := range splOpts {
if splOpt == utils.MetaIgnoreErrors {
sArgs.IgnoreErrors = true
}
if splOpt == utils.MetaEventCost {
sArgs.MaxCost = utils.MetaSuppliersEventCost
}
}
}
if err = sS.splS.Call(utils.SupplierSv1GetSuppliers,
sArgs, &splsReply); err != nil {
return utils.NewErrSupplierS(err)
}
if splsReply.SortedSuppliers != nil {
rply.Suppliers = &splsReply
}
}
// process thresholds if required
if argsFlagsWithParams.HasKey(utils.MetaThresholds) {
if sS.thdS == nil {
return utils.NewErrNotConnected(utils.ThresholdS)
}
var tIDs []string
thEv := &engine.ArgsProcessEvent{
CGREvent: args.CGREvent,
ArgDispatcher: args.ArgDispatcher,
}
// check in case we have ThresholdIDs inside flags
if thIDs := argsFlagsWithParams.ParamsSlice(utils.MetaThresholds); len(thIDs) != 0 {
thEv.ThresholdIDs = thIDs
}
if err := sS.thdS.Call(utils.ThresholdSv1ProcessEvent,
thEv, &tIDs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.",
utils.SessionS, err.Error(), thEv))
}
rply.ThresholdIDs = &tIDs
}
// process stats if required
if argsFlagsWithParams.HasKey(utils.MetaStats) {
if sS.statS == nil {
return utils.NewErrNotConnected(utils.StatS)
}
var statReply []string
statArgs := &engine.StatsArgsProcessEvent{
CGREvent: args.CGREvent,
ArgDispatcher: args.ArgDispatcher,
}
// check in case we have StatIDs inside flags
if stsIDs := argsFlagsWithParams.ParamsSlice(utils.MetaStats); len(stsIDs) != 0 {
statArgs.StatIDs = stsIDs
}
if err := sS.statS.Call(utils.StatSv1ProcessEvent,
statArgs, &statReply); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing event %+v with StatS.",
utils.SessionS, err.Error(), args.CGREvent))
}
rply.StatQueueIDs = &statReply
}
return nil
}

View File

@@ -609,6 +609,9 @@ const (
CGREventString = "CGREvent"
MetaPing = "*ping"
MetaTextPlain = "*text_plain"
MetaIgnoreErrors = "*ignore_errors"
MetaRelease = "*release"
MetaAllocate = "*allocate"
)
// Migrator Action

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package utils
import (
"reflect"
"strconv"
"strings"
)
@@ -275,3 +276,15 @@ func (fWp FlagsWithParams) ParamsSlice(subs string) (ps []string) {
}
return ps
}
//func to convert from FlagsWithParams back to []string
func (fWp FlagsWithParams) SliceFlags() (sls []string) {
for key, _ := range fWp {
if prmSlice := fWp.ParamsSlice(key); !reflect.DeepEqual(prmSlice, []string{}) {
sls = append(sls, ConcatenatedKey(key, strings.Join(prmSlice, INFIELD_SEP)))
} else {
sls = append(sls, key)
}
}
return
}

View File

@@ -19,6 +19,7 @@ package utils
import (
"reflect"
"sort"
"testing"
)
@@ -161,7 +162,6 @@ func TestMapSubsystemIDsHasKey(t *testing.T) {
if has := mp.HasKey("*resources"); has {
t.Errorf("Expecting: false, received: %+v", has)
}
}
func TestMapSubsystemIDsGetIDs(t *testing.T) {
@@ -190,5 +190,26 @@ func TestMapSubsystemIDsGetIDs(t *testing.T) {
if ids := mp.ParamsSlice("*test"); !reflect.DeepEqual(ids, eIDs) {
t.Errorf("Expecting: %+v, received: %+v", eIDs, ids)
}
}
func TestFlagsToSlice(t *testing.T) {
sls := []string{"*event", "*thresholds:ID1;ID2;ID3", "*attributes", "*stats:ID"}
eMp := FlagsWithParams{
"*event": []string{},
"*thresholds": []string{"ID1", "ID2", "ID3"},
"*attributes": []string{},
"*stats": []string{"ID"},
}
mp, err := FlagsWithParamsFromSlice(sls)
if err != nil {
t.Error(err)
} else if !reflect.DeepEqual(mp, eMp) {
t.Errorf("Expecting: %+v, received: %+v", eMp, mp)
}
sort.Strings(sls)
flgSls := mp.SliceFlags()
sort.Strings(flgSls)
if !reflect.DeepEqual(flgSls, sls) {
t.Errorf("Expecting: %+v, received: %+v", sls, flgSls)
}
}