Konubinix' opinionated web of thoughts

To Use Consul as Pub Sub

Fleeting

to use consul as pub sub

using consul.py

import base64
import json

from consul import Consul


def events_walk(self, name=None, index=None, wait="24h", token=None, dump_old=False):
    events = self.event.list(name=name, index=index, token=token)

    def payload(event):
        payload = json.loads(base64.b64decode(event["Payload"]))
        payload["name"] = name
        return payload

    if dump_old:
        for event in events[1]:
            yield payload(event)

    while True:
        new_events = self.event.list(name=name, wait=wait, index=events[0], token=token)
        if new_events[0] == events[0]:
            print("nothing new")
            continue
        events = new_events
        # it repeats the old events. Only the last one is of interest
        yield payload(events[1][-1])


Consul.walk = events_walk

Avoiding using the out of date consul.py, but still getting inspiration from it.

import base64
import json
import os

import requests


def event_list(name=None, index=None, wait=None):
    url = f"{os.environ['CONSUL_HTTP_ADDR']}/v1/event/list?name={name}"
    if index:
        url += f"&index={index}"
    if wait:
        url += f"&wait={wait}"
    resp = requests.get(url)
    if resp.status_code // 100 != 2:
        raise Exception(resp)
    content = resp.json()
    index = resp.headers["X-Consul-Index"]
    return index, content

def events_walk(name=None, index=None, wait="24h", token=None, dump_old=False):
    events = event_list(name=name, index=index)

    def payload(event):
        payload = json.loads(base64.b64decode(event["Payload"]))
        payload["name"] = name
        return payload

    if dump_old:
        for event in events[1]:
            yield payload(event)

    while True:
        new_events = event_list(name=name, wait=wait, index=events[0])
        if new_events[0] == events[0]:
            print("nothing new")
            continue
        events = new_events
        # it repeats the old events. Only the last one is of interest
        yield payload(events[1][-1])

same, async

import base64
import json
import os

import httpx


async def event_list(name=None, index=None, wait=None):
    url = f"{os.environ['CONSUL_HTTP_ADDR']}/v1/event/list?name={name}"
    if index:
        url += f"&index={index}"
    if wait:
        url += f"&wait={wait}s"
    print("fetching", url)
    async with httpx.AsyncClient() as client:
        resp = await client.get(url, timeout=wait)
    if resp.status_code // 100 != 2:
        raise Exception(resp)
    content = resp.json()
    index = resp.headers["X-Consul-Index"]
    return index, content


async def events_walk(
    name=None, index=None, wait=24 * 3600, token=None, dump_old=False
):
    events = await event_list(name=name, index=index)

    def payload(event):
        payload = json.loads(base64.b64decode(event["Payload"]))
        payload["name"] = name
        return payload

    if dump_old:
        for event in events[1]:
            yield payload(event)

    while True:
        new_events = await event_list(name=name, wait=wait, index=events[0])
        if new_events[0] == events[0]:
            print("nothing new")
            continue
        events = new_events
        # it repeats the old events. Only the last one is of interest
        yield payload(events[1][-1])

same, with curl and jq, to work in shell scripts

# suppose CONSUL_HTTP_ADDR is already set
event_list() {
    local name=$1
    local index=$2
    local wait=$3
    local url="$CONSUL_HTTP_ADDR/v1/event/list?name=$name"
    if [ -n "$index" ]; then
        url+="&index=$index"
    fi
    if [ -n "$wait" ]; then
        url+="&wait=$wait"
    fi
    curl -s -D - "$url" | awk 'NR==1,/^\r$/' | grep 'X-Consul-Index:' | awk '{print $2}' | tr -d '\r'
    curl -s "$url"
}

event_walker () {
    local name=$1
    local index=""
    local wait="24h"
    local dump_old=$2

    out=$(event_list "$name" "$index")
    index=${out%%$'\n'*}
    events_json=${out#*$'\n'}

    if [ "$dump_old" = "true" ]; then
        echo "$events_json" | jq -c '.[]' | while read -r event; do
            local payload=$(echo "$event" | jq -r '.Payload' | base64 --decode)
            echo "{\"name\":\"$name\", $(echo "$payload" | jq -c '.')}"
        done
    fi

    while true; do
        out=$(event_list "$name" "$index" "$wait")
        new_index=${out%%$'\n'*}
        new_events_json=${out#*$'\n'}
        if [ "$new_index" = "$index" ]; then
            echo "nothing new"
            continue
        fi
        index=$new_index
        local last_event=$(echo "$new_events_json" | jq -c '.[-1]')
        local payload=$(echo "$last_event" | jq -r '.Payload' | base64 --decode)
        echo "{\"name\":\"$name\", $(echo "$payload" | jq -c '.')}"
    done
}