Konubinix' opinionated web of thoughts

To Use Consul as Pub Sub

Fleeting

to use consul as pub sub

using consul.py

import base64
import json

from consul import Consul


def events_walk(self, name=None, index=None, wait="24h", token=None, dump_old=False):
    """Yield the decoded payload of Consul user events as they are fired.

    Written to be attached to a client object as a method: ``self`` must
    expose ``event.list(name=..., index=..., wait=..., token=...)`` returning
    an ``(index, events)`` pair.

    Args:
        name: Event name forwarded to ``event.list``; also stored under the
            ``"name"`` key of every yielded payload.
        index: Index forwarded to the initial ``event.list`` call.
        wait: Maximum duration of each blocking query.
        token: ACL token forwarded to every ``event.list`` call.
        dump_old: When true, first yield the events already known.

    Yields:
        dict: The base64/JSON-decoded ``Payload`` of an event, with ``name``
        added. Payloads are assumed to be JSON objects.
    """

    def decode(event):
        data = json.loads(base64.b64decode(event["Payload"]))
        data["name"] = name
        return data

    events = self.event.list(name=name, index=index, token=token)

    if dump_old:
        for event in events[1]:
            yield decode(event)

    while True:
        new_events = self.event.list(name=name, wait=wait, index=events[0], token=token)
        if new_events[0] == events[0]:
            # The blocking query returned without any change: ask again.
            print("nothing new")
            continue
        events = new_events
        if not events[1]:
            # The index moved but there is no event to report; indexing the
            # last element would raise IndexError.
            continue
        # it repeats the old events. Only the last one is of interest
        yield decode(events[1][-1])


# Attach the generator to the client class, so that it can be called as a
# method on Consul instances (the instance is received as `self`).
Consul.walk = events_walk

Avoiding the out-of-date consul.py, while still taking inspiration from it.

import base64
import json
import os

import requests


def event_list(name=None, index=None, wait=None):
    """Query the Consul ``/v1/event/list`` endpoint once.

    The base address is read from the ``CONSUL_HTTP_ADDR`` environment
    variable and is used verbatim as the URL prefix.

    Args:
        name: Event name to filter on; omitted from the query when ``None``.
        index: Index to block on; omitted from the query when falsy.
        wait: Maximum blocking duration, passed verbatim (e.g. ``"24h"``);
            omitted from the query when falsy.

    Returns:
        tuple: ``(index, content)`` where ``index`` is the value of the
        ``X-Consul-Index`` response header and ``content`` the decoded JSON
        body.

    Raises:
        Exception: When the response status is not 2xx; the response object
            is the exception argument.
    """
    url = f"{os.environ['CONSUL_HTTP_ADDR']}/v1/event/list"
    # requests drops the keys whose value is None and URL-encodes the others,
    # so an unset name no longer ends up as the literal filter "name=None".
    params = {
        "name": name,
        "index": index or None,
        "wait": wait or None,
    }
    resp = requests.get(url, params=params)
    if resp.status_code // 100 != 2:
        raise Exception(resp)
    content = resp.json()
    index = resp.headers["X-Consul-Index"]
    return index, content

def events_walk(name=None, index=None, wait="24h", token=None, dump_old=False):
    """Yield the decoded payload of Consul user events as they are fired.

    Args:
        name: Event name forwarded to ``event_list``.
        index: Index forwarded to the initial ``event_list`` call.
        wait: Maximum duration of each blocking query.
        token: Accepted but not used by this implementation.
        dump_old: When true, first yield the events already known.

    Yields:
        The base64/JSON-decoded ``Payload`` of an event.
    """

    def decode(event):
        return json.loads(base64.b64decode(event["Payload"]))

    events = event_list(name=name, index=index)

    if dump_old:
        for event in events[1]:
            yield decode(event)

    while True:
        new_events = event_list(name=name, wait=wait, index=events[0])
        if new_events[0] == events[0]:
            # The blocking query returned without any change: ask again.
            print("nothing new")
            continue
        events = new_events
        if not events[1]:
            # The index moved but there is no event to report; indexing the
            # last element would raise IndexError.
            continue
        # it repeats the old events. Only the last one is of interest
        yield decode(events[1][-1])

Same, but async (using httpx instead of requests)

import json
import os
from base64 import b64decode

import httpx


async def event_list(name=None, index=None, wait=None):
    """Query the Consul ``/v1/event/list`` endpoint once, asynchronously.

    The base address is read from the ``CONSUL_HTTP_ADDR`` environment
    variable and is used verbatim as the URL prefix.

    Args:
        name: Event name to filter on; omitted from the query when ``None``.
        index: Index to block on; omitted from the query when falsy.
        wait: Maximum blocking duration, as a number of seconds; omitted from
            the query when falsy.

    Returns:
        tuple: ``(index, content)`` where ``index`` is the value of the
        ``X-Consul-Index`` response header and ``content`` the decoded JSON
        body.

    Raises:
        Exception: When the response status is not 2xx; the response object
            is the exception argument.
    """
    query = []
    if name is not None:
        # Without this guard an unset name is sent as the filter "name=None".
        query.append(f"name={name}")
    if index:
        query.append(f"index={index}")
    if wait:
        query.append(f"wait={wait}s")
    url = f"{os.environ['CONSUL_HTTP_ADDR']}/v1/event/list"
    if query:
        url += "?" + "&".join(query)
    print("fetching", url)
    # Consul adds a random jitter of up to wait/16 to the requested wait, so
    # the client must be a bit more patient than the server. The extra 10
    # seconds are an arbitrary allowance for network latency.
    timeout = wait + wait / 16 + 10 if wait else wait
    async with httpx.AsyncClient() as client:
        resp = await client.get(url, timeout=timeout)
    if resp.status_code // 100 != 2:
        raise Exception(resp)
    content = resp.json()
    index = resp.headers["X-Consul-Index"]
    return index, content


