Added connection reload for SessionS

This commit is contained in:
Trial97
2019-09-25 14:18:47 +03:00
committed by Dan Christian Bogos
parent 88fb9c7926
commit 45ab2f9257
11 changed files with 145 additions and 22 deletions

View File

@@ -309,7 +309,7 @@ func testDiamItDryRun(t *testing.T) {
t.Fatal("Diameter client should not be nil")
}
if diamClnt.conn == nil {
t.Fatal("Diameter conection should not be nil")
t.Fatal("Diameter connection should not be nil")
}
if ccr == nil {
t.Fatal("The mesage to diameter should not be nil")
@@ -558,7 +558,7 @@ func testDiamItCCRInit(t *testing.T) {
t.Fatal("Diameter client should not be nil")
}
if diamClnt.conn == nil {
t.Fatal("Diameter conection should not be nil")
t.Fatal("Diameter connection should not be nil")
}
if m == nil {
t.Fatal("The mesage to diameter should not be nil")
@@ -646,7 +646,7 @@ func testDiamItCCRUpdate(t *testing.T) {
t.Fatal("Diameter client should not be nil")
}
if diamClnt.conn == nil {
t.Fatal("Diameter conection should not be nil")
t.Fatal("Diameter connection should not be nil")
}
if m == nil {
t.Fatal("The mesage to diameter should not be nil")
@@ -734,7 +734,7 @@ func testDiamItCCRTerminate(t *testing.T) {
t.Fatal("Diameter client should not be nil")
}
if diamClnt.conn == nil {
t.Fatal("Diameter conection should not be nil")
t.Fatal("Diameter connection should not be nil")
}
if m == nil {
t.Fatal("The mesage to diameter should not be nil")
@@ -831,7 +831,7 @@ func testDiamItCCRSMS(t *testing.T) {
t.Fatal("Diameter client should not be nil")
}
if diamClnt.conn == nil {
t.Fatal("Diameter conection should not be nil")
t.Fatal("Diameter connection should not be nil")
}
if ccr == nil {
t.Fatal("The mesage to diameter should not be nil")
@@ -921,7 +921,7 @@ func testDiamInitWithSessionDisconnect(t *testing.T) {
t.Fatal("Diameter client should not be nil")
}
if diamClnt.conn == nil {
t.Fatal("Diameter conection should not be nil")
t.Fatal("Diameter connection should not be nil")
}
if m == nil {
t.Fatal("The mesage to diameter should not be nil")

View File

@@ -1295,19 +1295,19 @@ func (apierV1 *ApierV1) GetRatingPlanIDs(args utils.TenantArgWithPaginator, attr
return nil
}
// SetAttributeSConnection sets the new conection to the attribute service
// SetAttributeSConnection sets the new connection to the attribute service
// only used on reload
func (v1 *ApierV1) SetAttributeSConnection(attrS rpcclient.RpcClientConnection) {
v1.AttributeS = attrS
}
// SetCacheSConnection sets the new conection to the cache service
// SetCacheSConnection sets the new connection to the cache service
// only used on reload
func (v1 *ApierV1) SetCacheSConnection(chS rpcclient.RpcClientConnection) {
v1.CacheS = chS
}
// SetSchedulerSConnection sets the new conection to the scheduler service
// SetSchedulerSConnection sets the new connection to the scheduler service
// only used on reload
func (v1 *ApierV1) SetSchedulerSConnection(schS rpcclient.RpcClientConnection) {
v1.SchedulerS = schS

View File

@@ -830,31 +830,31 @@ func (cdrS *CDRServer) V1CountCDRs(args *utils.RPCCDRsFilterWithArgDispatcher, c
return nil
}
// SetAttributeSConnection sets the new conection to the attribute service
// SetAttributeSConnection sets the new connection to the attribute service
// only used on reload
func (cdrS *CDRServer) SetAttributeSConnection(attrS rpcclient.RpcClientConnection) {
cdrS.attrS = attrS
}
// SetThresholSConnection sets the new conection to the threshold service
// SetThresholSConnection sets the new connection to the threshold service
// only used on reload
func (cdrS *CDRServer) SetThresholSConnection(thdS rpcclient.RpcClientConnection) {
cdrS.thdS = thdS
}
// SetStatSConnection sets the new conection to the stat service
// SetStatSConnection sets the new connection to the stat service
// only used on reload
func (cdrS *CDRServer) SetStatSConnection(stS rpcclient.RpcClientConnection) {
cdrS.statS = stS
}
// SetChargerSConnection sets the new conection to the charger service
// SetChargerSConnection sets the new connection to the charger service
// only used on reload
func (cdrS *CDRServer) SetChargerSConnection(chS rpcclient.RpcClientConnection) {
cdrS.chargerS = chS
}
// SetRALsConnection sets the new conection to the RAL service
// SetRALsConnection sets the new connection to the RAL service
// only used on reload
func (cdrS *CDRServer) SetRALsConnection(rls rpcclient.RpcClientConnection) {
cdrS.rals = rls

View File

@@ -180,7 +180,7 @@ func (cS *ChargerService) V1GetChargersForEvent(args *utils.CGREventWithArgDispa
return
}
// SetAttributeConnection sets the new conection to the attribute service
// SetAttributeConnection sets the new connection to the attribute service
// only used on reload
func (cS *ChargerService) SetAttributeConnection(attrS rpcclient.RpcClientConnection) {
cS.attrS = attrS

View File

@@ -69,7 +69,7 @@ func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName, _ string) (err
break
}
// reset client and try again
// used in case of closed conection because of idle time
// used in case of closed connection because of idle time
if pstr.client != nil {
pstr.client.Close() // Make shure the connection is closed before reseting it
}

View File

@@ -766,7 +766,7 @@ func (rS *ResourceService) StartLoop() {
go rS.runBackup()
}
// SetThresholdConnection sets the new conection to the threshold service
// SetThresholdConnection sets the new connection to the threshold service
// only used on reload
func (rS *ResourceService) SetThresholdConnection(thdS rpcclient.RpcClientConnection) {
rS.thdS = thdS

View File

@@ -423,7 +423,7 @@ func (sS *StatService) StartLoop() {
go sS.runBackup()
}
// SetThresholdConnection sets the new conection to the threshold service
// SetThresholdConnection sets the new connection to the threshold service
// only used on reload
func (sS *StatService) SetThresholdConnection(thdS rpcclient.RpcClientConnection) {
sS.thdS = thdS

View File

@@ -634,19 +634,19 @@ func (spS *SupplierService) V1GetSupplierProfilesForEvent(args *utils.CGREventWi
return
}
// SetAttributeSConnection sets the new conection to the attribute service
// SetAttributeSConnection sets the new connection to the attribute service
// only used on reload
func (spS *SupplierService) SetAttributeSConnection(attrS rpcclient.RpcClientConnection) {
spS.attributeS = attrS
}
// SetStatSConnection sets the new conection to the stat service
// SetStatSConnection sets the new connection to the stat service
// only used on reload
func (spS *SupplierService) SetStatSConnection(stS rpcclient.RpcClientConnection) {
spS.statS = stS
}
// SetResourceSConnection sets the new conection to the resource service
// SetResourceSConnection sets the new connection to the resource service
// only used on reload
func (spS *SupplierService) SetResourceSConnection(rS rpcclient.RpcClientConnection) {
spS.resourceS = rS

View File

@@ -111,7 +111,7 @@ func (rdr *KafkaER) Serve() (err error) {
return
}
}(r)
go rdr.readLoop(r) // read until the conection is closed
go rdr.readLoop(r) // read until the connection is closed
return
}

View File

@@ -152,6 +152,75 @@ func (smg *SessionService) GetIntenternalChan() (conn chan rpcclient.RpcClientCo
// Reload handles the change of config
func (smg *SessionService) Reload(sp servmanager.ServiceProvider) (err error) {
var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrConns, cdrsConn, chargerSConn rpcclient.RpcClientConnection
if chargerSConn, err = sp.GetConnection(utils.ChargerS, sp.GetConfig().SessionSCfg().ChargerSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.ChargerS, err.Error()))
return
}
if ralsConns, err = sp.GetConnection(utils.ResponderS, sp.GetConfig().SessionSCfg().RALsConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.ResponderS, err.Error()))
return
}
if resSConns, err = sp.GetConnection(utils.ResourceS, sp.GetConfig().SessionSCfg().ResSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.ResourceS, err.Error()))
return
}
if threshSConns, err = sp.GetConnection(utils.ThresholdS, sp.GetConfig().SessionSCfg().ThreshSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.ThresholdS, err.Error()))
return
}
if statSConns, err = sp.GetConnection(utils.StatS, sp.GetConfig().SessionSCfg().StatSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.StatS, err.Error()))
return
}
if suplSConns, err = sp.GetConnection(utils.SupplierS, sp.GetConfig().SessionSCfg().SupplSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.SupplierS, err.Error()))
return
}
if attrConns, err = sp.GetConnection(utils.AttributeS, sp.GetConfig().SessionSCfg().AttrSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.AttributeS, err.Error()))
return
}
if cdrsConn, err = sp.GetConnection(utils.CDRServer, sp.GetConfig().SessionSCfg().CDRsConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.CDRServer, err.Error()))
return
}
sReplConns, err := sessions.NewSReplConns(sp.GetConfig().SessionSCfg().SessionReplicationConns,
sp.GetConfig().GeneralCfg().Reconnects, sp.GetConfig().GeneralCfg().ConnectTimeout,
sp.GetConfig().GeneralCfg().ReplyTimeout)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SMGReplicationConnection error: <%s>",
utils.SessionS, err.Error()))
return
}
smg.Lock()
smg.sm.SetAttributeSConnection(attrConns)
smg.sm.SetChargerSConnection(chargerSConn)
smg.sm.SetRALsConnection(ralsConns)
smg.sm.SetResourceSConnection(resSConns)
smg.sm.SetThresholSConnection(threshSConns)
smg.sm.SetStatSConnection(statSConns)
smg.sm.SetSupplierSConnection(suplSConns)
smg.sm.SetCDRSConnection(cdrsConn)
smg.sm.SetReplicationConnections(sReplConns)
smg.Unlock()
return
}

