robocluster package¶
robocluster.Device module¶
Message passing for robocluster.
-
class
robocluster.device.AttributeDict¶ Bases:
dictA dictionary that allows you to acces entries like you would attributes in an object.
-
class
robocluster.device.Device(name, group, network=None, context=None)¶ Bases:
robocluster.looper.LooperA device to interact with the robocluster network.
-
__init__(name, group, network=None, context=None)¶ Initialize the device.
Parameters: - name (str) – A name to identify the device by.
- group (str) – Used to select the multicast address. In order for devices to talk to each other, they must be in the same group.
- network (str) – IPv4 network to broadcast on (default 0.0.0.0/0)
- loop (asyncio.AbstractEventLoop, optional) – Event loop to use. Defaults to the current event loop.
-
every(duration)¶ Create a background task that runs every duration. Equivilent to:
@device.task async def loop(): while True: # do things ... device.sleep(duration)
Parameters: duration (str, int) – How long to sleep in between loops. Takes the same form as duration_to_seconds().
-
name¶ Name of the device.
-
on(event, callback=None)¶ Add a callback for an event.
This function can be used as a decorator or as a standard function. Example:
@device.on('other-device/hello') async def callback(event, data): print(event, data)
Parameters: - event (str) – The event name to react to. You can use file globbing syntax to subscribe to multiple events: ‘/heartbeat’, ‘important/’
- port (str, list, optional) – Specify which ports to listen over. Ports are identified by their name as a string. You can also provide a list of port names to listen to multiple ports. Defaults to None, which listens to all ports.
-
on_request(endpoint, callback=None)¶ Add a callback for a request.
-
publish(topic, data)¶ Publish to topic.
Parameters: - topic (str) – The topic, or event name to broadcast.
- data – Any arbitrary data that can be encoded and sent over the network. For the default json encoding, dictionaries are a good way to package data.
- port (str, list, optional) – Specify which ports to publish to. Ports are identified by their name as a string. You can also provide a list of port names to publish over multiple ports. Defaults to ‘multicast’.
-
request(dest, endpoint, *args, **kwargs)¶ Request data from another device.
Parameters: - dest (str) – The device name to request data from.
- endpoint (str) – The endpoint to request.
- *args – Arguments passed to the endpoint.
- **kwargs – Keyword Arguments passed to the endpoint.
Returns: The return value of the endpoint that you requested.
-
send(dest, endpoint, data)¶ Directly send data to another device.
Parameters: - dest (str) – The device name to send to.
- endpoint (str) – The endpoint to send to on destination device
- data – Any arbitrary data that can be handled by the transport.
-
start()¶ Start device.
-
stop()¶ Stop device.
-
storage¶ Local device storage.
Use this to store arbitrary data that can be accessed from multiple tasks or callbacks.
-
task(task)¶ Create a background task.
This is a decorator function and can be used as follows:
@device.task async def setup_task(): ...
This would register the setup_task coroutine to be ran by the device when the device is started.
-
wait()¶ Wait for device context to exit.
-
-
robocluster.device.group_to_port(group)¶
robocluster.Member module¶
-
exception
robocluster.member.Error¶ Bases:
Exception
-
class
robocluster.member.Member(name, network, port, key=None, loop=None)¶ Bases:
robocluster.looper.Looper-
is_wanted(name)¶
-
on_recv(endpoint, callback)¶
-
on_request(endpoint, callback)¶
-
publish(endpoint, data)¶
-
request(peer, endpoint, *args, **kwargs)¶
-
send(peer, endpoint, data)¶
-
start()¶
-
stop()¶
-
subscribe(peer, endpoint, callback)¶
-
try_peer(peer)¶
-
-
exception
robocluster.member.UnknownPeer¶ Bases:
robocluster.member.Error
robocluster.Looper module¶
-
class
robocluster.looper.Looper(loop=None)¶ Bases:
objectA wrapper for an event loop that allows for a group of daemon tasks.
The daemon tasks can all be started and stopped.
-
__init__(loop=None)¶ Initialize the looper.
When loop is omitted, the current event loop is used.
-
create_daemon(coro, *args, **kwargs)¶ Add a daemon task and starts it if the looper is started.
Arguments and keyword arguments past coro are passed down to coro when it is started.
-
create_task(coro, *args, **kwargs)¶ Create a task in the event loop.
-
loop¶
-
sleep(seconds)¶ Suspend execution for seconds.
This method is a coroutine.
-
start()¶ Start daemon tasks.
-
stop()¶ Stop daemon tasks.
-
robocluster.Net module¶
-
class
robocluster.net.AsyncSocket(*args, socket=None, loop=None, **kwargs)¶ Bases:
objectSocket wrapper for asyncio.
-
accept() -> (socket object, address info)¶ Wait for an incoming connection. Return a new socket representing the connection, and the address of the client. For IP sockets, the address info is a pair (hostaddr, port).
-
connect(address)¶ Connect the socket to a remote address. For IP sockets, the address is a pair (host, port).
-
connect_ex(address) → errno¶ This is like connect(address), but returns an error code (the errno value) instead of raising an exception when an error occurs.
-
dup() → socket object¶ Duplicate the socket. Return a new socket object connected to the same system resource. The new socket is non-inheritable.
-
classmethod
from_socket(socket, loop=None)¶ Create an AsyncSocket from a normal socket.
-
sendfile(file[, offset[, count]]) → sent¶ Send a file until EOF is reached by using high-performance os.sendfile() and return the total number of bytes which were sent. file must be a regular file object opened in binary mode. If os.sendfile() is not available (e.g. Windows) or file is not a regular file socket.send() will be used instead. offset tells from where to start reading the file. If specified, count is the total number of bytes to transmit as opposed to sending the file until EOF is reached. File position is updated on return or also in case of error in which case file.tell() can be used to figure out the number of bytes which were sent. The socket must be of SOCK_STREAM type. Non-blocking sockets are not supported.
-
robocluster.Util module¶
Utility functions for robocluster.
-
robocluster.util.as_coroutine(func)¶ Convert a function to a coroutine that can be awaited.
Notes: If the function is already a coroutine, it is returned directly.
-
robocluster.util.debug(msg)¶ Print msg if DEBUG is True. Usefull for debugging what the Robocluster internals are doing. To enable debug messages include the following in your program that uses devices:
import robocluster.util; robocluster.util.DEBUG = True
-
robocluster.util.duration_to_seconds(duration)¶ Convert duration as a string to seconds if needed.
Parameters: duration (str, float) – time in seconds or as text. Returns: seconds in duration, or -1 if duration is invalid. Return type: int - Supported units:
- ‘m’, ‘minute’, ‘minutes’: 60 seconds
- ‘s’, ‘second’, ‘seconds’: 1 seconds
- ‘ms’, ‘millisecond’, ‘milliseconds’: 0.001 seconds
-
robocluster.util.ip_info(addr)¶ Verify and detecet ip address family.