From 7d302691b6c66a18c879c0afe27ee02f219b6b26 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 17 Jul 2015 19:27:53 +0300 Subject: [PATCH] created connector pool --- cmd/cgr-engine/cgr-engine.go | 4 +- engine/calldesc.go | 1 + engine/responder.go | 180 +++++++++++++++++++++++++++++++++++ sessionmanager/session.go | 1 + utils/consts.go | 1 + 5 files changed, 186 insertions(+), 1 deletion(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 5b60033f2..c8592fa0f 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -152,7 +152,9 @@ func startSmFreeSWITCH(responder *engine.Responder, cdrDb engine.CdrStorage, cac } cdrsConn = &engine.RPCClientConnector{Client: client} } - sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, raterConn, cdrsConn) + rcp := engine.ConnectorPool{raterConn} + ccp := engine.ConnectorPool{cdrsConn} + sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, rcp, ccp) sms = append(sms, sm) smRpc.SMs = append(smRpc.SMs, sm) if err = sm.Connect(); err != nil { diff --git a/engine/calldesc.go b/engine/calldesc.go index 417f3c0f6..0994ea53f 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -137,6 +137,7 @@ type CallDescriptor struct { MaxRate float64 MaxRateUnit time.Duration MaxCostSoFar float64 + CgrId string account *Account testCallcost *CallCost // testing purpose only! } diff --git a/engine/responder.go b/engine/responder.go index 6cee095d1..7a1f1dde8 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -499,3 +499,183 @@ func (rcc *RPCClientConnector) LogCallCost(ccl *CallCostLog, reply *string) erro func (rcc *RPCClientConnector) GetLCR(cd *CallDescriptor, reply *LCRCost) error { return rcc.Client.Call("Responder.GetLCR", cd, reply) } + +var timeout = 1 * time.Millisecond + +type ConnectorPool []Connector + +func (cp ConnectorPool) GetCost(cd *CallDescriptor, cc *CallCost) error { + for _, con := range cp { + c := make(chan error, 1) + callCost := &CallCost{} + go func() { c <- con.GetCost(cd, callCost) }() + select { + case err := <-c: + *cc = *callCost + return err + case <-time.After(timeout): + // call timed out, continue + } + } + return utils.ErrTimedOut +} + +func (cp ConnectorPool) Debit(cd *CallDescriptor, cc *CallCost) error { + for _, con := range cp { + c := make(chan error, 1) + callCost := &CallCost{} + go func() { c <- con.Debit(cd, callCost) }() + select { + case err := <-c: + *cc = *callCost + return err + case <-time.After(timeout): + // call timed out, continue + } + } + return utils.ErrTimedOut +} + +func (cp ConnectorPool) MaxDebit(cd *CallDescriptor, cc *CallCost) error { + for _, con := range cp { + c := make(chan error, 1) + callCost := &CallCost{} + go func() { c <- con.MaxDebit(cd, callCost) }() + select { + case err := <-c: + *cc = *callCost + return err + case <-time.After(timeout): + // call timed out, continue + } + } + return utils.ErrTimedOut +} + +func (cp ConnectorPool) RefundIncrements(cd *CallDescriptor, resp *float64) error { + for _, con := range cp { + c := make(chan error, 1) + var r float64 + go func() { c <- con.RefundIncrements(cd, &r) }() + select { + case err := <-c: + *resp = r + return err + case <-time.After(timeout): + // call timed out, continue + } + } + return utils.ErrTimedOut +} + +func (cp ConnectorPool) GetMaxSessionTime(cd *CallDescriptor, resp *float64) error { + for _, con := range cp { + c := make(chan error, 1) + var r float64 + go func() { c <- con.GetMaxSessionTime(cd, &r) }() + select { + case err := <-c: + *resp = r + return err + case <-time.After(timeout): + // call timed out, continue + } + } + return utils.ErrTimedOut +} + +func (cp ConnectorPool) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) error { + for _, con := range cp { + c := make(chan error, 1) + var r float64 + go func() { c <- con.GetDerivedMaxSessionTime(ev, &r) }() + select { + case err := <-c: + *reply = r + return err + case <-time.After(timeout): + // call timed out, continue + } + } + return utils.ErrTimedOut +} + +func (cp ConnectorPool) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error { + for _, con := range cp { + c := make(chan error, 1) + sr := make([]*SessionRun, 0) + go func() { c <- con.GetSessionRuns(ev, &sr) }() + select { + case err := <-c: + *sRuns = sr + return err + case <-time.After(timeout): + // call timed out, continue + } + } + return utils.ErrTimedOut +} + +func (cp ConnectorPool) GetDerivedChargers(attrs *utils.AttrDerivedChargers, dcs *utils.DerivedChargers) error { + for _, con := range cp { + c := make(chan error, 1) + derivedChargers := utils.DerivedChargers{} + go func() { c <- con.GetDerivedChargers(attrs, &derivedChargers) }() + select { + case err := <-c: + *dcs = derivedChargers + return err + case <-time.After(timeout): + // call timed out, continue + } + } + return utils.ErrTimedOut +} + +func (cp ConnectorPool) ProcessCdr(cdr *StoredCdr, reply *string) error { + for _, con := range cp { + c := make(chan error, 1) + var r string + go func() { c <- con.ProcessCdr(cdr, &r) }() + select { + case err := <-c: + *reply = r + return err + case <-time.After(timeout): + // call timed out, continue + } + } + return utils.ErrTimedOut +} + +func (cp ConnectorPool) LogCallCost(ccl *CallCostLog, reply *string) error { + for _, con := range cp { + c := make(chan error, 1) + var r string + go func() { c <- con.LogCallCost(ccl, &r) }() + select { + case err := <-c: + *reply = r + return err + case <-time.After(timeout): + // call timed out, continue + } + } + return utils.ErrTimedOut +} + +func (cp ConnectorPool) GetLCR(cd *CallDescriptor, reply *LCRCost) error { + for _, con := range cp { + c := make(chan error, 1) + lcrCost := &LCRCost{} + go func() { c <- con.GetLCR(cd, lcrCost) }() + select { + case err := <-c: + *reply = *lcrCost + return err + case <-time.After(timeout): + // call timed out, continue + } + } + return utils.ErrTimedOut +} diff --git a/sessionmanager/session.go b/sessionmanager/session.go index d0cf99539..70e4b6867 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -70,6 +70,7 @@ func NewSession(ev engine.Event, connId string, sm SessionManager) *Session { // the debit loop method (to be stoped by sending somenthing on stopDebit channel) func (s *Session) debitLoop(runIdx int) { nextCd := s.sessionRuns[runIdx].CallDescriptor + nextCd.CgrId = s.eventStart.GetCgrId() index := 0.0 debitPeriod := s.sessionManager.DebitInterval() for { diff --git a/utils/consts.go b/utils/consts.go index 7300cfcc1..2dc826ec9 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -16,6 +16,7 @@ func NewErrServerError(err error) error { var ( ErrNotImplemented = errors.New("NOT_IMPLEMENTED") ErrNotFound = errors.New("NOT_FOUND") + ErrTimedOut = errors.New("TIMED_OUT") ErrServerError = errors.New("SERVER_ERROR") ErrMandatoryIeMissing = errors.New("MANDATORY_IE_MISSING") ErrExists = errors.New("EXISTS")