CDRs refactoring, adding versioning for internal RPC methods

This commit is contained in:
DanB
2016-05-02 17:23:51 +02:00
parent f562b617cc
commit 27d5a7ba2f
9 changed files with 116 additions and 59 deletions

View File

@@ -51,6 +51,7 @@ type ApierV1 struct {
Responder *engine.Responder
CdrStatsSrv rpcclient.RpcClientConnection
Users rpcclient.RpcClientConnection
CDRs rpcclient.RpcClientConnection // FixMe: populate it from cgr-engine
}
func (self *ApierV1) GetDestination(dstId string, reply *engine.Destination) error {

View File

@@ -1274,7 +1274,7 @@ func TestApierLoadTariffPlanFromFolder(t *testing.T) {
} else if reply != "OK" {
t.Error("Calling ApierV1.LoadTariffPlanFromFolder got reply: ", reply)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
time.Sleep(time.Duration(2**waitRater) * time.Millisecond)
}
func TestApierResetDataAfterLoadFromFolder(t *testing.T) {

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package v1
import (
"errors"
"fmt"
"github.com/cgrates/cgrates/engine"
@@ -85,3 +86,11 @@ func (apier *ApierV1) RemoveCDRs(attrs utils.RPCCDRsFilter, reply *string) error
*reply = "OK"
return nil
}
// New way of (re-)rating CDRs
func (apier *ApierV1) RateCDRs(attrs utils.AttrRateCDRs, reply *string) error {
if apier.CDRs == nil {
return errors.New("CDRS_NOT_ENABLED")
}
return apier.CDRs.Call("CDRsV1.RateCDRs", attrs, reply)
}

View File

@@ -46,7 +46,7 @@ func (self *CdrsV1) ProcessExternalCdr(cdr *engine.ExternalCDR, reply *string) e
return nil
}
// Remotely start mediation with specific runid, runs asynchronously, it's status will be displayed in syslog
// Remotely (re)rating, deprecated
func (self *CdrsV1) RateCdrs(attrs utils.AttrRateCdrs, reply *string) error {
cdrsFltr, err := attrs.AsCDRsFilter(self.CdrSrv.Timezone())
if err != nil {
@@ -60,9 +60,5 @@ func (self *CdrsV1) RateCdrs(attrs utils.AttrRateCdrs, reply *string) error {
}
func (self *CdrsV1) StoreSMCost(attr engine.AttrCDRSStoreSMCost, reply *string) error {
if err := self.CdrSrv.StoreSMCost(attr, reply); err != nil {
return utils.NewErrServerError(err)
}
*reply = utils.OK
return nil
return self.CdrSrv.V1StoreSMCost(attr, reply)
}

View File

@@ -429,6 +429,11 @@ func (self *CGRConfig) checkConfigSanity() error {
return errors.New("SMGeneric not enabled but referenced by DiameterAgent component")
}
}
for _, daPubSubSConn := range self.diameterAgentCfg.PubSubConns {
if daPubSubSConn.Address == utils.MetaInternal && !self.PubSubServerEnabled {
return errors.New("PubSubS not enabled but requested by DiameterAgent component.")
}
}
}
return nil
}

View File

