First cut of KamailioAgent

This commit is contained in:
DanB
2018-01-26 20:17:56 +01:00
parent dd2d330f58
commit 72ed8dcb0d
16 changed files with 594 additions and 990 deletions

View File

@@ -21,15 +21,12 @@ package agents
import (
"errors"
"fmt"
"reflect"
"strings"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/sessionmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/fsock"
"github.com/cgrates/rpcclient"
)
func NewFSSessionManager(fsAgentConfig *config.FsAgentConfig,
@@ -222,12 +219,6 @@ func (sm *FSSessionManager) onChannelAnswer(fsev FSEvent, connId string) {
sm.disconnectSession(connId, chanUUID, "", utils.ErrServerError.Error())
return
}
if initSessionArgs.AllocateResources {
if initReply.ResourceAllocation == nil {
sm.disconnectSession(connId, chanUUID, "",
utils.ErrUnallocatedResource.Error())
}
}
}
func (sm *FSSessionManager) onChannelHangupComplete(fsev FSEvent, connId string) {
@@ -241,14 +232,13 @@ func (sm *FSSessionManager) onChannelHangupComplete(fsev FSEvent, connId string)
utils.Logger.Err(
fmt.Sprintf("<%s> Could not terminate session with event %s, error: %s",
utils.FreeSWITCHAgent, fsev.GetUUID(), err.Error()))
return
}
}
if sm.cfg.CreateCdr {
cdr := fsev.AsCDR(sm.timezone)
if err := sm.smg.Call(utils.SessionSv1ProcessCDR, cdr, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Failed processing CDR, cgrid: %s, accid: %s, error: <%s>",
utils.FreeSWITCHAgent, cdr.CGRID, cdr.OriginID, err.Error()))
utils.Logger.Err(fmt.Sprintf("<%s> Failed processing CDR: %s, error: <%s>",
utils.FreeSWITCHAgent, utils.ToJSON(cdr), err.Error()))
}
}
}
@@ -342,30 +332,8 @@ func (sm *FSSessionManager) Shutdown() (err error) {
}
// rpcclient.RpcClientConnection interface
func (fsa *FSSessionManager) Call(serviceMethod string, args interface{}, reply interface{}) error {
parts := strings.Split(serviceMethod, ".")
if len(parts) != 2 {
return rpcclient.ErrUnsupporteServiceMethod
}
// get method
method := reflect.ValueOf(fsa).MethodByName(parts[0][len(parts[0])-2:] + parts[1]) // Inherit the version in the method
if !method.IsValid() {
return rpcclient.ErrUnsupporteServiceMethod
}
// construct the params
params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
ret := method.Call(params)
if len(ret) != 1 {
return utils.ErrServerError
}
if ret[0].Interface() == nil {
return nil
}
err, ok := ret[0].Interface().(error)
if !ok {
return utils.ErrServerError
}
return err
func (sm *FSSessionManager) Call(serviceMethod string, args interface{}, reply interface{}) error {
return utils.APIerRPCCall(sm, serviceMethod, args, reply)
}
// Internal method to disconnect session in asterisk

View File

@@ -67,14 +67,10 @@ const (
IGNOREPARK = "variable_cgr_ignorepark"
FS_VARPREFIX = "variable_"
VarCGRSubsystems = "variable_cgr_subsystems"
SubSAccountS = "accounts"
SubSSupplierS = "suppliers"
SubSResourceS = "resources"
SubSAttributeS = "attributes"
CGRResourceAllocation = "cgr_resource_allocation"
VAR_CGR_DISCONNECT_CAUSE = "variable_" + utils.CGR_DISCONNECT_CAUSE
VAR_CGR_CMPUTELCR = "variable_" + utils.CGR_COMPUTELCR
FsConnID = "FsConnID" // used to share connID info in event
FsConnID = "FsConnID" // used to share connID info in event for remote disconnects
VarAnswerEpoch = "variable_answer_epoch"
)
@@ -386,22 +382,22 @@ func (fsev FSEvent) V1AuthorizeArgs() (args *sessionmanager.V1AuthorizeArgs) {
if !has {
return
}
if strings.Index(subsystems, SubSAccountS) == -1 {
if strings.Index(subsystems, utils.MetaAccounts) == -1 {
args.GetMaxUsage = false
}
if strings.Index(subsystems, SubSResourceS) != -1 {
if strings.Index(subsystems, utils.MetaResources) != -1 {
args.AuthorizeResources = true
}
if strings.Index(subsystems, SubSSupplierS) != -1 {
if strings.Index(subsystems, utils.MetaSuppliers) != -1 {
args.GetSuppliers = true
}
if strings.Index(subsystems, SubSAttributeS) != -1 {
if strings.Index(subsystems, utils.MetaAttributes) != -1 {
args.GetAttributes = true
}
return
}
// V2InitSessionArgs returns the arguments used in SMGv1.InitSession
// V1InitSessionArgs returns the arguments used in SessionSv1.InitSession
func (fsev FSEvent) V1InitSessionArgs() (args *sessionmanager.V1InitSessionArgs) {
args = &sessionmanager.V1InitSessionArgs{ // defaults
InitSession: true,
@@ -415,13 +411,13 @@ func (fsev FSEvent) V1InitSessionArgs() (args *sessionmanager.V1InitSessionArgs)
if !has {
return
}
if strings.Index(subsystems, SubSAccountS) == -1 {
if strings.Index(subsystems, utils.MetaAccounts) == -1 {
args.InitSession = false
}
if strings.Index(subsystems, SubSResourceS) != -1 {
if strings.Index(subsystems, utils.MetaResources) != -1 {
args.AllocateResources = true
}
if strings.Index(subsystems, SubSAttributeS) != -1 {
if strings.Index(subsystems, utils.MetaAttributes) != -1 {
args.GetAttributes = true
}
return
@@ -441,10 +437,10 @@ func (fsev FSEvent) V1TerminateSessionArgs() (args *sessionmanager.V1TerminateSe
if !has {
return
}
if strings.Index(subsystems, SubSAccountS) == -1 {
if strings.Index(subsystems, utils.MetaAccounts) == -1 {
args.TerminateSession = false
}
if strings.Index(subsystems, SubSResourceS) != -1 {
if strings.Index(subsystems, utils.MetaResources) != -1 {
args.ReleaseResources = true
}
return

200
agents/kamagent.go Normal file
View File

@@ -0,0 +1,200 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package agents
import (
"fmt"
"log"
"regexp"
"strings"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/sessionmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/kamevapi"
)
func NewKamailioAgent(kaCfg *config.KamAgentCfg,
sessionS *utils.BiRPCInternalClient, timezone string) (ka *KamailioAgent) {
ka = &KamailioAgent{cfg: kaCfg, sessionS: sessionS,
timezone: timezone,
conns: make(map[string]*kamevapi.KamEvapi)}
ka.sessionS.SetClientConn(ka) // pass the connection to KA back into smg so we can receive the disconnects
return
}
type KamailioAgent struct {
cfg *config.KamAgentCfg
sessionS *utils.BiRPCInternalClient
timezone string
conns map[string]*kamevapi.KamEvapi
}
func (self *KamailioAgent) Connect() error {
var err error
eventHandlers := map[*regexp.Regexp][]func([]byte, string){
regexp.MustCompile(CGR_AUTH_REQUEST): []func([]byte, string){
self.onCgrAuth},
regexp.MustCompile(CGR_CALL_START): []func([]byte, string){
self.onCallStart},
regexp.MustCompile(CGR_CALL_END): []func([]byte, string){self.onCallEnd},
}
errChan := make(chan error)
for _, connCfg := range self.cfg.EvapiConns {
connID := utils.GenUUID()
logger := log.New(utils.Logger, "kamevapi:", 2)
if self.conns[connID], err = kamevapi.NewKamEvapi(connCfg.Address, connID, connCfg.Reconnects, eventHandlers, logger); err != nil {
return err
}
go func() { // Start reading in own goroutine, return on error
if err := self.conns[connID].ReadEvents(); err != nil {
errChan <- err
}
}()
}
err = <-errChan // Will keep the Connect locked until the first error in one of the connections
return err
}
func (self *KamailioAgent) Shutdown() error {
return nil
}
// rpcclient.RpcClientConnection interface
func (ka *KamailioAgent) Call(serviceMethod string, args interface{}, reply interface{}) error {
return utils.APIerRPCCall(ka, serviceMethod, args, reply)
}
// onCgrAuth is called when new event of type CGR_AUTH_REQUEST is coming
func (ka *KamailioAgent) onCgrAuth(evData []byte, connID string) {
kev, err := NewKamEvent(evData)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> unmarshalling event data: %s, error: %s",
utils.KamailioAgent, evData, err.Error()))
return
}
if kev[utils.RequestType] == utils.META_NONE { // Do not process this request
return
}
if kev.MissingParameter() {
if kRply, err := kev.AsKamAuthReply(nil, nil, utils.ErrMandatoryIeMissing); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed building auth reply for event: %s, error: %s",
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
} else if err = ka.conns[connID].Send(kRply.String()); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed sending auth reply for event: %s, error %s",
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
}
return
}
authArgs := kev.V1AuthorizeArgs()
var authReply sessionmanager.V1AuthorizeReply
err = ka.sessionS.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authReply)
if kar, err := kev.AsKamAuthReply(authArgs, &authReply, err); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed building auth reply for event: %s, error: %s",
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
} else if err = ka.conns[connID].Send(kar.String()); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed sending auth reply for event: %s, error: %s",
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
}
}
func (ka *KamailioAgent) onCallStart(evData []byte, connID string) {
kev, err := NewKamEvent(evData)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> unmarshalling event: %s, error: %s",
utils.KamailioAgent, evData, err.Error()))
return
}
if kev[utils.RequestType] == utils.META_NONE { // Do not process this request
return
}
if kev.MissingParameter() {
ka.disconnectSession(connID,
NewKamSessionDisconnect(kev[KamHashEntry], kev[KamHashID],
utils.ErrMandatoryIeMissing.Error()))
}
initSessionArgs := kev.V1InitSessionArgs()
initSessionArgs.CGREvent.Event[EvapiConnID] = connID // Attach the connection ID so we can properly disconnect later
var initReply sessionmanager.V1InitSessionReply
if err := ka.sessionS.Call(utils.SessionSv1InitiateSession,
initSessionArgs, &initReply); err != nil {
utils.Logger.Err(
fmt.Sprintf("<%s> could not process answer for event %s, error: %s",
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
ka.disconnectSession(connID,
NewKamSessionDisconnect(kev[KamHashEntry], kev[KamHashID],
utils.ErrServerError.Error()))
return
}
}
func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) {
kev, err := NewKamEvent(evData)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> unmarshalling event: %s, error: %s",
utils.KamailioAgent, evData, err.Error()))
return
}
if kev[utils.RequestType] == utils.META_NONE { // Do not process this request
return
}
if kev.MissingParameter() {
utils.Logger.Err(fmt.Sprintf("<%s> mandatory IE missing out from event: %s",
utils.KamailioAgent, kev[utils.OriginID]))
return
}
var reply string
if err := ka.sessionS.Call(utils.SessionSv1TerminateSession,
kev.V1TerminateSessionArgs(), &reply); err != nil {
utils.Logger.Err(
fmt.Sprintf("<%s> could not terminate session with event %s, error: %s",
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
// no return here since we want CDR anyhow
}
if ka.cfg.CreateCdr || strings.Index(kev[KamCGRSubsystems], utils.MetaCDRs) != -1 {
cdr := kev.AsCDR(ka.timezone)
if err := ka.sessionS.Call(utils.SessionSv1ProcessCDR, cdr, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("%s> failed processing CDR: %s, error: %s",
utils.KamailioAgent, utils.ToJSON(cdr), err.Error()))
}
}
}
func (self *KamailioAgent) disconnectSession(connID string, dscEv *KamSessionDisconnect) error {
if err := self.conns[connID].Send(dscEv.String()); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed sending disconnect request: %s, connection id: %s, error %s",
utils.KamailioAgent, utils.ToJSON(dscEv), err.Error(), connID))
return err
}
return nil
}
// Internal method to disconnect session in Kamailio
func (ka *KamailioAgent) V1DisconnectSession(args utils.AttrDisconnectSession, reply *string) (err error) {
hEntry, _ := utils.CastFieldIfToString(args.EventStart[KamHashEntry])
hID, _ := utils.CastFieldIfToString(args.EventStart[KamHashID])
connID, _ := utils.CastFieldIfToString(args.EventStart[EvapiConnID])
if err = ka.disconnectSession(connID,
NewKamSessionDisconnect(hEntry, hID,
utils.ErrInsufficientCredit.Error())); err != nil {
return
}
*reply = utils.OK
return
}

