From d60a5f1b79676cb650f344976597fa7628bf2082 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 22 Dec 2013 19:44:45 +0100 Subject: [PATCH] RatedCDR maintaining CDR interface, Cdrc csv reader does not longer stops in case of row error, further test related fixups --- apier/v1/apier_local_test.go | 2 +- cdrc/.cdrc.go.swp | Bin 16384 -> 20480 bytes cdrc/cdrc.go | 31 ++++++++++++++----- cdrs/cgrcdr_test.go | 2 +- local_test.sh | 5 ++- utils/ratedcdr.go | 58 +++++++++++++++++++++++++++++++++++ 6 files changed, 87 insertions(+), 11 deletions(-) diff --git a/apier/v1/apier_local_test.go b/apier/v1/apier_local_test.go index 1594dc6bf..160124d92 100644 --- a/apier/v1/apier_local_test.go +++ b/apier/v1/apier_local_test.go @@ -77,7 +77,7 @@ func TestCreateTables(t *testing.T) { mysql = d.(*engine.MySQLStorage) } for _, scriptName := range []string{engine.CREATE_CDRS_TABLES_SQL, engine.CREATE_COSTDETAILS_TABLES_SQL, engine.CREATE_MEDIATOR_TABLES_SQL, engine.CREATE_TARIFFPLAN_TABLES_SQL} { - if err := mysql.CreateTablesFromScript(path.Join(*dataDir, "storage", *stprDbType, scriptName)); err != nil { + if err := mysql.CreateTablesFromScript(path.Join(*dataDir, "storage", *storDbType, scriptName)); err != nil { t.Fatal("Error on mysql creation: ", err.Error()) return // No point in going further } diff --git a/cdrc/.cdrc.go.swp b/cdrc/.cdrc.go.swp index d65cf57f8137af991fb1b6da4d62fec90eb1a6ce..e5df2cc19d144e21c6f0450bea512ee6b6a7da1d 100644 GIT binary patch delta 1427 zcmaLWe{2(F7{KwjTXDOwy3MfR55itbw(LwTL*qI(4m7SLpmSsx5HWM!j@?yF=jSO0tgrb&L7MC2N`HgjF71RkQje!jNj|dq5s(AllQ&% zeeZpq_w79+BY~0Qf!3kU4Qu*kEc$@p*C<5#zEguctAx05yAY{_RVuYj^qp&Hs1`-$ z8L#|W-KNHcdeDFB_gm?=0l%&G+fC)=jc+v^mch{Ed?B{fGfW`%kj223a!F3l4{Bl4 zE~g^LtBdu#b;R45C7b*WCK`m;1|2Q9PHykw7#_w7EXF;kM;$bL9}?mMHlh`O68cq? z@Dg6cbJ&XlI_tet(z!~WUaseH7URf6;b}aDpX-FUiR-w8L)eJ5=))>Ve5naBgUgu0 zC-@lqF@R3otQBG!XKIOmlFk7Xum^Fh#3KAwBgEI3#Ism~I9d?IJGTk(2FB5etMdqo zk8lZxu^Anx!SCGkFBrwkI1&g>lx|do_6f~Vxm}JWEpymCVJx_dk=eny&aK(-^3|qg zW-}@!cQ|%ZWwWNWRkmd#a=V>pFlX}+j>xnXk(S!ME|JT)Q{7A5KVo;(nrW#Vr^j#) zbv1|mNxjz_lS>RQx-1CIAD1i}>(bq+u5C3d9!)vPmC}vHCf$2up*#KOxZ~K4G%cCT z=4IL(R_?mk-ELF2alesEOgXxB1fV`wH?wnw_3KN+(^&T z|J!$Aai3JfStV!B@UMI|ZOBoHXS^VBD<$o8B^BcTcV&Bf<(!5gO*G7ZizsIvIv5BS2+6KHxZ!AgPp(7dqfaEZ zE8WX29Oad;yP$V)?wxe+=Bl}e4~*9X?x+!7eCscq%|*85`SWJ`y6&2*UMVJ0LNxQe zD83=DQv3Z%{gLYY4&UGle1?m74`*;1Z(|%|7=>5akD&=?soK4G1O|H0j2Wu;BHqWj z5LJ4V&H?Pl3vjR=KTy3h_#Bs!LLYvmVz1!}PGSrP;Z^hyHsDFDM+?59a;NbcexgFx z!Yj<@@nG%aETG!1;R+OV{6)omiwV4mooGiGGu%`GyOBc%X{dSSKyXp8{OOXTb$8;w@K@ow24T_p&G|S)zDa!lC~yBQ6heI5p%FmDpH%Rwvj~FR0JtW z3yKF1je}l9h$8eLB4rO=q!&LBgeom63W^sI1kc{o|I&-#3qNLP@3wH#VK# zapJJ8`g?kXii*s}pA_C3BHubhilgO5qiZ3*7>SrtW8d^cR7tOk#;xCfR?GH|M;}KT zO?6WR+FRIu&3V5UF;z2$y@q-d3Y@K0&wIsGv!&J-6uH*A8nGGtT8M~DVhCOM#r2;t zgK3o2n+CSYA4)*UG9Iv>_b;c@gF{LUcmC32Gi#yf>syuNn`)~CBq~1TU g Parsing folder %s for CDR files.", self.cgrCfg.CdrcCdrInDir)) filesInDir, _ := ioutil.ReadDir(self.cgrCfg.CdrcCdrInDir) for _, file := range filesInDir { if err := self.processFile(path.Join(self.cgrCfg.CdrcCdrInDir, file.Name())); err != nil { @@ -141,11 +143,11 @@ func (self *Cdrc) trackCDRFiles() (err error) { if err != nil { return } - engine.Logger.Info(fmt.Sprintf("Monitoring %s for file moves.", self.cgrCfg.CdrcCdrInDir)) + engine.Logger.Info(fmt.Sprintf(" Monitoring %s for file moves.", self.cgrCfg.CdrcCdrInDir)) for { select { case ev := <-watcher.Event: - if ev.IsCreate() && path.Ext(ev.Name) != ".csv" { + if ev.IsCreate() { //&& path.Ext(ev.Name) != ".csv" if err = self.processFile(ev.Name); err != nil { return err } @@ -160,7 +162,7 @@ func (self *Cdrc) trackCDRFiles() (err error) { // Processe file at filePath and posts the valid cdr rows out of it func (self *Cdrc) processFile(filePath string) error { _, fn := path.Split(filePath) - engine.Logger.Info(fmt.Sprintf("Parsing: %s", filePath)) + engine.Logger.Info(fmt.Sprintf(" Parsing: %s", filePath)) file, err := os.Open(filePath) defer file.Close() if err != nil { @@ -168,17 +170,30 @@ func (self *Cdrc) processFile(filePath string) error { return err } csvReader := csv.NewReader(bufio.NewReader(file)) - for record, ok := csvReader.Read(); ok == nil; record, ok = csvReader.Read() { + for { + record, err := csvReader.Read() + if err != nil && err == io.EOF { + break // End of file + } else if err != nil { + engine.Logger.Err(fmt.Sprintf(" Error in csv file: %s", err.Error())) + continue // Other csv related errors, ignore + } cdrAsForm, err := self.cdrAsHttpForm(record) if err != nil { - engine.Logger.Err(err.Error()) + engine.Logger.Err(fmt.Sprintf(" Error in csv file: %s", err.Error())) continue } if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cgr", self.cgrCfg.CdrcCdrs), cdrAsForm); err != nil { - engine.Logger.Err(fmt.Sprintf("Failed posting CDR, error: %s", err.Error())) + engine.Logger.Err(fmt.Sprintf(" Failed posting CDR, error: %s", err.Error())) continue } } // Finished with file, move it to processed folder - return os.Rename(filePath, path.Join(self.cgrCfg.CdrcCdrOutDir, fn)) + newPath := path.Join(self.cgrCfg.CdrcCdrOutDir, fn) + if err:= os.Rename(filePath, newPath); err != nil { + engine.Logger.Err(err.Error()) + return err + } + engine.Logger.Info(fmt.Sprintf("Finished processing %s, moved to %s", fn, newPath)) + return nil } diff --git a/cdrs/cgrcdr_test.go b/cdrs/cgrcdr_test.go index b16a42de7..37f378671 100644 --- a/cdrs/cgrcdr_test.go +++ b/cdrs/cgrcdr_test.go @@ -32,7 +32,7 @@ curl --data "accid=asbfdsaf&cdrhost=192.168.1.1&reqtype=rated&direction=*out&ten func TestCgrCdrFields(t *testing.T) { cfg, _ = config.NewDefaultCGRConfig() cgrCdr := CgrCdr{"accid": "dsafdsaf", "cdrhost": "192.168.1.1", "reqtype": "rated", "direction": "*out", "tenant": "cgrates.org", "tor": "call", - "account": "1001", "subject": "1001", "destination": "1002", "answer_time": "1383813746", "duration": "10", "field_extr1": "val_extr1", "fieldextr2": "valextr2"} + "account": "1001", "subject": "1001", "destination": "1002", "answer_time": "2013-11-07 08:42:26", "duration": "10", "field_extr1": "val_extr1", "fieldextr2": "valextr2"} if cgrCdr.GetCgrId() != utils.FSCgrId("dsafdsaf") { t.Error("Error parsing cdr: ", cgrCdr) } diff --git a/local_test.sh b/local_test.sh index a93d87f53..77c35e2a3 100755 --- a/local_test.sh +++ b/local_test.sh @@ -6,8 +6,11 @@ go test github.com/cgrates/cgrates/apier/v1 -local ap=$? go test github.com/cgrates/cgrates/engine -local en=$? +go test github.com/cgrates/cgrates/cdrc -local +cdrc=$? -exit $gen && $ap && $en + +exit $gen && $ap && $en && cdrc diff --git a/utils/ratedcdr.go b/utils/ratedcdr.go index 19fee54fe..30146e323 100644 --- a/utils/ratedcdr.go +++ b/utils/ratedcdr.go @@ -41,3 +41,61 @@ type RatedCDR struct { MediationRunId string Cost float64 } + +// Methods maintaining CDR interface + +func (ratedCdr *RatedCDR) GetCgrId() string { + return ratedCdr.CgrId +} + +func (ratedCdr *RatedCDR) GetAccId() string { + return ratedCdr.AccId +} + +func (ratedCdr *RatedCDR) GetCdrHost() string { + return ratedCdr.CdrHost +} + +func (ratedCdr *RatedCDR) GetCdrSource() string { + return ratedCdr.CdrSource +} + +func (ratedCdr *RatedCDR) GetDirection() string { + return ratedCdr.Direction +} + +func (ratedCdr *RatedCDR) GetSubject() string { + return ratedCdr.Subject +} + +func (ratedCdr *RatedCDR) GetAccount() string { + return ratedCdr.Account +} + +func (ratedCdr *RatedCDR) GetDestination() string { + return ratedCdr.Destination +} + +func (ratedCdr *RatedCDR) GetTOR() string { + return ratedCdr.TOR +} + +func (ratedCdr *RatedCDR) GetTenant() string { + return ratedCdr.Tenant +} + +func (ratedCdr *RatedCDR) GetReqType() string { + return ratedCdr.ReqType +} + +func (ratedCdr *RatedCDR) GetAnswerTime() (time.Time, error) { + return ratedCdr.AnswerTime, nil +} + +func (ratedCdr *RatedCDR) GetDuration() int64 { + return ratedCdr.Duration +} + +func (ratedCdr *RatedCDR) GetExtraFields() map[string]string { + return ratedCdr.ExtraFields +}