Responder.GetSessionRuns, SM-Kamailio with prepaid support

This commit is contained in:
DanB
2015-01-01 15:57:21 +01:00
parent 39acb88824
commit ea3a9e6dee
16 changed files with 369 additions and 351 deletions

View File

@@ -178,6 +178,7 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
return
}
cdrsConn = &engine.RPCClientConnector{Client: client}
}
@@ -186,7 +187,13 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage
dp, _ := time.ParseDuration(fmt.Sprintf("%vs", cfg.SMDebitInterval))
sm = sessionmanager.NewFSSessionManager(cfg, loggerDb, raterConn, cdrsConn, dp)
case KAMAILIO:
sm, _ = sessionmanager.NewKamailioSessionManager(cfg, raterConn, cdrsConn)
var debitInterval time.Duration
if debitInterval, err = utils.ParseDurationWithSecs(strconv.Itoa(cfg.SMDebitInterval)); err != nil {
engine.Logger.Crit(fmt.Sprintf("<SM-Kamailio> Error: %s", err.Error()))
exitChan <- true
return
}
sm, _ = sessionmanager.NewKamailioSessionManager(cfg, raterConn, cdrsConn, loggerDb, debitInterval)
case OSIPS:
sm, _ = sessionmanager.NewOSipsSessionManager(cfg, raterConn, cdrsConn)
default:

View File

