diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index fcafc5950..198ef7ce5 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -45,7 +45,7 @@ var ( func main() { flag.Parse() sep = []rune(*separator)[0] - csvr := ×pans.CSVReader{timespans.OpenFileCSVReader} + csvr := timespans.NewFileCSVReader() csvr.LoadDestinations(*destinationsFn, sep) csvr.LoadRates(*ratesFn, sep) csvr.LoadTimings(*timingsFn, sep) @@ -59,5 +59,5 @@ func main() { if err != nil { log.Fatalf("Could not open database connection: %v", err) } - timespans.WriteToDatabase(storage, *flush, true) + csvr.WriteToDatabase(storage, *flush, true) } diff --git a/timespans/csvreader.go b/timespans/csvreader.go index 6bf390ead..91dc906b0 100644 --- a/timespans/csvreader.go +++ b/timespans/csvreader.go @@ -28,24 +28,111 @@ import ( "time" ) -var ( - actions = make(map[string][]*Action) - actionsTimings = make(map[string][]*ActionTiming) - actionsTriggers = make(map[string][]*ActionTrigger) +type CSVReader struct { + readerFunc func(string, rune) (*csv.Reader, *os.File, error) + actions map[string][]*Action + actionsTimings map[string][]*ActionTiming + actionsTriggers map[string][]*ActionTrigger accountActions []*UserBalance destinations []*Destination - rates = make(map[string][]*Rate) - timings = make(map[string][]*Timing) - activationPeriods = make(map[string]*ActivationPeriod) - ratingProfiles = make(map[string]CallDescriptors) -) + rates map[string][]*Rate + timings map[string][]*Timing + activationPeriods map[string]*ActivationPeriod + ratingProfiles map[string]CallDescriptors +} -type CSVReader struct { - ReaderFunc func(string, rune) (*csv.Reader, *os.File, error) +func NewFileCSVReader() *CSVReader { + c := new(CSVReader) + c.actions = make(map[string][]*Action) + c.actionsTimings = make(map[string][]*ActionTiming) + c.actionsTriggers = make(map[string][]*ActionTrigger) + c.rates = make(map[string][]*Rate) + c.timings = make(map[string][]*Timing) + c.activationPeriods = make(map[string]*ActivationPeriod) + c.ratingProfiles = make(map[string]CallDescriptors) + c.readerFunc = openFileCSVReader + return c +} + +func NewStringCSVReader() *CSVReader { + c := NewFileCSVReader() + c.readerFunc = openStringCSVReader + return c +} + +func openFileCSVReader(fn string, comma rune) (csvReader *csv.Reader, fp *os.File, err error) { + fp, err = os.Open(fn) + if err != nil { + return + } + csvReader = csv.NewReader(fp) + csvReader.Comma = comma + csvReader.TrailingComma = true + return +} + +func openStringCSVReader(data string, comma rune) (csvReader *csv.Reader, fp *os.File, err error) { + csvReader = csv.NewReader(strings.NewReader(data)) + csvReader.Comma = comma + csvReader.TrailingComma = true + return +} + +func (csvr *CSVReader) WriteToDatabase(storage StorageGetter, flush, verbose bool) { + if flush { + storage.Flush() + } + if verbose { + log.Print("Destinations") + } + for _, d := range csvr.destinations { + storage.SetDestination(d) + if verbose { + log.Print(d.Id, " : ", d.Prefixes) + } + } + if verbose { + log.Print("Rating profiles") + } + for _, cds := range csvr.ratingProfiles { + for _, cd := range cds { + storage.SetActivationPeriodsOrFallback(cd.GetKey(), cd.ActivationPeriods, cd.FallbackKey) + if verbose { + log.Print(cd.GetKey()) + } + } + } + if verbose { + log.Print("Action timings") + } + for k, ats := range csvr.actionsTimings { + storage.SetActionTimings(ACTION_TIMING_PREFIX+":"+k, ats) + if verbose { + log.Println(k) + } + } + if verbose { + log.Print("Actions") + } + for k, as := range csvr.actions { + storage.SetActions(k, as) + if verbose { + log.Println(k) + } + } + if verbose { + log.Print("Account actions") + } + for _, ub := range csvr.accountActions { + storage.SetUserBalance(ub) + if verbose { + log.Println(ub.Id) + } + } } func (csvr *CSVReader) LoadDestinations(fn string, comma rune) { - csvReader, fp, err := csvr.ReaderFunc(fn, comma) + csvReader, fp, err := csvr.readerFunc(fn, comma) if err != nil { return } @@ -60,7 +147,7 @@ func (csvr *CSVReader) LoadDestinations(fn string, comma rune) { continue } var dest *Destination - for _, d := range destinations { + for _, d := range csvr.destinations { if d.Id == tag { dest = d break @@ -68,14 +155,14 @@ func (csvr *CSVReader) LoadDestinations(fn string, comma rune) { } if dest == nil { dest = &Destination{Id: tag} - destinations = append(destinations, dest) + csvr.destinations = append(csvr.destinations, dest) } dest.Prefixes = append(dest.Prefixes, record[1]) } } func (csvr *CSVReader) LoadRates(fn string, comma rune) { - csvReader, fp, err := csvr.ReaderFunc(fn, comma) + csvReader, fp, err := csvr.readerFunc(fn, comma) if err != nil { return } @@ -92,12 +179,12 @@ func (csvr *CSVReader) LoadRates(fn string, comma rune) { if err != nil { continue } - rates[tag] = append(rates[tag], r) + csvr.rates[tag] = append(csvr.rates[tag], r) } } func (csvr *CSVReader) LoadTimings(fn string, comma rune) { - csvReader, fp, err := csvr.ReaderFunc(fn, comma) + csvReader, fp, err := csvr.readerFunc(fn, comma) if err != nil { return } @@ -112,12 +199,12 @@ func (csvr *CSVReader) LoadTimings(fn string, comma rune) { } t := NewTiming(record[1:]...) - timings[tag] = append(timings[tag], t) + csvr.timings[tag] = append(csvr.timings[tag], t) } } func (csvr *CSVReader) LoadRateTimings(fn string, comma rune) { - csvReader, fp, err := csvr.ReaderFunc(fn, comma) + csvReader, fp, err := csvr.readerFunc(fn, comma) if err != nil { return } @@ -131,31 +218,31 @@ func (csvr *CSVReader) LoadRateTimings(fn string, comma rune) { continue } - ts, exists := timings[record[2]] + ts, exists := csvr.timings[record[2]] if !exists { log.Printf("Could not get timing for tag %v", record[2]) continue } for _, t := range ts { rt := NewRateTiming(record[1], t, record[3]) - rs, exists := rates[record[1]] + rs, exists := csvr.rates[record[1]] if !exists { log.Printf("Could not rate for tag %v", record[2]) continue } for _, r := range rs { - _, exists := activationPeriods[tag] + _, exists := csvr.activationPeriods[tag] if !exists { - activationPeriods[tag] = &ActivationPeriod{} + csvr.activationPeriods[tag] = &ActivationPeriod{} } - activationPeriods[tag].AddIntervalIfNotPresent(rt.GetInterval(r)) + csvr.activationPeriods[tag].AddIntervalIfNotPresent(rt.GetInterval(r)) } } } } func (csvr *CSVReader) LoadRatingProfiles(fn string, comma rune) { - csvReader, fp, err := csvr.ReaderFunc(fn, comma) + csvReader, fp, err := csvr.readerFunc(fn, comma) if err != nil { return } @@ -178,12 +265,12 @@ func (csvr *CSVReader) LoadRatingProfiles(fn string, comma rune) { log.Printf("Cannot parse activation time from %v", record[6]) continue } - for _, d := range destinations { + for _, d := range csvr.destinations { for _, p := range d.Prefixes { //destinations // Search for a CallDescriptor with the same key var cd *CallDescriptor key := fmt.Sprintf("%s:%s:%s:%s:%s", direction, tenant, tor, subject, p) - for _, c := range ratingProfiles[p] { + for _, c := range csvr.ratingProfiles[p] { if c.GetKey() == key { cd = c } @@ -196,9 +283,9 @@ func (csvr *CSVReader) LoadRatingProfiles(fn string, comma rune) { Subject: subject, Destination: p, } - ratingProfiles[p] = append(ratingProfiles[p], cd) + csvr.ratingProfiles[p] = append(csvr.ratingProfiles[p], cd) } - ap, exists := activationPeriods[record[5]] + ap, exists := csvr.activationPeriods[record[5]] if !exists { log.Print("Could not load ratinTiming for tag: ", record[5]) continue @@ -209,7 +296,7 @@ func (csvr *CSVReader) LoadRatingProfiles(fn string, comma rune) { newAP.ActivationTime = at cd.AddActivationPeriodIfNotPresent(newAP) if fallbacksubject != "" && - ratingProfiles[p].getKey(fmt.Sprintf("%s:%s:%s:%s:%s", direction, tenant, tor, subject, FallbackDestination)) == nil { + csvr.ratingProfiles[p].getKey(fmt.Sprintf("%s:%s:%s:%s:%s", direction, tenant, tor, subject, FallbackDestination)) == nil { cd = &CallDescriptor{ Direction: direction, Tenant: tenant, @@ -218,7 +305,7 @@ func (csvr *CSVReader) LoadRatingProfiles(fn string, comma rune) { Destination: FallbackDestination, FallbackKey: fmt.Sprintf("%s:%s:%s:%s", direction, tenant, tor, fallbacksubject), } - ratingProfiles[p] = append(ratingProfiles[p], cd) + csvr.ratingProfiles[p] = append(csvr.ratingProfiles[p], cd) } } } @@ -226,7 +313,7 @@ func (csvr *CSVReader) LoadRatingProfiles(fn string, comma rune) { } func (csvr *CSVReader) LoadActions(fn string, comma rune) { - csvReader, fp, err := csvr.ReaderFunc(fn, comma) + csvReader, fp, err := csvr.readerFunc(fn, comma) if err != nil { return } @@ -289,12 +376,12 @@ func (csvr *CSVReader) LoadActions(fn string, comma rune) { }, } } - actions[tag] = append(actions[tag], a) + csvr.actions[tag] = append(csvr.actions[tag], a) } } func (csvr *CSVReader) LoadActionTimings(fn string, comma rune) { - csvReader, fp, err := csvr.ReaderFunc(fn, comma) + csvReader, fp, err := csvr.readerFunc(fn, comma) if err != nil { return } @@ -308,7 +395,7 @@ func (csvr *CSVReader) LoadActionTimings(fn string, comma rune) { continue } - ts, exists := timings[record[2]] + ts, exists := csvr.timings[record[2]] if !exists { log.Printf("Could not load the timing for tag: %v", record[2]) continue @@ -331,13 +418,13 @@ func (csvr *CSVReader) LoadActionTimings(fn string, comma rune) { }, ActionsId: record[1], } - actionsTimings[tag] = append(actionsTimings[tag], at) + csvr.actionsTimings[tag] = append(csvr.actionsTimings[tag], at) } } } func (csvr *CSVReader) LoadActionTriggers(fn string, comma rune) { - csvReader, fp, err := csvr.ReaderFunc(fn, comma) + csvReader, fp, err := csvr.readerFunc(fn, comma) if err != nil { return } @@ -368,12 +455,12 @@ func (csvr *CSVReader) LoadActionTriggers(fn string, comma rune) { ActionsId: record[5], Weight: weight, } - actionsTriggers[tag] = append(actionsTriggers[tag], at) + csvr.actionsTriggers[tag] = append(csvr.actionsTriggers[tag], at) } } func (csvr *CSVReader) LoadAccountActions(fn string, comma rune) { - csvReader, fp, err := csvr.ReaderFunc(fn, comma) + csvReader, fp, err := csvr.readerFunc(fn, comma) if err != nil { return } @@ -385,7 +472,7 @@ func (csvr *CSVReader) LoadAccountActions(fn string, comma rune) { continue } tag := fmt.Sprintf("%s:%s:%s", record[2], record[0], record[1]) - aTriggers, exists := actionsTriggers[record[4]] + aTriggers, exists := csvr.actionsTriggers[record[4]] if !exists { log.Printf("Could not get action triggers for tag %v", record[4]) continue @@ -396,9 +483,9 @@ func (csvr *CSVReader) LoadAccountActions(fn string, comma rune) { Id: tag, ActionTriggers: aTriggers, } - accountActions = append(accountActions, ub) + csvr.accountActions = append(csvr.accountActions, ub) - aTimings, exists := actionsTimings[aTimingsTag] + aTimings, exists := csvr.actionsTimings[aTimingsTag] if !exists { log.Printf("Could not get action triggers for tag %v", aTimingsTag) // must not continue here @@ -408,74 +495,3 @@ func (csvr *CSVReader) LoadAccountActions(fn string, comma rune) { } } } - -func OpenFileCSVReader(fn string, comma rune) (csvReader *csv.Reader, fp *os.File, err error) { - fp, err = os.Open(fn) - if err != nil { - return - } - csvReader = csv.NewReader(fp) - csvReader.Comma = comma - csvReader.TrailingComma = true - return -} - -func OpenStringCSVReader(data string, comma rune) (csvReader *csv.Reader, fp *os.File, err error) { - csvReader = csv.NewReader(strings.NewReader(data)) - csvReader.Comma = comma - csvReader.TrailingComma = true - return -} - -func WriteToDatabase(storage StorageGetter, flush, verbose bool) { - if flush { - storage.Flush() - } - if verbose { - log.Print("Destinations") - } - for _, d := range destinations { - storage.SetDestination(d) - if verbose { - log.Print(d.Id, " : ", d.Prefixes) - } - } - if verbose { - log.Print("Rating profiles") - } - for _, cds := range ratingProfiles { - for _, cd := range cds { - storage.SetActivationPeriodsOrFallback(cd.GetKey(), cd.ActivationPeriods, cd.FallbackKey) - if verbose { - log.Print(cd.GetKey()) - } - } - } - if verbose { - log.Print("Action timings") - } - for k, ats := range actionsTimings { - storage.SetActionTimings(ACTION_TIMING_PREFIX+":"+k, ats) - if verbose { - log.Println(k) - } - } - if verbose { - log.Print("Actions") - } - for k, as := range actions { - storage.SetActions(k, as) - if verbose { - log.Println(k) - } - } - if verbose { - log.Print("Account actions") - } - for _, ub := range accountActions { - storage.SetUserBalance(ub) - if verbose { - log.Println(ub.Id) - } - } -} diff --git a/timespans/csvreader_test.go b/timespans/csvreader_test.go index e9d320bfd..88585dcfb 100644 --- a/timespans/csvreader_test.go +++ b/timespans/csvreader_test.go @@ -88,8 +88,10 @@ vdf,minitsboy,OUT,MORE_MINUTES,STANDARD_TRIGGER ` ) +var csvr *CSVReader + func init() { - csvr := &CSVReader{OpenStringCSVReader} + csvr = NewStringCSVReader() csvr.LoadDestinations(dest, ',') csvr.LoadRates(rts, ',') csvr.LoadTimings(ts, ',') @@ -99,60 +101,60 @@ func init() { csvr.LoadActionTimings(atms, ',') csvr.LoadActionTriggers(atrs, ',') csvr.LoadAccountActions(accs, ',') - WriteToDatabase(storageGetter, false, false) + csvr.WriteToDatabase(storageGetter, false, false) } func TestLoadDestinations(t *testing.T) { - if len(destinations) != 6 { - t.Error("Failed to load destinations: ", destinations) + if len(csvr.destinations) != 6 { + t.Error("Failed to load destinations: ", csvr.destinations) } } func TestLoadRates(t *testing.T) { - if len(rates) != 5 { - t.Error("Failed to load rates: ", rates) + if len(csvr.rates) != 5 { + t.Error("Failed to load rates: ", csvr.rates) } } func TestLoadTimimgs(t *testing.T) { - if len(timings) != 4 { - t.Error("Failed to load timings: ", timings) + if len(csvr.timings) != 4 { + t.Error("Failed to load timings: ", csvr.timings) } } func TestLoadRateTimings(t *testing.T) { - if len(activationPeriods) != 4 { - t.Error("Failed to load rate timings: ", activationPeriods) + if len(csvr.activationPeriods) != 4 { + t.Error("Failed to load rate timings: ", csvr.activationPeriods) } } func TestLoadRatingProfiles(t *testing.T) { - if len(ratingProfiles) != 7 { - t.Error("Failed to load rating profiles: ", len(ratingProfiles), ratingProfiles) + if len(csvr.ratingProfiles) != 7 { + t.Error("Failed to load rating profiles: ", len(csvr.ratingProfiles), csvr.ratingProfiles) } } func TestLoadActions(t *testing.T) { - if len(actions) != 1 { - t.Error("Failed to load actions: ", actions) + if len(csvr.actions) != 1 { + t.Error("Failed to load actions: ", csvr.actions) } } func TestLoadActionTimings(t *testing.T) { - if len(actionsTimings) != 1 { - t.Error("Failed to load action timings: ", actionsTimings) + if len(csvr.actionsTimings) != 1 { + t.Error("Failed to load action timings: ", csvr.actionsTimings) } } func TestLoadActionTriggers(t *testing.T) { - if len(actionsTriggers) != 1 { - t.Error("Failed to load action triggers: ", actionsTriggers) + if len(csvr.actionsTriggers) != 1 { + t.Error("Failed to load action triggers: ", csvr.actionsTriggers) } } func TestLoadAccountActions(t *testing.T) { - if len(accountActions) != 1 { - t.Error("Failed to load account actions: ", accountActions) + if len(csvr.accountActions) != 1 { + t.Error("Failed to load account actions: ", csvr.accountActions) } }