# Source code for blume.magic

"""Matplotlib plus async


mainloop()

queues of images

clients set up their own queues

draw from Q

Going round in ever decreasing circles.  

The idea is there is some sort of calculation going on that generates grids of
numbers.

You want to see what is going on without grinding the calculations to a halt.

At the same time, the more you explore the data with plots (thanks matplotlib)
the more questions appear.

Often the calculation has a number of parameters, or a selection of data
streams on which it could be run.

So my code sprouts command line arguments and then the simple U/I might allow
me to control more of the parameters.

This module aims to provide a small number of components to help asynchronous
data exploration and graphical monitoring of data processing pipelines.

It also aims to help with the process of maintenance of data streams
over time.   See metagit.py for more ideas there.

Many magic classes started as objects in other short scripts, see
examples for some of those.

Internals?
==========

I have gone with Tk because it is usually around and I don't need much here.

But I will be thinking about raspberry pi's with sense hats and a mini joy
stick too.

All the user interface and display handling are ultimately done by the
EventLoop object, so a place to start when thinking beyond the current.

For input the focus is very much on the keyboard, so things should work well
with anything that can create a stream of key characters.

I am also using David Beazley's *curio* module to help me use python3.6+ async
features.

I am starting off here with some pieces from my project *karmapi*.  

So there will be Pig Farms, Piglets and widgets all mixed up for a while.

Update
======

I am planning to re-work how objects pass between Balls.

Balls await put() and get() messages and do not care where they come from.

The plan is for the GeeFarm or Shepherd to allow you to examine queue
statistics and add and delete edges to the graph, with an option
whereby it randomly connects things together until it works or melts
or something.

Make it a bit easier to view and navigate the various graphs.

Leaning to a bunch of different *Controller* objects that you can call
up for different tasks such as viewing queues, starting and stopping
processes.

Other objects, waiting on input.

In short, there is a working prototype.  

It is time to untangle the various objects and mold it into something
smaller and more powerful.

That we can then use in the farm module to build a functioning whole.

Let the balls bounce around randomly with the magic roundabout routing
the photons.

"""
import random

import math

import datetime

import io

import sys

from pathlib import Path

from collections import deque, defaultdict, Counter

import argparse

import operator

from traceback import print_exc

import inspect

import dateutil

import functools

#import curio
import asyncio
curio = asyncio

sleep = curio.sleep
run = curio.run

spawn = curio.create_task

import numpy as np

from PIL import Image

import matplotlib

from matplotlib.transforms import Bbox

from matplotlib import figure, artist, patches, colors, rc

from matplotlib import pyplot as plt

from .modnar import random_colour, random_queue

from . import table

Parser = argparse.ArgumentParser


class RoundAbout:
    """ Pass self around.

    Just a collection of random queues that everyone shares.

    For maximal information sharing, I'm curious what happens when
    objects with the roundabout do something like:

        await self.put(self)

    The magic roundabout just looks after the queues.
    """
    # There is only one, initialise attributes as class attributes

    # default queue is whatever random_queue serves up.
    queues = defaultdict(random_queue)

    # how often each operation has been used, keyed by '<operation> <name>'
    counts = Counter()

    async def put(self, item, name=None):
        """ Put item on the queue called name, awaiting the queue's put """
        qq = self.queues[name]
        self.counts.update([f'put {name}'])
        await qq.put(item)

    def put_nowait(self, item, name=None):
        """ Put item on the queue called name without waiting

        If the queue is full the item is dropped and a message printed.
        """
        qq = self.queues[name]
        self.counts.update([f'put_nowait {name}'])
        try:
            qq.put_nowait(item)
        except asyncio.queues.QueueFull:
            print(f'magic queue {name} is full. size={qq.qsize()}')

    def get_nowait(self, name=None):
        """ Return the next item from the queue called name without waiting

        Whatever the queue raises when it has nothing to give
        (asyncio.QueueEmpty for asyncio queues) is passed on to the caller.
        """
        qq = self.queues[name]
        self.counts.update([f'get_nowait {name}'])
        # No handler here: get_nowait cannot raise QueueFull, and callers
        # such as Axe.get rely on seeing QueueEmpty.
        return qq.get_nowait()

    async def get(self, name=None):
        """ Wait for and return the next item from the queue called name """
        qq = self.queues[name]
        self.counts.update([f'get {name}'])
        result = await qq.get()
        return result

    def select(self, name=None, create=True):
        """ pick a q

        create: currently ignored -- the queue is always created if it
                is missing, because self.queues is a defaultdict.
        """
        return self.queues[name]

    def status(self):
        """ Show some stats """
        print("Queue Stats")
        for name, qq in self.queues.items():
            print(name, qq.qsize(), qq.maxsize)
TheMagicRoundAbout = RoundAbout()


class Ball:
    """ Something with a sleep interval, a pause flag and key filters

    Attributes that are not found on the ball itself are looked up on
    TheMagicRoundAbout, which is how put(), get(), select() and friends
    appear on every ball.
    """

    def __init__(self, **kwargs):

        if kwargs:
            self.update(kwargs)

        self.paused = False
        self.sleep = .01
        self.tasks = Tasks()

        # ho hum update event_map to control ball?
        # this should be done via roundabout,
        # let shepherd control things?
        self.filters = defaultdict(dict)

        # default key bindings shared by every ball
        bindings = (
            ('z', self.sleepy),
            ('w', self.wakey),
            ('W', self.dump_roundabout),
            ('j', self.status),
        )
        for key, callback in bindings:
            self.add_filter(key, callback)

    def add_filter(self, key=None, coro=None, name='keys'):
        """ Register coro as the callback for key in filter group name

        With no key given, the last character of coro.__name__ that is not
        already bound in the group is used.
        """
        taken = self.filters[name]
        if key is None:
            unused = [char for char in coro.__name__ if char not in taken]
            if unused:
                key = unused[-1]

        self.filters[name][key] = coro

    async def dump_roundabout(self):
        """ Print the roundabout counters and plot them as a bar chart """
        print('DUMPING ROUNDABOUT')
        print(TheMagicRoundAbout.counts)

        ranked = TheMagicRoundAbout.counts.most_common()
        labels = [label for label, _ in ranked]
        heights = [count for _, count in ranked]

        ax = await self.get()
        ax.bar(range(len(ranked)), height=heights, tick_label=labels)
        for tick in ax.get_xticklabels():
            tick.update(dict(rotation=45))
        ax.show()

    def __getattr__(self, attr):
        """ Delegate to TheMagicRoundAbout """
        return getattr(TheMagicRoundAbout, attr)

    def update(self, args):
        """ Update attributes as per args

        args is expected to be from argparse.

        Now it is super easy to write a short module
        that has a bunch of command line parameters.
        """
        try:
            settings = vars(args)
        except TypeError:
            # no __dict__: treat args as a mapping already
            settings = args

        for name, value in settings.items():
            setattr(self, name, value)

    def sleepy(self):
        """ Sleep more between updates """
        self.sleep = self.sleep * 2
        print(f'sleepy {self.sleep}')

    def wakey(self):
        """ Sleep less between updates """
        self.sleep = self.sleep / 2
        print(f'wakey {self.sleep}')

    def toggle_pause(self):
        """ Toggle pause flag """
        print('toggle pause')
        self.paused = not self.paused

    async def start(self):
        """ Nothing to set up by default """
        pass

    async def run(self):
        """ Run whatever self.tasks holds """
        await self.tasks.run()
