Modified TPImporter to cope with new storage_sql methods

This commit is contained in:
DanB
2014-09-02 11:00:36 +02:00
parent 0fbd76abea
commit 62f53af8c0
3 changed files with 124 additions and 82 deletions

View File

@@ -20,6 +20,7 @@ package engine
import (
"flag"
"fmt"
"path"
"testing"
@@ -186,6 +187,7 @@ func TestImportToStorDb(t *testing.T) {
if tpids, err := storDb.GetTPIds(); err != nil {
t.Error("Error when querying storDb for imported data: ", err)
} else if len(tpids) != 1 || tpids[0] != TEST_SQL {
fmt.Printf("len(tpids): %d, tpids[0]=%s, tpids: %+v\n", len(tpids), tpids[0], tpids)
t.Errorf("Data in storDb is different than expected %v", tpids)
}
}

View File

@@ -92,7 +92,7 @@ func (self *SQLStorage) GetTPIds() ([]string, error) {
return nil, err
}
defer rows.Close()
ids := []string{}
ids := make([]string, 0)
i := 0
for rows.Next() {
i++ //Keep here a reference so we know we got at least one
@@ -212,7 +212,6 @@ func (self *SQLStorage) SetTPRates(tpid string, rts map[string][]*utils.RateSlot
if len(rts) == 0 {
return nil //Nothing to set
}
tx := self.db.Begin()
for rtId, rSlots := range rts {
tx.Where("tpid = ?", tpid).Where("id = ?", rtId).Delete(TpRate{})
@@ -259,7 +258,6 @@ func (self *SQLStorage) SetTPRatingPlans(tpid string, drts map[string][]*utils.T
if len(drts) == 0 {
return nil //Nothing to set
}
tx := self.db.Begin()
for rpId, rPlans := range drts {
tx.Where("tpid = ?", tpid).Where("id = ?", rpId).Delete(TpRatingPlan{})
@@ -541,7 +539,7 @@ func (self *SQLStorage) GetTPActionTimings(tpid, tag string) (map[string][]*util
}
for _, tpAp := range tpActionPlans {
ats[tag] = append(ats[tag], &utils.TPActionTiming{ActionsId: tpAp.ActionsId, TimingId: tpAp.TimingId, Weight: tpAp.Weight})
ats[tpAp.Id] = append(ats[tpAp.Id], &utils.TPActionTiming{ActionsId: tpAp.ActionsId, TimingId: tpAp.TimingId, Weight: tpAp.Weight})
}
return ats, nil
}
@@ -1131,9 +1129,9 @@ func (self *SQLStorage) GetTpDestinations(tpid, tag string) (map[string]*Destina
for _, tpDest := range tpDests {
var dest *Destination
var found bool
if dest, found = dests[tag]; !found {
dest = &Destination{Id: tag}
dests[tag] = dest
if dest, found = dests[tpDest.Id]; !found {
dest = &Destination{Id: tpDest.Id}
dests[tpDest.Id] = dest
}
dest.AddPrefix(tpDest.Prefix)
}
@@ -1158,19 +1156,19 @@ func (self *SQLStorage) GetTpRates(tpid, tag string) (map[string]*utils.TPRate,
}
r := &utils.TPRate{
TPid: tpid,
RateId: tag,
RateId: tr.Id,
RateSlots: []*utils.RateSlot{rs},
}
// same tag only to create rate groups
er, exists := rts[tag]
er, exists := rts[tr.Id]
if exists {
if err := ValidNextGroup(er.RateSlots[len(er.RateSlots)-1], r.RateSlots[0]); err != nil {
return nil, err
}
er.RateSlots = append(er.RateSlots, r.RateSlots[0])
} else {
rts[tag] = r
rts[tr.Id] = r
}
}
return rts, nil
@@ -1190,7 +1188,7 @@ func (self *SQLStorage) GetTpDestinationRates(tpid, tag string) (map[string]*uti
for _, tpDr := range tpDestinationRates {
dr := &utils.TPDestinationRate{
TPid: tpid,
DestinationRateId: tag,
DestinationRateId: tpDr.Id,
DestinationRates: []*utils.DestinationRate{
&utils.DestinationRate{
DestinationId: tpDr.DestinationsId,
@@ -1200,13 +1198,13 @@ func (self *SQLStorage) GetTpDestinationRates(tpid, tag string) (map[string]*uti
},
},
}
existingDR, exists := rts[tag]
existingDR, exists := rts[tpDr.Id]
if exists {
existingDR.DestinationRates = append(existingDR.DestinationRates, dr.DestinationRates[0])
} else {
existingDR = dr
}
rts[tag] = existingDR
rts[tpDr.Id] = existingDR
}
return rts, nil
@@ -1224,7 +1222,7 @@ func (self *SQLStorage) GetTpTimings(tpid, tag string) (map[string]*utils.TPTimi
}
for _, tpTm := range tpTimings {
tms[tag] = NewTiming(tag, tpTm.Years, tpTm.Months, tpTm.MonthDays, tpTm.WeekDays, tpTm.Time)
tms[tpTm.Id] = NewTiming(tpTm.Id, tpTm.Years, tpTm.Months, tpTm.MonthDays, tpTm.WeekDays, tpTm.Time)
}
return tms, nil
@@ -1248,10 +1246,10 @@ func (self *SQLStorage) GetTpRatingPlans(tpid, tag string) (map[string][]*utils.
TimingId: tpRp.TimingId,
Weight: tpRp.Weight,
}
if _, exists := rpbns[tag]; exists {
rpbns[tag] = append(rpbns[tag], rpb)
if _, exists := rpbns[tpRp.Id]; exists {
rpbns[tpRp.Id] = append(rpbns[tpRp.Id], rpb)
} else { // New
rpbns[tag] = []*utils.TPRatingPlanBinding{rpb}
rpbns[tpRp.Id] = []*utils.TPRatingPlanBinding{rpb}
}
}
return rpbns, nil
@@ -1319,7 +1317,7 @@ func (self *SQLStorage) GetTpSharedGroups(tpid, tag string) (map[string][]*utils
}
for _, tpSg := range tpCdrStats {
sgs[tag] = append(sgs[tag], &utils.TPSharedGroup{
sgs[tag] = append(sgs[tpSg.Id], &utils.TPSharedGroup{
Account: tpSg.Account,
Strategy: tpSg.Strategy,
RatingSubject: tpSg.RatingSubject,
@@ -1341,7 +1339,7 @@ func (self *SQLStorage) GetTpCdrStats(tpid, tag string) (map[string][]*utils.TPC
}
for _, tpCs := range tpCdrStats {
css[tag] = append(css[tag], &utils.TPCdrStat{
css[tag] = append(css[tpCs.Id], &utils.TPCdrStat{
QueueLength: tpCs.QueueLength,
TimeWindow: tpCs.TimeWindow,
Metrics: tpCs.Metrics,
@@ -1493,7 +1491,7 @@ func (self *SQLStorage) GetTpActions(tpid, tag string) (map[string][]*utils.TPAc
ExtraParameters: tpAc.ExtraParameters,
Weight: tpAc.Weight,
}
as[tag] = append(as[tag], a)
as[tpAc.Id] = append(as[tpAc.Id], a)
}
return as, nil
}
@@ -1528,7 +1526,7 @@ func (self *SQLStorage) GetTpActionTriggers(tpid, tag string) (map[string][]*uti
ActionsId: tpAt.ActionsId,
MinQueuedItems: tpAt.MinQueuedItems,
}
ats[tag] = append(ats[tag], at)
ats[tpAt.Id] = append(ats[tpAt.Id], at)
}
return ats, nil
}

View File

@@ -106,6 +106,7 @@ func (self *TPCSVImporter) importDestinations(fn string) error {
return err
}
lineNr := 0
dests := make(map[string]*Destination) // Key:destId, value: listOfPrefixes
for {
lineNr++
record, err := fParser.ParseNextLine()
@@ -116,8 +117,15 @@ func (self *TPCSVImporter) importDestinations(fn string) error {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
} else {
if dst, hasIt := dests[record[0]]; hasIt {
dst.Prefixes = append(dst.Prefixes, record[1])
} else {
dests[record[0]] = &Destination{record[0], []string{record[1]}}
}
}
dst := &Destination{record[0], []string{record[1]}}
}
for _, dst := range dests {
if err := self.StorDb.SetTPDestination(self.TPid, dst); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
@@ -136,6 +144,7 @@ func (self *TPCSVImporter) importRates(fn string) error {
return err
}
lineNr := 0
rates := make(map[string][]*utils.RateSlot)
for {
lineNr++
record, err := fParser.ParseNextLine()
@@ -147,14 +156,18 @@ func (self *TPCSVImporter) importRates(fn string) error {
}
continue
}
rt, err := NewLoadRate(record[0], record[1], record[2], record[3], record[4], record[5])
newRt, err := NewLoadRate(record[0], record[1], record[2], record[3], record[4], record[5])
if err != nil {
return err
}
if err := self.StorDb.SetTPRates(self.TPid, map[string][]*utils.RateSlot{record[0]: rt.RateSlots}); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
if _, hasIt := rates[record[0]]; !hasIt {
rates[record[0]] = make([]*utils.RateSlot, 0)
}
rates[record[0]] = append(rates[record[0]], newRt.RateSlots...)
}
if err := self.StorDb.SetTPRates(self.TPid, rates); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
return nil
@@ -169,6 +182,7 @@ func (self *TPCSVImporter) importDestinationRates(fn string) error {
return err
}
lineNr := 0
drs := make(map[string][]*utils.DestinationRate)
for {
lineNr++
record, err := fParser.ParseNextLine()
@@ -185,21 +199,23 @@ func (self *TPCSVImporter) importDestinationRates(fn string) error {
log.Printf("Error parsing rounding decimals: %s", record[4])
return err
}
drs := []*utils.DestinationRate{
&utils.DestinationRate{
DestinationId: record[1],
RateId: record[2],
RoundingMethod: record[3],
RoundingDecimals: roundingDecimals,
},
if _, hasIt := drs[record[0]]; !hasIt {
drs[record[0]] = make([]*utils.DestinationRate, 0)
}
if err := self.StorDb.SetTPDestinationRates(self.TPid,
map[string][]*utils.DestinationRate{record[0]: drs}); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
drs[record[0]] = append(drs[record[0]], &utils.DestinationRate{
DestinationId: record[1],
RateId: record[2],
RoundingMethod: record[3],
RoundingDecimals: roundingDecimals,
})
}
if err := self.StorDb.SetTPDestinationRates(self.TPid, drs); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
return nil
}
@@ -212,6 +228,7 @@ func (self *TPCSVImporter) importRatingPlans(fn string) error {
return err
}
lineNr := 0
rpls := make(map[string][]*utils.TPRatingPlanBinding)
for {
lineNr++
record, err := fParser.ParseNextLine()
@@ -230,19 +247,21 @@ func (self *TPCSVImporter) importRatingPlans(fn string) error {
}
continue
}
drt := []*utils.TPRatingPlanBinding{
&utils.TPRatingPlanBinding{
DestinationRatesId: record[1],
Weight: weight,
TimingId: record[2],
},
if _, hasIt := rpls[record[0]]; !hasIt {
rpls[record[0]] = make([]*utils.TPRatingPlanBinding, 0)
}
if err := self.StorDb.SetTPRatingPlans(self.TPid, map[string][]*utils.TPRatingPlanBinding{record[0]: drt}); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
rpls[record[0]] = append(rpls[record[0]], &utils.TPRatingPlanBinding{
DestinationRatesId: record[1],
Weight: weight,
TimingId: record[2],
})
}
if err := self.StorDb.SetTPRatingPlans(self.TPid, rpls); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
return nil
}
@@ -255,6 +274,7 @@ func (self *TPCSVImporter) importRatingProfiles(fn string) error {
return err
}
lineNr := 0
rpfs := make(map[string]*utils.TPRatingProfile)
for {
lineNr++
record, err := fParser.ParseNextLine()
@@ -278,7 +298,8 @@ func (self *TPCSVImporter) importRatingProfiles(fn string) error {
if self.ImportId != "" {
loadId += "_" + self.ImportId
}
rp := &utils.TPRatingProfile{
newRp := &utils.TPRatingProfile{
TPid: self.TPid,
LoadId: loadId,
Tenant: tenant,
Category: tor,
@@ -287,12 +308,18 @@ func (self *TPCSVImporter) importRatingProfiles(fn string) error {
RatingPlanActivations: []*utils.TPRatingActivation{
&utils.TPRatingActivation{ActivationTime: record[4], RatingPlanId: ratingPlanTag, FallbackSubjects: fallbacksubject}},
}
if err := self.StorDb.SetTPRatingProfiles(self.TPid, map[string]*utils.TPRatingProfile{rp.KeyId(): rp}); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
if rp, hasIt := rpfs[newRp.KeyId()]; hasIt {
rp.RatingPlanActivations = append(rp.RatingPlanActivations, newRp.RatingPlanActivations...)
} else {
rpfs[newRp.KeyId()] = newRp
}
}
if err := self.StorDb.SetTPRatingProfiles(self.TPid, rpfs); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
return nil
}
@@ -309,6 +336,7 @@ func (self *TPCSVImporter) importActions(fn string) error {
return err
}
lineNr := 0
acts := make(map[string][]*utils.TPAction)
for {
lineNr++
record, err := fParser.ParseNextLine()
@@ -320,7 +348,7 @@ func (self *TPCSVImporter) importActions(fn string) error {
}
continue
}
actId, actionType, balanceType, direction, destTag, rateSubject, sharedGroup := record[0], record[1], record[2], record[3], record[6], record[7], record[8]
actId, actionType, balanceType, direction, destTag, rateSubject, category, sharedGroup := record[0], record[1], record[2], record[3], record[6], record[7], record[8], record[10]
units, err := strconv.ParseFloat(record[4], 64)
if err != nil && record[4] != "" {
if self.Verbose {
@@ -329,14 +357,17 @@ func (self *TPCSVImporter) importActions(fn string) error {
continue
}
balanceWeight, _ := strconv.ParseFloat(record[9], 64)
weight, err := strconv.ParseFloat(record[11], 64)
weight, err := strconv.ParseFloat(record[12], 64)
if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
act := &utils.TPAction{
if _, hasIt := acts[actId]; !hasIt {
acts[actId] = make([]*utils.TPAction, 0)
}
acts[actId] = append(acts[actId], &utils.TPAction{
Identifier: actionType,
BalanceType: balanceType,
Direction: direction,
@@ -344,16 +375,18 @@ func (self *TPCSVImporter) importActions(fn string) error {
ExpiryTime: record[5],
DestinationId: destTag,
RatingSubject: rateSubject,
Category: category,
SharedGroup: sharedGroup,
BalanceWeight: balanceWeight,
ExtraParameters: record[10],
ExtraParameters: record[11],
Weight: weight,
})
}
if err := self.StorDb.SetTPActions(self.TPid, acts); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
if err := self.StorDb.SetTPActions(self.TPid, map[string][]*utils.TPAction{actId: []*utils.TPAction{act}}); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
}
return nil
}
@@ -367,6 +400,7 @@ func (self *TPCSVImporter) importActionTimings(fn string) error {
return err
}
lineNr := 0
aplns := make(map[string][]*utils.TPActionTiming)
for {
lineNr++
record, err := fParser.ParseNextLine()
@@ -386,19 +420,21 @@ func (self *TPCSVImporter) importActionTimings(fn string) error {
}
continue
}
at := []*utils.TPActionTiming{
&utils.TPActionTiming{
ActionsId: actionsTag,
TimingId: timingTag,
Weight: weight,
},
if _, hasIt := aplns[tag]; !hasIt {
aplns[tag] = make([]*utils.TPActionTiming, 0)
}
if err := self.StorDb.SetTPActionTimings(self.TPid, map[string][]*utils.TPActionTiming{tag: at}); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
aplns[tag] = append(aplns[tag], &utils.TPActionTiming{
ActionsId: actionsTag,
TimingId: timingTag,
Weight: weight,
})
}
if err := self.StorDb.SetTPActionTimings(self.TPid, aplns); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
return nil
}
@@ -411,6 +447,7 @@ func (self *TPCSVImporter) importActionTriggers(fn string) error {
return err
}
lineNr := 0
atrs := make(map[string][]*utils.TPActionTrigger)
for {
lineNr++
record, err := fParser.ParseNextLine()
@@ -422,7 +459,7 @@ func (self *TPCSVImporter) importActionTriggers(fn string) error {
}
continue
}
tag, balanceType, direction, thresholdType, destinationTag, balanceExpirationDate, balanceRatingSubject, balanceSharedGroup, actionsTag := record[0], record[1], record[2], record[3], record[7], record[9], record[10], record[11], record[13]
tag, balanceType, direction, thresholdType, destinationTag, balanceExpirationDate, balanceRatingSubject, balanceCategory, balanceSharedGroup, actionsTag := record[0], record[1], record[2], record[3], record[7], record[9], record[10], record[10], record[12], record[14]
threshold, err := strconv.ParseFloat(record[4], 64)
if err != nil {
if self.Verbose {
@@ -447,19 +484,22 @@ func (self *TPCSVImporter) importActionTriggers(fn string) error {
}
continue
}
minQueuedItems, err := strconv.Atoi(record[12])
minQueuedItems, err := strconv.Atoi(record[13])
if err != nil && record[12] != "" {
log.Printf("Ignoring line %d, warning: <%s>", lineNr, err.Error())
continue
}
weight, err := strconv.ParseFloat(record[14], 64)
weight, err := strconv.ParseFloat(record[15], 64)
if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
at := &utils.TPActionTrigger{
if _, hasIt := atrs[tag]; !hasIt {
atrs[tag] = make([]*utils.TPActionTrigger, 0)
}
atrs[tag] = append(atrs[tag], &utils.TPActionTrigger{
BalanceType: balanceType,
Direction: direction,
ThresholdType: thresholdType,
@@ -470,17 +510,19 @@ func (self *TPCSVImporter) importActionTriggers(fn string) error {
BalanceWeight: balanceWeight,
BalanceExpirationDate: balanceExpirationDate,
BalanceRatingSubject: balanceRatingSubject,
BalanceCategory: balanceCategory,
BalanceSharedGroup: balanceSharedGroup,
MinQueuedItems: minQueuedItems,
Weight: weight,
ActionsId: actionsTag,
}
if err := self.StorDb.SetTPActionTriggers(self.TPid, map[string][]*utils.TPActionTrigger{tag: []*utils.TPActionTrigger{at}}); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
})
}
if err := self.StorDb.SetTPActionTriggers(self.TPid, atrs); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
return nil
}