TPCSVImporter Actions, store refactoring, small API params changes

This commit is contained in:
DanB
2013-07-29 21:18:02 +02:00
parent ddc2240c67
commit d4017890ec
23 changed files with 323 additions and 247 deletions

View File

@@ -36,6 +36,8 @@ type Action struct {
Units float64
Weight float64
MinuteBucket *MinuteBucket
DestinationTag, RateType string // From here for import/load purposes only
RateValue, MinutesWeight float64
}
const (

View File

@@ -349,7 +349,7 @@ func (csvr *CSVReader) LoadActions() (err error) {
if err != nil {
return errors.New(fmt.Sprintf("Could not parse action units: %v", err))
}
var expiryTime time.Time // Empty initialized time represents never expire
var expiryTime time.Time // Empty initialized time represents never expire
if record[5] != "*unlimited" { // ToDo: Expand here for other meta tags or go way of adding time for expiry
expiryTime, err = time.Parse(time.RFC3339, record[5])
if err != nil {

View File

@@ -20,15 +20,15 @@ package engine
import (
"bufio"
"path"
"errors"
"fmt"
"github.com/cgrates/cgrates/utils"
"log"
"os"
"path"
"regexp"
"strconv"
"strings"
"github.com/cgrates/cgrates/utils"
)
type TPLoader interface {
@@ -202,45 +202,43 @@ func ValidateCSVData(fn string, re *regexp.Regexp) (err error) {
}
type FileLineRegexValidator struct {
FieldsPerRecord int // Number of fields in one record, useful for crosschecks
Rule *regexp.Regexp // Regexp rule
Message string // Pass this message as helper
FieldsPerRecord int // Number of fields in one record, useful for crosschecks
Rule *regexp.Regexp // Regexp rule
Message string // Pass this message as helper
}
var FileValidators = map[string]*FileLineRegexValidator{
utils.DESTINATIONS_CSV: &FileLineRegexValidator{ utils.DESTINATIONS_NRCOLS,
regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\+?\d+.?\d*){1}$`),
"Tag([0-9A-Za-z_]),Prefix([0-9])"},
utils.TIMINGS_CSV: &FileLineRegexValidator{ utils.TIMINGS_NRCOLS,
regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\*any\s*,\s*|(?:\d{1,4};?)+\s*,\s*|\s*,\s*){4}(?:\d{2}:\d{2}:\d{2}|\*asap){1}$`),
"Tag([0-9A-Za-z_]),Years([0-9;]|*all|<empty>),Months([0-9;]|*all|<empty>),MonthDays([0-9;]|*all|<empty>),WeekDays([0-9;]|*all|<empty>),Time([0-9:]|*asap)"},
utils.RATES_CSV: &FileLineRegexValidator{ utils.RATES_NRCOLS,
regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\d+\.?\d*,){5}(?:\*\w+,){1}(?:\d+\.?\d*,?){2}$`),
"Tag([0-9A-Za-z_]),ConnectFee([0-9.]),Rate([0-9.]),RatedUnits([0-9.]),RateIncrement([0-9.])"},
utils.DESTINATION_RATES_CSV: &FileLineRegexValidator{ utils.DESTINATION_RATES_NRCOLS,
regexp.MustCompile(`(?:\w+\s*,?\s*){3}$`),
"Tag([0-9A-Za-z_]),DestinationsTag([0-9A-Za-z_]),RateTag([0-9A-Za-z_])"},
utils.DESTRATE_TIMINGS_CSV: &FileLineRegexValidator{ utils.DESTRATE_TIMINGS_NRCOLS,
regexp.MustCompile(`(?:\w+\s*,\s*){3}(?:\d+.?\d*){1}$`),
"Tag([0-9A-Za-z_]),DestinationRatesTag([0-9A-Za-z_]),TimingProfile([0-9A-Za-z_]),Weight([0-9.])"},
utils.RATE_PROFILES_CSV: &FileLineRegexValidator{ utils.RATE_PROFILES_NRCOLS,
regexp.MustCompile(`(?:\w+\s*,\s*){2}(?:\*out\s*,\s*){1}(?:\*any\s*,\s*|\w+\s*,\s*){1}(?:\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z){1}(?:\w*\s*,?\s*){2}$`),
"Tenant([0-9A-Za-z_]),TOR([0-9A-Za-z_]),Direction(*out),Subject([0-9A-Za-z_]|*all),RatesFallbackSubject([0-9A-Za-z_]|<empty>),RatesTimingTag([0-9A-Za-z_]),ActivationTime([0-9T:X])"},
utils.ACTIONS_CSV: &FileLineRegexValidator{ utils.ACTIONS_NRCOLS,
regexp.MustCompile(`(?:\w+\s*),(?:\*\w+\s*),(?:\*\w+\s*),(?:\*out\s*),(?:\d+\s*),(?:\*\w+\s*|\+\d+[smh]\s*|\d+\s*),(?:\*any|\w+\s*),(?:\*\w+\s*)?,(?:\d+\.?\d*\s*)?,(?:\d+\.?\d*\s*)?,(?:\d+\.?\d*\s*)$`),
"Tag([0-9A-Za-z_]),Action([0-9A-Za-z_]),BalanceType([*a-z_]),Direction(*out),Units([0-9]),ExpiryTime(*[a-z_]|+[0-9][smh]|[0-9])DestinationTag([0-9A-Za-z_]|*all),RateType(*[a-z_]),RateValue([0-9.]),MinutesWeight([0-9.]),Weight([0-9.])"},
utils.ACTION_TIMINGS_CSV: &FileLineRegexValidator{ utils.ACTION_TIMINGS_NRCOLS,
regexp.MustCompile(`(?:\w+\s*,\s*){3}(?:\d+\.?\d*){1}`),
"Tag([0-9A-Za-z_]),ActionsTag([0-9A-Za-z_]),TimingTag([0-9A-Za-z_]),Weight([0-9.])"},
utils.ACTION_TRIGGERS_CSV: &FileLineRegexValidator{ utils.ACTION_TRIGGERS_NRCOLS,
regexp.MustCompile(`(?:\w+),(?:\*\w+),(?:\*out),(?:\*\w+),(?:\d+\.?\d*),(?:\w+|\*any)?,(?:\w+),(?:\d+\.?\d*)$`),
"Tag([0-9A-Za-z_]),BalanceType(*[a-z_]),Direction(*out),ThresholdType(*[a-z_]),ThresholdValue([0-9]+),DestinationTag([0-9A-Za-z_]|*all),ActionsTag([0-9A-Za-z_]),Weight([0-9]+)"},
utils.ACCOUNT_ACTIONS_CSV: &FileLineRegexValidator{ utils.ACCOUNT_ACTIONS_NRCOLS,
regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\w+\s*,\s*){1}(?:\*out\s*,\s*){1}(?:\w+\s*,?\s*){2}$`),
"Tenant([0-9A-Za-z_]),Account([0-9A-Za-z_.]),Direction(*out),ActionTimingsTag([0-9A-Za-z_]),ActionTriggersTag([0-9A-Za-z_])"},
}
var FileValidators = map[string]*FileLineRegexValidator{
utils.DESTINATIONS_CSV: &FileLineRegexValidator{utils.DESTINATIONS_NRCOLS,
regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\+?\d+.?\d*){1}$`),
"Tag([0-9A-Za-z_]),Prefix([0-9])"},
utils.TIMINGS_CSV: &FileLineRegexValidator{utils.TIMINGS_NRCOLS,
regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\*any\s*,\s*|(?:\d{1,4};?)+\s*,\s*|\s*,\s*){4}(?:\d{2}:\d{2}:\d{2}|\*asap){1}$`),
"Tag([0-9A-Za-z_]),Years([0-9;]|*all|<empty>),Months([0-9;]|*all|<empty>),MonthDays([0-9;]|*all|<empty>),WeekDays([0-9;]|*all|<empty>),Time([0-9:]|*asap)"},
utils.RATES_CSV: &FileLineRegexValidator{utils.RATES_NRCOLS,
regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\d+\.?\d*,){5}(?:\*\w+,){1}(?:\d+\.?\d*,?){2}$`),
"Tag([0-9A-Za-z_]),ConnectFee([0-9.]),Rate([0-9.]),RatedUnits([0-9.]),RateIncrement([0-9.])"},
utils.DESTINATION_RATES_CSV: &FileLineRegexValidator{utils.DESTINATION_RATES_NRCOLS,
regexp.MustCompile(`(?:\w+\s*,?\s*){3}$`),
"Tag([0-9A-Za-z_]),DestinationsTag([0-9A-Za-z_]),RateTag([0-9A-Za-z_])"},
utils.DESTRATE_TIMINGS_CSV: &FileLineRegexValidator{utils.DESTRATE_TIMINGS_NRCOLS,
regexp.MustCompile(`(?:\w+\s*,\s*){3}(?:\d+.?\d*){1}$`),
"Tag([0-9A-Za-z_]),DestinationRatesTag([0-9A-Za-z_]),TimingProfile([0-9A-Za-z_]),Weight([0-9.])"},
utils.RATE_PROFILES_CSV: &FileLineRegexValidator{utils.RATE_PROFILES_NRCOLS,
regexp.MustCompile(`(?:\w+\s*,\s*){2}(?:\*out\s*,\s*){1}(?:\*any\s*,\s*|\w+\s*,\s*){1}(?:\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z){1}(?:\w*\s*,?\s*){2}$`),
"Tenant([0-9A-Za-z_]),TOR([0-9A-Za-z_]),Direction(*out),Subject([0-9A-Za-z_]|*all),RatesFallbackSubject([0-9A-Za-z_]|<empty>),RatesTimingTag([0-9A-Za-z_]),ActivationTime([0-9T:X])"},
utils.ACTIONS_CSV: &FileLineRegexValidator{utils.ACTIONS_NRCOLS,
regexp.MustCompile(`(?:\w+\s*),(?:\*\w+\s*),(?:\*\w+\s*),(?:\*out\s*),(?:\d+\s*),(?:\*\w+\s*|\+\d+[smh]\s*|\d+\s*),(?:\*any|\w+\s*),(?:\*\w+\s*)?,(?:\d+\.?\d*\s*)?,(?:\d+\.?\d*\s*)?,(?:\d+\.?\d*\s*)$`),
"Tag([0-9A-Za-z_]),Action([0-9A-Za-z_]),BalanceType([*a-z_]),Direction(*out),Units([0-9]),ExpiryTime(*[a-z_]|+[0-9][smh]|[0-9])DestinationTag([0-9A-Za-z_]|*all),RateType(*[a-z_]),RateValue([0-9.]),MinutesWeight([0-9.]),Weight([0-9.])"},
utils.ACTION_TIMINGS_CSV: &FileLineRegexValidator{utils.ACTION_TIMINGS_NRCOLS,
regexp.MustCompile(`(?:\w+\s*,\s*){3}(?:\d+\.?\d*){1}`),
"Tag([0-9A-Za-z_]),ActionsTag([0-9A-Za-z_]),TimingTag([0-9A-Za-z_]),Weight([0-9.])"},
utils.ACTION_TRIGGERS_CSV: &FileLineRegexValidator{utils.ACTION_TRIGGERS_NRCOLS,
regexp.MustCompile(`(?:\w+),(?:\*\w+),(?:\*out),(?:\*\w+),(?:\d+\.?\d*),(?:\w+|\*any)?,(?:\w+),(?:\d+\.?\d*)$`),
"Tag([0-9A-Za-z_]),BalanceType(*[a-z_]),Direction(*out),ThresholdType(*[a-z_]),ThresholdValue([0-9]+),DestinationTag([0-9A-Za-z_]|*all),ActionsTag([0-9A-Za-z_]),Weight([0-9]+)"},
utils.ACCOUNT_ACTIONS_CSV: &FileLineRegexValidator{utils.ACCOUNT_ACTIONS_NRCOLS,
regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\w+\s*,\s*){1}(?:\*out\s*,\s*){1}(?:\w+\s*,?\s*){2}$`),
"Tenant([0-9A-Za-z_]),Account([0-9A-Za-z_.]),Direction(*out),ActionTimingsTag([0-9A-Za-z_]),ActionTriggersTag([0-9A-Za-z_])"},
}
func NewTPCSVFileParser(dirPath, fileName string) (*TPCSVFileParser, error) {
validator, hasValidator := FileValidators[fileName]
@@ -248,7 +246,7 @@ func NewTPCSVFileParser(dirPath, fileName string) (*TPCSVFileParser, error) {
return nil, fmt.Errorf("No validator found for file <%s>", fileName)
}
// Open the file here
fin, err := os.Open( path.Join(dirPath, fileName) )
fin, err := os.Open(path.Join(dirPath, fileName))
if err != nil {
return nil, err
}
@@ -259,11 +257,11 @@ func NewTPCSVFileParser(dirPath, fileName string) (*TPCSVFileParser, error) {
// Opens the connection to a file and returns the parsed lines one by one when ParseNextLine() is called
type TPCSVFileParser struct {
validator *FileLineRegexValidator // Row validator
reader *bufio.Reader // Reader to the file we are interested in
validator *FileLineRegexValidator // Row validator
reader *bufio.Reader // Reader to the file we are interested in
}
func (self *TPCSVFileParser) ParseNextLine() ( []string, error ) {
func (self *TPCSVFileParser) ParseNextLine() ([]string, error) {
line, truncated, err := self.reader.ReadLine()
if err != nil {
return nil, err
@@ -279,7 +277,7 @@ func (self *TPCSVFileParser) ParseNextLine() ( []string, error ) {
return nil, fmt.Errorf("Invalid line, <%s>", self.validator.Message)
}
// Open csv reader directly on string line
csvReader, _, err := openStringCSVReader( string(line), ',', self.validator.FieldsPerRecord )
csvReader, _, err := openStringCSVReader(string(line), ',', self.validator.FieldsPerRecord)
if err != nil {
return nil, err
}

View File

@@ -29,11 +29,11 @@ const (
)
type RatingProfile struct {
Id string
FallbackKey string // FallbackKey is used as complete combination of Tenant:TOR:Direction:Subject
DestinationMap map[string][]*ActivationPeriod
Id string
FallbackKey string // FallbackKey is used as complete combination of Tenant:TOR:Direction:Subject
DestinationMap map[string][]*ActivationPeriod
Tag, Tenant, TOR, Direction, Subject, DestRatesTimingTag, RatesFallbackSubject string // used only for loading
ActivationTime int64
ActivationTime int64
}
// Adds an activation period that applyes to current rating profile if not already present.

View File

@@ -84,7 +84,7 @@ type DataStorage interface {
GetTPRatingProfile(string, string) (*utils.TPRatingProfile, error)
GetTPRatingProfileIds(*utils.AttrTPRatingProfileIds) ([]string, error)
ExistsTPActions(string, string) (bool, error)
SetTPActions(*utils.TPActions) error
SetTPActions(string, map[string][]*Action) error
GetTPActions(string, string) (*utils.TPActions, error)
GetTPActionIds(string) ([]string, error)
ExistsTPActionTimings(string, string) (bool, error)

View File

@@ -177,7 +177,7 @@ func (ms *MapStorage) ExistsTPActions(tpid, aId string) (bool, error) {
return false, errors.New(utils.ERR_NOT_IMPLEMENTED)
}
func (ms *MapStorage) SetTPActions(ap *utils.TPActions) error {
func (ms *MapStorage) SetTPActions(tpid string, acts map[string][]*Action) error {
return errors.New(utils.ERR_NOT_IMPLEMENTED)
}

View File

@@ -252,7 +252,7 @@ func (ms *MongoStorage) ExistsTPActions(tpid, aId string) (bool, error) {
return false, errors.New(utils.ERR_NOT_IMPLEMENTED)
}
func (ms *MongoStorage) SetTPActions(ap *utils.TPActions) error {
func (ms *MongoStorage) SetTPActions(tpid string, acts map[string][]*Action) error {
return errors.New(utils.ERR_NOT_IMPLEMENTED)
}

View File

@@ -207,7 +207,7 @@ func (rs *RedisStorage) ExistsTPActions(tpid, aId string) (bool, error) {
return false, errors.New(utils.ERR_NOT_IMPLEMENTED)
}
func (rs *RedisStorage) SetTPActions(ap *utils.TPActions) error {
func (rs *RedisStorage) SetTPActions(tpid string, acts map[string][]*Action) error {
return errors.New(utils.ERR_NOT_IMPLEMENTED)
}

View File

@@ -222,7 +222,7 @@ func (self *SQLStorage) SetTPRates(tpid string, rts map[string][]*Rate) error {
qry += ","
}
qry += fmt.Sprintf("('%s', '%s', %f, %f, %d, %d,%d,'%s', %d, %f)",
tpid, rtId, rt.ConnectFee, rt.Price, int(rt.PricedUnits), int(rt.RateIncrements), int(rt.GroupInterval),
tpid, rtId, rt.ConnectFee, rt.Price, int(rt.PricedUnits), int(rt.RateIncrements), int(rt.GroupInterval),
rt.RoundingMethod, rt.RoundingDecimals, rt.Weight)
i++
}
@@ -250,7 +250,7 @@ func (self *SQLStorage) GetTPRate(tpid, rtId string) (*utils.TPRate, error) {
if err != nil {
return nil, err
}
rt.RateSlots = append(rt.RateSlots, utils.RateSlot{connectFee, rate, ratedUnits, rateIncrements, groupInterval,
rt.RateSlots = append(rt.RateSlots, utils.RateSlot{connectFee, rate, ratedUnits, rateIncrements, groupInterval,
roundingMethod, roundingDecimals, weight})
}
if i == 0 {
@@ -376,7 +376,7 @@ func (self *SQLStorage) SetTPDestRateTimings(tpid string, drts map[string][]*Des
i := 0
for drtId, drtRows := range drts {
for _, drt := range drtRows {
if i!=0 { //Consecutive values after the first will be prefixed with "," as separator
if i != 0 { //Consecutive values after the first will be prefixed with "," as separator
qry += ","
}
qry += fmt.Sprintf("('%s','%s','%s','%s',%f)",
@@ -450,7 +450,7 @@ func (self *SQLStorage) SetTPRatingProfiles(tpid string, rps map[string][]*Ratin
if len(rps) == 0 {
return nil //Nothing to set
}
qry := fmt.Sprintf("INSERT INTO %s (tpid,tag,tenant,tor,direction,subject,activation_time,destrates_timing_tag,rates_fallback_subject) VALUES ",
qry := fmt.Sprintf("INSERT INTO %s (tpid,tag,tenant,tor,direction,subject,activation_time,destrates_timing_tag,rates_fallback_subject) VALUES ",
utils.TBL_TP_RATE_PROFILES)
i := 0
for rpId, rp := range rps {
@@ -458,11 +458,11 @@ func (self *SQLStorage) SetTPRatingProfiles(tpid string, rps map[string][]*Ratin
if i != 0 { //Consecutive values after the first will be prefixed with "," as separator
qry += ","
}
qry += fmt.Sprintf("('%s', '%s', '%s', '%s', '%s', '%s', %d,'%s','%s')", tpid, rpId, rpa.Tenant, rpa.TOR, rpa.Direction,
qry += fmt.Sprintf("('%s', '%s', '%s', '%s', '%s', '%s', %d,'%s','%s')", tpid, rpId, rpa.Tenant, rpa.TOR, rpa.Direction,
rpa.Subject, rpa.ActivationTime, rpa.DestRatesTimingTag, rpa.RatesFallbackSubject)
i++
}
}
if _, err := self.Db.Exec(qry); err != nil {
return err
@@ -546,19 +546,26 @@ func (self *SQLStorage) ExistsTPActions(tpid, actsId string) (bool, error) {
return exists, nil
}
func (self *SQLStorage) SetTPActions(acts *utils.TPActions) error {
if len(acts.Actions) == 0 {
func (self *SQLStorage) SetTPActions(tpid string, acts map[string][]*Action) error {
if len(acts) == 0 {
return nil //Nothing to set
}
// Using multiple values in query to spare some network processing time
qry := fmt.Sprintf("INSERT INTO %s (tpid,tag,action,balance_tag,direction,units,expiration_time,destination_tag,rate_type,rate, minutes_weight,weight) VALUES ", utils.TBL_TP_ACTIONS)
for idx, act := range acts.Actions {
if idx != 0 { //Consecutive values after the first will be prefixed with "," as separator
qry += ","
qry := fmt.Sprintf("INSERT INTO %s (tpid,tag,action,balance_type,direction,units,expiry_time,destination_tag,rate_type,rate, minutes_weight,weight) VALUES ", utils.TBL_TP_ACTIONS)
i := 0
for actId, actRows := range acts {
for _, act := range actRows {
if i != 0 { //Consecutive values after the first will be prefixed with "," as separator
qry += ","
}
var expTime int64
if !act.ExpirationDate.IsZero() {
expTime = act.ExpirationDate.Unix()
}
qry += fmt.Sprintf("('%s','%s','%s','%s','%s',%f,%d,'%s','%s',%f,%f,%f)",
tpid, actId, act.ActionType, act.BalanceId, act.Direction, act.Units, expTime,
act.DestinationTag, act.RateType, act.RateValue, act.MinutesWeight, act.Weight)
i++
}
qry += fmt.Sprintf("('%s','%s','%s','%s','%s',%f,%d,'%s','%s',%f,%f,%f)",
acts.TPid, acts.ActionsId, act.Identifier, act.BalanceId, act.Direction, act.Units, act.ExpirationTime,
act.DestinationId, act.RateType, act.Rate, act.MinutesWeight, act.Weight)
}
if _, err := self.Db.Exec(qry); err != nil {
return err
@@ -567,7 +574,7 @@ func (self *SQLStorage) SetTPActions(acts *utils.TPActions) error {
}
func (self *SQLStorage) GetTPActions(tpid, actsId string) (*utils.TPActions, error) {
rows, err := self.Db.Query(fmt.Sprintf("SELECT action,balance_tag,direction,units,expiration_time,destination_tag,rate_type,rate, minutes_weight,weight FROM %s WHERE tpid='%s' AND tag='%s'", utils.TBL_TP_ACTIONS, tpid, actsId))
rows, err := self.Db.Query(fmt.Sprintf("SELECT action,balance_type,direction,units,expiry_time,destination_tag,rate_type,rate, minutes_weight,weight FROM %s WHERE tpid='%s' AND tag='%s'", utils.TBL_TP_ACTIONS, tpid, actsId))
if err != nil {
return nil, err
}
@@ -703,7 +710,7 @@ func (self *SQLStorage) SetTPActionTriggers(tpid string, ats map[string][]*Actio
if len(ats) == 0 {
return nil //Nothing to set
}
qry := fmt.Sprintf("INSERT INTO %s (tpid,tag,balance_tag,direction,threshold_type,threshold_value,destination_tag,actions_tag,weight) VALUES ",
qry := fmt.Sprintf("INSERT INTO %s (tpid,tag,balance_type,direction,threshold_type,threshold_value,destination_tag,actions_tag,weight) VALUES ",
utils.TBL_TP_ACTION_TRIGGERS)
i := 0
for atId, atRows := range ats {
@@ -1105,8 +1112,8 @@ func (self *SQLStorage) GetTpActions(tpid, tag string) (map[string][]*Action, er
for rows.Next() {
var id int
var units, rate, minutes_weight, weight float64
var tpid, tag, action, balance_tag, direction, destinations_tag, rate_type, expirationDate string
if err := rows.Scan(&id, &tpid, &tag, &action, &balance_tag, &direction, &units, &expirationDate, &destinations_tag, &rate_type, &rate, &minutes_weight, &weight); err != nil {
var tpid, tag, action, balance_type, direction, destinations_tag, rate_type, expirationDate string
if err := rows.Scan(&id, &tpid, &tag, &action, &balance_type, &direction, &units, &expirationDate, &destinations_tag, &rate_type, &rate, &minutes_weight, &weight); err != nil {
return nil, err
}
unix, err := strconv.ParseInt(expirationDate, 10, 64)
@@ -1115,10 +1122,10 @@ func (self *SQLStorage) GetTpActions(tpid, tag string) (map[string][]*Action, er
}
expDate := time.Unix(unix, 0)
var a *Action
if balance_tag != MINUTES {
if balance_type != MINUTES {
a = &Action{
ActionType: action,
BalanceId: balance_tag,
BalanceId: balance_type,
Direction: direction,
Units: units,
ExpirationDate: expDate,
@@ -1128,7 +1135,7 @@ func (self *SQLStorage) GetTpActions(tpid, tag string) (map[string][]*Action, er
a = &Action{
Id: utils.GenUUID(),
ActionType: action,
BalanceId: balance_tag,
BalanceId: balance_type,
Direction: direction,
Weight: weight,
ExpirationDate: expDate,
@@ -1179,7 +1186,7 @@ func (self *SQLStorage) GetTpActionTimings(tpid, tag string) (ats map[string][]*
func (self *SQLStorage) GetTpActionTriggers(tpid, tag string) (map[string][]*ActionTrigger, error) {
ats := make(map[string][]*ActionTrigger)
q := fmt.Sprintf("SELECT tpid,tag,balance_tag,direction,threshold_type,threshold_value,destination_tag,actions_tag,weight FROM %s WHERE tpid='%s'",
q := fmt.Sprintf("SELECT tpid,tag,balance_type,direction,threshold_type,threshold_value,destination_tag,actions_tag,weight FROM %s WHERE tpid='%s'",
utils.TBL_TP_ACTION_TRIGGERS, tpid)
if tag != "" {
q += fmt.Sprintf(" AND tag='%s'", tag)

View File

@@ -19,9 +19,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"strconv"
"errors"
"github.com/cgrates/cgrates/utils"
"strconv"
)
// Various helpers to deal with database
@@ -53,4 +53,3 @@ func ConfigureDatabase(db_type, host, port, name, user, pass string) (db DataSto
}
return db, nil
}

View File

@@ -19,48 +19,47 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"github.com/cgrates/cgrates/utils"
"io"
"io/ioutil"
"log"
"strconv"
"time"
"github.com/cgrates/cgrates/utils"
)
// Import tariff plan from csv into storDb
type TPCSVImporter struct {
TPid string // Load data on this tpid
StorDb DataStorage // StorDb connection handle
DirPath string // Directory path to import from
Sep rune // Separator in the csv file
Verbose bool // If true will print a detailed information instead of silently discarding it
ImportId string // Use this to differentiate between imports (eg: when autogenerating fields like RatingProfileId
TPid string // Load data on this tpid
StorDb DataStorage // StorDb connection handle
DirPath string // Directory path to import from
Sep rune // Separator in the csv file
Verbose bool // If true will print a detailed information instead of silently discarding it
ImportId string // Use this to differentiate between imports (eg: when autogenerating fields like RatingProfileId
}
// Maps csv file to handler which should process it. Defined like this since tests on 1.0.3 were failing on Travis.
// Maps csv file to handler which should process it. Defined like this since tests on 1.0.3 were failing on Travis.
// Change it to func(string) error as soon as Travis updates.
var fileHandlers = map[string]func(*TPCSVImporter,string) error{
utils.TIMINGS_CSV: (*TPCSVImporter).importTimings,
utils.DESTINATIONS_CSV: (*TPCSVImporter).importDestinations,
utils.RATES_CSV: (*TPCSVImporter).importRates,
utils.DESTINATION_RATES_CSV: (*TPCSVImporter).importDestinationRates,
utils.DESTRATE_TIMINGS_CSV: (*TPCSVImporter).importDestRateTimings,
utils.RATE_PROFILES_CSV: (*TPCSVImporter).importRatingProfiles,
utils.ACTIONS_CSV: (*TPCSVImporter).importActions,
utils.ACTION_TIMINGS_CSV: (*TPCSVImporter).importActionTimings,
utils.ACTION_TRIGGERS_CSV: (*TPCSVImporter).importActionTriggers,
utils.ACCOUNT_ACTIONS_CSV: (*TPCSVImporter).importAccountActions,
}
var fileHandlers = map[string]func(*TPCSVImporter, string) error{
utils.TIMINGS_CSV: (*TPCSVImporter).importTimings,
utils.DESTINATIONS_CSV: (*TPCSVImporter).importDestinations,
utils.RATES_CSV: (*TPCSVImporter).importRates,
utils.DESTINATION_RATES_CSV: (*TPCSVImporter).importDestinationRates,
utils.DESTRATE_TIMINGS_CSV: (*TPCSVImporter).importDestRateTimings,
utils.RATE_PROFILES_CSV: (*TPCSVImporter).importRatingProfiles,
utils.ACTIONS_CSV: (*TPCSVImporter).importActions,
utils.ACTION_TIMINGS_CSV: (*TPCSVImporter).importActionTimings,
utils.ACTION_TRIGGERS_CSV: (*TPCSVImporter).importActionTriggers,
utils.ACCOUNT_ACTIONS_CSV: (*TPCSVImporter).importAccountActions,
}
func (self *TPCSVImporter) Run() error {
files, _ := ioutil.ReadDir(self.DirPath)
for _, f := range files {
fHandler,hasName := fileHandlers[f.Name()]
fHandler, hasName := fileHandlers[f.Name()]
if !hasName {
continue
}
fHandler( self, f.Name() )
fHandler(self, f.Name())
}
return nil
}
@@ -70,8 +69,8 @@ func (self *TPCSVImporter) importTimings(fn string) error {
if self.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
fParser, err := NewTPCSVFileParser( self.DirPath, fn )
if err!=nil {
fParser, err := NewTPCSVFileParser(self.DirPath, fn)
if err != nil {
return err
}
lineNr := 0
@@ -86,7 +85,7 @@ func (self *TPCSVImporter) importTimings(fn string) error {
}
continue
}
tm := NewTiming( record... )
tm := NewTiming(record...)
if err := self.StorDb.SetTPTiming(self.TPid, tm); err != nil {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
@@ -98,8 +97,8 @@ func (self *TPCSVImporter) importDestinations(fn string) error {
if self.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
fParser, err := NewTPCSVFileParser( self.DirPath, fn )
if err!=nil {
fParser, err := NewTPCSVFileParser(self.DirPath, fn)
if err != nil {
return err
}
lineNr := 0
@@ -126,8 +125,8 @@ func (self *TPCSVImporter) importRates(fn string) error {
if self.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
fParser, err := NewTPCSVFileParser( self.DirPath, fn )
if err!=nil {
fParser, err := NewTPCSVFileParser(self.DirPath, fn)
if err != nil {
return err
}
lineNr := 0
@@ -146,7 +145,7 @@ func (self *TPCSVImporter) importRates(fn string) error {
if err != nil {
return err
}
if err := self.StorDb.SetTPRates( self.TPid, map[string][]*Rate{ record[0]: []*Rate{rt} } ); err != nil {
if err := self.StorDb.SetTPRates(self.TPid, map[string][]*Rate{record[0]: []*Rate{rt}}); err != nil {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
@@ -157,8 +156,8 @@ func (self *TPCSVImporter) importDestinationRates(fn string) error {
if self.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
fParser, err := NewTPCSVFileParser( self.DirPath, fn )
if err!=nil {
fParser, err := NewTPCSVFileParser(self.DirPath, fn)
if err != nil {
return err
}
lineNr := 0
@@ -173,9 +172,9 @@ func (self *TPCSVImporter) importDestinationRates(fn string) error {
}
continue
}
dr := &DestinationRate{record[0], record[1], record[2], nil}
if err := self.StorDb.SetTPDestinationRates( self.TPid,
map[string][]*DestinationRate{ dr.Tag: []*DestinationRate{dr} } ); err != nil {
dr := &DestinationRate{record[0], record[1], record[2], nil}
if err := self.StorDb.SetTPDestinationRates(self.TPid,
map[string][]*DestinationRate{dr.Tag: []*DestinationRate{dr}}); err != nil {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
@@ -186,8 +185,8 @@ func (self *TPCSVImporter) importDestRateTimings(fn string) error {
if self.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
fParser, err := NewTPCSVFileParser( self.DirPath, fn )
if err!=nil {
fParser, err := NewTPCSVFileParser(self.DirPath, fn)
if err != nil {
return err
}
lineNr := 0
@@ -207,12 +206,12 @@ func (self *TPCSVImporter) importDestRateTimings(fn string) error {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
continue
}
drt := &DestinationRateTiming{Tag: record[0],
DestinationRatesTag: record[1],
Weight: weight,
TimingsTag: record[2],
}
if err := self.StorDb.SetTPDestRateTimings( self.TPid, map[string][]*DestinationRateTiming{drt.Tag:[]*DestinationRateTiming{drt}}); err != nil {
drt := &DestinationRateTiming{Tag: record[0],
DestinationRatesTag: record[1],
Weight: weight,
TimingsTag: record[2],
}
if err := self.StorDb.SetTPDestRateTimings(self.TPid, map[string][]*DestinationRateTiming{drt.Tag: []*DestinationRateTiming{drt}}); err != nil {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
@@ -223,8 +222,8 @@ func (self *TPCSVImporter) importRatingProfiles(fn string) error {
if self.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
fParser, err := NewTPCSVFileParser( self.DirPath, fn )
if err!=nil {
fParser, err := NewTPCSVFileParser(self.DirPath, fn)
if err != nil {
return err
}
lineNr := 0
@@ -246,18 +245,18 @@ func (self *TPCSVImporter) importRatingProfiles(fn string) error {
}
rpTag := "TPCSV" //Autogenerate rating profile id
if self.ImportId != "" {
rpTag += "_"+self.ImportId
rpTag += "_" + self.ImportId
}
rp := &RatingProfile{Tag: rpTag,
Tenant: tenant,
TOR: tor,
Direction: direction,
Subject: subject,
ActivationTime: at.Unix(),
DestRatesTimingTag: destRatesTimingTag,
RatesFallbackSubject: fallbacksubject,
}
if err := self.StorDb.SetTPRatingProfiles( self.TPid, map[string][]*RatingProfile{rpTag:[]*RatingProfile{rp}}); err != nil {
rp := &RatingProfile{Tag: rpTag,
Tenant: tenant,
TOR: tor,
Direction: direction,
Subject: subject,
ActivationTime: at.Unix(),
DestRatesTimingTag: destRatesTimingTag,
RatesFallbackSubject: fallbacksubject,
}
if err := self.StorDb.SetTPRatingProfiles(self.TPid, map[string][]*RatingProfile{rpTag: []*RatingProfile{rp}}); err != nil {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
@@ -265,6 +264,62 @@ func (self *TPCSVImporter) importRatingProfiles(fn string) error {
}
func (self *TPCSVImporter) importActions(fn string) error {
if self.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
fParser, err := NewTPCSVFileParser(self.DirPath, fn)
if err != nil {
return err
}
lineNr := 0
for {
lineNr++
record, err := fParser.ParseNextLine()
if err == io.EOF { // Reached end of file
break
} else if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
actId, actionType, balanceType, direction, destTag, rateType := record[0], record[1], record[2], record[3], record[6], record[7]
units, err := strconv.ParseFloat(record[4], 64)
if err != nil {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
continue
}
var expiryTime time.Time // Empty initialized time represents never expire
if record[5] != "*unlimited" { // ToDo: Expand here for other meta tags or go way of adding time for expiry
expiryTime, err = time.Parse(time.RFC3339, record[5])
if err != nil {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
continue
}
}
rateValue, _ := strconv.ParseFloat(record[8], 64) // Ignore errors since empty string is error, we can find out based on rateType if defined
minutesWeight, _ := strconv.ParseFloat(record[9], 64)
weight, err := strconv.ParseFloat(record[10], 64)
if err != nil {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
continue
}
act := &Action{
ActionType: actionType,
BalanceId: balanceType,
Direction: direction,
Units: units,
ExpirationDate: expiryTime,
DestinationTag: destTag,
RateType: rateType,
RateValue: rateValue,
MinutesWeight: minutesWeight,
Weight: weight,
}
if err := self.StorDb.SetTPActions(self.TPid, map[string][]*Action{actId: []*Action{act}}); err != nil {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
return nil
}
@@ -279,5 +334,3 @@ func (self *TPCSVImporter) importActionTriggers(fn string) error {
func (self *TPCSVImporter) importAccountActions(fn string) error {
return nil
}