From 4835ba2238b7c005476f198c167cde1bffbed801 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 4 Mar 2019 20:49:33 +0100 Subject: [PATCH] CDRs with RPC response caching, removing old RespCache methods and response_cache_ttl config option --- cmd/cgr-engine/rater.go | 1 - config/config_defaults.go | 1 - config/config_json_test.go | 1 - config/config_test.go | 3 - config/generalcfg.go | 6 -- config/generalcfg_test.go | 1 - config/libconfig_json.go | 1 - engine/cdrs.go | 124 +++++++++++++++++++++++++++-------- engine/responder.go | 13 ---- sessions/sessions.go | 4 +- utils/consts.go | 2 + utils/response_cache.go | 97 --------------------------- utils/response_cache_test.go | 44 ------------- 13 files changed, 101 insertions(+), 197 deletions(-) delete mode 100644 utils/response_cache.go delete mode 100644 utils/response_cache_test.go diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index edb772b97..ab8b8d5f1 100755 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -104,7 +104,6 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en responder := &engine.Responder{ ExitChan: exitChan, MaxComputedUsage: cfg.RalsCfg().RALsMaxComputedUsage} - responder.SetTimeToLive(cfg.GeneralCfg().ResponseCacheTTL, nil) apierRpcV1 := &v1.ApierV1{ StorDb: loadDb, DataManager: dm, diff --git a/config/config_defaults.go b/config/config_defaults.go index 2c4a69e42..b109b8c62 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -45,7 +45,6 @@ const CGRATES_CFG_JSON = ` "reconnects": -1, // number of retries in case of connection lost "connect_timeout": "1s", // consider connection unsuccessful on timeout, 0 to disable the feature "reply_timeout": "2s", // consider connection down for replies taking longer than this value - "response_cache_ttl": "0s", // the life span of a cached response "internal_ttl": "2m", // maximum duration to wait for internal connections before giving up "locking_timeout": "0", // timeout internal locks to avoid deadlocks "digest_separator": ",", diff --git a/config/config_json_test.go b/config/config_json_test.go index 68a40a494..4c5e2b051 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -55,7 +55,6 @@ func TestDfGeneralJsonCfg(t *testing.T) { Reconnects: utils.IntPointer(-1), Connect_timeout: utils.StringPointer("1s"), Reply_timeout: utils.StringPointer("2s"), - Response_cache_ttl: utils.StringPointer("0s"), Internal_ttl: utils.StringPointer("2m"), Locking_timeout: utils.StringPointer("0"), Digest_separator: utils.StringPointer(","), diff --git a/config/config_test.go b/config/config_test.go index f0e8dea2f..a6877b21a 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -361,9 +361,6 @@ func TestCgrCfgJSONDefaultsGeneral(t *testing.T) { if cgrCfg.GeneralCfg().ReplyTimeout != 2*time.Second { t.Errorf("Expected: 2s, received: %+v", cgrCfg.GeneralCfg().ReplyTimeout) } - if cgrCfg.GeneralCfg().ResponseCacheTTL != 0*time.Second { - t.Errorf("Expected: 0s, received: %+v", cgrCfg.GeneralCfg().ResponseCacheTTL) - } if cgrCfg.GeneralCfg().InternalTtl != 2*time.Minute { t.Errorf("Expected: 2m, received: %+v", cgrCfg.GeneralCfg().InternalTtl) } diff --git a/config/generalcfg.go b/config/generalcfg.go index 07052f093..a58080e99 100644 --- a/config/generalcfg.go +++ b/config/generalcfg.go @@ -44,7 +44,6 @@ type GeneralCfg struct { Reconnects int // number of recconect attempts in case of connection lost <-1 for infinite | nb> ConnectTimeout time.Duration // timeout for RPC connection attempts ReplyTimeout time.Duration // timeout replies if not reaching back - ResponseCacheTTL time.Duration // the life span of a cached response InternalTtl time.Duration // maximum duration to wait for internal connections before giving up LockingTimeout time.Duration // locking mechanism timeout to avoid deadlocks DigestSeparator string // @@ -82,11 +81,6 @@ func (gencfg *GeneralCfg) loadFromJsonCfg(jsnGeneralCfg *GeneralJsonCfg) (err er if jsnGeneralCfg.Connect_attempts != nil { gencfg.ConnectAttempts = *jsnGeneralCfg.Connect_attempts } - if jsnGeneralCfg.Response_cache_ttl != nil { - if gencfg.ResponseCacheTTL, err = utils.ParseDurationWithNanosecs(*jsnGeneralCfg.Response_cache_ttl); err != nil { - return err - } - } if jsnGeneralCfg.Reconnects != nil { gencfg.Reconnects = *jsnGeneralCfg.Reconnects } diff --git a/config/generalcfg_test.go b/config/generalcfg_test.go index 75f8c6840..002aef6cf 100644 --- a/config/generalcfg_test.go +++ b/config/generalcfg_test.go @@ -80,7 +80,6 @@ func TestGeneralCfgloadFromJsonCfg(t *testing.T) { Reconnects: -1, ConnectTimeout: time.Duration(1 * time.Second), ReplyTimeout: time.Duration(2 * time.Second), - ResponseCacheTTL: time.Duration(0), InternalTtl: time.Duration(2 * time.Minute), LockingTimeout: time.Duration(0), DigestSeparator: ",", diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 3364db6d5..1f01cc2e6 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -37,7 +37,6 @@ type GeneralJsonCfg struct { Reconnects *int Connect_timeout *string Reply_timeout *string - Response_cache_ttl *string Internal_ttl *string Locking_timeout *string Digest_separator *string diff --git a/engine/cdrs.go b/engine/cdrs.go index ee438b09d..67e0b9403 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -89,7 +89,6 @@ func NewCDRServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, r rals: rater, attrS: attrS, statS: statS, thdS: thdS, chargerS: chargerS, guard: guardian.Guardian, - respCache: utils.NewResponseCache(cgrCfg.GeneralCfg().ResponseCacheTTL), httpPoster: NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify, cgrCfg.GeneralCfg().ReplyTimeout), filterS: filterS} } @@ -105,7 +104,6 @@ type CDRServer struct { statS rpcclient.RpcClientConnection chargerS rpcclient.RpcClientConnection guard *guardian.GuardianLocker - respCache *utils.ResponseCache httpPoster *HTTPPoster // used for replication filterS *FilterS } @@ -412,15 +410,24 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDR, reply *string) (err error) { if cdr.CGRID == utils.EmptyString { // Populate CGRID if not present cdr.ComputeCGRID() } - cacheKey := "V1ProcessCDR" + cdr.CGRID + cdr.RunID - if item, err := cdrS.respCache.Get(cacheKey); err == nil && item != nil { - if item.Err == nil { - *reply = *item.Value.(*string) + // RPC caching + if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 { + cacheKey := utils.ConcatenatedKey(utils.CDRsV1ProcessCDR, cdr.CGRID, cdr.RunID) + guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(cacheKey) + + if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has { + cachedResp := itm.(*utils.CachedRPCResponse) + if cachedResp.Error == nil { + *reply = *cachedResp.Result.(*string) + } + return cachedResp.Error } - return item.Err + defer Cache.Set(utils.CacheRPCResponses, cacheKey, + &utils.CachedRPCResponse{Result: reply, Error: err}, + nil, true, utils.NonTransactional) } - defer cdrS.respCache.Cache(cacheKey, - &utils.ResponseCacheItem{Value: reply, Err: err}) + // end of RPC caching if cdr.RequestType == utils.EmptyString { cdr.RequestType = cdrS.cgrCfg.GeneralCfg().DefaultReqType @@ -447,7 +454,8 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDR, reply *string) (err error) { } if cdrS.attrS != nil { if err = cdrS.attrSProcessEvent(cgrEv); err != nil { - return utils.NewErrServerError(err) + err = utils.NewErrServerError(err) + return } } if cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs { // Store *raw CDR @@ -455,7 +463,8 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDR, reply *string) (err error) { utils.Logger.Warning( fmt.Sprintf("<%s> storing primary CDR %+v, got error: %s", utils.CDRs, cdr, err.Error())) - return utils.NewErrServerError(err) // Cannot store CDR + err = utils.NewErrServerError(err) // Cannot store CDR + return } } if len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0 { @@ -488,20 +497,43 @@ type ArgV2ProcessCDR struct { // V2ProcessCDR will process the CDR out of CGREvent func (cdrS *CDRServer) V2ProcessCDR(arg *ArgV2ProcessCDR, reply *string) (err error) { + if arg.CGREvent.ID == "" { + arg.CGREvent.ID = utils.GenUUID() + } + // RPC caching + if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 { + cacheKey := utils.ConcatenatedKey(utils.CDRsV2ProcessCDR, arg.CGREvent.ID) + guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(cacheKey) + + if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has { + cachedResp := itm.(*utils.CachedRPCResponse) + if cachedResp.Error == nil { + *reply = *cachedResp.Result.(*string) + } + return cachedResp.Error + } + defer Cache.Set(utils.CacheRPCResponses, cacheKey, + &utils.CachedRPCResponse{Result: reply, Error: err}, + nil, true, utils.NonTransactional) + } + // end of RPC caching attrS := cdrS.attrS != nil if arg.AttributeS != nil { attrS = *arg.AttributeS } cgrEv := &arg.CGREvent if attrS { - if err := cdrS.attrSProcessEvent(cgrEv); err != nil { - return utils.NewErrServerError(err) + if err = cdrS.attrSProcessEvent(cgrEv); err != nil { + err = utils.NewErrServerError(err) + return } } - rawCDR, err := NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg, - cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone) - if err != nil { - return utils.NewErrServerError(err) + var rawCDR *CDR + if rawCDR, err = NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg, + cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone); err != nil { + err = utils.NewErrServerError(err) + return } store := cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs if arg.Store != nil { @@ -509,7 +541,8 @@ func (cdrS *CDRServer) V2ProcessCDR(arg *ArgV2ProcessCDR, reply *string) (err er } if store { // Store *raw CDR if err = cdrS.cdrDb.SetCDR(rawCDR, false); err != nil { - return utils.NewErrServerError(err) // Cannot store CDR + err = utils.NewErrServerError(err) // Cannot store CDR + return } } export := len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0 @@ -546,26 +579,63 @@ func (cdrS *CDRServer) V2ProcessCDR(arg *ArgV2ProcessCDR, reply *string) (err er } // V1StoreSMCost handles storing of the cost into session_costs table -func (cdrS *CDRServer) V1StoreSessionCost(attr *AttrCDRSStoreSMCost, reply *string) error { +func (cdrS *CDRServer) V1StoreSessionCost(attr *AttrCDRSStoreSMCost, reply *string) (err error) { if attr.Cost.CGRID == "" { return utils.NewCGRError(utils.CDRSCtx, utils.MandatoryIEMissingCaps, fmt.Sprintf("%s: CGRID", utils.MandatoryInfoMissing), "SMCost: %+v with empty CGRID") } - if err := cdrS.storeSMCost(attr.Cost, attr.CheckDuplicate); err != nil { - return utils.NewErrServerError(err) + // RPC caching + if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 { + cacheKey := utils.ConcatenatedKey(utils.CDRsV1StoreSessionCost, attr.Cost.CGRID, attr.Cost.RunID) + guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(cacheKey) + + if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has { + cachedResp := itm.(*utils.CachedRPCResponse) + if cachedResp.Error == nil { + *reply = *cachedResp.Result.(*string) + } + return cachedResp.Error + } + defer Cache.Set(utils.CacheRPCResponses, cacheKey, + &utils.CachedRPCResponse{Result: reply, Error: err}, + nil, true, utils.NonTransactional) + } + // end of RPC caching + if err = cdrS.storeSMCost(attr.Cost, attr.CheckDuplicate); err != nil { + err = utils.NewErrServerError(err) + return } *reply = utils.OK return nil } // V2StoreSessionCost will store the SessionCost into session_costs table -func (cdrS *CDRServer) V2StoreSessionCost(args *ArgsV2CDRSStoreSMCost, reply *string) error { +func (cdrS *CDRServer) V2StoreSessionCost(args *ArgsV2CDRSStoreSMCost, reply *string) (err error) { if args.Cost.CGRID == "" { return utils.NewCGRError(utils.CDRSCtx, utils.MandatoryIEMissingCaps, fmt.Sprintf("%s: CGRID", utils.MandatoryInfoMissing), "SMCost: %+v with empty CGRID") } + // RPC caching + if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 { + cacheKey := utils.ConcatenatedKey(utils.CDRsV1StoreSessionCost, args.Cost.CGRID, args.Cost.RunID) + guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(cacheKey) + + if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has { + cachedResp := itm.(*utils.CachedRPCResponse) + if cachedResp.Error == nil { + *reply = *cachedResp.Result.(*string) + } + return cachedResp.Error + } + defer Cache.Set(utils.CacheRPCResponses, cacheKey, + &utils.CachedRPCResponse{Result: reply, Error: err}, + nil, true, utils.NonTransactional) + } + // end of RPC caching cc := args.Cost.CostDetails.AsCallCost() cc.Round() roundIncrements := cc.GetRoundIncrements() @@ -575,14 +645,14 @@ func (cdrS *CDRServer) V2StoreSessionCost(args *ArgsV2CDRSStoreSMCost, reply *st cd.RunID = args.Cost.RunID cd.Increments = roundIncrements var response float64 - if err := cdrS.rals.Call("Responder.RefundRounding", + if err := cdrS.rals.Call(utils.ResponderRefundRounding, cd, &response); err != nil { utils.Logger.Warning( fmt.Sprintf(" RefundRounding for cc: %+v, got error: %s", cc, err.Error())) } } - if err := cdrS.storeSMCost( + if err = cdrS.storeSMCost( &SMCost{ CGRID: args.Cost.CGRID, RunID: args.Cost.RunID, @@ -592,10 +662,11 @@ func (cdrS *CDRServer) V2StoreSessionCost(args *ArgsV2CDRSStoreSMCost, reply *st Usage: args.Cost.Usage, CostDetails: args.Cost.CostDetails}, args.CheckDuplicate); err != nil { - return utils.NewErrServerError(err) + err = utils.NewErrServerError(err) + return } *reply = utils.OK - return nil + return } @@ -609,6 +680,7 @@ type ArgRateCDRs struct { } // V1RateCDRs is used for re-/rate CDRs which are already stored within StorDB +// FixMe: add RPC caching func (cdrS *CDRServer) V1RateCDRs(arg *ArgRateCDRs, reply *string) (err error) { cdrFltr, err := arg.RPCCDRsFilter.AsCDRsFilter(cdrS.cgrCfg.GeneralCfg().DefaultTimezone) if err != nil { diff --git a/engine/responder.go b/engine/responder.go index 42f787289..eb49de921 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -43,19 +43,6 @@ type Responder struct { Timeout time.Duration Timezone string MaxComputedUsage map[string]time.Duration - responseCache *utils.ResponseCache -} - -func (rs *Responder) SetTimeToLive(timeToLive time.Duration, out *int) error { - rs.responseCache = utils.NewResponseCache(timeToLive) - return nil -} - -func (rs *Responder) getCache() *utils.ResponseCache { - if rs.responseCache == nil { - rs.responseCache = utils.NewResponseCache(0) - } - return rs.responseCache } // usageAllowed checks requested usage against configured MaxComputedUsage diff --git a/sessions/sessions.go b/sessions/sessions.go index 97d89f948..4125325d6 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -108,7 +108,6 @@ func NewSessionS(cgrCfg *config.CGRConfig, ralS, resS, thdS, splS: splS, attrS: attrS, cdrS: cdrS, - respCache: utils.NewResponseCache(cgrCfg.GeneralCfg().ResponseCacheTTL), sReplConns: sReplConns, biJClnts: make(map[rpcclient.RpcClientConnection]string), biJIDs: make(map[string]*biJClient), @@ -139,8 +138,7 @@ type SessionS struct { attrS rpcclient.RpcClientConnection // AttributeS connections cdrS rpcclient.RpcClientConnection // CDR server connections - respCache *utils.ResponseCache // cache replies - sReplConns []*SReplConn // list of connections where we will replicate our session data + sReplConns []*SReplConn // list of connections where we will replicate our session data biJMux sync.RWMutex // mux protecting BI-JSON connections biJClnts map[rpcclient.RpcClientConnection]string // index BiJSONConnection so we can sync them later diff --git a/utils/consts.go b/utils/consts.go index b20ed064b..90166aa24 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -818,6 +818,8 @@ const ( CDRsV1CountCDRs = "CDRsV1.CountCDRs" CDRsV1RateCDRs = "CDRsV1.RateCDRs" CDRsV1GetCDRs = "CDRsV1.GetCDRs" + CDRsV1ProcessCDR = "CDRsV1.ProcessCDR" + CDRsV1StoreSessionCost = "CDRsV1.StoreSessionCost" CDRsV2ProcessCDR = "CDRsV2.ProcessCDR" CDRsV2StoreSessionCost = "CDRsV2.StoreSessionCost" ) diff --git a/utils/response_cache.go b/utils/response_cache.go deleted file mode 100644 index bdbcf3339..000000000 --- a/utils/response_cache.go +++ /dev/null @@ -1,97 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package utils - -import ( - "sync" - "time" -) - -type ResponseCacheItem struct { - Value interface{} - Err error -} - -type ResponseCache struct { - ttl time.Duration - cache map[string]*ResponseCacheItem - semaphore map[string]chan bool // used for waiting till the first goroutine processes the response - mu sync.RWMutex -} - -func NewResponseCache(ttl time.Duration) *ResponseCache { - return &ResponseCache{ - ttl: ttl, - cache: make(map[string]*ResponseCacheItem), - semaphore: make(map[string]chan bool), - mu: sync.RWMutex{}, - } -} - -func (rc *ResponseCache) Cache(key string, item *ResponseCacheItem) { - if rc.ttl == 0 { - return - } - rc.mu.Lock() - rc.cache[key] = item - if _, found := rc.semaphore[key]; found { - close(rc.semaphore[key]) // send release signal - delete(rc.semaphore, key) // delete key - } - rc.mu.Unlock() - go func() { - time.Sleep(rc.ttl) - rc.mu.Lock() - delete(rc.cache, key) - rc.mu.Unlock() - }() -} - -func (rc *ResponseCache) Get(key string) (*ResponseCacheItem, error) { - if rc.ttl == 0 { - return nil, ErrNotImplemented - } - rc.mu.RLock() - item, ok := rc.cache[key] - rc.mu.RUnlock() - if ok { - return item, nil - } - rc.wait(key) // wait for other goroutine processsing this key - rc.mu.RLock() - defer rc.mu.RUnlock() - item, ok = rc.cache[key] - if !ok { - return nil, ErrNotFound - } - return item, nil -} - -func (rc *ResponseCache) wait(key string) { - rc.mu.RLock() - lockChan, found := rc.semaphore[key] - rc.mu.RUnlock() - if found { - <-lockChan - } else { - rc.mu.Lock() - rc.semaphore[key] = make(chan bool) - rc.mu.Unlock() - } -} diff --git a/utils/response_cache_test.go b/utils/response_cache_test.go deleted file mode 100644 index 9390f8c2f..000000000 --- a/utils/response_cache_test.go +++ /dev/null @@ -1,44 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ -package utils - -import ( - "testing" - "time" -) - -func TestRCacheSetGet(t *testing.T) { - rc := NewResponseCache(5 * time.Second) - rc.Cache("test", &ResponseCacheItem{Value: "best"}) - v, err := rc.Get("test") - if err != nil || v.Value.(string) != "best" { - t.Error("Error retriving response cache: ", v, err) - } -} - -/* -func TestRCacheExpire(t *testing.T) { - rc := NewResponseCache(1 * time.Microsecond) - rc.Cache("test", &CacheItem{Value: "best"}) - time.Sleep(3 * time.Millisecond) - o, err := rc.Get("test") - if err == nil { - t.Error("Error expiring response cache: ", o) - } -} -*/