From ca3b13651fc43ba2603fbe7a3ec3e550f0c7f762 Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 20 Dec 2013 22:57:45 +0100 Subject: [PATCH] Adding cdrc together with config dependencies and tests --- cdrc/.cdrc.go.swp | Bin 0 -> 20480 bytes cdrc/.cdrc_test.go.swp | Bin 0 -> 12288 bytes cdrc/cdrc.go | 138 ++++++++++++++++ cdrc/cdrc_test.go | 41 +++++ cdrs/cgrcdr.go | 22 +-- cdrs/fscdr.go | 4 + config/config.go | 147 +++++++++++++----- config/config_test.go | 68 +++++--- config/test_data.txt | 27 +++- data/conf/cgrates.cfg | 49 ++++-- data/storage/mysql/create_cdrs_tables.sql | 1 + data/storage/mysql/create_mediator_tables.sql | 3 +- mediator/mediator.go | 100 +----------- utils/cdr.go | 2 + utils/consts.go | 12 ++ 15 files changed, 425 insertions(+), 189 deletions(-) create mode 100644 cdrc/.cdrc.go.swp create mode 100644 cdrc/.cdrc_test.go.swp create mode 100644 cdrc/cdrc.go create mode 100644 cdrc/cdrc_test.go diff --git a/cdrc/.cdrc.go.swp b/cdrc/.cdrc.go.swp new file mode 100644 index 0000000000000000000000000000000000000000..4916112cbc15f0a26f0412cbc505ea65d8e5786a GIT binary patch literal 20480 zcmeI3U5q4E6~{|KaX^qjKs1nC+ANt~d%I_5Ss=3?+Vsq@4Lcup_b{yMWK+|1ySv=( zszxVDU$xNDj%yc6cU0_1S7&suPpK|Dxpj9m%mVazD9D4=DpF11SS311SS311SS311SS311SS3 z11SS31OEjEY{#-D$1LlbOL;l*|Ni>_wf9)oPr;+$5wHes0Kd7yvc3bouLZ{RQB_uzT(3-A7y#*EWcz2p&rjnO-$T!U}EyX#FUymG(BL(rlp+<?jeamLPsH305%1`R z4wN*QFSUL6mY#EElqQn+VSaI4dq&bF1kYNHu2)y%!o1pFBH1R3+Woo?!#SQC)Kg)E z%`~$j8B9Weg}AY_vE(BCh{OoG-QLRBm)qEk8<|%7LR3H8BcvUx+o6_1k@m=e4Lhp0 zV$=15X?cZtPx5y=U42_rN0l+8;eO~|-YDj|BC3rSV?}j z#;mc}^7_>W#lc_D#h016Ai9X0x8`!vbHq>K`Z)=SDK`640 zXV7nboxE1x_O*?k5$>@|@88INnZ#Qz=AtqF9<7?-2XN^9` zWlM2=6gFp`K$q+7=+wljbaHP|O139nJnqyEiTbZ?;{&9*tXSuiX;3c3D&vtJ4qO`2 zuDDUF?3~T1%(OHhrYY`E#7b_-gMsZeVO)Ipkc}8>;vUFr&`!~Dlwm2MLNUO_Lm3WZ zc9iSI3h{%E@*C>ZYAn^g8JC%%?3KdaUTui{UE@O(R(w9G)DU<q)^MHG*t`v| zb>Hjdi;?e+)1ABRHgk@c?v^_eW?d9*?rw1!x9yy*MnM?sYJ}usz3z9tXihY;bIez} zYahp}>~VKShuEj-^d2>eW)_zDyk9})VKr)?11;e<1_~>dO68fHVzeG#tk!a>GW(I* z$)(wxnkiRiOSSUiLQd6Y7m5qqt1VV?iU$^#<*K@T`<>LF=4`cAUMR+Oiwo72*~;l! zd0yIBu9#xfJE{X@=-6lUc*kBp72D1#I>dMr1MJwCSi=k~V}I6I&o)jZb|kxa9W$Z~ z0cctCc`_IHoszK`d=Ppx4qJE;e}XTT#!c1!kcv%ds64T%P)<<|i9pDDKy?_7^O>?K1;xzy~@& zDSVnhNGGpUxgj6jGG?_pB0@|T3L6_6`KH&+`$4lH(}IUN6^>aa{jLgI<~zM*_q2SY zYPZMK#?0fdZ}t^@$HKC@Lwh=~+iI!1Mg!_Z+=ylEea}pc9Y4^@(UC24k~KeDDII5E z7H=<~DA!IZ?0c?UTbQj@)!br571dI)QY)91PZTR^X}PkrSe>OqRXT#5S!>-$t(0rW z7nf_QxNuUf6e|@PJ9*g9;B!|dS!0qAhiN-}Yy`|9k@UnOTYNy17^>rn8flbziJi85 z>@I5A7;;3T?Y3IesvGJ?x1F=BamqLSZlL^iG43o={ic#-O`w>RLyGd2jEA`ZfvbKa z;_Y1FMjhh|aoebCtF}7WCpck~bwptCW*(rHG)3DT=KOe^orh|ho{3_@7M)q!+|k#D zh!iFwO-nCWlzHfTiv;O1-!27o(!uH*o0e6=1_HO)!U9WKH8nALP?c-pW>{LBw@9a# zPY|vm-9f{l+w^!h6I)D~;q-35yrfF#z=a>R;sUx*CT5A(q2Q=`|r zQTlr7us}7mzkbFXHu>yz!dcFM-MSyjCSN~Hv^{z4^?yIVf06b5H(1MG41UY{{qunTK5X3# zCV|B2KLzdqw}Z>TCE#h|@DG6dz!Z3%82ocUV)8!$KL$^Mb&v(uf3)ly)2k!$vBG&&TcpQ8V+ykx$FA>}S6?hIj1HJ=X zpurmWD7Xi#fUAMT|9=P`1rLKRXaNnTz#oX|KLNfD?gt_8z$w@vb>`b{X94_?!JeqB15rMQGgw0 z=kL_8ps}1HK@svzQCG6IppZ^2NA^-Eq9}?t>lSCw*M7jZc z?Q)cPCJw1%8*%8VXSZEP)U5)4gUxaF{n;RQqoLUPP>6Q>d_6mAzG?nQKR+hTA0e0E zt&~5~%f%@qeDIC+@>NZ;Slf~0g+cP0>zp+S1yZA**hJO|-*!U@3+1L4C;Pyhx&5~6hUT`3klx@%H7!EsA+gKn0 z)I50%T^-(gZkLexZtixF8bIF8*5k=El;rQ!h}_5SBbKu7HY74F89VZ=OkxIsKgrFI zDM!MW9ktPN>n#&2BdjHpoOXu%%$5R%DIR9`Jn0~Gr9PaZvP))&Nza+wGbtpwy+=Ms zlG-zx5Mox9eQRKjOEZqrjHCa{jH8i+rrj rD!v>_n2HPfX;aCSBu~YsgA}Uxa*#+BUv5jMN*)+as!D1NQmg(2_<}{E literal 0 HcmV?d00001 diff --git a/cdrc/.cdrc_test.go.swp b/cdrc/.cdrc_test.go.swp new file mode 100644 index 0000000000000000000000000000000000000000..e8ddb1e84604a7837f306aabc97e12eeb0b69c0c GIT binary patch literal 12288 zcmeI2%W4!s6o$(SLcF6dP_`m7(R8|J;>1Lea& zpz+FYuX4BDwyy0CsUZSHfCvx)B0vO)01+SpM1Tko0U|&IwjhBZV(jH^##(z2JpTXh zz5o9>z}R=_6Z8>!4LyaPK#!q&&;k@d*P!FjE@&sT1A4z7c2EPl3|)jSK@}`i901+SpM1Tko0U|&I{vQG> zD{KIt0qssHm8p_*Jr(zJ5o!_f`ecJI21#3RB|;$=MdY01I39bpn9fwooe{d!)-oyF z#?;lhn^&gFWi}w=brw$1&htbH<&^o63Em(24qyJqORo*iuAVEA2Qu2Y_H-ZdV!%kn6wNw76KRPm2 zs~U@n=U0ck%8*y({@D1bk#Wy6)3u8E?rlfM$7sd%Yp(D5&iXo71rlc1ew{qvk9q1rp z_mWte?Bs+q?H$`vxr0llQkN{1#X#9o<%7mKv%{V@&Ih}$`VP!1_<+-t`c8Yn4YSs8 kw>0~${iQjZ+jN{7qwWqlOt$h&X +*/ + +package cdrc + +import ( + "fmt" + "errors" + "github.com/howeyc/fsnotify" + "os" + "path" + "net/http" + "net/url" + "strconv" + "strings" + "bufio" + "encoding/csv" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/cgrates/engine" +) + + +type Cdrc struct { + cgrCfg *config.CGRConfig + fieldIndxes map[string]int // Key is the name of the field, int is the position in the csv file + httpClient *http.Client +} + +// Parses fieldIndex strings into fieldIndex integers needed +func (self *Cdrc) parseFieldIndexesFromConfig() error { + var err error + // Add main fields here + self.fieldIndxes = make(map[string]int) + // PrimaryCdrFields []string = []string{ACCID, CDRHOST, REQTYPE, DIRECTION, TENANT, TOR, ACCOUNT, SUBJECT, DESTINATION, ANSWER_TIME, DURATION} + fieldKeys := []string{utils.ACCID, utils.REQTYPE, utils.DIRECTION, utils.TENANT, utils.TOR, utils.ACCOUNT, utils.SUBJECT, utils.DESTINATION, utils.ANSWER_TIME, utils.DURATION} + fieldIdxStrs := []string{self.cgrCfg.CdrcAccIdField, self.cgrCfg.CdrcReqTypeField, self.cgrCfg.CdrcDirectionField, self.cgrCfg.CdrcTenantField, self.cgrCfg.CdrcTorField, + self.cgrCfg.CdrcAccountField, self.cgrCfg.CdrcSubjectField, self.cgrCfg.CdrcDestinationField, self.cgrCfg.CdrcAnswerTimeField, self.cgrCfg.CdrcDurationField} + for i, strVal := range fieldIdxStrs { + if self.fieldIndxes[fieldKeys[i]], err = strconv.Atoi(strVal); err != nil { + return fmt.Errorf("Cannot parse configuration field %s into integer", fieldKeys[i]) + } + } + // Add extra fields here, extra fields in the form of []string{"indxInCsv1:fieldName1","indexInCsv2:fieldName2"} + for _, fieldWithIdx := range self.cgrCfg.CdrcExtraFields { + splt := strings.Split(fieldWithIdx, ":") + if len(splt) != 2 { + return errors.New("Cannot parse cdrc.extra_fields") + } + if utils.IsSliceMember(utils.PrimaryCdrFields, splt[0]) { + return errors.New("Extra cdrc.extra_fields overwriting primary fields") + } + if self.fieldIndxes[splt[1]], err = strconv.Atoi(splt[0]); err != nil { + return fmt.Errorf("Cannot parse configuration cdrc extra field %s into integer", splt[1]) + } + } + return nil +} + +// Takes the record out of csv and turns it into http form which can be posted +func (self *Cdrc) cdrAsHttpForm(record []string) (url.Values, error) { + v := url.Values{} + for fldName, idx := range self.fieldIndxes { + if len(record) <= idx { + return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s", record, fldName) + } + v.Set(fldName, record[idx]) + } + return v, nil +} + +// Watch the specified folder for file moves and parse the files on events +func (self *Cdrc) trackCDRFiles() (err error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return + } + defer watcher.Close() + err = watcher.Watch(self.cgrCfg.CdrcCdrInDir) + if err != nil { + return + } + engine.Logger.Info(fmt.Sprintf("Monitoring %s for file moves.", self.cgrCfg.CdrcCdrInDir)) + for { + select { + case ev := <-watcher.Event: + if ev.IsCreate() && path.Ext(ev.Name) != ".csv" { + engine.Logger.Info(fmt.Sprintf("Parsing: %s", ev.Name)) + if err = self.processFile(ev.Name); err != nil { + return err + } + } + case err := <-watcher.Error: + engine.Logger.Err(fmt.Sprintf("Inotify error: %s", err.Error())) + } + } + return +} + +// Processe file at filePath and posts the valid cdr rows out of it +func (self *Cdrc) processFile(filePath string) error { + file, err := os.Open(filePath) + defer file.Close() + if err != nil { + engine.Logger.Crit(err.Error()) + return err + } + csvReader := csv.NewReader(bufio.NewReader(file)) + for record, ok := csvReader.Read(); ok == nil; record, ok = csvReader.Read() { + cdrAsForm, err := self.cdrAsHttpForm(record) + if err != nil { + engine.Logger.Err(err.Error()) + continue + } + if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cgr", self.cgrCfg.CdrcCdrs), cdrAsForm); err != nil { + engine.Logger.Err(fmt.Sprintf("Failed posting CDR, error: %s",err.Error())) + continue + } + } + // Finished with file, move it to processed folder + _, fn := path.Split(filePath) + return os.Rename(filePath, path.Join(self.cgrCfg.CdrcCdrOutDir, fn)) +} diff --git a/cdrc/cdrc_test.go b/cdrc/cdrc_test.go new file mode 100644 index 000000000..4f3b4fb1b --- /dev/null +++ b/cdrc/cdrc_test.go @@ -0,0 +1,41 @@ +package cdrc + +import ( + "testing" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" +) + +var cgrConfig *config.CGRConfig +var cdrc *Cdrc + +func init() { + cgrConfig, _ = config.NewDefaultCGRConfig() + cdrc = &Cdrc{cgrCfg:cgrConfig} +} + +func TestParseFieldIndexesFromConfig(t *testing.T) { + if err := cdrc.parseFieldIndexesFromConfig(); err != nil { + t.Error("Failed parsing default fieldIndexesFromConfig", err) + } +} + + +func TestCdrAsHttpForm(t *testing.T) { + cdrRow := []string{"firstField", "secondField"} + _, err := cdrc.cdrAsHttpForm(cdrRow) + if err == nil { + t.Error("Failed to corectly detect missing fields from record") + } + cdrRow = []string{"acc1", "prepaid", "*out", "cgrates.org", "call", "1001", "1001", "+4986517174963", "2013-02-03 19:54:00", "62", "supplier1", "172.16.1.1"} + cdrAsForm, err := cdrc.cdrAsHttpForm(cdrRow); + if err != nil { + t.Error("Failed to parse CDR in form", err) + } + if cdrAsForm.Get(utils.REQTYPE) != "prepaid" { + t.Error("Unexpected CDR value received", cdrAsForm.Get(utils.REQTYPE)) + } + if cdrAsForm.Get("supplier") != "supplier1" { + t.Error("Unexpected CDR value received", cdrAsForm.Get(utils.REQTYPE)) + } +} diff --git a/cdrs/cgrcdr.go b/cdrs/cgrcdr.go index 4dd46f4a2..69e141984 100644 --- a/cdrs/cgrcdr.go +++ b/cdrs/cgrcdr.go @@ -25,22 +25,6 @@ import ( "time" ) -const ( - ACCID = "accid" - CDRHOST = "cdrhost" - REQTYPE = "reqtype" - DIRECTION = "direction" - TENANT = "tenant" - TOR = "tor" - ACCOUNT = "account" - SUBJECT = "subject" - DESTINATION = "destination" - TIME_ANSWER = "time_answer" - DURATION = "duration" -) - -var primaryFields []string = []string{ACCID, CDRHOST, REQTYPE, DIRECTION, TENANT, TOR, ACCOUNT, SUBJECT, DESTINATION, TIME_ANSWER, DURATION} - func NewCgrCdrFromHttpReq(req *http.Request) (CgrCdr, error) { if req.Form == nil { if err := req.ParseForm(); err != nil { @@ -67,6 +51,10 @@ func (cgrCdr CgrCdr) GetAccId() string { func (cgrCdr CgrCdr) GetCdrHost() string { return cgrCdr[CDRHOST] } + +func (cgrCdr CgrCdr) GetCdrSource() string { + return cgrCdr[CDRSOURCE] +} func (cgrCdr CgrCdr) GetDirection() string { //TODO: implement direction return "*out" @@ -99,7 +87,7 @@ func (cgrCdr CgrCdr) GetReqType() string { func (cgrCdr CgrCdr) GetExtraFields() map[string]string { extraFields := make(map[string]string) for k, v := range cgrCdr { - if !utils.IsSliceMember(primaryFields, k) { + if !utils.IsSliceMember(utils.PrimaryCdrFields, k) { extraFields[k] = v } } diff --git a/cdrs/fscdr.go b/cdrs/fscdr.go index 69889103e..da76f7c57 100644 --- a/cdrs/fscdr.go +++ b/cdrs/fscdr.go @@ -44,6 +44,7 @@ const ( FS_DURATION = "billsec" FS_USERNAME = "user_name" FS_IP = "sip_local_network_addr" + FS_CDR_SOURCE = "freeswitch_json" ) type FSCdr map[string]string @@ -74,6 +75,9 @@ func (fsCdr FSCdr) GetAccId() string { func (fsCdr FSCdr) GetCdrHost() string { return fsCdr[FS_IP] } +func (fsCdr FSCdr) GetCdrSource() string { + return FS_CDR_SOURCE +} func (fsCdr FSCdr) GetDirection() string { //TODO: implement direction, not related to FS_DIRECTION but traffic towards or from subject/account return "*out" diff --git a/config/config.go b/config/config.go index 98aadb482..ccd03bb92 100644 --- a/config/config.go +++ b/config/config.go @@ -54,7 +54,7 @@ type CGRConfig struct { AccountDBPass string // The user's password. StorDBType string // Should reflect the database type used to store logs StorDBHost string // The host to connect to. Values that start with / are for UNIX domain sockets. - StorDBPort string // The port to bind to. + StorDBPort string // Th e port to bind to. StorDBName string // The name of the database to connect to. StorDBUser string // The user to sign in as. StorDBPass string // The user's password. @@ -78,6 +78,25 @@ type CGRConfig struct { CDRSMediator string // Address where to reach the Mediator. Empty for disabling mediation. <""|internal> CDRSExportPath string // Path towards exported cdrs CDRSExportExtraFields []string // Extra fields list to add in exported CDRs + CdrcEnabled bool // Enable CDR client functionality + CdrcCdrs string // Address where to reach CDR server + CdrcCdrsMethod string // Mechanism to use when posting CDRs on server + CdrcRunDelay int // Sleep interval in seconds between consecutive runs, 0 to use automation via inotify + CdrcCdrType string // CDR file format . + CdrcCdrInDir string // Absolute path towards the directory where the CDRs are stored. + CdrcCdrOutDir string // Absolute path towards the directory where processed CDRs will be moved. + CdrcSourceId string // Tag identifying the source of the CDRs within CGRS database. + CdrcAccIdField string // Accounting id field identifier. Use index number in case of .csv cdrs. + CdrcReqTypeField string // Request type field identifier. Use index number in case of .csv cdrs. + CdrcDirectionField string // Direction field identifier. Use index numbers in case of .csv cdrs. + CdrcTenantField string // Tenant field identifier. Use index numbers in case of .csv cdrs. + CdrcTorField string // Type of Record field identifier. Use index numbers in case of .csv cdrs. + CdrcAccountField string // Account field identifier. Use index numbers in case of .csv cdrs. + CdrcSubjectField string // Subject field identifier. Use index numbers in case of .csv CDRs. + CdrcDestinationField string // Destination field identifier. Use index numbers in case of .csv cdrs. + CdrcAnswerTimeField string // Answer time field identifier. Use index numbers in case of .csv cdrs. + CdrcDurationField string // Duration field identifier. Use index numbers in case of .csv cdrs. + CdrcExtraFields []string // Field identifiers of the fields to add in extra fields section, special format in case of .csv "index1:field1,index2:field2" SMEnabled bool SMSwitchType string SMRater string // address where to access rater. Can be internal, direct rater address or the address of a balancer @@ -87,20 +106,16 @@ type CGRConfig struct { MediatorListen string // Mediator's listening interface: . MediatorRater string // Address where to reach the Rater: MediatorRaterReconnects int // Number of reconnects to rater before giving up. - MediatorCDRType string // CDR type . - MediatorAccIdField string // Name of field identifying accounting id used during mediation. Use index number in case of .csv cdrs. MediatorRunIds []string // Identifiers for each mediation run on CDRs - MediatorSubjectFields []string // Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs. MediatorReqTypeFields []string // Name of request type fields to be used during mediation. Use index number in case of .csv cdrs. MediatorDirectionFields []string // Name of direction fields to be used during mediation. Use index numbers in case of .csv cdrs. MediatorTenantFields []string // Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs. MediatorTORFields []string // Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs. MediatorAccountFields []string // Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs. + MediatorSubjectFields []string // Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs. MediatorDestFields []string // Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs. - MediatorTimeAnswerFields []string // Name of time_start fields to be used during mediation. Use index numbers in case of .csv cdrs. + MediatorAnswerTimeFields []string // Name of time_start fields to be used during mediation. Use index numbers in case of .csv cdrs. MediatorDurationFields []string // Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs. - MediatorCDRInDir string // Absolute path towards the directory where the CDRs are kept (file stored CDRs). - MediatorCDROutDir string // Absolute path towards the directory where processed CDRs will be exported (file stored CDRs). FreeswitchServer string // freeswitch address host:port FreeswitchPass string // FS socket password FreeswitchReconnects int // number of times to attempt reconnect after connect fails @@ -151,24 +166,39 @@ func (self *CGRConfig) setDefaults() error { self.CDRSMediator = "" self.CDRSExportPath = "/var/log/cgrates/cdr/out" self.CDRSExportExtraFields = []string{} + self.CdrcEnabled = false + self.CdrcCdrs = "127.0.0.1:2022" + self.CdrcCdrsMethod = "http_cgr" + self.CdrcRunDelay = 0 + self.CdrcCdrType = "csv" + self.CdrcCdrInDir = "/var/log/cgrates/cdr/in/csv" + self.CdrcCdrOutDir = "/var/log/cgrates/cdr/out/csv" + self.CdrcSourceId = "freeswitch_csv" + self.CdrcAccIdField = "0" + self.CdrcReqTypeField = "1" + self.CdrcDirectionField = "2" + self.CdrcTenantField = "3" + self.CdrcTorField = "4" + self.CdrcAccountField = "5" + self.CdrcSubjectField = "6" + self.CdrcDestinationField = "7" + self.CdrcAnswerTimeField = "8" + self.CdrcDurationField = "9" + self.CdrcExtraFields = []string{"10:supplier","11:orig_ip"} self.MediatorEnabled = false self.MediatorListen = "127.0.0.1:2032" self.MediatorRater = "127.0.0.1:2012" self.MediatorRaterReconnects = 3 - self.MediatorCDRType = utils.FSCDR_HTTP_JSON - self.MediatorAccIdField = "accid" - self.MediatorRunIds = []string{"default"} - self.MediatorSubjectFields = []string{"subject"} - self.MediatorReqTypeFields = []string{"reqtype"} - self.MediatorDirectionFields = []string{"direction"} - self.MediatorTenantFields = []string{"tenant"} - self.MediatorTORFields = []string{"tor"} - self.MediatorAccountFields = []string{"account"} - self.MediatorDestFields = []string{"destination"} - self.MediatorTimeAnswerFields = []string{"time_answer"} - self.MediatorDurationFields = []string{"duration"} - self.MediatorCDRInDir = "/var/log/freeswitch/cdr-csv" - self.MediatorCDROutDir = "/var/log/cgrates/cdr/out/freeswitch/csv" + self.MediatorRunIds = []string{} + self.MediatorSubjectFields = []string{} + self.MediatorReqTypeFields = []string{} + self.MediatorDirectionFields = []string{} + self.MediatorTenantFields = []string{} + self.MediatorTORFields = []string{} + self.MediatorAccountFields = []string{} + self.MediatorDestFields = []string{} + self.MediatorAnswerTimeFields = []string{} + self.MediatorDurationFields = []string{} self.SMEnabled = false self.SMSwitchType = FS self.SMRater = "127.0.0.1:2012" @@ -332,6 +362,65 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) { return nil, errParse } } + if hasOpt = c.HasOption("cdrc", "enabled"); hasOpt { + cfg.CdrcEnabled, _ = c.GetBool("cdrc", "enabled") + } + if hasOpt = c.HasOption("cdrc", "cdrs"); hasOpt { + cfg.CdrcCdrs, _ = c.GetString("cdrc", "cdrs") + } + if hasOpt = c.HasOption("cdrc", "cdrs_method"); hasOpt { + cfg.CdrcCdrsMethod, _ = c.GetString("cdrc", "cdrs_method") + } + if hasOpt = c.HasOption("cdrc", "run_delay"); hasOpt { + cfg.CdrcRunDelay, _ = c.GetInt("cdrc", "run_delay") + } + if hasOpt = c.HasOption("cdrc", "cdr_type"); hasOpt { + cfg.CdrcCdrType, _ = c.GetString("cdrc", "cdr_type") + } + if hasOpt = c.HasOption("cdrc", "cdr_in_dir"); hasOpt { + cfg.CdrcCdrInDir, _ = c.GetString("cdrc", "cdr_in_dir") + } + if hasOpt = c.HasOption("cdrc", "cdr_out_dir"); hasOpt { + cfg.CdrcCdrOutDir, _ = c.GetString("cdrc", "cdr_out_dir") + } + if hasOpt = c.HasOption("cdrc", "cdr_source_id"); hasOpt { + cfg.CdrcSourceId, _ = c.GetString("cdrc", "cdr_source_id") + } + if hasOpt = c.HasOption("cdrc", "accid_field"); hasOpt { + cfg.CdrcAccIdField, _ = c.GetString("cdrc", "accid_field") + } + if hasOpt = c.HasOption("cdrc", "reqtype_field"); hasOpt { + cfg.CdrcReqTypeField, _ = c.GetString("cdrc", "reqtype_field") + } + if hasOpt = c.HasOption("cdrc", "direction_field"); hasOpt { + cfg.CdrcDirectionField, _ = c.GetString("cdrc", "direction_field") + } + if hasOpt = c.HasOption("cdrc", "tenant_field"); hasOpt { + cfg.CdrcTenantField, _ = c.GetString("cdrc", "tenant_field") + } + if hasOpt = c.HasOption("cdrc", "tor_field"); hasOpt { + cfg.CdrcTorField, _ = c.GetString("cdrc", "tor_field") + } + if hasOpt = c.HasOption("cdrc", "account_field"); hasOpt { + cfg.CdrcAccountField, _ = c.GetString("cdrc", "account_field") + } + if hasOpt = c.HasOption("cdrc", "subject_field"); hasOpt { + cfg.CdrcSubjectField, _ = c.GetString("cdrc", "subject_field") + } + if hasOpt = c.HasOption("cdrc", "destination_field"); hasOpt { + cfg.CdrcDestinationField, _ = c.GetString("cdrc", "destination_field") + } + if hasOpt = c.HasOption("cdrc", "answer_time_field"); hasOpt { + cfg.CdrcAnswerTimeField, _ = c.GetString("cdrc", "answer_time_field") + } + if hasOpt = c.HasOption("cdrc", "duration_field"); hasOpt { + cfg.CdrcDurationField, _ = c.GetString("cdrc", "duration_field") + } + if hasOpt = c.HasOption("cdrc", "extra_fields"); hasOpt { + if cfg.CdrcExtraFields, errParse = ConfigSlice(c, "cdrc", "extra_fields"); errParse != nil { + return nil, errParse + } + } if hasOpt = c.HasOption("mediator", "enabled"); hasOpt { cfg.MediatorEnabled, _ = c.GetBool("mediator", "enabled") } @@ -344,12 +433,6 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) { if hasOpt = c.HasOption("mediator", "rater_reconnects"); hasOpt { cfg.MediatorRaterReconnects, _ = c.GetInt("mediator", "rater_reconnects") } - if hasOpt = c.HasOption("mediator", "cdr_type"); hasOpt { - cfg.MediatorCDRType, _ = c.GetString("mediator", "cdr_type") - } - if hasOpt = c.HasOption("mediator", "accid_field"); hasOpt { - cfg.MediatorAccIdField, _ = c.GetString("mediator", "accid_field") - } if hasOpt = c.HasOption("mediator", "run_ids"); hasOpt { if cfg.MediatorRunIds, errParse = ConfigSlice(c, "mediator", "run_ids"); errParse != nil { return nil, errParse @@ -390,8 +473,8 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) { return nil, errParse } } - if hasOpt = c.HasOption("mediator", "time_answer_fields"); hasOpt { - if cfg.MediatorTimeAnswerFields, errParse = ConfigSlice(c, "mediator", "time_answer_fields"); errParse != nil { + if hasOpt = c.HasOption("mediator", "answer_time_fields"); hasOpt { + if cfg.MediatorAnswerTimeFields, errParse = ConfigSlice(c, "mediator", "answer_time_fields"); errParse != nil { return nil, errParse } } @@ -400,12 +483,6 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) { return nil, errParse } } - if hasOpt = c.HasOption("mediator", "cdr_in_dir"); hasOpt { - cfg.MediatorCDRInDir, _ = c.GetString("mediator", "cdr_in_dir") - } - if hasOpt = c.HasOption("mediator", "cdr_out_dir"); hasOpt { - cfg.MediatorCDROutDir, _ = c.GetString("mediator", "cdr_out_dir") - } if hasOpt = c.HasOption("session_manager", "enabled"); hasOpt { cfg.SMEnabled, _ = c.GetBool("session_manager", "enabled") } diff --git a/config/config_test.go b/config/config_test.go index 873c67971..06e8d3f0b 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -70,6 +70,25 @@ func TestDefaults(t *testing.T) { eCfg.CDRSEnabled = false eCfg.CDRSListen = "127.0.0.1:2022" eCfg.CDRSExtraFields = []string{} + eCfg.CdrcEnabled = false + eCfg.CdrcCdrs = "127.0.0.1:2022" + eCfg.CdrcCdrsMethod = "http_cgr" + eCfg.CdrcRunDelay = 0 + eCfg.CdrcCdrType = "csv" + eCfg.CdrcCdrInDir = "/var/log/cgrates/cdr/in/csv" + eCfg.CdrcCdrOutDir = "/var/log/cgrates/cdr/out/csv" + eCfg.CdrcSourceId = "freeswitch_csv" + eCfg.CdrcAccIdField = "0" + eCfg.CdrcReqTypeField = "1" + eCfg.CdrcDirectionField = "2" + eCfg.CdrcTenantField = "3" + eCfg.CdrcTorField = "4" + eCfg.CdrcAccountField = "5" + eCfg.CdrcSubjectField = "6" + eCfg.CdrcDestinationField = "7" + eCfg.CdrcAnswerTimeField = "8" + eCfg.CdrcDurationField = "9" + eCfg.CdrcExtraFields = []string{"10:supplier","11:orig_ip"} eCfg.CDRSMediator = "" eCfg.CDRSExportPath = "/var/log/cgrates/cdr/out" eCfg.CDRSExportExtraFields = []string{} @@ -77,20 +96,16 @@ func TestDefaults(t *testing.T) { eCfg.MediatorListen = "127.0.0.1:2032" eCfg.MediatorRater = "127.0.0.1:2012" eCfg.MediatorRaterReconnects = 3 - eCfg.MediatorCDRType = "freeswitch_http_json" - eCfg.MediatorAccIdField = "accid" - eCfg.MediatorRunIds = []string{"default"} - eCfg.MediatorSubjectFields = []string{"subject"} - eCfg.MediatorReqTypeFields = []string{"reqtype"} - eCfg.MediatorDirectionFields = []string{"direction"} - eCfg.MediatorTenantFields = []string{"tenant"} - eCfg.MediatorTORFields = []string{"tor"} - eCfg.MediatorAccountFields = []string{"account"} - eCfg.MediatorDestFields = []string{"destination"} - eCfg.MediatorTimeAnswerFields = []string{"time_answer"} - eCfg.MediatorDurationFields = []string{"duration"} - eCfg.MediatorCDRInDir = "/var/log/freeswitch/cdr-csv" - eCfg.MediatorCDROutDir = "/var/log/cgrates/cdr/out/freeswitch/csv" + eCfg.MediatorRunIds = []string{} + eCfg.MediatorSubjectFields = []string{} + eCfg.MediatorReqTypeFields = []string{} + eCfg.MediatorDirectionFields = []string{} + eCfg.MediatorTenantFields = []string{} + eCfg.MediatorTORFields = []string{} + eCfg.MediatorAccountFields = []string{} + eCfg.MediatorDestFields = []string{} + eCfg.MediatorAnswerTimeFields = []string{} + eCfg.MediatorDurationFields = []string{} eCfg.SMEnabled = false eCfg.SMSwitchType = FS eCfg.SMRater = "127.0.0.1:2012" @@ -179,12 +194,29 @@ func TestConfigFromFile(t *testing.T) { eCfg.CDRSMediator = "test" eCfg.CDRSExportPath = "test" eCfg.CDRSExportExtraFields = []string{"test"} + eCfg.CdrcEnabled = true + eCfg.CdrcCdrs = "test" + eCfg.CdrcCdrsMethod = "test" + eCfg.CdrcRunDelay = 99 + eCfg.CdrcCdrType = "test" + eCfg.CdrcCdrInDir = "test" + eCfg.CdrcCdrOutDir = "test" + eCfg.CdrcSourceId = "test" + eCfg.CdrcAccIdField = "test" + eCfg.CdrcReqTypeField = "test" + eCfg.CdrcDirectionField = "test" + eCfg.CdrcTenantField = "test" + eCfg.CdrcTorField = "test" + eCfg.CdrcAccountField = "test" + eCfg.CdrcSubjectField = "test" + eCfg.CdrcDestinationField = "test" + eCfg.CdrcAnswerTimeField = "test" + eCfg.CdrcDurationField = "test" + eCfg.CdrcExtraFields = []string{"test"} eCfg.MediatorEnabled = true eCfg.MediatorListen = "test" eCfg.MediatorRater = "test" eCfg.MediatorRaterReconnects = 99 - eCfg.MediatorCDRType = "test" - eCfg.MediatorAccIdField = "test" eCfg.MediatorRunIds = []string{"test"} eCfg.MediatorSubjectFields = []string{"test"} eCfg.MediatorReqTypeFields = []string{"test"} @@ -193,10 +225,8 @@ func TestConfigFromFile(t *testing.T) { eCfg.MediatorTORFields = []string{"test"} eCfg.MediatorAccountFields = []string{"test"} eCfg.MediatorDestFields = []string{"test"} - eCfg.MediatorTimeAnswerFields = []string{"test"} + eCfg.MediatorAnswerTimeFields = []string{"test"} eCfg.MediatorDurationFields = []string{"test"} - eCfg.MediatorCDRInDir = "test" - eCfg.MediatorCDROutDir = "test" eCfg.SMEnabled = true eCfg.SMSwitchType = "test" eCfg.SMRater = "test" diff --git a/config/test_data.txt b/config/test_data.txt index 8e491467b..9bc43ddc2 100644 --- a/config/test_data.txt +++ b/config/test_data.txt @@ -50,13 +50,32 @@ mediator = test # Address where to reach the Mediator. Empty for disabling me export_path = test # Path where exported cdrs will be written export_extra_fields = test # Extra fields list to be exported +[cdrc] +enabled = true # Enable CDR client functionality +cdrs = test # Address where to reach CDR server +cdrs_method = test # Mechanism to use when posting CDRs on server +run_delay = 99 # Period to sleep between two runs, 0 to use automation via inotify +cdr_type = test # CDR file format . +cdr_in_dir = test # Absolute path towards the directory where the CDRs are kept (file stored CDRs). +cdr_out_dir = test # Absolute path towards the directory where processed CDRs will be moved after processing. +cdr_source_id = test # Tag identifying the source of the CDRs within CGRS database. +accid_field = test # Accounting id field identifier. Use index number in case of .csv cdrs. +reqtype_field = test # Request type field identifier. Use index number in case of .csv cdrs. +direction_field = test # Direction field identifier. Use index numbers in case of .csv cdrs. +tenant_field = test # Tenant field identifier. Use index numbers in case of .csv cdrs. +tor_field = test # Type of Record field identifier. Use index numbers in case of .csv cdrs. +account_field = test # Account field identifier. Use index numbers in case of .csv cdrs. +subject_field = test # Subject field identifier. Use index numbers in case of .csv CDRs. +destination_field = test # Destination field identifier. Use index numbers in case of .csv cdrs. +answer_time_field = test # Answer time field identifier. Use index numbers in case of .csv cdrs. +duration_field = test # Duration field identifier. Use index numbers in case of .csv cdrs. +extra_fields = test # Field identifiers of the fields to add in extra fields section, special format in case of .csv "index1:field1,index2:field2" + [mediator] enabled = true # Starts Mediator service: . listen=test # Mediator's listening interface: . rater = test # Address where to reach the Rater: rater_reconnects = 99 # Number of reconnects to rater before giving up. -cdr_type = test # CDR type . -accid_field = test # Name of field identifying accounting id used during mediation. Use index number in case of .csv cdrs. run_ids = test # Identifiers for each mediation run on CDRs subject_fields = test # Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs. reqtype_fields = test # Name of request type fields to be used during mediation. Use index number in case of .csv cdrs. @@ -65,10 +84,8 @@ tenant_fields = test # Name of tenant fields to be used during mediation. Use tor_fields = test # Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs. account_fields = test # Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs. destination_fields = test # Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs. -time_answer_fields = test # Name of time_answer fields to be used during mediation. Use index numbers in case of .csv cdrs. +answer_time_fields = test # Name of time_answer fields to be used during mediation. Use index numbers in case of .csv cdrs. duration_fields = test # Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs. -cdr_in_dir = test # Absolute path towards the directory where the CDRs are kept (file stored CDRs). -cdr_out_dir = test # Absolute path towards the directory where processed CDRs will be exported (file stored CDRs). [session_manager] enabled = true # Starts SessionManager service: . diff --git a/data/conf/cgrates.cfg b/data/conf/cgrates.cfg index 7da70c77e..af1e17c3a 100644 --- a/data/conf/cgrates.cfg +++ b/data/conf/cgrates.cfg @@ -32,7 +32,6 @@ # rounding_method = *middle # Rounding method for floats/costs: <*up|*middle|*down> # rounding_decimals = 4 # Number of decimals to round float/costs at - [balancer] # enabled = false # Start Balancer service: . # listen = 127.0.0.1:2012 # Balancer listen interface: . @@ -50,29 +49,45 @@ # listen=127.0.0.1:2022 # CDRS's listening interface: . # extra_fields = # Extra fields to store in CDRs # mediator = # Address where to reach the Mediator. Empty for disabling mediation. <""|internal> -# export_path = /var/log/cgrates/cdr/out # Path where the exported CDRs will be placed +# export_path = /var/log/cgrates/cdr/out/cgr # Path where the exported CDRs will be placed # export_extra_fields = # List of extra fields to be exported out in CDRs +[cdrc] +# enabled = false # Enable CDR client functionality +# cdrs = 127.0.0.1:2022 # Address where to reach CDR server +# cdrs_method = http_cgr # Mechanism to use when posting CDRs on server +# run_delay = 0 # Sleep interval in seconds between consecutive runs, 0 to use automation via inotify +# cdr_type = csv # CDR file format . +# cdr_in_dir = /var/log/cgrates/cdr/in/csv # Absolute path towards the directory where the CDRs are stored. +# cdr_out_dir = /var/log/cgrates/cdr/out/csv # Absolute path towards the directory where processed CDRs will be moved. +# cdr_source_id = freeswitch_csv # Tag identifying the source of the CDRs within CGRS database. +# accid_field = 0 # Accounting id field identifier. Use index number in case of .csv cdrs. +# reqtype_field = 1 # Request type field identifier. Use index number in case of .csv cdrs. +# direction_field = 2 # Direction field identifier. Use index numbers in case of .csv cdrs. +# tenant_field = 3 # Tenant field identifier. Use index numbers in case of .csv cdrs. +# tor_field = 4 # Type of Record field identifier. Use index numbers in case of .csv cdrs. +# account_field = 5 # Account field identifier. Use index numbers in case of .csv cdrs. +# subject_field = 6 # Subject field identifier. Use index numbers in case of .csv CDRs. +# destination_field = 7 # Destination field identifier. Use index numbers in case of .csv cdrs. +# answer_time_field = 8 # Answer time field identifier. Use index numbers in case of .csv cdrs. +# duration_field = 9 # Duration field identifier. Use index numbers in case of .csv cdrs. +# extra_fields = 10:supplier,11:orig_ip # Field identifiers of the fields to add in extra fields section, special format in case of .csv "index1:field1,index2:field2" + [mediator] # enabled = false # Starts Mediator service: . # listen=internal # Mediator's listening interface: . # rater = 127.0.0.1:2012 # Address where to reach the Rater: # rater_reconnects = 3 # Number of reconnects to rater before giving up. -# accid_field = accid # Name of field identifying accounting id used during mediation. Use index number in case of .csv cdrs. -# run_ids = default # Identifiers for each mediation run on CDRs -# subject_fields = subject # Name of fields to be used during mediation. Use index numbers in case of .csv cdrs. -# reqtype_fields = reqtype # Name of request type fields to be used during mediation. Use index number in case of .csv cdrs. -# direction_fields = direction # Name of direction fields to be used during mediation. Use index numbers in case of .csv cdrs. -# tenant_fields = tenant # Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs. -# tor_fields = tor # Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs. -# account_fields = account # Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs. -# destination_fields = destination # Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs. -# time_answer_fields = time_answer # Name of time_answer fields to be used during mediation. Use index numbers in case of .csv cdrs. -# duration_fields = duration # Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs. -# cdr_type = # CDR type, used when running mediator as service . -# cdr_in_dir = /var/log/freeswitch/cdr-csv # Absolute path towards the directory where the CDRs are kept (file stored CDRs). -# cdr_out_dir = /var/log/cgrates/cdr/out/freeswitch/csv - # Absolute path towards the directory where processed CDRs will be exported (file stored CDRs). +# run_ids = # Identifiers of each extra mediation to run on CDRs +# reqtype_fields = # Name of request type fields to be used during extra mediation. Use index number in case of .csv cdrs. +# direction_fields = # Name of direction fields to be used during extra mediation. Use index numbers in case of .csv cdrs. +# tenant_fields = # Name of tenant fields to be used during extra mediation. Use index numbers in case of .csv cdrs. +# tor_fields = # Name of tor fields to be used during extra mediation. Use index numbers in case of .csv cdrs. +# account_fields = # Name of account fields to be used during extra mediation. Use index numbers in case of .csv cdrs. +# subject_fields = # Name of fields to be used during extra mediation. Use index numbers in case of .csv cdrs. +# destination_fields = # Name of destination fields to be used during extra mediation. Use index numbers in case of .csv cdrs. +# answer_time_fields = # Name of time_answer fields to be used during extra mediation. Use index numbers in case of .csv cdrs. +# duration_fields = # Name of duration fields to be used during extra mediation. Use index numbers in case of .csv cdrs. [session_manager] # enabled = false # Starts SessionManager service: . diff --git a/data/storage/mysql/create_cdrs_tables.sql b/data/storage/mysql/create_cdrs_tables.sql index e8f453f44..43d6f5bf5 100644 --- a/data/storage/mysql/create_cdrs_tables.sql +++ b/data/storage/mysql/create_cdrs_tables.sql @@ -5,6 +5,7 @@ CREATE TABLE cdrs_primary ( cgrid char(40) NOT NULL, accid varchar(64) NOT NULL, cdrhost varchar(64) NOT NULL, + cdrsource varchar(64) NOT NULL, reqtype varchar(24) NOT NULL, direction varchar(8) NOT NULL, tenant varchar(64) NOT NULL, diff --git a/data/storage/mysql/create_mediator_tables.sql b/data/storage/mysql/create_mediator_tables.sql index d5c98120a..0b2e017d2 100644 --- a/data/storage/mysql/create_mediator_tables.sql +++ b/data/storage/mysql/create_mediator_tables.sql @@ -6,9 +6,10 @@ DROP TABLE IF EXISTS `rated_cdrs`; CREATE TABLE `rated_cdrs` ( `id` int(11) NOT NULL AUTO_INCREMENT, `cgrid` char(40) NOT NULL, + `runid` varchar(64) NOT NULL, `subject` varchar(64) NOT NULL, `cost` DECIMAL(20,4) DEFAULT NULL, `extra_info` text, PRIMARY KEY (`id`), - UNIQUE KEY `costid` (`cgrid`,`subject`) + UNIQUE KEY `costid` (`cgrid`,`runid`) ); diff --git a/mediator/mediator.go b/mediator/mediator.go index 3f4f0d563..a86ccfd98 100644 --- a/mediator/mediator.go +++ b/mediator/mediator.go @@ -27,7 +27,6 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" - "github.com/howeyc/fsnotify" "os" "path" "strconv" @@ -83,20 +82,19 @@ func (self *Mediator) loadConfig() error { self.cgrCfg.MediatorTenantFields, self.cgrCfg.MediatorTORFields, self.cgrCfg.MediatorAccountFields, self.cgrCfg.MediatorDestFields, self.cgrCfg.MediatorTimeAnswerFields, self.cgrCfg.MediatorDurationFields} - refIdx := 0 // Subject becomes reference for our checks - if len(cfgVals[refIdx]) == 0 { - return fmt.Errorf("Unconfigured %s fields", fieldKeys[refIdx]) + if len(self.cgrCfg.MediatorRunIds) == 0 { + return errors.New("Unconfigured mediator run_ids") } // All other configured fields must match the length of reference fields for iCfgVal := range cfgVals { - if len(cfgVals[refIdx]) != len(cfgVals[iCfgVal]) { - // Make sure we have everywhere the length of reference key (subject) + if len(self.cgrCfg.MediatorRunIds) != len(cfgVals[iCfgVal]) { + // Make sure we have everywhere the length of runIds return errors.New("Inconsistent lenght of mediator fields.") } } // AccIdField has no special requirements, should just exist - if self.cgrCfg.MediatorAccIdField == "" { + if len(self.cgrCfg.MediatorAccIdField) == 0 { return errors.New("Undefined mediator accid field") } self.accIdField = self.cgrCfg.MediatorAccIdField @@ -136,35 +134,6 @@ func (self *Mediator) loadConfig() error { return nil } -// Watch the specified folder for file moves and parse the files on events -func (self *Mediator) TrackCDRFiles() (err error) { - watcher, err := fsnotify.NewWatcher() - if err != nil { - return - } - defer watcher.Close() - err = watcher.Watch(self.cdrInDir) - if err != nil { - return - } - engine.Logger.Info(fmt.Sprintf("Monitoring %s for file moves.", self.cdrInDir)) - for { - select { - case ev := <-watcher.Event: - if ev.IsCreate() && path.Ext(ev.Name) != ".csv" { - engine.Logger.Info(fmt.Sprintf("Parsing: %s", ev.Name)) - err = self.MediateCSVCDR(ev.Name) - if err != nil { - return err - } - } - case err := <-watcher.Error: - engine.Logger.Err(fmt.Sprintf("Inotify error: %s", err.Error())) - } - } - return -} - // Retrive the cost from logging database func (self *Mediator) getCostsFromDB(cdr utils.CDR) (cc *engine.CallCost, err error) { for i := 0; i < 3; i++ { // Mechanism to avoid concurrency between SessionManager writing the costs and mediator picking them up @@ -217,66 +186,7 @@ func (self *Mediator) getCostsFromRater(cdr utils.CDR) (*engine.CallCost, error) return cc, err } -// Parse the files and get cost for every record -func (self *Mediator) MediateCSVCDR(cdrfn string) (err error) { - flag.Parse() - file, err := os.Open(cdrfn) - defer file.Close() - if err != nil { - engine.Logger.Crit(err.Error()) - os.Exit(1) - } - csvReader := csv.NewReader(bufio.NewReader(file)) - _, fn := path.Split(cdrfn) - fout, err := os.Create(path.Join(self.cdrOutDir, fn)) - if err != nil { - return err - } - defer fout.Close() - - w := bufio.NewWriter(fout) - for record, ok := csvReader.Read(); ok == nil; record, ok = csvReader.Read() { - //t, _ := time.Parse("2006-01-02 15:04:05", record[5]) - var cc *engine.CallCost - - for runIdx := range self.fieldIdxs["subject"] { // Query costs for every run index given by subject - csvCDR, errCDR := NewFScsvCDR(record, self.accIdIdx, - self.fieldIdxs["subject"][runIdx], - self.fieldIdxs["reqtype"][runIdx], - self.fieldIdxs["direction"][runIdx], - self.fieldIdxs["tenant"][runIdx], - self.fieldIdxs["tor"][runIdx], - self.fieldIdxs["account"][runIdx], - self.fieldIdxs["destination"][runIdx], - self.fieldIdxs["answer_time"][runIdx], - self.fieldIdxs["duration"][runIdx], - self.cgrCfg) - if errCDR != nil { - engine.Logger.Err(fmt.Sprintf(" Could not calculate price for accid: <%s>, err: <%s>", - record[self.accIdIdx], errCDR.Error())) - } - var errCost error - if csvCDR.GetReqType() == utils.PREPAID || csvCDR.GetReqType() == utils.POSTPAID { - // Should be previously calculated and stored in DB - cc, errCost = self.getCostsFromDB(csvCDR) - } else { - cc, errCost = self.getCostsFromRater(csvCDR) - } - cost := "-1" - if errCost != nil || cc == nil { - engine.Logger.Err(fmt.Sprintf(" Could not calculate price for accid: <%s>, err: <%s>, cost: <%v>", csvCDR.GetAccId(), err.Error(), cc)) - } else { - cost = strconv.FormatFloat(cc.ConnectFee+cc.Cost, 'f', -1, 64) - engine.Logger.Debug(fmt.Sprintf("Calculated for accid:%s, cost: %v", csvCDR.GetAccId(), cost)) - } - record = append(record, cost) - } - w.WriteString(strings.Join(record, ",") + "\n") - } - w.Flush() - return -} func (self *Mediator) MediateDBCDR(cdr utils.CDR) error { var qryCC *engine.CallCost diff --git a/utils/cdr.go b/utils/cdr.go index 2c3cbb28b..dc1f179aa 100644 --- a/utils/cdr.go +++ b/utils/cdr.go @@ -22,6 +22,8 @@ import ( "time" ) +var PrimaryCdrFields []string = []string{ACCID, CDRHOST, REQTYPE, DIRECTION, TENANT, TOR, ACCOUNT, SUBJECT, DESTINATION, ANSWER_TIME, DURATION} + type CDR interface { GetCgrId() string GetAccId() string diff --git a/utils/consts.go b/utils/consts.go index bc24fa015..ec820bf6a 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -63,4 +63,16 @@ const ( JSON = "json" MSGPACK = "msgpack" CSV_LOAD = "CSVLOAD" + ACCID = "accid" + CDRHOST = "cdrhost" + CDRSOURCE = "cdrsource" + REQTYPE = "reqtype" + DIRECTION = "direction" + TENANT = "tenant" + TOR = "tor" + ACCOUNT = "account" + SUBJECT = "subject" + DESTINATION = "destination" + ANSWER_TIME = "answer_time" + DURATION = "duration" )