load cdr stats and shared groups

This commit is contained in:
Radu Ioan Fericean
2014-09-16 17:00:02 +03:00
parent 59bb827dd8
commit 788d3c37a4
7 changed files with 182 additions and 62 deletions

View File

@@ -304,6 +304,53 @@ func (self *ApierV1) LoadRatingProfile(attrs utils.TPRatingProfile, reply *strin
return nil
}
type AttrLoadSharedGroup struct {
TPid string
SharedGroupId string
}
// Load destinations from storDb into dataDb.
func (self *ApierV1) LoadSharedGroup(attrs AttrLoadSharedGroup, reply *string) error {
if missing := utils.MissingStructFields(&attrs, []string{"TPid", "SharedGroupId"}); len(missing) != 0 {
return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing)
}
if attrs.SharedGroupId == utils.EMPTY {
attrs.SharedGroupId = ""
}
dbReader := engine.NewDbReader(self.StorDb, self.RatingDb, self.AccountDb, attrs.TPid)
if err := dbReader.LoadSharedGroupByTag(attrs.SharedGroupId, true); err != nil {
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
}
//Automatic cache of the newly inserted rating plan
didNotChange := []string{}
if err := self.AccountDb.CacheAccounting(didNotChange, nil, didNotChange, didNotChange); err != nil {
return err
}
*reply = OK
return nil
}
type AttrLoadCdrStats struct {
TPid string
CdrStatsId string
}
// Load destinations from storDb into dataDb.
func (self *ApierV1) LoadCdrStats(attrs AttrLoadCdrStats, reply *string) error {
if missing := utils.MissingStructFields(&attrs, []string{"TPid", "CdrStatsId"}); len(missing) != 0 {
return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing)
}
if attrs.CdrStatsId == utils.EMPTY {
attrs.CdrStatsId = ""
}
dbReader := engine.NewDbReader(self.StorDb, self.RatingDb, self.AccountDb, attrs.TPid)
if err := dbReader.LoadCdrStatsByTag(attrs.CdrStatsId, true); err != nil {
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
}
*reply = OK
return nil
}
type AttrSetRatingProfile struct {
Tenant string // Tenant's Id
Category string // TypeOfRecord

View File

@@ -934,7 +934,7 @@ func (csvr *CSVReader) LoadCdrStats() (err error) {
var cs *CdrStats
var exists bool
if cs, exists = csvr.cdrStats[tag]; !exists {
cs = &CdrStats{}
cs = &CdrStats{Id: tag}
}
triggerTag := record[20]
triggers, exists := csvr.actionsTriggers[triggerTag]
@@ -942,7 +942,29 @@ func (csvr *CSVReader) LoadCdrStats() (err error) {
// only return error if there was something there for the tag
return fmt.Errorf("Could not get action triggers for cdr stats id %s: %s", cs.Id, triggerTag)
}
UpdateCdrStats(cs, triggers, record...)
tpCs := &utils.TPCdrStat{
QueueLength: record[1],
TimeWindow: record[2],
Metrics: record[3],
SetupInterval: record[4],
TOR: record[5],
CdrHost: record[6],
CdrSource: record[7],
ReqType: record[8],
Direction: record[9],
Tenant: record[10],
Category: record[11],
Account: record[12],
Subject: record[13],
DestinationPrefix: record[14],
UsageInterval: record[15],
MediationRunIds: record[16],
RatedAccount: record[17],
RatedSubject: record[18],
CostInterval: record[19],
ActionTriggers: record[20],
}
UpdateCdrStats(cs, triggers, tpCs)
csvr.cdrStats[tag] = cs
}
return

View File

@@ -501,11 +501,12 @@ func (dbr *DbReader) LoadRatingProfileFiltered(qriedRpf *utils.TPRatingProfile)
return nil
}
func (dbr *DbReader) LoadSharedGroups() (err error) {
storSgs, err := dbr.storDb.GetTpSharedGroups(dbr.tpid, "")
func (dbr *DbReader) LoadSharedGroupByTag(tag string, save bool) error {
storSgs, err := dbr.storDb.GetTpSharedGroups(dbr.tpid, tag)
if err != nil {
return err
}
var loadedTags []string
for tag, tpSgs := range storSgs {
sg, exists := dbr.sharedGroups[tag]
if !exists {
@@ -521,10 +522,22 @@ func (dbr *DbReader) LoadSharedGroups() (err error) {
}
}
dbr.sharedGroups[tag] = sg
loadedTags = append(loadedTags, tag)
}
if save {
for _, tag := range loadedTags {
if err := dbr.accountDb.SetSharedGroup(dbr.sharedGroups[tag]); err != nil {
return err
}
}
}
return nil
}
func (dbr *DbReader) LoadSharedGroups() error {
return dbr.LoadSharedGroupByTag("", false)
}
func (dbr *DbReader) LoadLCRs() (err error) {
dbr.lcrs, err = dbr.storDb.GetTpLCRs(dbr.tpid, "")
return err
@@ -864,8 +877,46 @@ func (dbr *DbReader) LoadDerivedChargersFiltered(filter *utils.TPDerivedChargers
return nil // Placeholder for now
}
func (dbr *DbReader) LoadCdrStats() (err error) {
return nil // Placeholder for now
func (dbr *DbReader) LoadCdrStatsByTag(tag string, save bool) error {
storStats, err := dbr.storDb.GetTpCdrStats(dbr.tpid, tag)
if err != nil {
return err
}
if save && len(dbr.actionsTriggers) == 0 {
// load action triggers to check existence
dbr.LoadActionTriggers()
}
var loadedTags []string
for tag, tpStats := range storStats {
for _, tpStat := range tpStats {
var cs *CdrStats
var exists bool
if cs, exists = dbr.cdrStats[tag]; !exists {
cs = &CdrStats{Id: tag}
}
triggerTag := tpStat.ActionTriggers
triggers, exists := dbr.actionsTriggers[triggerTag]
if triggerTag != "" && !exists {
// only return error if there was something there for the tag
return fmt.Errorf("Could not get action triggers for cdr stats id %s: %s", cs.Id, triggerTag)
}
UpdateCdrStats(cs, triggers, tpStat)
dbr.cdrStats[tag] = cs
loadedTags = append(loadedTags, tag)
}
}
if save {
for _, tag := range loadedTags {
if err := dbr.dataDb.SetCdrStats(dbr.cdrStats[tag]); err != nil {
return err
}
}
}
return nil
}
func (dbr *DbReader) LoadCdrStats() error {
return dbr.LoadCdrStatsByTag("", false)
}
// Automated loading

View File

@@ -96,27 +96,27 @@ func NewTiming(timingInfo ...string) (rt *utils.TPTiming) {
return
}
func UpdateCdrStats(cs *CdrStats, triggers ActionTriggerPriotityList, record ...string) {
cs.Id = record[0]
if record[1] != "" {
if qi, err := strconv.Atoi(record[1]); err == nil {
func UpdateCdrStats(cs *CdrStats, triggers ActionTriggerPriotityList, tpCs *utils.TPCdrStat) {
if tpCs.QueueLength != "" {
if qi, err := strconv.Atoi(tpCs.QueueLength); err == nil {
cs.QueueLength = qi
} else {
log.Printf("Error parsing QueuedLength %v for cdrs stats %v", record[1], cs.Id)
log.Printf("Error parsing QueuedLength %v for cdrs stats %v", tpCs.QueueLength, cs.Id)
}
}
if record[2] != "" {
if d, err := time.ParseDuration(record[2]); err == nil {
if tpCs.TimeWindow != "" {
if d, err := time.ParseDuration(tpCs.TimeWindow); err == nil {
cs.TimeWindow = d
} else {
log.Printf("Error parsing TimeWindow %v for cdrs stats %v", record[2], cs.Id)
log.Printf("Error parsing TimeWindow %v for cdrs stats %v", tpCs.TimeWindow, cs.Id)
}
}
if record[3] != "" {
cs.Metrics = append(cs.Metrics, record[3])
if tpCs.Metrics != "" {
cs.Metrics = append(cs.Metrics, tpCs.Metrics)
}
if record[4] != "" {
times := strings.Split(record[4], utils.INFIELD_SEP)
if tpCs.SetupInterval != "" {
times := strings.Split(tpCs.SetupInterval, utils.INFIELD_SEP)
if len(times) > 0 {
if sTime, err := utils.ParseTimeDetectLayout(times[0]); err == nil {
if len(cs.SetupInterval) < 1 {
@@ -125,7 +125,7 @@ func UpdateCdrStats(cs *CdrStats, triggers ActionTriggerPriotityList, record ...
cs.SetupInterval[0] = sTime
}
} else {
log.Printf("Error parsing TimeWindow %v for cdrs stats %v", record[4], cs.Id)
log.Printf("Error parsing TimeWindow %v for cdrs stats %v", tpCs.SetupInterval, cs.Id)
}
}
if len(times) > 1 {
@@ -136,42 +136,42 @@ func UpdateCdrStats(cs *CdrStats, triggers ActionTriggerPriotityList, record ...
cs.SetupInterval[1] = eTime
}
} else {
log.Printf("Error parsing TimeWindow %v for cdrs stats %v", record[4], cs.Id)
log.Printf("Error parsing TimeWindow %v for cdrs stats %v", tpCs.SetupInterval, cs.Id)
}
}
}
if record[5] != "" {
cs.TOR = append(cs.TOR, record[5])
if tpCs.TOR != "" {
cs.TOR = append(cs.TOR, tpCs.TOR)
}
if record[6] != "" {
cs.CdrHost = append(cs.CdrHost, record[6])
if tpCs.CdrHost != "" {
cs.CdrHost = append(cs.CdrHost, tpCs.CdrHost)
}
if record[7] != "" {
cs.CdrSource = append(cs.CdrSource, record[7])
if tpCs.CdrSource != "" {
cs.CdrSource = append(cs.CdrSource, tpCs.CdrSource)
}
if record[8] != "" {
cs.ReqType = append(cs.ReqType, record[8])
if tpCs.ReqType != "" {
cs.ReqType = append(cs.ReqType, tpCs.ReqType)
}
if record[9] != "" {
cs.Direction = append(cs.Direction, record[9])
if tpCs.Direction != "" {
cs.Direction = append(cs.Direction, tpCs.Direction)
}
if record[10] != "" {
cs.Tenant = append(cs.Tenant, record[10])
if tpCs.Tenant != "" {
cs.Tenant = append(cs.Tenant, tpCs.Tenant)
}
if record[11] != "" {
cs.Category = append(cs.Category, record[11])
if tpCs.Category != "" {
cs.Category = append(cs.Category, tpCs.Category)
}
if record[12] != "" {
cs.Account = append(cs.Account, record[12])
if tpCs.Account != "" {
cs.Account = append(cs.Account, tpCs.Account)
}
if record[13] != "" {
cs.Subject = append(cs.Subject, record[13])
if tpCs.Subject != "" {
cs.Subject = append(cs.Subject, tpCs.Subject)
}
if record[14] != "" {
cs.DestinationPrefix = append(cs.DestinationPrefix, record[14])
if tpCs.DestinationPrefix != "" {
cs.DestinationPrefix = append(cs.DestinationPrefix, tpCs.DestinationPrefix)
}
if record[15] != "" {
durations := strings.Split(record[15], utils.INFIELD_SEP)
if tpCs.UsageInterval != "" {
durations := strings.Split(tpCs.UsageInterval, utils.INFIELD_SEP)
if len(durations) > 0 {
if sDuration, err := time.ParseDuration(durations[0]); err == nil {
if len(cs.UsageInterval) < 1 {
@@ -180,7 +180,7 @@ func UpdateCdrStats(cs *CdrStats, triggers ActionTriggerPriotityList, record ...
cs.UsageInterval[0] = sDuration
}
} else {
log.Printf("Error parsing UsageInterval %v for cdrs stats %v", record[15], cs.Id)
log.Printf("Error parsing UsageInterval %v for cdrs stats %v", tpCs.UsageInterval, cs.Id)
}
}
if len(durations) > 1 {
@@ -191,21 +191,21 @@ func UpdateCdrStats(cs *CdrStats, triggers ActionTriggerPriotityList, record ...
cs.UsageInterval[1] = eDuration
}
} else {
log.Printf("Error parsing UsageInterval %v for cdrs stats %v", record[15], cs.Id)
log.Printf("Error parsing UsageInterval %v for cdrs stats %v", tpCs.UsageInterval, cs.Id)
}
}
}
if record[16] != "" {
cs.MediationRunIds = append(cs.MediationRunIds, record[16])
if tpCs.MediationRunIds != "" {
cs.MediationRunIds = append(cs.MediationRunIds, tpCs.MediationRunIds)
}
if record[17] != "" {
cs.RatedAccount = append(cs.RatedAccount, record[17])
if tpCs.RatedAccount != "" {
cs.RatedAccount = append(cs.RatedAccount, tpCs.RatedAccount)
}
if record[18] != "" {
cs.RatedSubject = append(cs.RatedSubject, record[18])
if tpCs.RatedSubject != "" {
cs.RatedSubject = append(cs.RatedSubject, tpCs.RatedSubject)
}
if record[19] != "" {
costs := strings.Split(record[19], utils.INFIELD_SEP)
if tpCs.CostInterval != "" {
costs := strings.Split(tpCs.CostInterval, utils.INFIELD_SEP)
if len(costs) > 0 {
if sCost, err := strconv.ParseFloat(costs[0], 64); err == nil {
if len(cs.CostInterval) < 1 {
@@ -214,7 +214,7 @@ func UpdateCdrStats(cs *CdrStats, triggers ActionTriggerPriotityList, record ...
cs.CostInterval[0] = sCost
}
} else {
log.Printf("Error parsing CostInterval %v for cdrs stats %v", record[19], cs.Id)
log.Printf("Error parsing CostInterval %v for cdrs stats %v", tpCs.CostInterval, cs.Id)
}
}
if len(costs) > 1 {
@@ -225,7 +225,7 @@ func UpdateCdrStats(cs *CdrStats, triggers ActionTriggerPriotityList, record ...
cs.CostInterval[1] = eCost
}
} else {
log.Printf("Error parsing CostInterval %v for cdrs stats %v", record[19], cs.Id)
log.Printf("Error parsing CostInterval %v for cdrs stats %v", tpCs.CostInterval, cs.Id)
}
}
}

View File

@@ -336,10 +336,11 @@ func (self *SQLStorage) SetTPCdrStats(tpid string, css map[string][]*utils.TPCdr
for csId, cStats := range css {
tx.Where("tpid = ?", tpid).Where("id = ?", csId).Delete(TpCdrStat{})
for _, cs := range cStats {
ql, _ := strconv.Atoi(cs.QueueLength)
tx.Save(TpCdrStat{
Tpid: tpid,
Id: csId,
QueueLength: cs.QueueLength,
QueueLength: ql,
TimeWindow: cs.TimeWindow,
Metrics: cs.Metrics,
SetupInterval: cs.SetupInterval,
@@ -1317,7 +1318,7 @@ func (self *SQLStorage) GetTpSharedGroups(tpid, tag string) (map[string][]*utils
}
for _, tpSg := range tpCdrStats {
sgs[tag] = append(sgs[tpSg.Id], &utils.TPSharedGroup{
sgs[tpSg.Id] = append(sgs[tpSg.Id], &utils.TPSharedGroup{
Account: tpSg.Account,
Strategy: tpSg.Strategy,
RatingSubject: tpSg.RatingSubject,
@@ -1339,8 +1340,8 @@ func (self *SQLStorage) GetTpCdrStats(tpid, tag string) (map[string][]*utils.TPC
}
for _, tpCs := range tpCdrStats {
css[tag] = append(css[tpCs.Id], &utils.TPCdrStat{
QueueLength: tpCs.QueueLength,
css[tpCs.Id] = append(css[tpCs.Id], &utils.TPCdrStat{
QueueLength: strconv.Itoa(tpCs.QueueLength),
TimeWindow: tpCs.TimeWindow,
Metrics: tpCs.Metrics,
SetupInterval: tpCs.SetupInterval,

View File

@@ -680,8 +680,7 @@ func (self *TPCSVImporter) importCdrStats(fn string) error {
if len(record[1]) == 0 {
record[1] = "0" // Empty value will be translated to 0 as QueueLength
}
ql, err := strconv.Atoi(record[1])
if err != nil {
if _, err = strconv.Atoi(record[1]); err != nil {
log.Printf("Ignoring line %d, warning: <%s>", lineNr, err.Error())
continue
}
@@ -689,7 +688,7 @@ func (self *TPCSVImporter) importCdrStats(fn string) error {
css[record[0]] = make([]*utils.TPCdrStat, 0)
}
css[record[0]] = append(css[record[0]], &utils.TPCdrStat{
QueueLength: ql,
QueueLength: record[1],
TimeWindow: ValueOrDefault(record[2], "0"),
Metrics: record[3],
SetupInterval: record[4],

View File

@@ -298,7 +298,7 @@ type TPCdrStats struct {
}
type TPCdrStat struct {
QueueLength int
QueueLength string
TimeWindow string
Metrics string
SetupInterval string