From 6e3960fb7ac01ab2e731bb8817b52e142e76eccf Mon Sep 17 00:00:00 2001 From: TeoV Date: Thu, 1 Aug 2019 16:56:36 +0300 Subject: [PATCH] Add new filter type "*accounts" --- agents/agentreq_test.go | 42 +++++++------- agents/diamagent_test.go | 2 +- cdrc/csv_test.go | 2 +- cdrc/xml_test.go | 4 +- cmd/cgr-engine/cgr-engine.go | 6 +- config/filterscfg.go | 8 +++ config/libconfig_json.go | 1 + engine/account.go | 23 ++++++++ engine/account_test.go | 48 ++++++++++++++++ engine/balances.go | 19 +++++++ engine/filters.go | 84 ++++++++++++++++++++++++++-- general_tests/filters_it_test.go | 96 ++++++++++++++++++++++++++++++++ 12 files changed, 301 insertions(+), 34 deletions(-) diff --git a/agents/agentreq_test.go b/agents/agentreq_test.go index d07d68476..a3535d002 100644 --- a/agents/agentreq_test.go +++ b/agents/agentreq_test.go @@ -40,7 +40,7 @@ func TestAgReqAsNavigableMap(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set([]string{utils.CGRID}, @@ -136,7 +136,7 @@ func TestAgReqMaxCost(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set([]string{utils.CapMaxUsage}, "120s", false, false) @@ -180,7 +180,7 @@ func TestAgReqParseFieldDiameter(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) @@ -230,7 +230,7 @@ func TestAgReqParseFieldRadius(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) tplFlds := []*config.FCTemplate{ @@ -270,7 +270,7 @@ Host: api.cgrates.org data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) tplFlds := []*config.FCTemplate{ @@ -341,7 +341,7 @@ func TestAgReqParseFieldHttpXml(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) tplFlds := []*config.FCTemplate{ @@ -370,7 +370,7 @@ func TestAgReqEmptyFilter(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set([]string{utils.CGRID}, @@ -413,7 +413,7 @@ func TestAgReqMetaExponent(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) agReq.CGRRequest.Set([]string{"Value"}, "2", false, false) agReq.CGRRequest.Set([]string{"Exponent"}, "2", false, false) @@ -439,7 +439,7 @@ func TestAgReqCGRActiveRequest(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) // populate request, emulating the way will be done in HTTPAgent @@ -482,7 +482,7 @@ func TestAgReqFieldAsNone(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set([]string{utils.ToR}, utils.VOICE, false, false) @@ -519,7 +519,7 @@ func TestAgReqFieldAsNone2(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set([]string{utils.ToR}, utils.VOICE, false, false) @@ -559,7 +559,7 @@ func TestAgReqAsNavigableMap2(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set([]string{utils.ToR}, utils.VOICE, false, false) @@ -616,7 +616,7 @@ func TestAgReqFieldAsInterface(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) // populate request, emulating the way will be done in HTTPAgent agReq.CGRAReq = config.NewNavigableMap(nil) @@ -663,7 +663,7 @@ func TestAgReqNewARWithCGRRplyAndRply(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) ev := map[string]interface{}{ "FirstLevel": map[string]interface{}{ @@ -713,7 +713,7 @@ func TestAgReqSetCGRReplyWithError(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) ev := map[string]interface{}{ "FirstLevel": map[string]interface{}{ @@ -754,7 +754,7 @@ func TestAgReqSetCGRReplyWithoutError(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) ev := map[string]interface{}{ "FirstLevel": map[string]interface{}{ @@ -816,7 +816,7 @@ func TestAgReqParseFieldMetaCCUsage(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) @@ -894,7 +894,7 @@ func TestAgReqParseFieldMetaUsageDifference(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) @@ -960,7 +960,7 @@ func TestAgReqParseFieldMetaSum(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) @@ -1004,7 +1004,7 @@ func TestAgReqParseFieldMetaDifference(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) @@ -1048,7 +1048,7 @@ func TestAgReqParseFieldMetaValueExponent(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) diff --git a/agents/diamagent_test.go b/agents/diamagent_test.go index 919c38a6c..6a4e8f21a 100644 --- a/agents/diamagent_test.go +++ b/agents/diamagent_test.go @@ -51,7 +51,7 @@ func (s *testMockSessionConn) Call(method string, arg interface{}, rply interfac func TestProcessRequest(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) - filters := engine.NewFilterS(config.CgrConfig(), nil, nil, dm) // no need for filterS but stiil try to configure the dm :D + filters := engine.NewFilterS(config.CgrConfig(), nil, nil, nil, dm) // no need for filterS but stiil try to configure the dm :D cgrRplyNM := config.NewNavigableMap(nil) rply := config.NewNavigableMap(nil) diff --git a/cdrc/csv_test.go b/cdrc/csv_test.go index 5cf16cf40..ffaee3fb5 100644 --- a/cdrc/csv_test.go +++ b/cdrc/csv_test.go @@ -76,7 +76,7 @@ func TestCsvDataMultiplyFactor(t *testing.T) { cdrcConfig := cgrConfig.CdrcProfiles["/var/spool/cgrates/cdrc/in"][0] data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) - filterS := engine.NewFilterS(cgrConfig, nil, nil, dm) + filterS := engine.NewFilterS(cgrConfig, nil, nil, nil, dm) cdrcConfig.CdrSourceId = "TEST_CDRC" cdrcConfig.ContentFields = []*config.FCTemplate{ {Tag: "TORField", Type: utils.META_COMPOSED, FieldId: utils.ToR, diff --git a/cdrc/xml_test.go b/cdrc/xml_test.go index e02ca4415..34144d19e 100644 --- a/cdrc/xml_test.go +++ b/cdrc/xml_test.go @@ -321,7 +321,7 @@ func TestXMLRPProcessWithNewFilters(t *testing.T) { } xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(cdrXmlBroadsoft), utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}), "UTC", true, - cdrcCfgs, engine.NewFilterS(defaultCfg, nil, nil, engine.NewDataManager(data))) + cdrcCfgs, engine.NewFilterS(defaultCfg, nil, nil, nil, engine.NewDataManager(data))) if err != nil { t.Error(err) } @@ -588,7 +588,7 @@ func TestXMLRPNestingSeparator(t *testing.T) { } xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(xmlContent), utils.HierarchyPath([]string{"File", "CDRs", "Call"}), "UTC", true, - cdrcCfgs, engine.NewFilterS(defaultCfg, nil, nil, engine.NewDataManager(data))) + cdrcCfgs, engine.NewFilterS(defaultCfg, nil, nil, nil, engine.NewDataManager(data))) if err != nil { t.Error(err) } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 4b6d3369f..543a6dbbd 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -1135,10 +1135,10 @@ func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSCh // startFilterService fires up the FilterS func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS, - internalStatSChan, internalResourceSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, + internalStatSChan, internalResourceSChan, internalRalSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, dm *engine.DataManager, exitChan chan bool) { <-cacheS.GetPrecacheChannel(utils.CacheFilters) - filterSChan <- engine.NewFilterS(cfg, internalStatSChan, internalResourceSChan, dm) + filterSChan <- engine.NewFilterS(cfg, internalStatSChan, internalResourceSChan, internalRalSChan, dm) } // loaderService will start and register APIs for LoaderService if enabled @@ -1794,7 +1794,7 @@ func main() { } // Start FilterS - go startFilterService(filterSChan, cacheS, internalStatSChan, internalRsChan, cfg, dm, exitChan) + go startFilterService(filterSChan, cacheS, internalStatSChan, internalRsChan, internalRaterChan, cfg, dm, exitChan) if cfg.AttributeSCfg().Enabled { go startAttributeService(internalAttributeSChan, cacheS, diff --git a/config/filterscfg.go b/config/filterscfg.go index b21e9996d..4954670b5 100644 --- a/config/filterscfg.go +++ b/config/filterscfg.go @@ -21,6 +21,7 @@ package config type FilterSCfg struct { StatSConns []*RemoteHost ResourceSConns []*RemoteHost + RALsConns []*RemoteHost } func (fSCfg *FilterSCfg) loadFromJsonCfg(jsnCfg *FilterSJsonCfg) (err error) { @@ -41,5 +42,12 @@ func (fSCfg *FilterSCfg) loadFromJsonCfg(jsnCfg *FilterSJsonCfg) (err error) { fSCfg.ResourceSConns[idx].loadFromJsonCfg(jsnHaCfg) } } + if jsnCfg.Rals_conns != nil { + fSCfg.RALsConns = make([]*RemoteHost, len(*jsnCfg.Rals_conns)) + for idx, jsnHaCfg := range *jsnCfg.Rals_conns { + fSCfg.RALsConns[idx] = NewDfltRemoteHost() + fSCfg.RALsConns[idx].loadFromJsonCfg(jsnHaCfg) + } + } return } diff --git a/config/libconfig_json.go b/config/libconfig_json.go index fb65a1e7e..92c1af23d 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -94,6 +94,7 @@ type DbJsonCfg struct { type FilterSJsonCfg struct { Stats_conns *[]*RemoteHostJson Resources_conns *[]*RemoteHostJson + Rals_conns *[]*RemoteHostJson } // Rater config section diff --git a/engine/account.go b/engine/account.go index 33e21fcf0..5114cfcc1 100644 --- a/engine/account.go +++ b/engine/account.go @@ -1116,6 +1116,29 @@ func (acnt *Account) Publish() { } } +func (acnt *Account) AsNavigableMap(_ []*config.FCTemplate) (*config.NavigableMap, error) { + mpIface := map[string]interface{}{ + "ID": acnt.ID, + //"UnitCounters": acnt.UnitCounters, + "ActionTriggers": acnt.ActionTriggers, + "AllowNegative": acnt.AllowNegative, + "Disabled": acnt.Disabled, + } + + balanceMap := make(map[string]interface{}, len(acnt.BalanceMap)) + for key, balances := range acnt.BalanceMap { + balSls := make([]*config.NavigableMap, len(balances)) + for i, balance := range balances { + balSls[i], _ = balance.AsNavigableMap(nil) + } + balanceMap[key] = balSls + } + mpIface["BalanceMap"] = balanceMap + + return config.NewNavigableMap(mpIface), nil + +} + func NewAccountSummaryFromJSON(jsn string) (acntSummary *AccountSummary, err error) { if !utils.IsSliceMember([]string{"", "null"}, jsn) { // Unmarshal only when content json.Unmarshal([]byte(jsn), &acntSummary) diff --git a/engine/account_test.go b/engine/account_test.go index 34f20f21a..75313478a 100644 --- a/engine/account_test.go +++ b/engine/account_test.go @@ -2279,6 +2279,54 @@ func TestAccountGetMultipleBalancesForPrefixWithSameWeight(t *testing.T) { } } +func TestAccountAsNavigableMap(t *testing.T) { + acc := &Account{ + BalanceMap: map[string]Balances{ + utils.MONETARY: Balances{ + &Balance{ + ID: "SpecialBalance1", + Value: 10, + Weight: 10.0, + }, + &Balance{ + ID: "SpecialBalance2", + Value: 10, + Weight: 10.0, + }, + }, + }, + } + nM, _ := acc.AsNavigableMap(nil) + eVal := "SpecialBalance1" + if strVal, err := nM.FieldAsString( + []string{"BalanceMap", "*monetary[0]", "ID"}); err != nil { + t.Error(err) + } else if strVal != eVal { + t.Errorf("expecting: <%+v> received: <%+v>", eVal, strVal) + } + eVal = "10" + if strVal, err := nM.FieldAsString( + []string{"BalanceMap", "*monetary[0]", "Value"}); err != nil { + t.Error(err) + } else if strVal != eVal { + t.Errorf("expecting: <%+v> received: <%+v>", eVal, strVal) + } + eVal = "10" + if strVal, err := nM.FieldAsString( + []string{"BalanceMap", "*monetary[0]", "Weight"}); err != nil { + t.Error(err) + } else if strVal != eVal { + t.Errorf("expecting: <%+v> received: <%+v>", eVal, strVal) + } + eVal = "SpecialBalance2" + if strVal, err := nM.FieldAsString( + []string{"BalanceMap", "*monetary[1]", "ID"}); err != nil { + t.Error(err) + } else if strVal != eVal { + t.Errorf("expecting: <%+v> received: <%+v>", eVal, strVal) + } +} + /*********************************** Benchmarks *******************************/ func BenchmarkGetSecondForPrefix(b *testing.B) { diff --git a/engine/balances.go b/engine/balances.go index e03daace6..56ac99427 100644 --- a/engine/balances.go +++ b/engine/balances.go @@ -185,6 +185,25 @@ func (b *Balance) Clone() *Balance { return n } +func (b *Balance) AsNavigableMap(_ []*config.FCTemplate) (*config.NavigableMap, error) { + return config.NewNavigableMap(map[string]interface{}{ + "Uuid": b.Uuid, + "ID": b.ID, + "Value": b.Value, + "ExpirationDate": b.ExpirationDate, + "Weight": b.Weight, + "DestinationIDs": b.DestinationIDs, + "RatingSubject": b.RatingSubject, + "Categories": b.Categories, + "SharedGroups": b.SharedGroups, + "Timings": b.Timings, + "TimingIDs": b.TimingIDs, + "Disabled": b.Disabled, + "Factor": b.Factor, + "Blocker": b.Blocker, + }), nil +} + func (b *Balance) getMatchingPrefixAndDestID(dest string) (prefix, destId string) { if len(b.DestinationIDs) != 0 && b.DestinationIDs[utils.ANY] == false { for _, p := range utils.SplitPrefix(dest, MIN_PREFIX_MATCH) { diff --git a/engine/filters.go b/engine/filters.go index 78ebbfc79..6d9e8698f 100644 --- a/engine/filters.go +++ b/engine/filters.go @@ -62,7 +62,7 @@ const ( ) func NewFilterS(cfg *config.CGRConfig, - statSChan, resSChan chan rpcclient.RpcClientConnection, dm *DataManager) (fS *FilterS) { + statSChan, resSChan, ralSChan chan rpcclient.RpcClientConnection, dm *DataManager) (fS *FilterS) { fS = &FilterS{ dm: dm, cfg: cfg, @@ -73,16 +73,19 @@ func NewFilterS(cfg *config.CGRConfig, if len(cfg.FilterSCfg().ResourceSConns) != 0 { fS.connResourceS(resSChan) } + if len(cfg.FilterSCfg().RALsConns) != 0 { + fS.connRALs(ralSChan) + } return } // FilterS is a service used to take decisions in case of filters // uses lazy connections where necessary to avoid deadlocks on service startup type FilterS struct { - cfg *config.CGRConfig - statSConns, resSConns rpcclient.RpcClientConnection - sSConnMux, rSConnMux sync.RWMutex // make sure only one goroutine attempts connecting - dm *DataManager + cfg *config.CGRConfig + statSConns, resSConns, ralSConns rpcclient.RpcClientConnection + sSConnMux, rSConnMux, ralSConnMux sync.RWMutex // make sure only one goroutine attempts connecting + dm *DataManager } // connStatS returns will connect towards StatS @@ -117,6 +120,22 @@ func (fS *FilterS) connResourceS(resSChan chan rpcclient.RpcClientConnection) (e return } +// connRALs returns will connect towards RALs +func (fS *FilterS) connRALs(ralSChan chan rpcclient.RpcClientConnection) (err error) { + fS.ralSConnMux.Lock() + defer fS.ralSConnMux.Unlock() + if fS.ralSConns != nil { // connection was populated between locks + return + } + fS.ralSConns, err = NewRPCPool(rpcclient.POOL_FIRST, + fS.cfg.TlsCfg().ClientKey, fS.cfg.TlsCfg().ClientCerificate, + fS.cfg.TlsCfg().CaCertificate, fS.cfg.GeneralCfg().ConnectAttempts, + fS.cfg.GeneralCfg().Reconnects, fS.cfg.GeneralCfg().ConnectTimeout, + fS.cfg.GeneralCfg().ReplyTimeout, fS.cfg.FilterSCfg().RALsConns, + ralSChan, true) + return +} + // Pass will check all filters wihin filterIDs and require them passing for dataProvider // there should be at least one filter passing, ie: if filters are not active event will fail to pass // receives the event as DataProvider so we can accept undecoded data (ie: HttpRequest) @@ -242,6 +261,7 @@ type FilterRule struct { negative *bool statItems []*itemFilter // Cached compiled itemFilter out of Values resourceItems []*itemFilter // Cached compiled itemFilter out of Values + accountItems []*itemFilter // Cached compiled itemFilter out of Values } // Separate method to compile RSR fields @@ -279,7 +299,7 @@ func (rf *FilterRule) CompileValues() (err error) { return fmt.Errorf("Value %s needs to contain at least 3 items", val) } // valSplt[0] filter type - // valSplt[1] id of the Resource + // valSplt[1] id of the AccountID.FieldToUsed // valSplt[2] value to compare rf.resourceItems[i] = &itemFilter{ FilterType: valSplt[0], @@ -287,6 +307,24 @@ func (rf *FilterRule) CompileValues() (err error) { FilterValue: valSplt[2], } } + case utils.MetaAccounts: + //value for filter of type *accounts needs to be in the following form: + //*gt:AccountID:ValueOfUsage + rf.accountItems = make([]*itemFilter, len(rf.Values)) + for i, val := range rf.Values { + valSplt := strings.Split(val, utils.InInFieldSep) + if len(valSplt) != 3 { + return fmt.Errorf("Value %s needs to contain at least 3 items", val) + } + // valSplt[0] filter type + // valSplt[1] id of the Resource + // valSplt[2] value to compare + rf.accountItems[i] = &itemFilter{ + FilterType: valSplt[0], + ItemID: valSplt[1], + FilterValue: valSplt[2], + } + } } return } @@ -323,6 +361,8 @@ func (fltr *FilterRule) Pass(dP config.DataProvider, result, err = fltr.passResourceS(dP, rpcClnt, tenant) case MetaEqual, MetaNotEqual: result, err = fltr.passEqualTo(dP) + case utils.MetaAccounts: + result, err = fltr.passAccountS(dP, rpcClnt, tenant) default: err = utils.ErrPrefixNotErrNotImplemented(fltr.Type) } @@ -580,6 +620,38 @@ func (fltr *FilterRule) passResourceS(dP config.DataProvider, return true, nil } +func (fltr *FilterRule) passAccountS(dP config.DataProvider, + accountS rpcclient.RpcClientConnection, tenant string) (bool, error) { + if accountS == nil || reflect.ValueOf(accountS).IsNil() { + return false, errors.New("Missing AccountS information") + } + for _, accItem := range fltr.accountItems { + //split accItem.ItemID in two accountID and actual filter + //AccountID.BalanceMap.*monetary[0].Value + splittedString := strings.SplitN(accItem.ItemID, utils.NestingSep, 2) + accID := splittedString[0] + filterID := splittedString[1] + var reply Account + if err := accountS.Call(utils.ApierV2GetAccount, + &utils.AttrGetAccount{Tenant: tenant, Account: accID}, &reply); err != nil { + return false, err + } + //compose the newFilter + fltr, err := NewFilterRule(accItem.FilterType, + utils.DynamicDataPrefix+filterID, []string{accItem.FilterValue}) + if err != nil { + return false, err + } + dP, _ := reply.AsNavigableMap(nil) + if val, err := fltr.Pass(dP, nil, tenant); err != nil || !val { + //in case of error return false and error + //and in case of not pass return false and nil + return false, err + } + } + return true, nil +} + func (fltr *FilterRule) passEqualTo(dP config.DataProvider) (bool, error) { fldIf, err := config.GetDynamicInterface(fltr.FieldName, dP) if err != nil { diff --git a/general_tests/filters_it_test.go b/general_tests/filters_it_test.go index 53aef1ada..51075c56c 100644 --- a/general_tests/filters_it_test.go +++ b/general_tests/filters_it_test.go @@ -53,6 +53,7 @@ var sTestsFltr = []func(t *testing.T){ testV1FltrGetThresholdForEvent, testV1FltrGetThresholdForEvent2, testV1FltrPopulateResources, + testV1FltrAccounts, testV1FltrStopEngine, } @@ -510,6 +511,101 @@ func testV1FltrPopulateResources(t *testing.T) { } } +func testV1FltrAccounts(t *testing.T) { + var resp string + if err := fltrRpc.Call("ApierV1.RemoveThresholdProfile", + &utils.TenantIDWithCache{Tenant: "cgrates.org", ID: "THD_ACNT_1001"}, &resp); err != nil { + t.Error(err) + } else if resp != utils.OK { + t.Error("Unexpected reply returned", resp) + } + //Add a filter of type *accounts and check if *monetary balance of account 1001 is minim 9 ( greater than 9) + //we expect that the balance to be 10 so the filter should pass (10 > 9) + filter := &engine.Filter{ + Tenant: "cgrates.org", + ID: "FLTR_TH_Accounts", + Rules: []*engine.FilterRule{ + { + Type: "*accounts", + Values: []string{"*gt:1001.BalanceMap.*monetary[0].Value:9"}, + }, + }, + } + + var result string + if err := fltrRpc.Call("ApierV1.SetFilter", filter, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + + //Add a threshold with filter from above and an inline filter for Account 1010 + tPrfl := &engine.ThresholdProfile{ + Tenant: "cgrates.org", + ID: "TH_Account", + FilterIDs: []string{"FLTR_TH_Accounts", "*string:~Account:1001"}, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC), + ExpiryTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC), + }, + MaxHits: -1, + MinSleep: time.Duration(1 * time.Millisecond), + Weight: 90.0, + ActionIDs: []string{"LOG"}, + Async: true, + } + if err := fltrRpc.Call("ApierV1.SetThresholdProfile", tPrfl, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + var rcvTh *engine.ThresholdProfile + if err := fltrRpc.Call("ApierV1.GetThresholdProfile", + &utils.TenantID{Tenant: tPrfl.Tenant, ID: tPrfl.ID}, &rcvTh); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(tPrfl, rcvTh) { + t.Errorf("Expecting: %+v, received: %+v", tPrfl, rcvTh) + } + + tEv := utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event1", + Event: map[string]interface{}{ + utils.Account: "1001"}, + } + var ids []string + if err := fltrRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &ids); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(ids, []string{"TH_Account"}) { + t.Error("Unexpected reply returned", ids) + } + + // update the filter + //Add a filter of type *accounts and check if *monetary balance of account 1001 is minim 11 ( greater than 11) + //we expect that the balance to be 10 so the filter should not pass (10 > 11) + filter = &engine.Filter{ + Tenant: "cgrates.org", + ID: "FLTR_TH_Accounts", + Rules: []*engine.FilterRule{ + { + Type: "*accounts", + Values: []string{"*gt:1001.BalanceMap.*monetary[0].Value:11"}, + }, + }, + } + + if err := fltrRpc.Call("ApierV1.SetFilter", filter, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + + if err := fltrRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &ids); err == nil || + err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } +} + func testV1FltrStopEngine(t *testing.T) { if err := engine.KillEngine(accDelay); err != nil { t.Error(err)