From ccd91e36ae1475ac14281fe6eab7ded506e1bf0a Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Mon, 25 May 2015 14:40:41 +0300 Subject: [PATCH] started loader refactoring --- engine/loader.go | 390 ++++++++++++++++++++++++++++++++++++ engine/loader_csv.go | 14 +- engine/loader_db.go | 17 +- engine/loader_helpers.go | 369 ---------------------------------- engine/models.go | 5 + engine/storage_interface.go | 2 +- 6 files changed, 413 insertions(+), 384 deletions(-) create mode 100644 engine/loader.go diff --git a/engine/loader.go b/engine/loader.go new file mode 100644 index 000000000..1cbef7804 --- /dev/null +++ b/engine/loader.go @@ -0,0 +1,390 @@ +package engine + +import ( + "errors" + "log" + + "github.com/cgrates/cgrates/utils" +) + +type TPData struct { + actions map[string][]*Action + actionsTimings map[string][]*ActionTiming + actionsTriggers map[string][]*ActionTrigger + accountActions map[string]*Account + dirtyRpAliases []*TenantRatingSubject // used to clean aliases that might have changed + dirtyAccAliases []*TenantAccount // used to clean aliases that might have changed + destinations map[string]*Destination + rpAliases map[string]string + accAliases map[string]string + timings map[string]*utils.TPTiming + rates map[string]*utils.TPRate + destinationRates map[string]*utils.TPDestinationRate + ratingPlans map[string]*RatingPlan + ratingProfiles map[string]*RatingProfile + sharedGroups map[string]*SharedGroup + lcrs map[string]*LCR + derivedChargers map[string]utils.DerivedChargers + cdrStats map[string]*CdrStats +} + +func NewTPData() *TPData { + tp := &TPData{} + tp.actions = make(map[string][]*Action) + tp.actionsTimings = make(map[string][]*ActionTiming) + tp.actionsTriggers = make(map[string][]*ActionTrigger) + tp.rates = make(map[string]*utils.TPRate) + tp.destinations = make(map[string]*Destination) + tp.destinationRates = make(map[string]*utils.TPDestinationRate) + tp.timings = make(map[string]*utils.TPTiming) + tp.ratingPlans = make(map[string]*RatingPlan) + tp.ratingProfiles = make(map[string]*RatingProfile) + tp.sharedGroups = make(map[string]*SharedGroup) + tp.lcrs = make(map[string]*LCR) + tp.rpAliases = make(map[string]string) + tp.accAliases = make(map[string]string) + tp.timings = make(map[string]*utils.TPTiming) + tp.accountActions = make(map[string]*Account) + tp.destinations = make(map[string]*Destination) + tp.cdrStats = make(map[string]*CdrStats) + tp.derivedChargers = make(map[string]utils.DerivedChargers) + return tp +} + +func (tp *TPData) LoadDestinations(tpDests []*TpDestination) error { + for _, tpDest := range tpDests { + var dest *Destination + var found bool + if dest, found = tp.destinations[tpDest.Tag]; !found { + dest = &Destination{Id: tpDest.Tag} + tp.destinations[tpDest.Tag] = dest + } + dest.AddPrefix(tpDest.Prefix) + } + return nil +} + +func (tp *TPData) IsValid() bool { + valid := true + for rplTag, rpl := range tp.ratingPlans { + if !rpl.isContinous() { + log.Printf("The rating plan %s is not covering all weekdays", rplTag) + valid = false + } + if !rpl.areRatesSane() { + log.Printf("The rating plan %s contains invalid rate groups", rplTag) + valid = false + } + if !rpl.areTimingsSane() { + log.Printf("The rating plan %s contains invalid timings", rplTag) + valid = false + } + } + return valid +} + +func (tp *TPData) WriteToDatabase(dataStorage RatingStorage, accountingStorage AccountingStorage, flush, verbose bool) (err error) { + if dataStorage == nil { + return errors.New("No database connection!") + } + if flush { + dataStorage.Flush("") + } + if verbose { + log.Print("Destinations:") + } + for _, d := range tp.destinations { + err = dataStorage.SetDestination(d) + if err != nil { + return err + } + if verbose { + log.Print("\t", d.Id, " : ", d.Prefixes) + } + } + if verbose { + log.Print("Rating Plans:") + } + for _, rp := range tp.ratingPlans { + err = dataStorage.SetRatingPlan(rp) + if err != nil { + return err + } + if verbose { + log.Print("\t", rp.Id) + } + } + if verbose { + log.Print("Rating Profiles:") + } + for _, rp := range tp.ratingProfiles { + err = dataStorage.SetRatingProfile(rp) + if err != nil { + return err + } + if verbose { + log.Print("\t", rp.Id) + } + } + if verbose { + log.Print("Action Plans:") + } + for k, ats := range tp.actionsTimings { + err = accountingStorage.SetActionTimings(k, ats) + if err != nil { + return err + } + if verbose { + log.Println("\t", k) + } + } + if verbose { + log.Print("Shared Groups:") + } + for k, sg := range tp.sharedGroups { + err = accountingStorage.SetSharedGroup(sg) + if err != nil { + return err + } + if verbose { + log.Println("\t", k) + } + } + if verbose { + log.Print("LCR Rules:") + } + for k, lcr := range tp.lcrs { + err = dataStorage.SetLCR(lcr) + if err != nil { + return err + } + if verbose { + log.Println("\t", k) + } + } + if verbose { + log.Print("Actions:") + } + for k, as := range tp.actions { + err = accountingStorage.SetActions(k, as) + if err != nil { + return err + } + if verbose { + log.Println("\t", k) + } + } + if verbose { + log.Print("Account Actions:") + } + for _, ub := range tp.accountActions { + err = accountingStorage.SetAccount(ub) + if err != nil { + return err + } + if verbose { + log.Println("\t", ub.Id) + } + } + if verbose { + log.Print("Rating Profile Aliases:") + } + if err := dataStorage.RemoveRpAliases(tp.dirtyRpAliases); err != nil { + return err + } + for key, alias := range tp.rpAliases { + err = dataStorage.SetRpAlias(key, alias) + if err != nil { + return err + } + if verbose { + log.Print("\t", key) + } + } + if verbose { + log.Print("Account Aliases:") + } + if err := accountingStorage.RemoveAccAliases(tp.dirtyAccAliases); err != nil { + return err + } + for key, alias := range tp.accAliases { + err = accountingStorage.SetAccAlias(key, alias) + if err != nil { + return err + } + if verbose { + log.Print("\t", key) + } + } + if verbose { + log.Print("Derived Chargers:") + } + for key, dcs := range tp.derivedChargers { + err = accountingStorage.SetDerivedChargers(key, dcs) + if err != nil { + return err + } + if verbose { + log.Print("\t", key) + } + } + if verbose { + log.Print("CDR Stats Queues:") + } + for _, sq := range tp.cdrStats { + err = dataStorage.SetCdrStats(sq) + if err != nil { + return err + } + if verbose { + log.Print("\t", sq.Id) + } + } + return +} + +func (tp *TPData) ShowStatistics() { + // destinations + destCount := len(tp.destinations) + log.Print("Destinations: ", destCount) + prefixDist := make(map[int]int, 50) + prefixCount := 0 + for _, d := range tp.destinations { + prefixDist[len(d.Prefixes)] += 1 + prefixCount += len(d.Prefixes) + } + log.Print("Avg Prefixes: ", prefixCount/destCount) + log.Print("Prefixes distribution:") + for k, v := range prefixDist { + log.Printf("%d: %d", k, v) + } + // rating plans + rplCount := len(tp.ratingPlans) + log.Print("Rating plans: ", rplCount) + destRatesDist := make(map[int]int, 50) + destRatesCount := 0 + for _, rpl := range tp.ratingPlans { + destRatesDist[len(rpl.DestinationRates)] += 1 + destRatesCount += len(rpl.DestinationRates) + } + log.Print("Avg Destination Rates: ", destRatesCount/rplCount) + log.Print("Destination Rates distribution:") + for k, v := range destRatesDist { + log.Printf("%d: %d", k, v) + } + // rating profiles + rpfCount := len(tp.ratingProfiles) + log.Print("Rating profiles: ", rpfCount) + activDist := make(map[int]int, 50) + activCount := 0 + for _, rpf := range tp.ratingProfiles { + activDist[len(rpf.RatingPlanActivations)] += 1 + activCount += len(rpf.RatingPlanActivations) + } + log.Print("Avg Activations: ", activCount/rpfCount) + log.Print("Activation distribution:") + for k, v := range activDist { + log.Printf("%d: %d", k, v) + } + // actions + log.Print("Actions: ", len(tp.actions)) + // action plans + log.Print("Action plans: ", len(tp.actionsTimings)) + // action trigers + log.Print("Action trigers: ", len(tp.actionsTriggers)) + // account actions + log.Print("Account actions: ", len(tp.accountActions)) + // derivedChargers + log.Print("Derived Chargers: ", len(tp.derivedChargers)) + // lcr rules + log.Print("LCR rules: ", len(tp.lcrs)) + // cdr stats + log.Print("CDR stats: ", len(tp.cdrStats)) +} + +// Returns the identities loaded for a specific category, useful for cache reloads +func (tp *TPData) GetLoadedIds(categ string) ([]string, error) { + switch categ { + case DESTINATION_PREFIX: + keys := make([]string, len(tp.destinations)) + i := 0 + for k := range tp.destinations { + keys[i] = k + i++ + } + return keys, nil + case RATING_PLAN_PREFIX: + keys := make([]string, len(tp.ratingPlans)) + i := 0 + for k := range tp.ratingPlans { + keys[i] = k + i++ + } + return keys, nil + case RATING_PROFILE_PREFIX: + keys := make([]string, len(tp.ratingProfiles)) + i := 0 + for k := range tp.ratingProfiles { + keys[i] = k + i++ + } + return keys, nil + case ACTION_PREFIX: // actionsTimings + keys := make([]string, len(tp.actions)) + i := 0 + for k := range tp.actions { + keys[i] = k + i++ + } + return keys, nil + case ACTION_TIMING_PREFIX: // actionsTimings + keys := make([]string, len(tp.actionsTimings)) + i := 0 + for k := range tp.actionsTimings { + keys[i] = k + i++ + } + return keys, nil + case RP_ALIAS_PREFIX: // aliases + keys := make([]string, len(tp.rpAliases)) + i := 0 + for k := range tp.rpAliases { + keys[i] = k + i++ + } + return keys, nil + case ACC_ALIAS_PREFIX: // aliases + keys := make([]string, len(tp.accAliases)) + i := 0 + for k := range tp.accAliases { + keys[i] = k + i++ + } + return keys, nil + case DERIVEDCHARGERS_PREFIX: // derived chargers + keys := make([]string, len(tp.derivedChargers)) + i := 0 + for k := range tp.derivedChargers { + keys[i] = k + i++ + } + return keys, nil + case CDR_STATS_PREFIX: // cdr stats + keys := make([]string, len(tp.cdrStats)) + i := 0 + for k := range tp.cdrStats { + keys[i] = k + i++ + } + return keys, nil + case SHARED_GROUP_PREFIX: + keys := make([]string, len(tp.sharedGroups)) + i := 0 + for k := range tp.sharedGroups { + keys[i] = k + i++ + } + return keys, nil + } + return nil, errors.New("Unsupported category") +} diff --git a/engine/loader_csv.go b/engine/loader_csv.go index 2562296ce..9cf31ce09 100644 --- a/engine/loader_csv.go +++ b/engine/loader_csv.go @@ -110,16 +110,14 @@ func (csvr *CSVReader) LoadDestinations() (err error) { if fp != nil { defer fp.Close() } + var tpDests []*TpDestination for record, err := csvReader.Read(); err == nil; record, err = csvReader.Read() { - tag := record[0] - var dest *Destination - var found bool - if dest, found = csvr.tp.destinations[tag]; !found { - dest = &Destination{Id: tag} - csvr.tp.destinations[tag] = dest - } - dest.AddPrefix(record[1]) + tpDest := &TpDestination{} + tpDest.Load(record) + tpDests = append(tpDests, tpDest) + //log.Printf("%+v\n", tpDest) } + csvr.tp.LoadDestinations(tpDests) return } diff --git a/engine/loader_db.go b/engine/loader_db.go index a90d2379d..3860664ef 100644 --- a/engine/loader_db.go +++ b/engine/loader_db.go @@ -60,16 +60,21 @@ func (dbr *DbReader) WriteToDatabase(flush, verbose bool) (err error) { } func (dbr *DbReader) LoadDestinations() (err error) { - dbr.tp.destinations, err = dbr.storDb.GetTpDestinations(dbr.tpid, "") - return + tpDests, err := dbr.storDb.GetTpDestinations(dbr.tpid, "") + if err == nil { + return + } + return csvr.tp.LoadDestinations(tpDests) } func (dbr *DbReader) LoadDestinationByTag(tag string) (bool, error) { - destinations, err := dbr.storDb.GetTpDestinations(dbr.tpid, tag) - for _, destination := range destinations { - dbr.dataDb.SetDestination(destination) + tpDests, err := dbr.storDb.GetTpDestinations(dbr.tpid, tag) + dest := &Destination{Id: tag} + for _, tpDest := range tpDests { + dest.AddPrefix(tpDest.Prefix) } - return len(destinations) > 0, err + dbr.dataDb.SetDestination(dest) + return len(tpDests) > 0, err } func (dbr *DbReader) LoadTimings() (err error) { diff --git a/engine/loader_helpers.go b/engine/loader_helpers.go index f31d5ef41..52d9eefe5 100644 --- a/engine/loader_helpers.go +++ b/engine/loader_helpers.go @@ -122,375 +122,6 @@ type TPLoader interface { WriteToDatabase(bool, bool) error } -type TPData struct { - actions map[string][]*Action - actionsTimings map[string][]*ActionTiming - actionsTriggers map[string][]*ActionTrigger - accountActions map[string]*Account - dirtyRpAliases []*TenantRatingSubject // used to clean aliases that might have changed - dirtyAccAliases []*TenantAccount // used to clean aliases that might have changed - destinations map[string]*Destination - rpAliases map[string]string - accAliases map[string]string - timings map[string]*utils.TPTiming - rates map[string]*utils.TPRate - destinationRates map[string]*utils.TPDestinationRate - ratingPlans map[string]*RatingPlan - ratingProfiles map[string]*RatingProfile - sharedGroups map[string]*SharedGroup - lcrs map[string]*LCR - derivedChargers map[string]utils.DerivedChargers - cdrStats map[string]*CdrStats -} - -func NewTPData() *TPData { - tp := &TPData{} - tp.actions = make(map[string][]*Action) - tp.actionsTimings = make(map[string][]*ActionTiming) - tp.actionsTriggers = make(map[string][]*ActionTrigger) - tp.rates = make(map[string]*utils.TPRate) - tp.destinations = make(map[string]*Destination) - tp.destinationRates = make(map[string]*utils.TPDestinationRate) - tp.timings = make(map[string]*utils.TPTiming) - tp.ratingPlans = make(map[string]*RatingPlan) - tp.ratingProfiles = make(map[string]*RatingProfile) - tp.sharedGroups = make(map[string]*SharedGroup) - tp.lcrs = make(map[string]*LCR) - tp.rpAliases = make(map[string]string) - tp.accAliases = make(map[string]string) - tp.timings = make(map[string]*utils.TPTiming) - tp.accountActions = make(map[string]*Account) - tp.destinations = make(map[string]*Destination) - tp.cdrStats = make(map[string]*CdrStats) - tp.derivedChargers = make(map[string]utils.DerivedChargers) - return tp -} - -func (tp *TPData) IsValid() bool { - valid := true - for rplTag, rpl := range tp.ratingPlans { - if !rpl.isContinous() { - log.Printf("The rating plan %s is not covering all weekdays", rplTag) - valid = false - } - if !rpl.areRatesSane() { - log.Printf("The rating plan %s contains invalid rate groups", rplTag) - valid = false - } - if !rpl.areTimingsSane() { - log.Printf("The rating plan %s contains invalid timings", rplTag) - valid = false - } - } - return valid -} - -func (tp *TPData) WriteToDatabase(dataStorage RatingStorage, accountingStorage AccountingStorage, flush, verbose bool) (err error) { - if dataStorage == nil { - return errors.New("No database connection!") - } - if flush { - dataStorage.Flush("") - } - if verbose { - log.Print("Destinations:") - } - for _, d := range tp.destinations { - err = dataStorage.SetDestination(d) - if err != nil { - return err - } - if verbose { - log.Print("\t", d.Id, " : ", d.Prefixes) - } - } - if verbose { - log.Print("Rating Plans:") - } - for _, rp := range tp.ratingPlans { - err = dataStorage.SetRatingPlan(rp) - if err != nil { - return err - } - if verbose { - log.Print("\t", rp.Id) - } - } - if verbose { - log.Print("Rating Profiles:") - } - for _, rp := range tp.ratingProfiles { - err = dataStorage.SetRatingProfile(rp) - if err != nil { - return err - } - if verbose { - log.Print("\t", rp.Id) - } - } - if verbose { - log.Print("Action Plans:") - } - for k, ats := range tp.actionsTimings { - err = accountingStorage.SetActionTimings(k, ats) - if err != nil { - return err - } - if verbose { - log.Println("\t", k) - } - } - if verbose { - log.Print("Shared Groups:") - } - for k, sg := range tp.sharedGroups { - err = accountingStorage.SetSharedGroup(sg) - if err != nil { - return err - } - if verbose { - log.Println("\t", k) - } - } - if verbose { - log.Print("LCR Rules:") - } - for k, lcr := range tp.lcrs { - err = dataStorage.SetLCR(lcr) - if err != nil { - return err - } - if verbose { - log.Println("\t", k) - } - } - if verbose { - log.Print("Actions:") - } - for k, as := range tp.actions { - err = accountingStorage.SetActions(k, as) - if err != nil { - return err - } - if verbose { - log.Println("\t", k) - } - } - if verbose { - log.Print("Account Actions:") - } - for _, ub := range tp.accountActions { - err = accountingStorage.SetAccount(ub) - if err != nil { - return err - } - if verbose { - log.Println("\t", ub.Id) - } - } - if verbose { - log.Print("Rating Profile Aliases:") - } - if err := dataStorage.RemoveRpAliases(tp.dirtyRpAliases); err != nil { - return err - } - for key, alias := range tp.rpAliases { - err = dataStorage.SetRpAlias(key, alias) - if err != nil { - return err - } - if verbose { - log.Print("\t", key) - } - } - if verbose { - log.Print("Account Aliases:") - } - if err := accountingStorage.RemoveAccAliases(tp.dirtyAccAliases); err != nil { - return err - } - for key, alias := range tp.accAliases { - err = accountingStorage.SetAccAlias(key, alias) - if err != nil { - return err - } - if verbose { - log.Print("\t", key) - } - } - if verbose { - log.Print("Derived Chargers:") - } - for key, dcs := range tp.derivedChargers { - err = accountingStorage.SetDerivedChargers(key, dcs) - if err != nil { - return err - } - if verbose { - log.Print("\t", key) - } - } - if verbose { - log.Print("CDR Stats Queues:") - } - for _, sq := range tp.cdrStats { - err = dataStorage.SetCdrStats(sq) - if err != nil { - return err - } - if verbose { - log.Print("\t", sq.Id) - } - } - return -} - -func (tp *TPData) ShowStatistics() { - // destinations - destCount := len(tp.destinations) - log.Print("Destinations: ", destCount) - prefixDist := make(map[int]int, 50) - prefixCount := 0 - for _, d := range tp.destinations { - prefixDist[len(d.Prefixes)] += 1 - prefixCount += len(d.Prefixes) - } - log.Print("Avg Prefixes: ", prefixCount/destCount) - log.Print("Prefixes distribution:") - for k, v := range prefixDist { - log.Printf("%d: %d", k, v) - } - // rating plans - rplCount := len(tp.ratingPlans) - log.Print("Rating plans: ", rplCount) - destRatesDist := make(map[int]int, 50) - destRatesCount := 0 - for _, rpl := range tp.ratingPlans { - destRatesDist[len(rpl.DestinationRates)] += 1 - destRatesCount += len(rpl.DestinationRates) - } - log.Print("Avg Destination Rates: ", destRatesCount/rplCount) - log.Print("Destination Rates distribution:") - for k, v := range destRatesDist { - log.Printf("%d: %d", k, v) - } - // rating profiles - rpfCount := len(tp.ratingProfiles) - log.Print("Rating profiles: ", rpfCount) - activDist := make(map[int]int, 50) - activCount := 0 - for _, rpf := range tp.ratingProfiles { - activDist[len(rpf.RatingPlanActivations)] += 1 - activCount += len(rpf.RatingPlanActivations) - } - log.Print("Avg Activations: ", activCount/rpfCount) - log.Print("Activation distribution:") - for k, v := range activDist { - log.Printf("%d: %d", k, v) - } - // actions - log.Print("Actions: ", len(tp.actions)) - // action plans - log.Print("Action plans: ", len(tp.actionsTimings)) - // action trigers - log.Print("Action trigers: ", len(tp.actionsTriggers)) - // account actions - log.Print("Account actions: ", len(tp.accountActions)) - // derivedChargers - log.Print("Derived Chargers: ", len(tp.derivedChargers)) - // lcr rules - log.Print("LCR rules: ", len(tp.lcrs)) - // cdr stats - log.Print("CDR stats: ", len(tp.cdrStats)) -} - -// Returns the identities loaded for a specific category, useful for cache reloads -func (tp *TPData) GetLoadedIds(categ string) ([]string, error) { - switch categ { - case DESTINATION_PREFIX: - keys := make([]string, len(tp.destinations)) - i := 0 - for k := range tp.destinations { - keys[i] = k - i++ - } - return keys, nil - case RATING_PLAN_PREFIX: - keys := make([]string, len(tp.ratingPlans)) - i := 0 - for k := range tp.ratingPlans { - keys[i] = k - i++ - } - return keys, nil - case RATING_PROFILE_PREFIX: - keys := make([]string, len(tp.ratingProfiles)) - i := 0 - for k := range tp.ratingProfiles { - keys[i] = k - i++ - } - return keys, nil - case ACTION_PREFIX: // actionsTimings - keys := make([]string, len(tp.actions)) - i := 0 - for k := range tp.actions { - keys[i] = k - i++ - } - return keys, nil - case ACTION_TIMING_PREFIX: // actionsTimings - keys := make([]string, len(tp.actionsTimings)) - i := 0 - for k := range tp.actionsTimings { - keys[i] = k - i++ - } - return keys, nil - case RP_ALIAS_PREFIX: // aliases - keys := make([]string, len(tp.rpAliases)) - i := 0 - for k := range tp.rpAliases { - keys[i] = k - i++ - } - return keys, nil - case ACC_ALIAS_PREFIX: // aliases - keys := make([]string, len(tp.accAliases)) - i := 0 - for k := range tp.accAliases { - keys[i] = k - i++ - } - return keys, nil - case DERIVEDCHARGERS_PREFIX: // derived chargers - keys := make([]string, len(tp.derivedChargers)) - i := 0 - for k := range tp.derivedChargers { - keys[i] = k - i++ - } - return keys, nil - case CDR_STATS_PREFIX: // cdr stats - keys := make([]string, len(tp.cdrStats)) - i := 0 - for k := range tp.cdrStats { - keys[i] = k - i++ - } - return keys, nil - case SHARED_GROUP_PREFIX: - keys := make([]string, len(tp.sharedGroups)) - i := 0 - for k := range tp.sharedGroups { - keys[i] = k - i++ - } - return keys, nil - } - return nil, errors.New("Unsupported category") -} - func NewLoadRate(tag, connectFee, price, ratedUnits, rateIncrements, groupInterval string) (r *utils.TPRate, err error) { cf, err := strconv.ParseFloat(connectFee, 64) if err != nil { diff --git a/engine/models.go b/engine/models.go index 141173559..299e529ef 100644 --- a/engine/models.go +++ b/engine/models.go @@ -49,6 +49,11 @@ type TpDestination struct { CreatedAt time.Time } +func (d *TpDestination) Load(row []string) { + d.Tag = row[0] + d.Prefix = row[1] +} + type TpRate struct { Id int64 Tpid string diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 820e8c8d7..2a49d6acf 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -139,7 +139,7 @@ type LoadStorage interface { GetTpTimings(string, string) (map[string]*utils.ApierTPTiming, error) SetTPDestination(string, *Destination) error - GetTpDestinations(string, string) (map[string]*Destination, error) + GetTpDestinations(string, string) ([]*TpDestination, error) SetTPRates(string, map[string][]*utils.RateSlot) error GetTpRates(string, string) (map[string]*utils.TPRate, error)