Add sync session for kamailio

This commit is contained in:
TeoV
2018-06-20 10:10:09 -04:00
committed by Dan Christian Bogos
parent fd14eb6b93
commit cea2bbc3c3
8 changed files with 131 additions and 36 deletions

View File

@@ -19,11 +19,13 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package agents
import (
"encoding/json"
"errors"
"fmt"
"log"
"net"
"regexp"
"strings"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/sessions"
@@ -34,17 +36,19 @@ import (
func NewKamailioAgent(kaCfg *config.KamAgentCfg,
sessionS *utils.BiRPCInternalClient, timezone string) (ka *KamailioAgent) {
ka = &KamailioAgent{cfg: kaCfg, sessionS: sessionS,
timezone: timezone,
conns: make(map[string]*kamevapi.KamEvapi)}
timezone: timezone,
conns: make(map[string]*kamevapi.KamEvapi),
activeSessionIDs: make(chan []*sessions.SessionID)}
ka.sessionS.SetClientConn(ka) // pass the connection to KA back into smg so we can receive the disconnects
return
}
type KamailioAgent struct {
cfg *config.KamAgentCfg
sessionS *utils.BiRPCInternalClient
timezone string
conns map[string]*kamevapi.KamEvapi
cfg *config.KamAgentCfg
sessionS *utils.BiRPCInternalClient
timezone string
conns map[string]*kamevapi.KamEvapi
activeSessionIDs chan []*sessions.SessionID
}
func (self *KamailioAgent) Connect() error {
@@ -55,6 +59,7 @@ func (self *KamailioAgent) Connect() error {
regexp.MustCompile(CGR_CALL_START): []func([]byte, string){
self.onCallStart},
regexp.MustCompile(CGR_CALL_END): []func([]byte, string){self.onCallEnd},
regexp.MustCompile(CGR_DLG_LIST): []func([]byte, string){self.onDlgList},
}
errChan := make(chan error)
for _, connCfg := range self.cfg.EvapiConns {
@@ -109,12 +114,7 @@ func (ka *KamailioAgent) onCgrAuth(evData []byte, connID string) {
utils.KamailioAgent, kev[utils.OriginID]))
return
}
host, _, err := net.SplitHostPort(ka.conns[connID].RemoteAddr().String())
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error: %+v,", utils.KamailioAgent, err))
return
}
authArgs.CGREvent.Event[utils.OriginHost] = host
authArgs.CGREvent.Event[utils.OriginHost] = ka.conns[connID].RemoteAddr().String()
var authReply sessions.V1AuthorizeReply
err = ka.sessionS.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authReply)
if kar, err := kev.AsKamAuthReply(authArgs, &authReply, err); err != nil {
@@ -149,12 +149,8 @@ func (ka *KamailioAgent) onCallStart(evData []byte, connID string) {
return
}
initSessionArgs.CGREvent.Event[EvapiConnID] = connID // Attach the connection ID so we can properly disconnect later
host, _, err := net.SplitHostPort(ka.conns[connID].RemoteAddr().String())
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error: %+v,", utils.KamailioAgent, err))
return
}
initSessionArgs.CGREvent.Event[utils.OriginHost] = host
initSessionArgs.CGREvent.Event[utils.OriginHost] = ka.conns[connID].RemoteAddr().String()
var initReply sessions.V1InitSessionReply
if err := ka.sessionS.Call(utils.SessionSv1InitiateSession,
initSessionArgs, &initReply); err != nil {
@@ -190,12 +186,7 @@ func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) {
return
}
var reply string
host, _, err := net.SplitHostPort(ka.conns[connID].RemoteAddr().String())
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error: %+v,", utils.KamailioAgent, err))
return
}
tsArgs.CGREvent.Event[utils.OriginHost] = host
tsArgs.CGREvent.Event[utils.OriginHost] = ka.conns[connID].RemoteAddr().String()
if err := ka.sessionS.Call(utils.SessionSv1TerminateSession,
tsArgs, &reply); err != nil {
utils.Logger.Err(
@@ -208,12 +199,7 @@ func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) {
if err != nil {
return
}
host, _, err := net.SplitHostPort(ka.conns[connID].RemoteAddr().String())
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error: %+v,", utils.KamailioAgent, err))
return
}
cgrEv.Event[utils.OriginHost] = host
cgrEv.Event[utils.OriginHost] = ka.conns[connID].RemoteAddr().String()
if err := ka.sessionS.Call(utils.SessionSv1ProcessCDR, *cgrEv, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("%s> failed processing CGREvent: %s, error: %s",
utils.KamailioAgent, utils.ToJSON(cgrEv), err.Error()))
@@ -221,6 +207,23 @@ func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) {
}
}
func (ka *KamailioAgent) onDlgList(evData []byte, connID string) {
kamDlgRpl, err := NewKamDlgReply(evData)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> unmarshalling event data: %s, error: %s",
utils.KamailioAgent, evData, err.Error()))
return
}
var sIDs []*sessions.SessionID
for _, dlgInfo := range kamDlgRpl.Jsonrpl_body.Result {
sIDs = append(sIDs, &sessions.SessionID{
OriginHost: ka.conns[connID].RemoteAddr().String(),
OriginID: dlgInfo.CallId + ";" + dlgInfo.Caller.Tag,
})
}
ka.activeSessionIDs <- sIDs
}
func (self *KamailioAgent) disconnectSession(connID string, dscEv *KamSessionDisconnect) error {
if err := self.conns[connID].Send(dscEv.String()); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed sending disconnect request: %s, connection id: %s, error %s",
@@ -244,7 +247,29 @@ func (ka *KamailioAgent) V1DisconnectSession(args utils.AttrDisconnectSession, r
return
}
func (fsa *KamailioAgent) V1GetActiveSessionIDs(ignParam string, sessionIDs *[]*sessions.SessionID) (err error) {
func (ka *KamailioAgent) V1GetActiveSessionIDs(ignParam string, sessionIDs *[]*sessions.SessionID) (err error) {
for _, evapi := range ka.conns {
errChan := make(chan error)
go func() {
kamEv, _ := json.Marshal(map[string]string{utils.Event: CGR_DLG_LIST})
errChan <- evapi.Send(string(kamEv))
}()
select {
case err = <-errChan:
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed sending event, error %s",
utils.KamailioAgent, err.Error()))
return
}
case <-time.After(5 * time.Second):
return errors.New("timeout sending dialog list")
}
return utils.ErrNotImplemented
}
select {
case *sessionIDs = <-ka.activeSessionIDs:
case <-time.After(5 * time.Second):
return errors.New("timeout executing dialog list")
}
return
}

