From cc14c1e25add9411f07f923b15b16c896fae3747 Mon Sep 17 00:00:00 2001 From: TeoV Date: Wed, 18 Dec 2019 07:25:46 -0500 Subject: [PATCH 1/3] Add Attribute connection from DispatcherS through ConnManager --- cmd/cgr-engine/cgr-engine.go | 2 +- config/config_json_test.go | 2 +- config/config_test.go | 2 +- config/configsanity.go | 11 ++++++ config/dispatchercfg.go | 16 ++++++--- config/libconfig_json.go | 2 +- .../dispatchers/diamagent/cgrates.json | 4 +-- .../dispatchers/dispatchers/cgrates.json | 4 +-- .../dispatchers/dispatchers_gob/cgrates.json | 4 +-- .../dispatchers_mongo/cgrates.json | 4 +-- .../dispatchers_mongo_gob/cgrates.json | 4 +-- dispatchers/attributes.go | 6 ++-- dispatchers/caches.go | 28 +++++++-------- dispatchers/cdrs.go | 16 ++++----- dispatchers/chargers.go | 6 ++-- dispatchers/config.go | 4 +-- dispatchers/core.go | 4 +-- dispatchers/dispatchers.go | 21 ++++++----- dispatchers/guardian.go | 6 ++-- dispatchers/rals.go | 4 +-- dispatchers/resources.go | 10 +++--- dispatchers/responder.go | 16 ++++----- dispatchers/scheduler.go | 4 +-- dispatchers/servicemanager.go | 8 ++--- dispatchers/sessions.go | 36 +++++++++---------- dispatchers/stats.go | 12 +++---- dispatchers/suppliers.go | 6 ++-- dispatchers/thresholds.go | 10 +++--- services/dispatchers.go | 21 +++-------- 29 files changed, 133 insertions(+), 140 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 71f105d78..af2283b4f 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -524,7 +524,7 @@ func main() { }) attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan) - dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan, internalDispatcherSChan) + dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalDispatcherSChan, connManager.GetConnMgr()) chrS := services.NewChargerService(cfg, dmService, cacheS, filterSChan, server, internalChargerSChan, connManager.GetConnMgr()) tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server, internalThresholdSChan) diff --git a/config/config_json_test.go b/config/config_json_test.go index ea57d8103..985b5b91b 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -1528,7 +1528,7 @@ func TestDfDispatcherSJsonCfg(t *testing.T) { Indexed_selects: utils.BoolPointer(true), String_indexed_fields: nil, Prefix_indexed_fields: &[]string{}, - Attributes_conns: &[]*RemoteHostJson{}, + Attributes_conns: &[]string{}, } if cfg, err := dfCgrJsonCfg.DispatcherSJsonCfg(); err != nil { t.Error(err) diff --git a/config/config_test.go b/config/config_test.go index 2c6fdf021..dea938365 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -1526,7 +1526,7 @@ func TestCgrCfgJSONDefaultDispatcherSCfg(t *testing.T) { IndexedSelects: true, StringIndexedFields: nil, PrefixIndexedFields: &[]string{}, - AttributeSConns: []*RemoteHost{}, + AttributeSConns: []string{}, } if !reflect.DeepEqual(cgrCfg.dispatcherSCfg, eDspSCfg) { t.Errorf("received: %+v, expecting: %+v", cgrCfg.dispatcherSCfg, eDspSCfg) diff --git a/config/configsanity.go b/config/configsanity.go index ebd3c9373..b3ee37b3d 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -481,5 +481,16 @@ func (cfg *CGRConfig) checkConfigSanity() error { return fmt.Errorf("<%s> Connection with id: <%s> not defined", utils.ApierV1, connID) } } + // Dispatcher sanity check + if cfg.dispatcherSCfg.Enabled { + for _, connID := range cfg.dispatcherSCfg.AttributeSConns { + if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.attributeSCfg.Enabled { + return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.AttributeS, utils.DispatcherS) + } + if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) { + return fmt.Errorf("<%s> Connection with id: <%s> not defined", utils.DispatcherS, connID) + } + } + } return nil } diff --git a/config/dispatchercfg.go b/config/dispatchercfg.go index 6d7961546..a38a7357b 100755 --- a/config/dispatchercfg.go +++ b/config/dispatchercfg.go @@ -18,13 +18,15 @@ along with this program. If not, see package config +import "github.com/cgrates/cgrates/utils" + // DispatcherSCfg is the configuration of dispatcher service type DispatcherSCfg struct { Enabled bool IndexedSelects bool StringIndexedFields *[]string PrefixIndexedFields *[]string - AttributeSConns []*RemoteHost + AttributeSConns []string } func (dps *DispatcherSCfg) loadFromJsonCfg(jsnCfg *DispatcherSJsonCfg) (err error) { @@ -52,10 +54,14 @@ func (dps *DispatcherSCfg) loadFromJsonCfg(jsnCfg *DispatcherSJsonCfg) (err erro dps.PrefixIndexedFields = &pif } if jsnCfg.Attributes_conns != nil { - dps.AttributeSConns = make([]*RemoteHost, len(*jsnCfg.Attributes_conns)) - for idx, jsnHaCfg := range *jsnCfg.Attributes_conns { - dps.AttributeSConns[idx] = NewDfltRemoteHost() - dps.AttributeSConns[idx].loadFromJsonCfg(jsnHaCfg) + dps.AttributeSConns = make([]string, len(*jsnCfg.Attributes_conns)) + for idx, connID := range *jsnCfg.Attributes_conns { + // if we have the connection internal we change the name so we can have internal rpc for each subsystem + if connID == utils.MetaInternal { + dps.AttributeSConns[idx] = utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes) + } else { + dps.AttributeSConns[idx] = connID + } } } return nil diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 84f9cc3ce..645237f29 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -501,7 +501,7 @@ type DispatcherSJsonCfg struct { Indexed_selects *bool String_indexed_fields *[]string Prefix_indexed_fields *[]string - Attributes_conns *[]*RemoteHostJson + Attributes_conns *[]string } type LoaderCfgJson struct { diff --git a/data/conf/samples/dispatchers/diamagent/cgrates.json b/data/conf/samples/dispatchers/diamagent/cgrates.json index e75ebd36e..b02125cfc 100644 --- a/data/conf/samples/dispatchers/diamagent/cgrates.json +++ b/data/conf/samples/dispatchers/diamagent/cgrates.json @@ -44,9 +44,7 @@ "dispatchers":{ "enabled": true, - "attributes_conns": [ - {"address": "*internal"} - ] + "attributes_conns": ["*internal"] }, diff --git a/data/conf/samples/dispatchers/dispatchers/cgrates.json b/data/conf/samples/dispatchers/dispatchers/cgrates.json index 8de2caf22..966bf48ec 100755 --- a/data/conf/samples/dispatchers/dispatchers/cgrates.json +++ b/data/conf/samples/dispatchers/dispatchers/cgrates.json @@ -62,9 +62,7 @@ "dispatchers":{ "enabled": true, - "attributes_conns": [ - {"address": "*internal"}, - ], + "attributes_conns": ["*internal"], }, "apier": { diff --git a/data/conf/samples/dispatchers/dispatchers_gob/cgrates.json b/data/conf/samples/dispatchers/dispatchers_gob/cgrates.json index b8e1c7de1..62ec78ed3 100755 --- a/data/conf/samples/dispatchers/dispatchers_gob/cgrates.json +++ b/data/conf/samples/dispatchers/dispatchers_gob/cgrates.json @@ -62,9 +62,7 @@ "dispatchers":{ "enabled": true, - "attributes_conns": [ - {"address": "*internal"}, - ], + "attributes_conns": ["*internal"], }, "apier": { diff --git a/data/conf/samples/dispatchers/dispatchers_mongo/cgrates.json b/data/conf/samples/dispatchers/dispatchers_mongo/cgrates.json index 3410051cb..e3c33f62f 100644 --- a/data/conf/samples/dispatchers/dispatchers_mongo/cgrates.json +++ b/data/conf/samples/dispatchers/dispatchers_mongo/cgrates.json @@ -72,9 +72,7 @@ "dispatchers":{ "enabled": true, - "attributes_conns": [ - {"address": "*internal"}, - ], + "attributes_conns": ["*internal"], }, diff --git a/data/conf/samples/dispatchers/dispatchers_mongo_gob/cgrates.json b/data/conf/samples/dispatchers/dispatchers_mongo_gob/cgrates.json index 2a28cb711..17ae40d87 100644 --- a/data/conf/samples/dispatchers/dispatchers_mongo_gob/cgrates.json +++ b/data/conf/samples/dispatchers/dispatchers_mongo_gob/cgrates.json @@ -72,9 +72,7 @@ "dispatchers":{ "enabled": true, - "attributes_conns": [ - {"address": "*internal"}, - ], + "attributes_conns": ["*internal"], }, diff --git a/dispatchers/attributes.go b/dispatchers/attributes.go index 08f2d75a1..ab925c7ae 100755 --- a/dispatchers/attributes.go +++ b/dispatchers/attributes.go @@ -30,7 +30,7 @@ func (dS *DispatcherService) AttributeSv1Ping(args *utils.CGREventWithArgDispatc args = utils.NewCGREventWithArgDispatcher() } args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -54,7 +54,7 @@ func (dS *DispatcherService) AttributeSv1GetAttributeForEvent(args *engine.AttrA if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString { tnt = args.CGREvent.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -77,7 +77,7 @@ func (dS *DispatcherService) AttributeSv1ProcessEvent(args *engine.AttrArgsProce if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString { tnt = args.CGREvent.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } diff --git a/dispatchers/caches.go b/dispatchers/caches.go index 77aca2f25..cb751ac22 100644 --- a/dispatchers/caches.go +++ b/dispatchers/caches.go @@ -32,7 +32,7 @@ func (dS *DispatcherService) CacheSv1Ping(args *utils.CGREventWithArgDispatcher, args = utils.NewCGREventWithArgDispatcher() } args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -56,7 +56,7 @@ func (dS *DispatcherService) CacheSv1GetItemIDs(args *utils.ArgsGetCacheItemIDsW if args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -80,7 +80,7 @@ func (dS *DispatcherService) CacheSv1HasItem(args *utils.ArgsGetCacheItemWithArg if args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -104,7 +104,7 @@ func (dS *DispatcherService) CacheSv1GetItemExpiryTime(args *utils.ArgsGetCacheI if args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -128,7 +128,7 @@ func (dS *DispatcherService) CacheSv1RemoveItem(args *utils.ArgsGetCacheItemWith if args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -152,7 +152,7 @@ func (dS *DispatcherService) CacheSv1Clear(args *utils.AttrCacheIDsWithArgDispat if args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -175,7 +175,7 @@ func (dS *DispatcherService) CacheSv1FlushCache(args utils.AttrReloadCacheWithAr if args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -199,7 +199,7 @@ func (dS *DispatcherService) CacheSv1GetCacheStats(args *utils.AttrCacheIDsWithA if args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -222,7 +222,7 @@ func (dS *DispatcherService) CacheSv1PrecacheStatus(args *utils.AttrCacheIDsWith if args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -246,7 +246,7 @@ func (dS *DispatcherService) CacheSv1HasGroup(args *utils.ArgsGetGroupWithArgDis if args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -270,7 +270,7 @@ func (dS *DispatcherService) CacheSv1GetGroupItemIDs(args *utils.ArgsGetGroupWit if args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -293,7 +293,7 @@ func (dS *DispatcherService) CacheSv1RemoveGroup(args *utils.ArgsGetGroupWithArg if args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -316,7 +316,7 @@ func (dS *DispatcherService) CacheSv1ReloadCache(args utils.AttrReloadCacheWithA if args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -339,7 +339,7 @@ func (dS *DispatcherService) CacheSv1LoadCache(args utils.AttrReloadCacheWithArg if args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } diff --git a/dispatchers/cdrs.go b/dispatchers/cdrs.go index 79e80d756..8b5ce4d42 100644 --- a/dispatchers/cdrs.go +++ b/dispatchers/cdrs.go @@ -35,7 +35,7 @@ func (dS *DispatcherService) CDRsV1Ping(args *utils.CGREventWithArgDispatcher, if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString { tnt = args.CGREvent.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -57,7 +57,7 @@ func (dS *DispatcherService) CDRsV1GetCDRs(args utils.RPCCDRsFilterWithArgDispat if args.TenantArg != nil && args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -79,7 +79,7 @@ func (dS *DispatcherService) CDRsV1GetCDRsCount(args *utils.RPCCDRsFilterWithArg if args.TenantArg != nil && args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -101,7 +101,7 @@ func (dS *DispatcherService) CDRsV1StoreSessionCost(args *engine.AttrCDRSStoreSM if args.TenantArg != nil && args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -123,7 +123,7 @@ func (dS *DispatcherService) CDRsV1RateCDRs(args *engine.ArgRateCDRs, reply *str if args.TenantArg != nil && args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -145,7 +145,7 @@ func (dS *DispatcherService) CDRsV1ProcessExternalCDR(args *engine.ExternalCDRWi if args.Tenant != utils.EmptyString { tnt = args.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -167,7 +167,7 @@ func (dS *DispatcherService) CDRsV1ProcessEvent(args *engine.ArgV1ProcessEvent, if args.CGREvent.Tenant != utils.EmptyString { tnt = args.CGREvent.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -189,7 +189,7 @@ func (dS *DispatcherService) CDRsV1ProcessCDR(args *engine.CDRWithArgDispatcher, if args.Tenant != utils.EmptyString { tnt = args.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } diff --git a/dispatchers/chargers.go b/dispatchers/chargers.go index 1a1e1d488..9c56f0d94 100755 --- a/dispatchers/chargers.go +++ b/dispatchers/chargers.go @@ -28,7 +28,7 @@ func (dS *DispatcherService) ChargerSv1Ping(args *utils.CGREventWithArgDispatche args = utils.NewCGREventWithArgDispatcher() } args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -51,7 +51,7 @@ func (dS *DispatcherService) ChargerSv1GetChargersForEvent(args *utils.CGREventW if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString { tnt = args.CGREvent.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -74,7 +74,7 @@ func (dS *DispatcherService) ChargerSv1ProcessEvent(args *utils.CGREventWithArgD if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString { tnt = args.CGREvent.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } diff --git a/dispatchers/config.go b/dispatchers/config.go index 192937148..f710f0421 100644 --- a/dispatchers/config.go +++ b/dispatchers/config.go @@ -30,7 +30,7 @@ func (dS *DispatcherService) ConfigSv1GetJSONSection(args *config.StringWithArgD if args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -52,7 +52,7 @@ func (dS *DispatcherService) ConfigSv1ReloadConfig(args *config.ConfigReloadWith if args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } diff --git a/dispatchers/core.go b/dispatchers/core.go index a3f1cd821..0fee1c3a9 100644 --- a/dispatchers/core.go +++ b/dispatchers/core.go @@ -30,7 +30,7 @@ func (dS *DispatcherService) CoreSv1Status(args *utils.TenantWithArgDispatcher, if args.TenantArg != nil && args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -52,7 +52,7 @@ func (dS *DispatcherService) CoreSv1Ping(args *utils.CGREventWithArgDispatcher, if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString { tnt = args.CGREvent.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go index 0fdfe6b27..fb57c5648 100755 --- a/dispatchers/dispatchers.go +++ b/dispatchers/dispatchers.go @@ -34,21 +34,19 @@ import ( // NewDispatcherService constructs a DispatcherService func NewDispatcherService(dm *engine.DataManager, cfg *config.CGRConfig, fltrS *engine.FilterS, - attrS *rpcclient.RPCPool) (*DispatcherService, error) { - if attrS != nil && reflect.ValueOf(attrS).IsNil() { - attrS = nil - } + connMgr *engine.ConnManager) (*DispatcherService, error) { + return &DispatcherService{dm: dm, cfg: cfg, - fltrS: fltrS, attrS: attrS}, nil + fltrS: fltrS, connMgr: connMgr}, nil } // DispatcherService is the service handling dispatching towards internal components // designed to handle automatic partitioning and failover type DispatcherService struct { - dm *engine.DataManager - cfg *config.CGRConfig - fltrS *engine.FilterS - attrS *rpcclient.RPCPool // used for API auth + dm *engine.DataManager + cfg *config.CGRConfig + fltrS *engine.FilterS + connMgr *engine.ConnManager } // ListenAndServe will initialize the service @@ -68,7 +66,8 @@ func (dS *DispatcherService) Shutdown() error { func (dS *DispatcherService) authorizeEvent(ev *utils.CGREvent, reply *engine.AttrSProcessEventReply) (err error) { - if err = dS.attrS.Call(utils.AttributeSv1ProcessEvent, + if err = dS.connMgr.Call(dS.cfg.DispatcherSCfg().AttributeSConns, nil, + utils.AttributeSv1ProcessEvent, &engine.AttrArgsProcessEvent{ Context: utils.StringPointer(utils.MetaAuth), CGREvent: ev}, reply); err != nil { @@ -225,7 +224,7 @@ func (dS *DispatcherService) V1Apier(apier interface{}, args *utils.MethodParame } tenant := utils.FirstNonEmpty(utils.IfaceAsString(parameters[utils.Tenant]), config.CgrConfig().GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if argD == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } diff --git a/dispatchers/guardian.go b/dispatchers/guardian.go index bd132a71b..2af93da1b 100644 --- a/dispatchers/guardian.go +++ b/dispatchers/guardian.go @@ -31,7 +31,7 @@ func (dS *DispatcherService) GuardianSv1Ping(args *utils.CGREventWithArgDispatch args = utils.NewCGREventWithArgDispatcher() } args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -56,7 +56,7 @@ func (dS *DispatcherService) GuardianSv1RemoteLock(args AttrRemoteLockWithApiKey if args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -80,7 +80,7 @@ func (dS *DispatcherService) GuardianSv1RemoteUnlock(args AttrRemoteUnlockWithAp if args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } diff --git a/dispatchers/rals.go b/dispatchers/rals.go index 3eecfbbee..553ea4183 100644 --- a/dispatchers/rals.go +++ b/dispatchers/rals.go @@ -27,7 +27,7 @@ func (dS *DispatcherService) RALsV1Ping(args *utils.CGREventWithArgDispatcher, r args = utils.NewCGREventWithArgDispatcher() } args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -46,7 +46,7 @@ func (dS *DispatcherService) RALsV1Ping(args *utils.CGREventWithArgDispatcher, r func (dS *DispatcherService) RALsV1GetRatingPlansCost(args *utils.RatingPlanCostArg, rpl *RatingPlanCost) (err error) { tenant := dS.cfg.GeneralCfg().DefaultTenant - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } diff --git a/dispatchers/resources.go b/dispatchers/resources.go index 1317a1b72..57a887809 100755 --- a/dispatchers/resources.go +++ b/dispatchers/resources.go @@ -28,7 +28,7 @@ func (dS *DispatcherService) ResourceSv1Ping(args *utils.CGREventWithArgDispatch args = utils.NewCGREventWithArgDispatcher() } args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -51,7 +51,7 @@ func (dS *DispatcherService) ResourceSv1GetResourcesForEvent(args utils.ArgRSv1R if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString { tnt = args.CGREvent.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -75,7 +75,7 @@ func (dS *DispatcherService) ResourceSv1AuthorizeResources(args utils.ArgRSv1Res if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString { tnt = args.CGREvent.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -99,7 +99,7 @@ func (dS *DispatcherService) ResourceSv1AllocateResources(args utils.ArgRSv1Reso if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString { tnt = args.CGREvent.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -123,7 +123,7 @@ func (dS *DispatcherService) ResourceSv1ReleaseResources(args utils.ArgRSv1Resou if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString { tnt = args.CGREvent.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } diff --git a/dispatchers/responder.go b/dispatchers/responder.go index f8848996c..057397b59 100644 --- a/dispatchers/responder.go +++ b/dispatchers/responder.go @@ -32,7 +32,7 @@ func (dS *DispatcherService) ResponderPing(args *utils.CGREventWithArgDispatcher args = utils.NewCGREventWithArgDispatcher() } args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -51,7 +51,7 @@ func (dS *DispatcherService) ResponderPing(args *utils.CGREventWithArgDispatcher func (dS *DispatcherService) ResponderGetCost(args *engine.CallDescriptorWithArgDispatcher, reply *engine.CallCost) (err error) { - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -70,7 +70,7 @@ func (dS *DispatcherService) ResponderGetCost(args *engine.CallDescriptorWithArg func (dS *DispatcherService) ResponderDebit(args *engine.CallDescriptorWithArgDispatcher, reply *engine.CallCost) (err error) { - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -89,7 +89,7 @@ func (dS *DispatcherService) ResponderDebit(args *engine.CallDescriptorWithArgDi func (dS *DispatcherService) ResponderMaxDebit(args *engine.CallDescriptorWithArgDispatcher, reply *engine.CallCost) (err error) { - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -108,7 +108,7 @@ func (dS *DispatcherService) ResponderMaxDebit(args *engine.CallDescriptorWithAr func (dS *DispatcherService) ResponderRefundIncrements(args *engine.CallDescriptorWithArgDispatcher, reply *engine.Account) (err error) { - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -127,7 +127,7 @@ func (dS *DispatcherService) ResponderRefundIncrements(args *engine.CallDescript func (dS *DispatcherService) ResponderRefundRounding(args *engine.CallDescriptorWithArgDispatcher, reply *float64) (err error) { - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -146,7 +146,7 @@ func (dS *DispatcherService) ResponderRefundRounding(args *engine.CallDescriptor func (dS *DispatcherService) ResponderGetMaxSessionTime(args *engine.CallDescriptorWithArgDispatcher, reply *time.Duration) (err error) { - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -166,7 +166,7 @@ func (dS *DispatcherService) ResponderGetMaxSessionTime(args *engine.CallDescrip func (dS *DispatcherService) ResponderShutdown(args *utils.TenantWithArgDispatcher, reply *string) (err error) { tnt := utils.FirstNonEmpty(args.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } diff --git a/dispatchers/scheduler.go b/dispatchers/scheduler.go index 5b9d90a4b..2410ae8eb 100644 --- a/dispatchers/scheduler.go +++ b/dispatchers/scheduler.go @@ -27,7 +27,7 @@ func (dS *DispatcherService) SchedulerSv1Ping(args *utils.CGREventWithArgDispatc args = utils.NewCGREventWithArgDispatcher() } args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -47,7 +47,7 @@ func (dS *DispatcherService) SchedulerSv1Ping(args *utils.CGREventWithArgDispatc func (dS *DispatcherService) SchedulerSv1Reload(args *utils.CGREventWithArgDispatcher, reply *string) (err error) { args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } diff --git a/dispatchers/servicemanager.go b/dispatchers/servicemanager.go index 01dd969f3..d48189b3f 100644 --- a/dispatchers/servicemanager.go +++ b/dispatchers/servicemanager.go @@ -31,7 +31,7 @@ func (dS *DispatcherService) ServiceManagerV1Ping(args *utils.CGREventWithArgDis args = utils.NewCGREventWithArgDispatcher() } args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -55,7 +55,7 @@ func (dS *DispatcherService) ServiceManagerV1StartService(args ArgStartServiceWi if args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -78,7 +78,7 @@ func (dS *DispatcherService) ServiceManagerV1StopService(args ArgStartServiceWit if args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -101,7 +101,7 @@ func (dS *DispatcherService) ServiceManagerV1ServiceStatus(args ArgStartServiceW if args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } diff --git a/dispatchers/sessions.go b/dispatchers/sessions.go index 93dc411fb..0326e6a5a 100755 --- a/dispatchers/sessions.go +++ b/dispatchers/sessions.go @@ -27,7 +27,7 @@ import ( func (dS *DispatcherService) SessionSv1Ping(args *utils.CGREventWithArgDispatcher, reply *string) (err error) { args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -48,7 +48,7 @@ func (dS *DispatcherService) SessionSv1Ping(args *utils.CGREventWithArgDispatche func (dS *DispatcherService) SessionSv1AuthorizeEvent(args *sessions.V1AuthorizeArgs, reply *sessions.V1AuthorizeReply) (err error) { args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -69,7 +69,7 @@ func (dS *DispatcherService) SessionSv1AuthorizeEvent(args *sessions.V1Authorize func (dS *DispatcherService) SessionSv1AuthorizeEventWithDigest(args *sessions.V1AuthorizeArgs, reply *sessions.V1AuthorizeReplyWithDigest) (err error) { args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -90,7 +90,7 @@ func (dS *DispatcherService) SessionSv1AuthorizeEventWithDigest(args *sessions.V func (dS *DispatcherService) SessionSv1InitiateSession(args *sessions.V1InitSessionArgs, reply *sessions.V1InitSessionReply) (err error) { args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -111,7 +111,7 @@ func (dS *DispatcherService) SessionSv1InitiateSession(args *sessions.V1InitSess func (dS *DispatcherService) SessionSv1InitiateSessionWithDigest(args *sessions.V1InitSessionArgs, reply *sessions.V1InitReplyWithDigest) (err error) { args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -132,7 +132,7 @@ func (dS *DispatcherService) SessionSv1InitiateSessionWithDigest(args *sessions. func (dS *DispatcherService) SessionSv1UpdateSession(args *sessions.V1UpdateSessionArgs, reply *sessions.V1UpdateSessionReply) (err error) { args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -156,7 +156,7 @@ func (dS *DispatcherService) SessionSv1SyncSessions(args *utils.TenantWithArgDis if args.TenantArg != nil && args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -176,7 +176,7 @@ func (dS *DispatcherService) SessionSv1SyncSessions(args *utils.TenantWithArgDis func (dS *DispatcherService) SessionSv1TerminateSession(args *sessions.V1TerminateSessionArgs, reply *string) (err error) { args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -197,7 +197,7 @@ func (dS *DispatcherService) SessionSv1TerminateSession(args *sessions.V1Termina func (dS *DispatcherService) SessionSv1ProcessCDR(args *utils.CGREventWithArgDispatcher, reply *string) (err error) { args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -218,7 +218,7 @@ func (dS *DispatcherService) SessionSv1ProcessCDR(args *utils.CGREventWithArgDis func (dS *DispatcherService) SessionSv1ProcessMessage(args *sessions.V1ProcessMessageArgs, reply *sessions.V1ProcessMessageReply) (err error) { args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -239,7 +239,7 @@ func (dS *DispatcherService) SessionSv1ProcessMessage(args *sessions.V1ProcessMe func (dS *DispatcherService) SessionSv1ProcessEvent(args *sessions.V1ProcessEventArgs, reply *sessions.V1ProcessEventReply) (err error) { args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -263,7 +263,7 @@ func (dS *DispatcherService) SessionSv1GetActiveSessions(args *utils.SessionFilt if args.Tenant != utils.EmptyString { tnt = args.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -286,7 +286,7 @@ func (dS *DispatcherService) SessionSv1GetActiveSessionsCount(args *utils.Sessio if args.Tenant != utils.EmptyString { tnt = args.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -309,7 +309,7 @@ func (dS *DispatcherService) SessionSv1ForceDisconnect(args *utils.SessionFilter if args.Tenant != utils.EmptyString { tnt = args.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -332,7 +332,7 @@ func (dS *DispatcherService) SessionSv1GetPassiveSessions(args *utils.SessionFil if args.Tenant != utils.EmptyString { tnt = args.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -355,7 +355,7 @@ func (dS *DispatcherService) SessionSv1GetPassiveSessionsCount(args *utils.Sessi if args.Tenant != utils.EmptyString { tnt = args.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -378,7 +378,7 @@ func (dS *DispatcherService) SessionSv1ReplicateSessions(args ArgsReplicateSessi if args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -401,7 +401,7 @@ func (dS *DispatcherService) SessionSv1SetPassiveSession(args *sessions.Session, if args.Tenant != utils.EmptyString { tnt = args.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } diff --git a/dispatchers/stats.go b/dispatchers/stats.go index dc0e1b4ce..9c1df6dd4 100755 --- a/dispatchers/stats.go +++ b/dispatchers/stats.go @@ -30,7 +30,7 @@ func (dS *DispatcherService) StatSv1Ping(args *utils.CGREventWithArgDispatcher, args = utils.NewCGREventWithArgDispatcher() } args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -51,7 +51,7 @@ func (dS *DispatcherService) StatSv1Ping(args *utils.CGREventWithArgDispatcher, func (dS *DispatcherService) StatSv1GetStatQueuesForEvent(args *engine.StatsArgsProcessEvent, reply *[]string) (err error) { args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -72,7 +72,7 @@ func (dS *DispatcherService) StatSv1GetStatQueuesForEvent(args *engine.StatsArgs func (dS *DispatcherService) StatSv1GetQueueStringMetrics(args *utils.TenantIDWithArgDispatcher, reply *map[string]string) (err error) { - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -96,7 +96,7 @@ func (dS *DispatcherService) StatSv1GetQueueStringMetrics(args *utils.TenantIDWi func (dS *DispatcherService) StatSv1ProcessEvent(args *engine.StatsArgsProcessEvent, reply *[]string) (err error) { args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -116,7 +116,7 @@ func (dS *DispatcherService) StatSv1ProcessEvent(args *engine.StatsArgsProcessEv func (dS *DispatcherService) StatSv1GetQueueFloatMetrics(args *utils.TenantIDWithArgDispatcher, reply *map[string]float64) (err error) { - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -143,7 +143,7 @@ func (dS *DispatcherService) StatSv1GetQueueIDs(args *utils.TenantWithArgDispatc if args.TenantArg != nil && args.TenantArg.Tenant != utils.EmptyString { tnt = args.TenantArg.Tenant } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } diff --git a/dispatchers/suppliers.go b/dispatchers/suppliers.go index 608ff37af..41e4e4c04 100755 --- a/dispatchers/suppliers.go +++ b/dispatchers/suppliers.go @@ -28,7 +28,7 @@ func (dS *DispatcherService) SupplierSv1Ping(args *utils.CGREventWithArgDispatch args = utils.NewCGREventWithArgDispatcher() } args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -49,7 +49,7 @@ func (dS *DispatcherService) SupplierSv1Ping(args *utils.CGREventWithArgDispatch func (dS *DispatcherService) SupplierSv1GetSuppliers(args *engine.ArgsGetSuppliers, reply *engine.SortedSuppliers) (err error) { args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } @@ -70,7 +70,7 @@ func (dS *DispatcherService) SupplierSv1GetSuppliers(args *engine.ArgsGetSupplie func (dS *DispatcherService) SupplierSv1GetSupplierProfilesForEvent(args *utils.CGREventWithArgDispatcher, reply *[]*engine.SupplierProfile) (err error) { args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } diff --git a/dispatchers/thresholds.go b/dispatchers/thresholds.go index 15635676f..495bdc5ad 100755 --- a/dispatchers/thresholds.go +++ b/dispatchers/thresholds.go @@ -33,7 +33,7 @@ func (dS *DispatcherService) ThresholdSv1Ping(args *utils.CGREventWithArgDispatc if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if err = dS.authorize(utils.ThresholdSv1Ping, args.CGREvent.Tenant, args.APIKey, args.CGREvent.Time); err != nil { @@ -54,7 +54,7 @@ func (dS *DispatcherService) ThresholdSv1GetThresholdsForEvent(args *engine.Args if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if err = dS.authorize(utils.ThresholdSv1GetThresholdsForEvent, args.CGREvent.Tenant, args.APIKey, args.CGREvent.Time); err != nil { @@ -75,7 +75,7 @@ func (dS *DispatcherService) ThresholdSv1ProcessEvent(args *engine.ArgsProcessEv if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if err = dS.authorize(utils.ThresholdSv1ProcessEvent, args.CGREvent.Tenant, args.APIKey, args.CGREvent.Time); err != nil { @@ -98,7 +98,7 @@ func (dS *DispatcherService) ThresholdSv1GetThresholdIDs(args *utils.TenantWithA if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if err = dS.authorize(utils.ThresholdSv1GetThresholdIDs, tnt, args.APIKey, utils.TimePointer(time.Now())); err != nil { return @@ -120,7 +120,7 @@ func (dS *DispatcherService) ThresholdSv1GetThreshold(args *utils.TenantIDWithAr if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) } - if dS.attrS != nil { + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if err = dS.authorize(utils.ThresholdSv1GetThreshold, tnt, args.APIKey, utils.TimePointer(time.Now())); err != nil { return diff --git a/services/dispatchers.go b/services/dispatchers.go index 4a364a541..41ddc8d2c 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -34,7 +34,7 @@ import ( // NewDispatcherService returns the Dispatcher Service func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, - server *utils.Server, attrsChan, internalChan chan rpcclient.ClientConnector) servmanager.Service { + server *utils.Server, internalChan chan rpcclient.ClientConnector, connMgr *engine.ConnManager) servmanager.Service { return &DispatcherService{ connChan: internalChan, cfg: cfg, @@ -42,7 +42,7 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, cacheS: cacheS, filterSChan: filterSChan, server: server, - attrsChan: attrsChan, + connMgr: connMgr, } } @@ -54,7 +54,7 @@ type DispatcherService struct { cacheS *engine.CacheS filterSChan chan *engine.FilterS server *utils.Server - attrsChan chan rpcclient.ClientConnector + connMgr *engine.ConnManager dspS *dispatchers.DispatcherService rpc *v1.DispatcherSv1 @@ -76,20 +76,7 @@ func (dspS *DispatcherService) Start() (err error) { dspS.Lock() defer dspS.Unlock() - var attrSConn *rpcclient.RPCPool - if len(dspS.cfg.DispatcherSCfg().AttributeSConns) != 0 { // AttributeS connection init - if attrSConn, err = engine.NewRPCPool(rpcclient.PoolFirst, - dspS.cfg.TlsCfg().ClientKey, - dspS.cfg.TlsCfg().ClientCerificate, dspS.cfg.TlsCfg().CaCertificate, - dspS.cfg.GeneralCfg().ConnectAttempts, dspS.cfg.GeneralCfg().Reconnects, - dspS.cfg.GeneralCfg().ConnectTimeout, dspS.cfg.GeneralCfg().ReplyTimeout, - dspS.cfg.DispatcherSCfg().AttributeSConns, dspS.attrsChan, false); err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", - utils.DispatcherS, utils.AttributeS, err.Error())) - return - } - } - if dspS.dspS, err = dispatchers.NewDispatcherService(dspS.dm.GetDM(), dspS.cfg, fltrS, attrSConn); err != nil { + if dspS.dspS, err = dispatchers.NewDispatcherService(dspS.dm.GetDM(), dspS.cfg, fltrS, dspS.connMgr); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherS, err.Error())) return } From ab5afecc4c98ab815e5a0607dfe98213561b53e4 Mon Sep 17 00:00:00 2001 From: TeoV Date: Wed, 18 Dec 2019 08:12:45 -0500 Subject: [PATCH 2/3] Update integration test from services --- services/dispatchers_it_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/dispatchers_it_test.go b/services/dispatchers_it_test.go index c4b02e28a..edbd8158d 100644 --- a/services/dispatchers_it_test.go +++ b/services/dispatchers_it_test.go @@ -54,7 +54,7 @@ func TestDispatcherSReload(t *testing.T) { db := NewDataDBService(cfg) attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1)) srv := NewDispatcherService(cfg, db, chS, filterSChan, server, - attrS.GetIntenternalChan(), make(chan rpcclient.ClientConnector, 1)) + make(chan rpcclient.ClientConnector, 1), nil) srvMngr.AddServices(NewConnManagerService(cfg, nil), attrS, srv, NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db) From 0be9fb5a0162334c2e719bca0dc580423dda9e1b Mon Sep 17 00:00:00 2001 From: TeoV Date: Wed, 18 Dec 2019 08:49:08 -0500 Subject: [PATCH 3/3] When checking Cache in case of *internal DataDB consider only DataDB cache partitions --- config/configsanity.go | 8 +------- config/configsanity_test.go | 32 +++++++++++--------------------- utils/consts.go | 9 +++++++++ 3 files changed, 21 insertions(+), 28 deletions(-) diff --git a/config/configsanity.go b/config/configsanity.go index b3ee37b3d..af5dc658c 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -436,13 +436,7 @@ func (cfg *CGRConfig) checkConfigSanity() error { // DataDB sanity checks if cfg.dataDbCfg.DataDbType == utils.INTERNAL { for key, config := range cfg.cacheCfg { - if key == utils.CacheDiameterMessages || key == utils.CacheClosedSessions || key == utils.CacheRPCConnections { - if config.Limit == 0 { - return fmt.Errorf("<%s> %s needs to be != 0 when DataBD is *internal, found 0.", utils.CacheS, key) - } - continue - } - if config.Limit != 0 { + if utils.CacheDataDBPartitions.Has(key) && config.Limit != 0 { return fmt.Errorf("<%s> %s needs to be 0 when DataBD is *internal, received : %d", utils.CacheS, key, config.Limit) } } diff --git a/config/configsanity_test.go b/config/configsanity_test.go index 46ba0a352..e02b06268 100644 --- a/config/configsanity_test.go +++ b/config/configsanity_test.go @@ -578,36 +578,26 @@ func TestConfigSanityStorDB(t *testing.T) { func TestConfigSanityDataDB(t *testing.T) { cfg, _ = NewDefaultCGRConfig() cfg.dataDbCfg.DataDbType = utils.INTERNAL - cfg.cacheCfg = CacheCfg{ - utils.CacheDiameterMessages: &CacheParamCfg{ - Limit: 0, - }, - } - expected := " *diameter_messages needs to be != 0 when DataBD is *internal, found 0." - if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected { - t.Errorf("Expecting: %+q received: %+q", expected, err) - } - - cfg.cacheCfg = CacheCfg{ - utils.CacheDiameterMessages: &CacheParamCfg{ - Limit: 1, - }, - } - if err := cfg.checkConfigSanity(); err != nil { - t.Errorf("Expecting: nil received: %+q", err) - } cfg.cacheCfg = CacheCfg{ "test": &CacheParamCfg{ Limit: 1, }, } - expected = " test needs to be 0 when DataBD is *internal, received : 1" + if err := cfg.checkConfigSanity(); err != nil { + t.Error(err) + } + + cfg.cacheCfg = CacheCfg{ + utils.CacheAccounts: &CacheParamCfg{ + Limit: 1, + }, + } + expected := " *accounts needs to be 0 when DataBD is *internal, received : 1" if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected { t.Errorf("Expecting: %+q received: %+q", expected, err) } - cfg.cacheCfg["test"].Limit = 0 - + cfg.cacheCfg[utils.CacheAccounts].Limit = 0 cfg.resourceSCfg.Enabled = true expected = " StoreInterval needs to be -1 when DataBD is *internal, received : 0" if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected { diff --git a/utils/consts.go b/utils/consts.go index bbba9507f..5318188a7 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -148,6 +148,15 @@ var ( // AccountableRequestTypes are the ones handled by Accounting subsystem AccountableRequestTypes = NewStringSet([]string{META_PREPAID, META_POSTPAID, META_PSEUDOPREPAID}) + + CacheDataDBPartitions = NewStringSet([]string{CacheDestinations, CacheReverseDestinations, + CacheRatingPlans, CacheRatingProfiles, CacheActions, + CacheActionPlans, CacheAccountActionPlans, CacheActionTriggers, CacheSharedGroups, CacheResourceProfiles, CacheResources, + CacheTimings, CacheStatQueueProfiles, CacheStatQueues, CacheThresholdProfiles, CacheThresholds, + CacheFilters, CacheSupplierProfiles, CacheAttributeProfiles, CacheChargerProfiles, + CacheDispatcherProfiles, CacheDispatcherHosts, CacheResourceFilterIndexes, CacheStatFilterIndexes, + CacheThresholdFilterIndexes, CacheSupplierFilterIndexes, CacheAttributeFilterIndexes, + CacheChargerFilterIndexes, CacheDispatcherFilterIndexes, CacheLoadIDs, CacheAccounts}) ) const (