class Axe:
    """ A matplotlib axis that has some extra methods

    The idea is to hand these out to anyone looking for an axis.

    By wrapping the axis object we can carry around some meta data that
    might come in useful.

    But mostly I want an Axis that I can show and hide from the figure
    and change its layout.

    I would like to just go with show and hide, but suspect I might
    need draw too.
    """

    def __init__(self, delegate, carpet):
        """ delegate: the wrapped axes, carpet: the object told of show/hide """
        self.delegate = delegate
        self.carpet = carpet

    def __getattr__(self, attr):
        """ Look attr up on the delegate, then on pyplot

        Raises AttributeError, naming the attribute, if neither has it.
        """
        if attr == 'delegate':
            # Not set yet (bare instance, copy, pickle...): asking
            # self.delegate below would recurse forever.
            raise AttributeError(attr)

        delegate = self.delegate
        try:
            return getattr(delegate, attr)
        except AttributeError:
            pass

        try:
            return getattr(plt, attr)
        except AttributeError:
            raise AttributeError(
                f'{type(self).__name__!r} object has no attribute {attr!r}'
            ) from None

    def position(self, target):
        """ Set position to that of target """
        self.set_subplotspec(target.get_subplotspec())

    def show(self):
        """ Show the axes """
        self.set_visible(True)
        if not hasattr(self, 'img'):
            self._blank()
        else:
            self.img.set_visible(True)

        self.carpet.show(self)

    def hide(self):
        """ Hide the axes """
        self.set_visible(False)
        if hasattr(self, 'img'):
            self.img.set_visible(False)

        self.carpet.hide(self)

    def please_draw(self):
        """ Try to force a draw of the axes """
        print('politely asked to draw')
        #self.draw_artist(self)

    def projection(self, name):
        """ Set the projection

        Not sure if this is possible.

        A new, invisible subplot with the requested projection becomes the
        delegate, takes over the old delegate's subplot spec and is
        registered in carpet.lookup; the old delegate is removed.
        """
        ax = self.delegate

        parms = dict(projection=name, visible=False)
        pax = ax.figure.subplots(subplot_kw=parms)

        self.delegate = pax
        self.position(ax)
        self.carpet.lookup[id(pax)] = self

        if hasattr(ax, 'img'):
            ax.img.remove()

        # now delete ax
        ax.remove()

    def simplify(self):
        """ Turn the axis decorations off """
        self.axis('off')

    def colorbar(self, mappable):
        """ Ask the figure for a colorbar for mappable, passing self along """
        self.figure.colorbar(mappable, self)

    def hide_axes(self):
        """ Hide the x and y axis objects """
        self.get_xaxis().set_visible(False)
        self.get_yaxis().set_visible(False)

    def get_id(self):
        """ Return id of the wrapped axes """
        return id(self.delegate)

    def _blank(self):
        """ Add a coloured rectangle covering our cell to the carpet background """
        bb = self.get_full_bbox()

        self.img = patches.Rectangle(
            bb.p0, bb.width, bb.height,
            facecolor=Colours.next()
        )

        # add patch to the background
        self.carpet.background.add_artist(self.img)

    def get_full_bbox(self):
        """ Return a Bbox for the grid cell(s) of this axes

        Calculated with hspace and wspace set to zero; cells on the edge of
        the grid are stretched out to the edge (0 or 1).
        """
        # FIXME -- this needs to take account of padding of the figure
        # see toggle_expand
        ss = self.get_subplotspec()
        gs = ss.get_gridspec()
        nrows, ncols = gs.get_geometry()

        fig = self.figure
        spp = fig.subplotpars

        # set hspace/wspace to zero while asking for grid positions, and
        # make sure they are restored even if that call fails
        hspace, wspace = spp.hspace, spp.wspace
        spp.hspace, spp.wspace = 0., 0.
        try:
            bottoms, tops, lefts, rights = gs.get_grid_positions(fig)
        finally:
            spp.hspace, spp.wspace = hspace, wspace

        # now calculate our bottom, top, left, right
        rowstart, rowstop = ss.rowspan[0], ss.rowspan[-1]
        colstart, colstop = ss.colspan[0], ss.colspan[-1]

        top = tops[rowstart]
        left = lefts[colstart]
        bottom = bottoms[rowstop]
        right = rights[colstop]

        # adjust edge box for figure padding
        if rowstart == 0:
            top = 1.0
        if colstart == 0:
            left = 0
        if rowstop == nrows - 1:
            bottom = 0.0
        if colstop == ncols - 1:
            right = 1.0

        return Bbox([[left, bottom], [right, top]])

    def get(self):
        """ Return a fresh axis

        Takes whatever is waiting on the default roundabout queue, or
        returns self if that queue is empty.
        """
        try:
            return TheMagicRoundAbout.get_nowait()
        except asyncio.queues.QueueEmpty:
            return self
