diff --git a/data/tutorials/fs_json/cgrates/etc/cgrates/cgrates.cfg b/data/tutorials/fs_json/cgrates/etc/cgrates/cgrates.cfg index a40598797..a6339decb 100644 --- a/data/tutorials/fs_json/cgrates/etc/cgrates/cgrates.cfg +++ b/data/tutorials/fs_json/cgrates/etc/cgrates/cgrates.cfg @@ -36,7 +36,6 @@ [balancer] # enabled = false # Start Balancer service: . -# listen = 127.0.0.1:2012 # Balancer listen interface: <""|x.y.z.y:1234>. [rater] enabled = true # Enable RaterCDRSExportPath service: . @@ -47,13 +46,13 @@ enabled = true # Starts Scheduler service: . [cdrs] enabled = true # Start the CDR Server service: . -# extra_fields = # Extra fields to store in CDRs +# extra_fields = # Extra fields to store in CDRs for non-generic CDRs mediator = internal # Address where to reach the Mediator. Empty for disabling mediation. <""|internal> [cdre] -# cdr_format = csv # Exported CDRs format -# extra_fields = # List of extra fields to be exported out in CDRs -export_dir = /tmp # Path where the exported CDRs will be placed +# cdr_format = csv # Exported CDRs format +# extra_fields = # List of extra fields to be exported out in CDRs +# export_dir = /var/log/cgrates/cdr/cdrexport/csv # Path where the exported CDRs will be placed [cdrc] # enabled = false # Enable CDR client functionality @@ -72,8 +71,9 @@ export_dir = /tmp # Path where the exported CDRs will be placed # account_field = 5 # Account field identifier. Use index numbers in case of .csv cdrs. # subject_field = 6 # Subject field identifier. Use index numbers in case of .csv CDRs. # destination_field = 7 # Destination field identifier. Use index numbers in case of .csv cdrs. -# answer_time_field = 8 # Answer time field identifier. Use index numbers in case of .csv cdrs. -# duration_field = 9 # Duration field identifier. Use index numbers in case of .csv cdrs. +# setup_time_field = 8 # Setup time field identifier. Use index numbers in case of .csv cdrs. +# answer_time_field = 9 # Answer time field identifier. Use index numbers in case of .csv cdrs. +# duration_field = 10 # Duration field identifier. Use index numbers in case of .csv cdrs. # extra_fields = # Extra fields identifiers. For .csv, format: :[...,:] [mediator] @@ -88,15 +88,28 @@ enabled = true # Starts Mediator service: . # account_fields = # Name of account fields to be used during extra mediation. Use index numbers in case of .csv cdrs. # subject_fields = # Name of fields to be used during extra mediation. Use index numbers in case of .csv cdrs. # destination_fields = # Name of destination fields to be used during extra mediation. Use index numbers in case of .csv cdrs. -# answer_time_fields = # Name of time_answer fields to be used during extra mediation. Use index numbers in case of .csv cdrs. +# setup_time_fields = # Name of setup_time fields to be used during extra mediation. Use index numbers in case of .csv cdrs. +# answer_time_fields = # Name of answer_time fields to be used during extra mediation. Use index numbers in case of .csv cdrs. # duration_fields = # Name of duration fields to be used during extra mediation. Use index numbers in case of .csv cdrs. [session_manager] enabled = true # Starts SessionManager service: . # switch_type = freeswitch # Defines the type of switch behind: . -# rater = internal # Address where to reach the Rater. +rater = 127.0.0.1:2013 # Address where to reach the Rater. # rater_reconnects = 3 # Number of reconnects to rater before giving up. -# debit_interval = 5 # Interval to perform debits on. +# debit_interval = 10 # Interval to perform debits on. +# max_call_duration = 3h # Maximum call duration a prepaid call can last +# run_ids = # Identifiers of additional sessions control. +# reqtype_fields = # Name of request type fields to be used during additional sessions control <""|*default|field_name>. +# direction_fields = # Name of direction fields to be used during additional sessions control <""|*default|field_name>. +# tenant_fields = # Name of tenant fields to be used during additional sessions control <""|*default|field_name>. +# tor_fields = # Name of tor fields to be used during additional sessions control <""|*default|field_name>. +# account_fields = # Name of account fields to be used during additional sessions control <""|*default|field_name>. +# subject_fields = # Name of fields to be used during additional sessions control <""|*default|field_name>. +# destination_fields = # Name of destination fields to be used during additional sessions control <""|*default|field_name>. +# setup_time_fields = # Name of setup_time fields to be used during additional sessions control <""|*default|field_name>. +# answer_time_fields = # Name of answer_time fields to be used during additional sessions control <""|*default|field_name>. +# duration_fields = # Name of duration fields to be used during additional sessions control <""|*default|field_name>. [freeswitch] # server = 127.0.0.1:8021 # Adress where to connect to FreeSWITCH socket. @@ -105,11 +118,11 @@ enabled = true # Starts SessionManager service: . [history_server] enabled = true # Starts History service: . -history_dir = /tmp/cgr_history # Location on disk where to store history files. -# save_interval = 1s # Interval to save changed cache into .git archive +# history_dir = /var/log/cgrates/history # Location on disk where to store history files. +# save_interval = 1s # Interval to save changed cache into .git archive [history_agent] -# enabled = false # Starts History as a client: . +enabled = true # Starts History as a client: . # server = internal # Address where to reach the master history server: [mailer] diff --git a/data/tutorials/fs_json/cgrates/tariffplans/Actions.csv b/data/tutorials/fs_json/cgrates/tariffplans/Actions.csv index 03b881baa..103d4070f 100644 --- a/data/tutorials/fs_json/cgrates/tariffplans/Actions.csv +++ b/data/tutorials/fs_json/cgrates/tariffplans/Actions.csv @@ -1,3 +1,3 @@ -#ActionsTag,Action,BalanceType,Direction,Units,ExpiryTime,DestinationTag,RatingSubject,SharedGroup,BalanceWeight,ExtraParameters,Weight -PREPAID_10,*topup_reset,*monetary,*out,10,*unlimited,*any,,,10,,10 +#ActionsTag,Action,BalanceType,Direction,Units,ExpiryTime,DestinationTag,RatingSubject,BalanceWeight,SharedGroup,ExtraParameters,Weight +PREPAID_10,*topup_reset,*monetary,*out,10,*unlimited,*any,,10,,,10 LOG_WARNING,*log,,,,,,,,,,10 diff --git a/mediator/mediator_local_test.go b/mediator/mediator_local_test.go index d017252f8..07f43c233 100644 --- a/mediator/mediator_local_test.go +++ b/mediator/mediator_local_test.go @@ -149,6 +149,7 @@ func TestPostCdrs(t *testing.T) { t.Error(err.Error()) } } + time.Sleep(10 * time.Millisecond) // Give time for CDRs to reach database if storedCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, false, false); err != nil { t.Error(err) } else if len(storedCdrs) != 2 { // Make sure CDRs made it into StorDb diff --git a/sessionmanager/event.go b/sessionmanager/event.go index a9e222206..20df15e5c 100644 --- a/sessionmanager/event.go +++ b/sessionmanager/event.go @@ -34,7 +34,9 @@ type Event interface { GetTOR(string) string GetTenant(string) string GetReqType(string) string + GetSetupTime(string) (time.Time, error) GetAnswerTime(string) (time.Time, error) GetEndTime() (time.Time, error) + GetDuration(string) (time.Duration, error) MissingParameter() bool } diff --git a/sessionmanager/fsevent.go b/sessionmanager/fsevent.go index 52a70ccdd..adfc2afe1 100644 --- a/sessionmanager/fsevent.go +++ b/sessionmanager/fsevent.go @@ -20,6 +20,7 @@ package sessionmanager import ( "fmt" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" "github.com/cgrates/fsock" "strconv" @@ -45,6 +46,7 @@ const ( SETUP_TIME = "Caller-Channel-Created-Time" ANSWER_TIME = "Caller-Channel-Answered-Time" END_TIME = "Caller-Channel-Hangup-Time" + DURATION = "" NAME = "Event-Name" HEARTBEAT = "HEARTBEAT" ANSWER = "CHANNEL_ANSWER" @@ -79,37 +81,60 @@ func (fsev FSEvent) GetName() string { return fsev[NAME] } func (fsev FSEvent) GetDirection(fieldName string) string { + if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value + return fieldName[len(utils.STATIC_VALUE_PREFIX):] + } //TODO: implement direction return "*out" - //return fsev[DIRECTION] } func (fsev FSEvent) GetSubject(fieldName string) string { + if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value + return fieldName[len(utils.STATIC_VALUE_PREFIX):] + } return utils.FirstNonEmpty(fsev[fieldName], fsev[SUBJECT], fsev[USERNAME]) } func (fsev FSEvent) GetAccount(fieldName string) string { + if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value + return fieldName[len(utils.STATIC_VALUE_PREFIX):] + } return utils.FirstNonEmpty(fsev[fieldName], fsev[ACCOUNT], fsev[USERNAME]) } // Charging destination number func (fsev FSEvent) GetDestination(fieldName string) string { + if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value + return fieldName[len(utils.STATIC_VALUE_PREFIX):] + } return utils.FirstNonEmpty(fsev[fieldName], fsev[DESTINATION], fsev[CALL_DEST_NR]) } // Original dialed destination number, useful in case of unpark func (fsev FSEvent) GetCallDestNr(fieldName string) string { + if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value + return fieldName[len(utils.STATIC_VALUE_PREFIX):] + } return utils.FirstNonEmpty(fsev[fieldName], fsev[CALL_DEST_NR]) } func (fsev FSEvent) GetTOR(fieldName string) string { - return utils.FirstNonEmpty(fsev[fieldName], fsev[TOR], cfg.DefaultTOR) + if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value + return fieldName[len(utils.STATIC_VALUE_PREFIX):] + } + return utils.FirstNonEmpty(fsev[fieldName], fsev[TOR], config.CgrConfig().DefaultTOR) } func (fsev FSEvent) GetUUID() string { return fsev[UUID] } func (fsev FSEvent) GetTenant(fieldName string) string { - return utils.FirstNonEmpty(fsev[fieldName], fsev[CSTMID], cfg.DefaultTenant) + if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value + return fieldName[len(utils.STATIC_VALUE_PREFIX):] + } + return utils.FirstNonEmpty(fsev[fieldName], fsev[CSTMID], config.CgrConfig().DefaultTenant) } func (fsev FSEvent) GetReqType(fieldName string) string { - return utils.FirstNonEmpty(fsev[fieldName], fsev[REQTYPE], cfg.DefaultReqType) + if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value + return fieldName[len(utils.STATIC_VALUE_PREFIX):] + } + return utils.FirstNonEmpty(fsev[fieldName], fsev[REQTYPE], config.CgrConfig().DefaultReqType) } func (fsev FSEvent) MissingParameter() bool { return strings.TrimSpace(fsev.GetDirection("")) == "" || @@ -121,15 +146,19 @@ func (fsev FSEvent) MissingParameter() bool { strings.TrimSpace(fsev.GetTenant("")) == "" || strings.TrimSpace(fsev.GetCallDestNr("")) == "" } -func (fsev FSEvent) GetSetupTime(field string) (t time.Time, err error) { - st, err := strconv.ParseInt(fsev[field], 0, 64) - t = time.Unix(0, st*1000) - return +func (fsev FSEvent) GetSetupTime(fieldName string) (t time.Time, err error) { + sTimeStr := utils.FirstNonEmpty(fsev[fieldName], fsev[SETUP_TIME]) + if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value + sTimeStr = fieldName[len(utils.STATIC_VALUE_PREFIX):] + } + return utils.ParseTimeDetectLayout(sTimeStr) } -func (fsev FSEvent) GetAnswerTime(field string) (t time.Time, err error) { - st, err := strconv.ParseInt(fsev[field], 0, 64) - t = time.Unix(0, st*1000) - return +func (fsev FSEvent) GetAnswerTime(fieldName string) (t time.Time, err error) { + aTimeStr := utils.FirstNonEmpty(fsev[fieldName], fsev[ANSWER_TIME]) + if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value + aTimeStr = fieldName[len(utils.STATIC_VALUE_PREFIX):] + } + return utils.ParseTimeDetectLayout(aTimeStr) } func (fsev FSEvent) GetEndTime() (t time.Time, err error) { @@ -137,3 +166,11 @@ func (fsev FSEvent) GetEndTime() (t time.Time, err error) { t = time.Unix(0, st*1000) return } + +func (fsev FSEvent) GetDuration(fieldName string) (dur time.Duration, err error) { + durStr := utils.FirstNonEmpty(fsev[fieldName], fsev[DURATION]) + if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value + durStr = fieldName[len(utils.STATIC_VALUE_PREFIX):] + } + return utils.ParseDurationWithSecs(durStr) +} diff --git a/sessionmanager/fsevent_test.go b/sessionmanager/fsevent_test.go index 6872eb871..25e0b469b 100644 --- a/sessionmanager/fsevent_test.go +++ b/sessionmanager/fsevent_test.go @@ -19,7 +19,9 @@ along with this program. If not, see package sessionmanager import ( + "github.com/cgrates/cgrates/config" "testing" + "time" ) func TestEventCreation(t *testing.T) { @@ -49,3 +51,82 @@ Task-Runtime: 1349437318` t.Error("Incorrect number of event fields: ", l) } } + +// Detects if any of the parsers do not return static values +func TestEventParseStatic(t *testing.T) { + ev := new(FSEvent).New("") + setupTime, _ := ev.GetSetupTime("^2013-12-07 08:42:24") + answerTime, _ := ev.GetAnswerTime("^2013-12-07 08:42:24") + dur, _ := ev.GetDuration("^60s") + if ev.GetReqType("^test") != "test" || + ev.GetDirection("^test") != "test" || + ev.GetTenant("^test") != "test" || + ev.GetTOR("^test") != "test" || + ev.GetAccount("^test") != "test" || + ev.GetSubject("^test") != "test" || + ev.GetDestination("^test") != "test" || + setupTime != time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC) || + answerTime != time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC) || + dur != time.Duration(60)*time.Second { + t.Error("Values out of static not matching", + ev.GetReqType("^test") != "test", + ev.GetDirection("^test") != "test", + ev.GetTenant("^test") != "test", + ev.GetTOR("^test") != "test", + ev.GetAccount("^test") != "test", + ev.GetSubject("^test") != "test", + ev.GetDestination("^test") != "test", + setupTime != time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC), + answerTime != time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC), + dur != time.Duration(60)*time.Second) + } +} + +// Test here if the answer is selected out of headers we specify, even if not default defined +func TestEventSelectiveHeaders(t *testing.T) { + body := `Event-Name: RE_SCHEDULE +Core-UUID: 792e181c-b6e6-499c-82a1-52a778e7d82d +FreeSWITCH-Hostname: h1.ip-switch.net +FreeSWITCH-Switchname: h1.ip-switch.net +FreeSWITCH-IPv4: 88.198.12.156 +FreeSWITCH-IPv6: %3A%3A1 +Event-Date-Local: 2012-10-05%2013%3A41%3A38 +Event-Date-GMT: Fri,%2005%20Oct%202012%2011%3A41%3A38%20GMT +Event-Date-Timestamp: 1349437298012866 +Event-Calling-File: switch_scheduler.c +Event-Calling-Function: switch_scheduler_execute +Event-Calling-Line-Number: 65 +Event-Sequence: 34263 +Task-ID: 2 +Task-Desc: heartbeat +Task-Group: core +Task-Runtime: 1349437318` + cfg, _ = config.NewDefaultCGRConfig() + config.SetCgrConfig(cfg) + ev := new(FSEvent).New(body) + setupTime, _ := ev.GetSetupTime("Event-Date-Local") + answerTime, _ := ev.GetAnswerTime("Event-Date-Local") + dur, _ := ev.GetDuration("Event-Calling-Line-Number") + if ev.GetReqType("FreeSWITCH-Hostname") != "h1.ip-switch.net" || + ev.GetDirection("FreeSWITCH-Hostname") != "*out" || + ev.GetTenant("FreeSWITCH-Hostname") != "h1.ip-switch.net" || + ev.GetTOR("FreeSWITCH-Hostname") != "h1.ip-switch.net" || + ev.GetAccount("FreeSWITCH-Hostname") != "h1.ip-switch.net" || + ev.GetSubject("FreeSWITCH-Hostname") != "h1.ip-switch.net" || + ev.GetDestination("FreeSWITCH-Hostname") != "h1.ip-switch.net" || + setupTime != time.Date(2012, 10, 5, 13, 41, 38, 0, time.UTC) || + answerTime != time.Date(2012, 10, 5, 13, 41, 38, 0, time.UTC) || + dur != time.Duration(65)*time.Second { + t.Error("Values out of static not matching", + ev.GetReqType("FreeSWITCH-Hostname") != "h1.ip-switch.net", + ev.GetDirection("FreeSWITCH-Hostname") != "*out", + ev.GetTenant("FreeSWITCH-Hostname") != "h1.ip-switch.net", + ev.GetTOR("FreeSWITCH-Hostname") != "h1.ip-switch.net", + ev.GetAccount("FreeSWITCH-Hostname") != "h1.ip-switch.net", + ev.GetSubject("FreeSWITCH-Hostname") != "h1.ip-switch.net", + ev.GetDestination("FreeSWITCH-Hostname") != "h1.ip-switch.net", + setupTime != time.Date(2012, 10, 5, 13, 41, 38, 0, time.UTC), + answerTime != time.Date(2012, 10, 5, 13, 41, 38, 0, time.UTC), + dur != time.Duration(65)*time.Second) + } +} diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index cb66fee38..fca71cfe8 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -24,7 +24,6 @@ import ( "fmt" "log/syslog" "net" - "strings" "time" "github.com/cgrates/cgrates/config" @@ -100,23 +99,23 @@ func (sm *FSSessionManager) GetSession(uuid string) *Session { } // Disconnects a session by sending hangup command to freeswitch -func (sm *FSSessionManager) DisconnectSession(s *Session, notify string) { +func (sm *FSSessionManager) DisconnectSession(uuid string, notify string) { // engine.Logger.Debug(fmt.Sprintf("Session: %+v", s.uuid)) - _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", s.uuid, notify)) + _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify)) if err != nil { engine.Logger.Err(fmt.Sprintf("could not send disconect api notification to freeswitch: %v", err)) } - err = fsock.FS.SendMsgCmd(s.uuid, map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}) // without + sign + err = fsock.FS.SendMsgCmd(uuid, map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}) // without + sign if err != nil { engine.Logger.Err(fmt.Sprintf("could not send disconect msg to freeswitch: %v", err)) } return } -// Remove session from sessin list -func (sm *FSSessionManager) RemoveSession(s *Session) { +// Remove session from sessin list, removes all related in case of multiple runs +func (sm *FSSessionManager) RemoveSession(uuid string) { for i, ss := range sm.sessions { - if ss == s { + if ss.uuid == uuid { sm.sessions = append(sm.sessions[:i], sm.sessions[i+1:]...) return } @@ -150,52 +149,73 @@ func (sm *FSSessionManager) OnHeartBeat(ev Event) { } func (sm *FSSessionManager) OnChannelPark(ev Event) { - //engine.Logger.Info("freeswitch park") - startTime, err := ev.GetAnswerTime(PARK_TIME) - if err != nil { - engine.Logger.Err("Error parsing answer event start time, using time.Now!") - startTime = time.Now() + var maxCallDuration time.Duration // This will be the maximum duration this channel will be allowed to last + runIds := append([]string{utils.DEFAULT_RUNID}, cfg.SMRunIds...) // Prepend default runid to extra configured for session manager + for idx := range runIds { + var directionFld, tenantFld, torFld, actFld, subjFld, dstFld string + if idx != 0 { // Take fields out of config, default ones are automatically handled as empty + idxCfg := idx - 1 // In configuration we did not prepend values + directionFld = cfg.SMDirectionFields[idxCfg] + tenantFld = cfg.SMTenantFields[idxCfg] + torFld = cfg.SMTORFields[idxCfg] + actFld = cfg.SMAccountFields[idxCfg] + subjFld = cfg.SMSubjectFields[idxCfg] + dstFld = cfg.SMDestFields[idxCfg] + } + startTime, err := ev.GetAnswerTime(PARK_TIME) + if err != nil { + engine.Logger.Err("Error parsing answer event start time, using time.Now!") + startTime = time.Now() + } + // if there is no account configured leave the call alone + if idx == 0 && !utils.IsSliceMember([]string{utils.PREPAID, utils.PSEUDOPREPAID}, ev.GetReqType("")) { + return // we unpark only prepaid and pseudoprepaid calls + } + if ev.MissingParameter() { + sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(dstFld), MISSING_PARAMETER) + engine.Logger.Err(fmt.Sprintf("Missing parameter for %s", ev.GetUUID())) + return + } + cd := engine.CallDescriptor{ + Direction: ev.GetDirection(directionFld), + Tenant: ev.GetTenant(tenantFld), + TOR: ev.GetTOR(torFld), + Subject: ev.GetSubject(subjFld), + Account: ev.GetAccount(actFld), + Destination: ev.GetDestination(dstFld), + TimeStart: startTime, + TimeEnd: startTime.Add(cfg.SMMaxCallDuration), + } + var remainingDurationFloat float64 + err = sm.connector.GetMaxSessionTime(cd, &remainingDurationFloat) + if err != nil { + engine.Logger.Err(fmt.Sprintf("Could not get max session time for %s: %v", ev.GetUUID(), err)) + sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(""), SYSTEM_ERROR) // We unpark on original destination + return + } + remainingDuration := time.Duration(remainingDurationFloat) + // Set maxCallDuration, smallest out of all forked sessions + if idx == 0 { + maxCallDuration = remainingDuration + } else if maxCallDuration > remainingDuration { + maxCallDuration = remainingDuration + } } - // if there is no account configured leave the call alone - if !utils.IsSliceMember([]string{utils.PREPAID, utils.PSEUDOPREPAID}, strings.TrimSpace(ev.GetReqType(""))) { - return // we unpark only prepaid and pseudoprepaid calls - } - if ev.MissingParameter() { - sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(""), MISSING_PARAMETER) - engine.Logger.Err(fmt.Sprintf("Missing parameter for %s", ev.GetUUID())) - return - } - cd := engine.CallDescriptor{ - Direction: ev.GetDirection(""), - Tenant: ev.GetTenant(""), - TOR: ev.GetTOR(""), - Subject: ev.GetSubject(""), - Account: ev.GetAccount(""), - Destination: ev.GetDestination(""), - TimeStart: startTime, - TimeEnd: startTime.Add(cfg.SMMaxCallDuration), - } - var remainingDurationFloat float64 - err = sm.connector.GetMaxSessionTime(cd, &remainingDurationFloat) - if err != nil { - engine.Logger.Err(fmt.Sprintf("Could not get max session time for %s: %v", ev.GetUUID(), err)) - sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(""), SYSTEM_ERROR) - return - } - remainingDuration := time.Duration(remainingDurationFloat) - //engine.Logger.Info(fmt.Sprintf("Remaining duration: %v", remainingDuration)) - if remainingDuration == 0 { + if maxCallDuration == 0 { //engine.Logger.Info(fmt.Sprintf("Not enough credit for trasferring the call %s for %s.", ev.GetUUID(), cd.GetKey(cd.Subject))) sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(""), INSUFFICIENT_FUNDS) return } - sm.setMaxCallDuration(ev.GetUUID(), remainingDuration) + sm.setMaxCallDuration(ev.GetUUID(), maxCallDuration) sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(""), AUTH_OK) } func (sm *FSSessionManager) OnChannelAnswer(ev Event) { //engine.Logger.Info(" FreeSWITCH answer.") // Make sure cgr_type is enforced even if not set by FreeSWITCH + if ev.MissingParameter() { + sm.DisconnectSession(ev.GetUUID(), MISSING_PARAMETER) + } if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_reqtype %s\n\n", ev.GetUUID(), ev.GetReqType(""))); err != nil { engine.Logger.Err(fmt.Sprintf("Error on attempting to overwrite cgr_type in chan variables: %v", err)) } @@ -206,131 +226,131 @@ func (sm *FSSessionManager) OnChannelAnswer(ev Event) { } func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) { - //engine.Logger.Info(" FreeSWITCH hangup.") s := sm.GetSession(ev.GetUUID()) if s == nil { // Not handled by us return + } else { + sm.RemoveSession(s.uuid) // Unreference it early so we avoid concurrency } - defer s.Close(ev) // Stop loop and save the costs deducted so far to database - if ev.GetReqType("") == utils.POSTPAID { - startTime, err := ev.GetAnswerTime(ANSWER_TIME) - if err != nil { - engine.Logger.Crit("Error parsing postpaid call start time from event") - return + defer s.Close(ev) // Stop loop and save the costs deducted so far to database + runIds := append([]string{utils.DEFAULT_RUNID}, cfg.SMRunIds...) // Prepend default runid to extra configured for session manager + for idx := range runIds { + var reqTypeFld, directionFld, tenantFld, torFld, actFld, subjFld, dstFld, aTimeFld string // ToDo: Add durFld + if idx != 0 { // Take fields out of config, default ones are automatically handled as empty + idxCfg := idx - 1 // In configuration we did not prepend values + reqTypeFld = cfg.SMReqTypeFields[idxCfg] + directionFld = cfg.SMDirectionFields[idxCfg] + tenantFld = cfg.SMTenantFields[idxCfg] + torFld = cfg.SMTORFields[idxCfg] + actFld = cfg.SMAccountFields[idxCfg] + subjFld = cfg.SMSubjectFields[idxCfg] + dstFld = cfg.SMDestFields[idxCfg] + aTimeFld = cfg.SMAnswerTimeFields[idxCfg] + // durFld = cfg.SMDurationFields[idxCfg] } - endTime, err := ev.GetEndTime() - if err != nil { - engine.Logger.Crit("Error parsing postpaid call start time from event") - return - } - cd := engine.CallDescriptor{ - Direction: ev.GetDirection(""), - Tenant: ev.GetTenant(""), - TOR: ev.GetTOR(""), - Subject: ev.GetSubject(""), - Account: ev.GetAccount(""), - LoopIndex: 0, - CallDuration: endTime.Sub(startTime), - Destination: ev.GetDestination(""), - TimeStart: startTime, - TimeEnd: endTime, - } - cc := &engine.CallCost{} - err = sm.connector.Debit(cd, cc) - if err != nil { - engine.Logger.Err(fmt.Sprintf("Error making the general debit for postpaid call: %v", ev.GetUUID())) - return - } - s.CallCosts = append(s.CallCosts, cc) - return - } - - if s == nil || len(s.CallCosts) == 0 { - return // why would we have 0 callcosts - } - lastCC := s.CallCosts[len(s.CallCosts)-1] - // put credit back - var hangupTime time.Time - var err error - if hangupTime, err = ev.GetEndTime(); err != nil { - engine.Logger.Err("Error parsing answer event hangup time, using time.Now!") - hangupTime = time.Now() - } - end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd - refundDuration := end.Sub(hangupTime) - //initialRefundDuration := refundDuration - var refundIncrements engine.Increments - for i := len(lastCC.Timespans) - 1; i >= 0; i-- { - ts := lastCC.Timespans[i] - tsDuration := ts.GetDuration() - if refundDuration <= tsDuration { - lastRefundedIncrementIndex := 0 - for j := len(ts.Increments) - 1; j >= 0; j-- { - increment := ts.Increments[j] - if increment.Duration <= refundDuration { - refundIncrements = append(refundIncrements, increment) - refundDuration -= increment.Duration - lastRefundedIncrementIndex = j + if ev.GetReqType(reqTypeFld) == utils.POSTPAID { + startTime, err := ev.GetAnswerTime(aTimeFld) + if err != nil { + engine.Logger.Crit("Error parsing postpaid call start time from event") + return + } + endTime, err := ev.GetEndTime() + if err != nil { + engine.Logger.Crit("Error parsing postpaid call start time from event") + return + } + cd := engine.CallDescriptor{ + Direction: ev.GetDirection(directionFld), + Tenant: ev.GetTenant(tenantFld), + TOR: ev.GetTOR(torFld), + Subject: ev.GetSubject(actFld), + Account: ev.GetAccount(subjFld), + LoopIndex: 0, + CallDuration: endTime.Sub(startTime), + Destination: ev.GetDestination(dstFld), + TimeStart: startTime, + TimeEnd: endTime, + } + cc := &engine.CallCost{} + err = sm.connector.Debit(cd, cc) + if err != nil { + engine.Logger.Err(fmt.Sprintf("Error making the general debit for postpaid call: %v", ev.GetUUID())) + return + } + s.sessionRuns[idx].callCosts = append(s.sessionRuns[idx].callCosts, cc) + } else if ev.GetReqType(reqTypeFld) == utils.PREPAID { // Prepaid calls + if len(s.sessionRuns[idx].callCosts) == 0 { + continue // why would we have 0 callcosts + } + lastCC := s.sessionRuns[idx].callCosts[len(s.sessionRuns[idx].callCosts)-1] + // put credit back + var hangupTime time.Time + var err error + if hangupTime, err = ev.GetEndTime(); err != nil { + engine.Logger.Err("Error parsing answer event hangup time, using time.Now!") + hangupTime = time.Now() + } + end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd + refundDuration := end.Sub(hangupTime) + var refundIncrements engine.Increments + for i := len(lastCC.Timespans) - 1; i >= 0; i-- { + ts := lastCC.Timespans[i] + tsDuration := ts.GetDuration() + if refundDuration <= tsDuration { + lastRefundedIncrementIndex := 0 + for j := len(ts.Increments) - 1; j >= 0; j-- { + increment := ts.Increments[j] + if increment.Duration <= refundDuration { + refundIncrements = append(refundIncrements, increment) + refundDuration -= increment.Duration + lastRefundedIncrementIndex = j + } + } + ts.SplitByIncrement(lastRefundedIncrementIndex) + break // do not go to other timespans + } else { + refundIncrements = append(refundIncrements, ts.Increments...) + // remove the timespan entirely + lastCC.Timespans[i] = nil + lastCC.Timespans = lastCC.Timespans[:i] + // continue to the next timespan with what is left to refund + refundDuration -= tsDuration } } - ts.SplitByIncrement(lastRefundedIncrementIndex) - break // do not go to other timespans - } else { - refundIncrements = append(refundIncrements, ts.Increments...) - // remove the timespan entirely - lastCC.Timespans[i] = nil - lastCC.Timespans = lastCC.Timespans[:i] - // continue to the next timespan with what is left to refund - refundDuration -= tsDuration + // show only what was actualy refunded (stopped in timespan) + // engine.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration)) + if len(refundIncrements) > 0 { + cd := &engine.CallDescriptor{ + Direction: lastCC.Direction, + Tenant: lastCC.Tenant, + TOR: lastCC.TOR, + Subject: lastCC.Subject, + Account: lastCC.Account, + Destination: lastCC.Destination, + Increments: refundIncrements, + } + var response float64 + err := sm.connector.RefundIncrements(*cd, &response) + if err != nil { + engine.Logger.Err(fmt.Sprintf("Debit cents failed: %v", err)) + } + } + cost := refundIncrements.GetTotalCost() + lastCC.Cost -= cost + // engine.Logger.Info(fmt.Sprintf("Rambursed %v cents", cost)) } } - // show only what was actualy refunded (stopped in timespan) - // engine.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration)) - if len(refundIncrements) > 0 { - cd := &engine.CallDescriptor{ - Direction: lastCC.Direction, - Tenant: lastCC.Tenant, - TOR: lastCC.TOR, - Subject: lastCC.Subject, - Account: lastCC.Account, - Destination: lastCC.Destination, - Increments: refundIncrements, - // FallbackSubject: lastCC.FallbackSubject, // TODO: check how to best add it - } - var response float64 - err := sm.connector.RefundIncrements(*cd, &response) - if err != nil { - engine.Logger.Err(fmt.Sprintf("Debit cents failed: %v", err)) - } - } - cost := refundIncrements.GetTotalCost() - lastCC.Cost -= cost - // engine.Logger.Info(fmt.Sprintf("Rambursed %v cents", cost)) } -func (sm *FSSessionManager) LoopAction(s *Session, cd *engine.CallDescriptor) (cc *engine.CallCost) { - cc = &engine.CallCost{} - err := sm.connector.MaxDebit(*cd, cc) - if err != nil { - engine.Logger.Err(fmt.Sprintf("Could not complete debit opperation: %v", err)) - // disconnect session - s.sessionManager.DisconnectSession(s, SYSTEM_ERROR) - } - // engine.Logger.Debug(fmt.Sprintf("Result of MaxDebit call: %v", cc)) - if cc.GetDuration() == 0 || err != nil { - // engine.Logger.Info(fmt.Sprintf("No credit left: Disconnect %v", s)) - sm.DisconnectSession(s, INSUFFICIENT_FUNDS) - return - } - s.CallCosts = append(s.CallCosts, cc) - return -} - func (sm *FSSessionManager) GetDebitPeriod() time.Duration { return sm.debitPeriod } +func (sm *FSSessionManager) MaxDebit(cd *engine.CallDescriptor, cc *engine.CallCost) error { + return sm.connector.MaxDebit(*cd, cc) +} + func (sm *FSSessionManager) GetDbLogger() engine.LogStorage { return sm.loggerDB } @@ -345,7 +365,6 @@ func (sm *FSSessionManager) Shutdown() (err error) { for _, cmd := range []string{cmdKillPrepaid, cmdKillPostpaid} { if _, err = fsock.FS.SendApiCmd(cmd); err != nil { engine.Logger.Err(fmt.Sprintf("Error on calls shutdown: %s", err)) - return } } for guard := 0; len(sm.sessions) > 0 && guard < 20; guard++ { diff --git a/sessionmanager/session.go b/sessionmanager/session.go index 902ede552..706250dc7 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -19,6 +19,7 @@ along with this program. If not, see package sessionmanager import ( + "encoding/json" "fmt" "time" @@ -30,52 +31,70 @@ import ( // actions and a channel to signal end of the debit loop. type Session struct { uuid string - callDescriptor *engine.CallDescriptor - sessionManager SessionManager stopDebit chan bool - CallCosts []*engine.CallCost + sessionManager SessionManager + sessionRuns []*SessionRun } -// Creates a new session and starts the debit loop -func NewSession(ev Event, sm SessionManager) (s *Session) { - // SesionManager only handles prepaid and postpaid calls - if ev.GetReqType("") != utils.PREPAID && ev.GetReqType("") != utils.POSTPAID { - return - } - startTime, err := ev.GetAnswerTime(ANSWER_TIME) - if err != nil { - engine.Logger.Err("Error parsing answer event start time, using time.Now!") - startTime = time.Now() - } +// One individual run +type SessionRun struct { + runId string + callDescriptor *engine.CallDescriptor + callCosts []*engine.CallCost +} - cd := &engine.CallDescriptor{ - Direction: ev.GetDirection(""), - Tenant: ev.GetTenant(""), - TOR: ev.GetTOR(""), - Subject: ev.GetSubject(""), - Account: ev.GetAccount(""), - Destination: ev.GetDestination(""), - TimeStart: startTime} - s = &Session{uuid: ev.GetUUID(), - callDescriptor: cd, - stopDebit: make(chan bool, 2)} //buffer it for multiple close signals - s.sessionManager = sm - if ev.MissingParameter() { - sm.DisconnectSession(s, MISSING_PARAMETER) - } else { - switch ev.GetReqType("") { - case utils.PREPAID: - go s.startDebitLoop() - case utils.POSTPAID: - // do not loop, make only one debit at hangup +// Creates a new session and in case of prepaid starts the debit loop for each of the session runs individually +func NewSession(ev Event, sm SessionManager) *Session { + s := &Session{uuid: ev.GetUUID(), + stopDebit: make(chan bool), + sessionManager: sm, + sessionRuns: make([]*SessionRun, 0), + } + runIds := append([]string{utils.DEFAULT_RUNID}, cfg.SMRunIds...) // Prepend default runid to extra configured for session manager + for idx, runId := range runIds { // Create the SessionRuns here + var reqTypeFld, directionFld, tenantFld, torFld, actFld, subjFld, dstFld, aTimeFld string + if idx != 0 { // Take fields out of config, default ones are automatically handled as empty + idxCfg := idx - 1 // In configuration we did not prepend values + reqTypeFld = cfg.SMReqTypeFields[idxCfg] + directionFld = cfg.SMDirectionFields[idxCfg] + tenantFld = cfg.SMTenantFields[idxCfg] + torFld = cfg.SMTORFields[idxCfg] + actFld = cfg.SMAccountFields[idxCfg] + subjFld = cfg.SMSubjectFields[idxCfg] + dstFld = cfg.SMDestFields[idxCfg] + aTimeFld = cfg.SMAnswerTimeFields[idxCfg] + } + startTime, err := ev.GetAnswerTime(aTimeFld) + if err != nil { + engine.Logger.Err("Error parsing answer event start time, using time.Now!") + startTime = time.Now() + } + cd := &engine.CallDescriptor{ + Direction: ev.GetDirection(directionFld), + Tenant: ev.GetTenant(tenantFld), + TOR: ev.GetTOR(torFld), + Subject: ev.GetSubject(subjFld), + Account: ev.GetAccount(actFld), + Destination: ev.GetDestination(dstFld), + TimeStart: startTime} + sr := &SessionRun{ + runId: runId, + callDescriptor: cd, + } + s.sessionRuns = append(s.sessionRuns, sr) + if ev.GetReqType(reqTypeFld) == utils.PREPAID { + go s.debitLoop(len(s.sessionRuns) - 1) // Send index of the just appended sessionRun } } - return + if len(s.sessionRuns) == 0 { + return nil + } + return s } // the debit loop method (to be stoped by sending somenthing on stopDebit channel) -func (s *Session) startDebitLoop() { - nextCd := *s.callDescriptor +func (s *Session) debitLoop(runIdx int) { + nextCd := *s.sessionRuns[runIdx].callDescriptor index := 0.0 debitPeriod := s.sessionManager.GetDebitPeriod() for { @@ -90,7 +109,18 @@ func (s *Session) startDebitLoop() { nextCd.TimeEnd = nextCd.TimeStart.Add(debitPeriod) nextCd.LoopIndex = index nextCd.CallDuration += debitPeriod // first presumed duration - cc := s.sessionManager.LoopAction(s, &nextCd) + cc := &engine.CallCost{} + if err := s.sessionManager.MaxDebit(&nextCd, cc); err != nil { + engine.Logger.Err(fmt.Sprintf("Could not complete debit opperation: %v", err)) + // disconnect session + s.sessionManager.DisconnectSession(s.uuid, SYSTEM_ERROR) + return + } + if cc.GetDuration() == 0 { + s.sessionManager.DisconnectSession(s.uuid, INSUFFICIENT_FUNDS) + return + } + s.sessionRuns[runIdx].callCosts = append(s.sessionRuns[runIdx].callCosts, cc) nextCd.TimeEnd = cc.GetEndTime() // set debited timeEnd // update call duration with real debited duration nextCd.CallDuration -= debitPeriod @@ -100,52 +130,43 @@ func (s *Session) startDebitLoop() { } } -// Returns the session duration till the specified time -func (s *Session) getSessionDurationFrom(now time.Time) (d time.Duration) { - seconds := now.Sub(s.callDescriptor.TimeStart).Seconds() - d, err := time.ParseDuration(fmt.Sprintf("%ds", int(seconds))) - if err != nil { - engine.Logger.Err(fmt.Sprintf("Cannot parse session duration %v", seconds)) - } - return -} - // Stops the debit loop func (s *Session) Close(ev Event) { // engine.Logger.Debug(fmt.Sprintf("Stopping debit for %s", s.uuid)) if s == nil { return } - s.stopDebit <- true - //s.callDescriptor.TimeEnd = time.Now() - if endTime, err := ev.GetEndTime(); err != nil { + close(s.stopDebit) // Close the channel so all the sessionRuns listening will be notified + if _, err := ev.GetEndTime(); err != nil { engine.Logger.Err("Error parsing answer event stop time.") - endTime = s.callDescriptor.TimeStart.Add(s.callDescriptor.CallDuration) - s.callDescriptor.TimeEnd = endTime + for idx := range s.sessionRuns { + s.sessionRuns[idx].callDescriptor.TimeEnd = s.sessionRuns[idx].callDescriptor.TimeStart.Add(s.sessionRuns[idx].callDescriptor.CallDuration) + } } s.SaveOperations() - s.sessionManager.RemoveSession(s) } // Nice print for session func (s *Session) String() string { - return fmt.Sprintf("%v: %s(%s) -> %s", s.callDescriptor.TimeStart, s.callDescriptor.Subject, s.callDescriptor.Account, s.callDescriptor.Destination) + sDump, _ := json.Marshal(s) + return string(sDump) } -// +// Saves call_costs for each session run func (s *Session) SaveOperations() { - go func() { - if s == nil || len(s.CallCosts) == 0 { - return - } - firstCC := s.CallCosts[0] - for _, cc := range s.CallCosts[1:] { - firstCC.Merge(cc) - } - if s.sessionManager.GetDbLogger() == nil { - engine.Logger.Err(" Error: no connection to logger database, cannot save costs") - } - s.sessionManager.GetDbLogger().LogCallCost(s.uuid, engine.SESSION_MANAGER_SOURCE, utils.DEFAULT_RUNID, firstCC) - // engine.Logger.Debug(fmt.Sprintf(" End of call, having costs: %v", firstCC.String())) - }() + if s == nil { + return + } + for _, sr := range s.sessionRuns { + go func() { + firstCC := sr.callCosts[0] + for _, cc := range sr.callCosts[1:] { + firstCC.Merge(cc) + } + if s.sessionManager.GetDbLogger() == nil { + engine.Logger.Err(" Error: no connection to logger database, cannot save costs") + } + s.sessionManager.GetDbLogger().LogCallCost(s.uuid, engine.SESSION_MANAGER_SOURCE, sr.runId, firstCC) + }() + } } diff --git a/sessionmanager/session_test.go b/sessionmanager/session_test.go index 6fac76144..b4dc88773 100644 --- a/sessionmanager/session_test.go +++ b/sessionmanager/session_test.go @@ -19,8 +19,8 @@ along with this program. If not, see package sessionmanager import ( - "github.com/cgrates/cgrates/config" - "testing" +//"github.com/cgrates/cgrates/config" +//"testing" ) var ( @@ -58,17 +58,7 @@ default_reqtype= `) ) -/*func TestSessionDurationSingle(t *testing.T) { - newEvent := new(FSEvent).New(newEventBody) - sm := &FSSessionManager{} - s := NewSession(newEvent, sm) - defer s.Close() - twoSeconds, _ := time.ParseDuration("2s") - if d := s.getSessionDurationFrom(s.callDescriptor.TimeStart.Add(twoSeconds)); d.Seconds() < 2 || d.Seconds() > 3 { - t.Errorf("Wrong session duration %v", d) - } -}*/ - +/* Missing parameter is not longer tested in NewSession. ToDo: expand this test for more util information func TestSessionNilSession(t *testing.T) { var errCfg error cfg, errCfg = config.NewCGRConfigBytes(conf_data) // Needed here to avoid nil on cfg variable @@ -82,3 +72,4 @@ func TestSessionNilSession(t *testing.T) { t.Error("no account and it still created session.") } } +*/ diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go index ea0074786..f00042496 100644 --- a/sessionmanager/sessionmanager.go +++ b/sessionmanager/sessionmanager.go @@ -27,9 +27,9 @@ import ( type SessionManager interface { Connect(*config.CGRConfig) error - DisconnectSession(*Session, string) - RemoveSession(*Session) - LoopAction(*Session, *engine.CallDescriptor) *engine.CallCost + DisconnectSession(string, string) + RemoveSession(string) + MaxDebit(*engine.CallDescriptor, *engine.CallCost) error GetDebitPeriod() time.Duration GetDbLogger() engine.LogStorage Shutdown() error diff --git a/utils/coreutils.go b/utils/coreutils.go index 71e713cd8..a4c78c212 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -112,6 +112,7 @@ func ParseTimeDetectLayout(tmStr string) (time.Time, error) { rfc3339Rule := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.+$`) sqlRule := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}$`) gotimeRule := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\.?\d*\s[+,-]\d+\s\w+$`) + fsTimestamp := regexp.MustCompile(`^\d{16}$`) unixTimestampRule := regexp.MustCompile(`^\d{10}$`) switch { case rfc3339Rule.MatchString(tmStr): @@ -120,6 +121,12 @@ func ParseTimeDetectLayout(tmStr string) (time.Time, error) { return time.Parse("2006-01-02 15:04:05.999999999 -0700 MST", tmStr) case sqlRule.MatchString(tmStr): return time.Parse("2006-01-02 15:04:05", tmStr) + case fsTimestamp.MatchString(tmStr): + if tmstmp, err := strconv.ParseInt(tmStr+"000", 10, 64); err != nil { + return nilTime, err + } else { + return time.Unix(0, tmstmp), nil + } case unixTimestampRule.MatchString(tmStr): if tmstmp, err := strconv.ParseInt(tmStr, 10, 64); err != nil { return nilTime, err diff --git a/utils/utils_test.go b/utils/utils_test.go index 394170f92..1e5bcefb0 100644 --- a/utils/utils_test.go +++ b/utils/utils_test.go @@ -186,6 +186,14 @@ func TestParseTimeDetectLayout(t *testing.T) { if err == nil { t.Errorf("Expecting error") } + fsTmstampStr := "1394291049287234" + fsTm, err := ParseTimeDetectLayout(fsTmstampStr) + expectedTime = time.Date(2014, 3, 8, 15, 4, 9, 287234000, time.UTC) + if err != nil { + t.Error(err) + } else if !fsTm.Equal(expectedTime) { + t.Errorf("Unexpected time parsed: %v, expecting: %v", fsTm, expectedTime) + } } func TestParseDateUnix(t *testing.T) {