diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 8f894acab..9901cb1ed 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -48,6 +48,7 @@ information, please see the [`CONTRIBUTING.md`](CONTRIBUTING.md) file. | @razvancrainea | Răzvan Crainea | | @marcinkowalczyk | Marcin Kowalczyk | | @andmar | André Maricato | +| @brendangilmore | Brendan Gilmore | diff --git a/apier/v1/apier_local_test.go b/apier/v1/apier_local_test.go index eaaa8c6bc..90175a08e 100644 --- a/apier/v1/apier_local_test.go +++ b/apier/v1/apier_local_test.go @@ -79,7 +79,7 @@ func TestApierCreateDirs(t *testing.T) { if !*testLocal { return } - for _, pathDir := range []string{cfg.CdreProfiles[utils.META_DEFAULT].ExportFolder, "/var/log/cgrates/cdrc/in", "/var/log/cgrates/cdrc/out", cfg.HistoryDir} { + for _, pathDir := range []string{cfg.CdreProfiles[utils.META_DEFAULT].ExportDirectory, "/var/log/cgrates/cdrc/in", "/var/log/cgrates/cdrc/out", cfg.HistoryDir} { if err := os.RemoveAll(pathDir); err != nil { t.Fatal("Error removing folder: ", pathDir, err) diff --git a/apier/v1/cdre.go b/apier/v1/cdre.go index 224058cfc..fd4957e6e 100644 --- a/apier/v1/cdre.go +++ b/apier/v1/cdre.go @@ -120,7 +120,7 @@ func (self *ApierV1) ExportCdrsToFile(attr utils.AttrExpFileCdrs, reply *utils.E return fmt.Errorf("%s:FieldSeparator:%s", utils.ErrServerError.Error(), "Invalid") } } - exportDir := exportTemplate.ExportFolder + exportDir := exportTemplate.ExportDirectory if attr.ExportDir != nil && len(*attr.ExportDir) != 0 { exportDir = *attr.ExportDir } diff --git a/apier/v2/accounts.go b/apier/v2/accounts.go index 1003c0089..e9c7adfd3 100644 --- a/apier/v2/accounts.go +++ b/apier/v2/accounts.go @@ -28,7 +28,7 @@ import ( func (self *ApierV2) GetAccounts(attr utils.AttrGetAccounts, reply *[]*engine.Account) error { if len(attr.Tenant) == 0 { - return utils.NewErrMandatoryIeMissing("Tenanat") + return utils.NewErrMandatoryIeMissing("Tenant") } var accountKeys []string var err error diff --git a/apier/v2/cdre.go b/apier/v2/cdre.go index 31ea8f43e..e0fe280e1 100644 --- a/apier/v2/cdre.go +++ b/apier/v2/cdre.go @@ -56,9 +56,9 @@ func (self *ApierV2) ExportCdrsToFile(attr utils.AttrExportCdrsToFile, reply *ut return fmt.Errorf("%s:FieldSeparator:%s", utils.ErrServerError, "Invalid") } } - ExportFolder := exportTemplate.ExportFolder - if attr.ExportFolder != nil && len(*attr.ExportFolder) != 0 { - ExportFolder = *attr.ExportFolder + eDir := exportTemplate.ExportDirectory + if attr.ExportDirectory != nil && len(*attr.ExportDirectory) != 0 { + eDir = *attr.ExportDirectory } ExportID := strconv.FormatInt(time.Now().Unix(), 10) if attr.ExportID != nil && len(*attr.ExportID) != 0 { @@ -68,7 +68,7 @@ func (self *ApierV2) ExportCdrsToFile(attr utils.AttrExportCdrsToFile, reply *ut if attr.ExportFileName != nil && len(*attr.ExportFileName) != 0 { fileName = *attr.ExportFileName } - filePath := path.Join(ExportFolder, fileName) + filePath := path.Join(eDir, fileName) if cdrFormat == utils.DRYRUN { filePath = utils.DRYRUN } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index a9b654120..0838a4ce3 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -202,9 +202,9 @@ func startDiameterAgent(internalSMGChan, internalPubSubSChan chan rpcclient.RpcC exitChan <- true } -func startSmFreeSWITCH(internalRaterChan, internalCDRSChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) { +func startSmFreeSWITCH(internalRaterChan, internalCDRSChan, rlsChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) { utils.Logger.Info("Starting CGRateS SMFreeSWITCH service.") - var ralsConn, cdrsConn *rpcclient.RpcClientPool + var ralsConn, cdrsConn, rlsConn *rpcclient.RpcClientPool if len(cfg.SmFsConfig.RALsConns) != 0 { ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.SmFsConfig.RALsConns, internalRaterChan, cfg.InternalTtl) @@ -223,7 +223,16 @@ func startSmFreeSWITCH(internalRaterChan, internalCDRSChan chan rpcclient.RpcCli return } } - sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, ralsConn, cdrsConn, cfg.DefaultTimezone) + if len(cfg.SmFsConfig.RLsConns) != 0 { + rlsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.SmFsConfig.RLsConns, rlsChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to RLs: %s", err.Error())) + exitChan <- true + return + } + } + sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, ralsConn, cdrsConn, rlsConn, cfg.DefaultTimezone) smRpc.SMs = append(smRpc.SMs, sm) if err = sm.Connect(); err != nil { utils.Logger.Err(fmt.Sprintf(" error: %s!", err)) @@ -640,7 +649,7 @@ func main() { } // Start SM-FreeSWITCH if cfg.SmFsConfig.Enabled { - go startSmFreeSWITCH(internalRaterChan, internalCdrSChan, cdrDb, exitChan) + go startSmFreeSWITCH(internalRaterChan, internalCdrSChan, internalRLSChan, cdrDb, exitChan) // close all sessions on shutdown go shutdownSessionmanagerSingnalHandler(exitChan) } diff --git a/config/cdreconfig.go b/config/cdreconfig.go index f55bad28e..50285b700 100644 --- a/config/cdreconfig.go +++ b/config/cdreconfig.go @@ -31,7 +31,7 @@ type CdreConfig struct { CostShiftDigits int MaskDestinationID string MaskLength int - ExportFolder string + ExportDirectory string HeaderFields []*CfgCdrField ContentFields []*CfgCdrField TrailerFields []*CfgCdrField @@ -76,8 +76,8 @@ func (self *CdreConfig) loadFromJsonCfg(jsnCfg *CdreJsonCfg) error { if jsnCfg.Mask_length != nil { self.MaskLength = *jsnCfg.Mask_length } - if jsnCfg.Export_folder != nil { - self.ExportFolder = *jsnCfg.Export_folder + if jsnCfg.Export_directory != nil { + self.ExportDirectory = *jsnCfg.Export_directory } if jsnCfg.Header_fields != nil { if self.HeaderFields, err = CfgCdrFieldsFromCdrFieldsJsonCfg(*jsnCfg.Header_fields); err != nil { @@ -111,7 +111,7 @@ func (self *CdreConfig) Clone() *CdreConfig { clnCdre.CostShiftDigits = self.CostShiftDigits clnCdre.MaskDestinationID = self.MaskDestinationID clnCdre.MaskLength = self.MaskLength - clnCdre.ExportFolder = self.ExportFolder + clnCdre.ExportDirectory = self.ExportDirectory clnCdre.HeaderFields = make([]*CfgCdrField, len(self.HeaderFields)) for idx, fld := range self.HeaderFields { clonedVal := *fld diff --git a/config/cdreconfig_test.go b/config/cdreconfig_test.go index ec236ff6d..474474f3b 100644 --- a/config/cdreconfig_test.go +++ b/config/cdreconfig_test.go @@ -30,11 +30,11 @@ func TestCdreCfgClone(t *testing.T) { emptyFields := []*CfgCdrField{} initContentFlds := []*CfgCdrField{ &CfgCdrField{Tag: "CgrId", - Type: "cdrfield", + Type: "*composed", FieldId: "cgrid", Value: cgrIdRsrs}, &CfgCdrField{Tag: "RunId", - Type: "cdrfield", + Type: "*composed", FieldId: "mediation_runid", Value: runIdRsrs}, } @@ -47,16 +47,16 @@ func TestCdreCfgClone(t *testing.T) { CostShiftDigits: 0, MaskDestinationID: "MASKED_DESTINATIONS", MaskLength: 0, - ExportFolder: "/var/spool/cgrates/cdre", + ExportDirectory: "/var/spool/cgrates/cdre", ContentFields: initContentFlds, } eClnContentFlds := []*CfgCdrField{ &CfgCdrField{Tag: "CgrId", - Type: "cdrfield", + Type: "*composed", FieldId: "cgrid", Value: cgrIdRsrs}, &CfgCdrField{Tag: "RunId", - Type: "cdrfield", + Type: "*composed", FieldId: "mediation_runid", Value: runIdRsrs}, } @@ -69,7 +69,7 @@ func TestCdreCfgClone(t *testing.T) { CostShiftDigits: 0, MaskDestinationID: "MASKED_DESTINATIONS", MaskLength: 0, - ExportFolder: "/var/spool/cgrates/cdre", + ExportDirectory: "/var/spool/cgrates/cdre", HeaderFields: emptyFields, ContentFields: eClnContentFlds, TrailerFields: emptyFields, diff --git a/config/config.go b/config/config.go index 4d0a197ea..36d62db9c 100644 --- a/config/config.go +++ b/config/config.go @@ -388,6 +388,11 @@ func (self *CGRConfig) checkConfigSanity() error { return errors.New("CDRS not enabled but referenced by SMFreeSWITCH component") } } + for _, smFSRLsConn := range self.SmFsConfig.RLsConns { + if smFSRLsConn.Address == utils.MetaInternal && !self.resourceLimiterCfg.Enabled { + return errors.New("RLs not enabled but referenced by SMFreeSWITCH component") + } + } } // SM-Kamailio checks if self.SmKamConfig.Enabled { diff --git a/config/config_defaults.go b/config/config_defaults.go index b9d7ea48d..2ff17f04a 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -207,7 +207,7 @@ const CGRATES_CFG_JSON = ` "cost_shift_digits": 0, // shift digits in the cost on export (eg: convert from EUR to cents) "mask_destination_id": "MASKED_DESTINATIONS", // destination id containing called addresses to be masked on export "mask_length": 0, // length of the destination suffix to be masked - "export_folder": "/var/spool/cgrates/cdre", // path where the exported CDRs will be placed + "export_directory": "/var/spool/cgrates/cdre", // path where the exported CDRs will be placed "header_fields": [], // template of the exported header fields "content_fields": [ // template of the exported content fields {"tag": "CGRID", "type": "*composed", "value": "CGRID"}, @@ -250,13 +250,14 @@ const CGRATES_CFG_JSON = ` "sm_freeswitch": { - "enabled": false, // starts SessionManager service: + "enabled": false, // starts SessionManager service: "rals_conns": [ - {"address": "*internal"} // address where to reach the Rater <""|*internal|127.0.0.1:2013> + {"address": "*internal"} // address where to reach the Rater <""|*internal|127.0.0.1:2013> ], "cdrs_conns": [ - {"address": "*internal"} // address where to reach CDR Server, empty to disable CDR capturing <*internal|x.y.z.y:1234> + {"address": "*internal"} // address where to reach CDR Server, empty to disable CDR capturing <*internal|x.y.z.y:1234> ], + "rls_conns": [], // address where to reach the ResourceLimiter service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234> "create_cdr": false, // create CDR out of events and sends them to CDRS component "extra_fields": [], // extra fields to store in auth/CDRs when creating them "debit_interval": "10s", // interval to perform debits on. @@ -386,8 +387,8 @@ const CGRATES_CFG_JSON = ` "rls": { "enabled": false, // starts ResourceLimiter service: . "cdrstats_conns": [], // address where to reach the cdrstats service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234> - "cache_dump_interval": "0s", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|dur> - "usage_ttl": "3h", // expire usage records if older than this duration <""|*never|dur> + "cache_dump_interval": "0s", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|$dur> + "usage_ttl": "3h", // expire usage records if older than this duration <""|*never|$dur> }, diff --git a/config/config_json_test.go b/config/config_json_test.go index 6a478b12f..3c48253c2 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -249,7 +249,7 @@ func TestDfCdreJsonCfgs(t *testing.T) { Cost_shift_digits: utils.IntPointer(0), Mask_destination_id: utils.StringPointer("MASKED_DESTINATIONS"), Mask_length: utils.IntPointer(0), - Export_folder: utils.StringPointer("/var/spool/cgrates/cdre"), + Export_directory: utils.StringPointer("/var/spool/cgrates/cdre"), Header_fields: &eFields, Content_fields: &eContentFlds, Trailer_fields: &eFields, @@ -412,6 +412,7 @@ func TestSmFsJsonCfg(t *testing.T) { &HaPoolJsonCfg{ Address: utils.StringPointer(utils.MetaInternal), }}, + Rls_conns: &[]*HaPoolJsonCfg{}, Create_cdr: utils.BoolPointer(false), Extra_fields: utils.StringSlicePointer([]string{}), Debit_interval: utils.StringPointer("10s"), diff --git a/config/libconfig_json.go b/config/libconfig_json.go index bd6495a48..b3a1aadc6 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -140,7 +140,7 @@ type CdreJsonCfg struct { Cost_shift_digits *int Mask_destination_id *string Mask_length *int - Export_folder *string + Export_directory *string Header_fields *[]*CdrFieldJsonCfg Content_fields *[]*CdrFieldJsonCfg Trailer_fields *[]*CdrFieldJsonCfg @@ -192,6 +192,7 @@ type SmFsJsonCfg struct { Enabled *bool Rals_conns *[]*HaPoolJsonCfg Cdrs_conns *[]*HaPoolJsonCfg + Rls_conns *[]*HaPoolJsonCfg Create_cdr *bool Extra_fields *[]string Debit_interval *string diff --git a/config/smconfig.go b/config/smconfig.go index 9205baba4..0954967f5 100644 --- a/config/smconfig.go +++ b/config/smconfig.go @@ -163,6 +163,7 @@ type SmFsConfig struct { Enabled bool RALsConns []*HaPoolConfig CDRsConns []*HaPoolConfig + RLsConns []*HaPoolConfig CreateCdr bool ExtraFields []*utils.RSRField DebitInterval time.Duration @@ -200,6 +201,13 @@ func (self *SmFsConfig) loadFromJsonCfg(jsnCfg *SmFsJsonCfg) error { self.CDRsConns[idx].loadFromJsonCfg(jsnHaCfg) } } + if jsnCfg.Rls_conns != nil { + self.RLsConns = make([]*HaPoolConfig, len(*jsnCfg.Rls_conns)) + for idx, jsnHaCfg := range *jsnCfg.Rls_conns { + self.RLsConns[idx] = NewDfltHaPoolConfig() + self.RLsConns[idx].loadFromJsonCfg(jsnHaCfg) + } + } if jsnCfg.Create_cdr != nil { self.CreateCdr = *jsnCfg.Create_cdr } diff --git a/data/conf/samples/osips_training/cgrates.json b/data/conf/samples/osips_training/cgrates.json index dd77b9d91..6286bfdfd 100644 --- a/data/conf/samples/osips_training/cgrates.json +++ b/data/conf/samples/osips_training/cgrates.json @@ -6,6 +6,12 @@ // This file contains the default configuration hardcoded into CGRateS. // This is what you get when you load CGRateS with an empty configuration file. +"listen": { + "rpc_json": ":2012", // RPC JSON listening address + "rpc_gob": ":2013", // RPC GOB listening address + "http": ":2080", // HTTP listening address +}, + "rals": { "enabled": true, @@ -31,8 +37,10 @@ "sm_opensips": { "enabled": true, // starts SessionManager service: + "listen_udp": ":2020", // address where to listen for datagram events coming from OpenSIPS "create_cdr": true, // create CDR out of events and sends them to CDRS component "debit_interval": "5s", // interval to perform debits on. + "mi_addr": "192.168.56.128:8020", // address where to reach OpenSIPS MI to send session disconnects }, } diff --git a/data/docker/osips_training/start.sh b/data/docker/osips_training/start.sh index dd4d1e81d..95fcbecb6 100755 --- a/data/docker/osips_training/start.sh +++ b/data/docker/osips_training/start.sh @@ -2,16 +2,16 @@ sed -i 's/127.0.0.1/0.0.0.0/g' /etc/redis/redis.conf /etc/mysql/my.cnf # start services -/etc/init.d/rsyslog start -/etc/init.d/mysql start -/etc/init.d/redis-server start +service rsyslog start +service mysql start +service redis-server start /root/code/bin/cgr-engine -config_dir /root/cgr/data/conf/samples/osips_training # setup mysql cd /usr/share/cgrates/storage/mysql && ./setup_cgr_db.sh root CGRateS.org # load tariff plan data -cd /root/cgr/data/tariffplans/osips_training; cgr-loader +#cd /root/cgr/data/tariffplans/osips_training; cgr-loader cd /root/cgr DISABLE_AUTO_UPDATE="true" zsh diff --git a/data/tariffplans/tutorial/ResourceLimits.csv b/data/tariffplans/tutorial/ResourceLimits.csv index 53cb69b4d..3fa5dfcd8 100644 --- a/data/tariffplans/tutorial/ResourceLimits.csv +++ b/data/tariffplans/tutorial/ResourceLimits.csv @@ -1,6 +1,6 @@ #Id,FilterType,FilterFieldName,FilterValues,ActivationTime,Weight,Limit,ActionTriggers ResGroup1,*string,Account,1001;1002,2014-07-29T15:00:00Z,10,2, ResGroup1,*string_prefix,Destination,10;20,2014-07-29T15:00:00Z,10,, -ResGroup1,*cdr_stats,,CDRST1:*min_ASR:34;CDRST_1001:*min_ASR:20,,,, ResGroup1,*rsr_fields,,Subject(~^1.*1$);Destination(1002),,,, -ResGroup2,*destinations,Destination,DST_FS,2014-07-29T15:00:00Z,10,2, \ No newline at end of file +ResGroup2,*destinations,Destination,DST_FS,2014-07-29T15:00:00Z,10,2, +ResGroup2,*cdr_stats,,CDRST1:*min_ASR:34;CDRST_1001:*min_ASR:20,,,, diff --git a/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json b/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json index 1425caab7..6cc85113e 100644 --- a/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json @@ -45,6 +45,14 @@ }, +"rls": { + "enabled": true, // starts ResourceLimiter service: . + "cdrstats_conns": [ + {"address": "*internal"} + ], +}, + + "cdre": { "*default": { "cdr_format": "csv", // exported CDRs format @@ -60,21 +68,21 @@ "export_dir": "/tmp/cgr_fsevsock/cgrates/cdre", // path where the exported CDRs will be placed "header_fields": [], // template of the exported header fields "content_fields": [ // template of the exported content fields - {"tag": "CgrId", "cdr_field_id": "cgrid", "type": "cdrfield", "value": "cgrid"}, - {"tag":"RunId", "cdr_field_id": "mediation_runid", "type": "cdrfield", "value": "mediation_runid"}, - {"tag":"Tor", "cdr_field_id": "tor", "type": "cdrfield", "value": "tor"}, - {"tag":"AccId", "cdr_field_id": "accid", "type": "cdrfield", "value": "accid"}, - {"tag":"ReqType", "cdr_field_id": "reqtype", "type": "cdrfield", "value": "reqtype"}, - {"tag":"Direction", "cdr_field_id": "direction", "type": "cdrfield", "value": "direction"}, - {"tag":"Tenant", "cdr_field_id": "tenant", "type": "cdrfield", "value": "tenant"}, - {"tag":"Category", "cdr_field_id": "category", "type": "cdrfield", "value": "category"}, - {"tag":"Account", "cdr_field_id": "account", "type": "cdrfield", "value": "account"}, - {"tag":"Subject", "cdr_field_id": "subject", "type": "cdrfield", "value": "subject"}, - {"tag":"Destination", "cdr_field_id": "destination", "type": "cdrfield", "value": "destination"}, - {"tag":"SetupTime", "cdr_field_id": "setup_time", "type": "cdrfield", "value": "setup_time", "layout": "2006-01-02T15:04:05Z07:00"}, - {"tag":"AnswerTime", "cdr_field_id": "answer_time", "type": "cdrfield", "value": "answer_time", "layout": "2006-01-02T15:04:05Z07:00"}, - {"tag":"Usage", "cdr_field_id": "usage", "type": "cdrfield", "value": "usage"}, - {"tag":"Cost", "cdr_field_id": "cost", "type": "cdrfield", "value": "cost"}, + {"tag": "CgrId", "type": "*composed", "value": "CGRID"}, + {"tag":"RunId", "type": "*composed", "value": "RunID"}, + {"tag":"Tor", "type": "cdrfield", "value": "ToR"}, + {"tag":"AccId", "type": "*composed", "value": "OriginID"}, + {"tag":"ReqType", "type": "*composed", "value": "RequestType"}, + {"tag":"Direction", "type": "*composed", "value": "Direction"}, + {"tag":"Tenant", "type": "*composed", "value": "Tenant"}, + {"tag":"Category", "type": "*composed", "value": "Category"}, + {"tag":"Account", "type": "*composed", "value": "Account"}, + {"tag":"Subject", "type": "*composed", "value": "Subject"}, + {"tag":"Destination", "type": "*composed", "value": "Destination"}, + {"tag":"SetupTime", "type": "*datetime", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag":"AnswerTime", "type": "*datetime", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag":"Usage", "type": "*datetime", "value": "Usage"}, + {"tag":"Cost", "type": "*composed", "value": "Cost"}, ], "trailer_fields": [], // template of the exported trailer fields }, @@ -92,16 +100,16 @@ "export_dir": "/tmp/cgr_fsevsock/cgrates/cdre", // path where the exported CDRs will be placed "header_fields": [], // template of the exported header fields "content_fields": [ // template of the exported content fields - {"tag": "CgrId", "cdr_field_id": "cgrid", "type": "cdrfield", "value": "cgrid"}, - {"tag":"AccId", "cdr_field_id": "accid", "type": "cdrfield", "value": "accid"}, - {"tag":"ReqType", "cdr_field_id": "reqtype", "type": "cdrfield", "value": "reqtype"}, - {"tag":"Tenant", "cdr_field_id": "tenant", "type": "cdrfield", "value": "tenant"}, - {"tag":"Category", "cdr_field_id": "category", "type": "cdrfield", "value": "category"}, - {"tag":"Subject", "cdr_field_id": "account", "type": "cdrfield", "value": "account"}, - {"tag":"Destination", "cdr_field_id": "destination", "type": "cdrfield", "value": "~destination:s/^1(\\d+)/+$1/:s/^\\+(\\d+)/00$1/"}, - {"tag":"AnswerTime", "cdr_field_id": "answer_time", "type": "cdrfield", "value": "answer_time", "layout": "2006-01-02T15:04:05Z07:00"}, - {"tag":"Usage", "cdr_field_id": "usage", "type": "cdrfield", "value": "usage"}, - {"tag":"Cost", "cdr_field_id": "cost", "type": "cdrfield", "value": "cost"}, + {"tag": "CgrId", "type": "*composed", "value": "CGRID"}, + {"tag":"AccId", "type": "*composed", "value": "OriginID"}, + {"tag":"ReqType", "type": "*composed", "value": "RequestType"}, + {"tag":"Tenant", "type": "*composed", "value": "Tenant"}, + {"tag":"Category", "type": "*composed", "value": "Category"}, + {"tag":"Subject", "type": "*composed", "value": "Account"}, + {"tag":"Destination", "type": "*composed", "value": "~Destination:s/^1(\\d+)/+$1/:s/^\\+(\\d+)/00$1/"}, + {"tag":"AnswerTime", "type": "*datetime", "value": "AnswerTIme", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag":"Usage", "type": "*composed", "value": "Usage"}, + {"tag":"Cost", "type": "*composed", "value": "Cost"}, ], "trailer_fields": [], }, @@ -111,6 +119,9 @@ "sm_freeswitch": { "enabled": true, // starts SessionManager service: "debit_interval": "5s", // interval to perform debits on. + "rls_conns": [ + {"address": "*internal"} + ], "channel_sync_interval": "10s", "event_socket_conns":[ // instantiate connections to multiple FreeSWITCH servers {"address": "127.0.0.1:8021", "password": "ClueCon", "reconnects": 5} diff --git a/data/tutorials/kamevapi/cgrates/etc/cgrates/cgrates.json b/data/tutorials/kamevapi/cgrates/etc/cgrates/cgrates.json index 3a3764e9c..fd0b40bef 100644 --- a/data/tutorials/kamevapi/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/kamevapi/cgrates/etc/cgrates/cgrates.json @@ -60,21 +60,21 @@ "export_dir": "/tmp/cgr_kamevapi/cgrates/cdre", "header_fields": [], "content_fields": [ - {"tag": "CgrId", "cdr_field_id": "cgrid", "type": "cdrfield", "value": "cgrid"}, - {"tag":"RunId", "cdr_field_id": "mediation_runid", "type": "cdrfield", "value": "mediation_runid"}, - {"tag":"Tor", "cdr_field_id": "tor", "type": "cdrfield", "value": "tor"}, - {"tag":"AccId", "cdr_field_id": "accid", "type": "cdrfield", "value": "accid"}, - {"tag":"ReqType", "cdr_field_id": "reqtype", "type": "cdrfield", "value": "reqtype"}, - {"tag":"Direction", "cdr_field_id": "direction", "type": "cdrfield", "value": "direction"}, - {"tag":"Tenant", "cdr_field_id": "tenant", "type": "cdrfield", "value": "tenant"}, - {"tag":"Category", "cdr_field_id": "category", "type": "cdrfield", "value": "category"}, - {"tag":"Account", "cdr_field_id": "account", "type": "cdrfield", "value": "account"}, - {"tag":"Subject", "cdr_field_id": "subject", "type": "cdrfield", "value": "subject"}, - {"tag":"Destination", "cdr_field_id": "destination", "type": "cdrfield", "value": "destination"}, - {"tag":"SetupTime", "cdr_field_id": "setup_time", "type": "cdrfield", "value": "setup_time", "layout": "2006-01-02T15:04:05Z07:00"}, - {"tag":"AnswerTime", "cdr_field_id": "answer_time", "type": "cdrfield", "value": "answer_time", "layout": "2006-01-02T15:04:05Z07:00"}, - {"tag":"Usage", "cdr_field_id": "usage", "type": "cdrfield", "value": "usage"}, - {"tag":"Cost", "cdr_field_id": "cost", "type": "cdrfield", "value": "cost"}, + {"tag": "CgrId", "type": "*composed", "value": "CGRID"}, + {"tag":"RunId", "type": "*composed", "value": "RunID"}, + {"tag":"Tor", "type": "*composed", "value": "ToR"}, + {"tag":"AccId", "type": "*composed", "value": "OriginID"}, + {"tag":"ReqType", "type": "*composed", "value": "RequestType"}, + {"tag":"Direction", "type": "*composed", "value": "Direction"}, + {"tag":"Tenant", "type": "*composed", "value": "Tenant"}, + {"tag":"Category", "type": "*composed", "value": "Category"}, + {"tag":"Account", "type": "*composed", "value": "Account"}, + {"tag":"Subject", "type": "*composed", "value": "Subject"}, + {"tag":"Destination", "type": "*composed", "value": "Destination"}, + {"tag":"SetupTime", "type": "*datetime", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag":"AnswerTime", "type": "*datetime", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag":"Usage", "type": "*composed", "value": "usage"}, + {"tag":"Cost", "type": "*composed", "value": "cost"}, ], "trailer_fields": [], }, @@ -92,16 +92,16 @@ "export_dir": "/tmp/cgr_kamevapi/cgrates/cdre", "header_fields": [], "content_fields": [ - {"tag": "CgrId", "cdr_field_id": "cgrid", "type": "cdrfield", "value": "cgrid"}, - {"tag":"AccId", "cdr_field_id": "accid", "type": "cdrfield", "value": "accid"}, - {"tag":"ReqType", "cdr_field_id": "reqtype", "type": "cdrfield", "value": "reqtype"}, - {"tag":"Tenant", "cdr_field_id": "tenant", "type": "cdrfield", "value": "tenant"}, - {"tag":"Category", "cdr_field_id": "category", "type": "cdrfield", "value": "category"}, - {"tag":"Subject", "cdr_field_id": "account", "type": "cdrfield", "value": "account"}, - {"tag":"Destination", "cdr_field_id": "destination", "type": "cdrfield", "value": "~destination:s/^1(\\d+)/+$1/:s/^\\+(\\d+)/00$1/"}, - {"tag":"AnswerTime", "cdr_field_id": "answer_time", "type": "cdrfield", "value": "answer_time", "layout": "2006-01-02T15:04:05Z07:00"}, - {"tag":"Usage", "cdr_field_id": "usage", "type": "cdrfield", "value": "usage"}, - {"tag":"Cost", "cdr_field_id": "cost", "type": "cdrfield", "value": "cost"}, + {"tag": "CgrId", "type": "*composed", "value": "CGRID"}, + {"tag":"AccId", "type": "*composed", "value": "OriginID"}, + {"tag":"ReqType", "type": "*composed", "value": "RequestType"}, + {"tag":"Tenant", "type": "*composed", "value": "Tenant"}, + {"tag":"Category", "type": "*composed", "value": "Category"}, + {"tag":"Subject", "type": "*composed", "value": "Account"}, + {"tag":"Destination", "type": "*composed", "value": "~Destination:s/^1(\\d+)/+$1/:s/^\\+(\\d+)/00$1/"}, + {"tag":"AnswerTime", "type": "*datetime", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag":"Usage", "type": "*composed", "value": "Usage"}, + {"tag":"Cost", "type": "*composed", "value": "Cost"}, ], "trailer_fields": [], } diff --git a/data/tutorials/osips_async/cgrates/etc/cgrates/cgrates.json b/data/tutorials/osips_async/cgrates/etc/cgrates/cgrates.json index 225830f24..62c8b9518 100644 --- a/data/tutorials/osips_async/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/osips_async/cgrates/etc/cgrates/cgrates.json @@ -60,21 +60,21 @@ "export_dir": "/tmp/cgr_osipsasync/cgrates/cdre", // path where the exported CDRs will be placed "header_fields": [], // template of the exported header fields "content_fields": [ // template of the exported content fields - {"tag": "CgrId", "cdr_field_id": "cgrid", "type": "cdrfield", "value": "cgrid"}, - {"tag":"RunId", "cdr_field_id": "mediation_runid", "type": "cdrfield", "value": "mediation_runid"}, - {"tag":"Tor", "cdr_field_id": "tor", "type": "cdrfield", "value": "tor"}, - {"tag":"AccId", "cdr_field_id": "accid", "type": "cdrfield", "value": "accid"}, - {"tag":"ReqType", "cdr_field_id": "reqtype", "type": "cdrfield", "value": "reqtype"}, - {"tag":"Direction", "cdr_field_id": "direction", "type": "cdrfield", "value": "direction"}, - {"tag":"Tenant", "cdr_field_id": "tenant", "type": "cdrfield", "value": "tenant"}, - {"tag":"Category", "cdr_field_id": "category", "type": "cdrfield", "value": "category"}, - {"tag":"Account", "cdr_field_id": "account", "type": "cdrfield", "value": "account"}, - {"tag":"Subject", "cdr_field_id": "subject", "type": "cdrfield", "value": "subject"}, - {"tag":"Destination", "cdr_field_id": "destination", "type": "cdrfield", "value": "destination"}, - {"tag":"SetupTime", "cdr_field_id": "setup_time", "type": "cdrfield", "value": "setup_time", "layout": "2006-01-02T15:04:05Z07:00"}, - {"tag":"AnswerTime", "cdr_field_id": "answer_time", "type": "cdrfield", "value": "answer_time", "layout": "2006-01-02T15:04:05Z07:00"}, - {"tag":"Usage", "cdr_field_id": "usage", "type": "cdrfield", "value": "usage"}, - {"tag":"Cost", "cdr_field_id": "cost", "type": "cdrfield", "value": "cost"}, + {"tag": "CgrId", "type": "*composed", "value": "CGRID"}, + {"tag":"RunId", "type": "*composed", "value": "RunID"}, + {"tag":"Tor", "type": "*composed", "value": "ToR"}, + {"tag":"AccId", "type": "*composed", "value": "OriginID"}, + {"tag":"ReqType", "type": "*composed", "value": "RequestType"}, + {"tag":"Direction", "type": "*composed", "value": "Direction"}, + {"tag":"Tenant", "type": "*composed", "value": "Tenant"}, + {"tag":"Category", "type": "*composed", "value": "Category"}, + {"tag":"Account", "type": "*composed", "value": "Account"}, + {"tag":"Subject", "type": "*composed", "value": "Subject"}, + {"tag":"Destination", "type": "*composed", "value": "Destination"}, + {"tag":"SetupTime", "type": "*datetime", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag":"AnswerTime", "type": "*datetime", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag":"Usage", "type": "*composed", "value": "Usage"}, + {"tag":"Cost", "type": "*composed", "value": "Cost"}, ], "trailer_fields": [], // template of the exported trailer fields }, @@ -89,19 +89,19 @@ "cost_shift_digits": 0, // shift digits in the cost on export (eg: convert from EUR to cents) "mask_destination_id": "MASKED_DESTINATIONS", // destination id containing called addresses to be masked on export "mask_length": 0, // length of the destination suffix to be masked - "export_dir": "/tmp/cgr_osipsasync/cgrates/cdre", // path where the exported CDRs will be placed + "export_directory": "/tmp/cgr_osipsasync/cgrates/cdre", // path where the exported CDRs will be placed "header_fields": [], // template of the exported header fields "content_fields": [ // template of the exported content fields - {"tag": "CgrId", "cdr_field_id": "cgrid", "type": "cdrfield", "value": "cgrid"}, - {"tag":"AccId", "cdr_field_id": "accid", "type": "cdrfield", "value": "accid"}, - {"tag":"ReqType", "cdr_field_id": "reqtype", "type": "cdrfield", "value": "reqtype"}, - {"tag":"Tenant", "cdr_field_id": "tenant", "type": "cdrfield", "value": "tenant"}, - {"tag":"Category", "cdr_field_id": "category", "type": "cdrfield", "value": "category"}, - {"tag":"Subject", "cdr_field_id": "account", "type": "cdrfield", "value": "account"}, - {"tag":"Destination", "cdr_field_id": "destination", "type": "cdrfield", "value": "~destination:s/^1(\\d+)/+$1/:s/^\\+(\\d+)/00$1/"}, - {"tag":"AnswerTime", "cdr_field_id": "answer_time", "type": "cdrfield", "value": "answer_time", "layout": "2006-01-02T15:04:05Z07:00"}, - {"tag":"Usage", "cdr_field_id": "usage", "type": "cdrfield", "value": "usage"}, - {"tag":"Cost", "cdr_field_id": "cost", "type": "cdrfield", "value": "cost"}, + {"tag": "CgrId", "type": "*composed", "value": "CGRID"}, + {"tag":"AccId", "type": "*composed", "value": "OriginID"}, + {"tag":"ReqType", "type": "*composed", "value": "RequestType"}, + {"tag":"Tenant", "type": "*composed", "value": "Tenant"}, + {"tag":"Category", "type": "*composed", "value": "Category"}, + {"tag":"Subject", "type": "*composed", "value": "Account"}, + {"tag":"Destination", "type": "*datetime", "value": "~Destination:s/^1(\\d+)/+$1/:s/^\\+(\\d+)/00$1/"}, + {"tag":"AnswerTime", "type": "*datetime", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag":"Usage", "type": "*composed", "value": "Usage"}, + {"tag":"Cost", "type": "*composed", "value": "Cost"}, ], "trailer_fields": [], }, diff --git a/engine/model_converters.go b/engine/model_converters.go index e869ba856..72beeb563 100644 --- a/engine/model_converters.go +++ b/engine/model_converters.go @@ -411,7 +411,7 @@ func APItoModelUsers(attr *utils.TPUsers) (result []TpUser) { } func APItoResourceLimit(tpRL *utils.TPResourceLimit, timezone string) (rl *ResourceLimit, err error) { - rl = &ResourceLimit{ID: tpRL.ID, Weight: tpRL.Weight, Filters: make([]*RequestFilter, len(tpRL.Filters))} + rl = &ResourceLimit{ID: tpRL.ID, Weight: tpRL.Weight, Filters: make([]*RequestFilter, len(tpRL.Filters)), Usage: make(map[string]*ResourceUsage)} for i, tpFltr := range tpRL.Filters { rf := &RequestFilter{Type: tpFltr.Type, FieldName: tpFltr.FieldName, Values: tpFltr.Values} if err := rf.CompileValues(); err != nil { diff --git a/engine/model_converters_test.go b/engine/model_converters_test.go index e40fe695a..83dff5449 100644 --- a/engine/model_converters_test.go +++ b/engine/model_converters_test.go @@ -21,7 +21,7 @@ func TestAPItoResourceLimit(t *testing.T) { Weight: 10, Limit: "2", } - eRL := &ResourceLimit{ID: tpRL.ID, Weight: tpRL.Weight, Filters: make([]*RequestFilter, len(tpRL.Filters))} + eRL := &ResourceLimit{ID: tpRL.ID, Weight: tpRL.Weight, Filters: make([]*RequestFilter, len(tpRL.Filters)), Usage: make(map[string]*ResourceUsage)} eRL.Filters[0] = &RequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"1001", "1002"}} eRL.Filters[1] = &RequestFilter{Type: MetaStringPrefix, FieldName: "Destination", Values: []string{"10", "20"}} eRL.Filters[2] = &RequestFilter{Type: MetaCDRStats, Values: []string{"CDRST1:*min_ASR:34", "CDRST_1001:*min_ASR:20"}, diff --git a/engine/reqfilter.go b/engine/reqfilter.go index 9c709b645..e08787e0e 100644 --- a/engine/reqfilter.go +++ b/engine/reqfilter.go @@ -126,6 +126,9 @@ func (fltr *RequestFilter) Pass(req interface{}, extraFieldsLabel string, cdrSta func (fltr *RequestFilter) passString(req interface{}, extraFieldsLabel string) (bool, error) { strVal, err := utils.ReflectFieldAsString(req, fltr.FieldName, extraFieldsLabel) if err != nil { + if err == utils.ErrNotFound { + return false, nil + } return false, err } for _, val := range fltr.Values { @@ -139,6 +142,9 @@ func (fltr *RequestFilter) passString(req interface{}, extraFieldsLabel string) func (fltr *RequestFilter) passStringPrefix(req interface{}, extraFieldsLabel string) (bool, error) { strVal, err := utils.ReflectFieldAsString(req, fltr.FieldName, extraFieldsLabel) if err != nil { + if err == utils.ErrNotFound { + return false, nil + } return false, err } for _, prfx := range fltr.Values { @@ -157,6 +163,9 @@ func (fltr *RequestFilter) passTimings(req interface{}, extraFieldsLabel string) func (fltr *RequestFilter) passDestinations(req interface{}, extraFieldsLabel string) (bool, error) { dst, err := utils.ReflectFieldAsString(req, fltr.FieldName, extraFieldsLabel) if err != nil { + if err == utils.ErrNotFound { + return false, nil + } return false, err } for _, p := range utils.SplitPrefix(dst, MIN_PREFIX_MATCH) { @@ -177,6 +186,9 @@ func (fltr *RequestFilter) passDestinations(req interface{}, extraFieldsLabel st func (fltr *RequestFilter) passRSRFields(req interface{}, extraFieldsLabel string) (bool, error) { for _, rsrFld := range fltr.rsrFields { if strVal, err := utils.ReflectFieldAsString(req, rsrFld.Id, extraFieldsLabel); err != nil { + if err == utils.ErrNotFound { + return false, nil + } return false, err } else if rsrFld.FilterPasses(strVal) { return true, nil diff --git a/engine/reqfilter_test.go b/engine/reqfilter_test.go index 86590d917..ebe50e471 100644 --- a/engine/reqfilter_test.go +++ b/engine/reqfilter_test.go @@ -79,8 +79,10 @@ func TestPassStringPrefix(t *testing.T) { t.Error("Not passes filter") } rf = &RequestFilter{Type: MetaStringPrefix, FieldName: "nonexisting", Values: []string{"off"}} - if _, err := rf.passStringPrefix(cd, "ExtraFields"); err == nil || err != utils.ErrNotFound { + if passing, err := rf.passStringPrefix(cd, "ExtraFields"); err != nil { t.Error(err) + } else if passing { + t.Error("Passes filter") } } diff --git a/engine/reslimiter.go b/engine/reslimiter.go index 68bf29ec3..564d6130c 100644 --- a/engine/reslimiter.go +++ b/engine/reslimiter.go @@ -31,19 +31,67 @@ import ( "github.com/cgrates/rpcclient" ) +type ResourceUsage struct { + ID string // Unique identifier of this resourceUsage, Eg: FreeSWITCH UUID + UsageTime time.Time // So we can expire it later + UsageUnits float64 // Number of units used +} + // ResourceLimit represents a limit imposed for accessing a resource (eg: new calls) type ResourceLimit struct { ID string // Identifier of this limit Filters []*RequestFilter // Filters for the request ActivationTime time.Time // Time when this limit becomes active - Weight float64 // Weight to sort the ResourceLimits - Limit float64 // Limit value - ActionTriggers ActionTriggers // Thresholds to check after changing Limit - Used utils.Int64Slice // []time.Time.Unix() - keep it in this format so we can expire usage automatically + ExpiryTime time.Time + Weight float64 // Weight to sort the ResourceLimits + Limit float64 // Limit value + ActionTriggers ActionTriggers // Thresholds to check after changing Limit + UsageTTL time.Duration // Expire usage after this duration + Usage map[string]*ResourceUsage // Keep a record of usage, bounded with timestamps so we can expire too long records + usageCounter float64 // internal counter aggregating real usage of ResourceLimit +} + +func (rl *ResourceLimit) removeExpiredUnits() { + for ruID, rv := range rl.Usage { + if time.Now().Sub(rv.UsageTime) <= rl.UsageTTL { + continue // not expired + } + delete(rl.Usage, ruID) + rl.usageCounter -= rv.UsageUnits + } +} + +func (rl *ResourceLimit) UsedUnits() float64 { + if rl.UsageTTL != 0 { + rl.removeExpiredUnits() + } + return rl.usageCounter +} + +func (rl *ResourceLimit) RecordUsage(ru *ResourceUsage) error { + if _, hasID := rl.Usage[ru.ID]; hasID { + return fmt.Errorf("Duplicate resource usage with id: %s", ru.ID) + } + rl.Usage[ru.ID] = ru + rl.usageCounter += ru.UsageUnits + return nil +} + +func (rl *ResourceLimit) RemoveUsage(ruID string) error { + ru, hasIt := rl.Usage[ruID] + if !hasIt { + return fmt.Errorf("Cannot find usage record with id: %s", ruID) + } + delete(rl.Usage, ru.ID) + rl.usageCounter -= ru.UsageUnits + return nil } // Pas the config as a whole so we can ask access concurrently func NewResourceLimiterService(cfg *config.CGRConfig, dataDB AccountingStorage, cdrStatS rpcclient.RpcClientConnection) (*ResourceLimiterService, error) { + if cdrStatS != nil && reflect.ValueOf(cdrStatS).IsNil() { + cdrStatS = nil + } rls := &ResourceLimiterService{stringIndexes: make(map[string]map[string]utils.StringMap), dataDB: dataDB, cdrStatS: cdrStatS} return rls, nil } @@ -59,7 +107,7 @@ type ResourceLimiterService struct { // Index cached ResourceLimits with MetaString filter types func (rls *ResourceLimiterService) indexStringFilters(rlIDs []string) error { utils.Logger.Info(" Start indexing string filters") - newStringIndexes := make(map[string]map[string]utils.StringMap) // Index it transactionally + newStringIndexes := make(map[string]map[string]utils.StringMap) // Index it transactional var cacheIDsToIndex []string // Cache keys of RLs to be indexed if rlIDs == nil { cacheIDsToIndex = cache2go.GetEntriesKeys(utils.ResourceLimitsPrefix) @@ -74,10 +122,12 @@ func (rls *ResourceLimiterService) indexStringFilters(rlIDs []string) error { return utils.ErrNotFound } rl := x.(*ResourceLimit) + var hasMetaString bool for _, fltr := range rl.Filters { if fltr.Type != MetaString { continue } + hasMetaString = true // Mark that we found at least one metatring so we don't need to index globally if _, hastIt := newStringIndexes[fltr.FieldName]; !hastIt { newStringIndexes[fltr.FieldName] = make(map[string]utils.StringMap) } @@ -88,6 +138,15 @@ func (rls *ResourceLimiterService) indexStringFilters(rlIDs []string) error { newStringIndexes[fltr.FieldName][fldVal][rl.ID] = true } } + if !hasMetaString { + if _, hasIt := newStringIndexes[utils.NOT_AVAILABLE]; !hasIt { + newStringIndexes[utils.NOT_AVAILABLE] = make(map[string]utils.StringMap) + } + if _, hasIt := newStringIndexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE]; !hasIt { + newStringIndexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE] = make(utils.StringMap) + } + newStringIndexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE][rl.ID] = true // Fields without real field index will be located in map[NOT_AVAILABLE][NOT_AVAILABLE][rl.ID] + } } rls.Lock() defer rls.Unlock() @@ -117,12 +176,11 @@ func (rls *ResourceLimiterService) indexStringFilters(rlIDs []string) error { // Called when cache/re-caching is necessary func (rls *ResourceLimiterService) cacheResourceLimits(loadID string, rlIDs []string) error { - if len(rlIDs) == 0 { - return nil - } if rlIDs == nil { utils.Logger.Info(" Start caching all resource limits") - } else if len(rlIDs) != 0 { + } else if len(rlIDs) == 0 { + return nil + } else { utils.Logger.Info(fmt.Sprintf(" Start caching resource limits with ids: %+v", rlIDs)) } if err := rls.dataDB.PreloadCacheForPrefix(utils.ResourceLimitsPrefix); err != nil { @@ -132,6 +190,72 @@ func (rls *ResourceLimiterService) cacheResourceLimits(loadID string, rlIDs []st return rls.indexStringFilters(rlIDs) } +func (rls *ResourceLimiterService) matchingResourceLimitsForEvent(ev map[string]interface{}) (map[string]*ResourceLimit, error) { + matchingResources := make(map[string]*ResourceLimit) + for fldName, fieldValIf := range ev { + if _, hasIt := rls.stringIndexes[fldName]; !hasIt { + continue + } + strVal, canCast := utils.CastFieldIfToString(fieldValIf) + if !canCast { + return nil, fmt.Errorf("Cannot cast field: %s into string", fldName) + } + if _, hasIt := rls.stringIndexes[fldName][strVal]; !hasIt { + continue + } + for resName := range rls.stringIndexes[fldName][strVal] { + if _, hasIt := matchingResources[resName]; hasIt { // Already checked this RL + continue + } + x, err := CacheGet(utils.ResourceLimitsPrefix + resName) + if err != nil { + return nil, err + } + rl := x.(*ResourceLimit) + now := time.Now() + if rl.ActivationTime.After(now) || (!rl.ExpiryTime.IsZero() && rl.ExpiryTime.Before(now)) { // not active + continue + } + passAllFilters := true + for _, fltr := range rl.Filters { + if pass, err := fltr.Pass(ev, "", rls.cdrStatS); err != nil { + return nil, utils.NewErrServerError(err) + } else if !pass { + passAllFilters = false + continue + } + } + if passAllFilters { + matchingResources[rl.ID] = rl // Cannot save it here since we could have errors after and resource will remain unused + } + } + } + // Check un-indexed resources + for resName := range rls.stringIndexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE] { + if _, hasIt := matchingResources[resName]; hasIt { // Already checked this RL + continue + } + x, err := CacheGet(utils.ResourceLimitsPrefix + resName) + if err != nil { + return nil, err + } + rl := x.(*ResourceLimit) + now := time.Now() + if rl.ActivationTime.After(now) || (!rl.ExpiryTime.IsZero() && rl.ExpiryTime.Before(now)) { // not active + continue + } + for _, fltr := range rl.Filters { + if pass, err := fltr.Pass(ev, "", rls.cdrStatS); err != nil { + return nil, utils.NewErrServerError(err) + } else if !pass { + continue + } + matchingResources[rl.ID] = rl // Cannot save it here since we could have errors after and resource will remain unused + } + } + return matchingResources, nil +} + // Called to start the service func (rls *ResourceLimiterService) Start() error { if err := rls.cacheResourceLimits("ResourceLimiterServiceStart", nil); err != nil { @@ -145,7 +269,7 @@ func (rls *ResourceLimiterService) Shutdown() error { return nil } -// RPC Methods available internally +// RPC Methods // Cache/Re-cache func (rls *ResourceLimiterService) V1CacheResourceLimits(attrs *utils.AttrRLsCache, reply *string) error { @@ -156,6 +280,83 @@ func (rls *ResourceLimiterService) V1CacheResourceLimits(attrs *utils.AttrRLsCac return nil } +// Alias API for external usage +func (rls *ResourceLimiterService) CacheResourceLimits(attrs *utils.AttrRLsCache, reply *string) error { + return rls.V1CacheResourceLimits(attrs, reply) +} + +func (rls *ResourceLimiterService) V1ResourceLimitsForEvent(ev map[string]interface{}, reply *[]*ResourceLimit) error { + rls.Lock() // Unknown number of RLs updated + defer rls.Unlock() + matchingRLForEv, err := rls.matchingResourceLimitsForEvent(ev) + if err != nil { + return utils.NewErrServerError(err) + } + retRLs := make([]*ResourceLimit, len(matchingRLForEv)) + i := 0 + for _, rl := range matchingRLForEv { + retRLs[i] = rl + i++ + } + *reply = retRLs + return nil +} + +// Alias API for external usage +func (rls *ResourceLimiterService) ResourceLimitsForEvent(ev map[string]interface{}, reply *[]*ResourceLimit) error { + return rls.V1ResourceLimitsForEvent(ev, reply) +} + +// Called when a session or another event needs to +func (rls *ResourceLimiterService) V1InitiateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error { + rls.Lock() // Unknown number of RLs updated + defer rls.Unlock() + matchingRLForEv, err := rls.matchingResourceLimitsForEvent(attrs.Event) + if err != nil { + return utils.NewErrServerError(err) + } + for rlID, rl := range matchingRLForEv { + if rl.Limit < rl.UsedUnits()+attrs.RequestedUnits { + delete(matchingRLForEv, rlID) + } + if err := rl.RecordUsage(&ResourceUsage{ID: attrs.ResourceUsageID, UsageTime: time.Now(), UsageUnits: attrs.RequestedUnits}); err != nil { + return err // Should not happen + } + } + if len(matchingRLForEv) == 0 { + return utils.ErrResourceUnavailable + } + for _, rl := range matchingRLForEv { + CacheSet(utils.ResourceLimitsPrefix+rl.ID, rl) + } + *reply = utils.OK + return nil +} + +// Alias for externam methods +func (rls *ResourceLimiterService) InitiateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error { + return rls.V1InitiateResourceUsage(attrs, reply) +} + +func (rls *ResourceLimiterService) V1TerminateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error { + rls.Lock() // Unknown number of RLs updated + defer rls.Unlock() + matchingRLForEv, err := rls.matchingResourceLimitsForEvent(attrs.Event) + if err != nil { + return utils.NewErrServerError(err) + } + for _, rl := range matchingRLForEv { + rl.RemoveUsage(attrs.ResourceUsageID) + } + *reply = utils.OK + return nil +} + +// Alias for external methods +func (rls *ResourceLimiterService) TerminateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error { + return rls.V1TerminateResourceUsage(attrs, reply) +} + // Make the service available as RPC internally func (rls *ResourceLimiterService) Call(serviceMethod string, args interface{}, reply interface{}) error { parts := strings.Split(serviceMethod, ".") diff --git a/engine/reslimiter_test.go b/engine/reslimiter_test.go index 659fecc12..6c7bec318 100644 --- a/engine/reslimiter_test.go +++ b/engine/reslimiter_test.go @@ -27,7 +27,9 @@ import ( "github.com/cgrates/cgrates/utils" ) -func TestIndexStringFilters(t *testing.T) { +var rLS *ResourceLimiterService + +func TestRLsIndexStringFilters(t *testing.T) { rls := []*ResourceLimit{ &ResourceLimit{ ID: "RL1", @@ -39,6 +41,7 @@ func TestIndexStringFilters(t *testing.T) { }}, ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC), Limit: 2, + Usage: make(map[string]*ResourceUsage), }, &ResourceLimit{ ID: "RL2", @@ -49,12 +52,35 @@ func TestIndexStringFilters(t *testing.T) { }, ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC), Limit: 1, + UsageTTL: time.Duration(1 * time.Millisecond), + Usage: make(map[string]*ResourceUsage), + }, + &ResourceLimit{ + ID: "RL4", + Weight: 10, + Filters: []*RequestFilter{ + &RequestFilter{Type: MetaStringPrefix, FieldName: "Destination", Values: []string{"+49"}}, + }, + ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC), + Limit: 1, + Usage: make(map[string]*ResourceUsage), + }, + &ResourceLimit{ + ID: "RL5", + Weight: 10, + Filters: []*RequestFilter{ + &RequestFilter{Type: MetaStringPrefix, FieldName: "Destination", Values: []string{"+40"}}, + }, + ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC), + Limit: 1, + UsageTTL: time.Duration(10 * time.Millisecond), + Usage: make(map[string]*ResourceUsage), }, } for _, rl := range rls { cache2go.Set(utils.ResourceLimitsPrefix+rl.ID, rl) } - rLS := new(ResourceLimiterService) + rLS = new(ResourceLimiterService) eIndexes := map[string]map[string]utils.StringMap{ "Account": map[string]utils.StringMap{ "1001": utils.StringMap{ @@ -73,6 +99,12 @@ func TestIndexStringFilters(t *testing.T) { "RL2": true, }, }, + utils.NOT_AVAILABLE: map[string]utils.StringMap{ + utils.NOT_AVAILABLE: utils.StringMap{ + "RL4": true, + "RL5": true, + }, + }, } if err := rLS.indexStringFilters(nil); err != nil { t.Error(err) @@ -88,8 +120,21 @@ func TestIndexStringFilters(t *testing.T) { }, ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC), Limit: 1, + Usage: make(map[string]*ResourceUsage), } cache2go.Set(utils.ResourceLimitsPrefix+rl3.ID, rl3) + rl6 := &ResourceLimit{ // Add it so we can test expiryTime + ID: "RL6", + Weight: 10, + Filters: []*RequestFilter{ + &RequestFilter{Type: MetaString, FieldName: "Subject", Values: []string{"dan"}}, + }, + ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC), + ExpiryTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC), + Limit: 1, + Usage: make(map[string]*ResourceUsage), + } + CacheSet(utils.ResourceLimitsPrefix+rl6.ID, rl6) eIndexes = map[string]map[string]utils.StringMap{ "Account": map[string]utils.StringMap{ "1001": utils.StringMap{ @@ -107,16 +152,155 @@ func TestIndexStringFilters(t *testing.T) { "dan": utils.StringMap{ "RL2": true, "RL3": true, + "RL6": true, }, "1003": utils.StringMap{ "RL3": true, }, }, + utils.NOT_AVAILABLE: map[string]utils.StringMap{ + utils.NOT_AVAILABLE: utils.StringMap{ + "RL4": true, + "RL5": true, + }, + }, } // Test index update - if err := rLS.indexStringFilters([]string{rl3.ID}); err != nil { + if err := rLS.indexStringFilters([]string{rl3.ID, rl6.ID}); err != nil { t.Error(err) } else if !reflect.DeepEqual(eIndexes, rLS.stringIndexes) { t.Errorf("Expecting: %+v, received: %+v", eIndexes, rLS.stringIndexes) } } + +func TestRLsMatchingResourceLimitsForEvent(t *testing.T) { + eResLimits := map[string]*ResourceLimit{ + "RL1": &ResourceLimit{ + ID: "RL1", + Weight: 20, + Filters: []*RequestFilter{ + &RequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"1001", "1002"}}, + &RequestFilter{Type: MetaRSRFields, Values: []string{"Subject(~^1.*1$)", "Destination(1002)"}, + rsrFields: utils.ParseRSRFieldsMustCompile("Subject(~^1.*1$);Destination(1002)", utils.INFIELD_SEP), + }}, + ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC), + Limit: 2, + Usage: make(map[string]*ResourceUsage), + }, + "RL2": &ResourceLimit{ + ID: "RL2", + Weight: 10, + Filters: []*RequestFilter{ + &RequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"dan", "1002"}}, + &RequestFilter{Type: MetaString, FieldName: "Subject", Values: []string{"dan"}}, + }, + ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC), + Limit: 1, + UsageTTL: time.Duration(1 * time.Millisecond), + Usage: make(map[string]*ResourceUsage), + }, + } + if resLimits, err := rLS.matchingResourceLimitsForEvent(map[string]interface{}{"Account": "1002", "Subject": "dan", "Destination": "1002"}); err != nil { + t.Error(err) + } else if len(eResLimits) != len(resLimits) { + t.Errorf("Expecting: %+v, received: %+v", eResLimits, resLimits) + } else { + for rlID := range eResLimits { + if _, hasID := resLimits[rlID]; !hasID { + t.Errorf("Expecting: %+v, received: %+v", eResLimits, resLimits) + } + } + // Make sure the filters are what we expect to be after retrieving from cache: + fltr := resLimits["RL1"].Filters[1] + if pass, _ := fltr.Pass(map[string]interface{}{"Subject": "10000001"}, "", nil); !pass { + t.Errorf("Expecting RL: %+v, received: %+v", eResLimits["RL1"], resLimits["RL1"]) + } + if pass, _ := fltr.Pass(map[string]interface{}{"Account": "1002"}, "", nil); pass { + t.Errorf("Expecting RL: %+v, received: %+v", eResLimits["RL1"], resLimits["RL1"]) + } + + } +} + +func TestRLsV1ResourceLimitsForEvent(t *testing.T) { + eLimits := []*ResourceLimit{ + &ResourceLimit{ + ID: "RL1", + Weight: 20, + Filters: []*RequestFilter{ + &RequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"1001", "1002"}}, + &RequestFilter{Type: MetaRSRFields, Values: []string{"Subject(~^1.*1$)", "Destination(1002)"}, + rsrFields: utils.ParseRSRFieldsMustCompile("Subject(~^1.*1$);Destination(1002)", utils.INFIELD_SEP), + }}, + ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC), + Limit: 2, + Usage: make(map[string]*ResourceUsage), + }, + &ResourceLimit{ + ID: "RL2", + Weight: 10, + Filters: []*RequestFilter{ + &RequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"dan", "1002"}}, + &RequestFilter{Type: MetaString, FieldName: "Subject", Values: []string{"dan"}}, + }, + ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC), + Limit: 1, + UsageTTL: time.Duration(1 * time.Millisecond), + Usage: make(map[string]*ResourceUsage), + }, + } + var rcvLmts []*ResourceLimit + if err := rLS.V1ResourceLimitsForEvent(map[string]interface{}{"Account": "1002", "Subject": "dan", "Destination": "1002"}, &rcvLmts); err != nil { + t.Error(err) + } else if len(eLimits) != len(rcvLmts) { + t.Errorf("Expecting: %+v, received: %+v", eLimits, rcvLmts) + } +} + +func TestRLsV1InitiateResourceUsage(t *testing.T) { + attrRU := utils.AttrRLsResourceUsage{ + ResourceUsageID: "651a8db2-4f67-4cf8-b622-169e8a482e50", + Event: map[string]interface{}{"Account": "1002", "Subject": "dan", "Destination": "1002"}, + RequestedUnits: 1, + } + var reply string + if err := rLS.V1InitiateResourceUsage(attrRU, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Error("Received reply: ", reply) + } + resLimits, err := rLS.matchingResourceLimitsForEvent(attrRU.Event) + if err != nil { + t.Error(err) + } else if len(resLimits) != 2 { + t.Errorf("Received: %+v", resLimits) + } else if resLimits["RL1"].UsedUnits() != 1 { + t.Errorf("RL1: %+v", resLimits["RL1"]) + } else if _, hasKey := resLimits["RL1"].Usage[attrRU.ResourceUsageID]; !hasKey { + t.Errorf("RL1: %+v", resLimits["RL1"]) + } +} + +func TestRLsV1TerminateResourceUsage(t *testing.T) { + attrRU := utils.AttrRLsResourceUsage{ + ResourceUsageID: "651a8db2-4f67-4cf8-b622-169e8a482e50", + Event: map[string]interface{}{"Account": "1002", "Subject": "dan", "Destination": "1002"}, + RequestedUnits: 1, + } + var reply string + if err := rLS.V1TerminateResourceUsage(attrRU, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Error("Received reply: ", reply) + } + resLimits, err := rLS.matchingResourceLimitsForEvent(attrRU.Event) + if err != nil { + t.Error(err) + } else if len(resLimits) != 2 { + t.Errorf("Received: %+v", resLimits) + } else if resLimits["RL1"].UsedUnits() != 0 { + t.Errorf("RL1: %+v", resLimits["RL1"]) + } else if _, hasKey := resLimits["RL1"].Usage[attrRU.ResourceUsageID]; hasKey { + t.Errorf("RL1: %+v", resLimits["RL1"]) + } +} diff --git a/sessionmanager/fsevent.go b/sessionmanager/fsevent.go index c955125e2..c37e46d25 100644 --- a/sessionmanager/fsevent.go +++ b/sessionmanager/fsevent.go @@ -372,6 +372,32 @@ func (fsev FSEvent) ComputeLcr() bool { } } +// Used with RLs +func (fsev FSEvent) AsMapStringInterface(timezone string) map[string]interface{} { + mp := make(map[string]interface{}) + mp[utils.CGRID] = fsev.GetCgrId(timezone) + mp[utils.TOR] = utils.VOICE + mp[utils.ACCID] = fsev.GetUUID() + mp[utils.CDRHOST] = fsev.GetOriginatorIP(utils.META_DEFAULT) + mp[utils.CDRSOURCE] = "FS_" + fsev.GetName() + mp[utils.REQTYPE] = fsev.GetReqType(utils.META_DEFAULT) + mp[utils.DIRECTION] = fsev.GetDirection(utils.META_DEFAULT) + mp[utils.TENANT] = fsev.GetTenant(utils.META_DEFAULT) + mp[utils.CATEGORY] = fsev.GetCategory(utils.META_DEFAULT) + mp[utils.ACCOUNT] = fsev.GetAccount(utils.META_DEFAULT) + mp[utils.SUBJECT] = fsev.GetSubject(utils.META_DEFAULT) + mp[utils.DESTINATION] = fsev.GetDestination(utils.META_DEFAULT) + mp[utils.SETUP_TIME], _ = fsev.GetSetupTime(utils.META_DEFAULT, timezone) + mp[utils.ANSWER_TIME], _ = fsev.GetAnswerTime(utils.META_DEFAULT, timezone) + mp[utils.USAGE], _ = fsev.GetDuration(utils.META_DEFAULT) + mp[utils.PDD], _ = fsev.GetPdd(utils.META_DEFAULT) + mp[utils.COST] = -1 + mp[utils.SUPPLIER] = fsev.GetSupplier(utils.META_DEFAULT) + mp[utils.DISCONNECT_CAUSE] = fsev.GetDisconnectCause(utils.META_DEFAULT) + //storCdr.ExtraFields = fsev.GetExtraFields() + return mp +} + // Converts into CallDescriptor due to responder interface needs func (fsev FSEvent) AsCallDescriptor() (*engine.CallDescriptor, error) { lcrReq := &engine.LcrRequest{ diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index e7dc3184e..58f0349ab 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "log/syslog" + "reflect" "strconv" "strings" "time" @@ -33,13 +34,17 @@ import ( "github.com/cgrates/rpcclient" ) -func NewFSSessionManager(smFsConfig *config.SmFsConfig, rater, cdrs rpcclient.RpcClientConnection, timezone string) *FSSessionManager { +func NewFSSessionManager(smFsConfig *config.SmFsConfig, rater, cdrs, rls rpcclient.RpcClientConnection, timezone string) *FSSessionManager { + if rls != nil && reflect.ValueOf(rls).IsNil() { + rls = nil + } return &FSSessionManager{ cfg: smFsConfig, conns: make(map[string]*fsock.FSock), senderPools: make(map[string]*fsock.FSockPool), rater: rater, cdrsrv: cdrs, + rls: rls, sessions: NewSessions(), timezone: timezone, } @@ -53,6 +58,7 @@ type FSSessionManager struct { senderPools map[string]*fsock.FSockPool // Keep sender pools here rater rpcclient.RpcClientConnection cdrsrv rpcclient.RpcClientConnection + rls rpcclient.RpcClientConnection sessions *Sessions timezone string @@ -159,6 +165,7 @@ func (sm *FSSessionManager) onChannelPark(ev engine.Event, connId string) { if err = sm.Rater().Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil { utils.Logger.Info(fmt.Sprintf(" LCR_API_ERROR: %s", err.Error())) sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) + return } if lcr.HasErrors() { lcr.LogErrors() @@ -174,9 +181,28 @@ func (sm *FSSessionManager) onChannelPark(ev engine.Event, connId string) { if _, err = sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s %s %s\n\n", ev.GetUUID(), utils.CGR_SUPPLIERS, fsArray)); err != nil { utils.Logger.Info(fmt.Sprintf(" LCR_ERROR: %s", err.Error())) sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) + return } } } + if sm.rls != nil { + var reply string + attrRU := utils.AttrRLsResourceUsage{ + ResourceUsageID: ev.GetUUID(), + Event: ev.(FSEvent).AsMapStringInterface(sm.timezone), + RequestedUnits: 1, + } + if err := sm.rls.Call("RLsV1.InitiateResourceUsage", attrRU, &reply); err != nil { + if err.Error() == utils.ErrResourceUnavailable.Error() { + sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), "-"+utils.ErrResourceUnavailable.Error()) + } else { + utils.Logger.Err(fmt.Sprintf(" RLs API error: %s", err.Error())) + sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) + } + return + } + } + // Check ResourceLimits sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), AUTH_OK) } @@ -224,6 +250,17 @@ func (sm *FSSessionManager) onChannelHangupComplete(ev engine.Event) { if sm.cfg.CreateCdr { sm.ProcessCdr(ev.AsStoredCdr(config.CgrConfig().DefaultTimezone)) } + var reply string + attrRU := utils.AttrRLsResourceUsage{ + ResourceUsageID: ev.GetUUID(), + Event: ev.(FSEvent).AsMapStringInterface(sm.timezone), + RequestedUnits: 1, + } + if sm.rls != nil { + if err := sm.rls.Call("RLsV1.TerminateResourceUsage", attrRU, &reply); err != nil { + utils.Logger.Err(fmt.Sprintf(" RLs API error: %s", err.Error())) + } + } } // Connects to the freeswitch mod_event_socket server and starts diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 7f572918a..7f382d6e4 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1093,7 +1093,7 @@ type AttrExportCdrsToFile struct { CdrFormat *string // Cdr output file format FieldSeparator *string // Separator used between fields ExportID *string // Optional exportid - ExportFolder *string // If provided it overwrites the configured export directory + ExportDirectory *string // If provided it overwrites the configured export directory ExportFileName *string // If provided the output filename will be set to this ExportTemplate *string // Exported fields template <""|fld1,fld2|*xml:instance_name> DataUsageMultiplyFactor *float64 // Multiply data usage before export (eg: convert from KBytes to Bytes) @@ -1222,3 +1222,9 @@ type AttrRLsCache struct { LoadID string ResourceLimitIDs []string } + +type AttrRLsResourceUsage struct { + ResourceUsageID string + Event map[string]interface{} + RequestedUnits float64 +} diff --git a/utils/consts.go b/utils/consts.go index bef1164c6..e5341b129 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -32,6 +32,7 @@ var ( ErrUserNotFound = errors.New("USER_NOT_FOUND") ErrInsufficientCredit = errors.New("INSUFFICIENT_CREDIT") ErrNotConvertible = errors.New("NOT_CONVERTIBLE") + ErrResourceUnavailable = errors.New("RESOURCE_UNAVAILABLE") CdreCdrFormats = []string{CSV, DRYRUN, CDRE_FIXED_WIDTH} PrimaryCdrFields = []string{CGRID, CDRSOURCE, CDRHOST, ACCID, TOR, REQTYPE, DIRECTION, TENANT, CATEGORY, ACCOUNT, SUBJECT, DESTINATION, SETUP_TIME, PDD, ANSWER_TIME, USAGE, diff --git a/utils/reflect.go b/utils/reflect.go index d8ba5bb2f..3f2bba7fd 100644 --- a/utils/reflect.go +++ b/utils/reflect.go @@ -60,7 +60,19 @@ func ReflectFieldAsString(intf interface{}, fldName, extraFieldsLabel string) (s if v.Kind() == reflect.Ptr { v = v.Elem() } - field := v.FieldByName(fldName) + var field reflect.Value + switch v.Kind() { + case reflect.Struct: + field = v.FieldByName(fldName) + case reflect.Map: + field = v.MapIndex(reflect.ValueOf(fldName)) + if !field.IsValid() { // Not looking in extra fields anymore + return "", ErrNotFound + } + default: + return "", fmt.Errorf("Unsupported field kind: %v", v.Kind()) + } + if !field.IsValid() { if extraFieldsLabel == "" { return "", ErrNotFound diff --git a/utils/reflect_test.go b/utils/reflect_test.go index ea3312be7..12061b269 100644 --- a/utils/reflect_test.go +++ b/utils/reflect_test.go @@ -22,7 +22,7 @@ import ( "testing" ) -func TestReflectFieldAsString(t *testing.T) { +func TestReflectFieldAsStringOnStruct(t *testing.T) { mystruct := struct { Title string Count int @@ -71,3 +71,48 @@ func TestReflectFieldAsString(t *testing.T) { t.Error("Received: %s", strVal) } } + +func TestReflectFieldAsStringOnMap(t *testing.T) { + myMap := map[string]interface{}{"Title": "Title1", "Count": 5, "Count64": int64(6), "Val": 7.3, + "a": "Title2", "b": 15, "c": int64(16), "d": 17.3} + if strVal, err := ReflectFieldAsString(myMap, "Title", ""); err != nil { + t.Error(err) + } else if strVal != "Title1" { + t.Error("Received: %s", strVal) + } + if strVal, err := ReflectFieldAsString(myMap, "Count", ""); err != nil { + t.Error(err) + } else if strVal != "5" { + t.Error("Received: %s", strVal) + } + if strVal, err := ReflectFieldAsString(myMap, "Count64", ""); err != nil { + t.Error(err) + } else if strVal != "6" { + t.Error("Received: %s", strVal) + } + if strVal, err := ReflectFieldAsString(myMap, "Val", ""); err != nil { + t.Error(err) + } else if strVal != "7.3" { + t.Error("Received: %s", strVal) + } + if strVal, err := ReflectFieldAsString(myMap, "a", ""); err != nil { + t.Error(err) + } else if strVal != "Title2" { + t.Error("Received: %s", strVal) + } + if strVal, err := ReflectFieldAsString(myMap, "b", ""); err != nil { + t.Error(err) + } else if strVal != "15" { + t.Error("Received: %s", strVal) + } + if strVal, err := ReflectFieldAsString(myMap, "c", ""); err != nil { + t.Error(err) + } else if strVal != "16" { + t.Error("Received: %s", strVal) + } + if strVal, err := ReflectFieldAsString(myMap, "d", ""); err != nil { + t.Error(err) + } else if strVal != "17.3" { + t.Error("Received: %s", strVal) + } +}