using rpc client instead of connector

This commit is contained in:
Radu Ioan Fericean
2015-11-25 19:27:01 +02:00
parent feedacc7b5
commit f68dc3fcfd
12 changed files with 104 additions and 422 deletions

View File

@@ -32,6 +32,7 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
"gopkg.in/fsnotify.v1"
)
@@ -54,7 +55,7 @@ Common parameters within configs processed:
Parameters specific per config instance:
* duMultiplyFactor, cdrSourceId, cdrFilter, cdrFields
*/
func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrs engine.Connector, closeChan chan struct{}, dfltTimezone string) (*Cdrc, error) {
func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection, closeChan chan struct{}, dfltTimezone string) (*Cdrc, error) {
var cdrcCfg *config.CdrcConfig
for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
break
@@ -85,7 +86,7 @@ type Cdrc struct {
cdrcCfgs map[string]*config.CdrcConfig // All cdrc config profiles attached to this CDRC (key will be profile instance name)
dfltCdrcCfg *config.CdrcConfig
timezone string
cdrs engine.Connector
cdrs rpcclient.RpcClientConnection
httpClient *http.Client
closeChan chan struct{} // Used to signal config reloads when we need to span different CDRC-Client
maxOpenFiles chan struct{} // Maximum number of simultaneous files processed
@@ -201,7 +202,7 @@ func (self *Cdrc) processFile(filePath string) error {
utils.Logger.Info(fmt.Sprintf("<Cdrc> DryRun CDR: %+v", storedCdr))
continue
}
if err := self.cdrs.ProcessCdr(storedCdr, &reply); err != nil {
if err := self.cdrs.Call("Responder.ProcessCdr", storedCdr, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<Cdrc> Failed sending CDR, %+v, error: %s", storedCdr, err.Error()))
} else if reply != "OK" {
utils.Logger.Err(fmt.Sprintf("<Cdrc> Received unexpected reply for CDR, %+v, reply: %s", storedCdr, reply))

View File

@@ -104,7 +104,7 @@ func startCdrcs(internalCdrSChan chan *engine.CdrServer, internalRaterChan chan
// Fires up a cdrc instance
func startCdrc(internalCdrSChan chan *engine.CdrServer, internalRaterChan chan *engine.Responder, cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool,
closeChan chan struct{}, exitChan chan bool) {
var cdrsConn engine.Connector
var cdrsConn rpcclient.RpcClientConnection
var cdrcCfg *config.CdrcConfig
for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
break
@@ -122,7 +122,7 @@ func startCdrc(internalCdrSChan chan *engine.CdrServer, internalRaterChan chan *
exitChan <- true
return
}
cdrsConn = &engine.RPCClientConnector{Client: conn}
cdrsConn = conn
}
cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrsConn, closeChan, cfg.DefaultTimezone)
if err != nil {
@@ -138,7 +138,7 @@ func startCdrc(internalCdrSChan chan *engine.CdrServer, internalRaterChan chan *
func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internalRaterChan chan *engine.Responder, server *utils.Server, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS SM-Generic service.")
var raterConn, cdrsConn engine.Connector
var raterConn, cdrsConn rpcclient.RpcClientConnection
var client *rpcclient.RpcClient
var err error
// Connect to rater
@@ -154,7 +154,7 @@ func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internal
exitChan <- true
return
}
raterConn = &engine.RPCClientConnector{Client: client}
raterConn = client
}
}
// Connect to CDRS
@@ -173,7 +173,7 @@ func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internal
exitChan <- true
return
}
cdrsConn = &engine.RPCClientConnector{Client: client}
cdrsConn = client
}
}
}
@@ -226,7 +226,7 @@ func startDiameterAgent(internalSMGChan chan rpcclient.RpcClientConnection, exit
func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, cdrDb engine.CdrStorage, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS SM-FreeSWITCH service.")
var raterConn, cdrsConn engine.Connector
var raterConn, cdrsConn rpcclient.RpcClientConnection
var client *rpcclient.RpcClient
var err error
// Connect to rater
@@ -242,7 +242,7 @@ func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, cdrDb engine.Cd
exitChan <- true
return
}
raterConn = &engine.RPCClientConnector{Client: client}
raterConn = client
}
}
// Connect to CDRS
@@ -261,7 +261,7 @@ func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, cdrDb engine.Cd
exitChan <- true
return
}
cdrsConn = &engine.RPCClientConnector{Client: client}
cdrsConn = client
}
}
}
@@ -275,7 +275,7 @@ func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, cdrDb engine.Cd
func startSmKamailio(internalRaterChan chan *engine.Responder, cdrDb engine.CdrStorage, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS SM-Kamailio service.")
var raterConn, cdrsConn engine.Connector
var raterConn, cdrsConn rpcclient.RpcClientConnection
var client *rpcclient.RpcClient
var err error
// Connect to rater
@@ -291,7 +291,7 @@ func startSmKamailio(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS
exitChan <- true
return
}
raterConn = &engine.RPCClientConnector{Client: client}
raterConn = client
}
}
// Connect to CDRS
@@ -310,7 +310,7 @@ func startSmKamailio(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS
exitChan <- true
return
}
cdrsConn = &engine.RPCClientConnector{Client: client}
cdrsConn = client
}
}
}
@@ -324,7 +324,7 @@ func startSmKamailio(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS
func startSmOpenSIPS(internalRaterChan chan *engine.Responder, cdrDb engine.CdrStorage, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS SM-OpenSIPS service.")
var raterConn, cdrsConn engine.Connector
var raterConn, cdrsConn rpcclient.RpcClientConnection
var client *rpcclient.RpcClient
var err error
// Connect to rater
@@ -340,7 +340,7 @@ func startSmOpenSIPS(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS
exitChan <- true
return
}
raterConn = &engine.RPCClientConnector{Client: client}
raterConn = client
}
}
// Connect to CDRS
@@ -359,7 +359,7 @@ func startSmOpenSIPS(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS
exitChan <- true
return
}
cdrsConn = &engine.RPCClientConnector{Client: client}
cdrsConn = client
}
}
}
@@ -379,7 +379,7 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage,
var err error
var client *rpcclient.RpcClient
// Rater connection init
var raterConn engine.Connector
var raterConn rpcclient.RpcClientConnection
if cfg.CDRSRater == utils.INTERNAL {
responder := <-internalRaterChan // Wait for rater to come up before start querying
raterConn = responder
@@ -391,7 +391,7 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage,
exitChan <- true
return
}
raterConn = &engine.RPCClientConnector{Client: client}
raterConn = client
}
// Pubsub connection init
var pubSubConn engine.PublisherSubscriber

