diff --git a/agents/asterisk_event.go b/agents/asterisk_event.go index dd553db2a..a51986b62 100644 --- a/agents/asterisk_event.go +++ b/agents/asterisk_event.go @@ -19,6 +19,7 @@ along with this program. If not, see package agents import ( + "fmt" "strings" "github.com/cgrates/cgrates/config" @@ -244,6 +245,7 @@ func (smaEv *SMAsteriskEvent) AsMapStringInterface() (mp map[string]interface{}) for extraKey, extraVal := range smaEv.ExtraParameters() { // Append extraParameters mp[extraKey] = extraVal } + mp[utils.Source] = utils.AsteriskAgent return } @@ -273,37 +275,19 @@ func (smaEv *SMAsteriskEvent) V1AuthorizeArgs() (args *sessions.V1AuthorizeArgs) GetMaxUsage: true, CGREvent: *cgrEv, } - // For the moment hardcoded only GetMaxUsage : true - /* - subsystems, has := kev[KamCGRSubsystems] - if !has { - return - } - if strings.Index(subsystems, utils.MetaAccounts) == -1 { - args.GetMaxUsage = false - } - if strings.Index(subsystems, utils.MetaResources) != -1 { - args.AuthorizeResources = true - } - if strings.Index(subsystems, utils.MetaSuppliers) != -1 { - args.GetSuppliers = true - if strings.Index(subsystems, utils.MetaSuppliersEventCost) != -1 { - args.SuppliersMaxCost = utils.MetaEventCost - } - if strings.Index(subsystems, utils.MetaSuppliersIgnoreErrors) != -1 { - args.SuppliersIgnoreErrors = true - } - } - if strings.Index(subsystems, utils.MetaAttributes) != -1 { - args.GetAttributes = true - } - if strings.Index(subsystems, utils.MetaThresholds) != -1 { - args.ProcessThresholds = utils.BoolPointer(true) - } - if strings.Index(subsystems, utils.MetaStats) != -1 { - args.ProcessStatQueues = utils.BoolPointer(true) - } - */ + if smaEv.Subsystems() == utils.EmptyString { + return + } + args.GetMaxUsage = strings.Index(smaEv.Subsystems(), utils.MetaAccounts) != -1 + args.AuthorizeResources = strings.Index(smaEv.Subsystems(), utils.MetaResources) != -1 + args.GetSuppliers = strings.Index(smaEv.Subsystems(), utils.MetaSuppliers) != -1 + args.SuppliersIgnoreErrors = strings.Index(smaEv.Subsystems(), utils.MetaSuppliersIgnoreErrors) != -1 + if strings.Index(smaEv.Subsystems(), utils.MetaSuppliersEventCost) != -1 { + args.SuppliersMaxCost = utils.MetaEventCost + } + args.GetAttributes = strings.Index(smaEv.Subsystems(), utils.MetaAttributes) != -1 + args.ProcessThresholds = strings.Index(smaEv.Subsystems(), utils.MetaThresholds) != -1 + args.ProcessStats = strings.Index(smaEv.Subsystems(), utils.MetaStats) != -1 return } @@ -312,27 +296,17 @@ func (smaEv *SMAsteriskEvent) V1InitSessionArgs(cgrEv utils.CGREvent) (args *ses InitSession: true, CGREvent: cgrEv, } - /* - subsystems, has := kev[KamCGRSubsystems] - if !has { - return - } - if strings.Index(subsystems, utils.MetaAccounts) == -1 { - args.InitSession = false - } - if strings.Index(subsystems, utils.MetaResources) != -1 { - args.AllocateResources = true - } - if strings.Index(subsystems, utils.MetaAttributes) != -1 { - args.GetAttributes = true - } - if strings.Index(subsystems, utils.MetaThresholds) != -1 { - args.ProcessThresholds = utils.BoolPointer(true) - } - if strings.Index(subsystems, utils.MetaStats) != -1 { - args.ProcessStatQueues = utils.BoolPointer(true) - } - */ + subsystems, err := cgrEv.FieldAsString(utils.CGRSubsystems) + if err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> event: %s don't have cgr_subsystems variable", + utils.AsteriskAgent, utils.ToJSON(cgrEv))) + return nil + } + args.InitSession = strings.Index(subsystems, utils.MetaAccounts) != -1 + args.AllocateResources = strings.Index(subsystems, utils.MetaResources) != -1 + args.GetAttributes = strings.Index(subsystems, utils.MetaAttributes) != -1 + args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 + args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 return } @@ -341,23 +315,15 @@ func (smaEv *SMAsteriskEvent) V1TerminateSessionArgs(cgrEv utils.CGREvent) (args TerminateSession: true, CGREvent: cgrEv, } - /* - subsystems, has := kev[KamCGRSubsystems] - if !has { - return - } - if strings.Index(subsystems, utils.MetaAccounts) == -1 { - args.TerminateSession = false - } - if strings.Index(subsystems, utils.MetaResources) != -1 { - args.ReleaseResources = true - } - if strings.Index(subsystems, utils.MetaThresholds) != -1 { - args.ProcessThresholds = utils.BoolPointer(true) - } - if strings.Index(subsystems, utils.MetaStats) != -1 { - args.ProcessStatQueues = utils.BoolPointer(true) - } - */ + subsystems, err := cgrEv.FieldAsString(utils.CGRSubsystems) + if err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> event: %s don't have cgr_subsystems variable", + utils.AsteriskAgent, utils.ToJSON(cgrEv))) + return nil + } + args.TerminateSession = strings.Index(subsystems, utils.MetaAccounts) != -1 + args.ReleaseResources = strings.Index(subsystems, utils.MetaResources) != -1 + args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 + args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 return } diff --git a/agents/asteriskagent.go b/agents/asteriskagent.go index f3c655b94..768e2838c 100644 --- a/agents/asteriskagent.go +++ b/agents/asteriskagent.go @@ -19,6 +19,7 @@ along with this program. If not, see package agents import ( + "encoding/json" "fmt" "net/url" "strconv" @@ -99,6 +100,7 @@ func (sma *AsteriskAgent) ListenAndServe() (err error) { case astRawEv := <-sma.astEvChan: smAsteriskEvent := NewSMAsteriskEvent(astRawEv, strings.Split(sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address, ":")[0]) + switch smAsteriskEvent.EventType() { case ARIStasisStart: go sma.handleStasisStart(smAsteriskEvent) @@ -192,12 +194,13 @@ func (sma *AsteriskAgent) handleStasisStart(ev *SMAsteriskEvent) { if *authReply.MaxUsage == time.Duration(0) { sma.hangupChannel(ev.ChannelID(), "") return - } else if *authReply.MaxUsage != time.Duration(-1) { - // Set absolute timeout for non-postpaid calls - if !sma.setChannelVar(ev.ChannelID(), CGRMaxSessionTime, - strconv.Itoa(int(authReply.MaxUsage.Seconds()*1000))) { - return - } + } else if *authReply.MaxUsage == time.Duration(-1) { + *authReply.MaxUsage = sma.cgrCfg.SessionSCfg().MaxCallDuration + } + // Set absolute timeout for non-postpaid calls + if !sma.setChannelVar(ev.ChannelID(), CGRMaxSessionTime, + strconv.Itoa(int(authReply.MaxUsage.Seconds()*1000))) { + return } } if authReply.ResourceAllocation != nil { @@ -267,7 +270,6 @@ func (sma *AsteriskAgent) handleChannelStateChange(ev *SMAsteriskEvent) { sma.hangupChannel(ev.ChannelID(), "") return } - } // Channel disconnect @@ -328,6 +330,26 @@ func (sma *AsteriskAgent) Call(serviceMethod string, args interface{}, reply int return utils.RPCCall(sma, serviceMethod, args, reply) } -func (fsa *AsteriskAgent) V1GetActiveSessionIDs(ignParam string, sessionIDs *[]*sessions.SessionID) (err error) { - return utils.ErrNotImplemented +func (sma *AsteriskAgent) V1GetActiveSessionIDs(ignParam string, + sessionIDs *[]*sessions.SessionID) error { + var slMpIface []map[string]interface{} // decode the result from ari into a slice of map[string]interface{} + if mp, err := sma.astConn.Call( + aringo.HTTP_GET, + fmt.Sprintf("http://%s/ari/channels", + sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address), + nil); err != nil { + return err + } else if err := json.Unmarshal(mp, &slMpIface); err != nil { + return err + } + var sIDs []*sessions.SessionID + for _, mpIface := range slMpIface { + sIDs = append(sIDs, &sessions.SessionID{ + OriginHost: strings.Split(sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address, ":")[0], + OriginID: mpIface["id"].(string)}, + ) + } + *sessionIDs = sIDs + return nil + } diff --git a/agents/libhttpagent_test.go b/agents/libhttpagent_test.go index c35f83931..311a42ac7 100644 --- a/agents/libhttpagent_test.go +++ b/agents/libhttpagent_test.go @@ -130,4 +130,14 @@ func TestHttpXmlDPFieldAsInterface(t *testing.T) { } else if data != "37" { t.Errorf("expecting: 37, received: <%s>", data) } + if data, err := dP.FieldAsString([]string{"complete-success-notification", "callleg", "@calllegid"}); err != nil { + t.Error(err) + } else if data != "222146" { + t.Errorf("expecting: 222146, received: <%s>", data) + } + if data, err := dP.FieldAsString([]string{"complete-success-notification", "callleg[1]", "@calllegid"}); err != nil { + t.Error(err) + } else if data != "222147" { + t.Errorf("expecting: 222147, received: <%s>", data) + } } diff --git a/apier/v1/apier_it_test.go b/apier/v1/apier_it_test.go index c0bbf1141..ab7144d63 100644 --- a/apier/v1/apier_it_test.go +++ b/apier/v1/apier_it_test.go @@ -698,7 +698,7 @@ func TestApierLoadAccountActions(t *testing.T) { if err := rater.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil { t.Error("Got error on ApierV1.GetCacheStats: ", err.Error()) } else if !reflect.DeepEqual(expectedStats, rcvStats) { - t.Errorf("Calling ApierV1.GetCacheStats expected: %+v, received: %+v", expectedStats, rcvStats) + t.Errorf("Calling ApierV1.GetCacheStats expected: %+v, received: %+v", utils.ToJSON(expectedStats), utils.ToJSON(rcvStats)) } } diff --git a/apier/v1/cdre_it_test.go b/apier/v1/cdre_it_test.go index 4f0032cc4..fce76fdb4 100755 --- a/apier/v1/cdre_it_test.go +++ b/apier/v1/cdre_it_test.go @@ -24,6 +24,7 @@ import ( "net/rpc" "net/rpc/jsonrpc" "path" + "reflect" "testing" "time" @@ -49,6 +50,8 @@ var sTestsCDRE = []func(t *testing.T){ testCDReRPCConn, testCDReAddCDRs, testCDReExportCDRs, + testCDReFromFolder, + testCDReProcessExternalCdr, testCDReKillEngine, } @@ -156,6 +159,93 @@ func testCDReExportCDRs(t *testing.T) { } } +func testCDReFromFolder(t *testing.T) { + var reply string + attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")} + if err := cdreRPC.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil { + t.Error(err) + } + time.Sleep(500 * time.Millisecond) +} + +// Test CDR from external sources +func testCDReProcessExternalCdr(t *testing.T) { + cdr := &engine.ExternalCDR{ToR: utils.VOICE, + OriginID: "testextcdr1", OriginHost: "127.0.0.1", Source: utils.UNIT_TEST, RequestType: utils.META_RATED, + Tenant: "cgrates.org", Category: "call", Account: "1003", Subject: "1003", Destination: "1001", + SetupTime: "2014-08-04T13:00:00Z", AnswerTime: "2014-08-04T13:00:07Z", + Usage: "1s", ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, + } + var reply string + if err := cdreRPC.Call("CdrsV1.ProcessExternalCdr", cdr, &reply); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if reply != utils.OK { + t.Error("Unexpected reply received: ", reply) + } + time.Sleep(50 * time.Millisecond) + var cdrs []*engine.ExternalCDR + args := utils.RPCCDRsFilter{OriginIDs: []string{"testextcdr1"}} + if err := cdreRPC.Call("ApierV2.GetCdrs", args, &cdrs); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(cdrs) != 2 { + t.Errorf("Unexpected number of CDRs returned: %v, cdrs=%s ", len(cdrs), utils.ToJSON(cdrs)) + return + } else { + for _, c := range cdrs { + if c.RunID == utils.MetaRaw && c.Cost != -1 { + t.Errorf("Expected for *raw cdr cost to be -1, recived: %v", c.Cost) + } + if c.RunID == utils.MetaDefault && c.Cost != 0.3 { + t.Errorf("Expected for *default cdr cost to be 0.3, recived: %v", c.Cost) + } + if c.RunID == utils.MetaDefault { + acdr, err := engine.NewCDRFromExternalCDR(c, "") + if err != nil { + t.Error(err) + return + } + if acdr.CostDetails == nil { + t.Errorf("CostDetails should not be nil") + return + } + if acdr.CostDetails.Usage == nil { + t.Errorf("CostDetails for procesed cdr has usage nil") + } + if acdr.CostDetails.Cost == nil { + t.Errorf("CostDetails for procesed cdr has cost nil") + } + } + if c.Usage != "1s" { + t.Errorf("Expected 1s,recived %s", c.Usage) + } + if c.Source != utils.UNIT_TEST { + t.Errorf("Expected %s,recived %s", utils.UNIT_TEST, c.Source) + } + if c.ToR != utils.VOICE { + t.Errorf("Expected %s,recived %s", utils.VOICE, c.ToR) + } + if c.RequestType != utils.META_RATED { + t.Errorf("Expected %s,recived %s", utils.META_RATED, c.RequestType) + } + if c.Category != "call" { + t.Errorf("Expected call,recived %s", c.Category) + } + if c.Account != "1003" { + t.Errorf("Expected 1003,recived %s", c.Account) + } + if c.Subject != "1003" { + t.Errorf("Expected 1003,recived %s", c.Subject) + } + if c.Destination != "1001" { + t.Errorf("Expected 1001,recived %s", c.Destination) + } + if !reflect.DeepEqual(c.ExtraFields, cdr.ExtraFields) { + t.Errorf("Expected %s,recived %s", utils.ToJSON(c.ExtraFields), utils.ToJSON(cdr.ExtraFields)) + } + } + } +} + func testCDReKillEngine(t *testing.T) { if err := engine.KillEngine(100); err != nil { t.Error(err) diff --git a/apier/v1/stats_it_test.go b/apier/v1/stats_it_test.go index c207ab68b..5b9676040 100644 --- a/apier/v1/stats_it_test.go +++ b/apier/v1/stats_it_test.go @@ -393,13 +393,12 @@ func testV1STSUpdateStatQueueProfile(t *testing.T) { } else if result != utils.OK { t.Error("Unexpected reply returned", result) } - time.Sleep(time.Second) var reply *engine.StatQueueProfile if err := stsV1Rpc.Call("ApierV1.GetStatQueueProfile", &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &reply); err != nil { t.Error(err) } else if !reflect.DeepEqual(statConfig, reply) { - t.Errorf("Expecting: %+v, received: %+v", statConfig, reply) + t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(statConfig), utils.ToJSON(reply)) } } @@ -411,6 +410,9 @@ func testV1STSRemoveStatQueueProfile(t *testing.T) { } else if resp != utils.OK { t.Error("Unexpected reply returned", resp) } + if tSv1ConfDIR == "tutmongo" { + time.Sleep(150 * time.Millisecond) + } var sqp *engine.StatQueueProfile if err := stsV1Rpc.Call("ApierV1.GetStatQueueProfile", &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &sqp); err == nil || err.Error() != utils.ErrNotFound.Error() { diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 82fea2dbb..a23ebbfa5 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -23,10 +23,13 @@ import ( "fmt" "log" "os" + "os/signal" + "path" "runtime" "runtime/pprof" "strconv" "strings" + "syscall" "time" "github.com/cgrates/cgrates/agents" @@ -62,8 +65,10 @@ var ( cfgDir = flag.String("config_dir", utils.CONFIG_DIR, "Configuration directory path.") version = flag.Bool("version", false, "Prints the application version.") pidFile = flag.String("pid", "", "Write pid file") - cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") - memprofile = flag.String("memprofile", "", "write memory profile to file") + cpuProfDir = flag.String("cpuprof_dir", "", "write cpu profile to files") + memProfDir = flag.String("memprof_dir", "", "write memory profile to file") + memProfInterval = flag.Duration("memprof_interval", 5*time.Second, "Time betwen memory profile saves") + memProfNrFiles = flag.Int("memprof_nrfiles", 1, "Number of memory profile to write") scheduledShutdown = flag.String("scheduled_shutdown", "", "shutdown the engine after this duration") singlecpu = flag.Bool("singlecpu", false, "Run on single CPU core") syslogger = flag.String("logger", "", "logger <*syslog|*stdout>") @@ -310,15 +315,19 @@ func startAsteriskAgent(internalSMGChan chan rpcclient.RpcClientConnection, exit smgRpcConn := <-internalSMGChan internalSMGChan <- smgRpcConn birpcClnt := utils.NewBiRPCInternalClient(smgRpcConn.(*sessions.SMGeneric)) + var reply string for connIdx := range cfg.AsteriskAgentCfg().AsteriskConns { // Instantiate connections towards asterisk servers sma, err := agents.NewAsteriskAgent(cfg, connIdx, birpcClnt) if err != nil { - utils.Logger.Err(fmt.Sprintf(" error: %s!", err)) + utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.AsteriskAgent, err)) exitChan <- true return } + if err := birpcClnt.Call(utils.SessionSv1RegisterInternalBiJSONConn, "", &reply); err != nil { // for session sync + utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.AsteriskAgent, err)) + } if err = sma.ListenAndServe(); err != nil { - utils.Logger.Err(fmt.Sprintf(" runtime error: %s!", err)) + utils.Logger.Err(fmt.Sprintf("<%s> runtime error: %s!", utils.AsteriskAgent, err)) } } exitChan <- true @@ -1292,6 +1301,57 @@ func schedCDRsConns(internalCDRSChan chan rpcclient.RpcClientConnection, exitCha engine.SetSchedCdrsConns(cdrsConn) } +func memProfFile(memProfPath string) bool { + f, err := os.Create(memProfPath) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("could not create memory profile file: %s", err)) + return false + } + runtime.GC() // get up-to-date statistics + if err := pprof.WriteHeapProfile(f); err != nil { + utils.Logger.Crit(fmt.Sprintf("could not write memory profile: %s", err)) + f.Close() + return false + } + f.Close() + return true +} + +func memProfiling(memProfDir string, interval time.Duration, nrFiles int, exitChan chan bool) { + for i := 1; ; i++ { + time.Sleep(interval) + memPath := path.Join(memProfDir, fmt.Sprintf("mem%v.prof", i)) + if !memProfFile(memPath) { + exitChan <- true + } + if i%nrFiles == 0 { + i = 0 // reset the counting + } + } +} + +func cpuProfiling(cpuProfDir string, exitChan chan bool, stopChan, doneChan chan struct{}) { + cpuPath := path.Join(cpuProfDir, "cpu.prof") + f, err := os.Create(cpuPath) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("could not create cpu profile file: %s", err)) + exitChan <- true + return + } + pprof.StartCPUProfile(f) + <-stopChan + pprof.StopCPUProfile() + f.Close() + doneChan <- struct{}{} +} + +func shutdownSingnalHandler(exitChan chan bool) { + c := make(chan os.Signal) + signal.Notify(c, os.Interrupt, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) + <-c + exitChan <- true +} + func main() { flag.Parse() if *version { @@ -1304,16 +1364,19 @@ func main() { if *singlecpu { runtime.GOMAXPROCS(1) // Having multiple cpus may slow down computing due to CPU management, to be reviewed in future Go releases } + exitChan := make(chan bool) - if *cpuprofile != "" { - f, err := os.Create(*cpuprofile) - if err != nil { - log.Fatal(err) - } - defer f.Close() - pprof.StartCPUProfile(f) - defer pprof.StopCPUProfile() + go shutdownSingnalHandler(exitChan) + + if *memProfDir != "" { + go memProfiling(*memProfDir, *memProfInterval, *memProfNrFiles, exitChan) } + cpuProfChanStop := make(chan struct{}) + cpuProfChanDone := make(chan struct{}) + if *cpuProfDir != "" { + go cpuProfiling(*cpuProfDir, exitChan, cpuProfChanStop, cpuProfChanDone) + } + if *scheduledShutdown != "" { shutdownDur, err := utils.ParseDurationWithNanosecs(*scheduledShutdown) if err != nil { @@ -1347,6 +1410,7 @@ func main() { lgLevel = *logLevel } utils.Logger.SetLogLevel(lgLevel) + var loadDb engine.LoadStorage var cdrDb engine.CdrStorage var dm *engine.DataManager @@ -1481,8 +1545,6 @@ func main() { // Start FreeSWITCHAgent if cfg.FsAgentCfg().Enabled { go startFsAgent(internalSMGChan, exitChan) - // close all sessions on shutdown - go shutdownSessionmanagerSingnalHandler(exitChan) } // Start SM-Kamailio @@ -1574,18 +1636,13 @@ func main() { internalSMGChan, internalDispatcherSChan, internalAnalyzerSChan, exitChan) <-exitChan - if *memprofile != "" { - f, err := os.Create(*memprofile) - if err != nil { - log.Fatal("could not create memory profile file: ", err) - } - defer f.Close() - runtime.GC() // get up-to-date statistics - if err := pprof.WriteHeapProfile(f); err != nil { - log.Fatal("could not write memory profile: ", err) - } + if *cpuProfDir != "" { // wait to end cpuProfiling + cpuProfChanStop <- struct{}{} + <-cpuProfChanDone + } + if *memProfDir != "" { // write last memory profiling + memProfFile(path.Join(*memProfDir, "mem_final.prof")) } - if *pidFile != "" { if err := os.Remove(*pidFile); err != nil { utils.Logger.Warning("Could not remove pid file: " + err.Error()) diff --git a/cmd/cgr-engine/registration.go b/cmd/cgr-engine/registration.go deleted file mode 100644 index a65348da0..000000000 --- a/cmd/cgr-engine/registration.go +++ /dev/null @@ -1,57 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package main - -import ( - "fmt" - "os" - "os/signal" - "syscall" - - "github.com/cgrates/cgrates/utils" - "github.com/cgrates/rpcclient" -) - -/* -Listens for the SIGTERM, SIGINT, SIGQUIT system signals and closes the storage before exiting. -*/ -func stopRaterSignalHandler(internalCdrStatSChan chan rpcclient.RpcClientConnection, exitChan chan bool) { - c := make(chan os.Signal) - signal.Notify(c, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) - sig := <-c - - utils.Logger.Info(fmt.Sprintf("Caught signal %v", sig)) - var dummyInt int - select { - case cdrStats := <-internalCdrStatSChan: - cdrStats.Call("CDRStatsV1.Stop", dummyInt, &dummyInt) - default: - } - exitChan <- true -} - -/* -Listens for the SIGTERM, SIGINT, SIGQUIT system signals and shuts down the session manager. -*/ -func shutdownSessionmanagerSingnalHandler(exitChan chan bool) { - c := make(chan os.Signal) - signal.Notify(c, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) - <-c - exitChan <- true -} diff --git a/data/tariffplans/tutorial/Filters.csv b/data/tariffplans/tutorial/Filters.csv index 3b62ee7ef..143bd00b7 100644 --- a/data/tariffplans/tutorial/Filters.csv +++ b/data/tariffplans/tutorial/Filters.csv @@ -4,7 +4,7 @@ cgrates.org,FLTR_DST_FS,*string,Account,1001,2014-07-29T15:00:00Z cgrates.org,FLTR_DST_FS,*destinations,Destination,DST_FS, cgrates.org,FLTR_ACNT_1001_1002,*string,Account,1001;1002,2014-07-29T15:00:00Z cgrates.org,FLTR_ACNT_1001_1002,*string,RunID,*default, -cgrates.org,FLTR_ACNT_1001_1003,*string,Destination,1001;1002, +cgrates.org,FLTR_ACNT_1001_1002,*string,Destination,1001;1002;1003, cgrates.org,FLTR_ACNT_1001,*string,Account,1001,2014-07-29T15:00:00Z cgrates.org,FLTR_ACNT_1002,*string,Account,1002,2014-07-29T15:00:00Z cgrates.org,FLTR_ACNT_1003,*string,Account,1003,2014-07-29T15:00:00Z diff --git a/data/tariffplans/tutorial/Resources.csv b/data/tariffplans/tutorial/Resources.csv index 60af73072..945587cd6 100644 --- a/data/tariffplans/tutorial/Resources.csv +++ b/data/tariffplans/tutorial/Resources.csv @@ -1,2 +1,2 @@ #Tenant[0],Id[1],FilterIDs[2],ActivationInterval[3],TTL[4],Limit[5],AllocationMessage[6],Blocker[7],Stored[8],Weight[9],ThresholdIDs[10] -cgrates.org,ResGroup1,FLTR_RES,2014-07-29T15:00:00Z,3600s,7,,false,true,10, \ No newline at end of file +cgrates.org,ResGroup1,FLTR_RES,2014-07-29T15:00:00Z,-1,7,,false,true,10,*none \ No newline at end of file diff --git a/data/tariffplans/tutorial/Stats.csv b/data/tariffplans/tutorial/Stats.csv index a84b1490d..3323cb960 100644 --- a/data/tariffplans/tutorial/Stats.csv +++ b/data/tariffplans/tutorial/Stats.csv @@ -1,3 +1,3 @@ #Tenant[0],Id[1],FilterIDs[2],ActivationInterval[3],QueueLength[4],TTL[5],Metrics[6],MetricParams[7],Blocker[8],Stored[9],Weight[10],MinItems[11],ThresholdIDs[12] -cgrates.org,Stats2,FLTR_ACNT_1001_1002,2014-07-29T15:00:00Z,100,1s,*tcc;*tcd,,false,true,30,0, -cgrates.org,Stats2_1,FLTR_ACNT_1003_1001,2014-07-29T15:00:00Z,100,1s,*tcc;*tcd,,false,true,30,0, +cgrates.org,Stats2,FLTR_ACNT_1001_1002,2014-07-29T15:00:00Z,100,-1,*tcc;*tcd,,false,true,30,0,*none +cgrates.org,Stats2_1,FLTR_ACNT_1003_1001,2014-07-29T15:00:00Z,100,-1,*tcc;*tcd,,false,true,30,0,*none diff --git a/data/tutorials/asterisk_ari/asterisk/etc/asterisk/extensions.conf b/data/tutorials/asterisk_ari/asterisk/etc/asterisk/extensions.conf index b2368c755..7c3c8da6c 100755 --- a/data/tutorials/asterisk_ari/asterisk/etc/asterisk/extensions.conf +++ b/data/tutorials/asterisk_ari/asterisk/etc/asterisk/extensions.conf @@ -2,7 +2,7 @@ exten => _1XXX,1,NoOp() same => n,Set(CGRMaxSessionTime=0); use it to disconnect automatically the call if CGRateS is not active same => n,DumpChan() - same => n,Stasis(cgrates_auth,cgr_reqtype=*prepaid,cgr_supplier=supplier1,cgr_subsystems=*attributes;*accounts) + same => n,Stasis(cgrates_auth,cgr_reqtype=*prepaid,cgr_supplier=supplier1,cgr_subsystems=*accounts*attributes*resources*stats*suppliers*thresholds) same => n,Dial(PJSIP/${EXTEN},30,L(${CGRMaxSessionTime})) same => n,Hangup() diff --git a/data/tutorials/asterisk_ari/asterisk/etc/asterisk/http.conf b/data/tutorials/asterisk_ari/asterisk/etc/asterisk/http.conf index 12b5e2e8b..9d1f51885 100644 --- a/data/tutorials/asterisk_ari/asterisk/etc/asterisk/http.conf +++ b/data/tutorials/asterisk_ari/asterisk/etc/asterisk/http.conf @@ -1,4 +1,4 @@ [general] enabled = yes -bindaddr = 127.0.0.1 +bindaddr = 0.0.0.0 bindport = 8088 \ No newline at end of file diff --git a/data/tutorials/asterisk_ari/asterisk/etc/asterisk/pjsip.conf b/data/tutorials/asterisk_ari/asterisk/etc/asterisk/pjsip.conf index e99652665..a545acda4 100755 --- a/data/tutorials/asterisk_ari/asterisk/etc/asterisk/pjsip.conf +++ b/data/tutorials/asterisk_ari/asterisk/etc/asterisk/pjsip.conf @@ -1,7 +1,7 @@ [simpletrans] type=transport protocol=udp -bind=0.0.0.0 +bind=0.0.0.0:5080 [1001] type = endpoint @@ -27,11 +27,11 @@ password=CGRateS.org type = endpoint transport = simpletrans context = internal -aors = 1002 -auth = 1002 disallow = all allow = ulaw allow = alaw +aors = 1002 +auth = 1002 [1002] type = aor diff --git a/data/tutorials/asterisk_ari/cgrates/etc/cgrates/cgrates.json b/data/tutorials/asterisk_ari/cgrates/etc/cgrates/cgrates.json index ba6c82254..1a8cfb9d7 100644 --- a/data/tutorials/asterisk_ari/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/asterisk_ari/cgrates/etc/cgrates/cgrates.json @@ -5,6 +5,7 @@ "general": { "log_level": 7, + "node_id":"CGRAsterisk", }, @@ -28,21 +29,21 @@ "rals": { "enabled": true, "thresholds_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "stats_conns": [ - {"address": "*internal"} - ], - "attributes_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], }, "cdrs": { "enabled": true, + "sessions_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"}, + ], "stats_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "sessions_cost_retries": 5, }, @@ -51,80 +52,80 @@ "sessions": { "enabled": true, "rals_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "cdrs_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "resources_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "suppliers_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "attributes_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "stats_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "thresholds_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], - "debit_interval": "10s", + "debit_interval": "5s", + "channel_sync_interval":"7s", }, "asterisk_agent": { - "enabled": true, - "sessions_conns": [ - {"address": "*internal"} - ], - "create_cdr": true, - "asterisk_conns":[ - {"address": "192.168.56.103:8088", "user": "cgrates", + "enabled": true, + "asterisk_conns":[ + {"address": "192.168.56.203:8088", "user": "cgrates", "password": "CGRateS.org", "connect_attempts": 3,"reconnects": 10} ], + "sessions_conns": [ + {"address": "*internal"}, + ], + "create_cdr": true, }, "attributes": { - "enabled": true, + "enabled": true, + "string_indexed_fields": ["Account"], }, "resources": { "enabled": true, - "thresholds_conns": [ - {"address": "*internal"} - ], + "string_indexed_fields": ["Account"], }, "stats": { "enabled": true, - "thresholds_conns": [ - {"address": "*internal"} - ], + "string_indexed_fields": ["Account","RunID","Destination"], }, "thresholds": { "enabled": true, + "string_indexed_fields": ["Account"], }, "suppliers": { "enabled": true, "rals_conns": [ - {"address": "*internal"} + {"address": "*internal"}, ], "resources_conns": [ - {"address": "*internal"} + {"address": "*internal"}, ], "stats_conns": [ - {"address": "*internal"} + {"address": "*internal"}, ], + "string_indexed_fields": ["Account"], }, diff --git a/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json b/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json index 60d12d4cb..89506a251 100644 --- a/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json @@ -1,13 +1,11 @@ { -// Real-time Charging System for Telecom & ISP environments +// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments // Copyright (C) ITsysCOM GmbH -// -// This file contains the default configuration hardcoded into CGRateS. -// This is what you get when you load CGRateS with an empty configuration file. "general": { - "log_level":7, + "log_level": 7, + "node_id":"CGRFreeswitch", }, @@ -31,16 +29,10 @@ "rals": { "enabled": true, "thresholds_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "stats_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} - ], - "pubsubs_conns": [ - {"address": "*internal"} - ], - "attributes_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], }, @@ -48,10 +40,10 @@ "cdrs": { "enabled": true, "sessions_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "stats_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "sessions_cost_retries": 5, }, @@ -60,25 +52,25 @@ "sessions": { "enabled": true, "rals_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "cdrs_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "resources_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "suppliers_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "attributes_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "stats_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "thresholds_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "debit_interval": "5s", "channel_sync_interval":"7s", @@ -87,17 +79,12 @@ "freeswitch_agent": { "enabled": true, - "sessions_conns": [ - {"address": "*internal"} - ], "event_socket_conns":[ {"address": "127.0.0.1:8021", "password": "ClueCon", "reconnects": -1,"alias":""} ], -}, - - -"pubsubs": { - "enabled": true, + "sessions_conns": [ + {"address": "*internal"}, + ], }, @@ -109,18 +96,12 @@ "resources": { "enabled": true, - "thresholds_conns": [ - {"address": "*internal"} - ], "string_indexed_fields": ["Account"], }, "stats": { "enabled": true, - "thresholds_conns": [ - {"address": "*internal"} - ], "string_indexed_fields": ["Account","RunID","Destination"], }, @@ -134,13 +115,13 @@ "suppliers": { "enabled": true, "rals_conns": [ - {"address": "*internal"} + {"address": "*internal"}, ], "resources_conns": [ - {"address": "*internal"} + {"address": "*internal"}, ], "stats_conns": [ - {"address": "*internal"} + {"address": "*internal"}, ], "string_indexed_fields": ["Account"], }, diff --git a/data/tutorials/fs_evsock/freeswitch/etc/freeswitch_conf.tar.gz b/data/tutorials/fs_evsock/freeswitch/etc/freeswitch_conf.tar.gz index a40e883a3..28ba3a0fb 100644 Binary files a/data/tutorials/fs_evsock/freeswitch/etc/freeswitch_conf.tar.gz and b/data/tutorials/fs_evsock/freeswitch/etc/freeswitch_conf.tar.gz differ diff --git a/data/tutorials/kamevapi/cgrates/etc/cgrates/cgrates.json b/data/tutorials/kamevapi/cgrates/etc/cgrates/cgrates.json index d947b5357..9a3701cb7 100644 --- a/data/tutorials/kamevapi/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/kamevapi/cgrates/etc/cgrates/cgrates.json @@ -5,6 +5,7 @@ "general": { "log_level": 7, + "node_id":"CGRKamailio", }, @@ -28,13 +29,10 @@ "rals": { "enabled": true, "thresholds_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "stats_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} - ], - "attributes_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], }, @@ -42,10 +40,10 @@ "cdrs": { "enabled": true, "sessions_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "stats_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "sessions_cost_retries": 5, }, @@ -54,25 +52,25 @@ "sessions": { "enabled": true, "rals_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "cdrs_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "resources_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "suppliers_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "attributes_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "stats_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "thresholds_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} + {"address": "127.0.0.1:2012", "transport": "*json"}, ], "debit_interval": "5s", "channel_sync_interval":"7s", @@ -81,18 +79,13 @@ "kamailio_agent": { "enabled": true, - "evapi_conns":[ // instantiate connections to multiple Kamailio servers + "evapi_conns":[ {"address": "127.0.0.1:8448", "reconnects": 5} ], "sessions_conns": [ - {"address": "*internal"} // connection towards session service: <*internal> + {"address": "*internal"}, ], - "create_cdr": true, // create CDR out of events and sends them to CDRS component -}, - - -"pubsubs": { - "enabled": true, + "create_cdr": true, }, @@ -104,20 +97,13 @@ "resources": { "enabled": true, - "thresholds_conns": [ - {"address": "*internal"} - ], "string_indexed_fields": ["Account"], - "prefix_indexed_fields": ["Destination"], }, "stats": { "enabled": true, - "thresholds_conns": [ - {"address": "*internal"} - ], - "string_indexed_fields": ["Account"], + "string_indexed_fields": ["Account","RunID","Destination"], }, @@ -130,17 +116,16 @@ "suppliers": { "enabled": true, "rals_conns": [ - {"address": "*internal"} + {"address": "*internal"}, ], "resources_conns": [ - {"address": "*internal"} + {"address": "*internal"}, ], "stats_conns": [ - {"address": "*internal"} + {"address": "*internal"}, ], "string_indexed_fields": ["Account"], - "prefix_indexed_fields": ["Destination"], }, -} +} \ No newline at end of file diff --git a/engine/cdr.go b/engine/cdr.go index 251c5fcb7..b8435625b 100644 --- a/engine/cdr.go +++ b/engine/cdr.go @@ -55,6 +55,7 @@ func NewCDRFromExternalCDR(extCdr *ExternalCDR, timezone string) (*CDR, error) { } } if len(extCdr.CostDetails) != 0 { + cdr.CostDetails = &EventCost{} if err = json.Unmarshal([]byte(extCdr.CostDetails), cdr.CostDetails); err != nil { return nil, err } diff --git a/engine/cdrs.go b/engine/cdrs.go index 66b5515ca..2cbbe8e42 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -283,6 +283,9 @@ func (self *CdrServer) deriveRateStoreStatsReplicate(cdr *CDR, store, cdrstats, // Store rated CDRs if store { for _, ratedCDR := range ratedCDRs { + if ratedCDR.CostDetails != nil { + ratedCDR.CostDetails.Compute() + } if err := self.cdrDb.SetCDR(ratedCDR, true); err != nil { utils.Logger.Err(fmt.Sprintf(" Storing rated CDR %+v, got error: %s", ratedCDR, err.Error())) } diff --git a/general_tests/tutorial_calls_test.go b/general_tests/tutorial_calls_test.go index cb897ab28..55c0ad396 100755 --- a/general_tests/tutorial_calls_test.go +++ b/general_tests/tutorial_calls_test.go @@ -27,6 +27,7 @@ import ( "os" "path" "reflect" + "strings" "testing" "time" @@ -39,7 +40,7 @@ import ( var tutorialCallsCfg *config.CGRConfig var tutorialCallsRpc *rpc.Client var tutorialCallsPjSuaListener *os.File -var waitRater = flag.Int("wait_rater", 100, "Number of miliseconds to wait for rater to start and cache") +var waitRater = flag.Int("wait_rater", 1000, "Number of miliseconds to wait for rater to start and cache") var dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here") var fsConfig = flag.String("fsConfig", "/usr/share/cgrates/tutorials/fs_evsock", "FreeSwitch tutorial folder") var kamConfig = flag.String("kamConfig", "/usr/share/cgrates/tutorials/kamevapi", "Kamailio tutorial folder") @@ -72,6 +73,7 @@ var sTestsCalls = []func(t *testing.T){ testCallAccount1001, testCall1001Cdrs, testCall1002Cdrs, + testCall1003Cdrs, testCallStatMetrics, testCallCheckResourceRelease, testCallCheckThreshold1001After, @@ -104,14 +106,12 @@ func TestOpensipsCalls(t *testing.T) { } } -/* Need to be checked func TestAsteriskCalls(t *testing.T) { optConf = utils.Asterisk for _, stest := range sTestsCalls { t.Run("", stest) } } -*/ func testCallInitCfg(t *testing.T) { // Init config first @@ -243,11 +243,10 @@ func testCallRestartFS(t *testing.T) { // Connect rpc client to rater func testCallRpcConn(t *testing.T) { var err error - tutorialCallsRpc, err = jsonrpc.Dial("tcp", tutorialCallsCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + tutorialCallsRpc, err = jsonrpc.Dial("tcp", tutorialCallsCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal(err) } - time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups } // Load the tariff plan, creating accounts and their balances @@ -366,6 +365,7 @@ func testCallStartPjsuaListener(t *testing.T) { acnts, 5070, time.Duration(*waitRater)*time.Millisecond); err != nil { t.Fatal(err) } + time.Sleep(1 * time.Second) } // Call from 1001 (prepaid) to 1002 @@ -375,7 +375,8 @@ func testCallCall1001To1002(t *testing.T) { "sip:1002@127.0.0.1", "sip:127.0.0.1:5080", time.Duration(67)*time.Second, 5071); err != nil { t.Fatal(err) } - time.Sleep(1 * time.Second) + // give time to session to start so we can check it + time.Sleep(time.Second) } // GetActiveSessions @@ -431,16 +432,18 @@ func testCallCall1003To1001(t *testing.T) { "sip:1001@127.0.0.1", "sip:127.0.0.1:5080", time.Duration(20)*time.Second, 5074); err != nil { t.Fatal(err) } - time.Sleep(22 * time.Second) + // after this call from 1001 to 1003 and call from 1003 to 1001 should be done } // Call from 1003 (prepaid) to 1001 for 15 seconds func testCallCall1003To1001SecondTime(t *testing.T) { + time.Sleep(22 * time.Second) if err := engine.PjsuaCallUri( &engine.PjsuaAccount{Id: "sip:1003@127.0.0.1", Username: "1003", Password: "CGRateS.org", Realm: "*"}, "sip:1001@127.0.0.1", "sip:127.0.0.1:5080", time.Duration(15)*time.Second, 5075); err != nil { t.Fatal(err) } + time.Sleep(time.Second) } // Check if the resource was Allocated @@ -461,15 +464,16 @@ func testCallCheckResourceAllocation(t *testing.T) { t.Errorf("Resources: %+v", utils.ToJSON(rs)) } for _, r := range *rs { - if r.ID == "ResGroup1" && (len(r.Usages) != 2 || len(r.TTLIdx) != 2) { + if r.ID == "ResGroup1" && len(r.Usages) != 3 { t.Errorf("Unexpected resource: %+v", utils.ToJSON(r)) } } + // Allow calls to finish before start querying the results + time.Sleep(time.Duration(50) * time.Second) } // Make sure account was debited properly func testCallAccount1001(t *testing.T) { - time.Sleep(time.Duration(60) * time.Second) // Allow calls to finish before start querying the results var reply *engine.Account attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"} if err := tutorialCallsRpc.Call("ApierV2.GetAccount", attrs, &reply); err != nil { @@ -495,6 +499,10 @@ func testCall1001Cdrs(t *testing.T) { t.Errorf("Unexpected RequestType for CDR: %+v", cdr.RequestType) } if cdr.Destination == "1002" { + // in case of Asterisk take the integer part from usage + if optConf == utils.Asterisk { + cdr.Usage = strings.Split(cdr.Usage, ".")[0] + "s" + } if cdr.Usage != "1m7s" && cdr.Usage != "1m8s" { // Usage as seconds t.Errorf("Unexpected Usage for CDR: %+v", cdr.Usage) } @@ -502,6 +510,10 @@ func testCall1001Cdrs(t *testing.T) { t.Errorf("Unexpected CostSource for CDR: %+v", cdr.CostSource) } } else if cdr.Destination == "1003" { + // in case of Asterisk take the integer part from usage + if optConf == utils.Asterisk { + cdr.Usage = strings.Split(cdr.Usage, ".")[0] + "s" + } if cdr.Usage != "12s" && cdr.Usage != "13s" { // Usage as seconds t.Errorf("Unexpected Usage for CDR: %+v", cdr.Usage) } @@ -516,7 +528,8 @@ func testCall1001Cdrs(t *testing.T) { // Make sure account was debited properly func testCall1002Cdrs(t *testing.T) { var reply []*engine.ExternalCDR - req := utils.RPCCDRsFilter{RunIDs: []string{utils.META_DEFAULT}, Accounts: []string{"1002"}, DestinationPrefixes: []string{"1001"}} + req := utils.RPCCDRsFilter{RunIDs: []string{utils.META_DEFAULT}, + Accounts: []string{"1002"}, DestinationPrefixes: []string{"1001"}} if err := tutorialCallsRpc.Call("ApierV2.GetCdrs", req, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) } else if len(reply) != 1 { @@ -525,7 +538,11 @@ func testCall1002Cdrs(t *testing.T) { if reply[0].RequestType != utils.META_POSTPAID { t.Errorf("Unexpected RequestType for CDR: %+v", reply[0].RequestType) } - if reply[0].Usage != "1m5s" { // Usage as seconds + // in case of Asterisk take the integer part from usage + if optConf == utils.Asterisk { + reply[0].Usage = strings.Split(reply[0].Usage, ".")[0] + "s" + } + if reply[0].Usage != "1m5s" && reply[0].Usage != "1m6s" { // Usage as seconds t.Errorf("Unexpected Usage for CDR: %+v", reply[0].Usage) } if reply[0].CostSource != utils.MetaCDRs { @@ -534,33 +551,86 @@ func testCall1002Cdrs(t *testing.T) { } } +// Make sure account was debited properly +func testCall1003Cdrs(t *testing.T) { + var reply []*engine.ExternalCDR + req := utils.RPCCDRsFilter{RunIDs: []string{utils.META_DEFAULT}, + Accounts: []string{"1003"}, DestinationPrefixes: []string{"1001"}} + if err := tutorialCallsRpc.Call("ApierV2.GetCdrs", req, &reply); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(reply) != 2 { + t.Error("Unexpected number of CDRs returned: ", len(reply)) + } else { + for _, cdr := range reply { + if cdr.RequestType != utils.META_PREPAID { + t.Errorf("Unexpected RequestType for CDR: %+v", cdr.RequestType) + } + // in case of Asterisk take the integer part from usage + if optConf == utils.Asterisk { + cdr.Usage = strings.Split(cdr.Usage, ".")[0] + "s" + } + if cdr.Usage != "15s" && cdr.Usage != "16s" && + cdr.Usage != "20s" && cdr.Usage != "21s" { // Usage as seconds + t.Errorf("Unexpected Usage for CDR: %+v", cdr.Usage) + } + if cdr.CostSource != utils.MetaSessionS { + t.Errorf("Unexpected CostSource for CDR: %+v", cdr.CostSource) + } + + } + } +} + func testCallStatMetrics(t *testing.T) { var metrics map[string]string firstStatMetrics1 := map[string]string{ + utils.MetaTCC: "1.35346", + utils.MetaTCD: "2m27s", + } + firstStatMetrics2 := map[string]string{ utils.MetaTCC: "1.35009", utils.MetaTCD: "2m25s", } - firstStatMetrics2 := map[string]string{ + firstStatMetrics3 := map[string]string{ utils.MetaTCC: "1.34009", utils.MetaTCD: "2m24s", } - secondStatMetrics := map[string]string{ + firstStatMetrics4 := map[string]string{ + utils.MetaTCC: "1.35346", + utils.MetaTCD: "2m24s", + } + secondStatMetrics1 := map[string]string{ utils.MetaTCC: "0.6", utils.MetaTCD: "35s", } + secondStatMetrics2 := map[string]string{ + utils.MetaTCC: "0.6", + utils.MetaTCD: "37s", + } if err := tutorialCallsRpc.Call(utils.StatSv1GetQueueStringMetrics, &utils.TenantID{Tenant: "cgrates.org", ID: "Stats2"}, &metrics); err != nil { t.Error(err) - } else if !reflect.DeepEqual(firstStatMetrics1, metrics) && - !reflect.DeepEqual(firstStatMetrics2, metrics) { + } + if optConf == utils.Asterisk { + metrics[utils.MetaTCD] = strings.Split(metrics[utils.MetaTCD], ".")[0] + "s" + } + if !reflect.DeepEqual(firstStatMetrics1, metrics) && + !reflect.DeepEqual(firstStatMetrics2, metrics) && + !reflect.DeepEqual(firstStatMetrics3, metrics) && + !reflect.DeepEqual(firstStatMetrics4, metrics) { t.Errorf("expecting: %+v, received reply: %s", firstStatMetrics1, metrics) } if err := tutorialCallsRpc.Call(utils.StatSv1GetQueueStringMetrics, &utils.TenantID{Tenant: "cgrates.org", ID: "Stats2_1"}, &metrics); err != nil { t.Error(err) - } else if !reflect.DeepEqual(secondStatMetrics, metrics) { - t.Errorf("expecting: %+v, received reply: %s", secondStatMetrics, metrics) + } + if optConf == utils.Asterisk { + metrics[utils.MetaTCD] = strings.Split(metrics[utils.MetaTCD], ".")[0] + "s" + } + if !reflect.DeepEqual(secondStatMetrics1, metrics) && + !reflect.DeepEqual(secondStatMetrics2, metrics) { + t.Errorf("expecting: %+v, received reply: %s", secondStatMetrics1, metrics) } } @@ -581,9 +651,8 @@ func testCallCheckResourceRelease(t *testing.T) { t.Errorf("Resources: %+v", rs) } for _, r := range *rs { - if r.ID == "ResGroup1" && - (len(r.Usages) != 0 || len(r.TTLIdx) != 0) { - t.Errorf("Unexpected resource: %+v", r) + if r.ID == "ResGroup1" && len(r.Usages) != 0 { + t.Errorf("Unexpected resource: %+v", utils.ToJSON(r)) } } } @@ -616,19 +685,19 @@ func testCallSyncSessions(t *testing.T) { var reply *[]*sessions.ActiveSession // activeSessions shouldn't be active if err := tutorialCallsRpc.Call(utils.SessionSv1GetActiveSessions, - &map[string]string{}, &reply); err.Error() != utils.ErrNotFound.Error() { - t.Error("Got error on SessionSv1.GetActiveSessions: ", err.Error()) + &map[string]string{}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error("Got error on SessionSv1.GetActiveSessions: ", err) } // 1001 call 1002 stop the call after 12 seconds if err := engine.PjsuaCallUri( &engine.PjsuaAccount{Id: "sip:1001@127.0.0.1", Username: "1001", Password: "CGRateS.org", Realm: "*"}, - "sip:1002@127.0.0.1", "sip:127.0.0.1:5080", time.Duration(120)*time.Second, 5071); err != nil { + "sip:1002@127.0.0.1", "sip:127.0.0.1:5080", time.Duration(120)*time.Second, 5076); err != nil { t.Fatal(err) } // 1001 call 1003 stop the call after 11 seconds if err := engine.PjsuaCallUri( &engine.PjsuaAccount{Id: "sip:1001@127.0.0.1", Username: "1001", Password: "CGRateS.org", Realm: "*"}, - "sip:1003@127.0.0.1", "sip:127.0.0.1:5080", time.Duration(120)*time.Second, 5073); err != nil { + "sip:1003@127.0.0.1", "sip:127.0.0.1:5080", time.Duration(120)*time.Second, 5077); err != nil { t.Fatal(err) } time.Sleep(1 * time.Second) @@ -637,28 +706,26 @@ func testCallSyncSessions(t *testing.T) { &map[string]string{}, &reply); err != nil { t.Error("Got error on SessionSv1.GetActiveSessions: ", err.Error()) } else if len(*reply) != 2 { - t.Errorf("expecting 2, received reply: %+v", utils.ToJSON(reply)) + t.Errorf("expecting 2 active sessions, received: %+v", utils.ToJSON(reply)) } //check if resource was allocated for 2 calls(1001->1002;1001->1003) var rs *engine.Resources args := &utils.ArgRSv1ResourceUsage{ CGREvent: utils.CGREvent{ Tenant: "cgrates.org", - ID: "ResourceAllocation", + ID: "AllocateResource", Event: map[string]interface{}{ utils.Account: "1001", utils.Subject: "1001", utils.Destination: "1002"}, - }, - Units: 1, - } + }} if err := tutorialCallsRpc.Call(utils.ResourceSv1GetResourcesForEvent, args, &rs); err != nil { t.Error(err) } else if len(*rs) != 1 { t.Errorf("Resources: %+v", utils.ToJSON(rs)) } for _, r := range *rs { - if r.ID == "ResGroup1" && (len(r.Usages) != 2 || len(r.TTLIdx) != 2) { + if r.ID == "ResGroup1" && len(r.Usages) != 2 { t.Errorf("Unexpected resource: %+v", utils.ToJSON(r)) } } @@ -686,8 +753,8 @@ func testCallSyncSessions(t *testing.T) { // activeSessions shouldn't be active if err := tutorialCallsRpc.Call(utils.SessionSv1GetActiveSessions, - &map[string]string{}, &reply); err.Error() != utils.ErrNotFound.Error() { - t.Error("Got error on SessionSv1.GetActiveSessions: ", err.Error()) + &map[string]string{}, &reply); err != nil && err.Error() != utils.ErrNotFound.Error() { + t.Error("Got error on SessionSv1.GetActiveSessions: ", err) } var sourceForCDR string @@ -699,6 +766,9 @@ func testCallSyncSessions(t *testing.T) { case utils.Kamailio: sourceForCDR = utils.KamailioAgent numberOfCDR = 3 + case utils.Asterisk: + sourceForCDR = utils.AsteriskAgent + numberOfCDR = 3 } // verify cdr var rplCdrs []*engine.ExternalCDR @@ -732,8 +802,7 @@ func testCallSyncSessions(t *testing.T) { t.Errorf("Resources: %+v", rsAfter) } for _, r := range *rsAfter { - if r.ID == "ResGroup1" && - (len(r.Usages) != 0 || len(r.TTLIdx) != 0) { + if r.ID == "ResGroup1" && len(r.Usages) != 0 { t.Errorf("Unexpected resource: %+v", utils.ToJSON(r)) } } diff --git a/glide.lock b/glide.lock index 1fbb5c00e..c0f2f3d18 100644 --- a/glide.lock +++ b/glide.lock @@ -8,7 +8,7 @@ imports: subpackages: - jsonrpc - name: github.com/cgrates/aringo - version: 47cdb110c5ff42bddf2b801dc5ae8ceb15d2d602 + version: f996da7890eaec95ba13240253744446e17e6598 - name: github.com/cgrates/fsock version: bcbd5e75c07dddb12ac86f1f861f2bdddc1d4596 - name: github.com/cgrates/kamevapi diff --git a/sessions/sessions.go b/sessions/sessions.go index 15643a6df..a2271ee83 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -292,7 +292,7 @@ func (smg *SMGeneric) ttlTerminate(s *SMGSession, tmtr *smgSessionTerminator) { var reply string argsRU := utils.ArgRSv1ResourceUsage{ CGREvent: utils.CGREvent{ - Tenant: s.EventStart.GetStringIgnoreErrors(utils.Tenant), + Tenant: s.Tenant, Event: s.EventStart.AsMapInterface(), }, UsageID: s.ResourceID,