Chapter 8 ■ CaChes and Message Queues
147
def pythagoras(zcontext, url):
"""Return the sum-of-squares of number sequences."""
zsock = zcontext.socket(zmq.REP)
zsock.bind(url)
while True:
numbers = zsock.recv_json()
zsock.send_json(sum(n * n for n in numbers))
def tally(zcontext, url):
"""Tally how many points fall within the unit circle, and print pi."""
zsock = zcontext.socket(zmq.PULL)
zsock.bind(url)
p = q = 0
while True:
decision = zsock.recv_string()
q += 1
if decision == 'Y':
p += 4
print(decision, p / q)
def start_thread(function, *args):
thread = threading.Thread(target=function, args=args)
thread.daemon = True # so you can easily Ctrl-C the whole program
thread.start()
def main(zcontext):
pubsub = 'tcp://127.0.0.1:6700'
reqrep = 'tcp://127.0.0.1:6701'
pushpull = 'tcp://127.0.0.1:6702'
start_thread(bitsource, zcontext, pubsub)
start_thread(always_yes, zcontext, pubsub, pushpull)
start_thread(judge, zcontext, pubsub, reqrep, pushpull)
start_thread(pythagoras, zcontext, reqrep)
start_thread(tally, zcontext, pushpull)
time.sleep(30)
if name == 'main':
main(zmq.Context())
Every one of these threads is careful to create its own socket or sockets for communication since it is not safe for
two threads to try to share a single messaging socket. But the threads do share a single context object, which assures
they all exist within what you might call a shared arena of URLs, messages, and queues. You will typically want to
create only a single ØMQ context per process.
Even though these sockets are offering methods with names similar to familiar socket operations such as recv()
and send(), keep in mind that they have different semantics. Messages are kept in order and are never duplicated, but
they are cleanly delimited as separate messages instead of being lost in a continuous stream.
This example is obviously contrived so that, within a few lines of code, you have an excuse to use most of the
major messaging patterns offered by a typical queue. The connections that always_yes and the judge make to the
bitsource form a publish-subscribe system, where every connected client receives its own copy of every message
sent by the publisher (minus, in this case, any messages that wind up being filtered out). Each filter applied to a ØMQ
socket adds, not subtracts, to the total number of messages received by opting in to every message whose first few