diff --git a/cdrs/cdrs.go b/cdrs/cdrs.go index 38b8d19dc..a3f7558d6 100644 --- a/cdrs/cdrs.go +++ b/cdrs/cdrs.go @@ -38,7 +38,7 @@ func cdrHandler(w http.ResponseWriter, r *http.Request) { if fsCdr, err := new(FSCdr).New(body); err == nil { storage.SetCdr(fsCdr) if cfg.CDRSMediator == "internal" { - errMedi := medi.MediateCdrFromDB(fsCdr, storage) + errMedi := medi.MediateDBCDR(fsCdr, storage) if errMedi != nil { rater.Logger.Err(fmt.Sprintf("Could not run mediation on CDR: %s", errMedi.Error())) } diff --git a/cdrs/fscdr.go b/cdrs/fscdr.go index e51fee794..db0db9a33 100644 --- a/cdrs/fscdr.go +++ b/cdrs/fscdr.go @@ -21,7 +21,6 @@ package cdrs import ( "encoding/json" "errors" - "github.com/cgrates/cgrates/rater" "github.com/cgrates/cgrates/utils" "strconv" "time" @@ -41,8 +40,8 @@ const ( CSTMID = "cgr_cstmid" CALL_DEST_NR = "dialed_extension" PARK_TIME = "start_epoch" - START_TIME = "answer_epoch" - END_TIME = "end_epoch" + ANSWER_TIME = "answer_epoch" + HANGUP_TIME = "end_epoch" DURATION = "billsec" USERNAME = "user_name" FS_IP = "sip_local_network_addr" @@ -50,7 +49,7 @@ const ( type FSCdr map[string]string -func (fsCdr FSCdr) New(body []byte) (rater.CDR, error) { +func (fsCdr FSCdr) New(body []byte) (utils.CDR, error) { fsCdr = make(map[string]string) var tmp map[string]interface{} var err error @@ -95,35 +94,29 @@ func (fsCdr FSCdr) GetDestination() string { return utils.FirstNonEmpty(fsCdr[DESTINATION], fsCdr[CALL_DEST_NR]) } -// Original dialed destination number, useful in case of unpark -func (fsCdr FSCdr) GetCallDestNr() string { - return fsCdr[CALL_DEST_NR] -} func (fsCdr FSCdr) GetTOR() string { return utils.FirstNonEmpty(fsCdr[TOR], cfg.DefaultTOR) } -func (fsCdr FSCdr) GetUUID() string { - return fsCdr[UUID] -} + func (fsCdr FSCdr) GetTenant() string { return utils.FirstNonEmpty(fsCdr[CSTMID], cfg.DefaultTenant) } func (fsCdr FSCdr) GetReqType() string { return utils.FirstNonEmpty(fsCdr[REQTYPE], cfg.DefaultReqType) } -func (fsCdr FSCdr) GetExtraParameters() string { - return "" // ToDo: Add and extract from config +func (fsCdr FSCdr) GetExtraFields() map[string]string { + return nil // ToDo: Add and extract from config } func (fsCdr FSCdr) GetFallbackSubj() string { return cfg.DefaultSubject } -func (fsCdr FSCdr) GetStartTime() (t time.Time, err error) { - st, err := strconv.ParseInt(fsCdr[START_TIME], 0, 64) +func (fsCdr FSCdr) GetAnswerTime() (t time.Time, err error) { + st, err := strconv.ParseInt(fsCdr[ANSWER_TIME], 0, 64) t = time.Unix(0, st*1000) return } -func (fsCdr FSCdr) GetEndTime() (t time.Time, err error) { - st, err := strconv.ParseInt(fsCdr[END_TIME], 0, 64) +func (fsCdr FSCdr) GetHangupTime() (t time.Time, err error) { + st, err := strconv.ParseInt(fsCdr[HANGUP_TIME], 0, 64) t = time.Unix(0, st*1000) return } @@ -143,24 +136,22 @@ func (fsCdr FSCdr) Store() (result string, err error) { result += fsCdr.GetSubject() + "|" result += fsCdr.GetAccount() + "|" result += fsCdr.GetDestination() + "|" - result += fsCdr.GetCallDestNr() + "|" result += fsCdr.GetTOR() + "|" - result += fsCdr.GetUUID() + "|" + result += fsCdr.GetAccId() + "|" result += fsCdr.GetTenant() + "|" result += fsCdr.GetReqType() + "|" - st, err := fsCdr.GetStartTime() + st, err := fsCdr.GetAnswerTime() if err != nil { return "", err } result += strconv.FormatInt(st.UnixNano(), 10) + "|" - et, err := fsCdr.GetEndTime() + et, err := fsCdr.GetHangupTime() if err != nil { return "", err } result += strconv.FormatInt(et.UnixNano(), 10) + "|" result += strconv.FormatInt(fsCdr.GetDuration(), 10) + "|" result += fsCdr.GetFallbackSubj() + "|" - result += fsCdr.GetExtraParameters() + "|" return } diff --git a/cmd/cgr-rater/cgr-rater.go b/cmd/cgr-rater/cgr-rater.go index ce982b805..1d03e6955 100644 --- a/cmd/cgr-rater/cgr-rater.go +++ b/cmd/cgr-rater/cgr-rater.go @@ -29,11 +29,11 @@ import ( "github.com/cgrates/cgrates/rater" "github.com/cgrates/cgrates/scheduler" "github.com/cgrates/cgrates/sessionmanager" + "github.com/cgrates/cgrates/utils" "io" "net" "net/rpc" "net/rpc/jsonrpc" - "os" "runtime" "strconv" "time" @@ -50,7 +50,6 @@ const ( REDIS = "redis" SAME = "same" FS = "freeswitch" - FSCDR_FILE_CSV = "freeswitch_file_csv" ) var ( @@ -130,19 +129,9 @@ func startMediator(responder *rater.Responder, loggerDb rater.DataStorage) { exitChan <- true } - if cfg.MediatorCDRType == FSCDR_FILE_CSV { //Mediator as standalone service for file CDRs - if _, err := os.Stat(cfg.MediatorCDRInDir); err != nil { - rater.Logger.Crit(fmt.Sprintf("The input path for mediator does not exist: %v", cfg.MediatorCDRInDir)) - exitChan <- true - } - if _, err := os.Stat(cfg.MediatorCDROutDir); err != nil { - rater.Logger.Crit(fmt.Sprintf("The output path for mediator does not exist: %v", cfg.MediatorCDROutDir)) - exitChan <- true - } - medi.TrackCDRFiles(cfg.MediatorCDRInDir) + if cfg.MediatorCDRType == utils.FSCDR_FILE_CSV { //Mediator as standalone service for file CDRs + medi.TrackCDRFiles() } - - } func startSessionManager(responder *rater.Responder, loggerDb rater.DataStorage) { diff --git a/config/config.go b/config/config.go index c25784526..945eb4696 100644 --- a/config/config.go +++ b/config/config.go @@ -22,6 +22,7 @@ import ( "code.google.com/p/goconf/conf" "errors" "fmt" + "github.com/cgrates/cgrates/utils" ) const ( @@ -67,7 +68,8 @@ type CGRConfig struct { SchedulerEnabled bool CDRSListen string // CDRS's listening interface: . CDRSfsJSONEnabled bool // Enable the handler for FreeSWITCH JSON CDRs: . - CDRSMediator string // Address where to reach the Mediator. Empty for disabling mediation. <""|internal> + CDRSMediator string // Address where to reach the Mediator. Empty for disabling mediation. <""|internal> + CDRSExtraFields []string //Extra fields to store in CDRs SMEnabled bool SMSwitchType string SMRater string // address where to access rater. Can be internal, direct rater address or the address of a balancer @@ -79,15 +81,15 @@ type CGRConfig struct { MediatorRaterReconnects int // Number of reconnects to rater before giving up. MediatorCDRType string // CDR type . MediatorAccIdField string // Name of field identifying accounting id used during mediation. Use index number in case of .csv cdrs. - MediatorSubjectFields string // Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs. - MediatorReqTypeFields string // Name of request type fields to be used during mediation. Use index number in case of .csv cdrs. - MediatorDirectionFields string // Name of direction fields to be used during mediation. Use index numbers in case of .csv cdrs. - MediatorTenantFields string // Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs. - MediatorTORFields string // Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs. - MediatorAccountFields string // Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs. - MediatorDestFields string // Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs. - MediatorTimeStartFields string // Name of time_start fields to be used during mediation. Use index numbers in case of .csv cdrs. - MediatorDurationFields string // Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs. + MediatorSubjectFields []string // Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs. + MediatorReqTypeFields []string // Name of request type fields to be used during mediation. Use index number in case of .csv cdrs. + MediatorDirectionFields []string // Name of direction fields to be used during mediation. Use index numbers in case of .csv cdrs. + MediatorTenantFields []string // Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs. + MediatorTORFields []string // Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs. + MediatorAccountFields []string // Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs. + MediatorDestFields []string // Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs. + MediatorTimeAnswerFields []string // Name of time_start fields to be used during mediation. Use index numbers in case of .csv cdrs. + MediatorDurationFields []string // Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs. MediatorCDRInDir string // Absolute path towards the directory where the CDRs are kept (file stored CDRs). MediatorCDROutDir string // Absolute path towards the directory where processed CDRs will be exported (file stored CDRs). FreeswitchServer string // freeswitch address host:port @@ -122,21 +124,22 @@ func ( self *CGRConfig ) setDefaults() error { self.CDRSListen = "127.0.0.1:2022" self.CDRSfsJSONEnabled = false self.CDRSMediator = INTERNAL + self.CDRSExtraFields = []string{} self.MediatorEnabled = false self.MediatorListen = "127.0.0.1:2032" self.MediatorRater = "127.0.0.1:2012" self.MediatorRaterReconnects = 3 - self.MediatorCDRType = "freeswitch_http_json" + self.MediatorCDRType = utils.FSCDR_HTTP_JSON self.MediatorAccIdField = "accid" - self.MediatorSubjectFields = "subject" - self.MediatorReqTypeFields = "reqtype" - self.MediatorDirectionFields = "direction" - self.MediatorTenantFields = "tenant" - self.MediatorTORFields = "tor" - self.MediatorAccountFields = "account" - self.MediatorDestFields = "destination" - self.MediatorTimeStartFields = "time_start" - self.MediatorDurationFields = "duration" + self.MediatorSubjectFields = []string{"subject"} + self.MediatorReqTypeFields = []string{"reqtype"} + self.MediatorDirectionFields = []string{"direction"} + self.MediatorTenantFields = []string{"tenant"} + self.MediatorTORFields = []string{"tor"} + self.MediatorAccountFields = []string{"account"} + self.MediatorDestFields = []string{"destination"} + self.MediatorTimeAnswerFields = []string{"time_answer"} + self.MediatorDurationFields = []string{"duration"} self.MediatorCDRInDir = "/var/log/freeswitch/cdr-csv" self.MediatorCDROutDir = "/var/log/cgrates/cdr/out/freeswitch/csv" self.SMEnabled = false @@ -173,6 +176,7 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) { cfg := &CGRConfig{} cfg.setDefaults() var hasOpt bool + var errParse error if hasOpt = c.HasOption("global", "datadb_type"); hasOpt { cfg.DataDBType, _ = c.GetString("global", "datadb_type") } @@ -251,6 +255,11 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) { if hasOpt = c.HasOption("cdrs", "mediator"); hasOpt { cfg.CDRSMediator, _ = c.GetString("cdrs", "mediator") } + if hasOpt = c.HasOption("cdrs", "extra_fields"); hasOpt { + if cfg.CDRSExtraFields, errParse = ConfigSlice( c, "cdrs", "extra_fields"); errParse!=nil { + return nil, errParse + } + } if hasOpt = c.HasOption("mediator", "enabled"); hasOpt { cfg.MediatorEnabled, _ = c.GetBool("mediator", "enabled") } @@ -270,31 +279,49 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) { cfg.MediatorAccIdField, _ = c.GetString("mediator", "accid_field") } if hasOpt = c.HasOption("mediator", "subject_fields"); hasOpt { - cfg.MediatorSubjectFields, _ = c.GetString("mediator", "subject_fields") + if cfg.MediatorSubjectFields, errParse = ConfigSlice( c, "mediator", "subject_fields"); errParse!=nil { + return nil, errParse + } } if hasOpt = c.HasOption("mediator", "reqtype_fields"); hasOpt { - cfg.MediatorReqTypeFields, _ = c.GetString("mediator", "reqtype_fields") + if cfg.MediatorReqTypeFields, errParse = ConfigSlice( c, "mediator", "reqtype_fields"); errParse!=nil { + return nil, errParse + } } if hasOpt = c.HasOption("mediator", "direction_fields"); hasOpt { - cfg.MediatorDirectionFields, _ = c.GetString("mediator", "direction_fields") + if cfg.MediatorDirectionFields, errParse = ConfigSlice( c, "mediator", "direction_fields"); errParse!=nil { + return nil, errParse + } } if hasOpt = c.HasOption("mediator", "tenant_fields"); hasOpt { - cfg.MediatorTenantFields, _ = c.GetString("mediator", "tenant_fields") + if cfg.MediatorTenantFields, errParse = ConfigSlice( c, "mediator", "tenant_fields"); errParse!=nil { + return nil, errParse + } } if hasOpt = c.HasOption("mediator", "tor_fields"); hasOpt { - cfg.MediatorTORFields, _ = c.GetString("mediator", "tor_fields") + if cfg.MediatorTORFields, errParse = ConfigSlice( c, "mediator", "tor_fields"); errParse!=nil { + return nil, errParse + } } if hasOpt = c.HasOption("mediator", "account_fields"); hasOpt { - cfg.MediatorAccountFields, _ = c.GetString("mediator", "account_fields") + if cfg.MediatorAccountFields, errParse = ConfigSlice( c, "mediator", "account_fields"); errParse!=nil { + return nil, errParse + } } if hasOpt = c.HasOption("mediator", "destination_fields"); hasOpt { - cfg.MediatorDestFields, _ = c.GetString("mediator", "destination_fields") + if cfg.MediatorDestFields, errParse = ConfigSlice( c, "mediator", "destination_fields"); errParse!=nil { + return nil, errParse + } } - if hasOpt = c.HasOption("mediator", "time_start_fields"); hasOpt { - cfg.MediatorTimeStartFields, _ = c.GetString("mediator", "time_start_fields") + if hasOpt = c.HasOption("mediator", "time_answer_fields"); hasOpt { + if cfg.MediatorTimeAnswerFields, errParse = ConfigSlice( c, "mediator", "time_answer_fields"); errParse!=nil { + return nil, errParse + } } if hasOpt = c.HasOption("mediator", "duration_fields"); hasOpt { - cfg.MediatorDurationFields, _ = c.GetString("mediator", "duration_fields") + if cfg.MediatorDurationFields, errParse = ConfigSlice( c, "mediator", "duration_fields"); errParse!=nil { + return nil, errParse + } } if hasOpt = c.HasOption("mediator", "cdr_in_dir"); hasOpt { cfg.MediatorCDRInDir, _ = c.GetString("mediator", "cdr_in_dir") diff --git a/config/config_test.go b/config/config_test.go index ca94c6667..839ef3e5e 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -58,22 +58,23 @@ func TestDefaults(t *testing.T) { eCfg.SchedulerEnabled = false eCfg.CDRSListen = "127.0.0.1:2022" eCfg.CDRSfsJSONEnabled = false - eCfg.CDRSMediator = INTERNAL + eCfg.CDRSMediator = INTERNAL + eCfg.CDRSExtraFields = []string{} eCfg.MediatorEnabled = false eCfg.MediatorListen = "127.0.0.1:2032" eCfg.MediatorRater = "127.0.0.1:2012" eCfg.MediatorRaterReconnects = 3 eCfg.MediatorCDRType = "freeswitch_http_json" eCfg.MediatorAccIdField = "accid" - eCfg.MediatorSubjectFields = "subject" - eCfg.MediatorReqTypeFields = "reqtype" - eCfg.MediatorDirectionFields = "direction" - eCfg.MediatorTenantFields = "tenant" - eCfg.MediatorTORFields = "tor" - eCfg.MediatorAccountFields = "account" - eCfg.MediatorDestFields = "destination" - eCfg.MediatorTimeStartFields = "time_start" - eCfg.MediatorDurationFields = "duration" + eCfg.MediatorSubjectFields = []string{"subject"} + eCfg.MediatorReqTypeFields = []string{"reqtype"} + eCfg.MediatorDirectionFields = []string{"direction"} + eCfg.MediatorTenantFields = []string{"tenant"} + eCfg.MediatorTORFields = []string{"tor"} + eCfg.MediatorAccountFields = []string{"account"} + eCfg.MediatorDestFields = []string{"destination"} + eCfg.MediatorTimeAnswerFields = []string{"time_answer"} + eCfg.MediatorDurationFields = []string{"duration"} eCfg.MediatorCDRInDir = "/var/log/freeswitch/cdr-csv" eCfg.MediatorCDROutDir = "/var/log/cgrates/cdr/out/freeswitch/csv" eCfg.SMEnabled = false @@ -146,21 +147,22 @@ func TestConfigFromFile(t *testing.T) { eCfg.CDRSListen = "test" eCfg.CDRSfsJSONEnabled = true eCfg.CDRSMediator = "test" + eCfg.CDRSExtraFields = []string{"test"} eCfg.MediatorEnabled = true eCfg.MediatorListen = "test" eCfg.MediatorRater = "test" eCfg.MediatorRaterReconnects = 99 eCfg.MediatorCDRType = "test" eCfg.MediatorAccIdField = "test" - eCfg.MediatorSubjectFields = "test" - eCfg.MediatorReqTypeFields = "test" - eCfg.MediatorDirectionFields = "test" - eCfg.MediatorTenantFields = "test" - eCfg.MediatorTORFields = "test" - eCfg.MediatorAccountFields = "test" - eCfg.MediatorDestFields = "test" - eCfg.MediatorTimeStartFields = "test" - eCfg.MediatorDurationFields = "test" + eCfg.MediatorSubjectFields = []string{"test"} + eCfg.MediatorReqTypeFields = []string{"test"} + eCfg.MediatorDirectionFields = []string{"test"} + eCfg.MediatorTenantFields = []string{"test"} + eCfg.MediatorTORFields = []string{"test"} + eCfg.MediatorAccountFields = []string{"test"} + eCfg.MediatorDestFields = []string{"test"} + eCfg.MediatorTimeAnswerFields = []string{"test"} + eCfg.MediatorDurationFields = []string{"test"} eCfg.MediatorCDRInDir = "test" eCfg.MediatorCDROutDir = "test" eCfg.SMEnabled = true diff --git a/config/helpers.go b/config/helpers.go new file mode 100644 index 000000000..b76663d4e --- /dev/null +++ b/config/helpers.go @@ -0,0 +1,45 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2013 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package config + +import ( + "code.google.com/p/goconf/conf" + "errors" + "strings" +) + + +// Adds support for slice values in config +func ConfigSlice( c *conf.ConfigFile, section, valName string ) ([]string, error) { + sliceStr, errGet := c.GetString(section, valName) + if errGet != nil { + return nil, errGet + } + cfgValStrs := strings.Split(sliceStr, ",") // If need arrises, we can make the separator configurable + if len(cfgValStrs)==1 && cfgValStrs[0]=="" { // Prevents returning iterable with empty value + return []string{}, nil + } + for _,elm := range cfgValStrs { + if elm == "" { //One empty element is presented when splitting empty string + return nil, errors.New("Empty values in config slice") + + } + } + return cfgValStrs, nil +} diff --git a/config/test_data.txt b/config/test_data.txt index d55f64293..66e1cb1c9 100644 --- a/config/test_data.txt +++ b/config/test_data.txt @@ -1,71 +1,72 @@ -# Test Data. -# NOT A REAL CONFIGURATION FILE +# TEST DATA - NOT FOR PRODUCTION USAGE +# [global] datadb_type = test # The main database: . -datadb_host = test # Database host address. +datadb_host = test # Database host address. datadb_port = test # Port to reach the database. datadb_name = test # The name of the database to connect to. datadb_user = test # Username to use when connecting to database. -datadb_passwd = test # Password to use when connecting to database. -logdb_type = test # Log/stored database type to use: -logdb_host = test # The host to connect to. Values that start with / are for UNIX domain sockets. +datadb_passwd = test # Password to use when connecting to database. +logdb_type = test # Log/stored database type to use: +logdb_host = test # The host to connect to. Values that start with / are for UNIX domain sockets. logdb_port = test # The port to reach the logdb. logdb_name = test # The name of the log database to connect to. -logdb_user = test # Username to use when connecting to logdb. +logdb_user = test # Username to use when connecting to logdb. logdb_passwd = test # Password to use when connecting to logdb. rpc_encoding = test # RPC encoding used on APIs: . -default_reqtype = test # Default request type to consider when missing from requests: <""|prepaid|postpaid|pseudoprepaid|rated>. +default_reqtype = test # Default request type to consider when missing from requests: <""|prepaid|postpaid|pseudoprepaid|rated>. default_tor = test # Default Type of Record to consider when missing from requests. default_tenant = test # Default Tenant to consider when missing from requests. default_subject = test # Default rating Subject to consider when missing from requests. [balancer] -enabled = true # Start Balancer service: . -listen = test # Balancer listen interface: . +enabled = true # Start Balancer service: . +listen = test # Balancer listen interface: . [rater] -enabled = true # Enable Rater service: . +enabled = true # Enable Rater service: . balancer = test # Register to Balancer as worker: . -listen = test # Rater's listening interface: . +listen = test # Rater's listening interface: . [scheduler] -enabled = true # Starts Scheduler service: . + enabled = true # Starts Scheduler service: . [cdrs] -listen = test # CDRS's listening interface: . -freeswitch_json_enabled = true # Enable the handler for FreeSWITCH JSON CDRs: . -mediator = test # Address where to reach the Mediator. Empty for disabling mediation. <""|internal> +listen=test # CDRS's listening interface: . +freeswitch_json_enabled=true # Enable the handler for FreeSWITCH JSON CDRs: . +mediator = test # Address where to reach the Mediator. Empty for disabling mediation. <""|internal> +extra_fields = test # Extra fields to store in CDRs [mediator] -enabled = true # Starts Mediator service: . -listen = test # Mediator's listening interface: . -rater = test # Address where to reach the Rater: +enabled = true # Starts Mediator service: . +listen=test # Mediator's listening interface: . +rater = test # Address where to reach the Rater: rater_reconnects = 99 # Number of reconnects to rater before giving up. -cdr_type = test # CDR type . +cdr_type = test # CDR type . accid_field = test # Name of field identifying accounting id used during mediation. Use index number in case of .csv cdrs. -subject_fields = test # Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs. +subject_fields = test # Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs. reqtype_fields = test # Name of request type fields to be used during mediation. Use index number in case of .csv cdrs. -direction_fields = test # Name of direction fields to be used during mediation. Use index numbers in case of .csv cdrs. -tenant_fields = test # Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs. +direction_fields = test # Name of direction fields to be used during mediation. Use index numbers in case of .csv cdrs. +tenant_fields = test # Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs. tor_fields = test # Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs. -account_fields = test # Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs. -destination_fields = test # Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs. -time_start_fields = test # Name of time_start fields to be used during mediation. Use index numbers in case of .csv cdrs. -duration_fields = test # Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs. -cdr_in_dir = test # Absolute path towards the directory where the CDRs are kept (file stored CDRs). -cdr_out_dir = test # Absolute path towards the directory where processed CDRs will be exported (file stored CDRs). +account_fields = test # Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs. +destination_fields = test # Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs. +time_answer_fields = test # Name of time_answer fields to be used during mediation. Use index numbers in case of .csv cdrs. +duration_fields = test # Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs. +cdr_in_dir = test # Absolute path towards the directory where the CDRs are kept (file stored CDRs). +cdr_out_dir = test # Absolute path towards the directory where processed CDRs will be exported (file stored CDRs). [session_manager] -enabled = true # Starts SessionManager service: . -switch_type = test # Defines the type of switch behind: . -rater = test # Address where to reach the Rater. +enabled = true # Starts SessionManager service: . +switch_type = test # Defines the type of switch behind: . +rater = test # Address where to reach the Rater. rater_reconnects = 99 # Number of reconnects to rater before giving up. debit_interval = 99 # Interval to perform debits on. [freeswitch] -server = test # Adress where to connect to FreeSWITCH socket. -passwd = test # FreeSWITCH socket password. -reconnects = 99 # Number of attempts on connect failure. +server = test # Adress where to connect to FreeSWITCH socket. +passwd = test # FreeSWITCH socket password. +reconnects = 99 # Number of attempts on connect failure. diff --git a/data/conf/cgr_fs_prep_csv.cfg b/data/conf/cgr_fs_prep_csv.cfg deleted file mode 100644 index f0856a531..000000000 --- a/data/conf/cgr_fs_prep_csv.cfg +++ /dev/null @@ -1,72 +0,0 @@ -# CGRateS Configuration file -# -# This file contains the default configuration hardcoded into CGRateS. -# This is what you get when you load CGRateS with an empty configuration file. -# [global] must exist in all files, rest of the configuration is inter-changeable. - -[global] -# datadb_type = redis # The main database: . -# datadb_host = 127.0.0.1 # Database host address. -# datadb_port = 6379 # Port to reach the database. -# datadb_name = 10 # The name of the database to connect to. -# datadb_user = # Username to use when connecting to database. -# datadb_passwd = # Password to use when connecting to database. -# logdb_type = mongo # Log/stored database type to use: -# logdb_host = 127.0.0.1 # The host to connect to. Values that start with / are for UNIX domain sockets. -# logdb_port = 27017 # The port to reach the logdb. -# logdb_name = cgrates # The name of the log database to connect to. -# logdb_user = # Username to use when connecting to logdb. -# logdb_passwd = # Password to use when connecting to logdb. - -[balancer] -# enabled = false # Start Balancer service: . -# listen = 127.0.0.1:2012 # Balancer listen interface: . -# rpc_encoding = gob # RPC encoding used: . - -[rater] - enabled = true # Enable Rater service: . -# balancer = disabled # Register to Balancer as worker: . -# listen = 127.0.0.1:2012 # Rater's listening interface: . -# rpc_encoding = gob # RPC encoding used: . - -[scheduler] - enabled = true # Starts Scheduler service: . - -[mediator] - enabled = true # Starts Mediator service: . -# rater = 127.0.0.1:2012 # Address where to reach the Rater. -# rater_reconnects = 3 # Number of reconnects to rater before giving up. -# rpc_encoding = gob # RPC encoding used when talking to Rater: . -# skipdb = false # Skips database checks for previous recorded prices: . -# pseudoprepaid = false # Execute debits together with pricing: . -# cdr_type = freeswitch_cdr # CDR type . -# cdr_in_dir = /var/log/freeswitch/cdr-csv # Absolute path towards the directory where the CDRs are kept. -# cdr_out_dir = /var/log/cgrates/cdr_out # Absolute path towards the directory where processed CDRs will be exported. - -[session_manager] - enabled = true # Starts SessionManager service: . -# switch_type = freeswitch # Defines the type of switch behind: . -# rater = 127.0.0.1:2012 # Address where to reach the Rater. -# rater_reconnects = 3 # Number of reconnects to rater before giving up. -# debit_interval = 5 # Interval to perform debits on. -# rpc_encoding = gob # RPC encoding used when talking to Rater: . - default_reqtype = prepaid # Default request type to consider when missing from requests: <""|prepaid|postpaid>. -# default_tor = 0 # Default Type of Record to consider when missing from requests. -# default_tenant = 0 # Default Tenant to consider when missing from requests. -# default_subject = 0 # Default rating Subject to consider when missing from requests. - -[freeswitch] -# server = 127.0.0.1:8021 # Adress where to connect to FreeSWITCH socket. -# passwd = ClueCon # FreeSWITCH socket password. -# reconnects = 5 # Number of attempts on connect failure. -# uuid_index = 10 # Index of the UUID info in the CDR file. -# direction_index = -1 # Index of the CallDirection info in the CDR file. -# tor_index = -1 # Index of the TypeOfRecord info in the CDR file. -# tenant_index = -1 # Index of the Tenant info in the CDR file. -# subject_index = -1 # Index of the Subject info in the CDR file. -1 to query database instead of rater -# account_index = -1 # Index of the Account info in the CDR file. -# destination_index = -1 # Index of the Destination info in the CDR file. -# time_start_index = -1 # Index of the TimeStart info in the CDR file. -# duration_index = -1 # Index of the CallDuration info in the CDR file. - - diff --git a/data/conf/cgrates.cfg b/data/conf/cgrates.cfg index ba96c3e5d..58df413ae 100644 --- a/data/conf/cgrates.cfg +++ b/data/conf/cgrates.cfg @@ -40,6 +40,7 @@ # listen=127.0.0.1:2022 # CDRS's listening interface: . # freeswitch_json_enabled=false # Enable the handler for FreeSWITCH JSON CDRs: . # mediator = # Address where to reach the Mediator. Empty for disabling mediation. <""|internal> +# extra_fields = # Extra fields to store in CDRs [mediator] # enabled = false # Starts Mediator service: . @@ -49,13 +50,13 @@ # cdr_type = freeswitch_http_json # CDR type . # accid_field = accid # Name of field identifying accounting id used during mediation. Use index number in case of .csv cdrs. # subject_fields = subject # Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs. -# reqtype_fields = -1 # Name of request type fields to be used during mediation. Use index number in case of .csv cdrs. +# reqtype_fields = reqtype # Name of request type fields to be used during mediation. Use index number in case of .csv cdrs. # direction_fields = direction # Name of direction fields to be used during mediation. Use index numbers in case of .csv cdrs. # tenant_fields = tenant # Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs. # tor_fields = tor # Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs. # account_fields = account # Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs. # destination_fields = destination # Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs. -# time_start_fields = time_start # Name of time_start fields to be used during mediation. Use index numbers in case of .csv cdrs. +# time_answer_fields = time_answer # Name of time_answer fields to be used during mediation. Use index numbers in case of .csv cdrs. # duration_fields = duration # Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs. # cdr_in_dir = /var/log/freeswitch/cdr-csv # Absolute path towards the directory where the CDRs are kept (file stored CDRs). # cdr_out_dir = /var/log/cgrates/cdr/out/freeswitch/csv # Absolute path towards the directory where processed CDRs will be exported (file stored CDRs). diff --git a/data/storage/mysql/create_cdrs_tables.sql b/data/storage/mysql/create_cdrs_tables.sql index cd11ec32b..3e2eae052 100644 --- a/data/storage/mysql/create_cdrs_tables.sql +++ b/data/storage/mysql/create_cdrs_tables.sql @@ -13,7 +13,7 @@ CREATE TABLE `cdrs_primary` ( `account` varchar(64) NOT NULL, `subject` varchar(64) NOT NULL, `destination` varchar(64) NOT NULL, - `time_start` datetime NOT NULL, + `time_answer` datetime NOT NULL, `duration` int(11) NOT NULL, PRIMARY KEY (`id`), UNIQUE KEY `cgrid` (`cgrid`) diff --git a/mediator/fsfilecsvcdr.go b/mediator/fsfilecsvcdr.go new file mode 100644 index 000000000..7a30c11de --- /dev/null +++ b/mediator/fsfilecsvcdr.go @@ -0,0 +1,121 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2013 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package mediator + +import ( + "time" + "strconv" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" +) + +type FScsvCDR struct { + rowData []string // The original row extracted form csv file + accIdIdx, + subjectIdx, + reqtypeIdx, + directionIdx, + tenantIdx, + torIdx, + accountIdx, + destinationIdx, + answerTimeIdx, + durationIdx int // Field indexes + cgrCfg *config.CGRConfig // CGR Config instance +} + + +func NewFScsvCDR(cdrRow []string, accIdIdx, subjectIdx, reqtypeIdx, directionIdx, tenantIdx, torIdx, + accountIdx, destinationIdx, answerTimeIdx, durationIdx int, cfg *config.CGRConfig ) (*FScsvCDR, error) { + fscdr := FScsvCDR{ cdrRow, accIdIdx, subjectIdx, reqtypeIdx, directionIdx, tenantIdx, + torIdx, accountIdx, destinationIdx, answerTimeIdx, durationIdx, cfg } + return &fscdr, nil +} + +func (self *FScsvCDR) GetCgrId() string { + return utils.FSCgrId(self.rowData[self.accIdIdx]) +} + +func (self *FScsvCDR) GetAccId() string { + return self.rowData[self.accIdIdx] +} + +func (self *FScsvCDR) GetCdrHost() string { + return utils.LOCALHOST // ToDo: Maybe extract dynamically the external IP address here +} + +func (self *FScsvCDR) GetDirection() string { + return "OUT" +} + +func (self *FScsvCDR) GetOrigId() string { + return utils.NOT_IMPLEMENTED +} + +func (self *FScsvCDR) GetSubject() string { + return self.rowData[self.subjectIdx] +} + +func (self *FScsvCDR) GetAccount() string { + return self.rowData[self.accountIdx] +} + +func (self *FScsvCDR) GetDestination() string { + return self.rowData[self.destinationIdx] +} + +func (self *FScsvCDR) GetTOR() string { + return self.rowData[self.torIdx] +} + +func (self *FScsvCDR) GetTenant() string { + return self.rowData[self.tenantIdx] +} + +func (self *FScsvCDR) GetReqType() string { + if self.reqtypeIdx == -1 { + return self.cgrCfg.DefaultReqType + } + return self.rowData[self.reqtypeIdx] +} + +func (self *FScsvCDR) GetAnswerTime() (time.Time, error) { + return time.Parse("2006-01-02 15:04:05", self.rowData[self.answerTimeIdx]) +} + +func (self *FScsvCDR) GetDuration() int64 { + dur, _ := strconv.ParseInt(self.rowData[self.durationIdx], 0, 64) + return dur +} + +func (self *FScsvCDR) GetFallbackSubj() string { + return utils.NOT_IMPLEMENTED +} + +func (self *FScsvCDR) GetExtraFields() map[string]string { + return nil +} + + + + + + + + diff --git a/mediator/mediator.go b/mediator/mediator.go index b86a17e9e..7da1451d7 100644 --- a/mediator/mediator.go +++ b/mediator/mediator.go @@ -19,11 +19,14 @@ along with this program. If not, see package mediator import ( + "errors" "bufio" "encoding/csv" "flag" "fmt" "github.com/cgrates/cgrates/rater" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" "github.com/howeyc/fsnotify" "os" "path" @@ -32,107 +35,168 @@ import ( "time" ) -type mediatorFieldIdxs []int - -// Extends goconf to provide us the slice with indexes we need for multiple mediation -func (mfi *mediatorFieldIdxs) Load(idxs string) error { - cfgStrIdxs := strings.Split(idxs, ",") - if len(cfgStrIdxs) == 0 { - return fmt.Errorf("Undefined %s", idxs) +func NewMediator( connector rater.Connector, storDb rater.DataStorage, cfg *config.CGRConfig) (m *Mediator, err error) { + m = &Mediator{ + connector: connector, + storDb: storDb, + cgrCfg: cfg, } - for _, cfgStrIdx := range cfgStrIdxs { - if cfgIntIdx, errConv := strconv.Atoi(cfgStrIdx); errConv != nil || cfgStrIdx == "" { - return fmt.Errorf("All mediator index members (%s) must be ints", idxs) - } else { - *mfi = append(*mfi, cfgIntIdx) - } + m.fieldNames = make( map[string][]string ) + m.fieldIdxs = make( map[string][]int ) + // Load config fields + if errLoad := m.loadConfig(); errLoad != nil { + return nil, errLoad } - return nil + return m, nil } type Mediator struct { connector rater.Connector - loggerDb rater.DataStorage - outputDir string - directionIndexs, - torIndexs, - tenantIndexs, - subjectIndexs, - accountIndexs, - destinationIndexs, - timeStartIndexs, - durationIndexs, - uuidIndexs mediatorFieldIdxs + storDb rater.DataStorage + cgrCfg *config.CGRConfig + cdrInDir, cdrOutDir string + accIdField string + accIdIdx int // Populated only for csv files where we have no names but indexes for the fields + fieldNames map[string][]string + fieldIdxs map[string][]int // Populated only for csv files where we have no names but indexes for the fields } -// Creates a new mediator object parsing the indexses -func NewMediator(connector rater.Connector, - loggerDb rater.DataStorage, - cfg *config.CGRConfig) (m *Mediator, err error) { - m = &Mediator{ - connector: connector, - loggerDb: loggerDb, - outputDir: outputDir, +// Load configuration out of config fields and does the necessary checks +func (self *Mediator) loadConfig() error { + fieldKeys := []string{"subject", "reqtype", "direction", "tenant", "tor", "account", "destination", "time_start", "duration"} + cfgVals := [][]string{self.cgrCfg.MediatorSubjectFields, self.cgrCfg.MediatorReqTypeFields, self.cgrCfg.MediatorDirectionFields, + self.cgrCfg.MediatorTenantFields, self.cgrCfg.MediatorTORFields, self.cgrCfg.MediatorAccountFields, self.cgrCfg.MediatorDestFields, + self.cgrCfg.MediatorTimeAnswerFields, self.cgrCfg.MediatorDurationFields} + + refIdx := 0 // Subject becomes reference for our checks + if len(cfgVals[refIdx]) == 0 { + return fmt.Errorf("Unconfigured %s fields", fieldKeys[refIdx]) } - idxs := []string{directionIndexs, torIndexs, tenantIndexs, subjectIndexs, accountIndexs, - destinationIndexs, timeStartIndexs, durationIndexs, uuidIndexs} - objs := []*mediatorFieldIdxs{&m.directionIndexs, &m.torIndexs, &m.tenantIndexs, &m.subjectIndexs, - &m.accountIndexs, &m.destinationIndexs, &m.timeStartIndexs, &m.durationIndexs, &m.uuidIndexs} - for i, o := range objs { - err = o.Load(idxs[i]) - if err != nil { - return + // All other configured fields must match the length of reference fields + for iCfgVal := range cfgVals { + if ( len(cfgVals[refIdx])!=len(cfgVals[iCfgVal]) ) { + // Make sure we have everywhere the length of reference key (subject) + return errors.New("Inconsistent lenght of mediator fields.") } } - if !m.validateIndexses() { - err = fmt.Errorf("All members must have the same length") + + // AccIdField has no special requirements, should just exist + if self.cgrCfg.MediatorAccIdField == "" { + return errors.New("Undefined mediator accid field") } - return + self.accIdField = self.cgrCfg.MediatorAccIdField + + var errConv error + // Specific settings of CSV style CDRS + if self.cgrCfg.MediatorCDRType == utils.FSCDR_FILE_CSV { + // Check paths to be valid before adding as configuration + if _, err := os.Stat(self.cgrCfg.MediatorCDRInDir); err != nil { + return fmt.Errorf("The input path for mediator does not exist: %v", self.cgrCfg.MediatorCDRInDir) + } else { + self.cdrInDir = self.cgrCfg.MediatorCDRInDir + } + if _, err := os.Stat(self.cgrCfg.MediatorCDROutDir); err != nil { + return fmt.Errorf("The output path for mediator does not exist: %v", self.cgrCfg.MediatorCDROutDir) + } else { + self.cdrOutDir = self.cgrCfg.MediatorCDROutDir + } + if self.accIdIdx, errConv = strconv.Atoi(self.cgrCfg.MediatorAccIdField); errConv != nil { + return errors.New("AccIdIndex must be integer.") + } + } + + // Load here field names and convert to integers in case of unamed cdrs like CSV + for idx, key := range fieldKeys { + self.fieldNames[key] = cfgVals[idx] + if self.cgrCfg.MediatorCDRType == utils.FSCDR_FILE_CSV { // Special case when field names represent indexes of their location in file + self.fieldIdxs[key] = make( []int, len(cfgVals[idx]) ) + for iStr, cfgStr := range cfgVals[idx] { + if self.fieldIdxs[key][iStr], errConv = strconv.Atoi(cfgStr); errConv != nil { + return fmt.Errorf("All mediator index members (%s) must be ints", key) + } + } + } + } + + return nil } -// Make sure all indexes are having same lenght -func (m *Mediator) validateIndexses() bool { - refLen := len(m.subjectIndexs) - for _, fldIdxs := range []mediatorFieldIdxs{m.directionIndexs, m.torIndexs, m.tenantIndexs, - m.accountIndexs, m.destinationIndexs, m.timeStartIndexs, m.durationIndexs, m.uuidIndexs} { - if len(fldIdxs) != refLen { - return false - } - } - return true -} + // Watch the specified folder for file moves and parse the files on events -func (m *Mediator) TrackCDRFiles(cdrPath string) (err error) { +func (self *Mediator) TrackCDRFiles() (err error) { watcher, err := fsnotify.NewWatcher() if err != nil { return } defer watcher.Close() - err = watcher.Watch(cdrPath) + err = watcher.Watch(self.cdrInDir) if err != nil { return } - rater.Logger.Info(fmt.Sprintf("Monitoring %v for file moves.", cdrPath)) + rater.Logger.Info(fmt.Sprintf("Monitoring %s for file moves.", self.cdrInDir)) for { select { case ev := <-watcher.Event: if ev.IsCreate() && path.Ext(ev.Name) != ".csv" { - rater.Logger.Info(fmt.Sprintf("Parsing: %v", ev.Name)) - err = m.parseCSV(ev.Name) + rater.Logger.Info(fmt.Sprintf("Parsing: %s", ev.Name)) + err = self.MediateCSVCDR(ev.Name) if err != nil { return err } } case err := <-watcher.Error: - rater.Logger.Err(fmt.Sprintf("Inotify error: %v", err)) + rater.Logger.Err(fmt.Sprintf("Inotify error: %s", err.Error())) } } return } +// Retrive the cost from logging database +func (self *Mediator) getCostsFromDB(cdr utils.CDR) (cc *rater.CallCost, err error) { + return self.storDb.GetCallCostLog(cdr.GetCgrId(), rater.SESSION_MANAGER_SOURCE) +} + +// Retrive the cost from rater +func (self *Mediator) getCostsFromRater(cdr utils.CDR) (*rater.CallCost, error) { + cc := &rater.CallCost{} + d, err := time.ParseDuration(strconv.FormatInt(cdr.GetDuration(),10) + "s") + if err != nil { + return nil, err + } + if d.Seconds() == 0 { // failed call, returning empty callcost, no error + return cc, nil + } + t1, err := cdr.GetAnswerTime() + if err != nil { + return nil, err + } + cd := rater.CallDescriptor{ + Direction: "OUT", //record[m.directionFields[runIdx]] TODO: fix me + Tenant: cdr.GetTenant(), + TOR: cdr.GetTOR(), + Subject: cdr.GetSubject(), + Account: cdr.GetAccount(), + Destination: cdr.GetDestination(), + TimeStart: t1, + TimeEnd: t1.Add(d)} + if cdr.GetReqType()== utils.PSEUDOPREPAID { + err = self.connector.Debit(cd, cc) + } else { + err = self.connector.GetCost(cd, cc) + } + if err != nil { + self.storDb.LogError(cdr.GetCgrId(), rater.MEDIATOR_SOURCE, err.Error()) + } else { + // If the mediator calculated a price it will write it to logdb + self.storDb.LogCallCost(cdr.GetCgrId(), rater.MEDIATOR_SOURCE, cc) + } + return cc, nil +} + + // Parse the files and get cost for every record -func (m *Mediator) parseCSV(cdrfn string) (err error) { +func (self *Mediator) MediateCSVCDR(cdrfn string) (err error) { flag.Parse() file, err := os.Open(cdrfn) defer file.Close() @@ -143,7 +207,7 @@ func (m *Mediator) parseCSV(cdrfn string) (err error) { csvReader := csv.NewReader(bufio.NewReader(file)) _, fn := path.Split(cdrfn) - fout, err := os.Create(path.Join(m.outputDir, fn)) + fout, err := os.Create(path.Join(self.cdrOutDir, fn)) if err != nil { return err } @@ -153,18 +217,36 @@ func (m *Mediator) parseCSV(cdrfn string) (err error) { for record, ok := csvReader.Read(); ok == nil; record, ok = csvReader.Read() { //t, _ := time.Parse("2006-01-02 15:04:05", record[5]) var cc *rater.CallCost - for runIdx, idxVal := range m.subjectIndexs { // Query costs for every run index given by subject - if idxVal == -1 { // -1 as subject means use database to get previous set price - cc, err = m.getCostsFromDB(record, runIdx) + + for runIdx := range self.fieldIdxs["subject"] { // Query costs for every run index given by subject + csvCDR, errCDR := NewFScsvCDR( record, self.accIdIdx, + self.fieldIdxs["subject"][runIdx], + self.fieldIdxs["reqtype"][runIdx], + self.fieldIdxs["direction"][runIdx], + self.fieldIdxs["tenant"][runIdx], + self.fieldIdxs["tor"][runIdx], + self.fieldIdxs["account"][runIdx], + self.fieldIdxs["destination"][runIdx], + self.fieldIdxs["answer_time"][runIdx], + self.fieldIdxs["duration"][runIdx], + self.cgrCfg) + if errCDR != nil { + rater.Logger.Err(fmt.Sprintf(" Could not calculate price for accid: <%s>, err: <%s>", + record[self.accIdIdx], errCDR.Error())) + } + var errCost error + if (csvCDR.GetReqType()==utils.PREPAID || csvCDR.GetReqType()==utils.POSTPAID){ + // Should be previously calculated and stored in DB + cc, errCost = self.getCostsFromDB( csvCDR ) } else { - cc, err = m.getCostsFromRater(record, runIdx) + cc, errCost = self.getCostsFromRater( csvCDR ) } cost := "-1" - if err != nil || cc == nil { - rater.Logger.Err(fmt.Sprintf(" Could not calculate price for uuid: <%s>, err: <%s>, cost: <%v>", record[m.uuidIndexs[runIdx]], err.Error(), cc)) + if errCost != nil || cc == nil { + rater.Logger.Err(fmt.Sprintf(" Could not calculate price for accid: <%s>, err: <%s>, cost: <%v>", csvCDR.GetAccId(), err.Error(), cc)) } else { cost = strconv.FormatFloat(cc.ConnectFee+cc.Cost, 'f', -1, 64) - rater.Logger.Debug(fmt.Sprintf("Calculated for uuid:%s, cost: %v", record[m.uuidIndexs[runIdx]], cost)) + rater.Logger.Debug(fmt.Sprintf("Calculated for accid:%s, cost: %v", csvCDR.GetAccId(), cost)) } record = append(record, cost) } @@ -174,75 +256,23 @@ func (m *Mediator) parseCSV(cdrfn string) (err error) { return } -// Retrive the cost from logging database -func (m *Mediator) getCostsFromDB(record []string, runIdx int) (cc *rater.CallCost, err error) { - searchedUUID := record[m.uuidIndexs[runIdx]] - cc, err = m.loggerDb.GetCallCostLog(searchedUUID, rater.SESSION_MANAGER_SOURCE) - return -} - -// Retrive the cost from rater -func (m *Mediator) getCostsFromRater(record []string, runIdx int) (cc *rater.CallCost, err error) { - d, err := time.ParseDuration(record[m.durationIndexs[runIdx]] + "s") - if err != nil { - return - } - - cc = &rater.CallCost{} - if d.Seconds() == 0 { // failed call, returning empty callcost, no error - return cc, nil - } - t1, err := time.Parse("2006-01-02 15:04:05", record[m.timeStartIndexs[runIdx]]) - if err != nil { - return - } - cd := rater.CallDescriptor{ - Direction: "OUT", //record[m.directionIndexs[runIdx]] TODO: fix me - Tenant: record[m.tenantIndexs[runIdx]], - TOR: record[m.torIndexs[runIdx]], - Subject: record[m.subjectIndexs[runIdx]], - Account: record[m.accountIndexs[runIdx]], - Destination: record[m.destinationIndexs[runIdx]], - TimeStart: t1, - TimeEnd: t1.Add(d)} - if cfg.DefaultReqType == PSEUDOPREPAID { //ToDo: Implement here dynamically getting the tor out of record instance - err = m.connector.Debit(cd, cc) + +func (self *Mediator) MediateDBCDR(cdr utils.CDR, db rater.DataStorage) error { + var cc *rater.CallCost + var errCost error + if (cdr.GetReqType()==utils.PREPAID || cdr.GetReqType()==utils.POSTPAID){ + // Should be previously calculated and stored in DB + cc, errCost = self.getCostsFromDB( cdr ) } else { - err = m.connector.GetCost(cd, cc) + cc, errCost = self.getCostsFromRater( cdr ) } - if err != nil { - m.loggerDb.LogError(record[m.uuidIndexs[runIdx]], rater.MEDIATOR_SOURCE, err.Error()) + cost := "-1" + if errCost != nil || cc == nil { + rater.Logger.Err(fmt.Sprintf(" Could not calculate price for cgrid: <%s>, err: <%s>, cost: <%v>", cdr.GetCgrId(), errCost.Error(), cc)) } else { - // If the mediator calculated a price it will write it to logdb - m.loggerDb.LogCallCost(record[m.uuidIndexs[runIdx]], rater.MEDIATOR_SOURCE, cc) + cost = strconv.FormatFloat(cc.ConnectFee+cc.Cost, 'f', -1, 64) + rater.Logger.Debug(fmt.Sprintf(" Calculated for cgrid:%s, cost: %v", cdr.GetCgrId(), cost)) } - return + return self.storDb.SetRatedCdr(cdr, cc) } -/* Calculates price for the specified cdr and writes the new cdr with price to -the storage. If the cdr is nil then it will fetch it from the storage. */ -func (m *Mediator) MediateCdrFromDB(cdr rater.CDR, db rater.DataStorage) error { - cc := &rater.CallCost{} - startTime, err := cdr.GetStartTime() - if err != nil { - return err - } - endTime, err := cdr.GetEndTime() - if err != nil { - return err - } - cd := rater.CallDescriptor{ - Direction: cdr.GetDirection(), - Tenant: cdr.GetTenant(), - TOR: cdr.GetTOR(), - Subject: cdr.GetSubject(), - Account: cdr.GetAccount(), - Destination: cdr.GetDestination(), - TimeStart: startTime, - TimeEnd: endTime} - if err := m.connector.GetCost(cd, cc); err != nil { - fmt.Println("Got error in the mediator getCost", err.Error()) - return err - } - return db.SetRatedCdr(cdr, cc) -} diff --git a/mediator/mediator_test.go b/mediator/mediator_test.go index e217b6645..a37db7185 100644 --- a/mediator/mediator_test.go +++ b/mediator/mediator_test.go @@ -1,45 +1,10 @@ package mediator import ( - "testing" + //"testing" ) -func TestIndexLoad(t *testing.T) { - idxs := make(mediatorFieldIdxs, 0) - err := idxs.Load("1,2,3") - if err != nil && len(idxs) != 3 { - t.Error("Error parsing indexses ", err) - } -} - -func TestIndexLoadError(t *testing.T) { - idxs := make(mediatorFieldIdxs, 0) - err := idxs.Load("1,a,2") - if err == nil { - t.Error("Error parsing indexses ", err) - } -} - -func TestIndexLoadEmpty(t *testing.T) { - idxs := make(mediatorFieldIdxs, 0) - err := idxs.Load("") - if err == nil { - t.Error("Error parsing indexses ", err) - } -} - -func TestIndexLengthSame(t *testing.T) { - m := new(Mediator) - objs := []*mediatorFieldIdxs{&m.directionIndexs, &m.torIndexs, &m.tenantIndexs, &m.subjectIndexs, - &m.accountIndexs, &m.destinationIndexs, &m.timeStartIndexs, &m.durationIndexs, &m.uuidIndexs} - for _, o := range objs { - o.Load("1,2,3") - } - if !m.validateIndexses() { - t.Error("Error checking length") - } -} - +/* func TestIndexLengthDifferent(t *testing.T) { m := new(Mediator) objs := []*mediatorFieldIdxs{&m.directionIndexs, &m.torIndexs, &m.tenantIndexs, &m.subjectIndexs, @@ -63,3 +28,4 @@ func TestLoad(t *testing.T) { t.Errorf("Expected %v was %v", 3, len(m.directionIndexs)) } } +*/ diff --git a/rater/storage_gosexy.go b/rater/storage_gosexy.go index cd6ae70b9..c4678eae6 100644 --- a/rater/storage_gosexy.go +++ b/rater/storage_gosexy.go @@ -25,6 +25,7 @@ import ( "strconv" "strings" "time" + "github.com/cgrates/cgrates/utils" ) type GosexyStorage struct { @@ -217,10 +218,10 @@ func (rs *GosexyStorage) LogError(uuid, source, errstr string) (err error) { return } -func (rs *GosexyStorage) SetCdr(CDR) error { +func (rs *GosexyStorage) SetCdr(utils.CDR) error { return nil } -func (rs *GosexyStorage) SetRatedCdr(CDR, *CallCost) error { +func (rs *GosexyStorage) SetRatedCdr(utils.CDR, *CallCost) error { return nil } diff --git a/rater/storage_interface.go b/rater/storage_interface.go index 31484ebc4..54b831c2b 100644 --- a/rater/storage_interface.go +++ b/rater/storage_interface.go @@ -25,6 +25,7 @@ import ( "github.com/ugorji/go/codec" "github.com/vmihailenco/msgpack" "labix.org/v2/mgo/bson" + "github.com/cgrates/cgrates/utils" ) const ( @@ -63,8 +64,8 @@ type DataStorage interface { GetActionTimings(string) (ActionTimings, error) SetActionTimings(string, ActionTimings) error GetAllActionTimings() (map[string]ActionTimings, error) - SetCdr(CDR) error - SetRatedCdr(CDR, *CallCost) error + SetCdr(utils.CDR) error + SetRatedCdr(utils.CDR, *CallCost) error //GetAllActionTimingsLogs() (map[string]ActionsTimings, error) LogCallCost(uuid, source string, cc *CallCost) error LogError(uuid, source, errstr string) error diff --git a/rater/storage_map.go b/rater/storage_map.go index 9c6c911c6..0741e4010 100644 --- a/rater/storage_map.go +++ b/rater/storage_map.go @@ -23,6 +23,7 @@ import ( "fmt" "strings" "time" + "github.com/cgrates/cgrates/utils" ) type MapStorage struct { @@ -183,10 +184,10 @@ func (ms *MapStorage) LogError(uuid, source, errstr string) (err error) { return nil } -func (ms *MapStorage) SetCdr(CDR) error { +func (ms *MapStorage) SetCdr(utils.CDR) error { return nil } -func (ms *MapStorage) SetRatedCdr(CDR, *CallCost) error { +func (ms *MapStorage) SetRatedCdr(utils.CDR, *CallCost) error { return nil } diff --git a/rater/storage_mongo.go b/rater/storage_mongo.go index 57c499088..fe3e97b96 100644 --- a/rater/storage_mongo.go +++ b/rater/storage_mongo.go @@ -24,6 +24,7 @@ import ( "labix.org/v2/mgo/bson" "log" "time" + "github.com/cgrates/cgrates/utils" ) type MongoStorage struct { @@ -208,10 +209,10 @@ func (ms *MongoStorage) LogError(uuid, source, errstr string) (err error) { return ms.db.C("errlog").Insert(&LogErrEntry{uuid, errstr, source}) } -func (ms *MongoStorage) SetCdr(CDR) error { +func (ms *MongoStorage) SetCdr(utils.CDR) error { return nil } -func (ms *MongoStorage) SetRatedCdr(CDR, *CallCost) error { +func (ms *MongoStorage) SetRatedCdr(utils.CDR, *CallCost) error { return nil } diff --git a/rater/storage_mysql.go b/rater/storage_mysql.go index ec001cc60..130100e81 100644 --- a/rater/storage_mysql.go +++ b/rater/storage_mysql.go @@ -23,6 +23,7 @@ import ( "encoding/json" "fmt" _ "github.com/go-sql-driver/mysql" + "github.com/cgrates/cgrates/utils" ) type MySQLStorage struct { @@ -122,8 +123,8 @@ func (mys *MySQLStorage) LogActionTiming(source string, at *ActionTiming, as Act } func (mys *MySQLStorage) LogError(uuid, source, errstr string) (err error) { return } -func (mys *MySQLStorage) SetCdr(cdr CDR) (err error) { - startTime, err := cdr.GetStartTime() +func (mys *MySQLStorage) SetCdr(cdr utils.CDR) (err error) { + startTime, err := cdr.GetAnswerTime() if err != nil { return err } @@ -146,7 +147,7 @@ func (mys *MySQLStorage) SetCdr(cdr CDR) (err error) { } _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('NULL','%s', '%s')", cdr.GetCgrId(), - cdr.GetExtraParameters(), + cdr.GetExtraFields(), )) if err != nil { Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) @@ -155,7 +156,7 @@ func (mys *MySQLStorage) SetCdr(cdr CDR) (err error) { return } -func (mys *MySQLStorage) SetRatedCdr(cdr CDR, cc *CallCost) (err error) { +func (mys *MySQLStorage) SetRatedCdr(cdr utils.CDR, cc *CallCost) (err error) { _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO rated_cdrs VALUES ('%s', '%s', '%s', '%s')", cdr.GetCgrId(), cc.Cost, diff --git a/rater/storage_postgres.go b/rater/storage_postgres.go index 85023788d..291c2b1d2 100644 --- a/rater/storage_postgres.go +++ b/rater/storage_postgres.go @@ -22,6 +22,7 @@ import ( "database/sql" "encoding/json" "fmt" + "github.com/cgrates/cgrates/utils" _ "github.com/bmizerany/pq" ) @@ -122,8 +123,8 @@ func (psl *PostgresStorage) LogActionTiming(source string, at *ActionTiming, as } func (psl *PostgresStorage) LogError(uuid, source, errstr string) (err error) { return } -func (psl *PostgresStorage) SetCdr(cdr CDR) (err error) { - startTime, err := cdr.GetStartTime() +func (psl *PostgresStorage) SetCdr(cdr utils.CDR) (err error) { + startTime, err := cdr.GetAnswerTime() if err != nil { return err } @@ -146,7 +147,7 @@ func (psl *PostgresStorage) SetCdr(cdr CDR) (err error) { } _, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('%s', '%s')", cdr.GetCgrId(), - cdr.GetExtraParameters(), + cdr.GetExtraFields(), )) if err != nil { Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) @@ -155,7 +156,7 @@ func (psl *PostgresStorage) SetCdr(cdr CDR) (err error) { return } -func (psl *PostgresStorage) SetRatedCdr(cdr CDR, cc *CallCost) (err error) { +func (psl *PostgresStorage) SetRatedCdr(cdr utils.CDR, cc *CallCost) (err error) { if err != nil { return err } diff --git a/rater/storage_redigo.go b/rater/storage_redigo.go index 5581c5a71..935a16e61 100644 --- a/rater/storage_redigo.go +++ b/rater/storage_redigo.go @@ -23,6 +23,7 @@ import ( "github.com/garyburd/redigo/redis" //"log" "time" + "github.com/cgrates/cgrates/utils" ) type RedigoStorage struct { @@ -215,10 +216,10 @@ func (rs *RedigoStorage) LogError(uuid, source, errstr string) (err error) { return } -func (rs *RedigoStorage) SetCdr(CDR) error { +func (rs *RedigoStorage) SetCdr(utils.CDR) error { return nil } -func (rs *RedigoStorage) SetRatedCdr(CDR, *CallCost) error { +func (rs *RedigoStorage) SetRatedCdr(utils.CDR, *CallCost) error { return nil } diff --git a/rater/storage_redis.go b/rater/storage_redis.go index c9778d354..d71753b62 100644 --- a/rater/storage_redis.go +++ b/rater/storage_redis.go @@ -23,6 +23,7 @@ import ( "github.com/fzzy/radix/redis" //"log" "time" + "github.com/cgrates/cgrates/utils" ) type RedisStorage struct { @@ -242,10 +243,10 @@ func (rs *RedisStorage) LogError(uuid, source, errstr string) (err error) { return } -func (rs *RedisStorage) SetCdr(CDR) error { +func (rs *RedisStorage) SetCdr(utils.CDR) error { return nil } -func (rs *RedisStorage) SetRatedCdr(CDR, *CallCost) error { +func (rs *RedisStorage) SetRatedCdr(utils.CDR, *CallCost) error { return nil } diff --git a/sessionmanager/fsevent.go b/sessionmanager/fsevent.go index 0e36fefc5..b40807f06 100644 --- a/sessionmanager/fsevent.go +++ b/sessionmanager/fsevent.go @@ -50,8 +50,6 @@ const ( ANSWER = "CHANNEL_ANSWER" HANGUP = "CHANNEL_HANGUP_COMPLETE" PARK = "CHANNEL_PARK" - REQTYPE_PREPAID = "prepaid" - REQTYPE_POSTPAID = "postpaid" AUTH_OK = "+AUTH_OK" DISCONNECT = "+SWITCH DISCONNECT" INSUFFICIENT_FUNDS = "-INSUFFICIENT_FUNDS" diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 48e1e9960..ec849c4c3 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -24,6 +24,7 @@ import ( "fmt" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/rater" + "github.com/cgrates/cgrates/utils" "github.com/cgrates/fsock" "log/syslog" "net" @@ -145,7 +146,7 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) { startTime = time.Now() } // if there is no account configured leave the call alone - if strings.TrimSpace(ev.GetReqType()) != REQTYPE_PREPAID { + if strings.TrimSpace(ev.GetReqType()) != utils.PREPAID { return } if ev.MissingParameter() { @@ -194,7 +195,7 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) { return } defer s.Close() // Stop loop and save the costs deducted so far to database - if ev.GetReqType() == REQTYPE_POSTPAID { + if ev.GetReqType() == utils.POSTPAID { startTime, err := ev.GetStartTime(START_TIME) if err != nil { rater.Logger.Crit("Error parsing postpaid call start time from event") diff --git a/sessionmanager/session.go b/sessionmanager/session.go index b85d617c7..aa25f0c10 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -21,8 +21,8 @@ package sessionmanager import ( "fmt" "github.com/cgrates/cgrates/rater" + "github.com/cgrates/cgrates/utils" "github.com/cgrates/fsock" - "strings" "time" ) @@ -38,8 +38,8 @@ type Session struct { // Creates a new session and starts the debit loop func NewSession(ev Event, sm SessionManager) (s *Session) { - // Ignore calls which have nothing to do with CGRateS - if strings.TrimSpace(ev.GetReqType()) == "" { + // SesionManager only handles prepaid and postpaid calls + if ev.GetReqType() != utils.PREPAID && ev.GetReqType() != utils.POSTPAID { return } // Make sure cgr_type is enforced even if not set by FreeSWITCH @@ -68,9 +68,9 @@ func NewSession(ev Event, sm SessionManager) (s *Session) { sm.DisconnectSession(s, MISSING_PARAMETER) } else { switch ev.GetReqType() { - case REQTYPE_PREPAID: + case utils.PREPAID: go s.startDebitLoop() - case REQTYPE_POSTPAID: + case utils.POSTPAID: // do not loop, make only one debit at hangup } } diff --git a/sessionmanager/session_test.go b/sessionmanager/session_test.go index 00ffae9ae..6fac76144 100644 --- a/sessionmanager/session_test.go +++ b/sessionmanager/session_test.go @@ -21,7 +21,6 @@ package sessionmanager import ( "github.com/cgrates/cgrates/config" "testing" - // "time" ) var ( @@ -55,61 +54,7 @@ var ( ### Test data, not for production usage [global] -datadb_type = test # -datadb_host = test # The host to connect to. Values that start with / are for UNIX domain sockets. -datadb_port = test # The port to bind to. -datadb_name = test # The name of the database to connect to. -datadb_user = test # The user to sign in as. -datadb_passwd = test # The user's password.root -logdb_type = test # -logdb_host = test # The host to connect to. Values that start with / are for UNIX domain sockets. -logdb_port = test # The port to bind to. -logdb_name = test # The name of the database to connect to. -logdb_user = test # The user to sign in as. -logdb_passwd = test # The user's password.root - -[balancer] -enabled = true # Start balancer server -listen = test # Balancer listen interface -rpc_encoding = test # use JSON for RPC encoding - -[rater] -enabled = true -listen = test # listening address host:port, internal for internal communication only -balancer = test # if defined it will register to balancer as worker -rpc_encoding = test # use JSON for RPC encoding - -[mediator] -enabled = true -cdr_in_dir = test # Freeswitch Master CSV CDR path. -cdr_out_dir = test -rater = test #address where to access rater. Can be internal, direct rater address or the address of a balancer -rpc_encoding = test # use JSON for RPC encoding -skipdb = true -pseudoprepaid = true - -[scheduler] -enabled = true - -[session_manager] -enabled = true -switch_type = test -rater = test #address where to access rater. Can be internal, direct rater address or the address of a balancer -debit_interval = 11 -rpc_encoding = test # use JSON for RPC encoding - -[freeswitch] -server = test # freeswitch address host:port -passwd = test # freeswitch address host:port -direction_index = test -tor_index = test -tenant_index = test -subject_index = test -account_index = test -destination_index = test -time_start_index = test -duration_index = test -uuid_index = test +default_reqtype= `) ) diff --git a/rater/cdr.go b/utils/cdr.go similarity index 85% rename from rater/cdr.go rename to utils/cdr.go index a25b66e3c..b8dbff990 100644 --- a/rater/cdr.go +++ b/utils/cdr.go @@ -16,14 +16,13 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -package rater +package utils import ( "time" ) type CDR interface { - New([]byte) (CDR, error) GetCgrId() string GetAccId() string GetCdrHost() string @@ -32,14 +31,11 @@ type CDR interface { GetSubject() string GetAccount() string GetDestination() string - GetCallDestNr() string GetTOR() string - GetUUID() string GetTenant() string GetReqType() string - GetStartTime() (time.Time, error) - GetEndTime() (time.Time, error) + GetAnswerTime() (time.Time, error) GetDuration() int64 GetFallbackSubj() string - GetExtraParameters() string + GetExtraFields() map[string]string //Stores extra CDR Fields } diff --git a/utils/consts.go b/utils/consts.go new file mode 100644 index 000000000..307c1fee4 --- /dev/null +++ b/utils/consts.go @@ -0,0 +1,13 @@ +package utils + +const ( + LOCALHOST = "127.0.0.1" + FSCDR_FILE_CSV = "freeswitch_file_csv" + FSCDR_HTTP_JSON = "freeswitch_http_json" + NOT_IMPLEMENTED = "not implemented" + PREPAID = "prepaid" + POSTPAID = "postpaid" + PSEUDOPREPAID = "pseudoprepaid" + RATED = "rated" +) +