From 473d8dbc5a511a1ea8a81e1469d301b067fa0f93 Mon Sep 17 00:00:00 2001 From: TeoV Date: Thu, 10 Jan 2019 09:09:10 -0500 Subject: [PATCH] Add infrastructure for DispatcherProfile --- apier/v1/apier.go | 38 +++- apier/v1/dispatcher.go | 60 ++++++ apier/v1/dispatcher_it_test.go | 121 +++++++++++++ apier/v1/precache_it_test.go | 16 ++ apier/v2/apier.go | 3 +- cmd/cgr-loader/cgr-loader.go | 11 +- cmd/cgr-tester/cgr-tester.go | 2 +- config/config_defaults.go | 2 + config/config_json_test.go | 5 + config/config_test.go | 4 + .../mysql/create_tariffplan_tables.sql | 22 +++ .../postgres/create_tariffplan_tables.sql | 21 +++ engine/datamanager.go | 88 ++++++++- engine/filterindexer.go | 14 ++ engine/libdispatcher.go | 48 +++++ engine/libtest.go | 1 + engine/loader_csv_test.go | 50 ++++- engine/loader_it_test.go | 22 +++ engine/model_helpers.go | 171 ++++++++++++++++++ engine/models.go | 13 ++ engine/storage_csv.go | 46 ++++- engine/storage_interface.go | 5 + engine/storage_map_datadb.go | 36 +++- engine/storage_map_stordb.go | 8 +- engine/storage_mongo_datadb.go | 41 +++++ engine/storage_mongo_stordb.go | 48 +++++ engine/storage_redis.go | 35 +++- engine/storage_sql.go | 49 ++++- engine/tp_reader.go | 134 ++++++++++---- engine/tpimporter_csv.go | 13 ++ engine/version.go | 2 + general_tests/acntacts_test.go | 4 +- general_tests/auth_test.go | 4 +- general_tests/costs1_test.go | 4 +- general_tests/datachrg1_test.go | 4 +- general_tests/ddazmbl1_test.go | 4 +- general_tests/ddazmbl2_test.go | 4 +- general_tests/ddazmbl3_test.go | 4 +- general_tests/smschrg1_test.go | 4 +- utils/apitpdata.go | 13 ++ utils/consts.go | 170 +++++++++-------- 41 files changed, 1190 insertions(+), 154 deletions(-) create mode 100644 apier/v1/dispatcher_it_test.go create mode 100644 engine/libdispatcher.go diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 1db8a1b0b..78f75a67b 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -1016,7 +1016,7 @@ func (self *ApierV1) LoadCache(args utils.AttrReloadCache, reply *string) (err e if args.FlushAll { engine.Cache.Clear(nil) } - var dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rspIDs, resIDs, stqIDs, stqpIDs, thIDs, thpIDs, fltrIDs, splpIDs, alsPrfIDs, cppIDs []string + var dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rspIDs, resIDs, stqIDs, stqpIDs, thIDs, thpIDs, fltrIDs, splpIDs, alsPrfIDs, cppIDs, dppIDs []string if args.DestinationIDs == nil { dstIDs = nil } else { @@ -1127,10 +1127,15 @@ func (self *ApierV1) LoadCache(args utils.AttrReloadCache, reply *string) (err e } else { cppIDs = *args.ChargerProfileIDs } + if args.DispatcherProfileIDs == nil { + dppIDs = nil + } else { + dppIDs = *args.DispatcherProfileIDs + } if err := self.DataManager.LoadDataDBCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rspIDs, resIDs, stqIDs, stqpIDs, thIDs, thpIDs, - fltrIDs, splpIDs, alsPrfIDs, cppIDs); err != nil { + fltrIDs, splpIDs, alsPrfIDs, cppIDs, dppIDs); err != nil { return utils.NewErrServerError(err) } *reply = utils.OK @@ -1311,6 +1316,14 @@ func (self *ApierV1) FlushCache(args utils.AttrReloadCache, reply *string) (err true, utils.NonTransactional) } } + if args.DispatcherProfileIDs == nil { + engine.Cache.Clear([]string{utils.CacheDispatcherProfiles}) + } else if len(*args.DispatcherProfileIDs) != 0 { + for _, key := range *args.DispatcherProfileIDs { + 
engine.Cache.Remove(utils.CacheDispatcherProfiles, key,
+				true, utils.NonTransactional)
+		}
+	}
 	*reply = utils.OK
 	return
@@ -1339,6 +1352,7 @@ func (self *ApierV1) GetCacheStats(attrs utils.AttrCacheStats, reply *utils.Cach
 	cs.SupplierProfiles = len(engine.Cache.GetItemIDs(utils.CacheSupplierProfiles, ""))
 	cs.AttributeProfiles = len(engine.Cache.GetItemIDs(utils.CacheAttributeProfiles, ""))
 	cs.ChargerProfiles = len(engine.Cache.GetItemIDs(utils.CacheChargerProfiles, ""))
+	cs.DispatcherProfiles = len(engine.Cache.GetItemIDs(utils.CacheDispatcherProfiles, ""))
 
 	if self.Users != nil {
 		var ups engine.UserProfiles
@@ -1761,6 +1775,25 @@ func (v1 *ApierV1) GetCacheKeys(args utils.ArgsCacheKeys, reply *utils.ArgsCache
 		}
 	}
+	if args.DispatcherProfileIDs != nil {
+		var ids []string
+		if len(*args.DispatcherProfileIDs) != 0 {
+			for _, id := range *args.DispatcherProfileIDs {
+				if _, hasIt := engine.Cache.Get(utils.CacheDispatcherProfiles, id); hasIt {
+					ids = append(ids, id)
+				}
+			}
+		} else {
+			for _, id := range engine.Cache.GetItemIDs(utils.CacheDispatcherProfiles, "") {
+				ids = append(ids, id)
+			}
+		}
+		ids = args.Paginator.PaginateStringSlice(ids)
+		if len(ids) != 0 {
+			reply.DispatcherProfileIDs = &ids
+		}
+	}
+
 	return
 }
@@ -1799,6 +1832,7 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
 		path.Join(attrs.FolderPath, utils.SuppliersCsv),
 		path.Join(attrs.FolderPath, utils.AttributesCsv),
 		path.Join(attrs.FolderPath, utils.ChargersCsv),
