diff --git a/apier/v1/apier.go b/apier/v1/apier.go
index 1db8a1b0b..78f75a67b 100644
--- a/apier/v1/apier.go
+++ b/apier/v1/apier.go
@@ -1016,7 +1016,7 @@ func (self *ApierV1) LoadCache(args utils.AttrReloadCache, reply *string) (err e
if args.FlushAll {
engine.Cache.Clear(nil)
}
- var dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rspIDs, resIDs, stqIDs, stqpIDs, thIDs, thpIDs, fltrIDs, splpIDs, alsPrfIDs, cppIDs []string
+ var dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rspIDs, resIDs, stqIDs, stqpIDs, thIDs, thpIDs, fltrIDs, splpIDs, alsPrfIDs, cppIDs, dppIDs []string
if args.DestinationIDs == nil {
dstIDs = nil
} else {
@@ -1127,10 +1127,15 @@ func (self *ApierV1) LoadCache(args utils.AttrReloadCache, reply *string) (err e
} else {
cppIDs = *args.ChargerProfileIDs
}
+ if args.DispatcherProfileIDs == nil {
+ dppIDs = nil
+ } else {
+ dppIDs = *args.DispatcherProfileIDs
+ }
if err := self.DataManager.LoadDataDBCache(dstIDs, rvDstIDs, rplIDs,
rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs,
rvAlsIDs, rspIDs, resIDs, stqIDs, stqpIDs, thIDs, thpIDs,
- fltrIDs, splpIDs, alsPrfIDs, cppIDs); err != nil {
+ fltrIDs, splpIDs, alsPrfIDs, cppIDs, dppIDs); err != nil {
return utils.NewErrServerError(err)
}
*reply = utils.OK
@@ -1311,6 +1316,14 @@ func (self *ApierV1) FlushCache(args utils.AttrReloadCache, reply *string) (err
true, utils.NonTransactional)
}
}
+ if args.DispatcherProfileIDs == nil {
+ engine.Cache.Clear([]string{utils.CacheDispatcherProfiles})
+ } else if len(*args.DispatcherProfileIDs) != 0 {
+ for _, key := range *args.DispatcherProfileIDs {
+ engine.Cache.Remove(utils.CacheDispatcherProfiles, key,
+ true, utils.NonTransactional)
+ }
+ }
*reply = utils.OK
return
@@ -1339,6 +1352,7 @@ func (self *ApierV1) GetCacheStats(attrs utils.AttrCacheStats, reply *utils.Cach
cs.SupplierProfiles = len(engine.Cache.GetItemIDs(utils.CacheSupplierProfiles, ""))
cs.AttributeProfiles = len(engine.Cache.GetItemIDs(utils.CacheAttributeProfiles, ""))
cs.ChargerProfiles = len(engine.Cache.GetItemIDs(utils.CacheChargerProfiles, ""))
+ cs.DispatcherProfiles = len(engine.Cache.GetItemIDs(utils.CacheDispatcherProfiles, ""))
if self.Users != nil {
var ups engine.UserProfiles
@@ -1761,6 +1775,25 @@ func (v1 *ApierV1) GetCacheKeys(args utils.ArgsCacheKeys, reply *utils.ArgsCache
}
}
+ if args.DispatcherProfileIDs != nil {
+ var ids []string
+ if len(*args.DispatcherProfileIDs) != 0 {
+ for _, id := range *args.DispatcherProfileIDs {
+ if _, hasIt := engine.Cache.Get(utils.CacheDispatcherProfiles, id); hasIt {
+ ids = append(ids, id)
+ }
+ }
+ } else {
+ for _, id := range engine.Cache.GetItemIDs(utils.CacheDispatcherProfiles, "") {
+ ids = append(ids, id)
+ }
+ }
+ ids = args.Paginator.PaginateStringSlice(ids)
+ if len(ids) != 0 {
+ reply.ChargerProfileIDs = &ids
+ }
+ }
+
return
}
@@ -1799,6 +1832,7 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
path.Join(attrs.FolderPath, utils.SuppliersCsv),
path.Join(attrs.FolderPath, utils.AttributesCsv),
path.Join(attrs.FolderPath, utils.ChargersCsv),
+ path.Join(attrs.FolderPath, utils.DispatchersCsv),
), "", self.Config.GeneralCfg().DefaultTimezone)
if err := loader.LoadAll(); err != nil {
return utils.NewErrServerError(err)
diff --git a/apier/v1/dispatcher.go b/apier/v1/dispatcher.go
index d5adf8fcd..1361dbb07 100755
--- a/apier/v1/dispatcher.go
+++ b/apier/v1/dispatcher.go
@@ -22,8 +22,68 @@ import (
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessions"
+ "github.com/cgrates/cgrates/utils"
)
+// GetDispatcherProfile returns a Dispatcher Profile
+func (apierV1 *ApierV1) GetDispatcherProfile(arg *utils.TenantID, reply *engine.DispatcherProfile) error {
+ if missing := utils.MissingStructFields(arg, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
+ return utils.NewErrMandatoryIeMissing(missing...)
+ }
+ if dpp, err := apierV1.DataManager.GetDispatcherProfile(arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
+ if err.Error() != utils.ErrNotFound.Error() {
+ err = utils.NewErrServerError(err)
+ }
+ return err
+ } else {
+ *reply = *dpp
+ }
+ return nil
+}
+
+// GetDispatcherProfileIDs returns list of dispatcherProfile IDs registered for a tenant
+func (apierV1 *ApierV1) GetDispatcherProfileIDs(tenant string, dPrfIDs *[]string) error {
+ prfx := utils.DispatcherProfilePrefix + tenant + ":"
+ keys, err := apierV1.DataManager.DataDB().GetKeysForPrefix(prfx)
+ if err != nil {
+ return err
+ }
+ retIDs := make([]string, len(keys))
+ for i, key := range keys {
+ retIDs[i] = key[len(prfx):]
+ }
+ *dPrfIDs = retIDs
+ return nil
+}
+
+//SetDispatcherProfile add/update a new Dispatcher Profile
+func (apierV1 *ApierV1) SetDispatcherProfile(dpp *engine.DispatcherProfile, reply *string) error {
+ if missing := utils.MissingStructFields(dpp, []string{"Tenant", "ID"}); len(missing) != 0 {
+ return utils.NewErrMandatoryIeMissing(missing...)
+ }
+ if err := apierV1.DataManager.SetDispatcherProfile(dpp, true); err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ *reply = utils.OK
+ return nil
+}
+
+//RemoveDispatcherProfile remove a specific Dispatcher Profile
+func (apierV1 *ApierV1) RemoveDispatcherProfile(arg *utils.TenantID, reply *string) error {
+ if missing := utils.MissingStructFields(arg, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
+ return utils.NewErrMandatoryIeMissing(missing...)
+ }
+ if err := apierV1.DataManager.RemoveDispatcherProfile(arg.Tenant,
+ arg.ID, utils.NonTransactional, true); err != nil {
+ if err.Error() != utils.ErrNotFound.Error() {
+ err = utils.NewErrServerError(err)
+ }
+ return err
+ }
+ *reply = utils.OK
+ return nil
+}
+
func NewDispatcherThresholdSv1(dps *dispatchers.DispatcherService) *DispatcherThresholdSv1 {
return &DispatcherThresholdSv1{dS: dps}
}
diff --git a/apier/v1/dispatcher_it_test.go b/apier/v1/dispatcher_it_test.go
new file mode 100644
index 000000000..a9d068c06
--- /dev/null
+++ b/apier/v1/dispatcher_it_test.go
@@ -0,0 +1,121 @@
+// +build integration
+
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package v1
+
+import (
+ "net/rpc"
+ "net/rpc/jsonrpc"
+ "path"
+ "testing"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
+)
+
+var (
+ dispatcherCfgPath string
+ dispatcherCfg *config.CGRConfig
+ dispatcherRPC *rpc.Client
+ dispatcherProfile *engine.DispatcherProfile
+ dispatcherConfigDIR string //run tests for specific configuration
+)
+
+var sTestsDispatcher = []func(t *testing.T){
+ testDispatcherSInitCfg,
+ testDispatcherSInitDataDb,
+ testDispatcherSResetStorDb,
+ testDispatcherSStartEngine,
+ testDispatcherSRPCConn,
+ testDispatcherSSetDispatcherProfile,
+ testDispatcherSGetDispatcherProfileIDs,
+ testDispatcherSUpdateDispatcherProfile,
+ testDispatcherSRemDispatcherProfile,
+ testDispatcherSKillEngine,
+}
+
+//Test start here
+func TestDispatcherSITMySql(t *testing.T) {
+ dispatcherConfigDIR = "tutmysql"
+ for _, stest := range sTestsDispatcher {
+ t.Run(dispatcherConfigDIR, stest)
+ }
+}
+
+func TestDispatcherSITMongo(t *testing.T) {
+ dispatcherConfigDIR = "tutmongo"
+ for _, stest := range sTestsDispatcher {
+ t.Run(dispatcherConfigDIR, stest)
+ }
+}
+
+func testDispatcherSInitCfg(t *testing.T) {
+ var err error
+ dispatcherCfgPath = path.Join(*dataDir, "conf", "samples", dispatcherConfigDIR)
+ dispatcherCfg, err = config.NewCGRConfigFromFolder(dispatcherCfgPath)
+ if err != nil {
+ t.Error(err)
+ }
+ dispatcherCfg.DataFolderPath = *dataDir
+ config.SetCgrConfig(dispatcherCfg)
+}
+
+func testDispatcherSInitDataDb(t *testing.T) {
+ if err := engine.InitDataDb(dispatcherCfg); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// Wipe out the cdr database
+func testDispatcherSResetStorDb(t *testing.T) {
+ if err := engine.InitStorDb(dispatcherCfg); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// Start CGR Engine
+func testDispatcherSStartEngine(t *testing.T) {
+ if _, err := engine.StopStartEngine(dispatcherCfgPath, *waitRater); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// Connect rpc client to rater
+func testDispatcherSRPCConn(t *testing.T) {
+ var err error
+ dispatcherRPC, err = jsonrpc.Dial("tcp", dispatcherCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func testDispatcherSSetDispatcherProfile(t *testing.T) {}
+
+func testDispatcherSGetDispatcherProfileIDs(t *testing.T) {}
+
+func testDispatcherSUpdateDispatcherProfile(t *testing.T) {}
+
+func testDispatcherSRemDispatcherProfile(t *testing.T) {}
+
+func testDispatcherSKillEngine(t *testing.T) {
+ if err := engine.KillEngine(*waitRater); err != nil {
+ t.Error(err)
+ }
+}
diff --git a/apier/v1/precache_it_test.go b/apier/v1/precache_it_test.go
index 95ab55e40..11f8df36d 100644
--- a/apier/v1/precache_it_test.go
+++ b/apier/v1/precache_it_test.go
@@ -155,6 +155,14 @@ func testPrecacheGetCacheStatsBeforeLoad(t *testing.T) {
Items: 0,
Groups: 0,
},
+ "dispatcher_filter_indexes": {
+ Items: 0,
+ Groups: 0,
+ },
+ "dispatcher_profiles": {
+ Items: 0,
+ Groups: 0,
+ },
"derived_chargers": {
Items: 0,
Groups: 0,
@@ -315,6 +323,14 @@ func testPrecacheGetCacheStatsAfterRestart(t *testing.T) {
Items: 0,
Groups: 0,
},
+ "dispatcher_filter_indexes": {
+ Items: 0,
+ Groups: 0,
+ },
+ "dispatcher_profiles": {
+ Items: 0,
+ Groups: 0,
+ },
"derived_chargers": {
Items: 1, // expected to have 1 item
Groups: 0,
diff --git a/apier/v2/apier.go b/apier/v2/apier.go
index 53121b5f7..4255ac02e 100644
--- a/apier/v2/apier.go
+++ b/apier/v2/apier.go
@@ -27,7 +27,7 @@ import (
"strconv"
"strings"
- "github.com/cgrates/cgrates/apier/v1"
+ v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
@@ -147,6 +147,7 @@ func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
path.Join(attrs.FolderPath, utils.SuppliersCsv),
path.Join(attrs.FolderPath, utils.AttributesCsv),
path.Join(attrs.FolderPath, utils.ChargersCsv),
+ path.Join(attrs.FolderPath, utils.DispatchersCsv),
), "", self.Config.GeneralCfg().DefaultTimezone)
if err := loader.LoadAll(); err != nil {
return utils.NewErrServerError(err)
diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go
index 21a815922..bc037dac2 100755
--- a/cmd/cgr-loader/cgr-loader.go
+++ b/cmd/cgr-loader/cgr-loader.go
@@ -300,6 +300,7 @@ func main() {
path.Join(*dataPath, utils.SuppliersCsv),
path.Join(*dataPath, utils.AttributesCsv),
path.Join(*dataPath, utils.ChargersCsv),
+ path.Join(*dataPath, utils.DispatchersCsv),
)
}
@@ -353,7 +354,8 @@ func main() {
if err := tpReader.WriteToDatabase(*flush, *verbose, *disableReverse); err != nil {
log.Fatal("Could not write to database: ", err)
}
- var dstIds, revDstIDs, rplIds, rpfIds, actIds, aapIDs, shgIds, alsIds, dcsIds, rspIDs, resIDs, aatIDs, ralsIDs, stqIDs, stqpIDs, trsIDs, trspfIDs, flrIDs, spfIDs, apfIDs, chargerIDs []string
+ var dstIds, revDstIDs, rplIds, rpfIds, actIds, aapIDs, shgIds, alsIds, dcsIds, rspIDs, resIDs,
+ aatIDs, ralsIDs, stqIDs, stqpIDs, trsIDs, trspfIDs, flrIDs, spfIDs, apfIDs, chargerIDs, dppIDs []string
if cacheS != nil {
dstIds, _ = tpReader.GetLoadedIds(utils.DESTINATION_PREFIX)
revDstIDs, _ = tpReader.GetLoadedIds(utils.REVERSE_DESTINATION_PREFIX)
@@ -376,6 +378,7 @@ func main() {
spfIDs, _ = tpReader.GetLoadedIds(utils.SupplierProfilePrefix)
apfIDs, _ = tpReader.GetLoadedIds(utils.AttributeProfilePrefix)
chargerIDs, _ = tpReader.GetLoadedIds(utils.ChargerProfilePrefix)
+ dppIDs, _ = tpReader.GetLoadedIds(utils.DispatcherProfilePrefix)
}
aps, _ := tpReader.GetLoadedIds(utils.ACTION_PLAN_PREFIX)
// for users reloading
@@ -416,7 +419,8 @@ func main() {
FilterIDs: &flrIDs,
SupplierProfileIDs: &spfIDs,
AttributeProfileIDs: &apfIDs,
- ChargerProfileIDs: &chargerIDs},
+ ChargerProfileIDs: &chargerIDs,
+ DispatcherProfileIDs: &dppIDs},
FlushAll: *flush,
}, &reply); err != nil {
log.Printf("WARNING: Got error on cache reload: %s\n", err.Error())
@@ -443,6 +447,9 @@ func main() {
if len(chargerIDs) != 0 {
cacheIDs = append(cacheIDs, utils.CacheChargerFilterIndexes)
}
+ if len(dppIDs) != 0 {
+ cacheIDs = append(cacheIDs, utils.CacheDispatcherFilterIndexes)
+ }
if err = cacheS.Call(utils.CacheSv1Clear, cacheIDs, &reply); err != nil {
log.Printf("WARNING: Got error on cache clear: %s\n", err.Error())
}
diff --git a/cmd/cgr-tester/cgr-tester.go b/cmd/cgr-tester/cgr-tester.go
index 28c258303..9b5b0023d 100644
--- a/cmd/cgr-tester/cgr-tester.go
+++ b/cmd/cgr-tester/cgr-tester.go
@@ -81,7 +81,7 @@ func durInternalRater(cd *engine.CallDescriptor) (time.Duration, error) {
defer dm.DataDB().Close()
engine.SetDataStorage(dm)
if err := dm.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil,
- nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil); err != nil {
+ nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil); err != nil {
return nilDuration, fmt.Errorf("Cache rating error: %s", err.Error())
}
log.Printf("Runnning %d cycles...", *runs)
diff --git a/config/config_defaults.go b/config/config_defaults.go
index a39c33c4a..7ab651924 100755
--- a/config/config_defaults.go
+++ b/config/config_defaults.go
@@ -141,12 +141,14 @@ const CGRATES_CFG_JSON = `
"supplier_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control supplier profile caching
"attribute_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control attribute profile caching
"charger_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control charger profile caching
+ "dispatcher_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control dispatcher profile caching
"resource_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control resource filter indexes caching
"stat_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control stat filter indexes caching
"threshold_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control threshold filter indexes caching
"supplier_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control supplier filter indexes caching
"attribute_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control attribute filter indexes caching
"charger_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control charger filter indexes caching
+ "dispatcher_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control dispatcher filter indexes caching
"diameter_messages": {"limit": -1, "ttl": "1h", "static_ttl": false}, // diameter messages caching
},
diff --git a/config/config_json_test.go b/config/config_json_test.go
index 77c8e641c..8ba4f71b8 100755
--- a/config/config_json_test.go
+++ b/config/config_json_test.go
@@ -142,6 +142,9 @@ func TestCacheJsonCfg(t *testing.T) {
utils.CacheChargerProfiles: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
+ utils.CacheDispatcherProfiles: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
+ Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
+ Precache: utils.BoolPointer(false)},
utils.CacheResourceFilterIndexes: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)},
utils.CacheStatFilterIndexes: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
@@ -154,6 +157,8 @@ func TestCacheJsonCfg(t *testing.T) {
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)},
utils.CacheChargerFilterIndexes: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)},
+ utils.CacheDispatcherFilterIndexes: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
+ Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)},
utils.CacheDiameterMessages: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer("1h"), Static_ttl: utils.BoolPointer(false)},
}
diff --git a/config/config_test.go b/config/config_test.go
index 8800fb87a..37049a869 100755
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -711,6 +711,8 @@ func TestCgrCfgJSONDefaultsCacheCFG(t *testing.T) {
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheChargerProfiles: &CacheParamCfg{Limit: -1,
TTL: time.Duration(0), StaticTTL: false, Precache: false},
+ utils.CacheDispatcherProfiles: &CacheParamCfg{Limit: -1,
+ TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheResourceFilterIndexes: &CacheParamCfg{Limit: -1,
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheStatFilterIndexes: &CacheParamCfg{Limit: -1,
@@ -723,6 +725,8 @@ func TestCgrCfgJSONDefaultsCacheCFG(t *testing.T) {
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheChargerFilterIndexes: &CacheParamCfg{Limit: -1,
TTL: time.Duration(0), StaticTTL: false, Precache: false},
+ utils.CacheDispatcherFilterIndexes: &CacheParamCfg{Limit: -1,
+ TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheDiameterMessages: &CacheParamCfg{Limit: -1,
TTL: time.Duration(1 * time.Hour), StaticTTL: false},
}
diff --git a/data/storage/mysql/create_tariffplan_tables.sql b/data/storage/mysql/create_tariffplan_tables.sql
index d09692cf1..1e3e93a57 100644
--- a/data/storage/mysql/create_tariffplan_tables.sql
+++ b/data/storage/mysql/create_tariffplan_tables.sql
@@ -506,6 +506,28 @@ CREATE TABLE tp_chargers (
`id`,`filter_ids`,`run_id`,`attribute_ids`)
);
+--
+-- Table structure for table `tp_dispatchers`
+--
+
+DROP TABLE IF EXISTS tp_dispatchers;
+CREATE TABLE tp_dispatchers (
+ `pk` int(11) NOT NULL AUTO_INCREMENT,
+ `tpid` varchar(64) NOT NULL,
+ `tenant` varchar(64) NOT NULL,
+ `id` varchar(64) NOT NULL,
+ `filter_ids` varchar(64) NOT NULL,
+ `activation_interval` varchar(64) NOT NULL,
+ `strategy` varchar(64) NOT NULL,
+ `hosts` varchar(64) NOT NULL,
+ `weight` decimal(8,2) NOT NULL,
+ `created_at` TIMESTAMP,
+ PRIMARY KEY (`pk`),
+ KEY `tpid` (`tpid`),
+ UNIQUE KEY `unique_tp_dispatchers` (`tpid`,`tenant`,
+ `id`,`filter_ids`,`strategy`,`hosts`)
+);
+
--
-- Table structure for table `versions`
--
diff --git a/data/storage/postgres/create_tariffplan_tables.sql b/data/storage/postgres/create_tariffplan_tables.sql
index 1236d88ad..567ba2af1 100644
--- a/data/storage/postgres/create_tariffplan_tables.sql
+++ b/data/storage/postgres/create_tariffplan_tables.sql
@@ -496,6 +496,27 @@ CREATE INDEX tp_suppliers_unique ON tp_suppliers ("tpid", "tenant", "id",
CREATE INDEX tp_chargers_unique ON tp_chargers ("tpid", "tenant", "id",
"filter_ids","run_id","attribute_ids");
+ --
+ -- Table structure for table `tp_chargers`
+ --
+
+ DROP TABLE IF EXISTS tp_dispatchers;
+ CREATE TABLE tp_dispatchers (
+ "pk" SERIAL PRIMARY KEY,
+ "tpid" varchar(64) NOT NULL,
+ "tenant"varchar(64) NOT NULL,
+ "id" varchar(64) NOT NULL,
+ "filter_ids" varchar(64) NOT NULL,
+ "activation_interval" varchar(64) NOT NULL,
+ "strategy" varchar(64) NOT NULL,
+ "hosts" varchar(64) NOT NULL,
+ "weight" decimal(8,2) NOT NULL,
+ "created_at" TIMESTAMP WITH TIME ZONE
+ );
+ CREATE INDEX tp_dispatchers_ids ON tp_dispatchers (tpid);
+ CREATE INDEX tp_dispatchers_unique ON tp_dispatchers ("tpid", "tenant", "id",
+ "filter_ids","strategy","hosts");
+
--
-- Table structure for table `versions`
--
diff --git a/engine/datamanager.go b/engine/datamanager.go
index d04f327ef..c4c4af1a6 100644
--- a/engine/datamanager.go
+++ b/engine/datamanager.go
@@ -46,7 +46,7 @@ func (dm *DataManager) DataDB() DataDB {
func (dm *DataManager) LoadDataDBCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs,
aaPlIDs, atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rpIDs, resIDs,
- stqIDs, stqpIDs, thIDs, thpIDs, fltrIDs, splPrflIDs, alsPrfIDs, cppIDs []string) (err error) {
+ stqIDs, stqpIDs, thIDs, thpIDs, fltrIDs, splPrflIDs, alsPrfIDs, cppIDs, dppIDs []string) (err error) {
if dm.DataDB().GetStorageType() == utils.MAPSTOR {
if dm.cacheCfg == nil {
return
@@ -58,7 +58,8 @@ func (dm *DataManager) LoadDataDBCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs,
utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACTION_TRIGGER_PREFIX,
utils.SHARED_GROUP_PREFIX, utils.ALIASES_PREFIX, utils.REVERSE_ALIASES_PREFIX, utils.StatQueuePrefix,
utils.StatQueueProfilePrefix, utils.ThresholdPrefix, utils.ThresholdProfilePrefix,
- utils.FilterPrefix, utils.SupplierProfilePrefix, utils.AttributeProfilePrefix, utils.ChargerProfilePrefix}, k) && cacheCfg.Precache {
+ utils.FilterPrefix, utils.SupplierProfilePrefix,
+ utils.AttributeProfilePrefix, utils.ChargerProfilePrefix, utils.DispatcherProfilePrefix}, k) && cacheCfg.Precache {
if err := dm.PreloadCacheForPrefix(k); err != nil && err != utils.ErrInvalidKey {
return err
}
@@ -89,6 +90,7 @@ func (dm *DataManager) LoadDataDBCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs,
utils.SupplierProfilePrefix: splPrflIDs,
utils.AttributeProfilePrefix: alsPrfIDs,
utils.ChargerProfilePrefix: cppIDs,
+ utils.DispatcherProfilePrefix: dppIDs,
} {
if err = dm.CacheDataFromDB(key, ids, false); err != nil {
return
@@ -148,7 +150,8 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b
utils.FilterPrefix,
utils.SupplierProfilePrefix,
utils.AttributeProfilePrefix,
- utils.ChargerProfilePrefix}, prfx) {
+ utils.ChargerProfilePrefix,
+ utils.DispatcherProfilePrefix}, prfx) {
return utils.NewCGRError(utils.DataManager,
utils.MandatoryIEMissingCaps,
utils.UnsupportedCachePrefix,
@@ -241,6 +244,9 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b
case utils.ChargerProfilePrefix:
tntID := utils.NewTenantID(dataID)
_, err = dm.GetChargerProfile(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
+ case utils.DispatcherProfilePrefix:
+ tntID := utils.NewTenantID(dataID)
+ _, err = dm.GetDispatcherProfile(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
}
if err != nil {
return utils.NewCGRError(utils.DataManager,
@@ -1228,7 +1234,7 @@ func (dm *DataManager) SetChargerProfile(cpp *ChargerProfile, withIndex bool) (e
}
}
if needsRemove {
- if err = NewFilterIndexer(dm, utils.SupplierProfilePrefix,
+ if err = NewFilterIndexer(dm, utils.ChargerProfilePrefix,
cpp.Tenant).RemoveItemFromIndex(cpp.Tenant, cpp.ID, oldCpp.FilterIDs); err != nil {
return
}
@@ -1255,3 +1261,77 @@ func (dm *DataManager) RemoveChargerProfile(tenant, id string,
}
return
}
+
+func (dm *DataManager) GetDispatcherProfile(tenant, id string, cacheRead, cacheWrite bool,
+ transactionID string) (dpp *DispatcherProfile, err error) {
+ tntID := utils.ConcatenatedKey(tenant, id)
+ if cacheRead {
+ if x, ok := Cache.Get(utils.CacheDispatcherProfiles, tntID); ok {
+ if x == nil {
+ return nil, utils.ErrNotFound
+ }
+ return x.(*DispatcherProfile), nil
+ }
+ }
+ dpp, err = dm.dataDB.GetDispatcherProfileDrv(tenant, id)
+ if err != nil {
+ if err == utils.ErrNotFound && cacheWrite {
+ Cache.Set(utils.CacheDispatcherProfiles, tntID, nil, nil,
+ cacheCommit(transactionID), transactionID)
+ }
+ return nil, err
+ }
+ if cacheWrite {
+ Cache.Set(utils.CacheDispatcherProfiles, tntID, dpp, nil,
+ cacheCommit(transactionID), transactionID)
+ }
+ return
+}
+
+func (dm *DataManager) SetDispatcherProfile(dpp *DispatcherProfile, withIndex bool) (err error) {
+ oldDpp, err := dm.GetDispatcherProfile(dpp.Tenant, dpp.ID, true, false, utils.NonTransactional)
+ if err != nil && err != utils.ErrNotFound {
+ return err
+ }
+ if err = dm.DataDB().SetDispatcherProfileDrv(dpp); err != nil {
+ return err
+ }
+ if err = dm.CacheDataFromDB(utils.DispatcherProfilePrefix, []string{dpp.TenantID()}, true); err != nil {
+ return
+ }
+ if withIndex {
+ if oldDpp != nil {
+ var needsRemove bool
+ for _, fltrID := range oldDpp.FilterIDs {
+ if !utils.IsSliceMember(dpp.FilterIDs, fltrID) {
+ needsRemove = true
+ }
+ }
+ if needsRemove {
+ if err = NewFilterIndexer(dm, utils.DispatcherProfilePrefix,
+ dpp.Tenant).RemoveItemFromIndex(dpp.Tenant, dpp.ID, oldDpp.FilterIDs); err != nil {
+ return
+ }
+ }
+ }
+ return createAndIndex(utils.DispatcherProfilePrefix, dpp.Tenant, utils.EmptyString, dpp.ID, dpp.FilterIDs, dm)
+ }
+ return
+}
+
+func (dm *DataManager) RemoveDispatcherProfile(tenant, id string,
+ transactionID string, withIndex bool) (err error) {
+ oldDpp, err := dm.GetDispatcherProfile(tenant, id, true, false, utils.NonTransactional)
+ if err != nil && err != utils.ErrNotFound {
+ return err
+ }
+ if err = dm.DataDB().RemoveDispatcherProfileDrv(tenant, id); err != nil {
+ return
+ }
+ Cache.Remove(utils.CacheDispatcherProfiles, utils.ConcatenatedKey(tenant, id),
+ cacheCommit(transactionID), transactionID)
+ if withIndex {
+ return NewFilterIndexer(dm, utils.DispatcherProfilePrefix, tenant).RemoveItemFromIndex(tenant, id, oldDpp.FilterIDs)
+ }
+ return
+}
diff --git a/engine/filterindexer.go b/engine/filterindexer.go
index d7f77de18..88664349a 100644
--- a/engine/filterindexer.go
+++ b/engine/filterindexer.go
@@ -97,6 +97,9 @@ func (rfi *FilterIndexer) cacheRemItemType() { // ToDo: tune here by removing pe
case utils.ChargerProfilePrefix:
Cache.Clear([]string{utils.CacheChargerFilterIndexes})
+
+ case utils.DispatcherProfilePrefix:
+ Cache.Clear([]string{utils.CacheDispatcherFilterIndexes})
}
}
@@ -201,6 +204,17 @@ func (rfi *FilterIndexer) RemoveItemFromIndex(tenant, itemID string, oldFilters
filterIDs[i] = fltrID
}
}
+ case utils.DispatcherProfilePrefix:
+ dpp, err := rfi.dm.GetDispatcherProfile(tenant, itemID, true, false, utils.NonTransactional)
+ if err != nil && err != utils.ErrNotFound {
+ return err
+ }
+ if dpp != nil {
+ filterIDs = make([]string, len(dpp.FilterIDs))
+ for i, fltrID := range dpp.FilterIDs {
+ filterIDs[i] = fltrID
+ }
+ }
default:
}
if len(filterIDs) == 0 {
diff --git a/engine/libdispatcher.go b/engine/libdispatcher.go
new file mode 100644
index 000000000..540e1b2f8
--- /dev/null
+++ b/engine/libdispatcher.go
@@ -0,0 +1,48 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package engine
+
+import (
+ "sort"
+
+ "github.com/cgrates/cgrates/utils"
+)
+
+// DispatcherProfile is the config for one Dispatcher
+type DispatcherProfile struct {
+ Tenant string
+ ID string
+ FilterIDs []string
+ ActivationInterval *utils.ActivationInterval // Activation interval
+ Strategy string
+ Hosts []string // perform data aliasing based on these Attributes
+ Weight float64
+}
+
+func (dP *DispatcherProfile) TenantID() string {
+ return utils.ConcatenatedKey(dP.Tenant, dP.ID)
+}
+
+// ChargerProfiles is a sortable list of Charger profiles
+type DispatcherProfiles []*DispatcherProfile
+
+// Sort is part of sort interface, sort based on Weight
+func (dps DispatcherProfiles) Sort() {
+ sort.Slice(dps, func(i, j int) bool { return dps[i].Weight > dps[j].Weight })
+}
diff --git a/engine/libtest.go b/engine/libtest.go
index 884041064..2bd86d1fa 100644
--- a/engine/libtest.go
+++ b/engine/libtest.go
@@ -146,6 +146,7 @@ func LoadTariffPlanFromFolder(tpPath, timezone string, dm *DataManager, disable_
path.Join(tpPath, utils.SuppliersCsv),
path.Join(tpPath, utils.AttributesCsv),
path.Join(tpPath, utils.ChargersCsv),
+ path.Join(tpPath, utils.DispatchersCsv),
), "", timezone)
if err := loader.LoadAll(); err != nil {
return utils.NewErrServerError(err)
diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go
index 3b6cb7421..16bbe4e0c 100644
--- a/engine/loader_csv_test.go
+++ b/engine/loader_csv_test.go
@@ -294,15 +294,20 @@ cgrates.org,ALS1,con2;con3,,,Field2,Initial2,Sub2,false,,
chargerProfiles = `
#Tenant,ID,FilterIDs,ActivationInterval,RunID,AttributeIDs,Weight
cgrates.org,Charger1,*string:Account:1001,2014-07-29T15:00:00Z,*rated,ATTR_1001_SIMPLEAUTH,20
+`
+ dispatcherProfiles = `
+#Tenant,ID,FilterIDs,ActivationInterval,Strategy,Hosts,Weight
+cgrates.org,D1,*string:Account:1001,2014-07-29T15:00:00Z,*first,192.168.56.203;192.168.56.204,20
`
)
var csvr *TpReader
func init() {
- csvr = NewTpReader(dm.dataDB, NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles,
- sharedGroups, actions, actionPlans, actionTriggers, accountActions, derivedCharges,
- users, aliases, resProfiles, stats, thresholds, filters, sppProfiles, attributeProfiles, chargerProfiles), testTPID, "")
+ csvr = NewTpReader(dm.dataDB, NewStringCSVStorage(',', destinations, timings, rates, destinationRates,
+ ratingPlans, ratingProfiles, sharedGroups, actions, actionPlans, actionTriggers,
+ accountActions, derivedCharges, users, aliases, resProfiles, stats, thresholds,
+ filters, sppProfiles, attributeProfiles, chargerProfiles, dispatcherProfiles), testTPID, "")
if err := csvr.LoadDestinations(); err != nil {
log.Print("error in LoadDestinations:", err)
@@ -367,6 +372,9 @@ func init() {
if err := csvr.LoadChargerProfiles(); err != nil {
log.Print("error in LoadChargerProfiles:", err)
}
+ if err := csvr.LoadDispatcherProfiles(); err != nil {
+ log.Print("error in LoadChargerProfiles:", err)
+ }
csvr.WriteToDatabase(false, false, false)
Cache.Clear(nil)
//dm.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
@@ -1714,6 +1722,42 @@ func TestLoadChargerProfiles(t *testing.T) {
}
}
+func TestLoadDispatcherProfiles(t *testing.T) {
+ eDispatcherProfiles := map[utils.TenantID]*utils.TPDispatcherProfile{
+ utils.TenantID{Tenant: "cgrates.org", ID: "D1"}: &utils.TPDispatcherProfile{
+ TPid: testTPID,
+ Tenant: "cgrates.org",
+ ID: "D1",
+ FilterIDs: []string{"*string:Account:1001"},
+ ActivationInterval: &utils.TPActivationInterval{
+ ActivationTime: "2014-07-29T15:00:00Z",
+ },
+ Strategy: "*first",
+ Hosts: []string{"192.168.56.203", "192.168.56.204"},
+ Weight: 20,
+ },
+ }
+ revHosts := &utils.TPDispatcherProfile{
+ TPid: testTPID,
+ Tenant: "cgrates.org",
+ ID: "D1",
+ FilterIDs: []string{"*string:Account:1001"},
+ ActivationInterval: &utils.TPActivationInterval{
+ ActivationTime: "2014-07-29T15:00:00Z",
+ },
+ Strategy: "*first",
+ Hosts: []string{"192.168.56.204", "192.168.56.203"},
+ Weight: 20,
+ }
+ dppKey := utils.TenantID{Tenant: "cgrates.org", ID: "D1"}
+ if len(csvr.dispatcherProfiles) != len(eDispatcherProfiles) {
+ t.Errorf("Failed to load chargerProfiles: %s", utils.ToIJSON(csvr.chargerProfiles))
+ } else if !reflect.DeepEqual(eDispatcherProfiles[dppKey], csvr.dispatcherProfiles[dppKey]) &&
+ !reflect.DeepEqual(revHosts, csvr.dispatcherProfiles[dppKey]) {
+ t.Errorf("Expecting: %+v, received: %+v", eDispatcherProfiles[dppKey], csvr.dispatcherProfiles[dppKey])
+ }
+}
+
func TestLoadResource(t *testing.T) {
eResources := []*utils.TenantID{
&utils.TenantID{
diff --git a/engine/loader_it_test.go b/engine/loader_it_test.go
index b63572fb6..b0897c20d 100644
--- a/engine/loader_it_test.go
+++ b/engine/loader_it_test.go
@@ -117,6 +117,7 @@ func TestLoaderITRemoveLoad(t *testing.T) {
path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.SuppliersCsv),
path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.AttributesCsv),
path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.ChargersCsv),
+ path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.DispatchersCsv),
), "", "")
if err = loader.LoadDestinations(); err != nil {
@@ -179,6 +180,9 @@ func TestLoaderITRemoveLoad(t *testing.T) {
if err = loader.LoadChargerProfiles(); err != nil {
t.Error("Failed loading Charger profiles: ", err.Error())
}
+ if err = loader.LoadDispatcherProfiles(); err != nil {
+ t.Error("Failed loading Charger profiles: ", err.Error())
+ }
if err := loader.WriteToDatabase(true, false, false); err != nil {
t.Error("Could not write data into dataDb: ", err.Error())
}
@@ -217,6 +221,7 @@ func TestLoaderITLoadFromCSV(t *testing.T) {
path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.SuppliersCsv),
path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.AttributesCsv),
path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.ChargersCsv),
+ path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.DispatchersCsv),
), "", "")
if err = loader.LoadDestinations(); err != nil {
@@ -279,6 +284,9 @@ func TestLoaderITLoadFromCSV(t *testing.T) {
if err = loader.LoadChargerProfiles(); err != nil {
t.Error("Failed loading Alias profiles: ", err.Error())
}
+ if err = loader.LoadDispatcherProfiles(); err != nil {
+ t.Error("Failed loading Charger profiles: ", err.Error())
+ }
if err := loader.WriteToDatabase(true, false, false); err != nil {
t.Error("Could not write data into dataDb: ", err.Error())
}
@@ -490,6 +498,20 @@ func TestLoaderITWriteToDatabase(t *testing.T) {
}
}
+ for tenatid, dpp := range loader.dispatcherProfiles {
+ rcv, err := loader.dm.GetDispatcherProfile(tenatid.Tenant, tenatid.ID, false, false, utils.NonTransactional)
+ if err != nil {
+ t.Errorf("Failed GetDispatcherProfile, tenant: %s, id: %s, error: %s ", dpp.Tenant, dpp.ID, err.Error())
+ }
+ dp, err := APItoDispatcherProfile(dpp, "UTC")
+ if err != nil {
+ t.Error(err)
+ }
+ if !reflect.DeepEqual(dp, rcv) {
+ t.Errorf("Expecting: %v, received: %v", dp, rcv)
+ }
+ }
+
}
// Imports data from csv files in tpScenario to storDb
diff --git a/engine/model_helpers.go b/engine/model_helpers.go
index eedade5dc..e380d6a32 100644
--- a/engine/model_helpers.go
+++ b/engine/model_helpers.go
@@ -2614,3 +2614,174 @@ func APItoChargerProfile(tpCPP *utils.TPChargerProfile, timezone string) (cpp *C
}
return cpp, nil
}
+
+type TPDispatchers []*TPDispatcher
+
+func (tps TPDispatchers) AsTPDispatchers() (result []*utils.TPDispatcherProfile) {
+ mst := make(map[string]*utils.TPDispatcherProfile)
+ filterMap := make(map[string]utils.StringMap)
+ hostMap := make(map[string]utils.StringMap)
+ for _, tp := range tps {
+ tpDPP, found := mst[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()]
+ if !found {
+ tpDPP = &utils.TPDispatcherProfile{
+ TPid: tp.Tpid,
+ Tenant: tp.Tenant,
+ ID: tp.ID,
+ }
+ }
+ if tp.Weight != 0 {
+ tpDPP.Weight = tp.Weight
+ }
+ if len(tp.ActivationInterval) != 0 {
+ tpDPP.ActivationInterval = new(utils.TPActivationInterval)
+ aiSplt := strings.Split(tp.ActivationInterval, utils.INFIELD_SEP)
+ if len(aiSplt) == 2 {
+ tpDPP.ActivationInterval.ActivationTime = aiSplt[0]
+ tpDPP.ActivationInterval.ExpiryTime = aiSplt[1]
+ } else if len(aiSplt) == 1 {
+ tpDPP.ActivationInterval.ActivationTime = aiSplt[0]
+ }
+ }
+ if tp.FilterIDs != "" {
+ if _, has := filterMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()]; !has {
+ filterMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()] = make(utils.StringMap)
+ }
+ filterSplit := strings.Split(tp.FilterIDs, utils.INFIELD_SEP)
+ for _, filter := range filterSplit {
+ filterMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()][filter] = true
+ }
+ }
+ if tp.Strategy != "" {
+ tpDPP.Strategy = tp.Strategy
+ }
+ if tp.Hosts != "" {
+ if _, has := hostMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()]; !has {
+ hostMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()] = make(utils.StringMap)
+ }
+ hostsSplit := strings.Split(tp.Hosts, utils.INFIELD_SEP)
+ for _, host := range hostsSplit {
+ hostMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()][host] = true
+ }
+ }
+ mst[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()] = tpDPP
+ }
+ result = make([]*utils.TPDispatcherProfile, len(mst))
+ i := 0
+ for tntID, tp := range mst {
+ result[i] = tp
+ for filterID := range filterMap[tntID] {
+ result[i].FilterIDs = append(result[i].FilterIDs, filterID)
+ }
+ for host := range hostMap[tntID] {
+ result[i].Hosts = append(result[i].Hosts, host)
+ }
+ i++
+ }
+ return
+}
+
+func APItoModelTPDispatcher(tpDPP *utils.TPDispatcherProfile) (mdls TPDispatchers) {
+ if tpDPP != nil {
+ min := len(tpDPP.FilterIDs)
+ isFilter := true
+ if min > len(tpDPP.Hosts) {
+ min = len(tpDPP.Hosts)
+ isFilter = false
+ }
+ if min == 0 {
+ mdl := &TPDispatcher{
+ Tenant: tpDPP.Tenant,
+ Tpid: tpDPP.TPid,
+ ID: tpDPP.ID,
+ Weight: tpDPP.Weight,
+ Strategy: tpDPP.Strategy,
+ }
+ if tpDPP.ActivationInterval != nil {
+ if tpDPP.ActivationInterval.ActivationTime != "" {
+ mdl.ActivationInterval = tpDPP.ActivationInterval.ActivationTime
+ }
+ if tpDPP.ActivationInterval.ExpiryTime != "" {
+ mdl.ActivationInterval += utils.INFIELD_SEP + tpDPP.ActivationInterval.ExpiryTime
+ }
+ }
+ if isFilter && len(tpDPP.Hosts) > 0 {
+ mdl.Hosts = tpDPP.Hosts[0]
+ } else if len(tpDPP.FilterIDs) > 0 {
+ mdl.FilterIDs = tpDPP.FilterIDs[0]
+ }
+ min = 1
+ mdls = append(mdls, mdl)
+ } else {
+ for i := 0; i < min; i++ {
+ mdl := &TPDispatcher{
+ Tenant: tpDPP.Tenant,
+ Tpid: tpDPP.TPid,
+ ID: tpDPP.ID,
+ }
+ if i == 0 {
+ mdl.Weight = tpDPP.Weight
+ mdl.Strategy = tpDPP.Strategy
+ if tpDPP.ActivationInterval != nil {
+ if tpDPP.ActivationInterval.ActivationTime != "" {
+ mdl.ActivationInterval = tpDPP.ActivationInterval.ActivationTime
+ }
+ if tpDPP.ActivationInterval.ExpiryTime != "" {
+ mdl.ActivationInterval += utils.INFIELD_SEP + tpDPP.ActivationInterval.ExpiryTime
+ }
+ }
+ }
+ mdl.Hosts = tpDPP.Hosts[i]
+ mdl.FilterIDs = tpDPP.FilterIDs[i]
+ mdls = append(mdls, mdl)
+ }
+ }
+ if len(tpDPP.FilterIDs)-min > 0 {
+ for i := min; i < len(tpDPP.FilterIDs); i++ {
+ mdl := &TPDispatcher{
+ Tenant: tpDPP.Tenant,
+ Tpid: tpDPP.TPid,
+ ID: tpDPP.ID,
+ }
+ mdl.FilterIDs = tpDPP.FilterIDs[i]
+ mdls = append(mdls, mdl)
+ }
+ }
+ if len(tpDPP.Hosts)-min > 0 {
+ for i := min; i < len(tpDPP.Hosts); i++ {
+ mdl := &TPDispatcher{
+ Tenant: tpDPP.Tenant,
+ Tpid: tpDPP.TPid,
+ ID: tpDPP.ID,
+ }
+ mdl.Hosts = tpDPP.Hosts[i]
+ mdls = append(mdls, mdl)
+ }
+ }
+
+ }
+ return
+}
+
+func APItoDispatcherProfile(tpDPP *utils.TPDispatcherProfile, timezone string) (dpp *DispatcherProfile, err error) {
+ dpp = &DispatcherProfile{
+ Tenant: tpDPP.Tenant,
+ ID: tpDPP.ID,
+ Weight: tpDPP.Weight,
+ Strategy: tpDPP.Strategy,
+ FilterIDs: make([]string, len(tpDPP.FilterIDs)),
+ Hosts: make([]string, len(tpDPP.Hosts)),
+ }
+ for i, fli := range tpDPP.FilterIDs {
+ dpp.FilterIDs[i] = fli
+ }
+ for i, host := range tpDPP.Hosts {
+ dpp.Hosts[i] = host
+ }
+ if tpDPP.ActivationInterval != nil {
+ if dpp.ActivationInterval, err = tpDPP.ActivationInterval.AsActivationInterval(timezone); err != nil {
+ return nil, err
+ }
+ }
+ return dpp, nil
+}
diff --git a/engine/models.go b/engine/models.go
index 3c1b11a54..a7ab000e7 100644
--- a/engine/models.go
+++ b/engine/models.go
@@ -503,3 +503,16 @@ type TPCharger struct {
Weight float64 `index:"6" re:"\d+\.?\d*"`
CreatedAt time.Time
}
+
+type TPDispatcher struct {
+ PK uint `gorm:"primary_key"`
+ Tpid string
+ Tenant string `index:"0" re:""`
+ ID string `index:"1" re:""`
+ FilterIDs string `index:"2" re:""`
+ ActivationInterval string `index:"3" re:""`
+ Strategy string `index:"4" re:""`
+ Hosts string `index:"5" re:""`
+ Weight float64 `index:"6" re:"\d+\.?\d*"`
+ CreatedAt time.Time
+}
diff --git a/engine/storage_csv.go b/engine/storage_csv.go
index f289a9b04..70279fef0 100644
--- a/engine/storage_csv.go
+++ b/engine/storage_csv.go
@@ -34,34 +34,38 @@ type CSVStorage struct {
// file names
destinationsFn, ratesFn, destinationratesFn, timingsFn, destinationratetimingsFn, ratingprofilesFn,
sharedgroupsFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn,
- usersFn, aliasesFn, resProfilesFn, statsFn, thresholdsFn, filterFn, suppProfilesFn, attributeProfilesFn, chargerProfilesFn string
+ usersFn, aliasesFn, resProfilesFn, statsFn, thresholdsFn, filterFn, suppProfilesFn, attributeProfilesFn,
+ chargerProfilesFn, dispatcherProfilesFn string
}
func NewFileCSVStorage(sep rune,
destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn,
actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, usersFn, aliasesFn,
- resProfilesFn, statsFn, thresholdsFn, filterFn, suppProfilesFn, attributeProfilesFn, chargerProfilesFn string) *CSVStorage {
+ resProfilesFn, statsFn, thresholdsFn, filterFn, suppProfilesFn, attributeProfilesFn, chargerProfilesFn, dispatcherProfilesFn string) *CSVStorage {
c := new(CSVStorage)
c.sep = sep
c.readerFunc = openFileCSVStorage
c.destinationsFn, c.timingsFn, c.ratesFn, c.destinationratesFn, c.destinationratetimingsFn, c.ratingprofilesFn,
c.sharedgroupsFn, c.actionsFn, c.actiontimingsFn, c.actiontriggersFn, c.accountactionsFn,
c.derivedChargersFn, c.usersFn, c.aliasesFn, c.resProfilesFn, c.statsFn, c.thresholdsFn,
- c.filterFn, c.suppProfilesFn, c.attributeProfilesFn, c.chargerProfilesFn = destinationsFn, timingsFn,
+ c.filterFn, c.suppProfilesFn, c.attributeProfilesFn,
+ c.chargerProfilesFn, c.dispatcherProfilesFn = destinationsFn, timingsFn,
ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn,
actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn,
- usersFn, aliasesFn, resProfilesFn, statsFn, thresholdsFn, filterFn, suppProfilesFn, attributeProfilesFn, chargerProfilesFn
+ usersFn, aliasesFn, resProfilesFn, statsFn, thresholdsFn, filterFn, suppProfilesFn,
+ attributeProfilesFn, chargerProfilesFn, dispatcherProfilesFn
return c
}
func NewStringCSVStorage(sep rune,
destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn,
actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, usersFn,
- aliasesFn, resProfilesFn, statsFn, thresholdsFn, filterFn, suppProfilesFn, attributeProfilesFn, chargerProfilesFn string) *CSVStorage {
+ aliasesFn, resProfilesFn, statsFn, thresholdsFn, filterFn, suppProfilesFn,
+ attributeProfilesFn, chargerProfilesFn, dispatcherProfilesFn string) *CSVStorage {
c := NewFileCSVStorage(sep, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn,
ratingprofilesFn, sharedgroupsFn, actionsFn, actiontimingsFn, actiontriggersFn,
accountactionsFn, derivedChargersFn, usersFn, aliasesFn, resProfilesFn,
- statsFn, thresholdsFn, filterFn, suppProfilesFn, attributeProfilesFn, chargerProfilesFn)
+ statsFn, thresholdsFn, filterFn, suppProfilesFn, attributeProfilesFn, chargerProfilesFn, dispatcherProfilesFn)
c.readerFunc = openStringCSVStorage
return c
}
@@ -733,10 +737,38 @@ func (csvs *CSVStorage) GetTPChargers(tpid, tenant, id string) ([]*utils.TPCharg
return tpCPPs.AsTPChargers(), nil
}
+func (csvs *CSVStorage) GetTPDispatchers(tpid, tenant, id string) ([]*utils.TPDispatcherProfile, error) {
+ csvReader, fp, err := csvs.readerFunc(csvs.dispatcherProfilesFn, csvs.sep, getColumnCount(TPDispatcher{}))
+ if err != nil {
+ // allow writing of the other values
+ return nil, nil
+ }
+ if fp != nil {
+ defer fp.Close()
+ }
+ var tpDPPs TPDispatchers
+ for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
+ if err != nil {
+ log.Printf("bad line in %s, %s\n", csvs.dispatcherProfilesFn, err.Error())
+ return nil, err
+ }
+ if dpp, err := csvLoad(TPDispatcher{}, record); err != nil {
+ log.Print("error loading tpDispatcherProfile: ", err)
+ return nil, err
+ } else {
+ dpp := dpp.(TPDispatcher)
+ dpp.Tpid = tpid
+ tpDPPs = append(tpDPPs, &dpp)
+ }
+ }
+ return tpDPPs.AsTPDispatchers(), nil
+}
+
func (csvs *CSVStorage) GetTpIds(colName string) ([]string, error) {
return nil, utils.ErrNotImplemented
}
-func (csvs *CSVStorage) GetTpTableIds(tpid, table string, distinct utils.TPDistinctIds, filters map[string]string, p *utils.Paginator) ([]string, error) {
+func (csvs *CSVStorage) GetTpTableIds(tpid, table string,
+ distinct utils.TPDistinctIds, filters map[string]string, p *utils.Paginator) ([]string, error) {
return nil, utils.ErrNotImplemented
}
diff --git a/engine/storage_interface.go b/engine/storage_interface.go
index ede3b1ac3..87b1bf1c7 100644
--- a/engine/storage_interface.go
+++ b/engine/storage_interface.go
@@ -140,6 +140,9 @@ type DataDB interface {
GetChargerProfileDrv(string, string) (*ChargerProfile, error)
SetChargerProfileDrv(*ChargerProfile) error
RemoveChargerProfileDrv(string, string) error
+ GetDispatcherProfileDrv(string, string) (*DispatcherProfile, error)
+ SetDispatcherProfileDrv(*DispatcherProfile) error
+ RemoveDispatcherProfileDrv(string, string) error
}
type StorDB interface {
@@ -189,6 +192,7 @@ type LoadReader interface {
GetTPSuppliers(string, string, string) ([]*utils.TPSupplierProfile, error)
GetTPAttributes(string, string, string) ([]*utils.TPAttributeProfile, error)
GetTPChargers(string, string, string) ([]*utils.TPChargerProfile, error)
+ GetTPDispatchers(string, string, string) ([]*utils.TPDispatcherProfile, error)
}
type LoadWriter interface {
@@ -214,6 +218,7 @@ type LoadWriter interface {
SetTPSuppliers([]*utils.TPSupplierProfile) error
SetTPAttributes([]*utils.TPAttributeProfile) error
SetTPChargers([]*utils.TPChargerProfile) error
+ SetTPDispatchers([]*utils.TPDispatcherProfile) error
}
// NewMarshaler returns the marshaler type selected by mrshlerStr
diff --git a/engine/storage_map_datadb.go b/engine/storage_map_datadb.go
index 14be2316e..b16c65975 100644
--- a/engine/storage_map_datadb.go
+++ b/engine/storage_map_datadb.go
@@ -222,7 +222,8 @@ func (ms *MapStorage) HasDataDrv(category, subject, tenant string) (bool, error)
return exists, nil
case utils.ResourcesPrefix, utils.ResourceProfilesPrefix, utils.StatQueuePrefix,
utils.StatQueueProfilePrefix, utils.ThresholdPrefix, utils.ThresholdProfilePrefix,
- utils.FilterPrefix, utils.SupplierProfilePrefix, utils.AttributeProfilePrefix, utils.ChargerProfilePrefix:
+ utils.FilterPrefix, utils.SupplierProfilePrefix, utils.AttributeProfilePrefix,
+ utils.ChargerProfilePrefix, utils.DispatcherProfilePrefix:
_, exists := ms.dict[category+utils.ConcatenatedKey(tenant, subject)]
return exists, nil
}
@@ -1545,6 +1546,39 @@ func (ms *MapStorage) RemoveChargerProfileDrv(tenant, id string) (err error) {
return
}
+func (ms *MapStorage) GetDispatcherProfileDrv(tenant, id string) (r *DispatcherProfile, err error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
+ values, ok := ms.dict[utils.DispatcherProfilePrefix+utils.ConcatenatedKey(tenant, id)]
+ if !ok {
+ return nil, utils.ErrNotFound
+ }
+ err = ms.ms.Unmarshal(values, &r)
+ if err != nil {
+ return nil, err
+ }
+ return
+}
+
+func (ms *MapStorage) SetDispatcherProfileDrv(r *DispatcherProfile) (err error) {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
+ result, err := ms.ms.Marshal(r)
+ if err != nil {
+ return err
+ }
+ ms.dict[utils.DispatcherProfilePrefix+utils.ConcatenatedKey(r.Tenant, r.ID)] = result
+ return
+}
+
+func (ms *MapStorage) RemoveDispatcherProfileDrv(tenant, id string) (err error) {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
+ key := utils.DispatcherProfilePrefix + utils.ConcatenatedKey(tenant, id)
+ delete(ms.dict, key)
+ return
+}
+
func (ms *MapStorage) GetVersions(itm string) (vrs Versions, err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
diff --git a/engine/storage_map_stordb.go b/engine/storage_map_stordb.go
index 38f9e7c41..0c41936cb 100755
--- a/engine/storage_map_stordb.go
+++ b/engine/storage_map_stordb.go
@@ -94,6 +94,9 @@ func (ms *MapStorage) GetTPAttributes(tpid, tenant, id string) (attrs []*utils.T
func (ms *MapStorage) GetTPChargers(tpid, tenant, id string) (attrs []*utils.TPChargerProfile, err error) {
return nil, utils.ErrNotImplemented
}
+func (ms *MapStorage) GetTPDispatchers(tpid, tenant, id string) (attrs []*utils.TPDispatcherProfile, err error) {
+ return nil, utils.ErrNotImplemented
+}
//implement LoadWriter interface
func (ms *MapStorage) RemTpData(table, tpid string, args map[string]string) (err error) {
@@ -161,7 +164,10 @@ func (ms *MapStorage) SetTPSuppliers(suppliers []*utils.TPSupplierProfile) (err
func (ms *MapStorage) SetTPAttributes(attributes []*utils.TPAttributeProfile) (err error) {
return utils.ErrNotImplemented
}
-func (ms *MapStorage) SetTPChargers(attributes []*utils.TPChargerProfile) (err error) {
+func (ms *MapStorage) SetTPChargers(cpps []*utils.TPChargerProfile) (err error) {
+ return utils.ErrNotImplemented
+}
+func (ms *MapStorage) SetTPDispatchers(dpps []*utils.TPDispatcherProfile) (err error) {
return utils.ErrNotImplemented
}
diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go
index 388b428c4..a5d2028b8 100644
--- a/engine/storage_mongo_datadb.go
+++ b/engine/storage_mongo_datadb.go
@@ -75,6 +75,7 @@ const (
colAttr = "attribute_profiles"
ColCDRs = "cdrs"
colCpp = "charger_profiles"
+ colDpp = "dispatcher_profiles"
)
var (
@@ -638,6 +639,8 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err er
result, err = ms.getField2(sctx, colAttr, utils.AttributeProfilePrefix, subject, tntID)
case utils.ChargerProfilePrefix:
result, err = ms.getField2(sctx, colCpp, utils.ChargerProfilePrefix, subject, tntID)
+ case utils.DispatcherProfilePrefix:
+ result, err = ms.getField2(sctx, colDpp, utils.DispatcherProfilePrefix, subject, tntID)
default:
err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %s", prefix)
}
@@ -680,6 +683,8 @@ func (ms *MongoStorage) HasDataDrv(category, subject, tenant string) (has bool,
count, err = ms.getCol(colAttr).Count(sctx, bson.M{"tenant": tenant, "id": subject})
case utils.ChargerProfilePrefix:
count, err = ms.getCol(colCpp).Count(sctx, bson.M{"tenant": tenant, "id": subject})
+ case utils.DispatcherProfilePrefix:
+ count, err = ms.getCol(colDpp).Count(sctx, bson.M{"tenant": tenant, "id": subject})
default:
err = fmt.Errorf("unsupported category in HasData: %s", category)
}
@@ -2453,3 +2458,39 @@ func (ms *MongoStorage) RemoveChargerProfileDrv(tenant, id string) (err error) {
return err
})
}
+
+func (ms *MongoStorage) GetDispatcherProfileDrv(tenant, id string) (r *DispatcherProfile, err error) {
+ r = new(DispatcherProfile)
+ err = ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) {
+ cur := ms.getCol(colDpp).FindOne(sctx, bson.M{"tenant": tenant, "id": id})
+ if err := cur.Decode(r); err != nil {
+ r = nil
+ if err == mongo.ErrNoDocuments {
+ return utils.ErrNotFound
+ }
+ return err
+ }
+ return nil
+ })
+ return
+}
+
+func (ms *MongoStorage) SetDispatcherProfileDrv(r *DispatcherProfile) (err error) {
+ return ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) {
+ _, err = ms.getCol(colDpp).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID},
+ bson.M{"$set": r},
+ options.Update().SetUpsert(true),
+ )
+ return err
+ })
+}
+
+func (ms *MongoStorage) RemoveDispatcherProfileDrv(tenant, id string) (err error) {
+ return ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) {
+ dr, err := ms.getCol(colDpp).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id})
+ if dr.DeletedCount == 0 {
+ return utils.ErrNotFound
+ }
+ return err
+ })
+}
diff --git a/engine/storage_mongo_stordb.go b/engine/storage_mongo_stordb.go
index 3b9bc36c1..2d9a6664b 100644
--- a/engine/storage_mongo_stordb.go
+++ b/engine/storage_mongo_stordb.go
@@ -1617,6 +1617,54 @@ func (ms *MongoStorage) SetTPChargers(tpCPP []*utils.TPChargerProfile) (err erro
})
}
+func (ms *MongoStorage) GetTPDispatchers(tpid, tenant, id string) ([]*utils.TPDispatcherProfile, error) {
+ filter := bson.M{"tpid": tpid}
+ if id != "" {
+ filter["id"] = id
+ }
+ if tenant != "" {
+ filter["tenant"] = tenant
+ }
+ var results []*utils.TPDispatcherProfile
+ err := ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) {
+ cur, err := ms.getCol(utils.TBLTPDispatchers).Find(sctx, filter)
+ if err != nil {
+ return err
+ }
+ for cur.Next(sctx) {
+ var tp utils.TPDispatcherProfile
+ err := cur.Decode(&tp)
+ if err != nil {
+ return err
+ }
+ results = append(results, &tp)
+ }
+ if len(results) == 0 {
+ return utils.ErrNotFound
+ }
+ return cur.Close(sctx)
+ })
+ return results, err
+}
+
+func (ms *MongoStorage) SetTPDispatchers(tpDPPs []*utils.TPDispatcherProfile) (err error) {
+ if len(tpDPPs) == 0 {
+ return
+ }
+ return ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) {
+ for _, tp := range tpDPPs {
+ _, err = ms.getCol(utils.TBLTPDispatchers).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
+ bson.M{"$set": tp},
+ options.Update().SetUpsert(true),
+ )
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+}
+
func (ms *MongoStorage) GetVersions(itm string) (vrs Versions, err error) {
fop := options.FindOne()
if itm != "" {
diff --git a/engine/storage_redis.go b/engine/storage_redis.go
index c4aff7ae3..1f7a5c5b3 100644
--- a/engine/storage_redis.go
+++ b/engine/storage_redis.go
@@ -355,7 +355,6 @@ func (rs *RedisStorage) GetKeysForPrefix(prefix string) ([]string, error) {
return keys, nil
}
return nil, nil
-
}
// Used to check if specific subject is stored using prefix key attached to entity
@@ -367,7 +366,8 @@ func (rs *RedisStorage) HasDataDrv(category, subject, tenant string) (bool, erro
return i == 1, err
case utils.ResourcesPrefix, utils.ResourceProfilesPrefix, utils.StatQueuePrefix,
utils.StatQueueProfilePrefix, utils.ThresholdPrefix, utils.ThresholdProfilePrefix,
- utils.FilterPrefix, utils.SupplierProfilePrefix, utils.AttributeProfilePrefix, utils.ChargerProfilePrefix:
+ utils.FilterPrefix, utils.SupplierProfilePrefix, utils.AttributeProfilePrefix,
+ utils.ChargerProfilePrefix, utils.DispatcherProfilePrefix:
i, err := rs.Cmd("EXISTS", category+utils.ConcatenatedKey(tenant, subject)).Int()
return i == 1, err
}
@@ -1768,6 +1768,37 @@ func (rs *RedisStorage) RemoveChargerProfileDrv(tenant, id string) (err error) {
return
}
+func (rs *RedisStorage) GetDispatcherProfileDrv(tenant, id string) (r *DispatcherProfile, err error) {
+ key := utils.DispatcherProfilePrefix + utils.ConcatenatedKey(tenant, id)
+ var values []byte
+ if values, err = rs.Cmd("GET", key).Bytes(); err != nil {
+ if err == redis.ErrRespNil { // did not find the destination
+ err = utils.ErrNotFound
+ }
+ return
+ }
+ if err = rs.ms.Unmarshal(values, &r); err != nil {
+ return
+ }
+ return
+}
+
+func (rs *RedisStorage) SetDispatcherProfileDrv(r *DispatcherProfile) (err error) {
+ result, err := rs.ms.Marshal(r)
+ if err != nil {
+ return err
+ }
+ return rs.Cmd("SET", utils.DispatcherProfilePrefix+utils.ConcatenatedKey(r.Tenant, r.ID), result).Err
+}
+
+func (rs *RedisStorage) RemoveDispatcherProfileDrv(tenant, id string) (err error) {
+ key := utils.DispatcherProfilePrefix + utils.ConcatenatedKey(tenant, id)
+ if err = rs.Cmd("DEL", key).Err; err != nil {
+ return
+ }
+ return
+}
+
func (rs *RedisStorage) GetStorageType() string {
return utils.REDIS
}
diff --git a/engine/storage_sql.go b/engine/storage_sql.go
index f19d5cf06..8e6b4ab62 100644
--- a/engine/storage_sql.go
+++ b/engine/storage_sql.go
@@ -127,7 +127,7 @@ func (self *SQLStorage) GetTpIds(colName string) ([]string, error) {
qryStr := fmt.Sprintf(" (SELECT tpid FROM %s)", colName)
if colName == "" {
qryStr = fmt.Sprintf(
- "(SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s)",
+ "(SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s) UNION (SELECT tpid FROM %s)",
utils.TBLTPTimings,
utils.TBLTPDestinations,
utils.TBLTPRates,
@@ -148,7 +148,8 @@ func (self *SQLStorage) GetTpIds(colName string) ([]string, error) {
utils.TBLTPActionPlans,
utils.TBLTPSuppliers,
utils.TBLTPAttributes,
- utils.TBLTPChargers)
+ utils.TBLTPChargers,
+ utils.TBLTPDispatchers)
}
rows, err = self.Db.Query(qryStr)
if err != nil {
@@ -239,7 +240,8 @@ func (self *SQLStorage) RemTpData(table, tpid string, args map[string]string) er
utils.TBLTPSharedGroups, utils.TBLTPActions, utils.TBLTPActionPlans,
utils.TBLTPActionTriggers, utils.TBLTPAccountActions,
utils.TBLTPDerivedChargers, utils.TBLTPAliases, utils.TBLTPUsers, utils.TBLTPResources,
- utils.TBLTPStats, utils.TBLTPFilters, utils.TBLTPSuppliers, utils.TBLTPAttributes, utils.TBLTPChargers} {
+ utils.TBLTPStats, utils.TBLTPFilters, utils.TBLTPSuppliers, utils.TBLTPAttributes,
+ utils.TBLTPChargers, utils.TBLTPDispatchers} {
if err := tx.Table(tblName).Where("tpid = ?", tpid).Delete(nil).Error; err != nil {
tx.Rollback()
return err
@@ -709,6 +711,28 @@ func (self *SQLStorage) SetTPChargers(tpCPPs []*utils.TPChargerProfile) error {
return nil
}
+func (self *SQLStorage) SetTPDispatchers(tpDPPs []*utils.TPDispatcherProfile) error {
+ if len(tpDPPs) == 0 {
+ return nil
+ }
+ tx := self.db.Begin()
+ for _, dpp := range tpDPPs {
+ // Remove previous
+ if err := tx.Where(&TPDispatcher{Tpid: dpp.TPid, ID: dpp.ID}).Delete(TPDispatcher{}).Error; err != nil {
+ tx.Rollback()
+ return err
+ }
+ for _, mst := range APItoModelTPDispatcher(dpp) {
+ if err := tx.Save(&mst).Error; err != nil {
+ tx.Rollback()
+ return err
+ }
+ }
+ }
+ tx.Commit()
+ return nil
+}
+
func (self *SQLStorage) SetSMCost(smc *SMCost) error {
if smc.CostDetails == nil {
return nil
@@ -1607,6 +1631,25 @@ func (self *SQLStorage) GetTPChargers(tpid, tenant, id string) ([]*utils.TPCharg
return arls, nil
}
+func (self *SQLStorage) GetTPDispatchers(tpid, tenant, id string) ([]*utils.TPDispatcherProfile, error) {
+ var dpps TPDispatchers
+ q := self.db.Where("tpid = ?", tpid)
+ if len(id) != 0 {
+ q = q.Where("id = ?", id)
+ }
+ if len(tenant) != 0 {
+ q = q.Where("tenant = ?", tenant)
+ }
+ if err := q.Find(&dpps).Error; err != nil {
+ return nil, err
+ }
+ arls := dpps.AsTPDispatchers()
+ if len(arls) == 0 {
+ return arls, utils.ErrNotFound
+ }
+ return arls, nil
+}
+
// GetVersions returns slice of all versions or a specific version if tag is specified
func (self *SQLStorage) GetVersions(itm string) (vrs Versions, err error) {
q := self.db.Model(&TBLVersion{})
diff --git a/engine/tp_reader.go b/engine/tp_reader.go
index 2aa9fa612..7bbae87d6 100644
--- a/engine/tp_reader.go
+++ b/engine/tp_reader.go
@@ -30,39 +30,41 @@ import (
)
type TpReader struct {
- tpid string
- timezone string
- dm *DataManager
- lr LoadReader
- actions map[string][]*Action
- actionPlans map[string]*ActionPlan
- actionsTriggers map[string]ActionTriggers
- accountActions map[string]*Account
- dirtyRpAliases []*TenantRatingSubject // used to clean aliases that might have changed
- dirtyAccAliases []*TenantAccount // used to clean aliases that might have changed
- destinations map[string]*Destination
- timings map[string]*utils.TPTiming
- rates map[string]*utils.TPRate
- destinationRates map[string]*utils.TPDestinationRate
- ratingPlans map[string]*RatingPlan
- ratingProfiles map[string]*RatingProfile
- sharedGroups map[string]*SharedGroup
- derivedChargers map[string]*utils.DerivedChargers
- users map[string]*UserProfile
- aliases map[string]*Alias
- resProfiles map[utils.TenantID]*utils.TPResource
- sqProfiles map[utils.TenantID]*utils.TPStats
- thProfiles map[utils.TenantID]*utils.TPThreshold
- filters map[utils.TenantID]*utils.TPFilterProfile
- sppProfiles map[utils.TenantID]*utils.TPSupplierProfile
- attributeProfiles map[utils.TenantID]*utils.TPAttributeProfile
- chargerProfiles map[utils.TenantID]*utils.TPChargerProfile
- resources []*utils.TenantID // IDs of resources which need creation based on resourceProfiles
- statQueues []*utils.TenantID // IDs of statQueues which need creation based on statQueueProfiles
- thresholds []*utils.TenantID // IDs of thresholds which need creation based on thresholdProfiles
- suppliers []*utils.TenantID // IDs of suppliers which need creation based on sppProfiles
- attrTntID []*utils.TenantID // IDs of suppliers which need creation based on attributeProfiles
- chargers []*utils.TenantID // IDs of chargers which need creation based on chargerProfiles
+ tpid string
+ timezone string
+ dm *DataManager
+ lr LoadReader
+ actions map[string][]*Action
+ actionPlans map[string]*ActionPlan
+ actionsTriggers map[string]ActionTriggers
+ accountActions map[string]*Account
+ dirtyRpAliases []*TenantRatingSubject // used to clean aliases that might have changed
+ dirtyAccAliases []*TenantAccount // used to clean aliases that might have changed
+ destinations map[string]*Destination
+ timings map[string]*utils.TPTiming
+ rates map[string]*utils.TPRate
+ destinationRates map[string]*utils.TPDestinationRate
+ ratingPlans map[string]*RatingPlan
+ ratingProfiles map[string]*RatingProfile
+ sharedGroups map[string]*SharedGroup
+ derivedChargers map[string]*utils.DerivedChargers
+ users map[string]*UserProfile
+ aliases map[string]*Alias
+ resProfiles map[utils.TenantID]*utils.TPResource
+ sqProfiles map[utils.TenantID]*utils.TPStats
+ thProfiles map[utils.TenantID]*utils.TPThreshold
+ filters map[utils.TenantID]*utils.TPFilterProfile
+ sppProfiles map[utils.TenantID]*utils.TPSupplierProfile
+ attributeProfiles map[utils.TenantID]*utils.TPAttributeProfile
+ chargerProfiles map[utils.TenantID]*utils.TPChargerProfile
+ dispatcherProfiles map[utils.TenantID]*utils.TPDispatcherProfile
+ resources []*utils.TenantID // IDs of resources which need creation based on resourceProfiles
+ statQueues []*utils.TenantID // IDs of statQueues which need creation based on statQueueProfiles
+ thresholds []*utils.TenantID // IDs of thresholds which need creation based on thresholdProfiles
+ suppliers []*utils.TenantID // IDs of suppliers which need creation based on sppProfiles
+ attrTntID []*utils.TenantID // IDs of suppliers which need creation based on attributeProfiles
+ chargers []*utils.TenantID // IDs of chargers which need creation based on chargerProfiles
+ dpps []*utils.TenantID // IDs of chargers which need creation based on dispatcherProfiles
revDests,
revAliases,
acntActionPlans map[string][]string
@@ -137,6 +139,7 @@ func (tpr *TpReader) Init() {
tpr.sppProfiles = make(map[utils.TenantID]*utils.TPSupplierProfile)
tpr.attributeProfiles = make(map[utils.TenantID]*utils.TPAttributeProfile)
tpr.chargerProfiles = make(map[utils.TenantID]*utils.TPChargerProfile)
+ tpr.dispatcherProfiles = make(map[utils.TenantID]*utils.TPDispatcherProfile)
tpr.filters = make(map[utils.TenantID]*utils.TPFilterProfile)
tpr.revDests = make(map[string][]string)
tpr.revAliases = make(map[string][]string)
@@ -1450,6 +1453,30 @@ func (tpr *TpReader) LoadChargerProfiles() error {
return tpr.LoadChargerProfilesFiltered("")
}
+func (tpr *TpReader) LoadDispatcherProfilesFiltered(tag string) (err error) {
+ rls, err := tpr.lr.GetTPDispatchers(tpr.tpid, "", tag)
+ if err != nil {
+ return err
+ }
+ mapDispatcherProfile := make(map[utils.TenantID]*utils.TPDispatcherProfile)
+ for _, rl := range rls {
+ mapDispatcherProfile[utils.TenantID{Tenant: rl.Tenant, ID: rl.ID}] = rl
+ }
+ tpr.dispatcherProfiles = mapDispatcherProfile
+ for tntID := range mapDispatcherProfile {
+ if has, err := tpr.dm.HasData(utils.DispatcherProfilePrefix, tntID.ID, tntID.Tenant); err != nil {
+ return err
+ } else if !has {
+ tpr.dpps = append(tpr.dpps, &utils.TenantID{Tenant: tntID.Tenant, ID: tntID.ID})
+ }
+ }
+ return nil
+}
+
+func (tpr *TpReader) LoadDispatcherProfiles() error {
+ return tpr.LoadDispatcherProfilesFiltered("")
+}
+
func (tpr *TpReader) LoadAll() (err error) {
if err = tpr.LoadDestinations(); err != nil && err.Error() != utils.NotFoundCaps {
return
@@ -1514,6 +1541,9 @@ func (tpr *TpReader) LoadAll() (err error) {
if err = tpr.LoadChargerProfiles(); err != nil && err.Error() != utils.NotFoundCaps {
return
}
+ if err = tpr.LoadDispatcherProfiles(); err != nil && err.Error() != utils.NotFoundCaps {
+ return
+ }
return nil
}
@@ -1879,6 +1909,22 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
}
}
+ if verbose {
+ log.Print("DispatcherProfiles:")
+ }
+ for _, tpTH := range tpr.dispatcherProfiles {
+ th, err := APItoDispatcherProfile(tpTH, tpr.timezone)
+ if err != nil {
+ return err
+ }
+ if err = tpr.dm.SetDispatcherProfile(th, true); err != nil {
+ return err
+ }
+ if verbose {
+ log.Print("\t", th.TenantID())
+ }
+ }
+
if verbose {
log.Print("Timings:")
}
@@ -1986,6 +2032,8 @@ func (tpr *TpReader) ShowStatistics() {
log.Print("AttributeProfiles: ", len(tpr.attributeProfiles))
// Charger profiles
log.Print("ChargerProfiles: ", len(tpr.chargerProfiles))
+ // Dispatcher profiles
+ log.Print("DispatcherProfiles: ", len(tpr.dispatcherProfiles))
}
// Returns the identities loaded for a specific category, useful for cache reloads
@@ -2151,6 +2199,14 @@ func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) {
i++
}
return keys, nil
+ case utils.DispatcherProfilePrefix:
+ keys := make([]string, len(tpr.dispatcherProfiles))
+ i := 0
+ for k := range tpr.dispatcherProfiles {
+ keys[i] = k.TenantID()
+ i++
+ }
+ return keys, nil
}
return nil, errors.New("Unsupported load category")
}
@@ -2418,6 +2474,18 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
}
}
+ if verbose {
+ log.Print("DispatcherProfiles:")
+ }
+ for _, tpTH := range tpr.dispatcherProfiles {
+ if err = tpr.dm.RemoveDispatcherProfile(tpTH.Tenant, tpTH.ID, utils.NonTransactional, false); err != nil {
+ return err
+ }
+ if verbose {
+ log.Print("\t", tpTH.Tenant)
+ }
+ }
+
if verbose {
log.Print("Timings:")
}
diff --git a/engine/tpimporter_csv.go b/engine/tpimporter_csv.go
index 96e1507c3..0ca747dd2 100644
--- a/engine/tpimporter_csv.go
+++ b/engine/tpimporter_csv.go
@@ -62,6 +62,7 @@ var fileHandlers = map[string]func(*TPCSVImporter, string) error{
utils.SuppliersCsv: (*TPCSVImporter).importSuppliers,
utils.AttributesCsv: (*TPCSVImporter).importAttributeProfiles,
utils.ChargersCsv: (*TPCSVImporter).importChargerProfiles,
+ utils.DispatchersCsv: (*TPCSVImporter).importDispatcherProfiles,
}
func (self *TPCSVImporter) Run() error {
@@ -87,6 +88,7 @@ func (self *TPCSVImporter) Run() error {
path.Join(self.DirPath, utils.SuppliersCsv),
path.Join(self.DirPath, utils.AttributesCsv),
path.Join(self.DirPath, utils.ChargersCsv),
+ path.Join(self.DirPath, utils.DispatchersCsv),
)
files, _ := ioutil.ReadDir(self.DirPath)
for _, f := range files {
@@ -402,3 +404,14 @@ func (self *TPCSVImporter) importChargerProfiles(fn string) error {
}
return self.StorDb.SetTPChargers(rls)
}
+
+func (self *TPCSVImporter) importDispatcherProfiles(fn string) error {
+ if self.Verbose {
+ log.Printf("Processing file: <%s> ", fn)
+ }
+ dpps, err := self.csvr.GetTPDispatchers(self.TPid, "", "")
+ if err != nil {
+ return err
+ }
+ return self.StorDb.SetTPDispatchers(dpps)
+}
diff --git a/engine/version.go b/engine/version.go
index efa02efb0..7b2388909 100644
--- a/engine/version.go
+++ b/engine/version.go
@@ -158,6 +158,7 @@ func CurrentDataDBVersions() Versions {
utils.RatingPlan: 1,
utils.RatingProfile: 1,
utils.Chargers: 1,
+ utils.Dispatchers: 1,
}
}
@@ -190,6 +191,7 @@ func CurrentStorDBVersions() Versions {
utils.TpRatingPlan: 1,
utils.TpRatingProfile: 1,
utils.TpChargers: 1,
+ utils.TpDispatchers: 1,
}
}
diff --git a/general_tests/acntacts_test.go b/general_tests/acntacts_test.go
index cf8cedd0c..33d2bb633 100644
--- a/general_tests/acntacts_test.go
+++ b/general_tests/acntacts_test.go
@@ -60,7 +60,7 @@ ENABLE_ACNT,*enable_account,,,,,,,,,,,,,,false,false,10`
csvr := engine.NewTpReader(dbAcntActs.DataDB(), engine.NewStringCSVStorage(',', destinations, timings,
rates, destinationRates, ratingPlans, ratingProfiles, sharedGroups,
actions, actionPlans, actionTriggers, accountActions, derivedCharges,
- users, aliases, resLimits, stats, thresholds, filters, suppliers, aliasProfiles, chargerProfiles), "", "")
+ users, aliases, resLimits, stats, thresholds, filters, suppliers, aliasProfiles, chargerProfiles, ``), "", "")
if err := csvr.LoadAll(); err != nil {
t.Fatal(err)
}
@@ -69,7 +69,7 @@ ENABLE_ACNT,*enable_account,,,,,,,,,,,,,,false,false,10`
engine.Cache.Clear(nil)
dbAcntActs.LoadDataDBCache(nil, nil, nil, nil, nil,
nil, nil, nil, nil, nil, nil, nil, nil, nil,
- nil, nil, nil, nil, nil, nil, nil, nil, nil)
+ nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
expectAcnt := &engine.Account{ID: "cgrates.org:1"}
if acnt, err := dbAcntActs.DataDB().GetAccount("cgrates.org:1"); err != nil {
diff --git a/general_tests/auth_test.go b/general_tests/auth_test.go
index 50f47aaa7..7c0900380 100644
--- a/general_tests/auth_test.go
+++ b/general_tests/auth_test.go
@@ -67,7 +67,7 @@ RP_ANY,DR_ANY_1CNT,*any,10`
chargerProfiles := ``
csvr := engine.NewTpReader(dbAuth.DataDB(), engine.NewStringCSVStorage(',', destinations, timings, rates, destinationRates,
ratingPlans, ratingProfiles, sharedGroups, actions, actionPlans, actionTriggers, accountActions,
- derivedCharges, users, aliases, resLimits, stats, thresholds, filters, suppliers, aliasProfiles, chargerProfiles), "", "")
+ derivedCharges, users, aliases, resLimits, stats, thresholds, filters, suppliers, aliasProfiles, chargerProfiles, ``), "", "")
if err := csvr.LoadAll(); err != nil {
t.Fatal(err)
}
@@ -81,7 +81,7 @@ RP_ANY,DR_ANY_1CNT,*any,10`
engine.Cache.Clear(nil)
dbAuth.LoadDataDBCache(nil, nil, nil, nil, nil, nil,
nil, nil, nil, nil, nil, nil, nil, nil, nil,
- nil, nil, nil, nil, nil, nil, nil, nil)
+ nil, nil, nil, nil, nil, nil, nil, nil, nil)
if cachedDests := len(engine.Cache.GetItemIDs(utils.CacheDestinations, "")); cachedDests != 0 {
t.Error("Wrong number of cached destinations found", cachedDests)
diff --git a/general_tests/costs1_test.go b/general_tests/costs1_test.go
index 78fd8474a..11ed9f6bd 100644
--- a/general_tests/costs1_test.go
+++ b/general_tests/costs1_test.go
@@ -51,7 +51,7 @@ RP_SMS1,DR_SMS_1,ALWAYS,10`
*out,cgrates.org,data,*any,2012-01-01T00:00:00Z,RP_DATA1,,
*out,cgrates.org,sms,*any,2012-01-01T00:00:00Z,RP_SMS1,,`
csvr := engine.NewTpReader(dataDB.DataDB(), engine.NewStringCSVStorage(',', dests, timings, rates, destinationRates, ratingPlans, ratingProfiles,
- "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""), "", "")
+ "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""), "", "")
if err := csvr.LoadTimings(); err != nil {
t.Fatal(err)
@@ -75,7 +75,7 @@ RP_SMS1,DR_SMS_1,ALWAYS,10`
engine.Cache.Clear(nil)
dataDB.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil,
nil, nil, nil, nil, nil, nil, nil, nil, nil, nil,
- nil, nil, nil, nil, nil, nil)
+ nil, nil, nil, nil, nil, nil, nil)
if cachedRPlans := len(engine.Cache.GetItemIDs(utils.CacheRatingPlans, "")); cachedRPlans != 3 {
t.Error("Wrong number of cached rating plans found", cachedRPlans)
diff --git a/general_tests/datachrg1_test.go b/general_tests/datachrg1_test.go
index 56b4a852b..03df337c0 100644
--- a/general_tests/datachrg1_test.go
+++ b/general_tests/datachrg1_test.go
@@ -42,7 +42,7 @@ DR_DATA_2,*any,RT_DATA_1c,*up,4,0,`
RP_DATA1,DR_DATA_2,TM2,10`
ratingProfiles := `*out,cgrates.org,data,*any,2012-01-01T00:00:00Z,RP_DATA1,,`
csvr := engine.NewTpReader(dataDB.DataDB(), engine.NewStringCSVStorage(',', "", timings, rates, destinationRates, ratingPlans, ratingProfiles,
- "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""), "", "")
+ "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""), "", "")
if err := csvr.LoadTimings(); err != nil {
t.Fatal(err)
}
@@ -62,7 +62,7 @@ RP_DATA1,DR_DATA_2,TM2,10`
engine.Cache.Clear(nil)
dataDB.LoadDataDBCache(nil, nil, nil, nil, nil,
nil, nil, nil, nil, nil, nil, nil, nil,
- nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
+ nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
if cachedRPlans := len(engine.Cache.GetItemIDs(utils.CacheRatingPlans, "")); cachedRPlans != 1 {
t.Error("Wrong number of cached rating plans found", cachedRPlans)
diff --git a/general_tests/ddazmbl1_test.go b/general_tests/ddazmbl1_test.go
index 9ca780a63..3680c5e03 100644
--- a/general_tests/ddazmbl1_test.go
+++ b/general_tests/ddazmbl1_test.go
@@ -69,7 +69,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10`
destinationRates, ratingPlans, ratingProfiles,
sharedGroups, actions, actionPlans, actionTriggers, accountActions,
derivedCharges, users, aliases, resLimits, stats,
- thresholds, filters, suppliers, aliasProfiles, chargerProfiles), "", "")
+ thresholds, filters, suppliers, aliasProfiles, chargerProfiles, ``), "", "")
if err := csvr.LoadDestinations(); err != nil {
t.Fatal(err)
}
@@ -116,7 +116,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10`
dataDB.LoadDataDBCache(nil, nil, nil, nil, nil,
nil, nil, nil, nil, nil, nil, nil, nil,
- nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
+ nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
if cachedDests := len(engine.Cache.GetItemIDs(utils.CacheDestinations, "")); cachedDests != 0 {
t.Error("Wrong number of cached destinations found", cachedDests)
diff --git a/general_tests/ddazmbl2_test.go b/general_tests/ddazmbl2_test.go
index a0d6c0af4..e86033d12 100644
--- a/general_tests/ddazmbl2_test.go
+++ b/general_tests/ddazmbl2_test.go
@@ -67,7 +67,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10`
csvr := engine.NewTpReader(dataDB2.DataDB(), engine.NewStringCSVStorage(',', destinations, timings,
rates, destinationRates, ratingPlans, ratingProfiles, sharedGroups, actions, actionPlans,
actionTriggers, accountActions, derivedCharges, users, aliases, resLimits,
- stats, thresholds, filters, suppliers, aliasProfiles, chargerProfiles), "", "")
+ stats, thresholds, filters, suppliers, aliasProfiles, chargerProfiles, ``), "", "")
if err := csvr.LoadDestinations(); err != nil {
t.Fatal(err)
}
@@ -113,7 +113,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10`
engine.Cache.Clear(nil)
dataDB2.LoadDataDBCache(nil, nil, nil, nil, nil, nil,
nil, nil, nil, nil, nil, nil, nil, nil, nil,
- nil, nil, nil, nil, nil, nil, nil, nil)
+ nil, nil, nil, nil, nil, nil, nil, nil, nil)
if cachedDests := len(engine.Cache.GetItemIDs(utils.CacheDestinations, "")); cachedDests != 0 {
t.Error("Wrong number of cached destinations found", cachedDests)
diff --git a/general_tests/ddazmbl3_test.go b/general_tests/ddazmbl3_test.go
index 775e78ea5..a8b7ce900 100644
--- a/general_tests/ddazmbl3_test.go
+++ b/general_tests/ddazmbl3_test.go
@@ -65,7 +65,7 @@ RP_UK,DR_UK_Mobile_BIG5,ALWAYS,10`
csvr := engine.NewTpReader(dataDB3.DataDB(), engine.NewStringCSVStorage(',', destinations, timings, rates,
destinationRates, ratingPlans, ratingProfiles, sharedGroups, actions, actionPlans, actionTriggers,
accountActions, derivedCharges, users, aliases, resLimits, stats,
- thresholds, filters, suppliers, aliasProfiles, chargerProfiles), "", "")
+ thresholds, filters, suppliers, aliasProfiles, chargerProfiles, ``), "", "")
if err := csvr.LoadDestinations(); err != nil {
t.Fatal(err)
}
@@ -111,7 +111,7 @@ RP_UK,DR_UK_Mobile_BIG5,ALWAYS,10`
engine.Cache.Clear(nil)
dataDB3.LoadDataDBCache(nil, nil, nil, nil, nil,
nil, nil, nil, nil, nil, nil, nil, nil,
- nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
+ nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
if cachedDests := len(engine.Cache.GetItemIDs(utils.CacheDestinations, "")); cachedDests != 0 {
t.Error("Wrong number of cached destinations found", cachedDests)
diff --git a/general_tests/smschrg1_test.go b/general_tests/smschrg1_test.go
index e7efd8067..b22824ad2 100644
--- a/general_tests/smschrg1_test.go
+++ b/general_tests/smschrg1_test.go
@@ -40,7 +40,7 @@ func TestSMSLoadCsvTpSmsChrg1(t *testing.T) {
ratingPlans := `RP_SMS1,DR_SMS_1,ALWAYS,10`
ratingProfiles := `*out,cgrates.org,sms,*any,2012-01-01T00:00:00Z,RP_SMS1,,`
csvr := engine.NewTpReader(dataDB.DataDB(), engine.NewStringCSVStorage(',', "", timings, rates, destinationRates, ratingPlans, ratingProfiles,
- "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""), "", "")
+ "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""), "", "")
if err := csvr.LoadTimings(); err != nil {
t.Fatal(err)
}
@@ -60,7 +60,7 @@ func TestSMSLoadCsvTpSmsChrg1(t *testing.T) {
engine.Cache.Clear(nil)
dataDB.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil,
nil, nil, nil, nil, nil, nil, nil, nil,
- nil, nil, nil, nil, nil, nil, nil, nil)
+ nil, nil, nil, nil, nil, nil, nil, nil, nil)
if cachedRPlans := len(engine.Cache.GetItemIDs(utils.CacheRatingPlans, "")); cachedRPlans != 1 {
t.Error("Wrong number of cached rating plans found", cachedRPlans)
diff --git a/utils/apitpdata.go b/utils/apitpdata.go
index 55c30f479..61e372dda 100755
--- a/utils/apitpdata.go
+++ b/utils/apitpdata.go
@@ -628,6 +628,7 @@ type ArgsCache struct {
SupplierProfileIDs *[]string
AttributeProfileIDs *[]string
ChargerProfileIDs *[]string
+ DispatcherProfileIDs *[]string
}
// Data used to do remote cache reloads via api
@@ -670,6 +671,7 @@ type CacheStats struct {
SupplierProfiles int
AttributeProfiles int
ChargerProfiles int
+ DispatcherProfiles int
}
type AttrExpFileCdrs struct {
@@ -1396,3 +1398,14 @@ type TPTntID struct {
Tenant string
ID string
}
+
+type TPDispatcherProfile struct {
+ TPid string
+ Tenant string
+ ID string
+ FilterIDs []string
+ ActivationInterval *TPActivationInterval // Time when this limit becomes active and expires
+ Strategy string
+ Hosts []string
+ Weight float64
+}
diff --git a/utils/consts.go b/utils/consts.go
index 1bb2fabc3..e3f50d3de 100755
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -19,10 +19,13 @@ along with this program. If not, see
package utils
var (
- CDRExportFormats = []string{DRYRUN, MetaFileCSV, MetaFileFWV, MetaHTTPjsonCDR, MetaHTTPjsonMap, MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap}
- PrimaryCdrFields = []string{CGRID, Source, OriginHost, OriginID, ToR, RequestType, Tenant, Category, Account, Subject, Destination, SetupTime, AnswerTime, Usage,
+ CDRExportFormats = []string{DRYRUN, MetaFileCSV, MetaFileFWV, MetaHTTPjsonCDR, MetaHTTPjsonMap,
+ MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap}
+ PrimaryCdrFields = []string{CGRID, Source, OriginHost, OriginID, ToR, RequestType, Tenant, Category,
+ Account, Subject, Destination, SetupTime, AnswerTime, Usage,
COST, RATED, Partial, RunID}
- NotExtraCDRFields = []string{CGRID, Source, OriginHost, OriginID, ToR, RequestType, Tenant, Category, Account, Subject, Destination, SetupTime, AnswerTime, Usage,
+ NotExtraCDRFields = []string{CGRID, Source, OriginHost, OriginID, ToR, RequestType, Tenant, Category,
+ Account, Subject, Destination, SetupTime, AnswerTime, Usage,
COST, RATED, Partial, RunID, PreRated, CostSource}
GitLastLog string // If set, it will be processed as part of versioning
PosterTransportContentTypes = map[string]string{
@@ -43,45 +46,48 @@ var (
MetaFileFWV: FWVSuffix,
}
CacheInstanceToPrefix = map[string]string{
- CacheDestinations: DESTINATION_PREFIX,
- CacheReverseDestinations: REVERSE_DESTINATION_PREFIX,
- CacheRatingPlans: RATING_PLAN_PREFIX,
- CacheRatingProfiles: RATING_PROFILE_PREFIX,
- CacheActions: ACTION_PREFIX,
- CacheActionPlans: ACTION_PLAN_PREFIX,
- CacheAccountActionPlans: AccountActionPlansPrefix,
- CacheActionTriggers: ACTION_TRIGGER_PREFIX,
- CacheSharedGroups: SHARED_GROUP_PREFIX,
- CacheAliases: ALIASES_PREFIX,
- CacheReverseAliases: REVERSE_ALIASES_PREFIX,
- CacheDerivedChargers: DERIVEDCHARGERS_PREFIX,
- CacheResourceProfiles: ResourceProfilesPrefix,
- CacheResources: ResourcesPrefix,
- CacheEventResources: EventResourcesPrefix,
- CacheTimings: TimingsPrefix,
- CacheStatQueueProfiles: StatQueueProfilePrefix,
- CacheStatQueues: StatQueuePrefix,
- CacheThresholdProfiles: ThresholdProfilePrefix,
- CacheThresholds: ThresholdPrefix,
- CacheFilters: FilterPrefix,
- CacheSupplierProfiles: SupplierProfilePrefix,
- CacheAttributeProfiles: AttributeProfilePrefix,
- CacheChargerProfiles: ChargerProfilePrefix,
- CacheResourceFilterIndexes: ResourceFilterIndexes,
- CacheStatFilterIndexes: StatFilterIndexes,
- CacheThresholdFilterIndexes: ThresholdFilterIndexes,
- CacheSupplierFilterIndexes: SupplierFilterIndexes,
- CacheAttributeFilterIndexes: AttributeFilterIndexes,
- CacheChargerFilterIndexes: ChargerFilterIndexes,
+ CacheDestinations: DESTINATION_PREFIX,
+ CacheReverseDestinations: REVERSE_DESTINATION_PREFIX,
+ CacheRatingPlans: RATING_PLAN_PREFIX,
+ CacheRatingProfiles: RATING_PROFILE_PREFIX,
+ CacheActions: ACTION_PREFIX,
+ CacheActionPlans: ACTION_PLAN_PREFIX,
+ CacheAccountActionPlans: AccountActionPlansPrefix,
+ CacheActionTriggers: ACTION_TRIGGER_PREFIX,
+ CacheSharedGroups: SHARED_GROUP_PREFIX,
+ CacheAliases: ALIASES_PREFIX,
+ CacheReverseAliases: REVERSE_ALIASES_PREFIX,
+ CacheDerivedChargers: DERIVEDCHARGERS_PREFIX,
+ CacheResourceProfiles: ResourceProfilesPrefix,
+ CacheResources: ResourcesPrefix,
+ CacheEventResources: EventResourcesPrefix,
+ CacheTimings: TimingsPrefix,
+ CacheStatQueueProfiles: StatQueueProfilePrefix,
+ CacheStatQueues: StatQueuePrefix,
+ CacheThresholdProfiles: ThresholdProfilePrefix,
+ CacheThresholds: ThresholdPrefix,
+ CacheFilters: FilterPrefix,
+ CacheSupplierProfiles: SupplierProfilePrefix,
+ CacheAttributeProfiles: AttributeProfilePrefix,
+ CacheChargerProfiles: ChargerProfilePrefix,
+ CacheDispatcherProfiles: DispatcherProfilePrefix,
+ CacheResourceFilterIndexes: ResourceFilterIndexes,
+ CacheStatFilterIndexes: StatFilterIndexes,
+ CacheThresholdFilterIndexes: ThresholdFilterIndexes,
+ CacheSupplierFilterIndexes: SupplierFilterIndexes,
+ CacheAttributeFilterIndexes: AttributeFilterIndexes,
+ CacheChargerFilterIndexes: ChargerFilterIndexes,
+ CacheDispatcherFilterIndexes: DispatcherFilterIndexes,
}
CachePrefixToInstance map[string]string // will be built on init
PrefixToIndexCache = map[string]string{
- ThresholdProfilePrefix: CacheThresholdFilterIndexes,
- ResourceProfilesPrefix: CacheResourceFilterIndexes,
- StatQueueProfilePrefix: CacheStatFilterIndexes,
- SupplierProfilePrefix: CacheSupplierFilterIndexes,
- AttributeProfilePrefix: CacheAttributeFilterIndexes,
- ChargerProfilePrefix: CacheChargerFilterIndexes,
+ ThresholdProfilePrefix: CacheThresholdFilterIndexes,
+ ResourceProfilesPrefix: CacheResourceFilterIndexes,
+ StatQueueProfilePrefix: CacheStatFilterIndexes,
+ SupplierProfilePrefix: CacheSupplierFilterIndexes,
+ AttributeProfilePrefix: CacheAttributeFilterIndexes,
+ ChargerProfilePrefix: CacheChargerFilterIndexes,
+ DispatcherProfilePrefix: CacheDispatcherFilterIndexes,
}
CacheIndexesToPrefix map[string]string // will be built on init
)
@@ -245,6 +251,7 @@ const (
SupplierProfilePrefix = "spp_"
AttributeProfilePrefix = "alp_"
ChargerProfilePrefix = "cpp_"
+ DispatcherProfilePrefix = "dpp_"
ThresholdProfilePrefix = "thp_"
StatQueuePrefix = "stq_"
LOADINST_KEY = "load_history"
@@ -412,6 +419,7 @@ const (
Suppliers = "Suppliers"
Attributes = "Attributes"
Chargers = "Chargers"
+ Dispatchers = "Dispatchers"
StatS = "Stats"
RALService = "RALs"
CostSource = "CostSource"
@@ -659,6 +667,7 @@ const (
TpRatingPlan = "TpRatingPlan"
TpRatingProfile = "TpRatingProfile"
TpChargers = "TpChargers"
+ TpDispatchers = "TpDispatchers"
)
// Dispatcher Const
@@ -858,6 +867,7 @@ const (
SuppliersCsv = "Suppliers.csv"
AttributesCsv = "Attributes.csv"
ChargersCsv = "Chargers.csv"
+ DispatchersCsv = "Dispatchers.csv"
)
// Table Name
@@ -887,53 +897,57 @@ const (
TBLTPChargers = "tp_chargers"
TBLVersions = "versions"
OldSMCosts = "sm_costs"
+ TBLTPDispatchers = "tp_dispatchers"
)
// Cache Name
const (
- CacheDestinations = "destinations"
- CacheReverseDestinations = "reverse_destinations"
- CacheRatingPlans = "rating_plans"
- CacheRatingProfiles = "rating_profiles"
- CacheActions = "actions"
- CacheActionPlans = "action_plans"
- CacheAccountActionPlans = "account_action_plans"
- CacheActionTriggers = "action_triggers"
- CacheSharedGroups = "shared_groups"
- CacheAliases = "aliases"
- CacheReverseAliases = "reverse_aliases"
- CacheDerivedChargers = "derived_chargers"
- CacheResources = "resources"
- CacheResourceProfiles = "resource_profiles"
- CacheTimings = "timings"
- CacheEventResources = "event_resources"
- CacheStatQueueProfiles = "statqueue_profiles"
- CacheStatQueues = "statqueues"
- CacheThresholdProfiles = "threshold_profiles"
- CacheThresholds = "thresholds"
- CacheFilters = "filters"
- CacheSupplierProfiles = "supplier_profiles"
- CacheAttributeProfiles = "attribute_profiles"
- CacheChargerProfiles = "charger_profiles"
- CacheResourceFilterIndexes = "resource_filter_indexes"
- CacheStatFilterIndexes = "stat_filter_indexes"
- CacheThresholdFilterIndexes = "threshold_filter_indexes"
- CacheSupplierFilterIndexes = "supplier_filter_indexes"
- CacheAttributeFilterIndexes = "attribute_filter_indexes"
- CacheChargerFilterIndexes = "charger_filter_indexes"
- CacheDiameterMessages = "diameter_messages"
- MetaPrecaching = "*precaching"
- MetaReady = "*ready"
+ CacheDestinations = "destinations"
+ CacheReverseDestinations = "reverse_destinations"
+ CacheRatingPlans = "rating_plans"
+ CacheRatingProfiles = "rating_profiles"
+ CacheActions = "actions"
+ CacheActionPlans = "action_plans"
+ CacheAccountActionPlans = "account_action_plans"
+ CacheActionTriggers = "action_triggers"
+ CacheSharedGroups = "shared_groups"
+ CacheAliases = "aliases"
+ CacheReverseAliases = "reverse_aliases"
+ CacheDerivedChargers = "derived_chargers"
+ CacheResources = "resources"
+ CacheResourceProfiles = "resource_profiles"
+ CacheTimings = "timings"
+ CacheEventResources = "event_resources"
+ CacheStatQueueProfiles = "statqueue_profiles"
+ CacheStatQueues = "statqueues"
+ CacheThresholdProfiles = "threshold_profiles"
+ CacheThresholds = "thresholds"
+ CacheFilters = "filters"
+ CacheSupplierProfiles = "supplier_profiles"
+ CacheAttributeProfiles = "attribute_profiles"
+ CacheChargerProfiles = "charger_profiles"
+ CacheDispatcherProfiles = "dispatcher_profiles"
+ CacheResourceFilterIndexes = "resource_filter_indexes"
+ CacheStatFilterIndexes = "stat_filter_indexes"
+ CacheThresholdFilterIndexes = "threshold_filter_indexes"
+ CacheSupplierFilterIndexes = "supplier_filter_indexes"
+ CacheAttributeFilterIndexes = "attribute_filter_indexes"
+ CacheChargerFilterIndexes = "charger_filter_indexes"
+ CacheDispatcherFilterIndexes = "dispatcher_filter_indexes"
+ CacheDiameterMessages = "diameter_messages"
+ MetaPrecaching = "*precaching"
+ MetaReady = "*ready"
)
// Prefix for indexing
const (
- ResourceFilterIndexes = "rfi_"
- StatFilterIndexes = "sfi_"
- ThresholdFilterIndexes = "tfi_"
- SupplierFilterIndexes = "spi_"
- AttributeFilterIndexes = "afi_"
- ChargerFilterIndexes = "cfi_"
+ ResourceFilterIndexes = "rfi_"
+ StatFilterIndexes = "sfi_"
+ ThresholdFilterIndexes = "tfi_"
+ SupplierFilterIndexes = "spi_"
+ AttributeFilterIndexes = "afi_"
+ ChargerFilterIndexes = "cfi_"
+ DispatcherFilterIndexes = "dfi_"
)
// Agents