282
agents/kamevent.go Normal file
View File

@@ -0,0 +1,282 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package agents
import (
"encoding/json"
"strings"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessionmanager"
"github.com/cgrates/cgrates/utils"
)
const (
EVENT = "event"
CGR_AUTH_REQUEST = "CGR_AUTH_REQUEST"
CGR_AUTH_REPLY = "CGR_AUTH_REPLY"
CGR_SESSION_DISCONNECT = "CGR_SESSION_DISCONNECT"
CGR_CALL_START = "CGR_CALL_START"
CGR_CALL_END = "CGR_CALL_END"
KamTRIndex = "tr_index"
KamTRLabel = "tr_label"
KamHashEntry = "h_entry"
KamHashID = "h_id"
KamCGRSubsystems = "cgr_subsystems"
EvapiConnID = "EvapiConnID" // used to share connID info in event for remote disconnects
)
var kamReservedFields = []string{EVENT, KamTRIndex, KamTRLabel,
KamHashEntry, KamHashID, KamCGRSubsystems}
func NewKamSessionDisconnect(hEntry, hID, reason string) *KamSessionDisconnect {
return &KamSessionDisconnect{
Event: CGR_SESSION_DISCONNECT,
HashEntry: hEntry,
HashId: hID,
Reason: reason}
}
type KamSessionDisconnect struct {
Event string
HashEntry string
HashId string
Reason string
}
func (self *KamSessionDisconnect) String() string {
mrsh, _ := json.Marshal(self)
return string(mrsh)
}
// NewKamEvent parses bytes received over the wire from Kamailio into KamEvent
func NewKamEvent(kamEvData []byte) (KamEvent, error) {
kev := make(map[string]string)
if err := json.Unmarshal(kamEvData, &kev); err != nil {
return nil, err
}
return kev, nil
}
// KamEvent represents one event received from Kamailio
type KamEvent map[string]string
func (kev KamEvent) MissingParameter() bool {
switch kev[EVENT] {
case CGR_AUTH_REQUEST:
return utils.IsSliceMember([]string{
kev[KamTRIndex],
kev[KamTRLabel],
kev[utils.SetupTime],
kev[utils.Account],
kev[utils.Destination],
}, "")
case CGR_CALL_START:
return utils.IsSliceMember([]string{
kev[KamHashEntry],
kev[KamHashID],
kev[utils.OriginID],
kev[utils.AnswerTime],
kev[utils.Account],
kev[utils.Destination],
}, "")
case CGR_CALL_END:
return utils.IsSliceMember([]string{
kev[KamHashEntry],
kev[KamHashID],
kev[utils.OriginID],
kev[utils.AnswerTime],
kev[utils.Account],
kev[utils.Destination],
}, "")
default: // no/unsupported event
return true
}
}
// AsMapStringIface converts KamEvent into event used by other subsystems
func (kev KamEvent) AsMapStringInterface() (mp map[string]interface{}) {
mp = make(map[string]interface{})
for k, v := range kev {
if !utils.IsSliceMember(kamReservedFields, k) { // reserved attributes not getting into event
mp[k] = v
}
}
return
}
// AsCDR converts KamEvent into CDR
func (kev KamEvent) AsCDR(timezone string) (cdr *engine.CDR) {
cdr = new(engine.CDR)
for fld, val := range kev { // first ExtraFields so we can overwrite
if !utils.IsSliceMember(utils.PrimaryCdrFields, fld) &&
!utils.IsSliceMember(kamReservedFields, fld) {
cdr.ExtraFields[fld] = val
}
}
cdr.ToR = utils.VOICE
cdr.OriginID = kev[utils.OriginID]
cdr.OriginHost = kev[utils.OriginHost]
cdr.Source = "KamailioEvent"
cdr.RequestType = utils.FirstNonEmpty(kev[utils.RequestType], config.CgrConfig().DefaultReqType)
cdr.Tenant = utils.FirstNonEmpty(kev[utils.Tenant], config.CgrConfig().DefaultTenant)
cdr.Category = utils.FirstNonEmpty(kev[utils.Category], config.CgrConfig().DefaultCategory)
cdr.Account = kev[utils.Account]
cdr.Subject = kev[utils.Subject]
cdr.Destination = kev[utils.Destination]
cdr.SetupTime, _ = utils.ParseTimeDetectLayout(kev[utils.SetupTime], timezone)
cdr.AnswerTime, _ = utils.ParseTimeDetectLayout(kev[utils.AnswerTime], timezone)
cdr.Usage, _ = utils.ParseDurationWithSecs(kev[utils.Usage])
cdr.Cost = -1
return cdr
}
// String is used for pretty printing event in logs
func (kev KamEvent) String() string {
mrsh, _ := json.Marshal(kev)
return string(mrsh)
}
func (kev KamEvent) V1AuthorizeArgs() (args *sessionmanager.V1AuthorizeArgs) {
args = &sessionmanager.V1AuthorizeArgs{
GetMaxUsage: true,
CGREvent: utils.CGREvent{
Tenant: utils.FirstNonEmpty(kev[utils.Tenant],
config.CgrConfig().DefaultTenant),
ID: utils.UUIDSha1Prefix(),
Event: kev.AsMapStringInterface(),
},
}
subsystems, has := kev[KamCGRSubsystems]
if !has {
return
}
if strings.Index(subsystems, utils.MetaAccounts) == -1 {
args.GetMaxUsage = false
}
if strings.Index(subsystems, utils.MetaResources) != -1 {
args.AuthorizeResources = true
}
if strings.Index(subsystems, utils.MetaSuppliers) != -1 {
args.GetSuppliers = true
}
if strings.Index(subsystems, utils.MetaAttributes) != -1 {
args.GetAttributes = true
}
return
}
// AsKamAuthReply builds up a Kamailio AuthReply based on arguments and reply from SessionS
func (kev KamEvent) AsKamAuthReply(authArgs *sessionmanager.V1AuthorizeArgs,
authReply *sessionmanager.V1AuthorizeReply, rplyErr error) (kar *KamAuthReply, err error) {
kar = &KamAuthReply{Event: CGR_AUTH_REPLY,
TransactionIndex: kev[KamTRIndex],
TransactionLabel: kev[KamTRLabel],
}
if rplyErr != nil {
kar.Error = rplyErr.Error()
return
}
if authArgs.GetAttributes && authReply.Attributes != nil {
kar.Attributes = authReply.Attributes.Digest()
}
if authArgs.AuthorizeResources {
kar.ResourceAllocation = *authReply.ResourceAllocation
}
if authArgs.GetMaxUsage {
if *authReply.MaxUsage == -1 { // For calls different than unlimited, set limits
kar.MaxUsage = -1
} else {
kar.MaxUsage = int(utils.Round(authReply.MaxUsage.Seconds(), 0, utils.ROUNDING_MIDDLE))
}
}
if authArgs.GetSuppliers && authReply.Suppliers != nil {
kar.Suppliers = authReply.Suppliers.Digest()
}
return
}
// V1InitSessionArgs returns the arguments used in SessionSv1.InitSession
func (kev KamEvent) V1InitSessionArgs() (args *sessionmanager.V1InitSessionArgs) {
args = &sessionmanager.V1InitSessionArgs{ // defaults
InitSession: true,
CGREvent: utils.CGREvent{
Tenant: utils.FirstNonEmpty(kev[utils.Tenant],
config.CgrConfig().DefaultTenant),
ID: utils.UUIDSha1Prefix(),
Event: kev.AsMapStringInterface(),
},
}
subsystems, has := kev[KamCGRSubsystems]
if !has {
return
}
if strings.Index(subsystems, utils.MetaAccounts) == -1 {
args.InitSession = false
}
if strings.Index(subsystems, utils.MetaResources) != -1 {
args.AllocateResources = true
}
if strings.Index(subsystems, utils.MetaAttributes) != -1 {
args.GetAttributes = true
}
return
}
// V1TerminateSessionArgs returns the arguments used in SMGv1.TerminateSession
func (kev KamEvent) V1TerminateSessionArgs() (args *sessionmanager.V1TerminateSessionArgs) {
args = &sessionmanager.V1TerminateSessionArgs{ // defaults
TerminateSession: true,
CGREvent: utils.CGREvent{
Tenant: utils.FirstNonEmpty(kev[utils.Tenant],
config.CgrConfig().DefaultTenant),
ID: utils.UUIDSha1Prefix(),
Event: kev.AsMapStringInterface(),
},
}
subsystems, has := kev[KamCGRSubsystems]
if !has {
return
}
if strings.Index(subsystems, utils.MetaAccounts) == -1 {
args.TerminateSession = false
}
if strings.Index(subsystems, utils.MetaResources) != -1 {
args.ReleaseResources = true
}
return
}
type KamAuthReply struct {
Event string // Kamailio will use this to differentiate between requests and replies
TransactionIndex string // Original transaction index
TransactionLabel string // Original transaction label
Attributes string
ResourceAllocation string
MaxUsage int // Maximum session time in case of success, -1 for unlimited
Suppliers string // List of suppliers, comma separated
Error string // Reply in case of error
}
func (self *KamAuthReply) String() string {
mrsh, _ := json.Marshal(self)
return string(mrsh)
}

