robocluster package

robocluster.Device module

Message passing for robocluster.

class robocluster.device.AttributeDict

Bases: dict

A dictionary that allows you to acces entries like you would attributes in an object.

class robocluster.device.Context

Bases: threading.Thread

static instance()
run()
wait()
class robocluster.device.Device(name, group, network=None, context=None)

Bases: robocluster.looper.Looper

A device to interact with the robocluster network.

__init__(name, group, network=None, context=None)

Initialize the device.

Parameters:
  • name (str) – A name to identify the device by.
  • group (str) – Used to select the multicast address. In order for devices to talk to each other, they must be in the same group.
  • network (str) – IPv4 network to broadcast on (default 0.0.0.0/0)
  • loop (asyncio.AbstractEventLoop, optional) – Event loop to use. Defaults to the current event loop.
every(duration)

Create a background task that runs every duration. Equivilent to:

@device.task
async def loop():
    while True:
        # do things
        ...
        device.sleep(duration)
Parameters:duration (str, int) – How long to sleep in between loops. Takes the same form as duration_to_seconds().
name

Name of the device.

on(event, callback=None)

Add a callback for an event.

This function can be used as a decorator or as a standard function. Example:

@device.on('other-device/hello')
async def callback(event, data):
    print(event, data)
Parameters:
  • event (str) – The event name to react to. You can use file globbing syntax to subscribe to multiple events: ‘/heartbeat’, ‘important/
  • port (str, list, optional) – Specify which ports to listen over. Ports are identified by their name as a string. You can also provide a list of port names to listen to multiple ports. Defaults to None, which listens to all ports.
on_request(endpoint, callback=None)

Add a callback for a request.

publish(topic, data)

Publish to topic.

Parameters:
  • topic (str) – The topic, or event name to broadcast.
  • data – Any arbitrary data that can be encoded and sent over the network. For the default json encoding, dictionaries are a good way to package data.
  • port (str, list, optional) – Specify which ports to publish to. Ports are identified by their name as a string. You can also provide a list of port names to publish over multiple ports. Defaults to ‘multicast’.
request(dest, endpoint, *args, **kwargs)

Request data from another device.

Parameters:
  • dest (str) – The device name to request data from.
  • endpoint (str) – The endpoint to request.
  • *args – Arguments passed to the endpoint.
  • **kwargs – Keyword Arguments passed to the endpoint.
Returns:

The return value of the endpoint that you requested.

send(dest, endpoint, data)

Directly send data to another device.

Parameters:
  • dest (str) – The device name to send to.
  • endpoint (str) – The endpoint to send to on destination device
  • data – Any arbitrary data that can be handled by the transport.
start()

Start device.

stop()

Stop device.

storage

Local device storage.

Use this to store arbitrary data that can be accessed from multiple tasks or callbacks.

task(task)

Create a background task.

This is a decorator function and can be used as follows:

@device.task
async def setup_task():
    ...

This would register the setup_task coroutine to be ran by the device when the device is started.

wait()

Wait for device context to exit.

robocluster.device.group_to_port(group)

robocluster.Member module

exception robocluster.member.Error

Bases: Exception

class robocluster.member.Member(name, network, port, key=None, loop=None)

Bases: robocluster.looper.Looper

is_wanted(name)
on_recv(endpoint, callback)
on_request(endpoint, callback)
publish(endpoint, data)
request(peer, endpoint, *args, **kwargs)
send(peer, endpoint, data)
start()
stop()
subscribe(peer, endpoint, callback)
try_peer(peer)
exception robocluster.member.UnknownPeer

Bases: robocluster.member.Error

robocluster.Looper module

class robocluster.looper.Looper(loop=None)

Bases: object

A wrapper for an event loop that allows for a group of daemon tasks.

The daemon tasks can all be started and stopped.

__init__(loop=None)

Initialize the looper.

When loop is omitted, the current event loop is used.

create_daemon(coro, *args, **kwargs)

Add a daemon task and starts it if the looper is started.

Arguments and keyword arguments past coro are passed down to coro when it is started.

create_task(coro, *args, **kwargs)

Create a task in the event loop.

loop
sleep(seconds)

Suspend execution for seconds.

This method is a coroutine.

start()

Start daemon tasks.

stop()

Stop daemon tasks.

robocluster.Net module

class robocluster.net.AsyncSocket(*args, socket=None, loop=None, **kwargs)

Bases: object

Socket wrapper for asyncio.

accept() -> (socket object, address info)

Wait for an incoming connection. Return a new socket representing the connection, and the address of the client. For IP sockets, the address info is a pair (hostaddr, port).

connect(address)

Connect the socket to a remote address. For IP sockets, the address is a pair (host, port).

connect_ex(address) → errno

This is like connect(address), but returns an error code (the errno value) instead of raising an exception when an error occurs.

dup() → socket object

Duplicate the socket. Return a new socket object connected to the same system resource. The new socket is non-inheritable.

classmethod from_socket(socket, loop=None)

Create an AsyncSocket from a normal socket.

sendfile(file[, offset[, count]]) → sent

Send a file until EOF is reached by using high-performance os.sendfile() and return the total number of bytes which were sent. file must be a regular file object opened in binary mode. If os.sendfile() is not available (e.g. Windows) or file is not a regular file socket.send() will be used instead. offset tells from where to start reading the file. If specified, count is the total number of bytes to transmit as opposed to sending the file until EOF is reached. File position is updated on return or also in case of error in which case file.tell() can be used to figure out the number of bytes which were sent. The socket must be of SOCK_STREAM type. Non-blocking sockets are not supported.

robocluster.Util module

Utility functions for robocluster.

robocluster.util.as_coroutine(func)

Convert a function to a coroutine that can be awaited.

Notes: If the function is already a coroutine, it is returned directly.

robocluster.util.debug(msg)

Print msg if DEBUG is True. Usefull for debugging what the Robocluster internals are doing. To enable debug messages include the following in your program that uses devices:

import robocluster.util; robocluster.util.DEBUG = True
robocluster.util.duration_to_seconds(duration)

Convert duration as a string to seconds if needed.

Parameters:duration (str, float) – time in seconds or as text.
Returns:seconds in duration, or -1 if duration is invalid.
Return type:int
Supported units:
  • ‘m’, ‘minute’, ‘minutes’: 60 seconds
  • ‘s’, ‘second’, ‘seconds’: 1 seconds
  • ‘ms’, ‘millisecond’, ‘milliseconds’: 0.001 seconds
robocluster.util.ip_info(addr)

Verify and detecet ip address family.