diff --git a/apier/v1/accounts.go b/apier/v1/accounts.go index 49efdb106..c96b7f653 100644 --- a/apier/v1/accounts.go +++ b/apier/v1/accounts.go @@ -21,8 +21,8 @@ package apier import ( "errors" "fmt" - "github.com/cgrates/cgrates/utils" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" "time" ) @@ -33,10 +33,10 @@ type AttrAcntAction struct { } type AccountActionTiming struct { - Id string // The id to reference this particular ActionTiming + Id string // The id to reference this particular ActionTiming ActionPlanId string // The id of the ActionPlanId profile attached to the account - ActionsId string // The id of actions which will be executed - NextExecTime time.Time // Next execution time + ActionsId string // The id of actions which will be executed + NextExecTime time.Time // Next execution time } func (self *ApierV1) GetAccountActionPlan(attrs AttrAcntAction, reply *[]*AccountActionTiming) error { @@ -60,12 +60,12 @@ func (self *ApierV1) GetAccountActionPlan(attrs AttrAcntAction, reply *[]*Accoun } type AttrRemActionTiming struct { - ActionPlanId string // Id identifying the ActionTimings profile + ActionPlanId string // Id identifying the ActionTimings profile ActionTimingId string // Internal CGR id identifying particular ActionTiming, *all for all user related ActionTimings to be canceled Tenant string // Tenant he account belongs to Account string // Account name Direction string // Traffic direction - ReloadScheduler bool // If set it will reload the scheduler after adding + ReloadScheduler bool // If set it will reload the scheduler after adding } // Removes an ActionTimings or parts of it depending on filters being set @@ -155,12 +155,11 @@ func (self *ApierV1) RemAccountActionTriggers(attrs AttrRemAcntActionTriggers, r return nil } - type AttrSetAccount struct { - Tenant string - Direction string - Account string - Type string // <*prepaid|*postpaid> + Tenant string + Direction string + Account string + Type string // <*prepaid|*postpaid> ActionPlanId string } @@ -186,8 +185,8 @@ func (self *ApierV1) SetAccount(attr AttrSetAccount, reply *string) error { Type: attr.Type, } } - - if len(attr.ActionPlanId) != 0 { + + if len(attr.ActionPlanId) != 0 { var err error ats, err = self.AccountDb.GetActionTimings(attr.ActionPlanId) if err != nil { diff --git a/apier/v1/apier_local_test.go b/apier/v1/apier_local_test.go index 14ed6adf7..8649d0d6f 100644 --- a/apier/v1/apier_local_test.go +++ b/apier/v1/apier_local_test.go @@ -19,21 +19,20 @@ along with this program. If not, see package apier import ( - "flag" - "fmt" - "net/http" - "net/rpc" - "net/url" - "os/exec" "path" "reflect" - "strings" + "os/exec" "testing" "time" + "strings" + "net/url" + "net/http" + "net/rpc" + "flag" + "fmt" - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/cgrates/config" ) // ToDo: Replace rpc.Client with internal rpc server and Apier using internal map as both data and stor so we can run the tests non-local @@ -1229,7 +1228,7 @@ func TestCdrServer(t *testing.T) { "answer_time": []string{"2013-11-07T08:42:26Z"}, "duration": []string{"10"}, "field_extr1": []string{"val_extr1"}, "fieldextr2": []string{"valextr2"}} for _, cdrForm := range []url.Values{cdrForm1, cdrForm2} { cdrForm.Set(utils.CDRSOURCE, engine.TEST_SQL) - if _, err := httpClient.PostForm(fmt.Sprintf("http://%s/cgr", cfg.CdrcCdrs), cdrForm); err != nil { + if _, err := httpClient.PostForm(fmt.Sprintf("http://%s/cgr", "127.0.0.1:2080"), cdrForm); err != nil { t.Error(err.Error()) } } diff --git a/apier/v1/tpactiontimings.go b/apier/v1/tpactiontimings.go index ec9210fb9..ca5c94807 100644 --- a/apier/v1/tpactiontimings.go +++ b/apier/v1/tpactiontimings.go @@ -43,8 +43,8 @@ func (self *ApierV1) SetTPActionPlan(attrs utils.TPActionPlan, reply *string) er } type AttrGetTPActionPlan struct { - TPid string // Tariff plan id - Id string // ActionTimings id + TPid string // Tariff plan id + Id string // ActionTimings id } // Queries specific ActionPlan profile on tariff plan diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index 3df8587e6..6710dbd5f 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -26,13 +26,13 @@ import ( "io" "io/ioutil" "net/http" - "net/url" "os" "path" "strconv" "strings" "time" + "github.com/cgrates/cgrates/cdrs" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -44,8 +44,8 @@ const ( FS_CSV = "freeswitch_csv" ) -func NewCdrc(config *config.CGRConfig) (*Cdrc, error) { - cdrc := &Cdrc{cgrCfg: config} +func NewCdrc(config *config.CGRConfig, cdrServer *cdrs.CDRS) (*Cdrc, error) { + cdrc := &Cdrc{cgrCfg: config, cdrServer: cdrServer} // Before processing, make sure in and out folders exist for _, dir := range []string{cdrc.cgrCfg.CdrcCdrInDir, cdrc.cgrCfg.CdrcCdrOutDir} { if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) { @@ -61,6 +61,7 @@ func NewCdrc(config *config.CGRConfig) (*Cdrc, error) { type Cdrc struct { cgrCfg *config.CGRConfig + cdrServer *cdrs.CDRS cfgCdrFields map[string]string // Key is the name of the field httpClient *http.Client } @@ -116,10 +117,9 @@ func (self *Cdrc) parseFieldsConfig() error { } // Takes the record out of csv and turns it into http form which can be posted -func (self *Cdrc) cdrAsHttpForm(record []string) (url.Values, error) { - // engine.Logger.Info(fmt.Sprintf("Processing record %v", record)) - v := url.Values{} - v.Set(utils.CDRSOURCE, self.cgrCfg.CdrcSourceId) +func (self *Cdrc) recordAsRatedCdr(record []string) (*utils.RatedCDR, error) { + ratedCdr := &utils.RatedCDR{CdrSource: self.cgrCfg.CdrcSourceId, ExtraFields: map[string]string{}, Cost: -1} + var err error for cfgFieldName, cfgFieldVal := range self.cfgCdrFields { var fieldVal string if strings.HasPrefix(cfgFieldVal, utils.STATIC_VALUE_PREFIX) { @@ -135,9 +135,38 @@ func (self *Cdrc) cdrAsHttpForm(record []string) (url.Values, error) { } else { // Modify here when we add more supported cdr formats fieldVal = "UNKNOWN" } - v.Set(cfgFieldName, fieldVal) + switch cfgFieldName { + case utils.ACCID: + ratedCdr.CgrId = utils.FSCgrId(fieldVal) + ratedCdr.AccId = fieldVal + case utils.REQTYPE: + ratedCdr.ReqType = fieldVal + case utils.DIRECTION: + ratedCdr.Direction = fieldVal + case utils.TENANT: + ratedCdr.Tenant = fieldVal + case utils.TOR: + ratedCdr.TOR = fieldVal + case utils.ACCOUNT: + ratedCdr.Account = fieldVal + case utils.SUBJECT: + ratedCdr.Subject = fieldVal + case utils.DESTINATION: + ratedCdr.Destination = fieldVal + case utils.ANSWER_TIME: + if ratedCdr.AnswerTime, err = utils.ParseTimeDetectLayout(fieldVal); err != nil { + return nil, fmt.Errorf("Cannot parse answer time field, err: %s", err.Error()) + } + case utils.DURATION: + if ratedCdr.Duration, err = utils.ParseDurationWithSecs(fieldVal); err != nil { + return nil, fmt.Errorf("Cannot parse duration field, err: %s", err.Error()) + } + default: // Extra fields will not match predefined so they all show up here + ratedCdr.ExtraFields[cfgFieldName] = fieldVal + } + } - return v, nil + return ratedCdr, nil } // One run over the CDR folder @@ -199,14 +228,21 @@ func (self *Cdrc) processFile(filePath string) error { engine.Logger.Err(fmt.Sprintf(" Error in csv file: %s", err.Error())) continue // Other csv related errors, ignore } - cdrAsForm, err := self.cdrAsHttpForm(record) + rawCdr, err := self.recordAsRatedCdr(record) if err != nil { engine.Logger.Err(fmt.Sprintf(" Error in csv file: %s", err.Error())) continue } - if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cgr", self.cgrCfg.HTTPListen), cdrAsForm); err != nil { - engine.Logger.Err(fmt.Sprintf(" Failed posting CDR, error: %s", err.Error())) - continue + if self.cgrCfg.CdrcCdrs == utils.INTERNAL { + if err := self.cdrServer.ProcessRawCdr(rawCdr); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed posting CDR, error: %s", err.Error())) + continue + } + } else { // CDRs listening on IP + if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cgr", self.cgrCfg.HTTPListen), rawCdr.AsRawCdrHttpForm()); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed posting CDR, error: %s", err.Error())) + continue + } } } // Finished with file, move it to processed folder diff --git a/cdrc/cdrc_local_test.go b/cdrc/cdrc_local_test.go index e063cd209..0cfb95bec 100644 --- a/cdrc/cdrc_local_test.go +++ b/cdrc/cdrc_local_test.go @@ -130,10 +130,13 @@ func TestProcessCdrDir(t *testing.T) { if !*testLocal { return } + if cfg.CdrcCdrs == utils.INTERNAL { // For now we only test over network + return + } if err := startEngine(); err != nil { t.Fatal(err.Error()) } - cdrc, err := NewCdrc(cfg) + cdrc, err := NewCdrc(cfg, nil) if err != nil { t.Fatal(err.Error()) } diff --git a/cdrc/cdrc_test.go b/cdrc/cdrc_test.go index 87f72cf4e..4678b1a74 100644 --- a/cdrc/cdrc_test.go +++ b/cdrc/cdrc_test.go @@ -21,7 +21,9 @@ package cdrc import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" + "reflect" "testing" + "time" ) func TestParseFieldsConfig(t *testing.T) { @@ -50,33 +52,55 @@ func TestParseFieldsConfig(t *testing.T) { cgrConfig.CdrcExtraFields = []string{"supplier1:^top_supplier", "orig_ip:11"} cdrc = &Cdrc{cgrCfg: cgrConfig} if err := cdrc.parseFieldsConfig(); err != nil { - t.Errorf("Failed to corectly parse extra fields %v",cdrc.cfgCdrFields) + t.Errorf("Failed to corectly parse extra fields %v", cdrc.cfgCdrFields) } } -func TestCdrAsHttpForm(t *testing.T) { +func TestRecordAsRatedCdr(t *testing.T) { cgrConfig, _ := config.NewDefaultCGRConfig() + cgrConfig.CdrcExtraFields = []string{"supplier:10"} cdrc := &Cdrc{cgrCfg: cgrConfig} if err := cdrc.parseFieldsConfig(); err != nil { t.Error("Failed parsing default fieldIndexesFromConfig", err) } cdrRow := []string{"firstField", "secondField"} - _, err := cdrc.cdrAsHttpForm(cdrRow) + _, err := cdrc.recordAsRatedCdr(cdrRow) if err == nil { t.Error("Failed to corectly detect missing fields from record") } cdrRow = []string{"acc1", "prepaid", "*out", "cgrates.org", "call", "1001", "1001", "+4986517174963", "2013-02-03 19:54:00", "62", "supplier1", "172.16.1.1"} - cdrAsForm, err := cdrc.cdrAsHttpForm(cdrRow) + rtCdr, err := cdrc.recordAsRatedCdr(cdrRow) if err != nil { - t.Error("Failed to parse CDR in form", err) + t.Error("Failed to parse CDR in rated cdr", err) } - if cdrAsForm.Get(utils.CDRSOURCE) != cgrConfig.CdrcSourceId { - t.Error("Unexpected cdrsource received", cdrAsForm.Get(utils.CDRSOURCE)) + expectedCdr := &utils.RatedCDR{ + CgrId: utils.FSCgrId(cdrRow[0]), + AccId: cdrRow[0], + CdrSource: cgrConfig.CdrcSourceId, + ReqType: cdrRow[1], + Direction: cdrRow[2], + Tenant: cdrRow[3], + TOR: cdrRow[4], + Account: cdrRow[5], + Subject: cdrRow[6], + Destination: cdrRow[7], + AnswerTime: time.Date(2013, 2, 3, 19, 54, 0, 0, time.UTC), + Duration: time.Duration(62) * time.Second, + ExtraFields: map[string]string{"supplier": "supplier1"}, + Cost: -1, } - if cdrAsForm.Get(utils.REQTYPE) != "prepaid" { - t.Error("Unexpected CDR value received", cdrAsForm.Get(utils.REQTYPE)) + if !reflect.DeepEqual(expectedCdr, rtCdr) { + t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr) } - //if cdrAsForm.Get("supplier") != "supplier1" { - // t.Error("Unexpected CDR value received", cdrAsForm.Get("supplier")) - //} + /* + if cdrAsForm.Get(utils.CDRSOURCE) != cgrConfig.CdrcSourceId { + t.Error("Unexpected cdrsource received", cdrAsForm.Get(utils.CDRSOURCE)) + } + if cdrAsForm.Get(utils.REQTYPE) != "prepaid" { + t.Error("Unexpected CDR value received", cdrAsForm.Get(utils.REQTYPE)) + } + if cdrAsForm.Get("supplier") != "supplier1" { + t.Error("Unexpected CDR value received", cdrAsForm.Get("supplier")) + } + */ } diff --git a/cdrs/cdrs.go b/cdrs/cdrs.go index e9d0118a2..05acc22bc 100644 --- a/cdrs/cdrs.go +++ b/cdrs/cdrs.go @@ -35,36 +35,42 @@ var ( medi *mediator.Mediator ) -func fsCdrHandler(w http.ResponseWriter, r *http.Request) { - body, _ := ioutil.ReadAll(r.Body) - if fsCdr, err := new(FSCdr).New(body); err == nil { - storage.SetCdr(fsCdr) - go func() { //FS will not send us hangup_complete until we have send back the answer to CDR, so we need to handle mediation async - if cfg.CDRSMediator == "internal" { - medi.MediateRawCDR(fsCdr) - } else { - //TODO: use the connection to mediator +// Returns error if not able to properly store the CDR, mediation is async since we can always recover offline +func storeAndMediate(rawCdr utils.RawCDR) error { + if err := storage.SetCdr(rawCdr); err != nil { + return err + } + if cfg.CDRSMediator == utils.INTERNAL { + go func() { + if err := medi.MediateRawCDR(rawCdr); err != nil { + engine.Logger.Err(fmt.Sprintf("Could not run mediation on CDR: %s", err.Error())) } }() - } else { + } + return nil +} + +// Handler for generic cgr cdr http +func cgrCdrHandler(w http.ResponseWriter, r *http.Request) { + cgrCdr, err := utils.NewCgrCdrFromHttpReq(r) + if err != nil { engine.Logger.Err(fmt.Sprintf("Could not create CDR entry: %s", err.Error())) } + if err := storeAndMediate(cgrCdr); err != nil { + engine.Logger.Err(fmt.Sprintf("Errors when storing CDR entry: %s", err.Error())) + } } -func cgrCdrHandler(w http.ResponseWriter, r *http.Request) { - if cgrCdr, err := utils.NewCgrCdrFromHttpReq(r); err == nil { - storage.SetCdr(cgrCdr) - if cfg.CDRSMediator == "internal" { - errMedi := medi.MediateRawCDR(cgrCdr) - if errMedi != nil { - engine.Logger.Err(fmt.Sprintf("Could not run mediation on CDR: %s", errMedi.Error())) - } - } else { - //TODO: use the connection to mediator - } - } else { +// Handler for fs http +func fsCdrHandler(w http.ResponseWriter, r *http.Request) { + body, _ := ioutil.ReadAll(r.Body) + fsCdr, err := new(FSCdr).New(body) + if err != nil { engine.Logger.Err(fmt.Sprintf("Could not create CDR entry: %s", err.Error())) } + if err := storeAndMediate(fsCdr); err != nil { + engine.Logger.Err(fmt.Sprintf("Errors when storing CDR entry: %s", err.Error())) + } } type CDRS struct{} @@ -80,3 +86,8 @@ func (cdrs *CDRS) RegisterHanlersToServer(server *engine.Server) { server.RegisterHttpFunc("/cgr", cgrCdrHandler) server.RegisterHttpFunc("/freeswitch_json", fsCdrHandler) } + +// Used to internally process CDR +func (cdrs *CDRS) ProcessRawCdr(rawCdr utils.RawCDR) error { + return storeAndMediate(rawCdr) +} diff --git a/cdrs/fscdr.go b/cdrs/fscdr.go index 145631ec6..2981aea4a 100644 --- a/cdrs/fscdr.go +++ b/cdrs/fscdr.go @@ -24,8 +24,8 @@ import ( "fmt" "github.com/cgrates/cgrates/utils" "strconv" - "time" "strings" + "time" ) const ( @@ -172,7 +172,7 @@ func (fsCdr FSCdr) AsRatedCdr(runId, reqTypeFld, directionFld, tenantFld, torFld rtCdr := new(utils.RatedCDR) rtCdr.MediationRunId = runId rtCdr.Cost = -1.0 // Default for non-rated CDR - if rtCdr.AccId = fsCdr.GetAccId(); len(rtCdr.AccId)==0 { + if rtCdr.AccId = fsCdr.GetAccId(); len(rtCdr.AccId) == 0 { if fieldsMandatory { return nil, errors.New(fmt.Sprintf("%s:%s", utils.ERR_MANDATORY_IE_MISSING, utils.ACCID)) } else { // Not mandatory, need to generate here CgrId @@ -181,10 +181,10 @@ func (fsCdr FSCdr) AsRatedCdr(runId, reqTypeFld, directionFld, tenantFld, torFld } else { // hasKey, use it to generate cgrid rtCdr.CgrId = utils.FSCgrId(rtCdr.AccId) } - if rtCdr.CdrHost = fsCdr.GetCdrHost(); len(rtCdr.CdrHost)==0 && fieldsMandatory { + if rtCdr.CdrHost = fsCdr.GetCdrHost(); len(rtCdr.CdrHost) == 0 && fieldsMandatory { return nil, errors.New(fmt.Sprintf("%s:%s", utils.ERR_MANDATORY_IE_MISSING, utils.CDRHOST)) } - if rtCdr.CdrSource = fsCdr.GetCdrSource(); len(rtCdr.CdrSource)==0 && fieldsMandatory { + if rtCdr.CdrSource = fsCdr.GetCdrSource(); len(rtCdr.CdrSource) == 0 && fieldsMandatory { return nil, errors.New(fmt.Sprintf("%s:%s", utils.ERR_MANDATORY_IE_MISSING, utils.CDRSOURCE)) } if strings.HasPrefix(reqTypeFld, utils.STATIC_VALUE_PREFIX) { // Values starting with prefix are not dynamically populated @@ -232,7 +232,7 @@ func (fsCdr FSCdr) AsRatedCdr(runId, reqTypeFld, directionFld, tenantFld, torFld return nil, err } } - if durStr, hasKey = fsCdr[durationFld]; !hasKey && fieldsMandatory && !strings.HasPrefix(durationFld, utils.STATIC_VALUE_PREFIX){ + if durStr, hasKey = fsCdr[durationFld]; !hasKey && fieldsMandatory && !strings.HasPrefix(durationFld, utils.STATIC_VALUE_PREFIX) { return nil, errors.New(fmt.Sprintf("%s:%s", utils.ERR_MANDATORY_IE_MISSING, durationFld)) } else { if strings.HasPrefix(durationFld, utils.STATIC_VALUE_PREFIX) { diff --git a/cdrs/fscdr_test.go b/cdrs/fscdr_test.go index 0fd986bc5..b6ca2c473 100644 --- a/cdrs/fscdr_test.go +++ b/cdrs/fscdr_test.go @@ -21,8 +21,8 @@ package cdrs import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" - "testing" "reflect" + "testing" "time" ) @@ -100,28 +100,28 @@ func TestFsCdrAsRatedCdr(t *testing.T) { if err != nil { t.Errorf("Error loading cdr: %v", err) } - rtCdrOut, err := fsCdr.AsRatedCdr("wholesale_run", "^"+utils.RATED, "^*out", "cgr_tenant", "cgr_tor", "cgr_account", "cgr_subject", "cgr_destination", + rtCdrOut, err := fsCdr.AsRatedCdr("wholesale_run", "^"+utils.RATED, "^*out", "cgr_tenant", "cgr_tor", "cgr_account", "cgr_subject", "cgr_destination", "answer_epoch", "billsec", []string{"effective_caller_id_number"}, true) if err != nil { t.Error("Unexpected error received", err) } - expctRatedCdr := &utils.RatedCDR{CgrId: utils.FSCgrId("01df56f4-d99a-4ef6-b7fe-b924b2415b7f"), AccId: "01df56f4-d99a-4ef6-b7fe-b924b2415b7f", + expctRatedCdr := &utils.RatedCDR{CgrId: utils.FSCgrId("01df56f4-d99a-4ef6-b7fe-b924b2415b7f"), AccId: "01df56f4-d99a-4ef6-b7fe-b924b2415b7f", CdrHost: "127.0.0.1", CdrSource: FS_CDR_SOURCE, ReqType: utils.RATED, - Direction: "*out", Tenant: "ipbx.itsyscom.com", TOR: "call", Account: "dan", Subject: "dan", Destination: "+4986517174963", - AnswerTime: time.Date(2013, 8, 4, 9, 50, 56, 0, time.UTC).Local(), Duration: time.Duration(4)*time.Second, + Direction: "*out", Tenant: "ipbx.itsyscom.com", TOR: "call", Account: "dan", Subject: "dan", Destination: "+4986517174963", + AnswerTime: time.Date(2013, 8, 4, 9, 50, 56, 0, time.UTC).Local(), Duration: time.Duration(4) * time.Second, ExtraFields: map[string]string{"effective_caller_id_number": "+4986517174960"}, MediationRunId: "wholesale_run", Cost: -1} if !reflect.DeepEqual(rtCdrOut, expctRatedCdr) { t.Errorf("Received: %v, expected: %v", rtCdrOut, expctRatedCdr) } - rtCdrOut2, err := fsCdr.AsRatedCdr("wholesale_run", "^postpaid", "^*in", "^cgrates.com", "^premium_call", "^first_account", "^first_subject", "cgr_destination", + rtCdrOut2, err := fsCdr.AsRatedCdr("wholesale_run", "^postpaid", "^*in", "^cgrates.com", "^premium_call", "^first_account", "^first_subject", "cgr_destination", "^2013-12-07T08:42:26Z", "^12s", []string{"effective_caller_id_number"}, true) if err != nil { t.Error("Unexpected error received", err) } - expctRatedCdr2 := &utils.RatedCDR{CgrId: utils.FSCgrId("01df56f4-d99a-4ef6-b7fe-b924b2415b7f"), AccId: "01df56f4-d99a-4ef6-b7fe-b924b2415b7f", CdrHost: "127.0.0.1", + expctRatedCdr2 := &utils.RatedCDR{CgrId: utils.FSCgrId("01df56f4-d99a-4ef6-b7fe-b924b2415b7f"), AccId: "01df56f4-d99a-4ef6-b7fe-b924b2415b7f", CdrHost: "127.0.0.1", CdrSource: FS_CDR_SOURCE, ReqType: "postpaid", - Direction: "*in", Tenant: "cgrates.com", TOR: "premium_call", Account: "first_account", Subject: "first_subject", Destination: "+4986517174963", - AnswerTime: time.Date(2013, 12, 7, 8, 42, 26, 0, time.UTC), Duration: time.Duration(12)*time.Second, + Direction: "*in", Tenant: "cgrates.com", TOR: "premium_call", Account: "first_account", Subject: "first_subject", Destination: "+4986517174963", + AnswerTime: time.Date(2013, 12, 7, 8, 42, 26, 0, time.UTC), Duration: time.Duration(12) * time.Second, ExtraFields: map[string]string{"effective_caller_id_number": "+4986517174960"}, MediationRunId: "wholesale_run", Cost: -1} if !reflect.DeepEqual(rtCdrOut2, expctRatedCdr2) { t.Errorf("Received: %v, expected: %v", rtCdrOut2, expctRatedCdr2) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 7c8ba3dca..fa6144dfb 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -43,7 +43,6 @@ import ( ) const ( - INTERNAL = "internal" JSON = "json" GOB = "gob" POSTGRES = "postgres" @@ -58,7 +57,7 @@ var ( cfgPath = flag.String("config", "/etc/cgrates/cgrates.cfg", "Configuration file location.") version = flag.Bool("version", false, "Prints the application version.") raterEnabled = flag.Bool("rater", false, "Enforce starting of the rater daemon overwriting config") - schedEnabled = flag.Bool("scheduler", false, "Enforce starting of the scheduler daemon overwriting config") + schedEnabled = flag.Bool("scheduler", false, "Enforce starting of the scheduler daemon .overwriting config") cdrsEnabled = flag.Bool("cdrs", false, "Enforce starting of the cdrs daemon overwriting config") cdrcEnabled = flag.Bool("cdrc", false, "Enforce starting of the cdrc service overwriting config") mediatorEnabled = flag.Bool("mediator", false, "Enforce starting of the mediator service overwriting config") @@ -71,11 +70,35 @@ var ( medi *mediator.Mediator cfg *config.CGRConfig err error + +/* + scribeServer history.Scribe + cdrServer *cdrs.CDRS + sm sessionmanager.SessionManager + medi *mediator.Mediator + cfg *config.CGRConfig + err error +*/ ) -func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrDb engine.CdrStorage) { +func cacheData(ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, doneChan chan struct{}) { + if err := ratingDb.CacheRating(nil, nil, nil); err != nil { + engine.Logger.Crit(fmt.Sprintf("Cache rating error: %s", err.Error())) + exitChan <- true + return + } + if err := accountDb.CacheAccounting(nil, nil); err != nil { + engine.Logger.Crit(fmt.Sprintf("Cache accounting error: %s", err.Error())) + exitChan <- true + return + } + close(doneChan) +} + +func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrDb engine.CdrStorage, cacheChan, chanDone chan struct{}) { var connector engine.Connector - if cfg.MediatorRater == INTERNAL { + if cfg.MediatorRater == utils.INTERNAL { + <-cacheChan // Cache needs to come up before we are ready connector = responder } else { var client *rpc.Client @@ -91,6 +114,7 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD if err != nil { engine.Logger.Crit(fmt.Sprintf(" Could not connect to engine: %v", err)) exitChan <- true + return } connector = &engine.RPCClientConnector{Client: client} } @@ -99,11 +123,16 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD if err != nil { engine.Logger.Crit(fmt.Sprintf("Mediator config parsing error: %v", err)) exitChan <- true + return } + close(chanDone) } -func startCdrc() { - cdrc, err := cdrc.NewCdrc(cfg) +func startCdrc(cdrsChan chan struct{}) { + if cfg.CdrcCdrs == utils.INTERNAL { + <-cdrsChan // Wait for CDRServer to come up before start processing + } + cdrc, err := cdrc.NewCdrc(cfg, cdrServer) if err != nil { engine.Logger.Crit(fmt.Sprintf("Cdrc config parsing error: %s", err.Error())) exitChan <- true @@ -115,9 +144,10 @@ func startCdrc() { exitChan <- true // If run stopped, something is bad, stop the application } -func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage) { +func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage, cacheChan chan struct{}) { var connector engine.Connector - if cfg.SMRater == INTERNAL { + if cfg.SMRater == utils.INTERNAL { + <-cacheChan // Wait for the cache to init before start doing queries connector = responder } else { var client *rpc.Client @@ -146,31 +176,40 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage } default: engine.Logger.Err(fmt.Sprintf(" Unsupported session manger type: %s!", cfg.SMSwitchType)) - exitChan <- true } exitChan <- true } -func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage) { - if cfg.CDRSMediator == INTERNAL { - for i := 0; i < 3; i++ { // ToDo: If the right approach, make the reconnects configurable - time.Sleep(time.Duration(i+1) * time.Second) - if medi != nil { // Got our mediator, no need to wait any longer - break - } - } +func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage, mediChan, doneChan chan struct{}) { + if cfg.CDRSMediator == utils.INTERNAL { + <-mediChan // Deadlock if mediator not started if medi == nil { engine.Logger.Crit(" Could not connect to mediator, exiting.") exitChan <- true + return } } - cs := cdrs.New(cdrDb, medi, cfg) - cs.RegisterHanlersToServer(server) + cdrServer = cdrs.New(cdrDb, medi, cfg) + cdrServer.RegisterHanlersToServer(server) + close(doneChan) } -func startHistoryAgent(scribeServer history.Scribe) { - if cfg.HistoryServer != INTERNAL { // Connect in iteration since there are chances of concurrency here - engine.Logger.Info("Starting History Agent.") +func startHistoryServer(chanDone chan struct{}) { + if scribeServer, err = history.NewFileScribe(cfg.HistoryDir, cfg.HistorySaveInterval); err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) + exitChan <- true + return + } + server.RpcRegisterName("Scribe", scribeServer) + close(chanDone) +} + +// chanStartServer will report when server is up, useful for internal requests +func startHistoryAgent(scribeServer history.Scribe, chanServerStarted chan struct{}) { + if cfg.HistoryServer == utils.INTERNAL { // For internal requests, wait for server to come online before connecting + engine.Logger.Crit(fmt.Sprintf(" Connecting internally to HistoryServer")) + <-chanServerStarted // If server is not enabled, will have deadlock here + } else { // Connect in iteration since there are chances of concurrency here for i := 0; i < 3; i++ { //ToDo: Make it globally configurable //engine.Logger.Crit(fmt.Sprintf(" Trying to connect, iteration: %d, time %s", i, time.Now())) if scribeServer, err = history.NewProxyScribe(cfg.HistoryServer); err == nil { @@ -180,13 +219,31 @@ func startHistoryAgent(scribeServer history.Scribe) { exitChan <- true return } - time.Sleep(time.Duration(i+1) * time.Second) + time.Sleep(time.Duration(i) * time.Second) } } engine.SetHistoryScribe(scribeServer) return } +// Starts the rpc server, waiting for the necessary components to finish their tasks +func serveRpc(rpcWaitChans []chan struct{}) { + for _, chn := range rpcWaitChans { + <-chn + } + // Each of the serve blocks so need to start in their own goroutine + go server.ServeJSON(cfg.RPCJSONListen) + go server.ServeGOB(cfg.RPCGOBListen) +} + +// Starts the http server, waiting for the necessary components to finish their tasks +func serveHttp(httpWaitChans []chan struct{}) { + for _, chn := range httpWaitChans { + <-chn + } + server.ServeHTTP(cfg.HTTPListen) +} + func checkConfigSanity() error { if cfg.SMEnabled && cfg.RaterEnabled && cfg.RaterBalancer != "" { engine.Logger.Crit("The session manager must not be enabled on a worker engine (change [engine]/balancer to disabled)!") @@ -196,11 +253,11 @@ func checkConfigSanity() error { engine.Logger.Crit("The balancer is enabled so it cannot connect to another balancer (change rater/balancer to disabled)!") return errors.New("Improperly configured balancer") } - if cfg.CDRSEnabled && cfg.CDRSMediator == INTERNAL && !cfg.MediatorEnabled { + if cfg.CDRSEnabled && cfg.CDRSMediator == utils.INTERNAL && !cfg.MediatorEnabled { engine.Logger.Crit("CDRS cannot connect to mediator, Mediator not enabled in configuration!") return errors.New("Internal Mediator required by CDRS") } - if cfg.HistoryServerEnabled && cfg.HistoryServer == INTERNAL && !cfg.HistoryServerEnabled { + if cfg.HistoryServerEnabled && cfg.HistoryServer == utils.INTERNAL && !cfg.HistoryServerEnabled { engine.Logger.Crit("The history agent is enabled and internal and history server is disabled!") return errors.New("Improperly configured history service") } @@ -263,14 +320,16 @@ func main() { var logDb engine.LogStorage var loadDb engine.LoadStorage var cdrDb engine.CdrStorage - ratingDb, err = engine.ConfigureRatingStorage(cfg.RatingDBType, cfg.RatingDBHost, cfg.RatingDBPort, cfg.RatingDBName, cfg.RatingDBUser, cfg.RatingDBPass, cfg.DBDataEncoding) + ratingDb, err = engine.ConfigureRatingStorage(cfg.RatingDBType, cfg.RatingDBHost, cfg.RatingDBPort, + cfg.RatingDBName, cfg.RatingDBUser, cfg.RatingDBPass, cfg.DBDataEncoding) if err != nil { // Cannot configure getter database, show stopper engine.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err)) return } defer ratingDb.Close() engine.SetRatingStorage(ratingDb) - accountDb, err = engine.ConfigureAccountingStorage(cfg.AccountDBType, cfg.AccountDBHost, cfg.AccountDBPort, cfg.AccountDBName, cfg.AccountDBUser, cfg.AccountDBPass, cfg.DBDataEncoding) + accountDb, err = engine.ConfigureAccountingStorage(cfg.AccountDBType, cfg.AccountDBHost, cfg.AccountDBPort, + cfg.AccountDBName, cfg.AccountDBUser, cfg.AccountDBPass, cfg.DBDataEncoding) if err != nil { // Cannot configure getter database, show stopper engine.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err)) return @@ -306,7 +365,8 @@ func main() { if cfg.StorDBType == SAME { logDb = ratingDb.(engine.LogStorage) } else { - logDb, err = engine.ConfigureLogStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.DBDataEncoding) + logDb, err = engine.ConfigureLogStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort, + cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.DBDataEncoding) if err != nil { // Cannot configure logger database, show stopper engine.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err)) return @@ -327,18 +387,18 @@ func main() { stopHandled := false + // Async starts here + + rpcWait := make([]chan struct{}, 0) // Rpc server will start as soon as this list is consumed + httpWait := make([]chan struct{}, 0) // Http server will start as soon as this list is consumed + + var cacheChan chan struct{} if cfg.RaterEnabled { // Cache rating if rater enabled - if err := ratingDb.CacheRating(nil, nil, nil); err != nil { - engine.Logger.Crit(fmt.Sprintf("Cache rating error: %s", err.Error())) - return - } - if err := accountDb.CacheAccounting(nil, nil); err != nil { - engine.Logger.Crit(fmt.Sprintf("Cache accounting error: %s", err.Error())) - return - } + cacheChan = make(chan struct{}) + rpcWait = append(rpcWait, cacheChan) + go cacheData(ratingDb, accountDb, cacheChan) } - // Async starts here if cfg.RaterEnabled && cfg.RaterBalancer != "" && !cfg.BalancerEnabled { go registerToBalancer() go stopRaterSignalHandler() @@ -348,14 +408,14 @@ func main() { responder := &engine.Responder{ExitChan: exitChan} apier := &apier.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, Config: cfg} - if cfg.RaterEnabled && !cfg.BalancerEnabled && cfg.RaterBalancer != INTERNAL { + if cfg.RaterEnabled && !cfg.BalancerEnabled && cfg.RaterBalancer != utils.INTERNAL { engine.Logger.Info("Registering CGRateS Rater service") server.RpcRegister(responder) server.RpcRegister(apier) } if cfg.BalancerEnabled { - engine.Logger.Info("Registering CGRateS Balancer service") + engine.Logger.Info("Registering CGRateS Balancer service.") go stopBalancerSignalHandler() stopHandled = true responder.Bal = bal @@ -382,49 +442,51 @@ func main() { }() } - var scribeServer history.Scribe - + var histServChan chan struct{} // Will be initialized only if the server starts if cfg.HistoryServerEnabled { - engine.Logger.Info("Registering CGRates History service") - if scribeServer, err = history.NewFileScribe(cfg.HistoryDir, cfg.HistorySaveInterval); err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) - exitChan <- true - return - } - server.RpcRegisterName("Scribe", scribeServer) - } - go startHistoryAgent(scribeServer) - - go server.ServeGOB(cfg.RPCGOBListen) - go server.ServeJSON(cfg.RPCJSONListen) - - go startHistoryAgent(scribeServer) - - if cfg.CDRSEnabled { - engine.Logger.Info("Registering CGRateS CDR service") - go startCDRS(responder, cdrDb) + histServChan = make(chan struct{}) + rpcWait = append(rpcWait, histServChan) + go startHistoryServer(histServChan) } - go server.ServeHTTP(cfg.HTTPListen) + if cfg.HistoryAgentEnabled { + engine.Logger.Info("Starting CGRateS History Agent.") + go startHistoryAgent(scribeServer, histServChan) + } + var medChan chan struct{} if cfg.MediatorEnabled { - engine.Logger.Info("Starting CGRateS Mediator service") - go startMediator(responder, logDb, cdrDb) + engine.Logger.Info("Starting CGRateS Mediator service.") + medChan = make(chan struct{}) + go startMediator(responder, logDb, cdrDb, cacheChan, medChan) + } + + var cdrsChan chan struct{} + if cfg.CDRSEnabled { + engine.Logger.Info("Starting CGRateS CDRS service.") + cdrsChan = make(chan struct{}) + httpWait = append(httpWait, cdrsChan) + go startCDRS(responder, cdrDb, medChan, cdrsChan) } if cfg.SMEnabled { - engine.Logger.Info("Starting CGRateS SessionManager service") - go startSessionManager(responder, logDb) + engine.Logger.Info("Starting CGRateS SessionManager service.") + go startSessionManager(responder, logDb, cacheChan) // close all sessions on shutdown go shutdownSessionmanagerSingnalHandler() } if cfg.CdrcEnabled { - engine.Logger.Info("Starting CGRateS CDR client") - go startCdrc() + engine.Logger.Info("Starting CGRateS CDR client.") + go startCdrc(cdrsChan) } + // Start the servers + go serveRpc(rpcWait) + go serveHttp(httpWait) + <-exitChan + if *pidFile != "" { if err := os.Remove(*pidFile); err != nil { engine.Logger.Warning("Could not remove pid file: " + err.Error()) diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 179c0a339..a65234926 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -19,7 +19,6 @@ along with this program. If not, see package main import ( - "encoding/gob" "flag" "fmt" "log" @@ -67,8 +66,8 @@ var ( stats = flag.Bool("stats", false, "Generates statsistics about given data.") fromStorDb = flag.Bool("from_stordb", false, "Load the tariff plan from storDb to dataDb") toStorDb = flag.Bool("to_stordb", false, "Import the tariff plan from files to storDb") - historyServer = flag.String("history_server", cgrConfig.HistoryServer, "The history server address:port, empty to disable automaticautomatic history archiving") - raterAddress = flag.String("rater_address", cgrConfig.MediatorRater, "Rater service to contact for cache reloads, empty to disable automatic cache reloads") + historyServer = flag.String("history_server", cgrConfig.RPCGOBListen, "The history server address:port, empty to disable automaticautomatic history archiving") + raterAddress = flag.String("rater_address", cgrConfig.RPCGOBListen, "Rater service to contact for cache reloads, empty to disable automatic cache reloads") runId = flag.String("runid", "", "Uniquely identify an import/load, postpended to some automatic fields") ) @@ -158,7 +157,6 @@ func main() { return } else { engine.SetHistoryScribe(scribeAgent) - gob.Register(&engine.Destination{}) defer scribeAgent.Client.Close() } } else { diff --git a/config/config.go b/config/config.go index bdfa88b82..99140023d 100644 --- a/config/config.go +++ b/config/config.go @@ -29,7 +29,6 @@ import ( const ( DISABLED = "disabled" - INTERNAL = "internal" JSON = "json" GOB = "gob" POSTGRES = "postgres" @@ -137,10 +136,10 @@ type CGRConfig struct { HistoryServerEnabled bool // Starts History as server: . HistoryDir string // Location on disk where to store history files. HistorySaveInterval time.Duration // The timout duration between history writes - MailerServer string // The server to use when sending emails out - MailerAuthUser string // Authenticate to email server using this user - MailerAuthPass string // Authenticate to email server with this password - MailerFromAddr string // From address used when sending emails out + MailerServer string // The server to use when sending emails out + MailerAuthUser string // Authenticate to email server using this user + MailerAuthPass string // Authenticate to email server with this password + MailerFromAddr string // From address used when sending emails out } func (self *CGRConfig) setDefaults() error { @@ -183,7 +182,7 @@ func (self *CGRConfig) setDefaults() error { self.CdreExtraFields = []string{} self.CdreDir = "/var/log/cgrates/cdr/cdrexport/csv" self.CdrcEnabled = false - self.CdrcCdrs = "127.0.0.1:2080" + self.CdrcCdrs = utils.INTERNAL self.CdrcCdrsMethod = "http_cgr" self.CdrcRunDelay = time.Duration(0) self.CdrcCdrType = "csv" diff --git a/config/config_test.go b/config/config_test.go index 17b8f8ad6..ffb138667 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -84,7 +84,7 @@ func TestDefaults(t *testing.T) { eCfg.CdreExtraFields = []string{} eCfg.CdreDir = "/var/log/cgrates/cdr/cdrexport/csv" eCfg.CdrcEnabled = false - eCfg.CdrcCdrs = "127.0.0.1:2080" + eCfg.CdrcCdrs = utils.INTERNAL eCfg.CdrcCdrsMethod = "http_cgr" eCfg.CdrcRunDelay = time.Duration(0) eCfg.CdrcCdrType = "csv" diff --git a/data/conf/cgrates.cfg b/data/conf/cgrates.cfg index c6ee15a81..9b14678c7 100644 --- a/data/conf/cgrates.cfg +++ b/data/conf/cgrates.cfg @@ -39,7 +39,7 @@ [rater] # enabled = false # Enable RaterCDRSExportPath service: . -# balancer = # Register to Balancer as worker: <""|127.0.0.1:2013>. +# balancer = # Register to Balancer as worker: <""|internal|127.0.0.1:2013>. [scheduler] # enabled = false # Starts Scheduler service: . @@ -56,7 +56,7 @@ [cdrc] # enabled = false # Enable CDR client functionality -# cdrs = 127.0.0.1:2080 # Address where to reach CDR server +# cdrs = internal # Address where to reach CDR server. # cdrs_method = http_cgr # Mechanism to use when posting CDRs on server # run_delay = 0 # Sleep interval in seconds between consecutive runs, 0 to use automation via inotify # cdr_type = csv # CDR file format . diff --git a/data/tutorials/fs_csv/cgrates/etc/cgrates/cgrates.cfg b/data/tutorials/fs_csv/cgrates/etc/cgrates/cgrates.cfg index 5ee293764..a3fe952ec 100644 --- a/data/tutorials/fs_csv/cgrates/etc/cgrates/cgrates.cfg +++ b/data/tutorials/fs_csv/cgrates/etc/cgrates/cgrates.cfg @@ -24,11 +24,13 @@ # stordb_user = cgrates # Username to use when connecting to stordb. # stordb_passwd = CGRateS.org # Password to use when connecting to stordb. # dbdata_encoding = msgpack # The encoding used to store object data in strings: -# rpc_encoding = json # RPC encoding used on APIs: . +# rpc_json_listen = 127.0.0.1:2012 # RPC JSON listening address +# rpc_gob_listen = 127.0.0.1:2013 # RPC GOB listening address +# http_listen = 127.0.0.1:2080 # HTTP listening address # default_reqtype = rated # Default request type to consider when missing from requests: <""|prepaid|postpaid|pseudoprepaid|rated>. -# default_tor = 0 # Default Type of Record to consider when missing from requests. -# default_tenant = 0 # Default Tenant to consider when missing from requests. -# default_subject = 0 # Default rating Subject to consider when missing from requests. +# default_tor = call # Default Type of Record to consider when missing from requests. +# default_tenant = cgrates.org # Default Tenant to consider when missing from requests. +# default_subject = cgrates # Default rating Subject to consider when missing from requests. # rounding_method = *middle # Rounding method for floats/costs: <*up|*middle|*down> # rounding_decimals = 4 # Number of decimals to round float/costs at @@ -38,15 +40,13 @@ [rater] enabled = true # Enable RaterCDRSExportPath service: . -# balancer = disabled # Register to Balancer as worker: . -# listen = 127.0.0.1:2012 # Rater's listening interface: . +# balancer = # Register to Balancer as worker: <""|internal|127.0.0.1:2013>. [scheduler] enabled = true # Starts Scheduler service: . [cdrs] enabled = true # Start the CDR Server service: . -# listen=127.0.0.1:2022 # CDRS's listening interface: . # extra_fields = # Extra fields to store in CDRs mediator = internal # Address where to reach the Mediator. Empty for disabling mediation. <""|internal> @@ -57,7 +57,7 @@ export_dir = /tmp # Path where the exported CDRs will be placed [cdrc] enabled = true # Enable CDR client functionality -# cdrs = 127.0.0.1:2022 # Address where to reach CDR server +# cdrs = internal # Address where to reach CDR server. # cdrs_method = http_cgr # Mechanism to use when posting CDRs on server # run_delay = 0 # Sleep interval in seconds between consecutive runs, 0 to use automation via inotify cdr_type = freeswitch_csv # CDR file format . @@ -78,8 +78,7 @@ extra_fields = read_codec:13,write_codec:14 # Extra fields identifiers. For .csv [mediator] enabled = true # Starts Mediator service: . -# listen=internal # Mediator's listening interface: . -# rater = 127.0.0.1:2012 # Address where to reach the Rater: +# rater = internal # Address where to reach the Rater: # rater_reconnects = 3 # Number of reconnects to rater before giving up. # run_ids = # Identifiers of each extra mediation to run on CDRs # reqtype_fields = # Name of request type fields to be used during extra mediation. Use index number in case of .csv cdrs. @@ -95,7 +94,7 @@ enabled = true # Starts Mediator service: . [session_manager] enabled = true # Starts SessionManager service: . # switch_type = freeswitch # Defines the type of switch behind: . -# rater = 127.0.0.1:2012 # Address where to reach the Rater. +# rater = internal # Address where to reach the Rater. # rater_reconnects = 3 # Number of reconnects to rater before giving up. # debit_interval = 5 # Interval to perform debits on. @@ -106,11 +105,16 @@ enabled = true # Starts SessionManager service: . [history_server] enabled = true # Starts History service: . -# listen = 127.0.0.1:2013 # Listening addres for history server: history_dir = /tmp/cgr_history # Location on disk where to store history files. # save_interval = 1s # Interval to save changed cache into .git archive [history_agent] # enabled = false # Starts History as a client: . -# server = 127.0.0.1:2013 # Address where to reach the master history server: +# server = internal # Address where to reach the master history server: + +[mailer] +# server = localhost # The server to use when sending emails out +# auth_user = cgrates # Authenticate to email server using this user +# auth_passwd = CGRateS.org # Authenticate to email server with this password +# from_address = cgr-mailer@localhost.localdomain # From address used when sending emails out diff --git a/data/tutorials/fs_csv/cgrates/tariffplans/AccountActions.csv b/data/tutorials/fs_csv/cgrates/tariffplans/AccountActions.csv index 455f5318c..e00c681bf 100644 --- a/data/tutorials/fs_csv/cgrates/tariffplans/AccountActions.csv +++ b/data/tutorials/fs_csv/cgrates/tariffplans/AccountActions.csv @@ -1,4 +1,4 @@ -#Tenant,Account,Direction,ActionTimingsTag,ActionTriggersTag +#Tenant,Account,Direction,ActionPlanTag,ActionTriggersTag cgrates.org,1001,*out,PREPAID_10,STANDARD_TRIGGERS cgrates.org,1002,*out,PREPAID_10,STANDARD_TRIGGERS cgrates.org,1003,*out,PREPAID_10,STANDARD_TRIGGERS diff --git a/data/tutorials/fs_json/cgrates/etc/cgrates/cgrates.cfg b/data/tutorials/fs_json/cgrates/etc/cgrates/cgrates.cfg index 1ac42f06e..a40598797 100644 --- a/data/tutorials/fs_json/cgrates/etc/cgrates/cgrates.cfg +++ b/data/tutorials/fs_json/cgrates/etc/cgrates/cgrates.cfg @@ -24,11 +24,13 @@ # stordb_user = cgrates # Username to use when connecting to stordb. # stordb_passwd = CGRateS.org # Password to use when connecting to stordb. # dbdata_encoding = msgpack # The encoding used to store object data in strings: -# rpc_encoding = json # RPC encoding used on APIs: . +# rpc_json_listen = 127.0.0.1:2012 # RPC JSON listening address +# rpc_gob_listen = 127.0.0.1:2013 # RPC GOB listening address +# http_listen = 127.0.0.1:2080 # HTTP listening address # default_reqtype = rated # Default request type to consider when missing from requests: <""|prepaid|postpaid|pseudoprepaid|rated>. -# default_tor = 0 # Default Type of Record to consider when missing from requests. -# default_tenant = 0 # Default Tenant to consider when missing from requests. -# default_subject = 0 # Default rating Subject to consider when missing from requests. +# default_tor = call # Default Type of Record to consider when missing from requests. +# default_tenant = cgrates.org # Default Tenant to consider when missing from requests. +# default_subject = cgrates # Default rating Subject to consider when missing from requests. # rounding_method = *middle # Rounding method for floats/costs: <*up|*middle|*down> # rounding_decimals = 4 # Number of decimals to round float/costs at @@ -38,15 +40,13 @@ [rater] enabled = true # Enable RaterCDRSExportPath service: . -# balancer = disabled # Register to Balancer as worker: . -# listen = 127.0.0.1:2012 # Rater's listening interface: . +# balancer = # Register to Balancer as worker: <""|internal|127.0.0.1:2013>. [scheduler] enabled = true # Starts Scheduler service: . [cdrs] enabled = true # Start the CDR Server service: . -# listen=127.0.0.1:2022 # CDRS's listening interface: . # extra_fields = # Extra fields to store in CDRs mediator = internal # Address where to reach the Mediator. Empty for disabling mediation. <""|internal> @@ -57,29 +57,28 @@ export_dir = /tmp # Path where the exported CDRs will be placed [cdrc] # enabled = false # Enable CDR client functionality -# cdrs = 127.0.0.1:2022 # Address where to reach CDR server +# cdrs = internal # Address where to reach CDR server. # cdrs_method = http_cgr # Mechanism to use when posting CDRs on server # run_delay = 0 # Sleep interval in seconds between consecutive runs, 0 to use automation via inotify -# cdr_type = freeswitch_csv # CDR file format . +# cdr_type = csv # CDR file format . # cdr_in_dir = /var/log/cgrates/cdr/cdrc/in # Absolute path towards the directory where the CDRs are stored. -# cdr_out_dir = /tmp # Absolute path towards the directory where processed CDRs will be moved. +# cdr_out_dir = /var/log/cgrates/cdr/cdrc/out # Absolute path towards the directory where processed CDRs will be moved. # cdr_source_id = freeswitch_csv # Free form field, tag identifying the source of the CDRs within CGRS database. -# accid_field = 10 # Accounting id field identifier. Use index number in case of .csv cdrs. -# reqtype_field = 16 # Request type field identifier. Use index number in case of .csv cdrs. -# direction_field = ^*out # Direction field identifier. Use index numbers in case of .csv cdrs. -# tenant_field = ^cgrates.org # Tenant field identifier. Use index numbers in case of .csv cdrs. -# tor_field = ^call # Type of Record field identifier. Use index numbers in case of .csv cdrs. -# account_field = 1 # Account field identifier. Use index numbers in case of .csv cdrs. -# subject_field = 1 # Subject field identifier. Use index numbers in case of .csv CDRs. -# destination_field = 2 # Destination field identifier. Use index numbers in case of .csv cdrs. -# answer_time_field = 5 # Answer time field identifier. Use index numbers in case of .csv cdrs. -# duration_field = 8 # Duration field identifier. Use index numbers in case of .csv cdrs. -# extra_fields = read_codec:13,write_codec:14 # Extra fields identifiers. For .csv, format: : +# accid_field = 0 # Accounting id field identifier. Use index number in case of .csv cdrs. +# reqtype_field = 1 # Request type field identifier. Use index number in case of .csv cdrs. +# direction_field = 2 # Direction field identifier. Use index numbers in case of .csv cdrs. +# tenant_field = 3 # Tenant field identifier. Use index numbers in case of .csv cdrs. +# tor_field = 4 # Type of Record field identifier. Use index numbers in case of .csv cdrs. +# account_field = 5 # Account field identifier. Use index numbers in case of .csv cdrs. +# subject_field = 6 # Subject field identifier. Use index numbers in case of .csv CDRs. +# destination_field = 7 # Destination field identifier. Use index numbers in case of .csv cdrs. +# answer_time_field = 8 # Answer time field identifier. Use index numbers in case of .csv cdrs. +# duration_field = 9 # Duration field identifier. Use index numbers in case of .csv cdrs. +# extra_fields = # Extra fields identifiers. For .csv, format: :[...,:] [mediator] enabled = true # Starts Mediator service: . -# listen=internal # Mediator's listening interface: . -# rater = 127.0.0.1:2012 # Address where to reach the Rater: +# rater = internal # Address where to reach the Rater: # rater_reconnects = 3 # Number of reconnects to rater before giving up. # run_ids = # Identifiers of each extra mediation to run on CDRs # reqtype_fields = # Name of request type fields to be used during extra mediation. Use index number in case of .csv cdrs. @@ -95,7 +94,7 @@ enabled = true # Starts Mediator service: . [session_manager] enabled = true # Starts SessionManager service: . # switch_type = freeswitch # Defines the type of switch behind: . -# rater = 127.0.0.1:2012 # Address where to reach the Rater. +# rater = internal # Address where to reach the Rater. # rater_reconnects = 3 # Number of reconnects to rater before giving up. # debit_interval = 5 # Interval to perform debits on. @@ -106,11 +105,16 @@ enabled = true # Starts SessionManager service: . [history_server] enabled = true # Starts History service: . -# listen = 127.0.0.1:2013 # Listening addres for history server: history_dir = /tmp/cgr_history # Location on disk where to store history files. # save_interval = 1s # Interval to save changed cache into .git archive [history_agent] # enabled = false # Starts History as a client: . -# server = 127.0.0.1:2013 # Address where to reach the master history server: +# server = internal # Address where to reach the master history server: + +[mailer] +# server = localhost # The server to use when sending emails out +# auth_user = cgrates # Authenticate to email server using this user +# auth_passwd = CGRateS.org # Authenticate to email server with this password +# from_address = cgr-mailer@localhost.localdomain # From address used when sending emails out diff --git a/data/tutorials/fs_json/cgrates/tariffplans/AccountActions.csv b/data/tutorials/fs_json/cgrates/tariffplans/AccountActions.csv index 455f5318c..e00c681bf 100644 --- a/data/tutorials/fs_json/cgrates/tariffplans/AccountActions.csv +++ b/data/tutorials/fs_json/cgrates/tariffplans/AccountActions.csv @@ -1,4 +1,4 @@ -#Tenant,Account,Direction,ActionTimingsTag,ActionTriggersTag +#Tenant,Account,Direction,ActionPlanTag,ActionTriggersTag cgrates.org,1001,*out,PREPAID_10,STANDARD_TRIGGERS cgrates.org,1002,*out,PREPAID_10,STANDARD_TRIGGERS cgrates.org,1003,*out,PREPAID_10,STANDARD_TRIGGERS diff --git a/engine/action.go b/engine/action.go index dd93249d6..9360ba7a9 100644 --- a/engine/action.go +++ b/engine/action.go @@ -21,15 +21,15 @@ package engine import ( "bytes" "encoding/json" - "fmt" "errors" + "fmt" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" "net/http" "net/smtp" "sort" - "time" "strings" - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/utils" + "time" ) /* @@ -101,7 +101,7 @@ func getActionFunc(typ string) (actionTypeFunc, bool) { } func logAction(ub *UserBalance, a *Action) (err error) { - ubMarshal,_ := json.Marshal(ub) + ubMarshal, _ := json.Marshal(ub) Logger.Info(fmt.Sprintf("Threshold reached, balance: %s", ubMarshal)) return } @@ -214,7 +214,7 @@ func callUrlAsync(ub *UserBalance, a *Action) error { } time.Sleep(time.Duration(i) * time.Minute) } - + }() return nil } @@ -238,8 +238,8 @@ func mailAsync(ub *UserBalance, a *Action) error { } toAddrStr += addr } - message := []byte(fmt.Sprintf("To: %s\r\nSubject: [CGR Notification] Threshold hit on balance: %s\r\n\r\nTime: \r\n\t%s\r\n\r\nBalance:\r\n\t%s\r\n\r\nYours faithfully,\r\nCGR Balance Monitor\r\n", toAddrStr, ub.Id, time.Now(), ubJson)) - auth := smtp.PlainAuth("", cgrCfg.MailerAuthUser, cgrCfg.MailerAuthPass, strings.Split(cgrCfg.MailerServer,":")[0]) // We only need host part, so ignore port + message := []byte(fmt.Sprintf("To: %s\r\nSubject: [CGR Notification] Threshold hit on balance: %s\r\n\r\nTime: \r\n\t%s\r\n\r\nBalance:\r\n\t%s\r\n\r\nYours faithfully,\r\nCGR Balance Monitor\r\n", toAddrStr, ub.Id, time.Now(), ubJson)) + auth := smtp.PlainAuth("", cgrCfg.MailerAuthUser, cgrCfg.MailerAuthPass, strings.Split(cgrCfg.MailerServer, ":")[0]) // We only need host part, so ignore port go func() { for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort if err := smtp.SendMail(cgrCfg.MailerServer, auth, cgrCfg.MailerFromAddr, toAddrs, message); err == nil { @@ -250,11 +250,10 @@ func mailAsync(ub *UserBalance, a *Action) error { } time.Sleep(time.Duration(i) * time.Minute) } - }() + }() return nil } - - + // Structure to store actions according to weight type Actions []*Action diff --git a/engine/action_timing.go b/engine/action_timing.go index 5df8db1bb..b5d362ea1 100644 --- a/engine/action_timing.go +++ b/engine/action_timing.go @@ -20,11 +20,11 @@ package engine import ( "fmt" + "github.com/cgrates/cgrates/utils" "sort" "strconv" "strings" "time" - "github.com/cgrates/cgrates/utils" ) const ( @@ -299,12 +299,12 @@ func (at *ActionTiming) String_DISABLED() string { // Helper to remove ActionTiming members based on specific filters, empty data means no always match func RemActionTiming(ats ActionPlan, actionTimingId, balanceId string) ActionPlan { for idx, at := range ats { - if len(actionTimingId)!=0 && at.Id!=actionTimingId { // No Match for ActionTimingId, no need to move further + if len(actionTimingId) != 0 && at.Id != actionTimingId { // No Match for ActionTimingId, no need to move further continue } if len(balanceId) == 0 { // No account defined, considered match for complete removal if len(ats) == 1 { // Removing last item, by init empty - return make([]*ActionTiming,0) + return make([]*ActionTiming, 0) } ats[idx], ats = ats[len(ats)-1], ats[:len(ats)-1] continue @@ -313,9 +313,9 @@ func RemActionTiming(ats ActionPlan, actionTimingId, balanceId string) ActionPla if blncId == balanceId { if len(at.UserBalanceIds) == 1 { // Only one balance, remove complete at if len(ats) == 1 { // Removing last item, by init empty - return make([]*ActionTiming,0) + return make([]*ActionTiming, 0) } - ats[idx], ats = ats[len(ats)-1], ats[:len(ats)-1] + ats[idx], ats = ats[len(ats)-1], ats[:len(ats)-1] } else { at.UserBalanceIds[iBlnc], at.UserBalanceIds = at.UserBalanceIds[len(at.UserBalanceIds)-1], at.UserBalanceIds[:len(at.UserBalanceIds)-1] } diff --git a/engine/actions_test.go b/engine/actions_test.go index 57c8bc9b3..1a8dcad2b 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -427,21 +427,21 @@ func TestActionTimingsRemoveMember(t *testing.T) { Tag: "test", UserBalanceIds: []string{"one", "two", "three"}, ActionsId: "TEST_ACTIONS", - } + } at2 := &ActionTiming{ Id: "some uuid22", Tag: "test2", UserBalanceIds: []string{"three", "four"}, ActionsId: "TEST_ACTIONS2", - } + } ats := ActionPlan{at1, at2} - if outAts := RemActionTiming(ats, "", "four"); len(outAts[1].UserBalanceIds) != 1 { + if outAts := RemActionTiming(ats, "", "four"); len(outAts[1].UserBalanceIds) != 1 { t.Error("Expecting fewer balance ids", outAts[1].UserBalanceIds) } if ats = RemActionTiming(ats, "", "three"); len(ats) != 1 { t.Error("Expecting fewer actionTimings", ats) } - if ats = RemActionTiming(ats, "some_uuid22", "");len(ats) != 1 { + if ats = RemActionTiming(ats, "some_uuid22", ""); len(ats) != 1 { t.Error("Expecting fewer actionTimings members", ats) } ats2 := ActionPlan{at1, at2} @@ -450,8 +450,6 @@ func TestActionTimingsRemoveMember(t *testing.T) { } } - - func TestActionTriggerMatchNil(t *testing.T) { at := &ActionTrigger{ Direction: OUTBOUND, diff --git a/engine/destinations.go b/engine/destinations.go index e4e9162e1..baf4e06fa 100644 --- a/engine/destinations.go +++ b/engine/destinations.go @@ -18,7 +18,12 @@ along with this program. If not, see package engine -import "strings" +import ( + "encoding/json" + "strings" + + "github.com/cgrates/cgrates/history" +) /* Structure that gathers multiple destination prefixes under a common id. @@ -53,3 +58,13 @@ func (d *Destination) String() (result string) { func (d *Destination) AddPrefix(pfx string) { d.Prefixes = append(d.Prefixes, pfx) } + +// history record method +func (d *Destination) GetHistoryRecord() history.Record { + js, _ := json.Marshal(d) + return history.Record{ + Id: d.Id, + Filename: history.DESTINATIONS_FN, + Payload: js, + } +} diff --git a/engine/history_test.go b/engine/history_test.go index a432493dd..aa599ae6e 100644 --- a/engine/history_test.go +++ b/engine/history_test.go @@ -27,24 +27,26 @@ import ( func TestHistoryRatinPlans(t *testing.T) { scribe := historyScribe.(*history.MockScribe) - if !strings.Contains(scribe.RpBuf.String(), `{"Key":"*out:vdf:0:minu","Object":{"Id":"*out:vdf:0:minu","RatingPlanActivations":[{"ActivationTime":"2012-01-01T00:00:00Z","RatingPlanId":"EVENING","FallbackKeys":null}]}}`) { - t.Error("Error in destination history content:", scribe.RpBuf.String()) + buf := scribe.BufMap[history.RATING_PROFILES_FN] + if !strings.Contains(buf.String(), `{"Id":"*out:vdf:0:minu","RatingPlanActivations":[{"ActivationTime":"2012-01-01T00:00:00Z","RatingPlanId":"EVENING","FallbackKeys":null}]}`) { + t.Error("Error in destination history content:", buf.String()) } } func TestHistoryDestinations(t *testing.T) { scribe := historyScribe.(*history.MockScribe) - expected := `[{"Key":"ALL","Object":{"Id":"ALL","Prefixes":["49","41","43"]}} -{"Key":"GERMANY","Object":{"Id":"GERMANY","Prefixes":["49"]}} -{"Key":"GERMANY_O2","Object":{"Id":"GERMANY_O2","Prefixes":["41"]}} -{"Key":"GERMANY_PREMIUM","Object":{"Id":"GERMANY_PREMIUM","Prefixes":["43"]}} -{"Key":"NAT","Object":{"Id":"NAT","Prefixes":["0256","0257","0723"]}} -{"Key":"PSTN_70","Object":{"Id":"PSTN_70","Prefixes":["+4970"]}} -{"Key":"PSTN_71","Object":{"Id":"PSTN_71","Prefixes":["+4971"]}} -{"Key":"PSTN_72","Object":{"Id":"PSTN_72","Prefixes":["+4972"]}} -{"Key":"RET","Object":{"Id":"RET","Prefixes":["0723","0724"]}} -{"Key":"nat","Object":{"Id":"nat","Prefixes":["0257","0256","0723"]}}]` - if scribe.DestBuf.String() != expected { - t.Error("Error in destination history content:", scribe.DestBuf.String()) + buf := scribe.BufMap[history.DESTINATIONS_FN] + expected := `[{"Id":"ALL","Prefixes":["49","41","43"]}, +{"Id":"GERMANY","Prefixes":["49"]}, +{"Id":"GERMANY_O2","Prefixes":["41"]}, +{"Id":"GERMANY_PREMIUM","Prefixes":["43"]}, +{"Id":"NAT","Prefixes":["0256","0257","0723"]}, +{"Id":"PSTN_70","Prefixes":["+4970"]}, +{"Id":"PSTN_71","Prefixes":["+4971"]}, +{"Id":"PSTN_72","Prefixes":["+4972"]}, +{"Id":"RET","Prefixes":["0723","0724"]}, +{"Id":"nat","Prefixes":["0257","0256","0723"]}]` + if buf.String() != expected { + t.Error("Error in destination history content:", buf.String()) } } diff --git a/engine/ratingplan.go b/engine/ratingplan.go index 5db3fc4b7..d0ddf2249 100644 --- a/engine/ratingplan.go +++ b/engine/ratingplan.go @@ -18,6 +18,12 @@ along with this program. If not, see package engine +import ( + "encoding/json" + + "github.com/cgrates/cgrates/history" +) + /* The struture that is saved to storage. */ @@ -97,3 +103,13 @@ func (rp *RatingPlan) AddRateInterval(dId string, ris ...*RateInterval) { func (rp *RatingPlan) Equal(o *RatingPlan) bool { return rp.Id == o.Id } + +// history record method +func (rp *RatingPlan) GetHistoryRecord() history.Record { + js, _ := json.Marshal(rp) + return history.Record{ + Id: rp.Id, + Filename: history.RATING_PLANS_FN, + Payload: js, + } +} diff --git a/engine/ratingprofile.go b/engine/ratingprofile.go index 869c55ed1..12bd6ea5e 100644 --- a/engine/ratingprofile.go +++ b/engine/ratingprofile.go @@ -19,12 +19,14 @@ along with this program. If not, see package engine import ( + "encoding/json" "errors" "fmt" "sort" "time" "github.com/cgrates/cgrates/cache2go" + "github.com/cgrates/cgrates/history" "github.com/cgrates/cgrates/utils" ) @@ -146,3 +148,13 @@ func (rp *RatingProfile) GetRatingPlansForPrefix(cd *CallDescriptor) (err error) return errors.New("not found") } + +// history record method +func (rpf *RatingProfile) GetHistoryRecord() history.Record { + js, _ := json.Marshal(rpf) + return history.Record{ + Id: rpf.Id, + Filename: history.RATING_PROFILES_FN, + Payload: js, + } +} diff --git a/engine/storage_map.go b/engine/storage_map.go index 0c0a6b8a9..d2c0810a3 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -25,7 +25,6 @@ import ( "strings" "time" "github.com/cgrates/cgrates/cache2go" - "github.com/cgrates/cgrates/history" "github.com/cgrates/cgrates/utils" ) @@ -139,7 +138,8 @@ func (ms *MapStorage) SetRatingPlan(rp *RatingPlan) (err error) { result, err := ms.ms.Marshal(rp) ms.dict[RATING_PLAN_PREFIX+rp.Id] = result response := 0 - go historyScribe.Record(&history.Record{Key: RATING_PLAN_PREFIX + rp.Id, Object: rp}, &response) + + go historyScribe.Record(rp.GetHistoryRecord(), &response) cache2go.Cache(RATING_PLAN_PREFIX+rp.Id, rp) return } @@ -167,7 +167,7 @@ func (ms *MapStorage) SetRatingProfile(rpf *RatingProfile) (err error) { result, err := ms.ms.Marshal(rpf) ms.dict[RATING_PROFILE_PREFIX+rpf.Id] = result response := 0 - go historyScribe.Record(&history.Record{Key: RATING_PROFILE_PREFIX + rpf.Id, Object: rpf}, &response) + go historyScribe.Record(rpf.GetHistoryRecord(), &response) cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf) return } @@ -196,7 +196,7 @@ func (ms *MapStorage) SetDestination(dest *Destination) (err error) { result, err := ms.ms.Marshal(dest) ms.dict[DESTINATION_PREFIX+dest.Id] = result response := 0 - go historyScribe.Record(&history.Record{Key: DESTINATION_PREFIX + dest.Id, Object: dest}, &response) + go historyScribe.Record(dest.GetHistoryRecord(), &response) cache2go.Cache(DESTINATION_PREFIX+dest.Id, dest) return } diff --git a/engine/storage_mongo.go b/engine/storage_mongo.go index dfe22954d..f136ad71b 100644 --- a/engine/storage_mongo.go +++ b/engine/storage_mongo.go @@ -23,7 +23,6 @@ import ( "log" "time" - "github.com/cgrates/cgrates/history" "labix.org/v2/mgo" "labix.org/v2/mgo/bson" ) @@ -138,7 +137,7 @@ func (ms *MongoStorage) GetRatingPlan(key string) (rp *RatingPlan, err error) { func (ms *MongoStorage) SetRatingPlan(rp *RatingPlan) error { if historyScribe != nil { response := 0 - historyScribe.Record(&history.Record{Key: RATING_PLAN_PREFIX + rp.Id, Object: rp}, &response) + historyScribe.Record(rp.GetHistoryRecord(), &response) } return ms.db.C("ratingplans").Insert(rp) } @@ -152,7 +151,7 @@ func (ms *MongoStorage) GetRatingProfile(key string) (rp *RatingProfile, err err func (ms *MongoStorage) SetRatingProfile(rp *RatingProfile) error { if historyScribe != nil { response := 0 - historyScribe.Record(&history.Record{Key: RATING_PROFILE_PREFIX + rp.Id, Object: rp}, &response) + historyScribe.Record(rp.GetHistoryRecord(), &response) } return ms.db.C("ratingprofiles").Insert(rp) } @@ -169,7 +168,7 @@ func (ms *MongoStorage) GetDestination(key string) (result *Destination, err err func (ms *MongoStorage) SetDestination(dest *Destination) error { if historyScribe != nil { response := 0 - historyScribe.Record(&history.Record{Key: DESTINATION_PREFIX + dest.Id, Object: dest}, &response) + historyScribe.Record(dest.GetHistoryRecord(), &response) } return ms.db.C("destinations").Insert(dest) } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index fd99a5d91..d874cd121 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -25,7 +25,6 @@ import ( "fmt" "github.com/cgrates/cgrates/cache2go" - "github.com/cgrates/cgrates/history" "github.com/cgrates/cgrates/utils" "github.com/hoisie/redis" @@ -214,7 +213,7 @@ func (rs *RedisStorage) SetRatingPlan(rp *RatingPlan) (err error) { err = rs.db.Set(RATING_PLAN_PREFIX+rp.Id, b.Bytes()) if err == nil && historyScribe != nil { response := 0 - go historyScribe.Record(&history.Record{Key: RATING_PLAN_PREFIX + rp.Id, Object: rp}, &response) + go historyScribe.Record(rp.GetHistoryRecord(), &response) } //cache2go.Cache(RATING_PLAN_PREFIX+rp.Id, rp) return @@ -242,7 +241,7 @@ func (rs *RedisStorage) SetRatingProfile(rpf *RatingProfile) (err error) { err = rs.db.Set(RATING_PROFILE_PREFIX+rpf.Id, result) if err == nil && historyScribe != nil { response := 0 - go historyScribe.Record(&history.Record{Key: RATING_PROFILE_PREFIX + rpf.Id, Object: rpf}, &response) + go historyScribe.Record(rpf.GetHistoryRecord(), &response) } //cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf) return @@ -291,7 +290,7 @@ func (rs *RedisStorage) SetDestination(dest *Destination) (err error) { err = rs.db.Set(DESTINATION_PREFIX+dest.Id, b.Bytes()) if err == nil && historyScribe != nil { response := 0 - go historyScribe.Record(&history.Record{Key: DESTINATION_PREFIX + dest.Id, Object: dest}, &response) + go historyScribe.Record(dest.GetHistoryRecord(), &response) } //cache2go.Cache(DESTINATION_PREFIX+dest.Id, dest) return diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 6eab69d07..69d31afc7 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -1129,7 +1129,7 @@ func (self *SQLStorage) GetTpAccountActions(aaFltr *utils.TPAccountActions) (map Tenant: tenant, Account: account, Direction: direction, - ActionPlanId: action_timings_tag, + ActionPlanId: action_timings_tag, ActionTriggersId: action_triggers_tag, } aa[aacts.KeyId()] = aacts diff --git a/engine/tpimporter_csv.go b/engine/tpimporter_csv.go index 5fc8bb9a5..48499b987 100644 --- a/engine/tpimporter_csv.go +++ b/engine/tpimporter_csv.go @@ -47,7 +47,7 @@ var fileHandlers = map[string]func(*TPCSVImporter, string) error{ utils.RATING_PLANS_CSV: (*TPCSVImporter).importRatingPlans, utils.RATING_PROFILES_CSV: (*TPCSVImporter).importRatingProfiles, utils.ACTIONS_CSV: (*TPCSVImporter).importActions, - utils.ACTION_PLANS_CSV: (*TPCSVImporter).importActionTimings, + utils.ACTION_PLANS_CSV: (*TPCSVImporter).importActionTimings, utils.ACTION_TRIGGERS_CSV: (*TPCSVImporter).importActionTriggers, utils.ACCOUNT_ACTIONS_CSV: (*TPCSVImporter).importAccountActions, } diff --git a/history/file_scribe.go b/history/file_scribe.go index 0db5ec0f9..5694a97dc 100644 --- a/history/file_scribe.go +++ b/history/file_scribe.go @@ -22,31 +22,22 @@ import ( "bufio" "encoding/json" "errors" + "fmt" "io" "os" "os/exec" "path/filepath" - "strings" "sync" "time" ) -const ( - DESTINATIONS_FILE = "destinations.json" - RATING_PLANS_FILE = "rating_plans.json" - RATING_PROFILES_FILE = "rating_profiles.json" -) - type FileScribe struct { - mu sync.Mutex - fileRoot string - gitCommand string - destinations records - ratingPlans records - ratingProfiles records - loopChecker chan int - waitingFile string - savePeriod time.Duration + mu sync.Mutex + fileRoot string + gitCommand string + loopChecker chan int + waitingFile string + savePeriod time.Duration } func NewFileScribe(fileRoot string, saveInterval time.Duration) (*FileScribe, error) { @@ -58,32 +49,19 @@ func NewFileScribe(fileRoot string, saveInterval time.Duration) (*FileScribe, er s := &FileScribe{fileRoot: fileRoot, gitCommand: gitCommand, savePeriod: saveInterval} s.loopChecker = make(chan int) s.gitInit() - if err := s.load(DESTINATIONS_FILE); err != nil { - return nil, err - } - if err := s.load(RATING_PLANS_FILE); err != nil { - return nil, err - } - if err := s.load(RATING_PROFILES_FILE); err != nil { - return nil, err + + for _, fn := range []string{DESTINATIONS_FN, RATING_PLANS_FN, RATING_PROFILES_FN} { + if err := s.load(fn); err != nil { + return nil, err + } } return s, nil } -func (s *FileScribe) Record(rec *Record, out *int) error { +func (s *FileScribe) Record(rec Record, out *int) error { s.mu.Lock() - var fileToSave string - switch { - case strings.HasPrefix(rec.Key, DESTINATION_PREFIX): - s.destinations = s.destinations.SetOrAdd(&Record{rec.Key[len(DESTINATION_PREFIX):], rec.Object}) - fileToSave = DESTINATIONS_FILE - case strings.HasPrefix(rec.Key, RATING_PLAN_PREFIX): - s.ratingPlans = s.ratingPlans.SetOrAdd(&Record{rec.Key[len(RATING_PLAN_PREFIX):], rec.Object}) - fileToSave = RATING_PLANS_FILE - case strings.HasPrefix(rec.Key, RATING_PROFILE_PREFIX): - s.ratingProfiles = s.ratingProfiles.SetOrAdd(&Record{rec.Key[len(RATING_PROFILE_PREFIX):], rec.Object}) - fileToSave = RATING_PROFILES_FILE - } + fileToSave := rec.Filename + recordsMap[fileToSave] = recordsMap[fileToSave].SetOrAdd(&rec) // flood protection for save method (do not save on every loop iteration) if s.waitingFile == fileToSave { @@ -126,21 +104,14 @@ func (s *FileScribe) gitInit() error { if out, err := cmd.Output(); err != nil { return errors.New(string(out) + " " + err.Error()) } - if f, err := os.Create(filepath.Join(s.fileRoot, DESTINATIONS_FILE)); err != nil { - return errors.New(" Error writing destinations file: " + err.Error()) - } else { - f.Close() - } - if f, err := os.Create(filepath.Join(s.fileRoot, RATING_PLANS_FILE)); err != nil { - return errors.New(" Error writing rating plans file: " + err.Error()) - } else { - f.Close() - } - if f, err := os.Create(filepath.Join(s.fileRoot, RATING_PROFILES_FILE)); err != nil { - return errors.New(" Error writing rating profiles file: " + err.Error()) - } else { - f.Close() + for fn, _ := range recordsMap { + if f, err := os.Create(filepath.Join(s.fileRoot, fn)); err != nil { + return fmt.Errorf(" Error writing %s file: %s", fn, err.Error()) + } else { + f.Close() + } } + cmd = exec.Command(s.gitCommand, "add", ".") cmd.Dir = s.fileRoot if out, err := cmd.Output(); err != nil { @@ -169,23 +140,11 @@ func (s *FileScribe) load(filename string) error { defer f.Close() d := json.NewDecoder(f) - switch filename { - case DESTINATIONS_FILE: - if err := d.Decode(&s.destinations); err != nil && err != io.EOF { - return errors.New(" Error loading destinations: " + err.Error()) - } - s.destinations.Sort() - case RATING_PLANS_FILE: - if err := d.Decode(&s.ratingPlans); err != nil && err != io.EOF { - return errors.New(" Error loading rating plans: " + err.Error()) - } - s.ratingPlans.Sort() - case RATING_PROFILES_FILE: - if err := d.Decode(&s.ratingProfiles); err != nil && err != io.EOF { - return errors.New(" Error loading rating profiles: " + err.Error()) - } - s.ratingProfiles.Sort() + records := recordsMap[filename] + if err := d.Decode(&records); err != nil && err != io.EOF { + return fmt.Errorf(" Error loading %s: %s", filename, err.Error()) } + records.Sort() return nil } @@ -198,38 +157,11 @@ func (s *FileScribe) save(filename string) error { } b := bufio.NewWriter(f) - switch filename { - case DESTINATIONS_FILE: - if err := s.format(b, s.destinations); err != nil { - return err - } - case RATING_PLANS_FILE: - if err := s.format(b, s.ratingPlans); err != nil { - return err - } - case RATING_PROFILES_FILE: - if err := s.format(b, s.ratingProfiles); err != nil { - return err - } + records := recordsMap[filename] + if err := format(b, records); err != nil { + return err } b.Flush() f.Close() return s.gitCommit() } - -func (s *FileScribe) format(b io.Writer, recs records) error { - recs.Sort() - b.Write([]byte("[")) - for i, r := range recs { - src, err := json.Marshal(r) - if err != nil { - return err - } - b.Write(src) - if i < len(recs)-1 { - b.Write([]byte(",\n")) - } - } - b.Write([]byte("]")) - return nil -} diff --git a/history/mock_scribe.go b/history/mock_scribe.go index 0f1caf649..808298a8e 100644 --- a/history/mock_scribe.go +++ b/history/mock_scribe.go @@ -21,85 +21,38 @@ package history import ( "bufio" "bytes" - "encoding/json" - "io" - "strings" "sync" ) type MockScribe struct { - mu sync.Mutex - destinations records - ratingPlans records - ratingProfiles records - DestBuf bytes.Buffer - RplBuf bytes.Buffer - RpBuf bytes.Buffer + mu sync.Mutex + BufMap map[string]*bytes.Buffer } func NewMockScribe() (*MockScribe, error) { - return &MockScribe{}, nil + return &MockScribe{BufMap: map[string]*bytes.Buffer{ + DESTINATIONS_FN: bytes.NewBuffer(nil), + RATING_PLANS_FN: bytes.NewBuffer(nil), + RATING_PROFILES_FN: bytes.NewBuffer(nil), + }}, nil } -func (s *MockScribe) Record(rec *Record, out *int) error { - switch { - case strings.HasPrefix(rec.Key, DESTINATION_PREFIX): - s.destinations = s.destinations.SetOrAdd(&Record{rec.Key[len(DESTINATION_PREFIX):], rec.Object}) - s.save(DESTINATIONS_FILE) - case strings.HasPrefix(rec.Key, RATING_PLAN_PREFIX): - s.ratingPlans = s.ratingPlans.SetOrAdd(&Record{rec.Key[len(RATING_PLAN_PREFIX):], rec.Object}) - s.save(RATING_PLANS_FILE) - case strings.HasPrefix(rec.Key, RATING_PROFILE_PREFIX): - s.ratingProfiles = s.ratingProfiles.SetOrAdd(&Record{rec.Key[len(RATING_PROFILE_PREFIX):], rec.Object}) - s.save(RATING_PROFILES_FILE) - } - *out = 0 +func (s *MockScribe) Record(rec Record, out *int) error { + fn := rec.Filename + recordsMap[fn] = recordsMap[fn].SetOrAdd(&rec) + s.save(fn) return nil } func (s *MockScribe) save(filename string) error { s.mu.Lock() defer s.mu.Unlock() - switch filename { - case DESTINATIONS_FILE: - s.DestBuf.Reset() - b := bufio.NewWriter(&s.DestBuf) - defer b.Flush() - if err := s.format(b, s.destinations); err != nil { - return err - } - case RATING_PLANS_FILE: - s.RplBuf.Reset() - b := bufio.NewWriter(&s.RplBuf) - defer b.Flush() - if err := s.format(b, s.ratingPlans); err != nil { - return err - } - case RATING_PROFILES_FILE: - s.RpBuf.Reset() - b := bufio.NewWriter(&s.RpBuf) - defer b.Flush() - if err := s.format(b, s.ratingProfiles); err != nil { - return err - } + records := recordsMap[filename] + s.BufMap[filename].Reset() + b := bufio.NewWriter(s.BufMap[filename]) + defer b.Flush() + if err := format(b, records); err != nil { + return err } - - return nil -} - -func (s *MockScribe) format(b io.Writer, recs records) error { - recs.Sort() - b.Write([]byte("[")) - for i, r := range recs { - src, err := json.Marshal(r) - if err != nil { - return err - } - b.Write(src) - if i < len(recs)-1 { - b.Write([]byte("\n")) - } - } - b.Write([]byte("]")) return nil } diff --git a/history/proxy_scribe.go b/history/proxy_scribe.go index 31fd3efb6..0d5b1b3dd 100644 --- a/history/proxy_scribe.go +++ b/history/proxy_scribe.go @@ -20,11 +20,6 @@ package history import "net/rpc" -const ( - JSON = "json" - GOB = "gob" -) - type ProxyScribe struct { Client *rpc.Client } @@ -38,6 +33,6 @@ func NewProxyScribe(addr string) (*ProxyScribe, error) { return &ProxyScribe{Client: client}, nil } -func (ps *ProxyScribe) Record(rec *Record, out *int) error { +func (ps *ProxyScribe) Record(rec Record, out *int) error { return ps.Client.Call("Scribe.Record", rec, out) } diff --git a/history/scribe.go b/history/scribe.go index 8391feb0e..e1bb9c3a4 100644 --- a/history/scribe.go +++ b/history/scribe.go @@ -19,26 +19,34 @@ along with this program. If not, see package history import ( + "io" + "reflect" "sort" ) const ( - RATING_PLAN_PREFIX = "rpl_" - RATING_PROFILE_PREFIX = "rpf_" - DESTINATION_PREFIX = "dst_" + DESTINATIONS_FN = "destinations.json" + RATING_PLANS_FN = "rating_plans.json" + RATING_PROFILES_FN = "rating_profiles.json" ) type Scribe interface { - Record(record *Record, out *int) error + Record(Record, *int) error } type Record struct { - Key string - Object interface{} + Id string + Filename string + Payload []byte } type records []*Record +var ( + recordsMap = make(map[string]records) + filenameMap = make(map[reflect.Type]string) +) + func (rs records) Len() int { return len(rs) } @@ -48,7 +56,7 @@ func (rs records) Swap(i, j int) { } func (rs records) Less(i, j int) bool { - return rs[i].Key < rs[j].Key + return rs[i].Id < rs[j].Id } func (rs records) Sort() { @@ -58,9 +66,9 @@ func (rs records) Sort() { func (rs records) SetOrAdd(rec *Record) records { //rs.Sort() n := len(rs) - i := sort.Search(n, func(i int) bool { return rs[i].Key >= rec.Key }) - if i < n && rs[i].Key == rec.Key { - rs[i].Object = rec.Object + i := sort.Search(n, func(i int) bool { return rs[i].Id >= rec.Id }) + if i < n && rs[i].Id == rec.Id { + rs[i] = rec } else { // i is the index where it would be inserted. rs = append(rs, nil) @@ -70,17 +78,15 @@ func (rs records) SetOrAdd(rec *Record) records { return rs } -func (rs records) SetOrAddOld(rec *Record) records { - found := false - for _, r := range rs { - if r.Key == rec.Key { - found = true - r.Object = rec.Object - return rs +func format(b io.Writer, recs records) error { + recs.Sort() + b.Write([]byte("[")) + for i, r := range recs { + b.Write(r.Payload) + if i < len(recs)-1 { + b.Write([]byte(",\n")) } } - if !found { - rs = append(rs, rec) - } - return rs + b.Write([]byte("]")) + return nil } diff --git a/history/scribe_test.go b/history/scribe_test.go index 8f586dcf5..31011b98f 100644 --- a/history/scribe_test.go +++ b/history/scribe_test.go @@ -24,17 +24,19 @@ import ( ) func TestHistorySet(t *testing.T) { - rs := records{&Record{"first", "test"}} - rs.SetOrAdd(&Record{"first", "new value"}) - if len(rs) != 1 || rs[0].Object != "new value" { + rs := records{&Record{Id: "first"}} + second := &Record{Id: "first"} + rs.SetOrAdd(second) + if len(rs) != 1 || rs[0] != second { t.Error("error setting new value: ", rs[0]) } } func TestHistoryAdd(t *testing.T) { - rs := records{&Record{"first", "test"}} - rs = rs.SetOrAdd(&Record{"second", "new value"}) - if len(rs) != 2 || rs[1].Object != "new value" { + rs := records{&Record{Id: "first"}} + second := &Record{Id: "second"} + rs = rs.SetOrAdd(second) + if len(rs) != 2 || rs[1] != second { t.Error("error setting new value: ", rs) } } @@ -42,19 +44,9 @@ func TestHistoryAdd(t *testing.T) { func BenchmarkSetOrAdd(b *testing.B) { var rs records for i := 0; i < 1000; i++ { - rs = rs.SetOrAdd(&Record{strconv.Itoa(i), strconv.Itoa(i)}) + rs = rs.SetOrAdd(&Record{Id: strconv.Itoa(i)}) } for i := 0; i < b.N; i++ { - rs.SetOrAdd(&Record{"400", "test"}) - } -} - -func BenchmarkSetOrAddOld(b *testing.B) { - var rs records - for i := 0; i < 1000; i++ { - rs = rs.SetOrAddOld(&Record{strconv.Itoa(i), strconv.Itoa(i)}) - } - for i := 0; i < b.N; i++ { - rs.SetOrAddOld(&Record{"400", "test"}) + rs.SetOrAdd(&Record{Id: "400"}) } } diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index e80b39b8d..cf1e81043 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -133,7 +133,6 @@ func (sm *FSSessionManager) setMaxCallDuration(uuid string, maxDur time.Duration return nil } - // Sends the transfer command to unpark the call to freeswitch func (sm *FSSessionManager) unparkCall(uuid, call_dest_nb, notify string) { err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify)) @@ -175,7 +174,7 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) { Destination: ev.GetDestination(), TimeStart: startTime, TimeEnd: startTime.Add(cfg.SMMaxCallDuration), - } + } var remainingDurationFloat float64 err = sm.connector.GetMaxSessionTime(cd, &remainingDurationFloat) if err != nil { diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 66913381f..d9a9f414e 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -221,9 +221,9 @@ type TPAction struct { } type TPActionPlan struct { - TPid string // Tariff plan id - Id string // ActionPlan id - ActionPlan []*TPActionTiming // Set of ActionTiming bindings this profile will group + TPid string // Tariff plan id + Id string // ActionPlan id + ActionPlan []*TPActionTiming // Set of ActionTiming bindings this profile will group } type TPActionTiming struct { @@ -266,7 +266,7 @@ type TPAccountActions struct { Tenant string // Tenant's Id Account string // Account name Direction string // Traffic direction - ActionPlanId string // Id of ActionPlan profile to use + ActionPlanId string // Id of ActionPlan profile to use ActionTriggersId string // Id of ActionTriggers profile to use } diff --git a/utils/consts.go b/utils/consts.go index 3a6135335..8cadcd6ef 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -64,6 +64,7 @@ const ( JSON = "json" MSGPACK = "msgpack" CSV_LOAD = "CSVLOAD" + CGRID = "cgrid" ACCID = "accid" CDRHOST = "cdrhost" CDRSOURCE = "cdrsource" @@ -80,6 +81,7 @@ const ( STATIC_VALUE_PREFIX = "^" CDRE_CSV = "csv" CDRE_DRYRUN = "dry_run" + INTERNAL = "internal" ) var ( diff --git a/utils/coreutils.go b/utils/coreutils.go index 7b49a8e0d..5d9023f6d 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -184,6 +184,6 @@ func ParseDurationWithSecs(durStr string) (time.Duration, error) { return time.ParseDuration(durStr) } -func BalanceKey(tenant, account, direction string ) string { +func BalanceKey(tenant, account, direction string) string { return fmt.Sprintf("%s:%s:%s", direction, tenant, account) } diff --git a/utils/ratedcdr.go b/utils/ratedcdr.go index 51798d95e..da4846d78 100644 --- a/utils/ratedcdr.go +++ b/utils/ratedcdr.go @@ -19,6 +19,8 @@ along with this program. If not, see package utils import ( + "net/url" + "strconv" "time" ) @@ -127,3 +129,24 @@ func (ratedCdr *RatedCDR) GetExtraFields() map[string]string { func (ratedCdr *RatedCDR) AsRatedCdr(runId, reqTypeFld, directionFld, tenantFld, torFld, accountFld, subjectFld, destFld, answerTimeFld, durationFld string, extraFlds []string, fieldsMandatory bool) (*RatedCDR, error) { return ratedCdr, nil } + +// Converts part of the rated Cdr as httpForm used to post remotely to CDRS +func (ratedCdr *RatedCDR) AsRawCdrHttpForm() url.Values { + v := url.Values{} + v.Set(ACCID, ratedCdr.AccId) + v.Set(CDRHOST, ratedCdr.CdrHost) + v.Set(CDRSOURCE, ratedCdr.CdrSource) + v.Set(REQTYPE, ratedCdr.ReqType) + v.Set(DIRECTION, ratedCdr.Direction) + v.Set(TENANT, ratedCdr.Tenant) + v.Set(TOR, ratedCdr.TOR) + v.Set(ACCOUNT, ratedCdr.Account) + v.Set(SUBJECT, ratedCdr.Subject) + v.Set(DESTINATION, ratedCdr.Destination) + v.Set(ANSWER_TIME, ratedCdr.AnswerTime.String()) + v.Set(DURATION, strconv.FormatFloat(ratedCdr.Duration.Seconds(), 'f', -1, 64)) + for fld, val := range ratedCdr.ExtraFields { + v.Set(fld, val) + } + return v +} diff --git a/utils/ratedcdr_test.go b/utils/ratedcdr_test.go index d3ff122e5..7802924d9 100644 --- a/utils/ratedcdr_test.go +++ b/utils/ratedcdr_test.go @@ -98,3 +98,53 @@ func TestRatedCdrFields(t *testing.T) { t.Error("Error parsing cdr: ", ratedCdr) } } + +func TestAsRawCdrHttpForm(t *testing.T) { + ratedCdr := RatedCDR{CgrId: FSCgrId("dsafdsaf"), AccId: "dsafdsaf", CdrHost: "192.168.1.1", CdrSource: "test", ReqType: "rated", Direction: "*out", Tenant: "cgrates.org", + TOR: "call", Account: "1001", Subject: "1001", Destination: "1002", AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), + Duration: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, + } + cdrForm := ratedCdr.AsRawCdrHttpForm() + if cdrForm.Get(ACCID) != ratedCdr.AccId { + t.Errorf("Expected: %s, received: %s", ratedCdr.AccId, cdrForm.Get(ACCID)) + } + if cdrForm.Get(CDRHOST) != ratedCdr.CdrHost { + t.Errorf("Expected: %s, received: %s", ratedCdr.CdrHost, cdrForm.Get(CDRHOST)) + } + if cdrForm.Get(CDRSOURCE) != ratedCdr.CdrSource { + t.Errorf("Expected: %s, received: %s", ratedCdr.CdrSource, cdrForm.Get(CDRSOURCE)) + } + if cdrForm.Get(REQTYPE) != ratedCdr.ReqType { + t.Errorf("Expected: %s, received: %s", ratedCdr.ReqType, cdrForm.Get(REQTYPE)) + } + if cdrForm.Get(DIRECTION) != ratedCdr.Direction { + t.Errorf("Expected: %s, received: %s", ratedCdr.Direction, cdrForm.Get(DIRECTION)) + } + if cdrForm.Get(TENANT) != ratedCdr.Tenant { + t.Errorf("Expected: %s, received: %s", ratedCdr.Tenant, cdrForm.Get(TENANT)) + } + if cdrForm.Get(TOR) != ratedCdr.TOR { + t.Errorf("Expected: %s, received: %s", ratedCdr.TOR, cdrForm.Get(TOR)) + } + if cdrForm.Get(ACCOUNT) != ratedCdr.Account { + t.Errorf("Expected: %s, received: %s", ratedCdr.Account, cdrForm.Get(ACCOUNT)) + } + if cdrForm.Get(SUBJECT) != ratedCdr.Subject { + t.Errorf("Expected: %s, received: %s", ratedCdr.Subject, cdrForm.Get(SUBJECT)) + } + if cdrForm.Get(DESTINATION) != ratedCdr.Destination { + t.Errorf("Expected: %s, received: %s", ratedCdr.Destination, cdrForm.Get(DESTINATION)) + } + if cdrForm.Get(ANSWER_TIME) != "2013-11-07 08:42:26 +0000 UTC" { + t.Errorf("Expected: %s, received: %s", "2013-11-07 08:42:26 +0000 UTC", cdrForm.Get(ANSWER_TIME)) + } + if cdrForm.Get(DURATION) != "10" { + t.Errorf("Expected: %s, received: %s", "10", cdrForm.Get(DURATION)) + } + if cdrForm.Get("field_extr1") != ratedCdr.ExtraFields["field_extr1"] { + t.Errorf("Expected: %s, received: %s", ratedCdr.ExtraFields["field_extr1"], cdrForm.Get("field_extr1")) + } + if cdrForm.Get("fieldextr2") != ratedCdr.ExtraFields["fieldextr2"] { + t.Errorf("Expected: %s, received: %s", ratedCdr.ExtraFields["fieldextr2"], cdrForm.Get("fieldextr2")) + } +}