View File

@@ -3465,3 +3465,57 @@ func (sS *SessionS) BiRPCV1ProcessCDR(clnt rpcclient.RpcClientConnection,
Event: ev}},
rply)
}
// SetAttributeSConnection sets the new connection to the attribute service
// only used on reload
func (sS *SessionS) SetAttributeSConnection(attrS rpcclient.RpcClientConnection) {
sS.attrS = attrS
}
// SetThresholSConnection sets the new connection to the threshold service
// only used on reload
func (sS *SessionS) SetThresholSConnection(thdS rpcclient.RpcClientConnection) {
sS.thdS = thdS
}
// SetStatSConnection sets the new connection to the stat service
// only used on reload
func (sS *SessionS) SetStatSConnection(stS rpcclient.RpcClientConnection) {
sS.statS = stS
}
// SetChargerSConnection sets the new connection to the charger service
// only used on reload
func (sS *SessionS) SetChargerSConnection(chS rpcclient.RpcClientConnection) {
sS.chargerS = chS
}
// SetRALsConnection sets the new connection to the RAL service
// only used on reload
func (sS *SessionS) SetRALsConnection(rls rpcclient.RpcClientConnection) {
sS.ralS = rls
}
// SetResourceSConnection sets the new connection to the resource service
// only used on reload
func (sS *SessionS) SetResourceSConnection(rS rpcclient.RpcClientConnection) {
sS.resS = rS
}
// SetSupplierSConnection sets the new connection to the supplier service
// only used on reload
func (sS *SessionS) SetSupplierSConnection(splS rpcclient.RpcClientConnection) {
sS.splS = splS
}
// SetCDRSConnection sets the new connection to the CDR server
// only used on reload
func (sS *SessionS) SetCDRSConnection(cdrS rpcclient.RpcClientConnection) {
sS.cdrS = cdrS
}
// SetReplicationConnections sets the new connections to the replictes sessions
// only used on reload
func (sS *SessionS) SetReplicationConnections(sReplConns []*SReplConn) {
sS.sReplConns = sReplConns
}