class PatchColours:
    """ Hands out background colours for blank axes

    Only 'skyblue' is served at the moment.  The wider palette
    ('lightgreen', 'yellow', 'pink', 'orange' plus four random dark
    colours) sat behind an early return and was never reached.
    """

    def __init__(self):

        self.colours = deque(('skyblue',))

    def next(self):
        """ Rotate the colours and return the new head """
        self.colours.rotate()
        return self.colours[0]


Colours = PatchColours()


class Interact(Ball):
    """ Keyboard driven inspection and editing of another object's attributes

    Keeps a deque of attribute names of the current ball; the head of the
    deque is the current attribute, which the numeric keys operate on.
    """

    def __init__(self, ball):

        super().__init__()

        self.history = deque()
        self.set_ball(ball)

        self.add_filter('0', self.toggle)
        self.add_filter('1', self.add_one)
        self.add_filter('3', self.sub_one)
        self.add_filter('2', self.double)
        self.add_filter('4', self.half)
        self.add_filter('5', self.flipsign)
        self.add_filter('8', self.rcycle)
        self.add_filter('9', self.cycle)

        self.add_filter('x', self.tenx)
        self.add_filter('z', self.tenth)
        self.add_filter('m', self.add_m)
        self.add_filter('c', self.shorten)

        self.add_filter('.', self.next_attr)
        self.add_filter(',', self.prev_attr)
        self.add_filter('enter', self.re_interact)
        self.add_filter('escape', self.back)
        self.add_filter('i', self.interact)

    async def run(self):
        """When this is triggered it usually means I want to run self.ball,
        not myself

        for now, achieve this by waiting for self.back()
        and then pushing an r to trigger a run

        wonder if this works?

        I wonder how far the magic roundabout can go before descending into
        chaos

        the real reason this is here is it is the Sheperd that looks
        after running things.

        I think I will have it listen to a 'run' queue.

        ... added a run queue i think .... let's see
        """
        await self.put(self.ball, 'run')

        # and switch ourself off
        await self.put(self, 'run')

    def set_ball(self, ball):
        """ Make ball the object being interacted with

        Anything that is not a Ball is wrapped in a Wrapper.  If that
        fails a message is printed and the current ball is left alone.
        """
        if not isinstance(ball, Ball):
            try:
                ball = Wrapper(ball)
                attrs = deque(ball.attrs)
            except Exception as e:
                print(e)
                print('oops no can interact', ellipsis(repr(ball)))
                return
        else:
            attrs = deque(sorted(vars(ball)))

        self.attrs = attrs
        self.ball = ball

    async def back(self):
        """ pop back in history """
        if not self.history:
            return

        ball = self.history.pop()
        self.set_ball(ball)

    def interact(self):
        """ Go into interactive mode

        hmm.... worm can time
        """
        for key, value in sorted(vars(self.ball).items()):
            rep = ellipsis(repr(value))
            print(key, rep, type(value))
        print()
        self.show_current()
        print()

    async def re_interact(self):
        """ Recursively interact mode

        hmm.... bigger worm can time

        Idea is to interact on current attribute of whatever current is
        """
        current = self.current()
        obj = getattr(self.ball, current)
        print('XXXXX re_interact', current, type(obj))
        try:
            obj = await self.to_table(obj)
        except Exception:
            # best effort: carry on with the object as it is
            print('hmm... not a table?')

        self.history.append(self.ball)
        self.set_ball(obj)
        self.show_current()

    async def to_table(self, obj):
        """ Turn obj into an astropy Table, plotting every column but the first

        A column that fails to plot is reported and skipped.
        """
        from astropy.table import Table

        table = Table(obj)
        print(table.colnames)
        for col in table.colnames[1:]:
            print('plotting', col)
            try:
                ax = await self.get()
                ax.plot(table[col])
                ax.set_xlabel(col)
                ax.show()
            except Exception:
                # not every column can be plotted
                print_exc()

        return table

    def show_current(self):
        """ Print the current attribute name and an abbreviated repr of it """
        attr = self.current()
        value = repr(getattr(self.ball, attr))
        result = (attr, ellipsis(value))
        print(*result)
        #self.put_nowait(str(result), 'help')

    def current(self):
        """ Return current attr """
        return self.attrs[0]

    async def next_attr(self):
        """ Move on to the next attribute """
        self.attrs.rotate(-1)
        self.show_current()

    async def prev_attr(self):
        """ Move back to the previous attribute """
        self.attrs.rotate()
        self.show_current()

    def operate(self, op=operator.add, b=2):
        """ Apply op to the current attribute

        b: second operand for op, or None if op takes a single argument.

        Things that can be listed and assigned to by index are updated
        element by element, in place.  Anything else has op applied to
        the value itself and the result stored back on the ball.
        """
        key = self.current()
        value = getattr(self.ball, key)
        try:
            items = list(value)
            for ix, item in enumerate(items):
                if b is not None:
                    value[ix] = op(item, b)
                else:
                    value[ix] = op(item)
        except TypeError:
            if b is not None:
                value = op(value, b)
            else:
                value = op(value)
            setattr(self.ball, key, value)

        print(f'{key}: {value}')
        self.show_current()

    def double(self):
        """ i double """
        self.operate(operator.mul, 2)

    def half(self):
        """ i half """
        self.operate(operator.mul, 1/2)

    def tenx(self):
        """ i ten x """
        self.operate(operator.mul, 10)

    def tenth(self):
        """ i one tenth """
        self.operate(operator.mul, 1/10)

    def add_one(self):
        """ i one more """
        self.operate(operator.add, 1)

    def add_m(self):
        """ i one thousand more """
        self.operate(operator.add, 1000)

    def sub_one(self):
        """ i one less """
        self.operate(operator.add, -1)

    def toggle(self):
        """ i toggle """
        self.operate(operator.not_, None)

    def flipsign(self):
        """ i flip """
        self.operate(operator.mul, -1)

    def acycle(self, howmuch=1):
        """ Rotate the current attribute by howmuch

        Anything with a rotate method (a deque) is rotated in place.
        Lists, tuples and strings are rotated the same way round by
        slicing and the result stored back on the ball.
        """
        key = self.current()
        value = getattr(self.ball, key)
        try:
            value.rotate(howmuch)
            if value:
                print(value[0])
        except Exception:
            print(value)
            if isinstance(value, (list, tuple, str)):
                # same direction as deque.rotate: positive moves the tail
                # round to the front
                value = value[-howmuch:] + value[:-howmuch]
                setattr(self.ball, key, value)
            else:
                print('no cycle for you', howmuch)

    def cycle(self):
        """ i cycle """
        self.acycle()

    def rcycle(self):
        """ i cycle back """
        self.acycle(-1)

    def shorten(self):
        """ Drop the head of the current attribute, keeping at least one item """
        key = self.current()
        value = getattr(self.ball, key)
        if len(value) > 1:
            print(f'popping {value[0]}')
            value.popleft()
            print(f'new head {value[0]}')


