diff --git a/apier/v1/dispatcher.go b/apier/v1/dispatcher.go
index f62871721..7795d959a 100755
--- a/apier/v1/dispatcher.go
+++ b/apier/v1/dispatcher.go
@@ -38,20 +38,22 @@ func (dT *DispatcherThresholdSv1) Ping(ign string, reply *string) error {
return dT.dS.ThresholdSv1Ping(ign, reply)
}
+/* To be implemented in console
// GetThresholdIDs implements ThresholdSv1GetThresholdIDs
func (dT *DispatcherThresholdSv1) GetThresholdIDs(tenant string,
tIDs *[]string) error {
return dT.dS.ThresholdSv1GetThresholdIDs(tenant, tIDs)
}
+*/
// GetThreshold implements ThresholdSv1GetThreshold
-func (dT *DispatcherThresholdSv1) GetThreshold(tntID *utils.TenantID,
+func (dT *DispatcherThresholdSv1) GetThresholdsForEvent(tntID *dispatcher.ArgsProcessEventWithApiKey,
t *engine.Threshold) error {
- return dT.dS.ThresholdSv1GetThreshold(tntID, t)
+ return dT.dS.ThresholdSv1GetThresholdForEvent(tntID, t)
}
// ProcessEvent implements ThresholdSv1ProcessEvent
-func (dT *DispatcherThresholdSv1) ProcessEvent(args *engine.ArgsProcessEvent,
+func (dT *DispatcherThresholdSv1) ProcessEvent(args *dispatcher.ArgsProcessEventWithApiKey,
tIDs *[]string) error {
return dT.dS.ThresholdSv1ProcessEvent(args, tIDs)
}
@@ -71,19 +73,19 @@ func (dSts *DispatcherStatSv1) Ping(ign string, reply *string) error {
}
// GetStatQueuesForEvent implements StatSv1GetStatQueuesForEvent
-func (dSts *DispatcherStatSv1) GetStatQueuesForEvent(ev *utils.CGREvent, reply *[]string) error {
- return dSts.dS.StatSv1GetStatQueuesForEvent(ev, reply)
+func (dSts *DispatcherStatSv1) GetStatQueuesForEvent(args *dispatcher.CGREvWithApiKey, reply *[]string) error {
+ return dSts.dS.StatSv1GetStatQueuesForEvent(args, reply)
}
// GetQueueStringMetrics implements StatSv1GetQueueStringMetrics
-func (dSts *DispatcherStatSv1) GetQueueStringMetrics(args *utils.TenantID,
+func (dSts *DispatcherStatSv1) GetQueueStringMetrics(args *dispatcher.TntIDWithApiKey,
reply *map[string]string) error {
return dSts.dS.StatSv1GetQueueStringMetrics(args, reply)
}
// GetQueueStringMetrics implements StatSv1ProcessEvent
-func (dSts *DispatcherStatSv1) ProcessEvent(ev *utils.CGREvent, reply *[]string) error {
- return dSts.dS.StatSv1ProcessEvent(ev, reply)
+func (dSts *DispatcherStatSv1) ProcessEvent(args *dispatcher.CGREvWithApiKey, reply *[]string) error {
+ return dSts.dS.StatSv1ProcessEvent(args, reply)
}
func NewDispatcherResourceSv1(dps *dispatcher.DispatcherService) *DispatcherResourceSv1 {
@@ -101,7 +103,7 @@ func (dRs *DispatcherResourceSv1) Ping(ign string, reply *string) error {
}
// GetResourcesForEvent implements ResourceSv1GetResourcesForEvent
-func (dRs *DispatcherResourceSv1) GetResourcesForEvent(args utils.ArgRSv1ResourceUsage,
+func (dRs *DispatcherResourceSv1) GetResourcesForEvent(args dispatcher.ArgsV1ResUsageWithApiKey,
reply *engine.Resources) error {
return dRs.dRs.ResourceSv1GetResourcesForEvent(args, reply)
}
diff --git a/apier/v1/resourcesv1.go b/apier/v1/resourcesv1.go
index 59aeb2b31..e1e014679 100644
--- a/apier/v1/resourcesv1.go
+++ b/apier/v1/resourcesv1.go
@@ -106,7 +106,7 @@ func (apierV1 *ApierV1) RemoveResourceProfile(arg utils.TenantID, reply *string)
return nil
}
-func (rsv1 *ResourceSv1) Ping(ign string, reply *string) error {
+func (rsv1 *ResourceSv1) Ping(ign struct{}, reply *string) error {
*reply = utils.Pong
return nil
}
diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go
index 5a14ef23e..1d39845c1 100644
--- a/cmd/cgr-engine/cgr-engine.go
+++ b/cmd/cgr-engine/cgr-engine.go
@@ -743,6 +743,66 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc
return
}
}
+ if len(cfg.DispatcherSCfg().ResSConns) != 0 {
+ resSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate,
+ cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
+ cfg.DispatcherSCfg().ResSConns, nil, cfg.InternalTtl)
+ if err != nil {
+ utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ResoruceS: %s", utils.DispatcherS, err.Error()))
+ exitChan <- true
+ return
+ }
+ }
+ if len(cfg.DispatcherSCfg().ThreshSConns) != 0 {
+ threshSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate,
+ cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
+ cfg.DispatcherSCfg().ThreshSConns, nil, cfg.InternalTtl)
+ if err != nil {
+ utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.DispatcherS, err.Error()))
+ exitChan <- true
+ return
+ }
+ }
+ if len(cfg.DispatcherSCfg().StatSConns) != 0 {
+ statSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate,
+ cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
+ cfg.DispatcherSCfg().StatSConns, nil, cfg.InternalTtl)
+ if err != nil {
+ utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatQueueS: %s", utils.DispatcherS, err.Error()))
+ exitChan <- true
+ return
+ }
+ }
+ if len(cfg.DispatcherSCfg().SupplSConns) != 0 {
+ suplSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate,
+ cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
+ cfg.DispatcherSCfg().SupplSConns, nil, cfg.InternalTtl)
+ if err != nil {
+ utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SupplierS: %s", utils.DispatcherS, err.Error()))
+ exitChan <- true
+ return
+ }
+ }
+ if len(cfg.DispatcherSCfg().AttrSConns) != 0 {
+ attrSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate,
+ cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
+ cfg.DispatcherSCfg().AttrSConns, nil, cfg.InternalTtl)
+ if err != nil {
+ utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to AttributeS: %s", utils.DispatcherS, err.Error()))
+ exitChan <- true
+ return
+ }
+ }
+ if len(cfg.DispatcherSCfg().SessionSConns) != 0 {
+ sessionsSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate,
+ cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
+ cfg.DispatcherSCfg().SessionSConns, nil, cfg.InternalTtl)
+ if err != nil {
+ utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SessionS: %s", utils.DispatcherS, err.Error()))
+ exitChan <- true
+ return
+ }
+ }
dspS, err := dispatcher.NewDispatcherService(dm, ralsConns, resSConns, threshSConns, statSConns,
suplSConns, attrSConns, sessionsSConns)
if err != nil {
@@ -778,6 +838,7 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc
server.RpcRegisterName(utils.AttributeSv1,
v1.NewDispatcherAttributeSv1(dspS))
}
+
}
func startRpc(server *utils.Server, internalRaterChan,
diff --git a/console/resources_for_event.go b/console/resources_for_event.go
index ca187e502..512944d15 100644
--- a/console/resources_for_event.go
+++ b/console/resources_for_event.go
@@ -21,6 +21,7 @@ package console
import (
"time"
+ "github.com/cgrates/cgrates/dispatcher"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -29,7 +30,7 @@ func init() {
c := &CmdGetResourceForEvent{
name: "resources_for_event",
rpcMethod: utils.ResourceSv1GetResourcesForEvent,
- rpcParams: &utils.ArgRSv1ResourceUsage{},
+ rpcParams: &dispatcher.ArgsV1ResUsageWithApiKey{},
}
commands[c.Name()] = c
c.CommandExecuter = &CommandExecuter{c}
@@ -39,7 +40,7 @@ func init() {
type CmdGetResourceForEvent struct {
name string
rpcMethod string
- rpcParams *utils.ArgRSv1ResourceUsage
+ rpcParams *dispatcher.ArgsV1ResUsageWithApiKey
*CommandExecuter
}
@@ -53,7 +54,7 @@ func (self *CmdGetResourceForEvent) RpcMethod() string {
func (self *CmdGetResourceForEvent) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
- self.rpcParams = &utils.ArgRSv1ResourceUsage{}
+ self.rpcParams = &dispatcher.ArgsV1ResUsageWithApiKey{}
}
return self.rpcParams
}
diff --git a/console/stats_for_event.go b/console/stats_for_event.go
index 6607679c7..b11962329 100644
--- a/console/stats_for_event.go
+++ b/console/stats_for_event.go
@@ -19,6 +19,9 @@ along with this program. If not, see
package console
import (
+ "time"
+
+ "github.com/cgrates/cgrates/dispatcher"
"github.com/cgrates/cgrates/utils"
)
@@ -26,7 +29,7 @@ func init() {
c := &CmdStatsQueueForEvent{
name: "stats_for_event",
rpcMethod: utils.StatSv1GetStatQueuesForEvent,
- rpcParams: &utils.CGREvent{},
+ rpcParams: &dispatcher.CGREvWithApiKey{},
}
commands[c.Name()] = c
c.CommandExecuter = &CommandExecuter{c}
@@ -36,7 +39,7 @@ func init() {
type CmdStatsQueueForEvent struct {
name string
rpcMethod string
- rpcParams *utils.CGREvent
+ rpcParams *dispatcher.CGREvWithApiKey
*CommandExecuter
}
@@ -50,12 +53,15 @@ func (self *CmdStatsQueueForEvent) RpcMethod() string {
func (self *CmdStatsQueueForEvent) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
- self.rpcParams = &utils.CGREvent{}
+ self.rpcParams = &dispatcher.CGREvWithApiKey{}
}
return self.rpcParams
}
func (self *CmdStatsQueueForEvent) PostprocessRpcParams() error {
+ if self.rpcParams.Time == nil {
+ self.rpcParams.Time = utils.TimePointer(time.Now())
+ }
return nil
}
diff --git a/console/stats_metrics.go b/console/stats_metrics.go
index 63e2d085d..3792366ad 100644
--- a/console/stats_metrics.go
+++ b/console/stats_metrics.go
@@ -19,6 +19,7 @@ along with this program. If not, see
package console
import (
+ "github.com/cgrates/cgrates/dispatcher"
"github.com/cgrates/cgrates/utils"
)
@@ -26,7 +27,7 @@ func init() {
c := &CmdGetStatQueueStringMetrics{
name: "stats_metrics",
rpcMethod: utils.StatSv1GetQueueStringMetrics,
- rpcParams: &utils.TenantID{},
+ rpcParams: &dispatcher.TntIDWithApiKey{},
}
commands[c.Name()] = c
c.CommandExecuter = &CommandExecuter{c}
@@ -36,7 +37,7 @@ func init() {
type CmdGetStatQueueStringMetrics struct {
name string
rpcMethod string
- rpcParams *utils.TenantID
+ rpcParams *dispatcher.TntIDWithApiKey
*CommandExecuter
}
@@ -50,7 +51,7 @@ func (self *CmdGetStatQueueStringMetrics) RpcMethod() string {
func (self *CmdGetStatQueueStringMetrics) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
- self.rpcParams = &utils.TenantID{}
+ self.rpcParams = &dispatcher.TntIDWithApiKey{}
}
return self.rpcParams
}
diff --git a/console/stats_process_event.go b/console/stats_process_event.go
index 3547c74b1..96cc1e6eb 100644
--- a/console/stats_process_event.go
+++ b/console/stats_process_event.go
@@ -21,6 +21,7 @@ package console
import (
"time"
+ "github.com/cgrates/cgrates/dispatcher"
"github.com/cgrates/cgrates/utils"
)
@@ -28,7 +29,7 @@ func init() {
c := &CmdStatQueueProcessEvent{
name: "stats_process_event",
rpcMethod: utils.StatSv1ProcessEvent,
- rpcParams: &utils.CGREvent{},
+ rpcParams: &dispatcher.CGREvWithApiKey{},
}
commands[c.Name()] = c
c.CommandExecuter = &CommandExecuter{c}
@@ -38,7 +39,7 @@ func init() {
type CmdStatQueueProcessEvent struct {
name string
rpcMethod string
- rpcParams *utils.CGREvent
+ rpcParams *dispatcher.CGREvWithApiKey
*CommandExecuter
}
@@ -52,7 +53,7 @@ func (self *CmdStatQueueProcessEvent) RpcMethod() string {
func (self *CmdStatQueueProcessEvent) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
- self.rpcParams = &utils.CGREvent{}
+ self.rpcParams = &dispatcher.CGREvWithApiKey{}
}
return self.rpcParams
}
diff --git a/console/thresholds_for_event.go b/console/thresholds_for_event.go
index e137771fc..ab07856de 100755
--- a/console/thresholds_for_event.go
+++ b/console/thresholds_for_event.go
@@ -19,15 +19,18 @@ along with this program. If not, see
package console
import (
+ "github.com/cgrates/cgrates/dispatcher"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
+//
+
func init() {
c := &CmdThresholdsForEvent{
name: "thresholds_for_event",
rpcMethod: utils.ThresholdSv1GetThresholdsForEvent,
- rpcParams: &engine.ArgsProcessEvent{},
+ rpcParams: &dispatcher.ArgsProcessEventWithApiKey{},
}
commands[c.Name()] = c
c.CommandExecuter = &CommandExecuter{c}
@@ -36,7 +39,7 @@ func init() {
type CmdThresholdsForEvent struct {
name string
rpcMethod string
- rpcParams *engine.ArgsProcessEvent
+ rpcParams *dispatcher.ArgsProcessEventWithApiKey
*CommandExecuter
}
@@ -50,7 +53,7 @@ func (self *CmdThresholdsForEvent) RpcMethod() string {
func (self *CmdThresholdsForEvent) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
- self.rpcParams = &engine.ArgsProcessEvent{}
+ self.rpcParams = &dispatcher.ArgsProcessEventWithApiKey{}
}
return self.rpcParams
}
diff --git a/console/thresholds_process_event.go b/console/thresholds_process_event.go
index 9de9bf382..1886878a1 100644
--- a/console/thresholds_process_event.go
+++ b/console/thresholds_process_event.go
@@ -21,7 +21,7 @@ package console
import (
"time"
- "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/dispatcher"
"github.com/cgrates/cgrates/utils"
)
@@ -29,7 +29,7 @@ func init() {
c := &CmdThresholdProcessEvent{
name: "thresholds_process_event",
rpcMethod: utils.ThresholdSv1ProcessEvent,
- rpcParams: new(engine.ArgsProcessEvent),
+ rpcParams: &dispatcher.ArgsProcessEventWithApiKey{},
}
commands[c.Name()] = c
c.CommandExecuter = &CommandExecuter{c}
@@ -38,7 +38,7 @@ func init() {
type CmdThresholdProcessEvent struct {
name string
rpcMethod string
- rpcParams *engine.ArgsProcessEvent
+ rpcParams *dispatcher.ArgsProcessEventWithApiKey
*CommandExecuter
}
@@ -52,7 +52,7 @@ func (self *CmdThresholdProcessEvent) RpcMethod() string {
func (self *CmdThresholdProcessEvent) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
- self.rpcParams = &engine.ArgsProcessEvent{}
+ self.rpcParams = &dispatcher.ArgsProcessEventWithApiKey{}
}
return self.rpcParams
}
diff --git a/data/conf/samples/dispatcher/cgrates.json b/data/conf/samples/dispatcher/cgrates.json
index e2c7eb56a..ecfbbac56 100755
--- a/data/conf/samples/dispatcher/cgrates.json
+++ b/data/conf/samples/dispatcher/cgrates.json
@@ -5,6 +5,7 @@
"general": {
"log_level": 7,
+ "node_id":"Dispatcher",
},
@@ -12,10 +13,6 @@
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
- "rpc_json_tls":":2022",
- "rpc_gob_tls":":2023",
- "tls_server_certificate" : "/usr/share/cgrates/server.crt", // path to server certificate(must conatin server.crt + ca.crt)
- "tls_server_key":"/usr/share/cgrates/server.key", // path to server key
},
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
@@ -35,11 +32,31 @@
},
+"resources": { // Resource service (*new)
+ "enabled": false, // starts ResourceLimiter service: .
+},
+
+
"dispatcher":{
- "enabled": true,
- "thresholds_conns": [
- {"address": "192.168.56.204:2012", "transport": "*json"}
+ "enabled": true, // starts DispatcherS service: .
+ "rals_conns": [
+ {"address": "*internal"},
],
+ "resources_conns": [
+ {"address": "192.168.56.204:2012", "transport": "*json"},
+ ],
+ "thresholds_conns": [
+ {"address": "192.168.56.204:2012", "transport": "*json"},
+ ],
+ "stats_conns": [
+ {"address": "192.168.56.204:2012", "transport": "*json"},
+ ],
+ "suppliers_conns": [
+ {"address": "192.168.56.204:2012", "transport": "*json"},
+ ],
+ "attributes_conns": [
+ {"address": "192.168.56.204:2012", "transport": "*json"},
+ ],
"dispatching_strategy":"*random",
},
diff --git a/dispatcher/attributes.go b/dispatcher/attributes.go
new file mode 100755
index 000000000..90ac25eae
--- /dev/null
+++ b/dispatcher/attributes.go
@@ -0,0 +1,72 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package dispatcher
+
+import (
+ "fmt"
+
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+)
+
+func (dS *DispatcherService) AttributeSv1Ping(ign string, reply *string) error {
+ if dS.attrS != nil {
+
+ if err := dS.attrS.Call(utils.AttributeSv1Ping, ign, reply); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf(" error: %s AttributeS.", err.Error()))
+ }
+ }
+ return nil
+}
+
+func (dS *DispatcherService) AttributeSv1GetAttributeForEvent(ev *utils.CGREvent,
+ reply *engine.AttributeProfile) error {
+ if dS.attrS != nil {
+ if err := dS.attrS.Call(utils.AttributeSv1GetAttributeForEvent, ev, reply); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf(" error: %s AttributeS.", err.Error()))
+ }
+ }
+ return nil
+}
+
+func (dS *DispatcherService) AttributeSv1ProcessEvent(ev *utils.CGREvent,
+ reply *engine.AttrSProcessEventReply) (err error) {
+ if dS.attrS == nil {
+ return utils.NewErrNotConnected(utils.AttributeS)
+ }
+ if err = dS.attrS.Call(utils.AttributeSv1ProcessEvent, ev, reply); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf(" error: %s AttributeS.", err.Error()))
+ }
+ return
+}
+
+func (dS *DispatcherService) authorizeEvent(ev *utils.CGREvent,
+ reply *engine.AttrSProcessEventReply) (err error) {
+ if dS.attrS == nil {
+ return utils.NewErrNotConnected(utils.AttributeS)
+ }
+ if err = dS.attrS.Call(utils.AttributeSv1ProcessEvent, ev, reply); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf(" error: %s AttributeS.", err.Error()))
+ }
+ return
+}
diff --git a/dispatcher/dispatcher.go b/dispatcher/dispatcher.go
index 63ad3cee1..901127598 100755
--- a/dispatcher/dispatcher.go
+++ b/dispatcher/dispatcher.go
@@ -86,156 +86,3 @@ func (dS *DispatcherService) Shutdown() error {
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.DispatcherS))
return nil
}
-
-func (dS *DispatcherService) ThresholdSv1Ping(ign string, reply *string) error {
- if dS.thdS != nil {
- if err := dS.thdS.Call(utils.ThresholdSv1Ping, ign, reply); err != nil {
- utils.Logger.Warning(
- fmt.Sprintf(" error: %s ThresholdS.", err.Error()))
- }
- }
- return nil
-}
-
-func (dS *DispatcherService) ThresholdSv1GetThresholdIDs(tenant string, tIDs *[]string) error {
- if dS.thdS != nil {
- if err := dS.thdS.Call(utils.ThresholdSv1GetThresholdIDs, tenant, tIDs); err != nil {
- utils.Logger.Warning(
- fmt.Sprintf(" error: %s ThresholdS.", err.Error()))
- }
- }
- return nil
-}
-
-func (dS *DispatcherService) ThresholdSv1GetThreshold(tntID *utils.TenantID, t *engine.Threshold) error {
- if dS.thdS != nil {
- if err := dS.thdS.Call(utils.ThresholdSv1GetThreshold, tntID, t); err != nil {
- utils.Logger.Warning(
- fmt.Sprintf(" error: %s ThresholdS.", err.Error()))
- }
- }
- return nil
-}
-
-func (dS *DispatcherService) ThresholdSv1ProcessEvent(args *engine.ArgsProcessEvent, tIDs *[]string) error {
- if dS.thdS != nil {
- if err := dS.thdS.Call(utils.ThresholdSv1ProcessEvent, args, tIDs); err != nil {
- utils.Logger.Warning(
- fmt.Sprintf(" error: %s ThresholdS.", err.Error()))
- }
- }
- return nil
-}
-
-func (dS *DispatcherService) StatSv1Ping(ign string, reply *string) error {
- if dS.statS != nil {
- if err := dS.statS.Call(utils.StatSv1Ping, ign, reply); err != nil {
- utils.Logger.Warning(
- fmt.Sprintf(" error: %s StatS.", err.Error()))
- }
- }
- return nil
-}
-
-func (dS *DispatcherService) StatSv1GetStatQueuesForEvent(ev *utils.CGREvent, reply *[]string) error {
- if dS.statS != nil {
- if err := dS.statS.Call(utils.StatSv1GetStatQueuesForEvent, ev, reply); err != nil {
- utils.Logger.Warning(
- fmt.Sprintf(" error: %s StatS.", err.Error()))
- }
- }
- return nil
-}
-
-func (dS *DispatcherService) StatSv1GetQueueStringMetrics(args *utils.TenantID, reply *map[string]string) error {
- if dS.statS != nil {
- if err := dS.statS.Call(utils.StatSv1GetQueueStringMetrics, args, reply); err != nil {
- utils.Logger.Warning(
- fmt.Sprintf(" error: %s StatS.", err.Error()))
- }
- }
- return nil
-}
-
-func (dS *DispatcherService) StatSv1ProcessEvent(ev *utils.CGREvent, reply *[]string) error {
- if dS.statS != nil {
- if err := dS.statS.Call(utils.StatSv1ProcessEvent, ev, reply); err != nil {
- utils.Logger.Warning(
- fmt.Sprintf(" error: %s StatS.", err.Error()))
- }
- }
- return nil
-}
-
-func (dS *DispatcherService) ResourceSv1Ping(ign string, reply *string) error {
- if dS.resS != nil {
- if err := dS.resS.Call(utils.ResourceSv1Ping, ign, reply); err != nil {
- utils.Logger.Warning(
- fmt.Sprintf(" error: %s ResourceS.", err.Error()))
- }
- }
- return nil
-}
-
-func (dS *DispatcherService) ResourceSv1GetResourcesForEvent(args utils.ArgRSv1ResourceUsage, reply *engine.Resources) error {
- if dS.resS != nil {
- if err := dS.resS.Call(utils.ResourceSv1GetResourcesForEvent, args, reply); err != nil {
- utils.Logger.Warning(
- fmt.Sprintf(" error: %s ResourceS.", err.Error()))
- }
- }
- return nil
-}
-
-func (dS *DispatcherService) SupplierSv1Ping(ign string, reply *string) error {
- if dS.splS != nil {
- if err := dS.splS.Call(utils.SupplierSv1Ping, ign, reply); err != nil {
- utils.Logger.Warning(
- fmt.Sprintf(" error: %s SupplierS.", err.Error()))
- }
- }
- return nil
-}
-
-func (dS *DispatcherService) SupplierSv1GetSuppliers(args *engine.ArgsGetSuppliers,
- reply *engine.SortedSuppliers) error {
- if dS.splS != nil {
- if err := dS.splS.Call(utils.SupplierSv1GetSuppliers, args, reply); err != nil {
- utils.Logger.Warning(
- fmt.Sprintf(" error: %s SupplierS.", err.Error()))
- }
- }
- return nil
-}
-
-func (dS *DispatcherService) AttributeSv1Ping(ign string, reply *string) error {
- if dS.attrS != nil {
- if err := dS.attrS.Call(utils.AttributeSv1Ping, ign, reply); err != nil {
- utils.Logger.Warning(
- fmt.Sprintf(" error: %s AttributeS.", err.Error()))
- }
- }
- return nil
-}
-
-func (dS *DispatcherService) AttributeSv1GetAttributeForEvent(ev *utils.CGREvent,
- reply *engine.AttributeProfile) error {
- if dS.attrS != nil {
- if err := dS.attrS.Call(utils.AttributeSv1GetAttributeForEvent, ev, reply); err != nil {
- utils.Logger.Warning(
- fmt.Sprintf(" error: %s AttributeS.", err.Error()))
- }
- }
- return nil
-}
-
-func (dS *DispatcherService) AttributeSv1ProcessEvent(ev *utils.CGREvent,
- reply *engine.AttrSProcessEventReply) error {
- if dS.attrS != nil {
- if err := dS.attrS.Call(utils.AttributeSv1ProcessEvent, ev, reply); err != nil {
- utils.Logger.Warning(
- fmt.Sprintf(" error: %s AttributeS.", err.Error()))
- }
- }
- return nil
-}
diff --git a/dispatcher/resources.go b/dispatcher/resources.go
new file mode 100755
index 000000000..4576267f3
--- /dev/null
+++ b/dispatcher/resources.go
@@ -0,0 +1,58 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package dispatcher
+
+import (
+ "time"
+
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+)
+
+func (dS *DispatcherService) ResourceSv1Ping(ign string, rpl *string) (err error) {
+ if dS.resS == nil {
+ return utils.NewErrNotConnected(utils.ResourceS)
+ }
+ return dS.resS.Call(utils.ResourceSv1Ping, ign, rpl)
+}
+
+func (dS *DispatcherService) ResourceSv1GetResourcesForEvent(args ArgsV1ResUsageWithApiKey, reply *engine.Resources) (err error) {
+ if dS.resS == nil {
+ return utils.NewErrNotConnected(utils.ResourceS)
+ }
+ ev := &utils.CGREvent{
+ Tenant: args.Tenant,
+ ID: utils.UUIDSha1Prefix(),
+ Context: utils.StringPointer(utils.MetaAuth),
+ Time: utils.TimePointer(time.Now()),
+ Event: map[string]interface{}{
+ utils.APIKey: args.APIKey,
+ },
+ }
+ var rplyEv engine.AttrSProcessEventReply
+ if err = dS.authorizeEvent(ev, &rplyEv); err != nil {
+ return
+ }
+ mp := utils.ParseStringMap(rplyEv.CGREvent.Event[utils.APIMethods].(string))
+ if !mp.HasKey(utils.ResourceSv1GetResourcesForEvent) {
+ return utils.ErrUnauthorizedApi
+ }
+ return dS.resS.Call(utils.ResourceSv1GetResourcesForEvent, args.ArgRSv1ResourceUsage, reply)
+
+}
diff --git a/dispatcher/stats.go b/dispatcher/stats.go
new file mode 100755
index 000000000..2d67ade86
--- /dev/null
+++ b/dispatcher/stats.go
@@ -0,0 +1,108 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package dispatcher
+
+import (
+ "fmt"
+
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+)
+
+func (dS *DispatcherService) StatSv1Ping(ign string, reply *string) error {
+ if dS.statS != nil {
+ if err := dS.statS.Call(utils.StatSv1Ping, ign, reply); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf(" error: %s StatS.", err.Error()))
+ }
+ }
+ return nil
+}
+
+func (dS *DispatcherService) StatSv1GetStatQueuesForEvent(args *CGREvWithApiKey,
+ reply *[]string) (err error) {
+ if dS.statS == nil {
+ return utils.NewErrNotConnected(utils.StatS)
+ }
+ ev := &utils.CGREvent{
+ Tenant: args.Tenant,
+ ID: utils.UUIDSha1Prefix(),
+ Context: utils.StringPointer(utils.MetaAuth),
+ Event: map[string]interface{}{
+ utils.APIKey: args.APIKey,
+ },
+ }
+ var rplyEv engine.AttrSProcessEventReply
+ if err = dS.authorizeEvent(ev, &rplyEv); err != nil {
+ return
+ }
+ mp := utils.ParseStringMap(rplyEv.CGREvent.Event[utils.APIMethods].(string))
+ if !mp.HasKey(utils.StatSv1GetStatQueuesForEvent) {
+ return utils.ErrUnauthorizedApi
+ }
+ return dS.statS.Call(utils.StatSv1GetStatQueuesForEvent, args.CGREvent, reply)
+}
+
+func (dS *DispatcherService) StatSv1GetQueueStringMetrics(args *TntIDWithApiKey,
+ reply *map[string]string) (err error) {
+ if dS.statS == nil {
+ return utils.NewErrNotConnected(utils.StatS)
+ }
+ ev := &utils.CGREvent{
+ Tenant: args.Tenant,
+ ID: utils.UUIDSha1Prefix(),
+ Context: utils.StringPointer(utils.MetaAuth),
+ Event: map[string]interface{}{
+ utils.APIKey: args.APIKey,
+ },
+ }
+ var rplyEv engine.AttrSProcessEventReply
+ if err = dS.authorizeEvent(ev, &rplyEv); err != nil {
+ return
+ }
+ mp := utils.ParseStringMap(rplyEv.CGREvent.Event[utils.APIMethods].(string))
+ if !mp.HasKey(utils.StatSv1GetQueueStringMetrics) {
+ return utils.ErrUnauthorizedApi
+ }
+ return dS.statS.Call(utils.StatSv1GetQueueStringMetrics, args.TenantID, reply)
+}
+
+func (dS *DispatcherService) StatSv1ProcessEvent(args *CGREvWithApiKey,
+ reply *[]string) (err error) {
+ if dS.statS == nil {
+ return utils.NewErrNotConnected(utils.StatS)
+ }
+ ev := &utils.CGREvent{
+ Tenant: args.Tenant,
+ ID: utils.UUIDSha1Prefix(),
+ Context: utils.StringPointer(utils.MetaAuth),
+ Event: map[string]interface{}{
+ utils.APIKey: args.APIKey,
+ },
+ }
+ var rplyEv engine.AttrSProcessEventReply
+ if err = dS.authorizeEvent(ev, &rplyEv); err != nil {
+ return
+ }
+ mp := utils.ParseStringMap(rplyEv.CGREvent.Event[utils.APIMethods].(string))
+ if !mp.HasKey(utils.StatSv1ProcessEvent) {
+ return utils.ErrUnauthorizedApi
+ }
+ return dS.statS.Call(utils.StatSv1ProcessEvent, args.CGREvent, reply)
+}
diff --git a/dispatcher/suppliers.go b/dispatcher/suppliers.go
new file mode 100755
index 000000000..a67d21105
--- /dev/null
+++ b/dispatcher/suppliers.go
@@ -0,0 +1,47 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package dispatcher
+
+import (
+ "fmt"
+
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+)
+
+func (dS *DispatcherService) SupplierSv1Ping(ign string, reply *string) error {
+ if dS.splS != nil {
+ if err := dS.splS.Call(utils.SupplierSv1Ping, ign, reply); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf(" error: %s SupplierS.", err.Error()))
+ }
+ }
+ return nil
+}
+
+func (dS *DispatcherService) SupplierSv1GetSuppliers(args *engine.ArgsGetSuppliers,
+ reply *engine.SortedSuppliers) error {
+ if dS.splS != nil {
+ if err := dS.splS.Call(utils.SupplierSv1GetSuppliers, args, reply); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf(" error: %s SupplierS.", err.Error()))
+ }
+ }
+ return nil
+}
diff --git a/dispatcher/thresholds.go b/dispatcher/thresholds.go
new file mode 100755
index 000000000..4899cae5c
--- /dev/null
+++ b/dispatcher/thresholds.go
@@ -0,0 +1,115 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package dispatcher
+
+import (
+ "fmt"
+
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+)
+
+func (dS *DispatcherService) ThresholdSv1Ping(ign string, reply *string) error {
+ if dS.thdS != nil {
+ if err := dS.thdS.Call(utils.ThresholdSv1Ping, ign, reply); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf(" error: %s ThresholdS.", err.Error()))
+ }
+ }
+ return nil
+}
+
+/* To be implemented (+console)
+func (dS *DispatcherService) ThresholdSv1GetThresholdIDs(tenant string, tIDs *[]string) (err error) {
+ if dS.thdS == nil {
+ return utils.NewErrNotConnected(utils.ThresholdS)
+ }
+ ev := &utils.CGREvent{
+ Tenant: args.Tenant,
+ ID: utils.UUIDSha1Prefix(),
+ Context: utils.StringPointer(utils.MetaAuth),
+ Time: args.ArgRSv1ResourceUsage.Time,
+ Event: map[string]interface{}{
+ utils.APIKey: args.APIKey,
+ },
+ }
+ var rplyEv engine.AttrSProcessEventReply
+ if err = dS.authorizeEvent(ev, &rplyEv); err != nil {
+ return
+ }
+ mp := utils.ParseStringMap(rplyEv.CGREvent.Event[utils.APIMethods])
+ if !mp.HasKey(utils.ResourceSv1GetResourcesForEvent) {
+ return utils.ErrUnauthorizedApi
+ }
+ if err = dS.thdS.Call(utils.ThresholdSv1GetThresholdIDs, tenant, tIDs); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf(" error: %s ThresholdS.", err.Error()))
+ }
+ return
+}
+*/
+
+func (dS *DispatcherService) ThresholdSv1GetThresholdForEvent(args *ArgsProcessEventWithApiKey,
+ t *engine.Threshold) (err error) {
+ if dS.thdS == nil {
+ return utils.NewErrNotConnected(utils.ThresholdS)
+ }
+ ev := &utils.CGREvent{
+ Tenant: args.Tenant,
+ ID: utils.UUIDSha1Prefix(),
+ Context: utils.StringPointer(utils.MetaAuth),
+ Event: map[string]interface{}{
+ utils.APIKey: args.APIKey,
+ },
+ }
+ var rplyEv engine.AttrSProcessEventReply
+ if err = dS.authorizeEvent(ev, &rplyEv); err != nil {
+ return
+ }
+ mp := utils.ParseStringMap(rplyEv.CGREvent.Event[utils.APIMethods].(string))
+ if !mp.HasKey(utils.ThresholdSv1GetThresholdsForEvent) {
+ return utils.ErrUnauthorizedApi
+ }
+ return dS.thdS.Call(utils.ThresholdSv1GetThresholdsForEvent, args.TenantID, t)
+}
+
+func (dS *DispatcherService) ThresholdSv1ProcessEvent(args *ArgsProcessEventWithApiKey,
+ tIDs *[]string) (err error) {
+ if dS.thdS == nil {
+ return utils.NewErrNotConnected(utils.ThresholdS)
+ }
+ ev := &utils.CGREvent{
+ Tenant: args.Tenant,
+ ID: utils.UUIDSha1Prefix(),
+ Context: utils.StringPointer(utils.MetaAuth),
+ Event: map[string]interface{}{
+ utils.APIKey: args.APIKey,
+ },
+ }
+ var rplyEv engine.AttrSProcessEventReply
+ if err = dS.authorizeEvent(ev, &rplyEv); err != nil {
+ return
+ }
+ mp := utils.ParseStringMap(rplyEv.CGREvent.Event[utils.APIMethods].(string))
+ if !mp.HasKey(utils.ThresholdSv1ProcessEvent) {
+ return utils.ErrUnauthorizedApi
+ }
+ return dS.thdS.Call(utils.ThresholdSv1ProcessEvent, args.ArgsProcessEvent, tIDs)
+
+}
diff --git a/dispatcher/utils.go b/dispatcher/utils.go
new file mode 100755
index 000000000..d28604cbc
--- /dev/null
+++ b/dispatcher/utils.go
@@ -0,0 +1,49 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package dispatcher
+
+import (
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+)
+
+type CGREvWithApiKey struct {
+ APIKey string
+ utils.CGREvent
+}
+
+type TntIDWithApiKey struct {
+ utils.TenantID
+ APIKey string
+}
+
+type TntWithApiKey struct {
+ Tenant string
+ ApiKey string
+}
+
+type ArgsV1ResUsageWithApiKey struct {
+ APIKey string
+ utils.ArgRSv1ResourceUsage
+}
+
+type ArgsProcessEventWithApiKey struct {
+ APIKey string
+ engine.ArgsProcessEvent
+}
diff --git a/utils/consts.go b/utils/consts.go
index 19ca6392e..fc0d4f5b8 100755
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -541,6 +541,7 @@ const (
MetaDivide = "*divide"
MetaUrl = "*url"
MetaXml = "*xml"
+ ApiKey = "apikey"
)
// Migrator Action
@@ -625,7 +626,7 @@ const (
CapStatQueues = "StatQueues"
)
-// DispatcherStrategy
+// Dispatcher Const
const (
MetaRandom = "*random"
MetaBalancer = "*balancer"
@@ -636,6 +637,9 @@ const (
ResourceSv1 = "ResourceSv1"
SupplierSv1 = "SupplierSv1"
AttributeSv1 = "AttributeSv1"
+ MetaAuth = "*auth"
+ APIKey = "APIKey"
+ APIMethods = "APIMethods"
)
// MetaFilterIndexesAPIs
diff --git a/utils/coreutils.go b/utils/coreutils.go
index e7874e462..865a1ec99 100644
--- a/utils/coreutils.go
+++ b/utils/coreutils.go
@@ -838,3 +838,8 @@ func APIerRPCCall(inst interface{}, serviceMethod string, args interface{}, repl
}
return err
}
+
+type AuthStruct struct {
+ Tenant string
+ ApiKey string
+}
diff --git a/utils/errors.go b/utils/errors.go
index 9d8a4eadb..345a4c039 100644
--- a/utils/errors.go
+++ b/utils/errors.go
@@ -53,6 +53,7 @@ var (
ErrFilterNotPassingNoCaps = errors.New("filter not passing")
ErrNotConvertibleNoCaps = errors.New("not convertible")
ErrMandatoryIeMissingNoCaps = errors.New("mandatory information missing")
+ ErrUnauthorizedApi = errors.New("UNAUTHORIZED_API")
)
// NewCGRError initialises a new CGRError
diff --git a/utils/map.go b/utils/map.go
index d667a97ab..e3fd801af 100644
--- a/utils/map.go
+++ b/utils/map.go
@@ -176,6 +176,11 @@ func (sm StringMap) Join(mps ...StringMap) {
}
}
+func (sm StringMap) HasKey(key string) (has bool) {
+ _, has = sm[key]
+ return
+}
+
/*
func NoDots(m map[string]struct{}) map[string]struct{} {
return MapKeysReplace(m, ".", ".")
diff --git a/utils/map_test.go b/utils/map_test.go
index ccd576451..e65c36186 100644
--- a/utils/map_test.go
+++ b/utils/map_test.go
@@ -103,3 +103,14 @@ func TestMapStringToInt64(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", t1, t3)
}
}
+
+func TestMapHasKey(t *testing.T) {
+ mp := ParseStringMap("Item1;Item2;Item3")
+ if mp.HasKey("Item1") != true {
+ t.Errorf("Expecting: true, received: %+v", mp.HasKey("Item1"))
+ }
+ if mp.HasKey("Item4") != false {
+ t.Errorf("Expecting: true, received: %+v", mp.HasKey("Item4"))
+ }
+
+}