SessionSv1 argumets ProcessStatQueues->ProcessStats, boolPointer->bool, NavigableMap.Add->NavigableMap.Set, flags based methods in HTTPAgent

This commit is contained in:
DanB
2018-06-19 16:38:10 +02:00
parent ee7db952c1
commit 28411f4825
11 changed files with 256 additions and 187 deletions

View File

@@ -408,30 +408,16 @@ func (fsev FSEvent) V1AuthorizeArgs() (args *sessions.V1AuthorizeArgs) {
if !has {
return
}
if strings.Index(subsystems, utils.MetaAccounts) == -1 {
args.GetMaxUsage = false
}
if strings.Index(subsystems, utils.MetaResources) != -1 {
args.AuthorizeResources = true
}
if strings.Index(subsystems, utils.MetaSuppliers) != -1 {
args.GetSuppliers = true
if strings.Index(subsystems, utils.MetaSuppliersEventCost) != -1 {
args.SuppliersMaxCost = utils.MetaEventCost
}
if strings.Index(subsystems, utils.MetaSuppliersIgnoreErrors) != -1 {
args.SuppliersIgnoreErrors = true
}
}
if strings.Index(subsystems, utils.MetaAttributes) != -1 {
args.GetAttributes = true
}
if strings.Index(subsystems, utils.MetaThresholds) != -1 {
args.ProcessThresholds = utils.BoolPointer(true)
}
if strings.Index(subsystems, utils.MetaStats) != -1 {
args.ProcessStatQueues = utils.BoolPointer(true)
args.GetMaxUsage = strings.Index(subsystems, utils.MetaAccounts) != -1
args.AuthorizeResources = strings.Index(subsystems, utils.MetaResources) != -1
args.GetSuppliers = strings.Index(subsystems, utils.MetaSuppliers) != -1
args.SuppliersIgnoreErrors = strings.Index(subsystems, utils.MetaSuppliersIgnoreErrors) != -1
if strings.Index(subsystems, utils.MetaSuppliersEventCost) != -1 {
args.SuppliersMaxCost = utils.MetaEventCost
}
args.GetAttributes = strings.Index(subsystems, utils.MetaAttributes) != -1
args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1
args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1
return
}
@@ -449,21 +435,11 @@ func (fsev FSEvent) V1InitSessionArgs() (args *sessions.V1InitSessionArgs) {
if !has {
return
}
if strings.Index(subsystems, utils.MetaAccounts) == -1 {
args.InitSession = false
}
if strings.Index(subsystems, utils.MetaResources) != -1 {
args.AllocateResources = true
}
if strings.Index(subsystems, utils.MetaAttributes) != -1 {
args.GetAttributes = true
}
if strings.Index(subsystems, utils.MetaThresholds) != -1 {
args.ProcessThresholds = utils.BoolPointer(true)
}
if strings.Index(subsystems, utils.MetaStats) != -1 {
args.ProcessStatQueues = utils.BoolPointer(true)
}
args.InitSession = strings.Index(subsystems, utils.MetaAccounts) != -1
args.AllocateResources = strings.Index(subsystems, utils.MetaResources) != -1
args.GetAttributes = strings.Index(subsystems, utils.MetaAttributes) != -1
args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1
args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1
return
}
@@ -481,18 +457,10 @@ func (fsev FSEvent) V1TerminateSessionArgs() (args *sessions.V1TerminateSessionA
if !has {
return
}
if strings.Index(subsystems, utils.MetaAccounts) == -1 {
args.TerminateSession = false
}
if strings.Index(subsystems, utils.MetaResources) != -1 {
args.ReleaseResources = true
}
if strings.Index(subsystems, utils.MetaThresholds) != -1 {
args.ProcessThresholds = utils.BoolPointer(true)
}
if strings.Index(subsystems, utils.MetaStats) != -1 {
args.ProcessStatQueues = utils.BoolPointer(true)
}
args.TerminateSession = strings.Index(subsystems, utils.MetaAccounts) != -1
args.ReleaseResources = strings.Index(subsystems, utils.MetaResources) != -1
args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1
args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1
return
}

View File

@@ -19,12 +19,14 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package agents
import (
"errors"
"fmt"
"net/http"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
@@ -117,23 +119,111 @@ func (ha *HTTPAgent) processRequest(reqProcessor *config.HttpAgntProcCfg,
Time: utils.TimePointer(time.Now()),
Event: reqEv.AsMapStringInterface(),
}
if reqProcessor.DryRun {
var reqType string
for _, typ := range []string{
utils.MetaDryRun, utils.MetaAuth,
utils.MetaInitiate, utils.MetaUpdate,
utils.MetaTerminate, utils.MetaEvent} {
if reqProcessor.Flags.HasKey(typ) { // request type is identified through flags
reqType = typ
}
}
switch reqType {
default:
return false, errors.New("unknown request type")
case utils.MetaDryRun:
utils.Logger.Info(
fmt.Sprintf("<%s> DRY_RUN, processorID: %s, HTTP request: %s",
utils.HTTPAgent, reqProcessor.Id, utils.ToJSON(agReq.Request)))
utils.Logger.Info(
fmt.Sprintf("<%s> DRY_RUN, processorID: %s, CGREvent: %s",
utils.HTTPAgent, reqProcessor.Id, agReq))
utils.HTTPAgent, reqProcessor.Id, utils.ToJSON(cgrEv)))
case utils.MetaAuth:
authArgs := sessions.NewV1AuthorizeArgs(
reqProcessor.Flags.HasKey(utils.MetaAttributes),
reqProcessor.Flags.HasKey(utils.MetaResources),
reqProcessor.Flags.HasKey(utils.MetaAccounts),
reqProcessor.Flags.HasKey(utils.MetaThresholds),
reqProcessor.Flags.HasKey(utils.MetaStats),
reqProcessor.Flags.HasKey(utils.MetaSuppliers),
reqProcessor.Flags.HasKey(utils.MetaSuppliersIgnoreErrors),
reqProcessor.Flags.HasKey(utils.MetaSuppliersEventCost),
*cgrEv)
var authReply sessions.V1AuthorizeReply
err = ha.sessionS.Call(utils.SessionSv1AuthorizeEvent,
authArgs, &authReply)
if agReq.CGRReply, err = NewCGRReply(&authReply, err); err != nil {
return
}
case utils.MetaInitiate:
initArgs := sessions.NewV1InitSessionArgs(
reqProcessor.Flags.HasKey(utils.MetaAttributes),
reqProcessor.Flags.HasKey(utils.MetaResources),
reqProcessor.Flags.HasKey(utils.MetaAccounts),
reqProcessor.Flags.HasKey(utils.MetaThresholds),
reqProcessor.Flags.HasKey(utils.MetaStats), *cgrEv)
var initReply sessions.V1InitSessionReply
err = ha.sessionS.Call(utils.SessionSv1InitiateSession,
initArgs, &initReply)
if agReq.CGRReply, err = NewCGRReply(&initReply, err); err != nil {
return
}
case utils.MetaUpdate:
updateArgs := sessions.NewV1UpdateSessionArgs(
reqProcessor.Flags.HasKey(utils.MetaAttributes),
reqProcessor.Flags.HasKey(utils.MetaAccounts), *cgrEv)
var updateReply sessions.V1UpdateSessionReply
err = ha.sessionS.Call(utils.SessionSv1UpdateSession,
updateArgs, &updateReply)
if agReq.CGRReply, err = NewCGRReply(&updateReply, err); err != nil {
return
}
case utils.MetaTerminate:
terminateArgs := sessions.NewV1TerminateSessionArgs(
reqProcessor.Flags.HasKey(utils.MetaAccounts),
reqProcessor.Flags.HasKey(utils.MetaResources),
reqProcessor.Flags.HasKey(utils.MetaThresholds),
reqProcessor.Flags.HasKey(utils.MetaStats), *cgrEv)
var tRply string
err = ha.sessionS.Call(utils.SessionSv1TerminateSession,
terminateArgs, &tRply)
if agReq.CGRReply, err = NewCGRReply(nil, err); err != nil {
return
}
case utils.MetaEvent:
evArgs := sessions.NewV1ProcessEventArgs(
reqProcessor.Flags.HasKey(utils.MetaResources),
reqProcessor.Flags.HasKey(utils.MetaAccounts),
reqProcessor.Flags.HasKey(utils.MetaAttributes), *cgrEv)
var eventRply sessions.V1ProcessEventReply
err = ha.sessionS.Call(utils.SessionSv1ProcessEvent,
evArgs, &eventRply)
if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
cgrEv.Event[utils.Usage] = 0 // avoid further debits
} else if eventRply.MaxUsage != nil {
cgrEv.Event[utils.Usage] = *eventRply.MaxUsage // make sure the CDR reflects the debit
}
if agReq.CGRReply, err = NewCGRReply(&eventRply, err); err != nil {
return
}
}
fmt.Printf("cgrEv: %+v", cgrEv)
/*
ev, err := radReqAsCGREvent(req, procVars, reqProcessor.Flags, reqProcessor.RequestFields)
if err != nil {
return false, err
if reqProcessor.Flags.HasKey(utils.MetaCDRs) &&
utils.IsSliceMember([]string{utils.MetaTerminate, utils.MetaEvent}, reqType) {
var rplyCDRs string
if err = ha.sessionS.Call(utils.SessionSv1ProcessCDR,
*cgrEv, &rplyCDRs); err != nil {
agReq.CGRReply.Set([]string{utils.Error}, err.Error(), false)
}
if reqProcessor.DryRun {
utils.Logger.Info(fmt.Sprintf("<%s> DRY_RUN, CGREvent: %s", utils.RadiusAgent, utils.ToJSON(cgrEv)))
}
*/
}
if nM, err := agReq.AsNavigableMap(reqProcessor.ReplyFields); err != nil {
return false, err
} else {
agReq.Reply.Merge(nM)
}
if reqType == utils.MetaDryRun {
utils.Logger.Info(
fmt.Sprintf("<%s> DRY_RUN, HTTP reply: %s",
utils.HTTPAgent, agReq.Reply))
}
return
}