class Wrapper:
    """ Make a dict, a table with colnames, or a plain object look like a Ball

    self.attrs holds the names that can be interacted with, self.ball the
    wrapped object.  Attributes not found here are looked up on the ball.
    """

    def __init__(self, ball):

        if isinstance(ball, dict):
            self.__dict__.update(ball)
            attrs = list(ball)
        elif hasattr(ball, 'colnames'):
            columns = {name: ball[name] for name in ball.colnames}
            self.__dict__.update(columns)
            attrs = ball.colnames
        else:
            attrs = ball.__dict__.keys()

        self.attrs = attrs

        # keep a reference to the full thing
        self.ball = ball

    def __getattr__(self, attr):
        """ Delegate anything we do not have to the wrapped object """
        if attr == 'ball':
            # not set yet: asking self.ball below would recurse forever
            raise AttributeError(attr)
        return getattr(self.ball, attr)
class Spell:
    """ A magic spell, or cast if you like, if it works

    For help processing lists of dictionaries.

    Or csv files, where we have to turn strings into values.

    The game is guessing the types of the columns in a csv file.

    Use a magic.Spell.
    """

    def __init__(self, cache=None):
        """ Older idea, updated::

        Cache size is how much rewind we get if a type changes

        If None, everything gets cached.

        It would make sense to coordinate this with functools.lru_cache,
        now known as functools.cache (python 3.9).

        For small datasets this might be what you want.

        cache is currently not used.
        """
        # the parser submodule is not loaded by a plain "import dateutil"
        import dateutil.parser

        # casts by keyword
        self.casts = {}

        self.date_parse = date_parse = dateutil.parser.parser().parse

        # what to try next when a cast fails
        self.upcast = {None: int, int: float, float: date_parse, date_parse: str}

        # value to use for an empty field, by cast
        self.fill = {None: None, int: 0, float: 0.0, date_parse: None, str: ''}

        # how much data to look at to find casts
        self.sniff = 10

    def spell(self, data):
        """ Apply casts to data

        Would like this to be dynamic, updating the casts as we go
        """
        # short hand for: for xx in self.cast_data(data); yield xx
        yield from self.cast_data(data)

    def find_casts(self, data, sniff=10):
        """ Guess a cast for each key from the last sniff rows of data

        data: sequence of dictionaries of strings, keys taken from data[0].
        sniff: how many rows to look at, self.sniff if zero or None.

        Each key starts as int and is widened along self.upcast until the
        value in hand converts.  Empty values are skipped.

        Also sets self.datekey to the first key cast with date_parse,
        or None if there isn't one.
        """
        sniff = sniff or self.sniff
        self.datekey = None

        if not data:
            return

        keys = data[0].keys()
        casts = self.casts
        upcast = self.upcast

        for row in data[-sniff:]:
            for key in keys:
                value = row[key].strip()
                if not value:
                    continue

                cast = casts.setdefault(key, int)
                while True:
                    try:
                        cast(value)
                        break
                    except Exception:
                        if cast not in upcast:
                            # nowhere wider to go, stick with what we have
                            break
                        cast = casts[key] = upcast[cast]

        # look for a (first) date key - probably should look
        # for all dates, really we are looking for an index here
        for key, cast in casts.items():
            if cast is self.date_parse:
                self.datekey = key
                return

    def check_casts(self, data, sniff=10):
        """ Same as find_casts """
        self.find_casts(data, sniff)

    def cast_data(self, data):
        """ Yield a dictionary of converted values for each row of data

        Keys with no known cast are printed and left out.  Empty values
        are replaced by the fill value for the cast, without casting it.
        """
        casts = self.casts
        fill = self.fill

        for row in data:
            result = {}
            for key, value in row.items():
                if key not in casts:
                    print(key)
                    continue

                cast = casts[key]
                if value.strip():
                    result[key] = cast(value)
                else:
                    # fill values are already of the right type, and
                    # None will not survive a trip through a date parser
                    result[key] = fill.get(cast)

            yield result

    def fields(self):
        """ Return the keys we have casts for """
        return self.casts.keys()
def find_date_key(record):
    """ Return the first key of record whose value parses as a date

    Values are tried in order with dateutil.parser.parse.  Each value that
    does not parse is reported with a print.

    Returns None if nothing parses.
    """
    # the parser submodule is not loaded by a plain "import dateutil"
    import dateutil.parser

    for key, value in record.items():
        try:
            # parse the value -- dateutil.parser.parser(value) would only
            # construct a parser object and never fail
            dateutil.parser.parse(value)
        except (ValueError, TypeError, OverflowError) as e:
            # guess it is not this one
            print(e)
            print(key, value, 'is not a date')
        else:
            return key

    return None


modes = deque(['grey', 'white', 'black'])
def fig2data(fig=None, background='grey'):
    """ Convert a Matplotlib figure to a PIL image.

    fig: a matplotlib figure, pyplot itself is used if none is given

    background: not used at present

    return: PIL image

    There has to be an easier way to do this.

    FIXME -- turning matplotlib figures into PIL or numpy
    """
    target = fig if fig else plt

    # start from the current mode, but prefer the figure's own colour
    colour = modes[0]
    if hasattr(target, 'get_facecolor'):
        colour = target.get_facecolor()

    # no renderer without this
    buffer = io.BytesIO()
    target.savefig(buffer, facecolor=colour, dpi=200)

    return Image.open(buffer)
