diff --git a/cmd/cgr-balancer/cgr-balanncer.go b/cmd/cgr-balancer/cgr-balanncer.go index 3aac65896..a522ac1c1 100644 --- a/cmd/cgr-balancer/cgr-balanncer.go +++ b/cmd/cgr-balancer/cgr-balanncer.go @@ -40,7 +40,7 @@ var ( /* The function that gets the information from the raters using balancer. */ -func GetCost(key *timespans.CallDescriptor) (reply *timespans.CallCost) { +func GetCallCost(key *timespans.CallDescriptor, method string) (reply *timespans.CallCost) { err := errors.New("") //not nil value for err != nil { client := raterList.Balance() @@ -49,7 +49,7 @@ func GetCost(key *timespans.CallDescriptor) (reply *timespans.CallCost) { time.Sleep(1 * time.Second) // wait one second and retry } else { reply = ×pans.CallCost{} - err = client.Call("Responder.GetCost", *key, reply) + err = client.Call(method, *key, reply) if err != nil { log.Printf("Got en error from rater: %v", err) } @@ -88,7 +88,7 @@ func main() { go listenToJsonRPCRequests() sm := &sessionmanager.FSSessionManager{} - sm.Connect(new(sessionmanager.DirectSessionDelegate), *freeswitchsrv, *freeswitchpass) + sm.Connect(sessionmanager.NewRPCSessionDelegate(), *freeswitchsrv, *freeswitchpass) listenToHttpRequests() } diff --git a/cmd/cgr-balancer/http_responder.go b/cmd/cgr-balancer/http_responder.go index 8390072db..f4393cf7f 100644 --- a/cmd/cgr-balancer/http_responder.go +++ b/cmd/cgr-balancer/http_responder.go @@ -43,7 +43,7 @@ func getCostHandler(w http.ResponseWriter, r *http.Request) { return } arg := ×pans.CallDescriptor{CstmId: cstmid[0], Subject: subj[0], DestinationPrefix: dest[0]} - callCost := GetCost(arg) + callCost := GetCallCost(arg, "Responder.GetCost") enc.Encode(callCost) } diff --git a/cmd/cgr-balancer/jsonrpc_responder.go b/cmd/cgr-balancer/jsonrpc_responder.go index 0b906415c..dc6bc4af0 100644 --- a/cmd/cgr-balancer/jsonrpc_responder.go +++ b/cmd/cgr-balancer/jsonrpc_responder.go @@ -33,7 +33,12 @@ type Responder byte RPC method thet provides the external RPC interface for getting the rating information. */ func (r *Responder) GetCost(arg timespans.CallDescriptor, replay *timespans.CallCost) (err error) { - *replay = *GetCost(&arg) + *replay = *GetCallCost(&arg, "Responder.GetCost") + return +} + +func (r *Responder) Debit(arg timespans.CallDescriptor, replay *timespans.CallCost) (err error) { + *replay = *GetCallCost(&arg, "Responder.Debit") return } diff --git a/cmd/cgr-console/cgr-console.go b/cmd/cgr-console/cgr-console.go index 16b11229a..149416ad8 100644 --- a/cmd/cgr-console/cgr-console.go +++ b/cmd/cgr-console/cgr-console.go @@ -29,7 +29,7 @@ import ( var ( server = flag.String("server", "127.0.0.1:2001", "server address host:port") - tor = flag.Int("tor", 0, "Type of record") + tor = flag.String("tor", "0", "Type of record") cstmid = flag.String("cstmid", "vdf", "Customer identificator") subject = flag.String("subject", "rif", "The client who made the call") dest = flag.String("dest", "0256", "Destination prefix") @@ -70,6 +70,11 @@ func main() { if err = client.Call("Responder.GetCost", cd, &result); err == nil { fmt.Println(result) } + case "debit": + result := timespans.CallCost{} + if err = client.Call("Responder.Debit", cd, &result); err == nil { + fmt.Println(result) + } case "getmaxsessiontime": var result float64 if err = client.Call("Responder.GetMaxSessionTime", cd, &result); err == nil { diff --git a/cmd/cgr-rater/cgr-rater.go b/cmd/cgr-rater/cgr-rater.go index 6cd4bb877..060e2db66 100644 --- a/cmd/cgr-rater/cgr-rater.go +++ b/cmd/cgr-rater/cgr-rater.go @@ -148,12 +148,11 @@ func main() { getter, err := timespans.NewRedisStorage(*redissrv, *redisdb) defer getter.Close() if err != nil { - log.Printf("Cannot open storage: %v", err) - os.Exit(1) + log.Fatalf("Cannot open storage: %v", err) } if *standalone { sm := &sessionmanager.FSSessionManager{} - sm.Connect(new(sessionmanager.DirectSessionDelegate), *freeswitchsrv, *freeswitchpass) + sm.Connect(sessionmanager.NewDirectSessionDelegate(getter), *freeswitchsrv, *freeswitchpass) } else { go RegisterToServer(balancer, listen) go StopSingnalHandler(balancer, listen, getter) diff --git a/cmd/cgr-sessionmanager/cgr-sessionmanager.go b/cmd/cgr-sessionmanager/cgr-sessionmanager.go index 13499475e..63fc541f2 100644 --- a/cmd/cgr-sessionmanager/cgr-sessionmanager.go +++ b/cmd/cgr-sessionmanager/cgr-sessionmanager.go @@ -21,6 +21,7 @@ package main import ( "flag" "github.com/rif/cgrates/sessionmanager" + "github.com/rif/cgrates/timespans" "log" ) @@ -28,12 +29,19 @@ var ( balancer = flag.String("balancer", "127.0.0.1:2000", "balancer address host:port") freeswitchsrv = flag.String("freeswitchsrv", "localhost:8021", "freeswitch address host:port") freeswitchpass = flag.String("freeswitchpass", "ClueCon", "freeswitch address host:port") + redissrv = flag.String("redissrv", "127.0.0.1:6379", "redis address host:port") + redisdb = flag.Int("redisdb", 10, "redis database number") ) func main() { flag.Parse() sm := &sessionmanager.FSSessionManager{} - sm.Connect(new(sessionmanager.DirectSessionDelegate), *freeswitchsrv, *freeswitchpass) + getter, err := timespans.NewRedisStorage(*redissrv, *redisdb) + defer getter.Close() + if err != nil { + log.Fatalf("Cannot open storage: %v", err) + } + sm.Connect(sessionmanager.NewDirectSessionDelegate(getter), *freeswitchsrv, *freeswitchpass) waitChan := make(<-chan byte) log.Print("CGRateS is listening!") <-waitChan diff --git a/sessionmanager/sessiondelegate.go b/sessionmanager/sessiondelegate.go index 4e0333e4e..4ce24363a 100644 --- a/sessionmanager/sessiondelegate.go +++ b/sessionmanager/sessiondelegate.go @@ -22,7 +22,6 @@ import ( "github.com/rif/cgrates/timespans" "log" "net/rpc" - "net/rpc/jsonrpc" "time" ) @@ -30,11 +29,6 @@ const ( DEBIT_PERIOD = 10 * time.Second ) -var ( - // sample storage for the provided direct implementation - storageGetter, _ = timespans.NewRedisStorage("tcp:127.0.0.1:6379", 10) -) - // Interface for the session delegate objects type SessionDelegate interface { // Called on freeswitch's hearbeat event @@ -50,7 +44,13 @@ type SessionDelegate interface { } // Sample SessionDelegate calling the timespans methods directly -type DirectSessionDelegate byte +type DirectSessionDelegate struct { + storageGetter timespans.StorageGetter +} + +func NewDirectSessionDelegate(storageGetter timespans.StorageGetter) *DirectSessionDelegate { + return &DirectSessionDelegate{storageGetter} +} func (dsd *DirectSessionDelegate) OnHeartBeat(ev Event) { log.Print("♥") @@ -103,7 +103,7 @@ func (dsd *DirectSessionDelegate) OnChannelHangupComplete(ev Event, s *Session) DestinationPrefix: lastCC.DestinationPrefix, Amount: -cost, } - cd.SetStorageGetter(storageGetter) + cd.SetStorageGetter(dsd.storageGetter) cd.DebitCents() } if seconds > 0 { @@ -113,7 +113,7 @@ func (dsd *DirectSessionDelegate) OnChannelHangupComplete(ev Event, s *Session) DestinationPrefix: lastCC.DestinationPrefix, Amount: -seconds, } - cd.SetStorageGetter(storageGetter) + cd.SetStorageGetter(dsd.storageGetter) cd.DebitSeconds() } lastCC.Cost -= cost @@ -121,7 +121,7 @@ func (dsd *DirectSessionDelegate) OnChannelHangupComplete(ev Event, s *Session) } func (dsd *DirectSessionDelegate) LoopAction(s *Session, cd *timespans.CallDescriptor) { - cd.SetStorageGetter(storageGetter) + cd.SetStorageGetter(dsd.storageGetter) cc, err := cd.Debit() if err != nil { log.Printf("Could not complete debit opperation: %v", err) @@ -155,11 +155,7 @@ type RPCSessionDelegate struct { client *rpc.Client } -func NewRPCSessionDelegate(host string) (rpc *RPCSessionDelegate) { - client, err := jsonrpc.Dial("tcp", host) - if err != nil { - log.Fatalf("Could not connect to rater server %v!", err) - } +func NewRPCSessionDelegate(client *rpc.Client) (rpc *RPCSessionDelegate) { return &RPCSessionDelegate{client} } @@ -172,7 +168,69 @@ func (rsd *RPCSessionDelegate) OnChannelAnswer(ev Event, s *Session, sm SessionM } func (rsd *RPCSessionDelegate) OnChannelHangupComplete(ev Event, s *Session) { - log.Print("rpc hangup") + lastCC := s.CallCosts[len(s.CallCosts)-1] + // put credit back + start := time.Now() + end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd + refoundDuration := end.Sub(start).Seconds() + cost := 0.0 + seconds := 0.0 + log.Printf("Refund duration: %v", refoundDuration) + for i := len(lastCC.Timespans) - 1; i >= 0; i-- { + ts := lastCC.Timespans[i] + tsDuration := ts.GetDuration().Seconds() + if refoundDuration <= tsDuration { + // find procentage + procentage := (refoundDuration * 100) / tsDuration + tmpCost := (procentage * ts.Cost) / 100 + ts.Cost -= tmpCost + cost += tmpCost + if ts.MinuteInfo != nil { + // DestinationPrefix and Price take from lastCC and above caclulus + seconds += (procentage * ts.MinuteInfo.Quantity) / 100 + } + // set the end time to now + ts.TimeEnd = start + break // do not go to other timespans + } else { + cost += ts.Cost + if ts.MinuteInfo != nil { + seconds += ts.MinuteInfo.Quantity + } + // remove the timestamp entirely + lastCC.Timespans = lastCC.Timespans[:i] + // continue to the next timespan with what is left to refound + refoundDuration -= tsDuration + } + } + if cost > 0 { + cd := ×pans.CallDescriptor{TOR: lastCC.TOR, + CstmId: lastCC.CstmId, + Subject: lastCC.CstmId, + DestinationPrefix: lastCC.DestinationPrefix, + Amount: -cost, + } + var response float64 + err := rsd.client.Call("Responder.DebitCents", cd, &response) + if err != nil { + log.Printf("Debit cents failed: %v", err) + } + } + if seconds > 0 { + cd := ×pans.CallDescriptor{TOR: lastCC.TOR, + CstmId: lastCC.CstmId, + Subject: lastCC.CstmId, + DestinationPrefix: lastCC.DestinationPrefix, + Amount: -seconds, + } + var response float64 + err := rsd.client.Call("Responder.DebitSeconds", cd, &response) + if err != nil { + log.Printf("Debit seconds failed: %v", err) + } + } + lastCC.Cost -= cost + log.Printf("Rambursed %v cents, %v seconds", cost, seconds) } func (rsd *RPCSessionDelegate) LoopAction(s *Session, cd *timespans.CallDescriptor) {