58
agents/kamevent_test.go Normal file
View File

@@ -0,0 +1,58 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package agents
import (
"reflect"
"testing"
"github.com/cgrates/cgrates/utils"
)
var kamEv = KamEvent{KamTRIndex: "29223", KamTRLabel: "698469260",
"callid": "ODVkMDI2Mzc2MDY5N2EzODhjNTAzNTdlODhiZjRlYWQ", "from_tag": "eb082607", "to_tag": "4ea9687f", "cgr_account": "dan",
"cgr_reqtype": utils.META_PREPAID, "cgr_subject": "dan", "cgr_destination": "+4986517174963", "cgr_tenant": "itsyscom.com",
"cgr_duration": "20", utils.CGR_SUPPLIER: "suppl2", utils.CGR_DISCONNECT_CAUSE: "200", "extra1": "val1", "extra2": "val2"}
func TestNewKamEvent(t *testing.T) {
evStr := `{"event":"CGR_CALL_END",
"callid":"46c01a5c249b469e76333fc6bfa87f6a@0:0:0:0:0:0:0:0",
"from_tag":"bf71ad59",
"to_tag":"7351fecf",
"cgr_reqtype":"*postpaid",
"cgr_account":"1001",
"cgr_destination":"1002",
"cgr_answertime":"1419839310",
"cgr_duration":"3",
"cgr_supplier":"supplier2",
"cgr_disconnectcause": "200",
"cgr_pdd": "4"}`
eKamEv := KamEvent{"event": "CGR_CALL_END",
"callid": "46c01a5c249b469e76333fc6bfa87f6a@0:0:0:0:0:0:0:0",
"from_tag": "bf71ad59", "to_tag": "7351fecf",
"cgr_reqtype": utils.META_POSTPAID, "cgr_account": "1001",
"cgr_destination": "1002", "cgr_answertime": "1419839310",
"cgr_duration": "3", "cgr_pdd": "4",
utils.CGR_SUPPLIER: "supplier2",
utils.CGR_DISCONNECT_CAUSE: "200"}
if kamEv, err := NewKamEvent([]byte(evStr)); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eKamEv, kamEv) {
t.Error("Received: ", kamEv)
}
}

View File

