Multiprocessing functionality¶
This module contains multiprocessing implementation for Environment,
MultiEnvironment.
A MultiEnvironment holds several Environment slaves, which are
spawned on their own processes, and uses managers to obtain much of the same functionality as the single processor
environment. See EnvManager and MultiEnvManager for details.
Warning
This functionality is currently largely untested. However, it seems to work as intended and may be used in
Simulation.
- 
class creamas.mp.EnvManager(environment)[source]¶
- A manager for - Environment, which is a subclass of- aiomas.subproc.Manager.- Managers are used in environments which need to be able to execute commands originating from outside sources, e.g. in slave environments inside a multiprocessing environment. - A manager can spawn other agents into its environment, and can execute other tasks relevant to the environment. The manager should always be the first agent created to the environment. - Note - You should not need to create managers directly, instead pass the desired manager class to an instance of - MultiEnvironmentat its initialization time.- 
create_connections(connection_map)[source]¶
- Create connections for agents in the environment. - This is a managing function for - create_connections().
 - 
get_agents(addr=True, agent_cls=None, as_coro=False)[source]¶
- Get agents from the managed environment. - This is a managing function for the - get_agents(). The returned agent list excludes the environment’s manager agent (i.e. this agent) by design.
 - 
async get_artifacts()[source]¶
- Get all artifacts from the host environment. - Returns
- All the artifacts in the environment. 
 
 - 
get_connections(data=False)[source]¶
- Get connections from the agents in the environment. - This is a managing function for - get_connections().
 - 
async is_ready()[source]¶
- Check if the managed environment is ready. - This is a managing function for - is_ready().
 - 
set_host_manager(addr)[source]¶
- Set host (or master) manager for this manager. - Parameters
- addr – Address for the host manager. 
 
 - 
async spawn_n(agent_cls, n, *args, **kwargs)[source]¶
- Spawn - nagents to the managed environment.- This is a convenience function so that one does not have to repeatedly make connections to the environment to spawn multiple agents with the same parameters. - See - aiomas.subproc.Manager.spawn()for details.
 - 
async trigger_all(*args, **kwargs)[source]¶
- Trigger all agents in the managed environment to act once. - This is a managing function for - trigger_all().
 
- 
- 
class creamas.mp.MultiEnvManager(environment)[source]¶
- A manager for - MultiEnvironment, which is a subclass of- aiomas.subproc.Manager.- A Manager can spawn other agents into its slave environments, and can execute other tasks relevant to the whole environment. The manager should always be the first (and usually only) agent created for the multi-environment’s managing environment. The actual simulation agents should be created to the slave environments, typically using multi-environment’s or its manager’s functionality. - Note - You should not need to create managers directly, instead pass the desired manager class to an instance of - MultiEnvironmentat its initialization time.- 
async create_connections(connection_map)[source]¶
- Create connections for agents in the multi-environment. - This is a managing function for - create_connections().
 - 
async get_agents(addr=True, agent_cls=None)[source]¶
- Get addresses of all agents in all the slave environments. - This is a managing function for - creamas.mp.MultiEnvironment.get_agents().- Note - Since - aiomas.rpc.Proxyobjects do not seem to handle (re)serialization,- addrand- agent_clsparameters are omitted from the call to underlying multi-environment’s- get_agents().- If - aiomas.rpc.Proxyobjects from all the agents are needed, call each slave environment manager’s- get_agents()directly.
 - 
async get_connections(data=True)[source]¶
- Return connections for all the agents in the slave environments. - This is a managing function for - get_connections().
 - 
async get_slave_managers()[source]¶
- Get addresses of the slave environment managers in this multi-environment. 
 - 
handle(msg)[source]¶
- Handle message from a slave manager. - This is a dummy method which should be overridden in a subclass. 
 - 
async is_ready()[source]¶
- A managing function for - is_ready().
 - 
