diff --git a/agents/fsagent.go b/agents/fsagent.go
index 84a51bb5a..232c6debe 100644
--- a/agents/fsagent.go
+++ b/agents/fsagent.go
@@ -21,15 +21,12 @@ package agents
import (
"errors"
"fmt"
- "reflect"
- "strings"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/sessionmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/fsock"
- "github.com/cgrates/rpcclient"
)
func NewFSSessionManager(fsAgentConfig *config.FsAgentConfig,
@@ -222,12 +219,6 @@ func (sm *FSSessionManager) onChannelAnswer(fsev FSEvent, connId string) {
sm.disconnectSession(connId, chanUUID, "", utils.ErrServerError.Error())
return
}
- if initSessionArgs.AllocateResources {
- if initReply.ResourceAllocation == nil {
- sm.disconnectSession(connId, chanUUID, "",
- utils.ErrUnallocatedResource.Error())
- }
- }
}
func (sm *FSSessionManager) onChannelHangupComplete(fsev FSEvent, connId string) {
@@ -241,14 +232,13 @@ func (sm *FSSessionManager) onChannelHangupComplete(fsev FSEvent, connId string)
utils.Logger.Err(
fmt.Sprintf("<%s> Could not terminate session with event %s, error: %s",
utils.FreeSWITCHAgent, fsev.GetUUID(), err.Error()))
- return
}
}
if sm.cfg.CreateCdr {
cdr := fsev.AsCDR(sm.timezone)
if err := sm.smg.Call(utils.SessionSv1ProcessCDR, cdr, &reply); err != nil {
- utils.Logger.Err(fmt.Sprintf("<%s> Failed processing CDR, cgrid: %s, accid: %s, error: <%s>",
- utils.FreeSWITCHAgent, cdr.CGRID, cdr.OriginID, err.Error()))
+ utils.Logger.Err(fmt.Sprintf("<%s> Failed processing CDR: %s, error: <%s>",
+ utils.FreeSWITCHAgent, utils.ToJSON(cdr), err.Error()))
}
}
}
@@ -342,30 +332,8 @@ func (sm *FSSessionManager) Shutdown() (err error) {
}
// rpcclient.RpcClientConnection interface
-func (fsa *FSSessionManager) Call(serviceMethod string, args interface{}, reply interface{}) error {
- parts := strings.Split(serviceMethod, ".")
- if len(parts) != 2 {
- return rpcclient.ErrUnsupporteServiceMethod
- }
- // get method
- method := reflect.ValueOf(fsa).MethodByName(parts[0][len(parts[0])-2:] + parts[1]) // Inherit the version in the method
- if !method.IsValid() {
- return rpcclient.ErrUnsupporteServiceMethod
- }
- // construct the params
- params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
- ret := method.Call(params)
- if len(ret) != 1 {
- return utils.ErrServerError
- }
- if ret[0].Interface() == nil {
- return nil
- }
- err, ok := ret[0].Interface().(error)
- if !ok {
- return utils.ErrServerError
- }
- return err
+func (sm *FSSessionManager) Call(serviceMethod string, args interface{}, reply interface{}) error {
+ return utils.APIerRPCCall(sm, serviceMethod, args, reply)
}
// Internal method to disconnect session in asterisk
diff --git a/agents/fsevent.go b/agents/fsevent.go
index d4dd036ef..5688ac767 100644
--- a/agents/fsevent.go
+++ b/agents/fsevent.go
@@ -67,14 +67,10 @@ const (
IGNOREPARK = "variable_cgr_ignorepark"
FS_VARPREFIX = "variable_"
VarCGRSubsystems = "variable_cgr_subsystems"
- SubSAccountS = "accounts"
- SubSSupplierS = "suppliers"
- SubSResourceS = "resources"
- SubSAttributeS = "attributes"
CGRResourceAllocation = "cgr_resource_allocation"
VAR_CGR_DISCONNECT_CAUSE = "variable_" + utils.CGR_DISCONNECT_CAUSE
VAR_CGR_CMPUTELCR = "variable_" + utils.CGR_COMPUTELCR
- FsConnID = "FsConnID" // used to share connID info in event
+ FsConnID = "FsConnID" // used to share connID info in event for remote disconnects
VarAnswerEpoch = "variable_answer_epoch"
)
@@ -386,22 +382,22 @@ func (fsev FSEvent) V1AuthorizeArgs() (args *sessionmanager.V1AuthorizeArgs) {
if !has {
return
}
- if strings.Index(subsystems, SubSAccountS) == -1 {
+ if strings.Index(subsystems, utils.MetaAccounts) == -1 {
args.GetMaxUsage = false
}
- if strings.Index(subsystems, SubSResourceS) != -1 {
+ if strings.Index(subsystems, utils.MetaResources) != -1 {
args.AuthorizeResources = true
}
- if strings.Index(subsystems, SubSSupplierS) != -1 {
+ if strings.Index(subsystems, utils.MetaSuppliers) != -1 {
args.GetSuppliers = true
}
- if strings.Index(subsystems, SubSAttributeS) != -1 {
+ if strings.Index(subsystems, utils.MetaAttributes) != -1 {
args.GetAttributes = true
}
return
}
-// V2InitSessionArgs returns the arguments used in SMGv1.InitSession
+// V1InitSessionArgs returns the arguments used in SessionSv1.InitSession
func (fsev FSEvent) V1InitSessionArgs() (args *sessionmanager.V1InitSessionArgs) {
args = &sessionmanager.V1InitSessionArgs{ // defaults
InitSession: true,
@@ -415,13 +411,13 @@ func (fsev FSEvent) V1InitSessionArgs() (args *sessionmanager.V1InitSessionArgs)
if !has {
return
}
- if strings.Index(subsystems, SubSAccountS) == -1 {
+ if strings.Index(subsystems, utils.MetaAccounts) == -1 {
args.InitSession = false
}
- if strings.Index(subsystems, SubSResourceS) != -1 {
+ if strings.Index(subsystems, utils.MetaResources) != -1 {
args.AllocateResources = true
}
- if strings.Index(subsystems, SubSAttributeS) != -1 {
+ if strings.Index(subsystems, utils.MetaAttributes) != -1 {
args.GetAttributes = true
}
return
@@ -441,10 +437,10 @@ func (fsev FSEvent) V1TerminateSessionArgs() (args *sessionmanager.V1TerminateSe
if !has {
return
}
- if strings.Index(subsystems, SubSAccountS) == -1 {
+ if strings.Index(subsystems, utils.MetaAccounts) == -1 {
args.TerminateSession = false
}
- if strings.Index(subsystems, SubSResourceS) != -1 {
+ if strings.Index(subsystems, utils.MetaResources) != -1 {
args.ReleaseResources = true
}
return
diff --git a/agents/kamagent.go b/agents/kamagent.go
new file mode 100644
index 000000000..5d6f8df13
--- /dev/null
+++ b/agents/kamagent.go
@@ -0,0 +1,200 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package agents
+
+import (
+ "fmt"
+ "log"
+ "regexp"
+ "strings"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/sessionmanager"
+ "github.com/cgrates/cgrates/utils"
+ "github.com/cgrates/kamevapi"
+)
+
+func NewKamailioAgent(kaCfg *config.KamAgentCfg,
+ sessionS *utils.BiRPCInternalClient, timezone string) (ka *KamailioAgent) {
+ ka = &KamailioAgent{cfg: kaCfg, sessionS: sessionS,
+ timezone: timezone,
+ conns: make(map[string]*kamevapi.KamEvapi)}
+ ka.sessionS.SetClientConn(ka) // pass the connection to KA back into smg so we can receive the disconnects
+ return
+}
+
+type KamailioAgent struct {
+ cfg *config.KamAgentCfg
+ sessionS *utils.BiRPCInternalClient
+ timezone string
+ conns map[string]*kamevapi.KamEvapi
+}
+
+func (self *KamailioAgent) Connect() error {
+ var err error
+ eventHandlers := map[*regexp.Regexp][]func([]byte, string){
+ regexp.MustCompile(CGR_AUTH_REQUEST): []func([]byte, string){
+ self.onCgrAuth},
+ regexp.MustCompile(CGR_CALL_START): []func([]byte, string){
+ self.onCallStart},
+ regexp.MustCompile(CGR_CALL_END): []func([]byte, string){self.onCallEnd},
+ }
+ errChan := make(chan error)
+ for _, connCfg := range self.cfg.EvapiConns {
+ connID := utils.GenUUID()
+ logger := log.New(utils.Logger, "kamevapi:", 2)
+ if self.conns[connID], err = kamevapi.NewKamEvapi(connCfg.Address, connID, connCfg.Reconnects, eventHandlers, logger); err != nil {
+ return err
+ }
+ go func() { // Start reading in own goroutine, return on error
+ if err := self.conns[connID].ReadEvents(); err != nil {
+ errChan <- err
+ }
+ }()
+ }
+ err = <-errChan // Will keep the Connect locked until the first error in one of the connections
+ return err
+}
+
+func (self *KamailioAgent) Shutdown() error {
+ return nil
+}
+
+// rpcclient.RpcClientConnection interface
+func (ka *KamailioAgent) Call(serviceMethod string, args interface{}, reply interface{}) error {
+ return utils.APIerRPCCall(ka, serviceMethod, args, reply)
+}
+
+// onCgrAuth is called when new event of type CGR_AUTH_REQUEST is coming
+func (ka *KamailioAgent) onCgrAuth(evData []byte, connID string) {
+ kev, err := NewKamEvent(evData)
+ if err != nil {
+ utils.Logger.Err(fmt.Sprintf("<%s> unmarshalling event data: %s, error: %s",
+ utils.KamailioAgent, evData, err.Error()))
+ return
+ }
+ if kev[utils.RequestType] == utils.META_NONE { // Do not process this request
+ return
+ }
+ if kev.MissingParameter() {
+ if kRply, err := kev.AsKamAuthReply(nil, nil, utils.ErrMandatoryIeMissing); err != nil {
+ utils.Logger.Err(fmt.Sprintf("<%s> failed building auth reply for event: %s, error: %s",
+ utils.KamailioAgent, kev[utils.OriginID], err.Error()))
+ } else if err = ka.conns[connID].Send(kRply.String()); err != nil {
+ utils.Logger.Err(fmt.Sprintf("<%s> failed sending auth reply for event: %s, error %s",
+ utils.KamailioAgent, kev[utils.OriginID], err.Error()))
+ }
+ return
+ }
+ authArgs := kev.V1AuthorizeArgs()
+ var authReply sessionmanager.V1AuthorizeReply
+ err = ka.sessionS.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authReply)
+ if kar, err := kev.AsKamAuthReply(authArgs, &authReply, err); err != nil {
+ utils.Logger.Err(fmt.Sprintf("<%s> failed building auth reply for event: %s, error: %s",
+ utils.KamailioAgent, kev[utils.OriginID], err.Error()))
+ } else if err = ka.conns[connID].Send(kar.String()); err != nil {
+ utils.Logger.Err(fmt.Sprintf("<%s> failed sending auth reply for event: %s, error: %s",
+ utils.KamailioAgent, kev[utils.OriginID], err.Error()))
+ }
+}
+
+func (ka *KamailioAgent) onCallStart(evData []byte, connID string) {
+ kev, err := NewKamEvent(evData)
+ if err != nil {
+ utils.Logger.Err(fmt.Sprintf("<%s> unmarshalling event: %s, error: %s",
+ utils.KamailioAgent, evData, err.Error()))
+ return
+ }
+ if kev[utils.RequestType] == utils.META_NONE { // Do not process this request
+ return
+ }
+ if kev.MissingParameter() {
+ ka.disconnectSession(connID,
+ NewKamSessionDisconnect(kev[KamHashEntry], kev[KamHashID],
+ utils.ErrMandatoryIeMissing.Error()))
+ }
+ initSessionArgs := kev.V1InitSessionArgs()
+ initSessionArgs.CGREvent.Event[EvapiConnID] = connID // Attach the connection ID so we can properly disconnect later
+ var initReply sessionmanager.V1InitSessionReply
+ if err := ka.sessionS.Call(utils.SessionSv1InitiateSession,
+ initSessionArgs, &initReply); err != nil {
+ utils.Logger.Err(
+ fmt.Sprintf("<%s> could not process answer for event %s, error: %s",
+ utils.KamailioAgent, kev[utils.OriginID], err.Error()))
+ ka.disconnectSession(connID,
+ NewKamSessionDisconnect(kev[KamHashEntry], kev[KamHashID],
+ utils.ErrServerError.Error()))
+ return
+ }
+}
+
+func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) {
+ kev, err := NewKamEvent(evData)
+ if err != nil {
+ utils.Logger.Err(fmt.Sprintf("<%s> unmarshalling event: %s, error: %s",
+ utils.KamailioAgent, evData, err.Error()))
+ return
+ }
+ if kev[utils.RequestType] == utils.META_NONE { // Do not process this request
+ return
+ }
+ if kev.MissingParameter() {
+ utils.Logger.Err(fmt.Sprintf("<%s> mandatory IE missing out from event: %s",
+ utils.KamailioAgent, kev[utils.OriginID]))
+ return
+ }
+ var reply string
+ if err := ka.sessionS.Call(utils.SessionSv1TerminateSession,
+ kev.V1TerminateSessionArgs(), &reply); err != nil {
+ utils.Logger.Err(
+ fmt.Sprintf("<%s> could not terminate session with event %s, error: %s",
+ utils.KamailioAgent, kev[utils.OriginID], err.Error()))
+ // no return here since we want CDR anyhow
+ }
+ if ka.cfg.CreateCdr || strings.Index(kev[KamCGRSubsystems], utils.MetaCDRs) != -1 {
+ cdr := kev.AsCDR(ka.timezone)
+ if err := ka.sessionS.Call(utils.SessionSv1ProcessCDR, cdr, &reply); err != nil {
+ utils.Logger.Err(fmt.Sprintf("%s> failed processing CDR: %s, error: %s",
+ utils.KamailioAgent, utils.ToJSON(cdr), err.Error()))
+ }
+ }
+}
+
+func (self *KamailioAgent) disconnectSession(connID string, dscEv *KamSessionDisconnect) error {
+ if err := self.conns[connID].Send(dscEv.String()); err != nil {
+ utils.Logger.Err(fmt.Sprintf("<%s> failed sending disconnect request: %s, connection id: %s, error %s",
+ utils.KamailioAgent, utils.ToJSON(dscEv), err.Error(), connID))
+ return err
+ }
+ return nil
+}
+
+// Internal method to disconnect session in Kamailio
+func (ka *KamailioAgent) V1DisconnectSession(args utils.AttrDisconnectSession, reply *string) (err error) {
+ hEntry, _ := utils.CastFieldIfToString(args.EventStart[KamHashEntry])
+ hID, _ := utils.CastFieldIfToString(args.EventStart[KamHashID])
+ connID, _ := utils.CastFieldIfToString(args.EventStart[EvapiConnID])
+ if err = ka.disconnectSession(connID,
+ NewKamSessionDisconnect(hEntry, hID,
+ utils.ErrInsufficientCredit.Error())); err != nil {
+ return
+ }
+ *reply = utils.OK
+ return
+}
diff --git a/agents/kamevent.go b/agents/kamevent.go
new file mode 100644
index 000000000..66c28ffa2
--- /dev/null
+++ b/agents/kamevent.go
@@ -0,0 +1,282 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package agents
+
+import (
+ "encoding/json"
+ "strings"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/sessionmanager"
+ "github.com/cgrates/cgrates/utils"
+)
+
+const (
+ EVENT = "event"
+ CGR_AUTH_REQUEST = "CGR_AUTH_REQUEST"
+ CGR_AUTH_REPLY = "CGR_AUTH_REPLY"
+ CGR_SESSION_DISCONNECT = "CGR_SESSION_DISCONNECT"
+ CGR_CALL_START = "CGR_CALL_START"
+ CGR_CALL_END = "CGR_CALL_END"
+ KamTRIndex = "tr_index"
+ KamTRLabel = "tr_label"
+ KamHashEntry = "h_entry"
+ KamHashID = "h_id"
+ KamCGRSubsystems = "cgr_subsystems"
+ EvapiConnID = "EvapiConnID" // used to share connID info in event for remote disconnects
+)
+
+var kamReservedFields = []string{EVENT, KamTRIndex, KamTRLabel,
+ KamHashEntry, KamHashID, KamCGRSubsystems}
+
+func NewKamSessionDisconnect(hEntry, hID, reason string) *KamSessionDisconnect {
+ return &KamSessionDisconnect{
+ Event: CGR_SESSION_DISCONNECT,
+ HashEntry: hEntry,
+ HashId: hID,
+ Reason: reason}
+}
+
+type KamSessionDisconnect struct {
+ Event string
+ HashEntry string
+ HashId string
+ Reason string
+}
+
+func (self *KamSessionDisconnect) String() string {
+ mrsh, _ := json.Marshal(self)
+ return string(mrsh)
+}
+
+// NewKamEvent parses bytes received over the wire from Kamailio into KamEvent
+func NewKamEvent(kamEvData []byte) (KamEvent, error) {
+ kev := make(map[string]string)
+ if err := json.Unmarshal(kamEvData, &kev); err != nil {
+ return nil, err
+ }
+ return kev, nil
+}
+
+// KamEvent represents one event received from Kamailio
+type KamEvent map[string]string
+
+func (kev KamEvent) MissingParameter() bool {
+ switch kev[EVENT] {
+ case CGR_AUTH_REQUEST:
+ return utils.IsSliceMember([]string{
+ kev[KamTRIndex],
+ kev[KamTRLabel],
+ kev[utils.SetupTime],
+ kev[utils.Account],
+ kev[utils.Destination],
+ }, "")
+ case CGR_CALL_START:
+ return utils.IsSliceMember([]string{
+ kev[KamHashEntry],
+ kev[KamHashID],
+ kev[utils.OriginID],
+ kev[utils.AnswerTime],
+ kev[utils.Account],
+ kev[utils.Destination],
+ }, "")
+ case CGR_CALL_END:
+ return utils.IsSliceMember([]string{
+ kev[KamHashEntry],
+ kev[KamHashID],
+ kev[utils.OriginID],
+ kev[utils.AnswerTime],
+ kev[utils.Account],
+ kev[utils.Destination],
+ }, "")
+ default: // no/unsupported event
+ return true
+ }
+
+}
+
+// AsMapStringIface converts KamEvent into event used by other subsystems
+func (kev KamEvent) AsMapStringInterface() (mp map[string]interface{}) {
+ mp = make(map[string]interface{})
+ for k, v := range kev {
+ if !utils.IsSliceMember(kamReservedFields, k) { // reserved attributes not getting into event
+ mp[k] = v
+ }
+ }
+ return
+}
+
+// AsCDR converts KamEvent into CDR
+func (kev KamEvent) AsCDR(timezone string) (cdr *engine.CDR) {
+ cdr = new(engine.CDR)
+ for fld, val := range kev { // first ExtraFields so we can overwrite
+ if !utils.IsSliceMember(utils.PrimaryCdrFields, fld) &&
+ !utils.IsSliceMember(kamReservedFields, fld) {
+ cdr.ExtraFields[fld] = val
+ }
+ }
+ cdr.ToR = utils.VOICE
+ cdr.OriginID = kev[utils.OriginID]
+ cdr.OriginHost = kev[utils.OriginHost]
+ cdr.Source = "KamailioEvent"
+ cdr.RequestType = utils.FirstNonEmpty(kev[utils.RequestType], config.CgrConfig().DefaultReqType)
+ cdr.Tenant = utils.FirstNonEmpty(kev[utils.Tenant], config.CgrConfig().DefaultTenant)
+ cdr.Category = utils.FirstNonEmpty(kev[utils.Category], config.CgrConfig().DefaultCategory)
+ cdr.Account = kev[utils.Account]
+ cdr.Subject = kev[utils.Subject]
+ cdr.Destination = kev[utils.Destination]
+ cdr.SetupTime, _ = utils.ParseTimeDetectLayout(kev[utils.SetupTime], timezone)
+ cdr.AnswerTime, _ = utils.ParseTimeDetectLayout(kev[utils.AnswerTime], timezone)
+ cdr.Usage, _ = utils.ParseDurationWithSecs(kev[utils.Usage])
+ cdr.Cost = -1
+ return cdr
+}
+
+// String is used for pretty printing event in logs
+func (kev KamEvent) String() string {
+ mrsh, _ := json.Marshal(kev)
+ return string(mrsh)
+}
+
+func (kev KamEvent) V1AuthorizeArgs() (args *sessionmanager.V1AuthorizeArgs) {
+ args = &sessionmanager.V1AuthorizeArgs{
+ GetMaxUsage: true,
+ CGREvent: utils.CGREvent{
+ Tenant: utils.FirstNonEmpty(kev[utils.Tenant],
+ config.CgrConfig().DefaultTenant),
+ ID: utils.UUIDSha1Prefix(),
+ Event: kev.AsMapStringInterface(),
+ },
+ }
+ subsystems, has := kev[KamCGRSubsystems]
+ if !has {
+ return
+ }
+ if strings.Index(subsystems, utils.MetaAccounts) == -1 {
+ args.GetMaxUsage = false
+ }
+ if strings.Index(subsystems, utils.MetaResources) != -1 {
+ args.AuthorizeResources = true
+ }
+ if strings.Index(subsystems, utils.MetaSuppliers) != -1 {
+ args.GetSuppliers = true
+ }
+ if strings.Index(subsystems, utils.MetaAttributes) != -1 {
+ args.GetAttributes = true
+ }
+ return
+}
+
+// AsKamAuthReply builds up a Kamailio AuthReply based on arguments and reply from SessionS
+func (kev KamEvent) AsKamAuthReply(authArgs *sessionmanager.V1AuthorizeArgs,
+ authReply *sessionmanager.V1AuthorizeReply, rplyErr error) (kar *KamAuthReply, err error) {
+ kar = &KamAuthReply{Event: CGR_AUTH_REPLY,
+ TransactionIndex: kev[KamTRIndex],
+ TransactionLabel: kev[KamTRLabel],
+ }
+ if rplyErr != nil {
+ kar.Error = rplyErr.Error()
+ return
+ }
+ if authArgs.GetAttributes && authReply.Attributes != nil {
+ kar.Attributes = authReply.Attributes.Digest()
+ }
+ if authArgs.AuthorizeResources {
+ kar.ResourceAllocation = *authReply.ResourceAllocation
+ }
+ if authArgs.GetMaxUsage {
+ if *authReply.MaxUsage == -1 { // For calls different than unlimited, set limits
+ kar.MaxUsage = -1
+ } else {
+ kar.MaxUsage = int(utils.Round(authReply.MaxUsage.Seconds(), 0, utils.ROUNDING_MIDDLE))
+ }
+ }
+ if authArgs.GetSuppliers && authReply.Suppliers != nil {
+ kar.Suppliers = authReply.Suppliers.Digest()
+ }
+ return
+}
+
+// V1InitSessionArgs returns the arguments used in SessionSv1.InitSession
+func (kev KamEvent) V1InitSessionArgs() (args *sessionmanager.V1InitSessionArgs) {
+ args = &sessionmanager.V1InitSessionArgs{ // defaults
+ InitSession: true,
+ CGREvent: utils.CGREvent{
+ Tenant: utils.FirstNonEmpty(kev[utils.Tenant],
+ config.CgrConfig().DefaultTenant),
+ ID: utils.UUIDSha1Prefix(),
+ Event: kev.AsMapStringInterface(),
+ },
+ }
+ subsystems, has := kev[KamCGRSubsystems]
+ if !has {
+ return
+ }
+ if strings.Index(subsystems, utils.MetaAccounts) == -1 {
+ args.InitSession = false
+ }
+ if strings.Index(subsystems, utils.MetaResources) != -1 {
+ args.AllocateResources = true
+ }
+ if strings.Index(subsystems, utils.MetaAttributes) != -1 {
+ args.GetAttributes = true
+ }
+ return
+}
+
+// V1TerminateSessionArgs returns the arguments used in SMGv1.TerminateSession
+func (kev KamEvent) V1TerminateSessionArgs() (args *sessionmanager.V1TerminateSessionArgs) {
+ args = &sessionmanager.V1TerminateSessionArgs{ // defaults
+ TerminateSession: true,
+ CGREvent: utils.CGREvent{
+ Tenant: utils.FirstNonEmpty(kev[utils.Tenant],
+ config.CgrConfig().DefaultTenant),
+ ID: utils.UUIDSha1Prefix(),
+ Event: kev.AsMapStringInterface(),
+ },
+ }
+ subsystems, has := kev[KamCGRSubsystems]
+ if !has {
+ return
+ }
+ if strings.Index(subsystems, utils.MetaAccounts) == -1 {
+ args.TerminateSession = false
+ }
+ if strings.Index(subsystems, utils.MetaResources) != -1 {
+ args.ReleaseResources = true
+ }
+ return
+}
+
+type KamAuthReply struct {
+ Event string // Kamailio will use this to differentiate between requests and replies
+ TransactionIndex string // Original transaction index
+ TransactionLabel string // Original transaction label
+ Attributes string
+ ResourceAllocation string
+ MaxUsage int // Maximum session time in case of success, -1 for unlimited
+ Suppliers string // List of suppliers, comma separated
+ Error string // Reply in case of error
+}
+
+func (self *KamAuthReply) String() string {
+ mrsh, _ := json.Marshal(self)
+ return string(mrsh)
+}
diff --git a/agents/kamevent_test.go b/agents/kamevent_test.go
new file mode 100644
index 000000000..1e4cfd59f
--- /dev/null
+++ b/agents/kamevent_test.go
@@ -0,0 +1,58 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+package agents
+
+import (
+ "reflect"
+ "testing"
+
+ "github.com/cgrates/cgrates/utils"
+)
+
+var kamEv = KamEvent{KamTRIndex: "29223", KamTRLabel: "698469260",
+ "callid": "ODVkMDI2Mzc2MDY5N2EzODhjNTAzNTdlODhiZjRlYWQ", "from_tag": "eb082607", "to_tag": "4ea9687f", "cgr_account": "dan",
+ "cgr_reqtype": utils.META_PREPAID, "cgr_subject": "dan", "cgr_destination": "+4986517174963", "cgr_tenant": "itsyscom.com",
+ "cgr_duration": "20", utils.CGR_SUPPLIER: "suppl2", utils.CGR_DISCONNECT_CAUSE: "200", "extra1": "val1", "extra2": "val2"}
+
+func TestNewKamEvent(t *testing.T) {
+ evStr := `{"event":"CGR_CALL_END",
+ "callid":"46c01a5c249b469e76333fc6bfa87f6a@0:0:0:0:0:0:0:0",
+ "from_tag":"bf71ad59",
+ "to_tag":"7351fecf",
+ "cgr_reqtype":"*postpaid",
+ "cgr_account":"1001",
+ "cgr_destination":"1002",
+ "cgr_answertime":"1419839310",
+ "cgr_duration":"3",
+ "cgr_supplier":"supplier2",
+ "cgr_disconnectcause": "200",
+ "cgr_pdd": "4"}`
+ eKamEv := KamEvent{"event": "CGR_CALL_END",
+ "callid": "46c01a5c249b469e76333fc6bfa87f6a@0:0:0:0:0:0:0:0",
+ "from_tag": "bf71ad59", "to_tag": "7351fecf",
+ "cgr_reqtype": utils.META_POSTPAID, "cgr_account": "1001",
+ "cgr_destination": "1002", "cgr_answertime": "1419839310",
+ "cgr_duration": "3", "cgr_pdd": "4",
+ utils.CGR_SUPPLIER: "supplier2",
+ utils.CGR_DISCONNECT_CAUSE: "200"}
+ if kamEv, err := NewKamEvent([]byte(evStr)); err != nil {
+ t.Error(err)
+ } else if !reflect.DeepEqual(eKamEv, kamEv) {
+ t.Error("Received: ", kamEv)
+ }
+}
diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go
index c44a33720..9142a037a 100644
--- a/cmd/cgr-engine/cgr-engine.go
+++ b/cmd/cgr-engine/cgr-engine.go
@@ -305,40 +305,17 @@ func startFsAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitChan c
exitChan <- true
}
-func startSmKamailio(internalRaterChan, internalCDRSChan, internalRsChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) {
+func startKamAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
var err error
- utils.Logger.Info("Starting CGRateS SMKamailio service.")
- var ralsConn, cdrsConn, rlSConn *rpcclient.RpcClientPool
- if len(cfg.SmKamConfig.RALsConns) != 0 {
- ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
- cfg.SmKamConfig.RALsConns, internalRaterChan, cfg.InternalTtl)
- if err != nil {
- utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error()))
- exitChan <- true
- return
- }
- }
- if len(cfg.SmKamConfig.CDRsConns) != 0 {
- cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
- cfg.SmKamConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl)
- if err != nil {
- utils.Logger.Crit(fmt.Sprintf(" Could not connect to CDRs: %s", err.Error()))
- exitChan <- true
- return
- }
- }
- if len(cfg.SmKamConfig.RLsConns) != 0 {
- rlSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
- cfg.SmKamConfig.RLsConns, internalRsChan, cfg.InternalTtl)
- if err != nil {
- utils.Logger.Crit(fmt.Sprintf(" Could not connect to RLsConns: %s", err.Error()))
- exitChan <- true
- return
- }
- }
- sm, _ := sessionmanager.NewKamailioSessionManager(cfg.SmKamConfig, ralsConn, cdrsConn, rlSConn, cfg.DefaultTimezone)
- if err = sm.Connect(); err != nil {
- utils.Logger.Err(fmt.Sprintf(" error: %s!", err))
+ utils.Logger.Info("Starting Kamailio agent")
+ smgRpcConn := <-internalSMGChan
+ internalSMGChan <- smgRpcConn
+ birpcClnt := utils.NewBiRPCInternalClient(smgRpcConn.(*sessionmanager.SMGeneric))
+ ka := agents.NewKamailioAgent(cfg.KamAgentCfg(),
+ birpcClnt, utils.FirstNonEmpty(cfg.KamAgentCfg().Timezone, cfg.DefaultTimezone))
+
+ if err = ka.Connect(); err != nil {
+ utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err))
}
exitChan <- true
}
@@ -906,8 +883,8 @@ func main() {
}
// Start SM-Kamailio
- if cfg.SmKamConfig.Enabled {
- go startSmKamailio(internalRaterChan, internalCdrSChan, internalRsChan, cdrDb, exitChan)
+ if cfg.KamAgentCfg().Enabled {
+ go startKamAgent(internalSMGChan, exitChan)
}
if cfg.AsteriskAgentCfg().Enabled {
diff --git a/config/config_defaults.go b/config/config_defaults.go
index ed40b1bf2..505251bb4 100755
--- a/config/config_defaults.go
+++ b/config/config_defaults.go
@@ -340,6 +340,7 @@ const CGRATES_CFG_JSON = `
{"address": "*internal"} // connection towards session service: <*internal>
],
"create_cdr": false, // create CDR out of events and sends them to CDRS component
+ "timezone": "", // timezone of the Kamailio server
"evapi_conns":[ // instantiate connections to multiple Kamailio servers
{"address": "127.0.0.1:8448", "reconnects": 5}
],
diff --git a/config/kamagent.go b/config/kamagent.go
index 36e26a4b1..12bc9b4aa 100644
--- a/config/kamagent.go
+++ b/config/kamagent.go
@@ -43,6 +43,7 @@ type KamAgentCfg struct {
SessionSConns []*HaPoolConfig
CreateCdr bool
EvapiConns []*KamConnConfig
+ Timezone string
}
func (ka *KamAgentCfg) loadFromJsonCfg(jsnCfg *KamAgentJsonCfg) error {
diff --git a/engine/cgrcdr_test.go b/engine/cgrcdr_test.go
index 9ac100bc1..56a4bfcca 100644
--- a/engine/cgrcdr_test.go
+++ b/engine/cgrcdr_test.go
@@ -59,11 +59,19 @@ func TestCgrCdrAsCDR(t *testing.T) {
// Make sure the replicated CDR matches the expected CDR
func TestReplicatedCgrCdrAsCDR(t *testing.T) {
- cgrCdr := CgrCdr{utils.CGRID: "164b0422fdc6a5117031b427439482c6a4f90e41", utils.TOR: utils.VOICE, utils.OriginID: "dsafdsaf", utils.OriginHost: "192.168.1.1",
- utils.Source: "internal_test", utils.RequestType: utils.META_RATED,
- utils.Direction: utils.OUT, utils.Tenant: "cgrates.org", utils.Category: "call",
- utils.Account: "1001", utils.Subject: "1001", utils.Destination: "1002", utils.SetupTime: "2013-11-07T08:42:20Z", utils.PDD: "0.200", utils.AnswerTime: "2013-11-07T08:42:26Z",
- utils.Usage: "10s", utils.SUPPLIER: "SUPPL1", utils.DISCONNECT_CAUSE: "NORMAL_CLEARING", utils.COST: "0.12", utils.RATED: "true", "field_extr1": "val_extr1", "fieldextr2": "valextr2"}
+ cgrCdr := CgrCdr{utils.CGRID: "164b0422fdc6a5117031b427439482c6a4f90e41",
+ utils.TOR: utils.VOICE, utils.OriginID: "dsafdsaf",
+ utils.OriginHost: "192.168.1.1",
+ utils.Source: "internal_test",
+ utils.RequestType: utils.META_RATED,
+ utils.Tenant: "cgrates.org", utils.Category: "call",
+ utils.Account: "1001", utils.Subject: "1001",
+ utils.Destination: "1002",
+ utils.SetupTime: "2013-11-07T08:42:20Z",
+ utils.AnswerTime: "2013-11-07T08:42:26Z",
+ utils.Usage: "10s", utils.COST: "0.12",
+ utils.RATED: "true", "field_extr1": "val_extr1",
+ "fieldextr2": "valextr2"}
expctRtCdr := &CDR{CGRID: cgrCdr[utils.CGRID],
ToR: cgrCdr[utils.TOR],
OriginID: cgrCdr[utils.OriginID],
diff --git a/sessionmanager/kamailiosm.go b/sessionmanager/kamailiosm.go
deleted file mode 100644
index 4aca0c369..000000000
--- a/sessionmanager/kamailiosm.go
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
-Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
-Copyright (C) ITsysCOM GmbH
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see
-*/
-
-package sessionmanager
-
-import (
- "errors"
- "fmt"
- "log"
- "reflect"
- "regexp"
- "time"
-
- "github.com/cgrates/cgrates/config"
- "github.com/cgrates/cgrates/engine"
- "github.com/cgrates/cgrates/utils"
- "github.com/cgrates/kamevapi"
- "github.com/cgrates/rpcclient"
-)
-
-func NewKamailioSessionManager(smKamCfg *config.SmKamConfig, rater, cdrsrv,
- rlS rpcclient.RpcClientConnection, timezone string) (ksm *KamailioSessionManager, err error) {
- if rlS != nil && reflect.ValueOf(rlS).IsNil() {
- rlS = nil
- }
- ksm = &KamailioSessionManager{cfg: smKamCfg, rater: rater, cdrsrv: cdrsrv, rlS: rlS,
- timezone: timezone, conns: make(map[string]*kamevapi.KamEvapi), sessions: NewSessions()}
- return
-}
-
-type KamailioSessionManager struct {
- cfg *config.SmKamConfig
- rater rpcclient.RpcClientConnection
- cdrsrv rpcclient.RpcClientConnection
- rlS rpcclient.RpcClientConnection
- timezone string
- conns map[string]*kamevapi.KamEvapi
- sessions *Sessions
-}
-
-func (self *KamailioSessionManager) getSuppliers(kev KamEvent) (string, error) {
- cd, err := kev.AsCallDescriptor()
- cd.CgrID = kev.GetCgrId(self.timezone)
- if err != nil {
- utils.Logger.Info(fmt.Sprintf(" LCR_PREPROCESS_ERROR error: %s", err.Error()))
- return "", errors.New("LCR_PREPROCESS_ERROR")
- }
- var lcr engine.LCRCost
- if err = self.Rater().Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil {
- utils.Logger.Info(fmt.Sprintf(" LCR_API_ERROR error: %s", err.Error()))
- return "", errors.New("LCR_API_ERROR")
- }
- if lcr.HasErrors() {
- lcr.LogErrors()
- return "", errors.New("LCR_COMPUTE_ERROR")
- }
- return lcr.SuppliersString()
-}
-
-func (self *KamailioSessionManager) allocateResources(kev KamEvent) (err error) {
- if self.rlS == nil {
- return errors.New("no RLs connection")
- }
- var ev map[string]interface{}
- if ev, err = kev.AsMapStringIface(); err != nil {
- return
- }
- attrRU := utils.ArgRSv1ResourceUsage{
- CGREvent: utils.CGREvent{
- Tenant: kev.GetTenant(utils.META_DEFAULT),
- Event: ev,
- },
- UsageID: kev.GetUUID(),
- Units: 1, // One channel reserved
- }
- var reply string
- return self.rlS.Call(utils.ResourceSv1AllocateResources, attrRU, &reply)
-}
-
-func (self *KamailioSessionManager) onCgrAuth(evData []byte, connId string) {
- kev, err := NewKamEvent(evData)
- if err != nil {
- utils.Logger.Info(fmt.Sprintf(" ERROR unmarshalling event: %s, error: %s", evData, err.Error()))
- return
- }
- if kev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request
- return
- }
- if kev.MissingParameter(self.timezone) {
- if kar, err := kev.AsKamAuthReply(0.0, "", false, "", utils.ErrMandatoryIeMissing); err != nil {
- utils.Logger.Err(fmt.Sprintf(" Failed building auth reply %s", err.Error()))
- } else if err = self.conns[connId].Send(kar.String()); err != nil {
- utils.Logger.Err(fmt.Sprintf(" Failed sending auth reply %s", err.Error()))
- }
- return
- }
- var remainingDuration float64
- var errReply error
- if errReply = self.rater.Call("Responder.GetDerivedMaxSessionTime",
- kev.AsCDR(self.timezone), &remainingDuration); errReply != nil {
- utils.Logger.Err(fmt.Sprintf(" Could not get max session time, error: %s", errReply.Error()))
- }
- var supplStr string
- var errSuppl error
- if kev.ComputeLcr() {
- if supplStr, errSuppl = self.getSuppliers(kev); errSuppl != nil {
- utils.Logger.Err(fmt.Sprintf(" Could not get suppliers, error: %s", errSuppl.Error()))
- }
- }
- if errReply == nil { // Overwrite the error from maxSessionTime with the one from suppliers if nil
- errReply = errSuppl
- }
- resourceAllowed := true
- if self.rlS != nil {
- if err := self.allocateResources(kev); err != nil {
- utils.Logger.Err(fmt.Sprintf(" RLs error: %s", err.Error()))
- resourceAllowed = false
- }
- }
- if kar, err := kev.AsKamAuthReply(remainingDuration, supplStr, resourceAllowed, "", errReply); err != nil {
- utils.Logger.Err(fmt.Sprintf(" Failed building auth reply %s", err.Error()))
- } else if err = self.conns[connId].Send(kar.String()); err != nil {
- utils.Logger.Err(fmt.Sprintf(" Failed sending auth reply %s", err.Error()))
- }
-}
-
-func (self *KamailioSessionManager) onCgrLcrReq(evData []byte, connId string) {
- kev, err := NewKamEvent(evData)
- if err != nil {
- utils.Logger.Info(fmt.Sprintf(" ERROR unmarshalling event: %s, error: %s", string(evData), err.Error()))
- return
- }
- supplStr, err := self.getSuppliers(kev)
- kamLcrReply, errReply := kev.AsKamAuthReply(0, supplStr, false, "", err)
- kamLcrReply.Event = CGR_LCR_REPLY // Hit the CGR_LCR_REPLY event route on Kamailio side
- if errReply != nil {
- utils.Logger.Err(fmt.Sprintf(" Failed building LCR reply %s", errReply.Error()))
- } else if err = self.conns[connId].Send(kamLcrReply.String()); err != nil {
- utils.Logger.Err(fmt.Sprintf(" Failed sending LCR reply %s", err.Error()))
- }
-}
-
-// onCgrRLReq is the handler for CGR_RL_REQUEST events coming from Kamailio
-func (self *KamailioSessionManager) onCgrRLReq(evData []byte, connId string) {
- kev, err := NewKamEvent(evData)
- if err != nil {
- utils.Logger.Info(fmt.Sprintf(" ERROR unmarshalling event: %s, error: %s", string(evData), err.Error()))
- return
- }
- resourceAllowed := true
- if err := self.allocateResources(kev); err != nil {
- utils.Logger.Err(fmt.Sprintf(" RLs error: %s", err.Error()))
- resourceAllowed = false
- }
- kamRLReply, errReply := kev.AsKamAuthReply(0, "", resourceAllowed, "", err)
- kamRLReply.Event = CGR_RL_REPLY // Hit the CGR_LCR_REPLY event route on Kamailio side
- if errReply != nil {
- utils.Logger.Err(fmt.Sprintf(" Failed building RL reply %s", errReply.Error()))
- } else if err = self.conns[connId].Send(kamRLReply.String()); err != nil {
- utils.Logger.Err(fmt.Sprintf(" Failed sending RL reply %s", err.Error()))
- }
-}
-
-func (self *KamailioSessionManager) onCallStart(evData []byte, connId string) {
- kamEv, err := NewKamEvent(evData)
- if err != nil {
- utils.Logger.Err(fmt.Sprintf(" ERROR unmarshalling event: %s, error: %s", evData, err.Error()))
- return
- }
- if kamEv.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request
- return
- }
- if kamEv.MissingParameter(self.timezone) {
- self.DisconnectSession(kamEv, connId, utils.ErrMandatoryIeMissing.Error())
- return
- }
- s := NewSession(kamEv, connId, self)
- if s != nil {
- self.sessions.indexSession(s)
- }
-}
-
-func (self *KamailioSessionManager) onCallEnd(evData []byte, connId string) {
- kev, err := NewKamEvent(evData)
- if err != nil {
- utils.Logger.Err(fmt.Sprintf(" ERROR unmarshalling event: %s, error: %s", evData, err.Error()))
- return
- }
- if kev.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request
- return
- }
- if kev.MissingParameter(self.timezone) {
- utils.Logger.Err(fmt.Sprintf(" Mandatory IE missing out of event: %+v", kev))
- }
- go self.ProcessCdr(kev.AsCDR(self.Timezone()))
- if self.rlS != nil { // Release RLs resource
- go func() {
- ev, err := kev.AsMapStringIface()
- if err != nil {
- utils.Logger.Err(fmt.Sprintf(" RLs error: %s", err.Error()))
- return
- }
- var reply string
- attrRU := utils.ArgRSv1ResourceUsage{
- CGREvent: utils.CGREvent{
- Tenant: kev.GetTenant(utils.META_DEFAULT),
- Event: ev,
- },
- UsageID: kev.GetUUID(),
- Units: 1,
- }
- if err := self.rlS.Call(utils.ResourceSv1ReleaseResources, attrRU, &reply); err != nil {
- utils.Logger.Err(fmt.Sprintf(" RLs API error: %s", err.Error()))
- }
- }()
- }
- if s := self.sessions.getSession(kev.GetUUID()); s != nil {
- if err := self.sessions.removeSession(s, kev); err != nil {
- utils.Logger.Err(err.Error())
- }
- }
-
-}
-
-func (self *KamailioSessionManager) Connect() error {
- var err error
- eventHandlers := map[*regexp.Regexp][]func([]byte, string){
- regexp.MustCompile(CGR_AUTH_REQUEST): []func([]byte, string){self.onCgrAuth},
- regexp.MustCompile(CGR_LCR_REQUEST): []func([]byte, string){self.onCgrLcrReq},
- regexp.MustCompile(CGR_RL_REQUEST): []func([]byte, string){self.onCgrRLReq},
- regexp.MustCompile(CGR_CALL_START): []func([]byte, string){self.onCallStart},
- regexp.MustCompile(CGR_CALL_END): []func([]byte, string){self.onCallEnd},
- }
- errChan := make(chan error)
- for _, connCfg := range self.cfg.EvapiConns {
- connId := utils.GenUUID()
- logger := log.New(utils.Logger, "KamEvapi:", 2)
- if self.conns[connId], err = kamevapi.NewKamEvapi(connCfg.Address, connId, connCfg.Reconnects, eventHandlers, logger); err != nil {
- return err
- }
- go func() { // Start reading in own goroutine, return on error
- if err := self.conns[connId].ReadEvents(); err != nil {
- errChan <- err
- }
- }()
- }
- err = <-errChan // Will keep the Connect locked until the first error in one of the connections
- return err
-}
-
-func (self *KamailioSessionManager) DisconnectSession(ev engine.Event, connId, notify string) error {
- sessionIds := ev.GetSessionIds()
- disconnectEv := &KamSessionDisconnect{Event: CGR_SESSION_DISCONNECT, HashEntry: sessionIds[0], HashId: sessionIds[1], Reason: notify}
- if err := self.conns[connId].Send(disconnectEv.String()); err != nil {
- utils.Logger.Err(fmt.Sprintf(" Failed sending disconnect request, error %s, connection id: %s", err.Error(), connId))
- return err
- }
- return nil
-}
-
-func (self *KamailioSessionManager) DebitInterval() time.Duration {
- return self.cfg.DebitInterval
-}
-func (self *KamailioSessionManager) CdrSrv() rpcclient.RpcClientConnection {
- return self.cdrsrv
-}
-func (self *KamailioSessionManager) Rater() rpcclient.RpcClientConnection {
- return self.rater
-}
-
-func (self *KamailioSessionManager) ProcessCdr(cdr *engine.CDR) error {
- if !self.cfg.CreateCdr {
- return nil
- }
- var reply string
- if err := self.cdrsrv.Call("CdrsV1.ProcessCDR", cdr, &reply); err != nil {
- utils.Logger.Err(fmt.Sprintf(" Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", cdr.CGRID, cdr.OriginID, err.Error()))
- }
- return nil
-}
-
-func (sm *KamailioSessionManager) WarnSessionMinDuration(sessionUuid, connId string) {
-}
-
-func (self *KamailioSessionManager) Shutdown() error {
- return nil
-}
-
-func (self *KamailioSessionManager) Sessions() []*Session {
- return self.sessions.getSessions()
-}
-
-func (self *KamailioSessionManager) SyncSessions() error {
- return nil
-}
-
-func (self *KamailioSessionManager) Timezone() string {
- return self.timezone
-}
diff --git a/sessionmanager/kamailiosm_test.go b/sessionmanager/kamailiosm_test.go
deleted file mode 100644
index 3562e8337..000000000
--- a/sessionmanager/kamailiosm_test.go
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
-Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
-Copyright (C) ITsysCOM GmbH
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see
-*/
-package sessionmanager
-
-import (
- "testing"
-)
-
-func TestKamSMInterface(t *testing.T) {
- var _ SessionManager = SessionManager(new(KamailioSessionManager))
-}
diff --git a/sessionmanager/kamevent.go b/sessionmanager/kamevent.go
deleted file mode 100644
index eed1da06e..000000000
--- a/sessionmanager/kamevent.go
+++ /dev/null
@@ -1,407 +0,0 @@
-/*
-Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
-Copyright (C) ITsysCOM GmbH
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see
-*/
-
-package sessionmanager
-
-import (
- "encoding/json"
- "strconv"
- "strings"
- "time"
-
- "github.com/cgrates/cgrates/config"
- "github.com/cgrates/cgrates/engine"
- "github.com/cgrates/cgrates/utils"
-)
-
-const (
- EVENT = "event"
- CGR_AUTH_REQUEST = "CGR_AUTH_REQUEST"
- CGR_LCR_REQUEST = "CGR_LCR_REQUEST"
- CGR_AUTH_REPLY = "CGR_AUTH_REPLY"
- CGR_LCR_REPLY = "CGR_LCR_REPLY"
- CGR_SESSION_DISCONNECT = "CGR_SESSION_DISCONNECT"
- CGR_CALL_START = "CGR_CALL_START"
- CGR_CALL_END = "CGR_CALL_END"
- CGR_RL_REQUEST = "CGR_RL_REQUEST"
- CGR_RL_REPLY = "CGR_RL_REPLY"
- CGR_SETUPTIME = "cgr_setuptime"
- CGR_ANSWERTIME = "cgr_answertime"
- CGR_STOPTIME = "cgr_stoptime"
- CGR_DURATION = "cgr_duration"
- CGR_PDD = "cgr_pdd"
-
- KAM_TR_INDEX = "tr_index"
- KAM_TR_LABEL = "tr_label"
- HASH_ENTRY = "h_entry"
- HASH_ID = "h_id"
-)
-
-var primaryFields = []string{EVENT, CALLID, FROM_TAG, HASH_ENTRY, HASH_ID, CGR_ACCOUNT, CGR_SUBJECT, CGR_DESTINATION,
- CGR_CATEGORY, CGR_TENANT, CGR_REQTYPE, CGR_ANSWERTIME, CGR_SETUPTIME, CGR_STOPTIME, CGR_DURATION, CGR_PDD, utils.CGR_SUPPLIER, utils.CGR_DISCONNECT_CAUSE}
-
-type KamAuthReply struct {
- Event string // Kamailio will use this to differentiate between requests and replies
- TransactionIndex int // Original transaction index
- TransactionLabel int // Original transaction label
- MaxSessionTime int // Maximum session time in case of success, -1 for unlimited
- Suppliers string // List of suppliers, comma separated
- ResourceAllocated bool
- AllocationMessage string
- Error string // Reply in case of error
-}
-
-func (self *KamAuthReply) String() string {
- mrsh, _ := json.Marshal(self)
- return string(mrsh)
-}
-
-type KamLcrReply struct {
- Event string
- Suppliers string
- Error error
-}
-
-func (self *KamLcrReply) String() string {
- self.Event = CGR_LCR_REPLY
- mrsh, _ := json.Marshal(self)
- return string(mrsh)
-}
-
-type KamSessionDisconnect struct {
- Event string
- HashEntry string
- HashId string
- Reason string
-}
-
-func (self *KamSessionDisconnect) String() string {
- mrsh, _ := json.Marshal(self)
- return string(mrsh)
-}
-
-func NewKamEvent(kamEvData []byte) (KamEvent, error) {
- kev := make(map[string]string)
- if err := json.Unmarshal(kamEvData, &kev); err != nil {
- return nil, err
- }
- return kev, nil
-}
-
-// Hold events received from Kamailio
-type KamEvent map[string]string
-
-// Backwards compatibility, should be AsEvent
-func (kev KamEvent) AsEvent(ignored string) engine.Event {
- return engine.Event(kev)
-}
-
-func (kev KamEvent) GetName() string {
- return kev[EVENT]
-}
-func (kev KamEvent) GetCgrId(timezone string) string {
- setupTime, _ := kev.GetSetupTime(utils.META_DEFAULT, timezone)
- return utils.Sha1(kev.GetUUID(), setupTime.UTC().String())
-}
-func (kev KamEvent) GetUUID() string {
- return kev[CALLID] + ";" + kev[FROM_TAG] // ToTag not available in callStart event
-}
-func (kev KamEvent) GetSessionIds() []string {
- return []string{kev[HASH_ENTRY], kev[HASH_ID]}
-}
-func (kev KamEvent) GetDirection(fieldName string) string {
- return utils.OUT
-}
-func (kev KamEvent) GetAccount(fieldName string) string {
- if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
- return fieldName[len(utils.STATIC_VALUE_PREFIX):]
- }
- return utils.FirstNonEmpty(kev[fieldName], kev[CGR_ACCOUNT])
-}
-
-func (kev KamEvent) GetSubject(fieldName string) string {
- if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
- return fieldName[len(utils.STATIC_VALUE_PREFIX):]
- }
- return utils.FirstNonEmpty(kev[fieldName], kev[CGR_SUBJECT], kev.GetAccount(fieldName))
-}
-func (kev KamEvent) GetDestination(fieldName string) string {
- if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
- return fieldName[len(utils.STATIC_VALUE_PREFIX):]
- }
- return utils.FirstNonEmpty(kev[fieldName], kev[CGR_DESTINATION])
-}
-func (kev KamEvent) GetCallDestNr(fieldName string) string {
- return kev.GetDestination(utils.META_DEFAULT)
-}
-func (kev KamEvent) GetCategory(fieldName string) string {
- if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
- return fieldName[len(utils.STATIC_VALUE_PREFIX):]
- }
- return utils.FirstNonEmpty(kev[fieldName], kev[CGR_CATEGORY], config.CgrConfig().DefaultCategory)
-}
-func (kev KamEvent) GetTenant(fieldName string) string {
- if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
- return fieldName[len(utils.STATIC_VALUE_PREFIX):]
- }
- return utils.FirstNonEmpty(kev[fieldName], kev[CGR_TENANT], config.CgrConfig().DefaultTenant)
-}
-func (kev KamEvent) GetReqType(fieldName string) string {
- if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
- return fieldName[len(utils.STATIC_VALUE_PREFIX):]
- }
- return utils.FirstNonEmpty(kev[fieldName], kev[CGR_REQTYPE], config.CgrConfig().DefaultReqType)
-}
-func (kev KamEvent) GetAnswerTime(fieldName, timezone string) (time.Time, error) {
- aTimeStr := utils.FirstNonEmpty(kev[fieldName], kev[CGR_ANSWERTIME])
- if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
- aTimeStr = fieldName[len(utils.STATIC_VALUE_PREFIX):]
- }
- return utils.ParseTimeDetectLayout(aTimeStr, timezone)
-}
-func (kev KamEvent) GetSetupTime(fieldName, timezone string) (time.Time, error) {
- sTimeStr := utils.FirstNonEmpty(kev[fieldName], kev[CGR_SETUPTIME], kev[CGR_ANSWERTIME])
- if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
- sTimeStr = fieldName[len(utils.STATIC_VALUE_PREFIX):]
- }
- return utils.ParseTimeDetectLayout(sTimeStr, timezone)
-}
-func (kev KamEvent) GetEndTime(fieldName, timezone string) (time.Time, error) {
- return utils.ParseTimeDetectLayout(kev[CGR_STOPTIME], timezone)
-}
-func (kev KamEvent) GetDuration(fieldName string) (time.Duration, error) {
- durStr := utils.FirstNonEmpty(kev[fieldName], kev[CGR_DURATION])
- if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
- durStr = fieldName[len(utils.STATIC_VALUE_PREFIX):]
- }
- return utils.ParseDurationWithSecs(durStr)
-}
-func (kev KamEvent) GetPdd(fieldName string) (time.Duration, error) {
- var pddStr string
- if utils.IsSliceMember([]string{utils.PDD, utils.META_DEFAULT}, fieldName) {
- pddStr = kev[CGR_PDD]
- } else if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
- pddStr = fieldName[len(utils.STATIC_VALUE_PREFIX):]
- } else {
- pddStr = kev[fieldName]
- }
- return utils.ParseDurationWithSecs(pddStr)
-}
-func (kev KamEvent) GetSupplier(fieldName string) string {
- if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
- return fieldName[len(utils.STATIC_VALUE_PREFIX):]
- }
- return utils.FirstNonEmpty(kev[fieldName], kev[utils.CGR_SUPPLIER])
-}
-
-func (kev KamEvent) GetDisconnectCause(fieldName string) string {
- if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
- return fieldName[len(utils.STATIC_VALUE_PREFIX):]
- }
- return utils.FirstNonEmpty(kev[fieldName], kev[utils.CGR_DISCONNECT_CAUSE])
-}
-
-//ToDo: extract the IP of the kamailio server generating the event
-func (kev KamEvent) GetOriginatorIP(string) string {
- return "127.0.0.1"
-}
-func (kev KamEvent) GetExtraFields() map[string]string {
- extraFields := make(map[string]string)
- for field, val := range kev {
- if !utils.IsSliceMember(primaryFields, field) {
- extraFields[field] = val
- }
- }
- return extraFields
-}
-func (kev KamEvent) GetCdrSource() string {
- return "KAMAILIO_" + kev.GetName()
-}
-
-func (kev KamEvent) MissingParameter(timezone string) bool {
- var nullTime time.Time
- switch kev.GetName() {
- case CGR_AUTH_REQUEST:
- if setupTime, err := kev.GetSetupTime(utils.META_DEFAULT, timezone); err != nil || setupTime == nullTime {
- return true
- }
- return len(kev.GetAccount(utils.META_DEFAULT)) == 0 ||
- len(kev.GetDestination(utils.META_DEFAULT)) == 0 ||
- len(kev[KAM_TR_INDEX]) == 0 || len(kev[KAM_TR_LABEL]) == 0
- case CGR_LCR_REQUEST:
- return len(kev.GetAccount(utils.META_DEFAULT)) == 0 ||
- len(kev.GetDestination(utils.META_DEFAULT)) == 0 ||
- len(kev[KAM_TR_INDEX]) == 0 || len(kev[KAM_TR_LABEL]) == 0
- case CGR_CALL_START:
- if aTime, err := kev.GetAnswerTime(utils.META_DEFAULT, timezone); err != nil || aTime == nullTime {
- return true
- }
- return len(kev.GetUUID()) == 0 ||
- len(kev.GetAccount(utils.META_DEFAULT)) == 0 ||
- len(kev.GetDestination(utils.META_DEFAULT)) == 0 ||
- len(kev[HASH_ENTRY]) == 0 || len(kev[HASH_ID]) == 0
- case CGR_CALL_END:
- return len(kev.GetUUID()) == 0 ||
- len(kev.GetAccount(utils.META_DEFAULT)) == 0 ||
- len(kev.GetDestination(utils.META_DEFAULT)) == 0 ||
- len(kev[CGR_DURATION]) == 0
- default:
- return true
- }
-
-}
-
-// Useful for CDR generation
-func (kev KamEvent) ParseEventValue(rsrFld *utils.RSRField, timezone string) string {
- sTime, _ := kev.GetSetupTime(utils.META_DEFAULT, config.CgrConfig().DefaultTimezone)
- aTime, _ := kev.GetAnswerTime(utils.META_DEFAULT, config.CgrConfig().DefaultTimezone)
- duration, _ := kev.GetDuration(utils.META_DEFAULT)
- switch rsrFld.Id {
- case utils.CGRID:
- return rsrFld.ParseValue(kev.GetCgrId(timezone))
- case utils.TOR:
- return rsrFld.ParseValue(utils.VOICE)
- case utils.OriginID:
- return rsrFld.ParseValue(kev.GetUUID())
- case utils.OriginHost:
- return rsrFld.ParseValue(kev.GetOriginatorIP(utils.META_DEFAULT))
- case utils.Source:
- return rsrFld.ParseValue(kev.GetCdrSource())
- case utils.RequestType:
- return rsrFld.ParseValue(kev.GetReqType(utils.META_DEFAULT))
- case utils.Direction:
- return rsrFld.ParseValue(kev.GetDirection(utils.META_DEFAULT))
- case utils.Tenant:
- return rsrFld.ParseValue(kev.GetTenant(utils.META_DEFAULT))
- case utils.Category:
- return rsrFld.ParseValue(kev.GetCategory(utils.META_DEFAULT))
- case utils.Account:
- return rsrFld.ParseValue(kev.GetAccount(utils.META_DEFAULT))
- case utils.Subject:
- return rsrFld.ParseValue(kev.GetSubject(utils.META_DEFAULT))
- case utils.Destination:
- return rsrFld.ParseValue(kev.GetDestination(utils.META_DEFAULT))
- case utils.SetupTime:
- return rsrFld.ParseValue(sTime.String())
- case utils.AnswerTime:
- return rsrFld.ParseValue(aTime.String())
- case utils.Usage:
- return rsrFld.ParseValue(strconv.FormatFloat(utils.Round(duration.Seconds(), 0, utils.ROUNDING_MIDDLE), 'f', -1, 64))
- case utils.PDD:
- return rsrFld.ParseValue(strconv.FormatFloat(utils.Round(duration.Seconds(), 0, utils.ROUNDING_MIDDLE), 'f', -1, 64))
- case utils.SUPPLIER:
- return rsrFld.ParseValue(kev.GetSupplier(utils.META_DEFAULT))
- case utils.DISCONNECT_CAUSE:
- return rsrFld.ParseValue(kev.GetDisconnectCause(utils.META_DEFAULT))
- case utils.MEDI_RUNID:
- return rsrFld.ParseValue(utils.META_DEFAULT)
- case utils.COST:
- return rsrFld.ParseValue("-1.0")
- default:
- return rsrFld.ParseValue(kev.GetExtraFields()[rsrFld.Id])
- }
-}
-func (kev KamEvent) PassesFieldFilter(*utils.RSRField) (bool, string) {
- return false, ""
-}
-
-func (kev KamEvent) AsCDR(timezone string) *engine.CDR {
- storCdr := new(engine.CDR)
- storCdr.CGRID = kev.GetCgrId(timezone)
- storCdr.ToR = utils.VOICE
- storCdr.OriginID = kev.GetUUID()
- storCdr.OriginHost = kev.GetOriginatorIP(utils.META_DEFAULT)
- storCdr.Source = kev.GetCdrSource()
- storCdr.RequestType = kev.GetReqType(utils.META_DEFAULT)
- storCdr.Tenant = kev.GetTenant(utils.META_DEFAULT)
- storCdr.Category = kev.GetCategory(utils.META_DEFAULT)
- storCdr.Account = kev.GetAccount(utils.META_DEFAULT)
- storCdr.Subject = kev.GetSubject(utils.META_DEFAULT)
- storCdr.Destination = kev.GetDestination(utils.META_DEFAULT)
- storCdr.SetupTime, _ = kev.GetSetupTime(utils.META_DEFAULT, timezone)
- storCdr.AnswerTime, _ = kev.GetAnswerTime(utils.META_DEFAULT, timezone)
- storCdr.Usage, _ = kev.GetDuration(utils.META_DEFAULT)
- storCdr.ExtraFields = kev.GetExtraFields()
- storCdr.Cost = -1
-
- return storCdr
-}
-
-func (kev KamEvent) String() string {
- mrsh, _ := json.Marshal(kev)
- return string(mrsh)
-}
-
-func (kev KamEvent) AsKamAuthReply(maxSessionTime float64, suppliers string,
- resAllocated bool, allocationMessage string, rplyErr error) (kar *KamAuthReply, err error) {
- kar = &KamAuthReply{Event: CGR_AUTH_REPLY, Suppliers: suppliers,
- ResourceAllocated: resAllocated, AllocationMessage: allocationMessage}
- if rplyErr != nil {
- kar.Error = rplyErr.Error()
- }
- if _, hasIt := kev[KAM_TR_INDEX]; !hasIt {
- return nil, utils.NewErrMandatoryIeMissing(KAM_TR_INDEX, "")
- }
- if kar.TransactionIndex, err = strconv.Atoi(kev[KAM_TR_INDEX]); err != nil {
- return nil, err
- }
- if _, hasIt := kev[KAM_TR_LABEL]; !hasIt {
- return nil, utils.NewErrMandatoryIeMissing(KAM_TR_LABEL, "")
- }
- if kar.TransactionLabel, err = strconv.Atoi(kev[KAM_TR_LABEL]); err != nil {
- return nil, err
- }
- if maxSessionTime != -1 { // Convert maxSessionTime from nanoseconds into seconds
- maxSessionDur := time.Duration(maxSessionTime)
- maxSessionTime = maxSessionDur.Seconds()
- }
- kar.MaxSessionTime = int(utils.Round(maxSessionTime, 0, utils.ROUNDING_MIDDLE))
-
- return kar, nil
-}
-
-// Converts into CallDescriptor due to responder interface needs
-func (kev KamEvent) AsCallDescriptor() (*engine.CallDescriptor, error) {
- lcrReq := &engine.LcrRequest{
- Direction: kev.GetDirection(utils.META_DEFAULT),
- Tenant: kev.GetTenant(utils.META_DEFAULT),
- Category: kev.GetCategory(utils.META_DEFAULT),
- Account: kev.GetAccount(utils.META_DEFAULT),
- Subject: kev.GetSubject(utils.META_DEFAULT),
- Destination: kev.GetDestination(utils.META_DEFAULT),
- SetupTime: utils.FirstNonEmpty(kev[CGR_SETUPTIME], kev[CGR_ANSWERTIME]),
- Duration: kev[CGR_DURATION],
- }
- return lcrReq.AsCallDescriptor(config.CgrConfig().DefaultTimezone)
-}
-
-func (kev KamEvent) ComputeLcr() bool {
- if computeLcr, err := strconv.ParseBool(kev[utils.CGR_COMPUTELCR]); err != nil {
- return false
- } else {
- return computeLcr
- }
-}
-
-func (kev KamEvent) AsMapStringIface() (mp map[string]interface{}, err error) {
- mp = make(map[string]interface{}, len(kev))
- for k, v := range kev {
- mp[k] = v
- }
- return
-}
diff --git a/sessionmanager/kamevent_test.go b/sessionmanager/kamevent_test.go
deleted file mode 100644
index 7a068c94a..000000000
--- a/sessionmanager/kamevent_test.go
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
-Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
-Copyright (C) ITsysCOM GmbH
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see
-*/
-package sessionmanager
-
-import (
- "reflect"
- "testing"
- "time"
-
- "github.com/cgrates/cgrates/config"
- "github.com/cgrates/cgrates/engine"
- "github.com/cgrates/cgrates/utils"
-)
-
-var kamEv = KamEvent{KAM_TR_INDEX: "29223", KAM_TR_LABEL: "698469260", "callid": "ODVkMDI2Mzc2MDY5N2EzODhjNTAzNTdlODhiZjRlYWQ", "from_tag": "eb082607", "to_tag": "4ea9687f", "cgr_account": "dan",
- "cgr_reqtype": utils.META_PREPAID, "cgr_subject": "dan", "cgr_destination": "+4986517174963", "cgr_tenant": "itsyscom.com",
- "cgr_duration": "20", utils.CGR_SUPPLIER: "suppl2", utils.CGR_DISCONNECT_CAUSE: "200", "extra1": "val1", "extra2": "val2"}
-
-func TestKamailioEventInterface(t *testing.T) {
- var _ engine.Event = engine.Event(kamEv)
-}
-
-func TestNewKamEvent(t *testing.T) {
- evStr := `{"event":"CGR_CALL_END",
- "callid":"46c01a5c249b469e76333fc6bfa87f6a@0:0:0:0:0:0:0:0",
- "from_tag":"bf71ad59",
- "to_tag":"7351fecf",
- "cgr_reqtype":"*postpaid",
- "cgr_account":"1001",
- "cgr_destination":"1002",
- "cgr_answertime":"1419839310",
- "cgr_duration":"3",
- "cgr_supplier":"supplier2",
- "cgr_disconnectcause": "200",
- "cgr_pdd": "4"}`
- eKamEv := KamEvent{"event": "CGR_CALL_END", "callid": "46c01a5c249b469e76333fc6bfa87f6a@0:0:0:0:0:0:0:0", "from_tag": "bf71ad59", "to_tag": "7351fecf",
- "cgr_reqtype": utils.META_POSTPAID, "cgr_account": "1001", "cgr_destination": "1002", "cgr_answertime": "1419839310", "cgr_duration": "3", CGR_PDD: "4",
- utils.CGR_SUPPLIER: "supplier2",
- utils.CGR_DISCONNECT_CAUSE: "200"}
- if kamEv, err := NewKamEvent([]byte(evStr)); err != nil {
- t.Error(err)
- } else if !reflect.DeepEqual(eKamEv, kamEv) {
- t.Error("Received: ", kamEv)
- }
-}
-
-func TestKevAsKamAuthReply(t *testing.T) {
- expectedKar := &KamAuthReply{Event: CGR_AUTH_REPLY, TransactionIndex: 29223, TransactionLabel: 698469260,
- MaxSessionTime: 1200, ResourceAllocated: true, Suppliers: "supplier1,supplier2"}
- if rcvKar, err := kamEv.AsKamAuthReply(1200000000000.0, "supplier1,supplier2", true, "", nil); err != nil {
- t.Error(err)
- } else if !reflect.DeepEqual(expectedKar, rcvKar) {
- t.Error("Received KAR: ", rcvKar)
- }
-}
-
-func TestKevMissingParameter(t *testing.T) {
- kamEv := KamEvent{"event": "CGR_AUTH_REQUEST", "tr_index": "36045", "tr_label": "612369399", "cgr_reqtype": utils.META_POSTPAID,
- "cgr_account": "1001", "cgr_destination": "1002"}
- if !kamEv.MissingParameter("") {
- t.Error("Failed detecting missing parameters")
- }
- kamEv["cgr_setuptime"] = "1419962256"
- if kamEv.MissingParameter("") {
- t.Error("False detecting missing parameters")
- }
- kamEv = KamEvent{"event": "UNKNOWN"}
- if !kamEv.MissingParameter("") {
- t.Error("Failed detecting missing parameters")
- }
- kamEv = KamEvent{"event": CGR_LCR_REQUEST, "tr_index": "36045", "tr_label": "612369399", "cgr_reqtype": utils.META_POSTPAID,
- "cgr_account": "1001"}
- if !kamEv.MissingParameter("") {
- t.Error("Failed detecting missing parameters")
- }
- kamEv = KamEvent{"event": CGR_LCR_REQUEST, CGR_ACCOUNT: "1001", CGR_DESTINATION: "1002", "tr_index": "36045", "tr_label": "612369399"}
- if kamEv.MissingParameter("") {
- t.Error("False detecting missing parameters")
- }
- kamEv = KamEvent{"event": "CGR_CALL_START", "callid": "9d28ec3ee068babdfe036623f42c0969@0:0:0:0:0:0:0:0", "from_tag": "3131b566",
- "cgr_reqtype": utils.META_POSTPAID, "cgr_account": "1001", "cgr_destination": "1002"}
- if !kamEv.MissingParameter("") {
- t.Error("Failed detecting missing parameters")
- }
- kamEv["h_entry"] = "463"
- kamEv["h_id"] = "2605"
- kamEv["cgr_answertime"] = "1419964961"
- if kamEv.MissingParameter("") {
- t.Error("False detecting missing parameters")
- }
-}
-
-func TestKevAsCallDescriptor(t *testing.T) {
- sTime := time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC)
- kamEv := KamEvent{"event": CGR_LCR_REQUEST, CGR_ACCOUNT: "1001", CGR_DESTINATION: "1002", CGR_SETUPTIME: sTime.String()}
- eCd := &engine.CallDescriptor{
- Direction: utils.OUT,
- Tenant: config.CgrConfig().DefaultTenant,
- Category: config.CgrConfig().DefaultCategory,
- Account: kamEv[CGR_ACCOUNT],
- Subject: kamEv[CGR_ACCOUNT],
- Destination: kamEv[CGR_DESTINATION],
- TimeStart: sTime,
- TimeEnd: sTime.Add(time.Duration(1) * time.Minute),
- }
-
- if cd, err := kamEv.AsCallDescriptor(); err != nil {
- t.Error(err)
- } else if !reflect.DeepEqual(eCd, cd) {
- t.Errorf("Expecting: %+v, received: %+v", eCd, cd)
- }
-}
diff --git a/sessionmanager/osipsevent.go b/sessionmanager/osipsevent.go
index d246d2ee3..152769f87 100644
--- a/sessionmanager/osipsevent.go
+++ b/sessionmanager/osipsevent.go
@@ -53,6 +53,11 @@ const (
OSIPS_INSUFFICIENT_FUNDS = "INSUFFICIENT_FUNDS"
OSIPS_DIALOG_ID = "dialog_id"
OSIPS_SIPCODE = "sip_code"
+ CGR_SETUPTIME = "cgr_setuptime"
+ CGR_ANSWERTIME = "cgr_answertime"
+ CGR_STOPTIME = "cgr_stoptime"
+ CGR_DURATION = "cgr_duration"
+ CGR_PDD = "cgr_pdd"
)
func NewOsipsEvent(osipsDagramEvent *osipsdagram.OsipsEvent) (*OsipsEvent, error) {
diff --git a/sessionmanager/smg_event_test.go b/sessionmanager/smg_event_test.go
index e67f668e0..6252713cf 100644
--- a/sessionmanager/smg_event_test.go
+++ b/sessionmanager/smg_event_test.go
@@ -35,7 +35,6 @@ func TestSMGenericEventParseFields(t *testing.T) {
smGev[utils.EVENT_NAME] = "TEST_EVENT"
smGev[utils.TOR] = "*voice"
smGev[utils.OriginID] = "12345"
- smGev[utils.Direction] = "*out"
smGev[utils.Account] = "account1"
smGev[utils.Subject] = "subject1"
smGev[utils.Destination] = "+4986517174963"
@@ -46,9 +45,6 @@ func TestSMGenericEventParseFields(t *testing.T) {
smGev[utils.AnswerTime] = "2015-11-09 14:22:02"
smGev[utils.Usage] = "1m23s"
smGev[utils.LastUsed] = "21s"
- smGev[utils.PDD] = "300ms"
- smGev[utils.SUPPLIER] = "supplier1"
- smGev[utils.DISCONNECT_CAUSE] = "NORMAL_DISCONNECT"
smGev[utils.OriginHost] = "127.0.0.1"
smGev["Extra1"] = "Value1"
smGev["Extra2"] = 5
@@ -64,9 +60,6 @@ func TestSMGenericEventParseFields(t *testing.T) {
if !reflect.DeepEqual(smGev.GetSessionIds(), []string{"12345"}) {
t.Error("Unexpected: ", smGev.GetSessionIds())
}
- if smGev.GetDirection(utils.META_DEFAULT) != "*out" {
- t.Error("Unexpected: ", smGev.GetDirection(utils.META_DEFAULT))
- }
if smGev.GetTOR(utils.META_DEFAULT) != "*voice" {
t.Error("Unexpected: ", smGev.GetTOR(utils.META_DEFAULT))
}
@@ -113,21 +106,11 @@ func TestSMGenericEventParseFields(t *testing.T) {
} else if lastUsed != time.Duration(21)*time.Second {
t.Error("Unexpected: ", lastUsed)
}
- if pdd, err := smGev.GetPdd(utils.META_DEFAULT); err != nil {
- t.Error(err)
- } else if pdd != time.Duration(300)*time.Millisecond {
- t.Error("Unexpected: ", pdd)
- }
- if smGev.GetSupplier(utils.META_DEFAULT) != "supplier1" {
- t.Error("Unexpected: ", smGev.GetSupplier(utils.META_DEFAULT))
- }
- if smGev.GetDisconnectCause(utils.META_DEFAULT) != "NORMAL_DISCONNECT" {
- t.Error("Unexpected: ", smGev.GetDisconnectCause(utils.META_DEFAULT))
- }
if smGev.GetOriginatorIP(utils.META_DEFAULT) != "127.0.0.1" {
t.Error("Unexpected: ", smGev.GetOriginatorIP(utils.META_DEFAULT))
}
- if extrFlds := smGev.GetExtraFields(); !reflect.DeepEqual(extrFlds, map[string]string{"Extra1": "Value1", "Extra2": "5", "LastUsed": "21s"}) {
+ if extrFlds := smGev.GetExtraFields(); !reflect.DeepEqual(extrFlds,
+ map[string]string{"Extra1": "Value1", "Extra2": "5", "LastUsed": "21s"}) {
t.Error("Unexpected: ", extrFlds)
}
}
@@ -155,7 +138,6 @@ func TestSMGenericEventAsCDR(t *testing.T) {
smGev[utils.EVENT_NAME] = "TEST_EVENT"
smGev[utils.TOR] = utils.SMS
smGev[utils.OriginID] = "12345"
- smGev[utils.Direction] = utils.OUT
smGev[utils.Account] = "account1"
smGev[utils.Subject] = "subject1"
smGev[utils.Destination] = "+4986517174963"
@@ -165,9 +147,6 @@ func TestSMGenericEventAsCDR(t *testing.T) {
smGev[utils.SetupTime] = "2015-11-09 14:21:24"
smGev[utils.AnswerTime] = "2015-11-09 14:22:02"
smGev[utils.Usage] = "1m23s"
- smGev[utils.PDD] = "300ms"
- smGev[utils.SUPPLIER] = "supplier1"
- smGev[utils.DISCONNECT_CAUSE] = "NORMAL_DISCONNECT"
smGev[utils.OriginHost] = "10.0.3.15"
smGev["Extra1"] = "Value1"
smGev["Extra2"] = 5
diff --git a/utils/consts.go b/utils/consts.go
index e03c65932..f969dfc49 100755
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -20,8 +20,8 @@ package utils
var (
CDRExportFormats = []string{DRYRUN, MetaFileCSV, MetaFileFWV, MetaHTTPjsonCDR, MetaHTTPjsonMap, MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap}
- PrimaryCdrFields = []string{CGRID, Source, OriginHost, OriginID, TOR, RequestType, Direction, Tenant, Category, Account, Subject, Destination, SetupTime, PDD, AnswerTime, Usage,
- SUPPLIER, DISCONNECT_CAUSE, COST, RATED, PartialField, MEDI_RUNID}
+ PrimaryCdrFields = []string{CGRID, Source, OriginHost, OriginID, TOR, RequestType, Tenant, Category, Account, Subject, Destination, SetupTime, AnswerTime, Usage,
+ COST, RATED, PartialField, MEDI_RUNID}
GitLastLog string // If set, it will be processed as part of versioning
PosterTransportContentTypes = map[string]string{
MetaHTTPjsonCDR: CONTENT_JSON,
@@ -377,6 +377,8 @@ const (
MetaThresholds = "*thresholds"
MetaSuppliers = "*suppliers"
MetaAttributes = "*attributes"
+ MetaResources = "*resources"
+ MetaCDRs = "*cdrs"
Migrator = "migrator"
UnsupportedMigrationTask = "unsupported migration task"
NoStorDBConnection = "not connected to StorDB"
@@ -499,6 +501,7 @@ const (
MetaSessionS = "*sessions"
FreeSWITCHAgent = "FreeSWITCHAgent"
MetaDefault = "*default"
+ KamailioAgent = "KamailioAgent"
)
//MetaMetrics