@@ -2,9 +2,6 @@
####### Defined Values #########
#!define FLT_ACC 1
#!define FLT_ACCMISSED 2
#!define FLT_ACCFAILED 3
#!define FLT_DIALOG 4
#!define FLT_NATS 5
#!define FLB_NATB 6
@@ -44,8 +41,6 @@ loadmodule "xlog.so"
loadmodule "sanity.so"
loadmodule "ctl.so"
loadmodule "mi_rpc.so"
loadmodule "db_flatstore.so"
loadmodule "acc.so"
loadmodule "nathelper.so"
loadmodule "rtpproxy.so"
loadmodule "htable.so"
@@ -53,6 +48,9 @@ loadmodule "auth.so"
loadmodule "evapi.so"
loadmodule "json.so"
loadmodule "dialog.so"
loadmodule "xhttp.so"
loadmodule "jsonrpc-s.so"
# ----------------- setting module-specific parameters ---------------
@@ -65,36 +63,14 @@ modparam("tm", "failure_reply_mode", 3)
modparam("tm", "fr_timer", 30000)
modparam("tm", "fr_inv_timer", 120000)
# ----- rr params -----
modparam("rr", "enable_full_lr", 0)
modparam("rr", "append_fromtag", 0)
# ----- registrar params -----
modparam("registrar", "method_filtering", 1)
modparam("registrar", "max_expires", 3600)
# ----- acc params -----
modparam("acc", "early_media", 0)
modparam("acc", "report_ack", 0)
modparam("acc", "report_cancels", 0)
modparam("acc", "detect_direction", 0)
modparam("acc", "log_flag", FLT_ACC)
modparam("acc", "log_missed_flag", FLT_ACCMISSED)
modparam("acc", "log_extra",
"src_user=$fU;src_domain=$fd;src_ip=$si;"
"dst_ouser=$tU;dst_user=$rU;dst_domain=$rd")
modparam("acc", "failed_transaction_flag", FLT_ACCFAILED)
modparam("acc", "db_flag", FLT_ACC)
modparam("acc", "db_missed_flag", FLT_ACCMISSED)
modparam("acc", "db_url", "flatstore:/var/log/acc")
modparam("acc", "db_extra",
"src_user=$fU;src_domain=$fd;src_ip=$si;"
"dst_ouser=$tU;dst_user=$rU;dst_domain=$rd")
# ----- dialog params -----
modparam("dialog", "dlg_flag", FLT_DIALOG)
modparam("dialog", "send_bye", 1)
@@ -119,6 +95,8 @@ modparam("htable", "htable", "cgrconn=>size=1;")
####### Routing Logic ########
include_file "kamailio-cgrates.cfg"
event_route[htable:mod-init] {
$sht(users=>1001) = "CGRateS.org";
$sht(users=>1002) = "CGRateS.org";
@@ -128,102 +106,6 @@ event_route[htable:mod-init] {
$sht(users=>1007) = "CGRateS.org";
}
event_route[evapi:connection-new] {
$sht(cgrconn=>cgr) = $evapi(srcaddr) + ":" + $evapi(srcport); # Detect presence of at least one connection
}
event_route[evapi:connection-closed] {
$var(connClosed) = $evapi(srcaddr) + ":" + $evapi(srcport);
if $sht(cgrconn=>cgr) == $var(connClosed) {
$sht(cgrconn=>cgr) = $null;
}
}
event_route[evapi:message-received] {
json_get_field("$evapi(msg)", "Event", "$var(Event)");
route($(var(Event){s.rm,"})); # String characters are kept by json_get_field, remove them here
}
event_route[dialog:start] {
route(CGR_CALL_START);
}
event_route[dialog:end] {
route(CGR_CALL_END);
}
route[CGR_AUTH_REQUEST] {
# Auth INVITEs with CGRateS
if $sht(cgrconn=>cgr) == $null {
sl_send_reply("503","Charging controller unreachable");
exit;
}
$dlg_var(cgrReqType) = "postpaid";
$dlg_var(cgrAccount) = $fU;
$dlg_var(cgrDestination) = $rU;
evapi_async_relay("{\"event\":\"CGR_AUTH_REQUEST\",
\"tr_index\":\"$T(id_index)\",
\"tr_label\":\"$T(id_label)\",
\"cgr_reqtype\":\"$dlg_var(cgrReqType)\",
\"cgr_account\":\"$dlg_var(cgrAccount)\",
\"cgr_destination\":\"$dlg_var(cgrDestination)\"}");
exit;
}
route[CGR_AUTH_REPLY] {
json_get_field("$evapi(msg)", "TransactionIndex", "$var(TransactionIndex)");
json_get_field("$evapi(msg)", "TransactionLabel", "$var(TransactionLabel)");
json_get_field("$evapi(msg)", "MaxSessionTime", "$var(MaxSessionTime)");
json_get_field("$evapi(msg)", "AuthError", "$var(AuthError)");
$var(id_index) = $(var(TransactionIndex){s.int});
$var(id_label) = $(var(TransactionLabel){s.int});
t_continue("$var(id_index)", "$var(id_label)", "CGR_DIALOG_TIMEOUT");
}
route[CGR_DIALOG_TIMEOUT] {
if $var(AuthError) != "null" { # null is converted in string by json_get_field
xlog("CGR_AUTH_ERROR: $var(AuthError)");
sl_send_reply("503","CGR_AUTH_ERROR");
exit;
}
if $var(MaxSessionTime) != -1 && !dlg_set_timeout("$var(MaxSessionTime)") {
sl_send_reply("503","CGR_MAX_SESSION_TIME_ERROR");
exit;
}
route(RELAY);
}
route[CGR_CALL_START] {
if $sht(cgrconn=>cgr) == $null {
sl_send_reply("503","Charging controller unreachable");
exit;
}
evapi_async_relay("{\"event\":\"CGR_CALL_START\",
\"callid\":\"$dlg(callid)\",
\"from_tag\":\"$dlg(from_tag)\",
\"cgr_reqtype\":\"$dlg_var(cgrReqType)\",
\"cgr_account\":\"$dlg_var(cgrAccount)\",
\"cgr_destination\":\"$dlg_var(cgrDestination)\"}");
exit;
}
route[CGR_CALL_END] {
if $sht(cgrconn=>cgr) == $null {
sl_send_reply("503","Charging controller unreachable");
exit;
}
$var(callDur) = $TS - $dlg(start_ts);
evapi_async_relay("{\"event\":\"CGR_CALL_END\",
\"callid\":\"$dlg(callid)\",
\"from_tag\":\"$dlg(from_tag)\",
\"cgr_reqtype\":\"$dlg_var(cgrReqType)\",
\"cgr_account\":\"$dlg_var(cgrAccount)\",
\"cgr_destination\":\"$dlg_var(cgrDestination)\",
\"cgr_answertime\":\"$dlg(start_ts)\",
\"cgr_duration\":\"$var(callDur)\"}");
exit;
}
# Main SIP request routing logic
request_route {
@@ -263,11 +145,6 @@ request_route {
if (is_method("INVITE|SUBSCRIBE"))
record_route();
# account only INVITEs
if (is_method("INVITE")) {
setflag(FLT_ACC); # do accounting
}
# Not handling requests towards external domains
if uri != myself {
sl_send_reply("604", "Only local destinations accepted");
@@ -341,11 +218,7 @@ route[WITHINDLG] {
# take the path determined by record-routing
if (loose_route()) {
route(DLGURI);
if (is_method("BYE")) {
setflag(FLT_ACC); # do accounting ...
setflag(FLT_ACCFAILED); # ... even if the transaction fails
}
else if ( is_method("ACK") ) {
if ( is_method("ACK") ) {
# ACK is forwarded statelessy
route(NATMANAGE);
}
@@ -400,10 +273,6 @@ route[LOCATION] {
exit;
}
}
# when routing via usrloc, log the missed calls also
if (is_method("INVITE")) {
setflag(FLT_ACCMISSED);
}
}
# user uthentication

View File

@@ -33,6 +33,13 @@ import (
"github.com/cgrates/rpcclient"
)
// Individual session run
type SessionRun struct {
DerivedCharger *utils.DerivedCharger // Needed in reply
CallDescriptor *CallDescriptor
CallCosts []*CallCost
}
type Responder struct {
Bal *balancer2go.Balancer
ExitChan chan bool
@@ -176,6 +183,41 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev utils.Event, reply *float64) er
return nil
}
// Used by SM to get all the prepaid CallDescriptors attached to a session
func (rs *Responder) GetSessionRuns(ev utils.Event, sRuns *[]*SessionRun) error {
if rs.Bal != nil {
return errors.New("Unsupported method on the balancer")
}
attrsDC := utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT),
Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)}
var dcs utils.DerivedChargers
if err := rs.GetDerivedChargers(attrsDC, &dcs); err != nil {
return err
}
dcs, _ = dcs.AppendDefaultRun()
sesRuns := make([]*SessionRun, 0)
for _, dc := range dcs {
if ev.GetReqType(dc.ReqTypeField) != utils.PREPAID {
continue // We only consider prepaid sessions
}
startTime, err := ev.GetAnswerTime(dc.AnswerTimeField)
if err != nil {
return errors.New("Error parsing answer event start time")
}
cd := &CallDescriptor{
Direction: ev.GetDirection(dc.DirectionField),
Tenant: ev.GetTenant(dc.TenantField),
Category: ev.GetCategory(dc.CategoryField),
Subject: ev.GetSubject(dc.SubjectField),
Account: ev.GetAccount(dc.AccountField),
Destination: ev.GetDestination(dc.DestinationField),
TimeStart: startTime}
sesRuns = append(sesRuns, &SessionRun{DerivedCharger: dc, CallDescriptor: cd})
}
*sRuns = sesRuns
return nil
}
func (rs *Responder) GetDerivedChargers(attrs utils.AttrDerivedChargers, dcs *utils.DerivedChargers) error {
// ToDo: Make it work with balancer if needed
@@ -355,6 +397,7 @@ type Connector interface {
GetMaxSessionTime(CallDescriptor, *float64) error
GetDerivedChargers(utils.AttrDerivedChargers, *utils.DerivedChargers) error
GetDerivedMaxSessionTime(utils.Event, *float64) error
GetSessionRuns(utils.Event, *[]*SessionRun) error
ProcessCdr(*utils.StoredCdr, *string) error
}
@@ -386,6 +429,10 @@ func (rcc *RPCClientConnector) GetDerivedMaxSessionTime(ev utils.Event, reply *f
return rcc.Client.Call("Responder.GetDerivedMaxSessionTime", ev, reply)
}
func (rcc *RPCClientConnector) GetSessionRuns(ev utils.Event, sRuns *[]*SessionRun) error {
return rcc.Client.Call("Responder.GetSessionRuns", ev, sRuns)
}
func (rcc *RPCClientConnector) GetDerivedChargers(attrs utils.AttrDerivedChargers, dcs *utils.DerivedChargers) error {
return rcc.Client.Call("ApierV1.GetDerivedChargers", attrs, dcs)
}

View File

@@ -85,5 +85,41 @@ func TestGetDerivedMaxSessionTime(t *testing.T) {
} else if maxSessionTime != 9.9e+10 { // Smallest one
t.Error("Unexpected maxSessionTime received: ", maxSessionTime)
}
}
func TestGetSessionRuns(t *testing.T) {
config.CgrConfig().CombinedDerivedChargers = false
testTenant := "vdf"
cdr := &utils.StoredCdr{CgrId: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf",
CdrHost: "192.168.1.1", CdrSource: "test", ReqType: "prepaid", Direction: "*out", Tenant: testTenant, Category: "call", Account: "dan2", Subject: "dan2",
Destination: "1002", SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC),
MediationRunId: utils.DEFAULT_RUNID, Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"},
Cost: 1.01, RatedAccount: "dan", RatedSubject: "dan"}
keyCharger1 := utils.ConcatenatedKey("*out", testTenant, "call", "dan2", "dan2")
dfDC := &utils.DerivedCharger{RunId: utils.DEFAULT_RUNID, ReqTypeField: "*default", DirectionField: "*default", TenantField: "*default", CategoryField: "*default",
AccountField: "*default", SubjectField: "*default", DestinationField: "*default", SetupTimeField: "*default", AnswerTimeField: "*default", UsageField: "*default"}
extra1DC := &utils.DerivedCharger{RunId: "extra1", ReqTypeField: "^prepaid", DirectionField: "*default", TenantField: "*default", CategoryField: "^0",
AccountField: "^minitsboy", SubjectField: "^rif", DestinationField: "^0256", SetupTimeField: "*default", AnswerTimeField: "*default", UsageField: "*default"}
extra2DC := &utils.DerivedCharger{RunId: "extra2", ReqTypeField: "*default", DirectionField: "*default", TenantField: "*default", CategoryField: "*default",
AccountField: "^ivo", SubjectField: "^ivo", DestinationField: "*default", SetupTimeField: "*default", AnswerTimeField: "*default", UsageField: "*default"}
extra3DC := &utils.DerivedCharger{RunId: "extra3", ReqTypeField: "^pseudoprepaid", DirectionField: "*default", TenantField: "*default", CategoryField: "^0",
AccountField: "^minu", SubjectField: "^rif", DestinationField: "^0256", SetupTimeField: "*default", AnswerTimeField: "*default", UsageField: "*default"}
charger1 := utils.DerivedChargers{extra1DC, extra2DC, extra3DC}
if err := accountingStorage.SetDerivedChargers(keyCharger1, charger1); err != nil {
t.Error("Error on setting DerivedChargers", err.Error())
}
accountingStorage.CacheAccounting(nil, nil, nil, nil)
sesRuns := make([]*SessionRun, 0)
eSRuns := []*SessionRun{
&SessionRun{DerivedCharger: extra1DC,
CallDescriptor: &CallDescriptor{Direction: "*out", Category: "0", Tenant: "vdf", Subject: "rif", Account: "minitsboy", Destination: "0256", TimeStart: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC)}},
&SessionRun{DerivedCharger: extra2DC,
CallDescriptor: &CallDescriptor{Direction: "*out", Category: "call", Tenant: "vdf", Subject: "ivo", Account: "ivo", Destination: "1002", TimeStart: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC)}},
&SessionRun{DerivedCharger: dfDC,
CallDescriptor: &CallDescriptor{Direction: "*out", Category: "call", Tenant: "vdf", Subject: "dan2", Account: "dan2", Destination: "1002", TimeStart: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC)}}}
if err := rsponder.GetSessionRuns(cdr.AsEvent(""), &sesRuns); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eSRuns, sesRuns) {
t.Errorf("Received: %+v", sesRuns)
}
}

