Add sync session for freeswitch

This commit is contained in:
TeoV
2018-06-18 09:18:39 -04:00
committed by Dan Christian Bogos
parent f825775129
commit f93c0238ac
17 changed files with 252 additions and 76 deletions

View File

@@ -303,3 +303,7 @@ func (sma *AsteriskAgent) V1DisconnectSession(args utils.AttrDisconnectSession,
func (sma *AsteriskAgent) Call(serviceMethod string, args interface{}, reply interface{}) error {
return utils.RPCCall(sma, serviceMethod, args, reply)
}
func (fsa *AsteriskAgent) V1GetActiveSessionIDs(ignParam string, sessionIDs *[]*sessions.SessionID) (err error) {
return utils.ErrNotImplemented
}

View File

@@ -21,6 +21,7 @@ package agents
import (
"errors"
"fmt"
//"net"
"time"
"github.com/cgrates/cgrates/config"
@@ -29,11 +30,16 @@ import (
"github.com/cgrates/fsock"
)
type fsSockWithConfig struct {
fsSock *fsock.FSock
cfg *config.FsConnConfig
}
func NewFSsessions(fsAgentConfig *config.FsAgentConfig,
smg *utils.BiRPCInternalClient, timezone string) (fsa *FSsessions) {
fsa = &FSsessions{
cfg: fsAgentConfig,
conns: make(map[string]*fsock.FSock),
conns: make(map[string]*fsSockWithConfig),
senderPools: make(map[string]*fsock.FSockPool),
smg: smg,
timezone: timezone,
@@ -46,8 +52,8 @@ func NewFSsessions(fsAgentConfig *config.FsAgentConfig,
// and the active sessions
type FSsessions struct {
cfg *config.FsAgentConfig
conns map[string]*fsock.FSock // Keep the list here for connection management purposes
senderPools map[string]*fsock.FSockPool // Keep sender pools here
conns map[string]*fsSockWithConfig // Keep the list here for connection management purposes
senderPools map[string]*fsock.FSockPool // Keep sender pools here
smg *utils.BiRPCInternalClient
timezone string
}
@@ -79,7 +85,7 @@ func (sm *FSsessions) createHandlers() map[string][]func(string, string) {
func (sm *FSsessions) setMaxCallDuration(uuid, connId string,
maxDur time.Duration, destNr string) error {
if len(sm.cfg.EmptyBalanceContext) != 0 {
_, err := sm.conns[connId].SendApiCmd(
_, err := sm.conns[connId].fsSock.SendApiCmd(
fmt.Sprintf("uuid_setvar %s execute_on_answer sched_transfer +%d %s XML %s\n\n",
uuid, int(maxDur.Seconds()), destNr, sm.cfg.EmptyBalanceContext))
if err != nil {
@@ -90,7 +96,7 @@ func (sm *FSsessions) setMaxCallDuration(uuid, connId string,
}
return nil
} else if len(sm.cfg.EmptyBalanceAnnFile) != 0 {
if _, err := sm.conns[connId].SendApiCmd(
if _, err := sm.conns[connId].fsSock.SendApiCmd(
fmt.Sprintf("sched_broadcast +%d %s playback!manager_request::%s aleg\n\n",
int(maxDur.Seconds()), uuid, sm.cfg.EmptyBalanceAnnFile)); err != nil {
utils.Logger.Err(
@@ -100,7 +106,7 @@ func (sm *FSsessions) setMaxCallDuration(uuid, connId string,
}
return nil
} else {
_, err := sm.conns[connId].SendApiCmd(
_, err := sm.conns[connId].fsSock.SendApiCmd(
fmt.Sprintf("uuid_setvar %s execute_on_answer sched_hangup +%d alloted_timeout\n\n",
uuid, int(maxDur.Seconds())))
if err != nil {
@@ -116,7 +122,7 @@ func (sm *FSsessions) setMaxCallDuration(uuid, connId string,
// Sends the transfer command to unpark the call to freeswitch
func (sm *FSsessions) unparkCall(uuid, connId, call_dest_nb, notify string) (err error) {
_, err = sm.conns[connId].SendApiCmd(
_, err = sm.conns[connId].fsSock.SendApiCmd(
fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify))
if err != nil {
utils.Logger.Err(
@@ -124,7 +130,7 @@ func (sm *FSsessions) unparkCall(uuid, connId, call_dest_nb, notify string) (err
utils.FreeSWITCHAgent, err.Error(), connId))
return
}
if _, err = sm.conns[connId].SendApiCmd(
if _, err = sm.conns[connId].fsSock.SendApiCmd(
fmt.Sprintf("uuid_transfer %s %s\n\n", uuid, call_dest_nb)); err != nil {
utils.Logger.Err(
fmt.Sprintf("<%s> Could not send unpark api call to freeswitch, error: <%s>, connId: %s",
@@ -137,6 +143,7 @@ func (sm *FSsessions) onChannelPark(fsev FSEvent, connId string) {
if fsev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Not for us
return
}
fsev[VarCGROriginHost] = sm.conns[connId].cfg.Alias
authArgs := fsev.V1AuthorizeArgs()
var authReply sessions.V1AuthorizeReply
if err := sm.smg.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authReply); err != nil {
@@ -159,7 +166,7 @@ func (sm *FSsessions) onChannelPark(fsev FSEvent, connId string) {
}
}
if authArgs.AuthorizeResources {
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s %s %s\n\n",
if _, err := sm.conns[connId].fsSock.SendApiCmd(fmt.Sprintf("uuid_setvar %s %s %s\n\n",
fsev.GetUUID(), CGRResourceAllocation, *authReply.ResourceAllocation)); err != nil {
utils.Logger.Info(
fmt.Sprintf("<%s> error %s setting channel variabile: %s",
@@ -171,9 +178,10 @@ func (sm *FSsessions) onChannelPark(fsev FSEvent, connId string) {
}
if authArgs.GetSuppliers {
fsArray := SliceAsFsArray(authReply.Suppliers.SuppliersWithParams())
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s %s %s\n\n",
if _, err := sm.conns[connId].fsSock.SendApiCmd(fmt.Sprintf("uuid_setvar %s %s %s\n\n",
fsev.GetUUID(), utils.CGR_SUPPLIERS, fsArray)); err != nil {
utils.Logger.Info(fmt.Sprintf("<%s> error setting suppliers: %s", utils.FreeSWITCHAgent, err.Error()))
utils.Logger.Info(fmt.Sprintf("<%s> error setting suppliers: %s",
utils.FreeSWITCHAgent, err.Error()))
sm.unparkCall(fsev.GetUUID(), connId, fsev.GetCallDestNr(utils.META_DEFAULT), err.Error())
return
}
@@ -184,7 +192,7 @@ func (sm *FSsessions) onChannelPark(fsev FSEvent, connId string) {
if _, has := authReply.Attributes.CGREvent.Event[fldName]; !has {
continue //maybe removed
}
if _, err := sm.conns[connId].SendApiCmd(
if _, err := sm.conns[connId].fsSock.SendApiCmd(
fmt.Sprintf("uuid_setvar %s %s %s\n\n", fsev.GetUUID(), fldName,
authReply.Attributes.CGREvent.Event[fldName])); err != nil {
utils.Logger.Info(
@@ -205,6 +213,17 @@ func (sm *FSsessions) onChannelAnswer(fsev FSEvent, connId string) {
if fsev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request
return
}
_, err := sm.conns[connId].fsSock.SendApiCmd(
fmt.Sprintf("uuid_setvar %s %s %s\n\n", fsev.GetUUID(),
utils.CGROriginHost, utils.FirstNonEmpty(sm.conns[connId].cfg.Alias,
sm.conns[connId].cfg.Address)))
if err != nil {
utils.Logger.Err(
fmt.Sprintf("<%s> error %s setting channel variabile: %s",
utils.FreeSWITCHAgent, err.Error(), VarCGROriginHost))
return
}
fsev[VarCGROriginHost] = sm.conns[connId].cfg.Alias
chanUUID := fsev.GetUUID()
if missing := fsev.MissingParameter(sm.timezone); missing != "" {
sm.disconnectSession(connId, chanUUID, "",
@@ -263,11 +282,14 @@ func (sm *FSsessions) Connect() error {
} else if !fSock.Connected() {
return errors.New("Could not connect to FreeSWITCH")
} else {
sm.conns[connId] = fSock
sm.conns[connId] = &fsSockWithConfig{
fsSock: fSock,
cfg: connCfg,
}
}
utils.Logger.Info(fmt.Sprintf("<%s> successfully connected to FreeSWITCH at: <%s>", utils.FreeSWITCHAgent, connCfg.Address))
go func() { // Start reading in own goroutine, return on error
if err := sm.conns[connId].ReadEvents(); err != nil {
if err := sm.conns[connId].fsSock.ReadEvents(); err != nil {
errChan <- err
}
}()
@@ -287,7 +309,7 @@ func (sm *FSsessions) Connect() error {
// fsev.GetCallDestNr(utils.META_DEFAULT)
// Disconnects a session by sending hangup command to freeswitch
func (sm *FSsessions) disconnectSession(connId, uuid, redirectNr, notify string) error {
if _, err := sm.conns[connId].SendApiCmd(
if _, err := sm.conns[connId].fsSock.SendApiCmd(
fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify)); err != nil {
utils.Logger.Err(
fmt.Sprintf("<%s> error: %s when attempting to disconnect channelID: %s over connID: %s",
@@ -296,7 +318,7 @@ func (sm *FSsessions) disconnectSession(connId, uuid, redirectNr, notify string)
}
if notify == utils.ErrInsufficientCredit.Error() {
if len(sm.cfg.EmptyBalanceContext) != 0 {
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_transfer %s %s XML %s\n\n",
if _, err := sm.conns[connId].fsSock.SendApiCmd(fmt.Sprintf("uuid_transfer %s %s XML %s\n\n",
uuid, redirectNr, sm.cfg.EmptyBalanceContext)); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Could not transfer the call to empty balance context, error: <%s>, connId: %s",
utils.FreeSWITCHAgent, err.Error(), connId))
@@ -304,7 +326,7 @@ func (sm *FSsessions) disconnectSession(connId, uuid, redirectNr, notify string)
}
return nil
} else if len(sm.cfg.EmptyBalanceAnnFile) != 0 {
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_broadcast %s playback!manager_request::%s aleg\n\n",
if _, err := sm.conns[connId].fsSock.SendApiCmd(fmt.Sprintf("uuid_broadcast %s playback!manager_request::%s aleg\n\n",
uuid, sm.cfg.EmptyBalanceAnnFile)); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Could not send uuid_broadcast to freeswitch, error: <%s>, connId: %s",
utils.FreeSWITCHAgent, err.Error(), connId))
@@ -313,7 +335,7 @@ func (sm *FSsessions) disconnectSession(connId, uuid, redirectNr, notify string)
return nil
}
}
if err := sm.conns[connId].SendMsgCmd(uuid,
if err := sm.conns[connId].fsSock.SendMsgCmd(uuid,
map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}); err != nil {
utils.Logger.Err(
fmt.Sprintf("<%s> Could not send disconect msg to freeswitch, error: <%s>, connId: %s",
@@ -324,13 +346,13 @@ func (sm *FSsessions) disconnectSession(connId, uuid, redirectNr, notify string)
}
func (sm *FSsessions) Shutdown() (err error) {
for connId, fSock := range sm.conns {
if !fSock.Connected() {
for connId, fSockWithCfg := range sm.conns {
if !fSockWithCfg.fsSock.Connected() {
utils.Logger.Err(fmt.Sprintf("<%s> Cannot shutdown sessions, fsock not connected for connection id: %s", utils.FreeSWITCHAgent, connId))
continue
}
utils.Logger.Info(fmt.Sprintf("<%s> Shutting down all sessions on connection id: %s", utils.FreeSWITCHAgent, connId))
if _, err = fSock.SendApiCmd("hupall MANAGER_REQUEST cgr_reqtype *prepaid"); err != nil {
if _, err = fSockWithCfg.fsSock.SendApiCmd("hupall MANAGER_REQUEST cgr_reqtype *prepaid"); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error on calls shutdown: %s, connection id: %s", utils.FreeSWITCHAgent, err.Error(), connId))
}
}
@@ -353,3 +375,32 @@ func (fsa *FSsessions) V1DisconnectSession(args utils.AttrDisconnectSession, rep
*reply = utils.OK
return
}
func (fsa *FSsessions) V1GetActiveSessionIDs(ignParam string,
sessionIDs *[]*sessions.SessionID) (err error) {
var sIDs []*sessions.SessionID
for connId, senderPool := range fsa.senderPools {
fsConn, err := senderPool.PopFSock()
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error on pop FSock: %s, connection id: %s",
utils.FreeSWITCHAgent, err.Error(), connId))
continue
}
activeChanStr, err := fsConn.SendApiCmd("show channels")
senderPool.PushFSock(fsConn)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error on push FSock: %s, connection id: %s",
utils.FreeSWITCHAgent, err.Error(), connId))
continue
}
aChans := fsock.MapChanData(activeChanStr)
for _, fsAChan := range aChans {
sIDs = append(sIDs, &sessions.SessionID{
OriginHost: fsa.conns[connId].cfg.Alias,
OriginID: fsAChan["call_uuid"]},
)
}
}
*sessionIDs = sIDs
return
}

View File

@@ -72,6 +72,7 @@ const (
FsConnID = "FsConnID" // used to share connID info in event for remote disconnects
VarAnswerEpoch = "variable_answer_epoch"
VarCGRACD = "variable_" + utils.CGR_ACD
VarCGROriginHost = "variable_" + utils.CGROriginHost
)
func NewFSEvent(strEv string) (fsev FSEvent) {
@@ -282,7 +283,7 @@ func (fsev FSEvent) GetOriginatorIP(fieldName string) string {
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.FirstNonEmpty(fsev[fieldName], fsev[FS_IPv4])
return utils.FirstNonEmpty(fsev[fieldName], fsev[VarCGROriginHost], fsev[FS_IPv4])
}
func (fsev FSEvent) GetExtraFields() map[string]string {

View File

@@ -21,6 +21,7 @@ package agents
import (
"fmt"
"log"
"net"
"regexp"
"strings"
@@ -108,7 +109,12 @@ func (ka *KamailioAgent) onCgrAuth(evData []byte, connID string) {
utils.KamailioAgent, kev[utils.OriginID]))
return
}
authArgs.CGREvent.Event[utils.OriginHost] = strings.Split(ka.conns[connID].RemoteAddr().String(), ":")[0]
host, _, err := net.SplitHostPort(ka.conns[connID].RemoteAddr().String())
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error: %+v,", utils.KamailioAgent, err))
return
}
authArgs.CGREvent.Event[utils.OriginHost] = host
var authReply sessions.V1AuthorizeReply
err = ka.sessionS.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authReply)
if kar, err := kev.AsKamAuthReply(authArgs, &authReply, err); err != nil {
@@ -143,7 +149,12 @@ func (ka *KamailioAgent) onCallStart(evData []byte, connID string) {
return
}
initSessionArgs.CGREvent.Event[EvapiConnID] = connID // Attach the connection ID so we can properly disconnect later
initSessionArgs.CGREvent.Event[utils.OriginHost] = strings.Split(ka.conns[connID].RemoteAddr().String(), ":")[0]
host, _, err := net.SplitHostPort(ka.conns[connID].RemoteAddr().String())
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error: %+v,", utils.KamailioAgent, err))
return
}
initSessionArgs.CGREvent.Event[utils.OriginHost] = host
var initReply sessions.V1InitSessionReply
if err := ka.sessionS.Call(utils.SessionSv1InitiateSession,
initSessionArgs, &initReply); err != nil {
@@ -179,7 +190,12 @@ func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) {
return
}
var reply string
tsArgs.CGREvent.Event[utils.OriginHost] = strings.Split(ka.conns[connID].RemoteAddr().String(), ":")[0]
host, _, err := net.SplitHostPort(ka.conns[connID].RemoteAddr().String())
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error: %+v,", utils.KamailioAgent, err))
return
}
tsArgs.CGREvent.Event[utils.OriginHost] = host
if err := ka.sessionS.Call(utils.SessionSv1TerminateSession,
tsArgs, &reply); err != nil {
utils.Logger.Err(
@@ -192,7 +208,12 @@ func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) {
if err != nil {
return
}
cgrEv.Event[utils.OriginHost] = strings.Split(ka.conns[connID].RemoteAddr().String(), ":")[0]
host, _, err := net.SplitHostPort(ka.conns[connID].RemoteAddr().String())
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error: %+v,", utils.KamailioAgent, err))
return
}
cgrEv.Event[utils.OriginHost] = host
if err := ka.sessionS.Call(utils.SessionSv1ProcessCDR, *cgrEv, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("%s> failed processing CGREvent: %s, error: %s",
utils.KamailioAgent, utils.ToJSON(cgrEv), err.Error()))
@@ -222,3 +243,8 @@ func (ka *KamailioAgent) V1DisconnectSession(args utils.AttrDisconnectSession, r
*reply = utils.OK
return
}
func (fsa *KamailioAgent) V1GetActiveSessionIDs(ignParam string, sessionIDs *[]*sessions.SessionID) (err error) {
return utils.ErrNotImplemented
}

View File

@@ -36,16 +36,17 @@ type SessionSv1 struct {
// Publishes BiJSONRPC methods exported by SessionSv1
func (ssv1 *SessionSv1) Handlers() map[string]interface{} {
return map[string]interface{}{
utils.SessionSv1AuthorizeEvent: ssv1.BiRpcAuthorizeEvent,
utils.SessionSv1AuthorizeEventWithDigest: ssv1.BiRpcAuthorizeEventWithDigest,
utils.SessionSv1InitiateSession: ssv1.BiRpcInitiateSession,
utils.SessionSv1InitiateSessionWithDigest: ssv1.BiRpcInitiateSessionWithDigest,
utils.SessionSv1UpdateSession: ssv1.BiRpcUpdateSession,
utils.SessionSv1TerminateSession: ssv1.BiRpcTerminateSession,
utils.SessionSv1ProcessCDR: ssv1.BiRpcProcessCDR,
utils.SessionSv1ProcessEvent: ssv1.BiRpcProcessEvent,
utils.SessionSv1GetActiveSessions: ssv1.BiRPCV1GetActiveSessions,
utils.SessionSv1GetPassiveSessions: ssv1.BiRPCV1GetPassiveSessions,
utils.SessionSv1AuthorizeEvent: ssv1.BiRpcAuthorizeEvent,
utils.SessionSv1AuthorizeEventWithDigest: ssv1.BiRpcAuthorizeEventWithDigest,
utils.SessionSv1InitiateSession: ssv1.BiRpcInitiateSession,
utils.SessionSv1InitiateSessionWithDigest: ssv1.BiRpcInitiateSessionWithDigest,
utils.SessionSv1UpdateSession: ssv1.BiRpcUpdateSession,
utils.SessionSv1TerminateSession: ssv1.BiRpcTerminateSession,
utils.SessionSv1ProcessCDR: ssv1.BiRpcProcessCDR,
utils.SessionSv1ProcessEvent: ssv1.BiRpcProcessEvent,
utils.SessionSv1GetActiveSessions: ssv1.BiRPCV1GetActiveSessions,
utils.SessionSv1GetPassiveSessions: ssv1.BiRPCV1GetPassiveSessions,
utils.SessionSv1RegisterInternalBiJSONConn: ssv1.BiRPCv1RegisterInternalBiJSONConn,
}
}
@@ -145,6 +146,11 @@ func (ssv1 *SessionSv1) BiRPCV1GetPassiveSessions(clnt *rpc2.Client, args map[st
return ssv1.SMG.BiRPCV1GetPassiveSessions(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1RegisterInternalBiJSONConn(clnt *rpc2.Client, args string,
rply *string) error {
return ssv1.SMG.BiRPCv1RegisterInternalBiJSONConn(clnt, args, rply)
}
func (ssv1 *SessionSv1) Ping(ign string, reply *string) error {
*reply = utils.Pong
return nil

View File

@@ -50,6 +50,11 @@ func handleDisconnectSession(clnt *rpc2.Client,
return nil
}
func handleGetSessionIDs(clnt *rpc2.Client,
ignParam string, sessionIDs *[]*sessions.SessionID) error {
return nil
}
func TestSSv1ItInitCfg(t *testing.T) {
var err error
sSv1CfgPath = path.Join(*dataDir, "conf", "samples", "sessions")
@@ -88,7 +93,8 @@ func TestSSv1ItRpcConn(t *testing.T) {
t.Fatal(err)
}
clntHandlers := map[string]interface{}{
utils.SessionSv1DisconnectSession: handleDisconnectSession,
utils.SessionSv1DisconnectSession: handleDisconnectSession,
utils.SessionSv1GetActiveSessionIDs: handleGetSessionIDs,
}
if sSv1BiRpc, err = utils.NewBiJSONrpcClient(sSv1Cfg.SessionSCfg().ListenBijson,
clntHandlers); err != nil {

View File

@@ -237,7 +237,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
for method, handler := range ssv1.Handlers() {
server.BiRPCRegisterName(method, handler)
}
server.ServeBiJSON(cfg.SessionSCfg().ListenBijson, sm.OnConnect, sm.OnDisconnect)
server.ServeBiJSON(cfg.SessionSCfg().ListenBijson, sm.OnBiJSONConnect, sm.OnBiJSONDisconnect)
exitChan <- true
}
}
@@ -330,9 +330,14 @@ func startFsAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitChan c
internalSMGChan <- smgRpcConn
birpcClnt := utils.NewBiRPCInternalClient(smgRpcConn.(*sessions.SMGeneric))
sm := agents.NewFSsessions(cfg.FsAgentCfg(), birpcClnt, cfg.DefaultTimezone)
var reply string
if err = birpcClnt.Call(utils.SessionSv1RegisterInternalBiJSONConn, "", &reply); err != nil { // for session sync
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.FreeSWITCHAgent, err))
}
if err = sm.Connect(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.FreeSWITCHAgent, err))
}
exitChan <- true
}
@@ -344,7 +349,10 @@ func startKamAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitChan
birpcClnt := utils.NewBiRPCInternalClient(smgRpcConn.(*sessions.SMGeneric))
ka := agents.NewKamailioAgent(cfg.KamAgentCfg(),
birpcClnt, utils.FirstNonEmpty(cfg.KamAgentCfg().Timezone, cfg.DefaultTimezone))
var reply string
if err = birpcClnt.Call(utils.SessionSv1RegisterInternalBiJSONConn, "", &reply); err != nil { // for session sync
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.KamailioAgent, err))
}
if err = ka.Connect(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err))
}

View File

@@ -312,6 +312,7 @@ const CGRATES_CFG_JSON = `
//"session_ttl_usage": "", // tweak Usage for sessions timing-out, not defined by default
"session_indexes": [], // index sessions based on these fields for GetActiveSessions API
"client_protocol": 1.0, // version of protocol to use when acting as JSON-PRC client <"0","1.0">
"channel_sync_interval": "5m", // sync channels regularly
},
@@ -339,10 +340,9 @@ const CGRATES_CFG_JSON = `
//"low_balance_ann_file": "", // file to be played when low balance is reached for prepaid calls
"empty_balance_context": "", // if defined, prepaid calls will be transferred to this context on empty balance
"empty_balance_ann_file": "", // file to be played before disconnecting prepaid calls on empty balance (applies only if no context defined)
"channel_sync_interval": "5m", // sync channels with freeswitch regularly
"max_wait_connection": "2s", // maximum duration to wait for a connection to be retrieved from the pool
"event_socket_conns":[ // instantiate connections to multiple FreeSWITCH servers
{"address": "127.0.0.1:8021", "password": "ClueCon", "reconnects": 5}
{"address": "127.0.0.1:8021", "password": "ClueCon", "reconnects": 5,"alias":""}
],
},

View File

@@ -507,6 +507,7 @@ func TestSmgJsonCfg(t *testing.T) {
Session_ttl: utils.StringPointer("0s"),
Session_indexes: &[]string{},
Client_protocol: utils.Float64Pointer(1.0),
Channel_sync_interval: utils.StringPointer("5m"),
}
if cfg, err := dfCgrJsonCfg.SessionSJsonCfg(); err != nil {
t.Error(err)
@@ -527,13 +528,13 @@ func TestFsAgentJsonCfg(t *testing.T) {
Extra_fields: &[]string{},
Empty_balance_context: utils.StringPointer(""),
Empty_balance_ann_file: utils.StringPointer(""),
Channel_sync_interval: utils.StringPointer("5m"),
Max_wait_connection: utils.StringPointer("2s"),
Event_socket_conns: &[]*FsConnJsonCfg{
&FsConnJsonCfg{
Address: utils.StringPointer("127.0.0.1:8021"),
Password: utils.StringPointer("ClueCon"),
Reconnects: utils.IntPointer(5),
Alias: utils.StringPointer(""),
}},
}
if cfg, err := dfCgrJsonCfg.FreeswitchAgentJsonCfg(); err != nil {

View File

@@ -43,8 +43,8 @@ func TestCgrCfgLoadWithDefaults(t *testing.T) {
"freeswitch_agent": {
"enabled": true, // starts SessionManager service: <true|false>
"event_socket_conns":[ // instantiate connections to multiple FreeSWITCH servers
{"address": "1.2.3.4:8021", "password": "ClueCon", "reconnects": 3},
{"address": "1.2.3.5:8021", "password": "ClueCon", "reconnects": 5}
{"address": "1.2.3.4:8021", "password": "ClueCon", "reconnects": 3, "alias":""},
{"address": "1.2.3.5:8021", "password": "ClueCon", "reconnects": 5, "alias":""}
],
},
@@ -55,8 +55,8 @@ func TestCgrCfgLoadWithDefaults(t *testing.T) {
}
eCgrCfg.fsAgentCfg.Enabled = true
eCgrCfg.fsAgentCfg.EventSocketConns = []*FsConnConfig{
&FsConnConfig{Address: "1.2.3.4:8021", Password: "ClueCon", Reconnects: 3},
&FsConnConfig{Address: "1.2.3.5:8021", Password: "ClueCon", Reconnects: 5},
&FsConnConfig{Address: "1.2.3.4:8021", Password: "ClueCon", Reconnects: 3, Alias: ""},
&FsConnConfig{Address: "1.2.3.5:8021", Password: "ClueCon", Reconnects: 5, Alias: ""},
}
if cgrCfg, err := NewCGRConfigFromJsonStringWithDefaults(JSN_CFG); err != nil {
t.Error(err)
@@ -613,6 +613,7 @@ func TestCgrCfgJSONDefaultsSMGenericCfg(t *testing.T) {
SessionTTL: 0 * time.Second,
SessionIndexes: utils.StringMap{},
ClientProtocol: 1.0,
ChannelSyncInterval: 5 * time.Minute,
}
if !reflect.DeepEqual(eSessionSCfg, cgrCfg.sessionSCfg) {
t.Errorf("expecting: %s, received: %s",
@@ -710,11 +711,10 @@ func TestCgrCfgJSONDefaultsFsAgentConfig(t *testing.T) {
ExtraFields: nil,
EmptyBalanceContext: "",
EmptyBalanceAnnFile: "",
ChannelSyncInterval: 5 * time.Minute,
MaxWaitConnection: 2 * time.Second,
EventSocketConns: []*FsConnConfig{
&FsConnConfig{Address: "127.0.0.1:8021",
Password: "ClueCon", Reconnects: 5}},
Password: "ClueCon", Reconnects: 5, Alias: ""}},
}
if !reflect.DeepEqual(cgrCfg.fsAgentCfg, eFsAgentCfg) {

View File

@@ -225,6 +225,7 @@ type SessionSJsonCfg struct {
Session_ttl_usage *string
Session_indexes *[]string
Client_protocol *float64
Channel_sync_interval *string
}
// FreeSWITCHAgent config section
@@ -238,7 +239,6 @@ type FreeswitchAgentJsonCfg struct {
//Low_balance_ann_file *string
Empty_balance_context *string
Empty_balance_ann_file *string
Channel_sync_interval *string
Max_wait_connection *string
Event_socket_conns *[]*FsConnJsonCfg
}
@@ -248,6 +248,7 @@ type FsConnJsonCfg struct {
Address *string
Password *string
Reconnects *int
Alias *string
}
// Represents one connection instance towards a rater/cdrs server

View File

@@ -70,6 +70,7 @@ type FsConnConfig struct {
Address string
Password string
Reconnects int
Alias string
}
func (self *FsConnConfig) loadFromJsonCfg(jsnCfg *FsConnJsonCfg) error {
@@ -85,6 +86,11 @@ func (self *FsConnConfig) loadFromJsonCfg(jsnCfg *FsConnJsonCfg) error {
if jsnCfg.Reconnects != nil {
self.Reconnects = *jsnCfg.Reconnects
}
self.Alias = self.Address
if jsnCfg.Alias != nil {
self.Alias = *jsnCfg.Alias
}
return nil
}
@@ -108,6 +114,7 @@ type SessionSCfg struct {
SessionTTLUsage *time.Duration
SessionIndexes utils.StringMap
ClientProtocol float64
ChannelSyncInterval time.Duration
}
func (self *SessionSCfg) loadFromJsonCfg(jsnCfg *SessionSJsonCfg) error {
@@ -217,6 +224,11 @@ func (self *SessionSCfg) loadFromJsonCfg(jsnCfg *SessionSJsonCfg) error {
if jsnCfg.Client_protocol != nil {
self.ClientProtocol = *jsnCfg.Client_protocol
}
if jsnCfg.Channel_sync_interval != nil {
if self.ChannelSyncInterval, err = utils.ParseDurationWithNanosecs(*jsnCfg.Channel_sync_interval); err != nil {
return err
}
}
return nil
}
@@ -230,7 +242,6 @@ type FsAgentConfig struct {
//LowBalanceAnnFile string
EmptyBalanceContext string
EmptyBalanceAnnFile string
ChannelSyncInterval time.Duration
MaxWaitConnection time.Duration
EventSocketConns []*FsConnConfig
}
@@ -268,11 +279,6 @@ func (self *FsAgentConfig) loadFromJsonCfg(jsnCfg *FreeswitchAgentJsonCfg) error
if jsnCfg.Empty_balance_ann_file != nil {
self.EmptyBalanceAnnFile = *jsnCfg.Empty_balance_ann_file
}
if jsnCfg.Channel_sync_interval != nil {
if self.ChannelSyncInterval, err = utils.ParseDurationWithNanosecs(*jsnCfg.Channel_sync_interval); err != nil {
return err
}
}
if jsnCfg.Max_wait_connection != nil {
if self.MaxWaitConnection, err = utils.ParseDurationWithNanosecs(*jsnCfg.Max_wait_connection); err != nil {
return err

View File

@@ -72,7 +72,8 @@ type FSCdr struct {
}
func (fsCdr FSCdr) getCGRID() string {
return utils.Sha1(fsCdr.vars[FS_UUID], fsCdr.vars[FsIPv4])
return utils.Sha1(fsCdr.vars[FS_UUID],
utils.FirstNonEmpty(fsCdr.vars[utils.CGROriginHost], fsCdr.vars[FsIPv4]))
}
func (fsCdr FSCdr) getExtraFields() map[string]string {
@@ -141,7 +142,7 @@ func (fsCdr FSCdr) AsCDR(timezone string) *CDR {
storCdr.CGRID = fsCdr.getCGRID()
storCdr.ToR = utils.VOICE
storCdr.OriginID = fsCdr.vars[FS_UUID]
storCdr.OriginHost = fsCdr.vars[FsIPv4]
storCdr.OriginHost = utils.FirstNonEmpty(fsCdr.vars[utils.CGROriginHost], fsCdr.vars[FsIPv4])
storCdr.Source = FS_CDR_SOURCE
storCdr.RequestType = utils.FirstNonEmpty(fsCdr.vars[utils.CGR_REQTYPE], fsCdr.cgrCfg.DefaultReqType)
storCdr.Tenant = utils.FirstNonEmpty(fsCdr.vars[utils.CGR_TENANT], fsCdr.cgrCfg.DefaultTenant)

View File

@@ -74,6 +74,7 @@ var sTestsCalls = []func(t *testing.T){
testCallCheckResourceRelease,
testCallCheckThreshold1001After,
testCallCheckThreshold1002After,
//de completat
testCallStopPjsuaListener,
testCallStopCgrEngine,
testCallStopFS,

View File

@@ -58,6 +58,10 @@ type SessionID struct {
OriginID string
}
func (s *SessionID) CGRID() string {
return utils.Sha1(s.OriginID, s.OriginHost)
}
// Called in case of automatic debits
func (self *SMGSession) debitLoop(debitInterval time.Duration) {
loopIndex := 0

View File

@@ -123,6 +123,7 @@ type SMGeneric struct {
cdrsrv rpcclient.RpcClientConnection // CDR server connections
smgReplConns []*SMGReplicationConn // list of connections where we will replicate our session data
Timezone string
intBiJSONConns []rpcclient.RpcClientConnection
biJsonConns map[*rpc2.Client]struct{} // index BiJSONConnection so we can sync them later
activeSessions map[string][]*SMGSession // group sessions per sessionId, multiple runs based on derived charging
aSessionsMux sync.RWMutex
@@ -1012,6 +1013,15 @@ func (smg *SMGeneric) ProcessCDR(gev SMGenericEvent) (err error) {
}
func (smg *SMGeneric) Connect() error {
if smg.cgrCfg.SessionSCfg().ChannelSyncInterval != 0 {
go func() {
for { // Schedule sync channels to run repetately
time.Sleep(smg.cgrCfg.SessionSCfg().ChannelSyncInterval)
smg.syncSessions()
}
}()
}
return nil
}
@@ -1979,16 +1989,63 @@ func (smg *SMGeneric) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection,
return nil
}
func (smg *SMGeneric) OnConnect(c *rpc2.Client) {
func (smg *SMGeneric) OnBiJSONConnect(c *rpc2.Client) {
var s struct{}
smg.biJsonConns[c] = s
}
func (smg *SMGeneric) OnDisconnect(c *rpc2.Client) {
func (smg *SMGeneric) OnBiJSONDisconnect(c *rpc2.Client) {
delete(smg.biJsonConns, c)
}
func (smg *SMGeneric) syncSessions() {
// var toBeRemovedSessions map[string][]*SMGSession
// var realActiveSession map[string]struct{}
var rpcClnts []rpcclient.RpcClientConnection
for _, conn := range smg.intBiJSONConns {
rpcClnts = append(rpcClnts, conn)
}
for conn := range smg.biJsonConns {
rpcClnts = append(rpcClnts, conn)
}
queriedCGRIDs := make(utils.StringMap)
utils.Logger.Info("Enter on sync --------------")
for _, conn := range rpcClnts {
var queriedSessionIDs []*SessionID
if conn != nil {
if err := conn.Call(utils.SessionSv1GetActiveSessionIDs,
"", &queriedSessionIDs); err != nil {
utils.Logger.Warning(
fmt.Sprintf("error quering session ids : %+v", err))
continue
}
utils.Logger.Info(fmt.Sprintf("queriedSessionIDs : %+v", utils.ToJSON(queriedSessionIDs)))
for _, sessionID := range queriedSessionIDs {
queriedCGRIDs[sessionID.CGRID()] = true
}
}
}
utils.Logger.Info(fmt.Sprintf("queriedCGRIDs : %+v", queriedCGRIDs))
smg.aSessionsMux.RLock()
utils.Logger.Info(fmt.Sprintf("smg.activeSessions : %+v", smg.activeSessions))
for cgrid, _ := range smg.activeSessions {
if _, has := queriedCGRIDs[cgrid]; has {
utils.Logger.Info("gaseste CGRID")
continue
}
utils.Logger.Info("nu gaseste CGRID")
// for _, session := range smgSessions {
// tmtr := &smgSessionTerminator{
// ttlLastUsed: &session.LastUsage,
// ttlUsage: &session.TotalUsage,
// }
// smg.ttlTerminate(session, tmtr)
// }
}
smg.aSessionsMux.RUnlock()
}
func (smg *SMGeneric) BiRPCv1RegisterInternalBiJSONConn(clnt rpcclient.RpcClientConnection,
ignParam string, reply *string) error {
smg.intBiJSONConns = append(smg.intBiJSONConns, clnt)
*reply = utils.OK
return nil
}

View File

@@ -546,6 +546,7 @@ const (
MetaRequest = "*request"
MetaVars = "*vars"
MetaReply = "*reply"
CGROriginHost = "cgr_originhost"
)
// Migrator Action
@@ -706,21 +707,23 @@ const (
// SessionS APIs
const (
SessionSv1AuthorizeEvent = "SessionSv1.AuthorizeEvent"
SessionSv1AuthorizeEventWithDigest = "SessionSv1.AuthorizeEventWithDigest"
SessionSv1InitiateSession = "SessionSv1.InitiateSession"
SessionSv1InitiateSessionWithDigest = "SessionSv1.InitiateSessionWithDigest"
SessionSv1UpdateSession = "SessionSv1.UpdateSession"
SessionSv1TerminateSession = "SessionSv1.TerminateSession"
SessionSv1ProcessCDR = "SessionSv1.ProcessCDR"
SessionSv1ProcessEvent = "SessionSv1.ProcessEvent"
SessionSv1DisconnectSession = "SessionSv1.DisconnectSession"
SessionSv1GetActiveSessions = "SessionSv1.GetActiveSessions"
SessionSv1GetPassiveSessions = "SessionSv1.GetPassiveSessions"
SMGenericV1InitiateSession = "SMGenericV1.InitiateSession"
SMGenericV2InitiateSession = "SMGenericV2.InitiateSession"
SMGenericV2UpdateSession = "SMGenericV2.UpdateSession"
SessionSv1Ping = "SessionSv1.Ping"
SessionSv1AuthorizeEvent = "SessionSv1.AuthorizeEvent"
SessionSv1AuthorizeEventWithDigest = "SessionSv1.AuthorizeEventWithDigest"
SessionSv1InitiateSession = "SessionSv1.InitiateSession"
SessionSv1InitiateSessionWithDigest = "SessionSv1.InitiateSessionWithDigest"
SessionSv1UpdateSession = "SessionSv1.UpdateSession"
SessionSv1TerminateSession = "SessionSv1.TerminateSession"
SessionSv1ProcessCDR = "SessionSv1.ProcessCDR"
SessionSv1ProcessEvent = "SessionSv1.ProcessEvent"
SessionSv1DisconnectSession = "SessionSv1.DisconnectSession"
SessionSv1GetActiveSessions = "SessionSv1.GetActiveSessions"
SessionSv1GetPassiveSessions = "SessionSv1.GetPassiveSessions"
SMGenericV1InitiateSession = "SMGenericV1.InitiateSession"
SMGenericV2InitiateSession = "SMGenericV2.InitiateSession"
SMGenericV2UpdateSession = "SMGenericV2.UpdateSession"
SessionSv1Ping = "SessionSv1.Ping"
SessionSv1GetActiveSessionIDs = "SessionSv1.GetActiveSessionIDs"
SessionSv1RegisterInternalBiJSONConn = "SessionSv1.RegisterInternalBiJSONConn"
)
// DispatcherS APIs