mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Added SupplierS as service in ServiceManager
This commit is contained in:
committed by
Dan Christian Bogos
parent
9427fad424
commit
1ffdec31c2
@@ -798,94 +798,6 @@ func startScheduler(internalSchedulerChan chan *scheduler.Scheduler, cacheDoneCh
|
||||
exitChan <- true // Should not get out of loop though
|
||||
}
|
||||
|
||||
// startSupplierService fires up the SupplierS
|
||||
func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSChan,
|
||||
internalAttrSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection,
|
||||
cacheS *engine.CacheS, cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server,
|
||||
filterSChan chan *engine.FilterS, exitChan chan bool) {
|
||||
var err error
|
||||
filterS := <-filterSChan
|
||||
filterSChan <- filterS
|
||||
var attrSConn, resourceSConn, statSConn rpcclient.RpcClientConnection
|
||||
|
||||
intAttrSChan := internalAttrSChan
|
||||
intStatSChan := internalStatSChan
|
||||
intRsChan := internalRsChan
|
||||
if cfg.DispatcherSCfg().Enabled { // use dispatcher as internal chanel if active
|
||||
intAttrSChan = internalDispatcherSChan
|
||||
intStatSChan = internalDispatcherSChan
|
||||
intRsChan = internalDispatcherSChan
|
||||
}
|
||||
if len(cfg.SupplierSCfg().AttributeSConns) != 0 {
|
||||
attrSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
|
||||
cfg.TlsCfg().ClientKey,
|
||||
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
|
||||
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
|
||||
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
|
||||
cfg.SupplierSCfg().AttributeSConns, intAttrSChan, false)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
|
||||
utils.SupplierS, utils.AttributeS, err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
if len(cfg.SupplierSCfg().StatSConns) != 0 {
|
||||
statSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
|
||||
cfg.TlsCfg().ClientKey,
|
||||
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
|
||||
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
|
||||
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
|
||||
cfg.SupplierSCfg().StatSConns, intStatSChan, false)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s",
|
||||
utils.SupplierS, err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
if len(cfg.SupplierSCfg().ResourceSConns) != 0 {
|
||||
resourceSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
|
||||
cfg.TlsCfg().ClientKey,
|
||||
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
|
||||
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
|
||||
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
|
||||
cfg.SupplierSCfg().ResourceSConns, intRsChan, false)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s",
|
||||
utils.SupplierS, err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheSupplierProfiles)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheSupplierFilterIndexes)
|
||||
|
||||
splS, err := engine.NewSupplierService(dm, cfg.GeneralCfg().DefaultTimezone,
|
||||
filterS, cfg.SupplierSCfg().StringIndexedFields,
|
||||
cfg.SupplierSCfg().PrefixIndexedFields, resourceSConn, statSConn, attrSConn)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s",
|
||||
utils.SupplierS, err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
if err := splS.ListenAndServe(exitChan); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Error: %s listening for packets",
|
||||
utils.SupplierS, err.Error()))
|
||||
}
|
||||
splS.Shutdown()
|
||||
exitChan <- true
|
||||
return
|
||||
}()
|
||||
splV1 := v1.NewSupplierSv1(splS)
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(splV1)
|
||||
}
|
||||
internalSupplierSChan <- splV1
|
||||
}
|
||||
|
||||
// startFilterService fires up the FilterS
|
||||
func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS,
|
||||
internalStatSChan, internalResourceSChan, internalRalSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
|
||||
@@ -1471,12 +1383,14 @@ func main() {
|
||||
tS := services.NewThresholdService()
|
||||
stS := services.NewStatService()
|
||||
reS := services.NewResourceService()
|
||||
srvManager.AddService(attrS, chrS, tS, stS, reS)
|
||||
supS := services.NewSupplierService()
|
||||
srvManager.AddService(attrS, chrS, tS, stS, reS, supS)
|
||||
internalAttributeSChan = attrS.GetIntenternalChan()
|
||||
internalChargerSChan = chrS.GetIntenternalChan()
|
||||
internalThresholdSChan = tS.GetIntenternalChan()
|
||||
internalStatSChan = stS.GetIntenternalChan()
|
||||
internalRsChan = reS.GetIntenternalChan()
|
||||
internalSupplierSChan = supS.GetIntenternalChan()
|
||||
go srvManager.StartServices()
|
||||
|
||||
initServiceManagerV1(internalServeManagerChan, srvManager, server)
|
||||
@@ -1586,11 +1500,6 @@ func main() {
|
||||
// Start FilterS
|
||||
go startFilterService(filterSChan, cacheS, internalStatSChan, internalRsChan, internalRaterChan, cfg, dm, exitChan)
|
||||
|
||||
if cfg.SupplierSCfg().Enabled {
|
||||
go startSupplierService(internalSupplierSChan, internalRsChan,
|
||||
internalStatSChan, internalAttributeSChan, internalDispatcherSChan,
|
||||
cacheS, cfg, dm, server, filterSChan, exitChan)
|
||||
}
|
||||
if cfg.DispatcherSCfg().Enabled {
|
||||
go startDispatcherService(internalDispatcherSChan,
|
||||
internalAttributeSChan, cfg, cacheS, filterSChan,
|
||||
|
||||
@@ -622,14 +622,6 @@ func (self *CGRConfig) checkConfigSanity() error {
|
||||
}
|
||||
// SupplierS checks
|
||||
if self.supplierSCfg.Enabled && !self.dispatcherSCfg.Enabled {
|
||||
for _, connCfg := range self.supplierSCfg.RALsConns {
|
||||
if connCfg.Address != utils.MetaInternal {
|
||||
return errors.New("Only <*internal> RALs connectivity allowed in SupplierS for now")
|
||||
}
|
||||
if !self.ralsCfg.RALsEnabled {
|
||||
return errors.New("RALs not enabled but requested by SupplierS component.")
|
||||
}
|
||||
}
|
||||
if !self.resourceSCfg.Enabled {
|
||||
for _, connCfg := range self.supplierSCfg.ResourceSConns {
|
||||
if connCfg.Address == utils.MetaInternal {
|
||||
@@ -637,7 +629,7 @@ func (self *CGRConfig) checkConfigSanity() error {
|
||||
}
|
||||
}
|
||||
}
|
||||
if !self.resourceSCfg.Enabled {
|
||||
if !self.statsCfg.Enabled {
|
||||
for _, connCfg := range self.supplierSCfg.StatSConns {
|
||||
if connCfg.Address == utils.MetaInternal {
|
||||
return errors.New("StatS not enabled but requested by SupplierS component.")
|
||||
@@ -1183,7 +1175,10 @@ func (cfg *CGRConfig) ThresholdSCfg() *ThresholdSCfg {
|
||||
return cfg.thresholdSCfg
|
||||
}
|
||||
|
||||
// SupplierSCfg returns the config for SupplierS
|
||||
func (cfg *CGRConfig) SupplierSCfg() *SupplierSCfg {
|
||||
cfg.lks[SupplierSJson].Lock()
|
||||
defer cfg.lks[SupplierSJson].Unlock()
|
||||
return cfg.supplierSCfg
|
||||
}
|
||||
|
||||
@@ -1589,6 +1584,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) {
|
||||
}
|
||||
fallthrough
|
||||
case SupplierSJson:
|
||||
cfg.rldChans[SupplierSJson] <- struct{}{}
|
||||
if !fall {
|
||||
break
|
||||
}
|
||||
|
||||
@@ -560,9 +560,6 @@ const CGRATES_CFG_JSON = `
|
||||
//"string_indexed_fields": [], // query indexes based on these fields for faster processing
|
||||
"prefix_indexed_fields": [], // query indexes based on these fields for faster processing
|
||||
"attributes_conns": [], // connections to AttributeS for altering events before supplier queries: <""|*internal|127.0.0.1:2013>
|
||||
"rals_conns": [ // connections to RALs for cost/accounting <*internal>
|
||||
{"address": "*internal"},
|
||||
],
|
||||
"resources_conns": [], // connections to ResourceS for *res sorting, empty to disable functionality: <""|*internal|x.y.z.y:1234>
|
||||
"stats_conns": [], // connections to StatS for *stats sorting, empty to disable stats functionality: <""|*internal|x.y.z.y:1234>
|
||||
"default_ratio":1 // default ratio used in case of *load strategy
|
||||
|
||||
@@ -219,6 +219,35 @@ func TestCGRConfigReloadResourceS(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCGRConfigReloadSupplierS(t *testing.T) {
|
||||
cfg, err := NewDefaultCGRConfig()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var reply string
|
||||
if err = cfg.V1ReloadConfig(&ConfigReloadWithArgDispatcher{
|
||||
Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo2"),
|
||||
Section: SupplierSJson,
|
||||
}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("Expected OK received: %s", reply)
|
||||
}
|
||||
expAttr := &SupplierSCfg{
|
||||
Enabled: true,
|
||||
StringIndexedFields: &[]string{"LCRProfile"},
|
||||
PrefixIndexedFields: &[]string{utils.Destination},
|
||||
ResourceSConns: []*RemoteHost{},
|
||||
StatSConns: []*RemoteHost{},
|
||||
AttributeSConns: []*RemoteHost{},
|
||||
IndexedSelects: true,
|
||||
DefaultRatio: 1,
|
||||
}
|
||||
if !reflect.DeepEqual(expAttr, cfg.SupplierSCfg()) {
|
||||
t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.SupplierSCfg()))
|
||||
}
|
||||
}
|
||||
|
||||
func TestCgrCfgV1ReloadConfigSection(t *testing.T) {
|
||||
for _, dir := range []string{"/tmp/ers/in", "/tmp/ers/out"} {
|
||||
if err := os.RemoveAll(dir); err != nil {
|
||||
|
||||
@@ -854,14 +854,9 @@ func TestDfSupplierSJsonCfg(t *testing.T) {
|
||||
String_indexed_fields: nil,
|
||||
Prefix_indexed_fields: &[]string{},
|
||||
Attributes_conns: &[]*RemoteHostJson{},
|
||||
Rals_conns: &[]*RemoteHostJson{
|
||||
{
|
||||
Address: utils.StringPointer("*internal"),
|
||||
},
|
||||
},
|
||||
Resources_conns: &[]*RemoteHostJson{},
|
||||
Stats_conns: &[]*RemoteHostJson{},
|
||||
Default_ratio: utils.IntPointer(1),
|
||||
Resources_conns: &[]*RemoteHostJson{},
|
||||
Stats_conns: &[]*RemoteHostJson{},
|
||||
Default_ratio: utils.IntPointer(1),
|
||||
}
|
||||
if cfg, err := dfCgrJsonCfg.SupplierSJsonCfg(); err != nil {
|
||||
t.Error(err)
|
||||
|
||||
@@ -856,12 +856,9 @@ func TestCgrCfgJSONDefaultSupplierSCfg(t *testing.T) {
|
||||
StringIndexedFields: nil,
|
||||
PrefixIndexedFields: &[]string{},
|
||||
AttributeSConns: []*RemoteHost{},
|
||||
RALsConns: []*RemoteHost{
|
||||
{Address: "*internal"},
|
||||
},
|
||||
ResourceSConns: []*RemoteHost{},
|
||||
StatSConns: []*RemoteHost{},
|
||||
DefaultRatio: 1,
|
||||
ResourceSConns: []*RemoteHost{},
|
||||
StatSConns: []*RemoteHost{},
|
||||
DefaultRatio: 1,
|
||||
}
|
||||
if !reflect.DeepEqual(eSupplSCfg, cgrCfg.supplierSCfg) {
|
||||
t.Errorf("received: %+v, expecting: %+v", eSupplSCfg, cgrCfg.supplierSCfg)
|
||||
|
||||
@@ -429,7 +429,6 @@ type SupplierSJsonCfg struct {
|
||||
String_indexed_fields *[]string
|
||||
Prefix_indexed_fields *[]string
|
||||
Attributes_conns *[]*RemoteHostJson
|
||||
Rals_conns *[]*RemoteHostJson
|
||||
Resources_conns *[]*RemoteHostJson
|
||||
Stats_conns *[]*RemoteHostJson
|
||||
Default_ratio *int
|
||||
|
||||
@@ -25,7 +25,6 @@ type SupplierSCfg struct {
|
||||
StringIndexedFields *[]string
|
||||
PrefixIndexedFields *[]string
|
||||
AttributeSConns []*RemoteHost
|
||||
RALsConns []*RemoteHost
|
||||
ResourceSConns []*RemoteHost
|
||||
StatSConns []*RemoteHost
|
||||
DefaultRatio int
|
||||
@@ -62,13 +61,6 @@ func (spl *SupplierSCfg) loadFromJsonCfg(jsnCfg *SupplierSJsonCfg) (err error) {
|
||||
spl.AttributeSConns[idx].loadFromJsonCfg(jsnHaCfg)
|
||||
}
|
||||
}
|
||||
if jsnCfg.Rals_conns != nil {
|
||||
spl.RALsConns = make([]*RemoteHost, len(*jsnCfg.Rals_conns))
|
||||
for idx, jsnHaCfg := range *jsnCfg.Rals_conns {
|
||||
spl.RALsConns[idx] = NewDfltRemoteHost()
|
||||
spl.RALsConns[idx].loadFromJsonCfg(jsnHaCfg)
|
||||
}
|
||||
}
|
||||
if jsnCfg.Resources_conns != nil {
|
||||
spl.ResourceSConns = make([]*RemoteHost, len(*jsnCfg.Resources_conns))
|
||||
for idx, jsnHaCfg := range *jsnCfg.Resources_conns {
|
||||
|
||||
@@ -40,9 +40,6 @@ func TestSupplierSCfgloadFromJsonCfg(t *testing.T) {
|
||||
//"string_indexed_fields": [], // query indexes based on these fields for faster processing
|
||||
"prefix_indexed_fields": ["index1", "index2"], // query indexes based on these fields for faster processing
|
||||
"attributes_conns": [], // address where to reach the AttributeS <""|127.0.0.1:2013>
|
||||
"rals_conns": [
|
||||
{"address": "*internal"}, // address where to reach the RALs for cost/accounting <*internal>
|
||||
],
|
||||
"resources_conns": [], // address where to reach the Resource service, empty to disable functionality: <""|*internal|x.y.z.y:1234>
|
||||
"stats_conns": [], // address where to reach the Stat service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234>
|
||||
"default_ratio":1,
|
||||
@@ -51,7 +48,6 @@ func TestSupplierSCfgloadFromJsonCfg(t *testing.T) {
|
||||
expected = SupplierSCfg{
|
||||
PrefixIndexedFields: &[]string{"index1", "index2"},
|
||||
AttributeSConns: []*RemoteHost{},
|
||||
RALsConns: []*RemoteHost{{Address: "*internal"}},
|
||||
ResourceSConns: []*RemoteHost{},
|
||||
StatSConns: []*RemoteHost{},
|
||||
DefaultRatio: 1,
|
||||
|
||||
@@ -497,9 +497,6 @@
|
||||
// //"string_indexed_fields": [], // query indexes based on these fields for faster processing
|
||||
// "prefix_indexed_fields": [], // query indexes based on these fields for faster processing
|
||||
// "attributes_conns": [], // connections to AttributeS for altering events before supplier queries: <""|*internal|127.0.0.1:2013>
|
||||
// "rals_conns": [ // connections to RALs for cost/accounting <*internal>
|
||||
// {"address": "*internal"},
|
||||
// ],
|
||||
// "resources_conns": [], // connections to ResourceS for *res sorting, empty to disable functionality: <""|*internal|x.y.z.y:1234>
|
||||
// "stats_conns": [], // connections to StatS for *stats sorting, empty to disable stats functionality: <""|*internal|x.y.z.y:1234>
|
||||
// },
|
||||
|
||||
@@ -129,9 +129,6 @@
|
||||
|
||||
"suppliers": {
|
||||
"enabled": true,
|
||||
"rals_conns": [
|
||||
{"address": "*internal"},
|
||||
],
|
||||
"resources_conns": [
|
||||
{"address": "*internal"},
|
||||
],
|
||||
|
||||
@@ -126,9 +126,6 @@
|
||||
|
||||
"suppliers": {
|
||||
"enabled": true,
|
||||
"rals_conns": [
|
||||
{"address": "*internal"},
|
||||
],
|
||||
"resources_conns": [
|
||||
{"address": "*internal"},
|
||||
],
|
||||
|
||||
@@ -125,9 +125,6 @@
|
||||
|
||||
"suppliers": {
|
||||
"enabled": true,
|
||||
"rals_conns": [
|
||||
{"address": "*internal"},
|
||||
],
|
||||
"resources_conns": [
|
||||
{"address": "*internal"},
|
||||
],
|
||||
|
||||
@@ -122,9 +122,6 @@
|
||||
|
||||
"suppliers": {
|
||||
"enabled": true,
|
||||
"rals_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"resources_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
|
||||
@@ -129,9 +129,6 @@
|
||||
|
||||
"suppliers": {
|
||||
"enabled": true,
|
||||
"rals_conns": [
|
||||
{"address": "*internal"},
|
||||
],
|
||||
"resources_conns": [
|
||||
{"address": "*internal"},
|
||||
],
|
||||
|
||||
@@ -126,9 +126,6 @@
|
||||
|
||||
"suppliers": {
|
||||
"enabled": true,
|
||||
"rals_conns": [
|
||||
{"address": "*internal"},
|
||||
],
|
||||
"resources_conns": [
|
||||
{"address": "*internal"},
|
||||
],
|
||||
|
||||
@@ -125,9 +125,6 @@
|
||||
|
||||
"suppliers": {
|
||||
"enabled": true,
|
||||
"rals_conns": [
|
||||
{"address": "*internal"},
|
||||
],
|
||||
"resources_conns": [
|
||||
{"address": "*internal"},
|
||||
],
|
||||
|
||||
@@ -122,9 +122,6 @@
|
||||
|
||||
"suppliers": {
|
||||
"enabled": true,
|
||||
"rals_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"resources_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
|
||||
@@ -123,9 +123,6 @@
|
||||
|
||||
"suppliers": {
|
||||
"enabled": true,
|
||||
"rals_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"resources_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
|
||||
@@ -108,9 +108,9 @@ func (lps SupplierProfiles) Sort() {
|
||||
sort.Slice(lps, func(i, j int) bool { return lps[i].Weight > lps[j].Weight })
|
||||
}
|
||||
|
||||
// NewLCRService initializes a LCRService
|
||||
func NewSupplierService(dm *DataManager, timezone string,
|
||||
filterS *FilterS, stringIndexedFields, prefixIndexedFields *[]string, resourceS,
|
||||
// NewSupplierService initializes the Supplier Service
|
||||
func NewSupplierService(dm *DataManager,
|
||||
filterS *FilterS, cgrcfg *config.CGRConfig, resourceS,
|
||||
statS, attributeS rpcclient.RpcClientConnection) (spS *SupplierService, err error) {
|
||||
if attributeS != nil && reflect.ValueOf(attributeS).IsNil() { // fix nil value in interface
|
||||
attributeS = nil
|
||||
@@ -122,14 +122,13 @@ func NewSupplierService(dm *DataManager, timezone string,
|
||||
statS = nil
|
||||
}
|
||||
spS = &SupplierService{
|
||||
dm: dm,
|
||||
timezone: timezone,
|
||||
filterS: filterS,
|
||||
attributeS: attributeS,
|
||||
resourceS: resourceS,
|
||||
statS: statS,
|
||||
stringIndexedFields: stringIndexedFields,
|
||||
prefixIndexedFields: prefixIndexedFields}
|
||||
dm: dm,
|
||||
filterS: filterS,
|
||||
attributeS: attributeS,
|
||||
resourceS: resourceS,
|
||||
statS: statS,
|
||||
cgrcfg: cgrcfg,
|
||||
}
|
||||
if spS.sorter, err = NewSupplierSortDispatcher(spS); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -138,15 +137,13 @@ func NewSupplierService(dm *DataManager, timezone string,
|
||||
|
||||
// SupplierService is the service computing Supplier queries
|
||||
type SupplierService struct {
|
||||
dm *DataManager
|
||||
timezone string
|
||||
filterS *FilterS
|
||||
stringIndexedFields *[]string
|
||||
prefixIndexedFields *[]string
|
||||
attributeS,
|
||||
resourceS,
|
||||
statS rpcclient.RpcClientConnection
|
||||
sorter SupplierSortDispatcher
|
||||
dm *DataManager
|
||||
filterS *FilterS
|
||||
cgrcfg *config.CGRConfig
|
||||
attributeS rpcclient.RpcClientConnection
|
||||
resourceS rpcclient.RpcClientConnection
|
||||
statS rpcclient.RpcClientConnection
|
||||
sorter SupplierSortDispatcher
|
||||
}
|
||||
|
||||
// ListenAndServe will initialize the service
|
||||
@@ -166,7 +163,7 @@ func (spS *SupplierService) Shutdown() error {
|
||||
|
||||
// matchingSupplierProfilesForEvent returns ordered list of matching resources which are active by the time of the call
|
||||
func (spS *SupplierService) matchingSupplierProfilesForEvent(ev *utils.CGREvent, singleResult bool) (matchingSLP []*SupplierProfile, err error) {
|
||||
sPrflIDs, err := MatchingItemIDsForEvent(ev.Event, spS.stringIndexedFields, spS.prefixIndexedFields,
|
||||
sPrflIDs, err := MatchingItemIDsForEvent(ev.Event, spS.cgrcfg.SupplierSCfg().StringIndexedFields, spS.cgrcfg.SupplierSCfg().PrefixIndexedFields,
|
||||
spS.dm, utils.CacheSupplierFilterIndexes, ev.Tenant, spS.filterS.cfg.SupplierSCfg().IndexedSelects)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -235,7 +232,7 @@ func (spS *SupplierService) costForEvent(ev *utils.CGREvent,
|
||||
return
|
||||
}
|
||||
var sTime time.Time
|
||||
if sTime, err = ev.FieldAsTime(utils.SetupTime, spS.timezone); err != nil {
|
||||
if sTime, err = ev.FieldAsTime(utils.SetupTime, spS.cgrcfg.GeneralCfg().DefaultTimezone); err != nil {
|
||||
return
|
||||
}
|
||||
var usage time.Duration
|
||||
|
||||
@@ -300,10 +300,11 @@ func TestSuppliersPopulateSupplierService(t *testing.T) {
|
||||
t.Errorf("Error: %+v", err)
|
||||
}
|
||||
|
||||
splService, err = NewSupplierService(dmSPP,
|
||||
config.CgrConfig().GeneralCfg().DefaultTimezone, &FilterS{
|
||||
dm: dmSPP,
|
||||
cfg: defaultCfg}, nil, nil, nil, nil, nil)
|
||||
defaultCfg.SupplierSCfg().StringIndexedFields = nil
|
||||
defaultCfg.SupplierSCfg().PrefixIndexedFields = nil
|
||||
splService, err = NewSupplierService(dmSPP, &FilterS{
|
||||
dm: dmSPP,
|
||||
cfg: defaultCfg}, defaultCfg, nil, nil, nil)
|
||||
if err != nil {
|
||||
t.Errorf("Error: %+v", err)
|
||||
}
|
||||
|
||||
125
services/suppliers.go
Normal file
125
services/suppliers.go
Normal file
@@ -0,0 +1,125 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package services
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
v1 "github.com/cgrates/cgrates/apier/v1"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
// NewSupplierService returns the Supplier Service
|
||||
func NewSupplierService() servmanager.Service {
|
||||
return &SupplierService{
|
||||
connChan: make(chan rpcclient.RpcClientConnection, 1),
|
||||
}
|
||||
}
|
||||
|
||||
// SupplierService implements Service interface
|
||||
type SupplierService struct {
|
||||
splS *engine.SupplierService
|
||||
rpc *v1.SupplierSv1
|
||||
connChan chan rpcclient.RpcClientConnection
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (splS *SupplierService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) {
|
||||
if splS.IsRunning() {
|
||||
return fmt.Errorf("service aleady running")
|
||||
}
|
||||
|
||||
if waitCache {
|
||||
<-sp.GetCacheS().GetPrecacheChannel(utils.CacheSupplierProfiles)
|
||||
<-sp.GetCacheS().GetPrecacheChannel(utils.CacheSupplierFilterIndexes)
|
||||
}
|
||||
var attrSConn, resourceSConn, statSConn rpcclient.RpcClientConnection
|
||||
|
||||
attrSConn, err = sp.GetConnection(utils.AttributeS, sp.GetConfig().SupplierSCfg().AttributeSConns)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
|
||||
utils.SupplierS, utils.SupplierS, err.Error()))
|
||||
return
|
||||
}
|
||||
statSConn, err = sp.GetConnection(utils.StatS, sp.GetConfig().SupplierSCfg().StatSConns)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s",
|
||||
utils.SupplierS, err.Error()))
|
||||
return
|
||||
}
|
||||
resourceSConn, err = sp.GetConnection(utils.ResourceS, sp.GetConfig().SupplierSCfg().ResourceSConns)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s",
|
||||
utils.SupplierS, err.Error()))
|
||||
return
|
||||
}
|
||||
splS.splS, err = engine.NewSupplierService(sp.GetDM(), sp.GetFilterS(), sp.GetConfig(),
|
||||
resourceSConn, statSConn, attrSConn)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s",
|
||||
utils.SupplierS, err.Error()))
|
||||
return
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.SupplierS))
|
||||
splS.rpc = v1.NewSupplierSv1(splS.splS)
|
||||
if !sp.GetConfig().DispatcherSCfg().Enabled {
|
||||
sp.GetServer().RpcRegister(splS.rpc)
|
||||
}
|
||||
splS.connChan <- splS.rpc
|
||||
return
|
||||
}
|
||||
|
||||
// GetIntenternalChan returns the internal connection chanel
|
||||
func (splS *SupplierService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) {
|
||||
return splS.connChan
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (splS *SupplierService) Reload(sp servmanager.ServiceProvider) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// Shutdown stops the service
|
||||
func (splS *SupplierService) Shutdown() (err error) {
|
||||
if err = splS.splS.Shutdown(); err != nil {
|
||||
return
|
||||
}
|
||||
splS.splS = nil
|
||||
splS.rpc = nil
|
||||
<-splS.connChan
|
||||
return
|
||||
}
|
||||
|
||||
// GetRPCInterface returns the interface to register for server
|
||||
func (splS *SupplierService) GetRPCInterface() interface{} {
|
||||
return splS.rpc
|
||||
}
|
||||
|
||||
// IsRunning returns if the service is running
|
||||
func (splS *SupplierService) IsRunning() bool {
|
||||
return splS != nil && splS.splS != nil
|
||||
}
|
||||
|
||||
// ServiceName returns the service name
|
||||
func (splS *SupplierService) ServiceName() string {
|
||||
return utils.SupplierS
|
||||
}
|
||||
81
services/suppliers_it_test.go
Normal file
81
services/suppliers_it_test.go
Normal file
@@ -0,0 +1,81 @@
|
||||
// +build integration
|
||||
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package services
|
||||
|
||||
import (
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func TestSupplierSReload(t *testing.T) {
|
||||
cfg, err := config.NewDefaultCGRConfig()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cfg.StatSCfg().Enabled = true
|
||||
filterSChan := make(chan *engine.FilterS, 1)
|
||||
filterSChan <- nil
|
||||
engineShutdown := make(chan bool, 1)
|
||||
chS := engine.NewCacheS(cfg, nil)
|
||||
close(chS.GetPrecacheChannel(utils.CacheSupplierProfiles))
|
||||
close(chS.GetPrecacheChannel(utils.CacheSupplierFilterIndexes))
|
||||
close(chS.GetPrecacheChannel(utils.CacheStatQueueProfiles))
|
||||
close(chS.GetPrecacheChannel(utils.CacheStatQueues))
|
||||
close(chS.GetPrecacheChannel(utils.CacheStatFilterIndexes))
|
||||
server := utils.NewServer()
|
||||
srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil,
|
||||
chS /*cdrStorage*/, nil,
|
||||
/*loadStorage*/ nil, filterSChan,
|
||||
server, nil, engineShutdown)
|
||||
supS := NewSupplierService()
|
||||
srvMngr.AddService(supS, NewStatService())
|
||||
if err = srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if supS.IsRunning() {
|
||||
t.Errorf("Expected service to be down")
|
||||
}
|
||||
var reply string
|
||||
if err := cfg.V1ReloadConfig(&config.ConfigReloadWithArgDispatcher{
|
||||
Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongonew"),
|
||||
Section: config.SupplierSJson,
|
||||
}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("Expecting OK ,received %s", reply)
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond) //need to switch to gorutine
|
||||
if !supS.IsRunning() {
|
||||
t.Errorf("Expected service to be running")
|
||||
}
|
||||
cfg.SupplierSCfg().Enabled = false
|
||||
cfg.GetReloadChan(config.SupplierSJson) <- struct{}{}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
if supS.IsRunning() {
|
||||
t.Errorf("Expected service to be down")
|
||||
}
|
||||
engineShutdown <- true
|
||||
}
|
||||
@@ -308,10 +308,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
|
||||
}
|
||||
if srvMngr.cfg.ThresholdSCfg().Enabled {
|
||||
go func() {
|
||||
if chrS, has := srvMngr.subsystems[utils.ThresholdS]; !has {
|
||||
if thrS, has := srvMngr.subsystems[utils.ThresholdS]; !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ThresholdS))
|
||||
srvMngr.engineShutdown <- true
|
||||
} else if err = chrS.Start(srvMngr, true); err != nil {
|
||||
} else if err = thrS.Start(srvMngr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ThresholdS, err))
|
||||
srvMngr.engineShutdown <- true
|
||||
}
|
||||
@@ -319,10 +319,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
|
||||
}
|
||||
if srvMngr.cfg.StatSCfg().Enabled {
|
||||
go func() {
|
||||
if chrS, has := srvMngr.subsystems[utils.StatS]; !has {
|
||||
if stS, has := srvMngr.subsystems[utils.StatS]; !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.StatS))
|
||||
srvMngr.engineShutdown <- true
|
||||
} else if err = chrS.Start(srvMngr, true); err != nil {
|
||||
} else if err = stS.Start(srvMngr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.StatS, err))
|
||||
srvMngr.engineShutdown <- true
|
||||
}
|
||||
@@ -330,15 +330,26 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
|
||||
}
|
||||
if srvMngr.cfg.ResourceSCfg().Enabled {
|
||||
go func() {
|
||||
if chrS, has := srvMngr.subsystems[utils.ResourceS]; !has {
|
||||
if reS, has := srvMngr.subsystems[utils.ResourceS]; !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ResourceS))
|
||||
srvMngr.engineShutdown <- true
|
||||
} else if err = chrS.Start(srvMngr, true); err != nil {
|
||||
} else if err = reS.Start(srvMngr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ResourceS, err))
|
||||
srvMngr.engineShutdown <- true
|
||||
}
|
||||
}()
|
||||
}
|
||||
if srvMngr.cfg.SupplierSCfg().Enabled {
|
||||
go func() {
|
||||
if supS, has := srvMngr.subsystems[utils.SupplierS]; !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.SupplierS))
|
||||
srvMngr.engineShutdown <- true
|
||||
} else if err = supS.Start(srvMngr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.SupplierS, err))
|
||||
srvMngr.engineShutdown <- true
|
||||
}
|
||||
}()
|
||||
}
|
||||
// startServer()
|
||||
return
|
||||
}
|
||||
@@ -500,6 +511,34 @@ func (srvMngr *ServiceManager) handleReload() {
|
||||
return // stop if we encounter an error
|
||||
}
|
||||
}
|
||||
case <-srvMngr.cfg.GetReloadChan(config.SupplierSJson):
|
||||
tS, has := srvMngr.subsystems[utils.SupplierS]
|
||||
if !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.SupplierS))
|
||||
srvMngr.engineShutdown <- true
|
||||
return // stop if we encounter an error
|
||||
}
|
||||
if srvMngr.cfg.SupplierSCfg().Enabled {
|
||||
if tS.IsRunning() {
|
||||
if err = tS.Reload(srvMngr); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to reload <%s>", utils.ServiceManager, utils.SupplierS))
|
||||
srvMngr.engineShutdown <- true
|
||||
return // stop if we encounter an error
|
||||
}
|
||||
} else {
|
||||
if err = tS.Start(srvMngr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.SupplierS))
|
||||
srvMngr.engineShutdown <- true
|
||||
return // stop if we encounter an error
|
||||
}
|
||||
}
|
||||
} else if tS.IsRunning() {
|
||||
if err = tS.Shutdown(); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to stop service <%s>", utils.ServiceManager, utils.SupplierS))
|
||||
srvMngr.engineShutdown <- true
|
||||
return // stop if we encounter an error
|
||||
}
|
||||
}
|
||||
}
|
||||
// handle RPC server
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user