CDRs with RPC response caching, removing old RespCache methods and response_cache_ttl config option

This commit is contained in:
DanB
2019-03-04 20:49:33 +01:00
parent 26ea90c351
commit 4835ba2238
13 changed files with 101 additions and 197 deletions

View File

@@ -104,7 +104,6 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en
responder := &engine.Responder{
ExitChan: exitChan,
MaxComputedUsage: cfg.RalsCfg().RALsMaxComputedUsage}
responder.SetTimeToLive(cfg.GeneralCfg().ResponseCacheTTL, nil)
apierRpcV1 := &v1.ApierV1{
StorDb: loadDb,
DataManager: dm,

View File

@@ -45,7 +45,6 @@ const CGRATES_CFG_JSON = `
"reconnects": -1, // number of retries in case of connection lost
"connect_timeout": "1s", // consider connection unsuccessful on timeout, 0 to disable the feature
"reply_timeout": "2s", // consider connection down for replies taking longer than this value
"response_cache_ttl": "0s", // the life span of a cached response
"internal_ttl": "2m", // maximum duration to wait for internal connections before giving up
"locking_timeout": "0", // timeout internal locks to avoid deadlocks
"digest_separator": ",",

View File

@@ -55,7 +55,6 @@ func TestDfGeneralJsonCfg(t *testing.T) {
Reconnects: utils.IntPointer(-1),
Connect_timeout: utils.StringPointer("1s"),
Reply_timeout: utils.StringPointer("2s"),
Response_cache_ttl: utils.StringPointer("0s"),
Internal_ttl: utils.StringPointer("2m"),
Locking_timeout: utils.StringPointer("0"),
Digest_separator: utils.StringPointer(","),

View File

@@ -361,9 +361,6 @@ func TestCgrCfgJSONDefaultsGeneral(t *testing.T) {
if cgrCfg.GeneralCfg().ReplyTimeout != 2*time.Second {
t.Errorf("Expected: 2s, received: %+v", cgrCfg.GeneralCfg().ReplyTimeout)
}
if cgrCfg.GeneralCfg().ResponseCacheTTL != 0*time.Second {
t.Errorf("Expected: 0s, received: %+v", cgrCfg.GeneralCfg().ResponseCacheTTL)
}
if cgrCfg.GeneralCfg().InternalTtl != 2*time.Minute {
t.Errorf("Expected: 2m, received: %+v", cgrCfg.GeneralCfg().InternalTtl)
}

View File

@@ -44,7 +44,6 @@ type GeneralCfg struct {
Reconnects int // number of recconect attempts in case of connection lost <-1 for infinite | nb>
ConnectTimeout time.Duration // timeout for RPC connection attempts
ReplyTimeout time.Duration // timeout replies if not reaching back
ResponseCacheTTL time.Duration // the life span of a cached response
InternalTtl time.Duration // maximum duration to wait for internal connections before giving up
LockingTimeout time.Duration // locking mechanism timeout to avoid deadlocks
DigestSeparator string //
@@ -82,11 +81,6 @@ func (gencfg *GeneralCfg) loadFromJsonCfg(jsnGeneralCfg *GeneralJsonCfg) (err er
if jsnGeneralCfg.Connect_attempts != nil {
gencfg.ConnectAttempts = *jsnGeneralCfg.Connect_attempts
}
if jsnGeneralCfg.Response_cache_ttl != nil {
if gencfg.ResponseCacheTTL, err = utils.ParseDurationWithNanosecs(*jsnGeneralCfg.Response_cache_ttl); err != nil {
return err
}
}
if jsnGeneralCfg.Reconnects != nil {
gencfg.Reconnects = *jsnGeneralCfg.Reconnects
}

View File

@@ -80,7 +80,6 @@ func TestGeneralCfgloadFromJsonCfg(t *testing.T) {
Reconnects: -1,
ConnectTimeout: time.Duration(1 * time.Second),
ReplyTimeout: time.Duration(2 * time.Second),
ResponseCacheTTL: time.Duration(0),
InternalTtl: time.Duration(2 * time.Minute),
LockingTimeout: time.Duration(0),
DigestSeparator: ",",

View File

@@ -37,7 +37,6 @@ type GeneralJsonCfg struct {
Reconnects *int
Connect_timeout *string
Reply_timeout *string
Response_cache_ttl *string
Internal_ttl *string
Locking_timeout *string
Digest_separator *string

View File

@@ -89,7 +89,6 @@ func NewCDRServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, r
rals: rater, attrS: attrS,
statS: statS, thdS: thdS,
chargerS: chargerS, guard: guardian.Guardian,
respCache: utils.NewResponseCache(cgrCfg.GeneralCfg().ResponseCacheTTL),
httpPoster: NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify,
cgrCfg.GeneralCfg().ReplyTimeout), filterS: filterS}
}
@@ -105,7 +104,6 @@ type CDRServer struct {
statS rpcclient.RpcClientConnection
chargerS rpcclient.RpcClientConnection
guard *guardian.GuardianLocker
respCache *utils.ResponseCache
httpPoster *HTTPPoster // used for replication
filterS *FilterS
}
@@ -412,15 +410,24 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDR, reply *string) (err error) {
if cdr.CGRID == utils.EmptyString { // Populate CGRID if not present
cdr.ComputeCGRID()
}
cacheKey := "V1ProcessCDR" + cdr.CGRID + cdr.RunID
if item, err := cdrS.respCache.Get(cacheKey); err == nil && item != nil {
if item.Err == nil {
*reply = *item.Value.(*string)
// RPC caching
if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV1ProcessCDR, cdr.CGRID, cdr.RunID)
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(cacheKey)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*reply = *cachedResp.Result.(*string)
}
return cachedResp.Error
}
return item.Err
defer Cache.Set(utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: reply, Error: err},
nil, true, utils.NonTransactional)
}
defer cdrS.respCache.Cache(cacheKey,
&utils.ResponseCacheItem{Value: reply, Err: err})
// end of RPC caching
if cdr.RequestType == utils.EmptyString {
cdr.RequestType = cdrS.cgrCfg.GeneralCfg().DefaultReqType
@@ -447,7 +454,8 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDR, reply *string) (err error) {
}
if cdrS.attrS != nil {
if err = cdrS.attrSProcessEvent(cgrEv); err != nil {
return utils.NewErrServerError(err)
err = utils.NewErrServerError(err)
return
}
}
if cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs { // Store *raw CDR
@@ -455,7 +463,8 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDR, reply *string) (err error) {
utils.Logger.Warning(
fmt.Sprintf("<%s> storing primary CDR %+v, got error: %s",
utils.CDRs, cdr, err.Error()))
return utils.NewErrServerError(err) // Cannot store CDR
err = utils.NewErrServerError(err) // Cannot store CDR
return
}
}
if len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0 {
@@ -488,20 +497,43 @@ type ArgV2ProcessCDR struct {
// V2ProcessCDR will process the CDR out of CGREvent
func (cdrS *CDRServer) V2ProcessCDR(arg *ArgV2ProcessCDR, reply *string) (err error) {
if arg.CGREvent.ID == "" {
arg.CGREvent.ID = utils.GenUUID()
}
// RPC caching
if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV2ProcessCDR, arg.CGREvent.ID)
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(cacheKey)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*reply = *cachedResp.Result.(*string)
}
return cachedResp.Error
}
defer Cache.Set(utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: reply, Error: err},
nil, true, utils.NonTransactional)
}
// end of RPC caching
attrS := cdrS.attrS != nil
if arg.AttributeS != nil {
attrS = *arg.AttributeS
}
cgrEv := &arg.CGREvent
if attrS {
if err := cdrS.attrSProcessEvent(cgrEv); err != nil {
return utils.NewErrServerError(err)
if err = cdrS.attrSProcessEvent(cgrEv); err != nil {
err = utils.NewErrServerError(err)
return
}
}
rawCDR, err := NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg,
cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
if err != nil {
return utils.NewErrServerError(err)
var rawCDR *CDR
if rawCDR, err = NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg,
cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone); err != nil {
err = utils.NewErrServerError(err)
return
}
store := cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs
if arg.Store != nil {
@@ -509,7 +541,8 @@ func (cdrS *CDRServer) V2ProcessCDR(arg *ArgV2ProcessCDR, reply *string) (err er
}
if store { // Store *raw CDR
if err = cdrS.cdrDb.SetCDR(rawCDR, false); err != nil {
return utils.NewErrServerError(err) // Cannot store CDR
err = utils.NewErrServerError(err) // Cannot store CDR
return
}
}
export := len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0
@@ -546,26 +579,63 @@ func (cdrS *CDRServer) V2ProcessCDR(arg *ArgV2ProcessCDR, reply *string) (err er
}
// V1StoreSMCost handles storing of the cost into session_costs table
func (cdrS *CDRServer) V1StoreSessionCost(attr *AttrCDRSStoreSMCost, reply *string) error {
func (cdrS *CDRServer) V1StoreSessionCost(attr *AttrCDRSStoreSMCost, reply *string) (err error) {
if attr.Cost.CGRID == "" {
return utils.NewCGRError(utils.CDRSCtx,
utils.MandatoryIEMissingCaps, fmt.Sprintf("%s: CGRID", utils.MandatoryInfoMissing),
"SMCost: %+v with empty CGRID")
}
if err := cdrS.storeSMCost(attr.Cost, attr.CheckDuplicate); err != nil {
return utils.NewErrServerError(err)
// RPC caching
if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV1StoreSessionCost, attr.Cost.CGRID, attr.Cost.RunID)
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(cacheKey)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*reply = *cachedResp.Result.(*string)
}
return cachedResp.Error
}
defer Cache.Set(utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: reply, Error: err},
nil, true, utils.NonTransactional)
}
// end of RPC caching
if err = cdrS.storeSMCost(attr.Cost, attr.CheckDuplicate); err != nil {
err = utils.NewErrServerError(err)
return
}
*reply = utils.OK
return nil
}
// V2StoreSessionCost will store the SessionCost into session_costs table
func (cdrS *CDRServer) V2StoreSessionCost(args *ArgsV2CDRSStoreSMCost, reply *string) error {
func (cdrS *CDRServer) V2StoreSessionCost(args *ArgsV2CDRSStoreSMCost, reply *string) (err error) {
if args.Cost.CGRID == "" {
return utils.NewCGRError(utils.CDRSCtx,
utils.MandatoryIEMissingCaps, fmt.Sprintf("%s: CGRID", utils.MandatoryInfoMissing),
"SMCost: %+v with empty CGRID")
}
// RPC caching
if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV1StoreSessionCost, args.Cost.CGRID, args.Cost.RunID)
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(cacheKey)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*reply = *cachedResp.Result.(*string)
}
return cachedResp.Error
}
defer Cache.Set(utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: reply, Error: err},
nil, true, utils.NonTransactional)
}
// end of RPC caching
cc := args.Cost.CostDetails.AsCallCost()
cc.Round()
roundIncrements := cc.GetRoundIncrements()
@@ -575,14 +645,14 @@ func (cdrS *CDRServer) V2StoreSessionCost(args *ArgsV2CDRSStoreSMCost, reply *st
cd.RunID = args.Cost.RunID
cd.Increments = roundIncrements
var response float64
if err := cdrS.rals.Call("Responder.RefundRounding",
if err := cdrS.rals.Call(utils.ResponderRefundRounding,
cd, &response); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<CDRS> RefundRounding for cc: %+v, got error: %s",
cc, err.Error()))
}
}
if err := cdrS.storeSMCost(
if err = cdrS.storeSMCost(
&SMCost{
CGRID: args.Cost.CGRID,
RunID: args.Cost.RunID,
@@ -592,10 +662,11 @@ func (cdrS *CDRServer) V2StoreSessionCost(args *ArgsV2CDRSStoreSMCost, reply *st
Usage: args.Cost.Usage,
CostDetails: args.Cost.CostDetails},
args.CheckDuplicate); err != nil {
return utils.NewErrServerError(err)
err = utils.NewErrServerError(err)
return
}
*reply = utils.OK
return nil
return
}
@@ -609,6 +680,7 @@ type ArgRateCDRs struct {
}
// V1RateCDRs is used for re-/rate CDRs which are already stored within StorDB
// FixMe: add RPC caching
func (cdrS *CDRServer) V1RateCDRs(arg *ArgRateCDRs, reply *string) (err error) {
cdrFltr, err := arg.RPCCDRsFilter.AsCDRsFilter(cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
if err != nil {

View File

@@ -43,19 +43,6 @@ type Responder struct {
Timeout time.Duration
Timezone string
MaxComputedUsage map[string]time.Duration
responseCache *utils.ResponseCache
}
func (rs *Responder) SetTimeToLive(timeToLive time.Duration, out *int) error {
rs.responseCache = utils.NewResponseCache(timeToLive)
return nil
}
func (rs *Responder) getCache() *utils.ResponseCache {
if rs.responseCache == nil {
rs.responseCache = utils.NewResponseCache(0)
}
return rs.responseCache
}
// usageAllowed checks requested usage against configured MaxComputedUsage

View File

@@ -108,7 +108,6 @@ func NewSessionS(cgrCfg *config.CGRConfig, ralS, resS, thdS,
splS: splS,
attrS: attrS,
cdrS: cdrS,
respCache: utils.NewResponseCache(cgrCfg.GeneralCfg().ResponseCacheTTL),
sReplConns: sReplConns,
biJClnts: make(map[rpcclient.RpcClientConnection]string),
biJIDs: make(map[string]*biJClient),
@@ -139,8 +138,7 @@ type SessionS struct {
attrS rpcclient.RpcClientConnection // AttributeS connections
cdrS rpcclient.RpcClientConnection // CDR server connections
respCache *utils.ResponseCache // cache replies
sReplConns []*SReplConn // list of connections where we will replicate our session data
sReplConns []*SReplConn // list of connections where we will replicate our session data
biJMux sync.RWMutex // mux protecting BI-JSON connections
biJClnts map[rpcclient.RpcClientConnection]string // index BiJSONConnection so we can sync them later

View File

@@ -818,6 +818,8 @@ const (
CDRsV1CountCDRs = "CDRsV1.CountCDRs"
CDRsV1RateCDRs = "CDRsV1.RateCDRs"
CDRsV1GetCDRs = "CDRsV1.GetCDRs"
CDRsV1ProcessCDR = "CDRsV1.ProcessCDR"
CDRsV1StoreSessionCost = "CDRsV1.StoreSessionCost"
CDRsV2ProcessCDR = "CDRsV2.ProcessCDR"
CDRsV2StoreSessionCost = "CDRsV2.StoreSessionCost"
)

View File

@@ -1,97 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package utils
import (
"sync"
"time"
)
type ResponseCacheItem struct {
Value interface{}
Err error
}
type ResponseCache struct {
ttl time.Duration
cache map[string]*ResponseCacheItem
semaphore map[string]chan bool // used for waiting till the first goroutine processes the response
mu sync.RWMutex
}
func NewResponseCache(ttl time.Duration) *ResponseCache {
return &ResponseCache{
ttl: ttl,
cache: make(map[string]*ResponseCacheItem),
semaphore: make(map[string]chan bool),
mu: sync.RWMutex{},
}
}
func (rc *ResponseCache) Cache(key string, item *ResponseCacheItem) {
if rc.ttl == 0 {
return
}
rc.mu.Lock()
rc.cache[key] = item
if _, found := rc.semaphore[key]; found {
close(rc.semaphore[key]) // send release signal
delete(rc.semaphore, key) // delete key
}
rc.mu.Unlock()
go func() {
time.Sleep(rc.ttl)
rc.mu.Lock()
delete(rc.cache, key)
rc.mu.Unlock()
}()
}
func (rc *ResponseCache) Get(key string) (*ResponseCacheItem, error) {
if rc.ttl == 0 {
return nil, ErrNotImplemented
}
rc.mu.RLock()
item, ok := rc.cache[key]
rc.mu.RUnlock()
if ok {
return item, nil
}
rc.wait(key) // wait for other goroutine processsing this key
rc.mu.RLock()
defer rc.mu.RUnlock()
item, ok = rc.cache[key]
if !ok {
return nil, ErrNotFound
}
return item, nil
}
func (rc *ResponseCache) wait(key string) {
rc.mu.RLock()
lockChan, found := rc.semaphore[key]
rc.mu.RUnlock()
if found {
<-lockChan
} else {
rc.mu.Lock()
rc.semaphore[key] = make(chan bool)
rc.mu.Unlock()
}
}

View File

@@ -1,44 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package utils
import (
"testing"
"time"
)
func TestRCacheSetGet(t *testing.T) {
rc := NewResponseCache(5 * time.Second)
rc.Cache("test", &ResponseCacheItem{Value: "best"})
v, err := rc.Get("test")
if err != nil || v.Value.(string) != "best" {
t.Error("Error retriving response cache: ", v, err)
}
}
/*
func TestRCacheExpire(t *testing.T) {
rc := NewResponseCache(1 * time.Microsecond)
rc.Cache("test", &CacheItem{Value: "best"})
time.Sleep(3 * time.Millisecond)
o, err := rc.Get("test")
if err == nil {
t.Error("Error expiring response cache: ", o)
}
}
*/