From bf3a36ff91d2e28b974cfbde295e20ef3a37edcd Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 27 Oct 2017 13:47:31 +0200 Subject: [PATCH 01/10] ThresholdS indexed_fields option in config --- cmd/cgr-engine/cgr-engine.go | 2 +- config/config_defaults.go | 2 +- config/config_json_test.go | 6 +++--- config/config_test.go | 1 + config/libconfig_json.go | 6 +++--- config/thresholdscfg.go | 12 +++++++++--- engine/reqfilterhelpers.go | 17 +++++++++++++++-- engine/resources.go | 2 +- engine/stats.go | 2 +- engine/thresholds.go | 28 ++++++++++++++-------------- 10 files changed, 49 insertions(+), 29 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index e2619fdeb..869775136 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -602,7 +602,7 @@ func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnec dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) { filterS := <-filterSChan filterSChan <- filterS - tS, err := engine.NewThresholdService(dm, cfg.ThresholdSCfg().FilteredFields, + tS, err := engine.NewThresholdService(dm, cfg.ThresholdSCfg().IndexedFields, cfg.ThresholdSCfg().StoreInterval, filterS) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not init, error: %s", err.Error())) diff --git a/config/config_defaults.go b/config/config_defaults.go index a2e053f1b..ce276e536 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -436,7 +436,7 @@ const CGRATES_CFG_JSON = ` "thresholds": { "enabled": false, // starts ThresholdS service: . "store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|$dur> - "filtered_fields": [], // match filters based on these fields for dynamic filtering, empty to use all + "indexed_fields": [], // query indexes based on these fields for faster processing }, diff --git a/config/config_json_test.go b/config/config_json_test.go index 4c8e03c54..be2770f07 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -715,9 +715,9 @@ func TestDfStatServiceJsonCfg(t *testing.T) { func TestDfThresholdSJsonCfg(t *testing.T) { eCfg := &ThresholdSJsonCfg{ - Enabled: utils.BoolPointer(false), - Store_interval: utils.StringPointer(""), - Filtered_fields: utils.StringSlicePointer([]string{}), + Enabled: utils.BoolPointer(false), + Store_interval: utils.StringPointer(""), + Indexed_fields: utils.StringSlicePointer([]string{}), } if cfg, err := dfCgrJsonCfg.ThresholdSJsonCfg(); err != nil { t.Error(err) diff --git a/config/config_test.go b/config/config_test.go index 0b4121803..ce1480c83 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -601,6 +601,7 @@ func TestCgrCfgJSONDefaultThresholdSCfg(t *testing.T) { eThresholdSCfg := &ThresholdSCfg{ Enabled: false, StoreInterval: 0, + IndexedFields: []string{}, } if !reflect.DeepEqual(eThresholdSCfg, cgrCfg.thresholdSCfg) { t.Errorf("received: %+v, expecting: %+v", eThresholdSCfg, cgrCfg.statsCfg) diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 5be9a2bb3..fbfe82f14 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -402,9 +402,9 @@ type StatServJsonCfg struct { // Threshold service config section type ThresholdSJsonCfg struct { - Enabled *bool - Store_interval *string - Filtered_fields *[]string + Enabled *bool + Store_interval *string + Indexed_fields *[]string } // Mailer config section diff --git a/config/thresholdscfg.go b/config/thresholdscfg.go index 78b7a24d0..625dd2bcc 100644 --- a/config/thresholdscfg.go +++ b/config/thresholdscfg.go @@ -25,9 +25,9 @@ import ( ) type ThresholdSCfg struct { - Enabled bool - StoreInterval time.Duration // Dump regularly from cache into dataDB - FilteredFields []string + Enabled bool + StoreInterval time.Duration // Dump regularly from cache into dataDB + IndexedFields []string } func (t *ThresholdSCfg) loadFromJsonCfg(jsnCfg *ThresholdSJsonCfg) (err error) { @@ -42,5 +42,11 @@ func (t *ThresholdSCfg) loadFromJsonCfg(jsnCfg *ThresholdSJsonCfg) (err error) { return err } } + if jsnCfg.Indexed_fields != nil { + t.IndexedFields = make([]string, len(*jsnCfg.Indexed_fields)) + for i, fID := range *jsnCfg.Indexed_fields { + t.IndexedFields[i] = fID + } + } return nil } diff --git a/engine/reqfilterhelpers.go b/engine/reqfilterhelpers.go index f07eef411..ad0ad6710 100644 --- a/engine/reqfilterhelpers.go +++ b/engine/reqfilterhelpers.go @@ -25,11 +25,24 @@ import ( ) // matchingItemIDsForEvent returns the list of item IDs matching fieldName/fieldValue for an event +// fieldIDs limits the fields which are checked against indexes // helper on top of dataDB.MatchReqFilterIndex, adding utils.NOT_AVAILABLE to list of fields queried // executes a number of $(len(fields) + 1) queries to dataDB so the size of event influences the speed of return -func matchingItemIDsForEvent(ev map[string]interface{}, dm *DataManager, dbIdxKey string) (itemIDs utils.StringMap, err error) { +func matchingItemIDsForEvent(ev map[string]interface{}, fieldIDs []string, dm *DataManager, dbIdxKey string) (itemIDs utils.StringMap, err error) { + if len(fieldIDs) == 0 { + fieldIDs = make([]string, len(ev)) + i := 0 + for fldID := range ev { + fieldIDs[i] = fldID + i += 1 + } + } itemIDs = make(utils.StringMap) - for fldName, fieldValIf := range ev { + for _, fldName := range fieldIDs { + fieldValIf, has := ev[fldName] + if !has { + continue + } fldVal, canCast := utils.CastFieldIfToString(fieldValIf) if !canCast { return nil, fmt.Errorf("Cannot cast field: %s into string", fldName) diff --git a/engine/resources.go b/engine/resources.go index 92cafe492..301f23f07 100755 --- a/engine/resources.go +++ b/engine/resources.go @@ -433,7 +433,7 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources) // matchingResourcesForEvent returns ordered list of matching resources which are active by the time of the call func (rS *ResourceService) matchingResourcesForEvent(tenant string, ev map[string]interface{}) (rs Resources, err error) { matchingResources := make(map[string]*Resource) - rIDs, err := matchingItemIDsForEvent(ev, rS.dm, utils.ResourceProfilesStringIndex+tenant) + rIDs, err := matchingItemIDsForEvent(ev, nil, rS.dm, utils.ResourceProfilesStringIndex+tenant) if err != nil { return nil, err } diff --git a/engine/stats.go b/engine/stats.go index dbe25b9ea..67b9697e7 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -139,7 +139,7 @@ func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) { // matchingStatQueuesForEvent returns ordered list of matching resources which are active by the time of the call func (sS *StatService) matchingStatQueuesForEvent(ev *StatEvent) (sqs StatQueues, err error) { matchingSQs := make(map[string]*StatQueue) - sqIDs, err := matchingItemIDsForEvent(ev.Event, sS.dm, utils.StatQueuesStringIndex+ev.Tenant) + sqIDs, err := matchingItemIDsForEvent(ev.Event, nil, sS.dm, utils.StatQueuesStringIndex+ev.Tenant) if err != nil { return nil, err } diff --git a/engine/thresholds.go b/engine/thresholds.go index b749632d2..6d9fdc057 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -151,25 +151,25 @@ func (ts Thresholds) Sort() { sort.Slice(ts, func(i, j int) bool { return ts[i].tPrfl.Weight > ts[j].tPrfl.Weight }) } -func NewThresholdService(dm *DataManager, filteredFields []string, storeInterval time.Duration, +func NewThresholdService(dm *DataManager, indexedFields []string, storeInterval time.Duration, filterS *FilterS) (tS *ThresholdService, err error) { return &ThresholdService{dm: dm, - filteredFields: filteredFields, - storeInterval: storeInterval, - filterS: filterS, - stopBackup: make(chan struct{}), - storedTdIDs: make(utils.StringMap)}, nil + indexedFields: indexedFields, + storeInterval: storeInterval, + filterS: filterS, + stopBackup: make(chan struct{}), + storedTdIDs: make(utils.StringMap)}, nil } // ThresholdService manages Threshold execution and storing them to dataDB type ThresholdService struct { - dm *DataManager - filteredFields []string // fields considered when searching for matching thresholds - storeInterval time.Duration - filterS *FilterS - stopBackup chan struct{} - storedTdIDs utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool - stMux sync.RWMutex // protects storedTdIDs + dm *DataManager + indexedFields []string // fields considered when searching for matching thresholds + storeInterval time.Duration + filterS *FilterS + stopBackup chan struct{} + storedTdIDs utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool + stMux sync.RWMutex // protects storedTdIDs } // Called to start the service @@ -254,7 +254,7 @@ func (tS *ThresholdService) StoreThreshold(t *Threshold) (err error) { // matchingThresholdsForEvent returns ordered list of matching thresholds which are active for an Event func (tS *ThresholdService) matchingThresholdsForEvent(ev *ThresholdEvent) (ts Thresholds, err error) { matchingTs := make(map[string]*Threshold) - tIDs, err := matchingItemIDsForEvent(ev.Event, tS.dm, utils.ThresholdStringIndex+ev.Tenant) + tIDs, err := matchingItemIDsForEvent(ev.Event, tS.indexedFields, tS.dm, utils.ThresholdStringIndex+ev.Tenant) if err != nil { return nil, err } From 7316ec43dade692e2290e058afc30acaf8bd057a Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 27 Oct 2017 13:59:58 +0200 Subject: [PATCH 02/10] Config indexed_fields for stats and resources --- config/config_defaults.go | 2 ++ config/config_json_test.go | 2 ++ config/config_test.go | 2 ++ config/libconfig_json.go | 2 ++ config/reslimitercfg.go | 7 +++++++ config/statscfg.go | 7 +++++++ 6 files changed, 22 insertions(+) diff --git a/config/config_defaults.go b/config/config_defaults.go index ce276e536..7e9770909 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -423,6 +423,7 @@ const CGRATES_CFG_JSON = ` "enabled": false, // starts ResourceLimiter service: . "store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|$dur> "thresholds_conns": [], // address where to reach the thresholds service, empty to disable thresholds functionality: <""|*internal|x.y.z.y:1234> + "indexed_fields": [], // query indexes based on these fields for faster processing }, @@ -430,6 +431,7 @@ const CGRATES_CFG_JSON = ` "enabled": false, // starts Stat service: . "store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|$dur> "thresholds_conns": [], // address where to reach the thresholds service, empty to disable thresholds functionality: <""|*internal|x.y.z.y:1234> + "indexed_fields": [], // query indexes based on these fields for faster processing }, diff --git a/config/config_json_test.go b/config/config_json_test.go index be2770f07..eab834c6b 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -692,6 +692,7 @@ func TestDfResourceLimiterSJsonCfg(t *testing.T) { Enabled: utils.BoolPointer(false), Thresholds_conns: &[]*HaPoolJsonCfg{}, Store_interval: utils.StringPointer(""), + Indexed_fields: utils.StringSlicePointer([]string{}), } if cfg, err := dfCgrJsonCfg.ResourceSJsonCfg(); err != nil { t.Error(err) @@ -705,6 +706,7 @@ func TestDfStatServiceJsonCfg(t *testing.T) { Enabled: utils.BoolPointer(false), Store_interval: utils.StringPointer(""), Thresholds_conns: &[]*HaPoolJsonCfg{}, + Indexed_fields: utils.StringSlicePointer([]string{}), } if cfg, err := dfCgrJsonCfg.StatSJsonCfg(); err != nil { t.Error(err) diff --git a/config/config_test.go b/config/config_test.go index ce1480c83..4f189524a 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -579,6 +579,7 @@ func TestCgrCfgJSONDefaultsResLimCfg(t *testing.T) { Enabled: false, ThresholdSConns: []*HaPoolConfig{}, StoreInterval: 0, + IndexedFields: []string{}, } if !reflect.DeepEqual(cgrCfg.resourceSCfg, eResLiCfg) { t.Errorf("expecting: %s, received: %s", utils.ToJSON(eResLiCfg), utils.ToJSON(cgrCfg.resourceSCfg)) @@ -591,6 +592,7 @@ func TestCgrCfgJSONDefaultStatsCfg(t *testing.T) { Enabled: false, StoreInterval: 0, ThresholdSConns: []*HaPoolConfig{}, + IndexedFields: []string{}, } if !reflect.DeepEqual(cgrCfg.statsCfg, eStatsCfg) { t.Errorf("received: %+v, expecting: %+v", cgrCfg.statsCfg, eStatsCfg) diff --git a/config/libconfig_json.go b/config/libconfig_json.go index fbfe82f14..ceec45116 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -391,6 +391,7 @@ type ResourceSJsonCfg struct { Enabled *bool Thresholds_conns *[]*HaPoolJsonCfg Store_interval *string + Indexed_fields *[]string } // Stat service config section @@ -398,6 +399,7 @@ type StatServJsonCfg struct { Enabled *bool Store_interval *string Thresholds_conns *[]*HaPoolJsonCfg + Indexed_fields *[]string } // Threshold service config section diff --git a/config/reslimitercfg.go b/config/reslimitercfg.go index b4bf57a81..09b988903 100644 --- a/config/reslimitercfg.go +++ b/config/reslimitercfg.go @@ -28,6 +28,7 @@ type ResourceSConfig struct { Enabled bool ThresholdSConns []*HaPoolConfig // Connections towards StatS StoreInterval time.Duration // Dump regularly from cache into dataDB + IndexedFields []string } func (rlcfg *ResourceSConfig) loadFromJsonCfg(jsnCfg *ResourceSJsonCfg) (err error) { @@ -49,5 +50,11 @@ func (rlcfg *ResourceSConfig) loadFromJsonCfg(jsnCfg *ResourceSJsonCfg) (err err return } } + if jsnCfg.Indexed_fields != nil { + rlcfg.IndexedFields = make([]string, len(*jsnCfg.Indexed_fields)) + for i, fID := range *jsnCfg.Indexed_fields { + rlcfg.IndexedFields[i] = fID + } + } return nil } diff --git a/config/statscfg.go b/config/statscfg.go index a75c3c92f..4c078188b 100644 --- a/config/statscfg.go +++ b/config/statscfg.go @@ -28,6 +28,7 @@ type StatSCfg struct { Enabled bool StoreInterval time.Duration // Dump regularly from cache into dataDB ThresholdSConns []*HaPoolConfig + IndexedFields []string } func (st *StatSCfg) loadFromJsonCfg(jsnCfg *StatServJsonCfg) (err error) { @@ -49,5 +50,11 @@ func (st *StatSCfg) loadFromJsonCfg(jsnCfg *StatServJsonCfg) (err error) { st.ThresholdSConns[idx].loadFromJsonCfg(jsnHaCfg) } } + if jsnCfg.Indexed_fields != nil { + st.IndexedFields = make([]string, len(*jsnCfg.Indexed_fields)) + for i, fID := range *jsnCfg.Indexed_fields { + st.IndexedFields[i] = fID + } + } return nil } From ff51f819425ebcb3d1e963f6716996205015c4f8 Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 27 Oct 2017 14:05:08 +0200 Subject: [PATCH 03/10] StatS using IndexedFields --- cmd/cgr-engine/cgr-engine.go | 2 +- engine/stats.go | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 869775136..ec8e2b116 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -577,7 +577,7 @@ func startStatService(internalStatSChan, internalThresholdSChan chan rpcclient.R return } } - sS, err := engine.NewStatService(dm, cfg.StatSCfg().StoreInterval, thdSConn, filterS) + sS, err := engine.NewStatService(dm, cfg.StatSCfg().StoreInterval, thdSConn, filterS, cfg.StatSCfg().IndexedFields) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not init, error: %s", err.Error())) exitChan <- true diff --git a/engine/stats.go b/engine/stats.go index 67b9697e7..e7fd8e3ae 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -34,7 +34,7 @@ import ( // NewStatService initializes a StatService func NewStatService(dm *DataManager, storeInterval time.Duration, - thdS rpcclient.RpcClientConnection, filterS *FilterS) (ss *StatService, err error) { + thdS rpcclient.RpcClientConnection, filterS *FilterS, indexedFields []string) (ss *StatService, err error) { if thdS != nil && reflect.ValueOf(thdS).IsNil() { // fix nil value in interface thdS = nil } @@ -43,6 +43,7 @@ func NewStatService(dm *DataManager, storeInterval time.Duration, storeInterval: storeInterval, thdS: thdS, filterS: filterS, + indexedFields: indexedFields, storedStatQueues: make(utils.StringMap), stopBackup: make(chan struct{})}, nil } @@ -53,6 +54,7 @@ type StatService struct { storeInterval time.Duration thdS rpcclient.RpcClientConnection // rpc connection towards ThresholdS filterS *FilterS + indexedFields []string stopBackup chan struct{} storedStatQueues utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool ssqMux sync.RWMutex // protects storedStatQueues @@ -139,7 +141,7 @@ func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) { // matchingStatQueuesForEvent returns ordered list of matching resources which are active by the time of the call func (sS *StatService) matchingStatQueuesForEvent(ev *StatEvent) (sqs StatQueues, err error) { matchingSQs := make(map[string]*StatQueue) - sqIDs, err := matchingItemIDsForEvent(ev.Event, nil, sS.dm, utils.StatQueuesStringIndex+ev.Tenant) + sqIDs, err := matchingItemIDsForEvent(ev.Event, sS.indexedFields, sS.dm, utils.StatQueuesStringIndex+ev.Tenant) if err != nil { return nil, err } From 067072fcd400569cdf004d15270f30b129f9b1a3 Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 27 Oct 2017 15:07:56 +0200 Subject: [PATCH 04/10] Threshold.Async implementation --- engine/thresholds.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/engine/thresholds.go b/engine/thresholds.go index 6d9fdc057..71a1f91ce 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -135,9 +135,19 @@ func (t *Threshold) ProcessEvent(ev *ThresholdEvent, dm *DataManager) (err error if acntID != "" { at.accountIDs = utils.NewStringMap(acntID) } - if errExec := at.Execute(nil, nil); errExec != nil { - utils.Logger.Warning(fmt.Sprintf(" failed executing actions: %s, error: %s", actionSetID, errExec.Error())) - err = utils.ErrPartiallyExecuted + if t.tPrfl.Async { + + go func() { + if errExec := at.Execute(nil, nil); errExec != nil { + utils.Logger.Warning(fmt.Sprintf(" failed executing actions: %s, error: %s", actionSetID, errExec.Error())) + } + }() + + } else { + if errExec := at.Execute(nil, nil); errExec != nil { + utils.Logger.Warning(fmt.Sprintf(" failed executing actions: %s, error: %s", actionSetID, errExec.Error())) + err = utils.ErrPartiallyExecuted + } } } return From e01ab48a8f0b3c7eba0ece5b830e3cbba77ec047 Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 27 Oct 2017 16:11:02 +0200 Subject: [PATCH 05/10] Better errors for .csv parsing --- engine/storage_csv.go | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/engine/storage_csv.go b/engine/storage_csv.go index d84438f5c..f7b55867e 100755 --- a/engine/storage_csv.go +++ b/engine/storage_csv.go @@ -92,7 +92,7 @@ func (csvs *CSVStorage) GetTPTimings(tpid, id string) ([]*utils.ApierTPTiming, e var tpTimings TpTimings for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() { if err != nil { - log.Print("bad line in timings csv: ", err) + log.Printf("bad line in %s, %s\n", csvs.timingsFn, err.Error()) return nil, err } if tpTiming, err := csvLoad(TpTiming{}, record); err != nil { @@ -120,7 +120,7 @@ func (csvs *CSVStorage) GetTPDestinations(tpid, id string) ([]*utils.TPDestinati var tpDests TpDestinations for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() { if err != nil { - log.Print("bad line in destinations csv: ", err) + log.Printf("bad line in %s, %s\n", csvs.destinationsFn, err.Error()) return nil, err } if tpDest, err := csvLoad(TpDestination{}, record); err != nil { @@ -148,7 +148,7 @@ func (csvs *CSVStorage) GetTPRates(tpid, id string) ([]*utils.TPRate, error) { var tpRates TpRates for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() { if err != nil { - log.Print("bad line in rates csv: ", err) + log.Printf("bad line in %s, %s\n", csvs.ratesFn, err.Error()) return nil, err } if tpRate, err := csvLoad(TpRate{}, record); err != nil { @@ -180,7 +180,7 @@ func (csvs *CSVStorage) GetTPDestinationRates(tpid, id string, p *utils.Paginato var tpDestinationRates TpDestinationRates for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() { if err != nil { - log.Print("bad line in destinationrates csv: ", err) + log.Printf("bad line in %s, %s\n", csvs.destinationratesFn, err.Error()) return nil, err } if tpRate, err := csvLoad(TpDestinationRate{}, record); err != nil { @@ -212,7 +212,7 @@ func (csvs *CSVStorage) GetTPRatingPlans(tpid, id string, p *utils.Paginator) ([ var tpRatingPlans TpRatingPlans for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() { if err != nil { - log.Print("bad line in rating plans csv: ", err) + log.Printf("bad line in %s, %s\n", csvs.destinationratetimingsFn, err.Error()) return nil, err } if tpRate, err := csvLoad(TpRatingPlan{}, record); err != nil { @@ -244,7 +244,7 @@ func (csvs *CSVStorage) GetTPRatingProfiles(filter *utils.TPRatingProfile) ([]*u var tpRatingProfiles TpRatingProfiles for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() { if err != nil { - log.Print("bad line rating profiles csv: ", err) + log.Printf("bad line in %s, %s\n", csvs.ratingprofilesFn, err.Error()) return nil, err } if tpRate, err := csvLoad(TpRatingProfile{}, record); err != nil { @@ -279,7 +279,7 @@ func (csvs *CSVStorage) GetTPSharedGroups(tpid, id string) ([]*utils.TPSharedGro var tpSharedGroups TpSharedGroups for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() { if err != nil { - log.Print("bad line in shared groups csv: ", err) + log.Printf("bad line in %s, %s\n", csvs.sharedgroupsFn, err.Error()) return nil, err } if tpRate, err := csvLoad(TpSharedGroup{}, record); err != nil { @@ -312,7 +312,7 @@ func (csvs *CSVStorage) GetTPLCRs(filter *utils.TPLcrRules) ([]*utils.TPLcrRules for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() { if tpRate, err := csvLoad(TpLcrRule{}, record); err != nil { if err != nil { - log.Print("bad line in lcr rules csv: ", err) + log.Printf("bad line in %s, %s\n", csvs.lcrFn, err.Error()) return nil, err } return nil, err @@ -344,7 +344,7 @@ func (csvs *CSVStorage) GetTPActions(tpid, id string) ([]*utils.TPActions, error var tpActions TpActions for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() { if err != nil { - log.Print("bad line in actions csv: ", err) + log.Printf("bad line in %s, %s\n", csvs.actionsFn, err.Error()) return nil, err } if tpAction, err := csvLoad(TpAction{}, record); err != nil { @@ -404,7 +404,7 @@ func (csvs *CSVStorage) GetTPActionTriggers(tpid, id string) ([]*utils.TPActionT var tpActionTriggers TpActionTriggers for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() { if err != nil { - log.Print("bad line in action triggers csv: ", err) + log.Printf("bad line in %s, %s\n", csvs.actiontriggersFn, err.Error()) return nil, err } if tpAt, err := csvLoad(TpActionTrigger{}, record); err != nil { @@ -436,7 +436,7 @@ func (csvs *CSVStorage) GetTPAccountActions(filter *utils.TPAccountActions) ([]* var tpAccountActions TpAccountActions for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() { if err != nil { - log.Print("bad line in account actions csv: ", err) + log.Printf("bad line in %s, %s\n", csvs.accountactionsFn, err.Error()) return nil, err } if tpAa, err := csvLoad(TpAccountAction{}, record); err != nil { @@ -471,7 +471,7 @@ func (csvs *CSVStorage) GetTPDerivedChargers(filter *utils.TPDerivedChargers) ([ var tpDerivedChargers TpDerivedChargers for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() { if err != nil { - log.Print("bad line in derived chargers csv: ", err) + log.Printf("bad line in %s, %s\n", csvs.derivedChargersFn, err.Error()) return nil, err } if tp, err := csvLoad(TpDerivedCharger{}, record); err != nil { @@ -506,7 +506,7 @@ func (csvs *CSVStorage) GetTPCdrStats(tpid, id string) ([]*utils.TPCdrStats, err var tpCdrStats TpCdrStats for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() { if err != nil { - log.Print("bad line in cdr stats csv: ", err) + log.Printf("bad line in %s, %s\n", csvs.cdrStatsFn, err.Error()) return nil, err } if tpCdrStat, err := csvLoad(TpCdrstat{}, record); err != nil { @@ -538,7 +538,7 @@ func (csvs *CSVStorage) GetTPUsers(filter *utils.TPUsers) ([]*utils.TPUsers, err var tpUsers TpUsers for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() { if err != nil { - log.Print("bad line in users csv: ", err) + log.Printf("bad line in %s, %s\n", csvs.usersFn, err.Error()) return nil, err } if tpUser, err := csvLoad(TpUser{}, record); err != nil { @@ -572,7 +572,7 @@ func (csvs *CSVStorage) GetTPAliases(filter *utils.TPAliases) ([]*utils.TPAliase var tpAliases TpAliases for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() { if err != nil { - log.Print("bad line in aliases csv: ", err) + log.Printf("bad line in %s, %s\n", csvs.aliasesFn, err.Error()) return nil, err } if tpAlias, err := csvLoad(TpAlias{}, record); err != nil { @@ -606,7 +606,7 @@ func (csvs *CSVStorage) GetTPResources(tpid, id string) ([]*utils.TPResource, er var tpResLimits TpResources for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() { if err != nil { - log.Print("bad line in resourceprofiles csv: ", err) + log.Printf("bad line in %s, %s\n", csvs.resProfilesFn, err.Error()) return nil, err } if tpResLimit, err := csvLoad(TpResource{}, record); err != nil { @@ -634,7 +634,7 @@ func (csvs *CSVStorage) GetTPStats(tpid, id string) ([]*utils.TPStats, error) { var tpStats TpStatsS for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() { if err != nil { - log.Print("bad line in TPStats csv: ", err) + log.Printf("bad line in %s, %s\n", csvs.statsFn, err.Error()) return nil, err } if tpstats, err := csvLoad(TpStats{}, record); err != nil { @@ -662,7 +662,7 @@ func (csvs *CSVStorage) GetTPThresholds(tpid, id string) ([]*utils.TPThreshold, var tpThreshold TpThresholdS for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() { if err != nil { - log.Print("bad line in TPThreshold csv: ", err) + log.Printf("bad line in %s, %s\n", csvs.thresholdsFn, err.Error()) return nil, err } if thresholdCfg, err := csvLoad(TpThreshold{}, record); err != nil { @@ -690,7 +690,7 @@ func (csvs *CSVStorage) GetTPFilters(tpid, id string) ([]*utils.TPFilter, error) var tpFilter TpFilterS for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() { if err != nil { - log.Print("bad line in TPFilter csv: ", err) + log.Printf("bad line in %s, %s\n", csvs.filterFn, err.Error()) return nil, err } if filterCfg, err := csvLoad(TpFilter{}, record); err != nil { From 404c366f5758b5cfb22a683a4e96dab6b9751a8c Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 27 Oct 2017 16:14:11 +0200 Subject: [PATCH 06/10] ResourceS with indexedFields for faster queries on string indexes --- cmd/cgr-engine/cgr-engine.go | 2 +- engine/resources.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index ec8e2b116..a060deb28 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -541,7 +541,7 @@ func startResourceService(internalRsChan, internalThresholdSChan chan rpcclient. return } } - rS, err := engine.NewResourceService(dm, cfg.ResourceSCfg().StoreInterval, thdSConn, filterS) + rS, err := engine.NewResourceService(dm, cfg.ResourceSCfg().StoreInterval, thdSConn, filterS, cfg.ResourceSCfg().IndexedFields) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not init, error: %s", err.Error())) exitChan <- true diff --git a/engine/resources.go b/engine/resources.go index 301f23f07..b4450d26c 100755 --- a/engine/resources.go +++ b/engine/resources.go @@ -279,7 +279,7 @@ func (rs Resources) allocateResource(ru *ResourceUsage, dryRun bool) (alcMessage // Pas the config as a whole so we can ask access concurrently func NewResourceService(dm *DataManager, storeInterval time.Duration, - thdS rpcclient.RpcClientConnection, filterS *FilterS) (*ResourceService, error) { + thdS rpcclient.RpcClientConnection, filterS *FilterS, indexedFields []string) (*ResourceService, error) { if thdS != nil && reflect.ValueOf(thdS).IsNil() { thdS = nil } @@ -296,6 +296,7 @@ type ResourceService struct { dm *DataManager // So we can load the data in cache and index it thdS rpcclient.RpcClientConnection // allows applying filters based on stats filterS *FilterS + indexedFields []string // speed up query on indexes lcEventResources map[string][]*utils.TenantID // cache recording resources for events in alocation phase lcERMux sync.RWMutex // protects the lcEventResources storedResources utils.StringMap // keep a record of resources which need saving, map[resID]bool @@ -433,7 +434,7 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources) // matchingResourcesForEvent returns ordered list of matching resources which are active by the time of the call func (rS *ResourceService) matchingResourcesForEvent(tenant string, ev map[string]interface{}) (rs Resources, err error) { matchingResources := make(map[string]*Resource) - rIDs, err := matchingItemIDsForEvent(ev, nil, rS.dm, utils.ResourceProfilesStringIndex+tenant) + rIDs, err := matchingItemIDsForEvent(ev, rS.indexedFields, rS.dm, utils.ResourceProfilesStringIndex+tenant) if err != nil { return nil, err } From 596b24fb4d7cebbeaa853945e18b0683cd622332 Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 27 Oct 2017 16:25:28 +0200 Subject: [PATCH 07/10] Redis not to automatically remove previous filter indexes --- engine/storage_redis.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 37689966d..8c5584c80 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1281,9 +1281,6 @@ func (rs *RedisStorage) GetReqFilterIndexes(dbKey string) (indexes map[string]ma } func (rs *RedisStorage) SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) { - if err = rs.Cmd("DEL", dbKey).Err; err != nil { // DELETE before set - return - } mp := make(map[string]string) for fldName, fldValMp := range indexes { for fldVal, strMp := range fldValMp { From c66269898fb95701975a1a9b6af6f330266798f4 Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 27 Oct 2017 18:32:39 +0200 Subject: [PATCH 08/10] Sample Thresholds.csv to be used with CDR events --- apier/v1/thresholds_it_test.go | 16 ++++++++++++++++ data/tariffplans/tutorial/Filters.csv | 1 + data/tariffplans/tutorial/Thresholds.csv | 1 + general_tests/tut_smgeneric_it_test.go | 2 +- general_tests/tutorial_it_test.go | 2 +- utils/consts.go | 1 + 6 files changed, 21 insertions(+), 2 deletions(-) diff --git a/apier/v1/thresholds_it_test.go b/apier/v1/thresholds_it_test.go index 5d79d90e1..fc3da6237 100644 --- a/apier/v1/thresholds_it_test.go +++ b/apier/v1/thresholds_it_test.go @@ -103,6 +103,22 @@ var tEvs = []*engine.ThresholdEvent{ utils.ACCOUNT: "1002", utils.ResourceID: "RES_GRP_1", utils.USAGE: 10.0}}, + &engine.ThresholdEvent{ // hitting THD_RES_1 + Tenant: "cgrates.org", + ID: "event6", + Event: map[string]interface{}{ + utils.EventType: utils.ResourceUpdate, + utils.ACCOUNT: "1002", + utils.ResourceID: "RES_GRP_1", + utils.USAGE: 10.0}}, + &engine.ThresholdEvent{ // hitting THD_RES_1 + Tenant: "cgrates.org", + ID: "event6", + Event: map[string]interface{}{ + utils.EventType: utils.ResourceUpdate, + utils.ACCOUNT: "1002", + utils.ResourceID: "RES_GRP_1", + utils.USAGE: 10.0}}, } var sTestsThresholdSV1 = []func(t *testing.T){ diff --git a/data/tariffplans/tutorial/Filters.csv b/data/tariffplans/tutorial/Filters.csv index 760ee326b..74a50ad02 100644 --- a/data/tariffplans/tutorial/Filters.csv +++ b/data/tariffplans/tutorial/Filters.csv @@ -25,3 +25,4 @@ cgrates.org,FLTR_RES_1,*gte,Usage,10.0, cgrates.org,FLTR_DST_FS,*destinations,Destination,DST_FS,2014-07-29T15:00:00Z cgrates.org,FLTR_RES_GR3,*string,Account,3001,2014-07-29T15:00:00Z cgrates.org,FLTR_CDRS,*cdr_stats,,CDRST1:*min_ASR:34;CDRST_1001:*min_ASR:20,2014-07-29T15:00:00Z +cgrates.org,FLTR_CDR_UPDATE,*string,EventType,CDR,2014-07-29T15:00:00Z diff --git a/data/tariffplans/tutorial/Thresholds.csv b/data/tariffplans/tutorial/Thresholds.csv index 3942d1cb0..50ef8c1e6 100644 --- a/data/tariffplans/tutorial/Thresholds.csv +++ b/data/tariffplans/tutorial/Thresholds.csv @@ -5,3 +5,4 @@ cgrates.org,THD_STATS_1,FLTR_STATS_1,2014-07-29T15:00:00Z,true,1,1s,false,10,LOG cgrates.org,THD_STATS_2,FLTR_STATS_2,2014-07-29T15:00:00Z,true,1,1s,false,10,DISABLE_AND_LOG,false cgrates.org,THD_STATS_3,FLTR_STATS_3,2014-07-29T15:00:00Z,false,1,1s,false,10,TOPUP_100SMS_DE_MOBILE,false cgrates.org,THD_RES_1,FLTR_RES_1,2014-07-29T15:00:00Z,true,1,1s,false,10,LOG_WARNING,false +cgrates.org,THD_CDRS_1,FLTR_ACNT_dan;FLTR_CDR_UPDATE,2014-07-29T15:00:00Z,false,1,1s,false,10,LOG_WARNING,false diff --git a/general_tests/tut_smgeneric_it_test.go b/general_tests/tut_smgeneric_it_test.go index a3e881561..5c0a8131d 100644 --- a/general_tests/tut_smgeneric_it_test.go +++ b/general_tests/tut_smgeneric_it_test.go @@ -101,7 +101,7 @@ func TestTutSMGCacheStats(t *testing.T) { expectedStats := &utils.CacheStats{Destinations: 5, ReverseDestinations: 7, RatingPlans: 4, RatingProfiles: 9, Actions: 9, ActionPlans: 4, AccountActionPlans: 5, SharedGroups: 1, DerivedChargers: 1, LcrProfiles: 5, CdrStats: 6, Users: 3, Aliases: 1, ReverseAliases: 2, ResourceProfiles: 3, Resources: 3, StatQueues: 0, - StatQueueProfiles: 0, Thresholds: 6, ThresholdProfiles: 6, Filters: 13} + StatQueueProfiles: 0, Thresholds: 7, ThresholdProfiles: 7, Filters: 14} var args utils.AttrCacheStats if err := tutSMGRpc.Call("ApierV2.GetCacheStats", args, &rcvStats); err != nil { t.Error("Got error on ApierV2.GetCacheStats: ", err.Error()) diff --git a/general_tests/tutorial_it_test.go b/general_tests/tutorial_it_test.go index 8b82431ff..9a11ded75 100644 --- a/general_tests/tutorial_it_test.go +++ b/general_tests/tutorial_it_test.go @@ -105,7 +105,7 @@ func TestTutITCacheStats(t *testing.T) { expectedStats := &utils.CacheStats{Destinations: 5, ReverseDestinations: 7, RatingPlans: 4, RatingProfiles: 9, Actions: 9, ActionPlans: 4, AccountActionPlans: 5, SharedGroups: 1, DerivedChargers: 1, LcrProfiles: 5, CdrStats: 6, Users: 3, Aliases: 1, ReverseAliases: 2, ResourceProfiles: 3, Resources: 3, StatQueues: 0, - StatQueueProfiles: 0, Thresholds: 6, ThresholdProfiles: 6, Filters: 13} + StatQueueProfiles: 0, Thresholds: 7, ThresholdProfiles: 7, Filters: 14} var args utils.AttrCacheStats if err := tutLocalRpc.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil { t.Error("Got error on ApierV1.GetCacheStats: ", err.Error()) diff --git a/utils/consts.go b/utils/consts.go index 35aaa2586..41021c648 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -480,6 +480,7 @@ const ( BalanceUpdate = "BalanceUpdate" StatUpdate = "StatUpdate" ResourceUpdate = "ResourceUpdate" + CDRUpdate = "CDR" ExpiryTime = "ExpiryTime" AllowNegative = "AllowNegative" Disabled = "Disabled" From 86804c5c2866d85c7a2b5e70007d2bc5575f4faa Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 27 Oct 2017 18:40:17 +0200 Subject: [PATCH 09/10] Config option thresholds_conns in cdrs --- config/config.go | 8 ++++++++ config/config_defaults.go | 1 + config/config_json_test.go | 1 + config/config_test.go | 6 ++++++ config/libconfig_json.go | 1 + utils/consts.go | 2 +- 6 files changed, 18 insertions(+), 1 deletion(-) diff --git a/config/config.go b/config/config.go index a4000d416..45dacd389 100755 --- a/config/config.go +++ b/config/config.go @@ -249,6 +249,7 @@ type CGRConfig struct { CDRSUserSConns []*HaPoolConfig // address where to reach the users service: <""|internal|x.y.z.y:1234> CDRSAliaseSConns []*HaPoolConfig // address where to reach the aliases service: <""|internal|x.y.z.y:1234> CDRSCDRStatSConns []*HaPoolConfig // address where to reach the cdrstats service. Empty to disable cdrstats gathering <""|internal|x.y.z.y:1234> + CDRSThresholdSConns []*HaPoolConfig // address where to reach the thresholds service CDRSStatSConns []*HaPoolConfig CDRSOnlineCDRExports []string // list of CDRE templates to use for real-time CDR exports CDRStatsEnabled bool // Enable CDR Stats service @@ -954,6 +955,13 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) (err error) { self.CDRSCDRStatSConns[idx].loadFromJsonCfg(jsnHaCfg) } } + if jsnCdrsCfg.Thresholds_conns != nil { + self.CDRSThresholdSConns = make([]*HaPoolConfig, len(*jsnCdrsCfg.Thresholds_conns)) + for idx, jsnHaCfg := range *jsnCdrsCfg.Thresholds_conns { + self.CDRSThresholdSConns[idx] = NewDfltHaPoolConfig() + self.CDRSThresholdSConns[idx].loadFromJsonCfg(jsnHaCfg) + } + } if jsnCdrsCfg.Stats_conns != nil { self.CDRSStatSConns = make([]*HaPoolConfig, len(*jsnCdrsCfg.Stats_conns)) for idx, jsnHaCfg := range *jsnCdrsCfg.Stats_conns { diff --git a/config/config_defaults.go b/config/config_defaults.go index 7e9770909..d7ed88158 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -155,6 +155,7 @@ const CGRATES_CFG_JSON = ` "users_conns": [], // address where to reach the user service, empty to disable user profile functionality: <""|*internal|x.y.z.y:1234> "aliases_conns": [], // address where to reach the aliases service, empty to disable aliases functionality: <""|*internal|x.y.z.y:1234> "cdrstats_conns": [], // address where to reach the cdrstats service, empty to disable cdrstats functionality: <""|*internal|x.y.z.y:1234> + "thresholds_conns": [], // address where to reach the thresholds service, empty to disable thresholds functionality: <""|*internal|x.y.z.y:1234> "stats_conns": [], // address where to reach the stat service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234> "online_cdr_exports":[], // list of CDRE profiles to use for real-time CDR exports }, diff --git a/config/config_json_test.go b/config/config_json_test.go index eab834c6b..8956c0077 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -231,6 +231,7 @@ func TestDfCdrsJsonCfg(t *testing.T) { Users_conns: &[]*HaPoolJsonCfg{}, Aliases_conns: &[]*HaPoolJsonCfg{}, Cdrstats_conns: &[]*HaPoolJsonCfg{}, + Thresholds_conns: &[]*HaPoolJsonCfg{}, Stats_conns: &[]*HaPoolJsonCfg{}, Online_cdr_exports: &[]string{}, } diff --git a/config/config_test.go b/config/config_test.go index 4f189524a..222571e59 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -341,6 +341,12 @@ func TestCgrCfgJSONDefaultsCDRS(t *testing.T) { if !reflect.DeepEqual(cgrCfg.CDRSCDRStatSConns, eHaPoolCfg) { t.Error(cgrCfg.CDRSCDRStatSConns) } + if !reflect.DeepEqual(cgrCfg.CDRSThresholdSConns, eHaPoolCfg) { + t.Error(cgrCfg.CDRSThresholdSConns) + } + if !reflect.DeepEqual(cgrCfg.CDRSStatSConns, eHaPoolCfg) { + t.Error(cgrCfg.CDRSStatSConns) + } if cgrCfg.CDRSOnlineCDRExports != nil { t.Error(cgrCfg.CDRSOnlineCDRExports) } diff --git a/config/libconfig_json.go b/config/libconfig_json.go index ceec45116..5e464ecfc 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -108,6 +108,7 @@ type CdrsJsonCfg struct { Users_conns *[]*HaPoolJsonCfg Aliases_conns *[]*HaPoolJsonCfg Cdrstats_conns *[]*HaPoolJsonCfg + Thresholds_conns *[]*HaPoolJsonCfg Stats_conns *[]*HaPoolJsonCfg Online_cdr_exports *[]string } diff --git a/utils/consts.go b/utils/consts.go index 41021c648..aa0c6871c 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -480,7 +480,7 @@ const ( BalanceUpdate = "BalanceUpdate" StatUpdate = "StatUpdate" ResourceUpdate = "ResourceUpdate" - CDRUpdate = "CDR" + CDR = "CDR" ExpiryTime = "ExpiryTime" AllowNegative = "AllowNegative" Disabled = "Disabled" From f375a52e537607d0a6661d88554fb8ae35f30e29 Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 27 Oct 2017 19:44:44 +0200 Subject: [PATCH 10/10] CDRS with thresholds for *raw CDRs --- apier/v1/thresholds_it_test.go | 47 +++++++++++++++++++++++- cmd/cgr-engine/cgr-engine.go | 17 +++++++-- data/tariffplans/tutorial/Filters.csv | 2 +- data/tariffplans/tutorial/Thresholds.csv | 2 +- engine/cdrs.go | 20 +++++++++- 5 files changed, 79 insertions(+), 9 deletions(-) diff --git a/apier/v1/thresholds_it_test.go b/apier/v1/thresholds_it_test.go index fc3da6237..da9ef6b8f 100644 --- a/apier/v1/thresholds_it_test.go +++ b/apier/v1/thresholds_it_test.go @@ -119,6 +119,33 @@ var tEvs = []*engine.ThresholdEvent{ utils.ACCOUNT: "1002", utils.ResourceID: "RES_GRP_1", utils.USAGE: 10.0}}, + &engine.ThresholdEvent{ // hitting THD_CDRS_1 + Tenant: "cgrates.org", + ID: "cdrev1", + Event: map[string]interface{}{ + utils.EventType: utils.CDR, + "field_extr1": "val_extr1", + "fieldextr2": "valextr2", + utils.CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), + utils.MEDI_RUNID: utils.MetaRaw, + utils.ORDERID: 123, + utils.CDRHOST: "192.168.1.1", + utils.CDRSOURCE: utils.UNIT_TEST, + utils.ACCID: "dsafdsaf", + utils.TOR: utils.VOICE, + utils.REQTYPE: utils.META_RATED, + utils.DIRECTION: "*out", + utils.TENANT: "cgrates.org", + utils.CATEGORY: "call", + utils.ACCOUNT: "1007", + utils.SUBJECT: "1007", + utils.DESTINATION: "+4986517174963", + utils.SETUP_TIME: time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC), + utils.PDD: time.Duration(0) * time.Second, + utils.ANSWER_TIME: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), + utils.USAGE: time.Duration(10) * time.Second, + utils.SUPPLIER: "SUPPL1", + utils.COST: -1.0}}, } var sTestsThresholdSV1 = []func(t *testing.T){ @@ -206,7 +233,7 @@ func testV1TSFromFolder(t *testing.T) { func testV1TSGetThresholds(t *testing.T) { var tIDs []string - expectedIDs := []string{"THD_RES_1", "THD_STATS_2", "THD_STATS_1", "THD_ACNT_BALANCE_1", "THD_ACNT_EXPIRED", "THD_STATS_3"} + expectedIDs := []string{"THD_RES_1", "THD_STATS_2", "THD_STATS_1", "THD_ACNT_BALANCE_1", "THD_ACNT_EXPIRED", "THD_STATS_3", "THD_CDRS_1"} if err := tSv1Rpc.Call("ThresholdSV1.GetThresholdIDs", "cgrates.org", &tIDs); err != nil { t.Error(err) } else if len(expectedIDs) != len(tIDs) { @@ -260,6 +287,24 @@ func testV1TSProcessEvent(t *testing.T) { } else if hits != eHits { t.Errorf("Expecting hits: %d, received: %d", eHits, hits) } + eHits = 1 + if err := tSv1Rpc.Call("ThresholdSV1.ProcessEvent", tEvs[6], &hits); err != nil { + t.Error(err) + } else if hits != eHits { + t.Errorf("Expecting hits: %d, received: %d", eHits, hits) + } + eHits = 1 + if err := tSv1Rpc.Call("ThresholdSV1.ProcessEvent", tEvs[7], &hits); err != nil { + t.Error(err) + } else if hits != eHits { + t.Errorf("Expecting hits: %d, received: %d", eHits, hits) + } + eHits = 1 + if err := tSv1Rpc.Call("ThresholdSV1.ProcessEvent", tEvs[8], &hits); err != nil { + t.Error(err) + } else if hits != eHits { + t.Errorf("Expecting hits: %d, received: %d", eHits, hits) + } } func testV1TSGetThresholdsAfterProcess(t *testing.T) { diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index a060deb28..d2ccb7981 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -390,10 +390,10 @@ func startSmOpenSIPS(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, dm *engine.DataManager, internalRaterChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, - internalCdrStatSChan, internalStatSChan chan rpcclient.RpcClientConnection, + internalCdrStatSChan, internalThresholdSChan, internalStatSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) { utils.Logger.Info("Starting CGRateS CDRS service.") - var ralConn, pubSubConn, usersConn, aliasesConn, cdrstatsConn, statsConn *rpcclient.RpcClientPool + var ralConn, pubSubConn, usersConn, aliasesConn, cdrstatsConn, thresholdSConn, statsConn *rpcclient.RpcClientPool if len(cfg.CDRSRaterConns) != 0 { // Conn pool towards RAL ralConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.CDRSRaterConns, internalRaterChan, cfg.InternalTtl) @@ -439,6 +439,15 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, return } } + if len(cfg.CDRSThresholdSConns) != 0 { // Stats connection init + thresholdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.CDRSThresholdSConns, internalThresholdSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to ThresholdS: %s", err.Error())) + exitChan <- true + return + } + } if len(cfg.CDRSStatSConns) != 0 { // Stats connection init statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.CDRSStatSConns, internalStatSChan, cfg.InternalTtl) @@ -449,7 +458,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, } } cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, dm, ralConn, pubSubConn, - usersConn, aliasesConn, cdrstatsConn, statsConn) + usersConn, aliasesConn, cdrstatsConn, thresholdSConn, statsConn) cdrServer.SetTimeToLive(cfg.ResponseCacheTTL, nil) utils.Logger.Info("Registering CDRS HTTP Handlers.") cdrServer.RegisterHandlersToServer(server) @@ -829,7 +838,7 @@ func main() { if cfg.CDRSEnabled { go startCDRS(internalCdrSChan, cdrDb, dm, internalRaterChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, - internalCdrStatSChan, internalStatSChan, server, exitChan) + internalCdrStatSChan, internalThresholdSChan, internalStatSChan, server, exitChan) } // Start CDR Stats server diff --git a/data/tariffplans/tutorial/Filters.csv b/data/tariffplans/tutorial/Filters.csv index 74a50ad02..f0dc6010f 100644 --- a/data/tariffplans/tutorial/Filters.csv +++ b/data/tariffplans/tutorial/Filters.csv @@ -2,7 +2,7 @@ cgrates.org,FLTR_1,*string,Account,1001;1002,2014-07-29T15:00:00Z cgrates.org,FLTR_1,*string_prefix,Destination,10;20, cgrates.org,FLTR_1,*rsr_fields,,Subject(~^1.*1$);Destination(1002), -cgrates.org,FLTR_ACNT_dan,*string,Account,dan,2014-07-29T15:00:00Z +cgrates.org,FLTR_ACNT_1007,*string,Account,1007,2014-07-29T15:00:00Z cgrates.org,FLTR_DST_DE,*destinations,Destination,DST_DE,2014-07-29T15:00:00Z cgrates.org,FLTR_DST_NL,*destinations,Destination,DST_NL,2014-07-29T15:00:00Z cgrates.org,FLTR_ACNT_BALANCE_1,*string,Account,1001;1002,2014-07-29T15:00:00Z diff --git a/data/tariffplans/tutorial/Thresholds.csv b/data/tariffplans/tutorial/Thresholds.csv index 50ef8c1e6..d82fc7ec1 100644 --- a/data/tariffplans/tutorial/Thresholds.csv +++ b/data/tariffplans/tutorial/Thresholds.csv @@ -5,4 +5,4 @@ cgrates.org,THD_STATS_1,FLTR_STATS_1,2014-07-29T15:00:00Z,true,1,1s,false,10,LOG cgrates.org,THD_STATS_2,FLTR_STATS_2,2014-07-29T15:00:00Z,true,1,1s,false,10,DISABLE_AND_LOG,false cgrates.org,THD_STATS_3,FLTR_STATS_3,2014-07-29T15:00:00Z,false,1,1s,false,10,TOPUP_100SMS_DE_MOBILE,false cgrates.org,THD_RES_1,FLTR_RES_1,2014-07-29T15:00:00Z,true,1,1s,false,10,LOG_WARNING,false -cgrates.org,THD_CDRS_1,FLTR_ACNT_dan;FLTR_CDR_UPDATE,2014-07-29T15:00:00Z,false,1,1s,false,10,LOG_WARNING,false +cgrates.org,THD_CDRS_1,FLTR_ACNT_1007;FLTR_CDR_UPDATE,2014-07-29T15:00:00Z,false,1,1s,false,10,LOG_WARNING,false diff --git a/engine/cdrs.go b/engine/cdrs.go index e83c0064c..953160bee 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -70,7 +70,7 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) { } func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, rater, pubsub, users, - aliases, cdrstats, stats rpcclient.RpcClientConnection) (*CdrServer, error) { + aliases, cdrstats, thdS, stats rpcclient.RpcClientConnection) (*CdrServer, error) { if rater != nil && reflect.ValueOf(rater).IsNil() { // Work around so we store actual nil instead of nil interface value, faster to check here than in CdrServer code rater = nil } @@ -86,12 +86,15 @@ func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, r if cdrstats != nil && reflect.ValueOf(cdrstats).IsNil() { cdrstats = nil } + if thdS != nil && reflect.ValueOf(thdS).IsNil() { + thdS = nil + } if stats != nil && reflect.ValueOf(stats).IsNil() { stats = nil } return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, dm: dm, rals: rater, pubsub: pubsub, users: users, aliases: aliases, - cdrstats: cdrstats, stats: stats, guard: guardian.Guardian, + cdrstats: cdrstats, stats: stats, thdS: thdS, guard: guardian.Guardian, httpPoster: utils.NewHTTPPoster(cgrCfg.HttpSkipTlsVerify, cgrCfg.ReplyTimeout)}, nil } @@ -104,6 +107,7 @@ type CdrServer struct { users rpcclient.RpcClientConnection aliases rpcclient.RpcClientConnection cdrstats rpcclient.RpcClientConnection + thdS rpcclient.RpcClientConnection stats rpcclient.RpcClientConnection guard *guardian.GuardianLock responseCache *cache.ResponseCache @@ -193,6 +197,18 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) { return err // Error is propagated back and we don't continue processing the CDR if we cannot store it } } + if self.thdS != nil { + cdrIf, _ := cdr.AsMapStringIface() + ev := &ThresholdEvent{ + Tenant: cdr.Tenant, + ID: utils.GenUUID(), + Event: cdrIf} + var hits int + if err := self.thdS.Call(utils.ThresholdSv1ProcessEvent, ev, &hits); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" error: %s processing CDR event %+v with thdS.", err.Error(), ev)) + } + } // Attach raw CDR to stats if self.cdrstats != nil { // Send raw CDR to stats var out int