diff --git a/apier/v1/caches.go b/apier/v1/caches.go index d8184862d..daa04af72 100644 --- a/apier/v1/caches.go +++ b/apier/v1/caches.go @@ -35,7 +35,7 @@ type CacheSv1 struct { cacheS *engine.CacheS } -// GetItemExpiryTime returns the expiryTime for an item +// GetItemIDs returns the IDs for cacheID with given prefix func (chSv1 *CacheSv1) GetItemIDs(args *engine.ArgsGetCacheItemIDs, reply *[]string) error { return chSv1.cacheS.V1GetItemIDs(args, reply) diff --git a/apier/v1/dispatcher.go b/apier/v1/dispatcher.go index 616ef252c..d289cd5de 100755 --- a/apier/v1/dispatcher.go +++ b/apier/v1/dispatcher.go @@ -25,6 +25,7 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/ltcache" ) // GetDispatcherProfile returns a Dispatcher Profile @@ -409,3 +410,96 @@ func (dS *DispatcherResponder) Shutdown(args *dispatchers.TntWithApiKey, reply * func (dS *DispatcherResponder) GetTimeout(args *dispatchers.TntWithApiKey, reply *time.Duration) error { return dS.dS.ResponderGetTimeout(args, reply) } + +// Ping used to detreminate if component is active +func (dS *DispatcherResponder) Ping(args *dispatchers.CGREvWithApiKey, reply *string) error { + return dS.dS.ResponderPing(args, reply) +} + +func NewDispatcherCacheSv1(dps *dispatchers.DispatcherService) *DispatcherCacheSv1 { + return &DispatcherCacheSv1{dS: dps} +} + +// Exports RPC from CacheSv1 +type DispatcherCacheSv1 struct { + dS *dispatchers.DispatcherService +} + +// GetItemIDs returns the IDs for cacheID with given prefix +func (dS *DispatcherCacheSv1) GetItemIDs(args *dispatchers.ArgsGetCacheItemIDsWithApiKey, + reply *[]string) error { + return dS.dS.CacheSv1GetItemIDs(args, reply) +} + +// HasItem verifies the existence of an Item in cache +func (dS *DispatcherCacheSv1) HasItem(args *dispatchers.ArgsGetCacheItemWithApiKey, + reply *bool) error { + return dS.dS.CacheSv1HasItem(args, reply) +} + +// GetItemExpiryTime returns the expiryTime for an item +func (dS *DispatcherCacheSv1) GetItemExpiryTime(args *dispatchers.ArgsGetCacheItemWithApiKey, + reply *time.Time) error { + return dS.dS.CacheSv1GetItemExpiryTime(args, reply) +} + +// RemoveItem removes the Item with ID from cache +func (dS *DispatcherCacheSv1) RemoveItem(args *dispatchers.ArgsGetCacheItemWithApiKey, + reply *string) error { + return dS.dS.CacheSv1RemoveItem(args, reply) +} + +// Clear will clear partitions in the cache (nil fol all, empty slice for none) +func (dS *DispatcherCacheSv1) Clear(args *dispatchers.AttrCacheIDsWithApiKey, + reply *string) error { + return dS.dS.CacheSv1Clear(args, reply) +} + +// FlushCache wipes out cache for a prefix or completely +func (dS *DispatcherCacheSv1) FlushCache(args dispatchers.AttrReloadCacheWithApiKey, reply *string) (err error) { + return dS.dS.CacheSv1FlushCache(args, reply) +} + +// GetCacheStats returns CacheStats filtered by cacheIDs +func (dS *DispatcherCacheSv1) GetCacheStats(args *dispatchers.AttrCacheIDsWithApiKey, + reply *map[string]*ltcache.CacheStats) error { + return dS.dS.CacheSv1GetCacheStats(args, reply) +} + +// PrecacheStatus checks status of active precache processes +func (dS *DispatcherCacheSv1) PrecacheStatus(args *dispatchers.AttrCacheIDsWithApiKey, reply *map[string]string) error { + return dS.dS.CacheSv1PrecacheStatus(args, reply) +} + +// HasGroup checks existence of a group in cache +func (dS *DispatcherCacheSv1) HasGroup(args *dispatchers.ArgsGetGroupWithApiKey, + reply *bool) (err error) { + return dS.dS.CacheSv1HasGroup(args, reply) +} + +// GetGroupItemIDs returns a list of itemIDs in a cache group +func (dS *DispatcherCacheSv1) GetGroupItemIDs(args *dispatchers.ArgsGetGroupWithApiKey, + reply *[]string) (err error) { + return dS.dS.CacheSv1GetGroupItemIDs(args, reply) +} + +// RemoveGroup will remove a group and all items belonging to it from cache +func (dS *DispatcherCacheSv1) RemoveGroup(args *dispatchers.ArgsGetGroupWithApiKey, + reply *string) (err error) { + return dS.dS.CacheSv1RemoveGroup(args, reply) +} + +// ReloadCache reloads cache from DB for a prefix or completely +func (dS *DispatcherCacheSv1) ReloadCache(args dispatchers.AttrReloadCacheWithApiKey, reply *string) (err error) { + return dS.dS.CacheSv1ReloadCache(args, reply) +} + +// LoadCache loads cache from DB for a prefix or completely +func (dS *DispatcherCacheSv1) LoadCache(args dispatchers.AttrReloadCacheWithApiKey, reply *string) (err error) { + return dS.dS.CacheSv1LoadCache(args, reply) +} + +// Ping used to detreminate if component is active +func (dS *DispatcherCacheSv1) Ping(args *dispatchers.CGREvWithApiKey, reply *string) error { + return dS.dS.CacheSv1Ping(args, reply) +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 0e6f23300..8988895ef 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -1000,6 +1000,9 @@ func startDispatcherService(internalDispatcherSChan chan *dispatchers.Dispatcher server.RpcRegisterName(utils.Responder, v1.NewDispatcherResponder(dspS)) + server.RpcRegisterName(utils.CacheSv1, + v1.NewDispatcherCacheSv1(dspS)) + internalDispatcherSChan <- dspS } diff --git a/dispatchers/caches.go b/dispatchers/caches.go new file mode 100644 index 000000000..d61f4d81a --- /dev/null +++ b/dispatchers/caches.go @@ -0,0 +1,218 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package dispatchers + +import ( + "time" + + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/ltcache" +) + +// CacheSv1Ping interogates CacheSv1 server responsible to process the event +func (dS *DispatcherService) CacheSv1Ping(args *CGREvWithApiKey, + reply *string) (err error) { + if dS.attrS != nil { + if err = dS.authorize(utils.CacheSv1Ping, + args.CGREvent.Tenant, + args.APIKey, args.CGREvent.Time); err != nil { + return + } + } + return dS.Dispatch(&args.CGREvent, utils.MetaCache, args.RouteID, + utils.CacheSv1Ping, args.CGREvent, reply) +} + +// GetItemIDs returns the IDs for cacheID with given prefix +func (dS *DispatcherService) CacheSv1GetItemIDs(args *ArgsGetCacheItemIDsWithApiKey, + reply *[]string) (err error) { + if dS.attrS != nil { + if err = dS.authorize(utils.CacheSv1GetItemIDs, + args.TenantArg.Tenant, + args.APIKey, utils.TimePointer(time.Now())); err != nil { + return + } + } + return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID, + utils.CacheSv1GetItemIDs, &args.ArgsGetCacheItemIDs, reply) +} + +// HasItem verifies the existence of an Item in cache +func (dS *DispatcherService) CacheSv1HasItem(args *ArgsGetCacheItemWithApiKey, + reply *bool) (err error) { + if dS.attrS != nil { + if err = dS.authorize(utils.CacheSv1HasItem, + args.TenantArg.Tenant, + args.APIKey, utils.TimePointer(time.Now())); err != nil { + return + } + } + return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID, + utils.CacheSv1HasItem, &args.ArgsGetCacheItem, reply) +} + +// GetItemExpiryTime returns the expiryTime for an item +func (dS *DispatcherService) CacheSv1GetItemExpiryTime(args *ArgsGetCacheItemWithApiKey, + reply *time.Time) (err error) { + if dS.attrS != nil { + if err = dS.authorize(utils.CacheSv1GetItemExpiryTime, + args.TenantArg.Tenant, + args.APIKey, utils.TimePointer(time.Now())); err != nil { + return + } + } + return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID, + utils.CacheSv1GetItemExpiryTime, &args.ArgsGetCacheItem, reply) +} + +// RemoveItem removes the Item with ID from cache +func (dS *DispatcherService) CacheSv1RemoveItem(args *ArgsGetCacheItemWithApiKey, + reply *string) (err error) { + if dS.attrS != nil { + if err = dS.authorize(utils.CacheSv1RemoveItem, + args.TenantArg.Tenant, + args.APIKey, utils.TimePointer(time.Now())); err != nil { + return + } + } + return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID, + utils.CacheSv1RemoveItem, &args.ArgsGetCacheItem, reply) +} + +// Clear will clear partitions in the cache (nil fol all, empty slice for none) +func (dS *DispatcherService) CacheSv1Clear(args *AttrCacheIDsWithApiKey, + reply *string) (err error) { + if dS.attrS != nil { + if err = dS.authorize(utils.CacheSv1Clear, + args.TenantArg.Tenant, + args.APIKey, utils.TimePointer(time.Now())); err != nil { + return + } + } + return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID, + utils.CacheSv1Clear, args.CacheIDs, reply) +} + +// FlushCache wipes out cache for a prefix or completely +func (dS *DispatcherService) CacheSv1FlushCache(args AttrReloadCacheWithApiKey, reply *string) (err error) { + if dS.attrS != nil { + if err = dS.authorize(utils.CacheSv1FlushCache, + args.TenantArg.Tenant, + args.APIKey, utils.TimePointer(time.Now())); err != nil { + return + } + } + return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID, + utils.CacheSv1FlushCache, args.AttrReloadCache, reply) +} + +// GetCacheStats returns CacheStats filtered by cacheIDs +func (dS *DispatcherService) CacheSv1GetCacheStats(args *AttrCacheIDsWithApiKey, + reply *map[string]*ltcache.CacheStats) (err error) { + if dS.attrS != nil { + if err = dS.authorize(utils.CacheSv1GetCacheStats, + args.TenantArg.Tenant, + args.APIKey, utils.TimePointer(time.Now())); err != nil { + return + } + } + return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID, + utils.CacheSv1GetCacheStats, args.CacheIDs, reply) +} + +// PrecacheStatus checks status of active precache processes +func (dS *DispatcherService) CacheSv1PrecacheStatus(args *AttrCacheIDsWithApiKey, reply *map[string]string) (err error) { + if dS.attrS != nil { + if err = dS.authorize(utils.CacheSv1PrecacheStatus, + args.TenantArg.Tenant, + args.APIKey, utils.TimePointer(time.Now())); err != nil { + return + } + } + return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID, + utils.CacheSv1PrecacheStatus, args.CacheIDs, reply) +} + +// HasGroup checks existence of a group in cache +func (dS *DispatcherService) CacheSv1HasGroup(args *ArgsGetGroupWithApiKey, + reply *bool) (err error) { + if dS.attrS != nil { + if err = dS.authorize(utils.CacheSv1HasGroup, + args.TenantArg.Tenant, + args.APIKey, utils.TimePointer(time.Now())); err != nil { + return + } + } + return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID, + utils.CacheSv1HasGroup, args.ArgsGetGroup, reply) +} + +// GetGroupItemIDs returns a list of itemIDs in a cache group +func (dS *DispatcherService) CacheSv1GetGroupItemIDs(args *ArgsGetGroupWithApiKey, + reply *[]string) (err error) { + if dS.attrS != nil { + if err = dS.authorize(utils.CacheSv1GetGroupItemIDs, + args.TenantArg.Tenant, + args.APIKey, utils.TimePointer(time.Now())); err != nil { + return + } + } + return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID, + utils.CacheSv1GetGroupItemIDs, args.ArgsGetGroup, reply) +} + +// RemoveGroup will remove a group and all items belonging to it from cache +func (dS *DispatcherService) CacheSv1RemoveGroup(args *ArgsGetGroupWithApiKey, + reply *string) (err error) { + if dS.attrS != nil { + if err = dS.authorize(utils.CacheSv1RemoveGroup, + args.TenantArg.Tenant, + args.APIKey, utils.TimePointer(time.Now())); err != nil { + return + } + } + return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID, + utils.CacheSv1RemoveGroup, args.ArgsGetGroup, reply) +} + +// ReloadCache reloads cache from DB for a prefix or completely +func (dS *DispatcherService) CacheSv1ReloadCache(args AttrReloadCacheWithApiKey, reply *string) (err error) { + if dS.attrS != nil { + if err = dS.authorize(utils.CacheSv1ReloadCache, + args.TenantArg.Tenant, + args.APIKey, utils.TimePointer(time.Now())); err != nil { + return + } + } + return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID, + utils.CacheSv1ReloadCache, args.AttrReloadCache, reply) +} + +// LoadCache loads cache from DB for a prefix or completely +func (dS *DispatcherService) CacheSv1LoadCache(args AttrReloadCacheWithApiKey, reply *string) (err error) { + if dS.attrS != nil { + if err = dS.authorize(utils.CacheSv1LoadCache, + args.TenantArg.Tenant, + args.APIKey, utils.TimePointer(time.Now())); err != nil { + return + } + } + return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID, + utils.CacheSv1LoadCache, args.AttrReloadCache, reply) +} diff --git a/dispatchers/responder.go b/dispatchers/responder.go index ac0586d91..956600199 100644 --- a/dispatchers/responder.go +++ b/dispatchers/responder.go @@ -25,6 +25,20 @@ import ( "github.com/cgrates/cgrates/utils" ) +// ResponderPing interogates Responder server responsible to process the event +func (dS *DispatcherService) ResponderPing(args *CGREvWithApiKey, + reply *string) (err error) { + if dS.attrS != nil { + if err = dS.authorize(utils.ResponderPing, + args.CGREvent.Tenant, + args.APIKey, args.CGREvent.Time); err != nil { + return + } + } + return dS.Dispatch(&args.CGREvent, utils.MetaResponder, args.RouteID, + utils.ResponderPing, args.CGREvent, reply) +} + func (dS *DispatcherService) ResponderStatus(args *TntWithApiKey, reply *map[string]interface{}) (err error) { if dS.attrS != nil { diff --git a/dispatchers/utils.go b/dispatchers/utils.go index b2beaf3d5..d0baef4f9 100755 --- a/dispatchers/utils.go +++ b/dispatchers/utils.go @@ -125,6 +125,36 @@ type CallDescriptorWithApiKey struct { engine.CallDescriptor } +type ArgsGetCacheItemIDsWithApiKey struct { + DispatcherResource + utils.TenantArg + engine.ArgsGetCacheItemIDs +} + +type ArgsGetCacheItemWithApiKey struct { + DispatcherResource + utils.TenantArg + engine.ArgsGetCacheItem +} + +type AttrReloadCacheWithApiKey struct { + DispatcherResource + utils.TenantArg + utils.AttrReloadCache +} + +type AttrCacheIDsWithApiKey struct { + DispatcherResource + utils.TenantArg + CacheIDs []string +} + +type ArgsGetGroupWithApiKey struct { + DispatcherResource + utils.TenantArg + engine.ArgsGetGroup +} + func ParseStringMap(s string) utils.StringMap { if s == utils.ZERO { return make(utils.StringMap) diff --git a/engine/responder.go b/engine/responder.go index 78b954e9f..f59d9976e 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -269,6 +269,12 @@ func (rs *Responder) Shutdown(arg string, reply *string) (err error) { return } +// Ping used to detreminate if component is active +func (chSv1 *Responder) Ping(ign *utils.CGREvent, reply *string) error { + *reply = utils.Pong + return nil +} + func (rs *Responder) Call(serviceMethod string, args interface{}, reply interface{}) error { parts := strings.Split(serviceMethod, ".") if len(parts) != 2 { diff --git a/utils/consts.go b/utils/consts.go index 5466b5726..6e039b159 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -389,6 +389,7 @@ const ( MetaResources = "*resources" MetaFilters = "*filters" MetaCDRs = "*cdrs" + MetaCache = "*cache" Migrator = "migrator" UnsupportedMigrationTask = "unsupported migration task" NoStorDBConnection = "not connected to StorDB" @@ -791,6 +792,7 @@ const ( ResponderGetCost = "Responder.GetCost" ResponderShutdown = "Responder.Shutdown" ResponderGetTimeout = "Responder.GetTimeout" + ResponderPing = "Responder.Ping" ) // DispatcherS APIs @@ -811,6 +813,7 @@ const ( // CacheS APIs const ( + CacheSv1 = "CacheSv1" CacheSv1GetCacheStats = "CacheSv1.GetCacheStats" CacheSv1GetItemIDs = "CacheSv1.GetItemIDs" CacheSv1HasItem = "CacheSv1.HasItem"