async set_as_host_manager(addr, timeout=5)[source]¶
- Set the this manager as a host manager for the manager in addr. - This is a managing function for - set_host_manager().
 - 
async spawn(agent_cls, *args, addr=None, **kwargs)[source]¶
- Spawn an agent to the environment. - This is a managing function for - spawn().- Note - Since - aiomas.rpc.Proxyobjects do not seem to handle (re)serialization, only the address of the spawned agent is returned.
 - 
async spawn_n(agent_cls, n, *args, addr=None, **kwargs)[source]¶
- Same as - spawn(), but spawn- nagents with same initialization parameters.- This is a managing function for - spawn_n().- Note - Since - aiomas.rpc.Proxyobjects do not seem to handle (re)serialization, only the addresses of the spawned agents are returned.
 - 
async trigger_all(*args, **kwargs)[source]¶
- Trigger all agents in the managed multi-environment to act. - This is a managing function for - trigger_all().
 
- 
async 
- 
class creamas.mp.MultiEnvironment(addr, env_cls, mgr_cls=None, name=None, logger=None, **env_kwargs)[source]¶
- Environment for utilizing multiple processes (and cores) on a single machine. - MultiEnvironmentis not a subclass of- creamas.core.environment.Environment.- MultiEnvironmenthas a master environment, typically containing only a single manager, and a set of slave environments each having their own manager and (once spawned) the actual agents.- Order of usage is: - import aiomas from creamas Environment from creamas.mp import EnvManager, MultiEnvironment from creamas.util import run # Create the multi-environment and the environment used to connect to # slave environments addr = ('localhost', 5555) env_cls = Environment env_kwargs = {'codec': aiomas.MsgPack} menv = MultiEnvironment(addr, env_cls, **env_kwargs) # Define slave environments and their arguments slave_addrs = [('localhost', 5556), ('localhost', 5557)] slave_env_cls = Environment slave_mgr_cls = EnvManager n_slaves = 2 slave_kwargs = [{'codec': aiomas.MsgPack} for _ in range(n_slaves)] # Spawn the actual slave environments menv.spawn_slaves(slave_addrs, slave_env_cls, slave_mgr_cls, slave_kwargs) # Wait that all the slaves are ready, if you need to do some other # preparation before environments' return True for is_ready, then # change check_ready=False run(menv.wait_slaves(10, check_ready=True)) # Do any additional preparations here, like spawning the agents to # slave environments. # Trigger all agents to act run(menv.trigger_all()) # Destroy the environment to free the resources menv.destroy(as_coro=False) - Parameters
- addr – - (HOST, PORT)address for the master environment.
- env_cls – Class for the master environment, used to make connections to the slave environments. Must be a subclass of - Environment.
- mgr_cls – Class for the multi-environment’s manager. 
- name (str) – Name of the environment. Will be shown in logs. 
- logger – Optional. Logger for the master environment. 
 
 - 
add_artifact(artifact)[source]¶
- Add an artifact to the environment. - Parameters
- artifact (object) – Artifact to be added. 
 
 - 
add_artifacts(artifacts)[source]¶
- Add artifacts to - artifacts.- Parameters
- artifacts – list of - Artifactobjects
 
 - 
check_ready()[source]¶
- Check if this multi-environment itself is ready. - Override in subclass if it needs any additional (asynchronous) initialization other than spawning its slave environments. - Return type
- Returns
- This basic implementation returns always True. 
 
 - 
close(folder=None, as_coro=False)[source]¶
- Close the multiprocessing environment and its slave environments. 
 - 
create_connections(connection_map, as_coro=False)[source]¶
- Create agent connections from the given connection map. - Parameters
- connection_map (dict) – A map of connections to be created. Dictionary where keys are agent addresses and values are lists of (addr, data)-tuples suitable for - add_connections().
- as_coro (bool) – If - Truereturns a coroutine, otherwise runs the asynchronous calls to the slave environment managers in the event loop.
 
 - Only the connections for the agents that are in the slave environments are created. 
 - 