async def events_walk(
    name=None, index=None, wait=24 * 3600, token=None, dump_old=False
):
    """Asynchronously yield the decoded payload of newly fired Consul events.

    Follows the client-side rules of the Consul blocking query documentation
    (https://developer.hashicorp.com/consul/api-docs/features/blocking): the
    index is reset when it goes backwards and is kept at 1 or more.

    Args:
        name: Event name forwarded to ``event_list``; also stored under the
            ``"name"`` key of every yielded payload.
        index: Index forwarded to the initial ``event_list`` call.
        wait: Maximum duration of each blocking query, in seconds.
        token: Accepted but not used by this implementation.
        dump_old: When true, first yield the events already known.

    Yields:
        dict: The base64/JSON-decoded ``Payload`` of an event, with ``name``
        added. Payloads are assumed to be JSON objects.
    """

    def payload(event):
        decoded = json.loads(b64decode(event["Payload"]))
        decoded["name"] = name
        return decoded

    events = await event_list(name=name, index=index)
    # The index is read from a response header, hence possibly a string:
    # convert it so that the comparisons below are numeric ("9" < "10").
    index = int(events[0])

    if dump_old:
        for event in events[1]:
            yield payload(event)

    while True:
        new_events = await event_list(name=name, wait=wait, index=index)
        # clients should sanity check that their index is at least 1 after
        # each blocking response is handled, to be sure that the next request
        # actually blocks.
        new_index = max(int(new_events[0]), 1)

        if new_index == index:
            print("nothing new")
            continue
        if new_index < index:
            # Reset the index if it goes backwards. The next request carries
            # no index, returns immediately and reports the current state, so
            # nothing is yielded here to avoid delivering the event twice.
            index = 0
            continue

        index = new_index
        events = new_events
        if not events[1]:
            # The index moved but there is no event to report.
            continue
        # it repeats the old events. Only the last one is of interest
        yield payload(events[1][-1])

same, with curl and jq, to work in shell scripts

# suppose CONSUL_HTTP_ADDR is already set
# Query the Consul /v1/event/list endpoint once.
#
# Arguments:
#   $1  event name to filter on
#   $2  index to block on (may be empty)
#   $3  optional maximum blocking duration (e.g. 24h)
# Output:
#   first line: value of the X-Consul-Index response header (empty when the
#               header is missing)
#   remaining lines: response body
# Returns the exit status of curl.
event_list() {
    local name=$1
    local index=$2
    local wait="${3-}"
    local url="$CONSUL_HTTP_ADDR/v1/event/list?name=$name"
    local headers body status consul_index
    if [ -n "$index" ]; then
        url="${url}&index=$index"
    fi
    if [ -n "$wait" ]; then
        url="${url}&wait=$wait"
    fi
    # A single request provides both the headers and the body, so that the
    # index always describes the body it is returned with.
    headers=$(mktemp) || return 1
    body=$(curl -s -D "$headers" "$url")
    status=$?
    # Header names are case-insensitive (HTTP/2 sends them lowercased).
    consul_index=$(tr -d '\r' < "$headers" | awk 'tolower($1) == "x-consul-index:" { print $2; exit }')
    rm -f "$headers"
    printf '%s\n' "$consul_index"
    printf '%s\n' "$body"
    return "$status"
}

# Print the decoded payload of the events fired under a given name, one per
# line, forever.
#
# Arguments:
#   $1  event name
#   $2  when "true", first print the payload of the events already known
# Relies on event_list printing the index on its first line and the JSON list
# of events after it.
event_walker () {
    local name=$1
    local index=""
    local wait="24h"
    local dump_old="${2-}"
    # Declared apart from their assignment, so that they do not leak into the
    # caller's scope and the exit status of the substitutions is not masked.
    local out events_json new_index new_events_json event last_event payload

    out=$(event_list "$name" "$index")
    index=${out%%$'\n'*}
    events_json=${out#*$'\n'}

    if [ "$dump_old" = "true" ]; then
        echo "$events_json" | jq -c '.[]' | while read -r event; do
            # "// empty" avoids decoding the literal string "null" when an
            # event has no payload.
            payload=$(echo "$event" | jq -r '.Payload // empty' | base64 -d)
            echo "$payload"
        done
    fi

    while true; do
        out=$(event_list "$name" "$index" "$wait")
        new_index=${out%%$'\n'*}
        new_events_json=${out#*$'\n'}
        if [ "$new_index" = "$index" ]; then
            echo "nothing new" >&2
            continue
        fi
        index=$new_index
        # it repeats the old events. Only the last one is of interest
        last_event=$(echo "$new_events_json" | jq -c '.[-1] // empty')
        if [ -z "$last_event" ]; then
            # The index moved but there is no event to report.
            continue
        fi
        payload=$(echo "$last_event" | jq -r '.Payload // empty' | base64 -d)
        echo "${payload}"
    done
}