diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index f36384596..fc9cdc4fa 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -798,94 +798,6 @@ func startScheduler(internalSchedulerChan chan *scheduler.Scheduler, cacheDoneCh exitChan <- true // Should not get out of loop though } -// startSupplierService fires up the SupplierS -func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSChan, - internalAttrSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, - cacheS *engine.CacheS, cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server, - filterSChan chan *engine.FilterS, exitChan chan bool) { - var err error - filterS := <-filterSChan - filterSChan <- filterS - var attrSConn, resourceSConn, statSConn rpcclient.RpcClientConnection - - intAttrSChan := internalAttrSChan - intStatSChan := internalStatSChan - intRsChan := internalRsChan - if cfg.DispatcherSCfg().Enabled { // use dispatcher as internal chanel if active - intAttrSChan = internalDispatcherSChan - intStatSChan = internalDispatcherSChan - intRsChan = internalDispatcherSChan - } - if len(cfg.SupplierSCfg().AttributeSConns) != 0 { - attrSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.SupplierSCfg().AttributeSConns, intAttrSChan, false) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", - utils.SupplierS, utils.AttributeS, err.Error())) - exitChan <- true - return - } - } - if len(cfg.SupplierSCfg().StatSConns) != 0 { - statSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.SupplierSCfg().StatSConns, intStatSChan, false) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s", - utils.SupplierS, err.Error())) - exitChan <- true - return - } - } - if len(cfg.SupplierSCfg().ResourceSConns) != 0 { - resourceSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.SupplierSCfg().ResourceSConns, intRsChan, false) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s", - utils.SupplierS, err.Error())) - exitChan <- true - return - } - } - <-cacheS.GetPrecacheChannel(utils.CacheSupplierProfiles) - <-cacheS.GetPrecacheChannel(utils.CacheSupplierFilterIndexes) - - splS, err := engine.NewSupplierService(dm, cfg.GeneralCfg().DefaultTimezone, - filterS, cfg.SupplierSCfg().StringIndexedFields, - cfg.SupplierSCfg().PrefixIndexedFields, resourceSConn, statSConn, attrSConn) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", - utils.SupplierS, err.Error())) - exitChan <- true - return - } - go func() { - if err := splS.ListenAndServe(exitChan); err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Error: %s listening for packets", - utils.SupplierS, err.Error())) - } - splS.Shutdown() - exitChan <- true - return - }() - splV1 := v1.NewSupplierSv1(splS) - if !cfg.DispatcherSCfg().Enabled { - server.RpcRegister(splV1) - } - internalSupplierSChan <- splV1 -} - // startFilterService fires up the FilterS func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS, internalStatSChan, internalResourceSChan, internalRalSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, @@ -1471,12 +1383,14 @@ func main() { tS := services.NewThresholdService() stS := services.NewStatService() reS := services.NewResourceService() - srvManager.AddService(attrS, chrS, tS, stS, reS) + supS := services.NewSupplierService() + srvManager.AddService(attrS, chrS, tS, stS, reS, supS) internalAttributeSChan = attrS.GetIntenternalChan() internalChargerSChan = chrS.GetIntenternalChan() internalThresholdSChan = tS.GetIntenternalChan() internalStatSChan = stS.GetIntenternalChan() internalRsChan = reS.GetIntenternalChan() + internalSupplierSChan = supS.GetIntenternalChan() go srvManager.StartServices() initServiceManagerV1(internalServeManagerChan, srvManager, server) @@ -1586,11 +1500,6 @@ func main() { // Start FilterS go startFilterService(filterSChan, cacheS, internalStatSChan, internalRsChan, internalRaterChan, cfg, dm, exitChan) - if cfg.SupplierSCfg().Enabled { - go startSupplierService(internalSupplierSChan, internalRsChan, - internalStatSChan, internalAttributeSChan, internalDispatcherSChan, - cacheS, cfg, dm, server, filterSChan, exitChan) - } if cfg.DispatcherSCfg().Enabled { go startDispatcherService(internalDispatcherSChan, internalAttributeSChan, cfg, cacheS, filterSChan, diff --git a/config/config.go b/config/config.go index 7f5cb5833..c7657b381 100755 --- a/config/config.go +++ b/config/config.go @@ -622,14 +622,6 @@ func (self *CGRConfig) checkConfigSanity() error { } // SupplierS checks if self.supplierSCfg.Enabled && !self.dispatcherSCfg.Enabled { - for _, connCfg := range self.supplierSCfg.RALsConns { - if connCfg.Address != utils.MetaInternal { - return errors.New("Only <*internal> RALs connectivity allowed in SupplierS for now") - } - if !self.ralsCfg.RALsEnabled { - return errors.New("RALs not enabled but requested by SupplierS component.") - } - } if !self.resourceSCfg.Enabled { for _, connCfg := range self.supplierSCfg.ResourceSConns { if connCfg.Address == utils.MetaInternal { @@ -637,7 +629,7 @@ func (self *CGRConfig) checkConfigSanity() error { } } } - if !self.resourceSCfg.Enabled { + if !self.statsCfg.Enabled { for _, connCfg := range self.supplierSCfg.StatSConns { if connCfg.Address == utils.MetaInternal { return errors.New("StatS not enabled but requested by SupplierS component.") @@ -1183,7 +1175,10 @@ func (cfg *CGRConfig) ThresholdSCfg() *ThresholdSCfg { return cfg.thresholdSCfg } +// SupplierSCfg returns the config for SupplierS func (cfg *CGRConfig) SupplierSCfg() *SupplierSCfg { + cfg.lks[SupplierSJson].Lock() + defer cfg.lks[SupplierSJson].Unlock() return cfg.supplierSCfg } @@ -1589,6 +1584,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) { } fallthrough case SupplierSJson: + cfg.rldChans[SupplierSJson] <- struct{}{} if !fall { break } diff --git a/config/config_defaults.go b/config/config_defaults.go index edd98df52..89a646421 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -560,9 +560,6 @@ const CGRATES_CFG_JSON = ` //"string_indexed_fields": [], // query indexes based on these fields for faster processing "prefix_indexed_fields": [], // query indexes based on these fields for faster processing "attributes_conns": [], // connections to AttributeS for altering events before supplier queries: <""|*internal|127.0.0.1:2013> - "rals_conns": [ // connections to RALs for cost/accounting <*internal> - {"address": "*internal"}, - ], "resources_conns": [], // connections to ResourceS for *res sorting, empty to disable functionality: <""|*internal|x.y.z.y:1234> "stats_conns": [], // connections to StatS for *stats sorting, empty to disable stats functionality: <""|*internal|x.y.z.y:1234> "default_ratio":1 // default ratio used in case of *load strategy diff --git a/config/config_it_test.go b/config/config_it_test.go index 64f574ede..938bdcaaf 100644 --- a/config/config_it_test.go +++ b/config/config_it_test.go @@ -219,6 +219,35 @@ func TestCGRConfigReloadResourceS(t *testing.T) { } } +func TestCGRConfigReloadSupplierS(t *testing.T) { + cfg, err := NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + var reply string + if err = cfg.V1ReloadConfig(&ConfigReloadWithArgDispatcher{ + Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo2"), + Section: SupplierSJson, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expected OK received: %s", reply) + } + expAttr := &SupplierSCfg{ + Enabled: true, + StringIndexedFields: &[]string{"LCRProfile"}, + PrefixIndexedFields: &[]string{utils.Destination}, + ResourceSConns: []*RemoteHost{}, + StatSConns: []*RemoteHost{}, + AttributeSConns: []*RemoteHost{}, + IndexedSelects: true, + DefaultRatio: 1, + } + if !reflect.DeepEqual(expAttr, cfg.SupplierSCfg()) { + t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.SupplierSCfg())) + } +} + func TestCgrCfgV1ReloadConfigSection(t *testing.T) { for _, dir := range []string{"/tmp/ers/in", "/tmp/ers/out"} { if err := os.RemoveAll(dir); err != nil { diff --git a/config/config_json_test.go b/config/config_json_test.go index 484bcd0a3..b00e11b71 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -854,14 +854,9 @@ func TestDfSupplierSJsonCfg(t *testing.T) { String_indexed_fields: nil, Prefix_indexed_fields: &[]string{}, Attributes_conns: &[]*RemoteHostJson{}, - Rals_conns: &[]*RemoteHostJson{ - { - Address: utils.StringPointer("*internal"), - }, - }, - Resources_conns: &[]*RemoteHostJson{}, - Stats_conns: &[]*RemoteHostJson{}, - Default_ratio: utils.IntPointer(1), + Resources_conns: &[]*RemoteHostJson{}, + Stats_conns: &[]*RemoteHostJson{}, + Default_ratio: utils.IntPointer(1), } if cfg, err := dfCgrJsonCfg.SupplierSJsonCfg(); err != nil { t.Error(err) diff --git a/config/config_test.go b/config/config_test.go index 483afe5a8..ecde14170 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -856,12 +856,9 @@ func TestCgrCfgJSONDefaultSupplierSCfg(t *testing.T) { StringIndexedFields: nil, PrefixIndexedFields: &[]string{}, AttributeSConns: []*RemoteHost{}, - RALsConns: []*RemoteHost{ - {Address: "*internal"}, - }, - ResourceSConns: []*RemoteHost{}, - StatSConns: []*RemoteHost{}, - DefaultRatio: 1, + ResourceSConns: []*RemoteHost{}, + StatSConns: []*RemoteHost{}, + DefaultRatio: 1, } if !reflect.DeepEqual(eSupplSCfg, cgrCfg.supplierSCfg) { t.Errorf("received: %+v, expecting: %+v", eSupplSCfg, cgrCfg.supplierSCfg) diff --git a/config/libconfig_json.go b/config/libconfig_json.go index b9473152a..8411f3268 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -429,7 +429,6 @@ type SupplierSJsonCfg struct { String_indexed_fields *[]string Prefix_indexed_fields *[]string Attributes_conns *[]*RemoteHostJson - Rals_conns *[]*RemoteHostJson Resources_conns *[]*RemoteHostJson Stats_conns *[]*RemoteHostJson Default_ratio *int diff --git a/config/supplierscfg.go b/config/supplierscfg.go index 8375d8e5b..ae23d377e 100644 --- a/config/supplierscfg.go +++ b/config/supplierscfg.go @@ -25,7 +25,6 @@ type SupplierSCfg struct { StringIndexedFields *[]string PrefixIndexedFields *[]string AttributeSConns []*RemoteHost - RALsConns []*RemoteHost ResourceSConns []*RemoteHost StatSConns []*RemoteHost DefaultRatio int @@ -62,13 +61,6 @@ func (spl *SupplierSCfg) loadFromJsonCfg(jsnCfg *SupplierSJsonCfg) (err error) { spl.AttributeSConns[idx].loadFromJsonCfg(jsnHaCfg) } } - if jsnCfg.Rals_conns != nil { - spl.RALsConns = make([]*RemoteHost, len(*jsnCfg.Rals_conns)) - for idx, jsnHaCfg := range *jsnCfg.Rals_conns { - spl.RALsConns[idx] = NewDfltRemoteHost() - spl.RALsConns[idx].loadFromJsonCfg(jsnHaCfg) - } - } if jsnCfg.Resources_conns != nil { spl.ResourceSConns = make([]*RemoteHost, len(*jsnCfg.Resources_conns)) for idx, jsnHaCfg := range *jsnCfg.Resources_conns { diff --git a/config/supplierscfg_test.go b/config/supplierscfg_test.go index a8309d067..58b13751a 100644 --- a/config/supplierscfg_test.go +++ b/config/supplierscfg_test.go @@ -40,9 +40,6 @@ func TestSupplierSCfgloadFromJsonCfg(t *testing.T) { //"string_indexed_fields": [], // query indexes based on these fields for faster processing "prefix_indexed_fields": ["index1", "index2"], // query indexes based on these fields for faster processing "attributes_conns": [], // address where to reach the AttributeS <""|127.0.0.1:2013> - "rals_conns": [ - {"address": "*internal"}, // address where to reach the RALs for cost/accounting <*internal> - ], "resources_conns": [], // address where to reach the Resource service, empty to disable functionality: <""|*internal|x.y.z.y:1234> "stats_conns": [], // address where to reach the Stat service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234> "default_ratio":1, @@ -51,7 +48,6 @@ func TestSupplierSCfgloadFromJsonCfg(t *testing.T) { expected = SupplierSCfg{ PrefixIndexedFields: &[]string{"index1", "index2"}, AttributeSConns: []*RemoteHost{}, - RALsConns: []*RemoteHost{{Address: "*internal"}}, ResourceSConns: []*RemoteHost{}, StatSConns: []*RemoteHost{}, DefaultRatio: 1, diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index be9d7e3c9..b059ddb7d 100644 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -497,9 +497,6 @@ // //"string_indexed_fields": [], // query indexes based on these fields for faster processing // "prefix_indexed_fields": [], // query indexes based on these fields for faster processing // "attributes_conns": [], // connections to AttributeS for altering events before supplier queries: <""|*internal|127.0.0.1:2013> -// "rals_conns": [ // connections to RALs for cost/accounting <*internal> -// {"address": "*internal"}, -// ], // "resources_conns": [], // connections to ResourceS for *res sorting, empty to disable functionality: <""|*internal|x.y.z.y:1234> // "stats_conns": [], // connections to StatS for *stats sorting, empty to disable stats functionality: <""|*internal|x.y.z.y:1234> // }, diff --git a/data/tutorial_tests/asterisk_ari/cgrates/etc/cgrates/cgrates.json b/data/tutorial_tests/asterisk_ari/cgrates/etc/cgrates/cgrates.json index 6878cc3a6..f6c8c3c59 100644 --- a/data/tutorial_tests/asterisk_ari/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorial_tests/asterisk_ari/cgrates/etc/cgrates/cgrates.json @@ -129,9 +129,6 @@ "suppliers": { "enabled": true, - "rals_conns": [ - {"address": "*internal"}, - ], "resources_conns": [ {"address": "*internal"}, ], diff --git a/data/tutorial_tests/fs_evsock/cgrates/etc/cgrates/cgrates.json b/data/tutorial_tests/fs_evsock/cgrates/etc/cgrates/cgrates.json index c044b42bf..ce7eab8c6 100644 --- a/data/tutorial_tests/fs_evsock/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorial_tests/fs_evsock/cgrates/etc/cgrates/cgrates.json @@ -126,9 +126,6 @@ "suppliers": { "enabled": true, - "rals_conns": [ - {"address": "*internal"}, - ], "resources_conns": [ {"address": "*internal"}, ], diff --git a/data/tutorial_tests/kamevapi/cgrates/etc/cgrates/cgrates.json b/data/tutorial_tests/kamevapi/cgrates/etc/cgrates/cgrates.json index a9f955719..ff868a184 100644 --- a/data/tutorial_tests/kamevapi/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorial_tests/kamevapi/cgrates/etc/cgrates/cgrates.json @@ -125,9 +125,6 @@ "suppliers": { "enabled": true, - "rals_conns": [ - {"address": "*internal"}, - ], "resources_conns": [ {"address": "*internal"}, ], diff --git a/data/tutorial_tests/osips/cgrates/etc/cgrates/cgrates.json b/data/tutorial_tests/osips/cgrates/etc/cgrates/cgrates.json index fa3b522bb..d539ffbac 100644 --- a/data/tutorial_tests/osips/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorial_tests/osips/cgrates/etc/cgrates/cgrates.json @@ -122,9 +122,6 @@ "suppliers": { "enabled": true, - "rals_conns": [ - {"address": "*internal"} - ], "resources_conns": [ {"address": "*internal"} ], diff --git a/data/tutorials/asterisk_ari/cgrates/etc/cgrates/cgrates.json b/data/tutorials/asterisk_ari/cgrates/etc/cgrates/cgrates.json index 6878cc3a6..f6c8c3c59 100644 --- a/data/tutorials/asterisk_ari/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/asterisk_ari/cgrates/etc/cgrates/cgrates.json @@ -129,9 +129,6 @@ "suppliers": { "enabled": true, - "rals_conns": [ - {"address": "*internal"}, - ], "resources_conns": [ {"address": "*internal"}, ], diff --git a/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json b/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json index c044b42bf..ce7eab8c6 100644 --- a/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json @@ -126,9 +126,6 @@ "suppliers": { "enabled": true, - "rals_conns": [ - {"address": "*internal"}, - ], "resources_conns": [ {"address": "*internal"}, ], diff --git a/data/tutorials/kamevapi/cgrates/etc/cgrates/cgrates.json b/data/tutorials/kamevapi/cgrates/etc/cgrates/cgrates.json index f15bac3b7..e0875a738 100644 --- a/data/tutorials/kamevapi/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/kamevapi/cgrates/etc/cgrates/cgrates.json @@ -125,9 +125,6 @@ "suppliers": { "enabled": true, - "rals_conns": [ - {"address": "*internal"}, - ], "resources_conns": [ {"address": "*internal"}, ], diff --git a/data/tutorials/osips/cgrates/etc/cgrates/cgrates.json b/data/tutorials/osips/cgrates/etc/cgrates/cgrates.json index 15188fa12..b7719ecd5 100644 --- a/data/tutorials/osips/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/osips/cgrates/etc/cgrates/cgrates.json @@ -122,9 +122,6 @@ "suppliers": { "enabled": true, - "rals_conns": [ - {"address": "*internal"} - ], "resources_conns": [ {"address": "*internal"} ], diff --git a/data/tutorials/osips_native/cgrates/etc/cgrates/cgrates.json b/data/tutorials/osips_native/cgrates/etc/cgrates/cgrates.json index 71de649cf..0aa8fdbbc 100644 --- a/data/tutorials/osips_native/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/osips_native/cgrates/etc/cgrates/cgrates.json @@ -123,9 +123,6 @@ "suppliers": { "enabled": true, - "rals_conns": [ - {"address": "*internal"} - ], "resources_conns": [ {"address": "*internal"} ], diff --git a/engine/suppliers.go b/engine/suppliers.go index e32177d89..a875e2cb7 100644 --- a/engine/suppliers.go +++ b/engine/suppliers.go @@ -108,9 +108,9 @@ func (lps SupplierProfiles) Sort() { sort.Slice(lps, func(i, j int) bool { return lps[i].Weight > lps[j].Weight }) } -// NewLCRService initializes a LCRService -func NewSupplierService(dm *DataManager, timezone string, - filterS *FilterS, stringIndexedFields, prefixIndexedFields *[]string, resourceS, +// NewSupplierService initializes the Supplier Service +func NewSupplierService(dm *DataManager, + filterS *FilterS, cgrcfg *config.CGRConfig, resourceS, statS, attributeS rpcclient.RpcClientConnection) (spS *SupplierService, err error) { if attributeS != nil && reflect.ValueOf(attributeS).IsNil() { // fix nil value in interface attributeS = nil @@ -122,14 +122,13 @@ func NewSupplierService(dm *DataManager, timezone string, statS = nil } spS = &SupplierService{ - dm: dm, - timezone: timezone, - filterS: filterS, - attributeS: attributeS, - resourceS: resourceS, - statS: statS, - stringIndexedFields: stringIndexedFields, - prefixIndexedFields: prefixIndexedFields} + dm: dm, + filterS: filterS, + attributeS: attributeS, + resourceS: resourceS, + statS: statS, + cgrcfg: cgrcfg, + } if spS.sorter, err = NewSupplierSortDispatcher(spS); err != nil { return nil, err } @@ -138,15 +137,13 @@ func NewSupplierService(dm *DataManager, timezone string, // SupplierService is the service computing Supplier queries type SupplierService struct { - dm *DataManager - timezone string - filterS *FilterS - stringIndexedFields *[]string - prefixIndexedFields *[]string - attributeS, - resourceS, - statS rpcclient.RpcClientConnection - sorter SupplierSortDispatcher + dm *DataManager + filterS *FilterS + cgrcfg *config.CGRConfig + attributeS rpcclient.RpcClientConnection + resourceS rpcclient.RpcClientConnection + statS rpcclient.RpcClientConnection + sorter SupplierSortDispatcher } // ListenAndServe will initialize the service @@ -166,7 +163,7 @@ func (spS *SupplierService) Shutdown() error { // matchingSupplierProfilesForEvent returns ordered list of matching resources which are active by the time of the call func (spS *SupplierService) matchingSupplierProfilesForEvent(ev *utils.CGREvent, singleResult bool) (matchingSLP []*SupplierProfile, err error) { - sPrflIDs, err := MatchingItemIDsForEvent(ev.Event, spS.stringIndexedFields, spS.prefixIndexedFields, + sPrflIDs, err := MatchingItemIDsForEvent(ev.Event, spS.cgrcfg.SupplierSCfg().StringIndexedFields, spS.cgrcfg.SupplierSCfg().PrefixIndexedFields, spS.dm, utils.CacheSupplierFilterIndexes, ev.Tenant, spS.filterS.cfg.SupplierSCfg().IndexedSelects) if err != nil { return nil, err @@ -235,7 +232,7 @@ func (spS *SupplierService) costForEvent(ev *utils.CGREvent, return } var sTime time.Time - if sTime, err = ev.FieldAsTime(utils.SetupTime, spS.timezone); err != nil { + if sTime, err = ev.FieldAsTime(utils.SetupTime, spS.cgrcfg.GeneralCfg().DefaultTimezone); err != nil { return } var usage time.Duration diff --git a/engine/suppliers_test.go b/engine/suppliers_test.go index 0050fc026..ff38e5d7e 100644 --- a/engine/suppliers_test.go +++ b/engine/suppliers_test.go @@ -300,10 +300,11 @@ func TestSuppliersPopulateSupplierService(t *testing.T) { t.Errorf("Error: %+v", err) } - splService, err = NewSupplierService(dmSPP, - config.CgrConfig().GeneralCfg().DefaultTimezone, &FilterS{ - dm: dmSPP, - cfg: defaultCfg}, nil, nil, nil, nil, nil) + defaultCfg.SupplierSCfg().StringIndexedFields = nil + defaultCfg.SupplierSCfg().PrefixIndexedFields = nil + splService, err = NewSupplierService(dmSPP, &FilterS{ + dm: dmSPP, + cfg: defaultCfg}, defaultCfg, nil, nil, nil) if err != nil { t.Errorf("Error: %+v", err) } diff --git a/services/suppliers.go b/services/suppliers.go new file mode 100644 index 000000000..35a5e56ac --- /dev/null +++ b/services/suppliers.go @@ -0,0 +1,125 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + + v1 "github.com/cgrates/cgrates/apier/v1" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewSupplierService returns the Supplier Service +func NewSupplierService() servmanager.Service { + return &SupplierService{ + connChan: make(chan rpcclient.RpcClientConnection, 1), + } +} + +// SupplierService implements Service interface +type SupplierService struct { + splS *engine.SupplierService + rpc *v1.SupplierSv1 + connChan chan rpcclient.RpcClientConnection +} + +// Start should handle the sercive start +func (splS *SupplierService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { + if splS.IsRunning() { + return fmt.Errorf("service aleady running") + } + + if waitCache { + <-sp.GetCacheS().GetPrecacheChannel(utils.CacheSupplierProfiles) + <-sp.GetCacheS().GetPrecacheChannel(utils.CacheSupplierFilterIndexes) + } + var attrSConn, resourceSConn, statSConn rpcclient.RpcClientConnection + + attrSConn, err = sp.GetConnection(utils.AttributeS, sp.GetConfig().SupplierSCfg().AttributeSConns) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.SupplierS, utils.SupplierS, err.Error())) + return + } + statSConn, err = sp.GetConnection(utils.StatS, sp.GetConfig().SupplierSCfg().StatSConns) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s", + utils.SupplierS, err.Error())) + return + } + resourceSConn, err = sp.GetConnection(utils.ResourceS, sp.GetConfig().SupplierSCfg().ResourceSConns) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s", + utils.SupplierS, err.Error())) + return + } + splS.splS, err = engine.NewSupplierService(sp.GetDM(), sp.GetFilterS(), sp.GetConfig(), + resourceSConn, statSConn, attrSConn) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", + utils.SupplierS, err.Error())) + return + } + utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.SupplierS)) + splS.rpc = v1.NewSupplierSv1(splS.splS) + if !sp.GetConfig().DispatcherSCfg().Enabled { + sp.GetServer().RpcRegister(splS.rpc) + } + splS.connChan <- splS.rpc + return +} + +// GetIntenternalChan returns the internal connection chanel +func (splS *SupplierService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) { + return splS.connChan +} + +// Reload handles the change of config +func (splS *SupplierService) Reload(sp servmanager.ServiceProvider) (err error) { + return +} + +// Shutdown stops the service +func (splS *SupplierService) Shutdown() (err error) { + if err = splS.splS.Shutdown(); err != nil { + return + } + splS.splS = nil + splS.rpc = nil + <-splS.connChan + return +} + +// GetRPCInterface returns the interface to register for server +func (splS *SupplierService) GetRPCInterface() interface{} { + return splS.rpc +} + +// IsRunning returns if the service is running +func (splS *SupplierService) IsRunning() bool { + return splS != nil && splS.splS != nil +} + +// ServiceName returns the service name +func (splS *SupplierService) ServiceName() string { + return utils.SupplierS +} diff --git a/services/suppliers_it_test.go b/services/suppliers_it_test.go new file mode 100644 index 000000000..520d44821 --- /dev/null +++ b/services/suppliers_it_test.go @@ -0,0 +1,81 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package services + +import ( + "path" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" +) + +func TestSupplierSReload(t *testing.T) { + cfg, err := config.NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + cfg.StatSCfg().Enabled = true + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + engineShutdown := make(chan bool, 1) + chS := engine.NewCacheS(cfg, nil) + close(chS.GetPrecacheChannel(utils.CacheSupplierProfiles)) + close(chS.GetPrecacheChannel(utils.CacheSupplierFilterIndexes)) + close(chS.GetPrecacheChannel(utils.CacheStatQueueProfiles)) + close(chS.GetPrecacheChannel(utils.CacheStatQueues)) + close(chS.GetPrecacheChannel(utils.CacheStatFilterIndexes)) + server := utils.NewServer() + srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil, + chS /*cdrStorage*/, nil, + /*loadStorage*/ nil, filterSChan, + server, nil, engineShutdown) + supS := NewSupplierService() + srvMngr.AddService(supS, NewStatService()) + if err = srvMngr.StartServices(); err != nil { + t.Error(err) + } + if supS.IsRunning() { + t.Errorf("Expected service to be down") + } + var reply string + if err := cfg.V1ReloadConfig(&config.ConfigReloadWithArgDispatcher{ + Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongonew"), + Section: config.SupplierSJson, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expecting OK ,received %s", reply) + } + time.Sleep(10 * time.Millisecond) //need to switch to gorutine + if !supS.IsRunning() { + t.Errorf("Expected service to be running") + } + cfg.SupplierSCfg().Enabled = false + cfg.GetReloadChan(config.SupplierSJson) <- struct{}{} + time.Sleep(10 * time.Millisecond) + if supS.IsRunning() { + t.Errorf("Expected service to be down") + } + engineShutdown <- true +} diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index a77f39f55..ab254ef55 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -308,10 +308,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) { } if srvMngr.cfg.ThresholdSCfg().Enabled { go func() { - if chrS, has := srvMngr.subsystems[utils.ThresholdS]; !has { + if thrS, has := srvMngr.subsystems[utils.ThresholdS]; !has { utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ThresholdS)) srvMngr.engineShutdown <- true - } else if err = chrS.Start(srvMngr, true); err != nil { + } else if err = thrS.Start(srvMngr, true); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ThresholdS, err)) srvMngr.engineShutdown <- true } @@ -319,10 +319,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) { } if srvMngr.cfg.StatSCfg().Enabled { go func() { - if chrS, has := srvMngr.subsystems[utils.StatS]; !has { + if stS, has := srvMngr.subsystems[utils.StatS]; !has { utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.StatS)) srvMngr.engineShutdown <- true - } else if err = chrS.Start(srvMngr, true); err != nil { + } else if err = stS.Start(srvMngr, true); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.StatS, err)) srvMngr.engineShutdown <- true } @@ -330,15 +330,26 @@ func (srvMngr *ServiceManager) StartServices() (err error) { } if srvMngr.cfg.ResourceSCfg().Enabled { go func() { - if chrS, has := srvMngr.subsystems[utils.ResourceS]; !has { + if reS, has := srvMngr.subsystems[utils.ResourceS]; !has { utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ResourceS)) srvMngr.engineShutdown <- true - } else if err = chrS.Start(srvMngr, true); err != nil { + } else if err = reS.Start(srvMngr, true); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ResourceS, err)) srvMngr.engineShutdown <- true } }() } + if srvMngr.cfg.SupplierSCfg().Enabled { + go func() { + if supS, has := srvMngr.subsystems[utils.SupplierS]; !has { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.SupplierS)) + srvMngr.engineShutdown <- true + } else if err = supS.Start(srvMngr, true); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.SupplierS, err)) + srvMngr.engineShutdown <- true + } + }() + } // startServer() return } @@ -500,6 +511,34 @@ func (srvMngr *ServiceManager) handleReload() { return // stop if we encounter an error } } + case <-srvMngr.cfg.GetReloadChan(config.SupplierSJson): + tS, has := srvMngr.subsystems[utils.SupplierS] + if !has { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.SupplierS)) + srvMngr.engineShutdown <- true + return // stop if we encounter an error + } + if srvMngr.cfg.SupplierSCfg().Enabled { + if tS.IsRunning() { + if err = tS.Reload(srvMngr); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to reload <%s>", utils.ServiceManager, utils.SupplierS)) + srvMngr.engineShutdown <- true + return // stop if we encounter an error + } + } else { + if err = tS.Start(srvMngr, true); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.SupplierS)) + srvMngr.engineShutdown <- true + return // stop if we encounter an error + } + } + } else if tS.IsRunning() { + if err = tS.Shutdown(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to stop service <%s>", utils.ServiceManager, utils.SupplierS)) + srvMngr.engineShutdown <- true + return // stop if we encounter an error + } + } } // handle RPC server }