From ca43ea4b9cc0033c59a4bbe40ca449d60b96ee9a Mon Sep 17 00:00:00 2001 From: Edwardro22 Date: Thu, 3 Aug 2017 01:45:48 +0300 Subject: [PATCH] added thresholdCfg stordb|datadb methods and csv read --- apier/v1/apier.go | 2 + apier/v2/apier.go | 1 + cmd/cgr-loader/cgr-loader.go | 1 + .../mysql/create_tariffplan_tables.sql | 28 ++++ .../postgres/create_tariffplan_tables.sql | 27 ++++ data/tariffplans/testtp/ResourceLimits.csv | 2 +- data/tariffplans/testtp/Stats.csv | 2 +- data/tariffplans/testtp/Thresholds.csv | 2 + data/tariffplans/tutorial/ResourceLimits.csv | 2 +- data/tariffplans/tutorial/Stats.csv | 2 +- data/tariffplans/tutorial/Thresholds.csv | 2 + engine/libtest.go | 1 + engine/loader_csv_test.go | 45 +++++- engine/loader_it_test.go | 19 ++- engine/model_helpers.go | 145 +++++++++++++++++- engine/model_helpers_test.go | 97 +++++++++++- engine/models.go | 20 +++ engine/storage_csv.go | 40 ++++- engine/storage_interface.go | 5 + engine/storage_map.go | 57 ++++++- engine/storage_mongo_datadb.go | 52 +++++++ engine/storage_mongo_stordb.go | 31 ++++ engine/storage_redis.go | 49 ++++++ engine/storage_sql.go | 38 +++++ engine/tp_reader.go | 72 ++++++++- engine/tpimporter_csv.go | 13 ++ general_tests/acntacts_test.go | 4 +- general_tests/auth_test.go | 3 +- general_tests/costs1_test.go | 2 +- general_tests/datachrg1_test.go | 2 +- general_tests/ddazmbl1_test.go | 3 +- general_tests/ddazmbl2_test.go | 3 +- general_tests/ddazmbl3_test.go | 3 +- general_tests/smschrg1_test.go | 2 +- utils/apitpdata.go | 18 +++ utils/consts.go | 5 + 36 files changed, 767 insertions(+), 33 deletions(-) create mode 100644 data/tariffplans/testtp/Thresholds.csv create mode 100644 data/tariffplans/tutorial/Thresholds.csv mode change 100644 => 100755 general_tests/acntacts_test.go diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 2e8db02ef..a6e620ae7 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -1145,6 +1145,7 @@ func (self *ApierV1) GetCacheStats(attrs 
utils.AttrCacheStats, reply *utils.Cach cs.Aliases = cache.CountEntries(utils.ALIASES_PREFIX) cs.ReverseAliases = cache.CountEntries(utils.REVERSE_ALIASES_PREFIX) cs.ResourceLimits = cache.CountEntries(utils.ResourceLimitsPrefix) + if self.CdrStatsSrv != nil { var queueIds []string if err := self.CdrStatsSrv.Call("CDRStatsV1.GetQueueIds", 0, &queueIds); err != nil { @@ -1453,6 +1454,7 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, path.Join(attrs.FolderPath, utils.ALIASES_CSV), path.Join(attrs.FolderPath, utils.ResourceLimitsCsv), path.Join(attrs.FolderPath, utils.StatsCsv), + path.Join(attrs.FolderPath, utils.ThresholdsCsv), ), "", self.Config.DefaultTimezone) if err := loader.LoadAll(); err != nil { return utils.NewErrServerError(err) diff --git a/apier/v2/apier.go b/apier/v2/apier.go index d5f62801f..81da42ebf 100644 --- a/apier/v2/apier.go +++ b/apier/v2/apier.go @@ -141,6 +141,7 @@ func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, path.Join(attrs.FolderPath, utils.ALIASES_CSV), path.Join(attrs.FolderPath, utils.ResourceLimitsCsv), path.Join(attrs.FolderPath, utils.StatsCsv), + path.Join(attrs.FolderPath, utils.ThresholdsCsv), ), "", self.Config.DefaultTimezone) if err := loader.LoadAll(); err != nil { return utils.NewErrServerError(err) diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 2fa25eb19..1e1587824 100755 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -268,6 +268,7 @@ func main() { path.Join(*dataPath, utils.ALIASES_CSV), path.Join(*dataPath, utils.ResourceLimitsCsv), path.Join(*dataPath, utils.StatsCsv), + path.Join(*dataPath, utils.ThresholdsCsv), ) } tpReader := engine.NewTpReader(dataDB, loader, *tpid, *timezone) diff --git a/data/storage/mysql/create_tariffplan_tables.sql b/data/storage/mysql/create_tariffplan_tables.sql index 95bb2e682..278f0083e 100644 --- a/data/storage/mysql/create_tariffplan_tables.sql +++ 
b/data/storage/mysql/create_tariffplan_tables.sql @@ -443,6 +443,34 @@ CREATE TABLE tp_stats ( UNIQUE KEY `unique_tp_stats` (`tpid`, `tag`, `filter_type`, `filter_field_name`) ); +-- +-- Table structure for table `tp_threshold_cfgs` +-- + +DROP TABLE IF EXISTS tp_threshold_cfgs; +CREATE TABLE tp_threshold_cfgs ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `tpid` varchar(64) NOT NULL, + `tag` varchar(64) NOT NULL, + `filter_type` varchar(16) NOT NULL, + `filter_field_name` varchar(64) NOT NULL, + `filter_field_values` varchar(256) NOT NULL, + `activation_interval` varchar(64) NOT NULL, + `threshold_type` char(64) NOT NULL, + `threshold_value` DECIMAL(20,4) NOT NULL, + `min_items` int(11) NOT NULL, + `recurrent` BOOLEAN NOT NULL, + `min_sleep` varchar(16) NOT NULL, + `blocker` BOOLEAN NOT NULL, + `stored` BOOLEAN NOT NULL, + `weight` decimal(8,2) NOT NULL, + `action_ids` varchar(64) NOT NULL, + `created_at` TIMESTAMP, + PRIMARY KEY (`id`), + KEY `tpid` (`tpid`), + UNIQUE KEY `unique_tp_threshold_cfgs` (`tpid`, `tag`, `filter_type`, `filter_field_name`) +); + -- -- Table structure for table `versions` -- diff --git a/data/storage/postgres/create_tariffplan_tables.sql b/data/storage/postgres/create_tariffplan_tables.sql index 1c5add6dd..5bc43b9c0 100644 --- a/data/storage/postgres/create_tariffplan_tables.sql +++ b/data/storage/postgres/create_tariffplan_tables.sql @@ -438,6 +438,33 @@ CREATE TABLE tp_stats ( CREATE INDEX tp_stats_idx ON tp_stats (tpid); CREATE INDEX tp_stats_unique ON tp_stats ("tpid", "tag", "filter_type", "filter_field_name"); +-- +-- Table structure for table `tp_threshold_cfgs` +-- + +DROP TABLE IF EXISTS tp_threshold_cfgs; +CREATE TABLE tp_threshold_cfgs ( + "id" SERIAL PRIMARY KEY, + "tpid" varchar(64) NOT NULL, + "tag" varchar(64) NOT NULL, + "filter_type" varchar(16) NOT NULL, + "filter_field_name" varchar(64) NOT NULL, + "filter_field_values" varchar(256) NOT NULL, + "activation_interval" varchar(64) NOT NULL, + "threshold_type" VARCHAR(64) NOT 
NULL, + "threshold_value" NUMERIC(20,4) NOT NULL, + "min_items" INTEGER NOT NULL, + "recurrent" BOOLEAN NOT NULL, + "min_sleep" varchar(16) NOT NULL, + "blocker" BOOLEAN NOT NULL, + "stored" BOOLEAN NOT NULL, + "weight" decimal(8,2) NOT NULL, + "action_ids" varchar(64) NOT NULL, + "created_at" TIMESTAMP WITH TIME ZONE +); +CREATE INDEX tp_threshold_cfgs_idx ON tp_threshold_cfgs (tpid); +CREATE INDEX tp_threshold_cfgs_unique ON tp_threshold_cfgs ("tpid", "tag", "filter_type", "filter_field_name"); + -- -- Table structure for table `versions` diff --git a/data/tariffplans/testtp/ResourceLimits.csv b/data/tariffplans/testtp/ResourceLimits.csv index 8376db496..1e5d8d01e 100755 --- a/data/tariffplans/testtp/ResourceLimits.csv +++ b/data/tariffplans/testtp/ResourceLimits.csv @@ -1,4 +1,4 @@ -#Id,FilterType,FilterFieldName,FilterFieldValues,ActivationInterval,TTL,Limit,AllocationReply,Blocker,Stored,Weight,ActionTriggers +#Id[0],FilterType[1],FilterFieldName[2],FilterFieldValues[3],ActivationInterval[4],TTL[5],Limit[6],AllocationMessage[7],Weight[8],Thresholds[9] ResGroup1,*string,Account,1001;1002,2014-07-29T15:00:00Z,1s,7,,true,true,20, ResGroup1,*string_prefix,Destination,10;20,,,,,,,, ResGroup1,*rsr_fields,,Subject(~^1.*1$);Destination(1002),,,,,,,, diff --git a/data/tariffplans/testtp/Stats.csv b/data/tariffplans/testtp/Stats.csv index 2013a7bc6..e3c87b60a 100755 --- a/data/tariffplans/testtp/Stats.csv +++ b/data/tariffplans/testtp/Stats.csv @@ -1,2 +1,2 @@ -#Id,FilterType,FilterFieldName,FilterFieldValues,ActivationInterval,QueueLength,TTL,Metrics,Blocker,Stored,Weight,Thresholds +#Id[0],FilterType[1],FilterFieldName[2],FilterFieldValues[3],ActivationInterval[4],QueueLength[5],TTL[6],Metrics[7],Blocker[8],Stored[9],Weight[10],Thresholds[11] Stats1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acd;*acc,true,true,20,THRESH1;THRESH2 diff --git a/data/tariffplans/testtp/Thresholds.csv b/data/tariffplans/testtp/Thresholds.csv new file mode 100644 index 
000000000..44241c16f --- /dev/null +++ b/data/tariffplans/testtp/Thresholds.csv @@ -0,0 +1,2 @@ +#Id[0],FilterType[1],FilterFieldName[2],FilterFieldValues[3],ActivationInterval[4],ThresholdType[5],ThresholdValue[6],MinItems[7],Recurrent[8],MinSleep[9],Blocker[10],Stored[11],Weight[12],ActionIDs[13] +Threshold1,*string,Account,1001;1002,2014-07-29T15:00:00Z,,1.2,10,true,1s,true,true,10,THRESH1;THRESH2 \ No newline at end of file diff --git a/data/tariffplans/tutorial/ResourceLimits.csv b/data/tariffplans/tutorial/ResourceLimits.csv index 8376db496..1e5d8d01e 100755 --- a/data/tariffplans/tutorial/ResourceLimits.csv +++ b/data/tariffplans/tutorial/ResourceLimits.csv @@ -1,4 +1,4 @@ -#Id,FilterType,FilterFieldName,FilterFieldValues,ActivationInterval,TTL,Limit,AllocationReply,Blocker,Stored,Weight,ActionTriggers +#Id[0],FilterType[1],FilterFieldName[2],FilterFieldValues[3],ActivationInterval[4],TTL[5],Limit[6],AllocationMessage[7],Weight[8],Thresholds[9] ResGroup1,*string,Account,1001;1002,2014-07-29T15:00:00Z,1s,7,,true,true,20, ResGroup1,*string_prefix,Destination,10;20,,,,,,,, ResGroup1,*rsr_fields,,Subject(~^1.*1$);Destination(1002),,,,,,,, diff --git a/data/tariffplans/tutorial/Stats.csv b/data/tariffplans/tutorial/Stats.csv index 2013a7bc6..e3c87b60a 100755 --- a/data/tariffplans/tutorial/Stats.csv +++ b/data/tariffplans/tutorial/Stats.csv @@ -1,2 +1,2 @@ -#Id,FilterType,FilterFieldName,FilterFieldValues,ActivationInterval,QueueLength,TTL,Metrics,Blocker,Stored,Weight,Thresholds +#Id[0],FilterType[1],FilterFieldName[2],FilterFieldValues[3],ActivationInterval[4],QueueLength[5],TTL[6],Metrics[7],Blocker[8],Stored[9],Weight[10],Thresholds[11] Stats1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acd;*acc,true,true,20,THRESH1;THRESH2 diff --git a/data/tariffplans/tutorial/Thresholds.csv b/data/tariffplans/tutorial/Thresholds.csv new file mode 100644 index 000000000..44241c16f --- /dev/null +++ b/data/tariffplans/tutorial/Thresholds.csv @@ -0,0 +1,2 @@ 
+#Id[0],FilterType[1],FilterFieldName[2],FilterFieldValues[3],ActivationInterval[4],ThresholdType[5],ThresholdValue[6],MinItems[7],Recurrent[8],MinSleep[9],Blocker[10],Stored[11],Weight[12],ActionIDs[13] +Threshold1,*string,Account,1001;1002,2014-07-29T15:00:00Z,,1.2,10,true,1s,true,true,10,THRESH1;THRESH2 \ No newline at end of file diff --git a/engine/libtest.go b/engine/libtest.go index a06b36525..c7a31129d 100644 --- a/engine/libtest.go +++ b/engine/libtest.go @@ -128,6 +128,7 @@ func LoadTariffPlanFromFolder(tpPath, timezone string, dataDB DataDB, disable_re path.Join(tpPath, utils.ALIASES_CSV), path.Join(tpPath, utils.ResourceLimitsCsv), path.Join(tpPath, utils.StatsCsv), + path.Join(tpPath, utils.ThresholdsCsv), ), "", timezone) if err := loader.LoadAll(); err != nil { return utils.NewErrServerError(err) diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index e33944675..df4863fc6 100755 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -265,17 +265,20 @@ cgrates.org,mas,true,another,value,10 *out,cgrates.org,call,remo,remo,*any,*rating,Subject,remo,minu,10 *out,cgrates.org,call,remo,remo,*any,*rating,Account,remo,minu,10 ` - resLimits = ` -#Id,FilterType,FilterFieldName,FilterFieldValues,ActivationInterval,TTL,Limit,AllocationMessage,Weight,Thresholds +#Id[0],FilterType[1],FilterFieldName[2],FilterFieldValues[3],ActivationInterval[4],TTL[5],Limit[6],AllocationMessage[7],Weight[8],Thresholds[9] ResGroup21,*string,HdrAccount,1001;1002,2014-07-29T15:00:00Z,1s,2,call,true,true,10, ResGroup21,*string_prefix,HdrDestination,10;20,,,,,,,, ResGroup21,*rsr_fields,,HdrSubject(~^1.*1$);HdrDestination(1002),,,,,,,, ResGroup22,*destinations,HdrDestination,DST_FS,2014-07-29T15:00:00Z,3600s,2,premium_call,true,true,10, ` stats = ` -#Id,FilterType,FilterFieldName,FilterFieldValues,ActivationInterval,QueueLength,TTL,Metrics,Blocker,Stored,Weight,Thresholds 
+#Id[0],FilterType[1],FilterFieldName[2],FilterFieldValues[3],ActivationInterval[4],QueueLength[5],TTL[6],Metrics[7],Blocker[8],Stored[9],Weight[10],Thresholds[11] Stats1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acd;*acc,true,true,20,THRESH1;THRESH2 +` + thresholds = ` +#Id[0],FilterType[1],FilterFieldName[2],FilterFieldValues[3],ActivationInterval[4],ThresholdType[5],ThresholdValue[6],MinItems[7],Recurrent[8],MinSleep[9],Blocker[10],Stored[11],Weight[12],ActionIDs[13] +Threshold1,*string,Account,1001;1002,2014-07-29T15:00:00Z,,1.2,10,true,1s,true,true,10, ` ) @@ -283,7 +286,7 @@ var csvr *TpReader func init() { csvr = NewTpReader(dataStorage, NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, - sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases, resLimits, stats), testTPID, "") + sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases, resLimits, stats, thresholds), testTPID, "") if err := csvr.LoadDestinations(); err != nil { log.Print("error in LoadDestinations:", err) } @@ -333,8 +336,14 @@ func init() { log.Print("error in LoadAliases:", err) } if err := csvr.LoadResourceLimits(); err != nil { + log.Print("error in LoadResourceLimits:", err) + } if err := csvr.LoadStats(); err != nil { + log.Print("error in LoadStats:", err) + } + if err := csvr.LoadThresholds(); err != nil { + log.Print("error in LoadThresholds:", err) } csvr.WriteToDatabase(false, false, false) cache.Flush() @@ -1449,3 +1458,31 @@ func TestLoadStats(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", eStats["Stats1"], csvr.stats["Stats1"]) } } + +func TestLoadThresholds(t *testing.T) { + eThresholds := map[string]*utils.TPThresholdCfg{ + "Threshold1": &utils.TPThresholdCfg{ + TPid: testTPID, + ID: "Threshold1", + Filters: []*utils.TPRequestFilter{ + &utils.TPRequestFilter{Type: MetaString, 
FieldName: "Account", Values: []string{"1001", "1002"}}, + }, + ActivationInterval: &utils.TPActivationInterval{ + ActivationTime: "2014-07-29T15:00:00Z", + }, + ThresholdType: "", + ThresholdValue: 1.2, + MinItems: 10, + Recurrent: true, + MinSleep: "1s", + Blocker: true, + Stored: true, + Weight: 10, + }, + } + if len(csvr.thresholds) != len(eThresholds) { + t.Error("Failed to load thresholds: ", len(csvr.thresholds)) + } else if !reflect.DeepEqual(eThresholds["Threshold1"], csvr.thresholds["Threshold1"]) { + t.Errorf("Expecting: %+v, received: %+v", eThresholds["Threshold1"], csvr.thresholds["Threshold1"]) + } +} diff --git a/engine/loader_it_test.go b/engine/loader_it_test.go index e1d60fbb1..74620b49c 100755 --- a/engine/loader_it_test.go +++ b/engine/loader_it_test.go @@ -106,6 +106,7 @@ func TestLoaderITLoadFromCSV(t *testing.T) { path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.ALIASES_CSV), path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.ResourceLimitsCsv), path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.StatsCsv), + path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.ThresholdsCsv), ), "", "") if err = loader.LoadDestinations(); err != nil { @@ -156,6 +157,9 @@ func TestLoaderITLoadFromCSV(t *testing.T) { if err = loader.LoadStats(); err != nil { t.Error("Failed loading stats: ", err.Error()) } + if err = loader.LoadThresholds(); err != nil { + t.Error("Failed loading thresholds: ", err.Error()) + } if err := loader.WriteToDatabase(true, false, false); err != nil { t.Error("Could not write data into dataDb: ", err.Error()) } @@ -324,7 +328,7 @@ func TestLoaderITWriteToDatabase(t *testing.T) { if err != nil { t.Error("Failed GetStatsQueue: ", err.Error()) } - sts, err := APItoTPStats(st, "UTC") + sts, err := APItoStats(st, "UTC") if err != nil { t.Error(err) } @@ -333,6 +337,19 @@ func TestLoaderITWriteToDatabase(t *testing.T) { } } + for k, th := range loader.thresholds { + rcv, err := loader.dataStorage.GetThresholdCfg(k, 
true, utils.NonTransactional) + if err != nil { + t.Error("Failed GetThresholdCfg: ", err.Error()) + } + sts, err := APItoThresholdCfg(th, "UTC") + if err != nil { + t.Error(err) + } + if !reflect.DeepEqual(sts, rcv) { + t.Errorf("Expecting: %v, received: %v", sts, rcv) + } + } } // Imports data from csv files in tpScenario to storDb diff --git a/engine/model_helpers.go b/engine/model_helpers.go index bca41976c..e59be5f4e 100755 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -2054,7 +2054,7 @@ func APItoModelStats(st *utils.TPStats) (mdls TpStatsS) { return } -func APItoTPStats(tpST *utils.TPStats, timezone string) (st *StatsQueue, err error) { +func APItoStats(tpST *utils.TPStats, timezone string) (st *StatsQueue, err error) { st = &StatsQueue{ ID: tpST.ID, QueueLength: tpST.QueueLength, @@ -2089,3 +2089,146 @@ func APItoTPStats(tpST *utils.TPStats, timezone string) (st *StatsQueue, err err } return st, nil } + +type TpThresholdCfgS []*TpThresholdCfg + +func (tps TpThresholdCfgS) AsTPThresholdCfg() (result []*utils.TPThresholdCfg) { + mst := make(map[string]*utils.TPThresholdCfg) + for _, tp := range tps { + th, found := mst[tp.Tag] + if !found { + th = &utils.TPThresholdCfg{ + TPid: tp.Tpid, + ID: tp.Tag, + Blocker: tp.Blocker, + Stored: tp.Stored, + Recurrent: tp.Recurrent, + MinSleep: tp.MinSleep, + } + } + if tp.ThresholdValue != 0 { + th.ThresholdValue = tp.ThresholdValue + } + if tp.ThresholdType != "" { + th.ThresholdType = tp.ThresholdType + } + if tp.ActionIDs != "" { + th.ActionIDs = append(th.ActionIDs, strings.Split(tp.ActionIDs, utils.INFIELD_SEP)...) 
+ } + if tp.MinItems != 0 { + th.MinItems = tp.MinItems + } + if tp.Weight != 0 { + th.Weight = tp.Weight + } + if len(tp.ActivationInterval) != 0 { + th.ActivationInterval = new(utils.TPActivationInterval) + aiSplt := strings.Split(tp.ActivationInterval, utils.INFIELD_SEP) + if len(aiSplt) == 2 { + th.ActivationInterval.ActivationTime = aiSplt[0] + th.ActivationInterval.ExpiryTime = aiSplt[1] + } else if len(aiSplt) == 1 { + th.ActivationInterval.ActivationTime = aiSplt[0] + } + } + if tp.FilterType != "" { + th.Filters = append(th.Filters, &utils.TPRequestFilter{ + Type: tp.FilterType, + FieldName: tp.FilterFieldName, + Values: strings.Split(tp.FilterFieldValues, utils.INFIELD_SEP)}) + } + mst[tp.Tag] = th + } + result = make([]*utils.TPThresholdCfg, len(mst)) + i := 0 + for _, th := range mst { + result[i] = th + i++ + } + return +} + +func APItoModelTPThresholdCfg(th *utils.TPThresholdCfg) (mdls TpThresholdCfgS) { + if len(th.Filters) == 0 { + return + } + for i, fltr := range th.Filters { + mdl := &TpThresholdCfg{ + Tpid: th.TPid, + Tag: th.ID, + } + if i == 0 { + mdl.Blocker = th.Blocker + mdl.Stored = th.Stored + mdl.Weight = th.Weight + mdl.MinItems = th.MinItems + mdl.Recurrent = th.Recurrent + mdl.ThresholdType = th.ThresholdType + mdl.ThresholdValue = th.ThresholdValue + mdl.MinSleep = th.MinSleep + if th.ActivationInterval != nil { + if th.ActivationInterval.ActivationTime != "" { + mdl.ActivationInterval = th.ActivationInterval.ActivationTime + } + if th.ActivationInterval.ExpiryTime != "" { + mdl.ActivationInterval += utils.INFIELD_SEP + th.ActivationInterval.ExpiryTime + } + } + for i, atid := range th.ActionIDs { + if i != 0 { + mdl.ActionIDs = mdl.ActionIDs + utils.INFIELD_SEP + atid + } else { + mdl.ActionIDs = atid + } + } + + } + mdl.FilterType = fltr.Type + mdl.FilterFieldName = fltr.FieldName + for i, val := range fltr.Values { + if i != 0 { + mdl.FilterFieldValues = mdl.FilterFieldValues + utils.INFIELD_SEP + val + } else { + 
mdl.FilterFieldValues = val + } + } + mdls = append(mdls, mdl) + } + return +} + +func APItoThresholdCfg(tpTH *utils.TPThresholdCfg, timezone string) (th *ThresholdCfg, err error) { + th = &ThresholdCfg{ + ID: tpTH.ID, + ThresholdType: tpTH.ThresholdType, + ThresholdValue: tpTH.ThresholdValue, + MinItems: tpTH.MinItems, + Recurrent: tpTH.Recurrent, + Weight: tpTH.Weight, + Blocker: tpTH.Blocker, + Stored: tpTH.Stored, + Filters: make([]*RequestFilter, len(tpTH.Filters)), + } + if tpTH.MinSleep != "" { + if th.MinSleep, err = utils.ParseDurationWithSecs(tpTH.MinSleep); err != nil { + return nil, err + } + } + for _, ati := range tpTH.ActionIDs { + th.ActionIDs = append(th.ActionIDs, ati) + + } + for i, f := range tpTH.Filters { + rf := &RequestFilter{Type: f.Type, FieldName: f.FieldName, Values: f.Values} + if err := rf.CompileValues(); err != nil { + return nil, err + } + th.Filters[i] = rf + } + if tpTH.ActivationInterval != nil { + if th.ActivationInterval, err = tpTH.ActivationInterval.AsActivationInterval(timezone); err != nil { + return nil, err + } + } + return th, nil +} diff --git a/engine/model_helpers_test.go b/engine/model_helpers_test.go index 9c51f579a..6e73300b9 100644 --- a/engine/model_helpers_test.go +++ b/engine/model_helpers_test.go @@ -878,7 +878,102 @@ func TestAPItoTPStats(t *testing.T) { at, _ := utils.ParseTimeDetectLayout("2014-07-29T15:00:00Z", "UTC") eTPs.ActivationInterval = &utils.ActivationInterval{ActivationTime: at} - if st, err := APItoTPStats(tps, "UTC"); err != nil { + if st, err := APItoStats(tps, "UTC"); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eTPs, st) { + t.Errorf("Expecting: %+v, received: %+v", eTPs, st) + } +} + +func TestAsTPThresholdCfgAsAsTPThresholdCfg(t *testing.T) { + tps := []*TpThresholdCfg{ + &TpThresholdCfg{ + Tpid: "TEST_TPID", + Tag: "Stats1", + FilterType: MetaStringPrefix, + FilterFieldName: "Account", + FilterFieldValues: "1001;1002", + ActivationInterval: "2014-07-29T15:00:00Z", + 
MinItems: 100, + Recurrent: false, + MinSleep: "1s", + ThresholdType: "", + ThresholdValue: 1.2, + Stored: false, + Blocker: false, + Weight: 20.0, + ActionIDs: "WARN3", + }, + } + eTPs := []*utils.TPThresholdCfg{ + &utils.TPThresholdCfg{ + TPid: tps[0].Tpid, + ID: tps[0].Tag, + Filters: []*utils.TPRequestFilter{ + &utils.TPRequestFilter{ + Type: tps[0].FilterType, + FieldName: tps[0].FilterFieldName, + Values: []string{"1001", "1002"}, + }, + }, + ActivationInterval: &utils.TPActivationInterval{ + ActivationTime: tps[0].ActivationInterval, + }, + MinItems: tps[0].MinItems, + MinSleep: tps[0].MinSleep, + ThresholdType: tps[0].ThresholdType, + ThresholdValue: tps[0].ThresholdValue, + Recurrent: tps[0].Recurrent, + Stored: tps[0].Stored, + Blocker: tps[0].Blocker, + Weight: tps[0].Weight, + ActionIDs: []string{"WARN3"}, + }, + } + rcvTPs := TpThresholdCfgS(tps).AsTPThresholdCfg() + if !(reflect.DeepEqual(eTPs, rcvTPs) || reflect.DeepEqual(eTPs[0], rcvTPs[0])) { + t.Errorf("\nExpecting:\n%+v\nReceived:\n%+v", utils.ToIJSON(eTPs), utils.ToIJSON(rcvTPs)) + } +} + +func TestAPItoTPThresholdCfg(t *testing.T) { + tps := &utils.TPThresholdCfg{ + TPid: testTPID, + ID: "Stats1", + Filters: []*utils.TPRequestFilter{ + &utils.TPRequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"1001", "1002"}}, + }, + ActivationInterval: &utils.TPActivationInterval{ActivationTime: "2014-07-29T15:00:00Z"}, + MinItems: 100, + Recurrent: false, + MinSleep: "1s", + ThresholdType: "", + ThresholdValue: 1.2, + Stored: false, + Blocker: false, + Weight: 20.0, + ActionIDs: []string{"WARN3"}, + } + + eTPs := &ThresholdCfg{ID: tps.ID, + Filters: make([]*RequestFilter, len(tps.Filters)), + MinItems: tps.MinItems, + Recurrent: tps.Recurrent, + ThresholdType: tps.ThresholdType, + ThresholdValue: tps.ThresholdValue, + Stored: tps.Stored, + Blocker: tps.Blocker, + Weight: tps.Weight, + ActionIDs: []string{"WARN3"}, + } + if eTPs.MinSleep, err = utils.ParseDurationWithSecs(tps.MinSleep); 
err != nil { + t.Errorf("Got error: %+v", err) + } + eTPs.Filters[0] = &RequestFilter{Type: MetaString, + FieldName: "Account", Values: []string{"1001", "1002"}} + at, _ := utils.ParseTimeDetectLayout("2014-07-29T15:00:00Z", "UTC") + eTPs.ActivationInterval = &utils.ActivationInterval{ActivationTime: at} + if st, err := APItoThresholdCfg(tps, "UTC"); err != nil { t.Error(err) } else if !reflect.DeepEqual(eTPs, st) { t.Errorf("Expecting: %+v, received: %+v", eTPs, st) diff --git a/engine/models.go b/engine/models.go index c76e280c4..825f5777e 100755 --- a/engine/models.go +++ b/engine/models.go @@ -495,3 +495,23 @@ type TpStats struct { Thresholds string `index:"11" re:""` CreatedAt time.Time } + +type TpThresholdCfg struct { + ID int64 + Tpid string + Tag string `index:"0" re:""` + FilterType string `index:"1" re:"^\*[A-Za-z].*"` + FilterFieldName string `index:"2" re:""` + FilterFieldValues string `index:"3" re:""` + ActivationInterval string `index:"4" re:""` + ThresholdType string `index:"5" re:""` + ThresholdValue float64 `index:"6" re:"\d+\.?\d*"` + MinItems int `index:"7" re:""` + Recurrent bool `index:"8" re:""` + MinSleep string `index:"9" re:""` + Blocker bool `index:"10" re:""` + Stored bool `index:"11" re:""` + Weight float64 `index:"12" re:"\d+\.?\d*"` + ActionIDs string `index:"13" re:""` + CreatedAt time.Time +} diff --git a/engine/storage_csv.go b/engine/storage_csv.go index 9c2e5fbc4..25fb6b143 100755 --- a/engine/storage_csv.go +++ b/engine/storage_csv.go @@ -32,26 +32,26 @@ type CSVStorage struct { readerFunc func(string, rune, int) (*csv.Reader, *os.File, error) // file names destinationsFn, ratesFn, destinationratesFn, timingsFn, destinationratetimingsFn, ratingprofilesFn, - sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn, statsFn string + sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, 
cdrStatsFn, usersFn, aliasesFn, resLimitsFn, statsFn, thresholdsFn string } func NewFileCSVStorage(sep rune, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, lcrFn, - actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn, statsFn string) *CSVStorage { + actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn, statsFn, thresholdsFn string) *CSVStorage { c := new(CSVStorage) c.sep = sep c.readerFunc = openFileCSVStorage c.destinationsFn, c.timingsFn, c.ratesFn, c.destinationratesFn, c.destinationratetimingsFn, c.ratingprofilesFn, - c.sharedgroupsFn, c.lcrFn, c.actionsFn, c.actiontimingsFn, c.actiontriggersFn, c.accountactionsFn, c.derivedChargersFn, c.cdrStatsFn, c.usersFn, c.aliasesFn, c.resLimitsFn, c.statsFn = destinationsFn, timingsFn, - ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn, statsFn + c.sharedgroupsFn, c.lcrFn, c.actionsFn, c.actiontimingsFn, c.actiontriggersFn, c.accountactionsFn, c.derivedChargersFn, c.cdrStatsFn, c.usersFn, c.aliasesFn, c.resLimitsFn, c.statsFn, c.thresholdsFn = destinationsFn, timingsFn, + ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn, statsFn, thresholdsFn return c } func NewStringCSVStorage(sep rune, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, lcrFn, - actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn, statsFn string) *CSVStorage { 
+ actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn, statsFn, thresholdsFn string) *CSVStorage { c := NewFileCSVStorage(sep, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, - ratingprofilesFn, sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn, statsFn) + ratingprofilesFn, sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn, statsFn, thresholdsFn) c.readerFunc = openStringCSVStorage return c } @@ -648,6 +648,34 @@ func (csvs *CSVStorage) GetTPStats(tpid, id string) ([]*utils.TPStats, error) { return tpStats.AsTPStats(), nil } +func (csvs *CSVStorage) GetTPThresholdCfg(tpid, id string) ([]*utils.TPThresholdCfg, error) { + csvReader, fp, err := csvs.readerFunc(csvs.thresholdsFn, csvs.sep, getColumnCount(TpThresholdCfg{})) + if err != nil { + //log.Print("Could not load stats file: ", err) + // allow writing of the other values + return nil, nil + } + if fp != nil { + defer fp.Close() + } + var tpThresholdCfg TpThresholdCfgS + for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() { + if err != nil { + log.Print("bad line in TPThresholdCfg csv: ", err) + return nil, err + } + if thresholdCfg, err := csvLoad(TpThresholdCfg{}, record); err != nil { + log.Print("error loading TPThresholdCfg: ", err) + return nil, err + } else { + tHresholdCfg := thresholdCfg.(TpThresholdCfg) + tHresholdCfg.Tpid = tpid + tpThresholdCfg = append(tpThresholdCfg, &tHresholdCfg) + } + } + return tpThresholdCfg.AsTPThresholdCfg(), nil +} + func (csvs *CSVStorage) GetTpIds() ([]string, error) { return nil, utils.ErrNotImplemented } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 2cb49a20f..6b1484274 100644 --- 
a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -116,6 +116,9 @@ type DataDB interface { GetSQStoredMetrics(sqID string) (sqSM *SQStoredMetrics, err error) SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error) RemSQStoredMetrics(sqID string) (err error) + GetThresholdCfg(ID string, skipCache bool, transactionID string) (th *ThresholdCfg, err error) + SetThresholdCfg(th *ThresholdCfg) (err error) + RemThresholdCfg(ID string, transactionID string) (err error) // CacheDataFromDB loads data to cache, prefix represents the cache prefix, IDs should be nil if all available data should be loaded CacheDataFromDB(prefix string, IDs []string, mustBeCached bool) error // ToDo: Move this to dataManager } @@ -163,6 +166,7 @@ type LoadReader interface { GetTPAccountActions(*utils.TPAccountActions) ([]*utils.TPAccountActions, error) GetTPResourceLimits(string, string) ([]*utils.TPResourceLimit, error) GetTPStats(string, string) ([]*utils.TPStats, error) + GetTPThresholdCfg(string, string) ([]*utils.TPThresholdCfg, error) } type LoadWriter interface { @@ -185,6 +189,7 @@ type LoadWriter interface { SetTPAccountActions([]*utils.TPAccountActions) error SetTPResourceLimits([]*utils.TPResourceLimit) error SetTPStats([]*utils.TPStats) error + SetTPThresholdCfg([]*utils.TPThresholdCfg) error } type Marshaler interface { diff --git a/engine/storage_map.go b/engine/storage_map.go index d1368ffa2..75f9fb8a2 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -1560,8 +1560,55 @@ func (ms *MapStorage) RemSQStoredMetrics(sqID string) (err error) { return } -/* -GetStatsQueue(sqID string, skipCache bool, transactionID string) (sq *StatsQueue, err error) -SetStatsQueue(sq *StatsQueue) (err error) -RemStatsQueue(sqID, transactionID string) (err error) -*/ +// GetStatsQueue retrieves a ThresholdCfg from dataDB/cache +func (ms *MapStorage) GetThresholdCfg(ID string, skipCache bool, transactionID string) (th *ThresholdCfg, err error) { + ms.mu.RLock() + defer 
ms.mu.RUnlock() + key := utils.ThresholdCfgPrefix + ID + if !skipCache { + if x, ok := cache.Get(key); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*ThresholdCfg), nil + } + } + values, ok := ms.dict[key] + if !ok { + cache.Set(key, nil, cacheCommit(transactionID), transactionID) + return nil, utils.ErrNotFound + } + err = ms.ms.Unmarshal(values, &th) + if err != nil { + return nil, err + } + for _, fltr := range th.Filters { + if err := fltr.CompileValues(); err != nil { + return nil, err + } + } + cache.Set(key, th, cacheCommit(transactionID), transactionID) + return +} + +// SetStatsQueue stores a ThresholdCfg into DataDB +func (ms *MapStorage) SetThresholdCfg(th *ThresholdCfg) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() + result, err := ms.ms.Marshal(th) + if err != nil { + return err + } + ms.dict[utils.ThresholdCfgPrefix+th.ID] = result + return +} + +// RemStatsQueue removes a ThresholdCfg from dataDB/cache +func (ms *MapStorage) RemThresholdCfg(sqID string, transactionID string) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() + key := utils.ThresholdCfgPrefix + sqID + delete(ms.dict, key) + cache.RemKey(key, cacheCommit(transactionID), transactionID) + return +} diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 769bbe0c8..fd2c588e4 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -2064,3 +2064,55 @@ func (ms *MongoStorage) RemSQStoredMetrics(sqmID string) (err error) { err = col.Remove(bson.M{"sqid": sqmID}) return err } + +// GetStatsQueue retrieves a ThresholdCfg from dataDB/cache +func (ms *MongoStorage) GetThresholdCfg(ID string, skipCache bool, transactionID string) (th *ThresholdCfg, err error) { + cacheKey := utils.ThresholdCfgPrefix + ID + if !skipCache { + if x, ok := cache.Get(cacheKey); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*ThresholdCfg), nil + } + } + session, col := ms.conn(utils.ThresholdCfgPrefix) + defer 
session.Close() + th = new(ThresholdCfg) + cCommit := cacheCommit(transactionID) + if err = col.Find(bson.M{"id": ID}).One(&th); err != nil { + if err == mgo.ErrNotFound { + cache.Set(cacheKey, nil, cCommit, transactionID) + err = utils.ErrNotFound + } + return nil, err + } + for _, fltr := range th.Filters { + if err = fltr.CompileValues(); err != nil { + return + } + } + cache.Set(cacheKey, th, cCommit, transactionID) + return +} + +// SetThresholdCfg stores a ThresholdCfg into DataDB +func (ms *MongoStorage) SetThresholdCfg(th *ThresholdCfg) (err error) { + session, col := ms.conn(utils.ThresholdCfgPrefix) + defer session.Close() + _, err = col.UpsertId(bson.M{"id": th.ID}, th) + return +} + +// RemThresholdCfg removes a ThresholdCfg from dataDB/cache +func (ms *MongoStorage) RemThresholdCfg(ID string, transactionID string) (err error) { + session, col := ms.conn(utils.ThresholdCfgPrefix) + key := utils.ThresholdCfgPrefix + ID + err = col.Remove(bson.M{"id": ID}) + if err != nil { + return err + } + cache.RemKey(key, cacheCommit(transactionID), transactionID) + session.Close() + return +} diff --git a/engine/storage_mongo_stordb.go b/engine/storage_mongo_stordb.go index ee25ca632..963e414e4 100644 --- a/engine/storage_mongo_stordb.go +++ b/engine/storage_mongo_stordb.go @@ -1136,6 +1136,37 @@ func (ms *MongoStorage) SetTPStats(tpSTs []*utils.TPStats) (err error) { return } + +func (ms *MongoStorage) GetTPThresholdCfg(tpid, id string) ([]*utils.TPThresholdCfg, error) { + filter := bson.M{ + "tpid": tpid, + } + if id != "" { + filter["id"] = id + } + var results []*utils.TPThresholdCfg + session, col := ms.conn(utils.TBLTPThresholds) + defer session.Close() + err := col.Find(filter).All(&results) + if len(results) == 0 { + return results, utils.ErrNotFound + } + return results, err +} + +func (ms *MongoStorage) SetTPThresholdCfg(tpTHs []*utils.TPThresholdCfg) (err error) { + if len(tpTHs) == 0 { + return + } + session, col := ms.conn(utils.TBLTPThresholds) + defer 
session.Close() + tx := col.Bulk() + for _, tp := range tpTHs { + tx.Upsert(bson.M{"tpid": tp.TPid, "id": tp.ID}, tp) + } + _, err = tx.Run() + return +} + func (ms *MongoStorage) GetVersions(itm string) (vrs Versions, err error) { return } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 241873718..f116dee64 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1623,3 +1623,52 @@ func (rs *RedisStorage) RemSQStoredMetrics(sqmID string) (err error) { } return } + +// GetThresholdCfg retrieves a ThresholdCfg from dataDB/cache +func (rs *RedisStorage) GetThresholdCfg(ID string, skipCache bool, transactionID string) (th *ThresholdCfg, err error) { + key := utils.ThresholdCfgPrefix + ID + if !skipCache { + if x, ok := cache.Get(key); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*ThresholdCfg), nil + } + } + var values []byte + if values, err = rs.Cmd("GET", key).Bytes(); err != nil { + if err == redis.ErrRespNil { + cache.Set(key, nil, cacheCommit(transactionID), transactionID) + err = utils.ErrNotFound + } + return + } + if err = rs.ms.Unmarshal(values, &th); err != nil { + return + } + for _, fltr := range th.Filters { + if err = fltr.CompileValues(); err != nil { + return + } + } + cache.Set(key, th, cacheCommit(transactionID), transactionID) + return +} + +// SetThresholdCfg stores a ThresholdCfg into DataDB +func (rs *RedisStorage) SetThresholdCfg(th *ThresholdCfg) (err error) { + var result []byte + result, err = rs.ms.Marshal(th) + if err != nil { + return + } + return rs.Cmd("SET", utils.ThresholdCfgPrefix+th.ID, result).Err +} + +// RemThresholdCfg removes a ThresholdCfg from dataDB/cache +func (rs *RedisStorage) RemThresholdCfg(ID string, transactionID string) (err error) { + key := utils.ThresholdCfgPrefix + ID + err = rs.Cmd("DEL", key).Err + cache.RemKey(key, cacheCommit(transactionID), transactionID) + return +} diff --git a/engine/storage_sql.go b/engine/storage_sql.go index f6d8b5a18..d363ea7b2 
100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -602,6 +602,28 @@ func (self *SQLStorage) SetTPStats(sts []*utils.TPStats) error { return nil } +func (self *SQLStorage) SetTPThresholdCfg(ths []*utils.TPThresholdCfg) error { + if len(ths) == 0 { + return nil + } + tx := self.db.Begin() + for _, th := range ths { + // Remove previous + if err := tx.Where(&TpThresholdCfg{Tpid: th.TPid, Tag: th.ID}).Delete(TpThresholdCfg{}).Error; err != nil { + tx.Rollback() + return err + } + for _, mst := range APItoModelTPThresholdCfg(th) { + if err := tx.Save(&mst).Error; err != nil { + tx.Rollback() + return err + } + } + } + tx.Commit() + return nil +} + func (self *SQLStorage) SetSMCost(smc *SMCost) error { if smc.CostDetails == nil { return nil @@ -1548,6 +1570,22 @@ func (self *SQLStorage) GetTPStats(tpid, id string) ([]*utils.TPStats, error) { return asts, nil } +func (self *SQLStorage) GetTPThresholdCfg(tpid, id string) ([]*utils.TPThresholdCfg, error) { + var ths TpThresholdCfgS + q := self.db.Where("tpid = ?", tpid) + if len(id) != 0 { + q = q.Where("tag = ?", id) + } + if err := q.Find(&ths).Error; err != nil { + return nil, err + } + aths := ths.AsTPThresholdCfg() + if len(aths) == 0 { + return aths, utils.ErrNotFound + } + return aths, nil +} + // GetVersions returns slice of all versions or a specific version if tag is specified func (self *SQLStorage) GetVersions(itm string) (vrs Versions, err error) { q := self.db.Model(&TBLVersion{}) diff --git a/engine/tp_reader.go b/engine/tp_reader.go index fa961656b..01a1140b6 100644 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -54,6 +54,8 @@ type TpReader struct { aliases map[string]*Alias resLimits map[string]*utils.TPResourceLimit stats map[string]*utils.TPStats + thresholds map[string]*utils.TPThresholdCfg + revDests, revAliases, acntActionPlans map[string][]string @@ -126,6 +128,7 @@ func (tpr *TpReader) Init() { tpr.derivedChargers = make(map[string]*utils.DerivedChargers) tpr.resLimits = 
make(map[string]*utils.TPResourceLimit) tpr.stats = make(map[string]*utils.TPStats) + tpr.thresholds = make(map[string]*utils.TPThresholdCfg) tpr.revDests = make(map[string][]string) tpr.revAliases = make(map[string][]string) tpr.acntActionPlans = make(map[string][]string) @@ -1622,6 +1625,23 @@ func (tpr *TpReader) LoadStats() error { return tpr.LoadStatsFiltered("") } +func (tpr *TpReader) LoadThresholdsFiltered(tag string) error { + tps, err := tpr.lr.GetTPThresholdCfg(tpr.tpid, tag) + if err != nil { + return err + } + mapTHs := make(map[string]*utils.TPThresholdCfg) + for _, th := range tps { + mapTHs[th.ID] = th + } + tpr.thresholds = mapTHs + return nil +} + +func (tpr *TpReader) LoadThresholds() error { + return tpr.LoadThresholdsFiltered("") +} + func (tpr *TpReader) LoadAll() (err error) { if err = tpr.LoadDestinations(); err != nil && err.Error() != utils.NotFoundCaps { return @@ -1677,6 +1697,9 @@ func (tpr *TpReader) LoadAll() (err error) { if err = tpr.LoadStats(); err != nil && err.Error() != utils.NotFoundCaps { return } + if err = tpr.LoadThresholds(); err != nil && err.Error() != utils.NotFoundCaps { + return + } return nil } @@ -1928,7 +1951,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("Stats:") } for _, tpST := range tpr.stats { - st, err := APItoTPStats(tpST, tpr.timezone) + st, err := APItoStats(tpST, tpr.timezone) if err != nil { return err } @@ -1939,6 +1962,21 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("\t", st.ID) } } + if verbose { + log.Print("Thresholds:") + } + for _, tpTH := range tpr.thresholds { + th, err := APItoThresholdCfg(tpTH, tpr.timezone) + if err != nil { + return err + } + if err = tpr.dataStorage.SetThresholdCfg(th); err != nil { + return err + } + if verbose { + log.Print("\t", th.ID) + } + } if verbose { log.Print("Timings:") } @@ -2006,7 +2044,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) 
(err return err } for _, tpST := range tpr.stats { - if st, err := APItoTPStats(tpST, tpr.timezone); err != nil { + if st, err := APItoStats(tpST, tpr.timezone); err != nil { return err } else { stIdxr.IndexFilters(st.ID, st.Filters) @@ -2019,6 +2057,28 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err return err } } + if len(tpr.thresholds) > 0 { + if verbose { + log.Print("Indexing thresholds") + } + stIdxr, err := NewReqFilterIndexer(tpr.dataStorage, utils.ThresholdsIndex) + if err != nil { + return err + } + for _, tpTH := range tpr.thresholds { + if th, err := APItoThresholdCfg(tpTH, tpr.timezone); err != nil { + return err + } else { + stIdxr.IndexFilters(th.ID, th.Filters) + } + } + if verbose { + log.Printf("Indexed Stats keys: %+v", stIdxr.ChangedKeys().Slice()) + } + if err := stIdxr.StoreIndexes(); err != nil { + return err + } + } } return } @@ -2225,6 +2285,14 @@ func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) { i++ } return keys, nil + case utils.ThresholdsPrefix: + keys := make([]string, len(tpr.thresholds)) + i := 0 + for k := range tpr.thresholds { + keys[i] = k + i++ + } + return keys, nil } return nil, errors.New("Unsupported load category") } diff --git a/engine/tpimporter_csv.go b/engine/tpimporter_csv.go index d441102d6..9c9416dae 100755 --- a/engine/tpimporter_csv.go +++ b/engine/tpimporter_csv.go @@ -58,6 +58,7 @@ var fileHandlers = map[string]func(*TPCSVImporter, string) error{ utils.ALIASES_CSV: (*TPCSVImporter).importAliases, utils.ResourceLimitsCsv: (*TPCSVImporter).importResourceLimits, utils.StatsCsv: (*TPCSVImporter).importStats, + utils.ThresholdsCsv: (*TPCSVImporter).importThresholds, } func (self *TPCSVImporter) Run() error { @@ -80,6 +81,7 @@ func (self *TPCSVImporter) Run() error { path.Join(self.DirPath, utils.ALIASES_CSV), path.Join(self.DirPath, utils.ResourceLimitsCsv), path.Join(self.DirPath, utils.StatsCsv), + path.Join(self.DirPath, utils.ThresholdsCsv), ) files, _ := 
ioutil.ReadDir(self.DirPath) for _, f := range files { @@ -370,3 +372,14 @@ func (self *TPCSVImporter) importStats(fn string) error { } return self.StorDb.SetTPStats(sts) } + +func (self *TPCSVImporter) importThresholds(fn string) error { + if self.Verbose { + log.Printf("Processing file: <%s> ", fn) + } + sts, err := self.csvr.GetTPThresholdCfg(self.TPid, "") + if err != nil { + return err + } + return self.StorDb.SetTPThresholdCfg(sts) +} diff --git a/general_tests/acntacts_test.go b/general_tests/acntacts_test.go old mode 100644 new mode 100755 index 26386312f..1ddeddbae --- a/general_tests/acntacts_test.go +++ b/general_tests/acntacts_test.go @@ -54,9 +54,9 @@ ENABLE_ACNT,*enable_account,,,,,,,,,,,,,,false,false,10` aliases := `` resLimits := `` stats := `` - + thresholds := `` csvr := engine.NewTpReader(dbAcntActs, engine.NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, - sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases, resLimits, stats), "", "") + sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases, resLimits, stats, thresholds), "", "") if err := csvr.LoadAll(); err != nil { t.Fatal(err) } diff --git a/general_tests/auth_test.go b/general_tests/auth_test.go index 1991e96bb..ea25082f9 100755 --- a/general_tests/auth_test.go +++ b/general_tests/auth_test.go @@ -61,8 +61,9 @@ RP_ANY,DR_ANY_1CNT,*any,10` aliases := `` resLimits := `` stats := `` + thresholds := `` csvr := engine.NewTpReader(dbAuth, engine.NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, - sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases, resLimits, stats), "", "") + sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases, resLimits, stats, 
thresholds), "", "") if err := csvr.LoadAll(); err != nil { t.Fatal(err) } diff --git a/general_tests/costs1_test.go b/general_tests/costs1_test.go index 9eb062fb0..630dd3caa 100644 --- a/general_tests/costs1_test.go +++ b/general_tests/costs1_test.go @@ -51,7 +51,7 @@ RP_SMS1,DR_SMS_1,ALWAYS,10` *out,cgrates.org,data,*any,2012-01-01T00:00:00Z,RP_DATA1,, *out,cgrates.org,sms,*any,2012-01-01T00:00:00Z,RP_SMS1,,` csvr := engine.NewTpReader(dataDB, engine.NewStringCSVStorage(',', dests, timings, rates, destinationRates, ratingPlans, ratingProfiles, - "", "", "", "", "", "", "", "", "", "", "", ""), "", "") + "", "", "", "", "", "", "", "", "", "", "", "", ""), "", "") if err := csvr.LoadTimings(); err != nil { t.Fatal(err) diff --git a/general_tests/datachrg1_test.go b/general_tests/datachrg1_test.go index f9250fc70..32ed70afd 100644 --- a/general_tests/datachrg1_test.go +++ b/general_tests/datachrg1_test.go @@ -42,7 +42,7 @@ DR_DATA_2,*any,RT_DATA_1c,*up,4,0,` RP_DATA1,DR_DATA_2,TM2,10` ratingProfiles := `*out,cgrates.org,data,*any,2012-01-01T00:00:00Z,RP_DATA1,,` csvr := engine.NewTpReader(dataDB, engine.NewStringCSVStorage(',', "", timings, rates, destinationRates, ratingPlans, ratingProfiles, - "", "", "", "", "", "", "", "", "", "", "", ""), "", "") + "", "", "", "", "", "", "", "", "", "", "", "", ""), "", "") if err := csvr.LoadTimings(); err != nil { t.Fatal(err) } diff --git a/general_tests/ddazmbl1_test.go b/general_tests/ddazmbl1_test.go index 9fb272dc1..486770a64 100644 --- a/general_tests/ddazmbl1_test.go +++ b/general_tests/ddazmbl1_test.go @@ -61,8 +61,9 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10` aliases := `` resLimits := `` stats := `` + thresholds := `` csvr := engine.NewTpReader(dataDB, engine.NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, - sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases, resLimits, stats), "", "") + sharedGroups, lcrs, 
actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases, resLimits, stats, thresholds), "", "") if err := csvr.LoadDestinations(); err != nil { t.Fatal(err) } diff --git a/general_tests/ddazmbl2_test.go b/general_tests/ddazmbl2_test.go index 928a69579..7f7b49965 100644 --- a/general_tests/ddazmbl2_test.go +++ b/general_tests/ddazmbl2_test.go @@ -61,8 +61,9 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10` aliases := `` resLimits := `` stats := `` + thresholds := `` csvr := engine.NewTpReader(dataDB2, engine.NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, - sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases, resLimits, stats), "", "") + sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases, resLimits, stats, thresholds), "", "") if err := csvr.LoadDestinations(); err != nil { t.Fatal(err) } diff --git a/general_tests/ddazmbl3_test.go b/general_tests/ddazmbl3_test.go index 5e6a60a10..fa7f5953e 100644 --- a/general_tests/ddazmbl3_test.go +++ b/general_tests/ddazmbl3_test.go @@ -59,8 +59,9 @@ RP_UK,DR_UK_Mobile_BIG5,ALWAYS,10` aliases := `` resLimits := `` stats := `` + thresholds := `` csvr := engine.NewTpReader(dataDB3, engine.NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, - sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases, resLimits, stats), "", "") + sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases, resLimits, stats, thresholds), "", "") if err := csvr.LoadDestinations(); err != nil { t.Fatal(err) } diff --git a/general_tests/smschrg1_test.go b/general_tests/smschrg1_test.go index 1def35838..cef967b2b 100755 --- a/general_tests/smschrg1_test.go +++ b/general_tests/smschrg1_test.go 
@@ -40,7 +40,7 @@ func TestSMSLoadCsvTpSmsChrg1(t *testing.T) { ratingPlans := `RP_SMS1,DR_SMS_1,ALWAYS,10` ratingProfiles := `*out,cgrates.org,sms,*any,2012-01-01T00:00:00Z,RP_SMS1,,` csvr := engine.NewTpReader(dataDB, engine.NewStringCSVStorage(',', "", timings, rates, destinationRates, ratingPlans, ratingProfiles, - "", "", "", "", "", "", "", "", "", "", "", ""), "", "") + "", "", "", "", "", "", "", "", "", "", "", "", ""), "", "") if err := csvr.LoadTimings(); err != nil { t.Fatal(err) } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 46f5d3090..8e18920a8 100755 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -668,6 +668,8 @@ type ArgsCache struct { AliasIDs *[]string ReverseAliasIDs *[]string ResourceLimitIDs *[]string + StatsIDs *[]string + ThresholdsIDs *[]string } // Data used to do remote cache reloads via api @@ -1341,3 +1343,19 @@ type TPStats struct { Weight float64 Thresholds []string } + +type TPThresholdCfg struct { + TPid string + ID string + Filters []*TPRequestFilter // Filters for the request + ActivationInterval *TPActivationInterval // Time when this limit becomes active and expires + ThresholdType string + ThresholdValue float64 // threshold value + MinItems int // number of items aggregated for the threshold to match + Recurrent bool + MinSleep string + Blocker bool // blocker flag to stop processing on filters matched + Stored bool + Weight float64 // Weight to sort the thresholds + ActionIDs []string +} diff --git a/utils/consts.go b/utils/consts.go index d4fbb42c8..b6bd379e4 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -101,6 +101,7 @@ const ( TBLTPAliases = "tp_aliases" TBLTPResourceLimits = "tp_resource_limits" TBLTPStats = "tp_stats" + TBLTPThresholds = "tp_thresholds" TBLSMCosts = "sm_costs" TBLCDRs = "cdrs" TBLVersions = "versions" @@ -122,6 +123,7 @@ const ( ALIASES_CSV = "Aliases.csv" ResourceLimitsCsv = "ResourceLimits.csv" StatsCsv = "Stats.csv" + ThresholdsCsv = "Thresholds.csv" ROUNDING_UP = "*up" 
ROUNDING_MIDDLE = "*middle" ROUNDING_DOWN = "*down" @@ -236,6 +238,8 @@ const ( ResourceLimitsIndex = "rli_" StatsPrefix = "sts_" StatsIndex = "sti_" + ThresholdsPrefix = "ths_" + ThresholdsIndex = "thi_" TimingsPrefix = "tmg_" CDR_STATS_PREFIX = "cst_" TEMP_DESTINATION_PREFIX = "tmp_" @@ -248,6 +252,7 @@ const ( LOG_MEDIATED_CDR = "mcd_" SQStoredMetricsPrefix = "ssm_" StatsQueuePrefix = "stq_" + ThresholdCfgPrefix = "thc_" LOADINST_KEY = "load_history" SESSION_MANAGER_SOURCE = "SMR" MEDIATOR_SOURCE = "MED"