class Shepherd(Ball):
    """Watches things nobody else is watching

    This currently sets up all the message handling and runs everything.

    To route messages between balls it is seems to be necessary to
    have access to the graph of relationships.

    In the current setup this is GeeFarm, but this leaves everything
    to the Shepherd.

    The Shepherd in turn does it's work by setting up relays for each
    edge in the graph, watching all the roundabout queues and passing
    messages along.

    There is one relay per edge in the graph and it just connects the
    output and input queues of objects along that edge.

    There is a whole other chunk of code dealing with passing keyboard
    events along a path of objects, and all the related key binding fun.

    Again, the roundabout holds the data and every Ball has a Roundabout.

    It might be good to have just one RoundAbout, managed by the Shepherd.

    Dynamic graph?
    """

    def __init__(self):

        super().__init__()

        self.flock = None

        # sheep (or name) -> task that is running it
        self.running = {}

        # tasks relaying key presses to callbacks
        self.relays = set()

        # the last item on the path has the focus
        self.path = [self]

        self.interaction = Interact(self.path[-1])

        self.add_filter('h', self.show_help)
        self.add_filter('n', self.next_ball)
        self.add_filter('p', self.previous_ball)
        #self.add_filter('u', self.up)
        #self.add_filter('d', self.down)
        self.add_filter('r', self.toggle_run)
        self.add_filter('b', self.start)
        self.add_filter('I', self.edit_current)

        self.add_filter('x', self.status)
        self.add_filter('i', self.interact)

        # make a little sleepy
        self.sleep *= 10

    async def interact(self):
        """ Go into interactive mode

        hmm.... worm can time
        """
        current = self.current()
        if current is not self.interaction.ball:
            self.interaction.set_ball(current)

        self.interaction.interact()

        if self.interaction not in self.path:
            self.path.append(self.interaction)

        self.generate_key_relays()
        self.put_nowait('interact', 'gkr')

    def current(self):
        """ Return the object at the end of the path """
        return self.path[-1]

    async def status(self):
        """ Show current status of object graph """
        helps = []
        helps += str(TheMagicRoundAbout.counts).split(',')

        helps.append('')
        helps += [str(x.__class__) for x in self.path]
        helps.append('PATH')
        for item in self.path:
            helps.append(str(item))

        text = '\n'.join(helps)
        print(text)
        self.put_nowait(text, 'help')

    def set_flock(self, flock):
        """ Supply the flock to be watched """
        self.flock = flock

    def set_path(self, path=None):
        """ Set the path, just the shepherd if none given """
        self.path = path or [self]

    def generate_key_relays(self, name='keys'):
        """ Cancel existing key relays and spawn one for each current binding

        FIXME: name is not passed on, bindings always come from 'keys'.
        """
        for rly in self.relays:
            rly.cancel()
        self.relays.clear()

        for target, key, callback in self.generate_key_bindings():
            listener = spawn(relay(key, callback))
            self.relays.add(listener)

        self.put_nowait('gkr', 'oldgrey')

    def generate_key_bindings(self, name='keys'):
        """ Yield (sheep, key, callback) working back from the end of the path

        Where a key is bound more than once, the object nearest the end
        of the path wins.
        """
        keys = set()
        for sheep in reversed(self.path):
            lu = sheep.filters[name]

            for key, value in lu.items():
                if key in keys:
                    continue
                yield sheep, key, value
                keys.add(key)

    async def show_help(self, name='keys'):
        """ Show what keys do what """
        print('HELP', self.path)

        # FIXME?
        msg = []
        for sheep, key, callback in self.generate_key_bindings(name=name):
            doc = self.doc_firstline(callback)
            msg.append([key, sheep.__class__.__name__, doc])

        # str() as a key need not be a string
        textmsg = '\n'.join(' '.join(str(item) for item in row)
                            for row in msg)
        print(textmsg)

        # hmm -- there's a queue of help messages somewhere
        # maybe should use that for some other display.
        hq = self.select('help')
        if hq.qsize() < hq.maxsize - 1:
            # if the queue is getting full, then we don't want to hang here.
            # curiously, maxsize - 1 is the critical size at which it seems
            # to hang.
            self.put_nowait(msg, 'help')

    def doc_firstline(self, value):
        """ Return first line of doc """
        doc = value.__doc__
        if doc:
            return doc.split('\n')[0]
        else:
            return value.__name__

    async def helper(self):
        """ Task to run if you want help on the carpet

        One day it will be easy to start and stop this.

        Waits for messages on the 'help' queue and draws each one as a
        table on a fresh axis.
        """
        print('xxHELPER STARTING UPxx')
        while True:
            msg = await self.get('help')

            ax = await self.get()
            try:
                if isinstance(msg, str):
                    msg = [[msg]]
                widths = get_widths(msg)
                tab = table.table(
                    ax.delegate,
                    cellText=msg, bbox=(0, 0, 1, 1),
                    cellLoc='center',
                    colWidths=widths)

                foo = tab[0, 0]
                foo.set_text_props(multialignment='left')
            except Exception:
                # a bad message should not stop the helper
                print(msg)
                print("HELPER CAUGHT AN EXCEPTION")
                print_exc()
                continue

            ax.axis('off')
            ax.show()

    async def start(self):
        """ Start things going

        FIXME: make it simple

        Starts every node in the flock bar ourself.  Nodes flagged
        'background' are set running, the rest are added to the path.
        Then the relays for the 'gkr' and 'run' queues, the task watcher
        and the helper are spawned.
        """
        print('STARTING SHEPHERD')
        for sheep, info in self.flock.nodes.items():
            if sheep is self:
                print("skipping starting myself")
                continue

            print('starting', sheep)
            # just start all the nodes
            dolly = sheep.start()
            if inspect.iscoroutine(dolly):
                await dolly

            print('info', info)
            if info.get('background'):
                # run in background
                runner = spawn(canine(sheep))
                self.running[sheep] = runner
            else:
                self.path.append(sheep)

        spawn(relay('gkr', self.generate_key_relays))
        spawn(relay('run', self.toggle_run, with_message=True))

        # add a task to watch tasks
        self.watcher = spawn(self.task_watcher())

        print('sending out ready message to oldgrey')
        self.put_nowait('ready', 'gkr')

        # what set up is needed for run?
        # navigate the tree
        # R for run
        # S for stop
        # u/d/p/n up down previous next
        self.running['helper'] = spawn(self.helper())

    async def task_watcher(self):
        """ Report exceptions from tasks put on the 'task' queue

        Cancelled tasks are dropped.  Tasks still running go back on the
        queue to be looked at again later.
        """
        while True:
            print('Task q size', self.select('task').qsize())
            task = await self.get('task')

            if task.cancelled():
                continue

            if task.done():
                if task.exception():
                    print(task.exception())
            else:
                await self.put(task, 'task')
                # get() and put() need not give up control when the queue
                # has room, so sleep or a pending task spins us forever
                await sleep(self.sleep)

    async def next_ball(self):
        """ Move focus to next

        path management.
        """
        print(f'what is next?: {self.path}')
        print(f'wtf is going on?')
        current = self.current()
        print(f'current is {current}')
        print(f'doing nodes manipulation')
        nodes = [n for n in self.flock.nodes if n not in self.path]

        if nodes:
            self.path.append(nodes[0])

        self.put_nowait('done', 'gkr')
        self.put_nowait(str(self.current()), 'help')

    async def previous_ball(self):
        """ Move focus to previous """
        await self.up()

    async def up(self):
        """ Move up path """
        if len(self.path) > 1:
            del self.path[-1]
        print(f'up new path: {self.path}')
        self.generate_key_relays()
        self.put_nowait(str(self.current()), 'help')
        self.put_nowait('done', 'gkr')

    async def down(self):
        """ Move focus to next node """
        await self.next_ball()

    async def toggle_run(self, sheep=None):
        """ Toggle run status of sheep

        sheep: what to start or stop, whatever has the focus if not given.
        """
        sheep = sheep or self.current()

        # run it if not already running
        if sheep not in self.running:
            self.running[sheep] = spawn(canine(sheep))
        else:
            # asyncio's Task.cancel() returns a bool, it is not awaited
            self.running.pop(sheep).cancel()

    async def edit_current(self):
        """ open code for current in idle

        FIXME: what to do with no idle, eg in pyscript land?
        """
        sheep = self.current()
        filename = inspect.getsourcefile(sheep.__class__)
        if filename is None:
            print('no source file for', sheep)
            return

        # exec rather than shell, so the filename is passed as is
        await asyncio.create_subprocess_exec('idle', filename)

    async def run(self):
        """ run the flock

        Decide what to run.

        Manage the resulting set of roundabouts.

        Pass messages along.

        For now this only works out a colour for each sheep according to
        whether it is running, current or on the path.  Nothing is drawn.
        """
        colours = []
        for sheep in self.flock:
            c = 'blue'
            if sheep in self.running:
                c = 'red'

            if sheep is self.current():
                c = 'gold'
            elif sheep in self.path:
                c = 'green'

            colours.append(c)

        #ax = await self.get()
        #nx.draw_networkx(self.flock, node_color=colours, ax=ax)

    async def quit(self):
        """ Cancel all the tasks """
        print('Cancelling runners and relays')
        tocancel = set(self.running.values())
        tocancel.update(self.relays)
        for task in tocancel:
            try:
                task.cancel()
            except Exception as e:
                print(f'cancel failed for {task}')
                print(e)

    def __str__(self):
        # there may not be a flock yet
        degree = 0 if self.flock is None else len(self.flock)
        return f'shepherd of flock degree {degree}'
