SupplierS started by engine, adding additional configuration options, enablind suppliers in tutorial configurations

This commit is contained in:
DanB
2017-11-28 18:29:44 +01:00
parent a3eacda7a6
commit 567d4d95db
15 changed files with 175 additions and 27 deletions

View File

@@ -66,23 +66,23 @@ func (apierV1 *ApierV1) RemSupplierProfile(arg utils.TenantID, reply *string) er
return nil
}
func NewSuplierSv1(splS *engine.SupplierService) *SuplierSv1 {
return &SuplierSv1{splS: splS}
func NewSupplierSv1(splS *engine.SupplierService) *SupplierSv1 {
return &SupplierSv1{splS: splS}
}
// Exports RPC from RLs
type SuplierSv1 struct {
type SupplierSv1 struct {
splS *engine.SupplierService
}
// Call implements rpcclient.RpcClientConnection interface for internal RPC
func (splv1 *SuplierSv1) Call(serviceMethod string,
func (splv1 *SupplierSv1) Call(serviceMethod string,
args interface{}, reply interface{}) error {
return utils.APIerRPCCall(splv1, serviceMethod, args, reply)
}
// GetSuppliers returns sorted list of suppliers for Event
func (splv1 *SuplierSv1) GetSuppliers(args *engine.SupplierEvent,
func (splv1 *SupplierSv1) GetSuppliers(args *engine.SupplierEvent,
reply *engine.SortedSuppliers) error {
return splv1.splS.V1GetSuppliers(args, reply)
}

View File

@@ -43,7 +43,7 @@ var sTestsSupplierSV1 = []func(t *testing.T){
testV1SplSLoadConfig,
testV1SplSInitDataDb,
testV1SplSResetStorDb,
testV1SplSStartEngine,
//testV1SplSStartEngine,
testV1SplSRpcConn,
testV1SplSFromFolder,
testV1SplSGetWeightSuppliers,
@@ -120,7 +120,10 @@ func testV1SplSGetWeightSuppliers(t *testing.T) {
ev := &engine.SupplierEvent{
Tenant: "cgrates.org",
ID: "testV1SplSGetWeightSuppliers",
Event: map[string]interface{}{},
Event: map[string]interface{}{
"Account": "1007",
"Destination": "+491511231234",
},
}
var suplsReply engine.SortedSuppliers
if err := splSv1Rpc.Call(utils.SupplierSv1GetSuppliers,

View File

@@ -634,6 +634,58 @@ func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnec
internalThresholdSChan <- tSv1
}
// startSupplierService fires up the ThresholdS
func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSChan chan rpcclient.RpcClientConnection,
cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
filterS := <-filterSChan
filterSChan <- filterS
var resourceSConn, statSConn *rpcclient.RpcClientPool
if len(cfg.SupplierSCfg().ResourceSConns) != 0 {
resourceSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.ConnectAttempts, cfg.Reconnects,
cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.SupplierSCfg().ResourceSConns,
internalRsChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ResourceS: %s",
utils.SupplierS, err.Error()))
exitChan <- true
return
}
}
if len(cfg.SupplierSCfg().StatSConns) != 0 {
statSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.ConnectAttempts, cfg.Reconnects,
cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.SupplierSCfg().StatSConns,
internalStatSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s",
utils.SupplierS, err.Error()))
exitChan <- true
return
}
}
splS, err := engine.NewSupplierService(dm, cfg.DefaultTimezone, filterS,
cfg.SupplierSCfg().IndexedFields, resourceSConn, statSConn)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s",
utils.SupplierS, err.Error()))
exitChan <- true
return
}
go func() {
if err := splS.ListenAndServe(exitChan); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Error: %s listening for packets",
utils.SupplierS, err.Error()))
}
splS.Shutdown()
exitChan <- true
return
}()
splV1 := v1.NewSupplierSv1(splS)
server.RpcRegister(splV1)
internalSupplierSChan <- splV1
}
// startFilterService fires up the FilterS
func startFilterService(filterSChan chan *engine.FilterS,
internalStatSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
@@ -818,6 +870,7 @@ func main() {
internalRsChan := make(chan rpcclient.RpcClientConnection, 1)
internalStatSChan := make(chan rpcclient.RpcClientConnection, 1)
internalThresholdSChan := make(chan rpcclient.RpcClientConnection, 1)
internalSupplierSChan := make(chan rpcclient.RpcClientConnection, 1)
filterSChan := make(chan *engine.FilterS, 1)
// Start ServiceManager
@@ -926,6 +979,11 @@ func main() {
go startThresholdService(internalThresholdSChan, cfg, dm, server, exitChan, filterSChan)
}
if cfg.SupplierSCfg().Enabled {
go startSupplierService(internalSupplierSChan, internalRsChan, internalStatSChan,
cfg, dm, server, exitChan, filterSChan)
}
// Serve rpc connections
go startRpc(server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalHistorySChan,
internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRsChan, internalStatSChan, internalSMGChan)

View File

@@ -537,6 +537,27 @@ func (self *CGRConfig) checkConfigSanity() error {
}
}
}
// SupplierS checks
if self.supplierSCfg != nil && self.supplierSCfg.Enabled {
for _, connCfg := range self.supplierSCfg.RALsConns {
if connCfg.Address != utils.MetaInternal {
return errors.New("Only *internal connectivity allowed in SupplierS for now")
}
if connCfg.Address == utils.MetaInternal && !self.RALsEnabled {
return errors.New("RALs not enabled but requested by SupplierS component.")
}
}
for _, connCfg := range self.supplierSCfg.ResourceSConns {
if connCfg.Address == utils.MetaInternal && !self.resourceSCfg.Enabled {
return errors.New("ResourceS not enabled but requested by SupplierS component.")
}
}
for _, connCfg := range self.supplierSCfg.StatSConns {
if connCfg.Address == utils.MetaInternal && !self.resourceSCfg.Enabled {
return errors.New("StatS not enabled but requested by SupplierS component.")
}
}
}
return nil
}
@@ -1249,7 +1270,7 @@ func (cfg *CGRConfig) ThresholdSCfg() *ThresholdSCfg {
return cfg.thresholdSCfg
}
func (cfg *CGRConfig) SuplierSCfg() *SupplierSCfg {
func (cfg *CGRConfig) SupplierSCfg() *SupplierSCfg {
return cfg.supplierSCfg
}

View File

@@ -449,6 +449,11 @@ const CGRATES_CFG_JSON = `
"suppliers": {
"enabled": false, // starts SupplierS service: <true|false>.
"indexed_fields": [], // query indexes based on these fields for faster processing
"rals_conns": [
{"address": "*internal"}, // address where to reach the RALs for cost/accounting <*internal>
],
"resources_conns": [], // address where to reach the Resource service, empty to disable functionality: <""|*internal|x.y.z.y:1234>
"stats_conns": [], // address where to reach the Stat service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234>
},

View File

@@ -750,6 +750,13 @@ func TestDfSupplierSJsonCfg(t *testing.T) {
eCfg := &SupplierSJsonCfg{
Enabled: utils.BoolPointer(false),
Indexed_fields: utils.StringSlicePointer([]string{}),
Rals_conns: &[]*HaPoolJsonCfg{
&HaPoolJsonCfg{
Address: utils.StringPointer("*internal"),
},
},
Resources_conns: &[]*HaPoolJsonCfg{},
Stats_conns: &[]*HaPoolJsonCfg{},
}
if cfg, err := dfCgrJsonCfg.SupplierSJsonCfg(); err != nil {
t.Error(err)

View File

@@ -675,6 +675,11 @@ func TestCgrCfgJSONDefaultSupplierSCfg(t *testing.T) {
eSupplSCfg := &SupplierSCfg{
Enabled: false,
IndexedFields: []string{},
RALsConns: []*HaPoolConfig{
&HaPoolConfig{Address: "*internal"},
},
ResourceSConns: []*HaPoolConfig{},
StatSConns: []*HaPoolConfig{},
}
if !reflect.DeepEqual(eSupplSCfg, cgrCfg.supplierSCfg) {
t.Errorf("received: %+v, expecting: %+v", eSupplSCfg, cgrCfg.supplierSCfg)

View File

@@ -412,8 +412,11 @@ type ThresholdSJsonCfg struct {
// Supplier service config section
type SupplierSJsonCfg struct {
Enabled *bool
Indexed_fields *[]string
Enabled *bool
Indexed_fields *[]string
Rals_conns *[]*HaPoolJsonCfg
Resources_conns *[]*HaPoolJsonCfg
Stats_conns *[]*HaPoolJsonCfg
}
// Mailer config section

View File

@@ -18,9 +18,13 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package config
// SupplierSCfg is the configuration of supplier service
type SupplierSCfg struct {
Enabled bool
IndexedFields []string
Enabled bool
IndexedFields []string
RALsConns []*HaPoolConfig
ResourceSConns []*HaPoolConfig
StatSConns []*HaPoolConfig
}
func (spl *SupplierSCfg) loadFromJsonCfg(jsnCfg *SupplierSJsonCfg) (err error) {
@@ -36,5 +40,26 @@ func (spl *SupplierSCfg) loadFromJsonCfg(jsnCfg *SupplierSJsonCfg) (err error) {
spl.IndexedFields[i] = fID
}
}
if jsnCfg.Rals_conns != nil {
spl.RALsConns = make([]*HaPoolConfig, len(*jsnCfg.Rals_conns))
for idx, jsnHaCfg := range *jsnCfg.Rals_conns {
spl.RALsConns[idx] = NewDfltHaPoolConfig()
spl.RALsConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Resources_conns != nil {
spl.ResourceSConns = make([]*HaPoolConfig, len(*jsnCfg.Resources_conns))
for idx, jsnHaCfg := range *jsnCfg.Resources_conns {
spl.ResourceSConns[idx] = NewDfltHaPoolConfig()
spl.ResourceSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Stats_conns != nil {
spl.StatSConns = make([]*HaPoolConfig, len(*jsnCfg.Stats_conns))
for idx, jsnHaCfg := range *jsnCfg.Stats_conns {
spl.StatSConns[idx] = NewDfltHaPoolConfig()
spl.StatSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
return nil
}

View File

@@ -146,6 +146,11 @@
},
"suppliers": {
"enabled": true,
},
"historys": {
"enabled": true,
},

View File

@@ -138,6 +138,11 @@
},
"suppliers": {
"enabled": true,
},
"historys": {
"enabled": true,
},

View File

@@ -91,13 +91,17 @@
},
"thresholds": {
"enabled": true,
"store_interval": "1s",
},
"suppliers": {
"enabled": true,
},
"sm_generic": {
"enabled": true,
},

View File

@@ -4,7 +4,7 @@ cgrates.org,FLTR_1,*string_prefix,Destination,10;20,
cgrates.org,FLTR_1,*rsr_fields,,Subject(~^1.*1$);Destination(1002),
cgrates.org,FLTR_ACNT_1007,*string,Account,1007,2014-07-29T15:00:00Z
cgrates.org,FLTR_ACNT_dan,*string,Account,dan,2014-07-29T15:00:00Z
cgrates.org,FLTR_DST_DE,*destinations,Destination,DST_DE,2014-07-29T15:00:00Z
cgrates.org,FLTR_DST_DE,*destinations,Destination,DST_DE_MOBILE,2014-07-29T15:00:00Z
cgrates.org,FLTR_DST_NL,*destinations,Destination,DST_NL,2014-07-29T15:00:00Z
cgrates.org,FLTR_ACNT_BALANCE_1,*string,Account,1001;1002,2014-07-29T15:00:00Z
cgrates.org,FLTR_ACNT_BALANCE_1,*string,EventType,BalanceUpdate,
1 #Tenant[0] ID[1] FilterType[2] FilterFieldName[3] FilterFieldValues[4] ActivationInterval[5]
4 cgrates.org FLTR_1 *rsr_fields Subject(~^1.*1$);Destination(1002)
5 cgrates.org FLTR_ACNT_1007 *string Account 1007 2014-07-29T15:00:00Z
6 cgrates.org FLTR_ACNT_dan *string Account dan 2014-07-29T15:00:00Z
7 cgrates.org FLTR_DST_DE *destinations Destination DST_DE DST_DE_MOBILE 2014-07-29T15:00:00Z
8 cgrates.org FLTR_DST_NL *destinations Destination DST_NL 2014-07-29T15:00:00Z
9 cgrates.org FLTR_ACNT_BALANCE_1 *string Account 1001;1002 2014-07-29T15:00:00Z
10 cgrates.org FLTR_ACNT_BALANCE_1 *string EventType BalanceUpdate

View File

@@ -38,6 +38,7 @@ type SupplierSortDispatcher map[string]SuppliersSorter
func (ssd SupplierSortDispatcher) SortSuppliers(prflID, strategy string,
suppls Suppliers) (sortedSuppls *SortedSuppliers, err error) {
fmt.Printf("Sort strategy: %s, suppliers: %s\n", strategy, utils.ToJSON(suppls))
sd, has := ssd[strategy]
if !has {
return nil, fmt.Errorf("unsupported sorting strategy: %s", strategy)
@@ -75,6 +76,7 @@ type WeightSorter struct {
func (ws *WeightSorter) SortSuppliers(prflID string,
suppls Suppliers) (sortedSuppls *SortedSuppliers, err error) {
fmt.Printf("Sort suppliers: %s\n", utils.ToJSON(suppls))
suppls.Sort()
sortedSuppls = &SortedSuppliers{ProfileID: prflID,
Sorting: ws.Sorting,

View File

@@ -141,7 +141,7 @@ type SupplierService struct {
// ListenAndServe will initialize the service
func (spS *SupplierService) ListenAndServe(exitChan chan bool) error {
utils.Logger.Info(fmt.Sprintf("<%s> start listening for requests", utils.SupplierS))
utils.Logger.Info("Starting Supplier Service")
e := <-exitChan
exitChan <- e // put back for the others listening for shutdown request
return nil
@@ -155,17 +155,18 @@ func (spS *SupplierService) Shutdown() error {
}
// matchingSupplierProfilesForEvent returns ordered list of matching resources which are active by the time of the call
func (spS *SupplierService) matchingSupplierProfilesForEvent(ev *SupplierEvent) (lps SupplierProfiles, err error) {
func (spS *SupplierService) matchingSupplierProfilesForEvent(ev *SupplierEvent) (sPrfls SupplierProfiles, err error) {
matchingLPs := make(map[string]*SupplierProfile)
lpIDs, err := matchingItemIDsForEvent(ev.Event, spS.indexedFields,
sPrflIDs, err := matchingItemIDsForEvent(ev.Event, spS.indexedFields,
spS.dm, utils.SupplierProfilesStringIndex+ev.Tenant)
if err != nil {
return nil, err
}
lockIDs := utils.PrefixSliceItems(lpIDs.Slice(), utils.SupplierProfilesStringIndex)
lockIDs := utils.PrefixSliceItems(sPrflIDs.Slice(), utils.SupplierProfilesStringIndex)
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...)
defer guardian.Guardian.UnguardIDs(lockIDs...)
for lpID := range lpIDs {
for lpID := range sPrflIDs {
lcrPrfl, err := spS.dm.GetSupplierProfile(ev.Tenant, lpID, false, utils.NonTransactional)
if err != nil {
if err == utils.ErrNotFound {
@@ -175,7 +176,11 @@ func (spS *SupplierService) matchingSupplierProfilesForEvent(ev *SupplierEvent)
}
aTime, err := ev.AnswerTime(spS.timezone)
if err != nil {
return nil, err
if err == utils.ErrNotFound {
aTime = time.Now()
} else {
return nil, err
}
}
if lcrPrfl.ActivationInterval != nil &&
!lcrPrfl.ActivationInterval.IsActiveAtTime(aTime) { // not active
@@ -189,16 +194,16 @@ func (spS *SupplierService) matchingSupplierProfilesForEvent(ev *SupplierEvent)
matchingLPs[lpID] = lcrPrfl
}
// All good, convert from Map to Slice so we can sort
lps = make(SupplierProfiles, len(matchingLPs))
sPrfls = make(SupplierProfiles, len(matchingLPs))
i := 0
for _, lp := range matchingLPs {
lps[i] = lp
for _, sPrfl := range matchingLPs {
sPrfls[i] = sPrfl
i++
}
lps.Sort()
for i, lp := range lps {
if lp.Blocker { // blocker will stop processing
lps = lps[:i+1]
sPrfls.Sort()
for i, sPrfl := range sPrfls {
if sPrfl.Blocker { // blocker will stop processing
sPrfls = sPrfls[:i+1]
break
}
}