Update Asterisk Agent

This commit is contained in:
TeoV
2018-05-11 11:30:24 -04:00
committed by Dan Christian Bogos
parent d2c4d481f6
commit 64426affc4
6 changed files with 428 additions and 116 deletions

View File

@@ -21,6 +21,7 @@ package agents
import (
"strings"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
)
@@ -227,6 +228,7 @@ func (smaEv *SMAsteriskEvent) UpdateSMGEvent(smgEv *sessions.SMGenericEvent) err
if _, hasIt := resSMGEv[utils.AnswerTime]; !hasIt {
resSMGEv[utils.Usage] = "0s"
} else {
if aTime, err := smgEv.GetAnswerTime(utils.META_DEFAULT, ""); err != nil {
return err
} else if aTime.IsZero() {
@@ -243,3 +245,195 @@ func (smaEv *SMAsteriskEvent) UpdateSMGEvent(smgEv *sessions.SMGenericEvent) err
*smgEv = resSMGEv
return nil
}
func (smaEv *SMAsteriskEvent) UpdateCGREvent(cgrEv *utils.CGREvent) error {
resCGREv := *cgrEv
switch smaEv.EventType() {
case ARIChannelStateChange:
if smaEv.ChannelState() == channelUp {
resCGREv.Event[utils.EVENT_NAME] = SMASessionStart
resCGREv.Event[utils.AnswerTime] = smaEv.Timestamp()
}
case ARIChannelDestroyed:
resCGREv.Event[utils.EVENT_NAME] = SMASessionTerminate
resCGREv.Event[utils.DISCONNECT_CAUSE] = smaEv.DisconnectCause()
if _, hasIt := resCGREv.Event[utils.AnswerTime]; !hasIt {
resCGREv.Event[utils.Usage] = "0s"
} else {
if aTime, err := utils.IfaceAsTime(resCGREv.Event[utils.AnswerTime], config.CgrConfig().DefaultTimezone); err != nil {
return err
} else if aTime.IsZero() {
resCGREv.Event[utils.Usage] = "0s"
} else {
actualTime, err := utils.ParseTimeDetectLayout(smaEv.Timestamp(), "")
if err != nil {
return err
}
resCGREv.Event[utils.Usage] = actualTime.Sub(aTime).String()
}
}
}
*cgrEv = resCGREv
return nil
}
func (smaEv *SMAsteriskEvent) AsMapStringInterface() (mp map[string]interface{}) {
mp = make(map[string]interface{})
var evName string
switch smaEv.EventType() {
case ARIStasisStart:
evName = SMAAuthorization
case ARIChannelStateChange:
evName = SMASessionStart
case ARIChannelDestroyed:
evName = SMASessionTerminate
}
mp[utils.EVENT_NAME] = evName
mp[utils.OriginID] = smaEv.ChannelID()
if smaEv.RequestType() != "" {
mp[utils.RequestType] = smaEv.RequestType()
}
if smaEv.Tenant() != "" {
mp[utils.Tenant] = smaEv.Tenant()
}
if smaEv.Category() != "" {
mp[utils.Category] = smaEv.Category()
}
if smaEv.Subject() != "" {
mp[utils.Subject] = smaEv.Subject()
}
mp[utils.OriginHost] = smaEv.OriginatorIP()
mp[utils.Account] = smaEv.Account()
mp[utils.Destination] = smaEv.Destination()
mp[utils.SetupTime] = smaEv.Timestamp()
if smaEv.Supplier() != "" {
mp[utils.SUPPLIER] = smaEv.Supplier()
}
for extraKey, extraVal := range smaEv.ExtraParameters() { // Append extraParameters
mp[extraKey] = extraVal
}
return
}
// AsCDR converts KamEvent into CGREvent
func (smaEv *SMAsteriskEvent) AsCGREvent(timezone string) (cgrEv *utils.CGREvent, err error) {
setupTime, err := utils.ParseTimeDetectLayout(
smaEv.Timestamp(), timezone)
if err != nil {
return
}
cgrEv = &utils.CGREvent{
Tenant: utils.FirstNonEmpty(smaEv.Tenant(),
config.CgrConfig().DefaultTenant),
ID: utils.UUIDSha1Prefix(),
Time: &setupTime,
Event: smaEv.AsMapStringInterface(),
}
return cgrEv, nil
}
func (smaEv *SMAsteriskEvent) V1AuthorizeArgs() (args *sessions.V1AuthorizeArgs) {
cgrEv, err := smaEv.AsCGREvent(config.CgrConfig().DefaultTimezone)
if err != nil {
return
}
args = &sessions.V1AuthorizeArgs{
GetMaxUsage: true,
CGREvent: *cgrEv,
}
// For the moment hardcoded only GetMaxUsage : true
/*
subsystems, has := kev[KamCGRSubsystems]
if !has {
return
}
if strings.Index(subsystems, utils.MetaAccounts) == -1 {
args.GetMaxUsage = false
}
if strings.Index(subsystems, utils.MetaResources) != -1 {
args.AuthorizeResources = true
}
if strings.Index(subsystems, utils.MetaSuppliers) != -1 {
args.GetSuppliers = true
if strings.Index(subsystems, utils.MetaSuppliersEventCost) != -1 {
args.SuppliersMaxCost = utils.MetaEventCost
}
if strings.Index(subsystems, utils.MetaSuppliersIgnoreErrors) != -1 {
args.SuppliersIgnoreErrors = true
}
}
if strings.Index(subsystems, utils.MetaAttributes) != -1 {
args.GetAttributes = true
}
if strings.Index(subsystems, utils.MetaThresholds) != -1 {
args.ProcessThresholds = utils.BoolPointer(true)
}
if strings.Index(subsystems, utils.MetaStats) != -1 {
args.ProcessStatQueues = utils.BoolPointer(true)
}
*/
return
}
func (smaEv *SMAsteriskEvent) V1InitSessionArgs() (args *sessions.V1InitSessionArgs) {
cgrEv, err := smaEv.AsCGREvent(config.CgrConfig().DefaultTimezone)
if err != nil {
return
}
args = &sessions.V1InitSessionArgs{ // defaults
InitSession: true,
CGREvent: *cgrEv,
}
/*
subsystems, has := kev[KamCGRSubsystems]
if !has {
return
}
if strings.Index(subsystems, utils.MetaAccounts) == -1 {
args.InitSession = false
}
if strings.Index(subsystems, utils.MetaResources) != -1 {
args.AllocateResources = true
}
if strings.Index(subsystems, utils.MetaAttributes) != -1 {
args.GetAttributes = true
}
if strings.Index(subsystems, utils.MetaThresholds) != -1 {
args.ProcessThresholds = utils.BoolPointer(true)
}
if strings.Index(subsystems, utils.MetaStats) != -1 {
args.ProcessStatQueues = utils.BoolPointer(true)
}
*/
return
}
func (smaEv *SMAsteriskEvent) V1TerminateSessionArgs() (args *sessions.V1TerminateSessionArgs) {
cgrEv, err := smaEv.AsCGREvent(config.CgrConfig().DefaultTimezone)
if err != nil {
return
}
args = &sessions.V1TerminateSessionArgs{ // defaults
TerminateSession: true,
CGREvent: *cgrEv,
}
/*
subsystems, has := kev[KamCGRSubsystems]
if !has {
return
}
if strings.Index(subsystems, utils.MetaAccounts) == -1 {
args.TerminateSession = false
}
if strings.Index(subsystems, utils.MetaResources) != -1 {
args.ReleaseResources = true
}
if strings.Index(subsystems, utils.MetaThresholds) != -1 {
args.ProcessThresholds = utils.BoolPointer(true)
}
if strings.Index(subsystems, utils.MetaStats) != -1 {
args.ProcessStatQueues = utils.BoolPointer(true)
}
*/
return
}

View File

@@ -48,30 +48,33 @@ const (
SMASessionTerminate = "SMA_SESSION_TERMINATE"
)
func NewSMAsterisk(cgrCfg *config.CGRConfig, astConnIdx int, smgConn *utils.BiRPCInternalClient) (*SMAsterisk, error) {
sma := &SMAsterisk{cgrCfg: cgrCfg, smg: smgConn,
eventsCache: make(map[string]*sessions.SMGenericEvent)}
func NewAsteriskAgent(cgrCfg *config.CGRConfig, astConnIdx int,
smgConn *utils.BiRPCInternalClient) (*AsteriskAgent, error) {
sma := &AsteriskAgent{cgrCfg: cgrCfg, smg: smgConn,
eventsCache: make(map[string]*utils.CGREvent)}
sma.smg.SetClientConn(sma) // pass the connection to SMA back into smg so we can receive the disconnects
return sma, nil
}
type SMAsterisk struct {
type AsteriskAgent struct {
cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple
astConnIdx int
smg *utils.BiRPCInternalClient
astConn *aringo.ARInGO
astEvChan chan map[string]interface{}
astErrChan chan error
eventsCache map[string]*sessions.SMGenericEvent // used to gather information about events during various phases
evCacheMux sync.RWMutex // Protect eventsCache
eventsCache map[string]*utils.CGREvent // used to gather information about events during various phases
evCacheMux sync.RWMutex // Protect eventsCache
}
func (sma *SMAsterisk) connectAsterisk() (err error) {
func (sma *AsteriskAgent) connectAsterisk() (err error) {
connCfg := sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx]
sma.astEvChan = make(chan map[string]interface{})
sma.astErrChan = make(chan error)
sma.astConn, err = aringo.NewARInGO(fmt.Sprintf("ws://%s/ari/events?api_key=%s:%s&app=%s", connCfg.Address, connCfg.User, connCfg.Password, CGRAuthAPP), "http://cgrates.org",
connCfg.User, connCfg.Password, fmt.Sprintf("%s %s", utils.CGRateS, utils.VERSION), sma.astEvChan, sma.astErrChan, connCfg.ConnectAttempts, connCfg.Reconnects)
sma.astConn, err = aringo.NewARInGO(fmt.Sprintf("ws://%s/ari/events?api_key=%s:%s&app=%s",
connCfg.Address, connCfg.User, connCfg.Password, CGRAuthAPP), "http://cgrates.org",
connCfg.User, connCfg.Password, fmt.Sprintf("%s %s", utils.CGRateS, utils.VERSION),
sma.astEvChan, sma.astErrChan, connCfg.ConnectAttempts, connCfg.Reconnects)
if err != nil {
return err
}
@@ -79,7 +82,7 @@ func (sma *SMAsterisk) connectAsterisk() (err error) {
}
// Called to start the service
func (sma *SMAsterisk) ListenAndServe() (err error) {
func (sma *AsteriskAgent) ListenAndServe() (err error) {
if err := sma.connectAsterisk(); err != nil {
return err
}
@@ -88,7 +91,8 @@ func (sma *SMAsterisk) ListenAndServe() (err error) {
case err = <-sma.astErrChan:
return
case astRawEv := <-sma.astEvChan:
smAsteriskEvent := NewSMAsteriskEvent(astRawEv, strings.Split(sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address, ":")[0])
smAsteriskEvent := NewSMAsteriskEvent(astRawEv,
strings.Split(sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address, ":")[0])
switch smAsteriskEvent.EventType() {
case ARIStasisStart:
go sma.handleStasisStart(smAsteriskEvent)
@@ -99,52 +103,71 @@ func (sma *SMAsterisk) ListenAndServe() (err error) {
}
}
}
panic("<SMAsterisk> ListenAndServe out of select")
panic("<AsteriskAgent> ListenAndServe out of select")
}
// hangupChannel will disconnect from CGRateS side with congestion reason
func (sma *SMAsterisk) hangupChannel(channelID string) (err error) {
func (sma *AsteriskAgent) hangupChannel(channelID string) (err error) {
_, err = sma.astConn.Call(aringo.HTTP_DELETE, fmt.Sprintf("http://%s/ari/channels/%s",
sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address, channelID),
url.Values{"reason": {"congestion"}})
return
}
func (sma *SMAsterisk) handleStasisStart(ev *SMAsteriskEvent) {
func (sma *AsteriskAgent) handleStasisStart(ev *SMAsteriskEvent) {
// Subscribe for channel updates even after we leave Stasis
if _, err := sma.astConn.Call(aringo.HTTP_POST, fmt.Sprintf("http://%s/ari/applications/%s/subscription?eventSource=channel:%s",
sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address, CGRAuthAPP, ev.ChannelID()), nil); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when subscribing to events for channelID: %s", err.Error(), ev.ChannelID()))
if _, err := sma.astConn.Call(aringo.HTTP_POST,
fmt.Sprintf("http://%s/ari/applications/%s/subscription?eventSource=channel:%s",
sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address,
CGRAuthAPP, ev.ChannelID()), nil); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when subscribingto events for channelID: %s",
utils.AsteriskAgent, err.Error(), ev.ChannelID()))
// Since we got error, disconnect channel
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID()))
utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when attempting to disconnect channelID: %s",
utils.AsteriskAgent, err.Error(), ev.ChannelID()))
}
return
}
// Query the SMG via RPC for maxUsage
var maxUsage float64
smgEv := ev.AsSMGenericEvent()
if err := sma.smg.Call("SMGenericV1.GetMaxUsage", *smgEv, &maxUsage); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to authorize session for channelID: %s", err.Error(), ev.ChannelID()))
//authorize Session
authArgs := ev.V1AuthorizeArgs()
if authArgs == nil {
utils.Logger.Err(fmt.Sprintf("<%s> event: %s cannot generate auth session arguments",
utils.AsteriskAgent, ev.ChannelID()))
return
}
// var maxUsage float64
// smgEv := ev.AsSMGenericEvent()
var authReply sessions.V1AuthorizeReply
if err := sma.smg.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authReply); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when attempting to authorize session for channelID: %s",
utils.AsteriskAgent, err.Error(), ev.ChannelID()))
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID()))
utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when attempting to disconnect channelID: %s",
utils.AsteriskAgent, err.Error(), ev.ChannelID()))
}
return
}
if maxUsage == 0 {
if authReply.MaxUsage != nil && *authReply.MaxUsage == time.Duration(0) {
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID()))
utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when attempting to disconnect channelID: %s",
utils.AsteriskAgent, err.Error(), ev.ChannelID()))
}
return
} else if maxUsage != -1 {
} else if *authReply.MaxUsage != time.Duration(-1) {
// Set absolute timeout for non-postpaid calls
if _, err := sma.astConn.Call(aringo.HTTP_POST, fmt.Sprintf("http://%s/ari/channels/%s/variable?variable=%s", // Asterisk having issue with variable terminating empty so harcoding param in url
sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address, ev.ChannelID(), CGRMaxSessionTime),
url.Values{"value": {strconv.FormatFloat(maxUsage*1000, 'f', -1, 64)}}); err != nil { // Asterisk expects value in ms
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when setting %s for channelID: %s", err.Error(), CGRMaxSessionTime, ev.ChannelID()))
if _, err := sma.astConn.Call(aringo.HTTP_POST,
fmt.Sprintf("http://%s/ari/channels/%s/variable?variable=%s", // Asterisk having issue with variable terminating empty so harcoding param in url
sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address,
ev.ChannelID(), CGRMaxSessionTime),
url.Values{"value": {strconv.FormatFloat(
float64(authReply.MaxUsage.Nanoseconds())*1000, 'f', -1, 64)}}); err != nil { // Asterisk expects value in ms
utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when setting %s for channelID: %s",
utils.AsteriskAgent, err.Error(), CGRMaxSessionTime, ev.ChannelID()))
// Since we got error, disconnect channel
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID()))
utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when attempting to disconnect channelID: %s",
utils.AsteriskAgent, err.Error(), ev.ChannelID()))
}
return
}
@@ -152,108 +175,141 @@ func (sma *SMAsterisk) handleStasisStart(ev *SMAsteriskEvent) {
// Exit channel from stasis
if _, err := sma.astConn.Call(aringo.HTTP_POST, fmt.Sprintf("http://%s/ari/channels/%s/continue",
sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address, ev.ChannelID()), nil); err != nil {
sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address,
ev.ChannelID()), nil); err != nil {
}
// Done with processing event, cache it for later use
sma.evCacheMux.Lock()
sma.eventsCache[ev.ChannelID()] = smgEv
sma.eventsCache[ev.ChannelID()] = &authArgs.CGREvent
sma.evCacheMux.Unlock()
}
// Ussually channelUP
func (sma *SMAsterisk) handleChannelStateChange(ev *SMAsteriskEvent) {
func (sma *AsteriskAgent) handleChannelStateChange(ev *SMAsteriskEvent) {
if ev.ChannelState() != channelUp {
return
}
sma.evCacheMux.RLock()
smgEv, hasIt := sma.eventsCache[ev.ChannelID()]
_, hasIt := sma.eventsCache[ev.ChannelID()]
sma.evCacheMux.RUnlock()
if !hasIt { // Not handled by us
return
}
sma.evCacheMux.Lock()
err := ev.UpdateSMGEvent(smgEv) // Updates the event directly in the cache
sma.evCacheMux.Unlock()
if err != nil {
initSessionArgs := ev.V1InitSessionArgs()
if initSessionArgs == nil {
utils.Logger.Err(fmt.Sprintf("<%s> event: %s cannot generate init session arguments",
utils.AsteriskAgent, ev.ChannelID()))
return
}
//initit Session
var initReply sessions.V1InitSessionReply
if err := sma.smg.Call(utils.SessionSv1InitiateSession,
initSessionArgs, &initReply); err != nil {
utils.Logger.Err(
fmt.Sprintf("<SMAsterisk> Error: %s when attempting to initiate session for channelID: %s",
err.Error(), ev.ChannelID()))
fmt.Sprintf("<%s> Error: %s when attempting to initiate session for channelID: %s",
utils.AsteriskAgent, err.Error(), ev.ChannelID()))
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(
fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s",
err.Error(), ev.ChannelID()))
fmt.Sprintf("<%s> Error: %s when attempting to disconnect channelID: %s",
utils.AsteriskAgent, err.Error(), ev.ChannelID()))
}
return
} else if initReply.MaxUsage != nil && *initReply.MaxUsage == time.Duration(0) {
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(
fmt.Sprintf("<%s> Error: %s when attempting to disconnect channelID: %s",
utils.AsteriskAgent, err.Error(), ev.ChannelID()))
}
return
}
var maxUsage time.Duration
if err := sma.smg.Call(utils.SMGenericV2InitiateSession,
*smgEv, &maxUsage); err != nil {
sma.evCacheMux.Lock()
err := ev.UpdateCGREvent(&initSessionArgs.CGREvent) // Updates the event directly in the cache
sma.evCacheMux.Unlock()
if err != nil {
utils.Logger.Err(
fmt.Sprintf("<SMAsterisk> Error: %s when attempting to initiate session for channelID: %s",
err.Error(), ev.ChannelID()))
fmt.Sprintf("<%s> Error: %s when attempting to initiate session for channelID: %s",
utils.AsteriskAgent, err.Error(), ev.ChannelID()))
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(
fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s",
err.Error(), ev.ChannelID()))
}
return
} else if maxUsage == 0 {
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(
fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s",
err.Error(), ev.ChannelID()))
fmt.Sprintf("<%s> Error: %s when attempting to disconnect channelID: %s",
utils.AsteriskAgent, err.Error(), ev.ChannelID()))
}
return
}
}
// Channel disconnect
func (sma *SMAsterisk) handleChannelDestroyed(ev *SMAsteriskEvent) {
func (sma *AsteriskAgent) handleChannelDestroyed(ev *SMAsteriskEvent) {
sma.evCacheMux.RLock()
smgEv, hasIt := sma.eventsCache[ev.ChannelID()]
_, hasIt := sma.eventsCache[ev.ChannelID()]
sma.evCacheMux.RUnlock()
if !hasIt { // Not handled by us
return
}
sma.evCacheMux.Lock()
err := ev.UpdateSMGEvent(smgEv) // Updates the event directly in the cache
sma.evCacheMux.Unlock()
if err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to initiate session for channelID: %s", err.Error(), ev.ChannelID()))
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID()))
}
//terminate session
tsArgs := ev.V1TerminateSessionArgs()
if tsArgs == nil {
utils.Logger.Err(fmt.Sprintf("<%s> event: %s cannot generate terminate session arguments",
utils.AsteriskAgent, ev.ChannelID()))
return
}
var reply string
if err := sma.smg.Call("SMGenericV1.TerminateSession", *smgEv, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to terminate session for channelID: %s", err.Error(), ev.ChannelID()))
if err := sma.smg.Call(utils.SessionSv1TerminateSession,
tsArgs, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when attempting to terminate session for channelID: %s",
utils.AsteriskAgent, err.Error(), ev.ChannelID()))
}
if sma.cgrCfg.AsteriskAgentCfg().CreateCDR {
if err := sma.smg.Call("SMGenericV1.ProcessCDR", *smgEv, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to process CDR for channelID: %s", err.Error(), ev.ChannelID()))
setupTime, err := utils.ParseTimeDetectLayout(
ev.SetupTime(), config.CgrConfig().DefaultTimezone)
if err != nil {
return
}
cgrEv := utils.CGREvent{
Tenant: ev.Tenant(),
ID: utils.UUIDSha1Prefix(),
Time: &setupTime,
Event: ev.AsMapStringInterface(),
}
if err := sma.smg.Call(utils.SessionSv1ProcessCDR, cgrEv, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when attempting to process CDR for channelID: %s",
utils.AsteriskAgent, err.Error(), ev.ChannelID()))
}
}
sma.evCacheMux.Lock()
err := ev.UpdateCGREvent(&tsArgs.CGREvent) // Updates the event directly in the cache
sma.evCacheMux.Unlock()
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when attempting to initiate session for channelID: %s",
utils.AsteriskAgent, err.Error(), ev.ChannelID()))
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when attempting to disconnect channelID: %s",
utils.AsteriskAgent, err.Error(), ev.ChannelID()))
}
return
}
}
// Called to shutdown the service
func (sma *SMAsterisk) ServiceShutdown() error {
func (sma *AsteriskAgent) ServiceShutdown() error {
return nil
}
// Internal method to disconnect session in asterisk
func (sma *SMAsterisk) V1DisconnectSession(args utils.AttrDisconnectSession, reply *string) error {
func (sma *AsteriskAgent) V1DisconnectSession(args utils.AttrDisconnectSession, reply *string) error {
channelID := sessions.SMGenericEvent(args.EventStart).GetOriginID(utils.META_DEFAULT)
if err := sma.hangupChannel(channelID); err != nil {
utils.Logger.Err(
fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s",
err.Error(), channelID))
fmt.Sprintf("<%s> Error: %s when attempting to disconnect channelID: %s",
utils.AsteriskAgent, err.Error(), channelID))
}
*reply = utils.OK
return nil
}
// rpcclient.RpcClientConnection interface
func (sma *SMAsterisk) Call(serviceMethod string, args interface{}, reply interface{}) error {
func (sma *AsteriskAgent) Call(serviceMethod string, args interface{}, reply interface{}) error {
return utils.RPCCall(sma, serviceMethod, args, reply)
}

View File

@@ -241,7 +241,7 @@ func startAsteriskAgent(internalSMGChan chan rpcclient.RpcClientConnection, exit
internalSMGChan <- smgRpcConn
birpcClnt := utils.NewBiRPCInternalClient(smgRpcConn.(*sessions.SMGeneric))
for connIdx := range cfg.AsteriskAgentCfg().AsteriskConns { // Instantiate connections towards asterisk servers
sma, err := agents.NewSMAsterisk(cfg, connIdx, birpcClnt)
sma, err := agents.NewAsteriskAgent(cfg, connIdx, birpcClnt)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> error: %s!", err))
exitChan <- true

View File

@@ -1,7 +1,7 @@
[simpletrans]
type=transport
protocol=udp
bind=0.0.0.0
bind=192.168.56.203
[1001]
type = endpoint

View File

@@ -24,44 +24,70 @@
},
"stor_db": { // database used to store offline tariff plans and CDRs
"db_password": "CGRateS.org", // password to use when connecting to stordb
},
"rals": {
"enabled": true,
"cdrstats_conns": [
{"address": "*internal"}
"thresholds_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"stats_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"pubsubs_conns": [
{"address": "*internal"}
],
"users_conns": [
{"address": "*internal"}
],
"aliases_conns": [
{"address": "*internal"}
"attributes_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
},
"stor_db": { // database used to store offline tariff plans and CDRs
"db_password": "CGRateS.org", // password to use when connecting to stordb
"cdrs": {
"enabled": true,
"sessions_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"stats_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"sessions_cost_retries": 5,
},
"sessions": {
"enabled": true,
"rals_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"cdrs_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"resources_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"suppliers_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"attributes_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"stats_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"thresholds_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"debit_interval": "10s",
},
"scheduler": {
"enabled": true,
},
"cdrs": {
"enabled": true, // start the CDR Server service: <true|false>
"cdrstats_conns": [
{"address": "*internal"}
],
},
"cdrstats": {
"enabled": true, // starts the cdrstats service: <true|false>
},
"cdre": {
"*default": {
"cdr_format": "csv", // exported CDRs format <csv>
@@ -125,33 +151,68 @@
},
"sessions": {
"enabled": true,
"debit_interval": "5s", // interval to perform debits on.
},
"sm_asterisk": {
"enabled": true, // starts Asterisk SessionManager service: <true|false>
"asterisk_agent": {
"enabled": true,
"sessions_conns": [
{"address": "*internal"}
],
"create_cdr": true,
"asterisk_conns":[ // instantiate connections to multiple Asterisk servers
"asterisk_conns":[
{"address": "127.0.0.1:8088", "user": "cgrates", "password": "CGRateS.org", "connect_attempts": 3,"reconnects": 10}
],
},
"pubsubs": {
"enabled": true, // starts PubSub service: <true|false>.
"enabled": true,
},
"aliases": {
"enabled": true, // starts PubSub service: <true|false>.
"attributes": {
"enabled": true,
"string_indexed_fields": ["Account"],
},
"users": {
"enabled": true, // starts User service: <true|false>.
"indexes": ["Uuid"], // user profile field indexes
"resources": {
"enabled": true,
"thresholds_conns": [
{"address": "*internal"}
],
"string_indexed_fields": ["Account"],
"prefix_indexed_fields": ["Destination"],
},
"stats": {
"enabled": true,
"thresholds_conns": [
{"address": "*internal"}
],
"string_indexed_fields": ["Account"],
},
"thresholds": {
"enabled": true,
"string_indexed_fields": ["Account"],
},
"suppliers": {
"enabled": true,
"rals_conns": [
{"address": "*internal"}
],
"resources_conns": [
{"address": "*internal"}
],
"stats_conns": [
{"address": "*internal"}
],
"string_indexed_fields": ["Account"],
"prefix_indexed_fields": ["Destination"],
},
}

View File

@@ -842,6 +842,7 @@ const (
RadiusAgent = "RadiusAgent"
DiameterAgent = "DiameterAgent"
FreeSWITCHAgent = "FreeSWITCHAgent"
AsteriskAgent = "AsteriskAgent"
)
func buildCacheInstRevPrefixes() {