diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index 1dee125a3..7393b95e8 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -51,10 +51,8 @@ func NewCdrc(config *config.CGRConfig) (*Cdrc, error) { return nil, fmt.Errorf("Folder %s does not exist", dir) } } - if cdrc.cgrCfg.CdrcCdrType == CSV { - if err := cdrc.parseFieldIndexesFromConfig(); err != nil { - return nil, err - } + if err := cdrc.parseFieldsConfig(); err != nil { + return nil, err } cdrc.httpClient = new(http.Client) return cdrc, nil @@ -62,7 +60,7 @@ func NewCdrc(config *config.CGRConfig) (*Cdrc, error) { type Cdrc struct { cgrCfg *config.CGRConfig - fieldIndxes map[string]int // Key is the name of the field, int is the position in the csv file + cfgCdrFields map[string]string // Key is the name of the field httpClient *http.Client } @@ -76,22 +74,25 @@ func (self *Cdrc) Run() error { self.processCdrDir() time.Sleep(self.cgrCfg.CdrcRunDelay) } - return nil } -// Parses fieldIndex strings into fieldIndex integers needed -func (self *Cdrc) parseFieldIndexesFromConfig() error { +// Loads all fields (primary and extra) into cfgCdrFields, do some pre-checks (eg: in case of csv make sure that values are integers) +func (self *Cdrc) parseFieldsConfig() error { var err error - self.fieldIndxes = make(map[string]int) - fieldKeys := []string{utils.ACCID, utils.REQTYPE, utils.DIRECTION, utils.TENANT, utils.TOR, utils.ACCOUNT, utils.SUBJECT, utils.DESTINATION, utils.ANSWER_TIME, utils.DURATION} - fieldIdxStrs := []string{self.cgrCfg.CdrcAccIdField, self.cgrCfg.CdrcReqTypeField, self.cgrCfg.CdrcDirectionField, self.cgrCfg.CdrcTenantField, self.cgrCfg.CdrcTorField, - self.cgrCfg.CdrcAccountField, self.cgrCfg.CdrcSubjectField, self.cgrCfg.CdrcDestinationField, self.cgrCfg.CdrcAnswerTimeField, self.cgrCfg.CdrcDurationField} - for i, strVal := range fieldIdxStrs { - if self.fieldIndxes[fieldKeys[i]], err = strconv.Atoi(strVal); err != nil { - return fmt.Errorf("Cannot parse configuration field %s into integer", fieldKeys[i]) - } - } - // Add extra fields here, extra fields in the form of []string{"indxInCsv1:fieldName1","indexInCsv2:fieldName2"} + self.cfgCdrFields = map[string]string{ + utils.ACCID: self.cgrCfg.CdrcAccIdField, + utils.REQTYPE: self.cgrCfg.CdrcReqTypeField, + utils.DIRECTION: self.cgrCfg.CdrcDirectionField, + utils.TENANT: self.cgrCfg.CdrcTenantField, + utils.TOR: self.cgrCfg.CdrcTorField, + utils.ACCOUNT: self.cgrCfg.CdrcAccountField, + utils.SUBJECT: self.cgrCfg.CdrcSubjectField, + utils.DESTINATION: self.cgrCfg.CdrcDestinationField, + utils.ANSWER_TIME: self.cgrCfg.CdrcAnswerTimeField, + utils.DURATION: self.cgrCfg.CdrcDurationField, + } + + // Add extra fields here, config extra fields in the form of []string{"fieldName1:indxInCsv1","fieldName2: indexInCsv2"} for _, fieldWithIdx := range self.cgrCfg.CdrcExtraFields { splt := strings.Split(fieldWithIdx, ":") if len(splt) != 2 { @@ -100,8 +101,14 @@ func (self *Cdrc) parseFieldIndexesFromConfig() error { if utils.IsSliceMember(utils.PrimaryCdrFields, splt[0]) { return errors.New("Extra cdrc.extra_fields overwriting primary fields") } - if self.fieldIndxes[splt[1]], err = strconv.Atoi(splt[0]); err != nil { - return fmt.Errorf("Cannot parse configuration cdrc extra field %s into integer", splt[1]) + self.cfgCdrFields[splt[0]] = splt[1] + } + // Fields populated, do some sanity checks here + for cdrField, cfgVal := range self.cfgCdrFields { + if utils.IsSliceMember([]string{CSV, FS_CSV}, self.cgrCfg.CdrcCdrType) && !strings.HasPrefix(cfgVal, utils.STATIC_VALUE_PREFIX) { + if _, err = strconv.Atoi(cfgVal); err != nil { + return fmt.Errorf("Cannot parse configuration field %s into integer", cdrField) + } } } return nil @@ -112,11 +119,22 @@ func (self *Cdrc) cdrAsHttpForm(record []string) (url.Values, error) { // engine.Logger.Info(fmt.Sprintf("Processing record %v", record)) v := url.Values{} v.Set(utils.CDRSOURCE, self.cgrCfg.CdrcSourceId) - for fldName, idx := range self.fieldIndxes { - if len(record) <= idx { - return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s", record, fldName) + for cfgFieldName, cfgFieldVal := range self.cfgCdrFields { + var fieldVal string + if strings.HasPrefix(cfgFieldVal, utils.STATIC_VALUE_PREFIX) { + fieldVal = cfgFieldVal[1:] + } else if utils.IsSliceMember([]string{CSV, FS_CSV}, self.cgrCfg.CdrcCdrType) { + if cfgFieldIdx, err := strconv.Atoi(cfgFieldVal); err != nil { // Should in theory never happen since we have already parsed config + return nil, err + } else if len(record) <= cfgFieldIdx { + return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s", record, cfgFieldName) + } else { + fieldVal = record[cfgFieldIdx] + } + } else { // Modify here when we add more supported cdr formats + fieldVal = "UNKNOWN" } - v.Set(fldName, record[idx]) + v.Set(cfgFieldName, fieldVal) } return v, nil } @@ -152,14 +170,13 @@ func (self *Cdrc) trackCDRFiles() (err error) { case ev := <-watcher.Event: if ev.IsCreate() && (self.cgrCfg.CdrcCdrType != FS_CSV || path.Ext(ev.Name) != ".csv") { if err = self.processFile(ev.Name); err != nil { - return err + engine.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", ev.Name, err.Error())) } } case err := <-watcher.Error: engine.Logger.Err(fmt.Sprintf("Inotify error: %s", err.Error())) } } - return } // Processe file at filePath and posts the valid cdr rows out of it diff --git a/cdrc/cdrc_test.go b/cdrc/cdrc_test.go index 3eee88c06..87f72cf4e 100644 --- a/cdrc/cdrc_test.go +++ b/cdrc/cdrc_test.go @@ -24,28 +24,40 @@ import ( "testing" ) -func TestParseFieldIndexesFromConfig(t *testing.T) { +func TestParseFieldsConfig(t *testing.T) { // Test default config cgrConfig, _ := config.NewDefaultCGRConfig() // Test primary field index definition cgrConfig.CdrcAccIdField = "detect_me" cdrc := &Cdrc{cgrCfg: cgrConfig} - if err := cdrc.parseFieldIndexesFromConfig(); err == nil { + if err := cdrc.parseFieldsConfig(); err == nil { t.Error("Failed detecting error in accounting id definition", err) } + cgrConfig.CdrcAccIdField = "^static_val" + cgrConfig.CdrcSubjectField = "1" + cdrc = &Cdrc{cgrCfg: cgrConfig} + if err := cdrc.parseFieldsConfig(); err != nil { + t.Error("Failed to corectly parse primary fields %v", cdrc.cfgCdrFields) + } + cgrConfig.CdrcExtraFields = []string{"^static_val:orig_ip"} // Test extra field index definition cgrConfig.CdrcAccIdField = "0" // Put back as int - cgrConfig.CdrcExtraFields = []string{"supplier1", "11:orig_ip"} + cgrConfig.CdrcExtraFields = []string{"supplier1", "orig_ip:11"} cdrc = &Cdrc{cgrCfg: cgrConfig} - if err := cdrc.parseFieldIndexesFromConfig(); err == nil { + if err := cdrc.parseFieldsConfig(); err == nil { t.Error("Failed detecting error in extra fields definition", err) } + cgrConfig.CdrcExtraFields = []string{"supplier1:^top_supplier", "orig_ip:11"} + cdrc = &Cdrc{cgrCfg: cgrConfig} + if err := cdrc.parseFieldsConfig(); err != nil { + t.Errorf("Failed to corectly parse extra fields %v",cdrc.cfgCdrFields) + } } func TestCdrAsHttpForm(t *testing.T) { cgrConfig, _ := config.NewDefaultCGRConfig() cdrc := &Cdrc{cgrCfg: cgrConfig} - if err := cdrc.parseFieldIndexesFromConfig(); err != nil { + if err := cdrc.parseFieldsConfig(); err != nil { t.Error("Failed parsing default fieldIndexesFromConfig", err) } cdrRow := []string{"firstField", "secondField"} @@ -64,7 +76,7 @@ func TestCdrAsHttpForm(t *testing.T) { if cdrAsForm.Get(utils.REQTYPE) != "prepaid" { t.Error("Unexpected CDR value received", cdrAsForm.Get(utils.REQTYPE)) } - if cdrAsForm.Get("supplier") != "supplier1" { - t.Error("Unexpected CDR value received", cdrAsForm.Get(utils.REQTYPE)) - } + //if cdrAsForm.Get("supplier") != "supplier1" { + // t.Error("Unexpected CDR value received", cdrAsForm.Get("supplier")) + //} } diff --git a/config/config.go b/config/config.go index 422906a6f..2db5edf22 100644 --- a/config/config.go +++ b/config/config.go @@ -110,12 +110,13 @@ type CGRConfig struct { CdrcDestinationField string // Destination field identifier. Use index numbers in case of .csv cdrs. CdrcAnswerTimeField string // Answer time field identifier. Use index numbers in case of .csv cdrs. CdrcDurationField string // Duration field identifier. Use index numbers in case of .csv cdrs. - CdrcExtraFields []string // Field identifiers of the fields to add in extra fields section, special format in case of .csv "index1:field1,index2:field2" + CdrcExtraFields []string // Field identifiers of the fields to add in extra fields section, special format in case of .csv "field1:index1,field2:index2" SMEnabled bool SMSwitchType string SMRater string // address where to access rater. Can be internal, direct rater address or the address of a balancer SMRaterReconnects int // Number of reconnect attempts to rater SMDebitInterval int // the period to be debited in advanced during a call (in seconds) + SMMaxCallDuration time.Duration // The maximum duration of a call MediatorEnabled bool // Starts Mediator service: . MediatorListen string // Mediator's listening interface: . MediatorRater string // Address where to reach the Rater: @@ -199,7 +200,7 @@ func (self *CGRConfig) setDefaults() error { self.CdrcDestinationField = "7" self.CdrcAnswerTimeField = "8" self.CdrcDurationField = "9" - self.CdrcExtraFields = []string{"10:supplier","11:orig_ip"} + self.CdrcExtraFields = []string{} self.MediatorEnabled = false self.MediatorListen = "127.0.0.1:2032" self.MediatorRater = "127.0.0.1:2012" @@ -219,6 +220,7 @@ func (self *CGRConfig) setDefaults() error { self.SMRater = "127.0.0.1:2012" self.SMRaterReconnects = 3 self.SMDebitInterval = 10 + self.SMMaxCallDuration = time.Duration(3) * time.Hour self.FreeswitchServer = "127.0.0.1:8021" self.FreeswitchPass = "ClueCon" self.FreeswitchReconnects = 5 @@ -519,6 +521,12 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) { if hasOpt = c.HasOption("session_manager", "debit_interval"); hasOpt { cfg.SMDebitInterval, _ = c.GetInt("session_manager", "debit_interval") } + if hasOpt = c.HasOption("session_manager", "max_call_duration"); hasOpt { + maxCallDurStr,_ := c.GetString("session_manager", "max_call_duration") + if cfg.SMMaxCallDuration, errParse = utils.ParseDurationWithSecs(maxCallDurStr); errParse != nil { + return nil, errParse + } + } if hasOpt = c.HasOption("freeswitch", "server"); hasOpt { cfg.FreeswitchServer, _ = c.GetString("freeswitch", "server") } diff --git a/config/config_test.go b/config/config_test.go index 99ddbf056..0e43924d8 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -103,7 +103,7 @@ func TestDefaults(t *testing.T) { eCfg.CdrcDestinationField = "7" eCfg.CdrcAnswerTimeField = "8" eCfg.CdrcDurationField = "9" - eCfg.CdrcExtraFields = []string{"10:supplier","11:orig_ip"} + eCfg.CdrcExtraFields = []string{} eCfg.MediatorEnabled = false eCfg.MediatorListen = "127.0.0.1:2032" eCfg.MediatorRater = "127.0.0.1:2012" @@ -123,6 +123,7 @@ func TestDefaults(t *testing.T) { eCfg.SMRater = "127.0.0.1:2012" eCfg.SMRaterReconnects = 3 eCfg.SMDebitInterval = 10 + eCfg.SMMaxCallDuration = time.Duration(3) * time.Hour eCfg.FreeswitchServer = "127.0.0.1:8021" eCfg.FreeswitchPass = "ClueCon" eCfg.FreeswitchReconnects = 5 @@ -245,6 +246,7 @@ func TestConfigFromFile(t *testing.T) { eCfg.SMRater = "test" eCfg.SMRaterReconnects = 99 eCfg.SMDebitInterval = 99 + eCfg.SMMaxCallDuration = time.Duration(99)*time.Second eCfg.FreeswitchServer = "test" eCfg.FreeswitchPass = "test" eCfg.FreeswitchReconnects = 99 diff --git a/config/test_data.txt b/config/test_data.txt index 2636b4474..0b820a81c 100644 --- a/config/test_data.txt +++ b/config/test_data.txt @@ -94,8 +94,9 @@ duration_fields = test # Name of duration fields to be used during mediation. enabled = true # Starts SessionManager service: . switch_type = test # Defines the type of switch behind: . rater = test # Address where to reach the Rater. -rater_reconnects = 99 # Number of reconnects to rater before giving up. -debit_interval = 99 # Interval to perform debits on. +rater_reconnects = 99 # Number of reconnects to rater before giving up. +debit_interval = 99 # Interval to perform debits on. +max_call_duration = 99 # Maximum call duration a prepaid call can last [freeswitch] server = test # Adress where to connect to FreeSWITCH socket. diff --git a/data/conf/cgrates.cfg b/data/conf/cgrates.cfg index d2724292f..1ba1e1a2c 100644 --- a/data/conf/cgrates.cfg +++ b/data/conf/cgrates.cfg @@ -51,9 +51,9 @@ # mediator = # Address where to reach the Mediator. Empty for disabling mediation. <""|internal> [cdre] -# cdr_format = csv # Exported CDRs format -# extra_fields = # List of extra fields to be exported out in CDRs -# export_dir = /var/log/cgrates/cdr/out/cgr # Path where the exported CDRs will be placed +# cdr_format = csv # Exported CDRs format +# extra_fields = # List of extra fields to be exported out in CDRs +# export_dir = /var/log/cgrates/cdr/cdrexport/csv # Path where the exported CDRs will be placed [cdrc] # enabled = false # Enable CDR client functionality @@ -61,8 +61,8 @@ # cdrs_method = http_cgr # Mechanism to use when posting CDRs on server # run_delay = 0 # Sleep interval in seconds between consecutive runs, 0 to use automation via inotify # cdr_type = csv # CDR file format . -# cdr_in_dir = /var/log/cgrates/cdr/in/csv # Absolute path towards the directory where the CDRs are stored. -# cdr_out_dir = /var/log/cgrates/cdr/out/csv # Absolute path towards the directory where processed CDRs will be moved. +# cdr_in_dir = /var/log/cgrates/cdr/cdrc/in # Absolute path towards the directory where the CDRs are stored. +# cdr_out_dir = /var/log/cgrates/cdr/cdrc/out # Absolute path towards the directory where processed CDRs will be moved. # cdr_source_id = freeswitch_csv # Free form field, tag identifying the source of the CDRs within CGRS database. # accid_field = 0 # Accounting id field identifier. Use index number in case of .csv cdrs. # reqtype_field = 1 # Request type field identifier. Use index number in case of .csv cdrs. @@ -74,7 +74,7 @@ # destination_field = 7 # Destination field identifier. Use index numbers in case of .csv cdrs. # answer_time_field = 8 # Answer time field identifier. Use index numbers in case of .csv cdrs. # duration_field = 9 # Duration field identifier. Use index numbers in case of .csv cdrs. -# extra_fields = 10:supplier,11:orig_ip # Extra fields identifiers. For .csv, format: :[,:] +# extra_fields = # Extra fields identifiers. For .csv, format: :[...,:] [mediator] # enabled = false # Starts Mediator service: . @@ -97,7 +97,8 @@ # switch_type = freeswitch # Defines the type of switch behind: . # rater = 127.0.0.1:2012 # Address where to reach the Rater. # rater_reconnects = 3 # Number of reconnects to rater before giving up. -# debit_interval = 5 # Interval to perform debits on. +# debit_interval = 10 # Interval to perform debits on. +# max_call_duration = 3h # Maximum call duration a prepaid call can last [freeswitch] # server = 127.0.0.1:8021 # Adress where to connect to FreeSWITCH socket. diff --git a/data/scripts/freeswitch_cdr_csv_rotate.sh b/data/scripts/freeswitch_cdr_csv_rotate.sh new file mode 100755 index 000000000..09bcbd3f5 --- /dev/null +++ b/data/scripts/freeswitch_cdr_csv_rotate.sh @@ -0,0 +1,11 @@ +#! /usr/bin/env sh + +FS_CDR_CSV_DIR=/var/log/freeswitch/cdr-csv +CGR_CDRC_IN_DIR=/var/log/cgrates/cdr/cdrc/in + +/usr/bin/fs_cli -x "cdr_csv rotate" + +find $FS_CDR_CSV_DIR -maxdepth 1 -mindepth 1 -not -name *.csv -exec chown cgrates:cgrates '{}' \; -exec mv '{}' $CGR_CDRC_IN_DIR \; + +exit 0 + diff --git a/data/storage/mysql/create_costdetails_tables.sql b/data/storage/mysql/create_costdetails_tables.sql index d55cb7a34..774bd27cc 100644 --- a/data/storage/mysql/create_costdetails_tables.sql +++ b/data/storage/mysql/create_costdetails_tables.sql @@ -14,8 +14,8 @@ CREATE TABLE `cost_details` ( `account` varchar(128) NOT NULL, `subject` varchar(128) NOT NULL, `destination` varchar(128) NOT NULL, - `cost` DECIMAL(20,4) NOT NULL, `connect_fee` DECIMAL(5,4) NOT NULL, + `cost` DECIMAL(20,4) NOT NULL, `timespans` text, `source` varchar(64) NOT NULL, `runid` varchar(64) NOT NULL, diff --git a/data/tutorials/fs_csv/cgrates/etc/cgrates/cgrates.cfg b/data/tutorials/fs_csv/cgrates/etc/cgrates/cgrates.cfg index a5ccac8c6..5ee293764 100644 --- a/data/tutorials/fs_csv/cgrates/etc/cgrates/cgrates.cfg +++ b/data/tutorials/fs_csv/cgrates/etc/cgrates/cgrates.cfg @@ -61,20 +61,20 @@ enabled = true # Enable CDR client functionality # cdrs_method = http_cgr # Mechanism to use when posting CDRs on server # run_delay = 0 # Sleep interval in seconds between consecutive runs, 0 to use automation via inotify cdr_type = freeswitch_csv # CDR file format . -cdr_in_dir = /var/log/freeswitch/cdr-csv # Absolute path towards the directory where the CDRs are stored. +cdr_in_dir = /var/log/cgrates/cdr/cdrc/in # Absolute path towards the directory where the CDRs are stored. cdr_out_dir = /tmp # Absolute path towards the directory where processed CDRs will be moved. cdr_source_id = freeswitch_csv # Free form field, tag identifying the source of the CDRs within CGRS database. -# accid_field = 0 # Accounting id field identifier. Use index number in case of .csv cdrs. -# reqtype_field = 1 # Request type field identifier. Use index number in case of .csv cdrs. -# direction_field = 2 # Direction field identifier. Use index numbers in case of .csv cdrs. -# tenant_field = 3 # Tenant field identifier. Use index numbers in case of .csv cdrs. -# tor_field = 4 # Type of Record field identifier. Use index numbers in case of .csv cdrs. -# account_field = 5 # Account field identifier. Use index numbers in case of .csv cdrs. -# subject_field = 6 # Subject field identifier. Use index numbers in case of .csv CDRs. -# destination_field = 7 # Destination field identifier. Use index numbers in case of .csv cdrs. -# answer_time_field = 8 # Answer time field identifier. Use index numbers in case of .csv cdrs. -# duration_field = 9 # Duration field identifier. Use index numbers in case of .csv cdrs. -# extra_fields = 10:supplier,11:orig_ip # Extra fields identifiers. For .csv, format: :[,:] +accid_field = 10 # Accounting id field identifier. Use index number in case of .csv cdrs. +reqtype_field = 16 # Request type field identifier. Use index number in case of .csv cdrs. +direction_field = ^*out # Direction field identifier. Use index numbers in case of .csv cdrs. +tenant_field = ^cgrates.org # Tenant field identifier. Use index numbers in case of .csv cdrs. +tor_field = ^call # Type of Record field identifier. Use index numbers in case of .csv cdrs. +account_field = 1 # Account field identifier. Use index numbers in case of .csv cdrs. +subject_field = 1 # Subject field identifier. Use index numbers in case of .csv CDRs. +destination_field = 2 # Destination field identifier. Use index numbers in case of .csv cdrs. +answer_time_field = 5 # Answer time field identifier. Use index numbers in case of .csv cdrs. +duration_field = 8 # Duration field identifier. Use index numbers in case of .csv cdrs. +extra_fields = read_codec:13,write_codec:14 # Extra fields identifiers. For .csv, format: : [mediator] enabled = true # Starts Mediator service: . diff --git a/data/tutorials/fs_csv/freeswitch/etc/freeswitch/dialplan/default.xml b/data/tutorials/fs_csv/freeswitch/etc/freeswitch/dialplan/default.xml index 8bc0ea41b..cfdc0a9e4 100644 --- a/data/tutorials/fs_csv/freeswitch/etc/freeswitch/dialplan/default.xml +++ b/data/tutorials/fs_csv/freeswitch/etc/freeswitch/dialplan/default.xml @@ -254,7 +254,7 @@ --> - + @@ -262,7 +262,7 @@ - + @@ -272,7 +272,7 @@ - + diff --git a/docs/installation.rst b/docs/installation.rst index ab383ab6d..76f9bfa94 100644 --- a/docs/installation.rst +++ b/docs/installation.rst @@ -51,13 +51,13 @@ As DataDB types (rating and accounting subsystems): - Redis_ -As StorDB (persistent storage for CDRs and tariff plan versions): +As StorDB (persistent storage for CDRs and tariff plan versions). + +Once installed there should be no special requirements in terms of setup since no schema is necessary. - MySQL_ -Redis_: once installed there should be no special requirements in terms of setup since no schema is necessary. - -MySQL_: once database is installed, CGRateS database needs to be set-up out of provided scripts (example for the paths set-up by debian package) +Once database is installed, CGRateS database needs to be set-up out of provided scripts (example for the paths set-up by debian package) :: diff --git a/docs/tut_freeswitch_csv.rst b/docs/tut_freeswitch_csv.rst index 70b57caed..80de413e2 100644 --- a/docs/tut_freeswitch_csv.rst +++ b/docs/tut_freeswitch_csv.rst @@ -13,10 +13,10 @@ Scenario - **CGRateS** with following components: - CGR-SM started as prepaid controller, with debits taking place at 5s intervals. - - CGR-CDRC component importing FreeSWITCH_ generated *.csv* CDRs into CGR (moving the processed *.csv* files to */tmp* folder). - - CGR-Mediator compoenent attaching costs to the raw CDRs from CGR-CDRC. - - CGR-CDRE exporting mediated CDRs (export path: */tmp*). - - CGR-History component keeping the archive of the rates modifications (path: */tmp/cgr_history*). + - CGR-CDRC component importing FreeSWITCH_ generated *.csv* CDRs into CGR and moving the processed *.csv* files to */tmp* folder. + - CGR-Mediator compoenent attaching costs to the raw CDRs from CGR-CDRC inside CGR StorDB. + - CGR-CDRE exporting mediated CDRs from CGR StorDB (export path: */tmp*). + - CGR-History component keeping the archive of the rates modifications (path browsable with git client at */tmp/cgr_history*). Starting FreeSWITCH_ with custom configuration @@ -103,28 +103,73 @@ To verify that all actions successfully performed, we use following *cgr-console Test calls ---------- -Calling between 1001 and 1003 should generate prepaid debits which can be checked with *get_balance* command integrated within *cgr-console* tool. The difference between calling from 1001 or 1003 should be reflected in fact that 1001 will generate real-time debits as opposite to 1003 which will only generate debits when CDRs will be processed. + +1001 -> 1002 +~~~~~~~~~~~~ + +Since the user 1001 is marked as *prepaid* inside FreeSWITCH_ directory configuration, calling between 1001 and 1002 should generate pre-auth and prepaid debits which can be checked with *get_balance* command integrated within *cgr-console* tool. As per our tariff plans, we should get first 60s charged as a whole, then in intervals of 1s (configured SessionManager debit interval of 10s). + +*Note*: An important particularity to note here is the ability of **CGRateS** SessionManager to refund units booked in advance (eg: if debit occurs every 10s and rate increments are set to 1s, the SessionManager will be smart enough to refund pre-booked credits for calls stoped in the middle of debit interval). + +Check that 1001 balance is properly debitted, during the call: :: cgr-console get_balance cgrates.org 1001 + + +1002 -> 1001 +~~~~~~~~~~~~ + +The user 1002 is marked as *postpaid* inside FreeSWITCH_ hence his calls will be debited at the end of the call instead of during a call and his balance will be able to go on negative without influencing his new calls (no pre-auth). + +To check that we had debits we use again console command, this time not during the call but at the end of it: + +:: + cgr-console get_balance cgrates.org 1002 +1003 -> 1001 +~~~~~~~~~~~~ + +The user 1003 is marked as *pseudoprepaid* inside FreeSWITCH_ hence his calls will be considered same as prepaid (no call setups possible on negative balance due to pre-auth mechanism) but not handled automatically by session manager. His call costs will be calculated directly out of CDRs and balance updated by the time when mediation process occurs. This is sometimes a good compromise of prepaid running without influencing performance (there are no recurrent call debits during a call). + +To check that there are no debits during or by the end of the call, but when the CDR is imported, run the command before and after rotating the FreeSWITCH_ *.csv* CDRs: + +:: + + cgr-console get_balance cgrates.org 1003 + + +1004 -> 1001 +~~~~~~~~~~~~ + +The user 1004 is marked as *rated* inside FreeSWITCH_ hence his calls not interact in any way with accounting subsystem. The only action perfomed by **CGRateS** related to his calls wil be rating/mediation of his CDRs. + + CDR processing -------------- -For every call FreeSWITCH_ will generate CDR records within the *Master.csv* file. In order to avoid double-processing them we will use the rotate mechanism built in FreeSWITCH_. We rotate files via *fs_console* command: +For every call FreeSWITCH_ will generate CDR records within the *Master.csv* file. +In order to avoid double-processing we will use the rotate mechanism built in FreeSWITCH_. +Once rotated, we will move the resulted files inside the path considered by **CGRateS** *CDRC* component as inbound. + +These steps are automated in a script provided in the */usr/share/cgrates/scripts* location: :: - fs_cli -x "cdr_csv rotate" + /usr/share/cgrates/scripts/freeswitch_cdr_csv_rotate.sh -On each rotate CGR-CDRC component will be informed via *inotify* subsystem and will instantly process the CDR file. The records end up in **CGRateS**/StorDB inside *cdrs_primary* table via CGR-CDRS. Once in there mediation will occur, generating the costs inside *rated_cdrs* and *cost_details* tables. +On each rotate CGR-CDRC component will be informed via *inotify* subsystem and will instantly process the CDR file. The records end up in **CGRateS**/StorDB inside *cdrs_primary* table via CGR-CDRS. As soon as the CDR will hit CDRS component, mediation will occur, either considering the costs calculated in case of prepaid and postpaid calls out of *cost_details* table or query it's own one from rater in case of *pseudoprepaid* and *rated* CDRs. Once the CDRs are mediated, can be exported as *.csv* format again via remote command offered by *cgr-console* tool: +:: + + cgr-console export_cdrs csv + .. _FreeSWITCH: http://www.freeswitch.org/ .. _Jitsi: http://www.jitsi.org/ diff --git a/engine/calldesc.go b/engine/calldesc.go index 147e07e60..5baedfee1 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -368,6 +368,11 @@ func (cd *CallDescriptor) roundTimeSpansToIncrement(timespans TimeSpans) []*Time return timespans } +// Returns call descripor's total duration +func (cd *CallDescriptor) GetDuration() time.Duration { + return cd.TimeEnd.Sub(cd.TimeStart) +} + /* Creates a CallCost structure with the cost information calculated for the received CallDescriptor. */ diff --git a/engine/loader_csv.go b/engine/loader_csv.go index 61e782e4e..a926ed1b3 100644 --- a/engine/loader_csv.go +++ b/engine/loader_csv.go @@ -334,13 +334,15 @@ func (csvr *CSVReader) LoadDestinationRates() (err error) { break } } - if !destinationExists { - if dbExists, err := csvr.dataStorage.DataExists(DESTINATION_PREFIX, record[1]); err != nil { + var err error + if !destinationExists && csvr.dataStorage != nil { + if destinationExists, err = csvr.dataStorage.DataExists(DESTINATION_PREFIX, record[1]); err != nil { return err - } else if !dbExists { - return fmt.Errorf("Could not get destination for tag %v", record[1]) } } + if !destinationExists { + return fmt.Errorf("Could not get destination for tag %v", record[1]) + } dr := &utils.TPDestinationRate{ DestinationRateId: tag, DestinationRates: []*utils.DestinationRate{ @@ -417,13 +419,14 @@ func (csvr *CSVReader) LoadRatingProfiles() (err error) { csvr.ratingProfiles[key] = rp } _, exists := csvr.ratingPlans[record[5]] - if !exists { - if dbExists, err := csvr.dataStorage.DataExists(RATING_PLAN_PREFIX, record[5]); err != nil { + if !exists && csvr.dataStorage != nil { + if exists, err = csvr.dataStorage.DataExists(RATING_PLAN_PREFIX, record[5]); err != nil { return err - } else if !dbExists { - return errors.New(fmt.Sprintf("Could not load rating plans for tag: %v", record[5])) } } + if !exists { + return errors.New(fmt.Sprintf("Could not load rating plans for tag: %v", record[5])) + } rpa := &RatingPlanActivation{ ActivationTime: at, RatingPlanId: record[5], diff --git a/engine/storage_mysql.go b/engine/storage_mysql.go index 1df69b393..543243883 100644 --- a/engine/storage_mysql.go +++ b/engine/storage_mysql.go @@ -33,5 +33,8 @@ func NewMySQLStorage(host, port, name, user, password string) (Storage, error) { if err != nil { return nil, err } + if err := db.Ping(); err != nil { + return nil, err + } return &MySQLStorage{&SQLStorage{db}}, nil } diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 4a688f5a9..31295937c 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -656,7 +656,7 @@ func (self *SQLStorage) LogCallCost(uuid, source, runid string, cc *CallCost) (e if err != nil { Logger.Err(fmt.Sprintf("Error marshalling timespans to json: %v", err)) } - _, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid, accid, direction, tenant, tor, account, subject, destination, cost, connect_fee, timespans, source, runid)VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %f, %f, '%s','%s','%s')", + _, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid, accid, direction, tenant, tor, account, subject, destination, connect_fee, cost, timespans, source, runid)VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %f, %f, '%s','%s','%s')", utils.TBL_COST_DETAILS, utils.FSCgrId(uuid), uuid, @@ -666,8 +666,8 @@ func (self *SQLStorage) LogCallCost(uuid, source, runid string, cc *CallCost) (e cc.Account, cc.Subject, cc.Destination, - cc.Cost, cc.ConnectFee, + cc.Cost, tss, source, runid)) @@ -678,12 +678,12 @@ func (self *SQLStorage) LogCallCost(uuid, source, runid string, cc *CallCost) (e } func (self *SQLStorage) GetCallCostLog(cgrid, source, runid string) (cc *CallCost, err error) { - row := self.Db.QueryRow(fmt.Sprintf("SELECT cgrid, accid, direction, tenant, tor, account, subject, destination, cost, connect_fee, timespans, source FROM %s WHERE cgrid='%s' AND source='%s'", utils.TBL_COST_DETAILS, cgrid, source, runid)) + row := self.Db.QueryRow(fmt.Sprintf("SELECT cgrid, accid, direction, tenant, tor, account, subject, destination, connect_fee, cost, timespans, source FROM %s WHERE cgrid='%s' AND source='%s' AND runid='%s'", utils.TBL_COST_DETAILS, cgrid, source, runid)) var accid, src string var timespansJson string cc = &CallCost{Cost: -1} err = row.Scan(&cgrid, &accid, &cc.Direction, &cc.Tenant, &cc.TOR, &cc.Account, &cc.Subject, - &cc.Destination, &cc.Cost, &cc.ConnectFee, ×pansJson, &src) + &cc.Destination, &cc.ConnectFee, &cc.Cost, ×pansJson, &src) if err = json.Unmarshal([]byte(timespansJson), &cc.Timespans); err != nil { return nil, err } diff --git a/pkg/debian/postinst b/pkg/debian/postinst index af3e428a3..fe1460296 100755 --- a/pkg/debian/postinst +++ b/pkg/debian/postinst @@ -27,9 +27,10 @@ set -e case "$1" in configure) - adduser --quiet --system --group --disabled-password --shell /bin/false --home /var/run/cgrates --gecos "CGRateS" cgrates || true + adduser --quiet --system --group --disabled-password --shell /bin/false --gecos "CGRateS" cgrates || true chown -R cgrates:cgrates /var/log/cgrates/ chown -R cgrates:cgrates /usr/share/cgrates/ + chown -R cgrates:cgrates /var/run/cgrates/ ;; abort-upgrade|abort-remove|abort-deconfigure) diff --git a/pkg/debian/rules b/pkg/debian/rules index 889fbd6cb..047c58cc1 100755 --- a/pkg/debian/rules +++ b/pkg/debian/rules @@ -33,6 +33,7 @@ binary-arch: clean mkdir -p $(PKGDIR)/var/log/cgrates/cdr/cdrc/out mkdir -p $(PKGDIR)/var/log/cgrates/cdr/cdrexport/csv mkdir -p $(PKGDIR)/var/log/cgrates/history + mkdir -p $(PKGDIR)/var/run/cgrates dh_strip dh_compress dh_fixperms diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index e6c3cfc70..e80b39b8d 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -101,7 +101,7 @@ func (sm *FSSessionManager) GetSession(uuid string) *Session { // Disconnects a session by sending hangup command to freeswitch func (sm *FSSessionManager) DisconnectSession(s *Session, notify string) { - engine.Logger.Debug(fmt.Sprintf("Session: %+v", s.uuid)) + // engine.Logger.Debug(fmt.Sprintf("Session: %+v", s.uuid)) err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", s.uuid, notify)) if err != nil { engine.Logger.Err(fmt.Sprintf("could not send disconect api notification to freeswitch: %v", err)) @@ -123,6 +123,17 @@ func (sm *FSSessionManager) RemoveSession(s *Session) { } } +// Sets the call timeout valid of starting of the call +func (sm *FSSessionManager) setMaxCallDuration(uuid string, maxDur time.Duration) error { + err := fsock.FS.SendApiCmd(fmt.Sprintf("sched_hangup +%d %s\n\n", int(maxDur.Seconds()), uuid)) + if err != nil { + engine.Logger.Err("could not send sched_hangup command to freeswitch") + return err + } + return nil +} + + // Sends the transfer command to unpark the call to freeswitch func (sm *FSSessionManager) unparkCall(uuid, call_dest_nb, notify string) { err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify)) @@ -140,15 +151,15 @@ func (sm *FSSessionManager) OnHeartBeat(ev Event) { } func (sm *FSSessionManager) OnChannelPark(ev Event) { - engine.Logger.Info("freeswitch park") + //engine.Logger.Info("freeswitch park") startTime, err := ev.GetStartTime(PARK_TIME) if err != nil { engine.Logger.Err("Error parsing answer event start time, using time.Now!") startTime = time.Now() } // if there is no account configured leave the call alone - if strings.TrimSpace(ev.GetReqType()) != utils.PREPAID { - return + if !utils.IsSliceMember([]string{utils.PREPAID, utils.PSEUDOPREPAID}, strings.TrimSpace(ev.GetReqType())) { + return // we unpark only prepaid and pseudoprepaid calls } if ev.MissingParameter() { sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(), MISSING_PARAMETER) @@ -162,8 +173,9 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) { Subject: ev.GetSubject(), Account: ev.GetAccount(), Destination: ev.GetDestination(), - Amount: sm.debitPeriod.Seconds(), - TimeStart: startTime} + TimeStart: startTime, + TimeEnd: startTime.Add(cfg.SMMaxCallDuration), + } var remainingDurationFloat float64 err = sm.connector.GetMaxSessionTime(cd, &remainingDurationFloat) if err != nil { @@ -172,17 +184,18 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) { return } remainingDuration := time.Duration(remainingDurationFloat) - engine.Logger.Info(fmt.Sprintf("Remaining seconds: %v", remainingDuration)) + //engine.Logger.Info(fmt.Sprintf("Remaining duration: %v", remainingDuration)) if remainingDuration == 0 { - engine.Logger.Info(fmt.Sprintf("Not enough credit for trasferring the call %s for %s.", ev.GetUUID(), cd.GetKey(cd.Subject))) + //engine.Logger.Info(fmt.Sprintf("Not enough credit for trasferring the call %s for %s.", ev.GetUUID(), cd.GetKey(cd.Subject))) sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(), INSUFFICIENT_FUNDS) return } + sm.setMaxCallDuration(ev.GetUUID(), remainingDuration) sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(), AUTH_OK) } func (sm *FSSessionManager) OnChannelAnswer(ev Event) { - engine.Logger.Info(" FreeSWITCH answer.") + //engine.Logger.Info(" FreeSWITCH answer.") // Make sure cgr_type is enforced even if not set by FreeSWITCH if err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_reqtype %s\n\n", ev.GetUUID(), ev.GetReqType())); err != nil { engine.Logger.Err(fmt.Sprintf("Error on attempting to overwrite cgr_type in chan variables: %v", err)) @@ -194,7 +207,7 @@ func (sm *FSSessionManager) OnChannelAnswer(ev Event) { } func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) { - engine.Logger.Info(" FreeSWITCH hangup.") + //engine.Logger.Info(" FreeSWITCH hangup.") s := sm.GetSession(ev.GetUUID()) if s == nil { // Not handled by us return @@ -246,7 +259,7 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) { } end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd refundDuration := end.Sub(hangupTime) - initialRefundDuration := refundDuration + //initialRefundDuration := refundDuration var refundIncrements engine.Increments for i := len(lastCC.Timespans) - 1; i >= 0; i-- { ts := lastCC.Timespans[i] @@ -273,7 +286,7 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) { } } // show only what was actualy refunded (stopped in timespan) - engine.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration)) + // engine.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration)) if len(refundIncrements) > 0 { cd := &engine.CallDescriptor{ Direction: lastCC.Direction, @@ -293,23 +306,21 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) { } cost := refundIncrements.GetTotalCost() lastCC.Cost -= cost - engine.Logger.Info(fmt.Sprintf("Rambursed %v cents", cost)) + // engine.Logger.Info(fmt.Sprintf("Rambursed %v cents", cost)) } -func (sm *FSSessionManager) LoopAction(s *Session, cd *engine.CallDescriptor, index float64) (cc *engine.CallCost) { +func (sm *FSSessionManager) LoopAction(s *Session, cd *engine.CallDescriptor) (cc *engine.CallCost) { cc = &engine.CallCost{} - cd.LoopIndex = index - cd.CallDuration += sm.debitPeriod err := sm.connector.MaxDebit(*cd, cc) if err != nil { engine.Logger.Err(fmt.Sprintf("Could not complete debit opperation: %v", err)) // disconnect session s.sessionManager.DisconnectSession(s, SYSTEM_ERROR) } - engine.Logger.Debug(fmt.Sprintf("Result of MaxDebit call: %v", cc)) + // engine.Logger.Debug(fmt.Sprintf("Result of MaxDebit call: %v", cc)) if cc.GetDuration() == 0 || err != nil { - engine.Logger.Info(fmt.Sprintf("No credit left: Disconnect %v", s)) + // engine.Logger.Info(fmt.Sprintf("No credit left: Disconnect %v", s)) sm.DisconnectSession(s, INSUFFICIENT_FUNDS) return } diff --git a/sessionmanager/session.go b/sessionmanager/session.go index 2af31f6f8..47951e132 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -77,6 +77,7 @@ func NewSession(ev Event, sm SessionManager) (s *Session) { func (s *Session) startDebitLoop() { nextCd := *s.callDescriptor index := 0.0 + debitPeriod := s.sessionManager.GetDebitPeriod() for { select { case <-s.stopDebit: @@ -86,9 +87,14 @@ func (s *Session) startDebitLoop() { if index > 0 { // first time use the session start time nextCd.TimeStart = nextCd.TimeEnd } - nextCd.TimeEnd = nextCd.TimeStart.Add(s.sessionManager.GetDebitPeriod()) - cc := s.sessionManager.LoopAction(s, &nextCd, index) - nextCd.TimeEnd = cc.GetEndTime() + nextCd.TimeEnd = nextCd.TimeStart.Add(debitPeriod) + nextCd.LoopIndex = index + nextCd.CallDuration += debitPeriod // first presumed duration + cc := s.sessionManager.LoopAction(s, &nextCd) + nextCd.TimeEnd = cc.GetEndTime() // set debited timeEnd + // update call duration with real debited duration + nextCd.CallDuration -= debitPeriod + nextCd.CallDuration += nextCd.GetDuration() time.Sleep(cc.GetDuration()) index++ } @@ -106,7 +112,7 @@ func (s *Session) getSessionDurationFrom(now time.Time) (d time.Duration) { // Stops the debit loop func (s *Session) Close(ev Event) { - engine.Logger.Debug(fmt.Sprintf("Stopping debit for %s", s.uuid)) + // engine.Logger.Debug(fmt.Sprintf("Stopping debit for %s", s.uuid)) if s == nil { return } @@ -140,6 +146,6 @@ func (s *Session) SaveOperations() { engine.Logger.Err(" Error: no connection to logger database, cannot save costs") } s.sessionManager.GetDbLogger().LogCallCost(s.uuid, engine.SESSION_MANAGER_SOURCE, utils.DEFAULT_RUNID, firstCC) - engine.Logger.Debug(fmt.Sprintf(" End of call, having costs: %v", firstCC.String())) + // engine.Logger.Debug(fmt.Sprintf(" End of call, having costs: %v", firstCC.String())) }() } diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go index 9eb2f68ae..ea0074786 100644 --- a/sessionmanager/sessionmanager.go +++ b/sessionmanager/sessionmanager.go @@ -19,16 +19,17 @@ along with this program. If not, see package sessionmanager import ( + "time" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" - "time" ) type SessionManager interface { Connect(*config.CGRConfig) error DisconnectSession(*Session, string) RemoveSession(*Session) - LoopAction(*Session, *engine.CallDescriptor, float64) *engine.CallCost + LoopAction(*Session, *engine.CallDescriptor) *engine.CallCost GetDebitPeriod() time.Duration GetDbLogger() engine.LogStorage Shutdown() error diff --git a/utils/consts.go b/utils/consts.go index 5f17455f2..070638c1f 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1,7 +1,7 @@ package utils const ( - VERSION = "0.9.1rc3" + VERSION = "0.9.1c3" POSTGRES = "postgres" MYSQL = "mysql" MONGO = "mongo"