From 74ed5e99586c02c87182d5edc76eec03e5de0951 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 9 Dec 2019 18:36:56 +0100 Subject: [PATCH] CDRs.processEvent with support for cached CDRIDs --- config/config_defaults.go | 17 ++++++++------- config/config_json_test.go | 2 ++ config/config_test.go | 2 ++ engine/caches.go | 44 +------------------------------------- engine/cdrs.go | 17 +++++++++++++++ utils/consts.go | 41 +++++++++++++++++++++++++++++++++++ 6 files changed, 72 insertions(+), 51 deletions(-) diff --git a/config/config_defaults.go b/config/config_defaults.go index 15263330e..5c8283b47 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -166,20 +166,20 @@ const CGRATES_CFG_JSON = ` "*resource_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control resource profiles caching "*resources": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control resources caching "*event_resources": {"limit": -1, "ttl": "", "static_ttl": false}, // matching resources to events - "*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // statqueue profiles - "*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // statqueues with metrics - "*threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control threshold profiles caching - "*thresholds": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control thresholds caching + "*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // statqueue profiles + "*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // statqueues with metrics + "*threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control threshold profiles caching + "*thresholds": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control thresholds caching "*filters": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control filters caching "*supplier_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control supplier profile caching - "*attribute_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control attribute profile caching + "*attribute_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control attribute profile caching "*charger_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control charger profile caching "*dispatcher_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control dispatcher profile caching "*dispatcher_hosts": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control dispatcher hosts caching - "*resource_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control resource filter indexes caching - "*stat_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control stat filter indexes caching + "*resource_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control resource filter indexes caching + "*stat_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control stat filter indexes caching "*threshold_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control threshold filter indexes caching - "*supplier_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control supplier filter indexes caching + "*supplier_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control supplier filter indexes caching "*attribute_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control attribute filter indexes caching "*charger_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control charger filter indexes caching "*dispatcher_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control dispatcher filter indexes caching @@ -187,6 +187,7 @@ const CGRATES_CFG_JSON = ` "*diameter_messages": {"limit": -1, "ttl": "3h", "static_ttl": false}, // diameter messages caching "*rpc_responses": {"limit": 0, "ttl": "2s", "static_ttl": false}, // RPC responses caching "*closed_sessions": {"limit": -1, "ttl": "10s", "static_ttl": false}, // closed sessions cached for CDRs + "*cdr_ids": {"limit": -1, "ttl": "", "static_ttl": false}, // protects CDRs against double-charging "*load_ids": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control the load_ids for items "*rpc_connections": {"limit": -1, "ttl": "", "static_ttl": false}, // RPC connections caching }, diff --git a/config/config_json_test.go b/config/config_json_test.go index eb1fe830a..d2d4b01e8 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -159,6 +159,8 @@ func TestCacheJsonCfg(t *testing.T) { Ttl: utils.StringPointer("2s"), Static_ttl: utils.BoolPointer(false)}, utils.CacheClosedSessions: &CacheParamJsonCfg{Limit: utils.IntPointer(-1), Ttl: utils.StringPointer("10s"), Static_ttl: utils.BoolPointer(false)}, + utils.CacheCDRIDs: &CacheParamJsonCfg{Limit: utils.IntPointer(-1), + Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)}, utils.CacheLoadIDs: &CacheParamJsonCfg{Limit: utils.IntPointer(-1), Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false), Precache: utils.BoolPointer(false)}, diff --git a/config/config_test.go b/config/config_test.go index 94039d1a3..18e3369cf 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -729,6 +729,8 @@ func TestCgrCfgJSONDefaultsCacheCFG(t *testing.T) { TTL: time.Duration(2 * time.Second), StaticTTL: false}, utils.CacheClosedSessions: &CacheParamCfg{Limit: -1, TTL: time.Duration(10 * time.Second), StaticTTL: false}, + utils.CacheCDRIDs: &CacheParamCfg{Limit: -1, + TTL: time.Duration(0), StaticTTL: false}, utils.CacheLoadIDs: &CacheParamCfg{Limit: -1, TTL: time.Duration(0), StaticTTL: false, Precache: false}, utils.CacheRPCConnections: &CacheParamCfg{Limit: -1, diff --git a/engine/caches.go b/engine/caches.go index da7d9425d..e771b6af6 100644 --- a/engine/caches.go +++ b/engine/caches.go @@ -34,42 +34,6 @@ func init() { InitCache(nil) } -var precachedPartitions = utils.StringMap{ - utils.CacheDestinations: true, - utils.CacheReverseDestinations: true, - utils.CacheRatingPlans: true, - utils.CacheRatingProfiles: true, - utils.CacheActions: true, - utils.CacheActionPlans: true, - utils.CacheAccountActionPlans: true, - utils.CacheActionTriggers: true, - utils.CacheSharedGroups: true, - utils.CacheResourceProfiles: true, - utils.CacheResources: true, - utils.CacheStatQueueProfiles: true, - utils.CacheStatQueues: true, - utils.CacheThresholdProfiles: true, - utils.CacheThresholds: true, - utils.CacheFilters: true, - utils.CacheSupplierProfiles: true, - utils.CacheAttributeProfiles: true, - utils.CacheChargerProfiles: true, - utils.CacheDispatcherProfiles: true, - utils.CacheDispatcherHosts: true, - - utils.CacheAttributeFilterIndexes: true, - utils.CacheResourceFilterIndexes: true, - utils.CacheStatFilterIndexes: true, - utils.CacheThresholdFilterIndexes: true, - utils.CacheSupplierFilterIndexes: true, - utils.CacheChargerFilterIndexes: true, - utils.CacheDispatcherFilterIndexes: true, - - utils.CacheDiameterMessages: true, - utils.CacheTimings: true, - utils.CacheLoadIDs: true, -} - // InitCache will instantiate the cache with specific or default configuraiton func InitCache(cfg config.CacheCfg) { if cfg == nil { @@ -84,9 +48,6 @@ func NewCacheS(cfg *config.CGRConfig, dm *DataManager) (c *CacheS) { c = &CacheS{cfg: cfg, dm: dm, pcItems: make(map[string]chan struct{})} for cacheID := range cfg.CacheCfg() { - if !precachedPartitions.HasKey(cacheID) { - continue - } c.pcItems[cacheID] = make(chan struct{}) } return @@ -110,9 +71,6 @@ func (chS *CacheS) Precache() (err error) { errChan := make(chan error) doneChan := make(chan struct{}) for cacheID, cacheCfg := range chS.cfg.CacheCfg() { - if !precachedPartitions.HasKey(cacheID) { - continue - } if !cacheCfg.Precache { close(chS.pcItems[cacheID]) // no need of precache continue @@ -197,7 +155,7 @@ func (chS *CacheS) V1GetCacheStats(args *utils.AttrCacheIDsWithArgDispatcher, func (chS *CacheS) V1PrecacheStatus(args *utils.AttrCacheIDsWithArgDispatcher, rply *map[string]string) (err error) { if len(args.CacheIDs) == 0 { - for cacheID := range precachedPartitions { + for cacheID := range utils.CachePartitions { args.CacheIDs = append(args.CacheIDs, cacheID) } } diff --git a/engine/cdrs.go b/engine/cdrs.go index 35688559e..d0ea67e25 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -477,6 +477,23 @@ func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithArgDispatcher, } } } + // Check if the unique ID was not already processed + for _, cgrEv := range cgrEvs { + me := MapEvent(cgrEv.CGREvent.Event) + uID := utils.ConcatenatedKey( + me.GetStringIgnoreErrors(utils.CGRID), + me.GetStringIgnoreErrors(utils.RunID), + ) + if Cache.HasItem(utils.CacheCDRIDs, uID) { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: <%s> processing event %+v with %s", + utils.CDRs, err.Error(), cgrEv, utils.CacheS)) + return utils.ErrExists + } + Cache.Set(utils.CacheCDRIDs, uID, true, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + // Populate CDR list out of events cdrs := make([]*CDR, len(cgrEvs)) if ralS || store || reRate || export { for i, cgrEv := range cgrEvs { diff --git a/utils/consts.go b/utils/consts.go index 6fb91b501..90b7fe072 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -58,6 +58,46 @@ var ( MetaFileCSV: CSVSuffix, MetaFileFWV: FWVSuffix, } + // CachePartitions enables creation of cache partitions + CachePartitions = StringMap{ + CacheDestinations: true, + CacheReverseDestinations: true, + CacheRatingPlans: true, + CacheRatingProfiles: true, + CacheActions: true, + CacheActionPlans: true, + CacheAccountActionPlans: true, + CacheActionTriggers: true, + CacheSharedGroups: true, + CacheTimings: true, + CacheResourceProfiles: true, + CacheResources: true, + CacheEventResources: true, + CacheStatQueueProfiles: true, + CacheStatQueues: true, + CacheThresholdProfiles: true, + CacheThresholds: true, + CacheFilters: true, + CacheSupplierProfiles: true, + CacheAttributeProfiles: true, + CacheChargerProfiles: true, + CacheDispatcherProfiles: true, + CacheDispatcherHosts: true, + CacheResourceFilterIndexes: true, + CacheStatFilterIndexes: true, + CacheThresholdFilterIndexes: true, + CacheSupplierFilterIndexes: true, + CacheAttributeFilterIndexes: true, + CacheChargerFilterIndexes: true, + CacheDispatcherFilterIndexes: true, + CacheDispatcherRoutes: true, + CacheDiameterMessages: true, + CacheRPCResponses: true, + CacheClosedSessions: true, + CacheCDRIDs: true, + CacheLoadIDs: true, + CacheRPCConnections: true, + } CacheInstanceToPrefix = map[string]string{ CacheDestinations: DESTINATION_PREFIX, CacheReverseDestinations: REVERSE_DESTINATION_PREFIX, @@ -1447,6 +1487,7 @@ const ( CacheLoadIDs = "*load_ids" CacheAccounts = "*accounts" CacheRPCConnections = "*rpc_connections" + CacheCDRIDs = "*cdr_ids" ) // Prefix for indexing