@@ -122,27 +122,12 @@ func (self *CdrServer) RegisterHandlersToServer(server *utils.Server) {
server.RegisterHttpFunc("/freeswitch_json", fsCdrHandler)
}
func (self *CdrServer) ProcessCdr(cdr *CDR, reply *string) error {
cacheKey := "ProcessCdr" + cdr.CGRID
if item, err := self.getCache().Get(cacheKey); err == nil && item != nil {
*reply = item.Value.(string)
return item.Err
}
if err := self.LocalProcessCdr(cdr); err != nil {
self.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return utils.NewErrServerError(err)
}
self.getCache().Cache(cacheKey, &cache2go.CacheItem{Value: utils.OK})
*reply = utils.OK
return nil
}
// RPC method, used to internally process CDR
// Used to internally process CDR
func (self *CdrServer) LocalProcessCdr(cdr *CDR) error {
return self.processCdr(cdr)
}
// RPC method, used to process external CDRs
// Used to process external CDRs
func (self *CdrServer) ProcessExternalCdr(eCDR *ExternalCDR) error {
cdr, err := NewCDRFromExternalCDR(eCDR, self.cgrCfg.DefaultTimezone)
if err != nil {
@@ -171,25 +156,6 @@ func (self *CdrServer) storeSMCost(smCost *SMCost, checkDuplicate bool) error {
return self.cdrDb.SetSMCost(smCost)
}
// RPC method, differs from storeSMCost through it's signature
func (self *CdrServer) StoreSMCost(attr AttrCDRSStoreSMCost, reply *string) error {
return self.storeSMCost(attr.Cost, attr.CheckDuplicate)
}
// Called by rate/re-rate API
func (self *CdrServer) RateCDRs(cdrFltr *utils.CDRsFilter, sendToStats bool) error {
cdrs, _, err := self.cdrDb.GetCDRs(cdrFltr, false)
if err != nil {
return err
}
for _, cdr := range cdrs {
if err := self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CDRSStoreCdrs, sendToStats, len(self.cgrCfg.CDRSCdrReplication) != 0); err != nil {
utils.Logger.Err(fmt.Sprintf("<CDRS> Processing CDR %+v, got error: %s", cdr, err.Error()))
}
}
return nil
}
// Returns error if not able to properly store the CDR, mediation is async since we can always recover offline
func (self *CdrServer) processCdr(cdr *CDR) (err error) {
if cdr.Direction == "" {
@@ -508,13 +474,86 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error {
return nil
}
// Called by rate/re-rate API, FixMe: deprecate it once new APIer structure is operational
func (self *CdrServer) RateCDRs(cdrFltr *utils.CDRsFilter, sendToStats bool) error {
cdrs, _, err := self.cdrDb.GetCDRs(cdrFltr, false)
if err != nil {
return err
}
for _, cdr := range cdrs {
if err := self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CDRSStoreCdrs, sendToStats, len(self.cgrCfg.CDRSCdrReplication) != 0); err != nil {
utils.Logger.Err(fmt.Sprintf("<CDRS> Processing CDR %+v, got error: %s", cdr, err.Error()))
}
}
return nil
}
func (self *CdrServer) V1ProcessCDR(cdr *CDR, reply *string) error {
cacheKey := "ProcessCdr" + cdr.CGRID
if item, err := self.getCache().Get(cacheKey); err == nil && item != nil {
*reply = item.Value.(string)
return item.Err
}
if err := self.LocalProcessCdr(cdr); err != nil {
self.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return utils.NewErrServerError(err)
}
self.getCache().Cache(cacheKey, &cache2go.CacheItem{Value: utils.OK})
*reply = utils.OK
return nil
}
// Alias, deprecated after removing CdrServerV1.ProcessCdr
func (self *CdrServer) V1ProcessCdr(cdr *CDR, reply *string) error {
return self.V1ProcessCDR(cdr, reply)
}
// RPC method, differs from storeSMCost through it's signature
func (self *CdrServer) V1StoreSMCost(attr AttrCDRSStoreSMCost, reply *string) error {
if err := self.storeSMCost(attr.Cost, attr.CheckDuplicate); err != nil {
return utils.NewErrServerError(err)
}
*reply = utils.OK
return nil
}
// Called by rate/re-rate API, RPC method
func (self *CdrServer) V1RateCDRs(attrs utils.AttrRateCDRs, reply *string) error {
cdrFltr, err := attrs.RPCCDRsFilter.AsCDRsFilter(self.cgrCfg.DefaultTimezone)
if err != nil {
return utils.NewErrServerError(err)
}
cdrs, _, err := self.cdrDb.GetCDRs(cdrFltr, false)
if err != nil {
return err
}
storeCDRs := self.cgrCfg.CDRSStoreCdrs
if attrs.StoreCDRs != nil {
storeCDRs = *attrs.StoreCDRs
}
sendToStats := self.stats != nil
if attrs.SendToStatS != nil {
sendToStats = *attrs.SendToStatS
}
replicate := len(self.cgrCfg.CDRSCdrReplication) != 0
if attrs.ReplicateCDRs != nil {
replicate = *attrs.ReplicateCDRs
}
for _, cdr := range cdrs {
if err := self.deriveRateStoreStatsReplicate(cdr, storeCDRs, sendToStats, replicate); err != nil {
utils.Logger.Err(fmt.Sprintf("<CDRS> Processing CDR %+v, got error: %s", cdr, err.Error()))
}
}
return nil
}
func (cdrsrv *CdrServer) Call(serviceMethod string, args interface{}, reply interface{}) error {
parts := strings.Split(serviceMethod, ".")
if len(parts) != 2 {
return utils.ErrNotImplemented
}
// get method
method := reflect.ValueOf(cdrsrv).MethodByName(parts[1])
method := reflect.ValueOf(cdrsrv).MethodByName(parts[0][len(parts[0])-2:] + parts[1]) // Inherit the version in the method
if !method.IsValid() {
return utils.ErrNotImplemented
}

View File

@@ -545,7 +545,7 @@ func TestTutLocalProcessExternalCdr(t *testing.T) {
Usage: "1", PDD: "7.0", ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"},
}
var reply string
if err := tutLocalRpc.Call("CdrsV2.ProcessExternalCdr", cdr, &reply); err != nil {
if err := tutLocalRpc.Call("CdrsV1.ProcessExternalCdr", cdr, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
@@ -565,7 +565,7 @@ func TestTutLocalProcessExternalCdrUP(t *testing.T) {
ExtraFields: map[string]string{"Cli": "+4986517174964", "fieldextr2": "valextr2", "SysUserName": utils.USERS},
}
var reply string
if err := tutLocalRpc.Call("CdrsV2.ProcessExternalCdr", cdr, &reply); err != nil {
if err := tutLocalRpc.Call("CdrsV1.ProcessExternalCdr", cdr, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
@@ -645,7 +645,7 @@ func TestTutLocalCostErrors(t *testing.T) {
Usage: "1", PDD: "7.0", ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"},
}
var reply string
if err := tutLocalRpc.Call("CdrsV2.ProcessExternalCdr", cdr, &reply); err != nil {
if err := tutLocalRpc.Call("CdrsV1.ProcessExternalCdr", cdr, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
@@ -671,7 +671,7 @@ func TestTutLocalCostErrors(t *testing.T) {
SetupTime: "2014-08-04T13:00:00Z", AnswerTime: "2014-08-04T13:00:07Z",
Usage: "1", PDD: "7.0", ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"},
}
if err := tutLocalRpc.Call("CdrsV2.ProcessExternalCdr", cdr2, &reply); err != nil {
if err := tutLocalRpc.Call("CdrsV1.ProcessExternalCdr", cdr2, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
@@ -696,7 +696,7 @@ func TestTutLocalCostErrors(t *testing.T) {
SetupTime: "2014-08-04T13:00:00Z", AnswerTime: "2014-08-04T13:00:07Z",
Usage: "1", PDD: "7.0", ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"},
}
if err := tutLocalRpc.Call("CdrsV2.ProcessExternalCdr", cdr3, &reply); err != nil {
if err := tutLocalRpc.Call("CdrsV1.ProcessExternalCdr", cdr3, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
@@ -886,7 +886,7 @@ func TestTutLocalLcrQos(t *testing.T) {
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}}
var reply string
for _, cdr := range []*engine.CDR{testCdr1, testCdr2} {
if err := tutLocalRpc.Call("CdrsV2.ProcessCdr", cdr, &reply); err != nil {
if err := tutLocalRpc.Call("CdrsV1.ProcessCdr", cdr, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
@@ -916,7 +916,7 @@ func TestTutLocalLcrQos(t *testing.T) {
Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1003", Subject: "1003", Destination: "1004",
SetupTime: time.Date(2014, 12, 7, 8, 42, 24, 0, time.UTC), AnswerTime: time.Date(2014, 12, 7, 8, 42, 26, 0, time.UTC),
Usage: time.Duration(180) * time.Second, Supplier: "suppl2"}
if err := tutLocalRpc.Call("CdrsV2.ProcessCdr", testCdr3, &reply); err != nil {
if err := tutLocalRpc.Call("CdrsV1.ProcessCdr", testCdr3, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
@@ -983,7 +983,7 @@ func TestTutLocalLcrQosThreshold(t *testing.T) {
SetupTime: time.Date(2014, 12, 7, 8, 42, 24, 0, time.UTC), AnswerTime: time.Date(2014, 12, 7, 8, 42, 26, 0, time.UTC),
Usage: time.Duration(60) * time.Second, Supplier: "suppl2"}
var reply string
if err := tutLocalRpc.Call("CdrsV2.ProcessCdr", testCdr4, &reply); err != nil { // Should drop ACD under the 2m required by threshold, removing suppl2 from lcr
if err := tutLocalRpc.Call("CdrsV1.ProcessCdr", testCdr4, &reply); err != nil { // Should drop ACD under the 2m required by threshold, removing suppl2 from lcr
t.Error("Unexpected error: ", err.Error())
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
@@ -1047,7 +1047,7 @@ func TestTutLocalLcrQosThreshold(t *testing.T) {
Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1003", Subject: "1003", Destination: "1004",
SetupTime: time.Date(2014, 12, 7, 8, 42, 24, 0, time.UTC), AnswerTime: time.Date(2014, 12, 7, 8, 42, 26, 0, time.UTC),
Usage: time.Duration(1) * time.Second, Supplier: "suppl2"}
if err := tutLocalRpc.Call("CdrsV2.ProcessCdr", testCdr5, &reply); err != nil { // Should drop ACD under the 1m required by threshold, removing suppl2 from lcr
if err := tutLocalRpc.Call("CdrsV1.ProcessCdr", testCdr5, &reply); err != nil { // Should drop ACD under the 1m required by threshold, removing suppl2 from lcr
t.Error("Unexpected error: ", err.Error())
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
@@ -1325,12 +1325,12 @@ func TestTutLocalPrepaidCDRWithSMCost(t *testing.T) {
},
}
var reply string
if err := tutLocalRpc.Call("CdrsV2.StoreSMCost", &engine.AttrCDRSStoreSMCost{Cost: smCost}, &reply); err != nil {
if err := tutLocalRpc.Call("CdrsV1.StoreSMCost", &engine.AttrCDRSStoreSMCost{Cost: smCost}, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
}
if err := tutLocalRpc.Call("CdrsV2.ProcessCdr", cdr, &reply); err != nil {
if err := tutLocalRpc.Call("CdrsV1.ProcessCdr", cdr, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
@@ -1363,7 +1363,7 @@ func TestTutLocalPrepaidCDRWithoutSMCost(t *testing.T) {
Usage: time.Duration(90) * time.Second, Supplier: "suppl1",
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}}
var reply string
if err := tutLocalRpc.Call("CdrsV2.ProcessCdr", cdr, &reply); err != nil {
if err := tutLocalRpc.Call("CdrsV1.ProcessCdr", cdr, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)

View File

@@ -91,7 +91,7 @@ func (self *SMGeneric) ttlTerminate(s *SMGSession, tmtr *smgSessionTerminator) {
cdr := s.eventStart.AsStoredCdr(self.cgrCfg, self.timezone)
cdr.Usage = s.TotalUsage()
var reply string
self.cdrsrv.Call("CdrServer.ProcessCdr", cdr, &reply)
self.cdrsrv.Call("CdrsV1.ProcessCdr", cdr, &reply)
}
func (self *SMGeneric) indexSession(uuid string, s *SMGSession) {
@@ -474,7 +474,7 @@ func (self *SMGeneric) ChargeEvent(gev SMGenericEvent, clnt *rpc2.Client) (maxDu
OriginID: gev.GetUUID(),
CostDetails: cc,
}
if err := self.cdrsrv.Call("CdrServer.StoreSMCost", engine.AttrCDRSStoreSMCost{Cost: smCost, CheckDuplicate: true}, &reply); err != nil && err != utils.ErrExists {
if err := self.cdrsrv.Call("CdrsV1.StoreSMCost", engine.AttrCDRSStoreSMCost{Cost: smCost, CheckDuplicate: true}, &reply); err != nil && err != utils.ErrExists {
withErrors = true
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not save CC: %+v, RunID: %s error: %s", cc, sR.DerivedCharger.RunID, err.Error()))
}
@@ -487,7 +487,7 @@ func (self *SMGeneric) ChargeEvent(gev SMGenericEvent, clnt *rpc2.Client) (maxDu
func (self *SMGeneric) ProcessCdr(gev SMGenericEvent) error {
var reply string
if err := self.cdrsrv.Call("CdrServer.ProcessCdr", gev.AsStoredCdr(self.cgrCfg, self.timezone), &reply); err != nil {
if err := self.cdrsrv.Call("CdrsV1.ProcessCdr", gev.AsStoredCdr(self.cgrCfg, self.timezone), &reply); err != nil {
return err
}
return nil

View File

@@ -1171,3 +1171,10 @@ type AttrSMGGetActiveSessions struct {
Destination *string
Supplier *string
}
type AttrRateCDRs struct {
RPCCDRsFilter
StoreCDRs *bool
SendToStatS *bool // Set to true if the CDRs should be sent to stats server
ReplicateCDRs *bool // Replicate results
}