From 640336d7b94faae307e53fc509aafa95eb663603 Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 6 Nov 2015 19:31:29 +0100 Subject: [PATCH] Skel of the new SM-Generic --- apier/v1/auth.go | 2 +- apier/v1/smgenericv1.go | 61 +++++++++++++++++++++++ sessionmanager/genericsm.go | 92 ++++++++++++++++++++++++++++++++--- sessionmanager/genericvent.go | 30 ++++++++++++ sessionmanager/kamailiosm.go | 1 + utils/server.go | 26 ++++++++-- 6 files changed, 201 insertions(+), 11 deletions(-) create mode 100644 apier/v1/smgenericv1.go create mode 100644 sessionmanager/genericvent.go diff --git a/apier/v1/auth.go b/apier/v1/auth.go index 15e8a4b54..6d0367487 100644 --- a/apier/v1/auth.go +++ b/apier/v1/auth.go @@ -30,7 +30,7 @@ import ( func (self *ApierV1) GetMaxUsage(usageRecord engine.UsageRecord, maxUsage *float64) error { err := engine.LoadUserProfile(&usageRecord, "ExtraFields") if err != nil { - return err + return utils.NewErrServerError(err) } if usageRecord.TOR == "" { usageRecord.TOR = utils.VOICE diff --git a/apier/v1/smgenericv1.go b/apier/v1/smgenericv1.go new file mode 100644 index 000000000..6c25b2426 --- /dev/null +++ b/apier/v1/smgenericv1.go @@ -0,0 +1,61 @@ +package v1 + +import ( + "time" + + "github.com/cgrates/cgrates/sessionmanager" + "github.com/cgrates/cgrates/utils" +) + +// Exports RPC from SMGeneric +type SMGenericV1 struct { + sm *sessionmanager.GenericSessionManager +} + +// Returns MaxUsage (for calls in seconds), -1 for no limit +func (self *SMGenericV1) GetMaxUsage(ev sessionmanager.GenericEvent, maxUsage *float64) error { + maxUsageDur, err := self.sm.GetMaxUsage(ev) + if err != nil { + return utils.NewErrServerError(err) + } + if maxUsageDur == time.Duration(-1) { + *maxUsage = -1.0 + } else { + *maxUsage = maxUsageDur.Seconds() + } + return nil +} + +// Called on session start, returns the maximum number of seconds the session can last +func (self *SMGenericV1) SessionStart(ev sessionmanager.GenericEvent, maxUsage *float64) error { + if err := self.sm.SessionStart(ev); err != nil { + return utils.NewErrServerError(err) + } + return self.GetMaxUsage(ev, maxUsage) +} + +// Interim updates, returns remaining duration from the rater +func (self *SMGenericV1) SessionUpdate(ev sessionmanager.GenericEvent, maxUsage *float64) error { + if err := self.sm.SessionUpdate(ev); err != nil { + return utils.NewErrServerError(err) + } + return self.GetMaxUsage(ev, maxUsage) +} + +// Called on session end, should stop debit loop +func (self *SMGenericV1) SessionEnd(ev sessionmanager.GenericEvent, reply *string) error { + if err := self.sm.SessionEnd(ev); err != nil { + return utils.NewErrServerError(err) + } + *reply = utils.OK + return nil +} + +// Called on session end, should send the CDR to CDRS +func (self *SMGenericV1) ProcessCdr(ev sessionmanager.GenericEvent, reply *string) error { + if err := self.sm.ProcessCdr(ev); err != nil { + return utils.NewErrServerError(err) + } + *reply = utils.OK + return nil +} diff --git a/sessionmanager/genericsm.go b/sessionmanager/genericsm.go index 469087b5e..643823cce 100644 --- a/sessionmanager/genericsm.go +++ b/sessionmanager/genericsm.go @@ -19,16 +19,94 @@ along with this program. If not, see package sessionmanager import ( + "sync" + "time" + "github.com/cenkalti/rpc2" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" ) -type GenericSessionManager struct { - cfg *config.SmGenericConfig - rater engine.Connector - cdrsrv engine.Connector - timezone string - conns map[string]*rpc2.Client - sessions []*Session +const ( + CGR_CONNUUID = "cgr_connid" +) + +var smgen *GenericSessionManager + +// Attempts to get the connId previously set in the client state container +func getClientConnId(clnt *rpc2.Client) string { + uuid, hasIt := clnt.State.Get(CGR_CONNUUID) + if !hasIt { + return "" + } + return uuid.(string) +} + +func SMGeneric() *GenericSessionManager { + return smgen +} + +func NewGenericSessionManager(cfg *config.SmGenericConfig, rater engine.Connector, cdrsrv engine.Connector, timezone string) (*GenericSessionManager, error) { + gsm := &GenericSessionManager{cfg: cfg, rater: rater, cdrsrv: cdrsrv, timezone: timezone, conns: make(map[string]*rpc2.Client), connMutex: new(sync.Mutex)} + return gsm, nil +} + +type GenericSessionManager struct { + cfg *config.SmGenericConfig + rater engine.Connector + cdrsrv engine.Connector + timezone string + conns map[string]*rpc2.Client + sessions []*Session + connMutex *sync.Mutex +} + +// Index the client connection so we can use it to communicate back +func (self *GenericSessionManager) OnClientConnect(clnt *rpc2.Client) { + self.connMutex.Lock() + defer self.connMutex.Unlock() + if connId := getClientConnId(clnt); connId != "" { + self.conns[connId] = clnt + } +} + +// Unindex the client connection so we can use it to communicate back +func (self *GenericSessionManager) OnClientDisconnect(clnt *rpc2.Client) { + self.connMutex.Lock() + defer self.connMutex.Unlock() + if connId := getClientConnId(clnt); connId != "" { + delete(self.conns, connId) + } +} + +func (self *GenericSessionManager) GetMaxUsage(ev GenericEvent) (time.Duration, error) { + storedCdr, err := ev.AsStoredCdr(self.timezone) + if err != nil { + return time.Duration(0), err + } + var maxDur float64 + if err := self.rater.GetDerivedMaxSessionTime(storedCdr, &maxDur); err != nil { + return time.Duration(0), err + } + return time.Duration(maxDur), nil +} + +// Called on session start +func (self *GenericSessionManager) SessionStart(ev GenericEvent) error { + return nil +} + +// Interim updates +func (self *GenericSessionManager) SessionUpdate(ev GenericEvent) error { + return nil +} + +// Called on session end, should stop debit loop +func (self *GenericSessionManager) SessionEnd(ev GenericEvent) error { + return nil +} + +// Called on session end, should send the CDR to CDRS +func (self *GenericSessionManager) ProcessCdr(ev GenericEvent) error { + return nil } diff --git a/sessionmanager/genericvent.go b/sessionmanager/genericvent.go new file mode 100644 index 000000000..1041cc59d --- /dev/null +++ b/sessionmanager/genericvent.go @@ -0,0 +1,30 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2012-2015 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package sessionmanager + +import ( + "github.com/cgrates/cgrates/engine" +) + +type GenericEvent map[string]interface{} + +// Converts the event into StoredCdr instance +func (self GenericEvent) AsStoredCdr(timezone string) (*engine.StoredCdr, error) { + return nil, nil +} diff --git a/sessionmanager/kamailiosm.go b/sessionmanager/kamailiosm.go index b3c1477e3..3c64c77aa 100644 --- a/sessionmanager/kamailiosm.go +++ b/sessionmanager/kamailiosm.go @@ -216,6 +216,7 @@ func (self *KamailioSessionManager) ProcessCdr(cdr *engine.StoredCdr) error { func (sm *KamailioSessionManager) WarnSessionMinDuration(sessionUuid, connId string) { } + func (self *KamailioSessionManager) Shutdown() error { return nil } diff --git a/utils/server.go b/utils/server.go index fc906e489..e24888b7e 100644 --- a/utils/server.go +++ b/utils/server.go @@ -27,12 +27,14 @@ import ( "net/rpc" "net/rpc/jsonrpc" + "github.com/cenkalti/rpc2" "golang.org/x/net/websocket" ) type Server struct { rpcEnabled bool httpEnabled bool + bijsonSrv *rpc2.Server } func (s *Server) RpcRegister(rcvr interface{}) { @@ -50,13 +52,20 @@ func (s *Server) RegisterHttpFunc(pattern string, handler func(http.ResponseWrit s.httpEnabled = true } +func (s *Server) BijsonRegisterName(method string, handlerFunc interface{}) { + if s.bijsonSrv == nil { + s.bijsonSrv = rpc2.NewServer() + } + s.bijsonSrv.Handle(method, handlerFunc) +} + func (s *Server) ServeJSON(addr string) { if !s.rpcEnabled { return } lJSON, e := net.Listen("tcp", addr) if e != nil { - log.Fatal("listen error:", e) + log.Fatal("ServeJSON listen error:", e) } Logger.Info(fmt.Sprintf("Starting CGRateS JSON server at %s.", addr)) for { @@ -65,7 +74,6 @@ func (s *Server) ServeJSON(addr string) { Logger.Err(fmt.Sprintf(" Accept error: %v", conn)) continue } - //utils.Logger.Info(fmt.Sprintf(" New incoming connection: %v", conn.RemoteAddr())) go jsonrpc.ServeConn(conn) } @@ -78,7 +86,7 @@ func (s *Server) ServeGOB(addr string) { } lGOB, e := net.Listen("tcp", addr) if e != nil { - log.Fatal("listen error:", e) + log.Fatal("ServeGOB listen error:", e) } Logger.Info(fmt.Sprintf("Starting CGRateS GOB server at %s.", addr)) for { @@ -113,6 +121,18 @@ func (s *Server) ServeHTTP(addr string) { http.ListenAndServe(addr, nil) } +func (s *Server) ServeBiJSON(addr string) { + if s.bijsonSrv == nil { + return + } + lBiJSON, e := net.Listen("tcp", addr) + if e != nil { + log.Fatal("ServeBiJSON listen error:", e) + } + Logger.Info(fmt.Sprintf("Starting CGRateS BiJSON server at %s.", addr)) + s.bijsonSrv.Accept(lBiJSON) +} + // rpcRequest represents a RPC request. // rpcRequest implements the io.ReadWriteCloser interface. type rpcRequest struct {