Reference

Permissions shortcuts

resolwe.permissions.shortcuts._group_groups(perm_list)[source]

Group permissions by group.

Input is a list of tuples of length 3, where each tuple has the following format:

(<group_id>, <group_name>, <single_permission>)

Permissions are regrouped and returned so that there is only one tuple for each group:

(<group_id>, <group_name>, [<first_permission>, <second_permission>,...])
Parameters:perm_list (list) – list of tuples of length 3
Returns:list of tuples with grouped permissions
Return type:list
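
The regrouping described above can be sketched in a few lines. This is only an illustration of the input and output shapes, not the library's implementation; the name group_groups_sketch and the sample data are hypothetical, and preserving first-seen group order is an assumption.

```python
from collections import OrderedDict


def group_groups_sketch(perm_list):
    """Regroup (group_id, group_name, permission) tuples by group."""
    grouped = OrderedDict()
    for group_id, group_name, permission in perm_list:
        # One entry per (id, name) pair; permissions accumulate in a list.
        grouped.setdefault((group_id, group_name), []).append(permission)
    return [(gid, name, perms) for (gid, name), perms in grouped.items()]


# Hypothetical sample input: two groups, one of them listed twice.
example = [
    (1, "editors", "view"),
    (2, "admins", "view"),
    (1, "editors", "edit"),
]
result = group_groups_sketch(example)
```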
resolwe.permissions.shortcuts.get_object_perms(obj, user=None)[source]

Return permissions for the given object in a Resolwe-specific format.

The function returns permissions for the given object obj in the following format:

{
    "type": "group"/"user"/"public",
    "id": <group_or_user_id>,
    "name": <group_or_user_name>,
    "permissions": [<first_permission>, <second_permission>,...]
}

For the public type, the id and name keys are omitted.

If the user parameter is given, permissions are limited to that user, the groups the user belongs to, and public permissions.

Parameters:
  • obj (a subclass of BaseModel) – instance of a Resolwe DB model
  • user (User or None) – Django user
Returns:

list of permission objects in the described format

Return type:

list
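
As a rough illustration of consuming a list in the format described above, the sketch below indexes entries by type and name. The sample list is invented for the example and the helper summarize_perms is hypothetical; it only relies on the documented keys, including the fact that public entries carry no id or name.

```python
def summarize_perms(perms):
    """Map (type, name) to a sorted list of permissions."""
    summary = {}
    for entry in perms:
        # "public" entries omit "id" and "name", so fall back to a label.
        label = entry.get("name", "public")
        summary[(entry["type"], label)] = sorted(entry["permissions"])
    return summary


# Hypothetical data shaped like the documented return value.
sample = [
    {"type": "user", "id": 7, "name": "alice", "permissions": ["view", "edit"]},
    {"type": "group", "id": 3, "name": "lab", "permissions": ["view"]},
    {"type": "public", "permissions": ["view"]},
]
summary = summarize_perms(sample)
```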

Permissions utils

resolwe.permissions.utils.copy_permissions(src_obj, dest_obj)[source]

Copy permissions from src_obj to dest_obj.

Flow Managers

Workflow workload managers.

resolwe.flow.managers.manager

The global manager instance.

Type:Manager

Dispatcher

class resolwe.flow.managers.dispatcher.Manager(*args, **kwargs)[source]

The manager handles process job dispatching.

Each Data object that’s still waiting to be resolved is dispatched to a concrete workload management system (such as Celery or SLURM). The specific manager for that system (descended from BaseConnector) then handles actual job setup and submission. The job itself is an executor invocation; the executor then in turn sets up a safe and well-defined environment within the workload manager’s task in which the process is finally run.

communicate(data_id=None, run_sync=False, save_settings=True)[source]

Scan database for resolving Data objects and process them.

This is submitted as a task to the manager’s channel workers.

Parameters:
  • data_id – Optional id of the Data object which (along with its children) should be processed. If it is not given, all resolving objects are processed.
  • run_sync – If True, wait until all processes spawned from this point on have finished processing. If no processes are spawned, this results in a deadlock, since counts are handled on process finish.
  • save_settings – If True, save the current Django settings context to the global state. This should never be True for “automatic” calls, such as from Django signals, which can be invoked from inappropriate contexts (such as in the listener). For user code, it should be left at the default value. The saved settings are in effect until the next such call.
discover_engines(executor=None)[source]

Discover configured engines.

Parameters:executor – Optional executor module override
execution_barrier()[source]

Wait for executors to finish.

At least one must finish after this point to avoid a deadlock.

get_execution_engine(name)[source]

Return an execution engine instance.

get_executor()[source]

Return an executor instance.

get_expression_engine(name)[source]

Return an expression engine instance.

handle_control_event(message)[source]

Handle an event from the Channels layer.

Channels layer callback, do not call directly.

load_execution_engines(engines)[source]

Load execution engines.

load_executor(executor_name)[source]

Load process executor.

load_expression_engines(engines)[source]

Load expression engines.

override_settings(**kwargs)[source]

Override global settings within the calling context.

Parameters:kwargs – The settings overrides. Same use as for django.test.override_settings().
reset(keep_state=False)[source]

Reset the shared state and drain Django Channels.

Parameters:keep_state – If True, do not reset the shared manager state (useful in tests, where the settings overrides need to be kept). Defaults to False.
run(data, runtime_dir, argv)[source]

Select a concrete connector and run the process through it.

Parameters:
  • data – The Data object that is to be run.
  • runtime_dir – The directory the executor is run from.
  • argv – The argument vector used to spawn the executor.
class resolwe.flow.managers.dispatcher.SettingsJSONifier(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

Customized JSON encoder, coercing all unknown types into strings.

Needed due to the class hierarchy coming out of the database, which can’t be serialized using the vanilla json encoder.

default(o)[source]

Try default; otherwise, coerce the object into a string.
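
The "try default, otherwise coerce to string" behaviour can be sketched with the standard json module. The class name StringifyingEncoder is hypothetical and the example stands in for, rather than reproduces, SettingsJSONifier.

```python
import json
from pathlib import PurePosixPath


class StringifyingEncoder(json.JSONEncoder):
    """Encoder that falls back to str() for types json cannot serialize."""

    def default(self, o):
        try:
            # The base implementation raises TypeError for unknown types.
            return super().default(o)
        except TypeError:
            return str(o)


encoded = json.dumps(
    {"path": PurePosixPath("/tmp/data"), "n": 1},
    cls=StringifyingEncoder,
    sort_keys=True,
)
```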

resolwe.flow.managers.dispatcher.dependency_status(data)[source]

Return abstracted status of dependencies.

  • STATUS_ERROR .. one dependency has error status or was deleted
  • STATUS_DONE .. all dependencies have done status
  • None .. other
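
The three outcomes listed above can be sketched over plain status codes. This is a simplified model, not the dispatcher's code: parent statuses are passed as a list, a deleted dependency is represented by None, and treating an empty list as done is an assumption.

```python
STATUS_DONE = "OK"
STATUS_ERROR = "ER"


def dependency_status_sketch(parent_statuses):
    """Return STATUS_ERROR, STATUS_DONE or None for a list of parent statuses.

    A deleted parent is modelled as None (an assumption of this sketch).
    """
    if any(s is None or s == STATUS_ERROR for s in parent_statuses):
        return STATUS_ERROR
    if all(s == STATUS_DONE for s in parent_statuses):
        return STATUS_DONE
    return None
```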

Workload Connectors

The workload management system connectors are used as glue between the Resolwe Manager and various concrete workload management systems that might be used by it. Since the only functional requirement is job submission, they can be simple and nearly contextless.

Base Class

class resolwe.flow.managers.workload_connectors.base.BaseConnector[source]

The abstract base class for workload manager connectors.

The main Manager instance in manager uses connectors to handle communication with concrete backend workload management systems, such as Celery and SLURM. The connectors need not worry about how jobs are discovered or how they’re prepared for execution; this is all done by the manager.

submit(data, runtime_dir, argv)[source]

Submit the job to the workload management system.

Parameters:
  • data – The Data object that is to be run.
  • runtime_dir – The directory the executor is run from.
  • argv – The argument vector used to spawn the executor.

Local Connector

class resolwe.flow.managers.workload_connectors.local.Connector[source]

Local connector for job execution.

submit(data, runtime_dir, argv)[source]

Run process locally.

For details, see BaseConnector.submit().

Celery Connector

class resolwe.flow.managers.workload_connectors.celery.Connector[source]

Celery-based connector for job execution.

submit(data, runtime_dir, argv)[source]

Run process.

For details, see BaseConnector.submit().

Slurm Connector

class resolwe.flow.managers.workload_connectors.slurm.Connector[source]

Slurm-based connector for job execution.

submit(data, runtime_dir, argv)[source]

Run process with SLURM.

For details, see BaseConnector.submit().

Listener

Standalone Redis client used as a contact point for executors.

class resolwe.flow.managers.listener.ExecutorListener(*args, **kwargs)[source]

The contact point implementation for executors.

check_critical_load()[source]

Check for critical load and log an error if necessary.

clear_queue()[source]

Reset the executor queue channel to an empty state.

handle_abort(obj)[source]

Handle an incoming Data abort processing request.

Important

This only makes the manager’s state consistent and doesn’t affect the Data object in any way. Any changes to the Data object must be applied through the handle_update method.

Parameters:obj

The Channels message object. Command object format:

{
    'command': 'abort',
    'data_id': [id of the :class:`~resolwe.flow.models.Data` object
               this command was triggered by],
}
handle_finish(obj)[source]

Handle an incoming Data finished processing request.

Parameters:obj

The Channels message object. Command object format:

{
    'command': 'finish',
    'data_id': [id of the :class:`~resolwe.flow.models.Data` object
               this command changes],
    'process_rc': [exit status of the processing],
    'spawn_processes': [optional; list of spawn dictionaries],
    'exported_files_mapper': [if spawn_processes present]
}
handle_log(obj)[source]

Handle an incoming log processing request.

Parameters:obj

The Channels message object. Command object format:

{
    'command': 'log',
    'message': [log message]
}
handle_update(obj, internal_call=False)[source]

Handle an incoming Data object update request.

Parameters:
  • obj

    The Channels message object. Command object format:

    {
        'command': 'update',
        'data_id': [id of the :class:`~resolwe.flow.models.Data`
                   object this command changes],
        'changeset': {
            [keys to be changed]
        }
    }
    
  • internal_call – If True, this is an internal delegate call, so a reply to the executor won’t be sent.
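
Since every command object documented for the handle_* methods carries a 'command' key, one plausible way to route them is by method name. The sketch below is an assumption about how such routing could look, not the listener's actual code; SketchListener and dispatch are hypothetical names and the handlers only echo their input.

```python
class SketchListener:
    """Toy listener that routes command objects to handle_<command> methods."""

    def __init__(self):
        self.log = []

    def handle_log(self, obj):
        self.log.append(obj["message"])
        return "logged"

    def handle_update(self, obj, internal_call=False):
        # Echo which keys would be changed on which Data object.
        return ("update", obj["data_id"], sorted(obj["changeset"]))

    def dispatch(self, obj):
        handler = getattr(self, "handle_" + obj["command"], None)
        if handler is None:
            raise ValueError("unknown command: {!r}".format(obj["command"]))
        return handler(obj)


listener = SketchListener()
update_result = listener.dispatch(
    {"command": "update", "data_id": 42, "changeset": {"status": "OK"}}
)
log_result = listener.dispatch({"command": "log", "message": "hello"})
```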
hydrate_spawned_files(exported_files_mapper, filename, data_id)[source]

Pop the given file’s map from the exported files mapping.

Parameters:
  • exported_files_mapper – The dict of file mappings this process produced.
  • filename – The filename to format and remove from the mapping.
  • data_id – The id of the Data() object owning the mapping.
Returns:

The formatted mapping between the filename and temporary file path.

Return type:

dict

push_stats()[source]

Push current stats to Redis.

run()[source]

Run the main listener run loop.

Doesn’t return until terminate() is called.

terminate()[source]

Stop the standalone manager.

State

Synchronized singleton state container for the manager.

resolwe.flow.managers.state.update_constants()[source]

Recreate channel name constants with changed settings.

This kludge is mostly needed due to the way Django settings are patched for testing and how modules need to be imported throughout the project. On import time, settings are not patched yet, but some of the code needs static values immediately. Updating functions such as this one are then needed to fix dummy values.

class resolwe.flow.managers.state.ManagerState(key_prefix)[source]

State interface implementation.

This holds variables required to be shared between all manager workers and takes care of operation atomicity and synchronization. Redis facilitates storage shared between workers, whereas atomicity needs to be dealt with explicitly; this interface hides the Redis and Python details required to achieve syntax-transparent atomicity (such as being able to do executor_count += 1, a load-modify-store operation sequence).
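
To illustrate what syntax-transparent atomicity can mean for an expression such as executor_count += 1, here is a minimal sketch that replaces Redis with a lock-protected dictionary. All class names are hypothetical and the mechanism (a property returning a proxy whose in-place addition performs one atomic increment) is an assumption, not a description of ManagerState internals.

```python
import threading


class InMemoryStore:
    """In-memory stand-in for shared storage (assumption; not Redis)."""

    def __init__(self):
        self._values = {}
        self._lock = threading.Lock()

    def incr(self, key, amount):
        # The whole load-modify-store sequence happens under one lock.
        with self._lock:
            self._values[key] = self._values.get(key, 0) + amount

    def set(self, key, value):
        with self._lock:
            self._values[key] = value

    def get(self, key):
        with self._lock:
            return self._values.get(key, 0)


class CounterProxy:
    """Returned by the property so that += maps to one atomic increment."""

    def __init__(self, store, key):
        self._store = store
        self._key = key

    def __iadd__(self, amount):
        self._store.incr(self._key, amount)
        return self

    def __int__(self):
        return self._store.get(self._key)


class StateSketch:
    def __init__(self, key_prefix):
        self._store = InMemoryStore()
        self._key = key_prefix + ".executor_count"

    @property
    def executor_count(self):
        return CounterProxy(self._store, self._key)

    @executor_count.setter
    def executor_count(self, value):
        # "state.executor_count += 1" ends by assigning the proxy back;
        # the increment already happened, so only plain values are stored.
        if not isinstance(value, CounterProxy):
            self._store.set(self._key, int(value))


state = StateSketch("sketch")


def worker():
    for _ in range(250):
        state.executor_count += 1


threads = [threading.Thread(target=worker) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
total = int(state.executor_count)
```

With four threads each performing 250 increments, no update is lost because the read and the write are never separated.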

Consumer

Manager Channels consumer.

class resolwe.flow.managers.consumer.ManagerConsumer(*args, **kwargs)[source]

Channels consumer for handling manager events.

control_event(message)[source]

Forward control events to the manager dispatcher.

resolwe.flow.managers.consumer.exit_consumer()[source]

Cause the synchronous consumer to exit cleanly.

resolwe.flow.managers.consumer.run_consumer(timeout=None, dry_run=False)[source]

Run the consumer until it finishes processing.

Parameters:
  • timeout – Set maximum execution time before cancellation, or None (default) for unlimited.
  • dry_run – If True, don’t actually dispatch messages, just dequeue them. Defaults to False.
resolwe.flow.managers.consumer.send_event(message)[source]

Construct a Channels event packet with the given message.

Parameters:message – The message to send to the manager workers.

Utilities

Utilities for using global manager features.

resolwe.flow.managers.utils.disable_auto_calls()[source]

Decorator/context manager which stops automatic manager calls.

When entered, automatic communicate() calls from the Django transaction signal are not done.
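
An object that works both as a decorator and as a context manager can be built on contextlib.ContextDecorator. The sketch below shows the general pattern only; the class-level counter, the auto_communicate stand-in and all names are hypothetical, and the counter is not thread-safe.

```python
from contextlib import ContextDecorator


class disable_auto_calls_sketch(ContextDecorator):
    """Suppress the (hypothetical) automatic call while entered."""

    _depth = 0  # hypothetical switch; greater than zero means disabled

    def __enter__(self):
        type(self)._depth += 1
        return self

    def __exit__(self, *exc):
        type(self)._depth -= 1
        return False  # never swallow exceptions


def auto_communicate(calls):
    """Stand-in for the signal-triggered call; records when it runs."""
    if disable_auto_calls_sketch._depth == 0:
        calls.append("communicate")


calls = []
auto_communicate(calls)            # runs
with disable_auto_calls_sketch():
    auto_communicate(calls)        # suppressed


@disable_auto_calls_sketch()
def bulk_create(calls):
    auto_communicate(calls)        # suppressed


bulk_create(calls)
auto_communicate(calls)            # runs again
```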

Flow Executors

Main standalone execution stub, used when the executor is run.

It should be run as a module with one argument: the relative module name of the concrete executor class to use. The current working directory should be where the executors module directory is, so that it can be imported with python’s -m <module> interpreter option.

Usage format:

/path/to/python -m executors .executor_type

Concrete example, run from the directory where ./executors/ is:

/venv/bin/python -m executors .docker

using the python from the venv virtualenv.

Note

The startup code adds the concrete class name as needed, so that in the example above, what’s actually instantiated is .docker.run.FlowExecutor.
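
The name expansion mentioned in the note can be sketched as a small string helper. The function name is hypothetical and the validation is an assumption; only the ".run.FlowExecutor" suffix comes from the note above.

```python
def resolve_executor_class_path(relative_module):
    """Expand a relative executor module name to its class path."""
    if not relative_module.startswith("."):
        raise ValueError("expected a relative module name such as '.docker'")
    return relative_module + ".run.FlowExecutor"


docker_path = resolve_executor_class_path(".docker")
```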

Base Class

class resolwe.flow.executors.run.BaseFlowExecutor(*args, **kwargs)[source]

Represents a workflow executor.

end()[source]

End process execution.

get_stdout()[source]

Get process’ standard output.

get_tools_paths()[source]

Get tools paths.

run(data_id, script)[source]

Execute the script and save results.

run_script(script)[source]

Run process script.

start()[source]

Start process execution.

terminate()[source]

Terminate a running script.

update_data_status(**kwargs)[source]

Update (PATCH) Data object.

Parameters:kwargs – The dictionary of Data attributes to be changed.

Flow Executor Preparer

Framework for the manager-resident executor preparation facilities.

class resolwe.flow.executors.prepare.BaseFlowExecutorPreparer[source]

Represents the preparation functionality of the executor.

extend_settings(data_id, files, secrets)[source]

Extend the settings the manager will serialize.

Parameters:
  • data_id – The Data object id being prepared for.
  • files – The settings dictionary to be serialized. Keys are filenames, values are the objects that will be serialized into those files. Standard filenames are listed in resolwe.flow.managers.protocol.ExecutorFiles.
  • secrets – Secret files dictionary describing additional secret file content that should be created and made available to processes with special permissions. Keys are filenames, values are the raw strings that should be written into those files.
get_environment_variables()[source]

Return dict of environment variables that will be added to executor.

get_tools_paths()[source]

Get tools’ paths.

post_register_hook(verbosity=1)[source]

Run hook after the ‘register’ management command finishes.

Subclasses may implement this hook to e.g. pull Docker images at this point. By default, it does nothing.

resolve_data_path(data=None, filename=None)[source]

Resolve data path for use with the executor.

Parameters:
  • data – Data object instance
  • filename – Filename to resolve
Returns:

Resolved filename, which can be used to access the given data file in programs executed using this executor

resolve_upload_path(filename=None)[source]

Resolve upload path for use with the executor.

Parameters:filename – Filename to resolve
Returns:Resolved filename, which can be used to access the given uploaded file in programs executed using this executor

Docker Flow Executor

class resolwe.flow.executors.docker.run.FlowExecutor(*args, **kwargs)[source]

Docker executor.

end()[source]

End process execution.

run_script(script)[source]

Execute the script and save results.

start()[source]

Start process execution.

terminate()[source]

Terminate a running script.

Preparation

class resolwe.flow.executors.docker.prepare.FlowExecutorPreparer[source]

Specialized manager assist for the docker executor.

get_environment_variables()[source]

Return dict of environment variables that will be added to executor.

post_register_hook(verbosity=1)[source]

Pull Docker images needed by processes after registering.

resolve_data_path(data=None, filename=None)[source]

Resolve data path for use with the executor.

Parameters:
  • data – Data object instance
  • filename – Filename to resolve
Returns:

Resolved filename, which can be used to access the given data file in programs executed using this executor

resolve_upload_path(filename=None)[source]

Resolve upload path for use with the executor.

Parameters:filename – Filename to resolve
Returns:Resolved filename, which can be used to access the given uploaded file in programs executed using this executor

Local Flow Executor

class resolwe.flow.executors.local.run.FlowExecutor(*args, **kwargs)[source]

Local dataflow executor proxy.

Preparation

class resolwe.flow.executors.local.prepare.FlowExecutorPreparer[source]

Specialized manager assist for the local executor.

extend_settings(data_id, files, secrets)[source]

Prevent processes requiring access to secrets from being run.

Null Flow Executor

class resolwe.flow.executors.null.run.FlowExecutor(*args, **kwargs)[source]

Null dataflow executor proxy.

This executor is intended to be used in tests where you want to save the object to the database but don’t need to run it.

Flow Models

Base Model

Base model for all other models.

class resolwe.flow.models.base.BaseModel(*args, **kwargs)[source]

Abstract model that includes common fields for other models.

class Meta[source]

BaseModel Meta options.

contributor

user that created the entry

created

creation date and time

modified

modified date and time

name

object name

save(*args, **kwargs)[source]

Save the model.

slug

URL slug

version

process version

Collection Model

Postgres ORM model for the organization of collections.

class resolwe.flow.models.collection.BaseCollection(*args, **kwargs)[source]

Template for Postgres model for storing a collection.

class Meta[source]

BaseCollection Meta options.

description

detailed description

descriptor

collection descriptor

descriptor_dirty

indicate whether descriptor doesn’t match descriptor_schema (is dirty)

descriptor_schema

collection descriptor schema

save(*args, **kwargs)[source]

Perform descriptor validation and save object.

tags

tags for categorizing objects

class resolwe.flow.models.Collection(*args, **kwargs)[source]

Postgres model for storing a collection.

duplicate(contributor=None)[source]

Duplicate (make a copy).

duplicated

duplication date and time

is_duplicate()[source]

Return True if collection is a duplicate.

objects = <django.db.models.manager.ManagerFromCollectionQuerySet object>

manager

Data model

Postgres ORM model for keeping the data structured.

class resolwe.flow.models.Data(*args, **kwargs)[source]

Postgres model for storing data.

STATUS_DIRTY = 'DR'

data object is in dirty state

STATUS_DONE = 'OK'

data object is done

STATUS_ERROR = 'ER'

data object is in error state

STATUS_PROCESSING = 'PR'

data object is processing

STATUS_RESOLVING = 'RE'

data object is being resolved

STATUS_UPLOADING = 'UP'

data object is uploading

STATUS_WAITING = 'WT'

data object is waiting

checksum

checksum field calculated on inputs

collection

collection

delete(*args, **kwargs)[source]

Delete the data model.

descriptor

actual descriptor

descriptor_dirty

indicate whether descriptor doesn’t match descriptor_schema (is dirty)

descriptor_schema

data descriptor schema

duplicate(contributor=None, inherit_entity=False, inherit_collection=False)[source]

Duplicate (make a copy).

duplicated

duplication date and time

entity

entity

finished

process finished date and time (set by resolwe.flow.executors.run.BaseFlowExecutor.run or its derivatives)

input

actual inputs used by the process

is_duplicate()[source]

Return True if data object is a duplicate.

location

data location

named_by_user

track if user set the data name explicitly

objects = <django.db.models.manager.ManagerFromDataQuerySet object>

manager

output

actual outputs of the process

parents

dependencies between data objects

process

process used to compute the data object

process_cores

actual allocated cores

process_error

error log message

process_info

info log message

process_memory

actual allocated memory

process_pid

process id

process_progress

progress

process_rc

return code

process_warning

warning log message

resolve_secrets()[source]

Retrieve handles for all basic:secret: fields on input.

The process must have the secrets resource requirement specified in order to access any secrets. Otherwise this method will raise a PermissionDenied exception.

Returns:A dictionary of secrets where key is the secret handle and value is the secret value.
save(render_name=False, *args, **kwargs)[source]

Save the data model.

save_dependencies(instance, schema)[source]

Save data: and list:data: references as parents.

save_storage(instance, schema)[source]

Save basic:json values to a Storage collection.

scheduled

date and time when process was dispatched to the scheduling system (set by resolwe.flow.managers.dispatcher.Manager.run)

size

total size of data’s outputs in bytes

started

process started date and time (set by resolwe.flow.executors.run.BaseFlowExecutor.run or its derivatives)

status

Data status

It can be one of the STATUS_* constants defined on this class.

tags

tags for categorizing objects

class resolwe.flow.models.DataDependency(*args, **kwargs)[source]

Dependency relation between data objects.

KIND_IO = 'io'

child uses parent’s output as its input

KIND_SUBPROCESS = 'subprocess'

child was spawned by the parent

child

child data object

kind

kind of dependency

parent

parent data object

class resolwe.flow.models.DataLocation(*args, **kwargs)[source]

Location data of the data object.

get_path(prefix=None, filename=None)[source]

Compose data location path.

get_runtime_path(filename=None)[source]

Compose data runtime location path.

purged

indicate whether the object was processed by purge

subpath

subpath of data location

Entity–relationship model

Postgres ORM to define the entity–relationship model that describes how data objects are related in a specific domain.

class resolwe.flow.models.Entity(*args, **kwargs)[source]

Postgres model for storing entities.

collection

collection to which entity belongs

duplicate(contributor=None, inherit_collection=False)[source]

Duplicate (make a copy).

duplicated

duplication date and time

is_duplicate()[source]

Return True if entity is a duplicate.

move_to_collection(source_collection, destination_collection)[source]

Move entity to destination collection.

objects = <django.db.models.manager.ManagerFromEntityQuerySet object>

manager

type

entity type

class resolwe.flow.models.Relation(*args, **kwargs)[source]

Relations between entities.

The Relation model defines the associations and dependencies between entities in a given collection:

{
    "collection": "<collection_id>",
    "type": "comparison",
    "category": "case-control study",
    "entities": [
        {"entity": "<entity1_id>", "label": "control"},
        {"entity": "<entity2_id>", "label": "case"},
        {"entity": "<entity3_id>", "label": "case"}
    ]
}

Relation type defines a specific set of associations among entities. It can be something like group, comparison or series. The relation type is an instance of RelationType and should be defined in any Django app that uses relations (e.g., as a fixture). Multiple relations of the same type are allowed on the collection.

Relation category defines a specific use case. The relation category must be unique in a collection, so that users can distinguish between different relations. In the example above, we could add another comparison relation with a different category, say Case-case study, to compare <entity2> with <entity3>.

Relation is linked to resolwe.flow.models.Collection to enable defining different relation structures in different collections. This also greatly speeds up retrieval of relations, as they are envisioned to be used mainly on a collection level.

unit defines units used in partitions where it is applicable, e.g. in relations of type series.

category

category of the relation

collection

collection to which relation belongs

entities

partitions of entities in the relation

type

type of the relation

unit

unit used in the partitions’ positions (where applicable, e.g. for series)

class resolwe.flow.models.RelationType(*args, **kwargs)[source]

Model for storing relation types.

name

relation type name

ordered

indicates if order of entities in relation is important or not

DescriptorSchema model

Postgres ORM model for storing descriptors.

class resolwe.flow.models.DescriptorSchema(*args, **kwargs)[source]

Postgres model for storing descriptors.

description

detailed description

schema

user descriptor schema represented as a JSON object

Process model

Postgres ORM model for storing processes.

class resolwe.flow.models.Process(*args, **kwargs)[source]

Postgres model for storing processes.

PERSISTENCE_CACHED = 'CAC'

cached persistence

PERSISTENCE_RAW = 'RAW'

raw persistence

PERSISTENCE_TEMP = 'TMP'

temp persistence

category

category

data_name

template for name of Data object created with Process

description

detailed description

entity_always_create

Create new entity, regardless of entity_input or entity_descriptor_schema fields.

entity_descriptor_schema

Slug of the descriptor schema assigned to the Entity created with entity_type.

entity_input

Limit the entity selection in entity_type to a single input.

entity_type

Automatically add Data object created with this process to an Entity object representing a data-flow. If all input Data objects belong to the same entity, add newly created Data object to it, otherwise create a new one.

get_resource_limits()[source]

Get the core count and memory usage limits for this process.

Returns:A dictionary with the resource limits, containing the following keys:
  • memory: Memory usage limit, in MB. Defaults to 4096 if not otherwise specified in the resource requirements.
  • cores: Core count limit. Defaults to 1.
Return type:dict
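
The documented defaults can be illustrated with a small helper. The layout of the requirements dictionary (a "resources" key holding "memory" and "cores") is an assumption made for this sketch, as is the function name; only the default values 4096 and 1 come from the description above.

```python
DEFAULT_MEMORY_MB = 4096
DEFAULT_CORES = 1


def resource_limits_sketch(requirements):
    """Return memory and cores limits, falling back to the defaults."""
    # Assumed layout: {"resources": {"memory": <MB>, "cores": <count>}}
    resources = (requirements or {}).get("resources", {})
    return {
        "memory": resources.get("memory", DEFAULT_MEMORY_MB),
        "cores": resources.get("cores", DEFAULT_CORES),
    }


defaults = resource_limits_sketch({})
custom = resource_limits_sketch({"resources": {"memory": 8192}})
```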
input_schema

process input schema (describes input parameters, form layout “Inputs” for Data.input)

Handling:

  • schema defined by: dev
  • default by: user
  • changeable by: none
is_active

designates whether this process should be treated as active

output_schema

process output schema (describes output JSON, form layout “Results” for Data.output)

Handling:

  • schema defined by: dev
  • default by: dev
  • changeable by: dev

Implicitly defined fields (by resolwe.flow.management.commands.register() or resolwe.flow.executors.run.BaseFlowExecutor.run or its derivatives):

  • progress of type basic:float (from 0.0 to 1.0)
  • proc of type basic:group containing:
    • stdout of type basic:text
    • rc of type basic:integer
    • task of type basic:string (celery task id)
    • worker of type basic:string (celery worker hostname)
    • runtime of type basic:string (runtime instance hostname)
    • pid of type basic:integer (process ID)
persistence

Persistence of Data objects created with this process. It can be one of the PERSISTENCE_* constants defined on this class.

Note

If persistence is set to PERSISTENCE_CACHED or PERSISTENCE_TEMP, the process must be idempotent.

requirements

process requirements

run

process command and environment description for internal use

Handling:

  • schema defined by: dev
  • default by: dev
  • changeable by: dev
scheduling_class

process scheduling class

type

data type

Storage model

Postgres ORM model for storing JSON.

class resolwe.flow.models.Storage(*args, **kwargs)[source]

Postgres model for storing storages.

data

corresponding data objects

json

actual JSON stored

objects = <resolwe.flow.models.storage.StorageManager object>

storage manager

Secret model

Postgres ORM model for storing secrets.

class resolwe.flow.models.Secret(*args, **kwargs)[source]

Postgres model for storing secrets.

ProcessMigrationHistory model

Postgres ORM model for storing process migration history.

class resolwe.flow.models.ProcessMigrationHistory(*args, **kwargs)[source]

Model for storing process migration history.

DataMigrationHistory model

Postgres ORM model for storing data migration history.

class resolwe.flow.models.DataMigrationHistory(*args, **kwargs)[source]

Model for storing data migration history.

Flow Utilities

Data Purge

resolwe.flow.utils.purge.get_purge_files(root, output, output_schema, descriptor, descriptor_schema)[source]

Get files to purge.

resolwe.flow.utils.purge.location_purge(location_id, delete=False, verbosity=0)[source]

Print and conditionally delete files not referenced by metadata.

Parameters:
  • location_id – Id of the DataLocation model that data objects refer to.
  • delete – If True, then delete unreferenced files.
resolwe.flow.utils.purge.purge_all(delete=False, verbosity=0)[source]

Purge all data locations.

Resolwe Exceptions Utils

Utils functions for working with exceptions.

resolwe.flow.utils.exceptions.resolwe_exception_handler(exc, context)[source]

Handle exceptions raised in API and make them nicer.

To enable this, you have to add it to the settings:

REST_FRAMEWORK = {
    'EXCEPTION_HANDLER': 'resolwe.flow.utils.exceptions.resolwe_exception_handler',
}

Statistics

Various statistical utilities, used mostly for manager load tracking.

class resolwe.flow.utils.stats.NumberSeriesShape[source]

Helper class for computing characteristics for numerical data.

Given a series of numerical data, the class will keep a record of the extremes seen, arithmetic mean and standard deviation.

to_dict()[source]

Pack the stats computed into a dictionary.

update(num)[source]

Update metrics with the new number.
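
Tracking extremes, mean and standard deviation without storing the whole series is commonly done with Welford's online algorithm. The sketch below uses that technique as a stand-in; whether NumberSeriesShape uses it is not stated in the source, and the class name, the dictionary keys and the choice of population deviation are assumptions.

```python
import math


class NumberSeriesShapeSketch:
    """Running min, max, mean and population standard deviation."""

    def __init__(self):
        self.count = 0
        self.low = None
        self.high = None
        self.mean = 0.0
        self._m2 = 0.0  # running sum of squared deviations from the mean

    def update(self, num):
        self.count += 1
        self.low = num if self.low is None else min(self.low, num)
        self.high = num if self.high is None else max(self.high, num)
        # Welford's update: adjust the mean, then the squared deviations.
        delta = num - self.mean
        self.mean += delta / self.count
        self._m2 += delta * (num - self.mean)

    def to_dict(self):
        variance = self._m2 / self.count if self.count else 0.0
        return {
            "min": self.low,
            "max": self.high,
            "mean": self.mean,
            "deviation": math.sqrt(variance),
        }


shape = NumberSeriesShapeSketch()
for value in [2, 4, 4, 4, 5, 5, 7, 9]:
    shape.update(value)
stats = shape.to_dict()
```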

class resolwe.flow.utils.stats.SimpleLoadAvg(intervals)[source]

Helper class for a sort of load average based on event times.

Given a series of queue depth events, it will compute the average number of events for three different window lengths, emulating a form of ‘load average’. The calculation itself is modelled after the Linux scheduler, with a 5-second sampling rate. Because we don’t get consistent (time-wise) samples, the sample taken is the average of a simple moving window for the last 5 seconds; this is to avoid numerical errors if actual time deltas were used to compute the scaled decay.

add(count, timestamp=None)[source]

Add a value at the specified time to the series.

Parameters:
  • count – The number of work items ready at the specified time.
  • timestamp – The timestamp to add. Defaults to None, meaning current time. It should be strictly greater (newer) than the last added timestamp.
to_dict()[source]

Pack the load averages into a nicely-keyed dictionary.
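
A Linux-style load average is an exponentially decayed mean updated at a fixed sampling period. The sketch below shows that update rule with the 5-second period mentioned above; the function names are hypothetical, the window lengths are treated as seconds by assumption, and the moving-window pre-averaging of irregular samples is left out.

```python
import math

SAMPLE_SECONDS = 5  # sampling period described in the text


def decay_step(load, sample, window_seconds):
    """Blend one new sample into an exponentially decayed average."""
    factor = math.exp(-SAMPLE_SECONDS / window_seconds)
    return load * factor + sample * (1.0 - factor)


def load_averages(samples, intervals=(60, 300, 900)):
    """Feed evenly spaced samples into one average per window length."""
    loads = {interval: 0.0 for interval in intervals}
    for sample in samples:
        for interval in intervals:
            loads[interval] = decay_step(loads[interval], sample, interval)
    return loads


# Ten minutes of a constant queue depth of 4, sampled every 5 seconds.
loads = load_averages([4] * 120)
```

Shorter windows converge to the current queue depth faster, which is why the three values differ while the load is changing.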

Flow Management

Delete Unreferenced Files

class resolwe.flow.management.commands.purge.Command(stdout=None, stderr=None, no_color=False, force_color=False)[source]

Purge files with no reference in Data objects, and orphaned storages.

add_arguments(parser)[source]

Command arguments.

handle(*args, **options)[source]

Call purge_all().

Register Processes

class resolwe.flow.management.commands.register.Command(stdout=None, stderr=None, no_color=False, force_color=False)[source]

Register processes.

add_arguments(parser)[source]

Command arguments.

find_descriptor_schemas(schema_file)[source]

Find descriptor schemas in given path.

find_schemas(schema_path, schema_type='process', verbosity=1)[source]

Find schemas in packages that match filters.

handle(*args, **options)[source]

Register processes.

register_descriptors(descriptor_schemas, user, force=False, verbosity=1)[source]

Read and register descriptors.

register_processes(process_schemas, user, force=False, verbosity=1)[source]

Read and register processors.

retire(process_schemas)[source]

Retire obsolete processes.

Remove old process versions without data. Find processes that have been registered but do not exist in the code anymore, then:

  • If they do not have data: remove them
  • If they have data: flag them not active (is_active=False)
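
The two-way decision in the list above can be sketched over plain slugs. This is a simplified model and not the management command's code: retire_sketch and its arguments are hypothetical, and process versions are ignored.

```python
def retire_sketch(registered, in_code, has_data):
    """Split obsolete processes into those to remove and those to deactivate.

    registered: slugs currently registered
    in_code: slugs still present in the code
    has_data: callable telling whether a slug has Data objects
    """
    removed, deactivated = [], []
    for slug in registered:
        if slug in in_code:
            continue  # still exists in the code, nothing to retire
        if has_data(slug):
            deactivated.append(slug)  # keep it, but set is_active=False
        else:
            removed.append(slug)
    return removed, deactivated


removed, deactivated = retire_sketch(
    registered=["align", "old-qc", "old-import"],
    in_code={"align"},
    has_data=lambda slug: slug == "old-import",
)
```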
valid(instance, schema)[source]

Validate schema.

Elastic

Framework for advanced indexing of Django models with ElasticSearch.

To register an index processor, create an elastic_indexes.py file in your app and put a subclass of BaseIndex in it. It will automatically register and index all objects specified in it.

For building the index for the first time or manually updating it, run:

python manage.py elastic_index

Elastic Indices

Main two classes

class resolwe.elastic.indices.BaseDocument(meta=None, **kwargs)[source]

Base document class to build ElasticSearch documents.

This is the standard elasticsearch-dsl DocType class with fields for handling permissions already added.

groups_with_permissions = None

list of group ids with view permission on the object

public_permission = None

identifies if object has public view permission assigned

users_with_permissions = None

list of user ids with view permission on the object

class resolwe.elastic.indices.BaseIndex[source]

Base index class.

Builds ElasticSearch index for specific type of objects. Index is based on document type defined in document_type. Fields are determined from document and are populated with one of the following methods (in the exact order):

  • get_<field_name>_value method is used
  • mapping[<field_name>] is used - if the value is callable, it is called with the current object as its only argument
  • value is extracted from the object’s field with the same name
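The lookup order in the list above can be sketched in plain Python. IndexSketch, its resolve method and the Obj class are hypothetical and exist only for this illustration; the real BaseIndex internals may differ.

```python
class IndexSketch:
    """Hypothetical stand-in for a BaseIndex subclass (not the real class)."""

    # mapping values may be plain values or callables taking the object
    mapping = {"name": lambda obj: obj.name.upper(), "kind": "sample"}

    def get_slug_value(self, obj):
        # 1. a get_<field_name>_value method takes precedence
        return "slug-{}".format(obj.pk)

    def resolve(self, field_name, obj):
        getter = getattr(self, "get_{}_value".format(field_name), None)
        if getter is not None:
            return getter(obj)
        if field_name in self.mapping:
            # 2. mapping entry; call it with the object if it is callable
            value = self.mapping[field_name]
            return value(obj) if callable(value) else value
        # 3. fall back to the object's attribute with the same name
        return getattr(obj, field_name)


class Obj:
    pk = 7
    name = "test"
    slug = "ignored"
    size = 3


index = IndexSketch()
values = {f: index.resolve(f, Obj()) for f in ("slug", "name", "kind", "size")}
```

The slug comes from the getter method even though the object has its own slug attribute, name and kind come from the mapping, and size falls through to the object's attribute.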

To make the index, call the run function. The index is built for all objects in queryset. To build the index for just one object, specify it in the obj parameter of the run function.

To work properly, subclass of this class must override following attributes:

  • object_type - class to which object must belong to be processed
  • document_class - subclass of BaseDocument that is used to build actual index

Additional (optional) methods and attributes that can be overridden are:

build(obj=None, queryset=None, push=True)[source]

Build indexes.

connection_thread_id = None

id of the thread in which the connection was established

create_mapping()[source]

Create the mappings in elasticsearch.

destroy()[source]

Destroy an index.

document_class = None

document class used to create index

filter(obj)[source]

Determine if object should be processed.

If False is returned, processing of the current object will be aborted.

generate_id(obj)[source]

Generate unique document id for ElasticSearch.

get_dependencies()[source]

Return dependencies, which should trigger updates of this index.

get_object_id(obj)[source]

Return unique identifier of the object.

Object’s id is returned by default. This method can be overridden if the object doesn’t have an id attribute.

get_permissions(obj)[source]

Return users and groups with view permission on the current object.

Return a dict with two keys - users and groups - which contain list of ids of users/groups with view permission.

mapping = {}

mapping used for building document

object_type = None

type of objects that are indexed, i.e. a Django model

preprocess_object(obj)[source]

Preprocess object before indexing.

This function is called before process_object(). It can be used for advanced pre-processing of the object, e.g. adding annotations that will be used in multiple fields.

process_object(obj)[source]

Process current object and push it to the ElasticSearch.

push()[source]

Push built documents to ElasticSearch.

push_queue = None

list of built documents waiting to be pushed

queryset = None

queryset of objects to index

remove_object(obj)[source]

Remove current object from the ElasticSearch.

search()[source]

Return search query of document object.

testing_postfix = ''

auto generated ES index postfix used in tests

Elastic Viewsets

class resolwe.elastic.viewsets.ElasticSearchMixin(*args, **kwargs)[source]

Mixin to use Django REST Framework with ElasticSearch based querysets.

This mixin adds the following methods:
filter_permissions(search)[source]

Filter given query based on permissions of the user in the request.

Parameters:search – ElasticSearch query object

Filter given search by the filter parameter given in request.

Parameters:search – ElasticSearch query object
get_always_allowed_arguments()[source]

Return query arguments which are always allowed.

get_query_param(key, default=None)[source]

Get query parameter uniformly for GET and POST requests.

get_query_params()[source]

Get combined query parameters (GET and POST).

Order given search by the ordering parameter given in request.

Parameters:search – ElasticSearch query object

Elastic Index Builder

Elastic Paginators

Paginator classes used in Elastic app.

class resolwe.elastic.pagination.LimitOffsetPostPagination[source]

Limit/offset paginator.

This is the standard limit/offset paginator from Django REST framework, with the difference that it also supports passing the limit and offset attributes in the body of the request (not just as query parameters).

Elastic Utils

Collection of convenient functions and shortcuts that simplify using the app.

resolwe.elastic.utils.const(con)[source]

Define a constant mapping for elastic search index.

This helper may be used to define index mappings, where the indexed value is always set to a specific constant. Example:

mapping = {'field': const('I am a constant')}
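Since callable mapping values are called with the current object as their only argument, a helper of this kind can be sketched as a function that returns a callable ignoring its argument. The name const_sketch is hypothetical; this is an illustration of the idea, not the library's actual implementation.

```python
def const_sketch(con):
    """Return a callable that ignores the indexed object and returns `con`."""
    def get_value(obj):
        return con
    return get_value


mapping = {"field": const_sketch("I am a constant")}
value = mapping["field"](object())  # the object passed in is ignored
```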

Elastic Management commands

Elastic app includes following Django management commands:

Command: elastic_index

Command: elastic_mapping

Command: elastic_purge

Resolwe Test Framework

Resolwe Test Cases

class resolwe.test.TestCaseHelpers(methodName='runTest')[source]

Mixin for test case helpers.

assertAlmostEqualGeneric(actual, expected, msg=None)[source]

Assert almost equality for common types of objects.

This is the same as assertEqual(), but using assertAlmostEqual() when floats are encountered inside common containers (currently this includes dict, list and tuple types).

Parameters:
  • actual – object to compare
  • expected – object to compare against
  • msg – optional message printed on failures
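The behaviour described above can be approximated with a short recursive helper. This is a minimal sketch, assuming a strict type check between the two values; AlmostEqualSketch and assert_almost_equal_generic are hypothetical names and not the actual Resolwe implementation.

```python
import unittest


class AlmostEqualSketch(unittest.TestCase):
    """Hypothetical re-implementation for illustration only."""

    def assert_almost_equal_generic(self, actual, expected, msg=None):
        self.assertEqual(type(actual), type(expected), msg=msg)
        if isinstance(actual, dict):
            self.assertCountEqual(actual.keys(), expected.keys(), msg=msg)
            for key in actual:
                self.assert_almost_equal_generic(actual[key], expected[key], msg)
        elif isinstance(actual, (list, tuple)):
            self.assertEqual(len(actual), len(expected), msg=msg)
            for a, e in zip(actual, expected):
                self.assert_almost_equal_generic(a, e, msg)
        elif isinstance(actual, float):
            # floats are compared approximately (7 decimal places by default)
            self.assertAlmostEqual(actual, expected, msg=msg)
        else:
            self.assertEqual(actual, expected, msg=msg)

    def runTest(self):
        # 0.1 + 0.2 is not exactly 0.3, but the two agree approximately
        self.assert_almost_equal_generic(
            {"scores": [0.1 + 0.2, 1], "label": ("a", 2.0)},
            {"scores": [0.3, 1], "label": ("a", 2.0)},
        )


result = unittest.TestResult()
AlmostEqualSketch().run(result)
```

A plain assertEqual() on the same two dictionaries would fail because of the floating-point rounding in the first list element.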
keep_data(mock_purge=True)[source]

Do not delete output files after tests.

setUp()[source]

Prepare environment for test.

tearDown()[source]

Cleanup environment.

class resolwe.test.TransactionTestCase(methodName='runTest')[source]

Base class for writing Resolwe tests not enclosed in a transaction.

It is based on Django’s TransactionTestCase. Use it if you need to access the test’s database from another thread/process.

setUp()[source]

Initialize test data.

class resolwe.test.TestCase(methodName='runTest')[source]

Base class for writing Resolwe tests.

It is based on TransactionTestCase and Django’s TestCase. The latter encloses the test code in a database transaction that is rolled back at the end of the test.

class resolwe.test.ProcessTestCase(methodName='runTest')[source]

Base class for writing process tests.

It is a subclass of TransactionTestCase with some specific functions used for testing processes.

To write a process test use standard Django’s syntax for writing tests and follow the next steps:

  1. Put input files (if any) in tests/files directory of a Django application.
  2. Run the process using run_process().
  3. Check if the process has the expected status using assertStatus().
  4. Check process’s output using assertFields(), assertFile(), assertFileExists(), assertFiles() and assertJSON().

Note

When creating a test case for a custom Django application, subclass this class and override self.files_path with:

self.files_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'files')

Danger

If output files don’t exist in tests/files directory of a Django application, they are created automatically. But you have to check that they are correct before using them for further runs.

assertDir(obj, field_path, fn)[source]

Compare process output directory to correct compressed directory.

Parameters:
  • obj (Data) – object that includes the directory to compare
  • field_path (str) – path to Data object’s field with the file name
  • fn (str) – file name (and relative path) of the correct compressed directory to compare against. Path should be relative to the tests/files directory of a Django application. Compressed directory needs to be in tar.gz format.
assertDirExists(obj, field_path)[source]

Assert that a directory in the output field of the given object exists.

Parameters:
  • obj – object that includes the file for which to check if it exists
  • field_path – directory name/path
assertDirStructure(obj, field_path, dir_struct, exact=True)[source]

Assert correct tree structure in output field of given object.

Only names of directories and files are asserted. Content of files is not compared.

Parameters:
  • obj (Data) – object that includes the directory to compare
  • field_path (str) – path to the directory to compare
  • dir_struct (dict) – correct tree structure of the directory. Dictionary keys are directory and file names with the correct nested structure. Dictionary value associated with each directory is a new dictionary which lists the content of the directory. Dictionary value associated with each file name is None
  • exact (bool) – if True tested directory structure must exactly match dir_struct. If False dir_struct must be a partial structure of the directory to compare
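To make the dir_struct format concrete, the sketch below builds a small directory tree and checks it against a full and a partial structure. The helper matches_structure is hypothetical and works on a file system path rather than on a Data object; it only illustrates the meaning of dir_struct and exact.

```python
import os
import tempfile


def matches_structure(path, dir_struct, exact=True):
    """Check names under `path` against `dir_struct` (file contents ignored)."""
    entries = set(os.listdir(path))
    if exact and entries != set(dir_struct):
        return False
    for name, content in dir_struct.items():
        full = os.path.join(path, name)
        if content is None:
            if not os.path.isfile(full):  # None marks a file
                return False
        elif not os.path.isdir(full):  # a dict marks a directory
            return False
        elif not matches_structure(full, content, exact):
            return False
    return True


# Build a small tree: results/{report.txt, plots/{a.png}}
root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, "results", "plots"))
for rel in ("results/report.txt", "results/plots/a.png"):
    open(os.path.join(root, *rel.split("/")), "w").close()

full_struct = {"results": {"report.txt": None, "plots": {"a.png": None}}}
partial_struct = {"results": {"plots": {}}}

exact_full = matches_structure(root, full_struct)
exact_partial = matches_structure(root, partial_struct)
loose_partial = matches_structure(root, partial_struct, exact=False)
```

The partial structure only passes when exact is False, because it omits report.txt and a.png.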
assertFields(obj, path, value)[source]

Compare object’s field to the given value.

The file size is ignored. Use assertFile to validate file contents.

Parameters:
  • obj (Data) – object with the field to compare
  • path (str) – path to Data object’s field
  • value (str) – desired value of Data object’s field
assertFile(obj, field_path, fn, **kwargs)[source]

Compare a process’s output file to the given correct file.

Parameters:
  • obj (Data) – object that includes the file to compare
  • field_path (str) – path to Data object’s field with the file name
  • fn (str) – file name (and relative path) of the correct file to compare against. Path should be relative to the tests/files directory of a Django application.
  • compression (str) – if not None, files will be uncompressed with the appropriate compression library before comparison. Currently supported compression formats are gzip and zip.
  • filter (FunctionType) – function for filtering the contents of output files. It is used in itertools.filterfalse() function and takes one parameter, a line of the output file. If it returns True, the line is excluded from comparison of the two files.
  • sort (bool) – if set to True, basic sort will be performed on file contents before computing hash value.
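The effect of the filter and sort options can be sketched as follows. This is an illustration under assumptions: content_hash and is_timestamp are hypothetical names, and SHA-256 is chosen only for the example, since the hash function used by the test framework is not stated here.

```python
import hashlib
import itertools


def content_hash(lines, filter=None, sort=False):
    """Hash lines after optional filtering and sorting (illustration only)."""
    if filter is not None:
        # filterfalse keeps the lines for which `filter` returns a false value
        lines = itertools.filterfalse(filter, lines)
    if sort:
        lines = sorted(lines)
    digest = hashlib.sha256()
    for line in lines:
        digest.update(line)
    return digest.hexdigest()


def is_timestamp(line):
    """Return True for lines that should be excluded from the comparison."""
    return line.startswith(b"# generated")


output = [b"# generated 2020-01-01\n", b"b\t2\n", b"a\t1\n"]
reference = [b"# generated 1999-12-31\n", b"a\t1\n", b"b\t2\n"]

same = content_hash(output, is_timestamp, sort=True) == content_hash(
    reference, is_timestamp, sort=True
)
different = content_hash(output) != content_hash(reference)
```

With the timestamp line filtered out and the remaining lines sorted, the two files compare equal; without these options they do not.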
assertFileExists(obj, field_path)[source]

Ensure a file in the given object’s field exists.

Parameters:
  • obj (Data) – object that includes the file for which to check if it exists
  • field_path (str) – path to Data object’s field with the file name/path
assertFiles(obj, field_path, fn_list, **kwargs)[source]

Compare a process’s output files to the given correct files.

Parameters:
  • obj (Data) – object which includes the files to compare
  • field_path (str) – path to Data object’s field with the list of file names
  • fn_list (list) – list of file names (and relative paths) of files to compare against. Paths should be relative to the tests/files directory of a Django application.
  • compression (str) – if not None, files will be uncompressed with the appropriate compression library before comparison. Currently supported compression formats are gzip and zip.
  • filter (FunctionType) – Function for filtering the contents of output files. It is used in itertools.filterfalse function and takes one parameter, a line of the output file. If it returns True, the line is excluded from comparison of the two files.
  • sort (bool) – if set to True, basic sort will be performed on file contents before computing hash value.
assertFilesExist(obj, field_path)[source]

Ensure files in the given object’s field exist.

Parameters:
  • obj (Data) – object that includes the list of files whose existence is checked
  • field_path (str) – path to Data object’s field with the file name/path
assertJSON(obj, storage, field_path, file_name)[source]

Compare JSON in Storage object to the given correct JSON.

Parameters:
  • obj (Data) – object to which the Storage object belongs
  • storage (Storage or str) – object or id which contains JSON to compare
  • field_path (str) – path to JSON subset in the Storage’s object to compare against. If it is empty, the entire object will be compared.
  • file_name (str) –

    file name (and relative path) of the file with the correct JSON to compare against. Path should be relative to the tests/files directory of a Django application.

    Note

    The given JSON file should be compressed with gzip and have the .gz extension.

assertStatus(obj, status)[source]

Check if object’s status is equal to the given status.

Parameters:
  • obj (Data) – object for which to check the status
  • status (str) – desired value of object’s status attribute
files_path

Path to test files.

get_json(file_name, storage)[source]

Return JSON saved in file and test JSON to compare it to.

The method returns a tuple of the saved JSON and the test JSON. In your test you should then compare the test JSON to the saved JSON that is committed to the repository.

The storage argument can be a Storage object, a Storage ID or a Python dictionary. The test JSON is taken from the json field of the Storage object, or is the complete Python dictionary (if a dict is given).

If the file does not exist, it is created, the test JSON is written to the new file and an exception is raised.

Parameters:
  • file_name (str) – file name (and relative path) of a JSON file. Path should be relative to the tests/files directory of a Django app. The file name must have a .gz extension.
  • storage (Storage, str or dict) – Storage object, Storage ID or a dict.
Returns:

(reference JSON, test JSON)

Return type:

tuple
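The create-on-first-run behaviour described above can be sketched with the standard library. The function get_json_sketch is hypothetical, it takes the test JSON directly instead of a Storage object, and the exception type (FileNotFoundError) is an assumption, since the documentation only states that an exception is raised.

```python
import gzip
import json
import os
import tempfile


def get_json_sketch(path, test_json):
    """Return (reference JSON, test JSON); create the reference if missing."""
    if not os.path.isfile(path):
        with gzip.open(path, "wt") as handle:
            json.dump(test_json, handle)
        # assumption: the concrete exception type is not documented
        raise FileNotFoundError("Reference file was created: " + path)
    with gzip.open(path, "rt") as handle:
        return json.load(handle), test_json


path = os.path.join(tempfile.mkdtemp(), "expected.json.gz")
test_json = {"mean": 1.5, "genes": ["a", "b"]}

try:
    get_json_sketch(path, test_json)
    first_call_raised = False
except FileNotFoundError:
    first_call_raised = True  # the first run only writes the file

reference, current = get_json_sketch(path, test_json)
```

The first call writes the gzip-compressed reference file and fails, which gives the author a chance to inspect the file before it is used; the second call returns both values for comparison.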

preparation_stage()[source]

Context manager to mark input preparation stage.

run_process(process_slug, input_={}, assert_status='OK', descriptor=None, descriptor_schema=None, verbosity=0, tags=None)[source]

Run the specified process with the given inputs.

If input is a file, file path should be given relative to the tests/files directory of a Django application. If assert_status is given, check if Data object’s status matches it after the process has finished.

Note

If you need to delay calling the manager, you must put the desired code in a with transaction.atomic() block.

Parameters:
  • process_slug (str) – slug of the Process to run
  • input_ (dict) –

    Process’s input parameters

    Note

    You don’t have to specify parameters with defined default values.

  • assert_status (str) – desired status of the Data object
  • descriptor (dict) – descriptor to set on the Data object
  • descriptor_schema (dict) – descriptor schema to set on the Data object
  • tags (list) – list of tags that will be added to the created Data object
Returns:

object created by Process

Return type:

Data

run_processor(*args, **kwargs)[source]

Run process.

Deprecated method: use run_process.

setUp()[source]

Initialize test data.

tearDown()[source]

Clean up after the test.

class resolwe.test.TransactionResolweAPITestCase(methodName='runTest')[source]

Base class for testing Resolwe REST API.

This class is derived from Django REST Framework’s APITransactionTestCase class and implements some basic features that make testing the Resolwe API easier. These features include the following functions:

_get_list(user=None, query_params={})[source]

Make GET request to self.list_view view.

If user is not None, the given user is authenticated before making the request.

Parameters:user (User or None) – User to authenticate in request
Returns:API response object
Return type:Response
_get_detail(pk, user=None, query_params={})[source]

Make GET request to self.detail_view view.

If user is not None, the given user is authenticated before making the request.

Parameters:
  • pk (int) – Primary key of the corresponding object
  • user (User or None) – User to authenticate in request
Returns:

API response object

Return type:

Response

_post(data={}, user=None, query_params={})[source]

Make POST request to self.list_view view.

If user is not None, the given user is authenticated before making the request.

Parameters:
  • data (dict) – data for posting in request’s body
  • user (User or None) – User to authenticate in request
Returns:

API response object

Return type:

Response

_patch(pk, data={}, user=None, query_params={})[source]

Make PATCH request to self.detail_view view.

If user is not None, the given user is authenticated before making the request.

Parameters:
  • pk (int) – Primary key of the corresponding object
  • data (dict) – data for posting in request’s body
  • user (User or None) – User to authenticate in request
Returns:

API response object

Return type:

Response

_delete(pk, user=None, query_params={})[source]

Make DELETE request to self.detail_view view.

If user is not None, the given user is authenticated before making the request.

Parameters:
  • pk (int) – Primary key of the corresponding object
  • user (User or None) – User to authenticate in request
Returns:

API response object

Return type:

Response

_detail_permissions(pk, data={}, user=None)[source]

Make POST request to self.detail_view view.

If user is not None, the given user is authenticated before making the request.

Parameters:
  • pk (int) – Primary key of the corresponding object
  • data (dict) – data for posting in request’s body
  • user (User or None) – User to authenticate in request
Returns:

API response object

Return type:

Response

It also includes two views built from the referenced DRF ViewSet. The first mimics a list view and has the following links between the request’s methods and the ViewSet’s methods:

  • GET -> list
  • POST -> create

The second mimics a detail view and has the following links between the request’s methods and the ViewSet’s methods:

  • GET -> retrieve
  • PUT -> update
  • PATCH -> partial_update
  • DELETE -> destroy
  • POST -> permissions

If any of the listed methods is not defined in the ViewSet, the corresponding link is omitted.
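The two link tables and the omission rule can be sketched in plain Python. The names LIST_LINKS, DETAIL_LINKS, available_links and ReadOnlyViewSetSketch are hypothetical; in DRF, a dictionary of this shape is what gets passed to a ViewSet's as_view() to build a view.

```python
LIST_LINKS = {"get": "list", "post": "create"}
DETAIL_LINKS = {
    "get": "retrieve",
    "put": "update",
    "patch": "partial_update",
    "delete": "destroy",
    "post": "permissions",
}


def available_links(viewset, links):
    """Keep only the links whose target method exists on the viewset."""
    return {http: name for http, name in links.items() if hasattr(viewset, name)}


class ReadOnlyViewSetSketch:
    """Hypothetical viewset that defines only list and retrieve."""

    def list(self, request):
        return []

    def retrieve(self, request, pk=None):
        return {}


list_links = available_links(ReadOnlyViewSetSketch, LIST_LINKS)
detail_links = available_links(ReadOnlyViewSetSketch, DETAIL_LINKS)
```

For this read-only viewset only the GET links survive in both views.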

Note

self.viewset (instance of DRF’s Viewset) and self.resource_name (string) must be defined before calling the superclass’s setUp method for the test case to work properly.

self.factory is instance of DRF’s APIRequestFactory.

assertKeys(data, wanted)[source]

Assert dictionary keys.

detail_permissions(pk)[source]

Get detail permissions url.

detail_url(pk)[source]

Get detail url.

list_url

Get list url.

setUp()[source]

Prepare data.

class resolwe.test.ResolweAPITestCase(methodName='runTest')[source]

Base class for writing Resolwe API tests.

It is based on TransactionResolweAPITestCase and Django’s TestCase. The latter encloses the test code in a database transaction that is rolled back at the end of the test.

Resolwe Test Helpers and Decorators

resolwe.test.utils.check_installed(command)[source]

Check if the given command is installed.

Parameters:command (str) – name of the command
Returns:(indicator of the availability of the command, message saying command is not available)
Return type:tuple(bool, str)
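A (bool, str) return value of this shape fits the (condition, reason) arguments of unittest.skipUnless, so one plausible use is skipping tests when a tool is missing. The sketch below is an assumption-laden illustration: check_installed_sketch is a hypothetical stand-in built on shutil.which, and the command name is deliberately one that should not exist.

```python
import shutil
import unittest


def check_installed_sketch(command):
    """Return (is_available, message) for a command looked up on PATH."""
    if shutil.which(command) is None:
        return False, "Command '{}' is not found.".format(command)
    return True, ""


class ToolTest(unittest.TestCase):
    # The tuple unpacks into skipUnless(condition, reason).
    @unittest.skipUnless(*check_installed_sketch("no-such-command-0123"))
    def runTest(self):
        self.fail("never reached: the test is skipped")


result = unittest.TestResult()
ToolTest().run(result)
```

Because the command is unavailable, the test is recorded as skipped with the returned message as the reason, rather than as a failure.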
resolwe.test.utils.check_docker()[source]

Check if Docker is installed and working.

Returns:(indicator of the availability of Docker, reason for unavailability)
Return type:tuple(bool, str)
resolwe.test.utils.with_custom_executor(wrapped=None, **custom_executor_settings)[source]

Decorate unit test to run processes with a custom executor.

Parameters:custom_executor_settings (dict) – custom FLOW_EXECUTOR settings with which you wish to override the current settings
resolwe.test.utils.with_docker_executor(wrapped=None)[source]

Decorate unit test to run processes with the Docker executor.

resolwe.test.utils.with_null_executor(wrapper=None, enabled=None, adapter=None)[source]

Decorate unit test to run processes with the Null executor.

resolwe.test.utils.with_resolwe_host(wrapper=None, enabled=None, adapter=None)[source]

Decorate unit test to give it access to a live Resolwe host.

Set the RESOLWE_HOST_URL setting to the address on which the testing live Resolwe host listens.

Note

This decorator must be used with a (sub)class of LiveServerTestCase which starts a live Django server in the background.

resolwe.test.utils.is_testing()[source]

Return current testing status.

This assumes that the Resolwe test runner is being used.

Resolwe Utilities

class resolwe.utils.BraceMessage(fmt, *args, **kwargs)[source]

Log messages with the new {}-string formatting syntax.

Note

When using this helper class, one pays no significant performance penalty since the actual formatting only happens when (and if) the logged message is actually outputted to a log by a handler.

Example of usage:

from resolwe.utils import BraceMessage as __

logger.error(__("Message with {0} {name}", 2, name="placeholders"))

Source: https://docs.python.org/3/howto/logging-cookbook.html#use-of-alternative-formatting-styles.
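The deferred formatting mentioned in the note can be demonstrated with a small stand-in class modelled on the pattern from the logging cookbook. BraceMessageSketch and its format_calls counter are hypothetical and exist only to make the laziness observable; they are not part of resolwe.utils.

```python
import io
import logging


class BraceMessageSketch:
    """Defer str.format() until the message is converted to a string."""

    def __init__(self, fmt, *args, **kwargs):
        self.fmt = fmt
        self.args = args
        self.kwargs = kwargs
        self.format_calls = 0  # bookkeeping for this demonstration only

    def __str__(self):
        self.format_calls += 1
        return self.fmt.format(*self.args, **self.kwargs)


stream = io.StringIO()
logger = logging.getLogger("brace_message_sketch")
logger.addHandler(logging.StreamHandler(stream))
logger.setLevel(logging.ERROR)
logger.propagate = False

dropped = BraceMessageSketch("Message with {0} {name}", 2, name="placeholders")
logger.debug(dropped)  # below the logger's level, so it is never formatted

emitted = BraceMessageSketch("Message with {0} {name}", 2, name="placeholders")
logger.error(emitted)  # formatted only when the handler writes the record
```

The debug message is discarded before any formatting takes place, while the error message is formatted when the handler emits it.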