mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Kamailio SM with RL support
This commit is contained in:
@@ -59,16 +59,18 @@ route[CGR_AUTH_REPLY] {
|
||||
json_get_field("$evapi(msg)", "TransactionLabel", "$var(TransactionLabel)");
|
||||
json_get_field("$evapi(msg)", "MaxSessionTime", "$var(MaxSessionTime)");
|
||||
json_get_field("$evapi(msg)", "Suppliers", "$var(Suppliers)");
|
||||
json_get_field("$evapi(msg)", "Error", "$var(Error)");
|
||||
json_get_field("$evapi(msg)", "ResourceAllowed", "$var(CGRResourceAllowed)");
|
||||
json_get_field("$evapi(msg)", "Error", "$var(CgrError)");
|
||||
$var(id_index) = $(var(TransactionIndex){s.int});
|
||||
$var(id_label) = $(var(TransactionLabel){s.int});
|
||||
$var(CgrMaxSessionTime) = $(var(MaxSessionTime){s.int});
|
||||
$var(CgrSuppliers) = $(var(Suppliers){s.rm,"});
|
||||
$var(CGRResourceAllowed) = $(var(CGRResourceAllowed){s.rm,"});
|
||||
$var(CgrError) = $(var(Error){s.rm,"});
|
||||
t_continue("$var(id_index)", "$var(id_label)", "CGRATES_AUTH_REPLY"); # Unpark the transaction
|
||||
}
|
||||
|
||||
# Send AUTH_REQUEST to CGRateS
|
||||
# Send LCR_REQUEST to CGRateS
|
||||
route[CGRATES_LCR_REQUEST] {
|
||||
# Auth INVITEs with CGRateS
|
||||
if $sht(cgrconn=>cgr) == $null {
|
||||
@@ -99,6 +101,33 @@ route[CGR_LCR_REPLY] {
|
||||
t_continue("$var(id_index)", "$var(id_label)", "CGRATES_AUTH_REPLY"); # Unpark the transaction
|
||||
}
|
||||
|
||||
|
||||
# Send ResourceAllocation request to CGRateS
|
||||
route[CGRATES_RL_REQUEST] {
|
||||
if $sht(cgrconn=>cgr) == $null {
|
||||
sl_send_reply("503","Charging controller unreachable");
|
||||
exit;
|
||||
}
|
||||
evapi_async_relay("{\"event\":\"CGR_RL_REQUEST\",
|
||||
\"tr_index\":\"$T(id_index)\",
|
||||
\"tr_label\":\"$T(id_label)\",
|
||||
\"cgr_tenant\":\"$dlg_var(cgrTenant)\",
|
||||
\"cgr_account\":\"$dlg_var(cgrAccount)\",
|
||||
\"cgr_destination\":\"$dlg_var(cgrDestination)\",
|
||||
\"cgr_setuptime\":\"$TS\"}");
|
||||
}
|
||||
|
||||
# Process LCR_REPLY from CGRateS
|
||||
route[CGR_RL_REPLY] {
|
||||
json_get_field("$evapi(msg)", "ResourceAllowed", "$var(CGRResourceAllowed)");
|
||||
json_get_field("$evapi(msg)", "Error", "$var(CgrError)");
|
||||
$var(id_index) = $(var(TransactionIndex){s.int});
|
||||
$var(id_label) = $(var(TransactionLabel){s.int});
|
||||
$var(CGRResourceAllowed) = $(var(CGRResourceAllowed){s.rm,"});
|
||||
$var(CgrError) = $(var(CgrError){s.rm,"});
|
||||
t_continue("$var(id_index)", "$var(id_label)", "CGRATES_RL_REPLY"); # Unpark the transaction
|
||||
}
|
||||
|
||||
# CGRateS request for session disconnect
|
||||
route[CGR_SESSION_DISCONNECT] {
|
||||
json_get_field("$evapi(msg)", "HashEntry", "$var(HashEntry)");
|
||||
|
||||
@@ -207,6 +207,23 @@ route[CGRATES_AUTH_REPLY] {
|
||||
if $var(CgrSuppliers) != "" { # Enforce the supplier variable to the first one received from CGRateS, more for testing purposes
|
||||
$dlg_var(cgrSupplier) = $(var(CgrSuppliers){s.select,0,,});
|
||||
}
|
||||
if $var(CGRResourceAllowed) == "false" {
|
||||
sl_send_reply("403","Resource not allowed");
|
||||
exit;
|
||||
}
|
||||
route(RELAY);
|
||||
}
|
||||
|
||||
route[CGRATES_RL_REPLY] {
|
||||
if $var(CgrError) != "" {
|
||||
xlog("CGR_RL_ERROR: $var(CgrError)");
|
||||
sl_send_reply("503","CGR_ERROR");
|
||||
exit;
|
||||
}
|
||||
if $var(CGRResourceAllowed) == "false" {
|
||||
sl_send_reply("403","Resource not allowed");
|
||||
exit;
|
||||
}
|
||||
route(RELAY);
|
||||
}
|
||||
|
||||
|
||||
@@ -66,6 +66,23 @@ func (self *KamailioSessionManager) getSuppliers(kev KamEvent) (string, error) {
|
||||
return lcr.SuppliersString()
|
||||
}
|
||||
|
||||
func (self *KamailioSessionManager) allocateResources(kev KamEvent) (err error) {
|
||||
if self.rlS == nil {
|
||||
return errors.New("no RLs connection")
|
||||
}
|
||||
var ev map[string]interface{}
|
||||
if ev, err = kev.AsMapStringIface(); err != nil {
|
||||
return
|
||||
}
|
||||
attrRU := utils.AttrRLsResourceUsage{
|
||||
UsageID: kev.GetUUID(),
|
||||
Event: ev,
|
||||
Units: 1, // One channel reserved
|
||||
}
|
||||
var reply string
|
||||
return self.rlS.Call("RLsV1.AllocateResource", attrRU, &reply)
|
||||
}
|
||||
|
||||
func (self *KamailioSessionManager) onCgrAuth(evData []byte, connId string) {
|
||||
kev, err := NewKamEvent(evData)
|
||||
if err != nil {
|
||||
@@ -101,21 +118,10 @@ func (self *KamailioSessionManager) onCgrAuth(evData []byte, connId string) {
|
||||
}
|
||||
resourceAllowed := true
|
||||
if self.rlS != nil {
|
||||
var reply string
|
||||
ev, err := kev.AsMapStringIface()
|
||||
if err != nil {
|
||||
if err := self.allocateResources(kev); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> RLs error: %s", err.Error()))
|
||||
resourceAllowed = false
|
||||
}
|
||||
attrRU := utils.AttrRLsResourceUsage{
|
||||
UsageID: kev.GetUUID(),
|
||||
Event: ev,
|
||||
Units: 1, // One channel reserved
|
||||
}
|
||||
if err := self.rlS.Call("RLsV1.AllocateResource", attrRU, &reply); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> RLs API error: %s", err.Error()))
|
||||
resourceAllowed = false
|
||||
}
|
||||
}
|
||||
if kar, err := kev.AsKamAuthReply(remainingDuration, supplStr, resourceAllowed, errReply); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed building auth reply %s", err.Error()))
|
||||
@@ -134,9 +140,30 @@ func (self *KamailioSessionManager) onCgrLcrReq(evData []byte, connId string) {
|
||||
kamLcrReply, errReply := kev.AsKamAuthReply(0, supplStr, false, err)
|
||||
kamLcrReply.Event = CGR_LCR_REPLY // Hit the CGR_LCR_REPLY event route on Kamailio side
|
||||
if errReply != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed building auth reply %s", errReply.Error()))
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed building LCR reply %s", errReply.Error()))
|
||||
} else if err = self.conns[connId].Send(kamLcrReply.String()); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed sending lcr reply %s", err.Error()))
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed sending LCR reply %s", err.Error()))
|
||||
}
|
||||
}
|
||||
|
||||
// onCgrRLReq is the handler for CGR_RL_REQUEST events coming from Kamailio
|
||||
func (self *KamailioSessionManager) onCgrRLReq(evData []byte, connId string) {
|
||||
kev, err := NewKamEvent(evData)
|
||||
if err != nil {
|
||||
utils.Logger.Info(fmt.Sprintf("<SM-Kamailio> ERROR unmarshalling event: %s, error: %s", string(evData), err.Error()))
|
||||
return
|
||||
}
|
||||
resourceAllowed := true
|
||||
if err := self.allocateResources(kev); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> RLs error: %s", err.Error()))
|
||||
resourceAllowed = false
|
||||
}
|
||||
kamRLReply, errReply := kev.AsKamAuthReply(0, "", resourceAllowed, err)
|
||||
kamRLReply.Event = CGR_RL_REPLY // Hit the CGR_LCR_REPLY event route on Kamailio side
|
||||
if errReply != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed building RL reply %s", errReply.Error()))
|
||||
} else if err = self.conns[connId].Send(kamRLReply.String()); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed sending RL reply %s", err.Error()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -201,10 +228,11 @@ func (self *KamailioSessionManager) onCallEnd(evData []byte, connId string) {
|
||||
func (self *KamailioSessionManager) Connect() error {
|
||||
var err error
|
||||
eventHandlers := map[*regexp.Regexp][]func([]byte, string){
|
||||
regexp.MustCompile("CGR_AUTH_REQUEST"): []func([]byte, string){self.onCgrAuth},
|
||||
regexp.MustCompile("CGR_LCR_REQUEST"): []func([]byte, string){self.onCgrLcrReq},
|
||||
regexp.MustCompile("CGR_CALL_START"): []func([]byte, string){self.onCallStart},
|
||||
regexp.MustCompile("CGR_CALL_END"): []func([]byte, string){self.onCallEnd},
|
||||
regexp.MustCompile(CGR_AUTH_REQUEST): []func([]byte, string){self.onCgrAuth},
|
||||
regexp.MustCompile(CGR_LCR_REQUEST): []func([]byte, string){self.onCgrLcrReq},
|
||||
regexp.MustCompile(CGR_RL_REQUEST): []func([]byte, string){self.onCgrRLReq},
|
||||
regexp.MustCompile(CGR_CALL_START): []func([]byte, string){self.onCallStart},
|
||||
regexp.MustCompile(CGR_CALL_END): []func([]byte, string){self.onCallEnd},
|
||||
}
|
||||
errChan := make(chan error)
|
||||
for _, connCfg := range self.cfg.EvapiConns {
|
||||
|
||||
@@ -37,6 +37,8 @@ const (
|
||||
CGR_SESSION_DISCONNECT = "CGR_SESSION_DISCONNECT"
|
||||
CGR_CALL_START = "CGR_CALL_START"
|
||||
CGR_CALL_END = "CGR_CALL_END"
|
||||
CGR_RL_REQUEST = "CGR_RL_REQUEST"
|
||||
CGR_RL_REPLY = "CGR_RL_REPLY"
|
||||
CGR_SETUPTIME = "cgr_setuptime"
|
||||
CGR_ANSWERTIME = "cgr_answertime"
|
||||
CGR_STOPTIME = "cgr_stoptime"
|
||||
|
||||
Reference in New Issue
Block a user