CDRs.processEvent with support for cached CDRIDs

This commit is contained in:
DanB
2019-12-09 18:36:56 +01:00
parent 2558d55607
commit 74ed5e9958
6 changed files with 72 additions and 51 deletions

View File

@@ -166,20 +166,20 @@ const CGRATES_CFG_JSON = `
"*resource_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control resource profiles caching
"*resources": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control resources caching
"*event_resources": {"limit": -1, "ttl": "", "static_ttl": false}, // matching resources to events
"*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // statqueue profiles
"*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // statqueues with metrics
"*threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control threshold profiles caching
"*thresholds": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control thresholds caching
"*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // statqueue profiles
"*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // statqueues with metrics
"*threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control threshold profiles caching
"*thresholds": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control thresholds caching
"*filters": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control filters caching
"*supplier_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control supplier profile caching
"*attribute_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control attribute profile caching
"*attribute_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control attribute profile caching
"*charger_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control charger profile caching
"*dispatcher_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control dispatcher profile caching
"*dispatcher_hosts": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control dispatcher hosts caching
"*resource_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control resource filter indexes caching
"*stat_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control stat filter indexes caching
"*resource_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control resource filter indexes caching
"*stat_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control stat filter indexes caching
"*threshold_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control threshold filter indexes caching
"*supplier_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control supplier filter indexes caching
"*supplier_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control supplier filter indexes caching
"*attribute_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control attribute filter indexes caching
"*charger_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control charger filter indexes caching
"*dispatcher_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control dispatcher filter indexes caching
@@ -187,6 +187,7 @@ const CGRATES_CFG_JSON = `
"*diameter_messages": {"limit": -1, "ttl": "3h", "static_ttl": false}, // diameter messages caching
"*rpc_responses": {"limit": 0, "ttl": "2s", "static_ttl": false}, // RPC responses caching
"*closed_sessions": {"limit": -1, "ttl": "10s", "static_ttl": false}, // closed sessions cached for CDRs
"*cdr_ids": {"limit": -1, "ttl": "", "static_ttl": false}, // protects CDRs against double-charging
"*load_ids": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control the load_ids for items
"*rpc_connections": {"limit": -1, "ttl": "", "static_ttl": false}, // RPC connections caching
},

View File

@@ -159,6 +159,8 @@ func TestCacheJsonCfg(t *testing.T) {
Ttl: utils.StringPointer("2s"), Static_ttl: utils.BoolPointer(false)},
utils.CacheClosedSessions: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer("10s"), Static_ttl: utils.BoolPointer(false)},
utils.CacheCDRIDs: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)},
utils.CacheLoadIDs: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},

View File

