Chapter 8 ■ Caches and Message Queues
146
Listing 8-3. A ØMQ Messaging Fabric Linking Five Different Workers
#!/usr/bin/env python3
# Foundations of Python Network Programming, Third Edition
# https://github.com/brandon-rhodes/fopnp/blob/m/py3/chapter08/queuecrazy.py
# Small application that uses several different message queues
import random, threading, time, zmq
# Number of bits of precision in each random integer; every published
# point carries two such integers, so messages are B * 2 bits long.
B = 32
def ones_and_zeros(digits):
    """Return a random string of exactly `digits` binary digits.

    A random integer is drawn with ``random.getrandbits(digits)`` and
    rendered in base 2 with no ``0b`` prefix, left-padded with zeros so
    the result is always `digits` characters long.

    Args:
        digits: Number of random bits (and output characters) wanted.

    Returns:
        A ``str`` of length `digits` made up only of '0' and '1'.
    """
    if not digits:
        # A zero-width request has no digits to show; format() would
        # otherwise render the value 0 as the one-character string '0'.
        return ''
    # format(..., 'b') emits the bare binary digits, so there is no prefix
    # to strip; zfill() restores the leading zeros the integer dropped.
    return format(random.getrandbits(digits), 'b').zfill(digits)
def bitsource(zcontext, url):
    """Publish an endless stream of random points in the unit square.

    Binds a PUB socket to `url` and then loops forever, sending one
    string of ``B * 2`` random binary digits per message and sleeping
    0.01 seconds between messages. This function never returns.

    Args:
        zcontext: ØMQ context used to create the socket.
        url: Endpoint the PUB socket binds to.
    """
    publisher = zcontext.socket(zmq.PUB)
    publisher.bind(url)
    while True:
        point_bits = ones_and_zeros(B * 2)
        publisher.send_string(point_bits)
        time.sleep(0.01)
def always_yes(zcontext, in_url, out_url):
    """Answer 'Y' for every point in the lower-left quadrant.

    Coordinates in the lower-left quadrant are inside the unit circle,
    so no arithmetic is needed: the SUB socket subscribes only to
    messages beginning with '00', and each one received is answered by
    pushing 'Y' downstream. This function never returns.

    Args:
        zcontext: ØMQ context used to create both sockets.
        in_url: Endpoint the SUB socket connects to for incoming points.
        out_url: Endpoint the PUSH socket connects to for verdicts.
    """
    subscriber = zcontext.socket(zmq.SUB)
    subscriber.connect(in_url)
    subscriber.setsockopt(zmq.SUBSCRIBE, b'00')
    pusher = zcontext.socket(zmq.PUSH)
    pusher.connect(out_url)
    while True:
        # The payload itself is irrelevant; the subscription filter has
        # already decided the answer.
        subscriber.recv_string()
        pusher.send_string('Y')
def judge(zcontext, in_url, pythagoras_url, out_url):
    """Determine whether each input coordinate is inside the unit circle.

    Subscribes to every point whose bit string starts with '01', '10'
    or '11', splits the interleaved bits into two integers, asks the
    service at `pythagoras_url` for a value computed from them, and
    pushes 'Y' or 'N' downstream depending on whether that value is
    below the squared radius. This function never returns.

    Args:
        zcontext: ØMQ context used to create all three sockets.
        in_url: Endpoint the SUB socket connects to for incoming points.
        pythagoras_url: Endpoint the REQ socket connects to; the reply is
            presumably the sum of the squares of the two integers sent
            (confirm against the service implementation).
        out_url: Endpoint the PUSH socket connects to for verdicts.
    """
    isock = zcontext.socket(zmq.SUB)
    isock.connect(in_url)
    for prefix in b'01', b'10', b'11':
        isock.setsockopt(zmq.SUBSCRIBE, prefix)
    psock = zcontext.socket(zmq.REQ)
    psock.connect(pythagoras_url)
    osock = zcontext.socket(zmq.PUSH)
    osock.connect(out_url)
    # Each coordinate is a B-bit integer, so the circle's radius is 2**B
    # and the squared radius to compare against is 2**(B * 2).
    unit = 2 ** (B * 2)
    while True:
        bits = isock.recv_string()
        # Even-indexed characters form one coordinate, odd-indexed the other.
        n, m = int(bits[::2], 2), int(bits[1::2], 2)
        psock.send_json((n, m))
        sumsquares = psock.recv_json()
        osock.send_string('Y' if sumsquares < unit else 'N')