diff --git a/engine/model_helpers.go b/engine/model_helpers.go
index 3e10fee76..59d40e1a2 100644
--- a/engine/model_helpers.go
+++ b/engine/model_helpers.go
@@ -36,7 +36,7 @@ func csvLoad(s interface{}, values []string) (interface{}, error) {
elem := reflect.New(st).Elem()
for fildName, fieldValue := range fieldValueMap {
field := elem.FieldByName(fildName)
- if field.IsValid() && field.CanSet() {
+ if field.IsValid() {
switch field.Kind() {
case reflect.Float64:
value, err := strconv.ParseFloat(fieldValue, 64)
@@ -52,6 +52,32 @@ func csvLoad(s interface{}, values []string) (interface{}, error) {
return elem.Interface(), nil
}
+func csvDump(s interface{}, sep string) (string, error) {
+ fieldIndexMap := make(map[string]int)
+ st := reflect.TypeOf(s)
+ numFields := st.NumField()
+ for i := 0; i < numFields; i++ {
+ field := st.Field(i)
+ index := field.Tag.Get("index")
+ if index != "" {
+ if idx, err := strconv.Atoi(index); err != nil {
+ return "", fmt.Errorf("invalid %v.%v index %v", st.Name(), field.Name, index)
+ } else {
+ fieldIndexMap[field.Name] = idx
+ }
+ }
+ }
+ elem := reflect.ValueOf(s)
+ result := make([]string, len(fieldIndexMap))
+ for fieldName, fieldIndex := range fieldIndexMap {
+ field := elem.FieldByName(fieldName)
+ if field.IsValid() && fieldIndex < len(result) {
+ result[fieldIndex] = field.String()
+ }
+ }
+ return strings.Join(result, sep), nil
+}
+
func getColumnCount(s interface{}) int {
st := reflect.TypeOf(s)
numFields := st.NumField()
diff --git a/engine/model_helpers_test.go b/engine/model_helpers_test.go
new file mode 100644
index 000000000..8da4984ee
--- /dev/null
+++ b/engine/model_helpers_test.go
@@ -0,0 +1,21 @@
+package engine
+
+import "testing"
+
+func TestModelHelperCsvLoad(t *testing.T) {
+ l := csvLoad(TpDestination{}, []string{"TEST_DEST", "+492"})
+ tpd := l.(TpDestination)
+ if tpd.Tag != "TEST_DEST" || tpd.Prefix != "+492" {
+ t.Errorf("model load failed: %+v", tpd)
+ }
+}
+
+func TestModelHelperCsvDump(t *testing.T) {
+ tpd := &TpDestination{
+ Tag: "TEST_DEST",
+ Prefix: "+492"}
+ csv, err := csvDump(*tpd, ",")
+ if err != nil || csv != "TEST_DEST,+492" {
+ t.Errorf("model load failed: %+v", tpd)
+ }
+}
diff --git a/engine/tpimporter_csv.go b/engine/tpimporter_csv.go
new file mode 100644
index 000000000..62be9b103
--- /dev/null
+++ b/engine/tpimporter_csv.go
@@ -0,0 +1,732 @@
+/*
+Real-time Charging System for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package engine
+
+import (
+ "fmt"
+ "io"
+ "io/ioutil"
+ "log"
+ "strconv"
+
+ "github.com/cgrates/cgrates/utils"
+)
+
+// Import tariff plan from csv into storDb
+type TPCSVImporter struct {
+ TPid string // Load data on this tpid
+ StorDb LoadStorage // StorDb connection handle
+ DirPath string // Directory path to import from
+ Sep rune // Separator in the csv file
+ Verbose bool // If true will print a detailed information instead of silently discarding it
+ ImportId string // Use this to differentiate between imports (eg: when autogenerating fields like RatingProfileId
+}
+
+// Maps csv file to handler which should process it. Defined like this since tests on 1.0.3 were failing on Travis.
+// Change it to func(string) error as soon as Travis updates.
+var fileHandlers = map[string]func(*TPCSVImporter, string) error{
+ utils.TIMINGS_CSV: (*TPCSVImporter).importTimings,
+ utils.DESTINATIONS_CSV: (*TPCSVImporter).importDestinations,
+ utils.RATES_CSV: (*TPCSVImporter).importRates,
+ utils.DESTINATION_RATES_CSV: (*TPCSVImporter).importDestinationRates,
+ utils.RATING_PLANS_CSV: (*TPCSVImporter).importRatingPlans,
+ utils.RATING_PROFILES_CSV: (*TPCSVImporter).importRatingProfiles,
+ utils.SHARED_GROUPS_CSV: (*TPCSVImporter).importSharedGroups,
+ utils.ACTIONS_CSV: (*TPCSVImporter).importActions,
+ utils.ACTION_PLANS_CSV: (*TPCSVImporter).importActionTimings,
+ utils.ACTION_TRIGGERS_CSV: (*TPCSVImporter).importActionTriggers,
+ utils.ACCOUNT_ACTIONS_CSV: (*TPCSVImporter).importAccountActions,
+ utils.DERIVED_CHARGERS_CSV: (*TPCSVImporter).importDerivedChargers,
+ utils.CDR_STATS_CSV: (*TPCSVImporter).importCdrStats,
+}
+
+func (self *TPCSVImporter) Run() error {
+ files, _ := ioutil.ReadDir(self.DirPath)
+ for _, f := range files {
+ fHandler, hasName := fileHandlers[f.Name()]
+ if !hasName {
+ continue
+ }
+ if err := fHandler(self, f.Name()); err != nil {
+ Logger.Err(fmt.Sprintf(" Importing file: %s, got error: %s", f.Name(), err.Error()))
+ }
+ }
+ return nil
+}
+
+// Handler importing timings from file, saved row by row to storDb
+func (self *TPCSVImporter) importTimings(fn string) error {
+ if self.Verbose {
+ log.Printf("Processing file: <%s> ", fn)
+ }
+ fParser, err := NewTPCSVFileParser(self.DirPath, fn)
+ if err != nil {
+ return err
+ }
+ lineNr := 0
+ for {
+ lineNr++
+ record, err := fParser.ParseNextLine()
+ if err == io.EOF { // Reached end of file
+ break
+ } else if err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
+ }
+ continue
+ }
+ tm := &utils.ApierTPTiming{TPid: self.TPid, TimingId: record[0], Years: record[1], Months: record[2], MonthDays: record[3], WeekDays: record[4], Time: record[5]}
+ if err := self.StorDb.SetTPTiming(tm); err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
+ }
+ }
+ }
+ return nil
+}
+
+func (self *TPCSVImporter) importDestinations(fn string) error {
+ if self.Verbose {
+ log.Printf("Processing file: <%s> ", fn)
+ }
+ fParser, err := NewTPCSVFileParser(self.DirPath, fn)
+ if err != nil {
+ return err
+ }
+ lineNr := 0
+ dests := make(map[string]*Destination) // Key:destId, value: listOfPrefixes
+ for {
+ lineNr++
+ record, err := fParser.ParseNextLine()
+ if err == io.EOF { // Reached end of file
+ break
+ } else if err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
+ }
+ continue
+ } else {
+ if dst, hasIt := dests[record[0]]; hasIt {
+ dst.Prefixes = append(dst.Prefixes, record[1])
+ } else {
+ dests[record[0]] = &Destination{record[0], []string{record[1]}}
+ }
+ }
+ }
+ for _, dst := range dests {
+ if err := self.StorDb.SetTPDestination(self.TPid, dst); err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
+ }
+ }
+ }
+ return nil
+}
+
+func (self *TPCSVImporter) importRates(fn string) error {
+ if self.Verbose {
+ log.Printf("Processing file: <%s> ", fn)
+ }
+ fParser, err := NewTPCSVFileParser(self.DirPath, fn)
+ if err != nil {
+ return err
+ }
+ lineNr := 0
+ rates := make(map[string][]*utils.RateSlot)
+ for {
+ lineNr++
+ record, err := fParser.ParseNextLine()
+ if err == io.EOF { // Reached end of file
+ break
+ } else if err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
+ }
+ continue
+ }
+ newRt, err := NewLoadRate(record[0], record[1], record[2], record[3], record[4], record[5])
+ if err != nil {
+ return err
+ }
+ if _, hasIt := rates[record[0]]; !hasIt {
+ rates[record[0]] = make([]*utils.RateSlot, 0)
+ }
+ rates[record[0]] = append(rates[record[0]], newRt.RateSlots...)
+ }
+ if err := self.StorDb.SetTPRates(self.TPid, rates); err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
+ }
+ }
+ return nil
+}
+
+func (self *TPCSVImporter) importDestinationRates(fn string) error {
+ if self.Verbose {
+ log.Printf("Processing file: <%s> ", fn)
+ }
+ fParser, err := NewTPCSVFileParser(self.DirPath, fn)
+ if err != nil {
+ return err
+ }
+ lineNr := 0
+ drs := make(map[string][]*utils.DestinationRate)
+ for {
+ lineNr++
+ record, err := fParser.ParseNextLine()
+ if err == io.EOF { // Reached end of file
+ break
+ } else if err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
+ }
+ continue
+ }
+ roundingDecimals, err := strconv.Atoi(record[4])
+ if err != nil {
+ log.Printf("Error parsing rounding decimals: %s", record[4])
+ return err
+ }
+ maxCost, err := strconv.ParseFloat(record[5], 64)
+ if err != nil {
+ log.Printf("Error parsing max cost from: %v", record[5])
+ return err
+ }
+ if _, hasIt := drs[record[0]]; !hasIt {
+ drs[record[0]] = make([]*utils.DestinationRate, 0)
+ }
+ drs[record[0]] = append(drs[record[0]], &utils.DestinationRate{
+ DestinationId: record[1],
+ RateId: record[2],
+ RoundingMethod: record[3],
+ RoundingDecimals: roundingDecimals,
+ MaxCost: maxCost,
+ MaxCostStrategy: record[6],
+ })
+ }
+
+ if err := self.StorDb.SetTPDestinationRates(self.TPid, drs); err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
+ }
+ }
+
+ return nil
+}
+
+func (self *TPCSVImporter) importRatingPlans(fn string) error {
+ if self.Verbose {
+ log.Printf("Processing file: <%s> ", fn)
+ }
+ fParser, err := NewTPCSVFileParser(self.DirPath, fn)
+ if err != nil {
+ return err
+ }
+ lineNr := 0
+ rpls := make(map[string][]*utils.TPRatingPlanBinding)
+ for {
+ lineNr++
+ record, err := fParser.ParseNextLine()
+ if err == io.EOF { // Reached end of file
+ break
+ } else if err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
+ }
+ continue
+ }
+ weight, err := strconv.ParseFloat(record[3], 64)
+ if err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
+ }
+ continue
+ }
+ if _, hasIt := rpls[record[0]]; !hasIt {
+ rpls[record[0]] = make([]*utils.TPRatingPlanBinding, 0)
+ }
+ rpls[record[0]] = append(rpls[record[0]], &utils.TPRatingPlanBinding{
+ DestinationRatesId: record[1],
+ Weight: weight,
+ TimingId: record[2],
+ })
+ }
+ if err := self.StorDb.SetTPRatingPlans(self.TPid, rpls); err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
+ }
+ }
+
+ return nil
+}
+
+func (self *TPCSVImporter) importRatingProfiles(fn string) error {
+ if self.Verbose {
+ log.Printf("Processing file: <%s> ", fn)
+ }
+ fParser, err := NewTPCSVFileParser(self.DirPath, fn)
+ if err != nil {
+ return err
+ }
+ lineNr := 0
+ rpfs := make(map[string]*utils.TPRatingProfile)
+ for {
+ lineNr++
+ record, err := fParser.ParseNextLine()
+ if err == io.EOF { // Reached end of file
+ break
+ } else if err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
+ }
+ continue
+ }
+ direction, tenant, tor, subject, ratingPlanTag, fallbacksubject := record[0], record[1], record[2], record[3], record[5], record[6]
+ _, err = utils.ParseDate(record[4])
+ if err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
+ }
+ continue
+ }
+ loadId := utils.CSV_LOAD //Autogenerate rating profile id
+ if self.ImportId != "" {
+ loadId += "_" + self.ImportId
+ }
+ newRp := &utils.TPRatingProfile{
+ TPid: self.TPid,
+ LoadId: loadId,
+ Tenant: tenant,
+ Category: tor,
+ Direction: direction,
+ Subject: subject,
+ RatingPlanActivations: []*utils.TPRatingActivation{
+ &utils.TPRatingActivation{ActivationTime: record[4], RatingPlanId: ratingPlanTag, FallbackSubjects: fallbacksubject}},
+ }
+ if rp, hasIt := rpfs[newRp.KeyId()]; hasIt {
+ rp.RatingPlanActivations = append(rp.RatingPlanActivations, newRp.RatingPlanActivations...)
+ } else {
+ rpfs[newRp.KeyId()] = newRp
+ }
+ }
+ if err := self.StorDb.SetTPRatingProfiles(self.TPid, rpfs); err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
+ }
+ }
+
+ return nil
+}
+
+func (self *TPCSVImporter) importSharedGroups(fn string) error {
+ if self.Verbose {
+ log.Printf("Processing file: <%s> ", fn)
+ }
+ fParser, err := NewTPCSVFileParser(self.DirPath, fn)
+ if err != nil {
+ return err
+ }
+ shgs := make(map[string][]*utils.TPSharedGroup)
+ lineNr := 0
+ for {
+ lineNr++
+ record, err := fParser.ParseNextLine()
+ if err == io.EOF { // Reached end of file
+ break
+ } else if err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
+ }
+ continue
+ }
+ if _, hasIt := shgs[record[0]]; !hasIt {
+ shgs[record[0]] = make([]*utils.TPSharedGroup, 0)
+ }
+ shgs[record[0]] = append(shgs[record[0]], &utils.TPSharedGroup{Account: record[1], Strategy: record[2], RatingSubject: record[3]})
+ }
+ if err := self.StorDb.SetTPSharedGroups(self.TPid, shgs); err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
+ }
+ }
+ return nil
+}
+
+func (self *TPCSVImporter) importActions(fn string) error {
+ if self.Verbose {
+ log.Printf("Processing file: <%s> ", fn)
+ }
+ fParser, err := NewTPCSVFileParser(self.DirPath, fn)
+ if err != nil {
+ return err
+ }
+ acts := make(map[string][]*utils.TPAction)
+ lineNr := 0
+ for {
+ lineNr++
+ record, err := fParser.ParseNextLine()
+ if err == io.EOF { // Reached end of file
+ break
+ } else if err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
+ }
+ continue
+ }
+ actId, actionType, balanceTag, balanceType, direction, destTags, rateSubject, category, sharedGroup := record[ACTSCSVIDX_TAG], record[ACTSCSVIDX_ACTION],
+ record[ACTSCSVIDX_BALANCE_TAG], record[ACTSCSVIDX_BALANCE_TYPE], record[ACTSCSVIDX_DIRECTION], record[ACTSCSVIDX_DESTINATION_TAG], record[ACTSCSVIDX_RATING_SUBJECT],
+ record[ACTSCSVIDX_CATEGORY], record[ACTSCSVIDX_SHARED_GROUP]
+ units, err := strconv.ParseFloat(record[ACTSCSVIDX_UNITS], 64)
+ if err != nil && record[ACTSCSVIDX_UNITS] != "" {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
+ }
+ continue
+ }
+ balanceWeight, _ := strconv.ParseFloat(record[ACTSCSVIDX_BALANCE_WEIGHT], 64)
+ weight, err := strconv.ParseFloat(record[ACTSCSVIDX_WEIGHT], 64)
+ if err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
+ }
+ continue
+ }
+ if _, hasIt := acts[actId]; !hasIt {
+ acts[actId] = make([]*utils.TPAction, 0)
+ }
+ acts[actId] = append(acts[actId], &utils.TPAction{
+ Identifier: actionType,
+ BalanceId: balanceTag,
+ BalanceType: balanceType,
+ Direction: direction,
+ Units: units,
+ ExpiryTime: record[ACTSCSVIDX_EXPIRY_TIME],
+ DestinationIds: destTags,
+ RatingSubject: rateSubject,
+ Category: category,
+ SharedGroup: sharedGroup,
+ BalanceWeight: balanceWeight,
+ ExtraParameters: record[ACTSCSVIDX_EXTRA_PARAMS],
+ Weight: weight,
+ })
+ }
+ if err := self.StorDb.SetTPActions(self.TPid, acts); err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
+ }
+
+ }
+ return nil
+}
+
+func (self *TPCSVImporter) importActionTimings(fn string) error {
+ if self.Verbose {
+ log.Printf("Processing file: <%s> ", fn)
+ }
+ fParser, err := NewTPCSVFileParser(self.DirPath, fn)
+ if err != nil {
+ return err
+ }
+ lineNr := 0
+ aplns := make(map[string][]*utils.TPActionTiming)
+ for {
+ lineNr++
+ record, err := fParser.ParseNextLine()
+ if err == io.EOF { // Reached end of file
+ break
+ } else if err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
+ }
+ continue
+ }
+ tag, actionsTag, timingTag := record[0], record[1], record[2]
+ weight, err := strconv.ParseFloat(record[3], 64)
+ if err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
+ }
+ continue
+ }
+ if _, hasIt := aplns[tag]; !hasIt {
+ aplns[tag] = make([]*utils.TPActionTiming, 0)
+ }
+ aplns[tag] = append(aplns[tag], &utils.TPActionTiming{
+ ActionsId: actionsTag,
+ TimingId: timingTag,
+ Weight: weight,
+ })
+ }
+ if err := self.StorDb.SetTPActionTimings(self.TPid, aplns); err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
+ }
+ }
+
+ return nil
+}
+
+func (self *TPCSVImporter) importActionTriggers(fn string) error {
+ if self.Verbose {
+ log.Printf("Processing file: <%s> ", fn)
+ }
+ fParser, err := NewTPCSVFileParser(self.DirPath, fn)
+ if err != nil {
+ return err
+ }
+ lineNr := 0
+ atrs := make(map[string][]*utils.TPActionTrigger)
+ for {
+ lineNr++
+ record, err := fParser.ParseNextLine()
+ if err == io.EOF { // Reached end of file
+ break
+ } else if err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
+ }
+ continue
+ }
+ tag, balanceId, balanceType, direction, thresholdType, destinationTags, balanceExpirationDate, balanceRatingSubject, balanceCategory, balanceSharedGroup, actionsTag := record[ATRIGCSVIDX_TAG], record[ATRIGCSVIDX_BAL_TAG], record[ATRIGCSVIDX_BAL_TYPE],
+ record[ATRIGCSVIDX_BAL_DIRECTION], record[ATRIGCSVIDX_THRESHOLD_TYPE], record[ATRIGCSVIDX_BAL_DESTINATION_TAG], record[ATRIGCSVIDX_BAL_EXPIRY_TIME], record[ATRIGCSVIDX_BAL_RATING_SUBJECT],
+ record[ATRIGCSVIDX_BAL_CATEGORY], record[ATRIGCSVIDX_BAL_SHARED_GROUP], record[ATRIGCSVIDX_ACTIONS_TAG]
+ threshold, err := strconv.ParseFloat(record[ATRIGCSVIDX_THRESHOLD_VALUE], 64)
+ if err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
+ }
+ continue
+ }
+ recurrent, err := strconv.ParseBool(record[ATRIGCSVIDX_RECURRENT])
+ if err != nil {
+ log.Printf("Ignoring line %d, warning: <%s>", lineNr, err.Error())
+ continue
+ }
+ balanceWeight, err := strconv.ParseFloat(record[ATRIGCSVIDX_BAL_WEIGHT], 64)
+ if err != nil && record[ATRIGCSVIDX_BAL_WEIGHT] != "" {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
+ }
+ continue
+ }
+ minQueuedItems, err := strconv.Atoi(record[ATRIGCSVIDX_STATS_MIN_QUEUED_ITEMS])
+ if err != nil && record[ATRIGCSVIDX_STATS_MIN_QUEUED_ITEMS] != "" {
+ log.Printf("Ignoring line %d, warning: <%s>", lineNr, err.Error())
+ continue
+ }
+ weight, err := strconv.ParseFloat(record[ATRIGCSVIDX_WEIGHT], 64)
+ if err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
+ }
+ continue
+ }
+ if _, hasIt := atrs[tag]; !hasIt {
+ atrs[tag] = make([]*utils.TPActionTrigger, 0)
+ }
+ atrs[tag] = append(atrs[tag], &utils.TPActionTrigger{
+ ThresholdType: thresholdType,
+ ThresholdValue: threshold,
+ Recurrent: recurrent,
+ MinSleep: record[ATRIGCSVIDX_MIN_SLEEP],
+ BalanceId: balanceId,
+ BalanceType: balanceType,
+ BalanceDirection: direction,
+ BalanceDestinationIds: destinationTags,
+ BalanceWeight: balanceWeight,
+ BalanceExpirationDate: balanceExpirationDate,
+ BalanceRatingSubject: balanceRatingSubject,
+ BalanceCategory: balanceCategory,
+ BalanceSharedGroup: balanceSharedGroup,
+ MinQueuedItems: minQueuedItems,
+ Weight: weight,
+ ActionsId: actionsTag,
+ })
+ }
+ if err := self.StorDb.SetTPActionTriggers(self.TPid, atrs); err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
+ }
+ }
+
+ return nil
+}
+
+func (self *TPCSVImporter) importAccountActions(fn string) error {
+ if self.Verbose {
+ log.Printf("Processing file: <%s> ", fn)
+ }
+ fParser, err := NewTPCSVFileParser(self.DirPath, fn)
+ if err != nil {
+ return err
+ }
+ loadId := utils.CSV_LOAD //Autogenerate account actions profile id
+ if self.ImportId != "" {
+ loadId += "_" + self.ImportId
+ }
+ lineNr := 0
+ for {
+ lineNr++
+ record, err := fParser.ParseNextLine()
+ if err == io.EOF { // Reached end of file
+ break
+ } else if err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
+ }
+ continue
+ }
+ tenant, account, direction, actionTimingsTag, actionTriggersTag := record[0], record[1], record[2], record[3], record[4]
+
+ tpaa := &utils.TPAccountActions{TPid: self.TPid, LoadId: loadId, Tenant: tenant, Account: account, Direction: direction,
+ ActionPlanId: actionTimingsTag, ActionTriggersId: actionTriggersTag}
+ aa := map[string]*utils.TPAccountActions{tpaa.KeyId(): tpaa}
+ if err := self.StorDb.SetTPAccountActions(self.TPid, aa); err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
+ }
+ }
+ }
+ return nil
+}
+
+func (self *TPCSVImporter) importDerivedChargers(fn string) error {
+ if self.Verbose {
+ log.Printf("Processing file: <%s> ", fn)
+ }
+ fParser, err := NewTPCSVFileParser(self.DirPath, fn)
+ if err != nil {
+ return err
+ }
+ loadId := utils.CSV_LOAD //Autogenerate account actions profile id
+ if self.ImportId != "" {
+ loadId += "_" + self.ImportId
+ }
+ dcs := make(map[string][]*utils.TPDerivedCharger)
+ lineNr := 0
+ for {
+ lineNr++
+ record, err := fParser.ParseNextLine()
+ if err == io.EOF { // Reached end of file
+ break
+ } else if err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
+ }
+ continue
+ }
+ newDcs := utils.TPDerivedChargers{TPid: self.TPid,
+ Loadid: loadId,
+ Direction: record[0],
+ Tenant: record[1],
+ Category: record[2],
+ Account: record[3],
+ Subject: record[4]}
+ dcsId := newDcs.GetDerivedChargesId()
+
+ if _, hasIt := dcs[dcsId]; !hasIt {
+ dcs[dcsId] = make([]*utils.TPDerivedCharger, 0)
+ }
+ dcs[dcsId] = append(dcs[dcsId], &utils.TPDerivedCharger{
+ RunId: ValueOrDefault(record[5], "*default"),
+ RunFilters: record[6],
+ ReqTypeField: ValueOrDefault(record[7], "*default"),
+ DirectionField: ValueOrDefault(record[8], "*default"),
+ TenantField: ValueOrDefault(record[9], "*default"),
+ CategoryField: ValueOrDefault(record[10], "*default"),
+ AccountField: ValueOrDefault(record[11], "*default"),
+ SubjectField: ValueOrDefault(record[12], "*default"),
+ DestinationField: ValueOrDefault(record[13], "*default"),
+ SetupTimeField: ValueOrDefault(record[14], "*default"),
+ AnswerTimeField: ValueOrDefault(record[15], "*default"),
+ UsageField: ValueOrDefault(record[16], "*default"),
+ SupplierField: ValueOrDefault(record[17], "*default"),
+ })
+ }
+ if err := self.StorDb.SetTPDerivedChargers(self.TPid, dcs); err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
+ }
+ }
+ return nil
+}
+
+func (self *TPCSVImporter) importCdrStats(fn string) error {
+ if self.Verbose {
+ log.Printf("Processing file: <%s> ", fn)
+ }
+ fParser, err := NewTPCSVFileParser(self.DirPath, fn)
+ if err != nil {
+ return err
+ }
+ css := make(map[string][]*utils.TPCdrStat)
+ lineNr := 0
+ for {
+ lineNr++
+ record, err := fParser.ParseNextLine()
+ if err == io.EOF { // Reached end of file
+ break
+ } else if err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
+ }
+ continue
+ }
+ if len(record[CDRSTATIDX_QLENGHT]) == 0 {
+ record[CDRSTATIDX_QLENGHT] = "0"
+ }
+ if _, err = strconv.Atoi(record[CDRSTATIDX_QLENGHT]); err != nil {
+ log.Printf("Ignoring line %d, warning: <%s>", lineNr, err.Error())
+ continue
+ }
+ if _, hasIt := css[record[CDRSTATIDX_TAG]]; !hasIt {
+ css[record[CDRSTATIDX_TAG]] = make([]*utils.TPCdrStat, 0)
+ }
+ css[record[0]] = append(css[record[0]], &utils.TPCdrStat{
+ QueueLength: record[CDRSTATIDX_QLENGHT],
+ TimeWindow: ValueOrDefault(record[CDRSTATIDX_TIMEWINDOW], "0"),
+ Metrics: record[CDRSTATIDX_METRICS],
+ SetupInterval: record[CDRSTATIDX_SETUPTIME],
+ TORs: record[CDRSTATIDX_TOR],
+ CdrHosts: record[CDRSTATIDX_CDRHOST],
+ CdrSources: record[CDRSTATIDX_CDRSRC],
+ ReqTypes: record[CDRSTATIDX_REQTYPE],
+ Directions: record[CDRSTATIDX_DIRECTION],
+ Tenants: record[CDRSTATIDX_TENANT],
+ Categories: record[CDRSTATIDX_CATEGORY],
+ Accounts: record[CDRSTATIDX_ACCOUNT],
+ Subjects: record[CDRSTATIDX_SUBJECT],
+ DestinationPrefixes: record[CDRSTATIDX_DSTPREFIX],
+ UsageInterval: record[CDRSTATIDX_USAGE],
+ Suppliers: record[CDRSTATIDX_SUPPLIER],
+ DisconnectCauses: record[CDRSTATIDX_DISCONNECT_CAUSE],
+ MediationRunIds: record[CDRSTATIDX_MEDRUN],
+ RatedAccounts: record[CDRSTATIDX_RTACCOUNT],
+ RatedSubjects: record[CDRSTATIDX_RTSUBJECT],
+ CostInterval: record[CDRSTATIDX_COST],
+ ActionTriggers: record[CDRSTATIDX_ATRIGGER],
+ })
+ }
+ if err := self.StorDb.SetTPCdrStats(self.TPid, css); err != nil {
+ if self.Verbose {
+ log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
+ }
+ }
+ return nil
+}