mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
514 lines
16 KiB
Go
514 lines
16 KiB
Go
/*
|
|
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
|
Copyright (C) ITsysCOM GmbH
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>
|
|
*/
|
|
|
|
package engine
|
|
|
|
import (
|
|
"encoding/gob"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/url"
|
|
"time"
|
|
|
|
"github.com/cgrates/birpc/context"
|
|
"github.com/cgrates/cgrates/config"
|
|
"github.com/cgrates/cgrates/utils"
|
|
"github.com/cgrates/ltcache"
|
|
)
|
|
|
|
// Cache is the package-level CacheS instance. init() assigns it a CacheS built
// from config.CgrConfig() with no DataManager, ConnManager or CapsStats.
var Cache *CacheS
|
|
|
|
func init() {
|
|
Cache = NewCacheS(config.CgrConfig(), nil, nil, nil)
|
|
|
|
// Register objects for cache replication/remotes
|
|
// AttributeS
|
|
gob.Register(new(utils.AttributeProfile))
|
|
gob.Register(new(utils.AttributeProfileWithAPIOpts))
|
|
gob.Register(new(utils.TPAttributeProfile))
|
|
// Threshold
|
|
gob.Register(new(Threshold))
|
|
gob.Register(new(ThresholdProfile))
|
|
gob.Register(new(ThresholdProfileWithAPIOpts))
|
|
gob.Register(new(ThresholdWithAPIOpts))
|
|
gob.Register(new(utils.TPThresholdProfile))
|
|
// Resource
|
|
gob.Register(new(utils.Resource))
|
|
gob.Register(new(utils.ResourceProfile))
|
|
gob.Register(new(utils.ResourceProfileWithAPIOpts))
|
|
gob.Register(new(utils.ResourceWithAPIOpts))
|
|
gob.Register(new(utils.TPResourceProfile))
|
|
// Stats
|
|
gob.Register(new(StatQueue))
|
|
gob.Register(new(StatQueueProfile))
|
|
gob.Register(new(StatQueueProfileWithAPIOpts))
|
|
gob.Register(new(StoredStatQueue))
|
|
gob.Register(new(StatQueueProfileWithAPIOpts))
|
|
gob.Register(new(utils.TPStatProfile))
|
|
// RankingS
|
|
gob.Register(new(utils.Ranking))
|
|
gob.Register(new(utils.RankingProfile))
|
|
gob.Register(new(utils.TPRankingProfile))
|
|
// RouteS
|
|
gob.Register(new(utils.RouteProfile))
|
|
gob.Register(new(utils.RouteProfileWithAPIOpts))
|
|
gob.Register(new(utils.TPRouteProfile))
|
|
// TrendS
|
|
gob.Register(new(utils.Trend))
|
|
gob.Register(new(utils.TrendProfile))
|
|
gob.Register(new(utils.TPTrendsProfile))
|
|
// Filters
|
|
gob.Register(new(Filter))
|
|
gob.Register(new(FilterWithAPIOpts))
|
|
gob.Register(new(utils.TPFilterProfile))
|
|
// DispatcherS
|
|
gob.Register(new(DispatcherHost))
|
|
gob.Register(new(DispatcherHostProfile))
|
|
gob.Register(new(DispatcherHostWithAPIOpts))
|
|
// RateProfiles
|
|
gob.Register(new(utils.RateProfile))
|
|
gob.Register(new(utils.RateProfileWithAPIOpts))
|
|
// ActionProfiles
|
|
gob.Register(new(utils.ActionProfile))
|
|
gob.Register(new(utils.ActionProfileWithAPIOpts))
|
|
// Account
|
|
gob.Register(new(utils.Account))
|
|
// CDR
|
|
gob.Register(new(utils.CGREvent))
|
|
// ChargerS
|
|
gob.Register(new(utils.ChargerProfile))
|
|
gob.Register(new(utils.TPChargerProfile))
|
|
// StatMetrics
|
|
gob.Register(new(StatASR))
|
|
gob.Register(new(StatACD))
|
|
gob.Register(new(StatTCD))
|
|
gob.Register(new(StatACC))
|
|
gob.Register(new(StatTCC))
|
|
gob.Register(new(StatPDD))
|
|
gob.Register(new(StatDDC))
|
|
gob.Register(new(StatSum))
|
|
gob.Register(new(StatAverage))
|
|
gob.Register(new(StatDistinct))
|
|
// others
|
|
gob.Register([]any{})
|
|
gob.Register([]byte{})
|
|
gob.Register([]map[string]any{})
|
|
gob.Register(map[string]int64{})
|
|
gob.Register(Versions{})
|
|
gob.Register(map[string]any{})
|
|
gob.Register(map[string][]map[string]any{})
|
|
gob.Register(map[string]string{})
|
|
gob.Register(time.Duration(0))
|
|
gob.Register(time.Time{})
|
|
gob.Register(url.Values{})
|
|
gob.Register(json.RawMessage{})
|
|
|
|
gob.Register(new(utils.ArgCacheReplicateSet))
|
|
gob.Register(new(utils.ArgCacheReplicateRemove))
|
|
|
|
gob.Register(utils.StringSet{})
|
|
}
|
|
|
|
// NewCacheS initializes the Cache service and executes the precaching
|
|
func NewCacheS(cfg *config.CGRConfig, dm *DataManager, connMgr *ConnManager, cpS *CapsStats) (c *CacheS) {
|
|
tCache := cfg.CacheCfg().AsTransCacheConfig()
|
|
if len(cfg.CacheCfg().ReplicationConns) != 0 {
|
|
var reply string
|
|
for k, val := range tCache {
|
|
if !cfg.CacheCfg().Partitions[k].Replicate ||
|
|
k == utils.CacheCapsEvents {
|
|
continue
|
|
}
|
|
val.OnEvicted = []func(itmID string, value interface{}){
|
|
func(itmID string, value any) {
|
|
if err := connMgr.Call(context.TODO(), cfg.CacheCfg().ReplicationConns, utils.CacheSv1ReplicateRemove,
|
|
&utils.ArgCacheReplicateRemove{
|
|
CacheID: k,
|
|
ItemID: itmID,
|
|
}, &reply); err != nil {
|
|
utils.Logger.Warning(fmt.Sprintf("error: %+v when autoexpired item: %+v from: %+v", err, itmID, k))
|
|
}
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
if _, has := tCache[utils.CacheCapsEvents]; has && cpS != nil {
|
|
tCache[utils.CacheCapsEvents].OnEvicted = []func(itmID string, value interface{}){cpS.OnEvict}
|
|
}
|
|
c = &CacheS{
|
|
cfg: cfg,
|
|
dm: dm,
|
|
connMgr: connMgr,
|
|
pcItems: make(map[string]chan struct{}),
|
|
tCache: ltcache.NewTransCache(tCache),
|
|
}
|
|
for cacheID := range cfg.CacheCfg().Partitions {
|
|
c.pcItems[cacheID] = make(chan struct{})
|
|
}
|
|
return
|
|
}
|
|
|
|
// CacheS deals with cache preload and other cache related tasks/APIs
|
|
type CacheS struct {
|
|
cfg *config.CGRConfig
|
|
dm *DataManager
|
|
connMgr *ConnManager
|
|
pcItems map[string]chan struct{} // signal precaching
|
|
tCache *ltcache.TransCache
|
|
}
|
|
|
|
// Set is an exported method from TransCache
|
|
// handled Replicate functionality
|
|
func (chS *CacheS) Set(ctx *context.Context, chID, itmID string, value any,
|
|
groupIDs []string, commit bool, transID string) (err error) {
|
|
chS.tCache.Set(chID, itmID, value, groupIDs, commit, transID)
|
|
return chS.ReplicateSet(ctx, chID, itmID, value)
|
|
}
|
|
|
|
// ReplicateSet replicate an item to ReplicationConns
|
|
func (chS *CacheS) ReplicateSet(ctx *context.Context, chID, itmID string, value any) (err error) {
|
|
if len(chS.cfg.CacheCfg().ReplicationConns) == 0 ||
|
|
!chS.cfg.CacheCfg().Partitions[chID].Replicate {
|
|
return
|
|
}
|
|
var reply string
|
|
return connMgr.Call(ctx, chS.cfg.CacheCfg().ReplicationConns, utils.CacheSv1ReplicateSet,
|
|
&utils.ArgCacheReplicateSet{
|
|
CacheID: chID,
|
|
ItemID: itmID,
|
|
Value: value,
|
|
}, &reply)
|
|
}
|
|
|
|
// SetWithoutReplicate is an exported method from TransCache
|
|
// handled Replicate functionality
|
|
func (chS *CacheS) SetWithoutReplicate(chID, itmID string, value any,
|
|
groupIDs []string, commit bool, transID string) {
|
|
chS.tCache.Set(chID, itmID, value, groupIDs, commit, transID)
|
|
}
|
|
|
|
// SetWithReplicate combines local set with replicate, receiving the arguments needed by dispatcher
|
|
func (chS *CacheS) SetWithReplicate(ctx *context.Context, args *utils.ArgCacheReplicateSet) (err error) {
|
|
chS.tCache.Set(args.CacheID, args.ItemID, args.Value, args.GroupIDs, true, utils.EmptyString)
|
|
if len(chS.cfg.CacheCfg().ReplicationConns) == 0 ||
|
|
!chS.cfg.CacheCfg().Partitions[args.CacheID].Replicate {
|
|
return
|
|
}
|
|
var reply string
|
|
return chS.connMgr.Call(ctx, chS.cfg.CacheCfg().ReplicationConns,
|
|
utils.CacheSv1ReplicateSet, args, &reply)
|
|
}
|
|
|
|
// HasItem is an exported method from TransCache
|
|
func (chS *CacheS) HasItem(chID, itmID string) (has bool) {
|
|
return chS.tCache.HasItem(chID, itmID)
|
|
}
|
|
|
|
// Get is an exported method from TransCache
|
|
func (chS *CacheS) Get(chID, itmID string) (any, bool) {
|
|
return chS.tCache.Get(chID, itmID)
|
|
}
|
|
|
|
// GetWithRemote queries locally the cache, followed by remotes
|
|
func (chS *CacheS) GetWithRemote(ctx *context.Context, args *utils.ArgsGetCacheItemWithAPIOpts) (itm any, err error) {
|
|
var has bool
|
|
if itm, has = chS.tCache.Get(args.CacheID, args.ItemID); has {
|
|
return
|
|
}
|
|
if len(chS.cfg.CacheCfg().RemoteConns) == 0 ||
|
|
!chS.cfg.CacheCfg().Partitions[args.CacheID].Remote {
|
|
return nil, utils.ErrNotFound
|
|
}
|
|
// item was not found locally, query from remote
|
|
if err = chS.connMgr.Call(ctx, chS.cfg.CacheCfg().RemoteConns,
|
|
utils.CacheSv1GetItem, args, &itm); err != nil &&
|
|
err.Error() == utils.ErrNotFound.Error() {
|
|
return nil, utils.ErrNotFound // correct the error coming as string type
|
|
}
|
|
return
|
|
}
|
|
|
|
// GetItemIDs is an exported method from TransCache
|
|
func (chS *CacheS) GetItemIDs(chID, prfx string) (itmIDs []string) {
|
|
return chS.tCache.GetItemIDs(chID, prfx)
|
|
}
|
|
|
|
// Remove is an exported method from TransCache
|
|
func (chS *CacheS) Remove(ctx *context.Context, chID, itmID string, commit bool, transID string) (err error) {
|
|
chS.tCache.Remove(chID, itmID, commit, transID)
|
|
if err := chS.ReplicateRemove(ctx, chID, itmID); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RemoveWithoutReplicate is an exported method from TransCache
|
|
func (chS *CacheS) RemoveWithoutReplicate(chID, itmID string, commit bool, transID string) {
|
|
chS.tCache.Remove(chID, itmID, commit, transID)
|
|
}
|
|
|
|
// Clear is an exported method from TransCache
|
|
func (chS *CacheS) Clear(chIDs []string) {
|
|
chS.tCache.Clear(chIDs)
|
|
}
|
|
|
|
// BeginTransaction is an exported method from TransCache
|
|
func (chS *CacheS) BeginTransaction() string {
|
|
return chS.tCache.BeginTransaction()
|
|
}
|
|
|
|
// RollbackTransaction is an exported method from TransCache
|
|
func (chS *CacheS) RollbackTransaction(transID string) {
|
|
chS.tCache.RollbackTransaction(transID)
|
|
}
|
|
|
|
// CommitTransaction is an exported method from TransCache
|
|
func (chS *CacheS) CommitTransaction(transID string) {
|
|
chS.tCache.CommitTransaction(transID)
|
|
}
|
|
|
|
// GetPrecacheChannel returns the channel used to signal precaching
|
|
func (chS *CacheS) GetPrecacheChannel(chID string) chan struct{} {
|
|
return chS.pcItems[chID]
|
|
}
|
|
|
|
// Precache loads data from DataDB into cache at engine start
|
|
func (chS *CacheS) Precache(shutdown *utils.SyncedChan) {
|
|
for cacheID, cacheCfg := range chS.cfg.CacheCfg().Partitions {
|
|
if !cacheCfg.Precache {
|
|
close(chS.pcItems[cacheID]) // no need of precache
|
|
continue
|
|
}
|
|
go func(cacheID string) {
|
|
err := chS.dm.CacheDataFromDB(context.TODO(),
|
|
utils.CacheInstanceToPrefix[cacheID],
|
|
[]string{utils.MetaAny},
|
|
false)
|
|
if err != nil && err != context.Canceled {
|
|
utils.Logger.Crit(fmt.Sprintf("<%s> precaching cacheID <%s>, got error: %s", utils.CacheS, cacheID, err))
|
|
shutdown.CloseOnce()
|
|
return
|
|
}
|
|
close(chS.pcItems[cacheID])
|
|
}(cacheID)
|
|
}
|
|
}
|
|
|
|
// APIs start here
|
|
func (chS *CacheS) V1GetItemIDs(_ *context.Context, args *utils.ArgsGetCacheItemIDsWithAPIOpts,
|
|
reply *[]string) (err error) {
|
|
itmIDs := chS.tCache.GetItemIDs(args.CacheID, args.ItemIDPrefix)
|
|
if len(itmIDs) == 0 {
|
|
return utils.ErrNotFound
|
|
}
|
|
*reply = itmIDs
|
|
return
|
|
}
|
|
|
|
func (chS *CacheS) V1HasItem(_ *context.Context, args *utils.ArgsGetCacheItemWithAPIOpts,
|
|
reply *bool) (err error) {
|
|
*reply = chS.tCache.HasItem(args.CacheID, args.ItemID)
|
|
return
|
|
}
|
|
|
|
// V1GetItem returns a single item from the cache
|
|
func (chS *CacheS) V1GetItem(_ *context.Context, args *utils.ArgsGetCacheItemWithAPIOpts,
|
|
reply *any) (err error) {
|
|
itmIface, has := chS.tCache.Get(args.CacheID, args.ItemID)
|
|
if !has {
|
|
return utils.ErrNotFound
|
|
}
|
|
*reply = itmIface
|
|
return
|
|
}
|
|
|
|
// V1GetItemWithRemote queries the item from remote if not found locally
|
|
func (chS *CacheS) V1GetItemWithRemote(ctx *context.Context, args *utils.ArgsGetCacheItemWithAPIOpts,
|
|
reply *any) (err error) {
|
|
var itmIface any
|
|
if itmIface, err = chS.GetWithRemote(ctx, args); err != nil {
|
|
return
|
|
}
|
|
*reply = itmIface
|
|
return
|
|
}
|
|
|
|
func (chS *CacheS) V1GetItemExpiryTime(_ *context.Context, args *utils.ArgsGetCacheItemWithAPIOpts,
|
|
reply *time.Time) (err error) {
|
|
expTime, has := chS.tCache.GetItemExpiryTime(args.CacheID, args.ItemID)
|
|
if !has {
|
|
return utils.ErrNotFound
|
|
}
|
|
*reply = expTime
|
|
return
|
|
}
|
|
|
|
func (chS *CacheS) V1RemoveItem(_ *context.Context, args *utils.ArgsGetCacheItemWithAPIOpts,
|
|
reply *string) (err error) {
|
|
chS.tCache.Remove(args.CacheID, args.ItemID, true, utils.NonTransactional)
|
|
*reply = utils.OK
|
|
return
|
|
}
|
|
|
|
func (chS *CacheS) V1RemoveItems(_ *context.Context, args *utils.AttrReloadCacheWithAPIOpts,
|
|
reply *string) (err error) {
|
|
for cacheID, ids := range args.Map() {
|
|
for _, id := range ids {
|
|
chS.tCache.Remove(cacheID, id, true, utils.NonTransactional)
|
|
}
|
|
}
|
|
*reply = utils.OK
|
|
return
|
|
}
|
|
|
|
func (chS *CacheS) V1Clear(ctx *context.Context, args *utils.AttrCacheIDsWithAPIOpts,
|
|
reply *string) (err error) {
|
|
chS.tCache.Clear(args.CacheIDs)
|
|
*reply = utils.OK
|
|
return
|
|
}
|
|
|
|
func (chS *CacheS) V1GetCacheStats(ctx *context.Context, args *utils.AttrCacheIDsWithAPIOpts,
|
|
rply *map[string]*ltcache.CacheStats) (err error) {
|
|
cs := chS.tCache.GetCacheStats(args.CacheIDs)
|
|
*rply = cs
|
|
return
|
|
}
|
|
|
|
func (chS *CacheS) V1PrecacheStatus(_ *context.Context, args *utils.AttrCacheIDsWithAPIOpts, rply *map[string]string) (err error) {
|
|
if len(args.CacheIDs) == 0 {
|
|
args.CacheIDs = utils.CachePartitions.AsSlice()
|
|
}
|
|
pCacheStatus := make(map[string]string)
|
|
for _, cacheID := range args.CacheIDs {
|
|
if _, has := chS.pcItems[cacheID]; !has && cacheID != utils.CacheTrendProfiles {
|
|
return fmt.Errorf("unknown cacheID: %s", cacheID)
|
|
}
|
|
select {
|
|
case <-chS.GetPrecacheChannel(cacheID):
|
|
pCacheStatus[cacheID] = utils.MetaReady
|
|
default:
|
|
pCacheStatus[cacheID] = utils.MetaPrecaching
|
|
}
|
|
}
|
|
*rply = pCacheStatus
|
|
return
|
|
}
|
|
|
|
func (chS *CacheS) V1HasGroup(_ *context.Context, args *utils.ArgsGetGroupWithAPIOpts,
|
|
rply *bool) (err error) {
|
|
*rply = chS.tCache.HasGroup(args.CacheID, args.GroupID)
|
|
return
|
|
}
|
|
|
|
func (chS *CacheS) V1GetGroupItemIDs(_ *context.Context, args *utils.ArgsGetGroupWithAPIOpts,
|
|
rply *[]string) (err error) {
|
|
if has := chS.tCache.HasGroup(args.CacheID, args.GroupID); !has {
|
|
return utils.ErrNotFound
|
|
}
|
|
*rply = chS.tCache.GetGroupItemIDs(args.CacheID, args.GroupID)
|
|
return
|
|
}
|
|
|
|
func (chS *CacheS) V1RemoveGroup(_ *context.Context, args *utils.ArgsGetGroupWithAPIOpts,
|
|
rply *string) (err error) {
|
|
chS.tCache.RemoveGroup(args.CacheID, args.GroupID, true, utils.NonTransactional)
|
|
*rply = utils.OK
|
|
return
|
|
}
|
|
|
|
func (chS *CacheS) V1ReloadCache(ctx *context.Context, attrs *utils.AttrReloadCacheWithAPIOpts, reply *string) (err error) {
|
|
return chS.cacheDataFromDB(ctx, attrs, reply, true)
|
|
}
|
|
|
|
func (chS *CacheS) V1LoadCache(ctx *context.Context, attrs *utils.AttrReloadCacheWithAPIOpts, reply *string) (err error) {
|
|
return chS.cacheDataFromDB(ctx, attrs, reply, false)
|
|
}
|
|
|
|
func (chS *CacheS) cacheDataFromDB(ctx *context.Context, attrs *utils.AttrReloadCacheWithAPIOpts, reply *string, mustBeCached bool) (err error) {
|
|
argCache := attrs.Map()
|
|
for key, ids := range argCache {
|
|
if prfx, has := utils.CacheInstanceToPrefix[key]; has {
|
|
if err = chS.dm.CacheDataFromDB(ctx, prfx, ids, mustBeCached); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
//get loadIDs from database for all types
|
|
var loadIDs map[string]int64
|
|
if loadIDs, err = chS.dm.GetItemLoadIDs(ctx, utils.EmptyString, false); err != nil {
|
|
if err != utils.ErrNotFound { // we can receive cache reload from LoaderS and we store the LoadID only after all Items was processed
|
|
return
|
|
}
|
|
err = nil
|
|
loadIDs = make(map[string]int64)
|
|
}
|
|
for key, val := range populateCacheLoadIDs(loadIDs, argCache) {
|
|
chS.tCache.Set(utils.CacheLoadIDs, key, val, nil,
|
|
cacheCommit(utils.NonTransactional), utils.NonTransactional)
|
|
}
|
|
*reply = utils.OK
|
|
return
|
|
}
|
|
|
|
// populateCacheLoadIDs returns a new map holding, for every instance of attrs
// that lists at least one ID, the load ID found in loadIDs (zero when loadIDs
// has no entry for it). Instances with an empty ID list are left out.
func populateCacheLoadIDs(loadIDs map[string]int64, attrs map[string][]string) (cacheLoadIDs map[string]int64) {
	cacheLoadIDs = make(map[string]int64)
	for instance, itmIDs := range attrs {
		if len(itmIDs) == 0 {
			continue // nothing was requested for this instance
		}
		cacheLoadIDs[instance] = loadIDs[instance]
	}
	return cacheLoadIDs
}
|
|
|
|
// V1ReplicateSet receives an item via replication to store in the cache
|
|
func (chS *CacheS) V1ReplicateSet(_ *context.Context, args *utils.ArgCacheReplicateSet, reply *string) (err error) {
|
|
if cmp, canCast := args.Value.(utils.Compiler); canCast {
|
|
if err = cmp.Compile(); err != nil {
|
|
return
|
|
}
|
|
}
|
|
chS.tCache.Set(args.CacheID, args.ItemID, args.Value, nil, true, utils.EmptyString)
|
|
*reply = utils.OK
|
|
return
|
|
}
|
|
|
|
// ReplicateRemove replicate an item to ReplicationConns
|
|
func (chS *CacheS) ReplicateRemove(ctx *context.Context, chID, itmID string) (err error) {
|
|
if len(chS.cfg.CacheCfg().ReplicationConns) == 0 ||
|
|
!chS.cfg.CacheCfg().Partitions[chID].Replicate {
|
|
return
|
|
}
|
|
var reply string
|
|
return chS.connMgr.Call(ctx, chS.cfg.CacheCfg().ReplicationConns, utils.CacheSv1ReplicateRemove,
|
|
&utils.ArgCacheReplicateRemove{
|
|
CacheID: chID,
|
|
ItemID: itmID,
|
|
}, &reply)
|
|
}
|
|
|
|
// V1ReplicateRemove replicate an item
|
|
func (chS *CacheS) V1ReplicateRemove(_ *context.Context, args *utils.ArgCacheReplicateRemove, reply *string) (err error) {
|
|
chS.tCache.Remove(args.CacheID, args.ItemID, true, utils.EmptyString)
|
|
*reply = utils.OK
|
|
return
|
|
}
|