# Subscribing and listening to ZeroMQ messages

Have you wondered how a block explorer displays new blocks or how new transactions show up in wallets[^wallets-note]?

![block explorer](2022-12-06.zmq/explorer.mp4)

It's not rocket science: they connect to a [websocket](https://en.wikipedia.org/wiki/WebSocket) exposed via the REST server and subscribe to specific 'topics'.

![developer tools](2022-12-06.zmq/01.websocket.png width=600)

Where does the REST server get this information from? Setting details aside, REST acts as a proxy: it connects to a [ZeroMQ](https://zeromq.org) endpoint exposed by the `catapult.broker` process.

In this short article, I'll show you how to skip **the middleman** and listen directly to what the broker has to tell us.

## Preparing for the journey

If you're not running a node - the question is - why not?

If you are, you're probably running a typical _"Dual"_ node (peer+api node) and your `docker-compose.yml` contains - at least - these 4 services (containers):

* db - the container running the mongo database
* node - the container running the actual catapult client set up in Dual mode
* broker - obviously this one runs the `broker` process
* finally rest-gateway - runs the node.js REST layer

Now REST can talk to the broker, because within `docker-compose.yml` they're using the same network, but if **you** want to talk with the broker, you have two options:

1. run the tool that we'll discuss later inside the broker container OR
2. expose the broker port inside `docker-compose.yml`, like so (to be able to connect from the host):

```yaml
broker:
  user: '1000:1000'
  container_name: broker
  image: symbolplatform/symbol-server:gcc-1.0.3.5
  working_dir: /symbol-workdir
  ports:
    - '127.0.0.1:7902:7902'  # <=============================================== added
  command: /bin/bash /symbol-commands/start.sh /usr/catapult ./data broker server broker NORMAL
  stop_signal: SIGINT
  restart: on-failure:2
  volumes:
    - ../nodes/node:/symbol-workdir:rw
    - ./server:/symbol-commands:ro
  depends_on:
    - db
```

Note that this will expose the broker only on the loopback (127.0.0.1) interface, as exposing it to the whole world is likely not the greatest idea.

## We don't even talk anymore

I'll be using python, mostly because the current `dev` branch of the SDK can nicely read block headers, but any language that has [ZMQ bindings](https://zeromq.org/get-started/#pick-your-language) will do the trick.

### Initialization

ZMQ bindings are in the `zmq` module (`python3 -m pip install pyzmq`).

Connecting to the broker is straightforward. The broker uses the pub-sub messaging pattern, which means that the created listener needs to be a `SUB` socket:

```py
import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:7902")
```

And that's all that's needed.

### Subscribing to topics

Now it's time to subscribe to different topics. Note: most likely you'll only be interested in a few of them, not all of them. I've selected a few to give a good overview.

```py
from binascii import unhexlify

# 8-byte markers are written as hex numbers in the documentation,
# so the bytes need to be reversed before subscribing
block_marker = unhexlify('9FF2D8E480CA6A49')[::-1]
finalized_marker = unhexlify('4D4832A031CE7954')[::-1]
drop_marker = unhexlify('5C20D68AEE25B0B0')[::-1]
transaction_marker = b'a'  # 0x61
ut_add_marker = b'u'  # 0x75

socket.setsockopt(zmq.SUBSCRIBE, block_marker)
socket.setsockopt(zmq.SUBSCRIBE, finalized_marker)
socket.setsockopt(zmq.SUBSCRIBE, drop_marker)
socket.setsockopt(zmq.SUBSCRIBE, transaction_marker)
socket.setsockopt(zmq.SUBSCRIBE, ut_add_marker)
```

Where are all those markers coming from?
I'm glad you're asking. We've documented them in the _Technical Reference_, in chapter 17. Messaging.

| Topic marker name | Topic marker |
|-------------------|--------------|
| Block | `0x9FF2D8E480CA6A49` |
| Drop blocks | `0x5C20D68AEE25B0B0` |
| Finalized block | `0x4D4832A031CE7954` |
| Transaction | `0x61` |
| Unconfirmed transaction add | `0x75` |
| Unconfirmed transaction remove | `0x72` |
| Partial transaction add | `0x70` |
| Partial transaction remove | `0x71` |
| Transaction status | `0x73` |
| Cosignature | `0x63` |

### Parsing published messages

Parsing is pretty straightforward. Below, some types and objects from the SDK are used, but that's just for slightly nicer display.

```py
# SDK imports (module paths as in symbol-sdk-python v3, adjust to your SDK version)
from symbolchain.CryptoTypes import Hash256
from symbolchain.facade.SymbolFacade import SymbolFacade
from symbolchain.sc import BlockFactory

facade = SymbolFacade('mainnet')  # only used to turn public keys into addresses

while True:
    # first part of every multipart message is the topic
    topic = socket.recv()

    if block_marker == topic:
        block_header = socket.recv()
        entity_hash = Hash256(socket.recv())
        generation_hash = Hash256(socket.recv())

        header = BlockFactory.deserialize(block_header)
        print(f'block height: {header.height} ({header.height.value}) entity_hash {entity_hash} generation_hash {generation_hash}')
        print(f'block harvested by: {header.signer_public_key} {facade.network.public_key_to_address(header.signer_public_key)}')
    elif finalized_marker == topic:
        # whole body is in a single part: round, height, hash
        body_part_1 = socket.recv()
        finalization_round = int.from_bytes(body_part_1[0:8], byteorder='little')
        finalized_height = int.from_bytes(body_part_1[8:16], byteorder='little')
        entity_hash = Hash256(body_part_1[16:])
        print(f'FINALIZED round: {finalization_round} height: {finalized_height} entity_hash {entity_hash}')
    elif drop_marker == topic:
        body_part_1 = socket.recv()
        height = int.from_bytes(body_part_1[0:8], byteorder='little')
        print(f'drop after height: {height}')
    elif ut_add_marker[0] == topic[0] or transaction_marker[0] == topic[0]:  # mind [0]
        message = 'UT add' if ut_add_marker[0] == topic[0] else 'transaction add'

        # rest of the topic contains address
        address = SymbolFacade.Address(topic[1:])
        transaction = socket.recv()
        entity_hash = Hash256(socket.recv())
        merkle_component_hash = Hash256(socket.recv())
        body_part_1 = socket.recv()
        height = int.from_bytes(body_part_1[0:8], byteorder='little')
        print(f'{message} {address} {entity_hash} {height}')
    else:
        print("unknown [ %d %s ]" % (len(topic), topic))
```

One thing that might not be clear is how many times `socket.recv()` should be called for a given topic. It depends on how the actual message is constructed, but we've documented that as well. Instead of explaining it, I'll try to show it using pieces of documentation:

![block message layout](2022-12-06.zmq/02.block.png width=600)

![finalized block message layout](2022-12-06.zmq/03.finalized.png width=600)

![transaction add message (mempool)](2022-12-06.zmq/04.transaction.png width=600)

One thing worth noting is how the finalized block message data is all within a single 'packet'.

And if **you** got here, there's one more thing worth mentioning. In the previous section, I was subscribing to the "general" transactions topic with:

`socket.setsockopt(zmq.SUBSCRIBE, transaction_marker)`

But if you take a look at the images (or even better, the _Technical Reference_), the address is a part of the topic.
That means you can subscribe to messages that are targeting a specific account, e.g.:

```py
transaction_marker = b'a'
address = SymbolFacade.Address('NCHVMMCVPZGUWZTWTLNH46OFRM2QIPILE4SKZEA')

# topic = marker byte followed by raw address bytes
scoped_address_marker = transaction_marker + address.bytes
socket.setsockopt(zmq.SUBSCRIBE, scoped_address_marker)
```

Finally, a bit important detail: I'm using BlockFactory to deserialize block_header, but there's

## Code, output and some more random comments

You can find the whole code here: https://gist.github.com/gimre-xymcity/718cc15d9e9c3ff48a493bcfb7986834

Let's take a look at a piece of output:

```diff
  block height: 0x00000000001BB327 (1815335) entity_hash C8E617712A81D5DC13DC36C699E5DCA42C11972120E7A4DB37187E9C22941FE3 generation_hash FFFED500BCB2522FA892265D4F379AFC55CD1A9155F8FBB2DB43D30C5C99DD82
  block harvested by: 44E0DB9EC1FF08C392AAC8A2A787E68C2C8F36324E1065D90D976576580E7EA6 NCE5QOGVUM6ZHJIYXTA6NHZYJUHMBNRNVMG2L4I
+ transaction add NA6JCCGCVLTNCXFP6ZZHCEKIQN252LEWQMULS5Q 09B00910A2A55ADAFD79AB9CD1170E14B792B01AA64EC10EF263A1AB982CFE89 1815335
+ transaction add NA2NFUHQWYIASA5BHFJBM6OBQDEZDI34RUMNDHA 09B00910A2A55ADAFD79AB9CD1170E14B792B01AA64EC10EF263A1AB982CFE89 1815335
+ transaction add NB67BYHT34LHNPCEPUVIIHPNXZE7FRTX5BHQJVA 09B00910A2A55ADAFD79AB9CD1170E14B792B01AA64EC10EF263A1AB982CFE89 1815335
  block height: 0x00000000001BB328 (1815336) entity_hash 9EE91BB4BFE409A11789D5FDD398384BC86C528208C50E21884B6139E14E874E generation_hash FFED290A09BA44F1352F93BE6AA5FEF6400C7D1D6A69EC2662B98F4B12A9EC28
  block harvested by: 68ADEC7660181CD266F97BABAB6C9905D0DD7F669C2B107BBFB68B98074CCB9B NDGGHWO5PXID32IPA2C3EZCIMC7WHOZTXZDLTYY
```

What you can observe here is that a single notification (one transaction, note the identical entity hash) resulted in 3 published messages (marked for readability). If you've read the previous part until the end, you might already know why. All 3 messages are sent with 3 different topics. Why?
The answer is sort of obvious: the transaction in question is an aggregate transaction and the `broker` must notify all accounts involved:

* multisig account: `NA2NFUHQWYIASA5BHFJBM6OBQDEZDI34RUMNDHA`
* sender (cosignatory): `NB67BYHT34LHNPCEPUVIIHPNXZE7FRTX5BHQJVA`
* destination account: `NA6JCCGCVLTNCXFP6ZZHCEKIQN252LEWQMULS5Q`

This gets _a little_ noisy with larger aggregates (or more cosignatories) - that's also the reason why the REST layer is supposed to subscribe only to 'scoped' topics (so including addresses).

## Mongo-less broker (optional / fun stuff)

As we've repeated multiple times, catapult is _very_ flexible when it comes to setting it up.

To snoop on zmq messages, you can pretty easily set up a node that will run `server` and `broker` but without mongo (and without REST, as the REST layer is quite useless without mongo).

1. `catapult.server` needs to be configured like in the usual Dual mode setup; this means that the `filespooling` and `partialtransaction` extensions should be set to true in `config-extensions-server.properties`

   ```
   [extensions]

   # api extensions
   extension.filespooling = true
   extension.partialtransaction = true

   # addressextraction must be first because mongo and zeromq depend on extracted addresses
   extension.addressextraction = false
   extension.mongo = false
   extension.zeromq = false
   ...
   ```

2. `broker` needs zeromq but not mongo, `config-extensions-broker.properties`

   ```
   # addressextraction must be first because mongo and zeromq depend on extracted addresses
   extension.addressextraction = true
   extension.mongo = false
   extension.zeromq = true
   extension.hashcache = true
   ```

3. last but not least, `docker-compose.yml` should only contain entries for the `server` and `broker` services (containers)

Sidenote: if anyone in the future would like to push data to a different database, the only thing that's needed is writing a proper extension that will be loaded through the broker.
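If you're snooping on such a bare setup and don't want to pull in the SDK, the fixed-size pieces of the messages can be decoded with the standard library alone. Below is a minimal sketch, not part of the gist: the helper names are hypothetical, the finalized block layout simply mirrors the slicing used in the parsing loop earlier, and the address encoding (24 raw bytes shown as 39 base32 characters) is an assumption on my side rather than something spelled out in this article.

```python
import base64
import struct


def marker_to_topic(marker_hex: str) -> bytes:
    """Turn a marker as printed in the table (e.g. '9FF2D8E480CA6A49')
    into the byte-reversed value that is passed to zmq.SUBSCRIBE."""
    return bytes.fromhex(marker_hex)[::-1]


def parse_finalized_block(payload: bytes):
    """Split the single-part finalized block body the same way the parsing
    loop does: 8-byte round, 8-byte height (both little-endian), then hash."""
    if len(payload) < 16:
        raise ValueError('finalized block payload too short')
    finalization_round, height = struct.unpack_from('<QQ', payload, 0)
    return finalization_round, height, payload[16:]


def split_transaction_topic(topic: bytes):
    """Split a transaction-like topic into its 1-byte marker and the
    address it is scoped to (None for the unscoped, marker-only topic)."""
    marker, raw_address = topic[:1], topic[1:]
    if not raw_address:
        return marker, None
    # ASSUMPTION: address string = base32 of (raw bytes + one zero byte),
    # with the last character dropped
    address = base64.b32encode(raw_address + b'\x00').decode('ascii')[:-1]
    return marker, address
```

With these, `socket.setsockopt(zmq.SUBSCRIBE, marker_to_topic('9FF2D8E480CA6A49'))` would be equivalent to subscribing with `block_marker`, and `split_transaction_topic(topic)` could stand in for `SymbolFacade.Address(topic[1:])` when all you need is a printable address.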
---

[^wallets-note]: not all wallets might support this, but they usually support at least multisig showing