First CUT of SM-FreeSWITCH using SMG instead of own session management, including resources, suppliers and attributes

This commit is contained in:
DanB
2017-12-27 19:34:11 +01:00
parent 8a30d80095
commit 9bb19bea53
23 changed files with 740 additions and 721 deletions

View File

@@ -48,12 +48,12 @@ func (rsv1 *ResourceSv1) AuthorizeResources(args utils.ArgRSv1ResourceUsage, rep
}
// V1InitiateResourceUsage records usage for an event
func (rsv1 *ResourceSv1) AllocateResource(args utils.ArgRSv1ResourceUsage, reply *string) error {
func (rsv1 *ResourceSv1) AllocateResources(args utils.ArgRSv1ResourceUsage, reply *string) error {
return rsv1.rls.V1AllocateResource(args, reply)
}
// V1TerminateResourceUsage releases usage for an event
func (rsv1 *ResourceSv1) ReleaseResource(args utils.ArgRSv1ResourceUsage, reply *string) error {
func (rsv1 *ResourceSv1) ReleaseResources(args utils.ArgRSv1ResourceUsage, reply *string) error {
return rsv1.rls.V1ReleaseResource(args, reply)
}

66
apier/v1/smg.go Normal file
View File

@@ -0,0 +1,66 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package v1
import (
"github.com/cgrates/cgrates/sessionmanager"
"github.com/cgrates/cgrates/utils"
)
func NewSMGv1(sm *sessionmanager.SMGeneric) *SMGv1 {
return &SMGv1{SMG: sm}
}
// Exports RPC from SMGv1
type SMGv1 struct {
SMG *sessionmanager.SMGeneric
}
// Publishes BiJSONRPC methods exported by SMGv1
func (smgv1 *SMGv1) Handlers() map[string]interface{} {
return map[string]interface{}{
"SMGv1.InitiateSession": smgv1.SMG.BiRPCv1InitiateSession,
"SMGv1.UpdateSession": smgv1.SMG.BiRPCv1UpdateSession,
"SMGv1.TerminateSession": smgv1.SMG.BiRPCv1TerminateSession,
"SMGv1.ProcessCDR": smgv1.SMG.BiRPCv1ProcessCDR,
}
}
// Called on session start, returns the maximum number of seconds the session can last
func (smgv1 *SMGv1) InitiateSession(args *sessionmanager.V1InitSessionArgs,
rply *sessionmanager.V1InitSessionReply) error {
return smgv1.SMG.BiRPCv1InitiateSession(nil, args, rply)
}
// Interim updates, returns remaining duration from the rater
func (smgv1 *SMGv1) UpdateSession(args *sessionmanager.V1UpdateSessionArgs,
rply *sessionmanager.V1UpdateSessionReply) error {
return smgv1.SMG.BiRPCv1UpdateSession(nil, args, rply)
}
// Called on session end, should stop debit loop
func (smgv1 *SMGv1) TerminateSession(args *sessionmanager.V1TerminateSessionArgs,
rply *string) error {
return smgv1.SMG.BiRPCv1TerminateSession(nil, args, rply)
}
// Called on session end, should stop debit loop
func (smgv1 *SMGv1) ProcessCDR(cgrEv utils.CGREvent, rply *string) error {
return smgv1.SMG.BiRPCv1ProcessCDR(nil, cgrEv, rply)
}

View File

@@ -130,36 +130,64 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne
}
}
func startSmGeneric(internalSMGChan, internalRaterChan,
internalCDRSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) {
func startSmGeneric(internalSMGChan, internalRaterChan, internalResourceSChan, internalSupplierSChan,
internalAttrSChan, internalCDRSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS SMGeneric service.")
var err error
var ralsConns, cdrsConn *rpcclient.RpcClientPool
if len(cfg.SmGenericConfig.RALsConns) != 0 {
var ralsConns, resSConns, suplSConns, attrSConns, cdrsConn *rpcclient.RpcClientPool
if len(cfg.SMGConfig.RALsConns) != 0 {
ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SmGenericConfig.RALsConns, internalRaterChan, cfg.InternalTtl)
cfg.SMGConfig.RALsConns, internalRaterChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMGeneric> Could not connect to RALs: %s", err.Error()))
exitChan <- true
return
}
}
if len(cfg.SmGenericConfig.CDRsConns) != 0 {
if len(cfg.SMGConfig.ResSConns) != 0 {
ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SMGConfig.ResSConns, internalResourceSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMGeneric> Could not connect to ResourceS: %s", err.Error()))
exitChan <- true
return
}
}
if len(cfg.SMGConfig.SupplSConns) != 0 {
ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SMGConfig.SupplSConns, internalSupplierSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMGeneric> Could not connect to SupplierS: %s", err.Error()))
exitChan <- true
return
}
}
if len(cfg.SMGConfig.AttrSConns) != 0 {
ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SMGConfig.AttrSConns, internalAttrSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMGeneric> Could not connect to AttributeS: %s", err.Error()))
exitChan <- true
return
}
}
if len(cfg.SMGConfig.CDRsConns) != 0 {
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SmGenericConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl)
cfg.SMGConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMGeneric> Could not connect to RALs: %s", err.Error()))
exitChan <- true
return
}
}
smgReplConns, err := sessionmanager.NewSMGReplicationConns(cfg.SmGenericConfig.SMGReplicationConns, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout)
smgReplConns, err := sessionmanager.NewSMGReplicationConns(cfg.SMGConfig.SMGReplicationConns, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMGeneric> Could not connect to SMGReplicationConnection error: <%s>", err.Error()))
exitChan <- true
return
}
sm := sessionmanager.NewSMGeneric(cfg, ralsConns, cdrsConn, smgReplConns, cfg.DefaultTimezone)
sm := sessionmanager.NewSMGeneric(cfg, ralsConns, resSConns, suplSConns,
attrSConns, cdrsConn, smgReplConns, cfg.DefaultTimezone)
if err = sm.Connect(); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> error: %s!", err))
}
@@ -169,37 +197,28 @@ func startSmGeneric(internalSMGChan, internalRaterChan,
smgRpc := v1.NewSMGenericV1(sm)
server.RpcRegister(smgRpc)
server.RpcRegister(&v2.SMGenericV2{*smgRpc})
smgv1 := v1.NewSMGv1(sm) // methods with multiple options
server.RpcRegister(smgv1)
// Register BiRpc handlers
if cfg.SmGenericConfig.ListenBijson != "" {
if cfg.SMGConfig.ListenBijson != "" {
smgBiRpc := v1.NewSMGenericBiRpcV1(sm)
for method, handler := range smgBiRpc.Handlers() {
server.BiRPCRegisterName(method, handler)
}
server.ServeBiJSON(cfg.SmGenericConfig.ListenBijson)
for method, handler := range smgv1.Handlers() {
server.BiRPCRegisterName(method, handler)
}
server.ServeBiJSON(cfg.SMGConfig.ListenBijson)
exitChan <- true
}
}
func startSMAsterisk(internalSMGChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS SMAsterisk service.")
/*
var smgConn *rpcclient.RpcClientPool
if len(cfg.SMAsteriskCfg().SMGConns) != 0 {
smgConn, err = engine.NewRPCPool(rpcclient.POOL_BROADCAST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SMAsteriskCfg().SMGConns, internalSMGChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMAsterisk> Could not connect to SMG: %s", err.Error()))
exitChan <- true
return
}
}
*/
utils.Logger.Info("Starting CGRateS SM-Asterisk service.")
smgRpcConn := <-internalSMGChan
internalSMGChan <- smgRpcConn
birpcClnt := utils.NewBiRPCInternalClient(smgRpcConn.(*sessionmanager.SMGeneric))
for connIdx := range cfg.SMAsteriskCfg().AsteriskConns { // Instantiate connections towards asterisk servers
smgRpcConn := <-internalSMGChan
internalSMGChan <- smgRpcConn
sma, err := sessionmanager.NewSMAsterisk(cfg, connIdx, birpcClnt)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> error: %s!", err))
@@ -217,9 +236,9 @@ func startDiameterAgent(internalSMGChan, internalPubSubSChan chan rpcclient.RpcC
var err error
utils.Logger.Info("Starting CGRateS DiameterAgent service")
var smgConn, pubsubConn *rpcclient.RpcClientPool
if len(cfg.DiameterAgentCfg().SMGenericConns) != 0 {
if len(cfg.DiameterAgentCfg().SMGConns) != 0 {
smgConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.DiameterAgentCfg().SMGenericConns, internalSMGChan, cfg.InternalTtl)
cfg.DiameterAgentCfg().SMGConns, internalSMGChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<DiameterAgent> Could not connect to SMG: %s", err.Error()))
exitChan <- true
@@ -251,10 +270,10 @@ func startRadiusAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitCh
var err error
utils.Logger.Info("Starting CGRateS RadiusAgent service")
var smgConn *rpcclient.RpcClientPool
if len(cfg.RadiusAgentCfg().SMGenericConns) != 0 {
if len(cfg.RadiusAgentCfg().SMGConns) != 0 {
smgConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts,
cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.RadiusAgentCfg().SMGenericConns, internalSMGChan, cfg.InternalTtl)
cfg.RadiusAgentCfg().SMGConns, internalSMGChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<RadiusAgent> Could not connect to SMG: %s", err.Error()))
exitChan <- true
@@ -273,39 +292,13 @@ func startRadiusAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitCh
exitChan <- true
}
func startSmFreeSWITCH(internalRaterChan, internalCDRSChan, rlsChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) {
func startSmFreeSWITCH(internalSMGChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
var err error
utils.Logger.Info("Starting CGRateS SMFreeSWITCH service")
var ralsConn, cdrsConn, rlsConn *rpcclient.RpcClientPool
if len(cfg.SmFsConfig.RALsConns) != 0 {
ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SmFsConfig.RALsConns, internalRaterChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMFreeSWITCH> Could not connect to RAL: %s", err.Error()))
exitChan <- true
return
}
}
if len(cfg.SmFsConfig.CDRsConns) != 0 {
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SmFsConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMFreeSWITCH> Could not connect to RAL: %s", err.Error()))
exitChan <- true
return
}
}
if len(cfg.SmFsConfig.RLsConns) != 0 {
rlsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SmFsConfig.RLsConns, rlsChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMFreeSWITCH> Could not connect to RLs: %s", err.Error()))
exitChan <- true
return
}
}
sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, ralsConn, cdrsConn, rlsConn, cfg.DefaultTimezone)
smRpc.SMs = append(smRpc.SMs, sm)
utils.Logger.Info("Starting CGRateS SM-FreeSWITCH service")
smgRpcConn := <-internalSMGChan
internalSMGChan <- smgRpcConn
birpcClnt := utils.NewBiRPCInternalClient(smgRpcConn.(*sessionmanager.SMGeneric))
sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, birpcClnt, cfg.DefaultTimezone)
if err = sm.Connect(); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMFreeSWITCH> error: %s!", err))
}
@@ -927,12 +920,13 @@ func main() {
go startCdrcs(internalCdrSChan, internalRaterChan, exitChan)
// Start SM-Generic
if cfg.SmGenericConfig.Enabled {
go startSmGeneric(internalSMGChan, internalRaterChan, internalCdrSChan, server, exitChan)
if cfg.SMGConfig.Enabled {
go startSmGeneric(internalSMGChan, internalRaterChan, internalRsChan,
internalSupplierSChan, internalAttributeSChan, internalCdrSChan, server, exitChan)
}
// Start SM-FreeSWITCH
if cfg.SmFsConfig.Enabled {
go startSmFreeSWITCH(internalRaterChan, internalCdrSChan, internalRsChan, cdrDb, exitChan)
go startSmFreeSWITCH(internalSMGChan, exitChan)
// close all sessions on shutdown
go shutdownSessionmanagerSingnalHandler(exitChan)
}
@@ -947,12 +941,6 @@ func main() {
go startSmOpenSIPS(internalRaterChan, internalCdrSChan, cdrDb, exitChan)
}
// Register session manager service // FixMe: make sure this is thread safe
if cfg.SmGenericConfig.Enabled || cfg.SmFsConfig.Enabled || cfg.SmKamConfig.Enabled || cfg.SmOsipsConfig.Enabled || cfg.SMAsteriskCfg().Enabled { // Register SessionManagerV1 service
smRpc = new(v1.SessionManagerV1)
server.RpcRegister(smRpc)
}
if cfg.SMAsteriskCfg().Enabled {
go startSMAsterisk(internalSMGChan, exitChan)
}

View File

@@ -51,4 +51,16 @@
],
},
"resources": {
"enabled": true,
},
"suppliers": {
"enabled": true,
},
"attributes": {
"enabled": true,
},
}

