mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Adding ResourceLimiter within SMFreeSWITCH code
This commit is contained in:
@@ -224,7 +224,7 @@ func startSmFreeSWITCH(internalRaterChan, internalCDRSChan, rlsChan chan rpcclie
|
||||
}
|
||||
}
|
||||
if len(cfg.SmFsConfig.RLsConns) != 0 {
|
||||
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
|
||||
rlsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
|
||||
cfg.SmFsConfig.RLsConns, rlsChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<SMFreeSWITCH> Could not connect to RLs: %s", err.Error()))
|
||||
|
||||
@@ -45,6 +45,11 @@
|
||||
},
|
||||
|
||||
|
||||
"rls": {
|
||||
"enabled": true, // starts ResourceLimiter service: <true|false>.
|
||||
},
|
||||
|
||||
|
||||
"cdre": {
|
||||
"*default": {
|
||||
"cdr_format": "csv", // exported CDRs format <csv>
|
||||
@@ -111,6 +116,9 @@
|
||||
"sm_freeswitch": {
|
||||
"enabled": true, // starts SessionManager service: <true|false>
|
||||
"debit_interval": "5s", // interval to perform debits on.
|
||||
"rls_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"channel_sync_interval": "10s",
|
||||
"event_socket_conns":[ // instantiate connections to multiple FreeSWITCH servers
|
||||
{"address": "127.0.0.1:8021", "password": "ClueCon", "reconnects": 5}
|
||||
|
||||
@@ -278,6 +278,11 @@ func (rls *ResourceLimiterService) V1CacheResourceLimits(attrs *utils.AttrRLsCac
|
||||
return nil
|
||||
}
|
||||
|
||||
// Alias API for external usage
|
||||
func (rls *ResourceLimiterService) CacheResourceLimits(attrs *utils.AttrRLsCache, reply *string) error {
|
||||
return rls.V1CacheResourceLimits(attrs, reply)
|
||||
}
|
||||
|
||||
func (rls *ResourceLimiterService) V1ResourceLimitsForEvent(ev map[string]interface{}, reply *[]*ResourceLimit) error {
|
||||
rls.Lock() // Unknown number of RLs updated
|
||||
defer rls.Unlock()
|
||||
@@ -295,6 +300,11 @@ func (rls *ResourceLimiterService) V1ResourceLimitsForEvent(ev map[string]interf
|
||||
return nil
|
||||
}
|
||||
|
||||
// Alias API for external usage
|
||||
func (rls *ResourceLimiterService) ResourceLimitsForEvent(ev map[string]interface{}, reply *[]*ResourceLimit) error {
|
||||
return rls.V1ResourceLimitsForEvent(ev, reply)
|
||||
}
|
||||
|
||||
// Called when a session or another event needs to
|
||||
func (rls *ResourceLimiterService) V1InitiateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error {
|
||||
rls.Lock() // Unknown number of RLs updated
|
||||
@@ -321,6 +331,11 @@ func (rls *ResourceLimiterService) V1InitiateResourceUsage(attrs utils.AttrRLsRe
|
||||
return nil
|
||||
}
|
||||
|
||||
// Alias for externam methods
|
||||
func (rls *ResourceLimiterService) InitiateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error {
|
||||
return rls.V1InitiateResourceUsage(attrs, reply)
|
||||
}
|
||||
|
||||
func (rls *ResourceLimiterService) V1TerminateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error {
|
||||
rls.Lock() // Unknown number of RLs updated
|
||||
defer rls.Unlock()
|
||||
@@ -335,6 +350,11 @@ func (rls *ResourceLimiterService) V1TerminateResourceUsage(attrs utils.AttrRLsR
|
||||
return nil
|
||||
}
|
||||
|
||||
// Alias for external methods
|
||||
func (rls *ResourceLimiterService) TerminateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error {
|
||||
return rls.V1TerminateResourceUsage(attrs, reply)
|
||||
}
|
||||
|
||||
// Make the service available as RPC internally
|
||||
func (rls *ResourceLimiterService) Call(serviceMethod string, args interface{}, reply interface{}) error {
|
||||
parts := strings.Split(serviceMethod, ".")
|
||||
|
||||
@@ -372,6 +372,12 @@ func (fsev FSEvent) ComputeLcr() bool {
|
||||
}
|
||||
}
|
||||
|
||||
// Used with RLs
|
||||
func (fsev FSEvent) AsMapStringInterface() map[string]interface{} {
|
||||
return utils.ConvertMapValStrIf(fsev)
|
||||
|
||||
}
|
||||
|
||||
// Converts into CallDescriptor due to responder interface needs
|
||||
func (fsev FSEvent) AsCallDescriptor() (*engine.CallDescriptor, error) {
|
||||
lcrReq := &engine.LcrRequest{
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/syslog"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -34,6 +35,9 @@ import (
|
||||
)
|
||||
|
||||
func NewFSSessionManager(smFsConfig *config.SmFsConfig, rater, cdrs, rls rpcclient.RpcClientConnection, timezone string) *FSSessionManager {
|
||||
if rls != nil && reflect.ValueOf(rls).IsNil() {
|
||||
rls = nil
|
||||
}
|
||||
return &FSSessionManager{
|
||||
cfg: smFsConfig,
|
||||
conns: make(map[string]*fsock.FSock),
|
||||
@@ -161,6 +165,7 @@ func (sm *FSSessionManager) onChannelPark(ev engine.Event, connId string) {
|
||||
if err = sm.Rater().Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil {
|
||||
utils.Logger.Info(fmt.Sprintf("<SM-FreeSWITCH> LCR_API_ERROR: %s", err.Error()))
|
||||
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR)
|
||||
return
|
||||
}
|
||||
if lcr.HasErrors() {
|
||||
lcr.LogErrors()
|
||||
@@ -176,9 +181,29 @@ func (sm *FSSessionManager) onChannelPark(ev engine.Event, connId string) {
|
||||
if _, err = sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s %s %s\n\n", ev.GetUUID(), utils.CGR_SUPPLIERS, fsArray)); err != nil {
|
||||
utils.Logger.Info(fmt.Sprintf("<SM-FreeSWITCH> LCR_ERROR: %s", err.Error()))
|
||||
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
if sm.rls != nil {
|
||||
var reply string
|
||||
attrRU := utils.AttrRLsResourceUsage{
|
||||
ResourceUsageID: ev.GetUUID(),
|
||||
Event: utils.ConvertMapValStrIf(ev.(FSEvent)),
|
||||
RequestedUnits: 1,
|
||||
}
|
||||
fmt.Printf("InitiateResourceUsage, attrs: %+v\n", attrRU)
|
||||
if err := sm.rls.Call("RLsV1.InitiateResourceUsage", attrRU, &reply); err != nil {
|
||||
if err.Error() == utils.ErrResourceUnavailable.Error() {
|
||||
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), "-"+utils.ErrResourceUnavailable.Error())
|
||||
} else {
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> RLs API error: %s", err.Error()))
|
||||
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
// Check ResourceLimits
|
||||
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), AUTH_OK)
|
||||
}
|
||||
|
||||
@@ -226,6 +251,18 @@ func (sm *FSSessionManager) onChannelHangupComplete(ev engine.Event) {
|
||||
if sm.cfg.CreateCdr {
|
||||
sm.ProcessCdr(ev.AsStoredCdr(config.CgrConfig().DefaultTimezone))
|
||||
}
|
||||
var reply string
|
||||
attrRU := utils.AttrRLsResourceUsage{
|
||||
ResourceUsageID: ev.GetUUID(),
|
||||
Event: utils.ConvertMapValStrIf(ev.(FSEvent)),
|
||||
RequestedUnits: 1,
|
||||
}
|
||||
if sm.rls != nil {
|
||||
fmt.Printf("Will call the RLs over conn: %+v\n", sm.rls)
|
||||
if err := sm.rls.Call("RLsV1.TerminateResourceUsage", attrRU, &reply); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> RLs API error: %s", err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Connects to the freeswitch mod_event_socket server and starts
|
||||
|
||||
Reference in New Issue
Block a user