diff --git a/cache/cache.go b/cache/cache.go old mode 100644 new mode 100755 diff --git a/cache/cache_store.go b/cache/cache_store.go old mode 100644 new mode 100755 diff --git a/cache/cache_test.go b/cache/cache_test.go old mode 100644 new mode 100755 diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go old mode 100644 new mode 100755 diff --git a/config/cacheconfig.go b/config/cacheconfig.go old mode 100644 new mode 100755 diff --git a/config/config.go b/config/config.go old mode 100644 new mode 100755 diff --git a/config/config_defaults.go b/config/config_defaults.go old mode 100644 new mode 100755 diff --git a/config/config_json_test.go b/config/config_json_test.go old mode 100644 new mode 100755 diff --git a/config/config_test.go b/config/config_test.go old mode 100644 new mode 100755 diff --git a/config/libconfig_json.go b/config/libconfig_json.go old mode 100644 new mode 100755 diff --git a/data/storage/mysql/create_tariffplan_tables.sql b/data/storage/mysql/create_tariffplan_tables.sql index 6179a8615..2d2dda176 100644 --- a/data/storage/mysql/create_tariffplan_tables.sql +++ b/data/storage/mysql/create_tariffplan_tables.sql @@ -391,6 +391,10 @@ CREATE TABLE tp_aliases ( UNIQUE KEY `unique_tp_aliases` (`tpid`,`direction`,`tenant`,`category`,`account`,`subject`,`context`, `target`) ); +-- +-- Table structure for table `tp_resource_limits` +-- + DROP TABLE IF EXISTS tp_resource_limits; CREATE TABLE tp_resource_limits ( `id` int(11) NOT NULL AUTO_INCREMENT, @@ -411,6 +415,35 @@ CREATE TABLE tp_resource_limits ( UNIQUE KEY `unique_tp_resource_limits` (`tpid`, `tag`, `filter_type`, `filter_field_name`) ); +-- +-- Table structure for table `tp_stats` +-- + +DROP TABLE IF EXISTS tp_stats; +CREATE TABLE tp_stats ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `tpid` varchar(64) NOT NULL, + `tag` varchar(64) NOT NULL, + `filter_type` varchar(16) NOT NULL, + `filter_field_name` varchar(64) NOT NULL, + `filter_field_values` varchar(256) NOT NULL, + 
`activation_interval` varchar(64) NOT NULL, + `queue_length` int(11) NOT NULL, + `ttl` varchar(32) NOT NULL, + `metrics` varchar(64) NOT NULL, + `store` BOOLEAN NOT NULL, + `thresholds` varchar(64) NOT NULL, + `weight` decimal(8,2) NOT NULL, + `created_at` TIMESTAMP, + PRIMARY KEY (`id`), + KEY `tpid` (`tpid`), + UNIQUE KEY `unique_tp_stats` (`tpid`, `tag`, `filter_type`, `filter_field_name`) +); + +-- +-- Table structure for table `versions` +-- + DROP TABLE IF EXISTS versions; CREATE TABLE versions ( `id` int(11) NOT NULL AUTO_INCREMENT, diff --git a/data/storage/postgres/create_tariffplan_tables.sql b/data/storage/postgres/create_tariffplan_tables.sql index dab478d76..9183881f1 100644 --- a/data/storage/postgres/create_tariffplan_tables.sql +++ b/data/storage/postgres/create_tariffplan_tables.sql @@ -386,6 +386,11 @@ CREATE TABLE tp_aliases ( CREATE INDEX tpaliases_tpid_idx ON tp_aliases (tpid); CREATE INDEX tpaliases_idx ON tp_aliases ("tpid","direction","tenant","category","account","subject","context","target"); + +-- +-- Table structure for table `tp_resource_limits` +-- + DROP TABLE IF EXISTS tp_resource_limits; CREATE TABLE tp_resource_limits ( "id" SERIAL PRIMARY KEY, @@ -398,13 +403,43 @@ CREATE TABLE tp_resource_limits ( "usage_ttl" varchar(32) NOT NULL, "limit" varchar(64) NOT NULL, "allocation_message" varchar(64) NOT NULL, - "weight" decimal(8,2) NOT NULL, + "weight" NUMERIC(8,2) NOT NULL, "action_trigger_ids" varchar(64) NOT NULL, "created_at" TIMESTAMP WITH TIME ZONE ); CREATE INDEX tp_resource_limits_idx ON tp_resource_limits (tpid); CREATE INDEX tp_resource_limits_unique ON tp_resource_limits ("tpid", "tag", "filter_type", "filter_field_name"); + +-- +-- Table structure for table `tp_stats` +-- + +DROP TABLE IF EXISTS tp_stats; +CREATE TABLE tp_stats ( + "id" SERIAL PRIMARY KEY, + "tpid" varchar(64) NOT NULL, + "tag" varchar(64) NOT NULL, + "filter_type" varchar(16) NOT NULL, + "filter_field_name" varchar(64) NOT NULL, + "filter_field_values" 
varchar(256) NOT NULL, + "activation_interval" varchar(64) NOT NULL, + "queue_length" INTEGER NOT NULL, + "ttl" varchar(32) NOT NULL, + "metrics" varchar(64) NOT NULL, + "store" BOOLEAN NOT NULL, + "thresholds" varchar(64) NOT NULL, + "weight" decimal(8,2) NOT NULL, + "created_at" TIMESTAMP WITH TIME ZONE +); +CREATE INDEX tp_stats_idx ON tp_stats (tpid); +CREATE INDEX tp_stats_unique ON tp_stats ("tpid", "tag", "filter_type", "filter_field_name"); + + +-- +-- Table structure for table `versions` +-- + DROP TABLE IF EXISTS versions; CREATE TABLE versions ( "id" SERIAL PRIMARY KEY, diff --git a/data/tariffplans/tutorial/ResourceLimits.csv b/data/tariffplans/tutorial/ResourceLimits.csv deleted file mode 100644 index c95d6b485..000000000 --- a/data/tariffplans/tutorial/ResourceLimits.csv +++ /dev/null @@ -1,6 +0,0 @@ -#Id,FilterType,FilterFieldName,FilterFieldValues,ActivationInterval,TTL,Limit,AllocationReply,Weight,ActionTriggers -ResGroup1,*string,Account,1001;1002,2014-07-29T15:00:00Z,1s,7,,20, -ResGroup1,*string_prefix,Destination,10;20,,,,,, -ResGroup1,*rsr_fields,,Subject(~^1.*1$);Destination(1002),,,,,, -ResGroup2,*destinations,Destination,DST_FS,2014-07-29T15:00:00Z,3600s,8,SPECIAL_1002,10, -ResGroup3,*cdr_stats,,CDRST1:*min_ASR:34;CDRST_1001:*min_ASR:20,,,,,, diff --git a/data/tariffplans/tutorial/Stats.csv b/data/tariffplans/tutorial/Stats.csv deleted file mode 100644 index 8fdb9aa4d..000000000 --- a/data/tariffplans/tutorial/Stats.csv +++ /dev/null @@ -1,2 +0,0 @@ -#Id,FilterType,FilterFieldName,FilterFieldValues,ActivationInterval,QueueLength,TTL,Metrics,Store,Thresholds,Weight -Stats1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acd;*acc,true,THRESH1;THRESH2,20 diff --git a/engine/calldesc.go b/engine/calldesc.go old mode 100644 new mode 100755 diff --git a/engine/loader_it_test.go b/engine/loader_it_test.go index 1188cda6c..a4370cc75 100644 --- a/engine/loader_it_test.go +++ b/engine/loader_it_test.go @@ -153,6 +153,9 @@ func 
TestLoaderITLoadFromCSV(t *testing.T) { if err = loader.LoadResourceLimits(); err != nil { t.Error("Failed loading resource limits: ", err.Error()) } + if err = loader.LoadStats(); err != nil { + t.Error("Failed loading stats: ", err.Error()) + } if err := loader.WriteToDatabase(false, false, false); err != nil { t.Error("Could not write data into dataDb: ", err.Error()) } @@ -312,6 +315,16 @@ func TestLoaderITWriteToDatabase(t *testing.T) { } } + for k, st := range loader.stats { + rcv, err := loader.dataStorage.GetStatsQueue(k, true, utils.NonTransactional) + if err != nil { + t.Error("Failed GetStatsQueue: ", err.Error()) + } + if !reflect.DeepEqual(st, rcv) { + t.Errorf("Expecting: %v, received: %v", st, rcv) + } + } + } // Imports data from csv files in tpScenario to storDb diff --git a/engine/model_helpers.go b/engine/model_helpers.go index e58e5bc15..90b20ae15 100755 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -1910,8 +1910,12 @@ func APItoModelResourceLimit(rl *utils.TPResourceLimit) (mdls TpResourceLimits) } func APItoResourceLimit(tpRL *utils.TPResourceLimit, timezone string) (rl *ResourceLimit, err error) { - rl = &ResourceLimit{ID: tpRL.ID, Weight: tpRL.Weight, - Filters: make([]*RequestFilter, len(tpRL.Filters)), Usage: make(map[string]*ResourceUsage)} + rl = &ResourceLimit{ + ID: tpRL.ID, + Weight: tpRL.Weight, + Filters: make([]*RequestFilter, len(tpRL.Filters)), + Usage: make(map[string]*ResourceUsage), + } if tpRL.UsageTTL != "" { if rl.UsageTTL, err = utils.ParseDurationWithSecs(tpRL.UsageTTL); err != nil { return nil, err @@ -2048,6 +2052,7 @@ func APItoTPStats(tpST *utils.TPStats, timezone string) (st *StatsQueue, err err ID: tpST.ID, QueueLength: tpST.QueueLength, Store: tpST.Store, + Weight: tpST.Weight, Filters: make([]*RequestFilter, len(tpST.Filters)), } if tpST.TTL != "" { diff --git a/engine/model_helpers_test.go b/engine/model_helpers_test.go index 86bde95ef..53b6ae304 100644 --- a/engine/model_helpers_test.go +++ 
b/engine/model_helpers_test.go @@ -850,6 +850,7 @@ func TestAPItoTPStats(t *testing.T) { Store: tps.Store, Thresholds: []string{"THRESH1", "THRESH2"}, Filters: make([]*RequestFilter, len(tps.Filters)), + Weight: 20.0, } if eTPs.TTL, err = utils.ParseDurationWithSecs(tps.TTL); err != nil { t.Errorf("Got error: %+v", err) diff --git a/engine/statsqueue.go b/engine/statsqueue.go old mode 100644 new mode 100755 diff --git a/engine/storage_csv.go b/engine/storage_csv.go index fe680b2f3..9c2e5fbc4 100755 --- a/engine/storage_csv.go +++ b/engine/storage_csv.go @@ -623,9 +623,9 @@ func (csvs *CSVStorage) GetTPResourceLimits(tpid, id string) ([]*utils.TPResourc func (csvs *CSVStorage) GetTPStats(tpid, id string) ([]*utils.TPStats, error) { csvReader, fp, err := csvs.readerFunc(csvs.statsFn, csvs.sep, getColumnCount(TpStats{})) if err != nil { - //log.Print("Could not load resource limits file: ", err) + //log.Print("Could not load stats file: ", err) // allow writing of the other values - return nil, err + return nil, nil } if fp != nil { defer fp.Close() diff --git a/engine/storage_map.go b/engine/storage_map.go old mode 100644 new mode 100755 index be8ad0daf..d1368ffa2 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -1496,6 +1496,11 @@ func (ms *MapStorage) GetStatsQueue(sqID string, skipCache bool, transactionID s if err != nil { return nil, err } + for _, fltr := range sq.Filters { + if err := fltr.CompileValues(); err != nil { + return nil, err + } + } cache.Set(key, sq, cacheCommit(transactionID), transactionID) return } @@ -1540,6 +1545,9 @@ func (ms *MapStorage) SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error) { defer ms.mu.Unlock() var result []byte result, err = ms.ms.Marshal(sqSM) + if err != nil { + return err + } ms.dict[utils.SQStoredMetricsPrefix+sqSM.SqID] = result return } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go old mode 100644 new mode 100755 index ba7ebbb4f..769bbe0c8 --- 
a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -1986,7 +1986,6 @@ func (ms *MongoStorage) MatchReqFilterIndex(dbKey, fldName, fldVal string) (item // GetStatsQueue retrieves a StatsQueue from dataDB/cache func (ms *MongoStorage) GetStatsQueue(sqID string, skipCache bool, transactionID string) (sq *StatsQueue, err error) { - var rez *StatsQueue cacheKey := utils.StatsQueuePrefix + sqID if !skipCache { if x, ok := cache.Get(cacheKey); ok { @@ -1998,15 +1997,20 @@ func (ms *MongoStorage) GetStatsQueue(sqID string, skipCache bool, transactionID } session, col := ms.conn(utils.StatsQueuePrefix) defer session.Close() + sq = new(StatsQueue) cCommit := cacheCommit(transactionID) - if err = col.Find(bson.M{"id": sqID}).One(&rez); err != nil { + if err = col.Find(bson.M{"id": sqID}).One(&sq); err != nil { if err == mgo.ErrNotFound { cache.Set(cacheKey, nil, cCommit, transactionID) err = utils.ErrNotFound } return nil, err } - sq = rez + for _, fltr := range sq.Filters { + if err = fltr.CompileValues(); err != nil { + return + } + } cache.Set(cacheKey, sq, cCommit, transactionID) return } diff --git a/engine/storage_redis.go b/engine/storage_redis.go old mode 100644 new mode 100755 index 29a09cb75..241873718 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1562,6 +1562,11 @@ func (rs *RedisStorage) GetStatsQueue(sqID string, skipCache bool, transactionID if err = rs.ms.Unmarshal(values, &sq); err != nil { return } + for _, fltr := range sq.Filters { + if err = fltr.CompileValues(); err != nil { + return + } + } cache.Set(key, sq, cacheCommit(transactionID), transactionID) return } diff --git a/engine/storage_utils.go b/engine/storage_utils.go old mode 100644 new mode 100755 diff --git a/engine/stordb_it_test.go b/engine/stordb_it_test.go index 696f4e738..e0f206a73 100644 --- a/engine/stordb_it_test.go +++ b/engine/stordb_it_test.go @@ -1561,7 +1561,7 @@ func testStorDBitCRUDTpStats(t *testing.T) { t.Error(err) } // READ - if rcv, err 
:= storDB.GetTPStats("testTPid", ""); err != nil { t.Error(err) } else { if !(reflect.DeepEqual(eTPs[0].TPid, rcv[0].TPid) || reflect.DeepEqual(eTPs[0].TPid, rcv[1].TPid)) { @@ -1588,7 +1588,7 @@ func testStorDBitCRUDTpStats(t *testing.T) { t.Error(err) } // READ - if rcv, err := storDB.GetTPStats("testTPid", ""); err != nil { + if rcv, err := storDB.GetTPStats("TEST_TPID", ""); err != nil { t.Error(err) } else { if !(reflect.DeepEqual(eTPs[0].TPid, rcv[0].TPid) || reflect.DeepEqual(eTPs[0].TPid, rcv[1].TPid)) { diff --git a/engine/tp_reader.go b/engine/tp_reader.go index e661c6985..dfef9a458 100644 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -1924,6 +1924,21 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("\t", rl.ID) } } + if verbose { + log.Print("Stats:") + } + for _, tpST := range tpr.stats { + st, err := APItoTPStats(tpST, tpr.timezone) + if err != nil { + return err + } + if err = tpr.dataStorage.SetStatsQueue(st); err != nil { + return err + } + if verbose { + log.Print("\t", st.ID) + } + } if verbose { log.Print("Timings:") } diff --git a/utils/consts.go b/utils/consts.go old mode 100644 new mode 100755