Adding CDRSV1.ProcessCdr API to feed cdrs via RPC

This commit is contained in:
DanB
2014-07-25 12:25:23 +02:00
parent 4f5894b27e
commit 19e994ca8a
4 changed files with 30 additions and 3 deletions

View File

@@ -1442,6 +1442,31 @@ func TestLocalGetCdrs(t *testing.T) {
}
}
func TestLocalProcessCdr(t *testing.T) {
if !*testLocal {
return
}
var reply string
cdr := utils.StoredCdr{CgrId: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf",
CdrHost: "192.168.1.1", CdrSource: "test", ReqType: "rated", Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1001", Subject: "1001", Destination: "1002",
SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), MediationRunId: utils.DEFAULT_RUNID,
Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, RatedAccount: "dan", RatedSubject: "dans",
}
if err := rater.Call("CDRSV1.ProcessCdr", cdr, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
}
var cdrs []*utils.StoredCdr
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time to CDR to reach db
req := utils.AttrGetCdrs{}
if err := rater.Call("ApierV1.GetCdrs", req, &cdrs); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(cdrs) != 3 {
t.Error("Unexpected number of CDRs returned: ", len(cdrs))
}
}
func TestLocalSetDC(t *testing.T) {
if !*testLocal {
return

View File

@@ -220,7 +220,7 @@ func (self *Cdrc) processFile(filePath string) error {
continue
}
if self.cdrsAddress == utils.INTERNAL {
if err := self.cdrServer.ProcessRawCdr(storedCdr); err != nil {
if err := self.cdrServer.ProcessCdr(storedCdr); err != nil {
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed posting CDR, row: %d, error: %s", procRowNr, err.Error()))
continue
}

View File

@@ -88,6 +88,6 @@ func (cdrs *CDRS) RegisterHanlersToServer(server *engine.Server) {
}
// Used to internally process CDR
func (cdrs *CDRS) ProcessRawCdr(rawCdr utils.RawCdr) error {
return storeAndMediate(rawCdr.AsStoredCdr())
func (cdrs *CDRS) ProcessCdr(cdr *utils.StoredCdr) error {
return storeAndMediate(cdr)
}

View File

@@ -187,6 +187,8 @@ func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage, mediChan, d
}
cdrServer = cdrs.New(cdrDb, medi, cfg)
cdrServer.RegisterHanlersToServer(server)
engine.Logger.Info("Registering CDRS RPC service.")
server.RpcRegister(&apier.CDRSV1{CdrSrv: cdrServer})
close(doneChan)
}