diff --git a/apier/v1/smgenericbirpcv1.go b/apier/v1/smgenericbirpcv1.go
index 8b9a0812e..0fa8dfe0d 100644
--- a/apier/v1/smgenericbirpcv1.go
+++ b/apier/v1/smgenericbirpcv1.go
@@ -18,8 +18,6 @@ along with this program. If not, see
package v1
import (
- "time"
-
"github.com/cenk/rpc2"
"github.com/cgrates/cgrates/sessionmanager"
"github.com/cgrates/cgrates/utils"
@@ -47,72 +45,43 @@ func (self *SMGenericBiRpcV1) Handlers() map[string]interface{} {
/// Returns MaxUsage (for calls in seconds), -1 for no limit
func (self *SMGenericBiRpcV1) MaxUsage(clnt *rpc2.Client, ev sessionmanager.SMGenericEvent, maxUsage *float64) error {
- maxUsageDur, err := self.sm.MaxUsage(ev, clnt)
- if err != nil {
- return utils.NewErrServerError(err)
- }
- if maxUsageDur == time.Duration(-1) {
- *maxUsage = -1.0
- } else {
- *maxUsage = maxUsageDur.Seconds()
- }
- return nil
+ return self.sm.BiRPCV1MaxUsage(clnt, ev, maxUsage)
}
/// Returns list of suppliers which can be used for the request
func (self *SMGenericBiRpcV1) LCRSuppliers(clnt *rpc2.Client, ev sessionmanager.SMGenericEvent, suppliers *[]string) error {
- if supls, err := self.sm.LCRSuppliers(ev, clnt); err != nil {
- return utils.NewErrServerError(err)
- } else {
- *suppliers = supls
- }
- return nil
+ return self.sm.BiRPCV1LCRSuppliers(clnt, ev, suppliers)
}
// Called on session start, returns the maximum number of seconds the session can last
func (self *SMGenericBiRpcV1) InitiateSession(clnt *rpc2.Client, ev sessionmanager.SMGenericEvent, maxUsage *float64) error {
- if minMaxUsage, err := self.sm.InitiateSession(ev, clnt); err != nil {
- return utils.NewErrServerError(err)
- } else {
- *maxUsage = minMaxUsage.Seconds()
- }
- return nil
+ return self.sm.BiRPCV1InitiateSession(clnt, ev, maxUsage)
}
// Interim updates, returns remaining duration from the rater
func (self *SMGenericBiRpcV1) UpdateSession(clnt *rpc2.Client, ev sessionmanager.SMGenericEvent, maxUsage *float64) error {
- if minMaxUsage, err := self.sm.UpdateSession(ev, clnt); err != nil {
- return utils.NewErrServerError(err)
- } else {
- *maxUsage = minMaxUsage.Seconds()
- }
- return nil
+ return self.sm.BiRPCV1UpdateSession(clnt, ev, maxUsage)
}
// Called on session end, should stop debit loop
func (self *SMGenericBiRpcV1) TerminateSession(clnt *rpc2.Client, ev sessionmanager.SMGenericEvent, reply *string) error {
- if err := self.sm.TerminateSession(ev, clnt); err != nil {
- return utils.NewErrServerError(err)
- }
- *reply = utils.OK
- return nil
+ return self.sm.BiRPCV1TerminateSession(clnt, ev, reply)
}
// Called on individual Events (eg SMS)
func (self *SMGenericBiRpcV1) ChargeEvent(clnt *rpc2.Client, ev sessionmanager.SMGenericEvent, maxUsage *float64) error {
- if minMaxUsage, err := self.sm.ChargeEvent(ev, clnt); err != nil {
- return utils.NewErrServerError(err)
- } else {
- *maxUsage = minMaxUsage.Seconds()
- }
- return nil
+ return self.sm.BiRPCV1ChargeEvent(clnt, ev, maxUsage)
}
// Called on session end, should send the CDR to CDRS
func (self *SMGenericBiRpcV1) ProcessCDR(clnt *rpc2.Client, ev sessionmanager.SMGenericEvent, reply *string) error {
- if err := self.sm.ProcessCDR(ev); err != nil {
- return utils.NewErrServerError(err)
- }
- *reply = utils.OK
- return nil
+ return self.sm.BiRPCV1ProcessCDR(clnt, ev, reply)
+}
+
+func (self *SMGenericBiRpcV1) ActiveSessions(clnt *rpc2.Client, attrs utils.AttrSMGGetActiveSessions, reply *[]*sessionmanager.ActiveSession) error {
+ return self.sm.BiRPCV1ActiveSessions(clnt, attrs, reply)
+}
+
+func (self *SMGenericBiRpcV1) ActiveSessionsCount(attrs utils.AttrSMGGetActiveSessions, reply *int) error {
+ return self.sm.BiRPCV1ActiveSessionsCount(attrs, reply)
}
diff --git a/apier/v1/smgenericv1.go b/apier/v1/smgenericv1.go
index 4af8b656d..9000751a9 100644
--- a/apier/v1/smgenericv1.go
+++ b/apier/v1/smgenericv1.go
@@ -18,6 +18,8 @@ along with this program. If not, see
package v1
import (
+ "reflect"
+ "strings"
"time"
"github.com/cgrates/cgrates/sessionmanager"
@@ -36,7 +38,7 @@ type SMGenericV1 struct {
// Returns MaxUsage (for calls in seconds), -1 for no limit
func (self *SMGenericV1) MaxUsage(ev sessionmanager.SMGenericEvent, maxUsage *float64) error {
- maxUsageDur, err := self.sm.MaxUsage(ev, nil)
+ maxUsageDur, err := self.sm.MaxUsage(ev)
if err != nil {
return utils.NewErrServerError(err)
}
@@ -50,7 +52,7 @@ func (self *SMGenericV1) MaxUsage(ev sessionmanager.SMGenericEvent, maxUsage *fl
// Returns list of suppliers which can be used for the request
func (self *SMGenericV1) LCRSuppliers(ev sessionmanager.SMGenericEvent, suppliers *[]string) error {
- if supls, err := self.sm.LCRSuppliers(ev, nil); err != nil {
+ if supls, err := self.sm.LCRSuppliers(ev); err != nil {
return utils.NewErrServerError(err)
} else {
*suppliers = supls
@@ -89,7 +91,7 @@ func (self *SMGenericV1) TerminateSession(ev sessionmanager.SMGenericEvent, repl
// Called on individual Events (eg SMS)
func (self *SMGenericV1) ChargeEvent(ev sessionmanager.SMGenericEvent, maxUsage *float64) error {
- if minMaxUsage, err := self.sm.ChargeEvent(ev, nil); err != nil {
+ if minMaxUsage, err := self.sm.ChargeEvent(ev); err != nil {
return utils.NewErrServerError(err)
} else {
*maxUsage = minMaxUsage.Seconds()
@@ -126,98 +128,25 @@ func (self *SMGenericV1) ActiveSessionsCount(attrs utils.AttrSMGGetActiveSession
// rpcclient.RpcClientConnection interface
func (self *SMGenericV1) Call(serviceMethod string, args interface{}, reply interface{}) error {
- switch serviceMethod {
- case "SMGenericV1.MaxUsage":
- argsConverted, canConvert := args.(sessionmanager.SMGenericEvent)
- if !canConvert {
- return rpcclient.ErrWrongArgsType
- }
- replyConverted, canConvert := reply.(*float64)
- if !canConvert {
- return rpcclient.ErrWrongReplyType
- }
- return self.MaxUsage(argsConverted, replyConverted)
- case "SMGenericV1.LCRSuppliers":
- argsConverted, canConvert := args.(sessionmanager.SMGenericEvent)
- if !canConvert {
- return rpcclient.ErrWrongArgsType
- }
- replyConverted, canConvert := reply.(*[]string)
- if !canConvert {
- return rpcclient.ErrWrongReplyType
- }
- return self.LCRSuppliers(argsConverted, replyConverted)
- case "SMGenericV1.InitiateSession":
- argsConverted, canConvert := args.(sessionmanager.SMGenericEvent)
- if !canConvert {
- return rpcclient.ErrWrongArgsType
- }
- replyConverted, canConvert := reply.(*float64)
- if !canConvert {
- return rpcclient.ErrWrongReplyType
- }
- return self.InitiateSession(argsConverted, replyConverted)
- case "SMGenericV1.UpdateSession":
- argsConverted, canConvert := args.(sessionmanager.SMGenericEvent)
- if !canConvert {
- return rpcclient.ErrWrongArgsType
- }
- replyConverted, canConvert := reply.(*float64)
- if !canConvert {
- return rpcclient.ErrWrongReplyType
- }
- return self.UpdateSession(argsConverted, replyConverted)
- case "SMGenericV1.TerminateSession":
- argsConverted, canConvert := args.(sessionmanager.SMGenericEvent)
- if !canConvert {
- return rpcclient.ErrWrongArgsType
- }
- replyConverted, canConvert := reply.(*string)
- if !canConvert {
- return rpcclient.ErrWrongReplyType
- }
- return self.TerminateSession(argsConverted, replyConverted)
- case "SMGenericV1.ChargeEvent":
- argsConverted, canConvert := args.(sessionmanager.SMGenericEvent)
- if !canConvert {
- return rpcclient.ErrWrongArgsType
- }
- replyConverted, canConvert := reply.(*float64)
- if !canConvert {
- return rpcclient.ErrWrongReplyType
- }
- return self.ChargeEvent(argsConverted, replyConverted)
- case "SMGenericV1.ProcessCDR":
- argsConverted, canConvert := args.(sessionmanager.SMGenericEvent)
- if !canConvert {
- return rpcclient.ErrWrongArgsType
- }
- replyConverted, canConvert := reply.(*string)
- if !canConvert {
- return rpcclient.ErrWrongReplyType
- }
- return self.ProcessCDR(argsConverted, replyConverted)
- case "SMGenericV1.ActiveSessions":
- argsConverted, canConvert := args.(utils.AttrSMGGetActiveSessions)
- if !canConvert {
- return rpcclient.ErrWrongArgsType
- }
- replyConverted, canConvert := reply.(*[]*sessionmanager.ActiveSession)
- if !canConvert {
- return rpcclient.ErrWrongReplyType
- }
- return self.ActiveSessions(argsConverted, replyConverted)
-
- case "SMGenericV1.ActiveSessionsCount":
- argsConverted, canConvert := args.(utils.AttrSMGGetActiveSessions)
- if !canConvert {
- return rpcclient.ErrWrongArgsType
- }
- replyConverted, canConvert := reply.(*int)
- if !canConvert {
- return rpcclient.ErrWrongReplyType
- }
- return self.ActiveSessionsCount(argsConverted, replyConverted)
+ methodSplit := strings.Split(serviceMethod, ".")
+ if len(methodSplit) != 2 {
+ return rpcclient.ErrUnsupporteServiceMethod
}
- return rpcclient.ErrUnsupporteServiceMethod
+ method := reflect.ValueOf(self).MethodByName(methodSplit[1])
+ if !method.IsValid() {
+ return rpcclient.ErrUnsupporteServiceMethod
+ }
+ params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
+ ret := method.Call(params)
+ if len(ret) != 1 {
+ return utils.ErrServerError
+ }
+ if ret[0].Interface() == nil {
+ return nil
+ }
+ err, ok := ret[0].Interface().(error)
+ if !ok {
+ return utils.ErrServerError
+ }
+ return err
}
diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go
index 43469f5a1..95e27604b 100644
--- a/cmd/cgr-engine/cgr-engine.go
+++ b/cmd/cgr-engine/cgr-engine.go
@@ -161,9 +161,10 @@ func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internal
server.RpcRegister(smgRpc)
internalSMGChan <- smgRpc
// Register BiRpc handlers
+ //server.BiRPCRegister(v1.NewSMGenericBiRpcV1(sm))
smgBiRpc := v1.NewSMGenericBiRpcV1(sm)
for method, handler := range smgBiRpc.Handlers() {
- server.BijsonRegisterName(method, handler)
+ server.BiRPCRegisterName(method, handler)
}
}
diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go
index 0405f294e..beecdec4e 100644
--- a/sessionmanager/smgeneric.go
+++ b/sessionmanager/smgeneric.go
@@ -20,11 +20,11 @@ package sessionmanager
import (
"errors"
"fmt"
+ "reflect"
"strings"
"sync"
"time"
- "github.com/cenk/rpc2"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -67,10 +67,10 @@ type smgSessionTerminator struct {
}
// Updates the timer for the session to a new ttl and terminate info
-func (self *SMGeneric) resetTerminatorTimer(uuid string, ttl time.Duration, ttlLastUsed, ttlUsage *time.Duration) {
- self.sessionsMux.RLock()
- defer self.sessionsMux.RUnlock()
- if st, found := self.sessionTerminators[uuid]; found {
+func (smg *SMGeneric) resetTerminatorTimer(uuid string, ttl time.Duration, ttlLastUsed, ttlUsage *time.Duration) {
+ smg.sessionsMux.RLock()
+ defer smg.sessionsMux.RUnlock()
+ if st, found := smg.sessionTerminators[uuid]; found {
if ttl != 0 {
st.ttl = ttl
}
@@ -85,28 +85,28 @@ func (self *SMGeneric) resetTerminatorTimer(uuid string, ttl time.Duration, ttlL
}
// Called when a session timeouts
-func (self *SMGeneric) ttlTerminate(s *SMGSession, tmtr *smgSessionTerminator) {
+func (smg *SMGeneric) ttlTerminate(s *SMGSession, tmtr *smgSessionTerminator) {
debitUsage := tmtr.ttl
if tmtr.ttlUsage != nil {
debitUsage = *tmtr.ttlUsage
}
- for _, s := range self.getSession(s.eventStart.GetUUID()) {
+ for _, s := range smg.getSession(s.eventStart.GetUUID()) {
s.debit(debitUsage, tmtr.ttlLastUsed)
}
- self.sessionEnd(s.eventStart.GetUUID(), s.TotalUsage())
- cdr := s.eventStart.AsStoredCdr(self.cgrCfg, self.timezone)
+ smg.sessionEnd(s.eventStart.GetUUID(), s.TotalUsage())
+ cdr := s.eventStart.AsStoredCdr(smg.cgrCfg, smg.timezone)
cdr.Usage = s.TotalUsage()
var reply string
- self.cdrsrv.Call("CdrsV1.ProcessCDR", cdr, &reply)
+ smg.cdrsrv.Call("CdrsV1.ProcessCDR", cdr, &reply)
}
-func (self *SMGeneric) recordSession(uuid string, s *SMGSession) {
- self.sessionsMux.Lock()
- defer self.sessionsMux.Unlock()
- self.sessions[uuid] = append(self.sessions[uuid], s)
- if self.cgrCfg.SmGenericConfig.SessionTTL != 0 {
- if _, found := self.sessionTerminators[uuid]; !found {
- ttl := self.cgrCfg.SmGenericConfig.SessionTTL
+func (smg *SMGeneric) recordSession(uuid string, s *SMGSession) {
+ smg.sessionsMux.Lock()
+ defer smg.sessionsMux.Unlock()
+ smg.sessions[uuid] = append(smg.sessions[uuid], s)
+ if smg.cgrCfg.SmGenericConfig.SessionTTL != 0 {
+ if _, found := smg.sessionTerminators[uuid]; !found {
+ ttl := smg.cgrCfg.SmGenericConfig.SessionTTL
if ttlEv := s.eventStart.GetSessionTTL(); ttlEv != 0 {
ttl = ttlEv
}
@@ -119,11 +119,11 @@ func (self *SMGeneric) recordSession(uuid string, s *SMGSession) {
ttlLastUsed: s.eventStart.GetSessionTTLLastUsed(),
ttlUsage: s.eventStart.GetSessionTTLUsage(),
}
- self.sessionTerminators[uuid] = terminator
+ smg.sessionTerminators[uuid] = terminator
go func() {
select {
case <-timer.C:
- self.ttlTerminate(s, terminator)
+ smg.ttlTerminate(s, terminator)
case <-endChan:
timer.Stop()
}
@@ -131,31 +131,31 @@ func (self *SMGeneric) recordSession(uuid string, s *SMGSession) {
}
}
- self.indexSession(uuid, s)
+ smg.indexSession(uuid, s)
}
// Remove session from session list, removes all related in case of multiple runs, true if item was found
-func (self *SMGeneric) unrecordSession(uuid string) bool {
- self.sessionsMux.Lock()
- defer self.sessionsMux.Unlock()
- if _, found := self.sessions[uuid]; !found {
+func (smg *SMGeneric) unrecordSession(uuid string) bool {
+ smg.sessionsMux.Lock()
+ defer smg.sessionsMux.Unlock()
+ if _, found := smg.sessions[uuid]; !found {
return false
}
- delete(self.sessions, uuid)
- if st, found := self.sessionTerminators[uuid]; found {
+ delete(smg.sessions, uuid)
+ if st, found := smg.sessionTerminators[uuid]; found {
st.endChan <- true
- delete(self.sessionTerminators, uuid)
+ delete(smg.sessionTerminators, uuid)
}
- self.unindexSession(uuid)
+ smg.unindexSession(uuid)
return true
}
-// indexSession explores settings and builds self.sessionIndexes based on that
-func (self *SMGeneric) indexSession(uuid string, s *SMGSession) bool {
- self.sessionIndexMux.Lock()
- defer self.sessionIndexMux.Unlock()
+// indexSession explores settings and builds smg.sessionIndexes based on that
+func (smg *SMGeneric) indexSession(uuid string, s *SMGSession) bool {
+ smg.sessionIndexMux.Lock()
+ defer smg.sessionIndexMux.Unlock()
ev := s.eventStart
- for _, fieldName := range self.cgrCfg.SmGenericConfig.SessionIndexes {
+ for _, fieldName := range smg.cgrCfg.SmGenericConfig.SessionIndexes {
fieldVal, err := utils.ReflectFieldAsString(ev, fieldName, "")
if err != nil {
if err == utils.ErrNotFound {
@@ -168,32 +168,32 @@ func (self *SMGeneric) indexSession(uuid string, s *SMGSession) bool {
if fieldVal == "" {
fieldVal = utils.MetaEmpty
}
- if _, hasFieldName := self.sessionIndexes[fieldName]; !hasFieldName { // Init it here so we can minimize
- self.sessionIndexes[fieldName] = make(map[string]utils.StringMap)
+ if _, hasFieldName := smg.sessionIndexes[fieldName]; !hasFieldName { // Init it here so we can minimize
+ smg.sessionIndexes[fieldName] = make(map[string]utils.StringMap)
}
- if _, hasFieldVal := self.sessionIndexes[fieldName][fieldVal]; !hasFieldVal {
- self.sessionIndexes[fieldName][fieldVal] = make(utils.StringMap)
+ if _, hasFieldVal := smg.sessionIndexes[fieldName][fieldVal]; !hasFieldVal {
+ smg.sessionIndexes[fieldName][fieldVal] = make(utils.StringMap)
}
- self.sessionIndexes[fieldName][fieldVal][uuid] = true
+ smg.sessionIndexes[fieldName][fieldVal][uuid] = true
}
return true
}
// unindexSession removes a session from indexes
-func (self *SMGeneric) unindexSession(uuid string) bool {
- self.sessionIndexMux.Lock()
- defer self.sessionIndexMux.Unlock()
+func (smg *SMGeneric) unindexSession(uuid string) bool {
+ smg.sessionIndexMux.Lock()
+ defer smg.sessionIndexMux.Unlock()
var found bool
- for fldName := range self.sessionIndexes {
- for fldVal := range self.sessionIndexes[fldName] {
- if _, hasUUID := self.sessionIndexes[fldName][fldVal][uuid]; hasUUID {
+ for fldName := range smg.sessionIndexes {
+ for fldVal := range smg.sessionIndexes[fldName] {
+ if _, hasUUID := smg.sessionIndexes[fldName][fldVal][uuid]; hasUUID {
found = true
- delete(self.sessionIndexes[fldName][fldVal], uuid)
- if len(self.sessionIndexes[fldName][fldVal]) == 0 {
- delete(self.sessionIndexes[fldName], fldVal)
+ delete(smg.sessionIndexes[fldName][fldVal], uuid)
+ if len(smg.sessionIndexes[fldName][fldVal]) == 0 {
+ delete(smg.sessionIndexes[fldName], fldVal)
}
- if len(self.sessionIndexes[fldName]) == 0 {
- delete(self.sessionIndexes, fldName)
+ if len(smg.sessionIndexes[fldName]) == 0 {
+ delete(smg.sessionIndexes, fldName)
}
}
}
@@ -203,10 +203,10 @@ func (self *SMGeneric) unindexSession(uuid string) bool {
// getSessionIDsMatchingIndexes will check inside indexes if it can find sessionIDs matching all filters
// matchedIndexes returns map[matchedFieldName]possibleMatchedFieldVal so we optimize further to avoid checking them
-func (self *SMGeneric) getSessionIDsMatchingIndexes(fltrs map[string]string) (utils.StringMap, map[string]string) {
- self.sessionIndexMux.RLock()
- defer self.sessionIndexMux.RUnlock()
- sessionIDxes := self.sessionIndexes // Clone here and unlock sooner if getting slow
+func (smg *SMGeneric) getSessionIDsMatchingIndexes(fltrs map[string]string) (utils.StringMap, map[string]string) {
+ smg.sessionIndexMux.RLock()
+ defer smg.sessionIndexMux.RUnlock()
+ sessionIDxes := smg.sessionIndexes // Clone here and unlock sooner if getting slow
matchedIndexes := make(map[string]string)
var matchingSessions utils.StringMap
checkNr := 0
@@ -234,11 +234,11 @@ func (self *SMGeneric) getSessionIDsMatchingIndexes(fltrs map[string]string) (ut
return matchingSessions.Clone(), matchedIndexes
}
-func (self *SMGeneric) getSessionIDsForPrefix(prefix string) []string {
- self.sessionsMux.Lock()
- defer self.sessionsMux.Unlock()
+func (smg *SMGeneric) getSessionIDsForPrefix(prefix string) []string {
+ smg.sessionsMux.Lock()
+ defer smg.sessionsMux.Unlock()
sessionIDs := make([]string, 0)
- for sessionID := range self.sessions {
+ for sessionID := range smg.sessions {
if strings.HasPrefix(sessionID, prefix) {
sessionIDs = append(sessionIDs, sessionID)
}
@@ -247,35 +247,35 @@ func (self *SMGeneric) getSessionIDsForPrefix(prefix string) []string {
}
// Returns sessions/derived for a specific uuid
-func (self *SMGeneric) getSession(uuid string) []*SMGSession {
- self.sessionsMux.RLock()
- defer self.sessionsMux.RUnlock()
- return self.sessions[uuid]
+func (smg *SMGeneric) getSession(uuid string) []*SMGSession {
+ smg.sessionsMux.RLock()
+ defer smg.sessionsMux.RUnlock()
+ return smg.sessions[uuid]
}
// Handle a new session, pass the connectionId so we can communicate on disconnect request
-func (self *SMGeneric) sessionStart(evStart SMGenericEvent, clntConn rpcclient.RpcClientConnection) error {
+func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, clntConn rpcclient.RpcClientConnection) error {
sessionId := evStart.GetUUID()
- processed, err := self.guard.Guard(func() (interface{}, error) { // Lock it on UUID level
+ processed, err := smg.guard.Guard(func() (interface{}, error) { // Lock it on UUID level
var sessionRuns []*engine.SessionRun
- if err := self.rater.Call("Responder.GetSessionRuns", evStart.AsStoredCdr(self.cgrCfg, self.timezone), &sessionRuns); err != nil {
+ if err := smg.rater.Call("Responder.GetSessionRuns", evStart.AsStoredCdr(smg.cgrCfg, smg.timezone), &sessionRuns); err != nil {
return true, err
} else if len(sessionRuns) == 0 {
return true, nil
}
stopDebitChan := make(chan struct{})
for _, sessionRun := range sessionRuns {
- s := &SMGSession{eventStart: evStart, runId: sessionRun.DerivedCharger.RunID, timezone: self.timezone,
- rater: self.rater, cdrsrv: self.cdrsrv, cd: sessionRun.CallDescriptor, clntConn: clntConn}
- self.recordSession(sessionId, s)
+ s := &SMGSession{eventStart: evStart, runId: sessionRun.DerivedCharger.RunID, timezone: smg.timezone,
+ rater: smg.rater, cdrsrv: smg.cdrsrv, cd: sessionRun.CallDescriptor, clntConn: clntConn}
+ smg.recordSession(sessionId, s)
//utils.Logger.Info(fmt.Sprintf(" Starting session: %s, runId: %s", sessionId, s.runId))
- if self.cgrCfg.SmGenericConfig.DebitInterval != 0 {
+ if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 {
s.stopDebit = stopDebitChan
- go s.debitLoop(self.cgrCfg.SmGenericConfig.DebitInterval)
+ go s.debitLoop(smg.cgrCfg.SmGenericConfig.DebitInterval)
}
}
return true, nil
- }, self.cgrCfg.LockingTimeout, sessionId)
+ }, smg.cgrCfg.LockingTimeout, sessionId)
if processed == nil || processed == false {
utils.Logger.Err(" Cannot start session, empty reply")
return utils.ErrServerError
@@ -284,13 +284,13 @@ func (self *SMGeneric) sessionStart(evStart SMGenericEvent, clntConn rpcclient.R
}
// End a session from outside
-func (self *SMGeneric) sessionEnd(sessionId string, usage time.Duration) error {
- _, err := self.guard.Guard(func() (interface{}, error) { // Lock it on UUID level
- ss := self.getSession(sessionId)
+func (smg *SMGeneric) sessionEnd(sessionId string, usage time.Duration) error {
+ _, err := smg.guard.Guard(func() (interface{}, error) { // Lock it on UUID level
+ ss := smg.getSession(sessionId)
if len(ss) == 0 { // Not handled by us
return nil, nil
}
- if !self.unrecordSession(sessionId) { // Unreference it early so we avoid concurrency
+ if !smg.unrecordSession(sessionId) { // Unreference it early so we avoid concurrency
return nil, nil // Did not find the session so no need to close it anymore
}
for idx, s := range ss {
@@ -298,7 +298,7 @@ func (self *SMGeneric) sessionEnd(sessionId string, usage time.Duration) error {
if idx == 0 && s.stopDebit != nil {
close(s.stopDebit) // Stop automatic debits
}
- aTime, err := s.eventStart.GetAnswerTime(utils.META_DEFAULT, self.cgrCfg.DefaultTimezone)
+ aTime, err := s.eventStart.GetAnswerTime(utils.META_DEFAULT, smg.cgrCfg.DefaultTimezone)
if err != nil || aTime.IsZero() {
utils.Logger.Err(fmt.Sprintf(" Could not retrieve answer time for session: %s, runId: %s, aTime: %+v, error: %v",
sessionId, s.runId, aTime, err))
@@ -317,24 +317,24 @@ func (self *SMGeneric) sessionEnd(sessionId string, usage time.Duration) error {
}
// Used when an update will relocate an initial session (eg multiple data streams)
-func (self *SMGeneric) sessionRelocate(sessionID, initialID string) error {
- _, err := self.guard.Guard(func() (interface{}, error) { // Lock it on initialID level
+func (smg *SMGeneric) sessionRelocate(sessionID, initialID string) error {
+ _, err := smg.guard.Guard(func() (interface{}, error) { // Lock it on initialID level
if utils.IsSliceMember([]string{sessionID, initialID}, "") { // Not allowed empty params here
return nil, utils.ErrMandatoryIeMissing
}
- ssNew := self.getSession(sessionID) // Already relocated
+ ssNew := smg.getSession(sessionID) // Already relocated
if len(ssNew) != 0 {
return nil, nil
}
- ss := self.getSession(initialID)
+ ss := smg.getSession(initialID)
if len(ss) == 0 { // No need of relocation
return nil, utils.ErrNotFound
}
for i, s := range ss {
s.eventStart[utils.ACCID] = sessionID // Overwrite initialSessionID with new one
- self.recordSession(sessionID, s)
+ smg.recordSession(sessionID, s)
if i == 0 {
- self.unrecordSession(initialID)
+ smg.unrecordSession(initialID)
}
}
return nil, nil
@@ -344,25 +344,25 @@ func (self *SMGeneric) sessionRelocate(sessionID, initialID string) error {
// Methods to apply on sessions, mostly exported through RPC/Bi-RPC
//Calculates maximum usage allowed for gevent
-func (self *SMGeneric) MaxUsage(gev SMGenericEvent, clnt *rpc2.Client) (time.Duration, error) {
+func (smg *SMGeneric) MaxUsage(gev SMGenericEvent) (time.Duration, error) {
gev[utils.EVENT_NAME] = utils.CGR_AUTHORIZATION
- storedCdr := gev.AsStoredCdr(config.CgrConfig(), self.timezone)
+ storedCdr := gev.AsStoredCdr(config.CgrConfig(), smg.timezone)
var maxDur float64
- if err := self.rater.Call("Responder.GetDerivedMaxSessionTime", storedCdr, &maxDur); err != nil {
+ if err := smg.rater.Call("Responder.GetDerivedMaxSessionTime", storedCdr, &maxDur); err != nil {
return time.Duration(0), err
}
return time.Duration(maxDur), nil
}
-func (self *SMGeneric) LCRSuppliers(gev SMGenericEvent, clnt *rpc2.Client) ([]string, error) {
+func (smg *SMGeneric) LCRSuppliers(gev SMGenericEvent) ([]string, error) {
gev[utils.EVENT_NAME] = utils.CGR_LCR_REQUEST
- cd, err := gev.AsLcrRequest().AsCallDescriptor(self.timezone)
- cd.CgrID = gev.GetCgrId(self.timezone)
+ cd, err := gev.AsLcrRequest().AsCallDescriptor(smg.timezone)
+ cd.CgrID = gev.GetCgrId(smg.timezone)
if err != nil {
return nil, err
}
var lcr engine.LCRCost
- if err = self.rater.Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil {
+ if err = smg.rater.Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil {
return nil, err
}
if lcr.HasErrors() {
@@ -373,50 +373,50 @@ func (self *SMGeneric) LCRSuppliers(gev SMGenericEvent, clnt *rpc2.Client) ([]st
}
// Called on session start
-func (self *SMGeneric) InitiateSession(gev SMGenericEvent, clnt *rpc2.Client) (time.Duration, error) {
- if err := self.sessionStart(gev, clnt); err != nil {
- self.sessionEnd(gev.GetUUID(), 0)
+func (smg *SMGeneric) InitiateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) (time.Duration, error) {
+ if err := smg.sessionStart(gev, clnt); err != nil {
+ smg.sessionEnd(gev.GetUUID(), 0)
return nilDuration, err
}
- if self.cgrCfg.SmGenericConfig.DebitInterval != 0 { // Session handled by debit loop
+ if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { // Session handled by debit loop
return -1, nil
}
- d, err := self.UpdateSession(gev, clnt)
+ d, err := smg.UpdateSession(gev, clnt)
if err != nil || d == 0 {
- self.sessionEnd(gev.GetUUID(), 0)
+ smg.sessionEnd(gev.GetUUID(), 0)
}
return d, err
}
// Execute debits for usage/maxUsage
-func (self *SMGeneric) UpdateSession(gev SMGenericEvent, clnt *rpc2.Client) (time.Duration, error) {
- if self.cgrCfg.SmGenericConfig.DebitInterval != 0 { // Not possible to update a session with debit loop active
+func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) (time.Duration, error) {
+ if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { // Not possible to update a session with debit loop active
return 0, errors.New("ACTIVE_DEBIT_LOOP")
}
if initialID, err := gev.GetFieldAsString(utils.InitialOriginID); err == nil {
- err := self.sessionRelocate(gev.GetUUID(), initialID)
+ err := smg.sessionRelocate(gev.GetUUID(), initialID)
if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update
- err = self.sessionStart(gev, clnt)
+ err = smg.sessionStart(gev, clnt)
}
if err != nil {
return nilDuration, err
}
}
- self.resetTerminatorTimer(gev.GetUUID(), gev.GetSessionTTL(), gev.GetSessionTTLLastUsed(), gev.GetSessionTTLUsage())
+ smg.resetTerminatorTimer(gev.GetUUID(), gev.GetSessionTTL(), gev.GetSessionTTLLastUsed(), gev.GetSessionTTLUsage())
var lastUsed *time.Duration
if evLastUsed, err := gev.GetLastUsed(utils.META_DEFAULT); err == nil {
lastUsed = &evLastUsed
} else if err != utils.ErrNotFound {
return nilDuration, err
}
- evMaxUsage, err := gev.GetMaxUsage(utils.META_DEFAULT, self.cgrCfg.SmGenericConfig.MaxCallDuration)
+ evMaxUsage, err := gev.GetMaxUsage(utils.META_DEFAULT, smg.cgrCfg.SmGenericConfig.MaxCallDuration)
if err != nil {
if err == utils.ErrNotFound {
err = utils.ErrMandatoryIeMissing
}
return nilDuration, err
}
- aSessions := self.getSession(gev.GetUUID())
+ aSessions := smg.getSession(gev.GetUUID())
if len(aSessions) == 0 {
utils.Logger.Err(fmt.Sprintf(" SessionUpdate with no active sessions for event: <%s>", gev.GetUUID()))
return nilDuration, utils.ErrServerError
@@ -432,11 +432,11 @@ func (self *SMGeneric) UpdateSession(gev SMGenericEvent, clnt *rpc2.Client) (tim
}
// Called on session end, should stop debit loop
-func (self *SMGeneric) TerminateSession(gev SMGenericEvent, clnt *rpc2.Client) error {
+func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) error {
if initialID, err := gev.GetFieldAsString(utils.InitialOriginID); err == nil {
- err := self.sessionRelocate(gev.GetUUID(), initialID)
+ err := smg.sessionRelocate(gev.GetUUID(), initialID)
if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update
- err = self.sessionStart(gev, clnt)
+ err = smg.sessionStart(gev, clnt)
}
if err != nil && err != utils.ErrMandatoryIeMissing {
return err
@@ -444,7 +444,7 @@ func (self *SMGeneric) TerminateSession(gev SMGenericEvent, clnt *rpc2.Client) e
}
sessionIDs := []string{gev.GetUUID()}
if sessionIDPrefix, err := gev.GetFieldAsString(utils.OriginIDPrefix); err == nil { // OriginIDPrefix is present, OriginID will not be anymore considered
- sessionIDs = self.getSessionIDsForPrefix(sessionIDPrefix)
+ sessionIDs = smg.getSessionIDsForPrefix(sessionIDPrefix)
}
usage, errUsage := gev.GetUsage(utils.META_DEFAULT)
var lastUsed time.Duration
@@ -465,7 +465,7 @@ func (self *SMGeneric) TerminateSession(gev SMGenericEvent, clnt *rpc2.Client) e
var hasActiveSession bool
for _, sessionID := range sessionIDs {
var s *SMGSession
- for _, s = range self.getSession(sessionID) {
+ for _, s = range smg.getSession(sessionID) {
break
}
if s == nil {
@@ -475,7 +475,7 @@ func (self *SMGeneric) TerminateSession(gev SMGenericEvent, clnt *rpc2.Client) e
if errUsage != nil {
usage = s.TotalUsage() - s.lastUsage + lastUsed
}
- if err := self.sessionEnd(sessionID, usage); err != nil {
+ if err := smg.sessionEnd(sessionID, usage); err != nil {
interimError = err // Last error will be the one returned as API result
}
}
@@ -486,9 +486,9 @@ func (self *SMGeneric) TerminateSession(gev SMGenericEvent, clnt *rpc2.Client) e
}
// Processes one time events (eg: SMS)
-func (self *SMGeneric) ChargeEvent(gev SMGenericEvent, clnt *rpc2.Client) (maxDur time.Duration, err error) {
+func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxDur time.Duration, err error) {
var sessionRuns []*engine.SessionRun
- if err := self.rater.Call("Responder.GetSessionRuns", gev.AsStoredCdr(self.cgrCfg, self.timezone), &sessionRuns); err != nil {
+ if err := smg.rater.Call("Responder.GetSessionRuns", gev.AsStoredCdr(smg.cgrCfg, smg.timezone), &sessionRuns); err != nil {
return nilDuration, err
} else if len(sessionRuns) == 0 {
return nilDuration, nil
@@ -496,7 +496,7 @@ func (self *SMGeneric) ChargeEvent(gev SMGenericEvent, clnt *rpc2.Client) (maxDu
var maxDurInit bool // Avoid differences between default 0 and received 0
for _, sR := range sessionRuns {
cc := new(engine.CallCost)
- if err = self.rater.Call("Responder.MaxDebit", sR.CallDescriptor, cc); err != nil {
+ if err = smg.rater.Call("Responder.MaxDebit", sR.CallDescriptor, cc); err != nil {
utils.Logger.Err(fmt.Sprintf(" Could not Debit CD: %+v, RunID: %s, error: %s", sR.CallDescriptor, sR.DerivedCharger.RunID, err.Error()))
break
}
@@ -534,7 +534,7 @@ func (self *SMGeneric) ChargeEvent(gev SMGenericEvent, clnt *rpc2.Client) (maxDu
cd.Increments.Compress()
//utils.Logger.Info(fmt.Sprintf("Refunding session run callcost: %s", utils.ToJSON(cd)))
var response float64
- err := self.rater.Call("Responder.RefundIncrements", cd, &response)
+ err := smg.rater.Call("Responder.RefundIncrements", cd, &response)
if err != nil {
return nilDuration, err
}
@@ -559,20 +559,20 @@ func (self *SMGeneric) ChargeEvent(gev SMGenericEvent, clnt *rpc2.Client) (maxDu
cd := cc.CreateCallDescriptor()
cd.Increments = roundIncrements
var response float64
- if err := self.rater.Call("Responder.RefundRounding", cd, &response); err != nil {
+ if err := smg.rater.Call("Responder.RefundRounding", cd, &response); err != nil {
utils.Logger.Err(fmt.Sprintf(" ERROR failed to refund rounding: %v", err))
}
}
var reply string
smCost := &engine.SMCost{
- CGRID: gev.GetCgrId(self.timezone),
+ CGRID: gev.GetCgrId(smg.timezone),
CostSource: utils.SESSION_MANAGER_SOURCE,
RunID: sR.DerivedCharger.RunID,
OriginHost: gev.GetOriginatorIP(utils.META_DEFAULT),
OriginID: gev.GetUUID(),
CostDetails: cc,
}
- if err := self.cdrsrv.Call("CdrsV1.StoreSMCost", engine.AttrCDRSStoreSMCost{Cost: smCost, CheckDuplicate: true}, &reply); err != nil && !strings.HasSuffix(err.Error(), utils.ErrExists.Error()) {
+ if err := smg.cdrsrv.Call("CdrsV1.StoreSMCost", engine.AttrCDRSStoreSMCost{Cost: smCost, CheckDuplicate: true}, &reply); err != nil && !strings.HasSuffix(err.Error(), utils.ErrExists.Error()) {
withErrors = true
utils.Logger.Err(fmt.Sprintf(" Could not save CC: %+v, RunID: %s error: %s", cc, sR.DerivedCharger.RunID, err.Error()))
}
@@ -583,29 +583,29 @@ func (self *SMGeneric) ChargeEvent(gev SMGenericEvent, clnt *rpc2.Client) (maxDu
return maxDur, nil
}
-func (self *SMGeneric) ProcessCDR(gev SMGenericEvent) error {
+func (smg *SMGeneric) ProcessCDR(gev SMGenericEvent) error {
var reply string
- if err := self.cdrsrv.Call("CdrsV1.ProcessCDR", gev.AsStoredCdr(self.cgrCfg, self.timezone), &reply); err != nil {
+ if err := smg.cdrsrv.Call("CdrsV1.ProcessCDR", gev.AsStoredCdr(smg.cgrCfg, smg.timezone), &reply); err != nil {
return err
}
return nil
}
-func (self *SMGeneric) Connect() error {
+func (smg *SMGeneric) Connect() error {
return nil
}
// Used by APIer to retrieve sessions
-func (self *SMGeneric) getSessions() map[string][]*SMGSession {
- self.sessionsMux.RLock()
- defer self.sessionsMux.RUnlock()
- return self.sessions
+func (smg *SMGeneric) getSessions() map[string][]*SMGSession {
+ smg.sessionsMux.RLock()
+ defer smg.sessionsMux.RUnlock()
+ return smg.sessions
}
-func (self *SMGeneric) ActiveSessions(fltrs map[string]string, count bool) (aSessions []*ActiveSession, counter int, err error) {
+func (smg *SMGeneric) ActiveSessions(fltrs map[string]string, count bool) (aSessions []*ActiveSession, counter int, err error) {
aSessions = make([]*ActiveSession, 0) // Make sure we return at least empty list and not nil
// Check first based on indexes so we can downsize the list of matching sessions
- matchingSessionIDs, checkedFilters := self.getSessionIDsMatchingIndexes(fltrs)
+ matchingSessionIDs, checkedFilters := smg.getSessionIDsMatchingIndexes(fltrs)
if len(matchingSessionIDs) == 0 && len(checkedFilters) != 0 {
return
}
@@ -615,7 +615,7 @@ func (self *SMGeneric) ActiveSessions(fltrs map[string]string, count bool) (aSes
}
}
var remainingSessions []*SMGSession // Survived index matching
- for sUUID, sGrp := range self.getSessions() {
+ for sUUID, sGrp := range smg.getSessions() {
if _, hasUUID := matchingSessionIDs[sUUID]; !hasUUID && len(checkedFilters) != 0 {
continue
}
@@ -650,19 +650,135 @@ func (self *SMGeneric) ActiveSessions(fltrs map[string]string, count bool) (aSes
return nil, len(remainingSessions), nil
}
for _, s := range remainingSessions {
- aSessions = append(aSessions, s.AsActiveSession(self.Timezone())) // Expensive for large number of sessions
+ aSessions = append(aSessions, s.AsActiveSession(smg.Timezone())) // Expensive for large number of sessions
}
return
}
-func (self *SMGeneric) Timezone() string {
- return self.timezone
+func (smg *SMGeneric) Timezone() string {
+ return smg.timezone
}
// System shutdown
-func (self *SMGeneric) Shutdown() error {
- for ssId := range self.getSessions() { // Force sessions shutdown
- self.sessionEnd(ssId, time.Duration(self.cgrCfg.MaxCallDuration))
+func (smg *SMGeneric) Shutdown() error {
+ for ssId := range smg.getSessions() { // Force sessions shutdown
+ smg.sessionEnd(ssId, time.Duration(smg.cgrCfg.MaxCallDuration))
+ }
+ return nil
+}
+
+// Part of utils.BiRPCServer to help internal connections do calls over rpcclient.RpcClientConnection interface
+func (smg *SMGeneric) CallBiRPC(clnt rpcclient.RpcClientConnection, serviceMethod string, args interface{}, reply interface{}) error {
+ parts := strings.Split(serviceMethod, ".")
+ if len(parts) != 2 {
+ return rpcclient.ErrUnsupporteServiceMethod
+ }
+ // get method BiRPCV1.Method
+ method := reflect.ValueOf(smg).MethodByName(parts[0][len(parts[0])-7:] + parts[1]) // Inherit the BiRPCV1 in the method
+ if !method.IsValid() {
+ return rpcclient.ErrUnsupporteServiceMethod
+ }
+ // construct the params
+ params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
+ ret := method.Call(params)
+ if len(ret) != 1 {
+ return utils.ErrServerError
+ }
+ if ret[0].Interface() == nil {
+ return nil
+ }
+ err, ok := ret[0].Interface().(error)
+ if !ok {
+ return utils.ErrServerError
+ }
+ return err
+}
+
+func (smg *SMGeneric) BiRPCV1MaxUsage(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, maxUsage *float64) error {
+ maxUsageDur, err := smg.MaxUsage(ev)
+ if err != nil {
+ return utils.NewErrServerError(err)
+ }
+ if maxUsageDur == time.Duration(-1) {
+ *maxUsage = -1.0
+ } else {
+ *maxUsage = maxUsageDur.Seconds()
+ }
+ return nil
+}
+
+/// Returns list of suppliers which can be used for the request
+func (smg *SMGeneric) BiRPCV1LCRSuppliers(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, suppliers *[]string) error {
+ if supls, err := smg.LCRSuppliers(ev); err != nil {
+ return utils.NewErrServerError(err)
+ } else {
+ *suppliers = supls
+ }
+ return nil
+}
+
+// Called on session start, returns the maximum number of seconds the session can last
+func (smg *SMGeneric) BiRPCV1InitiateSession(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, maxUsage *float64) error {
+ if minMaxUsage, err := smg.InitiateSession(ev, clnt); err != nil {
+ return utils.NewErrServerError(err)
+ } else {
+ *maxUsage = minMaxUsage.Seconds()
+ }
+ return nil
+}
+
+// Interim updates, returns remaining duration from the rater
+func (smg *SMGeneric) BiRPCV1UpdateSession(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, maxUsage *float64) error {
+ if minMaxUsage, err := smg.UpdateSession(ev, clnt); err != nil {
+ return utils.NewErrServerError(err)
+ } else {
+ *maxUsage = minMaxUsage.Seconds()
+ }
+ return nil
+}
+
+// Called on session end, should stop debit loop
+func (smg *SMGeneric) BiRPCV1TerminateSession(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, reply *string) error {
+ if err := smg.TerminateSession(ev, clnt); err != nil {
+ return utils.NewErrServerError(err)
+ }
+ *reply = utils.OK
+ return nil
+}
+
+// Called on individual Events (eg SMS)
+func (smg *SMGeneric) BiRPCV1ChargeEvent(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, maxUsage *float64) error {
+ if minMaxUsage, err := smg.ChargeEvent(ev); err != nil {
+ return utils.NewErrServerError(err)
+ } else {
+ *maxUsage = minMaxUsage.Seconds()
+ }
+ return nil
+}
+
+// Called on session end, should send the CDR to CDRS
+func (smg *SMGeneric) BiRPCV1ProcessCDR(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, reply *string) error {
+ if err := smg.ProcessCDR(ev); err != nil {
+ return utils.NewErrServerError(err)
+ }
+ *reply = utils.OK
+ return nil
+}
+
+func (smg *SMGeneric) BiRPCV1ActiveSessions(clnt rpcclient.RpcClientConnection, attrs utils.AttrSMGGetActiveSessions, reply *[]*ActiveSession) error {
+ aSessions, _, err := smg.ActiveSessions(attrs.AsMapStringString(), false)
+ if err != nil {
+ return utils.NewErrServerError(err)
+ }
+ *reply = aSessions
+ return nil
+}
+
+func (smg *SMGeneric) BiRPCV1ActiveSessionsCount(attrs utils.AttrSMGGetActiveSessions, reply *int) error {
+ if _, count, err := smg.ActiveSessions(attrs.AsMapStringString(), true); err != nil {
+ return err
+ } else {
+ *reply = count
}
return nil
}
diff --git a/utils/birpcint_client.go b/utils/birpcint_client.go
new file mode 100644
index 000000000..e729114a6
--- /dev/null
+++ b/utils/birpcint_client.go
@@ -0,0 +1,47 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+package utils
+
+import (
+ "github.com/cgrates/rpcclient"
+)
+
+// Interface which the server needs to match as BiRPCServer
+type BiRPCServer interface {
+ CallBiRPC(rpcclient.RpcClientConnection, string, interface{}, interface{}) error
+}
+
+func NewBiRPCInternalClient(serverConn BiRPCServer) *BiRPCInternalClient {
+ return &BiRPCInternalClient{serverConn: serverConn}
+}
+
+// Need separate client from the original RpcClientConnection since diretly passing the server is not enough without passing the client's reference
+type BiRPCInternalClient struct {
+ serverConn BiRPCServer
+ clntConn rpcclient.RpcClientConnection // conn to reach client and do calls over it
+}
+
+// Used in case when clientConn is not available at init time (eg: SMGAsterisk who needs the biRPCConn at initialization)
+func (clnt *BiRPCInternalClient) SetClientConn(clntConn rpcclient.RpcClientConnection) {
+ clnt.clntConn = clntConn
+}
+
+// Part of rpcclient.RpcClientConnection interface
+func (clnt *BiRPCInternalClient) Call(serviceMethod string, args interface{}, reply interface{}) error {
+ return clnt.serverConn.CallBiRPC(clnt.clntConn, serviceMethod, args, reply)
+}
diff --git a/utils/server.go b/utils/server.go
index f7f819571..d781d40e3 100644
--- a/utils/server.go
+++ b/utils/server.go
@@ -26,6 +26,7 @@ import (
"net/http"
"net/rpc"
"net/rpc/jsonrpc"
+ "reflect"
"github.com/cenk/rpc2"
"golang.org/x/net/websocket"
@@ -35,7 +36,7 @@ import _ "net/http/pprof"
type Server struct {
rpcEnabled bool
httpEnabled bool
- bijsonSrv *rpc2.Server
+ birpcSrv *rpc2.Server
}
func (s *Server) RpcRegister(rcvr interface{}) {
@@ -54,11 +55,24 @@ func (s *Server) RegisterHttpFunc(pattern string, handler func(http.ResponseWrit
}
// Registers a new BiJsonRpc name
-func (s *Server) BijsonRegisterName(method string, handlerFunc interface{}) {
- if s.bijsonSrv == nil {
- s.bijsonSrv = rpc2.NewServer()
+func (s *Server) BiRPCRegisterName(method string, handlerFunc interface{}) {
+ if s.birpcSrv == nil {
+ s.birpcSrv = rpc2.NewServer()
+ }
+ s.birpcSrv.Handle(method, handlerFunc)
+}
+
+func (s *Server) BiRPCRegister(rcvr interface{}) {
+ if s.birpcSrv == nil {
+ s.birpcSrv = rpc2.NewServer()
+ }
+ rcvType := reflect.TypeOf(rcvr)
+ for i := 0; i < rcvType.NumMethod(); i++ {
+ method := rcvType.Method(i)
+ if method.Name != "Call" {
+ s.birpcSrv.Handle("SMGenericV1."+method.Name, method.Func.Interface())
+ }
}
- s.bijsonSrv.Handle(method, handlerFunc)
}
func (s *Server) ServeJSON(addr string) {
@@ -124,7 +138,7 @@ func (s *Server) ServeHTTP(addr string) {
}
func (s *Server) ServeBiJSON(addr string) {
- if s.bijsonSrv == nil {
+ if s.birpcSrv == nil {
return
}
lBiJSON, e := net.Listen("tcp", addr)
@@ -132,7 +146,7 @@ func (s *Server) ServeBiJSON(addr string) {
log.Fatal("ServeBiJSON listen error:", e)
}
Logger.Info(fmt.Sprintf("Starting CGRateS BiJSON server at %s.", addr))
- s.bijsonSrv.Accept(lBiJSON)
+ s.birpcSrv.Accept(lBiJSON)
}
// rpcRequest represents a RPC request.