PyMPX.queuing module - reference

Queuing - a solution to an ETL problem

Extract Transform and Load (ETL) processes can take a long time to run. Because the data that ETL processes receive can have subtle inconsistencies, and because the servers that ETL processes run on can be capricious, ETL code can and does fail. Thus there is a real need for tools for writing quick, robust ETL code.

One such method is processing data in parallel, and one of the best ways of running code in parallel, is to put work on a queue for multiple processors to pick up and run. Unfortunately, queues are usually designed to run fast, and in memory, and if a process or entire server fails, our only option is to go back and start again.

So there is a requirement for robust queues, that can pick up work part way through, after a failure has occurred.

This module contains various Queues we’ve used at Metapraxis (https://www.metapraxis.com) to overcome some of the ETL challenges we have faced, when loading data into Empower.

class pympx.queuing.PersistentQueue(pickup_file_prefix=None, current_pickup_dir=None, previous_pickup_dir=None, formatter_list=[<function PersistentQueue.return_same>], converter_list=[<function PersistentQueue.return_same>], number_of_workers=1, use_queue=True)[source]

The PersistentQueue behaves like a normal multiprocessing queue, only it records the messages passed to it, so that it can requeue work after a failure.

By maintaining a list of messages in files we can pick up unprocessed work when the processing fails mid way through. The files are kept as a pair, the put_file and the done_file that are kept in a new directory for each procesing in run. The put file records work put on to the queue and the done file records work which has been done. After a failure the difference between the files shows outstanding work, and there are methods in this class to requeue that work.

Attributes
done_file

done_file is the open file that records work on the queue which has had task_done() called on it

put_file

The put_file is the open file that records work put() on the queue

Methods

close()

As a producer, close the queue for new inputs, and place appropriate DONE messages on the queue, so that consumers know that all work has been done, and they can stop.

dispose()

As a consumer, stop using the queue, and crucially, close the done_file, which records successful work completion

fail()

Close the queue for new inputs, and place appropriate FAILED messages on the queue, so that consumers know that an upstream process has failed, and they can stop.

get([block, timeout])

Remove and return an item from the queue.

get_list_of_outstanding_work()

Return a list of items on the queue which had not been completed when this queue was last created.

get_nowait()

Equivalent to get(False).

mark_all_work_done()

Mark all of the work in the queue as done, for the purposes of picking up from a failed laod.

put(obj[, block, timeout])

Put item into the queue.

put_nowait(obj)

Equivalent to put(item, False).

requeue_unprocessed_work()

Requeue any work that was not processed during the previous (failed) load.

return_same()

Default message formatter, used instead of lambda x:x because queues need to be pickled, and lambdas cannot be pickled

task_done(obj)

Write the object to the pickups complete file, then call task_done() on the underlying queue.

__init__(pickup_file_prefix=None, current_pickup_dir=None, previous_pickup_dir=None, formatter_list=[<function PersistentQueue.return_same>], converter_list=[<function PersistentQueue.return_same>], number_of_workers=1, use_queue=True)[source]

Create a multiprocessing Queue, and also create files which record work that has been requested and completed

The pickups directories hold files that allow us to requeue work if there is a failure. The files are groups of _requested and _completed files, which, when read in total, can tell us how much work remains outstanding.

By calling requeue_unprocessed_work() on this queue, unfinished work can be re-queued when a failed process is restarted.

Parameters
  • current_pickup_dir – The directory that will hold pickup files from this run of the process

  • previous_pickup_dir – The directory that will hold pickup files from the previous run of this process

  • pickup_file_prefix – The common prefix for pickup requested and completed files

  • formatter_list – list of functions which will format the objects which are in a message into a string for writing to the pickups file

  • converter_list – list of functions which will convert the strings in the pickups file into an object for a message

  • number_of_workers – how many workers will be pulling work off the queue - we need to know this so we can put the correct number of done messages on the queue

  • use_queue – If set to false we requeue using only the pickups files and not actual queue - this queue can’t be used for multiple simultaneous processes. All work must be written before any work is done

close()[source]

As a producer, close the queue for new inputs, and place appropriate DONE messages on the queue, so that consumers know that all work has been done, and they can stop.

dispose()[source]

As a consumer, stop using the queue, and crucially, close the done_file, which records successful work completion

property done_file

done_file is the open file that records work on the queue which has had task_done() called on it

fail()[source]

Close the queue for new inputs, and place appropriate FAILED messages on the queue, so that consumers know that an upstream process has failed, and they can stop.

get(block=True, timeout=None)[source]

Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Prior to 3.0 on POSIX systems, and for all versions on Windows, if block is true and timeout is None, this operation goes into an uninterruptible wait on an underlying lock. This means that no exceptions can occur, and in particular a SIGINT will not trigger a KeyboardInterrupt.

get_list_of_outstanding_work()[source]

Return a list of items on the queue which had not been completed when this queue was last created. The queue uses the files in previous_pickup_dir to compute work that needs to be redone.

get_nowait()[source]

Equivalent to get(False).

mark_all_work_done()[source]

Mark all of the work in the queue as done, for the purposes of picking up from a failed laod.

We need to mark all the work as done at once if it is done as a logical whole. This function calls task_done() on all of the messages left in the queue.

put(obj, block=True, timeout=None)[source]

Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If `timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

property put_file

The put_file is the open file that records work put() on the queue

put_nowait(obj)[source]

Equivalent to put(item, False).

requeue_unprocessed_work()[source]

Requeue any work that was not processed during the previous (failed) load. Only do this after this queue has been sent to child processes

return_same()[source]

Default message formatter, used instead of lambda x:x because queues need to be pickled, and lambdas cannot be pickled

task_done(obj)[source]

Write the object to the pickups complete file, then call task_done() on the underlying queue. This will indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

class pympx.queuing.WritableFolderQueue(path, message_extension='.tsv', task_done_directory=None)[source]

A queue which uses a folder (directory) to store its messages. This queue is designed to be used in conjunction with a ReadableFolderQueue or ReadableGroupedFileFolderQueue. This version of the queue is used by the process putting work on to the queue. During processing, or after a failure, it is possible to inspect the directories that the data files reside in, to determine the state of a process which uses the queues. Messages are the data files that need to be processed. Queue like behaviour is maintained by renaming of files when getting them off of the queue.

Methods

close()

As a producer, close the queue for new inputs, and place appropriate DONE messages on the queue, so that consumers know that all work has been done, and they can stop.

dispose()

As a consumer, stop using the queue.

fail()

Close the queue for new inputs, and place appropriate FAILED messages on the queue, so that consumers know that an upstream process has failed, and they can stop.

original_filename_from_wip(msg)

The original name of a work-in-progress file for a given file name on this queue.

purge()

Remove all messages and files from the queue.

purge_control_messages()

Remove old control messages from previous runs

put(msg[, copy])

Put a message on the queue.

reset()

Remove old control messages from previous runs, reset partially processed files to their original names

reset_processing_files()

Reset partially processed files to their original names

sorting_function(filename)

Apply this queue's sorting function to the filename, returning the string that will be sorted in order to determine which file is processed first

wip_filename(msg)

The path to a work-in-progress file for a given file name on the queue.

__init__(path, message_extension='.tsv', task_done_directory=None)[source]

Create an instance of a WritableFolderQueue

Parameters
  • path – Directory where files put onto the queue, and control messages will be stored. Must match the directory specified in related ReadableFolderQueue objects

  • message_extension – Files with this extension placed into the queue will be considered messages to be processed

  • task_done_directory – A directory where files are moved to when task_done() is called on a processed file. If this directory is not set, then processed files must be moved by teh calling process.

close()[source]

As a producer, close the queue for new inputs, and place appropriate DONE messages on the queue, so that consumers know that all work has been done, and they can stop.

dispose()

As a consumer, stop using the queue. Currently this method does nothing, and merely keeps the queue interface aligned, but may be implemented in future if it is needed for orderly queue shut down.

fail()[source]

Close the queue for new inputs, and place appropriate FAILED messages on the queue, so that consumers know that an upstream process has failed, and they can stop.

original_filename_from_wip(msg)

The original name of a work-in-progress file for a given file name on this queue. I.e. The name of the file before it was picked up for processing.

Parameters

msg – File name or file path of the message file, after it was renamed to a work-in-progress file

purge()[source]

Remove all messages and files from the queue. Files and directories in the queue folder will be deleted. Permanently.

purge_control_messages()[source]

Remove old control messages from previous runs

put(msg, copy=False)[source]

Put a message on the queue. The message should be a valid path to file. The file will be renamed and moved or copied into the queue folder.

Parameters
  • msg – The source file path. The file will be put onto the queue with some added metadata in the filename

  • copy – the default behaviour is to move the file into the directory queue. If copy is True, then the file will be copied in, and the original left in place.

reset()[source]

Remove old control messages from previous runs, reset partially processed files to their original names

reset_processing_files()[source]

Reset partially processed files to their original names

sorting_function(filename)

Apply this queue’s sorting function to the filename, returning the string that will be sorted in order to determine which file is processed first

wip_filename(msg)

The path to a work-in-progress file for a given file name on the queue.

Parameters

msg – Original file name or file path of the message file, before it is renamed to a work-in-progress file

class pympx.queuing.ReadableFolderQueue(path, message_extension='.tsv', task_done_directory=None, sorting_function=<function _FolderQueue.<lambda>>)[source]

A queue which uses a folder (directory) to store its messages. This queue is designed to be used in conjunction with a WritableFolderQueue. This version of the queue is used by the process getting work off of the queue. During processing, or after a failure, it is possible to inspect the directories that the data files reside in, to determine the state of a process which uses the queues. Messages are the data files that need to be processed. Queue like behaviour is maintained by renaming of files when getting them off of the queue.

Methods

dispose()

As a consumer, stop using the queue.

get([timeout])

Return the file name of the next file to process.

mark_all_work_done()

Mark all of the work in the queue as done, for the purposes of pickups

original_filename_from_wip(msg)

The original name of a work-in-progress file for a given file name on this queue.

sorting_function(filename)

Apply this queue's sorting function to the filename, returning the string that will be sorted in order to determine which file is processed first

task_done(obj)

Mark a completed task as done, by removing the work in progress file

wip_filename(msg)

The path to a work-in-progress file for a given file name on the queue.

__init__(path, message_extension='.tsv', task_done_directory=None, sorting_function=<function _FolderQueue.<lambda>>)
Parameters
  • path – Directory which files for this process get put into

  • message_extension – Files with this extension will be treated as messages

  • task_done_directory – Completed files will be moved to his directory when task_done is called on the message (i.e. called with the file_path returned by .get())

  • sorting_function – function used sorting the file names of the files in the queue. This function will be applied to each file on the queue and must return a string. The strings will then be sorted in order to determine which file is processed first. Remember that files not on the queue yet will not be involved in the sort. The sorting function must catch errors and return None if there has been an error. The default is to return the same filename that was passed in, resulting in files being processed in name order.

dispose()

As a consumer, stop using the queue. Currently this method does nothing, and merely keeps the queue interface aligned, but may be implemented in future if it is needed for orderly queue shut down.

get(timeout=None)[source]

Return the file name of the next file to process. The message file will be renamed and remain in the folder, and the new name will be returned.

Parameters

timeout – Time in milliseconds to wait for a message on an empty queue before raising an Empty exception.

mark_all_work_done()[source]

Mark all of the work in the queue as done, for the purposes of pickups

We would need to mark all the work as done at once if the work should be done as a logical whole. This function calls task_done() on all of the messages left in the queue

original_filename_from_wip(msg)

The original name of a work-in-progress file for a given file name on this queue. I.e. The name of the file before it was picked up for processing.

Parameters

msg – File name or file path of the message file, after it was renamed to a work-in-progress file

sorting_function(filename)

Apply this queue’s sorting function to the filename, returning the string that will be sorted in order to determine which file is processed first

task_done(obj)[source]

Mark a completed task as done, by removing the work in progress file

Parameters

obj – a dictionary containing a header, or uuid key, or a uuid.uuid1() instance or string representing a uuid1, refering to a message on the queue

wip_filename(msg)

The path to a work-in-progress file for a given file name on the queue.

Parameters

msg – Original file name or file path of the message file, before it is renamed to a work-in-progress file

class pympx.queuing.ReadableGroupedFileFolderQueue(path, message_extension='.tsv', task_done_directory=None, grouping_function=<function ReadableGroupedFileFolderQueue.<lambda>>, item_function=<function ReadableGroupedFileFolderQueue.<lambda>>, list_of_items_in_group=None)[source]

An abstraction over a folder that is being used for work passing between processes.

The functionality it has beyond that of a ReadableFolderQueue is that it returns sets of files. Sometimes the logical unit of work is a set of files (for instance covering all of the units of a business) and our process will need to wait until all of the files have been created before processing that list of files at once.

Get returns a list of file names (i.e. of the renamed files - ready for use) or DONE or FAILED

Methods

dispose()

As a consumer, stop using the queue.

get([timeout])

Return the file names of the next group of files to process in a list.

mark_all_work_done()

Mark all of the work in the queue as done, for the purposes of pickups

original_filename_from_wip(msg)

The original name of a work-in-progress file for a given file name on this queue.

sorting_function(filename)

Apply this queue's sorting function to the filename, returning the string that will be sorted in order to determine which file is processed first

task_done(obj)

Mark a completed task as done, by removing the work in progress file

wip_filename(msg)

The path to a work-in-progress file for a given file name on the queue.

__init__(path, message_extension='.tsv', task_done_directory=None, grouping_function=<function ReadableGroupedFileFolderQueue.<lambda>>, item_function=<function ReadableGroupedFileFolderQueue.<lambda>>, list_of_items_in_group=None)[source]

Create an instance of a ReadableGroupedFileFolderQueue

Parameters
  • path – Directory which files for this process get put into

  • message_extension – Files with this extension will be treated as messages

  • number_of_workers – The number of workers who will be taking messages off of this queue. By knowing the number of workers we can put enough DONE messages on the queue for them all

  • grouping_function – A function, that when applied to a filename will get the string that represents the group of files

  • item_function – A function, that when applied to a filename will get the string that represents the item in the group. This item must be present in the list of items that is parameter list_of_items_in_set

  • list_of_items_in_group – a list of strings with all of the items that make up a complete group of files. When get() is called the Queue will be searched for a full set of files (that return the same string from the grouping function and the fulle set of strings using the item function)

  • task_done_directory – Completed files will be moved to his directory when task_done is called on the message (i.e. called with the file_path returned by .get())

dispose()

As a consumer, stop using the queue. Currently this method does nothing, and merely keeps the queue interface aligned, but may be implemented in future if it is needed for orderly queue shut down.

get(timeout=None)[source]

Return the file names of the next group of files to process in a list. The message files will be renamed and remain in the folder, and the new names will be returned in the list.

Parameters

timeout – Time in milliseconds to wait for a message on an empty queue before raising an Empty exception.

mark_all_work_done()

Mark all of the work in the queue as done, for the purposes of pickups

We would need to mark all the work as done at once if the work should be done as a logical whole. This function calls task_done() on all of the messages left in the queue

original_filename_from_wip(msg)

The original name of a work-in-progress file for a given file name on this queue. I.e. The name of the file before it was picked up for processing.

Parameters

msg – File name or file path of the message file, after it was renamed to a work-in-progress file

sorting_function(filename)

Apply this queue’s sorting function to the filename, returning the string that will be sorted in order to determine which file is processed first

task_done(obj)

Mark a completed task as done, by removing the work in progress file

Parameters

obj – a dictionary containing a header, or uuid key, or a uuid.uuid1() instance or string representing a uuid1, refering to a message on the queue

wip_filename(msg)

The path to a work-in-progress file for a given file name on the queue.

Parameters

msg – Original file name or file path of the message file, before it is renamed to a work-in-progress file