Replacing sessions.SEvent with engine.SafEvent to secure the event over multiple goroutines

This commit is contained in:
DanB
2018-08-16 13:59:54 +02:00
parent befd4386d9
commit db954c8406
19 changed files with 810 additions and 888 deletions

View File

@@ -28,6 +28,7 @@ import (
"github.com/cgrates/aringo"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
)
@@ -289,7 +290,7 @@ func (sma *AsteriskAgent) ServiceShutdown() error {
// Internal method to disconnect session in asterisk
func (sma *AsteriskAgent) V1DisconnectSession(args utils.AttrDisconnectSession, reply *string) error {
channelID := sessions.SMGenericEvent(args.EventStart).GetOriginID(utils.META_DEFAULT)
channelID := engine.NewMapEvent(args.EventStart).GetStringIgnoreErrors(utils.OriginID)
if err := sma.hangupChannel(channelID); err != nil {
utils.Logger.Err(
fmt.Sprintf("<%s> Error: %s when attempting to disconnect channelID: %s",

View File

@@ -98,7 +98,7 @@ func (da DiameterAgent) processCCR(ccr *CCR, reqProcessor *config.DARequestProce
*cca = *NewBareCCAFromCCR(ccr, da.cgrCfg.DiameterAgentCfg().OriginHost, da.cgrCfg.DiameterAgentCfg().OriginRealm)
procVars = make(processorVars)
}
smgEv, err := ccr.AsSMGenericEvent(reqProcessor.CCRFields)
smgEv, err := ccr.AsMapIface(reqProcessor.CCRFields)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> Processing message: %+v AsSMGenericEvent, error: %s", ccr.diamMessage, err))
*cca = *NewBareCCAFromCCR(ccr, da.cgrCfg.DiameterAgentCfg().OriginHost, da.cgrCfg.DiameterAgentCfg().OriginRealm)
@@ -116,7 +116,7 @@ func (da DiameterAgent) processCCR(ccr *CCR, reqProcessor *config.DARequestProce
}
}
if reqProcessor.PublishEvent && da.pubsubs != nil {
evt, err := smgEv.AsMapStringString()
evt, err := engine.NewMapEvent(smgEv).AsMapString(nil)
if err != nil {
*cca = *NewBareCCAFromCCR(ccr, da.cgrCfg.DiameterAgentCfg().OriginHost, da.cgrCfg.DiameterAgentCfg().OriginRealm)
if err := messageSetAVPsWithPath(cca.diamMessage, []interface{}{"Result-Code"}, strconv.Itoa(DiameterRatingFailed),

View File

@@ -24,6 +24,7 @@ import (
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/fsock"
@@ -366,9 +367,10 @@ func (sm *FSsessions) Call(serviceMethod string, args interface{}, reply interfa
// Internal method to disconnect session in asterisk
func (fsa *FSsessions) V1DisconnectSession(args utils.AttrDisconnectSession, reply *string) (err error) {
fsEv := sessions.SMGenericEvent(args.EventStart)
channelID := fsEv.GetOriginID(utils.META_DEFAULT)
if err = fsa.disconnectSession(fsEv[FsConnID].(string), channelID, fsEv.GetCallDestNr(utils.META_DEFAULT),
ev := engine.NewMapEvent(args.EventStart)
channelID := ev.GetStringIgnoreErrors(utils.OriginID)
if err = fsa.disconnectSession(ev.GetStringIgnoreErrors(FsConnID), channelID,
utils.FirstNonEmpty(ev.GetStringIgnoreErrors(CALL_DEST_NR), ev.GetStringIgnoreErrors(SIP_REQ_USER)),
utils.ErrInsufficientCredit.Error()); err != nil {
return
}

View File

@@ -388,7 +388,7 @@ func (fsev FSEvent) AsMapStringInterface(timezone string) map[string]interface{}
mp[utils.Usage], _ = fsev.GetDuration(utils.META_DEFAULT)
mp[utils.PDD], _ = fsev.GetPdd(utils.META_DEFAULT)
mp[utils.ACD], _ = fsev.GetADC(utils.META_DEFAULT)
mp[utils.COST] = -1
mp[utils.COST] = -1.0
mp[utils.SUPPLIER] = fsev.GetSupplier(utils.META_DEFAULT)
mp[utils.DISCONNECT_CAUSE] = fsev.GetDisconnectCause(utils.META_DEFAULT)
return mp

View File

@@ -604,13 +604,13 @@ func TestFsEvAsMapStringInterface(t *testing.T) {
expectedMap[utils.Tenant] = "cgrates.org"
expectedMap[utils.Account] = "1001"
expectedMap[utils.Subject] = "1001"
expectedMap[utils.Cost] = -1
expectedMap[utils.Cost] = -1.0
expectedMap[utils.PDD] = time.Duration(28) * time.Millisecond
expectedMap[utils.ACD] = time.Duration(30) * time.Second
expectedMap[utils.DISCONNECT_CAUSE] = "NORMAL_CLEARING"
expectedMap[utils.SUPPLIER] = "supplier1"
if storedMap := ev.AsMapStringInterface(""); !reflect.DeepEqual(expectedMap, storedMap) {
t.Errorf("Expecting: %+v, received: %+v", expectedMap, storedMap)
t.Errorf("Expecting: %s, received: %s", utils.ToJSON(expectedMap), utils.ToJSON(storedMap))
}
}
@@ -966,8 +966,12 @@ variable_rtp_audio_rtcp_octet_count: 0`
timezone := config.CgrConfig().DefaultTimezone
fsCdrCfg, _ = config.NewDefaultCGRConfig()
fsCdr, _ := engine.NewFSCdr(body, fsCdrCfg)
smGev := sessions.SMGenericEvent(NewFSEvent(hangUp).AsMapStringInterface(timezone))
smCDR := smGev.AsCDR(fsCdrCfg, timezone)
smGev := engine.NewSafEvent(NewFSEvent(hangUp).AsMapStringInterface(timezone))
sessions.GetSetCGRID(smGev)
smCDR, err := smGev.AsCDR(fsCdrCfg, timezone)
if err != nil {
t.Error(err)
}
fsCDR := fsCdr.AsCDR(timezone)
if fsCDR.CGRID != smCDR.CGRID {
t.Errorf("Expecting: %s, received: %s", fsCDR.CGRID, smCDR.CGRID)

View File

@@ -793,7 +793,7 @@ func (self *CCR) AsDiameterMessage() (*diam.Message, error) {
}
// Extracts data out of CCR into a SMGenericEvent based on the configured template
func (self *CCR) AsSMGenericEvent(cfgFlds []*config.CfgCdrField) (sessions.SMGenericEvent, error) {
func (self *CCR) AsMapIface(cfgFlds []*config.CfgCdrField) (map[string]interface{}, error) {
outMap := make(map[string]string) // work with it so we can append values to keys
outMap[utils.EVENT_NAME] = DIAMETER_CCR
for _, cfgFld := range cfgFlds {
@@ -814,7 +814,7 @@ func (self *CCR) AsSMGenericEvent(cfgFlds []*config.CfgCdrField) (sessions.SMGen
break
}
}
return sessions.SMGenericEvent(utils.ConvertMapValStrIf(outMap)), nil
return utils.ConvertMapValStrIf(outMap), nil
}
func NewBareCCAFromCCR(ccr *CCR, originHost, originRealm string) *CCA {

View File

@@ -27,7 +27,6 @@ import (
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
"github.com/fiorix/go-diameter/diam"
"github.com/fiorix/go-diameter/diam/avp"
@@ -488,8 +487,8 @@ func TestCCRAsSMGenericEvent(t *testing.T) {
})
ccr.diamMessage.NewAVP("FramedIPAddress", avp.Mbit, 0, datatype.OctetString("0AE40041"))
cfgFlds := make([]*config.CfgCdrField, 0)
eSMGEv := sessions.SMGenericEvent{"EventName": "DIAMETER_CCR"}
if rSMGEv, err := ccr.AsSMGenericEvent(cfgFlds); err != nil {
eSMGEv := map[string]interface{}{"EventName": "DIAMETER_CCR"}
if rSMGEv, err := ccr.AsMapIface(cfgFlds); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eSMGEv, rSMGEv) {
t.Errorf("Expecting: %+v, received: %+v", eSMGEv, rSMGEv)
@@ -505,8 +504,8 @@ func TestCCRAsSMGenericEvent(t *testing.T) {
Mandatory: true,
},
}
eSMGEv = sessions.SMGenericEvent{"EventName": "DIAMETER_CCR", "LastUsed": "54239"}
if rSMGEv, err := ccr.AsSMGenericEvent(cfgFlds); err != nil {
eSMGEv = map[string]interface{}{"EventName": "DIAMETER_CCR", "LastUsed": "54239"}
if rSMGEv, err := ccr.AsMapIface(cfgFlds); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eSMGEv, rSMGEv) {
t.Errorf("Expecting: %+v, received: %+v", eSMGEv, rSMGEv)
@@ -522,8 +521,8 @@ func TestCCRAsSMGenericEvent(t *testing.T) {
Mandatory: true,
},
}
eSMGEv = sessions.SMGenericEvent{"EventName": "DIAMETER_CCR", "LastUsed": "4420"}
if rSMGEv, err := ccr.AsSMGenericEvent(cfgFlds); err != nil {
eSMGEv = map[string]interface{}{"EventName": "DIAMETER_CCR", "LastUsed": "4420"}
if rSMGEv, err := ccr.AsMapIface(cfgFlds); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eSMGEv, rSMGEv) {
t.Errorf("Expecting: %+v, received: %+v", eSMGEv, rSMGEv)

View File

@@ -49,55 +49,67 @@ func (self *SMGenericBiRpcV1) Handlers() map[string]interface{} {
}
/// Returns MaxUsage (for calls in seconds), -1 for no limit
func (self *SMGenericBiRpcV1) GetMaxUsage(clnt *rpc2.Client, ev sessions.SMGenericEvent, maxUsage *float64) error {
func (self *SMGenericBiRpcV1) GetMaxUsage(clnt *rpc2.Client,
ev map[string]interface{}, maxUsage *float64) error {
return self.sm.BiRPCV1GetMaxUsage(clnt, ev, maxUsage)
}
// Called on session start, returns the maximum number of seconds the session can last
func (self *SMGenericBiRpcV1) InitiateSession(clnt *rpc2.Client, ev sessions.SMGenericEvent, maxUsage *float64) error {
func (self *SMGenericBiRpcV1) InitiateSession(clnt *rpc2.Client,
ev map[string]interface{}, maxUsage *float64) error {
return self.sm.BiRPCV1InitiateSession(clnt, ev, maxUsage)
}
// Interim updates, returns remaining duration from the rater
func (self *SMGenericBiRpcV1) UpdateSession(clnt *rpc2.Client, ev sessions.SMGenericEvent, maxUsage *float64) error {
func (self *SMGenericBiRpcV1) UpdateSession(clnt *rpc2.Client,
ev map[string]interface{}, maxUsage *float64) error {
return self.sm.BiRPCV1UpdateSession(clnt, ev, maxUsage)
}
// Called on session end, should stop debit loop
func (self *SMGenericBiRpcV1) TerminateSession(clnt *rpc2.Client, ev sessions.SMGenericEvent, reply *string) error {
func (self *SMGenericBiRpcV1) TerminateSession(clnt *rpc2.Client,
ev map[string]interface{}, reply *string) error {
return self.sm.BiRPCV1TerminateSession(clnt, ev, reply)
}
// Called on individual Events (eg SMS)
func (self *SMGenericBiRpcV1) ChargeEvent(clnt *rpc2.Client, ev sessions.SMGenericEvent, maxUsage *float64) error {
func (self *SMGenericBiRpcV1) ChargeEvent(clnt *rpc2.Client,
ev map[string]interface{}, maxUsage *float64) error {
return self.sm.BiRPCV1ChargeEvent(clnt, ev, maxUsage)
}
// Called on session end, should send the CDR to CDRS
func (self *SMGenericBiRpcV1) ProcessCDR(clnt *rpc2.Client, ev sessions.SMGenericEvent, reply *string) error {
func (self *SMGenericBiRpcV1) ProcessCDR(clnt *rpc2.Client,
ev map[string]interface{}, reply *string) error {
return self.sm.BiRPCV1ProcessCDR(clnt, ev, reply)
}
func (self *SMGenericBiRpcV1) GetActiveSessions(clnt *rpc2.Client, attrs map[string]string, reply *[]*sessions.ActiveSession) error {
func (self *SMGenericBiRpcV1) GetActiveSessions(clnt *rpc2.Client,
attrs map[string]string, reply *[]*sessions.ActiveSession) error {
return self.sm.BiRPCV1GetActiveSessions(clnt, attrs, reply)
}
func (self *SMGenericBiRpcV1) GetActiveSessionsCount(clnt *rpc2.Client, attrs map[string]string, reply *int) error {
func (self *SMGenericBiRpcV1) GetActiveSessionsCount(clnt *rpc2.Client,
attrs map[string]string, reply *int) error {
return self.sm.BiRPCV1GetActiveSessionsCount(clnt, attrs, reply)
}
func (self *SMGenericBiRpcV1) GetPassiveSessions(clnt *rpc2.Client, attrs map[string]string, reply *[]*sessions.ActiveSession) error {
func (self *SMGenericBiRpcV1) GetPassiveSessions(clnt *rpc2.Client,
attrs map[string]string, reply *[]*sessions.ActiveSession) error {
return self.sm.BiRPCV1GetPassiveSessions(clnt, attrs, reply)
}
func (self *SMGenericBiRpcV1) GetPassiveSessionsCount(clnt *rpc2.Client, attrs map[string]string, reply *int) error {
func (self *SMGenericBiRpcV1) GetPassiveSessionsCount(clnt *rpc2.Client,
attrs map[string]string, reply *int) error {
return self.sm.BiRPCV1GetPassiveSessionsCount(clnt, attrs, reply)
}
func (self *SMGenericBiRpcV1) ReplicateActiveSessions(clnt *rpc2.Client, args sessions.ArgsReplicateSessions, reply *string) error {
func (self *SMGenericBiRpcV1) ReplicateActiveSessions(clnt *rpc2.Client,
args sessions.ArgsReplicateSessions, reply *string) error {
return self.sm.BiRPCV1ReplicateActiveSessions(clnt, args, reply)
}
func (self *SMGenericBiRpcV1) ReplicatePassiveSessions(clnt *rpc2.Client, args sessions.ArgsReplicateSessions, reply *string) error {
func (self *SMGenericBiRpcV1) ReplicatePassiveSessions(clnt *rpc2.Client,
args sessions.ArgsReplicateSessions, reply *string) error {
return self.sm.BiRPCV1ReplicateActiveSessions(clnt, args, reply)
}

View File

@@ -37,65 +37,79 @@ type SMGenericV1 struct {
}
// Returns MaxUsage (for calls in seconds), -1 for no limit
func (self *SMGenericV1) GetMaxUsage(ev sessions.SMGenericEvent, maxUsage *float64) error {
func (self *SMGenericV1) GetMaxUsage(ev map[string]interface{},
maxUsage *float64) error {
return self.SMG.BiRPCV1GetMaxUsage(nil, ev, maxUsage)
}
// Called on session start, returns the maximum number of seconds the session can last
func (self *SMGenericV1) InitiateSession(ev sessions.SMGenericEvent, maxUsage *float64) error {
func (self *SMGenericV1) InitiateSession(ev map[string]interface{},
maxUsage *float64) error {
return self.SMG.BiRPCV1InitiateSession(nil, ev, maxUsage)
}
// Interim updates, returns remaining duration from the rater
func (self *SMGenericV1) UpdateSession(ev sessions.SMGenericEvent, maxUsage *float64) error {
func (self *SMGenericV1) UpdateSession(ev map[string]interface{},
maxUsage *float64) error {
return self.SMG.BiRPCV1UpdateSession(nil, ev, maxUsage)
}
// Called on session end, should stop debit loop
func (self *SMGenericV1) TerminateSession(ev sessions.SMGenericEvent, reply *string) error {
func (self *SMGenericV1) TerminateSession(ev map[string]interface{},
reply *string) error {
return self.SMG.BiRPCV1TerminateSession(nil, ev, reply)
}
// Called on individual Events (eg SMS)
func (self *SMGenericV1) ChargeEvent(ev sessions.SMGenericEvent, maxUsage *float64) error {
func (self *SMGenericV1) ChargeEvent(ev map[string]interface{},
maxUsage *float64) error {
return self.SMG.BiRPCV1ChargeEvent(nil, ev, maxUsage)
}
// Called on session end, should send the CDR to CDRS
func (self *SMGenericV1) ProcessCDR(ev sessions.SMGenericEvent, reply *string) error {
func (self *SMGenericV1) ProcessCDR(ev map[string]interface{},
reply *string) error {
return self.SMG.BiRPCV1ProcessCDR(nil, ev, reply)
}
func (self *SMGenericV1) GetActiveSessions(attrs map[string]string, reply *[]*sessions.ActiveSession) error {
func (self *SMGenericV1) GetActiveSessions(attrs map[string]string,
reply *[]*sessions.ActiveSession) error {
return self.SMG.BiRPCV1GetActiveSessions(nil, attrs, reply)
}
func (self *SMGenericV1) GetActiveSessionsCount(attrs map[string]string, reply *int) error {
func (self *SMGenericV1) GetActiveSessionsCount(attrs map[string]string,
reply *int) error {
return self.SMG.BiRPCV1GetActiveSessionsCount(nil, attrs, reply)
}
func (self *SMGenericV1) GetPassiveSessions(attrs map[string]string, reply *[]*sessions.ActiveSession) error {
func (self *SMGenericV1) GetPassiveSessions(attrs map[string]string,
reply *[]*sessions.ActiveSession) error {
return self.SMG.BiRPCV1GetPassiveSessions(nil, attrs, reply)
}
func (self *SMGenericV1) GetPassiveSessionsCount(attrs map[string]string, reply *int) error {
func (self *SMGenericV1) GetPassiveSessionsCount(attrs map[string]string,
reply *int) error {
return self.SMG.BiRPCV1GetPassiveSessionsCount(nil, attrs, reply)
}
func (self *SMGenericV1) SetPassiveSessions(args sessions.ArgsSetPassiveSessions, reply *string) error {
func (self *SMGenericV1) SetPassiveSessions(args sessions.ArgsSetPassiveSessions,
reply *string) error {
return self.SMG.BiRPCV1SetPassiveSessions(nil, args, reply)
}
func (self *SMGenericV1) ReplicateActiveSessions(args sessions.ArgsReplicateSessions, reply *string) error {
func (self *SMGenericV1) ReplicateActiveSessions(args sessions.ArgsReplicateSessions,
reply *string) error {
return self.SMG.BiRPCV1ReplicateActiveSessions(nil, args, reply)
}
func (self *SMGenericV1) ReplicatePassiveSessions(args sessions.ArgsReplicateSessions, reply *string) error {
func (self *SMGenericV1) ReplicatePassiveSessions(args sessions.ArgsReplicateSessions,
reply *string) error {
return self.SMG.BiRPCV1ReplicatePassiveSessions(nil, args, reply)
}
// rpcclient.RpcClientConnection interface
func (self *SMGenericV1) Call(serviceMethod string, args interface{}, reply interface{}) error {
func (self *SMGenericV1) Call(serviceMethod string,
args interface{}, reply interface{}) error {
methodSplit := strings.Split(serviceMethod, ".")
if len(methodSplit) != 2 {
return rpcclient.ErrUnsupporteServiceMethod

View File

@@ -22,7 +22,6 @@ import (
"time"
"github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/sessions"
)
type SMGenericV2 struct {
@@ -30,21 +29,25 @@ type SMGenericV2 struct {
}
// GetMaxUsage returns maxUsage as time.Duration/int64
func (smgv2 *SMGenericV2) GetMaxUsage(ev sessions.SMGenericEvent, maxUsage *time.Duration) error {
func (smgv2 *SMGenericV2) GetMaxUsage(ev map[string]interface{},
maxUsage *time.Duration) error {
return smgv2.SMG.BiRPCV2GetMaxUsage(nil, ev, maxUsage)
}
// Called on session start, returns the maximum number of seconds the session can last
func (smgv2 *SMGenericV2) InitiateSession(ev sessions.SMGenericEvent, maxUsage *time.Duration) error {
func (smgv2 *SMGenericV2) InitiateSession(ev map[string]interface{},
maxUsage *time.Duration) error {
return smgv2.SMG.BiRPCV2InitiateSession(nil, ev, maxUsage)
}
// Interim updates, returns remaining duration from the rater
func (smgv2 *SMGenericV2) UpdateSession(ev sessions.SMGenericEvent, maxUsage *time.Duration) error {
func (smgv2 *SMGenericV2) UpdateSession(ev map[string]interface{},
maxUsage *time.Duration) error {
return smgv2.SMG.BiRPCV2UpdateSession(nil, ev, maxUsage)
}
// Called on individual Events (eg SMS)
func (smgv2 *SMGenericV2) ChargeEvent(ev sessions.SMGenericEvent, maxUsage *time.Duration) error {
func (smgv2 *SMGenericV2) ChargeEvent(ev map[string]interface{},
maxUsage *time.Duration) error {
return smgv2.SMG.BiRPCV2ChargeEvent(nil, ev, maxUsage)
}

223
engine/mapevent.go Normal file
View File

@@ -0,0 +1,223 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"fmt"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
// NewMapEvent makes sure the content is not nil
func NewMapEvent(mp map[string]interface{}) (me MapEvent) {
if mp == nil {
mp = make(map[string]interface{})
}
return MapEvent(mp)
}
// MapEvent is a map[string]interface{} with convenience methods on top
type MapEvent map[string]interface{}
func (me MapEvent) String() string {
return utils.ToJSON(me)
}
func (me MapEvent) HasField(fldName string) (has bool) {
_, has = me[fldName]
return
}
func (me MapEvent) GetString(fldName string) (out string, err error) {
fldIface, has := me[fldName]
if !has {
return "", utils.ErrNotFound
}
return utils.IfaceAsString(fldIface)
}
func (me MapEvent) GetStringIgnoreErrors(fldName string) (out string) {
out, _ = me.GetString(fldName)
return
}
// GetDuration returns a field as Duration
func (me MapEvent) GetDuration(fldName string) (d time.Duration, err error) {
fldIface, has := me[fldName]
if !has {
return d, utils.ErrNotFound
}
return utils.IfaceAsDuration(fldIface)
}
// GetDuration returns a field as Duration, ignoring errors
func (me MapEvent) GetDurationIgnoreErrors(fldName string) (d time.Duration) {
d, _ = me.GetDuration(fldName)
return
}
// GetTime returns a field as Time
func (me MapEvent) GetTime(fldName string, timezone string) (t time.Time, err error) {
fldIface, has := me[fldName]
if !has {
return t, utils.ErrNotFound
}
return utils.IfaceAsTime(fldIface, timezone)
}
// GetTimeIgnoreErrors returns a field as Time instance, ignoring errors
func (me MapEvent) GetTimeIgnoreErrors(fldName string, tmz string) (t time.Time) {
t, _ = me.GetTime(fldName, tmz)
return
}
// Clone returns the cloned map
func (me MapEvent) Clone() (mp map[string]interface{}) {
mp = make(map[string]interface{}, len(me))
for k, v := range me {
mp[k] = v
}
return
}
// AsMapString returns a map[string]string out of mp, ignoring specific fields if needed
// most used when needing to export extraFields
func (me MapEvent) AsMapString(ignoredFlds utils.StringMap) (mp map[string]string, err error) {
mp = make(map[string]string)
for k, v := range me {
if ignoredFlds.HasKey(k) {
continue
}
var out string
if out, err = utils.IfaceAsString(v); err != nil {
return nil, err
}
me[k] = out
}
return
}
func (me MapEvent) AsMapStringIgnoreErrors(ignoredFlds utils.StringMap) (mp map[string]string) {
mp = make(map[string]string)
for k, v := range me {
if ignoredFlds.HasKey(k) {
continue
}
if out, can := utils.CastFieldIfToString(v); can {
me[k] = out
}
}
return
}
// AsCDR exports the SafEvent as CDR
func (me MapEvent) AsCDR(cfg *config.CGRConfig, tmz string) (cdr *CDR, err error) {
cdr = NewCDRWithDefaults(cfg)
for k, v := range me {
if !utils.IsSliceMember(utils.PrimaryCdrFields, k) { // not primary field, populate extra ones
if cdr.ExtraFields[k], err = utils.IfaceAsString(v); err != nil {
return nil, err
}
continue
}
switch k {
default:
return nil, fmt.Errorf("unimplemented CDR field: <%s>", k)
case utils.CGRID:
if cdr.CGRID, err = utils.IfaceAsString(v); err != nil {
return nil, err
}
case utils.RunID:
if cdr.RunID, err = utils.IfaceAsString(v); err != nil {
return nil, err
}
case utils.OriginHost:
if cdr.OriginHost, err = utils.IfaceAsString(v); err != nil {
return nil, err
}
case utils.Source:
if cdr.Source, err = utils.IfaceAsString(v); err != nil {
return nil, err
}
case utils.OriginID:
if cdr.OriginID, err = utils.IfaceAsString(v); err != nil {
return nil, err
}
case utils.ToR:
if cdr.ToR, err = utils.IfaceAsString(v); err != nil {
return nil, err
}
case utils.RequestType:
if cdr.RequestType, err = utils.IfaceAsString(v); err != nil {
return nil, err
}
case utils.Tenant:
if cdr.Tenant, err = utils.IfaceAsString(v); err != nil {
return nil, err
}
case utils.Category:
if cdr.Category, err = utils.IfaceAsString(v); err != nil {
return nil, err
}
case utils.Account:
if cdr.Account, err = utils.IfaceAsString(v); err != nil {
return nil, err
}
case utils.Subject:
if cdr.Subject, err = utils.IfaceAsString(v); err != nil {
return nil, err
}
case utils.Destination:
if cdr.Destination, err = utils.IfaceAsString(v); err != nil {
return nil, err
}
case utils.SetupTime:
if cdr.SetupTime, err = utils.IfaceAsTime(v, tmz); err != nil {
return nil, err
}
case utils.AnswerTime:
if cdr.AnswerTime, err = utils.IfaceAsTime(v, tmz); err != nil {
return nil, err
}
case utils.Usage:
if cdr.Usage, err = utils.IfaceAsDuration(v); err != nil {
return nil, err
}
case utils.Partial:
if cdr.Partial, err = utils.IfaceAsBool(v); err != nil {
return nil, err
}
case utils.PreRated:
if cdr.PreRated, err = utils.IfaceAsBool(v); err != nil {
return nil, err
}
case utils.CostSource:
if cdr.CostSource, err = utils.IfaceAsString(v); err != nil {
return nil, err
}
case utils.Cost:
if cdr.Cost, err = utils.IfaceAsFloat64(v); err != nil {
return nil, err
}
}
}
return
}

207
engine/safevent.go Normal file
View File

@@ -0,0 +1,207 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"sync"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
func NewSafEvent(mp map[string]interface{}) *SafEvent {
return &SafEvent{me: NewMapEvent(mp)}
}
// SafEvent is a generic event which is safe to read/write from multiple goroutines
type SafEvent struct {
sync.RWMutex
me MapEvent
}
// MapEvent offers access to MapEvent methods, avoiding locks
func (se *SafEvent) MapEvent() (mp MapEvent) {
return se.me
}
func (se *SafEvent) String() (out string) {
se.RLock()
out = se.me.String()
se.RUnlock()
return
}
func (se *SafEvent) HasField(fldName string) (has bool) {
se.RLock()
has = se.me.HasField(fldName)
se.RUnlock()
return
}
func (se *SafEvent) Get(fldName string) (out interface{}, has bool) {
se.RLock()
out, has = se.me[fldName]
se.RUnlock()
return
}
func (se *SafEvent) GetIgnoreErrors(fldName string) (out interface{}) {
out, _ = se.Get(fldName)
return
}
// Set will set a field's value
func (se *SafEvent) Set(fldName string, val interface{}) {
se.Lock()
se.me[fldName] = val
se.Unlock()
return
}
// Remove will remove a field from map
func (se *SafEvent) Remove(fldName string) {
se.Lock()
delete(se.me, fldName)
se.Unlock()
return
}
func (se *SafEvent) GetString(fldName string) (out string, err error) {
se.RLock()
out, err = se.me.GetString(fldName)
se.RUnlock()
return
}
func (se *SafEvent) GetStringIgnoreErrors(fldName string) (out string) {
out, _ = se.GetString(fldName)
return
}
// GetDuration returns a field as Duration
func (se *SafEvent) GetDuration(fldName string) (d time.Duration, err error) {
se.RLock()
d, err = se.me.GetDuration(fldName)
se.RUnlock()
return
}
// GetDuration returns a field as Duration, ignoring errors
func (se *SafEvent) GetDurationIgnoreErrors(fldName string) (d time.Duration) {
d, _ = se.GetDuration(fldName)
return
}
// GetDurationPointer returns pointer towards duration, useful to detect presence of duration
func (se *SafEvent) GetDurationPtr(fldName string) (d *time.Duration, err error) {
fldIface, has := se.Get(fldName)
if !has {
return
}
var dReal time.Duration
if dReal, err = utils.IfaceAsDuration(fldIface); err != nil {
return
}
return &dReal, nil
}
// GetDurationPointer returns pointer towards duration, useful to detect presence of duration
func (se *SafEvent) GetDurationPtrOrDefault(fldName string, dflt *time.Duration) (d *time.Duration, err error) {
fldIface, has := se.Get(fldName)
if !has {
return dflt, nil
}
var dReal time.Duration
if dReal, err = utils.IfaceAsDuration(fldIface); err != nil {
return
}
return &dReal, nil
}
// GetTime returns a field as Time
func (se *SafEvent) GetTime(fldName string, tmz string) (t time.Time, err error) {
se.RLock()
t, err = se.me.GetTime(fldName, tmz)
se.RUnlock()
return
}
// GetTimeIgnoreErrors returns a field as Time instance, ignoring errors
func (se *SafEvent) GetTimeIgnoreErrors(fldName string, tmz string) (t time.Time) {
t, _ = se.GetTime(fldName, tmz)
return
}
// GetSet will attempt to get a field value
// if field not present set it to the value received as parameter
func (se *SafEvent) GetSetString(fldName string, setVal string) (out string, err error) {
se.Lock()
defer se.Unlock()
outIface, has := se.me[fldName]
if !has {
se.me[fldName] = setVal
out = setVal
return
}
// should be present, return it as string
return utils.IfaceAsString(outIface)
}
// GetMapInterface returns the map stored internally without cloning it
func (se *SafEvent) GetMapInterface() (mp map[string]interface{}) {
se.RLock()
mp = se.me
se.RUnlock()
return
}
// AsMapInterface returns the cloned map stored internally
func (se *SafEvent) AsMapInterface() (mp map[string]interface{}) {
se.RLock()
mp = se.me.Clone()
se.RUnlock()
return
}
// AsMapString returns a map[string]string out of mp, ignoring specific fields if needed
// most used when needing to export extraFields
func (se *SafEvent) AsMapString(ignoredFlds utils.StringMap) (mp map[string]string, err error) {
se.RLock()
mp, err = se.me.AsMapString(ignoredFlds)
se.RUnlock()
return
}
// AsMapString returns a map[string]string out of mp, ignoring specific fields if needed
// most used when needing to export extraFields
func (se *SafEvent) AsMapStringIgnoreErrors(ignoredFlds utils.StringMap) (mp map[string]string) {
se.RLock()
mp = se.me.AsMapStringIgnoreErrors(ignoredFlds)
se.RUnlock()
return
}
// AsCDR exports the SafEvent as CDR
func (se *SafEvent) AsCDR(cfg *config.CGRConfig, tmz string) (cdr *CDR, err error) {
se.RLock()
cdr, err = se.me.AsCDR(cfg, tmz)
se.RUnlock()
return
}

69
sessions/libsessions.go Normal file
View File

@@ -0,0 +1,69 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package sessions
import (
"math/rand"
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
// getSessionTTL retrieves SessionTTL setting out of S
func getSessionTTL(ev *engine.SafEvent, cfgSessionTTL time.Duration,
cfgSessionTTLMaxDelay *time.Duration) (ttl time.Duration, err error) {
if ttl, err = ev.GetDuration(utils.SessionTTL); err != nil {
if err != utils.ErrNotFoundNoCaps {
return
}
ttl = cfgSessionTTL
}
if ttl == 0 {
return
}
// random delay computation
var sessionTTLMaxDelay int64
maxDelay, err := ev.GetDuration(utils.SessionTTLMaxDelay)
if err != nil {
if err != utils.ErrNotFoundNoCaps {
return
}
err = nil // clear the error for return
if cfgSessionTTLMaxDelay != nil {
maxDelay = *cfgSessionTTLMaxDelay
}
}
sessionTTLMaxDelay = maxDelay.Nanoseconds() / 1000000 // Milliseconds precision for randomness
if sessionTTLMaxDelay != 0 {
rand.Seed(time.Now().Unix())
ttl += time.Duration(rand.Int63n(sessionTTLMaxDelay) * 1000000)
}
return
}
func GetSetCGRID(ev *engine.SafEvent) (cgrID string) {
cgrID = ev.GetStringIgnoreErrors(utils.CGRID)
if cgrID == "" {
cgrID = utils.Sha1(ev.GetStringIgnoreErrors(utils.OriginID),
ev.GetStringIgnoreErrors(utils.OriginHost))
ev.Set(utils.CGRID, cgrID)
}
return
}

View File

@@ -42,7 +42,7 @@ type SMGSession struct {
CGRID string // Unique identifier for this session
RunID string // Keep a reference for the derived run
Timezone string
EventStart SMGenericEvent // Event which started the session
EventStart *engine.SafEvent // Event which started the session
CD *engine.CallDescriptor // initial CD used for debits, updated on each debit
ResourceID string
@@ -163,17 +163,18 @@ func (self *SMGSession) debit(dur time.Duration, lastUsed *time.Duration) (time.
// Send disconnect order to remote connection
func (self *SMGSession) disconnectSession(reason string) error {
self.EventStart[utils.Usage] = self.TotalUsage.Nanoseconds() // Set the usage to total one debitted
if self.clntConn == nil || reflect.ValueOf(self.clntConn).IsNil() {
return errors.New("Calling SMGClientV1.DisconnectSession requires bidirectional JSON connection")
}
self.EventStart.Set(utils.Usage, self.TotalUsage) // Set the usage to total one debitted
var reply string
servMethod := "SessionSv1.DisconnectSession"
if self.clientProto == 0 { // competibility with OpenSIPS
servMethod = "SMGClientV1.DisconnectSession"
}
if err := self.clntConn.Call(servMethod,
utils.AttrDisconnectSession{EventStart: self.EventStart, Reason: reason},
utils.AttrDisconnectSession{EventStart: self.EventStart.AsMapInterface(),
Reason: reason},
&reply); err != nil {
return err
} else if reply != utils.OK {
@@ -264,8 +265,8 @@ func (self *SMGSession) storeSMCost() error {
CGRID: self.CGRID,
CostSource: utils.MetaSessionS,
RunID: self.RunID,
OriginHost: self.EventStart.GetOriginatorIP(utils.META_DEFAULT),
OriginID: self.EventStart.GetOriginID(utils.META_DEFAULT),
OriginHost: self.EventStart.GetStringIgnoreErrors(utils.OriginHost),
OriginID: self.EventStart.GetStringIgnoreErrors(utils.OriginID),
Usage: self.TotalUsage,
CostDetails: self.EventCost,
}
@@ -285,25 +286,23 @@ func (self *SMGSession) storeSMCost() error {
func (self *SMGSession) AsActiveSession(timezone string) *ActiveSession {
self.mux.RLock()
defer self.mux.RUnlock()
sTime, _ := self.EventStart.GetSetupTime(utils.META_DEFAULT, timezone)
aTime, _ := self.EventStart.GetAnswerTime(utils.META_DEFAULT, timezone)
aSession := &ActiveSession{
CGRID: self.CGRID,
TOR: self.EventStart.GetTOR(utils.META_DEFAULT),
TOR: self.EventStart.GetStringIgnoreErrors(utils.ToR),
RunID: self.RunID,
OriginID: self.EventStart.GetOriginID(utils.META_DEFAULT),
CdrHost: self.EventStart.GetOriginatorIP(utils.META_DEFAULT),
CdrSource: self.EventStart.GetCdrSource(),
ReqType: self.EventStart.GetReqType(utils.META_DEFAULT),
Tenant: self.EventStart.GetTenant(utils.META_DEFAULT),
Category: self.EventStart.GetCategory(utils.META_DEFAULT),
Account: self.EventStart.GetAccount(utils.META_DEFAULT),
Subject: self.EventStart.GetSubject(utils.META_DEFAULT),
Destination: self.EventStart.GetDestination(utils.META_DEFAULT),
SetupTime: sTime,
AnswerTime: aTime,
OriginID: self.EventStart.GetStringIgnoreErrors(utils.OriginID),
CdrHost: self.EventStart.GetStringIgnoreErrors(utils.OriginHost),
CdrSource: utils.SessionS + "_" + self.EventStart.GetStringIgnoreErrors(utils.EVENT_NAME),
ReqType: self.EventStart.GetStringIgnoreErrors(utils.RequestType),
Tenant: self.EventStart.GetStringIgnoreErrors(utils.Tenant),
Category: self.EventStart.GetStringIgnoreErrors(utils.Category),
Account: self.EventStart.GetStringIgnoreErrors(utils.Account),
Subject: self.EventStart.GetStringIgnoreErrors(utils.Subject),
Destination: self.EventStart.GetStringIgnoreErrors(utils.Destination),
SetupTime: self.EventStart.GetTimeIgnoreErrors(utils.SetupTime, self.Timezone),
AnswerTime: self.EventStart.GetTimeIgnoreErrors(utils.AnswerTime, self.Timezone),
Usage: self.TotalUsage,
ExtraFields: self.EventStart.GetExtraFields(),
ExtraFields: self.EventStart.AsMapStringIgnoreErrors(utils.NewStringMap(utils.PrimaryCdrFields...)),
SMId: "CGR-DA",
}
if self.CD != nil {

View File

@@ -21,6 +21,7 @@ package sessions
import (
"errors"
"fmt"
"math/rand"
"reflect"
"strings"
"sync"
@@ -42,6 +43,7 @@ const (
var (
ErrPartiallyExecuted = errors.New("Partially executed")
ErrActiveSession = errors.New("ACTIVE_SESSION")
debug bool
)
func NewSessionReplicationConns(conns []*config.HaPoolConfig, reconnects int,
@@ -161,11 +163,55 @@ type smgSessionTerminator struct {
// setSessionTerminator installs a new terminator for a session
func (smg *SMGeneric) setSessionTerminator(s *SMGSession) {
ttl := s.EventStart.GetSessionTTL(smg.cgrCfg.SessionSCfg().SessionTTL,
smg.cgrCfg.SessionSCfg().SessionTTLMaxDelay)
ttl, err := s.EventStart.GetDuration(utils.SessionTTL)
switch err {
case nil: // all good
case utils.ErrNotFoundNoCaps:
ttl = smg.cgrCfg.SessionSCfg().SessionTTL
default: // not nil
utils.Logger.Warning(
fmt.Sprintf("<%s>, cannot extract %s from event, disabling session timeout for event: <%s>",
utils.SessionS, utils.SessionTTL, s.EventStart.String()))
ttl = time.Duration(0)
}
if ttl == 0 {
return
}
// random delay computation
var sessionTTLMaxDelay int64
maxDelay, err := s.EventStart.GetDuration(utils.SessionTTLMaxDelay)
switch err {
case nil: // all good
case utils.ErrNotFoundNoCaps:
if smg.cgrCfg.SessionSCfg().SessionTTLMaxDelay != nil {
maxDelay = *smg.cgrCfg.SessionSCfg().SessionTTLMaxDelay
}
default: // not nil
utils.Logger.Warning(
fmt.Sprintf("<%s>, cannot extract %s from event, disabling session timeout for event: <%s>",
utils.SessionS, utils.SessionTTLMaxDelay, s.EventStart.String()))
return
}
sessionTTLMaxDelay = maxDelay.Nanoseconds() / 1000000 // Milliseconds precision for randomness
if sessionTTLMaxDelay != 0 {
rand.Seed(time.Now().Unix())
ttl += time.Duration(rand.Int63n(sessionTTLMaxDelay) * 1000000)
}
ttlLastUsed, err := s.EventStart.GetDurationPtrOrDefault(utils.SessionTTLLastUsed, smg.cgrCfg.SessionSCfg().SessionTTLLastUsed)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s>, cannot extract %s from event, disabling session timeout for event: <%s>",
utils.SessionS, utils.SessionTTLLastUsed, s.EventStart.String()))
return
}
ttlUsage, err := s.EventStart.GetDurationPtrOrDefault(utils.SessionTTLUsage, smg.cgrCfg.SessionSCfg().SessionTTLUsage)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s>, cannot extract %s from event, disabling session timeout for event: <%s>",
utils.SessionS, utils.SessionTTLUsage, s.EventStart.String()))
return
}
// add to sessionTerimnations
smg.sTsMux.Lock()
defer smg.sTsMux.Unlock()
if _, found := smg.sessionTerminators[s.CGRID]; found { // already there, no need to set up
@@ -177,8 +223,8 @@ func (smg *SMGeneric) setSessionTerminator(s *SMGSession) {
timer: timer,
endChan: endChan,
ttl: ttl,
ttlLastUsed: s.EventStart.GetSessionTTLLastUsed(),
ttlUsage: s.EventStart.GetSessionTTLUsage(),
ttlLastUsed: ttlLastUsed,
ttlUsage: ttlUsage,
}
smg.sessionTerminators[s.CGRID] = terminator
go func(cgrID string) {
@@ -226,7 +272,12 @@ func (smg *SMGeneric) ttlTerminate(s *SMGSession, tmtr *smgSessionTerminator) {
s.debit(debitUsage, tmtr.ttlLastUsed)
}
smg.sessionEnd(s.CGRID, s.TotalUsage)
cdr := s.EventStart.AsCDR(smg.cgrCfg, smg.Timezone)
cdr, err := s.EventStart.AsCDR(smg.cgrCfg, smg.Timezone)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> could not create CDR out of event %s, err: %s",
utils.SessionS, s.EventStart.String(), err.Error()))
}
cdr.Usage = s.TotalUsage
var reply string
smg.cdrsrv.Call("CdrsV1.ProcessCDR", cdr, &reply)
@@ -234,8 +285,8 @@ func (smg *SMGeneric) ttlTerminate(s *SMGSession, tmtr *smgSessionTerminator) {
var reply string
argsRU := utils.ArgRSv1ResourceUsage{
CGREvent: utils.CGREvent{
Tenant: s.EventStart.GetTenant(utils.META_DEFAULT),
Event: s.EventStart,
Tenant: s.EventStart.GetStringIgnoreErrors(utils.Tenant),
Event: s.EventStart.AsMapInterface(),
},
UsageID: s.ResourceID,
Units: 1,
@@ -291,7 +342,7 @@ func (smg *SMGeneric) indexSession(s *SMGSession, passiveSessions bool) {
s.mux.RLock()
defer s.mux.RUnlock()
for fieldName := range smg.ssIdxCfg {
fieldVal, err := utils.ReflectFieldAsString(s.EventStart, fieldName, "")
fieldVal, err := s.EventStart.GetString(fieldName)
if err != nil {
if err == utils.ErrNotFound {
fieldVal = utils.NOT_AVAILABLE
@@ -419,16 +470,21 @@ func (smg *SMGeneric) getSessionIDsForPrefix(prefix string, passiveSessions bool
}
// sessionStart will handle a new session, pass the connectionId so we can communicate on disconnect request
func (smg *SMGeneric) sessionStart(evStart SMGenericEvent,
func (smg *SMGeneric) sessionStart(evStart *engine.SafEvent,
clntConn rpcclient.RpcClientConnection, resourceID string) (err error) {
cgrID := evStart.GetCGRID(utils.META_DEFAULT)
cgrID := evStart.GetStringIgnoreErrors(utils.CGRID)
_, err = guardian.Guardian.Guard(func() (interface{}, error) { // Lock it on CGRID level
if pSS := smg.passiveToActive(cgrID); len(pSS) != 0 {
return nil, nil // ToDo: handle here also debits
}
cdr, err := evStart.AsCDR(smg.cgrCfg, smg.Timezone)
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> could not convert event: %s to CDR, err: %s",
utils.SessionS, evStart.String(), err.Error()))
}
var sessionRuns []*engine.SessionRun
if err := smg.rals.Call("Responder.GetSessionRuns",
evStart.AsCDR(smg.cgrCfg, smg.Timezone), &sessionRuns); err != nil {
cdr, &sessionRuns); err != nil {
return nil, err
} else if len(sessionRuns) == 0 {
s := &SMGSession{CGRID: cgrID, ResourceID: resourceID, EventStart: evStart,
@@ -477,9 +533,9 @@ func (smg *SMGeneric) sessionEnd(cgrID string, usage time.Duration) error {
if idx == 0 && s.stopDebit != nil {
close(s.stopDebit) // Stop automatic debits
}
aTime, err := s.EventStart.GetAnswerTime(utils.META_DEFAULT, smg.Timezone)
aTime, err := s.EventStart.GetTime(utils.AnswerTime, smg.Timezone)
if err != nil || aTime.IsZero() {
utils.Logger.Err(fmt.Sprintf("<%s> Could not retrieve answer time for session: %s, runId: %s, aTime: %+v, error: %v",
utils.Logger.Warning(fmt.Sprintf("<%s> could not retrieve answer time for session: %s, runId: %s, aTime: %+v, error: %v",
utils.SessionS, cgrID, s.RunID, aTime, err))
continue // Unanswered session
}
@@ -516,8 +572,8 @@ func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) erro
}
for i, s := range ss[initialID] {
s.mux.Lock()
s.CGRID = cgrID // Overwrite initial CGRID with new one
s.EventStart[utils.OriginID] = newOriginID // Overwrite OriginID for session indexing
s.CGRID = cgrID // Overwrite initial CGRID with new one
s.EventStart.Set(utils.OriginID, newOriginID) // Overwrite OriginID for session indexing
s.mux.Unlock()
smg.recordASession(s)
if i == 0 {
@@ -687,16 +743,12 @@ func (smg *SMGeneric) asActiveSessions(fltrs map[string]string, count, passiveSe
}
if len(fltrs) != 0 { // Still have some filters to match
for i := 0; i < len(remainingSessions); {
sMp, err := remainingSessions[i].EventStart.AsMapStringString()
if err != nil {
return nil, 0, err
}
if _, hasRunID := sMp[utils.RunID]; !hasRunID {
sMp[utils.RunID] = utils.META_DEFAULT
if !remainingSessions[i].EventStart.HasField(utils.RunID) {
remainingSessions[i].EventStart.Set(utils.RunID, utils.META_DEFAULT)
}
matchingAll := true
for fltrFldName, fltrFldVal := range fltrs {
if fldVal, hasIt := sMp[fltrFldName]; !hasIt || fltrFldVal != fldVal { // No Match
if remainingSessions[i].EventStart.GetStringIgnoreErrors(fltrFldName) != fltrFldVal { // No Match
matchingAll = false
break
}
@@ -720,14 +772,18 @@ func (smg *SMGeneric) asActiveSessions(fltrs map[string]string, count, passiveSe
// Methods to apply on sessions, mostly exported through RPC/Bi-RPC
// MaxUsage calculates maximum usage allowed for given gevent
func (smg *SMGeneric) GetMaxUsage(gev SMGenericEvent) (maxUsage time.Duration, err error) {
cacheKey := "MaxUsage" + gev.GetCGRID(utils.META_DEFAULT)
func (smg *SMGeneric) GetMaxUsage(gev *engine.SafEvent) (maxUsage time.Duration, err error) {
cgrID := GetSetCGRID(gev)
cacheKey := "MaxUsage" + cgrID
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return (item.Value.(time.Duration)), item.Err
}
defer smg.responseCache.Cache(cacheKey, &utils.ResponseCacheItem{Value: maxUsage, Err: err})
storedCdr := gev.AsCDR(config.CgrConfig(), smg.Timezone)
if _, has := gev[utils.Usage]; !has { // make sure we have a minimum duration configured
storedCdr, err := gev.AsCDR(config.CgrConfig(), smg.Timezone)
if err != nil {
return maxUsage, err
}
if has := gev.HasField(utils.Usage); !has { // make sure we have a minimum duration configured
storedCdr.Usage = smg.cgrCfg.SessionSCfg().MaxCallDuration
}
var maxDur float64
@@ -743,9 +799,9 @@ func (smg *SMGeneric) GetMaxUsage(gev SMGenericEvent) (maxUsage time.Duration, e
}
// Called on session start
func (smg *SMGeneric) InitiateSession(gev SMGenericEvent,
func (smg *SMGeneric) InitiateSession(gev *engine.SafEvent,
clnt rpcclient.RpcClientConnection, resourceID string) (maxUsage time.Duration, err error) {
cgrID := gev.GetCGRID(utils.META_DEFAULT)
cgrID := GetSetCGRID(gev)
cacheKey := "InitiateSession" + cgrID
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return item.Value.(time.Duration), item.Err
@@ -769,9 +825,9 @@ func (smg *SMGeneric) InitiateSession(gev SMGenericEvent,
}
// Execute debits for usage/maxUsage
func (smg *SMGeneric) UpdateSession(gev SMGenericEvent,
func (smg *SMGeneric) UpdateSession(gev *engine.SafEvent,
clnt rpcclient.RpcClientConnection, resourceID string) (maxUsage time.Duration, err error) {
cgrID := gev.GetCGRID(utils.META_DEFAULT)
cgrID := GetSetCGRID(gev)
cacheKey := "UpdateSession" + cgrID
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return item.Value.(time.Duration), item.Err
@@ -783,9 +839,10 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent,
return
}
if gev.HasField(utils.InitialOriginID) {
initialCGRID := gev.GetCGRID(utils.InitialOriginID)
initialCGRID := utils.Sha1(gev.GetStringIgnoreErrors(utils.InitialOriginID),
gev.GetStringIgnoreErrors(utils.OriginHost))
err = smg.sessionRelocate(initialCGRID,
cgrID, gev.GetOriginID(utils.META_DEFAULT))
cgrID, gev.GetStringIgnoreErrors(utils.OriginID))
if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update
err = smg.sessionStart(gev, clnt, resourceID)
}
@@ -794,23 +851,34 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent,
}
smg.replicateSessionsWithID(initialCGRID, false, smg.smgReplConns)
}
smg.resetTerminatorTimer(cgrID,
gev.GetSessionTTL(
smg.cgrCfg.SessionSCfg().SessionTTL,
smg.cgrCfg.SessionSCfg().SessionTTLMaxDelay),
gev.GetSessionTTLLastUsed(), gev.GetSessionTTLUsage())
sesTTL, err := getSessionTTL(gev, smg.cgrCfg.SessionSCfg().SessionTTL,
smg.cgrCfg.SessionSCfg().SessionTTLMaxDelay)
if err != nil {
return maxUsage, err
}
ttlLastUsed, err := gev.GetDurationPtrOrDefault(utils.SessionTTLLastUsed,
smg.cgrCfg.SessionSCfg().SessionTTLLastUsed)
if err != nil {
return maxUsage, err
}
ttlUsage, err := gev.GetDurationPtrOrDefault(utils.SessionTTLUsage,
smg.cgrCfg.SessionSCfg().SessionTTLUsage)
if err != nil {
return maxUsage, err
}
smg.resetTerminatorTimer(cgrID, sesTTL, ttlLastUsed, ttlUsage)
var lastUsed *time.Duration
var evLastUsed time.Duration
if evLastUsed, err = gev.GetLastUsed(utils.META_DEFAULT); err == nil {
if evLastUsed, err = gev.GetDuration(utils.LastUsed); err == nil {
lastUsed = &evLastUsed
} else if err != utils.ErrNotFound {
return
}
if maxUsage, err = gev.GetMaxUsage(utils.META_DEFAULT,
smg.cgrCfg.SessionSCfg().MaxCallDuration); err != nil {
if err == utils.ErrNotFound {
err = utils.ErrMandatoryIeMissing
if maxUsage, err = gev.GetDuration(utils.Usage); err != nil {
if err != utils.ErrNotFound {
return
}
maxUsage = smg.cgrCfg.SessionSCfg().MaxCallDuration
return
}
aSessions := smg.getSessions(cgrID, false)
@@ -823,8 +891,8 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent,
return
}
}
defer smg.replicateSessionsWithID(gev.GetCGRID(utils.META_DEFAULT),
false, smg.smgReplConns)
defer smg.replicateSessionsWithID(cgrID, false, smg.smgReplConns)
for _, s := range aSessions[cgrID] {
if s.RunID == utils.META_NONE {
maxUsage = time.Duration(-1)
@@ -841,18 +909,19 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent,
}
// Called on session end, should stop debit loop
func (smg *SMGeneric) TerminateSession(gev SMGenericEvent,
func (smg *SMGeneric) TerminateSession(gev *engine.SafEvent,
clnt rpcclient.RpcClientConnection, resourceID string) (err error) {
cgrID := gev.GetCGRID(utils.META_DEFAULT)
cgrID := GetSetCGRID(gev)
cacheKey := "TerminateSession" + cgrID
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return item.Err
}
defer smg.responseCache.Cache(cacheKey, &utils.ResponseCacheItem{Err: err})
if gev.HasField(utils.InitialOriginID) {
initialCGRID := gev.GetCGRID(utils.InitialOriginID)
initialCGRID := utils.Sha1(gev.GetStringIgnoreErrors(utils.InitialOriginID),
gev.GetStringIgnoreErrors(utils.OriginHost))
err = smg.sessionRelocate(initialCGRID, cgrID,
gev.GetOriginID(utils.META_DEFAULT))
gev.GetStringIgnoreErrors(utils.OriginID))
if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update
err = smg.sessionStart(gev, clnt, resourceID)
}
@@ -863,23 +932,22 @@ func (smg *SMGeneric) TerminateSession(gev SMGenericEvent,
}
sessionIDs := []string{cgrID}
if gev.HasField(utils.OriginIDPrefix) { // OriginIDPrefix is present, OriginID will not be anymore considered
if sessionIDPrefix, errPrefix := gev.GetFieldAsString(utils.OriginIDPrefix); errPrefix == nil {
if sessionIDs = smg.getSessionIDsForPrefix(sessionIDPrefix, false); len(sessionIDs) == 0 {
sessionIDs = smg.getSessionIDsForPrefix(sessionIDPrefix, true)
for _, sessionID := range sessionIDs { // activate sessions for prefix
smg.passiveToActive(sessionID)
}
sessionIDPrefix := gev.GetStringIgnoreErrors(utils.OriginIDPrefix)
if sessionIDs = smg.getSessionIDsForPrefix(sessionIDPrefix, false); len(sessionIDs) == 0 {
sessionIDs = smg.getSessionIDsForPrefix(sessionIDPrefix, true)
for _, sessionID := range sessionIDs { // activate sessions for prefix
smg.passiveToActive(sessionID)
}
}
}
usage, errUsage := gev.GetUsage(utils.META_DEFAULT)
usage, errUsage := gev.GetDuration(utils.Usage)
var lastUsed time.Duration
if errUsage != nil {
if errUsage != utils.ErrNotFound {
err = errUsage
return
}
lastUsed, err = gev.GetLastUsed(utils.META_DEFAULT)
lastUsed, err = gev.GetDuration(utils.LastUsed)
if err != nil {
if err == utils.ErrNotFound {
err = utils.ErrMandatoryIeMissing
@@ -914,15 +982,19 @@ func (smg *SMGeneric) TerminateSession(gev SMGenericEvent,
}
// Processes one time events (eg: SMS)
func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxUsage time.Duration, err error) {
cgrID := gev.GetCGRID(utils.META_DEFAULT)
func (smg *SMGeneric) ChargeEvent(gev *engine.SafEvent) (maxUsage time.Duration, err error) {
cgrID := GetSetCGRID(gev)
cacheKey := "ChargeEvent" + cgrID
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return item.Value.(time.Duration), item.Err
}
defer smg.responseCache.Cache(cacheKey, &utils.ResponseCacheItem{Value: maxUsage, Err: err})
var sessionRuns []*engine.SessionRun
if err = smg.rals.Call("Responder.GetSessionRuns", gev.AsCDR(smg.cgrCfg, smg.Timezone), &sessionRuns); err != nil {
cdr, err := gev.AsCDR(smg.cgrCfg, smg.Timezone)
if err != nil {
return maxUsage, err
}
if err = smg.rals.Call("Responder.GetSessionRuns", cdr, &sessionRuns); err != nil {
return
} else if len(sessionRuns) == 0 {
return
@@ -1002,8 +1074,8 @@ func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxUsage time.Duration, e
CGRID: cgrID,
CostSource: utils.MetaSessionS,
RunID: sR.DerivedCharger.RunID,
OriginHost: gev.GetOriginatorIP(utils.META_DEFAULT),
OriginID: gev.GetOriginID(utils.META_DEFAULT),
OriginHost: gev.GetStringIgnoreErrors(utils.OriginHost),
OriginID: gev.GetStringIgnoreErrors(utils.OriginID),
CostDetails: engine.NewEventCostFromCallCost(cc, cgrID, sR.DerivedCharger.RunID),
}
if errStore := smg.cdrsrv.Call("CdrsV1.StoreSMCost", engine.AttrCDRSStoreSMCost{Cost: smCost,
@@ -1019,16 +1091,16 @@ func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxUsage time.Duration, e
return
}
func (smg *SMGeneric) ProcessCDR(gev SMGenericEvent) (err error) {
cgrID := gev.GetCGRID(utils.META_DEFAULT)
func (smg *SMGeneric) ProcessCDR(gev *engine.SafEvent) (err error) {
cgrID := GetSetCGRID(gev)
cacheKey := "ProcessCDR" + cgrID
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return item.Err
}
defer smg.responseCache.Cache(cacheKey, &utils.ResponseCacheItem{Err: err})
cdr, err := gev.AsCDR(smg.cgrCfg, smg.Timezone)
var reply string
if err = smg.cdrsrv.Call("CdrsV1.ProcessCDR",
gev.AsCDR(smg.cgrCfg, smg.Timezone), &reply); err != nil {
if err = smg.cdrsrv.Call("CdrsV1.ProcessCDR", cdr, &reply); err != nil {
return
}
return
@@ -1098,8 +1170,8 @@ func (smg *SMGeneric) CallBiRPC(clnt rpcclient.RpcClientConnection,
}
func (smg *SMGeneric) BiRPCV1GetMaxUsage(clnt rpcclient.RpcClientConnection,
ev SMGenericEvent, maxUsage *float64) error {
maxUsageDur, err := smg.GetMaxUsage(ev)
ev map[string]interface{}, maxUsage *float64) error {
maxUsageDur, err := smg.GetMaxUsage(engine.NewSafEvent(ev))
if err != nil {
return utils.NewErrServerError(err)
}
@@ -1113,8 +1185,8 @@ func (smg *SMGeneric) BiRPCV1GetMaxUsage(clnt rpcclient.RpcClientConnection,
// BiRPCV2GetMaxUsage returns the maximum usage as duration/int64
func (smg *SMGeneric) BiRPCV2GetMaxUsage(clnt rpcclient.RpcClientConnection,
ev SMGenericEvent, maxUsage *time.Duration) error {
maxUsageDur, err := smg.GetMaxUsage(ev)
ev map[string]interface{}, maxUsage *time.Duration) error {
maxUsageDur, err := smg.GetMaxUsage(engine.NewSafEvent(ev))
if err != nil {
return utils.NewErrServerError(err)
}
@@ -1124,9 +1196,9 @@ func (smg *SMGeneric) BiRPCV2GetMaxUsage(clnt rpcclient.RpcClientConnection,
// Called on session start, returns the maximum number of seconds the session can last
func (smg *SMGeneric) BiRPCV1InitiateSession(clnt rpcclient.RpcClientConnection,
ev SMGenericEvent, maxUsage *float64) (err error) {
ev map[string]interface{}, maxUsage *float64) (err error) {
var minMaxUsage time.Duration
if minMaxUsage, err = smg.InitiateSession(ev, clnt, ""); err != nil {
if minMaxUsage, err = smg.InitiateSession(engine.NewSafEvent(ev), clnt, ""); err != nil {
if err != rpcclient.ErrSessionNotFound {
err = utils.NewErrServerError(err)
}
@@ -1142,9 +1214,9 @@ func (smg *SMGeneric) BiRPCV1InitiateSession(clnt rpcclient.RpcClientConnection,
// BiRPCV2InitiateSession initiates a new session, returns the maximum duration the session can last
func (smg *SMGeneric) BiRPCV2InitiateSession(clnt rpcclient.RpcClientConnection,
ev SMGenericEvent, maxUsage *time.Duration) (err error) {
ev map[string]interface{}, maxUsage *time.Duration) (err error) {
var minMaxUsage time.Duration
if minMaxUsage, err = smg.InitiateSession(ev, clnt, ""); err != nil {
if minMaxUsage, err = smg.InitiateSession(engine.NewSafEvent(ev), clnt, ""); err != nil {
if err != rpcclient.ErrSessionNotFound {
err = utils.NewErrServerError(err)
}
@@ -1157,9 +1229,9 @@ func (smg *SMGeneric) BiRPCV2InitiateSession(clnt rpcclient.RpcClientConnection,
// Interim updates, returns remaining duration from the RALs
func (smg *SMGeneric) BiRPCV1UpdateSession(clnt rpcclient.RpcClientConnection,
ev SMGenericEvent, maxUsage *float64) (err error) {
ev map[string]interface{}, maxUsage *float64) (err error) {
var minMaxUsage time.Duration
if minMaxUsage, err = smg.UpdateSession(ev, clnt, ""); err != nil {
if minMaxUsage, err = smg.UpdateSession(engine.NewSafEvent(ev), clnt, ""); err != nil {
if err != rpcclient.ErrSessionNotFound {
err = utils.NewErrServerError(err)
}
@@ -1175,9 +1247,9 @@ func (smg *SMGeneric) BiRPCV1UpdateSession(clnt rpcclient.RpcClientConnection,
// BiRPCV1UpdateSession updates an existing session, returning the duration which the session can still last
func (smg *SMGeneric) BiRPCV2UpdateSession(clnt rpcclient.RpcClientConnection,
ev SMGenericEvent, maxUsage *time.Duration) (err error) {
ev map[string]interface{}, maxUsage *time.Duration) (err error) {
var minMaxUsage time.Duration
if minMaxUsage, err = smg.UpdateSession(ev, clnt, ""); err != nil {
if minMaxUsage, err = smg.UpdateSession(engine.NewSafEvent(ev), clnt, ""); err != nil {
if err != rpcclient.ErrSessionNotFound {
err = utils.NewErrServerError(err)
}
@@ -1189,8 +1261,8 @@ func (smg *SMGeneric) BiRPCV2UpdateSession(clnt rpcclient.RpcClientConnection,
// Called on session end, should stop debit loop
func (smg *SMGeneric) BiRPCV1TerminateSession(clnt rpcclient.RpcClientConnection,
ev SMGenericEvent, reply *string) (err error) {
if err = smg.TerminateSession(ev, clnt, ""); err != nil {
ev map[string]interface{}, reply *string) (err error) {
if err = smg.TerminateSession(engine.NewSafEvent(ev), clnt, ""); err != nil {
if err != rpcclient.ErrSessionNotFound {
err = utils.NewErrServerError(err)
}
@@ -1202,8 +1274,8 @@ func (smg *SMGeneric) BiRPCV1TerminateSession(clnt rpcclient.RpcClientConnection
// Called on individual Events (eg SMS)
func (smg *SMGeneric) BiRPCV1ChargeEvent(clnt rpcclient.RpcClientConnection,
ev SMGenericEvent, maxUsage *float64) error {
if minMaxUsage, err := smg.ChargeEvent(ev); err != nil {
ev map[string]interface{}, maxUsage *float64) error {
if minMaxUsage, err := smg.ChargeEvent(engine.NewSafEvent(ev)); err != nil {
return utils.NewErrServerError(err)
} else {
*maxUsage = minMaxUsage.Seconds()
@@ -1213,8 +1285,8 @@ func (smg *SMGeneric) BiRPCV1ChargeEvent(clnt rpcclient.RpcClientConnection,
// Called on individual Events (eg SMS)
func (smg *SMGeneric) BiRPCV2ChargeEvent(clnt rpcclient.RpcClientConnection,
ev SMGenericEvent, maxUsage *time.Duration) error {
if minMaxUsage, err := smg.ChargeEvent(ev); err != nil {
ev map[string]interface{}, maxUsage *time.Duration) error {
if minMaxUsage, err := smg.ChargeEvent(engine.NewSafEvent(ev)); err != nil {
return utils.NewErrServerError(err)
} else {
*maxUsage = minMaxUsage
@@ -1224,8 +1296,8 @@ func (smg *SMGeneric) BiRPCV2ChargeEvent(clnt rpcclient.RpcClientConnection,
// Called on session end, should send the CDR to CDRS
func (smg *SMGeneric) BiRPCV1ProcessCDR(clnt rpcclient.RpcClientConnection,
ev SMGenericEvent, reply *string) error {
if err := smg.ProcessCDR(ev); err != nil {
ev map[string]interface{}, reply *string) error {
if err := smg.ProcessCDR(engine.NewSafEvent(ev)); err != nil {
return utils.NewErrServerError(err)
}
*reply = utils.OK
@@ -1467,7 +1539,7 @@ func (smg *SMGeneric) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection,
if smg.rals == nil {
return utils.NewErrNotConnected(utils.RALService)
}
maxUsage, err := smg.GetMaxUsage(args.CGREvent.Event)
maxUsage, err := smg.GetMaxUsage(engine.NewSafEvent(args.CGREvent.Event))
if err != nil {
return utils.NewErrRALs(err)
}
@@ -1713,7 +1785,8 @@ func (smg *SMGeneric) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection,
return utils.NewErrMandatoryIeMissing(utils.OriginID)
}
}
if maxUsage, err := smg.InitiateSession(args.CGREvent.Event, clnt, originID); err != nil {
if maxUsage, err := smg.InitiateSession(
engine.NewSafEvent(args.CGREvent.Event), clnt, originID); err != nil {
return utils.NewErrRALs(err)
} else {
rply.MaxUsage = &maxUsage
@@ -1869,7 +1942,8 @@ func (smg *SMGeneric) BiRPCv1UpdateSession(clnt rpcclient.RpcClientConnection,
if err != nil {
return utils.NewErrMandatoryIeMissing(utils.OriginID)
}
if maxUsage, err := smg.UpdateSession(args.CGREvent.Event, clnt, originID); err != nil {
if maxUsage, err := smg.UpdateSession(
engine.NewSafEvent(args.CGREvent.Event), clnt, originID); err != nil {
return utils.NewErrRALs(err)
} else {
rply.MaxUsage = &maxUsage
@@ -1916,7 +1990,8 @@ func (smg *SMGeneric) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection
if err != nil {
return utils.NewErrMandatoryIeMissing(utils.OriginID)
}
if err = smg.TerminateSession(args.CGREvent.Event, clnt, originID); err != nil {
if err = smg.TerminateSession(
engine.NewSafEvent(args.CGREvent.Event), clnt, originID); err != nil {
return utils.NewErrRALs(err)
}
}
@@ -1968,7 +2043,7 @@ func (smg *SMGeneric) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection
// Called on session end, should send the CDR to CDRS
func (smg *SMGeneric) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection,
cgrEv utils.CGREvent, reply *string) error {
if err := smg.ProcessCDR(cgrEv.Event); err != nil {
if err := smg.ProcessCDR(engine.NewSafEvent(cgrEv.Event)); err != nil {
return utils.NewErrServerError(err)
}
*reply = utils.OK
@@ -2053,7 +2128,7 @@ func (smg *SMGeneric) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection,
if smg.rals == nil {
return utils.NewErrNotConnected(utils.RALService)
}
if maxUsage, err := smg.ChargeEvent(args.CGREvent.Event); err != nil {
if maxUsage, err := smg.ChargeEvent(engine.NewSafEvent(args.CGREvent.Event)); err != nil {
return utils.NewErrRALs(err)
} else {
rply.MaxUsage = &maxUsage

View File

@@ -1,492 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package sessions
import (
"encoding/json"
"fmt"
"math/rand"
"strconv"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var (
nilTime time.Time
nilDuration time.Duration
)
type SMGenericEvent map[string]interface{}
func (ev SMGenericEvent) HasField(fieldName string) (hasField bool) {
_, hasField = ev[fieldName]
return
}
func (self SMGenericEvent) GetName() string {
result, _ := utils.CastFieldIfToString(self[utils.EVENT_NAME])
return result
}
func (self SMGenericEvent) GetTOR(fieldName string) string {
if fieldName == utils.META_DEFAULT {
fieldName = utils.ToR
}
result, _ := utils.CastFieldIfToString(self[fieldName])
return result
}
func (self SMGenericEvent) GetCGRID(oIDFieldName string) string {
return utils.Sha1(self.GetOriginID(oIDFieldName), self.GetOriginatorIP(utils.META_DEFAULT))
}
// GetOriginID returns the OriginID from event
// fieldName offers the possibility to extract info from other fields, eg: InitialOriginID
func (self SMGenericEvent) GetOriginID(fieldName string) string {
if fieldName == utils.META_DEFAULT {
fieldName = utils.OriginID
}
result, _ := utils.CastFieldIfToString(self[fieldName])
return result
}
func (self SMGenericEvent) GetSessionIds() []string {
return []string{self.GetOriginID(utils.META_DEFAULT)}
}
func (self SMGenericEvent) GetDirection(fieldName string) string {
if fieldName == utils.META_DEFAULT {
fieldName = utils.Direction
}
result, _ := utils.CastFieldIfToString(self[fieldName])
return result
}
func (self SMGenericEvent) GetAccount(fieldName string) string {
if fieldName == utils.META_DEFAULT {
fieldName = utils.Account
}
result, _ := utils.CastFieldIfToString(self[fieldName])
return result
}
func (self SMGenericEvent) GetSubject(fieldName string) string {
if fieldName == utils.META_DEFAULT {
fieldName = utils.Subject
}
result, _ := utils.CastFieldIfToString(self[fieldName])
return result
}
func (self SMGenericEvent) GetDestination(fieldName string) string {
if fieldName == utils.META_DEFAULT {
fieldName = utils.Destination
}
result, _ := utils.CastFieldIfToString(self[fieldName])
return result
}
func (self SMGenericEvent) GetCallDestNr(fieldName string) string {
return self.GetDestination(fieldName)
}
func (self SMGenericEvent) GetCategory(fieldName string) string {
if fieldName == utils.META_DEFAULT {
fieldName = utils.Category
}
result, _ := utils.CastFieldIfToString(self[fieldName])
return result
}
func (self SMGenericEvent) GetTenant(fieldName string) string {
if fieldName == utils.META_DEFAULT {
fieldName = utils.Tenant
}
result, _ := utils.CastFieldIfToString(self[fieldName])
return result
}
func (self SMGenericEvent) GetReqType(fieldName string) string {
if fieldName == utils.META_DEFAULT {
fieldName = utils.RequestType
}
result, _ := utils.CastFieldIfToString(self[fieldName])
return result
}
func (self SMGenericEvent) GetSetupTime(fieldName, timezone string) (time.Time, error) {
if fieldName == utils.META_DEFAULT {
fieldName = utils.SetupTime
}
return utils.IfaceAsTime(self[fieldName], timezone)
}
func (self SMGenericEvent) GetAnswerTime(fieldName, timezone string) (time.Time, error) {
if fieldName == utils.META_DEFAULT {
fieldName = utils.AnswerTime
}
return utils.IfaceAsTime(self[fieldName], timezone)
}
func (self SMGenericEvent) GetEndTime(fieldName, timezone string) (time.Time, error) {
var nilTime time.Time
aTime, err := self.GetAnswerTime(utils.META_DEFAULT, timezone)
if err != nil {
return nilTime, err
}
dur, err := self.GetUsage(utils.META_DEFAULT)
if err != nil {
return nilTime, err
}
return aTime.Add(dur), nil
}
func (self SMGenericEvent) GetUsage(fieldName string) (time.Duration, error) {
if fieldName == utils.META_DEFAULT {
fieldName = utils.Usage
}
valIf, hasVal := self[fieldName]
if !hasVal {
return nilDuration, utils.ErrNotFound
}
result, _ := utils.CastFieldIfToString(valIf)
return utils.ParseDurationWithNanosecs(result)
}
func (self SMGenericEvent) GetLastUsed(fieldName string) (time.Duration, error) {
if fieldName == utils.META_DEFAULT {
fieldName = utils.LastUsed
}
valStr, hasVal := self[fieldName]
if !hasVal {
return nilDuration, utils.ErrNotFound
}
result, _ := utils.CastFieldIfToString(valStr)
return utils.ParseDurationWithNanosecs(result)
}
// GetSessionTTL retrieves SessionTTL setting out of SMGenericEvent
func (self SMGenericEvent) GetSessionTTL(sesTTL time.Duration,
cfgSessionTTLMaxDelay *time.Duration) time.Duration {
valIf, hasVal := self[utils.SessionTTL]
if hasVal {
ttlStr, converted := utils.CastFieldIfToString(valIf)
if !converted {
utils.Logger.Warning(
fmt.Sprintf("SMGenericEvent, cannot convert SessionTTL, disabling functionality for event: <%s>",
self.GetCGRID(utils.META_DEFAULT)))
return time.Duration(0)
}
var err error
if sesTTL, err = utils.ParseDurationWithNanosecs(ttlStr); err != nil {
utils.Logger.Warning(
fmt.Sprintf("SMGenericEvent, cannot parse SessionTTL, disabling functionality for event: <%s>",
self.GetCGRID(utils.META_DEFAULT)))
return time.Duration(0)
}
}
// Variable sessionTTL
var sessionTTLMaxDelay int64
if cfgSessionTTLMaxDelay != nil {
sessionTTLMaxDelay = cfgSessionTTLMaxDelay.Nanoseconds() / 1000000 // Milliseconds precision
}
if sesTTLMaxDelayIf, hasVal := self[utils.SessionTTLMaxDelay]; hasVal {
maxTTLDelaxStr, converted := utils.CastFieldIfToString(sesTTLMaxDelayIf)
if !converted {
utils.Logger.Warning(fmt.Sprintf("SMGenericEvent, cannot convert SessionTTLMaxDelay, disabling functionality for event: <%s>",
self.GetCGRID(utils.META_DEFAULT)))
return time.Duration(0)
}
if maxTTLDelay, err := utils.ParseDurationWithNanosecs(maxTTLDelaxStr); err != nil {
utils.Logger.Warning(fmt.Sprintf("SMGenericEvent, cannot parse SessionTTLMaxDelay, disabling functionality for event: <%s>",
self.GetCGRID(utils.META_DEFAULT)))
return time.Duration(0)
} else {
sessionTTLMaxDelay = maxTTLDelay.Nanoseconds() / 1000000
}
}
if sessionTTLMaxDelay != 0 {
rand.Seed(time.Now().Unix())
sesTTL += time.Duration(rand.Int63n(sessionTTLMaxDelay) * 1000000)
}
return sesTTL
}
// GetSessionTTLLastUsed retrieves SessionTTLLastUsed setting out of SMGenericEvent
func (self SMGenericEvent) GetSessionTTLLastUsed() *time.Duration {
valIf, hasVal := self[utils.SessionTTLLastUsed]
if !hasVal {
return nil
}
ttlStr, converted := utils.CastFieldIfToString(valIf)
if !converted {
return nil
}
if ttl, err := utils.ParseDurationWithNanosecs(ttlStr); err != nil {
return nil
} else {
return &ttl
}
}
// GetSessionTTLUsage retrieves SessionTTLUsage setting out of SMGenericEvent
func (self SMGenericEvent) GetSessionTTLUsage() *time.Duration {
valIf, hasVal := self[utils.SessionTTLUsage]
if !hasVal {
return nil
}
ttlStr, converted := utils.CastFieldIfToString(valIf)
if !converted {
return nil
}
if ttl, err := utils.ParseDurationWithNanosecs(ttlStr); err != nil {
return nil
} else {
return &ttl
}
}
func (self SMGenericEvent) GetMaxUsage(fieldName string, cfgMaxUsage time.Duration) (time.Duration, error) {
if fieldName == utils.META_DEFAULT {
fieldName = utils.Usage
}
maxUsageStr, hasIt := self[fieldName]
if !hasIt {
return cfgMaxUsage, nil
}
result, _ := utils.CastFieldIfToString(maxUsageStr)
return utils.ParseDurationWithNanosecs(result)
}
func (self SMGenericEvent) GetPdd(fieldName string) (time.Duration, error) {
if fieldName == utils.META_DEFAULT {
fieldName = utils.PDD
}
result, _ := utils.CastFieldIfToString(self[fieldName])
return utils.ParseDurationWithNanosecs(result)
}
func (self SMGenericEvent) GetSupplier(fieldName string) string {
if fieldName == utils.META_DEFAULT {
fieldName = utils.SUPPLIER
}
result, _ := utils.CastFieldIfToString(self[fieldName])
return result
}
func (self SMGenericEvent) GetDisconnectCause(fieldName string) string {
if fieldName == utils.META_DEFAULT {
fieldName = utils.DISCONNECT_CAUSE
}
result, _ := utils.CastFieldIfToString(self[fieldName])
return result
}
func (self SMGenericEvent) GetOriginatorIP(fieldName string) string {
if fieldName == utils.META_DEFAULT {
fieldName = utils.OriginHost
}
result, _ := utils.CastFieldIfToString(self[fieldName])
return result
}
func (self SMGenericEvent) GetCdrSource() string {
if self.GetName() != "" {
return utils.MetaSessionS + "_" + self.GetName()
}
return utils.MetaSessionS
}
func (self SMGenericEvent) GetExtraFields() map[string]string {
extraFields := make(map[string]string)
for key, val := range self {
primaryFields := append(utils.PrimaryCdrFields, utils.EVENT_NAME)
if utils.IsSliceMember(primaryFields, key) {
continue
}
result, _ := utils.CastFieldIfToString(val)
extraFields[key] = result
}
return extraFields
}
func (self SMGenericEvent) GetFieldAsString(fieldName string) (string, error) {
valIf, hasVal := self[fieldName]
if !hasVal {
return "", utils.ErrNotFound
}
result, converted := utils.CastFieldIfToString(valIf)
if !converted {
return "", utils.ErrNotConvertible
}
return result, nil
}
func (self SMGenericEvent) MissingParameter(timezone string) bool {
switch self.GetName() {
case utils.CGR_AUTHORIZATION:
if setupTime, err := self.GetSetupTime(utils.META_DEFAULT, timezone); err != nil || setupTime == nilTime {
return true
}
return len(self.GetAccount(utils.META_DEFAULT)) == 0 ||
len(self.GetDestination(utils.META_DEFAULT)) == 0
case utils.CGR_SESSION_START:
return false
case utils.CGR_SESSION_UPDATE:
return false
case utils.CGR_SESSION_END:
return false
case utils.CGR_LCR_REQUEST:
return false
}
return true // Unhandled event
}
func (self SMGenericEvent) ParseEventValue(rsrFld *utils.RSRField, timezone string) (parsed string, err error) {
switch rsrFld.Id {
case utils.CGRID:
rsrFld.Parse(self.GetCGRID(utils.META_DEFAULT))
case utils.ToR:
return rsrFld.Parse(utils.VOICE)
case utils.OriginID:
return rsrFld.Parse(self.GetOriginID(utils.META_DEFAULT))
case utils.OriginHost:
return rsrFld.Parse(self.GetOriginatorIP(utils.META_DEFAULT))
case utils.Source:
return rsrFld.Parse(self.GetName())
case utils.RequestType:
return rsrFld.Parse(self.GetReqType(utils.META_DEFAULT))
case utils.Direction:
return rsrFld.Parse(self.GetDirection(utils.META_DEFAULT))
case utils.Tenant:
return rsrFld.Parse(self.GetTenant(utils.META_DEFAULT))
case utils.Category:
return rsrFld.Parse(self.GetCategory(utils.META_DEFAULT))
case utils.Account:
return rsrFld.Parse(self.GetAccount(utils.META_DEFAULT))
case utils.Subject:
return rsrFld.Parse(self.GetSubject(utils.META_DEFAULT))
case utils.Destination:
return rsrFld.Parse(self.GetDestination(utils.META_DEFAULT))
case utils.SetupTime:
st, _ := self.GetSetupTime(utils.META_DEFAULT, timezone)
return rsrFld.Parse(st.String())
case utils.AnswerTime:
at, _ := self.GetAnswerTime(utils.META_DEFAULT, timezone)
return rsrFld.Parse(at.String())
case utils.Usage:
dur, _ := self.GetUsage(utils.META_DEFAULT)
return rsrFld.Parse(strconv.FormatInt(dur.Nanoseconds(), 10))
case utils.PDD:
pdd, _ := self.GetPdd(utils.META_DEFAULT)
return rsrFld.Parse(strconv.FormatFloat(pdd.Seconds(), 'f', -1, 64))
case utils.SUPPLIER:
return rsrFld.Parse(self.GetSupplier(utils.META_DEFAULT))
case utils.DISCONNECT_CAUSE:
return rsrFld.Parse(self.GetDisconnectCause(utils.META_DEFAULT))
case utils.RunID:
return rsrFld.Parse(utils.META_DEFAULT)
case utils.COST:
return rsrFld.Parse(strconv.FormatFloat(-1, 'f', -1, 64)) // Recommended to use FormatCost
default:
return rsrFld.Parse(self[rsrFld.Id])
}
return
}
func (self SMGenericEvent) PassesFieldFilter(*utils.RSRField) (bool, string) {
return true, ""
}
func (self SMGenericEvent) AsCDR(cfg *config.CGRConfig, timezone string) *engine.CDR {
storCdr := engine.NewCDRWithDefaults(cfg)
storCdr.CGRID = self.GetCGRID(utils.META_DEFAULT)
storCdr.ToR = utils.FirstNonEmpty(self.GetTOR(utils.META_DEFAULT),
storCdr.ToR) // Keep default if none in the event
storCdr.OriginID = self.GetOriginID(utils.META_DEFAULT)
storCdr.OriginHost = self.GetOriginatorIP(utils.META_DEFAULT)
storCdr.Source = self.GetCdrSource()
storCdr.RequestType = utils.FirstNonEmpty(self.GetReqType(utils.META_DEFAULT),
storCdr.RequestType)
storCdr.Tenant = utils.FirstNonEmpty(self.GetTenant(utils.META_DEFAULT),
storCdr.Tenant)
storCdr.Category = utils.FirstNonEmpty(self.GetCategory(utils.META_DEFAULT),
storCdr.Category)
storCdr.Account = self.GetAccount(utils.META_DEFAULT)
storCdr.Subject = utils.FirstNonEmpty(self.GetSubject(utils.META_DEFAULT),
self.GetAccount(utils.META_DEFAULT))
storCdr.Destination = self.GetDestination(utils.META_DEFAULT)
storCdr.SetupTime, _ = self.GetSetupTime(utils.META_DEFAULT, timezone)
storCdr.AnswerTime, _ = self.GetAnswerTime(utils.META_DEFAULT, timezone)
storCdr.Usage, _ = self.GetUsage(utils.META_DEFAULT)
storCdr.ExtraFields = self.GetExtraFields()
storCdr.Cost = -1
return storCdr
}
func (self SMGenericEvent) String() string {
jsn, _ := json.Marshal(self)
return string(jsn)
}
func (self SMGenericEvent) ComputeLcr() bool {
computeLcr, _ := self[utils.COMPUTE_LCR].(bool)
return computeLcr
}
func (self SMGenericEvent) AsLcrRequest() *engine.LcrRequest {
setupTimeStr, _ := utils.CastFieldIfToString(self[utils.SetupTime])
usageStr, _ := utils.CastFieldIfToString(self[utils.Usage])
return &engine.LcrRequest{
Direction: self.GetDirection(utils.META_DEFAULT),
Tenant: self.GetTenant(utils.META_DEFAULT),
Category: self.GetCategory(utils.META_DEFAULT),
Account: self.GetAccount(utils.META_DEFAULT),
Subject: self.GetSubject(utils.META_DEFAULT),
Destination: self.GetDestination(utils.META_DEFAULT),
SetupTime: utils.FirstNonEmpty(setupTimeStr),
Duration: usageStr,
}
}
// AsMapStringString Converts into map[string]string, used for example as pubsub event
func (self SMGenericEvent) AsMapStringString() (map[string]string, error) {
mp := make(map[string]string, len(self))
for k, v := range self {
if strV, casts := utils.CastIfToString(v); !casts {
return nil, fmt.Errorf("Value %+v does not cast to string", v)
} else {
mp[k] = strV
}
}
return mp, nil
}
func (self SMGenericEvent) Clone() SMGenericEvent {
evOut := make(SMGenericEvent, len(self))
for key, val := range self {
evOut[key] = val
}
return evOut
}

View File

@@ -1,208 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package sessions
import (
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var cfg, _ = config.NewDefaultCGRConfig()
var err error
func TestSMGenericEventParseFields(t *testing.T) {
smGev := SMGenericEvent{}
smGev[utils.EVENT_NAME] = "TEST_EVENT"
smGev[utils.ToR] = "*voice"
smGev[utils.OriginID] = "12345"
smGev[utils.Account] = "account1"
smGev[utils.Subject] = "subject1"
smGev[utils.Destination] = "+4986517174963"
smGev[utils.Category] = "call"
smGev[utils.Tenant] = "cgrates.org"
smGev[utils.RequestType] = "*prepaid"
smGev[utils.SetupTime] = "2015-11-09 14:21:24"
smGev[utils.AnswerTime] = "2015-11-09 14:22:02"
smGev[utils.Usage] = "1m23s"
smGev[utils.LastUsed] = "21s"
smGev[utils.OriginHost] = "127.0.0.1"
smGev["Extra1"] = "Value1"
smGev["Extra2"] = 5
if smGev.GetName() != "TEST_EVENT" {
t.Error("Unexpected: ", smGev.GetName())
}
if smGev.GetCGRID(utils.META_DEFAULT) != "cade401f46f046311ed7f62df3dfbb84adb98aad" {
t.Error("Unexpected: ", smGev.GetCGRID(utils.META_DEFAULT))
}
if smGev.GetOriginID(utils.META_DEFAULT) != "12345" {
t.Error("Unexpected: ", smGev.GetOriginID(utils.META_DEFAULT))
}
if !reflect.DeepEqual(smGev.GetSessionIds(), []string{"12345"}) {
t.Error("Unexpected: ", smGev.GetSessionIds())
}
if smGev.GetTOR(utils.META_DEFAULT) != "*voice" {
t.Error("Unexpected: ", smGev.GetTOR(utils.META_DEFAULT))
}
if smGev.GetAccount(utils.META_DEFAULT) != "account1" {
t.Error("Unexpected: ", smGev.GetAccount(utils.META_DEFAULT))
}
if smGev.GetSubject(utils.META_DEFAULT) != "subject1" {
t.Error("Unexpected: ", smGev.GetSubject(utils.META_DEFAULT))
}
if smGev.GetDestination(utils.META_DEFAULT) != "+4986517174963" {
t.Error("Unexpected: ", smGev.GetDestination(utils.META_DEFAULT))
}
if smGev.GetCategory(utils.META_DEFAULT) != "call" {
t.Error("Unexpected: ", smGev.GetCategory(utils.META_DEFAULT))
}
if smGev.GetTenant(utils.META_DEFAULT) != "cgrates.org" {
t.Error("Unexpected: ", smGev.GetTenant(utils.META_DEFAULT))
}
if smGev.GetReqType(utils.META_DEFAULT) != "*prepaid" {
t.Error("Unexpected: ", smGev.GetReqType(utils.META_DEFAULT))
}
if st, err := smGev.GetSetupTime(utils.META_DEFAULT, "UTC"); err != nil {
t.Error(err)
} else if !st.Equal(time.Date(2015, 11, 9, 14, 21, 24, 0, time.UTC)) {
t.Error("Unexpected: ", st)
}
if at, err := smGev.GetAnswerTime(utils.META_DEFAULT, "UTC"); err != nil {
t.Error(err)
} else if !at.Equal(time.Date(2015, 11, 9, 14, 22, 2, 0, time.UTC)) {
t.Error("Unexpected: ", at)
}
if et, err := smGev.GetEndTime(utils.META_DEFAULT, "UTC"); err != nil {
t.Error(err)
} else if !et.Equal(time.Date(2015, 11, 9, 14, 23, 25, 0, time.UTC)) {
t.Error("Unexpected: ", et)
}
if dur, err := smGev.GetUsage(utils.META_DEFAULT); err != nil {
t.Error(err)
} else if dur != time.Duration(83)*time.Second {
t.Error("Unexpected: ", dur)
}
if lastUsed, err := smGev.GetLastUsed(utils.META_DEFAULT); err != nil {
t.Error(err)
} else if lastUsed != time.Duration(21)*time.Second {
t.Error("Unexpected: ", lastUsed)
}
if smGev.GetOriginatorIP(utils.META_DEFAULT) != "127.0.0.1" {
t.Error("Unexpected: ", smGev.GetOriginatorIP(utils.META_DEFAULT))
}
if extrFlds := smGev.GetExtraFields(); !reflect.DeepEqual(extrFlds,
map[string]string{"Extra1": "Value1", "Extra2": "5", "LastUsed": "21s"}) {
t.Error("Unexpected: ", extrFlds)
}
}
func TestSMGenericEventGetSessionTTL(t *testing.T) {
smGev := SMGenericEvent{}
smGev[utils.EVENT_NAME] = "TEST_SESSION_TTL"
cfgSesTTL := time.Duration(5 * time.Second)
if sTTL := smGev.GetSessionTTL(time.Duration(5*time.Second), nil); sTTL != cfgSesTTL {
t.Errorf("Expecting: %v, received: %v", cfgSesTTL, sTTL)
}
smGev[utils.SessionTTL] = "6s"
eSesTTL := time.Duration(6 * time.Second)
if sTTL := smGev.GetSessionTTL(time.Duration(5*time.Second), nil); sTTL != eSesTTL {
t.Errorf("Expecting: %v, received: %v", eSesTTL, sTTL)
}
sesTTLMaxDelay := time.Duration(10 * time.Second)
if sTTL := smGev.GetSessionTTL(time.Duration(5*time.Second), &sesTTLMaxDelay); sTTL == eSesTTL || sTTL > eSesTTL+sesTTLMaxDelay {
t.Errorf("Received: %v", sTTL)
}
}
func TestSMGenericEventAsCDR(t *testing.T) {
smGev := SMGenericEvent{}
smGev[utils.EVENT_NAME] = "TEST_EVENT"
smGev[utils.ToR] = utils.SMS
smGev[utils.OriginID] = "12345"
smGev[utils.Account] = "account1"
smGev[utils.Subject] = "subject1"
smGev[utils.Destination] = "+4986517174963"
smGev[utils.Category] = "call"
smGev[utils.Tenant] = "cgrates.org"
smGev[utils.RequestType] = utils.META_PREPAID
smGev[utils.SetupTime] = "2015-11-09 14:21:24"
smGev[utils.AnswerTime] = "2015-11-09 14:22:02"
smGev[utils.Usage] = "1m23s"
smGev[utils.OriginHost] = "10.0.3.15"
smGev["Extra1"] = "Value1"
smGev["Extra2"] = 5
eStoredCdr := &engine.CDR{CGRID: "70c4d16dce41d1f2777b4e8442cff39cf87f5f19",
ToR: utils.SMS, OriginID: "12345", OriginHost: "10.0.3.15", Source: "*sessions_TEST_EVENT",
RequestType: utils.META_PREPAID,
Tenant: "cgrates.org", Category: "call", Account: "account1", Subject: "subject1",
Destination: "+4986517174963", SetupTime: time.Date(2015, 11, 9, 14, 21, 24, 0, time.UTC),
AnswerTime: time.Date(2015, 11, 9, 14, 22, 2, 0, time.UTC),
Usage: time.Duration(83) * time.Second,
ExtraFields: map[string]string{"Extra1": "Value1", "Extra2": "5"}, Cost: -1}
if storedCdr := smGev.AsCDR(cfg, "UTC"); !reflect.DeepEqual(eStoredCdr, storedCdr) {
t.Errorf("Expecting: %+v, received: %+v", eStoredCdr, storedCdr)
}
}
func TestSMGenericEventAsLcrRequest(t *testing.T) {
smGev := SMGenericEvent{}
smGev[utils.EVENT_NAME] = "TEST_EVENT"
smGev[utils.ToR] = utils.VOICE
smGev[utils.OriginID] = "12345"
smGev[utils.Direction] = utils.OUT
smGev[utils.Account] = "account1"
smGev[utils.Subject] = "subject1"
smGev[utils.Destination] = "+4986517174963"
smGev[utils.Category] = "call"
smGev[utils.Tenant] = "cgrates.org"
smGev[utils.RequestType] = utils.META_PREPAID
smGev[utils.SetupTime] = "2015-11-09 14:21:24"
smGev[utils.AnswerTime] = "2015-11-09 14:22:02"
smGev[utils.Usage] = "1m23s"
smGev[utils.PDD] = "300ms"
smGev[utils.SUPPLIER] = "supplier1"
smGev[utils.DISCONNECT_CAUSE] = "NORMAL_DISCONNECT"
smGev[utils.OriginHost] = "10.0.3.15"
smGev["Extra1"] = "Value1"
smGev["Extra2"] = 5
eLcrReq := &engine.LcrRequest{Direction: utils.OUT, Tenant: "cgrates.org", Category: "call",
Account: "account1", Subject: "subject1", Destination: "+4986517174963", SetupTime: "2015-11-09 14:21:24", Duration: "1m23s"}
if lcrReq := smGev.AsLcrRequest(); !reflect.DeepEqual(eLcrReq, lcrReq) {
t.Errorf("Expecting: %+v, received: %+v", eLcrReq, lcrReq)
}
}
func TestSMGenericEventGetFieldAsString(t *testing.T) {
smGev := SMGenericEvent{}
smGev[utils.EVENT_NAME] = "TEST_EVENT"
smGev[utils.ToR] = utils.VOICE
smGev[utils.OriginID] = "12345"
smGev[utils.Direction] = utils.OUT
smGev[utils.Account] = "account1"
smGev[utils.Subject] = "subject1"
eFldVal := utils.VOICE
if strVal, err := smGev.GetFieldAsString(utils.ToR); err != nil {
t.Error(err)
} else if strVal != eFldVal {
t.Errorf("Expecting:\n%s\nReceived:\n%s", eFldVal, strVal)
}
}

View File

@@ -22,6 +22,7 @@ import (
"testing"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -35,7 +36,7 @@ func init() {
func TestSMGSessionIndexing(t *testing.T) {
smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC")
smGev := SMGenericEvent{
smGev := engine.NewSafEvent(map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.ToR: "*voice",
utils.OriginID: "12345",
@@ -57,10 +58,11 @@ func TestSMGSessionIndexing(t *testing.T) {
"Extra1": "Value1",
"Extra2": 5,
"Extra3": "",
}
})
// Index first session
smgSession := &SMGSession{CGRID: smGev.GetCGRID(utils.META_DEFAULT), RunID: utils.META_DEFAULT, EventStart: smGev}
cgrID := smGev.GetCGRID(utils.META_DEFAULT)
smgSession := &SMGSession{CGRID: GetSetCGRID(smGev),
RunID: utils.META_DEFAULT, EventStart: smGev}
cgrID := GetSetCGRID(smGev)
smg.indexSession(smgSession, false)
eIndexes := map[string]map[string]map[string]utils.StringMap{
"OriginID": map[string]map[string]utils.StringMap{
@@ -100,7 +102,8 @@ func TestSMGSessionIndexing(t *testing.T) {
},
}
if !reflect.DeepEqual(eIndexes, smg.aSessionsIndex) {
t.Errorf("Expecting: %+v, received: %+v", eIndexes, smg.aSessionsIndex)
t.Errorf("Expecting: %s, received: %s",
utils.ToJSON(eIndexes), utils.ToJSON(smg.aSessionsIndex))
}
eRIdxes := map[string][]*riFieldNameVal{
cgrID: []*riFieldNameVal{
@@ -116,7 +119,7 @@ func TestSMGSessionIndexing(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", eRIdxes, smg.aSessionsRIndex)
}
// Index second session
smGev2 := SMGenericEvent{
smGev2 := engine.NewSafEvent(map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT2",
utils.OriginID: "12346",
utils.Direction: "*out",
@@ -125,19 +128,21 @@ func TestSMGSessionIndexing(t *testing.T) {
utils.Tenant: "itsyscom.com",
"Extra3": "",
"Extra4": "info2",
}
cgrID2 := smGev2.GetCGRID(utils.META_DEFAULT)
smgSession2 := &SMGSession{CGRID: smGev2.GetCGRID(utils.META_DEFAULT), RunID: utils.META_DEFAULT, EventStart: smGev2}
})
cgrID2 := GetSetCGRID(smGev2)
smgSession2 := &SMGSession{CGRID: cgrID2,
RunID: utils.META_DEFAULT, EventStart: smGev2}
smg.indexSession(smgSession2, false)
smGev3 := SMGenericEvent{
smGev3 := engine.NewSafEvent(map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT3",
utils.Tenant: "cgrates.org",
utils.OriginID: "12347",
utils.Account: "account2",
"Extra5": "info5",
}
cgrID3 := smGev3.GetCGRID(utils.META_DEFAULT)
smgSession3 := &SMGSession{CGRID: smGev3.GetCGRID(utils.META_DEFAULT), RunID: "secondRun", EventStart: smGev3}
})
cgrID3 := GetSetCGRID(smGev3)
smgSession3 := &SMGSession{CGRID: cgrID3,
RunID: "secondRun", EventStart: smGev3}
smg.indexSession(smgSession3, false)
eIndexes = map[string]map[string]map[string]utils.StringMap{
"OriginID": map[string]map[string]utils.StringMap{
@@ -390,7 +395,7 @@ func TestSMGSessionIndexing(t *testing.T) {
func TestSMGActiveSessions(t *testing.T) {
smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC")
smGev1 := SMGenericEvent{
smGev1 := engine.NewSafEvent(map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.ToR: "*voice",
utils.OriginID: "111",
@@ -412,9 +417,10 @@ func TestSMGActiveSessions(t *testing.T) {
"Extra1": "Value1",
"Extra2": 5,
"Extra3": "",
}
smg.recordASession(&SMGSession{CGRID: smGev1.GetCGRID(utils.META_DEFAULT), RunID: utils.META_DEFAULT, EventStart: smGev1})
smGev2 := SMGenericEvent{
})
smg.recordASession(&SMGSession{CGRID: GetSetCGRID(smGev1),
RunID: utils.META_DEFAULT, EventStart: smGev1})
smGev2 := engine.NewSafEvent(map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.ToR: "*voice",
utils.OriginID: "222",
@@ -433,8 +439,9 @@ func TestSMGActiveSessions(t *testing.T) {
utils.OriginHost: "127.0.0.1",
"Extra1": "Value1",
"Extra3": "extra3",
}
smg.recordASession(&SMGSession{CGRID: smGev2.GetCGRID(utils.META_DEFAULT), RunID: utils.META_DEFAULT, EventStart: smGev2})
})
smg.recordASession(&SMGSession{CGRID: GetSetCGRID(smGev2),
RunID: utils.META_DEFAULT, EventStart: smGev2})
if aSessions, _, err := smg.asActiveSessions(nil, false, false); err != nil {
t.Error(err)
} else if len(aSessions) != 2 {
@@ -467,7 +474,7 @@ func TestGetPassiveSessions(t *testing.T) {
if pSS := smg.getSessions("", true); len(pSS) != 0 {
t.Errorf("PassiveSessions: %+v", pSS)
}
smGev1 := SMGenericEvent{
smGev1 := engine.NewSafEvent(map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.ToR: "*voice",
utils.OriginID: "12345",
@@ -489,12 +496,14 @@ func TestGetPassiveSessions(t *testing.T) {
"Extra1": "Value1",
"Extra2": 5,
"Extra3": "",
}
})
// Index first session
smgSession11 := &SMGSession{CGRID: smGev1.GetCGRID(utils.META_DEFAULT), EventStart: smGev1, RunID: utils.META_DEFAULT}
smgSession12 := &SMGSession{CGRID: smGev1.GetCGRID(utils.META_DEFAULT), EventStart: smGev1, RunID: "second_run"}
smgSession11 := &SMGSession{CGRID: GetSetCGRID(smGev1),
EventStart: smGev1, RunID: utils.META_DEFAULT}
smgSession12 := &SMGSession{CGRID: GetSetCGRID(smGev1),
EventStart: smGev1, RunID: "second_run"}
smg.passiveSessions[smgSession11.CGRID] = []*SMGSession{smgSession11, smgSession12}
smGev2 := SMGenericEvent{
smGev2 := engine.NewSafEvent(map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.ToR: "*voice",
utils.OriginID: "23456",
@@ -516,11 +525,12 @@ func TestGetPassiveSessions(t *testing.T) {
"Extra1": "Value1",
"Extra2": 5,
"Extra3": "",
}
})
if pSS := smg.getSessions("", true); len(pSS) != 1 {
t.Errorf("PassiveSessions: %+v", pSS)
}
smgSession21 := &SMGSession{CGRID: smGev2.GetCGRID(utils.META_DEFAULT), EventStart: smGev2, RunID: utils.META_DEFAULT}
smgSession21 := &SMGSession{CGRID: GetSetCGRID(smGev2),
EventStart: smGev2, RunID: utils.META_DEFAULT}
smg.passiveSessions[smgSession21.CGRID] = []*SMGSession{smgSession21}
if pSS := smg.getSessions("", true); len(pSS) != 2 {
t.Errorf("PassiveSessions: %+v", pSS)

View File

@@ -145,6 +145,10 @@ func NewErrStringCast(valIface interface{}) error {
return fmt.Errorf("cannot cast value: %v to string", valIface)
}
func NewErrFldStringCast(fldName string, valIface interface{}) error {
return fmt.Errorf("cannot cast field: %s with value: %v to string", fldName, valIface)
}
func ErrHasPrefix(err error, prfx string) (has bool) {
if err == nil {
return