Merge branch 'master' into master

This commit is contained in:
Dan Christian Bogos
2017-10-27 19:47:47 +02:00
committed by GitHub
22 changed files with 226 additions and 68 deletions

View File

@@ -103,6 +103,49 @@ var tEvs = []*engine.ThresholdEvent{
utils.ACCOUNT: "1002",
utils.ResourceID: "RES_GRP_1",
utils.USAGE: 10.0}},
&engine.ThresholdEvent{ // hitting THD_RES_1
Tenant: "cgrates.org",
ID: "event6",
Event: map[string]interface{}{
utils.EventType: utils.ResourceUpdate,
utils.ACCOUNT: "1002",
utils.ResourceID: "RES_GRP_1",
utils.USAGE: 10.0}},
&engine.ThresholdEvent{ // hitting THD_RES_1
Tenant: "cgrates.org",
ID: "event6",
Event: map[string]interface{}{
utils.EventType: utils.ResourceUpdate,
utils.ACCOUNT: "1002",
utils.ResourceID: "RES_GRP_1",
utils.USAGE: 10.0}},
&engine.ThresholdEvent{ // hitting THD_CDRS_1
Tenant: "cgrates.org",
ID: "cdrev1",
Event: map[string]interface{}{
utils.EventType: utils.CDR,
"field_extr1": "val_extr1",
"fieldextr2": "valextr2",
utils.CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()),
utils.MEDI_RUNID: utils.MetaRaw,
utils.ORDERID: 123,
utils.CDRHOST: "192.168.1.1",
utils.CDRSOURCE: utils.UNIT_TEST,
utils.ACCID: "dsafdsaf",
utils.TOR: utils.VOICE,
utils.REQTYPE: utils.META_RATED,
utils.DIRECTION: "*out",
utils.TENANT: "cgrates.org",
utils.CATEGORY: "call",
utils.ACCOUNT: "1007",
utils.SUBJECT: "1007",
utils.DESTINATION: "+4986517174963",
utils.SETUP_TIME: time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC),
utils.PDD: time.Duration(0) * time.Second,
utils.ANSWER_TIME: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC),
utils.USAGE: time.Duration(10) * time.Second,
utils.SUPPLIER: "SUPPL1",
utils.COST: -1.0}},
}
var sTestsThresholdSV1 = []func(t *testing.T){
@@ -190,7 +233,7 @@ func testV1TSFromFolder(t *testing.T) {
func testV1TSGetThresholds(t *testing.T) {
var tIDs []string
expectedIDs := []string{"THD_RES_1", "THD_STATS_2", "THD_STATS_1", "THD_ACNT_BALANCE_1", "THD_ACNT_EXPIRED", "THD_STATS_3"}
expectedIDs := []string{"THD_RES_1", "THD_STATS_2", "THD_STATS_1", "THD_ACNT_BALANCE_1", "THD_ACNT_EXPIRED", "THD_STATS_3", "THD_CDRS_1"}
if err := tSv1Rpc.Call("ThresholdSV1.GetThresholdIDs", "cgrates.org", &tIDs); err != nil {
t.Error(err)
} else if len(expectedIDs) != len(tIDs) {
@@ -244,6 +287,24 @@ func testV1TSProcessEvent(t *testing.T) {
} else if hits != eHits {
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
}
eHits = 1
if err := tSv1Rpc.Call("ThresholdSV1.ProcessEvent", tEvs[6], &hits); err != nil {
t.Error(err)
} else if hits != eHits {
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
}
eHits = 1
if err := tSv1Rpc.Call("ThresholdSV1.ProcessEvent", tEvs[7], &hits); err != nil {
t.Error(err)
} else if hits != eHits {
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
}
eHits = 1
if err := tSv1Rpc.Call("ThresholdSV1.ProcessEvent", tEvs[8], &hits); err != nil {
t.Error(err)
} else if hits != eHits {
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
}
}
func testV1TSGetThresholdsAfterProcess(t *testing.T) {

View File

@@ -390,10 +390,10 @@ func startSmOpenSIPS(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien
func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
cdrDb engine.CdrStorage, dm *engine.DataManager,
internalRaterChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan,
internalCdrStatSChan, internalStatSChan chan rpcclient.RpcClientConnection,
internalCdrStatSChan, internalThresholdSChan, internalStatSChan chan rpcclient.RpcClientConnection,
server *utils.Server, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS CDRS service.")
var ralConn, pubSubConn, usersConn, aliasesConn, cdrstatsConn, statsConn *rpcclient.RpcClientPool
var ralConn, pubSubConn, usersConn, aliasesConn, cdrstatsConn, thresholdSConn, statsConn *rpcclient.RpcClientPool
if len(cfg.CDRSRaterConns) != 0 { // Conn pool towards RAL
ralConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.CDRSRaterConns, internalRaterChan, cfg.InternalTtl)
@@ -439,6 +439,15 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
return
}
}
if len(cfg.CDRSThresholdSConns) != 0 { // ThresholdS connection init
thresholdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.CDRSThresholdSConns, internalThresholdSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to ThresholdS: %s", err.Error()))
exitChan <- true
return
}
}
if len(cfg.CDRSStatSConns) != 0 { // Stats connection init
statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.CDRSStatSConns, internalStatSChan, cfg.InternalTtl)
@@ -449,7 +458,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
}
}
cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, dm, ralConn, pubSubConn,
usersConn, aliasesConn, cdrstatsConn, statsConn)
usersConn, aliasesConn, cdrstatsConn, thresholdSConn, statsConn)
cdrServer.SetTimeToLive(cfg.ResponseCacheTTL, nil)
utils.Logger.Info("Registering CDRS HTTP Handlers.")
cdrServer.RegisterHandlersToServer(server)
@@ -541,7 +550,7 @@ func startResourceService(internalRsChan, internalThresholdSChan chan rpcclient.
return
}
}
rS, err := engine.NewResourceService(dm, cfg.ResourceSCfg().StoreInterval, thdSConn, filterS)
rS, err := engine.NewResourceService(dm, cfg.ResourceSCfg().StoreInterval, thdSConn, filterS, cfg.ResourceSCfg().IndexedFields)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<ResourceS> Could not init, error: %s", err.Error()))
exitChan <- true
@@ -577,7 +586,7 @@ func startStatService(internalStatSChan, internalThresholdSChan chan rpcclient.R
return
}
}
sS, err := engine.NewStatService(dm, cfg.StatSCfg().StoreInterval, thdSConn, filterS)
sS, err := engine.NewStatService(dm, cfg.StatSCfg().StoreInterval, thdSConn, filterS, cfg.StatSCfg().IndexedFields)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<StatS> Could not init, error: %s", err.Error()))
exitChan <- true
@@ -602,7 +611,7 @@ func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnec
dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
filterS := <-filterSChan
filterSChan <- filterS
tS, err := engine.NewThresholdService(dm, cfg.ThresholdSCfg().FilteredFields,
tS, err := engine.NewThresholdService(dm, cfg.ThresholdSCfg().IndexedFields,
cfg.ThresholdSCfg().StoreInterval, filterS)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<ThresholdS> Could not init, error: %s", err.Error()))
@@ -829,7 +838,7 @@ func main() {
if cfg.CDRSEnabled {
go startCDRS(internalCdrSChan, cdrDb, dm,
internalRaterChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan,
internalCdrStatSChan, internalStatSChan, server, exitChan)
internalCdrStatSChan, internalThresholdSChan, internalStatSChan, server, exitChan)
}
// Start CDR Stats server

View File

@@ -249,6 +249,7 @@ type CGRConfig struct {
CDRSUserSConns []*HaPoolConfig // address where to reach the users service: <""|internal|x.y.z.y:1234>
CDRSAliaseSConns []*HaPoolConfig // address where to reach the aliases service: <""|internal|x.y.z.y:1234>
CDRSCDRStatSConns []*HaPoolConfig // address where to reach the cdrstats service. Empty to disable cdrstats gathering <""|internal|x.y.z.y:1234>
CDRSThresholdSConns []*HaPoolConfig // address where to reach the thresholds service: <""|internal|x.y.z.y:1234>
CDRSStatSConns []*HaPoolConfig
CDRSOnlineCDRExports []string // list of CDRE templates to use for real-time CDR exports
CDRStatsEnabled bool // Enable CDR Stats service
@@ -968,6 +969,13 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) (err error) {
self.CDRSCDRStatSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCdrsCfg.Thresholds_conns != nil {
self.CDRSThresholdSConns = make([]*HaPoolConfig, len(*jsnCdrsCfg.Thresholds_conns))
for idx, jsnHaCfg := range *jsnCdrsCfg.Thresholds_conns {
self.CDRSThresholdSConns[idx] = NewDfltHaPoolConfig()
self.CDRSThresholdSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCdrsCfg.Stats_conns != nil {
self.CDRSStatSConns = make([]*HaPoolConfig, len(*jsnCdrsCfg.Stats_conns))
for idx, jsnHaCfg := range *jsnCdrsCfg.Stats_conns {

View File

@@ -155,6 +155,7 @@ const CGRATES_CFG_JSON = `
"users_conns": [], // address where to reach the user service, empty to disable user profile functionality: <""|*internal|x.y.z.y:1234>
"aliases_conns": [], // address where to reach the aliases service, empty to disable aliases functionality: <""|*internal|x.y.z.y:1234>
"cdrstats_conns": [], // address where to reach the cdrstats service, empty to disable cdrstats functionality: <""|*internal|x.y.z.y:1234>
"thresholds_conns": [], // address where to reach the thresholds service, empty to disable thresholds functionality: <""|*internal|x.y.z.y:1234>
"stats_conns": [], // address where to reach the stat service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234>
"online_cdr_exports":[], // list of CDRE profiles to use for real-time CDR exports
},
@@ -423,6 +424,7 @@ const CGRATES_CFG_JSON = `
"enabled": false, // starts ResourceLimiter service: <true|false>.
"store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|$dur>
"thresholds_conns": [], // address where to reach the thresholds service, empty to disable thresholds functionality: <""|*internal|x.y.z.y:1234>
"indexed_fields": [], // query indexes based on these fields for faster processing
},
@@ -430,13 +432,14 @@ const CGRATES_CFG_JSON = `
"enabled": false, // starts Stat service: <true|false>.
"store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|$dur>
"thresholds_conns": [], // address where to reach the thresholds service, empty to disable thresholds functionality: <""|*internal|x.y.z.y:1234>
"indexed_fields": [], // query indexes based on these fields for faster processing
},
"thresholds": {
"enabled": false, // starts ThresholdS service: <true|false>.
"store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|$dur>
"filtered_fields": [], // match filters based on these fields for dynamic filtering, empty to use all
"indexed_fields": [], // query indexes based on these fields for faster processing
},

View File

@@ -231,6 +231,7 @@ func TestDfCdrsJsonCfg(t *testing.T) {
Users_conns: &[]*HaPoolJsonCfg{},
Aliases_conns: &[]*HaPoolJsonCfg{},
Cdrstats_conns: &[]*HaPoolJsonCfg{},
Thresholds_conns: &[]*HaPoolJsonCfg{},
Stats_conns: &[]*HaPoolJsonCfg{},
Online_cdr_exports: &[]string{},
}
@@ -692,6 +693,7 @@ func TestDfResourceLimiterSJsonCfg(t *testing.T) {
Enabled: utils.BoolPointer(false),
Thresholds_conns: &[]*HaPoolJsonCfg{},
Store_interval: utils.StringPointer(""),
Indexed_fields: utils.StringSlicePointer([]string{}),
}
if cfg, err := dfCgrJsonCfg.ResourceSJsonCfg(); err != nil {
t.Error(err)
@@ -705,6 +707,7 @@ func TestDfStatServiceJsonCfg(t *testing.T) {
Enabled: utils.BoolPointer(false),
Store_interval: utils.StringPointer(""),
Thresholds_conns: &[]*HaPoolJsonCfg{},
Indexed_fields: utils.StringSlicePointer([]string{}),
}
if cfg, err := dfCgrJsonCfg.StatSJsonCfg(); err != nil {
t.Error(err)
@@ -715,9 +718,9 @@ func TestDfStatServiceJsonCfg(t *testing.T) {
func TestDfThresholdSJsonCfg(t *testing.T) {
eCfg := &ThresholdSJsonCfg{
Enabled: utils.BoolPointer(false),
Store_interval: utils.StringPointer(""),
Filtered_fields: utils.StringSlicePointer([]string{}),
Enabled: utils.BoolPointer(false),
Store_interval: utils.StringPointer(""),
Indexed_fields: utils.StringSlicePointer([]string{}),
}
if cfg, err := dfCgrJsonCfg.ThresholdSJsonCfg(); err != nil {
t.Error(err)

View File

@@ -341,6 +341,12 @@ func TestCgrCfgJSONDefaultsCDRS(t *testing.T) {
if !reflect.DeepEqual(cgrCfg.CDRSCDRStatSConns, eHaPoolCfg) {
t.Error(cgrCfg.CDRSCDRStatSConns)
}
if !reflect.DeepEqual(cgrCfg.CDRSThresholdSConns, eHaPoolCfg) {
t.Error(cgrCfg.CDRSThresholdSConns)
}
if !reflect.DeepEqual(cgrCfg.CDRSStatSConns, eHaPoolCfg) {
t.Error(cgrCfg.CDRSStatSConns)
}
if cgrCfg.CDRSOnlineCDRExports != nil {
t.Error(cgrCfg.CDRSOnlineCDRExports)
}
@@ -579,6 +585,7 @@ func TestCgrCfgJSONDefaultsResLimCfg(t *testing.T) {
Enabled: false,
ThresholdSConns: []*HaPoolConfig{},
StoreInterval: 0,
IndexedFields: []string{},
}
if !reflect.DeepEqual(cgrCfg.resourceSCfg, eResLiCfg) {
t.Errorf("expecting: %s, received: %s", utils.ToJSON(eResLiCfg), utils.ToJSON(cgrCfg.resourceSCfg))
@@ -591,6 +598,7 @@ func TestCgrCfgJSONDefaultStatsCfg(t *testing.T) {
Enabled: false,
StoreInterval: 0,
ThresholdSConns: []*HaPoolConfig{},
IndexedFields: []string{},
}
if !reflect.DeepEqual(cgrCfg.statsCfg, eStatsCfg) {
t.Errorf("received: %+v, expecting: %+v", cgrCfg.statsCfg, eStatsCfg)
@@ -601,6 +609,7 @@ func TestCgrCfgJSONDefaultThresholdSCfg(t *testing.T) {
eThresholdSCfg := &ThresholdSCfg{
Enabled: false,
StoreInterval: 0,
IndexedFields: []string{},
}
if !reflect.DeepEqual(eThresholdSCfg, cgrCfg.thresholdSCfg) {
t.Errorf("received: %+v, expecting: %+v", eThresholdSCfg, cgrCfg.statsCfg)

View File

@@ -108,6 +108,7 @@ type CdrsJsonCfg struct {
Users_conns *[]*HaPoolJsonCfg
Aliases_conns *[]*HaPoolJsonCfg
Cdrstats_conns *[]*HaPoolJsonCfg
Thresholds_conns *[]*HaPoolJsonCfg
Stats_conns *[]*HaPoolJsonCfg
Online_cdr_exports *[]string
}
@@ -391,6 +392,7 @@ type ResourceSJsonCfg struct {
Enabled *bool
Thresholds_conns *[]*HaPoolJsonCfg
Store_interval *string
Indexed_fields *[]string
}
// Stat service config section
@@ -398,13 +400,14 @@ type StatServJsonCfg struct {
Enabled *bool
Store_interval *string
Thresholds_conns *[]*HaPoolJsonCfg
Indexed_fields *[]string
}
// Threshold service config section
type ThresholdSJsonCfg struct {
Enabled *bool
Store_interval *string
Filtered_fields *[]string
Enabled *bool
Store_interval *string
Indexed_fields *[]string
}
// Mailer config section

View File

@@ -28,6 +28,7 @@ type ResourceSConfig struct {
Enabled bool
ThresholdSConns []*HaPoolConfig // Connections towards ThresholdS
StoreInterval time.Duration // Dump regularly from cache into dataDB
IndexedFields []string
}
func (rlcfg *ResourceSConfig) loadFromJsonCfg(jsnCfg *ResourceSJsonCfg) (err error) {
@@ -49,5 +50,11 @@ func (rlcfg *ResourceSConfig) loadFromJsonCfg(jsnCfg *ResourceSJsonCfg) (err err
return
}
}
if jsnCfg.Indexed_fields != nil {
rlcfg.IndexedFields = make([]string, len(*jsnCfg.Indexed_fields))
for i, fID := range *jsnCfg.Indexed_fields {
rlcfg.IndexedFields[i] = fID
}
}
return nil
}

View File

@@ -28,6 +28,7 @@ type StatSCfg struct {
Enabled bool
StoreInterval time.Duration // Dump regularly from cache into dataDB
ThresholdSConns []*HaPoolConfig
IndexedFields []string
}
func (st *StatSCfg) loadFromJsonCfg(jsnCfg *StatServJsonCfg) (err error) {
@@ -49,5 +50,11 @@ func (st *StatSCfg) loadFromJsonCfg(jsnCfg *StatServJsonCfg) (err error) {
st.ThresholdSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Indexed_fields != nil {
st.IndexedFields = make([]string, len(*jsnCfg.Indexed_fields))
for i, fID := range *jsnCfg.Indexed_fields {
st.IndexedFields[i] = fID
}
}
return nil
}

View File

@@ -25,9 +25,9 @@ import (
)
type ThresholdSCfg struct {
Enabled bool
StoreInterval time.Duration // Dump regularly from cache into dataDB
FilteredFields []string
Enabled bool
StoreInterval time.Duration // Dump regularly from cache into dataDB
IndexedFields []string
}
func (t *ThresholdSCfg) loadFromJsonCfg(jsnCfg *ThresholdSJsonCfg) (err error) {
@@ -42,5 +42,11 @@ func (t *ThresholdSCfg) loadFromJsonCfg(jsnCfg *ThresholdSJsonCfg) (err error) {
return err
}
}
if jsnCfg.Indexed_fields != nil {
t.IndexedFields = make([]string, len(*jsnCfg.Indexed_fields))
for i, fID := range *jsnCfg.Indexed_fields {
t.IndexedFields[i] = fID
}
}
return nil
}

View File

@@ -2,7 +2,7 @@
cgrates.org,FLTR_1,*string,Account,1001;1002,2014-07-29T15:00:00Z
cgrates.org,FLTR_1,*string_prefix,Destination,10;20,
cgrates.org,FLTR_1,*rsr_fields,,Subject(~^1.*1$);Destination(1002),
cgrates.org,FLTR_ACNT_dan,*string,Account,dan,2014-07-29T15:00:00Z
cgrates.org,FLTR_ACNT_1007,*string,Account,1007,2014-07-29T15:00:00Z
cgrates.org,FLTR_DST_DE,*destinations,Destination,DST_DE,2014-07-29T15:00:00Z
cgrates.org,FLTR_DST_NL,*destinations,Destination,DST_NL,2014-07-29T15:00:00Z
cgrates.org,FLTR_ACNT_BALANCE_1,*string,Account,1001;1002,2014-07-29T15:00:00Z
@@ -26,3 +26,4 @@ cgrates.org,FLTR_DST_FS,*destinations,Destination,DST_FS,2014-07-29T15:00:00Z
cgrates.org,FLTR_RES_GR3,*string,Account,3001,2014-07-29T15:00:00Z
cgrates.org,FLTR_CDRS,*cdr_stats,,CDRST1:*min_ASR:34;CDRST_1001:*min_ASR:20,2014-07-29T15:00:00Z
cgrates.org,FLTR_STS1,*string,Account,1001;1002,2014-07-29T15:00:00Z
cgrates.org,FLTR_CDR_UPDATE,*string,EventType,CDR,2014-07-29T15:00:00Z
1 #Tenant[0] ID[1] FilterType[2] FilterFieldName[3] FilterFieldValues[4] ActivationInterval[5]
2 cgrates.org FLTR_1 *string Account 1001;1002 2014-07-29T15:00:00Z
3 cgrates.org FLTR_1 *string_prefix Destination 10;20
4 cgrates.org FLTR_1 *rsr_fields Subject(~^1.*1$);Destination(1002)
5 cgrates.org FLTR_ACNT_dan FLTR_ACNT_1007 *string Account dan 1007 2014-07-29T15:00:00Z
6 cgrates.org FLTR_DST_DE *destinations Destination DST_DE 2014-07-29T15:00:00Z
7 cgrates.org FLTR_DST_NL *destinations Destination DST_NL 2014-07-29T15:00:00Z
8 cgrates.org FLTR_ACNT_BALANCE_1 *string Account 1001;1002 2014-07-29T15:00:00Z
26 cgrates.org FLTR_RES_GR3 *string Account 3001 2014-07-29T15:00:00Z
27 cgrates.org FLTR_CDRS *cdr_stats CDRST1:*min_ASR:34;CDRST_1001:*min_ASR:20 2014-07-29T15:00:00Z
28 cgrates.org FLTR_STS1 *string Account 1001;1002 2014-07-29T15:00:00Z
29 cgrates.org FLTR_CDR_UPDATE *string EventType CDR 2014-07-29T15:00:00Z

View File

@@ -5,3 +5,4 @@ cgrates.org,THD_STATS_1,FLTR_STATS_1,2014-07-29T15:00:00Z,true,1,1s,false,10,LOG
cgrates.org,THD_STATS_2,FLTR_STATS_2,2014-07-29T15:00:00Z,true,1,1s,false,10,DISABLE_AND_LOG,false
cgrates.org,THD_STATS_3,FLTR_STATS_3,2014-07-29T15:00:00Z,false,1,1s,false,10,TOPUP_100SMS_DE_MOBILE,false
cgrates.org,THD_RES_1,FLTR_RES_1,2014-07-29T15:00:00Z,true,1,1s,false,10,LOG_WARNING,false
cgrates.org,THD_CDRS_1,FLTR_ACNT_1007;FLTR_CDR_UPDATE,2014-07-29T15:00:00Z,false,1,1s,false,10,LOG_WARNING,false
1 #Tenant[0] Id[1] FilterIDs[2] ActivationInterval[3] Recurrent[4] MinHits[5] MinSleep[6] Blocker[7] Weight[8] ActionIDs[9] Async[10]
5 cgrates.org THD_STATS_2 FLTR_STATS_2 2014-07-29T15:00:00Z true 1 1s false 10 DISABLE_AND_LOG false
6 cgrates.org THD_STATS_3 FLTR_STATS_3 2014-07-29T15:00:00Z false 1 1s false 10 TOPUP_100SMS_DE_MOBILE false
7 cgrates.org THD_RES_1 FLTR_RES_1 2014-07-29T15:00:00Z true 1 1s false 10 LOG_WARNING false
8 cgrates.org THD_CDRS_1 FLTR_ACNT_1007;FLTR_CDR_UPDATE 2014-07-29T15:00:00Z false 1 1s false 10 LOG_WARNING false

View File

@@ -70,7 +70,7 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
}
func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, rater, pubsub, users,
aliases, cdrstats, stats rpcclient.RpcClientConnection) (*CdrServer, error) {
aliases, cdrstats, thdS, stats rpcclient.RpcClientConnection) (*CdrServer, error) {
if rater != nil && reflect.ValueOf(rater).IsNil() { // Work around so we store actual nil instead of nil interface value, faster to check here than in CdrServer code
rater = nil
}
@@ -86,12 +86,15 @@ func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, r
if cdrstats != nil && reflect.ValueOf(cdrstats).IsNil() {
cdrstats = nil
}
if thdS != nil && reflect.ValueOf(thdS).IsNil() {
thdS = nil
}
if stats != nil && reflect.ValueOf(stats).IsNil() {
stats = nil
}
return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, dm: dm,
rals: rater, pubsub: pubsub, users: users, aliases: aliases,
cdrstats: cdrstats, stats: stats, guard: guardian.Guardian,
cdrstats: cdrstats, stats: stats, thdS: thdS, guard: guardian.Guardian,
httpPoster: utils.NewHTTPPoster(cgrCfg.HttpSkipTlsVerify, cgrCfg.ReplyTimeout)}, nil
}
@@ -104,6 +107,7 @@ type CdrServer struct {
users rpcclient.RpcClientConnection
aliases rpcclient.RpcClientConnection
cdrstats rpcclient.RpcClientConnection
thdS rpcclient.RpcClientConnection
stats rpcclient.RpcClientConnection
guard *guardian.GuardianLock
responseCache *cache.ResponseCache
@@ -193,6 +197,18 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) {
return err // Error is propagated back and we don't continue processing the CDR if we cannot store it
}
}
if self.thdS != nil {
cdrIf, _ := cdr.AsMapStringIface()
ev := &ThresholdEvent{
Tenant: cdr.Tenant,
ID: utils.GenUUID(),
Event: cdrIf}
var hits int
if err := self.thdS.Call(utils.ThresholdSv1ProcessEvent, ev, &hits); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<CDRS> error: %s processing CDR event %+v with thdS.", err.Error(), ev))
}
}
// Attach raw CDR to stats
if self.cdrstats != nil { // Send raw CDR to stats
var out int

View File

@@ -25,11 +25,24 @@ import (
)
// matchingItemIDsForEvent returns the list of item IDs matching fieldName/fieldValue for an event
// fieldIDs limits the fields which are checked against indexes
// helper on top of dataDB.MatchReqFilterIndex, adding utils.NOT_AVAILABLE to list of fields queried
// executes a number of $(len(fields) + 1) queries to dataDB so the size of event influences the speed of return
func matchingItemIDsForEvent(ev map[string]interface{}, dm *DataManager, dbIdxKey string) (itemIDs utils.StringMap, err error) {
func matchingItemIDsForEvent(ev map[string]interface{}, fieldIDs []string, dm *DataManager, dbIdxKey string) (itemIDs utils.StringMap, err error) {
if len(fieldIDs) == 0 {
fieldIDs = make([]string, len(ev))
i := 0
for fldID := range ev {
fieldIDs[i] = fldID
i += 1
}
}
itemIDs = make(utils.StringMap)
for fldName, fieldValIf := range ev {
for _, fldName := range fieldIDs {
fieldValIf, has := ev[fldName]
if !has {
continue
}
fldVal, canCast := utils.CastFieldIfToString(fieldValIf)
if !canCast {
return nil, fmt.Errorf("Cannot cast field: %s into string", fldName)

View File

@@ -279,7 +279,7 @@ func (rs Resources) allocateResource(ru *ResourceUsage, dryRun bool) (alcMessage
// Pas the config as a whole so we can ask access concurrently
func NewResourceService(dm *DataManager, storeInterval time.Duration,
thdS rpcclient.RpcClientConnection, filterS *FilterS) (*ResourceService, error) {
thdS rpcclient.RpcClientConnection, filterS *FilterS, indexedFields []string) (*ResourceService, error) {
if thdS != nil && reflect.ValueOf(thdS).IsNil() {
thdS = nil
}
@@ -296,6 +296,7 @@ type ResourceService struct {
dm *DataManager // So we can load the data in cache and index it
thdS rpcclient.RpcClientConnection // rpc connection towards ThresholdS
filterS *FilterS
indexedFields []string // speed up query on indexes
lcEventResources map[string][]*utils.TenantID // cache recording resources for events in alocation phase
lcERMux sync.RWMutex // protects the lcEventResources
storedResources utils.StringMap // keep a record of resources which need saving, map[resID]bool
@@ -433,7 +434,7 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources)
// matchingResourcesForEvent returns ordered list of matching resources which are active by the time of the call
func (rS *ResourceService) matchingResourcesForEvent(tenant string, ev map[string]interface{}) (rs Resources, err error) {
matchingResources := make(map[string]*Resource)
rIDs, err := matchingItemIDsForEvent(ev, rS.dm, utils.ResourceProfilesStringIndex+tenant)
rIDs, err := matchingItemIDsForEvent(ev, rS.indexedFields, rS.dm, utils.ResourceProfilesStringIndex+tenant)
if err != nil {
return nil, err
}

View File

@@ -34,7 +34,7 @@ import (
// NewStatService initializes a StatService
func NewStatService(dm *DataManager, storeInterval time.Duration,
thdS rpcclient.RpcClientConnection, filterS *FilterS) (ss *StatService, err error) {
thdS rpcclient.RpcClientConnection, filterS *FilterS, indexedFields []string) (ss *StatService, err error) {
if thdS != nil && reflect.ValueOf(thdS).IsNil() { // fix nil value in interface
thdS = nil
}
@@ -43,6 +43,7 @@ func NewStatService(dm *DataManager, storeInterval time.Duration,
storeInterval: storeInterval,
thdS: thdS,
filterS: filterS,
indexedFields: indexedFields,
storedStatQueues: make(utils.StringMap),
stopBackup: make(chan struct{})}, nil
}
@@ -53,6 +54,7 @@ type StatService struct {
storeInterval time.Duration
thdS rpcclient.RpcClientConnection // rpc connection towards ThresholdS
filterS *FilterS
indexedFields []string
stopBackup chan struct{}
storedStatQueues utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool
ssqMux sync.RWMutex // protects storedStatQueues
@@ -139,7 +141,7 @@ func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) {
// matchingStatQueuesForEvent returns ordered list of matching resources which are active by the time of the call
func (sS *StatService) matchingStatQueuesForEvent(ev *StatEvent) (sqs StatQueues, err error) {
matchingSQs := make(map[string]*StatQueue)
sqIDs, err := matchingItemIDsForEvent(ev.Event, sS.dm, utils.StatQueuesStringIndex+ev.Tenant)
sqIDs, err := matchingItemIDsForEvent(ev.Event, sS.indexedFields, sS.dm, utils.StatQueuesStringIndex+ev.Tenant)
if err != nil {
return nil, err
}

View File

@@ -92,7 +92,7 @@ func (csvs *CSVStorage) GetTPTimings(tpid, id string) ([]*utils.ApierTPTiming, e
var tpTimings TpTimings
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in timings csv: ", err)
log.Printf("bad line in %s, %s\n", csvs.timingsFn, err.Error())
return nil, err
}
if tpTiming, err := csvLoad(TpTiming{}, record); err != nil {
@@ -120,7 +120,7 @@ func (csvs *CSVStorage) GetTPDestinations(tpid, id string) ([]*utils.TPDestinati
var tpDests TpDestinations
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in destinations csv: ", err)
log.Printf("bad line in %s, %s\n", csvs.destinationsFn, err.Error())
return nil, err
}
if tpDest, err := csvLoad(TpDestination{}, record); err != nil {
@@ -148,7 +148,7 @@ func (csvs *CSVStorage) GetTPRates(tpid, id string) ([]*utils.TPRate, error) {
var tpRates TpRates
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in rates csv: ", err)
log.Printf("bad line in %s, %s\n", csvs.ratesFn, err.Error())
return nil, err
}
if tpRate, err := csvLoad(TpRate{}, record); err != nil {
@@ -180,7 +180,7 @@ func (csvs *CSVStorage) GetTPDestinationRates(tpid, id string, p *utils.Paginato
var tpDestinationRates TpDestinationRates
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in destinationrates csv: ", err)
log.Printf("bad line in %s, %s\n", csvs.destinationratesFn, err.Error())
return nil, err
}
if tpRate, err := csvLoad(TpDestinationRate{}, record); err != nil {
@@ -212,7 +212,7 @@ func (csvs *CSVStorage) GetTPRatingPlans(tpid, id string, p *utils.Paginator) ([
var tpRatingPlans TpRatingPlans
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in rating plans csv: ", err)
log.Printf("bad line in %s, %s\n", csvs.destinationratetimingsFn, err.Error())
return nil, err
}
if tpRate, err := csvLoad(TpRatingPlan{}, record); err != nil {
@@ -244,7 +244,7 @@ func (csvs *CSVStorage) GetTPRatingProfiles(filter *utils.TPRatingProfile) ([]*u
var tpRatingProfiles TpRatingProfiles
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line rating profiles csv: ", err)
log.Printf("bad line in %s, %s\n", csvs.ratingprofilesFn, err.Error())
return nil, err
}
if tpRate, err := csvLoad(TpRatingProfile{}, record); err != nil {
@@ -279,7 +279,7 @@ func (csvs *CSVStorage) GetTPSharedGroups(tpid, id string) ([]*utils.TPSharedGro
var tpSharedGroups TpSharedGroups
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in shared groups csv: ", err)
log.Printf("bad line in %s, %s\n", csvs.sharedgroupsFn, err.Error())
return nil, err
}
if tpRate, err := csvLoad(TpSharedGroup{}, record); err != nil {
@@ -312,7 +312,7 @@ func (csvs *CSVStorage) GetTPLCRs(filter *utils.TPLcrRules) ([]*utils.TPLcrRules
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if tpRate, err := csvLoad(TpLcrRule{}, record); err != nil {
if err != nil {
log.Print("bad line in lcr rules csv: ", err)
log.Printf("bad line in %s, %s\n", csvs.lcrFn, err.Error())
return nil, err
}
return nil, err
@@ -344,7 +344,7 @@ func (csvs *CSVStorage) GetTPActions(tpid, id string) ([]*utils.TPActions, error
var tpActions TpActions
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in actions csv: ", err)
log.Printf("bad line in %s, %s\n", csvs.actionsFn, err.Error())
return nil, err
}
if tpAction, err := csvLoad(TpAction{}, record); err != nil {
@@ -404,7 +404,7 @@ func (csvs *CSVStorage) GetTPActionTriggers(tpid, id string) ([]*utils.TPActionT
var tpActionTriggers TpActionTriggers
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in action triggers csv: ", err)
log.Printf("bad line in %s, %s\n", csvs.actiontriggersFn, err.Error())
return nil, err
}
if tpAt, err := csvLoad(TpActionTrigger{}, record); err != nil {
@@ -436,7 +436,7 @@ func (csvs *CSVStorage) GetTPAccountActions(filter *utils.TPAccountActions) ([]*
var tpAccountActions TpAccountActions
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in account actions csv: ", err)
log.Printf("bad line in %s, %s\n", csvs.accountactionsFn, err.Error())
return nil, err
}
if tpAa, err := csvLoad(TpAccountAction{}, record); err != nil {
@@ -471,7 +471,7 @@ func (csvs *CSVStorage) GetTPDerivedChargers(filter *utils.TPDerivedChargers) ([
var tpDerivedChargers TpDerivedChargers
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in derived chargers csv: ", err)
log.Printf("bad line in %s, %s\n", csvs.derivedChargersFn, err.Error())
return nil, err
}
if tp, err := csvLoad(TpDerivedCharger{}, record); err != nil {
@@ -506,7 +506,7 @@ func (csvs *CSVStorage) GetTPCdrStats(tpid, id string) ([]*utils.TPCdrStats, err
var tpCdrStats TpCdrStats
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in cdr stats csv: ", err)
log.Printf("bad line in %s, %s\n", csvs.cdrStatsFn, err.Error())
return nil, err
}
if tpCdrStat, err := csvLoad(TpCdrstat{}, record); err != nil {
@@ -538,7 +538,7 @@ func (csvs *CSVStorage) GetTPUsers(filter *utils.TPUsers) ([]*utils.TPUsers, err
var tpUsers TpUsers
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in users csv: ", err)
log.Printf("bad line in %s, %s\n", csvs.usersFn, err.Error())
return nil, err
}
if tpUser, err := csvLoad(TpUser{}, record); err != nil {
@@ -572,7 +572,7 @@ func (csvs *CSVStorage) GetTPAliases(filter *utils.TPAliases) ([]*utils.TPAliase
var tpAliases TpAliases
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in aliases csv: ", err)
log.Printf("bad line in %s, %s\n", csvs.aliasesFn, err.Error())
return nil, err
}
if tpAlias, err := csvLoad(TpAlias{}, record); err != nil {
@@ -606,7 +606,7 @@ func (csvs *CSVStorage) GetTPResources(tpid, id string) ([]*utils.TPResource, er
var tpResLimits TpResources
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in resourceprofiles csv: ", err)
log.Printf("bad line in %s, %s\n", csvs.resProfilesFn, err.Error())
return nil, err
}
if tpResLimit, err := csvLoad(TpResource{}, record); err != nil {
@@ -634,7 +634,7 @@ func (csvs *CSVStorage) GetTPStats(tpid, id string) ([]*utils.TPStats, error) {
var tpStats TpStatsS
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in TPStats csv: ", err)
log.Printf("bad line in %s, %s\n", csvs.statsFn, err.Error())
return nil, err
}
if tpstats, err := csvLoad(TpStats{}, record); err != nil {
@@ -662,7 +662,7 @@ func (csvs *CSVStorage) GetTPThresholds(tpid, id string) ([]*utils.TPThreshold,
var tpThreshold TpThresholdS
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in TPThreshold csv: ", err)
log.Printf("bad line in %s, %s\n", csvs.thresholdsFn, err.Error())
return nil, err
}
if thresholdCfg, err := csvLoad(TpThreshold{}, record); err != nil {
@@ -690,7 +690,7 @@ func (csvs *CSVStorage) GetTPFilters(tpid, id string) ([]*utils.TPFilter, error)
var tpFilter TpFilterS
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in TPFilter csv: ", err)
log.Printf("bad line in %s, %s\n", csvs.filterFn, err.Error())
return nil, err
}
if filterCfg, err := csvLoad(TpFilter{}, record); err != nil {

View File

@@ -1281,9 +1281,6 @@ func (rs *RedisStorage) GetReqFilterIndexes(dbKey string) (indexes map[string]ma
}
func (rs *RedisStorage) SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) {
if err = rs.Cmd("DEL", dbKey).Err; err != nil { // DELETE before set
return
}
mp := make(map[string]string)
for fldName, fldValMp := range indexes {
for fldVal, strMp := range fldValMp {

View File

@@ -135,9 +135,19 @@ func (t *Threshold) ProcessEvent(ev *ThresholdEvent, dm *DataManager) (err error
if acntID != "" {
at.accountIDs = utils.NewStringMap(acntID)
}
if errExec := at.Execute(nil, nil); errExec != nil {
utils.Logger.Warning(fmt.Sprintf("<ThresholdS> failed executing actions: %s, error: %s", actionSetID, errExec.Error()))
err = utils.ErrPartiallyExecuted
if t.tPrfl.Async {
go func() {
if errExec := at.Execute(nil, nil); errExec != nil {
utils.Logger.Warning(fmt.Sprintf("<ThresholdS> failed executing actions: %s, error: %s", actionSetID, errExec.Error()))
}
}()
} else {
if errExec := at.Execute(nil, nil); errExec != nil {
utils.Logger.Warning(fmt.Sprintf("<ThresholdS> failed executing actions: %s, error: %s", actionSetID, errExec.Error()))
err = utils.ErrPartiallyExecuted
}
}
}
return
@@ -151,25 +161,25 @@ func (ts Thresholds) Sort() {
sort.Slice(ts, func(i, j int) bool { return ts[i].tPrfl.Weight > ts[j].tPrfl.Weight })
}
func NewThresholdService(dm *DataManager, filteredFields []string, storeInterval time.Duration,
func NewThresholdService(dm *DataManager, indexedFields []string, storeInterval time.Duration,
filterS *FilterS) (tS *ThresholdService, err error) {
return &ThresholdService{dm: dm,
filteredFields: filteredFields,
storeInterval: storeInterval,
filterS: filterS,
stopBackup: make(chan struct{}),
storedTdIDs: make(utils.StringMap)}, nil
indexedFields: indexedFields,
storeInterval: storeInterval,
filterS: filterS,
stopBackup: make(chan struct{}),
storedTdIDs: make(utils.StringMap)}, nil
}
// ThresholdService manages Threshold execution and storing them to dataDB
type ThresholdService struct {
dm *DataManager
filteredFields []string // fields considered when searching for matching thresholds
storeInterval time.Duration
filterS *FilterS
stopBackup chan struct{}
storedTdIDs utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool
stMux sync.RWMutex // protects storedTdIDs
dm *DataManager
indexedFields []string // fields considered when searching for matching thresholds
storeInterval time.Duration
filterS *FilterS
stopBackup chan struct{}
storedTdIDs utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool
stMux sync.RWMutex // protects storedTdIDs
}
// Called to start the service
@@ -254,7 +264,7 @@ func (tS *ThresholdService) StoreThreshold(t *Threshold) (err error) {
// matchingThresholdsForEvent returns ordered list of matching thresholds which are active for an Event
func (tS *ThresholdService) matchingThresholdsForEvent(ev *ThresholdEvent) (ts Thresholds, err error) {
matchingTs := make(map[string]*Threshold)
tIDs, err := matchingItemIDsForEvent(ev.Event, tS.dm, utils.ThresholdStringIndex+ev.Tenant)
tIDs, err := matchingItemIDsForEvent(ev.Event, tS.indexedFields, tS.dm, utils.ThresholdStringIndex+ev.Tenant)
if err != nil {
return nil, err
}

View File

@@ -101,7 +101,7 @@ func TestTutSMGCacheStats(t *testing.T) {
expectedStats := &utils.CacheStats{Destinations: 5, ReverseDestinations: 7, RatingPlans: 4, RatingProfiles: 9,
Actions: 9, ActionPlans: 4, AccountActionPlans: 5, SharedGroups: 1, DerivedChargers: 1, LcrProfiles: 5,
CdrStats: 6, Users: 3, Aliases: 1, ReverseAliases: 2, ResourceProfiles: 3, Resources: 3, StatQueues: 0,
StatQueueProfiles: 0, Thresholds: 6, ThresholdProfiles: 6, Filters: 14}
StatQueueProfiles: 0, Thresholds: 7, ThresholdProfiles: 7, Filters: 15}
var args utils.AttrCacheStats
if err := tutSMGRpc.Call("ApierV2.GetCacheStats", args, &rcvStats); err != nil {
t.Error("Got error on ApierV2.GetCacheStats: ", err.Error())

View File

@@ -105,7 +105,7 @@ func TestTutITCacheStats(t *testing.T) {
expectedStats := &utils.CacheStats{Destinations: 5, ReverseDestinations: 7, RatingPlans: 4, RatingProfiles: 9,
Actions: 9, ActionPlans: 4, AccountActionPlans: 5, SharedGroups: 1, DerivedChargers: 1, LcrProfiles: 5,
CdrStats: 6, Users: 3, Aliases: 1, ReverseAliases: 2, ResourceProfiles: 3, Resources: 3, StatQueues: 0,
StatQueueProfiles: 0, Thresholds: 6, ThresholdProfiles: 6, Filters: 14}
StatQueueProfiles: 0, Thresholds: 7, ThresholdProfiles: 7, Filters: 15}
var args utils.AttrCacheStats
if err := tutLocalRpc.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil {
t.Error("Got error on ApierV1.GetCacheStats: ", err.Error())

View File

@@ -480,6 +480,7 @@ const (
BalanceUpdate = "BalanceUpdate"
StatUpdate = "StatUpdate"
ResourceUpdate = "ResourceUpdate"
CDR = "CDR"
ExpiryTime = "ExpiryTime"
AllowNegative = "AllowNegative"
Disabled = "Disabled"