Merge branch 'master' into newcache

This commit is contained in:
Radu Ioan Fericean
2016-08-09 21:28:00 +03:00
31 changed files with 702 additions and 131 deletions

View File

@@ -48,6 +48,7 @@ information, please see the [`CONTRIBUTING.md`](CONTRIBUTING.md) file.
| @razvancrainea | Răzvan Crainea |
| @marcinkowalczyk | Marcin Kowalczyk |
| @andmar | André Maricato |
| @brendangilmore | Brendan Gilmore |
<!-- to sign, include a single line above this comment containing the following text:
| @username | First Last |
-->

View File

@@ -79,7 +79,7 @@ func TestApierCreateDirs(t *testing.T) {
if !*testLocal {
return
}
for _, pathDir := range []string{cfg.CdreProfiles[utils.META_DEFAULT].ExportFolder, "/var/log/cgrates/cdrc/in", "/var/log/cgrates/cdrc/out", cfg.HistoryDir} {
for _, pathDir := range []string{cfg.CdreProfiles[utils.META_DEFAULT].ExportDirectory, "/var/log/cgrates/cdrc/in", "/var/log/cgrates/cdrc/out", cfg.HistoryDir} {
if err := os.RemoveAll(pathDir); err != nil {
t.Fatal("Error removing folder: ", pathDir, err)

View File

@@ -120,7 +120,7 @@ func (self *ApierV1) ExportCdrsToFile(attr utils.AttrExpFileCdrs, reply *utils.E
return fmt.Errorf("%s:FieldSeparator:%s", utils.ErrServerError.Error(), "Invalid")
}
}
exportDir := exportTemplate.ExportFolder
exportDir := exportTemplate.ExportDirectory
if attr.ExportDir != nil && len(*attr.ExportDir) != 0 {
exportDir = *attr.ExportDir
}

View File

@@ -28,7 +28,7 @@ import (
func (self *ApierV2) GetAccounts(attr utils.AttrGetAccounts, reply *[]*engine.Account) error {
if len(attr.Tenant) == 0 {
return utils.NewErrMandatoryIeMissing("Tenanat")
return utils.NewErrMandatoryIeMissing("Tenant")
}
var accountKeys []string
var err error

View File

@@ -56,9 +56,9 @@ func (self *ApierV2) ExportCdrsToFile(attr utils.AttrExportCdrsToFile, reply *ut
return fmt.Errorf("%s:FieldSeparator:%s", utils.ErrServerError, "Invalid")
}
}
ExportFolder := exportTemplate.ExportFolder
if attr.ExportFolder != nil && len(*attr.ExportFolder) != 0 {
ExportFolder = *attr.ExportFolder
eDir := exportTemplate.ExportDirectory
if attr.ExportDirectory != nil && len(*attr.ExportDirectory) != 0 {
eDir = *attr.ExportDirectory
}
ExportID := strconv.FormatInt(time.Now().Unix(), 10)
if attr.ExportID != nil && len(*attr.ExportID) != 0 {
@@ -68,7 +68,7 @@ func (self *ApierV2) ExportCdrsToFile(attr utils.AttrExportCdrsToFile, reply *ut
if attr.ExportFileName != nil && len(*attr.ExportFileName) != 0 {
fileName = *attr.ExportFileName
}
filePath := path.Join(ExportFolder, fileName)
filePath := path.Join(eDir, fileName)
if cdrFormat == utils.DRYRUN {
filePath = utils.DRYRUN
}

View File

@@ -202,9 +202,9 @@ func startDiameterAgent(internalSMGChan, internalPubSubSChan chan rpcclient.RpcC
exitChan <- true
}
func startSmFreeSWITCH(internalRaterChan, internalCDRSChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) {
func startSmFreeSWITCH(internalRaterChan, internalCDRSChan, rlsChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS SMFreeSWITCH service.")
var ralsConn, cdrsConn *rpcclient.RpcClientPool
var ralsConn, cdrsConn, rlsConn *rpcclient.RpcClientPool
if len(cfg.SmFsConfig.RALsConns) != 0 {
ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SmFsConfig.RALsConns, internalRaterChan, cfg.InternalTtl)
@@ -223,7 +223,16 @@ func startSmFreeSWITCH(internalRaterChan, internalCDRSChan chan rpcclient.RpcCli
return
}
}
sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, ralsConn, cdrsConn, cfg.DefaultTimezone)
if len(cfg.SmFsConfig.RLsConns) != 0 {
rlsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SmFsConfig.RLsConns, rlsChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMFreeSWITCH> Could not connect to RLs: %s", err.Error()))
exitChan <- true
return
}
}
sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, ralsConn, cdrsConn, rlsConn, cfg.DefaultTimezone)
smRpc.SMs = append(smRpc.SMs, sm)
if err = sm.Connect(); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMFreeSWITCH> error: %s!", err))
@@ -640,7 +649,7 @@ func main() {
}
// Start SM-FreeSWITCH
if cfg.SmFsConfig.Enabled {
go startSmFreeSWITCH(internalRaterChan, internalCdrSChan, cdrDb, exitChan)
go startSmFreeSWITCH(internalRaterChan, internalCdrSChan, internalRLSChan, cdrDb, exitChan)
// close all sessions on shutdown
go shutdownSessionmanagerSingnalHandler(exitChan)
}

View File

@@ -31,7 +31,7 @@ type CdreConfig struct {
CostShiftDigits int
MaskDestinationID string
MaskLength int
ExportFolder string
ExportDirectory string
HeaderFields []*CfgCdrField
ContentFields []*CfgCdrField
TrailerFields []*CfgCdrField
@@ -76,8 +76,8 @@ func (self *CdreConfig) loadFromJsonCfg(jsnCfg *CdreJsonCfg) error {
if jsnCfg.Mask_length != nil {
self.MaskLength = *jsnCfg.Mask_length
}
if jsnCfg.Export_folder != nil {
self.ExportFolder = *jsnCfg.Export_folder
if jsnCfg.Export_directory != nil {
self.ExportDirectory = *jsnCfg.Export_directory
}
if jsnCfg.Header_fields != nil {
if self.HeaderFields, err = CfgCdrFieldsFromCdrFieldsJsonCfg(*jsnCfg.Header_fields); err != nil {
@@ -111,7 +111,7 @@ func (self *CdreConfig) Clone() *CdreConfig {
clnCdre.CostShiftDigits = self.CostShiftDigits
clnCdre.MaskDestinationID = self.MaskDestinationID
clnCdre.MaskLength = self.MaskLength
clnCdre.ExportFolder = self.ExportFolder
clnCdre.ExportDirectory = self.ExportDirectory
clnCdre.HeaderFields = make([]*CfgCdrField, len(self.HeaderFields))
for idx, fld := range self.HeaderFields {
clonedVal := *fld

View File

@@ -30,11 +30,11 @@ func TestCdreCfgClone(t *testing.T) {
emptyFields := []*CfgCdrField{}
initContentFlds := []*CfgCdrField{
&CfgCdrField{Tag: "CgrId",
Type: "cdrfield",
Type: "*composed",
FieldId: "cgrid",
Value: cgrIdRsrs},
&CfgCdrField{Tag: "RunId",
Type: "cdrfield",
Type: "*composed",
FieldId: "mediation_runid",
Value: runIdRsrs},
}
@@ -47,16 +47,16 @@ func TestCdreCfgClone(t *testing.T) {
CostShiftDigits: 0,
MaskDestinationID: "MASKED_DESTINATIONS",
MaskLength: 0,
ExportFolder: "/var/spool/cgrates/cdre",
ExportDirectory: "/var/spool/cgrates/cdre",
ContentFields: initContentFlds,
}
eClnContentFlds := []*CfgCdrField{
&CfgCdrField{Tag: "CgrId",
Type: "cdrfield",
Type: "*composed",
FieldId: "cgrid",
Value: cgrIdRsrs},
&CfgCdrField{Tag: "RunId",
Type: "cdrfield",
Type: "*composed",
FieldId: "mediation_runid",
Value: runIdRsrs},
}
@@ -69,7 +69,7 @@ func TestCdreCfgClone(t *testing.T) {
CostShiftDigits: 0,
MaskDestinationID: "MASKED_DESTINATIONS",
MaskLength: 0,
ExportFolder: "/var/spool/cgrates/cdre",
ExportDirectory: "/var/spool/cgrates/cdre",
HeaderFields: emptyFields,
ContentFields: eClnContentFlds,
TrailerFields: emptyFields,

View File

@@ -388,6 +388,11 @@ func (self *CGRConfig) checkConfigSanity() error {
return errors.New("CDRS not enabled but referenced by SMFreeSWITCH component")
}
}
for _, smFSRLsConn := range self.SmFsConfig.RLsConns {
if smFSRLsConn.Address == utils.MetaInternal && !self.resourceLimiterCfg.Enabled {
return errors.New("RLs not enabled but referenced by SMFreeSWITCH component")
}
}
}
// SM-Kamailio checks
if self.SmKamConfig.Enabled {

View File

@@ -207,7 +207,7 @@ const CGRATES_CFG_JSON = `
"cost_shift_digits": 0, // shift digits in the cost on export (eg: convert from EUR to cents)
"mask_destination_id": "MASKED_DESTINATIONS", // destination id containing called addresses to be masked on export
"mask_length": 0, // length of the destination suffix to be masked
"export_folder": "/var/spool/cgrates/cdre", // path where the exported CDRs will be placed
"export_directory": "/var/spool/cgrates/cdre", // path where the exported CDRs will be placed
"header_fields": [], // template of the exported header fields
"content_fields": [ // template of the exported content fields
{"tag": "CGRID", "type": "*composed", "value": "CGRID"},
@@ -250,13 +250,14 @@ const CGRATES_CFG_JSON = `
"sm_freeswitch": {
"enabled": false, // starts SessionManager service: <true|false>
"enabled": false, // starts SessionManager service: <true|false>
"rals_conns": [
{"address": "*internal"} // address where to reach the Rater <""|*internal|127.0.0.1:2013>
{"address": "*internal"} // address where to reach the Rater <""|*internal|127.0.0.1:2013>
],
"cdrs_conns": [
{"address": "*internal"} // address where to reach CDR Server, empty to disable CDR capturing <*internal|x.y.z.y:1234>
{"address": "*internal"} // address where to reach CDR Server, empty to disable CDR capturing <*internal|x.y.z.y:1234>
],
"rls_conns": [], // address where to reach the ResourceLimiter service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234>
"create_cdr": false, // create CDR out of events and sends them to CDRS component
"extra_fields": [], // extra fields to store in auth/CDRs when creating them
"debit_interval": "10s", // interval to perform debits on.
@@ -386,8 +387,8 @@ const CGRATES_CFG_JSON = `
"rls": {
"enabled": false, // starts ResourceLimiter service: <true|false>.
"cdrstats_conns": [], // address where to reach the cdrstats service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234>
"cache_dump_interval": "0s", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|dur>
"usage_ttl": "3h", // expire usage records if older than this duration <""|*never|dur>
"cache_dump_interval": "0s", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|$dur>
"usage_ttl": "3h", // expire usage records if older than this duration <""|*never|$dur>
},

View File

@@ -249,7 +249,7 @@ func TestDfCdreJsonCfgs(t *testing.T) {
Cost_shift_digits: utils.IntPointer(0),
Mask_destination_id: utils.StringPointer("MASKED_DESTINATIONS"),
Mask_length: utils.IntPointer(0),
Export_folder: utils.StringPointer("/var/spool/cgrates/cdre"),
Export_directory: utils.StringPointer("/var/spool/cgrates/cdre"),
Header_fields: &eFields,
Content_fields: &eContentFlds,
Trailer_fields: &eFields,
@@ -412,6 +412,7 @@ func TestSmFsJsonCfg(t *testing.T) {
&HaPoolJsonCfg{
Address: utils.StringPointer(utils.MetaInternal),
}},
Rls_conns: &[]*HaPoolJsonCfg{},
Create_cdr: utils.BoolPointer(false),
Extra_fields: utils.StringSlicePointer([]string{}),
Debit_interval: utils.StringPointer("10s"),

View File

@@ -140,7 +140,7 @@ type CdreJsonCfg struct {
Cost_shift_digits *int
Mask_destination_id *string
Mask_length *int
Export_folder *string
Export_directory *string
Header_fields *[]*CdrFieldJsonCfg
Content_fields *[]*CdrFieldJsonCfg
Trailer_fields *[]*CdrFieldJsonCfg
@@ -192,6 +192,7 @@ type SmFsJsonCfg struct {
Enabled *bool
Rals_conns *[]*HaPoolJsonCfg
Cdrs_conns *[]*HaPoolJsonCfg
Rls_conns *[]*HaPoolJsonCfg
Create_cdr *bool
Extra_fields *[]string
Debit_interval *string

View File

@@ -163,6 +163,7 @@ type SmFsConfig struct {
Enabled bool
RALsConns []*HaPoolConfig
CDRsConns []*HaPoolConfig
RLsConns []*HaPoolConfig
CreateCdr bool
ExtraFields []*utils.RSRField
DebitInterval time.Duration
@@ -200,6 +201,13 @@ func (self *SmFsConfig) loadFromJsonCfg(jsnCfg *SmFsJsonCfg) error {
self.CDRsConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Rls_conns != nil {
self.RLsConns = make([]*HaPoolConfig, len(*jsnCfg.Rls_conns))
for idx, jsnHaCfg := range *jsnCfg.Rls_conns {
self.RLsConns[idx] = NewDfltHaPoolConfig()
self.RLsConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Create_cdr != nil {
self.CreateCdr = *jsnCfg.Create_cdr
}

View File

@@ -6,6 +6,12 @@
// This file contains the default configuration hardcoded into CGRateS.
// This is what you get when you load CGRateS with an empty configuration file.
"listen": {
"rpc_json": ":2012", // RPC JSON listening address
"rpc_gob": ":2013", // RPC GOB listening address
"http": ":2080", // HTTP listening address
},
"rals": {
"enabled": true,
@@ -31,8 +37,10 @@
"sm_opensips": {
"enabled": true, // starts SessionManager service: <true|false>
"listen_udp": ":2020", // address where to listen for datagram events coming from OpenSIPS
"create_cdr": true, // create CDR out of events and sends them to CDRS component
"debit_interval": "5s", // interval to perform debits on.
"mi_addr": "192.168.56.128:8020", // address where to reach OpenSIPS MI to send session disconnects
},
}

View File

@@ -2,16 +2,16 @@
sed -i 's/127.0.0.1/0.0.0.0/g' /etc/redis/redis.conf /etc/mysql/my.cnf
# start services
/etc/init.d/rsyslog start
/etc/init.d/mysql start
/etc/init.d/redis-server start
service rsyslog start
service mysql start
service redis-server start
/root/code/bin/cgr-engine -config_dir /root/cgr/data/conf/samples/osips_training
# setup mysql
cd /usr/share/cgrates/storage/mysql && ./setup_cgr_db.sh root CGRateS.org
# load tariff plan data
cd /root/cgr/data/tariffplans/osips_training; cgr-loader
#cd /root/cgr/data/tariffplans/osips_training; cgr-loader
cd /root/cgr
DISABLE_AUTO_UPDATE="true" zsh

View File

@@ -1,6 +1,6 @@
#Id,FilterType,FilterFieldName,FilterValues,ActivationTime,Weight,Limit,ActionTriggers
ResGroup1,*string,Account,1001;1002,2014-07-29T15:00:00Z,10,2,
ResGroup1,*string_prefix,Destination,10;20,2014-07-29T15:00:00Z,10,,
ResGroup1,*cdr_stats,,CDRST1:*min_ASR:34;CDRST_1001:*min_ASR:20,,,,
ResGroup1,*rsr_fields,,Subject(~^1.*1$);Destination(1002),,,,
ResGroup2,*destinations,Destination,DST_FS,2014-07-29T15:00:00Z,10,2,
ResGroup2,*destinations,Destination,DST_FS,2014-07-29T15:00:00Z,10,2,
ResGroup2,*cdr_stats,,CDRST1:*min_ASR:34;CDRST_1001:*min_ASR:20,,,,
1 #Id FilterType FilterFieldName FilterValues ActivationTime Weight Limit ActionTriggers
2 ResGroup1 *string Account 1001;1002 2014-07-29T15:00:00Z 10 2
3 ResGroup1 *string_prefix Destination 10;20 2014-07-29T15:00:00Z 10
ResGroup1 *cdr_stats CDRST1:*min_ASR:34;CDRST_1001:*min_ASR:20
4 ResGroup1 *rsr_fields Subject(~^1.*1$);Destination(1002)
5 ResGroup2 *destinations Destination DST_FS 2014-07-29T15:00:00Z 10 2
6 ResGroup2 *cdr_stats CDRST1:*min_ASR:34;CDRST_1001:*min_ASR:20

View File

@@ -45,6 +45,14 @@
},
"rls": {
"enabled": true, // starts ResourceLimiter service: <true|false>.
"cdrstats_conns": [
{"address": "*internal"}
],
},
"cdre": {
"*default": {
"cdr_format": "csv", // exported CDRs format <csv>
@@ -60,21 +68,21 @@
"export_dir": "/tmp/cgr_fsevsock/cgrates/cdre", // path where the exported CDRs will be placed
"header_fields": [], // template of the exported header fields
"content_fields": [ // template of the exported content fields
{"tag": "CgrId", "cdr_field_id": "cgrid", "type": "cdrfield", "value": "cgrid"},
{"tag":"RunId", "cdr_field_id": "mediation_runid", "type": "cdrfield", "value": "mediation_runid"},
{"tag":"Tor", "cdr_field_id": "tor", "type": "cdrfield", "value": "tor"},
{"tag":"AccId", "cdr_field_id": "accid", "type": "cdrfield", "value": "accid"},
{"tag":"ReqType", "cdr_field_id": "reqtype", "type": "cdrfield", "value": "reqtype"},
{"tag":"Direction", "cdr_field_id": "direction", "type": "cdrfield", "value": "direction"},
{"tag":"Tenant", "cdr_field_id": "tenant", "type": "cdrfield", "value": "tenant"},
{"tag":"Category", "cdr_field_id": "category", "type": "cdrfield", "value": "category"},
{"tag":"Account", "cdr_field_id": "account", "type": "cdrfield", "value": "account"},
{"tag":"Subject", "cdr_field_id": "subject", "type": "cdrfield", "value": "subject"},
{"tag":"Destination", "cdr_field_id": "destination", "type": "cdrfield", "value": "destination"},
{"tag":"SetupTime", "cdr_field_id": "setup_time", "type": "cdrfield", "value": "setup_time", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag":"AnswerTime", "cdr_field_id": "answer_time", "type": "cdrfield", "value": "answer_time", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag":"Usage", "cdr_field_id": "usage", "type": "cdrfield", "value": "usage"},
{"tag":"Cost", "cdr_field_id": "cost", "type": "cdrfield", "value": "cost"},
{"tag": "CgrId", "type": "*composed", "value": "CGRID"},
{"tag":"RunId", "type": "*composed", "value": "RunID"},
{"tag":"Tor", "type": "cdrfield", "value": "ToR"},
{"tag":"AccId", "type": "*composed", "value": "OriginID"},
{"tag":"ReqType", "type": "*composed", "value": "RequestType"},
{"tag":"Direction", "type": "*composed", "value": "Direction"},
{"tag":"Tenant", "type": "*composed", "value": "Tenant"},
{"tag":"Category", "type": "*composed", "value": "Category"},
{"tag":"Account", "type": "*composed", "value": "Account"},
{"tag":"Subject", "type": "*composed", "value": "Subject"},
{"tag":"Destination", "type": "*composed", "value": "Destination"},
{"tag":"SetupTime", "type": "*datetime", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag":"AnswerTime", "type": "*datetime", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag":"Usage", "type": "*datetime", "value": "Usage"},
{"tag":"Cost", "type": "*composed", "value": "Cost"},
],
"trailer_fields": [], // template of the exported trailer fields
},
@@ -92,16 +100,16 @@
"export_dir": "/tmp/cgr_fsevsock/cgrates/cdre", // path where the exported CDRs will be placed
"header_fields": [], // template of the exported header fields
"content_fields": [ // template of the exported content fields
{"tag": "CgrId", "cdr_field_id": "cgrid", "type": "cdrfield", "value": "cgrid"},
{"tag":"AccId", "cdr_field_id": "accid", "type": "cdrfield", "value": "accid"},
{"tag":"ReqType", "cdr_field_id": "reqtype", "type": "cdrfield", "value": "reqtype"},
{"tag":"Tenant", "cdr_field_id": "tenant", "type": "cdrfield", "value": "tenant"},
{"tag":"Category", "cdr_field_id": "category", "type": "cdrfield", "value": "category"},
{"tag":"Subject", "cdr_field_id": "account", "type": "cdrfield", "value": "account"},
{"tag":"Destination", "cdr_field_id": "destination", "type": "cdrfield", "value": "~destination:s/^1(\\d+)/+$1/:s/^\\+(\\d+)/00$1/"},
{"tag":"AnswerTime", "cdr_field_id": "answer_time", "type": "cdrfield", "value": "answer_time", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag":"Usage", "cdr_field_id": "usage", "type": "cdrfield", "value": "usage"},
{"tag":"Cost", "cdr_field_id": "cost", "type": "cdrfield", "value": "cost"},
{"tag": "CgrId", "type": "*composed", "value": "CGRID"},
{"tag":"AccId", "type": "*composed", "value": "OriginID"},
{"tag":"ReqType", "type": "*composed", "value": "RequestType"},
{"tag":"Tenant", "type": "*composed", "value": "Tenant"},
{"tag":"Category", "type": "*composed", "value": "Category"},
{"tag":"Subject", "type": "*composed", "value": "Account"},
{"tag":"Destination", "type": "*composed", "value": "~Destination:s/^1(\\d+)/+$1/:s/^\\+(\\d+)/00$1/"},
{"tag":"AnswerTime", "type": "*datetime", "value": "AnswerTIme", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag":"Usage", "type": "*composed", "value": "Usage"},
{"tag":"Cost", "type": "*composed", "value": "Cost"},
],
"trailer_fields": [],
},
@@ -111,6 +119,9 @@
"sm_freeswitch": {
"enabled": true, // starts SessionManager service: <true|false>
"debit_interval": "5s", // interval to perform debits on.
"rls_conns": [
{"address": "*internal"}
],
"channel_sync_interval": "10s",
"event_socket_conns":[ // instantiate connections to multiple FreeSWITCH servers
{"address": "127.0.0.1:8021", "password": "ClueCon", "reconnects": 5}

View File

@@ -60,21 +60,21 @@
"export_dir": "/tmp/cgr_kamevapi/cgrates/cdre",
"header_fields": [],
"content_fields": [
{"tag": "CgrId", "cdr_field_id": "cgrid", "type": "cdrfield", "value": "cgrid"},
{"tag":"RunId", "cdr_field_id": "mediation_runid", "type": "cdrfield", "value": "mediation_runid"},
{"tag":"Tor", "cdr_field_id": "tor", "type": "cdrfield", "value": "tor"},
{"tag":"AccId", "cdr_field_id": "accid", "type": "cdrfield", "value": "accid"},
{"tag":"ReqType", "cdr_field_id": "reqtype", "type": "cdrfield", "value": "reqtype"},
{"tag":"Direction", "cdr_field_id": "direction", "type": "cdrfield", "value": "direction"},
{"tag":"Tenant", "cdr_field_id": "tenant", "type": "cdrfield", "value": "tenant"},
{"tag":"Category", "cdr_field_id": "category", "type": "cdrfield", "value": "category"},
{"tag":"Account", "cdr_field_id": "account", "type": "cdrfield", "value": "account"},
{"tag":"Subject", "cdr_field_id": "subject", "type": "cdrfield", "value": "subject"},
{"tag":"Destination", "cdr_field_id": "destination", "type": "cdrfield", "value": "destination"},
{"tag":"SetupTime", "cdr_field_id": "setup_time", "type": "cdrfield", "value": "setup_time", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag":"AnswerTime", "cdr_field_id": "answer_time", "type": "cdrfield", "value": "answer_time", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag":"Usage", "cdr_field_id": "usage", "type": "cdrfield", "value": "usage"},
{"tag":"Cost", "cdr_field_id": "cost", "type": "cdrfield", "value": "cost"},
{"tag": "CgrId", "type": "*composed", "value": "CGRID"},
{"tag":"RunId", "type": "*composed", "value": "RunID"},
{"tag":"Tor", "type": "*composed", "value": "ToR"},
{"tag":"AccId", "type": "*composed", "value": "OriginID"},
{"tag":"ReqType", "type": "*composed", "value": "RequestType"},
{"tag":"Direction", "type": "*composed", "value": "Direction"},
{"tag":"Tenant", "type": "*composed", "value": "Tenant"},
{"tag":"Category", "type": "*composed", "value": "Category"},
{"tag":"Account", "type": "*composed", "value": "Account"},
{"tag":"Subject", "type": "*composed", "value": "Subject"},
{"tag":"Destination", "type": "*composed", "value": "Destination"},
{"tag":"SetupTime", "type": "*datetime", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag":"AnswerTime", "type": "*datetime", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag":"Usage", "type": "*composed", "value": "usage"},
{"tag":"Cost", "type": "*composed", "value": "cost"},
],
"trailer_fields": [],
},
@@ -92,16 +92,16 @@
"export_dir": "/tmp/cgr_kamevapi/cgrates/cdre",
"header_fields": [],
"content_fields": [
{"tag": "CgrId", "cdr_field_id": "cgrid", "type": "cdrfield", "value": "cgrid"},
{"tag":"AccId", "cdr_field_id": "accid", "type": "cdrfield", "value": "accid"},
{"tag":"ReqType", "cdr_field_id": "reqtype", "type": "cdrfield", "value": "reqtype"},
{"tag":"Tenant", "cdr_field_id": "tenant", "type": "cdrfield", "value": "tenant"},
{"tag":"Category", "cdr_field_id": "category", "type": "cdrfield", "value": "category"},
{"tag":"Subject", "cdr_field_id": "account", "type": "cdrfield", "value": "account"},
{"tag":"Destination", "cdr_field_id": "destination", "type": "cdrfield", "value": "~destination:s/^1(\\d+)/+$1/:s/^\\+(\\d+)/00$1/"},
{"tag":"AnswerTime", "cdr_field_id": "answer_time", "type": "cdrfield", "value": "answer_time", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag":"Usage", "cdr_field_id": "usage", "type": "cdrfield", "value": "usage"},
{"tag":"Cost", "cdr_field_id": "cost", "type": "cdrfield", "value": "cost"},
{"tag": "CgrId", "type": "*composed", "value": "CGRID"},
{"tag":"AccId", "type": "*composed", "value": "OriginID"},
{"tag":"ReqType", "type": "*composed", "value": "RequestType"},
{"tag":"Tenant", "type": "*composed", "value": "Tenant"},
{"tag":"Category", "type": "*composed", "value": "Category"},
{"tag":"Subject", "type": "*composed", "value": "Account"},
{"tag":"Destination", "type": "*composed", "value": "~Destination:s/^1(\\d+)/+$1/:s/^\\+(\\d+)/00$1/"},
{"tag":"AnswerTime", "type": "*datetime", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag":"Usage", "type": "*composed", "value": "Usage"},
{"tag":"Cost", "type": "*composed", "value": "Cost"},
],
"trailer_fields": [],
}

View File

@@ -60,21 +60,21 @@
"export_dir": "/tmp/cgr_osipsasync/cgrates/cdre", // path where the exported CDRs will be placed
"header_fields": [], // template of the exported header fields
"content_fields": [ // template of the exported content fields
{"tag": "CgrId", "cdr_field_id": "cgrid", "type": "cdrfield", "value": "cgrid"},
{"tag":"RunId", "cdr_field_id": "mediation_runid", "type": "cdrfield", "value": "mediation_runid"},
{"tag":"Tor", "cdr_field_id": "tor", "type": "cdrfield", "value": "tor"},
{"tag":"AccId", "cdr_field_id": "accid", "type": "cdrfield", "value": "accid"},
{"tag":"ReqType", "cdr_field_id": "reqtype", "type": "cdrfield", "value": "reqtype"},
{"tag":"Direction", "cdr_field_id": "direction", "type": "cdrfield", "value": "direction"},
{"tag":"Tenant", "cdr_field_id": "tenant", "type": "cdrfield", "value": "tenant"},
{"tag":"Category", "cdr_field_id": "category", "type": "cdrfield", "value": "category"},
{"tag":"Account", "cdr_field_id": "account", "type": "cdrfield", "value": "account"},
{"tag":"Subject", "cdr_field_id": "subject", "type": "cdrfield", "value": "subject"},
{"tag":"Destination", "cdr_field_id": "destination", "type": "cdrfield", "value": "destination"},
{"tag":"SetupTime", "cdr_field_id": "setup_time", "type": "cdrfield", "value": "setup_time", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag":"AnswerTime", "cdr_field_id": "answer_time", "type": "cdrfield", "value": "answer_time", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag":"Usage", "cdr_field_id": "usage", "type": "cdrfield", "value": "usage"},
{"tag":"Cost", "cdr_field_id": "cost", "type": "cdrfield", "value": "cost"},
{"tag": "CgrId", "type": "*composed", "value": "CGRID"},
{"tag":"RunId", "type": "*composed", "value": "RunID"},
{"tag":"Tor", "type": "*composed", "value": "ToR"},
{"tag":"AccId", "type": "*composed", "value": "OriginID"},
{"tag":"ReqType", "type": "*composed", "value": "RequestType"},
{"tag":"Direction", "type": "*composed", "value": "Direction"},
{"tag":"Tenant", "type": "*composed", "value": "Tenant"},
{"tag":"Category", "type": "*composed", "value": "Category"},
{"tag":"Account", "type": "*composed", "value": "Account"},
{"tag":"Subject", "type": "*composed", "value": "Subject"},
{"tag":"Destination", "type": "*composed", "value": "Destination"},
{"tag":"SetupTime", "type": "*datetime", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag":"AnswerTime", "type": "*datetime", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag":"Usage", "type": "*composed", "value": "Usage"},
{"tag":"Cost", "type": "*composed", "value": "Cost"},
],
"trailer_fields": [], // template of the exported trailer fields
},
@@ -89,19 +89,19 @@
"cost_shift_digits": 0, // shift digits in the cost on export (eg: convert from EUR to cents)
"mask_destination_id": "MASKED_DESTINATIONS", // destination id containing called addresses to be masked on export
"mask_length": 0, // length of the destination suffix to be masked
"export_dir": "/tmp/cgr_osipsasync/cgrates/cdre", // path where the exported CDRs will be placed
"export_directory": "/tmp/cgr_osipsasync/cgrates/cdre", // path where the exported CDRs will be placed
"header_fields": [], // template of the exported header fields
"content_fields": [ // template of the exported content fields
{"tag": "CgrId", "cdr_field_id": "cgrid", "type": "cdrfield", "value": "cgrid"},
{"tag":"AccId", "cdr_field_id": "accid", "type": "cdrfield", "value": "accid"},
{"tag":"ReqType", "cdr_field_id": "reqtype", "type": "cdrfield", "value": "reqtype"},
{"tag":"Tenant", "cdr_field_id": "tenant", "type": "cdrfield", "value": "tenant"},
{"tag":"Category", "cdr_field_id": "category", "type": "cdrfield", "value": "category"},
{"tag":"Subject", "cdr_field_id": "account", "type": "cdrfield", "value": "account"},
{"tag":"Destination", "cdr_field_id": "destination", "type": "cdrfield", "value": "~destination:s/^1(\\d+)/+$1/:s/^\\+(\\d+)/00$1/"},
{"tag":"AnswerTime", "cdr_field_id": "answer_time", "type": "cdrfield", "value": "answer_time", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag":"Usage", "cdr_field_id": "usage", "type": "cdrfield", "value": "usage"},
{"tag":"Cost", "cdr_field_id": "cost", "type": "cdrfield", "value": "cost"},
{"tag": "CgrId", "type": "*composed", "value": "CGRID"},
{"tag":"AccId", "type": "*composed", "value": "OriginID"},
{"tag":"ReqType", "type": "*composed", "value": "RequestType"},
{"tag":"Tenant", "type": "*composed", "value": "Tenant"},
{"tag":"Category", "type": "*composed", "value": "Category"},
{"tag":"Subject", "type": "*composed", "value": "Account"},
{"tag":"Destination", "type": "*datetime", "value": "~Destination:s/^1(\\d+)/+$1/:s/^\\+(\\d+)/00$1/"},
{"tag":"AnswerTime", "type": "*datetime", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag":"Usage", "type": "*composed", "value": "Usage"},
{"tag":"Cost", "type": "*composed", "value": "Cost"},
],
"trailer_fields": [],
},

View File

@@ -411,7 +411,7 @@ func APItoModelUsers(attr *utils.TPUsers) (result []TpUser) {
}
func APItoResourceLimit(tpRL *utils.TPResourceLimit, timezone string) (rl *ResourceLimit, err error) {
rl = &ResourceLimit{ID: tpRL.ID, Weight: tpRL.Weight, Filters: make([]*RequestFilter, len(tpRL.Filters))}
rl = &ResourceLimit{ID: tpRL.ID, Weight: tpRL.Weight, Filters: make([]*RequestFilter, len(tpRL.Filters)), Usage: make(map[string]*ResourceUsage)}
for i, tpFltr := range tpRL.Filters {
rf := &RequestFilter{Type: tpFltr.Type, FieldName: tpFltr.FieldName, Values: tpFltr.Values}
if err := rf.CompileValues(); err != nil {

View File

@@ -21,7 +21,7 @@ func TestAPItoResourceLimit(t *testing.T) {
Weight: 10,
Limit: "2",
}
eRL := &ResourceLimit{ID: tpRL.ID, Weight: tpRL.Weight, Filters: make([]*RequestFilter, len(tpRL.Filters))}
eRL := &ResourceLimit{ID: tpRL.ID, Weight: tpRL.Weight, Filters: make([]*RequestFilter, len(tpRL.Filters)), Usage: make(map[string]*ResourceUsage)}
eRL.Filters[0] = &RequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"1001", "1002"}}
eRL.Filters[1] = &RequestFilter{Type: MetaStringPrefix, FieldName: "Destination", Values: []string{"10", "20"}}
eRL.Filters[2] = &RequestFilter{Type: MetaCDRStats, Values: []string{"CDRST1:*min_ASR:34", "CDRST_1001:*min_ASR:20"},

View File

@@ -126,6 +126,9 @@ func (fltr *RequestFilter) Pass(req interface{}, extraFieldsLabel string, cdrSta
func (fltr *RequestFilter) passString(req interface{}, extraFieldsLabel string) (bool, error) {
strVal, err := utils.ReflectFieldAsString(req, fltr.FieldName, extraFieldsLabel)
if err != nil {
if err == utils.ErrNotFound {
return false, nil
}
return false, err
}
for _, val := range fltr.Values {
@@ -139,6 +142,9 @@ func (fltr *RequestFilter) passString(req interface{}, extraFieldsLabel string)
func (fltr *RequestFilter) passStringPrefix(req interface{}, extraFieldsLabel string) (bool, error) {
strVal, err := utils.ReflectFieldAsString(req, fltr.FieldName, extraFieldsLabel)
if err != nil {
if err == utils.ErrNotFound {
return false, nil
}
return false, err
}
for _, prfx := range fltr.Values {
@@ -157,6 +163,9 @@ func (fltr *RequestFilter) passTimings(req interface{}, extraFieldsLabel string)
func (fltr *RequestFilter) passDestinations(req interface{}, extraFieldsLabel string) (bool, error) {
dst, err := utils.ReflectFieldAsString(req, fltr.FieldName, extraFieldsLabel)
if err != nil {
if err == utils.ErrNotFound {
return false, nil
}
return false, err
}
for _, p := range utils.SplitPrefix(dst, MIN_PREFIX_MATCH) {
@@ -177,6 +186,9 @@ func (fltr *RequestFilter) passDestinations(req interface{}, extraFieldsLabel st
func (fltr *RequestFilter) passRSRFields(req interface{}, extraFieldsLabel string) (bool, error) {
for _, rsrFld := range fltr.rsrFields {
if strVal, err := utils.ReflectFieldAsString(req, rsrFld.Id, extraFieldsLabel); err != nil {
if err == utils.ErrNotFound {
return false, nil
}
return false, err
} else if rsrFld.FilterPasses(strVal) {
return true, nil

View File

@@ -79,8 +79,10 @@ func TestPassStringPrefix(t *testing.T) {
t.Error("Not passes filter")
}
rf = &RequestFilter{Type: MetaStringPrefix, FieldName: "nonexisting", Values: []string{"off"}}
if _, err := rf.passStringPrefix(cd, "ExtraFields"); err == nil || err != utils.ErrNotFound {
if passing, err := rf.passStringPrefix(cd, "ExtraFields"); err != nil {
t.Error(err)
} else if passing {
t.Error("Passes filter")
}
}

View File

@@ -31,19 +31,67 @@ import (
"github.com/cgrates/rpcclient"
)
// ResourceUsage records one consumption of a ResourceLimit, bounded with a
// timestamp so it can be expired once the limit's UsageTTL elapses.
type ResourceUsage struct {
	ID string // Unique identifier of this resourceUsage, Eg: FreeSWITCH UUID
	UsageTime time.Time // Timestamp of the usage so the record can be expired later (see ResourceLimit.UsageTTL)
	UsageUnits float64 // Number of units used
}
// ResourceLimit represents a limit imposed for accessing a resource (eg: new calls).
//
// NOTE(review): the previous declaration carried duplicated Weight/Limit/
// ActionTriggers fields plus a stale "Used utils.Int64Slice" field — leftovers
// of an unresolved merge which do not compile (Go forbids duplicate field
// names). Kept the new-side fields, which are the ones the rest of this file
// (removeExpiredUnits, UsedUnits, RecordUsage) and the tests actually use.
type ResourceLimit struct {
	ID             string                    // Identifier of this limit
	Filters        []*RequestFilter          // Filters for the request
	ActivationTime time.Time                 // Time when this limit becomes active
	ExpiryTime     time.Time                 // Time when this limit stops matching; zero value means no expiry
	Weight         float64                   // Weight to sort the ResourceLimits
	Limit          float64                   // Limit value
	ActionTriggers ActionTriggers            // Thresholds to check after changing Limit
	UsageTTL       time.Duration             // Expire usage records after this duration; 0 disables expiry
	Usage          map[string]*ResourceUsage // Usage records keyed by ResourceUsage.ID so we can expire too old records
	usageCounter   float64                   // internal counter aggregating real usage of ResourceLimit
}
// removeExpiredUnits drops every usage record older than UsageTTL and
// subtracts its units from the aggregated usage counter.
// Deleting map entries while ranging over the map is safe in Go.
func (rl *ResourceLimit) removeExpiredUnits() {
	for ruID, ru := range rl.Usage {
		if time.Since(ru.UsageTime) <= rl.UsageTTL { // time.Since is the idiomatic form of time.Now().Sub
			continue // not expired
		}
		delete(rl.Usage, ruID)
		rl.usageCounter -= ru.UsageUnits
	}
}
// UsedUnits returns the aggregated number of units currently in use,
// first purging expired usage records when a TTL is configured.
func (rl *ResourceLimit) UsedUnits() float64 {
	if rl.UsageTTL == 0 { // no TTL configured, counter is always current
		return rl.usageCounter
	}
	rl.removeExpiredUnits()
	return rl.usageCounter
}
// RecordUsage registers a new usage record against the limit and bumps the
// aggregated counter. Registering the same usage ID twice is an error.
func (rl *ResourceLimit) RecordUsage(ru *ResourceUsage) error {
	if _, exists := rl.Usage[ru.ID]; exists {
		return fmt.Errorf("Duplicate resource usage with id: %s", ru.ID)
	}
	rl.usageCounter += ru.UsageUnits
	rl.Usage[ru.ID] = ru
	return nil
}
// RemoveUsage deletes the usage record identified by ruID and releases its
// units from the aggregated counter. An unknown ID is an error.
func (rl *ResourceLimit) RemoveUsage(ruID string) error {
	ru, found := rl.Usage[ruID]
	if !found {
		return fmt.Errorf("Cannot find usage record with id: %s", ruID)
	}
	rl.usageCounter -= ru.UsageUnits
	delete(rl.Usage, ruID)
	return nil
}
// NewResourceLimiterService constructs a ResourceLimiterService. The config is
// passed as a whole so consumers can query it concurrently.
func NewResourceLimiterService(cfg *config.CGRConfig, dataDB AccountingStorage, cdrStatS rpcclient.RpcClientConnection) (*ResourceLimiterService, error) {
	// Guard against a typed-nil value hiding inside the optional CDRStats
	// interface: treat it as a plain nil so later nil-checks behave.
	if cdrStatS != nil && reflect.ValueOf(cdrStatS).IsNil() {
		cdrStatS = nil
	}
	return &ResourceLimiterService{
		stringIndexes: make(map[string]map[string]utils.StringMap),
		dataDB:        dataDB,
		cdrStatS:      cdrStatS,
	}, nil
}
@@ -59,7 +107,7 @@ type ResourceLimiterService struct {
// Index cached ResourceLimits with MetaString filter types
func (rls *ResourceLimiterService) indexStringFilters(rlIDs []string) error {
utils.Logger.Info("<RLs> Start indexing string filters")
newStringIndexes := make(map[string]map[string]utils.StringMap) // Index it transactionally
newStringIndexes := make(map[string]map[string]utils.StringMap) // Index it transactional
var cacheIDsToIndex []string // Cache keys of RLs to be indexed
if rlIDs == nil {
cacheIDsToIndex = cache2go.GetEntriesKeys(utils.ResourceLimitsPrefix)
@@ -74,10 +122,12 @@ func (rls *ResourceLimiterService) indexStringFilters(rlIDs []string) error {
return utils.ErrNotFound
}
rl := x.(*ResourceLimit)
var hasMetaString bool
for _, fltr := range rl.Filters {
if fltr.Type != MetaString {
continue
}
hasMetaString = true // Mark that we found at least one metatring so we don't need to index globally
if _, hastIt := newStringIndexes[fltr.FieldName]; !hastIt {
newStringIndexes[fltr.FieldName] = make(map[string]utils.StringMap)
}
@@ -88,6 +138,15 @@ func (rls *ResourceLimiterService) indexStringFilters(rlIDs []string) error {
newStringIndexes[fltr.FieldName][fldVal][rl.ID] = true
}
}
if !hasMetaString {
if _, hasIt := newStringIndexes[utils.NOT_AVAILABLE]; !hasIt {
newStringIndexes[utils.NOT_AVAILABLE] = make(map[string]utils.StringMap)
}
if _, hasIt := newStringIndexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE]; !hasIt {
newStringIndexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE] = make(utils.StringMap)
}
newStringIndexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE][rl.ID] = true // Fields without real field index will be located in map[NOT_AVAILABLE][NOT_AVAILABLE][rl.ID]
}
}
rls.Lock()
defer rls.Unlock()
@@ -117,12 +176,11 @@ func (rls *ResourceLimiterService) indexStringFilters(rlIDs []string) error {
// Called when cache/re-caching is necessary
func (rls *ResourceLimiterService) cacheResourceLimits(loadID string, rlIDs []string) error {
if len(rlIDs) == 0 {
return nil
}
if rlIDs == nil {
utils.Logger.Info("<RLs> Start caching all resource limits")
} else if len(rlIDs) != 0 {
} else if len(rlIDs) == 0 {
return nil
} else {
utils.Logger.Info(fmt.Sprintf("<RLs> Start caching resource limits with ids: %+v", rlIDs))
}
if err := rls.dataDB.PreloadCacheForPrefix(utils.ResourceLimitsPrefix); err != nil {
@@ -132,6 +190,72 @@ func (rls *ResourceLimiterService) cacheResourceLimits(loadID string, rlIDs []st
return rls.indexStringFilters(rlIDs)
}
// matchingResourceLimitsForEvent returns the active ResourceLimits whose
// filters ALL pass for the event. Candidates come from two places: the string
// indexes (MetaString filters) and the un-indexed bucket
// stringIndexes[NOT_AVAILABLE][NOT_AVAILABLE].
//
// Fix: the un-indexed path previously accepted a ResourceLimit as soon as a
// single filter passed (the assignment sat inside the filter loop), while the
// indexed path required every filter to pass. Both paths now share the same
// all-filters-must-pass check, which also makes a filterless RL behave
// identically on both paths.
func (rls *ResourceLimiterService) matchingResourceLimitsForEvent(ev map[string]interface{}) (map[string]*ResourceLimit, error) {
	matchingResources := make(map[string]*ResourceLimit)
	now := time.Now()
	// checkCandidate loads one RL from cache, verifies it is active and that
	// every filter passes, and records it in matchingResources on success.
	checkCandidate := func(resName string) error {
		if _, hasIt := matchingResources[resName]; hasIt { // Already checked this RL
			return nil
		}
		x, err := CacheGet(utils.ResourceLimitsPrefix + resName)
		if err != nil {
			return err
		}
		rl := x.(*ResourceLimit)
		if rl.ActivationTime.After(now) || (!rl.ExpiryTime.IsZero() && rl.ExpiryTime.Before(now)) { // not active
			return nil
		}
		for _, fltr := range rl.Filters {
			pass, err := fltr.Pass(ev, "", rls.cdrStatS)
			if err != nil {
				return utils.NewErrServerError(err)
			}
			if !pass { // one failing filter disqualifies the RL
				return nil
			}
		}
		// Cannot persist here since later errors could leave the resource unused
		matchingResources[rl.ID] = rl
		return nil
	}
	// Candidates reachable through the string indexes.
	for fldName, fieldValIf := range ev {
		if _, hasIt := rls.stringIndexes[fldName]; !hasIt {
			continue
		}
		strVal, canCast := utils.CastFieldIfToString(fieldValIf)
		if !canCast {
			return nil, fmt.Errorf("Cannot cast field: %s into string", fldName)
		}
		if _, hasIt := rls.stringIndexes[fldName][strVal]; !hasIt {
			continue
		}
		for resName := range rls.stringIndexes[fldName][strVal] {
			if err := checkCandidate(resName); err != nil {
				return nil, err
			}
		}
	}
	// Check un-indexed resources (RLs without any MetaString filter).
	for resName := range rls.stringIndexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE] {
		if err := checkCandidate(resName); err != nil {
			return nil, err
		}
	}
	return matchingResources, nil
}
// Called to start the service
func (rls *ResourceLimiterService) Start() error {
if err := rls.cacheResourceLimits("ResourceLimiterServiceStart", nil); err != nil {
@@ -145,7 +269,7 @@ func (rls *ResourceLimiterService) Shutdown() error {
return nil
}
// RPC Methods available internally
// RPC Methods
// Cache/Re-cache
func (rls *ResourceLimiterService) V1CacheResourceLimits(attrs *utils.AttrRLsCache, reply *string) error {
@@ -156,6 +280,83 @@ func (rls *ResourceLimiterService) V1CacheResourceLimits(attrs *utils.AttrRLsCac
return nil
}
// CacheResourceLimits is an alias of V1CacheResourceLimits exposed for
// external (plain RPC name) usage; it delegates without any extra logic.
func (rls *ResourceLimiterService) CacheResourceLimits(attrs *utils.AttrRLsCache, reply *string) error {
	return rls.V1CacheResourceLimits(attrs, reply)
}
// V1ResourceLimitsForEvent replies with the list of ResourceLimits matching
// the event, in no particular order.
func (rls *ResourceLimiterService) V1ResourceLimitsForEvent(ev map[string]interface{}, reply *[]*ResourceLimit) error {
	rls.Lock() // matching may touch an unknown number of RLs
	defer rls.Unlock()
	matched, err := rls.matchingResourceLimitsForEvent(ev)
	if err != nil {
		return utils.NewErrServerError(err)
	}
	result := make([]*ResourceLimit, 0, len(matched))
	for _, rl := range matched {
		result = append(result, rl)
	}
	*reply = result
	return nil
}
// ResourceLimitsForEvent is an alias of V1ResourceLimitsForEvent exposed for
// external (plain RPC name) usage; it delegates without any extra logic.
func (rls *ResourceLimiterService) ResourceLimitsForEvent(ev map[string]interface{}, reply *[]*ResourceLimit) error {
	return rls.V1ResourceLimitsForEvent(ev, reply)
}
// V1InitiateResourceUsage is called when a session (or another event) needs to
// allocate units against the matching ResourceLimits. Limits without enough
// headroom are dropped from the matching set; if none remain the request is
// refused with ErrResourceUnavailable. Accepted limits get the usage recorded
// and are written back to cache.
//
// Fix: previously a limit that failed the headroom check was deleted from the
// matching set but RecordUsage still ran against it (missing continue), so a
// rejected resource silently accumulated usage on the cached object.
func (rls *ResourceLimiterService) V1InitiateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error {
	rls.Lock() // Unknown number of RLs updated
	defer rls.Unlock()
	matchingRLForEv, err := rls.matchingResourceLimitsForEvent(attrs.Event)
	if err != nil {
		return utils.NewErrServerError(err)
	}
	for rlID, rl := range matchingRLForEv {
		if rl.Limit < rl.UsedUnits()+attrs.RequestedUnits {
			delete(matchingRLForEv, rlID)
			continue // do not record usage against a limit that rejected the request
		}
		if err := rl.RecordUsage(&ResourceUsage{ID: attrs.ResourceUsageID, UsageTime: time.Now(), UsageUnits: attrs.RequestedUnits}); err != nil {
			return err // Should not happen: usage ID duplication
		}
	}
	if len(matchingRLForEv) == 0 {
		return utils.ErrResourceUnavailable
	}
	for _, rl := range matchingRLForEv {
		CacheSet(utils.ResourceLimitsPrefix+rl.ID, rl)
	}
	*reply = utils.OK
	return nil
}
// InitiateResourceUsage is an alias of V1InitiateResourceUsage exposed for
// external (plain RPC name) usage; it delegates without any extra logic.
func (rls *ResourceLimiterService) InitiateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error {
	return rls.V1InitiateResourceUsage(attrs, reply)
}
// V1TerminateResourceUsage releases the usage identified by
// attrs.ResourceUsageID from every ResourceLimit matching the event.
func (rls *ResourceLimiterService) V1TerminateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error {
	rls.Lock() // Unknown number of RLs updated
	defer rls.Unlock()
	matched, err := rls.matchingResourceLimitsForEvent(attrs.Event)
	if err != nil {
		return utils.NewErrServerError(err)
	}
	for _, rl := range matched {
		// Best-effort: a limit may legitimately have no record for this ID
		// (eg: usage already expired via TTL), so the error is ignored.
		rl.RemoveUsage(attrs.ResourceUsageID)
	}
	*reply = utils.OK
	return nil
}
// TerminateResourceUsage is an alias of V1TerminateResourceUsage exposed for
// external (plain RPC name) usage; it delegates without any extra logic.
func (rls *ResourceLimiterService) TerminateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error {
	return rls.V1TerminateResourceUsage(attrs, reply)
}
// Make the service available as RPC internally
func (rls *ResourceLimiterService) Call(serviceMethod string, args interface{}, reply interface{}) error {
parts := strings.Split(serviceMethod, ".")

View File

@@ -27,7 +27,9 @@ import (
"github.com/cgrates/cgrates/utils"
)
func TestIndexStringFilters(t *testing.T) {
var rLS *ResourceLimiterService
func TestRLsIndexStringFilters(t *testing.T) {
rls := []*ResourceLimit{
&ResourceLimit{
ID: "RL1",
@@ -39,6 +41,7 @@ func TestIndexStringFilters(t *testing.T) {
}},
ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
Limit: 2,
Usage: make(map[string]*ResourceUsage),
},
&ResourceLimit{
ID: "RL2",
@@ -49,12 +52,35 @@ func TestIndexStringFilters(t *testing.T) {
},
ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
Limit: 1,
UsageTTL: time.Duration(1 * time.Millisecond),
Usage: make(map[string]*ResourceUsage),
},
&ResourceLimit{
ID: "RL4",
Weight: 10,
Filters: []*RequestFilter{
&RequestFilter{Type: MetaStringPrefix, FieldName: "Destination", Values: []string{"+49"}},
},
ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
Limit: 1,
Usage: make(map[string]*ResourceUsage),
},
&ResourceLimit{
ID: "RL5",
Weight: 10,
Filters: []*RequestFilter{
&RequestFilter{Type: MetaStringPrefix, FieldName: "Destination", Values: []string{"+40"}},
},
ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
Limit: 1,
UsageTTL: time.Duration(10 * time.Millisecond),
Usage: make(map[string]*ResourceUsage),
},
}
for _, rl := range rls {
cache2go.Set(utils.ResourceLimitsPrefix+rl.ID, rl)
}
rLS := new(ResourceLimiterService)
rLS = new(ResourceLimiterService)
eIndexes := map[string]map[string]utils.StringMap{
"Account": map[string]utils.StringMap{
"1001": utils.StringMap{
@@ -73,6 +99,12 @@ func TestIndexStringFilters(t *testing.T) {
"RL2": true,
},
},
utils.NOT_AVAILABLE: map[string]utils.StringMap{
utils.NOT_AVAILABLE: utils.StringMap{
"RL4": true,
"RL5": true,
},
},
}
if err := rLS.indexStringFilters(nil); err != nil {
t.Error(err)
@@ -88,8 +120,21 @@ func TestIndexStringFilters(t *testing.T) {
},
ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
Limit: 1,
Usage: make(map[string]*ResourceUsage),
}
cache2go.Set(utils.ResourceLimitsPrefix+rl3.ID, rl3)
rl6 := &ResourceLimit{ // Add it so we can test expiryTime
ID: "RL6",
Weight: 10,
Filters: []*RequestFilter{
&RequestFilter{Type: MetaString, FieldName: "Subject", Values: []string{"dan"}},
},
ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
ExpiryTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
Limit: 1,
Usage: make(map[string]*ResourceUsage),
}
CacheSet(utils.ResourceLimitsPrefix+rl6.ID, rl6)
eIndexes = map[string]map[string]utils.StringMap{
"Account": map[string]utils.StringMap{
"1001": utils.StringMap{
@@ -107,16 +152,155 @@ func TestIndexStringFilters(t *testing.T) {
"dan": utils.StringMap{
"RL2": true,
"RL3": true,
"RL6": true,
},
"1003": utils.StringMap{
"RL3": true,
},
},
utils.NOT_AVAILABLE: map[string]utils.StringMap{
utils.NOT_AVAILABLE: utils.StringMap{
"RL4": true,
"RL5": true,
},
},
}
// Test index update
if err := rLS.indexStringFilters([]string{rl3.ID}); err != nil {
if err := rLS.indexStringFilters([]string{rl3.ID, rl6.ID}); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eIndexes, rLS.stringIndexes) {
t.Errorf("Expecting: %+v, received: %+v", eIndexes, rLS.stringIndexes)
}
}
// TestRLsMatchingResourceLimitsForEvent checks that matchingResourceLimitsForEvent
// returns exactly the RLs whose filters all pass for the event.
// NOTE(review): relies on the package-level rLS populated/indexed by the
// preceding test — the tests in this file are order-dependent.
func TestRLsMatchingResourceLimitsForEvent(t *testing.T) {
	// Expected result set; only keys and length are compared below.
	eResLimits := map[string]*ResourceLimit{
		"RL1": &ResourceLimit{
			ID: "RL1",
			Weight: 20,
			Filters: []*RequestFilter{
				&RequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"1001", "1002"}},
				&RequestFilter{Type: MetaRSRFields, Values: []string{"Subject(~^1.*1$)", "Destination(1002)"},
					rsrFields: utils.ParseRSRFieldsMustCompile("Subject(~^1.*1$);Destination(1002)", utils.INFIELD_SEP),
				}},
			ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
			Limit: 2,
			Usage: make(map[string]*ResourceUsage),
		},
		"RL2": &ResourceLimit{
			ID: "RL2",
			Weight: 10,
			Filters: []*RequestFilter{
				&RequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"dan", "1002"}},
				&RequestFilter{Type: MetaString, FieldName: "Subject", Values: []string{"dan"}},
			},
			ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
			Limit: 1,
			UsageTTL: time.Duration(1 * time.Millisecond),
			Usage: make(map[string]*ResourceUsage),
		},
	}
	if resLimits, err := rLS.matchingResourceLimitsForEvent(map[string]interface{}{"Account": "1002", "Subject": "dan", "Destination": "1002"}); err != nil {
		t.Error(err)
	} else if len(eResLimits) != len(resLimits) {
		t.Errorf("Expecting: %+v, received: %+v", eResLimits, resLimits)
	} else {
		for rlID := range eResLimits {
			if _, hasID := resLimits[rlID]; !hasID {
				t.Errorf("Expecting: %+v, received: %+v", eResLimits, resLimits)
			}
		}
		// Make sure the filters are what we expect to be after retrieving from cache:
		fltr := resLimits["RL1"].Filters[1]
		if pass, _ := fltr.Pass(map[string]interface{}{"Subject": "10000001"}, "", nil); !pass {
			t.Errorf("Expecting RL: %+v, received: %+v", eResLimits["RL1"], resLimits["RL1"])
		}
		if pass, _ := fltr.Pass(map[string]interface{}{"Account": "1002"}, "", nil); pass {
			t.Errorf("Expecting RL: %+v, received: %+v", eResLimits["RL1"], resLimits["RL1"])
		}
	}
}
// TestRLsV1ResourceLimitsForEvent checks the RPC wrapper around
// matchingResourceLimitsForEvent; only the number of returned limits is
// asserted since map iteration makes the slice order non-deterministic.
// NOTE(review): depends on the package-level rLS state from earlier tests.
func TestRLsV1ResourceLimitsForEvent(t *testing.T) {
	eLimits := []*ResourceLimit{
		&ResourceLimit{
			ID: "RL1",
			Weight: 20,
			Filters: []*RequestFilter{
				&RequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"1001", "1002"}},
				&RequestFilter{Type: MetaRSRFields, Values: []string{"Subject(~^1.*1$)", "Destination(1002)"},
					rsrFields: utils.ParseRSRFieldsMustCompile("Subject(~^1.*1$);Destination(1002)", utils.INFIELD_SEP),
				}},
			ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
			Limit: 2,
			Usage: make(map[string]*ResourceUsage),
		},
		&ResourceLimit{
			ID: "RL2",
			Weight: 10,
			Filters: []*RequestFilter{
				&RequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"dan", "1002"}},
				&RequestFilter{Type: MetaString, FieldName: "Subject", Values: []string{"dan"}},
			},
			ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
			Limit: 1,
			UsageTTL: time.Duration(1 * time.Millisecond),
			Usage: make(map[string]*ResourceUsage),
		},
	}
	var rcvLmts []*ResourceLimit
	if err := rLS.V1ResourceLimitsForEvent(map[string]interface{}{"Account": "1002", "Subject": "dan", "Destination": "1002"}, &rcvLmts); err != nil {
		t.Error(err)
	} else if len(eLimits) != len(rcvLmts) {
		t.Errorf("Expecting: %+v, received: %+v", eLimits, rcvLmts)
	}
}
// TestRLsV1InitiateResourceUsage allocates one unit against the limits
// matching the event and verifies the usage record lands on RL1.
// NOTE(review): depends on the package-level rLS state from earlier tests;
// the usage ID is reused by the Terminate test below.
func TestRLsV1InitiateResourceUsage(t *testing.T) {
	attrRU := utils.AttrRLsResourceUsage{
		ResourceUsageID: "651a8db2-4f67-4cf8-b622-169e8a482e50",
		Event: map[string]interface{}{"Account": "1002", "Subject": "dan", "Destination": "1002"},
		RequestedUnits: 1,
	}
	var reply string
	if err := rLS.V1InitiateResourceUsage(attrRU, &reply); err != nil {
		t.Error(err)
	} else if reply != utils.OK {
		t.Error("Received reply: ", reply)
	}
	resLimits, err := rLS.matchingResourceLimitsForEvent(attrRU.Event)
	if err != nil {
		t.Error(err)
	} else if len(resLimits) != 2 {
		t.Errorf("Received: %+v", resLimits)
	} else if resLimits["RL1"].UsedUnits() != 1 { // the requested unit must be visible
		t.Errorf("RL1: %+v", resLimits["RL1"])
	} else if _, hasKey := resLimits["RL1"].Usage[attrRU.ResourceUsageID]; !hasKey {
		t.Errorf("RL1: %+v", resLimits["RL1"])
	}
}
// TestRLsV1TerminateResourceUsage releases the usage recorded by the
// preceding Initiate test and verifies RL1 is back to zero units with the
// usage record removed.
// NOTE(review): depends on TestRLsV1InitiateResourceUsage having run first.
func TestRLsV1TerminateResourceUsage(t *testing.T) {
	attrRU := utils.AttrRLsResourceUsage{
		ResourceUsageID: "651a8db2-4f67-4cf8-b622-169e8a482e50",
		Event: map[string]interface{}{"Account": "1002", "Subject": "dan", "Destination": "1002"},
		RequestedUnits: 1,
	}
	var reply string
	if err := rLS.V1TerminateResourceUsage(attrRU, &reply); err != nil {
		t.Error(err)
	} else if reply != utils.OK {
		t.Error("Received reply: ", reply)
	}
	resLimits, err := rLS.matchingResourceLimitsForEvent(attrRU.Event)
	if err != nil {
		t.Error(err)
	} else if len(resLimits) != 2 {
		t.Errorf("Received: %+v", resLimits)
	} else if resLimits["RL1"].UsedUnits() != 0 { // usage fully released
		t.Errorf("RL1: %+v", resLimits["RL1"])
	} else if _, hasKey := resLimits["RL1"].Usage[attrRU.ResourceUsageID]; hasKey {
		t.Errorf("RL1: %+v", resLimits["RL1"])
	}
}

View File

@@ -372,6 +372,32 @@ func (fsev FSEvent) ComputeLcr() bool {
}
}
// AsMapStringInterface exports the FreeSWITCH event in the generic map form
// consumed by the ResourceLimiter service (RLs). Cost is fixed at -1 since no
// rating has happened at this point.
func (fsev FSEvent) AsMapStringInterface(timezone string) map[string]interface{} {
	mp := map[string]interface{}{
		utils.CGRID:            fsev.GetCgrId(timezone),
		utils.TOR:              utils.VOICE,
		utils.ACCID:            fsev.GetUUID(),
		utils.CDRHOST:          fsev.GetOriginatorIP(utils.META_DEFAULT),
		utils.CDRSOURCE:        "FS_" + fsev.GetName(),
		utils.REQTYPE:          fsev.GetReqType(utils.META_DEFAULT),
		utils.DIRECTION:        fsev.GetDirection(utils.META_DEFAULT),
		utils.TENANT:           fsev.GetTenant(utils.META_DEFAULT),
		utils.CATEGORY:         fsev.GetCategory(utils.META_DEFAULT),
		utils.ACCOUNT:          fsev.GetAccount(utils.META_DEFAULT),
		utils.SUBJECT:          fsev.GetSubject(utils.META_DEFAULT),
		utils.DESTINATION:      fsev.GetDestination(utils.META_DEFAULT),
		utils.COST:             -1,
		utils.SUPPLIER:         fsev.GetSupplier(utils.META_DEFAULT),
		utils.DISCONNECT_CAUSE: fsev.GetDisconnectCause(utils.META_DEFAULT),
	}
	// These getters also return an error which this export deliberately discards.
	mp[utils.SETUP_TIME], _ = fsev.GetSetupTime(utils.META_DEFAULT, timezone)
	mp[utils.ANSWER_TIME], _ = fsev.GetAnswerTime(utils.META_DEFAULT, timezone)
	mp[utils.USAGE], _ = fsev.GetDuration(utils.META_DEFAULT)
	mp[utils.PDD], _ = fsev.GetPdd(utils.META_DEFAULT)
	return mp
}
// Converts into CallDescriptor due to responder interface needs
func (fsev FSEvent) AsCallDescriptor() (*engine.CallDescriptor, error) {
lcrReq := &engine.LcrRequest{

View File

@@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"log/syslog"
"reflect"
"strconv"
"strings"
"time"
@@ -33,13 +34,17 @@ import (
"github.com/cgrates/rpcclient"
)
func NewFSSessionManager(smFsConfig *config.SmFsConfig, rater, cdrs rpcclient.RpcClientConnection, timezone string) *FSSessionManager {
func NewFSSessionManager(smFsConfig *config.SmFsConfig, rater, cdrs, rls rpcclient.RpcClientConnection, timezone string) *FSSessionManager {
if rls != nil && reflect.ValueOf(rls).IsNil() {
rls = nil
}
return &FSSessionManager{
cfg: smFsConfig,
conns: make(map[string]*fsock.FSock),
senderPools: make(map[string]*fsock.FSockPool),
rater: rater,
cdrsrv: cdrs,
rls: rls,
sessions: NewSessions(),
timezone: timezone,
}
@@ -53,6 +58,7 @@ type FSSessionManager struct {
senderPools map[string]*fsock.FSockPool // Keep sender pools here
rater rpcclient.RpcClientConnection
cdrsrv rpcclient.RpcClientConnection
rls rpcclient.RpcClientConnection
sessions *Sessions
timezone string
@@ -159,6 +165,7 @@ func (sm *FSSessionManager) onChannelPark(ev engine.Event, connId string) {
if err = sm.Rater().Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil {
utils.Logger.Info(fmt.Sprintf("<SM-FreeSWITCH> LCR_API_ERROR: %s", err.Error()))
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR)
return
}
if lcr.HasErrors() {
lcr.LogErrors()
@@ -174,9 +181,28 @@ func (sm *FSSessionManager) onChannelPark(ev engine.Event, connId string) {
if _, err = sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s %s %s\n\n", ev.GetUUID(), utils.CGR_SUPPLIERS, fsArray)); err != nil {
utils.Logger.Info(fmt.Sprintf("<SM-FreeSWITCH> LCR_ERROR: %s", err.Error()))
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR)
return
}
}
}
if sm.rls != nil {
var reply string
attrRU := utils.AttrRLsResourceUsage{
ResourceUsageID: ev.GetUUID(),
Event: ev.(FSEvent).AsMapStringInterface(sm.timezone),
RequestedUnits: 1,
}
if err := sm.rls.Call("RLsV1.InitiateResourceUsage", attrRU, &reply); err != nil {
if err.Error() == utils.ErrResourceUnavailable.Error() {
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), "-"+utils.ErrResourceUnavailable.Error())
} else {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> RLs API error: %s", err.Error()))
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR)
}
return
}
}
// Check ResourceLimits
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), AUTH_OK)
}
@@ -224,6 +250,17 @@ func (sm *FSSessionManager) onChannelHangupComplete(ev engine.Event) {
if sm.cfg.CreateCdr {
sm.ProcessCdr(ev.AsStoredCdr(config.CgrConfig().DefaultTimezone))
}
var reply string
attrRU := utils.AttrRLsResourceUsage{
ResourceUsageID: ev.GetUUID(),
Event: ev.(FSEvent).AsMapStringInterface(sm.timezone),
RequestedUnits: 1,
}
if sm.rls != nil {
if err := sm.rls.Call("RLsV1.TerminateResourceUsage", attrRU, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> RLs API error: %s", err.Error()))
}
}
}
// Connects to the freeswitch mod_event_socket server and starts

View File

@@ -1093,7 +1093,7 @@ type AttrExportCdrsToFile struct {
CdrFormat *string // Cdr output file format <utils.CdreCdrFormats>
FieldSeparator *string // Separator used between fields
ExportID *string // Optional exportid
ExportFolder *string // If provided it overwrites the configured export directory
ExportDirectory *string // If provided it overwrites the configured export directory
ExportFileName *string // If provided the output filename will be set to this
ExportTemplate *string // Exported fields template <""|fld1,fld2|*xml:instance_name>
DataUsageMultiplyFactor *float64 // Multiply data usage before export (eg: convert from KBytes to Bytes)
@@ -1222,3 +1222,9 @@ type AttrRLsCache struct {
LoadID string
ResourceLimitIDs []string
}
// AttrRLsResourceUsage carries one resource-usage request towards the
// ResourceLimiter service (initiate/terminate).
type AttrRLsResourceUsage struct {
	ResourceUsageID string // Unique identifier of the usage record, eg: session UUID
	Event map[string]interface{} // Event used to match ResourceLimits
	RequestedUnits float64 // Number of units to allocate on each matching limit
}

View File

@@ -32,6 +32,7 @@ var (
ErrUserNotFound = errors.New("USER_NOT_FOUND")
ErrInsufficientCredit = errors.New("INSUFFICIENT_CREDIT")
ErrNotConvertible = errors.New("NOT_CONVERTIBLE")
ErrResourceUnavailable = errors.New("RESOURCE_UNAVAILABLE")
CdreCdrFormats = []string{CSV, DRYRUN, CDRE_FIXED_WIDTH}
PrimaryCdrFields = []string{CGRID, CDRSOURCE, CDRHOST, ACCID, TOR, REQTYPE, DIRECTION, TENANT, CATEGORY, ACCOUNT, SUBJECT, DESTINATION, SETUP_TIME, PDD, ANSWER_TIME, USAGE,

View File

@@ -60,7 +60,19 @@ func ReflectFieldAsString(intf interface{}, fldName, extraFieldsLabel string) (s
if v.Kind() == reflect.Ptr {
v = v.Elem()
}
field := v.FieldByName(fldName)
var field reflect.Value
switch v.Kind() {
case reflect.Struct:
field = v.FieldByName(fldName)
case reflect.Map:
field = v.MapIndex(reflect.ValueOf(fldName))
if !field.IsValid() { // Not looking in extra fields anymore
return "", ErrNotFound
}
default:
return "", fmt.Errorf("Unsupported field kind: %v", v.Kind())
}
if !field.IsValid() {
if extraFieldsLabel == "" {
return "", ErrNotFound

View File

@@ -22,7 +22,7 @@ import (
"testing"
)
func TestReflectFieldAsString(t *testing.T) {
func TestReflectFieldAsStringOnStruct(t *testing.T) {
mystruct := struct {
Title string
Count int
@@ -71,3 +71,48 @@ func TestReflectFieldAsString(t *testing.T) {
t.Error("Received: %s", strVal)
}
}
// TestReflectFieldAsStringOnMap exercises ReflectFieldAsString over a
// map[string]interface{} holding string, int, int64 and float64 values.
//
// Fix: every failure branch used t.Error with a printf-style format string;
// t.Error does not format its arguments (go vet: "Error call has possible
// formatting directive %s") — switched to t.Errorf.
func TestReflectFieldAsStringOnMap(t *testing.T) {
	myMap := map[string]interface{}{"Title": "Title1", "Count": 5, "Count64": int64(6), "Val": 7.3,
		"a": "Title2", "b": 15, "c": int64(16), "d": 17.3}
	if strVal, err := ReflectFieldAsString(myMap, "Title", ""); err != nil {
		t.Error(err)
	} else if strVal != "Title1" {
		t.Errorf("Received: %s", strVal)
	}
	if strVal, err := ReflectFieldAsString(myMap, "Count", ""); err != nil {
		t.Error(err)
	} else if strVal != "5" {
		t.Errorf("Received: %s", strVal)
	}
	if strVal, err := ReflectFieldAsString(myMap, "Count64", ""); err != nil {
		t.Error(err)
	} else if strVal != "6" {
		t.Errorf("Received: %s", strVal)
	}
	if strVal, err := ReflectFieldAsString(myMap, "Val", ""); err != nil {
		t.Error(err)
	} else if strVal != "7.3" {
		t.Errorf("Received: %s", strVal)
	}
	if strVal, err := ReflectFieldAsString(myMap, "a", ""); err != nil {
		t.Error(err)
	} else if strVal != "Title2" {
		t.Errorf("Received: %s", strVal)
	}
	if strVal, err := ReflectFieldAsString(myMap, "b", ""); err != nil {
		t.Error(err)
	} else if strVal != "15" {
		t.Errorf("Received: %s", strVal)
	}
	if strVal, err := ReflectFieldAsString(myMap, "c", ""); err != nil {
		t.Error(err)
	} else if strVal != "16" {
		t.Errorf("Received: %s", strVal)
	}
	if strVal, err := ReflectFieldAsString(myMap, "d", ""); err != nil {
		t.Error(err)
	} else if strVal != "17.3" {
		t.Errorf("Received: %s", strVal)
	}
}