mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 10:06:24 +05:00
Add service in APIer + test ping method for RateS
This commit is contained in:
committed by
Dan Christian Bogos
parent
c1c295d276
commit
490ca92160
@@ -21,6 +21,8 @@ package v1
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/rates"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
@@ -132,3 +134,27 @@ func (APIerSv1 *APIerSv1) RemoveRateProfile(arg *utils.TenantIDWithCache, reply
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewRateSv1(rateS *rates.RateS) *RateSv1 {
|
||||
return &RateSv1{rS: rateS}
|
||||
}
|
||||
|
||||
// Exports RPC from RLs
|
||||
type RateSv1 struct {
|
||||
rS *rates.RateS
|
||||
}
|
||||
|
||||
// Call implements rpcclient.ClientConnector interface for internal RPC
|
||||
func (rSv1 *RateSv1) Call(serviceMethod string,
|
||||
args interface{}, reply interface{}) error {
|
||||
return utils.APIerRPCCall(rSv1, serviceMethod, args, reply)
|
||||
}
|
||||
|
||||
func (rSv1 *RateSv1) CostForEvent(args *rates.ArgsCostForEvent, cC *utils.ChargedCost) (err error) {
|
||||
return rSv1.rS.V1CostForEvent(args, cC)
|
||||
}
|
||||
|
||||
func (rSv1 *RateSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error {
|
||||
*reply = utils.Pong
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -49,6 +49,7 @@ var (
|
||||
testV1RatePrfVerifyRateProfile,
|
||||
testV1RatePrfRemoveRateProfile,
|
||||
testV1RatePrfNotFound,
|
||||
testV1RatePing,
|
||||
testV1RatePrfStopEngine,
|
||||
}
|
||||
)
|
||||
@@ -180,6 +181,15 @@ func testV1RatePrfRemoveRateProfile(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func testV1RatePing(t *testing.T) {
|
||||
var resp string
|
||||
if err := ratePrfRpc.Call(utils.RateSv1Ping, new(utils.CGREvent), &resp); err != nil {
|
||||
t.Error(err)
|
||||
} else if resp != utils.Pong {
|
||||
t.Error("Unexpected reply returned", resp)
|
||||
}
|
||||
}
|
||||
|
||||
func testV1RatePrfStopEngine(t *testing.T) {
|
||||
if err := engine.KillEngine(*waitRater); err != nil {
|
||||
t.Error(err)
|
||||
|
||||
@@ -530,7 +530,7 @@ func main() {
|
||||
ldrs, anz, dspS, dmService, storDBService,
|
||||
services.NewEventExporterService(cfg, filterSChan,
|
||||
connManager, server, exitChan, internalEEsChan),
|
||||
services.NewRateService(cfg, filterSChan, dmService,
|
||||
services.NewRateService(cfg, cacheS, filterSChan, dmService,
|
||||
server, exitChan, internalRateSChan),
|
||||
services.NewSIPAgent(cfg, filterSChan, exitChan, connManager),
|
||||
)
|
||||
@@ -564,6 +564,7 @@ func main() {
|
||||
engine.IntRPC.AddInternalRPCClient(utils.ConfigSv1, internalConfigChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.CoreSv1, internalCoreSv1Chan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.RALsV1, internalRALsChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.RateSv1, internalRateSChan)
|
||||
|
||||
initConfigSv1(internalConfigChan, server)
|
||||
|
||||
|
||||
@@ -81,6 +81,8 @@ func (self *CmdApierPing) RpcMethod() string {
|
||||
return utils.APIerSv1Ping
|
||||
case utils.EEsLow:
|
||||
return utils.EventExporterSv1Ping
|
||||
case utils.RateSLow:
|
||||
return utils.RateSv1Ping
|
||||
default:
|
||||
}
|
||||
return self.rpcMethod
|
||||
|
||||
@@ -101,6 +101,11 @@
|
||||
},
|
||||
|
||||
|
||||
"rates": {
|
||||
"enabled": true
|
||||
},
|
||||
|
||||
|
||||
"filters": {
|
||||
"apiers_conns": ["*internal"],
|
||||
},
|
||||
|
||||
@@ -131,6 +131,11 @@
|
||||
},
|
||||
|
||||
|
||||
"rates": {
|
||||
"enabled": true
|
||||
},
|
||||
|
||||
|
||||
"filters": {
|
||||
"apiers_conns": ["*internal"],
|
||||
},
|
||||
|
||||
@@ -128,6 +128,10 @@
|
||||
},
|
||||
|
||||
|
||||
"rates": {
|
||||
"enabled": true
|
||||
},
|
||||
|
||||
"filters": {
|
||||
"apiers_conns": ["*internal"],
|
||||
},
|
||||
|
||||
@@ -93,7 +93,7 @@ func (attrS *AttributeService) Start() (err error) {
|
||||
|
||||
// Reload handles the change of config
|
||||
func (attrS *AttributeService) Reload() (err error) {
|
||||
return // for the momment nothing to reload
|
||||
return // for the moment nothing to reload
|
||||
}
|
||||
|
||||
// Shutdown stops the service
|
||||
|
||||
@@ -22,6 +22,8 @@ import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
v1 "github.com/cgrates/cgrates/apier/v1"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/rates"
|
||||
@@ -34,12 +36,14 @@ import (
|
||||
|
||||
// NewRateService constructs RateService
|
||||
func NewRateService(
|
||||
cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
|
||||
cfg *config.CGRConfig, cacheS *engine.CacheS,
|
||||
filterSChan chan *engine.FilterS,
|
||||
dmS *DataDBService,
|
||||
server *utils.Server, exitChan chan bool,
|
||||
intConnChan chan rpcclient.ClientConnector) servmanager.Service {
|
||||
return &RateService{
|
||||
cfg: cfg,
|
||||
cacheS: cacheS,
|
||||
filterSChan: filterSChan,
|
||||
dmS: dmS,
|
||||
server: server,
|
||||
@@ -56,13 +60,15 @@ type RateService struct {
|
||||
cfg *config.CGRConfig
|
||||
filterSChan chan *engine.FilterS
|
||||
dmS *DataDBService
|
||||
cacheS *engine.CacheS
|
||||
server *utils.Server
|
||||
exitChan chan bool
|
||||
intConnChan chan rpcclient.ClientConnector
|
||||
rldChan chan struct{}
|
||||
|
||||
rateS *rates.RateS
|
||||
//rpc *v1.EventExporterSv1
|
||||
rldChan chan struct{}
|
||||
|
||||
rateS *rates.RateS
|
||||
rpc *v1.RateSv1
|
||||
intConnChan chan rpcclient.ClientConnector
|
||||
}
|
||||
|
||||
// ServiceName returns the service name
|
||||
@@ -106,20 +112,26 @@ func (rs *RateService) Start() (err error) {
|
||||
return utils.ErrServiceAlreadyRunning
|
||||
}
|
||||
|
||||
<-rs.cacheS.GetPrecacheChannel(utils.CacheRateProfiles)
|
||||
<-rs.cacheS.GetPrecacheChannel(utils.CacheRateProfilesFilterIndexes)
|
||||
<-rs.cacheS.GetPrecacheChannel(utils.CacheRateFilterIndexes)
|
||||
|
||||
fltrS := <-rs.filterSChan
|
||||
rs.filterSChan <- fltrS
|
||||
|
||||
dbchan := rs.dmS.GetDMChan()
|
||||
dm := <-dbchan
|
||||
dbchan <- dm
|
||||
rs.Lock()
|
||||
rs.rateS = rates.NewRateS(rs.cfg, fltrS, dm)
|
||||
rs.Unlock()
|
||||
/*rs.rpc = v1.NewEventExporterSv1(es.eeS)
|
||||
|
||||
rs.rpc = v1.NewRateSv1(rs.rateS)
|
||||
if !rs.cfg.DispatcherSCfg().Enabled {
|
||||
rs.server.RpcRegister(es.rpc)
|
||||
rs.server.RpcRegister(rs.rpc)
|
||||
}
|
||||
*/
|
||||
rs.intConnChan <- rs.rateS
|
||||
|
||||
rs.intConnChan <- rs.rpc
|
||||
|
||||
go func(rtS *rates.RateS, exitChan chan bool, rldChan chan struct{}) {
|
||||
if err := rtS.ListenAndServe(exitChan, rldChan); err != nil {
|
||||
|
||||
@@ -45,7 +45,8 @@ func TestRateSReload(t *testing.T) {
|
||||
server := utils.NewServer()
|
||||
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
|
||||
db := NewDataDBService(cfg, nil)
|
||||
rS := NewRateService(cfg, filterSChan, db, server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
|
||||
chS := engine.NewCacheS(cfg, nil)
|
||||
rS := NewRateService(cfg, chS, filterSChan, db, server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
|
||||
srvMngr.AddServices(rS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db)
|
||||
if err = srvMngr.StartServices(); err != nil {
|
||||
|
||||
@@ -831,6 +831,7 @@ const (
|
||||
ReplicatorLow = "replicator"
|
||||
ApierSLow = "apiers"
|
||||
EEsLow = "ees"
|
||||
RateSLow = "rates"
|
||||
)
|
||||
|
||||
// Actions
|
||||
@@ -1295,6 +1296,12 @@ const (
|
||||
RALsV1Ping = "RALsV1.Ping"
|
||||
)
|
||||
|
||||
const (
|
||||
RateSv1 = "RateSv1"
|
||||
RateSv1CostForEvent = "RALsV1.CostForEvent"
|
||||
RateSv1Ping = "RateSv1.Ping"
|
||||
)
|
||||
|
||||
const (
|
||||
CoreS = "CoreS"
|
||||
CoreSv1 = "CoreSv1"
|
||||
|
||||
Reference in New Issue
Block a user