TeoV
2017-09-07 05:45:30 -04:00
11 changed files with 128 additions and 61 deletions

View File

@@ -58,6 +58,7 @@ information, please see the [`CONTRIBUTING.md`](CONTRIBUTING.md) file.
| @wasimbaig | Wasim Baig |
| @MrGab | Gabriele Proni |
| @TeoV | Teofil Voivozeanu |
| @paolovisintin | Paolo Visintin |
<!-- to sign, include a single line above this comment containing the following text:
| @username | First Last |

View File

@@ -24,7 +24,7 @@
### Documentation ###
[Step-by-step tutorials](https://cgrates.readthedocs.org/en/latest/tut_freeswitch.html)
[Debian apt-get repository](https://cgrates.readthedocs.org/en/latest/tut_freeswitch_installs.html#cgrates)
[Debian apt-get repository](https://cgrates.readthedocs.io/en/latest/installation.html#debian-jessie-wheezy)
[Installing CGRateS from sources on minimal debian](https://asciinema.org/a/0lwlputceg52xssqgra7wjza0) (for devel or testing)

View File

@@ -1242,7 +1242,7 @@ func TestApierComputeReverse(t *testing.T) {
}
func TestApierResetDataAfterLoadFromFolder(t *testing.T) {
expStats := &utils.CacheStats{Destinations: 3, Actions: 5, ActionPlans: 7, AccountActionPlans: 13, Aliases: 1} // We get partial cache info during load, maybe fix this in the future
expStats := &utils.CacheStats{Destinations: 3, Actions: 6, ActionPlans: 7, AccountActionPlans: 13, Aliases: 1} // We get partial cache info during load, maybe fix this in the future
var rcvStats *utils.CacheStats
if err := rater.Call("ApierV1.GetCacheStats", utils.AttrCacheStats{}, &rcvStats); err != nil {
t.Error("Got error on ApierV1.GetCacheStats: ", err.Error())

View File

@@ -50,23 +50,25 @@ const CGRATES_CFG_JSON = `
"cache":{
"destinations": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control destination caching
"reverse_destinations": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control reverse destinations index caching
"rating_plans": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control rating plans caching
"rating_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control rating profiles caching
"lcr_rules": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control lcr rules caching
"cdr_stats": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control cdr stats queues caching
"actions": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control actions caching
"action_plans": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control action plans caching
"account_action_plans": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control account action plans index caching
"action_triggers": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control action triggers caching
"shared_groups": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control shared groups caching
"aliases": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control aliases caching
"reverse_aliases": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control reverse aliases index caching
"derived_chargers": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control derived charging rule caching
"destinations": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // destination caching
"reverse_destinations": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // reverse destinations index caching
"rating_plans": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // rating plans caching
"rating_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // rating profiles caching
"lcr_rules": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // lcr rules caching
"cdr_stats": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // cdr stats queues caching
"actions": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // actions caching
"action_plans": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // action plans caching
"account_action_plans": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // account action plans index caching
"action_triggers": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // action triggers caching
"shared_groups": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // shared groups caching
"aliases": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // aliases caching
"reverse_aliases": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // reverse aliases index caching
"derived_chargers": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // derived charging rule caching
"resource_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control resource profiles caching
"resources": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control resources caching
"timings": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control timings caching
"timings": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // timings caching
"stats_queues": {"limit": -1, "ttl": "5m", "static_ttl": false, "precache": false}, // queues with metrics
"stats_event_queues": {"limit": -1, "ttl": "5m", "static_ttl": false, "precache": false}, // matching queues to events
},
@@ -455,5 +457,4 @@ const CGRATES_CFG_JSON = `
"sales_type_code": "^R", // template extracting sales type code out of StoredCdr; <$RSRFields>
"tax_exemption_code_list": "", // template extracting tax exemption code list out of StoredCdr; <$RSRFields>
},
}`
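The two partitions added above, stats_queues and stats_event_queues, are the only defaults with a non-empty TTL ("5m"); the rest use an empty string, meaning no expiry. Below is a minimal sketch of turning such a TTL string into a time.Duration with the standard library; parseTTL is a hypothetical helper, not the project's config code.

package main

import (
	"fmt"
	"time"
)

// parseTTL converts a cache TTL string from the JSON defaults above;
// an empty string is treated as "never expire".
func parseTTL(ttl string) (time.Duration, error) {
	if ttl == "" {
		return 0, nil
	}
	return time.ParseDuration(ttl)
}

func main() {
	d, err := parseTTL("5m")
	if err != nil {
		panic(err)
	}
	fmt.Println(d) // 5m0s
}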

View File

@@ -118,6 +118,12 @@ func TestCacheJsonCfg(t *testing.T) {
utils.CacheTimings: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
utils.CacheStatSQueues: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer("5m"), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
utils.CacheStatSEventQueues: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer("5m"), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
}
if gCfg, err := dfCgrJsonCfg.CacheJsonCfg(); err != nil {
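The expected values above are built with CGRateS's pointer helpers; presumably these are plain address-of wrappers along the following lines (shown for context only, not part of the diff).

// Assumed shapes of the helpers used in the expected config above.
func IntPointer(i int) *int          { return &i }
func StringPointer(s string) *string { return &s }
func BoolPointer(b bool) *bool       { return &b }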

View File

@@ -441,9 +441,13 @@ func TestCgrCfgJSONDefaultsCacheCFG(t *testing.T) {
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheTimings: &CacheParamConfig{Limit: -1,
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheStatSQueues: &CacheParamConfig{Limit: -1,
TTL: time.Duration(5 * time.Minute), StaticTTL: false, Precache: false},
utils.CacheStatSEventQueues: &CacheParamConfig{Limit: -1,
TTL: time.Duration(5 * time.Minute), StaticTTL: false, Precache: false},
}
if !reflect.DeepEqual(eCacheCfg, cgrCfg.CacheConfig) {
t.Errorf("received: %s, \nexpecting: %s", utils.ToIJSON(eCacheCfg), utils.ToIJSON(cgrCfg.CacheConfig))
t.Errorf("received: %s, \nexpecting: %s", utils.ToJSON(eCacheCfg), utils.ToJSON(cgrCfg.CacheConfig))
}
}
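Besides the two new expectations, this hunk switches the failure output from ToIJSON to ToJSON. Assuming the helpers wrap encoding/json in the obvious way (indented versus compact output), the difference is roughly as sketched below; the actual utils implementations may differ.

import "encoding/json"

// Compact encoding, used by the updated t.Errorf call.
func ToJSON(v interface{}) string {
	b, _ := json.Marshal(v)
	return string(b)
}

// Indented encoding, what the test printed before this change.
func ToIJSON(v interface{}) string {
	b, _ := json.MarshalIndent(v, "", " ")
	return string(b)
}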

View File

@@ -18,9 +18,11 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/cache"
"github.com/cgrates/cgrates/utils"
)
@@ -240,3 +242,52 @@ func TestRSAllocateResource(t *testing.T) {
t.Error("Duplicate ResourceUsage id should not be allowed")
}
}
// TestRSCacheSetGet assures the presence of private params in a cached resource
func TestRSCacheSetGet(t *testing.T) {
r := &Resource{
ID: "RL",
rPrf: &ResourceProfile{
ID: "RL",
Filters: []*RequestFilter{
&RequestFilter{
Type: MetaString,
FieldName: "Account",
Values: []string{"1001", "1002"},
},
&RequestFilter{
Type: MetaRSRFields,
Values: []string{"Subject(~^1.*1$)", "Destination(1002)"},
rsrFields: utils.ParseRSRFieldsMustCompile("Subject(~^1.*1$);Destination(1002)", utils.INFIELD_SEP),
},
},
ActivationInterval: &utils.ActivationInterval{
ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
ExpiryTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
},
AllocationMessage: "ALLOC_RL",
Weight: 50,
Limit: 2,
Thresholds: []string{"TEST_ACTIONS"},
UsageTTL: time.Duration(1 * time.Millisecond),
},
Usages: map[string]*ResourceUsage{
"RU2": &ResourceUsage{
ID: "RU2",
ExpiryTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
Units: 2,
},
},
tUsage: utils.Float64Pointer(2),
dirty: utils.BoolPointer(true),
}
key := utils.ResourcesPrefix + r.ID
cache.Set(key, r, true, "")
if x, ok := cache.Get(key); !ok {
t.Error("not in cache")
} else if x == nil {
t.Error("nil resource")
} else if !reflect.DeepEqual(r, x.(*Resource)) {
t.Errorf("Expecting: +v, received: %+v", r, x)
}
}
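The new test depends on the cache handing back the very pointer it was given, which is why the unexported fields rPrf, tUsage and dirty survive the round trip. A minimal in-memory sketch of that behaviour follows; the project's cache package is more involved (the extra arguments to cache.Set above are ignored here).

package cache

import "sync"

var (
	mu    sync.RWMutex
	items = map[string]interface{}{}
)

// Set stores the value as given; since nothing is serialized,
// unexported struct fields are preserved.
func Set(key string, val interface{}) {
	mu.Lock()
	items[key] = val
	mu.Unlock()
}

// Get returns the stored value and whether the key was present.
func Get(key string) (interface{}, bool) {
	mu.RLock()
	defer mu.RUnlock()
	val, ok := items[key]
	return val, ok
}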

View File

@@ -27,16 +27,16 @@ import (
"github.com/cgrates/cgrates/utils"
)
// StatsInstances is a sortable list of StatsInstance
type StatsInstances []*StatsInstance
// StatQueues is a sortable list of StatQueue
type StatQueues []*StatQueue
// Sort is part of the sort interface, sorting by Weight in descending order
func (sis StatsInstances) Sort() {
func (sis StatQueues) Sort() {
sort.Slice(sis, func(i, j int) bool { return sis[i].cfg.Weight > sis[j].cfg.Weight })
}
// remWithID removes the queue with ID from slice
func (sis StatsInstances) remWithID(qID string) {
func (sis StatQueues) remWithID(qID string) {
for i, q := range sis {
if q.cfg.ID == qID {
copy(sis[i:], sis[i+1:])
@@ -47,10 +47,10 @@ func (sis StatsInstances) remWithID(qID string) {
}
}
// NewStatsInstance instantiates a StatsInstance
func NewStatsInstance(sec *StatsEventCache, ms engine.Marshaler,
sqCfg *engine.StatsConfig, sqSM *engine.SQStoredMetrics) (si *StatsInstance, err error) {
si = &StatsInstance{sec: sec, ms: ms, cfg: sqCfg, sqMetrics: make(map[string]StatsMetric)}
// NewStatQueue instantiates a StatQueue
func NewStatQueue(sec *StatsEventCache, ms engine.Marshaler,
sqCfg *engine.StatsConfig, sqSM *engine.SQStoredMetrics) (si *StatQueue, err error) {
si = &StatQueue{sec: sec, ms: ms, cfg: sqCfg, sqMetrics: make(map[string]StatsMetric)}
for _, metricID := range sqCfg.Metrics {
if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil {
return
@@ -77,8 +77,8 @@ func NewStatsInstance(sec *StatsEventCache, ms engine.Marshaler,
return
}
// StatsInstance represents an individual stats instance
type StatsInstance struct {
// StatQueue represents an individual stats instance
type StatQueue struct {
sync.RWMutex
dirty bool // needs save
sec *StatsEventCache
@@ -89,7 +89,7 @@ type StatsInstance struct {
}
// GetStoredMetrics retrieves the data used for storing to DB
func (sq *StatsInstance) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) {
func (sq *StatQueue) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) {
sq.RLock()
defer sq.RUnlock()
sEvents := make(map[string]engine.StatsEvent)
@@ -97,7 +97,7 @@ func (sq *StatsInstance) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) {
for _, sqItem := range sq.sqItems { // make sure event is properly retrieved from cache
ev := sq.sec.GetEvent(sqItem.EventID)
if ev == nil {
utils.Logger.Warning(fmt.Sprintf("<StatsInstance> querying for storage eventID: %s, error: event not cached",
utils.Logger.Warning(fmt.Sprintf("<StatQueue> querying for storage eventID: %s, error: event not cached",
sqItem.EventID))
continue
}
@@ -111,7 +111,7 @@ func (sq *StatsInstance) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) {
for metricID, metric := range sq.sqMetrics {
var err error
if sqSM.SQMetrics[metricID], err = metric.GetMarshaled(sq.ms); err != nil {
utils.Logger.Warning(fmt.Sprintf("<StatsInstance> querying for storage metricID: %s, error: %s",
utils.Logger.Warning(fmt.Sprintf("<StatQueue> querying for storage metricID: %s, error: %s",
metricID, err.Error()))
continue
}
@@ -120,7 +120,7 @@ func (sq *StatsInstance) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) {
}
// ProcessEvent processes a StatsEvent, returning an error if processing fails
func (sq *StatsInstance) ProcessEvent(ev engine.StatsEvent) (err error) {
func (sq *StatQueue) ProcessEvent(ev engine.StatsEvent) (err error) {
sq.Lock()
sq.remExpired()
sq.remOnQueueLength()
@@ -130,7 +130,7 @@ func (sq *StatsInstance) ProcessEvent(ev engine.StatsEvent) (err error) {
}
// remExpired expires items in queue
func (sq *StatsInstance) remExpired() {
func (sq *StatQueue) remExpired() {
var expIdx *int // index of last item to be expired
for i, item := range sq.sqItems {
if item.ExpiryTime == nil {
@@ -151,7 +151,7 @@ func (sq *StatsInstance) remExpired() {
}
// remOnQueueLength removes elements based on the QueueLength setting
func (sq *StatsInstance) remOnQueueLength() {
func (sq *StatQueue) remOnQueueLength() {
if sq.cfg.QueueLength == 0 {
return
}
@@ -164,26 +164,26 @@ func (sq *StatsInstance) remOnQueueLength() {
}
// addStatsEvent computes metrics for an event
func (sq *StatsInstance) addStatsEvent(ev engine.StatsEvent) {
func (sq *StatQueue) addStatsEvent(ev engine.StatsEvent) {
evID := ev.ID()
for metricID, metric := range sq.sqMetrics {
if err := metric.AddEvent(ev); err != nil {
utils.Logger.Warning(fmt.Sprintf("<StatsInstance> metricID: %s, add eventID: %s, error: %s",
utils.Logger.Warning(fmt.Sprintf("<StatQueue> metricID: %s, add eventID: %s, error: %s",
metricID, evID, err.Error()))
}
}
}
// remEventWithID removes the event with the given ID from the metrics
func (sq *StatsInstance) remEventWithID(evID string) {
func (sq *StatQueue) remEventWithID(evID string) {
ev := sq.sec.GetEvent(evID)
if ev == nil {
utils.Logger.Warning(fmt.Sprintf("<StatsInstance> removing eventID: %s, error: event not cached", evID))
utils.Logger.Warning(fmt.Sprintf("<StatQueue> removing eventID: %s, error: event not cached", evID))
return
}
for metricID, metric := range sq.sqMetrics {
if err := metric.RemEvent(ev); err != nil {
utils.Logger.Warning(fmt.Sprintf("<StatsInstance> metricID: %s, remove eventID: %s, error: %s", metricID, evID, err.Error()))
utils.Logger.Warning(fmt.Sprintf("<StatQueue> metricID: %s, remove eventID: %s, error: %s", metricID, evID, err.Error()))
}
}
}
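Apart from the rename, note that remWithID near the top of this file drops a queue with Go's usual copy-and-truncate idiom; a generic sketch of that idiom on a plain slice is shown below (removeAt is a hypothetical helper, not project code).

// removeAt shifts the tail left by one over index i and cuts off the
// last, now duplicated, element.
func removeAt(xs []string, i int) []string {
	copy(xs[i:], xs[i+1:])
	return xs[:len(xs)-1]
}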

View File

@@ -24,19 +24,19 @@ import (
"github.com/cgrates/cgrates/engine"
)
func TestStatsInstancesSort(t *testing.T) {
sInsts := StatsInstances{
&StatsInstance{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}},
&StatsInstance{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}},
&StatsInstance{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}},
&StatsInstance{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}},
func TestStatQueuesSort(t *testing.T) {
sInsts := StatQueues{
&StatQueue{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}},
&StatQueue{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}},
&StatQueue{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}},
&StatQueue{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}},
}
sInsts.Sort()
eSInst := StatsInstances{
&StatsInstance{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}},
&StatsInstance{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}},
&StatsInstance{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}},
&StatsInstance{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}},
eSInst := StatQueues{
&StatQueue{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}},
&StatQueue{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}},
&StatQueue{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}},
&StatQueue{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}},
}
if !reflect.DeepEqual(eSInst, sInsts) {
t.Errorf("expecting: %+v, received: %+v", eSInst, sInsts)

View File

@@ -43,8 +43,8 @@ func NewStatService(dataDB engine.DataDB, ms engine.Marshaler, storeInterval tim
if err != nil {
return nil, err
}
ss.queuesCache = make(map[string]*StatsInstance)
ss.queues = make(StatsInstances, 0)
ss.queuesCache = make(map[string]*StatQueue)
ss.queues = make(StatQueues, 0)
for _, prfx := range sqPrfxs {
if q, err := ss.loadQueue(prfx[len(utils.StatsConfigPrefix):]); err != nil {
utils.Logger.Err(fmt.Sprintf("<StatS> failed loading queue with id: <%s>, err: <%s>",
@@ -66,9 +66,9 @@ type StatService struct {
ms engine.Marshaler
storeInterval time.Duration
stopStoring chan struct{}
evCache *StatsEventCache // so we can pass it to queues
queuesCache map[string]*StatsInstance // unordered db of StatsQueues, used for fast queries
queues StatsInstances // ordered list of StatsQueues
evCache *StatsEventCache // so we can pass it to queues
queuesCache map[string]*StatQueue // unordered db of StatQueues, used for fast queries
queues StatQueues // ordered list of StatQueues
}
@@ -91,7 +91,7 @@ func (ss *StatService) Shutdown() error {
// loadQueue instantiates a queue out of the config and stored metrics in dataDB, based on its ID
func (ss *StatService) loadQueue(qID string) (q *StatsInstance, err error) {
func (ss *StatService) loadQueue(qID string) (q *StatQueue, err error) {
sq, err := ss.dataDB.GetStatsConfig(qID)
if err != nil {
return nil, err
@@ -102,16 +102,16 @@ func (ss *StatService) loadQueue(qID string) (q *StatsInstance, err error) {
return nil, err
}
}
return NewStatsInstance(ss.evCache, ss.ms, sq, sqSM)
return NewStatQueue(ss.evCache, ss.ms, sq, sqSM)
}
func (ss *StatService) setQueue(q *StatsInstance) {
func (ss *StatService) setQueue(q *StatQueue) {
ss.queuesCache[q.cfg.ID] = q
ss.queues = append(ss.queues, q)
}
// remQueue will remove a queue based on its ID
func (ss *StatService) remQueue(qID string) (si *StatsInstance) {
func (ss *StatService) remQueue(qID string) (si *StatQueue) {
si = ss.queuesCache[qID]
ss.queues.remWithID(qID)
delete(ss.queuesCache, qID)
@@ -240,7 +240,7 @@ func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err err
if qIDs == nil || len(*qIDs) == 0 {
return utils.ErrNotFound
}
var sQs []*StatsInstance // cache here so we lock only later when data available
var sQs []*StatQueue // cache here so we lock only later when data available
for _, qID := range *qIDs {
if _, hasPrev := ss.queuesCache[qID]; hasPrev {
continue // don't overwrite previous, could be extended in the future by carefully checking cached events
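StatService keeps two views of the same queues: queuesCache for O(1) lookup by ID and queues for weight-ordered traversal, updated together in setQueue and remQueue above. A generic sketch of that dual-index pattern follows; queueRegistry is an illustration, not the service's code.

// queueRegistry is a hypothetical stand-in for the map/slice pair above.
type queueRegistry struct {
	byID    map[string]*StatQueue
	ordered StatQueues
}

func (r *queueRegistry) set(q *StatQueue) {
	r.byID[q.cfg.ID] = q
	r.ordered = append(r.ordered, q)
}

func (r *queueRegistry) rem(qID string) (q *StatQueue) {
	q = r.byID[qID]
	r.ordered.remWithID(qID)
	delete(r.byID, qID)
	return
}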

View File

@@ -57,6 +57,8 @@ var (
CacheResourceProfiles: ResourceProfilesPrefix,
CacheResources: ResourcesPrefix,
CacheTimings: TimingsPrefix,
CacheStatSQueues: META_NONE,
CacheStatSEventQueues: META_NONE,
}
CachePrefixToInstance map[string]string // will be built on init
)
@@ -433,6 +435,8 @@ const (
CostSource = "CostSource"
ExtraInfo = "ExtraInfo"
MetaPrefix = "*"
CacheStatSQueues = "stats_queues"
CacheStatSEventQueues = "stats_event_queues"
)
func buildCacheInstRevPrefixes() {
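// The hunk ends here; a plausible shape for this init helper (assumed, not
// the actual body) is to invert the forward map while skipping META_NONE
// entries such as the two stats caches added above. The forward map's name,
// taken here as CacheInstanceToPrefix, is cut off by the hunk:
//
//	CachePrefixToInstance = make(map[string]string)
//	for instance, prefix := range CacheInstanceToPrefix {
//		if prefix == META_NONE {
//			continue
//		}
//		CachePrefixToInstance[prefix] = instance
//	}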