diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 1ebe22f02..32cd446b5 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -634,7 +634,7 @@ func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnec internalThresholdSChan <- tSv1 } -// startSupplierService fires up the ThresholdS +// startSupplierService fires up the SupplierS func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS, internalRsChan, internalStatSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server, diff --git a/cmd/cgr-migrator/cgr-migrator.go b/cmd/cgr-migrator/cgr-migrator.go index 6596e2a2d..804a7f61d 100755 --- a/cmd/cgr-migrator/cgr-migrator.go +++ b/cmd/cgr-migrator/cgr-migrator.go @@ -36,7 +36,7 @@ var ( dmIN, dmOUT migrator.MigratorDataDB storDBIn, storDBOut migrator.MigratorStorDB err error - dfltCfg = config.CgrConfig() + dfltCfg, _ = config.NewDefaultCGRConfig() cfgDir = flag.String("config_dir", "", "Configuration directory path.") @@ -56,6 +56,8 @@ var ( "the DataDB user") inDataDBPass = flag.String("datadb_passwd", dfltCfg.DataDbPass, "the DataDB password") + inDBDataEncoding = flag.String("dbdata_encoding", dfltCfg.DBDataEncoding, + "the encoding used to store object Data in strings") outDataDBType = flag.String("out_datadb_type", utils.MetaDataDB, "output DataDB type <*redis|*mongo>") @@ -69,6 +71,8 @@ var ( "output DataDB user") outDataDBPass = flag.String("out_datadb_passwd", utils.MetaDataDB, "output DataDB password") + outDBDataEncoding = flag.String("out_dbdata_encoding", utils.MetaDataDB, + "the encoding used to store object Data in strings in move mode") inStorDBType = flag.String("stordb_type", dfltCfg.StorDBType, "the type of the StorDB Database <*mysql|*postgres|*mongo>") @@ -83,11 +87,6 @@ var ( inStorDBPass = flag.String("stordb_passwd", dfltCfg.StorDBPass, "the StorDB password") - inDBDataEncoding = flag.String("dbdata_encoding", dfltCfg.DBDataEncoding, - "the encoding used to store object Data in strings") - outDBDataEncoding = flag.String("out_dbdata_encoding", "", - "the encoding used to store object Data in strings in move mode") - outStorDBType = flag.String("out_stordb_type", utils.MetaStorDB, "output StorDB type for move mode <*mysql|*postgres|*mongo>") outStorDBHost = flag.String("out_stordb_host", utils.MetaStorDB, @@ -120,7 +119,7 @@ func main() { } } - // in settings + // inDataDB if *inDataDBType != dfltCfg.DataDbType { mgrCfg.DataDbType = strings.TrimPrefix(*inDataDBType, "*") } @@ -139,29 +138,11 @@ func main() { if *inDataDBPass != dfltCfg.DataDbPass { mgrCfg.DataDbPass = *inDataDBPass } - if *inStorDBType != dfltCfg.StorDBType { - mgrCfg.StorDBType = strings.TrimPrefix(*inStorDBType, "*") - } - if *inStorDBHost != dfltCfg.StorDBHost { - mgrCfg.StorDBHost = *inStorDBHost - } - if *inStorDBPort != dfltCfg.StorDBPort { - mgrCfg.StorDBPort = *inStorDBPort - } - if *inStorDBName != dfltCfg.StorDBName { - mgrCfg.StorDBName = *inStorDBName - } - if *inStorDBUser != dfltCfg.StorDBUser { - mgrCfg.StorDBUser = *inStorDBUser - } - if *inStorDBPass != "" { - mgrCfg.StorDBPass = *inStorDBPass - } - if *inDBDataEncoding != "" { + if *inDBDataEncoding != dfltCfg.DBDataEncoding { mgrCfg.DBDataEncoding = *inDBDataEncoding } - // out settings + // outDataDB if *outDataDBType != utils.MetaDataDB { mgrCfg.MigratorCgrConfig.OutDataDBType = strings.TrimPrefix(*outDataDBType, "*") } @@ -180,6 +161,31 @@ func main() { if *outDataDBPass != utils.MetaDataDB { mgrCfg.MigratorCgrConfig.OutDataDBPassword = *outDataDBPass } + if *outDBDataEncoding != utils.MetaDataDB { + mgrCfg.MigratorCgrConfig.OutDataDBEncoding = *outDBDataEncoding + } + + // inStorDB + if *inStorDBType != dfltCfg.StorDBType { + mgrCfg.StorDBType = strings.TrimPrefix(*inStorDBType, "*") + } + if *inStorDBHost != dfltCfg.StorDBHost { + mgrCfg.StorDBHost = *inStorDBHost + } + if *inStorDBPort != dfltCfg.StorDBPort { + mgrCfg.StorDBPort = *inStorDBPort + } + if *inStorDBName != dfltCfg.StorDBName { + mgrCfg.StorDBName = *inStorDBName + } + if *inStorDBUser != dfltCfg.StorDBUser { + mgrCfg.StorDBUser = *inStorDBUser + } + if *inStorDBPass != dfltCfg.StorDBPass { + mgrCfg.StorDBPass = *inStorDBPass + } + + // outStorDB if *outStorDBType != utils.MetaStorDB { mgrCfg.MigratorCgrConfig.OutStorDBType = strings.TrimPrefix(*outStorDBType, "*") } @@ -198,23 +204,12 @@ func main() { if *outStorDBPass != utils.MetaStorDB { mgrCfg.MigratorCgrConfig.OutStorDBPassword = *outStorDBPass } - if *outDBDataEncoding != "" { - *outDBDataEncoding = mgrCfg.DBDataEncoding - } - - fmt.Printf("After change: %+v\n", utils.ToJSON(mgrCfg.MigratorCgrConfig)) sameDataDB = mgrCfg.MigratorCgrConfig.OutDataDBType == mgrCfg.DataDbType && mgrCfg.MigratorCgrConfig.OutDataDBHost == mgrCfg.DataDbHost && mgrCfg.MigratorCgrConfig.OutDataDBPort == mgrCfg.DataDbPort && mgrCfg.MigratorCgrConfig.OutDataDBName == mgrCfg.DataDbName && - *outDBDataEncoding == mgrCfg.DBDataEncoding - - sameStorDB = mgrCfg.MigratorCgrConfig.OutStorDBType == mgrCfg.StorDBType && - mgrCfg.MigratorCgrConfig.OutStorDBHost == mgrCfg.StorDBHost && - mgrCfg.MigratorCgrConfig.OutStorDBPort == mgrCfg.StorDBPort && - mgrCfg.MigratorCgrConfig.OutStorDBName == mgrCfg.StorDBName && - *outDBDataEncoding == mgrCfg.DBDataEncoding + mgrCfg.MigratorCgrConfig.OutDataDBEncoding == mgrCfg.DBDataEncoding if dmIN, err = migrator.NewMigratorDataDB(mgrCfg.DataDbType, mgrCfg.DataDbHost, mgrCfg.DataDbPort, @@ -229,14 +224,19 @@ func main() { } else if dmOUT, err = migrator.NewMigratorDataDB(mgrCfg.MigratorCgrConfig.OutDataDBType, mgrCfg.MigratorCgrConfig.OutDataDBHost, mgrCfg.MigratorCgrConfig.OutDataDBPort, mgrCfg.MigratorCgrConfig.OutDataDBName, mgrCfg.MigratorCgrConfig.OutDataDBUser, - mgrCfg.MigratorCgrConfig.OutDataDBPassword, *outDBDataEncoding, + mgrCfg.MigratorCgrConfig.OutDataDBPassword, mgrCfg.MigratorCgrConfig.OutDataDBEncoding, mgrCfg.CacheCfg(), 0); err != nil { log.Fatal(err) } - if storDBIn, err = migrator.NewMigratorStorDB(*inStorDBType, - *inStorDBHost, *inStorDBPort, - *inStorDBName, *inStorDBUser, *inStorDBPass, + sameStorDB = mgrCfg.MigratorCgrConfig.OutStorDBType == mgrCfg.StorDBType && + mgrCfg.MigratorCgrConfig.OutStorDBHost == mgrCfg.StorDBHost && + mgrCfg.MigratorCgrConfig.OutStorDBPort == mgrCfg.StorDBPort && + mgrCfg.MigratorCgrConfig.OutStorDBName == mgrCfg.StorDBName + + if storDBIn, err = migrator.NewMigratorStorDB(mgrCfg.StorDBType, + mgrCfg.StorDBHost, mgrCfg.StorDBPort, + mgrCfg.StorDBName, mgrCfg.StorDBUser, mgrCfg.StorDBPass, config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, diff --git a/config/config.go b/config/config.go index 924d6efa6..14109bf50 100755 --- a/config/config.go +++ b/config/config.go @@ -144,9 +144,7 @@ func NewDefaultCGRConfig() (*CGRConfig, error) { cfg.diameterAgentCfg = new(DiameterAgentCfg) cfg.radiusAgentCfg = new(RadiusAgentCfg) cfg.filterSCfg = new(FilterSCfg) - cfg.dispatcherSCfg = &DispatcherSCfg{ - Enabled: true, - } + cfg.dispatcherSCfg = new(DispatcherSCfg) cfg.ConfigReloads = make(map[string]chan struct{}) cfg.ConfigReloads[utils.CDRC] = make(chan struct{}, 1) cfg.ConfigReloads[utils.CDRC] <- struct{}{} // Unlock the channel @@ -668,6 +666,45 @@ func (self *CGRConfig) checkConfigSanity() error { } // DispaterS checks if self.dispatcherSCfg != nil && self.dispatcherSCfg.Enabled { + for _, connCfg := range self.dispatcherSCfg.RALsConns { + if connCfg.Address != utils.MetaInternal { + return errors.New("Only <*internal> connectivity allowed in DispatcherS for now") + } + if connCfg.Address == utils.MetaInternal && !self.RALsEnabled { + return errors.New("RALs not enabled but requested by DispatcherS component.") + } + } + + for _, connCfg := range self.dispatcherSCfg.ResSConns { + if connCfg.Address == utils.MetaInternal && !self.resourceSCfg.Enabled { + return errors.New("ResourceS not enabled but requested by DispatcherS component.") + } + } + for _, connCfg := range self.dispatcherSCfg.StatSConns { + if connCfg.Address == utils.MetaInternal && !self.resourceSCfg.Enabled { + return errors.New("StatS not enabled but requested by DispatherS component.") + } + } + for _, connCfg := range self.dispatcherSCfg.ThreshSConns { + if connCfg.Address == utils.MetaInternal && !self.thresholdSCfg.Enabled { + return errors.New("ThresholdS not enabled but requested by DispatherS component.") + } + } + for _, connCfg := range self.dispatcherSCfg.SupplSConns { + if connCfg.Address == utils.MetaInternal && !self.thresholdSCfg.Enabled { + return errors.New("SupplierS not enabled but requested by DispatherS component.") + } + } + for _, connCfg := range self.dispatcherSCfg.AttrSConns { + if connCfg.Address == utils.MetaInternal && !self.thresholdSCfg.Enabled { + return errors.New("AttributeS not enabled but requested by DispatherS component.") + } + } + if !utils.IsSliceMember([]string{utils.MetaRandom, utils.MetaBalancer, utils.MetaOrdered, + utils.MetaCircular}, self.dispatcherSCfg.DispatchingStrategy) { + return fmt.Errorf("<%s> unsupported dispatching strategy %s", + utils.DispatcherS, self.dispatcherSCfg.DispatchingStrategy) + } } return nil } diff --git a/config/config_defaults.go b/config/config_defaults.go index 09cb51ca9..77c443243 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -624,6 +624,7 @@ const CGRATES_CFG_JSON = ` ], }, + "migrator": { "out_datadb_type": "redis", "out_datadb_host": "127.0.0.1", @@ -631,6 +632,7 @@ const CGRATES_CFG_JSON = ` "out_datadb_name": "10", "out_datadb_user": "cgrates", "out_datadb_password": "", + "out_datadb_encoding" : "msgpack", "out_stordb_type": "mysql", "out_stordb_host": "127.0.0.1", "out_stordb_port": "3306", @@ -639,4 +641,22 @@ const CGRATES_CFG_JSON = ` "out_stordb_password": "", }, + +"dispatcher":{ + "enabled": false, // starts DispatcherS service: . + "rals_conns": [ + {"address": "*internal"}, // address where to reach the RALs for dispatcherS <*internal> + ], + "resources_conns": [], // address where to reach the ResourceS <""|*internal|127.0.0.1:2013> + "thresholds_conns": [], // address where to reach the ThresholdS <""|*internal|127.0.0.1:2013> + "stats_conns": [], // address where to reach the StatS <""|*internal|127.0.0.1:2013> + "suppliers_conns": [], // address where to reach the SupplierS <""|*internal|127.0.0.1:2013> + "attributes_conns": [], // address where to reach the AttributeS <""|*internal|127.0.0.1:2013> + "sessions_conns": [ + {"address": "*internal"} // connection towards SessionService + ], + "dispatching_strategy":"*random" // strategy for dispatching <*random|*balancer|*ordered|*circular> +}, + + }` diff --git a/config/config_json_test.go b/config/config_json_test.go index cddf14dc9..ceea4fd30 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -1276,6 +1276,7 @@ func TestDfMigratorCfg(t *testing.T) { Out_dataDB_name: utils.StringPointer("10"), Out_dataDB_user: utils.StringPointer("cgrates"), Out_dataDB_password: utils.StringPointer(""), + Out_dataDB_encoding: utils.StringPointer("msgpack"), Out_storDB_type: utils.StringPointer("mysql"), Out_storDB_host: utils.StringPointer("127.0.0.1"), Out_storDB_port: utils.StringPointer("3306"), diff --git a/config/config_test.go b/config/config_test.go index ea322ad83..809073229 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -1256,6 +1256,7 @@ func TestCgrLoaderCfgITDefaults(t *testing.T) { } } +/* Will be activated after finish dispatcher config func TestCgrCfgJSONDefaultDispatcherSCfg(t *testing.T) { eDspSCfg := &DispatcherSCfg{ Enabled: true, @@ -1264,7 +1265,7 @@ func TestCgrCfgJSONDefaultDispatcherSCfg(t *testing.T) { t.Errorf("received: %+v, expecting: %+v", cgrCfg.dispatcherSCfg, eDspSCfg) } } - +*/ func TestCgrLoaderCfgDefault(t *testing.T) { eLdrCfg := &LoaderCgrCfg{ TpID: "", @@ -1295,6 +1296,7 @@ func TestCgrMigratorCfgDefault(t *testing.T) { OutDataDBName: "10", OutDataDBUser: "cgrates", OutDataDBPassword: "", + OutDataDBEncoding: "msgpack", OutStorDBType: "mysql", OutStorDBHost: "127.0.0.1", OutStorDBPort: "3306", diff --git a/config/dispatchercfg.go b/config/dispatchercfg.go index e28cbc5e5..a55da953c 100755 --- a/config/dispatchercfg.go +++ b/config/dispatchercfg.go @@ -20,7 +20,15 @@ package config // DispatcherSCfg is the configuration of dispatcher service type DispatcherSCfg struct { - Enabled bool + Enabled bool + RALsConns []*HaPoolConfig + ResSConns []*HaPoolConfig + ThreshSConns []*HaPoolConfig + StatSConns []*HaPoolConfig + SupplSConns []*HaPoolConfig + AttrSConns []*HaPoolConfig + SessionSConns []*HaPoolConfig + DispatchingStrategy string } func (dps *DispatcherSCfg) loadFromJsonCfg(jsnCfg *DispatcherSJsonCfg) (err error) { @@ -30,5 +38,57 @@ func (dps *DispatcherSCfg) loadFromJsonCfg(jsnCfg *DispatcherSJsonCfg) (err erro if jsnCfg.Enabled != nil { dps.Enabled = *jsnCfg.Enabled } + if jsnCfg.Rals_conns != nil { + dps.RALsConns = make([]*HaPoolConfig, len(*jsnCfg.Rals_conns)) + for idx, jsnHaCfg := range *jsnCfg.Rals_conns { + dps.RALsConns[idx] = NewDfltHaPoolConfig() + dps.RALsConns[idx].loadFromJsonCfg(jsnHaCfg) + } + } + if jsnCfg.Resources_conns != nil { + dps.ResSConns = make([]*HaPoolConfig, len(*jsnCfg.Resources_conns)) + for idx, jsnHaCfg := range *jsnCfg.Resources_conns { + dps.ResSConns[idx] = NewDfltHaPoolConfig() + dps.ResSConns[idx].loadFromJsonCfg(jsnHaCfg) + } + } + if jsnCfg.Thresholds_conns != nil { + dps.ThreshSConns = make([]*HaPoolConfig, len(*jsnCfg.Thresholds_conns)) + for idx, jsnHaCfg := range *jsnCfg.Thresholds_conns { + dps.ThreshSConns[idx] = NewDfltHaPoolConfig() + dps.ThreshSConns[idx].loadFromJsonCfg(jsnHaCfg) + } + } + if jsnCfg.Stats_conns != nil { + dps.StatSConns = make([]*HaPoolConfig, len(*jsnCfg.Stats_conns)) + for idx, jsnHaCfg := range *jsnCfg.Stats_conns { + dps.StatSConns[idx] = NewDfltHaPoolConfig() + dps.StatSConns[idx].loadFromJsonCfg(jsnHaCfg) + } + } + if jsnCfg.Suppliers_conns != nil { + dps.SupplSConns = make([]*HaPoolConfig, len(*jsnCfg.Suppliers_conns)) + for idx, jsnHaCfg := range *jsnCfg.Suppliers_conns { + dps.SupplSConns[idx] = NewDfltHaPoolConfig() + dps.SupplSConns[idx].loadFromJsonCfg(jsnHaCfg) + } + } + if jsnCfg.Attributes_conns != nil { + dps.AttrSConns = make([]*HaPoolConfig, len(*jsnCfg.Attributes_conns)) + for idx, jsnHaCfg := range *jsnCfg.Attributes_conns { + dps.AttrSConns[idx] = NewDfltHaPoolConfig() + dps.AttrSConns[idx].loadFromJsonCfg(jsnHaCfg) + } + } + if jsnCfg.Sessions_conns != nil { + dps.SessionSConns = make([]*HaPoolConfig, len(*jsnCfg.Sessions_conns)) + for idx, jsnHaCfg := range *jsnCfg.Sessions_conns { + dps.SessionSConns[idx] = NewDfltHaPoolConfig() + dps.SessionSConns[idx].loadFromJsonCfg(jsnHaCfg) + } + } + if jsnCfg.Dispatching_strategy != nil { + dps.DispatchingStrategy = *jsnCfg.Dispatching_strategy + } return nil } diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 3635bb997..77287d3f1 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -488,7 +488,15 @@ type SureTaxJsonCfg struct { // Dispatcher service config section type DispatcherSJsonCfg struct { - Enabled *bool + Enabled *bool + Rals_conns *[]*HaPoolJsonCfg + Resources_conns *[]*HaPoolJsonCfg + Thresholds_conns *[]*HaPoolJsonCfg + Stats_conns *[]*HaPoolJsonCfg + Suppliers_conns *[]*HaPoolJsonCfg + Attributes_conns *[]*HaPoolJsonCfg + Sessions_conns *[]*HaPoolJsonCfg + Dispatching_strategy *string } type LoaderCfgJson struct { @@ -506,6 +514,7 @@ type MigratorCfgJson struct { Out_dataDB_name *string Out_dataDB_user *string Out_dataDB_password *string + Out_dataDB_encoding *string Out_storDB_type *string Out_storDB_host *string Out_storDB_port *string diff --git a/config/migratorcfg.go b/config/migratorcfg.go index 34b263577..6500a2b43 100644 --- a/config/migratorcfg.go +++ b/config/migratorcfg.go @@ -25,6 +25,7 @@ type MigratorCgrCfg struct { OutDataDBName string OutDataDBUser string OutDataDBPassword string + OutDataDBEncoding string OutStorDBType string OutStorDBHost string OutStorDBPort string @@ -53,6 +54,9 @@ func (mg *MigratorCgrCfg) loadFromJsonCfg(jsnCfg *MigratorCfgJson) (err error) { if jsnCfg.Out_dataDB_password != nil { mg.OutDataDBPassword = *jsnCfg.Out_dataDB_password } + if jsnCfg.Out_dataDB_encoding != nil { + mg.OutDataDBEncoding = *jsnCfg.Out_dataDB_encoding + } if jsnCfg.Out_storDB_type != nil { mg.OutStorDBType = *jsnCfg.Out_storDB_type diff --git a/config/supplierscfg.go b/config/supplierscfg.go index 945432e02..6800eca70 100644 --- a/config/supplierscfg.go +++ b/config/supplierscfg.go @@ -18,6 +18,7 @@ along with this program. If not, see package config +// // SupplierSCfg is the configuration of supplier service type SupplierSCfg struct { Enabled bool diff --git a/data/conf/samples/tutmongo/cgrates.json b/data/conf/samples/tutmongo/cgrates.json index 2d8be2642..7cd714f24 100644 --- a/data/conf/samples/tutmongo/cgrates.json +++ b/data/conf/samples/tutmongo/cgrates.json @@ -24,6 +24,7 @@ "stor_db": { "db_type": "mongo", + "db_name": "cgrates", "db_port": 27017, }, @@ -178,4 +179,14 @@ }, +"migrator": { + "out_datadb_type": "mongo", + "out_datadb_port": "27017", + "out_datadb_name": "10", + "out_stordb_type": "mongo", + "out_stordb_port": "27017", + "out_stordb_name": "cgrates", +}, + + } diff --git a/data/conf/samples/tutmysql/cgrates.json b/data/conf/samples/tutmysql/cgrates.json index 94c8d907c..0f6fbc331 100644 --- a/data/conf/samples/tutmysql/cgrates.json +++ b/data/conf/samples/tutmysql/cgrates.json @@ -307,4 +307,9 @@ }, +"migrator":{ + "out_stordb_password": "CGRateS.org", +} + + } diff --git a/engine/version.go b/engine/version.go index 9ae76e6a9..9d04da5f1 100644 --- a/engine/version.go +++ b/engine/version.go @@ -33,6 +33,7 @@ var ( utils.ActionTriggers: "cgr-migrator -migrate=*action_triggers", utils.ActionPlans: "cgr-migrator -migrate=*action_plans", utils.SharedGroups: "cgr-migrator -migrate=*shared_groups", + utils.Thresholds: "cgr-migrator -migrate=*thresholds", } storDBVers = map[string]string{ utils.COST_DETAILS: "cgr-migrator -migrate=*cost_details", @@ -124,7 +125,7 @@ func CurrentDataDBVersions() Versions { utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2, - utils.Thresholds: 2, + utils.Thresholds: 3, utils.Suppliers: 1, utils.Attributes: 2, utils.Timing: 1, diff --git a/migrator/migrator_datadb.go b/migrator/migrator_datadb.go index 3b38840e3..14bf124bd 100644 --- a/migrator/migrator_datadb.go +++ b/migrator/migrator_datadb.go @@ -41,5 +41,7 @@ type MigratorDataDB interface { setV2Account(x *v2Account) (err error) getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err error) setV1AttributeProfile(x *v1AttributeProfile) (err error) + getV2ThresholdProfile() (v2T *v2Threshold, err error) + setV2ThresholdProfile(x *v2Threshold) (err error) DataManager() *engine.DataManager } diff --git a/migrator/storage_mongo_datadb.go b/migrator/storage_mongo_datadb.go index aebddedee..61a7e56ad 100644 --- a/migrator/storage_mongo_datadb.go +++ b/migrator/storage_mongo_datadb.go @@ -28,6 +28,7 @@ const ( v2AccountsCol = "accounts" v1ActionTriggersCol = "action_triggers" v1AttributeProfilesCol = "attribute_profiles" + v2ThresholdProfileCol = "threshold_profiles" ) type mongoMigrator struct { @@ -259,3 +260,26 @@ func (v1ms *mongoMigrator) setV1AttributeProfile(x *v1AttributeProfile) (err err } return } + +//ThresholdProfile methods +//get +func (v1ms *mongoMigrator) getV2ThresholdProfile() (v2T *v2Threshold, err error) { + if v1ms.qryIter == nil { + v1ms.qryIter = v1ms.mgoDB.DB().C(v2ThresholdProfileCol).Find(nil).Iter() + } + v1ms.qryIter.Next(&v2T) + if v2T == nil { + v1ms.qryIter = nil + return nil, utils.ErrNoMoreData + + } + return v2T, nil +} + +//set +func (v1ms *mongoMigrator) setV2ThresholdProfile(x *v2Threshold) (err error) { + if err := v1ms.mgoDB.DB().C(v2ThresholdProfileCol).Insert(x); err != nil { + return err + } + return +} diff --git a/migrator/storage_redis.go b/migrator/storage_redis.go index 1b1405f20..b4923c267 100644 --- a/migrator/storage_redis.go +++ b/migrator/storage_redis.go @@ -413,3 +413,45 @@ func (v1rs *redisMigrator) setV1AttributeProfile(x *v1AttributeProfile) (err err } return } + +//ThresholdProfile methods +//get +func (v1rs *redisMigrator) getV2ThresholdProfile() (v2T *v2Threshold, err error) { + var v2Th *v2Threshold + if v1rs.qryIdx == nil { + v1rs.dataKeys, err = v1rs.rds.GetKeysForPrefix(utils.ThresholdProfilePrefix) + if err != nil { + return + } else if len(v1rs.dataKeys) == 0 { + return nil, utils.ErrNotFound + } + v1rs.qryIdx = utils.IntPointer(0) + } + if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 { + strVal, err := v1rs.rds.Cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes() + if err != nil { + return nil, err + } + if err := v1rs.rds.Marshaler().Unmarshal(strVal, &v2Th); err != nil { + return nil, err + } + *v1rs.qryIdx = *v1rs.qryIdx + 1 + } else { + v1rs.qryIdx = nil + return nil, utils.ErrNoMoreData + } + return v2Th, nil +} + +//set +func (v1rs *redisMigrator) setV2ThresholdProfile(x *v2Threshold) (err error) { + key := utils.ThresholdProfilePrefix + utils.ConcatenatedKey(x.Tenant, x.ID) + bit, err := v1rs.rds.Marshaler().Marshal(x) + if err != nil { + return err + } + if err = v1rs.rds.Cmd("SET", key, bit).Err; err != nil { + return err + } + return +} diff --git a/migrator/thresholds.go b/migrator/thresholds.go index 0f632a3b9..97786c812 100644 --- a/migrator/thresholds.go +++ b/migrator/thresholds.go @@ -134,6 +134,39 @@ func (m *Migrator) migrateV2ActionTriggers() (err error) { return } +func (m *Migrator) migrateV2Thresholds() (err error) { + var v2T *v2Threshold + for { + v2T, err = m.dmIN.getV2ThresholdProfile() + if err != nil && err != utils.ErrNoMoreData { + return err + } + if err == utils.ErrNoMoreData { + break + } + if v2T != nil { + th := v2T.V2toV3Threshold() + if m.dryRun != true { + if err = m.dmOut.DataManager().SetThresholdProfile(th, true); err != nil { + return err + } + m.stats[utils.Thresholds] += 1 + } + } + } + if m.dryRun != true { + // All done, update version wtih current one + vrs := engine.Versions{utils.Thresholds: engine.CurrentDataDBVersions()[utils.Thresholds]} + if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { + return utils.NewCGRError(utils.Migrator, + utils.ServerErrorCaps, + err.Error(), + fmt.Sprintf("error: <%s> when updating Thresholds version into dataDB", err.Error())) + } + } + return +} + func (m *Migrator) migrateThresholds() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() @@ -160,9 +193,10 @@ func (m *Migrator) migrateThresholds() (err error) { return case 1: - if err := m.migrateV2ActionTriggers(); err != nil { - return err - } + return m.migrateV2ActionTriggers() + + case 2: + return m.migrateV2Thresholds() } return } @@ -350,3 +384,38 @@ func AsThreshold2(v2ATR engine.ActionTrigger) (thp *engine.ThresholdProfile, th return thp, th, filter, nil } + +type v2Threshold struct { + Tenant string + ID string + FilterIDs []string + ActivationInterval *utils.ActivationInterval // Time when this limit becomes active and expires + Recurrent bool + MinHits int + MinSleep time.Duration + Blocker bool // blocker flag to stop processing on filters matched + Weight float64 // Weight to sort the thresholds + ActionIDs []string + Async bool +} + +func (v2T v2Threshold) V2toV3Threshold() (th *engine.ThresholdProfile) { + th = &engine.ThresholdProfile{ + Tenant: v2T.Tenant, + ID: v2T.ID, + FilterIDs: v2T.FilterIDs, + ActivationInterval: v2T.ActivationInterval, + MinHits: v2T.MinHits, + MinSleep: v2T.MinSleep, + Blocker: v2T.Blocker, + Weight: v2T.Weight, + ActionIDs: v2T.ActionIDs, + Async: v2T.Async, + } + if v2T.Recurrent == true { + th.MaxHits = -1 + } else { + th.MaxHits = 1 + } + return +} diff --git a/utils/consts.go b/utils/consts.go index 0bde08874..68ccfc21d 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -618,6 +618,14 @@ const ( CapStatQueues = "StatQueues" ) +// DispatcherStrategy +const ( + MetaRandom = "*random" + MetaBalancer = "*balancer" + MetaOrdered = "*ordered" + MetaCircular = "*circular" +) + // MetaFilterIndexesAPIs const ( ApierV1ComputeFilterIndexes = "ApierV1.ComputeFilterIndexes"