diff --git a/.gitignore b/.gitignore
index dfd8ef924..bd8808a87 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,3 +14,4 @@ data/vagrant/.vagrant
data/vagrant/vagrant_ansible_inventory_default
data/tutorials/fs_evsock/freeswitch/etc/freeswitch/
vendor
+*.test
diff --git a/apier/v1/apier_local_test.go b/apier/v1/apier_local_test.go
index b4fee06fe..10ed9ec9d 100644
--- a/apier/v1/apier_local_test.go
+++ b/apier/v1/apier_local_test.go
@@ -1624,7 +1624,7 @@ func TestApierLocalGetScheduledActions(t *testing.T) {
}
var rply []*ScheduledActions
if err := rater.Call("ApierV1.GetScheduledActions", AttrsGetScheduledActions{}, &rply); err != nil {
- t.Error("Unexpected error: ", err.Error)
+ t.Error("Unexpected error: ", err)
}
}
diff --git a/apier/v1/scheduler.go b/apier/v1/scheduler.go
index ec02807a5..69871f61a 100644
--- a/apier/v1/scheduler.go
+++ b/apier/v1/scheduler.go
@@ -110,22 +110,11 @@ type ScheduledActions struct {
}
func (self *ApierV1) GetScheduledActions(attrs AttrsGetScheduledActions, reply *[]*ScheduledActions) error {
- schedActions := make([]*ScheduledActions, 0)
if self.Sched == nil {
return errors.New("SCHEDULER_NOT_ENABLED")
}
+ schedActions := make([]*ScheduledActions, 0) // must be initialized so an empty result marshals as [] rather than null
scheduledActions := self.Sched.GetQueue()
- var min, max int
- if attrs.Paginator.Offset != nil {
- min = *attrs.Paginator.Offset
- }
- if attrs.Paginator.Limit != nil {
- max = *attrs.Paginator.Limit
- }
- if max > len(scheduledActions) {
- max = len(scheduledActions)
- }
- scheduledActions = scheduledActions[min : min+max]
for _, qActions := range scheduledActions {
sas := &ScheduledActions{ActionsId: qActions.ActionsId, ActionPlanId: qActions.Id, ActionPlanUuid: qActions.Uuid}
if attrs.SearchTerm != "" &&
@@ -140,28 +129,40 @@ func (self *ApierV1) GetScheduledActions(attrs AttrsGetScheduledActions, reply *
if !attrs.TimeEnd.IsZero() && (sas.NextRunTime.After(attrs.TimeEnd) || sas.NextRunTime.Equal(attrs.TimeEnd)) {
continue
}
- acntFiltersMatch := false
- for _, acntKey := range qActions.AccountIds {
- tenantMatched := len(attrs.Tenant) == 0
- accountMatched := len(attrs.Account) == 0
- dta, _ := utils.NewTAFromAccountKey(acntKey)
- sas.Accounts = append(sas.Accounts, dta)
- // One member matching
- if !tenantMatched && attrs.Tenant == dta.Tenant {
- tenantMatched = true
+ // filter on account
+ if attrs.Tenant != "" || attrs.Account != "" {
+ found := false
+ for _, accID := range qActions.AccountIds {
+ split := strings.Split(accID, utils.CONCATENATED_KEY_SEP)
+ if len(split) != 2 {
+ continue // malformed account id
+ }
+ if attrs.Tenant != "" && attrs.Tenant != split[0] {
+ continue
+ }
+ if attrs.Account != "" && attrs.Account != split[1] {
+ continue
+ }
+ found = true
+ break
}
- if !accountMatched && attrs.Account == dta.Account {
- accountMatched = true
- }
- if tenantMatched && accountMatched {
- acntFiltersMatch = true
+ if !found {
+ continue
}
}
- if !acntFiltersMatch {
- continue
- }
+ // all filters matched; include this scheduled action in the results
schedActions = append(schedActions, sas)
}
+ if attrs.Paginator.Offset != nil {
+ if *attrs.Paginator.Offset <= len(schedActions) {
+ schedActions = schedActions[*attrs.Paginator.Offset:]
+ }
+ }
+ if attrs.Paginator.Limit != nil {
+ if *attrs.Paginator.Limit <= len(schedActions) {
+ schedActions = schedActions[:*attrs.Paginator.Limit]
+ }
+ }
*reply = schedActions
return nil
}
diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go
index d519c7c9c..048c09119 100644
--- a/cmd/cgr-engine/cgr-engine.go
+++ b/cmd/cgr-engine/cgr-engine.go
@@ -484,7 +484,10 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage,
internalCdrSChan <- cdrServer // Signal that cdrS is operational
}
-func startScheduler(internalSchedulerChan chan *scheduler.Scheduler, ratingDb engine.RatingStorage, exitChan chan bool) {
+func startScheduler(internalSchedulerChan chan *scheduler.Scheduler, cacheDoneChan chan struct{}, ratingDb engine.RatingStorage, exitChan chan bool) {
+ // Wait for cache to load data before starting
+ cacheDone := <- cacheDoneChan
+ cacheDoneChan <- cacheDone
utils.Logger.Info("Starting CGRateS Scheduler.")
sched := scheduler.NewScheduler()
go reloadSchedulerSingnalHandler(sched, ratingDb)
@@ -591,7 +594,7 @@ func main() {
writePid()
}
if *singlecpu {
- runtime.GOMAXPROCS(1) // Having multiple cpus slows down computing due to CPU management, to be reviewed in future Go releases
+ runtime.GOMAXPROCS(1) // Having multiple cpus may slow down computing due to CPU management, to be reviewed in future Go releases
}
if *cpuprofile != "" {
f, err := os.Create(*cpuprofile)
@@ -666,6 +669,7 @@ func main() {
// Define internal connections via channels
internalBalancerChan := make(chan *balancer2go.Balancer, 1)
internalRaterChan := make(chan *engine.Responder, 1)
+ cacheDoneChan := make(chan struct{}, 1)
internalSchedulerChan := make(chan *scheduler.Scheduler, 1)
internalCdrSChan := make(chan *engine.CdrServer, 1)
internalCdrStatSChan := make(chan engine.StatsInterface, 1)
@@ -681,13 +685,13 @@ func main() {
// Start rater service
if cfg.RaterEnabled {
- go startRater(internalRaterChan, internalBalancerChan, internalSchedulerChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan,
+ go startRater(internalRaterChan, cacheDoneChan, internalBalancerChan, internalSchedulerChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan,
server, ratingDb, accountDb, loadDb, cdrDb, logDb, &stopHandled, exitChan)
}
// Start Scheduler
if cfg.SchedulerEnabled {
- go startScheduler(internalSchedulerChan, ratingDb, exitChan)
+ go startScheduler(internalSchedulerChan, cacheDoneChan, ratingDb, exitChan)
}
// Start CDR Server
diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go
index f5a5a655f..0ee787728 100644
--- a/cmd/cgr-engine/rater.go
+++ b/cmd/cgr-engine/rater.go
@@ -39,13 +39,13 @@ func startBalancer(internalBalancerChan chan *balancer2go.Balancer, stopHandled
}
// Starts rater and reports on chan
-func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan chan *balancer2go.Balancer, internalSchedulerChan chan *scheduler.Scheduler,
+func startRater(internalRaterChan chan *engine.Responder, cacheDoneChan chan struct{}, internalBalancerChan chan *balancer2go.Balancer, internalSchedulerChan chan *scheduler.Scheduler,
internalCdrStatSChan chan engine.StatsInterface, internalHistorySChan chan history.Scribe,
internalPubSubSChan chan engine.PublisherSubscriber, internalUserSChan chan engine.UserService, internalAliaseSChan chan engine.AliasService,
server *utils.Server,
ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, logDb engine.LogStorage,
stopHandled *bool, exitChan chan bool) {
- waitTasks := make([]chan struct{}, 0)
+ var waitTasks []chan struct{}
//Cache load
cacheTaskChan := make(chan struct{})
@@ -62,7 +62,7 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
exitChan <- true
return
}
-
+ cacheDoneChan <- struct{}{}
}()
// Retrieve scheduler for it's API methods
diff --git a/console/scheduler_queue.go b/console/scheduler_queue.go
new file mode 100644
index 000000000..a409c3a68
--- /dev/null
+++ b/console/scheduler_queue.go
@@ -0,0 +1,63 @@
+/*
+Rating system designed to be used in VoIP Carriers World
+Copyright (C) 2012-2015 ITsysCOM
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package console
+
+import "github.com/cgrates/cgrates/apier/v1"
+
+func init() {
+ c := &CmdGetScheduledActions{
+ name: "scheduler_queue",
+ rpcMethod: "ApierV1.GetScheduledActions",
+ rpcParams: &v1.AttrsGetScheduledActions{},
+ }
+ commands[c.Name()] = c
+ c.CommandExecuter = &CommandExecuter{c}
+}
+
+// Commander implementation
+type CmdGetScheduledActions struct {
+ name string
+ rpcMethod string
+ rpcParams *v1.AttrsGetScheduledActions
+ *CommandExecuter
+}
+
+func (self *CmdGetScheduledActions) Name() string {
+ return self.name
+}
+
+func (self *CmdGetScheduledActions) RpcMethod() string {
+ return self.rpcMethod
+}
+
+func (self *CmdGetScheduledActions) RpcParams(reset bool) interface{} {
+ if reset || self.rpcParams == nil {
+ self.rpcParams = &v1.AttrsGetScheduledActions{}
+ }
+ return self.rpcParams
+}
+
+func (self *CmdGetScheduledActions) PostprocessRpcParams() error {
+ return nil
+}
+
+func (self *CmdGetScheduledActions) RpcResult() interface{} {
+ s := v1.ScheduledActions{}
+ return &s
+}
diff --git a/engine/storage_utils.go b/engine/storage_utils.go
index da1292c3e..934e99378 100644
--- a/engine/storage_utils.go
+++ b/engine/storage_utils.go
@@ -41,6 +41,9 @@ func ConfigureRatingStorage(db_type, host, port, name, user, pass, marshaler str
host += ":" + port
}
d, err = NewRedisStorage(host, db_nb, pass, marshaler, utils.REDIS_MAX_CONNS)
+ case utils.MONGO:
+ d, err = NewMongoStorage(host, port, name, user, pass)
+ db = d.(RatingStorage)
default:
err = errors.New("unknown db")
}
diff --git a/general_tests/tutorial_local_test.go b/general_tests/tutorial_local_test.go
index 5d4beec49..5cbf192df 100644
--- a/general_tests/tutorial_local_test.go
+++ b/general_tests/tutorial_local_test.go
@@ -707,6 +707,7 @@ func TestTutLocalLcrStatic(t *testing.T) {
},
}
var lcr engine.LCRCost
+ cd.CgrId = "1"
if err := tutLocalRpc.Call("Responder.GetLCR", cd, &lcr); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eStLcr.Entry, lcr.Entry) {
@@ -731,6 +732,7 @@ func TestTutLocalLcrStatic(t *testing.T) {
&engine.LCRSupplierCost{Supplier: "*out:cgrates.org:lcr_profile1:suppl2", Cost: 1.2, Duration: 60 * time.Second},
},
}
+ cd.CgrId = "2"
if err := tutLocalRpc.Call("Responder.GetLCR", cd, &lcr); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eStLcr.Entry, lcr.Entry) {
@@ -809,6 +811,7 @@ func TestTutLocalLcrQos(t *testing.T) {
}
var lcr engine.LCRCost
// Since there is no real quality difference, the suppliers will come in random order here
+ cd.CgrId = "3"
if err := tutLocalRpc.Call("Responder.GetLCR", cd, &lcr); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eStLcr.Entry, lcr.Entry) {
@@ -847,6 +850,7 @@ func TestTutLocalLcrQos(t *testing.T) {
QOS: map[string]float64{engine.TCD: 90, engine.ACC: 0.325, engine.TCC: 0.325, engine.ASR: 100, engine.ACD: 90}},
},
}
+ cd.CgrId = "4"
if err := tutLocalRpc.Call("Responder.GetLCR", cd, &lcr); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eStLcr.Entry, lcr.Entry) {
@@ -874,6 +878,7 @@ func TestTutLocalLcrQos(t *testing.T) {
QOS: map[string]float64{engine.TCD: 240, engine.ACC: 0.35, engine.TCC: 0.7, engine.ASR: 100, engine.ACD: 120}},
},
}
+ cd.CgrId = "5"
if err := tutLocalRpc.Call("Responder.GetLCR", cd, &lcr); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eStLcr.Entry, lcr.Entry) {
@@ -909,6 +914,7 @@ func TestTutLocalLcrQosThreshold(t *testing.T) {
},
}
var lcr engine.LCRCost
+ cd.CgrId = "6"
if err := tutLocalRpc.Call("Responder.GetLCR", cd, &lcr); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eLcr.Entry, lcr.Entry) {
@@ -934,6 +940,7 @@ func TestTutLocalLcrQosThreshold(t *testing.T) {
QOS: map[string]float64{engine.TCD: 240, engine.ACC: 0.35, engine.TCC: 0.7, engine.ASR: 100, engine.ACD: 120}},
},
}
+ cd.CgrId = "7"
if err := tutLocalRpc.Call("Responder.GetLCR", cd, &lcr); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eLcr.Entry, lcr.Entry) {
@@ -970,6 +977,7 @@ func TestTutLocalLcrQosThreshold(t *testing.T) {
},
}
*/
+ cd.CgrId = "8"
if err := tutLocalRpc.Call("Responder.GetLCR", cd, &lcr); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eLcr.Entry, lcr.Entry) {
@@ -994,6 +1002,7 @@ func TestTutLocalLcrQosThreshold(t *testing.T) {
QOS: map[string]float64{engine.TCD: 240, engine.ACC: 0.35, engine.TCC: 0.7, engine.ASR: 100, engine.ACD: 120}},
},
}
+ cd.CgrId = "9"
if err := tutLocalRpc.Call("Responder.GetLCR", cd, &lcr); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eLcr.Entry, lcr.Entry) {
@@ -1028,6 +1037,7 @@ func TestTutLocalLeastCost(t *testing.T) {
},
}
var lcr engine.LCRCost
+ cd.CgrId = "10"
if err := tutLocalRpc.Call("Responder.GetLCR", cd, &lcr); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eStLcr.Entry, lcr.Entry) {
@@ -1059,6 +1069,7 @@ func TestTutLocalLeastCost(t *testing.T) {
&engine.LCRSupplierCost{Supplier: "*out:cgrates.org:lcr_profile1:suppl1", Cost: 1.2, Duration: 60 * time.Second},
},
}
+ cd.CgrId = "11"
if err := tutLocalRpc.Call("Responder.GetLCR", cd, &lcr); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eStLcr.Entry, lcr.Entry) {
diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go
index db9728bb0..0493fb6dd 100644
--- a/scheduler/scheduler.go
+++ b/scheduler/scheduler.go
@@ -66,12 +66,12 @@ func (s *Scheduler) Loop() {
} else {
s.Unlock()
d := a0.GetNextStartTime(now).Sub(now)
- //utils.Logger.Info(fmt.Sprintf("Timer set to wait for %v", d))
+ utils.Logger.Info(fmt.Sprintf(" Time to next action (%s): %v", a0.Id, d))
s.timer = time.NewTimer(d)
select {
case <-s.timer.C:
// timer has expired
- utils.Logger.Info(fmt.Sprintf("Time for action on %v", a0))
+ utils.Logger.Info(fmt.Sprintf(" Time for action on %v", a0))
case <-s.restartLoop:
// nothing to do, just continue the loop
}
@@ -82,29 +82,32 @@ func (s *Scheduler) Loop() {
func (s *Scheduler) LoadActionPlans(storage engine.RatingStorage) {
actionPlans, err := storage.GetAllActionPlans()
if err != nil && err != utils.ErrNotFound {
- utils.Logger.Warning(fmt.Sprintf("Cannot get action plans: %v", err))
+ utils.Logger.Warning(fmt.Sprintf(" Cannot get action plans: %v", err))
}
+ utils.Logger.Info(fmt.Sprintf(" processing %d action plans", len(actionPlans)))
// recreate the queue
s.Lock()
s.queue = engine.ActionPlanPriotityList{}
for key, aps := range actionPlans {
toBeSaved := false
isAsap := false
- newApls := make([]*engine.ActionPlan, 0) // will remove the one time runs from the database
+ var newApls []*engine.ActionPlan // will remove the one time runs from the database
for _, ap := range aps {
if ap.Timing == nil {
utils.Logger.Warning(fmt.Sprintf(" Nil timing on action plan: %+v, discarding!", ap))
continue
}
+ if len(ap.AccountIds) == 0 { // no accounts just ignore
+ continue
+ }
isAsap = ap.IsASAP()
toBeSaved = toBeSaved || isAsap
if isAsap {
- if len(ap.AccountIds) > 0 {
- utils.Logger.Info(fmt.Sprintf("Time for one time action on %v", key))
- }
+ utils.Logger.Info(fmt.Sprintf(" Time for one time action on %v", key))
ap.Execute()
ap.AccountIds = make([]string, 0)
} else {
+
now := time.Now()
if ap.GetNextStartTime(now).Before(now) {
// the task is obsolete, do not add it to the queue
@@ -124,6 +127,7 @@ func (s *Scheduler) LoadActionPlans(storage engine.RatingStorage) {
}
}
sort.Sort(s.queue)
+ utils.Logger.Info(fmt.Sprintf(" queued %d action plans", len(s.queue)))
s.Unlock()
}