SolrClient.IndexQ module

Really simple filesystem-based queue for decoupling data retrieval/processing from indexing into Solr. All in all, this module is nothing special, but I suspect a lot of people write something similar, so maybe it will save you some time.

For example, let's say you are working with a data processing task that exhibits the following:

  • Outputs a large number of items
  • These items are possibly small
  • You want to process them as fast as possible
  • They don’t have to be indexed right away

Log parsing is a good example: if you wanted to parse a log file and index that data into Solr, you would not send an individual update request for each line; instead you would aggregate lines into something more substantial. This becomes especially important if you are parsing log files with some parallelism.

This is the issue that this submodule resolves. It allows you to create a quick filesystem-based queue of items and then index them into Solr later. It also maintains an internal buffer, adding items to it until a specified size is reached before writing them out to the filesystem.

Here is a really basic example to illustrate the concept:

>>> from SolrClient import SolrClient, IndexQ
>>> index = IndexQ('.','testq',size=1)
>>> index.add({'id':'test1'})
17 #By default it returns the buffer offset
>>> index.get_all_as_list()
[]
>>> index.add(finalize=True)
'./testq/todo/testq_2015-10-20-19-7-58-5219.json' #If file was written it will return the filename
>>> index.get_all_as_list()
['./testq/todo/testq_2015-10-20-19-7-58-5219.json']
>>> solr = SolrClient('http://localhost:7050/solr')
>>> index.index(solr,'SolrClient_unittest')

Note that you don’t have to track the output of the add method; it is just there to give you a better idea of what is happening. You can also pass threads to the index method to run this faster; by default it uses one thread. There is also some logging to help you follow what it is doing.

class SolrClient.IndexQ(basepath, queue, compress=False, compress_complete=False, size=0, devel=False, threshold=0.9, log=None, rotate_complete=None, **kwargs)

The IndexQ submodule will help with indexing content into Solr. It can be used to de-couple data processing from indexing.

Each queue is set up with the following directory structure under queue_name/:

  • todo/
  • done/

Items get saved to the todo directory and once an item is processed it gets moved to the done directory. Items are processed in chronological order.
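
To make that lifecycle concrete, here is a minimal sketch (the queue name, paths, and document contents are made up for illustration) that flushes one item out to todo/ and then marks it complete so the file moves to done/:

import os
from SolrClient import IndexQ

index = IndexQ('.', 'testq', size=1)
index.add({'id': 'doc1'})               # buffered in memory
todo_file = index.add(finalize=True)    # flush the buffer; returns the new file path

print(os.listdir('./testq/todo'))       # the flushed file now sits in todo/
index.complete(todo_file)               # mark it processed; the file moves to done/
print(os.listdir('./testq/done'))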

add(item=None, finalize=False, callback=None)

Takes a string, dictionary, or list of items to add to the queue. To help with troubleshooting, it returns the updated buffer size; however, when the buffered content gets written out it returns the file path of the new file instead. Generally this return value can be safely discarded.

Parameters:
  • item (<dict,list>) – Item to add to the queue. If a dict, it will be wrapped in a list and then converted to JSON. A list must be a list of dictionaries. If a string is submitted, it will be written out as-is immediately and not buffered.
  • finalize (bool) – If items are buffered internally, flush them to disk and return the file name.
  • callback – A callback function that will be called when the item gets written to disk. It is passed one positional argument, the path of the file that was written (see the sketch after this parameter list). Note that errors raised in the callback will not be re-raised here.
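
For example, a rough sketch of the callback hook (the log_written function and the document below are made up for illustration, and this assumes the callback fires once the buffered content is actually flushed to disk):

from SolrClient import IndexQ

def log_written(filepath):
    # illustrative callback: just report which file IndexQ wrote
    print('IndexQ wrote {}'.format(filepath))

index = IndexQ('.', 'testq', size=1)
index.add({'id': 'doc1'}, finalize=True, callback=log_written)  # flushes to disk and passes the new file path to log_written
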
complete(filepath)

Marks the item as complete by moving it to the done directory and optionally gzipping it.

get_all_as_list(dir='_todo_dir')

Returns a list of the full paths to all items currently in the todo directory. The items are listed in ascending order based on filesystem time. The directory is re-scanned on each execution.

Do not use this to process items; this method should only be used for troubleshooting or something auxiliary. To process items, use the get_todo_items() iterator.

get_all_json_from_indexq()

Gets all data from the todo files in the queue and returns it as one large list.
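
For example, a quick sanity check of everything currently sitting in the queue might look like this (the queue name is a placeholder):

from SolrClient import IndexQ

index = IndexQ('.', 'testq')
docs = index.get_all_json_from_indexq()
print('{} documents currently queued'.format(len(docs)))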

get_multi_q(sentinel='STOP')

This helps IndexQ operate in a multiprocessing environment without each process having to have its own IndexQ. It is also a handy way to deal with thread / process safety.

This method will create and return a JoinableQueue object. Additionally, it will kick off a background process that monitors the queue, de-queues items, and adds them to this IndexQ.

The returned JoinableQueue object can be safely passed to multiple worker processes to populate it with data.

To indicate that you are done writing the data to the queue, pass in the sentinel value (‘STOP’ by default).

Make sure you call join_indexer() after you are done to close out the queue and join the worker.
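
A rough multiprocessing sketch follows; the worker function and document payloads are made up for illustration, and it assumes the sentinel is sent once from the parent after all workers finish:

from multiprocessing import Process
from SolrClient import IndexQ

def worker(queue, start):
    # produce some fake documents and hand them to the shared queue
    for i in range(start, start + 100):
        queue.put({'id': 'doc{}'.format(i)})

if __name__ == '__main__':
    index = IndexQ('.', 'testq', size=1)
    q = index.get_multi_q()              # JoinableQueue plus a background listener
    procs = [Process(target=worker, args=(q, n * 100)) for n in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    q.put('STOP')                        # sentinel: no more data is coming
    index.join_indexer()                 # close out the queue and join the listener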

get_todo_items(**kwargs)

Returns an iterator that will provide each item in the todo queue. Note that to complete each item you have to call the complete method with the output of this iterator.

That will move the item to the done directory and prevent it from being retrieved in the future.
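
A minimal processing loop might look like the sketch below; it assumes the queue was created without compression, so each todo file is a plain JSON list of documents, and the queue name is a placeholder:

import json
from SolrClient import IndexQ

index = IndexQ('.', 'testq')
for filepath in index.get_todo_items():
    with open(filepath) as f:
        docs = json.load(f)              # each file holds a JSON list of documents
    print('processing {} docs from {}'.format(len(docs), filepath))
    # ... send docs to Solr (or anywhere else) here ...
    index.complete(filepath)             # move the file to done/ so it is not picked up again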

index(solr, collection, threads=1, send_method='stream_file', **kwargs)

Will index the queue into the specified Solr instance and collection. Specify multiple threads to make this faster; however, keep in mind that with multiple threads the items may not be indexed in order. Example:

solr = SolrClient('http://localhost:8983/solr/')
index = IndexQ('.', 'testq')
for doc in docs:  # docs is a list of dictionaries to index
    index.add(doc, finalize=True)
index.index(solr, 'SolrClient_unittest')
Parameters:
  • solr (object) – SolrClient object.
  • collection (string) – The name of the collection to index documents into.
  • threads (int) – Number of simultaneous threads to spin up for indexing.
  • send_method (string) – SolrClient method to execute for indexing. Default is stream_file.
join_indexer()