From a65bf218cb16eae22c5ae6d54c23d38291b36a68 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 30 Jan 2019 17:54:03 +0200 Subject: [PATCH] Started adding new dispatcherS profiles --- apier/v1/dispatcher_it_test.go | 4 +- engine/libdispatcher.go | 41 ++++++++++++++-- engine/model_helpers.go | 84 +++++++++++++++++---------------- engine/model_helpers_test.go | 2 +- engine/onstor_it_test.go | 4 +- engine/storage_csv.go | 39 ++++++++++----- engine/tploader.go | 2 - migrator/dispatchers_it_test.go | 4 +- 8 files changed, 113 insertions(+), 67 deletions(-) diff --git a/apier/v1/dispatcher_it_test.go b/apier/v1/dispatcher_it_test.go index 300722ae2..b9c6478d3 100644 --- a/apier/v1/dispatcher_it_test.go +++ b/apier/v1/dispatcher_it_test.go @@ -121,8 +121,8 @@ func testDispatcherSSetDispatcherProfile(t *testing.T) { ID: "Dsp1", FilterIDs: []string{"*string:Account:1001"}, Strategy: utils.MetaFirst, - Hosts: []string{"192.168.56.203", "192.168.56.204"}, - Weight: 20, + // Hosts: []string{"192.168.56.203", "192.168.56.204"}, + Weight: 20, } if err := dispatcherRPC.Call("ApierV1.SetDispatcherProfile", diff --git a/engine/libdispatcher.go b/engine/libdispatcher.go index 540e1b2f8..a04dbb318 100644 --- a/engine/libdispatcher.go +++ b/engine/libdispatcher.go @@ -24,22 +24,55 @@ import ( "github.com/cgrates/cgrates/utils" ) +// // DispatcherProfile is the config for one Dispatcher +// type DispatcherProfile struct { +// Tenant string +// ID string +// FilterIDs []string +// ActivationInterval *utils.ActivationInterval // Activation interval +// Strategy string +// Hosts []string // perform data aliasing based on these Attributes +// Weight float64 +// } + +// func (dP *DispatcherProfile) TenantID() string { +// return utils.ConcatenatedKey(dP.Tenant, dP.ID) +// } + +// // ChargerProfiles is a sortable list of Charger profiles +// type DispatcherProfiles []*DispatcherProfile + +// // Sort is part of sort interface, sort based on Weight +// func (dps DispatcherProfiles) Sort() { +// sort.Slice(dps, func(i, j int) bool { return dps[i].Weight > dps[j].Weight }) +// } + +type DispatcherConn struct { + ID string + FilterIDs []string + Weight float64 // applied in case of multiple connections need to be ordered + Params map[string]interface{} // additional parameters stored for a session + Blocker bool // no connection after this one +} + // DispatcherProfile is the config for one Dispatcher type DispatcherProfile struct { Tenant string ID string + Subsystems []string FilterIDs []string - ActivationInterval *utils.ActivationInterval // Activation interval + ActivationInterval *utils.ActivationInterval // activation interval Strategy string - Hosts []string // perform data aliasing based on these Attributes - Weight float64 + StrategyParams map[string]interface{} // ie for distribution, set here the pool weights + Weight float64 // used for profile sorting on match + Connections []*DispatcherConn // dispatch to these connections } func (dP *DispatcherProfile) TenantID() string { return utils.ConcatenatedKey(dP.Tenant, dP.ID) } -// ChargerProfiles is a sortable list of Charger profiles +// DispatcherProfiles is a sortable list of Dispatcher profiles type DispatcherProfiles []*DispatcherProfile // Sort is part of sort interface, sort based on Weight diff --git a/engine/model_helpers.go b/engine/model_helpers.go index 0181c3005..370dafc37 100644 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -1509,38 +1509,39 @@ func (tps TpResources) AsTPResources() (result []*utils.TPResource) { } func APItoModelResource(rl *utils.TPResource) (mdls TpResources) { - if rl != nil { - for i, fltr := range rl.FilterIDs { - mdl := &TpResource{ - Tpid: rl.TPid, - Tenant: rl.Tenant, - ID: rl.ID, - Blocker: rl.Blocker, - Stored: rl.Stored, - } - if i == 0 { - mdl.UsageTTL = rl.UsageTTL - mdl.Weight = rl.Weight - mdl.Limit = rl.Limit - mdl.AllocationMessage = rl.AllocationMessage - if rl.ActivationInterval != nil { - if rl.ActivationInterval.ActivationTime != "" { - mdl.ActivationInterval = rl.ActivationInterval.ActivationTime - } - if rl.ActivationInterval.ExpiryTime != "" { - mdl.ActivationInterval += utils.INFIELD_SEP + rl.ActivationInterval.ExpiryTime - } - } - for i, val := range rl.ThresholdIDs { - if i != 0 { - mdl.ThresholdIDs += utils.INFIELD_SEP - } - mdl.ThresholdIDs += val - } - } - mdl.FilterIDs = fltr - mdls = append(mdls, mdl) + if rl == nil { + return + } + for i, fltr := range rl.FilterIDs { + mdl := &TpResource{ + Tpid: rl.TPid, + Tenant: rl.Tenant, + ID: rl.ID, + Blocker: rl.Blocker, + Stored: rl.Stored, } + if i == 0 { + mdl.UsageTTL = rl.UsageTTL + mdl.Weight = rl.Weight + mdl.Limit = rl.Limit + mdl.AllocationMessage = rl.AllocationMessage + if rl.ActivationInterval != nil { + if rl.ActivationInterval.ActivationTime != "" { + mdl.ActivationInterval = rl.ActivationInterval.ActivationTime + } + if rl.ActivationInterval.ExpiryTime != "" { + mdl.ActivationInterval += utils.INFIELD_SEP + rl.ActivationInterval.ExpiryTime + } + } + for i, val := range rl.ThresholdIDs { + if i != 0 { + mdl.ThresholdIDs += utils.INFIELD_SEP + } + mdl.ThresholdIDs += val + } + } + mdl.FilterIDs = fltr + mdls = append(mdls, mdl) } return } @@ -2084,7 +2085,8 @@ func (tps TpSuppliers) AsTPSuppliers() (result []*utils.TPSupplierProfile) { suppliersMap := make(map[string]map[string]*utils.TPSupplier) sortingParameterMap := make(map[string]utils.StringMap) for _, tp := range tps { - th, found := mst[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()] + tenID := (&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID() + th, found := mst[tenID] if !found { th = &utils.TPSupplierProfile{ TPid: tp.Tpid, @@ -2095,10 +2097,10 @@ func (tps TpSuppliers) AsTPSuppliers() (result []*utils.TPSupplierProfile) { } } if tp.SupplierID != "" { - if _, has := suppliersMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()]; !has { + if _, has := suppliersMap[tenID]; !has { suppliersMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()] = make(map[string]*utils.TPSupplier) } - sup, found := suppliersMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()][tp.SupplierID] + sup, found := suppliersMap[tenID][tp.SupplierID] if !found { sup = &utils.TPSupplier{ ID: tp.SupplierID, @@ -2132,12 +2134,12 @@ func (tps TpSuppliers) AsTPSuppliers() (result []*utils.TPSupplierProfile) { suppliersMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()][tp.SupplierID] = sup } if tp.SortingParameters != "" { - if _, has := sortingParameterMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()]; !has { - sortingParameterMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()] = make(utils.StringMap) + if _, has := sortingParameterMap[tenID]; !has { + sortingParameterMap[tenID] = make(utils.StringMap) } sortingParamSplit := strings.Split(tp.SortingParameters, utils.INFIELD_SEP) for _, sortingParam := range sortingParamSplit { - sortingParameterMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()][sortingParam] = true + sortingParameterMap[tenID][sortingParam] = true } } if tp.Weight != 0 { @@ -2770,14 +2772,14 @@ func APItoDispatcherProfile(tpDPP *utils.TPDispatcherProfile, timezone string) ( Weight: tpDPP.Weight, Strategy: tpDPP.Strategy, FilterIDs: make([]string, len(tpDPP.FilterIDs)), - Hosts: make([]string, len(tpDPP.Hosts)), + // Hosts: make([]string, len(tpDPP.Hosts)), } for i, fli := range tpDPP.FilterIDs { dpp.FilterIDs[i] = fli } - for i, host := range tpDPP.Hosts { - dpp.Hosts[i] = host - } + // for i, host := range tpDPP.Hosts { + // dpp.Hosts[i] = host + // } if tpDPP.ActivationInterval != nil { if dpp.ActivationInterval, err = tpDPP.ActivationInterval.AsActivationInterval(timezone); err != nil { return nil, err diff --git a/engine/model_helpers_test.go b/engine/model_helpers_test.go index b989a3442..cd07cf332 100644 --- a/engine/model_helpers_test.go +++ b/engine/model_helpers_test.go @@ -1791,7 +1791,7 @@ func TestAPItoDispatcherProfile(t *testing.T) { ActivationInterval: &utils.ActivationInterval{ ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC), }, - Hosts: []string{"localhost", "192.168.56.203"}, + // Hosts: []string{"localhost", "192.168.56.203"}, Weight: 20, } if rcv, err := APItoDispatcherProfile(tpDPP, "UTC"); err != nil { diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index 5c1f3dd18..423f858b6 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -2628,8 +2628,8 @@ func testOnStorITDispatcherProfile(t *testing.T) { ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), }, Strategy: utils.MetaFirst, - Hosts: []string{"192.168.56.203"}, - Weight: 20, + // Hosts: []string{"192.168.56.203"}, + Weight: 20, } if _, rcvErr := onStor.GetDispatcherProfile("cgrates.org", "Dsp1", true, false, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound { diff --git a/engine/storage_csv.go b/engine/storage_csv.go index 70279fef0..ac07edfce 100644 --- a/engine/storage_csv.go +++ b/engine/storage_csv.go @@ -42,19 +42,32 @@ func NewFileCSVStorage(sep rune, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, usersFn, aliasesFn, resProfilesFn, statsFn, thresholdsFn, filterFn, suppProfilesFn, attributeProfilesFn, chargerProfilesFn, dispatcherProfilesFn string) *CSVStorage { - c := new(CSVStorage) - c.sep = sep - c.readerFunc = openFileCSVStorage - c.destinationsFn, c.timingsFn, c.ratesFn, c.destinationratesFn, c.destinationratetimingsFn, c.ratingprofilesFn, - c.sharedgroupsFn, c.actionsFn, c.actiontimingsFn, c.actiontriggersFn, c.accountactionsFn, - c.derivedChargersFn, c.usersFn, c.aliasesFn, c.resProfilesFn, c.statsFn, c.thresholdsFn, - c.filterFn, c.suppProfilesFn, c.attributeProfilesFn, - c.chargerProfilesFn, c.dispatcherProfilesFn = destinationsFn, timingsFn, - ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, - actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, - usersFn, aliasesFn, resProfilesFn, statsFn, thresholdsFn, filterFn, suppProfilesFn, - attributeProfilesFn, chargerProfilesFn, dispatcherProfilesFn - return c + return &CSVStorage{ + sep: sep, + readerFunc: openFileCSVStorage, + destinationsFn: destinationsFn, + timingsFn: timingsFn, + ratesFn: ratesFn, + destinationratesFn: destinationratesFn, + destinationratetimingsFn: destinationratetimingsFn, + ratingprofilesFn: ratingprofilesFn, + sharedgroupsFn: sharedgroupsFn, + actionsFn: actionsFn, + actiontimingsFn: actiontimingsFn, + actiontriggersFn: actiontriggersFn, + accountactionsFn: accountactionsFn, + derivedChargersFn: derivedChargersFn, + usersFn: usersFn, + aliasesFn: aliasesFn, + resProfilesFn: resProfilesFn, + statsFn: statsFn, + thresholdsFn: thresholdsFn, + filterFn: filterFn, + suppProfilesFn: suppProfilesFn, + attributeProfilesFn: attributeProfilesFn, + chargerProfilesFn: chargerProfilesFn, + dispatcherProfilesFn: dispatcherProfilesFn, + } } func NewStringCSVStorage(sep rune, diff --git a/engine/tploader.go b/engine/tploader.go index 6f342d82c..313de18b8 100644 --- a/engine/tploader.go +++ b/engine/tploader.go @@ -18,8 +18,6 @@ along with this program. If not, see package engine -import () - // TPReader is the data source for TPLoader type TPReader interface { // Read will read one record from data source diff --git a/migrator/dispatchers_it_test.go b/migrator/dispatchers_it_test.go index 4a5edd91f..fac7e94f1 100644 --- a/migrator/dispatchers_it_test.go +++ b/migrator/dispatchers_it_test.go @@ -179,8 +179,8 @@ func testDspITMigrateAndMove(t *testing.T) { ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), }, Strategy: utils.MetaRandom, - Hosts: []string{"localhost", "192.168.56.203"}, - Weight: 20, + // Hosts: []string{"localhost", "192.168.56.203"}, + Weight: 20, } if err := dspMigrator.dmIN.DataManager().SetDispatcherProfile(dspPrf, false); err != nil { t.Error(err)