Add new filter type "*accounts"

This commit is contained in:
TeoV
2019-08-01 16:56:36 +03:00
committed by Dan Christian Bogos
parent cae07d1e35
commit 6e3960fb7a
12 changed files with 301 additions and 34 deletions

View File

@@ -40,7 +40,7 @@ func TestAgReqAsNavigableMap(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
// populate request, emulating the way will be done in HTTPAgent
agReq.CGRRequest.Set([]string{utils.CGRID},
@@ -136,7 +136,7 @@ func TestAgReqMaxCost(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
// populate request, emulating the way will be done in HTTPAgent
agReq.CGRRequest.Set([]string{utils.CapMaxUsage}, "120s", false, false)
@@ -180,7 +180,7 @@ func TestAgReqParseFieldDiameter(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
//pass the data provider to agent request
agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
@@ -230,7 +230,7 @@ func TestAgReqParseFieldRadius(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
//pass the data provider to agent request
agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
tplFlds := []*config.FCTemplate{
@@ -270,7 +270,7 @@ Host: api.cgrates.org
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
//pass the data provider to agent request
agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
tplFlds := []*config.FCTemplate{
@@ -341,7 +341,7 @@ func TestAgReqParseFieldHttpXml(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
//pass the data provider to agent request
agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
tplFlds := []*config.FCTemplate{
@@ -370,7 +370,7 @@ func TestAgReqEmptyFilter(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
// populate request, emulating the way will be done in HTTPAgent
agReq.CGRRequest.Set([]string{utils.CGRID},
@@ -413,7 +413,7 @@ func TestAgReqMetaExponent(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
agReq.CGRRequest.Set([]string{"Value"}, "2", false, false)
agReq.CGRRequest.Set([]string{"Exponent"}, "2", false, false)
@@ -439,7 +439,7 @@ func TestAgReqCGRActiveRequest(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
// populate request, emulating the way will be done in HTTPAgent
@@ -482,7 +482,7 @@ func TestAgReqFieldAsNone(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
// populate request, emulating the way will be done in HTTPAgent
agReq.CGRRequest.Set([]string{utils.ToR}, utils.VOICE, false, false)
@@ -519,7 +519,7 @@ func TestAgReqFieldAsNone2(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
// populate request, emulating the way will be done in HTTPAgent
agReq.CGRRequest.Set([]string{utils.ToR}, utils.VOICE, false, false)
@@ -559,7 +559,7 @@ func TestAgReqAsNavigableMap2(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
// populate request, emulating the way will be done in HTTPAgent
agReq.CGRRequest.Set([]string{utils.ToR}, utils.VOICE, false, false)
@@ -616,7 +616,7 @@ func TestAgReqFieldAsInterface(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
// populate request, emulating the way will be done in HTTPAgent
agReq.CGRAReq = config.NewNavigableMap(nil)
@@ -663,7 +663,7 @@ func TestAgReqNewARWithCGRRplyAndRply(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
ev := map[string]interface{}{
"FirstLevel": map[string]interface{}{
@@ -713,7 +713,7 @@ func TestAgReqSetCGRReplyWithError(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
ev := map[string]interface{}{
"FirstLevel": map[string]interface{}{
@@ -754,7 +754,7 @@ func TestAgReqSetCGRReplyWithoutError(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
ev := map[string]interface{}{
"FirstLevel": map[string]interface{}{
@@ -816,7 +816,7 @@ func TestAgReqParseFieldMetaCCUsage(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
//pass the data provider to agent request
agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
@@ -894,7 +894,7 @@ func TestAgReqParseFieldMetaUsageDifference(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
//pass the data provider to agent request
agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
@@ -960,7 +960,7 @@ func TestAgReqParseFieldMetaSum(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
//pass the data provider to agent request
agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
@@ -1004,7 +1004,7 @@ func TestAgReqParseFieldMetaDifference(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
//pass the data provider to agent request
agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
@@ -1048,7 +1048,7 @@ func TestAgReqParseFieldMetaValueExponent(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
//pass the data provider to agent request
agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)

View File

@@ -51,7 +51,7 @@ func (s *testMockSessionConn) Call(method string, arg interface{}, rply interfac
func TestProcessRequest(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
filters := engine.NewFilterS(config.CgrConfig(), nil, nil, dm) // no need for filterS but stiil try to configure the dm :D
filters := engine.NewFilterS(config.CgrConfig(), nil, nil, nil, dm) // no need for filterS but stiil try to configure the dm :D
cgrRplyNM := config.NewNavigableMap(nil)
rply := config.NewNavigableMap(nil)

View File

@@ -76,7 +76,7 @@ func TestCsvDataMultiplyFactor(t *testing.T) {
cdrcConfig := cgrConfig.CdrcProfiles["/var/spool/cgrates/cdrc/in"][0]
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
filterS := engine.NewFilterS(cgrConfig, nil, nil, dm)
filterS := engine.NewFilterS(cgrConfig, nil, nil, nil, dm)
cdrcConfig.CdrSourceId = "TEST_CDRC"
cdrcConfig.ContentFields = []*config.FCTemplate{
{Tag: "TORField", Type: utils.META_COMPOSED, FieldId: utils.ToR,

View File

@@ -321,7 +321,7 @@ func TestXMLRPProcessWithNewFilters(t *testing.T) {
}
xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(cdrXmlBroadsoft),
utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}), "UTC", true,
cdrcCfgs, engine.NewFilterS(defaultCfg, nil, nil, engine.NewDataManager(data)))
cdrcCfgs, engine.NewFilterS(defaultCfg, nil, nil, nil, engine.NewDataManager(data)))
if err != nil {
t.Error(err)
}
@@ -588,7 +588,7 @@ func TestXMLRPNestingSeparator(t *testing.T) {
}
xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(xmlContent),
utils.HierarchyPath([]string{"File", "CDRs", "Call"}), "UTC", true,
cdrcCfgs, engine.NewFilterS(defaultCfg, nil, nil, engine.NewDataManager(data)))
cdrcCfgs, engine.NewFilterS(defaultCfg, nil, nil, nil, engine.NewDataManager(data)))
if err != nil {
t.Error(err)
}

View File

@@ -1135,10 +1135,10 @@ func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSCh
// startFilterService fires up the FilterS
func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS,
internalStatSChan, internalResourceSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
internalStatSChan, internalResourceSChan, internalRalSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
dm *engine.DataManager, exitChan chan bool) {
<-cacheS.GetPrecacheChannel(utils.CacheFilters)
filterSChan <- engine.NewFilterS(cfg, internalStatSChan, internalResourceSChan, dm)
filterSChan <- engine.NewFilterS(cfg, internalStatSChan, internalResourceSChan, internalRalSChan, dm)
}
// loaderService will start and register APIs for LoaderService if enabled
@@ -1794,7 +1794,7 @@ func main() {
}
// Start FilterS
go startFilterService(filterSChan, cacheS, internalStatSChan, internalRsChan, cfg, dm, exitChan)
go startFilterService(filterSChan, cacheS, internalStatSChan, internalRsChan, internalRaterChan, cfg, dm, exitChan)
if cfg.AttributeSCfg().Enabled {
go startAttributeService(internalAttributeSChan, cacheS,

View File

@@ -21,6 +21,7 @@ package config
type FilterSCfg struct {
StatSConns []*RemoteHost
ResourceSConns []*RemoteHost
RALsConns []*RemoteHost
}
func (fSCfg *FilterSCfg) loadFromJsonCfg(jsnCfg *FilterSJsonCfg) (err error) {
@@ -41,5 +42,12 @@ func (fSCfg *FilterSCfg) loadFromJsonCfg(jsnCfg *FilterSJsonCfg) (err error) {
fSCfg.ResourceSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Rals_conns != nil {
fSCfg.RALsConns = make([]*RemoteHost, len(*jsnCfg.Rals_conns))
for idx, jsnHaCfg := range *jsnCfg.Rals_conns {
fSCfg.RALsConns[idx] = NewDfltRemoteHost()
fSCfg.RALsConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
return
}

View File

@@ -94,6 +94,7 @@ type DbJsonCfg struct {
type FilterSJsonCfg struct {
Stats_conns *[]*RemoteHostJson
Resources_conns *[]*RemoteHostJson
Rals_conns *[]*RemoteHostJson
}
// Rater config section

View File

@@ -1116,6 +1116,29 @@ func (acnt *Account) Publish() {
}
}
func (acnt *Account) AsNavigableMap(_ []*config.FCTemplate) (*config.NavigableMap, error) {
mpIface := map[string]interface{}{
"ID": acnt.ID,
//"UnitCounters": acnt.UnitCounters,
"ActionTriggers": acnt.ActionTriggers,
"AllowNegative": acnt.AllowNegative,
"Disabled": acnt.Disabled,
}
balanceMap := make(map[string]interface{}, len(acnt.BalanceMap))
for key, balances := range acnt.BalanceMap {
balSls := make([]*config.NavigableMap, len(balances))
for i, balance := range balances {
balSls[i], _ = balance.AsNavigableMap(nil)
}
balanceMap[key] = balSls
}
mpIface["BalanceMap"] = balanceMap
return config.NewNavigableMap(mpIface), nil
}
func NewAccountSummaryFromJSON(jsn string) (acntSummary *AccountSummary, err error) {
if !utils.IsSliceMember([]string{"", "null"}, jsn) { // Unmarshal only when content
json.Unmarshal([]byte(jsn), &acntSummary)

View File

@@ -2279,6 +2279,54 @@ func TestAccountGetMultipleBalancesForPrefixWithSameWeight(t *testing.T) {
}
}
func TestAccountAsNavigableMap(t *testing.T) {
acc := &Account{
BalanceMap: map[string]Balances{
utils.MONETARY: Balances{
&Balance{
ID: "SpecialBalance1",
Value: 10,
Weight: 10.0,
},
&Balance{
ID: "SpecialBalance2",
Value: 10,
Weight: 10.0,
},
},
},
}
nM, _ := acc.AsNavigableMap(nil)
eVal := "SpecialBalance1"
if strVal, err := nM.FieldAsString(
[]string{"BalanceMap", "*monetary[0]", "ID"}); err != nil {
t.Error(err)
} else if strVal != eVal {
t.Errorf("expecting: <%+v> received: <%+v>", eVal, strVal)
}
eVal = "10"
if strVal, err := nM.FieldAsString(
[]string{"BalanceMap", "*monetary[0]", "Value"}); err != nil {
t.Error(err)
} else if strVal != eVal {
t.Errorf("expecting: <%+v> received: <%+v>", eVal, strVal)
}
eVal = "10"
if strVal, err := nM.FieldAsString(
[]string{"BalanceMap", "*monetary[0]", "Weight"}); err != nil {
t.Error(err)
} else if strVal != eVal {
t.Errorf("expecting: <%+v> received: <%+v>", eVal, strVal)
}
eVal = "SpecialBalance2"
if strVal, err := nM.FieldAsString(
[]string{"BalanceMap", "*monetary[1]", "ID"}); err != nil {
t.Error(err)
} else if strVal != eVal {
t.Errorf("expecting: <%+v> received: <%+v>", eVal, strVal)
}
}
/*********************************** Benchmarks *******************************/
func BenchmarkGetSecondForPrefix(b *testing.B) {

View File

@@ -185,6 +185,25 @@ func (b *Balance) Clone() *Balance {
return n
}
func (b *Balance) AsNavigableMap(_ []*config.FCTemplate) (*config.NavigableMap, error) {
return config.NewNavigableMap(map[string]interface{}{
"Uuid": b.Uuid,
"ID": b.ID,
"Value": b.Value,
"ExpirationDate": b.ExpirationDate,
"Weight": b.Weight,
"DestinationIDs": b.DestinationIDs,
"RatingSubject": b.RatingSubject,
"Categories": b.Categories,
"SharedGroups": b.SharedGroups,
"Timings": b.Timings,
"TimingIDs": b.TimingIDs,
"Disabled": b.Disabled,
"Factor": b.Factor,
"Blocker": b.Blocker,
}), nil
}
func (b *Balance) getMatchingPrefixAndDestID(dest string) (prefix, destId string) {
if len(b.DestinationIDs) != 0 && b.DestinationIDs[utils.ANY] == false {
for _, p := range utils.SplitPrefix(dest, MIN_PREFIX_MATCH) {

View File

@@ -62,7 +62,7 @@ const (
)
func NewFilterS(cfg *config.CGRConfig,
statSChan, resSChan chan rpcclient.RpcClientConnection, dm *DataManager) (fS *FilterS) {
statSChan, resSChan, ralSChan chan rpcclient.RpcClientConnection, dm *DataManager) (fS *FilterS) {
fS = &FilterS{
dm: dm,
cfg: cfg,
@@ -73,16 +73,19 @@ func NewFilterS(cfg *config.CGRConfig,
if len(cfg.FilterSCfg().ResourceSConns) != 0 {
fS.connResourceS(resSChan)
}
if len(cfg.FilterSCfg().RALsConns) != 0 {
fS.connRALs(ralSChan)
}
return
}
// FilterS is a service used to take decisions in case of filters
// uses lazy connections where necessary to avoid deadlocks on service startup
type FilterS struct {
cfg *config.CGRConfig
statSConns, resSConns rpcclient.RpcClientConnection
sSConnMux, rSConnMux sync.RWMutex // make sure only one goroutine attempts connecting
dm *DataManager
cfg *config.CGRConfig
statSConns, resSConns, ralSConns rpcclient.RpcClientConnection
sSConnMux, rSConnMux, ralSConnMux sync.RWMutex // make sure only one goroutine attempts connecting
dm *DataManager
}
// connStatS returns will connect towards StatS
@@ -117,6 +120,22 @@ func (fS *FilterS) connResourceS(resSChan chan rpcclient.RpcClientConnection) (e
return
}
// connRALs returns will connect towards RALs
func (fS *FilterS) connRALs(ralSChan chan rpcclient.RpcClientConnection) (err error) {
fS.ralSConnMux.Lock()
defer fS.ralSConnMux.Unlock()
if fS.ralSConns != nil { // connection was populated between locks
return
}
fS.ralSConns, err = NewRPCPool(rpcclient.POOL_FIRST,
fS.cfg.TlsCfg().ClientKey, fS.cfg.TlsCfg().ClientCerificate,
fS.cfg.TlsCfg().CaCertificate, fS.cfg.GeneralCfg().ConnectAttempts,
fS.cfg.GeneralCfg().Reconnects, fS.cfg.GeneralCfg().ConnectTimeout,
fS.cfg.GeneralCfg().ReplyTimeout, fS.cfg.FilterSCfg().RALsConns,
ralSChan, true)
return
}
// Pass will check all filters wihin filterIDs and require them passing for dataProvider
// there should be at least one filter passing, ie: if filters are not active event will fail to pass
// receives the event as DataProvider so we can accept undecoded data (ie: HttpRequest)
@@ -242,6 +261,7 @@ type FilterRule struct {
negative *bool
statItems []*itemFilter // Cached compiled itemFilter out of Values
resourceItems []*itemFilter // Cached compiled itemFilter out of Values
accountItems []*itemFilter // Cached compiled itemFilter out of Values
}
// Separate method to compile RSR fields
@@ -279,7 +299,7 @@ func (rf *FilterRule) CompileValues() (err error) {
return fmt.Errorf("Value %s needs to contain at least 3 items", val)
}
// valSplt[0] filter type
// valSplt[1] id of the Resource
// valSplt[1] id of the AccountID.FieldToUsed
// valSplt[2] value to compare
rf.resourceItems[i] = &itemFilter{
FilterType: valSplt[0],
@@ -287,6 +307,24 @@ func (rf *FilterRule) CompileValues() (err error) {
FilterValue: valSplt[2],
}
}
case utils.MetaAccounts:
//value for filter of type *accounts needs to be in the following form:
//*gt:AccountID:ValueOfUsage
rf.accountItems = make([]*itemFilter, len(rf.Values))
for i, val := range rf.Values {
valSplt := strings.Split(val, utils.InInFieldSep)
if len(valSplt) != 3 {
return fmt.Errorf("Value %s needs to contain at least 3 items", val)
}
// valSplt[0] filter type
// valSplt[1] id of the Resource
// valSplt[2] value to compare
rf.accountItems[i] = &itemFilter{
FilterType: valSplt[0],
ItemID: valSplt[1],
FilterValue: valSplt[2],
}
}
}
return
}
@@ -323,6 +361,8 @@ func (fltr *FilterRule) Pass(dP config.DataProvider,
result, err = fltr.passResourceS(dP, rpcClnt, tenant)
case MetaEqual, MetaNotEqual:
result, err = fltr.passEqualTo(dP)
case utils.MetaAccounts:
result, err = fltr.passAccountS(dP, rpcClnt, tenant)
default:
err = utils.ErrPrefixNotErrNotImplemented(fltr.Type)
}
@@ -580,6 +620,38 @@ func (fltr *FilterRule) passResourceS(dP config.DataProvider,
return true, nil
}
func (fltr *FilterRule) passAccountS(dP config.DataProvider,
accountS rpcclient.RpcClientConnection, tenant string) (bool, error) {
if accountS == nil || reflect.ValueOf(accountS).IsNil() {
return false, errors.New("Missing AccountS information")
}
for _, accItem := range fltr.accountItems {
//split accItem.ItemID in two accountID and actual filter
//AccountID.BalanceMap.*monetary[0].Value
splittedString := strings.SplitN(accItem.ItemID, utils.NestingSep, 2)
accID := splittedString[0]
filterID := splittedString[1]
var reply Account
if err := accountS.Call(utils.ApierV2GetAccount,
&utils.AttrGetAccount{Tenant: tenant, Account: accID}, &reply); err != nil {
return false, err
}
//compose the newFilter
fltr, err := NewFilterRule(accItem.FilterType,
utils.DynamicDataPrefix+filterID, []string{accItem.FilterValue})
if err != nil {
return false, err
}
dP, _ := reply.AsNavigableMap(nil)
if val, err := fltr.Pass(dP, nil, tenant); err != nil || !val {
//in case of error return false and error
//and in case of not pass return false and nil
return false, err
}
}
return true, nil
}
func (fltr *FilterRule) passEqualTo(dP config.DataProvider) (bool, error) {
fldIf, err := config.GetDynamicInterface(fltr.FieldName, dP)
if err != nil {

View File

@@ -53,6 +53,7 @@ var sTestsFltr = []func(t *testing.T){
testV1FltrGetThresholdForEvent,
testV1FltrGetThresholdForEvent2,
testV1FltrPopulateResources,
testV1FltrAccounts,
testV1FltrStopEngine,
}
@@ -510,6 +511,101 @@ func testV1FltrPopulateResources(t *testing.T) {
}
}
func testV1FltrAccounts(t *testing.T) {
var resp string
if err := fltrRpc.Call("ApierV1.RemoveThresholdProfile",
&utils.TenantIDWithCache{Tenant: "cgrates.org", ID: "THD_ACNT_1001"}, &resp); err != nil {
t.Error(err)
} else if resp != utils.OK {
t.Error("Unexpected reply returned", resp)
}
//Add a filter of type *accounts and check if *monetary balance of account 1001 is minim 9 ( greater than 9)
//we expect that the balance to be 10 so the filter should pass (10 > 9)
filter := &engine.Filter{
Tenant: "cgrates.org",
ID: "FLTR_TH_Accounts",
Rules: []*engine.FilterRule{
{
Type: "*accounts",
Values: []string{"*gt:1001.BalanceMap.*monetary[0].Value:9"},
},
},
}
var result string
if err := fltrRpc.Call("ApierV1.SetFilter", filter, &result); err != nil {
t.Error(err)
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
//Add a threshold with filter from above and an inline filter for Account 1010
tPrfl := &engine.ThresholdProfile{
Tenant: "cgrates.org",
ID: "TH_Account",
FilterIDs: []string{"FLTR_TH_Accounts", "*string:~Account:1001"},
ActivationInterval: &utils.ActivationInterval{
ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC),
ExpiryTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC),
},
MaxHits: -1,
MinSleep: time.Duration(1 * time.Millisecond),
Weight: 90.0,
ActionIDs: []string{"LOG"},
Async: true,
}
if err := fltrRpc.Call("ApierV1.SetThresholdProfile", tPrfl, &result); err != nil {
t.Error(err)
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
var rcvTh *engine.ThresholdProfile
if err := fltrRpc.Call("ApierV1.GetThresholdProfile",
&utils.TenantID{Tenant: tPrfl.Tenant, ID: tPrfl.ID}, &rcvTh); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(tPrfl, rcvTh) {
t.Errorf("Expecting: %+v, received: %+v", tPrfl, rcvTh)
}
tEv := utils.CGREvent{
Tenant: "cgrates.org",
ID: "event1",
Event: map[string]interface{}{
utils.Account: "1001"},
}
var ids []string
if err := fltrRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &ids); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(ids, []string{"TH_Account"}) {
t.Error("Unexpected reply returned", ids)
}
// update the filter
//Add a filter of type *accounts and check if *monetary balance of account 1001 is minim 11 ( greater than 11)
//we expect that the balance to be 10 so the filter should not pass (10 > 11)
filter = &engine.Filter{
Tenant: "cgrates.org",
ID: "FLTR_TH_Accounts",
Rules: []*engine.FilterRule{
{
Type: "*accounts",
Values: []string{"*gt:1001.BalanceMap.*monetary[0].Value:11"},
},
},
}
if err := fltrRpc.Call("ApierV1.SetFilter", filter, &result); err != nil {
t.Error(err)
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
if err := fltrRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &ids); err == nil ||
err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
}
func testV1FltrStopEngine(t *testing.T) {
if err := engine.KillEngine(accDelay); err != nil {
t.Error(err)