Statistics

As a last step, we will pass some statistics between the client and the server:

  • The client will be able to send its memory usage to the server.
  • The server will report the number of users and channels. The client will be able to specify which of these statistics it wants.

See resulting code on GitHub

Shared code

We will define some dataclasses to represent the payloads sent between the client and the server:

from dataclasses import dataclass, field
from typing import Optional, List

@dataclass(frozen=True)
class ServerStatistics:
    user_count: Optional[int] = None
    channel_count: Optional[int] = None

@dataclass()
class ServerStatisticsRequest:
    ids: Optional[List[str]] = field(default_factory=lambda: ['users', 'channels'])
    period_seconds: Optional[int] = field(default_factory=lambda: 2)

@dataclass(frozen=True)
class ClientStatistics:
    memory_usage: Optional[int] = None

Lines 4-7 define the data sent to the client upon request. It contains two optional fields, the user count and the channel count.

Lines 9-12 define a request from the client which specifies which statistics it wants and how often they should be reported. The ids list names the two values in the ServerStatistics class.

Lines 14-16 define the statistics sent from the client to the server.
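The later snippets import an encode_dataclass helper from the shared module, but its implementation is not shown here. The following is a minimal sketch of what such a helper might look like, assuming the payloads are plain JSON; the bodies of encode_dataclass and decode_dataclass below are assumptions for illustration, not the tutorial's actual code.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import List, Optional


@dataclass()
class ServerStatisticsRequest:
    ids: Optional[List[str]] = field(default_factory=lambda: ['users', 'channels'])
    period_seconds: Optional[int] = field(default_factory=lambda: 2)


def encode_dataclass(obj) -> bytes:
    # Assumed implementation: serialize a dataclass instance to UTF-8 JSON bytes.
    return json.dumps(asdict(obj)).encode('utf-8')


def decode_dataclass(data: bytes, cls):
    # Hypothetical counterpart: rebuild a dataclass instance from JSON bytes.
    return cls(**json.loads(data.decode('utf-8')))


request = ServerStatisticsRequest(ids=['users'], period_seconds=5)
wire_bytes = encode_dataclass(request)
decoded = decode_dataclass(wire_bytes, ServerStatisticsRequest)

print(wire_bytes)
print(decoded == request)
```

Because both sides share the same dataclass definitions, a value encoded on one side can be rebuilt on the other by passing the decoded JSON object as keyword arguments.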

Server side

We will add two endpoints: one for receiving statistics from the client, and one through which the client requests specific statistics from the server.

Data storage

First we will add a field on the UserSessionData to store the last statistics sent by the client:

from dataclasses import dataclass
from typing import Optional

from shared import ClientStatistics

@dataclass()
class UserSessionData:
    ...
    statistics: Optional[ClientStatistics] = None

We will add a helper method for creating a new statistics response:

def new_statistics_data(statistics_request: ServerStatisticsRequest):
    statistics_data = {}

    if 'users' in statistics_request.ids:
        statistics_data['user_count'] = len(chat_data.user_session_by_id)

    if 'channels' in statistics_request.ids:
        statistics_data['channel_count'] = len(chat_data.channel_messages)

    return ServerStatistics(**statistics_data)

The method will create a new ServerStatistics (Line 10) based on the ServerStatisticsRequest request (Line 1). The client can request statistics about:

  • User count (Lines 4-5)
  • Channel count (Lines 7-8)
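To see how the requested ids shape the response, here is a self-contained sketch that runs the helper against a simplified stand-in for the server state. The ChatData class and its contents are assumptions made for this example; only the two attribute names are taken from the helper above.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass(frozen=True)
class ServerStatistics:
    user_count: Optional[int] = None
    channel_count: Optional[int] = None


@dataclass()
class ServerStatisticsRequest:
    ids: Optional[List[str]] = field(default_factory=lambda: ['users', 'channels'])
    period_seconds: Optional[int] = field(default_factory=lambda: 2)


@dataclass()
class ChatData:
    # Simplified stand-in for the tutorial's server state (assumption).
    user_session_by_id: Dict[str, object] = field(default_factory=dict)
    channel_messages: Dict[str, list] = field(default_factory=dict)


chat_data = ChatData(
    user_session_by_id={'id-1': object(), 'id-2': object()},
    channel_messages={'general': []},
)


def new_statistics_data(statistics_request: ServerStatisticsRequest) -> ServerStatistics:
    statistics_data = {}

    # Only the requested counters are filled in; the others stay None.
    if 'users' in statistics_request.ids:
        statistics_data['user_count'] = len(chat_data.user_session_by_id)

    if 'channels' in statistics_request.ids:
        statistics_data['channel_count'] = len(chat_data.channel_messages)

    return ServerStatistics(**statistics_data)


only_users = new_statistics_data(ServerStatisticsRequest(ids=['users']))
everything = new_statistics_data(ServerStatisticsRequest())

print(only_users)
print(everything)
```

Fields that were not requested keep their default of None, which is why both fields of ServerStatistics are declared Optional.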

Receive statistics

import json

from shared import ClientStatistics
from rsocket.helpers import utf8_decode
from rsocket.payload import Payload
from rsocket.routing.request_router import RequestRouter

class ChatUserSession:
    def router_factory(self):
        router = RequestRouter(payload_mapper=decode_payload)

        ...

        @router.fire_and_forget('statistics')
        async def receive_statistics(statistics: ClientStatistics):
            self._session.statistics = statistics

Lines 14-16 define an endpoint for receiving statistics from the client. It uses the fire-and-forget request type, since this data is not critical to the application. No return value is required from this method; if one is provided, it is ignored.

Send statistics

Next we define the endpoint for sending statistics to the client:

import asyncio
import json

from shared import ClientStatistics, ServerStatisticsRequest, ServerStatistics, encode_dataclass
from reactivestreams.publisher import DefaultPublisher
from reactivestreams.subscriber import Subscriber, DefaultSubscriber
from reactivestreams.subscription import DefaultSubscription
from rsocket.helpers import utf8_decode
from rsocket.payload import Payload
from rsocket.routing.request_router import RequestRouter

class ChatUserSession:
    def router_factory(self):
        router = RequestRouter(payload_mapper=decode_payload)

        ...

        @router.channel('statistics')
        async def send_statistics(request: ServerStatisticsRequest):

            class StatisticsChannel(DefaultPublisher, DefaultSubscriber, DefaultSubscription):

                def __init__(self, session: UserSessionData, requested_statistics: ServerStatisticsRequest):
                    super().__init__()
                    self._session = session
                    self._requested_statistics = requested_statistics

                def cancel(self):
                    self._sender.cancel()

                def subscribe(self, subscriber: Subscriber):
                    super().subscribe(subscriber)
                    subscriber.on_subscribe(self)
                    self._sender = asyncio.create_task(self._statistics_sender())

                async def _statistics_sender(self):
                    while True:
                        try:
                            await asyncio.sleep(self._requested_statistics.period_seconds)
                            next_message = new_statistics_data(self._requested_statistics)

                            self._subscriber.on_next(dataclass_to_payload(next_message))
                        except Exception:
                            logging.error('Statistics', exc_info=True)

                def on_next(self, value: Payload, is_complete=False):
                    request = ServerStatisticsRequest(**json.loads(utf8_decode(value.data)))

                    logging.info(f'Received statistics request {request.ids}, {request.period_seconds}')

                    if request.ids is not None:
                        self._requested_statistics.ids = request.ids

                    if request.period_seconds is not None:
                        self._requested_statistics.period_seconds = request.period_seconds

            response = StatisticsChannel(self._session, request)

            return response, response

Lines 18-59 define an endpoint for sending statistics to the client. It uses the request-channel request type, which allows the client both to receive the server statistics and to tell the server which statistics it wants to receive.

The handler defined in this endpoint implements Publisher, Subscriber and Subscription, because it both sends and receives data.

This handler is similar to the one implemented to deliver messages to users.

When the client subscribes, the handler starts the _statistics_sender asyncio task (Lines 36-44), which periodically sends server statistics to the client.

The handler also inherits from DefaultSubscriber. From this interface we implement the on_next method (Lines 46-55), which consumes requests from the client to change which statistics the server sends and how often it sends them.
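The periodic sender pattern used by the handler can be tried in isolation, without RSocket. The sketch below is a simplified stand-in: the PeriodicSender class and its emit callback are hypothetical names introduced for this example. It shows a task that sleeps, emits a value, and re-reads its period on every iteration so that updates take effect on the next cycle.

```python
import asyncio
from typing import Callable, List, Optional


class PeriodicSender:
    """Hypothetical stand-in for the sending half of StatisticsChannel."""

    def __init__(self, period_seconds: float, emit: Callable[[str], None]):
        self.period_seconds = period_seconds  # may be changed while running
        self._emit = emit
        self._task: Optional[asyncio.Task] = None

    def start(self):
        # Mirrors subscribe(): the sender runs as a background task.
        self._task = asyncio.create_task(self._run())

    def cancel(self):
        # Mirrors cancel(): stopping the task stops the stream of values.
        self._task.cancel()

    async def _run(self):
        count = 0
        while True:
            # The period is read on each iteration, so a new value
            # applies from the next sleep onwards.
            await asyncio.sleep(self.period_seconds)
            count += 1
            self._emit(f'statistics #{count}')


async def demo() -> List[str]:
    received: List[str] = []
    sender = PeriodicSender(period_seconds=0.05, emit=received.append)
    sender.start()

    await asyncio.sleep(0.12)
    sender.period_seconds = 0.01  # speed up, like an on_next update
    await asyncio.sleep(0.1)

    sender.cancel()
    await asyncio.sleep(0)  # give the event loop a chance to process the cancellation
    return received


received = asyncio.run(demo())
print(len(received))
```

Cancelling the task raises asyncio.CancelledError inside the sleep. Since Python 3.8 that exception derives from BaseException, so an except Exception clause inside the loop, like the one in the handler, does not swallow it.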

Client side

On the client side we will add the methods to access the new server side functionality:

  • send_statistics
  • listen_for_statistics

Send statistics

import resource

from shared import ServerStatistics, ClientStatistics
from rsocket.extensions.helpers import composite, route
from rsocket.payload import Payload

class ChatClient:

    async def send_statistics(self):
        memory_usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        payload = Payload(encode_dataclass(ClientStatistics(memory_usage=memory_usage)),
                          metadata=composite(route('statistics')))
        await self._rsocket.fire_and_forget(payload)

The send_statistics method uses a fire-and-forget request (Line 13) to send statistics to the server. This request type has no response, so the client does not wait for confirmation that the payload was delivered. That is acceptable here because the data is not critical (at least for this tutorial).
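One detail to be aware of when interpreting memory_usage: ru_maxrss is the peak resident set size, and its unit depends on the platform (kilobytes on Linux, bytes on macOS). The resource module is also only available on Unix-like systems. A small sketch of normalizing the value to bytes follows; the function name max_rss_bytes is a hypothetical helper, not part of the tutorial.

```python
import resource
import sys


def max_rss_bytes() -> int:
    """Peak resident set size of the current process, in bytes."""
    max_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

    # macOS reports ru_maxrss in bytes; Linux reports it in kilobytes.
    if sys.platform == 'darwin':
        return max_rss

    return max_rss * 1024


print(max_rss_bytes())
```

If clients on different platforms report to the same server, normalizing before sending keeps the values comparable.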

Receive statistics

Next we will request statistics from the server. First we will define a handler to listen on the channel request and control it:

import json
from asyncio import Event
from datetime import timedelta
from typing import List

from examples.tutorial.step6.models import ServerStatistics, ServerStatisticsRequest, dataclass_to_payload
from reactivestreams.publisher import DefaultPublisher
from reactivestreams.subscriber import DefaultSubscriber
from reactivestreams.subscription import DefaultSubscription
from rsocket.helpers import utf8_decode
from rsocket.payload import Payload

class StatisticsHandler(DefaultPublisher, DefaultSubscriber, DefaultSubscription):

    def __init__(self):
        super().__init__()
        self.done = Event()

    def on_next(self, value: Payload, is_complete=False):
        statistics = ServerStatistics(**json.loads(utf8_decode(value.data)))
        print(statistics)

        if is_complete:
            self.done.set()

    def cancel(self):
        self.subscription.cancel()

    def set_requested_statistics(self, ids: List[str]):
        self._subscriber.on_next(dataclass_to_payload(ServerStatisticsRequest(ids=ids)))

    def set_period(self, period: timedelta):
        self._subscriber.on_next(
            dataclass_to_payload(ServerStatisticsRequest(period_seconds=int(period.total_seconds()))))

This handler is similar to the one on the server side, in that it implements Publisher, Subscriber and Subscription.

Instead of periodically sending messages, as the server does, it adds two methods for explicitly sending messages:

  • set_requested_statistics (Lines 29-30) is used to change the type of statistics sent
  • set_period (Lines 32-34) is used to change the frequency at which statistics are sent

on_next (Lines 19-24) is implemented to print the statistics received from the server.

cancel (Lines 26-27) can be used to stop receiving the statistics.
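Both setter methods build a ServerStatisticsRequest with only one argument. Given the default factories on that dataclass, the other field is filled with its default rather than left as None. The sketch below illustrates this, assuming that dataclass_to_payload (whose implementation is not shown here) serializes every field, for example via dataclasses.asdict.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import List, Optional


@dataclass()
class ServerStatisticsRequest:
    ids: Optional[List[str]] = field(default_factory=lambda: ['users', 'channels'])
    period_seconds: Optional[int] = field(default_factory=lambda: 2)


# What set_requested_statistics(['users']) would construct.
ids_only = ServerStatisticsRequest(ids=['users'])

# What set_period(timedelta(seconds=10)) would construct.
period_only = ServerStatisticsRequest(period_seconds=10)

print(json.dumps(asdict(ids_only)))
print(json.dumps(asdict(period_only)))
```

Under that assumption, changing only the ids would also send the default period of 2 seconds, and changing only the period would also send the default ids. If independent updates are desired, one option is to pass None explicitly for the field that should stay unchanged, which the server's "is not None" checks would then skip.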

Next we will use this new handler in the ChatClient:

from rsocket.extensions.helpers import composite, route
from rsocket.payload import Payload

class ChatClient:

    def listen_for_statistics(self, request: ServerStatisticsRequest) -> StatisticsHandler:
        self._statistics_subscriber = StatisticsHandler()

        response = self._rsocket.request_channel(Payload(
            data=encode_dataclass(request),
            metadata=composite(route('statistics')
                               )), publisher=self._statistics_subscriber)

        response.subscribe(self._statistics_subscriber)

        return self._statistics_subscriber

In Line 7 we instantiate a StatisticsHandler. We keep a reference to this instance so that we can communicate over the channel later.

Lines 9-12 send the request, and set the publisher as the instance of our handler. This ensures that requests on the handler are sent to the server.

Line 14 then connects the Publisher in the response to the handler, in order to use it to print the received statistics, using the Subscriber functionality of the handler.

Finally on Line 16 we return the handler to allow the application flow to control and interact with the channel.

Test the new functionality

Finally, let's try out this new functionality in the client:

async def statistics_example(user1):
    await user1.send_statistics()

    statistics_control = user1.listen_for_statistics(
        ServerStatisticsRequest(period_seconds=5)
    )

    await asyncio.sleep(5)

    statistics_control.set_requested_statistics(['users'])

    await asyncio.sleep(5)

    statistics_control.cancel()

Call this new method from the client main method.