started stats saver

This commit is contained in:
Radu Ioan Fericean
2015-06-18 23:26:19 +03:00
parent 1ae2020c0f
commit 3d0553ac37
23 changed files with 269 additions and 162 deletions

View File

@@ -492,7 +492,7 @@ func main() {
}
if cfg.CDRStatsEnabled { // Init it here so we make it availabe to the Apier
cdrStats = engine.NewStats(ratingDb)
cdrStats = engine.NewStats(ratingDb, accountDb, cfg.CDRStatsSaveInterval)
if cfg.CDRStatConfig != nil && len(cfg.CDRStatConfig.Metrics) != 0 {
cdrStats.AddQueue(engine.NewCdrStatsFromCdrStatsCfg(cfg.CDRStatConfig), nil)
}

View File

@@ -26,8 +26,9 @@ type CdrStatsConfig struct {
Id string // Config id, unique per config instance
QueueLength int // Number of items in the stats buffer
TimeWindow time.Duration // Will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow
Metrics []string // ASR, ACD, ACC
SetupInterval []time.Time // 2 or less items (>= start interval,< stop_interval)
SaveInterval time.Duration
Metrics []string // ASR, ACD, ACC
SetupInterval []time.Time // 2 or less items (>= start interval,< stop_interval)
TORs []string
CdrHosts []string
CdrSources []string

View File

@@ -202,6 +202,7 @@ type CGRConfig struct {
CDRSReconnects int // number of reconnects to remote services before giving up
CDRSCdrReplication []*CdrReplicationCfg // Replicate raw CDRs to a number of servers
CDRStatsEnabled bool // Enable CDR Stats service
CDRStatsSaveInterval time.Duration // Save interval duration
CDRStatConfig *CdrStatsConfig // Active cdr stats configuration instances, platform level
CdreProfiles map[string]*CdreConfig
CdrcProfiles map[string]map[string]*CdrcConfig // Number of CDRC instances running imports, format map[dirPath]map[instanceName]{Configs}
@@ -590,6 +591,9 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
if jsnCdrstatsCfg != nil {
if jsnCdrstatsCfg.Enabled != nil {
self.CDRStatsEnabled = *jsnCdrstatsCfg.Enabled
if self.CDRStatsSaveInterval, err = utils.ParseDurationWithSecs(*jsnCdrstatsCfg.Save_Interval); err != nil {
return err
}
}
}

View File

@@ -108,6 +108,7 @@ const CGRATES_CFG_JSON = `
"cdrstats": {
"enabled": false, // starts the cdrstats service: <true|false>
"save_interval": "5s",
},

View File

@@ -155,12 +155,13 @@ func TestDfCdrsJsonCfg(t *testing.T) {
func TestDfCdrStatsJsonCfg(t *testing.T) {
eCfg := &CdrStatsJsonCfg{
Enabled: utils.BoolPointer(false),
Enabled: utils.BoolPointer(false),
Save_Interval: utils.StringPointer("5s"),
}
if cfg, err := dfCgrJsonCfg.CdrStatsJsonCfg(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCfg, cfg) {
t.Error("Received: ", cfg)
t.Error("Received: ", *cfg)
}
}

View File

@@ -86,7 +86,8 @@ type CdrReplicationJsonCfg struct {
// Cdrstats config section
type CdrStatsJsonCfg struct {
Enabled *bool
Enabled *bool
Save_Interval *string
}
// One cdr field config, used in cdre and cdrc

View File

@@ -309,6 +309,7 @@ CREATE TABLE tp_cdrstats (
`tag` varchar(64) NOT NULL,
`queue_length` int(11) NOT NULL,
`time_window` varchar(8) NOT NULL,
`save_interval` varchar(8) NOT NULL,
`metrics` varchar(64) NOT NULL,
`setup_interval` varchar(64) NOT NULL,
`tors` varchar(64) NOT NULL,

View File

@@ -304,6 +304,7 @@ CREATE TABLE tp_cdrstats (
tag VARCHAR(64) NOT NULL,
queue_length INTEGER NOT NULL,
time_window VARCHAR(8) NOT NULL,
save_interval VARCHAR(8) NOT NULL,
metrics VARCHAR(64) NOT NULL,
setup_interval VARCHAR(64) NOT NULL,
tors VARCHAR(64) NOT NULL,

View File

@@ -54,9 +54,10 @@ func NewCdrStatsFromCdrStatsCfg(csCfg *config.CdrStatsConfig) *CdrStats {
}
type CdrStats struct {
Id string // Config id, unique per config instance
QueueLength int // Number of items in the stats buffer
TimeWindow time.Duration // Will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow
Id string // Config id, unique per config instance
QueueLength int // Number of items in the stats buffer
TimeWindow time.Duration // Will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow
SaveInterval time.Duration
Metrics []string // ASR, ACD, ACC
SetupInterval []time.Time // CDRFieldFilter on SetupInterval, 2 or less items (>= start interval,< stop_interval)
TOR []string // CDRFieldFilter on TORs

View File

@@ -202,12 +202,12 @@ vdf,post,*out,POST_AT,
*out,cgrates.org,call,dan,*any,extra1,,,,,,rif2,rif2,,,,,,,
`
cdrStats = `
#Id[0],QueueLength[1],TimeWindow[2],Metric[3],SetupInterval[4],TOR[5],CdrHost[6],CdrSource[7],ReqType[8],Direction[9],Tenant[10],Category[11],Account[12],Subject[13],DestinationPrefix[14],PddInterval[15],UsageInterval[16],Supplier[17],DisconnectCause[18],MediationRunIds[19],RatedAccount[20],RatedSubject[21],CostInterval[22],Triggers[23]CDRST1,5,60m,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,*rated,*out,cgrates.org,call,dan,dan,49,5m;10m,suppl1,NORMAL_CLEARING,default,rif,rif,0;2,STANDARD_TRIGGERS
CDRST1,5,60m,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,*rated,*out,cgrates.org,call,dan,dan,49,3m;7m,5m;10m,suppl1,NORMAL_CLEARING,default,rif,rif,0;2,STANDARD_TRIGGERS
CDRST1,,,ACD,,,,,,,,,,,,,,,,,,,,STANDARD_TRIGGER
CDRST1,,,ACC,,,,,,,,,,,,,,,,,,,,
CDRST2,10,10m,ASR,,,,,,,cgrates.org,call,,,,,,,,,,,,
CDRST2,,,ACD,,,,,,,,,,,,,,,,,,,,
#Id[0],QueueLength[1],TimeWindow[2],SaveInterval[3],Metric[4],SetupInterval[5],TOR[6],CdrHost[7],CdrSource[8],ReqType[9],Direction[10],Tenant[11],Category[12],Account[13],Subject[14],DestinationPrefix[15],PddInterval[16],UsageInterval[17],Supplier[18],DisconnectCause[19],MediationRunIds[20],RatedAccount[21],RatedSubject[22],CostInterval[23],Triggers[24]
CDRST1,5,60m,10s,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,*rated,*out,cgrates.org,call,dan,dan,49,3m;7m,5m;10m,suppl1,NORMAL_CLEARING,default,rif,rif,0;2,STANDARD_TRIGGERS
CDRST1,,,,ACD,,,,,,,,,,,,,,,,,,,,STANDARD_TRIGGER
CDRST1,,,,ACC,,,,,,,,,,,,,,,,,,,,
CDRST2,10,10m,,ASR,,,,,,,cgrates.org,call,,,,,,,,,,,,
CDRST2,,,,ACD,,,,,,,,,,,,,,,,,,,,
`
)
@@ -1062,10 +1062,11 @@ func TestLoadCdrStats(t *testing.T) {
t.Error("Failed to load cdr stats: ", csvr.cdrStats)
}
cdrStats1 := &CdrStats{
Id: "CDRST1",
QueueLength: 5,
TimeWindow: 60 * time.Minute,
Metrics: []string{"ASR", "ACD", "ACC"},
Id: "CDRST1",
QueueLength: 5,
TimeWindow: 60 * time.Minute,
SaveInterval: 10 * time.Second,
Metrics: []string{"ASR", "ACD", "ACC"},
SetupInterval: []time.Time{
time.Date(2014, 7, 29, 15, 0, 0, 0, time.UTC),
time.Date(2014, 7, 29, 16, 0, 0, 0, time.UTC),

View File

@@ -325,6 +325,7 @@ func APItoModelCdrStat(stats *utils.TPCdrStats) (result []TpCdrstat) {
Tag: stats.CdrStatsId,
QueueLength: ql,
TimeWindow: st.TimeWindow,
SaveInterval: st.SaveInterval,
Metrics: st.Metrics,
SetupInterval: st.SetupInterval,
Tors: st.TORs,

View File

@@ -506,6 +506,7 @@ func (tps TpCdrStats) GetCdrStats() (map[string][]*utils.TPCdrStat, error) {
QueueLength: strconv.Itoa(tpCs.QueueLength),
TimeWindow: tpCs.TimeWindow,
Metrics: tpCs.Metrics,
SaveInterval: tpCs.SaveInterval,
SetupInterval: tpCs.SetupInterval,
TORs: tpCs.Tors,
CdrHosts: tpCs.CdrHosts,
@@ -546,6 +547,13 @@ func UpdateCdrStats(cs *CdrStats, triggers ActionTriggerPriotityList, tpCs *util
log.Printf("Error parsing TimeWindow %v for cdrs stats %v", tpCs.TimeWindow, cs.Id)
}
}
if tpCs.SaveInterval != "" {
if si, err := time.ParseDuration(tpCs.SaveInterval); err == nil {
cs.SaveInterval = si
} else {
log.Printf("Error parsing SaveInterval %v for cdr stats %v", tpCs.SaveInterval, cs.Id)
}
}
if tpCs.Metrics != "" {
cs.Metrics = append(cs.Metrics, tpCs.Metrics)
}

View File

@@ -16,7 +16,7 @@ func TestModelHelperCsvLoad(t *testing.T) {
}
func TestModelHelperCsvLoadInt(t *testing.T) {
l, err := csvLoad(TpCdrstat{}, []string{"CDRST1", "5", "60m", "ASR", "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z", "*voice", "87.139.12.167", "FS_JSON", "*rated", "*out", "cgrates.org", "call", "dan", "dan", "49", "3m;7m", "5m;10m", "suppl1", "NORMAL_CLEARING", "default", "rif", "rif", "0;2", "STANDARD_TRIGGERS"})
l, err := csvLoad(TpCdrstat{}, []string{"CDRST1", "5", "60m", "10s", "ASR", "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z", "*voice", "87.139.12.167", "FS_JSON", "*rated", "*out", "cgrates.org", "call", "dan", "dan", "49", "3m;7m", "5m;10m", "suppl1", "NORMAL_CLEARING", "default", "rif", "rif", "0;2", "STANDARD_TRIGGERS"})
tpd, ok := l.(TpCdrstat)
if err != nil || !ok || tpd.QueueLength != 5 {
t.Errorf("model load failed: %+v", tpd)
@@ -377,6 +377,7 @@ func TestTPCdrStatsAsExportSlice(t *testing.T) {
&utils.TPCdrStat{
QueueLength: "5",
TimeWindow: "60m",
SaveInterval: "10s",
Metrics: "ASR;ACD",
SetupInterval: "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z",
TORs: "*voice",
@@ -401,6 +402,7 @@ func TestTPCdrStatsAsExportSlice(t *testing.T) {
&utils.TPCdrStat{
QueueLength: "5",
TimeWindow: "60m",
SaveInterval: "9s",
Metrics: "ASR",
SetupInterval: "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z",
TORs: "*voice",
@@ -425,9 +427,9 @@ func TestTPCdrStatsAsExportSlice(t *testing.T) {
},
}
expectedSlc := [][]string{
[]string{"CDRST1", "5", "60m", "ASR;ACD", "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z", "*voice", "87.139.12.167", "FS_JSON", utils.META_RATED, "*out", "cgrates.org", "call",
[]string{"CDRST1", "5", "60m", "10s", "ASR;ACD", "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z", "*voice", "87.139.12.167", "FS_JSON", utils.META_RATED, "*out", "cgrates.org", "call",
"dan", "dan", "49", "3m;7m", "5m;10m", "supplier1", "NORMAL_CLEARNING", "default", "rif", "rif", "0;2", "STANDARD_TRIGGERS"},
[]string{"CDRST1", "5", "60m", "ASR", "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z", "*voice", "87.139.12.167", "FS_JSON", utils.META_RATED, "*out", "cgrates.org", "call",
[]string{"CDRST1", "5", "60m", "9s", "ASR", "2014-07-29T15:00:00Z;2014-07-29T16:00:00Z", "*voice", "87.139.12.167", "FS_JSON", utils.META_RATED, "*out", "cgrates.org", "call",
"dan", "dan", "49", "3m;7m", "5m;10m", "supplier1", "NORMAL_CLEARNING", "default", "dan", "dan", "0;2", "STANDARD_TRIGGERS"},
}
ms := APItoModelCdrStat(cdrStats)

View File

@@ -295,27 +295,28 @@ type TpCdrstat struct {
Tag string `index:"0" re:""`
QueueLength int `index:"1" re:""`
TimeWindow string `index:"2" re:""`
Metrics string `index:"3" re:""`
SetupInterval string `index:"4" re:""`
Tors string `index:"5" re:""`
CdrHosts string `index:"6" re:""`
CdrSources string `index:"7" re:""`
ReqTypes string `index:"8" re:""`
Directions string `index:"9" re:""`
Tenants string `index:"10" re:""`
Categories string `index:"11" re:""`
Accounts string `index:"12" re:""`
Subjects string `index:"13" re:""`
DestinationPrefixes string `index:"14" re:""`
PddInterval string `index:"15" re:""`
UsageInterval string `index:"16" re:""`
Suppliers string `index:"17" re:""`
DisconnectCauses string `index:"18" re:""`
MediationRunids string `index:"19" re:""`
RatedAccounts string `index:"20" re:""`
RatedSubjects string `index:"21" re:""`
CostInterval string `index:"22" re:""`
ActionTriggers string `index:"23" re:""`
SaveInterval string `index:"3" re:""`
Metrics string `index:"4" re:""`
SetupInterval string `index:"5" re:""`
Tors string `index:"6" re:""`
CdrHosts string `index:"7" re:""`
CdrSources string `index:"8" re:""`
ReqTypes string `index:"9" re:""`
Directions string `index:"10" re:""`
Tenants string `index:"11" re:""`
Categories string `index:"12" re:""`
Accounts string `index:"13" re:""`
Subjects string `index:"14" re:""`
DestinationPrefixes string `index:"15" re:""`
PddInterval string `index:"16" re:""`
UsageInterval string `index:"17" re:""`
Suppliers string `index:"18" re:""`
DisconnectCauses string `index:"19" re:""`
MediationRunids string `index:"20" re:""`
RatedAccounts string `index:"21" re:""`
RatedSubjects string `index:"22" re:""`
CostInterval string `index:"23" re:""`
ActionTriggers string `index:"24" re:""`
CreatedAt time.Time
}

View File

@@ -165,7 +165,7 @@ func TestGetSessionRuns(t *testing.T) {
}
func TestGetLCR(t *testing.T) {
rsponder.Stats = NewStats(ratingStorage) // Load stats instance
rsponder.Stats = NewStats(ratingStorage, accountingStorage, 0) // Load stats instance
dstDe := &Destination{Id: "GERMANY", Prefixes: []string{"+49"}}
if err := ratingStorage.SetDestination(dstDe); err != nil {
t.Error(err)

View File

@@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"sync"
"time"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
@@ -37,13 +38,47 @@ type StatsInterface interface {
}
type Stats struct {
queues map[string]*StatsQueue
mux sync.RWMutex
ratingDb RatingStorage
queues map[string]*StatsQueue
queueSavers map[string]*queueSaver
mux sync.RWMutex
ratingDb RatingStorage
accountingDb AccountingStorage
defaultSaveInterval time.Duration
}
type queueSaver struct {
ticker *time.Ticker
stopper chan bool
}
func NewStats(ratingDb RatingStorage) *Stats {
cdrStats := &Stats{ratingDb: ratingDb}
func newQueueSaver(id string, saveInterval time.Duration, sq *StatsQueue, adb AccountingStorage) *queueSaver {
svr := &queueSaver{
ticker: time.NewTicker(saveInterval),
stopper: make(chan bool),
}
go func(id string, c <-chan time.Time, s <-chan bool, sq *StatsQueue, accountDb AccountingStorage) {
for {
select {
case <-c:
if sq.IsDirty() {
if err := accountDb.SetCdrStatsQueue(id, sq); err != nil {
Logger.Err(fmt.Sprintf("Error saving cdr stats queue id %s: %v", id, err))
}
}
case <-s:
break
}
}
}(id, svr.ticker.C, svr.stopper, sq, adb)
return svr
}
func (svr *queueSaver) stop() {
svr.ticker.Stop()
svr.stopper <- true
}
func NewStats(ratingDb RatingStorage, accountingDb AccountingStorage, saveInterval time.Duration) *Stats {
cdrStats := &Stats{ratingDb: ratingDb, accountingDb: accountingDb, defaultSaveInterval: saveInterval}
if css, err := ratingDb.GetAllCdrStats(); err == nil {
cdrStats.UpdateQueues(css, nil)
} else {
@@ -79,6 +114,9 @@ func (s *Stats) AddQueue(cs *CdrStats, out *int) error {
if s.queues == nil {
s.queues = make(map[string]*StatsQueue)
}
if s.queueSavers == nil {
s.queueSavers = make(map[string]*queueSaver)
}
if sq, exists := s.queues[cs.Id]; exists {
sq.UpdateConf(cs)
} else {
@@ -108,11 +146,11 @@ func (s *Stats) ReloadQueues(ids []string, out *int) error {
func (s *Stats) ResetQueues(ids []string, out *int) error {
if len(ids) == 0 {
for _, sq := range s.queues {
sq.cdrs = make([]*QCdr, 0)
sq.metrics = make(map[string]Metric, len(sq.conf.Metrics))
for _, m := range sq.conf.Metrics {
sq.Cdrs = make([]*QCdr, 0)
sq.Metrics = make(map[string]Metric, len(sq.Conf.Metrics))
for _, m := range sq.Conf.Metrics {
if metric := CreateMetric(m); metric != nil {
sq.metrics[m] = metric
sq.Metrics[m] = metric
}
}
}
@@ -123,11 +161,11 @@ func (s *Stats) ResetQueues(ids []string, out *int) error {
Logger.Warning(fmt.Sprintf("Cannot reset queue id %v: Not Fund", id))
continue
}
sq.cdrs = make([]*QCdr, 0)
sq.metrics = make(map[string]Metric, len(sq.conf.Metrics))
for _, m := range sq.conf.Metrics {
sq.Cdrs = make([]*QCdr, 0)
sq.Metrics = make(map[string]Metric, len(sq.Conf.Metrics))
for _, m := range sq.Conf.Metrics {
if metric := CreateMetric(m); metric != nil {
sq.metrics[m] = metric
sq.Metrics[m] = metric
}
}
}
@@ -143,8 +181,9 @@ func (s *Stats) UpdateQueues(css []*CdrStats, out *int) error {
defer s.mux.Unlock()
oldQueues := s.queues
s.queues = make(map[string]*StatsQueue, len(css))
s.queueSavers = make(map[string]*queueSaver)
if def, exists := oldQueues[utils.META_DEFAULT]; exists {
def.UpdateConf(def.conf) // for reset
def.UpdateConf(def.Conf) // for reset
s.queues[utils.META_DEFAULT] = def
}
for _, cs := range css {
@@ -158,6 +197,14 @@ func (s *Stats) UpdateQueues(css []*CdrStats, out *int) error {
if sq == nil {
sq = NewStatsQueue(cs)
}
si := cs.SaveInterval
if si == 0 {
si = s.defaultSaveInterval
}
if si > 0 {
s.queueSavers[cs.Id] = newQueueSaver(cs.Id, si, sq, s.accountingDb)
}
s.queues[cs.Id] = sq
}
return nil

View File

@@ -25,10 +25,11 @@ import (
)
type StatsQueue struct {
cdrs []*QCdr
conf *CdrStats
metrics map[string]Metric
Cdrs []*QCdr
Conf *CdrStats
Metrics map[string]Metric
mux sync.Mutex
dirty bool
}
var METRIC_TRIGGER_MAP = map[string]string{
@@ -57,7 +58,7 @@ type QCdr struct {
func NewStatsQueue(conf *CdrStats) *StatsQueue {
if conf == nil {
return &StatsQueue{metrics: make(map[string]Metric)}
return &StatsQueue{Metrics: make(map[string]Metric)}
}
sq := &StatsQueue{}
sq.UpdateConf(conf)
@@ -67,29 +68,40 @@ func NewStatsQueue(conf *CdrStats) *StatsQueue {
func (sq *StatsQueue) UpdateConf(conf *CdrStats) {
sq.mux.Lock()
defer sq.mux.Unlock()
sq.conf = conf
sq.cdrs = make([]*QCdr, 0)
sq.metrics = make(map[string]Metric, len(conf.Metrics))
sq.Conf = conf
sq.Cdrs = make([]*QCdr, 0)
sq.Metrics = make(map[string]Metric, len(conf.Metrics))
sq.dirty = true
for _, m := range conf.Metrics {
if metric := CreateMetric(m); metric != nil {
sq.metrics[m] = metric
sq.Metrics[m] = metric
}
}
}
func (sq *StatsQueue) IsDirty() bool {
sq.mux.Lock()
defer sq.mux.Unlock()
v := sq.dirty
// take advantage of the locking to set it to flip it
sq.dirty = false
return v
}
func (sq *StatsQueue) AppendCDR(cdr *StoredCdr) {
sq.mux.Lock()
defer sq.mux.Unlock()
if sq.conf.AcceptCdr(cdr) {
if sq.Conf.AcceptCdr(cdr) {
qcdr := sq.simplifyCdr(cdr)
sq.cdrs = append(sq.cdrs, qcdr)
sq.Cdrs = append(sq.Cdrs, qcdr)
sq.addToMetrics(qcdr)
sq.purgeObsoleteCdrs()
sq.dirty = true
// check for trigger
stats := sq.getStats()
sq.conf.Triggers.Sort()
for _, at := range sq.conf.Triggers {
if at.MinQueuedItems > 0 && len(sq.cdrs) < at.MinQueuedItems {
sq.Conf.Triggers.Sort()
for _, at := range sq.Conf.Triggers {
if at.MinQueuedItems > 0 && len(sq.Cdrs) < at.MinQueuedItems {
continue
}
if strings.HasPrefix(at.ThresholdType, "*min_") {
@@ -111,13 +123,15 @@ func (sq *StatsQueue) AppendCDR(cdr *StoredCdr) {
}
func (sq *StatsQueue) addToMetrics(cdr *QCdr) {
for _, metric := range sq.metrics {
sq.dirty = true
for _, metric := range sq.Metrics {
metric.AddCdr(cdr)
}
}
func (sq *StatsQueue) removeFromMetrics(cdr *QCdr) {
for _, metric := range sq.metrics {
sq.dirty = true
for _, metric := range sq.Metrics {
metric.RemoveCdr(cdr)
}
}
@@ -133,23 +147,23 @@ func (sq *StatsQueue) simplifyCdr(cdr *StoredCdr) *QCdr {
}
func (sq *StatsQueue) purgeObsoleteCdrs() {
if sq.conf.QueueLength > 0 {
currentLength := len(sq.cdrs)
if currentLength > sq.conf.QueueLength {
for _, cdr := range sq.cdrs[:currentLength-sq.conf.QueueLength] {
if sq.Conf.QueueLength > 0 {
currentLength := len(sq.Cdrs)
if currentLength > sq.Conf.QueueLength {
for _, cdr := range sq.Cdrs[:currentLength-sq.Conf.QueueLength] {
sq.removeFromMetrics(cdr)
}
sq.cdrs = sq.cdrs[currentLength-sq.conf.QueueLength:]
sq.Cdrs = sq.Cdrs[currentLength-sq.Conf.QueueLength:]
}
}
if sq.conf.TimeWindow > 0 {
for i, cdr := range sq.cdrs {
if time.Now().Sub(cdr.SetupTime) > sq.conf.TimeWindow {
if sq.Conf.TimeWindow > 0 {
for i, cdr := range sq.Cdrs {
if time.Now().Sub(cdr.SetupTime) > sq.Conf.TimeWindow {
sq.removeFromMetrics(cdr)
continue
} else {
if i > 0 {
sq.cdrs = sq.cdrs[i:]
sq.Cdrs = sq.Cdrs[i:]
}
break
}
@@ -165,20 +179,20 @@ func (sq *StatsQueue) GetStats() map[string]float64 {
}
func (sq *StatsQueue) getStats() map[string]float64 {
stat := make(map[string]float64, len(sq.metrics))
for key, metric := range sq.metrics {
stat := make(map[string]float64, len(sq.Metrics))
for key, metric := range sq.Metrics {
stat[key] = metric.GetValue()
}
return stat
}
func (sq *StatsQueue) GetId() string {
return sq.conf.Id
return sq.Conf.Id
}
// Convert data into a struct which can be used in actions based on triggers hit
func (sq *StatsQueue) Triggered(at *ActionTrigger) *StatsQueueTriggered {
return &StatsQueueTriggered{Id: sq.conf.Id, Metrics: sq.getStats(), Trigger: at}
return &StatsQueueTriggered{Id: sq.Conf.Id, Metrics: sq.getStats(), Trigger: at}
}
// Struct to be passed to triggered actions

View File

@@ -27,8 +27,8 @@ import (
func TestStatsQueueInit(t *testing.T) {
sq := NewStatsQueue(&CdrStats{Metrics: []string{ASR, ACC}})
if len(sq.metrics) != 2 {
t.Error("Expected 2 metrics got ", len(sq.metrics))
if len(sq.Metrics) != 2 {
t.Error("Expected 2 metrics got ", len(sq.Metrics))
}
}
@@ -104,106 +104,106 @@ func TestAcceptCdr(t *testing.T) {
MediationRunId: "mri",
Cost: 10,
}
sq.conf = &CdrStats{}
if sq.conf.AcceptCdr(cdr) != true {
sq.Conf = &CdrStats{}
if sq.Conf.AcceptCdr(cdr) != true {
t.Errorf("Should have accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{TOR: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
sq.Conf = &CdrStats{TOR: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{CdrHost: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
sq.Conf = &CdrStats{CdrHost: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{CdrSource: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
sq.Conf = &CdrStats{CdrSource: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{Direction: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
sq.Conf = &CdrStats{Direction: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{Tenant: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
sq.Conf = &CdrStats{Tenant: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{Category: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
sq.Conf = &CdrStats{Category: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{Account: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
sq.Conf = &CdrStats{Account: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{Subject: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
sq.Conf = &CdrStats{Subject: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{Supplier: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
sq.Conf = &CdrStats{Supplier: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{DisconnectCause: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
sq.Conf = &CdrStats{DisconnectCause: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{RatedAccount: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
sq.Conf = &CdrStats{RatedAccount: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{RatedSubject: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
sq.Conf = &CdrStats{RatedSubject: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{DestinationPrefix: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
sq.Conf = &CdrStats{DestinationPrefix: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{DestinationPrefix: []string{"test", "123"}}
if sq.conf.AcceptCdr(cdr) != true {
sq.Conf = &CdrStats{DestinationPrefix: []string{"test", "123"}}
if sq.Conf.AcceptCdr(cdr) != true {
t.Errorf("Should have accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}}
if sq.conf.AcceptCdr(cdr) == true {
sq.Conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}}
if sq.Conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 0, time.UTC)}}
if sq.conf.AcceptCdr(cdr) == true {
sq.Conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 0, time.UTC)}}
if sq.Conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC)}}
if sq.conf.AcceptCdr(cdr) != true {
sq.Conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC)}}
if sq.Conf.AcceptCdr(cdr) != true {
t.Errorf("Should have accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}}
if sq.conf.AcceptCdr(cdr) != true {
sq.Conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}}
if sq.Conf.AcceptCdr(cdr) != true {
t.Errorf("Should have accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{UsageInterval: []time.Duration{11 * time.Second}}
if sq.conf.AcceptCdr(cdr) == true {
sq.Conf = &CdrStats{UsageInterval: []time.Duration{11 * time.Second}}
if sq.Conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{UsageInterval: []time.Duration{1 * time.Second, 10 * time.Second}}
if sq.conf.AcceptCdr(cdr) == true {
sq.Conf = &CdrStats{UsageInterval: []time.Duration{1 * time.Second, 10 * time.Second}}
if sq.Conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{PddInterval: []time.Duration{8 * time.Second}}
if sq.conf.AcceptCdr(cdr) == true {
sq.Conf = &CdrStats{PddInterval: []time.Duration{8 * time.Second}}
if sq.Conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{PddInterval: []time.Duration{3 * time.Second, 7 * time.Second}}
if sq.conf.AcceptCdr(cdr) == true {
sq.Conf = &CdrStats{PddInterval: []time.Duration{3 * time.Second, 7 * time.Second}}
if sq.Conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.conf = &CdrStats{PddInterval: []time.Duration{3 * time.Second, 8 * time.Second}}
if sq.conf.AcceptCdr(cdr) != true {
sq.Conf = &CdrStats{PddInterval: []time.Duration{3 * time.Second, 8 * time.Second}}
if sq.Conf.AcceptCdr(cdr) != true {
t.Errorf("Should have accepted this CDR: %+v", cdr)
}
}
func TestStatsQueueIds(t *testing.T) {
cdrStats := NewStats(ratingStorage)
cdrStats := NewStats(ratingStorage, accountingStorage, 0)
ids := []string{}
if err := cdrStats.GetQueueIds(0, &ids); err != nil {
t.Error("Errorf getting queue ids: ", err)
@@ -216,7 +216,7 @@ func TestStatsQueueIds(t *testing.T) {
}
func TestStatsAppendCdr(t *testing.T) {
cdrStats := NewStats(ratingStorage)
cdrStats := NewStats(ratingStorage, accountingStorage, 0)
cdr := &StoredCdr{
Tenant: "cgrates.org",
Category: "call",
@@ -231,14 +231,14 @@ func TestStatsAppendCdr(t *testing.T) {
if err != nil {
t.Error("Error appending cdr to stats: ", err)
}
if len(cdrStats.queues["CDRST1"].cdrs) != 0 ||
len(cdrStats.queues["CDRST2"].cdrs) != 1 {
t.Error("Error appending cdr to queue: ", len(cdrStats.queues["CDRST2"].cdrs))
if len(cdrStats.queues["CDRST1"].Cdrs) != 0 ||
len(cdrStats.queues["CDRST2"].Cdrs) != 1 {
t.Error("Error appending cdr to queue: ", len(cdrStats.queues["CDRST2"].Cdrs))
}
}
func TestStatsGetValues(t *testing.T) {
cdrStats := NewStats(ratingStorage)
cdrStats := NewStats(ratingStorage, accountingStorage, 0)
cdr := &StoredCdr{
Tenant: "cgrates.org",
Category: "call",
@@ -267,7 +267,7 @@ func TestStatsGetValues(t *testing.T) {
}
func TestStatsReloadQueues(t *testing.T) {
cdrStats := NewStats(ratingStorage)
cdrStats := NewStats(ratingStorage, accountingStorage, 0)
cdr := &StoredCdr{
Tenant: "cgrates.org",
Category: "call",
@@ -299,7 +299,7 @@ func TestStatsReloadQueues(t *testing.T) {
}
func TestStatsReloadQueuesWithDefault(t *testing.T) {
cdrStats := NewStats(ratingStorage)
cdrStats := NewStats(ratingStorage, accountingStorage, 0)
cdrStats.AddQueue(&CdrStats{
Id: utils.META_DEFAULT,
}, nil)
@@ -335,7 +335,7 @@ func TestStatsReloadQueuesWithDefault(t *testing.T) {
}
func TestStatsReloadQueuesWithIds(t *testing.T) {
cdrStats := NewStats(ratingStorage)
cdrStats := NewStats(ratingStorage, accountingStorage, 0)
cdr := &StoredCdr{
Tenant: "cgrates.org",
Category: "call",
@@ -367,7 +367,7 @@ func TestStatsReloadQueuesWithIds(t *testing.T) {
}
func TestStatsResetQueues(t *testing.T) {
cdrStats := NewStats(ratingStorage)
cdrStats := NewStats(ratingStorage, accountingStorage, 0)
cdr := &StoredCdr{
Tenant: "cgrates.org",
Category: "call",
@@ -399,7 +399,7 @@ func TestStatsResetQueues(t *testing.T) {
}
func TestStatsResetQueuesWithIds(t *testing.T) {
cdrStats := NewStats(ratingStorage)
cdrStats := NewStats(ratingStorage, accountingStorage, 0)
cdr := &StoredCdr{
Tenant: "cgrates.org",
Category: "call",

View File

@@ -77,6 +77,8 @@ type AccountingStorage interface {
Storage
GetAccount(string) (*Account, error)
SetAccount(*Account) error
GetCdrStatsQueue(string) (*StatsQueue, error)
SetCdrStatsQueue(*StatsQueue) error
}
type CdrStorage interface {

View File

@@ -262,7 +262,6 @@ func (ms *MapStorage) SetRatingPlan(rp *RatingPlan) (err error) {
if historyScribe != nil {
go historyScribe.Record(rp.GetHistoryRecord(), &response)
}
//cache2go.Cache(RATING_PLAN_PREFIX+rp.Id, rp)
return
}
@@ -293,7 +292,6 @@ func (ms *MapStorage) SetRatingProfile(rpf *RatingProfile) (err error) {
if historyScribe != nil {
go historyScribe.Record(rpf.GetHistoryRecord(), &response)
}
//cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf)
return
}
@@ -318,7 +316,6 @@ func (ms *MapStorage) GetLCR(key string, skipCache bool) (lcr *LCR, err error) {
func (ms *MapStorage) SetLCR(lcr *LCR) (err error) {
result, err := ms.ms.Marshal(lcr)
ms.dict[utils.LCR_PREFIX+lcr.GetId()] = result
//cache2go.Cache(LCR_PREFIX+key, lcr)
return
}
@@ -342,7 +339,6 @@ func (ms *MapStorage) GetRpAlias(key string, skipCache bool) (alias string, err
func (ms *MapStorage) SetRpAlias(key, alias string) (err error) {
ms.dict[utils.RP_ALIAS_PREFIX+key] = []byte(alias)
//cache2go.Cache(ALIAS_PREFIX+key, alias)
return
}
@@ -409,7 +405,6 @@ func (ms *MapStorage) GetAccAlias(key string, skipCache bool) (alias string, err
func (ms *MapStorage) SetAccAlias(key, alias string) (err error) {
ms.dict[utils.ACC_ALIAS_PREFIX+key] = []byte(alias)
//cache2go.Cache(ALIAS_PREFIX+key, alias)
return
}
@@ -471,7 +466,6 @@ func (ms *MapStorage) SetDestination(dest *Destination) (err error) {
if historyScribe != nil {
go historyScribe.Record(dest.GetHistoryRecord(), &response)
}
//cache2go.Cache(DESTINATION_PREFIX+dest.Id, dest)
return
}
@@ -496,7 +490,6 @@ func (ms *MapStorage) GetActions(key string, skipCache bool) (as Actions, err er
func (ms *MapStorage) SetActions(key string, as Actions) (err error) {
result, err := ms.ms.Marshal(&as)
ms.dict[utils.ACTION_PREFIX+key] = result
//cache2go.Cache(ACTION_PREFIX+key, as)
return
}
@@ -521,7 +514,6 @@ func (ms *MapStorage) GetSharedGroup(key string, skipCache bool) (sg *SharedGrou
func (ms *MapStorage) SetSharedGroup(sg *SharedGroup) (err error) {
result, err := ms.ms.Marshal(sg)
ms.dict[utils.SHARED_GROUP_PREFIX+sg.Id] = result
//cache2go.Cache(SHARED_GROUP_PREFIX+key, sg)
return
}
@@ -553,6 +545,22 @@ func (ms *MapStorage) SetAccount(ub *Account) (err error) {
return
}
func (ms *MapStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error) {
if values, ok := ms.dict[utils.CDR_STATS_QUEUE_PREFIX+key]; ok {
sq = &StatsQueue{}
err = ms.ms.Unmarshal(values, sq)
} else {
return nil, utils.ErrNotFound
}
return
}
func (ms *MapStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) {
result, err := ms.ms.Marshal(sq)
ms.dict[utils.CDR_STATS_QUEUE_PREFIX+sq.GetId()] = result
return
}
func (ms *MapStorage) GetActionPlans(key string) (ats ActionPlans, err error) {
if values, ok := ms.dict[utils.ACTION_TIMING_PREFIX+key]; ok {
err = ms.ms.Unmarshal(values, &ats)

View File

@@ -355,7 +355,6 @@ func (rs *RedisStorage) SetRatingPlan(rp *RatingPlan) (err error) {
response := 0
go historyScribe.Record(rp.GetHistoryRecord(), &response)
}
//cache2go.Cache(utils.RATING_PLAN_PREFIX+rp.Id, rp)
return
}
@@ -385,7 +384,6 @@ func (rs *RedisStorage) SetRatingProfile(rpf *RatingProfile) (err error) {
response := 0
go historyScribe.Record(rpf.GetHistoryRecord(), &response)
}
//cache2go.Cache(utils.RATING_PROFILE_PREFIX+rpf.Id, rpf)
return
}
@@ -506,7 +504,6 @@ func (rs *RedisStorage) GetAccAlias(key string, skipCache bool) (alias string, e
// Adds one alias for one account
func (rs *RedisStorage) SetAccAlias(key, alias string) (err error) {
err = rs.db.Set(utils.ACC_ALIAS_PREFIX+key, []byte(alias))
//cache2go.Cache(ALIAS_PREFIX+key, alias)
return
}
@@ -600,7 +597,6 @@ func (rs *RedisStorage) SetDestination(dest *Destination) (err error) {
response := 0
go historyScribe.Record(dest.GetHistoryRecord(), &response)
}
//cache2go.Cache(utils.DESTINATION_PREFIX+dest.Id, dest)
return
}
@@ -624,7 +620,6 @@ func (rs *RedisStorage) GetActions(key string, skipCache bool) (as Actions, err
func (rs *RedisStorage) SetActions(key string, as Actions) (err error) {
result, err := rs.ms.Marshal(&as)
err = rs.db.Set(utils.ACTION_PREFIX+key, result)
// cache2go.Cache(utils.ACTION_PREFIX+key, as)
return
}
@@ -648,7 +643,6 @@ func (rs *RedisStorage) GetSharedGroup(key string, skipCache bool) (sg *SharedGr
func (rs *RedisStorage) SetSharedGroup(sg *SharedGroup) (err error) {
result, err := rs.ms.Marshal(sg)
err = rs.db.Set(utils.SHARED_GROUP_PREFIX+sg.Id, result)
//cache2go.Cache(utils.SHARED_GROUP_PREFIX+sg.Id, sg)
return
}
@@ -680,6 +674,22 @@ func (rs *RedisStorage) SetAccount(ub *Account) (err error) {
return
}
func (rs *RedisStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error) {
var values []byte
if values, err = rs.db.Get(utils.CDR_STATS_QUEUE_PREFIX + key); err == nil {
sq = &StatsQueue{}
err = rs.ms.Unmarshal(values, sq)
}
return
}
func (rs *RedisStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) {
result, err := rs.ms.Marshal(sq)
err = rs.db.Set(utils.CDR_STATS_QUEUE_PREFIX+sq.GetId(), result)
return
}
func (rs *RedisStorage) GetActionPlans(key string) (ats ActionPlans, err error) {
var values []byte
if values, err = rs.db.Get(utils.ACTION_TIMING_PREFIX + key); err == nil {

View File

@@ -324,6 +324,7 @@ type TPCdrStats struct {
type TPCdrStat struct {
QueueLength string
TimeWindow string
SaveInterval string
Metrics string
SetupInterval string
TORs string

View File

@@ -164,6 +164,8 @@ const (
DESTINATION_PREFIX = "dst_"
LCR_PREFIX = "lcr_"
DERIVEDCHARGERS_PREFIX = "dcs_"
CDR_STATS_QUEUE_PREFIX = "csq_"
CDR_STATS_PREFIX = "cst_"
TEMP_DESTINATION_PREFIX = "tmp_"
LOG_CALL_COST_PREFIX = "cco_"
LOG_ACTION_TIMMING_PREFIX = "ltm_"
@@ -180,7 +182,6 @@ const (
CREATE_CDRS_TABLES_SQL = "create_cdrs_tables.sql"
CREATE_TARIFFPLAN_TABLES_SQL = "create_tariffplan_tables.sql"
TEST_SQL = "TEST_SQL"
CDR_STATS_PREFIX = "cst_"
DESTINATIONS_LOAD_THRESHOLD = 0.1
CONSTANT = "constant"
FILLER = "filler"