created connector pool

This commit is contained in:
Radu Ioan Fericean
2015-07-17 19:27:53 +03:00
parent 99cf2cbd91
commit 7d302691b6
5 changed files with 186 additions and 1 deletions

View File

@@ -152,7 +152,9 @@ func startSmFreeSWITCH(responder *engine.Responder, cdrDb engine.CdrStorage, cac
}
cdrsConn = &engine.RPCClientConnector{Client: client}
}
sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, raterConn, cdrsConn)
rcp := engine.ConnectorPool{raterConn}
ccp := engine.ConnectorPool{cdrsConn}
sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, rcp, ccp)
sms = append(sms, sm)
smRpc.SMs = append(smRpc.SMs, sm)
if err = sm.Connect(); err != nil {

View File

@@ -137,6 +137,7 @@ type CallDescriptor struct {
MaxRate float64
MaxRateUnit time.Duration
MaxCostSoFar float64
CgrId string
account *Account
testCallcost *CallCost // testing purpose only!
}

View File

@@ -499,3 +499,183 @@ func (rcc *RPCClientConnector) LogCallCost(ccl *CallCostLog, reply *string) erro
func (rcc *RPCClientConnector) GetLCR(cd *CallDescriptor, reply *LCRCost) error {
return rcc.Client.Call("Responder.GetLCR", cd, reply)
}
var timeout = 1 * time.Millisecond
type ConnectorPool []Connector
func (cp ConnectorPool) GetCost(cd *CallDescriptor, cc *CallCost) error {
for _, con := range cp {
c := make(chan error, 1)
callCost := &CallCost{}
go func() { c <- con.GetCost(cd, callCost) }()
select {
case err := <-c:
*cc = *callCost
return err
case <-time.After(timeout):
// call timed out, continue
}
}
return utils.ErrTimedOut
}
func (cp ConnectorPool) Debit(cd *CallDescriptor, cc *CallCost) error {
for _, con := range cp {
c := make(chan error, 1)
callCost := &CallCost{}
go func() { c <- con.Debit(cd, callCost) }()
select {
case err := <-c:
*cc = *callCost
return err
case <-time.After(timeout):
// call timed out, continue
}
}
return utils.ErrTimedOut
}
func (cp ConnectorPool) MaxDebit(cd *CallDescriptor, cc *CallCost) error {
for _, con := range cp {
c := make(chan error, 1)
callCost := &CallCost{}
go func() { c <- con.MaxDebit(cd, callCost) }()
select {
case err := <-c:
*cc = *callCost
return err
case <-time.After(timeout):
// call timed out, continue
}
}
return utils.ErrTimedOut
}
func (cp ConnectorPool) RefundIncrements(cd *CallDescriptor, resp *float64) error {
for _, con := range cp {
c := make(chan error, 1)
var r float64
go func() { c <- con.RefundIncrements(cd, &r) }()
select {
case err := <-c:
*resp = r
return err
case <-time.After(timeout):
// call timed out, continue
}
}
return utils.ErrTimedOut
}
func (cp ConnectorPool) GetMaxSessionTime(cd *CallDescriptor, resp *float64) error {
for _, con := range cp {
c := make(chan error, 1)
var r float64
go func() { c <- con.GetMaxSessionTime(cd, &r) }()
select {
case err := <-c:
*resp = r
return err
case <-time.After(timeout):
// call timed out, continue
}
}
return utils.ErrTimedOut
}
func (cp ConnectorPool) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) error {
for _, con := range cp {
c := make(chan error, 1)
var r float64
go func() { c <- con.GetDerivedMaxSessionTime(ev, &r) }()
select {
case err := <-c:
*reply = r
return err
case <-time.After(timeout):
// call timed out, continue
}
}
return utils.ErrTimedOut
}
func (cp ConnectorPool) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error {
for _, con := range cp {
c := make(chan error, 1)
sr := make([]*SessionRun, 0)
go func() { c <- con.GetSessionRuns(ev, &sr) }()
select {
case err := <-c:
*sRuns = sr
return err
case <-time.After(timeout):
// call timed out, continue
}
}
return utils.ErrTimedOut
}
func (cp ConnectorPool) GetDerivedChargers(attrs *utils.AttrDerivedChargers, dcs *utils.DerivedChargers) error {
for _, con := range cp {
c := make(chan error, 1)
derivedChargers := utils.DerivedChargers{}
go func() { c <- con.GetDerivedChargers(attrs, &derivedChargers) }()
select {
case err := <-c:
*dcs = derivedChargers
return err
case <-time.After(timeout):
// call timed out, continue
}
}
return utils.ErrTimedOut
}
func (cp ConnectorPool) ProcessCdr(cdr *StoredCdr, reply *string) error {
for _, con := range cp {
c := make(chan error, 1)
var r string
go func() { c <- con.ProcessCdr(cdr, &r) }()
select {
case err := <-c:
*reply = r
return err
case <-time.After(timeout):
// call timed out, continue
}
}
return utils.ErrTimedOut
}
func (cp ConnectorPool) LogCallCost(ccl *CallCostLog, reply *string) error {
for _, con := range cp {
c := make(chan error, 1)
var r string
go func() { c <- con.LogCallCost(ccl, &r) }()
select {
case err := <-c:
*reply = r
return err
case <-time.After(timeout):
// call timed out, continue
}
}
return utils.ErrTimedOut
}
func (cp ConnectorPool) GetLCR(cd *CallDescriptor, reply *LCRCost) error {
for _, con := range cp {
c := make(chan error, 1)
lcrCost := &LCRCost{}
go func() { c <- con.GetLCR(cd, lcrCost) }()
select {
case err := <-c:
*reply = *lcrCost
return err
case <-time.After(timeout):
// call timed out, continue
}
}
return utils.ErrTimedOut
}

View File

@@ -70,6 +70,7 @@ func NewSession(ev engine.Event, connId string, sm SessionManager) *Session {
// the debit loop method (to be stoped by sending somenthing on stopDebit channel)
func (s *Session) debitLoop(runIdx int) {
nextCd := s.sessionRuns[runIdx].CallDescriptor
nextCd.CgrId = s.eventStart.GetCgrId()
index := 0.0
debitPeriod := s.sessionManager.DebitInterval()
for {

View File

@@ -16,6 +16,7 @@ func NewErrServerError(err error) error {
var (
ErrNotImplemented = errors.New("NOT_IMPLEMENTED")
ErrNotFound = errors.New("NOT_FOUND")
ErrTimedOut = errors.New("TIMED_OUT")
ErrServerError = errors.New("SERVER_ERROR")
ErrMandatoryIeMissing = errors.New("MANDATORY_IE_MISSING")
ErrExists = errors.New("EXISTS")