diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index df2258907..dc693b73c 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -254,7 +254,7 @@ func startSmOpenSIPS(responder *engine.Responder, cdrDb engine.CdrStorage, cache } cdrsConn = &engine.RPCClientConnector{Client: client} } - sm, _ := sessionmanager.NewOSipsSessionManager(cfg.SmOsipsConfig, raterConn, cdrsConn) + sm, _ := sessionmanager.NewOSipsSessionManager(cfg.SmOsipsConfig, raterConn, cdrsConn, cdrDb) if err := sm.Connect(); err != nil { engine.Logger.Err(fmt.Sprintf(" error: %s!", err)) } diff --git a/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg b/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg index 9230be6f6..96b5d7d74 100644 --- a/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg +++ b/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg @@ -222,8 +222,8 @@ route{ send_reply("500","Internal Server Error"); exit; } - $avp(cgr_reqtype)="*pseudoprepaid"; - #$avp(cgr_account)=$fU; + $avp(cgr_reqtype)="*prepaid"; + $avp(cgr_account)=$fU; $avp(cgr_destination)=$rU; $avp(cgr_supplier)="supplier1"; setflag(CDR); diff --git a/general_tests/tutorial_fs_calls_test.go b/general_tests/tutorial_fs_calls_test.go index 22e268fd7..cb2e903c9 100644 --- a/general_tests/tutorial_fs_calls_test.go +++ b/general_tests/tutorial_fs_calls_test.go @@ -23,7 +23,6 @@ import ( "net/rpc/jsonrpc" "os" "path" - "reflect" "strings" "testing" "time" @@ -133,66 +132,6 @@ func TestTutFsCallsLoadTariffPlanFromFolder(t *testing.T) { time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups } -// Check loaded stats -func TestTutFsCallsCacheStats(t *testing.T) { - if !*testCalls { - return - } - var rcvStats *utils.CacheStats - expectedStats := &utils.CacheStats{Destinations: 4, RatingPlans: 3, RatingProfiles: 8, Actions: 6, SharedGroups: 1, RatingAliases: 1, AccountAliases: 1, DerivedChargers: 1} - var args utils.AttrCacheStats - if err := tutFsCallsRpc.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil { - t.Error("Got error on ApierV1.GetCacheStats: ", err.Error()) - } else if !reflect.DeepEqual(expectedStats, rcvStats) { - t.Errorf("Calling ApierV1.GetCacheStats expected: %v, received: %v", expectedStats, rcvStats) - } -} - -// Check items age -func TestTutFsCallsGetCachedItemAge(t *testing.T) { - if !*testCalls { - return - } - var rcvAge *utils.CachedItemAge - if err := tutFsCallsRpc.Call("ApierV1.GetCachedItemAge", "1002", &rcvAge); err != nil { - t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error()) - } else if rcvAge.Destination > time.Duration(2)*time.Second { - t.Errorf("Cache too old: %d", rcvAge) - } - if err := tutFsCallsRpc.Call("ApierV1.GetCachedItemAge", "RP_RETAIL1", &rcvAge); err != nil { - t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error()) - } else if rcvAge.RatingPlan > time.Duration(2)*time.Second { - t.Errorf("Cache too old: %d", rcvAge) - } - if err := tutFsCallsRpc.Call("ApierV1.GetCachedItemAge", "*out:cgrates.org:call:*any", &rcvAge); err != nil { - t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error()) - } else if rcvAge.RatingProfile > time.Duration(2)*time.Second { - t.Errorf("Cache too old: %d", rcvAge) - } - if err := tutFsCallsRpc.Call("ApierV1.GetCachedItemAge", "LOG_WARNING", &rcvAge); err != nil { - t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error()) - } else if rcvAge.Action > time.Duration(2)*time.Second { - t.Errorf("Cache too old: %d", rcvAge) - } - if err := tutFsCallsRpc.Call("ApierV1.GetCachedItemAge", "SHARED_A", &rcvAge); err != nil { - t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error()) - } else if rcvAge.SharedGroup > time.Duration(2)*time.Second { - t.Errorf("Cache too old: %d", rcvAge) - } - /* - if err := tutFsCallsRpc.Call("ApierV1.GetCachedItemAge", "1006", &rcvAge); err != nil { - t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error()) - } else if rcvAge.RatingAlias > time.Duration(2)*time.Second { - t.Errorf("Cache too old: %d", rcvAge) - } - if err := tutFsCallsRpc.Call("ApierV1.GetCachedItemAge", "1006", &rcvAge); err != nil { - t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error()) - } else if rcvAge.RatingAlias > time.Duration(2)*time.Second || rcvAge.AccountAlias > time.Duration(2)*time.Second { - t.Errorf("Cache too old: %d", rcvAge) - } - */ -} - // Make sure account was debited properly func TestTutFsCallsAccountsBefore(t *testing.T) { if !*testCalls { @@ -235,122 +174,6 @@ func TestTutFsCallsAccountsBefore(t *testing.T) { } } -// Check call costs -func TestTutFsCallsGetCosts(t *testing.T) { - if !*testCalls { - return - } - tStart, _ := utils.ParseDate("2014-08-04T13:00:00Z") - tEnd, _ := utils.ParseDate("2014-08-04T13:00:20Z") - cd := engine.CallDescriptor{ - Direction: "*out", - Category: "call", - Tenant: "cgrates.org", - Subject: "1001", - Account: "1001", - Destination: "1002", - DurationIndex: 0, - TimeStart: tStart, - TimeEnd: tEnd, - } - var cc engine.CallCost - if err := tutFsCallsRpc.Call("Responder.GetCost", cd, &cc); err != nil { - t.Error("Got error on Responder.GetCost: ", err.Error()) - } else if cc.Cost != 0.6 { - t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost) - } - tStart, _ = utils.ParseDate("2014-08-04T13:00:00Z") - tEnd, _ = utils.ParseDate("2014-08-04T13:01:25Z") - cd = engine.CallDescriptor{ - Direction: "*out", - Category: "call", - Tenant: "cgrates.org", - Subject: "1001", - Account: "1001", - Destination: "1002", - DurationIndex: 0, - TimeStart: tStart, - TimeEnd: tEnd, - } - if err := tutFsCallsRpc.Call("Responder.GetCost", cd, &cc); err != nil { - t.Error("Got error on Responder.GetCost: ", err.Error()) - } else if cc.Cost != 0.6417 { // 0.01 first minute, 0.04 25 seconds with RT_20CNT - t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost) - } - tStart, _ = utils.ParseDate("2014-08-04T13:00:00Z") - tEnd, _ = utils.ParseDate("2014-08-04T13:00:20Z") - cd = engine.CallDescriptor{ - Direction: "*out", - Category: "call", - Tenant: "cgrates.org", - Subject: "1001", - Account: "1001", - Destination: "1003", - DurationIndex: 0, - TimeStart: tStart, - TimeEnd: tEnd, - } - if err := tutFsCallsRpc.Call("Responder.GetCost", cd, &cc); err != nil { - t.Error("Got error on Responder.GetCost: ", err.Error()) - } else if cc.Cost != 1 { - t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost) - } - tStart, _ = utils.ParseDate("2014-08-04T13:00:00Z") - tEnd, _ = utils.ParseDate("2014-08-04T13:01:25Z") - cd = engine.CallDescriptor{ - Direction: "*out", - Category: "call", - Tenant: "cgrates.org", - Subject: "1001", - Account: "1001", - Destination: "1003", - DurationIndex: 0, - TimeStart: tStart, - TimeEnd: tEnd, - } - if err := tutFsCallsRpc.Call("Responder.GetCost", cd, &cc); err != nil { - t.Error("Got error on Responder.GetCost: ", err.Error()) - } else if cc.Cost != 1.3 { - t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost) - } - tStart, _ = utils.ParseDate("2014-08-04T13:00:00Z") - tEnd, _ = utils.ParseDate("2014-08-04T13:00:20Z") - cd = engine.CallDescriptor{ - Direction: "*out", - Category: "call", - Tenant: "cgrates.org", - Subject: "1001", - Account: "1001", - Destination: "1004", - DurationIndex: 0, - TimeStart: tStart, - TimeEnd: tEnd, - } - if err := tutFsCallsRpc.Call("Responder.GetCost", cd, &cc); err != nil { - t.Error("Got error on Responder.GetCost: ", err.Error()) - } else if cc.Cost != 1 { - t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost) - } - tStart, _ = utils.ParseDate("2014-08-04T13:00:00Z") - tEnd, _ = utils.ParseDate("2014-08-04T13:01:25Z") - cd = engine.CallDescriptor{ - Direction: "*out", - Category: "call", - Tenant: "cgrates.org", - Subject: "1001", - Account: "1001", - Destination: "1004", - DurationIndex: 0, - TimeStart: tStart, - TimeEnd: tEnd, - } - if err := tutFsCallsRpc.Call("Responder.GetCost", cd, &cc); err != nil { - t.Error("Got error on Responder.GetCost: ", err.Error()) - } else if cc.Cost != 1.3 { - t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost) - } -} - func TestTutFsCallsCdrStats(t *testing.T) { if !*testCalls { return diff --git a/general_tests/tutorial_local_test.go b/general_tests/tutorial_local_test.go new file mode 100644 index 000000000..b09122bb4 --- /dev/null +++ b/general_tests/tutorial_local_test.go @@ -0,0 +1,322 @@ +/* +Real-time Charging System for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can Storagetribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITH*out ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package general_tests + +import ( + "net/rpc" + "net/rpc/jsonrpc" + //"os" + "path" + "reflect" + //"strings" + "testing" + "time" + + //"github.com/cgrates/cgrates/apier/v1" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var tutLocalCfgPath string +var tutFsLocalCfg *config.CGRConfig +var tutLocalRpc *rpc.Client + +func TestTutLocalInitCfg(t *testing.T) { + if !*testLocal { + return + } + tutLocalCfgPath = path.Join(*dataDir, "conf", "samples", "cgradmin") + // Init config first + var err error + tutFsLocalCfg, err = config.NewCGRConfigFromFolder(tutLocalCfgPath) + if err != nil { + t.Error(err) + } + tutFsLocalCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush() + config.SetCgrConfig(tutFsLocalCfg) +} + +// Remove data in both rating and accounting db +func TestTutLocalResetDataDb(t *testing.T) { + if !*testLocal { + return + } + if err := engine.InitDataDb(tutFsLocalCfg); err != nil { + t.Fatal(err) + } +} + +// Wipe out the cdr database +func TestTutLocalResetStorDb(t *testing.T) { + if !*testLocal { + return + } + if err := engine.InitStorDb(tutFsLocalCfg); err != nil { + t.Fatal(err) + } +} + +// Start CGR Engine +func TestTutLocalStartEngine(t *testing.T) { + if !*testLocal { + return + } + if _, err := engine.StopStartEngine(tutLocalCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func TestTutLocalRpcConn(t *testing.T) { + if !*testLocal { + return + } + var err error + tutLocalRpc, err = jsonrpc.Dial("tcp", tutFsLocalCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal(err) + } +} + +// Load the tariff plan, creating accounts and their balances +func TestTutLocalLoadTariffPlanFromFolder(t *testing.T) { + if !*testLocal { + return + } + reply := "" + attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")} + if err := tutLocalRpc.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil { + t.Error(err) + } else if reply != "OK" { + t.Error(reply) + } + time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups +} + +// Check loaded stats +func TestTutLocalCacheStats(t *testing.T) { + if !*testLocal { + return + } + var rcvStats *utils.CacheStats + expectedStats := &utils.CacheStats{Destinations: 4, RatingPlans: 3, RatingProfiles: 8, Actions: 6, SharedGroups: 1, RatingAliases: 1, AccountAliases: 1, DerivedChargers: 1} + var args utils.AttrCacheStats + if err := tutLocalRpc.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil { + t.Error("Got error on ApierV1.GetCacheStats: ", err.Error()) + } else if !reflect.DeepEqual(expectedStats, rcvStats) { + t.Errorf("Calling ApierV1.GetCacheStats expected: %v, received: %v", expectedStats, rcvStats) + } +} + +// Check items age +func TestTutLocalGetCachedItemAge(t *testing.T) { + if !*testLocal { + return + } + var rcvAge *utils.CachedItemAge + if err := tutLocalRpc.Call("ApierV1.GetCachedItemAge", "1002", &rcvAge); err != nil { + t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error()) + } else if rcvAge.Destination > time.Duration(2)*time.Second { + t.Errorf("Cache too old: %d", rcvAge) + } + if err := tutLocalRpc.Call("ApierV1.GetCachedItemAge", "RP_RETAIL1", &rcvAge); err != nil { + t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error()) + } else if rcvAge.RatingPlan > time.Duration(2)*time.Second { + t.Errorf("Cache too old: %d", rcvAge) + } + if err := tutLocalRpc.Call("ApierV1.GetCachedItemAge", "*out:cgrates.org:call:*any", &rcvAge); err != nil { + t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error()) + } else if rcvAge.RatingProfile > time.Duration(2)*time.Second { + t.Errorf("Cache too old: %d", rcvAge) + } + if err := tutLocalRpc.Call("ApierV1.GetCachedItemAge", "LOG_WARNING", &rcvAge); err != nil { + t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error()) + } else if rcvAge.Action > time.Duration(2)*time.Second { + t.Errorf("Cache too old: %d", rcvAge) + } + if err := tutLocalRpc.Call("ApierV1.GetCachedItemAge", "SHARED_A", &rcvAge); err != nil { + t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error()) + } else if rcvAge.SharedGroup > time.Duration(2)*time.Second { + t.Errorf("Cache too old: %d", rcvAge) + } + /* + if err := tutLocalRpc.Call("ApierV1.GetCachedItemAge", "1006", &rcvAge); err != nil { + t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error()) + } else if rcvAge.RatingAlias > time.Duration(2)*time.Second { + t.Errorf("Cache too old: %d", rcvAge) + } + if err := tutLocalRpc.Call("ApierV1.GetCachedItemAge", "1006", &rcvAge); err != nil { + t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error()) + } else if rcvAge.RatingAlias > time.Duration(2)*time.Second || rcvAge.AccountAlias > time.Duration(2)*time.Second { + t.Errorf("Cache too old: %d", rcvAge) + } + */ +} + +// Check call costs +func TestTutLocalGetCosts(t *testing.T) { + if !*testLocal { + return + } + tStart, _ := utils.ParseDate("2014-08-04T13:00:00Z") + tEnd, _ := utils.ParseDate("2014-08-04T13:00:20Z") + cd := engine.CallDescriptor{ + Direction: "*out", + Category: "call", + Tenant: "cgrates.org", + Subject: "1001", + Account: "1001", + Destination: "1002", + DurationIndex: 0, + TimeStart: tStart, + TimeEnd: tEnd, + } + var cc engine.CallCost + if err := tutLocalRpc.Call("Responder.GetCost", cd, &cc); err != nil { + t.Error("Got error on Responder.GetCost: ", err.Error()) + } else if cc.Cost != 0.6 { + t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost) + } + tStart, _ = utils.ParseDate("2014-08-04T13:00:00Z") + tEnd, _ = utils.ParseDate("2014-08-04T13:01:25Z") + cd = engine.CallDescriptor{ + Direction: "*out", + Category: "call", + Tenant: "cgrates.org", + Subject: "1001", + Account: "1001", + Destination: "1002", + DurationIndex: 0, + TimeStart: tStart, + TimeEnd: tEnd, + } + if err := tutLocalRpc.Call("Responder.GetCost", cd, &cc); err != nil { + t.Error("Got error on Responder.GetCost: ", err.Error()) + } else if cc.Cost != 0.6417 { // 0.01 first minute, 0.04 25 seconds with RT_20CNT + t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost) + } + tStart, _ = utils.ParseDate("2014-08-04T13:00:00Z") + tEnd, _ = utils.ParseDate("2014-08-04T13:00:20Z") + cd = engine.CallDescriptor{ + Direction: "*out", + Category: "call", + Tenant: "cgrates.org", + Subject: "1001", + Account: "1001", + Destination: "1003", + DurationIndex: 0, + TimeStart: tStart, + TimeEnd: tEnd, + } + if err := tutLocalRpc.Call("Responder.GetCost", cd, &cc); err != nil { + t.Error("Got error on Responder.GetCost: ", err.Error()) + } else if cc.Cost != 1 { + t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost) + } + tStart, _ = utils.ParseDate("2014-08-04T13:00:00Z") + tEnd, _ = utils.ParseDate("2014-08-04T13:01:25Z") + cd = engine.CallDescriptor{ + Direction: "*out", + Category: "call", + Tenant: "cgrates.org", + Subject: "1001", + Account: "1001", + Destination: "1003", + DurationIndex: 0, + TimeStart: tStart, + TimeEnd: tEnd, + } + if err := tutLocalRpc.Call("Responder.GetCost", cd, &cc); err != nil { + t.Error("Got error on Responder.GetCost: ", err.Error()) + } else if cc.Cost != 1.3 { + t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost) + } + tStart, _ = utils.ParseDate("2014-08-04T13:00:00Z") + tEnd, _ = utils.ParseDate("2014-08-04T13:00:20Z") + cd = engine.CallDescriptor{ + Direction: "*out", + Category: "call", + Tenant: "cgrates.org", + Subject: "1001", + Account: "1001", + Destination: "1004", + DurationIndex: 0, + TimeStart: tStart, + TimeEnd: tEnd, + } + if err := tutLocalRpc.Call("Responder.GetCost", cd, &cc); err != nil { + t.Error("Got error on Responder.GetCost: ", err.Error()) + } else if cc.Cost != 1 { + t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost) + } + tStart, _ = utils.ParseDate("2014-08-04T13:00:00Z") + tEnd, _ = utils.ParseDate("2014-08-04T13:01:25Z") + cd = engine.CallDescriptor{ + Direction: "*out", + Category: "call", + Tenant: "cgrates.org", + Subject: "1001", + Account: "1001", + Destination: "1004", + DurationIndex: 0, + TimeStart: tStart, + TimeEnd: tEnd, + } + if err := tutLocalRpc.Call("Responder.GetCost", cd, &cc); err != nil { + t.Error("Got error on Responder.GetCost: ", err.Error()) + } else if cc.Cost != 1.3 { + t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost) + } +} + +// Check call costs +func TestTutLocalMaxDebit(t *testing.T) { + if !*testLocal { + return + } + tStart, _ := utils.ParseDate("2014-08-04T13:00:00Z") + tEnd, _ := utils.ParseDate("2014-08-04T13:00:20Z") + cd := engine.CallDescriptor{ + Direction: "*out", + Category: "call", + Tenant: "cgrates.org", + Subject: "1001", + Account: "1001", + Destination: "1002", + DurationIndex: 0, + TimeStart: tStart, + TimeEnd: tEnd, + } + var cc engine.CallCost + if err := tutLocalRpc.Call("Responder.MaxDebit", cd, &cc); err != nil { + t.Error("Got error on Responder.GetCost: ", err.Error()) + } else if cc.GetDuration() == 20 { + t.Errorf("Calling Responder.MaxDebit got callcost: %v", cc.GetDuration()) + } +} + +func TestTutLocalStopCgrEngine(t *testing.T) { + if !*testLocal { + return + } + if err := engine.KillEngine(100); err != nil { + t.Error(err) + } +} diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 1d4c532bd..641988d35 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -101,27 +101,31 @@ func (sm *FSSessionManager) GetSession(uuid string) *Session { } // Disconnects a session by sending hangup command to freeswitch -func (sm *FSSessionManager) DisconnectSession(ev engine.Event, connId, notify string) { +func (sm *FSSessionManager) DisconnectSession(ev engine.Event, connId, notify string) error { if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", ev.GetUUID(), notify)); err != nil { engine.Logger.Err(fmt.Sprintf(" Could not send disconect api notification to freeswitch, error: <%s>, connId: %s", err.Error(), connId)) + return err } if notify == INSUFFICIENT_FUNDS { if len(sm.cfg.EmptyBalanceContext) != 0 { if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_transfer %s %s %s\n\n", ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), sm.cfg.EmptyBalanceContext)); err != nil { engine.Logger.Err(fmt.Sprintf(" Could not transfer the call to empty balance context, error: <%s>, connId: %s", err.Error(), connId)) + return err } - return + return nil } else if len(sm.cfg.EmptyBalanceAnnFile) != 0 { if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_broadcast %s playback!manager_request::%s aleg\n\n", ev.GetUUID(), sm.cfg.EmptyBalanceAnnFile)); err != nil { engine.Logger.Err(fmt.Sprintf(" Could not send uuid_broadcast to freeswitch, error: <%s>, connId: %s", err.Error(), connId)) + return err } - return + return nil } } if err := sm.conns[connId].SendMsgCmd(ev.GetUUID(), map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}); err != nil { engine.Logger.Err(fmt.Sprintf(" Could not send disconect msg to freeswitch, error: <%s>, connId: %s", err.Error(), connId)) + return err } - return + return nil } // Remove session from session list, removes all related in case of multiple runs diff --git a/sessionmanager/fssessionmanager_test.go b/sessionmanager/fssessionmanager_test.go index b4492c1c5..b3449dab1 100644 --- a/sessionmanager/fssessionmanager_test.go +++ b/sessionmanager/fssessionmanager_test.go @@ -1,6 +1,6 @@ /* -Rating system designed to be used in VoIP Carriers World -Copyright (C) 2012-2015 ITsysCOM +Real-time Charging System for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -18,17 +18,10 @@ along with this program. If not, see package sessionmanager -// "github.com/cgrates/cgrates/timespans" -// "testing" +import ( + "testing" +) -/*func TestConnect(t *testing.T) { - sm := &FSSessionManager{} - sm.Connect(&SessionDelegate{×pans.Responder{}}, "localhost:8021", "ClueCon") - //for { - ev := sm.readNextEvent() - if ev == nil { - t.Error("Got nil event!") - } - //log.Print(ev) - //} -}*/ +func TestFSSMInterface(t *testing.T) { + var _ SessionManager = SessionManager(new(FSSessionManager)) +} diff --git a/sessionmanager/kamailiosm.go b/sessionmanager/kamailiosm.go index af510e6bb..94abc2499 100644 --- a/sessionmanager/kamailiosm.go +++ b/sessionmanager/kamailiosm.go @@ -134,12 +134,14 @@ func (self *KamailioSessionManager) Connect() error { return err } -func (self *KamailioSessionManager) DisconnectSession(ev engine.Event, connId, notify string) { +func (self *KamailioSessionManager) DisconnectSession(ev engine.Event, connId, notify string) error { sessionIds := ev.GetSessionIds() disconnectEv := &KamSessionDisconnect{Event: CGR_SESSION_DISCONNECT, HashEntry: sessionIds[0], HashId: sessionIds[1], Reason: notify} if err := self.conns[connId].Send(disconnectEv.String()); err != nil { engine.Logger.Err(fmt.Sprintf(" Failed sending disconnect request, error %s, connection id: %s", err.Error(), connId)) + return err } + return nil } func (self *KamailioSessionManager) RemoveSession(uuid string) { for i, ss := range self.sessions { diff --git a/sessionmanager/kamailiosm_test.go b/sessionmanager/kamailiosm_test.go new file mode 100644 index 000000000..174ff960e --- /dev/null +++ b/sessionmanager/kamailiosm_test.go @@ -0,0 +1,27 @@ +/* +Real-time Charging System for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package sessionmanager + +import ( + "testing" +) + +func TestKamSMInterface(t *testing.T) { + var _ SessionManager = SessionManager(new(KamailioSessionManager)) +} diff --git a/sessionmanager/osipsevent.go b/sessionmanager/osipsevent.go index 29b6b4a77..017261db3 100644 --- a/sessionmanager/osipsevent.go +++ b/sessionmanager/osipsevent.go @@ -20,6 +20,7 @@ package sessionmanager import ( "encoding/json" + "fmt" "strings" "time" @@ -80,7 +81,7 @@ func (osipsev *OsipsEvent) GetCgrId() string { } func (osipsev *OsipsEvent) GetUUID() string { - return osipsev.osipsEvent.AttrValues[CALLID] + ";" + osipsev.osipsEvent.AttrValues[FROM_TAG] + ";" + osipsev.osipsEvent.AttrValues[TO_TAG] + return osipsev.osipsEvent.AttrValues[CALLID] } // Returns the dialog identifier which opensips needs to disconnect a dialog @@ -187,11 +188,38 @@ func (osipsEv *OsipsEvent) GetOriginatorIP(fieldName string) string { return osipsEv.osipsEvent.OriginatorAddress.IP.String() } func (osipsev *OsipsEvent) MissingParameter() bool { + engine.Logger.Debug(fmt.Sprintf("Missing parameters on: %+v", osipsev.osipsEvent)) + var nilTime time.Time if osipsev.GetName() == "E_ACC_EVENT" && osipsev.osipsEvent.AttrValues["method"] == "INVITE" { return len(osipsev.GetUUID()) == 0 || len(osipsev.GetAccount(utils.META_DEFAULT)) == 0 || len(osipsev.GetDestination(utils.META_DEFAULT)) == 0 || len(osipsev.osipsEvent.AttrValues[OSIPS_DIALOG_ID]) == 0 + } else if osipsev.GetName() == "E_ACC_EVENT" && osipsev.osipsEvent.AttrValues["method"] == "BYE" { + return len(osipsev.osipsEvent.AttrValues[OSIPS_DIALOG_ID]) == 0 || + len(osipsev.osipsEvent.AttrValues[TIME]) == 0 + } else if osipsev.GetName() == "E_ACC_EVENT" && osipsev.osipsEvent.AttrValues["method"] == "UPDATE" { // Updated event out of start/stop + // Data needed when stopping a prepaid loop or building a CDR with start/stop event + setupTime, err := osipsev.GetSetupTime(TIME) + if err != nil || setupTime.Equal(nilTime) { + return true + } + aTime, err := osipsev.GetAnswerTime(utils.META_DEFAULT) + if err != nil || aTime.Equal(nilTime) { + return true + } + endTime, err := osipsev.GetEndTime() + if err != nil || endTime.Equal(nilTime) { + return true + } + _, err = osipsev.GetDuration(utils.META_DEFAULT) + if err != nil { + return true + } + if osipsev.osipsEvent.AttrValues[OSIPS_DIALOG_ID] == "" { + return true + } + return false } return true } @@ -213,6 +241,10 @@ func (osipsev *OsipsEvent) GetExtraFields() map[string]string { return extraFields } +func (osipsev *OsipsEvent) DialogId() string { + return osipsev.osipsEvent.AttrValues[OSIPS_DIALOG_ID] +} + func (osipsEv *OsipsEvent) AsStoredCdr() *engine.StoredCdr { storCdr := new(engine.StoredCdr) storCdr.CgrId = osipsEv.GetCgrId() @@ -244,5 +276,6 @@ func (osipsEv *OsipsEvent) updateDurationFromEvent(updatedOsipsEv *OsipsEvent) e } answerTime, err := osipsEv.GetAnswerTime(utils.META_DEFAULT) osipsEv.osipsEvent.AttrValues[OSIPS_DURATION] = endTime.Sub(answerTime).String() + osipsEv.osipsEvent.AttrValues["method"] = "UPDATE" // So we can know it is an end event return nil } diff --git a/sessionmanager/osipssm.go b/sessionmanager/osipssm.go index 89440f49f..00c92e3f4 100644 --- a/sessionmanager/osipssm.go +++ b/sessionmanager/osipssm.go @@ -79,8 +79,8 @@ duration:: */ -func NewOSipsSessionManager(smOsipsCfg *config.SmOsipsConfig, rater, cdrsrv engine.Connector) (*OsipsSessionManager, error) { - osm := &OsipsSessionManager{cfg: smOsipsCfg, rater: rater, cdrsrv: cdrsrv} +func NewOSipsSessionManager(smOsipsCfg *config.SmOsipsConfig, rater, cdrsrv engine.Connector, cdrDb engine.CdrStorage) (*OsipsSessionManager, error) { + osm := &OsipsSessionManager{cfg: smOsipsCfg, rater: rater, cdrsrv: cdrsrv, cdrDb: cdrDb, cdrStartEvents: make(map[string]*OsipsEvent)} osm.eventHandlers = map[string][]func(*osipsdagram.OsipsEvent){ "E_OPENSIPS_START": []func(*osipsdagram.OsipsEvent){osm.onOpensipsStart}, // Raised when OpenSIPS starts so we can register our event handlers "E_ACC_CDR": []func(*osipsdagram.OsipsEvent){osm.onCdr}, // Raised if cdr_flag is configured @@ -94,10 +94,13 @@ type OsipsSessionManager struct { cfg *config.SmOsipsConfig rater engine.Connector cdrsrv engine.Connector + cdrDb engine.CdrStorage eventHandlers map[string][]func(*osipsdagram.OsipsEvent) evSubscribeStop chan struct{} // Reference towards the channel controlling subscriptions, keep it as reference so we do not need to copy it stopServing chan struct{} // Stop serving datagrams miConn *osipsdagram.OsipsMiDatagramConnector // Pool of connections used to various OpenSIPS servers, keep reference towards events received so we can issue commands always to the same remote + sessions []*Session + cdrStartEvents map[string]*OsipsEvent // Used when building CDRs } func (osm *OsipsSessionManager) Connect() (err error) { @@ -118,14 +121,21 @@ func (osm *OsipsSessionManager) Connect() (err error) { return errors.New(" Stopped reading events") } func (osm *OsipsSessionManager) RemoveSession(uuid string) { - return + for i, ss := range osm.sessions { + if ss.eventStart.GetUUID() == uuid { + osm.sessions = append(osm.sessions[:i], osm.sessions[i+1:]...) + return + } + } } func (osm *OsipsSessionManager) MaxDebit(cd *engine.CallDescriptor, cc *engine.CallCost) error { return nil } func (osm *OsipsSessionManager) DebitInterval() time.Duration { - var nilDuration time.Duration - return nilDuration + return osm.cfg.DebitInterval +} +func (osm *OsipsSessionManager) CdrDb() engine.CdrStorage { + return osm.cdrDb } func (osm *OsipsSessionManager) DbLogger() engine.LogStorage { return nil @@ -139,6 +149,39 @@ func (osm *OsipsSessionManager) WarnSessionMinDuration(sessionUuid, connId strin func (osm *OsipsSessionManager) Shutdown() error { return nil } +func (osm *OsipsSessionManager) ProcessCdr(storedCdr *engine.StoredCdr) error { + var reply string + return osm.cdrsrv.ProcessCdr(storedCdr, &reply) +} + +func (osm *OsipsSessionManager) DisconnectSession(ev engine.Event, connId, notify string) error { + sessionIds := ev.GetSessionIds() + if len(sessionIds) != 2 { + errMsg := fmt.Sprintf("Failed disconnecting session for event: %+v, notify: %s, dialogId: %v", ev, notify, sessionIds) + engine.Logger.Err(fmt.Sprintf(" " + errMsg)) + return errors.New(errMsg) + } + cmd := fmt.Sprintf(":dlg_end_dlg:\n%s\n%s\n\n", sessionIds[0], sessionIds[1]) + if reply, err := osm.miConn.SendCommand([]byte(cmd)); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed disconnecting session for event: %+v, notify: %s, dialogId: %v, error: <%s>", ev, notify, sessionIds, err)) + return err + } else if !bytes.HasPrefix(reply, []byte("200 OK")) { + errStr := fmt.Sprintf("Failed disconnecting session for event: %+v, notify: %s, dialogId: %v", ev, notify, sessionIds) + engine.Logger.Err(" " + errStr) + return errors.New(errStr) + } + return nil +} + +// Searches and return the session with the specifed uuid +func (osm *OsipsSessionManager) getSession(uuid string) *Session { + for _, s := range osm.sessions { + if s.eventStart.GetUUID() == uuid { + return s + } + } + return nil +} // Automatic subscribe to OpenSIPS for events, trigered on Connect or OpenSIPS restart func (osm *OsipsSessionManager) SubscribeEvents(evStop chan struct{}) error { @@ -151,8 +194,7 @@ func (osm *OsipsSessionManager) SubscribeEvents(evStop chan struct{}) error { return nil case <-time.After(osm.cfg.EventsSubscribeInterval): // Subscribe on interval if err := osm.subscribeEvents(); err != nil { - close(osm.stopServing) // Do not serve anymore since we got errors on subscribing - return err + close(osm.stopServing) // Order stop serving, do not return here since we will block the channel consuming } } } @@ -191,6 +233,13 @@ func (osm *OsipsSessionManager) onOpensipsStart(cdrDagram *osipsdagram.OsipsEven go osm.SubscribeEvents(osm.evSubscribeStop) } +func (osm *OsipsSessionManager) onCdr(cdrDagram *osipsdagram.OsipsEvent) { + osipsEv, _ := NewOsipsEvent(cdrDagram) + if err := osm.ProcessCdr(osipsEv.AsStoredCdr()); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", osipsEv.GetCgrId(), osipsEv.GetUUID(), err.Error())) + } +} + func (osm *OsipsSessionManager) onAccEvent(osipsDgram *osipsdagram.OsipsEvent) { osipsEv, _ := NewOsipsEvent(osipsDgram) if osipsEv.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request @@ -200,44 +249,19 @@ func (osm *OsipsSessionManager) onAccEvent(osipsDgram *osipsdagram.OsipsEvent) { if err := osm.callStart(osipsEv); err != nil { engine.Logger.Err(fmt.Sprintf(" Failed processing CALL_START out of %+v, error: <%s>", osipsDgram, err.Error())) } + if err := osm.processCdrStart(osipsEv); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed processing cdr start out of %+v, error: <%s>", osipsDgram, err.Error())) + } } else if osipsDgram.AttrValues["method"] == "BYE" { if err := osm.callEnd(osipsEv); err != nil { engine.Logger.Err(fmt.Sprintf(" Failed processing CALL_END out of %+v, error: <%s>", osipsDgram, err.Error())) } + if err := osm.processCdrStop(osipsEv); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed processing cdr stop out of %+v, error: <%s>", osipsDgram, err.Error())) + } } } -func (osm *OsipsSessionManager) onCdr(cdrDagram *osipsdagram.OsipsEvent) { - osipsEv, _ := NewOsipsEvent(cdrDagram) - if err := osm.ProcessCdr(osipsEv.AsStoredCdr()); err != nil { - engine.Logger.Err(fmt.Sprintf(" Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", osipsEv.GetCgrId(), osipsEv.GetUUID(), err.Error())) - } -} - -func (osm *OsipsSessionManager) ProcessCdr(storedCdr *engine.StoredCdr) error { - var reply string - return osm.cdrsrv.ProcessCdr(storedCdr, &reply) -} - -func (osm *OsipsSessionManager) DisconnectSession(ev engine.Event, connId, notify string) error { - sessionIds := ev.GetSessionIds() - if len(sessionIds) != 2 { - errMsg := fmt.Sprintf("Failed disconnecting session for event: %+v, notify: %s, dialogId: %v", ev, notify, sessionIds) - engine.Logger.Err(fmt.Sprintf(" " + errMsg)) - return errors.New(errMsg) - } - cmd := fmt.Sprintf(":dlg_end_dlg:\n%s\n%s\n\n", sessionIds[0], sessionIds[1]) - if reply, err := osm.miConn.SendCommand([]byte(cmd)); err != nil { - engine.Logger.Err(fmt.Sprintf(" Failed disconnecting session for event: %+v, notify: %s, dialogId: %v, error: <%s>", ev, notify, sessionIds, err)) - return err - } else if !bytes.HasPrefix(reply, []byte("200 OK")) { - errStr := fmt.Sprintf("Failed disconnecting session for event: %+v, notify: %s, dialogId: %v", ev, notify, sessionIds) - engine.Logger.Err(" " + errStr) - return errors.New(errStr) - } - return nil -} - func (osm *OsipsSessionManager) callStart(osipsEv *OsipsEvent) error { if osipsEv.MissingParameter() { if err := osm.DisconnectSession(osipsEv, "", utils.ERR_MANDATORY_IE_MISSING); err != nil { @@ -245,9 +269,61 @@ func (osm *OsipsSessionManager) callStart(osipsEv *OsipsEvent) error { } return errors.New(utils.ERR_MANDATORY_IE_MISSING) } + s := NewSession(osipsEv, "", osm) + if s != nil { + osm.sessions = append(osm.sessions, s) + } return nil } func (osm *OsipsSessionManager) callEnd(osipsEv *OsipsEvent) error { + s := osm.getSession(osipsEv.GetUUID()) + if s == nil { // Not handled by us + return nil + } + osm.RemoveSession(s.eventStart.GetUUID()) // Unreference it early so we avoid concurrency + origEvent := s.eventStart.(*OsipsEvent) // Need a complete event for methods in close + if err := origEvent.updateDurationFromEvent(osipsEv); err != nil { + return err + } + if origEvent.MissingParameter() { + return errors.New(utils.ERR_MANDATORY_IE_MISSING) + } + if err := s.Close(origEvent); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database + return err + } return nil } + +// Records the event start in case of received so we can create CDR out of it +func (osm *OsipsSessionManager) processCdrStart(osipsEv *OsipsEvent) error { + if osm.cdrsrv == nil { + return nil + } + if dialogId := osipsEv.DialogId(); dialogId == "" { + return errors.New("Missing dialog_id") + } else { + osm.cdrStartEvents[dialogId] = osipsEv + } + return nil +} + +// processCdrStop builds the complete CDR out of eventStart+eventStop and sends it to the CDRS component +func (osm *OsipsSessionManager) processCdrStop(osipsEv *OsipsEvent) error { + if osm.cdrsrv == nil { + return nil + } + var osipsEvStart *OsipsEvent + var hasIt bool + if dialogId := osipsEv.DialogId(); dialogId == "" { + return errors.New("Missing dialog_id") + } else if osipsEvStart, hasIt = osm.cdrStartEvents[dialogId]; !hasIt { + return errors.New("Missing event start info") + } else { + delete(osm.cdrStartEvents, dialogId) // Cleanup the event once we got it + } + if err := osipsEvStart.updateDurationFromEvent(osipsEv); err != nil { + return err + } + return osm.ProcessCdr(osipsEvStart.AsStoredCdr()) +} diff --git a/sessionmanager/osipssm_test.go b/sessionmanager/osipssm_test.go new file mode 100644 index 000000000..fa9baae64 --- /dev/null +++ b/sessionmanager/osipssm_test.go @@ -0,0 +1,27 @@ +/* +Real-time Charging System for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package sessionmanager + +import ( + "testing" +) + +func TestOsipsSMInterface(t *testing.T) { + var _ SessionManager = SessionManager(new(OsipsSessionManager)) +} diff --git a/sessionmanager/session.go b/sessionmanager/session.go index e5b710c12..0a7b2cf1a 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -111,7 +111,7 @@ func (s *Session) debitLoop(runIdx int) { func (s *Session) Close(ev engine.Event) error { close(s.stopDebit) // Close the channel so all the sessionRuns listening will be notified if _, err := ev.GetEndTime(); err != nil { - engine.Logger.Err("Error parsing answer event stop time.") + engine.Logger.Err("Error parsing event stop time.") for idx := range s.sessionRuns { s.sessionRuns[idx].CallDescriptor.TimeEnd = s.sessionRuns[idx].CallDescriptor.TimeStart.Add(s.sessionRuns[idx].CallDescriptor.DurationIndex) } diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go index ce101ac72..3673b361e 100644 --- a/sessionmanager/sessionmanager.go +++ b/sessionmanager/sessionmanager.go @@ -29,7 +29,7 @@ type SessionManager interface { Rater() engine.Connector DebitInterval() time.Duration Connect() error - DisconnectSession(engine.Event, string, string) + DisconnectSession(engine.Event, string, string) error WarnSessionMinDuration(string, string) RemoveSession(string) ProcessCdr(*engine.StoredCdr) error