mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
cdr stats csv load (without tests)
This commit is contained in:
@@ -708,7 +708,8 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
|
||||
path.Join(attrs.FolderPath, utils.ACTION_PLANS_CSV),
|
||||
path.Join(attrs.FolderPath, utils.ACTION_TRIGGERS_CSV),
|
||||
path.Join(attrs.FolderPath, utils.ACCOUNT_ACTIONS_CSV),
|
||||
path.Join(attrs.FolderPath, utils.DERIVED_CHARGERS_CSV))
|
||||
path.Join(attrs.FolderPath, utils.DERIVED_CHARGERS_CSV),
|
||||
path.Join(attrs.FolderPath, utils.CDR_STATS_CSV))
|
||||
if err := loader.LoadAll(); err != nil {
|
||||
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
|
||||
}
|
||||
|
||||
@@ -53,7 +53,7 @@ func (self *ApierV1) GetTPDestination(attrs AttrGetTPDestination, reply *utils.T
|
||||
} else if len(dsts) == 0 {
|
||||
return errors.New(utils.ERR_NOT_FOUND)
|
||||
} else {
|
||||
*reply = utils.TPDestination{attrs.TPid, dsts[0].Id, dsts[0].Prefixes}
|
||||
*reply = utils.TPDestination{attrs.TPid, dsts[attrs.DestinationId].Id, dsts[attrs.DestinationId].Prefixes}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ import (
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
@@ -43,7 +44,7 @@ type CSVReader struct {
|
||||
accountActions map[string]*Account
|
||||
dirtyRpAliases []*TenantRatingSubject // used to clean aliases that might have changed
|
||||
dirtyAccAliases []*TenantAccount // used to clean aliases that might have changed
|
||||
destinations []*Destination
|
||||
destinations map[string]*Destination
|
||||
timings map[string]*utils.TPTiming
|
||||
rates map[string]*utils.TPRate
|
||||
destinationRates map[string]*utils.TPDestinationRate
|
||||
@@ -52,14 +53,15 @@ type CSVReader struct {
|
||||
sharedGroups map[string]*SharedGroup
|
||||
lcrs map[string]*LCR
|
||||
derivedChargers map[string]utils.DerivedChargers
|
||||
cdrStats map[string]*CdrStats
|
||||
// file names
|
||||
destinationsFn, ratesFn, destinationratesFn, timingsFn, destinationratetimingsFn, ratingprofilesFn,
|
||||
sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn string
|
||||
sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn string
|
||||
}
|
||||
|
||||
func NewFileCSVReader(dataStorage RatingStorage, accountingStorage AccountingStorage, sep rune,
|
||||
destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, lcrFn,
|
||||
actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn string) *CSVReader {
|
||||
actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn string) *CSVReader {
|
||||
c := new(CSVReader)
|
||||
c.sep = sep
|
||||
c.dataStorage = dataStorage
|
||||
@@ -71,25 +73,27 @@ func NewFileCSVReader(dataStorage RatingStorage, accountingStorage AccountingSto
|
||||
c.rates = make(map[string]*utils.TPRate)
|
||||
c.destinationRates = make(map[string]*utils.TPDestinationRate)
|
||||
c.timings = make(map[string]*utils.TPTiming)
|
||||
c.destinations = make(map[string]*Destination)
|
||||
c.ratingPlans = make(map[string]*RatingPlan)
|
||||
c.ratingProfiles = make(map[string]*RatingProfile)
|
||||
c.sharedGroups = make(map[string]*SharedGroup)
|
||||
c.lcrs = make(map[string]*LCR)
|
||||
c.derivedChargers = make(map[string]utils.DerivedChargers)
|
||||
c.cdrStats = make(map[string]*CdrStats)
|
||||
c.readerFunc = openFileCSVReader
|
||||
c.rpAliases = make(map[string]string)
|
||||
c.accAliases = make(map[string]string)
|
||||
c.destinationsFn, c.timingsFn, c.ratesFn, c.destinationratesFn, c.destinationratetimingsFn, c.ratingprofilesFn,
|
||||
c.sharedgroupsFn, c.lcrFn, c.actionsFn, c.actiontimingsFn, c.actiontriggersFn, c.accountactionsFn, c.derivedChargersFn = destinationsFn, timingsFn,
|
||||
ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn
|
||||
c.sharedgroupsFn, c.lcrFn, c.actionsFn, c.actiontimingsFn, c.actiontriggersFn, c.accountactionsFn, c.derivedChargersFn, c.cdrStatsFn = destinationsFn, timingsFn,
|
||||
ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn
|
||||
return c
|
||||
}
|
||||
|
||||
func NewStringCSVReader(dataStorage RatingStorage, accountingStorage AccountingStorage, sep rune,
|
||||
destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, lcrFn,
|
||||
actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn string) *CSVReader {
|
||||
actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn string) *CSVReader {
|
||||
c := NewFileCSVReader(dataStorage, accountingStorage, sep, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn,
|
||||
ratingprofilesFn, sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn)
|
||||
ratingprofilesFn, sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn)
|
||||
c.readerFunc = openStringCSVReader
|
||||
return c
|
||||
}
|
||||
@@ -171,6 +175,8 @@ func (csvr *CSVReader) ShowStatistics() {
|
||||
log.Print("Derived Chargers: ", len(csvr.derivedChargers))
|
||||
// lcr rules
|
||||
log.Print("LCR rules: ", len(csvr.lcrs))
|
||||
// cdr stats
|
||||
log.Print("CDR stats: ", len(csvr.cdrStats))
|
||||
}
|
||||
|
||||
func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) {
|
||||
@@ -320,6 +326,18 @@ func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) {
|
||||
log.Print(key)
|
||||
}
|
||||
}
|
||||
if verbose {
|
||||
log.Print("CDR Stats Queues")
|
||||
}
|
||||
for _, sq := range csvr.cdrStats {
|
||||
err = accountingStorage.SetCdrStats(sq)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if verbose {
|
||||
log.Print(sq.Id)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -336,15 +354,10 @@ func (csvr *CSVReader) LoadDestinations() (err error) {
|
||||
for record, err := csvReader.Read(); err == nil; record, err = csvReader.Read() {
|
||||
tag := record[0]
|
||||
var dest *Destination
|
||||
for _, d := range csvr.destinations {
|
||||
if d.Id == tag {
|
||||
dest = d
|
||||
break
|
||||
}
|
||||
}
|
||||
if dest == nil {
|
||||
var found bool
|
||||
if dest, found = csvr.destinations[tag]; !found {
|
||||
dest = &Destination{Id: tag}
|
||||
csvr.destinations = append(csvr.destinations, dest)
|
||||
csvr.destinations[tag] = dest
|
||||
}
|
||||
dest.AddPrefix(record[1])
|
||||
}
|
||||
@@ -427,12 +440,7 @@ func (csvr *CSVReader) LoadDestinationRates() (err error) {
|
||||
}
|
||||
destinationExists := record[1] == utils.ANY
|
||||
if !destinationExists {
|
||||
for _, d := range csvr.destinations {
|
||||
if d.Id == record[1] {
|
||||
destinationExists = true
|
||||
break
|
||||
}
|
||||
}
|
||||
_, destinationExists = csvr.destinations[record[1]]
|
||||
}
|
||||
if !destinationExists && csvr.dataStorage != nil {
|
||||
if destinationExists, err = csvr.dataStorage.HasData(DESTINATION_PREFIX, record[1]); err != nil {
|
||||
@@ -753,26 +761,49 @@ func (csvr *CSVReader) LoadActionTriggers() (err error) {
|
||||
tag := record[0]
|
||||
value, err := strconv.ParseFloat(record[4], 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Could not parse action trigger value: %v", err)
|
||||
return fmt.Errorf("Could not parse action trigger value (%v): %v", record[4], err)
|
||||
}
|
||||
recurrent, err := strconv.ParseBool(record[5])
|
||||
if err != nil {
|
||||
return fmt.Errorf("Could not parse action trigger recurrent flag: %v", err)
|
||||
return fmt.Errorf("Could not parse action trigger recurrent flag (%v): %v", record[5], err)
|
||||
}
|
||||
weight, err := strconv.ParseFloat(record[8], 64)
|
||||
minSleep, err := time.ParseDuration(record[6])
|
||||
if err != nil {
|
||||
return fmt.Errorf("Could not parse action trigger weight: %v", err)
|
||||
return fmt.Errorf("Could not parse action trigger MinSleep (%v): %v", record[6], err)
|
||||
}
|
||||
balanceWeight, err := strconv.ParseFloat(record[8], 64)
|
||||
if record[8] != "" && err != nil {
|
||||
return fmt.Errorf("Could not parse action trigger BalanceWeight (%v): %v", record[8], err)
|
||||
}
|
||||
balanceExp, err := utils.ParseTimeDetectLayout(record[9])
|
||||
if record[9] != "" && err != nil {
|
||||
return fmt.Errorf("Could not parse action trigger BalanceExpirationDate (%v): %v", record[9], err)
|
||||
}
|
||||
minQI, err := strconv.Atoi(record[12])
|
||||
if record[12] != "" && err != nil {
|
||||
return fmt.Errorf("Could not parse action trigger MinQueuedItems (%v): %v", record[12], err)
|
||||
}
|
||||
weight, err := strconv.ParseFloat(record[14], 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Could not parse action trigger weight (%v): %v", record[14], err)
|
||||
}
|
||||
|
||||
at := &ActionTrigger{
|
||||
Id: utils.GenUUID(),
|
||||
BalanceType: record[1],
|
||||
Direction: record[2],
|
||||
ThresholdType: record[3],
|
||||
ThresholdValue: value,
|
||||
Recurrent: recurrent,
|
||||
DestinationId: record[6],
|
||||
ActionsId: record[7],
|
||||
Weight: weight,
|
||||
Id: utils.GenUUID(),
|
||||
BalanceType: record[1],
|
||||
Direction: record[2],
|
||||
ThresholdType: record[3],
|
||||
ThresholdValue: value,
|
||||
Recurrent: recurrent,
|
||||
MinSleep: minSleep,
|
||||
DestinationId: record[7],
|
||||
BalanceWeight: balanceWeight,
|
||||
BalanceExpirationDate: balanceExp,
|
||||
BalanceRatingSubject: record[10],
|
||||
BalanceSharedGroup: record[11],
|
||||
MinQueuedItems: minQI,
|
||||
ActionsId: record[13],
|
||||
Weight: weight,
|
||||
}
|
||||
csvr.actionsTriggers[tag] = append(csvr.actionsTriggers[tag], at)
|
||||
}
|
||||
@@ -806,7 +837,7 @@ func (csvr *CSVReader) LoadAccountActions() (err error) {
|
||||
}
|
||||
aTriggers, exists := csvr.actionsTriggers[record[4]]
|
||||
if record[4] != "" && !exists {
|
||||
// only return error if there was something ther for the tag
|
||||
// only return error if there was something there for the tag
|
||||
return fmt.Errorf("Could not get action triggers for tag %s", record[4])
|
||||
}
|
||||
ub := &Account{
|
||||
@@ -882,6 +913,37 @@ func (csvr *CSVReader) LoadDerivedChargers() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (csvr *CSVReader) LoadCdrStats() (err error) {
|
||||
csvReader, fp, err := csvr.readerFunc(csvr.timingsFn, csvr.sep, utils.TIMINGS_NRCOLS)
|
||||
if err != nil {
|
||||
log.Print("Could not load cdr stats file: ", err)
|
||||
// allow writing of the other values
|
||||
return nil
|
||||
}
|
||||
if fp != nil {
|
||||
defer fp.Close()
|
||||
}
|
||||
for record, err := csvReader.Read(); err == nil; record, err = csvReader.Read() {
|
||||
tag := record[0]
|
||||
if _, exists := csvr.cdrStats[tag]; exists {
|
||||
log.Print("Warning: duplicate cdr stats found: ", tag)
|
||||
}
|
||||
var cs *CdrStats
|
||||
var exists bool
|
||||
if cs, exists = csvr.cdrStats[tag]; !exists {
|
||||
cs = &CdrStats{}
|
||||
}
|
||||
triggers, exists := csvr.actionsTriggers[record[18]]
|
||||
if record[18] != "" && !exists {
|
||||
// only return error if there was something there for the tag
|
||||
return fmt.Errorf("Could not get action triggers for cdr stats id %s: %s", cs.Id, record[18])
|
||||
}
|
||||
UpdateCdrStats(cs, triggers, record...)
|
||||
csvr.cdrStats[tag] = cs
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Automated loading
|
||||
func (csvr *CSVReader) LoadAll() error {
|
||||
var err error
|
||||
@@ -921,6 +983,9 @@ func (csvr *CSVReader) LoadAll() error {
|
||||
if err = csvr.LoadDerivedChargers(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = csvr.LoadCdrStats(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -929,8 +994,10 @@ func (csvr *CSVReader) GetLoadedIds(categ string) ([]string, error) {
|
||||
switch categ {
|
||||
case DESTINATION_PREFIX:
|
||||
ids := make([]string, len(csvr.destinations))
|
||||
for idx, dst := range csvr.destinations {
|
||||
ids[idx] = dst.Id
|
||||
i := 0
|
||||
for k := range csvr.destinations {
|
||||
ids[i] = k
|
||||
i++
|
||||
}
|
||||
return ids, nil
|
||||
case RATING_PLAN_PREFIX:
|
||||
@@ -981,7 +1048,7 @@ func (csvr *CSVReader) GetLoadedIds(categ string) ([]string, error) {
|
||||
i++
|
||||
}
|
||||
return keys, nil
|
||||
case DERIVEDCHARGERS_PREFIX: // aliases
|
||||
case DERIVEDCHARGERS_PREFIX: // derived chargers
|
||||
keys := make([]string, len(csvr.derivedChargers))
|
||||
i := 0
|
||||
for k := range csvr.derivedChargers {
|
||||
@@ -989,6 +1056,14 @@ func (csvr *CSVReader) GetLoadedIds(categ string) ([]string, error) {
|
||||
i++
|
||||
}
|
||||
return keys, nil
|
||||
case CDR_STATS_PREFIX: // cdr stats
|
||||
keys := make([]string, len(csvr.cdrStats))
|
||||
i := 0
|
||||
for k := range csvr.cdrStats {
|
||||
keys[i] = k
|
||||
i++
|
||||
}
|
||||
return keys, nil
|
||||
}
|
||||
return nil, errors.New("Unsupported category")
|
||||
}
|
||||
|
||||
@@ -161,11 +161,11 @@ TOPUP_SHARED10_AT,SE10,ASAP,10
|
||||
TOPUP_EMPTY_AT,EE0,ASAP,10
|
||||
`
|
||||
actionTriggers = `
|
||||
STANDARD_TRIGGER,*voice,*out,*min_counter,10,false,GERMANY_O2,SOME_1,10
|
||||
STANDARD_TRIGGER,*voice,*out,*max_balance,200,false,GERMANY,SOME_2,10
|
||||
STANDARD_TRIGGERS,*monetary,*out,*min_balance,2,false,,LOG_WARNING,10
|
||||
STANDARD_TRIGGERS,*monetary,*out,*max_balance,20,false,,LOG_WARNING,10
|
||||
STANDARD_TRIGGERS,*monetary,*out,*max_counter,5,false,FS_USERS,LOG_WARNING,10
|
||||
STANDARD_TRIGGER,*voice,*out,*min_counter,10,false,0,GERMANY_O2,,,,,,SOME_1,10
|
||||
STANDARD_TRIGGER,*voice,*out,*max_balance,200,false,0,GERMANY,,,,,,SOME_2,10
|
||||
STANDARD_TRIGGERS,*monetary,*out,*min_balance,2,false,0,,,,,,,LOG_WARNING,10
|
||||
STANDARD_TRIGGERS,*monetary,*out,*max_balance,20,false,0,,,,,,,LOG_WARNING,10
|
||||
STANDARD_TRIGGERS,*monetary,*out,*max_counter,5,false,0,FS_USERS,,,,,,LOG_WARNING,10
|
||||
`
|
||||
accountActions = `
|
||||
vdf,minitsboy;a1;a2,*out,MORE_MINUTES,STANDARD_TRIGGER
|
||||
@@ -181,6 +181,8 @@ vdf,emptyY,*out,TOPUP_EMPTY_AT,
|
||||
*out,cgrates.org,call,dan,dan,extra1,^filteredHeader1/filterValue1/,^prepaid,,,,rif,rif,,,,
|
||||
*out,cgrates.org,call,dan,dan,extra2,,,,,,ivo,ivo,,,,
|
||||
*out,cgrates.org,call,dan,*any,extra1,,,,,,rif2,rif2,,,,
|
||||
`
|
||||
cdrStats = `
|
||||
`
|
||||
)
|
||||
|
||||
@@ -188,7 +190,7 @@ var csvr *CSVReader
|
||||
|
||||
func init() {
|
||||
csvr = NewStringCSVReader(dataStorage, accountingStorage, ',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles,
|
||||
sharedGroups, lcrs, actions, actionTimings, actionTriggers, accountActions, derivedCharges)
|
||||
sharedGroups, lcrs, actions, actionTimings, actionTriggers, accountActions, derivedCharges, cdrStats)
|
||||
csvr.LoadDestinations()
|
||||
csvr.LoadTimings()
|
||||
csvr.LoadRates()
|
||||
|
||||
@@ -38,7 +38,7 @@ type DbReader struct {
|
||||
accountActions map[string]*Account
|
||||
dirtyRpAliases []*TenantRatingSubject // used to clean aliases that might have changed
|
||||
dirtyAccAliases []*TenantAccount // used to clean aliases that might have changed
|
||||
destinations []*Destination
|
||||
destinations map[string]*Destination
|
||||
rpAliases map[string]string
|
||||
accAliases map[string]string
|
||||
timings map[string]*utils.TPTiming
|
||||
@@ -49,6 +49,7 @@ type DbReader struct {
|
||||
sharedGroups map[string]*SharedGroup
|
||||
lcrs map[string]*LCR
|
||||
derivedChargers map[string]utils.DerivedChargers
|
||||
cdrStats map[string]*CdrStats
|
||||
}
|
||||
|
||||
func NewDbReader(storDB LoadStorage, ratingDb RatingStorage, accountDb AccountingStorage, tpid string) *DbReader {
|
||||
@@ -67,6 +68,8 @@ func NewDbReader(storDB LoadStorage, ratingDb RatingStorage, accountDb Accountin
|
||||
c.rpAliases = make(map[string]string)
|
||||
c.accAliases = make(map[string]string)
|
||||
c.accountActions = make(map[string]*Account)
|
||||
c.destinations = make(map[string]*Destination)
|
||||
c.cdrStats = make(map[string]*CdrStats)
|
||||
c.derivedChargers = make(map[string]utils.DerivedChargers)
|
||||
return c
|
||||
}
|
||||
@@ -290,12 +293,7 @@ func (dbr *DbReader) LoadDestinationRates() (err error) {
|
||||
dr.Rate = rate
|
||||
destinationExists := dr.DestinationId == utils.ANY
|
||||
if !destinationExists {
|
||||
for _, d := range dbr.destinations {
|
||||
if d.Id == dr.DestinationId {
|
||||
destinationExists = true
|
||||
break
|
||||
}
|
||||
}
|
||||
_, destinationExists = dbr.destinations[dr.DestinationId]
|
||||
}
|
||||
if !destinationExists {
|
||||
if dbExists, err := dbr.dataDb.HasData(DESTINATION_PREFIX, dr.DestinationId); err != nil {
|
||||
@@ -824,8 +822,10 @@ func (dbr *DbReader) GetLoadedIds(categ string) ([]string, error) {
|
||||
switch categ {
|
||||
case DESTINATION_PREFIX:
|
||||
ids := make([]string, len(dbr.destinations))
|
||||
for idx, dst := range dbr.destinations {
|
||||
ids[idx] = dst.Id
|
||||
i := 0
|
||||
for k := range dbr.destinations {
|
||||
ids[i] = k
|
||||
i++
|
||||
}
|
||||
return ids, nil
|
||||
case RATING_PLAN_PREFIX:
|
||||
|
||||
@@ -29,6 +29,7 @@ import (
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
@@ -95,6 +96,140 @@ func NewTiming(timingInfo ...string) (rt *utils.TPTiming) {
|
||||
return
|
||||
}
|
||||
|
||||
func UpdateCdrStats(cs *CdrStats, triggers ActionTriggerPriotityList, record ...string) {
|
||||
cs = &CdrStats{}
|
||||
cs.Id = record[0]
|
||||
if record[1] != "" {
|
||||
if qi, err := strconv.Atoi(record[1]); err == nil {
|
||||
cs.QueuedItems = qi
|
||||
} else {
|
||||
log.Printf("Error parsing QueuedItems %v for cdrs stats %v", record[1], cs.Id)
|
||||
}
|
||||
}
|
||||
if record[2] != "" {
|
||||
if d, err := time.ParseDuration(record[2]); err == nil {
|
||||
cs.TimeWindow = d
|
||||
} else {
|
||||
log.Printf("Error parsing TimeWindow %v for cdrs stats %v", record[2], cs.Id)
|
||||
}
|
||||
}
|
||||
if record[3] != "" {
|
||||
cs.Metrics = append(cs.Metrics, record[3])
|
||||
}
|
||||
if record[4] != "" {
|
||||
times := strings.Split(record[4], utils.INFIELD_SEP)
|
||||
if len(times) > 0 {
|
||||
if sTime, err := utils.ParseTimeDetectLayout(times[0]); err == nil {
|
||||
if len(cs.SetupInterval) < 1 {
|
||||
cs.SetupInterval = append(cs.SetupInterval, sTime)
|
||||
} else {
|
||||
cs.SetupInterval[0] = sTime
|
||||
}
|
||||
} else {
|
||||
log.Printf("Error parsing TimeWindow %v for cdrs stats %v", record[4], cs.Id)
|
||||
}
|
||||
}
|
||||
if len(times) > 1 {
|
||||
if eTime, err := utils.ParseTimeDetectLayout(times[1]); err == nil {
|
||||
if len(cs.SetupInterval) < 2 {
|
||||
cs.SetupInterval = append(cs.SetupInterval, eTime)
|
||||
} else {
|
||||
cs.SetupInterval[1] = eTime
|
||||
}
|
||||
} else {
|
||||
log.Printf("Error parsing TimeWindow %v for cdrs stats %v", record[4], cs.Id)
|
||||
}
|
||||
}
|
||||
}
|
||||
if record[5] != "" {
|
||||
cs.TOR = append(cs.TOR, record[5])
|
||||
}
|
||||
if record[6] != "" {
|
||||
cs.CdrHost = append(cs.CdrHost, record[6])
|
||||
}
|
||||
if record[7] != "" {
|
||||
cs.CdrSource = append(cs.CdrSource, record[7])
|
||||
}
|
||||
if record[8] != "" {
|
||||
cs.ReqType = append(cs.ReqType, record[8])
|
||||
}
|
||||
if record[9] != "" {
|
||||
cs.Direction = append(cs.Direction, record[9])
|
||||
}
|
||||
if record[10] != "" {
|
||||
cs.Tenant = append(cs.Tenant, record[10])
|
||||
}
|
||||
if record[11] != "" {
|
||||
cs.Category = append(cs.Category, record[11])
|
||||
}
|
||||
if record[12] != "" {
|
||||
cs.Account = append(cs.Account, record[12])
|
||||
}
|
||||
if record[13] != "" {
|
||||
cs.Subject = append(cs.Subject, record[13])
|
||||
}
|
||||
if record[14] != "" {
|
||||
cs.DestinationPrefix = append(cs.DestinationPrefix, record[14])
|
||||
}
|
||||
if record[15] != "" {
|
||||
durations := strings.Split(record[15], utils.INFIELD_SEP)
|
||||
if len(durations) > 0 {
|
||||
if sDuration, err := time.ParseDuration(durations[0]); err == nil {
|
||||
if len(cs.UsageInterval) < 1 {
|
||||
cs.UsageInterval = append(cs.UsageInterval, sDuration)
|
||||
} else {
|
||||
cs.UsageInterval[0] = sDuration
|
||||
}
|
||||
} else {
|
||||
log.Printf("Error parsing UsageInterval %v for cdrs stats %v", record[15], cs.Id)
|
||||
}
|
||||
}
|
||||
if len(durations) > 1 {
|
||||
if eDuration, err := time.ParseDuration(durations[1]); err == nil {
|
||||
if len(cs.UsageInterval) < 2 {
|
||||
cs.UsageInterval = append(cs.UsageInterval, eDuration)
|
||||
} else {
|
||||
cs.UsageInterval[1] = eDuration
|
||||
}
|
||||
} else {
|
||||
log.Printf("Error parsing UsageInterval %v for cdrs stats %v", record[15], cs.Id)
|
||||
}
|
||||
}
|
||||
}
|
||||
if record[16] != "" {
|
||||
cs.MediationRunIds = append(cs.MediationRunIds, record[16])
|
||||
}
|
||||
if record[17] != "" {
|
||||
costs := strings.Split(record[17], utils.INFIELD_SEP)
|
||||
if len(costs) > 0 {
|
||||
if sCost, err := strconv.ParseFloat(costs[0], 64); err == nil {
|
||||
if len(cs.CostInterval) < 1 {
|
||||
cs.CostInterval = append(cs.CostInterval, sCost)
|
||||
} else {
|
||||
cs.CostInterval[0] = sCost
|
||||
}
|
||||
} else {
|
||||
log.Printf("Error parsing CostInterval %v for cdrs stats %v", record[17], cs.Id)
|
||||
}
|
||||
}
|
||||
if len(costs) > 1 {
|
||||
if eCost, err := strconv.ParseFloat(costs[1], 64); err == nil {
|
||||
if len(cs.CostInterval) < 2 {
|
||||
cs.CostInterval = append(cs.CostInterval, eCost)
|
||||
} else {
|
||||
cs.CostInterval[1] = eCost
|
||||
}
|
||||
} else {
|
||||
log.Printf("Error parsing CostInterval %v for cdrs stats %v", record[17], cs.Id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if triggers != nil {
|
||||
cs.Triggers = triggers
|
||||
}
|
||||
}
|
||||
|
||||
func NewRatingPlan(timing *utils.TPTiming, weight string) (drt *utils.TPRatingPlanBinding) {
|
||||
w, err := strconv.ParseFloat(weight, 64)
|
||||
if err != nil {
|
||||
|
||||
@@ -133,6 +133,7 @@ func TestLoadFromCSV(t *testing.T) {
|
||||
path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.ACTION_TRIGGERS_CSV),
|
||||
path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.ACCOUNT_ACTIONS_CSV),
|
||||
path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.DERIVED_CHARGERS_CSV),
|
||||
path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.CDR_STATS_CSV),
|
||||
)
|
||||
|
||||
if err = loader.LoadDestinations(); err != nil {
|
||||
|
||||
@@ -40,6 +40,9 @@ type Stats struct {
|
||||
func (s *Stats) AddQueue(sq *StatsQueue, out *int) error {
|
||||
s.mux.Lock()
|
||||
defer s.mux.Unlock()
|
||||
if s.queues == nil {
|
||||
s.queues = make(map[string]*StatsQueue)
|
||||
}
|
||||
s.queues[sq.conf.Id] = sq
|
||||
return nil
|
||||
}
|
||||
@@ -54,6 +57,27 @@ func (s *Stats) GetValues(sqID string, values *map[string]float64) error {
|
||||
return errors.New("Not Found")
|
||||
}
|
||||
|
||||
// change the xisting ones
|
||||
// add new ones
|
||||
// delete the ones missing from the new list
|
||||
func (s *Stats) UpdateQueues(css []*CdrStats, out *int) error {
|
||||
s.mux.Lock()
|
||||
defer s.mux.Unlock()
|
||||
oldQueues := s.queues
|
||||
s.queues = make(map[string]*StatsQueue)
|
||||
for _, cs := range css {
|
||||
var sq *StatsQueue
|
||||
var existing bool
|
||||
if sq, existing = oldQueues[cs.Id]; existing {
|
||||
sq.conf = cs
|
||||
} else {
|
||||
sq = NewStatsQueue(cs)
|
||||
}
|
||||
s.queues[cs.Id] = sq
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Stats) AppendCDR(cdr *utils.StoredCdr, out *int) error {
|
||||
s.mux.RLock()
|
||||
defer s.mux.RUnlock()
|
||||
|
||||
@@ -43,6 +43,7 @@ const (
|
||||
DESTINATION_PREFIX = "dst_"
|
||||
LCR_PREFIX = "lcr_"
|
||||
DERIVEDCHARGERS_PREFIX = "dcs_"
|
||||
CDR_STATS_PREFIX = "cst_"
|
||||
TEMP_DESTINATION_PREFIX = "tmp_"
|
||||
LOG_CALL_COST_PREFIX = "cco_"
|
||||
LOG_ACTION_TIMMING_PREFIX = "ltm_"
|
||||
@@ -106,6 +107,9 @@ type AccountingStorage interface {
|
||||
GetAllActionTimings() (map[string]ActionPlan, error)
|
||||
GetDerivedChargers(string, bool) (utils.DerivedChargers, error)
|
||||
SetDerivedChargers(string, utils.DerivedChargers) error
|
||||
SetCdrStats(*CdrStats) error
|
||||
GetCdrStats(string) (*CdrStats, error)
|
||||
GetAllCdrStats() ([]*CdrStats, error)
|
||||
}
|
||||
|
||||
type CdrStorage interface {
|
||||
@@ -138,7 +142,7 @@ type LoadStorage interface {
|
||||
GetTpTimings(string, string) (map[string]*utils.TPTiming, error)
|
||||
|
||||
SetTPDestination(string, *Destination) error
|
||||
GetTpDestinations(string, string) ([]*Destination, error)
|
||||
GetTpDestinations(string, string) (map[string]*Destination, error)
|
||||
|
||||
SetTPRates(string, map[string][]*utils.RateSlot) error
|
||||
GetTpRates(string, string) (map[string]*utils.TPRate, error)
|
||||
|
||||
@@ -485,7 +485,7 @@ func (ms *MapStorage) SetActionTimings(key string, ats ActionPlan) (err error) {
|
||||
func (ms *MapStorage) GetAllActionTimings() (ats map[string]ActionPlan, err error) {
|
||||
ats = make(map[string]ActionPlan)
|
||||
for key, value := range ms.dict {
|
||||
if !strings.Contains(key, ACTION_TIMING_PREFIX) {
|
||||
if !strings.HasPrefix(key, ACTION_TIMING_PREFIX) {
|
||||
continue
|
||||
}
|
||||
var tempAts ActionPlan
|
||||
@@ -519,6 +519,33 @@ func (ms *MapStorage) SetDerivedChargers(key string, dcs utils.DerivedChargers)
|
||||
return err
|
||||
}
|
||||
|
||||
func (ms *MapStorage) SetCdrStats(cs *CdrStats) error {
|
||||
result, err := ms.ms.Marshal(cs)
|
||||
ms.dict[CDR_STATS_PREFIX+cs.Id] = result
|
||||
return err
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetCdrStats(key string) (cs *CdrStats, err error) {
|
||||
if values, ok := ms.dict[key]; ok {
|
||||
err = ms.ms.Unmarshal(values, &cs)
|
||||
} else {
|
||||
return nil, errors.New(utils.ERR_NOT_FOUND)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetAllCdrStats() (css []*CdrStats, err error) {
|
||||
for key, value := range ms.dict {
|
||||
if !strings.HasPrefix(key, CDR_STATS_PREFIX) {
|
||||
continue
|
||||
}
|
||||
var cs *CdrStats
|
||||
err = ms.ms.Unmarshal(value, cs)
|
||||
css = append(css, cs)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) LogCallCost(cgrid, source, runid string, cc *CallCost) error {
|
||||
result, err := ms.ms.Marshal(cc)
|
||||
ms.dict[LOG_CALL_COST_PREFIX+source+runid+"_"+cgrid] = result
|
||||
|
||||
@@ -664,6 +664,37 @@ func (rs *RedisStorage) SetDerivedChargers(key string, dcs utils.DerivedChargers
|
||||
return err
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetCdrStats(cs *CdrStats) error {
|
||||
marshaled, err := rs.ms.Marshal(cs)
|
||||
err = rs.db.Set(CDR_STATS_PREFIX+cs.Id, marshaled)
|
||||
return err
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetCdrStats(key string) (cs *CdrStats, err error) {
|
||||
var values []byte
|
||||
if values, err = rs.db.Get(key); err == nil {
|
||||
err = rs.ms.Unmarshal(values, &cs)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetAllCdrStats() (css []*CdrStats, err error) {
|
||||
keys, err := rs.db.Keys(CDR_STATS_PREFIX + "*")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, key := range keys {
|
||||
value, err := rs.db.Get(key)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
var cs *CdrStats
|
||||
err = rs.ms.Unmarshal(value, cs)
|
||||
css = append(css, cs)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) LogCallCost(cgrid, source, runid string, cc *CallCost) (err error) {
|
||||
var result []byte
|
||||
result, err = rs.ms.Marshal(cc)
|
||||
|
||||
@@ -23,12 +23,13 @@ import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/go-sql-driver/mysql"
|
||||
"io/ioutil"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/go-sql-driver/mysql"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
@@ -949,8 +950,8 @@ func (self *SQLStorage) RemStoredCdrs(cgrIds []string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *SQLStorage) GetTpDestinations(tpid, tag string) ([]*Destination, error) {
|
||||
var dests []*Destination
|
||||
func (self *SQLStorage) GetTpDestinations(tpid, tag string) (map[string]*Destination, error) {
|
||||
var dests map[string]*Destination
|
||||
q := fmt.Sprintf("SELECT * FROM %s WHERE tpid='%s'", utils.TBL_TP_DESTINATIONS, tpid)
|
||||
if len(tag) != 0 {
|
||||
q += fmt.Sprintf(" AND id='%s'", tag)
|
||||
@@ -967,15 +968,10 @@ func (self *SQLStorage) GetTpDestinations(tpid, tag string) ([]*Destination, err
|
||||
return nil, err
|
||||
}
|
||||
var dest *Destination
|
||||
for _, d := range dests {
|
||||
if d.Id == tag {
|
||||
dest = d
|
||||
break
|
||||
}
|
||||
}
|
||||
if dest == nil {
|
||||
var found bool
|
||||
if dest, found = dests[tag]; !found {
|
||||
dest = &Destination{Id: tag}
|
||||
dests = append(dests, dest)
|
||||
dests[tag] = dest
|
||||
}
|
||||
dest.AddPrefix(prefix)
|
||||
}
|
||||
|
||||
@@ -19,10 +19,11 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package general_tests
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/cgrates/cgrates/cache2go"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestCosts1SetStorage(t *testing.T) {
|
||||
@@ -53,7 +54,7 @@ RP_SMS1,DR_SMS_1,ALWAYS,10`
|
||||
*out,cgrates.org,data,*any,2012-01-01T00:00:00Z,RP_DATA1,
|
||||
*out,cgrates.org,sms,*any,2012-01-01T00:00:00Z,RP_SMS1,`
|
||||
csvr := engine.NewStringCSVReader(ratingDb, acntDb, ',', dests, timings, rates, destinationRates, ratingPlans, ratingProfiles,
|
||||
"", "", "", "", "", "", "")
|
||||
"", "", "", "", "", "", "", "")
|
||||
if err := csvr.LoadTimings(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -45,7 +45,7 @@ DR_DATA_2,*any,RT_DATA_1c,*up,4`
|
||||
RP_DATA1,DR_DATA_2,TM2,10`
|
||||
ratingProfiles := `*out,cgrates.org,data,*any,2012-01-01T00:00:00Z,RP_DATA1,`
|
||||
csvr := engine.NewStringCSVReader(ratingDb, acntDb, ',', "", timings, rates, destinationRates, ratingPlans, ratingProfiles,
|
||||
"", "", "", "", "", "", "")
|
||||
"", "", "", "", "", "", "", "")
|
||||
if err := csvr.LoadTimings(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -59,8 +59,9 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10`
|
||||
actionTriggers := ``
|
||||
accountActions := `cgrates.org,12344,*out,TOPUP10_AT,`
|
||||
derivedCharges := ``
|
||||
cdrStats := ``
|
||||
csvr := engine.NewStringCSVReader(ratingDb, acntDb, ',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles,
|
||||
sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges)
|
||||
sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats)
|
||||
if err := csvr.LoadDestinations(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -59,8 +59,9 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10`
|
||||
actionTriggers := ``
|
||||
accountActions := `cgrates.org,12345,*out,TOPUP10_AT,`
|
||||
derivedCharges := ``
|
||||
cdrStats := ``
|
||||
csvr := engine.NewStringCSVReader(ratingDb2, acntDb2, ',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles,
|
||||
sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges)
|
||||
sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats)
|
||||
if err := csvr.LoadDestinations(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -57,8 +57,9 @@ RP_UK,DR_UK_Mobile_BIG5,ALWAYS,10`
|
||||
actionTriggers := ``
|
||||
accountActions := `cgrates.org,12346,*out,TOPUP10_AT,`
|
||||
derivedCharges := ``
|
||||
cdrStats := ``
|
||||
csvr := engine.NewStringCSVReader(ratingDb3, acntDb3, ',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles,
|
||||
sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges)
|
||||
sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats)
|
||||
if err := csvr.LoadDestinations(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -41,7 +41,7 @@ func TestSMSLoadCsvTpSmsChrg1(t *testing.T) {
|
||||
ratingPlans := `RP_SMS1,DR_SMS_1,ALWAYS,10`
|
||||
ratingProfiles := `*out,cgrates.org,sms,*any,2012-01-01T00:00:00Z,RP_SMS1,`
|
||||
csvr := engine.NewStringCSVReader(ratingDb, acntDb, ',', "", timings, rates, destinationRates, ratingPlans, ratingProfiles,
|
||||
"", "", "", "", "", "", "")
|
||||
"", "", "", "", "", "", "", "")
|
||||
if err := csvr.LoadTimings(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -50,6 +50,7 @@ const (
|
||||
ACTION_TRIGGERS_CSV = "ActionTriggers.csv"
|
||||
ACCOUNT_ACTIONS_CSV = "AccountActions.csv"
|
||||
DERIVED_CHARGERS_CSV = "DerivedChargers.csv"
|
||||
CDR_STATS_CSV = "CdrStats.csv"
|
||||
TIMINGS_NRCOLS = 6
|
||||
DESTINATIONS_NRCOLS = 2
|
||||
RATES_NRCOLS = 6
|
||||
@@ -60,9 +61,10 @@ const (
|
||||
LCRS_NRCOLS = 9
|
||||
ACTIONS_NRCOLS = 12
|
||||
ACTION_PLANS_NRCOLS = 4
|
||||
ACTION_TRIGGERS_NRCOLS = 9
|
||||
ACTION_TRIGGERS_NRCOLS = 15
|
||||
ACCOUNT_ACTIONS_NRCOLS = 5
|
||||
DERIVED_CHARGERS_NRCOLS = 17
|
||||
CDR_STATS_NRCOLS = 19
|
||||
ROUNDING_UP = "*up"
|
||||
ROUNDING_MIDDLE = "*middle"
|
||||
ROUNDING_DOWN = "*down"
|
||||
|
||||
Reference in New Issue
Block a user