diff --git a/cache2go/response_cache.go b/cache2go/response_cache.go index e4c9cef81..4ac57e3f7 100644 --- a/cache2go/response_cache.go +++ b/cache2go/response_cache.go @@ -4,6 +4,8 @@ import ( "errors" "sync" "time" + + "github.com/cgrates/cgrates/utils" ) var ErrNotFound = errors.New("NOT_FOUND") @@ -25,10 +27,14 @@ func NewResponseCache(ttl time.Duration) *ResponseCache { ttl: ttl, cache: make(map[string]*CacheItem), semaphore: make(map[string]chan bool), + mu: sync.RWMutex{}, } } func (rc *ResponseCache) Cache(key string, item *CacheItem) { + if rc.ttl == 0 { + return + } rc.mu.Lock() rc.cache[key] = item if _, found := rc.semaphore[key]; found { @@ -45,6 +51,9 @@ func (rc *ResponseCache) Cache(key string, item *CacheItem) { } func (rc *ResponseCache) Get(key string) (*CacheItem, error) { + if rc.ttl == 0 { + return nil, utils.ErrNotImplemented + } rc.wait(key) // wait for other goroutine processsing this key rc.mu.RLock() defer rc.mu.RUnlock() diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index c5c2dc815..23331770c 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -567,7 +567,7 @@ func main() { }() wg.Wait() - responder := &engine.Responder{ExitChan: exitChan, Stats: cdrStats, Timeout: 10 * time.Minute} + responder := engine.NewResponder(exitChan, nil, cdrStats, 10*time.Minute, cfg.ResponseCacheTTL) apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats} apierRpcV2 := &v2.ApierV2{ApierV1: v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats}} diff --git a/config/config.go b/config/config.go index 3c382c2c4..828799ed1 100644 --- a/config/config.go +++ b/config/config.go @@ -188,6 +188,7 @@ type CGRConfig struct { DefaultSubject string // set default rating subject, useful in case of fallback Reconnects int // number of recconect attempts in case of connection lost <-1 for infinite | nb> ConnectAttempts int // number of initial connection attempts before giving up + ResponseCacheTTL time.Duration // the life span of a cached response RoundingDecimals int // Number of decimals to round end prices at HttpSkipTlsVerify bool // If enabled Http Client will accept any TLS certificate TpExportPath string // Path towards export folder for offline Tariff Plans @@ -524,6 +525,11 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { if jsnGeneralCfg.Connect_attempts != nil { self.ConnectAttempts = *jsnGeneralCfg.Connect_attempts } + if jsnGeneralCfg.Response_cache_ttl != nil { + if self.ResponseCacheTTL, err = utils.ParseDurationWithSecs(*jsnGeneralCfg.Response_cache_ttl); err != nil { + return err + } + } if jsnGeneralCfg.Reconnects != nil { self.Reconnects = *jsnGeneralCfg.Reconnects } diff --git a/config/config_defaults.go b/config/config_defaults.go index babc6765f..ca8737f54 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -37,6 +37,7 @@ const CGRATES_CFG_JSON = ` "default_tenant": "cgrates.org", // default Tenant to consider when missing from requests "default_subject": "cgrates", // default rating Subject to consider when missing from requests "connect_attempts": 3, // initial server connect attempts + "response_cache_ttl": "3s", // the life span of a cached response "reconnects": -1, // number of retries in case of connection lost }, @@ -189,10 +190,10 @@ const CGRATES_CFG_JSON = ` "sm_freeswitch": { "enabled": false, // starts SessionManager service: "ha_rater": [ - {"server": "internal", "timeout": "100ms", "time_to_live": "3s"} + {"server": "internal", "timeout": "100ms"} ], "ha_cdrs": [ - {"server": "internal", "timeout": "100ms", "time_to_live": "3s"} + {"server": "internal", "timeout": "100ms"} ], "reconnects": 5, // number of reconnect attempts to rater or cdrs "create_cdr": false, // create CDR out of events and sends them to CDRS component @@ -215,10 +216,10 @@ const CGRATES_CFG_JSON = ` "sm_kamailio": { "enabled": false, // starts SessionManager service: "ha_rater": [ - {"server": "internal", "timeout": "100ms", "time_to_live": "3s"} + {"server": "internal", "timeout": "100ms"} ], "ha_cdrs": [ - {"server": "internal", "timeout": "100ms", "time_to_live": "3s"} + {"server": "internal", "timeout": "100ms"} ], "reconnects": 5, // number of reconnect attempts to rater or cdrs "create_cdr": false, // create CDR out of events and sends them to CDRS component @@ -235,10 +236,10 @@ const CGRATES_CFG_JSON = ` "enabled": false, // starts SessionManager service: "listen_udp": "127.0.0.1:2020", // address where to listen for datagram events coming from OpenSIPS "ha_rater": [ - {"server": "internal", "timeout": "100ms", "time_to_live": "3s"} + {"server": "internal", "timeout": "100ms"} ], "ha_cdrs": [ - {"server": "internal", "timeout": "100ms", "time_to_live": "3s"} + {"server": "internal", "timeout": "100ms"} ], "reconnects": 5, // number of reconnects if connection is lost "create_cdr": false, // create CDR out of events and sends them to CDRS component diff --git a/config/config_json_test.go b/config/config_json_test.go index 4a0658c0f..7e819e100 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -47,7 +47,8 @@ func TestDfGeneralJsonCfg(t *testing.T) { Default_tenant: utils.StringPointer("cgrates.org"), Default_subject: utils.StringPointer("cgrates"), Connect_attempts: utils.IntPointer(3), - Reconnects: utils.IntPointer(-1)} + Reconnects: utils.IntPointer(-1), + Response_cache_ttl: utils.StringPointer("3s")} if gCfg, err := dfCgrJsonCfg.GeneralJsonCfg(); err != nil { t.Error(err) } else if !reflect.DeepEqual(eCfg, gCfg) { @@ -315,15 +316,13 @@ func TestSmFsJsonCfg(t *testing.T) { Enabled: utils.BoolPointer(false), Ha_rater: &[]*HaPoolJsonCfg{ &HaPoolJsonCfg{ - Server: utils.StringPointer("internal"), - Timeout: utils.StringPointer("100ms"), - Time_to_live: utils.StringPointer("3s"), + Server: utils.StringPointer("internal"), + Timeout: utils.StringPointer("100ms"), }}, Ha_cdrs: &[]*HaPoolJsonCfg{ &HaPoolJsonCfg{ - Server: utils.StringPointer("internal"), - Timeout: utils.StringPointer("100ms"), - Time_to_live: utils.StringPointer("3s"), + Server: utils.StringPointer("internal"), + Timeout: utils.StringPointer("100ms"), }}, Reconnects: utils.IntPointer(5), Create_cdr: utils.BoolPointer(false), @@ -356,15 +355,13 @@ func TestSmKamJsonCfg(t *testing.T) { Enabled: utils.BoolPointer(false), Ha_rater: &[]*HaPoolJsonCfg{ &HaPoolJsonCfg{ - Server: utils.StringPointer("internal"), - Timeout: utils.StringPointer("100ms"), - Time_to_live: utils.StringPointer("3s"), + Server: utils.StringPointer("internal"), + Timeout: utils.StringPointer("100ms"), }}, Ha_cdrs: &[]*HaPoolJsonCfg{ &HaPoolJsonCfg{ - Server: utils.StringPointer("internal"), - Timeout: utils.StringPointer("100ms"), - Time_to_live: utils.StringPointer("3s"), + Server: utils.StringPointer("internal"), + Timeout: utils.StringPointer("100ms"), }}, Reconnects: utils.IntPointer(5), Create_cdr: utils.BoolPointer(false), @@ -391,15 +388,13 @@ func TestSmOsipsJsonCfg(t *testing.T) { Listen_udp: utils.StringPointer("127.0.0.1:2020"), Ha_rater: &[]*HaPoolJsonCfg{ &HaPoolJsonCfg{ - Server: utils.StringPointer("internal"), - Timeout: utils.StringPointer("100ms"), - Time_to_live: utils.StringPointer("3s"), + Server: utils.StringPointer("internal"), + Timeout: utils.StringPointer("100ms"), }}, Ha_cdrs: &[]*HaPoolJsonCfg{ &HaPoolJsonCfg{ - Server: utils.StringPointer("internal"), - Timeout: utils.StringPointer("100ms"), - Time_to_live: utils.StringPointer("3s"), + Server: utils.StringPointer("internal"), + Timeout: utils.StringPointer("100ms"), }}, Reconnects: utils.IntPointer(5), Create_cdr: utils.BoolPointer(false), diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 287a455c2..02d6a3523 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -30,6 +30,7 @@ type GeneralJsonCfg struct { Default_subject *string Reconnects *int Connect_attempts *int + Response_cache_ttl *string } // Listen config section @@ -164,9 +165,8 @@ type SmFsJsonCfg struct { // Represents one connection instance towards a rater/cdrs server type HaPoolJsonCfg struct { - Server *string - Timeout *string - Time_to_live *string + Server *string + Timeout *string } // Represents one connection instance towards FreeSWITCH diff --git a/config/smconfig.go b/config/smconfig.go index 6d4644988..43bfce4c0 100644 --- a/config/smconfig.go +++ b/config/smconfig.go @@ -35,9 +35,8 @@ func NewDfltHaPoolConfig() *HaPoolConfig { // One connection to FreeSWITCH server type HaPoolConfig struct { - Server string - Timeout time.Duration - TimeToLive time.Duration + Server string + Timeout time.Duration } func (self *HaPoolConfig) loadFromJsonCfg(jsnCfg *HaPoolJsonCfg) error { @@ -53,11 +52,6 @@ func (self *HaPoolConfig) loadFromJsonCfg(jsnCfg *HaPoolJsonCfg) error { return err } } - if jsnCfg.Time_to_live != nil { - if self.TimeToLive, err = utils.ParseDurationWithSecs(*jsnCfg.Time_to_live); err != nil { - return err - } - } return nil } diff --git a/engine/responder.go b/engine/responder.go index 0e861c2b6..cd7da4e29 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -41,17 +41,29 @@ type SessionRun struct { CallCosts []*CallCost } -var ( - timeToLive = 5 * time.Second - responseCache = cache2go.NewResponseCache(timeToLive) -) - type Responder struct { - Bal *balancer2go.Balancer - ExitChan chan bool - CdrSrv *CdrServer - Stats StatsInterface - Timeout time.Duration + Bal *balancer2go.Balancer + ExitChan chan bool + CdrSrv *CdrServer + Stats StatsInterface + Timeout time.Duration + responseCache *cache2go.ResponseCache +} + +func NewResponder(exitChan chan bool, cdrSrv *CdrServer, stats StatsInterface, timeout, timeToLive time.Duration) *Responder { + return &Responder{ + ExitChan: exitChan, + Stats: stats, + Timeout: timeToLive, + responseCache: cache2go.NewResponseCache(timeToLive), + } +} + +func (rs *Responder) getCache() *cache2go.ResponseCache { + if rs.responseCache == nil { + rs.responseCache = cache2go.NewResponseCache(0) + } + return rs.responseCache } /* @@ -96,7 +108,7 @@ func (rs *Responder) Debit(arg *CallDescriptor, reply *CallCost) (err error) { } func (rs *Responder) MaxDebit(arg *CallDescriptor, reply *CallCost) (err error) { - if item, err := responseCache.Get(utils.MAX_DEBIT_CACHE_PREFIX + arg.CgrId); err == nil && item != nil { + if item, err := rs.getCache().Get(utils.MAX_DEBIT_CACHE_PREFIX + arg.CgrId); err == nil && item != nil { *reply = *(item.Value.(*CallCost)) return item.Err } @@ -109,7 +121,7 @@ func (rs *Responder) MaxDebit(arg *CallDescriptor, reply *CallCost) (err error) } else { r, e := arg.MaxDebit() if e != nil { - responseCache.Cache(utils.MAX_DEBIT_CACHE_PREFIX+arg.CgrId, &cache2go.CacheItem{ + rs.getCache().Cache(utils.MAX_DEBIT_CACHE_PREFIX+arg.CgrId, &cache2go.CacheItem{ Err: e, }) return e @@ -117,7 +129,7 @@ func (rs *Responder) MaxDebit(arg *CallDescriptor, reply *CallCost) (err error) *reply = *r } } - responseCache.Cache(utils.MAX_DEBIT_CACHE_PREFIX+arg.CgrId, &cache2go.CacheItem{ + rs.getCache().Cache(utils.MAX_DEBIT_CACHE_PREFIX+arg.CgrId, &cache2go.CacheItem{ Value: reply, Err: err, }) @@ -125,7 +137,7 @@ func (rs *Responder) MaxDebit(arg *CallDescriptor, reply *CallCost) (err error) } func (rs *Responder) RefundIncrements(arg *CallDescriptor, reply *float64) (err error) { - if item, err := responseCache.Get(utils.REFUND_INCR_CACHE_PREFIX + arg.CgrId); err == nil && item != nil { + if item, err := rs.getCache().Get(utils.REFUND_INCR_CACHE_PREFIX + arg.CgrId); err == nil && item != nil { *reply = *(item.Value.(*float64)) return item.Err } @@ -140,7 +152,7 @@ func (rs *Responder) RefundIncrements(arg *CallDescriptor, reply *float64) (err }, arg.GetAccountKey()) *reply, err = r.(float64), e } - responseCache.Cache(utils.REFUND_INCR_CACHE_PREFIX+arg.CgrId, &cache2go.CacheItem{ + rs.getCache().Cache(utils.REFUND_INCR_CACHE_PREFIX+arg.CgrId, &cache2go.CacheItem{ Value: reply, Err: err, }) @@ -231,7 +243,7 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) err // Used by SM to get all the prepaid CallDescriptors attached to a session func (rs *Responder) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error { - if item, err := responseCache.Get(utils.GET_SESS_RUNS_CACHE_PREFIX + ev.CgrId); err == nil && item != nil { + if item, err := rs.getCache().Get(utils.GET_SESS_RUNS_CACHE_PREFIX + ev.CgrId); err == nil && item != nil { *sRuns = *(item.Value.(*[]*SessionRun)) return item.Err } @@ -245,7 +257,7 @@ func (rs *Responder) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error { Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)} var dcs utils.DerivedChargers if err := rs.GetDerivedChargers(attrsDC, &dcs); err != nil { - responseCache.Cache(utils.GET_SESS_RUNS_CACHE_PREFIX+ev.CgrId, &cache2go.CacheItem{ + rs.getCache().Cache(utils.GET_SESS_RUNS_CACHE_PREFIX+ev.CgrId, &cache2go.CacheItem{ Err: err, }) return err @@ -258,7 +270,7 @@ func (rs *Responder) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error { } startTime, err := ev.GetAnswerTime(dc.AnswerTimeField) if err != nil { - responseCache.Cache(utils.GET_SESS_RUNS_CACHE_PREFIX+ev.CgrId, &cache2go.CacheItem{ + rs.getCache().Cache(utils.GET_SESS_RUNS_CACHE_PREFIX+ev.CgrId, &cache2go.CacheItem{ Err: err, }) return errors.New("Error parsing answer event start time") @@ -274,7 +286,7 @@ func (rs *Responder) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error { sesRuns = append(sesRuns, &SessionRun{DerivedCharger: dc, CallDescriptor: cd}) } *sRuns = sesRuns - responseCache.Cache(utils.GET_SESS_RUNS_CACHE_PREFIX+ev.CgrId, &cache2go.CacheItem{ + rs.getCache().Cache(utils.GET_SESS_RUNS_CACHE_PREFIX+ev.CgrId, &cache2go.CacheItem{ Value: sRuns, }) return nil @@ -304,25 +316,25 @@ func (rs *Responder) ProcessCdr(cdr *StoredCdr, reply *string) error { } func (rs *Responder) LogCallCost(ccl *CallCostLog, reply *string) error { - if item, err := responseCache.Get(utils.LOG_CALL_COST_CACHE_PREFIX + ccl.CgrId); err == nil && item != nil { + if item, err := rs.getCache().Get(utils.LOG_CALL_COST_CACHE_PREFIX + ccl.CgrId); err == nil && item != nil { *reply = item.Value.(string) return item.Err } if rs.CdrSrv == nil { err := errors.New("CDR_SERVER_NOT_RUNNING") - responseCache.Cache(utils.LOG_CALL_COST_CACHE_PREFIX+ccl.CgrId, &cache2go.CacheItem{ + rs.getCache().Cache(utils.LOG_CALL_COST_CACHE_PREFIX+ccl.CgrId, &cache2go.CacheItem{ Err: err, }) return err } if err := rs.CdrSrv.LogCallCost(ccl); err != nil { - responseCache.Cache(utils.LOG_CALL_COST_CACHE_PREFIX+ccl.CgrId, &cache2go.CacheItem{ + rs.getCache().Cache(utils.LOG_CALL_COST_CACHE_PREFIX+ccl.CgrId, &cache2go.CacheItem{ Err: err, }) return err } *reply = utils.OK - responseCache.Cache(utils.LOG_CALL_COST_CACHE_PREFIX+ccl.CgrId, &cache2go.CacheItem{ + rs.getCache().Cache(utils.LOG_CALL_COST_CACHE_PREFIX+ccl.CgrId, &cache2go.CacheItem{ Value: utils.OK, }) return nil diff --git a/sessionmanager/session_test.go b/sessionmanager/session_test.go index d982f9a13..e088fada1 100644 --- a/sessionmanager/session_test.go +++ b/sessionmanager/session_test.go @@ -100,6 +100,7 @@ func (mc *MockConnector) GetSessionRuns(*engine.StoredCdr, *[]*engine.SessionRun func (mc *MockConnector) ProcessCdr(*engine.StoredCdr, *string) error { return nil } func (mc *MockConnector) LogCallCost(*engine.CallCostLog, *string) error { return nil } func (mc *MockConnector) GetLCR(*engine.CallDescriptor, *engine.LCRCost) error { return nil } +func (mc *MockConnector) GetTimeout() time.Duration { return 0 } func TestSessionRefund(t *testing.T) { mc := &MockConnector{}