diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 556ec6ea2..ef8dd9890 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -47,6 +47,7 @@ type ApierV1 struct { Config *config.CGRConfig Responder *engine.Responder CdrStatsSrv engine.StatsInterface + Users engine.UserService } func (self *ApierV1) GetDestination(dstId string, reply *engine.Destination) error { @@ -940,6 +941,20 @@ func (self *ApierV1) GetCacheStats(attrs utils.AttrCacheStats, reply *utils.Cach cs.AccountAliases = cache2go.CountEntries(utils.ACC_ALIAS_PREFIX) cs.DerivedChargers = cache2go.CountEntries(utils.DERIVEDCHARGERS_PREFIX) cs.LcrProfiles = cache2go.CountEntries(utils.LCR_PREFIX) + if self.CdrStatsSrv != nil && self.Config.CDRStatsEnabled { + var queueIds []string + if err := self.CdrStatsSrv.GetQueueIds(0, &queueIds); err != nil { + return utils.NewErrServerError(err) + } + cs.CdrStats = len(queueIds) + } + if self.Config.RaterUserServer == utils.INTERNAL { + var ups engine.UserProfiles + if err := self.Users.GetUsers(engine.UserProfile{}, &ups); err != nil { + return utils.NewErrServerError(err) + } + cs.Users = len(ups) + } *reply = *cs return nil } diff --git a/apier/v1/cdrsv1.go b/apier/v1/cdrsv1.go index a0b82923e..afa46d850 100644 --- a/apier/v1/cdrsv1.go +++ b/apier/v1/cdrsv1.go @@ -41,14 +41,6 @@ func (self *CdrsV1) ProcessCdr(cdr *engine.StoredCdr, reply *string) error { // Designed for external programs feeding CDRs to CGRateS func (self *CdrsV1) ProcessExternalCdr(cdr *engine.ExternalCdr, reply *string) error { - out, err := engine.LoadUserProfile(cdr, "ExtraFields") - if err != nil { - *reply = err.Error() - return err - } - if upcdr, ok := out.(engine.ExternalCdr); ok { - *cdr = upcdr - } if err := self.CdrSrv.ProcessExternalCdr(cdr); err != nil { return utils.NewErrServerError(err) } diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index df8804938..a99976f42 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -258,6 +258,7 @@ func (self *Cdrc) processFile(filePath string) error { return fmt.Errorf("Unsupported CDR format: %s", self.cdrFormat) } procRowNr := 0 + cdrsPosted := 0 timeStart := time.Now() for { cdrs, err := recordsProcessor.ProcessNextRecord() @@ -279,6 +280,8 @@ func (self *Cdrc) processFile(filePath string) error { engine.Logger.Err(fmt.Sprintf(" Failed sending CDR, %+v, error: %s", storedCdr, err.Error())) } else if reply != "OK" { engine.Logger.Err(fmt.Sprintf(" Received unexpected reply for CDR, %+v, reply: %s", storedCdr, reply)) + } else { + cdrsPosted += 1 } } } @@ -288,7 +291,7 @@ func (self *Cdrc) processFile(filePath string) error { engine.Logger.Err(err.Error()) return err } - engine.Logger.Info(fmt.Sprintf("Finished processing %s, moved to %s. Total records processed: %d, run duration: %s", - fn, newPath, procRowNr, time.Now().Sub(timeStart))) + engine.Logger.Info(fmt.Sprintf("Finished processing %s, moved to %s. Total records processed: %d, CDRs posted: %d, run duration: %s", + fn, newPath, procRowNr, cdrsPosted, time.Now().Sub(timeStart))) return nil } diff --git a/cdrc/fwv.go b/cdrc/fwv.go index 8ec7c2c31..5f97d106f 100644 --- a/cdrc/fwv.go +++ b/cdrc/fwv.go @@ -115,13 +115,12 @@ func (self *FwvRecordsProcessor) ProcessNextRecord() ([]*engine.StoredCdr, error engine.Logger.Err(fmt.Sprintf(" Could not read complete line, have instead: %s", string(buf))) return nil, io.EOF } + record := string(buf) for cfgKey := range self.cdrcCfgs { - filterBreak := false - // ToDo: Field filters - if filterBreak { // Stop importing cdrc fields profile due to non matching filter + if passes := self.recordPassesCfgFilter(record, cfgKey); !passes { continue } - if storedCdr, err := self.recordToStoredCdr(string(buf), cfgKey); err != nil { + if storedCdr, err := self.recordToStoredCdr(record, cfgKey); err != nil { return nil, fmt.Errorf("Failed converting to StoredCdr, error: %s", err.Error()) } else { recordCdrs = append(recordCdrs, storedCdr) @@ -130,6 +129,24 @@ func (self *FwvRecordsProcessor) ProcessNextRecord() ([]*engine.StoredCdr, error return recordCdrs, nil } +func (self *FwvRecordsProcessor) recordPassesCfgFilter(record, configKey string) bool { + filterPasses := true + for _, rsrFilter := range self.cdrcCfgs[configKey].CdrFilter { + if rsrFilter == nil { // Nil filter does not need to match anything + continue + } + if cfgFieldIdx, _ := strconv.Atoi(rsrFilter.Id); len(record) <= cfgFieldIdx { + fmt.Errorf("Ignoring record: %v - cannot compile filter %+v", record, rsrFilter) + return false + } else if !rsrFilter.FilterPasses(record[cfgFieldIdx:]) { + fmt.Printf("Record content to test: %s\n", record[cfgFieldIdx:]) + filterPasses = false + break + } + } + return filterPasses +} + // Converts a record (header or normal) to StoredCdr func (self *FwvRecordsProcessor) recordToStoredCdr(record string, cfgKey string) (*engine.StoredCdr, error) { var err error diff --git a/cdrc/fwv_test.go b/cdrc/fwv_test.go index 3d4d9cd16..39f7c6a9c 100644 --- a/cdrc/fwv_test.go +++ b/cdrc/fwv_test.go @@ -19,6 +19,8 @@ along with this program. If not, see package cdrc import ( + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" "testing" ) @@ -39,3 +41,15 @@ func TestFwvValue(t *testing.T) { } } + +func TestFwvRecordPassesCfgFilter(t *testing.T) { + //record, configKey string) bool { + cgrConfig, _ := config.NewDefaultCGRConfig() + cdrcConfig := cgrConfig.CdrcProfiles["/var/log/cgrates/cdrc/in"][utils.META_DEFAULT] // We don't really care that is for .csv since all we want to test are the filters + cdrcConfig.CdrFilter = utils.ParseRSRFieldsMustCompile(`~52:s/^0(\d{9})/+49${1}/(+49123123120)`, utils.INFIELD_SEP) + fwvRp := &FwvRecordsProcessor{cdrcCfgs: cgrConfig.CdrcProfiles["/var/log/cgrates/cdrc/in"]} + cdrLine := "CDR0000010 0 20120708181506000123451234 0040123123120 004 000018009980010001ISDN ABC 10Buiten uw regio EHV 00000009190000000009" + if passesFilter := fwvRp.recordPassesCfgFilter(cdrLine, utils.META_DEFAULT); !passesFilter { + t.Error("Not passes filter") + } +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index ac337890c..e02affec2 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -504,7 +504,6 @@ func main() { } server.RpcRegisterName("ScribeV1", scribeServer) } - if cfg.UserServerEnabled { userServer, err = engine.NewUserMap(ratingDb) if err != nil { @@ -577,9 +576,10 @@ func main() { }() wg.Wait() - responder := engine.NewResponder(exitChan, nil, cdrStats, 10*time.Minute, cfg.ResponseCacheTTL) - apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats} - apierRpcV2 := &v2.ApierV2{ApierV1: v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats}} + responder := &engine.Responder{ExitChan: exitChan, Stats: cdrStats} + apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats, Users: userServer} + apierRpcV2 := &v2.ApierV2{ + ApierV1: v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats, Users: userServer}} if cfg.RaterEnabled && !cfg.BalancerEnabled && cfg.RaterBalancer != utils.INTERNAL { engine.Logger.Info("Registering Rater service") diff --git a/config/config.go b/config/config.go index 9af500d8c..c48fcbcf4 100644 --- a/config/config.go +++ b/config/config.go @@ -566,6 +566,15 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { if jsnRaterCfg.Cdrstats != nil { self.RaterCdrStats = *jsnRaterCfg.Cdrstats } + if jsnRaterCfg.Historys != nil { + self.RaterHistoryServer = *jsnRaterCfg.Historys + } + if jsnRaterCfg.Pubsubs != nil { + self.RaterPubSubServer = *jsnRaterCfg.Pubsubs + } + if jsnRaterCfg.Users != nil { + self.RaterUserServer = *jsnRaterCfg.Users + } } if jsnBalancerCfg != nil && jsnBalancerCfg.Enabled != nil { diff --git a/config/config_json_test.go b/config/config_json_test.go index 5cce72811..d5a7749fe 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -122,7 +122,8 @@ func TestDfBalancerJsonCfg(t *testing.T) { } func TestDfRaterJsonCfg(t *testing.T) { - eCfg := &RaterJsonCfg{Enabled: utils.BoolPointer(false), Balancer: utils.StringPointer(""), Cdrstats: utils.StringPointer("")} + eCfg := &RaterJsonCfg{Enabled: utils.BoolPointer(false), Balancer: utils.StringPointer(""), Cdrstats: utils.StringPointer(""), + Historys: utils.StringPointer(""), Pubsubs: utils.StringPointer(""), Users: utils.StringPointer("")} if cfg, err := dfCgrJsonCfg.RaterJsonCfg(); err != nil { t.Error(err) } else if !reflect.DeepEqual(eCfg, cfg) { diff --git a/config/libconfig_json.go b/config/libconfig_json.go index dd8b4db32..2e208f5be 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -62,6 +62,9 @@ type RaterJsonCfg struct { Enabled *bool Balancer *string Cdrstats *string + Historys *string + Pubsubs *string + Users *string } // Scheduler config section diff --git a/data/conf/samples/tutlocal/cgrates.json b/data/conf/samples/tutlocal/cgrates.json index 21b99b52a..d4567a697 100644 --- a/data/conf/samples/tutlocal/cgrates.json +++ b/data/conf/samples/tutlocal/cgrates.json @@ -13,6 +13,7 @@ "rater": { "enabled": true, // enable Rater service: "cdrstats": "internal", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234> + "users": "internal", }, "scheduler": { @@ -30,4 +31,8 @@ "enabled": true, // starts the cdrstats service: }, +"users": { + "enabled": true, +}, + } diff --git a/data/storage/mysql/alter_cdr_tables_rc5_rc6.sql b/data/storage/mysql/alter_cdr_tables_rc5_rc6.sql new file mode 100644 index 000000000..8b5694795 --- /dev/null +++ b/data/storage/mysql/alter_cdr_tables_rc5_rc6.sql @@ -0,0 +1,50 @@ +USE `cgrates`; + +ALTER TABLE `cdrs_primary` + CHANGE COLUMN tbid `id` int(11) NOT NULL auto_increment first , + CHANGE `cgrid` `cgrid` char(40) NOT NULL after `id` , + ADD COLUMN `pdd` decimal(12,9) NOT NULL after `setup_time` , + CHANGE `answer_time` `answer_time` datetime NOT NULL after `pdd` , + ADD COLUMN `supplier` varchar(128) NOT NULL after `usage` , + ADD COLUMN `disconnect_cause` varchar(64) NOT NULL after `supplier` , + ADD COLUMN `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP after `disconnect_cause` , + ADD COLUMN `deleted_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' after `created_at` , + ADD KEY `answer_time_idx`(`answer_time`) , + ADD KEY `deleted_at_idx`(`deleted_at`) , + DROP KEY `PRIMARY`, ADD PRIMARY KEY(`id`) ; + +ALTER TABLE `cdrs_extra` + CHANGE COLUMN tbid `id` int(11) NOT NULL auto_increment first , + CHANGE `cgrid` `cgrid` char(40) NOT NULL after `id` , + ADD COLUMN `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP after `extra_fields` , + ADD COLUMN `deleted_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' after `created_at`, + ADD UNIQUE KEY `cgrid`(`cgrid`) , + ADD KEY `deleted_at_idx`(`deleted_at`) , + DROP KEY `PRIMARY`, ADD PRIMARY KEY(`id`) ; + +ALTER TABLE `cost_details` + CHANGE COLUMN tbid `id` int(11) NOT NULL auto_increment first , + CHANGE `cost_source` `cost_source` varchar(64) NOT NULL after `timespans` , + ADD COLUMN `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP after `cost_source` , + ADD COLUMN `updated_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' after `created_at` , + ADD COLUMN `deleted_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' after `updated_at` , + DROP COLUMN `cost_time` , + ADD KEY `deleted_at_idx`(`deleted_at`) , + DROP KEY `PRIMARY`, ADD PRIMARY KEY(`id`) ; + +ALTER TABLE `rated_cdrs` + CHANGE COLUMN tbid `id` int(11) NOT NULL auto_increment first , + CHANGE `cgrid` `cgrid` char(40) NOT NULL after `id` , + CHANGE `category` `category` varchar(32) NOT NULL after `tenant` , + ADD COLUMN `pdd` decimal(12,9) NOT NULL after `setup_time` , + CHANGE `answer_time` `answer_time` datetime NOT NULL after `pdd` , + ADD COLUMN `supplier` varchar(128) NOT NULL after `usage` , + ADD COLUMN `disconnect_cause` varchar(64) NOT NULL after `supplier` , + CHANGE `cost` `cost` decimal(20,4) NULL after `disconnect_cause` , + ADD COLUMN `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP after `extra_info` , + ADD COLUMN `updated_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' after `created_at` , + ADD COLUMN `deleted_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' after `updated_at` , + DROP COLUMN `mediation_time` , + ADD KEY `deleted_at_idx`(`deleted_at`) , + DROP KEY `PRIMARY`, ADD PRIMARY KEY(`id`) ; + diff --git a/data/storage/mysql/alter_tables_rc4_rc5.sql b/data/storage/mysql/alter_tables_rc4_rc5.sql deleted file mode 100644 index 1a54a26f1..000000000 --- a/data/storage/mysql/alter_tables_rc4_rc5.sql +++ /dev/null @@ -1,48 +0,0 @@ -ALTER TABLE cdrs_primary - CHANGE COLUMN tor category varchar(16) NOT NULL, - CHANGE COLUMN duration `usage` bigint(20) NOT NULL, - ADD COLUMN tor varchar(16) NOT NULL AFTER cgrid; - -UPDATE cdrs_primary SET tor="*voice"; - -ALTER TABLE cost_details - DROP COLUMN accid, - MODIFY COLUMN cost_time datetime NOT NULL AFTER tbid, - CHANGE COLUMN `source` cost_source varchar(64) NOT NULL AFTER cost_time, - MODIFY COLUMN runid varchar(64) NOT NULL AFTER cgrid, - CHANGE COLUMN tor category varchar(32) NOT NULL AFTER tenant, - ADD COLUMN tor varchar(16) NOT NULL after runid, - MODIFY COLUMN direction varchar(8) NOT NULL AFTER tor; - -UPDATE cost_details SET tor="*voice"; - -ALTER TABLE rated_cdrs - MODIFY COLUMN mediation_time datetime NOT NULL AFTER tbid, - MODIFY COLUMN subject varchar(128) NOT NULL, - ADD COLUMN reqtype varchar(24) NOT NULL AFTER runid, - ADD COLUMN direction varchar(8) NOT NULL AFTER reqtype, - ADD COLUMN tenant varchar(64) NOT NULL AFTER direction, - ADD COLUMN category varchar(16) NOT NULL AFTER tenant, - ADD COLUMN account varchar(128) NOT NULL AFTER category, - ADD COLUMN destination varchar(128) NOT NULL AFTER subject, - ADD COLUMN setup_time datetime NOT NULL AFTER destination, - ADD COLUMN answer_time datetime NOT NULL AFTER setup_time, - ADD COLUMN `usage` bigint(20) NOT NULL AFTER answer_time; - -ALTER TABLE tp_rates - DROP COLUMN rounding_method, - DROP COLUMN rounding_decimals; - -ALTER TABLE tp_destination_rates - ADD COLUMN rounding_method varchar(255) NOT NULL, - ADD COLUMN rounding_decimals tinyint(4) NOT NULL; - -ALTER TABLE tp_rating_profiles - DROP KEY tpid_loadid_tenant_tor_dir_subj_atime, - CHANGE COLUMN tor category varchar(16) NOT NULL, - ADD UNIQUE KEY `tpid_loadid_tenant_category_dir_subj_atime` (`tpid`,`loadid`,`tenant`,`category`,`direction`,`subject`,`activation_time`); - - - - - diff --git a/data/storage/mysql/alter_tables_rc5_rc6.sql b/data/storage/mysql/alter_tables_rc5_rc6.sql deleted file mode 100644 index 1b315f21c..000000000 --- a/data/storage/mysql/alter_tables_rc5_rc6.sql +++ /dev/null @@ -1,405 +0,0 @@ -USE `cgrates`; - -ALTER TABLE `cdrs_primary` - CHANGE COLUMN tbid `id` int(11) NOT NULL auto_increment first , - CHANGE `cgrid` `cgrid` char(40) NOT NULL after `id` , - ADD COLUMN `pdd` decimal(12,9) NOT NULL after `setup_time` , - CHANGE `answer_time` `answer_time` datetime NOT NULL after `pdd` , - ADD COLUMN `supplier` varchar(128) NOT NULL after `usage` , - ADD COLUMN `disconnect_cause` varchar(64) NOT NULL after `supplier` , - ADD COLUMN `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP after `disconnect_cause` , - ADD COLUMN `deleted_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' after `created_at` , - ADD KEY `answer_time_idx`(`answer_time`) , - ADD KEY `deleted_at_idx`(`deleted_at`) , - DROP KEY `PRIMARY`, ADD PRIMARY KEY(`id`) ; - -ALTER TABLE `cdrs_extra` - CHANGE COLUMN tbid `id` int(11) NOT NULL auto_increment first , - CHANGE `cgrid` `cgrid` char(40) NOT NULL after `id` , - ADD COLUMN `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP after `extra_fields` , - ADD COLUMN `deleted_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' after `created_at`, - ADD UNIQUE KEY `cgrid`(`cgrid`) , - ADD KEY `deleted_at_idx`(`deleted_at`) , - DROP KEY `PRIMARY`, ADD PRIMARY KEY(`id`) ; - -ALTER TABLE `cost_details` - CHANGE COLUMN tbid `id` int(11) NOT NULL auto_increment first , - CHANGE `cost_source` `cost_source` varchar(64) NOT NULL after `timespans` , - ADD COLUMN `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP after `cost_source` , - ADD COLUMN `updated_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' after `created_at` , - ADD COLUMN `deleted_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' after `updated_at` , - DROP COLUMN `cost_time` , - ADD KEY `deleted_at_idx`(`deleted_at`) , - DROP KEY `PRIMARY`, ADD PRIMARY KEY(`id`) ; - -ALTER TABLE `rated_cdrs` - CHANGE COLUMN tbid `id` int(11) NOT NULL auto_increment first , - CHANGE `cgrid` `cgrid` char(40) NOT NULL after `id` , - CHANGE `category` `category` varchar(32) NOT NULL after `tenant` , - ADD COLUMN `pdd` decimal(12,9) NOT NULL after `setup_time` , - CHANGE `answer_time` `answer_time` datetime NOT NULL after `pdd` , - ADD COLUMN `supplier` varchar(128) NOT NULL after `usage` , - ADD COLUMN `disconnect_cause` varchar(64) NOT NULL after `supplier` , - CHANGE `cost` `cost` decimal(20,4) NULL after `disconnect_cause` , - ADD COLUMN `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP after `extra_info` , - ADD COLUMN `updated_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' after `created_at` , - ADD COLUMN `deleted_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' after `updated_at` , - DROP COLUMN `mediation_time` , - ADD KEY `deleted_at_idx`(`deleted_at`) , - DROP KEY `PRIMARY`, ADD PRIMARY KEY(`id`) ; - - --- --- Table structure for table `tp_timings` --- -DROP TABLE IF EXISTS `tp_timings`; -CREATE TABLE `tp_timings` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `tag` varchar(64) NOT NULL, - `years` varchar(255) NOT NULL, - `months` varchar(255) NOT NULL, - `month_days` varchar(255) NOT NULL, - `week_days` varchar(255) NOT NULL, - `time` varchar(32) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`), - KEY `tpid_tmid` (`tpid`,`tag`), - UNIQUE KEY `tpid_tag` (`tpid`,`tag`) -); - --- --- Table structure for table `tp_destinations` --- - -DROP TABLE IF EXISTS `tp_destinations`; -CREATE TABLE `tp_destinations` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `tag` varchar(64) NOT NULL, - `prefix` varchar(24) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`), - KEY `tpid_dstid` (`tpid`,`tag`), - UNIQUE KEY `tpid_dest_prefix` (`tpid`,`tag`,`prefix`) -); - --- --- Table structure for table `tp_rates` --- - -DROP TABLE IF EXISTS `tp_rates`; -CREATE TABLE `tp_rates` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `tag` varchar(64) NOT NULL, - `connect_fee` decimal(7,4) NOT NULL, - `rate` decimal(7,4) NOT NULL, - `rate_unit` varchar(16) NOT NULL, - `rate_increment` varchar(16) NOT NULL, - `group_interval_start` varchar(16) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - UNIQUE KEY `unique_tprate` (`tpid`,`tag`,`group_interval_start`), - KEY `tpid` (`tpid`), - KEY `tpid_rtid` (`tpid`,`tag`) -); - --- --- Table structure for table `destination_rates` --- - -DROP TABLE IF EXISTS `tp_destination_rates`; -CREATE TABLE `tp_destination_rates` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `tag` varchar(64) NOT NULL, - `destinations_tag` varchar(64) NOT NULL, - `rates_tag` varchar(64) NOT NULL, - `rounding_method` varchar(255) NOT NULL, - `rounding_decimals` tinyint(4) NOT NULL, - `max_cost` decimal(7,4) NOT NULL, - `max_cost_strategy` varchar(16) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`), - KEY `tpid_drid` (`tpid`,`tag`), - UNIQUE KEY `tpid_drid_dstid` (`tpid`,`tag`,`destinations_tag`) -); - --- --- Table structure for table `tp_rating_plans` --- - -DROP TABLE IF EXISTS `tp_rating_plans`; -CREATE TABLE `tp_rating_plans` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `tag` varchar(64) NOT NULL, - `destrates_tag` varchar(64) NOT NULL, - `timing_tag` varchar(64) NOT NULL, - `weight` DECIMAL(8,2) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`), - KEY `tpid_rpl` (`tpid`,`tag`), - UNIQUE KEY `tpid_rplid_destrates_timings_weight` (`tpid`,`tag`,`destrates_tag`,`timing_tag`) -); - --- --- Table structure for table `tp_rate_profiles` --- - -DROP TABLE IF EXISTS `tp_rating_profiles`; -CREATE TABLE `tp_rating_profiles` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `loadid` varchar(64) NOT NULL, - `direction` varchar(8) NOT NULL, - `tenant` varchar(64) NOT NULL, - `category` varchar(32) NOT NULL, - `subject` varchar(64) NOT NULL, - `activation_time` varchar(24) NOT NULL, - `rating_plan_tag` varchar(64) NOT NULL, - `fallback_subjects` varchar(64), - `cdr_stat_queue_ids` varchar(64), - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`), - KEY `tpid_loadid` (`tpid`, `loadid`), - UNIQUE KEY `tpid_loadid_tenant_category_dir_subj_atime` (`tpid`,`loadid`, `tenant`,`category`,`direction`,`subject`,`activation_time`) -); - --- --- Table structure for table `tp_shared_groups` --- - -DROP TABLE IF EXISTS `tp_shared_groups`; -CREATE TABLE `tp_shared_groups` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `tag` varchar(64) NOT NULL, - `account` varchar(24) NOT NULL, - `strategy` varchar(24) NOT NULL, - `rating_subject` varchar(24) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`), - UNIQUE KEY `unique_shared_group` (`tpid`,`tag`,`account`,`strategy`,`rating_subject`) -); - --- --- Table structure for table `tp_actions` --- - -DROP TABLE IF EXISTS `tp_actions`; -CREATE TABLE `tp_actions` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `tag` varchar(64) NOT NULL, - `action` varchar(24) NOT NULL, - `balance_tag` varchar(64) NOT NULL, - `balance_type` varchar(24) NOT NULL, - `direction` varchar(8) NOT NULL, - `units` DECIMAL(20,4) NOT NULL, - `expiry_time` varchar(24) NOT NULL, - `timing_tags` varchar(128) NOT NULL, - `destination_tags` varchar(64) NOT NULL, - `rating_subject` varchar(64) NOT NULL, - `category` varchar(32) NOT NULL, - `shared_group` varchar(64) NOT NULL, - `balance_weight` DECIMAL(8,2) NOT NULL, - `extra_parameters` varchar(256) NOT NULL, - `weight` DECIMAL(8,2) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`), - UNIQUE KEY `unique_action` (`tpid`,`tag`,`action`,`balance_tag`,`balance_type`,`direction`,`expiry_time`,`timing_tags`,`destination_tags`,`shared_group`,`balance_weight`,`weight`) -); - --- --- Table structure for table `tp_action_timings` --- - -DROP TABLE IF EXISTS `tp_action_plans`; -CREATE TABLE `tp_action_plans` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `tag` varchar(64) NOT NULL, - `actions_tag` varchar(64) NOT NULL, - `timing_tag` varchar(64) NOT NULL, - `weight` DECIMAL(8,2) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`), - UNIQUE KEY `unique_action_schedule` (`tpid`,`tag`,`actions_tag`) -); - --- --- Table structure for table `tp_action_triggers` --- - -DROP TABLE IF EXISTS `tp_action_triggers`; -CREATE TABLE `tp_action_triggers` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `tag` varchar(64) NOT NULL, - `unique_id` varchar(64) NOT NULL, - `balance_tag` varchar(64) NOT NULL, - `balance_type` varchar(24) NOT NULL, - `balance_direction` varchar(8) NOT NULL, - `threshold_type` char(12) NOT NULL, - `threshold_value` DECIMAL(20,4) NOT NULL, - `recurrent` BOOLEAN NOT NULL, - `min_sleep` varchar(16) NOT NULL, - `balance_destination_tags` varchar(64) NOT NULL, - `balance_weight` DECIMAL(8,2) NOT NULL, - `balance_expiry_time` varchar(24) NOT NULL, - `balance_timing_tags` varchar(128) NOT NULL, - `balance_rating_subject` varchar(64) NOT NULL, - `balance_category` varchar(32) NOT NULL, - `balance_shared_group` varchar(64) NOT NULL, - `min_queued_items` int(11) NOT NULL, - `actions_tag` varchar(64) NOT NULL, - `weight` DECIMAL(8,2) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`), - UNIQUE KEY `unique_trigger_definition` (`tpid`,`tag`,`balance_tag`,`balance_type`,`balance_direction`,`threshold_type`,`threshold_value`,`balance_destination_tags`,`actions_tag`) -); - --- --- Table structure for table `tp_account_actions` --- - -DROP TABLE IF EXISTS `tp_account_actions`; -CREATE TABLE `tp_account_actions` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `loadid` varchar(64) NOT NULL, - `tenant` varchar(64) NOT NULL, - `account` varchar(64) NOT NULL, - `direction` varchar(8) NOT NULL, - `action_plan_tag` varchar(64), - `action_triggers_tag` varchar(64), - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`), - UNIQUE KEY `unique_tp_account` (`tpid`,`loadid`,`tenant`,`account`,`direction`) -); - --- --- Table structure for table `tp_lcr_rules` --- - -DROP TABLE IF EXISTS tp_lcr_rules; -CREATE TABLE tp_lcr_rules ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `direction` varchar(8) NOT NULL, - `tenant` varchar(64) NOT NULL, - `category` varchar(32) NOT NULL, - `account` varchar(24) NOT NULL, - `subject` varchar(64) NOT NULL, - `destination_tag` varchar(64) NOT NULL, - `rp_category` varchar(32) NOT NULL, - `strategy` varchar(16) NOT NULL, - `strategy_params` varchar(256) NOT NULL, - `activation_time` varchar(24) NOT NULL, - `weight` DECIMAL(8,2) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`) -); - --- --- Table structure for table `tp_derived_chargers` --- - -DROP TABLE IF EXISTS tp_derived_chargers; -CREATE TABLE tp_derived_chargers ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `loadid` varchar(64) NOT NULL, - `direction` varchar(8) NOT NULL, - `tenant` varchar(64) NOT NULL, - `category` varchar(32) NOT NULL, - `account` varchar(24) NOT NULL, - `subject` varchar(64) NOT NULL, - `runid` varchar(24) NOT NULL, - `run_filters` varchar(256) NOT NULL, - `req_type_field` varchar(24) NOT NULL, - `direction_field` varchar(24) NOT NULL, - `tenant_field` varchar(24) NOT NULL, - `category_field` varchar(24) NOT NULL, - `account_field` varchar(24) NOT NULL, - `subject_field` varchar(24) NOT NULL, - `destination_field` varchar(24) NOT NULL, - `setup_time_field` varchar(24) NOT NULL, - `pdd_field` varchar(24) NOT NULL, - `answer_time_field` varchar(24) NOT NULL, - `usage_field` varchar(24) NOT NULL, - `supplier_field` varchar(24) NOT NULL, - `disconnect_cause_field` varchar(24) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`) -); - - --- --- Table structure for table `tp_cdr_stats` --- - -DROP TABLE IF EXISTS tp_cdr_stats; -CREATE TABLE tp_cdr_stats ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `tag` varchar(64) NOT NULL, - `queue_length` int(11) NOT NULL, - `time_window` varchar(8) NOT NULL, - `save_interval` varchar(8) NOT NULL, - `metrics` varchar(64) NOT NULL, - `setup_interval` varchar(64) NOT NULL, - `tors` varchar(64) NOT NULL, - `cdr_hosts` varchar(64) NOT NULL, - `cdr_sources` varchar(64) NOT NULL, - `req_types` varchar(64) NOT NULL, - `directions` varchar(8) NOT NULL, - `tenants` varchar(64) NOT NULL, - `categories` varchar(32) NOT NULL, - `accounts` varchar(24) NOT NULL, - `subjects` varchar(64) NOT NULL, - `destination_prefixes` varchar(64) NOT NULL, - `pdd_interval` varchar(64) NOT NULL, - `usage_interval` varchar(64) NOT NULL, - `suppliers` varchar(64) NOT NULL, - `disconnect_causes` varchar(64) NOT NULL, - `mediation_runids` varchar(64) NOT NULL, - `rated_accounts` varchar(64) NOT NULL, - `rated_subjects` varchar(64) NOT NULL, - `cost_interval` varchar(24) NOT NULL, - `action_triggers` varchar(64) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`) -); - - -- --- Table structure for table `tp_users` --- - -DROP TABLE IF EXISTS tp_users; -CREATE TABLE tp_users ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `tpid` varchar(64) NOT NULL, - `tenant` varchar(64) NOT NULL, - `user_name` varchar(64) NOT NULL, - `attribute_name` varchar(64) NOT NULL, - `attribute_value` varchar(64) NOT NULL, - `created_at` TIMESTAMP, - PRIMARY KEY (`id`), - KEY `tpid` (`tpid`) -); \ No newline at end of file diff --git a/data/tariffplans/tutorial/Users.csv b/data/tariffplans/tutorial/Users.csv index df4057753..75936aa6a 100644 --- a/data/tariffplans/tutorial/Users.csv +++ b/data/tariffplans/tutorial/Users.csv @@ -1,4 +1,6 @@ #Tenant[0],UserName[1],AttributeName[2],AttributeValue[3] -cgrates.org,1001,test0,val0 -cgrates.org,1001,test1,val1 -cgrates.org,1002,another,value +cgrates.org,1001,SysUserName,danb +cgrates.org,1001,SysPassword,hisPass321 +cgrates.org,1001,Cli,+4986517174963 +cgrates.org,1002,SysUserName,rif +cgrates.org,1002,RifAttr,RifVal diff --git a/docs/conf.py b/docs/conf.py index 4ba2ebf99..6ce2b9cdf 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -50,7 +50,7 @@ copyright = u'2012-2015, ITsysCOM' # The short X.Y version. version = '0.9.1' # The full version, including alpha/beta/rc tags. -release = '0.9.1rc4' +release = '0.9.1~rc6' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/engine/calldesc.go b/engine/calldesc.go index e2739d44e..c3725f183 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -132,7 +132,8 @@ type CallDescriptor struct { FallbackSubject string // the subject to check for destination if not found on primary subject RatingInfos RatingInfos Increments Increments - TOR string // used unit balances selector + TOR string // used unit balances selector + ExtraFields map[string]string // Extra fields, mostly used for user profile matching // session limits MaxRate float64 MaxRateUnit time.Duration @@ -795,6 +796,7 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) { if lcrCost.Entry == nil { return lcrCost, nil } + //log.Printf("Entry: %+v", lcrCost.Entry) if lcrCost.Entry.Strategy == LCR_STRATEGY_STATIC { for _, supplier := range lcrCost.Entry.GetParams() { @@ -865,7 +867,7 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) { accNeverConsidered := true tccNeverConsidered := true ddcNeverConsidered := true - if utils.IsSliceMember([]string{LCR_STRATEGY_QOS, LCR_STRATEGY_QOS_THRESHOLD}, lcrCost.Entry.Strategy) { + if utils.IsSliceMember([]string{LCR_STRATEGY_QOS, LCR_STRATEGY_QOS_THRESHOLD, LCR_STRATEGY_LOAD}, lcrCost.Entry.Strategy) { if stats == nil { lcrCost.SupplierCosts = append(lcrCost.SupplierCosts, &LCRSupplierCost{ Supplier: supplier, @@ -891,60 +893,84 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) { } } } + statsErr := false + var supplierQueues []*StatsQueue for _, qId := range cdrStatsQueueIds { - statValues := make(map[string]float64) - if err := stats.GetValues(qId, &statValues); err != nil { - lcrCost.SupplierCosts = append(lcrCost.SupplierCosts, &LCRSupplierCost{ - Supplier: supplier, - Error: fmt.Sprintf("Get stats values for queue id %s, error %s", qId, err.Error()), - }) - statsErr = true - break - } - if asr, exists := statValues[ASR]; exists { - if asr > STATS_NA { - asrValues = append(asrValues, asr) + if lcrCost.Entry.Strategy == LCR_STRATEGY_LOAD { + for _, qId := range cdrStatsQueueIds { + sq := &StatsQueue{} + if err := stats.GetQueue(qId, sq); err == nil { + if sq.conf.QueueLength == 0 { //only add qeues that don't have fixed length + supplierQueues = append(supplierQueues, sq) + } + } } - asrNeverConsidered = false - } - if pdd, exists := statValues[PDD]; exists { - if pdd > STATS_NA { - pddValues = append(pddValues, pdd) + } else { + statValues := make(map[string]float64) + if err := stats.GetValues(qId, &statValues); err != nil { + lcrCost.SupplierCosts = append(lcrCost.SupplierCosts, &LCRSupplierCost{ + Supplier: supplier, + Error: fmt.Sprintf("Get stats values for queue id %s, error %s", qId, err.Error()), + }) + statsErr = true + break } - pddNeverConsidered = false - } - if acd, exists := statValues[ACD]; exists { - if acd > STATS_NA { - acdValues = append(acdValues, acd) + if asr, exists := statValues[ASR]; exists { + if asr > STATS_NA { + asrValues = append(asrValues, asr) + } + asrNeverConsidered = false } - acdNeverConsidered = false - } - if tcd, exists := statValues[TCD]; exists { - if tcd > STATS_NA { - tcdValues = append(tcdValues, tcd) + if pdd, exists := statValues[PDD]; exists { + if pdd > STATS_NA { + pddValues = append(pddValues, pdd) + } + pddNeverConsidered = false } - tcdNeverConsidered = false - } - if acc, exists := statValues[ACC]; exists { - if acc > STATS_NA { - accValues = append(accValues, acc) + if acd, exists := statValues[ACD]; exists { + if acd > STATS_NA { + acdValues = append(acdValues, acd) + } + acdNeverConsidered = false } - accNeverConsidered = false - } - if tcc, exists := statValues[TCC]; exists { - if tcc > STATS_NA { - tccValues = append(tccValues, tcc) + if tcd, exists := statValues[TCD]; exists { + if tcd > STATS_NA { + tcdValues = append(tcdValues, tcd) + } + tcdNeverConsidered = false } - tccNeverConsidered = false - } - if ddc, exists := statValues[TCC]; exists { - if ddc > STATS_NA { - ddcValues = append(ddcValues, ddc) + if acc, exists := statValues[ACC]; exists { + if acc > STATS_NA { + accValues = append(accValues, acc) + } + accNeverConsidered = false } - ddcNeverConsidered = false + if tcc, exists := statValues[TCC]; exists { + if tcc > STATS_NA { + tccValues = append(tccValues, tcc) + } + tccNeverConsidered = false + } + if ddc, exists := statValues[TCC]; exists { + if ddc > STATS_NA { + ddcValues = append(ddcValues, ddc) + } + ddcNeverConsidered = false + } + } } + if lcrCost.Entry.Strategy == LCR_STRATEGY_LOAD { + if len(supplierQueues) > 0 { + lcrCost.SupplierCosts = append(lcrCost.SupplierCosts, &LCRSupplierCost{ + Supplier: supplier, + supplierQueues: supplierQueues, + }) + } + continue // next supplier + } + if statsErr { // Stats error in loop, to go next supplier continue } diff --git a/engine/cdrs.go b/engine/cdrs.go index 5cbd7e63a..5a0ad538e 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -147,6 +147,12 @@ func (self *CdrServer) RateCdrs(cgrIds, runIds, tors, cdrHosts, cdrSources, reqT // Returns error if not able to properly store the CDR, mediation is async since we can always recover offline func (self *CdrServer) processCdr(storedCdr *StoredCdr) (err error) { + if upData, err := LoadUserProfile(storedCdr, "ExtraFields"); err != nil { + return err + } else { + cdrRcv := upData.(*StoredCdr) + *storedCdr = *cdrRcv + } if storedCdr.ReqType == utils.META_NONE { return nil } diff --git a/engine/lcr.go b/engine/lcr.go index ba78df927..543af6e9e 100644 --- a/engine/lcr.go +++ b/engine/lcr.go @@ -20,6 +20,7 @@ package engine import ( "fmt" + "math/rand" "sort" "strconv" "strings" @@ -36,6 +37,13 @@ const ( LCR_STRATEGY_HIGHEST = "*highest_cost" LCR_STRATEGY_QOS_THRESHOLD = "*qos_threshold" LCR_STRATEGY_QOS = "*qos" + LCR_STRATEGY_LOAD = "*load_distribution" + + // used for load distribution sorting + RAND_LIMIT = 99 + LOW_PRIORITY_LIMIT = 100 + MED_PRIORITY_LIMIT = 200 + HIGH_PRIORITY_LIMIT = 300 ) // A request for LCR, used in APIer and SM where we need to expose it @@ -134,12 +142,13 @@ type LCRCost struct { } type LCRSupplierCost struct { - Supplier string - Cost float64 - Duration time.Duration - Error string // Not error due to JSON automatic serialization into struct - QOS map[string]float64 - qosSortParams []string + Supplier string + Cost float64 + Duration time.Duration + Error string // Not error due to JSON automatic serialization into struct + QOS map[string]float64 + qosSortParams []string + supplierQueues []*StatsQueue // used for load distribution } func (lcr *LCR) GetId() string { @@ -291,11 +300,101 @@ func (lc *LCRCost) Sort() { sort.Sort(HighestSupplierCostSorter(lc.SupplierCosts)) case LCR_STRATEGY_QOS: sort.Sort(QOSSorter(lc.SupplierCosts)) + case LCR_STRATEGY_LOAD: + lc.SortLoadDistribution() + sort.Sort(HighestSupplierCostSorter(lc.SupplierCosts)) } } +func (lc *LCRCost) SortLoadDistribution() { + // find the time window that is common to all qeues + scoreBoard := make(map[time.Duration]int) // register TimeWindow across suppliers + + var winnerTimeWindow time.Duration + maxScore := 0 + for _, supCost := range lc.SupplierCosts { + timeWindowFlag := make(map[time.Duration]bool) // flags appearance in same supplier + for _, sq := range supCost.supplierQueues { + if !timeWindowFlag[sq.conf.TimeWindow] { + timeWindowFlag[sq.conf.TimeWindow] = true + scoreBoard[sq.conf.TimeWindow]++ + } + if scoreBoard[sq.conf.TimeWindow] > maxScore { + maxScore = scoreBoard[sq.conf.TimeWindow] + winnerTimeWindow = sq.conf.TimeWindow + } + } + } + supplierQueues := make(map[*LCRSupplierCost]*StatsQueue) + for _, supCost := range lc.SupplierCosts { + for _, sq := range supCost.supplierQueues { + if sq.conf.TimeWindow == winnerTimeWindow { + supplierQueues[supCost] = sq + break + } + } + } + /*for supplier, sq := range supplierQueues { + log.Printf("Useful supplier qeues: %s %v", supplier, sq.conf.TimeWindow) + }*/ + // if all have less than ponder return random order + // if some have a cdr count not divisible by ponder return them first and all ordered by cdr times, oldest first + // if all have a multiple of ponder return in the order of cdr times, oldest first + + // first put them in one of the above categories + for supCost, sq := range supplierQueues { + ponder := lc.GetSupplierPonder(supCost.Supplier) + cdrCount := len(sq.Cdrs) + if cdrCount < ponder { + supCost.Cost = float64(LOW_PRIORITY_LIMIT + rand.Intn(RAND_LIMIT)) + continue + } + if cdrCount%ponder == 0 { + supCost.Cost = float64(MED_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + (time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() / RAND_LIMIT) + continue + } else { + supCost.Cost = float64(HIGH_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + (time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() / RAND_LIMIT) + continue + } + } +} + +// used in load distribution strategy only +// receives a long supplier id and will return the ponder found in strategy params +func (lc *LCRCost) GetSupplierPonder(supplier string) int { + // parse strategy params + ponders := make(map[string]int) + params := strings.Split(lc.Entry.StrategyParams, utils.INFIELD_SEP) + for _, param := range params { + ponderSlice := strings.Split(param, utils.CONCATENATED_KEY_SEP) + if len(ponderSlice) != 2 { + Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams)) + continue + } + p, err := strconv.Atoi(ponderSlice[1]) + if err != nil { + Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams)) + continue + } + ponders[ponderSlice[0]] = p + } + parts := strings.Split(supplier, utils.CONCATENATED_KEY_SEP) + if len(parts) > 0 { + supplierSubject := parts[len(parts)-1] + if ponder, found := ponders[supplierSubject]; found { + return ponder + } + if ponder, found := ponders[utils.META_DEFAULT]; found { + return ponder + } + } + + return 1 +} + func (lc *LCRCost) HasErrors() bool { for _, supplCost := range lc.SupplierCosts { + if len(supplCost.Error) != 0 { return true } diff --git a/engine/lcr_test.go b/engine/lcr_test.go index 0364c4652..f8d12133d 100644 --- a/engine/lcr_test.go +++ b/engine/lcr_test.go @@ -278,3 +278,301 @@ func TestLCRCostSuppliersString(t *testing.T) { t.Errorf("Expecting: %s, received: %s", eSupplStr, supplStr) } } + +func TestLCRCostSuppliersLoad(t *testing.T) { + setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC) + lcrCost := &LCRCost{ + Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:10;dan12:3;*default:7", Weight: 10.0}, + SupplierCosts: []*LCRSupplierCost{ + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:ivo12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 3 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:dan12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:rif12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + }, + }, + }, + } + lcrCost.Sort() + if lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:dan12" { + t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) + } +} + +func TestLCRCostSuppliersLoadAllRounded(t *testing.T) { + setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC) + lcrCost := &LCRCost{ + Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:3;dan12:5;*default:2", Weight: 10.0}, + SupplierCosts: []*LCRSupplierCost{ + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:ivo12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 3 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:dan12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(60 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:rif12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + }, + }, + }, + } + lcrCost.Sort() + if lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:ivo12" || + lcrCost.SupplierCosts[1].Supplier != "*out:tenant12:call:dan12" || + lcrCost.SupplierCosts[2].Supplier != "*out:tenant12:call:rif12" { + t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) + } +} + +func TestLCRCostSuppliersLoadAllOver(t *testing.T) { + setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC) + lcrCost := &LCRCost{ + Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:2;dan12:4;*default:2", Weight: 10.0}, + SupplierCosts: []*LCRSupplierCost{ + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:ivo12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 3 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:dan12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(60 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:rif12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + }, + }, + }, + } + lcrCost.Sort() + if lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:ivo12" || + lcrCost.SupplierCosts[1].Supplier != "*out:tenant12:call:dan12" || + lcrCost.SupplierCosts[2].Supplier != "*out:tenant12:call:rif12" { + t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) + } +} diff --git a/engine/responder.go b/engine/responder.go index cd7da4e29..7cb1c8ebe 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -73,6 +73,12 @@ func (rs *Responder) GetCost(arg *CallDescriptor, reply *CallCost) (err error) { if arg.Subject == "" { arg.Subject = arg.Account } + if upData, err := LoadUserProfile(arg, "ExtraFields"); err != nil { + return err + } else { + udRcv := upData.(*CallDescriptor) + *arg = *udRcv + } if rs.Bal != nil { r, e := rs.getCallCost(arg, "Responder.GetCost") *reply, err = *r, e @@ -93,6 +99,12 @@ func (rs *Responder) Debit(arg *CallDescriptor, reply *CallCost) (err error) { if arg.Subject == "" { arg.Subject = arg.Account } + if upData, err := LoadUserProfile(arg, "ExtraFields"); err != nil { + return err + } else { + udRcv := upData.(*CallDescriptor) + *arg = *udRcv + } if rs.Bal != nil { r, e := rs.getCallCost(arg, "Responder.Debit") *reply, err = *r, e @@ -115,6 +127,12 @@ func (rs *Responder) MaxDebit(arg *CallDescriptor, reply *CallCost) (err error) if arg.Subject == "" { arg.Subject = arg.Account } + if upData, err := LoadUserProfile(arg, "ExtraFields"); err != nil { + return err + } else { + udRcv := upData.(*CallDescriptor) + *arg = *udRcv + } if rs.Bal != nil { r, e := rs.getCallCost(arg, "Responder.MaxDebit") *reply, err = *r, e @@ -144,6 +162,12 @@ func (rs *Responder) RefundIncrements(arg *CallDescriptor, reply *float64) (err if arg.Subject == "" { arg.Subject = arg.Account } + if upData, err := LoadUserProfile(arg, "ExtraFields"); err != nil { + return err + } else { + udRcv := upData.(*CallDescriptor) + *arg = *udRcv + } if rs.Bal != nil { *reply, err = rs.callMethod(arg, "Responder.RefundIncrements") } else { @@ -163,6 +187,12 @@ func (rs *Responder) GetMaxSessionTime(arg *CallDescriptor, reply *float64) (err if arg.Subject == "" { arg.Subject = arg.Account } + if upData, err := LoadUserProfile(arg, "ExtraFields"); err != nil { + return err + } else { + udRcv := upData.(*CallDescriptor) + *arg = *udRcv + } if rs.Bal != nil { *reply, err = rs.callMethod(arg, "Responder.GetMaxSessionTime") } else { @@ -174,11 +204,17 @@ func (rs *Responder) GetMaxSessionTime(arg *CallDescriptor, reply *float64) (err // Returns MaxSessionTime for an event received in SessionManager, considering DerivedCharging for it func (rs *Responder) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) error { + if rs.Bal != nil { + return errors.New("unsupported method on the balancer") + } if ev.Subject == "" { ev.Subject = ev.Account } - if rs.Bal != nil { - return errors.New("unsupported method on the balancer") + if upData, err := LoadUserProfile(ev, "ExtraFields"); err != nil { + return err + } else { + udRcv := upData.(*StoredCdr) + *ev = *udRcv } maxCallDuration := -1.0 attrsDC := &utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT), @@ -243,15 +279,17 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) err // Used by SM to get all the prepaid CallDescriptors attached to a session func (rs *Responder) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error { - if item, err := rs.getCache().Get(utils.GET_SESS_RUNS_CACHE_PREFIX + ev.CgrId); err == nil && item != nil { - *sRuns = *(item.Value.(*[]*SessionRun)) - return item.Err + if rs.Bal != nil { + return errors.New("Unsupported method on the balancer") } if ev.Subject == "" { ev.Subject = ev.Account } - if rs.Bal != nil { - return errors.New("Unsupported method on the balancer") + if upData, err := LoadUserProfile(ev, "ExtraFields"); err != nil { + return err + } else { + udRcv := upData.(*StoredCdr) + *ev = *udRcv } attrsDC := &utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT), Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)} @@ -308,6 +346,12 @@ func (rs *Responder) ProcessCdr(cdr *StoredCdr, reply *string) error { if rs.CdrSrv == nil { return errors.New("CDR_SERVER_NOT_RUNNING") } + if upData, err := LoadUserProfile(cdr, "ExtraFields"); err != nil { + return err + } else { + udRcv := upData.(*StoredCdr) + *cdr = *udRcv + } if err := rs.CdrSrv.ProcessCdr(cdr); err != nil { return err } @@ -344,6 +388,12 @@ func (rs *Responder) GetLCR(cd *CallDescriptor, reply *LCRCost) error { if cd.Subject == "" { cd.Subject = cd.Account } + if upData, err := LoadUserProfile(cd, "ExtraFields"); err != nil { + return err + } else { + udRcv := upData.(*CallDescriptor) + *cd = *udRcv + } lcrCost, err := cd.GetLCR(rs.Stats) if err != nil { return err diff --git a/engine/responder_test.go b/engine/responder_test.go index c0ad5ecd4..68d5054a7 100644 --- a/engine/responder_test.go +++ b/engine/responder_test.go @@ -354,7 +354,16 @@ func TestGetLCR(t *testing.T) { }, }, } - for _, lcr := range []*LCR{lcrStatic, lcrLowestCost, lcrQosThreshold, lcrQos} { + lcrLoad := &LCR{Direction: utils.OUT, Tenant: "tenant12", Category: "call_load", Account: utils.ANY, Subject: utils.ANY, + Activations: []*LCRActivation{ + &LCRActivation{ + ActivationTime: time.Date(2015, 01, 01, 8, 0, 0, 0, time.UTC), + Entries: []*LCREntry{ + &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:10;dan12:3", Weight: 10.0}}, + }, + }, + } + for _, lcr := range []*LCR{lcrStatic, lcrLowestCost, lcrQosThreshold, lcrQos, lcrLoad} { if err := ratingStorage.SetLCR(lcr); err != nil { t.Error(err) } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 20075cb5c..2900c645d 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -23,6 +23,7 @@ import ( "compress/zlib" "errors" "fmt" + "log" "strings" "github.com/cgrates/cgrates/cache2go" @@ -430,10 +431,10 @@ func (rs *RedisStorage) RemoveRpAliases(tenantRtSubjects []*TenantRatingSubject, if tntRSubj.Subject != alias { continue } - cache2go.RemKey(key) if _, err = rs.db.Del(key); err != nil { return err } + cache2go.RemKey(key) break } } @@ -553,10 +554,12 @@ func (rs *RedisStorage) RemoveAccAliases(tenantAccounts []*TenantAccount, skipCa if tntAcnt.Account != alias { continue } - cache2go.RemKey(key) if _, err = rs.db.Del(key); err != nil { + log.Print("") return err } + cache2go.RemKey(key) + break } } @@ -580,6 +583,7 @@ func (rs *RedisStorage) RemoveAccAliases(tenantAccounts []*TenantAccount, skipCa return err } cache2go.RemKey(utils.ACC_ALIAS_PREFIX + key) + break } } } diff --git a/engine/storedcdr.go b/engine/storedcdr.go index 700d56295..a7968dd9b 100644 --- a/engine/storedcdr.go +++ b/engine/storedcdr.go @@ -33,7 +33,7 @@ func NewStoredCdrFromExternalCdr(extCdr *ExternalCdr) (*StoredCdr, error) { var err error storedCdr := &StoredCdr{CgrId: extCdr.CgrId, OrderId: extCdr.OrderId, TOR: extCdr.TOR, AccId: extCdr.AccId, CdrHost: extCdr.CdrHost, CdrSource: extCdr.CdrSource, ReqType: extCdr.ReqType, Direction: extCdr.Direction, Tenant: extCdr.Tenant, Category: extCdr.Category, Account: extCdr.Account, Subject: extCdr.Subject, - Destination: extCdr.Destination, Supplier: extCdr.Supplier, DisconnectCause: extCdr.DisconnectCause, ExtraFields: extCdr.ExtraFields, + Destination: extCdr.Destination, Supplier: extCdr.Supplier, DisconnectCause: extCdr.DisconnectCause, MediationRunId: extCdr.MediationRunId, RatedAccount: extCdr.RatedAccount, RatedSubject: extCdr.RatedSubject, Cost: extCdr.Cost, Rated: extCdr.Rated} if storedCdr.SetupTime, err = utils.ParseTimeDetectLayout(extCdr.SetupTime); err != nil { return nil, err @@ -55,6 +55,12 @@ func NewStoredCdrFromExternalCdr(extCdr *ExternalCdr) (*StoredCdr, error) { return nil, err } } + if extCdr.ExtraFields != nil { + storedCdr.ExtraFields = make(map[string]string) + } + for k, v := range extCdr.ExtraFields { + storedCdr.ExtraFields[k] = v + } return storedCdr, nil } @@ -651,6 +657,7 @@ type UsageRecord struct { SetupTime string AnswerTime string Usage string + ExtraFields map[string]string } func (self *UsageRecord) AsStoredCdr() (*StoredCdr, error) { @@ -666,24 +673,18 @@ func (self *UsageRecord) AsStoredCdr() (*StoredCdr, error) { if storedCdr.Usage, err = utils.ParseDurationWithSecs(self.Usage); err != nil { return nil, err } + if self.ExtraFields != nil { + storedCdr.ExtraFields = make(map[string]string) + } + for k, v := range self.ExtraFields { + storedCdr.ExtraFields[k] = v + } return storedCdr, nil } func (self *UsageRecord) AsCallDescriptor() (*CallDescriptor, error) { var err error - timeStr := self.AnswerTime - if len(timeStr) == 0 { // In case of auth, answer time will not be defined, so take it out of setup one - timeStr = self.SetupTime - } - startTime, err := utils.ParseTimeDetectLayout(timeStr) - if err != nil { - return nil, err - } - usage, err := utils.ParseDurationWithSecs(self.Usage) - if err != nil { - return nil, err - } - return &CallDescriptor{ + cd := &CallDescriptor{ TOR: self.TOR, Direction: self.Direction, Tenant: self.Tenant, @@ -691,7 +692,24 @@ func (self *UsageRecord) AsCallDescriptor() (*CallDescriptor, error) { Subject: self.Subject, Account: self.Account, Destination: self.Destination, - TimeStart: startTime, - TimeEnd: startTime.Add(usage), - }, nil + } + timeStr := self.AnswerTime + if len(timeStr) == 0 { // In case of auth, answer time will not be defined, so take it out of setup one + timeStr = self.SetupTime + } + if cd.TimeStart, err = utils.ParseTimeDetectLayout(timeStr); err != nil { + return nil, err + } + if usage, err := utils.ParseDurationWithSecs(self.Usage); err != nil { + return nil, err + } else { + cd.TimeEnd = cd.TimeStart.Add(usage) + } + if self.ExtraFields != nil { + cd.ExtraFields = make(map[string]string) + } + for k, v := range self.ExtraFields { + cd.ExtraFields[k] = v + } + return cd, nil } diff --git a/engine/users.go b/engine/users.go index aed3ead7d..0cf821c74 100644 --- a/engine/users.go +++ b/engine/users.go @@ -3,6 +3,7 @@ package engine import ( "sort" "strings" + "sync" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" @@ -61,6 +62,7 @@ type UserMap struct { index map[string]map[string]bool indexKeys []string ratingDb RatingStorage + mu sync.RWMutex } func NewUserMap(ratingDb RatingStorage) (*UserMap, error) { @@ -85,6 +87,8 @@ func newUserMap(ratingDb RatingStorage) *UserMap { } func (um *UserMap) SetUser(up UserProfile, reply *string) error { + um.mu.Lock() + defer um.mu.Unlock() if err := um.ratingDb.SetUser(&up); err != nil { *reply = err.Error() return err @@ -96,6 +100,8 @@ func (um *UserMap) SetUser(up UserProfile, reply *string) error { } func (um *UserMap) RemoveUser(up UserProfile, reply *string) error { + um.mu.Lock() + defer um.mu.Unlock() if err := um.ratingDb.RemoveUser(up.GetId()); err != nil { *reply = err.Error() return err @@ -107,6 +113,8 @@ func (um *UserMap) RemoveUser(up UserProfile, reply *string) error { } func (um *UserMap) UpdateUser(up UserProfile, reply *string) error { + um.mu.Lock() + defer um.mu.Unlock() m, found := um.table[up.GetId()] if !found { *reply = utils.ErrNotFound.Error() @@ -144,6 +152,8 @@ func (um *UserMap) UpdateUser(up UserProfile, reply *string) error { } func (um *UserMap) GetUsers(up UserProfile, results *UserProfiles) error { + um.mu.RLock() + defer um.mu.RUnlock() table := um.table // no index indexUnionKeys := make(map[string]bool) @@ -223,6 +233,8 @@ func (um *UserMap) GetUsers(up UserProfile, results *UserProfiles) error { } func (um *UserMap) AddIndex(indexes []string, reply *string) error { + um.mu.Lock() + defer um.mu.Unlock() um.indexKeys = indexes for key, values := range um.table { up := &UserProfile{Profile: values} @@ -305,6 +317,8 @@ func (um *UserMap) deleteIndex(up *UserProfile) { } func (um *UserMap) GetIndexes(in string, reply *map[string][]string) error { + um.mu.RLock() + defer um.mu.RUnlock() indexes := make(map[string][]string) for key, values := range um.index { var vs []string @@ -355,12 +369,22 @@ func (ps *ProxyUserService) GetIndexes(in string, reply *map[string][]string) er return ps.Client.Call("UsersV1.AddIndex", in, reply) } +// extraFields - Field name in the interface containing extraFields information func LoadUserProfile(in interface{}, extraFields string) (interface{}, error) { if userService == nil { // no user service => no fun return in, nil } m := utils.ToMapStringString(in) - + var needsUsers bool + for _, val := range m { + if val == utils.USERS { + needsUsers = true + break + } + } + if !needsUsers { // Do not process further if user profile is not needed + return in, nil + } up := &UserProfile{ Profile: make(map[string]string), } diff --git a/engine/users_test.go b/engine/users_test.go index f876d9041..86699a9cb 100644 --- a/engine/users_test.go +++ b/engine/users_test.go @@ -3,6 +3,7 @@ package engine import ( "reflect" "testing" + "time" "github.com/cgrates/cgrates/utils" ) @@ -520,7 +521,7 @@ func TestUsersAddUpdateRemoveIndexes(t *testing.T) { } } -func TestUsersStoredCDRGetLoadUserProfile(t *testing.T) { +func TestUsersUsageRecordGetLoadUserProfile(t *testing.T) { userService = &UserMap{ table: map[string]map[string]string{ "test:user": map[string]string{"TOR": "01", "ReqType": "1", "Direction": "*out", "Category": "c1", "Account": "dan", "Subject": "0723", "Destination": "+401", "SetupTime": "s1", "AnswerTime": "t1", "Usage": "10"}, @@ -568,7 +569,7 @@ func TestUsersStoredCDRGetLoadUserProfile(t *testing.T) { } } -func TestUsersStoredCDRGetLoadUserProfileExtraFields(t *testing.T) { +func TestUsersExternalCdrGetLoadUserProfileExtraFields(t *testing.T) { userService = &UserMap{ table: map[string]map[string]string{ "test:user": map[string]string{"TOR": "01", "ReqType": "1", "Direction": "*out", "Category": "c1", "Account": "dan", "Subject": "0723", "Destination": "+401", "SetupTime": "s1", "AnswerTime": "t1", "Usage": "10"}, @@ -622,7 +623,7 @@ func TestUsersStoredCDRGetLoadUserProfileExtraFields(t *testing.T) { } } -func TestUsersStoredCDRGetLoadUserProfileExtraFieldsNotFound(t *testing.T) { +func TestUsersExternalCdrGetLoadUserProfileExtraFieldsNotFound(t *testing.T) { userService = &UserMap{ table: map[string]map[string]string{ "test:user": map[string]string{"TOR": "01", "ReqType": "1", "Direction": "*out", "Category": "c1", "Account": "dan", "Subject": "0723", "Destination": "+401", "SetupTime": "s1", "AnswerTime": "t1", "Usage": "10"}, @@ -656,7 +657,7 @@ func TestUsersStoredCDRGetLoadUserProfileExtraFieldsNotFound(t *testing.T) { } } -func TestUsersStoredCDRGetLoadUserProfileExtraFieldsSet(t *testing.T) { +func TestUsersExternalCdrGetLoadUserProfileExtraFieldsSet(t *testing.T) { userService = &UserMap{ table: map[string]map[string]string{ "test:user": map[string]string{"TOR": "01", "ReqType": "1", "Direction": "*out", "Category": "c1", "Account": "dan", "Subject": "0723", "Destination": "+401", "SetupTime": "s1", "AnswerTime": "t1", "Usage": "10"}, @@ -711,3 +712,91 @@ func TestUsersStoredCDRGetLoadUserProfileExtraFieldsSet(t *testing.T) { t.Errorf("Expected: %+v got: %+v", expected, ur) } } + +func TestUsersCallDescLoadUserProfile(t *testing.T) { + userService = &UserMap{ + table: map[string]map[string]string{ + "cgrates.org:dan": map[string]string{"ReqType": "*prepaid", "Category": "call1", "Account": "dan", "Subject": "dan", "Cli": "+4986517174963"}, + "cgrates.org:danvoice": map[string]string{"TOR": "*voice", "ReqType": "*prepaid", "Category": "call1", "Account": "dan", "Subject": "0723"}, + "cgrates:rif": map[string]string{"ReqType": "*postpaid", "Direction": "*out", "Category": "call", "Account": "rif", "Subject": "0726"}, + }, + index: make(map[string]map[string]bool), + } + startTime := time.Now() + cd := &CallDescriptor{ + TOR: "*sms", + Tenant: utils.USERS, + Category: utils.USERS, + Subject: utils.USERS, + Account: utils.USERS, + Destination: "+4986517174963", + TimeStart: startTime, + TimeEnd: startTime.Add(time.Duration(1) * time.Minute), + ExtraFields: map[string]string{"Cli": "+4986517174963"}, + } + expected := &CallDescriptor{ + TOR: "*sms", + Tenant: "cgrates.org", + Category: "call1", + Account: "dan", + Subject: "dan", + Destination: "+4986517174963", + TimeStart: startTime, + TimeEnd: startTime.Add(time.Duration(1) * time.Minute), + ExtraFields: map[string]string{"Cli": "+4986517174963"}, + } + out, err := LoadUserProfile(cd, "ExtraFields") + if err != nil { + t.Error("Error loading user profile: ", err) + } + cdRcv := out.(*CallDescriptor) + if !reflect.DeepEqual(expected, cdRcv) { + t.Errorf("Expected: %+v got: %+v", expected, cdRcv) + } +} + +func TestUsersStoredCdrLoadUserProfile(t *testing.T) { + userService = &UserMap{ + table: map[string]map[string]string{ + "cgrates.org:dan": map[string]string{"ReqType": "*prepaid", "Category": "call1", "Account": "dan", "Subject": "dan", "Cli": "+4986517174963"}, + "cgrates.org:danvoice": map[string]string{"TOR": "*voice", "ReqType": "*prepaid", "Category": "call1", "Account": "dan", "Subject": "0723"}, + "cgrates:rif": map[string]string{"ReqType": "*postpaid", "Direction": "*out", "Category": "call", "Account": "rif", "Subject": "0726"}, + }, + index: make(map[string]map[string]bool), + } + startTime := time.Now() + cdr := &StoredCdr{ + TOR: "*sms", + ReqType: utils.USERS, + Tenant: utils.USERS, + Category: utils.USERS, + Account: utils.USERS, + Subject: utils.USERS, + Destination: "+4986517174963", + SetupTime: startTime, + AnswerTime: startTime, + Usage: time.Duration(1) * time.Minute, + ExtraFields: map[string]string{"Cli": "+4986517174963"}, + } + expected := &StoredCdr{ + TOR: "*sms", + ReqType: "*prepaid", + Tenant: "cgrates.org", + Category: "call1", + Account: "dan", + Subject: "dan", + Destination: "+4986517174963", + SetupTime: startTime, + AnswerTime: startTime, + Usage: time.Duration(1) * time.Minute, + ExtraFields: map[string]string{"Cli": "+4986517174963"}, + } + out, err := LoadUserProfile(cdr, "ExtraFields") + if err != nil { + t.Error("Error loading user profile: ", err) + } + cdRcv := out.(*StoredCdr) + if !reflect.DeepEqual(expected, cdRcv) { + t.Errorf("Expected: %+v got: %+v", expected, cdRcv) + } +} diff --git a/general_tests/tutorial_local_test.go b/general_tests/tutorial_local_test.go index cd78ef52c..adebc3cf0 100644 --- a/general_tests/tutorial_local_test.go +++ b/general_tests/tutorial_local_test.go @@ -117,7 +117,7 @@ func TestTutLocalCacheStats(t *testing.T) { } var rcvStats *utils.CacheStats expectedStats := &utils.CacheStats{Destinations: 4, RatingPlans: 3, RatingProfiles: 8, Actions: 7, SharedGroups: 1, RatingAliases: 1, AccountAliases: 1, - DerivedChargers: 1, LcrProfiles: 4} + DerivedChargers: 1, LcrProfiles: 4, CdrStats: 6} var args utils.AttrCacheStats if err := tutLocalRpc.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil { t.Error("Got error on ApierV1.GetCacheStats: ", err.Error()) diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 9495ddb8e..3adbde62a 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -546,6 +546,8 @@ type CacheStats struct { AccountAliases int DerivedChargers int LcrProfiles int + CdrStats int + Users int } type AttrCachedItemAge struct { diff --git a/utils/consts.go b/utils/consts.go index f75cc60d7..7ef9dfac4 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -27,7 +27,7 @@ var ( ) const ( - VERSION = "0.9.1rc6" + VERSION = "0.9.1~rc6" POSTGRES = "postgres" MYSQL = "mysql" MONGO = "mongo"