To Use Consul as Pub Sub
Fleeting: how to use Consul as pub/sub
using consul.py
import base64
import json
from consul import Consul
def events_walk(self, name=None, index=None, wait="24h", token=None, dump_old=False):
    """Yield the decoded payload of Consul events as they arrive.

    Meant to be attached to a client object exposing
    ``self.event.list(name=..., index=..., wait=..., token=...)`` that
    returns an ``(index, events)`` pair.

    Args:
        name: Event name passed to the listing call; it is also stored under
            the ``"name"`` key of every yielded payload.
        index: Index handed to the first (non-blocking) listing.
        wait: Maximum duration of each blocking listing.
        token: ACL token forwarded to every listing call.
        dump_old: When true, first yield the events already present in the
            initial listing.

    Yields:
        dict: The JSON payload of each event, with ``"name"`` added. An event
        without payload yields ``{"name": name}``.
    """

    def decode(event):
        raw = event.get("Payload")
        # Events may be fired without any payload.
        data = json.loads(base64.b64decode(raw)) if raw else {}
        data["name"] = name
        return data

    def ids(batch):
        return {event["ID"] for event in batch}

    index, events = self.event.list(name=name, index=index, token=token)
    events = events or []
    if dump_old:
        for event in events:
            yield decode(event)
    seen = ids(events)

    while True:
        new_index, events = self.event.list(
            name=name, wait=wait, index=index, token=token
        )
        if new_index == index:
            print("nothing new")
            continue
        index = new_index
        events = events or []
        # Every listing repeats the older events, so yield only the ones not
        # seen in the previous listing. Several events may have fired while
        # we were waiting; yielding just the last one would drop the others.
        for event in events:
            if event["ID"] not in seen:
                yield decode(event)
        # Only the latest listing needs remembering, which keeps `seen` bounded.
        seen = ids(events)
# Monkey-patch: attach events_walk to the Consul class as the `walk` attribute,
# so it can be called as a method on a Consul instance.
Consul.walk = events_walk
Avoiding the out-of-date consul.py, while still taking inspiration from it.
import base64
import json
import os
import requests
def event_list(name=None, index=None, wait=None):
    """Fetch the event list from the Consul agent at ``CONSUL_HTTP_ADDR``.

    Args:
        name: Optional event name to filter on.
        index: Optional index; sent as the ``index`` query parameter.
        wait: Optional maximum wait, sent verbatim as the ``wait`` query
            parameter (e.g. ``"24h"``).

    Returns:
        tuple: ``(index, events)`` where ``index`` is the value of the
        ``X-Consul-Index`` response header and ``events`` is the decoded
        JSON body.

    Raises:
        KeyError: If ``CONSUL_HTTP_ADDR`` is not set or the index header is
            missing.
        Exception: If the response status is not 2xx.
    """
    from urllib.parse import urlencode

    # Only send the parameters that were actually provided: interpolating the
    # default name=None would filter on an event literally named "None".
    params = {
        key: value
        for key, value in (("name", name), ("index", index), ("wait", wait))
        if value
    }
    url = f"{os.environ['CONSUL_HTTP_ADDR']}/v1/event/list"
    if params:
        url += "?" + urlencode(params)

    resp = requests.get(url)
    if resp.status_code // 100 != 2:
        raise Exception(resp)
    content = resp.json()
    return resp.headers["X-Consul-Index"], content
def events_walk(name=None, index=None, wait="24h", token=None, dump_old=False):
    """Yield the decoded payload of Consul events as they arrive.

    Args:
        name: Event name passed to ``event_list``; it is also stored under
            the ``"name"`` key of every yielded payload.
        index: Index handed to the first (non-blocking) listing.
        wait: Maximum duration of each blocking listing.
        token: Accepted for signature compatibility; not used by this
            implementation.
        dump_old: When true, first yield the events already present in the
            initial listing.

    Yields:
        dict: The JSON payload of each event, with ``"name"`` added. An event
        without payload yields ``{"name": name}``.
    """

    def decode(event):
        raw = event.get("Payload")
        # Events may be fired without any payload.
        data = json.loads(base64.b64decode(raw)) if raw else {}
        data["name"] = name
        return data

    def ids(batch):
        return {event["ID"] for event in batch}

    index, events = event_list(name=name, index=index)
    events = events or []
    if dump_old:
        for event in events:
            yield decode(event)
    seen = ids(events)

    while True:
        new_index, events = event_list(name=name, wait=wait, index=index)
        if new_index == index:
            print("nothing new")
            continue
        index = new_index
        events = events or []
        # Every listing repeats the older events, so yield only the ones not
        # seen in the previous listing. Several events may have fired while
        # we were waiting; yielding just the last one would drop the others.
        for event in events:
            if event["ID"] not in seen:
                yield decode(event)
        # Only the latest listing needs remembering, which keeps `seen` bounded.
        seen = ids(events)
Same, but async (using httpx)
import base64
import json
import os
import httpx
async def event_list(name=None, index=None, wait=None):
    """Fetch the event list from the Consul agent at ``CONSUL_HTTP_ADDR``.

    Args:
        name: Optional event name to filter on.
        index: Optional index; sent as the ``index`` query parameter.
        wait: Optional maximum wait in seconds (a number); sent as
            ``wait=<n>s`` and used to size the client-side timeout.

    Returns:
        tuple: ``(index, events)`` where ``index`` is the value of the
        ``X-Consul-Index`` response header and ``events`` is the decoded
        JSON body.

    Raises:
        KeyError: If ``CONSUL_HTTP_ADDR`` is not set or the index header is
            missing.
        Exception: If the response status is not 2xx.
    """
    from urllib.parse import urlencode

    # Only send the parameters that were actually provided: interpolating the
    # default name=None would filter on an event literally named "None".
    params = {}
    if name:
        params["name"] = name
    if index:
        params["index"] = index
    if wait:
        params["wait"] = f"{wait}s"
    url = f"{os.environ['CONSUL_HTTP_ADDR']}/v1/event/list"
    if params:
        url += "?" + urlencode(params)

    # Consul may hold a blocking query slightly longer than `wait` (it adds up
    # to wait/16 of jitter), so the client timeout must exceed it; otherwise
    # the client gives up just before the server answers.
    timeout = wait * 17 / 16 + 5 if wait else None

    print("fetching", url)
    async with httpx.AsyncClient() as client:
        resp = await client.get(url, timeout=timeout)
    if resp.status_code // 100 != 2:
        raise Exception(resp)
    content = resp.json()
    return resp.headers["X-Consul-Index"], content
