moved response cache ttl in general config

This commit is contained in:
Radu Ioan Fericean
2015-07-24 17:57:46 +03:00
parent c02ee71899
commit 4f11a40a57
9 changed files with 78 additions and 60 deletions

View File

@@ -4,6 +4,8 @@ import (
"errors"
"sync"
"time"
"github.com/cgrates/cgrates/utils"
)
var ErrNotFound = errors.New("NOT_FOUND")
@@ -25,10 +27,14 @@ func NewResponseCache(ttl time.Duration) *ResponseCache {
ttl: ttl,
cache: make(map[string]*CacheItem),
semaphore: make(map[string]chan bool),
mu: sync.RWMutex{},
}
}
func (rc *ResponseCache) Cache(key string, item *CacheItem) {
if rc.ttl == 0 {
return
}
rc.mu.Lock()
rc.cache[key] = item
if _, found := rc.semaphore[key]; found {
@@ -45,6 +51,9 @@ func (rc *ResponseCache) Cache(key string, item *CacheItem) {
}
func (rc *ResponseCache) Get(key string) (*CacheItem, error) {
if rc.ttl == 0 {
return nil, utils.ErrNotImplemented
}
rc.wait(key) // wait for other goroutine processsing this key
rc.mu.RLock()
defer rc.mu.RUnlock()

View File

@@ -567,7 +567,7 @@ func main() {
}()
wg.Wait()
responder := &engine.Responder{ExitChan: exitChan, Stats: cdrStats, Timeout: 10 * time.Minute}
responder := engine.NewResponder(exitChan, nil, cdrStats, 10*time.Minute, cfg.ResponseCacheTTL)
apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats}
apierRpcV2 := &v2.ApierV2{ApierV1: v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats}}

View File

@@ -188,6 +188,7 @@ type CGRConfig struct {
DefaultSubject string // set default rating subject, useful in case of fallback
Reconnects int // number of recconect attempts in case of connection lost <-1 for infinite | nb>
ConnectAttempts int // number of initial connection attempts before giving up
ResponseCacheTTL time.Duration // the life span of a cached response
RoundingDecimals int // Number of decimals to round end prices at
HttpSkipTlsVerify bool // If enabled Http Client will accept any TLS certificate
TpExportPath string // Path towards export folder for offline Tariff Plans
@@ -524,6 +525,11 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
if jsnGeneralCfg.Connect_attempts != nil {
self.ConnectAttempts = *jsnGeneralCfg.Connect_attempts
}
if jsnGeneralCfg.Response_cache_ttl != nil {
if self.ResponseCacheTTL, err = utils.ParseDurationWithSecs(*jsnGeneralCfg.Response_cache_ttl); err != nil {
return err
}
}
if jsnGeneralCfg.Reconnects != nil {
self.Reconnects = *jsnGeneralCfg.Reconnects
}

View File

@@ -37,6 +37,7 @@ const CGRATES_CFG_JSON = `
"default_tenant": "cgrates.org", // default Tenant to consider when missing from requests
"default_subject": "cgrates", // default rating Subject to consider when missing from requests
"connect_attempts": 3, // initial server connect attempts
"response_cache_ttl": "3s", // the life span of a cached response
"reconnects": -1, // number of retries in case of connection lost
},
@@ -189,10 +190,10 @@ const CGRATES_CFG_JSON = `
"sm_freeswitch": {
"enabled": false, // starts SessionManager service: <true|false>
"ha_rater": [
{"server": "internal", "timeout": "100ms", "time_to_live": "3s"}
{"server": "internal", "timeout": "100ms"}
],
"ha_cdrs": [
{"server": "internal", "timeout": "100ms", "time_to_live": "3s"}
{"server": "internal", "timeout": "100ms"}
],
"reconnects": 5, // number of reconnect attempts to rater or cdrs
"create_cdr": false, // create CDR out of events and sends them to CDRS component
@@ -215,10 +216,10 @@ const CGRATES_CFG_JSON = `
"sm_kamailio": {
"enabled": false, // starts SessionManager service: <true|false>
"ha_rater": [
{"server": "internal", "timeout": "100ms", "time_to_live": "3s"}
{"server": "internal", "timeout": "100ms"}
],
"ha_cdrs": [
{"server": "internal", "timeout": "100ms", "time_to_live": "3s"}
{"server": "internal", "timeout": "100ms"}
],
"reconnects": 5, // number of reconnect attempts to rater or cdrs
"create_cdr": false, // create CDR out of events and sends them to CDRS component
@@ -235,10 +236,10 @@ const CGRATES_CFG_JSON = `
"enabled": false, // starts SessionManager service: <true|false>
"listen_udp": "127.0.0.1:2020", // address where to listen for datagram events coming from OpenSIPS
"ha_rater": [
{"server": "internal", "timeout": "100ms", "time_to_live": "3s"}
{"server": "internal", "timeout": "100ms"}
],
"ha_cdrs": [
{"server": "internal", "timeout": "100ms", "time_to_live": "3s"}
{"server": "internal", "timeout": "100ms"}
],
"reconnects": 5, // number of reconnects if connection is lost
"create_cdr": false, // create CDR out of events and sends them to CDRS component

View File

@@ -47,7 +47,8 @@ func TestDfGeneralJsonCfg(t *testing.T) {
Default_tenant: utils.StringPointer("cgrates.org"),
Default_subject: utils.StringPointer("cgrates"),
Connect_attempts: utils.IntPointer(3),
Reconnects: utils.IntPointer(-1)}
Reconnects: utils.IntPointer(-1),
Response_cache_ttl: utils.StringPointer("3s")}
if gCfg, err := dfCgrJsonCfg.GeneralJsonCfg(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCfg, gCfg) {
@@ -315,15 +316,13 @@ func TestSmFsJsonCfg(t *testing.T) {
Enabled: utils.BoolPointer(false),
Ha_rater: &[]*HaPoolJsonCfg{
&HaPoolJsonCfg{
Server: utils.StringPointer("internal"),
Timeout: utils.StringPointer("100ms"),
Time_to_live: utils.StringPointer("3s"),
Server: utils.StringPointer("internal"),
Timeout: utils.StringPointer("100ms"),
}},
Ha_cdrs: &[]*HaPoolJsonCfg{
&HaPoolJsonCfg{
Server: utils.StringPointer("internal"),
Timeout: utils.StringPointer("100ms"),
Time_to_live: utils.StringPointer("3s"),
Server: utils.StringPointer("internal"),
Timeout: utils.StringPointer("100ms"),
}},
Reconnects: utils.IntPointer(5),
Create_cdr: utils.BoolPointer(false),
@@ -356,15 +355,13 @@ func TestSmKamJsonCfg(t *testing.T) {
Enabled: utils.BoolPointer(false),
Ha_rater: &[]*HaPoolJsonCfg{
&HaPoolJsonCfg{
Server: utils.StringPointer("internal"),
Timeout: utils.StringPointer("100ms"),
Time_to_live: utils.StringPointer("3s"),
Server: utils.StringPointer("internal"),
Timeout: utils.StringPointer("100ms"),
}},
Ha_cdrs: &[]*HaPoolJsonCfg{
&HaPoolJsonCfg{
Server: utils.StringPointer("internal"),
Timeout: utils.StringPointer("100ms"),
Time_to_live: utils.StringPointer("3s"),
Server: utils.StringPointer("internal"),
Timeout: utils.StringPointer("100ms"),
}},
Reconnects: utils.IntPointer(5),
Create_cdr: utils.BoolPointer(false),
@@ -391,15 +388,13 @@ func TestSmOsipsJsonCfg(t *testing.T) {
Listen_udp: utils.StringPointer("127.0.0.1:2020"),
Ha_rater: &[]*HaPoolJsonCfg{
&HaPoolJsonCfg{
Server: utils.StringPointer("internal"),
Timeout: utils.StringPointer("100ms"),
Time_to_live: utils.StringPointer("3s"),
Server: utils.StringPointer("internal"),
Timeout: utils.StringPointer("100ms"),
}},
Ha_cdrs: &[]*HaPoolJsonCfg{
&HaPoolJsonCfg{
Server: utils.StringPointer("internal"),
Timeout: utils.StringPointer("100ms"),
Time_to_live: utils.StringPointer("3s"),
Server: utils.StringPointer("internal"),
Timeout: utils.StringPointer("100ms"),
}},
Reconnects: utils.IntPointer(5),
Create_cdr: utils.BoolPointer(false),

View File

@@ -30,6 +30,7 @@ type GeneralJsonCfg struct {
Default_subject *string
Reconnects *int
Connect_attempts *int
Response_cache_ttl *string
}
// Listen config section
@@ -164,9 +165,8 @@ type SmFsJsonCfg struct {
// Represents one connection instance towards a rater/cdrs server
type HaPoolJsonCfg struct {
Server *string
Timeout *string
Time_to_live *string
Server *string
Timeout *string
}
// Represents one connection instance towards FreeSWITCH

View File

@@ -35,9 +35,8 @@ func NewDfltHaPoolConfig() *HaPoolConfig {
// One connection to FreeSWITCH server
type HaPoolConfig struct {
Server string
Timeout time.Duration
TimeToLive time.Duration
Server string
Timeout time.Duration
}
func (self *HaPoolConfig) loadFromJsonCfg(jsnCfg *HaPoolJsonCfg) error {
@@ -53,11 +52,6 @@ func (self *HaPoolConfig) loadFromJsonCfg(jsnCfg *HaPoolJsonCfg) error {
return err
}
}
if jsnCfg.Time_to_live != nil {
if self.TimeToLive, err = utils.ParseDurationWithSecs(*jsnCfg.Time_to_live); err != nil {
return err
}
}
return nil
}

View File

@@ -41,17 +41,29 @@ type SessionRun struct {
CallCosts []*CallCost
}
var (
timeToLive = 5 * time.Second
responseCache = cache2go.NewResponseCache(timeToLive)
)
type Responder struct {
Bal *balancer2go.Balancer
ExitChan chan bool
CdrSrv *CdrServer
Stats StatsInterface
Timeout time.Duration
Bal *balancer2go.Balancer
ExitChan chan bool
CdrSrv *CdrServer
Stats StatsInterface
Timeout time.Duration
responseCache *cache2go.ResponseCache
}
func NewResponder(exitChan chan bool, cdrSrv *CdrServer, stats StatsInterface, timeout, timeToLive time.Duration) *Responder {
return &Responder{
ExitChan: exitChan,
Stats: stats,
Timeout: timeToLive,
responseCache: cache2go.NewResponseCache(timeToLive),
}
}
func (rs *Responder) getCache() *cache2go.ResponseCache {
if rs.responseCache == nil {
rs.responseCache = cache2go.NewResponseCache(0)
}
return rs.responseCache
}
/*
@@ -96,7 +108,7 @@ func (rs *Responder) Debit(arg *CallDescriptor, reply *CallCost) (err error) {
}
func (rs *Responder) MaxDebit(arg *CallDescriptor, reply *CallCost) (err error) {
if item, err := responseCache.Get(utils.MAX_DEBIT_CACHE_PREFIX + arg.CgrId); err == nil && item != nil {
if item, err := rs.getCache().Get(utils.MAX_DEBIT_CACHE_PREFIX + arg.CgrId); err == nil && item != nil {
*reply = *(item.Value.(*CallCost))
return item.Err
}
@@ -109,7 +121,7 @@ func (rs *Responder) MaxDebit(arg *CallDescriptor, reply *CallCost) (err error)
} else {
r, e := arg.MaxDebit()
if e != nil {
responseCache.Cache(utils.MAX_DEBIT_CACHE_PREFIX+arg.CgrId, &cache2go.CacheItem{
rs.getCache().Cache(utils.MAX_DEBIT_CACHE_PREFIX+arg.CgrId, &cache2go.CacheItem{
Err: e,
})
return e
@@ -117,7 +129,7 @@ func (rs *Responder) MaxDebit(arg *CallDescriptor, reply *CallCost) (err error)
*reply = *r
}
}
responseCache.Cache(utils.MAX_DEBIT_CACHE_PREFIX+arg.CgrId, &cache2go.CacheItem{
rs.getCache().Cache(utils.MAX_DEBIT_CACHE_PREFIX+arg.CgrId, &cache2go.CacheItem{
Value: reply,
Err: err,
})
@@ -125,7 +137,7 @@ func (rs *Responder) MaxDebit(arg *CallDescriptor, reply *CallCost) (err error)
}
func (rs *Responder) RefundIncrements(arg *CallDescriptor, reply *float64) (err error) {
if item, err := responseCache.Get(utils.REFUND_INCR_CACHE_PREFIX + arg.CgrId); err == nil && item != nil {
if item, err := rs.getCache().Get(utils.REFUND_INCR_CACHE_PREFIX + arg.CgrId); err == nil && item != nil {
*reply = *(item.Value.(*float64))
return item.Err
}
@@ -140,7 +152,7 @@ func (rs *Responder) RefundIncrements(arg *CallDescriptor, reply *float64) (err
}, arg.GetAccountKey())
*reply, err = r.(float64), e
}
responseCache.Cache(utils.REFUND_INCR_CACHE_PREFIX+arg.CgrId, &cache2go.CacheItem{
rs.getCache().Cache(utils.REFUND_INCR_CACHE_PREFIX+arg.CgrId, &cache2go.CacheItem{
Value: reply,
Err: err,
})
@@ -231,7 +243,7 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) err
// Used by SM to get all the prepaid CallDescriptors attached to a session
func (rs *Responder) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error {
if item, err := responseCache.Get(utils.GET_SESS_RUNS_CACHE_PREFIX + ev.CgrId); err == nil && item != nil {
if item, err := rs.getCache().Get(utils.GET_SESS_RUNS_CACHE_PREFIX + ev.CgrId); err == nil && item != nil {
*sRuns = *(item.Value.(*[]*SessionRun))
return item.Err
}
@@ -245,7 +257,7 @@ func (rs *Responder) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error {
Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)}
var dcs utils.DerivedChargers
if err := rs.GetDerivedChargers(attrsDC, &dcs); err != nil {
responseCache.Cache(utils.GET_SESS_RUNS_CACHE_PREFIX+ev.CgrId, &cache2go.CacheItem{
rs.getCache().Cache(utils.GET_SESS_RUNS_CACHE_PREFIX+ev.CgrId, &cache2go.CacheItem{
Err: err,
})
return err
@@ -258,7 +270,7 @@ func (rs *Responder) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error {
}
startTime, err := ev.GetAnswerTime(dc.AnswerTimeField)
if err != nil {
responseCache.Cache(utils.GET_SESS_RUNS_CACHE_PREFIX+ev.CgrId, &cache2go.CacheItem{
rs.getCache().Cache(utils.GET_SESS_RUNS_CACHE_PREFIX+ev.CgrId, &cache2go.CacheItem{
Err: err,
})
return errors.New("Error parsing answer event start time")
@@ -274,7 +286,7 @@ func (rs *Responder) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error {
sesRuns = append(sesRuns, &SessionRun{DerivedCharger: dc, CallDescriptor: cd})
}
*sRuns = sesRuns
responseCache.Cache(utils.GET_SESS_RUNS_CACHE_PREFIX+ev.CgrId, &cache2go.CacheItem{
rs.getCache().Cache(utils.GET_SESS_RUNS_CACHE_PREFIX+ev.CgrId, &cache2go.CacheItem{
Value: sRuns,
})
return nil
@@ -304,25 +316,25 @@ func (rs *Responder) ProcessCdr(cdr *StoredCdr, reply *string) error {
}
func (rs *Responder) LogCallCost(ccl *CallCostLog, reply *string) error {
if item, err := responseCache.Get(utils.LOG_CALL_COST_CACHE_PREFIX + ccl.CgrId); err == nil && item != nil {
if item, err := rs.getCache().Get(utils.LOG_CALL_COST_CACHE_PREFIX + ccl.CgrId); err == nil && item != nil {
*reply = item.Value.(string)
return item.Err
}
if rs.CdrSrv == nil {
err := errors.New("CDR_SERVER_NOT_RUNNING")
responseCache.Cache(utils.LOG_CALL_COST_CACHE_PREFIX+ccl.CgrId, &cache2go.CacheItem{
rs.getCache().Cache(utils.LOG_CALL_COST_CACHE_PREFIX+ccl.CgrId, &cache2go.CacheItem{
Err: err,
})
return err
}
if err := rs.CdrSrv.LogCallCost(ccl); err != nil {
responseCache.Cache(utils.LOG_CALL_COST_CACHE_PREFIX+ccl.CgrId, &cache2go.CacheItem{
rs.getCache().Cache(utils.LOG_CALL_COST_CACHE_PREFIX+ccl.CgrId, &cache2go.CacheItem{
Err: err,
})
return err
}
*reply = utils.OK
responseCache.Cache(utils.LOG_CALL_COST_CACHE_PREFIX+ccl.CgrId, &cache2go.CacheItem{
rs.getCache().Cache(utils.LOG_CALL_COST_CACHE_PREFIX+ccl.CgrId, &cache2go.CacheItem{
Value: utils.OK,
})
return nil

View File

@@ -100,6 +100,7 @@ func (mc *MockConnector) GetSessionRuns(*engine.StoredCdr, *[]*engine.SessionRun
func (mc *MockConnector) ProcessCdr(*engine.StoredCdr, *string) error { return nil }
func (mc *MockConnector) LogCallCost(*engine.CallCostLog, *string) error { return nil }
func (mc *MockConnector) GetLCR(*engine.CallDescriptor, *engine.LCRCost) error { return nil }
func (mc *MockConnector) GetTimeout() time.Duration { return 0 }
func TestSessionRefund(t *testing.T) {
mc := &MockConnector{}