From a0d0424f7927064c4a7fd262bfc3c85583dcf39d Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 29 May 2015 13:19:33 +0300 Subject: [PATCH] added csv dump from tagged struct to csv string --- engine/model_helpers.go | 28 +- engine/model_helpers_test.go | 21 + engine/tpimporter_csv.go | 732 +++++++++++++++++++++++++++++++++++ 3 files changed, 780 insertions(+), 1 deletion(-) create mode 100644 engine/model_helpers_test.go create mode 100644 engine/tpimporter_csv.go diff --git a/engine/model_helpers.go b/engine/model_helpers.go index 3e10fee76..59d40e1a2 100644 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -36,7 +36,7 @@ func csvLoad(s interface{}, values []string) (interface{}, error) { elem := reflect.New(st).Elem() for fildName, fieldValue := range fieldValueMap { field := elem.FieldByName(fildName) - if field.IsValid() && field.CanSet() { + if field.IsValid() { switch field.Kind() { case reflect.Float64: value, err := strconv.ParseFloat(fieldValue, 64) @@ -52,6 +52,32 @@ func csvLoad(s interface{}, values []string) (interface{}, error) { return elem.Interface(), nil } +func csvDump(s interface{}, sep string) (string, error) { + fieldIndexMap := make(map[string]int) + st := reflect.TypeOf(s) + numFields := st.NumField() + for i := 0; i < numFields; i++ { + field := st.Field(i) + index := field.Tag.Get("index") + if index != "" { + if idx, err := strconv.Atoi(index); err != nil { + return "", fmt.Errorf("invalid %v.%v index %v", st.Name(), field.Name, index) + } else { + fieldIndexMap[field.Name] = idx + } + } + } + elem := reflect.ValueOf(s) + result := make([]string, len(fieldIndexMap)) + for fieldName, fieldIndex := range fieldIndexMap { + field := elem.FieldByName(fieldName) + if field.IsValid() && fieldIndex < len(result) { + result[fieldIndex] = fmt.Sprint(field.Interface()) + } + } + return strings.Join(result, sep), nil +} + func getColumnCount(s interface{}) int { st := reflect.TypeOf(s) numFields := st.NumField() diff --git 
a/engine/model_helpers_test.go b/engine/model_helpers_test.go new file mode 100644 index 000000000..8da4984ee --- /dev/null +++ b/engine/model_helpers_test.go @@ -0,0 +1,21 @@ +package engine + +import "testing" + +func TestModelHelperCsvLoad(t *testing.T) { + l, _ := csvLoad(TpDestination{}, []string{"TEST_DEST", "+492"}) + tpd := l.(TpDestination) + if tpd.Tag != "TEST_DEST" || tpd.Prefix != "+492" { + t.Errorf("model load failed: %+v", tpd) + } +} + +func TestModelHelperCsvDump(t *testing.T) { + tpd := &TpDestination{ + Tag: "TEST_DEST", + Prefix: "+492"} + csv, err := csvDump(*tpd, ",") + if err != nil || csv != "TEST_DEST,+492" { + t.Errorf("model dump failed: %+v", tpd) + } +} diff --git a/engine/tpimporter_csv.go b/engine/tpimporter_csv.go new file mode 100644 index 000000000..62be9b103 --- /dev/null +++ b/engine/tpimporter_csv.go @@ -0,0 +1,732 @@ +/* +Real-time Charging System for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. 
If not, see +*/ + +package engine + +import ( + "fmt" + "io" + "io/ioutil" + "log" + "strconv" + + "github.com/cgrates/cgrates/utils" +) + +// Import tariff plan from csv into storDb +type TPCSVImporter struct { + TPid string // Load data on this tpid + StorDb LoadStorage // StorDb connection handle + DirPath string // Directory path to import from + Sep rune // Separator in the csv file + Verbose bool // If true will print a detailed information instead of silently discarding it + ImportId string // Use this to differentiate between imports (eg: when autogenerating fields like RatingProfileId +} + +// Maps csv file to handler which should process it. Defined like this since tests on 1.0.3 were failing on Travis. +// Change it to func(string) error as soon as Travis updates. +var fileHandlers = map[string]func(*TPCSVImporter, string) error{ + utils.TIMINGS_CSV: (*TPCSVImporter).importTimings, + utils.DESTINATIONS_CSV: (*TPCSVImporter).importDestinations, + utils.RATES_CSV: (*TPCSVImporter).importRates, + utils.DESTINATION_RATES_CSV: (*TPCSVImporter).importDestinationRates, + utils.RATING_PLANS_CSV: (*TPCSVImporter).importRatingPlans, + utils.RATING_PROFILES_CSV: (*TPCSVImporter).importRatingProfiles, + utils.SHARED_GROUPS_CSV: (*TPCSVImporter).importSharedGroups, + utils.ACTIONS_CSV: (*TPCSVImporter).importActions, + utils.ACTION_PLANS_CSV: (*TPCSVImporter).importActionTimings, + utils.ACTION_TRIGGERS_CSV: (*TPCSVImporter).importActionTriggers, + utils.ACCOUNT_ACTIONS_CSV: (*TPCSVImporter).importAccountActions, + utils.DERIVED_CHARGERS_CSV: (*TPCSVImporter).importDerivedChargers, + utils.CDR_STATS_CSV: (*TPCSVImporter).importCdrStats, +} + +func (self *TPCSVImporter) Run() error { + files, _ := ioutil.ReadDir(self.DirPath) + for _, f := range files { + fHandler, hasName := fileHandlers[f.Name()] + if !hasName { + continue + } + if err := fHandler(self, f.Name()); err != nil { + Logger.Err(fmt.Sprintf(" Importing file: %s, got error: %s", f.Name(), err.Error())) + } 
+ } + return nil +} + +// Handler importing timings from file, saved row by row to storDb +func (self *TPCSVImporter) importTimings(fn string) error { + if self.Verbose { + log.Printf("Processing file: <%s> ", fn) + } + fParser, err := NewTPCSVFileParser(self.DirPath, fn) + if err != nil { + return err + } + lineNr := 0 + for { + lineNr++ + record, err := fParser.ParseNextLine() + if err == io.EOF { // Reached end of file + break + } else if err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + } + continue + } + tm := &utils.ApierTPTiming{TPid: self.TPid, TimingId: record[0], Years: record[1], Months: record[2], MonthDays: record[3], WeekDays: record[4], Time: record[5]} + if err := self.StorDb.SetTPTiming(tm); err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) + } + } + } + return nil +} + +func (self *TPCSVImporter) importDestinations(fn string) error { + if self.Verbose { + log.Printf("Processing file: <%s> ", fn) + } + fParser, err := NewTPCSVFileParser(self.DirPath, fn) + if err != nil { + return err + } + lineNr := 0 + dests := make(map[string]*Destination) // Key:destId, value: listOfPrefixes + for { + lineNr++ + record, err := fParser.ParseNextLine() + if err == io.EOF { // Reached end of file + break + } else if err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + } + continue + } else { + if dst, hasIt := dests[record[0]]; hasIt { + dst.Prefixes = append(dst.Prefixes, record[1]) + } else { + dests[record[0]] = &Destination{record[0], []string{record[1]}} + } + } + } + for _, dst := range dests { + if err := self.StorDb.SetTPDestination(self.TPid, dst); err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) + } + } + } + return nil +} + +func (self *TPCSVImporter) importRates(fn string) error { + if self.Verbose { + 
log.Printf("Processing file: <%s> ", fn) + } + fParser, err := NewTPCSVFileParser(self.DirPath, fn) + if err != nil { + return err + } + lineNr := 0 + rates := make(map[string][]*utils.RateSlot) + for { + lineNr++ + record, err := fParser.ParseNextLine() + if err == io.EOF { // Reached end of file + break + } else if err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + } + continue + } + newRt, err := NewLoadRate(record[0], record[1], record[2], record[3], record[4], record[5]) + if err != nil { + return err + } + if _, hasIt := rates[record[0]]; !hasIt { + rates[record[0]] = make([]*utils.RateSlot, 0) + } + rates[record[0]] = append(rates[record[0]], newRt.RateSlots...) + } + if err := self.StorDb.SetTPRates(self.TPid, rates); err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) + } + } + return nil +} + +func (self *TPCSVImporter) importDestinationRates(fn string) error { + if self.Verbose { + log.Printf("Processing file: <%s> ", fn) + } + fParser, err := NewTPCSVFileParser(self.DirPath, fn) + if err != nil { + return err + } + lineNr := 0 + drs := make(map[string][]*utils.DestinationRate) + for { + lineNr++ + record, err := fParser.ParseNextLine() + if err == io.EOF { // Reached end of file + break + } else if err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + } + continue + } + roundingDecimals, err := strconv.Atoi(record[4]) + if err != nil { + log.Printf("Error parsing rounding decimals: %s", record[4]) + return err + } + maxCost, err := strconv.ParseFloat(record[5], 64) + if err != nil { + log.Printf("Error parsing max cost from: %v", record[5]) + return err + } + if _, hasIt := drs[record[0]]; !hasIt { + drs[record[0]] = make([]*utils.DestinationRate, 0) + } + drs[record[0]] = append(drs[record[0]], &utils.DestinationRate{ + DestinationId: record[1], + RateId: record[2], + 
RoundingMethod: record[3], + RoundingDecimals: roundingDecimals, + MaxCost: maxCost, + MaxCostStrategy: record[6], + }) + } + + if err := self.StorDb.SetTPDestinationRates(self.TPid, drs); err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) + } + } + + return nil +} + +func (self *TPCSVImporter) importRatingPlans(fn string) error { + if self.Verbose { + log.Printf("Processing file: <%s> ", fn) + } + fParser, err := NewTPCSVFileParser(self.DirPath, fn) + if err != nil { + return err + } + lineNr := 0 + rpls := make(map[string][]*utils.TPRatingPlanBinding) + for { + lineNr++ + record, err := fParser.ParseNextLine() + if err == io.EOF { // Reached end of file + break + } else if err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + } + continue + } + weight, err := strconv.ParseFloat(record[3], 64) + if err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + } + continue + } + if _, hasIt := rpls[record[0]]; !hasIt { + rpls[record[0]] = make([]*utils.TPRatingPlanBinding, 0) + } + rpls[record[0]] = append(rpls[record[0]], &utils.TPRatingPlanBinding{ + DestinationRatesId: record[1], + Weight: weight, + TimingId: record[2], + }) + } + if err := self.StorDb.SetTPRatingPlans(self.TPid, rpls); err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) + } + } + + return nil +} + +func (self *TPCSVImporter) importRatingProfiles(fn string) error { + if self.Verbose { + log.Printf("Processing file: <%s> ", fn) + } + fParser, err := NewTPCSVFileParser(self.DirPath, fn) + if err != nil { + return err + } + lineNr := 0 + rpfs := make(map[string]*utils.TPRatingProfile) + for { + lineNr++ + record, err := fParser.ParseNextLine() + if err == io.EOF { // Reached end of file + break + } else if err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, 
warning: <%s> ", lineNr, err.Error()) + } + continue + } + direction, tenant, tor, subject, ratingPlanTag, fallbacksubject := record[0], record[1], record[2], record[3], record[5], record[6] + _, err = utils.ParseDate(record[4]) + if err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + } + continue + } + loadId := utils.CSV_LOAD //Autogenerate rating profile id + if self.ImportId != "" { + loadId += "_" + self.ImportId + } + newRp := &utils.TPRatingProfile{ + TPid: self.TPid, + LoadId: loadId, + Tenant: tenant, + Category: tor, + Direction: direction, + Subject: subject, + RatingPlanActivations: []*utils.TPRatingActivation{ + &utils.TPRatingActivation{ActivationTime: record[4], RatingPlanId: ratingPlanTag, FallbackSubjects: fallbacksubject}}, + } + if rp, hasIt := rpfs[newRp.KeyId()]; hasIt { + rp.RatingPlanActivations = append(rp.RatingPlanActivations, newRp.RatingPlanActivations...) + } else { + rpfs[newRp.KeyId()] = newRp + } + } + if err := self.StorDb.SetTPRatingProfiles(self.TPid, rpfs); err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) + } + } + + return nil +} + +func (self *TPCSVImporter) importSharedGroups(fn string) error { + if self.Verbose { + log.Printf("Processing file: <%s> ", fn) + } + fParser, err := NewTPCSVFileParser(self.DirPath, fn) + if err != nil { + return err + } + shgs := make(map[string][]*utils.TPSharedGroup) + lineNr := 0 + for { + lineNr++ + record, err := fParser.ParseNextLine() + if err == io.EOF { // Reached end of file + break + } else if err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + } + continue + } + if _, hasIt := shgs[record[0]]; !hasIt { + shgs[record[0]] = make([]*utils.TPSharedGroup, 0) + } + shgs[record[0]] = append(shgs[record[0]], &utils.TPSharedGroup{Account: record[1], Strategy: record[2], RatingSubject: record[3]}) + } + if err := 
self.StorDb.SetTPSharedGroups(self.TPid, shgs); err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) + } + } + return nil +} + +func (self *TPCSVImporter) importActions(fn string) error { + if self.Verbose { + log.Printf("Processing file: <%s> ", fn) + } + fParser, err := NewTPCSVFileParser(self.DirPath, fn) + if err != nil { + return err + } + acts := make(map[string][]*utils.TPAction) + lineNr := 0 + for { + lineNr++ + record, err := fParser.ParseNextLine() + if err == io.EOF { // Reached end of file + break + } else if err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + } + continue + } + actId, actionType, balanceTag, balanceType, direction, destTags, rateSubject, category, sharedGroup := record[ACTSCSVIDX_TAG], record[ACTSCSVIDX_ACTION], + record[ACTSCSVIDX_BALANCE_TAG], record[ACTSCSVIDX_BALANCE_TYPE], record[ACTSCSVIDX_DIRECTION], record[ACTSCSVIDX_DESTINATION_TAG], record[ACTSCSVIDX_RATING_SUBJECT], + record[ACTSCSVIDX_CATEGORY], record[ACTSCSVIDX_SHARED_GROUP] + units, err := strconv.ParseFloat(record[ACTSCSVIDX_UNITS], 64) + if err != nil && record[ACTSCSVIDX_UNITS] != "" { + if self.Verbose { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + } + continue + } + balanceWeight, _ := strconv.ParseFloat(record[ACTSCSVIDX_BALANCE_WEIGHT], 64) + weight, err := strconv.ParseFloat(record[ACTSCSVIDX_WEIGHT], 64) + if err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + } + continue + } + if _, hasIt := acts[actId]; !hasIt { + acts[actId] = make([]*utils.TPAction, 0) + } + acts[actId] = append(acts[actId], &utils.TPAction{ + Identifier: actionType, + BalanceId: balanceTag, + BalanceType: balanceType, + Direction: direction, + Units: units, + ExpiryTime: record[ACTSCSVIDX_EXPIRY_TIME], + DestinationIds: destTags, + RatingSubject: rateSubject, + Category: category, + 
SharedGroup: sharedGroup, + BalanceWeight: balanceWeight, + ExtraParameters: record[ACTSCSVIDX_EXTRA_PARAMS], + Weight: weight, + }) + } + if err := self.StorDb.SetTPActions(self.TPid, acts); err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) + } + + } + return nil +} + +func (self *TPCSVImporter) importActionTimings(fn string) error { + if self.Verbose { + log.Printf("Processing file: <%s> ", fn) + } + fParser, err := NewTPCSVFileParser(self.DirPath, fn) + if err != nil { + return err + } + lineNr := 0 + aplns := make(map[string][]*utils.TPActionTiming) + for { + lineNr++ + record, err := fParser.ParseNextLine() + if err == io.EOF { // Reached end of file + break + } else if err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + } + continue + } + tag, actionsTag, timingTag := record[0], record[1], record[2] + weight, err := strconv.ParseFloat(record[3], 64) + if err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + } + continue + } + if _, hasIt := aplns[tag]; !hasIt { + aplns[tag] = make([]*utils.TPActionTiming, 0) + } + aplns[tag] = append(aplns[tag], &utils.TPActionTiming{ + ActionsId: actionsTag, + TimingId: timingTag, + Weight: weight, + }) + } + if err := self.StorDb.SetTPActionTimings(self.TPid, aplns); err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) + } + } + + return nil +} + +func (self *TPCSVImporter) importActionTriggers(fn string) error { + if self.Verbose { + log.Printf("Processing file: <%s> ", fn) + } + fParser, err := NewTPCSVFileParser(self.DirPath, fn) + if err != nil { + return err + } + lineNr := 0 + atrs := make(map[string][]*utils.TPActionTrigger) + for { + lineNr++ + record, err := fParser.ParseNextLine() + if err == io.EOF { // Reached end of file + break + } else if err != nil { + if self.Verbose { + 
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + } + continue + } + tag, balanceId, balanceType, direction, thresholdType, destinationTags, balanceExpirationDate, balanceRatingSubject, balanceCategory, balanceSharedGroup, actionsTag := record[ATRIGCSVIDX_TAG], record[ATRIGCSVIDX_BAL_TAG], record[ATRIGCSVIDX_BAL_TYPE], + record[ATRIGCSVIDX_BAL_DIRECTION], record[ATRIGCSVIDX_THRESHOLD_TYPE], record[ATRIGCSVIDX_BAL_DESTINATION_TAG], record[ATRIGCSVIDX_BAL_EXPIRY_TIME], record[ATRIGCSVIDX_BAL_RATING_SUBJECT], + record[ATRIGCSVIDX_BAL_CATEGORY], record[ATRIGCSVIDX_BAL_SHARED_GROUP], record[ATRIGCSVIDX_ACTIONS_TAG] + threshold, err := strconv.ParseFloat(record[ATRIGCSVIDX_THRESHOLD_VALUE], 64) + if err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + } + continue + } + recurrent, err := strconv.ParseBool(record[ATRIGCSVIDX_RECURRENT]) + if err != nil { + log.Printf("Ignoring line %d, warning: <%s>", lineNr, err.Error()) + continue + } + balanceWeight, err := strconv.ParseFloat(record[ATRIGCSVIDX_BAL_WEIGHT], 64) + if err != nil && record[ATRIGCSVIDX_BAL_WEIGHT] != "" { + if self.Verbose { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + } + continue + } + minQueuedItems, err := strconv.Atoi(record[ATRIGCSVIDX_STATS_MIN_QUEUED_ITEMS]) + if err != nil && record[ATRIGCSVIDX_STATS_MIN_QUEUED_ITEMS] != "" { + log.Printf("Ignoring line %d, warning: <%s>", lineNr, err.Error()) + continue + } + weight, err := strconv.ParseFloat(record[ATRIGCSVIDX_WEIGHT], 64) + if err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + } + continue + } + if _, hasIt := atrs[tag]; !hasIt { + atrs[tag] = make([]*utils.TPActionTrigger, 0) + } + atrs[tag] = append(atrs[tag], &utils.TPActionTrigger{ + ThresholdType: thresholdType, + ThresholdValue: threshold, + Recurrent: recurrent, + MinSleep: record[ATRIGCSVIDX_MIN_SLEEP], + BalanceId: balanceId, + 
BalanceType: balanceType, + BalanceDirection: direction, + BalanceDestinationIds: destinationTags, + BalanceWeight: balanceWeight, + BalanceExpirationDate: balanceExpirationDate, + BalanceRatingSubject: balanceRatingSubject, + BalanceCategory: balanceCategory, + BalanceSharedGroup: balanceSharedGroup, + MinQueuedItems: minQueuedItems, + Weight: weight, + ActionsId: actionsTag, + }) + } + if err := self.StorDb.SetTPActionTriggers(self.TPid, atrs); err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) + } + } + + return nil +} + +func (self *TPCSVImporter) importAccountActions(fn string) error { + if self.Verbose { + log.Printf("Processing file: <%s> ", fn) + } + fParser, err := NewTPCSVFileParser(self.DirPath, fn) + if err != nil { + return err + } + loadId := utils.CSV_LOAD //Autogenerate account actions profile id + if self.ImportId != "" { + loadId += "_" + self.ImportId + } + lineNr := 0 + for { + lineNr++ + record, err := fParser.ParseNextLine() + if err == io.EOF { // Reached end of file + break + } else if err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + } + continue + } + tenant, account, direction, actionTimingsTag, actionTriggersTag := record[0], record[1], record[2], record[3], record[4] + + tpaa := &utils.TPAccountActions{TPid: self.TPid, LoadId: loadId, Tenant: tenant, Account: account, Direction: direction, + ActionPlanId: actionTimingsTag, ActionTriggersId: actionTriggersTag} + aa := map[string]*utils.TPAccountActions{tpaa.KeyId(): tpaa} + if err := self.StorDb.SetTPAccountActions(self.TPid, aa); err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) + } + } + } + return nil +} + +func (self *TPCSVImporter) importDerivedChargers(fn string) error { + if self.Verbose { + log.Printf("Processing file: <%s> ", fn) + } + fParser, err := NewTPCSVFileParser(self.DirPath, fn) 
+ if err != nil { + return err + } + loadId := utils.CSV_LOAD //Autogenerate account actions profile id + if self.ImportId != "" { + loadId += "_" + self.ImportId + } + dcs := make(map[string][]*utils.TPDerivedCharger) + lineNr := 0 + for { + lineNr++ + record, err := fParser.ParseNextLine() + if err == io.EOF { // Reached end of file + break + } else if err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + } + continue + } + newDcs := utils.TPDerivedChargers{TPid: self.TPid, + Loadid: loadId, + Direction: record[0], + Tenant: record[1], + Category: record[2], + Account: record[3], + Subject: record[4]} + dcsId := newDcs.GetDerivedChargesId() + + if _, hasIt := dcs[dcsId]; !hasIt { + dcs[dcsId] = make([]*utils.TPDerivedCharger, 0) + } + dcs[dcsId] = append(dcs[dcsId], &utils.TPDerivedCharger{ + RunId: ValueOrDefault(record[5], "*default"), + RunFilters: record[6], + ReqTypeField: ValueOrDefault(record[7], "*default"), + DirectionField: ValueOrDefault(record[8], "*default"), + TenantField: ValueOrDefault(record[9], "*default"), + CategoryField: ValueOrDefault(record[10], "*default"), + AccountField: ValueOrDefault(record[11], "*default"), + SubjectField: ValueOrDefault(record[12], "*default"), + DestinationField: ValueOrDefault(record[13], "*default"), + SetupTimeField: ValueOrDefault(record[14], "*default"), + AnswerTimeField: ValueOrDefault(record[15], "*default"), + UsageField: ValueOrDefault(record[16], "*default"), + SupplierField: ValueOrDefault(record[17], "*default"), + }) + } + if err := self.StorDb.SetTPDerivedChargers(self.TPid, dcs); err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) + } + } + return nil +} + +func (self *TPCSVImporter) importCdrStats(fn string) error { + if self.Verbose { + log.Printf("Processing file: <%s> ", fn) + } + fParser, err := NewTPCSVFileParser(self.DirPath, fn) + if err != nil { + return err + } + css := 
make(map[string][]*utils.TPCdrStat) + lineNr := 0 + for { + lineNr++ + record, err := fParser.ParseNextLine() + if err == io.EOF { // Reached end of file + break + } else if err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + } + continue + } + if len(record[CDRSTATIDX_QLENGHT]) == 0 { + record[CDRSTATIDX_QLENGHT] = "0" + } + if _, err = strconv.Atoi(record[CDRSTATIDX_QLENGHT]); err != nil { + log.Printf("Ignoring line %d, warning: <%s>", lineNr, err.Error()) + continue + } + if _, hasIt := css[record[CDRSTATIDX_TAG]]; !hasIt { + css[record[CDRSTATIDX_TAG]] = make([]*utils.TPCdrStat, 0) + } + css[record[CDRSTATIDX_TAG]] = append(css[record[CDRSTATIDX_TAG]], &utils.TPCdrStat{ + QueueLength: record[CDRSTATIDX_QLENGHT], + TimeWindow: ValueOrDefault(record[CDRSTATIDX_TIMEWINDOW], "0"), + Metrics: record[CDRSTATIDX_METRICS], + SetupInterval: record[CDRSTATIDX_SETUPTIME], + TORs: record[CDRSTATIDX_TOR], + CdrHosts: record[CDRSTATIDX_CDRHOST], + CdrSources: record[CDRSTATIDX_CDRSRC], + ReqTypes: record[CDRSTATIDX_REQTYPE], + Directions: record[CDRSTATIDX_DIRECTION], + Tenants: record[CDRSTATIDX_TENANT], + Categories: record[CDRSTATIDX_CATEGORY], + Accounts: record[CDRSTATIDX_ACCOUNT], + Subjects: record[CDRSTATIDX_SUBJECT], + DestinationPrefixes: record[CDRSTATIDX_DSTPREFIX], + UsageInterval: record[CDRSTATIDX_USAGE], + Suppliers: record[CDRSTATIDX_SUPPLIER], + DisconnectCauses: record[CDRSTATIDX_DISCONNECT_CAUSE], + MediationRunIds: record[CDRSTATIDX_MEDRUN], + RatedAccounts: record[CDRSTATIDX_RTACCOUNT], + RatedSubjects: record[CDRSTATIDX_RTSUBJECT], + CostInterval: record[CDRSTATIDX_COST], + ActionTriggers: record[CDRSTATIDX_ATRIGGER], + }) + } + if err := self.StorDb.SetTPCdrStats(self.TPid, css); err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) + } + } + return nil +}