From 19e994ca8a8da253b8226e36e2358bd27e2da5b6 Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 25 Jul 2014 12:25:23 +0200 Subject: [PATCH] Adding CDRSV1.ProcessCdr API to feed cdrs via RPC --- apier/apier_local_test.go | 25 +++++++++++++++++++++++++ cdrc/cdrc.go | 2 +- cdrs/cdrs.go | 4 ++-- cmd/cgr-engine/cgr-engine.go | 2 ++ 4 files changed, 30 insertions(+), 3 deletions(-) diff --git a/apier/apier_local_test.go b/apier/apier_local_test.go index d43291b81..cc0ff1176 100644 --- a/apier/apier_local_test.go +++ b/apier/apier_local_test.go @@ -1442,6 +1442,31 @@ func TestLocalGetCdrs(t *testing.T) { } } +func TestLocalProcessCdr(t *testing.T) { + if !*testLocal { + return + } + var reply string + cdr := utils.StoredCdr{CgrId: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf", + CdrHost: "192.168.1.1", CdrSource: "test", ReqType: "rated", Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1001", Subject: "1001", Destination: "1002", + SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), MediationRunId: utils.DEFAULT_RUNID, + Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, RatedAccount: "dan", RatedSubject: "dans", + } + if err := rater.Call("CDRSV1.ProcessCdr", cdr, &reply); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if reply != utils.OK { + t.Error("Unexpected reply received: ", reply) + } + var cdrs []*utils.StoredCdr + time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time to CDR to reach db + req := utils.AttrGetCdrs{} + if err := rater.Call("ApierV1.GetCdrs", req, &cdrs); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(cdrs) != 3 { + t.Error("Unexpected number of CDRs returned: ", len(cdrs)) + } +} + func TestLocalSetDC(t *testing.T) { if !*testLocal { return diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index 4943b5dac..b059c54d9 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -220,7 +220,7 @@ func (self *Cdrc) processFile(filePath string) error { continue } if self.cdrsAddress == utils.INTERNAL { - if err := self.cdrServer.ProcessRawCdr(storedCdr); err != nil { + if err := self.cdrServer.ProcessCdr(storedCdr); err != nil { engine.Logger.Err(fmt.Sprintf(" Failed posting CDR, row: %d, error: %s", procRowNr, err.Error())) continue } diff --git a/cdrs/cdrs.go b/cdrs/cdrs.go index 7122b20cc..12a07e394 100644 --- a/cdrs/cdrs.go +++ b/cdrs/cdrs.go @@ -88,6 +88,6 @@ func (cdrs *CDRS) RegisterHanlersToServer(server *engine.Server) { } // Used to internally process CDR -func (cdrs *CDRS) ProcessRawCdr(rawCdr utils.RawCdr) error { - return storeAndMediate(rawCdr.AsStoredCdr()) +func (cdrs *CDRS) ProcessCdr(cdr *utils.StoredCdr) error { + return storeAndMediate(cdr) } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 2111bc5d6..c40a156e9 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -187,6 +187,8 @@ func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage, mediChan, d } cdrServer = cdrs.New(cdrDb, medi, cfg) cdrServer.RegisterHanlersToServer(server) + engine.Logger.Info("Registering CDRS RPC service.") + server.RpcRegister(&apier.CDRSV1{CdrSrv: cdrServer}) close(doneChan) }