robocluster package

robocluster.Device module

Message passing for robocluster.

class robocluster.device.AttributeDict

Bases: dict

A dictionary that allows you to access entries like you would attributes in an object.
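The attribute-style access described above can be illustrated with a minimal sketch. The class name AttrDictSketch is hypothetical and this is not the library's implementation; it only shows one common way such a dictionary can be built on top of dict.

```python
class AttrDictSketch(dict):
    """Hypothetical stand-in: a dict whose keys can also be read as attributes."""

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails,
        # so regular dict methods such as .items() keep working.
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name) from None

    def __setattr__(self, name, value):
        # Attribute assignment stores the value as a dictionary entry.
        self[name] = value


settings = AttrDictSketch(speed=3)
settings.heading = 90
```

Here settings.speed and settings['speed'] refer to the same entry, and reading a key that does not exist raises AttributeError rather than KeyError.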

class robocluster.device.Device(name, group, loop=None)

Bases: object

A device to interact with the robocluster network.

__init__(name, group, loop=None)

Initialize the device.

Parameters:
  • name (str) – A name to identify the device by.
  • group (str) – Used to select the multicast address. In order for devices to talk to each other, they must be in the same group.
  • loop (asyncio.AbstractEventLoop, optional) – Event loop to use. Defaults to the current event loop.
every(duration)

Create a background task that runs every duration. Equivalent to:

@device.task
async def loop():
    while True:
        # do things
        ...
        await device.sleep(duration)
Parameters:
  • duration (str, int) – How long to sleep between loops. Takes the same form as duration_to_seconds().
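As a rough illustration of the periodic-task pattern using only asyncio: the helper every_sketch below is hypothetical, and its repeats argument is an assumption added so that the example terminates. The decorator documented here loops indefinitely instead.

```python
import asyncio


def every_sketch(seconds, repeats):
    """Hypothetical helper: call the coroutine `repeats` times, sleeping in between."""
    def decorator(coro_func):
        async def runner():
            for _ in range(repeats):
                await coro_func()
                # Yield to the event loop between iterations.
                await asyncio.sleep(seconds)
        return runner
    return decorator


ticks = []


@every_sketch(0.01, repeats=3)
async def tick():
    # Record how many times the task has run so far.
    ticks.append(len(ticks))


asyncio.run(tick())
```

Note that the sleep is awaited. A sleep that is not awaited inside a coroutine would not pause the loop.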
on(event, ports=None)

Add a callback for an event. This is a decorator and should be applied to a coroutine. The coroutine should take two parameters, event and data, where event is a string holding the exact event name that triggered the callback, and data is the arbitrary data that was sent as part of the message. Example:

@device.on('other-device/hello')
async def callback(event, data):
    print(event, data)
Parameters:
  • event (str) – The event name to react to. You can use file globbing syntax to subscribe to multiple events: ‘/heartbeat’, ‘important/
  • ports (str, list, optional) – Specify which ports to listen on. Ports are identified by their name as a string. You can also provide a list of port names to listen on multiple ports. Defaults to None, which listens on all ports.
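To make the file-globbing idea concrete, here is a small sketch built on the standard fnmatch module. The patterns, event names, and the handlers_for helper are all made up for illustration, and whether robocluster matches events with fnmatch internally is an assumption.

```python
from fnmatch import fnmatchcase

# Hypothetical subscriptions: glob pattern -> label of the callback to run.
subscriptions = {
    'sensor/*': 'sensor handler',
    '*/status': 'status handler',
}


def handlers_for(event):
    """Return the labels of every subscription whose pattern matches the event."""
    return [label for pattern, label in subscriptions.items()
            if fnmatchcase(event, pattern)]
```

With these patterns, an event named 'sensor/status' would reach both handlers, while 'arm/move' would reach neither.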
publish(topic, data, port='multicast')

Publish to topic.

Parameters:
  • topic (str) – The topic, or event name to broadcast.
  • data – Any arbitrary data that can be encoded and sent over the network. For the default json encoding, dictionaries are a good way to package data.
  • port (str, list, optional) – Specify which ports to publish to. Ports are identified by their name as a string. You can also provide a list of port names to publish over multiple ports. Defaults to ‘multicast’.
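For the default JSON encoding, a payload has to survive a JSON round trip. The sketch below uses only the standard json module; the payload and its field names are invented for illustration, and the actual wire format used by robocluster is not shown here.

```python
import json

# Hypothetical payload: a plain dictionary of JSON-friendly values.
reading = {'temperature': 21.5, 'units': 'C', 'samples': [21.4, 21.6]}

# Encode to bytes as a sender might, then decode as a receiver might.
encoded = json.dumps(reading).encode('utf-8')
decoded = json.loads(encoded.decode('utf-8'))
```

Dictionaries of strings, numbers, lists, and nested dictionaries come back unchanged, which is why they are a convenient way to package data.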
reply(event, data)

Reply to a request event.

Parameters:
  • event (str) – The event that triggered the request.
  • data – Any arbitrary data that can be encoded and sent over the network. For the default json encoding, dictionaries are a good way to package data.
request(dest, topic)

Request data from another device.

Parameters:
  • dest (str) – The device name to request data from.
  • topic (str) – The topic or event name to request.
Returns:

The data that you requested.

run()

Run device in foreground.

send(dest, topic, data)

Directly send data to another device. If the device does not have an EgressTcpPort for <dest> yet, it will create one before sending.

Parameters:
  • dest (str) – The device name to send to.
  • topic (str) – The topic or event name to send.
  • data – Any arbitrary data that can be encoded and sent over the network. For the default json encoding, dictionaries are a good way to package data.
sleep(duration)

Sleep the device.

Parameters:
  • duration (str, int) – How long to sleep for. Takes the same format as duration_to_seconds().
start()

Start device.

stop()

Stop running device.

storage

Local device storage. Use it to store arbitrary data that can be accessed from multiple tasks or callbacks.

task(task)

Create a background task.

This is a decorator function and can be used as follows:

@device.task
async def setup_task():
    ...

This would register the setup_task coroutine to be run by the device when the device is started.

transport = 'json'
wait()

Wait until device exits.

robocluster.Net module

class robocluster.net.AsyncSocket(*args, socket=None, loop=None, **kwargs)

Bases: object

Socket wrapper for asyncio.

accept() → (socket object, address info)

Wait for an incoming connection. Return a new socket representing the connection, and the address of the client. For IP sockets, the address info is a pair (hostaddr, port).

connect(address)

Connect the socket to a remote address. For IP sockets, the address is a pair (host, port).

connect_ex(address) → errno

This is like connect(address), but returns an error code (the errno value) instead of raising an exception when an error occurs.

dup() → socket object

Duplicate the socket. Return a new socket object connected to the same system resource. The new socket is non-inheritable.

classmethod from_socket(socket, loop=None)

Create an AsyncSocket from a normal socket.

sendfile(file[, offset[, count]]) → sent

Send a file until EOF is reached by using high-performance os.sendfile() and return the total number of bytes which were sent. file must be a regular file object opened in binary mode. If os.sendfile() is not available (e.g. Windows) or file is not a regular file socket.send() will be used instead. offset tells from where to start reading the file. If specified, count is the total number of bytes to transmit as opposed to sending the file until EOF is reached. File position is updated on return or also in case of error in which case file.tell() can be used to figure out the number of bytes which were sent. The socket must be of SOCK_STREAM type. Non-blocking sockets are not supported.

robocluster.net.key_to_multicast(key, family='ipv4')

Convert a key to a local multicast group.
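One plausible way to derive a multicast group from a key is to hash the key into the administratively scoped IPv4 range. The sketch below is an assumption for illustration only: the function name key_to_group_sketch, the use of SHA-256, and the choice of the 239.0.0.0/8 block are not taken from robocluster.

```python
import hashlib
import ipaddress


def key_to_group_sketch(key):
    """Hypothetical mapping from a string key to an IPv4 multicast address."""
    digest = hashlib.sha256(key.encode('utf-8')).digest()
    # 239.0.0.0/8 is the administratively scoped (local) IPv4 multicast block;
    # the first three digest bytes fill in the remaining octets.
    packed = bytes([239]) + digest[:3]
    return str(ipaddress.IPv4Address(packed))


group = key_to_group_sketch('rover')
```

Because the mapping is deterministic, every device that uses the same key ends up with the same address, which matches the requirement that devices share a group in order to talk to each other.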

robocluster.Ports module

robocluster.Util module

Utility functions for robocluster.

robocluster.util.as_coroutine(func)

Convert a function to a coroutine that can be awaited.

Notes: If the function is already a coroutine, it is returned directly.
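The behaviour described above can be sketched as follows. The name as_coroutine_sketch is hypothetical and this is not the library's code; it shows one straightforward way to wrap a plain function so that it can be awaited, while passing coroutine functions through unchanged.

```python
import asyncio
import functools
import inspect


def as_coroutine_sketch(func):
    """Hypothetical helper: return an awaitable version of `func`."""
    if inspect.iscoroutinefunction(func):
        # Already a coroutine function: hand it back untouched.
        return func

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # Call the plain function synchronously inside the coroutine.
        return func(*args, **kwargs)

    return wrapper


def add(a, b):
    return a + b


async def async_add(a, b):
    return a + b


wrapped = as_coroutine_sketch(add)
result = asyncio.run(wrapped(1, 2))
```

A wrapper like this lets callers await any callback without first checking whether it was defined with def or async def.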

robocluster.util.debug(msg)

Print msg if DEBUG is True. Useful for debugging what the Robocluster internals are doing. To enable debug messages, include the following in your program that uses devices:

import robocluster.util; robocluster.util.DEBUG = True
robocluster.util.duration_to_seconds(duration)

Convert duration as a string to seconds if needed.

Parameters:
  • duration (str, float) – Time in seconds or as text.
Returns:

Seconds in duration, or -1 if duration is invalid.

Return type:

int

Supported units:
  • ‘m’, ‘minute’, ‘minutes’: 60 seconds
  • ‘s’, ‘second’, ‘seconds’: 1 second
  • ‘ms’, ‘millisecond’, ‘milliseconds’: 0.001 seconds
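
The unit table above could be applied roughly as in the sketch below. The function name duration_to_seconds_sketch is hypothetical, and the accepted text format, a number followed by a space and a unit such as '2 minutes', is an assumption; the real parser may accept other spellings.

```python
# Seconds per unit, taken from the list of supported units.
UNIT_SECONDS = {
    'm': 60, 'minute': 60, 'minutes': 60,
    's': 1, 'second': 1, 'seconds': 1,
    'ms': 0.001, 'millisecond': 0.001, 'milliseconds': 0.001,
}


def duration_to_seconds_sketch(duration):
    """Hypothetical parser: numbers pass through, text is '<number> <unit>'."""
    if isinstance(duration, (int, float)):
        return duration
    parts = str(duration).split()
    if len(parts) != 2 or parts[1] not in UNIT_SECONDS:
        return -1  # Unknown unit or unexpected layout.
    try:
        value = float(parts[0])
    except ValueError:
        return -1  # The leading token is not a number.
    return value * UNIT_SECONDS[parts[1]]
```

Returning -1 for invalid input mirrors the documented return value, so callers should check for a negative result before sleeping.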