View File

@@ -414,6 +414,21 @@ func (self *CGRConfig) checkConfigSanity() error {
return errors.New("<SMGeneric> RALs not enabled but requested by SMGeneric component.")
}
}
for _, conn := range self.SMGConfig.ResSConns {
if conn.Address == utils.MetaInternal && !self.resourceSCfg.Enabled {
return errors.New("<SMGeneric> ResourceS not enabled but requested by SMGeneric component.")
}
}
for _, conn := range self.SMGConfig.SupplSConns {
if conn.Address == utils.MetaInternal && !self.supplierSCfg.Enabled {
return errors.New("<SMGeneric> SupplierS not enabled but requested by SMGeneric component.")
}
}
for _, conn := range self.SMGConfig.AttrSConns {
if conn.Address == utils.MetaInternal && !self.attributeSCfg.Enabled {
return errors.New("<SMGeneric> AttributeS not enabled but requested by SMGeneric component.")
}
}
if len(self.SMGConfig.CDRsConns) == 0 {
return errors.New("<SMGeneric> CDRs definition is mandatory!")
}

View File

@@ -283,6 +283,15 @@ const CGRATES_CFG_JSON = `
"rals_conns": [
{"address": "*internal"} // address where to reach the Rater <""|*internal|127.0.0.1:2013>
],
"resources_conns": [
{"address": "*internal"} // address where to reach the ResourceS <""|*internal|127.0.0.1:2013>
],
"suppliers_conns": [
{"address": "*internal"} // address where to reach the SupplierS <""|*internal|127.0.0.1:2013>
],
"attributes_conns": [
{"address": "*internal"} // address where to reach the AttributeS <""|*internal|127.0.0.1:2013>
],
"cdrs_conns": [
{"address": "*internal"} // address where to reach CDR Server, empty to disable CDR capturing <*internal|x.y.z.y:1234>
],

View File

@@ -491,6 +491,18 @@ func TestSmgJsonCfg(t *testing.T) {
&HaPoolJsonCfg{
Address: utils.StringPointer(utils.MetaInternal),
}},
Resources_conns: &[]*HaPoolJsonCfg{
&HaPoolJsonCfg{
Address: utils.StringPointer(utils.MetaInternal),
}},
Suppliers_conns: &[]*HaPoolJsonCfg{
&HaPoolJsonCfg{
Address: utils.StringPointer(utils.MetaInternal),
}},
Attributes_conns: &[]*HaPoolJsonCfg{
&HaPoolJsonCfg{
Address: utils.StringPointer(utils.MetaInternal),
}},
Cdrs_conns: &[]*HaPoolJsonCfg{
&HaPoolJsonCfg{
Address: utils.StringPointer(utils.MetaInternal),

View File

@@ -447,10 +447,18 @@ func TestCgrCfgJSONDefaultsCdreProfiles(t *testing.T) {
func TestCgrCfgJSONDefaultsSMGenericCfg(t *testing.T) {
eSmGeCfg := &SMGConfig{
Enabled: false,
ListenBijson: "127.0.0.1:2014",
RALsConns: []*HaPoolConfig{&HaPoolConfig{Address: "*internal"}},
CDRsConns: []*HaPoolConfig{&HaPoolConfig{Address: "*internal"}},
Enabled: false,
ListenBijson: "127.0.0.1:2014",
RALsConns: []*HaPoolConfig{
&HaPoolConfig{Address: "*internal"}},
ResSConns: []*HaPoolConfig{
&HaPoolConfig{Address: "*internal"}},
SupplSConns: []*HaPoolConfig{
&HaPoolConfig{Address: "*internal"}},
AttrSConns: []*HaPoolConfig{
&HaPoolConfig{Address: "*internal"}},
CDRsConns: []*HaPoolConfig{
&HaPoolConfig{Address: "*internal"}},
SMGReplicationConns: []*HaPoolConfig{},
DebitInterval: 0 * time.Second,
MinCallDuration: 0 * time.Second,

View File

@@ -198,8 +198,11 @@ type SmgJsonCfg struct {
Enabled *bool
Listen_bijson *string
Rals_conns *[]*HaPoolJsonCfg
Resources_conns *[]*HaPoolJsonCfg
Suppliers_conns *[]*HaPoolJsonCfg
Cdrs_conns *[]*HaPoolJsonCfg
Smg_replication_conns *[]*HaPoolJsonCfg
Attributes_conns *[]*HaPoolJsonCfg
Debit_interval *string
Min_call_duration *string
Max_call_duration *string

View File

@@ -92,6 +92,9 @@ type SMGConfig struct {
Enabled bool
ListenBijson string
RALsConns []*HaPoolConfig
ResSConns []*HaPoolConfig
SupplSConns []*HaPoolConfig
AttrSConns []*HaPoolConfig
CDRsConns []*HaPoolConfig
SMGReplicationConns []*HaPoolConfig
DebitInterval time.Duration
@@ -122,6 +125,27 @@ func (self *SMGConfig) loadFromJsonCfg(jsnCfg *SmgJsonCfg) error {
self.RALsConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Resources_conns != nil {
self.ResSConns = make([]*HaPoolConfig, len(*jsnCfg.Resources_conns))
for idx, jsnHaCfg := range *jsnCfg.Resources_conns {
self.ResSConns[idx] = NewDfltHaPoolConfig()
self.ResSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Suppliers_conns != nil {
self.SupplSConns = make([]*HaPoolConfig, len(*jsnCfg.Suppliers_conns))
for idx, jsnHaCfg := range *jsnCfg.Suppliers_conns {
self.SupplSConns[idx] = NewDfltHaPoolConfig()
self.SupplSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Attributes_conns != nil {
self.AttrSConns = make([]*HaPoolConfig, len(*jsnCfg.Attributes_conns))
for idx, jsnHaCfg := range *jsnCfg.Attributes_conns {
self.AttrSConns[idx] = NewDfltHaPoolConfig()
self.AttrSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Cdrs_conns != nil {
self.CDRsConns = make([]*HaPoolConfig, len(*jsnCfg.Cdrs_conns))
for idx, jsnHaCfg := range *jsnCfg.Cdrs_conns {

View File

@@ -26,6 +26,12 @@ import (
"github.com/cgrates/cgrates/utils"
)
// SupplierReply represents one supplier in
type SortedSupplier struct {
SupplierID string
SortingData map[string]interface{} // store here extra info like cost or stats
}
// SuppliersReply is returned as part of GetSuppliers call
type SortedSuppliers struct {
ProfileID string // Profile matched
@@ -33,10 +39,13 @@ type SortedSuppliers struct {
SortedSuppliers []*SortedSupplier // list of supplier IDs and SortingData data
}
// SupplierReply represents one supplier in
type SortedSupplier struct {
SupplierID string
SortingData map[string]interface{} // store here extra info like cost or stats
// SupplierIDs returns list of suppliers
func (sSpls *SortedSuppliers) SupplierIDs() (sIDs []string) {
sIDs = make([]string, len(sSpls.SortedSuppliers))
for i, spl := range sSpls.SortedSuppliers {
sIDs[i] = spl.SupplierID
}
return
}
// SortWeight is part of sort interface, sort based on Weight

File diff suppressed because one or more lines are too long

View File

@@ -72,6 +72,12 @@ const (
PDD_NOMEDIA_MS = "variable_progressmsec"
IGNOREPARK = "variable_cgr_ignorepark"
FS_VARPREFIX = "variable_"
VarCGRSubsystems = "variable_cgr_subsystems"
SubSAccountS = "accounts"
SubSSupplierS = "suppliers"
SubSResourceS = "resources"
SubSAttributeS = "attributes"
CGRResourcesAllowed = "cgr_resources_allowed"
VAR_CGR_DISCONNECT_CAUSE = "variable_" + utils.CGR_DISCONNECT_CAUSE
VAR_CGR_CMPUTELCR = "variable_" + utils.CGR_COMPUTELCR
@@ -86,11 +92,8 @@ func (fsev FSEvent) String() (result string) {
return
}
// Loads the new event data from a body of text containing the key value proprieties.
// It stores the parsed proprieties in the internal map.
func (fsev FSEvent) AsEvent(body string) engine.Event {
fsev = fsock.FSEventStrToMap(body, nil)
return fsev
func NewFSEvent(strEv string) (fsev FSEvent) {
return fsock.FSEventStrToMap(strEv, nil)
}
func (fsev FSEvent) GetName() string {
@@ -415,6 +418,87 @@ func (fsev FSEvent) AsMapStringIface() (map[string]interface{}, error) {
return nil, utils.ErrNotImplemented
}
// V1AuthorizeArgs returns the arguments used in SMGv1.Authorize
func (fsev FSEvent) V1AuthorizeArgs() (args *V1AuthorizeArgs) {
args = &V1AuthorizeArgs{ // defaults
GetMaxUsage: true,
}
subsystems, has := fsev[VarCGRSubsystems]
if !has {
return
}
if strings.Index(subsystems, SubSAccountS) == -1 {
args.GetMaxUsage = false
}
if strings.Index(subsystems, SubSResourceS) != -1 {
args.CheckResources = true
}
if strings.Index(subsystems, SubSSupplierS) != -1 {
args.GetSuppliers = true
}
if strings.Index(subsystems, SubSAttributeS) != -1 {
args.GetAttributes = true
}
return
}
// V2InitSessionArgs returns the arguments used in SMGv1.InitSession
func (fsev FSEvent) V1InitSessionArgs() (args *V1InitSessionArgs) {
args = &V1InitSessionArgs{ // defaults
InitSession: true,
}
subsystems, has := fsev[VarCGRSubsystems]
if !has {
return
}
if strings.Index(subsystems, SubSAccountS) == -1 {
args.InitSession = false
}
if strings.Index(subsystems, SubSResourceS) != -1 {
args.AllocateResources = true
}
if strings.Index(subsystems, SubSAttributeS) != -1 {
args.GetAttributes = true
}
return
}
// V1UpdateSessionArgs returns the arguments used in SMGv1.UpdateSession
func (fsev FSEvent) V1UpdateSessionArgs() (args *V1UpdateSessionArgs) {
args = &V1UpdateSessionArgs{ // defaults
UpdateSession: true,
}
subsystems, has := fsev[VarCGRSubsystems]
if !has {
return
}
if strings.Index(subsystems, SubSAccountS) == -1 {
args.UpdateSession = false
}
if strings.Index(subsystems, SubSResourceS) != -1 {
args.AllocateResources = true
}
return
}
// V1TerminateSessionArgs returns the arguments used in SMGv1.TerminateSession
func (fsev FSEvent) V1TerminateSessionArgs() (args *V1TerminateSessionArgs) {
args = &V1TerminateSessionArgs{ // defaults
TerminateSession: true,
}
subsystems, has := fsev[VarCGRSubsystems]
if !has {
return
}
if strings.Index(subsystems, SubSAccountS) == -1 {
args.TerminateSession = false
}
if strings.Index(subsystems, SubSResourceS) != -1 {
args.ReleaseResources = true
}
return
}
// Converts a slice of strings into a FS array string, contains len(array) at first index since FS does not support len(ARRAY::) for now
func SliceAsFsArray(slc []string) string {
arry := ""

View File

@@ -354,11 +354,11 @@ Task-ID: 2
Task-Desc: heartbeat
Task-Group: core
Task-Runtime: 1349437318`
ev := new(FSEvent).AsEvent(body)
ev := NewFSEvent(body)
if ev.GetName() != "RE_SCHEDULE" {
t.Error("Event not parsed correctly: ", ev)
}
l := len(ev.(FSEvent))
l := len(ev)
if l != 17 {
t.Error("Incorrect number of event fields: ", l)
}
@@ -366,7 +366,7 @@ Task-Runtime: 1349437318`
// Detects if any of the parsers do not return static values
func TestEventParseStatic(t *testing.T) {
ev := new(FSEvent).AsEvent("")
ev := NewFSEvent("")
setupTime, _ := ev.GetSetupTime("^2013-12-07 08:42:24", "")
answerTime, _ := ev.GetAnswerTime("^2013-12-07 08:42:24", "")
dur, _ := ev.GetDuration("^60s")
@@ -413,7 +413,7 @@ Task-Group: core
Task-Runtime: 1349437318`
cfg, _ := config.NewDefaultCGRConfig()
config.SetCgrConfig(cfg)
ev := new(FSEvent).AsEvent(body)
ev := NewFSEvent(body)
setupTime, _ := ev.GetSetupTime("Event-Date-Local", "")
answerTime, _ := ev.GetAnswerTime("Event-Date-Local", "")
dur, _ := ev.GetDuration("Event-Calling-Line-Number")
@@ -449,7 +449,7 @@ Caller-Channel-Created-Time: 0
Caller-Channel-Answered-Time
Task-Runtime: 1349437318`
var nilTime time.Time
ev := new(FSEvent).AsEvent(body)
ev := NewFSEvent(body)
if setupTime, err := ev.GetSetupTime("", ""); err != nil {
t.Error("Error when parsing empty setupTime")
} else if setupTime != nilTime {
@@ -465,7 +465,7 @@ Task-Runtime: 1349437318`
func TestParseFsHangup(t *testing.T) {
cfg, _ := config.NewDefaultCGRConfig()
config.SetCgrConfig(cfg)
ev := new(FSEvent).AsEvent(hangupEv)
ev := NewFSEvent(hangupEv)
setupTime, _ := ev.GetSetupTime(utils.META_DEFAULT, "")
answerTime, _ := ev.GetAnswerTime(utils.META_DEFAULT, "")
dur, _ := ev.GetDuration(utils.META_DEFAULT)
@@ -494,7 +494,7 @@ func TestParseFsHangup(t *testing.T) {
func TestParseEventValue(t *testing.T) {
cfg, _ := config.NewDefaultCGRConfig()
config.SetCgrConfig(cfg)
ev := new(FSEvent).AsEvent(hangupEv)
ev := NewFSEvent(hangupEv)
if cgrid := ev.ParseEventValue(&utils.RSRField{Id: utils.CGRID}, ""); cgrid != "164b0422fdc6a5117031b427439482c6a4f90e41" {
t.Error("Unexpected cgrid parsed", cgrid)
}
@@ -559,61 +559,10 @@ func TestParseEventValue(t *testing.T) {
}
}
/*
func TestPassesFieldFilterDn1(t *testing.T) {
body := `Event-Name: RE_SCHEDULE
Core-UUID: 792e181c-b6e6-499c-82a1-52a778e7d82d
FreeSWITCH-Hostname: h1.ip-switch.net
FreeSWITCH-Switchname: h1.ip-switch.net
FreeSWITCH-IPv4: 88.198.12.156
Caller-Username: futurem0005`
ev := new(FSEvent).AsEvent(body)
acntPrefxFltr, _ := utils.NewRSRField(`~Account:s/^\w+[shmp]\d{4}$//`)
if pass, _ := ev.PassesFieldFilter(acntPrefxFltr); !pass {
t.Error("Not passing valid filter")
}
body = `Event-Name: RE_SCHEDULE
Core-UUID: 792e181c-b6e6-499c-82a1-52a778e7d82d
FreeSWITCH-Hostname: h1.ip-switch.net
FreeSWITCH-Switchname: h1.ip-switch.net
FreeSWITCH-IPv4: 88.198.12.156
Caller-Username: futurem00005`
ev = new(FSEvent).AsEvent(body)
if pass, _ := ev.PassesFieldFilter(acntPrefxFltr); pass {
t.Error("Should not pass filter")
}
body = `Event-Name: RE_SCHEDULE
Core-UUID: 792e181c-b6e6-499c-82a1-52a778e7d82d
FreeSWITCH-Hostname: h1.ip-switch.net
FreeSWITCH-Switchname: h1.ip-switch.net
FreeSWITCH-IPv4: 88.198.12.156
Caller-Username: 0402129281`
ev = new(FSEvent).AsEvent(body)
acntPrefxFltr, _ = utils.NewRSRField(`~Account:s/^0\d{9}$//`)
if pass, _ := ev.PassesFieldFilter(acntPrefxFltr); !pass {
t.Error("Not passing valid filter")
}
acntPrefxFltr, _ = utils.NewRSRField(`~account:s/^0(\d{9})$/placeholder/`)
if pass, _ := ev.PassesFieldFilter(acntPrefxFltr); pass {
t.Error("Should not pass filter")
}
body = `Event-Name: RE_SCHEDULE
Core-UUID: 792e181c-b6e6-499c-82a1-52a778e7d82d
FreeSWITCH-Hostname: h1.ip-switch.net
FreeSWITCH-Switchname: h1.ip-switch.net
FreeSWITCH-IPv4: 88.198.12.156
Caller-Username: 04021292812`
ev = new(FSEvent).AsEvent(body)
if pass, _ := ev.PassesFieldFilter(acntPrefxFltr); pass {
t.Error("Should not pass filter")
}
}
*/
func TestFsEvAsCDR(t *testing.T) {
cfg, _ := config.NewDefaultCGRConfig()
config.SetCgrConfig(cfg)
ev := new(FSEvent).AsEvent(hangupEv)
ev := NewFSEvent(hangupEv)
setupTime, _ := utils.ParseTimeDetectLayout("1436280728", "")
aTime, _ := utils.ParseTimeDetectLayout("1436280728", "")
eStoredCdr := &engine.CDR{CGRID: "164b0422fdc6a5117031b427439482c6a4f90e41",
@@ -630,10 +579,15 @@ func TestFsEvAsCDR(t *testing.T) {
func TestFsEvGetExtraFields(t *testing.T) {
cfg, _ := config.NewDefaultCGRConfig()
cfg.SmFsConfig.ExtraFields = []*utils.RSRField{&utils.RSRField{Id: "Channel-Read-Codec-Name"}, &utils.RSRField{Id: "Channel-Write-Codec-Name"}, &utils.RSRField{Id: "NonExistingHeader"}}
cfg.SmFsConfig.ExtraFields = []*utils.RSRField{
&utils.RSRField{Id: "Channel-Read-Codec-Name"},
&utils.RSRField{Id: "Channel-Write-Codec-Name"},
&utils.RSRField{Id: "NonExistingHeader"}}
config.SetCgrConfig(cfg)
ev := new(FSEvent).AsEvent(hangupEv)
expectedExtraFields := map[string]string{"Channel-Read-Codec-Name": "SPEEX", "Channel-Write-Codec-Name": "SPEEX", "NonExistingHeader": ""}
ev := NewFSEvent(hangupEv)
expectedExtraFields := map[string]string{
"Channel-Read-Codec-Name": "SPEEX",
"Channel-Write-Codec-Name": "SPEEX", "NonExistingHeader": ""}
if extraFields := ev.GetExtraFields(); !reflect.DeepEqual(expectedExtraFields, extraFields) {
t.Errorf("Expecting: %+v, received: %+v", expectedExtraFields, extraFields)
}

View File

@@ -21,30 +21,20 @@ package sessionmanager
import (
"errors"
"fmt"
"reflect"
"strconv"
"strings"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/fsock"
"github.com/cgrates/rpcclient"
)
func NewFSSessionManager(smFsConfig *config.SmFsConfig, rater, cdrs, rls rpcclient.RpcClientConnection, timezone string) *FSSessionManager {
if rls != nil && reflect.ValueOf(rls).IsNil() {
rls = nil
}
func NewFSSessionManager(smFsConfig *config.SmFsConfig,
smg *utils.BiRPCInternalClient, timezone string) *FSSessionManager {
return &FSSessionManager{
cfg: smFsConfig,
conns: make(map[string]*fsock.FSock),
senderPools: make(map[string]*fsock.FSockPool),
rater: rater,
cdrsrv: cdrs,
rls: rls,
sessions: NewSessions(),
smg: smg,
timezone: timezone,
}
}
@@ -55,22 +45,18 @@ type FSSessionManager struct {
cfg *config.SmFsConfig
conns map[string]*fsock.FSock // Keep the list here for connection management purposes
senderPools map[string]*fsock.FSockPool // Keep sender pools here
rater rpcclient.RpcClientConnection
cdrsrv rpcclient.RpcClientConnection
rls rpcclient.RpcClientConnection
sessions *Sessions
timezone string
smg *utils.BiRPCInternalClient
timezone string
}
func (sm *FSSessionManager) createHandlers() map[string][]func(string, string) {
ca := func(body, connId string) {
ev := new(FSEvent).AsEvent(body)
sm.onChannelAnswer(ev, connId)
sm.onChannelAnswer(
NewFSEvent(body), connId)
}
ch := func(body, connId string) {
ev := new(FSEvent).AsEvent(body)
sm.onChannelHangupComplete(ev)
sm.onChannelHangupComplete(
NewFSEvent(body), connId)
}
handlers := map[string][]func(string, string){
"CHANNEL_ANSWER": []func(string, string){ca},
@@ -78,8 +64,8 @@ func (sm *FSSessionManager) createHandlers() map[string][]func(string, string) {
}
if sm.cfg.SubscribePark {
cp := func(body, connId string) {
ev := new(FSEvent).AsEvent(body)
sm.onChannelPark(ev, connId)
sm.onChannelPark(
NewFSEvent(body), connId)
}
handlers["CHANNEL_PARK"] = []func(string, string){cp}
}
@@ -87,31 +73,37 @@ func (sm *FSSessionManager) createHandlers() map[string][]func(string, string) {
}
// Sets the call timeout valid of starting of the call
func (sm *FSSessionManager) setMaxCallDuration(uuid, connId string, maxDur time.Duration, destNr string) error {
// _, err := fsock.FS.SendApiCmd(fmt.Sprintf("sched_hangup +%d %s\n\n", int(maxDur.Seconds()), uuid))
func (sm *FSSessionManager) setMaxCallDuration(uuid, connId string,
maxDur time.Duration, destNr string) error {
if len(sm.cfg.EmptyBalanceContext) != 0 {
_, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s execute_on_answer sched_transfer +%d %s XML %s\n\n",
uuid, int(maxDur.Seconds()), destNr, sm.cfg.EmptyBalanceContext))
_, err := sm.conns[connId].SendApiCmd(
fmt.Sprintf("uuid_setvar %s execute_on_answer sched_transfer +%d %s XML %s\n\n",
uuid, int(maxDur.Seconds()), destNr, sm.cfg.EmptyBalanceContext))
if err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not transfer the call to empty balance context, error: <%s>, connId: %s",
err.Error(), connId))
utils.Logger.Err(
fmt.Sprintf("<SM-FreeSWITCH> Could not transfer the call to empty balance context, error: <%s>, connId: %s",
err.Error(), connId))
return err
}
return nil
} else if len(sm.cfg.EmptyBalanceAnnFile) != 0 {
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("sched_broadcast +%d %s playback!manager_request::%s aleg\n\n",
int(maxDur.Seconds()), uuid, sm.cfg.EmptyBalanceAnnFile)); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send uuid_broadcast to freeswitch, error: <%s>, connId: %s",
err.Error(), connId))
if _, err := sm.conns[connId].SendApiCmd(
fmt.Sprintf("sched_broadcast +%d %s playback!manager_request::%s aleg\n\n",
int(maxDur.Seconds()), uuid, sm.cfg.EmptyBalanceAnnFile)); err != nil {
utils.Logger.Err(
fmt.Sprintf("<SM-FreeSWITCH> Could not send uuid_broadcast to freeswitch, error: <%s>, connId: %s",
err.Error(), connId))
return err
}
return nil
} else {
_, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s execute_on_answer sched_hangup +%d alloted_timeout\n\n",
uuid, int(maxDur.Seconds())))
_, err := sm.conns[connId].SendApiCmd(
fmt.Sprintf("uuid_setvar %s execute_on_answer sched_hangup +%d alloted_timeout\n\n",
uuid, int(maxDur.Seconds())))
if err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send sched_hangup command to freeswitch, error: <%s>, connId: %s",
err.Error(), connId))
utils.Logger.Err(
fmt.Sprintf("<SM-FreeSWITCH> Could not send sched_hangup command to freeswitch, error: <%s>, connId: %s",
err.Error(), connId))
return err
}
return nil
@@ -119,179 +111,132 @@ func (sm *FSSessionManager) setMaxCallDuration(uuid, connId string, maxDur time.
return nil
}
// Queries LCR and sets the cgr_lcr channel variable
func (sm *FSSessionManager) setCgrLcr(ev engine.Event, connId string) error {
var lcrCost engine.LCRCost
startTime, err := ev.GetSetupTime(utils.META_DEFAULT, sm.timezone)
if err != nil {
return err
}
cd := &engine.CallDescriptor{
CgrID: ev.GetCgrId(sm.Timezone()),
Direction: utils.OUT,
Tenant: ev.GetTenant(utils.META_DEFAULT),
Category: ev.GetCategory(utils.META_DEFAULT),
Subject: ev.GetSubject(utils.META_DEFAULT),
Account: ev.GetAccount(utils.META_DEFAULT),
Destination: ev.GetDestination(utils.META_DEFAULT),
TimeStart: startTime,
TimeEnd: startTime.Add(config.CgrConfig().MaxCallDuration),
}
if err := sm.rater.Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcrCost); err != nil {
return err
}
supps := []string{}
for _, supplCost := range lcrCost.SupplierCosts {
if dtcs, err := utils.NewDTCSFromRPKey(supplCost.Supplier); err != nil {
return err
} else if len(dtcs.Subject) != 0 {
supps = append(supps, dtcs.Subject)
}
}
fsArray := SliceAsFsArray(supps)
if _, err = sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", ev.GetUUID(), fsArray)); err != nil {
return err
}
return nil
}
func (sm *FSSessionManager) onChannelPark(ev engine.Event, connId string) {
fsev := ev.(FSEvent)
if fsev[IGNOREPARK] == "true" { // Not for us
return
}
if ev.GetReqType(utils.META_DEFAULT) != utils.META_NONE { // Do not process this request
var maxCallDuration float64 // This will be the maximum duration this channel will be allowed to last
if err := sm.rater.Call("Responder.GetDerivedMaxSessionTime",
ev.AsCDR(config.CgrConfig().DefaultTimezone), &maxCallDuration); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not get max session time for %s, error: %s",
ev.GetUUID(), err.Error()))
}
if maxCallDuration != -1 { // For calls different than unlimited, set limits
maxCallDur := time.Duration(maxCallDuration)
if maxCallDur <= sm.cfg.MinCallDuration {
//utils.Logger.Info(fmt.Sprintf("Not enough credit for trasferring the call %s for %s.", ev.GetUUID(), cd.GetKey(cd.Subject)))
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), INSUFFICIENT_FUNDS)
return
}
sm.setMaxCallDuration(ev.GetUUID(), connId, maxCallDur, ev.GetCallDestNr(utils.META_DEFAULT))
}
}
// ComputeLcr
if ev.ComputeLcr() {
cd, err := fsev.AsCallDescriptor()
cd.CgrID = fsev.GetCgrId(sm.Timezone())
if err != nil {
utils.Logger.Info(fmt.Sprintf("<SM-FreeSWITCH> LCR_PREPROCESS_ERROR: %s", err.Error()))
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR)
return
}
var lcr engine.LCRCost
if err = sm.Rater().Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil {
utils.Logger.Info(fmt.Sprintf("<SM-FreeSWITCH> LCR_API_ERROR: %s", err.Error()))
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR)
return
}
if lcr.HasErrors() {
lcr.LogErrors()
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR)
return
}
if supps, err := lcr.SuppliersSlice(); err != nil {
utils.Logger.Info(fmt.Sprintf("<SM-FreeSWITCH> LCR_ERROR: %s", err.Error()))
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR)
return
} else {
fsArray := SliceAsFsArray(supps)
if _, err = sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s %s %s\n\n",
ev.GetUUID(), utils.CGR_SUPPLIERS, fsArray)); err != nil {
utils.Logger.Info(fmt.Sprintf("<SM-FreeSWITCH> LCR_ERROR: %s", err.Error()))
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR)
return
}
}
}
if sm.rls != nil {
var reply string
attrRU := utils.ArgRSv1ResourceUsage{
CGREvent: utils.CGREvent{
Tenant: ev.(FSEvent).GetTenant(utils.META_DEFAULT),
Event: ev.(FSEvent).AsMapStringInterface(sm.timezone),
},
UsageID: ev.GetUUID(),
Units: 1,
}
if err := sm.rls.Call(utils.ResourceSv1AllocateResource, attrRU, &reply); err != nil {
if err.Error() == utils.ErrResourceUnavailable.Error() {
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), "-"+utils.ErrResourceUnavailable.Error())
} else {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> RLs API error: %s", err.Error()))
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR)
}
return
}
}
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), AUTH_OK)
}
// Sends the transfer command to unpark the call to freeswitch
func (sm *FSSessionManager) unparkCall(uuid, connId, call_dest_nb, notify string) {
_, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify))
func (sm *FSSessionManager) unparkCall(uuid, connId, call_dest_nb, notify string) (err error) {
_, err = sm.conns[connId].SendApiCmd(
fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify))
if err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send unpark api notification to freeswitch, error: <%s>, connId: %s",
err.Error(), connId))
utils.Logger.Err(
fmt.Sprintf("<SM-FreeSWITCH> Could not send unpark api notification to freeswitch, error: <%s>, connId: %s",
err.Error(), connId))
return
}
if _, err = sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_transfer %s %s\n\n", uuid, call_dest_nb)); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send unpark api call to freeswitch, error: <%s>, connId: %s",
err.Error(), connId))
if _, err = sm.conns[connId].SendApiCmd(
fmt.Sprintf("uuid_transfer %s %s\n\n", uuid, call_dest_nb)); err != nil {
utils.Logger.Err(
fmt.Sprintf("<SM-FreeSWITCH> Could not send unpark api call to freeswitch, error: <%s>, connId: %s",
err.Error(), connId))
}
return
}
func (sm *FSSessionManager) onChannelPark(fsev FSEvent, connId string) {
if fsev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Not for us
return
}
authArgs := fsev.V1AuthorizeArgs()
var authReply V1AuthorizeReply
if err := sm.smg.Call(utils.SMGv1AuthorizeEvent, authArgs, &authReply); err != nil {
utils.Logger.Err(
fmt.Sprintf("<SM-FreeSWITCH> Could not authorize event %s, error: %s",
fsev.GetUUID(), err.Error()))
sm.unparkCall(fsev.GetUUID(), connId,
fsev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR)
return
}
if authArgs.GetMaxUsage {
if authReply.MaxUsage != -1 { // For calls different than unlimited, set limits
if authReply.MaxUsage == 0 {
sm.unparkCall(fsev.GetUUID(), connId,
fsev.GetCallDestNr(utils.META_DEFAULT), INSUFFICIENT_FUNDS)
return
}
sm.setMaxCallDuration(fsev.GetUUID(), connId,
authReply.MaxUsage, fsev.GetCallDestNr(utils.META_DEFAULT))
}
}
if authArgs.CheckResources {
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s %s %b\n\n",
fsev.GetUUID(), CGRResourcesAllowed, authReply.ResourcesAllowed)); err != nil {
utils.Logger.Info(
fmt.Sprintf("<%s> error %s setting channel variabile: %s",
utils.SMFreeSWITCH, err.Error(), CGRResourcesAllowed))
sm.unparkCall(fsev.GetUUID(), connId,
fsev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR)
return
}
}
if authArgs.GetSuppliers {
fsArray := SliceAsFsArray(authReply.Suppliers.SupplierIDs())
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s %s %s\n\n",
fsev.GetUUID(), utils.CGR_SUPPLIERS, fsArray)); err != nil {
utils.Logger.Info(fmt.Sprintf("<SM-FreeSWITCH> LCR_ERROR: %s", err.Error()))
sm.unparkCall(fsev.GetUUID(), connId, fsev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR)
return
}
}
if authArgs.GetAttributes {
if authReply.Attributes != nil {
for _, fldName := range authReply.Attributes.AlteredFields {
if _, err := sm.conns[connId].SendApiCmd(
fmt.Sprintf("uuid_setvar %s %s %s\n\n", fsev.GetUUID(), fldName,
authReply.Attributes.CGREvent.Event[fldName])); err != nil {
utils.Logger.Info(
fmt.Sprintf("<%s> error %s setting channel variabile: %s",
utils.SMFreeSWITCH, err.Error(), fldName))
sm.unparkCall(fsev.GetUUID(), connId,
fsev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR)
return
}
}
}
}
sm.unparkCall(fsev.GetUUID(), connId,
fsev.GetCallDestNr(utils.META_DEFAULT), AUTH_OK)
}
func (sm *FSSessionManager) onChannelAnswer(fsev FSEvent, connId string) {
if fsev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request
return
}
if fsev.MissingParameter(sm.timezone) {
sm.DisconnectSession(fsev, connId, MISSING_PARAMETER)
}
initSessionArgs := fsev.V1InitSessionArgs()
var initReply V1InitSessionReply
if err := sm.smg.Call(utils.SMGv1InitiateSession,
initSessionArgs, &initReply); err != nil {
utils.Logger.Err(
fmt.Sprintf("<SM-FreeSWITCH> Could not answer session with event %s, error: %s",
fsev.GetUUID(), err.Error()))
sm.DisconnectSession(fsev, connId, SYSTEM_ERROR)
return
}
if initSessionArgs.AllocateResources {
if initReply.ResAllocMessage == "" {
sm.DisconnectSession(fsev, connId,
utils.ErrUnallocatedResource.Error())
}
}
}
func (sm *FSSessionManager) onChannelAnswer(ev engine.Event, connId string) {
if ev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request
func (sm *FSSessionManager) onChannelHangupComplete(fsev FSEvent, connId string) {
if fsev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request
return
}
if ev.MissingParameter(sm.timezone) {
sm.DisconnectSession(ev, connId, MISSING_PARAMETER)
}
s := NewSession(ev, connId, sm)
if s != nil {
sm.sessions.indexSession(s)
}
}
func (sm *FSSessionManager) onChannelHangupComplete(ev engine.Event) {
if ev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request
return
}
var s *Session
for i := 0; i < 2; i++ { // Protect us against concurrency, wait a couple of seconds for the answer to be populated before we process hangup
s = sm.sessions.getSession(ev.GetUUID())
if s != nil {
break
}
time.Sleep(time.Duration(i+1) * time.Second)
}
if s != nil { // Handled by us, cleanup here
if err := sm.sessions.removeSession(s, ev); err != nil {
utils.Logger.Err(err.Error())
}
}
if sm.cfg.CreateCdr {
sm.ProcessCdr(ev.AsCDR(config.CgrConfig().DefaultTimezone))
}
var reply string
attrRU := utils.ArgRSv1ResourceUsage{
CGREvent: utils.CGREvent{
Tenant: ev.(FSEvent).GetTenant(utils.META_DEFAULT),
Event: ev.(FSEvent).AsMapStringInterface(sm.timezone),
},
UsageID: ev.GetUUID(),
Units: 1,
if err := sm.smg.Call(utils.SMGv1TerminateSession,
fsev.V1TerminateSessionArgs(), &reply); err != nil {
utils.Logger.Err(
fmt.Sprintf("<SM-FreeSWITCH> Could not terminate session with event %s, error: %s",
fsev.GetUUID(), err.Error()))
return
}
if sm.rls != nil {
if err := sm.rls.Call(utils.ResourceSv1ReleaseResource, attrRU, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> RLs API error: %s", err.Error()))
if sm.cfg.CreateCdr {
cdr := fsev.AsCDR(sm.timezone)
if err := sm.smg.Call(utils.SMGv1ProcessCDR, cdr, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Failed processing CDR, cgrid: %s, accid: %s, error: <%s>",
cdr.CGRID, cdr.OriginID, err.Error()))
}
}
}
@@ -325,24 +270,15 @@ func (sm *FSSessionManager) Connect() error {
} else {
sm.senderPools[connId] = fsSenderPool
}
if sm.cfg.ChannelSyncInterval != 0 { // Schedule running of the callsync
go func() {
for { // Schedule sync channels to run repetately
time.Sleep(sm.cfg.ChannelSyncInterval)
sm.SyncSessions()
}
}()
}
}
err := <-errChan // Will keep the Connect locked until the first error in one of the connections
return err
}
// Disconnects a session by sending hangup command to freeswitch
func (sm *FSSessionManager) DisconnectSession(ev engine.Event, connId, notify string) error {
func (sm *FSSessionManager) DisconnectSession(fsev FSEvent, connId, notify string) error {
if _, err := sm.conns[connId].SendApiCmd(
fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", ev.GetUUID(), notify)); err != nil {
fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", fsev.GetUUID(), notify)); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send disconect api notification to freeswitch, error: <%s>, connId: %s",
err.Error(), connId))
return err
@@ -350,7 +286,7 @@ func (sm *FSSessionManager) DisconnectSession(ev engine.Event, connId, notify st
if notify == INSUFFICIENT_FUNDS {
if len(sm.cfg.EmptyBalanceContext) != 0 {
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_transfer %s %s XML %s\n\n",
ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), sm.cfg.EmptyBalanceContext)); err != nil {
fsev.GetUUID(), fsev.GetCallDestNr(utils.META_DEFAULT), sm.cfg.EmptyBalanceContext)); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not transfer the call to empty balance context, error: <%s>, connId: %s",
err.Error(), connId))
return err
@@ -358,7 +294,7 @@ func (sm *FSSessionManager) DisconnectSession(ev engine.Event, connId, notify st
return nil
} else if len(sm.cfg.EmptyBalanceAnnFile) != 0 {
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_broadcast %s playback!manager_request::%s aleg\n\n",
ev.GetUUID(), sm.cfg.EmptyBalanceAnnFile)); err != nil {
fsev.GetUUID(), sm.cfg.EmptyBalanceAnnFile)); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send uuid_broadcast to freeswitch, error: <%s>, connId: %s",
err.Error(), connId))
return err
@@ -366,51 +302,13 @@ func (sm *FSSessionManager) DisconnectSession(ev engine.Event, connId, notify st
return nil
}
}
if err := sm.conns[connId].SendMsgCmd(ev.GetUUID(), map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}); err != nil {
if err := sm.conns[connId].SendMsgCmd(fsev.GetUUID(), map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send disconect msg to freeswitch, error: <%s>, connId: %s", err.Error(), connId))
return err
}
return nil
}
func (sm *FSSessionManager) ProcessCdr(storedCdr *engine.CDR) error {
var reply string
if err := sm.cdrsrv.Call("CdrsV1.ProcessCDR", storedCdr, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Failed processing CDR, cgrid: %s, accid: %s, error: <%s>",
storedCdr.CGRID, storedCdr.OriginID, err.Error()))
}
return nil
}
func (sm *FSSessionManager) DebitInterval() time.Duration {
return sm.cfg.DebitInterval
}
func (sm *FSSessionManager) CdrSrv() rpcclient.RpcClientConnection {
return sm.cdrsrv
}
func (sm *FSSessionManager) Rater() rpcclient.RpcClientConnection {
return sm.rater
}
func (sm *FSSessionManager) Sessions() []*Session {
return sm.sessions.getSessions()
}
func (sm *FSSessionManager) Timezone() string {
return sm.timezone
}
// Called when call goes under the minimum duratio threshold, so FreeSWITCH can play an announcement message
func (sm *FSSessionManager) WarnSessionMinDuration(sessionUuid, connId string) {
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_broadcast %s %s aleg\n\n",
sessionUuid, sm.cfg.LowBalanceAnnFile)); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send uuid_broadcast to freeswitch, error: %s, connection id: %s",
err.Error(), connId))
}
}
func (sm *FSSessionManager) Shutdown() (err error) {
for connId, fSock := range sm.conns {
if !fSock.Connected() {
@@ -422,76 +320,5 @@ func (sm *FSSessionManager) Shutdown() (err error) {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Error on calls shutdown: %s, connection id: %s", err.Error(), connId))
}
}
for i := 0; len(sm.sessions.getSessions()) > 0 && i < 20; i++ {
time.Sleep(100 * time.Millisecond) // wait for the hungup event to be fired
utils.Logger.Info(fmt.Sprintf("<SM-FreeSWITCH> Shutdown waiting on sessions: %v", sm.sessions))
}
return nil
}
// Sync sessions with FS
/*
map[secure: hostname:CgrDev1 callstate:ACTIVE callee_num:1002 initial_dest:1002 state:CS_EXECUTE dialplan:XML read_codec:SPEEX initial_ip_addr:127.0.0.1 write_codec:SPEEX write_bit_rate:44000
call_uuid:3427e500-10e5-4864-a589-e306b70419a2 presence_id: initial_cid_name:1001 context:default read_rate:32000 read_bit_rate:44000 callee_direction:SEND initial_context:default created:2015-06-15 18:48:13
dest:1002 callee_name:Outbound Call direction:inbound ip_addr:127.0.0.1 sent_callee_name:Outbound Call write_rate:32000 presence_data: sent_callee_num:1002 created_epoch:1434386893 cid_name:1001 application:sched_hangup
application_data:+10800 alloted_timeout uuid:3427e500-10e5-4864-a589-e306b70419a2 name:sofia/cgrtest/1001@127.0.0.1 cid_num:1001 initial_cid_num:1001 initial_dialplan:XML]
*/
func (sm *FSSessionManager) SyncSessions() error {
for connId, senderPool := range sm.senderPools {
var aChans []map[string]string
fsConn, err := senderPool.PopFSock()
if err != nil {
if err == fsock.ErrConnectionPoolTimeout { // Timeout waiting for connections to re-establish, cleanup calls
aChans = make([]map[string]string, 0) // Emulate no call information so we can disconnect bellow
} else {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Error on syncing active calls, senderPool: %+v, error: %s",
senderPool, err.Error()))
continue
}
} else {
activeChanStr, err := fsConn.SendApiCmd("show channels")
senderPool.PushFSock(fsConn)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Error on syncing active calls, senderPool: %+v, error: %s",
senderPool, err.Error()))
continue
}
aChans = fsock.MapChanData(activeChanStr)
if len(aChans) == 0 && strings.HasPrefix(activeChanStr, "uuid,direction") { // Failed converting output from FS
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Syncing active calls, failed converting output from FS: %s",
activeChanStr))
continue
}
}
for _, session := range sm.sessions.getSessions() {
if session.connId != connId { // This session belongs to another connectionId
continue
}
var stillActive bool
for _, fsAChan := range aChans {
if fsAChan["call_uuid"] == session.eventStart.GetUUID() ||
(fsAChan["call_uuid"] == "" && fsAChan["uuid"] == session.eventStart.GetUUID()) { // Channel still active
stillActive = true
break
}
}
if stillActive { // No need to do anything since the channel is still there
continue
}
utils.Logger.Warning(fmt.Sprintf("<SM-FreeSWITCH> Sync active channels, stale session detected, uuid: %s",
session.eventStart.GetUUID()))
fsev := session.eventStart.(FSEvent)
now := time.Now()
aTime, _ := fsev.GetAnswerTime("", sm.timezone)
dur := now.Sub(aTime)
fsev[END_TIME] = now.String()
fsev[DURATION] = strconv.FormatFloat(dur.Seconds(), 'f', -1, 64)
if err := sm.sessions.removeSession(session, fsev); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Error on removing stale session with uuid: %s, error: %s",
session.eventStart.GetUUID(), err.Error()))
continue
}
}
}
return nil
return
}

