Updated cgrRPC action

This commit is contained in:
Trial97
2020-07-31 12:17:41 +03:00
committed by Dan Christian Bogos
parent fd2a5e0bc8
commit e710e4a25e
9 changed files with 155 additions and 199 deletions

View File

@@ -741,53 +741,47 @@ Sq - CDRStatsQueueTriggered object
We can actually use everythiong that go templates offer. You can read more here: https://golang.org/pkg/text/template/
*/
func cgrRPCAction(ub *Account, a *Action, acs Actions, extraData interface{}) error {
func cgrRPCAction(ub *Account, a *Action, acs Actions, extraData interface{}) (err error) {
// parse template
tmpl := template.New("extra_params")
tmpl.Delims("<<", ">>")
t, err := tmpl.Parse(a.ExtraParameters)
if err != nil {
if tmpl, err = tmpl.Parse(a.ExtraParameters); err != nil {
utils.Logger.Err(fmt.Sprintf("error parsing *cgr_rpc template: %s", err.Error()))
return err
return
}
var buf bytes.Buffer
if err = t.Execute(&buf, struct {
if err = tmpl.Execute(&buf, struct {
Account *Account
Action *Action
Actions Actions
ExtraData interface{}
}{ub, a, acs, extraData}); err != nil {
utils.Logger.Err(fmt.Sprintf("error executing *cgr_rpc template %s:", err.Error()))
return err
return
}
processedExtraParam := buf.String()
//utils.Logger.Info("ExtraParameters: " + parsedExtraParameters)
req := RPCRequest{}
if err := json.Unmarshal([]byte(processedExtraParam), &req); err != nil {
return err
var req RPCRequest
if err = json.Unmarshal(buf.Bytes(), &req); err != nil {
return
}
params, err := utils.GetRpcParams(req.Method)
if err != nil {
return err
var params *utils.RpcParams
if params, err = utils.GetRpcParams(req.Method); err != nil {
return
}
var client rpcclient.ClientConnector
if req.Address != utils.MetaInternal {
if client, err = rpcclient.NewRPCClient(utils.TCP, req.Address, false, "", "", "",
req.Attempts, 0, config.CgrConfig().GeneralCfg().ConnectTimeout,
config.CgrConfig().GeneralCfg().ReplyTimeout, req.Transport,
nil, false); err != nil {
return err
}
} else {
if req.Address == utils.MetaInternal {
client = params.Object.(rpcclient.ClientConnector)
} else if client, err = rpcclient.NewRPCClient(utils.TCP, req.Address, false, "", "", "",
req.Attempts, 0, config.CgrConfig().GeneralCfg().ConnectTimeout,
config.CgrConfig().GeneralCfg().ReplyTimeout, req.Transport,
nil, false); err != nil {
return
}
in, out := params.InParam, params.OutParam
//utils.Logger.Info("Params: " + utils.ToJSON(req.Params))
//p, err := utils.FromMapStringInterfaceValue(req.Params, in)
mapstructure.Decode(req.Params, in)
if err != nil {
if err = mapstructure.Decode(req.Params, in); err != nil {
utils.Logger.Info("<*cgr_rpc> err: " + err.Error())
return err
return
}
if in == nil {
utils.Logger.Info(fmt.Sprintf("<*cgr_rpc> nil params err: req.Params: %+v params: %+v", req.Params, params))
@@ -797,13 +791,13 @@ func cgrRPCAction(ub *Account, a *Action, acs Actions, extraData interface{}) er
if !req.Async {
err = client.Call(req.Method, in, out)
utils.Logger.Info(fmt.Sprintf("<*cgr_rpc> result: %s err: %v", utils.ToJSON(out), err))
return err
return
}
go func() {
err := client.Call(req.Method, in, out)
utils.Logger.Info(fmt.Sprintf("<*cgr_rpc> result: %s err: %v", utils.ToJSON(out), err))
}()
return nil
return
}
func topupZeroNegativeAction(ub *Account, a *Action, acs Actions, extraData interface{}) error {

View File

@@ -1368,8 +1368,8 @@ func (tpr *TpReader) LoadAll() (err error) {
return nil
}
func (tpr *TpReader) IsValid() bool {
valid := true
func (tpr *TpReader) IsValid() (valid bool) {
valid = true
for rplTag, rpl := range tpr.ratingPlans {
if !rpl.isContinous() {
log.Printf("The rating plan %s is not covering all weekdays", rplTag)
@@ -1384,7 +1384,7 @@ func (tpr *TpReader) IsValid() bool {
valid = false
}
}
return valid
return
}
func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error) {
@@ -1398,9 +1398,8 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
log.Print("Destinations:")
}
for _, d := range tpr.destinations {
err = tpr.dm.SetDestination(d, utils.NonTransactional)
if err != nil {
return err
if err = tpr.dm.SetDestination(d, utils.NonTransactional); err != nil {
return
}
if verbose {
log.Print("\t", d.Id, " : ", d.Prefixes)
@@ -1420,9 +1419,8 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
log.Print("Rating Plans:")
}
for _, rp := range tpr.ratingPlans {
err = tpr.dm.SetRatingPlan(rp, utils.NonTransactional)
if err != nil {
return err
if err = tpr.dm.SetRatingPlan(rp, utils.NonTransactional); err != nil {
return
}
if verbose {
log.Print("\t", rp.Id)
@@ -1435,9 +1433,8 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
log.Print("Rating Profiles:")
}
for _, rp := range tpr.ratingProfiles {
err = tpr.dm.SetRatingProfile(rp, utils.NonTransactional)
if err != nil {
return err
if err = tpr.dm.SetRatingProfile(rp, utils.NonTransactional); err != nil {
return
}
if verbose {
log.Print("\t", rp.Id)
@@ -1462,7 +1459,7 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
log.Println("\tTask: ", t)
}
if err = tpr.dm.DataDB().PushTask(t); err != nil {
return err
return
}
}
if len(ap.AccountIDs) == 0 {
@@ -1474,14 +1471,13 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
log.Println("\tTask: ", t)
}
if err = tpr.dm.DataDB().PushTask(t); err != nil {
return err
return
}
}
}
}
err = tpr.dm.SetActionPlan(k, ap, false, utils.NonTransactional)
if err != nil {
return err
if err = tpr.dm.SetActionPlan(k, ap, false, utils.NonTransactional); err != nil {
return
}
if verbose {
log.Println("\t", k)
@@ -1498,14 +1494,11 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
for id, vals := range tpr.acntActionPlans {
log.Printf("\t %s : %+v", id, vals)
}
}
if verbose {
log.Print("Action Triggers:")
}
for k, atrs := range tpr.actionsTriggers {
err = tpr.dm.SetActionTriggers(k, atrs, utils.NonTransactional)
if err != nil {
return err
if err = tpr.dm.SetActionTriggers(k, atrs, utils.NonTransactional); err != nil {
return
}
if verbose {
log.Println("\t", k)
@@ -1518,9 +1511,8 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
log.Print("Shared Groups:")
}
for k, sg := range tpr.sharedGroups {
err = tpr.dm.SetSharedGroup(sg, utils.NonTransactional)
if err != nil {
return err
if err = tpr.dm.SetSharedGroup(sg, utils.NonTransactional); err != nil {
return
}
if verbose {
log.Println("\t", k)
@@ -1533,9 +1525,8 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
log.Print("Actions:")
}
for k, as := range tpr.actions {
err = tpr.dm.SetActions(k, as, utils.NonTransactional)
if err != nil {
return err
if err = tpr.dm.SetActions(k, as, utils.NonTransactional); err != nil {
return
}
if verbose {
log.Println("\t", k)
@@ -1548,9 +1539,8 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
log.Print("Account Actions:")
}
for _, ub := range tpr.accountActions {
err = tpr.dm.SetAccount(ub)
if err != nil {
return err
if err = tpr.dm.SetAccount(ub); err != nil {
return
}
if verbose {
log.Println("\t", ub.ID)
@@ -1560,12 +1550,12 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
log.Print("Filters:")
}
for _, tpTH := range tpr.filters {
th, err := APItoFilter(tpTH, tpr.timezone)
if err != nil {
return err
var th *Filter
if th, err = APItoFilter(tpTH, tpr.timezone); err != nil {
return
}
if err = tpr.dm.SetFilter(th, true); err != nil {
return err
return
}
if verbose {
log.Print("\t", th.TenantID())
@@ -1578,12 +1568,12 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
log.Print("ResourceProfiles:")
}
for _, tpRsp := range tpr.resProfiles {
rsp, err := APItoResource(tpRsp, tpr.timezone)
if err != nil {
return err
var rsp *ResourceProfile
if rsp, err = APItoResource(tpRsp, tpr.timezone); err != nil {
return
}
if err = tpr.dm.SetResourceProfile(rsp, true); err != nil {
return err
return
}
if verbose {
log.Print("\t", rsp.TenantID())
@@ -1610,12 +1600,12 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
log.Print("StatQueueProfiles:")
}
for _, tpST := range tpr.sqProfiles {
st, err := APItoStats(tpST, tpr.timezone)
if err != nil {
return err
var st *StatQueueProfile
if st, err = APItoStats(tpST, tpr.timezone); err != nil {
return
}
if err = tpr.dm.SetStatQueueProfile(st, true); err != nil {
return err
return
}
if verbose {
log.Print("\t", st.TenantID())
@@ -1630,13 +1620,13 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
for _, sqTntID := range tpr.statQueues {
metrics := make(map[string]StatMetric)
for _, metric := range tpr.sqProfiles[utils.TenantID{Tenant: sqTntID.Tenant, ID: sqTntID.ID}].Metrics {
if stsMetric, err := NewStatMetric(metric.MetricID,
var stsMetric StatMetric
if stsMetric, err = NewStatMetric(metric.MetricID,
tpr.sqProfiles[utils.TenantID{Tenant: sqTntID.Tenant, ID: sqTntID.ID}].MinItems,
metric.FilterIDs); err != nil {
return err
} else {
metrics[metric.MetricID] = stsMetric
return
}
metrics[metric.MetricID] = stsMetric
}
sq := &StatQueue{Tenant: sqTntID.Tenant, ID: sqTntID.ID, SQMetrics: metrics}
if err = tpr.dm.SetStatQueue(sq); err != nil {
@@ -1653,12 +1643,12 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
log.Print("ThresholdProfiles:")
}
for _, tpTH := range tpr.thProfiles {
th, err := APItoThresholdProfile(tpTH, tpr.timezone)
if err != nil {
return err
var th *ThresholdProfile
if th, err = APItoThresholdProfile(tpTH, tpr.timezone); err != nil {
return
}
if err = tpr.dm.SetThresholdProfile(th, true); err != nil {
return err
return
}
if verbose {
log.Print("\t", th.TenantID())
@@ -1672,7 +1662,7 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
}
for _, thd := range tpr.thresholds {
if err = tpr.dm.SetThreshold(&Threshold{Tenant: thd.Tenant, ID: thd.ID}); err != nil {
return err
return
}
if verbose {
log.Print("\t", thd.TenantID())
@@ -1685,12 +1675,12 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
log.Print("RouteProfiles:")
}
for _, tpTH := range tpr.routeProfiles {
th, err := APItoRouteProfile(tpTH, tpr.timezone)
if err != nil {
return err
var th *RouteProfile
if th, err = APItoRouteProfile(tpTH, tpr.timezone); err != nil {
return
}
if err = tpr.dm.SetRouteProfile(th, true); err != nil {
return err
return
}
if verbose {
log.Print("\t", th.TenantID())
@@ -1703,12 +1693,12 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
log.Print("AttributeProfiles:")
}
for _, tpTH := range tpr.attributeProfiles {
th, err := APItoAttributeProfile(tpTH, tpr.timezone)
if err != nil {
return err
var th *AttributeProfile
if th, err = APItoAttributeProfile(tpTH, tpr.timezone); err != nil {
return
}
if err = tpr.dm.SetAttributeProfile(th, true); err != nil {
return err
return
}
if verbose {
log.Print("\t", th.TenantID())
@@ -1721,13 +1711,12 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
log.Print("ChargerProfiles:")
}
for _, tpTH := range tpr.chargerProfiles {
th, err := APItoChargerProfile(tpTH, tpr.timezone)
if err != nil {
return err
var th *ChargerProfile
if th, err = APItoChargerProfile(tpTH, tpr.timezone); err != nil {
return
}
if err = tpr.dm.SetChargerProfile(th, true); err != nil {
return err
return
}
if verbose {
log.Print("\t", th.TenantID())
@@ -1740,12 +1729,12 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
log.Print("DispatcherProfiles:")
}
for _, tpTH := range tpr.dispatcherProfiles {
th, err := APItoDispatcherProfile(tpTH, tpr.timezone)
if err != nil {
return err
var th *DispatcherProfile
if th, err = APItoDispatcherProfile(tpTH, tpr.timezone); err != nil {
return
}
if err = tpr.dm.SetDispatcherProfile(th, true); err != nil {
return err
return
}
if verbose {
log.Print("\t", th.TenantID())
@@ -1760,7 +1749,7 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
for _, tpTH := range tpr.dispatcherHosts {
th := APItoDispatcherHost(tpTH)
if err = tpr.dm.SetDispatcherHost(th); err != nil {
return err
return
}
if verbose {
log.Print("\t", th.TenantID())
@@ -1774,12 +1763,12 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
log.Print("RateProfiles:")
}
for _, tpTH := range tpr.rateProfiles {
th, err := APItoRateProfile(tpTH, tpr.timezone)
if err != nil {
return err
var th *RateProfile
if th, err = APItoRateProfile(tpTH, tpr.timezone); err != nil {
return
}
if err = tpr.dm.SetRateProfile(th, true); err != nil {
return err
return
}
if verbose {
log.Print("\t", th.TenantID())
@@ -1794,7 +1783,7 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
}
for _, t := range tpr.timings {
if err = tpr.dm.SetTiming(t); err != nil {
return err
return
}
if verbose {
log.Print("\t", t.ID)
@@ -1809,7 +1798,7 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
log.Print("Rebuilding reverse destinations")
}
if err = tpr.dm.DataDB().RebuildReverseForPrefix(utils.REVERSE_DESTINATION_PREFIX); err != nil {
return err
return
}
}
if len(tpr.acntActionPlans) > 0 {
@@ -1817,14 +1806,11 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
log.Print("Rebuilding account action plans")
}
if err = tpr.dm.DataDB().RebuildReverseForPrefix(utils.AccountActionPlansPrefix); err != nil {
return err
return
}
}
}
if err = tpr.dm.SetLoadIDs(loadIDs); err != nil {
return err
}
return
return tpr.dm.SetLoadIDs(loadIDs)
}
func (tpr *TpReader) ShowStatistics() {
@@ -1900,7 +1886,7 @@ func (tpr *TpReader) ShowStatistics() {
log.Print("RateProfiles: ", len(tpr.rateProfiles))
}
// Returns the identities loaded for a specific category, useful for cache reloads
// GetLoadedIds returns the identities loaded for a specific category, useful for cache reloads
func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) {
switch categ {
case utils.DESTINATION_PREFIX:
@@ -2065,9 +2051,8 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
loadID := time.Now().UnixNano()
loadIDs := make(map[string]int64)
for _, d := range tpr.destinations {
err = tpr.dm.RemoveDestination(d.Id, utils.NonTransactional)
if err != nil {
return err
if err = tpr.dm.RemoveDestination(d.Id, utils.NonTransactional); err != nil {
return
}
if verbose {
log.Print("\t", d.Id, " : ", d.Prefixes)
@@ -2078,14 +2063,11 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
for id, vals := range tpr.revDests {
log.Printf("\t %s : %+v", id, vals)
}
}
if verbose {
log.Print("Rating Plans:")
}
for _, rp := range tpr.ratingPlans {
err = tpr.dm.RemoveRatingPlan(rp.Id, utils.NonTransactional)
if err != nil {
return err
if err = tpr.dm.RemoveRatingPlan(rp.Id, utils.NonTransactional); err != nil {
return
}
if verbose {
log.Print("\t", rp.Id)
@@ -2095,9 +2077,8 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
log.Print("Rating Profiles:")
}
for _, rp := range tpr.ratingProfiles {
err = tpr.dm.RemoveRatingProfile(rp.Id, utils.NonTransactional)
if err != nil {
return err
if err = tpr.dm.RemoveRatingProfile(rp.Id, utils.NonTransactional); err != nil {
return
}
if verbose {
log.Print("\t", rp.Id)
@@ -2107,9 +2088,8 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
log.Print("Action Plans:")
}
for k := range tpr.actionPlans {
err = tpr.dm.RemoveActionPlan(k, utils.NonTransactional)
if err != nil {
return err
if err = tpr.dm.RemoveActionPlan(k, utils.NonTransactional); err != nil {
return
}
if verbose {
log.Println("\t", k)
@@ -2120,14 +2100,11 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
for id, vals := range tpr.acntActionPlans {
log.Printf("\t %s : %+v", id, vals)
}
}
if verbose {
log.Print("Action Triggers:")
}
for k := range tpr.actionsTriggers {
err = tpr.dm.RemoveActionTriggers(k, utils.NonTransactional)
if err != nil {
return err
if err = tpr.dm.RemoveActionTriggers(k, utils.NonTransactional); err != nil {
return
}
if verbose {
log.Println("\t", k)
@@ -2137,9 +2114,8 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
log.Print("Shared Groups:")
}
for k := range tpr.sharedGroups {
err = tpr.dm.RemoveSharedGroup(k, utils.NonTransactional)
if err != nil {
return err
if err = tpr.dm.RemoveSharedGroup(k, utils.NonTransactional); err != nil {
return
}
if verbose {
log.Println("\t", k)
@@ -2149,9 +2125,8 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
log.Print("Actions:")
}
for k := range tpr.actions {
err = tpr.dm.RemoveActions(k, utils.NonTransactional)
if err != nil {
return err
if err = tpr.dm.RemoveActions(k, utils.NonTransactional); err != nil {
return
}
if verbose {
log.Println("\t", k)
@@ -2161,9 +2136,8 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
log.Print("Account Actions:")
}
for _, ub := range tpr.accountActions {
err = tpr.dm.RemoveAccount(ub.ID)
if err != nil {
return err
if err = tpr.dm.RemoveAccount(ub.ID); err != nil {
return
}
if verbose {
log.Println("\t", ub.ID)
@@ -2174,7 +2148,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
}
for _, tpRsp := range tpr.resProfiles {
if err = tpr.dm.RemoveResourceProfile(tpRsp.Tenant, tpRsp.ID, utils.NonTransactional, true); err != nil {
return err
return
}
if verbose {
log.Print("\t", utils.ConcatenatedKey(tpRsp.Tenant, tpRsp.ID))
@@ -2196,7 +2170,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
}
for _, tpST := range tpr.sqProfiles {
if err = tpr.dm.RemoveStatQueueProfile(tpST.Tenant, tpST.ID, utils.NonTransactional, true); err != nil {
return err
return
}
if verbose {
log.Print("\t", utils.ConcatenatedKey(tpST.Tenant, tpST.ID))
@@ -2218,7 +2192,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
}
for _, tpTH := range tpr.thProfiles {
if err = tpr.dm.RemoveThresholdProfile(tpTH.Tenant, tpTH.ID, utils.NonTransactional, true); err != nil {
return err
return
}
if verbose {
log.Print("\t", utils.ConcatenatedKey(tpTH.Tenant, tpTH.ID))
@@ -2229,7 +2203,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
}
for _, thd := range tpr.thresholds {
if err = tpr.dm.RemoveThreshold(thd.Tenant, thd.ID, utils.NonTransactional); err != nil {
return err
return
}
if verbose {
log.Print("\t", thd.TenantID())
@@ -2241,7 +2215,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
}
for _, tpSpl := range tpr.routeProfiles {
if err = tpr.dm.RemoveRouteProfile(tpSpl.Tenant, tpSpl.ID, utils.NonTransactional, true); err != nil {
return err
return
}
if verbose {
log.Print("\t", utils.ConcatenatedKey(tpSpl.Tenant, tpSpl.ID))
@@ -2254,7 +2228,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
for _, tpAttr := range tpr.attributeProfiles {
if err = tpr.dm.RemoveAttributeProfile(tpAttr.Tenant, tpAttr.ID,
utils.NonTransactional, true); err != nil {
return err
return
}
if verbose {
log.Print("\t", utils.ConcatenatedKey(tpAttr.Tenant, tpAttr.ID))
@@ -2267,7 +2241,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
for _, tpChr := range tpr.chargerProfiles {
if err = tpr.dm.RemoveChargerProfile(tpChr.Tenant, tpChr.ID,
utils.NonTransactional, true); err != nil {
return err
return
}
if verbose {
log.Print("\t", utils.ConcatenatedKey(tpChr.Tenant, tpChr.ID))
@@ -2280,7 +2254,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
for _, tpDsp := range tpr.dispatcherProfiles {
if err = tpr.dm.RemoveDispatcherProfile(tpDsp.Tenant, tpDsp.ID,
utils.NonTransactional, true); err != nil {
return err
return
}
if verbose {
log.Print("\t", utils.ConcatenatedKey(tpDsp.Tenant, tpDsp.ID))
@@ -2292,7 +2266,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
for _, tpDsh := range tpr.dispatcherHosts {
if err = tpr.dm.RemoveDispatcherHost(tpDsh.Tenant, tpDsh.ID,
utils.NonTransactional); err != nil {
return err
return
}
if verbose {
log.Print("\t", utils.ConcatenatedKey(tpDsh.Tenant, tpDsh.ID))
@@ -2305,7 +2279,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
for _, tpRp := range tpr.rateProfiles {
if err = tpr.dm.RemoveRateProfile(tpRp.Tenant, tpRp.ID,
utils.NonTransactional, true); err != nil {
return err
return
}
if verbose {
log.Print("\t", utils.ConcatenatedKey(tpRp.Tenant, tpRp.ID))
@@ -2317,7 +2291,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
}
for _, t := range tpr.timings {
if err = tpr.dm.RemoveTiming(t.ID, utils.NonTransactional); err != nil {
return err
return
}
if verbose {
log.Print("\t", t.ID)
@@ -2329,7 +2303,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
log.Print("Removing reverse destinations")
}
if err = tpr.dm.DataDB().RemoveReverseForPrefix(utils.REVERSE_DESTINATION_PREFIX); err != nil {
return err
return
}
}
if len(tpr.acntActionPlans) > 0 {
@@ -2337,7 +2311,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
log.Print("Removing account action plans")
}
if err = tpr.dm.DataDB().RemoveReverseForPrefix(utils.AccountActionPlansPrefix); err != nil {
return err
return
}
}
}
@@ -2348,7 +2322,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
for _, tpFltr := range tpr.filters {
if err = tpr.dm.RemoveFilter(tpFltr.Tenant, tpFltr.ID,
utils.NonTransactional, true); err != nil {
return err
return
}
if verbose {
log.Print("\t", utils.ConcatenatedKey(tpFltr.Tenant, tpFltr.ID))
@@ -2423,10 +2397,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
if len(tpr.timings) != 0 {
loadIDs[utils.CacheTimings] = loadID
}
if err = tpr.dm.SetLoadIDs(loadIDs); err != nil {
return err
}
return
return tpr.dm.SetLoadIDs(loadIDs)
}
func (tpr *TpReader) ReloadCache(caching string, verbose bool, opts map[string]interface{}) (err error) {
@@ -2554,15 +2525,15 @@ func (tpr *TpReader) ReloadCache(caching string, verbose bool, opts map[string]i
}
//get loadIDs for all types
loadIDs, err := tpr.dm.GetItemLoadIDs(utils.EmptyString, false)
if err != nil {
return err
var loadIDs map[string]int64
if loadIDs, err = tpr.dm.GetItemLoadIDs(utils.EmptyString, false); err != nil {
return
}
cacheLoadIDs := populateCacheLoadIDs(loadIDs, cacheArgs.ArgsCache)
for key, val := range cacheLoadIDs {
if errCh := Cache.Set(utils.CacheLoadIDs, key, val, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional); errCh != nil {
return errCh
if err = Cache.Set(utils.CacheLoadIDs, key, val, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional); err != nil {
return
}
}
return
@@ -2572,14 +2543,15 @@ func (tpr *TpReader) ReloadScheduler(verbose bool) (err error) {
var reply string
aps, _ := tpr.GetLoadedIds(utils.ACTION_PLAN_PREFIX)
// in case we have action plans reload the scheduler
if len(aps) != 0 {
if verbose {
log.Print("Reloading scheduler")
}
if err = connMgr.Call(tpr.schedulerConns, nil, utils.SchedulerSv1Reload,
new(utils.CGREventWithOpts), &reply); err != nil {
log.Printf("WARNING: Got error on scheduler reload: %s\n", err.Error())
}
if len(aps) == 0 {
return
}
if verbose {
log.Print("Reloading scheduler")
}
if err = connMgr.Call(tpr.schedulerConns, nil, utils.SchedulerSv1Reload,
new(utils.CGREventWithOpts), &reply); err != nil {
log.Printf("WARNING: Got error on scheduler reload: %s\n", err.Error())
}
return
}

View File

@@ -119,12 +119,7 @@ func (apiService *APIerSv1Service) Start() (err error) {
apiService.server.RpcRegister(v1.NewReplicatorSv1(datadb))
}
utils.RegisterRpcParams("", &v1.CDRsV1{})
utils.RegisterRpcParams("", &v1.SMGenericV1{})
utils.RegisterRpcParams("", apiService.api)
utils.RegisterRpcParams(utils.ApierV1, apiService.api)
//backwards compatible
apiService.connChan <- apiService.api
apiService.APIerSv1Chan <- apiService.api

View File

@@ -73,10 +73,6 @@ func (api *APIerSv2Service) Start() (err error) {
api.server.RpcRegisterName(utils.ApierV2, api.api)
}
utils.RegisterRpcParams("", &v2.CDRsV2{})
utils.RegisterRpcParams("", api.api)
utils.RegisterRpcParams(utils.ApierV2, api.api)
api.connChan <- api.api
return
}

View File

@@ -91,8 +91,6 @@ func (rals *RalService) Start() (err error) {
rals.server.RpcRegister(rals.rals)
}
utils.RegisterRpcParams(utils.RALsV1, rals.rals)
rals.connChan <- rals.rals
return
}

View File

@@ -68,8 +68,6 @@ func (resp *ResponderService) Start() (err error) {
resp.server.RpcRegister(resp.resp)
}
utils.RegisterRpcParams("", resp.resp)
resp.connChan <- resp.resp // Rater done
return
}

View File

@@ -20,7 +20,9 @@ package utils
import "reflect"
var rpcParamsMap map[string]*RpcParams
var (
rpcParamsMap = make(map[string]*RpcParams)
)
type RpcParams struct {
Object interface{}
@@ -28,10 +30,6 @@ type RpcParams struct {
OutParam interface{}
}
func init() {
rpcParamsMap = make(map[string]*RpcParams)
}
func RegisterRpcParams(name string, obj interface{}) {
objType := reflect.TypeOf(obj)
if name == "" {
@@ -58,10 +56,10 @@ func RegisterRpcParams(name string, obj interface{}) {
}
}
func GetRpcParams(method string) (*RpcParams, error) {
x, found := rpcParamsMap[method]
if !found {
func GetRpcParams(method string) (params *RpcParams, err error) {
var found bool
if params, found = rpcParamsMap[method]; !found {
return nil, ErrNotFound
}
return x, nil
return
}

View File

@@ -1,8 +1,9 @@
package utils
import (
"reflect"
"testing"
"github.com/mitchellh/mapstructure"
)
type RpcStruct struct{}
@@ -35,17 +36,19 @@ func TestRPCObjectPointer(t *testing.T) {
t.Errorf("error getting rpcobject: %v (%+v)", rpcParamsMap, x)
}
a := x.InParam
if v, err := FromMapStringInterfaceValue(map[string]interface{}{"Name": "a", "Surname": "b", "Age": 10.2}, reflect.ValueOf(a)); err != nil || v.(Attr).Name != "a" || v.(Attr).Surname != "b" || v.(Attr).Age != 10.2 {
t.Errorf("error converting to struct: %+v (%v)", v, err)
if err := mapstructure.Decode(map[string]interface{}{"Name": "a", "Surname": "b", "Age": 10.2}, a); err != nil || a.(*Attr).Name != "a" || a.(*Attr).Surname != "b" || a.(*Attr).Age != 10.2 {
t.Errorf("error converting to struct: %+v (%v)", a, err)
}
/*
//TODO: make pointer in arguments usable
/*x, found = rpcParamsMap["RpcStruct.Tropa"]
x, found = rpcParamsMap["RpcStruct.Tropa"]
if !found {
t.Errorf("error getting rpcobject: %v (%+v)", rpcParamsMap, x)
}
b := x.InParam
log.Printf("T: %+v", b)
if v, err := FromMapStringInterfaceValue(map[string]interface{}{"Name": "a", "Surname": "b", "Age": 10.2}, b); err != nil || v.(Attr).Name != "a" || v.(Attr).Surname != "b" || v.(Attr).Age != 10.2 {
t.Errorf("error converting to struct: %+v (%v)", v, err)
}*/
// log.Printf("T: %+v", b)
if err := mapstructure.Decode(map[string]interface{}{"Name": "a", "Surname": "b", "Age": 10.2}, b); err != nil || b.(*Attr).Name != "a" || b.(*Attr).Surname != "b" || b.(*Attr).Age != 10.2 {
t.Errorf("error converting to struct: %+v (%v)", b, err)
}
*/
}

View File

@@ -74,6 +74,7 @@ func (s *Server) SetDispatched() {
}
func (s *Server) RpcRegister(rcvr interface{}) {
RegisterRpcParams(EmptyString, rcvr)
rpc.Register(rcvr)
s.Lock()
s.rpcEnabled = true
@@ -81,6 +82,7 @@ func (s *Server) RpcRegister(rcvr interface{}) {
}
func (s *Server) RpcRegisterName(name string, rcvr interface{}) {
RegisterRpcParams(name, rcvr)
rpc.RegisterName(name, rcvr)
s.Lock()
s.rpcEnabled = true