def get_widths(msg):
    """ Return relative column widths for a table of cells

    msg: list of rows, each a list of cells.  The number of columns is
         taken from the first row.

    Each width is the longest str() of a cell in the column plus three,
    scaled so the widths add up to one.

    Returns an empty list for an empty msg.
    """
    if not msg:
        return []

    # find max len of string for each column
    ncols = len(msg[0])
    widths = [
        max(len(str(row[col])) for row in msg) + 3
        for col in range(ncols)
    ]

    # now normalise
    total = sum(widths)
    widths = [x / total for x in widths]
    print('WWWWWWWWWWWW', widths)

    return widths


class IdleRunner:
    """ Callable that opens a file in idle """

    def __init__(self, filename):

        self.filename = filename

    def __call__(self):
        """ Run idle on the file, returning once idle exits """
        import subprocess

        # asyncio.subprocess has no run(), this is the blocking one
        subprocess.run(('idle', self.filename))
[docs] class Table: """ Magic table. A list of dictionaries, and ways to explore them? """ def __init__self(self, path=None, data=None, meta_data=None): """ Turn what we are given into a table """ self.path = path or Path() self.data = data self.meta_data = meta_data """ now what? look at data with a magic spell and get meta data. """
def show():
    """ Show the pyplot figures, without blocking if we can

    Falls back to a plain plt.show() if the non blocking call fails.
    """
    try:
        plt.show(block=False)
    except Exception:
        # sometimes backends have show without block parameter?
        # Exception, not a bare except, so ctrl-c is not swallowed here.
        plt.show()
