From 45ab2f92572f577203fbf047f071fbc865b849d6 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 25 Sep 2019 14:18:47 +0300 Subject: [PATCH] Added connection reload for SessionS --- agents/diam_it_test.go | 12 ++++---- apier/v1/apier.go | 6 ++-- engine/cdrs.go | 10 +++--- engine/chargers.go | 2 +- engine/pstr_amqpv1.go | 2 +- engine/resources.go | 2 +- engine/stats.go | 2 +- engine/suppliers.go | 6 ++-- ers/kafka.go | 2 +- services/sessions.go | 69 ++++++++++++++++++++++++++++++++++++++++++ sessions/sessions.go | 54 +++++++++++++++++++++++++++++++++ 11 files changed, 145 insertions(+), 22 deletions(-) diff --git a/agents/diam_it_test.go b/agents/diam_it_test.go index ed27fa788..c5c8268d9 100644 --- a/agents/diam_it_test.go +++ b/agents/diam_it_test.go @@ -309,7 +309,7 @@ func testDiamItDryRun(t *testing.T) { t.Fatal("Diameter client should not be nil") } if diamClnt.conn == nil { - t.Fatal("Diameter conection should not be nil") + t.Fatal("Diameter connection should not be nil") } if ccr == nil { t.Fatal("The mesage to diameter should not be nil") @@ -558,7 +558,7 @@ func testDiamItCCRInit(t *testing.T) { t.Fatal("Diameter client should not be nil") } if diamClnt.conn == nil { - t.Fatal("Diameter conection should not be nil") + t.Fatal("Diameter connection should not be nil") } if m == nil { t.Fatal("The mesage to diameter should not be nil") @@ -646,7 +646,7 @@ func testDiamItCCRUpdate(t *testing.T) { t.Fatal("Diameter client should not be nil") } if diamClnt.conn == nil { - t.Fatal("Diameter conection should not be nil") + t.Fatal("Diameter connection should not be nil") } if m == nil { t.Fatal("The mesage to diameter should not be nil") @@ -734,7 +734,7 @@ func testDiamItCCRTerminate(t *testing.T) { t.Fatal("Diameter client should not be nil") } if diamClnt.conn == nil { - t.Fatal("Diameter conection should not be nil") + t.Fatal("Diameter connection should not be nil") } if m == nil { t.Fatal("The mesage to diameter should not be nil") @@ -831,7 +831,7 @@ func testDiamItCCRSMS(t *testing.T) { t.Fatal("Diameter client should not be nil") } if diamClnt.conn == nil { - t.Fatal("Diameter conection should not be nil") + t.Fatal("Diameter connection should not be nil") } if ccr == nil { t.Fatal("The mesage to diameter should not be nil") @@ -921,7 +921,7 @@ func testDiamInitWithSessionDisconnect(t *testing.T) { t.Fatal("Diameter client should not be nil") } if diamClnt.conn == nil { - t.Fatal("Diameter conection should not be nil") + t.Fatal("Diameter connection should not be nil") } if m == nil { t.Fatal("The mesage to diameter should not be nil") diff --git a/apier/v1/apier.go b/apier/v1/apier.go index ba8195287..d11f710b5 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -1295,19 +1295,19 @@ func (apierV1 *ApierV1) GetRatingPlanIDs(args utils.TenantArgWithPaginator, attr return nil } -// SetAttributeSConnection sets the new conection to the attribute service +// SetAttributeSConnection sets the new connection to the attribute service // only used on reload func (v1 *ApierV1) SetAttributeSConnection(attrS rpcclient.RpcClientConnection) { v1.AttributeS = attrS } -// SetCacheSConnection sets the new conection to the cache service +// SetCacheSConnection sets the new connection to the cache service // only used on reload func (v1 *ApierV1) SetCacheSConnection(chS rpcclient.RpcClientConnection) { v1.CacheS = chS } -// SetSchedulerSConnection sets the new conection to the scheduler service +// SetSchedulerSConnection sets the new connection to the scheduler service // only used on reload func (v1 *ApierV1) SetSchedulerSConnection(schS rpcclient.RpcClientConnection) { v1.SchedulerS = schS diff --git a/engine/cdrs.go b/engine/cdrs.go index ef7371ff0..96464658b 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -830,31 +830,31 @@ func (cdrS *CDRServer) V1CountCDRs(args *utils.RPCCDRsFilterWithArgDispatcher, c return nil } -// SetAttributeSConnection sets the new conection to the attribute service +// SetAttributeSConnection sets the new connection to the attribute service // only used on reload func (cdrS *CDRServer) SetAttributeSConnection(attrS rpcclient.RpcClientConnection) { cdrS.attrS = attrS } -// SetThresholSConnection sets the new conection to the threshold service +// SetThresholSConnection sets the new connection to the threshold service // only used on reload func (cdrS *CDRServer) SetThresholSConnection(thdS rpcclient.RpcClientConnection) { cdrS.thdS = thdS } -// SetStatSConnection sets the new conection to the stat service +// SetStatSConnection sets the new connection to the stat service // only used on reload func (cdrS *CDRServer) SetStatSConnection(stS rpcclient.RpcClientConnection) { cdrS.statS = stS } -// SetChargerSConnection sets the new conection to the charger service +// SetChargerSConnection sets the new connection to the charger service // only used on reload func (cdrS *CDRServer) SetChargerSConnection(chS rpcclient.RpcClientConnection) { cdrS.chargerS = chS } -// SetRALsConnection sets the new conection to the RAL service +// SetRALsConnection sets the new connection to the RAL service // only used on reload func (cdrS *CDRServer) SetRALsConnection(rls rpcclient.RpcClientConnection) { cdrS.rals = rls diff --git a/engine/chargers.go b/engine/chargers.go index 16fd6360f..e3224a820 100644 --- a/engine/chargers.go +++ b/engine/chargers.go @@ -180,7 +180,7 @@ func (cS *ChargerService) V1GetChargersForEvent(args *utils.CGREventWithArgDispa return } -// SetAttributeConnection sets the new conection to the attribute service +// SetAttributeConnection sets the new connection to the attribute service // only used on reload func (cS *ChargerService) SetAttributeConnection(attrS rpcclient.RpcClientConnection) { cS.attrS = attrS diff --git a/engine/pstr_amqpv1.go b/engine/pstr_amqpv1.go index 8423b4795..b3f6a65c1 100644 --- a/engine/pstr_amqpv1.go +++ b/engine/pstr_amqpv1.go @@ -69,7 +69,7 @@ func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName, _ string) (err break } // reset client and try again - // used in case of closed conection because of idle time + // used in case of closed connection because of idle time if pstr.client != nil { pstr.client.Close() // Make shure the connection is closed before reseting it } diff --git a/engine/resources.go b/engine/resources.go index 3bd6a632f..9b85d179d 100644 --- a/engine/resources.go +++ b/engine/resources.go @@ -766,7 +766,7 @@ func (rS *ResourceService) StartLoop() { go rS.runBackup() } -// SetThresholdConnection sets the new conection to the threshold service +// SetThresholdConnection sets the new connection to the threshold service // only used on reload func (rS *ResourceService) SetThresholdConnection(thdS rpcclient.RpcClientConnection) { rS.thdS = thdS diff --git a/engine/stats.go b/engine/stats.go index f05b37e52..54e2c85d2 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -423,7 +423,7 @@ func (sS *StatService) StartLoop() { go sS.runBackup() } -// SetThresholdConnection sets the new conection to the threshold service +// SetThresholdConnection sets the new connection to the threshold service // only used on reload func (sS *StatService) SetThresholdConnection(thdS rpcclient.RpcClientConnection) { sS.thdS = thdS diff --git a/engine/suppliers.go b/engine/suppliers.go index 8dce4731e..31d1c4133 100644 --- a/engine/suppliers.go +++ b/engine/suppliers.go @@ -634,19 +634,19 @@ func (spS *SupplierService) V1GetSupplierProfilesForEvent(args *utils.CGREventWi return } -// SetAttributeSConnection sets the new conection to the attribute service +// SetAttributeSConnection sets the new connection to the attribute service // only used on reload func (spS *SupplierService) SetAttributeSConnection(attrS rpcclient.RpcClientConnection) { spS.attributeS = attrS } -// SetStatSConnection sets the new conection to the stat service +// SetStatSConnection sets the new connection to the stat service // only used on reload func (spS *SupplierService) SetStatSConnection(stS rpcclient.RpcClientConnection) { spS.statS = stS } -// SetResourceSConnection sets the new conection to the resource service +// SetResourceSConnection sets the new connection to the resource service // only used on reload func (spS *SupplierService) SetResourceSConnection(rS rpcclient.RpcClientConnection) { spS.resourceS = rS diff --git a/ers/kafka.go b/ers/kafka.go index a1e6a3cdf..0f4a2bbf1 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -111,7 +111,7 @@ func (rdr *KafkaER) Serve() (err error) { return } }(r) - go rdr.readLoop(r) // read until the conection is closed + go rdr.readLoop(r) // read until the connection is closed return } diff --git a/services/sessions.go b/services/sessions.go index 23728db6e..edb13e236 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -152,6 +152,75 @@ func (smg *SessionService) GetIntenternalChan() (conn chan rpcclient.RpcClientCo // Reload handles the change of config func (smg *SessionService) Reload(sp servmanager.ServiceProvider) (err error) { + var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrConns, cdrsConn, chargerSConn rpcclient.RpcClientConnection + + if chargerSConn, err = sp.GetConnection(utils.ChargerS, sp.GetConfig().SessionSCfg().ChargerSConns); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.SessionS, utils.ChargerS, err.Error())) + return + } + + if ralsConns, err = sp.GetConnection(utils.ResponderS, sp.GetConfig().SessionSCfg().RALsConns); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.SessionS, utils.ResponderS, err.Error())) + return + } + + if resSConns, err = sp.GetConnection(utils.ResourceS, sp.GetConfig().SessionSCfg().ResSConns); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.SessionS, utils.ResourceS, err.Error())) + return + } + + if threshSConns, err = sp.GetConnection(utils.ThresholdS, sp.GetConfig().SessionSCfg().ThreshSConns); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.SessionS, utils.ThresholdS, err.Error())) + return + } + + if statSConns, err = sp.GetConnection(utils.StatS, sp.GetConfig().SessionSCfg().StatSConns); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.SessionS, utils.StatS, err.Error())) + return + } + + if suplSConns, err = sp.GetConnection(utils.SupplierS, sp.GetConfig().SessionSCfg().SupplSConns); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.SessionS, utils.SupplierS, err.Error())) + return + } + + if attrConns, err = sp.GetConnection(utils.AttributeS, sp.GetConfig().SessionSCfg().AttrSConns); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.SessionS, utils.AttributeS, err.Error())) + return + } + + if cdrsConn, err = sp.GetConnection(utils.CDRServer, sp.GetConfig().SessionSCfg().CDRsConns); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.SessionS, utils.CDRServer, err.Error())) + return + } + + sReplConns, err := sessions.NewSReplConns(sp.GetConfig().SessionSCfg().SessionReplicationConns, + sp.GetConfig().GeneralCfg().Reconnects, sp.GetConfig().GeneralCfg().ConnectTimeout, + sp.GetConfig().GeneralCfg().ReplyTimeout) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SMGReplicationConnection error: <%s>", + utils.SessionS, err.Error())) + return + } + smg.Lock() + smg.sm.SetAttributeSConnection(attrConns) + smg.sm.SetChargerSConnection(chargerSConn) + smg.sm.SetRALsConnection(ralsConns) + smg.sm.SetResourceSConnection(resSConns) + smg.sm.SetThresholSConnection(threshSConns) + smg.sm.SetStatSConnection(statSConns) + smg.sm.SetSupplierSConnection(suplSConns) + smg.sm.SetCDRSConnection(cdrsConn) + smg.sm.SetReplicationConnections(sReplConns) + smg.Unlock() return } diff --git a/sessions/sessions.go b/sessions/sessions.go index 85dd00600..a0f4fb213 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -3465,3 +3465,57 @@ func (sS *SessionS) BiRPCV1ProcessCDR(clnt rpcclient.RpcClientConnection, Event: ev}}, rply) } + +// SetAttributeSConnection sets the new connection to the attribute service +// only used on reload +func (sS *SessionS) SetAttributeSConnection(attrS rpcclient.RpcClientConnection) { + sS.attrS = attrS +} + +// SetThresholSConnection sets the new connection to the threshold service +// only used on reload +func (sS *SessionS) SetThresholSConnection(thdS rpcclient.RpcClientConnection) { + sS.thdS = thdS +} + +// SetStatSConnection sets the new connection to the stat service +// only used on reload +func (sS *SessionS) SetStatSConnection(stS rpcclient.RpcClientConnection) { + sS.statS = stS +} + +// SetChargerSConnection sets the new connection to the charger service +// only used on reload +func (sS *SessionS) SetChargerSConnection(chS rpcclient.RpcClientConnection) { + sS.chargerS = chS +} + +// SetRALsConnection sets the new connection to the RAL service +// only used on reload +func (sS *SessionS) SetRALsConnection(rls rpcclient.RpcClientConnection) { + sS.ralS = rls +} + +// SetResourceSConnection sets the new connection to the resource service +// only used on reload +func (sS *SessionS) SetResourceSConnection(rS rpcclient.RpcClientConnection) { + sS.resS = rS +} + +// SetSupplierSConnection sets the new connection to the supplier service +// only used on reload +func (sS *SessionS) SetSupplierSConnection(splS rpcclient.RpcClientConnection) { + sS.splS = splS +} + +// SetCDRSConnection sets the new connection to the CDR server +// only used on reload +func (sS *SessionS) SetCDRSConnection(cdrS rpcclient.RpcClientConnection) { + sS.cdrS = cdrS +} + +// SetReplicationConnections sets the new connections to the replictes sessions +// only used on reload +func (sS *SessionS) SetReplicationConnections(sReplConns []*SReplConn) { + sS.sReplConns = sReplConns +}