diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 98f87152c..0fb3954ce 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -178,6 +178,7 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage if err != nil { engine.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) exitChan <- true + return } cdrsConn = &engine.RPCClientConnector{Client: client} } @@ -186,7 +187,13 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage dp, _ := time.ParseDuration(fmt.Sprintf("%vs", cfg.SMDebitInterval)) sm = sessionmanager.NewFSSessionManager(cfg, loggerDb, raterConn, cdrsConn, dp) case KAMAILIO: - sm, _ = sessionmanager.NewKamailioSessionManager(cfg, raterConn, cdrsConn) + var debitInterval time.Duration + if debitInterval, err = utils.ParseDurationWithSecs(strconv.Itoa(cfg.SMDebitInterval)); err != nil { + engine.Logger.Crit(fmt.Sprintf(" Error: %s", err.Error())) + exitChan <- true + return + } + sm, _ = sessionmanager.NewKamailioSessionManager(cfg, raterConn, cdrsConn, loggerDb, debitInterval) case OSIPS: sm, _ = sessionmanager.NewOSipsSessionManager(cfg, raterConn, cdrsConn) default: diff --git a/data/kamailio/etc/kamailio/kamailio.cfg b/data/kamailio/etc/kamailio/kamailio.cfg index ee5b10c2f..df3c69c14 100644 --- a/data/kamailio/etc/kamailio/kamailio.cfg +++ b/data/kamailio/etc/kamailio/kamailio.cfg @@ -2,9 +2,6 @@ ####### Defined Values ######### -#!define FLT_ACC 1 -#!define FLT_ACCMISSED 2 -#!define FLT_ACCFAILED 3 #!define FLT_DIALOG 4 #!define FLT_NATS 5 #!define FLB_NATB 6 @@ -44,8 +41,6 @@ loadmodule "xlog.so" loadmodule "sanity.so" loadmodule "ctl.so" loadmodule "mi_rpc.so" -loadmodule "db_flatstore.so" -loadmodule "acc.so" loadmodule "nathelper.so" loadmodule "rtpproxy.so" loadmodule "htable.so" @@ -53,6 +48,9 @@ loadmodule "auth.so" loadmodule "evapi.so" loadmodule "json.so" loadmodule "dialog.so" +loadmodule "xhttp.so" +loadmodule "jsonrpc-s.so" + # ----------------- setting module-specific parameters --------------- @@ -65,36 +63,14 @@ modparam("tm", "failure_reply_mode", 3) modparam("tm", "fr_timer", 30000) modparam("tm", "fr_inv_timer", 120000) - # ----- rr params ----- modparam("rr", "enable_full_lr", 0) modparam("rr", "append_fromtag", 0) - # ----- registrar params ----- modparam("registrar", "method_filtering", 1) modparam("registrar", "max_expires", 3600) - -# ----- acc params ----- -modparam("acc", "early_media", 0) -modparam("acc", "report_ack", 0) -modparam("acc", "report_cancels", 0) -modparam("acc", "detect_direction", 0) -modparam("acc", "log_flag", FLT_ACC) -modparam("acc", "log_missed_flag", FLT_ACCMISSED) -modparam("acc", "log_extra", - "src_user=$fU;src_domain=$fd;src_ip=$si;" - "dst_ouser=$tU;dst_user=$rU;dst_domain=$rd") -modparam("acc", "failed_transaction_flag", FLT_ACCFAILED) -modparam("acc", "db_flag", FLT_ACC) -modparam("acc", "db_missed_flag", FLT_ACCMISSED) -modparam("acc", "db_url", "flatstore:/var/log/acc") -modparam("acc", "db_extra", - "src_user=$fU;src_domain=$fd;src_ip=$si;" - "dst_ouser=$tU;dst_user=$rU;dst_domain=$rd") - - # ----- dialog params ----- modparam("dialog", "dlg_flag", FLT_DIALOG) modparam("dialog", "send_bye", 1) @@ -119,6 +95,8 @@ modparam("htable", "htable", "cgrconn=>size=1;") ####### Routing Logic ######## +include_file "kamailio-cgrates.cfg" + event_route[htable:mod-init] { $sht(users=>1001) = "CGRateS.org"; $sht(users=>1002) = "CGRateS.org"; @@ -128,102 +106,6 @@ event_route[htable:mod-init] { $sht(users=>1007) = "CGRateS.org"; } -event_route[evapi:connection-new] { - $sht(cgrconn=>cgr) = $evapi(srcaddr) + ":" + $evapi(srcport); # Detect presence of at least one connection -} - -event_route[evapi:connection-closed] { - $var(connClosed) = $evapi(srcaddr) + ":" + $evapi(srcport); - if $sht(cgrconn=>cgr) == $var(connClosed) { - $sht(cgrconn=>cgr) = $null; - } -} - -event_route[evapi:message-received] { - json_get_field("$evapi(msg)", "Event", "$var(Event)"); - route($(var(Event){s.rm,"})); # String characters are kept by json_get_field, remove them here -} - -event_route[dialog:start] { - route(CGR_CALL_START); -} - -event_route[dialog:end] { - route(CGR_CALL_END); -} - -route[CGR_AUTH_REQUEST] { - # Auth INVITEs with CGRateS - if $sht(cgrconn=>cgr) == $null { - sl_send_reply("503","Charging controller unreachable"); - exit; - } - $dlg_var(cgrReqType) = "postpaid"; - $dlg_var(cgrAccount) = $fU; - $dlg_var(cgrDestination) = $rU; - evapi_async_relay("{\"event\":\"CGR_AUTH_REQUEST\", - \"tr_index\":\"$T(id_index)\", - \"tr_label\":\"$T(id_label)\", - \"cgr_reqtype\":\"$dlg_var(cgrReqType)\", - \"cgr_account\":\"$dlg_var(cgrAccount)\", - \"cgr_destination\":\"$dlg_var(cgrDestination)\"}"); - exit; -} - -route[CGR_AUTH_REPLY] { - json_get_field("$evapi(msg)", "TransactionIndex", "$var(TransactionIndex)"); - json_get_field("$evapi(msg)", "TransactionLabel", "$var(TransactionLabel)"); - json_get_field("$evapi(msg)", "MaxSessionTime", "$var(MaxSessionTime)"); - json_get_field("$evapi(msg)", "AuthError", "$var(AuthError)"); - $var(id_index) = $(var(TransactionIndex){s.int}); - $var(id_label) = $(var(TransactionLabel){s.int}); - t_continue("$var(id_index)", "$var(id_label)", "CGR_DIALOG_TIMEOUT"); -} - -route[CGR_DIALOG_TIMEOUT] { - if $var(AuthError) != "null" { # null is converted in string by json_get_field - xlog("CGR_AUTH_ERROR: $var(AuthError)"); - sl_send_reply("503","CGR_AUTH_ERROR"); - exit; - } - if $var(MaxSessionTime) != -1 && !dlg_set_timeout("$var(MaxSessionTime)") { - sl_send_reply("503","CGR_MAX_SESSION_TIME_ERROR"); - exit; - } - route(RELAY); -} - -route[CGR_CALL_START] { - if $sht(cgrconn=>cgr) == $null { - sl_send_reply("503","Charging controller unreachable"); - exit; - } - evapi_async_relay("{\"event\":\"CGR_CALL_START\", - \"callid\":\"$dlg(callid)\", - \"from_tag\":\"$dlg(from_tag)\", - \"cgr_reqtype\":\"$dlg_var(cgrReqType)\", - \"cgr_account\":\"$dlg_var(cgrAccount)\", - \"cgr_destination\":\"$dlg_var(cgrDestination)\"}"); - exit; -} - -route[CGR_CALL_END] { - if $sht(cgrconn=>cgr) == $null { - sl_send_reply("503","Charging controller unreachable"); - exit; - } - $var(callDur) = $TS - $dlg(start_ts); - evapi_async_relay("{\"event\":\"CGR_CALL_END\", - \"callid\":\"$dlg(callid)\", - \"from_tag\":\"$dlg(from_tag)\", - \"cgr_reqtype\":\"$dlg_var(cgrReqType)\", - \"cgr_account\":\"$dlg_var(cgrAccount)\", - \"cgr_destination\":\"$dlg_var(cgrDestination)\", - \"cgr_answertime\":\"$dlg(start_ts)\", - \"cgr_duration\":\"$var(callDur)\"}"); - - exit; -} # Main SIP request routing logic request_route { @@ -263,11 +145,6 @@ request_route { if (is_method("INVITE|SUBSCRIBE")) record_route(); - # account only INVITEs - if (is_method("INVITE")) { - setflag(FLT_ACC); # do accounting - } - # Not handling requests towards external domains if uri != myself { sl_send_reply("604", "Only local destinations accepted"); @@ -341,11 +218,7 @@ route[WITHINDLG] { # take the path determined by record-routing if (loose_route()) { route(DLGURI); - if (is_method("BYE")) { - setflag(FLT_ACC); # do accounting ... - setflag(FLT_ACCFAILED); # ... even if the transaction fails - } - else if ( is_method("ACK") ) { + if ( is_method("ACK") ) { # ACK is forwarded statelessy route(NATMANAGE); } @@ -400,10 +273,6 @@ route[LOCATION] { exit; } } - # when routing via usrloc, log the missed calls also - if (is_method("INVITE")) { - setflag(FLT_ACCMISSED); - } } # user uthentication diff --git a/engine/responder.go b/engine/responder.go index 5089c741d..efdd0af55 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -33,6 +33,13 @@ import ( "github.com/cgrates/rpcclient" ) +// Individual session run +type SessionRun struct { + DerivedCharger *utils.DerivedCharger // Needed in reply + CallDescriptor *CallDescriptor + CallCosts []*CallCost +} + type Responder struct { Bal *balancer2go.Balancer ExitChan chan bool @@ -176,6 +183,41 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev utils.Event, reply *float64) er return nil } +// Used by SM to get all the prepaid CallDescriptors attached to a session +func (rs *Responder) GetSessionRuns(ev utils.Event, sRuns *[]*SessionRun) error { + if rs.Bal != nil { + return errors.New("Unsupported method on the balancer") + } + attrsDC := utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT), + Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)} + var dcs utils.DerivedChargers + if err := rs.GetDerivedChargers(attrsDC, &dcs); err != nil { + return err + } + dcs, _ = dcs.AppendDefaultRun() + sesRuns := make([]*SessionRun, 0) + for _, dc := range dcs { + if ev.GetReqType(dc.ReqTypeField) != utils.PREPAID { + continue // We only consider prepaid sessions + } + startTime, err := ev.GetAnswerTime(dc.AnswerTimeField) + if err != nil { + return errors.New("Error parsing answer event start time") + } + cd := &CallDescriptor{ + Direction: ev.GetDirection(dc.DirectionField), + Tenant: ev.GetTenant(dc.TenantField), + Category: ev.GetCategory(dc.CategoryField), + Subject: ev.GetSubject(dc.SubjectField), + Account: ev.GetAccount(dc.AccountField), + Destination: ev.GetDestination(dc.DestinationField), + TimeStart: startTime} + sesRuns = append(sesRuns, &SessionRun{DerivedCharger: dc, CallDescriptor: cd}) + } + *sRuns = sesRuns + return nil +} + func (rs *Responder) GetDerivedChargers(attrs utils.AttrDerivedChargers, dcs *utils.DerivedChargers) error { // ToDo: Make it work with balancer if needed @@ -355,6 +397,7 @@ type Connector interface { GetMaxSessionTime(CallDescriptor, *float64) error GetDerivedChargers(utils.AttrDerivedChargers, *utils.DerivedChargers) error GetDerivedMaxSessionTime(utils.Event, *float64) error + GetSessionRuns(utils.Event, *[]*SessionRun) error ProcessCdr(*utils.StoredCdr, *string) error } @@ -386,6 +429,10 @@ func (rcc *RPCClientConnector) GetDerivedMaxSessionTime(ev utils.Event, reply *f return rcc.Client.Call("Responder.GetDerivedMaxSessionTime", ev, reply) } +func (rcc *RPCClientConnector) GetSessionRuns(ev utils.Event, sRuns *[]*SessionRun) error { + return rcc.Client.Call("Responder.GetSessionRuns", ev, sRuns) +} + func (rcc *RPCClientConnector) GetDerivedChargers(attrs utils.AttrDerivedChargers, dcs *utils.DerivedChargers) error { return rcc.Client.Call("ApierV1.GetDerivedChargers", attrs, dcs) } diff --git a/engine/responder_test.go b/engine/responder_test.go index 3ff59093a..e73b3b132 100644 --- a/engine/responder_test.go +++ b/engine/responder_test.go @@ -85,5 +85,41 @@ func TestGetDerivedMaxSessionTime(t *testing.T) { } else if maxSessionTime != 9.9e+10 { // Smallest one t.Error("Unexpected maxSessionTime received: ", maxSessionTime) } - +} + +func TestGetSessionRuns(t *testing.T) { + config.CgrConfig().CombinedDerivedChargers = false + testTenant := "vdf" + cdr := &utils.StoredCdr{CgrId: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf", + CdrHost: "192.168.1.1", CdrSource: "test", ReqType: "prepaid", Direction: "*out", Tenant: testTenant, Category: "call", Account: "dan2", Subject: "dan2", + Destination: "1002", SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), + MediationRunId: utils.DEFAULT_RUNID, Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, + Cost: 1.01, RatedAccount: "dan", RatedSubject: "dan"} + keyCharger1 := utils.ConcatenatedKey("*out", testTenant, "call", "dan2", "dan2") + dfDC := &utils.DerivedCharger{RunId: utils.DEFAULT_RUNID, ReqTypeField: "*default", DirectionField: "*default", TenantField: "*default", CategoryField: "*default", + AccountField: "*default", SubjectField: "*default", DestinationField: "*default", SetupTimeField: "*default", AnswerTimeField: "*default", UsageField: "*default"} + extra1DC := &utils.DerivedCharger{RunId: "extra1", ReqTypeField: "^prepaid", DirectionField: "*default", TenantField: "*default", CategoryField: "^0", + AccountField: "^minitsboy", SubjectField: "^rif", DestinationField: "^0256", SetupTimeField: "*default", AnswerTimeField: "*default", UsageField: "*default"} + extra2DC := &utils.DerivedCharger{RunId: "extra2", ReqTypeField: "*default", DirectionField: "*default", TenantField: "*default", CategoryField: "*default", + AccountField: "^ivo", SubjectField: "^ivo", DestinationField: "*default", SetupTimeField: "*default", AnswerTimeField: "*default", UsageField: "*default"} + extra3DC := &utils.DerivedCharger{RunId: "extra3", ReqTypeField: "^pseudoprepaid", DirectionField: "*default", TenantField: "*default", CategoryField: "^0", + AccountField: "^minu", SubjectField: "^rif", DestinationField: "^0256", SetupTimeField: "*default", AnswerTimeField: "*default", UsageField: "*default"} + charger1 := utils.DerivedChargers{extra1DC, extra2DC, extra3DC} + if err := accountingStorage.SetDerivedChargers(keyCharger1, charger1); err != nil { + t.Error("Error on setting DerivedChargers", err.Error()) + } + accountingStorage.CacheAccounting(nil, nil, nil, nil) + sesRuns := make([]*SessionRun, 0) + eSRuns := []*SessionRun{ + &SessionRun{DerivedCharger: extra1DC, + CallDescriptor: &CallDescriptor{Direction: "*out", Category: "0", Tenant: "vdf", Subject: "rif", Account: "minitsboy", Destination: "0256", TimeStart: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC)}}, + &SessionRun{DerivedCharger: extra2DC, + CallDescriptor: &CallDescriptor{Direction: "*out", Category: "call", Tenant: "vdf", Subject: "ivo", Account: "ivo", Destination: "1002", TimeStart: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC)}}, + &SessionRun{DerivedCharger: dfDC, + CallDescriptor: &CallDescriptor{Direction: "*out", Category: "call", Tenant: "vdf", Subject: "dan2", Account: "dan2", Destination: "1002", TimeStart: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC)}}} + if err := rsponder.GetSessionRuns(cdr.AsEvent(""), &sesRuns); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eSRuns, sesRuns) { + t.Errorf("Received: %+v", sesRuns) + } } diff --git a/sessionmanager/fsevent.go b/sessionmanager/fsevent.go index 752a2f4e9..7e2311895 100644 --- a/sessionmanager/fsevent.go +++ b/sessionmanager/fsevent.go @@ -159,7 +159,7 @@ func (fsev FSEvent) GetReqType(fieldName string) string { } return utils.FirstNonEmpty(fsev[fieldName], fsev[REQTYPE], config.CgrConfig().DefaultReqType) } -func (fsev FSEvent) MissingParameter(eventName string) bool { +func (fsev FSEvent) MissingParameter() bool { return strings.TrimSpace(fsev.GetDirection(utils.META_DEFAULT)) == "" || strings.TrimSpace(fsev.GetSubject(utils.META_DEFAULT)) == "" || strings.TrimSpace(fsev.GetAccount(utils.META_DEFAULT)) == "" || diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 9ca84ad0c..f7b4da01a 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -190,7 +190,7 @@ func (sm *FSSessionManager) OnChannelPark(ev utils.Event) { engine.Logger.Err("Error parsing answer event start time, using time.Now!") startTime = time.Now() } - if ev.MissingParameter(utils.META_DEFAULT) { + if ev.MissingParameter() { sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(dc.DestinationField), MISSING_PARAMETER) engine.Logger.Err(fmt.Sprintf("Missing parameter for %s", ev.GetUUID())) return @@ -231,22 +231,10 @@ func (sm *FSSessionManager) OnChannelPark(ev utils.Event) { } func (sm *FSSessionManager) OnChannelAnswer(ev utils.Event) { - if ev.MissingParameter(utils.META_DEFAULT) { + if ev.MissingParameter() { sm.DisconnectSession(ev.GetUUID(), MISSING_PARAMETER, "") } - if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_reqtype %s\n\n", ev.GetUUID(), ev.GetReqType(""))); err != nil { - engine.Logger.Err(fmt.Sprintf("Error on attempting to overwrite cgr_type in chan variables: %v", err)) - } - attrsDC := utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), - Direction: ev.GetDirection(utils.META_DEFAULT), Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)} - var dcs utils.DerivedChargers - if err := sm.rater.GetDerivedChargers(attrsDC, &dcs); err != nil { - engine.Logger.Err(fmt.Sprintf(" OnAnswer: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error())) - sm.DisconnectSession(ev.GetUUID(), SYSTEM_ERROR, "") // Disconnect the session since we are not able to process sessions - return - } - dcs, _ = dcs.AppendDefaultRun() - s := NewSession(ev, sm, dcs) + s := NewSession(ev, sm) if s != nil { sm.sessions = append(sm.sessions, s) } @@ -257,91 +245,10 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev utils.Event) { s := sm.GetSession(ev.GetUUID()) if s == nil { // Not handled by us return - } else { - sm.RemoveSession(s.uuid) // Unreference it early so we avoid concurrency } - defer s.Close(ev) // Stop loop and save the costs deducted so far to database - attrsDC := utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT), - Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)} - var dcs utils.DerivedChargers - if err := sm.rater.GetDerivedChargers(attrsDC, &dcs); err != nil { - engine.Logger.Err(fmt.Sprintf(" OnHangup: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error())) - return - } - dcs, _ = dcs.AppendDefaultRun() - for _, dc := range dcs { - if ev.GetReqType(dc.ReqTypeField) != utils.PREPAID { - continue - } - sr := s.GetSessionRun(dc.RunId) - if sr == nil { - continue // Did not save a sessionRun for this dc - } - if len(sr.callCosts) == 0 { - continue // why would we have 0 callcosts - } - lastCC := sr.callCosts[len(sr.callCosts)-1] - lastCC.Timespans.Decompress() - // put credit back - startTime, err := ev.GetAnswerTime(dc.AnswerTimeField) - if err != nil { - engine.Logger.Crit("Error parsing prepaid call start time from event") - return - } - duration, err := ev.GetDuration(dc.UsageField) - if err != nil { - engine.Logger.Crit(fmt.Sprintf("Error parsing call duration from event %s", err.Error())) - return - } - hangupTime := startTime.Add(duration) - end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd - refundDuration := end.Sub(hangupTime) - var refundIncrements engine.Increments - for i := len(lastCC.Timespans) - 1; i >= 0; i-- { - ts := lastCC.Timespans[i] - tsDuration := ts.GetDuration() - if refundDuration <= tsDuration { - lastRefundedIncrementIndex := 0 - for j := len(ts.Increments) - 1; j >= 0; j-- { - increment := ts.Increments[j] - if increment.Duration <= refundDuration { - refundIncrements = append(refundIncrements, increment) - refundDuration -= increment.Duration - lastRefundedIncrementIndex = j - } - } - ts.SplitByIncrement(lastRefundedIncrementIndex) - break // do not go to other timespans - } else { - refundIncrements = append(refundIncrements, ts.Increments...) - // remove the timespan entirely - lastCC.Timespans[i] = nil - lastCC.Timespans = lastCC.Timespans[:i] - // continue to the next timespan with what is left to refund - refundDuration -= tsDuration - } - } - // show only what was actualy refunded (stopped in timespan) - // engine.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration)) - if len(refundIncrements) > 0 { - cd := &engine.CallDescriptor{ - Direction: lastCC.Direction, - Tenant: lastCC.Tenant, - Category: lastCC.Category, - Subject: lastCC.Subject, - Account: lastCC.Account, - Destination: lastCC.Destination, - Increments: refundIncrements, - } - var response float64 - err := sm.rater.RefundIncrements(*cd, &response) - if err != nil { - engine.Logger.Err(fmt.Sprintf("Debit cents failed: %v", err)) - } - } - cost := refundIncrements.GetTotalCost() - lastCC.Cost -= cost - lastCC.Timespans.Compress() + sm.RemoveSession(s.uuid) // Unreference it early so we avoid concurrency + if err := s.Close(ev); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database + engine.Logger.Err(err.Error()) } } @@ -368,6 +275,10 @@ func (sm *FSSessionManager) GetDbLogger() engine.LogStorage { return sm.loggerDB } +func (sm *FSSessionManager) Rater() engine.Connector { + return sm.rater +} + func (sm *FSSessionManager) Shutdown() (err error) { if fsock.FS == nil || !fsock.FS.Connected() { return errors.New("Cannot shutdown sessions, fsock not connected") diff --git a/sessionmanager/kamailiosm.go b/sessionmanager/kamailiosm.go index 47e20603e..40eb63d3c 100644 --- a/sessionmanager/kamailiosm.go +++ b/sessionmanager/kamailiosm.go @@ -27,19 +27,23 @@ import ( "github.com/cgrates/kamevapi" "log/syslog" "regexp" + "strings" "time" ) -func NewKamailioSessionManager(cfg *config.CGRConfig, rater, cdrsrv engine.Connector) (*KamailioSessionManager, error) { - ksm := &KamailioSessionManager{cgrCfg: cfg, rater: rater, cdrsrv: cdrsrv} +func NewKamailioSessionManager(cfg *config.CGRConfig, rater, cdrsrv engine.Connector, loggerDb engine.LogStorage, debitInterval time.Duration) (*KamailioSessionManager, error) { + ksm := &KamailioSessionManager{cgrCfg: cfg, rater: rater, cdrsrv: cdrsrv, loggerDb: loggerDb, debitInterval: debitInterval} return ksm, nil } type KamailioSessionManager struct { - cgrCfg *config.CGRConfig - rater engine.Connector - cdrsrv engine.Connector - kea *kamevapi.KamEvapi + cgrCfg *config.CGRConfig + rater engine.Connector + cdrsrv engine.Connector + loggerDb engine.LogStorage + debitInterval time.Duration + kea *kamevapi.KamEvapi + sessions []*Session } func (self *KamailioSessionManager) onCgrAuth(evData []byte) { @@ -47,6 +51,14 @@ func (self *KamailioSessionManager) onCgrAuth(evData []byte) { if err != nil { engine.Logger.Info(fmt.Sprintf(" ERROR unmarshalling event: %s, error: %s", evData, err.Error())) } + if kev.MissingParameter() { + if kar, err := kev.AsKamAuthReply(0.0, errors.New(utils.ERR_MANDATORY_IE_MISSING)); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed building auth reply %s", err.Error())) + } else if err = self.kea.Send(kar.String()); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed sending auth reply %s", err.Error())) + } + return + } var remainingDuration float64 if err = self.rater.GetDerivedMaxSessionTime(kev.AsEvent(""), &remainingDuration); err != nil { engine.Logger.Err(fmt.Sprintf(" Could not get max session time for %s, error: %s", kev.GetUUID(), err.Error())) @@ -59,18 +71,37 @@ func (self *KamailioSessionManager) onCgrAuth(evData []byte) { } func (self *KamailioSessionManager) onCallStart(evData []byte) { - _, err := NewKamEvent(evData) + kamEv, err := NewKamEvent(evData) if err != nil { - engine.Logger.Info(fmt.Sprintf(" ERROR unmarshalling event: %s, error: %s", evData, err.Error())) + engine.Logger.Err(fmt.Sprintf(" ERROR unmarshalling event: %s, error: %s", evData, err.Error())) + } + if kamEv.MissingParameter() { + self.DisconnectSession(fmt.Sprintf("%s,%s", kamEv[HASH_ENTRY], kamEv[HASH_ID]), utils.ERR_MANDATORY_IE_MISSING, "") + return + } + s := NewSession(kamEv, self) + if s != nil { + self.sessions = append(self.sessions, s) } } func (self *KamailioSessionManager) onCallEnd(evData []byte) { kev, err := NewKamEvent(evData) if err != nil { - engine.Logger.Info(fmt.Sprintf(" ERROR unmarshalling event: %s, error: %s", evData, err.Error())) + engine.Logger.Err(fmt.Sprintf(" ERROR unmarshalling event: %s, error: %s", evData, err.Error())) + } + if kev.MissingParameter() { + engine.Logger.Err(fmt.Sprintf(" Mandatory IE missing out of event: %+v", kev)) + } + go self.ProcessCdr(kev.AsStoredCdr()) + s := self.GetSession(kev.GetUUID()) + if s == nil { // Not handled by us + return + } + self.RemoveSession(s.uuid) // Unreference it early so we avoid concurrency + if err := s.Close(kev); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database + engine.Logger.Err(err.Error()) } - go self.ProcessCdr(kev) } func (self *KamailioSessionManager) Connect() error { @@ -90,29 +121,52 @@ func (self *KamailioSessionManager) Connect() error { } func (self *KamailioSessionManager) DisconnectSession(uuid, notify, destnr string) { + hashSplt := strings.Split(uuid, ",") + disconnectEv := &KamSessionDisconnect{Event: CGR_SESSION_DISCONNECT, HashEntry: hashSplt[0], HashId: hashSplt[1], Reason: notify} + if err := self.kea.Send(disconnectEv.String()); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed sending disconnect request %s", err.Error())) + } return } func (self *KamailioSessionManager) RemoveSession(uuid string) { - return + for i, ss := range self.sessions { + if ss.uuid == uuid { + self.sessions = append(self.sessions[:i], self.sessions[i+1:]...) + return + } + } +} + +// Searches and return the session with the specifed uuid +func (self *KamailioSessionManager) GetSession(uuid string) *Session { + for _, s := range self.sessions { + if s.uuid == uuid { + return s + } + } + return nil } func (self *KamailioSessionManager) MaxDebit(cd *engine.CallDescriptor, cc *engine.CallCost) error { - return nil + return self.rater.MaxDebit(*cd, cc) } + func (self *KamailioSessionManager) GetDebitPeriod() time.Duration { - var nilDuration time.Duration - return nilDuration + return self.debitInterval } func (self *KamailioSessionManager) GetDbLogger() engine.LogStorage { - return nil + return self.loggerDb } -func (self *KamailioSessionManager) ProcessCdr(ev utils.Event) { +func (self *KamailioSessionManager) Rater() engine.Connector { + return self.rater +} + +func (self *KamailioSessionManager) ProcessCdr(cdr *utils.StoredCdr) { if self.cdrsrv == nil { return } - storedCdr := ev.AsStoredCdr() var reply string - if err := self.cdrsrv.ProcessCdr(storedCdr, &reply); err != nil { - engine.Logger.Err(fmt.Sprintf(" Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", storedCdr.CgrId, storedCdr.AccId, err.Error())) + if err := self.cdrsrv.ProcessCdr(cdr, &reply); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", cdr.CgrId, cdr.AccId, err.Error())) } } func (self *KamailioSessionManager) Shutdown() error { diff --git a/sessionmanager/kamevent.go b/sessionmanager/kamevent.go index 16a93983f..ee856aebd 100644 --- a/sessionmanager/kamevent.go +++ b/sessionmanager/kamevent.go @@ -30,21 +30,25 @@ import ( ) const ( - EVENT = "event" - CGR_AUTH_REPLY = "CGR_AUTH_REPLY" - CGR_SETUPTIME = "cgr_setuptime" - CGR_ANSWERTIME = "cgr_answertime" - CGR_STOPTIME = "cgr_stoptime" - CGR_DURATION = "cgr_duration" - KAM_TR_INDEX = "tr_index" - KAM_TR_LABEL = "tr_label" + EVENT = "event" + CGR_AUTH_REQUEST = "CGR_AUTH_REQUEST" + CGR_AUTH_REPLY = "CGR_AUTH_REPLY" + CGR_SESSION_DISCONNECT = "CGR_SESSION_DISCONNECT" + CGR_CALL_START = "CGR_CALL_START" + CGR_CALL_END = "CGR_CALL_END" + CGR_SETUPTIME = "cgr_setuptime" + CGR_ANSWERTIME = "cgr_answertime" + CGR_STOPTIME = "cgr_stoptime" + CGR_DURATION = "cgr_duration" + KAM_TR_INDEX = "tr_index" + KAM_TR_LABEL = "tr_label" + HASH_ENTRY = "h_entry" + HASH_ID = "h_id" ) -var primaryFields = []string{EVENT, CALLID, FROM_TAG, TO_TAG, CGR_ACCOUNT, CGR_SUBJECT, CGR_DESTINATION, +var primaryFields = []string{EVENT, CALLID, FROM_TAG, HASH_ENTRY, HASH_ID, CGR_ACCOUNT, CGR_SUBJECT, CGR_DESTINATION, CGR_CATEGORY, CGR_TENANT, CGR_REQTYPE, CGR_ANSWERTIME, CGR_SETUPTIME, CGR_STOPTIME, CGR_DURATION} -var mandatoryAuth = []string{EVENT, CALLID, FROM_TAG, CGR_ACCOUNT, CGR_DESTINATION, CGR_SETUPTIME} - type KamAuthReply struct { Event string // Kamailio will use this to differentiate between requests and replies TransactionIndex int // Original transaction index @@ -58,6 +62,18 @@ func (self *KamAuthReply) String() string { return string(mrsh) } +type KamSessionDisconnect struct { + Event string + HashEntry string + HashId string + Reason string +} + +func (self *KamSessionDisconnect) String() string { + mrsh, _ := json.Marshal(self) + return string(mrsh) +} + func NewKamEvent(kamEvData []byte) (KamEvent, error) { kev := make(map[string]string) if err := json.Unmarshal(kamEvData, &kev); err != nil { @@ -167,14 +183,29 @@ func (kev KamEvent) GetExtraFields() map[string]string { func (kev KamEvent) GetCdrSource() string { return "KAMAILIO_" + kev.GetName() } -func (kev KamEvent) MissingParameter(eventName string) bool { - switch eventName { - case utils.CGR_AUTHORIZE: +func (kev KamEvent) MissingParameter() bool { + var nullTime time.Time + switch kev.GetName() { + case CGR_AUTH_REQUEST: + if setupTime, err := kev.GetSetupTime(utils.META_DEFAULT); err != nil || setupTime == nullTime { + return true + } + return len(kev.GetAccount(utils.META_DEFAULT)) == 0 || + len(kev.GetDestination(utils.META_DEFAULT)) == 0 || + len(kev[KAM_TR_INDEX]) == 0 || len(kev[KAM_TR_LABEL]) == 0 + case CGR_CALL_START: + if aTime, err := kev.GetAnswerTime(utils.META_DEFAULT); err != nil || aTime == nullTime { + return true + } return len(kev.GetUUID()) == 0 || - len(kev.GetCategory(utils.META_DEFAULT)) == 0 || - len(kev.GetTenant(utils.META_DEFAULT)) == 0 || len(kev.GetAccount(utils.META_DEFAULT)) == 0 || - len(kev.GetDestination(utils.META_DEFAULT)) == 0 + len(kev.GetDestination(utils.META_DEFAULT)) == 0 || + len(kev[HASH_ENTRY]) == 0 || len(kev[HASH_ID]) == 0 + case CGR_CALL_END: + return len(kev.GetUUID()) == 0 || + len(kev.GetAccount(utils.META_DEFAULT)) == 0 || + len(kev.GetDestination(utils.META_DEFAULT)) == 0 || + len(kev[CGR_DURATION]) == 0 default: return true } diff --git a/sessionmanager/kamevent_test.go b/sessionmanager/kamevent_test.go index 9f279baa9..812d02d6b 100644 --- a/sessionmanager/kamevent_test.go +++ b/sessionmanager/kamevent_test.go @@ -60,3 +60,30 @@ func TestKevAsKamAuthReply(t *testing.T) { t.Error("Received KAR: ", rcvKar) } } + +func TestKevMissingParameter(t *testing.T) { + kamEv := KamEvent{"event": "CGR_AUTH_REQUEST", "tr_index": "36045", "tr_label": "612369399", "cgr_reqtype": "postpaid", + "cgr_account": "1001", "cgr_destination": "1002"} + if !kamEv.MissingParameter() { + t.Error("Failed detecting missing parameters") + } + kamEv["cgr_setuptime"] = "1419962256" + if kamEv.MissingParameter() { + t.Error("False detecting missing parameters") + } + kamEv = KamEvent{"event": "UNKNOWN"} + if !kamEv.MissingParameter() { + t.Error("Failed detecting missing parameters") + } + kamEv = KamEvent{"event": "CGR_CALL_START", "callid": "9d28ec3ee068babdfe036623f42c0969@0:0:0:0:0:0:0:0", "from_tag": "3131b566", + "cgr_reqtype": "postpaid", "cgr_account": "1001", "cgr_destination": "1002"} + if !kamEv.MissingParameter() { + t.Error("Failed detecting missing parameters") + } + kamEv["h_entry"] = "463" + kamEv["h_id"] = "2605" + kamEv["cgr_answertime"] = "1419964961" + if kamEv.MissingParameter() { + t.Error("False detecting missing parameters") + } +} diff --git a/sessionmanager/osipsevent.go b/sessionmanager/osipsevent.go index 404d3955c..322244856 100644 --- a/sessionmanager/osipsevent.go +++ b/sessionmanager/osipsevent.go @@ -172,7 +172,7 @@ func (osipsEv *OsipsEvent) GetOriginatorIP(fieldName string) string { } return osipsEv.osipsEvent.OriginatorAddress.IP.String() } -func (osipsev *OsipsEvent) MissingParameter(eventName string) bool { +func (osipsev *OsipsEvent) MissingParameter() bool { return len(osipsev.GetUUID()) == 0 || len(osipsev.GetAccount(utils.META_DEFAULT)) == 0 || len(osipsev.GetSubject(utils.META_DEFAULT)) == 0 || diff --git a/sessionmanager/osipsevent_test.go b/sessionmanager/osipsevent_test.go index a60450fc5..a6a5063e9 100644 --- a/sessionmanager/osipsevent_test.go +++ b/sessionmanager/osipsevent_test.go @@ -113,14 +113,14 @@ func TestOsipsEventGetValues(t *testing.T) { } func TestOsipsEventMissingParameter(t *testing.T) { - if osipsEv.MissingParameter(utils.META_DEFAULT) { + if osipsEv.MissingParameter() { t.Errorf("Wrongly detected missing parameter: %+v", osipsEv) } osipsEv2 := &OsipsEvent{osipsEvent: &osipsdagram.OsipsEvent{Name: "E_ACC_CDR", AttrValues: map[string]string{"to_tag": "4ea9687f", "cgr_account": "dan", "setuptime": "7", "created": "1406370492", "method": "INVITE", "callid": "ODVkMDI2Mzc2MDY5N2EzODhjNTAzNTdlODhiZjRlYWQ", "sip_reason": "OK", "time": "1406370499", "cgr_reqtype": "prepaid", "cgr_subject": "dan", "cgr_tenant": "itsyscom.com", "sip_code": "200", "duration": "20", "from_tag": "eb082607"}}} - if !osipsEv2.MissingParameter(utils.META_DEFAULT) { + if !osipsEv2.MissingParameter() { t.Error("Failed to detect missing parameter.") } } diff --git a/sessionmanager/osipssm.go b/sessionmanager/osipssm.go index 2d6f741c5..9aed6428d 100644 --- a/sessionmanager/osipssm.go +++ b/sessionmanager/osipssm.go @@ -85,6 +85,9 @@ func (osm *OsipsSessionManager) GetDebitPeriod() time.Duration { func (osm *OsipsSessionManager) GetDbLogger() engine.LogStorage { return nil } +func (self *OsipsSessionManager) Rater() engine.Connector { + return self.rater +} func (osm *OsipsSessionManager) Shutdown() error { return nil } @@ -152,7 +155,7 @@ func (osm *OsipsSessionManager) OnCdr(cdrDagram *osipsdagram.OsipsEvent) { // Process Authorize request from OpenSIPS and communicate back maxdur func (osm *OsipsSessionManager) OnAuthorize(osipsDagram *osipsdagram.OsipsEvent) { ev, _ := NewOsipsEvent(osipsDagram) - if ev.MissingParameter(utils.META_DEFAULT) { + if ev.MissingParameter() { cmdNotify := fmt.Sprintf(":cache_store:\nlocal\n%s/cgr_notify\n%s\n2\n\n", ev.GetUUID(), utils.ERR_MANDATORY_IE_MISSING) if reply, err := osm.miConn.SendCommand([]byte(cmdNotify)); err != nil || !bytes.HasPrefix(reply, []byte("200 OK")) { engine.Logger.Err(fmt.Sprintf("Failed setting cgr_notify variable for accid: %s, err: %v, reply: %s", ev.GetUUID(), err, string(reply))) diff --git a/sessionmanager/session.go b/sessionmanager/session.go index 0b0a71575..351d8221a 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -35,65 +35,42 @@ type Session struct { uuid string stopDebit chan bool sessionManager SessionManager - sessionRuns []*SessionRun + sessionRuns []*engine.SessionRun } -func (s *Session) GetSessionRun(runid string) *SessionRun { +func (s *Session) GetSessionRun(runid string) *engine.SessionRun { for _, sr := range s.sessionRuns { - if sr.runId == runid { + if sr.DerivedCharger.RunId == runid { return sr } } return nil } -// One individual run -type SessionRun struct { - runId string - callDescriptor *engine.CallDescriptor - callCosts []*engine.CallCost +func (s *Session) SessionRuns() []*engine.SessionRun { + return s.sessionRuns } // Creates a new session and in case of prepaid starts the debit loop for each of the session runs individually -func NewSession(ev utils.Event, sm SessionManager, dcs utils.DerivedChargers) *Session { +func NewSession(ev utils.Event, sm SessionManager) *Session { s := &Session{cgrid: ev.GetCgrId(), uuid: ev.GetUUID(), stopDebit: make(chan bool), sessionManager: sm, } - for _, dc := range dcs { - if ev.GetReqType(dc.ReqTypeField) != utils.PREPAID { - continue // We only consider prepaid sessions - } - startTime, err := ev.GetAnswerTime(dc.AnswerTimeField) - if err != nil { - engine.Logger.Err("Error parsing answer event start time, using time.Now!") - return nil - } - cd := &engine.CallDescriptor{ - Direction: ev.GetDirection(dc.DirectionField), - Tenant: ev.GetTenant(dc.TenantField), - Category: ev.GetCategory(dc.CategoryField), - Subject: ev.GetSubject(dc.SubjectField), - Account: ev.GetAccount(dc.AccountField), - Destination: ev.GetDestination(dc.DestinationField), - TimeStart: startTime} - sr := &SessionRun{ - runId: dc.RunId, - callDescriptor: cd, - } - s.sessionRuns = append(s.sessionRuns, sr) - go s.debitLoop(len(s.sessionRuns) - 1) // Send index of the just appended sessionRun - } - if len(s.sessionRuns) == 0 { + sRuns := make([]*engine.SessionRun, 0) + if err := sm.Rater().GetSessionRuns(ev, &sRuns); err != nil || len(sRuns) == 0 { return nil } + for runIdx := range sRuns { + go s.debitLoop(runIdx) // Send index of the just appended sessionRun + } return s } // the debit loop method (to be stoped by sending somenthing on stopDebit channel) func (s *Session) debitLoop(runIdx int) { - nextCd := *s.sessionRuns[runIdx].callDescriptor + nextCd := *s.sessionRuns[runIdx].CallDescriptor index := 0.0 debitPeriod := s.sessionManager.GetDebitPeriod() for { @@ -123,7 +100,7 @@ func (s *Session) debitLoop(runIdx int) { engine.Logger.Err(fmt.Sprintf(" Could not send uuid_broadcast to freeswitch: %s", err.Error())) } } - s.sessionRuns[runIdx].callCosts = append(s.sessionRuns[runIdx].callCosts, cc) + s.sessionRuns[runIdx].CallCosts = append(s.sessionRuns[runIdx].CallCosts, cc) nextCd.TimeEnd = cc.GetEndTime() // set debited timeEnd // update call duration with real debited duration nextCd.DurationIndex -= debitPeriod @@ -134,19 +111,84 @@ func (s *Session) debitLoop(runIdx int) { } // Stops the debit loop -func (s *Session) Close(ev utils.Event) { - // engine.Logger.Debug(fmt.Sprintf("Stopping debit for %s", s.uuid)) - if s == nil { - return - } +func (s *Session) Close(ev utils.Event) error { close(s.stopDebit) // Close the channel so all the sessionRuns listening will be notified if _, err := ev.GetEndTime(); err != nil { engine.Logger.Err("Error parsing answer event stop time.") for idx := range s.sessionRuns { - s.sessionRuns[idx].callDescriptor.TimeEnd = s.sessionRuns[idx].callDescriptor.TimeStart.Add(s.sessionRuns[idx].callDescriptor.DurationIndex) + s.sessionRuns[idx].CallDescriptor.TimeEnd = s.sessionRuns[idx].CallDescriptor.TimeStart.Add(s.sessionRuns[idx].CallDescriptor.DurationIndex) } } - s.SaveOperations() + // Costs refunds + for _, sr := range s.SessionRuns() { + if len(sr.CallCosts) == 0 { + continue // why would we have 0 callcosts + } + lastCC := sr.CallCosts[len(sr.CallCosts)-1] + lastCC.Timespans.Decompress() + // put credit back + startTime, err := ev.GetAnswerTime(sr.DerivedCharger.AnswerTimeField) + if err != nil { + engine.Logger.Crit("Error parsing prepaid call start time from event") + return err + } + duration, err := ev.GetDuration(sr.DerivedCharger.UsageField) + if err != nil { + engine.Logger.Crit(fmt.Sprintf("Error parsing call duration from event %s", err.Error())) + return err + } + hangupTime := startTime.Add(duration) + end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd + refundDuration := end.Sub(hangupTime) + var refundIncrements engine.Increments + for i := len(lastCC.Timespans) - 1; i >= 0; i-- { + ts := lastCC.Timespans[i] + tsDuration := ts.GetDuration() + if refundDuration <= tsDuration { + lastRefundedIncrementIndex := 0 + for j := len(ts.Increments) - 1; j >= 0; j-- { + increment := ts.Increments[j] + if increment.Duration <= refundDuration { + refundIncrements = append(refundIncrements, increment) + refundDuration -= increment.Duration + lastRefundedIncrementIndex = j + } + } + ts.SplitByIncrement(lastRefundedIncrementIndex) + break // do not go to other timespans + } else { + refundIncrements = append(refundIncrements, ts.Increments...) + // remove the timespan entirely + lastCC.Timespans[i] = nil + lastCC.Timespans = lastCC.Timespans[:i] + // continue to the next timespan with what is left to refund + refundDuration -= tsDuration + } + } + // show only what was actualy refunded (stopped in timespan) + // engine.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration)) + if len(refundIncrements) > 0 { + cd := &engine.CallDescriptor{ + Direction: lastCC.Direction, + Tenant: lastCC.Tenant, + Category: lastCC.Category, + Subject: lastCC.Subject, + Account: lastCC.Account, + Destination: lastCC.Destination, + Increments: refundIncrements, + } + var response float64 + err := s.sessionManager.Rater().RefundIncrements(*cd, &response) + if err != nil { + return err + } + } + cost := refundIncrements.GetTotalCost() + lastCC.Cost -= cost + lastCC.Timespans.Compress() + } + go s.SaveOperations() + return nil } // Nice print for session @@ -157,22 +199,17 @@ func (s *Session) String() string { // Saves call_costs for each session run func (s *Session) SaveOperations() { - if s == nil { - return - } - go func() { - for _, sr := range s.sessionRuns { - if len(sr.callCosts) == 0 { - break // There are no costs to save, ignore the operation - } - firstCC := sr.callCosts[0] - for _, cc := range sr.callCosts[1:] { - firstCC.Merge(cc) - } - if s.sessionManager.GetDbLogger() == nil { - engine.Logger.Err(" Error: no connection to logger database, cannot save costs") - } - s.sessionManager.GetDbLogger().LogCallCost(s.cgrid, engine.SESSION_MANAGER_SOURCE, sr.runId, firstCC) + for _, sr := range s.sessionRuns { + if len(sr.CallCosts) == 0 { + break // There are no costs to save, ignore the operation } - }() + firstCC := sr.CallCosts[0] + for _, cc := range sr.CallCosts[1:] { + firstCC.Merge(cc) + } + if s.sessionManager.GetDbLogger() == nil { + engine.Logger.Err(" Error: no connection to logger database, cannot save costs") + } + s.sessionManager.GetDbLogger().LogCallCost(s.cgrid, engine.SESSION_MANAGER_SOURCE, sr.DerivedCharger.RunId, firstCC) + } } diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go index c05dab8d2..b526e6954 100644 --- a/sessionmanager/sessionmanager.go +++ b/sessionmanager/sessionmanager.go @@ -31,5 +31,6 @@ type SessionManager interface { MaxDebit(*engine.CallDescriptor, *engine.CallCost) error GetDebitPeriod() time.Duration GetDbLogger() engine.LogStorage + Rater() engine.Connector Shutdown() error } diff --git a/utils/event.go b/utils/event.go index a3c7d9d60..7e522c581 100644 --- a/utils/event.go +++ b/utils/event.go @@ -40,7 +40,7 @@ type Event interface { GetDuration(string) (time.Duration, error) GetOriginatorIP(string) string GetExtraFields() map[string]string - MissingParameter(string) bool + MissingParameter() bool ParseEventValue(*RSRField) string PassesFieldFilter(*RSRField) (bool, string) AsStoredCdr() *StoredCdr diff --git a/utils/storedcdr.go b/utils/storedcdr.go index 3655a81ad..2d341697c 100644 --- a/utils/storedcdr.go +++ b/utils/storedcdr.go @@ -462,17 +462,12 @@ func (storedCdr *StoredCdr) GetOriginatorIP(fieldName string) string { func (storedCdr *StoredCdr) GetExtraFields() map[string]string { return storedCdr.ExtraFields } -func (storedCdr *StoredCdr) MissingParameter(eventName string) bool { - switch eventName { - case CGR_AUTHORIZE: - return len(storedCdr.AccId) == 0 || - len(storedCdr.Category) == 0 || - len(storedCdr.Tenant) == 0 || - len(storedCdr.Account) == 0 || - len(storedCdr.Destination) == 0 - default: - return true - } +func (storedCdr *StoredCdr) MissingParameter() bool { + return len(storedCdr.AccId) == 0 || + len(storedCdr.Category) == 0 || + len(storedCdr.Tenant) == 0 || + len(storedCdr.Account) == 0 || + len(storedCdr.Destination) == 0 } func (storedCdr *StoredCdr) ParseEventValue(rsrFld *RSRField) string { return storedCdr.FieldAsString(rsrFld)