From f93c0238aca7c9557451d38f2e5eb012744782b5 Mon Sep 17 00:00:00 2001 From: TeoV Date: Mon, 18 Jun 2018 09:18:39 -0400 Subject: [PATCH] Add sync session for freeswitch --- agents/asteriskagent.go | 4 ++ agents/fsagent.go | 93 +++++++++++++++++++++------- agents/fsevent.go | 3 +- agents/kamagent.go | 34 ++++++++-- apier/v1/sessions.go | 26 +++++--- apier/v1/sessionsv1_it_test.go | 8 ++- cmd/cgr-engine/cgr-engine.go | 12 +++- config/config_defaults.go | 4 +- config/config_json_test.go | 3 +- config/config_test.go | 12 ++-- config/libconfig_json.go | 3 +- config/smconfig.go | 18 ++++-- engine/fscdr.go | 5 +- general_tests/tutorial_calls_test.go | 1 + sessions/session.go | 4 ++ sessions/sessions.go | 65 +++++++++++++++++-- utils/consts.go | 33 +++++----- 17 files changed, 252 insertions(+), 76 deletions(-) diff --git a/agents/asteriskagent.go b/agents/asteriskagent.go index 1d6dc0c22..0263f92c1 100644 --- a/agents/asteriskagent.go +++ b/agents/asteriskagent.go @@ -303,3 +303,7 @@ func (sma *AsteriskAgent) V1DisconnectSession(args utils.AttrDisconnectSession, func (sma *AsteriskAgent) Call(serviceMethod string, args interface{}, reply interface{}) error { return utils.RPCCall(sma, serviceMethod, args, reply) } + +func (fsa *AsteriskAgent) V1GetActiveSessionIDs(ignParam string, sessionIDs *[]*sessions.SessionID) (err error) { + return utils.ErrNotImplemented +} diff --git a/agents/fsagent.go b/agents/fsagent.go index 2252c46c2..e47839241 100644 --- a/agents/fsagent.go +++ b/agents/fsagent.go @@ -21,6 +21,7 @@ package agents import ( "errors" "fmt" + //"net" "time" "github.com/cgrates/cgrates/config" @@ -29,11 +30,16 @@ import ( "github.com/cgrates/fsock" ) +type fsSockWithConfig struct { + fsSock *fsock.FSock + cfg *config.FsConnConfig +} + func NewFSsessions(fsAgentConfig *config.FsAgentConfig, smg *utils.BiRPCInternalClient, timezone string) (fsa *FSsessions) { fsa = &FSsessions{ cfg: fsAgentConfig, - conns: make(map[string]*fsock.FSock), + conns: make(map[string]*fsSockWithConfig), senderPools: make(map[string]*fsock.FSockPool), smg: smg, timezone: timezone, @@ -46,8 +52,8 @@ func NewFSsessions(fsAgentConfig *config.FsAgentConfig, // and the active sessions type FSsessions struct { cfg *config.FsAgentConfig - conns map[string]*fsock.FSock // Keep the list here for connection management purposes - senderPools map[string]*fsock.FSockPool // Keep sender pools here + conns map[string]*fsSockWithConfig // Keep the list here for connection management purposes + senderPools map[string]*fsock.FSockPool // Keep sender pools here smg *utils.BiRPCInternalClient timezone string } @@ -79,7 +85,7 @@ func (sm *FSsessions) createHandlers() map[string][]func(string, string) { func (sm *FSsessions) setMaxCallDuration(uuid, connId string, maxDur time.Duration, destNr string) error { if len(sm.cfg.EmptyBalanceContext) != 0 { - _, err := sm.conns[connId].SendApiCmd( + _, err := sm.conns[connId].fsSock.SendApiCmd( fmt.Sprintf("uuid_setvar %s execute_on_answer sched_transfer +%d %s XML %s\n\n", uuid, int(maxDur.Seconds()), destNr, sm.cfg.EmptyBalanceContext)) if err != nil { @@ -90,7 +96,7 @@ func (sm *FSsessions) setMaxCallDuration(uuid, connId string, } return nil } else if len(sm.cfg.EmptyBalanceAnnFile) != 0 { - if _, err := sm.conns[connId].SendApiCmd( + if _, err := sm.conns[connId].fsSock.SendApiCmd( fmt.Sprintf("sched_broadcast +%d %s playback!manager_request::%s aleg\n\n", int(maxDur.Seconds()), uuid, sm.cfg.EmptyBalanceAnnFile)); err != nil { utils.Logger.Err( @@ -100,7 +106,7 @@ func (sm *FSsessions) setMaxCallDuration(uuid, connId string, } return nil } else { - _, err := sm.conns[connId].SendApiCmd( + _, err := sm.conns[connId].fsSock.SendApiCmd( fmt.Sprintf("uuid_setvar %s execute_on_answer sched_hangup +%d alloted_timeout\n\n", uuid, int(maxDur.Seconds()))) if err != nil { @@ -116,7 +122,7 @@ func (sm *FSsessions) setMaxCallDuration(uuid, connId string, // Sends the transfer command to unpark the call to freeswitch func (sm *FSsessions) unparkCall(uuid, connId, call_dest_nb, notify string) (err error) { - _, err = sm.conns[connId].SendApiCmd( + _, err = sm.conns[connId].fsSock.SendApiCmd( fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify)) if err != nil { utils.Logger.Err( @@ -124,7 +130,7 @@ func (sm *FSsessions) unparkCall(uuid, connId, call_dest_nb, notify string) (err utils.FreeSWITCHAgent, err.Error(), connId)) return } - if _, err = sm.conns[connId].SendApiCmd( + if _, err = sm.conns[connId].fsSock.SendApiCmd( fmt.Sprintf("uuid_transfer %s %s\n\n", uuid, call_dest_nb)); err != nil { utils.Logger.Err( fmt.Sprintf("<%s> Could not send unpark api call to freeswitch, error: <%s>, connId: %s", @@ -137,6 +143,7 @@ func (sm *FSsessions) onChannelPark(fsev FSEvent, connId string) { if fsev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Not for us return } + fsev[VarCGROriginHost] = sm.conns[connId].cfg.Alias authArgs := fsev.V1AuthorizeArgs() var authReply sessions.V1AuthorizeReply if err := sm.smg.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authReply); err != nil { @@ -159,7 +166,7 @@ func (sm *FSsessions) onChannelPark(fsev FSEvent, connId string) { } } if authArgs.AuthorizeResources { - if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s %s %s\n\n", + if _, err := sm.conns[connId].fsSock.SendApiCmd(fmt.Sprintf("uuid_setvar %s %s %s\n\n", fsev.GetUUID(), CGRResourceAllocation, *authReply.ResourceAllocation)); err != nil { utils.Logger.Info( fmt.Sprintf("<%s> error %s setting channel variabile: %s", @@ -171,9 +178,10 @@ func (sm *FSsessions) onChannelPark(fsev FSEvent, connId string) { } if authArgs.GetSuppliers { fsArray := SliceAsFsArray(authReply.Suppliers.SuppliersWithParams()) - if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s %s %s\n\n", + if _, err := sm.conns[connId].fsSock.SendApiCmd(fmt.Sprintf("uuid_setvar %s %s %s\n\n", fsev.GetUUID(), utils.CGR_SUPPLIERS, fsArray)); err != nil { - utils.Logger.Info(fmt.Sprintf("<%s> error setting suppliers: %s", utils.FreeSWITCHAgent, err.Error())) + utils.Logger.Info(fmt.Sprintf("<%s> error setting suppliers: %s", + utils.FreeSWITCHAgent, err.Error())) sm.unparkCall(fsev.GetUUID(), connId, fsev.GetCallDestNr(utils.META_DEFAULT), err.Error()) return } @@ -184,7 +192,7 @@ func (sm *FSsessions) onChannelPark(fsev FSEvent, connId string) { if _, has := authReply.Attributes.CGREvent.Event[fldName]; !has { continue //maybe removed } - if _, err := sm.conns[connId].SendApiCmd( + if _, err := sm.conns[connId].fsSock.SendApiCmd( fmt.Sprintf("uuid_setvar %s %s %s\n\n", fsev.GetUUID(), fldName, authReply.Attributes.CGREvent.Event[fldName])); err != nil { utils.Logger.Info( @@ -205,6 +213,17 @@ func (sm *FSsessions) onChannelAnswer(fsev FSEvent, connId string) { if fsev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request return } + _, err := sm.conns[connId].fsSock.SendApiCmd( + fmt.Sprintf("uuid_setvar %s %s %s\n\n", fsev.GetUUID(), + utils.CGROriginHost, utils.FirstNonEmpty(sm.conns[connId].cfg.Alias, + sm.conns[connId].cfg.Address))) + if err != nil { + utils.Logger.Err( + fmt.Sprintf("<%s> error %s setting channel variabile: %s", + utils.FreeSWITCHAgent, err.Error(), VarCGROriginHost)) + return + } + fsev[VarCGROriginHost] = sm.conns[connId].cfg.Alias chanUUID := fsev.GetUUID() if missing := fsev.MissingParameter(sm.timezone); missing != "" { sm.disconnectSession(connId, chanUUID, "", @@ -263,11 +282,14 @@ func (sm *FSsessions) Connect() error { } else if !fSock.Connected() { return errors.New("Could not connect to FreeSWITCH") } else { - sm.conns[connId] = fSock + sm.conns[connId] = &fsSockWithConfig{ + fsSock: fSock, + cfg: connCfg, + } } utils.Logger.Info(fmt.Sprintf("<%s> successfully connected to FreeSWITCH at: <%s>", utils.FreeSWITCHAgent, connCfg.Address)) go func() { // Start reading in own goroutine, return on error - if err := sm.conns[connId].ReadEvents(); err != nil { + if err := sm.conns[connId].fsSock.ReadEvents(); err != nil { errChan <- err } }() @@ -287,7 +309,7 @@ func (sm *FSsessions) Connect() error { // fsev.GetCallDestNr(utils.META_DEFAULT) // Disconnects a session by sending hangup command to freeswitch func (sm *FSsessions) disconnectSession(connId, uuid, redirectNr, notify string) error { - if _, err := sm.conns[connId].SendApiCmd( + if _, err := sm.conns[connId].fsSock.SendApiCmd( fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify)); err != nil { utils.Logger.Err( fmt.Sprintf("<%s> error: %s when attempting to disconnect channelID: %s over connID: %s", @@ -296,7 +318,7 @@ func (sm *FSsessions) disconnectSession(connId, uuid, redirectNr, notify string) } if notify == utils.ErrInsufficientCredit.Error() { if len(sm.cfg.EmptyBalanceContext) != 0 { - if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_transfer %s %s XML %s\n\n", + if _, err := sm.conns[connId].fsSock.SendApiCmd(fmt.Sprintf("uuid_transfer %s %s XML %s\n\n", uuid, redirectNr, sm.cfg.EmptyBalanceContext)); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Could not transfer the call to empty balance context, error: <%s>, connId: %s", utils.FreeSWITCHAgent, err.Error(), connId)) @@ -304,7 +326,7 @@ func (sm *FSsessions) disconnectSession(connId, uuid, redirectNr, notify string) } return nil } else if len(sm.cfg.EmptyBalanceAnnFile) != 0 { - if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_broadcast %s playback!manager_request::%s aleg\n\n", + if _, err := sm.conns[connId].fsSock.SendApiCmd(fmt.Sprintf("uuid_broadcast %s playback!manager_request::%s aleg\n\n", uuid, sm.cfg.EmptyBalanceAnnFile)); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Could not send uuid_broadcast to freeswitch, error: <%s>, connId: %s", utils.FreeSWITCHAgent, err.Error(), connId)) @@ -313,7 +335,7 @@ func (sm *FSsessions) disconnectSession(connId, uuid, redirectNr, notify string) return nil } } - if err := sm.conns[connId].SendMsgCmd(uuid, + if err := sm.conns[connId].fsSock.SendMsgCmd(uuid, map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}); err != nil { utils.Logger.Err( fmt.Sprintf("<%s> Could not send disconect msg to freeswitch, error: <%s>, connId: %s", @@ -324,13 +346,13 @@ func (sm *FSsessions) disconnectSession(connId, uuid, redirectNr, notify string) } func (sm *FSsessions) Shutdown() (err error) { - for connId, fSock := range sm.conns { - if !fSock.Connected() { + for connId, fSockWithCfg := range sm.conns { + if !fSockWithCfg.fsSock.Connected() { utils.Logger.Err(fmt.Sprintf("<%s> Cannot shutdown sessions, fsock not connected for connection id: %s", utils.FreeSWITCHAgent, connId)) continue } utils.Logger.Info(fmt.Sprintf("<%s> Shutting down all sessions on connection id: %s", utils.FreeSWITCHAgent, connId)) - if _, err = fSock.SendApiCmd("hupall MANAGER_REQUEST cgr_reqtype *prepaid"); err != nil { + if _, err = fSockWithCfg.fsSock.SendApiCmd("hupall MANAGER_REQUEST cgr_reqtype *prepaid"); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Error on calls shutdown: %s, connection id: %s", utils.FreeSWITCHAgent, err.Error(), connId)) } } @@ -353,3 +375,32 @@ func (fsa *FSsessions) V1DisconnectSession(args utils.AttrDisconnectSession, rep *reply = utils.OK return } + +func (fsa *FSsessions) V1GetActiveSessionIDs(ignParam string, + sessionIDs *[]*sessions.SessionID) (err error) { + var sIDs []*sessions.SessionID + for connId, senderPool := range fsa.senderPools { + fsConn, err := senderPool.PopFSock() + if err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Error on pop FSock: %s, connection id: %s", + utils.FreeSWITCHAgent, err.Error(), connId)) + continue + } + activeChanStr, err := fsConn.SendApiCmd("show channels") + senderPool.PushFSock(fsConn) + if err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Error on push FSock: %s, connection id: %s", + utils.FreeSWITCHAgent, err.Error(), connId)) + continue + } + aChans := fsock.MapChanData(activeChanStr) + for _, fsAChan := range aChans { + sIDs = append(sIDs, &sessions.SessionID{ + OriginHost: fsa.conns[connId].cfg.Alias, + OriginID: fsAChan["call_uuid"]}, + ) + } + } + *sessionIDs = sIDs + return +} diff --git a/agents/fsevent.go b/agents/fsevent.go index ebccfeff1..c2265a33e 100644 --- a/agents/fsevent.go +++ b/agents/fsevent.go @@ -72,6 +72,7 @@ const ( FsConnID = "FsConnID" // used to share connID info in event for remote disconnects VarAnswerEpoch = "variable_answer_epoch" VarCGRACD = "variable_" + utils.CGR_ACD + VarCGROriginHost = "variable_" + utils.CGROriginHost ) func NewFSEvent(strEv string) (fsev FSEvent) { @@ -282,7 +283,7 @@ func (fsev FSEvent) GetOriginatorIP(fieldName string) string { if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value return fieldName[len(utils.STATIC_VALUE_PREFIX):] } - return utils.FirstNonEmpty(fsev[fieldName], fsev[FS_IPv4]) + return utils.FirstNonEmpty(fsev[fieldName], fsev[VarCGROriginHost], fsev[FS_IPv4]) } func (fsev FSEvent) GetExtraFields() map[string]string { diff --git a/agents/kamagent.go b/agents/kamagent.go index b29bd5cd2..c3ed3c7b6 100644 --- a/agents/kamagent.go +++ b/agents/kamagent.go @@ -21,6 +21,7 @@ package agents import ( "fmt" "log" + "net" "regexp" "strings" @@ -108,7 +109,12 @@ func (ka *KamailioAgent) onCgrAuth(evData []byte, connID string) { utils.KamailioAgent, kev[utils.OriginID])) return } - authArgs.CGREvent.Event[utils.OriginHost] = strings.Split(ka.conns[connID].RemoteAddr().String(), ":")[0] + host, _, err := net.SplitHostPort(ka.conns[connID].RemoteAddr().String()) + if err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Error: %+v,", utils.KamailioAgent, err)) + return + } + authArgs.CGREvent.Event[utils.OriginHost] = host var authReply sessions.V1AuthorizeReply err = ka.sessionS.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authReply) if kar, err := kev.AsKamAuthReply(authArgs, &authReply, err); err != nil { @@ -143,7 +149,12 @@ func (ka *KamailioAgent) onCallStart(evData []byte, connID string) { return } initSessionArgs.CGREvent.Event[EvapiConnID] = connID // Attach the connection ID so we can properly disconnect later - initSessionArgs.CGREvent.Event[utils.OriginHost] = strings.Split(ka.conns[connID].RemoteAddr().String(), ":")[0] + host, _, err := net.SplitHostPort(ka.conns[connID].RemoteAddr().String()) + if err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Error: %+v,", utils.KamailioAgent, err)) + return + } + initSessionArgs.CGREvent.Event[utils.OriginHost] = host var initReply sessions.V1InitSessionReply if err := ka.sessionS.Call(utils.SessionSv1InitiateSession, initSessionArgs, &initReply); err != nil { @@ -179,7 +190,12 @@ func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) { return } var reply string - tsArgs.CGREvent.Event[utils.OriginHost] = strings.Split(ka.conns[connID].RemoteAddr().String(), ":")[0] + host, _, err := net.SplitHostPort(ka.conns[connID].RemoteAddr().String()) + if err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Error: %+v,", utils.KamailioAgent, err)) + return + } + tsArgs.CGREvent.Event[utils.OriginHost] = host if err := ka.sessionS.Call(utils.SessionSv1TerminateSession, tsArgs, &reply); err != nil { utils.Logger.Err( @@ -192,7 +208,12 @@ func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) { if err != nil { return } - cgrEv.Event[utils.OriginHost] = strings.Split(ka.conns[connID].RemoteAddr().String(), ":")[0] + host, _, err := net.SplitHostPort(ka.conns[connID].RemoteAddr().String()) + if err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Error: %+v,", utils.KamailioAgent, err)) + return + } + cgrEv.Event[utils.OriginHost] = host if err := ka.sessionS.Call(utils.SessionSv1ProcessCDR, *cgrEv, &reply); err != nil { utils.Logger.Err(fmt.Sprintf("%s> failed processing CGREvent: %s, error: %s", utils.KamailioAgent, utils.ToJSON(cgrEv), err.Error())) @@ -222,3 +243,8 @@ func (ka *KamailioAgent) V1DisconnectSession(args utils.AttrDisconnectSession, r *reply = utils.OK return } + +func (fsa *KamailioAgent) V1GetActiveSessionIDs(ignParam string, sessionIDs *[]*sessions.SessionID) (err error) { + + return utils.ErrNotImplemented +} diff --git a/apier/v1/sessions.go b/apier/v1/sessions.go index ddb9fc833..b9b8b8920 100644 --- a/apier/v1/sessions.go +++ b/apier/v1/sessions.go @@ -36,16 +36,17 @@ type SessionSv1 struct { // Publishes BiJSONRPC methods exported by SessionSv1 func (ssv1 *SessionSv1) Handlers() map[string]interface{} { return map[string]interface{}{ - utils.SessionSv1AuthorizeEvent: ssv1.BiRpcAuthorizeEvent, - utils.SessionSv1AuthorizeEventWithDigest: ssv1.BiRpcAuthorizeEventWithDigest, - utils.SessionSv1InitiateSession: ssv1.BiRpcInitiateSession, - utils.SessionSv1InitiateSessionWithDigest: ssv1.BiRpcInitiateSessionWithDigest, - utils.SessionSv1UpdateSession: ssv1.BiRpcUpdateSession, - utils.SessionSv1TerminateSession: ssv1.BiRpcTerminateSession, - utils.SessionSv1ProcessCDR: ssv1.BiRpcProcessCDR, - utils.SessionSv1ProcessEvent: ssv1.BiRpcProcessEvent, - utils.SessionSv1GetActiveSessions: ssv1.BiRPCV1GetActiveSessions, - utils.SessionSv1GetPassiveSessions: ssv1.BiRPCV1GetPassiveSessions, + utils.SessionSv1AuthorizeEvent: ssv1.BiRpcAuthorizeEvent, + utils.SessionSv1AuthorizeEventWithDigest: ssv1.BiRpcAuthorizeEventWithDigest, + utils.SessionSv1InitiateSession: ssv1.BiRpcInitiateSession, + utils.SessionSv1InitiateSessionWithDigest: ssv1.BiRpcInitiateSessionWithDigest, + utils.SessionSv1UpdateSession: ssv1.BiRpcUpdateSession, + utils.SessionSv1TerminateSession: ssv1.BiRpcTerminateSession, + utils.SessionSv1ProcessCDR: ssv1.BiRpcProcessCDR, + utils.SessionSv1ProcessEvent: ssv1.BiRpcProcessEvent, + utils.SessionSv1GetActiveSessions: ssv1.BiRPCV1GetActiveSessions, + utils.SessionSv1GetPassiveSessions: ssv1.BiRPCV1GetPassiveSessions, + utils.SessionSv1RegisterInternalBiJSONConn: ssv1.BiRPCv1RegisterInternalBiJSONConn, } } @@ -145,6 +146,11 @@ func (ssv1 *SessionSv1) BiRPCV1GetPassiveSessions(clnt *rpc2.Client, args map[st return ssv1.SMG.BiRPCV1GetPassiveSessions(clnt, args, rply) } +func (ssv1 *SessionSv1) BiRPCv1RegisterInternalBiJSONConn(clnt *rpc2.Client, args string, + rply *string) error { + return ssv1.SMG.BiRPCv1RegisterInternalBiJSONConn(clnt, args, rply) +} + func (ssv1 *SessionSv1) Ping(ign string, reply *string) error { *reply = utils.Pong return nil diff --git a/apier/v1/sessionsv1_it_test.go b/apier/v1/sessionsv1_it_test.go index e280b1819..165d8580a 100644 --- a/apier/v1/sessionsv1_it_test.go +++ b/apier/v1/sessionsv1_it_test.go @@ -50,6 +50,11 @@ func handleDisconnectSession(clnt *rpc2.Client, return nil } +func handleGetSessionIDs(clnt *rpc2.Client, + ignParam string, sessionIDs *[]*sessions.SessionID) error { + return nil +} + func TestSSv1ItInitCfg(t *testing.T) { var err error sSv1CfgPath = path.Join(*dataDir, "conf", "samples", "sessions") @@ -88,7 +93,8 @@ func TestSSv1ItRpcConn(t *testing.T) { t.Fatal(err) } clntHandlers := map[string]interface{}{ - utils.SessionSv1DisconnectSession: handleDisconnectSession, + utils.SessionSv1DisconnectSession: handleDisconnectSession, + utils.SessionSv1GetActiveSessionIDs: handleGetSessionIDs, } if sSv1BiRpc, err = utils.NewBiJSONrpcClient(sSv1Cfg.SessionSCfg().ListenBijson, clntHandlers); err != nil { diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index eef9c077d..3835f54bf 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -237,7 +237,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in for method, handler := range ssv1.Handlers() { server.BiRPCRegisterName(method, handler) } - server.ServeBiJSON(cfg.SessionSCfg().ListenBijson, sm.OnConnect, sm.OnDisconnect) + server.ServeBiJSON(cfg.SessionSCfg().ListenBijson, sm.OnBiJSONConnect, sm.OnBiJSONDisconnect) exitChan <- true } } @@ -330,9 +330,14 @@ func startFsAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitChan c internalSMGChan <- smgRpcConn birpcClnt := utils.NewBiRPCInternalClient(smgRpcConn.(*sessions.SMGeneric)) sm := agents.NewFSsessions(cfg.FsAgentCfg(), birpcClnt, cfg.DefaultTimezone) + var reply string + if err = birpcClnt.Call(utils.SessionSv1RegisterInternalBiJSONConn, "", &reply); err != nil { // for session sync + utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.FreeSWITCHAgent, err)) + } if err = sm.Connect(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.FreeSWITCHAgent, err)) } + exitChan <- true } @@ -344,7 +349,10 @@ func startKamAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitChan birpcClnt := utils.NewBiRPCInternalClient(smgRpcConn.(*sessions.SMGeneric)) ka := agents.NewKamailioAgent(cfg.KamAgentCfg(), birpcClnt, utils.FirstNonEmpty(cfg.KamAgentCfg().Timezone, cfg.DefaultTimezone)) - + var reply string + if err = birpcClnt.Call(utils.SessionSv1RegisterInternalBiJSONConn, "", &reply); err != nil { // for session sync + utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.KamailioAgent, err)) + } if err = ka.Connect(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err)) } diff --git a/config/config_defaults.go b/config/config_defaults.go index 2758fd7db..73ec1b43b 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -312,6 +312,7 @@ const CGRATES_CFG_JSON = ` //"session_ttl_usage": "", // tweak Usage for sessions timing-out, not defined by default "session_indexes": [], // index sessions based on these fields for GetActiveSessions API "client_protocol": 1.0, // version of protocol to use when acting as JSON-PRC client <"0","1.0"> + "channel_sync_interval": "5m", // sync channels regularly }, @@ -339,10 +340,9 @@ const CGRATES_CFG_JSON = ` //"low_balance_ann_file": "", // file to be played when low balance is reached for prepaid calls "empty_balance_context": "", // if defined, prepaid calls will be transferred to this context on empty balance "empty_balance_ann_file": "", // file to be played before disconnecting prepaid calls on empty balance (applies only if no context defined) - "channel_sync_interval": "5m", // sync channels with freeswitch regularly "max_wait_connection": "2s", // maximum duration to wait for a connection to be retrieved from the pool "event_socket_conns":[ // instantiate connections to multiple FreeSWITCH servers - {"address": "127.0.0.1:8021", "password": "ClueCon", "reconnects": 5} + {"address": "127.0.0.1:8021", "password": "ClueCon", "reconnects": 5,"alias":""} ], }, diff --git a/config/config_json_test.go b/config/config_json_test.go index 946c1fb2c..f930f4a52 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -507,6 +507,7 @@ func TestSmgJsonCfg(t *testing.T) { Session_ttl: utils.StringPointer("0s"), Session_indexes: &[]string{}, Client_protocol: utils.Float64Pointer(1.0), + Channel_sync_interval: utils.StringPointer("5m"), } if cfg, err := dfCgrJsonCfg.SessionSJsonCfg(); err != nil { t.Error(err) @@ -527,13 +528,13 @@ func TestFsAgentJsonCfg(t *testing.T) { Extra_fields: &[]string{}, Empty_balance_context: utils.StringPointer(""), Empty_balance_ann_file: utils.StringPointer(""), - Channel_sync_interval: utils.StringPointer("5m"), Max_wait_connection: utils.StringPointer("2s"), Event_socket_conns: &[]*FsConnJsonCfg{ &FsConnJsonCfg{ Address: utils.StringPointer("127.0.0.1:8021"), Password: utils.StringPointer("ClueCon"), Reconnects: utils.IntPointer(5), + Alias: utils.StringPointer(""), }}, } if cfg, err := dfCgrJsonCfg.FreeswitchAgentJsonCfg(); err != nil { diff --git a/config/config_test.go b/config/config_test.go index 0034164c5..c3d2eea7c 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -43,8 +43,8 @@ func TestCgrCfgLoadWithDefaults(t *testing.T) { "freeswitch_agent": { "enabled": true, // starts SessionManager service: "event_socket_conns":[ // instantiate connections to multiple FreeSWITCH servers - {"address": "1.2.3.4:8021", "password": "ClueCon", "reconnects": 3}, - {"address": "1.2.3.5:8021", "password": "ClueCon", "reconnects": 5} + {"address": "1.2.3.4:8021", "password": "ClueCon", "reconnects": 3, "alias":""}, + {"address": "1.2.3.5:8021", "password": "ClueCon", "reconnects": 5, "alias":""} ], }, @@ -55,8 +55,8 @@ func TestCgrCfgLoadWithDefaults(t *testing.T) { } eCgrCfg.fsAgentCfg.Enabled = true eCgrCfg.fsAgentCfg.EventSocketConns = []*FsConnConfig{ - &FsConnConfig{Address: "1.2.3.4:8021", Password: "ClueCon", Reconnects: 3}, - &FsConnConfig{Address: "1.2.3.5:8021", Password: "ClueCon", Reconnects: 5}, + &FsConnConfig{Address: "1.2.3.4:8021", Password: "ClueCon", Reconnects: 3, Alias: ""}, + &FsConnConfig{Address: "1.2.3.5:8021", Password: "ClueCon", Reconnects: 5, Alias: ""}, } if cgrCfg, err := NewCGRConfigFromJsonStringWithDefaults(JSN_CFG); err != nil { t.Error(err) @@ -613,6 +613,7 @@ func TestCgrCfgJSONDefaultsSMGenericCfg(t *testing.T) { SessionTTL: 0 * time.Second, SessionIndexes: utils.StringMap{}, ClientProtocol: 1.0, + ChannelSyncInterval: 5 * time.Minute, } if !reflect.DeepEqual(eSessionSCfg, cgrCfg.sessionSCfg) { t.Errorf("expecting: %s, received: %s", @@ -710,11 +711,10 @@ func TestCgrCfgJSONDefaultsFsAgentConfig(t *testing.T) { ExtraFields: nil, EmptyBalanceContext: "", EmptyBalanceAnnFile: "", - ChannelSyncInterval: 5 * time.Minute, MaxWaitConnection: 2 * time.Second, EventSocketConns: []*FsConnConfig{ &FsConnConfig{Address: "127.0.0.1:8021", - Password: "ClueCon", Reconnects: 5}}, + Password: "ClueCon", Reconnects: 5, Alias: ""}}, } if !reflect.DeepEqual(cgrCfg.fsAgentCfg, eFsAgentCfg) { diff --git a/config/libconfig_json.go b/config/libconfig_json.go index b80453b0c..42c7db629 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -225,6 +225,7 @@ type SessionSJsonCfg struct { Session_ttl_usage *string Session_indexes *[]string Client_protocol *float64 + Channel_sync_interval *string } // FreeSWITCHAgent config section @@ -238,7 +239,6 @@ type FreeswitchAgentJsonCfg struct { //Low_balance_ann_file *string Empty_balance_context *string Empty_balance_ann_file *string - Channel_sync_interval *string Max_wait_connection *string Event_socket_conns *[]*FsConnJsonCfg } @@ -248,6 +248,7 @@ type FsConnJsonCfg struct { Address *string Password *string Reconnects *int + Alias *string } // Represents one connection instance towards a rater/cdrs server diff --git a/config/smconfig.go b/config/smconfig.go index c24079761..1c9488952 100644 --- a/config/smconfig.go +++ b/config/smconfig.go @@ -70,6 +70,7 @@ type FsConnConfig struct { Address string Password string Reconnects int + Alias string } func (self *FsConnConfig) loadFromJsonCfg(jsnCfg *FsConnJsonCfg) error { @@ -85,6 +86,11 @@ func (self *FsConnConfig) loadFromJsonCfg(jsnCfg *FsConnJsonCfg) error { if jsnCfg.Reconnects != nil { self.Reconnects = *jsnCfg.Reconnects } + self.Alias = self.Address + if jsnCfg.Alias != nil { + self.Alias = *jsnCfg.Alias + } + return nil } @@ -108,6 +114,7 @@ type SessionSCfg struct { SessionTTLUsage *time.Duration SessionIndexes utils.StringMap ClientProtocol float64 + ChannelSyncInterval time.Duration } func (self *SessionSCfg) loadFromJsonCfg(jsnCfg *SessionSJsonCfg) error { @@ -217,6 +224,11 @@ func (self *SessionSCfg) loadFromJsonCfg(jsnCfg *SessionSJsonCfg) error { if jsnCfg.Client_protocol != nil { self.ClientProtocol = *jsnCfg.Client_protocol } + if jsnCfg.Channel_sync_interval != nil { + if self.ChannelSyncInterval, err = utils.ParseDurationWithNanosecs(*jsnCfg.Channel_sync_interval); err != nil { + return err + } + } return nil } @@ -230,7 +242,6 @@ type FsAgentConfig struct { //LowBalanceAnnFile string EmptyBalanceContext string EmptyBalanceAnnFile string - ChannelSyncInterval time.Duration MaxWaitConnection time.Duration EventSocketConns []*FsConnConfig } @@ -268,11 +279,6 @@ func (self *FsAgentConfig) loadFromJsonCfg(jsnCfg *FreeswitchAgentJsonCfg) error if jsnCfg.Empty_balance_ann_file != nil { self.EmptyBalanceAnnFile = *jsnCfg.Empty_balance_ann_file } - if jsnCfg.Channel_sync_interval != nil { - if self.ChannelSyncInterval, err = utils.ParseDurationWithNanosecs(*jsnCfg.Channel_sync_interval); err != nil { - return err - } - } if jsnCfg.Max_wait_connection != nil { if self.MaxWaitConnection, err = utils.ParseDurationWithNanosecs(*jsnCfg.Max_wait_connection); err != nil { return err diff --git a/engine/fscdr.go b/engine/fscdr.go index 81931f163..0cd8f8185 100644 --- a/engine/fscdr.go +++ b/engine/fscdr.go @@ -72,7 +72,8 @@ type FSCdr struct { } func (fsCdr FSCdr) getCGRID() string { - return utils.Sha1(fsCdr.vars[FS_UUID], fsCdr.vars[FsIPv4]) + return utils.Sha1(fsCdr.vars[FS_UUID], + utils.FirstNonEmpty(fsCdr.vars[utils.CGROriginHost], fsCdr.vars[FsIPv4])) } func (fsCdr FSCdr) getExtraFields() map[string]string { @@ -141,7 +142,7 @@ func (fsCdr FSCdr) AsCDR(timezone string) *CDR { storCdr.CGRID = fsCdr.getCGRID() storCdr.ToR = utils.VOICE storCdr.OriginID = fsCdr.vars[FS_UUID] - storCdr.OriginHost = fsCdr.vars[FsIPv4] + storCdr.OriginHost = utils.FirstNonEmpty(fsCdr.vars[utils.CGROriginHost], fsCdr.vars[FsIPv4]) storCdr.Source = FS_CDR_SOURCE storCdr.RequestType = utils.FirstNonEmpty(fsCdr.vars[utils.CGR_REQTYPE], fsCdr.cgrCfg.DefaultReqType) storCdr.Tenant = utils.FirstNonEmpty(fsCdr.vars[utils.CGR_TENANT], fsCdr.cgrCfg.DefaultTenant) diff --git a/general_tests/tutorial_calls_test.go b/general_tests/tutorial_calls_test.go index c129c0b31..9307c311d 100755 --- a/general_tests/tutorial_calls_test.go +++ b/general_tests/tutorial_calls_test.go @@ -74,6 +74,7 @@ var sTestsCalls = []func(t *testing.T){ testCallCheckResourceRelease, testCallCheckThreshold1001After, testCallCheckThreshold1002After, + //de completat testCallStopPjsuaListener, testCallStopCgrEngine, testCallStopFS, diff --git a/sessions/session.go b/sessions/session.go index 6d9dc9f92..374cb3044 100644 --- a/sessions/session.go +++ b/sessions/session.go @@ -58,6 +58,10 @@ type SessionID struct { OriginID string } +func (s *SessionID) CGRID() string { + return utils.Sha1(s.OriginID, s.OriginHost) +} + // Called in case of automatic debits func (self *SMGSession) debitLoop(debitInterval time.Duration) { loopIndex := 0 diff --git a/sessions/sessions.go b/sessions/sessions.go index 074b03d62..3290e5eb4 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -123,6 +123,7 @@ type SMGeneric struct { cdrsrv rpcclient.RpcClientConnection // CDR server connections smgReplConns []*SMGReplicationConn // list of connections where we will replicate our session data Timezone string + intBiJSONConns []rpcclient.RpcClientConnection biJsonConns map[*rpc2.Client]struct{} // index BiJSONConnection so we can sync them later activeSessions map[string][]*SMGSession // group sessions per sessionId, multiple runs based on derived charging aSessionsMux sync.RWMutex @@ -1012,6 +1013,15 @@ func (smg *SMGeneric) ProcessCDR(gev SMGenericEvent) (err error) { } func (smg *SMGeneric) Connect() error { + if smg.cgrCfg.SessionSCfg().ChannelSyncInterval != 0 { + go func() { + for { // Schedule sync channels to run repetately + time.Sleep(smg.cgrCfg.SessionSCfg().ChannelSyncInterval) + smg.syncSessions() + } + + }() + } return nil } @@ -1979,16 +1989,63 @@ func (smg *SMGeneric) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, return nil } -func (smg *SMGeneric) OnConnect(c *rpc2.Client) { +func (smg *SMGeneric) OnBiJSONConnect(c *rpc2.Client) { var s struct{} smg.biJsonConns[c] = s } -func (smg *SMGeneric) OnDisconnect(c *rpc2.Client) { +func (smg *SMGeneric) OnBiJSONDisconnect(c *rpc2.Client) { delete(smg.biJsonConns, c) } func (smg *SMGeneric) syncSessions() { - // var toBeRemovedSessions map[string][]*SMGSession - // var realActiveSession map[string]struct{} + var rpcClnts []rpcclient.RpcClientConnection + for _, conn := range smg.intBiJSONConns { + rpcClnts = append(rpcClnts, conn) + } + for conn := range smg.biJsonConns { + rpcClnts = append(rpcClnts, conn) + } + queriedCGRIDs := make(utils.StringMap) + utils.Logger.Info("Enter on sync --------------") + for _, conn := range rpcClnts { + var queriedSessionIDs []*SessionID + if conn != nil { + if err := conn.Call(utils.SessionSv1GetActiveSessionIDs, + "", &queriedSessionIDs); err != nil { + utils.Logger.Warning( + fmt.Sprintf("error quering session ids : %+v", err)) + continue + } + utils.Logger.Info(fmt.Sprintf("queriedSessionIDs : %+v", utils.ToJSON(queriedSessionIDs))) + for _, sessionID := range queriedSessionIDs { + queriedCGRIDs[sessionID.CGRID()] = true + } + } + } + utils.Logger.Info(fmt.Sprintf("queriedCGRIDs : %+v", queriedCGRIDs)) + smg.aSessionsMux.RLock() + utils.Logger.Info(fmt.Sprintf("smg.activeSessions : %+v", smg.activeSessions)) + for cgrid, _ := range smg.activeSessions { + if _, has := queriedCGRIDs[cgrid]; has { + utils.Logger.Info("gaseste CGRID") + continue + } + utils.Logger.Info("nu gaseste CGRID") + // for _, session := range smgSessions { + // tmtr := &smgSessionTerminator{ + // ttlLastUsed: &session.LastUsage, + // ttlUsage: &session.TotalUsage, + // } + // smg.ttlTerminate(session, tmtr) + // } + } + smg.aSessionsMux.RUnlock() +} + +func (smg *SMGeneric) BiRPCv1RegisterInternalBiJSONConn(clnt rpcclient.RpcClientConnection, + ignParam string, reply *string) error { + smg.intBiJSONConns = append(smg.intBiJSONConns, clnt) + *reply = utils.OK + return nil } diff --git a/utils/consts.go b/utils/consts.go index 7dab06a8f..e60bbd99e 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -546,6 +546,7 @@ const ( MetaRequest = "*request" MetaVars = "*vars" MetaReply = "*reply" + CGROriginHost = "cgr_originhost" ) // Migrator Action @@ -706,21 +707,23 @@ const ( // SessionS APIs const ( - SessionSv1AuthorizeEvent = "SessionSv1.AuthorizeEvent" - SessionSv1AuthorizeEventWithDigest = "SessionSv1.AuthorizeEventWithDigest" - SessionSv1InitiateSession = "SessionSv1.InitiateSession" - SessionSv1InitiateSessionWithDigest = "SessionSv1.InitiateSessionWithDigest" - SessionSv1UpdateSession = "SessionSv1.UpdateSession" - SessionSv1TerminateSession = "SessionSv1.TerminateSession" - SessionSv1ProcessCDR = "SessionSv1.ProcessCDR" - SessionSv1ProcessEvent = "SessionSv1.ProcessEvent" - SessionSv1DisconnectSession = "SessionSv1.DisconnectSession" - SessionSv1GetActiveSessions = "SessionSv1.GetActiveSessions" - SessionSv1GetPassiveSessions = "SessionSv1.GetPassiveSessions" - SMGenericV1InitiateSession = "SMGenericV1.InitiateSession" - SMGenericV2InitiateSession = "SMGenericV2.InitiateSession" - SMGenericV2UpdateSession = "SMGenericV2.UpdateSession" - SessionSv1Ping = "SessionSv1.Ping" + SessionSv1AuthorizeEvent = "SessionSv1.AuthorizeEvent" + SessionSv1AuthorizeEventWithDigest = "SessionSv1.AuthorizeEventWithDigest" + SessionSv1InitiateSession = "SessionSv1.InitiateSession" + SessionSv1InitiateSessionWithDigest = "SessionSv1.InitiateSessionWithDigest" + SessionSv1UpdateSession = "SessionSv1.UpdateSession" + SessionSv1TerminateSession = "SessionSv1.TerminateSession" + SessionSv1ProcessCDR = "SessionSv1.ProcessCDR" + SessionSv1ProcessEvent = "SessionSv1.ProcessEvent" + SessionSv1DisconnectSession = "SessionSv1.DisconnectSession" + SessionSv1GetActiveSessions = "SessionSv1.GetActiveSessions" + SessionSv1GetPassiveSessions = "SessionSv1.GetPassiveSessions" + SMGenericV1InitiateSession = "SMGenericV1.InitiateSession" + SMGenericV2InitiateSession = "SMGenericV2.InitiateSession" + SMGenericV2UpdateSession = "SMGenericV2.UpdateSession" + SessionSv1Ping = "SessionSv1.Ping" + SessionSv1GetActiveSessionIDs = "SessionSv1.GetActiveSessionIDs" + SessionSv1RegisterInternalBiJSONConn = "SessionSv1.RegisterInternalBiJSONConn" ) // DispatcherS APIs