destroy(folder=None, as_coro=False)[source]¶
- Close the multiprocessing environment and its slave environments. - Deprecated since version 0.4.0: Use - close()instead
 - 
get_agents(addr=True, agent_cls=None, as_coro=False)[source]¶
- Get agents from the slave environments. - Parameters
- Returns
- A coroutine or list of - Proxyobjects or addresses as specified by the input parameters.
 - Slave environment managers are excluded from the returned list by default. Essentially, this method calls each slave environment manager’s - creamas.mp.EnvManager.get_agents()asynchronously.- Note - Calling each slave environment’s manager might be costly in some situations. Therefore, it is advisable to store the returned agent list if the agent sets in the slave environments are not bound to change. 
 - 
get_artifacts(agent_name=None)[source]¶
- Get all artifacts or all artifacts published by a specific agent. 
 - 
get_connections(data=True, as_coro=False)[source]¶
- Return connections from all the agents in the slave environments. 
 - 
async is_ready()[source]¶
- Check if the multi-environment has been fully initialized. - This calls each slave environment managers’ - is_ready()and checks if the multi-environment itself is ready by calling- check_ready().
 - 
save_info(folder, *args, **kwargs)[source]¶
- Save information accumulated during the environment’s lifetime. - Called from - destroy(). Override in subclass.- Parameters
- folder (str) – root folder to save information 
 
 - 
async set_host_manager(addr, timeout=5)[source]¶
- Set this multi-environment’s manager as the host manager for a manager agent in addr 
 - 
async set_host_managers(timeout=5)[source]¶
- Set the master environment’s manager as host manager for the slave environment managers. - Parameters
- timeout (int) – Timeout for connecting to the slave managers. 
 - This enables the slave environment managers to communicate back to the master environment. The master environment manager, - manager, must be an instance of- MultiEnvManageror its subclass if this method is called.
 - 
