diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index f91663678..9d77c941b 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -32,6 +32,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" "gopkg.in/fsnotify.v1" ) @@ -54,7 +55,7 @@ Common parameters within configs processed: Parameters specific per config instance: * duMultiplyFactor, cdrSourceId, cdrFilter, cdrFields */ -func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrs engine.Connector, closeChan chan struct{}, dfltTimezone string) (*Cdrc, error) { +func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection, closeChan chan struct{}, dfltTimezone string) (*Cdrc, error) { var cdrcCfg *config.CdrcConfig for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one break @@ -85,7 +86,7 @@ type Cdrc struct { cdrcCfgs map[string]*config.CdrcConfig // All cdrc config profiles attached to this CDRC (key will be profile instance name) dfltCdrcCfg *config.CdrcConfig timezone string - cdrs engine.Connector + cdrs rpcclient.RpcClientConnection httpClient *http.Client closeChan chan struct{} // Used to signal config reloads when we need to span different CDRC-Client maxOpenFiles chan struct{} // Maximum number of simultaneous files processed @@ -201,7 +202,7 @@ func (self *Cdrc) processFile(filePath string) error { utils.Logger.Info(fmt.Sprintf(" DryRun CDR: %+v", storedCdr)) continue } - if err := self.cdrs.ProcessCdr(storedCdr, &reply); err != nil { + if err := self.cdrs.Call("Responder.ProcessCdr", storedCdr, &reply); err != nil { utils.Logger.Err(fmt.Sprintf(" Failed sending CDR, %+v, error: %s", storedCdr, err.Error())) } else if reply != "OK" { utils.Logger.Err(fmt.Sprintf(" Received unexpected reply for CDR, %+v, reply: %s", storedCdr, reply)) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index d519c7c9c..cc6a85f2f 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -104,7 +104,7 @@ func startCdrcs(internalCdrSChan chan *engine.CdrServer, internalRaterChan chan // Fires up a cdrc instance func startCdrc(internalCdrSChan chan *engine.CdrServer, internalRaterChan chan *engine.Responder, cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, closeChan chan struct{}, exitChan chan bool) { - var cdrsConn engine.Connector + var cdrsConn rpcclient.RpcClientConnection var cdrcCfg *config.CdrcConfig for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one break @@ -122,7 +122,7 @@ func startCdrc(internalCdrSChan chan *engine.CdrServer, internalRaterChan chan * exitChan <- true return } - cdrsConn = &engine.RPCClientConnector{Client: conn} + cdrsConn = conn } cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrsConn, closeChan, cfg.DefaultTimezone) if err != nil { @@ -138,7 +138,7 @@ func startCdrc(internalCdrSChan chan *engine.CdrServer, internalRaterChan chan * func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internalRaterChan chan *engine.Responder, server *utils.Server, exitChan chan bool) { utils.Logger.Info("Starting CGRateS SM-Generic service.") - var raterConn, cdrsConn engine.Connector + var raterConn, cdrsConn rpcclient.RpcClientConnection var client *rpcclient.RpcClient var err error // Connect to rater @@ -154,7 +154,7 @@ func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internal exitChan <- true return } - raterConn = &engine.RPCClientConnector{Client: client} + raterConn = client } } // Connect to CDRS @@ -173,7 +173,7 @@ func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internal exitChan <- true return } - cdrsConn = &engine.RPCClientConnector{Client: client} + cdrsConn = client } } } @@ -226,7 +226,7 @@ func startDiameterAgent(internalSMGChan chan rpcclient.RpcClientConnection, exit func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, cdrDb engine.CdrStorage, exitChan chan bool) { utils.Logger.Info("Starting CGRateS SM-FreeSWITCH service.") - var raterConn, cdrsConn engine.Connector + var raterConn, cdrsConn rpcclient.RpcClientConnection var client *rpcclient.RpcClient var err error // Connect to rater @@ -242,7 +242,7 @@ func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, cdrDb engine.Cd exitChan <- true return } - raterConn = &engine.RPCClientConnector{Client: client} + raterConn = client } } // Connect to CDRS @@ -261,7 +261,7 @@ func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, cdrDb engine.Cd exitChan <- true return } - cdrsConn = &engine.RPCClientConnector{Client: client} + cdrsConn = client } } } @@ -275,7 +275,7 @@ func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, cdrDb engine.Cd func startSmKamailio(internalRaterChan chan *engine.Responder, cdrDb engine.CdrStorage, exitChan chan bool) { utils.Logger.Info("Starting CGRateS SM-Kamailio service.") - var raterConn, cdrsConn engine.Connector + var raterConn, cdrsConn rpcclient.RpcClientConnection var client *rpcclient.RpcClient var err error // Connect to rater @@ -291,7 +291,7 @@ func startSmKamailio(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS exitChan <- true return } - raterConn = &engine.RPCClientConnector{Client: client} + raterConn = client } } // Connect to CDRS @@ -310,7 +310,7 @@ func startSmKamailio(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS exitChan <- true return } - cdrsConn = &engine.RPCClientConnector{Client: client} + cdrsConn = client } } } @@ -324,7 +324,7 @@ func startSmKamailio(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS func startSmOpenSIPS(internalRaterChan chan *engine.Responder, cdrDb engine.CdrStorage, exitChan chan bool) { utils.Logger.Info("Starting CGRateS SM-OpenSIPS service.") - var raterConn, cdrsConn engine.Connector + var raterConn, cdrsConn rpcclient.RpcClientConnection var client *rpcclient.RpcClient var err error // Connect to rater @@ -340,7 +340,7 @@ func startSmOpenSIPS(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS exitChan <- true return } - raterConn = &engine.RPCClientConnector{Client: client} + raterConn = client } } // Connect to CDRS @@ -359,7 +359,7 @@ func startSmOpenSIPS(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS exitChan <- true return } - cdrsConn = &engine.RPCClientConnector{Client: client} + cdrsConn = client } } } @@ -379,7 +379,7 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, var err error var client *rpcclient.RpcClient // Rater connection init - var raterConn engine.Connector + var raterConn rpcclient.RpcClientConnection if cfg.CDRSRater == utils.INTERNAL { responder := <-internalRaterChan // Wait for rater to come up before start querying raterConn = responder @@ -391,7 +391,7 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, exitChan <- true return } - raterConn = &engine.RPCClientConnector{Client: client} + raterConn = client } // Pubsub connection init var pubSubConn engine.PublisherSubscriber diff --git a/engine/cdrs.go b/engine/cdrs.go index 99d47226c..34b78110d 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -27,6 +27,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" "github.com/jinzhu/gorm" ) @@ -65,14 +66,14 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) { } } -func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater Connector, pubsub PublisherSubscriber, users UserService, aliases AliasService, stats StatsInterface) (*CdrServer, error) { - return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rater: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: &GuardianLock{queue: make(map[string]chan bool)}}, nil +func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, client rpcclient.RpcClientConnection, pubsub PublisherSubscriber, users UserService, aliases AliasService, stats StatsInterface) (*CdrServer, error) { + return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, client: client, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: &GuardianLock{queue: make(map[string]chan bool)}}, nil } type CdrServer struct { cgrCfg *config.CGRConfig cdrDb CdrStorage - rater Connector + client rpcclient.RpcClientConnection pubsub PublisherSubscriber users UserService aliases AliasService @@ -228,7 +229,7 @@ func (self *CdrServer) rateStoreStatsReplicate(cdr *StoredCdr) error { } } // Rate CDR - if self.rater != nil && !cdr.Rated { + if self.client != nil && !cdr.Rated { if err := self.rateCDR(cdr); err != nil { cdr.Cost = -1.0 // If there was an error, mark the CDR cdr.ExtraInfo = err.Error() @@ -275,7 +276,7 @@ func (self *CdrServer) deriveCdrs(storedCdr *StoredCdr) ([]*StoredCdr, error) { attrsDC := &utils.AttrDerivedChargers{Tenant: storedCdr.Tenant, Category: storedCdr.Category, Direction: storedCdr.Direction, Account: storedCdr.Account, Subject: storedCdr.Subject} var dcs utils.DerivedChargers - if err := self.rater.GetDerivedChargers(attrsDC, &dcs); err != nil { + if err := self.client.Call("Responder.GetDerivedChargers", attrsDC, &dcs); err != nil { utils.Logger.Err(fmt.Sprintf("Could not get derived charging for cgrid %s, error: %s", storedCdr.CgrId, err.Error())) return nil, err } @@ -337,11 +338,11 @@ func (self *CdrServer) getCostFromRater(storedCdr *StoredCdr) (*CallCost, error) DurationIndex: storedCdr.Usage, } if utils.IsSliceMember([]string{utils.META_PSEUDOPREPAID, utils.META_POSTPAID, utils.META_PREPAID, utils.PSEUDOPREPAID, utils.POSTPAID, utils.PREPAID}, storedCdr.ReqType) { // Prepaid - Cost can be recalculated in case of missing records from SM - if err = self.rater.Debit(cd, cc); err == nil { // Debit has occured, we are forced to write the log, even if CDR store is disabled + if err = self.client.Call("Responder.Debit", cd, cc); err == nil { // Debit has occured, we are forced to write the log, even if CDR store is disabled self.cdrDb.LogCallCost(storedCdr.CgrId, utils.CDRS_SOURCE, storedCdr.MediationRunId, cc) } } else { - err = self.rater.GetCost(cd, cc) + err = self.client.Call("Responder.GetCost", cd, cc) } if err != nil { return cc, err diff --git a/engine/responder.go b/engine/responder.go index 531df85da..a54756683 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -31,7 +31,6 @@ import ( "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/rpcclient" ) // Individual session run @@ -51,17 +50,15 @@ type Responder struct { ExitChan chan bool CdrSrv *CdrServer Stats StatsInterface - Timeout time.Duration Timezone string cnt int64 responseCache *cache2go.ResponseCache } -func NewResponder(exitChan chan bool, cdrSrv *CdrServer, stats StatsInterface, timeout, timeToLive time.Duration) *Responder { +func NewResponder(exitChan chan bool, cdrSrv *CdrServer, stats StatsInterface, timeToLive time.Duration) *Responder { return &Responder{ ExitChan: exitChan, Stats: stats, - Timeout: timeToLive, responseCache: cache2go.NewResponseCache(timeToLive), } } @@ -617,339 +614,27 @@ func (rs *Responder) UnRegisterRater(clientAddress string, replay *int) error { return nil } -func (rs *Responder) GetTimeout(i int, d *time.Duration) error { - *d = rs.Timeout - return nil -} - -// Reflection worker type for not standalone balancer -type ResponderWorker struct{} - -func (rw *ResponderWorker) Call(serviceMethod string, args interface{}, reply interface{}) error { +func (rs *Responder) Call(serviceMethod string, args interface{}, reply interface{}) error { + if !strings.HasPrefix(serviceMethod, "Responder.") { + return utils.ErrNotImplemented + } methodName := strings.TrimLeft(serviceMethod, "Responder.") - switch args.(type) { - case CallDescriptor: - cd := args.(CallDescriptor) - switch reply.(type) { - case *CallCost: - rep := reply.(*CallCost) - method := reflect.ValueOf(&cd).MethodByName(methodName) - ret := method.Call([]reflect.Value{}) - *rep = *(ret[0].Interface().(*CallCost)) - case *float64: - rep := reply.(*float64) - method := reflect.ValueOf(&cd).MethodByName(methodName) - ret := method.Call([]reflect.Value{}) - *rep = *(ret[0].Interface().(*float64)) - } - case string: - switch methodName { - case "Status": - *(reply.(*string)) = "Local!" - case "Shutdown": - *(reply.(*string)) = "Done!" - } - + // get method + method := reflect.ValueOf(rs).MethodByName(methodName) + if !method.IsValid() { + return utils.ErrNotImplemented } - return nil -} -func (rw *ResponderWorker) Close() error { - return nil -} + // construct the params + params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)} -type Connector interface { - GetCost(*CallDescriptor, *CallCost) error - Debit(*CallDescriptor, *CallCost) error - MaxDebit(*CallDescriptor, *CallCost) error - RefundIncrements(*CallDescriptor, *float64) error - GetMaxSessionTime(*CallDescriptor, *float64) error - GetDerivedChargers(*utils.AttrDerivedChargers, *utils.DerivedChargers) error - GetDerivedMaxSessionTime(*StoredCdr, *float64) error - GetSessionRuns(*StoredCdr, *[]*SessionRun) error - ProcessCdr(*StoredCdr, *string) error - LogCallCost(*CallCostLog, *string) error - GetLCR(*AttrGetLcr, *LCRCost) error - GetTimeout(int, *time.Duration) error -} - -type RPCClientConnector struct { - Client *rpcclient.RpcClient - Timeout time.Duration -} - -func (rcc *RPCClientConnector) GetCost(cd *CallDescriptor, cc *CallCost) error { - return rcc.Client.Call("Responder.GetCost", cd, cc) -} - -func (rcc *RPCClientConnector) Debit(cd *CallDescriptor, cc *CallCost) error { - return rcc.Client.Call("Responder.Debit", cd, cc) -} - -func (rcc *RPCClientConnector) MaxDebit(cd *CallDescriptor, cc *CallCost) error { - return rcc.Client.Call("Responder.MaxDebit", cd, cc) -} - -func (rcc *RPCClientConnector) RefundIncrements(cd *CallDescriptor, resp *float64) error { - return rcc.Client.Call("Responder.RefundIncrements", cd, resp) -} - -func (rcc *RPCClientConnector) GetMaxSessionTime(cd *CallDescriptor, resp *float64) error { - return rcc.Client.Call("Responder.GetMaxSessionTime", cd, resp) -} - -func (rcc *RPCClientConnector) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) error { - return rcc.Client.Call("Responder.GetDerivedMaxSessionTime", ev, reply) -} - -func (rcc *RPCClientConnector) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error { - return rcc.Client.Call("Responder.GetSessionRuns", ev, sRuns) -} - -func (rcc *RPCClientConnector) GetDerivedChargers(attrs *utils.AttrDerivedChargers, dcs *utils.DerivedChargers) error { - return rcc.Client.Call("ApierV1.GetDerivedChargers", attrs, dcs) -} - -func (rcc *RPCClientConnector) ProcessCdr(cdr *StoredCdr, reply *string) error { - return rcc.Client.Call("CdrsV1.ProcessCdr", cdr, reply) -} - -func (rcc *RPCClientConnector) LogCallCost(ccl *CallCostLog, reply *string) error { - return rcc.Client.Call("CdrsV1.LogCallCost", ccl, reply) -} - -func (rcc *RPCClientConnector) GetLCR(attrs *AttrGetLcr, reply *LCRCost) error { - return rcc.Client.Call("Responder.GetLCR", attrs, reply) -} - -func (rcc *RPCClientConnector) GetTimeout(i int, d *time.Duration) error { - *d = rcc.Timeout - return nil -} - -type ConnectorPool []Connector - -func (cp ConnectorPool) GetCost(cd *CallDescriptor, cc *CallCost) error { - for _, con := range cp { - c := make(chan error, 1) - callCost := &CallCost{} - - var timeout time.Duration - con.GetTimeout(0, &timeout) - - go func() { c <- con.GetCost(cd, callCost) }() - select { - case err := <-c: - *cc = *callCost - return err - case <-time.After(timeout): - // call timed out, continue - } + ret := method.Call(params) + if len(ret) != 1 { + return utils.ErrServerError } - return utils.ErrTimedOut -} - -func (cp ConnectorPool) Debit(cd *CallDescriptor, cc *CallCost) error { - for _, con := range cp { - c := make(chan error, 1) - callCost := &CallCost{} - - var timeout time.Duration - con.GetTimeout(0, &timeout) - - go func() { c <- con.Debit(cd, callCost) }() - select { - case err := <-c: - *cc = *callCost - return err - case <-time.After(timeout): - // call timed out, continue - } + err, ok := ret[0].Interface().(error) + if !ok { + return utils.ErrServerError } - return utils.ErrTimedOut -} - -func (cp ConnectorPool) MaxDebit(cd *CallDescriptor, cc *CallCost) error { - for _, con := range cp { - c := make(chan error, 1) - callCost := &CallCost{} - - var timeout time.Duration - con.GetTimeout(0, &timeout) - - go func() { c <- con.MaxDebit(cd, callCost) }() - select { - case err := <-c: - *cc = *callCost - return err - case <-time.After(timeout): - // call timed out, continue - } - } - return utils.ErrTimedOut -} - -func (cp ConnectorPool) RefundIncrements(cd *CallDescriptor, resp *float64) error { - for _, con := range cp { - c := make(chan error, 1) - var r float64 - - var timeout time.Duration - con.GetTimeout(0, &timeout) - - go func() { c <- con.RefundIncrements(cd, &r) }() - select { - case err := <-c: - *resp = r - return err - case <-time.After(timeout): - // call timed out, continue - } - } - return utils.ErrTimedOut -} - -func (cp ConnectorPool) GetMaxSessionTime(cd *CallDescriptor, resp *float64) error { - for _, con := range cp { - c := make(chan error, 1) - var r float64 - - var timeout time.Duration - con.GetTimeout(0, &timeout) - - go func() { c <- con.GetMaxSessionTime(cd, &r) }() - select { - case err := <-c: - *resp = r - return err - case <-time.After(timeout): - // call timed out, continue - } - } - return utils.ErrTimedOut -} - -func (cp ConnectorPool) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) error { - for _, con := range cp { - c := make(chan error, 1) - var r float64 - - var timeout time.Duration - con.GetTimeout(0, &timeout) - - go func() { c <- con.GetDerivedMaxSessionTime(ev, &r) }() - select { - case err := <-c: - *reply = r - return err - case <-time.After(timeout): - // call timed out, continue - } - } - return utils.ErrTimedOut -} - -func (cp ConnectorPool) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error { - for _, con := range cp { - c := make(chan error, 1) - sr := make([]*SessionRun, 0) - - var timeout time.Duration - con.GetTimeout(0, &timeout) - - go func() { c <- con.GetSessionRuns(ev, &sr) }() - select { - case err := <-c: - *sRuns = sr - return err - case <-time.After(timeout): - // call timed out, continue - } - } - return utils.ErrTimedOut -} - -func (cp ConnectorPool) GetDerivedChargers(attrs *utils.AttrDerivedChargers, dcs *utils.DerivedChargers) error { - for _, con := range cp { - c := make(chan error, 1) - derivedChargers := utils.DerivedChargers{} - - var timeout time.Duration - con.GetTimeout(0, &timeout) - - go func() { c <- con.GetDerivedChargers(attrs, &derivedChargers) }() - select { - case err := <-c: - *dcs = derivedChargers - return err - case <-time.After(timeout): - // call timed out, continue - } - } - return utils.ErrTimedOut -} - -func (cp ConnectorPool) ProcessCdr(cdr *StoredCdr, reply *string) error { - for _, con := range cp { - c := make(chan error, 1) - var r string - - var timeout time.Duration - con.GetTimeout(0, &timeout) - - go func() { c <- con.ProcessCdr(cdr, &r) }() - select { - case err := <-c: - *reply = r - return err - case <-time.After(timeout): - // call timed out, continue - } - } - return utils.ErrTimedOut -} - -func (cp ConnectorPool) LogCallCost(ccl *CallCostLog, reply *string) error { - for _, con := range cp { - c := make(chan error, 1) - var r string - - var timeout time.Duration - con.GetTimeout(0, &timeout) - - go func() { c <- con.LogCallCost(ccl, &r) }() - select { - case err := <-c: - *reply = r - return err - case <-time.After(timeout): - // call timed out, continue - } - } - return utils.ErrTimedOut -} - -func (cp ConnectorPool) GetLCR(attr *AttrGetLcr, reply *LCRCost) error { - for _, con := range cp { - c := make(chan error, 1) - lcrCost := &LCRCost{} - - var timeout time.Duration - con.GetTimeout(0, &timeout) - - go func() { c <- con.GetLCR(attr, lcrCost) }() - select { - case err := <-c: - *reply = *lcrCost - return err - case <-time.After(timeout): - // call timed out, continue - } - } - return utils.ErrTimedOut -} - -func (cp ConnectorPool) GetTimeout(i int, d *time.Duration) error { - *d = 0 - return nil + return err } diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 47a728fbd..969dacf4e 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -30,9 +30,10 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/cgrates/fsock" + "github.com/cgrates/rpcclient" ) -func NewFSSessionManager(smFsConfig *config.SmFsConfig, rater, cdrs engine.Connector, timezone string) *FSSessionManager { +func NewFSSessionManager(smFsConfig *config.SmFsConfig, rater, cdrs rpcclient.RpcClientConnection, timezone string) *FSSessionManager { return &FSSessionManager{ cfg: smFsConfig, conns: make(map[string]*fsock.FSock), @@ -50,10 +51,11 @@ type FSSessionManager struct { cfg *config.SmFsConfig conns map[string]*fsock.FSock // Keep the list here for connection management purposes senderPools map[string]*fsock.FSockPool // Keep sender pools here - rater engine.Connector - cdrsrv engine.Connector - sessions *Sessions - timezone string + rater rpcclient.RpcClientConnection + cdrsrv rpcclient.RpcClientConnection + + sessions *Sessions + timezone string } func (sm *FSSessionManager) createHandlers() map[string][]func(string, string) { @@ -107,7 +109,7 @@ func (sm *FSSessionManager) setCgrLcr(ev engine.Event, connId string) error { TimeStart: startTime, TimeEnd: startTime.Add(config.CgrConfig().MaxCallDuration), } - if err := sm.rater.GetLCR(&engine.AttrGetLcr{CallDescriptor: cd}, &lcrCost); err != nil { + if err := sm.rater.Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcrCost); err != nil { return err } supps := []string{} @@ -131,7 +133,7 @@ func (sm *FSSessionManager) onChannelPark(ev engine.Event, connId string) { return } var maxCallDuration float64 // This will be the maximum duration this channel will be allowed to last - if err := sm.rater.GetDerivedMaxSessionTime(ev.AsStoredCdr(config.CgrConfig().DefaultTimezone), &maxCallDuration); err != nil { + if err := sm.rater.Call("Responder.GetDerivedMaxSessionTime", ev.AsStoredCdr(config.CgrConfig().DefaultTimezone), &maxCallDuration); err != nil { utils.Logger.Err(fmt.Sprintf(" Could not get max session time for %s, error: %s", ev.GetUUID(), err.Error())) } if maxCallDuration != -1 { // For calls different than unlimited, set limits @@ -152,7 +154,7 @@ func (sm *FSSessionManager) onChannelPark(ev engine.Event, connId string) { return } var lcr engine.LCRCost - if err = sm.Rater().GetLCR(&engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil { + if err = sm.Rater().Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil { utils.Logger.Info(fmt.Sprintf(" LCR_API_ERROR: %s", err.Error())) sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) } @@ -294,7 +296,7 @@ func (sm *FSSessionManager) DisconnectSession(ev engine.Event, connId, notify st func (sm *FSSessionManager) ProcessCdr(storedCdr *engine.StoredCdr) error { var reply string - if err := sm.cdrsrv.ProcessCdr(storedCdr, &reply); err != nil { + if err := sm.cdrsrv.Call("Responder.ProcessCdr", storedCdr, &reply); err != nil { utils.Logger.Err(fmt.Sprintf(" Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", storedCdr.CgrId, storedCdr.AccId, err.Error())) } return nil @@ -304,11 +306,11 @@ func (sm *FSSessionManager) DebitInterval() time.Duration { return sm.cfg.DebitInterval } -func (sm *FSSessionManager) CdrSrv() engine.Connector { +func (sm *FSSessionManager) CdrSrv() rpcclient.RpcClientConnection { return sm.cdrsrv } -func (sm *FSSessionManager) Rater() engine.Connector { +func (sm *FSSessionManager) Rater() rpcclient.RpcClientConnection { return sm.rater } diff --git a/sessionmanager/kamailiosm.go b/sessionmanager/kamailiosm.go index f6ed04f85..e195bfe8c 100644 --- a/sessionmanager/kamailiosm.go +++ b/sessionmanager/kamailiosm.go @@ -29,17 +29,18 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/cgrates/kamevapi" + "github.com/cgrates/rpcclient" ) -func NewKamailioSessionManager(smKamCfg *config.SmKamConfig, rater, cdrsrv engine.Connector, timezone string) (*KamailioSessionManager, error) { +func NewKamailioSessionManager(smKamCfg *config.SmKamConfig, rater, cdrsrv rpcclient.RpcClientConnection, timezone string) (*KamailioSessionManager, error) { ksm := &KamailioSessionManager{cfg: smKamCfg, rater: rater, cdrsrv: cdrsrv, timezone: timezone, conns: make(map[string]*kamevapi.KamEvapi), sessions: NewSessions()} return ksm, nil } type KamailioSessionManager struct { cfg *config.SmKamConfig - rater engine.Connector - cdrsrv engine.Connector + rater rpcclient.RpcClientConnection + cdrsrv rpcclient.RpcClientConnection timezone string conns map[string]*kamevapi.KamEvapi sessions *Sessions @@ -64,7 +65,7 @@ func (self *KamailioSessionManager) onCgrAuth(evData []byte, connId string) { } var remainingDuration float64 var errMaxSession error - if errMaxSession = self.rater.GetDerivedMaxSessionTime(kev.AsStoredCdr(self.Timezone()), &remainingDuration); errMaxSession != nil { + if errMaxSession = self.rater.Call("Responder.GetDerivedMaxSessionTime", kev.AsStoredCdr(self.Timezone()), &remainingDuration); errMaxSession != nil { utils.Logger.Err(fmt.Sprintf(" Could not get max session time, error: %s", errMaxSession.Error())) } var supplStr string @@ -107,7 +108,7 @@ func (self *KamailioSessionManager) getSuppliers(kev KamEvent) (string, error) { return "", errors.New("LCR_PREPROCESS_ERROR") } var lcr engine.LCRCost - if err = self.Rater().GetLCR(&engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil { + if err = self.Rater().Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil { utils.Logger.Info(fmt.Sprintf(" LCR_API_ERROR error: %s", err.Error())) return "", errors.New("LCR_API_ERROR") } @@ -196,10 +197,10 @@ func (self *KamailioSessionManager) DisconnectSession(ev engine.Event, connId, n func (self *KamailioSessionManager) DebitInterval() time.Duration { return self.cfg.DebitInterval } -func (self *KamailioSessionManager) CdrSrv() engine.Connector { +func (self *KamailioSessionManager) CdrSrv() rpcclient.RpcClientConnection { return self.cdrsrv } -func (self *KamailioSessionManager) Rater() engine.Connector { +func (self *KamailioSessionManager) Rater() rpcclient.RpcClientConnection { return self.rater } @@ -208,7 +209,7 @@ func (self *KamailioSessionManager) ProcessCdr(cdr *engine.StoredCdr) error { return nil } var reply string - if err := self.cdrsrv.ProcessCdr(cdr, &reply); err != nil { + if err := self.cdrsrv.Call("Responder.ProcessCdr", cdr, &reply); err != nil { utils.Logger.Err(fmt.Sprintf(" Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", cdr.CgrId, cdr.AccId, err.Error())) } return nil diff --git a/sessionmanager/osipssm.go b/sessionmanager/osipssm.go index 315cf86fc..f3a91e4a5 100644 --- a/sessionmanager/osipssm.go +++ b/sessionmanager/osipssm.go @@ -29,6 +29,7 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/cgrates/osipsdagram" + "github.com/cgrates/rpcclient" ) /* @@ -80,7 +81,7 @@ duration:: */ -func NewOSipsSessionManager(smOsipsCfg *config.SmOsipsConfig, reconnects int, rater, cdrsrv engine.Connector, timezone string) (*OsipsSessionManager, error) { +func NewOSipsSessionManager(smOsipsCfg *config.SmOsipsConfig, reconnects int, rater, cdrsrv rpcclient.RpcClientConnection, timezone string) (*OsipsSessionManager, error) { osm := &OsipsSessionManager{cfg: smOsipsCfg, reconnects: reconnects, rater: rater, cdrsrv: cdrsrv, timezone: timezone, cdrStartEvents: make(map[string]*OsipsEvent), sessions: NewSessions()} osm.eventHandlers = map[string][]func(*osipsdagram.OsipsEvent){ "E_OPENSIPS_START": []func(*osipsdagram.OsipsEvent){osm.onOpensipsStart}, // Raised when OpenSIPS starts so we can register our event handlers @@ -94,8 +95,8 @@ func NewOSipsSessionManager(smOsipsCfg *config.SmOsipsConfig, reconnects int, ra type OsipsSessionManager struct { cfg *config.SmOsipsConfig reconnects int - rater engine.Connector - cdrsrv engine.Connector + rater rpcclient.RpcClientConnection + cdrsrv rpcclient.RpcClientConnection timezone string eventHandlers map[string][]func(*osipsdagram.OsipsEvent) evSubscribeStop chan struct{} // Reference towards the channel controlling subscriptions, keep it as reference so we do not need to copy it @@ -130,12 +131,12 @@ func (osm *OsipsSessionManager) DebitInterval() time.Duration { } // Returns the connection to local cdr database, used by session to log it's final costs -func (osm *OsipsSessionManager) CdrSrv() engine.Connector { +func (osm *OsipsSessionManager) CdrSrv() rpcclient.RpcClientConnection { return osm.cdrsrv } // Returns connection to rater/controller -func (osm *OsipsSessionManager) Rater() engine.Connector { +func (osm *OsipsSessionManager) Rater() rpcclient.RpcClientConnection { return osm.rater } @@ -152,7 +153,7 @@ func (osm *OsipsSessionManager) Shutdown() error { // Process the CDR with CDRS component func (osm *OsipsSessionManager) ProcessCdr(storedCdr *engine.StoredCdr) error { var reply string - return osm.cdrsrv.ProcessCdr(storedCdr, &reply) + return osm.cdrsrv.Call("Responder.ProcessCdr", storedCdr, &reply) } // Disconnects the session diff --git a/sessionmanager/session.go b/sessionmanager/session.go index 9fde47fb7..82347807c 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -58,7 +58,7 @@ func NewSession(ev engine.Event, connId string, sm SessionManager) *Session { sessionManager: sm, connId: connId, } - if err := sm.Rater().GetSessionRuns(ev.AsStoredCdr(s.sessionManager.Timezone()), &s.sessionRuns); err != nil || len(s.sessionRuns) == 0 { + if err := sm.Rater().Call("Responder.GetSessionRuns", ev.AsStoredCdr(s.sessionManager.Timezone()), &s.sessionRuns); err != nil || len(s.sessionRuns) == 0 { return nil } for runIdx := range s.sessionRuns { @@ -86,7 +86,7 @@ func (s *Session) debitLoop(runIdx int) { nextCd.LoopIndex = index nextCd.DurationIndex += debitPeriod // first presumed duration cc := new(engine.CallCost) - if err := s.sessionManager.Rater().MaxDebit(nextCd, cc); err != nil { + if err := s.sessionManager.Rater().Call("Responder.MaxDebit", nextCd, cc); err != nil { utils.Logger.Err(fmt.Sprintf("Could not complete debit opperation: %v", err)) if err.Error() == utils.ErrUnauthorizedDestination.Error() { s.sessionManager.DisconnectSession(s.eventStart, s.connId, UNAUTHORIZED_DESTINATION) @@ -204,7 +204,7 @@ func (s *Session) Refund(lastCC *engine.CallCost, hangupTime time.Time) error { Increments: refundIncrements, } var response float64 - err := s.sessionManager.Rater().RefundIncrements(cd, &response) + err := s.sessionManager.Rater().Call("Responder.RefundIncrements", cd, &response) if err != nil { return err } @@ -233,7 +233,7 @@ func (s *Session) SaveOperations() { } var reply string - err := s.sessionManager.CdrSrv().LogCallCost(&engine.CallCostLog{ + err := s.sessionManager.CdrSrv().Call("Responder.LogCallCost", &engine.CallCostLog{ CgrId: s.eventStart.GetCgrId(s.sessionManager.Timezone()), Source: utils.SESSION_MANAGER_SOURCE, RunId: sr.DerivedCharger.RunId, diff --git a/sessionmanager/session_test.go b/sessionmanager/session_test.go index 61fd636d3..7b1c10292 100644 --- a/sessionmanager/session_test.go +++ b/sessionmanager/session_test.go @@ -23,7 +23,6 @@ import ( "time" "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" ) //"github.com/cgrates/cgrates/config" @@ -80,30 +79,19 @@ func TestSessionNilSession(t *testing.T) { } */ -type MockConnector struct { +type MockRpcClient struct { refundCd *engine.CallDescriptor } -func (mc *MockConnector) GetCost(*engine.CallDescriptor, *engine.CallCost) error { return nil } -func (mc *MockConnector) Debit(*engine.CallDescriptor, *engine.CallCost) error { return nil } -func (mc *MockConnector) MaxDebit(*engine.CallDescriptor, *engine.CallCost) error { return nil } -func (mc *MockConnector) RefundIncrements(cd *engine.CallDescriptor, reply *float64) error { - mc.refundCd = cd +func (mc *MockRpcClient) Call(methodName string, arg interface{}, reply interface{}) error { + if cd, ok := arg.(*engine.CallDescriptor); ok { + mc.refundCd = cd + } return nil } -func (mc *MockConnector) GetMaxSessionTime(*engine.CallDescriptor, *float64) error { return nil } -func (mc *MockConnector) GetDerivedChargers(*utils.AttrDerivedChargers, *utils.DerivedChargers) error { - return nil -} -func (mc *MockConnector) GetDerivedMaxSessionTime(*engine.StoredCdr, *float64) error { return nil } -func (mc *MockConnector) GetSessionRuns(*engine.StoredCdr, *[]*engine.SessionRun) error { return nil } -func (mc *MockConnector) ProcessCdr(*engine.StoredCdr, *string) error { return nil } -func (mc *MockConnector) LogCallCost(*engine.CallCostLog, *string) error { return nil } -func (mc *MockConnector) GetLCR(*engine.AttrGetLcr, *engine.LCRCost) error { return nil } -func (mc *MockConnector) GetTimeout(int, *time.Duration) error { return nil } func TestSessionRefund(t *testing.T) { - mc := &MockConnector{} + mc := &MockRpcClient{} s := &Session{sessionManager: &FSSessionManager{rater: mc}} ts := &engine.TimeSpan{ TimeStart: time.Date(2015, 6, 10, 14, 7, 0, 0, time.UTC), @@ -123,7 +111,7 @@ func TestSessionRefund(t *testing.T) { } func TestSessionRefundAll(t *testing.T) { - mc := &MockConnector{} + mc := &MockRpcClient{} s := &Session{sessionManager: &FSSessionManager{rater: mc}} ts := &engine.TimeSpan{ TimeStart: time.Date(2015, 6, 10, 14, 7, 0, 0, time.UTC), @@ -143,7 +131,7 @@ func TestSessionRefundAll(t *testing.T) { } func TestSessionRefundManyAll(t *testing.T) { - mc := &MockConnector{} + mc := &MockRpcClient{} s := &Session{sessionManager: &FSSessionManager{rater: mc}} ts1 := &engine.TimeSpan{ TimeStart: time.Date(2015, 6, 10, 14, 7, 0, 0, time.UTC), diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go index 8d2155f11..7a9b5696b 100644 --- a/sessionmanager/sessionmanager.go +++ b/sessionmanager/sessionmanager.go @@ -22,11 +22,12 @@ import ( "time" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/rpcclient" ) type SessionManager interface { - Rater() engine.Connector - CdrSrv() engine.Connector + Rater() rpcclient.RpcClientConnection + CdrSrv() rpcclient.RpcClientConnection DebitInterval() time.Duration DisconnectSession(engine.Event, string, string) error WarnSessionMinDuration(string, string) diff --git a/sessionmanager/smg_session.go b/sessionmanager/smg_session.go index 2ee36f58e..97f3226e9 100644 --- a/sessionmanager/smg_session.go +++ b/sessionmanager/smg_session.go @@ -25,6 +25,7 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) // One session handled by SM @@ -34,8 +35,8 @@ type SMGSession struct { connId string // Reference towards connection id on the session manager side. runId string // Keep a reference for the derived run timezone string - rater engine.Connector // Connector to Rater service - cdrsrv engine.Connector // Connector to CDRS service + rater rpcclient.RpcClientConnection // Connector to Rater service + cdrsrv rpcclient.RpcClientConnection // Connector to CDRS service extconns *SMGExternalConnections cd *engine.CallDescriptor sessionCds []*engine.CallDescriptor @@ -86,7 +87,7 @@ func (self *SMGSession) debit(dur time.Duration) (time.Duration, error) { self.cd.TimeEnd = self.cd.TimeStart.Add(dur) self.cd.DurationIndex += dur cc := &engine.CallCost{} - if err := self.rater.MaxDebit(self.cd, cc); err != nil { + if err := self.rater.Call("Responder.MaxDebit", self.cd, cc); err != nil { return 0, err } // cd corrections @@ -155,7 +156,7 @@ func (self *SMGSession) refund(refundDuration time.Duration) error { Increments: refundIncrements, } var response float64 - err := self.rater.RefundIncrements(cd, &response) + err := self.rater.Call("Responder.RefundIncrements", cd, &response) if err != nil { return err } @@ -204,7 +205,7 @@ func (self *SMGSession) saveOperations() error { firstCC.Merge(cc) } var reply string - err := self.cdrsrv.LogCallCost(&engine.CallCostLog{ + err := self.cdrsrv.Call("Responder.LogCallCost", &engine.CallCostLog{ CgrId: self.eventStart.GetCgrId(self.timezone), Source: utils.SESSION_MANAGER_SOURCE, RunId: self.runId, diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index 9d0490fca..6546991a3 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -28,9 +28,10 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) -func NewSMGeneric(cgrCfg *config.CGRConfig, rater engine.Connector, cdrsrv engine.Connector, timezone string, extconns *SMGExternalConnections) *SMGeneric { +func NewSMGeneric(cgrCfg *config.CGRConfig, rater rpcclient.RpcClientConnection, cdrsrv rpcclient.RpcClientConnection, timezone string, extconns *SMGExternalConnections) *SMGeneric { gsm := &SMGeneric{cgrCfg: cgrCfg, rater: rater, cdrsrv: cdrsrv, extconns: extconns, timezone: timezone, sessions: make(map[string][]*SMGSession), sessionsMux: new(sync.Mutex), guard: engine.NewGuardianLock()} return gsm @@ -38,8 +39,8 @@ func NewSMGeneric(cgrCfg *config.CGRConfig, rater engine.Connector, cdrsrv engin type SMGeneric struct { cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple - rater engine.Connector - cdrsrv engine.Connector + rater rpcclient.RpcClientConnection + cdrsrv rpcclient.RpcClientConnection timezone string sessions map[string][]*SMGSession //Group sessions per sessionId, multiple runs based on derived charging extconns *SMGExternalConnections // Reference towards external connections manager @@ -83,7 +84,7 @@ func (self *SMGeneric) sessionStart(evStart SMGenericEvent, connId string) error sessionId := evStart.GetUUID() _, err := self.guard.Guard(func() (interface{}, error) { // Lock it on UUID level var sessionRuns []*engine.SessionRun - if err := self.rater.GetSessionRuns(evStart.AsStoredCdr(self.cgrCfg, self.timezone), &sessionRuns); err != nil { + if err := self.rater.Call("Responder.GetSessionRuns", evStart.AsStoredCdr(self.cgrCfg, self.timezone), &sessionRuns); err != nil { return nil, err } else if len(sessionRuns) == 0 { return nil, nil @@ -135,7 +136,7 @@ func (self *SMGeneric) GetMaxUsage(gev SMGenericEvent, clnt *rpc2.Client) (time. gev[utils.EVENT_NAME] = utils.CGR_AUTHORIZATION storedCdr := gev.AsStoredCdr(config.CgrConfig(), self.timezone) var maxDur float64 - if err := self.rater.GetDerivedMaxSessionTime(storedCdr, &maxDur); err != nil { + if err := self.rater.Call("Responder.GetDerivedMaxSessionTime", storedCdr, &maxDur); err != nil { return time.Duration(0), err } return time.Duration(maxDur), nil @@ -148,7 +149,7 @@ func (self *SMGeneric) GetLcrSuppliers(gev SMGenericEvent, clnt *rpc2.Client) ([ return nil, err } var lcr engine.LCRCost - if err = self.rater.GetLCR(&engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil { + if err = self.rater.Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil { return nil, err } if lcr.HasErrors() { @@ -200,7 +201,7 @@ func (self *SMGeneric) SessionEnd(gev SMGenericEvent, clnt *rpc2.Client) error { func (self *SMGeneric) ProcessCdr(gev SMGenericEvent) error { var reply string - if err := self.cdrsrv.ProcessCdr(gev.AsStoredCdr(self.cgrCfg, self.timezone), &reply); err != nil { + if err := self.cdrsrv.Call("Responder.ProcessCdr", gev.AsStoredCdr(self.cgrCfg, self.timezone), &reply); err != nil { return err } return nil