View File

@@ -44,6 +44,7 @@ const (
KamCGRSubsystems = "cgr_subsystems"
KamCGRContext = "cgr_context"
EvapiConnID = "EvapiConnID" // used to share connID info in event for remote disconnects
CGR_DLG_LIST = "CGR_DLG_LIST"
)
var (
@@ -325,3 +326,36 @@ func (self *KamAuthReply) String() string {
mrsh, _ := json.Marshal(self)
return string(mrsh)
}
type KamDlgReply struct {
Event string
Jsonrpl_body *kamJsonDlgBody
}
type kamJsonDlgBody struct {
Id int
Jsonrpc string
Result []*kamDlgInfo
}
type kamDlgInfo struct {
CallId string `json:"call-id"`
Caller *kamCallerDlg
}
type kamCallerDlg struct {
Tag string
}
// NewKamDlgReply parses bytes received over the wire from Kamailio into KamDlgReply
func NewKamDlgReply(kamEvData []byte) (rpl KamDlgReply, err error) {
if err = json.Unmarshal(kamEvData, &rpl); err != nil {
return
}
return
}
func (self *KamDlgReply) String() string {
mrsh, _ := json.Marshal(self)
return string(mrsh)
}

View File

@@ -1,6 +1,7 @@
#Tenant,ID,Contexts,FilterIDs,ActivationInterval,FieldName,Initial,Substitute,Append,Weight
cgrates.org,ATTR_1001_SIMPLEAUTH,simpleauth,*string:Account:1001,,Password,*any,CGRateS.org,true,20
cgrates.org,ATTR_1002_SIMPLEAUTH,simpleauth,*string:Account:1002,,Password,*any,CGRateS.org,true,20
cgrates.org,ATTR_1003_SIMPLEAUTH,simpleauth,*string:Account:1003,,Password,*any,CGRateS.org,true,20
cgrates.org,ATTR_1001_SESSIONAUTH,*sessions,*string:Account:1001,,Password,*any,CGRateS.org,true,10
cgrates.org,ATTR_1001_SESSIONAUTH,,,,RequestType,*any,*prepaid,true,
cgrates.org,ATTR_1001_SESSIONAUTH,,,,PaypalAccount,*any,cgrates@paypal.com,true,
1 #Tenant ID Contexts FilterIDs ActivationInterval FieldName Initial Substitute Append Weight
2 cgrates.org ATTR_1001_SIMPLEAUTH simpleauth *string:Account:1001 Password *any CGRateS.org true 20
3 cgrates.org ATTR_1002_SIMPLEAUTH simpleauth *string:Account:1002 Password *any CGRateS.org true 20
4 cgrates.org ATTR_1003_SIMPLEAUTH simpleauth *string:Account:1003 Password *any CGRateS.org true 20
5 cgrates.org ATTR_1001_SESSIONAUTH *sessions *string:Account:1001 Password *any CGRateS.org true 10
6 cgrates.org ATTR_1001_SESSIONAUTH RequestType *any *prepaid true
7 cgrates.org ATTR_1001_SESSIONAUTH PaypalAccount *any cgrates@paypal.com true

View File

@@ -74,7 +74,8 @@
"thresholds_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"debit_interval": "10s",
"debit_interval": "5s",
"channel_sync_interval":"7s",
},

