diff --git a/apier/v1/dispatcher.go b/apier/v1/dispatcher.go index 84b4ee251..de596a8aa 100755 --- a/apier/v1/dispatcher.go +++ b/apier/v1/dispatcher.go @@ -664,7 +664,7 @@ type DispatcherSchedulerSv1 struct { } // Reload reloads scheduler instructions -func (dS *DispatcherSchedulerSv1) Reload(attr *dispatchers.StringWithApiKey, reply *string) (err error) { +func (dS *DispatcherSchedulerSv1) Reload(attr *utils.CGREventWithArgDispatcher, reply *string) (err error) { return dS.dS.SchedulerSv1Reload(attr, reply) } diff --git a/apier/v1/dispatcher_interface.go b/apier/v1/dispatcher_interface.go index 4b22b69d5..506457519 100644 --- a/apier/v1/dispatcher_interface.go +++ b/apier/v1/dispatcher_interface.go @@ -127,7 +127,7 @@ type GuardianSv1Interface interface { } type SchedulerSv1Interface interface { - Reload(arg *dispatchers.StringWithApiKey, reply *string) error + Reload(arg *utils.CGREventWithArgDispatcher, reply *string) error Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error } diff --git a/apier/v1/schedulers.go b/apier/v1/schedulers.go index bc96f705a..a9d71c69f 100644 --- a/apier/v1/schedulers.go +++ b/apier/v1/schedulers.go @@ -19,7 +19,6 @@ along with this program. If not, see package v1 import ( - "github.com/cgrates/cgrates/dispatchers" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) @@ -34,8 +33,8 @@ type SchedulerSv1 struct { } // Reload reloads scheduler instructions -func (schdSv1 *SchedulerSv1) Reload(arg *dispatchers.StringWithApiKey, reply *string) error { - return schdSv1.schdS.V1Reload(arg.Arg, reply) +func (schdSv1 *SchedulerSv1) Reload(arg *utils.CGREventWithArgDispatcher, reply *string) error { + return schdSv1.schdS.V1Reload(utils.EmptyString, reply) } func (schdSv1 *SchedulerSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error { diff --git a/dispatchers/scheduler.go b/dispatchers/scheduler.go index bb3387c7d..ab4801c66 100644 --- a/dispatchers/scheduler.go +++ b/dispatchers/scheduler.go @@ -19,8 +19,6 @@ along with this program. If not, see package dispatchers import ( - "time" - "github.com/cgrates/cgrates/utils" ) @@ -39,17 +37,17 @@ func (dS *DispatcherService) SchedulerSv1Ping(args *utils.CGREventWithArgDispatc utils.SchedulerSv1Ping, args, reply) } -func (dS *DispatcherService) SchedulerSv1Reload(args *StringWithApiKey, reply *string) (err error) { +func (dS *DispatcherService) SchedulerSv1Reload(args *utils.CGREventWithArgDispatcher, reply *string) (err error) { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing("ArgDispatcher") } if dS.attrS != nil { if err = dS.authorize(utils.SchedulerSv1Ping, - args.TenantArg.Tenant, - args.APIKey, utils.TimePointer(time.Now())); err != nil { + args.CGREvent.Tenant, + args.APIKey, args.CGREvent.Time); err != nil { return } } - return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaScheduler, - args.RouteID, utils.SchedulerSv1Reload, args.Arg, reply) + return dS.Dispatch(args.CGREvent, utils.MetaScheduler, args.RouteID, + utils.SchedulerSv1Reload, args, reply) } diff --git a/engine/tpreader.go b/engine/tpreader.go index dbf075b79..e50018e0b 100644 --- a/engine/tpreader.go +++ b/engine/tpreader.go @@ -2475,7 +2475,8 @@ func (tpr *TpReader) ReloadCache(flush, verbose bool) (err error) { if verbose { log.Print("Reloading scheduler") } - if err = tpr.schedulerS.Call(utils.SchedulerSv1Reload, "", &reply); err != nil { + if err = tpr.schedulerS.Call(utils.SchedulerSv1Reload, + new(utils.CGREventWithArgDispatcher), &reply); err != nil { log.Printf("WARNING: Got error on scheduler reload: %s\n", err.Error()) } } diff --git a/sessions/sessions_bench_test.go b/sessions/sessions_bench_test.go index c4e4a72e1..4b23eda4a 100644 --- a/sessions/sessions_bench_test.go +++ b/sessions/sessions_bench_test.go @@ -19,6 +19,7 @@ along with this program. If not, see package sessions import ( + "flag" "fmt" "log" "net/rpc" @@ -33,50 +34,49 @@ import ( ) var ( - SessionBenchmarkCfg *config.CGRConfig - SessionBenchmarkRPC *rpc.Client - ConnectOnce sync.Once - NoSessions int + sBenchCfg *config.CGRConfig + sBenchRPC *rpc.Client + connOnce sync.Once + initRuns = flag.Int("init_runs", 25000, "number of loops to run in init") ) func startRPC() { var err error - SessionBenchmarkCfg, err = config.NewCGRConfigFromPath(path.Join(config.CgrConfig().DataFolderPath, "conf", "samples", "tutmysql")) + sBenchCfg, err = config.NewCGRConfigFromPath( + path.Join(config.CgrConfig().DataFolderPath, "conf", "samples", "tutmysql")) if err != nil { log.Fatal(err) } - config.SetCgrConfig(SessionBenchmarkCfg) - if SessionBenchmarkRPC, err = jsonrpc.Dial("tcp", SessionBenchmarkCfg.ListenCfg().RPCJSONListen); err != nil { + config.SetCgrConfig(sBenchCfg) + if sBenchRPC, err = jsonrpc.Dial("tcp", sBenchCfg.ListenCfg().RPCJSONListen); err != nil { log.Fatalf("Error at dialing rcp client:%v\n", err) } } -func addBalance(SessionBenchmarkRPC *rpc.Client, sraccount string) { +func addBalance(sBenchRPC *rpc.Client, sraccount string) { attrSetBalance := utils.AttrSetBalance{ - Tenant: "cgrates.org", - Account: sraccount, - BalanceType: utils.VOICE, - BalanceID: utils.StringPointer("TestDynamicDebitBalance"), - Value: utils.Float64Pointer(5 * float64(time.Hour)), - RatingSubject: utils.StringPointer("*zero5ms"), + Tenant: "cgrates.org", + Account: sraccount, + BalanceType: utils.VOICE, + BalanceID: utils.StringPointer("TestDynamicDebitBalance"), + Value: utils.Float64Pointer(5 * float64(time.Hour)), } var reply string - if err := SessionBenchmarkRPC.Call("ApierV2.SetBalance", attrSetBalance, &reply); err != nil { + if err := sBenchRPC.Call("ApierV2.SetBalance", + attrSetBalance, &reply); err != nil { log.Fatal(err) - // } else if reply != utils.OK { - // log.Fatalf("Received: %s", reply) } } func addAccouns() { var wg sync.WaitGroup - for i := 0; i < 23000; i++ { + for i := 0; i < *initRuns; i++ { wg.Add(1) - go func(i int, SessionBenchmarkRPC *rpc.Client) { - addBalance(SessionBenchmarkRPC, fmt.Sprintf("1001%v1002", i)) - addBalance(SessionBenchmarkRPC, fmt.Sprintf("1001%v1001", i)) + go func(i int, sBenchRPC *rpc.Client) { + addBalance(sBenchRPC, fmt.Sprintf("1001%v", i)) + addBalance(sBenchRPC, fmt.Sprintf("1002%v", i)) wg.Done() - }(i, SessionBenchmarkRPC) + }(i, sBenchRPC) } wg.Wait() } @@ -90,47 +90,37 @@ func sendInit() { Event: map[string]interface{}{ utils.EVENT_NAME: "TEST_EVENT", utils.ToR: utils.VOICE, - utils.OriginID: "123491", - utils.Account: "1001", - utils.Subject: "1001", - utils.Destination: "1002", utils.Category: "call", utils.Tenant: "cgrates.org", utils.RequestType: utils.META_PREPAID, - utils.SetupTime: time.Date(2016, time.January, 5, 18, 30, 59, 0, time.UTC), utils.AnswerTime: time.Date(2016, time.January, 5, 18, 31, 05, 0, time.UTC), - utils.Usage: "10", // 5MB }, }, } - // var wg sync.WaitGroup - for i := 0; i < 23000; i++ { - // wg.Add(1) - // go func(i int, SessionBenchmarkRPC *rpc.Client) { - initArgs.ID = utils.UUIDSha1Prefix() - initArgs.Event[utils.OriginID] = utils.UUIDSha1Prefix() - initArgs.Event[utils.Account] = fmt.Sprintf("1001%v1002", i) - initArgs.Event[utils.Subject] = initArgs.Event[utils.Account] - initArgs.Event[utils.Destination] = fmt.Sprintf("1001%v1001", i) + var wg sync.WaitGroup + for i := 0; i < *initRuns; i++ { + wg.Add(1) + go func(i int) { + initArgs.ID = utils.UUIDSha1Prefix() + initArgs.Event[utils.OriginID] = utils.UUIDSha1Prefix() + initArgs.Event[utils.Account] = fmt.Sprintf("1001%v", i) + initArgs.Event[utils.Subject] = initArgs.Event[utils.Account] + initArgs.Event[utils.Destination] = fmt.Sprintf("1002%v", i) - var initRpl *V1InitSessionReply - if err := SessionBenchmarkRPC.Call(utils.SessionSv1InitiateSession, - initArgs, &initRpl); err != nil { - log.Fatal(err) - } - // _ = getCount(SessionBenchmarkRPC) - // if c := getCount(SessionBenchmarkRPC); i+1 != c { - // log.Fatalf("Not Enough sessions %v!=%v", i+1, c) - // } - // wg.Done() - // }(i, SessionBenchmarkRPC) + var initRpl *V1InitSessionReply + if err := sBenchRPC.Call(utils.SessionSv1InitiateSession, + initArgs, &initRpl); err != nil { + log.Fatal(err) + } + wg.Done() + }(i) } - // wg.Wait() + wg.Wait() } -func getCount(SessionBenchmarkRPC *rpc.Client) int { +func getCount() int { var count int - if err := SessionBenchmarkRPC.Call(utils.SessionSv1GetActiveSessionsCount, + if err := sBenchRPC.Call(utils.SessionSv1GetActiveSessionsCount, map[string]string{}, &count); err != nil { log.Fatal(err) } @@ -138,7 +128,7 @@ func getCount(SessionBenchmarkRPC *rpc.Client) int { } func BenchmarkSendInitSession(b *testing.B) { - ConnectOnce.Do(func() { + connOnce.Do(func() { startRPC() // addAccouns() sendInit() @@ -146,9 +136,6 @@ func BenchmarkSendInitSession(b *testing.B) { }) b.ResetTimer() for i := 0; i < b.N; i++ { - _ = getCount(SessionBenchmarkRPC) - // if count < 2000 { - // b.Fatal("Not Enough sessions") - // } + _ = getCount() } }