async spawn(agent_cls, *args, addr=None, **kwargs)[source]¶
- Spawn a new agent in a slave environment. - Parameters
- agent_cls (str) – qualname` of the agent class. That is, the name should be in the form - pkg.mod:cls, e.g.- creamas.core.agent:CreativeAgent.
- addr (str) – Optional. Address for the slave enviroment’s manager. If - addris None, spawns the agent in the slave environment with currently smallest number of agents.
 
- Returns
- aiomas.rpc.Proxyand address for the created agent.
 - The - *argsand- **kwargsare passed down to the agent’s- __init__().- Note - Use - spawn_n()to spawn large number of agents with identical initialization parameters.
 - 
async spawn_n(agent_cls, n, *args, addr=None, **kwargs)[source]¶
- Same as - spawn(), but allows spawning multiple agents with the same initialization parameters simultaneously into one slave environment.- Parameters
- agent_cls (str) – - qualnameof the agent class. That is, the name should be in the form of- pkg.mod:cls, e.g.- creamas.core.agent:CreativeAgent.
- n (int) – Number of agents to spawn 
- addr (str) – Optional. Address for the slave enviroment’s manager. If - addris None, spawns the agents in the slave environment with currently smallest number of agents.
 
- Returns
- A list of ( - aiomas.rpc.Proxy, address)-tuples for the spawned agents.
 - The - *argsand- **kwargsare passed down to each agent’s- __init__().
 - 
async spawn_slaves(slave_addrs, slave_env_cls, slave_mgr_cls, slave_kwargs=None)[source]¶
- Spawn slave environments. - Parameters
- slave_addrs – List of (HOST, PORT) addresses for the slave-environments. 
- slave_env_cls – Class for the slave environments. 
- slave_kwargs – If not None, must be a list of the same size as addrs. Each item in the list containing parameter values for one slave environment. 
- slave_mgr_cls – Class of the slave environment managers. 
 
 
 - 
async stop_slaves(timeout=1)[source]¶
- Stop all the slaves by sending a stop-message to their managers. - Parameters
- timeout (int) – Timeout for connecting to each manager. If a connection can not be made before the timeout expires, the resulting error for that particular manager is logged, but the stopping of other managers is not halted. 
 
 - 
async trigger_act(addr)[source]¶
- Trigger agent in - addrto act.- This method is quite inefficient if used repeatedly for a large number of agents. 
 - 
async trigger_all(*args, **kwargs)[source]¶
- Trigger all agents in all the slave environments to - act()asynchronously.- Given arguments and keyword arguments are passed down to each agent’s - creamas.core.agent.CreativeAgent.act().- Note - By design, the manager agents in each slave environment, i.e. - manager, are excluded from acting.
 - 
async wait_slaves(timeout, check_ready=False)[source]¶
- Wait until all slaves are online (their managers accept connections) or timeout expires. - Parameters
- timeout (int) – Timeout (in seconds) after which the method will return even though all the slaves are not online yet. 
- check_ready (bool) – If - Truealso checks if all slave environment’s are ready. A slave environment is assumed to be ready when its manager’s- is_ready()-method returns- True.
 
 
 - 
property addrs¶
- Addresses of the slave environment managers. 
 - 
property artifacts¶
- Published artifacts for all agents. 
 - 
property env¶
- The environment hosting the manager of this multi-environment. - This environment is also used without the manager to connect to the slave environment managers and communicate with them. 
 - 
property logger¶
- Logger for the environment. - Logger should have at least - log()method which takes two arguments: a log level and a message.
 - 
property manager¶
- This multi-environment’s master manager. 
 - 
property name¶
- The name of the environment. 
 
- 
creamas.mp.set_base_timeout(timeout)[source]¶
- Set base timeout (in seconds) for the rpc calls originating from the instances in this module. 
- 
creamas.mp.spawn_container(addr, env_cls=<class 'creamas.core.environment.Environment'>, mgr_cls=<class 'creamas.mp.EnvManager'>, set_seed=True, *args, **kwargs)[source]¶
- Spawn a new environment in a given address as a coroutine. - Arguments and keyword arguments are passed down to the created environment at initialization time. - If setproctitle is installed, this function renames the title of the process to start with ‘creamas’ so that the process is easily identifiable, e.g. with - ps -x | grep creamas.
- 
creamas.mp.spawn_containers(addrs, env_cls=<class 'creamas.core.environment.Environment'>, env_params=None, mgr_cls=<class 'creamas.mp.EnvManager'>, *args, **kwargs)[source]¶
- Spawn environments in a multiprocessing - multiprocessing.Pool.- Arguments and keyword arguments are passed down to the created environments at initialization time if env_params is None. If env_params is not None, then it is assumed to contain individual initialization parameters for each environment in addrs. - Parameters
- addrs – List of (HOST, PORT) addresses for the environments. 
- env_cls – Callable for the environments. Must be a subclass of - Environment.
- env_params (Iterable of same length as addrs or None.) – Initialization parameters for the environments. 
- mgr_cls – Callable for the managers. Must be a subclass of - EnvManager.s
 
- Returns
- The created process pool and the ApplyAsync results for the spawned environments. 
 
- 
async creamas.mp.start(addr, env_cls, mgr_cls, *env_args, **env_kwargs)[source]¶
- Coroutine that starts an environment with - mgr_clsmanager agent.- The agent will connect to addr - ('host', port)and wait for commands to spawn new agents within its environment.- The env_args and env_kwargs will be passed to - env_cls.create()factory function.- This coroutine finishes after manager’s - stop()was called or when a- KeyboardInterruptis raised and calls- env_cls.destroy()before it finishes.- Parameters
- addr – (HOST, PORT) for the new environment 
- env_cls – Class of the environment, subclass of - Environment.
- mgr_cls – Class of the manager agent, subclass of - EnvManager.