From 81d8715463dd8f81fdfa7b6aedc0973b735b611f Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 8 Apr 2016 13:42:28 +0300 Subject: [PATCH 1/8] started *cgr_rpc action --- engine/action.go | 19 +++++++++++++++++++ engine/actions_test.go | 23 +++++++++++++++++++---- utils/struct.go | 23 +++++++++++++++++++++++ utils/struct_test.go | 29 +++++++++++++++++++++++++++++ 4 files changed, 90 insertions(+), 4 deletions(-) diff --git a/engine/action.go b/engine/action.go index 62ca439dd..3aedbf153 100644 --- a/engine/action.go +++ b/engine/action.go @@ -74,6 +74,7 @@ const ( CDRLOG = "*cdrlog" SET_DDESTINATIONS = "*set_ddestinations" TRANSFER_MONETARY_DEFAULT = "*transfer_monetary_default" + CGR_RPC = "*cgr_rpc" ) func (a *Action) Clone() *Action { @@ -140,6 +141,8 @@ func getActionFunc(typ string) (actionTypeFunc, bool) { return setBalanceAction, true case TRANSFER_MONETARY_DEFAULT: return transferMonetaryDefaultAction, true + case CGR_RPC: + return cgrRPCAction, true } return nil, false } @@ -647,6 +650,22 @@ func transferMonetaryDefaultAction(acc *Account, sq *StatsQueueTriggered, a *Act return nil } +type RPCRequest struct { + Server string + Transport string + Attempts int + Async bool + Arg map[string]interface{} +} + +func cgrRPCAction(account *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error { + rpcRequest := RPCRequest{} + if err := json.Unmarshal([]byte(a.ExtraParameters), &rpcRequest); err != nil { + return err + } + return nil +} + // Structure to store actions according to weight type Actions []*Action diff --git a/engine/actions_test.go b/engine/actions_test.go index bead47620..9b3dc8f22 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -2102,6 +2102,8 @@ func TestActionCdrlogBalanceValue(t *testing.T) { ID: "cgrates.org:bv", BalanceMap: map[string]Balances{ utils.MONETARY: Balances{&Balance{ + ID: "*default", + Uuid: "25a02c82-f09f-4c6e-bacf-8ed4b076475a", Value: 10, }}, }, @@ -2114,16 +2116,29 @@ func TestActionCdrlogBalanceValue(t *testing.T) { Timing: &RateInterval{}, actions: []*Action{ &Action{ + Id: "RECUR_FOR_V3HSILLMILLD1G", ActionType: TOPUP, - Balance: &BalanceFilter{Value: utils.Float64Pointer(1.1), Type: utils.StringPointer(utils.MONETARY)}, + Balance: &BalanceFilter{ + ID: utils.StringPointer("*default"), + Uuid: utils.StringPointer("25a02c82-f09f-4c6e-bacf-8ed4b076475a"), + Value: utils.Float64Pointer(1.1), + Type: utils.StringPointer(utils.MONETARY), + }, }, &Action{ + Id: "RECUR_FOR_V3HSILLMILLD5G", ActionType: DEBIT, - Balance: &BalanceFilter{Value: utils.Float64Pointer(2.1), Type: utils.StringPointer(utils.MONETARY)}, + Balance: &BalanceFilter{ + ID: utils.StringPointer("*default"), + Uuid: utils.StringPointer("25a02c82-f09f-4c6e-bacf-8ed4b076475a"), + Value: utils.Float64Pointer(2.1), + Type: utils.StringPointer(utils.MONETARY), + }, }, &Action{ + Id: "c", ActionType: CDRLOG, - ExtraParameters: `{"BalanceValue":"BalanceValue"}`, + ExtraParameters: `{"BalanceID":"BalanceID","BalanceUUID":"BalanceUUID","ActionID":"ActionID","BalanceValue":"BalanceValue"}`, }, }, } @@ -2140,7 +2155,7 @@ func TestActionCdrlogBalanceValue(t *testing.T) { if len(cdrs) != 2 || cdrs[0].ExtraFields["BalanceValue"] != "11.1" || cdrs[1].ExtraFields["BalanceValue"] != "9" { - t.Errorf("Wrong cdrlogs: %+v", cdrs[1]) + t.Errorf("Wrong cdrlogs: %", utils.ToIJSON(cdrs)) } } diff --git a/utils/struct.go b/utils/struct.go index 0b16caeef..3da80ec84 100644 --- a/utils/struct.go +++ b/utils/struct.go @@ -18,6 +18,7 @@ along with this program. If not, see package utils import ( + "errors" "reflect" "strconv" "strings" @@ -171,6 +172,28 @@ func FromMapStringString(m map[string]string, in interface{}) { return } +func FromMapStringInterface(m map[string]interface{}, in interface{}) error { + v := reflect.ValueOf(in) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + for fieldName, fieldValue := range m { + field := v.FieldByName(fieldName) + if field.IsValid() { + if !field.IsValid() || !field.CanSet() { + continue + } + structFieldType := field.Type() + val := reflect.ValueOf(fieldValue) + if structFieldType != val.Type() { + return errors.New("Provided value type didn't match obj field type") + } + field.Set(val) + } + } + return nil +} + // Update struct with map fields, returns not matching map keys, s is a struct to be updated func UpdateStructWithStrMap(s interface{}, m map[string]string) []string { notMatched := []string{} diff --git a/utils/struct_test.go b/utils/struct_test.go index 50371ea36..74450a7cb 100644 --- a/utils/struct_test.go +++ b/utils/struct_test.go @@ -84,3 +84,32 @@ func TestStructExtraFields(t *testing.T) { t.Errorf("expected: %v got: %v", ts.ExtraFields, efMap) } } + +func TestStructFromMapStringInterface(t *testing.T) { + ts := &struct { + Name string + Class *string + List []string + Elements struct { + Type string + Value float64 + } + }{} + s := "test2" + m := map[string]interface{}{ + "Name": "test1", + "Class": &s, + "List": []string{"test3", "test4"}, + "Elements": struct { + Type string + Value float64 + }{ + Type: "test5", + Value: 9.8, + }, + } + if err := FromMapStringInterface(m, ts); err != nil { + t.Logf("ts: %+v", ToJSON(ts)) + t.Error("Error converting map to struct: ", err) + } +} From c1bc21b996e67f23c051e5e44f329ad4f5bc0d56 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Mon, 18 Apr 2016 13:27:57 +0300 Subject: [PATCH 2/8] added method --- engine/action.go | 1 + 1 file changed, 1 insertion(+) diff --git a/engine/action.go b/engine/action.go index 3aedbf153..a7f9cdb5e 100644 --- a/engine/action.go +++ b/engine/action.go @@ -653,6 +653,7 @@ func transferMonetaryDefaultAction(acc *Account, sq *StatsQueueTriggered, a *Act type RPCRequest struct { Server string Transport string + Method string Attempts int Async bool Arg map[string]interface{} From 75a565a6584d8ed2f83d3c2d0d400eb8725c3f2f Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Mon, 18 Apr 2016 18:35:42 +0300 Subject: [PATCH 3/8] started cgr_rpc --- utils/rpc_object_test.go | 21 +++++++++++++++++++++ utils/rpc_objects.go | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+) create mode 100644 utils/rpc_object_test.go create mode 100644 utils/rpc_objects.go diff --git a/utils/rpc_object_test.go b/utils/rpc_object_test.go new file mode 100644 index 000000000..8cdf368c3 --- /dev/null +++ b/utils/rpc_object_test.go @@ -0,0 +1,21 @@ +package utils + +import "testing" + +type RpcStruct struct{} + +func (rpc *RpcStruct) Hopa(normal string, out *float64) error { + return nil +} + +func TestRPCObjectSimple(t *testing.T) { + + RegisterRpcObject("", &RpcStruct{}) + if len(RpcObjects) != 1 { + t.Errorf("error registering rpc object: %v", RpcObjects) + } + x, found := RpcObjects["RpcStruct.Hopa"] + if found { + t.Errorf("error getting rpcobject: %v (%+v)", RpcObjects, x) + } +} diff --git a/utils/rpc_objects.go b/utils/rpc_objects.go new file mode 100644 index 000000000..6aef9be2e --- /dev/null +++ b/utils/rpc_objects.go @@ -0,0 +1,38 @@ +package utils + +import "reflect" + +var RpcObjects map[string]interface{} + +type RpcObject struct { + Object interface{} + InParam interface{} + OutParam interface{} +} + +func init() { + RpcObjects = make(map[string]interface{}) +} + +func RegisterRpcObject(name string, rpcObject interface{}) { + objType := reflect.TypeOf(rpcObject) + if name == "" { + val := reflect.ValueOf(rpcObject) + name = objType.Name() + if val.Kind() == reflect.Ptr { + name = objType.Elem().Name() + } + } + for i := 0; i < objType.NumMethod(); i++ { + method := objType.Method(i) + methodType := method.Type + if methodType.NumIn() == 3 { // if it has three parameters (one is self and two are rpc params) + RpcObjects[name+"."+method.Name] = &RpcObject{ + Object: objType, + InParam: reflect.New(methodType.In(1)).Elem().Interface(), + OutParam: reflect.New(methodType.In(2)).Elem().Interface(), + } + } + + } +} From d158fcec80f0a028d9c6c8dceb326de4f336910a Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Mon, 18 Apr 2016 19:07:37 +0300 Subject: [PATCH 4/8] work in proggress --- engine/action.go | 14 ++++++++++---- utils/rpc_object_test.go | 13 ++++++++++--- utils/rpc_objects.go | 4 ++-- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/engine/action.go b/engine/action.go index a7f9cdb5e..4018a7c1c 100644 --- a/engine/action.go +++ b/engine/action.go @@ -32,6 +32,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) /* @@ -651,19 +652,24 @@ func transferMonetaryDefaultAction(acc *Account, sq *StatsQueueTriggered, a *Act } type RPCRequest struct { - Server string + Address string Transport string Method string Attempts int Async bool - Arg map[string]interface{} + Param string } func cgrRPCAction(account *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error { - rpcRequest := RPCRequest{} - if err := json.Unmarshal([]byte(a.ExtraParameters), &rpcRequest); err != nil { + req := RPCRequest{} + if err := json.Unmarshal([]byte(a.ExtraParameters), &req); err != nil { return err } + client, err := rpcclient.NewRpcClient(req.Method, req.Address, req.Attempts, 0, req.Transport, nil) + if err != nil { + return nil, err + } + client.Call() return nil } diff --git a/utils/rpc_object_test.go b/utils/rpc_object_test.go index 8cdf368c3..f42ea882a 100644 --- a/utils/rpc_object_test.go +++ b/utils/rpc_object_test.go @@ -8,14 +8,21 @@ func (rpc *RpcStruct) Hopa(normal string, out *float64) error { return nil } -func TestRPCObjectSimple(t *testing.T) { +func (rpc *RpcStruct) Tropa(pointer *string, out *float64) error { + return nil +} +func TestRPCObjectPointer(t *testing.T) { RegisterRpcObject("", &RpcStruct{}) - if len(RpcObjects) != 1 { + if len(RpcObjects) != 2 { t.Errorf("error registering rpc object: %v", RpcObjects) } x, found := RpcObjects["RpcStruct.Hopa"] - if found { + if !found { + t.Errorf("error getting rpcobject: %v (%+v)", RpcObjects, x) + } + x, found = RpcObjects["RpcStruct.Tropa"] + if !found { t.Errorf("error getting rpcobject: %v (%+v)", RpcObjects, x) } } diff --git a/utils/rpc_objects.go b/utils/rpc_objects.go index 6aef9be2e..22028d274 100644 --- a/utils/rpc_objects.go +++ b/utils/rpc_objects.go @@ -29,8 +29,8 @@ func RegisterRpcObject(name string, rpcObject interface{}) { if methodType.NumIn() == 3 { // if it has three parameters (one is self and two are rpc params) RpcObjects[name+"."+method.Name] = &RpcObject{ Object: objType, - InParam: reflect.New(methodType.In(1)).Elem().Interface(), - OutParam: reflect.New(methodType.In(2)).Elem().Interface(), + InParam: reflect.New(methodType.In(1)).Interface(), + OutParam: reflect.New(methodType.In(2).Elem()).Interface(), } } From b198d5fc181288c141e57afc06b298ba486373fa Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Mon, 18 Apr 2016 23:51:32 +0300 Subject: [PATCH 5/8] first *rpc_cgr tests --- engine/action.go | 22 ++++++++++++++---- engine/actions_test.go | 38 ++++++++++++++++++++++++++++++ utils/rpc_object_test.go | 28 ---------------------- utils/rpc_objects.go | 38 ------------------------------ utils/rpc_params.go | 50 ++++++++++++++++++++++++++++++++++++++++ utils/rpc_params_test.go | 32 +++++++++++++++++++++++++ 6 files changed, 138 insertions(+), 70 deletions(-) delete mode 100644 utils/rpc_object_test.go delete mode 100644 utils/rpc_objects.go create mode 100644 utils/rpc_params.go create mode 100644 utils/rpc_params_test.go diff --git a/engine/action.go b/engine/action.go index 4018a7c1c..0d4e51fe4 100644 --- a/engine/action.go +++ b/engine/action.go @@ -22,6 +22,7 @@ import ( "encoding/json" "errors" "fmt" + "log" "net/smtp" "path" "reflect" @@ -665,12 +666,25 @@ func cgrRPCAction(account *Account, sq *StatsQueueTriggered, a *Action, acs Acti if err := json.Unmarshal([]byte(a.ExtraParameters), &req); err != nil { return err } - client, err := rpcclient.NewRpcClient(req.Method, req.Address, req.Attempts, 0, req.Transport, nil) + log.Printf("REQ: %+v", req) + params, err := utils.GetRpcParams(req.Method) if err != nil { - return nil, err + return err } - client.Call() - return nil + var client rpcclient.RpcClientConnection + if req.Address != utils.INTERNAL { + if client, err = rpcclient.NewRpcClient(req.Method, req.Address, req.Attempts, 0, req.Transport, nil); err != nil { + return err + } + } else { + client = params.Object + } + + in, out := params.InParam, params.OutParam + if err := json.Unmarshal([]byte(req.Param), &in); err != nil { + return err + } + return client.Call(req.Method, in, out) } // Structure to store actions according to weight diff --git a/engine/actions_test.go b/engine/actions_test.go index 9b3dc8f22..738a25d72 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -2159,6 +2159,44 @@ func TestActionCdrlogBalanceValue(t *testing.T) { } } +type TestRPCParameters struct { + status string +} + +type Attr struct { + Name string + Surname string + Age float64 +} + +func (trpcp *TestRPCParameters) Hopa(in Attr, out *float64) error { + trpcp.status = utils.OK + return nil +} + +func (trpcp *TestRPCParameters) Call(string, interface{}, interface{}) error { + return nil +} + +func TestCgrRpcAction(t *testing.T) { + trpcp := &TestRPCParameters{} + utils.RegisterRpcParams("", trpcp) + a := &Action{ + ExtraParameters: `{"Address": "internal", + "Transport": "*gob", + "Method": "TestRPCParameters.Hopa", + "Attempts":1, + "Async" :false, + "Param": "{\"Name\":\"n\", \"Surname\":\"s\", \"Age\":10.2}"}`, + } + if err := cgrRPCAction(nil, nil, a, nil); err != nil { + t.Error("error executing cgr action: ", err) + } + if trpcp.status != utils.OK { + t.Error("RPC not called!") + } +} + /**************** Benchmarks ********************************/ func BenchmarkUUID(b *testing.B) { diff --git a/utils/rpc_object_test.go b/utils/rpc_object_test.go deleted file mode 100644 index f42ea882a..000000000 --- a/utils/rpc_object_test.go +++ /dev/null @@ -1,28 +0,0 @@ -package utils - -import "testing" - -type RpcStruct struct{} - -func (rpc *RpcStruct) Hopa(normal string, out *float64) error { - return nil -} - -func (rpc *RpcStruct) Tropa(pointer *string, out *float64) error { - return nil -} - -func TestRPCObjectPointer(t *testing.T) { - RegisterRpcObject("", &RpcStruct{}) - if len(RpcObjects) != 2 { - t.Errorf("error registering rpc object: %v", RpcObjects) - } - x, found := RpcObjects["RpcStruct.Hopa"] - if !found { - t.Errorf("error getting rpcobject: %v (%+v)", RpcObjects, x) - } - x, found = RpcObjects["RpcStruct.Tropa"] - if !found { - t.Errorf("error getting rpcobject: %v (%+v)", RpcObjects, x) - } -} diff --git a/utils/rpc_objects.go b/utils/rpc_objects.go deleted file mode 100644 index 22028d274..000000000 --- a/utils/rpc_objects.go +++ /dev/null @@ -1,38 +0,0 @@ -package utils - -import "reflect" - -var RpcObjects map[string]interface{} - -type RpcObject struct { - Object interface{} - InParam interface{} - OutParam interface{} -} - -func init() { - RpcObjects = make(map[string]interface{}) -} - -func RegisterRpcObject(name string, rpcObject interface{}) { - objType := reflect.TypeOf(rpcObject) - if name == "" { - val := reflect.ValueOf(rpcObject) - name = objType.Name() - if val.Kind() == reflect.Ptr { - name = objType.Elem().Name() - } - } - for i := 0; i < objType.NumMethod(); i++ { - method := objType.Method(i) - methodType := method.Type - if methodType.NumIn() == 3 { // if it has three parameters (one is self and two are rpc params) - RpcObjects[name+"."+method.Name] = &RpcObject{ - Object: objType, - InParam: reflect.New(methodType.In(1)).Interface(), - OutParam: reflect.New(methodType.In(2).Elem()).Interface(), - } - } - - } -} diff --git a/utils/rpc_params.go b/utils/rpc_params.go new file mode 100644 index 000000000..ffd667b98 --- /dev/null +++ b/utils/rpc_params.go @@ -0,0 +1,50 @@ +package utils + +import ( + "reflect" + + "github.com/cgrates/rpcclient" +) + +var rpcParamsMap map[string]*RpcParams + +type RpcParams struct { + Object rpcclient.RpcClientConnection + InParam interface{} + OutParam interface{} +} + +func init() { + rpcParamsMap = make(map[string]*RpcParams) +} + +func RegisterRpcParams(name string, obj rpcclient.RpcClientConnection) { + objType := reflect.TypeOf(obj) + if name == "" { + val := reflect.ValueOf(obj) + name = objType.Name() + if val.Kind() == reflect.Ptr { + name = objType.Elem().Name() + } + } + for i := 0; i < objType.NumMethod(); i++ { + method := objType.Method(i) + methodType := method.Type + if methodType.NumIn() == 3 { // if it has three parameters (one is self and two are rpc params) + rpcParamsMap[name+"."+method.Name] = &RpcParams{ + Object: obj, + InParam: reflect.New(methodType.In(1)).Interface(), + OutParam: reflect.New(methodType.In(2).Elem()).Interface(), + } + } + + } +} + +func GetRpcParams(method string) (*RpcParams, error) { + x, found := rpcParamsMap[method] + if !found { + return nil, ErrNotFound + } + return x, nil +} diff --git a/utils/rpc_params_test.go b/utils/rpc_params_test.go new file mode 100644 index 000000000..a500fd3c2 --- /dev/null +++ b/utils/rpc_params_test.go @@ -0,0 +1,32 @@ +package utils + +import "testing" + +type RpcStruct struct{} + +func (rpc *RpcStruct) Hopa(normal string, out *float64) error { + return nil +} + +func (rpc *RpcStruct) Tropa(pointer *string, out *float64) error { + return nil +} + +func (rpc *RpcStruct) Call(string, interface{}, interface{}) error { + return nil +} + +func TestRPCObjectPointer(t *testing.T) { + RegisterRpcParams("", &RpcStruct{}) + if len(rpcParamsMap) != 2 { + t.Errorf("error registering rpc object: %v", rpcParamsMap) + } + x, found := rpcParamsMap["RpcStruct.Hopa"] + if !found { + t.Errorf("error getting rpcobject: %v (%+v)", rpcParamsMap, x) + } + x, found = rpcParamsMap["RpcStruct.Tropa"] + if !found { + t.Errorf("error getting rpcobject: %v (%+v)", rpcParamsMap, x) + } +} From f08524837a5b8c52e5571d92a1f5aa73ea188021 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Tue, 19 Apr 2016 09:57:19 +0300 Subject: [PATCH 6/8] testing *cgr_rpc --- engine/action.go | 4 +++- engine/actions_test.go | 29 +++++++++++++++++++++++++++-- utils/rpc_params.go | 2 +- 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/engine/action.go b/engine/action.go index 0d4e51fe4..8aeeb5676 100644 --- a/engine/action.go +++ b/engine/action.go @@ -666,7 +666,6 @@ func cgrRPCAction(account *Account, sq *StatsQueueTriggered, a *Action, acs Acti if err := json.Unmarshal([]byte(a.ExtraParameters), &req); err != nil { return err } - log.Printf("REQ: %+v", req) params, err := utils.GetRpcParams(req.Method) if err != nil { return err @@ -681,9 +680,12 @@ func cgrRPCAction(account *Account, sq *StatsQueueTriggered, a *Action, acs Acti } in, out := params.InParam, params.OutParam + log.Print("IN: ", reflect.TypeOf(in)) + log.Print(req.Param) if err := json.Unmarshal([]byte(req.Param), &in); err != nil { return err } + log.Print("IN: ", reflect.TypeOf(in)) return client.Call(req.Method, in, out) } diff --git a/engine/actions_test.go b/engine/actions_test.go index 738a25d72..02e941e8f 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -22,6 +22,7 @@ import ( "encoding/json" "fmt" "reflect" + "strings" "testing" "time" @@ -2174,8 +2175,32 @@ func (trpcp *TestRPCParameters) Hopa(in Attr, out *float64) error { return nil } -func (trpcp *TestRPCParameters) Call(string, interface{}, interface{}) error { - return nil +func (trpcp *TestRPCParameters) Call(serviceMethod string, args interface{}, reply interface{}) error { + parts := strings.Split(serviceMethod, ".") + if len(parts) != 2 { + return utils.ErrNotImplemented + } + // get method + method := reflect.ValueOf(trpcp).MethodByName(parts[1]) + if !method.IsValid() { + return utils.ErrNotImplemented + } + + // construct the params + params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)} + + ret := method.Call(params) + if len(ret) != 1 { + return utils.ErrServerError + } + if ret[0].Interface() == nil { + return nil + } + err, ok := ret[0].Interface().(error) + if !ok { + return utils.ErrServerError + } + return err } func TestCgrRpcAction(t *testing.T) { diff --git a/utils/rpc_params.go b/utils/rpc_params.go index ffd667b98..21a4aa5f4 100644 --- a/utils/rpc_params.go +++ b/utils/rpc_params.go @@ -33,7 +33,7 @@ func RegisterRpcParams(name string, obj rpcclient.RpcClientConnection) { if methodType.NumIn() == 3 { // if it has three parameters (one is self and two are rpc params) rpcParamsMap[name+"."+method.Name] = &RpcParams{ Object: obj, - InParam: reflect.New(methodType.In(1)).Interface(), + InParam: reflect.Zero(methodType.In(1)).Interface(), OutParam: reflect.New(methodType.In(2).Elem()).Interface(), } } From c174605ab8277fd02f14c2ed5720757bb8cb2109 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Tue, 19 Apr 2016 18:14:44 +0300 Subject: [PATCH 7/8] passing call test --- engine/action.go | 7 ++----- utils/rpc_params.go | 2 +- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/engine/action.go b/engine/action.go index 8aeeb5676..35834d55b 100644 --- a/engine/action.go +++ b/engine/action.go @@ -22,7 +22,6 @@ import ( "encoding/json" "errors" "fmt" - "log" "net/smtp" "path" "reflect" @@ -680,12 +679,10 @@ func cgrRPCAction(account *Account, sq *StatsQueueTriggered, a *Action, acs Acti } in, out := params.InParam, params.OutParam - log.Print("IN: ", reflect.TypeOf(in)) - log.Print(req.Param) - if err := json.Unmarshal([]byte(req.Param), &in); err != nil { + x := in + if err := json.Unmarshal([]byte(req.Param), &x); err != nil { return err } - log.Print("IN: ", reflect.TypeOf(in)) return client.Call(req.Method, in, out) } diff --git a/utils/rpc_params.go b/utils/rpc_params.go index 21a4aa5f4..9eaecba1a 100644 --- a/utils/rpc_params.go +++ b/utils/rpc_params.go @@ -33,7 +33,7 @@ func RegisterRpcParams(name string, obj rpcclient.RpcClientConnection) { if methodType.NumIn() == 3 { // if it has three parameters (one is self and two are rpc params) rpcParamsMap[name+"."+method.Name] = &RpcParams{ Object: obj, - InParam: reflect.Zero(methodType.In(1)).Interface(), + InParam: (reflect.New(methodType.In(1)).Elem()).Interface(), OutParam: reflect.New(methodType.In(2).Elem()).Interface(), } } From da1a9b344cf8c8cfd37248e8db2dda6c88771c27 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Wed, 20 Apr 2016 00:39:14 +0300 Subject: [PATCH 8/8] integration test for *cgr_rpc --- apier/v1/apier.go | 5 +++-- apier/v1/apier_local_test.go | 2 +- data/tariffplans/testtp/Actions.csv | 1 + engine/action.go | 20 ++++++++++++++++---- engine/actions_test.go | 2 +- general_tests/tp_it_test.go | 19 ++++++++++++++++++- utils/rpc_params.go | 4 ++-- utils/rpc_params_test.go | 22 +++++++++++++++++++--- utils/struct.go | 21 +++++++++++++++++++++ 9 files changed, 82 insertions(+), 14 deletions(-) diff --git a/apier/v1/apier.go b/apier/v1/apier.go index b6abf47f0..e8eaa63ed 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -103,11 +103,12 @@ func (self *ApierV1) GetRatingPlan(rplnId string, reply *engine.RatingPlan) erro } func (self *ApierV1) ExecuteAction(attr *utils.AttrExecuteAction, reply *string) error { - accID := utils.AccountKey(attr.Tenant, attr.Account) at := &engine.ActionTiming{ ActionsID: attr.ActionsId, } - at.SetAccountIDs(utils.StringMap{accID: true}) + if attr.Tenant != "" && attr.Account != "" { + at.SetAccountIDs(utils.StringMap{utils.AccountKey(attr.Tenant, attr.Account): true}) + } if err := at.Execute(); err != nil { *reply = err.Error() return err diff --git a/apier/v1/apier_local_test.go b/apier/v1/apier_local_test.go index 5c2504cec..124b6a984 100644 --- a/apier/v1/apier_local_test.go +++ b/apier/v1/apier_local_test.go @@ -1297,7 +1297,7 @@ func TestApierResetDataAfterLoadFromFolder(t *testing.T) { if rcvStats.Destinations != 5 || rcvStats.RatingPlans != 5 || rcvStats.RatingProfiles != 5 || - rcvStats.Actions != 9 || + rcvStats.Actions != 10 || rcvStats.DerivedChargers != 3 { t.Errorf("Calling ApierV1.GetCacheStats received: %+v", rcvStats) } diff --git a/data/tariffplans/testtp/Actions.csv b/data/tariffplans/testtp/Actions.csv index 98a48be14..1525522f2 100644 --- a/data/tariffplans/testtp/Actions.csv +++ b/data/tariffplans/testtp/Actions.csv @@ -9,3 +9,4 @@ TOPUP_DATA_r,*topup,,,,*monetary,*out,,DATA_DEST,,,*unlimited,,5000000,10,false, TOPUP_DATA_r,*topup,,,,*data,*out,,DATA_DEST,datar,,*unlimited,,50000000000,10,false,false,10 TOPUP_VOICE,*topup,,,,*voice,*out,,GERMANY_MOBILE,,,*unlimited,,50000,10,false,false,10 TOPUP_NEG,*topup,,,,*voice,*out,,GERMANY;!GERMANY_MOBILE,*zero1m,,*unlimited,,100,10,false,false,10 +RPC,*cgr_rpc,"{""Address"": ""localhost:2013"",""Transport"":""*gob"",""Method"":""ApierV2.SetAccount"",""Attempts"":1,""Async"" :false,""Param"":{""Account"":""rpc"",""Tenant"":""cgrates.org""}}",,,,,,,,,,,,,,, diff --git a/engine/action.go b/engine/action.go index 35834d55b..84669ff1f 100644 --- a/engine/action.go +++ b/engine/action.go @@ -657,7 +657,7 @@ type RPCRequest struct { Method string Attempts int Async bool - Param string + Param map[string]interface{} } func cgrRPCAction(account *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error { @@ -677,13 +677,25 @@ func cgrRPCAction(account *Account, sq *StatsQueueTriggered, a *Action, acs Acti } else { client = params.Object } + if client == nil { + return utils.ErrServerError + } in, out := params.InParam, params.OutParam - x := in - if err := json.Unmarshal([]byte(req.Param), &x); err != nil { + p, err := utils.FromMapStringInterfaceValue(req.Param, in) + if err != nil { return err } - return client.Call(req.Method, in, out) + if !req.Async { + err = client.Call(req.Method, p, out) + utils.Logger.Info(fmt.Sprintf("<*cgr_rpc> result: %+v err: %v", out, err)) + return err + } + go func() { + err := client.Call(req.Method, p, out) + utils.Logger.Info(fmt.Sprintf("<*cgr_rpc> result: %+v err: %v", out, err)) + }() + return nil } // Structure to store actions according to weight diff --git a/engine/actions_test.go b/engine/actions_test.go index 02e941e8f..f9297d25a 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -2212,7 +2212,7 @@ func TestCgrRpcAction(t *testing.T) { "Method": "TestRPCParameters.Hopa", "Attempts":1, "Async" :false, - "Param": "{\"Name\":\"n\", \"Surname\":\"s\", \"Age\":10.2}"}`, + "Param": {"Name":"n", "Surname":"s", "Age":10.2}}`, } if err := cgrRPCAction(nil, nil, a, nil); err != nil { t.Error("error executing cgr action: ", err) diff --git a/general_tests/tp_it_test.go b/general_tests/tp_it_test.go index cb66dca3e..8165ed19c 100644 --- a/general_tests/tp_it_test.go +++ b/general_tests/tp_it_test.go @@ -214,7 +214,7 @@ func TestTpZeroNegativeCost(t *testing.T) { } else if cc.GetDuration() != 20*time.Second { t.Errorf("Calling Responder.MaxDebit got callcost: %v", utils.ToIJSON(cc)) } - var acnt *engine.Account + var acnt engine.Account attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1013"} if err := tpRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { t.Error("Got error on ApierV2.GetAccount: ", err.Error()) @@ -222,3 +222,20 @@ func TestTpZeroNegativeCost(t *testing.T) { t.Errorf("Calling ApierV2.GetAccount received: %s", utils.ToIJSON(acnt)) } } + +func TestTpExecuteActionCgrRpc(t *testing.T) { + if !*testIntegration { + return + } + var reply string + if err := tpRPC.Call("ApierV2.ExecuteAction", utils.AttrExecuteAction{ActionsId: "RPC"}, &reply); err != nil { + t.Error("Got error on ApierV2.ExecuteAction: ", err.Error()) + } else if reply != utils.OK { + t.Errorf("Calling ExecuteAction got reply: %s", reply) + } + var acnt engine.Account + attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "rpc"} + if err := tpRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { + t.Error("Got error on ApierV2.GetAccount: ", err.Error()) + } +} diff --git a/utils/rpc_params.go b/utils/rpc_params.go index 9eaecba1a..69020fdec 100644 --- a/utils/rpc_params.go +++ b/utils/rpc_params.go @@ -10,7 +10,7 @@ var rpcParamsMap map[string]*RpcParams type RpcParams struct { Object rpcclient.RpcClientConnection - InParam interface{} + InParam reflect.Value OutParam interface{} } @@ -33,7 +33,7 @@ func RegisterRpcParams(name string, obj rpcclient.RpcClientConnection) { if methodType.NumIn() == 3 { // if it has three parameters (one is self and two are rpc params) rpcParamsMap[name+"."+method.Name] = &RpcParams{ Object: obj, - InParam: (reflect.New(methodType.In(1)).Elem()).Interface(), + InParam: reflect.New(methodType.In(1)), OutParam: reflect.New(methodType.In(2).Elem()).Interface(), } } diff --git a/utils/rpc_params_test.go b/utils/rpc_params_test.go index a500fd3c2..aac04ecdc 100644 --- a/utils/rpc_params_test.go +++ b/utils/rpc_params_test.go @@ -4,11 +4,17 @@ import "testing" type RpcStruct struct{} -func (rpc *RpcStruct) Hopa(normal string, out *float64) error { +type Attr struct { + Name string + Surname string + Age float64 +} + +func (rpc *RpcStruct) Hopa(normal Attr, out *float64) error { return nil } -func (rpc *RpcStruct) Tropa(pointer *string, out *float64) error { +func (rpc *RpcStruct) Tropa(pointer *Attr, out *float64) error { return nil } @@ -25,8 +31,18 @@ func TestRPCObjectPointer(t *testing.T) { if !found { t.Errorf("error getting rpcobject: %v (%+v)", rpcParamsMap, x) } - x, found = rpcParamsMap["RpcStruct.Tropa"] + a := x.InParam + if v, err := FromMapStringInterfaceValue(map[string]interface{}{"Name": "a", "Surname": "b", "Age": 10.2}, a); err != nil || v.(Attr).Name != "a" || v.(Attr).Surname != "b" || v.(Attr).Age != 10.2 { + t.Errorf("error converting to struct: %+v (%v)", v, err) + } + //TODO: make pointer in arguments usable + /*x, found = rpcParamsMap["RpcStruct.Tropa"] if !found { t.Errorf("error getting rpcobject: %v (%+v)", rpcParamsMap, x) } + b := x.InParam + log.Printf("T: %+v", b) + if v, err := FromMapStringInterfaceValue(map[string]interface{}{"Name": "a", "Surname": "b", "Age": 10.2}, b); err != nil || v.(Attr).Name != "a" || v.(Attr).Surname != "b" || v.(Attr).Age != 10.2 { + t.Errorf("error converting to struct: %+v (%v)", v, err) + }*/ } diff --git a/utils/struct.go b/utils/struct.go index 3da80ec84..e6f20fff3 100644 --- a/utils/struct.go +++ b/utils/struct.go @@ -194,6 +194,27 @@ func FromMapStringInterface(m map[string]interface{}, in interface{}) error { return nil } +func FromMapStringInterfaceValue(m map[string]interface{}, v reflect.Value) (interface{}, error) { + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + for fieldName, fieldValue := range m { + field := v.FieldByName(fieldName) + if field.IsValid() { + if !field.IsValid() || !field.CanSet() { + continue + } + structFieldType := field.Type() + val := reflect.ValueOf(fieldValue) + if structFieldType != val.Type() { + return nil, errors.New("Provided value type didn't match obj field type") + } + field.Set(val) + } + } + return v.Interface(), nil +} + // Update struct with map fields, returns not matching map keys, s is a struct to be updated func UpdateStructWithStrMap(s interface{}, m map[string]string) []string { notMatched := []string{}