diff --git a/apier/v1/resourcesv1.go b/apier/v1/resourcesv1.go index 57aa2b6cf..ff6d0f0f5 100644 --- a/apier/v1/resourcesv1.go +++ b/apier/v1/resourcesv1.go @@ -48,12 +48,12 @@ func (rsv1 *ResourceSv1) AuthorizeResources(args utils.ArgRSv1ResourceUsage, rep } // V1InitiateResourceUsage records usage for an event -func (rsv1 *ResourceSv1) AllocateResource(args utils.ArgRSv1ResourceUsage, reply *string) error { +func (rsv1 *ResourceSv1) AllocateResources(args utils.ArgRSv1ResourceUsage, reply *string) error { return rsv1.rls.V1AllocateResource(args, reply) } // V1TerminateResourceUsage releases usage for an event -func (rsv1 *ResourceSv1) ReleaseResource(args utils.ArgRSv1ResourceUsage, reply *string) error { +func (rsv1 *ResourceSv1) ReleaseResources(args utils.ArgRSv1ResourceUsage, reply *string) error { return rsv1.rls.V1ReleaseResource(args, reply) } diff --git a/apier/v1/smg.go b/apier/v1/smg.go new file mode 100644 index 000000000..2837b0794 --- /dev/null +++ b/apier/v1/smg.go @@ -0,0 +1,66 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package v1 + +import ( + "github.com/cgrates/cgrates/sessionmanager" + "github.com/cgrates/cgrates/utils" +) + +func NewSMGv1(sm *sessionmanager.SMGeneric) *SMGv1 { + return &SMGv1{SMG: sm} +} + +// Exports RPC from SMGv1 +type SMGv1 struct { + SMG *sessionmanager.SMGeneric +} + +// Publishes BiJSONRPC methods exported by SMGv1 +func (smgv1 *SMGv1) Handlers() map[string]interface{} { + return map[string]interface{}{ + "SMGv1.InitiateSession": smgv1.SMG.BiRPCv1InitiateSession, + "SMGv1.UpdateSession": smgv1.SMG.BiRPCv1UpdateSession, + "SMGv1.TerminateSession": smgv1.SMG.BiRPCv1TerminateSession, + "SMGv1.ProcessCDR": smgv1.SMG.BiRPCv1ProcessCDR, + } +} + +// Called on session start, returns the maximum number of seconds the session can last +func (smgv1 *SMGv1) InitiateSession(args *sessionmanager.V1InitSessionArgs, + rply *sessionmanager.V1InitSessionReply) error { + return smgv1.SMG.BiRPCv1InitiateSession(nil, args, rply) +} + +// Interim updates, returns remaining duration from the rater +func (smgv1 *SMGv1) UpdateSession(args *sessionmanager.V1UpdateSessionArgs, + rply *sessionmanager.V1UpdateSessionReply) error { + return smgv1.SMG.BiRPCv1UpdateSession(nil, args, rply) +} + +// Called on session end, should stop debit loop +func (smgv1 *SMGv1) TerminateSession(args *sessionmanager.V1TerminateSessionArgs, + rply *string) error { + return smgv1.SMG.BiRPCv1TerminateSession(nil, args, rply) +} + +// Called on session end, should stop debit loop +func (smgv1 *SMGv1) ProcessCDR(cgrEv utils.CGREvent, rply *string) error { + return smgv1.SMG.BiRPCv1ProcessCDR(nil, cgrEv, rply) +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 845007628..f9546628b 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -130,36 +130,64 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne } } -func startSmGeneric(internalSMGChan, internalRaterChan, - internalCDRSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) { +func startSmGeneric(internalSMGChan, internalRaterChan, internalResourceSChan, internalSupplierSChan, + internalAttrSChan, internalCDRSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) { utils.Logger.Info("Starting CGRateS SMGeneric service.") var err error - var ralsConns, cdrsConn *rpcclient.RpcClientPool - if len(cfg.SmGenericConfig.RALsConns) != 0 { + var ralsConns, resSConns, suplSConns, attrSConns, cdrsConn *rpcclient.RpcClientPool + if len(cfg.SMGConfig.RALsConns) != 0 { ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, - cfg.SmGenericConfig.RALsConns, internalRaterChan, cfg.InternalTtl) + cfg.SMGConfig.RALsConns, internalRaterChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RALs: %s", err.Error())) exitChan <- true return } } - if len(cfg.SmGenericConfig.CDRsConns) != 0 { + if len(cfg.SMGConfig.ResSConns) != 0 { + ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.SMGConfig.ResSConns, internalResourceSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to ResourceS: %s", err.Error())) + exitChan <- true + return + } + } + if len(cfg.SMGConfig.SupplSConns) != 0 { + ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.SMGConfig.SupplSConns, internalSupplierSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to SupplierS: %s", err.Error())) + exitChan <- true + return + } + } + if len(cfg.SMGConfig.AttrSConns) != 0 { + ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.SMGConfig.AttrSConns, internalAttrSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to AttributeS: %s", err.Error())) + exitChan <- true + return + } + } + if len(cfg.SMGConfig.CDRsConns) != 0 { cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, - cfg.SmGenericConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl) + cfg.SMGConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RALs: %s", err.Error())) exitChan <- true return } } - smgReplConns, err := sessionmanager.NewSMGReplicationConns(cfg.SmGenericConfig.SMGReplicationConns, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout) + smgReplConns, err := sessionmanager.NewSMGReplicationConns(cfg.SMGConfig.SMGReplicationConns, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to SMGReplicationConnection error: <%s>", err.Error())) exitChan <- true return } - sm := sessionmanager.NewSMGeneric(cfg, ralsConns, cdrsConn, smgReplConns, cfg.DefaultTimezone) + sm := sessionmanager.NewSMGeneric(cfg, ralsConns, resSConns, suplSConns, + attrSConns, cdrsConn, smgReplConns, cfg.DefaultTimezone) if err = sm.Connect(); err != nil { utils.Logger.Err(fmt.Sprintf(" error: %s!", err)) } @@ -169,37 +197,28 @@ func startSmGeneric(internalSMGChan, internalRaterChan, smgRpc := v1.NewSMGenericV1(sm) server.RpcRegister(smgRpc) server.RpcRegister(&v2.SMGenericV2{*smgRpc}) + smgv1 := v1.NewSMGv1(sm) // methods with multiple options + server.RpcRegister(smgv1) // Register BiRpc handlers - if cfg.SmGenericConfig.ListenBijson != "" { + if cfg.SMGConfig.ListenBijson != "" { smgBiRpc := v1.NewSMGenericBiRpcV1(sm) for method, handler := range smgBiRpc.Handlers() { server.BiRPCRegisterName(method, handler) } - server.ServeBiJSON(cfg.SmGenericConfig.ListenBijson) + for method, handler := range smgv1.Handlers() { + server.BiRPCRegisterName(method, handler) + } + server.ServeBiJSON(cfg.SMGConfig.ListenBijson) exitChan <- true } } func startSMAsterisk(internalSMGChan chan rpcclient.RpcClientConnection, exitChan chan bool) { - utils.Logger.Info("Starting CGRateS SMAsterisk service.") - /* - var smgConn *rpcclient.RpcClientPool - if len(cfg.SMAsteriskCfg().SMGConns) != 0 { - smgConn, err = engine.NewRPCPool(rpcclient.POOL_BROADCAST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, - cfg.SMAsteriskCfg().SMGConns, internalSMGChan, cfg.InternalTtl) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to SMG: %s", err.Error())) - exitChan <- true - return - } - } - */ + utils.Logger.Info("Starting CGRateS SM-Asterisk service.") smgRpcConn := <-internalSMGChan internalSMGChan <- smgRpcConn birpcClnt := utils.NewBiRPCInternalClient(smgRpcConn.(*sessionmanager.SMGeneric)) for connIdx := range cfg.SMAsteriskCfg().AsteriskConns { // Instantiate connections towards asterisk servers - smgRpcConn := <-internalSMGChan - internalSMGChan <- smgRpcConn sma, err := sessionmanager.NewSMAsterisk(cfg, connIdx, birpcClnt) if err != nil { utils.Logger.Err(fmt.Sprintf(" error: %s!", err)) @@ -217,9 +236,9 @@ func startDiameterAgent(internalSMGChan, internalPubSubSChan chan rpcclient.RpcC var err error utils.Logger.Info("Starting CGRateS DiameterAgent service") var smgConn, pubsubConn *rpcclient.RpcClientPool - if len(cfg.DiameterAgentCfg().SMGenericConns) != 0 { + if len(cfg.DiameterAgentCfg().SMGConns) != 0 { smgConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, - cfg.DiameterAgentCfg().SMGenericConns, internalSMGChan, cfg.InternalTtl) + cfg.DiameterAgentCfg().SMGConns, internalSMGChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to SMG: %s", err.Error())) exitChan <- true @@ -251,10 +270,10 @@ func startRadiusAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitCh var err error utils.Logger.Info("Starting CGRateS RadiusAgent service") var smgConn *rpcclient.RpcClientPool - if len(cfg.RadiusAgentCfg().SMGenericConns) != 0 { + if len(cfg.RadiusAgentCfg().SMGConns) != 0 { smgConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, - cfg.RadiusAgentCfg().SMGenericConns, internalSMGChan, cfg.InternalTtl) + cfg.RadiusAgentCfg().SMGConns, internalSMGChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to SMG: %s", err.Error())) exitChan <- true @@ -273,39 +292,13 @@ func startRadiusAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitCh exitChan <- true } -func startSmFreeSWITCH(internalRaterChan, internalCDRSChan, rlsChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) { +func startSmFreeSWITCH(internalSMGChan chan rpcclient.RpcClientConnection, exitChan chan bool) { var err error - utils.Logger.Info("Starting CGRateS SMFreeSWITCH service") - var ralsConn, cdrsConn, rlsConn *rpcclient.RpcClientPool - if len(cfg.SmFsConfig.RALsConns) != 0 { - ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, - cfg.SmFsConfig.RALsConns, internalRaterChan, cfg.InternalTtl) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) - exitChan <- true - return - } - } - if len(cfg.SmFsConfig.CDRsConns) != 0 { - cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, - cfg.SmFsConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) - exitChan <- true - return - } - } - if len(cfg.SmFsConfig.RLsConns) != 0 { - rlsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, - cfg.SmFsConfig.RLsConns, rlsChan, cfg.InternalTtl) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to RLs: %s", err.Error())) - exitChan <- true - return - } - } - sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, ralsConn, cdrsConn, rlsConn, cfg.DefaultTimezone) - smRpc.SMs = append(smRpc.SMs, sm) + utils.Logger.Info("Starting CGRateS SM-FreeSWITCH service") + smgRpcConn := <-internalSMGChan + internalSMGChan <- smgRpcConn + birpcClnt := utils.NewBiRPCInternalClient(smgRpcConn.(*sessionmanager.SMGeneric)) + sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, birpcClnt, cfg.DefaultTimezone) if err = sm.Connect(); err != nil { utils.Logger.Err(fmt.Sprintf(" error: %s!", err)) } @@ -927,12 +920,13 @@ func main() { go startCdrcs(internalCdrSChan, internalRaterChan, exitChan) // Start SM-Generic - if cfg.SmGenericConfig.Enabled { - go startSmGeneric(internalSMGChan, internalRaterChan, internalCdrSChan, server, exitChan) + if cfg.SMGConfig.Enabled { + go startSmGeneric(internalSMGChan, internalRaterChan, internalRsChan, + internalSupplierSChan, internalAttributeSChan, internalCdrSChan, server, exitChan) } // Start SM-FreeSWITCH if cfg.SmFsConfig.Enabled { - go startSmFreeSWITCH(internalRaterChan, internalCdrSChan, internalRsChan, cdrDb, exitChan) + go startSmFreeSWITCH(internalSMGChan, exitChan) // close all sessions on shutdown go shutdownSessionmanagerSingnalHandler(exitChan) } @@ -947,12 +941,6 @@ func main() { go startSmOpenSIPS(internalRaterChan, internalCdrSChan, cdrDb, exitChan) } - // Register session manager service // FixMe: make sure this is thread safe - if cfg.SmGenericConfig.Enabled || cfg.SmFsConfig.Enabled || cfg.SmKamConfig.Enabled || cfg.SmOsipsConfig.Enabled || cfg.SMAsteriskCfg().Enabled { // Register SessionManagerV1 service - smRpc = new(v1.SessionManagerV1) - server.RpcRegister(smRpc) - } - if cfg.SMAsteriskCfg().Enabled { go startSMAsterisk(internalSMGChan, exitChan) } diff --git a/config/cfg_data.json b/config/cfg_data.json index a7724e2ac..9e2d6c45b 100644 --- a/config/cfg_data.json +++ b/config/cfg_data.json @@ -51,4 +51,16 @@ ], }, +"resources": { + "enabled": true, +}, + +"suppliers": { + "enabled": true, +}, + +"attributes": { + "enabled": true, +}, + } diff --git a/config/config.go b/config/config.go index b833988f5..da4b484ef 100755 --- a/config/config.go +++ b/config/config.go @@ -414,6 +414,21 @@ func (self *CGRConfig) checkConfigSanity() error { return errors.New(" RALs not enabled but requested by SMGeneric component.") } } + for _, conn := range self.SMGConfig.ResSConns { + if conn.Address == utils.MetaInternal && !self.resourceSCfg.Enabled { + return errors.New(" ResourceS not enabled but requested by SMGeneric component.") + } + } + for _, conn := range self.SMGConfig.SupplSConns { + if conn.Address == utils.MetaInternal && !self.supplierSCfg.Enabled { + return errors.New(" SupplierS not enabled but requested by SMGeneric component.") + } + } + for _, conn := range self.SMGConfig.AttrSConns { + if conn.Address == utils.MetaInternal && !self.attributeSCfg.Enabled { + return errors.New(" AttributeS not enabled but requested by SMGeneric component.") + } + } if len(self.SMGConfig.CDRsConns) == 0 { return errors.New(" CDRs definition is mandatory!") } diff --git a/config/config_defaults.go b/config/config_defaults.go index 1a87b1f1f..73436dcfb 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -283,6 +283,15 @@ const CGRATES_CFG_JSON = ` "rals_conns": [ {"address": "*internal"} // address where to reach the Rater <""|*internal|127.0.0.1:2013> ], + "resources_conns": [ + {"address": "*internal"} // address where to reach the ResourceS <""|*internal|127.0.0.1:2013> + ], + "suppliers_conns": [ + {"address": "*internal"} // address where to reach the SupplierS <""|*internal|127.0.0.1:2013> + ], + "attributes_conns": [ + {"address": "*internal"} // address where to reach the AttributeS <""|*internal|127.0.0.1:2013> + ], "cdrs_conns": [ {"address": "*internal"} // address where to reach CDR Server, empty to disable CDR capturing <*internal|x.y.z.y:1234> ], diff --git a/config/config_json_test.go b/config/config_json_test.go index db22c003c..a611097bf 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -491,6 +491,18 @@ func TestSmgJsonCfg(t *testing.T) { &HaPoolJsonCfg{ Address: utils.StringPointer(utils.MetaInternal), }}, + Resources_conns: &[]*HaPoolJsonCfg{ + &HaPoolJsonCfg{ + Address: utils.StringPointer(utils.MetaInternal), + }}, + Suppliers_conns: &[]*HaPoolJsonCfg{ + &HaPoolJsonCfg{ + Address: utils.StringPointer(utils.MetaInternal), + }}, + Attributes_conns: &[]*HaPoolJsonCfg{ + &HaPoolJsonCfg{ + Address: utils.StringPointer(utils.MetaInternal), + }}, Cdrs_conns: &[]*HaPoolJsonCfg{ &HaPoolJsonCfg{ Address: utils.StringPointer(utils.MetaInternal), diff --git a/config/config_test.go b/config/config_test.go index 34667a37a..963161b24 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -447,10 +447,18 @@ func TestCgrCfgJSONDefaultsCdreProfiles(t *testing.T) { func TestCgrCfgJSONDefaultsSMGenericCfg(t *testing.T) { eSmGeCfg := &SMGConfig{ - Enabled: false, - ListenBijson: "127.0.0.1:2014", - RALsConns: []*HaPoolConfig{&HaPoolConfig{Address: "*internal"}}, - CDRsConns: []*HaPoolConfig{&HaPoolConfig{Address: "*internal"}}, + Enabled: false, + ListenBijson: "127.0.0.1:2014", + RALsConns: []*HaPoolConfig{ + &HaPoolConfig{Address: "*internal"}}, + ResSConns: []*HaPoolConfig{ + &HaPoolConfig{Address: "*internal"}}, + SupplSConns: []*HaPoolConfig{ + &HaPoolConfig{Address: "*internal"}}, + AttrSConns: []*HaPoolConfig{ + &HaPoolConfig{Address: "*internal"}}, + CDRsConns: []*HaPoolConfig{ + &HaPoolConfig{Address: "*internal"}}, SMGReplicationConns: []*HaPoolConfig{}, DebitInterval: 0 * time.Second, MinCallDuration: 0 * time.Second, diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 396387be6..a03d24bf7 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -198,8 +198,11 @@ type SmgJsonCfg struct { Enabled *bool Listen_bijson *string Rals_conns *[]*HaPoolJsonCfg + Resources_conns *[]*HaPoolJsonCfg + Suppliers_conns *[]*HaPoolJsonCfg Cdrs_conns *[]*HaPoolJsonCfg Smg_replication_conns *[]*HaPoolJsonCfg + Attributes_conns *[]*HaPoolJsonCfg Debit_interval *string Min_call_duration *string Max_call_duration *string diff --git a/config/smconfig.go b/config/smconfig.go index fdf6e1314..d1978c1a1 100644 --- a/config/smconfig.go +++ b/config/smconfig.go @@ -92,6 +92,9 @@ type SMGConfig struct { Enabled bool ListenBijson string RALsConns []*HaPoolConfig + ResSConns []*HaPoolConfig + SupplSConns []*HaPoolConfig + AttrSConns []*HaPoolConfig CDRsConns []*HaPoolConfig SMGReplicationConns []*HaPoolConfig DebitInterval time.Duration @@ -122,6 +125,27 @@ func (self *SMGConfig) loadFromJsonCfg(jsnCfg *SmgJsonCfg) error { self.RALsConns[idx].loadFromJsonCfg(jsnHaCfg) } } + if jsnCfg.Resources_conns != nil { + self.ResSConns = make([]*HaPoolConfig, len(*jsnCfg.Resources_conns)) + for idx, jsnHaCfg := range *jsnCfg.Resources_conns { + self.ResSConns[idx] = NewDfltHaPoolConfig() + self.ResSConns[idx].loadFromJsonCfg(jsnHaCfg) + } + } + if jsnCfg.Suppliers_conns != nil { + self.SupplSConns = make([]*HaPoolConfig, len(*jsnCfg.Suppliers_conns)) + for idx, jsnHaCfg := range *jsnCfg.Suppliers_conns { + self.SupplSConns[idx] = NewDfltHaPoolConfig() + self.SupplSConns[idx].loadFromJsonCfg(jsnHaCfg) + } + } + if jsnCfg.Attributes_conns != nil { + self.AttrSConns = make([]*HaPoolConfig, len(*jsnCfg.Attributes_conns)) + for idx, jsnHaCfg := range *jsnCfg.Attributes_conns { + self.AttrSConns[idx] = NewDfltHaPoolConfig() + self.AttrSConns[idx].loadFromJsonCfg(jsnHaCfg) + } + } if jsnCfg.Cdrs_conns != nil { self.CDRsConns = make([]*HaPoolConfig, len(*jsnCfg.Cdrs_conns)) for idx, jsnHaCfg := range *jsnCfg.Cdrs_conns { diff --git a/engine/libsuppliers.go b/engine/libsuppliers.go index 1dcc481d4..82ca3762f 100644 --- a/engine/libsuppliers.go +++ b/engine/libsuppliers.go @@ -26,6 +26,12 @@ import ( "github.com/cgrates/cgrates/utils" ) +// SupplierReply represents one supplier in +type SortedSupplier struct { + SupplierID string + SortingData map[string]interface{} // store here extra info like cost or stats +} + // SuppliersReply is returned as part of GetSuppliers call type SortedSuppliers struct { ProfileID string // Profile matched @@ -33,10 +39,13 @@ type SortedSuppliers struct { SortedSuppliers []*SortedSupplier // list of supplier IDs and SortingData data } -// SupplierReply represents one supplier in -type SortedSupplier struct { - SupplierID string - SortingData map[string]interface{} // store here extra info like cost or stats +// SupplierIDs returns list of suppliers +func (sSpls *SortedSuppliers) SupplierIDs() (sIDs []string) { + sIDs = make([]string, len(sSpls.SortedSuppliers)) + for i, spl := range sSpls.SortedSuppliers { + sIDs[i] = spl.SupplierID + } + return } // SortWeight is part of sort interface, sort based on Weight diff --git a/general_tests/fsevcorelate_test.go b/general_tests/fsevcorelate_test.go index fd125ddad..c778bea7d 100644 --- a/general_tests/fsevcorelate_test.go +++ b/general_tests/fsevcorelate_test.go @@ -213,7 +213,7 @@ variable_sip_local_sdp_str: v%3D0%0Ao%3DFreeSWITCH%201396951687%201396951689%20I var jsonCdr = []byte(`{"core-uuid":"feef0b51-7fdf-4c4a-878e-aff233752de2","channel_data":{"state":"CS_REPORTING","direction":"inbound","state_number":"11","flags":"0=1;1=1;3=1;36=1;37=1;39=1;42=1;47=1;52=1;73=1;75=1;94=1","caps":"1=1;2=1;3=1;4=1;5=1;6=1"},"variables":{"direction":"inbound","uuid":"86cfd6e2-dbda-45a3-b59d-f683ec368e8b","session_id":"5","sip_from_user":"1001","sip_from_uri":"1001@192.168.56.74","sip_from_host":"192.168.56.74","channel_name":"sofia/internal/1001@192.168.56.74","sip_local_network_addr":"192.168.56.74","sip_network_ip":"192.168.56.1","sip_network_port":"5060","sip_received_ip":"192.168.56.1","sip_received_port":"5060","sip_via_protocol":"udp","sip_authorized":"true","Event-Name":"REQUEST_PARAMS","Core-UUID":"feef0b51-7fdf-4c4a-878e-aff233752de2","FreeSWITCH-Hostname":"CGRTest","FreeSWITCH-Switchname":"CGRTest","FreeSWITCH-IPv4":"192.168.178.32","FreeSWITCH-IPv6":"::1","Event-Date-Local":"2014-04-08 21:10:21","Event-Date-GMT":"Tue, 08 Apr 2014 19:10:21 GMT","Event-Date-Timestamp":"1396984221278217","Event-Calling-File":"sofia.c","Event-Calling-Function":"sofia_handle_sip_i_invite","Event-Calling-Line-Number":"8076","Event-Sequence":"1423","sip_number_alias":"1001","sip_auth_username":"1001","sip_auth_realm":"192.168.56.74","number_alias":"1001","requested_domain_name":"192.168.56.66","record_stereo":"true","default_gateway":"example.com","default_areacode":"918","transfer_fallback_extension":"operator","toll_allow":"domestic,international,local","accountcode":"1001","user_context":"default","effective_caller_id_name":"Extension 1001","effective_caller_id_number":"1001","outbound_caller_id_name":"FreeSWITCH","outbound_caller_id_number":"0000000000","callgroup":"techsupport","user_name":"1001","domain_name":"192.168.56.66","sip_from_user_stripped":"1001","sofia_profile_name":"internal","recovery_profile_name":"internal","sip_req_user":"1002","sip_req_uri":"1002@192.168.56.74","sip_req_host":"192.168.56.74","sip_to_user":"1002","sip_to_uri":"1002@192.168.56.74","sip_to_host":"192.168.56.74","sip_contact_params":"transport=udp;registering_acc=192_168_56_74","sip_contact_user":"1001","sip_contact_port":"5060","sip_contact_uri":"1001@192.168.56.1:5060","sip_contact_host":"192.168.56.1","sip_via_host":"192.168.56.1","sip_via_port":"5060","presence_id":"1001@192.168.56.74","ep_codec_string":"G722@8000h@20i@64000b,PCMU@8000h@20i@64000b,PCMA@8000h@20i@64000b,GSM@8000h@20i@13200b","cgr_notify":"+AUTH_OK","max_forwards":"69","transfer_history":"1396984221:caefc538-5da4-4245-8716-112c706383d8:bl_xfer:1002/default/XML","transfer_source":"1396984221:caefc538-5da4-4245-8716-112c706383d8:bl_xfer:1002/default/XML","DP_MATCH":"ARRAY::1002|:1002","call_uuid":"86cfd6e2-dbda-45a3-b59d-f683ec368e8b","RFC2822_DATE":"Tue, 08 Apr 2014 21:10:21 +0200","dialed_extension":"1002","export_vars":"RFC2822_DATE,RFC2822_DATE,dialed_extension","ringback":"%(2000,4000,440,480)","transfer_ringback":"local_stream://moh","call_timeout":"30","hangup_after_bridge":"true","continue_on_fail":"true","called_party_callgroup":"techsupport","current_application_data":"user/1002@192.168.56.66","current_application":"bridge","dialed_user":"1002","dialed_domain":"192.168.56.66","inherit_codec":"true","originated_legs":"ARRAY::402f0929-fa14-4a5f-9642-3a1311bb4ddd;Outbound Call;1002|:402f0929-fa14-4a5f-9642-3a1311bb4ddd;Outbound Call;1002","rtp_use_codec_string":"G722,PCMU,PCMA,GSM","sip_use_codec_name":"G722","sip_use_codec_rate":"8000","sip_use_codec_ptime":"20","write_codec":"G722","write_rate":"16000","video_possible":"true","local_media_ip":"192.168.56.74","local_media_port":"32534","advertised_media_ip":"192.168.56.74","sip_use_pt":"9","rtp_use_ssrc":"1431080133","zrtp_secure_media_confirmed_audio":"true","zrtp_sas1_string_audio":"j6ff","switch_m_sdp":"v=0\r\no=1002 0 0 IN IP4 192.168.56.1\r\ns=-\r\nc=IN IP4 192.168.56.1\r\nt=0 0\r\nm=audio 5020 RTP/AVP 9 0 8 3 101\r\na=rtpmap:9 G722/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:3 GSM/8000\r\na=rtpmap:101 telephone-event/8000\r\n","read_codec":"G722","read_rate":"16000","endpoint_disposition":"ANSWER","originate_causes":"ARRAY::402f0929-fa14-4a5f-9642-3a1311bb4ddd;NONE|:402f0929-fa14-4a5f-9642-3a1311bb4ddd;NONE","originate_disposition":"SUCCESS","DIALSTATUS":"SUCCESS","last_bridge_to":"402f0929-fa14-4a5f-9642-3a1311bb4ddd","bridge_channel":"sofia/internal/sip:1002@192.168.56.1:5060","bridge_uuid":"402f0929-fa14-4a5f-9642-3a1311bb4ddd","signal_bond":"402f0929-fa14-4a5f-9642-3a1311bb4ddd","last_sent_callee_id_name":"Outbound Call","last_sent_callee_id_number":"1002","cgr_RequestType":"*prepaid","sip_reinvite_sdp":"v=0\r\no=1001 0 1 IN IP4 192.168.56.1\r\ns=-\r\nc=IN IP4 192.168.56.1\r\nt=0 0\r\nm=audio 5016 RTP/AVP 96 97 98 9 100 102 0 8 103 3 104 101\r\na=sendonly\r\na=rtpmap:96 opus/48000/2\r\na=fmtp:96 usedtx=1\r\na=rtpmap:97 SILK/24000\r\na=rtpmap:98 SILK/16000\r\na=rtpmap:9 G722/8000\r\na=rtpmap:100 speex/32000\r\na=rtpmap:102 speex/16000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:103 iLBC/8000\r\na=rtpmap:3 GSM/8000\r\na=rtpmap:104 speex/8000\r\na=rtpmap:101 telephone-event/8000\r\na=extmap:1 urn:ietf:params:rtp-hdrext:csrc-audio-level\r\na=zrtp-hash:1.10 722d57097aaabea2749ea8938472478f8d88645b23521fa5f8005a7a2bed8286\r\nm=video 0 RTP/AVP 105 99\r\n","switch_r_sdp":"v=0\r\no=1001 0 1 IN IP4 192.168.56.1\r\ns=-\r\nc=IN IP4 192.168.56.1\r\nt=0 0\r\nm=audio 5016 RTP/AVP 96 97 98 9 100 102 0 8 103 3 104 101\r\na=rtpmap:96 opus/48000/2\r\na=fmtp:96 usedtx=1\r\na=rtpmap:97 SILK/24000\r\na=rtpmap:98 SILK/16000\r\na=rtpmap:9 G722/8000\r\na=rtpmap:100 speex/32000\r\na=rtpmap:102 speex/16000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:103 iLBC/8000\r\na=rtpmap:3 GSM/8000\r\na=rtpmap:104 speex/8000\r\na=rtpmap:101 telephone-event/8000\r\na=sendonly\r\na=extmap:1 urn:ietf:params:rtp-hdrext:csrc-audio-level\r\na=zrtp-hash:1.10 722d57097aaabea2749ea8938472478f8d88645b23521fa5f8005a7a2bed8286\r\nm=video 0 RTP/AVP 105 99\r\n","r_sdp_audio_zrtp_hash":"1.10 722d57097aaabea2749ea8938472478f8d88645b23521fa5f8005a7a2bed8286","remote_media_ip":"192.168.56.1","remote_media_port":"5016","sip_audio_recv_pt":"9","dtmf_type":"rfc2833","sip_2833_send_payload":"101","sip_2833_recv_payload":"101","sip_local_sdp_str":"v=0\no=FreeSWITCH 1396951687 1396951690 IN IP4 192.168.56.74\ns=FreeSWITCH\nc=IN IP4 192.168.56.74\nt=0 0\nm=audio 32534 RTP/AVP 9 101\na=rtpmap:9 G722/8000\na=rtpmap:101 telephone-event/8000\na=fmtp:101 0-16\na=ptime:20\na=sendrecv\n","sip_to_tag":"rXc9vZpv9eFaF","sip_from_tag":"1afc7eca","sip_cseq":"3","sip_call_id":"6691dbf8ffdc02bdacee02bc305d5c71@0:0:0:0:0:0:0:0","sip_full_via":"SIP/2.0/UDP 192.168.56.1:5060;branch=z9hG4bK-323133-5d083abc0d3f327b9101586e71b5fce4","sip_from_display":"1001","sip_full_from":"\"1001\" ;tag=1afc7eca","sip_full_to":";tag=rXc9vZpv9eFaF","sip_term_status":"200","proto_specific_hangup_cause":"sip:200","sip_term_cause":"16","last_bridge_role":"originator","sip_user_agent":"Jitsi2.5.5065Linux","sip_hangup_disposition":"recv_bye","bridge_hangup_cause":"NORMAL_CLEARING","hangup_cause":"NORMAL_CLEARING","hangup_cause_q850":"16","digits_dialed":"none","start_stamp":"2014-04-08 21:10:21","profile_start_stamp":"2014-04-08 21:10:21","answer_stamp":"2014-04-08 21:10:27","bridge_stamp":"2014-04-08 21:10:27","hold_stamp":"2014-04-08 21:10:27","progress_stamp":"2014-04-08 21:10:21","progress_media_stamp":"2014-04-08 21:10:21","hold_events":"{{1396984227824182,1396984242247995}}","end_stamp":"2014-04-08 21:10:42","start_epoch":"1396984221","start_uepoch":"1396984221278217","profile_start_epoch":"1396984221","profile_start_uepoch":"1396984221377035","answer_epoch":"1396984227","answer_uepoch":"1396984227717006","bridge_epoch":"1396984227","bridge_uepoch":"1396984227737268","last_hold_epoch":"1396984227","last_hold_uepoch":"1396984227824167","hold_accum_seconds":"14","hold_accum_usec":"14423816","hold_accum_ms":"14423","resurrect_epoch":"0","resurrect_uepoch":"0","progress_epoch":"1396984221","progress_uepoch":"1396984221497331","progress_media_epoch":"1396984221","progress_media_uepoch":"1396984221517042","end_epoch":"1396984242","end_uepoch":"1396984242257026","last_app":"bridge","last_arg":"user/1002@192.168.56.66","caller_id":"\"1001\" <1001>","duration":"21","billsec":"15","progresssec":"0","answersec":"6","waitsec":"6","progress_mediasec":"0","flow_billsec":"21","mduration":"20979","billmsec":"14540","progressmsec":"219","answermsec":"6439","waitmsec":"6459","progress_mediamsec":"239","flow_billmsec":"20979","uduration":"20978809","billusec":"14540020","progressusec":"219114","answerusec":"6438789","waitusec":"6459051","progress_mediausec":"238825","flow_billusec":"20978809","rtp_audio_in_raw_bytes":"181360","rtp_audio_in_media_bytes":"180304","rtp_audio_in_packet_count":"1031","rtp_audio_in_media_packet_count":"1025","rtp_audio_in_skip_packet_count":"45","rtp_audio_in_jb_packet_count":"0","rtp_audio_in_dtmf_packet_count":"0","rtp_audio_in_cng_packet_count":"0","rtp_audio_in_flush_packet_count":"6","rtp_audio_in_largest_jb_size":"0","rtp_audio_out_raw_bytes":"165780","rtp_audio_out_media_bytes":"165780","rtp_audio_out_packet_count":"942","rtp_audio_out_media_packet_count":"942","rtp_audio_out_skip_packet_count":"0","rtp_audio_out_dtmf_packet_count":"0","rtp_audio_out_cng_packet_count":"0","rtp_audio_rtcp_packet_count":"0","rtp_audio_rtcp_octet_count":"0"},"app_log":{"applications":[{"app_name":"hash","app_data":"insert/192.168.56.66-spymap/1001/86cfd6e2-dbda-45a3-b59d-f683ec368e8b"},{"app_name":"hash","app_data":"insert/192.168.56.66-last_dial/1001/1002"},{"app_name":"hash","app_data":"insert/192.168.56.66-last_dial/global/86cfd6e2-dbda-45a3-b59d-f683ec368e8b"},{"app_name":"export","app_data":"RFC2822_DATE=Tue, 08 Apr 2014 21:10:21 +0200"},{"app_name":"park","app_data":""},{"app_name":"hash","app_data":"insert/192.168.56.66-spymap/1001/86cfd6e2-dbda-45a3-b59d-f683ec368e8b"},{"app_name":"hash","app_data":"insert/192.168.56.66-last_dial/1001/1002"},{"app_name":"hash","app_data":"insert/192.168.56.66-last_dial/global/86cfd6e2-dbda-45a3-b59d-f683ec368e8b"},{"app_name":"export","app_data":"RFC2822_DATE=Tue, 08 Apr 2014 21:10:21 +0200"},{"app_name":"export","app_data":"dialed_extension=1002"},{"app_name":"bind_meta_app","app_data":"1 b s execute_extension::dx XML features"},{"app_name":"bind_meta_app","app_data":"2 b s record_session::/var/lib/freeswitch/recordings/1001.2014-04-08-21-10-21.wav"},{"app_name":"bind_meta_app","app_data":"3 b s execute_extension::cf XML features"},{"app_name":"bind_meta_app","app_data":"4 b s execute_extension::att_xfer XML features"},{"app_name":"set","app_data":"ringback=%(2000,4000,440,480)"},{"app_name":"set","app_data":"transfer_ringback=local_stream://moh"},{"app_name":"set","app_data":"call_timeout=30"},{"app_name":"set","app_data":"hangup_after_bridge=true"},{"app_name":"set","app_data":"continue_on_fail=true"},{"app_name":"hash","app_data":"insert/192.168.56.66-call_return/1002/1001"},{"app_name":"hash","app_data":"insert/192.168.56.66-last_dial_ext/1002/86cfd6e2-dbda-45a3-b59d-f683ec368e8b"},{"app_name":"set","app_data":"called_party_callgroup=techsupport"},{"app_name":"hash","app_data":"insert/192.168.56.66-last_dial_ext/techsupport/86cfd6e2-dbda-45a3-b59d-f683ec368e8b"},{"app_name":"hash","app_data":"insert/192.168.56.66-last_dial_ext/global/86cfd6e2-dbda-45a3-b59d-f683ec368e8b"},{"app_name":"hash","app_data":"insert/192.168.56.66-last_dial/techsupport/86cfd6e2-dbda-45a3-b59d-f683ec368e8b"},{"app_name":"bridge","app_data":"user/1002@192.168.56.66"}]},"callflow":{"dialplan":"XML","profile_index":"2","extension":{"name":"global","number":"1002","applications":[{"app_name":"hash","app_data":"insert/${domain_name}-spymap/${caller_id_number}/${uuid}"},{"app_name":"hash","app_data":"insert/${domain_name}-last_dial/${caller_id_number}/${destination_number}"},{"app_name":"hash","app_data":"insert/${domain_name}-last_dial/global/${uuid}"},{"app_name":"export","app_data":"RFC2822_DATE=${strftime(%a, %d %b %Y %T %z)}"},{"app_name":"export","app_data":"dialed_extension=1002"},{"app_name":"bind_meta_app","app_data":"1 b s execute_extension::dx XML features"},{"app_name":"bind_meta_app","app_data":"2 b s record_session::/var/lib/freeswitch/recordings/${caller_id_number}.${strftime(%Y-%m-%d-%H-%M-%S)}.wav"},{"app_name":"bind_meta_app","app_data":"3 b s execute_extension::cf XML features"},{"app_name":"bind_meta_app","app_data":"4 b s execute_extension::att_xfer XML features"},{"app_name":"set","app_data":"ringback=${us-ring}"},{"app_name":"set","app_data":"transfer_ringback=local_stream://moh"},{"app_name":"set","app_data":"call_timeout=30"},{"app_name":"set","app_data":"hangup_after_bridge=true"},{"app_name":"set","app_data":"continue_on_fail=true"},{"app_name":"hash","app_data":"insert/${domain_name}-call_return/${dialed_extension}/${caller_id_number}"},{"app_name":"hash","app_data":"insert/${domain_name}-last_dial_ext/${dialed_extension}/${uuid}"},{"app_name":"set","app_data":"called_party_callgroup=${user_data(${dialed_extension}@${domain_name} var callgroup)}"},{"app_name":"hash","app_data":"insert/${domain_name}-last_dial_ext/${called_party_callgroup}/${uuid}"},{"app_name":"hash","app_data":"insert/${domain_name}-last_dial_ext/global/${uuid}"},{"app_name":"hash","app_data":"insert/${domain_name}-last_dial/${called_party_callgroup}/${uuid}"},{"app_name":"bridge","app_data":"user/${dialed_extension}@${domain_name}"},{"last_executed":"true","app_name":"answer","app_data":""},{"app_name":"sleep","app_data":"1000"},{"app_name":"bridge","app_data":"loopback/app=voicemail:default ${domain_name} ${dialed_extension}"}],"current_app":"answer"},"caller_profile":{"username":"1001","dialplan":"XML","caller_id_name":"1001","ani":"1001","aniii":"","caller_id_number":"1001","network_addr":"192.168.56.1","rdnis":"1002","destination_number":"1002","uuid":"86cfd6e2-dbda-45a3-b59d-f683ec368e8b","source":"mod_sofia","context":"default","chan_name":"sofia/internal/1001@192.168.56.74","originatee":{"originatee_caller_profiles":[{"username":"1001","dialplan":"XML","caller_id_name":"Extension 1001","ani":"1001","aniii":"","caller_id_number":"1001","network_addr":"192.168.56.1","rdnis":"1002","destination_number":"1002","uuid":"402f0929-fa14-4a5f-9642-3a1311bb4ddd","source":"mod_sofia","context":"default","chan_name":"sofia/internal/sip:1002@192.168.56.1:5060"},{"username":"1001","dialplan":"XML","caller_id_name":"Extension 1001","ani":"1001","aniii":"","caller_id_number":"1001","network_addr":"192.168.56.1","rdnis":"1002","destination_number":"1002","uuid":"402f0929-fa14-4a5f-9642-3a1311bb4ddd","source":"mod_sofia","context":"default","chan_name":"sofia/internal/sip:1002@192.168.56.1:5060"}]}},"times":{"created_time":"1396984221278217","profile_created_time":"1396984221377035","progress_time":"1396984221497331","progress_media_time":"1396984221517042","answered_time":"1396984227717006","hangup_time":"1396984242257026","resurrect_time":"0","transfer_time":"0"}},"callflow":{"dialplan":"XML","profile_index":"1","extension":{"name":"global","number":"1002","applications":[{"app_name":"hash","app_data":"insert/${domain_name}-spymap/${caller_id_number}/${uuid}"},{"app_name":"hash","app_data":"insert/${domain_name}-last_dial/${caller_id_number}/${destination_number}"},{"app_name":"hash","app_data":"insert/${domain_name}-last_dial/global/${uuid}"},{"app_name":"export","app_data":"RFC2822_DATE=${strftime(%a, %d %b %Y %T %z)}"},{"app_name":"park","app_data":""}]},"caller_profile":{"username":"1001","dialplan":"XML","caller_id_name":"1001","ani":"1001","aniii":"","caller_id_number":"1001","network_addr":"192.168.56.1","rdnis":"","destination_number":"1002","uuid":"86cfd6e2-dbda-45a3-b59d-f683ec368e8b","source":"mod_sofia","context":"default","chan_name":"sofia/internal/1001@192.168.56.74"},"times":{"created_time":"1396984221278217","profile_created_time":"1396984221278217","progress_time":"0","progress_media_time":"0","answered_time":"0","hangup_time":"0","resurrect_time":"0","transfer_time":"1396984221377035"}}}`) func TestEvCorelate(t *testing.T) { - answerEv := new(sessionmanager.FSEvent).AsEvent(answerEvent) + answerEv := sessionmanager.NewFSEvent(answerEvent) if answerEv.GetName() != "CHANNEL_ANSWER" { t.Error("Event not parsed correctly: ", answerEv) } @@ -543,7 +543,7 @@ var jsonCdr2 = []byte(`{"core-uuid":"651a8db2-4f67-4cf8-b622-169e8a482e50","swit // Make sure that both hangup and json cdr produce the same CGR primary fields func TestEvCdrCorelate(t *testing.T) { - hangupEv := new(sessionmanager.FSEvent).AsEvent(hangupEv) + hangupEv := sessionmanager.NewFSEvent(hangupEv) if hangupEv.GetName() != "CHANNEL_HANGUP_COMPLETE" { t.Error("Event not parsed correctly: ", hangupEv) } diff --git a/sessionmanager/fsevent.go b/sessionmanager/fsevent.go index c4eadfdf9..3477d605d 100644 --- a/sessionmanager/fsevent.go +++ b/sessionmanager/fsevent.go @@ -72,6 +72,12 @@ const ( PDD_NOMEDIA_MS = "variable_progressmsec" IGNOREPARK = "variable_cgr_ignorepark" FS_VARPREFIX = "variable_" + VarCGRSubsystems = "variable_cgr_subsystems" + SubSAccountS = "accounts" + SubSSupplierS = "suppliers" + SubSResourceS = "resources" + SubSAttributeS = "attributes" + CGRResourcesAllowed = "cgr_resources_allowed" VAR_CGR_DISCONNECT_CAUSE = "variable_" + utils.CGR_DISCONNECT_CAUSE VAR_CGR_CMPUTELCR = "variable_" + utils.CGR_COMPUTELCR @@ -86,11 +92,8 @@ func (fsev FSEvent) String() (result string) { return } -// Loads the new event data from a body of text containing the key value proprieties. -// It stores the parsed proprieties in the internal map. -func (fsev FSEvent) AsEvent(body string) engine.Event { - fsev = fsock.FSEventStrToMap(body, nil) - return fsev +func NewFSEvent(strEv string) (fsev FSEvent) { + return fsock.FSEventStrToMap(strEv, nil) } func (fsev FSEvent) GetName() string { @@ -415,6 +418,87 @@ func (fsev FSEvent) AsMapStringIface() (map[string]interface{}, error) { return nil, utils.ErrNotImplemented } +// V1AuthorizeArgs returns the arguments used in SMGv1.Authorize +func (fsev FSEvent) V1AuthorizeArgs() (args *V1AuthorizeArgs) { + args = &V1AuthorizeArgs{ // defaults + GetMaxUsage: true, + } + subsystems, has := fsev[VarCGRSubsystems] + if !has { + return + } + if strings.Index(subsystems, SubSAccountS) == -1 { + args.GetMaxUsage = false + } + if strings.Index(subsystems, SubSResourceS) != -1 { + args.CheckResources = true + } + if strings.Index(subsystems, SubSSupplierS) != -1 { + args.GetSuppliers = true + } + if strings.Index(subsystems, SubSAttributeS) != -1 { + args.GetAttributes = true + } + return +} + +// V2InitSessionArgs returns the arguments used in SMGv1.InitSession +func (fsev FSEvent) V1InitSessionArgs() (args *V1InitSessionArgs) { + args = &V1InitSessionArgs{ // defaults + InitSession: true, + } + subsystems, has := fsev[VarCGRSubsystems] + if !has { + return + } + if strings.Index(subsystems, SubSAccountS) == -1 { + args.InitSession = false + } + if strings.Index(subsystems, SubSResourceS) != -1 { + args.AllocateResources = true + } + if strings.Index(subsystems, SubSAttributeS) != -1 { + args.GetAttributes = true + } + return +} + +// V1UpdateSessionArgs returns the arguments used in SMGv1.UpdateSession +func (fsev FSEvent) V1UpdateSessionArgs() (args *V1UpdateSessionArgs) { + args = &V1UpdateSessionArgs{ // defaults + UpdateSession: true, + } + subsystems, has := fsev[VarCGRSubsystems] + if !has { + return + } + if strings.Index(subsystems, SubSAccountS) == -1 { + args.UpdateSession = false + } + if strings.Index(subsystems, SubSResourceS) != -1 { + args.AllocateResources = true + } + return +} + +// V1TerminateSessionArgs returns the arguments used in SMGv1.TerminateSession +func (fsev FSEvent) V1TerminateSessionArgs() (args *V1TerminateSessionArgs) { + args = &V1TerminateSessionArgs{ // defaults + TerminateSession: true, + } + subsystems, has := fsev[VarCGRSubsystems] + if !has { + return + } + if strings.Index(subsystems, SubSAccountS) == -1 { + args.TerminateSession = false + } + if strings.Index(subsystems, SubSResourceS) != -1 { + args.ReleaseResources = true + } + return +} + // Converts a slice of strings into a FS array string, contains len(array) at first index since FS does not support len(ARRAY::) for now func SliceAsFsArray(slc []string) string { arry := "" diff --git a/sessionmanager/fsevent_test.go b/sessionmanager/fsevent_test.go index e0ff37b57..dd15538cb 100644 --- a/sessionmanager/fsevent_test.go +++ b/sessionmanager/fsevent_test.go @@ -354,11 +354,11 @@ Task-ID: 2 Task-Desc: heartbeat Task-Group: core Task-Runtime: 1349437318` - ev := new(FSEvent).AsEvent(body) + ev := NewFSEvent(body) if ev.GetName() != "RE_SCHEDULE" { t.Error("Event not parsed correctly: ", ev) } - l := len(ev.(FSEvent)) + l := len(ev) if l != 17 { t.Error("Incorrect number of event fields: ", l) } @@ -366,7 +366,7 @@ Task-Runtime: 1349437318` // Detects if any of the parsers do not return static values func TestEventParseStatic(t *testing.T) { - ev := new(FSEvent).AsEvent("") + ev := NewFSEvent("") setupTime, _ := ev.GetSetupTime("^2013-12-07 08:42:24", "") answerTime, _ := ev.GetAnswerTime("^2013-12-07 08:42:24", "") dur, _ := ev.GetDuration("^60s") @@ -413,7 +413,7 @@ Task-Group: core Task-Runtime: 1349437318` cfg, _ := config.NewDefaultCGRConfig() config.SetCgrConfig(cfg) - ev := new(FSEvent).AsEvent(body) + ev := NewFSEvent(body) setupTime, _ := ev.GetSetupTime("Event-Date-Local", "") answerTime, _ := ev.GetAnswerTime("Event-Date-Local", "") dur, _ := ev.GetDuration("Event-Calling-Line-Number") @@ -449,7 +449,7 @@ Caller-Channel-Created-Time: 0 Caller-Channel-Answered-Time Task-Runtime: 1349437318` var nilTime time.Time - ev := new(FSEvent).AsEvent(body) + ev := NewFSEvent(body) if setupTime, err := ev.GetSetupTime("", ""); err != nil { t.Error("Error when parsing empty setupTime") } else if setupTime != nilTime { @@ -465,7 +465,7 @@ Task-Runtime: 1349437318` func TestParseFsHangup(t *testing.T) { cfg, _ := config.NewDefaultCGRConfig() config.SetCgrConfig(cfg) - ev := new(FSEvent).AsEvent(hangupEv) + ev := NewFSEvent(hangupEv) setupTime, _ := ev.GetSetupTime(utils.META_DEFAULT, "") answerTime, _ := ev.GetAnswerTime(utils.META_DEFAULT, "") dur, _ := ev.GetDuration(utils.META_DEFAULT) @@ -494,7 +494,7 @@ func TestParseFsHangup(t *testing.T) { func TestParseEventValue(t *testing.T) { cfg, _ := config.NewDefaultCGRConfig() config.SetCgrConfig(cfg) - ev := new(FSEvent).AsEvent(hangupEv) + ev := NewFSEvent(hangupEv) if cgrid := ev.ParseEventValue(&utils.RSRField{Id: utils.CGRID}, ""); cgrid != "164b0422fdc6a5117031b427439482c6a4f90e41" { t.Error("Unexpected cgrid parsed", cgrid) } @@ -559,61 +559,10 @@ func TestParseEventValue(t *testing.T) { } } -/* -func TestPassesFieldFilterDn1(t *testing.T) { - body := `Event-Name: RE_SCHEDULE -Core-UUID: 792e181c-b6e6-499c-82a1-52a778e7d82d -FreeSWITCH-Hostname: h1.ip-switch.net -FreeSWITCH-Switchname: h1.ip-switch.net -FreeSWITCH-IPv4: 88.198.12.156 -Caller-Username: futurem0005` - ev := new(FSEvent).AsEvent(body) - acntPrefxFltr, _ := utils.NewRSRField(`~Account:s/^\w+[shmp]\d{4}$//`) - if pass, _ := ev.PassesFieldFilter(acntPrefxFltr); !pass { - t.Error("Not passing valid filter") - } - body = `Event-Name: RE_SCHEDULE -Core-UUID: 792e181c-b6e6-499c-82a1-52a778e7d82d -FreeSWITCH-Hostname: h1.ip-switch.net -FreeSWITCH-Switchname: h1.ip-switch.net -FreeSWITCH-IPv4: 88.198.12.156 -Caller-Username: futurem00005` - ev = new(FSEvent).AsEvent(body) - if pass, _ := ev.PassesFieldFilter(acntPrefxFltr); pass { - t.Error("Should not pass filter") - } - body = `Event-Name: RE_SCHEDULE -Core-UUID: 792e181c-b6e6-499c-82a1-52a778e7d82d -FreeSWITCH-Hostname: h1.ip-switch.net -FreeSWITCH-Switchname: h1.ip-switch.net -FreeSWITCH-IPv4: 88.198.12.156 -Caller-Username: 0402129281` - ev = new(FSEvent).AsEvent(body) - acntPrefxFltr, _ = utils.NewRSRField(`~Account:s/^0\d{9}$//`) - if pass, _ := ev.PassesFieldFilter(acntPrefxFltr); !pass { - t.Error("Not passing valid filter") - } - acntPrefxFltr, _ = utils.NewRSRField(`~account:s/^0(\d{9})$/placeholder/`) - if pass, _ := ev.PassesFieldFilter(acntPrefxFltr); pass { - t.Error("Should not pass filter") - } - body = `Event-Name: RE_SCHEDULE -Core-UUID: 792e181c-b6e6-499c-82a1-52a778e7d82d -FreeSWITCH-Hostname: h1.ip-switch.net -FreeSWITCH-Switchname: h1.ip-switch.net -FreeSWITCH-IPv4: 88.198.12.156 -Caller-Username: 04021292812` - ev = new(FSEvent).AsEvent(body) - if pass, _ := ev.PassesFieldFilter(acntPrefxFltr); pass { - t.Error("Should not pass filter") - } -} -*/ - func TestFsEvAsCDR(t *testing.T) { cfg, _ := config.NewDefaultCGRConfig() config.SetCgrConfig(cfg) - ev := new(FSEvent).AsEvent(hangupEv) + ev := NewFSEvent(hangupEv) setupTime, _ := utils.ParseTimeDetectLayout("1436280728", "") aTime, _ := utils.ParseTimeDetectLayout("1436280728", "") eStoredCdr := &engine.CDR{CGRID: "164b0422fdc6a5117031b427439482c6a4f90e41", @@ -630,10 +579,15 @@ func TestFsEvAsCDR(t *testing.T) { func TestFsEvGetExtraFields(t *testing.T) { cfg, _ := config.NewDefaultCGRConfig() - cfg.SmFsConfig.ExtraFields = []*utils.RSRField{&utils.RSRField{Id: "Channel-Read-Codec-Name"}, &utils.RSRField{Id: "Channel-Write-Codec-Name"}, &utils.RSRField{Id: "NonExistingHeader"}} + cfg.SmFsConfig.ExtraFields = []*utils.RSRField{ + &utils.RSRField{Id: "Channel-Read-Codec-Name"}, + &utils.RSRField{Id: "Channel-Write-Codec-Name"}, + &utils.RSRField{Id: "NonExistingHeader"}} config.SetCgrConfig(cfg) - ev := new(FSEvent).AsEvent(hangupEv) - expectedExtraFields := map[string]string{"Channel-Read-Codec-Name": "SPEEX", "Channel-Write-Codec-Name": "SPEEX", "NonExistingHeader": ""} + ev := NewFSEvent(hangupEv) + expectedExtraFields := map[string]string{ + "Channel-Read-Codec-Name": "SPEEX", + "Channel-Write-Codec-Name": "SPEEX", "NonExistingHeader": ""} if extraFields := ev.GetExtraFields(); !reflect.DeepEqual(expectedExtraFields, extraFields) { t.Errorf("Expecting: %+v, received: %+v", expectedExtraFields, extraFields) } diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 29b03ec50..59b806a97 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -21,30 +21,20 @@ package sessionmanager import ( "errors" "fmt" - "reflect" - "strconv" - "strings" "time" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/cgrates/fsock" - "github.com/cgrates/rpcclient" ) -func NewFSSessionManager(smFsConfig *config.SmFsConfig, rater, cdrs, rls rpcclient.RpcClientConnection, timezone string) *FSSessionManager { - if rls != nil && reflect.ValueOf(rls).IsNil() { - rls = nil - } +func NewFSSessionManager(smFsConfig *config.SmFsConfig, + smg *utils.BiRPCInternalClient, timezone string) *FSSessionManager { return &FSSessionManager{ cfg: smFsConfig, conns: make(map[string]*fsock.FSock), senderPools: make(map[string]*fsock.FSockPool), - rater: rater, - cdrsrv: cdrs, - rls: rls, - sessions: NewSessions(), + smg: smg, timezone: timezone, } } @@ -55,22 +45,18 @@ type FSSessionManager struct { cfg *config.SmFsConfig conns map[string]*fsock.FSock // Keep the list here for connection management purposes senderPools map[string]*fsock.FSockPool // Keep sender pools here - rater rpcclient.RpcClientConnection - cdrsrv rpcclient.RpcClientConnection - rls rpcclient.RpcClientConnection - - sessions *Sessions - timezone string + smg *utils.BiRPCInternalClient + timezone string } func (sm *FSSessionManager) createHandlers() map[string][]func(string, string) { ca := func(body, connId string) { - ev := new(FSEvent).AsEvent(body) - sm.onChannelAnswer(ev, connId) + sm.onChannelAnswer( + NewFSEvent(body), connId) } ch := func(body, connId string) { - ev := new(FSEvent).AsEvent(body) - sm.onChannelHangupComplete(ev) + sm.onChannelHangupComplete( + NewFSEvent(body), connId) } handlers := map[string][]func(string, string){ "CHANNEL_ANSWER": []func(string, string){ca}, @@ -78,8 +64,8 @@ func (sm *FSSessionManager) createHandlers() map[string][]func(string, string) { } if sm.cfg.SubscribePark { cp := func(body, connId string) { - ev := new(FSEvent).AsEvent(body) - sm.onChannelPark(ev, connId) + sm.onChannelPark( + NewFSEvent(body), connId) } handlers["CHANNEL_PARK"] = []func(string, string){cp} } @@ -87,31 +73,37 @@ func (sm *FSSessionManager) createHandlers() map[string][]func(string, string) { } // Sets the call timeout valid of starting of the call -func (sm *FSSessionManager) setMaxCallDuration(uuid, connId string, maxDur time.Duration, destNr string) error { - // _, err := fsock.FS.SendApiCmd(fmt.Sprintf("sched_hangup +%d %s\n\n", int(maxDur.Seconds()), uuid)) +func (sm *FSSessionManager) setMaxCallDuration(uuid, connId string, + maxDur time.Duration, destNr string) error { if len(sm.cfg.EmptyBalanceContext) != 0 { - _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s execute_on_answer sched_transfer +%d %s XML %s\n\n", - uuid, int(maxDur.Seconds()), destNr, sm.cfg.EmptyBalanceContext)) + _, err := sm.conns[connId].SendApiCmd( + fmt.Sprintf("uuid_setvar %s execute_on_answer sched_transfer +%d %s XML %s\n\n", + uuid, int(maxDur.Seconds()), destNr, sm.cfg.EmptyBalanceContext)) if err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not transfer the call to empty balance context, error: <%s>, connId: %s", - err.Error(), connId)) + utils.Logger.Err( + fmt.Sprintf(" Could not transfer the call to empty balance context, error: <%s>, connId: %s", + err.Error(), connId)) return err } return nil } else if len(sm.cfg.EmptyBalanceAnnFile) != 0 { - if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("sched_broadcast +%d %s playback!manager_request::%s aleg\n\n", - int(maxDur.Seconds()), uuid, sm.cfg.EmptyBalanceAnnFile)); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not send uuid_broadcast to freeswitch, error: <%s>, connId: %s", - err.Error(), connId)) + if _, err := sm.conns[connId].SendApiCmd( + fmt.Sprintf("sched_broadcast +%d %s playback!manager_request::%s aleg\n\n", + int(maxDur.Seconds()), uuid, sm.cfg.EmptyBalanceAnnFile)); err != nil { + utils.Logger.Err( + fmt.Sprintf(" Could not send uuid_broadcast to freeswitch, error: <%s>, connId: %s", + err.Error(), connId)) return err } return nil } else { - _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s execute_on_answer sched_hangup +%d alloted_timeout\n\n", - uuid, int(maxDur.Seconds()))) + _, err := sm.conns[connId].SendApiCmd( + fmt.Sprintf("uuid_setvar %s execute_on_answer sched_hangup +%d alloted_timeout\n\n", + uuid, int(maxDur.Seconds()))) if err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not send sched_hangup command to freeswitch, error: <%s>, connId: %s", - err.Error(), connId)) + utils.Logger.Err( + fmt.Sprintf(" Could not send sched_hangup command to freeswitch, error: <%s>, connId: %s", + err.Error(), connId)) return err } return nil @@ -119,179 +111,132 @@ func (sm *FSSessionManager) setMaxCallDuration(uuid, connId string, maxDur time. return nil } -// Queries LCR and sets the cgr_lcr channel variable -func (sm *FSSessionManager) setCgrLcr(ev engine.Event, connId string) error { - var lcrCost engine.LCRCost - startTime, err := ev.GetSetupTime(utils.META_DEFAULT, sm.timezone) - if err != nil { - return err - } - cd := &engine.CallDescriptor{ - CgrID: ev.GetCgrId(sm.Timezone()), - Direction: utils.OUT, - Tenant: ev.GetTenant(utils.META_DEFAULT), - Category: ev.GetCategory(utils.META_DEFAULT), - Subject: ev.GetSubject(utils.META_DEFAULT), - Account: ev.GetAccount(utils.META_DEFAULT), - Destination: ev.GetDestination(utils.META_DEFAULT), - TimeStart: startTime, - TimeEnd: startTime.Add(config.CgrConfig().MaxCallDuration), - } - if err := sm.rater.Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcrCost); err != nil { - return err - } - supps := []string{} - for _, supplCost := range lcrCost.SupplierCosts { - if dtcs, err := utils.NewDTCSFromRPKey(supplCost.Supplier); err != nil { - return err - } else if len(dtcs.Subject) != 0 { - supps = append(supps, dtcs.Subject) - } - } - fsArray := SliceAsFsArray(supps) - if _, err = sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", ev.GetUUID(), fsArray)); err != nil { - return err - } - return nil -} - -func (sm *FSSessionManager) onChannelPark(ev engine.Event, connId string) { - fsev := ev.(FSEvent) - if fsev[IGNOREPARK] == "true" { // Not for us - return - } - if ev.GetReqType(utils.META_DEFAULT) != utils.META_NONE { // Do not process this request - var maxCallDuration float64 // This will be the maximum duration this channel will be allowed to last - if err := sm.rater.Call("Responder.GetDerivedMaxSessionTime", - ev.AsCDR(config.CgrConfig().DefaultTimezone), &maxCallDuration); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not get max session time for %s, error: %s", - ev.GetUUID(), err.Error())) - } - if maxCallDuration != -1 { // For calls different than unlimited, set limits - maxCallDur := time.Duration(maxCallDuration) - if maxCallDur <= sm.cfg.MinCallDuration { - //utils.Logger.Info(fmt.Sprintf("Not enough credit for trasferring the call %s for %s.", ev.GetUUID(), cd.GetKey(cd.Subject))) - sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), INSUFFICIENT_FUNDS) - return - } - sm.setMaxCallDuration(ev.GetUUID(), connId, maxCallDur, ev.GetCallDestNr(utils.META_DEFAULT)) - } - } - // ComputeLcr - if ev.ComputeLcr() { - cd, err := fsev.AsCallDescriptor() - cd.CgrID = fsev.GetCgrId(sm.Timezone()) - if err != nil { - utils.Logger.Info(fmt.Sprintf(" LCR_PREPROCESS_ERROR: %s", err.Error())) - sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) - return - } - var lcr engine.LCRCost - if err = sm.Rater().Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil { - utils.Logger.Info(fmt.Sprintf(" LCR_API_ERROR: %s", err.Error())) - sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) - return - } - if lcr.HasErrors() { - lcr.LogErrors() - sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) - return - } - if supps, err := lcr.SuppliersSlice(); err != nil { - utils.Logger.Info(fmt.Sprintf(" LCR_ERROR: %s", err.Error())) - sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) - return - } else { - fsArray := SliceAsFsArray(supps) - if _, err = sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s %s %s\n\n", - ev.GetUUID(), utils.CGR_SUPPLIERS, fsArray)); err != nil { - utils.Logger.Info(fmt.Sprintf(" LCR_ERROR: %s", err.Error())) - sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) - return - } - } - } - if sm.rls != nil { - var reply string - attrRU := utils.ArgRSv1ResourceUsage{ - CGREvent: utils.CGREvent{ - Tenant: ev.(FSEvent).GetTenant(utils.META_DEFAULT), - Event: ev.(FSEvent).AsMapStringInterface(sm.timezone), - }, - UsageID: ev.GetUUID(), - Units: 1, - } - if err := sm.rls.Call(utils.ResourceSv1AllocateResource, attrRU, &reply); err != nil { - if err.Error() == utils.ErrResourceUnavailable.Error() { - sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), "-"+utils.ErrResourceUnavailable.Error()) - } else { - utils.Logger.Err(fmt.Sprintf(" RLs API error: %s", err.Error())) - sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) - } - return - } - } - sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), AUTH_OK) -} - // Sends the transfer command to unpark the call to freeswitch -func (sm *FSSessionManager) unparkCall(uuid, connId, call_dest_nb, notify string) { - _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify)) +func (sm *FSSessionManager) unparkCall(uuid, connId, call_dest_nb, notify string) (err error) { + _, err = sm.conns[connId].SendApiCmd( + fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify)) if err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not send unpark api notification to freeswitch, error: <%s>, connId: %s", - err.Error(), connId)) + utils.Logger.Err( + fmt.Sprintf(" Could not send unpark api notification to freeswitch, error: <%s>, connId: %s", + err.Error(), connId)) + return } - if _, err = sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_transfer %s %s\n\n", uuid, call_dest_nb)); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not send unpark api call to freeswitch, error: <%s>, connId: %s", - err.Error(), connId)) + if _, err = sm.conns[connId].SendApiCmd( + fmt.Sprintf("uuid_transfer %s %s\n\n", uuid, call_dest_nb)); err != nil { + utils.Logger.Err( + fmt.Sprintf(" Could not send unpark api call to freeswitch, error: <%s>, connId: %s", + err.Error(), connId)) + } + return +} + +func (sm *FSSessionManager) onChannelPark(fsev FSEvent, connId string) { + if fsev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Not for us + return + } + authArgs := fsev.V1AuthorizeArgs() + var authReply V1AuthorizeReply + if err := sm.smg.Call(utils.SMGv1AuthorizeEvent, authArgs, &authReply); err != nil { + utils.Logger.Err( + fmt.Sprintf(" Could not authorize event %s, error: %s", + fsev.GetUUID(), err.Error())) + sm.unparkCall(fsev.GetUUID(), connId, + fsev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) + return + } + if authArgs.GetMaxUsage { + if authReply.MaxUsage != -1 { // For calls different than unlimited, set limits + if authReply.MaxUsage == 0 { + sm.unparkCall(fsev.GetUUID(), connId, + fsev.GetCallDestNr(utils.META_DEFAULT), INSUFFICIENT_FUNDS) + return + } + sm.setMaxCallDuration(fsev.GetUUID(), connId, + authReply.MaxUsage, fsev.GetCallDestNr(utils.META_DEFAULT)) + } + } + if authArgs.CheckResources { + if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s %s %b\n\n", + fsev.GetUUID(), CGRResourcesAllowed, authReply.ResourcesAllowed)); err != nil { + utils.Logger.Info( + fmt.Sprintf("<%s> error %s setting channel variabile: %s", + utils.SMFreeSWITCH, err.Error(), CGRResourcesAllowed)) + sm.unparkCall(fsev.GetUUID(), connId, + fsev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) + return + } + } + if authArgs.GetSuppliers { + fsArray := SliceAsFsArray(authReply.Suppliers.SupplierIDs()) + if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s %s %s\n\n", + fsev.GetUUID(), utils.CGR_SUPPLIERS, fsArray)); err != nil { + utils.Logger.Info(fmt.Sprintf(" LCR_ERROR: %s", err.Error())) + sm.unparkCall(fsev.GetUUID(), connId, fsev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) + return + } + } + if authArgs.GetAttributes { + if authReply.Attributes != nil { + for _, fldName := range authReply.Attributes.AlteredFields { + if _, err := sm.conns[connId].SendApiCmd( + fmt.Sprintf("uuid_setvar %s %s %s\n\n", fsev.GetUUID(), fldName, + authReply.Attributes.CGREvent.Event[fldName])); err != nil { + utils.Logger.Info( + fmt.Sprintf("<%s> error %s setting channel variabile: %s", + utils.SMFreeSWITCH, err.Error(), fldName)) + sm.unparkCall(fsev.GetUUID(), connId, + fsev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) + return + } + } + } + } + sm.unparkCall(fsev.GetUUID(), connId, + fsev.GetCallDestNr(utils.META_DEFAULT), AUTH_OK) +} + +func (sm *FSSessionManager) onChannelAnswer(fsev FSEvent, connId string) { + if fsev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request + return + } + if fsev.MissingParameter(sm.timezone) { + sm.DisconnectSession(fsev, connId, MISSING_PARAMETER) + } + initSessionArgs := fsev.V1InitSessionArgs() + var initReply V1InitSessionReply + if err := sm.smg.Call(utils.SMGv1InitiateSession, + initSessionArgs, &initReply); err != nil { + utils.Logger.Err( + fmt.Sprintf(" Could not answer session with event %s, error: %s", + fsev.GetUUID(), err.Error())) + sm.DisconnectSession(fsev, connId, SYSTEM_ERROR) + return + } + if initSessionArgs.AllocateResources { + if initReply.ResAllocMessage == "" { + sm.DisconnectSession(fsev, connId, + utils.ErrUnallocatedResource.Error()) + } } } -func (sm *FSSessionManager) onChannelAnswer(ev engine.Event, connId string) { - if ev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request +func (sm *FSSessionManager) onChannelHangupComplete(fsev FSEvent, connId string) { + if fsev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request return } - if ev.MissingParameter(sm.timezone) { - sm.DisconnectSession(ev, connId, MISSING_PARAMETER) - } - s := NewSession(ev, connId, sm) - if s != nil { - sm.sessions.indexSession(s) - } -} - -func (sm *FSSessionManager) onChannelHangupComplete(ev engine.Event) { - if ev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request - return - } - var s *Session - for i := 0; i < 2; i++ { // Protect us against concurrency, wait a couple of seconds for the answer to be populated before we process hangup - s = sm.sessions.getSession(ev.GetUUID()) - if s != nil { - break - } - time.Sleep(time.Duration(i+1) * time.Second) - } - if s != nil { // Handled by us, cleanup here - if err := sm.sessions.removeSession(s, ev); err != nil { - utils.Logger.Err(err.Error()) - } - } - if sm.cfg.CreateCdr { - sm.ProcessCdr(ev.AsCDR(config.CgrConfig().DefaultTimezone)) - } var reply string - attrRU := utils.ArgRSv1ResourceUsage{ - CGREvent: utils.CGREvent{ - Tenant: ev.(FSEvent).GetTenant(utils.META_DEFAULT), - Event: ev.(FSEvent).AsMapStringInterface(sm.timezone), - }, - UsageID: ev.GetUUID(), - Units: 1, + if err := sm.smg.Call(utils.SMGv1TerminateSession, + fsev.V1TerminateSessionArgs(), &reply); err != nil { + utils.Logger.Err( + fmt.Sprintf(" Could not terminate session with event %s, error: %s", + fsev.GetUUID(), err.Error())) + return } - if sm.rls != nil { - if err := sm.rls.Call(utils.ResourceSv1ReleaseResource, attrRU, &reply); err != nil { - utils.Logger.Err(fmt.Sprintf(" RLs API error: %s", err.Error())) + if sm.cfg.CreateCdr { + cdr := fsev.AsCDR(sm.timezone) + if err := sm.smg.Call(utils.SMGv1ProcessCDR, cdr, &reply); err != nil { + utils.Logger.Err(fmt.Sprintf(" Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", + cdr.CGRID, cdr.OriginID, err.Error())) } } } @@ -325,24 +270,15 @@ func (sm *FSSessionManager) Connect() error { } else { sm.senderPools[connId] = fsSenderPool } - if sm.cfg.ChannelSyncInterval != 0 { // Schedule running of the callsync - go func() { - for { // Schedule sync channels to run repetately - time.Sleep(sm.cfg.ChannelSyncInterval) - sm.SyncSessions() - } - - }() - } } err := <-errChan // Will keep the Connect locked until the first error in one of the connections return err } // Disconnects a session by sending hangup command to freeswitch -func (sm *FSSessionManager) DisconnectSession(ev engine.Event, connId, notify string) error { +func (sm *FSSessionManager) DisconnectSession(fsev FSEvent, connId, notify string) error { if _, err := sm.conns[connId].SendApiCmd( - fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", ev.GetUUID(), notify)); err != nil { + fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", fsev.GetUUID(), notify)); err != nil { utils.Logger.Err(fmt.Sprintf(" Could not send disconect api notification to freeswitch, error: <%s>, connId: %s", err.Error(), connId)) return err @@ -350,7 +286,7 @@ func (sm *FSSessionManager) DisconnectSession(ev engine.Event, connId, notify st if notify == INSUFFICIENT_FUNDS { if len(sm.cfg.EmptyBalanceContext) != 0 { if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_transfer %s %s XML %s\n\n", - ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), sm.cfg.EmptyBalanceContext)); err != nil { + fsev.GetUUID(), fsev.GetCallDestNr(utils.META_DEFAULT), sm.cfg.EmptyBalanceContext)); err != nil { utils.Logger.Err(fmt.Sprintf(" Could not transfer the call to empty balance context, error: <%s>, connId: %s", err.Error(), connId)) return err @@ -358,7 +294,7 @@ func (sm *FSSessionManager) DisconnectSession(ev engine.Event, connId, notify st return nil } else if len(sm.cfg.EmptyBalanceAnnFile) != 0 { if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_broadcast %s playback!manager_request::%s aleg\n\n", - ev.GetUUID(), sm.cfg.EmptyBalanceAnnFile)); err != nil { + fsev.GetUUID(), sm.cfg.EmptyBalanceAnnFile)); err != nil { utils.Logger.Err(fmt.Sprintf(" Could not send uuid_broadcast to freeswitch, error: <%s>, connId: %s", err.Error(), connId)) return err @@ -366,51 +302,13 @@ func (sm *FSSessionManager) DisconnectSession(ev engine.Event, connId, notify st return nil } } - if err := sm.conns[connId].SendMsgCmd(ev.GetUUID(), map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}); err != nil { + if err := sm.conns[connId].SendMsgCmd(fsev.GetUUID(), map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}); err != nil { utils.Logger.Err(fmt.Sprintf(" Could not send disconect msg to freeswitch, error: <%s>, connId: %s", err.Error(), connId)) return err } return nil } -func (sm *FSSessionManager) ProcessCdr(storedCdr *engine.CDR) error { - var reply string - if err := sm.cdrsrv.Call("CdrsV1.ProcessCDR", storedCdr, &reply); err != nil { - utils.Logger.Err(fmt.Sprintf(" Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", - storedCdr.CGRID, storedCdr.OriginID, err.Error())) - } - return nil -} - -func (sm *FSSessionManager) DebitInterval() time.Duration { - return sm.cfg.DebitInterval -} - -func (sm *FSSessionManager) CdrSrv() rpcclient.RpcClientConnection { - return sm.cdrsrv -} - -func (sm *FSSessionManager) Rater() rpcclient.RpcClientConnection { - return sm.rater -} - -func (sm *FSSessionManager) Sessions() []*Session { - return sm.sessions.getSessions() -} - -func (sm *FSSessionManager) Timezone() string { - return sm.timezone -} - -// Called when call goes under the minimum duratio threshold, so FreeSWITCH can play an announcement message -func (sm *FSSessionManager) WarnSessionMinDuration(sessionUuid, connId string) { - if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_broadcast %s %s aleg\n\n", - sessionUuid, sm.cfg.LowBalanceAnnFile)); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not send uuid_broadcast to freeswitch, error: %s, connection id: %s", - err.Error(), connId)) - } -} - func (sm *FSSessionManager) Shutdown() (err error) { for connId, fSock := range sm.conns { if !fSock.Connected() { @@ -422,76 +320,5 @@ func (sm *FSSessionManager) Shutdown() (err error) { utils.Logger.Err(fmt.Sprintf(" Error on calls shutdown: %s, connection id: %s", err.Error(), connId)) } } - for i := 0; len(sm.sessions.getSessions()) > 0 && i < 20; i++ { - time.Sleep(100 * time.Millisecond) // wait for the hungup event to be fired - utils.Logger.Info(fmt.Sprintf(" Shutdown waiting on sessions: %v", sm.sessions)) - } - return nil -} - -// Sync sessions with FS -/* -map[secure: hostname:CgrDev1 callstate:ACTIVE callee_num:1002 initial_dest:1002 state:CS_EXECUTE dialplan:XML read_codec:SPEEX initial_ip_addr:127.0.0.1 write_codec:SPEEX write_bit_rate:44000 -call_uuid:3427e500-10e5-4864-a589-e306b70419a2 presence_id: initial_cid_name:1001 context:default read_rate:32000 read_bit_rate:44000 callee_direction:SEND initial_context:default created:2015-06-15 18:48:13 -dest:1002 callee_name:Outbound Call direction:inbound ip_addr:127.0.0.1 sent_callee_name:Outbound Call write_rate:32000 presence_data: sent_callee_num:1002 created_epoch:1434386893 cid_name:1001 application:sched_hangup -application_data:+10800 alloted_timeout uuid:3427e500-10e5-4864-a589-e306b70419a2 name:sofia/cgrtest/1001@127.0.0.1 cid_num:1001 initial_cid_num:1001 initial_dialplan:XML] -*/ -func (sm *FSSessionManager) SyncSessions() error { - for connId, senderPool := range sm.senderPools { - var aChans []map[string]string - fsConn, err := senderPool.PopFSock() - if err != nil { - if err == fsock.ErrConnectionPoolTimeout { // Timeout waiting for connections to re-establish, cleanup calls - aChans = make([]map[string]string, 0) // Emulate no call information so we can disconnect bellow - } else { - utils.Logger.Err(fmt.Sprintf(" Error on syncing active calls, senderPool: %+v, error: %s", - senderPool, err.Error())) - continue - } - } else { - activeChanStr, err := fsConn.SendApiCmd("show channels") - senderPool.PushFSock(fsConn) - if err != nil { - utils.Logger.Err(fmt.Sprintf(" Error on syncing active calls, senderPool: %+v, error: %s", - senderPool, err.Error())) - continue - } - aChans = fsock.MapChanData(activeChanStr) - if len(aChans) == 0 && strings.HasPrefix(activeChanStr, "uuid,direction") { // Failed converting output from FS - utils.Logger.Err(fmt.Sprintf(" Syncing active calls, failed converting output from FS: %s", - activeChanStr)) - continue - } - } - for _, session := range sm.sessions.getSessions() { - if session.connId != connId { // This session belongs to another connectionId - continue - } - var stillActive bool - for _, fsAChan := range aChans { - if fsAChan["call_uuid"] == session.eventStart.GetUUID() || - (fsAChan["call_uuid"] == "" && fsAChan["uuid"] == session.eventStart.GetUUID()) { // Channel still active - stillActive = true - break - } - } - if stillActive { // No need to do anything since the channel is still there - continue - } - utils.Logger.Warning(fmt.Sprintf(" Sync active channels, stale session detected, uuid: %s", - session.eventStart.GetUUID())) - fsev := session.eventStart.(FSEvent) - now := time.Now() - aTime, _ := fsev.GetAnswerTime("", sm.timezone) - dur := now.Sub(aTime) - fsev[END_TIME] = now.String() - fsev[DURATION] = strconv.FormatFloat(dur.Seconds(), 'f', -1, 64) - if err := sm.sessions.removeSession(session, fsev); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database - utils.Logger.Err(fmt.Sprintf(" Error on removing stale session with uuid: %s, error: %s", - session.eventStart.GetUUID(), err.Error())) - continue - } - } - } - return nil + return } diff --git a/sessionmanager/fssessionmanager_test.go b/sessionmanager/fssessionmanager_test.go deleted file mode 100644 index ef917d26f..000000000 --- a/sessionmanager/fssessionmanager_test.go +++ /dev/null @@ -1,26 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ -package sessionmanager - -import ( - "testing" -) - -func TestFSSMInterface(t *testing.T) { - var _ SessionManager = SessionManager(new(FSSessionManager)) -} diff --git a/sessionmanager/kamailiosm.go b/sessionmanager/kamailiosm.go index f3230d8d4..4aca0c369 100644 --- a/sessionmanager/kamailiosm.go +++ b/sessionmanager/kamailiosm.go @@ -89,7 +89,7 @@ func (self *KamailioSessionManager) allocateResources(kev KamEvent) (err error) Units: 1, // One channel reserved } var reply string - return self.rlS.Call(utils.ResourceSv1AllocateResource, attrRU, &reply) + return self.rlS.Call(utils.ResourceSv1AllocateResources, attrRU, &reply) } func (self *KamailioSessionManager) onCgrAuth(evData []byte, connId string) { @@ -224,7 +224,7 @@ func (self *KamailioSessionManager) onCallEnd(evData []byte, connId string) { UsageID: kev.GetUUID(), Units: 1, } - if err := self.rlS.Call(utils.ResourceSv1ReleaseResource, attrRU, &reply); err != nil { + if err := self.rlS.Call(utils.ResourceSv1ReleaseResources, attrRU, &reply); err != nil { utils.Logger.Err(fmt.Sprintf(" RLs API error: %s", err.Error())) } }() diff --git a/sessionmanager/session_test.go b/sessionmanager/session_test.go deleted file mode 100644 index 23e0e7c77..000000000 --- a/sessionmanager/session_test.go +++ /dev/null @@ -1,180 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ -package sessionmanager - -import ( - "testing" - "time" - - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" -) - -//"github.com/cgrates/cgrates/config" -//"testing" - -var ( - newEventBody = ` -"Event-Name": "HEARTBEAT", -"Core-UUID": "d5abc5b0-95c6-11e1-be05-43c90197c914", -"FreeSWITCH-Hostname": "grace", -"FreeSWITCH-Switchname": "grace", -"FreeSWITCH-IPv4": "172.17.77.126", -"variable_sip_full_from": "rif", -"variable_cgr_account": "rif", -"variable_sip_full_to": "0723045326", -"Caller-Dialplan": "vdf", -"FreeSWITCH-IPv6": "::1", -"Event-Date-Local": "2012-05-04 14:38:23", -"Event-Date-GMT": "Fri, 03 May 2012 11:38:23 GMT", -"Event-Date-Timestamp": "1336131503218867", -"Event-Calling-File": "switch_core.c", -"Event-Calling-Function": "send_heartbeat", -"Event-Calling-Line-Number": "68", -"Event-Sequence": "4171", -"Event-Info": "System Ready", -"Up-Time": "0 years, 0 days, 2 hours, 43 minutes, 21 seconds, 349 milliseconds, 683 microseconds", -"Session-Count": "0", -"Max-Sessions": "1000", -"Session-Per-Sec": "30", -"Session-Since-Startup": "122", -"Idle-CPU": "100.000000" -` - conf_data = []byte(` -### Test data, not for production usage - -[global] -default_reqtype= -`) -) - -/* Missing parameter is not longer tested in NewSession. ToDo: expand this test for more util information -func TestSessionNilSession(t *testing.T) { - var errCfg error - cfg, errCfg = config.NewCGRConfigBytes(conf_data) // Needed here to avoid nil on cfg variable - if errCfg != nil { - t.Errorf("Cannot get configuration %v", errCfg) - } - newEvent := new(FSEvent).New("") - sm := &FSSessionManager{} - s := NewSession(newEvent, sm) - if s != nil { - t.Error("no account and it still created session.") - } -} -*/ - -type MockRpcClient struct { - refundCd *engine.CallDescriptor -} - -func (mc *MockRpcClient) Call(methodName string, arg interface{}, reply interface{}) error { - if cd, ok := arg.(*engine.CallDescriptor); ok { - mc.refundCd = cd - } - return nil -} -func (mc *MockRpcClient) GetCost(*engine.CallDescriptor, *engine.CallCost) error { return nil } -func (mc *MockRpcClient) Debit(*engine.CallDescriptor, *engine.CallCost) error { return nil } -func (mc *MockRpcClient) MaxDebit(*engine.CallDescriptor, *engine.CallCost) error { return nil } -func (mc *MockRpcClient) RefundIncrements(cd *engine.CallDescriptor, reply *float64) error { - mc.refundCd = cd - return nil -} -func (mc *MockRpcClient) RefundRounding(cd *engine.CallDescriptor, reply *float64) error { - return nil -} -func (mc *MockRpcClient) GetMaxSessionTime(*engine.CallDescriptor, *float64) error { return nil } -func (mc *MockRpcClient) GetDerivedChargers(*utils.AttrDerivedChargers, *utils.DerivedChargers) error { - return nil -} -func (mc *MockRpcClient) GetDerivedMaxSessionTime(*engine.CDR, *float64) error { return nil } -func (mc *MockRpcClient) GetSessionRuns(*engine.CDR, *[]*engine.SessionRun) error { return nil } -func (mc *MockRpcClient) ProcessCdr(*engine.CDR, *string) error { return nil } -func (mc *MockRpcClient) StoreSMCost(engine.AttrCDRSStoreSMCost, *string) error { return nil } -func (mc *MockRpcClient) GetLCR(*engine.AttrGetLcr, *engine.LCRCost) error { return nil } -func (mc *MockRpcClient) GetTimeout(int, *time.Duration) error { return nil } - -func TestSessionRefund(t *testing.T) { - mc := &MockRpcClient{} - s := &Session{sessionManager: &FSSessionManager{rater: mc, timezone: time.UTC.String()}, eventStart: FSEvent{SETUP_TIME: time.Now().Format(time.RFC3339)}} - ts := &engine.TimeSpan{ - TimeStart: time.Date(2015, 6, 10, 14, 7, 0, 0, time.UTC), - TimeEnd: time.Date(2015, 6, 10, 14, 7, 30, 0, time.UTC), - } - // add increments - for i := 0; i < 30; i++ { - ts.AddIncrement(&engine.Increment{Duration: time.Second, Cost: 1.0}) - } - - cc := &engine.CallCost{Timespans: engine.TimeSpans{ts}} - hangupTime := time.Date(2015, 6, 10, 14, 7, 20, 0, time.UTC) - s.Refund(cc, hangupTime) - if len(mc.refundCd.Increments) != 1 || mc.refundCd.Increments[0].GetCompressFactor() != 10 || len(cc.Timespans) != 1 || cc.Timespans[0].TimeEnd != hangupTime { - t.Errorf("Error refunding: %+v, %+v", mc.refundCd.Increments, cc.Timespans[0]) - } -} - -func TestSessionRefundAll(t *testing.T) { - mc := &MockRpcClient{} - s := &Session{sessionManager: &FSSessionManager{rater: mc, timezone: time.UTC.String()}, eventStart: FSEvent{SETUP_TIME: time.Now().Format(time.RFC3339)}} - ts := &engine.TimeSpan{ - TimeStart: time.Date(2015, 6, 10, 14, 7, 0, 0, time.UTC), - TimeEnd: time.Date(2015, 6, 10, 14, 7, 30, 0, time.UTC), - } - // add increments - for i := 0; i < 30; i++ { - ts.AddIncrement(&engine.Increment{Duration: time.Second, Cost: 1.0}) - } - - cc := &engine.CallCost{Timespans: engine.TimeSpans{ts}} - hangupTime := time.Date(2015, 6, 10, 14, 7, 0, 0, time.UTC) - s.Refund(cc, hangupTime) - if len(mc.refundCd.Increments) != 1 || mc.refundCd.Increments[0].GetCompressFactor() != 30 || len(cc.Timespans) != 0 { - t.Errorf("Error refunding: %+v, %+v", len(mc.refundCd.Increments), cc.Timespans) - } -} - -func TestSessionRefundManyAll(t *testing.T) { - mc := &MockRpcClient{} - s := &Session{sessionManager: &FSSessionManager{rater: mc, timezone: time.UTC.String()}, eventStart: FSEvent{SETUP_TIME: time.Now().Format(time.RFC3339)}} - ts1 := &engine.TimeSpan{ - TimeStart: time.Date(2015, 6, 10, 14, 7, 0, 0, time.UTC), - TimeEnd: time.Date(2015, 6, 10, 14, 7, 30, 0, time.UTC), - } - // add increments - for i := 0; i < int(ts1.GetDuration().Seconds()); i++ { - ts1.AddIncrement(&engine.Increment{Duration: time.Second, Cost: 1.0}) - } - - ts2 := &engine.TimeSpan{ - TimeStart: time.Date(2015, 6, 10, 14, 7, 30, 0, time.UTC), - TimeEnd: time.Date(2015, 6, 10, 14, 8, 0, 0, time.UTC), - } - // add increments - for i := 0; i < int(ts2.GetDuration().Seconds()); i++ { - ts2.AddIncrement(&engine.Increment{Duration: time.Second, Cost: 1.0}) - } - - cc := &engine.CallCost{Timespans: engine.TimeSpans{ts1, ts2}} - hangupTime := time.Date(2015, 6, 10, 14, 07, 0, 0, time.UTC) - s.Refund(cc, hangupTime) - if len(mc.refundCd.Increments) != 1 || mc.refundCd.Increments[0].GetCompressFactor() != 60 || len(cc.Timespans) != 0 { - t.Errorf("Error refunding: %+v, %+v", len(mc.refundCd.Increments), cc.Timespans) - } -} diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index a5abd352d..9f1935616 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -63,12 +63,15 @@ type SMGReplicationConn struct { Synchronous bool } -func NewSMGeneric(cgrCfg *config.CGRConfig, rals rpcclient.RpcClientConnection, cdrsrv rpcclient.RpcClientConnection, +func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS, + splS, attrS, cdrsrv rpcclient.RpcClientConnection, smgReplConns []*SMGReplicationConn, timezone string) *SMGeneric { ssIdxCfg := cgrCfg.SmGenericConfig.SessionIndexes ssIdxCfg[utils.OriginID] = true // Make sure we have indexing for OriginID since it is a requirement on prefix searching return &SMGeneric{cgrCfg: cgrCfg, rals: rals, + resS: resS, + attrS: attrS, cdrsrv: cdrsrv, smgReplConns: smgReplConns, Timezone: timezone, @@ -84,10 +87,13 @@ func NewSMGeneric(cgrCfg *config.CGRConfig, rals rpcclient.RpcClientConnection, } type SMGeneric struct { - cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple - rals rpcclient.RpcClientConnection - cdrsrv rpcclient.RpcClientConnection - smgReplConns []*SMGReplicationConn // list of connections where we will replicate our session data + cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple + rals rpcclient.RpcClientConnection // RALs connections + resS rpcclient.RpcClientConnection // ResourceS connections + splS rpcclient.RpcClientConnection // SupplierS connections + attrS rpcclient.RpcClientConnection // AttributeS connections + cdrsrv rpcclient.RpcClientConnection // CDR server connections + smgReplConns []*SMGReplicationConn // list of connections where we will replicate our session data Timezone string activeSessions map[string][]*SMGSession // group sessions per sessionId, multiple runs based on derived charging aSessionsMux sync.RWMutex @@ -120,8 +126,8 @@ type smgSessionTerminator struct { // setSessionTerminator installs a new terminator for a session func (smg *SMGeneric) setSessionTerminator(s *SMGSession) { - ttl := s.EventStart.GetSessionTTL(smg.cgrCfg.SmGenericConfig.SessionTTL, - smg.cgrCfg.SmGenericConfig.SessionTTLMaxDelay) + ttl := s.EventStart.GetSessionTTL(smg.cgrCfg.SMGConfig.SessionTTL, + smg.cgrCfg.SMGConfig.SessionTTLMaxDelay) if ttl == 0 { return } @@ -383,9 +389,9 @@ func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, CD: sessionRun.CallDescriptor, clntConn: clntConn} smg.recordASession(s) //utils.Logger.Info(fmt.Sprintf(" Starting session: %s, runId: %s", sessionId, s.runId)) - if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { + if smg.cgrCfg.SMGConfig.DebitInterval != 0 { s.stopDebit = stopDebitChan - go s.debitLoop(smg.cgrCfg.SmGenericConfig.DebitInterval) + go s.debitLoop(smg.cgrCfg.SMGConfig.DebitInterval) } } return nil, nil @@ -465,7 +471,7 @@ func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) erro // replicateSessions will replicate session based on configuration func (smg *SMGeneric) replicateSessionsWithID(cgrID string, passiveSessions bool, smgReplConns []*SMGReplicationConn) (err error) { if len(smgReplConns) == 0 || - (smg.cgrCfg.SmGenericConfig.DebitInterval != 0 && !passiveSessions) { // Replicating active not supported + (smg.cgrCfg.SMGConfig.DebitInterval != 0 && !passiveSessions) { // Replicating active not supported return } ssMux := &smg.aSessionsMux @@ -712,7 +718,7 @@ func (smg *SMGeneric) InitiateSession(gev SMGenericEvent, clnt rpcclient.RpcClie smg.sessionEnd(cgrID, 0) return } - if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { // Session handled by debit loop + if smg.cgrCfg.SMGConfig.DebitInterval != 0 { // Session handled by debit loop maxUsage = time.Duration(-1 * time.Second) return } @@ -731,7 +737,7 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient return item.Value.(time.Duration), item.Err } defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Value: maxUsage, Err: err}) - if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { // Not possible to update a session with debit loop active + if smg.cgrCfg.SMGConfig.DebitInterval != 0 { // Not possible to update a session with debit loop active err = errors.New("ACTIVE_DEBIT_LOOP") return } @@ -748,8 +754,8 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient } smg.resetTerminatorTimer(cgrID, gev.GetSessionTTL( - smg.cgrCfg.SmGenericConfig.SessionTTL, - smg.cgrCfg.SmGenericConfig.SessionTTLMaxDelay), + smg.cgrCfg.SMGConfig.SessionTTL, + smg.cgrCfg.SMGConfig.SessionTTLMaxDelay), gev.GetSessionTTLLastUsed(), gev.GetSessionTTLUsage()) var lastUsed *time.Duration var evLastUsed time.Duration @@ -759,7 +765,7 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient return } if maxUsage, err = gev.GetMaxUsage(utils.META_DEFAULT, - smg.cgrCfg.SmGenericConfig.MaxCallDuration); err != nil { + smg.cgrCfg.SMGConfig.MaxCallDuration); err != nil { if err == utils.ErrNotFound { err = utils.ErrMandatoryIeMissing } @@ -1273,3 +1279,214 @@ func (smg *SMGeneric) BiRPCV1ReplicatePassiveSessions(clnt rpcclient.RpcClientCo *reply = utils.OK return } + +type V1AuthorizeArgs struct { + GetMaxUsage bool + CheckResources bool + GetSuppliers bool + GetAttributes bool + utils.CGREvent + utils.Paginator +} + +type V1AuthorizeReply struct { + MaxUsage time.Duration + ResourcesAllowed bool + Suppliers engine.SortedSuppliers + Attributes *engine.AttrSProcessEventReply +} + +// BiRPCV1Authorize performs authorization for CGREvent based on specific components +func (smg *SMGeneric) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection, + args *V1AuthorizeArgs, authReply *V1AuthorizeReply) (err error) { + if args.GetMaxUsage { + maxUsage, err := smg.GetMaxUsage(args.CGREvent.Event) + if err != nil { + return utils.NewErrServerError(err) + } + authReply.MaxUsage = maxUsage + } + if args.CheckResources { + originID, err := args.CGREvent.FieldAsString(utils.ACCID) + if err != nil { + return utils.NewErrServerError(err) + } + var allowed bool + attrRU := utils.ArgRSv1ResourceUsage{ + Tenant: args.CGREvent.Tenant, + UsageID: originID, + Event: args.CGREvent.Event, + Units: 1, + } + if err = smg.resS.Call(utils.ResourceSv1AllowUsage, + attrRU, &allowed); err != nil { + return err + } + authReply.ResourcesAllowed = allowed + } + if args.GetSuppliers { + var splsReply engine.SortedSuppliers + if err = smg.splS.Call(utils.SupplierSv1GetSuppliers, + args.CGREvent, &splsReply); err != nil { + return err + } + authReply.Suppliers = splsReply + } + if args.GetAttributes { + if args.CGREvent.Context == nil { // populate if not already in + args.CGREvent.Context = utils.StringPointer(utils.MetaSMG) + } + var rplyEv engine.AttrSProcessEventReply + if err = smg.attrS.Call(utils.AttributeSv1ProcessEvent, + args.CGREvent, &rplyEv); err != nil { + return + } + authReply.Attributes = &rplyEv + } + return nil +} + +type V1InitSessionArgs struct { + InitSession bool + AllocateResources bool + GetAttributes bool + utils.CGREvent +} + +type V1InitSessionReply struct { + MaxUsage time.Duration + ResAllocMessage string + Attributes *engine.AttrSProcessEventReply +} + +// BiRPCV2InitiateSession initiates a new session, returns the maximum duration the session can last +func (smg *SMGeneric) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection, + args *V1InitSessionArgs, rply *V1InitSessionReply) (err error) { + if args.AllocateResources { + originID, err := args.CGREvent.FieldAsString(utils.ACCID) + if err != nil { + return utils.NewErrServerError(err) + } + attrRU := utils.ArgRSv1ResourceUsage{ + Tenant: args.CGREvent.Tenant, + UsageID: originID, + Event: args.CGREvent.Event, + Units: 1, + } + var allocMessage string + if err = smg.resS.Call(utils.ResourceSv1AllocateResources, + attrRU, &allocMessage); err != nil { + return err + } + rply.ResAllocMessage = allocMessage + } + if args.InitSession { + if rply.MaxUsage, err = smg.InitiateSession(args.CGREvent.Event, clnt); err != nil { + if err != rpcclient.ErrSessionNotFound { + err = utils.NewErrServerError(err) + } + return + } + } + if args.GetAttributes { + if args.CGREvent.Context == nil { + args.CGREvent.Context = utils.StringPointer(utils.MetaSMG) + } + var rplyEv engine.AttrSProcessEventReply + if err = smg.attrS.Call(utils.AttributeSv1ProcessEvent, + args.CGREvent, &rplyEv); err != nil { + return + } + rply.Attributes = &rplyEv + } + return +} + +type V1UpdateSessionArgs struct { + UpdateSession bool + AllocateResources bool + utils.CGREvent +} + +type V1UpdateSessionReply struct { + MaxUsage time.Duration + ResAllocMessage string +} + +// BiRPCV1UpdateSession updates an existing session, returning the duration which the session can still last +func (smg *SMGeneric) BiRPCv1UpdateSession(clnt rpcclient.RpcClientConnection, + args *V1UpdateSessionArgs, rply *V1UpdateSessionReply) (err error) { + if args.UpdateSession { + if rply.MaxUsage, err = smg.UpdateSession(args.CGREvent.Event, clnt); err != nil { + if err != rpcclient.ErrSessionNotFound { + err = utils.NewErrServerError(err) + } + return + } + } + if args.AllocateResources { + originID, err := args.CGREvent.FieldAsString(utils.ACCID) + if err != nil { + return utils.NewErrServerError(err) + } + attrRU := utils.ArgRSv1ResourceUsage{ + Tenant: args.CGREvent.Tenant, + UsageID: originID, + Event: args.CGREvent.Event, + Units: 1, + } + var allocMessage string + if err = smg.resS.Call(utils.ResourceSv1AllocateResources, + attrRU, &allocMessage); err != nil { + return err + } + rply.ResAllocMessage = allocMessage + } + return +} + +type V1TerminateSessionArgs struct { + TerminateSession bool + ReleaseResources bool + utils.CGREvent +} + +// BiRPCV1TerminateSession will stop debit loops as well as release any used resources +func (smg *SMGeneric) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection, + args *V1TerminateSessionArgs, rply *string) (err error) { + if args.TerminateSession { + if err = smg.TerminateSession(args.CGREvent.Event, clnt); err != nil { + if err != rpcclient.ErrSessionNotFound { + err = utils.NewErrServerError(err) + } + return + } + } + if args.ReleaseResources { + originID, err := args.CGREvent.FieldAsString(utils.ACCID) + if err != nil { + return utils.NewErrServerError(err) + } + var reply string + argsRU := utils.ArgRSv1ResourceUsage{ + Tenant: args.CGREvent.Tenant, + UsageID: originID, // same ID should be accepted by first group since the previous resource should be expired + Event: args.CGREvent.Event, + } + if err = smg.resS.Call(utils.ResourceSv1ReleaseResources, + argsRU, &reply); err != nil { + return utils.NewErrServerError(err) + } + } + *rply = utils.OK + return +} + +// Called on session end, should send the CDR to CDRS +func (smg *SMGeneric) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection, cgrEv utils.CGREvent, reply *string) error { + if err := smg.ProcessCDR(cgrEv.Event); err != nil { + return utils.NewErrServerError(err) + } + *reply = utils.OK + return nil +} diff --git a/sessionmanager/smgeneric_test.go b/sessionmanager/smgeneric_test.go index fadf0e0e5..ae82cdbed 100644 --- a/sessionmanager/smgeneric_test.go +++ b/sessionmanager/smgeneric_test.go @@ -29,12 +29,12 @@ var smgCfg *config.CGRConfig func init() { smgCfg, _ = config.NewDefaultCGRConfig() - smgCfg.SmGenericConfig.SessionIndexes = utils.StringMap{"Tenant": true, + smgCfg.SMGConfig.SessionIndexes = utils.StringMap{"Tenant": true, "Account": true, "Extra3": true, "Extra4": true} } func TestSMGSessionIndexing(t *testing.T) { - smg := NewSMGeneric(smgCfg, nil, nil, nil, "UTC") + smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, "UTC") smGev := SMGenericEvent{ utils.EVENT_NAME: "TEST_EVENT", utils.TOR: "*voice", @@ -389,7 +389,7 @@ func TestSMGSessionIndexing(t *testing.T) { } func TestSMGActiveSessions(t *testing.T) { - smg := NewSMGeneric(smgCfg, nil, nil, nil, "UTC") + smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, "UTC") smGev1 := SMGenericEvent{ utils.EVENT_NAME: "TEST_EVENT", utils.TOR: "*voice", @@ -463,7 +463,7 @@ func TestSMGActiveSessions(t *testing.T) { } func TestGetPassiveSessions(t *testing.T) { - smg := NewSMGeneric(smgCfg, nil, nil, nil, "UTC") + smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, "UTC") if pSS := smg.getSessions("", true); len(pSS) != 0 { t.Errorf("PassiveSessions: %+v", pSS) } diff --git a/utils/birpcint_client.go b/utils/birpcint_client.go index 9a2edc477..b8612881a 100644 --- a/utils/birpcint_client.go +++ b/utils/birpcint_client.go @@ -26,6 +26,20 @@ import ( "github.com/cgrates/rpcclient" ) +// NewBiJSONrpcClient will create a bidirectional JSON client connection +func NewBiJSONrpcClient(addr string, handlers map[string]interface{}) (*rpc2.Client, error) { + conn, err := net.Dial("tcp", addr) + if err != nil { + return nil, err + } + clnt := rpc2.NewClientWithCodec(rpc2_jsonrpc.NewJSONCodec(conn)) + for method, handlerFunc := range handlers { + clnt.Handle(method, handlerFunc) + } + go clnt.Run() + return clnt, nil +} + // Interface which the server needs to work as BiRPCServer type BiRPCServer interface { Call(string, interface{}, interface{}) error // So we can use it also as rpcclient.RpcClientConnection @@ -51,17 +65,3 @@ func (clnt *BiRPCInternalClient) SetClientConn(clntConn rpcclient.RpcClientConne func (clnt *BiRPCInternalClient) Call(serviceMethod string, args interface{}, reply interface{}) error { return clnt.serverConn.CallBiRPC(clnt.clntConn, serviceMethod, args, reply) } - -func NewBiJSONrpcClient(addr string, handlers map[string]interface{}) (*rpc2.Client, error) { - conn, err := net.Dial("tcp", addr) - if err != nil { - return nil, err - } - - clnt := rpc2.NewClientWithCodec(rpc2_jsonrpc.NewJSONCodec(conn)) - for method, handlerFunc := range handlers { - clnt.Handle(method, handlerFunc) - } - go clnt.Run() - return clnt, nil -} diff --git a/utils/consts.go b/utils/consts.go index bd53cd595..95719e542 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -522,10 +522,6 @@ const ( MetaTpRatingProfile = "*tp_rating_profile" MetaStorDB = "*stordb" MetaDataDB = "*datadb" - SMGenericV2UpdateSession = "SMGenericV2.UpdateSession" - SMGenericV2InitiateSession = "SMGenericV2.InitiateSession" - SMGenericV1UpdateSession = "SMGenericV1.UpdateSession" - SMGenericV1InitiateSession = "SMGenericV1.InitiateSession" SupplierS = "SupplierS" MetaWeight = "*weight" MetaLeastCost = "*least_cost" diff --git a/utils/errors.go b/utils/errors.go index 21152785f..f688cb189 100644 --- a/utils/errors.go +++ b/utils/errors.go @@ -48,6 +48,7 @@ var ( ErrNoActiveSession = errors.New("NO_ACTIVE_SESSION") ErrPartiallyExecuted = errors.New("PARTIALLY_EXECUTED") ErrMaxUsageExceeded = errors.New("MAX_USAGE_EXCEEDED") + ErrUnallocatedResource = errors.New("UNALLOCATED_RESOURCE") ) // NewCGRError initialises a new CGRError