@@ -729,6 +729,8 @@ func TestCgrCfgJSONDefaultsCacheCFG(t *testing.T) {
TTL: time.Duration(2 * time.Second), StaticTTL: false},
utils.CacheClosedSessions: &CacheParamCfg{Limit: -1,
TTL: time.Duration(10 * time.Second), StaticTTL: false},
utils.CacheCDRIDs: &CacheParamCfg{Limit: -1,
TTL: time.Duration(0), StaticTTL: false},
utils.CacheLoadIDs: &CacheParamCfg{Limit: -1,
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheRPCConnections: &CacheParamCfg{Limit: -1,

View File

@@ -34,42 +34,6 @@ func init() {
InitCache(nil)
}
var precachedPartitions = utils.StringMap{
utils.CacheDestinations: true,
utils.CacheReverseDestinations: true,
utils.CacheRatingPlans: true,
utils.CacheRatingProfiles: true,
utils.CacheActions: true,
utils.CacheActionPlans: true,
utils.CacheAccountActionPlans: true,
utils.CacheActionTriggers: true,
utils.CacheSharedGroups: true,
utils.CacheResourceProfiles: true,
utils.CacheResources: true,
utils.CacheStatQueueProfiles: true,
utils.CacheStatQueues: true,
utils.CacheThresholdProfiles: true,
utils.CacheThresholds: true,
utils.CacheFilters: true,
utils.CacheSupplierProfiles: true,
utils.CacheAttributeProfiles: true,
utils.CacheChargerProfiles: true,
utils.CacheDispatcherProfiles: true,
utils.CacheDispatcherHosts: true,
utils.CacheAttributeFilterIndexes: true,
utils.CacheResourceFilterIndexes: true,
utils.CacheStatFilterIndexes: true,
utils.CacheThresholdFilterIndexes: true,
utils.CacheSupplierFilterIndexes: true,
utils.CacheChargerFilterIndexes: true,
utils.CacheDispatcherFilterIndexes: true,
utils.CacheDiameterMessages: true,
utils.CacheTimings: true,
utils.CacheLoadIDs: true,
}
// InitCache will instantiate the cache with specific or default configuraiton
func InitCache(cfg config.CacheCfg) {
if cfg == nil {
@@ -84,9 +48,6 @@ func NewCacheS(cfg *config.CGRConfig, dm *DataManager) (c *CacheS) {
c = &CacheS{cfg: cfg, dm: dm,
pcItems: make(map[string]chan struct{})}
for cacheID := range cfg.CacheCfg() {
if !precachedPartitions.HasKey(cacheID) {
continue
}
c.pcItems[cacheID] = make(chan struct{})
}
return
@@ -110,9 +71,6 @@ func (chS *CacheS) Precache() (err error) {
errChan := make(chan error)
doneChan := make(chan struct{})
for cacheID, cacheCfg := range chS.cfg.CacheCfg() {
if !precachedPartitions.HasKey(cacheID) {
continue
}
if !cacheCfg.Precache {
close(chS.pcItems[cacheID]) // no need of precache
continue
@@ -197,7 +155,7 @@ func (chS *CacheS) V1GetCacheStats(args *utils.AttrCacheIDsWithArgDispatcher,
func (chS *CacheS) V1PrecacheStatus(args *utils.AttrCacheIDsWithArgDispatcher, rply *map[string]string) (err error) {
if len(args.CacheIDs) == 0 {
for cacheID := range precachedPartitions {
for cacheID := range utils.CachePartitions {
args.CacheIDs = append(args.CacheIDs, cacheID)
}
}

View File

@@ -477,6 +477,23 @@ func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithArgDispatcher,
}
}
}
// Check if the unique ID was not already processed
for _, cgrEv := range cgrEvs {
me := MapEvent(cgrEv.CGREvent.Event)
uID := utils.ConcatenatedKey(
me.GetStringIgnoreErrors(utils.CGRID),
me.GetStringIgnoreErrors(utils.RunID),
)
if Cache.HasItem(utils.CacheCDRIDs, uID) {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v with %s",
utils.CDRs, err.Error(), cgrEv, utils.CacheS))
return utils.ErrExists
}
Cache.Set(utils.CacheCDRIDs, uID, true, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
// Populate CDR list out of events
cdrs := make([]*CDR, len(cgrEvs))
if ralS || store || reRate || export {
for i, cgrEv := range cgrEvs {

View File

@@ -58,6 +58,46 @@ var (
MetaFileCSV: CSVSuffix,
MetaFileFWV: FWVSuffix,
}
// CachePartitions enables creation of cache partitions
CachePartitions = StringMap{
CacheDestinations: true,
CacheReverseDestinations: true,
CacheRatingPlans: true,
CacheRatingProfiles: true,
CacheActions: true,
CacheActionPlans: true,
CacheAccountActionPlans: true,
CacheActionTriggers: true,
CacheSharedGroups: true,
CacheTimings: true,
CacheResourceProfiles: true,
CacheResources: true,
CacheEventResources: true,
CacheStatQueueProfiles: true,
CacheStatQueues: true,
CacheThresholdProfiles: true,
CacheThresholds: true,
CacheFilters: true,
CacheSupplierProfiles: true,
CacheAttributeProfiles: true,
CacheChargerProfiles: true,
CacheDispatcherProfiles: true,
CacheDispatcherHosts: true,
CacheResourceFilterIndexes: true,
CacheStatFilterIndexes: true,
CacheThresholdFilterIndexes: true,
CacheSupplierFilterIndexes: true,
CacheAttributeFilterIndexes: true,
CacheChargerFilterIndexes: true,
CacheDispatcherFilterIndexes: true,
CacheDispatcherRoutes: true,
CacheDiameterMessages: true,
CacheRPCResponses: true,
CacheClosedSessions: true,
CacheCDRIDs: true,
CacheLoadIDs: true,
CacheRPCConnections: true,
}
CacheInstanceToPrefix = map[string]string{
CacheDestinations: DESTINATION_PREFIX,
CacheReverseDestinations: REVERSE_DESTINATION_PREFIX,
@@ -1447,6 +1487,7 @@ const (
CacheLoadIDs = "*load_ids"
CacheAccounts = "*accounts"
CacheRPCConnections = "*rpc_connections"
CacheCDRIDs = "*cdr_ids"
)
// Prefix for indexing