mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Merge branch 'stats_save'
This commit is contained in:
@@ -492,7 +492,7 @@ func main() {
|
||||
}
|
||||
|
||||
if cfg.CDRStatsEnabled { // Init it here so we make it availabe to the Apier
|
||||
cdrStats = engine.NewStats(ratingDb)
|
||||
cdrStats = engine.NewStats(ratingDb, accountDb, cfg.CDRStatsSaveInterval)
|
||||
if cfg.CDRStatConfig != nil && len(cfg.CDRStatConfig.Metrics) != 0 {
|
||||
cdrStats.AddQueue(engine.NewCdrStatsFromCdrStatsCfg(cfg.CDRStatConfig), nil)
|
||||
}
|
||||
|
||||
@@ -26,8 +26,9 @@ type CdrStatsConfig struct {
|
||||
Id string // Config id, unique per config instance
|
||||
QueueLength int // Number of items in the stats buffer
|
||||
TimeWindow time.Duration // Will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow
|
||||
Metrics []string // ASR, ACD, ACC
|
||||
SetupInterval []time.Time // 2 or less items (>= start interval,< stop_interval)
|
||||
SaveInterval time.Duration
|
||||
Metrics []string // ASR, ACD, ACC
|
||||
SetupInterval []time.Time // 2 or less items (>= start interval,< stop_interval)
|
||||
TORs []string
|
||||
CdrHosts []string
|
||||
CdrSources []string
|
||||
|
||||
@@ -202,6 +202,7 @@ type CGRConfig struct {
|
||||
CDRSReconnects int // number of reconnects to remote services before giving up
|
||||
CDRSCdrReplication []*CdrReplicationCfg // Replicate raw CDRs to a number of servers
|
||||
CDRStatsEnabled bool // Enable CDR Stats service
|
||||
CDRStatsSaveInterval time.Duration // Save interval duration
|
||||
CDRStatConfig *CdrStatsConfig // Active cdr stats configuration instances, platform level
|
||||
CdreProfiles map[string]*CdreConfig
|
||||
CdrcProfiles map[string]map[string]*CdrcConfig // Number of CDRC instances running imports, format map[dirPath]map[instanceName]{Configs}
|
||||
@@ -590,6 +591,11 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
|
||||
if jsnCdrstatsCfg != nil {
|
||||
if jsnCdrstatsCfg.Enabled != nil {
|
||||
self.CDRStatsEnabled = *jsnCdrstatsCfg.Enabled
|
||||
if jsnCdrstatsCfg.Save_Interval != nil {
|
||||
if self.CDRStatsSaveInterval, err = utils.ParseDurationWithSecs(*jsnCdrstatsCfg.Save_Interval); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -108,6 +108,7 @@ const CGRATES_CFG_JSON = `
|
||||
|
||||
"cdrstats": {
|
||||
"enabled": false, // starts the cdrstats service: <true|false>
|
||||
"save_interval": "5s",
|
||||
},
|
||||
|
||||
|
||||
|
||||
@@ -155,12 +155,13 @@ func TestDfCdrsJsonCfg(t *testing.T) {
|
||||
|
||||
func TestDfCdrStatsJsonCfg(t *testing.T) {
|
||||
eCfg := &CdrStatsJsonCfg{
|
||||
Enabled: utils.BoolPointer(false),
|
||||
Enabled: utils.BoolPointer(false),
|
||||
Save_Interval: utils.StringPointer("5s"),
|
||||
}
|
||||
if cfg, err := dfCgrJsonCfg.CdrStatsJsonCfg(); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(eCfg, cfg) {
|
||||
t.Error("Received: ", cfg)
|
||||
t.Error("Received: ", *cfg)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -86,7 +86,8 @@ type CdrReplicationJsonCfg struct {
|
||||
|
||||
// Cdrstats config section
|
||||
type CdrStatsJsonCfg struct {
|
||||
Enabled *bool
|
||||
Enabled *bool
|
||||
Save_Interval *string
|
||||
}
|
||||
|
||||
// One cdr field config, used in cdre and cdrc
|
||||
|
||||
@@ -87,7 +87,8 @@
|
||||
|
||||
|
||||
//"cdrstats": {
|
||||
// "enabled": false, // starts the cdrstats service: <true|false>
|
||||
// "enabled": false, // starts the cdrstats service: <true|false>
|
||||
// "save_interval": "5s",
|
||||
//},
|
||||
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
"enabled": true, // starts the cdrstats service: <true|false>
|
||||
"queue_length": 5, // number of items in the stats buffer
|
||||
"time_window": "0", // will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow
|
||||
"save_interval": "5s",
|
||||
},
|
||||
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
|
||||
"cdrstats": {
|
||||
"enabled": true, // starts the cdrstats service: <true|false>
|
||||
"save_interval": "5s"
|
||||
},
|
||||
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ enabled = true # Starts Mediator service: <true|false>.
|
||||
enabled = true # Starts the cdrstats service: <true|false>
|
||||
#queue_length = 50 # Number of items in the stats buffer
|
||||
time_window = 1h # Will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow
|
||||
save_interval = 5s
|
||||
# metrics = ASR, ACD, ACC # Stat metric ids to build
|
||||
# setup_interval = # Filter on CDR SetupTime
|
||||
# tors = # Filter on CDR TOR fields
|
||||
@@ -39,7 +40,7 @@ time_window = 1h # Will only keep the CDRs who's call setup time is not olde
|
||||
# accounts = # Filter on CDR Account fields
|
||||
# subjects = # Filter on CDR Subject fields
|
||||
# destination_prefixes = # Filter on CDR Destination prefixes
|
||||
# usage_interval = # Filter on CDR Usage
|
||||
# usage_interval = # Filter on CDR Usage
|
||||
# mediation_run_ids = # Filter on CDR MediationRunId fields
|
||||
# rated_accounts = # Filter on CDR RatedAccount fields
|
||||
# rated_subjects = # Filter on CDR RatedSubject fields
|
||||
@@ -58,4 +59,3 @@ mi_addr = 172.16.254.77:8020 # Adress where to reach OpenSIPS mi_datagram modu
|
||||
# auth_user = cgrates # Authenticate to email server using this user
|
||||
# auth_passwd = CGRateS.org # Authenticate to email server with this password
|
||||
# from_address = cgr-mailer@localhost.localdomain # From address used when sending emails out
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
|
||||
"scheduler": {
|
||||
"enabled": true, // start Scheduler service: <true|false>
|
||||
"save_interval": "5s",
|
||||
},
|
||||
|
||||
"cdrs": {
|
||||
|
||||
@@ -309,6 +309,7 @@ CREATE TABLE tp_cdrstats (
|
||||
`tag` varchar(64) NOT NULL,
|
||||
`queue_length` int(11) NOT NULL,
|
||||
`time_window` varchar(8) NOT NULL,
|
||||
`save_interval` varchar(8) NOT NULL,
|
||||
`metrics` varchar(64) NOT NULL,
|
||||
`setup_interval` varchar(64) NOT NULL,
|
||||
`tors` varchar(64) NOT NULL,
|
||||
|
||||
@@ -304,6 +304,7 @@ CREATE TABLE tp_cdrstats (
|
||||
tag VARCHAR(64) NOT NULL,
|
||||
queue_length INTEGER NOT NULL,
|
||||
time_window VARCHAR(8) NOT NULL,
|
||||
save_interval VARCHAR(8) NOT NULL,
|
||||
metrics VARCHAR(64) NOT NULL,
|
||||
setup_interval VARCHAR(64) NOT NULL,
|
||||
tors VARCHAR(64) NOT NULL,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#Id[0],QueueLength[1],TimeWindow[2],Metric[3],SetupInterval[4],TOR[5],CdrHost[6],CdrSource[7],ReqType[8],Direction[9],Tenant[10],Category[11],Account[12],Subject[13],DestinationPrefix[14],PddInterval[15],UsageInterval[16],Supplier[17],DisconnectCause[18],MediationRunIds[19],RatedAccount[20],RatedSubject[21],CostInterval[22],Triggers[23]
|
||||
CDRST3,5,60m,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,rated,*out,cgrates.org,call,dan,dan,+49,,5m;10m,,,default,rif,rif,0;2,CDRST3_WARN_ASR
|
||||
CDRST3,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST3_WARN_ACD
|
||||
CDRST3,,,ACC,,,,,,,,,,,,,,,,,,,,CDRST3_WARN_ACC
|
||||
CDRST4,10,0,ASR,,,,,,,cgrates.org,call,,,,,,,,,,,,CDRST4_WARN_ASR
|
||||
CDRST4,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST4_WARN_ACD
|
||||
#Id[0],QueueLength[1],TimeWindow[2],SaveInerval[3],Metric[4],SetupInterval[5],TOR[6],CdrHost[7],CdrSource[8],ReqType[9],Direction[10],Tenant[11],Category[12],Account[13],Subject[14],DestinationPrefix[15],PddInterval[16],UsageInterval[17],Supplier[18],DisconnectCause[19],MediationRunIds[20],RatedAccount[21],RatedSubject[22],CostInterval[23],Triggers[24]
|
||||
CDRST3,5,60m,10s,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,rated,*out,cgrates.org,call,dan,dan,+49,,5m;10m,,,default,rif,rif,0;2,CDRST3_WARN_ASR
|
||||
CDRST3,,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST3_WARN_ACD
|
||||
CDRST3,,,,ACC,,,,,,,,,,,,,,,,,,,,CDRST3_WARN_ACC
|
||||
CDRST4,10,0,10s,ASR,,,,,,,cgrates.org,call,,,,,,,,,,,,CDRST4_WARN_ASR
|
||||
CDRST4,,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST4_WARN_ACD
|
||||
|
||||
|
@@ -1,6 +1,6 @@
|
||||
#Id[0],QueueLength[1],TimeWindow[2],Metric[3],SetupInterval[4],TOR[5],CdrHost[6],CdrSource[7],ReqType[8],Direction[9],Tenant[10],Category[11],Account[12],Subject[13],DestinationPrefix[14],PddInterval[15],UsageInterval[16],Supplier[17],DisconnectCause[18],MediationRunIds[19],RatedAccount[20],RatedSubject[21],CostInterval[22],Triggers[23]
|
||||
CDRST1,5,60m,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,rated,*out,cgrates.org,call,dan,dan,+49,,5m;10m,,,default,rif,rif,0;2,CDRST1_WARN_ASR
|
||||
CDRST1,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST1_WARN_ACD
|
||||
CDRST1,,,ACC,,,,,,,,,,,,,,,,,,,,CDRST1_WARN_ACC
|
||||
CDRST2,10,10m,ASR,,,,,,,cgrates.org,call,,,,,,,,,,,,CDRST2_WARN_ASR
|
||||
CDRST2,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST2_WARN_ACD
|
||||
#Id[0],QueueLength[1],TimeWindow[2],SaveInerval[3],Metric[4],SetupInterval[5],TOR[6],CdrHost[7],CdrSource[8],ReqType[9],Direction[10],Tenant[11],Category[12],Account[13],Subject[14],DestinationPrefix[15],PddInterval[16],UsageInterval[17],Supplier[18],DisconnectCause[19],MediationRunIds[20],RatedAccount[21],RatedSubject[22],CostInterval[23],Triggers[24]
|
||||
CDRST1,5,60m,10s,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,rated,*out,cgrates.org,call,dan,dan,+49,,5m;10m,,,default,rif,rif,0;2,CDRST1_WARN_ASR
|
||||
CDRST1,,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST1_WARN_ACD
|
||||
CDRST1,,,,ACC,,,,,,,,,,,,,,,,,,,,CDRST1_WARN_ACC
|
||||
CDRST2,10,10m,10s,ASR,,,,,,,cgrates.org,call,,,,,,,,,,,,CDRST2_WARN_ASR
|
||||
CDRST2,,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST2_WARN_ACD
|
||||
|
||||
|
@@ -1,24 +1,24 @@
|
||||
#Id[0],QueueLength[1],TimeWindow[2],Metric[3],SetupInterval[4],TOR[5],CdrHost[6],CdrSource[7],ReqType[8],Direction[9],Tenant[10],Category[11],Account[12],Subject[13],DestinationPrefix[14],PddInterval[15],UsageInterval[16],Supplier[17],DisconnectCause[18],MediationRunIds[19],RatedAccount[20],RatedSubject[21],CostInterval[22],Triggers[23]
|
||||
CDRST1,10,0,ASR,,,,,,,cgrates.org,,,,,,,,,*default,,,,CDRST1_WARN
|
||||
CDRST1,,,ACD,,,,,,,,,,,,,,,,,,,,
|
||||
CDRST1,,,ACC,,,,,,,,,,,,,,,,,,,,
|
||||
CDRST1,,,TCD,,,,,,,,,,,,,,,,,,,,
|
||||
CDRST1,,,TCC,,,,,,,,,,,,,,,,,,,,
|
||||
CDRST_1001,10,10m,ASR,,,,,,,cgrates.org,,,1001,,,,,,*default,,,,CDRST1001_WARN
|
||||
CDRST_1001,,,ACD,,,,,,,,,,,,,,,,,,,,
|
||||
CDRST_1001,,,ACC,,,,,,,,,,,,,,,,,,,,
|
||||
CDRST_1002,10,10m,ASR,,,,,,,cgrates.org,,,1002,,,,,,*default,,,,CDRST1001_WARN
|
||||
CDRST_1002,,,ACD,,,,,,,,,,,,,,,,,,,,
|
||||
CDRST_1002,,,ACC,,,,,,,,,,,,,,,,,,,,
|
||||
CDRST_1003,,,ASR,,,,,,,cgrates.org,,,,1003,,,,,*default,,,,CDRST3_WARN
|
||||
CDRST_1003,,,ACD,,,,,,,,,,,,,,,,,,,,
|
||||
STATS_SUPPL1,,,ACD,,,,,,,,,,,,,,suppl1,,,,,,
|
||||
STATS_SUPPL1,,,ASR,,,,,,,,,,,,,,suppl1,,,,,,
|
||||
STATS_SUPPL1,,,ACC,,,,,,,,,,,,,,suppl1,,,,,,
|
||||
STATS_SUPPL1,,,TCD,,,,,,,,,,,,,,suppl1,,,,,,
|
||||
STATS_SUPPL1,,,TCC,,,,,,,,,,,,,,suppl1,,,,,,
|
||||
STATS_SUPPL2,,,ACD,,,,,,,,,,,,,,suppl2,,,,,,
|
||||
STATS_SUPPL2,,,ASR,,,,,,,,,,,,,,suppl2,,,,,,
|
||||
STATS_SUPPL2,,,ACC,,,,,,,,,,,,,,suppl2,,,,,,
|
||||
STATS_SUPPL2,,,TCD,,,,,,,,,,,,,,suppl2,,,,,,
|
||||
STATS_SUPPL2,,,TCC,,,,,,,,,,,,,,suppl2,,,,,,
|
||||
#Id[0],QueueLength[1],TimeWindow[2],SaveInerval[3],Metric[4],SetupInterval[5],TOR[6],CdrHost[7],CdrSource[8],ReqType[9],Direction[10],Tenant[11],Category[12],Account[13],Subject[14],DestinationPrefix[15],PddInterval[16],UsageInterval[17],Supplier[18],DisconnectCause[19],MediationRunIds[20],RatedAccount[21],RatedSubject[22],CostInterval[23],Triggers[24]
|
||||
CDRST1,10,0,10s,ASR,,,,,,,cgrates.org,,,,,,,,,*default,,,,CDRST1_WARN
|
||||
CDRST1,,,,ACD,,,,,,,,,,,,,,,,,,,,
|
||||
CDRST1,,,,ACC,,,,,,,,,,,,,,,,,,,,
|
||||
CDRST1,,,,TCD,,,,,,,,,,,,,,,,,,,,
|
||||
CDRST1,,,,TCC,,,,,,,,,,,,,,,,,,,,
|
||||
CDRST_1001,10,10m,10s,ASR,,,,,,,cgrates.org,,,1001,,,,,,*default,,,,CDRST1001_WARN
|
||||
CDRST_1001,,,,ACD,,,,,,,,,,,,,,,,,,,,
|
||||
CDRST_1001,,,,ACC,,,,,,,,,,,,,,,,,,,,
|
||||
CDRST_1002,10,10m,10s,ASR,,,,,,,cgrates.org,,,1002,,,,,,*default,,,,CDRST1001_WARN
|
||||
CDRST_1002,,,,ACD,,,,,,,,,,,,,,,,,,,,
|
||||
CDRST_1002,,,,ACC,,,,,,,,,,,,,,,,,,,,
|
||||
CDRST_1003,,,,ASR,,,,,,,cgrates.org,,,,1003,,,,,*default,,,,CDRST3_WARN
|
||||
CDRST_1003,,,,ACD,,,,,,,,,,,,,,,,,,,,
|
||||
STATS_SUPPL1,,,,ACD,,,,,,,,,,,,,,suppl1,,,,,,
|
||||
STATS_SUPPL1,,,,ASR,,,,,,,,,,,,,,suppl1,,,,,,
|
||||
STATS_SUPPL1,,,,ACC,,,,,,,,,,,,,,suppl1,,,,,,
|
||||
STATS_SUPPL1,,,,TCD,,,,,,,,,,,,,,suppl1,,,,,,
|
||||
STATS_SUPPL1,,,,TCC,,,,,,,,,,,,,,suppl1,,,,,,
|
||||
STATS_SUPPL2,,,,ACD,,,,,,,,,,,,,,suppl2,,,,,,
|
||||
STATS_SUPPL2,,,,ASR,,,,,,,,,,,,,,suppl2,,,,,,
|
||||
STATS_SUPPL2,,,,ACC,,,,,,,,,,,,,,suppl2,,,,,,
|
||||
STATS_SUPPL2,,,,TCD,,,,,,,,,,,,,,suppl2,,,,,,
|
||||
STATS_SUPPL2,,,,TCC,,,,,,,,,,,,,,suppl2,,,,,,
|
||||
|
||||
|
@@ -54,9 +54,10 @@ func NewCdrStatsFromCdrStatsCfg(csCfg *config.CdrStatsConfig) *CdrStats {
|
||||
}
|
||||
|
||||
type CdrStats struct {
|
||||
Id string // Config id, unique per config instance
|
||||
QueueLength int // Number of items in the stats buffer
|
||||
TimeWindow time.Duration // Will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow
|
||||
Id string // Config id, unique per config instance
|
||||
QueueLength int // Number of items in the stats buffer
|
||||
TimeWindow time.Duration // Will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow
|
||||
SaveInterval time.Duration
|
||||
Metrics []string // ASR, ACD, ACC
|
||||
SetupInterval []time.Time // CDRFieldFilter on SetupInterval, 2 or less items (>= start interval,< stop_interval)
|
||||
TOR []string // CDRFieldFilter on TORs
|
||||
|
||||
@@ -202,12 +202,12 @@ vdf,post,*out,POST_AT,
|
||||
*out,cgrates.org,call,dan,*any,extra1,,,,,,rif2,rif2,,,,,,,
|
||||
`
|
||||
cdrStats = `
|
||||
#Id[0],QueueLength[1],TimeWindow[2],Metric[3],SetupInterval[4],TOR[5],CdrHost[6],CdrSource[7],ReqType[8],Direction[9],Tenant[10],Category[11],Account[12],Subject[13],DestinationPrefix[14],PddInterval[15],UsageInterval[16],Supplier[17],DisconnectCause[18],MediationRunIds[19],RatedAccount[20],RatedSubject[21],CostInterval[22],Triggers[23]CDRST1,5,60m,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,*rated,*out,cgrates.org,call,dan,dan,49,5m;10m,suppl1,NORMAL_CLEARING,default,rif,rif,0;2,STANDARD_TRIGGERS
|
||||
CDRST1,5,60m,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,*rated,*out,cgrates.org,call,dan,dan,49,3m;7m,5m;10m,suppl1,NORMAL_CLEARING,default,rif,rif,0;2,STANDARD_TRIGGERS
|
||||
CDRST1,,,ACD,,,,,,,,,,,,,,,,,,,,STANDARD_TRIGGER
|
||||
CDRST1,,,ACC,,,,,,,,,,,,,,,,,,,,
|
||||
CDRST2,10,10m,ASR,,,,,,,cgrates.org,call,,,,,,,,,,,,
|
||||
CDRST2,,,ACD,,,,,,,,,,,,,,,,,,,,
|
||||
#Id[0],QueueLength[1],TimeWindow[2],SaveInterval[3],Metric[4],SetupInterval[5],TOR[6],CdrHost[7],CdrSource[8],ReqType[9],Direction[10],Tenant[11],Category[12],Account[13],Subject[14],DestinationPrefix[15],PddInterval[16],UsageInterval[17],Supplier[18],DisconnectCause[19],MediationRunIds[20],RatedAccount[21],RatedSubject[22],CostInterval[23],Triggers[24]
|
||||
CDRST1,5,60m,10s,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,*rated,*out,cgrates.org,call,dan,dan,49,3m;7m,5m;10m,suppl1,NORMAL_CLEARING,default,rif,rif,0;2,STANDARD_TRIGGERS
|
||||
CDRST1,,,,ACD,,,,,,,,,,,,,,,,,,,,STANDARD_TRIGGER
|
||||
CDRST1,,,,ACC,,,,,,,,,,,,,,,,,,,,
|
||||
CDRST2,10,10m,,ASR,,,,,,,cgrates.org,call,,,,,,,,,,,,
|
||||
CDRST2,,,,ACD,,,,,,,,,,,,,,,,,,,,
|
||||
`
|
||||
)
|
||||
|
||||
@@ -1062,10 +1062,11 @@ func TestLoadCdrStats(t *testing.T) {
|
||||
t.Error("Failed to load cdr stats: ", csvr.cdrStats)
|
||||
}
|
||||
cdrStats1 := &CdrStats{
|
||||
Id: "CDRST1",
|
||||
QueueLength: 5,
|
||||
TimeWindow: 60 * time.Minute,
|
||||
Metrics: []string{"ASR", "ACD", "ACC"},
|
||||
Id: "CDRST1",
|
||||
QueueLength: 5,
|
||||
TimeWindow: 60 * time.Minute,
|
||||
SaveInterval: 10 * time.Second,
|
||||
Metrics: []string{"ASR", "ACD", "ACC"},
|
||||
SetupInterval: []time.Time{
|
||||
time.Date(2014, 7, 29, 15, 0, 0, 0, time.UTC),
|
||||
time.Date(2014, 7, 29, 16, 0, 0, 0, time.UTC),
|
||||
|
||||
@@ -325,6 +325,7 @@ func APItoModelCdrStat(stats *utils.TPCdrStats) (result []TpCdrstat) {
|
||||
Tag: stats.CdrStatsId,
|
||||
QueueLength: ql,
|
||||
TimeWindow: st.TimeWindow,
|
||||
SaveInterval: st.SaveInterval,
|
||||
Metrics: st.Metrics,
|
||||
SetupInterval: st.SetupInterval,
|
||||
Tors: st.TORs,
|
||||
|
||||
@@ -506,6 +506,7 @@ func (tps TpCdrStats) GetCdrStats() (map[string][]*utils.TPCdrStat, error) {
|
||||
QueueLength: strconv.Itoa(tpCs.QueueLength),
|
||||
TimeWindow: tpCs.TimeWindow,
|
||||
Metrics: tpCs.Metrics,
|
||||
SaveInterval: tpCs.SaveInterval,
|
||||
SetupInterval: tpCs.SetupInterval,
|
||||
TORs: tpCs.Tors,
|
||||
CdrHosts: tpCs.CdrHosts,
|
||||
@@ -546,6 +547,13 @@ func UpdateCdrStats(cs *CdrStats, triggers ActionTriggerPriotityList, tpCs *util
|
||||
log.Printf("Error parsing TimeWindow %v for cdrs stats %v", tpCs.TimeWindow, cs.Id)
|
||||
}
|
||||
}
|
||||
if tpCs.SaveInterval != "" {
|
||||
if si, err := time.ParseDuration(tpCs.SaveInterval); err == nil {
|
||||
cs.SaveInterval = si
|
||||
} else {
|
||||
log.Printf("Error parsing SaveInterval %v for cdr stats %v", tpCs.SaveInterval, cs.Id)
|
||||
}
|
||||
}
|
||||
if tpCs.Metrics != "" {
|
||||
cs.Metrics = append(cs.Metrics, tpCs.Metrics)
|
||||
}
|
||||
|
||||
@@ -16,7 +16,7 @@ func TestModelHelperCsvLoad(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestModelHelperCsvLoadInt(t *testing.T) {
|
||||
l, err := csvLoad(TpCdrstat{}, []string{"CDRST1", "5", "60m", "ASR", "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z", "*voice", "87.139.12.167", "FS_JSON", "*rated", "*out", "cgrates.org", "call", "dan", "dan", "49", "3m;7m", "5m;10m", "suppl1", "NORMAL_CLEARING", "default", "rif", "rif", "0;2", "STANDARD_TRIGGERS"})
|
||||
l, err := csvLoad(TpCdrstat{}, []string{"CDRST1", "5", "60m", "10s", "ASR", "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z", "*voice", "87.139.12.167", "FS_JSON", "*rated", "*out", "cgrates.org", "call", "dan", "dan", "49", "3m;7m", "5m;10m", "suppl1", "NORMAL_CLEARING", "default", "rif", "rif", "0;2", "STANDARD_TRIGGERS"})
|
||||
tpd, ok := l.(TpCdrstat)
|
||||
if err != nil || !ok || tpd.QueueLength != 5 {
|
||||
t.Errorf("model load failed: %+v", tpd)
|
||||
@@ -377,6 +377,7 @@ func TestTPCdrStatsAsExportSlice(t *testing.T) {
|
||||
&utils.TPCdrStat{
|
||||
QueueLength: "5",
|
||||
TimeWindow: "60m",
|
||||
SaveInterval: "10s",
|
||||
Metrics: "ASR;ACD",
|
||||
SetupInterval: "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z",
|
||||
TORs: "*voice",
|
||||
@@ -401,6 +402,7 @@ func TestTPCdrStatsAsExportSlice(t *testing.T) {
|
||||
&utils.TPCdrStat{
|
||||
QueueLength: "5",
|
||||
TimeWindow: "60m",
|
||||
SaveInterval: "9s",
|
||||
Metrics: "ASR",
|
||||
SetupInterval: "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z",
|
||||
TORs: "*voice",
|
||||
@@ -425,9 +427,9 @@ func TestTPCdrStatsAsExportSlice(t *testing.T) {
|
||||
},
|
||||
}
|
||||
expectedSlc := [][]string{
|
||||
[]string{"CDRST1", "5", "60m", "ASR;ACD", "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z", "*voice", "87.139.12.167", "FS_JSON", utils.META_RATED, "*out", "cgrates.org", "call",
|
||||
[]string{"CDRST1", "5", "60m", "10s", "ASR;ACD", "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z", "*voice", "87.139.12.167", "FS_JSON", utils.META_RATED, "*out", "cgrates.org", "call",
|
||||
"dan", "dan", "49", "3m;7m", "5m;10m", "supplier1", "NORMAL_CLEARNING", "default", "rif", "rif", "0;2", "STANDARD_TRIGGERS"},
|
||||
[]string{"CDRST1", "5", "60m", "ASR", "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z", "*voice", "87.139.12.167", "FS_JSON", utils.META_RATED, "*out", "cgrates.org", "call",
|
||||
[]string{"CDRST1", "5", "60m", "9s", "ASR", "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z", "*voice", "87.139.12.167", "FS_JSON", utils.META_RATED, "*out", "cgrates.org", "call",
|
||||
"dan", "dan", "49", "3m;7m", "5m;10m", "supplier1", "NORMAL_CLEARNING", "default", "dan", "dan", "0;2", "STANDARD_TRIGGERS"},
|
||||
}
|
||||
ms := APItoModelCdrStat(cdrStats)
|
||||
|
||||
@@ -295,27 +295,28 @@ type TpCdrstat struct {
|
||||
Tag string `index:"0" re:""`
|
||||
QueueLength int `index:"1" re:""`
|
||||
TimeWindow string `index:"2" re:""`
|
||||
Metrics string `index:"3" re:""`
|
||||
SetupInterval string `index:"4" re:""`
|
||||
Tors string `index:"5" re:""`
|
||||
CdrHosts string `index:"6" re:""`
|
||||
CdrSources string `index:"7" re:""`
|
||||
ReqTypes string `index:"8" re:""`
|
||||
Directions string `index:"9" re:""`
|
||||
Tenants string `index:"10" re:""`
|
||||
Categories string `index:"11" re:""`
|
||||
Accounts string `index:"12" re:""`
|
||||
Subjects string `index:"13" re:""`
|
||||
DestinationPrefixes string `index:"14" re:""`
|
||||
PddInterval string `index:"15" re:""`
|
||||
UsageInterval string `index:"16" re:""`
|
||||
Suppliers string `index:"17" re:""`
|
||||
DisconnectCauses string `index:"18" re:""`
|
||||
MediationRunids string `index:"19" re:""`
|
||||
RatedAccounts string `index:"20" re:""`
|
||||
RatedSubjects string `index:"21" re:""`
|
||||
CostInterval string `index:"22" re:""`
|
||||
ActionTriggers string `index:"23" re:""`
|
||||
SaveInterval string `index:"3" re:""`
|
||||
Metrics string `index:"4" re:""`
|
||||
SetupInterval string `index:"5" re:""`
|
||||
Tors string `index:"6" re:""`
|
||||
CdrHosts string `index:"7" re:""`
|
||||
CdrSources string `index:"8" re:""`
|
||||
ReqTypes string `index:"9" re:""`
|
||||
Directions string `index:"10" re:""`
|
||||
Tenants string `index:"11" re:""`
|
||||
Categories string `index:"12" re:""`
|
||||
Accounts string `index:"13" re:""`
|
||||
Subjects string `index:"14" re:""`
|
||||
DestinationPrefixes string `index:"15" re:""`
|
||||
PddInterval string `index:"16" re:""`
|
||||
UsageInterval string `index:"17" re:""`
|
||||
Suppliers string `index:"18" re:""`
|
||||
DisconnectCauses string `index:"19" re:""`
|
||||
MediationRunids string `index:"20" re:""`
|
||||
RatedAccounts string `index:"21" re:""`
|
||||
RatedSubjects string `index:"22" re:""`
|
||||
CostInterval string `index:"23" re:""`
|
||||
ActionTriggers string `index:"24" re:""`
|
||||
CreatedAt time.Time
|
||||
}
|
||||
|
||||
|
||||
@@ -165,7 +165,7 @@ func TestGetSessionRuns(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGetLCR(t *testing.T) {
|
||||
rsponder.Stats = NewStats(ratingStorage) // Load stats instance
|
||||
rsponder.Stats = NewStats(ratingStorage, accountingStorage, 0) // Load stats instance
|
||||
dstDe := &Destination{Id: "GERMANY", Prefixes: []string{"+49"}}
|
||||
if err := ratingStorage.SetDestination(dstDe); err != nil {
|
||||
t.Error(err)
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
@@ -37,13 +38,47 @@ type StatsInterface interface {
|
||||
}
|
||||
|
||||
type Stats struct {
|
||||
queues map[string]*StatsQueue
|
||||
mux sync.RWMutex
|
||||
ratingDb RatingStorage
|
||||
queues map[string]*StatsQueue
|
||||
queueSavers map[string]*queueSaver
|
||||
mux sync.RWMutex
|
||||
ratingDb RatingStorage
|
||||
accountingDb AccountingStorage
|
||||
defaultSaveInterval time.Duration
|
||||
}
|
||||
type queueSaver struct {
|
||||
ticker *time.Ticker
|
||||
stopper chan bool
|
||||
}
|
||||
|
||||
func NewStats(ratingDb RatingStorage) *Stats {
|
||||
cdrStats := &Stats{ratingDb: ratingDb}
|
||||
func newQueueSaver(id string, saveInterval time.Duration, sq *StatsQueue, adb AccountingStorage) *queueSaver {
|
||||
svr := &queueSaver{
|
||||
ticker: time.NewTicker(saveInterval),
|
||||
stopper: make(chan bool),
|
||||
}
|
||||
go func(id string, c <-chan time.Time, s <-chan bool, sq *StatsQueue, accountDb AccountingStorage) {
|
||||
for {
|
||||
select {
|
||||
case <-c:
|
||||
if sq.IsDirty() {
|
||||
if err := accountDb.SetCdrStatsQueue(sq); err != nil {
|
||||
Logger.Err(fmt.Sprintf("Error saving cdr stats queue id %s: %v", id, err))
|
||||
}
|
||||
}
|
||||
case <-s:
|
||||
break
|
||||
}
|
||||
}
|
||||
}(id, svr.ticker.C, svr.stopper, sq, adb)
|
||||
return svr
|
||||
}
|
||||
|
||||
func (svr *queueSaver) stop() {
|
||||
svr.ticker.Stop()
|
||||
svr.stopper <- true
|
||||
}
|
||||
|
||||
func NewStats(ratingDb RatingStorage, accountingDb AccountingStorage, saveInterval time.Duration) *Stats {
|
||||
cdrStats := &Stats{ratingDb: ratingDb, accountingDb: accountingDb, defaultSaveInterval: saveInterval}
|
||||
if css, err := ratingDb.GetAllCdrStats(); err == nil {
|
||||
cdrStats.UpdateQueues(css, nil)
|
||||
} else {
|
||||
@@ -79,6 +114,9 @@ func (s *Stats) AddQueue(cs *CdrStats, out *int) error {
|
||||
if s.queues == nil {
|
||||
s.queues = make(map[string]*StatsQueue)
|
||||
}
|
||||
if s.queueSavers == nil {
|
||||
s.queueSavers = make(map[string]*queueSaver)
|
||||
}
|
||||
if sq, exists := s.queues[cs.Id]; exists {
|
||||
sq.UpdateConf(cs)
|
||||
} else {
|
||||
@@ -108,11 +146,11 @@ func (s *Stats) ReloadQueues(ids []string, out *int) error {
|
||||
func (s *Stats) ResetQueues(ids []string, out *int) error {
|
||||
if len(ids) == 0 {
|
||||
for _, sq := range s.queues {
|
||||
sq.cdrs = make([]*QCdr, 0)
|
||||
sq.metrics = make(map[string]Metric, len(sq.conf.Metrics))
|
||||
sq.Cdrs = make([]*QCdr, 0)
|
||||
sq.Metrics = make(map[string]Metric, len(sq.conf.Metrics))
|
||||
for _, m := range sq.conf.Metrics {
|
||||
if metric := CreateMetric(m); metric != nil {
|
||||
sq.metrics[m] = metric
|
||||
sq.Metrics[m] = metric
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -123,11 +161,11 @@ func (s *Stats) ResetQueues(ids []string, out *int) error {
|
||||
Logger.Warning(fmt.Sprintf("Cannot reset queue id %v: Not Fund", id))
|
||||
continue
|
||||
}
|
||||
sq.cdrs = make([]*QCdr, 0)
|
||||
sq.metrics = make(map[string]Metric, len(sq.conf.Metrics))
|
||||
sq.Cdrs = make([]*QCdr, 0)
|
||||
sq.Metrics = make(map[string]Metric, len(sq.conf.Metrics))
|
||||
for _, m := range sq.conf.Metrics {
|
||||
if metric := CreateMetric(m); metric != nil {
|
||||
sq.metrics[m] = metric
|
||||
sq.Metrics[m] = metric
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -157,6 +195,23 @@ func (s *Stats) UpdateQueues(css []*CdrStats, out *int) error {
|
||||
}
|
||||
if sq == nil {
|
||||
sq = NewStatsQueue(cs)
|
||||
// load queue from storage if exists
|
||||
if saved, err := s.accountingDb.GetCdrStatsQueue(sq.GetId()); err == nil {
|
||||
sq.Load(saved)
|
||||
} else {
|
||||
Logger.Info(err.Error())
|
||||
}
|
||||
// setup queue saver
|
||||
if s.queueSavers == nil {
|
||||
s.queueSavers = make(map[string]*queueSaver)
|
||||
}
|
||||
si := cs.SaveInterval
|
||||
if si == 0 {
|
||||
si = s.defaultSaveInterval
|
||||
}
|
||||
if si > 0 {
|
||||
s.queueSavers[cs.Id] = newQueueSaver(cs.Id, si, sq, s.accountingDb)
|
||||
}
|
||||
}
|
||||
s.queues[cs.Id] = sq
|
||||
}
|
||||
|
||||
@@ -19,16 +19,18 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package engine
|
||||
|
||||
import (
|
||||
"log"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type StatsQueue struct {
|
||||
cdrs []*QCdr
|
||||
Cdrs []*QCdr
|
||||
conf *CdrStats
|
||||
metrics map[string]Metric
|
||||
Metrics map[string]Metric
|
||||
mux sync.Mutex
|
||||
dirty bool
|
||||
}
|
||||
|
||||
var METRIC_TRIGGER_MAP = map[string]string{
|
||||
@@ -57,7 +59,7 @@ type QCdr struct {
|
||||
|
||||
func NewStatsQueue(conf *CdrStats) *StatsQueue {
|
||||
if conf == nil {
|
||||
return &StatsQueue{metrics: make(map[string]Metric)}
|
||||
return &StatsQueue{Metrics: make(map[string]Metric)}
|
||||
}
|
||||
sq := &StatsQueue{}
|
||||
sq.UpdateConf(conf)
|
||||
@@ -68,28 +70,49 @@ func (sq *StatsQueue) UpdateConf(conf *CdrStats) {
|
||||
sq.mux.Lock()
|
||||
defer sq.mux.Unlock()
|
||||
sq.conf = conf
|
||||
sq.cdrs = make([]*QCdr, 0)
|
||||
sq.metrics = make(map[string]Metric, len(conf.Metrics))
|
||||
sq.Cdrs = make([]*QCdr, 0)
|
||||
sq.Metrics = make(map[string]Metric, len(conf.Metrics))
|
||||
sq.dirty = true
|
||||
for _, m := range conf.Metrics {
|
||||
if metric := CreateMetric(m); metric != nil {
|
||||
sq.metrics[m] = metric
|
||||
sq.Metrics[m] = metric
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (sq *StatsQueue) Load(saved *StatsQueue) {
|
||||
sq.Cdrs = saved.Cdrs
|
||||
for key, metric := range saved.Metrics {
|
||||
if _, exists := sq.Metrics[key]; exists {
|
||||
sq.Metrics[key] = metric
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (sq *StatsQueue) IsDirty() bool {
|
||||
sq.mux.Lock()
|
||||
defer sq.mux.Unlock()
|
||||
v := sq.dirty
|
||||
log.Print(v)
|
||||
// take advantage of the locking to set it to flip it
|
||||
sq.dirty = false
|
||||
return v
|
||||
}
|
||||
|
||||
func (sq *StatsQueue) AppendCDR(cdr *StoredCdr) {
|
||||
sq.mux.Lock()
|
||||
defer sq.mux.Unlock()
|
||||
if sq.conf.AcceptCdr(cdr) {
|
||||
qcdr := sq.simplifyCdr(cdr)
|
||||
sq.cdrs = append(sq.cdrs, qcdr)
|
||||
sq.Cdrs = append(sq.Cdrs, qcdr)
|
||||
sq.addToMetrics(qcdr)
|
||||
sq.purgeObsoleteCdrs()
|
||||
sq.dirty = true
|
||||
// check for trigger
|
||||
stats := sq.getStats()
|
||||
sq.conf.Triggers.Sort()
|
||||
for _, at := range sq.conf.Triggers {
|
||||
if at.MinQueuedItems > 0 && len(sq.cdrs) < at.MinQueuedItems {
|
||||
if at.MinQueuedItems > 0 && len(sq.Cdrs) < at.MinQueuedItems {
|
||||
continue
|
||||
}
|
||||
if strings.HasPrefix(at.ThresholdType, "*min_") {
|
||||
@@ -111,13 +134,13 @@ func (sq *StatsQueue) AppendCDR(cdr *StoredCdr) {
|
||||
}
|
||||
|
||||
func (sq *StatsQueue) addToMetrics(cdr *QCdr) {
|
||||
for _, metric := range sq.metrics {
|
||||
for _, metric := range sq.Metrics {
|
||||
metric.AddCdr(cdr)
|
||||
}
|
||||
}
|
||||
|
||||
func (sq *StatsQueue) removeFromMetrics(cdr *QCdr) {
|
||||
for _, metric := range sq.metrics {
|
||||
for _, metric := range sq.Metrics {
|
||||
metric.RemoveCdr(cdr)
|
||||
}
|
||||
}
|
||||
@@ -134,22 +157,22 @@ func (sq *StatsQueue) simplifyCdr(cdr *StoredCdr) *QCdr {
|
||||
|
||||
func (sq *StatsQueue) purgeObsoleteCdrs() {
|
||||
if sq.conf.QueueLength > 0 {
|
||||
currentLength := len(sq.cdrs)
|
||||
currentLength := len(sq.Cdrs)
|
||||
if currentLength > sq.conf.QueueLength {
|
||||
for _, cdr := range sq.cdrs[:currentLength-sq.conf.QueueLength] {
|
||||
for _, cdr := range sq.Cdrs[:currentLength-sq.conf.QueueLength] {
|
||||
sq.removeFromMetrics(cdr)
|
||||
}
|
||||
sq.cdrs = sq.cdrs[currentLength-sq.conf.QueueLength:]
|
||||
sq.Cdrs = sq.Cdrs[currentLength-sq.conf.QueueLength:]
|
||||
}
|
||||
}
|
||||
if sq.conf.TimeWindow > 0 {
|
||||
for i, cdr := range sq.cdrs {
|
||||
for i, cdr := range sq.Cdrs {
|
||||
if time.Now().Sub(cdr.SetupTime) > sq.conf.TimeWindow {
|
||||
sq.removeFromMetrics(cdr)
|
||||
continue
|
||||
} else {
|
||||
if i > 0 {
|
||||
sq.cdrs = sq.cdrs[i:]
|
||||
sq.Cdrs = sq.Cdrs[i:]
|
||||
}
|
||||
break
|
||||
}
|
||||
@@ -165,8 +188,8 @@ func (sq *StatsQueue) GetStats() map[string]float64 {
|
||||
}
|
||||
|
||||
func (sq *StatsQueue) getStats() map[string]float64 {
|
||||
stat := make(map[string]float64, len(sq.metrics))
|
||||
for key, metric := range sq.metrics {
|
||||
stat := make(map[string]float64, len(sq.Metrics))
|
||||
for key, metric := range sq.Metrics {
|
||||
stat[key] = metric.GetValue()
|
||||
}
|
||||
return stat
|
||||
|
||||
@@ -27,8 +27,8 @@ import (
|
||||
|
||||
func TestStatsQueueInit(t *testing.T) {
|
||||
sq := NewStatsQueue(&CdrStats{Metrics: []string{ASR, ACC}})
|
||||
if len(sq.metrics) != 2 {
|
||||
t.Error("Expected 2 metrics got ", len(sq.metrics))
|
||||
if len(sq.Metrics) != 2 {
|
||||
t.Error("Expected 2 metrics got ", len(sq.Metrics))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -203,7 +203,7 @@ func TestAcceptCdr(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStatsQueueIds(t *testing.T) {
|
||||
cdrStats := NewStats(ratingStorage)
|
||||
cdrStats := NewStats(ratingStorage, accountingStorage, 0)
|
||||
ids := []string{}
|
||||
if err := cdrStats.GetQueueIds(0, &ids); err != nil {
|
||||
t.Error("Errorf getting queue ids: ", err)
|
||||
@@ -216,7 +216,7 @@ func TestStatsQueueIds(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStatsAppendCdr(t *testing.T) {
|
||||
cdrStats := NewStats(ratingStorage)
|
||||
cdrStats := NewStats(ratingStorage, accountingStorage, 0)
|
||||
cdr := &StoredCdr{
|
||||
Tenant: "cgrates.org",
|
||||
Category: "call",
|
||||
@@ -231,14 +231,14 @@ func TestStatsAppendCdr(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error("Error appending cdr to stats: ", err)
|
||||
}
|
||||
if len(cdrStats.queues["CDRST1"].cdrs) != 0 ||
|
||||
len(cdrStats.queues["CDRST2"].cdrs) != 1 {
|
||||
t.Error("Error appending cdr to queue: ", len(cdrStats.queues["CDRST2"].cdrs))
|
||||
if len(cdrStats.queues["CDRST1"].Cdrs) != 0 ||
|
||||
len(cdrStats.queues["CDRST2"].Cdrs) != 1 {
|
||||
t.Error("Error appending cdr to queue: ", len(cdrStats.queues["CDRST2"].Cdrs))
|
||||
}
|
||||
}
|
||||
|
||||
func TestStatsGetValues(t *testing.T) {
|
||||
cdrStats := NewStats(ratingStorage)
|
||||
cdrStats := NewStats(ratingStorage, accountingStorage, 0)
|
||||
cdr := &StoredCdr{
|
||||
Tenant: "cgrates.org",
|
||||
Category: "call",
|
||||
@@ -267,7 +267,7 @@ func TestStatsGetValues(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStatsReloadQueues(t *testing.T) {
|
||||
cdrStats := NewStats(ratingStorage)
|
||||
cdrStats := NewStats(ratingStorage, accountingStorage, 0)
|
||||
cdr := &StoredCdr{
|
||||
Tenant: "cgrates.org",
|
||||
Category: "call",
|
||||
@@ -287,7 +287,7 @@ func TestStatsReloadQueues(t *testing.T) {
|
||||
result := len(ids)
|
||||
expected := 2
|
||||
if result != expected {
|
||||
t.Errorf("Error loading stats queues. Expected %v was %v", expected, result)
|
||||
t.Errorf("Error loading stats queues. Expected %v was %v: %v", expected, result, ids)
|
||||
}
|
||||
valMap := make(map[string]float64)
|
||||
if err := cdrStats.GetValues("CDRST2", &valMap); err != nil {
|
||||
@@ -299,7 +299,7 @@ func TestStatsReloadQueues(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStatsReloadQueuesWithDefault(t *testing.T) {
|
||||
cdrStats := NewStats(ratingStorage)
|
||||
cdrStats := NewStats(ratingStorage, accountingStorage, 0)
|
||||
cdrStats.AddQueue(&CdrStats{
|
||||
Id: utils.META_DEFAULT,
|
||||
}, nil)
|
||||
@@ -335,7 +335,7 @@ func TestStatsReloadQueuesWithDefault(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStatsReloadQueuesWithIds(t *testing.T) {
|
||||
cdrStats := NewStats(ratingStorage)
|
||||
cdrStats := NewStats(ratingStorage, accountingStorage, 0)
|
||||
cdr := &StoredCdr{
|
||||
Tenant: "cgrates.org",
|
||||
Category: "call",
|
||||
@@ -366,8 +366,26 @@ func TestStatsReloadQueuesWithIds(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestStatsSaveQueues(t *testing.T) {
|
||||
cdrStats := NewStats(ratingStorage, accountingStorage, 0)
|
||||
cdr := &StoredCdr{
|
||||
Tenant: "cgrates.org",
|
||||
Category: "call",
|
||||
AnswerTime: time.Now(),
|
||||
SetupTime: time.Now(),
|
||||
Usage: 10 * time.Second,
|
||||
Cost: 10,
|
||||
}
|
||||
cdrStats.AppendCDR(cdr, nil)
|
||||
ids := []string{}
|
||||
cdrStats.GetQueueIds(0, &ids)
|
||||
if _, found := cdrStats.queueSavers["CDRST1"]; !found {
|
||||
t.Error("Error creating queue savers: ", cdrStats.queueSavers)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStatsResetQueues(t *testing.T) {
|
||||
cdrStats := NewStats(ratingStorage)
|
||||
cdrStats := NewStats(ratingStorage, accountingStorage, 0)
|
||||
cdr := &StoredCdr{
|
||||
Tenant: "cgrates.org",
|
||||
Category: "call",
|
||||
@@ -399,7 +417,7 @@ func TestStatsResetQueues(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStatsResetQueuesWithIds(t *testing.T) {
|
||||
cdrStats := NewStats(ratingStorage)
|
||||
cdrStats := NewStats(ratingStorage, accountingStorage, 0)
|
||||
cdr := &StoredCdr{
|
||||
Tenant: "cgrates.org",
|
||||
Category: "call",
|
||||
|
||||
@@ -77,6 +77,8 @@ type AccountingStorage interface {
|
||||
Storage
|
||||
GetAccount(string) (*Account, error)
|
||||
SetAccount(*Account) error
|
||||
GetCdrStatsQueue(string) (*StatsQueue, error)
|
||||
SetCdrStatsQueue(*StatsQueue) error
|
||||
}
|
||||
|
||||
type CdrStorage interface {
|
||||
|
||||
@@ -262,7 +262,6 @@ func (ms *MapStorage) SetRatingPlan(rp *RatingPlan) (err error) {
|
||||
if historyScribe != nil {
|
||||
go historyScribe.Record(rp.GetHistoryRecord(), &response)
|
||||
}
|
||||
//cache2go.Cache(RATING_PLAN_PREFIX+rp.Id, rp)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -293,7 +292,6 @@ func (ms *MapStorage) SetRatingProfile(rpf *RatingProfile) (err error) {
|
||||
if historyScribe != nil {
|
||||
go historyScribe.Record(rpf.GetHistoryRecord(), &response)
|
||||
}
|
||||
//cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -318,7 +316,6 @@ func (ms *MapStorage) GetLCR(key string, skipCache bool) (lcr *LCR, err error) {
|
||||
func (ms *MapStorage) SetLCR(lcr *LCR) (err error) {
|
||||
result, err := ms.ms.Marshal(lcr)
|
||||
ms.dict[utils.LCR_PREFIX+lcr.GetId()] = result
|
||||
//cache2go.Cache(LCR_PREFIX+key, lcr)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -342,7 +339,6 @@ func (ms *MapStorage) GetRpAlias(key string, skipCache bool) (alias string, err
|
||||
|
||||
func (ms *MapStorage) SetRpAlias(key, alias string) (err error) {
|
||||
ms.dict[utils.RP_ALIAS_PREFIX+key] = []byte(alias)
|
||||
//cache2go.Cache(ALIAS_PREFIX+key, alias)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -409,7 +405,6 @@ func (ms *MapStorage) GetAccAlias(key string, skipCache bool) (alias string, err
|
||||
|
||||
func (ms *MapStorage) SetAccAlias(key, alias string) (err error) {
|
||||
ms.dict[utils.ACC_ALIAS_PREFIX+key] = []byte(alias)
|
||||
//cache2go.Cache(ALIAS_PREFIX+key, alias)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -471,7 +466,6 @@ func (ms *MapStorage) SetDestination(dest *Destination) (err error) {
|
||||
if historyScribe != nil {
|
||||
go historyScribe.Record(dest.GetHistoryRecord(), &response)
|
||||
}
|
||||
//cache2go.Cache(DESTINATION_PREFIX+dest.Id, dest)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -496,7 +490,6 @@ func (ms *MapStorage) GetActions(key string, skipCache bool) (as Actions, err er
|
||||
func (ms *MapStorage) SetActions(key string, as Actions) (err error) {
|
||||
result, err := ms.ms.Marshal(&as)
|
||||
ms.dict[utils.ACTION_PREFIX+key] = result
|
||||
//cache2go.Cache(ACTION_PREFIX+key, as)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -521,7 +514,6 @@ func (ms *MapStorage) GetSharedGroup(key string, skipCache bool) (sg *SharedGrou
|
||||
func (ms *MapStorage) SetSharedGroup(sg *SharedGroup) (err error) {
|
||||
result, err := ms.ms.Marshal(sg)
|
||||
ms.dict[utils.SHARED_GROUP_PREFIX+sg.Id] = result
|
||||
//cache2go.Cache(SHARED_GROUP_PREFIX+key, sg)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -553,6 +545,22 @@ func (ms *MapStorage) SetAccount(ub *Account) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error) {
|
||||
if values, ok := ms.dict[utils.CDR_STATS_QUEUE_PREFIX+key]; ok {
|
||||
sq = &StatsQueue{}
|
||||
err = ms.ms.Unmarshal(values, sq)
|
||||
} else {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) {
|
||||
result, err := ms.ms.Marshal(sq)
|
||||
ms.dict[utils.CDR_STATS_QUEUE_PREFIX+sq.GetId()] = result
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetActionPlans(key string) (ats ActionPlans, err error) {
|
||||
if values, ok := ms.dict[utils.ACTION_TIMING_PREFIX+key]; ok {
|
||||
err = ms.ms.Unmarshal(values, &ats)
|
||||
|
||||
@@ -355,7 +355,6 @@ func (rs *RedisStorage) SetRatingPlan(rp *RatingPlan) (err error) {
|
||||
response := 0
|
||||
go historyScribe.Record(rp.GetHistoryRecord(), &response)
|
||||
}
|
||||
//cache2go.Cache(utils.RATING_PLAN_PREFIX+rp.Id, rp)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -385,7 +384,6 @@ func (rs *RedisStorage) SetRatingProfile(rpf *RatingProfile) (err error) {
|
||||
response := 0
|
||||
go historyScribe.Record(rpf.GetHistoryRecord(), &response)
|
||||
}
|
||||
//cache2go.Cache(utils.RATING_PROFILE_PREFIX+rpf.Id, rpf)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -506,7 +504,6 @@ func (rs *RedisStorage) GetAccAlias(key string, skipCache bool) (alias string, e
|
||||
// Adds one alias for one account
|
||||
func (rs *RedisStorage) SetAccAlias(key, alias string) (err error) {
|
||||
err = rs.db.Set(utils.ACC_ALIAS_PREFIX+key, []byte(alias))
|
||||
//cache2go.Cache(ALIAS_PREFIX+key, alias)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -600,7 +597,6 @@ func (rs *RedisStorage) SetDestination(dest *Destination) (err error) {
|
||||
response := 0
|
||||
go historyScribe.Record(dest.GetHistoryRecord(), &response)
|
||||
}
|
||||
//cache2go.Cache(utils.DESTINATION_PREFIX+dest.Id, dest)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -624,7 +620,6 @@ func (rs *RedisStorage) GetActions(key string, skipCache bool) (as Actions, err
|
||||
func (rs *RedisStorage) SetActions(key string, as Actions) (err error) {
|
||||
result, err := rs.ms.Marshal(&as)
|
||||
err = rs.db.Set(utils.ACTION_PREFIX+key, result)
|
||||
// cache2go.Cache(utils.ACTION_PREFIX+key, as)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -648,7 +643,6 @@ func (rs *RedisStorage) GetSharedGroup(key string, skipCache bool) (sg *SharedGr
|
||||
func (rs *RedisStorage) SetSharedGroup(sg *SharedGroup) (err error) {
|
||||
result, err := rs.ms.Marshal(sg)
|
||||
err = rs.db.Set(utils.SHARED_GROUP_PREFIX+sg.Id, result)
|
||||
//cache2go.Cache(utils.SHARED_GROUP_PREFIX+sg.Id, sg)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -680,6 +674,22 @@ func (rs *RedisStorage) SetAccount(ub *Account) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error) {
|
||||
var values []byte
|
||||
if values, err = rs.db.Get(utils.CDR_STATS_QUEUE_PREFIX + key); err == nil {
|
||||
sq = &StatsQueue{}
|
||||
err = rs.ms.Unmarshal(values, sq)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) {
|
||||
result, err := rs.ms.Marshal(sq)
|
||||
err = rs.db.Set(utils.CDR_STATS_QUEUE_PREFIX+sq.GetId(), result)
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetActionPlans(key string) (ats ActionPlans, err error) {
|
||||
var values []byte
|
||||
if values, err = rs.db.Get(utils.ACTION_TIMING_PREFIX + key); err == nil {
|
||||
|
||||
@@ -324,6 +324,7 @@ type TPCdrStats struct {
|
||||
type TPCdrStat struct {
|
||||
QueueLength string
|
||||
TimeWindow string
|
||||
SaveInterval string
|
||||
Metrics string
|
||||
SetupInterval string
|
||||
TORs string
|
||||
|
||||
@@ -164,6 +164,8 @@ const (
|
||||
DESTINATION_PREFIX = "dst_"
|
||||
LCR_PREFIX = "lcr_"
|
||||
DERIVEDCHARGERS_PREFIX = "dcs_"
|
||||
CDR_STATS_QUEUE_PREFIX = "csq_"
|
||||
CDR_STATS_PREFIX = "cst_"
|
||||
TEMP_DESTINATION_PREFIX = "tmp_"
|
||||
LOG_CALL_COST_PREFIX = "cco_"
|
||||
LOG_ACTION_TIMMING_PREFIX = "ltm_"
|
||||
@@ -180,7 +182,6 @@ const (
|
||||
CREATE_CDRS_TABLES_SQL = "create_cdrs_tables.sql"
|
||||
CREATE_TARIFFPLAN_TABLES_SQL = "create_tariffplan_tables.sql"
|
||||
TEST_SQL = "TEST_SQL"
|
||||
CDR_STATS_PREFIX = "cst_"
|
||||
DESTINATIONS_LOAD_THRESHOLD = 0.1
|
||||
CONSTANT = "constant"
|
||||
FILLER = "filler"
|
||||
|
||||
Reference in New Issue
Block a user