diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 21d64d422..146065699 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -51,6 +51,7 @@ type ApierV1 struct { Responder *engine.Responder CdrStatsSrv rpcclient.RpcClientConnection Users rpcclient.RpcClientConnection + CDRs rpcclient.RpcClientConnection // FixMe: populate it from cgr-engine } func (self *ApierV1) GetDestination(dstId string, reply *engine.Destination) error { diff --git a/apier/v1/apier_local_test.go b/apier/v1/apier_local_test.go index 0e1e7f09e..7bc966260 100644 --- a/apier/v1/apier_local_test.go +++ b/apier/v1/apier_local_test.go @@ -1274,7 +1274,7 @@ func TestApierLoadTariffPlanFromFolder(t *testing.T) { } else if reply != "OK" { t.Error("Calling ApierV1.LoadTariffPlanFromFolder got reply: ", reply) } - time.Sleep(time.Duration(*waitRater) * time.Millisecond) + time.Sleep(time.Duration(2**waitRater) * time.Millisecond) } func TestApierResetDataAfterLoadFromFolder(t *testing.T) { diff --git a/apier/v1/cdrs.go b/apier/v1/cdrs.go index 4e87a2120..a4ce0b5af 100644 --- a/apier/v1/cdrs.go +++ b/apier/v1/cdrs.go @@ -19,6 +19,7 @@ along with this program. If not, see package v1 import ( + "errors" "fmt" "github.com/cgrates/cgrates/engine" @@ -85,3 +86,11 @@ func (apier *ApierV1) RemoveCDRs(attrs utils.RPCCDRsFilter, reply *string) error *reply = "OK" return nil } + +// New way of (re-)rating CDRs +func (apier *ApierV1) RateCDRs(attrs utils.AttrRateCDRs, reply *string) error { + if apier.CDRs == nil { + return errors.New("CDRS_NOT_ENABLED") + } + return apier.CDRs.Call("CDRsV1.RateCDRs", attrs, reply) +} diff --git a/apier/v1/cdrsv1.go b/apier/v1/cdrsv1.go index 09df4c5a6..c6b93d504 100644 --- a/apier/v1/cdrsv1.go +++ b/apier/v1/cdrsv1.go @@ -46,7 +46,7 @@ func (self *CdrsV1) ProcessExternalCdr(cdr *engine.ExternalCDR, reply *string) e return nil } -// Remotely start mediation with specific runid, runs asynchronously, it's status will be displayed in syslog +// Remotely (re)rating, deprecated func (self *CdrsV1) RateCdrs(attrs utils.AttrRateCdrs, reply *string) error { cdrsFltr, err := attrs.AsCDRsFilter(self.CdrSrv.Timezone()) if err != nil { @@ -60,9 +60,5 @@ func (self *CdrsV1) RateCdrs(attrs utils.AttrRateCdrs, reply *string) error { } func (self *CdrsV1) StoreSMCost(attr engine.AttrCDRSStoreSMCost, reply *string) error { - if err := self.CdrSrv.StoreSMCost(attr, reply); err != nil { - return utils.NewErrServerError(err) - } - *reply = utils.OK - return nil + return self.CdrSrv.V1StoreSMCost(attr, reply) } diff --git a/config/config.go b/config/config.go index 016554415..736923a6c 100644 --- a/config/config.go +++ b/config/config.go @@ -429,6 +429,11 @@ func (self *CGRConfig) checkConfigSanity() error { return errors.New("SMGeneric not enabled but referenced by DiameterAgent component") } } + for _, daPubSubSConn := range self.diameterAgentCfg.PubSubConns { + if daPubSubSConn.Address == utils.MetaInternal && !self.PubSubServerEnabled { + return errors.New("PubSubS not enabled but requested by DiameterAgent component.") + } + } } return nil } diff --git a/engine/cdrs.go b/engine/cdrs.go index 285666ae7..56b643bac 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -122,27 +122,12 @@ func (self *CdrServer) RegisterHandlersToServer(server *utils.Server) { server.RegisterHttpFunc("/freeswitch_json", fsCdrHandler) } -func (self *CdrServer) ProcessCdr(cdr *CDR, reply *string) error { - cacheKey := "ProcessCdr" + cdr.CGRID - if item, err := self.getCache().Get(cacheKey); err == nil && item != nil { - *reply = item.Value.(string) - return item.Err - } - if err := self.LocalProcessCdr(cdr); err != nil { - self.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) - return utils.NewErrServerError(err) - } - self.getCache().Cache(cacheKey, &cache2go.CacheItem{Value: utils.OK}) - *reply = utils.OK - return nil -} - -// RPC method, used to internally process CDR +// Used to internally process CDR func (self *CdrServer) LocalProcessCdr(cdr *CDR) error { return self.processCdr(cdr) } -// RPC method, used to process external CDRs +// Used to process external CDRs func (self *CdrServer) ProcessExternalCdr(eCDR *ExternalCDR) error { cdr, err := NewCDRFromExternalCDR(eCDR, self.cgrCfg.DefaultTimezone) if err != nil { @@ -171,25 +156,6 @@ func (self *CdrServer) storeSMCost(smCost *SMCost, checkDuplicate bool) error { return self.cdrDb.SetSMCost(smCost) } -// RPC method, differs from storeSMCost through it's signature -func (self *CdrServer) StoreSMCost(attr AttrCDRSStoreSMCost, reply *string) error { - return self.storeSMCost(attr.Cost, attr.CheckDuplicate) -} - -// Called by rate/re-rate API -func (self *CdrServer) RateCDRs(cdrFltr *utils.CDRsFilter, sendToStats bool) error { - cdrs, _, err := self.cdrDb.GetCDRs(cdrFltr, false) - if err != nil { - return err - } - for _, cdr := range cdrs { - if err := self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CDRSStoreCdrs, sendToStats, len(self.cgrCfg.CDRSCdrReplication) != 0); err != nil { - utils.Logger.Err(fmt.Sprintf(" Processing CDR %+v, got error: %s", cdr, err.Error())) - } - } - return nil -} - // Returns error if not able to properly store the CDR, mediation is async since we can always recover offline func (self *CdrServer) processCdr(cdr *CDR) (err error) { if cdr.Direction == "" { @@ -508,13 +474,86 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error { return nil } +// Called by rate/re-rate API, FixMe: deprecate it once new APIer structure is operational +func (self *CdrServer) RateCDRs(cdrFltr *utils.CDRsFilter, sendToStats bool) error { + cdrs, _, err := self.cdrDb.GetCDRs(cdrFltr, false) + if err != nil { + return err + } + for _, cdr := range cdrs { + if err := self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CDRSStoreCdrs, sendToStats, len(self.cgrCfg.CDRSCdrReplication) != 0); err != nil { + utils.Logger.Err(fmt.Sprintf(" Processing CDR %+v, got error: %s", cdr, err.Error())) + } + } + return nil +} + +func (self *CdrServer) V1ProcessCDR(cdr *CDR, reply *string) error { + cacheKey := "ProcessCdr" + cdr.CGRID + if item, err := self.getCache().Get(cacheKey); err == nil && item != nil { + *reply = item.Value.(string) + return item.Err + } + if err := self.LocalProcessCdr(cdr); err != nil { + self.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) + return utils.NewErrServerError(err) + } + self.getCache().Cache(cacheKey, &cache2go.CacheItem{Value: utils.OK}) + *reply = utils.OK + return nil +} + +// Alias, deprecated after removing CdrServerV1.ProcessCdr +func (self *CdrServer) V1ProcessCdr(cdr *CDR, reply *string) error { + return self.V1ProcessCDR(cdr, reply) +} + +// RPC method, differs from storeSMCost through it's signature +func (self *CdrServer) V1StoreSMCost(attr AttrCDRSStoreSMCost, reply *string) error { + if err := self.storeSMCost(attr.Cost, attr.CheckDuplicate); err != nil { + return utils.NewErrServerError(err) + } + *reply = utils.OK + return nil +} + +// Called by rate/re-rate API, RPC method +func (self *CdrServer) V1RateCDRs(attrs utils.AttrRateCDRs, reply *string) error { + cdrFltr, err := attrs.RPCCDRsFilter.AsCDRsFilter(self.cgrCfg.DefaultTimezone) + if err != nil { + return utils.NewErrServerError(err) + } + cdrs, _, err := self.cdrDb.GetCDRs(cdrFltr, false) + if err != nil { + return err + } + storeCDRs := self.cgrCfg.CDRSStoreCdrs + if attrs.StoreCDRs != nil { + storeCDRs = *attrs.StoreCDRs + } + sendToStats := self.stats != nil + if attrs.SendToStatS != nil { + sendToStats = *attrs.SendToStatS + } + replicate := len(self.cgrCfg.CDRSCdrReplication) != 0 + if attrs.ReplicateCDRs != nil { + replicate = *attrs.ReplicateCDRs + } + for _, cdr := range cdrs { + if err := self.deriveRateStoreStatsReplicate(cdr, storeCDRs, sendToStats, replicate); err != nil { + utils.Logger.Err(fmt.Sprintf(" Processing CDR %+v, got error: %s", cdr, err.Error())) + } + } + return nil +} + func (cdrsrv *CdrServer) Call(serviceMethod string, args interface{}, reply interface{}) error { parts := strings.Split(serviceMethod, ".") if len(parts) != 2 { return utils.ErrNotImplemented } // get method - method := reflect.ValueOf(cdrsrv).MethodByName(parts[1]) + method := reflect.ValueOf(cdrsrv).MethodByName(parts[0][len(parts[0])-2:] + parts[1]) // Inherit the version in the method if !method.IsValid() { return utils.ErrNotImplemented } diff --git a/general_tests/tutorial_local_test.go b/general_tests/tutorial_local_test.go index 17d986ea9..d57ce2fdd 100644 --- a/general_tests/tutorial_local_test.go +++ b/general_tests/tutorial_local_test.go @@ -545,7 +545,7 @@ func TestTutLocalProcessExternalCdr(t *testing.T) { Usage: "1", PDD: "7.0", ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, } var reply string - if err := tutLocalRpc.Call("CdrsV2.ProcessExternalCdr", cdr, &reply); err != nil { + if err := tutLocalRpc.Call("CdrsV1.ProcessExternalCdr", cdr, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) @@ -565,7 +565,7 @@ func TestTutLocalProcessExternalCdrUP(t *testing.T) { ExtraFields: map[string]string{"Cli": "+4986517174964", "fieldextr2": "valextr2", "SysUserName": utils.USERS}, } var reply string - if err := tutLocalRpc.Call("CdrsV2.ProcessExternalCdr", cdr, &reply); err != nil { + if err := tutLocalRpc.Call("CdrsV1.ProcessExternalCdr", cdr, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) @@ -645,7 +645,7 @@ func TestTutLocalCostErrors(t *testing.T) { Usage: "1", PDD: "7.0", ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, } var reply string - if err := tutLocalRpc.Call("CdrsV2.ProcessExternalCdr", cdr, &reply); err != nil { + if err := tutLocalRpc.Call("CdrsV1.ProcessExternalCdr", cdr, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) @@ -671,7 +671,7 @@ func TestTutLocalCostErrors(t *testing.T) { SetupTime: "2014-08-04T13:00:00Z", AnswerTime: "2014-08-04T13:00:07Z", Usage: "1", PDD: "7.0", ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, } - if err := tutLocalRpc.Call("CdrsV2.ProcessExternalCdr", cdr2, &reply); err != nil { + if err := tutLocalRpc.Call("CdrsV1.ProcessExternalCdr", cdr2, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) @@ -696,7 +696,7 @@ func TestTutLocalCostErrors(t *testing.T) { SetupTime: "2014-08-04T13:00:00Z", AnswerTime: "2014-08-04T13:00:07Z", Usage: "1", PDD: "7.0", ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, } - if err := tutLocalRpc.Call("CdrsV2.ProcessExternalCdr", cdr3, &reply); err != nil { + if err := tutLocalRpc.Call("CdrsV1.ProcessExternalCdr", cdr3, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) @@ -886,7 +886,7 @@ func TestTutLocalLcrQos(t *testing.T) { ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}} var reply string for _, cdr := range []*engine.CDR{testCdr1, testCdr2} { - if err := tutLocalRpc.Call("CdrsV2.ProcessCdr", cdr, &reply); err != nil { + if err := tutLocalRpc.Call("CdrsV1.ProcessCdr", cdr, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) @@ -916,7 +916,7 @@ func TestTutLocalLcrQos(t *testing.T) { Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1003", Subject: "1003", Destination: "1004", SetupTime: time.Date(2014, 12, 7, 8, 42, 24, 0, time.UTC), AnswerTime: time.Date(2014, 12, 7, 8, 42, 26, 0, time.UTC), Usage: time.Duration(180) * time.Second, Supplier: "suppl2"} - if err := tutLocalRpc.Call("CdrsV2.ProcessCdr", testCdr3, &reply); err != nil { + if err := tutLocalRpc.Call("CdrsV1.ProcessCdr", testCdr3, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) @@ -983,7 +983,7 @@ func TestTutLocalLcrQosThreshold(t *testing.T) { SetupTime: time.Date(2014, 12, 7, 8, 42, 24, 0, time.UTC), AnswerTime: time.Date(2014, 12, 7, 8, 42, 26, 0, time.UTC), Usage: time.Duration(60) * time.Second, Supplier: "suppl2"} var reply string - if err := tutLocalRpc.Call("CdrsV2.ProcessCdr", testCdr4, &reply); err != nil { // Should drop ACD under the 2m required by threshold, removing suppl2 from lcr + if err := tutLocalRpc.Call("CdrsV1.ProcessCdr", testCdr4, &reply); err != nil { // Should drop ACD under the 2m required by threshold, removing suppl2 from lcr t.Error("Unexpected error: ", err.Error()) } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) @@ -1047,7 +1047,7 @@ func TestTutLocalLcrQosThreshold(t *testing.T) { Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1003", Subject: "1003", Destination: "1004", SetupTime: time.Date(2014, 12, 7, 8, 42, 24, 0, time.UTC), AnswerTime: time.Date(2014, 12, 7, 8, 42, 26, 0, time.UTC), Usage: time.Duration(1) * time.Second, Supplier: "suppl2"} - if err := tutLocalRpc.Call("CdrsV2.ProcessCdr", testCdr5, &reply); err != nil { // Should drop ACD under the 1m required by threshold, removing suppl2 from lcr + if err := tutLocalRpc.Call("CdrsV1.ProcessCdr", testCdr5, &reply); err != nil { // Should drop ACD under the 1m required by threshold, removing suppl2 from lcr t.Error("Unexpected error: ", err.Error()) } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) @@ -1325,12 +1325,12 @@ func TestTutLocalPrepaidCDRWithSMCost(t *testing.T) { }, } var reply string - if err := tutLocalRpc.Call("CdrsV2.StoreSMCost", &engine.AttrCDRSStoreSMCost{Cost: smCost}, &reply); err != nil { + if err := tutLocalRpc.Call("CdrsV1.StoreSMCost", &engine.AttrCDRSStoreSMCost{Cost: smCost}, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) } - if err := tutLocalRpc.Call("CdrsV2.ProcessCdr", cdr, &reply); err != nil { + if err := tutLocalRpc.Call("CdrsV1.ProcessCdr", cdr, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) @@ -1363,7 +1363,7 @@ func TestTutLocalPrepaidCDRWithoutSMCost(t *testing.T) { Usage: time.Duration(90) * time.Second, Supplier: "suppl1", ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}} var reply string - if err := tutLocalRpc.Call("CdrsV2.ProcessCdr", cdr, &reply); err != nil { + if err := tutLocalRpc.Call("CdrsV1.ProcessCdr", cdr, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index 99a45bd8f..7fc17f5dc 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -91,7 +91,7 @@ func (self *SMGeneric) ttlTerminate(s *SMGSession, tmtr *smgSessionTerminator) { cdr := s.eventStart.AsStoredCdr(self.cgrCfg, self.timezone) cdr.Usage = s.TotalUsage() var reply string - self.cdrsrv.Call("CdrServer.ProcessCdr", cdr, &reply) + self.cdrsrv.Call("CdrsV1.ProcessCdr", cdr, &reply) } func (self *SMGeneric) indexSession(uuid string, s *SMGSession) { @@ -474,7 +474,7 @@ func (self *SMGeneric) ChargeEvent(gev SMGenericEvent, clnt *rpc2.Client) (maxDu OriginID: gev.GetUUID(), CostDetails: cc, } - if err := self.cdrsrv.Call("CdrServer.StoreSMCost", engine.AttrCDRSStoreSMCost{Cost: smCost, CheckDuplicate: true}, &reply); err != nil && err != utils.ErrExists { + if err := self.cdrsrv.Call("CdrsV1.StoreSMCost", engine.AttrCDRSStoreSMCost{Cost: smCost, CheckDuplicate: true}, &reply); err != nil && err != utils.ErrExists { withErrors = true utils.Logger.Err(fmt.Sprintf(" Could not save CC: %+v, RunID: %s error: %s", cc, sR.DerivedCharger.RunID, err.Error())) } @@ -487,7 +487,7 @@ func (self *SMGeneric) ChargeEvent(gev SMGenericEvent, clnt *rpc2.Client) (maxDu func (self *SMGeneric) ProcessCdr(gev SMGenericEvent) error { var reply string - if err := self.cdrsrv.Call("CdrServer.ProcessCdr", gev.AsStoredCdr(self.cgrCfg, self.timezone), &reply); err != nil { + if err := self.cdrsrv.Call("CdrsV1.ProcessCdr", gev.AsStoredCdr(self.cgrCfg, self.timezone), &reply); err != nil { return err } return nil diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 12bfc9173..80b428e9a 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1171,3 +1171,10 @@ type AttrSMGGetActiveSessions struct { Destination *string Supplier *string } + +type AttrRateCDRs struct { + RPCCDRsFilter + StoreCDRs *bool + SendToStatS *bool // Set to true if the CDRs should be sent to stats server + ReplicateCDRs *bool // Replicate results +}