Started adding new dispatcherS profiles

This commit is contained in:
Trial97
2019-01-30 17:54:03 +02:00
committed by Dan Christian Bogos
parent fb1a12a4b7
commit a65bf218cb
8 changed files with 113 additions and 67 deletions

View File

@@ -121,8 +121,8 @@ func testDispatcherSSetDispatcherProfile(t *testing.T) {
ID: "Dsp1",
FilterIDs: []string{"*string:Account:1001"},
Strategy: utils.MetaFirst,
Hosts: []string{"192.168.56.203", "192.168.56.204"},
Weight: 20,
// Hosts: []string{"192.168.56.203", "192.168.56.204"},
Weight: 20,
}
if err := dispatcherRPC.Call("ApierV1.SetDispatcherProfile",

View File

@@ -24,22 +24,55 @@ import (
"github.com/cgrates/cgrates/utils"
)
// // DispatcherProfile is the config for one Dispatcher
// type DispatcherProfile struct {
// Tenant string
// ID string
// FilterIDs []string
// ActivationInterval *utils.ActivationInterval // Activation interval
// Strategy string
// Hosts []string // perform data aliasing based on these Attributes
// Weight float64
// }
// func (dP *DispatcherProfile) TenantID() string {
// return utils.ConcatenatedKey(dP.Tenant, dP.ID)
// }
// // ChargerProfiles is a sortable list of Charger profiles
// type DispatcherProfiles []*DispatcherProfile
// // Sort is part of sort interface, sort based on Weight
// func (dps DispatcherProfiles) Sort() {
// sort.Slice(dps, func(i, j int) bool { return dps[i].Weight > dps[j].Weight })
// }
type DispatcherConn struct {
ID string
FilterIDs []string
Weight float64 // applied in case of multiple connections need to be ordered
Params map[string]interface{} // additional parameters stored for a session
Blocker bool // no connection after this one
}
// DispatcherProfile is the config for one Dispatcher
type DispatcherProfile struct {
Tenant string
ID string
Subsystems []string
FilterIDs []string
ActivationInterval *utils.ActivationInterval // Activation interval
ActivationInterval *utils.ActivationInterval // activation interval
Strategy string
Hosts []string // perform data aliasing based on these Attributes
Weight float64
StrategyParams map[string]interface{} // ie for distribution, set here the pool weights
Weight float64 // used for profile sorting on match
Connections []*DispatcherConn // dispatch to these connections
}
func (dP *DispatcherProfile) TenantID() string {
return utils.ConcatenatedKey(dP.Tenant, dP.ID)
}
// ChargerProfiles is a sortable list of Charger profiles
// DispatcherProfiles is a sortable list of Dispatcher profiles
type DispatcherProfiles []*DispatcherProfile
// Sort is part of sort interface, sort based on Weight

View File

@@ -1509,38 +1509,39 @@ func (tps TpResources) AsTPResources() (result []*utils.TPResource) {
}
func APItoModelResource(rl *utils.TPResource) (mdls TpResources) {
if rl != nil {
for i, fltr := range rl.FilterIDs {
mdl := &TpResource{
Tpid: rl.TPid,
Tenant: rl.Tenant,
ID: rl.ID,
Blocker: rl.Blocker,
Stored: rl.Stored,
}
if i == 0 {
mdl.UsageTTL = rl.UsageTTL
mdl.Weight = rl.Weight
mdl.Limit = rl.Limit
mdl.AllocationMessage = rl.AllocationMessage
if rl.ActivationInterval != nil {
if rl.ActivationInterval.ActivationTime != "" {
mdl.ActivationInterval = rl.ActivationInterval.ActivationTime
}
if rl.ActivationInterval.ExpiryTime != "" {
mdl.ActivationInterval += utils.INFIELD_SEP + rl.ActivationInterval.ExpiryTime
}
}
for i, val := range rl.ThresholdIDs {
if i != 0 {
mdl.ThresholdIDs += utils.INFIELD_SEP
}
mdl.ThresholdIDs += val
}
}
mdl.FilterIDs = fltr
mdls = append(mdls, mdl)
if rl == nil {
return
}
for i, fltr := range rl.FilterIDs {
mdl := &TpResource{
Tpid: rl.TPid,
Tenant: rl.Tenant,
ID: rl.ID,
Blocker: rl.Blocker,
Stored: rl.Stored,
}
if i == 0 {
mdl.UsageTTL = rl.UsageTTL
mdl.Weight = rl.Weight
mdl.Limit = rl.Limit
mdl.AllocationMessage = rl.AllocationMessage
if rl.ActivationInterval != nil {
if rl.ActivationInterval.ActivationTime != "" {
mdl.ActivationInterval = rl.ActivationInterval.ActivationTime
}
if rl.ActivationInterval.ExpiryTime != "" {
mdl.ActivationInterval += utils.INFIELD_SEP + rl.ActivationInterval.ExpiryTime
}
}
for i, val := range rl.ThresholdIDs {
if i != 0 {
mdl.ThresholdIDs += utils.INFIELD_SEP
}
mdl.ThresholdIDs += val
}
}
mdl.FilterIDs = fltr
mdls = append(mdls, mdl)
}
return
}
@@ -2084,7 +2085,8 @@ func (tps TpSuppliers) AsTPSuppliers() (result []*utils.TPSupplierProfile) {
suppliersMap := make(map[string]map[string]*utils.TPSupplier)
sortingParameterMap := make(map[string]utils.StringMap)
for _, tp := range tps {
th, found := mst[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()]
tenID := (&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()
th, found := mst[tenID]
if !found {
th = &utils.TPSupplierProfile{
TPid: tp.Tpid,
@@ -2095,10 +2097,10 @@ func (tps TpSuppliers) AsTPSuppliers() (result []*utils.TPSupplierProfile) {
}
}
if tp.SupplierID != "" {
if _, has := suppliersMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()]; !has {
if _, has := suppliersMap[tenID]; !has {
suppliersMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()] = make(map[string]*utils.TPSupplier)
}
sup, found := suppliersMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()][tp.SupplierID]
sup, found := suppliersMap[tenID][tp.SupplierID]
if !found {
sup = &utils.TPSupplier{
ID: tp.SupplierID,
@@ -2132,12 +2134,12 @@ func (tps TpSuppliers) AsTPSuppliers() (result []*utils.TPSupplierProfile) {
suppliersMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()][tp.SupplierID] = sup
}
if tp.SortingParameters != "" {
if _, has := sortingParameterMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()]; !has {
sortingParameterMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()] = make(utils.StringMap)
if _, has := sortingParameterMap[tenID]; !has {
sortingParameterMap[tenID] = make(utils.StringMap)
}
sortingParamSplit := strings.Split(tp.SortingParameters, utils.INFIELD_SEP)
for _, sortingParam := range sortingParamSplit {
sortingParameterMap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()][sortingParam] = true
sortingParameterMap[tenID][sortingParam] = true
}
}
if tp.Weight != 0 {
@@ -2770,14 +2772,14 @@ func APItoDispatcherProfile(tpDPP *utils.TPDispatcherProfile, timezone string) (
Weight: tpDPP.Weight,
Strategy: tpDPP.Strategy,
FilterIDs: make([]string, len(tpDPP.FilterIDs)),
Hosts: make([]string, len(tpDPP.Hosts)),
// Hosts: make([]string, len(tpDPP.Hosts)),
}
for i, fli := range tpDPP.FilterIDs {
dpp.FilterIDs[i] = fli
}
for i, host := range tpDPP.Hosts {
dpp.Hosts[i] = host
}
// for i, host := range tpDPP.Hosts {
// dpp.Hosts[i] = host
// }
if tpDPP.ActivationInterval != nil {
if dpp.ActivationInterval, err = tpDPP.ActivationInterval.AsActivationInterval(timezone); err != nil {
return nil, err

View File

@@ -1791,7 +1791,7 @@ func TestAPItoDispatcherProfile(t *testing.T) {
ActivationInterval: &utils.ActivationInterval{
ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC),
},
Hosts: []string{"localhost", "192.168.56.203"},
// Hosts: []string{"localhost", "192.168.56.203"},
Weight: 20,
}
if rcv, err := APItoDispatcherProfile(tpDPP, "UTC"); err != nil {

View File

@@ -2628,8 +2628,8 @@ func testOnStorITDispatcherProfile(t *testing.T) {
ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC),
},
Strategy: utils.MetaFirst,
Hosts: []string{"192.168.56.203"},
Weight: 20,
// Hosts: []string{"192.168.56.203"},
Weight: 20,
}
if _, rcvErr := onStor.GetDispatcherProfile("cgrates.org", "Dsp1",
true, false, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound {

View File

@@ -42,19 +42,32 @@ func NewFileCSVStorage(sep rune,
destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn,
actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, usersFn, aliasesFn,
resProfilesFn, statsFn, thresholdsFn, filterFn, suppProfilesFn, attributeProfilesFn, chargerProfilesFn, dispatcherProfilesFn string) *CSVStorage {
c := new(CSVStorage)
c.sep = sep
c.readerFunc = openFileCSVStorage
c.destinationsFn, c.timingsFn, c.ratesFn, c.destinationratesFn, c.destinationratetimingsFn, c.ratingprofilesFn,
c.sharedgroupsFn, c.actionsFn, c.actiontimingsFn, c.actiontriggersFn, c.accountactionsFn,
c.derivedChargersFn, c.usersFn, c.aliasesFn, c.resProfilesFn, c.statsFn, c.thresholdsFn,
c.filterFn, c.suppProfilesFn, c.attributeProfilesFn,
c.chargerProfilesFn, c.dispatcherProfilesFn = destinationsFn, timingsFn,
ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn,
actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn,
usersFn, aliasesFn, resProfilesFn, statsFn, thresholdsFn, filterFn, suppProfilesFn,
attributeProfilesFn, chargerProfilesFn, dispatcherProfilesFn
return c
return &CSVStorage{
sep: sep,
readerFunc: openFileCSVStorage,
destinationsFn: destinationsFn,
timingsFn: timingsFn,
ratesFn: ratesFn,
destinationratesFn: destinationratesFn,
destinationratetimingsFn: destinationratetimingsFn,
ratingprofilesFn: ratingprofilesFn,
sharedgroupsFn: sharedgroupsFn,
actionsFn: actionsFn,
actiontimingsFn: actiontimingsFn,
actiontriggersFn: actiontriggersFn,
accountactionsFn: accountactionsFn,
derivedChargersFn: derivedChargersFn,
usersFn: usersFn,
aliasesFn: aliasesFn,
resProfilesFn: resProfilesFn,
statsFn: statsFn,
thresholdsFn: thresholdsFn,
filterFn: filterFn,
suppProfilesFn: suppProfilesFn,
attributeProfilesFn: attributeProfilesFn,
chargerProfilesFn: chargerProfilesFn,
dispatcherProfilesFn: dispatcherProfilesFn,
}
}
func NewStringCSVStorage(sep rune,

View File

@@ -18,8 +18,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import ()
// TPReader is the data source for TPLoader
type TPReader interface {
// Read will read one record from data source

View File

@@ -179,8 +179,8 @@ func testDspITMigrateAndMove(t *testing.T) {
ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC),
},
Strategy: utils.MetaRandom,
Hosts: []string{"localhost", "192.168.56.203"},
Weight: 20,
// Hosts: []string{"localhost", "192.168.56.203"},
Weight: 20,
}
if err := dspMigrator.dmIN.DataManager().SetDispatcherProfile(dspPrf, false); err != nil {
t.Error(err)