diff --git a/apier/apier.go b/apier/apier.go index 7a01d8098..a2b51cfff 100644 --- a/apier/apier.go +++ b/apier/apier.go @@ -304,6 +304,53 @@ func (self *ApierV1) LoadRatingProfile(attrs utils.TPRatingProfile, reply *strin return nil } +type AttrLoadSharedGroup struct { + TPid string + SharedGroupId string +} + +// Load destinations from storDb into dataDb. +func (self *ApierV1) LoadSharedGroup(attrs AttrLoadSharedGroup, reply *string) error { + if missing := utils.MissingStructFields(&attrs, []string{"TPid", "SharedGroupId"}); len(missing) != 0 { + return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing) + } + if attrs.SharedGroupId == utils.EMPTY { + attrs.SharedGroupId = "" + } + dbReader := engine.NewDbReader(self.StorDb, self.RatingDb, self.AccountDb, attrs.TPid) + if err := dbReader.LoadSharedGroupByTag(attrs.SharedGroupId, true); err != nil { + return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) + } + //Automatic cache of the newly inserted rating plan + didNotChange := []string{} + if err := self.AccountDb.CacheAccounting(didNotChange, nil, didNotChange, didNotChange); err != nil { + return err + } + *reply = OK + return nil +} + +type AttrLoadCdrStats struct { + TPid string + CdrStatsId string +} + +// Load destinations from storDb into dataDb. +func (self *ApierV1) LoadCdrStats(attrs AttrLoadCdrStats, reply *string) error { + if missing := utils.MissingStructFields(&attrs, []string{"TPid", "CdrStatsId"}); len(missing) != 0 { + return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing) + } + if attrs.CdrStatsId == utils.EMPTY { + attrs.CdrStatsId = "" + } + dbReader := engine.NewDbReader(self.StorDb, self.RatingDb, self.AccountDb, attrs.TPid) + if err := dbReader.LoadCdrStatsByTag(attrs.CdrStatsId, true); err != nil { + return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) + } + *reply = OK + return nil +} + type AttrSetRatingProfile struct { Tenant string // Tenant's Id Category string // TypeOfRecord diff --git a/engine/loader_csv.go b/engine/loader_csv.go index 5d4a5fba5..8b48cffd1 100644 --- a/engine/loader_csv.go +++ b/engine/loader_csv.go @@ -934,7 +934,7 @@ func (csvr *CSVReader) LoadCdrStats() (err error) { var cs *CdrStats var exists bool if cs, exists = csvr.cdrStats[tag]; !exists { - cs = &CdrStats{} + cs = &CdrStats{Id: tag} } triggerTag := record[20] triggers, exists := csvr.actionsTriggers[triggerTag] @@ -942,7 +942,29 @@ func (csvr *CSVReader) LoadCdrStats() (err error) { // only return error if there was something there for the tag return fmt.Errorf("Could not get action triggers for cdr stats id %s: %s", cs.Id, triggerTag) } - UpdateCdrStats(cs, triggers, record...) + tpCs := &utils.TPCdrStat{ + QueueLength: record[1], + TimeWindow: record[2], + Metrics: record[3], + SetupInterval: record[4], + TOR: record[5], + CdrHost: record[6], + CdrSource: record[7], + ReqType: record[8], + Direction: record[9], + Tenant: record[10], + Category: record[11], + Account: record[12], + Subject: record[13], + DestinationPrefix: record[14], + UsageInterval: record[15], + MediationRunIds: record[16], + RatedAccount: record[17], + RatedSubject: record[18], + CostInterval: record[19], + ActionTriggers: record[20], + } + UpdateCdrStats(cs, triggers, tpCs) csvr.cdrStats[tag] = cs } return diff --git a/engine/loader_db.go b/engine/loader_db.go index 9ff0800a2..6aab02c5a 100644 --- a/engine/loader_db.go +++ b/engine/loader_db.go @@ -501,11 +501,12 @@ func (dbr *DbReader) LoadRatingProfileFiltered(qriedRpf *utils.TPRatingProfile) return nil } -func (dbr *DbReader) LoadSharedGroups() (err error) { - storSgs, err := dbr.storDb.GetTpSharedGroups(dbr.tpid, "") +func (dbr *DbReader) LoadSharedGroupByTag(tag string, save bool) error { + storSgs, err := dbr.storDb.GetTpSharedGroups(dbr.tpid, tag) if err != nil { return err } + var loadedTags []string for tag, tpSgs := range storSgs { sg, exists := dbr.sharedGroups[tag] if !exists { @@ -521,10 +522,22 @@ func (dbr *DbReader) LoadSharedGroups() (err error) { } } dbr.sharedGroups[tag] = sg + loadedTags = append(loadedTags, tag) + } + if save { + for _, tag := range loadedTags { + if err := dbr.accountDb.SetSharedGroup(dbr.sharedGroups[tag]); err != nil { + return err + } + } } return nil } +func (dbr *DbReader) LoadSharedGroups() error { + return dbr.LoadSharedGroupByTag("", false) +} + func (dbr *DbReader) LoadLCRs() (err error) { dbr.lcrs, err = dbr.storDb.GetTpLCRs(dbr.tpid, "") return err @@ -864,8 +877,46 @@ func (dbr *DbReader) LoadDerivedChargersFiltered(filter *utils.TPDerivedChargers return nil // Placeholder for now } -func (dbr *DbReader) LoadCdrStats() (err error) { - return nil // Placeholder for now +func (dbr *DbReader) LoadCdrStatsByTag(tag string, save bool) error { + storStats, err := dbr.storDb.GetTpCdrStats(dbr.tpid, tag) + if err != nil { + return err + } + if save && len(dbr.actionsTriggers) == 0 { + // load action triggers to check existence + dbr.LoadActionTriggers() + } + var loadedTags []string + for tag, tpStats := range storStats { + for _, tpStat := range tpStats { + var cs *CdrStats + var exists bool + if cs, exists = dbr.cdrStats[tag]; !exists { + cs = &CdrStats{Id: tag} + } + triggerTag := tpStat.ActionTriggers + triggers, exists := dbr.actionsTriggers[triggerTag] + if triggerTag != "" && !exists { + // only return error if there was something there for the tag + return fmt.Errorf("Could not get action triggers for cdr stats id %s: %s", cs.Id, triggerTag) + } + UpdateCdrStats(cs, triggers, tpStat) + dbr.cdrStats[tag] = cs + loadedTags = append(loadedTags, tag) + } + } + if save { + for _, tag := range loadedTags { + if err := dbr.dataDb.SetCdrStats(dbr.cdrStats[tag]); err != nil { + return err + } + } + } + return nil +} + +func (dbr *DbReader) LoadCdrStats() error { + return dbr.LoadCdrStatsByTag("", false) } // Automated loading diff --git a/engine/loader_helpers.go b/engine/loader_helpers.go index f4057adea..08c0f8257 100644 --- a/engine/loader_helpers.go +++ b/engine/loader_helpers.go @@ -96,27 +96,27 @@ func NewTiming(timingInfo ...string) (rt *utils.TPTiming) { return } -func UpdateCdrStats(cs *CdrStats, triggers ActionTriggerPriotityList, record ...string) { - cs.Id = record[0] - if record[1] != "" { - if qi, err := strconv.Atoi(record[1]); err == nil { +func UpdateCdrStats(cs *CdrStats, triggers ActionTriggerPriotityList, tpCs *utils.TPCdrStat) { + + if tpCs.QueueLength != "" { + if qi, err := strconv.Atoi(tpCs.QueueLength); err == nil { cs.QueueLength = qi } else { - log.Printf("Error parsing QueuedLength %v for cdrs stats %v", record[1], cs.Id) + log.Printf("Error parsing QueuedLength %v for cdrs stats %v", tpCs.QueueLength, cs.Id) } } - if record[2] != "" { - if d, err := time.ParseDuration(record[2]); err == nil { + if tpCs.TimeWindow != "" { + if d, err := time.ParseDuration(tpCs.TimeWindow); err == nil { cs.TimeWindow = d } else { - log.Printf("Error parsing TimeWindow %v for cdrs stats %v", record[2], cs.Id) + log.Printf("Error parsing TimeWindow %v for cdrs stats %v", tpCs.TimeWindow, cs.Id) } } - if record[3] != "" { - cs.Metrics = append(cs.Metrics, record[3]) + if tpCs.Metrics != "" { + cs.Metrics = append(cs.Metrics, tpCs.Metrics) } - if record[4] != "" { - times := strings.Split(record[4], utils.INFIELD_SEP) + if tpCs.SetupInterval != "" { + times := strings.Split(tpCs.SetupInterval, utils.INFIELD_SEP) if len(times) > 0 { if sTime, err := utils.ParseTimeDetectLayout(times[0]); err == nil { if len(cs.SetupInterval) < 1 { @@ -125,7 +125,7 @@ func UpdateCdrStats(cs *CdrStats, triggers ActionTriggerPriotityList, record ... cs.SetupInterval[0] = sTime } } else { - log.Printf("Error parsing TimeWindow %v for cdrs stats %v", record[4], cs.Id) + log.Printf("Error parsing TimeWindow %v for cdrs stats %v", tpCs.SetupInterval, cs.Id) } } if len(times) > 1 { @@ -136,42 +136,42 @@ func UpdateCdrStats(cs *CdrStats, triggers ActionTriggerPriotityList, record ... cs.SetupInterval[1] = eTime } } else { - log.Printf("Error parsing TimeWindow %v for cdrs stats %v", record[4], cs.Id) + log.Printf("Error parsing TimeWindow %v for cdrs stats %v", tpCs.SetupInterval, cs.Id) } } } - if record[5] != "" { - cs.TOR = append(cs.TOR, record[5]) + if tpCs.TOR != "" { + cs.TOR = append(cs.TOR, tpCs.TOR) } - if record[6] != "" { - cs.CdrHost = append(cs.CdrHost, record[6]) + if tpCs.CdrHost != "" { + cs.CdrHost = append(cs.CdrHost, tpCs.CdrHost) } - if record[7] != "" { - cs.CdrSource = append(cs.CdrSource, record[7]) + if tpCs.CdrSource != "" { + cs.CdrSource = append(cs.CdrSource, tpCs.CdrSource) } - if record[8] != "" { - cs.ReqType = append(cs.ReqType, record[8]) + if tpCs.ReqType != "" { + cs.ReqType = append(cs.ReqType, tpCs.ReqType) } - if record[9] != "" { - cs.Direction = append(cs.Direction, record[9]) + if tpCs.Direction != "" { + cs.Direction = append(cs.Direction, tpCs.Direction) } - if record[10] != "" { - cs.Tenant = append(cs.Tenant, record[10]) + if tpCs.Tenant != "" { + cs.Tenant = append(cs.Tenant, tpCs.Tenant) } - if record[11] != "" { - cs.Category = append(cs.Category, record[11]) + if tpCs.Category != "" { + cs.Category = append(cs.Category, tpCs.Category) } - if record[12] != "" { - cs.Account = append(cs.Account, record[12]) + if tpCs.Account != "" { + cs.Account = append(cs.Account, tpCs.Account) } - if record[13] != "" { - cs.Subject = append(cs.Subject, record[13]) + if tpCs.Subject != "" { + cs.Subject = append(cs.Subject, tpCs.Subject) } - if record[14] != "" { - cs.DestinationPrefix = append(cs.DestinationPrefix, record[14]) + if tpCs.DestinationPrefix != "" { + cs.DestinationPrefix = append(cs.DestinationPrefix, tpCs.DestinationPrefix) } - if record[15] != "" { - durations := strings.Split(record[15], utils.INFIELD_SEP) + if tpCs.UsageInterval != "" { + durations := strings.Split(tpCs.UsageInterval, utils.INFIELD_SEP) if len(durations) > 0 { if sDuration, err := time.ParseDuration(durations[0]); err == nil { if len(cs.UsageInterval) < 1 { @@ -180,7 +180,7 @@ func UpdateCdrStats(cs *CdrStats, triggers ActionTriggerPriotityList, record ... cs.UsageInterval[0] = sDuration } } else { - log.Printf("Error parsing UsageInterval %v for cdrs stats %v", record[15], cs.Id) + log.Printf("Error parsing UsageInterval %v for cdrs stats %v", tpCs.UsageInterval, cs.Id) } } if len(durations) > 1 { @@ -191,21 +191,21 @@ func UpdateCdrStats(cs *CdrStats, triggers ActionTriggerPriotityList, record ... cs.UsageInterval[1] = eDuration } } else { - log.Printf("Error parsing UsageInterval %v for cdrs stats %v", record[15], cs.Id) + log.Printf("Error parsing UsageInterval %v for cdrs stats %v", tpCs.UsageInterval, cs.Id) } } } - if record[16] != "" { - cs.MediationRunIds = append(cs.MediationRunIds, record[16]) + if tpCs.MediationRunIds != "" { + cs.MediationRunIds = append(cs.MediationRunIds, tpCs.MediationRunIds) } - if record[17] != "" { - cs.RatedAccount = append(cs.RatedAccount, record[17]) + if tpCs.RatedAccount != "" { + cs.RatedAccount = append(cs.RatedAccount, tpCs.RatedAccount) } - if record[18] != "" { - cs.RatedSubject = append(cs.RatedSubject, record[18]) + if tpCs.RatedSubject != "" { + cs.RatedSubject = append(cs.RatedSubject, tpCs.RatedSubject) } - if record[19] != "" { - costs := strings.Split(record[19], utils.INFIELD_SEP) + if tpCs.CostInterval != "" { + costs := strings.Split(tpCs.CostInterval, utils.INFIELD_SEP) if len(costs) > 0 { if sCost, err := strconv.ParseFloat(costs[0], 64); err == nil { if len(cs.CostInterval) < 1 { @@ -214,7 +214,7 @@ func UpdateCdrStats(cs *CdrStats, triggers ActionTriggerPriotityList, record ... cs.CostInterval[0] = sCost } } else { - log.Printf("Error parsing CostInterval %v for cdrs stats %v", record[19], cs.Id) + log.Printf("Error parsing CostInterval %v for cdrs stats %v", tpCs.CostInterval, cs.Id) } } if len(costs) > 1 { @@ -225,7 +225,7 @@ func UpdateCdrStats(cs *CdrStats, triggers ActionTriggerPriotityList, record ... cs.CostInterval[1] = eCost } } else { - log.Printf("Error parsing CostInterval %v for cdrs stats %v", record[19], cs.Id) + log.Printf("Error parsing CostInterval %v for cdrs stats %v", tpCs.CostInterval, cs.Id) } } } diff --git a/engine/storage_sql.go b/engine/storage_sql.go index dc23ac00f..8a20310c5 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -336,10 +336,11 @@ func (self *SQLStorage) SetTPCdrStats(tpid string, css map[string][]*utils.TPCdr for csId, cStats := range css { tx.Where("tpid = ?", tpid).Where("id = ?", csId).Delete(TpCdrStat{}) for _, cs := range cStats { + ql, _ := strconv.Atoi(cs.QueueLength) tx.Save(TpCdrStat{ Tpid: tpid, Id: csId, - QueueLength: cs.QueueLength, + QueueLength: ql, TimeWindow: cs.TimeWindow, Metrics: cs.Metrics, SetupInterval: cs.SetupInterval, @@ -1317,7 +1318,7 @@ func (self *SQLStorage) GetTpSharedGroups(tpid, tag string) (map[string][]*utils } for _, tpSg := range tpCdrStats { - sgs[tag] = append(sgs[tpSg.Id], &utils.TPSharedGroup{ + sgs[tpSg.Id] = append(sgs[tpSg.Id], &utils.TPSharedGroup{ Account: tpSg.Account, Strategy: tpSg.Strategy, RatingSubject: tpSg.RatingSubject, @@ -1339,8 +1340,8 @@ func (self *SQLStorage) GetTpCdrStats(tpid, tag string) (map[string][]*utils.TPC } for _, tpCs := range tpCdrStats { - css[tag] = append(css[tpCs.Id], &utils.TPCdrStat{ - QueueLength: tpCs.QueueLength, + css[tpCs.Id] = append(css[tpCs.Id], &utils.TPCdrStat{ + QueueLength: strconv.Itoa(tpCs.QueueLength), TimeWindow: tpCs.TimeWindow, Metrics: tpCs.Metrics, SetupInterval: tpCs.SetupInterval, diff --git a/engine/tpimporter_csv.go b/engine/tpimporter_csv.go index dc39ef787..8323aaea9 100644 --- a/engine/tpimporter_csv.go +++ b/engine/tpimporter_csv.go @@ -680,8 +680,7 @@ func (self *TPCSVImporter) importCdrStats(fn string) error { if len(record[1]) == 0 { record[1] = "0" // Empty value will be translated to 0 as QueueLength } - ql, err := strconv.Atoi(record[1]) - if err != nil { + if _, err = strconv.Atoi(record[1]); err != nil { log.Printf("Ignoring line %d, warning: <%s>", lineNr, err.Error()) continue } @@ -689,7 +688,7 @@ func (self *TPCSVImporter) importCdrStats(fn string) error { css[record[0]] = make([]*utils.TPCdrStat, 0) } css[record[0]] = append(css[record[0]], &utils.TPCdrStat{ - QueueLength: ql, + QueueLength: record[1], TimeWindow: ValueOrDefault(record[2], "0"), Metrics: record[3], SetupInterval: record[4], diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 5dc8f2549..d58f80324 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -298,7 +298,7 @@ type TPCdrStats struct { } type TPCdrStat struct { - QueueLength int + QueueLength string TimeWindow string Metrics string SetupInterval string