Added CacheSv1 to DispatcherSv1

This commit is contained in:
Trial97
2019-03-18 13:51:47 +02:00
committed by Dan Christian Bogos
parent a8309d5b50
commit ac31d88d43
8 changed files with 369 additions and 1 deletions

View File

@@ -35,7 +35,7 @@ type CacheSv1 struct {
cacheS *engine.CacheS
}
// GetItemExpiryTime returns the expiryTime for an item
// GetItemIDs returns the IDs for cacheID with given prefix
func (chSv1 *CacheSv1) GetItemIDs(args *engine.ArgsGetCacheItemIDs,
reply *[]string) error {
return chSv1.cacheS.V1GetItemIDs(args, reply)

View File

@@ -25,6 +25,7 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/ltcache"
)
// GetDispatcherProfile returns a Dispatcher Profile
@@ -409,3 +410,96 @@ func (dS *DispatcherResponder) Shutdown(args *dispatchers.TntWithApiKey, reply *
func (dS *DispatcherResponder) GetTimeout(args *dispatchers.TntWithApiKey, reply *time.Duration) error {
return dS.dS.ResponderGetTimeout(args, reply)
}
// Ping used to detreminate if component is active
func (dS *DispatcherResponder) Ping(args *dispatchers.CGREvWithApiKey, reply *string) error {
return dS.dS.ResponderPing(args, reply)
}
func NewDispatcherCacheSv1(dps *dispatchers.DispatcherService) *DispatcherCacheSv1 {
return &DispatcherCacheSv1{dS: dps}
}
// Exports RPC from CacheSv1
type DispatcherCacheSv1 struct {
dS *dispatchers.DispatcherService
}
// GetItemIDs returns the IDs for cacheID with given prefix
func (dS *DispatcherCacheSv1) GetItemIDs(args *dispatchers.ArgsGetCacheItemIDsWithApiKey,
reply *[]string) error {
return dS.dS.CacheSv1GetItemIDs(args, reply)
}
// HasItem verifies the existence of an Item in cache
func (dS *DispatcherCacheSv1) HasItem(args *dispatchers.ArgsGetCacheItemWithApiKey,
reply *bool) error {
return dS.dS.CacheSv1HasItem(args, reply)
}
// GetItemExpiryTime returns the expiryTime for an item
func (dS *DispatcherCacheSv1) GetItemExpiryTime(args *dispatchers.ArgsGetCacheItemWithApiKey,
reply *time.Time) error {
return dS.dS.CacheSv1GetItemExpiryTime(args, reply)
}
// RemoveItem removes the Item with ID from cache
func (dS *DispatcherCacheSv1) RemoveItem(args *dispatchers.ArgsGetCacheItemWithApiKey,
reply *string) error {
return dS.dS.CacheSv1RemoveItem(args, reply)
}
// Clear will clear partitions in the cache (nil fol all, empty slice for none)
func (dS *DispatcherCacheSv1) Clear(args *dispatchers.AttrCacheIDsWithApiKey,
reply *string) error {
return dS.dS.CacheSv1Clear(args, reply)
}
// FlushCache wipes out cache for a prefix or completely
func (dS *DispatcherCacheSv1) FlushCache(args dispatchers.AttrReloadCacheWithApiKey, reply *string) (err error) {
return dS.dS.CacheSv1FlushCache(args, reply)
}
// GetCacheStats returns CacheStats filtered by cacheIDs
func (dS *DispatcherCacheSv1) GetCacheStats(args *dispatchers.AttrCacheIDsWithApiKey,
reply *map[string]*ltcache.CacheStats) error {
return dS.dS.CacheSv1GetCacheStats(args, reply)
}
// PrecacheStatus checks status of active precache processes
func (dS *DispatcherCacheSv1) PrecacheStatus(args *dispatchers.AttrCacheIDsWithApiKey, reply *map[string]string) error {
return dS.dS.CacheSv1PrecacheStatus(args, reply)
}
// HasGroup checks existence of a group in cache
func (dS *DispatcherCacheSv1) HasGroup(args *dispatchers.ArgsGetGroupWithApiKey,
reply *bool) (err error) {
return dS.dS.CacheSv1HasGroup(args, reply)
}
// GetGroupItemIDs returns a list of itemIDs in a cache group
func (dS *DispatcherCacheSv1) GetGroupItemIDs(args *dispatchers.ArgsGetGroupWithApiKey,
reply *[]string) (err error) {
return dS.dS.CacheSv1GetGroupItemIDs(args, reply)
}
// RemoveGroup will remove a group and all items belonging to it from cache
func (dS *DispatcherCacheSv1) RemoveGroup(args *dispatchers.ArgsGetGroupWithApiKey,
reply *string) (err error) {
return dS.dS.CacheSv1RemoveGroup(args, reply)
}
// ReloadCache reloads cache from DB for a prefix or completely
func (dS *DispatcherCacheSv1) ReloadCache(args dispatchers.AttrReloadCacheWithApiKey, reply *string) (err error) {
return dS.dS.CacheSv1ReloadCache(args, reply)
}
// LoadCache loads cache from DB for a prefix or completely
func (dS *DispatcherCacheSv1) LoadCache(args dispatchers.AttrReloadCacheWithApiKey, reply *string) (err error) {
return dS.dS.CacheSv1LoadCache(args, reply)
}
// Ping used to detreminate if component is active
func (dS *DispatcherCacheSv1) Ping(args *dispatchers.CGREvWithApiKey, reply *string) error {
return dS.dS.CacheSv1Ping(args, reply)
}

View File

@@ -1000,6 +1000,9 @@ func startDispatcherService(internalDispatcherSChan chan *dispatchers.Dispatcher
server.RpcRegisterName(utils.Responder,
v1.NewDispatcherResponder(dspS))
server.RpcRegisterName(utils.CacheSv1,
v1.NewDispatcherCacheSv1(dspS))
internalDispatcherSChan <- dspS
}

218
dispatchers/caches.go Normal file
View File

@@ -0,0 +1,218 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package dispatchers
import (
"time"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/ltcache"
)
// CacheSv1Ping interogates CacheSv1 server responsible to process the event
func (dS *DispatcherService) CacheSv1Ping(args *CGREvWithApiKey,
reply *string) (err error) {
if dS.attrS != nil {
if err = dS.authorize(utils.CacheSv1Ping,
args.CGREvent.Tenant,
args.APIKey, args.CGREvent.Time); err != nil {
return
}
}
return dS.Dispatch(&args.CGREvent, utils.MetaCache, args.RouteID,
utils.CacheSv1Ping, args.CGREvent, reply)
}
// GetItemIDs returns the IDs for cacheID with given prefix
func (dS *DispatcherService) CacheSv1GetItemIDs(args *ArgsGetCacheItemIDsWithApiKey,
reply *[]string) (err error) {
if dS.attrS != nil {
if err = dS.authorize(utils.CacheSv1GetItemIDs,
args.TenantArg.Tenant,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID,
utils.CacheSv1GetItemIDs, &args.ArgsGetCacheItemIDs, reply)
}
// HasItem verifies the existence of an Item in cache
func (dS *DispatcherService) CacheSv1HasItem(args *ArgsGetCacheItemWithApiKey,
reply *bool) (err error) {
if dS.attrS != nil {
if err = dS.authorize(utils.CacheSv1HasItem,
args.TenantArg.Tenant,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID,
utils.CacheSv1HasItem, &args.ArgsGetCacheItem, reply)
}
// GetItemExpiryTime returns the expiryTime for an item
func (dS *DispatcherService) CacheSv1GetItemExpiryTime(args *ArgsGetCacheItemWithApiKey,
reply *time.Time) (err error) {
if dS.attrS != nil {
if err = dS.authorize(utils.CacheSv1GetItemExpiryTime,
args.TenantArg.Tenant,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID,
utils.CacheSv1GetItemExpiryTime, &args.ArgsGetCacheItem, reply)
}
// RemoveItem removes the Item with ID from cache
func (dS *DispatcherService) CacheSv1RemoveItem(args *ArgsGetCacheItemWithApiKey,
reply *string) (err error) {
if dS.attrS != nil {
if err = dS.authorize(utils.CacheSv1RemoveItem,
args.TenantArg.Tenant,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID,
utils.CacheSv1RemoveItem, &args.ArgsGetCacheItem, reply)
}
// Clear will clear partitions in the cache (nil fol all, empty slice for none)
func (dS *DispatcherService) CacheSv1Clear(args *AttrCacheIDsWithApiKey,
reply *string) (err error) {
if dS.attrS != nil {
if err = dS.authorize(utils.CacheSv1Clear,
args.TenantArg.Tenant,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID,
utils.CacheSv1Clear, args.CacheIDs, reply)
}
// FlushCache wipes out cache for a prefix or completely
func (dS *DispatcherService) CacheSv1FlushCache(args AttrReloadCacheWithApiKey, reply *string) (err error) {
if dS.attrS != nil {
if err = dS.authorize(utils.CacheSv1FlushCache,
args.TenantArg.Tenant,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID,
utils.CacheSv1FlushCache, args.AttrReloadCache, reply)
}
// GetCacheStats returns CacheStats filtered by cacheIDs
func (dS *DispatcherService) CacheSv1GetCacheStats(args *AttrCacheIDsWithApiKey,
reply *map[string]*ltcache.CacheStats) (err error) {
if dS.attrS != nil {
if err = dS.authorize(utils.CacheSv1GetCacheStats,
args.TenantArg.Tenant,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID,
utils.CacheSv1GetCacheStats, args.CacheIDs, reply)
}
// PrecacheStatus checks status of active precache processes
func (dS *DispatcherService) CacheSv1PrecacheStatus(args *AttrCacheIDsWithApiKey, reply *map[string]string) (err error) {
if dS.attrS != nil {
if err = dS.authorize(utils.CacheSv1PrecacheStatus,
args.TenantArg.Tenant,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID,
utils.CacheSv1PrecacheStatus, args.CacheIDs, reply)
}
// HasGroup checks existence of a group in cache
func (dS *DispatcherService) CacheSv1HasGroup(args *ArgsGetGroupWithApiKey,
reply *bool) (err error) {
if dS.attrS != nil {
if err = dS.authorize(utils.CacheSv1HasGroup,
args.TenantArg.Tenant,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID,
utils.CacheSv1HasGroup, args.ArgsGetGroup, reply)
}
// GetGroupItemIDs returns a list of itemIDs in a cache group
func (dS *DispatcherService) CacheSv1GetGroupItemIDs(args *ArgsGetGroupWithApiKey,
reply *[]string) (err error) {
if dS.attrS != nil {
if err = dS.authorize(utils.CacheSv1GetGroupItemIDs,
args.TenantArg.Tenant,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID,
utils.CacheSv1GetGroupItemIDs, args.ArgsGetGroup, reply)
}
// RemoveGroup will remove a group and all items belonging to it from cache
func (dS *DispatcherService) CacheSv1RemoveGroup(args *ArgsGetGroupWithApiKey,
reply *string) (err error) {
if dS.attrS != nil {
if err = dS.authorize(utils.CacheSv1RemoveGroup,
args.TenantArg.Tenant,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID,
utils.CacheSv1RemoveGroup, args.ArgsGetGroup, reply)
}
// ReloadCache reloads cache from DB for a prefix or completely
func (dS *DispatcherService) CacheSv1ReloadCache(args AttrReloadCacheWithApiKey, reply *string) (err error) {
if dS.attrS != nil {
if err = dS.authorize(utils.CacheSv1ReloadCache,
args.TenantArg.Tenant,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID,
utils.CacheSv1ReloadCache, args.AttrReloadCache, reply)
}
// LoadCache loads cache from DB for a prefix or completely
func (dS *DispatcherService) CacheSv1LoadCache(args AttrReloadCacheWithApiKey, reply *string) (err error) {
if dS.attrS != nil {
if err = dS.authorize(utils.CacheSv1LoadCache,
args.TenantArg.Tenant,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaCache, args.RouteID,
utils.CacheSv1LoadCache, args.AttrReloadCache, reply)
}

View File

@@ -25,6 +25,20 @@ import (
"github.com/cgrates/cgrates/utils"
)
// ResponderPing interogates Responder server responsible to process the event
func (dS *DispatcherService) ResponderPing(args *CGREvWithApiKey,
reply *string) (err error) {
if dS.attrS != nil {
if err = dS.authorize(utils.ResponderPing,
args.CGREvent.Tenant,
args.APIKey, args.CGREvent.Time); err != nil {
return
}
}
return dS.Dispatch(&args.CGREvent, utils.MetaResponder, args.RouteID,
utils.ResponderPing, args.CGREvent, reply)
}
func (dS *DispatcherService) ResponderStatus(args *TntWithApiKey,
reply *map[string]interface{}) (err error) {
if dS.attrS != nil {

View File

@@ -125,6 +125,36 @@ type CallDescriptorWithApiKey struct {
engine.CallDescriptor
}
type ArgsGetCacheItemIDsWithApiKey struct {
DispatcherResource
utils.TenantArg
engine.ArgsGetCacheItemIDs
}
type ArgsGetCacheItemWithApiKey struct {
DispatcherResource
utils.TenantArg
engine.ArgsGetCacheItem
}
type AttrReloadCacheWithApiKey struct {
DispatcherResource
utils.TenantArg
utils.AttrReloadCache
}
type AttrCacheIDsWithApiKey struct {
DispatcherResource
utils.TenantArg
CacheIDs []string
}
type ArgsGetGroupWithApiKey struct {
DispatcherResource
utils.TenantArg
engine.ArgsGetGroup
}
func ParseStringMap(s string) utils.StringMap {
if s == utils.ZERO {
return make(utils.StringMap)

View File

@@ -269,6 +269,12 @@ func (rs *Responder) Shutdown(arg string, reply *string) (err error) {
return
}
// Ping used to detreminate if component is active
func (chSv1 *Responder) Ping(ign *utils.CGREvent, reply *string) error {
*reply = utils.Pong
return nil
}
func (rs *Responder) Call(serviceMethod string, args interface{}, reply interface{}) error {
parts := strings.Split(serviceMethod, ".")
if len(parts) != 2 {

View File

@@ -389,6 +389,7 @@ const (
MetaResources = "*resources"
MetaFilters = "*filters"
MetaCDRs = "*cdrs"
MetaCache = "*cache"
Migrator = "migrator"
UnsupportedMigrationTask = "unsupported migration task"
NoStorDBConnection = "not connected to StorDB"
@@ -791,6 +792,7 @@ const (
ResponderGetCost = "Responder.GetCost"
ResponderShutdown = "Responder.Shutdown"
ResponderGetTimeout = "Responder.GetTimeout"
ResponderPing = "Responder.Ping"
)
// DispatcherS APIs
@@ -811,6 +813,7 @@ const (
// CacheS APIs
const (
CacheSv1 = "CacheSv1"
CacheSv1GetCacheStats = "CacheSv1.GetCacheStats"
CacheSv1GetItemIDs = "CacheSv1.GetItemIDs"
CacheSv1HasItem = "CacheSv1.HasItem"