+		path.Join(attrs.FolderPath, utils.DispatchersCsv),
 	), "", self.Config.GeneralCfg().DefaultTimezone)
 	if err := loader.LoadAll(); err != nil {
 		return utils.NewErrServerError(err)
diff --git a/apier/v1/dispatcher.go b/apier/v1/dispatcher.go
index d5adf8fcd..1361dbb07 100755
--- a/apier/v1/dispatcher.go
+++ b/apier/v1/dispatcher.go
@@ -22,8 +22,68 @@ import (
 	"github.com/cgrates/cgrates/dispatchers"
 	"github.com/cgrates/cgrates/engine"
 	"github.com/cgrates/cgrates/sessions"
+	"github.com/cgrates/cgrates/utils"
 )
 
+// GetDispatcherProfile returns a Dispatcher Profile
+func (apierV1 *ApierV1) GetDispatcherProfile(arg *utils.TenantID, reply *engine.DispatcherProfile) error {
+	if missing := utils.MissingStructFields(arg, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
+		return utils.NewErrMandatoryIeMissing(missing...)
+	}
+	if dpp, err := apierV1.DataManager.GetDispatcherProfile(arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
+		if err.Error() != utils.ErrNotFound.Error() {
+			err = utils.NewErrServerError(err)
+		}
+		return err
+	} else {
+		*reply = *dpp
+	}
+	return nil
+}
+
+// GetDispatcherProfileIDs returns the list of dispatcherProfile IDs registered for a tenant
+func (apierV1 *ApierV1) GetDispatcherProfileIDs(tenant string, dPrfIDs *[]string) error {
+	prfx := utils.DispatcherProfilePrefix + tenant + ":"
+	keys, err := apierV1.DataManager.DataDB().GetKeysForPrefix(prfx)
+	if err != nil {
+		return err
+	}
+	retIDs := make([]string, len(keys))
+	for i, key := range keys {
+		retIDs[i] = key[len(prfx):]
+	}
+	*dPrfIDs = retIDs
+	return nil
+}
+
+// SetDispatcherProfile adds/updates a Dispatcher Profile
+func (apierV1 *ApierV1) SetDispatcherProfile(dpp *engine.DispatcherProfile, reply *string) error {
+	if missing := utils.MissingStructFields(dpp, []string{"Tenant", "ID"}); len(missing) != 0 {
+		return utils.NewErrMandatoryIeMissing(missing...)
+ } + if err := apierV1.DataManager.SetDispatcherProfile(dpp, true); err != nil { + return utils.APIErrorHandler(err) + } + *reply = utils.OK + return nil +} + +//RemoveDispatcherProfile remove a specific Dispatcher Profile +func (apierV1 *ApierV1) RemoveDispatcherProfile(arg *utils.TenantID, reply *string) error { + if missing := utils.MissingStructFields(arg, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) + } + if err := apierV1.DataManager.RemoveDispatcherProfile(arg.Tenant, + arg.ID, utils.NonTransactional, true); err != nil { + if err.Error() != utils.ErrNotFound.Error() { + err = utils.NewErrServerError(err) + } + return err + } + *reply = utils.OK + return nil +} + func NewDispatcherThresholdSv1(dps *dispatchers.DispatcherService) *DispatcherThresholdSv1 { return &DispatcherThresholdSv1{dS: dps} } diff --git a/apier/v1/dispatcher_it_test.go b/apier/v1/dispatcher_it_test.go new file mode 100644 index 000000000..a9d068c06 --- /dev/null +++ b/apier/v1/dispatcher_it_test.go @@ -0,0 +1,121 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. 
If not, see +*/ + +package v1 + +import ( + "net/rpc" + "net/rpc/jsonrpc" + "path" + "testing" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" +) + +var ( + dispatcherCfgPath string + dispatcherCfg *config.CGRConfig + dispatcherRPC *rpc.Client + dispatcherProfile *engine.DispatcherProfile + dispatcherConfigDIR string //run tests for specific configuration +) + +var sTestsDispatcher = []func(t *testing.T){ + testDispatcherSInitCfg, + testDispatcherSInitDataDb, + testDispatcherSResetStorDb, + testDispatcherSStartEngine, + testDispatcherSRPCConn, + testDispatcherSSetDispatcherProfile, + testDispatcherSGetDispatcherProfileIDs, + testDispatcherSUpdateDispatcherProfile, + testDispatcherSRemDispatcherProfile, + testDispatcherSKillEngine, +} + +//Test start here +func TestDispatcherSITMySql(t *testing.T) { + dispatcherConfigDIR = "tutmysql" + for _, stest := range sTestsDispatcher { + t.Run(dispatcherConfigDIR, stest) + } +} + +func TestDispatcherSITMongo(t *testing.T) { + dispatcherConfigDIR = "tutmongo" + for _, stest := range sTestsDispatcher { + t.Run(dispatcherConfigDIR, stest) + } +} + +func testDispatcherSInitCfg(t *testing.T) { + var err error + dispatcherCfgPath = path.Join(*dataDir, "conf", "samples", dispatcherConfigDIR) + dispatcherCfg, err = config.NewCGRConfigFromFolder(dispatcherCfgPath) + if err != nil { + t.Error(err) + } + dispatcherCfg.DataFolderPath = *dataDir + config.SetCgrConfig(dispatcherCfg) +} + +func testDispatcherSInitDataDb(t *testing.T) { + if err := engine.InitDataDb(dispatcherCfg); err != nil { + t.Fatal(err) + } +} + +// Wipe out the cdr database +func testDispatcherSResetStorDb(t *testing.T) { + if err := engine.InitStorDb(dispatcherCfg); err != nil { + t.Fatal(err) + } +} + +// Start CGR Engine +func testDispatcherSStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(dispatcherCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func testDispatcherSRPCConn(t *testing.T) { + var err error + dispatcherRPC, err = jsonrpc.Dial("tcp", dispatcherCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal(err) + } +} + +func testDispatcherSSetDispatcherProfile(t *testing.T) {} + +func testDispatcherSGetDispatcherProfileIDs(t *testing.T) {} + +func testDispatcherSUpdateDispatcherProfile(t *testing.T) {} + +func testDispatcherSRemDispatcherProfile(t *testing.T) {} + +func testDispatcherSKillEngine(t *testing.T) { + if err := engine.KillEngine(*waitRater); err != nil { + t.Error(err) + } +} diff --git a/apier/v1/precache_it_test.go b/apier/v1/precache_it_test.go index 95ab55e40..11f8df36d 100644 --- a/apier/v1/precache_it_test.go +++ b/apier/v1/precache_it_test.go @@ -155,6 +155,14 @@ func testPrecacheGetCacheStatsBeforeLoad(t *testing.T) { Items: 0, Groups: 0, }, + "dispatcher_filter_indexes": { + Items: 0, + Groups: 0, + }, + "dispatcher_profiles": { + Items: 0, + Groups: 0, + }, "derived_chargers": { Items: 0, Groups: 0, @@ -315,6 +323,14 @@ func testPrecacheGetCacheStatsAfterRestart(t *testing.T) { Items: 0, Groups: 0, }, + "dispatcher_filter_indexes": { + Items: 0, + Groups: 0, + }, + "dispatcher_profiles": { + Items: 0, + Groups: 0, + }, "derived_chargers": { Items: 1, // expected to have 1 item Groups: 0, diff --git a/apier/v2/apier.go b/apier/v2/apier.go index 53121b5f7..4255ac02e 100644 --- a/apier/v2/apier.go +++ b/apier/v2/apier.go @@ -27,7 +27,7 @@ import ( "strconv" "strings" - "github.com/cgrates/cgrates/apier/v1" + v1 
"github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/utils" @@ -147,6 +147,7 @@ func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, path.Join(attrs.FolderPath, utils.SuppliersCsv), path.Join(attrs.FolderPath, utils.AttributesCsv), path.Join(attrs.FolderPath, utils.ChargersCsv), + path.Join(attrs.FolderPath, utils.DispatchersCsv), ), "", self.Config.GeneralCfg().DefaultTimezone) if err := loader.LoadAll(); err != nil { return utils.NewErrServerError(err) diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 21a815922..bc037dac2 100755 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -300,6 +300,7 @@ func main() { path.Join(*dataPath, utils.SuppliersCsv), path.Join(*dataPath, utils.AttributesCsv), path.Join(*dataPath, utils.ChargersCsv), + path.Join(*dataPath, utils.DispatchersCsv), ) } @@ -353,7 +354,8 @@ func main() { if err := tpReader.WriteToDatabase(*flush, *verbose, *disableReverse); err != nil { log.Fatal("Could not write to database: ", err) } - var dstIds, revDstIDs, rplIds, rpfIds, actIds, aapIDs, shgIds, alsIds, dcsIds, rspIDs, resIDs, aatIDs, ralsIDs, stqIDs, stqpIDs, trsIDs, trspfIDs, flrIDs, spfIDs, apfIDs, chargerIDs []string + var dstIds, revDstIDs, rplIds, rpfIds, actIds, aapIDs, shgIds, alsIds, dcsIds, rspIDs, resIDs, + aatIDs, ralsIDs, stqIDs, stqpIDs, trsIDs, trspfIDs, flrIDs, spfIDs, apfIDs, chargerIDs, dppIDs []string if cacheS != nil { dstIds, _ = tpReader.GetLoadedIds(utils.DESTINATION_PREFIX) revDstIDs, _ = tpReader.GetLoadedIds(utils.REVERSE_DESTINATION_PREFIX) @@ -376,6 +378,7 @@ func main() { spfIDs, _ = tpReader.GetLoadedIds(utils.SupplierProfilePrefix) apfIDs, _ = tpReader.GetLoadedIds(utils.AttributeProfilePrefix) chargerIDs, _ = tpReader.GetLoadedIds(utils.ChargerProfilePrefix) + dppIDs, _ = tpReader.GetLoadedIds(utils.DispatcherProfilePrefix) } aps, _ := tpReader.GetLoadedIds(utils.ACTION_PLAN_PREFIX) // for users reloading @@ -416,7 +419,8 @@ func main() { FilterIDs: &flrIDs, SupplierProfileIDs: &spfIDs, AttributeProfileIDs: &apfIDs, - ChargerProfileIDs: &chargerIDs}, + ChargerProfileIDs: &chargerIDs, + DispatcherProfileIDs: &dppIDs}, FlushAll: *flush, }, &reply); err != nil { log.Printf("WARNING: Got error on cache reload: %s\n", err.Error()) @@ -443,6 +447,9 @@ func main() { if len(chargerIDs) != 0 { cacheIDs = append(cacheIDs, utils.CacheChargerFilterIndexes) } + if len(dppIDs) != 0 { + cacheIDs = append(cacheIDs, utils.CacheDispatcherFilterIndexes) + } if err = cacheS.Call(utils.CacheSv1Clear, cacheIDs, &reply); err != nil { log.Printf("WARNING: Got error on cache clear: %s\n", err.Error()) } diff --git a/cmd/cgr-tester/cgr-tester.go b/cmd/cgr-tester/cgr-tester.go index 28c258303..9b5b0023d 100644 --- a/cmd/cgr-tester/cgr-tester.go +++ b/cmd/cgr-tester/cgr-tester.go @@ -81,7 +81,7 @@ func durInternalRater(cd *engine.CallDescriptor) (time.Duration, error) { defer dm.DataDB().Close() engine.SetDataStorage(dm) if err := dm.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, - nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil); err != nil { + nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil); err != nil { return nilDuration, fmt.Errorf("Cache rating error: %s", err.Error()) } log.Printf("Runnning %d cycles...", *runs) diff --git a/config/config_defaults.go b/config/config_defaults.go index a39c33c4a..7ab651924 100755 --- a/config/config_defaults.go +++ 
b/config/config_defaults.go @@ -141,12 +141,14 @@ const CGRATES_CFG_JSON = ` "supplier_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control supplier profile caching "attribute_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control attribute profile caching "charger_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control charger profile caching + "dispatcher_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control dispatcher profile caching "resource_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control resource filter indexes caching "stat_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control stat filter indexes caching "threshold_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control threshold filter indexes caching "supplier_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control supplier filter indexes caching "attribute_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control attribute filter indexes caching "charger_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control charger filter indexes caching + "dispatcher_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control dispatcher filter indexes caching "diameter_messages": {"limit": -1, "ttl": "1h", "static_ttl": false}, // diameter messages caching }, diff --git a/config/config_json_test.go b/config/config_json_test.go index 77c8e641c..8ba4f71b8 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -142,6 +142,9 @@ func TestCacheJsonCfg(t *testing.T) { utils.CacheChargerProfiles: &CacheParamJsonCfg{Limit: utils.IntPointer(-1), Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false), Precache: utils.BoolPointer(false)}, + utils.CacheDispatcherProfiles: &CacheParamJsonCfg{Limit: utils.IntPointer(-1), + Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false), + Precache: utils.BoolPointer(false)}, utils.CacheResourceFilterIndexes: &CacheParamJsonCfg{Limit: utils.IntPointer(-1), Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)}, utils.CacheStatFilterIndexes: &CacheParamJsonCfg{Limit: utils.IntPointer(-1), @@ -154,6 +157,8 @@ func TestCacheJsonCfg(t *testing.T) { Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)}, utils.CacheChargerFilterIndexes: &CacheParamJsonCfg{Limit: utils.IntPointer(-1), Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)}, + utils.CacheDispatcherFilterIndexes: &CacheParamJsonCfg{Limit: utils.IntPointer(-1), + Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)}, utils.CacheDiameterMessages: &CacheParamJsonCfg{Limit: utils.IntPointer(-1), Ttl: utils.StringPointer("1h"), Static_ttl: utils.BoolPointer(false)}, } diff --git a/config/config_test.go b/config/config_test.go index 8800fb87a..37049a869 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -711,6 +711,8 @@ func TestCgrCfgJSONDefaultsCacheCFG(t *testing.T) { TTL: time.Duration(0), StaticTTL: false, Precache: false}, utils.CacheChargerProfiles: &CacheParamCfg{Limit: -1, TTL: time.Duration(0), StaticTTL: false, Precache: false}, + utils.CacheDispatcherProfiles: &CacheParamCfg{Limit: -1, + TTL: time.Duration(0), StaticTTL: false, Precache: false}, utils.CacheResourceFilterIndexes: &CacheParamCfg{Limit: -1, TTL: time.Duration(0), StaticTTL: false, Precache: false}, 
utils.CacheStatFilterIndexes: &CacheParamCfg{Limit: -1, @@ -723,6 +725,8 @@ func TestCgrCfgJSONDefaultsCacheCFG(t *testing.T) { TTL: time.Duration(0), StaticTTL: false, Precache: false}, utils.CacheChargerFilterIndexes: &CacheParamCfg{Limit: -1, TTL: time.Duration(0), StaticTTL: false, Precache: false}, + utils.CacheDispatcherFilterIndexes: &CacheParamCfg{Limit: -1, + TTL: time.Duration(0), StaticTTL: false, Precache: false}, utils.CacheDiameterMessages: &CacheParamCfg{Limit: -1, TTL: time.Duration(1 * time.Hour), StaticTTL: false}, } diff --git a/data/storage/mysql/create_tariffplan_tables.sql b/data/storage/mysql/create_tariffplan_tables.sql index d09692cf1..1e3e93a57 100644 --- a/data/storage/mysql/create_tariffplan_tables.sql +++ b/data/storage/mysql/create_tariffplan_tables.sql @@ -506,6 +506,28 @@ CREATE TABLE tp_chargers ( `id`,`filter_ids`,`run_id`,`attribute_ids`) ); +-- +-- Table structure for table `tp_dispatchers` +-- + +DROP TABLE IF EXISTS tp_dispatchers; +CREATE TABLE tp_dispatchers ( + `pk` int(11) NOT NULL AUTO_INCREMENT, + `tpid` varchar(64) NOT NULL, + `tenant` varchar(64) NOT NULL, + `id` varchar(64) NOT NULL, + `filter_ids` varchar(64) NOT NULL, + `activation_interval` varchar(64) NOT NULL, + `strategy` varchar(64) NOT NULL, + `hosts` varchar(64) NOT NULL, + `weight` decimal(8,2) NOT NULL, + `created_at` TIMESTAMP, + PRIMARY KEY (`pk`), + KEY `tpid` (`tpid`), + UNIQUE KEY `unique_tp_dispatchers` (`tpid`,`tenant`, + `id`,`filter_ids`,`strategy`,`hosts`) +); + -- -- Table structure for table `versions` -- diff --git a/data/storage/postgres/create_tariffplan_tables.sql b/data/storage/postgres/create_tariffplan_tables.sql index 1236d88ad..567ba2af1 100644 --- a/data/storage/postgres/create_tariffplan_tables.sql +++ b/data/storage/postgres/create_tariffplan_tables.sql @@ -496,6 +496,27 @@ CREATE INDEX tp_suppliers_unique ON tp_suppliers ("tpid", "tenant", "id", CREATE INDEX tp_chargers_unique ON tp_chargers ("tpid", "tenant", "id", "filter_ids","run_id","attribute_ids"); + -- + -- Table structure for table `tp_chargers` + -- + + DROP TABLE IF EXISTS tp_dispatchers; + CREATE TABLE tp_dispatchers ( + "pk" SERIAL PRIMARY KEY, + "tpid" varchar(64) NOT NULL, + "tenant"varchar(64) NOT NULL, + "id" varchar(64) NOT NULL, + "filter_ids" varchar(64) NOT NULL, + "activation_interval" varchar(64) NOT NULL, + "strategy" varchar(64) NOT NULL, + "hosts" varchar(64) NOT NULL, + "weight" decimal(8,2) NOT NULL, + "created_at" TIMESTAMP WITH TIME ZONE + ); + CREATE INDEX tp_dispatchers_ids ON tp_dispatchers (tpid); + CREATE INDEX tp_dispatchers_unique ON tp_dispatchers ("tpid", "tenant", "id", + "filter_ids","strategy","hosts"); + -- -- Table structure for table `versions` -- diff --git a/engine/datamanager.go b/engine/datamanager.go index d04f327ef..c4c4af1a6 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -46,7 +46,7 @@ func (dm *DataManager) DataDB() DataDB { func (dm *DataManager) LoadDataDBCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aaPlIDs, atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rpIDs, resIDs, - stqIDs, stqpIDs, thIDs, thpIDs, fltrIDs, splPrflIDs, alsPrfIDs, cppIDs []string) (err error) { + stqIDs, stqpIDs, thIDs, thpIDs, fltrIDs, splPrflIDs, alsPrfIDs, cppIDs, dppIDs []string) (err error) { if dm.DataDB().GetStorageType() == utils.MAPSTOR { if dm.cacheCfg == nil { return @@ -58,7 +58,8 @@ func (dm *DataManager) LoadDataDBCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACTION_TRIGGER_PREFIX, 
utils.SHARED_GROUP_PREFIX, utils.ALIASES_PREFIX, utils.REVERSE_ALIASES_PREFIX, utils.StatQueuePrefix, utils.StatQueueProfilePrefix, utils.ThresholdPrefix, utils.ThresholdProfilePrefix, - utils.FilterPrefix, utils.SupplierProfilePrefix, utils.AttributeProfilePrefix, utils.ChargerProfilePrefix}, k) && cacheCfg.Precache { + utils.FilterPrefix, utils.SupplierProfilePrefix, + utils.AttributeProfilePrefix, utils.ChargerProfilePrefix, utils.DispatcherProfilePrefix}, k) && cacheCfg.Precache { if err := dm.PreloadCacheForPrefix(k); err != nil && err != utils.ErrInvalidKey { return err } @@ -89,6 +90,7 @@ func (dm *DataManager) LoadDataDBCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, utils.SupplierProfilePrefix: splPrflIDs, utils.AttributeProfilePrefix: alsPrfIDs, utils.ChargerProfilePrefix: cppIDs, + utils.DispatcherProfilePrefix: dppIDs, } { if err = dm.CacheDataFromDB(key, ids, false); err != nil { return @@ -148,7 +150,8 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b utils.FilterPrefix, utils.SupplierProfilePrefix, utils.AttributeProfilePrefix, - utils.ChargerProfilePrefix}, prfx) { + utils.ChargerProfilePrefix, + utils.DispatcherProfilePrefix}, prfx) { return utils.NewCGRError(utils.DataManager, utils.MandatoryIEMissingCaps, utils.UnsupportedCachePrefix, @@ -241,6 +244,9 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b case utils.ChargerProfilePrefix: tntID := utils.NewTenantID(dataID) _, err = dm.GetChargerProfile(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) + case utils.DispatcherProfilePrefix: + tntID := utils.NewTenantID(dataID) + _, err = dm.GetDispatcherProfile(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) } if err != nil { return utils.NewCGRError(utils.DataManager, @@ -1228,7 +1234,7 @@ func (dm *DataManager) SetChargerProfile(cpp *ChargerProfile, withIndex bool) (e } } if needsRemove { - if err = NewFilterIndexer(dm, utils.SupplierProfilePrefix, + if err = NewFilterIndexer(dm, utils.ChargerProfilePrefix, cpp.Tenant).RemoveItemFromIndex(cpp.Tenant, cpp.ID, oldCpp.FilterIDs); err != nil { return } @@ -1255,3 +1261,77 @@ func (dm *DataManager) RemoveChargerProfile(tenant, id string, } return } + +func (dm *DataManager) GetDispatcherProfile(tenant, id string, cacheRead, cacheWrite bool, + transactionID string) (dpp *DispatcherProfile, err error) { + tntID := utils.ConcatenatedKey(tenant, id) + if cacheRead { + if x, ok := Cache.Get(utils.CacheDispatcherProfiles, tntID); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*DispatcherProfile), nil + } + } + dpp, err = dm.dataDB.GetDispatcherProfileDrv(tenant, id) + if err != nil { + if err == utils.ErrNotFound && cacheWrite { + Cache.Set(utils.CacheDispatcherProfiles, tntID, nil, nil, + cacheCommit(transactionID), transactionID) + } + return nil, err + } + if cacheWrite { + Cache.Set(utils.CacheDispatcherProfiles, tntID, dpp, nil, + cacheCommit(transactionID), transactionID) + } + return +} + +func (dm *DataManager) SetDispatcherProfile(dpp *DispatcherProfile, withIndex bool) (err error) { + oldDpp, err := dm.GetDispatcherProfile(dpp.Tenant, dpp.ID, true, false, utils.NonTransactional) + if err != nil && err != utils.ErrNotFound { + return err + } + if err = dm.DataDB().SetDispatcherProfileDrv(dpp); err != nil { + return err + } + if err = dm.CacheDataFromDB(utils.DispatcherProfilePrefix, []string{dpp.TenantID()}, true); err != nil { + return + } + if withIndex { + if oldDpp != nil { + var needsRemove bool + for _, fltrID 
:= range oldDpp.FilterIDs { + if !utils.IsSliceMember(dpp.FilterIDs, fltrID) { + needsRemove = true + } + } + if needsRemove { + if err = NewFilterIndexer(dm, utils.DispatcherProfilePrefix, + dpp.Tenant).RemoveItemFromIndex(dpp.Tenant, dpp.ID, oldDpp.FilterIDs); err != nil { + return + } + } + } + return createAndIndex(utils.DispatcherProfilePrefix, dpp.Tenant, utils.EmptyString, dpp.ID, dpp.FilterIDs, dm) + } + return +} + +func (dm *DataManager) RemoveDispatcherProfile(tenant, id string, + transactionID string, withIndex bool) (err error) { + oldDpp, err := dm.GetDispatcherProfile(tenant, id, true, false, utils.NonTransactional) + if err != nil && err != utils.ErrNotFound { + return err + } + if err = dm.DataDB().RemoveDispatcherProfileDrv(tenant, id); err != nil { + return + } + Cache.Remove(utils.CacheDispatcherProfiles, utils.ConcatenatedKey(tenant, id), + cacheCommit(transactionID), transactionID) + if withIndex { + return NewFilterIndexer(dm, utils.DispatcherProfilePrefix, tenant).RemoveItemFromIndex(tenant, id, oldDpp.FilterIDs) + } + return +} diff --git a/engine/filterindexer.go b/engine/filterindexer.go index d7f77de18..88664349a 100644 --- a/engine/filterindexer.go +++ b/engine/filterindexer.go @@ -97,6 +97,9 @@ func (rfi *FilterIndexer) cacheRemItemType() { // ToDo: tune here by removing pe case utils.ChargerProfilePrefix: Cache.Clear([]string{utils.CacheChargerFilterIndexes}) + + case utils.DispatcherProfilePrefix: + Cache.Clear([]string{utils.CacheDispatcherFilterIndexes}) } } @@ -201,6 +204,17 @@ func (rfi *FilterIndexer) RemoveItemFromIndex(tenant, itemID string, oldFilters filterIDs[i] = fltrID } } + case utils.DispatcherProfilePrefix: + dpp, err := rfi.dm.GetDispatcherProfile(tenant, itemID, true, false, utils.NonTransactional) + if err != nil && err != utils.ErrNotFound { + return err + } + if dpp != nil { + filterIDs = make([]string, len(dpp.FilterIDs)) + for i, fltrID := range dpp.FilterIDs { + filterIDs[i] = fltrID + } + } default: } if len(filterIDs) == 0 { diff --git a/engine/libdispatcher.go b/engine/libdispatcher.go new file mode 100644 index 000000000..540e1b2f8 --- /dev/null +++ b/engine/libdispatcher.go @@ -0,0 +1,48 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. 
If not, see <http://www.gnu.org/licenses/>
+*/
+
+package engine
+
+import (
+	"sort"
+
+	"github.com/cgrates/cgrates/utils"
+)
+
+// DispatcherProfile is the config for one Dispatcher
+type DispatcherProfile struct {
+	Tenant             string
+	ID                 string
+	FilterIDs          []string
+	ActivationInterval *utils.ActivationInterval // Activation interval
+	Strategy           string
+	Hosts              []string // dispatch the request to one of these hosts
+	Weight             float64
+}
+
+func (dP *DispatcherProfile) TenantID() string {
+	return utils.ConcatenatedKey(dP.Tenant, dP.ID)
+}
+
+// DispatcherProfiles is a sortable list of Dispatcher profiles
+type DispatcherProfiles []*DispatcherProfile
+
+// Sort is part of sort interface, sort based on Weight
+func (dps DispatcherProfiles) Sort() {
+	sort.Slice(dps, func(i, j int) bool { return dps[i].Weight > dps[j].Weight })
+}
diff --git a/engine/libtest.go b/engine/libtest.go
index 884041064..2bd86d1fa 100644
--- a/engine/libtest.go
+++ b/engine/libtest.go
@@ -146,6 +146,7 @@ func LoadTariffPlanFromFolder(tpPath, timezone string, dm *DataManager, disable_
 		path.Join(tpPath, utils.SuppliersCsv),
 		path.Join(tpPath, utils.AttributesCsv),
 		path.Join(tpPath, utils.ChargersCsv),
+		path.Join(tpPath, utils.DispatchersCsv),
 	), "", timezone)
 	if err := loader.LoadAll(); err != nil {
 		return utils.NewErrServerError(err)
diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go
index 3b6cb7421..16bbe4e0c 100644
--- a/engine/loader_csv_test.go
+++ b/engine/loader_csv_test.go
@@ -294,15 +294,20 @@ cgrates.org,ALS1,con2;con3,,,Field2,Initial2,Sub2,false,,
 	chargerProfiles = `
 #Tenant,ID,FilterIDs,ActivationInterval,RunID,AttributeIDs,Weight
 cgrates.org,Charger1,*string:Account:1001,2014-07-29T15:00:00Z,*rated,ATTR_1001_SIMPLEAUTH,20
+`
+	dispatcherProfiles = `
+#Tenant,ID,FilterIDs,ActivationInterval,Strategy,Hosts,Weight
+cgrates.org,D1,*string:Account:1001,2014-07-29T15:00:00Z,*first,192.168.56.203;192.168.56.204,20
 `
 )
 
 var csvr *TpReader
 
 func init() {
-	csvr = NewTpReader(dm.dataDB, NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles,
-		sharedGroups, actions, actionPlans, actionTriggers, accountActions, derivedCharges,
-		users, aliases, resProfiles, stats, thresholds, filters, sppProfiles, attributeProfiles, chargerProfiles), testTPID, "")
+	csvr = NewTpReader(dm.dataDB, NewStringCSVStorage(',', destinations, timings, rates, destinationRates,
+		ratingPlans, ratingProfiles, sharedGroups, actions, actionPlans, actionTriggers,
+		accountActions, derivedCharges, users, aliases, resProfiles, stats, thresholds,
+		filters, sppProfiles, attributeProfiles, chargerProfiles, dispatcherProfiles), testTPID, "")
 
 	if err := csvr.LoadDestinations(); err != nil {
 		log.Print("error in LoadDestinations:", err)
@@ -367,6 +372,9 @@ func init() {
 	if err := csvr.LoadChargerProfiles(); err != nil {
 		log.Print("error in LoadChargerProfiles:", err)
 	}
+	if err := csvr.LoadDispatcherProfiles(); err != nil {
+		log.Print("error in LoadDispatcherProfiles:", err)
+	}
 	csvr.WriteToDatabase(false, false, false)
 	Cache.Clear(nil)
 	//dm.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
@@ -1714,6 +1722,42 @@ func TestLoadChargerProfiles(t *testing.T) {
 	}
 }
 
+func TestLoadDispatcherProfiles(t *testing.T) {
+	eDispatcherProfiles := map[utils.TenantID]*utils.TPDispatcherProfile{
+		utils.TenantID{Tenant: "cgrates.org", ID: "D1"}: &utils.TPDispatcherProfile{
+			TPid:      testTPID,
+			Tenant:    "cgrates.org",
+			ID:        "D1",
+			FilterIDs: []string{"*string:Account:1001"},
+			ActivationInterval: &utils.TPActivationInterval{
+				ActivationTime: "2014-07-29T15:00:00Z",
+			},
+			Strategy: "*first",
+			Hosts:    []string{"192.168.56.203", "192.168.56.204"},
+			Weight:   20,
+		},
+	}
+	revHosts := &utils.TPDispatcherProfile{
+		TPid:      testTPID,
+		Tenant:    "cgrates.org",
+		ID:        "D1",
+		FilterIDs: []string{"*string:Account:1001"},
+		ActivationInterval: &utils.TPActivationInterval{
+			ActivationTime: "2014-07-29T15:00:00Z",
+		},
+		Strategy: "*first",
+		Hosts:    []string{"192.168.56.204", "192.168.56.203"},
+		Weight:   20,
+	}
+	dppKey := utils.TenantID{Tenant: "cgrates.org", ID: "D1"}
+	if len(csvr.dispatcherProfiles) != len(eDispatcherProfiles) {
+		t.Errorf("Failed to load dispatcherProfiles: %s", utils.ToIJSON(csvr.dispatcherProfiles))
+	} else if !reflect.DeepEqual(eDispatcherProfiles[dppKey], csvr.dispatcherProfiles[dppKey]) &&
+		!reflect.DeepEqual(revHosts, csvr.dispatcherProfiles[dppKey]) {
+		t.Errorf("Expecting: %+v, received: %+v", eDispatcherProfiles[dppKey], csvr.dispatcherProfiles[dppKey])
+	}
+}
+
 func TestLoadResource(t *testing.T) {
 	eResources := []*utils.TenantID{
 		&utils.TenantID{
diff --git a/engine/loader_it_test.go b/engine/loader_it_test.go
index b63572fb6..b0897c20d 100644
--- a/engine/loader_it_test.go
+++ b/engine/loader_it_test.go
@@ -117,6 +117,7 @@ func TestLoaderITRemoveLoad(t *testing.T) {
 		path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.SuppliersCsv),
 		path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.AttributesCsv),
 		path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.ChargersCsv),
+		path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.DispatchersCsv),
 	), "", "")
 
 	if err = loader.LoadDestinations(); err != nil {
@@ -179,6 +180,9 @@ func TestLoaderITRemoveLoad(t *testing.T) {
 	if err = loader.LoadChargerProfiles(); err != nil {
 		t.Error("Failed loading Charger profiles: ", err.Error())
 	}
+	if err = loader.LoadDispatcherProfiles(); err != nil {
+		t.Error("Failed loading Dispatcher profiles: ", err.Error())
+	}
 	if err := loader.WriteToDatabase(true, false, false); err != nil {
 		t.Error("Could not write data into dataDb: ", err.Error())
 	}
@@ -217,6 +221,7 @@ func TestLoaderITLoadFromCSV(t *testing.T) {
 		path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.SuppliersCsv),
 		path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.AttributesCsv),
 		path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.ChargersCsv),
+		path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.DispatchersCsv),
 	), "", "")
 
 	if err = loader.LoadDestinations(); err != nil {
@@ -279,6 +284,9 @@ func TestLoaderITLoadFromCSV(t *testing.T) {
 	if err = loader.LoadChargerProfiles(); err != nil {
 		t.Error("Failed loading Alias profiles: ", err.Error())
 	}
+	if err = loader.LoadDispatcherProfiles(); err != nil {
+		t.Error("Failed loading Dispatcher profiles: ", err.Error())
+	}
 	if err := loader.WriteToDatabase(true, false, false); err != nil {
 		t.Error("Could not write data into dataDb: ", err.Error())
 	}
@@ -490,6 +498,20 @@ func TestLoaderITWriteToDatabase(t *testing.T) {
 		}
 	}
 
+	for tenatid, dpp := range loader.dispatcherProfiles {
+		rcv, err := loader.dm.GetDispatcherProfile(tenatid.Tenant, tenatid.ID, false, false, utils.NonTransactional)
+		if err != nil {
+			t.Errorf("Failed GetDispatcherProfile, tenant: %s, id: %s, error: %s ", dpp.Tenant, dpp.ID, err.Error())
+		}
+		dp, err := APItoDispatcherProfile(dpp, "UTC")
+		if err != nil {
+			t.Error(err)
+		}
+		if !reflect.DeepEqual(dp, rcv) {
+			t.Errorf("Expecting: %v, received: %v", dp, rcv)
+		}
+	}
+
 }
 
 // Imports data from csv files in tpScenario to
storDb diff --git a/engine/model_helpers.go b/engine/model_helpers.go index eedade5dc..e380d6a32 100644 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -2614,3 +2614,174 @@ func APItoChargerProfile(tpCPP *utils.TPChargerProfile, timezone string) (cpp *C } return cpp, nil } + +type TPDispatchers []*TPDispatcher + +func (tps TPDispatchers) AsTPDispatchers() (result []*utils.TPDispatcherProfile) { + mst := make(map[string]*utils.TPDispatcherProfile) + filterMap := make(map[string]utils.StringMap) + hostMap := make(map[string]utils.StringMap) + for _, tp := range tps { + tpDPP, found := mst[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()] + if !found { + tpDPP = &utils.TPDispatcherProfile{ + TPid: tp.Tpid, + Tenant: tp.Tenant, + ID: tp.ID, + } + } + if tp.Weight != 0 { + tpDPP.Weight = tp.Weight + } + if len(tp.ActivationInterval) != 0 { + tpDPP.ActivationInterval = new(utils.TPActivationInterval) + aiSplt := strings.Split(tp.ActivationInterval, utils.INFIELD_SEP) + if len(aiSplt) == 2 { + tpDPP.ActivationInterval.ActivationTime = aiSplt[0] + tpDPP.ActivationInterval.ExpiryTime = aiSplt[1] + } else if len(aiSplt) == 1 { + tpDPP.ActivationInterval.ActivationTime = aiSplt[0] + } + } + if tp.FilterIDs != "" { + if _, has := filterMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()]; !has { + filterMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()] = make(utils.StringMap) + } + filterSplit := strings.Split(tp.FilterIDs, utils.INFIELD_SEP) + for _, filter := range filterSplit { + filterMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()][filter] = true + } + } + if tp.Strategy != "" { + tpDPP.Strategy = tp.Strategy + } + if tp.Hosts != "" { + if _, has := hostMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()]; !has { + hostMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()] = make(utils.StringMap) + } + hostsSplit := strings.Split(tp.Hosts, utils.INFIELD_SEP) + for _, host := range hostsSplit { + hostMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()][host] = true + } + } + mst[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()] = tpDPP + } + result = make([]*utils.TPDispatcherProfile, len(mst)) + i := 0 + for tntID, tp := range mst { + result[i] = tp + for filterID := range filterMap[tntID] { + result[i].FilterIDs = append(result[i].FilterIDs, filterID) + } + for host := range hostMap[tntID] { + result[i].Hosts = append(result[i].Hosts, host) + } + i++ + } + return +} + +func APItoModelTPDispatcher(tpDPP *utils.TPDispatcherProfile) (mdls TPDispatchers) { + if tpDPP != nil { + min := len(tpDPP.FilterIDs) + isFilter := true + if min > len(tpDPP.Hosts) { + min = len(tpDPP.Hosts) + isFilter = false + } + if min == 0 { + mdl := &TPDispatcher{ + Tenant: tpDPP.Tenant, + Tpid: tpDPP.TPid, + ID: tpDPP.ID, + Weight: tpDPP.Weight, + Strategy: tpDPP.Strategy, + } + if tpDPP.ActivationInterval != nil { + if tpDPP.ActivationInterval.ActivationTime != "" { + mdl.ActivationInterval = tpDPP.ActivationInterval.ActivationTime + } + if tpDPP.ActivationInterval.ExpiryTime != "" { + mdl.ActivationInterval += utils.INFIELD_SEP + tpDPP.ActivationInterval.ExpiryTime + } + } + if isFilter && len(tpDPP.Hosts) > 0 { + mdl.Hosts = tpDPP.Hosts[0] + } else if len(tpDPP.FilterIDs) > 0 { + mdl.FilterIDs = tpDPP.FilterIDs[0] + } + min = 1 + mdls = append(mdls, mdl) + } else { + for i := 0; i < min; i++ { + mdl := &TPDispatcher{ + Tenant: tpDPP.Tenant, + Tpid: tpDPP.TPid, + ID: tpDPP.ID, + } + if i == 0 { + mdl.Weight = 
tpDPP.Weight + mdl.Strategy = tpDPP.Strategy + if tpDPP.ActivationInterval != nil { + if tpDPP.ActivationInterval.ActivationTime != "" { + mdl.ActivationInterval = tpDPP.ActivationInterval.ActivationTime + } + if tpDPP.ActivationInterval.ExpiryTime != "" { + mdl.ActivationInterval += utils.INFIELD_SEP + tpDPP.ActivationInterval.ExpiryTime + } + } + } + mdl.Hosts = tpDPP.Hosts[i] + mdl.FilterIDs = tpDPP.FilterIDs[i] + mdls = append(mdls, mdl) + } + } + if len(tpDPP.FilterIDs)-min > 0 { + for i := min; i < len(tpDPP.FilterIDs); i++ { + mdl := &TPDispatcher{ + Tenant: tpDPP.Tenant, + Tpid: tpDPP.TPid, + ID: tpDPP.ID, + } + mdl.FilterIDs = tpDPP.FilterIDs[i] + mdls = append(mdls, mdl) + } + } + if len(tpDPP.Hosts)-min > 0 { + for i := min; i < len(tpDPP.Hosts); i++ { + mdl := &TPDispatcher{ + Tenant: tpDPP.Tenant, + Tpid: tpDPP.TPid, + ID: tpDPP.ID, + } + mdl.Hosts = tpDPP.Hosts[i] + mdls = append(mdls, mdl) + } + } + + } + return +} + +func APItoDispatcherProfile(tpDPP *utils.TPDispatcherProfile, timezone string) (dpp *DispatcherProfile, err error) { + dpp = &DispatcherProfile{ + Tenant: tpDPP.Tenant, + ID: tpDPP.ID, + Weight: tpDPP.Weight, + Strategy: tpDPP.Strategy, + FilterIDs: make([]string, len(tpDPP.FilterIDs)), + Hosts: make([]string, len(tpDPP.Hosts)), + } + for i, fli := range tpDPP.FilterIDs { + dpp.FilterIDs[i] = fli + } + for i, host := range tpDPP.Hosts { + dpp.Hosts[i] = host + } + if tpDPP.ActivationInterval != nil { + if dpp.ActivationInterval, err = tpDPP.ActivationInterval.AsActivationInterval(timezone); err != nil { + return nil, err + } + } + return dpp, nil +} diff --git a/engine/models.go b/engine/models.go index 3c1b11a54..a7ab000e7 100644 --- a/engine/models.go +++ b/engine/models.go @@ -503,3 +503,16 @@ type TPCharger struct { Weight float64 `index:"6" re:"\d+\.?\d*"` CreatedAt time.Time } + +type TPDispatcher struct { + PK uint `gorm:"primary_key"` + Tpid string + Tenant string `index:"0" re:""` + ID string `index:"1" re:""` + FilterIDs string `index:"2" re:""` + ActivationInterval string `index:"3" re:""` + Strategy string `index:"4" re:""` + Hosts string `index:"5" re:""` + Weight float64 `index:"6" re:"\d+\.?\d*"` + CreatedAt time.Time +} diff --git a/engine/storage_csv.go b/engine/storage_csv.go index f289a9b04..70279fef0 100644 --- a/engine/storage_csv.go +++ b/engine/storage_csv.go @@ -34,34 +34,38 @@ type CSVStorage struct { // file names destinationsFn, ratesFn, destinationratesFn, timingsFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, - usersFn, aliasesFn, resProfilesFn, statsFn, thresholdsFn, filterFn, suppProfilesFn, attributeProfilesFn, chargerProfilesFn string + usersFn, aliasesFn, resProfilesFn, statsFn, thresholdsFn, filterFn, suppProfilesFn, attributeProfilesFn, + chargerProfilesFn, dispatcherProfilesFn string } func NewFileCSVStorage(sep rune, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, usersFn, aliasesFn, - resProfilesFn, statsFn, thresholdsFn, filterFn, suppProfilesFn, attributeProfilesFn, chargerProfilesFn string) *CSVStorage { + resProfilesFn, statsFn, thresholdsFn, filterFn, suppProfilesFn, attributeProfilesFn, chargerProfilesFn, dispatcherProfilesFn string) *CSVStorage { c := new(CSVStorage) c.sep = sep c.readerFunc = openFileCSVStorage c.destinationsFn, c.timingsFn, c.ratesFn, 
c.destinationratesFn, c.destinationratetimingsFn, c.ratingprofilesFn, c.sharedgroupsFn, c.actionsFn, c.actiontimingsFn, c.actiontriggersFn, c.accountactionsFn, c.derivedChargersFn, c.usersFn, c.aliasesFn, c.resProfilesFn, c.statsFn, c.thresholdsFn, - c.filterFn, c.suppProfilesFn, c.attributeProfilesFn, c.chargerProfilesFn = destinationsFn, timingsFn, + c.filterFn, c.suppProfilesFn, c.attributeProfilesFn, + c.chargerProfilesFn, c.dispatcherProfilesFn = destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, - usersFn, aliasesFn, resProfilesFn, statsFn, thresholdsFn, filterFn, suppProfilesFn, attributeProfilesFn, chargerProfilesFn + usersFn, aliasesFn, resProfilesFn, statsFn, thresholdsFn, filterFn, suppProfilesFn, + attributeProfilesFn, chargerProfilesFn, dispatcherProfilesFn return c } func NewStringCSVStorage(sep rune, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, usersFn, - aliasesFn, resProfilesFn, statsFn, thresholdsFn, filterFn, suppProfilesFn, attributeProfilesFn, chargerProfilesFn string) *CSVStorage { + aliasesFn, resProfilesFn, statsFn, thresholdsFn, filterFn, suppProfilesFn, + attributeProfilesFn, chargerProfilesFn, dispatcherProfilesFn string) *CSVStorage { c := NewFileCSVStorage(sep, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, usersFn, aliasesFn, resProfilesFn, - statsFn, thresholdsFn, filterFn, suppProfilesFn, attributeProfilesFn, chargerProfilesFn) + statsFn, thresholdsFn, filterFn, suppProfilesFn, attributeProfilesFn, chargerProfilesFn, dispatcherProfilesFn) c.readerFunc = openStringCSVStorage return c } @@ -733,10 +737,38 @@ func (csvs *CSVStorage) GetTPChargers(tpid, tenant, id string) ([]*utils.TPCharg return tpCPPs.AsTPChargers(), nil } +func (csvs *CSVStorage) GetTPDispatchers(tpid, tenant, id string) ([]*utils.TPDispatcherProfile, error) { + csvReader, fp, err := csvs.readerFunc(csvs.dispatcherProfilesFn, csvs.sep, getColumnCount(TPDispatcher{})) + if err != nil { + // allow writing of the other values + return nil, nil + } + if fp != nil { + defer fp.Close() + } + var tpDPPs TPDispatchers + for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() { + if err != nil { + log.Printf("bad line in %s, %s\n", csvs.dispatcherProfilesFn, err.Error()) + return nil, err + } + if dpp, err := csvLoad(TPDispatcher{}, record); err != nil { + log.Print("error loading tpDispatcherProfile: ", err) + return nil, err + } else { + dpp := dpp.(TPDispatcher) + dpp.Tpid = tpid + tpDPPs = append(tpDPPs, &dpp) + } + } + return tpDPPs.AsTPDispatchers(), nil +} + func (csvs *CSVStorage) GetTpIds(colName string) ([]string, error) { return nil, utils.ErrNotImplemented } -func (csvs *CSVStorage) GetTpTableIds(tpid, table string, distinct utils.TPDistinctIds, filters map[string]string, p *utils.Paginator) ([]string, error) { +func (csvs *CSVStorage) GetTpTableIds(tpid, table string, + distinct utils.TPDistinctIds, filters map[string]string, p *utils.Paginator) ([]string, error) { return nil, utils.ErrNotImplemented } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index ede3b1ac3..87b1bf1c7 100644 --- 
a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -140,6 +140,9 @@ type DataDB interface { GetChargerProfileDrv(string, string) (*ChargerProfile, error) SetChargerProfileDrv(*ChargerProfile) error RemoveChargerProfileDrv(string, string) error + GetDispatcherProfileDrv(string, string) (*DispatcherProfile, error) + SetDispatcherProfileDrv(*DispatcherProfile) error + RemoveDispatcherProfileDrv(string, string) error } type StorDB interface { @@ -189,6 +192,7 @@ type LoadReader interface { GetTPSuppliers(string, string, string) ([]*utils.TPSupplierProfile, error) GetTPAttributes(string, string, string) ([]*utils.TPAttributeProfile, error) GetTPChargers(string, string, string) ([]*utils.TPChargerProfile, error) + GetTPDispatchers(string, string, string) ([]*utils.TPDispatcherProfile, error) } type LoadWriter interface { @@ -214,6 +218,7 @@ type LoadWriter interface { SetTPSuppliers([]*utils.TPSupplierProfile) error SetTPAttributes([]*utils.TPAttributeProfile) error SetTPChargers([]*utils.TPChargerProfile) error + SetTPDispatchers([]*utils.TPDispatcherProfile) error } // NewMarshaler returns the marshaler type selected by mrshlerStr diff --git a/engine/storage_map_datadb.go b/engine/storage_map_datadb.go index 14be2316e..b16c65975 100644 --- a/engine/storage_map_datadb.go +++ b/engine/storage_map_datadb.go @@ -222,7 +222,8 @@ func (ms *MapStorage) HasDataDrv(category, subject, tenant string) (bool, error) return exists, nil case utils.ResourcesPrefix, utils.ResourceProfilesPrefix, utils.StatQueuePrefix, utils.StatQueueProfilePrefix, utils.ThresholdPrefix, utils.ThresholdProfilePrefix, - utils.FilterPrefix, utils.SupplierProfilePrefix, utils.AttributeProfilePrefix, utils.ChargerProfilePrefix: + utils.FilterPrefix, utils.SupplierProfilePrefix, utils.AttributeProfilePrefix, + utils.ChargerProfilePrefix, utils.DispatcherProfilePrefix: _, exists := ms.dict[category+utils.ConcatenatedKey(tenant, subject)] return exists, nil } @@ -1545,6 +1546,39 @@ func (ms *MapStorage) RemoveChargerProfileDrv(tenant, id string) (err error) { return } +func (ms *MapStorage) GetDispatcherProfileDrv(tenant, id string) (r *DispatcherProfile, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() + values, ok := ms.dict[utils.DispatcherProfilePrefix+utils.ConcatenatedKey(tenant, id)] + if !ok { + return nil, utils.ErrNotFound + } + err = ms.ms.Unmarshal(values, &r) + if err != nil { + return nil, err + } + return +} + +func (ms *MapStorage) SetDispatcherProfileDrv(r *DispatcherProfile) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() + result, err := ms.ms.Marshal(r) + if err != nil { + return err + } + ms.dict[utils.DispatcherProfilePrefix+utils.ConcatenatedKey(r.Tenant, r.ID)] = result + return +} + +func (ms *MapStorage) RemoveDispatcherProfileDrv(tenant, id string) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() + key := utils.DispatcherProfilePrefix + utils.ConcatenatedKey(tenant, id) + delete(ms.dict, key) + return +} + func (ms *MapStorage) GetVersions(itm string) (vrs Versions, err error) { ms.mu.Lock() defer ms.mu.Unlock() diff --git a/engine/storage_map_stordb.go b/engine/storage_map_stordb.go index 38f9e7c41..0c41936cb 100755 --- a/engine/storage_map_stordb.go +++ b/engine/storage_map_stordb.go @@ -94,6 +94,9 @@ func (ms *MapStorage) GetTPAttributes(tpid, tenant, id string) (attrs []*utils.T func (ms *MapStorage) GetTPChargers(tpid, tenant, id string) (attrs []*utils.TPChargerProfile, err error) { return nil, utils.ErrNotImplemented } +func (ms *MapStorage) GetTPDispatchers(tpid, tenant, id 
string) (attrs []*utils.TPDispatcherProfile, err error) { + return nil, utils.ErrNotImplemented +} //implement LoadWriter interface func (ms *MapStorage) RemTpData(table, tpid string, args map[string]string) (err error) { @@ -161,7 +164,10 @@ func (ms *MapStorage) SetTPSuppliers(suppliers []*utils.TPSupplierProfile) (err func (ms *MapStorage) SetTPAttributes(attributes []*utils.TPAttributeProfile) (err error) { return utils.ErrNotImplemented } -func (ms *MapStorage) SetTPChargers(attributes []*utils.TPChargerProfile) (err error) { +func (ms *MapStorage) SetTPChargers(cpps []*utils.TPChargerProfile) (err error) { + return utils.ErrNotImplemented +} +func (ms *MapStorage) SetTPDispatchers(dpps []*utils.TPDispatcherProfile) (err error) { return utils.ErrNotImplemented } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 388b428c4..a5d2028b8 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -75,6 +75,7 @@ const ( colAttr = "attribute_profiles" ColCDRs = "cdrs" colCpp = "charger_profiles" + colDpp = "dispatcher_profiles" ) var ( @@ -638,6 +639,8 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err er result, err = ms.getField2(sctx, colAttr, utils.AttributeProfilePrefix, subject, tntID) case utils.ChargerProfilePrefix: result, err = ms.getField2(sctx, colCpp, utils.ChargerProfilePrefix, subject, tntID) + case utils.DispatcherProfilePrefix: + result, err = ms.getField2(sctx, colDpp, utils.DispatcherProfilePrefix, subject, tntID) default: err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %s", prefix) } @@ -680,6 +683,8 @@ func (ms *MongoStorage) HasDataDrv(category, subject, tenant string) (has bool, count, err = ms.getCol(colAttr).Count(sctx, bson.M{"tenant": tenant, "id": subject}) case utils.ChargerProfilePrefix: count, err = ms.getCol(colCpp).Count(sctx, bson.M{"tenant": tenant, "id": subject}) + case utils.DispatcherProfilePrefix: + count, err = ms.getCol(colDpp).Count(sctx, bson.M{"tenant": tenant, "id": subject}) default: err = fmt.Errorf("unsupported category in HasData: %s", category) } @@ -2453,3 +2458,39 @@ func (ms *MongoStorage) RemoveChargerProfileDrv(tenant, id string) (err error) { return err }) } + +func (ms *MongoStorage) GetDispatcherProfileDrv(tenant, id string) (r *DispatcherProfile, err error) { + r = new(DispatcherProfile) + err = ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) { + cur := ms.getCol(colDpp).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + if err := cur.Decode(r); err != nil { + r = nil + if err == mongo.ErrNoDocuments { + return utils.ErrNotFound + } + return err + } + return nil + }) + return +} + +func (ms *MongoStorage) SetDispatcherProfileDrv(r *DispatcherProfile) (err error) { + return ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) { + _, err = ms.getCol(colDpp).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, + bson.M{"$set": r}, + options.Update().SetUpsert(true), + ) + return err + }) +} + +func (ms *MongoStorage) RemoveDispatcherProfileDrv(tenant, id string) (err error) { + return ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) { + dr, err := ms.getCol(colDpp).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) + if dr.DeletedCount == 0 { + return utils.ErrNotFound + } + return err + }) +} diff --git a/engine/storage_mongo_stordb.go b/engine/storage_mongo_stordb.go index 3b9bc36c1..2d9a6664b 100644 --- a/engine/storage_mongo_stordb.go +++ 
b/engine/storage_mongo_stordb.go @@ -1617,6 +1617,54 @@ func (ms *MongoStorage) SetTPChargers(tpCPP []*utils.TPChargerProfile) (err erro }) } +func (ms *MongoStorage) GetTPDispatchers(tpid, tenant, id string) ([]*utils.TPDispatcherProfile, error) { + filter := bson.M{"tpid": tpid} + if id != "" { + filter["id"] = id + } + if tenant != "" { + filter["tenant"] = tenant + } + var results []*utils.TPDispatcherProfile + err := ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) { + cur, err := ms.getCol(utils.TBLTPDispatchers).Find(sctx, filter) + if err != nil { + return err + } + for cur.Next(sctx) { + var tp utils.TPDispatcherProfile + err := cur.Decode(&tp) + if err != nil { + return err + } + results = append(results, &tp) + } + if len(results) == 0 { + return utils.ErrNotFound + } + return cur.Close(sctx) + }) + return results, err +} + +func (ms *MongoStorage) SetTPDispatchers(tpDPPs []*utils.TPDispatcherProfile) (err error) { + if len(tpDPPs) == 0 { + return + } + return ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) { + for _, tp := range tpDPPs { + _, err = ms.getCol(utils.TBLTPDispatchers).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID}, + bson.M{"$set": tp}, + options.Update().SetUpsert(true), + ) + if err != nil { + return err + } + } + return nil + }) +} + func (ms *MongoStorage) GetVersions(itm string) (vrs Versions, err error) { fop := options.FindOne() if itm != "" { diff --git a/engine/storage_redis.go b/engine/storage_redis.go index c4aff7ae3..1f7a5c5b3 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -355,7 +355,6 @@ func (rs *RedisStorage) GetKeysForPrefix(prefix string) ([]string, error) { return keys, nil } return nil, nil - } // Used to check if specific subject is stored using prefix key attached to entity @@ -367,7 +366,8 @@ func (rs *RedisStorage) HasDataDrv(category, subject, tenant string) (bool, erro return i == 1, err case utils.ResourcesPrefix, utils.ResourceProfilesPrefix, utils.StatQueuePrefix, utils.StatQueueProfilePrefix, utils.ThresholdPrefix, utils.ThresholdProfilePrefix, - utils.FilterPrefix, utils.SupplierProfilePrefix, utils.AttributeProfilePrefix, utils.ChargerProfilePrefix: + utils.FilterPrefix, utils.SupplierProfilePrefix, utils.AttributeProfilePrefix, + utils.ChargerProfilePrefix, utils.DispatcherProfilePrefix: i, err := rs.Cmd("EXISTS", category+utils.ConcatenatedKey(tenant, subject)).Int() return i == 1, err } @@ -1768,6 +1768,37 @@ func (rs *RedisStorage) RemoveChargerProfileDrv(tenant, id string) (err error) { return } +func (rs *RedisStorage) GetDispatcherProfileDrv(tenant, id string) (r *DispatcherProfile, err error) { + key := utils.DispatcherProfilePrefix + utils.ConcatenatedKey(tenant, id) + var values []byte + if values, err = rs.Cmd("GET", key).Bytes(); err != nil { + if err == redis.ErrRespNil { // did not find the destination + err = utils.ErrNotFound + } + return + } + if err = rs.ms.Unmarshal(values, &r); err != nil { + return + } + return +} + +func (rs *RedisStorage) SetDispatcherProfileDrv(r *DispatcherProfile) (err error) { + result, err := rs.ms.Marshal(r) + if err != nil { + return err + } + return rs.Cmd("SET", utils.DispatcherProfilePrefix+utils.ConcatenatedKey(r.Tenant, r.ID), result).Err +} + +func (rs *RedisStorage) RemoveDispatcherProfileDrv(tenant, id string) (err error) { + key := utils.DispatcherProfilePrefix + utils.ConcatenatedKey(tenant, id) + if err = rs.Cmd("DEL", key).Err; err != nil { + return + } + return +} + func (rs *RedisStorage) 
GetStorageType() string { return utils.REDIS } diff --git a/engine/storage_sql.go b/engine/storage_sql.go index f19d5cf06..8e6b4ab62 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -127,7 +127,7 @@ func (self *SQLStorage) GetTpIds(colName string) ([]string, error) { qryStr := fmt.Sprintf(" (SELECT tpid FROM %s)", colName) if colName == "" { qryStr = fmt.Sprintf( - "(SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s)", + "(SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s)", utils.TBLTPTimings, utils.TBLTPDestinations, utils.TBLTPRates, @@ -148,7 +148,8 @@ func (self *SQLStorage) GetTpIds(colName string) ([]string, error) { utils.TBLTPActionPlans, utils.TBLTPSuppliers, utils.TBLTPAttributes, - utils.TBLTPChargers) + utils.TBLTPChargers, + utils.TBLTPDispatchers) } rows, err = self.Db.Query(qryStr) if err != nil { @@ -239,7 +240,8 @@ func (self *SQLStorage) RemTpData(table, tpid string, args map[string]string) er utils.TBLTPSharedGroups, utils.TBLTPActions, utils.TBLTPActionPlans, utils.TBLTPActionTriggers, utils.TBLTPAccountActions, utils.TBLTPDerivedChargers, utils.TBLTPAliases, utils.TBLTPUsers, utils.TBLTPResources, - utils.TBLTPStats, utils.TBLTPFilters, utils.TBLTPSuppliers, utils.TBLTPAttributes, utils.TBLTPChargers} { + utils.TBLTPStats, utils.TBLTPFilters, utils.TBLTPSuppliers, utils.TBLTPAttributes, + utils.TBLTPChargers, utils.TBLTPDispatchers} { if err := tx.Table(tblName).Where("tpid = ?", tpid).Delete(nil).Error; err != nil { tx.Rollback() return err @@ -709,6 +711,28 @@ func (self *SQLStorage) SetTPChargers(tpCPPs []*utils.TPChargerProfile) error { return nil } +func (self *SQLStorage) SetTPDispatchers(tpDPPs []*utils.TPDispatcherProfile) error { + if len(tpDPPs) == 0 { + return nil + } + tx := self.db.Begin() + for _, dpp := range tpDPPs { + // Remove previous + if err := tx.Where(&TPDispatcher{Tpid: dpp.TPid, ID: dpp.ID}).Delete(TPDispatcher{}).Error; err != nil { + tx.Rollback() + return err + } + for _, mst := range APItoModelTPDispatcher(dpp) { + if err := tx.Save(&mst).Error; err != nil { + tx.Rollback() + return err + } + } + } + tx.Commit() + return nil +} + func (self *SQLStorage) SetSMCost(smc *SMCost) error { if smc.CostDetails == nil { return nil @@ -1607,6 +1631,25 @@ func (self *SQLStorage) GetTPChargers(tpid, tenant, id string) ([]*utils.TPCharg return arls, nil } +func (self *SQLStorage) GetTPDispatchers(tpid, tenant, id string) ([]*utils.TPDispatcherProfile, error) { + var 
dpps TPDispatchers + q := self.db.Where("tpid = ?", tpid) + if len(id) != 0 { + q = q.Where("id = ?", id) + } + if len(tenant) != 0 { + q = q.Where("tenant = ?", tenant) + } + if err := q.Find(&dpps).Error; err != nil { + return nil, err + } + arls := dpps.AsTPDispatchers() + if len(arls) == 0 { + return arls, utils.ErrNotFound + } + return arls, nil +} + // GetVersions returns slice of all versions or a specific version if tag is specified func (self *SQLStorage) GetVersions(itm string) (vrs Versions, err error) { q := self.db.Model(&TBLVersion{}) diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 2aa9fa612..7bbae87d6 100644 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -30,39 +30,41 @@ import ( ) type TpReader struct { - tpid string - timezone string - dm *DataManager - lr LoadReader - actions map[string][]*Action - actionPlans map[string]*ActionPlan - actionsTriggers map[string]ActionTriggers - accountActions map[string]*Account - dirtyRpAliases []*TenantRatingSubject // used to clean aliases that might have changed - dirtyAccAliases []*TenantAccount // used to clean aliases that might have changed - destinations map[string]*Destination - timings map[string]*utils.TPTiming - rates map[string]*utils.TPRate - destinationRates map[string]*utils.TPDestinationRate - ratingPlans map[string]*RatingPlan - ratingProfiles map[string]*RatingProfile - sharedGroups map[string]*SharedGroup - derivedChargers map[string]*utils.DerivedChargers - users map[string]*UserProfile - aliases map[string]*Alias - resProfiles map[utils.TenantID]*utils.TPResource - sqProfiles map[utils.TenantID]*utils.TPStats - thProfiles map[utils.TenantID]*utils.TPThreshold - filters map[utils.TenantID]*utils.TPFilterProfile - sppProfiles map[utils.TenantID]*utils.TPSupplierProfile - attributeProfiles map[utils.TenantID]*utils.TPAttributeProfile - chargerProfiles map[utils.TenantID]*utils.TPChargerProfile - resources []*utils.TenantID // IDs of resources which need creation based on resourceProfiles - statQueues []*utils.TenantID // IDs of statQueues which need creation based on statQueueProfiles - thresholds []*utils.TenantID // IDs of thresholds which need creation based on thresholdProfiles - suppliers []*utils.TenantID // IDs of suppliers which need creation based on sppProfiles - attrTntID []*utils.TenantID // IDs of suppliers which need creation based on attributeProfiles - chargers []*utils.TenantID // IDs of chargers which need creation based on chargerProfiles + tpid string + timezone string + dm *DataManager + lr LoadReader + actions map[string][]*Action + actionPlans map[string]*ActionPlan + actionsTriggers map[string]ActionTriggers + accountActions map[string]*Account + dirtyRpAliases []*TenantRatingSubject // used to clean aliases that might have changed + dirtyAccAliases []*TenantAccount // used to clean aliases that might have changed + destinations map[string]*Destination + timings map[string]*utils.TPTiming + rates map[string]*utils.TPRate + destinationRates map[string]*utils.TPDestinationRate + ratingPlans map[string]*RatingPlan + ratingProfiles map[string]*RatingProfile + sharedGroups map[string]*SharedGroup + derivedChargers map[string]*utils.DerivedChargers + users map[string]*UserProfile + aliases map[string]*Alias + resProfiles map[utils.TenantID]*utils.TPResource + sqProfiles map[utils.TenantID]*utils.TPStats + thProfiles map[utils.TenantID]*utils.TPThreshold + filters map[utils.TenantID]*utils.TPFilterProfile + sppProfiles map[utils.TenantID]*utils.TPSupplierProfile + attributeProfiles 
map[utils.TenantID]*utils.TPAttributeProfile + chargerProfiles map[utils.TenantID]*utils.TPChargerProfile + dispatcherProfiles map[utils.TenantID]*utils.TPDispatcherProfile + resources []*utils.TenantID // IDs of resources which need creation based on resourceProfiles + statQueues []*utils.TenantID // IDs of statQueues which need creation based on statQueueProfiles + thresholds []*utils.TenantID // IDs of thresholds which need creation based on thresholdProfiles + suppliers []*utils.TenantID // IDs of suppliers which need creation based on sppProfiles + attrTntID []*utils.TenantID // IDs of attributes which need creation based on attributeProfiles + chargers []*utils.TenantID // IDs of chargers which need creation based on chargerProfiles + dpps []*utils.TenantID // IDs of dispatchers which need creation based on dispatcherProfiles revDests, revAliases, acntActionPlans map[string][]string @@ -137,6 +139,7 @@ func (tpr *TpReader) Init() { tpr.sppProfiles = make(map[utils.TenantID]*utils.TPSupplierProfile) tpr.attributeProfiles = make(map[utils.TenantID]*utils.TPAttributeProfile) tpr.chargerProfiles = make(map[utils.TenantID]*utils.TPChargerProfile) + tpr.dispatcherProfiles = make(map[utils.TenantID]*utils.TPDispatcherProfile) tpr.filters = make(map[utils.TenantID]*utils.TPFilterProfile) tpr.revDests = make(map[string][]string) tpr.revAliases = make(map[string][]string) @@ -1450,6 +1453,30 @@ func (tpr *TpReader) LoadChargerProfiles() error { return tpr.LoadChargerProfilesFiltered("") } +func (tpr *TpReader) LoadDispatcherProfilesFiltered(tag string) (err error) { + rls, err := tpr.lr.GetTPDispatchers(tpr.tpid, "", tag) + if err != nil { + return err + } + mapDispatcherProfile := make(map[utils.TenantID]*utils.TPDispatcherProfile) + for _, rl := range rls { + mapDispatcherProfile[utils.TenantID{Tenant: rl.Tenant, ID: rl.ID}] = rl + } + tpr.dispatcherProfiles = mapDispatcherProfile + for tntID := range mapDispatcherProfile { + if has, err := tpr.dm.HasData(utils.DispatcherProfilePrefix, tntID.ID, tntID.Tenant); err != nil { + return err + } else if !has { + tpr.dpps = append(tpr.dpps, &utils.TenantID{Tenant: tntID.Tenant, ID: tntID.ID}) + } + } + return nil +} + +func (tpr *TpReader) LoadDispatcherProfiles() error { + return tpr.LoadDispatcherProfilesFiltered("") +} + func (tpr *TpReader) LoadAll() (err error) { if err = tpr.LoadDestinations(); err != nil && err.Error() != utils.NotFoundCaps { return @@ -1514,6 +1541,9 @@ func (tpr *TpReader) LoadAll() (err error) { if err = tpr.LoadChargerProfiles(); err != nil && err.Error() != utils.NotFoundCaps { return } + if err = tpr.LoadDispatcherProfiles(); err != nil && err.Error() != utils.NotFoundCaps { + return + } return nil } @@ -1879,6 +1909,22 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err } } + if verbose { + log.Print("DispatcherProfiles:") + } + for _, tpDPP := range tpr.dispatcherProfiles { + dpp, err := APItoDispatcherProfile(tpDPP, tpr.timezone) + if err != nil { + return err + } + if err = tpr.dm.SetDispatcherProfile(dpp, true); err != nil { + return err + } + if verbose { + log.Print("\t", dpp.TenantID()) + } + } + if verbose { log.Print("Timings:") } @@ -1986,6 +2032,8 @@ func (tpr *TpReader) ShowStatistics() { log.Print("AttributeProfiles: ", len(tpr.attributeProfiles)) // Charger profiles log.Print("ChargerProfiles: ", len(tpr.chargerProfiles)) + // Dispatcher profiles + log.Print("DispatcherProfiles: ", len(tpr.dispatcherProfiles)) } // Returns the identities loaded for a specific category, useful for
cache reloads @@ -2151,6 +2199,14 @@ func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) { i++ } return keys, nil + case utils.DispatcherProfilePrefix: + keys := make([]string, len(tpr.dispatcherProfiles)) + i := 0 + for k := range tpr.dispatcherProfiles { + keys[i] = k.TenantID() + i++ + } + return keys, nil } return nil, errors.New("Unsupported load category") } @@ -2418,6 +2474,18 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro } } + if verbose { + log.Print("DispatcherProfiles:") + } + for _, tpTH := range tpr.dispatcherProfiles { + if err = tpr.dm.RemoveDispatcherProfile(tpTH.Tenant, tpTH.ID, utils.NonTransactional, false); err != nil { + return err + } + if verbose { + log.Print("\t", tpTH.Tenant) + } + } + if verbose { log.Print("Timings:") } diff --git a/engine/tpimporter_csv.go b/engine/tpimporter_csv.go index 96e1507c3..0ca747dd2 100644 --- a/engine/tpimporter_csv.go +++ b/engine/tpimporter_csv.go @@ -62,6 +62,7 @@ var fileHandlers = map[string]func(*TPCSVImporter, string) error{ utils.SuppliersCsv: (*TPCSVImporter).importSuppliers, utils.AttributesCsv: (*TPCSVImporter).importAttributeProfiles, utils.ChargersCsv: (*TPCSVImporter).importChargerProfiles, + utils.DispatchersCsv: (*TPCSVImporter).importDispatcherProfiles, } func (self *TPCSVImporter) Run() error { @@ -87,6 +88,7 @@ func (self *TPCSVImporter) Run() error { path.Join(self.DirPath, utils.SuppliersCsv), path.Join(self.DirPath, utils.AttributesCsv), path.Join(self.DirPath, utils.ChargersCsv), + path.Join(self.DirPath, utils.DispatchersCsv), ) files, _ := ioutil.ReadDir(self.DirPath) for _, f := range files { @@ -402,3 +404,14 @@ func (self *TPCSVImporter) importChargerProfiles(fn string) error { } return self.StorDb.SetTPChargers(rls) } + +func (self *TPCSVImporter) importDispatcherProfiles(fn string) error { + if self.Verbose { + log.Printf("Processing file: <%s> ", fn) + } + dpps, err := self.csvr.GetTPDispatchers(self.TPid, "", "") + if err != nil { + return err + } + return self.StorDb.SetTPDispatchers(dpps) +} diff --git a/engine/version.go b/engine/version.go index efa02efb0..7b2388909 100644 --- a/engine/version.go +++ b/engine/version.go @@ -158,6 +158,7 @@ func CurrentDataDBVersions() Versions { utils.RatingPlan: 1, utils.RatingProfile: 1, utils.Chargers: 1, + utils.Dispatchers: 1, } } @@ -190,6 +191,7 @@ func CurrentStorDBVersions() Versions { utils.TpRatingPlan: 1, utils.TpRatingProfile: 1, utils.TpChargers: 1, + utils.TpDispatchers: 1, } } diff --git a/general_tests/acntacts_test.go b/general_tests/acntacts_test.go index cf8cedd0c..33d2bb633 100644 --- a/general_tests/acntacts_test.go +++ b/general_tests/acntacts_test.go @@ -60,7 +60,7 @@ ENABLE_ACNT,*enable_account,,,,,,,,,,,,,,false,false,10` csvr := engine.NewTpReader(dbAcntActs.DataDB(), engine.NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, sharedGroups, actions, actionPlans, actionTriggers, accountActions, derivedCharges, - users, aliases, resLimits, stats, thresholds, filters, suppliers, aliasProfiles, chargerProfiles), "", "") + users, aliases, resLimits, stats, thresholds, filters, suppliers, aliasProfiles, chargerProfiles, ``), "", "") if err := csvr.LoadAll(); err != nil { t.Fatal(err) } @@ -69,7 +69,7 @@ ENABLE_ACNT,*enable_account,,,,,,,,,,,,,,false,false,10` engine.Cache.Clear(nil) dbAcntActs.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, - nil, nil, nil, nil, nil, nil, nil, nil, nil) + nil, nil, nil, nil, 
nil, nil, nil, nil, nil, nil) expectAcnt := &engine.Account{ID: "cgrates.org:1"} if acnt, err := dbAcntActs.DataDB().GetAccount("cgrates.org:1"); err != nil { diff --git a/general_tests/auth_test.go b/general_tests/auth_test.go index 50f47aaa7..7c0900380 100644 --- a/general_tests/auth_test.go +++ b/general_tests/auth_test.go @@ -67,7 +67,7 @@ RP_ANY,DR_ANY_1CNT,*any,10` chargerProfiles := `` csvr := engine.NewTpReader(dbAuth.DataDB(), engine.NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, sharedGroups, actions, actionPlans, actionTriggers, accountActions, - derivedCharges, users, aliases, resLimits, stats, thresholds, filters, suppliers, aliasProfiles, chargerProfiles), "", "") + derivedCharges, users, aliases, resLimits, stats, thresholds, filters, suppliers, aliasProfiles, chargerProfiles, ``), "", "") if err := csvr.LoadAll(); err != nil { t.Fatal(err) } @@ -81,7 +81,7 @@ RP_ANY,DR_ANY_1CNT,*any,10` engine.Cache.Clear(nil) dbAuth.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, - nil, nil, nil, nil, nil, nil, nil, nil) + nil, nil, nil, nil, nil, nil, nil, nil, nil) if cachedDests := len(engine.Cache.GetItemIDs(utils.CacheDestinations, "")); cachedDests != 0 { t.Error("Wrong number of cached destinations found", cachedDests) diff --git a/general_tests/costs1_test.go b/general_tests/costs1_test.go index 78fd8474a..11ed9f6bd 100644 --- a/general_tests/costs1_test.go +++ b/general_tests/costs1_test.go @@ -51,7 +51,7 @@ RP_SMS1,DR_SMS_1,ALWAYS,10` *out,cgrates.org,data,*any,2012-01-01T00:00:00Z,RP_DATA1,, *out,cgrates.org,sms,*any,2012-01-01T00:00:00Z,RP_SMS1,,` csvr := engine.NewTpReader(dataDB.DataDB(), engine.NewStringCSVStorage(',', dests, timings, rates, destinationRates, ratingPlans, ratingProfiles, - "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""), "", "") + "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""), "", "") if err := csvr.LoadTimings(); err != nil { t.Fatal(err) @@ -75,7 +75,7 @@ RP_SMS1,DR_SMS_1,ALWAYS,10` engine.Cache.Clear(nil) dataDB.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, - nil, nil, nil, nil, nil, nil) + nil, nil, nil, nil, nil, nil, nil) if cachedRPlans := len(engine.Cache.GetItemIDs(utils.CacheRatingPlans, "")); cachedRPlans != 3 { t.Error("Wrong number of cached rating plans found", cachedRPlans) diff --git a/general_tests/datachrg1_test.go b/general_tests/datachrg1_test.go index 56b4a852b..03df337c0 100644 --- a/general_tests/datachrg1_test.go +++ b/general_tests/datachrg1_test.go @@ -42,7 +42,7 @@ DR_DATA_2,*any,RT_DATA_1c,*up,4,0,` RP_DATA1,DR_DATA_2,TM2,10` ratingProfiles := `*out,cgrates.org,data,*any,2012-01-01T00:00:00Z,RP_DATA1,,` csvr := engine.NewTpReader(dataDB.DataDB(), engine.NewStringCSVStorage(',', "", timings, rates, destinationRates, ratingPlans, ratingProfiles, - "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""), "", "") + "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""), "", "") if err := csvr.LoadTimings(); err != nil { t.Fatal(err) } @@ -62,7 +62,7 @@ RP_DATA1,DR_DATA_2,TM2,10` engine.Cache.Clear(nil) dataDB.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, - nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) if cachedRPlans := len(engine.Cache.GetItemIDs(utils.CacheRatingPlans, "")); cachedRPlans != 1 { t.Error("Wrong number of cached rating plans found", 
cachedRPlans) diff --git a/general_tests/ddazmbl1_test.go b/general_tests/ddazmbl1_test.go index 9ca780a63..3680c5e03 100644 --- a/general_tests/ddazmbl1_test.go +++ b/general_tests/ddazmbl1_test.go @@ -69,7 +69,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10` destinationRates, ratingPlans, ratingProfiles, sharedGroups, actions, actionPlans, actionTriggers, accountActions, derivedCharges, users, aliases, resLimits, stats, - thresholds, filters, suppliers, aliasProfiles, chargerProfiles), "", "") + thresholds, filters, suppliers, aliasProfiles, chargerProfiles, ``), "", "") if err := csvr.LoadDestinations(); err != nil { t.Fatal(err) } @@ -116,7 +116,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10` dataDB.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, - nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) if cachedDests := len(engine.Cache.GetItemIDs(utils.CacheDestinations, "")); cachedDests != 0 { t.Error("Wrong number of cached destinations found", cachedDests) diff --git a/general_tests/ddazmbl2_test.go b/general_tests/ddazmbl2_test.go index a0d6c0af4..e86033d12 100644 --- a/general_tests/ddazmbl2_test.go +++ b/general_tests/ddazmbl2_test.go @@ -67,7 +67,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10` csvr := engine.NewTpReader(dataDB2.DataDB(), engine.NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, sharedGroups, actions, actionPlans, actionTriggers, accountActions, derivedCharges, users, aliases, resLimits, - stats, thresholds, filters, suppliers, aliasProfiles, chargerProfiles), "", "") + stats, thresholds, filters, suppliers, aliasProfiles, chargerProfiles, ``), "", "") if err := csvr.LoadDestinations(); err != nil { t.Fatal(err) } @@ -113,7 +113,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10` engine.Cache.Clear(nil) dataDB2.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, - nil, nil, nil, nil, nil, nil, nil, nil) + nil, nil, nil, nil, nil, nil, nil, nil, nil) if cachedDests := len(engine.Cache.GetItemIDs(utils.CacheDestinations, "")); cachedDests != 0 { t.Error("Wrong number of cached destinations found", cachedDests) diff --git a/general_tests/ddazmbl3_test.go b/general_tests/ddazmbl3_test.go index 775e78ea5..a8b7ce900 100644 --- a/general_tests/ddazmbl3_test.go +++ b/general_tests/ddazmbl3_test.go @@ -65,7 +65,7 @@ RP_UK,DR_UK_Mobile_BIG5,ALWAYS,10` csvr := engine.NewTpReader(dataDB3.DataDB(), engine.NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, sharedGroups, actions, actionPlans, actionTriggers, accountActions, derivedCharges, users, aliases, resLimits, stats, - thresholds, filters, suppliers, aliasProfiles, chargerProfiles), "", "") + thresholds, filters, suppliers, aliasProfiles, chargerProfiles, ``), "", "") if err := csvr.LoadDestinations(); err != nil { t.Fatal(err) } @@ -111,7 +111,7 @@ RP_UK,DR_UK_Mobile_BIG5,ALWAYS,10` engine.Cache.Clear(nil) dataDB3.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, - nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) if cachedDests := len(engine.Cache.GetItemIDs(utils.CacheDestinations, "")); cachedDests != 0 { t.Error("Wrong number of cached destinations found", cachedDests) diff --git a/general_tests/smschrg1_test.go b/general_tests/smschrg1_test.go index e7efd8067..b22824ad2 100644 --- a/general_tests/smschrg1_test.go +++ b/general_tests/smschrg1_test.go @@ 
-40,7 +40,7 @@ func TestSMSLoadCsvTpSmsChrg1(t *testing.T) { ratingPlans := `RP_SMS1,DR_SMS_1,ALWAYS,10` ratingProfiles := `*out,cgrates.org,sms,*any,2012-01-01T00:00:00Z,RP_SMS1,,` csvr := engine.NewTpReader(dataDB.DataDB(), engine.NewStringCSVStorage(',', "", timings, rates, destinationRates, ratingPlans, ratingProfiles, - "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""), "", "") + "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""), "", "") if err := csvr.LoadTimings(); err != nil { t.Fatal(err) } @@ -60,7 +60,7 @@ func TestSMSLoadCsvTpSmsChrg1(t *testing.T) { engine.Cache.Clear(nil) dataDB.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, - nil, nil, nil, nil, nil, nil, nil, nil) + nil, nil, nil, nil, nil, nil, nil, nil, nil) if cachedRPlans := len(engine.Cache.GetItemIDs(utils.CacheRatingPlans, "")); cachedRPlans != 1 { t.Error("Wrong number of cached rating plans found", cachedRPlans) diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 55c30f479..61e372dda 100755 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -628,6 +628,7 @@ type ArgsCache struct { SupplierProfileIDs *[]string AttributeProfileIDs *[]string ChargerProfileIDs *[]string + DispatcherProfileIDs *[]string } // Data used to do remote cache reloads via api @@ -670,6 +671,7 @@ type CacheStats struct { SupplierProfiles int AttributeProfiles int ChargerProfiles int + DispatcherProfiles int } type AttrExpFileCdrs struct { @@ -1396,3 +1398,14 @@ type TPTntID struct { Tenant string ID string } + +type TPDispatcherProfile struct { + TPid string + Tenant string + ID string + FilterIDs []string + ActivationInterval *TPActivationInterval // Time when this limit becomes active and expires + Strategy string + Hosts []string + Weight float64 +} diff --git a/utils/consts.go b/utils/consts.go index 1bb2fabc3..e3f50d3de 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -19,10 +19,13 @@ along with this program. 
If not, see package utils var ( - CDRExportFormats = []string{DRYRUN, MetaFileCSV, MetaFileFWV, MetaHTTPjsonCDR, MetaHTTPjsonMap, MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap} - PrimaryCdrFields = []string{CGRID, Source, OriginHost, OriginID, ToR, RequestType, Tenant, Category, Account, Subject, Destination, SetupTime, AnswerTime, Usage, + CDRExportFormats = []string{DRYRUN, MetaFileCSV, MetaFileFWV, MetaHTTPjsonCDR, MetaHTTPjsonMap, + MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap} + PrimaryCdrFields = []string{CGRID, Source, OriginHost, OriginID, ToR, RequestType, Tenant, Category, + Account, Subject, Destination, SetupTime, AnswerTime, Usage, COST, RATED, Partial, RunID} - NotExtraCDRFields = []string{CGRID, Source, OriginHost, OriginID, ToR, RequestType, Tenant, Category, Account, Subject, Destination, SetupTime, AnswerTime, Usage, + NotExtraCDRFields = []string{CGRID, Source, OriginHost, OriginID, ToR, RequestType, Tenant, Category, + Account, Subject, Destination, SetupTime, AnswerTime, Usage, COST, RATED, Partial, RunID, PreRated, CostSource} GitLastLog string // If set, it will be processed as part of versioning PosterTransportContentTypes = map[string]string{ @@ -43,45 +46,48 @@ var ( MetaFileFWV: FWVSuffix, } CacheInstanceToPrefix = map[string]string{ - CacheDestinations: DESTINATION_PREFIX, - CacheReverseDestinations: REVERSE_DESTINATION_PREFIX, - CacheRatingPlans: RATING_PLAN_PREFIX, - CacheRatingProfiles: RATING_PROFILE_PREFIX, - CacheActions: ACTION_PREFIX, - CacheActionPlans: ACTION_PLAN_PREFIX, - CacheAccountActionPlans: AccountActionPlansPrefix, - CacheActionTriggers: ACTION_TRIGGER_PREFIX, - CacheSharedGroups: SHARED_GROUP_PREFIX, - CacheAliases: ALIASES_PREFIX, - CacheReverseAliases: REVERSE_ALIASES_PREFIX, - CacheDerivedChargers: DERIVEDCHARGERS_PREFIX, - CacheResourceProfiles: ResourceProfilesPrefix, - CacheResources: ResourcesPrefix, - CacheEventResources: EventResourcesPrefix, - CacheTimings: TimingsPrefix, - CacheStatQueueProfiles: StatQueueProfilePrefix, - CacheStatQueues: StatQueuePrefix, - CacheThresholdProfiles: ThresholdProfilePrefix, - CacheThresholds: ThresholdPrefix, - CacheFilters: FilterPrefix, - CacheSupplierProfiles: SupplierProfilePrefix, - CacheAttributeProfiles: AttributeProfilePrefix, - CacheChargerProfiles: ChargerProfilePrefix, - CacheResourceFilterIndexes: ResourceFilterIndexes, - CacheStatFilterIndexes: StatFilterIndexes, - CacheThresholdFilterIndexes: ThresholdFilterIndexes, - CacheSupplierFilterIndexes: SupplierFilterIndexes, - CacheAttributeFilterIndexes: AttributeFilterIndexes, - CacheChargerFilterIndexes: ChargerFilterIndexes, + CacheDestinations: DESTINATION_PREFIX, + CacheReverseDestinations: REVERSE_DESTINATION_PREFIX, + CacheRatingPlans: RATING_PLAN_PREFIX, + CacheRatingProfiles: RATING_PROFILE_PREFIX, + CacheActions: ACTION_PREFIX, + CacheActionPlans: ACTION_PLAN_PREFIX, + CacheAccountActionPlans: AccountActionPlansPrefix, + CacheActionTriggers: ACTION_TRIGGER_PREFIX, + CacheSharedGroups: SHARED_GROUP_PREFIX, + CacheAliases: ALIASES_PREFIX, + CacheReverseAliases: REVERSE_ALIASES_PREFIX, + CacheDerivedChargers: DERIVEDCHARGERS_PREFIX, + CacheResourceProfiles: ResourceProfilesPrefix, + CacheResources: ResourcesPrefix, + CacheEventResources: EventResourcesPrefix, + CacheTimings: TimingsPrefix, + CacheStatQueueProfiles: StatQueueProfilePrefix, + CacheStatQueues: StatQueuePrefix, + CacheThresholdProfiles: ThresholdProfilePrefix, + CacheThresholds: ThresholdPrefix, + CacheFilters: FilterPrefix, + 
CacheSupplierProfiles: SupplierProfilePrefix, + CacheAttributeProfiles: AttributeProfilePrefix, + CacheChargerProfiles: ChargerProfilePrefix, + CacheDispatcherProfiles: DispatcherProfilePrefix, + CacheResourceFilterIndexes: ResourceFilterIndexes, + CacheStatFilterIndexes: StatFilterIndexes, + CacheThresholdFilterIndexes: ThresholdFilterIndexes, + CacheSupplierFilterIndexes: SupplierFilterIndexes, + CacheAttributeFilterIndexes: AttributeFilterIndexes, + CacheChargerFilterIndexes: ChargerFilterIndexes, + CacheDispatcherFilterIndexes: DispatcherFilterIndexes, } CachePrefixToInstance map[string]string // will be built on init PrefixToIndexCache = map[string]string{ - ThresholdProfilePrefix: CacheThresholdFilterIndexes, - ResourceProfilesPrefix: CacheResourceFilterIndexes, - StatQueueProfilePrefix: CacheStatFilterIndexes, - SupplierProfilePrefix: CacheSupplierFilterIndexes, - AttributeProfilePrefix: CacheAttributeFilterIndexes, - ChargerProfilePrefix: CacheChargerFilterIndexes, + ThresholdProfilePrefix: CacheThresholdFilterIndexes, + ResourceProfilesPrefix: CacheResourceFilterIndexes, + StatQueueProfilePrefix: CacheStatFilterIndexes, + SupplierProfilePrefix: CacheSupplierFilterIndexes, + AttributeProfilePrefix: CacheAttributeFilterIndexes, + ChargerProfilePrefix: CacheChargerFilterIndexes, + DispatcherProfilePrefix: CacheDispatcherFilterIndexes, } CacheIndexesToPrefix map[string]string // will be built on init ) @@ -245,6 +251,7 @@ const ( SupplierProfilePrefix = "spp_" AttributeProfilePrefix = "alp_" ChargerProfilePrefix = "cpp_" + DispatcherProfilePrefix = "dpp_" ThresholdProfilePrefix = "thp_" StatQueuePrefix = "stq_" LOADINST_KEY = "load_history" @@ -412,6 +419,7 @@ const ( Suppliers = "Suppliers" Attributes = "Attributes" Chargers = "Chargers" + Dispatchers = "Dispatchers" StatS = "Stats" RALService = "RALs" CostSource = "CostSource" @@ -659,6 +667,7 @@ const ( TpRatingPlan = "TpRatingPlan" TpRatingProfile = "TpRatingProfile" TpChargers = "TpChargers" + TpDispatchers = "TpDispatchers" ) // Dispatcher Const @@ -858,6 +867,7 @@ const ( SuppliersCsv = "Suppliers.csv" AttributesCsv = "Attributes.csv" ChargersCsv = "Chargers.csv" + DispatchersCsv = "Dispatchers.csv" ) // Table Name @@ -887,53 +897,57 @@ const ( TBLTPChargers = "tp_chargers" TBLVersions = "versions" OldSMCosts = "sm_costs" + TBLTPDispatchers = "tp_dispatchers" ) // Cache Name const ( - CacheDestinations = "destinations" - CacheReverseDestinations = "reverse_destinations" - CacheRatingPlans = "rating_plans" - CacheRatingProfiles = "rating_profiles" - CacheActions = "actions" - CacheActionPlans = "action_plans" - CacheAccountActionPlans = "account_action_plans" - CacheActionTriggers = "action_triggers" - CacheSharedGroups = "shared_groups" - CacheAliases = "aliases" - CacheReverseAliases = "reverse_aliases" - CacheDerivedChargers = "derived_chargers" - CacheResources = "resources" - CacheResourceProfiles = "resource_profiles" - CacheTimings = "timings" - CacheEventResources = "event_resources" - CacheStatQueueProfiles = "statqueue_profiles" - CacheStatQueues = "statqueues" - CacheThresholdProfiles = "threshold_profiles" - CacheThresholds = "thresholds" - CacheFilters = "filters" - CacheSupplierProfiles = "supplier_profiles" - CacheAttributeProfiles = "attribute_profiles" - CacheChargerProfiles = "charger_profiles" - CacheResourceFilterIndexes = "resource_filter_indexes" - CacheStatFilterIndexes = "stat_filter_indexes" - CacheThresholdFilterIndexes = "threshold_filter_indexes" - CacheSupplierFilterIndexes = 
"supplier_filter_indexes" - CacheAttributeFilterIndexes = "attribute_filter_indexes" - CacheChargerFilterIndexes = "charger_filter_indexes" - CacheDiameterMessages = "diameter_messages" - MetaPrecaching = "*precaching" - MetaReady = "*ready" + CacheDestinations = "destinations" + CacheReverseDestinations = "reverse_destinations" + CacheRatingPlans = "rating_plans" + CacheRatingProfiles = "rating_profiles" + CacheActions = "actions" + CacheActionPlans = "action_plans" + CacheAccountActionPlans = "account_action_plans" + CacheActionTriggers = "action_triggers" + CacheSharedGroups = "shared_groups" + CacheAliases = "aliases" + CacheReverseAliases = "reverse_aliases" + CacheDerivedChargers = "derived_chargers" + CacheResources = "resources" + CacheResourceProfiles = "resource_profiles" + CacheTimings = "timings" + CacheEventResources = "event_resources" + CacheStatQueueProfiles = "statqueue_profiles" + CacheStatQueues = "statqueues" + CacheThresholdProfiles = "threshold_profiles" + CacheThresholds = "thresholds" + CacheFilters = "filters" + CacheSupplierProfiles = "supplier_profiles" + CacheAttributeProfiles = "attribute_profiles" + CacheChargerProfiles = "charger_profiles" + CacheDispatcherProfiles = "dispatcher_profiles" + CacheResourceFilterIndexes = "resource_filter_indexes" + CacheStatFilterIndexes = "stat_filter_indexes" + CacheThresholdFilterIndexes = "threshold_filter_indexes" + CacheSupplierFilterIndexes = "supplier_filter_indexes" + CacheAttributeFilterIndexes = "attribute_filter_indexes" + CacheChargerFilterIndexes = "charger_filter_indexes" + CacheDispatcherFilterIndexes = "dispatcher_filter_indexes" + CacheDiameterMessages = "diameter_messages" + MetaPrecaching = "*precaching" + MetaReady = "*ready" ) // Prefix for indexing const ( - ResourceFilterIndexes = "rfi_" - StatFilterIndexes = "sfi_" - ThresholdFilterIndexes = "tfi_" - SupplierFilterIndexes = "spi_" - AttributeFilterIndexes = "afi_" - ChargerFilterIndexes = "cfi_" + ResourceFilterIndexes = "rfi_" + StatFilterIndexes = "sfi_" + ThresholdFilterIndexes = "tfi_" + SupplierFilterIndexes = "spi_" + AttributeFilterIndexes = "afi_" + ChargerFilterIndexes = "cfi_" + DispatcherFilterIndexes = "dfi_" ) // Agents