View File

@@ -27,6 +27,7 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
"github.com/jinzhu/gorm"
)
@@ -65,14 +66,14 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
}
}
func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater Connector, pubsub PublisherSubscriber, users UserService, aliases AliasService, stats StatsInterface) (*CdrServer, error) {
return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rater: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: &GuardianLock{queue: make(map[string]chan bool)}}, nil
func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, client rpcclient.RpcClientConnection, pubsub PublisherSubscriber, users UserService, aliases AliasService, stats StatsInterface) (*CdrServer, error) {
return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, client: client, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: &GuardianLock{queue: make(map[string]chan bool)}}, nil
}
type CdrServer struct {
cgrCfg *config.CGRConfig
cdrDb CdrStorage
rater Connector
client rpcclient.RpcClientConnection
pubsub PublisherSubscriber
users UserService
aliases AliasService
@@ -228,7 +229,7 @@ func (self *CdrServer) rateStoreStatsReplicate(cdr *StoredCdr) error {
}
}
// Rate CDR
if self.rater != nil && !cdr.Rated {
if self.client != nil && !cdr.Rated {
if err := self.rateCDR(cdr); err != nil {
cdr.Cost = -1.0 // If there was an error, mark the CDR
cdr.ExtraInfo = err.Error()
@@ -275,7 +276,7 @@ func (self *CdrServer) deriveCdrs(storedCdr *StoredCdr) ([]*StoredCdr, error) {
attrsDC := &utils.AttrDerivedChargers{Tenant: storedCdr.Tenant, Category: storedCdr.Category, Direction: storedCdr.Direction,
Account: storedCdr.Account, Subject: storedCdr.Subject}
var dcs utils.DerivedChargers
if err := self.rater.GetDerivedChargers(attrsDC, &dcs); err != nil {
if err := self.client.Call("Responder.GetDerivedChargers", attrsDC, &dcs); err != nil {
utils.Logger.Err(fmt.Sprintf("Could not get derived charging for cgrid %s, error: %s", storedCdr.CgrId, err.Error()))
return nil, err
}
@@ -337,11 +338,11 @@ func (self *CdrServer) getCostFromRater(storedCdr *StoredCdr) (*CallCost, error)
DurationIndex: storedCdr.Usage,
}
if utils.IsSliceMember([]string{utils.META_PSEUDOPREPAID, utils.META_POSTPAID, utils.META_PREPAID, utils.PSEUDOPREPAID, utils.POSTPAID, utils.PREPAID}, storedCdr.ReqType) { // Prepaid - Cost can be recalculated in case of missing records from SM
if err = self.rater.Debit(cd, cc); err == nil { // Debit has occured, we are forced to write the log, even if CDR store is disabled
if err = self.client.Call("Responder.Debit", cd, cc); err == nil { // Debit has occured, we are forced to write the log, even if CDR store is disabled
self.cdrDb.LogCallCost(storedCdr.CgrId, utils.CDRS_SOURCE, storedCdr.MediationRunId, cc)
}
} else {
err = self.rater.GetCost(cd, cc)
err = self.client.Call("Responder.GetCost", cd, cc)
}
if err != nil {
return cc, err

View File

@@ -31,7 +31,6 @@ import (
"github.com/cgrates/cgrates/cache2go"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// Individual session run
@@ -51,17 +50,15 @@ type Responder struct {
ExitChan chan bool
CdrSrv *CdrServer
Stats StatsInterface
Timeout time.Duration
Timezone string
cnt int64
responseCache *cache2go.ResponseCache
}
func NewResponder(exitChan chan bool, cdrSrv *CdrServer, stats StatsInterface, timeout, timeToLive time.Duration) *Responder {
func NewResponder(exitChan chan bool, cdrSrv *CdrServer, stats StatsInterface, timeToLive time.Duration) *Responder {
return &Responder{
ExitChan: exitChan,
Stats: stats,
Timeout: timeToLive,
responseCache: cache2go.NewResponseCache(timeToLive),
}
}
@@ -617,339 +614,27 @@ func (rs *Responder) UnRegisterRater(clientAddress string, replay *int) error {
return nil
}
func (rs *Responder) GetTimeout(i int, d *time.Duration) error {
*d = rs.Timeout
return nil
}
// Reflection worker type for not standalone balancer
type ResponderWorker struct{}
func (rw *ResponderWorker) Call(serviceMethod string, args interface{}, reply interface{}) error {
func (rs *Responder) Call(serviceMethod string, args interface{}, reply interface{}) error {
if !strings.HasPrefix(serviceMethod, "Responder.") {
return utils.ErrNotImplemented
}
methodName := strings.TrimLeft(serviceMethod, "Responder.")
switch args.(type) {
case CallDescriptor:
cd := args.(CallDescriptor)
switch reply.(type) {
case *CallCost:
rep := reply.(*CallCost)
method := reflect.ValueOf(&cd).MethodByName(methodName)
ret := method.Call([]reflect.Value{})
*rep = *(ret[0].Interface().(*CallCost))
case *float64:
rep := reply.(*float64)
method := reflect.ValueOf(&cd).MethodByName(methodName)
ret := method.Call([]reflect.Value{})
*rep = *(ret[0].Interface().(*float64))
}
case string:
switch methodName {
case "Status":
*(reply.(*string)) = "Local!"
case "Shutdown":
*(reply.(*string)) = "Done!"
}
// get method
method := reflect.ValueOf(rs).MethodByName(methodName)
if !method.IsValid() {
return utils.ErrNotImplemented
}
return nil
}
func (rw *ResponderWorker) Close() error {
return nil
}
// construct the params
params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
type Connector interface {
GetCost(*CallDescriptor, *CallCost) error
Debit(*CallDescriptor, *CallCost) error
MaxDebit(*CallDescriptor, *CallCost) error
RefundIncrements(*CallDescriptor, *float64) error
GetMaxSessionTime(*CallDescriptor, *float64) error
GetDerivedChargers(*utils.AttrDerivedChargers, *utils.DerivedChargers) error
GetDerivedMaxSessionTime(*StoredCdr, *float64) error
GetSessionRuns(*StoredCdr, *[]*SessionRun) error
ProcessCdr(*StoredCdr, *string) error
LogCallCost(*CallCostLog, *string) error
GetLCR(*AttrGetLcr, *LCRCost) error
GetTimeout(int, *time.Duration) error
}
type RPCClientConnector struct {
Client *rpcclient.RpcClient
Timeout time.Duration
}
func (rcc *RPCClientConnector) GetCost(cd *CallDescriptor, cc *CallCost) error {
return rcc.Client.Call("Responder.GetCost", cd, cc)
}
func (rcc *RPCClientConnector) Debit(cd *CallDescriptor, cc *CallCost) error {
return rcc.Client.Call("Responder.Debit", cd, cc)
}
func (rcc *RPCClientConnector) MaxDebit(cd *CallDescriptor, cc *CallCost) error {
return rcc.Client.Call("Responder.MaxDebit", cd, cc)
}
func (rcc *RPCClientConnector) RefundIncrements(cd *CallDescriptor, resp *float64) error {
return rcc.Client.Call("Responder.RefundIncrements", cd, resp)
}
func (rcc *RPCClientConnector) GetMaxSessionTime(cd *CallDescriptor, resp *float64) error {
return rcc.Client.Call("Responder.GetMaxSessionTime", cd, resp)
}
func (rcc *RPCClientConnector) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) error {
return rcc.Client.Call("Responder.GetDerivedMaxSessionTime", ev, reply)
}
func (rcc *RPCClientConnector) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error {
return rcc.Client.Call("Responder.GetSessionRuns", ev, sRuns)
}
func (rcc *RPCClientConnector) GetDerivedChargers(attrs *utils.AttrDerivedChargers, dcs *utils.DerivedChargers) error {
return rcc.Client.Call("ApierV1.GetDerivedChargers", attrs, dcs)
}
func (rcc *RPCClientConnector) ProcessCdr(cdr *StoredCdr, reply *string) error {
return rcc.Client.Call("CdrsV1.ProcessCdr", cdr, reply)
}
func (rcc *RPCClientConnector) LogCallCost(ccl *CallCostLog, reply *string) error {
return rcc.Client.Call("CdrsV1.LogCallCost", ccl, reply)
}
func (rcc *RPCClientConnector) GetLCR(attrs *AttrGetLcr, reply *LCRCost) error {
return rcc.Client.Call("Responder.GetLCR", attrs, reply)
}
func (rcc *RPCClientConnector) GetTimeout(i int, d *time.Duration) error {
*d = rcc.Timeout
return nil
}
type ConnectorPool []Connector
func (cp ConnectorPool) GetCost(cd *CallDescriptor, cc *CallCost) error {
for _, con := range cp {
c := make(chan error, 1)
callCost := &CallCost{}
var timeout time.Duration
con.GetTimeout(0, &timeout)
go func() { c <- con.GetCost(cd, callCost) }()
select {
case err := <-c:
*cc = *callCost
return err
case <-time.After(timeout):
// call timed out, continue
}
ret := method.Call(params)
if len(ret) != 1 {
return utils.ErrServerError
}
return utils.ErrTimedOut
}
func (cp ConnectorPool) Debit(cd *CallDescriptor, cc *CallCost) error {
for _, con := range cp {
c := make(chan error, 1)
callCost := &CallCost{}
var timeout time.Duration
con.GetTimeout(0, &timeout)
go func() { c <- con.Debit(cd, callCost) }()
select {
case err := <-c:
*cc = *callCost
return err
case <-time.After(timeout):
// call timed out, continue
}
err, ok := ret[0].Interface().(error)
if !ok {
return utils.ErrServerError
}
return utils.ErrTimedOut
}
func (cp ConnectorPool) MaxDebit(cd *CallDescriptor, cc *CallCost) error {
for _, con := range cp {
c := make(chan error, 1)
callCost := &CallCost{}
var timeout time.Duration
con.GetTimeout(0, &timeout)
go func() { c <- con.MaxDebit(cd, callCost) }()
select {
case err := <-c:
*cc = *callCost
return err
case <-time.After(timeout):
// call timed out, continue
}
}
return utils.ErrTimedOut
}
func (cp ConnectorPool) RefundIncrements(cd *CallDescriptor, resp *float64) error {
for _, con := range cp {
c := make(chan error, 1)
var r float64
var timeout time.Duration
con.GetTimeout(0, &timeout)
go func() { c <- con.RefundIncrements(cd, &r) }()
select {
case err := <-c:
*resp = r
return err
case <-time.After(timeout):
// call timed out, continue
}
}
return utils.ErrTimedOut
}
func (cp ConnectorPool) GetMaxSessionTime(cd *CallDescriptor, resp *float64) error {
for _, con := range cp {
c := make(chan error, 1)
var r float64
var timeout time.Duration
con.GetTimeout(0, &timeout)
go func() { c <- con.GetMaxSessionTime(cd, &r) }()
select {
case err := <-c:
*resp = r
return err
case <-time.After(timeout):
// call timed out, continue
}
}
return utils.ErrTimedOut
}
func (cp ConnectorPool) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) error {
for _, con := range cp {
c := make(chan error, 1)
var r float64
var timeout time.Duration
con.GetTimeout(0, &timeout)
go func() { c <- con.GetDerivedMaxSessionTime(ev, &r) }()
select {
case err := <-c:
*reply = r
return err
case <-time.After(timeout):
// call timed out, continue
}
}
return utils.ErrTimedOut
}
func (cp ConnectorPool) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error {
for _, con := range cp {
c := make(chan error, 1)
sr := make([]*SessionRun, 0)
var timeout time.Duration
con.GetTimeout(0, &timeout)
go func() { c <- con.GetSessionRuns(ev, &sr) }()
select {
case err := <-c:
*sRuns = sr
return err
case <-time.After(timeout):
// call timed out, continue
}
}
return utils.ErrTimedOut
}
func (cp ConnectorPool) GetDerivedChargers(attrs *utils.AttrDerivedChargers, dcs *utils.DerivedChargers) error {
for _, con := range cp {
c := make(chan error, 1)
derivedChargers := utils.DerivedChargers{}
var timeout time.Duration
con.GetTimeout(0, &timeout)
go func() { c <- con.GetDerivedChargers(attrs, &derivedChargers) }()
select {
case err := <-c:
*dcs = derivedChargers
return err
case <-time.After(timeout):
// call timed out, continue
}
}
return utils.ErrTimedOut
}
func (cp ConnectorPool) ProcessCdr(cdr *StoredCdr, reply *string) error {
for _, con := range cp {
c := make(chan error, 1)
var r string
var timeout time.Duration
con.GetTimeout(0, &timeout)
go func() { c <- con.ProcessCdr(cdr, &r) }()
select {
case err := <-c:
*reply = r
return err
case <-time.After(timeout):
// call timed out, continue
}
}
return utils.ErrTimedOut
}
func (cp ConnectorPool) LogCallCost(ccl *CallCostLog, reply *string) error {
for _, con := range cp {
c := make(chan error, 1)
var r string
var timeout time.Duration
con.GetTimeout(0, &timeout)
go func() { c <- con.LogCallCost(ccl, &r) }()
select {
case err := <-c:
*reply = r
return err
case <-time.After(timeout):
// call timed out, continue
}
}
return utils.ErrTimedOut
}
func (cp ConnectorPool) GetLCR(attr *AttrGetLcr, reply *LCRCost) error {
for _, con := range cp {
c := make(chan error, 1)
lcrCost := &LCRCost{}
var timeout time.Duration
con.GetTimeout(0, &timeout)
go func() { c <- con.GetLCR(attr, lcrCost) }()
select {
case err := <-c:
*reply = *lcrCost
return err
case <-time.After(timeout):
// call timed out, continue
}
}
return utils.ErrTimedOut
}
func (cp ConnectorPool) GetTimeout(i int, d *time.Duration) error {
*d = 0
return nil
return err
}

View File

@@ -30,9 +30,10 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/fsock"
"github.com/cgrates/rpcclient"
)
func NewFSSessionManager(smFsConfig *config.SmFsConfig, rater, cdrs engine.Connector, timezone string) *FSSessionManager {
func NewFSSessionManager(smFsConfig *config.SmFsConfig, rater, cdrs rpcclient.RpcClientConnection, timezone string) *FSSessionManager {
return &FSSessionManager{
cfg: smFsConfig,
conns: make(map[string]*fsock.FSock),
@@ -50,10 +51,11 @@ type FSSessionManager struct {
cfg *config.SmFsConfig
conns map[string]*fsock.FSock // Keep the list here for connection management purposes
senderPools map[string]*fsock.FSockPool // Keep sender pools here
rater engine.Connector
cdrsrv engine.Connector
sessions *Sessions
timezone string
rater rpcclient.RpcClientConnection
cdrsrv rpcclient.RpcClientConnection
sessions *Sessions
timezone string
}
func (sm *FSSessionManager) createHandlers() map[string][]func(string, string) {
@@ -107,7 +109,7 @@ func (sm *FSSessionManager) setCgrLcr(ev engine.Event, connId string) error {
TimeStart: startTime,
TimeEnd: startTime.Add(config.CgrConfig().MaxCallDuration),
}
if err := sm.rater.GetLCR(&engine.AttrGetLcr{CallDescriptor: cd}, &lcrCost); err != nil {
if err := sm.rater.Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcrCost); err != nil {
return err
}
supps := []string{}
@@ -131,7 +133,7 @@ func (sm *FSSessionManager) onChannelPark(ev engine.Event, connId string) {
return
}
var maxCallDuration float64 // This will be the maximum duration this channel will be allowed to last
if err := sm.rater.GetDerivedMaxSessionTime(ev.AsStoredCdr(config.CgrConfig().DefaultTimezone), &maxCallDuration); err != nil {
if err := sm.rater.Call("Responder.GetDerivedMaxSessionTime", ev.AsStoredCdr(config.CgrConfig().DefaultTimezone), &maxCallDuration); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not get max session time for %s, error: %s", ev.GetUUID(), err.Error()))
}
if maxCallDuration != -1 { // For calls different than unlimited, set limits
@@ -152,7 +154,7 @@ func (sm *FSSessionManager) onChannelPark(ev engine.Event, connId string) {
return
}
var lcr engine.LCRCost
if err = sm.Rater().GetLCR(&engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil {
if err = sm.Rater().Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil {
utils.Logger.Info(fmt.Sprintf("<SM-FreeSWITCH> LCR_API_ERROR: %s", err.Error()))
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR)
}
@@ -294,7 +296,7 @@ func (sm *FSSessionManager) DisconnectSession(ev engine.Event, connId, notify st
func (sm *FSSessionManager) ProcessCdr(storedCdr *engine.StoredCdr) error {
var reply string
if err := sm.cdrsrv.ProcessCdr(storedCdr, &reply); err != nil {
if err := sm.cdrsrv.Call("Responder.ProcessCdr", storedCdr, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", storedCdr.CgrId, storedCdr.AccId, err.Error()))
}
return nil
@@ -304,11 +306,11 @@ func (sm *FSSessionManager) DebitInterval() time.Duration {
return sm.cfg.DebitInterval
}
func (sm *FSSessionManager) CdrSrv() engine.Connector {
func (sm *FSSessionManager) CdrSrv() rpcclient.RpcClientConnection {
return sm.cdrsrv
}
func (sm *FSSessionManager) Rater() engine.Connector {
func (sm *FSSessionManager) Rater() rpcclient.RpcClientConnection {
return sm.rater
}

View File

@@ -29,17 +29,18 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/kamevapi"
"github.com/cgrates/rpcclient"
)
func NewKamailioSessionManager(smKamCfg *config.SmKamConfig, rater, cdrsrv engine.Connector, timezone string) (*KamailioSessionManager, error) {
func NewKamailioSessionManager(smKamCfg *config.SmKamConfig, rater, cdrsrv rpcclient.RpcClientConnection, timezone string) (*KamailioSessionManager, error) {
ksm := &KamailioSessionManager{cfg: smKamCfg, rater: rater, cdrsrv: cdrsrv, timezone: timezone, conns: make(map[string]*kamevapi.KamEvapi), sessions: NewSessions()}
return ksm, nil
}
type KamailioSessionManager struct {
cfg *config.SmKamConfig
rater engine.Connector
cdrsrv engine.Connector
rater rpcclient.RpcClientConnection
cdrsrv rpcclient.RpcClientConnection
timezone string
conns map[string]*kamevapi.KamEvapi
sessions *Sessions
@@ -64,7 +65,7 @@ func (self *KamailioSessionManager) onCgrAuth(evData []byte, connId string) {
}
var remainingDuration float64
var errMaxSession error
if errMaxSession = self.rater.GetDerivedMaxSessionTime(kev.AsStoredCdr(self.Timezone()), &remainingDuration); errMaxSession != nil {
if errMaxSession = self.rater.Call("Responder.GetDerivedMaxSessionTime", kev.AsStoredCdr(self.Timezone()), &remainingDuration); errMaxSession != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Could not get max session time, error: %s", errMaxSession.Error()))
}
var supplStr string
@@ -107,7 +108,7 @@ func (self *KamailioSessionManager) getSuppliers(kev KamEvent) (string, error) {
return "", errors.New("LCR_PREPROCESS_ERROR")
}
var lcr engine.LCRCost
if err = self.Rater().GetLCR(&engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil {
if err = self.Rater().Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil {
utils.Logger.Info(fmt.Sprintf("<SM-Kamailio> LCR_API_ERROR error: %s", err.Error()))
return "", errors.New("LCR_API_ERROR")
}
@@ -196,10 +197,10 @@ func (self *KamailioSessionManager) DisconnectSession(ev engine.Event, connId, n
func (self *KamailioSessionManager) DebitInterval() time.Duration {
return self.cfg.DebitInterval
}
func (self *KamailioSessionManager) CdrSrv() engine.Connector {
func (self *KamailioSessionManager) CdrSrv() rpcclient.RpcClientConnection {
return self.cdrsrv
}
func (self *KamailioSessionManager) Rater() engine.Connector {
func (self *KamailioSessionManager) Rater() rpcclient.RpcClientConnection {
return self.rater
}
@@ -208,7 +209,7 @@ func (self *KamailioSessionManager) ProcessCdr(cdr *engine.StoredCdr) error {
return nil
}
var reply string
if err := self.cdrsrv.ProcessCdr(cdr, &reply); err != nil {
if err := self.cdrsrv.Call("Responder.ProcessCdr", cdr, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", cdr.CgrId, cdr.AccId, err.Error()))
}
return nil

View File

@@ -29,6 +29,7 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/osipsdagram"
"github.com/cgrates/rpcclient"
)
/*
@@ -80,7 +81,7 @@ duration::
*/
func NewOSipsSessionManager(smOsipsCfg *config.SmOsipsConfig, reconnects int, rater, cdrsrv engine.Connector, timezone string) (*OsipsSessionManager, error) {
func NewOSipsSessionManager(smOsipsCfg *config.SmOsipsConfig, reconnects int, rater, cdrsrv rpcclient.RpcClientConnection, timezone string) (*OsipsSessionManager, error) {
osm := &OsipsSessionManager{cfg: smOsipsCfg, reconnects: reconnects, rater: rater, cdrsrv: cdrsrv, timezone: timezone, cdrStartEvents: make(map[string]*OsipsEvent), sessions: NewSessions()}
osm.eventHandlers = map[string][]func(*osipsdagram.OsipsEvent){
"E_OPENSIPS_START": []func(*osipsdagram.OsipsEvent){osm.onOpensipsStart}, // Raised when OpenSIPS starts so we can register our event handlers
@@ -94,8 +95,8 @@ func NewOSipsSessionManager(smOsipsCfg *config.SmOsipsConfig, reconnects int, ra
type OsipsSessionManager struct {
cfg *config.SmOsipsConfig
reconnects int
rater engine.Connector
cdrsrv engine.Connector
rater rpcclient.RpcClientConnection
cdrsrv rpcclient.RpcClientConnection
timezone string
eventHandlers map[string][]func(*osipsdagram.OsipsEvent)
evSubscribeStop chan struct{} // Reference towards the channel controlling subscriptions, keep it as reference so we do not need to copy it
@@ -130,12 +131,12 @@ func (osm *OsipsSessionManager) DebitInterval() time.Duration {
}
// Returns the connection to local cdr database, used by session to log it's final costs
func (osm *OsipsSessionManager) CdrSrv() engine.Connector {
func (osm *OsipsSessionManager) CdrSrv() rpcclient.RpcClientConnection {
return osm.cdrsrv
}
// Returns connection to rater/controller
func (osm *OsipsSessionManager) Rater() engine.Connector {
func (osm *OsipsSessionManager) Rater() rpcclient.RpcClientConnection {
return osm.rater
}
@@ -152,7 +153,7 @@ func (osm *OsipsSessionManager) Shutdown() error {
// Process the CDR with CDRS component
func (osm *OsipsSessionManager) ProcessCdr(storedCdr *engine.StoredCdr) error {
var reply string
return osm.cdrsrv.ProcessCdr(storedCdr, &reply)
return osm.cdrsrv.Call("Responder.ProcessCdr", storedCdr, &reply)
}
// Disconnects the session

View File

@@ -58,7 +58,7 @@ func NewSession(ev engine.Event, connId string, sm SessionManager) *Session {
sessionManager: sm,
connId: connId,
}
if err := sm.Rater().GetSessionRuns(ev.AsStoredCdr(s.sessionManager.Timezone()), &s.sessionRuns); err != nil || len(s.sessionRuns) == 0 {
if err := sm.Rater().Call("Responder.GetSessionRuns", ev.AsStoredCdr(s.sessionManager.Timezone()), &s.sessionRuns); err != nil || len(s.sessionRuns) == 0 {
return nil
}
for runIdx := range s.sessionRuns {
@@ -86,7 +86,7 @@ func (s *Session) debitLoop(runIdx int) {
nextCd.LoopIndex = index
nextCd.DurationIndex += debitPeriod // first presumed duration
cc := new(engine.CallCost)
if err := s.sessionManager.Rater().MaxDebit(nextCd, cc); err != nil {
if err := s.sessionManager.Rater().Call("Responder.MaxDebit", nextCd, cc); err != nil {
utils.Logger.Err(fmt.Sprintf("Could not complete debit opperation: %v", err))
if err.Error() == utils.ErrUnauthorizedDestination.Error() {
s.sessionManager.DisconnectSession(s.eventStart, s.connId, UNAUTHORIZED_DESTINATION)
@@ -204,7 +204,7 @@ func (s *Session) Refund(lastCC *engine.CallCost, hangupTime time.Time) error {
Increments: refundIncrements,
}
var response float64
err := s.sessionManager.Rater().RefundIncrements(cd, &response)
err := s.sessionManager.Rater().Call("Responder.RefundIncrements", cd, &response)
if err != nil {
return err
}
@@ -233,7 +233,7 @@ func (s *Session) SaveOperations() {
}
var reply string
err := s.sessionManager.CdrSrv().LogCallCost(&engine.CallCostLog{
err := s.sessionManager.CdrSrv().Call("Responder.LogCallCost", &engine.CallCostLog{
CgrId: s.eventStart.GetCgrId(s.sessionManager.Timezone()),
Source: utils.SESSION_MANAGER_SOURCE,
RunId: sr.DerivedCharger.RunId,

View File

@@ -23,7 +23,6 @@ import (
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
//"github.com/cgrates/cgrates/config"
@@ -80,30 +79,19 @@ func TestSessionNilSession(t *testing.T) {
}
*/
type MockConnector struct {
type MockRpcClient struct {
refundCd *engine.CallDescriptor
}
func (mc *MockConnector) GetCost(*engine.CallDescriptor, *engine.CallCost) error { return nil }
func (mc *MockConnector) Debit(*engine.CallDescriptor, *engine.CallCost) error { return nil }
func (mc *MockConnector) MaxDebit(*engine.CallDescriptor, *engine.CallCost) error { return nil }
func (mc *MockConnector) RefundIncrements(cd *engine.CallDescriptor, reply *float64) error {
mc.refundCd = cd
func (mc *MockRpcClient) Call(methodName string, arg interface{}, reply interface{}) error {
if cd, ok := arg.(*engine.CallDescriptor); ok {
mc.refundCd = cd
}
return nil
}
func (mc *MockConnector) GetMaxSessionTime(*engine.CallDescriptor, *float64) error { return nil }
func (mc *MockConnector) GetDerivedChargers(*utils.AttrDerivedChargers, *utils.DerivedChargers) error {
return nil
}
func (mc *MockConnector) GetDerivedMaxSessionTime(*engine.StoredCdr, *float64) error { return nil }
func (mc *MockConnector) GetSessionRuns(*engine.StoredCdr, *[]*engine.SessionRun) error { return nil }
func (mc *MockConnector) ProcessCdr(*engine.StoredCdr, *string) error { return nil }
func (mc *MockConnector) LogCallCost(*engine.CallCostLog, *string) error { return nil }
func (mc *MockConnector) GetLCR(*engine.AttrGetLcr, *engine.LCRCost) error { return nil }
func (mc *MockConnector) GetTimeout(int, *time.Duration) error { return nil }
func TestSessionRefund(t *testing.T) {
mc := &MockConnector{}
mc := &MockRpcClient{}
s := &Session{sessionManager: &FSSessionManager{rater: mc}}
ts := &engine.TimeSpan{
TimeStart: time.Date(2015, 6, 10, 14, 7, 0, 0, time.UTC),
@@ -123,7 +111,7 @@ func TestSessionRefund(t *testing.T) {
}
func TestSessionRefundAll(t *testing.T) {
mc := &MockConnector{}
mc := &MockRpcClient{}
s := &Session{sessionManager: &FSSessionManager{rater: mc}}
ts := &engine.TimeSpan{
TimeStart: time.Date(2015, 6, 10, 14, 7, 0, 0, time.UTC),
@@ -143,7 +131,7 @@ func TestSessionRefundAll(t *testing.T) {
}
func TestSessionRefundManyAll(t *testing.T) {
mc := &MockConnector{}
mc := &MockRpcClient{}
s := &Session{sessionManager: &FSSessionManager{rater: mc}}
ts1 := &engine.TimeSpan{
TimeStart: time.Date(2015, 6, 10, 14, 7, 0, 0, time.UTC),

View File

@@ -22,11 +22,12 @@ import (
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/rpcclient"
)
type SessionManager interface {
Rater() engine.Connector
CdrSrv() engine.Connector
Rater() rpcclient.RpcClientConnection
CdrSrv() rpcclient.RpcClientConnection
DebitInterval() time.Duration
DisconnectSession(engine.Event, string, string) error
WarnSessionMinDuration(string, string)

View File

@@ -25,6 +25,7 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// One session handled by SM
@@ -34,8 +35,8 @@ type SMGSession struct {
connId string // Reference towards connection id on the session manager side.
runId string // Keep a reference for the derived run
timezone string
rater engine.Connector // Connector to Rater service
cdrsrv engine.Connector // Connector to CDRS service
rater rpcclient.RpcClientConnection // Connector to Rater service
cdrsrv rpcclient.RpcClientConnection // Connector to CDRS service
extconns *SMGExternalConnections
cd *engine.CallDescriptor
sessionCds []*engine.CallDescriptor
@@ -86,7 +87,7 @@ func (self *SMGSession) debit(dur time.Duration) (time.Duration, error) {
self.cd.TimeEnd = self.cd.TimeStart.Add(dur)
self.cd.DurationIndex += dur
cc := &engine.CallCost{}
if err := self.rater.MaxDebit(self.cd, cc); err != nil {
if err := self.rater.Call("Responder.MaxDebit", self.cd, cc); err != nil {
return 0, err
}
// cd corrections
@@ -155,7 +156,7 @@ func (self *SMGSession) refund(refundDuration time.Duration) error {
Increments: refundIncrements,
}
var response float64
err := self.rater.RefundIncrements(cd, &response)
err := self.rater.Call("Responder.RefundIncrements", cd, &response)
if err != nil {
return err
}
@@ -204,7 +205,7 @@ func (self *SMGSession) saveOperations() error {
firstCC.Merge(cc)
}
var reply string
err := self.cdrsrv.LogCallCost(&engine.CallCostLog{
err := self.cdrsrv.Call("Responder.LogCallCost", &engine.CallCostLog{
CgrId: self.eventStart.GetCgrId(self.timezone),
Source: utils.SESSION_MANAGER_SOURCE,
RunId: self.runId,

View File

@@ -28,9 +28,10 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
func NewSMGeneric(cgrCfg *config.CGRConfig, rater engine.Connector, cdrsrv engine.Connector, timezone string, extconns *SMGExternalConnections) *SMGeneric {
func NewSMGeneric(cgrCfg *config.CGRConfig, rater rpcclient.RpcClientConnection, cdrsrv rpcclient.RpcClientConnection, timezone string, extconns *SMGExternalConnections) *SMGeneric {
gsm := &SMGeneric{cgrCfg: cgrCfg, rater: rater, cdrsrv: cdrsrv, extconns: extconns, timezone: timezone,
sessions: make(map[string][]*SMGSession), sessionsMux: new(sync.Mutex), guard: engine.NewGuardianLock()}
return gsm
@@ -38,8 +39,8 @@ func NewSMGeneric(cgrCfg *config.CGRConfig, rater engine.Connector, cdrsrv engin
type SMGeneric struct {
cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple
rater engine.Connector
cdrsrv engine.Connector
rater rpcclient.RpcClientConnection
cdrsrv rpcclient.RpcClientConnection
timezone string
sessions map[string][]*SMGSession //Group sessions per sessionId, multiple runs based on derived charging
extconns *SMGExternalConnections // Reference towards external connections manager
@@ -83,7 +84,7 @@ func (self *SMGeneric) sessionStart(evStart SMGenericEvent, connId string) error
sessionId := evStart.GetUUID()
_, err := self.guard.Guard(func() (interface{}, error) { // Lock it on UUID level
var sessionRuns []*engine.SessionRun
if err := self.rater.GetSessionRuns(evStart.AsStoredCdr(self.cgrCfg, self.timezone), &sessionRuns); err != nil {
if err := self.rater.Call("Responder.GetSessionRuns", evStart.AsStoredCdr(self.cgrCfg, self.timezone), &sessionRuns); err != nil {
return nil, err
} else if len(sessionRuns) == 0 {
return nil, nil
@@ -135,7 +136,7 @@ func (self *SMGeneric) GetMaxUsage(gev SMGenericEvent, clnt *rpc2.Client) (time.
gev[utils.EVENT_NAME] = utils.CGR_AUTHORIZATION
storedCdr := gev.AsStoredCdr(config.CgrConfig(), self.timezone)
var maxDur float64
if err := self.rater.GetDerivedMaxSessionTime(storedCdr, &maxDur); err != nil {
if err := self.rater.Call("Responder.GetDerivedMaxSessionTime", storedCdr, &maxDur); err != nil {
return time.Duration(0), err
}
return time.Duration(maxDur), nil
@@ -148,7 +149,7 @@ func (self *SMGeneric) GetLcrSuppliers(gev SMGenericEvent, clnt *rpc2.Client) ([
return nil, err
}
var lcr engine.LCRCost
if err = self.rater.GetLCR(&engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil {
if err = self.rater.Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil {
return nil, err
}
if lcr.HasErrors() {
@@ -200,7 +201,7 @@ func (self *SMGeneric) SessionEnd(gev SMGenericEvent, clnt *rpc2.Client) error {
func (self *SMGeneric) ProcessCdr(gev SMGenericEvent) error {
var reply string
if err := self.cdrsrv.ProcessCdr(gev.AsStoredCdr(self.cgrCfg, self.timezone), &reply); err != nil {
if err := self.cdrsrv.Call("Responder.ProcessCdr", gev.AsStoredCdr(self.cgrCfg, self.timezone), &reply); err != nil {
return err
}
return nil