From 57ef37edcaddc7ef898ff2a8a312e3ad790b7782 Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 29 Jul 2015 12:07:46 +0200 Subject: [PATCH 01/15] ExtraFields in CallDescriptor --- ...5_rc6.sql => alter_cdr_tables_rc5_rc6.sql} | 0 data/storage/mysql/alter_tables_rc4_rc5.sql | 48 ------------------- engine/calldesc.go | 3 +- 3 files changed, 2 insertions(+), 49 deletions(-) rename data/storage/mysql/{alter_tables_rc5_rc6.sql => alter_cdr_tables_rc5_rc6.sql} (100%) delete mode 100644 data/storage/mysql/alter_tables_rc4_rc5.sql diff --git a/data/storage/mysql/alter_tables_rc5_rc6.sql b/data/storage/mysql/alter_cdr_tables_rc5_rc6.sql similarity index 100% rename from data/storage/mysql/alter_tables_rc5_rc6.sql rename to data/storage/mysql/alter_cdr_tables_rc5_rc6.sql diff --git a/data/storage/mysql/alter_tables_rc4_rc5.sql b/data/storage/mysql/alter_tables_rc4_rc5.sql deleted file mode 100644 index 1a54a26f1..000000000 --- a/data/storage/mysql/alter_tables_rc4_rc5.sql +++ /dev/null @@ -1,48 +0,0 @@ -ALTER TABLE cdrs_primary - CHANGE COLUMN tor category varchar(16) NOT NULL, - CHANGE COLUMN duration `usage` bigint(20) NOT NULL, - ADD COLUMN tor varchar(16) NOT NULL AFTER cgrid; - -UPDATE cdrs_primary SET tor="*voice"; - -ALTER TABLE cost_details - DROP COLUMN accid, - MODIFY COLUMN cost_time datetime NOT NULL AFTER tbid, - CHANGE COLUMN `source` cost_source varchar(64) NOT NULL AFTER cost_time, - MODIFY COLUMN runid varchar(64) NOT NULL AFTER cgrid, - CHANGE COLUMN tor category varchar(32) NOT NULL AFTER tenant, - ADD COLUMN tor varchar(16) NOT NULL after runid, - MODIFY COLUMN direction varchar(8) NOT NULL AFTER tor; - -UPDATE cost_details SET tor="*voice"; - -ALTER TABLE rated_cdrs - MODIFY COLUMN mediation_time datetime NOT NULL AFTER tbid, - MODIFY COLUMN subject varchar(128) NOT NULL, - ADD COLUMN reqtype varchar(24) NOT NULL AFTER runid, - ADD COLUMN direction varchar(8) NOT NULL AFTER reqtype, - ADD COLUMN tenant varchar(64) NOT NULL AFTER direction, - ADD COLUMN category varchar(16) NOT NULL AFTER tenant, - ADD COLUMN account varchar(128) NOT NULL AFTER category, - ADD COLUMN destination varchar(128) NOT NULL AFTER subject, - ADD COLUMN setup_time datetime NOT NULL AFTER destination, - ADD COLUMN answer_time datetime NOT NULL AFTER setup_time, - ADD COLUMN `usage` bigint(20) NOT NULL AFTER answer_time; - -ALTER TABLE tp_rates - DROP COLUMN rounding_method, - DROP COLUMN rounding_decimals; - -ALTER TABLE tp_destination_rates - ADD COLUMN rounding_method varchar(255) NOT NULL, - ADD COLUMN rounding_decimals tinyint(4) NOT NULL; - -ALTER TABLE tp_rating_profiles - DROP KEY tpid_loadid_tenant_tor_dir_subj_atime, - CHANGE COLUMN tor category varchar(16) NOT NULL, - ADD UNIQUE KEY `tpid_loadid_tenant_category_dir_subj_atime` (`tpid`,`loadid`,`tenant`,`category`,`direction`,`subject`,`activation_time`); - - - - - diff --git a/engine/calldesc.go b/engine/calldesc.go index 5d527f46b..958177991 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -132,7 +132,8 @@ type CallDescriptor struct { FallbackSubject string // the subject to check for destination if not found on primary subject RatingInfos RatingInfos Increments Increments - TOR string // used unit balances selector + TOR string // used unit balances selector + ExtraFields map[string]string // Extra fields, mostly used for user profile matching // session limits MaxRate float64 MaxRateUnit time.Duration From dbc6160e8a508965c882c612ddafb3e60145e62f Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 29 Jul 2015 12:25:48 +0200 Subject: [PATCH 02/15] Only load user profile if at least one field requesting it --- engine/users.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/engine/users.go b/engine/users.go index 8e937fc83..c0fe01f1c 100644 --- a/engine/users.go +++ b/engine/users.go @@ -355,6 +355,7 @@ func (ps *ProxyUserService) GetIndexes(in string, reply *map[string][]string) er return ps.Client.Call("UsersV1.AddIndex", in, reply) } +// extraFields - Field name in the interface containing extraFields information func LoadUserProfile(in interface{}, extraFields string) (interface{}, error) { if userService == nil { // no user service => no fun return in, nil @@ -363,7 +364,16 @@ func LoadUserProfile(in interface{}, extraFields string) (interface{}, error) { if err != nil { return nil, err } - + var needsUsers bool + for _, val := range m { + if val == utils.USERS { + needsUsers = true + break + } + } + if !needsUsers { // Do not process further if user profile is not needed + return in, nil + } up := &UserProfile{ Profile: make(map[string]string), } From 0261252b69a726ae54b47bbaa115009200b8a3db Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 29 Jul 2015 12:32:17 +0200 Subject: [PATCH 03/15] Add LoadUserProfile in the CDRs core --- apier/v1/cdrsv1.go | 8 - .../mysql/alter_cdr_tables_rc5_rc6.sql | 355 ------------------ data/tariffplans/tutorial/Users.csv | 8 +- engine/cdrs.go | 5 + 4 files changed, 10 insertions(+), 366 deletions(-) diff --git a/apier/v1/cdrsv1.go b/apier/v1/cdrsv1.go index a0b82923e..afa46d850 100644 --- a/apier/v1/cdrsv1.go +++ b/apier/v1/cdrsv1.go @@ -41,14 +41,6 @@ func (self *CdrsV1) ProcessCdr(cdr *engine.StoredCdr, reply *string) error { // Designed for external programs feeding CDRs to CGRateS func (self *CdrsV1) ProcessExternalCdr(cdr *engine.ExternalCdr, reply *string) error { - out, err := engine.LoadUserProfile(cdr, "ExtraFields") - if err != nil { - *reply = err.Error() - return err - } - if upcdr, ok := out.(engine.ExternalCdr); ok { - *cdr = upcdr - } if err := self.CdrSrv.ProcessExternalCdr(cdr); err != nil { return utils.NewErrServerError(err) } diff --git a/data/storage/mysql/alter_cdr_tables_rc5_rc6.sql b/data/storage/mysql/alter_cdr_tables_rc5_rc6.sql index 1b315f21c..8b5694795 100644 --- a/data/storage/mysql/alter_cdr_tables_rc5_rc6.sql +++ b/data/storage/mysql/alter_cdr_tables_rc5_rc6.sql @@ -48,358 +48,3 @@ ALTER TABLE `rated_cdrs` ADD KEY `deleted_at_idx`(`deleted_at`) , DROP KEY `PRIMARY`, ADD PRIMARY KEY(`id`) ; - --- --- Table structure for table `tp_timings` --- -DROP TABLE IF EXISTS `tp_timings`; -CREATE TABLE `tp_timings` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `tag` varchar(64) NOT NULL, - `years` varchar(255) NOT NULL, - `months` varchar(255) NOT NULL, - `month_days` varchar(255) NOT NULL, - `week_days` varchar(255) NOT NULL, - `time` varchar(32) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`), - KEY `tpid_tmid` (`tpid`,`tag`), - UNIQUE KEY `tpid_tag` (`tpid`,`tag`) -); - --- --- Table structure for table `tp_destinations` --- - -DROP TABLE IF EXISTS `tp_destinations`; -CREATE TABLE `tp_destinations` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `tag` varchar(64) NOT NULL, - `prefix` varchar(24) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`), - KEY `tpid_dstid` (`tpid`,`tag`), - UNIQUE KEY `tpid_dest_prefix` (`tpid`,`tag`,`prefix`) -); - --- --- Table structure for table `tp_rates` --- - -DROP TABLE IF EXISTS `tp_rates`; -CREATE TABLE `tp_rates` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `tag` varchar(64) NOT NULL, - `connect_fee` decimal(7,4) NOT NULL, - `rate` decimal(7,4) NOT NULL, - `rate_unit` varchar(16) NOT NULL, - `rate_increment` varchar(16) NOT NULL, - `group_interval_start` varchar(16) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - UNIQUE KEY `unique_tprate` (`tpid`,`tag`,`group_interval_start`), - KEY `tpid` (`tpid`), - KEY `tpid_rtid` (`tpid`,`tag`) -); - --- --- Table structure for table `destination_rates` --- - -DROP TABLE IF EXISTS `tp_destination_rates`; -CREATE TABLE `tp_destination_rates` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `tag` varchar(64) NOT NULL, - `destinations_tag` varchar(64) NOT NULL, - `rates_tag` varchar(64) NOT NULL, - `rounding_method` varchar(255) NOT NULL, - `rounding_decimals` tinyint(4) NOT NULL, - `max_cost` decimal(7,4) NOT NULL, - `max_cost_strategy` varchar(16) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`), - KEY `tpid_drid` (`tpid`,`tag`), - UNIQUE KEY `tpid_drid_dstid` (`tpid`,`tag`,`destinations_tag`) -); - --- --- Table structure for table `tp_rating_plans` --- - -DROP TABLE IF EXISTS `tp_rating_plans`; -CREATE TABLE `tp_rating_plans` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `tag` varchar(64) NOT NULL, - `destrates_tag` varchar(64) NOT NULL, - `timing_tag` varchar(64) NOT NULL, - `weight` DECIMAL(8,2) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`), - KEY `tpid_rpl` (`tpid`,`tag`), - UNIQUE KEY `tpid_rplid_destrates_timings_weight` (`tpid`,`tag`,`destrates_tag`,`timing_tag`) -); - --- --- Table structure for table `tp_rate_profiles` --- - -DROP TABLE IF EXISTS `tp_rating_profiles`; -CREATE TABLE `tp_rating_profiles` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `loadid` varchar(64) NOT NULL, - `direction` varchar(8) NOT NULL, - `tenant` varchar(64) NOT NULL, - `category` varchar(32) NOT NULL, - `subject` varchar(64) NOT NULL, - `activation_time` varchar(24) NOT NULL, - `rating_plan_tag` varchar(64) NOT NULL, - `fallback_subjects` varchar(64), - `cdr_stat_queue_ids` varchar(64), - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`), - KEY `tpid_loadid` (`tpid`, `loadid`), - UNIQUE KEY `tpid_loadid_tenant_category_dir_subj_atime` (`tpid`,`loadid`, `tenant`,`category`,`direction`,`subject`,`activation_time`) -); - --- --- Table structure for table `tp_shared_groups` --- - -DROP TABLE IF EXISTS `tp_shared_groups`; -CREATE TABLE `tp_shared_groups` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `tag` varchar(64) NOT NULL, - `account` varchar(24) NOT NULL, - `strategy` varchar(24) NOT NULL, - `rating_subject` varchar(24) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`), - UNIQUE KEY `unique_shared_group` (`tpid`,`tag`,`account`,`strategy`,`rating_subject`) -); - --- --- Table structure for table `tp_actions` --- - -DROP TABLE IF EXISTS `tp_actions`; -CREATE TABLE `tp_actions` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `tag` varchar(64) NOT NULL, - `action` varchar(24) NOT NULL, - `balance_tag` varchar(64) NOT NULL, - `balance_type` varchar(24) NOT NULL, - `direction` varchar(8) NOT NULL, - `units` DECIMAL(20,4) NOT NULL, - `expiry_time` varchar(24) NOT NULL, - `timing_tags` varchar(128) NOT NULL, - `destination_tags` varchar(64) NOT NULL, - `rating_subject` varchar(64) NOT NULL, - `category` varchar(32) NOT NULL, - `shared_group` varchar(64) NOT NULL, - `balance_weight` DECIMAL(8,2) NOT NULL, - `extra_parameters` varchar(256) NOT NULL, - `weight` DECIMAL(8,2) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`), - UNIQUE KEY `unique_action` (`tpid`,`tag`,`action`,`balance_tag`,`balance_type`,`direction`,`expiry_time`,`timing_tags`,`destination_tags`,`shared_group`,`balance_weight`,`weight`) -); - --- --- Table structure for table `tp_action_timings` --- - -DROP TABLE IF EXISTS `tp_action_plans`; -CREATE TABLE `tp_action_plans` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `tag` varchar(64) NOT NULL, - `actions_tag` varchar(64) NOT NULL, - `timing_tag` varchar(64) NOT NULL, - `weight` DECIMAL(8,2) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`), - UNIQUE KEY `unique_action_schedule` (`tpid`,`tag`,`actions_tag`) -); - --- --- Table structure for table `tp_action_triggers` --- - -DROP TABLE IF EXISTS `tp_action_triggers`; -CREATE TABLE `tp_action_triggers` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `tag` varchar(64) NOT NULL, - `unique_id` varchar(64) NOT NULL, - `balance_tag` varchar(64) NOT NULL, - `balance_type` varchar(24) NOT NULL, - `balance_direction` varchar(8) NOT NULL, - `threshold_type` char(12) NOT NULL, - `threshold_value` DECIMAL(20,4) NOT NULL, - `recurrent` BOOLEAN NOT NULL, - `min_sleep` varchar(16) NOT NULL, - `balance_destination_tags` varchar(64) NOT NULL, - `balance_weight` DECIMAL(8,2) NOT NULL, - `balance_expiry_time` varchar(24) NOT NULL, - `balance_timing_tags` varchar(128) NOT NULL, - `balance_rating_subject` varchar(64) NOT NULL, - `balance_category` varchar(32) NOT NULL, - `balance_shared_group` varchar(64) NOT NULL, - `min_queued_items` int(11) NOT NULL, - `actions_tag` varchar(64) NOT NULL, - `weight` DECIMAL(8,2) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`), - UNIQUE KEY `unique_trigger_definition` (`tpid`,`tag`,`balance_tag`,`balance_type`,`balance_direction`,`threshold_type`,`threshold_value`,`balance_destination_tags`,`actions_tag`) -); - --- --- Table structure for table `tp_account_actions` --- - -DROP TABLE IF EXISTS `tp_account_actions`; -CREATE TABLE `tp_account_actions` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `loadid` varchar(64) NOT NULL, - `tenant` varchar(64) NOT NULL, - `account` varchar(64) NOT NULL, - `direction` varchar(8) NOT NULL, - `action_plan_tag` varchar(64), - `action_triggers_tag` varchar(64), - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`), - UNIQUE KEY `unique_tp_account` (`tpid`,`loadid`,`tenant`,`account`,`direction`) -); - --- --- Table structure for table `tp_lcr_rules` --- - -DROP TABLE IF EXISTS tp_lcr_rules; -CREATE TABLE tp_lcr_rules ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `direction` varchar(8) NOT NULL, - `tenant` varchar(64) NOT NULL, - `category` varchar(32) NOT NULL, - `account` varchar(24) NOT NULL, - `subject` varchar(64) NOT NULL, - `destination_tag` varchar(64) NOT NULL, - `rp_category` varchar(32) NOT NULL, - `strategy` varchar(16) NOT NULL, - `strategy_params` varchar(256) NOT NULL, - `activation_time` varchar(24) NOT NULL, - `weight` DECIMAL(8,2) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`) -); - --- --- Table structure for table `tp_derived_chargers` --- - -DROP TABLE IF EXISTS tp_derived_chargers; -CREATE TABLE tp_derived_chargers ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `loadid` varchar(64) NOT NULL, - `direction` varchar(8) NOT NULL, - `tenant` varchar(64) NOT NULL, - `category` varchar(32) NOT NULL, - `account` varchar(24) NOT NULL, - `subject` varchar(64) NOT NULL, - `runid` varchar(24) NOT NULL, - `run_filters` varchar(256) NOT NULL, - `req_type_field` varchar(24) NOT NULL, - `direction_field` varchar(24) NOT NULL, - `tenant_field` varchar(24) NOT NULL, - `category_field` varchar(24) NOT NULL, - `account_field` varchar(24) NOT NULL, - `subject_field` varchar(24) NOT NULL, - `destination_field` varchar(24) NOT NULL, - `setup_time_field` varchar(24) NOT NULL, - `pdd_field` varchar(24) NOT NULL, - `answer_time_field` varchar(24) NOT NULL, - `usage_field` varchar(24) NOT NULL, - `supplier_field` varchar(24) NOT NULL, - `disconnect_cause_field` varchar(24) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`) -); - - --- --- Table structure for table `tp_cdr_stats` --- - -DROP TABLE IF EXISTS tp_cdr_stats; -CREATE TABLE tp_cdr_stats ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `tag` varchar(64) NOT NULL, - `queue_length` int(11) NOT NULL, - `time_window` varchar(8) NOT NULL, - `save_interval` varchar(8) NOT NULL, - `metrics` varchar(64) NOT NULL, - `setup_interval` varchar(64) NOT NULL, - `tors` varchar(64) NOT NULL, - `cdr_hosts` varchar(64) NOT NULL, - `cdr_sources` varchar(64) NOT NULL, - `req_types` varchar(64) NOT NULL, - `directions` varchar(8) NOT NULL, - `tenants` varchar(64) NOT NULL, - `categories` varchar(32) NOT NULL, - `accounts` varchar(24) NOT NULL, - `subjects` varchar(64) NOT NULL, - `destination_prefixes` varchar(64) NOT NULL, - `pdd_interval` varchar(64) NOT NULL, - `usage_interval` varchar(64) NOT NULL, - `suppliers` varchar(64) NOT NULL, - `disconnect_causes` varchar(64) NOT NULL, - `mediation_runids` varchar(64) NOT NULL, - `rated_accounts` varchar(64) NOT NULL, - `rated_subjects` varchar(64) NOT NULL, - `cost_interval` varchar(24) NOT NULL, - `action_triggers` varchar(64) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`) -); - - -- --- Table structure for table `tp_users` --- - -DROP TABLE IF EXISTS tp_users; -CREATE TABLE tp_users ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `tenant` varchar(64) NOT NULL, - `user_name` varchar(64) NOT NULL, - `attribute_name` varchar(64) NOT NULL, - `attribute_value` varchar(64) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`) -); \ No newline at end of file diff --git a/data/tariffplans/tutorial/Users.csv b/data/tariffplans/tutorial/Users.csv index df4057753..75936aa6a 100644 --- a/data/tariffplans/tutorial/Users.csv +++ b/data/tariffplans/tutorial/Users.csv @@ -1,4 +1,6 @@ #Tenant[0],UserName[1],AttributeName[2],AttributeValue[3] -cgrates.org,1001,test0,val0 -cgrates.org,1001,test1,val1 -cgrates.org,1002,another,value +cgrates.org,1001,SysUserName,danb +cgrates.org,1001,SysPassword,hisPass321 +cgrates.org,1001,Cli,+4986517174963 +cgrates.org,1002,SysUserName,rif +cgrates.org,1002,RifAttr,RifVal diff --git a/engine/cdrs.go b/engine/cdrs.go index 5cbd7e63a..5967cff15 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -147,6 +147,11 @@ func (self *CdrServer) RateCdrs(cgrIds, runIds, tors, cdrHosts, cdrSources, reqT // Returns error if not able to properly store the CDR, mediation is async since we can always recover offline func (self *CdrServer) processCdr(storedCdr *StoredCdr) (err error) { + if upData, err := LoadUserProfile(storedCdr, "ExtraFields"); err != nil { + return err + } else { + *storedCdr = upData.(StoredCdr) + } if storedCdr.ReqType == utils.META_NONE { return nil } From 879b8faca2a9947a9236b98ed07c288e89cc7aa2 Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 29 Jul 2015 13:21:06 +0200 Subject: [PATCH 04/15] Setting the documentation version --- docs/conf.py | 2 +- utils/consts.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index 4ba2ebf99..6ce2b9cdf 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -50,7 +50,7 @@ copyright = u'2012-2015, ITsysCOM' # The short X.Y version. version = '0.9.1' # The full version, including alpha/beta/rc tags. -release = '0.9.1rc4' +release = '0.9.1~rc6' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/utils/consts.go b/utils/consts.go index 4fe6e2e3a..ac075bdb1 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -26,7 +26,7 @@ var ( ) const ( - VERSION = "0.9.1rc6" + VERSION = "0.9.1~rc6" POSTGRES = "postgres" MYSQL = "mysql" MONGO = "mongo" From 186238a4fdff7663e77a3c7283bcc9dfccc25732 Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 30 Jul 2015 13:56:34 +0200 Subject: [PATCH 05/15] Fix json configuration to parse rater.users, pubsubs, historys, add CdrStats and Users to CacheStats counters, fix #130, fix #131 --- apier/v1/apier.go | 15 +++++++++++++++ cmd/cgr-engine/cgr-engine.go | 6 +++--- config/config.go | 9 +++++++++ config/config_json_test.go | 3 ++- config/libconfig_json.go | 3 +++ data/conf/samples/tutlocal/cgrates.json | 5 +++++ .../prepaid1centpsec/AccountActions.csv | 2 +- general_tests/tutorial_local_test.go | 2 +- utils/apitpdata.go | 2 ++ 9 files changed, 41 insertions(+), 6 deletions(-) diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 556ec6ea2..ef8dd9890 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -47,6 +47,7 @@ type ApierV1 struct { Config *config.CGRConfig Responder *engine.Responder CdrStatsSrv engine.StatsInterface + Users engine.UserService } func (self *ApierV1) GetDestination(dstId string, reply *engine.Destination) error { @@ -940,6 +941,20 @@ func (self *ApierV1) GetCacheStats(attrs utils.AttrCacheStats, reply *utils.Cach cs.AccountAliases = cache2go.CountEntries(utils.ACC_ALIAS_PREFIX) cs.DerivedChargers = cache2go.CountEntries(utils.DERIVEDCHARGERS_PREFIX) cs.LcrProfiles = cache2go.CountEntries(utils.LCR_PREFIX) + if self.CdrStatsSrv != nil && self.Config.CDRStatsEnabled { + var queueIds []string + if err := self.CdrStatsSrv.GetQueueIds(0, &queueIds); err != nil { + return utils.NewErrServerError(err) + } + cs.CdrStats = len(queueIds) + } + if self.Config.RaterUserServer == utils.INTERNAL { + var ups engine.UserProfiles + if err := self.Users.GetUsers(engine.UserProfile{}, &ups); err != nil { + return utils.NewErrServerError(err) + } + cs.Users = len(ups) + } *reply = *cs return nil } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 9e728235f..c4a37ebbf 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -482,7 +482,6 @@ func main() { } server.RpcRegisterName("ScribeV1", scribeServer) } - if cfg.UserServerEnabled { userServer, err = engine.NewUserMap(ratingDb) if err != nil { @@ -556,8 +555,9 @@ func main() { wg.Wait() responder := &engine.Responder{ExitChan: exitChan, Stats: cdrStats} - apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats} - apierRpcV2 := &v2.ApierV2{ApierV1: v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats}} + apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats, Users: userServer} + apierRpcV2 := &v2.ApierV2{ + ApierV1: v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats, Users: userServer}} if cfg.RaterEnabled && !cfg.BalancerEnabled && cfg.RaterBalancer != utils.INTERNAL { engine.Logger.Info("Registering Rater service") diff --git a/config/config.go b/config/config.go index c299ae9ec..1e834085c 100644 --- a/config/config.go +++ b/config/config.go @@ -559,6 +559,15 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { if jsnRaterCfg.Cdrstats != nil { self.RaterCdrStats = *jsnRaterCfg.Cdrstats } + if jsnRaterCfg.Historys != nil { + self.RaterHistoryServer = *jsnRaterCfg.Historys + } + if jsnRaterCfg.Pubsubs != nil { + self.RaterPubSubServer = *jsnRaterCfg.Pubsubs + } + if jsnRaterCfg.Users != nil { + self.RaterUserServer = *jsnRaterCfg.Users + } } if jsnBalancerCfg != nil && jsnBalancerCfg.Enabled != nil { diff --git a/config/config_json_test.go b/config/config_json_test.go index 02869c4c7..b32e74e1c 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -121,7 +121,8 @@ func TestDfBalancerJsonCfg(t *testing.T) { } func TestDfRaterJsonCfg(t *testing.T) { - eCfg := &RaterJsonCfg{Enabled: utils.BoolPointer(false), Balancer: utils.StringPointer(""), Cdrstats: utils.StringPointer("")} + eCfg := &RaterJsonCfg{Enabled: utils.BoolPointer(false), Balancer: utils.StringPointer(""), Cdrstats: utils.StringPointer(""), + Historys: utils.StringPointer(""), Pubsubs: utils.StringPointer(""), Users: utils.StringPointer("")} if cfg, err := dfCgrJsonCfg.RaterJsonCfg(); err != nil { t.Error(err) } else if !reflect.DeepEqual(eCfg, cfg) { diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 3ca603aef..1445f7235 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -61,6 +61,9 @@ type RaterJsonCfg struct { Enabled *bool Balancer *string Cdrstats *string + Historys *string + Pubsubs *string + Users *string } // Scheduler config section diff --git a/data/conf/samples/tutlocal/cgrates.json b/data/conf/samples/tutlocal/cgrates.json index 21b99b52a..d4567a697 100644 --- a/data/conf/samples/tutlocal/cgrates.json +++ b/data/conf/samples/tutlocal/cgrates.json @@ -13,6 +13,7 @@ "rater": { "enabled": true, // enable Rater service: "cdrstats": "internal", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234> + "users": "internal", }, "scheduler": { @@ -30,4 +31,8 @@ "enabled": true, // starts the cdrstats service: }, +"users": { + "enabled": true, +}, + } diff --git a/data/tariffplans/prepaid1centpsec/AccountActions.csv b/data/tariffplans/prepaid1centpsec/AccountActions.csv index dd63c2299..544e4df45 100644 --- a/data/tariffplans/prepaid1centpsec/AccountActions.csv +++ b/data/tariffplans/prepaid1centpsec/AccountActions.csv @@ -1,6 +1,6 @@ #Tenant,Account,Direction,ActionPlanId,ActionTriggersId cgrates.org,1001,*out,PREPAID_10,STANDARD_TRIGGERS -cgrates.org,1002,*out,PREPAID_10,STANDARD_TRIGGERS +cgrates.org,1002;1006,*out,PREPAID_10,STANDARD_TRIGGERS cgrates.org,1003,*out,PREPAID_10,STANDARD_TRIGGERS cgrates.org,1004,*out,PREPAID_10,STANDARD_TRIGGERS cgrates.org,1005,*out,PREPAID_10,STANDARD_TRIGGERS diff --git a/general_tests/tutorial_local_test.go b/general_tests/tutorial_local_test.go index cd78ef52c..adebc3cf0 100644 --- a/general_tests/tutorial_local_test.go +++ b/general_tests/tutorial_local_test.go @@ -117,7 +117,7 @@ func TestTutLocalCacheStats(t *testing.T) { } var rcvStats *utils.CacheStats expectedStats := &utils.CacheStats{Destinations: 4, RatingPlans: 3, RatingProfiles: 8, Actions: 7, SharedGroups: 1, RatingAliases: 1, AccountAliases: 1, - DerivedChargers: 1, LcrProfiles: 4} + DerivedChargers: 1, LcrProfiles: 4, CdrStats: 6} var args utils.AttrCacheStats if err := tutLocalRpc.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil { t.Error("Got error on ApierV1.GetCacheStats: ", err.Error()) diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 9495ddb8e..3adbde62a 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -546,6 +546,8 @@ type CacheStats struct { AccountAliases int DerivedChargers int LcrProfiles int + CdrStats int + Users int } type AttrCachedItemAge struct { From 44780f1fe33eb97be9e39c9a1f39f6f749feb716 Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 30 Jul 2015 14:46:36 +0200 Subject: [PATCH 06/15] Disable alias in prepaid1cent --- data/tariffplans/prepaid1centpsec/AccountActions.csv | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data/tariffplans/prepaid1centpsec/AccountActions.csv b/data/tariffplans/prepaid1centpsec/AccountActions.csv index 544e4df45..dd63c2299 100644 --- a/data/tariffplans/prepaid1centpsec/AccountActions.csv +++ b/data/tariffplans/prepaid1centpsec/AccountActions.csv @@ -1,6 +1,6 @@ #Tenant,Account,Direction,ActionPlanId,ActionTriggersId cgrates.org,1001,*out,PREPAID_10,STANDARD_TRIGGERS -cgrates.org,1002;1006,*out,PREPAID_10,STANDARD_TRIGGERS +cgrates.org,1002,*out,PREPAID_10,STANDARD_TRIGGERS cgrates.org,1003,*out,PREPAID_10,STANDARD_TRIGGERS cgrates.org,1004,*out,PREPAID_10,STANDARD_TRIGGERS cgrates.org,1005,*out,PREPAID_10,STANDARD_TRIGGERS From ee62da445f1f452d7f2635af6a8ea1c43b14e0fe Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 30 Jul 2015 16:46:37 +0200 Subject: [PATCH 07/15] CDR filters for fwv, fixes #135 --- cdrc/fwv.go | 25 +++++++++++++++++++++---- cdrc/fwv_test.go | 14 ++++++++++++++ 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/cdrc/fwv.go b/cdrc/fwv.go index 8ec7c2c31..5f97d106f 100644 --- a/cdrc/fwv.go +++ b/cdrc/fwv.go @@ -115,13 +115,12 @@ func (self *FwvRecordsProcessor) ProcessNextRecord() ([]*engine.StoredCdr, error engine.Logger.Err(fmt.Sprintf(" Could not read complete line, have instead: %s", string(buf))) return nil, io.EOF } + record := string(buf) for cfgKey := range self.cdrcCfgs { - filterBreak := false - // ToDo: Field filters - if filterBreak { // Stop importing cdrc fields profile due to non matching filter + if passes := self.recordPassesCfgFilter(record, cfgKey); !passes { continue } - if storedCdr, err := self.recordToStoredCdr(string(buf), cfgKey); err != nil { + if storedCdr, err := self.recordToStoredCdr(record, cfgKey); err != nil { return nil, fmt.Errorf("Failed converting to StoredCdr, error: %s", err.Error()) } else { recordCdrs = append(recordCdrs, storedCdr) @@ -130,6 +129,24 @@ func (self *FwvRecordsProcessor) ProcessNextRecord() ([]*engine.StoredCdr, error return recordCdrs, nil } +func (self *FwvRecordsProcessor) recordPassesCfgFilter(record, configKey string) bool { + filterPasses := true + for _, rsrFilter := range self.cdrcCfgs[configKey].CdrFilter { + if rsrFilter == nil { // Nil filter does not need to match anything + continue + } + if cfgFieldIdx, _ := strconv.Atoi(rsrFilter.Id); len(record) <= cfgFieldIdx { + fmt.Errorf("Ignoring record: %v - cannot compile filter %+v", record, rsrFilter) + return false + } else if !rsrFilter.FilterPasses(record[cfgFieldIdx:]) { + fmt.Printf("Record content to test: %s\n", record[cfgFieldIdx:]) + filterPasses = false + break + } + } + return filterPasses +} + // Converts a record (header or normal) to StoredCdr func (self *FwvRecordsProcessor) recordToStoredCdr(record string, cfgKey string) (*engine.StoredCdr, error) { var err error diff --git a/cdrc/fwv_test.go b/cdrc/fwv_test.go index 3d4d9cd16..39f7c6a9c 100644 --- a/cdrc/fwv_test.go +++ b/cdrc/fwv_test.go @@ -19,6 +19,8 @@ along with this program. If not, see package cdrc import ( + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" "testing" ) @@ -39,3 +41,15 @@ func TestFwvValue(t *testing.T) { } } + +func TestFwvRecordPassesCfgFilter(t *testing.T) { + //record, configKey string) bool { + cgrConfig, _ := config.NewDefaultCGRConfig() + cdrcConfig := cgrConfig.CdrcProfiles["/var/log/cgrates/cdrc/in"][utils.META_DEFAULT] // We don't really care that is for .csv since all we want to test are the filters + cdrcConfig.CdrFilter = utils.ParseRSRFieldsMustCompile(`~52:s/^0(\d{9})/+49${1}/(+49123123120)`, utils.INFIELD_SEP) + fwvRp := &FwvRecordsProcessor{cdrcCfgs: cgrConfig.CdrcProfiles["/var/log/cgrates/cdrc/in"]} + cdrLine := "CDR0000010 0 20120708181506000123451234 0040123123120 004 000018009980010001ISDN ABC 10Buiten uw regio EHV 00000009190000000009" + if passesFilter := fwvRp.recordPassesCfgFilter(cdrLine, utils.META_DEFAULT); !passesFilter { + t.Error("Not passes filter") + } +} From 8a19b2736c1d8fee789b3da46378deb3e4ded695 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Thu, 30 Jul 2015 18:01:38 +0300 Subject: [PATCH 08/15] fix form missing account alias key --- engine/storage_redis.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 20075cb5c..2900c645d 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -23,6 +23,7 @@ import ( "compress/zlib" "errors" "fmt" + "log" "strings" "github.com/cgrates/cgrates/cache2go" @@ -430,10 +431,10 @@ func (rs *RedisStorage) RemoveRpAliases(tenantRtSubjects []*TenantRatingSubject, if tntRSubj.Subject != alias { continue } - cache2go.RemKey(key) if _, err = rs.db.Del(key); err != nil { return err } + cache2go.RemKey(key) break } } @@ -553,10 +554,12 @@ func (rs *RedisStorage) RemoveAccAliases(tenantAccounts []*TenantAccount, skipCa if tntAcnt.Account != alias { continue } - cache2go.RemKey(key) if _, err = rs.db.Del(key); err != nil { + log.Print("") return err } + cache2go.RemKey(key) + break } } @@ -580,6 +583,7 @@ func (rs *RedisStorage) RemoveAccAliases(tenantAccounts []*TenantAccount, skipCa return err } cache2go.RemKey(utils.ACC_ALIAS_PREFIX + key) + break } } } From 36f57b0e93cfade989b9c47816163712c0e3a968 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Thu, 30 Jul 2015 18:40:23 +0300 Subject: [PATCH 09/15] sync for user service --- engine/users.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/engine/users.go b/engine/users.go index 84018ac9c..0cf821c74 100644 --- a/engine/users.go +++ b/engine/users.go @@ -3,6 +3,7 @@ package engine import ( "sort" "strings" + "sync" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" @@ -61,6 +62,7 @@ type UserMap struct { index map[string]map[string]bool indexKeys []string ratingDb RatingStorage + mu sync.RWMutex } func NewUserMap(ratingDb RatingStorage) (*UserMap, error) { @@ -85,6 +87,8 @@ func newUserMap(ratingDb RatingStorage) *UserMap { } func (um *UserMap) SetUser(up UserProfile, reply *string) error { + um.mu.Lock() + defer um.mu.Unlock() if err := um.ratingDb.SetUser(&up); err != nil { *reply = err.Error() return err @@ -96,6 +100,8 @@ func (um *UserMap) SetUser(up UserProfile, reply *string) error { } func (um *UserMap) RemoveUser(up UserProfile, reply *string) error { + um.mu.Lock() + defer um.mu.Unlock() if err := um.ratingDb.RemoveUser(up.GetId()); err != nil { *reply = err.Error() return err @@ -107,6 +113,8 @@ func (um *UserMap) RemoveUser(up UserProfile, reply *string) error { } func (um *UserMap) UpdateUser(up UserProfile, reply *string) error { + um.mu.Lock() + defer um.mu.Unlock() m, found := um.table[up.GetId()] if !found { *reply = utils.ErrNotFound.Error() @@ -144,6 +152,8 @@ func (um *UserMap) UpdateUser(up UserProfile, reply *string) error { } func (um *UserMap) GetUsers(up UserProfile, results *UserProfiles) error { + um.mu.RLock() + defer um.mu.RUnlock() table := um.table // no index indexUnionKeys := make(map[string]bool) @@ -223,6 +233,8 @@ func (um *UserMap) GetUsers(up UserProfile, results *UserProfiles) error { } func (um *UserMap) AddIndex(indexes []string, reply *string) error { + um.mu.Lock() + defer um.mu.Unlock() um.indexKeys = indexes for key, values := range um.table { up := &UserProfile{Profile: values} @@ -305,6 +317,8 @@ func (um *UserMap) deleteIndex(up *UserProfile) { } func (um *UserMap) GetIndexes(in string, reply *map[string][]string) error { + um.mu.RLock() + defer um.mu.RUnlock() indexes := make(map[string][]string) for key, values := range um.index { var vs []string From 92218a62fdf66554d3c9a1155312141abaa011cb Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Thu, 30 Jul 2015 23:12:15 +0300 Subject: [PATCH 10/15] load distribution lcr (work in progress) --- engine/calldesc.go | 113 ++++++++++++++++++++++++--------------- engine/lcr.go | 87 +++++++++++++++++++++++++++--- engine/lcr_test.go | 99 ++++++++++++++++++++++++++++++++++ engine/responder_test.go | 11 +++- 4 files changed, 259 insertions(+), 51 deletions(-) diff --git a/engine/calldesc.go b/engine/calldesc.go index 958177991..5580c0687 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -795,6 +795,7 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) { if lcrCost.Entry == nil { return lcrCost, nil } + //log.Printf("Entry: %+v", lcrCost.Entry) if lcrCost.Entry.Strategy == LCR_STRATEGY_STATIC { for _, supplier := range lcrCost.Entry.GetParams() { @@ -865,7 +866,7 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) { accNeverConsidered := true tccNeverConsidered := true ddcNeverConsidered := true - if utils.IsSliceMember([]string{LCR_STRATEGY_QOS, LCR_STRATEGY_QOS_THRESHOLD}, lcrCost.Entry.Strategy) { + if utils.IsSliceMember([]string{LCR_STRATEGY_QOS, LCR_STRATEGY_QOS_THRESHOLD, LCR_STRATEGY_LOAD}, lcrCost.Entry.Strategy) { if stats == nil { lcrCost.SupplierCosts = append(lcrCost.SupplierCosts, &LCRSupplierCost{ Supplier: supplier, @@ -891,60 +892,84 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) { } } } + statsErr := false + var supplierQueues []*StatsQueue for _, qId := range cdrStatsQueueIds { - statValues := make(map[string]float64) - if err := stats.GetValues(qId, &statValues); err != nil { - lcrCost.SupplierCosts = append(lcrCost.SupplierCosts, &LCRSupplierCost{ - Supplier: supplier, - Error: fmt.Sprintf("Get stats values for queue id %s, error %s", qId, err.Error()), - }) - statsErr = true - break - } - if asr, exists := statValues[ASR]; exists { - if asr > STATS_NA { - asrValues = append(asrValues, asr) + if lcrCost.Entry.Strategy == LCR_STRATEGY_LOAD { + for _, qId := range cdrStatsQueueIds { + sq := &StatsQueue{} + if err := stats.GetQueue(qId, sq); err == nil { + if sq.conf.QueueLength == 0 { //only add qeues that don't have fixed length + supplierQueues = append(supplierQueues, sq) + } + } } - asrNeverConsidered = false - } - if pdd, exists := statValues[PDD]; exists { - if pdd > STATS_NA { - pddValues = append(pddValues, pdd) + } else { + statValues := make(map[string]float64) + if err := stats.GetValues(qId, &statValues); err != nil { + lcrCost.SupplierCosts = append(lcrCost.SupplierCosts, &LCRSupplierCost{ + Supplier: supplier, + Error: fmt.Sprintf("Get stats values for queue id %s, error %s", qId, err.Error()), + }) + statsErr = true + break } - pddNeverConsidered = false - } - if acd, exists := statValues[ACD]; exists { - if acd > STATS_NA { - acdValues = append(acdValues, acd) + if asr, exists := statValues[ASR]; exists { + if asr > STATS_NA { + asrValues = append(asrValues, asr) + } + asrNeverConsidered = false } - acdNeverConsidered = false - } - if tcd, exists := statValues[TCD]; exists { - if tcd > STATS_NA { - tcdValues = append(tcdValues, tcd) + if pdd, exists := statValues[PDD]; exists { + if pdd > STATS_NA { + pddValues = append(pddValues, pdd) + } + pddNeverConsidered = false } - tcdNeverConsidered = false - } - if acc, exists := statValues[ACC]; exists { - if acc > STATS_NA { - accValues = append(accValues, acc) + if acd, exists := statValues[ACD]; exists { + if acd > STATS_NA { + acdValues = append(acdValues, acd) + } + acdNeverConsidered = false } - accNeverConsidered = false - } - if tcc, exists := statValues[TCC]; exists { - if tcc > STATS_NA { - tccValues = append(tccValues, tcc) + if tcd, exists := statValues[TCD]; exists { + if tcd > STATS_NA { + tcdValues = append(tcdValues, tcd) + } + tcdNeverConsidered = false } - tccNeverConsidered = false - } - if ddc, exists := statValues[TCC]; exists { - if ddc > STATS_NA { - ddcValues = append(ddcValues, ddc) + if acc, exists := statValues[ACC]; exists { + if acc > STATS_NA { + accValues = append(accValues, acc) + } + accNeverConsidered = false } - ddcNeverConsidered = false + if tcc, exists := statValues[TCC]; exists { + if tcc > STATS_NA { + tccValues = append(tccValues, tcc) + } + tccNeverConsidered = false + } + if ddc, exists := statValues[TCC]; exists { + if ddc > STATS_NA { + ddcValues = append(ddcValues, ddc) + } + ddcNeverConsidered = false + } + } } + if lcrCost.Entry.Strategy == LCR_STRATEGY_LOAD { + if len(supplierQueues) > 0 { + lcrCost.SupplierCosts = append(lcrCost.SupplierCosts, &LCRSupplierCost{ + Supplier: supplier, + supplierQueues: supplierQueues, + }) + } + continue // next supplier + } + if statsErr { // Stats error in loop, to go next supplier continue } diff --git a/engine/lcr.go b/engine/lcr.go index ba78df927..cdab45627 100644 --- a/engine/lcr.go +++ b/engine/lcr.go @@ -36,6 +36,7 @@ const ( LCR_STRATEGY_HIGHEST = "*highest_cost" LCR_STRATEGY_QOS_THRESHOLD = "*qos_threshold" LCR_STRATEGY_QOS = "*qos" + LCR_STRATEGY_LOAD = "*load_distribution" ) // A request for LCR, used in APIer and SM where we need to expose it @@ -134,12 +135,13 @@ type LCRCost struct { } type LCRSupplierCost struct { - Supplier string - Cost float64 - Duration time.Duration - Error string // Not error due to JSON automatic serialization into struct - QOS map[string]float64 - qosSortParams []string + Supplier string + Cost float64 + Duration time.Duration + Error string // Not error due to JSON automatic serialization into struct + QOS map[string]float64 + qosSortParams []string + supplierQueues []*StatsQueue // used for load distribution } func (lcr *LCR) GetId() string { @@ -291,11 +293,84 @@ func (lc *LCRCost) Sort() { sort.Sort(HighestSupplierCostSorter(lc.SupplierCosts)) case LCR_STRATEGY_QOS: sort.Sort(QOSSorter(lc.SupplierCosts)) + case LCR_STRATEGY_LOAD: + lc.SortLoadDistribution() } } +func (lc *LCRCost) SortLoadDistribution() { + // find the time window that is common to all qeues + scoreBoard := make(map[time.Duration]int) // register TimeWindow across suppliers + + var winnerTimeWindow time.Duration + maxScore := 0 + for _, supCost := range lc.SupplierCosts { + timeWindowFlag := make(map[time.Duration]bool) // flags appearance in same supplier + for _, sq := range supCost.supplierQueues { + if !timeWindowFlag[sq.conf.TimeWindow] { + timeWindowFlag[sq.conf.TimeWindow] = true + scoreBoard[sq.conf.TimeWindow]++ + } + if scoreBoard[sq.conf.TimeWindow] > maxScore { + maxScore = scoreBoard[sq.conf.TimeWindow] + winnerTimeWindow = sq.conf.TimeWindow + } + } + } + supplierQueues := make(map[string]*StatsQueue) + for _, supCost := range lc.SupplierCosts { + for _, sq := range supCost.supplierQueues { + if sq.conf.TimeWindow == winnerTimeWindow { + supplierQueues[supCost.Supplier] = sq + break + } + } + } + /*for supplier, sq := range supplierQueues { + log.Printf("Useful supplier qeues: %s %v", supplier, sq.conf.TimeWindow) + }*/ + // if all have less than ponder return random order + // if some have a cdr count not divisible by ponder return them first and all ordered by cdr times, oldest first + // if all have a multiple of ponder return in the order of cdr times, oldest first + +} + +// used in load distribution strategy only +// receives a long supplier id and will return the ponder found in strategy params +func (lc *LCRCost) GetSupplierPonder(supplier string) float64 { + // parse strategy params + ponders := make(map[string]float64) + params := strings.Split(lc.Entry.StrategyParams, utils.INFIELD_SEP) + for _, param := range params { + ponderSlice := strings.Split(param, utils.CONCATENATED_KEY_SEP) + if len(ponderSlice) != 2 { + Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams)) + continue + } + p, err := strconv.ParseFloat(ponderSlice[1], 64) + if err != nil { + Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams)) + continue + } + ponders[ponderSlice[0]] = p + } + parts := strings.Split(supplier, utils.CONCATENATED_KEY_SEP) + if len(parts) > 0 { + supplierSubject := parts[len(parts)-1] + if ponder, found := ponders[supplierSubject]; found { + return ponder + } + if ponder, found := ponders[utils.META_DEFAULT]; found { + return ponder + } + } + + return 1 +} + func (lc *LCRCost) HasErrors() bool { for _, supplCost := range lc.SupplierCosts { + if len(supplCost.Error) != 0 { return true } diff --git a/engine/lcr_test.go b/engine/lcr_test.go index 0364c4652..847eefbda 100644 --- a/engine/lcr_test.go +++ b/engine/lcr_test.go @@ -278,3 +278,102 @@ func TestLCRCostSuppliersString(t *testing.T) { t.Errorf("Expecting: %s, received: %s", eSupplStr, supplStr) } } + +func TestLCRCostSuppliersLoad(t *testing.T) { + lcrCost := &LCRCost{ + Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:10;dan12:3;*default:7", Weight: 10.0}, + SupplierCosts: []*LCRSupplierCost{ + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:ivo12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 3 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:dan12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:rif12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + }, + }, + }, + } + lcrCost.Sort() + if lcrCost.SupplierCosts[0].Supplier != "" || + lcrCost.SupplierCosts[1].Supplier != "" || + lcrCost.SupplierCosts[2].Supplier != "" { + //t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) + } +} diff --git a/engine/responder_test.go b/engine/responder_test.go index c0ad5ecd4..68d5054a7 100644 --- a/engine/responder_test.go +++ b/engine/responder_test.go @@ -354,7 +354,16 @@ func TestGetLCR(t *testing.T) { }, }, } - for _, lcr := range []*LCR{lcrStatic, lcrLowestCost, lcrQosThreshold, lcrQos} { + lcrLoad := &LCR{Direction: utils.OUT, Tenant: "tenant12", Category: "call_load", Account: utils.ANY, Subject: utils.ANY, + Activations: []*LCRActivation{ + &LCRActivation{ + ActivationTime: time.Date(2015, 01, 01, 8, 0, 0, 0, time.UTC), + Entries: []*LCREntry{ + &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:10;dan12:3", Weight: 10.0}}, + }, + }, + } + for _, lcr := range []*LCR{lcrStatic, lcrLowestCost, lcrQosThreshold, lcrQos, lcrLoad} { if err := ratingStorage.SetLCR(lcr); err != nil { t.Error(err) } From ab2b05dbde255fc229588692b1370b2721cd415a Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 31 Jul 2015 09:12:25 +0200 Subject: [PATCH 11/15] Adding CDRs posted information to CDRC, fixes #132 --- cdrc/cdrc.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index df8804938..a99976f42 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -258,6 +258,7 @@ func (self *Cdrc) processFile(filePath string) error { return fmt.Errorf("Unsupported CDR format: %s", self.cdrFormat) } procRowNr := 0 + cdrsPosted := 0 timeStart := time.Now() for { cdrs, err := recordsProcessor.ProcessNextRecord() @@ -279,6 +280,8 @@ func (self *Cdrc) processFile(filePath string) error { engine.Logger.Err(fmt.Sprintf(" Failed sending CDR, %+v, error: %s", storedCdr, err.Error())) } else if reply != "OK" { engine.Logger.Err(fmt.Sprintf(" Received unexpected reply for CDR, %+v, reply: %s", storedCdr, reply)) + } else { + cdrsPosted += 1 } } } @@ -288,7 +291,7 @@ func (self *Cdrc) processFile(filePath string) error { engine.Logger.Err(err.Error()) return err } - engine.Logger.Info(fmt.Sprintf("Finished processing %s, moved to %s. Total records processed: %d, run duration: %s", - fn, newPath, procRowNr, time.Now().Sub(timeStart))) + engine.Logger.Info(fmt.Sprintf("Finished processing %s, moved to %s. Total records processed: %d, CDRs posted: %d, run duration: %s", + fn, newPath, procRowNr, cdrsPosted, time.Now().Sub(timeStart))) return nil } From 9176f56589fb0b5e26b37b8a07c2bf9603e11d6c Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 31 Jul 2015 09:28:27 +0200 Subject: [PATCH 12/15] Properly export ExtraFields in UsageRequest and ExternalCdrs --- engine/storedcdr.go | 52 ++++++++++++++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/engine/storedcdr.go b/engine/storedcdr.go index 700d56295..a7968dd9b 100644 --- a/engine/storedcdr.go +++ b/engine/storedcdr.go @@ -33,7 +33,7 @@ func NewStoredCdrFromExternalCdr(extCdr *ExternalCdr) (*StoredCdr, error) { var err error storedCdr := &StoredCdr{CgrId: extCdr.CgrId, OrderId: extCdr.OrderId, TOR: extCdr.TOR, AccId: extCdr.AccId, CdrHost: extCdr.CdrHost, CdrSource: extCdr.CdrSource, ReqType: extCdr.ReqType, Direction: extCdr.Direction, Tenant: extCdr.Tenant, Category: extCdr.Category, Account: extCdr.Account, Subject: extCdr.Subject, - Destination: extCdr.Destination, Supplier: extCdr.Supplier, DisconnectCause: extCdr.DisconnectCause, ExtraFields: extCdr.ExtraFields, + Destination: extCdr.Destination, Supplier: extCdr.Supplier, DisconnectCause: extCdr.DisconnectCause, MediationRunId: extCdr.MediationRunId, RatedAccount: extCdr.RatedAccount, RatedSubject: extCdr.RatedSubject, Cost: extCdr.Cost, Rated: extCdr.Rated} if storedCdr.SetupTime, err = utils.ParseTimeDetectLayout(extCdr.SetupTime); err != nil { return nil, err @@ -55,6 +55,12 @@ func NewStoredCdrFromExternalCdr(extCdr *ExternalCdr) (*StoredCdr, error) { return nil, err } } + if extCdr.ExtraFields != nil { + storedCdr.ExtraFields = make(map[string]string) + } + for k, v := range extCdr.ExtraFields { + storedCdr.ExtraFields[k] = v + } return storedCdr, nil } @@ -651,6 +657,7 @@ type UsageRecord struct { SetupTime string AnswerTime string Usage string + ExtraFields map[string]string } func (self *UsageRecord) AsStoredCdr() (*StoredCdr, error) { @@ -666,24 +673,18 @@ func (self *UsageRecord) AsStoredCdr() (*StoredCdr, error) { if storedCdr.Usage, err = utils.ParseDurationWithSecs(self.Usage); err != nil { return nil, err } + if self.ExtraFields != nil { + storedCdr.ExtraFields = make(map[string]string) + } + for k, v := range self.ExtraFields { + storedCdr.ExtraFields[k] = v + } return storedCdr, nil } func (self *UsageRecord) AsCallDescriptor() (*CallDescriptor, error) { var err error - timeStr := self.AnswerTime - if len(timeStr) == 0 { // In case of auth, answer time will not be defined, so take it out of setup one - timeStr = self.SetupTime - } - startTime, err := utils.ParseTimeDetectLayout(timeStr) - if err != nil { - return nil, err - } - usage, err := utils.ParseDurationWithSecs(self.Usage) - if err != nil { - return nil, err - } - return &CallDescriptor{ + cd := &CallDescriptor{ TOR: self.TOR, Direction: self.Direction, Tenant: self.Tenant, @@ -691,7 +692,24 @@ func (self *UsageRecord) AsCallDescriptor() (*CallDescriptor, error) { Subject: self.Subject, Account: self.Account, Destination: self.Destination, - TimeStart: startTime, - TimeEnd: startTime.Add(usage), - }, nil + } + timeStr := self.AnswerTime + if len(timeStr) == 0 { // In case of auth, answer time will not be defined, so take it out of setup one + timeStr = self.SetupTime + } + if cd.TimeStart, err = utils.ParseTimeDetectLayout(timeStr); err != nil { + return nil, err + } + if usage, err := utils.ParseDurationWithSecs(self.Usage); err != nil { + return nil, err + } else { + cd.TimeEnd = cd.TimeStart.Add(usage) + } + if self.ExtraFields != nil { + cd.ExtraFields = make(map[string]string) + } + for k, v := range self.ExtraFields { + cd.ExtraFields[k] = v + } + return cd, nil } From d4a440d0ec40ce7d33ee43a8b8188bd2322b2ad3 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 31 Jul 2015 11:27:38 +0300 Subject: [PATCH 13/15] first working load distribution lcr --- engine/lcr.go | 34 +++++++++++++++++++++++++++++----- engine/lcr_test.go | 23 ++++++++++++----------- 2 files changed, 41 insertions(+), 16 deletions(-) diff --git a/engine/lcr.go b/engine/lcr.go index cdab45627..653bcbf1e 100644 --- a/engine/lcr.go +++ b/engine/lcr.go @@ -20,6 +20,7 @@ package engine import ( "fmt" + "math/rand" "sort" "strconv" "strings" @@ -37,6 +38,12 @@ const ( LCR_STRATEGY_QOS_THRESHOLD = "*qos_threshold" LCR_STRATEGY_QOS = "*qos" LCR_STRATEGY_LOAD = "*load_distribution" + + // used for load distribution sorting + RAND_LIMIT = 99 + LOW_PRIORITY_LIMIT = 100 + MED_PRIORITY_LIMIT = 200 + HIGH_PRIORITY_LIMIT = 300 ) // A request for LCR, used in APIer and SM where we need to expose it @@ -295,6 +302,7 @@ func (lc *LCRCost) Sort() { sort.Sort(QOSSorter(lc.SupplierCosts)) case LCR_STRATEGY_LOAD: lc.SortLoadDistribution() + sort.Sort(HighestSupplierCostSorter(lc.SupplierCosts)) } } @@ -317,11 +325,11 @@ func (lc *LCRCost) SortLoadDistribution() { } } } - supplierQueues := make(map[string]*StatsQueue) + supplierQueues := make(map[*LCRSupplierCost]*StatsQueue) for _, supCost := range lc.SupplierCosts { for _, sq := range supCost.supplierQueues { if sq.conf.TimeWindow == winnerTimeWindow { - supplierQueues[supCost.Supplier] = sq + supplierQueues[supCost] = sq break } } @@ -333,13 +341,29 @@ func (lc *LCRCost) SortLoadDistribution() { // if some have a cdr count not divisible by ponder return them first and all ordered by cdr times, oldest first // if all have a multiple of ponder return in the order of cdr times, oldest first + // first put them in one of the above categories + for supCost, sq := range supplierQueues { + ponder := lc.GetSupplierPonder(supCost.Supplier) + cdrCount := len(sq.Cdrs) + if cdrCount < ponder { + supCost.Cost = float64(LOW_PRIORITY_LIMIT + rand.Intn(RAND_LIMIT)) + continue + } + if cdrCount%ponder == 0 { + supCost.Cost = float64(MED_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() + continue + } else { + supCost.Cost = float64(HIGH_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() + continue + } + } } // used in load distribution strategy only // receives a long supplier id and will return the ponder found in strategy params -func (lc *LCRCost) GetSupplierPonder(supplier string) float64 { +func (lc *LCRCost) GetSupplierPonder(supplier string) int { // parse strategy params - ponders := make(map[string]float64) + ponders := make(map[string]int) params := strings.Split(lc.Entry.StrategyParams, utils.INFIELD_SEP) for _, param := range params { ponderSlice := strings.Split(param, utils.CONCATENATED_KEY_SEP) @@ -347,7 +371,7 @@ func (lc *LCRCost) GetSupplierPonder(supplier string) float64 { Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams)) continue } - p, err := strconv.ParseFloat(ponderSlice[1], 64) + p, err := strconv.Atoi(ponderSlice[1]) if err != nil { Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams)) continue diff --git a/engine/lcr_test.go b/engine/lcr_test.go index 847eefbda..4ecb1d8bc 100644 --- a/engine/lcr_test.go +++ b/engine/lcr_test.go @@ -280,6 +280,7 @@ func TestLCRCostSuppliersString(t *testing.T) { } func TestLCRCostSuppliersLoad(t *testing.T) { + setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC) lcrCost := &LCRCost{ Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:10;dan12:3;*default:7", Weight: 10.0}, SupplierCosts: []*LCRSupplierCost{ @@ -287,21 +288,21 @@ func TestLCRCostSuppliersLoad(t *testing.T) { Supplier: "*out:tenant12:call:ivo12", supplierQueues: []*StatsQueue{ &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 3 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 1 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, @@ -313,21 +314,21 @@ func TestLCRCostSuppliersLoad(t *testing.T) { Supplier: "*out:tenant12:call:dan12", supplierQueues: []*StatsQueue{ &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 7 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 7 * time.Minute, @@ -339,28 +340,28 @@ func TestLCRCostSuppliersLoad(t *testing.T) { Supplier: "*out:tenant12:call:rif12", supplierQueues: []*StatsQueue{ &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 7 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 7 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 1 * time.Minute, @@ -374,6 +375,6 @@ func TestLCRCostSuppliersLoad(t *testing.T) { if lcrCost.SupplierCosts[0].Supplier != "" || lcrCost.SupplierCosts[1].Supplier != "" || lcrCost.SupplierCosts[2].Supplier != "" { - //t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) + t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) } } From 6110c321b59e816cacc13ee63bf11b072a97d2d6 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 31 Jul 2015 11:56:26 +0300 Subject: [PATCH 14/15] fixes and more tests for lcr --- engine/lcr.go | 4 +- engine/lcr_test.go | 120 ++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 111 insertions(+), 13 deletions(-) diff --git a/engine/lcr.go b/engine/lcr.go index 653bcbf1e..543af6e9e 100644 --- a/engine/lcr.go +++ b/engine/lcr.go @@ -350,10 +350,10 @@ func (lc *LCRCost) SortLoadDistribution() { continue } if cdrCount%ponder == 0 { - supCost.Cost = float64(MED_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() + supCost.Cost = float64(MED_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + (time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() / RAND_LIMIT) continue } else { - supCost.Cost = float64(HIGH_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() + supCost.Cost = float64(HIGH_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + (time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() / RAND_LIMIT) continue } } diff --git a/engine/lcr_test.go b/engine/lcr_test.go index 4ecb1d8bc..f158ad5d5 100644 --- a/engine/lcr_test.go +++ b/engine/lcr_test.go @@ -288,7 +288,7 @@ func TestLCRCostSuppliersLoad(t *testing.T) { Supplier: "*out:tenant12:call:ivo12", supplierQueues: []*StatsQueue{ &StatsQueue{ - Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 3 * time.Minute, @@ -314,21 +314,21 @@ func TestLCRCostSuppliersLoad(t *testing.T) { Supplier: "*out:tenant12:call:dan12", supplierQueues: []*StatsQueue{ &StatsQueue{ - Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 7 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 7 * time.Minute, @@ -340,28 +340,28 @@ func TestLCRCostSuppliersLoad(t *testing.T) { Supplier: "*out:tenant12:call:rif12", supplierQueues: []*StatsQueue{ &StatsQueue{ - Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 7 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 7 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 1 * time.Minute, @@ -372,9 +372,107 @@ func TestLCRCostSuppliersLoad(t *testing.T) { }, } lcrCost.Sort() - if lcrCost.SupplierCosts[0].Supplier != "" || - lcrCost.SupplierCosts[1].Supplier != "" || - lcrCost.SupplierCosts[2].Supplier != "" { + if lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:dan12" { + t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) + } +} + +func TestLCRCostSuppliersLoadAllRounded(t *testing.T) { + setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC) + lcrCost := &LCRCost{ + Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:3;dan12:5;*default:2", Weight: 10.0}, + SupplierCosts: []*LCRSupplierCost{ + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:ivo12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 3 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:dan12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(60 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:rif12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime.Add(100 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + }, + }, + }, + } + lcrCost.Sort() + if lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:ivo12" || + lcrCost.SupplierCosts[1].Supplier != "*out:tenant12:call:dan12" || + lcrCost.SupplierCosts[2].Supplier != "*out:tenant12:call:rif12" { t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) } } From bc82a4165ab6c6f62d05bab17b839cc90396e92b Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 31 Jul 2015 12:02:12 +0300 Subject: [PATCH 15/15] one more test --- engine/lcr_test.go | 102 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 101 insertions(+), 1 deletion(-) diff --git a/engine/lcr_test.go b/engine/lcr_test.go index f158ad5d5..f8d12133d 100644 --- a/engine/lcr_test.go +++ b/engine/lcr_test.go @@ -452,7 +452,107 @@ func TestLCRCostSuppliersLoadAllRounded(t *testing.T) { }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime.Add(100 * time.Minute)}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + }, + }, + }, + } + lcrCost.Sort() + if lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:ivo12" || + lcrCost.SupplierCosts[1].Supplier != "*out:tenant12:call:dan12" || + lcrCost.SupplierCosts[2].Supplier != "*out:tenant12:call:rif12" { + t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) + } +} + +func TestLCRCostSuppliersLoadAllOver(t *testing.T) { + setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC) + lcrCost := &LCRCost{ + Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:2;dan12:4;*default:2", Weight: 10.0}, + SupplierCosts: []*LCRSupplierCost{ + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:ivo12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 3 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:dan12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(60 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:rif12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute,