From 1091f5829704295592a0c9d2247f4efd69d155c5 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Tue, 22 Jul 2014 22:10:39 +0300 Subject: [PATCH] cdr stats csv load (without tests) --- apier/apier.go | 3 +- apier/tpdestinations.go | 2 +- engine/loader_csv.go | 151 ++++++++++++++++++++++++-------- engine/loader_csv_test.go | 14 +-- engine/loader_db.go | 18 ++-- engine/loader_helpers.go | 135 ++++++++++++++++++++++++++++ engine/loader_local_test.go | 1 + engine/stats.go | 24 +++++ engine/storage_interface.go | 6 +- engine/storage_map.go | 29 +++++- engine/storage_redis.go | 31 +++++++ engine/storage_sql.go | 18 ++-- general_tests/costs1_test.go | 5 +- general_tests/datachrg1_test.go | 2 +- general_tests/ddazmbl1_test.go | 3 +- general_tests/ddazmbl2_test.go | 3 +- general_tests/ddazmbl3_test.go | 3 +- general_tests/smschrg1_test.go | 2 +- utils/consts.go | 4 +- 19 files changed, 378 insertions(+), 76 deletions(-) diff --git a/apier/apier.go b/apier/apier.go index 7b0ae6463..51fe0db8b 100644 --- a/apier/apier.go +++ b/apier/apier.go @@ -708,7 +708,8 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, path.Join(attrs.FolderPath, utils.ACTION_PLANS_CSV), path.Join(attrs.FolderPath, utils.ACTION_TRIGGERS_CSV), path.Join(attrs.FolderPath, utils.ACCOUNT_ACTIONS_CSV), - path.Join(attrs.FolderPath, utils.DERIVED_CHARGERS_CSV)) + path.Join(attrs.FolderPath, utils.DERIVED_CHARGERS_CSV), + path.Join(attrs.FolderPath, utils.CDR_STATS_CSV)) if err := loader.LoadAll(); err != nil { return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) } diff --git a/apier/tpdestinations.go b/apier/tpdestinations.go index 951eef144..7e9d22b97 100644 --- a/apier/tpdestinations.go +++ b/apier/tpdestinations.go @@ -53,7 +53,7 @@ func (self *ApierV1) GetTPDestination(attrs AttrGetTPDestination, reply *utils.T } else if len(dsts) == 0 { return errors.New(utils.ERR_NOT_FOUND) } else { - *reply = utils.TPDestination{attrs.TPid, dsts[0].Id, dsts[0].Prefixes} + *reply = utils.TPDestination{attrs.TPid, dsts[attrs.DestinationId].Id, dsts[attrs.DestinationId].Prefixes} } return nil } diff --git a/engine/loader_csv.go b/engine/loader_csv.go index 2e657448e..7d4d37260 100644 --- a/engine/loader_csv.go +++ b/engine/loader_csv.go @@ -26,6 +26,7 @@ import ( "os" "strconv" "strings" + "time" "github.com/cgrates/cgrates/utils" ) @@ -43,7 +44,7 @@ type CSVReader struct { accountActions map[string]*Account dirtyRpAliases []*TenantRatingSubject // used to clean aliases that might have changed dirtyAccAliases []*TenantAccount // used to clean aliases that might have changed - destinations []*Destination + destinations map[string]*Destination timings map[string]*utils.TPTiming rates map[string]*utils.TPRate destinationRates map[string]*utils.TPDestinationRate @@ -52,14 +53,15 @@ type CSVReader struct { sharedGroups map[string]*SharedGroup lcrs map[string]*LCR derivedChargers map[string]utils.DerivedChargers + cdrStats map[string]*CdrStats // file names destinationsFn, ratesFn, destinationratesFn, timingsFn, destinationratetimingsFn, ratingprofilesFn, - sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn string + sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn string } func NewFileCSVReader(dataStorage RatingStorage, accountingStorage AccountingStorage, sep rune, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, lcrFn, - actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn string) *CSVReader { + actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn string) *CSVReader { c := new(CSVReader) c.sep = sep c.dataStorage = dataStorage @@ -71,25 +73,27 @@ func NewFileCSVReader(dataStorage RatingStorage, accountingStorage AccountingSto c.rates = make(map[string]*utils.TPRate) c.destinationRates = make(map[string]*utils.TPDestinationRate) c.timings = make(map[string]*utils.TPTiming) + c.destinations = make(map[string]*Destination) c.ratingPlans = make(map[string]*RatingPlan) c.ratingProfiles = make(map[string]*RatingProfile) c.sharedGroups = make(map[string]*SharedGroup) c.lcrs = make(map[string]*LCR) c.derivedChargers = make(map[string]utils.DerivedChargers) + c.cdrStats = make(map[string]*CdrStats) c.readerFunc = openFileCSVReader c.rpAliases = make(map[string]string) c.accAliases = make(map[string]string) c.destinationsFn, c.timingsFn, c.ratesFn, c.destinationratesFn, c.destinationratetimingsFn, c.ratingprofilesFn, - c.sharedgroupsFn, c.lcrFn, c.actionsFn, c.actiontimingsFn, c.actiontriggersFn, c.accountactionsFn, c.derivedChargersFn = destinationsFn, timingsFn, - ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn + c.sharedgroupsFn, c.lcrFn, c.actionsFn, c.actiontimingsFn, c.actiontriggersFn, c.accountactionsFn, c.derivedChargersFn, c.cdrStatsFn = destinationsFn, timingsFn, + ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn return c } func NewStringCSVReader(dataStorage RatingStorage, accountingStorage AccountingStorage, sep rune, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, lcrFn, - actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn string) *CSVReader { + actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn string) *CSVReader { c := NewFileCSVReader(dataStorage, accountingStorage, sep, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, - ratingprofilesFn, sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn) + ratingprofilesFn, sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn) c.readerFunc = openStringCSVReader return c } @@ -171,6 +175,8 @@ func (csvr *CSVReader) ShowStatistics() { log.Print("Derived Chargers: ", len(csvr.derivedChargers)) // lcr rules log.Print("LCR rules: ", len(csvr.lcrs)) + // cdr stats + log.Print("CDR stats: ", len(csvr.cdrStats)) } func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) { @@ -320,6 +326,18 @@ func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) { log.Print(key) } } + if verbose { + log.Print("CDR Stats Queues") + } + for _, sq := range csvr.cdrStats { + err = accountingStorage.SetCdrStats(sq) + if err != nil { + return err + } + if verbose { + log.Print(sq.Id) + } + } return } @@ -336,15 +354,10 @@ func (csvr *CSVReader) LoadDestinations() (err error) { for record, err := csvReader.Read(); err == nil; record, err = csvReader.Read() { tag := record[0] var dest *Destination - for _, d := range csvr.destinations { - if d.Id == tag { - dest = d - break - } - } - if dest == nil { + var found bool + if dest, found = csvr.destinations[tag]; !found { dest = &Destination{Id: tag} - csvr.destinations = append(csvr.destinations, dest) + csvr.destinations[tag] = dest } dest.AddPrefix(record[1]) } @@ -427,12 +440,7 @@ func (csvr *CSVReader) LoadDestinationRates() (err error) { } destinationExists := record[1] == utils.ANY if !destinationExists { - for _, d := range csvr.destinations { - if d.Id == record[1] { - destinationExists = true - break - } - } + _, destinationExists = csvr.destinations[record[1]] } if !destinationExists && csvr.dataStorage != nil { if destinationExists, err = csvr.dataStorage.HasData(DESTINATION_PREFIX, record[1]); err != nil { @@ -753,26 +761,49 @@ func (csvr *CSVReader) LoadActionTriggers() (err error) { tag := record[0] value, err := strconv.ParseFloat(record[4], 64) if err != nil { - return fmt.Errorf("Could not parse action trigger value: %v", err) + return fmt.Errorf("Could not parse action trigger value (%v): %v", record[4], err) } recurrent, err := strconv.ParseBool(record[5]) if err != nil { - return fmt.Errorf("Could not parse action trigger recurrent flag: %v", err) + return fmt.Errorf("Could not parse action trigger recurrent flag (%v): %v", record[5], err) } - weight, err := strconv.ParseFloat(record[8], 64) + minSleep, err := time.ParseDuration(record[6]) if err != nil { - return fmt.Errorf("Could not parse action trigger weight: %v", err) + return fmt.Errorf("Could not parse action trigger MinSleep (%v): %v", record[6], err) } + balanceWeight, err := strconv.ParseFloat(record[8], 64) + if record[8] != "" && err != nil { + return fmt.Errorf("Could not parse action trigger BalanceWeight (%v): %v", record[8], err) + } + balanceExp, err := utils.ParseTimeDetectLayout(record[9]) + if record[9] != "" && err != nil { + return fmt.Errorf("Could not parse action trigger BalanceExpirationDate (%v): %v", record[9], err) + } + minQI, err := strconv.Atoi(record[12]) + if record[12] != "" && err != nil { + return fmt.Errorf("Could not parse action trigger MinQueuedItems (%v): %v", record[12], err) + } + weight, err := strconv.ParseFloat(record[14], 64) + if err != nil { + return fmt.Errorf("Could not parse action trigger weight (%v): %v", record[14], err) + } + at := &ActionTrigger{ - Id: utils.GenUUID(), - BalanceType: record[1], - Direction: record[2], - ThresholdType: record[3], - ThresholdValue: value, - Recurrent: recurrent, - DestinationId: record[6], - ActionsId: record[7], - Weight: weight, + Id: utils.GenUUID(), + BalanceType: record[1], + Direction: record[2], + ThresholdType: record[3], + ThresholdValue: value, + Recurrent: recurrent, + MinSleep: minSleep, + DestinationId: record[7], + BalanceWeight: balanceWeight, + BalanceExpirationDate: balanceExp, + BalanceRatingSubject: record[10], + BalanceSharedGroup: record[11], + MinQueuedItems: minQI, + ActionsId: record[13], + Weight: weight, } csvr.actionsTriggers[tag] = append(csvr.actionsTriggers[tag], at) } @@ -806,7 +837,7 @@ func (csvr *CSVReader) LoadAccountActions() (err error) { } aTriggers, exists := csvr.actionsTriggers[record[4]] if record[4] != "" && !exists { - // only return error if there was something ther for the tag + // only return error if there was something there for the tag return fmt.Errorf("Could not get action triggers for tag %s", record[4]) } ub := &Account{ @@ -882,6 +913,37 @@ func (csvr *CSVReader) LoadDerivedChargers() (err error) { return } +func (csvr *CSVReader) LoadCdrStats() (err error) { + csvReader, fp, err := csvr.readerFunc(csvr.timingsFn, csvr.sep, utils.TIMINGS_NRCOLS) + if err != nil { + log.Print("Could not load cdr stats file: ", err) + // allow writing of the other values + return nil + } + if fp != nil { + defer fp.Close() + } + for record, err := csvReader.Read(); err == nil; record, err = csvReader.Read() { + tag := record[0] + if _, exists := csvr.cdrStats[tag]; exists { + log.Print("Warning: duplicate cdr stats found: ", tag) + } + var cs *CdrStats + var exists bool + if cs, exists = csvr.cdrStats[tag]; !exists { + cs = &CdrStats{} + } + triggers, exists := csvr.actionsTriggers[record[18]] + if record[18] != "" && !exists { + // only return error if there was something there for the tag + return fmt.Errorf("Could not get action triggers for cdr stats id %s: %s", cs.Id, record[18]) + } + UpdateCdrStats(cs, triggers, record...) + csvr.cdrStats[tag] = cs + } + return +} + // Automated loading func (csvr *CSVReader) LoadAll() error { var err error @@ -921,6 +983,9 @@ func (csvr *CSVReader) LoadAll() error { if err = csvr.LoadDerivedChargers(); err != nil { return err } + if err = csvr.LoadCdrStats(); err != nil { + return err + } return nil } @@ -929,8 +994,10 @@ func (csvr *CSVReader) GetLoadedIds(categ string) ([]string, error) { switch categ { case DESTINATION_PREFIX: ids := make([]string, len(csvr.destinations)) - for idx, dst := range csvr.destinations { - ids[idx] = dst.Id + i := 0 + for k := range csvr.destinations { + ids[i] = k + i++ } return ids, nil case RATING_PLAN_PREFIX: @@ -981,7 +1048,7 @@ func (csvr *CSVReader) GetLoadedIds(categ string) ([]string, error) { i++ } return keys, nil - case DERIVEDCHARGERS_PREFIX: // aliases + case DERIVEDCHARGERS_PREFIX: // derived chargers keys := make([]string, len(csvr.derivedChargers)) i := 0 for k := range csvr.derivedChargers { @@ -989,6 +1056,14 @@ func (csvr *CSVReader) GetLoadedIds(categ string) ([]string, error) { i++ } return keys, nil + case CDR_STATS_PREFIX: // cdr stats + keys := make([]string, len(csvr.cdrStats)) + i := 0 + for k := range csvr.cdrStats { + keys[i] = k + i++ + } + return keys, nil } return nil, errors.New("Unsupported category") } diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index 6393a4800..69b07ea9d 100644 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -161,11 +161,11 @@ TOPUP_SHARED10_AT,SE10,ASAP,10 TOPUP_EMPTY_AT,EE0,ASAP,10 ` actionTriggers = ` -STANDARD_TRIGGER,*voice,*out,*min_counter,10,false,GERMANY_O2,SOME_1,10 -STANDARD_TRIGGER,*voice,*out,*max_balance,200,false,GERMANY,SOME_2,10 -STANDARD_TRIGGERS,*monetary,*out,*min_balance,2,false,,LOG_WARNING,10 -STANDARD_TRIGGERS,*monetary,*out,*max_balance,20,false,,LOG_WARNING,10 -STANDARD_TRIGGERS,*monetary,*out,*max_counter,5,false,FS_USERS,LOG_WARNING,10 +STANDARD_TRIGGER,*voice,*out,*min_counter,10,false,0,GERMANY_O2,,,,,,SOME_1,10 +STANDARD_TRIGGER,*voice,*out,*max_balance,200,false,0,GERMANY,,,,,,SOME_2,10 +STANDARD_TRIGGERS,*monetary,*out,*min_balance,2,false,0,,,,,,,LOG_WARNING,10 +STANDARD_TRIGGERS,*monetary,*out,*max_balance,20,false,0,,,,,,,LOG_WARNING,10 +STANDARD_TRIGGERS,*monetary,*out,*max_counter,5,false,0,FS_USERS,,,,,,LOG_WARNING,10 ` accountActions = ` vdf,minitsboy;a1;a2,*out,MORE_MINUTES,STANDARD_TRIGGER @@ -181,6 +181,8 @@ vdf,emptyY,*out,TOPUP_EMPTY_AT, *out,cgrates.org,call,dan,dan,extra1,^filteredHeader1/filterValue1/,^prepaid,,,,rif,rif,,,, *out,cgrates.org,call,dan,dan,extra2,,,,,,ivo,ivo,,,, *out,cgrates.org,call,dan,*any,extra1,,,,,,rif2,rif2,,,, +` + cdrStats = ` ` ) @@ -188,7 +190,7 @@ var csvr *CSVReader func init() { csvr = NewStringCSVReader(dataStorage, accountingStorage, ',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, - sharedGroups, lcrs, actions, actionTimings, actionTriggers, accountActions, derivedCharges) + sharedGroups, lcrs, actions, actionTimings, actionTriggers, accountActions, derivedCharges, cdrStats) csvr.LoadDestinations() csvr.LoadTimings() csvr.LoadRates() diff --git a/engine/loader_db.go b/engine/loader_db.go index 4863f92c8..49f5eb56d 100644 --- a/engine/loader_db.go +++ b/engine/loader_db.go @@ -38,7 +38,7 @@ type DbReader struct { accountActions map[string]*Account dirtyRpAliases []*TenantRatingSubject // used to clean aliases that might have changed dirtyAccAliases []*TenantAccount // used to clean aliases that might have changed - destinations []*Destination + destinations map[string]*Destination rpAliases map[string]string accAliases map[string]string timings map[string]*utils.TPTiming @@ -49,6 +49,7 @@ type DbReader struct { sharedGroups map[string]*SharedGroup lcrs map[string]*LCR derivedChargers map[string]utils.DerivedChargers + cdrStats map[string]*CdrStats } func NewDbReader(storDB LoadStorage, ratingDb RatingStorage, accountDb AccountingStorage, tpid string) *DbReader { @@ -67,6 +68,8 @@ func NewDbReader(storDB LoadStorage, ratingDb RatingStorage, accountDb Accountin c.rpAliases = make(map[string]string) c.accAliases = make(map[string]string) c.accountActions = make(map[string]*Account) + c.destinations = make(map[string]*Destination) + c.cdrStats = make(map[string]*CdrStats) c.derivedChargers = make(map[string]utils.DerivedChargers) return c } @@ -290,12 +293,7 @@ func (dbr *DbReader) LoadDestinationRates() (err error) { dr.Rate = rate destinationExists := dr.DestinationId == utils.ANY if !destinationExists { - for _, d := range dbr.destinations { - if d.Id == dr.DestinationId { - destinationExists = true - break - } - } + _, destinationExists = dbr.destinations[dr.DestinationId] } if !destinationExists { if dbExists, err := dbr.dataDb.HasData(DESTINATION_PREFIX, dr.DestinationId); err != nil { @@ -824,8 +822,10 @@ func (dbr *DbReader) GetLoadedIds(categ string) ([]string, error) { switch categ { case DESTINATION_PREFIX: ids := make([]string, len(dbr.destinations)) - for idx, dst := range dbr.destinations { - ids[idx] = dst.Id + i := 0 + for k := range dbr.destinations { + ids[i] = k + i++ } return ids, nil case RATING_PLAN_PREFIX: diff --git a/engine/loader_helpers.go b/engine/loader_helpers.go index 896c2a662..1f7c38e80 100644 --- a/engine/loader_helpers.go +++ b/engine/loader_helpers.go @@ -29,6 +29,7 @@ import ( "regexp" "strconv" "strings" + "time" "github.com/cgrates/cgrates/utils" ) @@ -95,6 +96,140 @@ func NewTiming(timingInfo ...string) (rt *utils.TPTiming) { return } +func UpdateCdrStats(cs *CdrStats, triggers ActionTriggerPriotityList, record ...string) { + cs = &CdrStats{} + cs.Id = record[0] + if record[1] != "" { + if qi, err := strconv.Atoi(record[1]); err == nil { + cs.QueuedItems = qi + } else { + log.Printf("Error parsing QueuedItems %v for cdrs stats %v", record[1], cs.Id) + } + } + if record[2] != "" { + if d, err := time.ParseDuration(record[2]); err == nil { + cs.TimeWindow = d + } else { + log.Printf("Error parsing TimeWindow %v for cdrs stats %v", record[2], cs.Id) + } + } + if record[3] != "" { + cs.Metrics = append(cs.Metrics, record[3]) + } + if record[4] != "" { + times := strings.Split(record[4], utils.INFIELD_SEP) + if len(times) > 0 { + if sTime, err := utils.ParseTimeDetectLayout(times[0]); err == nil { + if len(cs.SetupInterval) < 1 { + cs.SetupInterval = append(cs.SetupInterval, sTime) + } else { + cs.SetupInterval[0] = sTime + } + } else { + log.Printf("Error parsing TimeWindow %v for cdrs stats %v", record[4], cs.Id) + } + } + if len(times) > 1 { + if eTime, err := utils.ParseTimeDetectLayout(times[1]); err == nil { + if len(cs.SetupInterval) < 2 { + cs.SetupInterval = append(cs.SetupInterval, eTime) + } else { + cs.SetupInterval[1] = eTime + } + } else { + log.Printf("Error parsing TimeWindow %v for cdrs stats %v", record[4], cs.Id) + } + } + } + if record[5] != "" { + cs.TOR = append(cs.TOR, record[5]) + } + if record[6] != "" { + cs.CdrHost = append(cs.CdrHost, record[6]) + } + if record[7] != "" { + cs.CdrSource = append(cs.CdrSource, record[7]) + } + if record[8] != "" { + cs.ReqType = append(cs.ReqType, record[8]) + } + if record[9] != "" { + cs.Direction = append(cs.Direction, record[9]) + } + if record[10] != "" { + cs.Tenant = append(cs.Tenant, record[10]) + } + if record[11] != "" { + cs.Category = append(cs.Category, record[11]) + } + if record[12] != "" { + cs.Account = append(cs.Account, record[12]) + } + if record[13] != "" { + cs.Subject = append(cs.Subject, record[13]) + } + if record[14] != "" { + cs.DestinationPrefix = append(cs.DestinationPrefix, record[14]) + } + if record[15] != "" { + durations := strings.Split(record[15], utils.INFIELD_SEP) + if len(durations) > 0 { + if sDuration, err := time.ParseDuration(durations[0]); err == nil { + if len(cs.UsageInterval) < 1 { + cs.UsageInterval = append(cs.UsageInterval, sDuration) + } else { + cs.UsageInterval[0] = sDuration + } + } else { + log.Printf("Error parsing UsageInterval %v for cdrs stats %v", record[15], cs.Id) + } + } + if len(durations) > 1 { + if eDuration, err := time.ParseDuration(durations[1]); err == nil { + if len(cs.UsageInterval) < 2 { + cs.UsageInterval = append(cs.UsageInterval, eDuration) + } else { + cs.UsageInterval[1] = eDuration + } + } else { + log.Printf("Error parsing UsageInterval %v for cdrs stats %v", record[15], cs.Id) + } + } + } + if record[16] != "" { + cs.MediationRunIds = append(cs.MediationRunIds, record[16]) + } + if record[17] != "" { + costs := strings.Split(record[17], utils.INFIELD_SEP) + if len(costs) > 0 { + if sCost, err := strconv.ParseFloat(costs[0], 64); err == nil { + if len(cs.CostInterval) < 1 { + cs.CostInterval = append(cs.CostInterval, sCost) + } else { + cs.CostInterval[0] = sCost + } + } else { + log.Printf("Error parsing CostInterval %v for cdrs stats %v", record[17], cs.Id) + } + } + if len(costs) > 1 { + if eCost, err := strconv.ParseFloat(costs[1], 64); err == nil { + if len(cs.CostInterval) < 2 { + cs.CostInterval = append(cs.CostInterval, eCost) + } else { + cs.CostInterval[1] = eCost + } + } else { + log.Printf("Error parsing CostInterval %v for cdrs stats %v", record[17], cs.Id) + } + } + } + + if triggers != nil { + cs.Triggers = triggers + } +} + func NewRatingPlan(timing *utils.TPTiming, weight string) (drt *utils.TPRatingPlanBinding) { w, err := strconv.ParseFloat(weight, 64) if err != nil { diff --git a/engine/loader_local_test.go b/engine/loader_local_test.go index 8f325698b..7baa27baa 100644 --- a/engine/loader_local_test.go +++ b/engine/loader_local_test.go @@ -133,6 +133,7 @@ func TestLoadFromCSV(t *testing.T) { path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.ACTION_TRIGGERS_CSV), path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.ACCOUNT_ACTIONS_CSV), path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.DERIVED_CHARGERS_CSV), + path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.CDR_STATS_CSV), ) if err = loader.LoadDestinations(); err != nil { diff --git a/engine/stats.go b/engine/stats.go index b9b026a87..71b1f23a8 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -40,6 +40,9 @@ type Stats struct { func (s *Stats) AddQueue(sq *StatsQueue, out *int) error { s.mux.Lock() defer s.mux.Unlock() + if s.queues == nil { + s.queues = make(map[string]*StatsQueue) + } s.queues[sq.conf.Id] = sq return nil } @@ -54,6 +57,27 @@ func (s *Stats) GetValues(sqID string, values *map[string]float64) error { return errors.New("Not Found") } +// change the xisting ones +// add new ones +// delete the ones missing from the new list +func (s *Stats) UpdateQueues(css []*CdrStats, out *int) error { + s.mux.Lock() + defer s.mux.Unlock() + oldQueues := s.queues + s.queues = make(map[string]*StatsQueue) + for _, cs := range css { + var sq *StatsQueue + var existing bool + if sq, existing = oldQueues[cs.Id]; existing { + sq.conf = cs + } else { + sq = NewStatsQueue(cs) + } + s.queues[cs.Id] = sq + } + return nil +} + func (s *Stats) AppendCDR(cdr *utils.StoredCdr, out *int) error { s.mux.RLock() defer s.mux.RUnlock() diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 6774c86f4..48548b0e7 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -43,6 +43,7 @@ const ( DESTINATION_PREFIX = "dst_" LCR_PREFIX = "lcr_" DERIVEDCHARGERS_PREFIX = "dcs_" + CDR_STATS_PREFIX = "cst_" TEMP_DESTINATION_PREFIX = "tmp_" LOG_CALL_COST_PREFIX = "cco_" LOG_ACTION_TIMMING_PREFIX = "ltm_" @@ -106,6 +107,9 @@ type AccountingStorage interface { GetAllActionTimings() (map[string]ActionPlan, error) GetDerivedChargers(string, bool) (utils.DerivedChargers, error) SetDerivedChargers(string, utils.DerivedChargers) error + SetCdrStats(*CdrStats) error + GetCdrStats(string) (*CdrStats, error) + GetAllCdrStats() ([]*CdrStats, error) } type CdrStorage interface { @@ -138,7 +142,7 @@ type LoadStorage interface { GetTpTimings(string, string) (map[string]*utils.TPTiming, error) SetTPDestination(string, *Destination) error - GetTpDestinations(string, string) ([]*Destination, error) + GetTpDestinations(string, string) (map[string]*Destination, error) SetTPRates(string, map[string][]*utils.RateSlot) error GetTpRates(string, string) (map[string]*utils.TPRate, error) diff --git a/engine/storage_map.go b/engine/storage_map.go index 91ad10684..606e7ce83 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -485,7 +485,7 @@ func (ms *MapStorage) SetActionTimings(key string, ats ActionPlan) (err error) { func (ms *MapStorage) GetAllActionTimings() (ats map[string]ActionPlan, err error) { ats = make(map[string]ActionPlan) for key, value := range ms.dict { - if !strings.Contains(key, ACTION_TIMING_PREFIX) { + if !strings.HasPrefix(key, ACTION_TIMING_PREFIX) { continue } var tempAts ActionPlan @@ -519,6 +519,33 @@ func (ms *MapStorage) SetDerivedChargers(key string, dcs utils.DerivedChargers) return err } +func (ms *MapStorage) SetCdrStats(cs *CdrStats) error { + result, err := ms.ms.Marshal(cs) + ms.dict[CDR_STATS_PREFIX+cs.Id] = result + return err +} + +func (ms *MapStorage) GetCdrStats(key string) (cs *CdrStats, err error) { + if values, ok := ms.dict[key]; ok { + err = ms.ms.Unmarshal(values, &cs) + } else { + return nil, errors.New(utils.ERR_NOT_FOUND) + } + return +} + +func (ms *MapStorage) GetAllCdrStats() (css []*CdrStats, err error) { + for key, value := range ms.dict { + if !strings.HasPrefix(key, CDR_STATS_PREFIX) { + continue + } + var cs *CdrStats + err = ms.ms.Unmarshal(value, cs) + css = append(css, cs) + } + return +} + func (ms *MapStorage) LogCallCost(cgrid, source, runid string, cc *CallCost) error { result, err := ms.ms.Marshal(cc) ms.dict[LOG_CALL_COST_PREFIX+source+runid+"_"+cgrid] = result diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 8e0d20e1f..b3d7d826c 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -664,6 +664,37 @@ func (rs *RedisStorage) SetDerivedChargers(key string, dcs utils.DerivedChargers return err } +func (rs *RedisStorage) SetCdrStats(cs *CdrStats) error { + marshaled, err := rs.ms.Marshal(cs) + err = rs.db.Set(CDR_STATS_PREFIX+cs.Id, marshaled) + return err +} + +func (rs *RedisStorage) GetCdrStats(key string) (cs *CdrStats, err error) { + var values []byte + if values, err = rs.db.Get(key); err == nil { + err = rs.ms.Unmarshal(values, &cs) + } + return +} + +func (rs *RedisStorage) GetAllCdrStats() (css []*CdrStats, err error) { + keys, err := rs.db.Keys(CDR_STATS_PREFIX + "*") + if err != nil { + return nil, err + } + for _, key := range keys { + value, err := rs.db.Get(key) + if err != nil { + continue + } + var cs *CdrStats + err = rs.ms.Unmarshal(value, cs) + css = append(css, cs) + } + return +} + func (rs *RedisStorage) LogCallCost(cgrid, source, runid string, cc *CallCost) (err error) { var result []byte result, err = rs.ms.Marshal(cc) diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 276d71286..7120a367b 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -23,12 +23,13 @@ import ( "database/sql" "encoding/json" "fmt" - "github.com/go-sql-driver/mysql" "io/ioutil" "strconv" "strings" "time" + "github.com/go-sql-driver/mysql" + "github.com/cgrates/cgrates/utils" ) @@ -949,8 +950,8 @@ func (self *SQLStorage) RemStoredCdrs(cgrIds []string) error { return nil } -func (self *SQLStorage) GetTpDestinations(tpid, tag string) ([]*Destination, error) { - var dests []*Destination +func (self *SQLStorage) GetTpDestinations(tpid, tag string) (map[string]*Destination, error) { + var dests map[string]*Destination q := fmt.Sprintf("SELECT * FROM %s WHERE tpid='%s'", utils.TBL_TP_DESTINATIONS, tpid) if len(tag) != 0 { q += fmt.Sprintf(" AND id='%s'", tag) @@ -967,15 +968,10 @@ func (self *SQLStorage) GetTpDestinations(tpid, tag string) ([]*Destination, err return nil, err } var dest *Destination - for _, d := range dests { - if d.Id == tag { - dest = d - break - } - } - if dest == nil { + var found bool + if dest, found = dests[tag]; !found { dest = &Destination{Id: tag} - dests = append(dests, dest) + dests[tag] = dest } dest.AddPrefix(prefix) } diff --git a/general_tests/costs1_test.go b/general_tests/costs1_test.go index e4e70847d..c282db7c8 100644 --- a/general_tests/costs1_test.go +++ b/general_tests/costs1_test.go @@ -19,10 +19,11 @@ along with this program. If not, see package general_tests import ( + "testing" + "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" - "testing" ) func TestCosts1SetStorage(t *testing.T) { @@ -53,7 +54,7 @@ RP_SMS1,DR_SMS_1,ALWAYS,10` *out,cgrates.org,data,*any,2012-01-01T00:00:00Z,RP_DATA1, *out,cgrates.org,sms,*any,2012-01-01T00:00:00Z,RP_SMS1,` csvr := engine.NewStringCSVReader(ratingDb, acntDb, ',', dests, timings, rates, destinationRates, ratingPlans, ratingProfiles, - "", "", "", "", "", "", "") + "", "", "", "", "", "", "", "") if err := csvr.LoadTimings(); err != nil { t.Fatal(err) } diff --git a/general_tests/datachrg1_test.go b/general_tests/datachrg1_test.go index 3cb36395b..a058c8766 100644 --- a/general_tests/datachrg1_test.go +++ b/general_tests/datachrg1_test.go @@ -45,7 +45,7 @@ DR_DATA_2,*any,RT_DATA_1c,*up,4` RP_DATA1,DR_DATA_2,TM2,10` ratingProfiles := `*out,cgrates.org,data,*any,2012-01-01T00:00:00Z,RP_DATA1,` csvr := engine.NewStringCSVReader(ratingDb, acntDb, ',', "", timings, rates, destinationRates, ratingPlans, ratingProfiles, - "", "", "", "", "", "", "") + "", "", "", "", "", "", "", "") if err := csvr.LoadTimings(); err != nil { t.Fatal(err) } diff --git a/general_tests/ddazmbl1_test.go b/general_tests/ddazmbl1_test.go index d2f752a49..68308e348 100644 --- a/general_tests/ddazmbl1_test.go +++ b/general_tests/ddazmbl1_test.go @@ -59,8 +59,9 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10` actionTriggers := `` accountActions := `cgrates.org,12344,*out,TOPUP10_AT,` derivedCharges := `` + cdrStats := `` csvr := engine.NewStringCSVReader(ratingDb, acntDb, ',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, - sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges) + sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats) if err := csvr.LoadDestinations(); err != nil { t.Fatal(err) } diff --git a/general_tests/ddazmbl2_test.go b/general_tests/ddazmbl2_test.go index 6639e2d5d..ab397772b 100644 --- a/general_tests/ddazmbl2_test.go +++ b/general_tests/ddazmbl2_test.go @@ -59,8 +59,9 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10` actionTriggers := `` accountActions := `cgrates.org,12345,*out,TOPUP10_AT,` derivedCharges := `` + cdrStats := `` csvr := engine.NewStringCSVReader(ratingDb2, acntDb2, ',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, - sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges) + sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats) if err := csvr.LoadDestinations(); err != nil { t.Fatal(err) } diff --git a/general_tests/ddazmbl3_test.go b/general_tests/ddazmbl3_test.go index 799fc7e8c..6e1c908cd 100644 --- a/general_tests/ddazmbl3_test.go +++ b/general_tests/ddazmbl3_test.go @@ -57,8 +57,9 @@ RP_UK,DR_UK_Mobile_BIG5,ALWAYS,10` actionTriggers := `` accountActions := `cgrates.org,12346,*out,TOPUP10_AT,` derivedCharges := `` + cdrStats := `` csvr := engine.NewStringCSVReader(ratingDb3, acntDb3, ',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, - sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges) + sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats) if err := csvr.LoadDestinations(); err != nil { t.Fatal(err) } diff --git a/general_tests/smschrg1_test.go b/general_tests/smschrg1_test.go index 08f573e09..c078214a1 100644 --- a/general_tests/smschrg1_test.go +++ b/general_tests/smschrg1_test.go @@ -41,7 +41,7 @@ func TestSMSLoadCsvTpSmsChrg1(t *testing.T) { ratingPlans := `RP_SMS1,DR_SMS_1,ALWAYS,10` ratingProfiles := `*out,cgrates.org,sms,*any,2012-01-01T00:00:00Z,RP_SMS1,` csvr := engine.NewStringCSVReader(ratingDb, acntDb, ',', "", timings, rates, destinationRates, ratingPlans, ratingProfiles, - "", "", "", "", "", "", "") + "", "", "", "", "", "", "", "") if err := csvr.LoadTimings(); err != nil { t.Fatal(err) } diff --git a/utils/consts.go b/utils/consts.go index f410e311a..d5a6424df 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -50,6 +50,7 @@ const ( ACTION_TRIGGERS_CSV = "ActionTriggers.csv" ACCOUNT_ACTIONS_CSV = "AccountActions.csv" DERIVED_CHARGERS_CSV = "DerivedChargers.csv" + CDR_STATS_CSV = "CdrStats.csv" TIMINGS_NRCOLS = 6 DESTINATIONS_NRCOLS = 2 RATES_NRCOLS = 6 @@ -60,9 +61,10 @@ const ( LCRS_NRCOLS = 9 ACTIONS_NRCOLS = 12 ACTION_PLANS_NRCOLS = 4 - ACTION_TRIGGERS_NRCOLS = 9 + ACTION_TRIGGERS_NRCOLS = 15 ACCOUNT_ACTIONS_NRCOLS = 5 DERIVED_CHARGERS_NRCOLS = 17 + CDR_STATS_NRCOLS = 19 ROUNDING_UP = "*up" ROUNDING_MIDDLE = "*middle" ROUNDING_DOWN = "*down"