Bio and Publications

Tuesday, February 28, 2012

Interesting...

Check this out: http://hyperpolyglot.org/scripting.

It's a side-by-side comparison of PHP, Perl, Python, Ruby.

I'm not sure why, but it seems sort of cool.

Things like "offside rule" to describe indentation in Python are confusing at first.  The "regions which define local scope" section on Python makes precious little sense.  The "null test" is inaccurate.  The "Here Document" omits mention of the exec or eval functions.

The "arrays" and "dictionaries" are merely a subset of the built-in structures in Python.  I guess it's tedious to enumerate all the Python features which are lacking from other languages.  Passing numbers or strings by reference in Python is described as "not possible" when actually it's "the only way"; except strings and numbers are immutable, so the distinction is important.

The "C-style for" omits for i in range(10) as an equivalent.  The file-handling section omits mentioning the with statement.

More importantly, the very idea of side-by-side comparison is flawed.  Python features like generator functions, list comprehensions and context managers can't easily be displayed in a chart like this because they don't have trivial mappings to other languages.
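
For example, a generator function and a list comprehension don't reduce to a single chart cell in most of the other columns.  A minimal illustration:

def evens( limit ):
    """Generator function: lazily yields the even numbers below limit."""
    for n in range( limit ):
        if n % 2 == 0:
            yield n

squares = [ n*n for n in evens(10) ]     # a list comprehension consuming it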

The languages are actually different and no simple compiler can translate among them.  That means the side-by-side chart must be misrepresenting each language in small (but important) ways.

Still.  It's very thorough and covers a lot of territory.

Thursday, February 23, 2012

Is Django Suitable?

I got a long list of requirements from a firm that's looking to build a related family of web sites.  They were down to a Django vs. Ruby-on-Rails decision.

As you can see, they've done their homework in thinking through their needs.

I grouped their "high-level requirements" into several categories.  I summarized the fit with Django here, and provided the details separately.
  • Authentication.  Django supports flexible logins and Python makes it easy to adapt other security API's.  Django and Python assure that this is a solid 10.  
  • Shared Code.  This is handled through Python features that are central to the Django framework.  Shared code management -- with appropriate overrides and customization -- is part of Python and a 10. 
  • Database Access.  While Django provides the necessary access features, database scalability depends on the implementation of the database engine itself.  There are numerous parallelization features that must all be used to maximize database throughput.  Even though the real responsibility for performance is outside Django, the Django flexibility results in a 10.
  • AJAX and Javascript.  Django supports the necessary RESTful API's.  However, Django treats JavaScript as simple static content, offering little specific support.  Since JavaScript support is not an essential part of Django, perhaps this is only a 5.
  • Applications.  The various applications described in the requirements are more-or-less irrelevant to Django.  They can be built easily, but are not first-class features of Django.  In the sense of easy-to-develop, this is a solid 10.  In the sense of already-existing-applications, this may be a 5 if the applications are part of a community like Pinax.  Because the applications do not already exist, this may also be a 0.
  • API.  Python allows use of any API.  Django's transparent use of Python makes it easy to build API's.  This is a feature of Python and scores 10 out of 10.
  • Usability and Developer Skills.  Django's ease-of-use is a direct consequence of the Python programming language. The developers of Django make excellent use of the Python language,  giving this a 10.
  • Performance, Access and Scalability.  For the most part, Django performance comes from consideration of the purpose of all layers of the architecture.  Principal design features include keeping static content separate from dynamic content (reducing Django's workload), and optimizing the database (to handle concurrent access well).  Django provides several internal design features to minimize memory use.  Django encourages proper separation of concerns, giving a 10.
In each of these areas, it's possible to dive into considerable depth.  It was tempting to offer up proof-of-concept code for some of the questions.
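
For example, the authentication requirement is largely a matter of decorating views.  Here's a hypothetical proof-of-concept sketch -- the view name and the JSON payload are invented, not part of the requirements.

import json
from django.contrib.auth.decorators import login_required
from django.http import HttpResponse

@login_required
def profile_summary( request ):
    """Only authenticated users reach this view; others are redirected to login."""
    data = { 'username': request.user.username, 'email': request.user.email }
    return HttpResponse( json.dumps(data), content_type="application/json" )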

Tuesday, February 21, 2012

MADExpo

Looks like fun: Mid-Atlantic Developer Expo.

June 27-29 2012 in Hampton, VA.

Got a nice reminder through the 757 Labs mailing list.

Community is a good thing.

Thursday, February 16, 2012

The Estimation Charade

"In reality, most projects worth doing are not repetitions of previous things."

Thank you for that.  

If it has been done before -- same problem -- same technology -- then we should be able to clone that solution and avoid creating a software development project.  If there's something novel -- new problem -- new technology -- then we can't easily predict the whole sweep of software development effort.

The whole Estimation Charade is an artifact of the way accountants exercise control over the finances.  They require (with the force of law) that budgets be written in advance.  Often in advance of the requirements being known.  When we sit down to fabricate next year's budget, we're dooming some fraction of next year's projects to a scramble for funding leading to cancellation and failure.

Accountants further require that software development be arbitrarily partitioned into "capital" and "expense".  There's no rational distinction between the phases.  The nature and scope of the work don't change at all.  

Yet.  

Somehow, the accountants are happy because some capital budget has been spent (as planned 18 months ago) and now we're spending expense budget.  From an accounting perspective, some kind of capital asset has been created.  

Think of it.  Some lines of code are a capital asset.  Other lines of code are an expense.  

Someday, I'll have to ask some accountants to explain how I can tell which was which.

Tuesday, February 14, 2012

TDRE - Test Driven Reverse Engineering Case Study

Background

Read up on compass variation or declination.  For example, this NOAA site provides some useful information.

Mariners use the magnetic variation to compute the difference between True north (i.e., aligned with the grid on the chart) and Magnetic north (i.e., aligned with the compass.)

The essential use case here is "What's the compass variation at a given point?"  The information is printed on paper charts, but it's more useful to simply calculate it.

There are two magnetic models: the US Department of Defense World Magnetic Model (WMM) and the International Association of Geomagnetism and Aeronomy (IAGA) International Geomagnetic Reference Field (IGRF).

A packaged solution is geomag7.0.  This includes both the WMM and the IGRF models.  This is quite complex.  However, it does have "sample output", which amounts to a set of unit test cases.

The essential spherical harmonic model is available separately as a small Fortran program, igrf11.f.

Which leads us to reverse engineering this program into Python.

TDRE Approach

The TDRE approach requires having some test cases to drive the reverse engineering process toward some kind of useful results.

The geomag7.0 package includes two "Sample Output" files that have the relevant unit test cases.  The file has column headings and 16 test cases.  This leads us to the following outline for the unit test application.


    import csv
    import math
    import sys
    import unittest

    # Globals assumed by this test module: 'details' is sys.stderr or None, and
    # 'sample_output' names the geomag7.0 "Sample Output" file.

    class Test_Geomag( unittest.TestCase ):
        def __init__( self, row ):
            super( Test_Geomag, self ).__init__()
            self.row= row
        def runTest( self ):
            row= self.row
            if details: 
                print( "Source: {0:10s} {1} {2:7s} {3:10s} {4:10s} {5:5s} {6:5s}".format( row['Date'], row['Coord-System'], row['Altitude'], row['Latitude'], row['Longitude'], row['D_deg'], row['D_min'] ),
                file=details )
            
            date= self.parse_date( row['Date'] )
            lat= self.parse_lat_lon( row['Latitude'] )
            lon= self.parse_lat_lon( row['Longitude'] )
            alt= self.parse_altitude(row['Altitude'] )
            
            x, y, z, f = igrf11syn( date, lat*math.pi/180, lon*math.pi/180, alt, coord=row['Coord-System'] )
            D = 180.0/math.pi*math.atan2(y, x) # Declination 

            deg, min = deg2dm( D )
            
            if details: 
                print( "Result: {0:10.5f} {1} K{2:<6.1f} {3:<10.3f} {4:<10.3f} {5:5s} {6:5s}".format( date, row['Coord-System'], alt, lat, lon, str(deg)+"d", str(min)+"m" ), 
                    file=details )
                print( file=details )
            
            self.assertEqual( row['D_deg'], "{0}d".format(deg) )
            self.assertEqual( row['D_min'], "{0}m".format(min) )

    def suite():
        s= unittest.TestSuite()
        with open(sample_output,"r") as expected:
            rdr= csv.DictReader( expected, delimiter=' ', skipinitialspace=True )
            for row in rdr:
                case= Test_Geomag( row )
                s.addTest( case )
        return s

    r = unittest.TextTestRunner(sys.stdout)
    result= r.run( suite() )
    sys.exit(not result.wasSuccessful())


The Test_Geomag class does two things.  First, it parses the source values to create a usable test case.  We've omitted the parsers to reduce clutter.  Second, it produces details to help with debugging.  This is reverse engineering, and there's lots of debugging.  It depends on a global variable, details, which is either set to sys.stderr or None.

This suite() function builds a suite of test cases from the input file.

The unit under test isn't obvious, but there's a call to the igrf11syn() function where the important work gets done.  We can start with this.


def igrf11syn( date, nlat, elong, alt=0.0, coord='D' ):
    # Stub: returns no field components; every test fails until the port is done.
    return None, None, None, None


This lets us run the tests and find that we have work to do.

Reverse Engineering

The igrf11.f Fortran code contains the IGRF11SYN "subroutine" that does the work we want.  The geomag7.0 package has a function called shval3 which is essentially the same thing.

Both are implementations of the same underlying "13th order spherical harmonic series" or a "truncated series expansion".

The Fortran code contains numerous Fortran "optimizations".  These are irritating hackarounds for actual (and perceived) limitations of Fortran.  They fall into two broad classes.

  • Hand Optimizations.  All repeated expressions were manually hoisted out of their context.  This is clever but makes the code particularly obscure.  It doesn't help when local variables are named ONE, TWO and THREE.  Bad as it is, not much needs to be done about this.  Python code looks a bit like Fortran code, so very little needs to be done except add `math.` to the various function calls like sqrt, cos and sin.
  • Sparse Array Chicanery.  There are actually two spherical harmonic series: the older 10th-order and the newer 13th-order.  Each model has two sets of coefficients: g and h.  These form two half-matrices plus a vector.  The old models have 55 g values in one matrix, 55 h values in a second matrix, and a set of 10 more g values that form a vector; 120 values.  The new models have 91 g, 91 h and 13 g in the extra vector; 195 values.  There are 23 sets of these coefficients (for 1900, 1905, ... 2015).  The worst case is 23×195=4,485 values.  This appears to be too much memory, so the two matrices and vectors are optimized into a single opaque collection of 3,256 numbers and a delightfully complex set of index calculations.
Phase 1.  Do the smallest "literal" transformation of Fortran to Python.

This means things like this:
  • Transforming the subroutine into a Python function with multiple return values.
  • Reasoning out the overall "steps".  There's a bunch of setup followed by the essential series calculation followed by some final calculations.
  • Locating and populating the global variables.
  • Reformatting the if statements.
  • Removing the GOTO's.  Either make them separate functions or properly nest the code.
  • Reformatting the do loop.
  • Handling the 1-based indexing.  In almost all cases, Fortran "arrays" are best handled as Python dictionaries (not lists).  
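
Here's a minimal sketch of that last point.  The values are placeholders, not real IGRF data; the point is that the Fortran index arithmetic can be copied without off-by-one adjustments.

p = { 1: 1.0, 2: 0.5 }              # Fortran: DIMENSION P(105)
for k in range( 3, 11 ):            # Fortran: DO 10 K = 3, 10
    p[k] = 0.5*p[k-1] + p[k-2]      # hypothetical recurrence, for illustration only
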
Once this is done, there are some remaining special-case discrepancies.  Most of these are tacit assumptions about the problem domain that turn out to be untrue.  For example, the Geodetic, Geocentric features seemed needless.  However, they're not handled trivially, and need to be left in place.  Also, conversion of signed values in radians to degrees and minutes isn't trivial. 
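
For instance, something like deg2dm has to decide how the sign is carried.  This sketch is one plausible reading, not necessarily the convention the geomag sample output uses.

def deg2dm( angle ):
    """Convert a signed angle in degrees to whole degrees and minutes."""
    sign = -1 if angle < 0 else 1
    magnitude = abs( angle )
    d = int( magnitude )
    m = int( round( (magnitude - d) * 60 ) )
    if m == 60:                      # rounding can spill into the next degree
        d, m = d + 1, 0
    return sign * d, sign * m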

This leads to passing all 16 unit tests with the single opaque collection of 3,256 numbers and delightfully complex set of index calculations. 

Phase 2.  Optimize so that it makes some sense in Python.

This involves unwinding the index calculations to simplify the array.  The raw coefficients are available (igrf11coeffs.txt) and they have a sensible structure that separates the two matrices very cleanly.  The code uses the combined matrix (called gh) in a very few places.  The index calculations aren't obvious at all, but a few calls to print reveal how the matrix is accessed.  

Given (1) unit tests that already work and (2) the pattern of access, it's relatively easy to hypothesize a dictionary by year that contains a pair of simple dictionaries, g[n,m] and h[n,m], for the coefficients.
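
A minimal sketch of that hypothesized structure -- the names and layout here are my assumptions; the real values come from igrf11coeffs.txt.

from collections import defaultdict

# coefficients[year] -> (g, h), where g[n, m] and h[n, m] are floats.
coefficients = defaultdict( lambda: (dict(), dict()) )

def set_coefficient( year, kind, n, m, value ):
    """Record one g or h coefficient for a given epoch year."""
    g, h = coefficients[year]
    (g if kind == 'g' else h)[n, m] = value

def get_gh( year, n, m ):
    """Fetch the coefficient pair the series evaluation needs."""
    g, h = coefficients[year]
    return g.get( (n, m), 0.0 ), h.get( (n, m), 0.0 )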

Cleanup and Packaging

Once the tests pass, the package -- as a whole -- needs to be made reasonably Pythonic.   In this case, it means a number of additional changes.  For example, converting the API from degrees to radians, supplying appropriate default values for parameters, providing convenience functions.

Additionally, there are Python ways to populate the coefficients neatly and eliminate global variables.  In this case, it seemed sensible to create a Callable class which could load the coefficients during construction.
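
A sketch of that packaging idea -- the class name and method signatures are illustrative, not the final API.

import math

class Declination( object ):
    """Callable wrapper: loads the coefficients once, evaluates on demand."""
    def __init__( self, coefficient_file="igrf11coeffs.txt" ):
        self.coefficients = self._load( coefficient_file )
    def _load( self, name ):
        """Parse the coefficient file; details omitted in this sketch."""
        return {}
    def __call__( self, date, nlat, elong, alt=0.0, coord='D' ):
        """Return declination in radians, delegating to the ported igrf11syn."""
        x, y, z, f = igrf11syn( date, nlat, elong, alt, coord=coord )
        return math.atan2( y, x )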

Note that there's little point in profiling to apply further optimizations.  The legacy Fortran code was already meticulously hand optimized.  

Thursday, February 9, 2012

PDF Reading

PDF files aren't pleasant.

The good news is that they're documented (http://www.adobe.com/devnet/pdf/pdf_reference.html).

The bad news is that they're rather complex.

I found four Python packages for reading PDF files.
I elected to work with PDFMiner for two reasons.  (1) Pure Python, (2) Reasonably Complete.

This is not, however, much of an endorsement.  The implementation (while seemingly correct for my purposes) needs a fair amount of cleanup.

Here's one example of remarkably poor programming.

# Connect the parser and document objects.
parser.set_document(doc)
doc.set_parser(parser)

Only one of these two is needed; the other is trivially handled as part of the setter method.
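
For example, a single setter could maintain both sides of the link.  This is an illustration only, not PDFMiner's actual code.

class SomeParser( object ):
    """Illustration: one setter keeps the parser/document link consistent."""
    def set_document( self, doc ):
        self.doc = doc
        doc.parser = self      # back-reference handled here; no second call needed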

Also, the package seems to rely on a huge volume of isinstance type checking.  It's not clear if proper polymorphism is even possible.  But some kind of filter that picked elements by type might be nicer than a lot of isinstance checks.
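
Something like this small generator would centralize the type checks; it's a sketch, not part of PDFMiner.

def items_of_type( layout, *types ):
    """Yield only the layout objects that are instances of the given types."""
    return ( obj for obj in layout if isinstance( obj, types ) )

# Usage: for box in items_of_type( device.get_result(), LTTextBoxHorizontal ): ...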

Annotation Extraction

While shabby, the good news is that PDFMiner seems to reliably extract the annotations on a PDF form.

In a couple of hours, I had this example of how to read a PDF document and collect the data filled into the form.

from pdfminer.pdfparser import PDFParser, PDFDocument
from pdfminer.psparser import PSLiteral
from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter, PDFTextExtractionNotAllowed
from pdfminer.pdfdevice import PDFDevice
from pdfminer.pdftypes import PDFObjRef
from pdfminer.layout import LAParams, LTTextBoxHorizontal
from pdfminer.converter import PDFPageAggregator

from collections import defaultdict, namedtuple

TextBlock= namedtuple("TextBlock", ["x", "y", "text"])

class Parser( object ):
    """Parse the PDF.

    1.  Get the annotations into the self.fields dictionary.

    2.  Get the text into a dictionary of text blocks.
        The key to the dictionary is page number (1-based).
        The value in the dictionary is a sequence of items in (-y, x) order.
        That is approximately top-to-bottom, left-to-right.
    """
    def __init__( self ):
        self.fields = {}
        self.text= {}

    def load( self, open_file ):
        self.fields = {}
        self.text= {}

        # Create a PDF parser object associated with the file object.
        parser = PDFParser(open_file)
        # Create a PDF document object that stores the document structure.
        doc = PDFDocument()
        # Connect the parser and document objects.
        parser.set_document(doc)
        doc.set_parser(parser)
        # Supply the password for initialization.
        # (If no password is set, give an empty string.)
        doc.initialize('')
        # Check if the document allows text extraction. If not, abort.
        if not doc.is_extractable:
            raise PDFTextExtractionNotAllowed
        # Create a PDF resource manager object that stores shared resources.
        rsrcmgr = PDFResourceManager()
        # Set parameters for analysis.
        laparams = LAParams()
        # Create a PDF page aggregator object.
        device = PDFPageAggregator(rsrcmgr, laparams=laparams)
        # Create a PDF interpreter object.
        interpreter = PDFPageInterpreter(rsrcmgr, device)

        # Process each page contained in the document.
        for pgnum, page in enumerate( doc.get_pages() ):
            interpreter.process_page(page)
            if page.annots:
                self._build_annotations( page )
            txt= self._get_text( device )
            self.text[pgnum+1]= txt

    def _build_annotations( self, page ):
        for annot in page.annots.resolve():
            if isinstance( annot, PDFObjRef ):
                annot= annot.resolve()
                assert annot['Type'].name == "Annot", repr(annot)
                if annot['Subtype'].name == "Widget":
                    if annot['FT'].name == "Btn":
                        assert annot['T'] not in self.fields
                        self.fields[ annot['T'] ] = annot['V'].name
                    elif annot['FT'].name == "Tx":
                        assert annot['T'] not in self.fields
                        self.fields[ annot['T'] ] = annot['V']
                    elif annot['FT'].name == "Ch":
                        assert annot['T'] not in self.fields
                        self.fields[ annot['T'] ] = annot['V']
                        # Alternative choices in annot['Opt'] )
                    else:
                        raise Exception( "Unknown Widget" )
            else:
                raise Exception( "Unknown Annotation" )
    def _get_text( self, device ):
        text= []
        layout = device.get_result()
        for obj in layout:
            if isinstance( obj, LTTextBoxHorizontal ):
                if obj.get_text().strip():
                    text.append( TextBlock(obj.x0, obj.y1, obj.get_text().strip()) )
        text.sort( key=lambda row: (-row.y, row.x) )
        return text
    def is_recognized( self ):
        """Check for Copyright as well as Revision information on each page."""
        bottom_page_1 = self.text[1][-3:]
        bottom_page_2 = self.text[2][-3:]
        pg1_rev= "Rev 2011.01.17" == bottom_page_1[2].text
        pg2_rev= "Rev 2011.01.17" == bottom_page_2[0].text
        return pg1_rev and pg2_rev 

This gives us a dictionary of field names and values, essentially transforming the PDF form into the same kind of data that comes from an HTML POST request.
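
Hypothetical usage looks like this; the file name is made up.

p = Parser()
with open( "some_form.pdf", "rb" ) as source:
    p.load( source )
print( p.fields )              # the form's field names and values
print( p.is_recognized() )     # True when the expected revision text is present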

An important part is that we don't want much of the background text.  Just enough to confirm the version of the form file itself.

The cryptic text.sort( key=lambda row: (-row.y, row.x) ) will sort the text blocks into order from top-to-bottom and left-to-right.  For the most part, a page footer will show up last.  This is not guaranteed, however.  In a multi-column layout, the footer can be so close to the bottom of a column that PDFMiner may put the two text blocks together.

The other unfortunate part is the extremely long (and opaque) setup required to get the data from the page.

Tuesday, February 7, 2012

Multiprocessing Goodness -- Part 2 -- Class Definitions

The multiprocessing module includes a generic Process class, which can be used to wrap a simple function.

The function must be designed to work with Queues or Pipes or other synchronization techniques.

There's an advantage, however, to defining a class which gracefully handles generator functions.  If we have Generator-Aware multi-processing, we can (1) write our algorithms as generators and then (2) trivially connect Processes with Queues to improve scalability.

We're looking at creating processing "pipelines" using Queues.  That way we can easily handle multiple-producer and multiple-consumer (fan-in, fan-out) processing that enhances concurrency.

See Multiprocessing Goodness -- Part 1 -- Use Case for more information.

We have three use cases:  Producer, Consumer and Consumer-Producer.

Producer

A Producer gets data from somewhere and populates a queue with it.  This is the source that feeds data into the pipeline.


from multiprocessing import Process

class ProducerProcess( Process ):
    """Produces items into a Queue.
    
    The "target" must be a generator function which yields
    picklable items.
    """
    def __init__( self, group=None, target=None, name=None, args=None, kwargs=None, output_queue=None, consumers=0 ):
        super( ProducerProcess, self ).__init__( name=name )
        self.target= target
        self.args= args if args is not None else []
        self.kwargs= kwargs if kwargs is not None else {}
        self.output_queue= output_queue
        self.consumers= consumers
    def run( self ):
        target= self.target
        for item in target(*self.args, **self.kwargs):
            self.output_queue.put( item )
        for x in range(self.consumers):
            self.output_queue.put( None )
        self.output_queue.close()


This class will wrap a "target" function which must be a generator.   Every value yielded is put into the "output_queue".  When the source data runs out, enough sentinel tokens are put into the queue to satisfy all consumers.

Consumer

A Consumer gets data from a queue and does some final processing.  Perhaps it loads a database, or writes a file.  It is the sink that consumes data on the pipeline.


class ConsumerProcess( Process ):
    """Consumes items from a Queue.
    
    The "target" must be a function which expects an iterable as it's
    only argument.  Therefore, the args value is not used here.
    """
    def __init__( self, group=None, target=None, name=None, kwargs=None, input_queue=None, producers=0 ):
        super( ConsumerProcess, self ).__init__( name=name )
        self.target= target
        self.kwargs= kwargs if kwargs is not None else {}
        self.input_queue= input_queue
        self.producers= producers
    def items( self ):
        while self.producers != 0:
            for item in iter( self.input_queue.get, None ):
                yield item
            self.producers -= 1
    def run( self ):
        target= self.target
        target( self.items(), **self.kwargs )


This class will wrap a "target" function which must be ready to work with any iterable.  Every value from the queue will be provided to the target function for processing.  When enough sentinel tokens have been consumed from producers, it terminates processing.

Consumer-Producer

The middle of a processing pipeline is made up of consumer-producer processes which consume from one queue and then produce to another queue.


        
class ConsumerProducerProcess( Process ):
    """Consumes items from a Queue and produces items onto a Queue.
    
    The "target" must be a generator function which yields
    picklable items and which expects an iterable as its
    only argument.  Therefore, the args value is not used here.
    """
    def __init__( self, group=None, target=None, name=None, kwargs=None, input_queue=None, producers=0, output_queue=None, consumers=0 ):
        super( ConsumerProducerProcess, self ).__init__( name=name )
        self.target= target
        self.kwargs= kwargs if kwargs is not None else {}
        self.input_queue= input_queue
        self.producers= producers
        self.output_queue= output_queue
        self.consumers= consumers
    def items( self ):
        while self.producers != 0:
            for item in iter( self.input_queue.get, None ):
                yield item
            self.producers -= 1
    def run( self ):
        target= self.target
        for item in target(self.items(), **self.kwargs):
            self.output_queue.put( item )
        for x in range(self.consumers):
            self.output_queue.put( None )
        self.output_queue.close()


This class will wrap a "target" function which must be a generator function that consumes an iterable.
Every value from the queue is provided to the target generator.  Every value yielded by the generator is sent to the output queue.  The input side counts sentinels to know when to stop.  The output side produces enough sentinels to alert downstream processes.

Target Functions

A producer function must be a generator function of this form:


def prod( *args ):
    for item in some_function(*args):
       yield item


A consumer function looks like this:


def cons( source ):
    for item in source:
       final_disposition(item)


Finally, a consumer-producer function looks like this:

def cons_prod( source ):
    for item in source:
       next_value= transform(item)
       yield next_value


These functions can be tested and debugged like this, composing the generators directly and letting a plain for loop stand in for the consumer.


for final in cons_prod( prod( *args ) ):
    print( final )


That way we're confident that our algorithm is correct before attempting to scale it with multiprocessing.
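
Once the plain-Python version checks out, the multiprocessing wiring might look like this.  This is a sketch: some_source is a stand-in for whatever feeds prod, and the sentinel counts match one producer and one consumer per queue.

from multiprocessing import Queue

if __name__ == "__main__":
    q1, q2 = Queue(), Queue()
    p1 = ProducerProcess( target=prod, args=(some_source,), output_queue=q1, consumers=1 )
    p2 = ConsumerProducerProcess( target=cons_prod, input_queue=q1, producers=1,
        output_queue=q2, consumers=1 )
    p3 = ConsumerProcess( target=cons, input_queue=q2, producers=1 )
    for p in ( p1, p2, p3 ):
        p.start()
    for p in ( p1, p2, p3 ):
        p.join()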


Thursday, February 2, 2012

Multiprocessing Goodness -- Part 1 -- Use Case

The advantage of multiprocessing is to have multiple processes working on a problem.  If we break a big problem into small, concurrent steps, we can often get results in less elapsed time by making more effective use of the CPU.  Specifically, we want to make use of non-user time where our process might be waiting for something on the network or waiting for physical I/O to finish.

There are limits on the speedup offered by multiprocessing.  Once utilization gets to 100%×cores, we can't go any faster.  However, there are numerous processes that do a lot of I/O or a lot of network access; we can use Python's multiprocessing module to make more effective use of our CPU.

The easiest approach to multiprocessing is to use the shell's pipeline philosophy.  Break the processing up into small steps, each of which reads from a source stream and writes to an output stream.  The long-standing tradition here is to read from `sys.stdin` and write to `sys.stdout`.  The multiprocessing module, however, gives us tools to achieve this with relatively little pain.

Rather than use a simple pipe, however, we need to use a multiprocessing.Queue.  In shell parlance we might have `func1 | func2 | func3`.

For multiprocessing purposes, we'd have something a hair more complex looking.


    from multiprocessing import Process, Queue

    # func1, func2 and func3 are the pipeline stages from the shell example.
    q1 = Queue()
    q2 = Queue()
    p1 = Process( target=func1, kwargs=dict(output=q1))
    p2 = Process( target=func2, kwargs=dict(input=q1, output=q2))
    p3 = Process( target=func3, kwargs=dict(input=q2))
    p1.start()
    p2.start()
    p3.start()


While wordy, it hints at a more generalized approach to have three processes passing data.

Termination

The issue is one of termination.  Most multiprocessing packages (like multiprocessing and celery) presume that your processing pipeline has a fairly long lifetime.  Because of this, they presume that you can determine when it's idle and kill it off one process at a time.

This isn't a bad assumption, and probably covers a large number of use cases.

It doesn't, however, cover the simple shell-like `func1 | func2 | func3` use case very well at all.

Why not?

Because we can't easily tell when a queue is shut down for good and all.  A producing process can close a queue, but that's not a piece of information that shows up at the consumer end of the queue.

Queues are designed to be durable and to have multiple producers.  There's no easy way to know a Queue is no longer needed.  Each producer would have to attempt to close the Queue, and the Queue would have to know the intended number of producers.  If processes are dynamic, then the number of producers may not have a fixed, known-in-advance limit.

The approach, therefore, is to put a sentinel object in the queue.  This way, a consumer knows that production has finished.  It can release resources and exit politely.
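
The consuming side of that convention is a two-line idiom; input_queue and process are stand-ins here.

# Consume until the producer's sentinel (None) arrives.
for item in iter( input_queue.get, None ):
    process( item )        # stand-in for the real work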

Fan-Out and Fan-In

The problem with a sentinel on a multi-producer queue is that there will be multiple sentinels, one from each producer.  And, of course, with a multi-consumer queue, there must be one sentinel for each consumer.

If producers adhere to a sentinel-per-consumer rule, and consumers know to expect a sentinel-per-producer, then we can easily create dynamic multi-processing networks that startup and shutdown quickly and cleanly.

Use Case

Here's a use case.  We want to do whois analysis on IP addresses in a log.

If we have a simple loop to parse the log and do a whois request on each host IP address, the processing will be slow.  It uses approximately no CPU, since it spends almost all of its time waiting for input from the log, waiting for whois, or waiting for buffers to be written in the output file.

If we make a simple three-step pipeline (parse | whois | report) then we get some improvement in elapsed time, but -- really -- the whois step is killing the throughput.

What we need is a way to run a dozen whois requests concurrently.  This leads us to multiprocessing, fan-out and fan-in.

Here's what we want.


from multiprocessing import Queue

# ProducerProcess, ConsumerProducerProcess and ConsumerProcess are the classes
# from Part 2; book_users, user_whois and final_report are the application's
# generator functions (not shown here).
def analyze_ip( logs ):
    user_queue = Queue()
    report_queue= Queue()
    
    user_from_log= ProducerProcess( name='book_users', target=book_users, args=(logs,), output_queue=user_queue, consumers=12 )
    user_from_log.start()
    
    workers= []
    for worker in range(12):
        get_details= ConsumerProducerProcess( name='user_whois', target=user_whois, kwargs=dict(LIVE=False),
            input_queue=user_queue, output_queue=report_queue, producers=1, consumers=1 )
        get_details.start()
        workers.append(get_details)
    
    report= ConsumerProcess( name='final_report', target=final_report, 
        input_queue=report_queue, producers=12 )
    report.start()
                
    user_from_log.join()
    for w in workers:
        w.join()
    report.join()

This will do a number of concurrent whois requests, tying up lots and lots of resources and (hopefully) saturating the CPU with real work.

This shows a fan-out from one ProducerProcess to a dozen ConsumerProducerProcess instances.  It shows a fan-in from the ConsumerProducerProcess to a single ConsumerProcess that writes the final report.

This is trivially scaled up (or down) by changing the number of processes in the middle.

What's important is that the actual functions involved (book_users, user_whois and final_report) are relatively trivial generator functions that consume source data (log files or queue entries) and produce results (queue entries or a report file.)
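
For example, user_whois might be little more than this kind of generator.  This is purely hypothetical -- whois_lookup is an assumed helper, and the real function isn't shown here.

def user_whois( source, LIVE=False ):
    """Consume IP addresses, yield (address, whois details) pairs."""
    for ip_address in source:
        details = whois_lookup( ip_address ) if LIVE else {}   # assumed helper
        yield ip_address, details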

Also important is the fact that it closes down cleanly.  When the input reaches end-of-file, sentinel values are put into the queues to trickle through and lead to orderly process shutdown.