diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 8392ccd08..0838a4ce3 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -224,7 +224,7 @@ func startSmFreeSWITCH(internalRaterChan, internalCDRSChan, rlsChan chan rpcclie } } if len(cfg.SmFsConfig.RLsConns) != 0 { - cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + rlsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.SmFsConfig.RLsConns, rlsChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RLs: %s", err.Error())) diff --git a/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json b/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json index 06254d64a..65ff28ea3 100644 --- a/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json @@ -45,6 +45,11 @@ }, +"rls": { + "enabled": true, // starts ResourceLimiter service: . +}, + + "cdre": { "*default": { "cdr_format": "csv", // exported CDRs format @@ -111,6 +116,9 @@ "sm_freeswitch": { "enabled": true, // starts SessionManager service: "debit_interval": "5s", // interval to perform debits on. + "rls_conns": [ + {"address": "*internal"} + ], "channel_sync_interval": "10s", "event_socket_conns":[ // instantiate connections to multiple FreeSWITCH servers {"address": "127.0.0.1:8021", "password": "ClueCon", "reconnects": 5} diff --git a/engine/reslimiter.go b/engine/reslimiter.go index 6e35b71de..2988cfc2a 100644 --- a/engine/reslimiter.go +++ b/engine/reslimiter.go @@ -278,6 +278,11 @@ func (rls *ResourceLimiterService) V1CacheResourceLimits(attrs *utils.AttrRLsCac return nil } +// Alias API for external usage +func (rls *ResourceLimiterService) CacheResourceLimits(attrs *utils.AttrRLsCache, reply *string) error { + return rls.V1CacheResourceLimits(attrs, reply) +} + func (rls *ResourceLimiterService) V1ResourceLimitsForEvent(ev map[string]interface{}, reply *[]*ResourceLimit) error { rls.Lock() // Unknown number of RLs updated defer rls.Unlock() @@ -295,6 +300,11 @@ func (rls *ResourceLimiterService) V1ResourceLimitsForEvent(ev map[string]interf return nil } +// Alias API for external usage +func (rls *ResourceLimiterService) ResourceLimitsForEvent(ev map[string]interface{}, reply *[]*ResourceLimit) error { + return rls.V1ResourceLimitsForEvent(ev, reply) +} + // Called when a session or another event needs to func (rls *ResourceLimiterService) V1InitiateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error { rls.Lock() // Unknown number of RLs updated @@ -321,6 +331,11 @@ func (rls *ResourceLimiterService) V1InitiateResourceUsage(attrs utils.AttrRLsRe return nil } +// Alias for externam methods +func (rls *ResourceLimiterService) InitiateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error { + return rls.V1InitiateResourceUsage(attrs, reply) +} + func (rls *ResourceLimiterService) V1TerminateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error { rls.Lock() // Unknown number of RLs updated defer rls.Unlock() @@ -335,6 +350,11 @@ func (rls *ResourceLimiterService) V1TerminateResourceUsage(attrs utils.AttrRLsR return nil } +// Alias for external methods +func (rls *ResourceLimiterService) TerminateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error { + return rls.V1TerminateResourceUsage(attrs, reply) +} + // Make the service available as RPC internally func (rls *ResourceLimiterService) Call(serviceMethod string, args interface{}, reply interface{}) error { parts := strings.Split(serviceMethod, ".") diff --git a/sessionmanager/fsevent.go b/sessionmanager/fsevent.go index c955125e2..daa7cfc8a 100644 --- a/sessionmanager/fsevent.go +++ b/sessionmanager/fsevent.go @@ -372,6 +372,12 @@ func (fsev FSEvent) ComputeLcr() bool { } } +// Used with RLs +func (fsev FSEvent) AsMapStringInterface() map[string]interface{} { + return utils.ConvertMapValStrIf(fsev) + +} + // Converts into CallDescriptor due to responder interface needs func (fsev FSEvent) AsCallDescriptor() (*engine.CallDescriptor, error) { lcrReq := &engine.LcrRequest{ diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 13014e1a9..a5c264a59 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "log/syslog" + "reflect" "strconv" "strings" "time" @@ -34,6 +35,9 @@ import ( ) func NewFSSessionManager(smFsConfig *config.SmFsConfig, rater, cdrs, rls rpcclient.RpcClientConnection, timezone string) *FSSessionManager { + if rls != nil && reflect.ValueOf(rls).IsNil() { + rls = nil + } return &FSSessionManager{ cfg: smFsConfig, conns: make(map[string]*fsock.FSock), @@ -161,6 +165,7 @@ func (sm *FSSessionManager) onChannelPark(ev engine.Event, connId string) { if err = sm.Rater().Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil { utils.Logger.Info(fmt.Sprintf(" LCR_API_ERROR: %s", err.Error())) sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) + return } if lcr.HasErrors() { lcr.LogErrors() @@ -176,9 +181,29 @@ func (sm *FSSessionManager) onChannelPark(ev engine.Event, connId string) { if _, err = sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s %s %s\n\n", ev.GetUUID(), utils.CGR_SUPPLIERS, fsArray)); err != nil { utils.Logger.Info(fmt.Sprintf(" LCR_ERROR: %s", err.Error())) sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) + return } } } + if sm.rls != nil { + var reply string + attrRU := utils.AttrRLsResourceUsage{ + ResourceUsageID: ev.GetUUID(), + Event: utils.ConvertMapValStrIf(ev.(FSEvent)), + RequestedUnits: 1, + } + fmt.Printf("InitiateResourceUsage, attrs: %+v\n", attrRU) + if err := sm.rls.Call("RLsV1.InitiateResourceUsage", attrRU, &reply); err != nil { + if err.Error() == utils.ErrResourceUnavailable.Error() { + sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), "-"+utils.ErrResourceUnavailable.Error()) + } else { + utils.Logger.Err(fmt.Sprintf(" RLs API error: %s", err.Error())) + sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) + } + return + } + } + // Check ResourceLimits sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), AUTH_OK) } @@ -226,6 +251,18 @@ func (sm *FSSessionManager) onChannelHangupComplete(ev engine.Event) { if sm.cfg.CreateCdr { sm.ProcessCdr(ev.AsStoredCdr(config.CgrConfig().DefaultTimezone)) } + var reply string + attrRU := utils.AttrRLsResourceUsage{ + ResourceUsageID: ev.GetUUID(), + Event: utils.ConvertMapValStrIf(ev.(FSEvent)), + RequestedUnits: 1, + } + if sm.rls != nil { + fmt.Printf("Will call the RLs over conn: %+v\n", sm.rls) + if err := sm.rls.Call("RLsV1.TerminateResourceUsage", attrRU, &reply); err != nil { + utils.Logger.Err(fmt.Sprintf(" RLs API error: %s", err.Error())) + } + } } // Connects to the freeswitch mod_event_socket server and starts