diff --git a/apier/v1/apier.go b/apier/v1/apier.go index ef8dd9890..a40eb161b 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -1116,6 +1116,14 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, return err } } + + userKeys, _ := loader.GetLoadedIds(utils.USERS_PREFIX) + if len(userKeys) != 0 && self.Users != nil { + var r string + if err := self.Users.ReloadUsers("", &r); err != nil { + return err + } + } *reply = "OK" return nil } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index e02affec2..a3fb9101d 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -505,18 +505,12 @@ func main() { server.RpcRegisterName("ScribeV1", scribeServer) } if cfg.UserServerEnabled { - userServer, err = engine.NewUserMap(ratingDb) + userServer, err = engine.NewUserMap(accountDb, cfg.UserServerIndexes) if err != nil { engine.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) exitChan <- true } server.RpcRegisterName("UsersV1", userServer) - if len(cfg.UserServerIndexes) != 0 { - var s string - if err := userServer.AddIndex(cfg.UserServerIndexes, &s); err != nil { - engine.Logger.Err(fmt.Sprintf("Error adding %v indexes to user profile service: %v", cfg.UserServerIndexes, err)) - } - } } // Register session manager service // FixMe: make sure this is thread safe diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 268bdbbf9..d6681708e 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -70,6 +70,7 @@ var ( historyServer = flag.String("history_server", cgrConfig.RPCGOBListen, "The history server address:port, empty to disable automatic history archiving") raterAddress = flag.String("rater_address", cgrConfig.RPCGOBListen, "Rater service to contact for cache reloads, empty to disable automatic cache reloads") cdrstatsAddress = flag.String("cdrstats_address", cgrConfig.RPCGOBListen, "CDRStats service to contact for 
data reloads, empty to disable automatic data reloads") + usersAddress = flag.String("users_address", cgrConfig.RPCGOBListen, "Users service to contact for data reloads, empty to disable automatic data reloads") runId = flag.String("runid", "", "Uniquely identify an import/load, postpended to some automatic fields") ) @@ -83,7 +84,7 @@ func main() { var ratingDb engine.RatingStorage var accountDb engine.AccountingStorage var storDb engine.LoadStorage - var rater, cdrstats *rpc.Client + var rater, cdrstats, users *rpc.Client var loader engine.LoadReader // Init necessary db connections, only if not already if !*dryRun { // make sure we do not need db connections on dry run, also not importing into any stordb @@ -206,6 +207,19 @@ func main() { } else { log.Print("WARNING: CDRStats automatic data reload is disabled!") } + if *usersAddress != "" { // Init connection to users service so we can reload its data + if *usersAddress == *raterAddress { + users = rater + } else { + users, err = rpc.Dial("tcp", *usersAddress) + if err != nil { + log.Fatalf("Could not connect to Users API: %s", err.Error()) + return + } + } + } else { + log.Print("WARNING: Users automatic data reload is disabled!") + } // write maps to database if err := tpReader.WriteToDatabase(*flush, *verbose); err != nil { @@ -272,4 +286,18 @@ func main() { } } } + + if users != nil { + userIds, _ := tpReader.GetLoadedIds(utils.USERS_PREFIX) + if len(userIds) > 0 { + if *verbose { + log.Print("Reloading Users data") + } + var reply string + if err := users.Call("UsersV1.ReloadUsers", "", &reply); err != nil { + log.Printf("WARNING: Failed reloading users data, error: %s\n", err.Error()) + } + + } + } } diff --git a/data/conf/samples/tutlocal/cgrates.json b/data/conf/samples/tutlocal/cgrates.json index d4567a697..a0d4d04ce 100644 --- a/data/conf/samples/tutlocal/cgrates.json +++ b/data/conf/samples/tutlocal/cgrates.json @@ -13,12 +13,12 @@ "rater": { "enabled": true, // enable Rater service: "cdrstats": 
"internal", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234> - "users": "internal", + "pubsubs": "internal", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234> + "users": "internal", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234> }, "scheduler": { "enabled": true, // start Scheduler service: - "save_interval": "5s", }, "cdrs": { @@ -31,8 +31,15 @@ "enabled": true, // starts the cdrstats service: }, -"users": { - "enabled": true, +"pubsubs": { + "enabled": true, // starts PubSub service: . }, + +"users": { + "enabled": true, // starts User service: . + "indexes": ["Uuid"], // user profile field indexes +}, + + } diff --git a/data/tariffplans/tutorial/Users.csv b/data/tariffplans/tutorial/Users.csv index 75936aa6a..d9e8d855c 100644 --- a/data/tariffplans/tutorial/Users.csv +++ b/data/tariffplans/tutorial/Users.csv @@ -2,5 +2,11 @@ cgrates.org,1001,SysUserName,danb cgrates.org,1001,SysPassword,hisPass321 cgrates.org,1001,Cli,+4986517174963 +cgrates.org,1001,Account,1001 +cgrates.org,1001,Subject,1001 +cgrates.org,1001,Uuid,388539dfd4f5cefee8f488b78c6c244b9e19138e cgrates.org,1002,SysUserName,rif cgrates.org,1002,RifAttr,RifVal +cgrates.org,1002,Account,1002 +cgrates.org,1002,Subject,1002 +cgrates.org,1002,Uuid,27f37edec0670fa34cf79076b80ef5021e39c5b5 diff --git a/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json b/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json index 45a900faf..8b3d98aa9 100644 --- a/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json @@ -10,6 +10,9 @@ "rater": { "enabled": true, // enable Rater service: "cdrstats": "internal", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234> + "historys": "internal", // address 
where to reach the history service, empty to disable history functionality: <""|internal|x.y.z.y:1234> + "pubsubs": "internal", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234> + "users": "internal", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234> }, @@ -24,6 +27,7 @@ "cdrstats": "internal", // address where to reach the cdrstats service. Empty to disable stats gathering <""|internal|x.y.z.y:1234> }, + "cdrstats": { "enabled": true, // starts the cdrstats service: }, @@ -61,7 +65,34 @@ {"tag":"Cost", "cdr_field_id": "cost", "type": "cdrfield", "value": "cost"}, ], "trailer_fields": [], // template of the exported trailer fields - } + }, + "customer_tpl": { + "cdr_format": "csv", // exported CDRs format + "field_separator": ";", + "data_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from KBytes to Bytes) + "sms_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from SMS unit to call duration in some billing systems) + "generic_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from GENERIC unit to call duration in some billing systems) + "cost_multiply_factor": 1, // multiply cost before export, eg: add VAT + "cost_rounding_decimals": -1, // rounding decimals for Cost values. 
-1 to disable rounding + "cost_shift_digits": 0, // shift digits in the cost on export (eg: convert from EUR to cents) + "mask_destination_id": "MASKED_DESTINATIONS", // destination id containing called addresses to be masked on export + "mask_length": 0, // length of the destination suffix to be masked + "export_dir": "/tmp/cgr_fsevsock/cgrates/cdre", // path where the exported CDRs will be placed + "header_fields": [], // template of the exported header fields + "content_fields": [ // template of the exported content fields + {"tag": "CgrId", "cdr_field_id": "cgrid", "type": "cdrfield", "value": "cgrid"}, + {"tag":"AccId", "cdr_field_id": "accid", "type": "cdrfield", "value": "accid"}, + {"tag":"ReqType", "cdr_field_id": "reqtype", "type": "cdrfield", "value": "reqtype"}, + {"tag":"Tenant", "cdr_field_id": "tenant", "type": "cdrfield", "value": "tenant"}, + {"tag":"Category", "cdr_field_id": "category", "type": "cdrfield", "value": "category"}, + {"tag":"Subject", "cdr_field_id": "account", "type": "cdrfield", "value": "account"}, + {"tag":"Destination", "cdr_field_id": "destination", "type": "cdrfield", "value": "~destination:s/^1(\\d+)/+$1/:s/^\\+(\\d+)/00$1/"}, + {"tag":"AnswerTime", "cdr_field_id": "answer_time", "type": "cdrfield", "value": "answer_time", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag":"Usage", "cdr_field_id": "usage", "type": "cdrfield", "value": "usage"}, + {"tag":"Cost", "cdr_field_id": "cost", "type": "cdrfield", "value": "cost"}, + ], + "trailer_fields": [], + }, }, @@ -77,14 +108,20 @@ }, -"history_server": { - "enabled": true, // starts History service: . +"historys": { + "enabled": true, // starts History service: . "history_dir": "/tmp/cgr_fsevsock/cgrates/history", // location on disk where to store history files. }, -"history_agent": { - "enabled": true, // starts History as a client: . +"pubsubs": { + "enabled": true, // starts PubSub service: . +}, + + +"users": { + "enabled": true, // starts User service: . 
+ "indexes": ["Uuid"], // user profile field indexes }, diff --git a/data/tutorials/fs_evsock/freeswitch/etc/freeswitch_conf.tar.gz b/data/tutorials/fs_evsock/freeswitch/etc/freeswitch_conf.tar.gz index 7349bae0c..f62125994 100644 Binary files a/data/tutorials/fs_evsock/freeswitch/etc/freeswitch_conf.tar.gz and b/data/tutorials/fs_evsock/freeswitch/etc/freeswitch_conf.tar.gz differ diff --git a/data/tutorials/kamevapi/cgrates/etc/cgrates/cgrates.json b/data/tutorials/kamevapi/cgrates/etc/cgrates/cgrates.json index dc7d624a9..770a0ea06 100644 --- a/data/tutorials/kamevapi/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/kamevapi/cgrates/etc/cgrates/cgrates.json @@ -7,66 +7,12 @@ // This is what you get when you load CGRateS with an empty configuration file. -//"general": { -// "http_skip_tls_veify": false, // if enabled Http Client will accept any TLS certificate -// "rounding_decimals": 10, // system level precision for floats -// "dbdata_encoding": "msgpack", // encoding used to store object data in strings: -// "tpexport_dir": "/var/log/cgrates/tpe", // path towards export folder for offline Tariff Plans -// "default_reqtype": "*rated", // default request type to consider when missing from requests: <""|*prepaid|*postpaid|*pseudoprepaid|*rated> -// "default_category": "call", // default Type of Record to consider when missing from requests -// "default_tenant": "cgrates.org", // default Tenant to consider when missing from requests -// "default_subject": "cgrates", // default rating Subject to consider when missing from requests -//}, - - -//"listen": { -// "rpc_json": "127.0.0.1:2012", // RPC JSON listening address -// "rpc_gob": "127.0.0.1:2013", // RPC GOB listening address -// "http": "127.0.0.1:2080", // HTTP listening address -//}, - - -//"rating_db": { -// "db_type": "redis", // rating subsystem database type: -// "db_host": "127.0.0.1", // rating subsystem database host address -// "db_port": 6379, // rating subsystem port to reach the database -// 
"db_name": "10", // rating subsystem database name to connect to -// "db_user": "", // rating subsystem username to use when connecting to database -// "db_passwd": "", // rating subsystem password to use when connecting to database -//}, - - -//"accounting_db": { -// "db_type": "redis", // accounting subsystem database: -// "db_host": "127.0.0.1", // accounting subsystem database host address -// "db_port": 6379, // accounting subsystem port to reach the database -// "db_name": "11", // accounting subsystem database name to connect to -// "db_user": "", // accounting subsystem username to use when connecting to database -// "db_passwd": "", // accounting subsystem password to use when connecting to database -//}, - - -//"stor_db": { -// "db_type": "mysql", // stor database type to use: -// "db_host": "127.0.0.1", // the host to connect to -// "db_port": 3306, // the port to reach the stordb -// "db_name": "cgrates", // stor database name -// "db_user": "cgrates", // username to use when connecting to stordb -// "db_passwd": "CGRateS.org", // password to use when connecting to stordb -// "max_open_conns": 0, // maximum database connections opened -// "max_idle_conns": -1, // maximum database connections idle -//}, - - -//"balancer": { -// "enabled": false, // start Balancer service: -//}, - - "rater": { "enabled": true, // enable Rater service: -// "balancer": "", // register to Balancer as worker: <""|internal|x.y.z.y:1234> - "cdrstats": "internal", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234> + "cdrstats": "internal", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234> + "historys": "internal", // address where to reach the history service, empty to disable history functionality: <""|internal|x.y.z.y:1234> + "pubsubs": "internal", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234> 
+ "users": "internal", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234> }, @@ -77,35 +23,13 @@ "cdrs": { "enabled": true, // start the CDR Server service: -// "extra_fields": [], // extra fields to store in CDRs for non-generic CDRs -// "store_cdrs": true, // store cdrs in storDb - "rater": "internal", // address where to reach the Rater for cost calculation, empty to disable functionality: <""|internal|x.y.z.y:1234> - "cdrstats": "internal", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234> -// "reconnects": 5, // number of reconnect attempts to rater or cdrs -// "cdr_replication":[], // replicate the raw CDR to a number of servers + "rater": "internal", // address where to reach the Rater for cost calculation: <""|internal|x.y.z.y:1234> + "cdrstats": "internal", // address where to reach the cdrstats service. Empty to disable stats gathering <""|internal|x.y.z.y:1234> }, + "cdrstats": { "enabled": true, // starts the cdrstats service: -// "queue_length": 50, // number of items in the stats buffer -// "time_window": "1h", // will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow -// "metrics": ["ASR", "ACD", "ACC"], // stat metric ids to build -// "setup_interval": [], // filter on CDR SetupTime -// "tors": [], // filter on CDR TOR fields -// "cdr_hosts": [], // filter on CDR CdrHost fields -// "cdr_sources": [], // filter on CDR CdrSource fields -// "req_types": [], // filter on CDR ReqType fields -// "directions": [], // filter on CDR Direction fields -// "tenants": [], // filter on CDR Tenant fields -// "categories": [], // filter on CDR Category fields -// "accounts": [], // filter on CDR Account fields -// "subjects": [], // filter on CDR Subject fields -// "destination_prefixes": [], // filter on CDR Destination prefixes -// "usage_interval": [], // filter on CDR Usage -// "mediation_run_ids": [], // 
filter on CDR MediationRunId fields -// "rated_accounts": [], // filter on CDR RatedAccount fields -// "rated_subjects": [], // filter on CDR RatedSubject fields -// "cost_interval": [], // filter on CDR Cost }, @@ -172,103 +96,28 @@ }, -//"cdrc": { -// "*default": { -// "enabled": false, // enable CDR client functionality -// "cdrs_address": "internal", // address where to reach CDR server. -// "cdr_format": "csv", // CDR file format -// "field_separator": ",", // separator used in case of csv files -// "run_delay": 0, // sleep interval in seconds between consecutive runs, 0 to use automation via inotify -// "data_usage_multiply_factor": 1024, // conversion factor for data usage -// "cdr_in_dir": "/var/log/cgrates/cdrc/in", // absolute path towards the directory where the CDRs are stored -// "cdr_out_dir": "/var/log/cgrates/cdrc/out", // absolute path towards the directory where processed CDRs will be moved -// "cdr_source_id": "freeswitch_csv", // free form field, tag identifying the source of the CDRs within CDRS database -// "cdr_filter": "", // Filter CDR records to import -// "content_fields":[ // import template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value -// {"tag": "tor", "cdr_field_id": "tor", "type": "cdrfield", "value": "2", "mandatory": true}, -// {"tag": "accid", "cdr_field_id": "accid", "type": "cdrfield", "value": "3", "mandatory": true}, -// {"tag": "reqtype", "cdr_field_id": "reqtype", "type": "cdrfield", "value": "4", "mandatory": true}, -// {"tag": "direction", "cdr_field_id": "direction", "type": "cdrfield", "value": "5", "mandatory": true}, -// {"tag": "tenant", "cdr_field_id": "tenant", "type": "cdrfield", "value": "6", "mandatory": true}, -// {"tag": "category", "cdr_field_id": "category", "type": "cdrfield", "value": "7", "mandatory": true}, -// {"tag": "account", "cdr_field_id": "account", "type": "cdrfield", "value": "8", "mandatory": true}, -// {"tag": "subject", 
"cdr_field_id": "subject", "type": "cdrfield", "value": "9", "mandatory": true}, -// {"tag": "destination", "cdr_field_id": "destination", "type": "cdrfield", "value": "10", "mandatory": true}, -// {"tag": "setup_time", "cdr_field_id": "setup_time", "type": "cdrfield", "value": "11", "mandatory": true}, -// {"tag": "answer_time", "cdr_field_id": "answer_time", "type": "cdrfield", "value": "12", "mandatory": true}, -// {"tag": "usage", "cdr_field_id": "usage", "type": "cdrfield", "value": "13", "mandatory": true}, -// ], -// }, -//}, - - -//"sm_freeswitch": { -// "enabled": false, // starts SessionManager service: -// "rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013> -// "cdrs": "", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234> -// "reconnects": 5, // number of reconnect attempts to rater or cdrs -// "cdr_extra_fields": [], // extra fields to store in CDRs in case of processing them -// "debit_interval": "10s", // interval to perform debits on. 
-// "min_call_duration": "0s", // only authorize calls with allowed duration higher than this -// "max_call_duration": "3h", // maximum call duration a prepaid call can last -// "min_dur_low_balance": "5s", // threshold which will trigger low balance warnings for prepaid calls (needs to be lower than debit_interval) -// "low_balance_ann_file": "", // file to be played when low balance is reached for prepaid calls -// "empty_balance_context": "", // if defined, prepaid calls will be transfered to this context on empty balance -// "empty_balance_ann_file": "", // file to be played before disconnecting prepaid calls on empty balance (applies only if no context defined) -// "connections":[ // instantiate connections to multiple FreeSWITCH servers -// {"server": "127.0.0.1:8021", "password": "ClueCon", "reconnects": 5} -// ], -//}, - - "sm_kamailio": { "enabled": true, // starts SessionManager service: -// "rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013> "cdrs": "internal", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234> -// "reconnects": 5, // number of reconnect attempts to rater or cdrs "create_cdr": true, // create CDR out of events and sends them to CDRS component -// "debit_interval": "10s", // interval to perform debits on. 
-// "min_call_duration": "0s", // only authorize calls with allowed duration higher than this -// "max_call_duration": "3h", // maximum call duration a prepaid call can last -// "connections":[ // instantiate connections to multiple Kamailio servers -// {"evapi_addr": "127.0.0.1:8448", "reconnects": 5} // reconnects -1 to indefinitely connect -// ], }, -//"sm_opensips": { -// "enabled": false, // starts SessionManager service: -// "listen_udp": "127.0.0.1:2020", // address where to listen for datagram events coming from OpenSIPS -// "rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013> -// "cdrs": "", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234> -// "debit_interval": "10s", // interval to perform debits on. -// "min_call_duration": "0s", // only authorize calls with allowed duration higher than this -// "max_call_duration": "3h", // maximum call duration a prepaid call can last -// "events_subscribe_interval": "60s", // automatic events subscription to OpenSIPS, 0 to disable it -// "mi_addr": "127.0.0.1:8020", // address where to reach OpenSIPS MI to send session disconnects -// "reconnects": -1, // reconnects -1 to indefinitely connect -//}, - - - -"history_server": { - "enabled": true, // starts History service: . +"historys": { + "enabled": true, // starts History service: . "history_dir": "/tmp/cgr_kamevapi/cgrates/history", // location on disk where to store history files. -// "save_interval": "1s", // interval to save changed cache into .git archive }, -"history_agent": { - "enabled": true, // starts History as a client: . -// "server": "internal", // address where to reach the master history server: +"pubsubs": { + "enabled": true, // starts PubSub service: . 
}, -//"mailer": { -// "server": "localhost", // the server to use when sending emails out -// "auth_user": "cgrates", // authenticate to email server using this user -// "auth_passwd": "CGRateS.org", // authenticate to email server with this password -// "from_address": "cgr-mailer@localhost.localdomain" // from address used when sending emails out -//}, +"users": { + "enabled": true, // starts User service: . + "indexes": ["Uuid"], // user profile field indexes +}, + } diff --git a/data/tutorials/kamevapi/kamailio/etc/kamailio/kamailio-cgrates.cfg b/data/tutorials/kamevapi/kamailio/etc/kamailio/kamailio-cgrates.cfg index 954034144..9a837fd5e 100644 --- a/data/tutorials/kamevapi/kamailio/etc/kamailio/kamailio-cgrates.cfg +++ b/data/tutorials/kamevapi/kamailio/etc/kamailio/kamailio-cgrates.cfg @@ -103,7 +103,7 @@ route[CGR_LCR_REPLY] { route[CGR_SESSION_DISCONNECT] { json_get_field("$evapi(msg)", "HashEntry", "$var(HashEntry)"); json_get_field("$evapi(msg)", "HashId", "$var(HashId)"); - son_get_field("$evapi(msg)", "Reason", "$var(Reason)"); + json_get_field("$evapi(msg)", "Reason", "$var(Reason)"); jsonrpc_exec('{"jsonrpc":"2.0","id":1, "method":"dlg.end_dlg","params":[$(var(HashEntry){s.rm,"}),$(var(HashId){s.rm,"})]}'); #$jsonrpl($var(reply)); } diff --git a/data/tutorials/kamevapi/kamailio/etc/kamailio/kamailio.cfg b/data/tutorials/kamevapi/kamailio/etc/kamailio/kamailio.cfg index 6ae550efd..a61eea930 100644 --- a/data/tutorials/kamevapi/kamailio/etc/kamailio/kamailio.cfg +++ b/data/tutorials/kamevapi/kamailio/etc/kamailio/kamailio.cfg @@ -200,9 +200,14 @@ route[CGRATES_AUTH_REPLY] { sl_send_reply("503","CGR_ERROR"); exit; } - if $var(CgrMaxSessionTime) != -1 && !dlg_set_timeout("$var(CgrMaxSessionTime)") { - sl_send_reply("503","CGR_MAX_SESSION_TIME_ERROR"); - exit; + if $var(CgrMaxSessionTime) != -1 { + if $var(CgrMaxSessionTime) == 0 { // Not enough balance, do not allow the call to go through + sl_send_reply("403","Insufficient credit"); + exit; + } else if 
!dlg_set_timeout("$var(CgrMaxSessionTime)") { + sl_send_reply("503","CGR_MAX_SESSION_TIME_ERROR"); + exit; + } } if $var(CgrSuppliers) != "" { # Enforce the supplier variable to the first one received from CGRateS, more for testing purposes $dlg_var(cgrSupplier) = $(var(CgrSuppliers){s.select,0,,}); diff --git a/data/tutorials/osips_async/cgrates/etc/cgrates/cgrates.json b/data/tutorials/osips_async/cgrates/etc/cgrates/cgrates.json index 288c1ada6..fc500ef2b 100644 --- a/data/tutorials/osips_async/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/osips_async/cgrates/etc/cgrates/cgrates.json @@ -10,6 +10,9 @@ "rater": { "enabled": true, // enable Rater service: "cdrstats": "internal", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234> + "historys": "internal", // address where to reach the history service, empty to disable history functionality: <""|internal|x.y.z.y:1234> + "pubsubs": "internal", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234> + "users": "internal", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234> }, @@ -42,7 +45,7 @@ "cost_shift_digits": 0, // shift digits in the cost on export (eg: convert from EUR to cents) "mask_destination_id": "MASKED_DESTINATIONS", // destination id containing called addresses to be masked on export "mask_length": 0, // length of the destination suffix to be masked - "export_dir": "/tmp/cgr_fsevsock/cgrates/cdre", // path where the exported CDRs will be placed + "export_dir": "/tmp/cgr_osipsasync/cgrates/cdre", // path where the exported CDRs will be placed "header_fields": [], // template of the exported header fields "content_fields": [ // template of the exported content fields {"tag": "CgrId", "cdr_field_id": "cgrid", "type": "cdrfield", "value": "cgrid"}, @@ -62,7 +65,34 @@ {"tag":"Cost", "cdr_field_id": "cost", "type": 
"cdrfield", "value": "cost"}, ], "trailer_fields": [], // template of the exported trailer fields - } + }, + "customer_tpl": { + "cdr_format": "csv", // exported CDRs format + "field_separator": ";", + "data_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from KBytes to Bytes) + "sms_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from SMS unit to call duration in some billing systems) + "generic_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from GENERIC unit to call duration in some billing systems) + "cost_multiply_factor": 1, // multiply cost before export, eg: add VAT + "cost_rounding_decimals": -1, // rounding decimals for Cost values. -1 to disable rounding + "cost_shift_digits": 0, // shift digits in the cost on export (eg: convert from EUR to cents) + "mask_destination_id": "MASKED_DESTINATIONS", // destination id containing called addresses to be masked on export + "mask_length": 0, // length of the destination suffix to be masked + "export_dir": "/tmp/cgr_osipsasync/cgrates/cdre", // path where the exported CDRs will be placed + "header_fields": [], // template of the exported header fields + "content_fields": [ // template of the exported content fields + {"tag": "CgrId", "cdr_field_id": "cgrid", "type": "cdrfield", "value": "cgrid"}, + {"tag":"AccId", "cdr_field_id": "accid", "type": "cdrfield", "value": "accid"}, + {"tag":"ReqType", "cdr_field_id": "reqtype", "type": "cdrfield", "value": "reqtype"}, + {"tag":"Tenant", "cdr_field_id": "tenant", "type": "cdrfield", "value": "tenant"}, + {"tag":"Category", "cdr_field_id": "category", "type": "cdrfield", "value": "category"}, + {"tag":"Subject", "cdr_field_id": "account", "type": "cdrfield", "value": "account"}, + {"tag":"Destination", "cdr_field_id": "destination", "type": "cdrfield", "value": "~destination:s/^1(\\d+)/+$1/:s/^\\+(\\d+)/00$1/"}, + {"tag":"AnswerTime", "cdr_field_id": "answer_time", "type": "cdrfield", 
"value": "answer_time", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag":"Usage", "cdr_field_id": "usage", "type": "cdrfield", "value": "usage"}, + {"tag":"Cost", "cdr_field_id": "cost", "type": "cdrfield", "value": "cost"}, + ], + "trailer_fields": [], + }, }, @@ -78,15 +108,20 @@ }, -"history_server": { - "enabled": true, // starts History service: . +"historys": { + "enabled": true, // starts History service: . "history_dir": "/tmp/cgr_osipsasync/cgrates/history", // location on disk where to store history files. }, -"history_agent": { - "enabled": true, // starts History as a client: . - "server": "internal", // address where to reach the master history server: +"pubsubs": { + "enabled": true, // starts PubSub service: . +}, + + +"users": { + "enabled": true, // starts User service: . + "indexes": ["Uuid"], // user profile field indexes }, diff --git a/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg b/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg index bb8b57d66..c3a8f2990 100644 --- a/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg +++ b/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg @@ -91,22 +91,22 @@ modparam("db_flatstore", "single_file", 1) loadmodule "acc.so" modparam("acc", "detect_direction", 1) #modparam("acc", "cdr_flag", "CDR") -#modparam("acc", "evi_flag", "CDR") -#modparam("acc", "evi_missed_flag", "CDR") +modparam("acc", "evi_flag", "CDR") +modparam("acc", "evi_missed_flag", "CDR") modparam("acc", "evi_extra", "cgr_reqtype=$avp(cgr_reqtype); cgr_account=$avp(cgr_account); cgr_destination=$avp(cgr_destination); cgr_supplier=$avp(cgr_supplier); dialog_id=$DLG_did") -modparam("acc", "db_url", "flatstore:/tmp") -modparam("acc", "db_flag", "CDR") -modparam("acc", "db_missed_flag", "CDR") -modparam("acc", "db_table_missed_calls", "cgr_missed") -modparam("acc", "db_extra", "cgr_reqtype=$avp(cgr_reqtype); - cgr_account=$avp(cgr_account); - cgr_destination=$avp(cgr_destination); - 
cgr_supplier=$avp(cgr_supplier); - dialog_id=$DLG_did") +#modparam("acc", "db_url", "flatstore:/tmp") +#modparam("acc", "db_flag", "CDR") +#modparam("acc", "db_missed_flag", "CDR") +#modparam("acc", "db_table_missed_calls", "cgr_missed") +#modparam("acc", "db_extra", "cgr_reqtype=$avp(cgr_reqtype); +# cgr_account=$avp(cgr_account); +# cgr_destination=$avp(cgr_destination); +# cgr_supplier=$avp(cgr_supplier); +# dialog_id=$DLG_did") #### CfgUtils module loadmodule "cfgutils.so" @@ -358,9 +358,6 @@ route[location] { t_reply("404", "Not Found"); exit; } - append_branch(); - append_branch(); - setflag(CDR); } failure_route[missed_call] { diff --git a/docs/lcr.rst b/docs/lcr.rst index a6cd02362..f803f39b5 100644 --- a/docs/lcr.rst +++ b/docs/lcr.rst @@ -37,10 +37,10 @@ Strategy indicates supplier selection algorithm and StrategyParams will be speci \*load_distribution (sorting/filter) The system will sort the suppliers in order to achieve the specified load distribution. - - if all have less than ponder return random order - - if some have a cdr count not divisible by ponder return them first and all ordered by cdr times, oldest first - - if all have a multiple of ponder return in the order of cdr times, oldest first - StrategyParams: supplier1:ponder;supplier2:ponder;*default:ponder + - if all have less than ratio return random order + - if some have a cdr count not divisible by ratio return them first and all ordered by cdr times, oldest first + - if all have a multiple of ratio return in the order of cdr times, oldest first + StrategyParams: supplier1:ratio;supplier2:ratio;*default:ratio ActivationTime is the date/time when the LCR entry starts to be active. 
diff --git a/engine/balances.go b/engine/balances.go index 6766ea55d..0408bdcc1 100644 --- a/engine/balances.go +++ b/engine/balances.go @@ -109,7 +109,7 @@ func (b *Balance) IsActiveAt(t time.Time) bool { return true } for _, tim := range b.Timings { - if tim.IsActiveAt(t, false) { + if tim.IsActiveAt(t) { return true } } diff --git a/engine/callcost_test.go b/engine/callcost_test.go index 6b3d6701f..10d2803cc 100644 --- a/engine/callcost_test.go +++ b/engine/callcost_test.go @@ -55,6 +55,7 @@ func TestMultipleResultMerge(t *testing.T) { cd := &CallDescriptor{Direction: OUTBOUND, Category: "0", Tenant: "vdf", Subject: "rif", Destination: "0256", TimeStart: t1, TimeEnd: t2} cc1, _ := cd.getCost() if cc1.Cost != 61 { + //ils.LogFull(cc1) t.Errorf("expected 61 was %v", cc1.Cost) for _, ts := range cc1.Timespans { t.Log(ts.RateInterval) diff --git a/engine/calldesc.go b/engine/calldesc.go index c3725f183..9983319b1 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -339,6 +339,7 @@ func (cd *CallDescriptor) splitInTimeSpans() (timespans []*TimeSpan) { for i := 0; i < len(timespans); i++ { newTs := timespans[i].SplitByRatingPlan(rp) if newTs != nil { + //log.Print("NEW TS", newTs.TimeStart) timespans = append(timespans, newTs) } else { afterEnd = true @@ -347,8 +348,23 @@ func (cd *CallDescriptor) splitInTimeSpans() (timespans []*TimeSpan) { } } } - } + // split on days + /*for i := 0; i < len(timespans); i++ { + if timespans[i].TimeStart.Day() != timespans[i].TimeEnd.Day() { + //log.Print("TS: ", timespans[i].TimeStart, timespans[i].TimeEnd) + start := timespans[i].TimeStart + newTs := timespans[i].SplitByTime(time.Date(start.Year(), start.Month(), start.Day(), 0, 0, 0, 0, start.Location()).Add(24 * time.Hour)) + if newTs != nil { + //log.Print("NEW TS: ", newTs.TimeStart, newTs.TimeEnd) + // insert the new timespan + index := i + 1 + timespans = append(timespans, nil) + copy(timespans[index+1:], timespans[index:]) + timespans[index] = newTs + } + } + }*/ // 
Logger.Debug(fmt.Sprintf("After SplitByRatingPlan: %+v", timespans)) // split on rate intervals for i := 0; i < len(timespans); i++ { @@ -358,16 +374,20 @@ func (cd *CallDescriptor) splitInTimeSpans() (timespans []*TimeSpan) { // Logger.Debug(fmt.Sprintf("rp: %+v", rp)) //timespans[i].RatingPlan = nil rp.RateIntervals.Sort() + /*for _, interval := range rp.RateIntervals { + if !timespans[i].hasBetterRateIntervalThan(interval) { + timespans[i].SetRateInterval(interval) + } + }*/ + //log.Print("ORIG TS: ", timespans[i].TimeStart, timespans[i].TimeEnd) + //log.Print(timespans[i].RateInterval) for _, interval := range rp.RateIntervals { //log.Printf("\tINTERVAL: %+v", interval.Timing) - if timespans[i].hasBetterRateIntervalThan(interval) { - //log.Print("continue") - continue // if the timespan has an interval than it already has a heigher weight - } newTs := timespans[i].SplitByRateInterval(interval, cd.TOR != utils.VOICE) //utils.PrintFull(timespans[i]) //utils.PrintFull(newTs) if newTs != nil { + //log.Print("NEW TS: ", newTs.TimeStart, newTs.TimeEnd) newTs.setRatingInfo(rp) // insert the new timespan index := i + 1 @@ -379,6 +399,8 @@ func (cd *CallDescriptor) splitInTimeSpans() (timespans []*TimeSpan) { } } } + //log.Print("TS: ", timespans[i].TimeStart, timespans[i].TimeEnd) + //log.Print(timespans[i].RateInterval.Timing) } //Logger.Debug(fmt.Sprintf("After SplitByRateInterval: %+v", timespans)) @@ -422,6 +444,7 @@ func (cd *CallDescriptor) GetDuration() time.Duration { Creates a CallCost structure with the cost information calculated for the received CallDescriptor. 
*/ func (cd *CallDescriptor) GetCost() (*CallCost, error) { + cd.account = nil // make sure it's not cached cc, err := cd.getCost() if err != nil { return nil, err @@ -586,6 +609,7 @@ func (origCD *CallDescriptor) getMaxSessionDuration(origAcc *Account) (time.Dura } func (cd *CallDescriptor) GetMaxSessionDuration() (duration time.Duration, err error) { + cd.account = nil // make sure it's not cached if account, err := cd.getAccount(); err != nil || account == nil { Logger.Err(fmt.Sprintf("Could not get user balance for <%s>: %s.", cd.GetAccountKey(), err.Error())) return 0, err @@ -639,6 +663,7 @@ func (cd *CallDescriptor) debit(account *Account, dryRun bool, goNegative bool) } func (cd *CallDescriptor) Debit() (cc *CallCost, err error) { + cd.account = nil // make sure it's not cached // lock all group members if account, err := cd.getAccount(); err != nil || account == nil { Logger.Err(fmt.Sprintf("Could not get user balance for <%s>: %s.", cd.GetAccountKey(), err.Error())) @@ -661,6 +686,7 @@ func (cd *CallDescriptor) Debit() (cc *CallCost, err error) { // This methods combines the Debit and GetMaxSessionDuration and will debit the max available time as returned // by the GetMaxSessionDuration method. The amount filed has to be filled in call descriptor. 
func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) { + cd.account = nil // make sure it's not cached if account, err := cd.getAccount(); err != nil || account == nil { Logger.Err(fmt.Sprintf("Could not get user balance for <%s>: %s.", cd.GetAccountKey(), err.Error())) return nil, err @@ -692,6 +718,7 @@ func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) { } func (cd *CallDescriptor) RefundIncrements() (left float64, err error) { + cd.account = nil // make sure it's not cached accountsCache := make(map[string]*Account) for _, increment := range cd.Increments { account, found := accountsCache[increment.BalanceInfo.AccountId] @@ -770,6 +797,7 @@ func (cd *CallDescriptor) GetLCRFromStorage() (*LCR, error) { } func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) { + cd.account = nil // make sure it's not cached lcr, err := cd.GetLCRFromStorage() if err != nil { return nil, err diff --git a/engine/calldesc_test.go b/engine/calldesc_test.go index bcdfa6197..8ac3f708f 100644 --- a/engine/calldesc_test.go +++ b/engine/calldesc_test.go @@ -195,7 +195,6 @@ func TestSplitSpansWeekend(t *testing.T) { }, } - //log.Print("=============================") timespans := cd.splitInTimeSpans() if len(timespans) != 2 { t.Log(cd.RatingInfos) @@ -404,9 +403,10 @@ func TestSpansMultipleRatingPlans(t *testing.T) { t1 := time.Date(2012, time.February, 7, 23, 50, 0, 0, time.UTC) t2 := time.Date(2012, time.February, 8, 0, 30, 0, 0, time.UTC) cd := &CallDescriptor{Direction: "*out", Category: "0", Tenant: "vdf", Subject: "rif", Destination: "0257308200", TimeStart: t1, TimeEnd: t2} - result, _ := cd.GetCost() - if result.Cost != 1200 || result.GetConnectFee() != 0 { - t.Errorf("Expected %v was %v", 1200, result) + cc, _ := cd.GetCost() + if cc.Cost != 2100 || cc.GetConnectFee() != 0 { + utils.LogFull(cc) + t.Errorf("Expected %v was %v (%v)", 2100, cc, cc.GetConnectFee()) } } @@ -977,7 +977,9 @@ func TestDebitNegatve(t *testing.T) { t.Errorf("Error 
debiting from empty share: %+v", balanceMap[0].GetValue()) } cc, err = cd.MaxDebit() - //utils.PrintFull(cc) + acc, _ = cd.getAccount() + balanceMap = acc.BalanceMap[utils.MONETARY+OUTBOUND] + //utils.LogFull(balanceMap) if err != nil || cc.Cost != 2.5 { t.Errorf("Debit from empty share error: %+v, %v", cc, err) } diff --git a/engine/cdrs.go b/engine/cdrs.go index 5a0ad538e..39f3b1c9c 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -324,7 +324,6 @@ func (self *CdrServer) rateCDR(storedCdr *StoredCdr) error { func (self *CdrServer) replicateCdr(cdr *StoredCdr) error { Logger.Debug(fmt.Sprintf("replicateCdr cdr: %+v, configuration: %+v", cdr, self.cgrCfg.CDRSCdrReplication)) for _, rplCfg := range self.cgrCfg.CDRSCdrReplication { - Logger.Debug(fmt.Sprintf("Replicating CDR with configuration: %+v", rplCfg)) passesFilters := true for _, cdfFltr := range rplCfg.CdrFilter { if fltrPass, _ := cdr.PassesFieldFilter(cdfFltr); !fltrPass { diff --git a/engine/lcr.go b/engine/lcr.go index 71e48361a..70099aa22 100644 --- a/engine/lcr.go +++ b/engine/lcr.go @@ -337,25 +337,25 @@ func (lc *LCRCost) SortLoadDistribution() { /*for supplier, sq := range supplierQueues { log.Printf("Useful supplier qeues: %s %v", supplier, sq.conf.TimeWindow) }*/ - // if all have less than ponder return random order - // if some have a cdr count not divisible by ponder return them first and all ordered by cdr times, oldest first - // if all have a multiple of ponder return in the order of cdr times, oldest first + // if all have less than ratio return random order + // if some have a cdr count not divisible by ratio return them first and all ordered by cdr times, oldest first + // if all have a multiple of ratio return in the order of cdr times, oldest first // first put them in one of the above categories - havePonderlessSuppliers := false + haveRatiolessSuppliers := false for supCost, sq := range supplierQueues { - ponder := lc.GetSupplierPonder(supCost.Supplier) - if ponder == -1 { + ratio := 
lc.GetSupplierRatio(supCost.Supplier) + if ratio == -1 { supCost.Cost = -1 - havePonderlessSuppliers = true + haveRatiolessSuppliers = true continue } cdrCount := len(sq.Cdrs) - if cdrCount < ponder { + if cdrCount < ratio { supCost.Cost = float64(LOW_PRIORITY_LIMIT + rand.Intn(RAND_LIMIT)) continue } - if cdrCount%ponder == 0 { + if cdrCount%ratio == 0 { supCost.Cost = float64(MED_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + (time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() / RAND_LIMIT) continue } else { @@ -363,7 +363,7 @@ func (lc *LCRCost) SortLoadDistribution() { continue } } - if havePonderlessSuppliers { + if haveRatiolessSuppliers { var filteredSupplierCost []*LCRSupplierCost for _, supCost := range lc.SupplierCosts { if supCost.Cost != -1 { @@ -375,35 +375,35 @@ func (lc *LCRCost) SortLoadDistribution() { } // used in load distribution strategy only -// receives a long supplier id and will return the ponder found in strategy params -func (lc *LCRCost) GetSupplierPonder(supplier string) int { +// receives a long supplier id and will return the ratio found in strategy params +func (lc *LCRCost) GetSupplierRatio(supplier string) int { // parse strategy params - ponders := make(map[string]int) + ratios := make(map[string]int) params := strings.Split(lc.Entry.StrategyParams, utils.INFIELD_SEP) for _, param := range params { - ponderSlice := strings.Split(param, utils.CONCATENATED_KEY_SEP) - if len(ponderSlice) != 2 { + ratioSlice := strings.Split(param, utils.CONCATENATED_KEY_SEP) + if len(ratioSlice) != 2 { Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams)) continue } - p, err := strconv.Atoi(ponderSlice[1]) + p, err := strconv.Atoi(ratioSlice[1]) if err != nil { Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams)) continue } - ponders[ponderSlice[0]] = p + ratios[ratioSlice[0]] = p } parts := strings.Split(supplier, utils.CONCATENATED_KEY_SEP) 
if len(parts) > 0 { supplierSubject := parts[len(parts)-1] - if ponder, found := ponders[supplierSubject]; found { - return ponder + if ratio, found := ratios[supplierSubject]; found { + return ratio } - if ponder, found := ponders[utils.META_DEFAULT]; found { - return ponder + if ratio, found := ratios[utils.META_DEFAULT]; found { + return ratio } } - if len(ponders) == 0 { + if len(ratios) == 0 { return 1 // use random/last cdr date sorting } return -1 // exclude missing suppliers diff --git a/engine/lcr_test.go b/engine/lcr_test.go index ad94fabe9..93fb2c6d4 100644 --- a/engine/lcr_test.go +++ b/engine/lcr_test.go @@ -215,11 +215,11 @@ func TestLcrGet(t *testing.T) { func TestLcrRequestAsCallDescriptor(t *testing.T) { sTime := time.Date(2015, 04, 06, 17, 40, 0, 0, time.UTC) callDur := time.Duration(1) * time.Minute - lcrReq := &LcrRequest{Account: "1001", StartTime: sTime.String()} + lcrReq := &LcrRequest{Account: "2001", StartTime: sTime.String()} if _, err := lcrReq.AsCallDescriptor(); err == nil || err != utils.ErrMandatoryIeMissing { t.Error("Unexpected error received: %v", err) } - lcrReq = &LcrRequest{Account: "1001", Destination: "1002", StartTime: sTime.String()} + lcrReq = &LcrRequest{Account: "2001", Destination: "2002", StartTime: sTime.String()} eCd := &CallDescriptor{ Direction: utils.OUT, Tenant: config.CgrConfig().DefaultTenant, @@ -412,7 +412,7 @@ func TestLCRCostSuppliersLoadAllRounded(t *testing.T) { Supplier: "*out:tenant12:call:dan12", supplierQueues: []*StatsQueue{ &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(100 * time.Minute)}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, @@ -452,7 +452,7 @@ func TestLCRCostSuppliersLoadAllRounded(t *testing.T) { }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime.Add(300 * time.Minute)}}, + Cdrs: []*QCdr{&QCdr{}, 
&QCdr{SetupTime: setupTime.Add(400 * time.Minute)}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, @@ -512,7 +512,7 @@ func TestLCRCostSuppliersLoadAllOver(t *testing.T) { Supplier: "*out:tenant12:call:dan12", supplierQueues: []*StatsQueue{ &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(100 * time.Minute)}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, @@ -552,7 +552,7 @@ func TestLCRCostSuppliersLoadAllOver(t *testing.T) { }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(300 * time.Minute)}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(400 * time.Minute)}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, @@ -612,7 +612,7 @@ func TestLCRCostSuppliersLoadAllOverMisingDefault(t *testing.T) { Supplier: "*out:tenant12:call:dan12", supplierQueues: []*StatsQueue{ &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(100 * time.Minute)}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, @@ -652,7 +652,7 @@ func TestLCRCostSuppliersLoadAllOverMisingDefault(t *testing.T) { }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(300 * time.Minute)}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(400 * time.Minute)}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, diff --git a/engine/rateinterval.go b/engine/rateinterval.go index 37d5d2f94..0087f9b7b 100644 --- a/engine/rateinterval.go +++ b/engine/rateinterval.go @@ -116,13 +116,44 @@ func (rit *RITiming) CronString() string { return rit.cronString } -// Returns wheter the Timing is active at the specified time -func (rit *RITiming) IsActiveAt(t 
time.Time, endTime bool) bool { - // if the received time represents an endtime consider it 24 instead of 0 - hour := t.Hour() - if endTime && hour == 0 { - hour = 24 +/* +Returns a time object that represents the end of the interval realtive to the received time +*/ +func (rit *RITiming) getRightMargin(t time.Time) (rigthtTime time.Time) { + year, month, day := t.Year(), t.Month(), t.Day() + hour, min, sec, nsec := 23, 59, 59, 0 + loc := t.Location() + if rit.EndTime != "" { + split := strings.Split(rit.EndTime, ":") + hour, _ = strconv.Atoi(split[0]) + min, _ = strconv.Atoi(split[1]) + sec, _ = strconv.Atoi(split[2]) + //log.Print("RIGHT1: ", time.Date(year, month, day, hour, min, sec, nsec, loc)) + return time.Date(year, month, day, hour, min, sec, nsec, loc) } + //log.Print("RIGHT2: ", time.Date(year, month, day, hour, min, sec, nsec, loc).Add(time.Second)) + return time.Date(year, month, day, hour, min, sec, nsec, loc).Add(time.Second) +} + +/* +Returns a time object that represents the start of the interval realtive to the received time +*/ +func (rit *RITiming) getLeftMargin(t time.Time) (rigthtTime time.Time) { + year, month, day := t.Year(), t.Month(), t.Day() + hour, min, sec, nsec := 0, 0, 0, 0 + loc := t.Location() + if rit.StartTime != "" { + split := strings.Split(rit.StartTime, ":") + hour, _ = strconv.Atoi(split[0]) + min, _ = strconv.Atoi(split[1]) + sec, _ = strconv.Atoi(split[2]) + } + //log.Print("LEFT: ", time.Date(year, month, day, hour, min, sec, nsec, loc)) + return time.Date(year, month, day, hour, min, sec, nsec, loc) +} + +// Returns wheter the Timing is active at the specified time +func (rit *RITiming) IsActiveAt(t time.Time) bool { // check for years if len(rit.Years) > 0 && !rit.Years.Contains(t.Year()) { return false @@ -139,38 +170,25 @@ func (rit *RITiming) IsActiveAt(t time.Time, endTime bool) bool { if len(rit.WeekDays) > 0 && !rit.WeekDays.Contains(t.Weekday()) { return false } + //log.Print("Time: ", t) + + //log.Print("Left 
Margin: ", rit.getLeftMargin(t)) // check for start hour - if rit.StartTime != "" { - split := strings.Split(rit.StartTime, ":") - sh, _ := strconv.Atoi(split[0]) - sm, _ := strconv.Atoi(split[1]) - ss, _ := strconv.Atoi(split[2]) - // if the hour result before or result the same hour but the minute result before - if hour < sh || - (hour == sh && t.Minute() < sm) || - (hour == sh && t.Minute() == sm && t.Second() < ss) { - return false - } + if t.Before(rit.getLeftMargin(t)) { + return false } + + //log.Print("Right Margin: ", rit.getRightMargin(t)) // check for end hour - if rit.EndTime != "" { - split := strings.Split(rit.EndTime, ":") - eh, _ := strconv.Atoi(split[0]) - em, _ := strconv.Atoi(split[1]) - es, _ := strconv.Atoi(split[2]) - // if the hour result after or result the same hour but the minute result after - if hour > eh || - (hour == eh && t.Minute() > em) || - (hour == eh && t.Minute() == em && t.Second() > es) { - return false - } + if t.After(rit.getRightMargin(t)) { + return false } return true } // IsActive returns wheter the Timing is active now func (rit *RITiming) IsActive() bool { - return rit.IsActiveAt(time.Now(), false) + return rit.IsActiveAt(time.Now()) } func (rit *RITiming) IsBlank() bool { @@ -271,43 +289,12 @@ func (pg *RateGroups) AddRate(ps ...*Rate) { Returns true if the received time result inside the interval */ func (i *RateInterval) Contains(t time.Time, endTime bool) bool { - return i.Timing.IsActiveAt(t, endTime) -} - -/* -Returns a time object that represents the end of the interval realtive to the received time -*/ -func (i *RateInterval) getRightMargin(t time.Time) (rigthtTime time.Time) { - year, month, day := t.Year(), t.Month(), t.Day() - hour, min, sec, nsec := 23, 59, 59, 0 - loc := t.Location() - if i.Timing.EndTime != "" { - split := strings.Split(i.Timing.EndTime, ":") - hour, _ = strconv.Atoi(split[0]) - min, _ = strconv.Atoi(split[1]) - sec, _ = strconv.Atoi(split[2]) - //log.Print("RIGHT1: ", time.Date(year, 
month, day, hour, min, sec, nsec, loc)) - return time.Date(year, month, day, hour, min, sec, nsec, loc) + if endTime { + if t.Hour() == 0 && t.Minute() == 0 && t.Second() == 0 { // back one second to 23:59:59 + t = t.Add(-1 * time.Second) + } } - //log.Print("RIGHT2: ", time.Date(year, month, day, hour, min, sec, nsec, loc).Add(time.Second)) - return time.Date(year, month, day, hour, min, sec, nsec, loc).Add(time.Second) -} - -/* -Returns a time object that represents the start of the interval realtive to the received time -*/ -func (i *RateInterval) getLeftMargin(t time.Time) (rigthtTime time.Time) { - year, month, day := t.Year(), t.Month(), t.Day() - hour, min, sec, nsec := 0, 0, 0, 0 - loc := t.Location() - if i.Timing.StartTime != "" { - split := strings.Split(i.Timing.StartTime, ":") - hour, _ = strconv.Atoi(split[0]) - min, _ = strconv.Atoi(split[1]) - sec, _ = strconv.Atoi(split[2]) - } - //log.Print("LEFT: ", time.Date(year, month, day, hour, min, sec, nsec, loc)) - return time.Date(year, month, day, hour, min, sec, nsec, loc) + return i.Timing.IsActiveAt(t) } func (i *RateInterval) String_DISABLED() string { @@ -376,7 +363,7 @@ func (il RateIntervalList) Swap(i, j int) { // we need higher weights earlyer in the list func (il RateIntervalList) Less(j, i int) bool { - return il[i].Weight < il[j].Weight + return il[i].Weight < il[j].Weight //|| il[i].Timing.StartTime > il[j].Timing.StartTime } func (il RateIntervalList) Sort() { diff --git a/engine/responder.go b/engine/responder.go index 7cb1c8ebe..9f083c2cc 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -46,7 +46,7 @@ type Responder struct { ExitChan chan bool CdrSrv *CdrServer Stats StatsInterface - Timeout time.Duration + cnt int64 responseCache *cache2go.ResponseCache } @@ -70,6 +70,7 @@ func (rs *Responder) getCache() *cache2go.ResponseCache { RPC method thet provides the external RPC interface for getting the rating information. 
*/ func (rs *Responder) GetCost(arg *CallDescriptor, reply *CallCost) (err error) { + rs.cnt += 1 if arg.Subject == "" { arg.Subject = arg.Account } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 6c05d287e..f06851e87 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -71,10 +71,6 @@ type RatingStorage interface { SetAccAlias(string, string) error RemoveAccAliases([]*TenantAccount, bool) error GetAccountAliases(string, string, bool) ([]string, error) - SetUser(*UserProfile) error - GetUser(string) (*UserProfile, error) - GetUsers() ([]*UserProfile, error) - RemoveUser(string) error } type AccountingStorage interface { @@ -87,6 +83,10 @@ type AccountingStorage interface { GetSubscribers() (map[string]*SubscriberData, error) SetSubscriber(string, *SubscriberData) error RemoveSubscriber(string) error + SetUser(*UserProfile) error + GetUser(string) (*UserProfile, error) + GetUsers() ([]*UserProfile, error) + RemoveUser(string) error } type CdrStorage interface { diff --git a/engine/timespans.go b/engine/timespans.go index 30d76b292..a4197f9ec 100644 --- a/engine/timespans.go +++ b/engine/timespans.go @@ -242,24 +242,13 @@ func (ts *TimeSpan) Contains(t time.Time) bool { return t.After(ts.TimeStart) && t.Before(ts.TimeEnd) } -/* -Will set the interval as spans's interval if new Weight is lower then span's interval Weight -or if the Weights are equal and new price is lower then spans's interval price -*/ -func (ts *TimeSpan) SetRateInterval(i *RateInterval) { - //log.Printf("SETRATEINTERVAL: %+v", i.Timing) - // higher weights are better - if ts.RateInterval == nil || ts.RateInterval.Weight < i.Weight { - ts.RateInterval = i - //log.Printf("RET TS: %+v", ts.RateInterval.Timing) +func (ts *TimeSpan) SetRateInterval(interval *RateInterval) { + if interval == nil { return } - iPrice, _, _ := i.GetRateParameters(ts.GetGroupStart()) - tsPrice, _, _ := ts.RateInterval.GetRateParameters(ts.GetGroupStart()) - if 
ts.RateInterval.Weight == i.Weight && iPrice <= tsPrice { - ts.RateInterval = i + if !ts.hasBetterRateIntervalThan(interval) { + ts.RateInterval = interval } - //log.Printf("END TS: %+v", ts.RateInterval.Timing) } // Returns the cost of the timespan according to the relevant cost interval. @@ -354,7 +343,7 @@ func (ts *TimeSpan) SplitByRateInterval(i *RateInterval, data bool) (nts *TimeSp for _, rate := range i.Rating.Rates { //Logger.Debug(fmt.Sprintf("Rate: %+v", rate)) if ts.GetGroupStart() < rate.GroupIntervalStart && ts.GetGroupEnd() > rate.GroupIntervalStart { - // Logger.Debug(fmt.Sprintf("Splitting")) + //log.Print("Splitting") ts.SetRateInterval(i) splitTime := ts.TimeStart.Add(rate.GroupIntervalStart - ts.GetGroupStart()) nts = &TimeSpan{ @@ -387,8 +376,7 @@ func (ts *TimeSpan) SplitByRateInterval(i *RateInterval, data bool) (nts *TimeSp // if only the start time is in the interval split the interval to the right if i.Contains(ts.TimeStart, false) { //log.Print("Start in interval") - splitTime := i.getRightMargin(ts.TimeStart) - + splitTime := i.Timing.getRightMargin(ts.TimeStart) ts.SetRateInterval(i) if splitTime == ts.TimeStart || splitTime.Equal(ts.TimeEnd) { return @@ -408,7 +396,7 @@ func (ts *TimeSpan) SplitByRateInterval(i *RateInterval, data bool) (nts *TimeSp if i.Contains(ts.TimeEnd, true) { //log.Print("End in interval") //tmpTime := time.Date(ts.TimeStart.) 
- splitTime := i.getLeftMargin(ts.TimeEnd) + splitTime := i.Timing.getLeftMargin(ts.TimeEnd) splitTime = utils.CopyHour(splitTime, ts.TimeStart) if splitTime.Equal(ts.TimeEnd) { return @@ -419,7 +407,6 @@ func (ts *TimeSpan) SplitByRateInterval(i *RateInterval, data bool) (nts *TimeSp } nts.copyRatingInfo(ts) ts.TimeEnd = splitTime - nts.SetRateInterval(i) nts.DurationIndex = ts.DurationIndex ts.SetNewDurationIndex(nts) @@ -429,6 +416,22 @@ func (ts *TimeSpan) SplitByRateInterval(i *RateInterval, data bool) (nts *TimeSp return } +/*func (ts *TimeSpan) SplitByTime(splitTime time.Time) (nts *TimeSpan) { + if splitTime.Equal(ts.TimeEnd) { + return + } + nts = &TimeSpan{ + TimeStart: splitTime, + TimeEnd: ts.TimeEnd, + } + nts.copyRatingInfo(ts) + ts.TimeEnd = splitTime + nts.SetRateInterval(ts.RateInterval) + nts.DurationIndex = ts.DurationIndex + ts.SetNewDurationIndex(nts) + return +}*/ + // Split the timespan at the given increment start func (ts *TimeSpan) SplitByIncrement(index int) *TimeSpan { if index <= 0 || index >= len(ts.Increments) { @@ -552,30 +555,46 @@ func (ts *TimeSpan) AddIncrement(inc *Increment) { } func (ts *TimeSpan) hasBetterRateIntervalThan(interval *RateInterval) bool { + if interval.Timing == nil { + return false + } + otherLeftMargin := interval.Timing.getLeftMargin(ts.TimeStart) + otherDistance := ts.TimeStart.Sub(otherLeftMargin) + //log.Print("OTHER LEFT: ", otherLeftMargin) + //log.Print("OTHER DISTANCE: ", otherDistance) + // if the distance is negative it's not usable + if otherDistance < 0 { + return true + } + //log.Print("RI: ", ts.RateInterval) if ts.RateInterval == nil { return false } - //log.Print("StartTime: ", ts.TimeStart) - //log.Printf("OWN: %+v", ts.RateInterval) - //log.Printf("OTHER: %+v", interval) + // the higher the weight the better if ts.RateInterval != nil && - ts.RateInterval.Weight > interval.Weight { - return true + ts.RateInterval.Weight < interval.Weight { + return false } // check interval is closer than the 
new one - ownLeftMargin := ts.RateInterval.getLeftMargin(ts.TimeStart) - otherLeftMargin := interval.getLeftMargin(ts.TimeStart) + ownLeftMargin := ts.RateInterval.Timing.getLeftMargin(ts.TimeStart) ownDistance := ts.TimeStart.Sub(ownLeftMargin) - otherDistance := ts.TimeStart.Sub(otherLeftMargin) - endOtherDistance := ts.TimeEnd.Sub(otherLeftMargin) - // if thr distance is negative relative to both ends it's not usable - if otherDistance < 0 && endOtherDistance < 0 { - return true - } + + //log.Print("OWN LEFT: ", otherLeftMargin) + //log.Print("OWN DISTANCE: ", otherDistance) + //endOtherDistance := ts.TimeEnd.Sub(otherLeftMargin) + // if own interval is closer than its better - if ownDistance <= otherDistance { + //log.Print(ownDistance) + if ownDistance > otherDistance { + return false + } + ownPrice, _, _ := ts.RateInterval.GetRateParameters(ts.GetGroupStart()) + otherPrice, _, _ := interval.GetRateParameters(ts.GetGroupStart()) + // if own price is smaller than it's better + //log.Print(ownPrice, otherPrice) + if ownPrice < otherPrice { return true } - return false + return true } diff --git a/engine/timespans_test.go b/engine/timespans_test.go index 468e2a51e..e2b21d1b2 100644 --- a/engine/timespans_test.go +++ b/engine/timespans_test.go @@ -217,7 +217,12 @@ func TestTimespanGetCost(t *testing.T) { if ts1.getCost() != 0 { t.Error("No interval and still kicking") } - ts1.SetRateInterval(&RateInterval{Rating: &RIRate{Rates: RateGroups{&Rate{0, 1.0, 1 * time.Second, 1 * time.Second}}}}) + ts1.SetRateInterval( + &RateInterval{ + Timing: &RITiming{}, + Rating: &RIRate{Rates: RateGroups{&Rate{0, 1.0, 1 * time.Second, 1 * time.Second}}}, + }, + ) if ts1.getCost() != 600 { t.Error("Expected 10 got ", ts1.Cost) } @@ -240,10 +245,18 @@ func TestTimespanGetCostIntervals(t *testing.T) { } func TestSetRateInterval(t *testing.T) { - i1 := &RateInterval{Rating: &RIRate{Rates: RateGroups{&Rate{0, 1.0, 1 * time.Second, 1 * time.Second}}}} + i1 := &RateInterval{ + Timing: 
&RITiming{}, + Rating: &RIRate{Rates: RateGroups{&Rate{0, 1.0, 1 * time.Second, 1 * time.Second}}}, + } ts1 := TimeSpan{RateInterval: i1} - i2 := &RateInterval{Rating: &RIRate{Rates: RateGroups{&Rate{0, 2.0, 1 * time.Second, 1 * time.Second}}}} - ts1.SetRateInterval(i2) + i2 := &RateInterval{ + Timing: &RITiming{}, + Rating: &RIRate{Rates: RateGroups{&Rate{0, 2.0, 1 * time.Second, 1 * time.Second}}}, + } + if !ts1.hasBetterRateIntervalThan(i2) { + ts1.SetRateInterval(i2) + } if ts1.RateInterval != i1 { t.Error("Smaller price interval should win") } diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 32d53df70..e6ca07d64 100644 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -1063,7 +1063,7 @@ func (tpr *TpReader) LoadUsersFiltered(filter *TpUser) (bool, error) { for _, tpUser := range tpUsers { user.Profile[tpUser.AttributeName] = tpUser.AttributeValue } - tpr.ratingStorage.SetUser(user) + tpr.accountingStorage.SetUser(user) return len(tpUsers) > 0, err } @@ -1306,7 +1306,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose bool) (err error) { log.Print("Users:") } for _, u := range tpr.users { - err = tpr.ratingStorage.SetUser(u) + err = tpr.accountingStorage.SetUser(u) if err != nil { return err } @@ -1459,6 +1459,14 @@ func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) { i++ } return keys, nil + case utils.USERS_PREFIX: + keys := make([]string, len(tpr.users)) + i := 0 + for k := range tpr.users { + keys[i] = k + i++ + } + return keys, nil } return nil, errors.New("Unsupported category") } diff --git a/engine/users.go b/engine/users.go index 0cf821c74..894ba61f4 100644 --- a/engine/users.go +++ b/engine/users.go @@ -1,6 +1,7 @@ package engine import ( + "fmt" "sort" "strings" "sync" @@ -55,46 +56,83 @@ type UserService interface { GetUsers(UserProfile, *UserProfiles) error AddIndex([]string, *string) error GetIndexes(string, *map[string][]string) error + ReloadUsers(string, *string) error } type UserMap struct { - table 
map[string]map[string]string - index map[string]map[string]bool - indexKeys []string - ratingDb RatingStorage - mu sync.RWMutex + table map[string]map[string]string + index map[string]map[string]bool + indexKeys []string + accountingDb AccountingStorage + mu sync.RWMutex } -func NewUserMap(ratingDb RatingStorage) (*UserMap, error) { - um := newUserMap(ratingDb) - // load from rating db - if ups, err := um.ratingDb.GetUsers(); err == nil { - for _, up := range ups { - um.table[up.GetId()] = up.Profile - } - } else { +func NewUserMap(accountingDb AccountingStorage, indexes []string) (*UserMap, error) { + um := newUserMap(accountingDb, indexes) + var reply string + if err := um.ReloadUsers("", &reply); err != nil { return nil, err } return um, nil } -func newUserMap(ratingDb RatingStorage) *UserMap { +func newUserMap(accountingDb AccountingStorage, indexes []string) *UserMap { return &UserMap{ - table: make(map[string]map[string]string), - index: make(map[string]map[string]bool), - ratingDb: ratingDb, + table: make(map[string]map[string]string), + index: make(map[string]map[string]bool), + indexKeys: indexes, + accountingDb: accountingDb, } } +func (um *UserMap) ReloadUsers(in string, reply *string) error { + um.mu.Lock() + + // backup old data + oldTable := um.table + oldIndex := um.index + um.table = make(map[string]map[string]string) + um.index = make(map[string]map[string]bool) + + // load from rating db + if ups, err := um.accountingDb.GetUsers(); err == nil { + for _, up := range ups { + um.table[up.GetId()] = up.Profile + } + } else { + // restore old data before return + um.table = oldTable + um.index = oldIndex + + *reply = err.Error() + return err + } + um.mu.Unlock() + + if len(um.indexKeys) != 0 { + var s string + if err := um.AddIndex(um.indexKeys, &s); err != nil { + Logger.Err(fmt.Sprintf("Error adding %v indexes to user profile service: %v", um.indexKeys, err)) + um.table = oldTable + um.index = oldIndex + + *reply = err.Error() + return err + } + } + 
*reply = utils.OK + return nil +} + func (um *UserMap) SetUser(up UserProfile, reply *string) error { um.mu.Lock() defer um.mu.Unlock() - if err := um.ratingDb.SetUser(&up); err != nil { + if err := um.accountingDb.SetUser(&up); err != nil { *reply = err.Error() return err } um.table[up.GetId()] = up.Profile - um.addIndex(&up) + um.addIndex(&up, um.indexKeys) *reply = utils.OK return nil } @@ -102,7 +140,7 @@ func (um *UserMap) SetUser(up UserProfile, reply *string) error { func (um *UserMap) RemoveUser(up UserProfile, reply *string) error { um.mu.Lock() defer um.mu.Unlock() - if err := um.ratingDb.RemoveUser(up.GetId()); err != nil { + if err := um.accountingDb.RemoveUser(up.GetId()); err != nil { *reply = err.Error() return err } @@ -140,13 +178,13 @@ func (um *UserMap) UpdateUser(up UserProfile, reply *string) error { UserName: up.UserName, Profile: m, } - if err := um.ratingDb.SetUser(finalUp); err != nil { + if err := um.accountingDb.SetUser(finalUp); err != nil { *reply = err.Error() return err } um.table[up.GetId()] = m um.deleteIndex(oldUp) - um.addIndex(finalUp) + um.addIndex(finalUp, um.indexKeys) *reply = utils.OK return nil } @@ -186,7 +224,7 @@ func (um *UserMap) GetUsers(up UserProfile, results *UserProfiles) error { } } - var candidates UserProfiles + candidates := make(UserProfiles, 0) // It should not return nil in case of no users but [] for key, values := range table { ponder := 0 tableUP := &UserProfile{ @@ -235,19 +273,19 @@ func (um *UserMap) GetUsers(up UserProfile, results *UserProfiles) error { func (um *UserMap) AddIndex(indexes []string, reply *string) error { um.mu.Lock() defer um.mu.Unlock() - um.indexKeys = indexes + um.indexKeys = append(um.indexKeys, indexes...) 
for key, values := range um.table { up := &UserProfile{Profile: values} up.SetId(key) - um.addIndex(up) + um.addIndex(up, indexes) } *reply = utils.OK return nil } -func (um *UserMap) addIndex(up *UserProfile) { +func (um *UserMap) addIndex(up *UserProfile, indexes []string) { key := up.GetId() - for _, index := range um.indexKeys { + for _, index := range indexes { if index == "Tenant" { if up.Tenant != "" { indexKey := utils.ConcatenatedKey(index, up.Tenant) @@ -369,6 +407,10 @@ func (ps *ProxyUserService) GetIndexes(in string, reply *map[string][]string) er return ps.Client.Call("UsersV1.AddIndex", in, reply) } +func (ps *ProxyUserService) ReloadUsers(in string, reply *string) error { + return ps.Client.Call("UsersV1.ReloadUsers", in, reply) +} + // extraFields - Field name in the interface containing extraFields information func LoadUserProfile(in interface{}, extraFields string) (interface{}, error) { if userService == nil { // no user service => no fun diff --git a/engine/users_test.go b/engine/users_test.go index 86699a9cb..42fd3d549 100644 --- a/engine/users_test.go +++ b/engine/users_test.go @@ -19,7 +19,7 @@ var testMap = UserMap{ } func TestUsersAdd(t *testing.T) { - tm := newUserMap(ratingStorage) + tm := newUserMap(accountingStorage, nil) var r string up := UserProfile{ Tenant: "test", @@ -40,7 +40,7 @@ func TestUsersAdd(t *testing.T) { } func TestUsersUpdate(t *testing.T) { - tm := newUserMap(ratingStorage) + tm := newUserMap(accountingStorage, nil) var r string up := UserProfile{ Tenant: "test", @@ -71,7 +71,7 @@ func TestUsersUpdate(t *testing.T) { } func TestUsersUpdateNotFound(t *testing.T) { - tm := newUserMap(ratingStorage) + tm := newUserMap(accountingStorage, nil) var r string up := UserProfile{ Tenant: "test", @@ -89,7 +89,7 @@ func TestUsersUpdateNotFound(t *testing.T) { } func TestUsersUpdateInit(t *testing.T) { - tm := newUserMap(ratingStorage) + tm := newUserMap(accountingStorage, nil) var r string up := UserProfile{ Tenant: "test", @@ 
-115,7 +115,7 @@ func TestUsersUpdateInit(t *testing.T) { } func TestUsersRemove(t *testing.T) { - tm := newUserMap(ratingStorage) + tm := newUserMap(accountingStorage, nil) var r string up := UserProfile{ Tenant: "test", @@ -445,7 +445,7 @@ func TestUsersGetMissingIdTwoINdex(t *testing.T) { } func TestUsersAddUpdateRemoveIndexes(t *testing.T) { - tm := newUserMap(ratingStorage) + tm := newUserMap(accountingStorage, nil) var r string tm.AddIndex([]string{"t"}, &r) if len(tm.index) != 0 { diff --git a/general_tests/tutorial_fs_calls_test.go b/general_tests/tutorial_fs_calls_test.go index b55d1dbfe..fbdb3c590 100644 --- a/general_tests/tutorial_fs_calls_test.go +++ b/general_tests/tutorial_fs_calls_test.go @@ -326,7 +326,7 @@ func TestTutFsCallsCdrs(t *testing.T) { if reply[0].ReqType != utils.META_PREPAID { t.Errorf("Unexpected ReqType for CDR: %+v", reply[0]) } - if reply[0].Usage != "65" { // Usage as seconds + if reply[0].Usage != "65" && reply[0].Usage != "66" { // Usage as seconds t.Errorf("Unexpected Usage for CDR: %+v", reply[0]) } if reply[0].Cost != 0 { // Cost was not calculated @@ -386,7 +386,7 @@ func TestTutFsCallsCdrs(t *testing.T) { if reply[0].Destination != "1001" { t.Errorf("Unexpected Destination for CDR: %+v", reply[0]) } - if reply[0].Usage != "63" { // Usage as seconds + if reply[0].Usage != "63" && reply[0].Usage != "64" { // Usage as seconds, sometimes takes a second longer to disconnect t.Errorf("Unexpected Usage for CDR: %+v", reply[0]) } } @@ -405,7 +405,7 @@ func TestTutFsCallsCdrs(t *testing.T) { if reply[0].Destination != "1001" { t.Errorf("Unexpected Destination for CDR: %+v", reply[0]) } - if reply[0].Usage != "62" { // Usage as seconds + if reply[0].Usage != "62" && reply[0].Usage != "63" { // Usage as seconds t.Errorf("Unexpected Usage for CDR: %+v", reply[0]) } } @@ -424,7 +424,7 @@ func TestTutFsCallsCdrs(t *testing.T) { if reply[0].Destination != "1002" { t.Errorf("Unexpected Destination for CDR: %+v", reply[0]) } - if 
reply[0].Usage != "64" { // Usage as seconds + if reply[0].Usage != "64" && reply[0].Usage != "65" { // Usage as seconds t.Errorf("Unexpected Usage for CDR: %+v", reply[0]) } if reply[0].Cost == -1.0 { // Cost was not calculated @@ -446,7 +446,7 @@ func TestTutFsCallsCdrs(t *testing.T) { if reply[0].Destination != "1002" { t.Errorf("Unexpected Destination for CDR: %+v", reply[0]) } - if reply[0].Usage != "66" { // Usage as seconds + if reply[0].Usage != "66" && reply[0].Usage != "67" { // Usage as seconds t.Errorf("Unexpected Usage for CDR: %+v", reply[0]) } if reply[0].Cost == -1.0 { // Cost was not calculated diff --git a/general_tests/tutorial_kam_calls_test.go b/general_tests/tutorial_kam_calls_test.go index 93d966410..a4dba99b3 100644 --- a/general_tests/tutorial_kam_calls_test.go +++ b/general_tests/tutorial_kam_calls_test.go @@ -595,7 +595,6 @@ func TestTutKamCallsStopPjsuaListener(t *testing.T) { time.Sleep(time.Duration(1) * time.Second) // Allow pjsua to finish it's tasks, eg un-REGISTER } -/* func TestTutKamCallsStopCgrEngine(t *testing.T) { if !*testCalls { return @@ -611,4 +610,3 @@ func TestTutKamCallsStopKam(t *testing.T) { } engine.KillProcName("kamailio", 1000) } -*/ diff --git a/general_tests/tutorial_local_test.go b/general_tests/tutorial_local_test.go index adebc3cf0..041a25a76 100644 --- a/general_tests/tutorial_local_test.go +++ b/general_tests/tutorial_local_test.go @@ -117,7 +117,7 @@ func TestTutLocalCacheStats(t *testing.T) { } var rcvStats *utils.CacheStats expectedStats := &utils.CacheStats{Destinations: 4, RatingPlans: 3, RatingProfiles: 8, Actions: 7, SharedGroups: 1, RatingAliases: 1, AccountAliases: 1, - DerivedChargers: 1, LcrProfiles: 4, CdrStats: 6} + DerivedChargers: 1, LcrProfiles: 4, CdrStats: 6, Users: 2} var args utils.AttrCacheStats if err := tutLocalRpc.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil { t.Error("Got error on ApierV1.GetCacheStats: ", err.Error()) @@ -182,6 +182,18 @@ func 
TestTutLocalGetCachedItemAge(t *testing.T) { */ } +func TestTutLocalGetUsers(t *testing.T) { + if !*testLocal { + return + } + var users engine.UserProfiles + if err := tutLocalRpc.Call("UsersV1.GetUsers", engine.UserProfile{}, &users); err != nil { + t.Error("Got error on UsersV1.GetUsers: ", err.Error()) + } else if len(users) != 2 { + t.Error("Calling UsersV1.GetUsers got users:", len(users)) + } +} + // Check call costs func TestTutLocalGetCosts(t *testing.T) { if !*testLocal { @@ -206,6 +218,24 @@ func TestTutLocalGetCosts(t *testing.T) { } else if cc.Cost != 0.6 { t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost) } + // Make sure that the same cost is returned via users aliasing + cd = engine.CallDescriptor{ + Direction: "*out", + Category: "call", + Tenant: utils.USERS, + Subject: utils.USERS, + Account: utils.USERS, + Destination: "1002", + DurationIndex: 0, + TimeStart: tStart, + TimeEnd: tEnd, + ExtraFields: map[string]string{"Uuid": "388539dfd4f5cefee8f488b78c6c244b9e19138e"}, + } + if err := tutLocalRpc.Call("Responder.GetCost", cd, &cc); err != nil { + t.Error("Got error on Responder.GetCost: ", err.Error()) + } else if cc.Cost != 0.6 { + t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost) + } tStart, _ = utils.ParseDate("2014-08-04T13:00:00Z") tEnd, _ = utils.ParseDate("2014-08-04T13:01:25Z") cd = engine.CallDescriptor{ diff --git a/general_tests/tutorial_osips_calls_test.go b/general_tests/tutorial_osips_calls_test.go index f3195522e..31b8b0b60 100644 --- a/general_tests/tutorial_osips_calls_test.go +++ b/general_tests/tutorial_osips_calls_test.go @@ -71,7 +71,6 @@ func TestTutOsipsCallsResetStorDb(t *testing.T) { } } -/* // start Kam server func TestTutOsipsCallsStartOsips(t *testing.T) { if !*testCalls { @@ -82,7 +81,6 @@ func TestTutOsipsCallsStartOsips(t *testing.T) { t.Fatal(err) } } -*/ // Start CGR Engine func TestTutOsipsCallsStartEngine(t *testing.T) { @@ -358,7 +356,7 @@ func TestTutOsipsCallsCdrs(t *testing.T) { 
if reply[0].Destination != "1001" { t.Errorf("Unexpected Destination for CDR: %+v", reply[0]) } - if reply[0].Usage != "62" { // Usage as seconds + if reply[0].Usage != "62" && reply[0].Usage != "63" { // Usage as seconds t.Errorf("Unexpected Usage for CDR: %+v", reply[0]) } } @@ -448,11 +446,9 @@ func TestTutOsipsCallsStopCgrEngine(t *testing.T) { } } -/* func TestTutOsipsCallsStopOpensips(t *testing.T) { if !*testCalls { return } engine.KillProcName("opensips", 100) } -*/ diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 2a082891a..913cc0ab7 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -220,13 +220,15 @@ func (sm *FSSessionManager) onChannelPark(ev engine.Event, connId string) { if err := sm.rater.GetDerivedMaxSessionTime(ev.AsStoredCdr(), &maxCallDuration); err != nil { engine.Logger.Err(fmt.Sprintf(" Could not get max session time for %s, error: %s", ev.GetUUID(), err.Error())) } - maxCallDur := time.Duration(maxCallDuration) - if maxCallDur <= sm.cfg.MinCallDuration { - //engine.Logger.Info(fmt.Sprintf("Not enough credit for trasferring the call %s for %s.", ev.GetUUID(), cd.GetKey(cd.Subject))) - sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), INSUFFICIENT_FUNDS) - return + if maxCallDuration != -1 { // For calls different than unlimited, set limits + maxCallDur := time.Duration(maxCallDuration) + if maxCallDur <= sm.cfg.MinCallDuration { + //engine.Logger.Info(fmt.Sprintf("Not enough credit for trasferring the call %s for %s.", ev.GetUUID(), cd.GetKey(cd.Subject))) + sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), INSUFFICIENT_FUNDS) + return + } + sm.setMaxCallDuration(ev.GetUUID(), connId, maxCallDur) } - sm.setMaxCallDuration(ev.GetUUID(), connId, maxCallDur) // ComputeLcr if ev.ComputeLcr() { cd, err := fsev.AsCallDescriptor() @@ -341,7 +343,7 @@ func (sm *FSSessionManager) Shutdown() (err error) { continue } 
engine.Logger.Info(fmt.Sprintf(" Shutting down all sessions on connection id: %s", connId)) - if _, err = fSock.SendApiCmd("hupall MANAGER_REQUEST cgr_reqtype prepaid"); err != nil { + if _, err = fSock.SendApiCmd("hupall MANAGER_REQUEST cgr_reqtype *prepaid"); err != nil { engine.Logger.Err(fmt.Sprintf(" Error on calls shutdown: %s, connection id: %s", err.Error(), connId)) } }