View File

@@ -55,6 +55,16 @@ route[CGR_SESSION_DISCONNECT] {
#$jsonrpl($var(reply));
}
route[CGR_DLG_LIST] {
if $sht(cgrconn=>cgr) == $null {
sl_send_reply("503","Charging controller unreachable");
exit;
}
jsonrpc_exec('{"jsonrpc":"2.0","id":1, "method":"dlg.list","params":[]}');
evapi_relay("{\"event\":\"CGR_DLG_LIST_REPLY\",
\"jsonrpl_body\":$jsonrpl(body)}");
}
# Route to mainly query account password from CGRateS
route[CGRATES_SIMPLEAUTH_REQUEST] {

View File

@@ -15,6 +15,11 @@
debug=2
log_stderror=no
listen=udp:eth0:5060
listen=udp:127.0.0.1:5080
listen=udp:127.0.0.1:5060
listen=udp:eth0:5080
memdbg=5
memlog=5
log_facility=LOG_LOCAL0

View File

@@ -616,19 +616,31 @@ func testCallSyncSessions(t *testing.T) {
t.Errorf("Unsuported format")
}
time.Sleep(2 * time.Second)
// activeSessions shouldn't be active
if err := tutorialCallsRpc.Call(utils.SessionSv1GetActiveSessions,
&map[string]string{}, &reply); err.Error() != utils.ErrNotFound.Error() {
t.Error("Got error on SessionSv1.GetActiveSessions: ", err.Error())
}
var sourceForCDR string
var numberOfCDR int
switch optConf {
case utils.Freeswitch:
sourceForCDR = utils.MetaSessionS
numberOfCDR = 2
case utils.Kamailio:
sourceForCDR = utils.MetaSessionS + "_" + utils.KamailioAgent
numberOfCDR = 3 // in case of kamailio we get 3 CDRs (1 from first disconnect)
}
// verify cdr
var rplCdrs []*engine.ExternalCDR
req := utils.RPCCDRsFilter{Sources: []string{utils.MetaSessionS},
req := utils.RPCCDRsFilter{Sources: []string{sourceForCDR}, MaxUsage: "20s",
RunIDs: []string{utils.META_DEFAULT}, Accounts: []string{"1001"}}
if err := tutorialCallsRpc.Call("ApierV2.GetCdrs", req, &rplCdrs); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(rplCdrs) != 2 { // cdr from sync session + cdr from before
} else if len(rplCdrs) != numberOfCDR { // cdr from sync session + cdr from before
t.Error("Unexpected number of CDRs returned: ", len(rplCdrs))
} else if time1, err := utils.ParseDurationWithSecs(rplCdrs[0].Usage); err != nil {
t.Error(err)
@@ -638,6 +650,12 @@ func testCallSyncSessions(t *testing.T) {
t.Error(err)
} else if time1 > time.Duration(15*time.Second) {
t.Error("Unexpected time duration : ", time1)
} else if numberOfCDR == 3 {
if time1, err := utils.ParseDurationWithSecs(rplCdrs[2].Usage); err != nil {
t.Error(err)
} else if time1 > time.Duration(15*time.Second) {
t.Error("Unexpected time duration : ", time1)
}
}
}

View File

@@ -553,6 +553,7 @@ const (
MetaTerminate = "*terminate"
MetaEvent = "*event"
MetaDryRun = "*dryrun"
Event = "Event"
)
// Migrator Action