SMAsterisk send init to SMGeneric and process answer

This commit is contained in:
DanB
2016-09-14 20:27:22 +02:00
parent 7fe5a0feba
commit 49696b89dd
10 changed files with 68 additions and 41 deletions

View File

@@ -18,7 +18,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package cache2go
import (
"fmt"
"strings"
"sync"
@@ -184,7 +183,6 @@ func (cs lrustore) Put(key string, value interface{}) {
var err error
mp, err = lru.New(10000)
if err != nil {
utils.Logger.Debug(fmt.Sprintf("<cache>: error at init: %v", err))
return
}
cs[prefix] = mp

View File

@@ -19,11 +19,8 @@ package cache2go
import (
"container/list"
"fmt"
"sync"
"time"
"github.com/cgrates/cgrates/utils"
)
// Cache is an LRU cache.
@@ -172,8 +169,6 @@ func (c *Cache) removeElement(e *list.Element) {
if e.Value != nil {
kv := e.Value.(*entry)
delete(c.cache, kv.key)
} else {
utils.Logger.Debug(fmt.Sprintf("<lruttl_cache>: nil element: %+v", e))
}
}

View File

@@ -1,6 +1,8 @@
[internal]
exten => _1XXX,1,NoOp()
same => n,Set(CGRMaxSessionTime=0); use it to disconnect automatically the call if bridge is not active
same => n,DumpChan()
same => n,Stasis(cgrates_auth,cgr_reqtype=*prepaid,cgr_destination=${EXTEN})
same => n,Dial(PJSIP/${EXTEN},30)
same => n,Dial(PJSIP/${EXTEN},30,L(${CGRMaxSessionTime}))
same => n,Hangup()

View File

@@ -15,6 +15,7 @@ load = app_voicemail.so
load = app_directory.so
load = app_confbridge.so
load = app_queue.so
load = app_dumpchan.so
; Bridging

View File

@@ -201,7 +201,6 @@ func (cd *CallDescriptor) LoadRatingPlans() (err error) {
if err == utils.ErrNotFound && rec == 1 {
//if err != nil || !cd.continousRatingInfos() {
// use the default subject only if the initial one was not found
//utils.Logger.Debug(fmt.Sprintf("Try the default subject for %s and account: %s, subject: %s", cd.Destination, cd.GetAccountKey(), cd.GetKey(cd.Subject)))
err, _ = cd.getRatingPlansForPrefix(cd.GetKey(FALLBACK_SUBJECT), 1)
}
//load the rating plans
@@ -390,7 +389,6 @@ func (cd *CallDescriptor) splitInTimeSpans() (timespans []*TimeSpan) {
//log.Printf("==============%v==================", i)
//log.Printf("TS: %+v", timespans[i])
rp := timespans[i].ratingInfo
// utils.Logger.Debug(fmt.Sprintf("rp: %+v", rp))
//timespans[i].RatingPlan = nil
rateIntervals := rp.SelectRatingIntevalsForTimespan(timespans[i])
//log.Print("RIs: ", utils.ToJSON(rateIntervals))
@@ -423,10 +421,8 @@ func (cd *CallDescriptor) splitInTimeSpans() (timespans []*TimeSpan) {
//log.Print(timespans[i].RateInterval.Timing)
}
//utils.Logger.Debug(fmt.Sprintf("After SplitByRateInterval: %+v", timespans))
//log.Printf("After SplitByRateInterval: %+v", timespans[0].RateInterval.Timing)
timespans = cd.roundTimeSpansToIncrement(timespans)
// utils.Logger.Debug(fmt.Sprintf("After round: %+v", timespans))
//log.Printf("After round: %+v", timespans[0].RateInterval.Timing)
return
}
@@ -578,27 +574,18 @@ func (origCD *CallDescriptor) getMaxSessionDuration(origAcc *Account) (time.Dura
if origCD.TOR == "" {
origCD.TOR = utils.VOICE
}
//utils.Logger.Debug("ORIG: " + utils.ToJSON(origCD))
cd := origCD.Clone()
initialDuration := cd.TimeEnd.Sub(cd.TimeStart)
//utils.Logger.Debug(fmt.Sprintf("INITIAL_DURATION: %v", initialDuration))
defaultBalance := account.GetDefaultMoneyBalance()
//use this to check what increment was payed with debt
initialDefaultBalanceValue := defaultBalance.GetValue()
//utils.Logger.Debug("ACCOUNT: " + utils.ToJSON(account))
//utils.Logger.Debug("DEFAULT_BALANCE: " + utils.ToJSON(defaultBalance))
cc, err := cd.debit(account, true, false)
//utils.Logger.Debug("CC: " + utils.ToJSON(cc))
//log.Print("CC: ", utils.ToIJSON(cc))
//utils.Logger.Debug(fmt.Sprintf("ERR: %v", err))
if err != nil {
return 0, err
}
//log.Printf("CC: %+v", cc)
// not enough credit for connect fee
if cc.negativeConnectFee == true {
return 0, nil
@@ -607,16 +594,10 @@ func (origCD *CallDescriptor) getMaxSessionDuration(origAcc *Account) (time.Dura
var totalCost float64
var totalDuration time.Duration
cc.Timespans.Decompress()
//log.Printf("ACC: %+v", account)
for _, ts := range cc.Timespans {
//if ts.RateInterval != nil {
//log.Printf("TS: %+v", ts)
//utils.Logger.Debug("TS: " + utils.ToJSON(ts))
//}
if cd.MaxRate > 0 && cd.MaxRateUnit > 0 {
rate, _, rateUnit := ts.RateInterval.GetRateParameters(ts.GetGroupStart())
if rate/rateUnit.Seconds() > cd.MaxRate/cd.MaxRateUnit.Seconds() {
//utils.Logger.Debug(fmt.Sprintf("0_INIT DUR %v, TOTAL DUR: %v", initialDuration, totalDuration))
return utils.MinDuration(initialDuration, totalDuration), nil
}
}
@@ -624,14 +605,12 @@ func (origCD *CallDescriptor) getMaxSessionDuration(origAcc *Account) (time.Dura
ts.createIncrementsSlice()
}
for _, incr := range ts.Increments {
//utils.Logger.Debug("INCR: " + utils.ToJSON(incr))
totalCost += incr.Cost
if incr.BalanceInfo.Monetary != nil && incr.BalanceInfo.Monetary.UUID == defaultBalance.Uuid {
initialDefaultBalanceValue -= incr.Cost
if initialDefaultBalanceValue < 0 {
// this increment was payed with debt
// TODO: improve this check
//utils.Logger.Debug(fmt.Sprintf("1_INIT DUR %v, TOTAL DUR: %v", initialDuration, totalDuration))
return utils.MinDuration(initialDuration, totalDuration), nil
}
@@ -640,13 +619,10 @@ func (origCD *CallDescriptor) getMaxSessionDuration(origAcc *Account) (time.Dura
//log.Print("INC: ", utils.ToJSON(incr))
if totalDuration >= initialDuration {
// we have enough, return
//utils.Logger.Debug(fmt.Sprintf("2_INIT DUR %v, TOTAL DUR: %v", initialDuration, totalDuration))
return initialDuration, nil
}
}
}
//utils.Logger.Debug(fmt.Sprintf("3_INIT DUR %v, TOTAL DUR: %v", initialDuration, totalDuration))
return utils.MinDuration(initialDuration, totalDuration), nil
}

View File

@@ -476,9 +476,12 @@ func (rs *Responder) GetSessionRuns(ev *CDR, sRuns *[]*SessionRun) error {
continue // We only consider prepaid sessions
}
startTime, err := ev.GetAnswerTime(dc.AnswerTimeField, rs.Timezone)
if err != nil {
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return errors.New("Error parsing answer event start time")
if err != nil || startTime.IsZero() { // AnswerTime not parsable, try SetupTime
startTime, err = ev.GetSetupTime(dc.SetupTimeField, rs.Timezone)
if err != nil {
rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return errors.New("Error parsing answer event start time")
}
}
endTime, err := ev.GetEndTime("", rs.Timezone)
if err != nil {

View File

@@ -20,6 +20,7 @@ package sessionmanager
import (
"fmt"
"net/url"
"strconv"
"strings"
"github.com/cgrates/aringo"
@@ -29,7 +30,8 @@ import (
)
const (
CGRAuthAPP = "cgrates_auth"
CGRAuthAPP = "cgrates_auth"
CGRMaxSessionTime = "CGRMaxSessionTime"
)
func NewSMAsterisk(cgrCfg *config.CGRConfig, astConnIdx int, smg rpcclient.RpcClientConnection) (*SMAsterisk, error) {
@@ -92,12 +94,52 @@ func (sma *SMAsterisk) handleStasisStart(ev *SMAsteriskEvent) {
// Since we got error, disconnect channel
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID()))
return
}
return
}
var maxUsage float64
smgEv, err := ev.AsSMGenericSessionStart()
if err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when generating SMG for channelID: %s", err.Error(), ev.ChannelID()))
// Since we got error, disconnect channel
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID()))
}
return
}
if err = sma.smg.Call("SMGenericV1.InitiateSession", smgEv, &maxUsage); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to initiate session for channelID: %s", err.Error(), ev.ChannelID()))
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID()))
}
return
}
if maxUsage == -1 {
maxUsage = 0 // So we can set it later as unlimited
} else if maxUsage == 0 || maxUsage < sma.cgrCfg.SMAsteriskCfg().MinCallDuration.Seconds() {
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID()))
}
return
}
// Call allowed, set absolute timeout
if _, err := sma.astConn.Call(aringo.HTTP_POST, fmt.Sprintf("http://%s/ari/channels/%s/variable",
sma.cgrCfg.SMAsteriskCfg().AsteriskConns[sma.astConnIdx].Address, ev.ChannelID()),
url.Values{"variable": {CGRMaxSessionTime}, "value": {strconv.FormatFloat(maxUsage*1000, 'f', -1, 64)}}); err != nil { // Asterisk expects value in ms
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when setting %s for channelID: %s", err.Error(), CGRMaxSessionTime, ev.ChannelID()))
// Since we got error, disconnect channel
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID()))
}
return
}
// Exit channel from stasis
if _, err := sma.astConn.Call(aringo.HTTP_POST, fmt.Sprintf("http://%s/ari/channels/%s/continue",
sma.cgrCfg.SMAsteriskCfg().AsteriskConns[sma.astConnIdx].Address, ev.ChannelID()), url.Values{"channelId": {ev.ChannelID()}}); err != nil {
sma.cgrCfg.SMAsteriskCfg().AsteriskConns[sma.astConnIdx].Address, ev.ChannelID()), nil); err != nil {
}
}

