Merge pull request #1834 from TeoV/master

Add Attribute connection from DispatcherS through ConnManager
This commit is contained in:
Dan Christian Bogos
2019-12-18 15:26:49 +01:00
committed by GitHub
32 changed files with 155 additions and 169 deletions

View File

@@ -524,7 +524,7 @@ func main() {
})
attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan)
dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan, internalDispatcherSChan)
dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalDispatcherSChan, connManager.GetConnMgr())
chrS := services.NewChargerService(cfg, dmService, cacheS, filterSChan, server,
internalChargerSChan, connManager.GetConnMgr())
tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server, internalThresholdSChan)

View File

@@ -1528,7 +1528,7 @@ func TestDfDispatcherSJsonCfg(t *testing.T) {
Indexed_selects: utils.BoolPointer(true),
String_indexed_fields: nil,
Prefix_indexed_fields: &[]string{},
Attributes_conns: &[]*RemoteHostJson{},
Attributes_conns: &[]string{},
}
if cfg, err := dfCgrJsonCfg.DispatcherSJsonCfg(); err != nil {
t.Error(err)

View File

@@ -1526,7 +1526,7 @@ func TestCgrCfgJSONDefaultDispatcherSCfg(t *testing.T) {
IndexedSelects: true,
StringIndexedFields: nil,
PrefixIndexedFields: &[]string{},
AttributeSConns: []*RemoteHost{},
AttributeSConns: []string{},
}
if !reflect.DeepEqual(cgrCfg.dispatcherSCfg, eDspSCfg) {
t.Errorf("received: %+v, expecting: %+v", cgrCfg.dispatcherSCfg, eDspSCfg)

View File

@@ -436,13 +436,7 @@ func (cfg *CGRConfig) checkConfigSanity() error {
// DataDB sanity checks
if cfg.dataDbCfg.DataDbType == utils.INTERNAL {
for key, config := range cfg.cacheCfg {
if key == utils.CacheDiameterMessages || key == utils.CacheClosedSessions || key == utils.CacheRPCConnections {
if config.Limit == 0 {
return fmt.Errorf("<%s> %s needs to be != 0 when DataBD is *internal, found 0.", utils.CacheS, key)
}
continue
}
if config.Limit != 0 {
if utils.CacheDataDBPartitions.Has(key) && config.Limit != 0 {
return fmt.Errorf("<%s> %s needs to be 0 when DataBD is *internal, received : %d", utils.CacheS, key, config.Limit)
}
}
@@ -481,5 +475,16 @@ func (cfg *CGRConfig) checkConfigSanity() error {
return fmt.Errorf("<%s> Connection with id: <%s> not defined", utils.ApierV1, connID)
}
}
// Dispatcher sanity check
if cfg.dispatcherSCfg.Enabled {
for _, connID := range cfg.dispatcherSCfg.AttributeSConns {
if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.attributeSCfg.Enabled {
return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.AttributeS, utils.DispatcherS)
}
if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) {
return fmt.Errorf("<%s> Connection with id: <%s> not defined", utils.DispatcherS, connID)
}
}
}
return nil
}

View File

@@ -578,36 +578,26 @@ func TestConfigSanityStorDB(t *testing.T) {
func TestConfigSanityDataDB(t *testing.T) {
cfg, _ = NewDefaultCGRConfig()
cfg.dataDbCfg.DataDbType = utils.INTERNAL
cfg.cacheCfg = CacheCfg{
utils.CacheDiameterMessages: &CacheParamCfg{
Limit: 0,
},
}
expected := "<CacheS> *diameter_messages needs to be != 0 when DataBD is *internal, found 0."
if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected {
t.Errorf("Expecting: %+q received: %+q", expected, err)
}
cfg.cacheCfg = CacheCfg{
utils.CacheDiameterMessages: &CacheParamCfg{
Limit: 1,
},
}
if err := cfg.checkConfigSanity(); err != nil {
t.Errorf("Expecting: nil received: %+q", err)
}
cfg.cacheCfg = CacheCfg{
"test": &CacheParamCfg{
Limit: 1,
},
}
expected = "<CacheS> test needs to be 0 when DataBD is *internal, received : 1"
if err := cfg.checkConfigSanity(); err != nil {
t.Error(err)
}
cfg.cacheCfg = CacheCfg{
utils.CacheAccounts: &CacheParamCfg{
Limit: 1,
},
}
expected := "<CacheS> *accounts needs to be 0 when DataBD is *internal, received : 1"
if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected {
t.Errorf("Expecting: %+q received: %+q", expected, err)
}
cfg.cacheCfg["test"].Limit = 0
cfg.cacheCfg[utils.CacheAccounts].Limit = 0
cfg.resourceSCfg.Enabled = true
expected = "<ResourceS> StoreInterval needs to be -1 when DataBD is *internal, received : 0"
if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected {

View File

@@ -18,13 +18,15 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package config
import "github.com/cgrates/cgrates/utils"
// DispatcherSCfg is the configuration of dispatcher service
type DispatcherSCfg struct {
Enabled bool
IndexedSelects bool
StringIndexedFields *[]string
PrefixIndexedFields *[]string
AttributeSConns []*RemoteHost
AttributeSConns []string
}
func (dps *DispatcherSCfg) loadFromJsonCfg(jsnCfg *DispatcherSJsonCfg) (err error) {
@@ -52,10 +54,14 @@ func (dps *DispatcherSCfg) loadFromJsonCfg(jsnCfg *DispatcherSJsonCfg) (err erro
dps.PrefixIndexedFields = &pif
}
if jsnCfg.Attributes_conns != nil {
dps.AttributeSConns = make([]*RemoteHost, len(*jsnCfg.Attributes_conns))
for idx, jsnHaCfg := range *jsnCfg.Attributes_conns {
dps.AttributeSConns[idx] = NewDfltRemoteHost()
dps.AttributeSConns[idx].loadFromJsonCfg(jsnHaCfg)
dps.AttributeSConns = make([]string, len(*jsnCfg.Attributes_conns))
for idx, connID := range *jsnCfg.Attributes_conns {
// if we have the connection internal we change the name so we can have internal rpc for each subsystem
if connID == utils.MetaInternal {
dps.AttributeSConns[idx] = utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes)
} else {
dps.AttributeSConns[idx] = connID
}
}
}
return nil

View File

@@ -501,7 +501,7 @@ type DispatcherSJsonCfg struct {
Indexed_selects *bool
String_indexed_fields *[]string
Prefix_indexed_fields *[]string
Attributes_conns *[]*RemoteHostJson
Attributes_conns *[]string
}
type LoaderCfgJson struct {

View File

@@ -44,9 +44,7 @@
"dispatchers":{
"enabled": true,
"attributes_conns": [
{"address": "*internal"}
]
"attributes_conns": ["*internal"]
},

View File

@@ -62,9 +62,7 @@
"dispatchers":{
"enabled": true,
"attributes_conns": [
{"address": "*internal"},
],
"attributes_conns": ["*internal"],
},
"apier": {

View File

@@ -62,9 +62,7 @@
"dispatchers":{
"enabled": true,
"attributes_conns": [
{"address": "*internal"},
],
"attributes_conns": ["*internal"],
},
"apier": {

View File

@@ -72,9 +72,7 @@
"dispatchers":{
"enabled": true,
"attributes_conns": [
{"address": "*internal"},
],
"attributes_conns": ["*internal"],
},

View File

@@ -72,9 +72,7 @@
"dispatchers":{
"enabled": true,
"attributes_conns": [
{"address": "*internal"},
],
"attributes_conns": ["*internal"],
},

View File

@@ -30,7 +30,7 @@ func (dS *DispatcherService) AttributeSv1Ping(args *utils.CGREventWithArgDispatc
args = utils.NewCGREventWithArgDispatcher()
}
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -54,7 +54,7 @@ func (dS *DispatcherService) AttributeSv1GetAttributeForEvent(args *engine.AttrA
if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString {
tnt = args.CGREvent.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -77,7 +77,7 @@ func (dS *DispatcherService) AttributeSv1ProcessEvent(args *engine.AttrArgsProce
if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString {
tnt = args.CGREvent.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}

View File

@@ -32,7 +32,7 @@ func (dS *DispatcherService) CacheSv1Ping(args *utils.CGREventWithArgDispatcher,
args = utils.NewCGREventWithArgDispatcher()
}
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -56,7 +56,7 @@ func (dS *DispatcherService) CacheSv1GetItemIDs(args *utils.ArgsGetCacheItemIDsW
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -80,7 +80,7 @@ func (dS *DispatcherService) CacheSv1HasItem(args *utils.ArgsGetCacheItemWithArg
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -104,7 +104,7 @@ func (dS *DispatcherService) CacheSv1GetItemExpiryTime(args *utils.ArgsGetCacheI
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -128,7 +128,7 @@ func (dS *DispatcherService) CacheSv1RemoveItem(args *utils.ArgsGetCacheItemWith
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -152,7 +152,7 @@ func (dS *DispatcherService) CacheSv1Clear(args *utils.AttrCacheIDsWithArgDispat
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -175,7 +175,7 @@ func (dS *DispatcherService) CacheSv1FlushCache(args utils.AttrReloadCacheWithAr
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -199,7 +199,7 @@ func (dS *DispatcherService) CacheSv1GetCacheStats(args *utils.AttrCacheIDsWithA
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -222,7 +222,7 @@ func (dS *DispatcherService) CacheSv1PrecacheStatus(args *utils.AttrCacheIDsWith
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -246,7 +246,7 @@ func (dS *DispatcherService) CacheSv1HasGroup(args *utils.ArgsGetGroupWithArgDis
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -270,7 +270,7 @@ func (dS *DispatcherService) CacheSv1GetGroupItemIDs(args *utils.ArgsGetGroupWit
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -293,7 +293,7 @@ func (dS *DispatcherService) CacheSv1RemoveGroup(args *utils.ArgsGetGroupWithArg
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -316,7 +316,7 @@ func (dS *DispatcherService) CacheSv1ReloadCache(args utils.AttrReloadCacheWithA
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -339,7 +339,7 @@ func (dS *DispatcherService) CacheSv1LoadCache(args utils.AttrReloadCacheWithArg
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}

View File

@@ -35,7 +35,7 @@ func (dS *DispatcherService) CDRsV1Ping(args *utils.CGREventWithArgDispatcher,
if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString {
tnt = args.CGREvent.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -57,7 +57,7 @@ func (dS *DispatcherService) CDRsV1GetCDRs(args utils.RPCCDRsFilterWithArgDispat
if args.TenantArg != nil && args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -79,7 +79,7 @@ func (dS *DispatcherService) CDRsV1GetCDRsCount(args *utils.RPCCDRsFilterWithArg
if args.TenantArg != nil && args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -101,7 +101,7 @@ func (dS *DispatcherService) CDRsV1StoreSessionCost(args *engine.AttrCDRSStoreSM
if args.TenantArg != nil && args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -123,7 +123,7 @@ func (dS *DispatcherService) CDRsV1RateCDRs(args *engine.ArgRateCDRs, reply *str
if args.TenantArg != nil && args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -145,7 +145,7 @@ func (dS *DispatcherService) CDRsV1ProcessExternalCDR(args *engine.ExternalCDRWi
if args.Tenant != utils.EmptyString {
tnt = args.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -167,7 +167,7 @@ func (dS *DispatcherService) CDRsV1ProcessEvent(args *engine.ArgV1ProcessEvent,
if args.CGREvent.Tenant != utils.EmptyString {
tnt = args.CGREvent.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -189,7 +189,7 @@ func (dS *DispatcherService) CDRsV1ProcessCDR(args *engine.CDRWithArgDispatcher,
if args.Tenant != utils.EmptyString {
tnt = args.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}

View File

@@ -28,7 +28,7 @@ func (dS *DispatcherService) ChargerSv1Ping(args *utils.CGREventWithArgDispatche
args = utils.NewCGREventWithArgDispatcher()
}
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -51,7 +51,7 @@ func (dS *DispatcherService) ChargerSv1GetChargersForEvent(args *utils.CGREventW
if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString {
tnt = args.CGREvent.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -74,7 +74,7 @@ func (dS *DispatcherService) ChargerSv1ProcessEvent(args *utils.CGREventWithArgD
if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString {
tnt = args.CGREvent.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}

View File

@@ -30,7 +30,7 @@ func (dS *DispatcherService) ConfigSv1GetJSONSection(args *config.StringWithArgD
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -52,7 +52,7 @@ func (dS *DispatcherService) ConfigSv1ReloadConfig(args *config.ConfigReloadWith
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}

View File

@@ -30,7 +30,7 @@ func (dS *DispatcherService) CoreSv1Status(args *utils.TenantWithArgDispatcher,
if args.TenantArg != nil && args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -52,7 +52,7 @@ func (dS *DispatcherService) CoreSv1Ping(args *utils.CGREventWithArgDispatcher,
if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString {
tnt = args.CGREvent.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}

View File

@@ -34,21 +34,19 @@ import (
// NewDispatcherService constructs a DispatcherService
func NewDispatcherService(dm *engine.DataManager,
cfg *config.CGRConfig, fltrS *engine.FilterS,
attrS *rpcclient.RPCPool) (*DispatcherService, error) {
if attrS != nil && reflect.ValueOf(attrS).IsNil() {
attrS = nil
}
connMgr *engine.ConnManager) (*DispatcherService, error) {
return &DispatcherService{dm: dm, cfg: cfg,
fltrS: fltrS, attrS: attrS}, nil
fltrS: fltrS, connMgr: connMgr}, nil
}
// DispatcherService is the service handling dispatching towards internal components
// designed to handle automatic partitioning and failover
type DispatcherService struct {
dm *engine.DataManager
cfg *config.CGRConfig
fltrS *engine.FilterS
attrS *rpcclient.RPCPool // used for API auth
dm *engine.DataManager
cfg *config.CGRConfig
fltrS *engine.FilterS
connMgr *engine.ConnManager
}
// ListenAndServe will initialize the service
@@ -68,7 +66,8 @@ func (dS *DispatcherService) Shutdown() error {
func (dS *DispatcherService) authorizeEvent(ev *utils.CGREvent,
reply *engine.AttrSProcessEventReply) (err error) {
if err = dS.attrS.Call(utils.AttributeSv1ProcessEvent,
if err = dS.connMgr.Call(dS.cfg.DispatcherSCfg().AttributeSConns, nil,
utils.AttributeSv1ProcessEvent,
&engine.AttrArgsProcessEvent{
Context: utils.StringPointer(utils.MetaAuth),
CGREvent: ev}, reply); err != nil {
@@ -225,7 +224,7 @@ func (dS *DispatcherService) V1Apier(apier interface{}, args *utils.MethodParame
}
tenant := utils.FirstNonEmpty(utils.IfaceAsString(parameters[utils.Tenant]), config.CgrConfig().GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if argD == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}

View File

@@ -31,7 +31,7 @@ func (dS *DispatcherService) GuardianSv1Ping(args *utils.CGREventWithArgDispatch
args = utils.NewCGREventWithArgDispatcher()
}
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -56,7 +56,7 @@ func (dS *DispatcherService) GuardianSv1RemoteLock(args AttrRemoteLockWithApiKey
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -80,7 +80,7 @@ func (dS *DispatcherService) GuardianSv1RemoteUnlock(args AttrRemoteUnlockWithAp
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}

View File

@@ -27,7 +27,7 @@ func (dS *DispatcherService) RALsV1Ping(args *utils.CGREventWithArgDispatcher, r
args = utils.NewCGREventWithArgDispatcher()
}
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -46,7 +46,7 @@ func (dS *DispatcherService) RALsV1Ping(args *utils.CGREventWithArgDispatcher, r
func (dS *DispatcherService) RALsV1GetRatingPlansCost(args *utils.RatingPlanCostArg, rpl *RatingPlanCost) (err error) {
tenant := dS.cfg.GeneralCfg().DefaultTenant
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}

View File

@@ -28,7 +28,7 @@ func (dS *DispatcherService) ResourceSv1Ping(args *utils.CGREventWithArgDispatch
args = utils.NewCGREventWithArgDispatcher()
}
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -51,7 +51,7 @@ func (dS *DispatcherService) ResourceSv1GetResourcesForEvent(args utils.ArgRSv1R
if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString {
tnt = args.CGREvent.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -75,7 +75,7 @@ func (dS *DispatcherService) ResourceSv1AuthorizeResources(args utils.ArgRSv1Res
if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString {
tnt = args.CGREvent.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -99,7 +99,7 @@ func (dS *DispatcherService) ResourceSv1AllocateResources(args utils.ArgRSv1Reso
if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString {
tnt = args.CGREvent.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -123,7 +123,7 @@ func (dS *DispatcherService) ResourceSv1ReleaseResources(args utils.ArgRSv1Resou
if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString {
tnt = args.CGREvent.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}

View File

@@ -32,7 +32,7 @@ func (dS *DispatcherService) ResponderPing(args *utils.CGREventWithArgDispatcher
args = utils.NewCGREventWithArgDispatcher()
}
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -51,7 +51,7 @@ func (dS *DispatcherService) ResponderPing(args *utils.CGREventWithArgDispatcher
func (dS *DispatcherService) ResponderGetCost(args *engine.CallDescriptorWithArgDispatcher,
reply *engine.CallCost) (err error) {
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -70,7 +70,7 @@ func (dS *DispatcherService) ResponderGetCost(args *engine.CallDescriptorWithArg
func (dS *DispatcherService) ResponderDebit(args *engine.CallDescriptorWithArgDispatcher,
reply *engine.CallCost) (err error) {
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -89,7 +89,7 @@ func (dS *DispatcherService) ResponderDebit(args *engine.CallDescriptorWithArgDi
func (dS *DispatcherService) ResponderMaxDebit(args *engine.CallDescriptorWithArgDispatcher,
reply *engine.CallCost) (err error) {
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -108,7 +108,7 @@ func (dS *DispatcherService) ResponderMaxDebit(args *engine.CallDescriptorWithAr
func (dS *DispatcherService) ResponderRefundIncrements(args *engine.CallDescriptorWithArgDispatcher,
reply *engine.Account) (err error) {
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -127,7 +127,7 @@ func (dS *DispatcherService) ResponderRefundIncrements(args *engine.CallDescript
func (dS *DispatcherService) ResponderRefundRounding(args *engine.CallDescriptorWithArgDispatcher,
reply *float64) (err error) {
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -146,7 +146,7 @@ func (dS *DispatcherService) ResponderRefundRounding(args *engine.CallDescriptor
func (dS *DispatcherService) ResponderGetMaxSessionTime(args *engine.CallDescriptorWithArgDispatcher,
reply *time.Duration) (err error) {
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -166,7 +166,7 @@ func (dS *DispatcherService) ResponderGetMaxSessionTime(args *engine.CallDescrip
func (dS *DispatcherService) ResponderShutdown(args *utils.TenantWithArgDispatcher,
reply *string) (err error) {
tnt := utils.FirstNonEmpty(args.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}

View File

@@ -27,7 +27,7 @@ func (dS *DispatcherService) SchedulerSv1Ping(args *utils.CGREventWithArgDispatc
args = utils.NewCGREventWithArgDispatcher()
}
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -47,7 +47,7 @@ func (dS *DispatcherService) SchedulerSv1Ping(args *utils.CGREventWithArgDispatc
func (dS *DispatcherService) SchedulerSv1Reload(args *utils.CGREventWithArgDispatcher, reply *string) (err error) {
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}

View File

@@ -31,7 +31,7 @@ func (dS *DispatcherService) ServiceManagerV1Ping(args *utils.CGREventWithArgDis
args = utils.NewCGREventWithArgDispatcher()
}
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -55,7 +55,7 @@ func (dS *DispatcherService) ServiceManagerV1StartService(args ArgStartServiceWi
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -78,7 +78,7 @@ func (dS *DispatcherService) ServiceManagerV1StopService(args ArgStartServiceWit
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -101,7 +101,7 @@ func (dS *DispatcherService) ServiceManagerV1ServiceStatus(args ArgStartServiceW
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}

View File

@@ -27,7 +27,7 @@ import (
func (dS *DispatcherService) SessionSv1Ping(args *utils.CGREventWithArgDispatcher, reply *string) (err error) {
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -48,7 +48,7 @@ func (dS *DispatcherService) SessionSv1Ping(args *utils.CGREventWithArgDispatche
func (dS *DispatcherService) SessionSv1AuthorizeEvent(args *sessions.V1AuthorizeArgs,
reply *sessions.V1AuthorizeReply) (err error) {
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -69,7 +69,7 @@ func (dS *DispatcherService) SessionSv1AuthorizeEvent(args *sessions.V1Authorize
func (dS *DispatcherService) SessionSv1AuthorizeEventWithDigest(args *sessions.V1AuthorizeArgs,
reply *sessions.V1AuthorizeReplyWithDigest) (err error) {
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -90,7 +90,7 @@ func (dS *DispatcherService) SessionSv1AuthorizeEventWithDigest(args *sessions.V
func (dS *DispatcherService) SessionSv1InitiateSession(args *sessions.V1InitSessionArgs,
reply *sessions.V1InitSessionReply) (err error) {
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -111,7 +111,7 @@ func (dS *DispatcherService) SessionSv1InitiateSession(args *sessions.V1InitSess
func (dS *DispatcherService) SessionSv1InitiateSessionWithDigest(args *sessions.V1InitSessionArgs,
reply *sessions.V1InitReplyWithDigest) (err error) {
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -132,7 +132,7 @@ func (dS *DispatcherService) SessionSv1InitiateSessionWithDigest(args *sessions.
func (dS *DispatcherService) SessionSv1UpdateSession(args *sessions.V1UpdateSessionArgs,
reply *sessions.V1UpdateSessionReply) (err error) {
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -156,7 +156,7 @@ func (dS *DispatcherService) SessionSv1SyncSessions(args *utils.TenantWithArgDis
if args.TenantArg != nil && args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -176,7 +176,7 @@ func (dS *DispatcherService) SessionSv1SyncSessions(args *utils.TenantWithArgDis
func (dS *DispatcherService) SessionSv1TerminateSession(args *sessions.V1TerminateSessionArgs,
reply *string) (err error) {
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -197,7 +197,7 @@ func (dS *DispatcherService) SessionSv1TerminateSession(args *sessions.V1Termina
func (dS *DispatcherService) SessionSv1ProcessCDR(args *utils.CGREventWithArgDispatcher,
reply *string) (err error) {
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -218,7 +218,7 @@ func (dS *DispatcherService) SessionSv1ProcessCDR(args *utils.CGREventWithArgDis
func (dS *DispatcherService) SessionSv1ProcessMessage(args *sessions.V1ProcessMessageArgs,
reply *sessions.V1ProcessMessageReply) (err error) {
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -239,7 +239,7 @@ func (dS *DispatcherService) SessionSv1ProcessMessage(args *sessions.V1ProcessMe
func (dS *DispatcherService) SessionSv1ProcessEvent(args *sessions.V1ProcessEventArgs,
reply *sessions.V1ProcessEventReply) (err error) {
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -263,7 +263,7 @@ func (dS *DispatcherService) SessionSv1GetActiveSessions(args *utils.SessionFilt
if args.Tenant != utils.EmptyString {
tnt = args.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -286,7 +286,7 @@ func (dS *DispatcherService) SessionSv1GetActiveSessionsCount(args *utils.Sessio
if args.Tenant != utils.EmptyString {
tnt = args.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -309,7 +309,7 @@ func (dS *DispatcherService) SessionSv1ForceDisconnect(args *utils.SessionFilter
if args.Tenant != utils.EmptyString {
tnt = args.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -332,7 +332,7 @@ func (dS *DispatcherService) SessionSv1GetPassiveSessions(args *utils.SessionFil
if args.Tenant != utils.EmptyString {
tnt = args.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -355,7 +355,7 @@ func (dS *DispatcherService) SessionSv1GetPassiveSessionsCount(args *utils.Sessi
if args.Tenant != utils.EmptyString {
tnt = args.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -378,7 +378,7 @@ func (dS *DispatcherService) SessionSv1ReplicateSessions(args ArgsReplicateSessi
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -401,7 +401,7 @@ func (dS *DispatcherService) SessionSv1SetPassiveSession(args *sessions.Session,
if args.Tenant != utils.EmptyString {
tnt = args.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}

View File

@@ -30,7 +30,7 @@ func (dS *DispatcherService) StatSv1Ping(args *utils.CGREventWithArgDispatcher,
args = utils.NewCGREventWithArgDispatcher()
}
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -51,7 +51,7 @@ func (dS *DispatcherService) StatSv1Ping(args *utils.CGREventWithArgDispatcher,
func (dS *DispatcherService) StatSv1GetStatQueuesForEvent(args *engine.StatsArgsProcessEvent,
reply *[]string) (err error) {
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -72,7 +72,7 @@ func (dS *DispatcherService) StatSv1GetStatQueuesForEvent(args *engine.StatsArgs
func (dS *DispatcherService) StatSv1GetQueueStringMetrics(args *utils.TenantIDWithArgDispatcher,
reply *map[string]string) (err error) {
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -96,7 +96,7 @@ func (dS *DispatcherService) StatSv1GetQueueStringMetrics(args *utils.TenantIDWi
func (dS *DispatcherService) StatSv1ProcessEvent(args *engine.StatsArgsProcessEvent,
reply *[]string) (err error) {
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -116,7 +116,7 @@ func (dS *DispatcherService) StatSv1ProcessEvent(args *engine.StatsArgsProcessEv
func (dS *DispatcherService) StatSv1GetQueueFloatMetrics(args *utils.TenantIDWithArgDispatcher,
reply *map[string]float64) (err error) {
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -143,7 +143,7 @@ func (dS *DispatcherService) StatSv1GetQueueIDs(args *utils.TenantWithArgDispatc
if args.TenantArg != nil && args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}

View File

@@ -28,7 +28,7 @@ func (dS *DispatcherService) SupplierSv1Ping(args *utils.CGREventWithArgDispatch
args = utils.NewCGREventWithArgDispatcher()
}
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -49,7 +49,7 @@ func (dS *DispatcherService) SupplierSv1Ping(args *utils.CGREventWithArgDispatch
func (dS *DispatcherService) SupplierSv1GetSuppliers(args *engine.ArgsGetSuppliers,
reply *engine.SortedSuppliers) (err error) {
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
@@ -70,7 +70,7 @@ func (dS *DispatcherService) SupplierSv1GetSuppliers(args *engine.ArgsGetSupplie
func (dS *DispatcherService) SupplierSv1GetSupplierProfilesForEvent(args *utils.CGREventWithArgDispatcher,
reply *[]*engine.SupplierProfile) (err error) {
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}

View File

@@ -33,7 +33,7 @@ func (dS *DispatcherService) ThresholdSv1Ping(args *utils.CGREventWithArgDispatc
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if err = dS.authorize(utils.ThresholdSv1Ping,
args.CGREvent.Tenant,
args.APIKey, args.CGREvent.Time); err != nil {
@@ -54,7 +54,7 @@ func (dS *DispatcherService) ThresholdSv1GetThresholdsForEvent(args *engine.Args
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if err = dS.authorize(utils.ThresholdSv1GetThresholdsForEvent,
args.CGREvent.Tenant,
args.APIKey, args.CGREvent.Time); err != nil {
@@ -75,7 +75,7 @@ func (dS *DispatcherService) ThresholdSv1ProcessEvent(args *engine.ArgsProcessEv
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if err = dS.authorize(utils.ThresholdSv1ProcessEvent,
args.CGREvent.Tenant,
args.APIKey, args.CGREvent.Time); err != nil {
@@ -98,7 +98,7 @@ func (dS *DispatcherService) ThresholdSv1GetThresholdIDs(args *utils.TenantWithA
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if err = dS.authorize(utils.ThresholdSv1GetThresholdIDs,
tnt, args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
@@ -120,7 +120,7 @@ func (dS *DispatcherService) ThresholdSv1GetThreshold(args *utils.TenantIDWithAr
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
if dS.attrS != nil {
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if err = dS.authorize(utils.ThresholdSv1GetThreshold, tnt,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
return

View File

@@ -34,7 +34,7 @@ import (
// NewDispatcherService returns the Dispatcher Service
func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
server *utils.Server, attrsChan, internalChan chan rpcclient.ClientConnector) servmanager.Service {
server *utils.Server, internalChan chan rpcclient.ClientConnector, connMgr *engine.ConnManager) servmanager.Service {
return &DispatcherService{
connChan: internalChan,
cfg: cfg,
@@ -42,7 +42,7 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
attrsChan: attrsChan,
connMgr: connMgr,
}
}
@@ -54,7 +54,7 @@ type DispatcherService struct {
cacheS *engine.CacheS
filterSChan chan *engine.FilterS
server *utils.Server
attrsChan chan rpcclient.ClientConnector
connMgr *engine.ConnManager
dspS *dispatchers.DispatcherService
rpc *v1.DispatcherSv1
@@ -76,20 +76,7 @@ func (dspS *DispatcherService) Start() (err error) {
dspS.Lock()
defer dspS.Unlock()
var attrSConn *rpcclient.RPCPool
if len(dspS.cfg.DispatcherSCfg().AttributeSConns) != 0 { // AttributeS connection init
if attrSConn, err = engine.NewRPCPool(rpcclient.PoolFirst,
dspS.cfg.TlsCfg().ClientKey,
dspS.cfg.TlsCfg().ClientCerificate, dspS.cfg.TlsCfg().CaCertificate,
dspS.cfg.GeneralCfg().ConnectAttempts, dspS.cfg.GeneralCfg().Reconnects,
dspS.cfg.GeneralCfg().ConnectTimeout, dspS.cfg.GeneralCfg().ReplyTimeout,
dspS.cfg.DispatcherSCfg().AttributeSConns, dspS.attrsChan, false); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.DispatcherS, utils.AttributeS, err.Error()))
return
}
}
if dspS.dspS, err = dispatchers.NewDispatcherService(dspS.dm.GetDM(), dspS.cfg, fltrS, attrSConn); err != nil {
if dspS.dspS, err = dispatchers.NewDispatcherService(dspS.dm.GetDM(), dspS.cfg, fltrS, dspS.connMgr); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherS, err.Error()))
return
}

View File

@@ -54,7 +54,7 @@ func TestDispatcherSReload(t *testing.T) {
db := NewDataDBService(cfg)
attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1))
srv := NewDispatcherService(cfg, db, chS, filterSChan, server,
attrS.GetIntenternalChan(), make(chan rpcclient.ClientConnector, 1))
make(chan rpcclient.ClientConnector, 1), nil)
srvMngr.AddServices(NewConnManagerService(cfg, nil), attrS, srv,
NewLoaderService(cfg, db, filterSChan, server,
engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db)

View File

@@ -148,6 +148,15 @@ var (
// AccountableRequestTypes are the ones handled by Accounting subsystem
AccountableRequestTypes = NewStringSet([]string{META_PREPAID, META_POSTPAID, META_PSEUDOPREPAID})
CacheDataDBPartitions = NewStringSet([]string{CacheDestinations, CacheReverseDestinations,
CacheRatingPlans, CacheRatingProfiles, CacheActions,
CacheActionPlans, CacheAccountActionPlans, CacheActionTriggers, CacheSharedGroups, CacheResourceProfiles, CacheResources,
CacheTimings, CacheStatQueueProfiles, CacheStatQueues, CacheThresholdProfiles, CacheThresholds,
CacheFilters, CacheSupplierProfiles, CacheAttributeProfiles, CacheChargerProfiles,
CacheDispatcherProfiles, CacheDispatcherHosts, CacheResourceFilterIndexes, CacheStatFilterIndexes,
CacheThresholdFilterIndexes, CacheSupplierFilterIndexes, CacheAttributeFilterIndexes,
CacheChargerFilterIndexes, CacheDispatcherFilterIndexes, CacheLoadIDs, CacheAccounts})
)
const (