Add connections through ConnManager

This commit is contained in:
TeoV
2019-12-09 11:41:57 -05:00
parent 18150825bb
commit 4bd4ae3ee9
129 changed files with 593 additions and 1173 deletions

View File

@@ -536,7 +536,7 @@ func (ub *Account) debitCreditBalance(cd *CallDescriptor, count bool, dryRun boo
defaultBalance := ub.GetDefaultMoneyBalance()
defaultBalance.SubstractValue(cost)
//send default balance to thresholdS to be processed
if thresholdS != nil {
if len(config.CgrConfig().RalsCfg().ThresholdSConns) != 0 {
acntTnt := utils.NewTenantID(ub.ID)
thEv := &ArgsProcessEvent{
CGREvent: &utils.CGREvent{
@@ -549,7 +549,8 @@ func (ub *Account) debitCreditBalance(cd *CallDescriptor, count bool, dryRun boo
utils.BalanceID: defaultBalance.ID,
utils.Units: defaultBalance.Value}}}
var tIDs []string
if err := thresholdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
if err := connMgr.Call(config.CgrConfig().RalsCfg().ThresholdSConns,
utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<AccountS> error: <%s> processing balance event <%+v> with ThresholdS.",
@@ -1103,10 +1104,11 @@ func (acnt *Account) Publish() {
utils.Account: acntTnt.ID,
utils.AllowNegative: acnt.AllowNegative,
utils.Disabled: acnt.Disabled}}
if statS != nil {
if len(config.CgrConfig().RalsCfg().StatSConns) != 0 {
go func() {
var reply []string
if err := statS.Call(utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{CGREvent: cgrEv}, &reply); err != nil &&
if err := connMgr.Call(config.CgrConfig().RalsCfg().StatSConns,
utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{CGREvent: cgrEv}, &reply); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<AccountS> error: %s processing balance event %+v with StatS.",
@@ -1114,10 +1116,11 @@ func (acnt *Account) Publish() {
}
}()
}
if thresholdS != nil {
if len(config.CgrConfig().RalsCfg().ThresholdSConns) != 0 {
go func() {
var tIDs []string
if err := thresholdS.Call(utils.ThresholdSv1ProcessEvent,
if err := connMgr.Call(config.CgrConfig().RalsCfg().ThresholdSConns,
utils.ThresholdSv1ProcessEvent,
&ArgsProcessEvent{CGREvent: cgrEv}, &tIDs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(

View File

@@ -721,10 +721,11 @@ func (b *Balance) Publish() {
if !b.ExpirationDate.IsZero() {
cgrEv.Event[utils.ExpiryTime] = b.ExpirationDate.Format(time.RFC3339)
}
if statS != nil {
if len(config.CgrConfig().RalsCfg().StatSConns) != 0 {
go func() {
var reply []string
if err := statS.Call(utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{CGREvent: cgrEv}, &reply); err != nil &&
if err := connMgr.Call(config.CgrConfig().RalsCfg().StatSConns,
utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{CGREvent: cgrEv}, &reply); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<AccountS> error: %s processing balance event %+v with StatS.",
@@ -732,10 +733,11 @@ func (b *Balance) Publish() {
}
}()
}
if thresholdS != nil {
if len(config.CgrConfig().RalsCfg().ThresholdSConns) != 0 {
go func() {
var tIDs []string
if err := thresholdS.Call(utils.ThresholdSv1ProcessEvent, &ArgsProcessEvent{CGREvent: cgrEv}, &tIDs); err != nil &&
if err := connMgr.Call(config.CgrConfig().RalsCfg().ThresholdSConns,
utils.ThresholdSv1ProcessEvent, &ArgsProcessEvent{CGREvent: cgrEv}, &tIDs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<AccountS> error: %s processing balance event %+v with ThresholdS.",
@@ -840,9 +842,10 @@ func (bc Balances) SaveDirtyBalances(acc *Account) {
if !b.ExpirationDate.IsZero() {
thEv.Event[utils.ExpiryTime] = b.ExpirationDate.Format(time.RFC3339)
}
if thresholdS != nil {
if len(config.CgrConfig().RalsCfg().ThresholdSConns) != 0 {
var tIDs []string
if err := thresholdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
if err := connMgr.Call(config.CgrConfig().RalsCfg().ThresholdSConns,
utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<AccountS> error: %s processing balance event %+v with ThresholdS.",
@@ -855,7 +858,7 @@ func (bc Balances) SaveDirtyBalances(acc *Account) {
savedAccounts[b.account.ID] = b.account
}
}
if len(savedAccounts) != 0 && thresholdS != nil {
if len(savedAccounts) != 0 && len(config.CgrConfig().RalsCfg().ThresholdSConns) != 0 {
for _, acnt := range savedAccounts {
acntTnt := utils.NewTenantID(acnt.ID)
thEv := &ArgsProcessEvent{
@@ -869,7 +872,8 @@ func (bc Balances) SaveDirtyBalances(acc *Account) {
utils.AllowNegative: acnt.AllowNegative,
utils.Disabled: acnt.Disabled}}}
var tIDs []string
if err := thresholdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
if err := connMgr.Call(config.CgrConfig().RalsCfg().ThresholdSConns,
utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<AccountS> error: %s processing account event %+v with ThresholdS.", err.Error(), thEv))

View File

@@ -57,9 +57,8 @@ var (
cdrStorage CdrStorage
debitPeriod = 10 * time.Second
globalRoundingDecimals = 6
thresholdS rpcclient.ClientConnector // used by RALs to communicate with ThresholdS
statS rpcclient.ClientConnector
schedCdrsConns rpcclient.ClientConnector
connMgr *ConnManager
schedCdrsConns rpcclient.RpcClientConnection
rpSubjectPrefixMatching bool
)
@@ -68,12 +67,9 @@ func SetDataStorage(dm2 *DataManager) {
dm = dm2
}
func SetThresholdS(thdS rpcclient.ClientConnector) {
thresholdS = thdS
}
func SetStatS(stsS rpcclient.ClientConnector) {
statS = stsS
// SetConnManager is the exported method to set the connectionManager used when operate on an account.
func SetConnManager(conMgr *ConnManager) {
connMgr = conMgr
}
// SetRoundingDecimals sets the global rounding method and decimal precision for GetCost method

View File

@@ -34,7 +34,6 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
const (
@@ -54,7 +53,7 @@ const (
func NewCDRExporter(cdrs []*CDR, exportTemplate *config.CdreCfg, exportFormat, exportPath, fallbackPath, exportID string,
synchronous bool, attempts int, fieldSeparator rune,
httpSkipTlsCheck bool, httpPoster *HTTPPoster, attrS rpcclient.ClientConnector, filterS *FilterS) (*CDRExporter, error) {
httpSkipTlsCheck bool, httpPoster *HTTPPoster, attrsConns []string, filterS *FilterS) (*CDRExporter, error) {
if len(cdrs) == 0 { // Nothing to export
return nil, nil
}
@@ -71,7 +70,7 @@ func NewCDRExporter(cdrs []*CDR, exportTemplate *config.CdreCfg, exportFormat, e
httpSkipTlsCheck: httpSkipTlsCheck,
httpPoster: httpPoster,
negativeExports: make(map[string]string),
attrS: attrS,
attrsConns: attrsConns,
filterS: filterS,
}
return cdre, nil
@@ -103,8 +102,8 @@ type CDRExporter struct {
positiveExports []string // CGRIDs of successfully exported CDRs
negativeExports map[string]string // CGRIDs of failed exports
attrS rpcclient.ClientConnector
filterS *FilterS
attrsConns []string
filterS *FilterS
}
// Handle various meta functions used in header/trailer
@@ -302,7 +301,7 @@ func (cdre *CDRExporter) processCDR(cdr *CDR) (err error) {
}
// send the cdr to be processed by attributeS
if cdre.exportTemplate.AttributeSContext != utils.EmptyString {
if cdre.attrS == nil {
if len(cdre.attrsConns) == 0 {
return errors.New("no connection to AttributeS")
}
args := &AttrArgsProcessEvent{
@@ -310,7 +309,8 @@ func (cdre *CDRExporter) processCDR(cdr *CDR) (err error) {
CGREvent: cdr.AsCGREvent(),
}
var evReply AttrSProcessEventReply
if err = cdre.attrS.Call(utils.AttributeSv1ProcessEvent,
if err = connMgr.Call(cdre.attrsConns,
utils.AttributeSv1ProcessEvent,
args, &evReply); err != nil {
return err
}

View File

@@ -437,7 +437,7 @@ func (cdrS *CDRServer) exportCDRs(cdrs []*CDR) (err error) {
expTpl.ExportPath, cdrS.cgrCfg.GeneralCfg().FailedPostsDir,
"CDRSReplication", expTpl.Synchronous, expTpl.Attempts,
expTpl.FieldSeparator, cdrS.cgrCfg.GeneralCfg().HttpSkipTlsVerify, cdrS.httpPoster,
cdrS.attrS, cdrS.filterS); err != nil {
cdrS.cgrCfg.ApierCfg().AttributeSConns, cdrS.filterS); err != nil { // DON"T FORHERT TO TAKE ATTRIBUTESC FROM CDRS
utils.Logger.Err(fmt.Sprintf("<CDRS> Building CDRExporter for online exports got error: <%s>", err.Error()))
continue
}

View File

@@ -31,7 +31,6 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/ltcache"
"github.com/cgrates/rpcclient"
"github.com/creack/pty"
)
@@ -376,8 +375,9 @@ func StopStartEngine(cfgPath string, waitEngine int) (*exec.Cmd, error) {
}
func LoadTariffPlanFromFolder(tpPath, timezone string, dm *DataManager, disable_reverse bool,
cacheS rpcclient.ClientConnector, schedulerS rpcclient.ClientConnector) error {
loader, err := NewTpReader(dm.dataDB, NewFileCSVStorage(utils.CSV_SEP, tpPath, false), "", timezone, cacheS, schedulerS)
cacheConns, schedConns []string) error {
loader, err := NewTpReader(dm.dataDB, NewFileCSVStorage(utils.CSV_SEP, tpPath, false), "",
timezone, cacheConns, schedConns)
if err != nil {
return utils.NewErrServerError(err)
}

View File

@@ -62,13 +62,13 @@ type TpReader struct {
thresholds []*utils.TenantID // IDs of thresholds which need creation based on thresholdProfiles
revDests,
acntActionPlans map[string][]string
cacheS rpcclient.ClientConnector
schedulerS rpcclient.ClientConnector
cacheConns []string
schedulerConns []string
}
func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string,
cacheS rpcclient.ClientConnector, schedulerS rpcclient.ClientConnector) (*TpReader, error) {
var rmtConns, rplConns *rpcclient.RPCPool
cacheConns, schedulerConns []string) (*TpReader, error) {
var rmtConns, rplConns *rpcclient.RpcClientPool
if len(config.CgrConfig().DataDbCfg().RmtConns) != 0 {
var err error
rmtConns, err = NewRPCPool(rpcclient.PoolFirstPositive, config.CgrConfig().TlsCfg().ClientKey,
@@ -92,12 +92,12 @@ func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string,
}
}
tpr := &TpReader{
tpid: tpid,
timezone: timezone,
dm: NewDataManager(db, config.CgrConfig().CacheCfg(), rmtConns, rplConns), // ToDo: add ChacheCfg as parameter to the NewTpReader
lr: lr,
cacheS: cacheS,
schedulerS: schedulerS,
tpid: tpid,
timezone: timezone,
dm: NewDataManager(db, config.CgrConfig().CacheCfg(), rmtConns, rplConns), // ToDo: add ChacheCfg as parameter to the NewTpReader
lr: lr,
cacheConns: cacheConns,
schedulerConns: schedulerConns,
}
tpr.Init()
//add *any and *asap timing tag (in case of no timings file)
@@ -2385,7 +2385,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
}
func (tpr *TpReader) ReloadCache(caching string, verbose bool, argDispatcher *utils.ArgDispatcher) (err error) {
if tpr.cacheS == nil {
if len(tpr.cacheConns) == 0 {
log.Print("Disabled automatic reload")
return
}
@@ -2450,20 +2450,20 @@ func (tpr *TpReader) ReloadCache(caching string, verbose bool, argDispatcher *ut
case utils.META_NONE:
return
case utils.MetaReload:
if err = tpr.cacheS.Call(utils.CacheSv1ReloadCache, cacheArgs, &reply); err != nil {
if err = connMgr.Call(tpr.cacheConns, utils.CacheSv1ReloadCache, cacheArgs, &reply); err != nil {
return
}
case utils.MetaLoad:
if err = tpr.cacheS.Call(utils.CacheSv1LoadCache, cacheArgs, &reply); err != nil {
if err = connMgr.Call(tpr.cacheConns, utils.CacheSv1LoadCache, cacheArgs, &reply); err != nil {
return
}
case utils.MetaRemove:
if err = tpr.cacheS.Call(utils.CacheSv1FlushCache, cacheArgs, &reply); err != nil {
if err = connMgr.Call(tpr.cacheConns, utils.CacheSv1FlushCache, cacheArgs, &reply); err != nil {
return
}
case utils.MetaClear:
cacheArgs.FlushAll = true
if err = tpr.cacheS.Call(utils.CacheSv1FlushCache, cacheArgs, &reply); err != nil {
if err = connMgr.Call(tpr.cacheConns, utils.CacheSv1FlushCache, cacheArgs, &reply); err != nil {
return
}
}
@@ -2498,7 +2498,7 @@ func (tpr *TpReader) ReloadCache(caching string, verbose bool, argDispatcher *ut
ArgDispatcher: argDispatcher,
CacheIDs: cacheIDs,
}
if err = tpr.cacheS.Call(utils.CacheSv1Clear, clearArgs, &reply); err != nil {
if err = connMgr.Call(tpr.cacheConns, utils.CacheSv1Clear, clearArgs, &reply); err != nil {
log.Printf("WARNING: Got error on cache clear: %s\n", err.Error())
}
@@ -2523,7 +2523,7 @@ func (tpr *TpReader) ReloadScheduler(verbose bool) (err error) {
if verbose {
log.Print("Reloading scheduler")
}
if err = tpr.schedulerS.Call(utils.SchedulerSv1Reload,
if err = connMgr.Call(tpr.schedulerConns, utils.SchedulerSv1Reload,
new(utils.CGREventWithArgDispatcher), &reply); err != nil {
log.Printf("WARNING: Got error on scheduler reload: %s\n", err.Error())
}