View File

@@ -159,7 +159,7 @@ func (fsev FSEvent) GetReqType(fieldName string) string {
}
return utils.FirstNonEmpty(fsev[fieldName], fsev[REQTYPE], config.CgrConfig().DefaultReqType)
}
func (fsev FSEvent) MissingParameter(eventName string) bool {
func (fsev FSEvent) MissingParameter() bool {
return strings.TrimSpace(fsev.GetDirection(utils.META_DEFAULT)) == "" ||
strings.TrimSpace(fsev.GetSubject(utils.META_DEFAULT)) == "" ||
strings.TrimSpace(fsev.GetAccount(utils.META_DEFAULT)) == "" ||

View File

@@ -190,7 +190,7 @@ func (sm *FSSessionManager) OnChannelPark(ev utils.Event) {
engine.Logger.Err("Error parsing answer event start time, using time.Now!")
startTime = time.Now()
}
if ev.MissingParameter(utils.META_DEFAULT) {
if ev.MissingParameter() {
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(dc.DestinationField), MISSING_PARAMETER)
engine.Logger.Err(fmt.Sprintf("Missing parameter for %s", ev.GetUUID()))
return
@@ -231,22 +231,10 @@ func (sm *FSSessionManager) OnChannelPark(ev utils.Event) {
}
func (sm *FSSessionManager) OnChannelAnswer(ev utils.Event) {
if ev.MissingParameter(utils.META_DEFAULT) {
if ev.MissingParameter() {
sm.DisconnectSession(ev.GetUUID(), MISSING_PARAMETER, "")
}
if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_reqtype %s\n\n", ev.GetUUID(), ev.GetReqType(""))); err != nil {
engine.Logger.Err(fmt.Sprintf("Error on attempting to overwrite cgr_type in chan variables: %v", err))
}
attrsDC := utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT),
Direction: ev.GetDirection(utils.META_DEFAULT), Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)}
var dcs utils.DerivedChargers
if err := sm.rater.GetDerivedChargers(attrsDC, &dcs); err != nil {
engine.Logger.Err(fmt.Sprintf("<SessionManager> OnAnswer: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error()))
sm.DisconnectSession(ev.GetUUID(), SYSTEM_ERROR, "") // Disconnect the session since we are not able to process sessions
return
}
dcs, _ = dcs.AppendDefaultRun()
s := NewSession(ev, sm, dcs)
s := NewSession(ev, sm)
if s != nil {
sm.sessions = append(sm.sessions, s)
}
@@ -257,91 +245,10 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev utils.Event) {
s := sm.GetSession(ev.GetUUID())
if s == nil { // Not handled by us
return
} else {
sm.RemoveSession(s.uuid) // Unreference it early so we avoid concurrency
}
defer s.Close(ev) // Stop loop and save the costs deducted so far to database
attrsDC := utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT),
Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)}
var dcs utils.DerivedChargers
if err := sm.rater.GetDerivedChargers(attrsDC, &dcs); err != nil {
engine.Logger.Err(fmt.Sprintf("<SessionManager> OnHangup: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error()))
return
}
dcs, _ = dcs.AppendDefaultRun()
for _, dc := range dcs {
if ev.GetReqType(dc.ReqTypeField) != utils.PREPAID {
continue
}
sr := s.GetSessionRun(dc.RunId)
if sr == nil {
continue // Did not save a sessionRun for this dc
}
if len(sr.callCosts) == 0 {
continue // why would we have 0 callcosts
}
lastCC := sr.callCosts[len(sr.callCosts)-1]
lastCC.Timespans.Decompress()
// put credit back
startTime, err := ev.GetAnswerTime(dc.AnswerTimeField)
if err != nil {
engine.Logger.Crit("Error parsing prepaid call start time from event")
return
}
duration, err := ev.GetDuration(dc.UsageField)
if err != nil {
engine.Logger.Crit(fmt.Sprintf("Error parsing call duration from event %s", err.Error()))
return
}
hangupTime := startTime.Add(duration)
end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd
refundDuration := end.Sub(hangupTime)
var refundIncrements engine.Increments
for i := len(lastCC.Timespans) - 1; i >= 0; i-- {
ts := lastCC.Timespans[i]
tsDuration := ts.GetDuration()
if refundDuration <= tsDuration {
lastRefundedIncrementIndex := 0
for j := len(ts.Increments) - 1; j >= 0; j-- {
increment := ts.Increments[j]
if increment.Duration <= refundDuration {
refundIncrements = append(refundIncrements, increment)
refundDuration -= increment.Duration
lastRefundedIncrementIndex = j
}
}
ts.SplitByIncrement(lastRefundedIncrementIndex)
break // do not go to other timespans
} else {
refundIncrements = append(refundIncrements, ts.Increments...)
// remove the timespan entirely
lastCC.Timespans[i] = nil
lastCC.Timespans = lastCC.Timespans[:i]
// continue to the next timespan with what is left to refund
refundDuration -= tsDuration
}
}
// show only what was actualy refunded (stopped in timespan)
// engine.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration))
if len(refundIncrements) > 0 {
cd := &engine.CallDescriptor{
Direction: lastCC.Direction,
Tenant: lastCC.Tenant,
Category: lastCC.Category,
Subject: lastCC.Subject,
Account: lastCC.Account,
Destination: lastCC.Destination,
Increments: refundIncrements,
}
var response float64
err := sm.rater.RefundIncrements(*cd, &response)
if err != nil {
engine.Logger.Err(fmt.Sprintf("Debit cents failed: %v", err))
}
}
cost := refundIncrements.GetTotalCost()
lastCC.Cost -= cost
lastCC.Timespans.Compress()
sm.RemoveSession(s.uuid) // Unreference it early so we avoid concurrency
if err := s.Close(ev); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database
engine.Logger.Err(err.Error())
}
}
@@ -368,6 +275,10 @@ func (sm *FSSessionManager) GetDbLogger() engine.LogStorage {
return sm.loggerDB
}
func (sm *FSSessionManager) Rater() engine.Connector {
return sm.rater
}
func (sm *FSSessionManager) Shutdown() (err error) {
if fsock.FS == nil || !fsock.FS.Connected() {
return errors.New("Cannot shutdown sessions, fsock not connected")

View File

@@ -27,19 +27,23 @@ import (
"github.com/cgrates/kamevapi"
"log/syslog"
"regexp"
"strings"
"time"
)
func NewKamailioSessionManager(cfg *config.CGRConfig, rater, cdrsrv engine.Connector) (*KamailioSessionManager, error) {
ksm := &KamailioSessionManager{cgrCfg: cfg, rater: rater, cdrsrv: cdrsrv}
func NewKamailioSessionManager(cfg *config.CGRConfig, rater, cdrsrv engine.Connector, loggerDb engine.LogStorage, debitInterval time.Duration) (*KamailioSessionManager, error) {
ksm := &KamailioSessionManager{cgrCfg: cfg, rater: rater, cdrsrv: cdrsrv, loggerDb: loggerDb, debitInterval: debitInterval}
return ksm, nil
}
type KamailioSessionManager struct {
cgrCfg *config.CGRConfig
rater engine.Connector
cdrsrv engine.Connector
kea *kamevapi.KamEvapi
cgrCfg *config.CGRConfig
rater engine.Connector
cdrsrv engine.Connector
loggerDb engine.LogStorage
debitInterval time.Duration
kea *kamevapi.KamEvapi
sessions []*Session
}
func (self *KamailioSessionManager) onCgrAuth(evData []byte) {
@@ -47,6 +51,14 @@ func (self *KamailioSessionManager) onCgrAuth(evData []byte) {
if err != nil {
engine.Logger.Info(fmt.Sprintf("<SM-Kamailio> ERROR unmarshalling event: %s, error: %s", evData, err.Error()))
}
if kev.MissingParameter() {
if kar, err := kev.AsKamAuthReply(0.0, errors.New(utils.ERR_MANDATORY_IE_MISSING)); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed building auth reply %s", err.Error()))
} else if err = self.kea.Send(kar.String()); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed sending auth reply %s", err.Error()))
}
return
}
var remainingDuration float64
if err = self.rater.GetDerivedMaxSessionTime(kev.AsEvent(""), &remainingDuration); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-Kamailio> Could not get max session time for %s, error: %s", kev.GetUUID(), err.Error()))
@@ -59,18 +71,37 @@ func (self *KamailioSessionManager) onCgrAuth(evData []byte) {
}
func (self *KamailioSessionManager) onCallStart(evData []byte) {
_, err := NewKamEvent(evData)
kamEv, err := NewKamEvent(evData)
if err != nil {
engine.Logger.Info(fmt.Sprintf("<SM-Kamailio> ERROR unmarshalling event: %s, error: %s", evData, err.Error()))
engine.Logger.Err(fmt.Sprintf("<SM-Kamailio> ERROR unmarshalling event: %s, error: %s", evData, err.Error()))
}
if kamEv.MissingParameter() {
self.DisconnectSession(fmt.Sprintf("%s,%s", kamEv[HASH_ENTRY], kamEv[HASH_ID]), utils.ERR_MANDATORY_IE_MISSING, "")
return
}
s := NewSession(kamEv, self)
if s != nil {
self.sessions = append(self.sessions, s)
}
}
func (self *KamailioSessionManager) onCallEnd(evData []byte) {
kev, err := NewKamEvent(evData)
if err != nil {
engine.Logger.Info(fmt.Sprintf("<SM-Kamailio> ERROR unmarshalling event: %s, error: %s", evData, err.Error()))
engine.Logger.Err(fmt.Sprintf("<SM-Kamailio> ERROR unmarshalling event: %s, error: %s", evData, err.Error()))
}
if kev.MissingParameter() {
engine.Logger.Err(fmt.Sprintf("<SM-Kamailio> Mandatory IE missing out of event: %+v", kev))
}
go self.ProcessCdr(kev.AsStoredCdr())
s := self.GetSession(kev.GetUUID())
if s == nil { // Not handled by us
return
}
self.RemoveSession(s.uuid) // Unreference it early so we avoid concurrency
if err := s.Close(kev); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database
engine.Logger.Err(err.Error())
}
go self.ProcessCdr(kev)
}
func (self *KamailioSessionManager) Connect() error {
@@ -90,29 +121,52 @@ func (self *KamailioSessionManager) Connect() error {
}
func (self *KamailioSessionManager) DisconnectSession(uuid, notify, destnr string) {
hashSplt := strings.Split(uuid, ",")
disconnectEv := &KamSessionDisconnect{Event: CGR_SESSION_DISCONNECT, HashEntry: hashSplt[0], HashId: hashSplt[1], Reason: notify}
if err := self.kea.Send(disconnectEv.String()); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed sending disconnect request %s", err.Error()))
}
return
}
func (self *KamailioSessionManager) RemoveSession(uuid string) {
return
for i, ss := range self.sessions {
if ss.uuid == uuid {
self.sessions = append(self.sessions[:i], self.sessions[i+1:]...)
return
}
}
}
// Searches and return the session with the specifed uuid
func (self *KamailioSessionManager) GetSession(uuid string) *Session {
for _, s := range self.sessions {
if s.uuid == uuid {
return s
}
}
return nil
}
func (self *KamailioSessionManager) MaxDebit(cd *engine.CallDescriptor, cc *engine.CallCost) error {
return nil
return self.rater.MaxDebit(*cd, cc)
}
func (self *KamailioSessionManager) GetDebitPeriod() time.Duration {
var nilDuration time.Duration
return nilDuration
return self.debitInterval
}
func (self *KamailioSessionManager) GetDbLogger() engine.LogStorage {
return nil
return self.loggerDb
}
func (self *KamailioSessionManager) ProcessCdr(ev utils.Event) {
func (self *KamailioSessionManager) Rater() engine.Connector {
return self.rater
}
func (self *KamailioSessionManager) ProcessCdr(cdr *utils.StoredCdr) {
if self.cdrsrv == nil {
return
}
storedCdr := ev.AsStoredCdr()
var reply string
if err := self.cdrsrv.ProcessCdr(storedCdr, &reply); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", storedCdr.CgrId, storedCdr.AccId, err.Error()))
if err := self.cdrsrv.ProcessCdr(cdr, &reply); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", cdr.CgrId, cdr.AccId, err.Error()))
}
}
func (self *KamailioSessionManager) Shutdown() error {

View File

@@ -30,21 +30,25 @@ import (
)
const (
EVENT = "event"
CGR_AUTH_REPLY = "CGR_AUTH_REPLY"
CGR_SETUPTIME = "cgr_setuptime"
CGR_ANSWERTIME = "cgr_answertime"
CGR_STOPTIME = "cgr_stoptime"
CGR_DURATION = "cgr_duration"
KAM_TR_INDEX = "tr_index"
KAM_TR_LABEL = "tr_label"
EVENT = "event"
CGR_AUTH_REQUEST = "CGR_AUTH_REQUEST"
CGR_AUTH_REPLY = "CGR_AUTH_REPLY"
CGR_SESSION_DISCONNECT = "CGR_SESSION_DISCONNECT"
CGR_CALL_START = "CGR_CALL_START"
CGR_CALL_END = "CGR_CALL_END"
CGR_SETUPTIME = "cgr_setuptime"
CGR_ANSWERTIME = "cgr_answertime"
CGR_STOPTIME = "cgr_stoptime"
CGR_DURATION = "cgr_duration"
KAM_TR_INDEX = "tr_index"
KAM_TR_LABEL = "tr_label"
HASH_ENTRY = "h_entry"
HASH_ID = "h_id"
)
var primaryFields = []string{EVENT, CALLID, FROM_TAG, TO_TAG, CGR_ACCOUNT, CGR_SUBJECT, CGR_DESTINATION,
var primaryFields = []string{EVENT, CALLID, FROM_TAG, HASH_ENTRY, HASH_ID, CGR_ACCOUNT, CGR_SUBJECT, CGR_DESTINATION,
CGR_CATEGORY, CGR_TENANT, CGR_REQTYPE, CGR_ANSWERTIME, CGR_SETUPTIME, CGR_STOPTIME, CGR_DURATION}
var mandatoryAuth = []string{EVENT, CALLID, FROM_TAG, CGR_ACCOUNT, CGR_DESTINATION, CGR_SETUPTIME}
type KamAuthReply struct {
Event string // Kamailio will use this to differentiate between requests and replies
TransactionIndex int // Original transaction index
@@ -58,6 +62,18 @@ func (self *KamAuthReply) String() string {
return string(mrsh)
}
type KamSessionDisconnect struct {
Event string
HashEntry string
HashId string
Reason string
}
func (self *KamSessionDisconnect) String() string {
mrsh, _ := json.Marshal(self)
return string(mrsh)
}
func NewKamEvent(kamEvData []byte) (KamEvent, error) {
kev := make(map[string]string)
if err := json.Unmarshal(kamEvData, &kev); err != nil {
@@ -167,14 +183,29 @@ func (kev KamEvent) GetExtraFields() map[string]string {
func (kev KamEvent) GetCdrSource() string {
return "KAMAILIO_" + kev.GetName()
}
func (kev KamEvent) MissingParameter(eventName string) bool {
switch eventName {
case utils.CGR_AUTHORIZE:
func (kev KamEvent) MissingParameter() bool {
var nullTime time.Time
switch kev.GetName() {
case CGR_AUTH_REQUEST:
if setupTime, err := kev.GetSetupTime(utils.META_DEFAULT); err != nil || setupTime == nullTime {
return true
}
return len(kev.GetAccount(utils.META_DEFAULT)) == 0 ||
len(kev.GetDestination(utils.META_DEFAULT)) == 0 ||
len(kev[KAM_TR_INDEX]) == 0 || len(kev[KAM_TR_LABEL]) == 0
case CGR_CALL_START:
if aTime, err := kev.GetAnswerTime(utils.META_DEFAULT); err != nil || aTime == nullTime {
return true
}
return len(kev.GetUUID()) == 0 ||
len(kev.GetCategory(utils.META_DEFAULT)) == 0 ||
len(kev.GetTenant(utils.META_DEFAULT)) == 0 ||
len(kev.GetAccount(utils.META_DEFAULT)) == 0 ||
len(kev.GetDestination(utils.META_DEFAULT)) == 0
len(kev.GetDestination(utils.META_DEFAULT)) == 0 ||
len(kev[HASH_ENTRY]) == 0 || len(kev[HASH_ID]) == 0
case CGR_CALL_END:
return len(kev.GetUUID()) == 0 ||
len(kev.GetAccount(utils.META_DEFAULT)) == 0 ||
len(kev.GetDestination(utils.META_DEFAULT)) == 0 ||
len(kev[CGR_DURATION]) == 0
default:
return true
}

View File

@@ -60,3 +60,30 @@ func TestKevAsKamAuthReply(t *testing.T) {
t.Error("Received KAR: ", rcvKar)
}
}
func TestKevMissingParameter(t *testing.T) {
kamEv := KamEvent{"event": "CGR_AUTH_REQUEST", "tr_index": "36045", "tr_label": "612369399", "cgr_reqtype": "postpaid",
"cgr_account": "1001", "cgr_destination": "1002"}
if !kamEv.MissingParameter() {
t.Error("Failed detecting missing parameters")
}
kamEv["cgr_setuptime"] = "1419962256"
if kamEv.MissingParameter() {
t.Error("False detecting missing parameters")
}
kamEv = KamEvent{"event": "UNKNOWN"}
if !kamEv.MissingParameter() {
t.Error("Failed detecting missing parameters")
}
kamEv = KamEvent{"event": "CGR_CALL_START", "callid": "9d28ec3ee068babdfe036623f42c0969@0:0:0:0:0:0:0:0", "from_tag": "3131b566",
"cgr_reqtype": "postpaid", "cgr_account": "1001", "cgr_destination": "1002"}
if !kamEv.MissingParameter() {
t.Error("Failed detecting missing parameters")
}
kamEv["h_entry"] = "463"
kamEv["h_id"] = "2605"
kamEv["cgr_answertime"] = "1419964961"
if kamEv.MissingParameter() {
t.Error("False detecting missing parameters")
}
}

View File

@@ -172,7 +172,7 @@ func (osipsEv *OsipsEvent) GetOriginatorIP(fieldName string) string {
}
return osipsEv.osipsEvent.OriginatorAddress.IP.String()
}
func (osipsev *OsipsEvent) MissingParameter(eventName string) bool {
func (osipsev *OsipsEvent) MissingParameter() bool {
return len(osipsev.GetUUID()) == 0 ||
len(osipsev.GetAccount(utils.META_DEFAULT)) == 0 ||
len(osipsev.GetSubject(utils.META_DEFAULT)) == 0 ||

View File

@@ -113,14 +113,14 @@ func TestOsipsEventGetValues(t *testing.T) {
}
func TestOsipsEventMissingParameter(t *testing.T) {
if osipsEv.MissingParameter(utils.META_DEFAULT) {
if osipsEv.MissingParameter() {
t.Errorf("Wrongly detected missing parameter: %+v", osipsEv)
}
osipsEv2 := &OsipsEvent{osipsEvent: &osipsdagram.OsipsEvent{Name: "E_ACC_CDR",
AttrValues: map[string]string{"to_tag": "4ea9687f", "cgr_account": "dan", "setuptime": "7", "created": "1406370492", "method": "INVITE", "callid": "ODVkMDI2Mzc2MDY5N2EzODhjNTAzNTdlODhiZjRlYWQ",
"sip_reason": "OK", "time": "1406370499", "cgr_reqtype": "prepaid", "cgr_subject": "dan", "cgr_tenant": "itsyscom.com", "sip_code": "200",
"duration": "20", "from_tag": "eb082607"}}}
if !osipsEv2.MissingParameter(utils.META_DEFAULT) {
if !osipsEv2.MissingParameter() {
t.Error("Failed to detect missing parameter.")
}
}

View File

@@ -85,6 +85,9 @@ func (osm *OsipsSessionManager) GetDebitPeriod() time.Duration {
func (osm *OsipsSessionManager) GetDbLogger() engine.LogStorage {
return nil
}
func (self *OsipsSessionManager) Rater() engine.Connector {
return self.rater
}
func (osm *OsipsSessionManager) Shutdown() error {
return nil
}
@@ -152,7 +155,7 @@ func (osm *OsipsSessionManager) OnCdr(cdrDagram *osipsdagram.OsipsEvent) {
// Process Authorize request from OpenSIPS and communicate back maxdur
func (osm *OsipsSessionManager) OnAuthorize(osipsDagram *osipsdagram.OsipsEvent) {
ev, _ := NewOsipsEvent(osipsDagram)
if ev.MissingParameter(utils.META_DEFAULT) {
if ev.MissingParameter() {
cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n2\n\n", ev.GetUUID(), utils.ERR_MANDATORY_IE_MISSING)
if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) {
engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply)))

View File

@@ -35,65 +35,42 @@ type Session struct {
uuid string
stopDebit chan bool
sessionManager SessionManager
sessionRuns []*SessionRun
sessionRuns []*engine.SessionRun
}
func (s *Session) GetSessionRun(runid string) *SessionRun {
func (s *Session) GetSessionRun(runid string) *engine.SessionRun {
for _, sr := range s.sessionRuns {
if sr.runId == runid {
if sr.DerivedCharger.RunId == runid {
return sr
}
}
return nil
}
// One individual run
type SessionRun struct {
runId string
callDescriptor *engine.CallDescriptor
callCosts []*engine.CallCost
func (s *Session) SessionRuns() []*engine.SessionRun {
return s.sessionRuns
}
// Creates a new session and in case of prepaid starts the debit loop for each of the session runs individually
func NewSession(ev utils.Event, sm SessionManager, dcs utils.DerivedChargers) *Session {
func NewSession(ev utils.Event, sm SessionManager) *Session {
s := &Session{cgrid: ev.GetCgrId(),
uuid: ev.GetUUID(),
stopDebit: make(chan bool),
sessionManager: sm,
}
for _, dc := range dcs {
if ev.GetReqType(dc.ReqTypeField) != utils.PREPAID {
continue // We only consider prepaid sessions
}
startTime, err := ev.GetAnswerTime(dc.AnswerTimeField)
if err != nil {
engine.Logger.Err("Error parsing answer event start time, using time.Now!")
return nil
}
cd := &engine.CallDescriptor{
Direction: ev.GetDirection(dc.DirectionField),
Tenant: ev.GetTenant(dc.TenantField),
Category: ev.GetCategory(dc.CategoryField),
Subject: ev.GetSubject(dc.SubjectField),
Account: ev.GetAccount(dc.AccountField),
Destination: ev.GetDestination(dc.DestinationField),
TimeStart: startTime}
sr := &SessionRun{
runId: dc.RunId,
callDescriptor: cd,
}
s.sessionRuns = append(s.sessionRuns, sr)
go s.debitLoop(len(s.sessionRuns) - 1) // Send index of the just appended sessionRun
}
if len(s.sessionRuns) == 0 {
sRuns := make([]*engine.SessionRun, 0)
if err := sm.Rater().GetSessionRuns(ev, &sRuns); err != nil || len(sRuns) == 0 {
return nil
}
for runIdx := range sRuns {
go s.debitLoop(runIdx) // Send index of the just appended sessionRun
}
return s
}
// the debit loop method (to be stoped by sending somenthing on stopDebit channel)
func (s *Session) debitLoop(runIdx int) {
nextCd := *s.sessionRuns[runIdx].callDescriptor
nextCd := *s.sessionRuns[runIdx].CallDescriptor
index := 0.0
debitPeriod := s.sessionManager.GetDebitPeriod()
for {
@@ -123,7 +100,7 @@ func (s *Session) debitLoop(runIdx int) {
engine.Logger.Err(fmt.Sprintf("<SessionManager> Could not send uuid_broadcast to freeswitch: %s", err.Error()))
}
}
s.sessionRuns[runIdx].callCosts = append(s.sessionRuns[runIdx].callCosts, cc)
s.sessionRuns[runIdx].CallCosts = append(s.sessionRuns[runIdx].CallCosts, cc)
nextCd.TimeEnd = cc.GetEndTime() // set debited timeEnd
// update call duration with real debited duration
nextCd.DurationIndex -= debitPeriod
@@ -134,19 +111,84 @@ func (s *Session) debitLoop(runIdx int) {
}
// Stops the debit loop
func (s *Session) Close(ev utils.Event) {
// engine.Logger.Debug(fmt.Sprintf("Stopping debit for %s", s.uuid))
if s == nil {
return
}
func (s *Session) Close(ev utils.Event) error {
close(s.stopDebit) // Close the channel so all the sessionRuns listening will be notified
if _, err := ev.GetEndTime(); err != nil {
engine.Logger.Err("Error parsing answer event stop time.")
for idx := range s.sessionRuns {
s.sessionRuns[idx].callDescriptor.TimeEnd = s.sessionRuns[idx].callDescriptor.TimeStart.Add(s.sessionRuns[idx].callDescriptor.DurationIndex)
s.sessionRuns[idx].CallDescriptor.TimeEnd = s.sessionRuns[idx].CallDescriptor.TimeStart.Add(s.sessionRuns[idx].CallDescriptor.DurationIndex)
}
}
s.SaveOperations()
// Costs refunds
for _, sr := range s.SessionRuns() {
if len(sr.CallCosts) == 0 {
continue // why would we have 0 callcosts
}
lastCC := sr.CallCosts[len(sr.CallCosts)-1]
lastCC.Timespans.Decompress()
// put credit back
startTime, err := ev.GetAnswerTime(sr.DerivedCharger.AnswerTimeField)
if err != nil {
engine.Logger.Crit("Error parsing prepaid call start time from event")
return err
}
duration, err := ev.GetDuration(sr.DerivedCharger.UsageField)
if err != nil {
engine.Logger.Crit(fmt.Sprintf("Error parsing call duration from event %s", err.Error()))
return err
}
hangupTime := startTime.Add(duration)
end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd
refundDuration := end.Sub(hangupTime)
var refundIncrements engine.Increments
for i := len(lastCC.Timespans) - 1; i >= 0; i-- {
ts := lastCC.Timespans[i]
tsDuration := ts.GetDuration()
if refundDuration <= tsDuration {
lastRefundedIncrementIndex := 0
for j := len(ts.Increments) - 1; j >= 0; j-- {
increment := ts.Increments[j]
if increment.Duration <= refundDuration {
refundIncrements = append(refundIncrements, increment)
refundDuration -= increment.Duration
lastRefundedIncrementIndex = j
}
}
ts.SplitByIncrement(lastRefundedIncrementIndex)
break // do not go to other timespans
} else {
refundIncrements = append(refundIncrements, ts.Increments...)
// remove the timespan entirely
lastCC.Timespans[i] = nil
lastCC.Timespans = lastCC.Timespans[:i]
// continue to the next timespan with what is left to refund
refundDuration -= tsDuration
}
}
// show only what was actualy refunded (stopped in timespan)
// engine.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration))
if len(refundIncrements) > 0 {
cd := &engine.CallDescriptor{
Direction: lastCC.Direction,
Tenant: lastCC.Tenant,
Category: lastCC.Category,
Subject: lastCC.Subject,
Account: lastCC.Account,
Destination: lastCC.Destination,
Increments: refundIncrements,
}
var response float64
err := s.sessionManager.Rater().RefundIncrements(*cd, &response)
if err != nil {
return err
}
}
cost := refundIncrements.GetTotalCost()
lastCC.Cost -= cost
lastCC.Timespans.Compress()
}
go s.SaveOperations()
return nil
}
// Nice print for session
@@ -157,22 +199,17 @@ func (s *Session) String() string {
// Saves call_costs for each session run
func (s *Session) SaveOperations() {
if s == nil {
return
}
go func() {
for _, sr := range s.sessionRuns {
if len(sr.callCosts) == 0 {
break // There are no costs to save, ignore the operation
}
firstCC := sr.callCosts[0]
for _, cc := range sr.callCosts[1:] {
firstCC.Merge(cc)
}
if s.sessionManager.GetDbLogger() == nil {
engine.Logger.Err("<SessionManager> Error: no connection to logger database, cannot save costs")
}
s.sessionManager.GetDbLogger().LogCallCost(s.cgrid, engine.SESSION_MANAGER_SOURCE, sr.runId, firstCC)
for _, sr := range s.sessionRuns {
if len(sr.CallCosts) == 0 {
break // There are no costs to save, ignore the operation
}
}()
firstCC := sr.CallCosts[0]
for _, cc := range sr.CallCosts[1:] {
firstCC.Merge(cc)
}
if s.sessionManager.GetDbLogger() == nil {
engine.Logger.Err("<SessionManager> Error: no connection to logger database, cannot save costs")
}
s.sessionManager.GetDbLogger().LogCallCost(s.cgrid, engine.SESSION_MANAGER_SOURCE, sr.DerivedCharger.RunId, firstCC)
}
}

View File

@@ -31,5 +31,6 @@ type SessionManager interface {
MaxDebit(*engine.CallDescriptor, *engine.CallCost) error
GetDebitPeriod() time.Duration
GetDbLogger() engine.LogStorage
Rater() engine.Connector
Shutdown() error
}

View File

@@ -40,7 +40,7 @@ type Event interface {
GetDuration(string) (time.Duration, error)
GetOriginatorIP(string) string
GetExtraFields() map[string]string
MissingParameter(string) bool
MissingParameter() bool
ParseEventValue(*RSRField) string
PassesFieldFilter(*RSRField) (bool, string)
AsStoredCdr() *StoredCdr

View File

@@ -462,17 +462,12 @@ func (storedCdr *StoredCdr) GetOriginatorIP(fieldName string) string {
func (storedCdr *StoredCdr) GetExtraFields() map[string]string {
return storedCdr.ExtraFields
}
func (storedCdr *StoredCdr) MissingParameter(eventName string) bool {
switch eventName {
case CGR_AUTHORIZE:
return len(storedCdr.AccId) == 0 ||
len(storedCdr.Category) == 0 ||
len(storedCdr.Tenant) == 0 ||
len(storedCdr.Account) == 0 ||
len(storedCdr.Destination) == 0
default:
return true
}
func (storedCdr *StoredCdr) MissingParameter() bool {
return len(storedCdr.AccId) == 0 ||
len(storedCdr.Category) == 0 ||
len(storedCdr.Tenant) == 0 ||
len(storedCdr.Account) == 0 ||
len(storedCdr.Destination) == 0
}
func (storedCdr *StoredCdr) ParseEventValue(rsrFld *RSRField) string {
return storedCdr.FieldAsString(rsrFld)