Adding async call_url action

This commit is contained in:
DanB
2014-01-21 13:03:47 +01:00
parent 2896053199
commit 080395e7ea
4 changed files with 54 additions and 9 deletions

View File

@@ -976,7 +976,7 @@ func TestApierSetActions(t *testing.T) {
}
}
func TestApierSetActionTimings(t *testing.T) {
func TestApierSetActionPlan(t *testing.T) {
if !*testLocal {
return
}
@@ -1020,7 +1020,7 @@ func TestApierAddTriggeredAction(t *testing.T) {
// Test here AddTriggeredAction
// Test here GetAccountActionTriggers
func TestApierGetAccountActionTriggers(t *testing.T) {
if !*testLocal {
return
@@ -1034,6 +1034,7 @@ func TestApierGetAccountActionTriggers(t *testing.T) {
}
}
// Test here RemAccountActionTriggers
func TestApierRemAccountActionTriggers(t *testing.T) {
if !*testLocal {
@@ -1103,6 +1104,7 @@ func TestApierGetAccountActionPlan(t *testing.T) {
}
}
// Test here RemActionTiming
func TestApierRemActionTiming(t *testing.T) {
if !*testLocal {
@@ -1163,6 +1165,26 @@ func TestApierGetBalance(t *testing.T) {
}
}
// Start with initial balance, top-up to test max_balance
func TestTriggersExecute(t *testing.T) {
if !*testLocal {
return
}
reply := ""
attrs := &AttrSetAccount{Tenant: "cgrates.org", Direction: "*out", Account: "dan8", Type: "*prepaid"}
if err := rater.Call("ApierV1.SetAccount", attrs, &reply); err != nil {
t.Error("Got error on ApierV1.SetAccount: ", err.Error())
} else if reply != "OK" {
t.Errorf("Calling ApierV1.SetAccount received: %s", reply)
}
attrAddBlnc := &AttrAddBalance{Tenant: "cgrates.org", Account: "1008", BalanceId: "*monetary", Direction: "*out", Value: 2}
if err := rater.Call("ApierV1.AddBalance", attrAddBlnc, &reply); err != nil {
t.Error("Got error on ApierV1.AddBalance: ", err.Error())
} else if reply != "OK" {
t.Errorf("Calling ApierV1.AddBalance received: %s", reply)
}
}
// Test here LoadTariffPlanFromFolder
func TestApierLoadTariffPlanFromFolder(t *testing.T) {
if !*testLocal {

View File

@@ -24,6 +24,7 @@ import (
"fmt"
"net/http"
"sort"
"time"
)
/*
@@ -53,6 +54,7 @@ const (
RESET_COUNTER = "*reset_counter"
RESET_COUNTERS = "*reset_counters"
CALL_URL = "*call_url"
CALL_URL_ASYNC = "*call_url_async"
UNLIMITED = "*unlimited"
)
@@ -84,6 +86,8 @@ func getActionFunc(typ string) (actionTypeFunc, bool) {
return resetCountersAction, true
case CALL_URL:
return callUrl, true
case CALL_URL_ASYNC:
return callUrlAsync, true
}
return nil, false
}
@@ -186,6 +190,25 @@ func callUrl(ub *UserBalance, a *Action) error {
return err
}
// Does not block for posts, no error reports
func callUrlAsync(ub *UserBalance, a *Action) error {
body, err := json.Marshal(ub)
if err != nil {
return err
}
ubMarshal,_ := json.Marshal(ub)
go func() {
for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort
if _, err = http.Post(a.ExtraParameters, "application/json", bytes.NewBuffer(body)); err == nil {
break // Success, no need to reinterate
}
time.Sleep(time.Duration(i) * time.Minute)
}
}()
return nil
}
// Structure to store actions according to weight
type Actions []*Action

View File

@@ -144,7 +144,7 @@ func (csvr *CSVReader) ShowStatistics() {
// actions
log.Print("Actions: ", len(csvr.actions))
// action timings
log.Print("Action timings: ", len(csvr.actionsTimings))
log.Print("Action plans: ", len(csvr.actionsTimings))
// account actions
log.Print("Account actions: ", len(csvr.accountActions))
}
@@ -195,7 +195,7 @@ func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) {
}
}
if verbose {
log.Print("Action timings")
log.Print("Action plans")
}
for k, ats := range csvr.actionsTimings {
err = accountingStorage.SetActionTimings(k, ats)
@@ -502,7 +502,7 @@ func (csvr *CSVReader) LoadActions() (err error) {
func (csvr *CSVReader) LoadActionTimings() (err error) {
csvReader, fp, err := csvr.readerFunc(csvr.actiontimingsFn, csvr.sep, utils.ACTION_PLANS_NRCOLS)
if err != nil {
log.Print("Could not load action timings file: ", err)
log.Print("Could not load action plans file: ", err)
// allow writing of the other values
return nil
}
@@ -513,11 +513,11 @@ func (csvr *CSVReader) LoadActionTimings() (err error) {
tag := record[0]
_, exists := csvr.actions[record[1]]
if !exists {
return errors.New(fmt.Sprintf("ActionTiming: Could not load the action for tag: %v", record[1]))
return errors.New(fmt.Sprintf("ActionPlan: Could not load the action for tag: %v", record[1]))
}
t, exists := csvr.timings[record[2]]
if !exists {
return errors.New(fmt.Sprintf("ActionTiming: Could not load the timing for tag: %v", record[2]))
return errors.New(fmt.Sprintf("ActionPlan: Could not load the timing for tag: %v", record[2]))
}
weight, err := strconv.ParseFloat(record[3], 64)
if err != nil {

View File

@@ -104,7 +104,7 @@ func (dbr *DbReader) ShowStatistics() {
// actions
log.Print("Actions: ", len(dbr.actions))
// action timings
log.Print("Action timings: ", len(dbr.actionsTimings))
log.Print("Action plans: ", len(dbr.actionsTimings))
// account actions
log.Print("Account actions: ", len(dbr.accountActions))
}
@@ -151,7 +151,7 @@ func (dbr *DbReader) WriteToDatabase(flush, verbose bool) (err error) {
}
}
if verbose {
log.Print("Action timings")
log.Print("Action plans")
}
for k, ats := range dbr.actionsTimings {
err = accountingStorage.SetActionTimings(k, ats)