Support RSocket
Skip to main content

File Upload & Download

In this section we will add very basic file upload/download functionality. All files will be stored in memory, and downloadable by all users.

See resulting code on GitHub

Shared

First, define a mimetype which will represent file names in the payloads. This will be used by both server and client, so place it in the shared module:

chat_filename_mimetype = b'chat/file-name'

Server side

Data-classes

Next, we need a place to store the files in memory. Add a dictionary to the ChatData class to store the files. The keys will be the file names, and the values the file content.

from dataclasses import dataclass, field
from typing import Dict

@dataclass(frozen=True)
class ChatData:
...
files: Dict[str, bytes] = field(default_factory=dict)

Helper methods

Next, define a helper method which extracts the filename from the upload/download payload:

from shared import chat_filename_mimetype
from rsocket.extensions.composite_metadata import CompositeMetadata
from rsocket.helpers import utf8_decode

def get_file_name(composite_metadata: CompositeMetadata):
return utf8_decode(composite_metadata.find_by_mimetype(chat_filename_mimetype)[0].content)

This helper uses the find_by_mimetype method of CompositeMetadata to get a list of metadata items with the specified mimetype.

Endpoints

Next, register the request-response endpoints for uploading and downloading files, and for retrieving a list of available files:

from typing import Awaitable

from shared import chat_filename_mimetype
from rsocket.extensions.composite_metadata import CompositeMetadata
from rsocket.extensions.helpers import composite, metadata_item
from rsocket.frame_helpers import ensure_bytes
from rsocket.helpers import create_response
from rsocket.payload import Payload
from rsocket.routing.request_router import RequestRouter
from rsocket.streams.stream_from_generator import StreamFromGenerator

class ChatUserSession:

def router_factory(self):
router = RequestRouter(payload_mapper=decode_payload)

...

@router.response('file.upload')
async def upload_file(payload: Payload, composite_metadata: CompositeMetadata) -> Awaitable[Payload]:
chat_data.files[get_file_name(composite_metadata)] = payload.data
return create_response()

@router.response('file.download')
async def download_file(composite_metadata: CompositeMetadata) -> Awaitable[Payload]:
file_name = get_file_name(composite_metadata)
return create_response(chat_data.files[file_name],
composite(metadata_item(ensure_bytes(file_name), chat_filename_mimetype)))

@router.stream('files')
async def get_file_names() -> Publisher:
count = len(chat_data.files)
generator = ((Payload(ensure_bytes(file_name)), index == count) for (index, file_name) in
enumerate(chat_data.files.keys(), 1))
return StreamFromGenerator(lambda: generator)

The upload_file and download_file methods (Lines 20-29) extract the filename from the metadata using the helper method we created, and set and get the file content from the chat_data storage respectively.

In this section we introduce the second argument which can be passed to routed endpoints. If the session is set up to use composite metadata, the composite_metadata parameter will contain a parsed structure of the metadata in the request payload.

Line 36 uses the StreamFromGenerator helper which creates a stream Publisher/Subscription from a generator factory.

The generator must return a tuple of two values for each iteration:

  • Payload instance
  • boolean value denoting if it is the last element in the generator. The argument for the helper class is a method which returns a generator, not the generator itself.

Large file support

In the download_file method (Line 24), even though the frame size limit is 16MB, larger files can be downloaded. To allow this, fragmentation must be enabled. This is done by adding the fragment_size_bytes argument to the RSocketServer instantiation:

from rsocket.rsocket_server import RSocketServer
from rsocket.transports.tcp import TransportTCP

def session(*connection):
RSocketServer(TransportTCP(*connection),
handler_factory=handler_factory,
fragment_size_bytes=1_000_000)

Client side

Methods

On the client side, we will add 3 methods to access the new server functionality:

  • upload
  • download
  • list_files
from typing import List

from rsocket.awaitable.awaitable_rsocket import AwaitableRSocket
from rsocket.extensions.helpers import composite, route, metadata_item
from rsocket.frame_helpers import ensure_bytes
from rsocket.helpers import utf8_decode
from rsocket.payload import Payload

from shared import chat_filename_mimetype

class ChatClient:

async def upload(self, file_name: str, content: bytes):
await self._rsocket.request_response(Payload(content, composite(
route('file.upload'),
metadata_item(ensure_bytes(file_name), chat_filename_mimetype)
)))

async def download(self, file_name: str):
return await self._rsocket.request_response(Payload(
metadata=composite(
route('file.download'),
metadata_item(ensure_bytes(file_name), chat_filename_mimetype)
)))

async def list_files(self) -> List[str]:
request = Payload(metadata=composite(route('files')))
response = await AwaitableRSocket(self._rsocket).request_stream(request)
return list(map(lambda _: utf8_decode(_.data), response))

Lines 13-17 define the upload method. the Payload of the request-response consists of a body with the file's contents, and metadata which contains routing and the filename. To specify the filename a custom mimetype was used chat/file-name. This mime type was used to create a metadata item using the metadata_item method. the composite method was used to combine the two metadata items to the complete metadata of the payload.

Lines 19-24 define the download method. It is similar to the upload method, except for the absence of the payload data, and a different route: 'file.download'.

Lines 26-32 defines the list_files method. Same as the list_channels method in the previous section, it uses the request-stream 'files' endpoint to get a list of files.

Large file support

Same as on the server size, fragmentation must be enabled to allow uploading files larger than 16MB. This is done by adding the fragment_size_bytes argument to the RSocketClient instantiation. Do this for both clients:

from rsocket.extensions.mimetypes import WellKnownMimeTypes
from rsocket.helpers import single_transport_provider
from rsocket.rsocket_client import RSocketClient
from rsocket.transports.tcp import TransportTCP

async with RSocketClient(single_transport_provider(TransportTCP(*connection1)),
metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA,
fragment_size_bytes=1_000_000) as client1:
...

Test the new functionality

We will try out the new functionality with the following code:

async def files_example(user1: ChatClient, user2: ChatClient):
file_contents = b'abcdefg1234567'
file_name = 'file_name_1.txt'

await user1.upload(file_name, file_contents)

print(f'Files: {await user1.list_files()}')

download = await user2.download(file_name)

if download.data != file_contents:
raise Exception('File download failed')
else:
print(f'Downloaded file: {len(download.data)} bytes')

call the files_example method from the main client method.