[docs] class Carpet(Ball): """ Current status: history just added, wormholes opened. FIXME: figure out lifecycle of an Axe generate_mosaic creates and adds to self.axes Need to be able to know: a. Axe has been handed out b. Axe has been shown c. Axe still in history d. geometry -- so we can spot Axe replacing another in same spot. Cases: handed out, still in history == keep handed out, not in history, Deletion: a. not in history b. has been handed out c. not in current image: ie self.showing """ def __init__(self): super().__init__() self.sleep = 0.01 # grid related self.size = [1, 1] # wibni Interact operations worked sanely here self.simple = False self.expanded = None self.output = None self.showing = {} self.history = deque(maxlen=random.randint(10, 20)) self.axes = deque() self.lookup = dict() #self.savefig_dpi = 3000 #self.image = plt.figure(constrained_layout=True, facecolor='grey') self.image = plt.figure() print("GOT IMAGE", self.image) self.background = self.image.add_axes((0,0,1,1)) # keyboard handling self.image.canvas.mpl_connect('key_press_event', self.keypress) # let's see everything #self.log_events() self.add_filter('+', self.more) self.add_filter('=', self.more) self.add_filter('-', self.less) self.add_filter('a', self.add_row) self.add_filter('c', self.add_column) self.add_filter('[', self.history_back) self.add_filter(']', self.history_forward) self.add_filter('S', self.save) self.add_filter('E', self.toggle_expand) self.add_filter('F', self.toggle_expand2) #self.add_filter(' ', self.toggle_pause) def log_events(self): events = [ 'button_press_event', 'button_release_event', 'draw_event', 'key_press_event', 'key_release_event', 'motion_notify_event', 'pick_event', 'resize_event', 'scroll_event', 'figure_enter_event', 'figure_leave_event', 'axes_enter_event', 'axes_leave_event', 'close_event'] connect = self.image.canvas.mpl_connect from functools import partial for event in events: connect(event, partial(self.log_event, name=event)) def 
log_event(self, event, name=None): print(name, event)
[docs] def keypress(self, event): """ Take keypress events put them out there """ #print('mosaic carpet handling', event) # use select here to get actual magic curio queue # where put can magically be a coroutine or a function according # to context. qq = self.select(event.key) qq.put_nowait(event)
[docs] async def save(self): """ Save current image """ self.image.savefig(f'carpet{datetime.datetime.now()}.png')
# dpi=self.savefig_dpi)
[docs] async def add_row(self): """ add a row to the mosaic """ self.size[0] += 1 await self.rebuild()
async def add_column(self): self.size[1] += 1 await self.rebuild()
[docs] async def more(self): """ Show more pictures """ self.size[0] += 1 self.size[1] += 1 await self.rebuild()
[docs] async def less(self): """ Show fewer pictures """ if self.size[0] > 1: self.size[0] -= 1 if self.size[1] > 1: self.size[1] -= 1 await self.rebuild()
async def rebuild(self): self.hideall() #self.generate_mosaic() print('replay history', len(self.history)) await self.replay_history() def hideall(self): # hide everything currently being shown for key, ax in self.showing.items(): ax.hide() self.showing.clear() # drain any axes waiting in self.axes for ax in self.axes: ax.figure.delaxes(ax.delegate) self.axes.clear() async def history_back(self): await self.history_rotate(-1) async def history_forward(self): await self.history_rotate(1) async def history_rotate(self, n=1): #print('history', len(self.history), 'rotate', n) if len(self.history) == 0: return self.history.rotate(n) # we want to replace the current axes with the value we pop qq = self.select() pos = await self.get() ax = self.history.popleft() ax.position(pos) #ax.set_visible(True) if pos.delegate in self.image.axes: self.image.delaxes(pos.delegate) del pos ax.show() async def replay_history(self, maxback=None): # take a copy of the current history hlen = len(self.history) hlen = min(max(*self.size), hlen) # need to throw away one axis in the queue await self.get() for hh in range(hlen): await self.history_rotate(1)
[docs] def toggle_expand2(self): """ ask figure to expand the axes to fill the space """ fig = self.image fig.subplots_adjust(hspace=0, wspace=0)
[docs] def toggle_expand(self, names=None): """ Toggle making each axis fill its space """ names = names or ["left", "bottom", "right", "top", "wspace", "hspace"] fig = self.image if not self.expanded: self.expanded = {} for name in names: self.expanded[name] = getattr(fig.subplotpars, name) rc('image', aspect='auto') fig.subplots_adjust( left=0, right=1, bottom=0, top=1, hspace=0, wspace=0) else: print(self.expanded) fig.subplots_adjust(**self.expanded) self.expanded = None
[docs] async def poll(self): """ Gui Loop """ # Experiment with sleep to keep gui responsive # but not a cpu hog. event = 0 nap = 0.05 canvas = self.image.canvas while True: #print('RUNNING EVENT LOOP') canvas.flush_events() canvas.start_event_loop(self.sleep) await sleep(self.sleep * 10)
async def start(self):
    """Start the task that keeps things ticking along (the gui poll loop)."""
    print("carpet starting tasks")
    poll_task = spawn(self.poll())
    print('POLL TASK SPAWNED')
    self.tasks = [poll_task]
    print("DONE STARTED carpet")


def generate_mosaic(self):
    """Add a grid of hidden axes to the figure and queue them in ``self.axes``.

    The grid has ``self.size`` shape.  Every new matplotlib axes is wrapped
    in an ``Axe`` and registered in ``self.lookup`` under ``id(ax)``.
    """
    # First try and delete some stuff.
    self.delete_old_axes()

    # Square mosaic for the current size: one distinct label per cell.
    mosaic = np.arange(self.size[0] * self.size[1]).reshape(*self.size)

    picture = self.image.subplot_mosaic(
        mosaic, subplot_kw=dict(visible=False))

    for key, ax in picture.items():
        ax.meta = dict(key=key)
        axe = Axe(ax, self)
        self.axes.append(axe)
        self.lookup[id(ax)] = axe


def delete_old_axes(self):
    """Remove figure axes that are neither in the history nor on show.

    The background axes is always kept.  An axes is removed only when its
    ``Axe`` wrapper has an ``img`` attribute; that image is removed too and
    the wrapper is dropped from ``self.lookup``.

    Raises KeyError if a figure axes has no entry in ``self.lookup``.
    """
    showing = self.showing.values()

    # Iterate over a snapshot: delaxes() below changes the figure's axes.
    for ax in list(self.image.axes):
        if ax is self.background:
            continue

        try:
            axe = self.lookup[id(ax)]
        except KeyError:
            print(f'WHOA {id(ax)} {type(ax)} missing from lookup')
            raise

        if axe in self.history or axe in showing or not hasattr(axe, 'img'):
            continue

        axe.img.remove()
        ax.figure.delaxes(ax)
        del self.lookup[id(ax)]


async def run(self):
    """Put one axe on the output queue, unless one is already waiting there."""
    # Something is still queued, so nobody is waiting: don't add more.
    if self.select().qsize() > 0:
        return

    if not self.axes:
        self.generate_mosaic()

    axe = self.axes.popleft()
    if self.simple:
        axe.simplify()

    # NOTE(review): nesting reconstructed from a flattened listing; the grid
    # is assumed to be switched on whether or not self.simple is set -- confirm.
    axe.grid(True)
    await self.put(axe)