@@ -305,40 +305,17 @@ func startFsAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitChan c
exitChan <- true
}
func startSmKamailio(internalRaterChan, internalCDRSChan, internalRsChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) {
func startKamAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
var err error
utils.Logger.Info("Starting CGRateS SMKamailio service.")
var ralsConn, cdrsConn, rlSConn *rpcclient.RpcClientPool
if len(cfg.SmKamConfig.RALsConns) != 0 {
ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SmKamConfig.RALsConns, internalRaterChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMKamailio> Could not connect to RAL: %s", err.Error()))
exitChan <- true
return
}
}
if len(cfg.SmKamConfig.CDRsConns) != 0 {
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SmKamConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMKamailio> Could not connect to CDRs: %s", err.Error()))
exitChan <- true
return
}
}
if len(cfg.SmKamConfig.RLsConns) != 0 {
rlSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SmKamConfig.RLsConns, internalRsChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMKamailio> Could not connect to RLsConns: %s", err.Error()))
exitChan <- true
return
}
}
sm, _ := sessionmanager.NewKamailioSessionManager(cfg.SmKamConfig, ralsConn, cdrsConn, rlSConn, cfg.DefaultTimezone)
if err = sm.Connect(); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMKamailio> error: %s!", err))
utils.Logger.Info("Starting Kamailio agent")
smgRpcConn := <-internalSMGChan
internalSMGChan <- smgRpcConn
birpcClnt := utils.NewBiRPCInternalClient(smgRpcConn.(*sessionmanager.SMGeneric))
ka := agents.NewKamailioAgent(cfg.KamAgentCfg(),
birpcClnt, utils.FirstNonEmpty(cfg.KamAgentCfg().Timezone, cfg.DefaultTimezone))
if err = ka.Connect(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err))
}
exitChan <- true
}
@@ -906,8 +883,8 @@ func main() {
}
// Start SM-Kamailio
if cfg.SmKamConfig.Enabled {
go startSmKamailio(internalRaterChan, internalCdrSChan, internalRsChan, cdrDb, exitChan)
if cfg.KamAgentCfg().Enabled {
go startKamAgent(internalSMGChan, exitChan)
}
if cfg.AsteriskAgentCfg().Enabled {

View File

@@ -340,6 +340,7 @@ const CGRATES_CFG_JSON = `
{"address": "*internal"} // connection towards session service: <*internal>
],
"create_cdr": false, // create CDR out of events and sends them to CDRS component
"timezone": "", // timezone of the Kamailio server
"evapi_conns":[ // instantiate connections to multiple Kamailio servers
{"address": "127.0.0.1:8448", "reconnects": 5}
],

View File

@@ -43,6 +43,7 @@ type KamAgentCfg struct {
SessionSConns []*HaPoolConfig
CreateCdr bool
EvapiConns []*KamConnConfig
Timezone string
}
func (ka *KamAgentCfg) loadFromJsonCfg(jsnCfg *KamAgentJsonCfg) error {

View File

@@ -59,11 +59,19 @@ func TestCgrCdrAsCDR(t *testing.T) {
// Make sure the replicated CDR matches the expected CDR
func TestReplicatedCgrCdrAsCDR(t *testing.T) {
cgrCdr := CgrCdr{utils.CGRID: "164b0422fdc6a5117031b427439482c6a4f90e41", utils.TOR: utils.VOICE, utils.OriginID: "dsafdsaf", utils.OriginHost: "192.168.1.1",
utils.Source: "internal_test", utils.RequestType: utils.META_RATED,
utils.Direction: utils.OUT, utils.Tenant: "cgrates.org", utils.Category: "call",
utils.Account: "1001", utils.Subject: "1001", utils.Destination: "1002", utils.SetupTime: "2013-11-07T08:42:20Z", utils.PDD: "0.200", utils.AnswerTime: "2013-11-07T08:42:26Z",
utils.Usage: "10s", utils.SUPPLIER: "SUPPL1", utils.DISCONNECT_CAUSE: "NORMAL_CLEARING", utils.COST: "0.12", utils.RATED: "true", "field_extr1": "val_extr1", "fieldextr2": "valextr2"}
cgrCdr := CgrCdr{utils.CGRID: "164b0422fdc6a5117031b427439482c6a4f90e41",
utils.TOR: utils.VOICE, utils.OriginID: "dsafdsaf",
utils.OriginHost: "192.168.1.1",
utils.Source: "internal_test",
utils.RequestType: utils.META_RATED,
utils.Tenant: "cgrates.org", utils.Category: "call",
utils.Account: "1001", utils.Subject: "1001",
utils.Destination: "1002",
utils.SetupTime: "2013-11-07T08:42:20Z",
utils.AnswerTime: "2013-11-07T08:42:26Z",
utils.Usage: "10s", utils.COST: "0.12",
utils.RATED: "true", "field_extr1": "val_extr1",
"fieldextr2": "valextr2"}
expctRtCdr := &CDR{CGRID: cgrCdr[utils.CGRID],
ToR: cgrCdr[utils.TOR],
OriginID: cgrCdr[utils.OriginID],

View File

@@ -1,314 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package sessionmanager
import (
"errors"
"fmt"
"log"
"reflect"
"regexp"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/kamevapi"
"github.com/cgrates/rpcclient"
)
func NewKamailioSessionManager(smKamCfg *config.SmKamConfig, rater, cdrsrv,
rlS rpcclient.RpcClientConnection, timezone string) (ksm *KamailioSessionManager, err error) {
if rlS != nil && reflect.ValueOf(rlS).IsNil() {
rlS = nil
}
ksm = &KamailioSessionManager{cfg: smKamCfg, rater: rater, cdrsrv: cdrsrv, rlS: rlS,
timezone: timezone, conns: make(map[string]*kamevapi.KamEvapi), sessions: NewSessions()}
return
}
type KamailioSessionManager struct {
cfg *config.SmKamConfig
rater rpcclient.RpcClientConnection
cdrsrv rpcclient.RpcClientConnection
rlS rpcclient.RpcClientConnection
timezone string
conns map[string]*kamevapi.KamEvapi
sessions *Sessions
}
func (self *KamailioSessionManager) getSuppliers(kev KamEvent) (string, error) {
cd, err := kev.AsCallDescriptor()
cd.CgrID = kev.GetCgrId(self.timezone)
if err != nil {
utils.Logger.Info(fmt.Sprintf("<SM-Kamailio> LCR_PREPROCESS_ERROR error: %s", err.Error()))
return "", errors.New("LCR_PREPROCESS_ERROR")
}
var lcr engine.LCRCost
if err = self.Rater().Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil {
utils.Logger.Info(fmt.Sprintf("<SM-Kamailio> LCR_API_ERROR error: %s", err.Error()))
return "", errors.New("LCR_API_ERROR")
}
if lcr.HasErrors() {
lcr.LogErrors()
return "", errors.New("LCR_COMPUTE_ERROR")
}
return lcr.SuppliersString()
}
func (self *KamailioSessionManager) allocateResources(kev KamEvent) (err error) {
if self.rlS == nil {
return errors.New("no RLs connection")
}
var ev map[string]interface{}
if ev, err = kev.AsMapStringIface(); err != nil {
return
}
attrRU := utils.ArgRSv1ResourceUsage{
CGREvent: utils.CGREvent{
Tenant: kev.GetTenant(utils.META_DEFAULT),
Event: ev,
},
UsageID: kev.GetUUID(),
Units: 1, // One channel reserved
}
var reply string
return self.rlS.Call(utils.ResourceSv1AllocateResources, attrRU, &reply)
}
func (self *KamailioSessionManager) onCgrAuth(evData []byte, connId string) {
kev, err := NewKamEvent(evData)
if err != nil {
utils.Logger.Info(fmt.Sprintf("<SM-Kamailio> ERROR unmarshalling event: %s, error: %s", evData, err.Error()))
return
}
if kev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request
return
}
if kev.MissingParameter(self.timezone) {
if kar, err := kev.AsKamAuthReply(0.0, "", false, "", utils.ErrMandatoryIeMissing); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed building auth reply %s", err.Error()))
} else if err = self.conns[connId].Send(kar.String()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed sending auth reply %s", err.Error()))
}
return
}
var remainingDuration float64
var errReply error
if errReply = self.rater.Call("Responder.GetDerivedMaxSessionTime",
kev.AsCDR(self.timezone), &remainingDuration); errReply != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Could not get max session time, error: %s", errReply.Error()))
}
var supplStr string
var errSuppl error
if kev.ComputeLcr() {
if supplStr, errSuppl = self.getSuppliers(kev); errSuppl != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Could not get suppliers, error: %s", errSuppl.Error()))
}
}
if errReply == nil { // Overwrite the error from maxSessionTime with the one from suppliers if nil
errReply = errSuppl
}
resourceAllowed := true
if self.rlS != nil {
if err := self.allocateResources(kev); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> RLs error: %s", err.Error()))
resourceAllowed = false
}
}
if kar, err := kev.AsKamAuthReply(remainingDuration, supplStr, resourceAllowed, "", errReply); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed building auth reply %s", err.Error()))
} else if err = self.conns[connId].Send(kar.String()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed sending auth reply %s", err.Error()))
}
}
func (self *KamailioSessionManager) onCgrLcrReq(evData []byte, connId string) {
kev, err := NewKamEvent(evData)
if err != nil {
utils.Logger.Info(fmt.Sprintf("<SM-Kamailio> ERROR unmarshalling event: %s, error: %s", string(evData), err.Error()))
return
}
supplStr, err := self.getSuppliers(kev)
kamLcrReply, errReply := kev.AsKamAuthReply(0, supplStr, false, "", err)
kamLcrReply.Event = CGR_LCR_REPLY // Hit the CGR_LCR_REPLY event route on Kamailio side
if errReply != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed building LCR reply %s", errReply.Error()))
} else if err = self.conns[connId].Send(kamLcrReply.String()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed sending LCR reply %s", err.Error()))
}
}
// onCgrRLReq is the handler for CGR_RL_REQUEST events coming from Kamailio
func (self *KamailioSessionManager) onCgrRLReq(evData []byte, connId string) {
kev, err := NewKamEvent(evData)
if err != nil {
utils.Logger.Info(fmt.Sprintf("<SM-Kamailio> ERROR unmarshalling event: %s, error: %s", string(evData), err.Error()))
return
}
resourceAllowed := true
if err := self.allocateResources(kev); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> RLs error: %s", err.Error()))
resourceAllowed = false
}
kamRLReply, errReply := kev.AsKamAuthReply(0, "", resourceAllowed, "", err)
kamRLReply.Event = CGR_RL_REPLY // Hit the CGR_LCR_REPLY event route on Kamailio side
if errReply != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed building RL reply %s", errReply.Error()))
} else if err = self.conns[connId].Send(kamRLReply.String()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed sending RL reply %s", err.Error()))
}
}
func (self *KamailioSessionManager) onCallStart(evData []byte, connId string) {
kamEv, err := NewKamEvent(evData)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> ERROR unmarshalling event: %s, error: %s", evData, err.Error()))
return
}
if kamEv.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request
return
}
if kamEv.MissingParameter(self.timezone) {
self.DisconnectSession(kamEv, connId, utils.ErrMandatoryIeMissing.Error())
return
}
s := NewSession(kamEv, connId, self)
if s != nil {
self.sessions.indexSession(s)
}
}
func (self *KamailioSessionManager) onCallEnd(evData []byte, connId string) {
kev, err := NewKamEvent(evData)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> ERROR unmarshalling event: %s, error: %s", evData, err.Error()))
return
}
if kev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request
return
}
if kev.MissingParameter(self.timezone) {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Mandatory IE missing out of event: %+v", kev))
}
go self.ProcessCdr(kev.AsCDR(self.Timezone()))
if self.rlS != nil { // Release RLs resource
go func() {
ev, err := kev.AsMapStringIface()
if err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> RLs error: %s", err.Error()))
return
}
var reply string
attrRU := utils.ArgRSv1ResourceUsage{
CGREvent: utils.CGREvent{
Tenant: kev.GetTenant(utils.META_DEFAULT),
Event: ev,
},
UsageID: kev.GetUUID(),
Units: 1,
}
if err := self.rlS.Call(utils.ResourceSv1ReleaseResources, attrRU, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> RLs API error: %s", err.Error()))
}
}()
}
if s := self.sessions.getSession(kev.GetUUID()); s != nil {
if err := self.sessions.removeSession(s, kev); err != nil {
utils.Logger.Err(err.Error())
}
}
}
func (self *KamailioSessionManager) Connect() error {
var err error
eventHandlers := map[*regexp.Regexp][]func([]byte, string){
regexp.MustCompile(CGR_AUTH_REQUEST): []func([]byte, string){self.onCgrAuth},
regexp.MustCompile(CGR_LCR_REQUEST): []func([]byte, string){self.onCgrLcrReq},
regexp.MustCompile(CGR_RL_REQUEST): []func([]byte, string){self.onCgrRLReq},
regexp.MustCompile(CGR_CALL_START): []func([]byte, string){self.onCallStart},
regexp.MustCompile(CGR_CALL_END): []func([]byte, string){self.onCallEnd},
}
errChan := make(chan error)
for _, connCfg := range self.cfg.EvapiConns {
connId := utils.GenUUID()
logger := log.New(utils.Logger, "KamEvapi:", 2)
if self.conns[connId], err = kamevapi.NewKamEvapi(connCfg.Address, connId, connCfg.Reconnects, eventHandlers, logger); err != nil {
return err
}
go func() { // Start reading in own goroutine, return on error
if err := self.conns[connId].ReadEvents(); err != nil {
errChan <- err
}
}()
}
err = <-errChan // Will keep the Connect locked until the first error in one of the connections
return err
}
func (self *KamailioSessionManager) DisconnectSession(ev engine.Event, connId, notify string) error {
sessionIds := ev.GetSessionIds()
disconnectEv := &KamSessionDisconnect{Event: CGR_SESSION_DISCONNECT, HashEntry: sessionIds[0], HashId: sessionIds[1], Reason: notify}
if err := self.conns[connId].Send(disconnectEv.String()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed sending disconnect request, error %s, connection id: %s", err.Error(), connId))
return err
}
return nil
}
func (self *KamailioSessionManager) DebitInterval() time.Duration {
return self.cfg.DebitInterval
}
func (self *KamailioSessionManager) CdrSrv() rpcclient.RpcClientConnection {
return self.cdrsrv
}
func (self *KamailioSessionManager) Rater() rpcclient.RpcClientConnection {
return self.rater
}
func (self *KamailioSessionManager) ProcessCdr(cdr *engine.CDR) error {
if !self.cfg.CreateCdr {
return nil
}
var reply string
if err := self.cdrsrv.Call("CdrsV1.ProcessCDR", cdr, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", cdr.CGRID, cdr.OriginID, err.Error()))
}
return nil
}
func (sm *KamailioSessionManager) WarnSessionMinDuration(sessionUuid, connId string) {
}
func (self *KamailioSessionManager) Shutdown() error {
return nil
}
func (self *KamailioSessionManager) Sessions() []*Session {
return self.sessions.getSessions()
}
func (self *KamailioSessionManager) SyncSessions() error {
return nil
}
func (self *KamailioSessionManager) Timezone() string {
return self.timezone
}

View File

@@ -1,26 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package sessionmanager
import (
"testing"
)
func TestKamSMInterface(t *testing.T) {
var _ SessionManager = SessionManager(new(KamailioSessionManager))
}

View File

@@ -1,407 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package sessionmanager
import (
"encoding/json"
"strconv"
"strings"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
const (
EVENT = "event"
CGR_AUTH_REQUEST = "CGR_AUTH_REQUEST"
CGR_LCR_REQUEST = "CGR_LCR_REQUEST"
CGR_AUTH_REPLY = "CGR_AUTH_REPLY"
CGR_LCR_REPLY = "CGR_LCR_REPLY"
CGR_SESSION_DISCONNECT = "CGR_SESSION_DISCONNECT"
CGR_CALL_START = "CGR_CALL_START"
CGR_CALL_END = "CGR_CALL_END"
CGR_RL_REQUEST = "CGR_RL_REQUEST"
CGR_RL_REPLY = "CGR_RL_REPLY"
CGR_SETUPTIME = "cgr_setuptime"
CGR_ANSWERTIME = "cgr_answertime"
CGR_STOPTIME = "cgr_stoptime"
CGR_DURATION = "cgr_duration"
CGR_PDD = "cgr_pdd"
KAM_TR_INDEX = "tr_index"
KAM_TR_LABEL = "tr_label"
HASH_ENTRY = "h_entry"
HASH_ID = "h_id"
)
var primaryFields = []string{EVENT, CALLID, FROM_TAG, HASH_ENTRY, HASH_ID, CGR_ACCOUNT, CGR_SUBJECT, CGR_DESTINATION,
CGR_CATEGORY, CGR_TENANT, CGR_REQTYPE, CGR_ANSWERTIME, CGR_SETUPTIME, CGR_STOPTIME, CGR_DURATION, CGR_PDD, utils.CGR_SUPPLIER, utils.CGR_DISCONNECT_CAUSE}
type KamAuthReply struct {
Event string // Kamailio will use this to differentiate between requests and replies
TransactionIndex int // Original transaction index
TransactionLabel int // Original transaction label
MaxSessionTime int // Maximum session time in case of success, -1 for unlimited
Suppliers string // List of suppliers, comma separated
ResourceAllocated bool
AllocationMessage string
Error string // Reply in case of error
}
func (self *KamAuthReply) String() string {
mrsh, _ := json.Marshal(self)
return string(mrsh)
}
type KamLcrReply struct {
Event string
Suppliers string
Error error
}
func (self *KamLcrReply) String() string {
self.Event = CGR_LCR_REPLY
mrsh, _ := json.Marshal(self)
return string(mrsh)
}
type KamSessionDisconnect struct {
Event string
HashEntry string
HashId string
Reason string
}
func (self *KamSessionDisconnect) String() string {
mrsh, _ := json.Marshal(self)
return string(mrsh)
}
func NewKamEvent(kamEvData []byte) (KamEvent, error) {
kev := make(map[string]string)
if err := json.Unmarshal(kamEvData, &kev); err != nil {
return nil, err
}
return kev, nil
}
// Hold events received from Kamailio
type KamEvent map[string]string
// Backwards compatibility, should be AsEvent
func (kev KamEvent) AsEvent(ignored string) engine.Event {
return engine.Event(kev)
}
func (kev KamEvent) GetName() string {
return kev[EVENT]
}
func (kev KamEvent) GetCgrId(timezone string) string {
setupTime, _ := kev.GetSetupTime(utils.META_DEFAULT, timezone)
return utils.Sha1(kev.GetUUID(), setupTime.UTC().String())
}
func (kev KamEvent) GetUUID() string {
return kev[CALLID] + ";" + kev[FROM_TAG] // ToTag not available in callStart event
}
func (kev KamEvent) GetSessionIds() []string {
return []string{kev[HASH_ENTRY], kev[HASH_ID]}
}
func (kev KamEvent) GetDirection(fieldName string) string {
return utils.OUT
}
func (kev KamEvent) GetAccount(fieldName string) string {
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.FirstNonEmpty(kev[fieldName], kev[CGR_ACCOUNT])
}
func (kev KamEvent) GetSubject(fieldName string) string {
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.FirstNonEmpty(kev[fieldName], kev[CGR_SUBJECT], kev.GetAccount(fieldName))
}
func (kev KamEvent) GetDestination(fieldName string) string {
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.FirstNonEmpty(kev[fieldName], kev[CGR_DESTINATION])
}
func (kev KamEvent) GetCallDestNr(fieldName string) string {
return kev.GetDestination(utils.META_DEFAULT)
}
func (kev KamEvent) GetCategory(fieldName string) string {
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.FirstNonEmpty(kev[fieldName], kev[CGR_CATEGORY], config.CgrConfig().DefaultCategory)
}
func (kev KamEvent) GetTenant(fieldName string) string {
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.FirstNonEmpty(kev[fieldName], kev[CGR_TENANT], config.CgrConfig().DefaultTenant)
}
func (kev KamEvent) GetReqType(fieldName string) string {
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.FirstNonEmpty(kev[fieldName], kev[CGR_REQTYPE], config.CgrConfig().DefaultReqType)
}
func (kev KamEvent) GetAnswerTime(fieldName, timezone string) (time.Time, error) {
aTimeStr := utils.FirstNonEmpty(kev[fieldName], kev[CGR_ANSWERTIME])
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
aTimeStr = fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.ParseTimeDetectLayout(aTimeStr, timezone)
}
func (kev KamEvent) GetSetupTime(fieldName, timezone string) (time.Time, error) {
sTimeStr := utils.FirstNonEmpty(kev[fieldName], kev[CGR_SETUPTIME], kev[CGR_ANSWERTIME])
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
sTimeStr = fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.ParseTimeDetectLayout(sTimeStr, timezone)
}
func (kev KamEvent) GetEndTime(fieldName, timezone string) (time.Time, error) {
return utils.ParseTimeDetectLayout(kev[CGR_STOPTIME], timezone)
}
func (kev KamEvent) GetDuration(fieldName string) (time.Duration, error) {
durStr := utils.FirstNonEmpty(kev[fieldName], kev[CGR_DURATION])
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
durStr = fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.ParseDurationWithSecs(durStr)
}
func (kev KamEvent) GetPdd(fieldName string) (time.Duration, error) {
var pddStr string
if utils.IsSliceMember([]string{utils.PDD, utils.META_DEFAULT}, fieldName) {
pddStr = kev[CGR_PDD]
} else if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
pddStr = fieldName[len(utils.STATIC_VALUE_PREFIX):]
} else {
pddStr = kev[fieldName]
}
return utils.ParseDurationWithSecs(pddStr)
}
func (kev KamEvent) GetSupplier(fieldName string) string {
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.FirstNonEmpty(kev[fieldName], kev[utils.CGR_SUPPLIER])
}
func (kev KamEvent) GetDisconnectCause(fieldName string) string {
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.FirstNonEmpty(kev[fieldName], kev[utils.CGR_DISCONNECT_CAUSE])
}
//ToDo: extract the IP of the kamailio server generating the event
func (kev KamEvent) GetOriginatorIP(string) string {
return "127.0.0.1"
}
func (kev KamEvent) GetExtraFields() map[string]string {
extraFields := make(map[string]string)
for field, val := range kev {
if !utils.IsSliceMember(primaryFields, field) {
extraFields[field] = val
}
}
return extraFields
}
func (kev KamEvent) GetCdrSource() string {
return "KAMAILIO_" + kev.GetName()
}
func (kev KamEvent) MissingParameter(timezone string) bool {
var nullTime time.Time
switch kev.GetName() {
case CGR_AUTH_REQUEST:
if setupTime, err := kev.GetSetupTime(utils.META_DEFAULT, timezone); err != nil || setupTime == nullTime {
return true
}
return len(kev.GetAccount(utils.META_DEFAULT)) == 0 ||
len(kev.GetDestination(utils.META_DEFAULT)) == 0 ||
len(kev[KAM_TR_INDEX]) == 0 || len(kev[KAM_TR_LABEL]) == 0
case CGR_LCR_REQUEST:
return len(kev.GetAccount(utils.META_DEFAULT)) == 0 ||
len(kev.GetDestination(utils.META_DEFAULT)) == 0 ||
len(kev[KAM_TR_INDEX]) == 0 || len(kev[KAM_TR_LABEL]) == 0
case CGR_CALL_START:
if aTime, err := kev.GetAnswerTime(utils.META_DEFAULT, timezone); err != nil || aTime == nullTime {
return true
}
return len(kev.GetUUID()) == 0 ||
len(kev.GetAccount(utils.META_DEFAULT)) == 0 ||
len(kev.GetDestination(utils.META_DEFAULT)) == 0 ||
len(kev[HASH_ENTRY]) == 0 || len(kev[HASH_ID]) == 0
case CGR_CALL_END:
return len(kev.GetUUID()) == 0 ||
len(kev.GetAccount(utils.META_DEFAULT)) == 0 ||
len(kev.GetDestination(utils.META_DEFAULT)) == 0 ||
len(kev[CGR_DURATION]) == 0
default:
return true
}
}
// Useful for CDR generation
func (kev KamEvent) ParseEventValue(rsrFld *utils.RSRField, timezone string) string {
sTime, _ := kev.GetSetupTime(utils.META_DEFAULT, config.CgrConfig().DefaultTimezone)
aTime, _ := kev.GetAnswerTime(utils.META_DEFAULT, config.CgrConfig().DefaultTimezone)
duration, _ := kev.GetDuration(utils.META_DEFAULT)
switch rsrFld.Id {
case utils.CGRID:
return rsrFld.ParseValue(kev.GetCgrId(timezone))
case utils.TOR:
return rsrFld.ParseValue(utils.VOICE)
case utils.OriginID:
return rsrFld.ParseValue(kev.GetUUID())
case utils.OriginHost:
return rsrFld.ParseValue(kev.GetOriginatorIP(utils.META_DEFAULT))
case utils.Source:
return rsrFld.ParseValue(kev.GetCdrSource())
case utils.RequestType:
return rsrFld.ParseValue(kev.GetReqType(utils.META_DEFAULT))
case utils.Direction:
return rsrFld.ParseValue(kev.GetDirection(utils.META_DEFAULT))
case utils.Tenant:
return rsrFld.ParseValue(kev.GetTenant(utils.META_DEFAULT))
case utils.Category:
return rsrFld.ParseValue(kev.GetCategory(utils.META_DEFAULT))
case utils.Account:
return rsrFld.ParseValue(kev.GetAccount(utils.META_DEFAULT))
case utils.Subject:
return rsrFld.ParseValue(kev.GetSubject(utils.META_DEFAULT))
case utils.Destination:
return rsrFld.ParseValue(kev.GetDestination(utils.META_DEFAULT))
case utils.SetupTime:
return rsrFld.ParseValue(sTime.String())
case utils.AnswerTime:
return rsrFld.ParseValue(aTime.String())
case utils.Usage:
return rsrFld.ParseValue(strconv.FormatFloat(utils.Round(duration.Seconds(), 0, utils.ROUNDING_MIDDLE), 'f', -1, 64))
case utils.PDD:
return rsrFld.ParseValue(strconv.FormatFloat(utils.Round(duration.Seconds(), 0, utils.ROUNDING_MIDDLE), 'f', -1, 64))
case utils.SUPPLIER:
return rsrFld.ParseValue(kev.GetSupplier(utils.META_DEFAULT))
case utils.DISCONNECT_CAUSE:
return rsrFld.ParseValue(kev.GetDisconnectCause(utils.META_DEFAULT))
case utils.MEDI_RUNID:
return rsrFld.ParseValue(utils.META_DEFAULT)
case utils.COST:
return rsrFld.ParseValue("-1.0")
default:
return rsrFld.ParseValue(kev.GetExtraFields()[rsrFld.Id])
}
}
func (kev KamEvent) PassesFieldFilter(*utils.RSRField) (bool, string) {
return false, ""
}
func (kev KamEvent) AsCDR(timezone string) *engine.CDR {
storCdr := new(engine.CDR)
storCdr.CGRID = kev.GetCgrId(timezone)
storCdr.ToR = utils.VOICE
storCdr.OriginID = kev.GetUUID()
storCdr.OriginHost = kev.GetOriginatorIP(utils.META_DEFAULT)
storCdr.Source = kev.GetCdrSource()
storCdr.RequestType = kev.GetReqType(utils.META_DEFAULT)
storCdr.Tenant = kev.GetTenant(utils.META_DEFAULT)
storCdr.Category = kev.GetCategory(utils.META_DEFAULT)
storCdr.Account = kev.GetAccount(utils.META_DEFAULT)
storCdr.Subject = kev.GetSubject(utils.META_DEFAULT)
storCdr.Destination = kev.GetDestination(utils.META_DEFAULT)
storCdr.SetupTime, _ = kev.GetSetupTime(utils.META_DEFAULT, timezone)
storCdr.AnswerTime, _ = kev.GetAnswerTime(utils.META_DEFAULT, timezone)
storCdr.Usage, _ = kev.GetDuration(utils.META_DEFAULT)
storCdr.ExtraFields = kev.GetExtraFields()
storCdr.Cost = -1
return storCdr
}
func (kev KamEvent) String() string {
mrsh, _ := json.Marshal(kev)
return string(mrsh)
}
func (kev KamEvent) AsKamAuthReply(maxSessionTime float64, suppliers string,
resAllocated bool, allocationMessage string, rplyErr error) (kar *KamAuthReply, err error) {
kar = &KamAuthReply{Event: CGR_AUTH_REPLY, Suppliers: suppliers,
ResourceAllocated: resAllocated, AllocationMessage: allocationMessage}
if rplyErr != nil {
kar.Error = rplyErr.Error()
}
if _, hasIt := kev[KAM_TR_INDEX]; !hasIt {
return nil, utils.NewErrMandatoryIeMissing(KAM_TR_INDEX, "")
}
if kar.TransactionIndex, err = strconv.Atoi(kev[KAM_TR_INDEX]); err != nil {
return nil, err
}
if _, hasIt := kev[KAM_TR_LABEL]; !hasIt {
return nil, utils.NewErrMandatoryIeMissing(KAM_TR_LABEL, "")
}
if kar.TransactionLabel, err = strconv.Atoi(kev[KAM_TR_LABEL]); err != nil {
return nil, err
}
if maxSessionTime != -1 { // Convert maxSessionTime from nanoseconds into seconds
maxSessionDur := time.Duration(maxSessionTime)
maxSessionTime = maxSessionDur.Seconds()
}
kar.MaxSessionTime = int(utils.Round(maxSessionTime, 0, utils.ROUNDING_MIDDLE))
return kar, nil
}
// Converts into CallDescriptor due to responder interface needs
func (kev KamEvent) AsCallDescriptor() (*engine.CallDescriptor, error) {
lcrReq := &engine.LcrRequest{
Direction: kev.GetDirection(utils.META_DEFAULT),
Tenant: kev.GetTenant(utils.META_DEFAULT),
Category: kev.GetCategory(utils.META_DEFAULT),
Account: kev.GetAccount(utils.META_DEFAULT),
Subject: kev.GetSubject(utils.META_DEFAULT),
Destination: kev.GetDestination(utils.META_DEFAULT),
SetupTime: utils.FirstNonEmpty(kev[CGR_SETUPTIME], kev[CGR_ANSWERTIME]),
Duration: kev[CGR_DURATION],
}
return lcrReq.AsCallDescriptor(config.CgrConfig().DefaultTimezone)
}
func (kev KamEvent) ComputeLcr() bool {
if computeLcr, err := strconv.ParseBool(kev[utils.CGR_COMPUTELCR]); err != nil {
return false
} else {
return computeLcr
}
}
func (kev KamEvent) AsMapStringIface() (mp map[string]interface{}, err error) {
mp = make(map[string]interface{}, len(kev))
for k, v := range kev {
mp[k] = v
}
return
}

View File

@@ -1,127 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package sessionmanager
import (
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var kamEv = KamEvent{KAM_TR_INDEX: "29223", KAM_TR_LABEL: "698469260", "callid": "ODVkMDI2Mzc2MDY5N2EzODhjNTAzNTdlODhiZjRlYWQ", "from_tag": "eb082607", "to_tag": "4ea9687f", "cgr_account": "dan",
"cgr_reqtype": utils.META_PREPAID, "cgr_subject": "dan", "cgr_destination": "+4986517174963", "cgr_tenant": "itsyscom.com",
"cgr_duration": "20", utils.CGR_SUPPLIER: "suppl2", utils.CGR_DISCONNECT_CAUSE: "200", "extra1": "val1", "extra2": "val2"}
func TestKamailioEventInterface(t *testing.T) {
var _ engine.Event = engine.Event(kamEv)
}
func TestNewKamEvent(t *testing.T) {
evStr := `{"event":"CGR_CALL_END",
"callid":"46c01a5c249b469e76333fc6bfa87f6a@0:0:0:0:0:0:0:0",
"from_tag":"bf71ad59",
"to_tag":"7351fecf",
"cgr_reqtype":"*postpaid",
"cgr_account":"1001",
"cgr_destination":"1002",
"cgr_answertime":"1419839310",
"cgr_duration":"3",
"cgr_supplier":"supplier2",
"cgr_disconnectcause": "200",
"cgr_pdd": "4"}`
eKamEv := KamEvent{"event": "CGR_CALL_END", "callid": "46c01a5c249b469e76333fc6bfa87f6a@0:0:0:0:0:0:0:0", "from_tag": "bf71ad59", "to_tag": "7351fecf",
"cgr_reqtype": utils.META_POSTPAID, "cgr_account": "1001", "cgr_destination": "1002", "cgr_answertime": "1419839310", "cgr_duration": "3", CGR_PDD: "4",
utils.CGR_SUPPLIER: "supplier2",
utils.CGR_DISCONNECT_CAUSE: "200"}
if kamEv, err := NewKamEvent([]byte(evStr)); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eKamEv, kamEv) {
t.Error("Received: ", kamEv)
}
}
func TestKevAsKamAuthReply(t *testing.T) {
expectedKar := &KamAuthReply{Event: CGR_AUTH_REPLY, TransactionIndex: 29223, TransactionLabel: 698469260,
MaxSessionTime: 1200, ResourceAllocated: true, Suppliers: "supplier1,supplier2"}
if rcvKar, err := kamEv.AsKamAuthReply(1200000000000.0, "supplier1,supplier2", true, "", nil); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expectedKar, rcvKar) {
t.Error("Received KAR: ", rcvKar)
}
}
func TestKevMissingParameter(t *testing.T) {
kamEv := KamEvent{"event": "CGR_AUTH_REQUEST", "tr_index": "36045", "tr_label": "612369399", "cgr_reqtype": utils.META_POSTPAID,
"cgr_account": "1001", "cgr_destination": "1002"}
if !kamEv.MissingParameter("") {
t.Error("Failed detecting missing parameters")
}
kamEv["cgr_setuptime"] = "1419962256"
if kamEv.MissingParameter("") {
t.Error("False detecting missing parameters")
}
kamEv = KamEvent{"event": "UNKNOWN"}
if !kamEv.MissingParameter("") {
t.Error("Failed detecting missing parameters")
}
kamEv = KamEvent{"event": CGR_LCR_REQUEST, "tr_index": "36045", "tr_label": "612369399", "cgr_reqtype": utils.META_POSTPAID,
"cgr_account": "1001"}
if !kamEv.MissingParameter("") {
t.Error("Failed detecting missing parameters")
}
kamEv = KamEvent{"event": CGR_LCR_REQUEST, CGR_ACCOUNT: "1001", CGR_DESTINATION: "1002", "tr_index": "36045", "tr_label": "612369399"}
if kamEv.MissingParameter("") {
t.Error("False detecting missing parameters")
}
kamEv = KamEvent{"event": "CGR_CALL_START", "callid": "9d28ec3ee068babdfe036623f42c0969@0:0:0:0:0:0:0:0", "from_tag": "3131b566",
"cgr_reqtype": utils.META_POSTPAID, "cgr_account": "1001", "cgr_destination": "1002"}
if !kamEv.MissingParameter("") {
t.Error("Failed detecting missing parameters")
}
kamEv["h_entry"] = "463"
kamEv["h_id"] = "2605"
kamEv["cgr_answertime"] = "1419964961"
if kamEv.MissingParameter("") {
t.Error("False detecting missing parameters")
}
}
func TestKevAsCallDescriptor(t *testing.T) {
sTime := time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC)
kamEv := KamEvent{"event": CGR_LCR_REQUEST, CGR_ACCOUNT: "1001", CGR_DESTINATION: "1002", CGR_SETUPTIME: sTime.String()}
eCd := &engine.CallDescriptor{
Direction: utils.OUT,
Tenant: config.CgrConfig().DefaultTenant,
Category: config.CgrConfig().DefaultCategory,
Account: kamEv[CGR_ACCOUNT],
Subject: kamEv[CGR_ACCOUNT],
Destination: kamEv[CGR_DESTINATION],
TimeStart: sTime,
TimeEnd: sTime.Add(time.Duration(1) * time.Minute),
}
if cd, err := kamEv.AsCallDescriptor(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCd, cd) {
t.Errorf("Expecting: %+v, received: %+v", eCd, cd)
}
}

View File

@@ -53,6 +53,11 @@ const (
OSIPS_INSUFFICIENT_FUNDS = "INSUFFICIENT_FUNDS"
OSIPS_DIALOG_ID = "dialog_id"
OSIPS_SIPCODE = "sip_code"
CGR_SETUPTIME = "cgr_setuptime"
CGR_ANSWERTIME = "cgr_answertime"
CGR_STOPTIME = "cgr_stoptime"
CGR_DURATION = "cgr_duration"
CGR_PDD = "cgr_pdd"
)
func NewOsipsEvent(osipsDagramEvent *osipsdagram.OsipsEvent) (*OsipsEvent, error) {

View File

@@ -35,7 +35,6 @@ func TestSMGenericEventParseFields(t *testing.T) {
smGev[utils.EVENT_NAME] = "TEST_EVENT"
smGev[utils.TOR] = "*voice"
smGev[utils.OriginID] = "12345"
smGev[utils.Direction] = "*out"
smGev[utils.Account] = "account1"
smGev[utils.Subject] = "subject1"
smGev[utils.Destination] = "+4986517174963"
@@ -46,9 +45,6 @@ func TestSMGenericEventParseFields(t *testing.T) {
smGev[utils.AnswerTime] = "2015-11-09 14:22:02"
smGev[utils.Usage] = "1m23s"
smGev[utils.LastUsed] = "21s"
smGev[utils.PDD] = "300ms"
smGev[utils.SUPPLIER] = "supplier1"
smGev[utils.DISCONNECT_CAUSE] = "NORMAL_DISCONNECT"
smGev[utils.OriginHost] = "127.0.0.1"
smGev["Extra1"] = "Value1"
smGev["Extra2"] = 5
@@ -64,9 +60,6 @@ func TestSMGenericEventParseFields(t *testing.T) {
if !reflect.DeepEqual(smGev.GetSessionIds(), []string{"12345"}) {
t.Error("Unexpected: ", smGev.GetSessionIds())
}
if smGev.GetDirection(utils.META_DEFAULT) != "*out" {
t.Error("Unexpected: ", smGev.GetDirection(utils.META_DEFAULT))
}
if smGev.GetTOR(utils.META_DEFAULT) != "*voice" {
t.Error("Unexpected: ", smGev.GetTOR(utils.META_DEFAULT))
}
@@ -113,21 +106,11 @@ func TestSMGenericEventParseFields(t *testing.T) {
} else if lastUsed != time.Duration(21)*time.Second {
t.Error("Unexpected: ", lastUsed)
}
if pdd, err := smGev.GetPdd(utils.META_DEFAULT); err != nil {
t.Error(err)
} else if pdd != time.Duration(300)*time.Millisecond {
t.Error("Unexpected: ", pdd)
}
if smGev.GetSupplier(utils.META_DEFAULT) != "supplier1" {
t.Error("Unexpected: ", smGev.GetSupplier(utils.META_DEFAULT))
}
if smGev.GetDisconnectCause(utils.META_DEFAULT) != "NORMAL_DISCONNECT" {
t.Error("Unexpected: ", smGev.GetDisconnectCause(utils.META_DEFAULT))
}
if smGev.GetOriginatorIP(utils.META_DEFAULT) != "127.0.0.1" {
t.Error("Unexpected: ", smGev.GetOriginatorIP(utils.META_DEFAULT))
}
if extrFlds := smGev.GetExtraFields(); !reflect.DeepEqual(extrFlds, map[string]string{"Extra1": "Value1", "Extra2": "5", "LastUsed": "21s"}) {
if extrFlds := smGev.GetExtraFields(); !reflect.DeepEqual(extrFlds,
map[string]string{"Extra1": "Value1", "Extra2": "5", "LastUsed": "21s"}) {
t.Error("Unexpected: ", extrFlds)
}
}
@@ -155,7 +138,6 @@ func TestSMGenericEventAsCDR(t *testing.T) {
smGev[utils.EVENT_NAME] = "TEST_EVENT"
smGev[utils.TOR] = utils.SMS
smGev[utils.OriginID] = "12345"
smGev[utils.Direction] = utils.OUT
smGev[utils.Account] = "account1"
smGev[utils.Subject] = "subject1"
smGev[utils.Destination] = "+4986517174963"
@@ -165,9 +147,6 @@ func TestSMGenericEventAsCDR(t *testing.T) {
smGev[utils.SetupTime] = "2015-11-09 14:21:24"
smGev[utils.AnswerTime] = "2015-11-09 14:22:02"
smGev[utils.Usage] = "1m23s"
smGev[utils.PDD] = "300ms"
smGev[utils.SUPPLIER] = "supplier1"
smGev[utils.DISCONNECT_CAUSE] = "NORMAL_DISCONNECT"
smGev[utils.OriginHost] = "10.0.3.15"
smGev["Extra1"] = "Value1"
smGev["Extra2"] = 5

View File

@@ -20,8 +20,8 @@ package utils
var (
CDRExportFormats = []string{DRYRUN, MetaFileCSV, MetaFileFWV, MetaHTTPjsonCDR, MetaHTTPjsonMap, MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap}
PrimaryCdrFields = []string{CGRID, Source, OriginHost, OriginID, TOR, RequestType, Direction, Tenant, Category, Account, Subject, Destination, SetupTime, PDD, AnswerTime, Usage,
SUPPLIER, DISCONNECT_CAUSE, COST, RATED, PartialField, MEDI_RUNID}
PrimaryCdrFields = []string{CGRID, Source, OriginHost, OriginID, TOR, RequestType, Tenant, Category, Account, Subject, Destination, SetupTime, AnswerTime, Usage,
COST, RATED, PartialField, MEDI_RUNID}
GitLastLog string // If set, it will be processed as part of versioning
PosterTransportContentTypes = map[string]string{
MetaHTTPjsonCDR: CONTENT_JSON,
@@ -377,6 +377,8 @@ const (
MetaThresholds = "*thresholds"
MetaSuppliers = "*suppliers"
MetaAttributes = "*attributes"
MetaResources = "*resources"
MetaCDRs = "*cdrs"
Migrator = "migrator"
UnsupportedMigrationTask = "unsupported migration task"
NoStorDBConnection = "not connected to StorDB"
@@ -499,6 +501,7 @@ const (
MetaSessionS = "*sessions"
FreeSWITCHAgent = "FreeSWITCHAgent"
MetaDefault = "*default"
KamailioAgent = "KamailioAgent"
)
//MetaMetrics