diff --git a/engine/loader_local_test.go b/engine/loader_local_test.go index 9997bbb2b..e67989878 100644 --- a/engine/loader_local_test.go +++ b/engine/loader_local_test.go @@ -20,6 +20,7 @@ package engine import ( "flag" + "fmt" "path" "testing" @@ -186,6 +187,7 @@ func TestImportToStorDb(t *testing.T) { if tpids, err := storDb.GetTPIds(); err != nil { t.Error("Error when querying storDb for imported data: ", err) } else if len(tpids) != 1 || tpids[0] != TEST_SQL { + fmt.Printf("len(tpids): %d, tpids[0]=%s, tpids: %+v\n", len(tpids), tpids[0], tpids) t.Errorf("Data in storDb is different than expected %v", tpids) } } diff --git a/engine/storage_sql.go b/engine/storage_sql.go index b3026bdd4..43700850c 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -92,7 +92,7 @@ func (self *SQLStorage) GetTPIds() ([]string, error) { return nil, err } defer rows.Close() - ids := []string{} + ids := make([]string, 0) i := 0 for rows.Next() { i++ //Keep here a reference so we know we got at least one @@ -212,7 +212,6 @@ func (self *SQLStorage) SetTPRates(tpid string, rts map[string][]*utils.RateSlot if len(rts) == 0 { return nil //Nothing to set } - tx := self.db.Begin() for rtId, rSlots := range rts { tx.Where("tpid = ?", tpid).Where("id = ?", rtId).Delete(TpRate{}) @@ -259,7 +258,6 @@ func (self *SQLStorage) SetTPRatingPlans(tpid string, drts map[string][]*utils.T if len(drts) == 0 { return nil //Nothing to set } - tx := self.db.Begin() for rpId, rPlans := range drts { tx.Where("tpid = ?", tpid).Where("id = ?", rpId).Delete(TpRatingPlan{}) @@ -541,7 +539,7 @@ func (self *SQLStorage) GetTPActionTimings(tpid, tag string) (map[string][]*util } for _, tpAp := range tpActionPlans { - ats[tag] = append(ats[tag], &utils.TPActionTiming{ActionsId: tpAp.ActionsId, TimingId: tpAp.TimingId, Weight: tpAp.Weight}) + ats[tpAp.Id] = append(ats[tpAp.Id], &utils.TPActionTiming{ActionsId: tpAp.ActionsId, TimingId: tpAp.TimingId, Weight: tpAp.Weight}) } return ats, nil } @@ -1131,9 +1129,9 @@ func (self *SQLStorage) GetTpDestinations(tpid, tag string) (map[string]*Destina for _, tpDest := range tpDests { var dest *Destination var found bool - if dest, found = dests[tag]; !found { - dest = &Destination{Id: tag} - dests[tag] = dest + if dest, found = dests[tpDest.Id]; !found { + dest = &Destination{Id: tpDest.Id} + dests[tpDest.Id] = dest } dest.AddPrefix(tpDest.Prefix) } @@ -1158,19 +1156,19 @@ func (self *SQLStorage) GetTpRates(tpid, tag string) (map[string]*utils.TPRate, } r := &utils.TPRate{ TPid: tpid, - RateId: tag, + RateId: tr.Id, RateSlots: []*utils.RateSlot{rs}, } // same tag only to create rate groups - er, exists := rts[tag] + er, exists := rts[tr.Id] if exists { if err := ValidNextGroup(er.RateSlots[len(er.RateSlots)-1], r.RateSlots[0]); err != nil { return nil, err } er.RateSlots = append(er.RateSlots, r.RateSlots[0]) } else { - rts[tag] = r + rts[tr.Id] = r } } return rts, nil @@ -1190,7 +1188,7 @@ func (self *SQLStorage) GetTpDestinationRates(tpid, tag string) (map[string]*uti for _, tpDr := range tpDestinationRates { dr := &utils.TPDestinationRate{ TPid: tpid, - DestinationRateId: tag, + DestinationRateId: tpDr.Id, DestinationRates: []*utils.DestinationRate{ &utils.DestinationRate{ DestinationId: tpDr.DestinationsId, @@ -1200,13 +1198,13 @@ func (self *SQLStorage) GetTpDestinationRates(tpid, tag string) (map[string]*uti }, }, } - existingDR, exists := rts[tag] + existingDR, exists := rts[tpDr.Id] if exists { existingDR.DestinationRates = append(existingDR.DestinationRates, dr.DestinationRates[0]) } else { existingDR = dr } - rts[tag] = existingDR + rts[tpDr.Id] = existingDR } return rts, nil @@ -1224,7 +1222,7 @@ func (self *SQLStorage) GetTpTimings(tpid, tag string) (map[string]*utils.TPTimi } for _, tpTm := range tpTimings { - tms[tag] = NewTiming(tag, tpTm.Years, tpTm.Months, tpTm.MonthDays, tpTm.WeekDays, tpTm.Time) + tms[tpTm.Id] = NewTiming(tpTm.Id, tpTm.Years, tpTm.Months, tpTm.MonthDays, tpTm.WeekDays, tpTm.Time) } return tms, nil @@ -1248,10 +1246,10 @@ func (self *SQLStorage) GetTpRatingPlans(tpid, tag string) (map[string][]*utils. TimingId: tpRp.TimingId, Weight: tpRp.Weight, } - if _, exists := rpbns[tag]; exists { - rpbns[tag] = append(rpbns[tag], rpb) + if _, exists := rpbns[tpRp.Id]; exists { + rpbns[tpRp.Id] = append(rpbns[tpRp.Id], rpb) } else { // New - rpbns[tag] = []*utils.TPRatingPlanBinding{rpb} + rpbns[tpRp.Id] = []*utils.TPRatingPlanBinding{rpb} } } return rpbns, nil @@ -1319,7 +1317,7 @@ func (self *SQLStorage) GetTpSharedGroups(tpid, tag string) (map[string][]*utils } for _, tpSg := range tpCdrStats { - sgs[tag] = append(sgs[tag], &utils.TPSharedGroup{ + sgs[tag] = append(sgs[tpSg.Id], &utils.TPSharedGroup{ Account: tpSg.Account, Strategy: tpSg.Strategy, RatingSubject: tpSg.RatingSubject, @@ -1341,7 +1339,7 @@ func (self *SQLStorage) GetTpCdrStats(tpid, tag string) (map[string][]*utils.TPC } for _, tpCs := range tpCdrStats { - css[tag] = append(css[tag], &utils.TPCdrStat{ + css[tag] = append(css[tpCs.Id], &utils.TPCdrStat{ QueueLength: tpCs.QueueLength, TimeWindow: tpCs.TimeWindow, Metrics: tpCs.Metrics, @@ -1493,7 +1491,7 @@ func (self *SQLStorage) GetTpActions(tpid, tag string) (map[string][]*utils.TPAc ExtraParameters: tpAc.ExtraParameters, Weight: tpAc.Weight, } - as[tag] = append(as[tag], a) + as[tpAc.Id] = append(as[tpAc.Id], a) } return as, nil } @@ -1528,7 +1526,7 @@ func (self *SQLStorage) GetTpActionTriggers(tpid, tag string) (map[string][]*uti ActionsId: tpAt.ActionsId, MinQueuedItems: tpAt.MinQueuedItems, } - ats[tag] = append(ats[tag], at) + ats[tpAt.Id] = append(ats[tpAt.Id], at) } return ats, nil } diff --git a/engine/tpimporter_csv.go b/engine/tpimporter_csv.go index e24a8fe44..12c2eb06e 100644 --- a/engine/tpimporter_csv.go +++ b/engine/tpimporter_csv.go @@ -106,6 +106,7 @@ func (self *TPCSVImporter) importDestinations(fn string) error { return err } lineNr := 0 + dests := make(map[string]*Destination) // Key:destId, value: listOfPrefixes for { lineNr++ record, err := fParser.ParseNextLine() @@ -116,8 +117,15 @@ func (self *TPCSVImporter) importDestinations(fn string) error { log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) } continue + } else { + if dst, hasIt := dests[record[0]]; hasIt { + dst.Prefixes = append(dst.Prefixes, record[1]) + } else { + dests[record[0]] = &Destination{record[0], []string{record[1]}} + } } - dst := &Destination{record[0], []string{record[1]}} + } + for _, dst := range dests { if err := self.StorDb.SetTPDestination(self.TPid, dst); err != nil { if self.Verbose { log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) @@ -136,6 +144,7 @@ func (self *TPCSVImporter) importRates(fn string) error { return err } lineNr := 0 + rates := make(map[string][]*utils.RateSlot) for { lineNr++ record, err := fParser.ParseNextLine() @@ -147,14 +156,18 @@ func (self *TPCSVImporter) importRates(fn string) error { } continue } - rt, err := NewLoadRate(record[0], record[1], record[2], record[3], record[4], record[5]) + newRt, err := NewLoadRate(record[0], record[1], record[2], record[3], record[4], record[5]) if err != nil { return err } - if err := self.StorDb.SetTPRates(self.TPid, map[string][]*utils.RateSlot{record[0]: rt.RateSlots}); err != nil { - if self.Verbose { - log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) - } + if _, hasIt := rates[record[0]]; !hasIt { + rates[record[0]] = make([]*utils.RateSlot, 0) + } + rates[record[0]] = append(rates[record[0]], newRt.RateSlots...) + } + if err := self.StorDb.SetTPRates(self.TPid, rates); err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) } } return nil @@ -169,6 +182,7 @@ func (self *TPCSVImporter) importDestinationRates(fn string) error { return err } lineNr := 0 + drs := make(map[string][]*utils.DestinationRate) for { lineNr++ record, err := fParser.ParseNextLine() @@ -185,21 +199,23 @@ func (self *TPCSVImporter) importDestinationRates(fn string) error { log.Printf("Error parsing rounding decimals: %s", record[4]) return err } - drs := []*utils.DestinationRate{ - &utils.DestinationRate{ - DestinationId: record[1], - RateId: record[2], - RoundingMethod: record[3], - RoundingDecimals: roundingDecimals, - }, + if _, hasIt := drs[record[0]]; !hasIt { + drs[record[0]] = make([]*utils.DestinationRate, 0) } - if err := self.StorDb.SetTPDestinationRates(self.TPid, - map[string][]*utils.DestinationRate{record[0]: drs}); err != nil { - if self.Verbose { - log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) - } + drs[record[0]] = append(drs[record[0]], &utils.DestinationRate{ + DestinationId: record[1], + RateId: record[2], + RoundingMethod: record[3], + RoundingDecimals: roundingDecimals, + }) + } + + if err := self.StorDb.SetTPDestinationRates(self.TPid, drs); err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) } } + return nil } @@ -212,6 +228,7 @@ func (self *TPCSVImporter) importRatingPlans(fn string) error { return err } lineNr := 0 + rpls := make(map[string][]*utils.TPRatingPlanBinding) for { lineNr++ record, err := fParser.ParseNextLine() @@ -230,19 +247,21 @@ func (self *TPCSVImporter) importRatingPlans(fn string) error { } continue } - drt := []*utils.TPRatingPlanBinding{ - &utils.TPRatingPlanBinding{ - DestinationRatesId: record[1], - Weight: weight, - TimingId: record[2], - }, + if _, hasIt := rpls[record[0]]; !hasIt { + rpls[record[0]] = make([]*utils.TPRatingPlanBinding, 0) } - if err := self.StorDb.SetTPRatingPlans(self.TPid, map[string][]*utils.TPRatingPlanBinding{record[0]: drt}); err != nil { - if self.Verbose { - log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) - } + rpls[record[0]] = append(rpls[record[0]], &utils.TPRatingPlanBinding{ + DestinationRatesId: record[1], + Weight: weight, + TimingId: record[2], + }) + } + if err := self.StorDb.SetTPRatingPlans(self.TPid, rpls); err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) } } + return nil } @@ -255,6 +274,7 @@ func (self *TPCSVImporter) importRatingProfiles(fn string) error { return err } lineNr := 0 + rpfs := make(map[string]*utils.TPRatingProfile) for { lineNr++ record, err := fParser.ParseNextLine() @@ -278,7 +298,8 @@ func (self *TPCSVImporter) importRatingProfiles(fn string) error { if self.ImportId != "" { loadId += "_" + self.ImportId } - rp := &utils.TPRatingProfile{ + newRp := &utils.TPRatingProfile{ + TPid: self.TPid, LoadId: loadId, Tenant: tenant, Category: tor, @@ -287,12 +308,18 @@ func (self *TPCSVImporter) importRatingProfiles(fn string) error { RatingPlanActivations: []*utils.TPRatingActivation{ &utils.TPRatingActivation{ActivationTime: record[4], RatingPlanId: ratingPlanTag, FallbackSubjects: fallbacksubject}}, } - if err := self.StorDb.SetTPRatingProfiles(self.TPid, map[string]*utils.TPRatingProfile{rp.KeyId(): rp}); err != nil { - if self.Verbose { - log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) - } + if rp, hasIt := rpfs[newRp.KeyId()]; hasIt { + rp.RatingPlanActivations = append(rp.RatingPlanActivations, newRp.RatingPlanActivations...) + } else { + rpfs[newRp.KeyId()] = newRp } } + if err := self.StorDb.SetTPRatingProfiles(self.TPid, rpfs); err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) + } + } + return nil } @@ -309,6 +336,7 @@ func (self *TPCSVImporter) importActions(fn string) error { return err } lineNr := 0 + acts := make(map[string][]*utils.TPAction) for { lineNr++ record, err := fParser.ParseNextLine() @@ -320,7 +348,7 @@ func (self *TPCSVImporter) importActions(fn string) error { } continue } - actId, actionType, balanceType, direction, destTag, rateSubject, sharedGroup := record[0], record[1], record[2], record[3], record[6], record[7], record[8] + actId, actionType, balanceType, direction, destTag, rateSubject, category, sharedGroup := record[0], record[1], record[2], record[3], record[6], record[7], record[8], record[10] units, err := strconv.ParseFloat(record[4], 64) if err != nil && record[4] != "" { if self.Verbose { @@ -329,14 +357,17 @@ func (self *TPCSVImporter) importActions(fn string) error { continue } balanceWeight, _ := strconv.ParseFloat(record[9], 64) - weight, err := strconv.ParseFloat(record[11], 64) + weight, err := strconv.ParseFloat(record[12], 64) if err != nil { if self.Verbose { log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) } continue } - act := &utils.TPAction{ + if _, hasIt := acts[actId]; !hasIt { + acts[actId] = make([]*utils.TPAction, 0) + } + acts[actId] = append(acts[actId], &utils.TPAction{ Identifier: actionType, BalanceType: balanceType, Direction: direction, @@ -344,16 +375,18 @@ func (self *TPCSVImporter) importActions(fn string) error { ExpiryTime: record[5], DestinationId: destTag, RatingSubject: rateSubject, + Category: category, SharedGroup: sharedGroup, BalanceWeight: balanceWeight, - ExtraParameters: record[10], + ExtraParameters: record[11], Weight: weight, + }) + } + if err := self.StorDb.SetTPActions(self.TPid, acts); err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) } - if err := self.StorDb.SetTPActions(self.TPid, map[string][]*utils.TPAction{actId: []*utils.TPAction{act}}); err != nil { - if self.Verbose { - log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) - } - } + } return nil } @@ -367,6 +400,7 @@ func (self *TPCSVImporter) importActionTimings(fn string) error { return err } lineNr := 0 + aplns := make(map[string][]*utils.TPActionTiming) for { lineNr++ record, err := fParser.ParseNextLine() @@ -386,19 +420,21 @@ func (self *TPCSVImporter) importActionTimings(fn string) error { } continue } - at := []*utils.TPActionTiming{ - &utils.TPActionTiming{ - ActionsId: actionsTag, - TimingId: timingTag, - Weight: weight, - }, + if _, hasIt := aplns[tag]; !hasIt { + aplns[tag] = make([]*utils.TPActionTiming, 0) } - if err := self.StorDb.SetTPActionTimings(self.TPid, map[string][]*utils.TPActionTiming{tag: at}); err != nil { - if self.Verbose { - log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) - } + aplns[tag] = append(aplns[tag], &utils.TPActionTiming{ + ActionsId: actionsTag, + TimingId: timingTag, + Weight: weight, + }) + } + if err := self.StorDb.SetTPActionTimings(self.TPid, aplns); err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) } } + return nil } @@ -411,6 +447,7 @@ func (self *TPCSVImporter) importActionTriggers(fn string) error { return err } lineNr := 0 + atrs := make(map[string][]*utils.TPActionTrigger) for { lineNr++ record, err := fParser.ParseNextLine() @@ -422,7 +459,7 @@ func (self *TPCSVImporter) importActionTriggers(fn string) error { } continue } - tag, balanceType, direction, thresholdType, destinationTag, balanceExpirationDate, balanceRatingSubject, balanceSharedGroup, actionsTag := record[0], record[1], record[2], record[3], record[7], record[9], record[10], record[11], record[13] + tag, balanceType, direction, thresholdType, destinationTag, balanceExpirationDate, balanceRatingSubject, balanceCategory, balanceSharedGroup, actionsTag := record[0], record[1], record[2], record[3], record[7], record[9], record[10], record[10], record[12], record[14] threshold, err := strconv.ParseFloat(record[4], 64) if err != nil { if self.Verbose { @@ -447,19 +484,22 @@ func (self *TPCSVImporter) importActionTriggers(fn string) error { } continue } - minQueuedItems, err := strconv.Atoi(record[12]) + minQueuedItems, err := strconv.Atoi(record[13]) if err != nil && record[12] != "" { log.Printf("Ignoring line %d, warning: <%s>", lineNr, err.Error()) continue } - weight, err := strconv.ParseFloat(record[14], 64) + weight, err := strconv.ParseFloat(record[15], 64) if err != nil { if self.Verbose { log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) } continue } - at := &utils.TPActionTrigger{ + if _, hasIt := atrs[tag]; !hasIt { + atrs[tag] = make([]*utils.TPActionTrigger, 0) + } + atrs[tag] = append(atrs[tag], &utils.TPActionTrigger{ BalanceType: balanceType, Direction: direction, ThresholdType: thresholdType, @@ -470,17 +510,19 @@ func (self *TPCSVImporter) importActionTriggers(fn string) error { BalanceWeight: balanceWeight, BalanceExpirationDate: balanceExpirationDate, BalanceRatingSubject: balanceRatingSubject, + BalanceCategory: balanceCategory, BalanceSharedGroup: balanceSharedGroup, MinQueuedItems: minQueuedItems, Weight: weight, ActionsId: actionsTag, - } - if err := self.StorDb.SetTPActionTriggers(self.TPid, map[string][]*utils.TPActionTrigger{tag: []*utils.TPActionTrigger{at}}); err != nil { - if self.Verbose { - log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) - } + }) + } + if err := self.StorDb.SetTPActionTriggers(self.TPid, atrs); err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) } } + return nil }