From 3d0553ac378053982d1636719161af7508e06d85 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Thu, 18 Jun 2015 23:26:19 +0300 Subject: [PATCH 1/3] started stats saver --- cmd/cgr-engine/cgr-engine.go | 2 +- config/cdrstatsconfig.go | 5 +- config/config.go | 4 + config/config_defaults.go | 1 + config/config_json_test.go | 5 +- config/libconfig_json.go | 3 +- .../mysql/create_tariffplan_tables.sql | 1 + .../postgres/create_tariffplan_tables.sql | 1 + engine/cdrstats.go | 7 +- engine/loader_csv_test.go | 21 +-- engine/model_converters.go | 1 + engine/model_helpers.go | 8 ++ engine/model_helpers_test.go | 8 +- engine/models.go | 43 +++--- engine/responder_test.go | 2 +- engine/stats.go | 75 +++++++++-- engine/stats_queue.go | 70 ++++++---- engine/stats_test.go | 122 +++++++++--------- engine/storage_interface.go | 2 + engine/storage_map.go | 24 ++-- engine/storage_redis.go | 22 +++- utils/apitpdata.go | 1 + utils/consts.go | 3 +- 23 files changed, 269 insertions(+), 162 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index ce5b4e8d4..a3b3f8e8f 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -492,7 +492,7 @@ func main() { } if cfg.CDRStatsEnabled { // Init it here so we make it availabe to the Apier - cdrStats = engine.NewStats(ratingDb) + cdrStats = engine.NewStats(ratingDb, accountDb, cfg.CDRStatsSaveInterval) if cfg.CDRStatConfig != nil && len(cfg.CDRStatConfig.Metrics) != 0 { cdrStats.AddQueue(engine.NewCdrStatsFromCdrStatsCfg(cfg.CDRStatConfig), nil) } diff --git a/config/cdrstatsconfig.go b/config/cdrstatsconfig.go index e0a6202d6..e8594a633 100644 --- a/config/cdrstatsconfig.go +++ b/config/cdrstatsconfig.go @@ -26,8 +26,9 @@ type CdrStatsConfig struct { Id string // Config id, unique per config instance QueueLength int // Number of items in the stats buffer TimeWindow time.Duration // Will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow - Metrics []string // ASR, ACD, ACC - SetupInterval []time.Time // 2 or less items (>= start interval,< stop_interval) + SaveInterval time.Duration + Metrics []string // ASR, ACD, ACC + SetupInterval []time.Time // 2 or less items (>= start interval,< stop_interval) TORs []string CdrHosts []string CdrSources []string diff --git a/config/config.go b/config/config.go index cc9e34c05..7ee0ea1fe 100644 --- a/config/config.go +++ b/config/config.go @@ -202,6 +202,7 @@ type CGRConfig struct { CDRSReconnects int // number of reconnects to remote services before giving up CDRSCdrReplication []*CdrReplicationCfg // Replicate raw CDRs to a number of servers CDRStatsEnabled bool // Enable CDR Stats service + CDRStatsSaveInterval time.Duration // Save interval duration CDRStatConfig *CdrStatsConfig // Active cdr stats configuration instances, platform level CdreProfiles map[string]*CdreConfig CdrcProfiles map[string]map[string]*CdrcConfig // Number of CDRC instances running imports, format map[dirPath]map[instanceName]{Configs} @@ -590,6 +591,9 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { if jsnCdrstatsCfg != nil { if jsnCdrstatsCfg.Enabled != nil { self.CDRStatsEnabled = *jsnCdrstatsCfg.Enabled + if self.CDRStatsSaveInterval, err = utils.ParseDurationWithSecs(*jsnCdrstatsCfg.Save_Interval); err != nil { + return err + } } } diff --git a/config/config_defaults.go b/config/config_defaults.go index adbef2f23..6427d7c77 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -108,6 +108,7 @@ const CGRATES_CFG_JSON = ` "cdrstats": { "enabled": false, // starts the cdrstats service: + "save_interval": "5s", }, diff --git a/config/config_json_test.go b/config/config_json_test.go index e00d75d6f..61f9ab35f 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -155,12 +155,13 @@ func TestDfCdrsJsonCfg(t *testing.T) { func TestDfCdrStatsJsonCfg(t *testing.T) { eCfg := &CdrStatsJsonCfg{ - Enabled: utils.BoolPointer(false), + Enabled: utils.BoolPointer(false), + Save_Interval: utils.StringPointer("5s"), } if cfg, err := dfCgrJsonCfg.CdrStatsJsonCfg(); err != nil { t.Error(err) } else if !reflect.DeepEqual(eCfg, cfg) { - t.Error("Received: ", cfg) + t.Error("Received: ", *cfg) } } diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 23030c04c..4df2fe490 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -86,7 +86,8 @@ type CdrReplicationJsonCfg struct { // Cdrstats config section type CdrStatsJsonCfg struct { - Enabled *bool + Enabled *bool + Save_Interval *string } // One cdr field config, used in cdre and cdrc diff --git a/data/storage/mysql/create_tariffplan_tables.sql b/data/storage/mysql/create_tariffplan_tables.sql index e061c0176..5d515dccd 100644 --- a/data/storage/mysql/create_tariffplan_tables.sql +++ b/data/storage/mysql/create_tariffplan_tables.sql @@ -309,6 +309,7 @@ CREATE TABLE tp_cdrstats ( `tag` varchar(64) NOT NULL, `queue_length` int(11) NOT NULL, `time_window` varchar(8) NOT NULL, + `save_interval` varchar(8) NOT NULL, `metrics` varchar(64) NOT NULL, `setup_interval` varchar(64) NOT NULL, `tors` varchar(64) NOT NULL, diff --git a/data/storage/postgres/create_tariffplan_tables.sql b/data/storage/postgres/create_tariffplan_tables.sql index 635bd7268..83c8880fa 100644 --- a/data/storage/postgres/create_tariffplan_tables.sql +++ b/data/storage/postgres/create_tariffplan_tables.sql @@ -304,6 +304,7 @@ CREATE TABLE tp_cdrstats ( tag VARCHAR(64) NOT NULL, queue_length INTEGER NOT NULL, time_window VARCHAR(8) NOT NULL, + save_interval VARCHAR(8) NOT NULL, metrics VARCHAR(64) NOT NULL, setup_interval VARCHAR(64) NOT NULL, tors VARCHAR(64) NOT NULL, diff --git a/engine/cdrstats.go b/engine/cdrstats.go index ae777e7e8..b5076eda7 100644 --- a/engine/cdrstats.go +++ b/engine/cdrstats.go @@ -54,9 +54,10 @@ func NewCdrStatsFromCdrStatsCfg(csCfg *config.CdrStatsConfig) *CdrStats { } type CdrStats struct { - Id string // Config id, unique per config instance - QueueLength int // Number of items in the stats buffer - TimeWindow time.Duration // Will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow + Id string // Config id, unique per config instance + QueueLength int // Number of items in the stats buffer + TimeWindow time.Duration // Will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow + SaveInterval time.Duration Metrics []string // ASR, ACD, ACC SetupInterval []time.Time // CDRFieldFilter on SetupInterval, 2 or less items (>= start interval,< stop_interval) TOR []string // CDRFieldFilter on TORs diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index f3a85049a..37e07fb70 100644 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -202,12 +202,12 @@ vdf,post,*out,POST_AT, *out,cgrates.org,call,dan,*any,extra1,,,,,,rif2,rif2,,,,,,, ` cdrStats = ` -#Id[0],QueueLength[1],TimeWindow[2],Metric[3],SetupInterval[4],TOR[5],CdrHost[6],CdrSource[7],ReqType[8],Direction[9],Tenant[10],Category[11],Account[12],Subject[13],DestinationPrefix[14],PddInterval[15],UsageInterval[16],Supplier[17],DisconnectCause[18],MediationRunIds[19],RatedAccount[20],RatedSubject[21],CostInterval[22],Triggers[23]CDRST1,5,60m,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,*rated,*out,cgrates.org,call,dan,dan,49,5m;10m,suppl1,NORMAL_CLEARING,default,rif,rif,0;2,STANDARD_TRIGGERS -CDRST1,5,60m,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,*rated,*out,cgrates.org,call,dan,dan,49,3m;7m,5m;10m,suppl1,NORMAL_CLEARING,default,rif,rif,0;2,STANDARD_TRIGGERS -CDRST1,,,ACD,,,,,,,,,,,,,,,,,,,,STANDARD_TRIGGER -CDRST1,,,ACC,,,,,,,,,,,,,,,,,,,, -CDRST2,10,10m,ASR,,,,,,,cgrates.org,call,,,,,,,,,,,, -CDRST2,,,ACD,,,,,,,,,,,,,,,,,,,, +#Id[0],QueueLength[1],TimeWindow[2],SaveInterval[3],Metric[4],SetupInterval[5],TOR[6],CdrHost[7],CdrSource[8],ReqType[9],Direction[10],Tenant[11],Category[12],Account[13],Subject[14],DestinationPrefix[15],PddInterval[16],UsageInterval[17],Supplier[18],DisconnectCause[19],MediationRunIds[20],RatedAccount[21],RatedSubject[22],CostInterval[23],Triggers[24] +CDRST1,5,60m,10s,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,*rated,*out,cgrates.org,call,dan,dan,49,3m;7m,5m;10m,suppl1,NORMAL_CLEARING,default,rif,rif,0;2,STANDARD_TRIGGERS +CDRST1,,,,ACD,,,,,,,,,,,,,,,,,,,,STANDARD_TRIGGER +CDRST1,,,,ACC,,,,,,,,,,,,,,,,,,,, +CDRST2,10,10m,,ASR,,,,,,,cgrates.org,call,,,,,,,,,,,, +CDRST2,,,,ACD,,,,,,,,,,,,,,,,,,,, ` ) @@ -1062,10 +1062,11 @@ func TestLoadCdrStats(t *testing.T) { t.Error("Failed to load cdr stats: ", csvr.cdrStats) } cdrStats1 := &CdrStats{ - Id: "CDRST1", - QueueLength: 5, - TimeWindow: 60 * time.Minute, - Metrics: []string{"ASR", "ACD", "ACC"}, + Id: "CDRST1", + QueueLength: 5, + TimeWindow: 60 * time.Minute, + SaveInterval: 10 * time.Second, + Metrics: []string{"ASR", "ACD", "ACC"}, SetupInterval: []time.Time{ time.Date(2014, 7, 29, 15, 0, 0, 0, time.UTC), time.Date(2014, 7, 29, 16, 0, 0, 0, time.UTC), diff --git a/engine/model_converters.go b/engine/model_converters.go index 543994105..fa6fd18f2 100644 --- a/engine/model_converters.go +++ b/engine/model_converters.go @@ -325,6 +325,7 @@ func APItoModelCdrStat(stats *utils.TPCdrStats) (result []TpCdrstat) { Tag: stats.CdrStatsId, QueueLength: ql, TimeWindow: st.TimeWindow, + SaveInterval: st.SaveInterval, Metrics: st.Metrics, SetupInterval: st.SetupInterval, Tors: st.TORs, diff --git a/engine/model_helpers.go b/engine/model_helpers.go index da256f696..823f21185 100644 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -506,6 +506,7 @@ func (tps TpCdrStats) GetCdrStats() (map[string][]*utils.TPCdrStat, error) { QueueLength: strconv.Itoa(tpCs.QueueLength), TimeWindow: tpCs.TimeWindow, Metrics: tpCs.Metrics, + SaveInterval: tpCs.SaveInterval, SetupInterval: tpCs.SetupInterval, TORs: tpCs.Tors, CdrHosts: tpCs.CdrHosts, @@ -546,6 +547,13 @@ func UpdateCdrStats(cs *CdrStats, triggers ActionTriggerPriotityList, tpCs *util log.Printf("Error parsing TimeWindow %v for cdrs stats %v", tpCs.TimeWindow, cs.Id) } } + if tpCs.SaveInterval != "" { + if si, err := time.ParseDuration(tpCs.SaveInterval); err == nil { + cs.SaveInterval = si + } else { + log.Printf("Error parsing SaveInterval %v for cdr stats %v", tpCs.SaveInterval, cs.Id) + } + } if tpCs.Metrics != "" { cs.Metrics = append(cs.Metrics, tpCs.Metrics) } diff --git a/engine/model_helpers_test.go b/engine/model_helpers_test.go index 54dd70cd7..bea90683f 100644 --- a/engine/model_helpers_test.go +++ b/engine/model_helpers_test.go @@ -16,7 +16,7 @@ func TestModelHelperCsvLoad(t *testing.T) { } func TestModelHelperCsvLoadInt(t *testing.T) { - l, err := csvLoad(TpCdrstat{}, []string{"CDRST1", "5", "60m", "ASR", "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z", "*voice", "87.139.12.167", "FS_JSON", "*rated", "*out", "cgrates.org", "call", "dan", "dan", "49", "3m;7m", "5m;10m", "suppl1", "NORMAL_CLEARING", "default", "rif", "rif", "0;2", "STANDARD_TRIGGERS"}) + l, err := csvLoad(TpCdrstat{}, []string{"CDRST1", "5", "60m", "10s", "ASR", "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z", "*voice", "87.139.12.167", "FS_JSON", "*rated", "*out", "cgrates.org", "call", "dan", "dan", "49", "3m;7m", "5m;10m", "suppl1", "NORMAL_CLEARING", "default", "rif", "rif", "0;2", "STANDARD_TRIGGERS"}) tpd, ok := l.(TpCdrstat) if err != nil || !ok || tpd.QueueLength != 5 { t.Errorf("model load failed: %+v", tpd) @@ -377,6 +377,7 @@ func TestTPCdrStatsAsExportSlice(t *testing.T) { &utils.TPCdrStat{ QueueLength: "5", TimeWindow: "60m", + SaveInterval: "10s", Metrics: "ASR;ACD", SetupInterval: "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z", TORs: "*voice", @@ -401,6 +402,7 @@ func TestTPCdrStatsAsExportSlice(t *testing.T) { &utils.TPCdrStat{ QueueLength: "5", TimeWindow: "60m", + SaveInterval: "9s", Metrics: "ASR", SetupInterval: "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z", TORs: "*voice", @@ -425,9 +427,9 @@ func TestTPCdrStatsAsExportSlice(t *testing.T) { }, } expectedSlc := [][]string{ - []string{"CDRST1", "5", "60m", "ASR;ACD", "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z", "*voice", "87.139.12.167", "FS_JSON", utils.META_RATED, "*out", "cgrates.org", "call", + []string{"CDRST1", "5", "60m", "10s", "ASR;ACD", "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z", "*voice", "87.139.12.167", "FS_JSON", utils.META_RATED, "*out", "cgrates.org", "call", "dan", "dan", "49", "3m;7m", "5m;10m", "supplier1", "NORMAL_CLEARNING", "default", "rif", "rif", "0;2", "STANDARD_TRIGGERS"}, - []string{"CDRST1", "5", "60m", "ASR", "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z", "*voice", "87.139.12.167", "FS_JSON", utils.META_RATED, "*out", "cgrates.org", "call", + []string{"CDRST1", "5", "60m", "9s", "ASR", "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z", "*voice", "87.139.12.167", "FS_JSON", utils.META_RATED, "*out", "cgrates.org", "call", "dan", "dan", "49", "3m;7m", "5m;10m", "supplier1", "NORMAL_CLEARNING", "default", "dan", "dan", "0;2", "STANDARD_TRIGGERS"}, } ms := APItoModelCdrStat(cdrStats) diff --git a/engine/models.go b/engine/models.go index abaafe132..6280db899 100644 --- a/engine/models.go +++ b/engine/models.go @@ -295,27 +295,28 @@ type TpCdrstat struct { Tag string `index:"0" re:""` QueueLength int `index:"1" re:""` TimeWindow string `index:"2" re:""` - Metrics string `index:"3" re:""` - SetupInterval string `index:"4" re:""` - Tors string `index:"5" re:""` - CdrHosts string `index:"6" re:""` - CdrSources string `index:"7" re:""` - ReqTypes string `index:"8" re:""` - Directions string `index:"9" re:""` - Tenants string `index:"10" re:""` - Categories string `index:"11" re:""` - Accounts string `index:"12" re:""` - Subjects string `index:"13" re:""` - DestinationPrefixes string `index:"14" re:""` - PddInterval string `index:"15" re:""` - UsageInterval string `index:"16" re:""` - Suppliers string `index:"17" re:""` - DisconnectCauses string `index:"18" re:""` - MediationRunids string `index:"19" re:""` - RatedAccounts string `index:"20" re:""` - RatedSubjects string `index:"21" re:""` - CostInterval string `index:"22" re:""` - ActionTriggers string `index:"23" re:""` + SaveInterval string `index:"3" re:""` + Metrics string `index:"4" re:""` + SetupInterval string `index:"5" re:""` + Tors string `index:"6" re:""` + CdrHosts string `index:"7" re:""` + CdrSources string `index:"8" re:""` + ReqTypes string `index:"9" re:""` + Directions string `index:"10" re:""` + Tenants string `index:"11" re:""` + Categories string `index:"12" re:""` + Accounts string `index:"13" re:""` + Subjects string `index:"14" re:""` + DestinationPrefixes string `index:"15" re:""` + PddInterval string `index:"16" re:""` + UsageInterval string `index:"17" re:""` + Suppliers string `index:"18" re:""` + DisconnectCauses string `index:"19" re:""` + MediationRunids string `index:"20" re:""` + RatedAccounts string `index:"21" re:""` + RatedSubjects string `index:"22" re:""` + CostInterval string `index:"23" re:""` + ActionTriggers string `index:"24" re:""` CreatedAt time.Time } diff --git a/engine/responder_test.go b/engine/responder_test.go index 3df158e79..17e20d6fd 100644 --- a/engine/responder_test.go +++ b/engine/responder_test.go @@ -165,7 +165,7 @@ func TestGetSessionRuns(t *testing.T) { } func TestGetLCR(t *testing.T) { - rsponder.Stats = NewStats(ratingStorage) // Load stats instance + rsponder.Stats = NewStats(ratingStorage, accountingStorage, 0) // Load stats instance dstDe := &Destination{Id: "GERMANY", Prefixes: []string{"+49"}} if err := ratingStorage.SetDestination(dstDe); err != nil { t.Error(err) diff --git a/engine/stats.go b/engine/stats.go index c6482f6b4..3192efe04 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "sync" + "time" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" @@ -37,13 +38,47 @@ type StatsInterface interface { } type Stats struct { - queues map[string]*StatsQueue - mux sync.RWMutex - ratingDb RatingStorage + queues map[string]*StatsQueue + queueSavers map[string]*queueSaver + mux sync.RWMutex + ratingDb RatingStorage + accountingDb AccountingStorage + defaultSaveInterval time.Duration +} +type queueSaver struct { + ticker *time.Ticker + stopper chan bool } -func NewStats(ratingDb RatingStorage) *Stats { - cdrStats := &Stats{ratingDb: ratingDb} +func newQueueSaver(id string, saveInterval time.Duration, sq *StatsQueue, adb AccountingStorage) *queueSaver { + svr := &queueSaver{ + ticker: time.NewTicker(saveInterval), + stopper: make(chan bool), + } + go func(id string, c <-chan time.Time, s <-chan bool, sq *StatsQueue, accountDb AccountingStorage) { + for { + select { + case <-c: + if sq.IsDirty() { + if err := accountDb.SetCdrStatsQueue(id, sq); err != nil { + Logger.Err(fmt.Sprintf("Error saving cdr stats queue id %s: %v", id, err)) + } + } + case <-s: + break + } + } + }(id, svr.ticker.C, svr.stopper, sq, adb) + return svr +} + +func (svr *queueSaver) stop() { + svr.ticker.Stop() + svr.stopper <- true +} + +func NewStats(ratingDb RatingStorage, accountingDb AccountingStorage, saveInterval time.Duration) *Stats { + cdrStats := &Stats{ratingDb: ratingDb, accountingDb: accountingDb, defaultSaveInterval: saveInterval} if css, err := ratingDb.GetAllCdrStats(); err == nil { cdrStats.UpdateQueues(css, nil) } else { @@ -79,6 +114,9 @@ func (s *Stats) AddQueue(cs *CdrStats, out *int) error { if s.queues == nil { s.queues = make(map[string]*StatsQueue) } + if s.queueSavers == nil { + s.queueSavers = make(map[string]*queueSaver) + } if sq, exists := s.queues[cs.Id]; exists { sq.UpdateConf(cs) } else { @@ -108,11 +146,11 @@ func (s *Stats) ReloadQueues(ids []string, out *int) error { func (s *Stats) ResetQueues(ids []string, out *int) error { if len(ids) == 0 { for _, sq := range s.queues { - sq.cdrs = make([]*QCdr, 0) - sq.metrics = make(map[string]Metric, len(sq.conf.Metrics)) - for _, m := range sq.conf.Metrics { + sq.Cdrs = make([]*QCdr, 0) + sq.Metrics = make(map[string]Metric, len(sq.Conf.Metrics)) + for _, m := range sq.Conf.Metrics { if metric := CreateMetric(m); metric != nil { - sq.metrics[m] = metric + sq.Metrics[m] = metric } } } @@ -123,11 +161,11 @@ func (s *Stats) ResetQueues(ids []string, out *int) error { Logger.Warning(fmt.Sprintf("Cannot reset queue id %v: Not Fund", id)) continue } - sq.cdrs = make([]*QCdr, 0) - sq.metrics = make(map[string]Metric, len(sq.conf.Metrics)) - for _, m := range sq.conf.Metrics { + sq.Cdrs = make([]*QCdr, 0) + sq.Metrics = make(map[string]Metric, len(sq.Conf.Metrics)) + for _, m := range sq.Conf.Metrics { if metric := CreateMetric(m); metric != nil { - sq.metrics[m] = metric + sq.Metrics[m] = metric } } } @@ -143,8 +181,9 @@ func (s *Stats) UpdateQueues(css []*CdrStats, out *int) error { defer s.mux.Unlock() oldQueues := s.queues s.queues = make(map[string]*StatsQueue, len(css)) + s.queueSavers = make(map[string]*queueSaver) if def, exists := oldQueues[utils.META_DEFAULT]; exists { - def.UpdateConf(def.conf) // for reset + def.UpdateConf(def.Conf) // for reset s.queues[utils.META_DEFAULT] = def } for _, cs := range css { @@ -158,6 +197,14 @@ func (s *Stats) UpdateQueues(css []*CdrStats, out *int) error { if sq == nil { sq = NewStatsQueue(cs) } + + si := cs.SaveInterval + if si == 0 { + si = s.defaultSaveInterval + } + if si > 0 { + s.queueSavers[cs.Id] = newQueueSaver(cs.Id, si, sq, s.accountingDb) + } s.queues[cs.Id] = sq } return nil diff --git a/engine/stats_queue.go b/engine/stats_queue.go index 36aa30db3..5c76111a3 100644 --- a/engine/stats_queue.go +++ b/engine/stats_queue.go @@ -25,10 +25,11 @@ import ( ) type StatsQueue struct { - cdrs []*QCdr - conf *CdrStats - metrics map[string]Metric + Cdrs []*QCdr + Conf *CdrStats + Metrics map[string]Metric mux sync.Mutex + dirty bool } var METRIC_TRIGGER_MAP = map[string]string{ @@ -57,7 +58,7 @@ type QCdr struct { func NewStatsQueue(conf *CdrStats) *StatsQueue { if conf == nil { - return &StatsQueue{metrics: make(map[string]Metric)} + return &StatsQueue{Metrics: make(map[string]Metric)} } sq := &StatsQueue{} sq.UpdateConf(conf) @@ -67,29 +68,40 @@ func NewStatsQueue(conf *CdrStats) *StatsQueue { func (sq *StatsQueue) UpdateConf(conf *CdrStats) { sq.mux.Lock() defer sq.mux.Unlock() - sq.conf = conf - sq.cdrs = make([]*QCdr, 0) - sq.metrics = make(map[string]Metric, len(conf.Metrics)) + sq.Conf = conf + sq.Cdrs = make([]*QCdr, 0) + sq.Metrics = make(map[string]Metric, len(conf.Metrics)) + sq.dirty = true for _, m := range conf.Metrics { if metric := CreateMetric(m); metric != nil { - sq.metrics[m] = metric + sq.Metrics[m] = metric } } } +func (sq *StatsQueue) IsDirty() bool { + sq.mux.Lock() + defer sq.mux.Unlock() + v := sq.dirty + // take advantage of the locking to set it to flip it + sq.dirty = false + return v +} + func (sq *StatsQueue) AppendCDR(cdr *StoredCdr) { sq.mux.Lock() defer sq.mux.Unlock() - if sq.conf.AcceptCdr(cdr) { + if sq.Conf.AcceptCdr(cdr) { qcdr := sq.simplifyCdr(cdr) - sq.cdrs = append(sq.cdrs, qcdr) + sq.Cdrs = append(sq.Cdrs, qcdr) sq.addToMetrics(qcdr) sq.purgeObsoleteCdrs() + sq.dirty = true // check for trigger stats := sq.getStats() - sq.conf.Triggers.Sort() - for _, at := range sq.conf.Triggers { - if at.MinQueuedItems > 0 && len(sq.cdrs) < at.MinQueuedItems { + sq.Conf.Triggers.Sort() + for _, at := range sq.Conf.Triggers { + if at.MinQueuedItems > 0 && len(sq.Cdrs) < at.MinQueuedItems { continue } if strings.HasPrefix(at.ThresholdType, "*min_") { @@ -111,13 +123,15 @@ func (sq *StatsQueue) AppendCDR(cdr *StoredCdr) { } func (sq *StatsQueue) addToMetrics(cdr *QCdr) { - for _, metric := range sq.metrics { + sq.dirty = true + for _, metric := range sq.Metrics { metric.AddCdr(cdr) } } func (sq *StatsQueue) removeFromMetrics(cdr *QCdr) { - for _, metric := range sq.metrics { + sq.dirty = true + for _, metric := range sq.Metrics { metric.RemoveCdr(cdr) } } @@ -133,23 +147,23 @@ func (sq *StatsQueue) simplifyCdr(cdr *StoredCdr) *QCdr { } func (sq *StatsQueue) purgeObsoleteCdrs() { - if sq.conf.QueueLength > 0 { - currentLength := len(sq.cdrs) - if currentLength > sq.conf.QueueLength { - for _, cdr := range sq.cdrs[:currentLength-sq.conf.QueueLength] { + if sq.Conf.QueueLength > 0 { + currentLength := len(sq.Cdrs) + if currentLength > sq.Conf.QueueLength { + for _, cdr := range sq.Cdrs[:currentLength-sq.Conf.QueueLength] { sq.removeFromMetrics(cdr) } - sq.cdrs = sq.cdrs[currentLength-sq.conf.QueueLength:] + sq.Cdrs = sq.Cdrs[currentLength-sq.Conf.QueueLength:] } } - if sq.conf.TimeWindow > 0 { - for i, cdr := range sq.cdrs { - if time.Now().Sub(cdr.SetupTime) > sq.conf.TimeWindow { + if sq.Conf.TimeWindow > 0 { + for i, cdr := range sq.Cdrs { + if time.Now().Sub(cdr.SetupTime) > sq.Conf.TimeWindow { sq.removeFromMetrics(cdr) continue } else { if i > 0 { - sq.cdrs = sq.cdrs[i:] + sq.Cdrs = sq.Cdrs[i:] } break } @@ -165,20 +179,20 @@ func (sq *StatsQueue) GetStats() map[string]float64 { } func (sq *StatsQueue) getStats() map[string]float64 { - stat := make(map[string]float64, len(sq.metrics)) - for key, metric := range sq.metrics { + stat := make(map[string]float64, len(sq.Metrics)) + for key, metric := range sq.Metrics { stat[key] = metric.GetValue() } return stat } func (sq *StatsQueue) GetId() string { - return sq.conf.Id + return sq.Conf.Id } // Convert data into a struct which can be used in actions based on triggers hit func (sq *StatsQueue) Triggered(at *ActionTrigger) *StatsQueueTriggered { - return &StatsQueueTriggered{Id: sq.conf.Id, Metrics: sq.getStats(), Trigger: at} + return &StatsQueueTriggered{Id: sq.Conf.Id, Metrics: sq.getStats(), Trigger: at} } // Struct to be passed to triggered actions diff --git a/engine/stats_test.go b/engine/stats_test.go index 90937ad7f..af054c129 100644 --- a/engine/stats_test.go +++ b/engine/stats_test.go @@ -27,8 +27,8 @@ import ( func TestStatsQueueInit(t *testing.T) { sq := NewStatsQueue(&CdrStats{Metrics: []string{ASR, ACC}}) - if len(sq.metrics) != 2 { - t.Error("Expected 2 metrics got ", len(sq.metrics)) + if len(sq.Metrics) != 2 { + t.Error("Expected 2 metrics got ", len(sq.Metrics)) } } @@ -104,106 +104,106 @@ func TestAcceptCdr(t *testing.T) { MediationRunId: "mri", Cost: 10, } - sq.conf = &CdrStats{} - if sq.conf.AcceptCdr(cdr) != true { + sq.Conf = &CdrStats{} + if sq.Conf.AcceptCdr(cdr) != true { t.Errorf("Should have accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{TOR: []string{"test"}} - if sq.conf.AcceptCdr(cdr) == true { + sq.Conf = &CdrStats{TOR: []string{"test"}} + if sq.Conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{CdrHost: []string{"test"}} - if sq.conf.AcceptCdr(cdr) == true { + sq.Conf = &CdrStats{CdrHost: []string{"test"}} + if sq.Conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{CdrSource: []string{"test"}} - if sq.conf.AcceptCdr(cdr) == true { + sq.Conf = &CdrStats{CdrSource: []string{"test"}} + if sq.Conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{Direction: []string{"test"}} - if sq.conf.AcceptCdr(cdr) == true { + sq.Conf = &CdrStats{Direction: []string{"test"}} + if sq.Conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{Tenant: []string{"test"}} - if sq.conf.AcceptCdr(cdr) == true { + sq.Conf = &CdrStats{Tenant: []string{"test"}} + if sq.Conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{Category: []string{"test"}} - if sq.conf.AcceptCdr(cdr) == true { + sq.Conf = &CdrStats{Category: []string{"test"}} + if sq.Conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{Account: []string{"test"}} - if sq.conf.AcceptCdr(cdr) == true { + sq.Conf = &CdrStats{Account: []string{"test"}} + if sq.Conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{Subject: []string{"test"}} - if sq.conf.AcceptCdr(cdr) == true { + sq.Conf = &CdrStats{Subject: []string{"test"}} + if sq.Conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{Supplier: []string{"test"}} - if sq.conf.AcceptCdr(cdr) == true { + sq.Conf = &CdrStats{Supplier: []string{"test"}} + if sq.Conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{DisconnectCause: []string{"test"}} - if sq.conf.AcceptCdr(cdr) == true { + sq.Conf = &CdrStats{DisconnectCause: []string{"test"}} + if sq.Conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{RatedAccount: []string{"test"}} - if sq.conf.AcceptCdr(cdr) == true { + sq.Conf = &CdrStats{RatedAccount: []string{"test"}} + if sq.Conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{RatedSubject: []string{"test"}} - if sq.conf.AcceptCdr(cdr) == true { + sq.Conf = &CdrStats{RatedSubject: []string{"test"}} + if sq.Conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{DestinationPrefix: []string{"test"}} - if sq.conf.AcceptCdr(cdr) == true { + sq.Conf = &CdrStats{DestinationPrefix: []string{"test"}} + if sq.Conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{DestinationPrefix: []string{"test", "123"}} - if sq.conf.AcceptCdr(cdr) != true { + sq.Conf = &CdrStats{DestinationPrefix: []string{"test", "123"}} + if sq.Conf.AcceptCdr(cdr) != true { t.Errorf("Should have accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}} - if sq.conf.AcceptCdr(cdr) == true { + sq.Conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}} + if sq.Conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 0, time.UTC)}} - if sq.conf.AcceptCdr(cdr) == true { + sq.Conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 0, time.UTC)}} + if sq.Conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC)}} - if sq.conf.AcceptCdr(cdr) != true { + sq.Conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC)}} + if sq.Conf.AcceptCdr(cdr) != true { t.Errorf("Should have accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}} - if sq.conf.AcceptCdr(cdr) != true { + sq.Conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}} + if sq.Conf.AcceptCdr(cdr) != true { t.Errorf("Should have accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{UsageInterval: []time.Duration{11 * time.Second}} - if sq.conf.AcceptCdr(cdr) == true { + sq.Conf = &CdrStats{UsageInterval: []time.Duration{11 * time.Second}} + if sq.Conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{UsageInterval: []time.Duration{1 * time.Second, 10 * time.Second}} - if sq.conf.AcceptCdr(cdr) == true { + sq.Conf = &CdrStats{UsageInterval: []time.Duration{1 * time.Second, 10 * time.Second}} + if sq.Conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{PddInterval: []time.Duration{8 * time.Second}} - if sq.conf.AcceptCdr(cdr) == true { + sq.Conf = &CdrStats{PddInterval: []time.Duration{8 * time.Second}} + if sq.Conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{PddInterval: []time.Duration{3 * time.Second, 7 * time.Second}} - if sq.conf.AcceptCdr(cdr) == true { + sq.Conf = &CdrStats{PddInterval: []time.Duration{3 * time.Second, 7 * time.Second}} + if sq.Conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.conf = &CdrStats{PddInterval: []time.Duration{3 * time.Second, 8 * time.Second}} - if sq.conf.AcceptCdr(cdr) != true { + sq.Conf = &CdrStats{PddInterval: []time.Duration{3 * time.Second, 8 * time.Second}} + if sq.Conf.AcceptCdr(cdr) != true { t.Errorf("Should have accepted this CDR: %+v", cdr) } } func TestStatsQueueIds(t *testing.T) { - cdrStats := NewStats(ratingStorage) + cdrStats := NewStats(ratingStorage, accountingStorage, 0) ids := []string{} if err := cdrStats.GetQueueIds(0, &ids); err != nil { t.Error("Errorf getting queue ids: ", err) @@ -216,7 +216,7 @@ func TestStatsQueueIds(t *testing.T) { } func TestStatsAppendCdr(t *testing.T) { - cdrStats := NewStats(ratingStorage) + cdrStats := NewStats(ratingStorage, accountingStorage, 0) cdr := &StoredCdr{ Tenant: "cgrates.org", Category: "call", @@ -231,14 +231,14 @@ func TestStatsAppendCdr(t *testing.T) { if err != nil { t.Error("Error appending cdr to stats: ", err) } - if len(cdrStats.queues["CDRST1"].cdrs) != 0 || - len(cdrStats.queues["CDRST2"].cdrs) != 1 { - t.Error("Error appending cdr to queue: ", len(cdrStats.queues["CDRST2"].cdrs)) + if len(cdrStats.queues["CDRST1"].Cdrs) != 0 || + len(cdrStats.queues["CDRST2"].Cdrs) != 1 { + t.Error("Error appending cdr to queue: ", len(cdrStats.queues["CDRST2"].Cdrs)) } } func TestStatsGetValues(t *testing.T) { - cdrStats := NewStats(ratingStorage) + cdrStats := NewStats(ratingStorage, accountingStorage, 0) cdr := &StoredCdr{ Tenant: "cgrates.org", Category: "call", @@ -267,7 +267,7 @@ func TestStatsGetValues(t *testing.T) { } func TestStatsReloadQueues(t *testing.T) { - cdrStats := NewStats(ratingStorage) + cdrStats := NewStats(ratingStorage, accountingStorage, 0) cdr := &StoredCdr{ Tenant: "cgrates.org", Category: "call", @@ -299,7 +299,7 @@ func TestStatsReloadQueues(t *testing.T) { } func TestStatsReloadQueuesWithDefault(t *testing.T) { - cdrStats := NewStats(ratingStorage) + cdrStats := NewStats(ratingStorage, accountingStorage, 0) cdrStats.AddQueue(&CdrStats{ Id: utils.META_DEFAULT, }, nil) @@ -335,7 +335,7 @@ func TestStatsReloadQueuesWithDefault(t *testing.T) { } func TestStatsReloadQueuesWithIds(t *testing.T) { - cdrStats := NewStats(ratingStorage) + cdrStats := NewStats(ratingStorage, accountingStorage, 0) cdr := &StoredCdr{ Tenant: "cgrates.org", Category: "call", @@ -367,7 +367,7 @@ func TestStatsReloadQueuesWithIds(t *testing.T) { } func TestStatsResetQueues(t *testing.T) { - cdrStats := NewStats(ratingStorage) + cdrStats := NewStats(ratingStorage, accountingStorage, 0) cdr := &StoredCdr{ Tenant: "cgrates.org", Category: "call", @@ -399,7 +399,7 @@ func TestStatsResetQueues(t *testing.T) { } func TestStatsResetQueuesWithIds(t *testing.T) { - cdrStats := NewStats(ratingStorage) + cdrStats := NewStats(ratingStorage, accountingStorage, 0) cdr := &StoredCdr{ Tenant: "cgrates.org", Category: "call", diff --git a/engine/storage_interface.go b/engine/storage_interface.go index bf38979bf..894905304 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -77,6 +77,8 @@ type AccountingStorage interface { Storage GetAccount(string) (*Account, error) SetAccount(*Account) error + GetCdrStatsQueue(string) (*StatsQueue, error) + SetCdrStatsQueue(*StatsQueue) error } type CdrStorage interface { diff --git a/engine/storage_map.go b/engine/storage_map.go index e04d8f98e..e350c5ff6 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -262,7 +262,6 @@ func (ms *MapStorage) SetRatingPlan(rp *RatingPlan) (err error) { if historyScribe != nil { go historyScribe.Record(rp.GetHistoryRecord(), &response) } - //cache2go.Cache(RATING_PLAN_PREFIX+rp.Id, rp) return } @@ -293,7 +292,6 @@ func (ms *MapStorage) SetRatingProfile(rpf *RatingProfile) (err error) { if historyScribe != nil { go historyScribe.Record(rpf.GetHistoryRecord(), &response) } - //cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf) return } @@ -318,7 +316,6 @@ func (ms *MapStorage) GetLCR(key string, skipCache bool) (lcr *LCR, err error) { func (ms *MapStorage) SetLCR(lcr *LCR) (err error) { result, err := ms.ms.Marshal(lcr) ms.dict[utils.LCR_PREFIX+lcr.GetId()] = result - //cache2go.Cache(LCR_PREFIX+key, lcr) return } @@ -342,7 +339,6 @@ func (ms *MapStorage) GetRpAlias(key string, skipCache bool) (alias string, err func (ms *MapStorage) SetRpAlias(key, alias string) (err error) { ms.dict[utils.RP_ALIAS_PREFIX+key] = []byte(alias) - //cache2go.Cache(ALIAS_PREFIX+key, alias) return } @@ -409,7 +405,6 @@ func (ms *MapStorage) GetAccAlias(key string, skipCache bool) (alias string, err func (ms *MapStorage) SetAccAlias(key, alias string) (err error) { ms.dict[utils.ACC_ALIAS_PREFIX+key] = []byte(alias) - //cache2go.Cache(ALIAS_PREFIX+key, alias) return } @@ -471,7 +466,6 @@ func (ms *MapStorage) SetDestination(dest *Destination) (err error) { if historyScribe != nil { go historyScribe.Record(dest.GetHistoryRecord(), &response) } - //cache2go.Cache(DESTINATION_PREFIX+dest.Id, dest) return } @@ -496,7 +490,6 @@ func (ms *MapStorage) GetActions(key string, skipCache bool) (as Actions, err er func (ms *MapStorage) SetActions(key string, as Actions) (err error) { result, err := ms.ms.Marshal(&as) ms.dict[utils.ACTION_PREFIX+key] = result - //cache2go.Cache(ACTION_PREFIX+key, as) return } @@ -521,7 +514,6 @@ func (ms *MapStorage) GetSharedGroup(key string, skipCache bool) (sg *SharedGrou func (ms *MapStorage) SetSharedGroup(sg *SharedGroup) (err error) { result, err := ms.ms.Marshal(sg) ms.dict[utils.SHARED_GROUP_PREFIX+sg.Id] = result - //cache2go.Cache(SHARED_GROUP_PREFIX+key, sg) return } @@ -553,6 +545,22 @@ func (ms *MapStorage) SetAccount(ub *Account) (err error) { return } +func (ms *MapStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error) { + if values, ok := ms.dict[utils.CDR_STATS_QUEUE_PREFIX+key]; ok { + sq = &StatsQueue{} + err = ms.ms.Unmarshal(values, sq) + } else { + return nil, utils.ErrNotFound + } + return +} + +func (ms *MapStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) { + result, err := ms.ms.Marshal(sq) + ms.dict[utils.CDR_STATS_QUEUE_PREFIX+sq.GetId()] = result + return +} + func (ms *MapStorage) GetActionPlans(key string) (ats ActionPlans, err error) { if values, ok := ms.dict[utils.ACTION_TIMING_PREFIX+key]; ok { err = ms.ms.Unmarshal(values, &ats) diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 879500d2a..7ab9d9201 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -355,7 +355,6 @@ func (rs *RedisStorage) SetRatingPlan(rp *RatingPlan) (err error) { response := 0 go historyScribe.Record(rp.GetHistoryRecord(), &response) } - //cache2go.Cache(utils.RATING_PLAN_PREFIX+rp.Id, rp) return } @@ -385,7 +384,6 @@ func (rs *RedisStorage) SetRatingProfile(rpf *RatingProfile) (err error) { response := 0 go historyScribe.Record(rpf.GetHistoryRecord(), &response) } - //cache2go.Cache(utils.RATING_PROFILE_PREFIX+rpf.Id, rpf) return } @@ -506,7 +504,6 @@ func (rs *RedisStorage) GetAccAlias(key string, skipCache bool) (alias string, e // Adds one alias for one account func (rs *RedisStorage) SetAccAlias(key, alias string) (err error) { err = rs.db.Set(utils.ACC_ALIAS_PREFIX+key, []byte(alias)) - //cache2go.Cache(ALIAS_PREFIX+key, alias) return } @@ -600,7 +597,6 @@ func (rs *RedisStorage) SetDestination(dest *Destination) (err error) { response := 0 go historyScribe.Record(dest.GetHistoryRecord(), &response) } - //cache2go.Cache(utils.DESTINATION_PREFIX+dest.Id, dest) return } @@ -624,7 +620,6 @@ func (rs *RedisStorage) GetActions(key string, skipCache bool) (as Actions, err func (rs *RedisStorage) SetActions(key string, as Actions) (err error) { result, err := rs.ms.Marshal(&as) err = rs.db.Set(utils.ACTION_PREFIX+key, result) - // cache2go.Cache(utils.ACTION_PREFIX+key, as) return } @@ -648,7 +643,6 @@ func (rs *RedisStorage) GetSharedGroup(key string, skipCache bool) (sg *SharedGr func (rs *RedisStorage) SetSharedGroup(sg *SharedGroup) (err error) { result, err := rs.ms.Marshal(sg) err = rs.db.Set(utils.SHARED_GROUP_PREFIX+sg.Id, result) - //cache2go.Cache(utils.SHARED_GROUP_PREFIX+sg.Id, sg) return } @@ -680,6 +674,22 @@ func (rs *RedisStorage) SetAccount(ub *Account) (err error) { return } +func (rs *RedisStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error) { + var values []byte + if values, err = rs.db.Get(utils.CDR_STATS_QUEUE_PREFIX + key); err == nil { + sq = &StatsQueue{} + err = rs.ms.Unmarshal(values, sq) + } + + return +} + +func (rs *RedisStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) { + result, err := rs.ms.Marshal(sq) + err = rs.db.Set(utils.CDR_STATS_QUEUE_PREFIX+sq.GetId(), result) + return +} + func (rs *RedisStorage) GetActionPlans(key string) (ats ActionPlans, err error) { var values []byte if values, err = rs.db.Get(utils.ACTION_TIMING_PREFIX + key); err == nil { diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 2e786d0a1..383ce5682 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -324,6 +324,7 @@ type TPCdrStats struct { type TPCdrStat struct { QueueLength string TimeWindow string + SaveInterval string Metrics string SetupInterval string TORs string diff --git a/utils/consts.go b/utils/consts.go index 4de2483a8..5a11dc0c0 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -164,6 +164,8 @@ const ( DESTINATION_PREFIX = "dst_" LCR_PREFIX = "lcr_" DERIVEDCHARGERS_PREFIX = "dcs_" + CDR_STATS_QUEUE_PREFIX = "csq_" + CDR_STATS_PREFIX = "cst_" TEMP_DESTINATION_PREFIX = "tmp_" LOG_CALL_COST_PREFIX = "cco_" LOG_ACTION_TIMMING_PREFIX = "ltm_" @@ -180,7 +182,6 @@ const ( CREATE_CDRS_TABLES_SQL = "create_cdrs_tables.sql" CREATE_TARIFFPLAN_TABLES_SQL = "create_tariffplan_tables.sql" TEST_SQL = "TEST_SQL" - CDR_STATS_PREFIX = "cst_" DESTINATIONS_LOAD_THRESHOLD = 0.1 CONSTANT = "constant" FILLER = "filler" From ef5413118fcc1a5f597f6ecbc31f5d353551630e Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 19 Jun 2015 22:55:27 +0300 Subject: [PATCH 2/3] first draft of cdr stats queues save/load --- engine/stats.go | 38 ++++++++------ engine/stats_queue.go | 39 ++++++++------ engine/stats_test.go | 116 ++++++++++++++++++++++++------------------ 3 files changed, 114 insertions(+), 79 deletions(-) diff --git a/engine/stats.go b/engine/stats.go index 3192efe04..1c83b775d 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -60,7 +60,7 @@ func newQueueSaver(id string, saveInterval time.Duration, sq *StatsQueue, adb Ac select { case <-c: if sq.IsDirty() { - if err := accountDb.SetCdrStatsQueue(id, sq); err != nil { + if err := accountDb.SetCdrStatsQueue(sq); err != nil { Logger.Err(fmt.Sprintf("Error saving cdr stats queue id %s: %v", id, err)) } } @@ -147,8 +147,8 @@ func (s *Stats) ResetQueues(ids []string, out *int) error { if len(ids) == 0 { for _, sq := range s.queues { sq.Cdrs = make([]*QCdr, 0) - sq.Metrics = make(map[string]Metric, len(sq.Conf.Metrics)) - for _, m := range sq.Conf.Metrics { + sq.Metrics = make(map[string]Metric, len(sq.conf.Metrics)) + for _, m := range sq.conf.Metrics { if metric := CreateMetric(m); metric != nil { sq.Metrics[m] = metric } @@ -162,8 +162,8 @@ func (s *Stats) ResetQueues(ids []string, out *int) error { continue } sq.Cdrs = make([]*QCdr, 0) - sq.Metrics = make(map[string]Metric, len(sq.Conf.Metrics)) - for _, m := range sq.Conf.Metrics { + sq.Metrics = make(map[string]Metric, len(sq.conf.Metrics)) + for _, m := range sq.conf.Metrics { if metric := CreateMetric(m); metric != nil { sq.Metrics[m] = metric } @@ -181,9 +181,8 @@ func (s *Stats) UpdateQueues(css []*CdrStats, out *int) error { defer s.mux.Unlock() oldQueues := s.queues s.queues = make(map[string]*StatsQueue, len(css)) - s.queueSavers = make(map[string]*queueSaver) if def, exists := oldQueues[utils.META_DEFAULT]; exists { - def.UpdateConf(def.Conf) // for reset + def.UpdateConf(def.conf) // for reset s.queues[utils.META_DEFAULT] = def } for _, cs := range css { @@ -196,14 +195,23 @@ func (s *Stats) UpdateQueues(css []*CdrStats, out *int) error { } if sq == nil { sq = NewStatsQueue(cs) - } - - si := cs.SaveInterval - if si == 0 { - si = s.defaultSaveInterval - } - if si > 0 { - s.queueSavers[cs.Id] = newQueueSaver(cs.Id, si, sq, s.accountingDb) + // load queue from storage if exists + if saved, err := s.accountingDb.GetCdrStatsQueue(sq.GetId()); err == nil { + sq.Load(saved) + } else { + Logger.Info(err.Error()) + } + // setup queue saver + if s.queueSavers == nil { + s.queueSavers = make(map[string]*queueSaver) + } + si := cs.SaveInterval + if si == 0 { + si = s.defaultSaveInterval + } + if si > 0 { + s.queueSavers[cs.Id] = newQueueSaver(cs.Id, si, sq, s.accountingDb) + } } s.queues[cs.Id] = sq } diff --git a/engine/stats_queue.go b/engine/stats_queue.go index 5c76111a3..4176bd49c 100644 --- a/engine/stats_queue.go +++ b/engine/stats_queue.go @@ -19,6 +19,7 @@ along with this program. If not, see package engine import ( + "log" "strings" "sync" "time" @@ -26,7 +27,7 @@ import ( type StatsQueue struct { Cdrs []*QCdr - Conf *CdrStats + conf *CdrStats Metrics map[string]Metric mux sync.Mutex dirty bool @@ -68,7 +69,7 @@ func NewStatsQueue(conf *CdrStats) *StatsQueue { func (sq *StatsQueue) UpdateConf(conf *CdrStats) { sq.mux.Lock() defer sq.mux.Unlock() - sq.Conf = conf + sq.conf = conf sq.Cdrs = make([]*QCdr, 0) sq.Metrics = make(map[string]Metric, len(conf.Metrics)) sq.dirty = true @@ -79,10 +80,20 @@ func (sq *StatsQueue) UpdateConf(conf *CdrStats) { } } +func (sq *StatsQueue) Load(saved *StatsQueue) { + sq.Cdrs = saved.Cdrs + for key, metric := range saved.Metrics { + if _, exists := sq.Metrics[key]; exists { + sq.Metrics[key] = metric + } + } +} + func (sq *StatsQueue) IsDirty() bool { sq.mux.Lock() defer sq.mux.Unlock() v := sq.dirty + log.Print(v) // take advantage of the locking to set it to flip it sq.dirty = false return v @@ -91,7 +102,7 @@ func (sq *StatsQueue) IsDirty() bool { func (sq *StatsQueue) AppendCDR(cdr *StoredCdr) { sq.mux.Lock() defer sq.mux.Unlock() - if sq.Conf.AcceptCdr(cdr) { + if sq.conf.AcceptCdr(cdr) { qcdr := sq.simplifyCdr(cdr) sq.Cdrs = append(sq.Cdrs, qcdr) sq.addToMetrics(qcdr) @@ -99,8 +110,8 @@ func (sq *StatsQueue) AppendCDR(cdr *StoredCdr) { sq.dirty = true // check for trigger stats := sq.getStats() - sq.Conf.Triggers.Sort() - for _, at := range sq.Conf.Triggers { + sq.conf.Triggers.Sort() + for _, at := range sq.conf.Triggers { if at.MinQueuedItems > 0 && len(sq.Cdrs) < at.MinQueuedItems { continue } @@ -123,14 +134,12 @@ func (sq *StatsQueue) AppendCDR(cdr *StoredCdr) { } func (sq *StatsQueue) addToMetrics(cdr *QCdr) { - sq.dirty = true for _, metric := range sq.Metrics { metric.AddCdr(cdr) } } func (sq *StatsQueue) removeFromMetrics(cdr *QCdr) { - sq.dirty = true for _, metric := range sq.Metrics { metric.RemoveCdr(cdr) } @@ -147,18 +156,18 @@ func (sq *StatsQueue) simplifyCdr(cdr *StoredCdr) *QCdr { } func (sq *StatsQueue) purgeObsoleteCdrs() { - if sq.Conf.QueueLength > 0 { + if sq.conf.QueueLength > 0 { currentLength := len(sq.Cdrs) - if currentLength > sq.Conf.QueueLength { - for _, cdr := range sq.Cdrs[:currentLength-sq.Conf.QueueLength] { + if currentLength > sq.conf.QueueLength { + for _, cdr := range sq.Cdrs[:currentLength-sq.conf.QueueLength] { sq.removeFromMetrics(cdr) } - sq.Cdrs = sq.Cdrs[currentLength-sq.Conf.QueueLength:] + sq.Cdrs = sq.Cdrs[currentLength-sq.conf.QueueLength:] } } - if sq.Conf.TimeWindow > 0 { + if sq.conf.TimeWindow > 0 { for i, cdr := range sq.Cdrs { - if time.Now().Sub(cdr.SetupTime) > sq.Conf.TimeWindow { + if time.Now().Sub(cdr.SetupTime) > sq.conf.TimeWindow { sq.removeFromMetrics(cdr) continue } else { @@ -187,12 +196,12 @@ func (sq *StatsQueue) getStats() map[string]float64 { } func (sq *StatsQueue) GetId() string { - return sq.Conf.Id + return sq.conf.Id } // Convert data into a struct which can be used in actions based on triggers hit func (sq *StatsQueue) Triggered(at *ActionTrigger) *StatsQueueTriggered { - return &StatsQueueTriggered{Id: sq.Conf.Id, Metrics: sq.getStats(), Trigger: at} + return &StatsQueueTriggered{Id: sq.conf.Id, Metrics: sq.getStats(), Trigger: at} } // Struct to be passed to triggered actions diff --git a/engine/stats_test.go b/engine/stats_test.go index af054c129..bdaace3ad 100644 --- a/engine/stats_test.go +++ b/engine/stats_test.go @@ -104,100 +104,100 @@ func TestAcceptCdr(t *testing.T) { MediationRunId: "mri", Cost: 10, } - sq.Conf = &CdrStats{} - if sq.Conf.AcceptCdr(cdr) != true { + sq.conf = &CdrStats{} + if sq.conf.AcceptCdr(cdr) != true { t.Errorf("Should have accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{TOR: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{TOR: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{CdrHost: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{CdrHost: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{CdrSource: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{CdrSource: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{Direction: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{Direction: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{Tenant: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{Tenant: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{Category: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{Category: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{Account: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{Account: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{Subject: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{Subject: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{Supplier: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{Supplier: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{DisconnectCause: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{DisconnectCause: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{RatedAccount: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{RatedAccount: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{RatedSubject: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{RatedSubject: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{DestinationPrefix: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{DestinationPrefix: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{DestinationPrefix: []string{"test", "123"}} - if sq.Conf.AcceptCdr(cdr) != true { + sq.conf = &CdrStats{DestinationPrefix: []string{"test", "123"}} + if sq.conf.AcceptCdr(cdr) != true { t.Errorf("Should have accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 0, time.UTC)}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 0, time.UTC)}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC)}} - if sq.Conf.AcceptCdr(cdr) != true { + sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC)}} + if sq.conf.AcceptCdr(cdr) != true { t.Errorf("Should have accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}} - if sq.Conf.AcceptCdr(cdr) != true { + sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}} + if sq.conf.AcceptCdr(cdr) != true { t.Errorf("Should have accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{UsageInterval: []time.Duration{11 * time.Second}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{UsageInterval: []time.Duration{11 * time.Second}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{UsageInterval: []time.Duration{1 * time.Second, 10 * time.Second}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{UsageInterval: []time.Duration{1 * time.Second, 10 * time.Second}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{PddInterval: []time.Duration{8 * time.Second}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{PddInterval: []time.Duration{8 * time.Second}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{PddInterval: []time.Duration{3 * time.Second, 7 * time.Second}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{PddInterval: []time.Duration{3 * time.Second, 7 * time.Second}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{PddInterval: []time.Duration{3 * time.Second, 8 * time.Second}} - if sq.Conf.AcceptCdr(cdr) != true { + sq.conf = &CdrStats{PddInterval: []time.Duration{3 * time.Second, 8 * time.Second}} + if sq.conf.AcceptCdr(cdr) != true { t.Errorf("Should have accepted this CDR: %+v", cdr) } } @@ -287,7 +287,7 @@ func TestStatsReloadQueues(t *testing.T) { result := len(ids) expected := 2 if result != expected { - t.Errorf("Error loading stats queues. Expected %v was %v", expected, result) + t.Errorf("Error loading stats queues. Expected %v was %v: %v", expected, result, ids) } valMap := make(map[string]float64) if err := cdrStats.GetValues("CDRST2", &valMap); err != nil { @@ -366,6 +366,24 @@ func TestStatsReloadQueuesWithIds(t *testing.T) { } } +func TestStatsSaveQueues(t *testing.T) { + cdrStats := NewStats(ratingStorage, accountingStorage, 0) + cdr := &StoredCdr{ + Tenant: "cgrates.org", + Category: "call", + AnswerTime: time.Now(), + SetupTime: time.Now(), + Usage: 10 * time.Second, + Cost: 10, + } + cdrStats.AppendCDR(cdr, nil) + ids := []string{} + cdrStats.GetQueueIds(0, &ids) + if _, found := cdrStats.queueSavers["CDRST1"]; !found { + t.Error("Error creating queue savers: ", cdrStats.queueSavers) + } +} + func TestStatsResetQueues(t *testing.T) { cdrStats := NewStats(ratingStorage, accountingStorage, 0) cdr := &StoredCdr{ From 10c13d06940f6d6ca15a5814124274980fedca34 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Mon, 22 Jun 2015 23:21:36 +0300 Subject: [PATCH 3/3] update data files and test fixes --- config/config.go | 6 ++- data/conf/cgrates/cgrates.json | 3 +- data/conf/samples/cdrstats/cdrstats.json | 1 + data/conf/samples/fscsv/cgrates.json | 1 + data/conf/samples/osips_cdrs_cdrstats.cfg | 4 +- data/conf/samples/tutlocal/cgrates.json | 1 + data/tariffplans/cdrstats/CdrStats.csv | 12 ++--- .../tariffplans/prepaid1centpsec/CdrStats.csv | 12 ++--- data/tariffplans/tutorial/CdrStats.csv | 48 +++++++++---------- 9 files changed, 47 insertions(+), 41 deletions(-) diff --git a/config/config.go b/config/config.go index 7ee0ea1fe..989b0f655 100644 --- a/config/config.go +++ b/config/config.go @@ -591,8 +591,10 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { if jsnCdrstatsCfg != nil { if jsnCdrstatsCfg.Enabled != nil { self.CDRStatsEnabled = *jsnCdrstatsCfg.Enabled - if self.CDRStatsSaveInterval, err = utils.ParseDurationWithSecs(*jsnCdrstatsCfg.Save_Interval); err != nil { - return err + if jsnCdrstatsCfg.Save_Interval != nil { + if self.CDRStatsSaveInterval, err = utils.ParseDurationWithSecs(*jsnCdrstatsCfg.Save_Interval); err != nil { + return err + } } } } diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index d8c6fe481..9fab1711f 100644 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -87,7 +87,8 @@ //"cdrstats": { -// "enabled": false, // starts the cdrstats service: + // "enabled": false, // starts the cdrstats service: + // "save_interval": "5s", //}, diff --git a/data/conf/samples/cdrstats/cdrstats.json b/data/conf/samples/cdrstats/cdrstats.json index ae313dc69..63b9b7b96 100644 --- a/data/conf/samples/cdrstats/cdrstats.json +++ b/data/conf/samples/cdrstats/cdrstats.json @@ -19,6 +19,7 @@ "enabled": true, // starts the cdrstats service: "queue_length": 5, // number of items in the stats buffer "time_window": "0", // will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow + "save_interval": "5s", }, } diff --git a/data/conf/samples/fscsv/cgrates.json b/data/conf/samples/fscsv/cgrates.json index dd3c905ba..c7c8abba9 100644 --- a/data/conf/samples/fscsv/cgrates.json +++ b/data/conf/samples/fscsv/cgrates.json @@ -23,6 +23,7 @@ "cdrstats": { "enabled": true, // starts the cdrstats service: + "save_interval": "5s" }, } diff --git a/data/conf/samples/osips_cdrs_cdrstats.cfg b/data/conf/samples/osips_cdrs_cdrstats.cfg index 3e50ba9f4..fbdeb4dba 100644 --- a/data/conf/samples/osips_cdrs_cdrstats.cfg +++ b/data/conf/samples/osips_cdrs_cdrstats.cfg @@ -27,6 +27,7 @@ enabled = true # Starts Mediator service: . enabled = true # Starts the cdrstats service: #queue_length = 50 # Number of items in the stats buffer time_window = 1h # Will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow +save_interval = 5s # metrics = ASR, ACD, ACC # Stat metric ids to build # setup_interval = # Filter on CDR SetupTime # tors = # Filter on CDR TOR fields @@ -39,7 +40,7 @@ time_window = 1h # Will only keep the CDRs who's call setup time is not olde # accounts = # Filter on CDR Account fields # subjects = # Filter on CDR Subject fields # destination_prefixes = # Filter on CDR Destination prefixes -# usage_interval = # Filter on CDR Usage +# usage_interval = # Filter on CDR Usage # mediation_run_ids = # Filter on CDR MediationRunId fields # rated_accounts = # Filter on CDR RatedAccount fields # rated_subjects = # Filter on CDR RatedSubject fields @@ -58,4 +59,3 @@ mi_addr = 172.16.254.77:8020 # Adress where to reach OpenSIPS mi_datagram modu # auth_user = cgrates # Authenticate to email server using this user # auth_passwd = CGRateS.org # Authenticate to email server with this password # from_address = cgr-mailer@localhost.localdomain # From address used when sending emails out - diff --git a/data/conf/samples/tutlocal/cgrates.json b/data/conf/samples/tutlocal/cgrates.json index eb6242fee..21b99b52a 100644 --- a/data/conf/samples/tutlocal/cgrates.json +++ b/data/conf/samples/tutlocal/cgrates.json @@ -17,6 +17,7 @@ "scheduler": { "enabled": true, // start Scheduler service: + "save_interval": "5s", }, "cdrs": { diff --git a/data/tariffplans/cdrstats/CdrStats.csv b/data/tariffplans/cdrstats/CdrStats.csv index 0b14bbeb4..f8c11a4c5 100644 --- a/data/tariffplans/cdrstats/CdrStats.csv +++ b/data/tariffplans/cdrstats/CdrStats.csv @@ -1,6 +1,6 @@ -#Id[0],QueueLength[1],TimeWindow[2],Metric[3],SetupInterval[4],TOR[5],CdrHost[6],CdrSource[7],ReqType[8],Direction[9],Tenant[10],Category[11],Account[12],Subject[13],DestinationPrefix[14],PddInterval[15],UsageInterval[16],Supplier[17],DisconnectCause[18],MediationRunIds[19],RatedAccount[20],RatedSubject[21],CostInterval[22],Triggers[23] -CDRST3,5,60m,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,rated,*out,cgrates.org,call,dan,dan,+49,,5m;10m,,,default,rif,rif,0;2,CDRST3_WARN_ASR -CDRST3,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST3_WARN_ACD -CDRST3,,,ACC,,,,,,,,,,,,,,,,,,,,CDRST3_WARN_ACC -CDRST4,10,0,ASR,,,,,,,cgrates.org,call,,,,,,,,,,,,CDRST4_WARN_ASR -CDRST4,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST4_WARN_ACD \ No newline at end of file +#Id[0],QueueLength[1],TimeWindow[2],SaveInerval[3],Metric[4],SetupInterval[5],TOR[6],CdrHost[7],CdrSource[8],ReqType[9],Direction[10],Tenant[11],Category[12],Account[13],Subject[14],DestinationPrefix[15],PddInterval[16],UsageInterval[17],Supplier[18],DisconnectCause[19],MediationRunIds[20],RatedAccount[21],RatedSubject[22],CostInterval[23],Triggers[24] +CDRST3,5,60m,10s,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,rated,*out,cgrates.org,call,dan,dan,+49,,5m;10m,,,default,rif,rif,0;2,CDRST3_WARN_ASR +CDRST3,,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST3_WARN_ACD +CDRST3,,,,ACC,,,,,,,,,,,,,,,,,,,,CDRST3_WARN_ACC +CDRST4,10,0,10s,ASR,,,,,,,cgrates.org,call,,,,,,,,,,,,CDRST4_WARN_ASR +CDRST4,,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST4_WARN_ACD diff --git a/data/tariffplans/prepaid1centpsec/CdrStats.csv b/data/tariffplans/prepaid1centpsec/CdrStats.csv index 8651538c5..6d365268b 100644 --- a/data/tariffplans/prepaid1centpsec/CdrStats.csv +++ b/data/tariffplans/prepaid1centpsec/CdrStats.csv @@ -1,6 +1,6 @@ -#Id[0],QueueLength[1],TimeWindow[2],Metric[3],SetupInterval[4],TOR[5],CdrHost[6],CdrSource[7],ReqType[8],Direction[9],Tenant[10],Category[11],Account[12],Subject[13],DestinationPrefix[14],PddInterval[15],UsageInterval[16],Supplier[17],DisconnectCause[18],MediationRunIds[19],RatedAccount[20],RatedSubject[21],CostInterval[22],Triggers[23] -CDRST1,5,60m,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,rated,*out,cgrates.org,call,dan,dan,+49,,5m;10m,,,default,rif,rif,0;2,CDRST1_WARN_ASR -CDRST1,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST1_WARN_ACD -CDRST1,,,ACC,,,,,,,,,,,,,,,,,,,,CDRST1_WARN_ACC -CDRST2,10,10m,ASR,,,,,,,cgrates.org,call,,,,,,,,,,,,CDRST2_WARN_ASR -CDRST2,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST2_WARN_ACD \ No newline at end of file +#Id[0],QueueLength[1],TimeWindow[2],SaveInerval[3],Metric[4],SetupInterval[5],TOR[6],CdrHost[7],CdrSource[8],ReqType[9],Direction[10],Tenant[11],Category[12],Account[13],Subject[14],DestinationPrefix[15],PddInterval[16],UsageInterval[17],Supplier[18],DisconnectCause[19],MediationRunIds[20],RatedAccount[21],RatedSubject[22],CostInterval[23],Triggers[24] +CDRST1,5,60m,10s,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,rated,*out,cgrates.org,call,dan,dan,+49,,5m;10m,,,default,rif,rif,0;2,CDRST1_WARN_ASR +CDRST1,,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST1_WARN_ACD +CDRST1,,,,ACC,,,,,,,,,,,,,,,,,,,,CDRST1_WARN_ACC +CDRST2,10,10m,10s,ASR,,,,,,,cgrates.org,call,,,,,,,,,,,,CDRST2_WARN_ASR +CDRST2,,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST2_WARN_ACD diff --git a/data/tariffplans/tutorial/CdrStats.csv b/data/tariffplans/tutorial/CdrStats.csv index 11704622a..340bca3ae 100644 --- a/data/tariffplans/tutorial/CdrStats.csv +++ b/data/tariffplans/tutorial/CdrStats.csv @@ -1,24 +1,24 @@ -#Id[0],QueueLength[1],TimeWindow[2],Metric[3],SetupInterval[4],TOR[5],CdrHost[6],CdrSource[7],ReqType[8],Direction[9],Tenant[10],Category[11],Account[12],Subject[13],DestinationPrefix[14],PddInterval[15],UsageInterval[16],Supplier[17],DisconnectCause[18],MediationRunIds[19],RatedAccount[20],RatedSubject[21],CostInterval[22],Triggers[23] -CDRST1,10,0,ASR,,,,,,,cgrates.org,,,,,,,,,*default,,,,CDRST1_WARN -CDRST1,,,ACD,,,,,,,,,,,,,,,,,,,, -CDRST1,,,ACC,,,,,,,,,,,,,,,,,,,, -CDRST1,,,TCD,,,,,,,,,,,,,,,,,,,, -CDRST1,,,TCC,,,,,,,,,,,,,,,,,,,, -CDRST_1001,10,10m,ASR,,,,,,,cgrates.org,,,1001,,,,,,*default,,,,CDRST1001_WARN -CDRST_1001,,,ACD,,,,,,,,,,,,,,,,,,,, -CDRST_1001,,,ACC,,,,,,,,,,,,,,,,,,,, -CDRST_1002,10,10m,ASR,,,,,,,cgrates.org,,,1002,,,,,,*default,,,,CDRST1001_WARN -CDRST_1002,,,ACD,,,,,,,,,,,,,,,,,,,, -CDRST_1002,,,ACC,,,,,,,,,,,,,,,,,,,, -CDRST_1003,,,ASR,,,,,,,cgrates.org,,,,1003,,,,,*default,,,,CDRST3_WARN -CDRST_1003,,,ACD,,,,,,,,,,,,,,,,,,,, -STATS_SUPPL1,,,ACD,,,,,,,,,,,,,,suppl1,,,,,, -STATS_SUPPL1,,,ASR,,,,,,,,,,,,,,suppl1,,,,,, -STATS_SUPPL1,,,ACC,,,,,,,,,,,,,,suppl1,,,,,, -STATS_SUPPL1,,,TCD,,,,,,,,,,,,,,suppl1,,,,,, -STATS_SUPPL1,,,TCC,,,,,,,,,,,,,,suppl1,,,,,, -STATS_SUPPL2,,,ACD,,,,,,,,,,,,,,suppl2,,,,,, -STATS_SUPPL2,,,ASR,,,,,,,,,,,,,,suppl2,,,,,, -STATS_SUPPL2,,,ACC,,,,,,,,,,,,,,suppl2,,,,,, -STATS_SUPPL2,,,TCD,,,,,,,,,,,,,,suppl2,,,,,, -STATS_SUPPL2,,,TCC,,,,,,,,,,,,,,suppl2,,,,,, +#Id[0],QueueLength[1],TimeWindow[2],SaveInerval[3],Metric[4],SetupInterval[5],TOR[6],CdrHost[7],CdrSource[8],ReqType[9],Direction[10],Tenant[11],Category[12],Account[13],Subject[14],DestinationPrefix[15],PddInterval[16],UsageInterval[17],Supplier[18],DisconnectCause[19],MediationRunIds[20],RatedAccount[21],RatedSubject[22],CostInterval[23],Triggers[24] +CDRST1,10,0,10s,ASR,,,,,,,cgrates.org,,,,,,,,,*default,,,,CDRST1_WARN +CDRST1,,,,ACD,,,,,,,,,,,,,,,,,,,, +CDRST1,,,,ACC,,,,,,,,,,,,,,,,,,,, +CDRST1,,,,TCD,,,,,,,,,,,,,,,,,,,, +CDRST1,,,,TCC,,,,,,,,,,,,,,,,,,,, +CDRST_1001,10,10m,10s,ASR,,,,,,,cgrates.org,,,1001,,,,,,*default,,,,CDRST1001_WARN +CDRST_1001,,,,ACD,,,,,,,,,,,,,,,,,,,, +CDRST_1001,,,,ACC,,,,,,,,,,,,,,,,,,,, +CDRST_1002,10,10m,10s,ASR,,,,,,,cgrates.org,,,1002,,,,,,*default,,,,CDRST1001_WARN +CDRST_1002,,,,ACD,,,,,,,,,,,,,,,,,,,, +CDRST_1002,,,,ACC,,,,,,,,,,,,,,,,,,,, +CDRST_1003,,,,ASR,,,,,,,cgrates.org,,,,1003,,,,,*default,,,,CDRST3_WARN +CDRST_1003,,,,ACD,,,,,,,,,,,,,,,,,,,, +STATS_SUPPL1,,,,ACD,,,,,,,,,,,,,,suppl1,,,,,, +STATS_SUPPL1,,,,ASR,,,,,,,,,,,,,,suppl1,,,,,, +STATS_SUPPL1,,,,ACC,,,,,,,,,,,,,,suppl1,,,,,, +STATS_SUPPL1,,,,TCD,,,,,,,,,,,,,,suppl1,,,,,, +STATS_SUPPL1,,,,TCC,,,,,,,,,,,,,,suppl1,,,,,, +STATS_SUPPL2,,,,ACD,,,,,,,,,,,,,,suppl2,,,,,, +STATS_SUPPL2,,,,ASR,,,,,,,,,,,,,,suppl2,,,,,, +STATS_SUPPL2,,,,ACC,,,,,,,,,,,,,,suppl2,,,,,, +STATS_SUPPL2,,,,TCD,,,,,,,,,,,,,,suppl2,,,,,, +STATS_SUPPL2,,,,TCC,,,,,,,,,,,,,,suppl2,,,,,,