mirror of https://github.com/cgrates/cgrates.git
Merge branch 'master' into hapool
.gitignore (vendored)
@@ -14,3 +14,4 @@ data/vagrant/.vagrant
 data/vagrant/vagrant_ansible_inventory_default
 data/tutorials/fs_evsock/freeswitch/etc/freeswitch/
 vendor
+*.test
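The added `*.test` pattern keeps compiled test binaries out of the tree: `go test -c` (and profiling runs of `go test`) leave a `<package>.test` executable in the package directory.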
@@ -1624,7 +1624,7 @@ func TestApierLocalGetScheduledActions(t *testing.T) {
 	}
 	var rply []*ScheduledActions
 	if err := rater.Call("ApierV1.GetScheduledActions", AttrsGetScheduledActions{}, &rply); err != nil {
-		t.Error("Unexpected error: ", err.Error)
+		t.Error("Unexpected error: ", err)
 	}
 }

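The one-line test fix is easy to miss: `err.Error` without parentheses passes the method value rather than the message, so a failing test prints a function address instead of the error text. A minimal standalone sketch of the difference (not from the commit):

package main

import (
	"errors"
	"fmt"
)

func main() {
	err := errors.New("SCHEDULER_NOT_ENABLED")
	fmt.Println("Unexpected error: ", err.Error)   // method value: prints an address like 0x47e920
	fmt.Println("Unexpected error: ", err.Error()) // the message string
	fmt.Println("Unexpected error: ", err)         // fmt calls Error() itself, same output
}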
@@ -110,22 +110,11 @@ type ScheduledActions struct {
 }

 func (self *ApierV1) GetScheduledActions(attrs AttrsGetScheduledActions, reply *[]*ScheduledActions) error {
-	schedActions := make([]*ScheduledActions, 0)
 	if self.Sched == nil {
 		return errors.New("SCHEDULER_NOT_ENABLED")
 	}
+	schedActions := make([]*ScheduledActions, 0) // needs to be initialized if remains empty
 	scheduledActions := self.Sched.GetQueue()
-	var min, max int
-	if attrs.Paginator.Offset != nil {
-		min = *attrs.Paginator.Offset
-	}
-	if attrs.Paginator.Limit != nil {
-		max = *attrs.Paginator.Limit
-	}
-	if max > len(scheduledActions) {
-		max = len(scheduledActions)
-	}
-	scheduledActions = scheduledActions[min : min+max]
 	for _, qActions := range scheduledActions {
 		sas := &ScheduledActions{ActionsId: qActions.ActionsId, ActionPlanId: qActions.Id, ActionPlanUuid: qActions.Uuid}
 		if attrs.SearchTerm != "" &&
@@ -140,28 +129,40 @@ func (self *ApierV1) GetScheduledActions(attrs AttrsGetScheduledActions, reply *
 		if !attrs.TimeEnd.IsZero() && (sas.NextRunTime.After(attrs.TimeEnd) || sas.NextRunTime.Equal(attrs.TimeEnd)) {
 			continue
 		}
-		acntFiltersMatch := false
-		for _, acntKey := range qActions.AccountIds {
-			tenantMatched := len(attrs.Tenant) == 0
-			accountMatched := len(attrs.Account) == 0
-			dta, _ := utils.NewTAFromAccountKey(acntKey)
-			sas.Accounts = append(sas.Accounts, dta)
-			// One member matching
-			if !tenantMatched && attrs.Tenant == dta.Tenant {
-				tenantMatched = true
-			}
-			if !accountMatched && attrs.Account == dta.Account {
-				accountMatched = true
-			}
-			if tenantMatched && accountMatched {
-				acntFiltersMatch = true
-			}
-		}
-		if !acntFiltersMatch {
-			continue
-		}
+		// filter on account
+		if attrs.Tenant != "" || attrs.Account != "" {
+			found := false
+			for _, accID := range qActions.AccountIds {
+				split := strings.Split(accID, utils.CONCATENATED_KEY_SEP)
+				if len(split) != 2 {
+					continue // malformed account id
+				}
+				if attrs.Tenant != "" && attrs.Tenant != split[0] {
+					continue
+				}
+				if attrs.Account != "" && attrs.Account != split[1] {
+					continue
+				}
+				found = true
+				break
+			}
+			if !found {
+				continue
+			}
+		}
 		// we have a winner
 		schedActions = append(schedActions, sas)
 	}
+	if attrs.Paginator.Offset != nil {
+		if *attrs.Paginator.Offset <= len(schedActions) {
+			schedActions = schedActions[*attrs.Paginator.Offset:]
+		}
+	}
+	if attrs.Paginator.Limit != nil {
+		if *attrs.Paginator.Limit <= len(schedActions) {
+			schedActions = schedActions[:*attrs.Paginator.Limit]
+		}
+	}
 	*reply = schedActions
 	return nil
 }
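The substantive change in GetScheduledActions moves pagination from before the filter loop to after it. The old code sliced the raw queue first (`scheduledActions[min : min+max]`), which paged over entries the filters would later discard and could slice past the end of the queue when offset plus limit exceeded its length; the new code collects all matches and only then applies offset and limit, with bounds checks. A standalone sketch of that filter-then-page pattern (names are illustrative, not the CGRateS API):

package main

import "fmt"

// paginate applies the order used by the new code: offset and limit are
// honoured only when they fit within the already-filtered result.
func paginate(filtered []string, offset, limit *int) []string {
	if offset != nil && *offset <= len(filtered) {
		filtered = filtered[*offset:]
	}
	if limit != nil && *limit <= len(filtered) {
		filtered = filtered[:*limit]
	}
	return filtered
}

func main() {
	matches := []string{"AP1", "AP2", "AP3", "AP4"}
	off, lim := 1, 2
	fmt.Println(paginate(matches, &off, &lim)) // [AP2 AP3]
}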
@@ -488,7 +488,10 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage,
 	internalCdrSChan <- cdrServer // Signal that cdrS is operational
 }

-func startScheduler(internalSchedulerChan chan *scheduler.Scheduler, ratingDb engine.RatingStorage, exitChan chan bool) {
+func startScheduler(internalSchedulerChan chan *scheduler.Scheduler, cacheDoneChan chan struct{}, ratingDb engine.RatingStorage, exitChan chan bool) {
+	// Wait for cache to load data before starting
+	cacheDone := <-cacheDoneChan
+	cacheDoneChan <- cacheDone
 	utils.Logger.Info("Starting CGRateS Scheduler.")
 	sched := scheduler.NewScheduler()
 	go reloadSchedulerSingnalHandler(sched, ratingDb)
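startScheduler now blocks on `cacheDoneChan` before doing anything, then immediately re-sends the value it received. Because the channel is buffered with capacity one (see the `make(chan struct{}, 1)` hunk below) and the rater sends exactly one token after the cache loads, the receive-and-put-back acts as a latch: each waiter takes the token, learns the cache is ready, and returns it for the next waiter. A self-contained sketch of the idea, with assumed names:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	cacheDoneChan := make(chan struct{}, 1) // capacity 1: holds the "done" token

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			token := <-cacheDoneChan // block until the cache is loaded
			cacheDoneChan <- token   // put the token back for the next waiter
			fmt.Printf("waiter %d unblocked\n", id)
		}(i)
	}

	time.Sleep(100 * time.Millisecond) // stand-in for the cache load
	cacheDoneChan <- struct{}{}        // publish completion exactly once
	wg.Wait()
}

Closing the channel would broadcast the same fact without the put-back step; the token style keeps the channel usable for value-carrying signals.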
@@ -595,7 +598,7 @@ func main() {
 		writePid()
 	}
 	if *singlecpu {
-		runtime.GOMAXPROCS(1) // Having multiple cpus slows down computing due to CPU management, to be reviewed in future Go releases
+		runtime.GOMAXPROCS(1) // Having multiple cpus may slow down computing due to CPU management, to be reviewed in future Go releases
 	}
 	if *cpuprofile != "" {
 		f, err := os.Create(*cpuprofile)
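On the softened comment: Go releases before 1.5 default GOMAXPROCS to 1, while 1.5 and later default it to the number of CPUs, so `-singlecpu` pins the runtime back to one processor. Whether extra processors help or hurt depends on the workload's synchronization and cache behaviour, hence the more cautious "may slow down".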
@@ -670,6 +673,7 @@ func main() {
 	// Define internal connections via channels
 	internalBalancerChan := make(chan *balancer2go.Balancer, 1)
 	internalRaterChan := make(chan *engine.Responder, 1)
+	cacheDoneChan := make(chan struct{}, 1)
 	internalSchedulerChan := make(chan *scheduler.Scheduler, 1)
 	internalCdrSChan := make(chan *engine.CdrServer, 1)
 	internalCdrStatSChan := make(chan rpcclient.RpcClientConnection, 1)
@@ -685,13 +689,13 @@ func main() {

 	// Start rater service
 	if cfg.RaterEnabled {
-		go startRater(internalRaterChan, internalBalancerChan, internalSchedulerChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan,
+		go startRater(internalRaterChan, cacheDoneChan, internalBalancerChan, internalSchedulerChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan,
 			server, ratingDb, accountDb, loadDb, cdrDb, logDb, &stopHandled, exitChan)
 	}

 	// Start Scheduler
 	if cfg.SchedulerEnabled {
-		go startScheduler(internalSchedulerChan, ratingDb, exitChan)
+		go startScheduler(internalSchedulerChan, cacheDoneChan, ratingDb, exitChan)
 	}

 	// Start CDR Server
@@ -39,13 +39,14 @@ func startBalancer(internalBalancerChan chan *balancer2go.Balancer, stopHandled
 }

 // Starts rater and reports on chan
-func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan chan *balancer2go.Balancer, internalSchedulerChan chan *scheduler.Scheduler,
+func startRater(internalRaterChan chan *engine.Responder, cacheDoneChan chan struct{}, internalBalancerChan chan *balancer2go.Balancer, internalSchedulerChan chan *scheduler.Scheduler,
 	internalCdrStatSChan chan rpcclient.RpcClientConnection, internalHistorySChan chan rpcclient.RpcClientConnection,
 	internalPubSubSChan chan rpcclient.RpcClientConnection, internalUserSChan chan rpcclient.RpcClientConnection, internalAliaseSChan chan rpcclient.RpcClientConnection,
 	server *utils.Server,
 	ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, logDb engine.LogStorage,
 	stopHandled *bool, exitChan chan bool) {
-	waitTasks := make([]chan struct{}, 0)
+	var waitTasks []chan struct{}

 	//Cache load
 	cacheTaskChan := make(chan struct{})
@@ -62,7 +63,7 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
 			exitChan <- true
 			return
 		}
-
+		cacheDoneChan <- struct{}{}
 	}()

 	// Retrieve scheduler for it's API methods
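Two declarations in this commit switch from `make([]T, 0)` to a plain `var` declaration (waitTasks here, newApls in the scheduler further down), while GetScheduledActions deliberately keeps the `make` form with an explanatory comment. The distinction only matters at serialization boundaries: a nil slice appends and ranges exactly like an empty one, but encodes as JSON null rather than []. A quick sketch:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	var nilSlice []int      // nil slice: len 0, no allocation
	empty := make([]int, 0) // non-nil, also len 0

	nilSlice = append(nilSlice, 1) // append works fine on nil slices
	fmt.Println(nilSlice, len(empty))

	a, _ := json.Marshal([]int(nil)) // "null"
	b, _ := json.Marshal(empty)      // "[]"
	fmt.Println(string(a), string(b))
}

That is presumably why the API reply slice stays `make([]*ScheduledActions, 0)`: an empty result should reach the client as [] rather than null.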
@@ -36,7 +36,7 @@ import (
 var (
 	//separator = flag.String("separator", ",", "Default field separator")
 	cgrConfig, _ = config.NewDefaultCGRConfig()
-	migrateRC8   = flag.String("migrate_rc8", "", "Migrate Accounts, Actions, ActionTriggers and DerivedChargers to RC8 structures, possible values: *all,acc,atr,act,dcs")
+	migrateRC8   = flag.String("migrate_rc8", "", "Migrate Accounts, Actions, ActionTriggers and DerivedChargers to RC8 structures, possible values: *all,acc,atr,act,dcs,apl")
 	tpdb_type    = flag.String("tpdb_type", cgrConfig.TpDbType, "The type of the TariffPlan database <redis>")
 	tpdb_host    = flag.String("tpdb_host", cgrConfig.TpDbHost, "The TariffPlan host to connect to.")
 	tpdb_port    = flag.String("tpdb_port", cgrConfig.TpDbPort, "The TariffPlan port to bind to.")
@@ -142,6 +142,11 @@ func main() {
 			log.Print(err.Error())
 		}
 	}
+	if strings.Contains(*migrateRC8, "apl") || strings.Contains(*migrateRC8, "*all") {
+		if err := migratorRC8rat.migrateActionPlans(); err != nil {
+			log.Print(err.Error())
+		}
+	}
 	log.Print("Done!")
 	return
 }
@@ -425,3 +425,45 @@ func (mig MigratorRC8) migrateDerivedChargers() error {
 	}
 	return nil
 }
+
+func (mig MigratorRC8) migrateActionPlans() error {
+	keys, err := mig.db.Cmd("KEYS", utils.ACTION_PLAN_PREFIX+"*").List()
+	if err != nil {
+		return err
+	}
+	aplsMap := make(map[string]engine.ActionPlans, len(keys))
+	for _, key := range keys {
+		log.Printf("Migrating action plans: %s...", key)
+		var apls engine.ActionPlans
+		var values []byte
+		if values, err = mig.db.Cmd("GET", key).Bytes(); err == nil {
+			if err := mig.ms.Unmarshal(values, &apls); err != nil {
+				return err
+			}
+		}
+		// change all AccountIds
+		for _, apl := range apls {
+			for idx, actionId := range apl.AccountIds {
+				// fix id
+				idElements := strings.Split(actionId, utils.CONCATENATED_KEY_SEP)
+				if len(idElements) != 3 {
+					log.Printf("Malformed account ID %s", actionId)
+					continue
+				}
+				apl.AccountIds[idx] = fmt.Sprintf("%s:%s", idElements[1], idElements[2])
+			}
+		}
+		aplsMap[key] = apls
+	}
+	// write data back
+	for key, apl := range aplsMap {
+		result, err := mig.ms.Marshal(apl)
+		if err != nil {
+			return err
+		}
+		if err = mig.db.Cmd("SET", key, result).Err; err != nil {
+			return err
+		}
+	}
+	return nil
+}
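migrateActionPlans rewrites each entry of `AccountIds` from a three-part key to its last two parts, joined by the same separator. Assuming the pre-RC8 keys carried a leading direction element (something like `*out:cgrates.org:1001`), the transformation looks like this sketch with stand-in names, not the migrator's actual helpers:

package main

import (
	"fmt"
	"strings"
)

const sep = ":" // stands in for utils.CONCATENATED_KEY_SEP

// migrateAccountID drops the first of three key elements, mirroring
// fmt.Sprintf("%s:%s", idElements[1], idElements[2]) in the migrator.
func migrateAccountID(old string) (string, bool) {
	parts := strings.Split(old, sep)
	if len(parts) != 3 {
		return old, false // malformed: logged and left untouched by the migrator
	}
	return parts[1] + sep + parts[2], true
}

func main() {
	for _, id := range []string{"*out:cgrates.org:1001", "cgrates.org:1001"} {
		newID, ok := migrateAccountID(id)
		fmt.Printf("%-22s -> %-18s migrated=%v\n", id, newID, ok)
	}
}

Note the migrator scans with Redis KEYS and rewrites each key with SET; acceptable for a one-off offline migration, though KEYS blocks Redis on large datasets.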
console/scheduler_queue.go (new file, 63 lines)
@@ -0,0 +1,63 @@
+/*
+Rating system designed to be used in VoIP Carriers World
+Copyright (C) 2012-2015 ITsysCOM
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>
+*/
+
+package console
+
+import "github.com/cgrates/cgrates/apier/v1"
+
+func init() {
+	c := &CmdGetScheduledActions{
+		name:      "scheduler_queue",
+		rpcMethod: "ApierV1.GetScheduledActions",
+		rpcParams: &v1.AttrsGetScheduledActions{},
+	}
+	commands[c.Name()] = c
+	c.CommandExecuter = &CommandExecuter{c}
+}
+
+// Commander implementation
+type CmdGetScheduledActions struct {
+	name      string
+	rpcMethod string
+	rpcParams *v1.AttrsGetScheduledActions
+	*CommandExecuter
+}
+
+func (self *CmdGetScheduledActions) Name() string {
+	return self.name
+}
+
+func (self *CmdGetScheduledActions) RpcMethod() string {
+	return self.rpcMethod
+}
+
+func (self *CmdGetScheduledActions) RpcParams(reset bool) interface{} {
+	if reset || self.rpcParams == nil {
+		self.rpcParams = &v1.AttrsGetScheduledActions{}
+	}
+	return self.rpcParams
+}
+
+func (self *CmdGetScheduledActions) PostprocessRpcParams() error {
+	return nil
+}
+
+func (self *CmdGetScheduledActions) RpcResult() interface{} {
+	s := v1.ScheduledActions{}
+	return &s
+}
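The new console command registers itself in init() under the name scheduler_queue, so once this package is linked into the console binary the queue can presumably be inspected with something like `cgr-console scheduler_queue`, narrowed via the Tenant, Account, SearchTerm or Paginator fields of AttrsGetScheduledActions. One thing worth flagging: RpcResult returns a single v1.ScheduledActions, while the API reply type is []*ScheduledActions; if the RPC layer decodes strictly, a non-empty reply may fail to unmarshal into the struct.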
@@ -41,6 +41,9 @@ func ConfigureRatingStorage(db_type, host, port, name, user, pass, marshaler str
 			host += ":" + port
 		}
 		d, err = NewRedisStorage(host, db_nb, pass, marshaler, utils.REDIS_MAX_CONNS)
+	case utils.MONGO:
+		d, err = NewMongoStorage(host, port, name, user, pass)
+		db = d.(RatingStorage)
 	default:
 		err = errors.New("unknown db")
 	}
@@ -66,12 +66,12 @@ func (s *Scheduler) Loop() {
 		} else {
 			s.Unlock()
 			d := a0.GetNextStartTime(now).Sub(now)
-			//utils.Logger.Info(fmt.Sprintf("Timer set to wait for %v", d))
+			utils.Logger.Info(fmt.Sprintf("<Scheduler> Time to next action (%s): %v", a0.Id, d))
 			s.timer = time.NewTimer(d)
 			select {
 			case <-s.timer.C:
 				// timer has expired
-				utils.Logger.Info(fmt.Sprintf("Time for action on %v", a0))
+				utils.Logger.Info(fmt.Sprintf("<Scheduler> Time for action on %v", a0))
 			case <-s.restartLoop:
 				// nothing to do, just continue the loop
 			}
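The retagged log lines sit inside the scheduler's core wait: a timer armed for the next action's start time, raced in a select against restartLoop so a queue reload can interrupt the sleep and force the delay to be recomputed. A minimal runnable sketch of that select, with assumed channel names:

package main

import (
	"fmt"
	"time"
)

func main() {
	restartLoop := make(chan struct{})
	next := []time.Duration{300 * time.Millisecond, 200 * time.Millisecond}

	go func() { // simulate an action-plan reload halfway through the first wait
		time.Sleep(100 * time.Millisecond)
		restartLoop <- struct{}{}
	}()

	for _, d := range next {
		timer := time.NewTimer(d)
		select {
		case <-timer.C:
			fmt.Println("time for action after", d)
		case <-restartLoop:
			timer.Stop()
			fmt.Println("queue changed, recomputing next run")
		}
	}
}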
@@ -82,29 +82,32 @@
 func (s *Scheduler) LoadActionPlans(storage engine.RatingStorage) {
 	actionPlans, err := storage.GetAllActionPlans()
 	if err != nil && err != utils.ErrNotFound {
-		utils.Logger.Warning(fmt.Sprintf("Cannot get action plans: %v", err))
+		utils.Logger.Warning(fmt.Sprintf("<Scheduler> Cannot get action plans: %v", err))
 	}
+	utils.Logger.Info(fmt.Sprintf("<Scheduler> processing %d action plans", len(actionPlans)))
 	// recreate the queue
 	s.Lock()
 	s.queue = engine.ActionPlanPriotityList{}
 	for key, aps := range actionPlans {
 		toBeSaved := false
 		isAsap := false
-		newApls := make([]*engine.ActionPlan, 0) // will remove the one time runs from the database
+		var newApls []*engine.ActionPlan // will remove the one time runs from the database
 		for _, ap := range aps {
+			if ap.Timing == nil {
+				utils.Logger.Warning(fmt.Sprintf("<Scheduler> Nil timing on action plan: %+v, discarding!", ap))
+				continue
+			}
 			if len(ap.AccountIds) == 0 { // no accounts just ignore
 				continue
 			}
 			isAsap = ap.IsASAP()
 			toBeSaved = toBeSaved || isAsap
 			if isAsap {
-				if len(ap.AccountIds) > 0 {
-					utils.Logger.Info(fmt.Sprintf("Time for one time action on %v", key))
-				}
+				utils.Logger.Info(fmt.Sprintf("<Scheduler> Time for one time action on %v", key))
 				ap.Execute()
 				ap.AccountIds = make([]string, 0)
 			} else {

 				now := time.Now()
 				if ap.GetNextStartTime(now).Before(now) {
 					// the task is obsolete, do not add it to the queue
@@ -124,6 +127,7 @@ func (s *Scheduler) LoadActionPlans(storage engine.RatingStorage) {
 		}
 	}
 	sort.Sort(s.queue)
+	utils.Logger.Info(fmt.Sprintf("<Scheduler> queued %d action plans", len(s.queue)))
 	s.Unlock()
 }