async def events_walk(
    name=None, index=None, wait=24 * 3600, token=None, dump_old=False
):
    """Asynchronously yield the decoded payload of Consul events as they arrive.

    Args:
        name: Event name passed to ``event_list``; it is also stored under
            the ``"name"`` key of every yielded payload.
        index: Index handed to the first (non-blocking) listing.
        wait: Maximum duration, in seconds, of each blocking listing.
        token: Accepted for signature compatibility; not used by this
            implementation.
        dump_old: When true, first yield the events already present in the
            initial listing.

    Yields:
        dict: The JSON payload of each event, with ``"name"`` added. An event
        without payload yields ``{"name": name}``.
    """

    def decode(event):
        raw = event.get("Payload")
        # Events may be fired without any payload.
        data = json.loads(base64.b64decode(raw)) if raw else {}
        data["name"] = name
        return data

    def ids(batch):
        return {event["ID"] for event in batch}

    index, events = await event_list(name=name, index=index)
    events = events or []
    if dump_old:
        for event in events:
            yield decode(event)
    seen = ids(events)

    while True:
        new_index, events = await event_list(name=name, wait=wait, index=index)
        if new_index == index:
            print("nothing new")
            continue
        index = new_index
        events = events or []
        # Every listing repeats the older events, so yield only the ones not
        # seen in the previous listing. Several events may have fired while
        # we were waiting; yielding just the last one would drop the others.
        for event in events:
            if event["ID"] not in seen:
                yield decode(event)
        # Only the latest listing needs remembering, which keeps `seen` bounded.
        seen = ids(events)
Same, with curl and jq, for use in shell scripts
# suppose CONSUL_HTTP_ADDR is already set
# event_list NAME [INDEX] [WAIT]
# Query $CONSUL_HTTP_ADDR/v1/event/list and print the value of the
# X-Consul-Index response header on the first line, followed by the JSON body.
# Returns curl's exit status when the request itself fails.
event_list() {
    local name=$1
    local index=$2
    local wait=$3
    local url="$CONSUL_HTTP_ADDR/v1/event/list?name=$name"
    local response
    if [ -n "$index" ]; then
        url+="&index=$index"
    fi
    if [ -n "$wait" ]; then
        url+="&wait=$wait"
    fi
    # One single request: fetching the headers and the body with two separate
    # calls would block twice on a blocking query and could pair the index of
    # one response with the body of another.
    response=$(curl -s -i "$url") || return
    # Headers come first (CRLF-terminated) and end at the first empty line;
    # everything after that is the body and is passed through untouched.
    printf '%s\n' "$response" | awk '
        in_body { print; next }
        { sub(/\r$/, "") }
        $0 == "" { print idx; in_body = 1; next }
        tolower($1) == "x-consul-index:" { idx = $2 }
    '
}
# _event_walker_emit NAME
# Read one compact event JSON object per line on stdin and print, for each,
# its base64-decoded JSON payload with a "name" key set to NAME.
_event_walker_emit() {
    local name=$1
    local event payload
    while IFS= read -r event; do
        payload=$(jq -r '.Payload // empty' <<<"$event" | base64 --decode)
        # Events fired without payload still produce a record carrying the name.
        [ -n "$payload" ] || payload='{}'
        # Merge the name into the payload object; concatenating the two strings
        # by hand does not produce valid JSON.
        jq -c --arg name "$name" '. + {name: $name}' <<<"$payload"
    done
}

# event_walker NAME [DUMP_OLD]
# Print, one JSON object per line on stdout, the payload of every event named
# NAME as it arrives. When DUMP_OLD is "true", the events already known at
# start-up are printed first. Diagnostics go to stderr so that stdout carries
# only JSON. Returns non-zero if a request or the parsing of its result fails.
event_walker () {
    local name=$1
    local dump_old=$2
    local wait="24h"
    local out index new_index events_json seen

    out=$(event_list "$name" "") || return
    index=${out%%$'\n'*}
    events_json=${out#*$'\n'}

    if [ "$dump_old" = "true" ]; then
        jq -c '.[]?' <<<"$events_json" | _event_walker_emit "$name"
    fi
    seen=$(jq -c '[.[]? | .ID]' <<<"$events_json") || return

    while true; do
        # Stop on request failure instead of spinning in a tight loop.
        out=$(event_list "$name" "$index" "$wait") || return
        new_index=${out%%$'\n'*}
        events_json=${out#*$'\n'}
        if [ "$new_index" = "$index" ]; then
            echo "nothing new" >&2
            continue
        fi
        index=$new_index
        # Every listing repeats the older events: print only those whose ID was
        # not in the previous listing, so that several events fired during one
        # wait are all reported.
        jq -c --argjson seen "$seen" \
            '.[]? | select(.ID as $id | $seen | index($id) | not)' \
            <<<"$events_json" | _event_walker_emit "$name"
        seen=$(jq -c '[.[]? | .ID]' <<<"$events_json") || return
    done
}