def get_axe_geometry(self, axe):
    """Return the geometry of the subplot spec that *axe* occupies."""
    return axe.get_subplotspec().get_geometry()


def show(self, axe):
    """Record *axe* as the one showing at its position and request a redraw.

    Whatever other axe was showing at the same geometry is hidden, and
    *axe* is pushed onto the front of the history.
    """
    gg = self.get_axe_geometry(axe)

    tohide = self.showing.get(gg)
    if gg in self.showing and tohide is not axe:
        tohide.hide()

    self.history.appendleft(axe)
    self.showing[gg] = axe
    self.image.canvas.draw_idle()


def hide(self, axe):
    """Make *axe* invisible if it is currently visible."""
    if axe.get_visible():
        axe.set_visible(False)
async def canine(ball):
    """A sheep dog: something to control when a ball runs, pauses and sleeps.

    Runner for ``node.run`` and more.  This was an attempt to factor out
    some boiler plate from run methods, so ``run`` has turned into "do one
    iteration of what you do" and `canine` here manages pausing and sleep.

    *ball* may be a ``Ball`` (its ``run`` is used), a plain callable, a
    coroutine function or an already created coroutine object.

    Each pass of the loop:

    * skips the run when ``ball.paused`` exists and is true,
    * otherwise calls ``run`` and awaits the result if it is a coroutine,
    * then sleeps for ``ball.sleep`` (the last known value, initially 0).

    The sleep happens on every pass, paused or not, so a paused ball still
    yields to the other tasks instead of spinning.

    A coroutine object can only be awaited once, so when *ball* is one it
    is awaited a single time and `canine` returns.

    Cancellation is reported and re-raised; any other exception has its
    traceback printed and is re-raised.
    """
    print('dog chasing ball:', ball)
    runs = 0
    run = ball.run if isinstance(ball, Ball) else ball
    sleepy = 0

    try:
        while True:
            if not getattr(ball, 'paused', False):
                # gymnastics to deal with callables, coroutines
                # or coroutine functions
                one_shot = inspect.iscoroutine(run)
                result = run if one_shot else run()

                if inspect.iscoroutine(result):
                    await result

                runs += 1

                if one_shot:
                    # awaiting the same coroutine object again would raise
                    return

            sleepy = getattr(ball, 'sleep', sleepy)
            await sleep(sleepy)

    except asyncio.CancelledError:
        print(f'cancelled running of {ball} after {runs} runs')
        raise
    except Exception:
        print_exc()
        raise
class Task:
    """A coroutine function bundled with the arguments to call it with."""

    def __init__(self, task, args=None, kwargs=None, active=True, plot=True):
        """Remember *task* and how to call it.

        task: coroutine function to await.
        args: positional arguments, any iterable (stored as a list).
        kwargs: keyword arguments.
        active: stored on the task; not consulted by ``run``.
        plot: when true, ``run`` fetches an axes from TheMagicRoundAbout
            and passes it as the first positional argument.
        """
        self.task = task
        # Tasks.add hands over a tuple; keep a list so run() can prepend.
        self.args = list(args) if args else []
        self.kwargs = kwargs or {}
        self.active = active
        self.result = None
        self.plot = plot

    async def run(self):
        """Await the task and keep what it returns in ``self.result``."""
        args = list(self.args)
        if self.plot:
            ax = await TheMagicRoundAbout.get()
            args = [ax] + args

        self.result = await self.task(*args, **self.kwargs)


class Tasks:
    """An ordered collection of ``Task`` objects that are run in turn."""

    def __init__(self, tasks=None):
        """Start with the given ``Task`` objects, or with none."""
        self.tasks = deque(tasks or ())

    def add(self, task, plot=False, *args, active=True, **kwargs):
        """ Add a task

        Note that ``plot`` comes before ``*args``: the first positional
        argument after *task* is taken as ``plot``.
        """
        self.tasks.append(Task(task, args, kwargs, active, plot))

    async def run(self, sleep=0):
        """ Run the tasks

        sleep: seconds to nap after each task; the default of 0 just gives
            other tasks a chance to run.
        """
        for task in self.tasks:
            await task.run()

            # take a nap between tasks
            await asyncio.sleep(sleep)


async def runme():
    """Build a farm with two connected balls, start it and run it."""
    farm = GeeFarm()

    a = Ball()
    b = Ball()

    farm.add_edge(a, b)

    print('starting farm')
    await farm.start()

    print('running farm')
    await farm.run()


def ellipsis(string, maxlen=200, keep=10):
    """Shorten *string* to its first and last *keep* characters.

    Strings of at most *maxlen* characters are returned unchanged; longer
    ones become ``head + ' ... ' + tail``.  With ``keep=0`` both head and
    tail are empty.
    """
    if len(string) > maxlen:
        # string[-0:] would be the whole string, so handle keep == 0 apart
        tail = string[-keep:] if keep else ''
        string = string[:keep] + ' ... ' + tail

    return string


async def relay(channel, callback, with_message=False):
    """Call *callback* for every message that arrives on *channel*.

    channel: passed to ``TheMagicRoundAbout.get`` to wait for a message.
    callback: called with the message when *with_message* is true, with no
        arguments otherwise; a coroutine result is awaited.

    Exceptions from the callback are printed and the relay carries on.
    Cancellation is not an ``Exception`` and so still stops the relay.
    """
    import traceback

    while True:
        msg = await TheMagicRoundAbout.get(channel)

        try:
            result = callback(msg) if with_message else callback()
        except Exception:
            print(f'{channel} relay exception for {callback}')
            traceback.print_exc()
            continue

        if inspect.iscoroutine(result):
            try:
                await result
            except Exception:
                traceback.print_exc()


def runner(ball):
    """Put *ball* on the 'run' channel of TheMagicRoundAbout without waiting."""
    TheMagicRoundAbout.put_nowait(ball, 'run')


if __name__ == '__main__':

    run(runme(), with_monitor=True)