From d4017890ec04402fa2a5263ecefd1a384af8345a Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 29 Jul 2013 21:18:02 +0200 Subject: [PATCH] TPCSVImporter Actions, store refactoring, small API params changes --- apier/tpaccountactions.go | 23 ++- apier/tpactions.go | 23 ++- apier/tpdestinationrates.go | 6 +- apier/tpdestratetimings.go | 16 +- apier/tprates.go | 8 +- apier/tpratingprofiles.go | 26 +-- cmd/cgr-loader/cgr-loader.go | 30 +-- config/helpers.go | 11 +- .../mysql/create_tariffplan_tables.sql | 10 +- docs/api_tpactions.rst | 28 +-- engine/action.go | 2 + engine/loader_csv.go | 2 +- engine/loader_helpers.go | 86 +++++---- engine/ratingprofile.go | 8 +- engine/storage_interface.go | 2 +- engine/storage_map.go | 2 +- engine/storage_mongo.go | 2 +- engine/storage_redis.go | 2 +- engine/storage_sql.go | 55 +++--- engine/storage_utils.go | 3 +- engine/tpimporter_csv.go | 171 ++++++++++++------ utils/apitpdata.go | 8 +- utils/consts.go | 46 ++--- 23 files changed, 323 insertions(+), 247 deletions(-) diff --git a/apier/tpaccountactions.go b/apier/tpaccountactions.go index 69681f055..459a03645 100644 --- a/apier/tpaccountactions.go +++ b/apier/tpaccountactions.go @@ -25,11 +25,10 @@ import ( "github.com/cgrates/cgrates/utils" ) - // Creates a new AccountActions profile within a tariff plan func (self *Apier) SetTPAccountActions(attrs utils.ApiTPAccountActions, reply *string) error { if missing := utils.MissingStructFields(&attrs, - []string{"TPid", "AccountActionsId","Tenant","Account","Direction","ActionTimingsId","ActionTriggersId"}); len(missing) != 0 { + []string{"TPid", "AccountActionsId", "Tenant", "Account", "Direction", "ActionTimingsId", "ActionTriggersId"}); len(missing) != 0 { return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing) } if exists, err := self.StorDb.ExistsTPAccountActions(attrs.TPid, attrs.AccountActionsId); err != nil { @@ -38,9 +37,9 @@ func (self *Apier) SetTPAccountActions(attrs utils.ApiTPAccountActions, reply *s return errors.New(utils.ERR_DUPLICATE) } aa := map[string]*engine.AccountAction{ - attrs.AccountActionsId: &engine.AccountAction{Tenant: attrs.Tenant, Account: attrs.Account, Direction: attrs.Direction, - ActionTimingsTag: attrs.ActionTimingsId, ActionTriggersTag: attrs.ActionTriggersId}, - } + attrs.AccountActionsId: &engine.AccountAction{Tenant: attrs.Tenant, Account: attrs.Account, Direction: attrs.Direction, + ActionTimingsTag: attrs.ActionTimingsId, ActionTriggersTag: attrs.ActionTriggersId}, + } if err := self.StorDb.SetTPAccountActions(attrs.TPid, aa); err != nil { return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) @@ -64,13 +63,13 @@ func (self *Apier) GetTPAccountActions(attrs AttrGetTPAccountActions, reply *uti } else if len(aa) == 0 { return errors.New(utils.ERR_NOT_FOUND) } else { - *reply = utils.ApiTPAccountActions{TPid: attrs.TPid, - AccountActionsId: attrs.AccountActionsId, - Tenant: aa[attrs.AccountActionsId].Tenant, - Account:aa[attrs.AccountActionsId].Account, - Direction: aa[attrs.AccountActionsId].Direction, - ActionTimingsId: aa[attrs.AccountActionsId].ActionTimingsTag, - ActionTriggersId: aa[attrs.AccountActionsId].ActionTriggersTag } + *reply = utils.ApiTPAccountActions{TPid: attrs.TPid, + AccountActionsId: attrs.AccountActionsId, + Tenant: aa[attrs.AccountActionsId].Tenant, + Account: aa[attrs.AccountActionsId].Account, + Direction: aa[attrs.AccountActionsId].Direction, + ActionTimingsId: aa[attrs.AccountActionsId].ActionTimingsTag, + ActionTriggersId: aa[attrs.AccountActionsId].ActionTriggersTag} } return nil } diff --git a/apier/tpactions.go b/apier/tpactions.go index 0f043ad65..6f365704d 100644 --- a/apier/tpactions.go +++ b/apier/tpactions.go @@ -21,7 +21,9 @@ package apier import ( "errors" "fmt" + "time" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/cgrates/engine" ) // Creates a new Actions profile within a tariff plan @@ -31,8 +33,8 @@ func (self *Apier) SetTPActions(attrs utils.TPActions, reply *string) error { } for _, action := range attrs.Actions { requiredFields := []string{"Identifier", "Weight"} - if action.BalanceId != "" { // Add some inter-dependent parameters - if balanceType then we are not talking about simply calling actions - requiredFields = append(requiredFields, "Direction", "Units", "ExpirationTime") + if action.BalanceType != "" { // Add some inter-dependent parameters - if balanceType then we are not talking about simply calling actions + requiredFields = append(requiredFields, "Direction", "Units", "ExpiryTime") } if missing := utils.MissingStructFields(&action, requiredFields); len(missing) != 0 { return fmt.Errorf("%s:Action:%s:%v", utils.ERR_MANDATORY_IE_MISSING, action.Identifier, missing) @@ -43,7 +45,22 @@ func (self *Apier) SetTPActions(attrs utils.TPActions, reply *string) error { } else if exists { return errors.New(utils.ERR_DUPLICATE) } - if err := self.StorDb.SetTPActions(&attrs); err != nil { + acts := make([]*engine.Action, len(attrs.Actions)) + for idx, act := range attrs.Actions { + acts[idx] = &engine.Action{ + ActionType: act.Identifier, + BalanceId: act.BalanceType, + Direction: act.Direction, + Units: act.Units, + ExpirationDate: time.Unix(act.ExpiryTime,0), + DestinationTag: act.DestinationId, + RateType: act.RateType, + RateValue: act.Rate, + MinutesWeight: act.MinutesWeight, + Weight: act.Weight, + } + } + if err := self.StorDb.SetTPActions(attrs.TPid, map[string][]*engine.Action{attrs.ActionsId: acts}); err != nil { return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) } *reply = "OK" diff --git a/apier/tpdestinationrates.go b/apier/tpdestinationrates.go index e649cb8c5..874d4de33 100644 --- a/apier/tpdestinationrates.go +++ b/apier/tpdestinationrates.go @@ -23,8 +23,8 @@ package apier import ( "errors" "fmt" - "github.com/cgrates/cgrates/utils" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" ) // Creates a new DestinationRate profile within a tariff plan @@ -38,10 +38,10 @@ func (self *Apier) SetTPDestinationRate(attrs utils.TPDestinationRate, reply *st return errors.New(utils.ERR_DUPLICATE) } drs := make([]*engine.DestinationRate, len(attrs.DestinationRates)) - for idx,dr := range attrs.DestinationRates { + for idx, dr := range attrs.DestinationRates { drs[idx] = &engine.DestinationRate{attrs.DestinationRateId, dr.DestinationId, dr.RateId, nil} } - if err := self.StorDb.SetTPDestinationRates( attrs.TPid, map[string][]*engine.DestinationRate{ attrs.DestinationRateId: drs } ); err != nil { + if err := self.StorDb.SetTPDestinationRates(attrs.TPid, map[string][]*engine.DestinationRate{attrs.DestinationRateId: drs}); err != nil { return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) } *reply = "OK" diff --git a/apier/tpdestratetimings.go b/apier/tpdestratetimings.go index 16ae3bd24..f357c81e9 100644 --- a/apier/tpdestratetimings.go +++ b/apier/tpdestratetimings.go @@ -23,8 +23,8 @@ package apier import ( "errors" "fmt" - "github.com/cgrates/cgrates/utils" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" ) // Creates a new DestinationRateTiming profile within a tariff plan @@ -38,14 +38,14 @@ func (self *Apier) SetTPDestRateTiming(attrs utils.TPDestRateTiming, reply *stri return errors.New(utils.ERR_DUPLICATE) } drts := make([]*engine.DestinationRateTiming, len(attrs.DestRateTimings)) - for idx,drt := range attrs.DestRateTimings { - drts[idx] = &engine.DestinationRateTiming{Tag: attrs.DestRateTimingId, - DestinationRatesTag: drt.DestRatesId, - Weight: drt.Weight, - TimingsTag: drt.TimingId, - } + for idx, drt := range attrs.DestRateTimings { + drts[idx] = &engine.DestinationRateTiming{Tag: attrs.DestRateTimingId, + DestinationRatesTag: drt.DestRatesId, + Weight: drt.Weight, + TimingsTag: drt.TimingId, + } } - if err := self.StorDb.SetTPDestRateTimings( attrs.TPid, map[string][]*engine.DestinationRateTiming{ attrs.DestRateTimingId: drts } ); err != nil { + if err := self.StorDb.SetTPDestRateTimings(attrs.TPid, map[string][]*engine.DestinationRateTiming{attrs.DestRateTimingId: drts}); err != nil { return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) } *reply = "OK" diff --git a/apier/tprates.go b/apier/tprates.go index 41790aa6b..d55192cb7 100644 --- a/apier/tprates.go +++ b/apier/tprates.go @@ -23,8 +23,8 @@ package apier import ( "errors" "fmt" - "github.com/cgrates/cgrates/utils" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" ) // Creates a new rate within a tariff plan @@ -38,11 +38,11 @@ func (self *Apier) SetTPRate(attrs utils.TPRate, reply *string) error { return errors.New(utils.ERR_DUPLICATE) } rts := make([]*engine.Rate, len(attrs.RateSlots)) - for idx,rtSlot := range attrs.RateSlots { - rts[idx] = &engine.Rate{attrs.RateId, rtSlot.ConnectFee, rtSlot.Rate, float64(rtSlot.RatedUnits), + for idx, rtSlot := range attrs.RateSlots { + rts[idx] = &engine.Rate{attrs.RateId, rtSlot.ConnectFee, rtSlot.Rate, float64(rtSlot.RatedUnits), float64(rtSlot.RateIncrements), float64(rtSlot.GroupInterval), rtSlot.RoundingMethod, rtSlot.RoundingDecimals, rtSlot.Weight} } - if err := self.StorDb.SetTPRates( attrs.TPid, map[string][]*engine.Rate{ attrs.RateId: rts } ); err != nil { + if err := self.StorDb.SetTPRates(attrs.TPid, map[string][]*engine.Rate{attrs.RateId: rts}); err != nil { return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) } *reply = "OK" diff --git a/apier/tpratingprofiles.go b/apier/tpratingprofiles.go index ca6a6908c..928c0d367 100644 --- a/apier/tpratingprofiles.go +++ b/apier/tpratingprofiles.go @@ -23,8 +23,8 @@ package apier import ( "errors" "fmt" - "github.com/cgrates/cgrates/utils" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" ) // Creates a new RatingProfile within a tariff plan @@ -38,18 +38,18 @@ func (self *Apier) SetTPRatingProfile(attrs utils.TPRatingProfile, reply *string return errors.New(utils.ERR_DUPLICATE) } rps := make([]*engine.RatingProfile, len(attrs.RatingActivations)) - for idx,ra := range attrs.RatingActivations { - rps[idx] = &engine.RatingProfile{Tag: attrs.RatingProfileId, - Tenant: attrs.Tenant, - TOR: attrs.TOR, - Direction: attrs.Direction, - Subject: attrs.Subject, - ActivationTime: ra.ActivationTime, - DestRatesTimingTag: ra.DestRateTimingId, - RatesFallbackSubject: attrs.RatesFallbackSubject, - } + for idx, ra := range attrs.RatingActivations { + rps[idx] = &engine.RatingProfile{Tag: attrs.RatingProfileId, + Tenant: attrs.Tenant, + TOR: attrs.TOR, + Direction: attrs.Direction, + Subject: attrs.Subject, + ActivationTime: ra.ActivationTime, + DestRatesTimingTag: ra.DestRateTimingId, + RatesFallbackSubject: attrs.RatesFallbackSubject, + } } - if err := self.StorDb.SetTPRatingProfiles( attrs.TPid, map[string][]*engine.RatingProfile{ attrs.RatingProfileId: rps } ); err != nil { + if err := self.StorDb.SetTPRatingProfiles(attrs.TPid, map[string][]*engine.RatingProfile{attrs.RatingProfileId: rps}); err != nil { return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) } *reply = "OK" @@ -57,7 +57,7 @@ func (self *Apier) SetTPRatingProfile(attrs utils.TPRatingProfile, reply *string } type AttrGetTPRatingProfile struct { - TPid string // Tariff plan id + TPid string // Tariff plan id RatingProfileId string // RatingProfile id } diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 1a958f733..a520f5aa8 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -21,22 +21,22 @@ package main import ( "flag" "fmt" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/cgrates/config" "log" "path" ) var ( //separator = flag.String("separator", ",", "Default field separator") - cgrConfig,_ = config.NewDefaultCGRConfig() + cgrConfig, _ = config.NewDefaultCGRConfig() data_db_type = flag.String("datadb_type", cgrConfig.DataDBType, "The type of the dataDb database (redis|mongo|postgres|mysql)") data_db_host = flag.String("datadb_host", cgrConfig.DataDBHost, "The dataDb host to connect to.") data_db_port = flag.String("datadb_port", cgrConfig.DataDBPort, "The dataDb port to bind to.") data_db_name = flag.String("datadb_name", cgrConfig.DataDBName, "The name/number of the dataDb to connect to.") data_db_user = flag.String("datadb_user", cgrConfig.DataDBUser, "The dataDb user to sign in as.") - data_db_pass = flag.String("datadb_passwd", cgrConfig.DataDBPass, "The dataDb user's password.") + data_db_pass = flag.String("datadb_passwd", cgrConfig.DataDBPass, "The dataDb user's password.") stor_db_type = flag.String("stordb_type", cgrConfig.StorDBType, "The type of the storDb database (redis|mongo|postgres|mysql)") stor_db_host = flag.String("stordb_host", cgrConfig.StorDBHost, "The storDb host to connect to.") @@ -45,14 +45,14 @@ var ( stor_db_user = flag.String("stordb_user", cgrConfig.StorDBUser, "The storDb user to sign in as.") stor_db_pass = flag.String("stordb_passwd", cgrConfig.StorDBPass, "The storDb user's password.") - flush = flag.Bool("flush", false, "Flush the database before importing") - tpid = flag.String("tpid", "", "The tariff plan id from the database") - dataPath = flag.String("path", ".", "The path containing the data files") - version = flag.Bool("version", false, "Prints the application version.") - verbose = flag.Bool("verbose", false, "Enable detailed verbose logging output") + flush = flag.Bool("flush", false, "Flush the database before importing") + tpid = flag.String("tpid", "", "The tariff plan id from the database") + dataPath = flag.String("path", ".", "The path containing the data files") + version = flag.Bool("version", false, "Prints the application version.") + verbose = flag.Bool("verbose", false, "Enable detailed verbose logging output") fromStorDb = flag.Bool("from-stordb", false, "Load the tariff plan from storDb to dataDb") - toStorDb = flag.Bool("to-stordb", false, "Import the tariff plan from files to storDb") - runId = flag.String("runid", "", "Uniquely identify an import/load, postpended to some automatic fields") + toStorDb = flag.Bool("to-stordb", false, "Import the tariff plan from files to storDb") + runId = flag.String("runid", "", "Uniquely identify an import/load, postpended to some automatic fields") ) func main() { @@ -73,11 +73,13 @@ func main() { dataDb, errDataDb = engine.ConfigureDatabase(*data_db_type, *data_db_host, *data_db_port, *data_db_name, *data_db_user, *data_db_pass) } // Defer databases opened to be closed when we are done - for _,db := range []engine.DataStorage{ dataDb, storDb } { - if db != nil { defer db.Close() } + for _, db := range []engine.DataStorage{dataDb, storDb} { + if db != nil { + defer db.Close() + } } // Stop on db errors - for _,err = range []error{errDataDb, errStorDb} { + for _, err = range []error{errDataDb, errStorDb} { if err != nil { log.Fatalf("Could not open database connection: %v", err) } @@ -89,7 +91,7 @@ func main() { if *tpid == "" { log.Fatal("TPid required, please define it via *-tpid* command argument.") } - csvImporter := engine.TPCSVImporter{ *tpid, storDb, *dataPath, ',', *verbose, *runId } + csvImporter := engine.TPCSVImporter{*tpid, storDb, *dataPath, ',', *verbose, *runId} if errImport := csvImporter.Run(); errImport != nil { log.Fatal(errImport) } diff --git a/config/helpers.go b/config/helpers.go index b76663d4e..646ebc30f 100644 --- a/config/helpers.go +++ b/config/helpers.go @@ -24,21 +24,20 @@ import ( "strings" ) - // Adds support for slice values in config -func ConfigSlice( c *conf.ConfigFile, section, valName string ) ([]string, error) { +func ConfigSlice(c *conf.ConfigFile, section, valName string) ([]string, error) { sliceStr, errGet := c.GetString(section, valName) if errGet != nil { return nil, errGet } - cfgValStrs := strings.Split(sliceStr, ",") // If need arrises, we can make the separator configurable - if len(cfgValStrs)==1 && cfgValStrs[0]=="" { // Prevents returning iterable with empty value + cfgValStrs := strings.Split(sliceStr, ",") // If need arrises, we can make the separator configurable + if len(cfgValStrs) == 1 && cfgValStrs[0] == "" { // Prevents returning iterable with empty value return []string{}, nil } - for _,elm := range cfgValStrs { + for _, elm := range cfgValStrs { if elm == "" { //One empty element is presented when splitting empty string return nil, errors.New("Empty values in config slice") - + } } return cfgValStrs, nil diff --git a/data/storage/mysql/create_tariffplan_tables.sql b/data/storage/mysql/create_tariffplan_tables.sql index 4a7fa78b0..e0051da29 100644 --- a/data/storage/mysql/create_tariffplan_tables.sql +++ b/data/storage/mysql/create_tariffplan_tables.sql @@ -115,10 +115,10 @@ CREATE TABLE `tp_actions` ( `tpid` char(40) NOT NULL, `tag` varchar(24) NOT NULL, `action` varchar(24) NOT NULL, - `balance_tag` varchar(24) NOT NULL, + `balance_type` varchar(24) NOT NULL, `direction` varchar(8) NOT NULL, `units` DECIMAL(5,2) NOT NULL, - `expiration_time` int(11) NOT NULL, + `expiry_time` int(16) NOT NULL, `destination_tag` varchar(24) NOT NULL, `rate_type` varchar(8) NOT NULL, `rate` DECIMAL(5,4) NOT NULL, @@ -126,7 +126,7 @@ CREATE TABLE `tp_actions` ( `weight` DECIMAL(5,2) NOT NULL, PRIMARY KEY (`id`), KEY `tpid` (`tpid`), - UNIQUE KEY `unique_action` (`tpid`,`tag`,`action`,`balance_tag`,`direction`,`expiration_time`,`destination_tag`,`rate_type`,`minutes_weight`,`weight`) + UNIQUE KEY `unique_action` (`tpid`,`tag`,`action`,`balance_type`,`direction`,`expiry_time`,`destination_tag`,`rate_type`,`minutes_weight`,`weight`) ); -- @@ -153,7 +153,7 @@ CREATE TABLE `tp_action_triggers` ( `id` int(11) NOT NULL AUTO_INCREMENT, `tpid` char(40) NOT NULL, `tag` varchar(24) NOT NULL, - `balance_tag` varchar(24) NOT NULL, + `balance_type` varchar(24) NOT NULL, `direction` varchar(8) NOT NULL, `threshold_type` char(11) NOT NULL, `threshold_value` DECIMAL(5,4) NOT NULL, @@ -162,7 +162,7 @@ CREATE TABLE `tp_action_triggers` ( `weight` DECIMAL(5,2) NOT NULL, PRIMARY KEY (`id`), KEY `tpid` (`tpid`), - UNIQUE KEY `unique_trigger_definition` (`tpid`,`tag`,`balance_tag`,`direction`,`threshold_type`,`threshold_value`,`destination_tag`,`actions_tag`) + UNIQUE KEY `unique_trigger_definition` (`tpid`,`tag`,`balance_type`,`direction`,`threshold_type`,`threshold_value`,`destination_tag`,`actions_tag`) ); -- diff --git a/docs/api_tpactions.rst b/docs/api_tpactions.rst index 8d921e989..4bcebbc46 100644 --- a/docs/api_tpactions.rst +++ b/docs/api_tpactions.rst @@ -16,16 +16,16 @@ Creates a new Actions profile within a tariff plan. type Action struct { Identifier string // Identifier mapped in the code - BalanceId string // Type of balance the action will operate on + BalanceType string // Type of balance the action will operate on Direction string // Balance direction Units float64 // Number of units to add/deduct - ExpirationTime int64 // Time when the units will expire + ExpiryTime int64 // Time when the units will expire DestinationId string // Destination profile id - RateType string // Type of price + RateType string // Type of rate <*absolute|*percent> Rate float64 // Price value MinutesWeight float64 // Minutes weight Weight float64 // Action's weight - } + } Mandatory parameters: ``[]string{"TPid", "ActionsId", "Actions", "Identifier", "Weight"}`` @@ -39,14 +39,14 @@ Creates a new Actions profile within a tariff plan. { "Actions": [ { - "BalanceId": "MONEY", + "BalanceType": "*monetary", "DestinationId": "CGRATES_NET", - "Direction": "OUT", - "ExpirationTime": 1374082259, + "Direction": "*out", + "ExpiryTime": 1374082259, "Identifier": "TOPUP_RESET", "MinutesWeight": 10, "Rate": 0.12, - "RateType": "ABSOLUTE", + "RateType": "*absolute", "Units": 10, "Weight": 10 } @@ -129,10 +129,10 @@ Queries specific Actions profile on tariff plan. type Action struct { Identifier string // Identifier mapped in the code - BalanceId string // Type of balance the action will operate on + BalanceType string // Type of balance the action will operate on Direction string // Balance direction Units float64 // Number of units to add/deduct - ExpirationTime int64 // Time when the units will expire + ExpiryTime int64 // Time when the units will expire DestinationId string // Destination profile id RateType string // Type of price Rate float64 // Price value @@ -149,14 +149,14 @@ Queries specific Actions profile on tariff plan. "result": { "Actions": [ { - "BalanceId": "MONEY", + "BalanceType": "*monetary", "DestinationId": "CGRATES_NET", - "Direction": "OUT", - "ExpirationTime": 1374082259, + "Direction": "*out", + "ExpiryTime": 1374082259, "Identifier": "TOPUP_RESET", "MinutesWeight": 10, "Rate": 0.12, - "RateType": "ABSOLUTE", + "RateType": "*absolute", "Units": 10, "Weight": 10 } diff --git a/engine/action.go b/engine/action.go index 227277e56..65e38c3d1 100644 --- a/engine/action.go +++ b/engine/action.go @@ -36,6 +36,8 @@ type Action struct { Units float64 Weight float64 MinuteBucket *MinuteBucket + DestinationTag, RateType string // From here for import/load purposes only + RateValue, MinutesWeight float64 } const ( diff --git a/engine/loader_csv.go b/engine/loader_csv.go index 96c4c7da3..ec4213b4e 100644 --- a/engine/loader_csv.go +++ b/engine/loader_csv.go @@ -349,7 +349,7 @@ func (csvr *CSVReader) LoadActions() (err error) { if err != nil { return errors.New(fmt.Sprintf("Could not parse action units: %v", err)) } - var expiryTime time.Time // Empty initialized time represents never expire + var expiryTime time.Time // Empty initialized time represents never expire if record[5] != "*unlimited" { // ToDo: Expand here for other meta tags or go way of adding time for expiry expiryTime, err = time.Parse(time.RFC3339, record[5]) if err != nil { diff --git a/engine/loader_helpers.go b/engine/loader_helpers.go index 919ebb120..07977005e 100644 --- a/engine/loader_helpers.go +++ b/engine/loader_helpers.go @@ -20,15 +20,15 @@ package engine import ( "bufio" - "path" "errors" "fmt" + "github.com/cgrates/cgrates/utils" "log" "os" + "path" "regexp" "strconv" "strings" - "github.com/cgrates/cgrates/utils" ) type TPLoader interface { @@ -202,45 +202,43 @@ func ValidateCSVData(fn string, re *regexp.Regexp) (err error) { } type FileLineRegexValidator struct { - FieldsPerRecord int // Number of fields in one record, useful for crosschecks - Rule *regexp.Regexp // Regexp rule - Message string // Pass this message as helper + FieldsPerRecord int // Number of fields in one record, useful for crosschecks + Rule *regexp.Regexp // Regexp rule + Message string // Pass this message as helper } -var FileValidators = map[string]*FileLineRegexValidator{ - utils.DESTINATIONS_CSV: &FileLineRegexValidator{ utils.DESTINATIONS_NRCOLS, - regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\+?\d+.?\d*){1}$`), - "Tag([0-9A-Za-z_]),Prefix([0-9])"}, - utils.TIMINGS_CSV: &FileLineRegexValidator{ utils.TIMINGS_NRCOLS, - regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\*any\s*,\s*|(?:\d{1,4};?)+\s*,\s*|\s*,\s*){4}(?:\d{2}:\d{2}:\d{2}|\*asap){1}$`), - "Tag([0-9A-Za-z_]),Years([0-9;]|*all|),Months([0-9;]|*all|),MonthDays([0-9;]|*all|),WeekDays([0-9;]|*all|),Time([0-9:]|*asap)"}, - utils.RATES_CSV: &FileLineRegexValidator{ utils.RATES_NRCOLS, - regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\d+\.?\d*,){5}(?:\*\w+,){1}(?:\d+\.?\d*,?){2}$`), - "Tag([0-9A-Za-z_]),ConnectFee([0-9.]),Rate([0-9.]),RatedUnits([0-9.]),RateIncrement([0-9.])"}, - utils.DESTINATION_RATES_CSV: &FileLineRegexValidator{ utils.DESTINATION_RATES_NRCOLS, - regexp.MustCompile(`(?:\w+\s*,?\s*){3}$`), - "Tag([0-9A-Za-z_]),DestinationsTag([0-9A-Za-z_]),RateTag([0-9A-Za-z_])"}, - utils.DESTRATE_TIMINGS_CSV: &FileLineRegexValidator{ utils.DESTRATE_TIMINGS_NRCOLS, - regexp.MustCompile(`(?:\w+\s*,\s*){3}(?:\d+.?\d*){1}$`), - "Tag([0-9A-Za-z_]),DestinationRatesTag([0-9A-Za-z_]),TimingProfile([0-9A-Za-z_]),Weight([0-9.])"}, - utils.RATE_PROFILES_CSV: &FileLineRegexValidator{ utils.RATE_PROFILES_NRCOLS, - regexp.MustCompile(`(?:\w+\s*,\s*){2}(?:\*out\s*,\s*){1}(?:\*any\s*,\s*|\w+\s*,\s*){1}(?:\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z){1}(?:\w*\s*,?\s*){2}$`), - "Tenant([0-9A-Za-z_]),TOR([0-9A-Za-z_]),Direction(*out),Subject([0-9A-Za-z_]|*all),RatesFallbackSubject([0-9A-Za-z_]|),RatesTimingTag([0-9A-Za-z_]),ActivationTime([0-9T:X])"}, - utils.ACTIONS_CSV: &FileLineRegexValidator{ utils.ACTIONS_NRCOLS, - regexp.MustCompile(`(?:\w+\s*),(?:\*\w+\s*),(?:\*\w+\s*),(?:\*out\s*),(?:\d+\s*),(?:\*\w+\s*|\+\d+[smh]\s*|\d+\s*),(?:\*any|\w+\s*),(?:\*\w+\s*)?,(?:\d+\.?\d*\s*)?,(?:\d+\.?\d*\s*)?,(?:\d+\.?\d*\s*)$`), - "Tag([0-9A-Za-z_]),Action([0-9A-Za-z_]),BalanceType([*a-z_]),Direction(*out),Units([0-9]),ExpiryTime(*[a-z_]|+[0-9][smh]|[0-9])DestinationTag([0-9A-Za-z_]|*all),RateType(*[a-z_]),RateValue([0-9.]),MinutesWeight([0-9.]),Weight([0-9.])"}, - utils.ACTION_TIMINGS_CSV: &FileLineRegexValidator{ utils.ACTION_TIMINGS_NRCOLS, - regexp.MustCompile(`(?:\w+\s*,\s*){3}(?:\d+\.?\d*){1}`), - "Tag([0-9A-Za-z_]),ActionsTag([0-9A-Za-z_]),TimingTag([0-9A-Za-z_]),Weight([0-9.])"}, - utils.ACTION_TRIGGERS_CSV: &FileLineRegexValidator{ utils.ACTION_TRIGGERS_NRCOLS, - regexp.MustCompile(`(?:\w+),(?:\*\w+),(?:\*out),(?:\*\w+),(?:\d+\.?\d*),(?:\w+|\*any)?,(?:\w+),(?:\d+\.?\d*)$`), - "Tag([0-9A-Za-z_]),BalanceType(*[a-z_]),Direction(*out),ThresholdType(*[a-z_]),ThresholdValue([0-9]+),DestinationTag([0-9A-Za-z_]|*all),ActionsTag([0-9A-Za-z_]),Weight([0-9]+)"}, - utils.ACCOUNT_ACTIONS_CSV: &FileLineRegexValidator{ utils.ACCOUNT_ACTIONS_NRCOLS, - regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\w+\s*,\s*){1}(?:\*out\s*,\s*){1}(?:\w+\s*,?\s*){2}$`), - "Tenant([0-9A-Za-z_]),Account([0-9A-Za-z_.]),Direction(*out),ActionTimingsTag([0-9A-Za-z_]),ActionTriggersTag([0-9A-Za-z_])"}, - } - - +var FileValidators = map[string]*FileLineRegexValidator{ + utils.DESTINATIONS_CSV: &FileLineRegexValidator{utils.DESTINATIONS_NRCOLS, + regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\+?\d+.?\d*){1}$`), + "Tag([0-9A-Za-z_]),Prefix([0-9])"}, + utils.TIMINGS_CSV: &FileLineRegexValidator{utils.TIMINGS_NRCOLS, + regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\*any\s*,\s*|(?:\d{1,4};?)+\s*,\s*|\s*,\s*){4}(?:\d{2}:\d{2}:\d{2}|\*asap){1}$`), + "Tag([0-9A-Za-z_]),Years([0-9;]|*all|),Months([0-9;]|*all|),MonthDays([0-9;]|*all|),WeekDays([0-9;]|*all|),Time([0-9:]|*asap)"}, + utils.RATES_CSV: &FileLineRegexValidator{utils.RATES_NRCOLS, + regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\d+\.?\d*,){5}(?:\*\w+,){1}(?:\d+\.?\d*,?){2}$`), + "Tag([0-9A-Za-z_]),ConnectFee([0-9.]),Rate([0-9.]),RatedUnits([0-9.]),RateIncrement([0-9.])"}, + utils.DESTINATION_RATES_CSV: &FileLineRegexValidator{utils.DESTINATION_RATES_NRCOLS, + regexp.MustCompile(`(?:\w+\s*,?\s*){3}$`), + "Tag([0-9A-Za-z_]),DestinationsTag([0-9A-Za-z_]),RateTag([0-9A-Za-z_])"}, + utils.DESTRATE_TIMINGS_CSV: &FileLineRegexValidator{utils.DESTRATE_TIMINGS_NRCOLS, + regexp.MustCompile(`(?:\w+\s*,\s*){3}(?:\d+.?\d*){1}$`), + "Tag([0-9A-Za-z_]),DestinationRatesTag([0-9A-Za-z_]),TimingProfile([0-9A-Za-z_]),Weight([0-9.])"}, + utils.RATE_PROFILES_CSV: &FileLineRegexValidator{utils.RATE_PROFILES_NRCOLS, + regexp.MustCompile(`(?:\w+\s*,\s*){2}(?:\*out\s*,\s*){1}(?:\*any\s*,\s*|\w+\s*,\s*){1}(?:\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z){1}(?:\w*\s*,?\s*){2}$`), + "Tenant([0-9A-Za-z_]),TOR([0-9A-Za-z_]),Direction(*out),Subject([0-9A-Za-z_]|*all),RatesFallbackSubject([0-9A-Za-z_]|),RatesTimingTag([0-9A-Za-z_]),ActivationTime([0-9T:X])"}, + utils.ACTIONS_CSV: &FileLineRegexValidator{utils.ACTIONS_NRCOLS, + regexp.MustCompile(`(?:\w+\s*),(?:\*\w+\s*),(?:\*\w+\s*),(?:\*out\s*),(?:\d+\s*),(?:\*\w+\s*|\+\d+[smh]\s*|\d+\s*),(?:\*any|\w+\s*),(?:\*\w+\s*)?,(?:\d+\.?\d*\s*)?,(?:\d+\.?\d*\s*)?,(?:\d+\.?\d*\s*)$`), + "Tag([0-9A-Za-z_]),Action([0-9A-Za-z_]),BalanceType([*a-z_]),Direction(*out),Units([0-9]),ExpiryTime(*[a-z_]|+[0-9][smh]|[0-9])DestinationTag([0-9A-Za-z_]|*all),RateType(*[a-z_]),RateValue([0-9.]),MinutesWeight([0-9.]),Weight([0-9.])"}, + utils.ACTION_TIMINGS_CSV: &FileLineRegexValidator{utils.ACTION_TIMINGS_NRCOLS, + regexp.MustCompile(`(?:\w+\s*,\s*){3}(?:\d+\.?\d*){1}`), + "Tag([0-9A-Za-z_]),ActionsTag([0-9A-Za-z_]),TimingTag([0-9A-Za-z_]),Weight([0-9.])"}, + utils.ACTION_TRIGGERS_CSV: &FileLineRegexValidator{utils.ACTION_TRIGGERS_NRCOLS, + regexp.MustCompile(`(?:\w+),(?:\*\w+),(?:\*out),(?:\*\w+),(?:\d+\.?\d*),(?:\w+|\*any)?,(?:\w+),(?:\d+\.?\d*)$`), + "Tag([0-9A-Za-z_]),BalanceType(*[a-z_]),Direction(*out),ThresholdType(*[a-z_]),ThresholdValue([0-9]+),DestinationTag([0-9A-Za-z_]|*all),ActionsTag([0-9A-Za-z_]),Weight([0-9]+)"}, + utils.ACCOUNT_ACTIONS_CSV: &FileLineRegexValidator{utils.ACCOUNT_ACTIONS_NRCOLS, + regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\w+\s*,\s*){1}(?:\*out\s*,\s*){1}(?:\w+\s*,?\s*){2}$`), + "Tenant([0-9A-Za-z_]),Account([0-9A-Za-z_.]),Direction(*out),ActionTimingsTag([0-9A-Za-z_]),ActionTriggersTag([0-9A-Za-z_])"}, +} func NewTPCSVFileParser(dirPath, fileName string) (*TPCSVFileParser, error) { validator, hasValidator := FileValidators[fileName] @@ -248,7 +246,7 @@ func NewTPCSVFileParser(dirPath, fileName string) (*TPCSVFileParser, error) { return nil, fmt.Errorf("No validator found for file <%s>", fileName) } // Open the file here - fin, err := os.Open( path.Join(dirPath, fileName) ) + fin, err := os.Open(path.Join(dirPath, fileName)) if err != nil { return nil, err } @@ -259,11 +257,11 @@ func NewTPCSVFileParser(dirPath, fileName string) (*TPCSVFileParser, error) { // Opens the connection to a file and returns the parsed lines one by one when ParseNextLine() is called type TPCSVFileParser struct { - validator *FileLineRegexValidator // Row validator - reader *bufio.Reader // Reader to the file we are interested in + validator *FileLineRegexValidator // Row validator + reader *bufio.Reader // Reader to the file we are interested in } -func (self *TPCSVFileParser) ParseNextLine() ( []string, error ) { +func (self *TPCSVFileParser) ParseNextLine() ([]string, error) { line, truncated, err := self.reader.ReadLine() if err != nil { return nil, err @@ -279,7 +277,7 @@ func (self *TPCSVFileParser) ParseNextLine() ( []string, error ) { return nil, fmt.Errorf("Invalid line, <%s>", self.validator.Message) } // Open csv reader directly on string line - csvReader, _, err := openStringCSVReader( string(line), ',', self.validator.FieldsPerRecord ) + csvReader, _, err := openStringCSVReader(string(line), ',', self.validator.FieldsPerRecord) if err != nil { return nil, err } diff --git a/engine/ratingprofile.go b/engine/ratingprofile.go index 8105e72e4..431d4fd2a 100644 --- a/engine/ratingprofile.go +++ b/engine/ratingprofile.go @@ -29,11 +29,11 @@ const ( ) type RatingProfile struct { - Id string - FallbackKey string // FallbackKey is used as complete combination of Tenant:TOR:Direction:Subject - DestinationMap map[string][]*ActivationPeriod + Id string + FallbackKey string // FallbackKey is used as complete combination of Tenant:TOR:Direction:Subject + DestinationMap map[string][]*ActivationPeriod Tag, Tenant, TOR, Direction, Subject, DestRatesTimingTag, RatesFallbackSubject string // used only for loading - ActivationTime int64 + ActivationTime int64 } // Adds an activation period that applyes to current rating profile if not already present. diff --git a/engine/storage_interface.go b/engine/storage_interface.go index dd7570213..6f25d34bf 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -84,7 +84,7 @@ type DataStorage interface { GetTPRatingProfile(string, string) (*utils.TPRatingProfile, error) GetTPRatingProfileIds(*utils.AttrTPRatingProfileIds) ([]string, error) ExistsTPActions(string, string) (bool, error) - SetTPActions(*utils.TPActions) error + SetTPActions(string, map[string][]*Action) error GetTPActions(string, string) (*utils.TPActions, error) GetTPActionIds(string) ([]string, error) ExistsTPActionTimings(string, string) (bool, error) diff --git a/engine/storage_map.go b/engine/storage_map.go index 051b10745..d80958d1d 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -177,7 +177,7 @@ func (ms *MapStorage) ExistsTPActions(tpid, aId string) (bool, error) { return false, errors.New(utils.ERR_NOT_IMPLEMENTED) } -func (ms *MapStorage) SetTPActions(ap *utils.TPActions) error { +func (ms *MapStorage) SetTPActions(tpid string, acts map[string][]*Action) error { return errors.New(utils.ERR_NOT_IMPLEMENTED) } diff --git a/engine/storage_mongo.go b/engine/storage_mongo.go index a73bb6c0a..04c748ee3 100644 --- a/engine/storage_mongo.go +++ b/engine/storage_mongo.go @@ -252,7 +252,7 @@ func (ms *MongoStorage) ExistsTPActions(tpid, aId string) (bool, error) { return false, errors.New(utils.ERR_NOT_IMPLEMENTED) } -func (ms *MongoStorage) SetTPActions(ap *utils.TPActions) error { +func (ms *MongoStorage) SetTPActions(tpid string, acts map[string][]*Action) error { return errors.New(utils.ERR_NOT_IMPLEMENTED) } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index ada155c96..39aacbdce 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -207,7 +207,7 @@ func (rs *RedisStorage) ExistsTPActions(tpid, aId string) (bool, error) { return false, errors.New(utils.ERR_NOT_IMPLEMENTED) } -func (rs *RedisStorage) SetTPActions(ap *utils.TPActions) error { +func (rs *RedisStorage) SetTPActions(tpid string, acts map[string][]*Action) error { return errors.New(utils.ERR_NOT_IMPLEMENTED) } diff --git a/engine/storage_sql.go b/engine/storage_sql.go index e90b5dfbf..55e6986e7 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -222,7 +222,7 @@ func (self *SQLStorage) SetTPRates(tpid string, rts map[string][]*Rate) error { qry += "," } qry += fmt.Sprintf("('%s', '%s', %f, %f, %d, %d,%d,'%s', %d, %f)", - tpid, rtId, rt.ConnectFee, rt.Price, int(rt.PricedUnits), int(rt.RateIncrements), int(rt.GroupInterval), + tpid, rtId, rt.ConnectFee, rt.Price, int(rt.PricedUnits), int(rt.RateIncrements), int(rt.GroupInterval), rt.RoundingMethod, rt.RoundingDecimals, rt.Weight) i++ } @@ -250,7 +250,7 @@ func (self *SQLStorage) GetTPRate(tpid, rtId string) (*utils.TPRate, error) { if err != nil { return nil, err } - rt.RateSlots = append(rt.RateSlots, utils.RateSlot{connectFee, rate, ratedUnits, rateIncrements, groupInterval, + rt.RateSlots = append(rt.RateSlots, utils.RateSlot{connectFee, rate, ratedUnits, rateIncrements, groupInterval, roundingMethod, roundingDecimals, weight}) } if i == 0 { @@ -376,7 +376,7 @@ func (self *SQLStorage) SetTPDestRateTimings(tpid string, drts map[string][]*Des i := 0 for drtId, drtRows := range drts { for _, drt := range drtRows { - if i!=0 { //Consecutive values after the first will be prefixed with "," as separator + if i != 0 { //Consecutive values after the first will be prefixed with "," as separator qry += "," } qry += fmt.Sprintf("('%s','%s','%s','%s',%f)", @@ -450,7 +450,7 @@ func (self *SQLStorage) SetTPRatingProfiles(tpid string, rps map[string][]*Ratin if len(rps) == 0 { return nil //Nothing to set } - qry := fmt.Sprintf("INSERT INTO %s (tpid,tag,tenant,tor,direction,subject,activation_time,destrates_timing_tag,rates_fallback_subject) VALUES ", + qry := fmt.Sprintf("INSERT INTO %s (tpid,tag,tenant,tor,direction,subject,activation_time,destrates_timing_tag,rates_fallback_subject) VALUES ", utils.TBL_TP_RATE_PROFILES) i := 0 for rpId, rp := range rps { @@ -458,11 +458,11 @@ func (self *SQLStorage) SetTPRatingProfiles(tpid string, rps map[string][]*Ratin if i != 0 { //Consecutive values after the first will be prefixed with "," as separator qry += "," } - qry += fmt.Sprintf("('%s', '%s', '%s', '%s', '%s', '%s', %d,'%s','%s')", tpid, rpId, rpa.Tenant, rpa.TOR, rpa.Direction, + qry += fmt.Sprintf("('%s', '%s', '%s', '%s', '%s', '%s', %d,'%s','%s')", tpid, rpId, rpa.Tenant, rpa.TOR, rpa.Direction, rpa.Subject, rpa.ActivationTime, rpa.DestRatesTimingTag, rpa.RatesFallbackSubject) i++ } - + } if _, err := self.Db.Exec(qry); err != nil { return err @@ -546,19 +546,26 @@ func (self *SQLStorage) ExistsTPActions(tpid, actsId string) (bool, error) { return exists, nil } -func (self *SQLStorage) SetTPActions(acts *utils.TPActions) error { - if len(acts.Actions) == 0 { +func (self *SQLStorage) SetTPActions(tpid string, acts map[string][]*Action) error { + if len(acts) == 0 { return nil //Nothing to set } - // Using multiple values in query to spare some network processing time - qry := fmt.Sprintf("INSERT INTO %s (tpid,tag,action,balance_tag,direction,units,expiration_time,destination_tag,rate_type,rate, minutes_weight,weight) VALUES ", utils.TBL_TP_ACTIONS) - for idx, act := range acts.Actions { - if idx != 0 { //Consecutive values after the first will be prefixed with "," as separator - qry += "," + qry := fmt.Sprintf("INSERT INTO %s (tpid,tag,action,balance_type,direction,units,expiry_time,destination_tag,rate_type,rate, minutes_weight,weight) VALUES ", utils.TBL_TP_ACTIONS) + i := 0 + for actId, actRows := range acts { + for _, act := range actRows { + if i != 0 { //Consecutive values after the first will be prefixed with "," as separator + qry += "," + } + var expTime int64 + if !act.ExpirationDate.IsZero() { + expTime = act.ExpirationDate.Unix() + } + qry += fmt.Sprintf("('%s','%s','%s','%s','%s',%f,%d,'%s','%s',%f,%f,%f)", + tpid, actId, act.ActionType, act.BalanceId, act.Direction, act.Units, expTime, + act.DestinationTag, act.RateType, act.RateValue, act.MinutesWeight, act.Weight) + i++ } - qry += fmt.Sprintf("('%s','%s','%s','%s','%s',%f,%d,'%s','%s',%f,%f,%f)", - acts.TPid, acts.ActionsId, act.Identifier, act.BalanceId, act.Direction, act.Units, act.ExpirationTime, - act.DestinationId, act.RateType, act.Rate, act.MinutesWeight, act.Weight) } if _, err := self.Db.Exec(qry); err != nil { return err @@ -567,7 +574,7 @@ func (self *SQLStorage) SetTPActions(acts *utils.TPActions) error { } func (self *SQLStorage) GetTPActions(tpid, actsId string) (*utils.TPActions, error) { - rows, err := self.Db.Query(fmt.Sprintf("SELECT action,balance_tag,direction,units,expiration_time,destination_tag,rate_type,rate, minutes_weight,weight FROM %s WHERE tpid='%s' AND tag='%s'", utils.TBL_TP_ACTIONS, tpid, actsId)) + rows, err := self.Db.Query(fmt.Sprintf("SELECT action,balance_type,direction,units,expiry_time,destination_tag,rate_type,rate, minutes_weight,weight FROM %s WHERE tpid='%s' AND tag='%s'", utils.TBL_TP_ACTIONS, tpid, actsId)) if err != nil { return nil, err } @@ -703,7 +710,7 @@ func (self *SQLStorage) SetTPActionTriggers(tpid string, ats map[string][]*Actio if len(ats) == 0 { return nil //Nothing to set } - qry := fmt.Sprintf("INSERT INTO %s (tpid,tag,balance_tag,direction,threshold_type,threshold_value,destination_tag,actions_tag,weight) VALUES ", + qry := fmt.Sprintf("INSERT INTO %s (tpid,tag,balance_type,direction,threshold_type,threshold_value,destination_tag,actions_tag,weight) VALUES ", utils.TBL_TP_ACTION_TRIGGERS) i := 0 for atId, atRows := range ats { @@ -1105,8 +1112,8 @@ func (self *SQLStorage) GetTpActions(tpid, tag string) (map[string][]*Action, er for rows.Next() { var id int var units, rate, minutes_weight, weight float64 - var tpid, tag, action, balance_tag, direction, destinations_tag, rate_type, expirationDate string - if err := rows.Scan(&id, &tpid, &tag, &action, &balance_tag, &direction, &units, &expirationDate, &destinations_tag, &rate_type, &rate, &minutes_weight, &weight); err != nil { + var tpid, tag, action, balance_type, direction, destinations_tag, rate_type, expirationDate string + if err := rows.Scan(&id, &tpid, &tag, &action, &balance_type, &direction, &units, &expirationDate, &destinations_tag, &rate_type, &rate, &minutes_weight, &weight); err != nil { return nil, err } unix, err := strconv.ParseInt(expirationDate, 10, 64) @@ -1115,10 +1122,10 @@ func (self *SQLStorage) GetTpActions(tpid, tag string) (map[string][]*Action, er } expDate := time.Unix(unix, 0) var a *Action - if balance_tag != MINUTES { + if balance_type != MINUTES { a = &Action{ ActionType: action, - BalanceId: balance_tag, + BalanceId: balance_type, Direction: direction, Units: units, ExpirationDate: expDate, @@ -1128,7 +1135,7 @@ func (self *SQLStorage) GetTpActions(tpid, tag string) (map[string][]*Action, er a = &Action{ Id: utils.GenUUID(), ActionType: action, - BalanceId: balance_tag, + BalanceId: balance_type, Direction: direction, Weight: weight, ExpirationDate: expDate, @@ -1179,7 +1186,7 @@ func (self *SQLStorage) GetTpActionTimings(tpid, tag string) (ats map[string][]* func (self *SQLStorage) GetTpActionTriggers(tpid, tag string) (map[string][]*ActionTrigger, error) { ats := make(map[string][]*ActionTrigger) - q := fmt.Sprintf("SELECT tpid,tag,balance_tag,direction,threshold_type,threshold_value,destination_tag,actions_tag,weight FROM %s WHERE tpid='%s'", + q := fmt.Sprintf("SELECT tpid,tag,balance_type,direction,threshold_type,threshold_value,destination_tag,actions_tag,weight FROM %s WHERE tpid='%s'", utils.TBL_TP_ACTION_TRIGGERS, tpid) if tag != "" { q += fmt.Sprintf(" AND tag='%s'", tag) diff --git a/engine/storage_utils.go b/engine/storage_utils.go index 7dc1ad7b3..b791f056a 100644 --- a/engine/storage_utils.go +++ b/engine/storage_utils.go @@ -19,9 +19,9 @@ along with this program. If not, see package engine import ( - "strconv" "errors" "github.com/cgrates/cgrates/utils" + "strconv" ) // Various helpers to deal with database @@ -53,4 +53,3 @@ func ConfigureDatabase(db_type, host, port, name, user, pass string) (db DataSto } return db, nil } - diff --git a/engine/tpimporter_csv.go b/engine/tpimporter_csv.go index 23d35f6bb..2c9fb2d0a 100644 --- a/engine/tpimporter_csv.go +++ b/engine/tpimporter_csv.go @@ -19,48 +19,47 @@ along with this program. If not, see package engine import ( + "github.com/cgrates/cgrates/utils" "io" "io/ioutil" "log" "strconv" "time" - "github.com/cgrates/cgrates/utils" ) - // Import tariff plan from csv into storDb type TPCSVImporter struct { - TPid string // Load data on this tpid - StorDb DataStorage // StorDb connection handle - DirPath string // Directory path to import from - Sep rune // Separator in the csv file - Verbose bool // If true will print a detailed information instead of silently discarding it - ImportId string // Use this to differentiate between imports (eg: when autogenerating fields like RatingProfileId + TPid string // Load data on this tpid + StorDb DataStorage // StorDb connection handle + DirPath string // Directory path to import from + Sep rune // Separator in the csv file + Verbose bool // If true will print a detailed information instead of silently discarding it + ImportId string // Use this to differentiate between imports (eg: when autogenerating fields like RatingProfileId } -// Maps csv file to handler which should process it. Defined like this since tests on 1.0.3 were failing on Travis. +// Maps csv file to handler which should process it. Defined like this since tests on 1.0.3 were failing on Travis. // Change it to func(string) error as soon as Travis updates. -var fileHandlers = map[string]func(*TPCSVImporter,string) error{ - utils.TIMINGS_CSV: (*TPCSVImporter).importTimings, - utils.DESTINATIONS_CSV: (*TPCSVImporter).importDestinations, - utils.RATES_CSV: (*TPCSVImporter).importRates, - utils.DESTINATION_RATES_CSV: (*TPCSVImporter).importDestinationRates, - utils.DESTRATE_TIMINGS_CSV: (*TPCSVImporter).importDestRateTimings, - utils.RATE_PROFILES_CSV: (*TPCSVImporter).importRatingProfiles, - utils.ACTIONS_CSV: (*TPCSVImporter).importActions, - utils.ACTION_TIMINGS_CSV: (*TPCSVImporter).importActionTimings, - utils.ACTION_TRIGGERS_CSV: (*TPCSVImporter).importActionTriggers, - utils.ACCOUNT_ACTIONS_CSV: (*TPCSVImporter).importAccountActions, - } +var fileHandlers = map[string]func(*TPCSVImporter, string) error{ + utils.TIMINGS_CSV: (*TPCSVImporter).importTimings, + utils.DESTINATIONS_CSV: (*TPCSVImporter).importDestinations, + utils.RATES_CSV: (*TPCSVImporter).importRates, + utils.DESTINATION_RATES_CSV: (*TPCSVImporter).importDestinationRates, + utils.DESTRATE_TIMINGS_CSV: (*TPCSVImporter).importDestRateTimings, + utils.RATE_PROFILES_CSV: (*TPCSVImporter).importRatingProfiles, + utils.ACTIONS_CSV: (*TPCSVImporter).importActions, + utils.ACTION_TIMINGS_CSV: (*TPCSVImporter).importActionTimings, + utils.ACTION_TRIGGERS_CSV: (*TPCSVImporter).importActionTriggers, + utils.ACCOUNT_ACTIONS_CSV: (*TPCSVImporter).importAccountActions, +} func (self *TPCSVImporter) Run() error { files, _ := ioutil.ReadDir(self.DirPath) for _, f := range files { - fHandler,hasName := fileHandlers[f.Name()] + fHandler, hasName := fileHandlers[f.Name()] if !hasName { continue } - fHandler( self, f.Name() ) + fHandler(self, f.Name()) } return nil } @@ -70,8 +69,8 @@ func (self *TPCSVImporter) importTimings(fn string) error { if self.Verbose { log.Printf("Processing file: <%s> ", fn) } - fParser, err := NewTPCSVFileParser( self.DirPath, fn ) - if err!=nil { + fParser, err := NewTPCSVFileParser(self.DirPath, fn) + if err != nil { return err } lineNr := 0 @@ -86,7 +85,7 @@ func (self *TPCSVImporter) importTimings(fn string) error { } continue } - tm := NewTiming( record... ) + tm := NewTiming(record...) if err := self.StorDb.SetTPTiming(self.TPid, tm); err != nil { log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) } @@ -98,8 +97,8 @@ func (self *TPCSVImporter) importDestinations(fn string) error { if self.Verbose { log.Printf("Processing file: <%s> ", fn) } - fParser, err := NewTPCSVFileParser( self.DirPath, fn ) - if err!=nil { + fParser, err := NewTPCSVFileParser(self.DirPath, fn) + if err != nil { return err } lineNr := 0 @@ -126,8 +125,8 @@ func (self *TPCSVImporter) importRates(fn string) error { if self.Verbose { log.Printf("Processing file: <%s> ", fn) } - fParser, err := NewTPCSVFileParser( self.DirPath, fn ) - if err!=nil { + fParser, err := NewTPCSVFileParser(self.DirPath, fn) + if err != nil { return err } lineNr := 0 @@ -146,7 +145,7 @@ func (self *TPCSVImporter) importRates(fn string) error { if err != nil { return err } - if err := self.StorDb.SetTPRates( self.TPid, map[string][]*Rate{ record[0]: []*Rate{rt} } ); err != nil { + if err := self.StorDb.SetTPRates(self.TPid, map[string][]*Rate{record[0]: []*Rate{rt}}); err != nil { log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) } } @@ -157,8 +156,8 @@ func (self *TPCSVImporter) importDestinationRates(fn string) error { if self.Verbose { log.Printf("Processing file: <%s> ", fn) } - fParser, err := NewTPCSVFileParser( self.DirPath, fn ) - if err!=nil { + fParser, err := NewTPCSVFileParser(self.DirPath, fn) + if err != nil { return err } lineNr := 0 @@ -173,9 +172,9 @@ func (self *TPCSVImporter) importDestinationRates(fn string) error { } continue } - dr := &DestinationRate{record[0], record[1], record[2], nil} - if err := self.StorDb.SetTPDestinationRates( self.TPid, - map[string][]*DestinationRate{ dr.Tag: []*DestinationRate{dr} } ); err != nil { + dr := &DestinationRate{record[0], record[1], record[2], nil} + if err := self.StorDb.SetTPDestinationRates(self.TPid, + map[string][]*DestinationRate{dr.Tag: []*DestinationRate{dr}}); err != nil { log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) } } @@ -186,8 +185,8 @@ func (self *TPCSVImporter) importDestRateTimings(fn string) error { if self.Verbose { log.Printf("Processing file: <%s> ", fn) } - fParser, err := NewTPCSVFileParser( self.DirPath, fn ) - if err!=nil { + fParser, err := NewTPCSVFileParser(self.DirPath, fn) + if err != nil { return err } lineNr := 0 @@ -207,12 +206,12 @@ func (self *TPCSVImporter) importDestRateTimings(fn string) error { log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) continue } - drt := &DestinationRateTiming{Tag: record[0], - DestinationRatesTag: record[1], - Weight: weight, - TimingsTag: record[2], - } - if err := self.StorDb.SetTPDestRateTimings( self.TPid, map[string][]*DestinationRateTiming{drt.Tag:[]*DestinationRateTiming{drt}}); err != nil { + drt := &DestinationRateTiming{Tag: record[0], + DestinationRatesTag: record[1], + Weight: weight, + TimingsTag: record[2], + } + if err := self.StorDb.SetTPDestRateTimings(self.TPid, map[string][]*DestinationRateTiming{drt.Tag: []*DestinationRateTiming{drt}}); err != nil { log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) } } @@ -223,8 +222,8 @@ func (self *TPCSVImporter) importRatingProfiles(fn string) error { if self.Verbose { log.Printf("Processing file: <%s> ", fn) } - fParser, err := NewTPCSVFileParser( self.DirPath, fn ) - if err!=nil { + fParser, err := NewTPCSVFileParser(self.DirPath, fn) + if err != nil { return err } lineNr := 0 @@ -246,18 +245,18 @@ func (self *TPCSVImporter) importRatingProfiles(fn string) error { } rpTag := "TPCSV" //Autogenerate rating profile id if self.ImportId != "" { - rpTag += "_"+self.ImportId + rpTag += "_" + self.ImportId } - rp := &RatingProfile{Tag: rpTag, - Tenant: tenant, - TOR: tor, - Direction: direction, - Subject: subject, - ActivationTime: at.Unix(), - DestRatesTimingTag: destRatesTimingTag, - RatesFallbackSubject: fallbacksubject, - } - if err := self.StorDb.SetTPRatingProfiles( self.TPid, map[string][]*RatingProfile{rpTag:[]*RatingProfile{rp}}); err != nil { + rp := &RatingProfile{Tag: rpTag, + Tenant: tenant, + TOR: tor, + Direction: direction, + Subject: subject, + ActivationTime: at.Unix(), + DestRatesTimingTag: destRatesTimingTag, + RatesFallbackSubject: fallbacksubject, + } + if err := self.StorDb.SetTPRatingProfiles(self.TPid, map[string][]*RatingProfile{rpTag: []*RatingProfile{rp}}); err != nil { log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) } } @@ -265,6 +264,62 @@ func (self *TPCSVImporter) importRatingProfiles(fn string) error { } func (self *TPCSVImporter) importActions(fn string) error { + if self.Verbose { + log.Printf("Processing file: <%s> ", fn) + } + fParser, err := NewTPCSVFileParser(self.DirPath, fn) + if err != nil { + return err + } + lineNr := 0 + for { + lineNr++ + record, err := fParser.ParseNextLine() + if err == io.EOF { // Reached end of file + break + } else if err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + } + continue + } + actId, actionType, balanceType, direction, destTag, rateType := record[0], record[1], record[2], record[3], record[6], record[7] + units, err := strconv.ParseFloat(record[4], 64) + if err != nil { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + continue + } + var expiryTime time.Time // Empty initialized time represents never expire + if record[5] != "*unlimited" { // ToDo: Expand here for other meta tags or go way of adding time for expiry + expiryTime, err = time.Parse(time.RFC3339, record[5]) + if err != nil { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + continue + } + } + rateValue, _ := strconv.ParseFloat(record[8], 64) // Ignore errors since empty string is error, we can find out based on rateType if defined + minutesWeight, _ := strconv.ParseFloat(record[9], 64) + weight, err := strconv.ParseFloat(record[10], 64) + if err != nil { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + continue + } + act := &Action{ + ActionType: actionType, + BalanceId: balanceType, + Direction: direction, + Units: units, + ExpirationDate: expiryTime, + DestinationTag: destTag, + RateType: rateType, + RateValue: rateValue, + MinutesWeight: minutesWeight, + Weight: weight, + } + if err := self.StorDb.SetTPActions(self.TPid, map[string][]*Action{actId: []*Action{act}}); err != nil { + log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) + } + } return nil } @@ -279,5 +334,3 @@ func (self *TPCSVImporter) importActionTriggers(fn string) error { func (self *TPCSVImporter) importAccountActions(fn string) error { return nil } - - diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 03d92d7d1..38a7aab55 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -62,7 +62,7 @@ type DestRateTiming struct { type TPRatingProfile struct { TPid string // Tariff plan id - RatingProfileId string // RatingProfile id + RatingProfileId string // RatingProfile id Tenant string // Tenant's Id TOR string // TypeOfRecord Direction string // Traffic direction, OUT is the only one supported for now @@ -92,12 +92,12 @@ type TPActions struct { type Action struct { Identifier string // Identifier mapped in the code - BalanceId string // Type of balance the action will operate on + BalanceType string // Type of balance the action will operate on Direction string // Balance direction Units float64 // Number of units to add/deduct - ExpirationTime int64 // Time when the units will expire + ExpiryTime int64 // Time when the units will expire DestinationId string // Destination profile id - RateType string // Type of price + RateType string // Type of rate <*absolute|*percent> Rate float64 // Price value MinutesWeight float64 // Minutes weight Weight float64 // Action's weight diff --git a/utils/consts.go b/utils/consts.go index bbd905f60..944def977 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1,11 +1,11 @@ package utils const ( - VERSION = "0.9.1rc3" - POSTGRES = "postgres" - MYSQL = "mysql" - MONGO = "mongo" - REDIS = "redis" + VERSION = "0.9.1rc3" + POSTGRES = "postgres" + MYSQL = "mysql" + MONGO = "mongo" + REDIS = "redis" LOCALHOST = "127.0.0.1" FSCDR_FILE_CSV = "freeswitch_file_csv" FSCDR_HTTP_JSON = "freeswitch_http_json" @@ -29,28 +29,28 @@ const ( TBL_TP_ACTION_TIMINGS = "tp_action_timings" TBL_TP_ACTION_TRIGGERS = "tp_action_triggers" TBL_TP_ACCOUNT_ACTIONS = "tp_account_actions" - TIMINGS_CSV = "Timings.csv" - DESTINATIONS_CSV = "Destinations.csv" - RATES_CSV = "Rates.csv" - DESTINATION_RATES_CSV = "DestinationRates.csv" - DESTRATE_TIMINGS_CSV = "DestinationRateTimings.csv" - RATE_PROFILES_CSV = "RatingProfiles.csv" - ACTIONS_CSV = "Actions.csv" - ACTION_TIMINGS_CSV = "ActionTimings.csv" - ACTION_TRIGGERS_CSV = "ActionTriggers.csv" - ACCOUNT_ACTIONS_CSV = "AccountActions.csv" - TIMINGS_NRCOLS = 6 - DESTINATIONS_NRCOLS = 2 - RATES_NRCOLS = 9 + TIMINGS_CSV = "Timings.csv" + DESTINATIONS_CSV = "Destinations.csv" + RATES_CSV = "Rates.csv" + DESTINATION_RATES_CSV = "DestinationRates.csv" + DESTRATE_TIMINGS_CSV = "DestinationRateTimings.csv" + RATE_PROFILES_CSV = "RatingProfiles.csv" + ACTIONS_CSV = "Actions.csv" + ACTION_TIMINGS_CSV = "ActionTimings.csv" + ACTION_TRIGGERS_CSV = "ActionTriggers.csv" + ACCOUNT_ACTIONS_CSV = "AccountActions.csv" + TIMINGS_NRCOLS = 6 + DESTINATIONS_NRCOLS = 2 + RATES_NRCOLS = 9 DESTINATION_RATES_NRCOLS = 3 DESTRATE_TIMINGS_NRCOLS = 4 RATE_PROFILES_NRCOLS = 7 ACTIONS_NRCOLS = 11 - ACTION_TIMINGS_NRCOLS = 4 + ACTION_TIMINGS_NRCOLS = 4 ACTION_TRIGGERS_NRCOLS = 8 ACCOUNT_ACTIONS_NRCOLS = 5 - ROUNDING_UP = "up" - ROUNDING_MIDDLE = "middle" - ROUNDING_DOWN = "down" - COMMENT_CHAR = '#' + ROUNDING_UP = "up" + ROUNDING_MIDDLE = "middle" + ROUNDING_DOWN = "down" + COMMENT_CHAR = '#' )