View File

@@ -211,30 +211,16 @@ func (kev KamEvent) V1AuthorizeArgs() (args *sessions.V1AuthorizeArgs) {
if !has {
return
}
if strings.Index(subsystems, utils.MetaAccounts) == -1 {
args.GetMaxUsage = false
}
if strings.Index(subsystems, utils.MetaResources) != -1 {
args.AuthorizeResources = true
}
if strings.Index(subsystems, utils.MetaSuppliers) != -1 {
args.GetSuppliers = true
if strings.Index(subsystems, utils.MetaSuppliersEventCost) != -1 {
args.SuppliersMaxCost = utils.MetaEventCost
}
if strings.Index(subsystems, utils.MetaSuppliersIgnoreErrors) != -1 {
args.SuppliersIgnoreErrors = true
}
}
if strings.Index(subsystems, utils.MetaAttributes) != -1 {
args.GetAttributes = true
}
if strings.Index(subsystems, utils.MetaThresholds) != -1 {
args.ProcessThresholds = utils.BoolPointer(true)
}
if strings.Index(subsystems, utils.MetaStats) != -1 {
args.ProcessStatQueues = utils.BoolPointer(true)
args.GetMaxUsage = strings.Index(subsystems, utils.MetaAccounts) != -1
args.AuthorizeResources = strings.Index(subsystems, utils.MetaResources) != -1
args.GetSuppliers = strings.Index(subsystems, utils.MetaSuppliers) != -1
args.SuppliersIgnoreErrors = strings.Index(subsystems, utils.MetaSuppliersIgnoreErrors) != -1
if strings.Index(subsystems, utils.MetaSuppliersEventCost) != -1 {
args.SuppliersMaxCost = utils.MetaEventCost
}
args.GetAttributes = strings.Index(subsystems, utils.MetaAttributes) != -1
args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1
args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1
return
}
@@ -270,10 +256,10 @@ func (kev KamEvent) AsKamAuthReply(authArgs *sessions.V1AuthorizeArgs,
kar.Suppliers = authReply.Suppliers.Digest()
}
if authArgs.ProcessThresholds != nil && *authArgs.ProcessThresholds {
if authArgs.ProcessThresholds {
kar.Thresholds = strings.Join(*authReply.ThresholdIDs, utils.FIELDS_SEP)
}
if authArgs.ProcessStatQueues != nil && *authArgs.ProcessStatQueues {
if authArgs.ProcessStats {
kar.StatQueues = strings.Join(*authReply.StatQueueIDs, utils.FIELDS_SEP)
}
return
@@ -293,21 +279,11 @@ func (kev KamEvent) V1InitSessionArgs() (args *sessions.V1InitSessionArgs) {
if !has {
return
}
if strings.Index(subsystems, utils.MetaAccounts) == -1 {
args.InitSession = false
}
if strings.Index(subsystems, utils.MetaResources) != -1 {
args.AllocateResources = true
}
if strings.Index(subsystems, utils.MetaAttributes) != -1 {
args.GetAttributes = true
}
if strings.Index(subsystems, utils.MetaThresholds) != -1 {
args.ProcessThresholds = utils.BoolPointer(true)
}
if strings.Index(subsystems, utils.MetaStats) != -1 {
args.ProcessStatQueues = utils.BoolPointer(true)
}
args.InitSession = strings.Index(subsystems, utils.MetaAccounts) != -1
args.AllocateResources = strings.Index(subsystems, utils.MetaResources) != -1
args.GetAttributes = strings.Index(subsystems, utils.MetaAttributes) != -1
args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1
args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1
return
}
@@ -325,18 +301,10 @@ func (kev KamEvent) V1TerminateSessionArgs() (args *sessions.V1TerminateSessionA
if !has {
return
}
if strings.Index(subsystems, utils.MetaAccounts) == -1 {
args.TerminateSession = false
}
if strings.Index(subsystems, utils.MetaResources) != -1 {
args.ReleaseResources = true
}
if strings.Index(subsystems, utils.MetaThresholds) != -1 {
args.ProcessThresholds = utils.BoolPointer(true)
}
if strings.Index(subsystems, utils.MetaStats) != -1 {
args.ProcessStatQueues = utils.BoolPointer(true)
}
args.TerminateSession = strings.Index(subsystems, utils.MetaAccounts) != -1
args.ReleaseResources = strings.Index(subsystems, utils.MetaResources) != -1
args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1
args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1
return
}

View File

@@ -402,16 +402,15 @@ func radReplyAppendAttributes(reply *radigo.Packet, procVars map[string]interfac
// NewCGRReply is specific to replies coming from CGRateS
func NewCGRReply(rply engine.NavigableMapper,
errRply error) (mp map[string]interface{}, err error) {
errRply error) (mp *engine.NavigableMap, err error) {
if errRply != nil {
return map[string]interface{}{
utils.Error: errRply.Error()}, nil
return engine.NewNavigableMap(map[string]interface{}{
utils.Error: errRply.Error()}), nil
}
nM, err := rply.AsNavigableMap(nil)
mp, err = rply.AsNavigableMap(nil)
if err != nil {
return nil, err
}
mp = nM.AsMapStringInterface()
mp[utils.Error] = "" // enforce empty error
mp.Set([]string{utils.Error}, "", false) // enforce empty error
return mp, nil
}

View File

@@ -433,9 +433,9 @@ func (ev myEv) AsNavigableMap(tpl []*config.CfgCdrField) (*engine.NavigableMap,
}
func TestNewCGRReply(t *testing.T) {
eCgrRply := map[string]interface{}{
eCgrRply := engine.NewNavigableMap(map[string]interface{}{
utils.Error: "some",
}
})
if rpl, err := NewCGRReply(nil, errors.New("some")); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCgrRply, rpl) {
@@ -449,8 +449,8 @@ func TestNewCGRReply(t *testing.T) {
},
},
}
eCgrRply = ev
eCgrRply[utils.Error] = ""
eCgrRply = engine.NewNavigableMap(ev)
eCgrRply.Set([]string{utils.Error}, "", false)
if rpl, err := NewCGRReply(engine.NavigableMapper(ev), nil); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCgrRply, rpl) {

View File

@@ -85,7 +85,6 @@ func (ca *HttpAgentCfg) loadFromJsonCfg(jsnCfg *HttpAgentJsonCfg) (err error) {
type HttpAgntProcCfg struct {
Id string
DryRun bool
Filters []string
Flags utils.StringMap
ContinueOnSuccess bool
@@ -100,9 +99,6 @@ func (ha *HttpAgntProcCfg) loadFromJsonCfg(jsnCfg *HttpAgentProcessorJsnCfg) (er
if jsnCfg.Id != nil {
ha.Id = *jsnCfg.Id
}
if jsnCfg.Dry_run != nil {
ha.DryRun = *jsnCfg.Dry_run
}
if jsnCfg.Filters != nil {
ha.Filters = make([]string, len(*jsnCfg.Filters))
for i, fltr := range *jsnCfg.Filters {

View File

@@ -386,7 +386,6 @@ type HttpAgentJsonCfg struct {
type HttpAgentProcessorJsnCfg struct {
Id *string
Dry_run *bool
Filters *[]string
Flags *[]string
Continue_on_success *bool

View File

@@ -48,20 +48,21 @@ type NavigableMap struct {
}
// Add will add items into NavigableMap populating also order
func (nM *NavigableMap) Add(path []string, data interface{}) {
nM.order = append(nM.order)
func (nM *NavigableMap) Set(path []string, data interface{}, ordered bool) {
mp := nM.data
for i, spath := range path {
_, has := mp[spath]
if !has {
if i == len(path)-1 { // last path
mp[spath] = data
return
}
if i == len(path)-1 { // last path
mp[spath] = data
return
}
if _, has := mp[spath]; !has {
mp[spath] = make(map[string]interface{})
}
mp = mp[spath].(map[string]interface{}) // so we can check further down
}
if ordered {
nM.order = append(nM.order)
}
}
// FieldAsInterface returns the field value as interface{} for the path specified
@@ -162,3 +163,15 @@ func (nM *NavigableMap) Items() (itms []*NMItem) {
func (nM *NavigableMap) AsNavigableMap(tpl []*config.CfgCdrField) (oNM *NavigableMap, err error) {
return nil, utils.ErrNotImplemented
}
func (nM *NavigableMap) Merge(nM2 *NavigableMap) {
if nM2 == nil {
return
}
for k, v := range nM2.data {
nM.data[k] = v
}
if len(nM2.order) != 0 {
nM.order = append(nM.order, nM2.order...)
}
}

View File

@@ -148,16 +148,16 @@ func TestNavMapAdd(t *testing.T) {
nM := NewNavigableMap(nil)
path := []string{"FistLever2", "SecondLevel2", "Field2"}
data := "Value2"
nM.Add(path, data)
nM.Set(path, data, true)
path = []string{"FirstLevel", "SecondLevel", "ThirdLevel", "Fld1"}
data = "Val1"
nM.Add(path, data)
nM.Set(path, data, true)
path = []string{"FistLever2", "Field3"}
data = "Value3"
nM.Add(path, data)
nM.Set(path, data, true)
path = []string{"Field4"}
data = "Val4"
nM.Add(path, data)
nM.Set(path, data, true)
eNavMap := NavigableMap{
data: map[string]interface{}{
"FirstLevel": map[string]interface{}{
@@ -190,19 +190,19 @@ func TestNavMapAdd2(t *testing.T) {
nM := NewNavigableMap(nil)
path := []string{"FistLever2", "SecondLevel2", "Field2"}
data := 123
nM.Add(path, data)
nM.Set(path, data, true)
path = []string{"FirstLevel", "SecondLevel", "ThirdLevel", "Fld1"}
data1 := 123.123
nM.Add(path, data1)
nM.Set(path, data1, true)
path = []string{"FistLever2", "Field3"}
data2 := "Value3"
nM.Add(path, data2)
nM.Set(path, data2, true)
path = []string{"Field4"}
data3 := &testStruct{
Item1: "Ten",
Item2: 10,
}
nM.Add(path, data3)
nM.Set(path, data3, true)
eNavMap := NavigableMap{
data: map[string]interface{}{
"FirstLevel": map[string]interface{}{

View File

@@ -1327,15 +1327,35 @@ func (smg *SMGeneric) BiRPCV1ReplicatePassiveSessions(clnt rpcclient.RpcClientCo
return
}
// NewV1AuthorizeArgs is a constructor for V1AuthorizeArgs
func NewV1AuthorizeArgs(attrs, res, maxUsage, thrslds,
statQueues, suppls, supplsIgnoreErrs, supplsEventCost bool,
cgrEv utils.CGREvent) (args *V1AuthorizeArgs) {
args = &V1AuthorizeArgs{
GetAttributes: attrs,
AuthorizeResources: res,
GetMaxUsage: maxUsage,
ProcessThresholds: thrslds,
ProcessStats: statQueues,
SuppliersIgnoreErrors: supplsIgnoreErrs,
GetSuppliers: suppls,
CGREvent: cgrEv,
}
if supplsEventCost {
args.SuppliersMaxCost = utils.MetaSuppliersEventCost
}
return
}
type V1AuthorizeArgs struct {
GetAttributes bool
AuthorizeResources bool
GetMaxUsage bool
ProcessThresholds bool
ProcessStats bool
GetSuppliers bool
SuppliersMaxCost string
SuppliersIgnoreErrors bool
ProcessThresholds *bool
ProcessStatQueues *bool
utils.CGREvent
utils.Paginator
}
@@ -1462,11 +1482,7 @@ func (smg *SMGeneric) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection,
authReply.Suppliers = &splsReply
}
}
checkThresholds := smg.thdS != nil
if args.ProcessThresholds != nil {
checkThresholds = *args.ProcessThresholds
}
if checkThresholds {
if smg.thdS != nil && args.ProcessThresholds {
if smg.thdS == nil {
return utils.NewErrNotConnected(utils.ThresholdS)
}
@@ -1481,11 +1497,7 @@ func (smg *SMGeneric) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection,
}
authReply.ThresholdIDs = &tIDs
}
checkStatQueues := smg.statS != nil
if args.ProcessStatQueues != nil {
checkStatQueues = *args.ProcessStatQueues
}
if checkStatQueues {
if smg.statS != nil && args.ProcessStats {
if smg.statS == nil {
return utils.NewErrNotConnected(utils.StatService)
}
@@ -1536,23 +1548,35 @@ func (smg *SMGeneric) BiRPCv1AuthorizeEventWithDigest(clnt rpcclient.RpcClientCo
if args.GetSuppliers {
authReply.SuppliersDigest = utils.StringPointer(initAuthRply.Suppliers.Digest())
}
if args.ProcessThresholds != nil && *args.ProcessThresholds {
if args.ProcessThresholds {
authReply.Thresholds = utils.StringPointer(
strings.Join(*initAuthRply.ThresholdIDs, utils.FIELDS_SEP))
}
if args.ProcessStatQueues != nil && *args.ProcessStatQueues {
if args.ProcessStats {
authReply.StatQueues = utils.StringPointer(
strings.Join(*initAuthRply.StatQueueIDs, utils.FIELDS_SEP))
}
return nil
}
func NewV1InitSessionArgs(attrs, resrc, acnt, thrslds, stats bool,
cgrEv utils.CGREvent) *V1InitSessionArgs {
return &V1InitSessionArgs{
GetAttributes: attrs,
AllocateResources: resrc,
InitSession: acnt,
ProcessThresholds: thrslds,
ProcessStats: stats,
CGREvent: cgrEv,
}
}
type V1InitSessionArgs struct {
GetAttributes bool
AllocateResources bool
InitSession bool
ProcessThresholds *bool
ProcessStatQueues *bool
ProcessThresholds bool
ProcessStats bool
utils.CGREvent
}
@@ -1650,11 +1674,7 @@ func (smg *SMGeneric) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection,
rply.MaxUsage = &maxUsage
}
}
checkThresholds := smg.thdS != nil
if args.ProcessThresholds != nil {
checkThresholds = *args.ProcessThresholds
}
if checkThresholds {
if smg.thdS != nil && args.ProcessThresholds {
if smg.thdS == nil {
return utils.NewErrNotConnected(utils.ThresholdS)
}
@@ -1669,11 +1689,7 @@ func (smg *SMGeneric) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection,
}
rply.ThresholdIDs = &tIDs
}
checkStatQueues := smg.statS != nil
if args.ProcessStatQueues != nil {
checkStatQueues = *args.ProcessStatQueues
}
if checkStatQueues {
if smg.statS != nil && args.ProcessStats {
if smg.statS == nil {
return utils.NewErrNotConnected(utils.StatService)
}
@@ -1722,17 +1738,23 @@ func (smg *SMGeneric) BiRPCv1InitiateSessionWithDigest(clnt rpcclient.RpcClientC
}
}
if args.ProcessThresholds != nil && *args.ProcessThresholds {
if args.ProcessThresholds {
initReply.Thresholds = utils.StringPointer(
strings.Join(*initSessionRply.ThresholdIDs, utils.FIELDS_SEP))
}
if args.ProcessStatQueues != nil && *args.ProcessStatQueues {
if args.ProcessStats {
initReply.StatQueues = utils.StringPointer(
strings.Join(*initSessionRply.StatQueueIDs, utils.FIELDS_SEP))
}
return nil
}
func NewV1UpdateSessionArgs(attrs, acnts bool,
cgrEv utils.CGREvent) *V1UpdateSessionArgs {
return &V1UpdateSessionArgs{GetAttributes: attrs,
UpdateSession: acnts, CGREvent: cgrEv}
}
type V1UpdateSessionArgs struct {
GetAttributes bool
UpdateSession bool
@@ -1804,11 +1826,21 @@ func (smg *SMGeneric) BiRPCv1UpdateSession(clnt rpcclient.RpcClientConnection,
return
}
func NewV1TerminateSessionArgs(acnts, resrc, thrds, stats bool,
cgrEv utils.CGREvent) *V1TerminateSessionArgs {
return &V1TerminateSessionArgs{
TerminateSession: acnts,
ReleaseResources: resrc,
ProcessThresholds: thrds,
ProcessStats: stats,
CGREvent: cgrEv}
}
type V1TerminateSessionArgs struct {
TerminateSession bool
ReleaseResources bool
ProcessThresholds *bool
ProcessStatQueues *bool
ProcessThresholds bool
ProcessStats bool
utils.CGREvent
}
@@ -1851,11 +1883,7 @@ func (smg *SMGeneric) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection
return utils.NewErrResourceS(err)
}
}
checkThresholds := smg.thdS != nil
if args.ProcessThresholds != nil {
checkThresholds = *args.ProcessThresholds
}
if checkThresholds {
if smg.thdS != nil && args.ProcessThresholds {
if smg.thdS == nil {
return utils.NewErrNotConnected(utils.ThresholdS)
}
@@ -1869,14 +1897,7 @@ func (smg *SMGeneric) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection
fmt.Sprintf("<SessionS> error: %s processing event %+v with ThresholdS.", err.Error(), thEv))
}
}
checkStatQueues := smg.statS != nil
if args.ProcessStatQueues != nil {
checkStatQueues = *args.ProcessStatQueues
}
if checkStatQueues {
if smg.statS == nil {
return utils.NewErrNotConnected(utils.StatService)
}
if smg.statS != nil && args.ProcessStats {
var statReply []string
if err := smg.statS.Call(utils.StatSv1ProcessEvent, &args.CGREvent, &statReply); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
@@ -1898,6 +1919,16 @@ func (smg *SMGeneric) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection,
return nil
}
func NewV1ProcessEventArgs(resrc, acnts, attrs bool,
cgrEv utils.CGREvent) *V1ProcessEventArgs {
return &V1ProcessEventArgs{
AllocateResources: resrc,
Debit: acnts,
GetAttributes: attrs,
CGREvent: cgrEv,
}
}
type V1ProcessEventArgs struct {
AllocateResources bool
Debit bool

View File

@@ -547,6 +547,11 @@ const (
MetaVars = "*vars"
MetaReply = "*reply"
CGROriginHost = "cgr_originhost"
MetaInitiate = "*initiate"
MetaUpdate = "*update"
MetaTerminate = "*terminate"
MetaEvent = "*event"
MetaDryRun = "*dryrun"
)
// Migrator Action