View File

@@ -293,14 +293,13 @@ func (self *SMGeneric) sessionEnd(sessionId string, usage time.Duration) error {
}
for idx, s := range ss {
s.totalUsage = usage // save final usage as totalUsage
//utils.Logger.Info(fmt.Sprintf("<SMGeneric> Ending session: %s, runId: %s", sessionId, s.runId))
if idx == 0 && s.stopDebit != nil {
close(s.stopDebit) // Stop automatic debits
}
aTime, err := s.eventStart.GetAnswerTime(utils.META_DEFAULT, self.cgrCfg.DefaultTimezone)
if err != nil || aTime.IsZero() {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not retrieve answer time for session: %s, runId: %s, aTime: %+v, error: %s",
sessionId, s.runId, aTime, err.Error()))
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not retrieve answer time for session: %s, runId: %s, aTime: %+v, error: %v",
sessionId, s.runId, aTime, err))
}
if err := s.close(aTime.Add(usage)); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not close session: %s, runId: %s, error: %s", sessionId, s.runId, err.Error()))

View File

@@ -139,12 +139,15 @@ func ParseTimeDetectLayout(tmStr string, timezone string) (time.Time, error) {
sqlRule := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}$`)
gotimeRule := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\.?\d*\s[+,-]\d+\s\w+$`)
fsTimestamp := regexp.MustCompile(`^\d{16}$`)
astTimestamp := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d*[+,-]\d+$`)
unixTimestampRule := regexp.MustCompile(`^\d{10}$`)
oneLineTimestampRule := regexp.MustCompile(`^\d{14}$`)
oneSpaceTimestampRule := regexp.MustCompile(`^\d{2}\.\d{2}.\d{4}\s{1}\d{2}:\d{2}:\d{2}$`)
eamonTimestampRule := regexp.MustCompile(`^\d{2}/\d{2}/\d{4}\s{1}\d{2}:\d{2}:\d{2}$`)
broadsoftTimestampRule := regexp.MustCompile(`^\d{14}\.\d{3}`)
switch {
case astTimestamp.MatchString(tmStr):
return time.Parse("2006-01-02T15:04:05.999999999-0700", tmStr)
case rfc3339Rule.MatchString(tmStr):
return time.Parse(time.RFC3339, tmStr)
case gotimeRule.MatchString(tmStr):

View File

@@ -266,6 +266,14 @@ func TestParseTimeDetectLayout(t *testing.T) {
} else if !broadTmS.Equal(expectedTime) {
t.Errorf("Expecting: %v, received: %v", expectedTime, broadTmS)
}
astTimestamp := "2016-09-14T19:37:43.665+0000"
expectedTime = time.Date(2016, 9, 14, 19, 37, 43, 665000000, time.UTC)
astTMS, err := ParseTimeDetectLayout(astTimestamp, "")
if err != nil {
t.Error(err)
} else if !astTMS.Equal(expectedTime) {
t.Errorf("Expecting: %v, received: %v", expectedTime, astTMS)
}
}
func TestParseDateUnix(t *testing.T) {