View File

@@ -1,26 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package sessionmanager
import (
"testing"
)
func TestFSSMInterface(t *testing.T) {
var _ SessionManager = SessionManager(new(FSSessionManager))
}

View File

@@ -89,7 +89,7 @@ func (self *KamailioSessionManager) allocateResources(kev KamEvent) (err error)
Units: 1, // One channel reserved
}
var reply string
return self.rlS.Call(utils.ResourceSv1AllocateResource, attrRU, &reply)
return self.rlS.Call(utils.ResourceSv1AllocateResources, attrRU, &reply)
}
func (self *KamailioSessionManager) onCgrAuth(evData []byte, connId string) {
@@ -224,7 +224,7 @@ func (self *KamailioSessionManager) onCallEnd(evData []byte, connId string) {
UsageID: kev.GetUUID(),
Units: 1,
}
if err := self.rlS.Call(utils.ResourceSv1ReleaseResource, attrRU, &reply); err != nil {
if err := self.rlS.Call(utils.ResourceSv1ReleaseResources, attrRU, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> RLs API error: %s", err.Error()))
}
}()

View File

@@ -1,180 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package sessionmanager
import (
"testing"
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
//"github.com/cgrates/cgrates/config"
//"testing"
var (
newEventBody = `
"Event-Name": "HEARTBEAT",
"Core-UUID": "d5abc5b0-95c6-11e1-be05-43c90197c914",
"FreeSWITCH-Hostname": "grace",
"FreeSWITCH-Switchname": "grace",
"FreeSWITCH-IPv4": "172.17.77.126",
"variable_sip_full_from": "rif",
"variable_cgr_account": "rif",
"variable_sip_full_to": "0723045326",
"Caller-Dialplan": "vdf",
"FreeSWITCH-IPv6": "::1",
"Event-Date-Local": "2012-05-04 14:38:23",
"Event-Date-GMT": "Fri, 03 May 2012 11:38:23 GMT",
"Event-Date-Timestamp": "1336131503218867",
"Event-Calling-File": "switch_core.c",
"Event-Calling-Function": "send_heartbeat",
"Event-Calling-Line-Number": "68",
"Event-Sequence": "4171",
"Event-Info": "System Ready",
"Up-Time": "0 years, 0 days, 2 hours, 43 minutes, 21 seconds, 349 milliseconds, 683 microseconds",
"Session-Count": "0",
"Max-Sessions": "1000",
"Session-Per-Sec": "30",
"Session-Since-Startup": "122",
"Idle-CPU": "100.000000"
`
conf_data = []byte(`
### Test data, not for production usage
[global]
default_reqtype=
`)
)
/* Missing parameter is not longer tested in NewSession. ToDo: expand this test for more util information
func TestSessionNilSession(t *testing.T) {
var errCfg error
cfg, errCfg = config.NewCGRConfigBytes(conf_data) // Needed here to avoid nil on cfg variable
if errCfg != nil {
t.Errorf("Cannot get configuration %v", errCfg)
}
newEvent := new(FSEvent).New("")
sm := &FSSessionManager{}
s := NewSession(newEvent, sm)
if s != nil {
t.Error("no account and it still created session.")
}
}
*/
type MockRpcClient struct {
refundCd *engine.CallDescriptor
}
func (mc *MockRpcClient) Call(methodName string, arg interface{}, reply interface{}) error {
if cd, ok := arg.(*engine.CallDescriptor); ok {
mc.refundCd = cd
}
return nil
}
func (mc *MockRpcClient) GetCost(*engine.CallDescriptor, *engine.CallCost) error { return nil }
func (mc *MockRpcClient) Debit(*engine.CallDescriptor, *engine.CallCost) error { return nil }
func (mc *MockRpcClient) MaxDebit(*engine.CallDescriptor, *engine.CallCost) error { return nil }
func (mc *MockRpcClient) RefundIncrements(cd *engine.CallDescriptor, reply *float64) error {
mc.refundCd = cd
return nil
}
func (mc *MockRpcClient) RefundRounding(cd *engine.CallDescriptor, reply *float64) error {
return nil
}
func (mc *MockRpcClient) GetMaxSessionTime(*engine.CallDescriptor, *float64) error { return nil }
func (mc *MockRpcClient) GetDerivedChargers(*utils.AttrDerivedChargers, *utils.DerivedChargers) error {
return nil
}
func (mc *MockRpcClient) GetDerivedMaxSessionTime(*engine.CDR, *float64) error { return nil }
func (mc *MockRpcClient) GetSessionRuns(*engine.CDR, *[]*engine.SessionRun) error { return nil }
func (mc *MockRpcClient) ProcessCdr(*engine.CDR, *string) error { return nil }
func (mc *MockRpcClient) StoreSMCost(engine.AttrCDRSStoreSMCost, *string) error { return nil }
func (mc *MockRpcClient) GetLCR(*engine.AttrGetLcr, *engine.LCRCost) error { return nil }
func (mc *MockRpcClient) GetTimeout(int, *time.Duration) error { return nil }
func TestSessionRefund(t *testing.T) {
mc := &MockRpcClient{}
s := &Session{sessionManager: &FSSessionManager{rater: mc, timezone: time.UTC.String()}, eventStart: FSEvent{SETUP_TIME: time.Now().Format(time.RFC3339)}}
ts := &engine.TimeSpan{
TimeStart: time.Date(2015, 6, 10, 14, 7, 0, 0, time.UTC),
TimeEnd: time.Date(2015, 6, 10, 14, 7, 30, 0, time.UTC),
}
// add increments
for i := 0; i < 30; i++ {
ts.AddIncrement(&engine.Increment{Duration: time.Second, Cost: 1.0})
}
cc := &engine.CallCost{Timespans: engine.TimeSpans{ts}}
hangupTime := time.Date(2015, 6, 10, 14, 7, 20, 0, time.UTC)
s.Refund(cc, hangupTime)
if len(mc.refundCd.Increments) != 1 || mc.refundCd.Increments[0].GetCompressFactor() != 10 || len(cc.Timespans) != 1 || cc.Timespans[0].TimeEnd != hangupTime {
t.Errorf("Error refunding: %+v, %+v", mc.refundCd.Increments, cc.Timespans[0])
}
}
func TestSessionRefundAll(t *testing.T) {
mc := &MockRpcClient{}
s := &Session{sessionManager: &FSSessionManager{rater: mc, timezone: time.UTC.String()}, eventStart: FSEvent{SETUP_TIME: time.Now().Format(time.RFC3339)}}
ts := &engine.TimeSpan{
TimeStart: time.Date(2015, 6, 10, 14, 7, 0, 0, time.UTC),
TimeEnd: time.Date(2015, 6, 10, 14, 7, 30, 0, time.UTC),
}
// add increments
for i := 0; i < 30; i++ {
ts.AddIncrement(&engine.Increment{Duration: time.Second, Cost: 1.0})
}
cc := &engine.CallCost{Timespans: engine.TimeSpans{ts}}
hangupTime := time.Date(2015, 6, 10, 14, 7, 0, 0, time.UTC)
s.Refund(cc, hangupTime)
if len(mc.refundCd.Increments) != 1 || mc.refundCd.Increments[0].GetCompressFactor() != 30 || len(cc.Timespans) != 0 {
t.Errorf("Error refunding: %+v, %+v", len(mc.refundCd.Increments), cc.Timespans)
}
}
func TestSessionRefundManyAll(t *testing.T) {
mc := &MockRpcClient{}
s := &Session{sessionManager: &FSSessionManager{rater: mc, timezone: time.UTC.String()}, eventStart: FSEvent{SETUP_TIME: time.Now().Format(time.RFC3339)}}
ts1 := &engine.TimeSpan{
TimeStart: time.Date(2015, 6, 10, 14, 7, 0, 0, time.UTC),
TimeEnd: time.Date(2015, 6, 10, 14, 7, 30, 0, time.UTC),
}
// add increments
for i := 0; i < int(ts1.GetDuration().Seconds()); i++ {
ts1.AddIncrement(&engine.Increment{Duration: time.Second, Cost: 1.0})
}
ts2 := &engine.TimeSpan{
TimeStart: time.Date(2015, 6, 10, 14, 7, 30, 0, time.UTC),
TimeEnd: time.Date(2015, 6, 10, 14, 8, 0, 0, time.UTC),
}
// add increments
for i := 0; i < int(ts2.GetDuration().Seconds()); i++ {
ts2.AddIncrement(&engine.Increment{Duration: time.Second, Cost: 1.0})
}
cc := &engine.CallCost{Timespans: engine.TimeSpans{ts1, ts2}}
hangupTime := time.Date(2015, 6, 10, 14, 07, 0, 0, time.UTC)
s.Refund(cc, hangupTime)
if len(mc.refundCd.Increments) != 1 || mc.refundCd.Increments[0].GetCompressFactor() != 60 || len(cc.Timespans) != 0 {
t.Errorf("Error refunding: %+v, %+v", len(mc.refundCd.Increments), cc.Timespans)
}
}

View File

@@ -63,12 +63,15 @@ type SMGReplicationConn struct {
Synchronous bool
}
func NewSMGeneric(cgrCfg *config.CGRConfig, rals rpcclient.RpcClientConnection, cdrsrv rpcclient.RpcClientConnection,
func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS,
splS, attrS, cdrsrv rpcclient.RpcClientConnection,
smgReplConns []*SMGReplicationConn, timezone string) *SMGeneric {
ssIdxCfg := cgrCfg.SmGenericConfig.SessionIndexes
ssIdxCfg[utils.OriginID] = true // Make sure we have indexing for OriginID since it is a requirement on prefix searching
return &SMGeneric{cgrCfg: cgrCfg,
rals: rals,
resS: resS,
attrS: attrS,
cdrsrv: cdrsrv,
smgReplConns: smgReplConns,
Timezone: timezone,
@@ -84,10 +87,13 @@ func NewSMGeneric(cgrCfg *config.CGRConfig, rals rpcclient.RpcClientConnection,
}
type SMGeneric struct {
cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple
rals rpcclient.RpcClientConnection
cdrsrv rpcclient.RpcClientConnection
smgReplConns []*SMGReplicationConn // list of connections where we will replicate our session data
cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple
rals rpcclient.RpcClientConnection // RALs connections
resS rpcclient.RpcClientConnection // ResourceS connections
splS rpcclient.RpcClientConnection // SupplierS connections
attrS rpcclient.RpcClientConnection // AttributeS connections
cdrsrv rpcclient.RpcClientConnection // CDR server connections
smgReplConns []*SMGReplicationConn // list of connections where we will replicate our session data
Timezone string
activeSessions map[string][]*SMGSession // group sessions per sessionId, multiple runs based on derived charging
aSessionsMux sync.RWMutex
@@ -120,8 +126,8 @@ type smgSessionTerminator struct {
// setSessionTerminator installs a new terminator for a session
func (smg *SMGeneric) setSessionTerminator(s *SMGSession) {
ttl := s.EventStart.GetSessionTTL(smg.cgrCfg.SmGenericConfig.SessionTTL,
smg.cgrCfg.SmGenericConfig.SessionTTLMaxDelay)
ttl := s.EventStart.GetSessionTTL(smg.cgrCfg.SMGConfig.SessionTTL,
smg.cgrCfg.SMGConfig.SessionTTLMaxDelay)
if ttl == 0 {
return
}
@@ -383,9 +389,9 @@ func (smg *SMGeneric) sessionStart(evStart SMGenericEvent,
CD: sessionRun.CallDescriptor, clntConn: clntConn}
smg.recordASession(s)
//utils.Logger.Info(fmt.Sprintf("<SMGeneric> Starting session: %s, runId: %s", sessionId, s.runId))
if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 {
if smg.cgrCfg.SMGConfig.DebitInterval != 0 {
s.stopDebit = stopDebitChan
go s.debitLoop(smg.cgrCfg.SmGenericConfig.DebitInterval)
go s.debitLoop(smg.cgrCfg.SMGConfig.DebitInterval)
}
}
return nil, nil
@@ -465,7 +471,7 @@ func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) erro
// replicateSessions will replicate session based on configuration
func (smg *SMGeneric) replicateSessionsWithID(cgrID string, passiveSessions bool, smgReplConns []*SMGReplicationConn) (err error) {
if len(smgReplConns) == 0 ||
(smg.cgrCfg.SmGenericConfig.DebitInterval != 0 && !passiveSessions) { // Replicating active not supported
(smg.cgrCfg.SMGConfig.DebitInterval != 0 && !passiveSessions) { // Replicating active not supported
return
}
ssMux := &smg.aSessionsMux
@@ -712,7 +718,7 @@ func (smg *SMGeneric) InitiateSession(gev SMGenericEvent, clnt rpcclient.RpcClie
smg.sessionEnd(cgrID, 0)
return
}
if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { // Session handled by debit loop
if smg.cgrCfg.SMGConfig.DebitInterval != 0 { // Session handled by debit loop
maxUsage = time.Duration(-1 * time.Second)
return
}
@@ -731,7 +737,7 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient
return item.Value.(time.Duration), item.Err
}
defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Value: maxUsage, Err: err})
if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { // Not possible to update a session with debit loop active
if smg.cgrCfg.SMGConfig.DebitInterval != 0 { // Not possible to update a session with debit loop active
err = errors.New("ACTIVE_DEBIT_LOOP")
return
}
@@ -748,8 +754,8 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient
}
smg.resetTerminatorTimer(cgrID,
gev.GetSessionTTL(
smg.cgrCfg.SmGenericConfig.SessionTTL,
smg.cgrCfg.SmGenericConfig.SessionTTLMaxDelay),
smg.cgrCfg.SMGConfig.SessionTTL,
smg.cgrCfg.SMGConfig.SessionTTLMaxDelay),
gev.GetSessionTTLLastUsed(), gev.GetSessionTTLUsage())
var lastUsed *time.Duration
var evLastUsed time.Duration
@@ -759,7 +765,7 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient
return
}
if maxUsage, err = gev.GetMaxUsage(utils.META_DEFAULT,
smg.cgrCfg.SmGenericConfig.MaxCallDuration); err != nil {
smg.cgrCfg.SMGConfig.MaxCallDuration); err != nil {
if err == utils.ErrNotFound {
err = utils.ErrMandatoryIeMissing
}
@@ -1273,3 +1279,214 @@ func (smg *SMGeneric) BiRPCV1ReplicatePassiveSessions(clnt rpcclient.RpcClientCo
*reply = utils.OK
return
}
type V1AuthorizeArgs struct {
GetMaxUsage bool
CheckResources bool
GetSuppliers bool
GetAttributes bool
utils.CGREvent
utils.Paginator
}
type V1AuthorizeReply struct {
MaxUsage time.Duration
ResourcesAllowed bool
Suppliers engine.SortedSuppliers
Attributes *engine.AttrSProcessEventReply
}
// BiRPCV1Authorize performs authorization for CGREvent based on specific components
func (smg *SMGeneric) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection,
args *V1AuthorizeArgs, authReply *V1AuthorizeReply) (err error) {
if args.GetMaxUsage {
maxUsage, err := smg.GetMaxUsage(args.CGREvent.Event)
if err != nil {
return utils.NewErrServerError(err)
}
authReply.MaxUsage = maxUsage
}
if args.CheckResources {
originID, err := args.CGREvent.FieldAsString(utils.ACCID)
if err != nil {
return utils.NewErrServerError(err)
}
var allowed bool
attrRU := utils.ArgRSv1ResourceUsage{
Tenant: args.CGREvent.Tenant,
UsageID: originID,
Event: args.CGREvent.Event,
Units: 1,
}
if err = smg.resS.Call(utils.ResourceSv1AllowUsage,
attrRU, &allowed); err != nil {
return err
}
authReply.ResourcesAllowed = allowed
}
if args.GetSuppliers {
var splsReply engine.SortedSuppliers
if err = smg.splS.Call(utils.SupplierSv1GetSuppliers,
args.CGREvent, &splsReply); err != nil {
return err
}
authReply.Suppliers = splsReply
}
if args.GetAttributes {
if args.CGREvent.Context == nil { // populate if not already in
args.CGREvent.Context = utils.StringPointer(utils.MetaSMG)
}
var rplyEv engine.AttrSProcessEventReply
if err = smg.attrS.Call(utils.AttributeSv1ProcessEvent,
args.CGREvent, &rplyEv); err != nil {
return
}
authReply.Attributes = &rplyEv
}
return nil
}
type V1InitSessionArgs struct {
InitSession bool
AllocateResources bool
GetAttributes bool
utils.CGREvent
}
type V1InitSessionReply struct {
MaxUsage time.Duration
ResAllocMessage string
Attributes *engine.AttrSProcessEventReply
}
// BiRPCV2InitiateSession initiates a new session, returns the maximum duration the session can last
func (smg *SMGeneric) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection,
args *V1InitSessionArgs, rply *V1InitSessionReply) (err error) {
if args.AllocateResources {
originID, err := args.CGREvent.FieldAsString(utils.ACCID)
if err != nil {
return utils.NewErrServerError(err)
}
attrRU := utils.ArgRSv1ResourceUsage{
Tenant: args.CGREvent.Tenant,
UsageID: originID,
Event: args.CGREvent.Event,
Units: 1,
}
var allocMessage string
if err = smg.resS.Call(utils.ResourceSv1AllocateResources,
attrRU, &allocMessage); err != nil {
return err
}
rply.ResAllocMessage = allocMessage
}
if args.InitSession {
if rply.MaxUsage, err = smg.InitiateSession(args.CGREvent.Event, clnt); err != nil {
if err != rpcclient.ErrSessionNotFound {
err = utils.NewErrServerError(err)
}
return
}
}
if args.GetAttributes {
if args.CGREvent.Context == nil {
args.CGREvent.Context = utils.StringPointer(utils.MetaSMG)
}
var rplyEv engine.AttrSProcessEventReply
if err = smg.attrS.Call(utils.AttributeSv1ProcessEvent,
args.CGREvent, &rplyEv); err != nil {
return
}
rply.Attributes = &rplyEv
}
return
}
type V1UpdateSessionArgs struct {
UpdateSession bool
AllocateResources bool
utils.CGREvent
}
type V1UpdateSessionReply struct {
MaxUsage time.Duration
ResAllocMessage string
}
// BiRPCV1UpdateSession updates an existing session, returning the duration which the session can still last
func (smg *SMGeneric) BiRPCv1UpdateSession(clnt rpcclient.RpcClientConnection,
args *V1UpdateSessionArgs, rply *V1UpdateSessionReply) (err error) {
if args.UpdateSession {
if rply.MaxUsage, err = smg.UpdateSession(args.CGREvent.Event, clnt); err != nil {
if err != rpcclient.ErrSessionNotFound {
err = utils.NewErrServerError(err)
}
return
}
}
if args.AllocateResources {
originID, err := args.CGREvent.FieldAsString(utils.ACCID)
if err != nil {
return utils.NewErrServerError(err)
}
attrRU := utils.ArgRSv1ResourceUsage{
Tenant: args.CGREvent.Tenant,
UsageID: originID,
Event: args.CGREvent.Event,
Units: 1,
}
var allocMessage string
if err = smg.resS.Call(utils.ResourceSv1AllocateResources,
attrRU, &allocMessage); err != nil {
return err
}
rply.ResAllocMessage = allocMessage
}
return
}
type V1TerminateSessionArgs struct {
TerminateSession bool
ReleaseResources bool
utils.CGREvent
}
// BiRPCV1TerminateSession will stop debit loops as well as release any used resources
func (smg *SMGeneric) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection,
args *V1TerminateSessionArgs, rply *string) (err error) {
if args.TerminateSession {
if err = smg.TerminateSession(args.CGREvent.Event, clnt); err != nil {
if err != rpcclient.ErrSessionNotFound {
err = utils.NewErrServerError(err)
}
return
}
}
if args.ReleaseResources {
originID, err := args.CGREvent.FieldAsString(utils.ACCID)
if err != nil {
return utils.NewErrServerError(err)
}
var reply string
argsRU := utils.ArgRSv1ResourceUsage{
Tenant: args.CGREvent.Tenant,
UsageID: originID, // same ID should be accepted by first group since the previous resource should be expired
Event: args.CGREvent.Event,
}
if err = smg.resS.Call(utils.ResourceSv1ReleaseResources,
argsRU, &reply); err != nil {
return utils.NewErrServerError(err)
}
}
*rply = utils.OK
return
}
// Called on session end, should send the CDR to CDRS
func (smg *SMGeneric) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection, cgrEv utils.CGREvent, reply *string) error {
if err := smg.ProcessCDR(cgrEv.Event); err != nil {
return utils.NewErrServerError(err)
}
*reply = utils.OK
return nil
}

View File

@@ -29,12 +29,12 @@ var smgCfg *config.CGRConfig
func init() {
smgCfg, _ = config.NewDefaultCGRConfig()
smgCfg.SmGenericConfig.SessionIndexes = utils.StringMap{"Tenant": true,
smgCfg.SMGConfig.SessionIndexes = utils.StringMap{"Tenant": true,
"Account": true, "Extra3": true, "Extra4": true}
}
func TestSMGSessionIndexing(t *testing.T) {
smg := NewSMGeneric(smgCfg, nil, nil, nil, "UTC")
smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, "UTC")
smGev := SMGenericEvent{
utils.EVENT_NAME: "TEST_EVENT",
utils.TOR: "*voice",
@@ -389,7 +389,7 @@ func TestSMGSessionIndexing(t *testing.T) {
}
func TestSMGActiveSessions(t *testing.T) {
smg := NewSMGeneric(smgCfg, nil, nil, nil, "UTC")
smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, "UTC")
smGev1 := SMGenericEvent{
utils.EVENT_NAME: "TEST_EVENT",
utils.TOR: "*voice",
@@ -463,7 +463,7 @@ func TestSMGActiveSessions(t *testing.T) {
}
func TestGetPassiveSessions(t *testing.T) {
smg := NewSMGeneric(smgCfg, nil, nil, nil, "UTC")
smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, "UTC")
if pSS := smg.getSessions("", true); len(pSS) != 0 {
t.Errorf("PassiveSessions: %+v", pSS)
}

View File

@@ -26,6 +26,20 @@ import (
"github.com/cgrates/rpcclient"
)
// NewBiJSONrpcClient will create a bidirectional JSON client connection
func NewBiJSONrpcClient(addr string, handlers map[string]interface{}) (*rpc2.Client, error) {
conn, err := net.Dial("tcp", addr)
if err != nil {
return nil, err
}
clnt := rpc2.NewClientWithCodec(rpc2_jsonrpc.NewJSONCodec(conn))
for method, handlerFunc := range handlers {
clnt.Handle(method, handlerFunc)
}
go clnt.Run()
return clnt, nil
}
// Interface which the server needs to work as BiRPCServer
type BiRPCServer interface {
Call(string, interface{}, interface{}) error // So we can use it also as rpcclient.RpcClientConnection
@@ -51,17 +65,3 @@ func (clnt *BiRPCInternalClient) SetClientConn(clntConn rpcclient.RpcClientConne
func (clnt *BiRPCInternalClient) Call(serviceMethod string, args interface{}, reply interface{}) error {
return clnt.serverConn.CallBiRPC(clnt.clntConn, serviceMethod, args, reply)
}
func NewBiJSONrpcClient(addr string, handlers map[string]interface{}) (*rpc2.Client, error) {
conn, err := net.Dial("tcp", addr)
if err != nil {
return nil, err
}
clnt := rpc2.NewClientWithCodec(rpc2_jsonrpc.NewJSONCodec(conn))
for method, handlerFunc := range handlers {
clnt.Handle(method, handlerFunc)
}
go clnt.Run()
return clnt, nil
}

View File

@@ -522,10 +522,6 @@ const (
MetaTpRatingProfile = "*tp_rating_profile"
MetaStorDB = "*stordb"
MetaDataDB = "*datadb"
SMGenericV2UpdateSession = "SMGenericV2.UpdateSession"
SMGenericV2InitiateSession = "SMGenericV2.InitiateSession"
SMGenericV1UpdateSession = "SMGenericV1.UpdateSession"
SMGenericV1InitiateSession = "SMGenericV1.InitiateSession"
SupplierS = "SupplierS"
MetaWeight = "*weight"
MetaLeastCost = "*least_cost"

View File

@@ -48,6 +48,7 @@ var (
ErrNoActiveSession = errors.New("NO_ACTIVE_SESSION")
ErrPartiallyExecuted = errors.New("PARTIALLY_EXECUTED")
ErrMaxUsageExceeded = errors.New("MAX_USAGE_EXCEEDED")
ErrUnallocatedResource = errors.New("UNALLOCATED_RESOURCE")
)
// NewCGRError initialises a new CGRError