I have no idea what happened here...

This commit is contained in:
2021-06-30 11:07:28 -04:00
parent b01f236f7c
commit b44a63312e
716 changed files with 236008 additions and 32 deletions

View File

@@ -0,0 +1,3 @@
#!/usr/bin/env python
__all__ = ['color', 'grayscale', 'sstv', 'tests', 'examples']

View File

@@ -0,0 +1,92 @@
#!/usr/bin/env python
from __future__ import print_function, division
from PIL import Image
from argparse import ArgumentParser
from sys import stderr
from pysstv import color, grayscale
SSTV_MODULES = [color, grayscale]
def main():
module_map = build_module_map()
parser = ArgumentParser(
description='Converts an image to an SSTV modulated WAV file.')
parser.add_argument('img_file', metavar='image.png',
help='input image file name')
parser.add_argument('wav_file', metavar='output.wav',
help='output WAV file name')
parser.add_argument(
'--mode', dest='mode', default='MartinM1', choices=module_map,
help='image mode (default: Martin M1)')
parser.add_argument('--rate', dest='rate', type=int, default=48000,
help='sampling rate (default: 48000)')
parser.add_argument('--bits', dest='bits', type=int, default=16,
help='bits per sample (default: 16)')
parser.add_argument('--vox', dest='vox', action='store_true',
help='add VOX tones at the beginning')
parser.add_argument('--fskid', dest='fskid',
help='add FSKID at the end')
parser.add_argument('--chan', dest='chan', type=int,
help='number of channels (default: mono)')
parser.add_argument('--resize', dest='resize', action='store_true',
help='resize the image to the correct size')
parser.add_argument('--keep-aspect-ratio', dest='keep_aspect_ratio', action='store_true',
help='keep the original aspect ratio when resizing (and cut off excess pixels)')
parser.add_argument('--resample', dest='resample', default='lanczos',
choices=('nearest', 'bicubic', 'lanczos'),
help='which resampling filter to use for resizing (see Pillow documentation)')
args = parser.parse_args()
image = Image.open(args.img_file)
mode = module_map[args.mode]
if args.resize and any(i != m for i, m in zip(image.size, (mode.WIDTH, mode.HEIGHT))):
resample = getattr(Image, args.resample.upper())
if args.keep_aspect_ratio:
orig_ratio = image.width / image.height
mode_ratio = mode.WIDTH / mode.HEIGHT
crop = orig_ratio != mode_ratio
else:
crop = False
if crop:
if orig_ratio < mode_ratio:
w = mode.WIDTH
h = int(w / orig_ratio)
else:
h = mode.HEIGHT
w = int(orig_ratio * h)
else:
w = mode.WIDTH
h = mode.HEIGHT
image = image.resize((w, h), resample)
if crop:
x = (image.width - mode.WIDTH) / 2
y = (image.height - mode.HEIGHT) / 2
image = image.crop((x, y, mode.WIDTH + x, mode.HEIGHT + y))
elif not all(i >= m for i, m in zip(image.size, (mode.WIDTH, mode.HEIGHT))):
print(('Image must be at least {m.WIDTH} x {m.HEIGHT} pixels '
'for mode {m.__name__}').format(m=mode), file=stderr)
raise SystemExit(1)
s = mode(image, args.rate, args.bits)
s.vox_enabled = args.vox
if args.fskid:
s.add_fskid_text(args.fskid)
if args.chan:
s.nchannels = args.chan
s.write_wav(args.wav_file)
def build_module_map():
try:
from collections import OrderedDict
module_map = OrderedDict()
except ImportError:
module_map = {}
for module in SSTV_MODULES:
for mode in module.MODES:
module_map[mode.__name__] = mode
return module_map
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,200 @@
#!/usr/bin/env python
from __future__ import division
from six.moves import range, zip
from pysstv.sstv import byte_to_freq, FREQ_BLACK, FREQ_WHITE, FREQ_VIS_START
from pysstv.grayscale import GrayscaleSSTV
from itertools import chain
RED, GREEN, BLUE = range(3)
class ColorSSTV(GrayscaleSSTV):
def on_init(self):
self.pixels = self.image.convert('RGB').load()
def encode_line(self, line):
msec_pixel = self.SCAN / self.WIDTH
image = self.pixels
for index in self.COLOR_SEQ:
for item in self.before_channel(index):
yield item
for col in range(self.WIDTH):
pixel = image[col, line]
freq_pixel = byte_to_freq(pixel[index])
yield freq_pixel, msec_pixel
for item in self.after_channel(index):
yield item
def before_channel(self, index):
return []
after_channel = before_channel
class MartinM1(ColorSSTV):
COLOR_SEQ = (GREEN, BLUE, RED)
VIS_CODE = 0x2c
WIDTH = 320
HEIGHT = 256
SYNC = 4.862
SCAN = 146.432
INTER_CH_GAP = 0.572
def before_channel(self, index):
if index == GREEN:
yield FREQ_BLACK, self.INTER_CH_GAP
def after_channel(self, index):
yield FREQ_BLACK, self.INTER_CH_GAP
class MartinM2(MartinM1):
VIS_CODE = 0x28
WIDTH = 160
SCAN = 73.216
class ScottieS1(MartinM1):
VIS_CODE = 0x3c
SYNC = 9
INTER_CH_GAP = 1.5
SCAN = 138.24 - INTER_CH_GAP
def horizontal_sync(self):
return []
def before_channel(self, index):
if index == RED:
for item in MartinM1.horizontal_sync(self):
yield item
yield FREQ_BLACK, self.INTER_CH_GAP
class ScottieS2(ScottieS1):
VIS_CODE = 0x38
SCAN = 88.064 - ScottieS1.INTER_CH_GAP
WIDTH = 160
class Robot36(ColorSSTV):
VIS_CODE = 0x08
WIDTH = 320
HEIGHT = 240
SYNC = 9
INTER_CH_GAP = 4.5
Y_SCAN = 88
C_SCAN = 44
PORCH = 1.5
SYNC_PORCH = 3
INTER_CH_FREQS = [None, FREQ_WHITE, FREQ_BLACK]
def on_init(self):
self.yuv = self.image.convert('YCbCr').load()
def encode_line(self, line):
pixels = [self.yuv[col, line] for col in range(self.WIDTH)]
channel = 2 - (line % 2)
y_pixel_time = self.Y_SCAN / self.WIDTH
uv_pixel_time = self.C_SCAN / self.WIDTH
return chain(
[(FREQ_BLACK, self.SYNC_PORCH)],
((byte_to_freq(p[0]), y_pixel_time) for p in pixels),
[(self.INTER_CH_FREQS[channel], self.INTER_CH_GAP),
(FREQ_VIS_START, self.PORCH)],
((byte_to_freq(p[channel]), uv_pixel_time) for p in pixels))
class PasokonP3(ColorSSTV):
"""
[ VIS code or horizontal sync here ]
Back porch - 5 time units of black (1500 Hz).
Red component - 640 pixels of 1 time unit each.
Gap - 5 time units of black.
Green component - 640 pixels of 1 time unit each.
Gap - 5 time units of black.
Blue component - 640 pixels of 1 time unit each.
Front porch - 5 time units of black.
Horizontal Sync - 25 time units of 1200 Hz.
"""
TIMEUNIT = 1000/4800. # ms
COLOR_SEQ = (RED, GREEN, BLUE)
VIS_CODE = 0x71
WIDTH = 640
HEIGHT = 480+16
SYNC = 25 * TIMEUNIT
SCAN = WIDTH * TIMEUNIT
INTER_CH_GAP = 5 * TIMEUNIT
def before_channel(self, index):
if index == self.COLOR_SEQ[0]:
yield FREQ_BLACK, self.INTER_CH_GAP
def after_channel(self, index):
yield FREQ_BLACK, self.INTER_CH_GAP
class PasokonP5(PasokonP3):
TIMEUNIT = 1000/3200. # ms
VIS_CODE = 0x72
SYNC = 25 * TIMEUNIT
SCAN = PasokonP3.WIDTH * TIMEUNIT
INTER_CH_GAP = 5 * TIMEUNIT
class PasokonP7(PasokonP3):
TIMEUNIT = 1000/2400. # ms
VIS_CODE = 0xF3
SYNC = 25 * TIMEUNIT
SCAN = PasokonP3.WIDTH * TIMEUNIT
INTER_CH_GAP = 5 * TIMEUNIT
class PD90(ColorSSTV):
VIS_CODE = 0x63
WIDTH = 320
HEIGHT = 256
SYNC = 20
PORCH = 2.08
PIXEL = 0.532
def gen_image_tuples(self):
yuv = self.image.convert('YCbCr').load()
for line in range(0, self.HEIGHT, 2):
for item in self.horizontal_sync():
yield item
yield FREQ_BLACK, self.PORCH
pixels0 = [yuv[col, line] for col in range(self.WIDTH)]
pixels1 = [yuv[col, line + 1] for col in range(self.WIDTH)]
for p in pixels0:
yield byte_to_freq(p[0]), self.PIXEL
for p0, p1 in zip(pixels0, pixels1):
yield byte_to_freq((p0[2] + p1[2]) / 2), self.PIXEL
for p0, p1 in zip(pixels0, pixels1):
yield byte_to_freq((p0[1] + p1[1]) / 2), self.PIXEL
for p in pixels1:
yield byte_to_freq(p[0]), self.PIXEL
class PD120(PD90):
VIS_CODE = 0x5f
WIDTH = 640
HEIGHT = 496
PIXEL = 0.19
class PD160(PD90):
VIS_CODE = 0x62
WIDTH = 512
HEIGHT = 400
PIXEL = 0.382
class PD180(PD120):
VIS_CODE = 0x60
PIXEL = 0.286
class PD240(PD120):
VIS_CODE = 0x61
PIXEL = 0.382
MODES = (MartinM1, MartinM2, ScottieS1, ScottieS2, Robot36,
PasokonP3, PasokonP5, PasokonP7, PD90, PD120, PD160, PD180, PD240)

View File

@@ -0,0 +1,3 @@
#!/usr/bin/env python
__all__ = ['overlay', 'pyaudio_sstv', 'repeater']

View File

@@ -0,0 +1,167 @@
#!/usr/bin/env python
class Image(object):
def __init__(self, content):
self.content = content
def load(self):
return self
def __getitem__(self, item):
if isinstance(item, tuple):
x, y = item
return Image('{0}[(ROW({1}) + COL({2})) * 3'.format(self.content, y, x))
elif isinstance(item, int):
return Image('{0} + RGB({1})]'.format(self.content, item))
else:
raise NotImplementedError()
def __rmul__(self, n):
return Image('({1} * {0})'.format(self.content, float(n)))
def __mul__(self, n):
return Image('({0} * {1})'.format(self.content, float(n)))
def __rtruediv__(self, n):
return Image('({1} / {0})'.format(self.content, n))
def __truediv__(self, n):
return Image('({0} / {1})'.format(self.content, n))
def __radd__(self, n):
return Image('({1} + {0})'.format(self.content, n))
def __add__(self, n):
return Image('({0} + {1})'.format(self.content, n))
def __str__(self):
return self.content
from pysstv.color import MartinM1, MartinM2, PasokonP3, PasokonP5, PasokonP7
import re
supported = [MartinM1, MartinM2, PasokonP3, PasokonP5, PasokonP7]
ROW_RE = re.compile(r'ROW\(\d+\)')
def main(sstv_class=None):
if sstv_class is None:
sstv_class = MartinM1
elif sstv_class not in supported:
raise NotImplementedError()
sstv = sstv_class(Image('img'), 44100, 16)
n = 0
yield '#define ROW(x) x'
yield '#define COL(x) x'
yield '#define RGB(x) x'
yield 'void convert(unsigned char *img, float *freqs, float *msecs, const int width) {\nint frq = 0;'
history = []
lut = {}
same_as = {}
for freq, msec in sstv.gen_freq_bits():
printed = 'freqs[frq] = {1}; msecs[frq++] = {2};'.format(n, freq, msec)
key = ROW_RE.sub('row', printed)
old = lut.get(key)
if old is not None:
same_as[n] = old
else:
lut[key] = n
history.append((printed, key))
n += 1
del lut
m_start, m_len = gen_matches(same_as, history, n)
for i in xrange(same_as[m_start]):
yield history[i][0]
yield 'for (int row = 0; row < width * {0}; row += width) {{'.format(sstv.HEIGHT)
for i in xrange(same_as[m_start], same_as[m_start] + m_len - 1):
yield ' ' + history[i][1]
yield '}'
yield '}}\n\n#define FREQ_COUNT {0}'.format(n)
def gen_matches(same_as, history, n):
cur_start = None
cur_len = None
cur_end = None
for i in xrange(n):
if cur_start is None:
tmp = same_as.get(i)
if tmp is not None:
cur_len = 1
cur_start = i
cur_end = tmp
else:
tmp = same_as.get(i)
if tmp is not None and history[tmp][1] == history[cur_end + 1][1] and cur_start > cur_end:
cur_len += 1
cur_end += 1
else:
if tmp is not None and history[tmp][1] == history[cur_end + 1][1]:
return cur_start, cur_len
tmp = same_as.get(i)
if tmp is None:
cur_start = None
else:
cur_len = 1
cur_start = i
cur_end = tmp
def test(img_file):
from subprocess import Popen, PIPE, check_output
from os import remove, path
from PIL import Image
from datetime import datetime
import struct
exe = './codegen-test-executable'
if not path.exists('stb_image.h'):
from urllib import urlretrieve
urlretrieve('https://raw.githubusercontent.com/nothings/stb/master/stb_image.h', 'stb_image.h')
try:
for sstv_class in supported:
print 'Testing', sstv_class
gcc = Popen(['gcc', '-xc', '-lm', '-o', exe, '-'], stdin=PIPE)
start = datetime.now()
with open(path.join(path.dirname(__file__), 'codeman.c')) as cm:
c_src = cm.read().replace('#include "codegen.c"', '\n'.join(main(sstv_class)))
gcc.communicate(c_src)
gen_elapsed = datetime.now() - start
print ' - gengcc took', gen_elapsed
start = datetime.now()
gen = check_output([exe, img_file])
native_elapsed = datetime.now() - start
print ' - native took', native_elapsed
img = Image.open(img_file)
sstv = sstv_class(img, 44100, 16)
start = datetime.now()
try:
for n, (freq, msec) in enumerate(sstv.gen_freq_bits()):
assert gen[n * 8:(n + 1) * 8] == struct.pack('ff', freq, msec)
except AssertionError:
mode_name = sstv_class.__name__
with file('/tmp/{0}-c.bin'.format(mode_name), 'wb') as f:
f.write(gen)
with file('/tmp/{0}-py.bin'.format(mode_name), 'wb') as f:
for n, (freq, msec) in enumerate(sstv.gen_freq_bits()):
f.write(struct.pack('ff', freq, msec))
with file('/tmp/{0}.c'.format(mode_name), 'w') as f:
f.write(c_src)
print (" ! Outputs are different, they've been saved to "
"/tmp/{0}-{{c,py}}.bin, along with the C source code "
"in /tmp/{0}.c").format(mode_name)
python_elapsed = datetime.now() - start
print ' - python took', python_elapsed
print ' - speedup:', python_elapsed.total_seconds() / native_elapsed.total_seconds()
print 'OK'
finally:
try:
remove(exe)
except OSError:
pass
if __name__ == '__main__':
from sys import argv
if len(argv) > 2 and argv[1] == 'test':
test(argv[2])
else:
print '\n'.join(main())

View File

@@ -0,0 +1,22 @@
#!/usr/bin/env python
"""
This example streams the raw floating point samples to stdout in 4-byte
single precision format, so that it can be processed outside PySSTV.
Usage example: get_floats.py | play -r 44100 -t f32 -c 1 --norm -
"""
from PIL import Image
from pysstv.grayscale import Robot8BW
import struct, sys
def main():
img = Image.open("160x120bw.png")
sstv = Robot8BW(img, 44100, 16)
sstv.vox_enabled = True
for value in sstv.gen_values():
sys.stdout.write(struct.pack('f', value))
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,23 @@
#!/usr/bin/env python
"""
This example streams the raw floating point (freq, msec) tuples to stdout
in 4-byte single precision format (8 bytes per tuple), so that it can be
processed outside PySSTV.
Usage example using unixsstv/gen_values:
get_freq_bits.py | gen_values 44100 | play -r 44100 -t f32 -c 1 --norm -
"""
from PIL import Image
from pysstv.color import MartinM1
import struct, sys
def main():
img = Image.open("320x256rgb.png")
sstv = MartinM1(img, 44100, 16)
for freq, msec in sstv.gen_freq_bits():
sys.stdout.write(struct.pack('ff', freq, msec))
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,253 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# copy to ~/.gimp-2.8/plug-ins/
# dependencies: GIMP 2.8, python-imaging-tk, python-pyaudio
from gimpfu import register, main, pdb, PF_BOOL, PF_STRING, PF_RADIO, CLIP_TO_IMAGE
from PIL import Image, ImageTk
from Tkinter import Tk, Canvas, Button, Checkbutton, IntVar, Frame, LEFT, NW
from pysstv import __main__ as pysstv_main
from pysstv.examples.pyaudio_sstv import PyAudioSSTV
from pysstv.sstv import SSTV
from itertools import repeat
from threading import Thread
from Queue import Queue, Empty
from time import sleep
import gimp, os
MODULE_MAP = pysstv_main.build_module_map()
class AudioThread(Thread):
def __init__(self, sstv, parent):
Thread.__init__(self)
self.pas = PyAudioSSTV(sstv)
self.parent = parent
def run(self):
self.pas.execute()
self.parent.audio_thread_ended()
def stop(self):
if self.pas is not None:
self.pas.sampler = []
self.pas = None
class Sine1750(SSTV):
encode_line = None
def gen_freq_bits(self):
return repeat((1750, 1000))
class Transmitter(object):
def __init__(self, sstv, root, progress, set_ptt_pin, ptt_state):
def encode_line_hooked(line):
progress.update_image(line)
return self.original_encode_line(line)
self.progress = progress
self.sstv = sstv
self.original_encode_line = sstv.encode_line
sstv.encode_line = encode_line_hooked
self.root = root
self.tx_enabled = IntVar()
self.audio_thread = None
self.stopping = False
self.set_ptt_pin = set_ptt_pin
self.ptt_state = ptt_state
def set_ptt(self, state):
if self.set_ptt_pin is None:
return
if not state:
sleep(0.2)
self.set_ptt_pin(state != self.ptt_state)
if state:
sleep(0.2)
def start_stop_tx(self):
if self.tx_enabled.get():
self.stopping = False
self.audio_thread = AudioThread(self.sstv, self)
self.set_ptt(True)
self.audio_thread.start()
else:
self.stop()
if self.progress is not None:
self.progress.update_image()
def stop(self):
if self.audio_thread is not None:
self.stopping = True
self.audio_thread.stop()
self.set_ptt(False)
def audio_thread_ended(self):
if not self.stopping:
self.set_ptt(False)
self.tx_enabled.set(0)
def close(self):
self.root.destroy()
class CanvasUpdater(Thread):
def __init__(self, progress):
Thread.__init__(self)
self.progress = progress
self.queue = Queue()
self.should_run = True
def stop(self):
self.should_run = False
def update_image(self, line=None):
self.queue.put(line)
def run(self):
while self.should_run:
try:
self.progress.update_image(self.queue.get(timeout=0.5))
except Empty:
pass
class ProgressCanvas(Canvas):
def __init__(self, master, image):
self.height_ratio = 1
width, height = image.size
pixels = image.load()
RED, GREEN, BLUE = range(3)
self.colors = ['#{0:02x}{1:02x}{2:02x}'.format(
contrast(sum(pixels[x, y][RED] for x in xrange(width)) / width),
contrast(sum(pixels[x, y][GREEN] for x in xrange(width)) / width),
contrast(sum(pixels[x, y][BLUE] for x in xrange(width)) / width))
for y in xrange(height)]
if height / float(width) > 1.5:
width *= 2
elif width < 200:
width *= 2
height *= 2
self.height_ratio = 2
if (width, height) != image.size:
image = image.resize((width, height))
Canvas.__init__(self, master, width=width, height=height)
self.tk_img = ImageTk.PhotoImage(image)
self.update_image()
self.pack()
def update_image(self, line=None):
image = self.tk_img
self.create_image(0, 0, anchor=NW, image=image)
if line is not None:
fill = self.colors[line]
line *= self.height_ratio
self.create_line(0, line, image.width(), line, fill=fill)
def contrast(value):
if 80 < value < 160:
return value + (80 if value < 120 else -80)
else:
return 255 - value
def transmit_current_image(image, drawable, mode, vox, fskid, ptt_port, ptt_pin, ptt_state):
sstv = MODULE_MAP[mode]
if ptt_port is not None:
from serial import Serial
set_ptt_pin = getattr(Serial(ptt_port), 'set' + ptt_pin)
set_ptt_pin(ptt_state)
else:
set_ptt_pin = None
pil_img = match_image_with_sstv_mode(image_gimp_to_pil(image), sstv)
root = Tk()
cu = CanvasUpdater(ProgressCanvas(root, pil_img))
cu.start()
tm = Transmitter(init_sstv(sstv, pil_img, vox, fskid), root, cu, set_ptt_pin, ptt_state)
tm1750 = Transmitter(Sine1750(None, 44100, 16), None, None, set_ptt_pin, ptt_state)
buttons = Frame(root)
for text, tram in (('TX', tm), ('1750 Hz', tm1750)):
Checkbutton(buttons, text=text, indicatoron=False, padx=5, pady=5,
variable=tram.tx_enabled, command=tram.start_stop_tx).pack(side=LEFT)
Button(buttons, text="Close", command=tm.close).pack(side=LEFT)
buttons.pack()
root.mainloop()
for obj in (tm, tm1750, cu):
obj.stop()
def image_gimp_to_pil(image):
try:
sandbox = image.duplicate()
for layer in sandbox.layers:
if not layer.visible:
sandbox.remove_layer(layer)
sandbox.merge_visible_layers(CLIP_TO_IMAGE)
layer = sandbox.layers[0]
if not layer.is_rgb:
pdb.gimp_image_convert_rgb(sandbox)
if layer.has_alpha:
pdb.gimp_layer_flatten(layer)
w, h = layer.width, layer.height
pixels = layer.get_pixel_rgn(0, 0, w, h)[:, :] # all pixels
return Image.frombuffer('RGB', (w, h), pixels, 'raw', 'RGB', 0, 0)
finally:
gimp.delete(sandbox)
def match_image_with_sstv_mode(image, mode):
mode_size = mode.WIDTH, mode.HEIGHT
if image.size != mode_size:
image = image.resize(mode_size, Image.ANTIALIAS)
if 'grayscale' in mode.__module__:
image = image.convert('LA').convert('RGB')
return image
def init_sstv(mode, image, vox, fskid):
s = mode(image, 44100, 16)
s.vox_enabled = vox
if fskid:
s.add_fskid_text(fskid)
return s
def get_serial_ports():
try:
if os.name == 'nt':
from serial.tools.list_ports_windows import comports
elif os.name == 'posix':
from serial.tools.list_ports_posix import comports
else:
raise ImportError("Sorry: no implementation for your"
"platform ('%s') available" % (os.name,))
except ImportError:
yield "(couldn't import PySerial)", None
else:
yield "(disabled)", None
for port, desc, _ in comports():
yield '{0} ({1})'.format(port, desc.strip()), port
register(
"pysstv_for_gimp",
"PySSTV for GIMP",
"Transmits the current image using PySSTV",
"Andras Veres-Szentkiralyi",
"Andras Veres-Szentkiralyi",
"November 2013",
"<Image>/PySSTV/Transmit...",
"*",
[
(PF_RADIO, "mode", "SSTV mode", "MartinM1",
tuple((n, n) for n in sorted(MODULE_MAP.iterkeys()))),
(PF_BOOL, "vox", "Include VOX tones", True),
(PF_STRING, "fskid", "FSK ID", ""),
(PF_RADIO, "ptt_port", "PTT port", None,
tuple(get_serial_ports())),
(PF_RADIO, "ptt_pin", "PTT pin", "RTS",
tuple((n, n) for n in ("RTS", "DTR"))),
(PF_BOOL, "ptt_state", "PTT inversion", False),
],
[],
transmit_current_image
)
main()

View File

@@ -0,0 +1,17 @@
#!/usr/bin/env python
"""
Demonstrates adding text overlay (callsign, RSV, etc.) using PIL
See example output received by slowrx at the following URL:
http://vsza.hu/c6fa52b2c7b20320bdab2da15877f0efbd466e67d37c8d124a557367de9380da.png
"""
from PIL import Image, ImageFont, ImageDraw
from pysstv.grayscale import Robot8BW
img = Image.open("160x120bw.png")
font = ImageFont.load_default()
draw = ImageDraw.Draw(img)
draw.text((0, 0), "HA5VSA", (255,255,255), font=font)
sstv = Robot8BW(img, 44100, 16)
sstv.write_wav("overlay.wav")

View File

@@ -0,0 +1,51 @@
#!/usr/bin/env python
"""
Demonstrates playing the generated samples directly using PyAudio
Tested on PyAudio 0.2.7 http://people.csail.mit.edu/hubert/pyaudio/
"""
from __future__ import division
from pysstv.sstv import SSTV
from time import sleep
from itertools import islice
import struct, pyaudio
class PyAudioSSTV(object):
def __init__(self, sstv):
self.pa = pyaudio.PyAudio()
self.sstv = sstv
self.fmt = '<' + SSTV.BITS_TO_STRUCT[sstv.bits]
def __del__(self):
self.pa.terminate()
def execute(self):
self.sampler = self.sstv.gen_samples()
stream = self.pa.open(
format=self.pa.get_format_from_width(self.sstv.bits // 8),
channels=1, rate=self.sstv.samples_per_sec, output=True,
stream_callback=self.callback)
stream.start_stream()
while stream.is_active():
sleep(0.5)
stream.stop_stream()
stream.close()
def callback(self, in_data, frame_count, time_info, status):
frames = ''.join(struct.pack(self.fmt, b)
for b in islice(self.sampler, frame_count))
return frames, pyaudio.paContinue
def main():
from PIL import Image
from pysstv.grayscale import Robot8BW
img = Image.open("160x120bw.png")
sstv = Robot8BW(img, 44100, 16)
sstv.vox_enabled = True
PyAudioSSTV(sstv).execute()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,73 @@
#!/usr/bin/env python
"""
Simple repeater that monitors a single directory using inotify, and if
an image appears, it tries to repeat it on using the PyAudio example,
trying to match the mode used for receiving it. It can be tested by
simply copying/linking images to the directory or suing an SSTV
receiver such as slowrx or QSSTV.
"""
from __future__ import print_function
from pyinotify import WatchManager, Notifier, ProcessEvent, IN_CREATE
from pyaudio_sstv import PyAudioSSTV
from pysstv.color import MartinM1, MartinM2, ScottieS1, ScottieS2
from pysstv.grayscale import Robot8BW, Robot24BW
from PIL import Image
from os import path
# matches the abbreviations used by slowrx and QSSTV
MODE_MAP = {
'M1': MartinM1,
'M2': MartinM2,
'S1': ScottieS1,
'S2': ScottieS2,
'R8BW': Robot8BW,
'R24BW': Robot24BW,
}
class EventHandler(ProcessEvent):
def process_IN_CREATE(self, event):
filename = event.pathname
print('Found image', filename)
mode = get_module_for_filename(filename)
img = Image.open(filename)
if mode is None:
mode = get_module_for_image(img)
if mode is None:
print('No suitable mode found to repeat', filename)
return
print('Repeating image using', mode.__name__)
sstv = mode(img, 44100, 16)
sstv.vox_enabled = True
PyAudioSSTV(sstv).execute()
def get_module_for_filename(filename):
basename, _ = path.splitext(path.basename(filename))
for mode, module in MODE_MAP.iteritems():
if mode in basename:
return module
def get_module_for_image(image):
size = image.size
for mode in MODE_MAP.itervalues():
if all(i >= m for i, m in zip(size, (mode.WIDTH, mode.HEIGHT))):
return mode
def main():
from sys import argv, stderr
try:
directory = argv[1]
except IndexError:
print("Usage: {0} <directory>".format(argv[0]), file=stderr)
else:
watch(directory)
def watch(directory):
wm = WatchManager()
notifier = Notifier(wm, EventHandler())
wm.add_watch(directory, IN_CREATE)
notifier.loop()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,43 @@
#!/usr/bin/env python
from __future__ import division
from six.moves import range
from pysstv.sstv import SSTV, byte_to_freq
class GrayscaleSSTV(SSTV):
def on_init(self):
self.pixels = self.image.convert('LA').load()
def gen_image_tuples(self):
for line in range(self.HEIGHT):
for item in self.horizontal_sync():
yield item
for item in self.encode_line(line):
yield item
def encode_line(self, line):
msec_pixel = self.SCAN / self.WIDTH
image = self.pixels
for col in range(self.WIDTH):
pixel = image[col, line]
freq_pixel = byte_to_freq(pixel[0])
yield freq_pixel, msec_pixel
class Robot8BW(GrayscaleSSTV):
VIS_CODE = 0x02
WIDTH = 160
HEIGHT = 120
SYNC = 7
SCAN = 60
class Robot24BW(GrayscaleSSTV):
VIS_CODE = 0x0A
WIDTH = 320
HEIGHT = 240
SYNC = 7
SCAN = 93
MODES = (Robot8BW, Robot24BW)

View File

@@ -0,0 +1,140 @@
#!/usr/bin/env python
from __future__ import division, with_statement
from six.moves import range
from six.moves import map
from six.moves import zip
from math import sin, pi
from random import random
from contextlib import closing
from itertools import cycle, chain
from array import array
import wave
FREQ_VIS_BIT1 = 1100
FREQ_SYNC = 1200
FREQ_VIS_BIT0 = 1300
FREQ_BLACK = 1500
FREQ_VIS_START = 1900
FREQ_WHITE = 2300
FREQ_RANGE = FREQ_WHITE - FREQ_BLACK
FREQ_FSKID_BIT1 = 1900
FREQ_FSKID_BIT0 = 2100
MSEC_VIS_START = 300
MSEC_VIS_SYNC = 10
MSEC_VIS_BIT = 30
MSEC_FSKID_BIT = 22
class SSTV(object):
def __init__(self, image, samples_per_sec, bits):
self.image = image
self.samples_per_sec = samples_per_sec
self.bits = bits
self.vox_enabled = False
self.fskid_payload = ''
self.nchannels = 1
self.on_init()
def on_init(self):
pass
BITS_TO_STRUCT = {8: 'b', 16: 'h'}
def write_wav(self, filename):
"""writes the whole image to a Microsoft WAV file"""
fmt = self.BITS_TO_STRUCT[self.bits]
data = array(fmt, self.gen_samples())
if self.nchannels != 1:
data = array(fmt, chain.from_iterable(
zip(*([data] * self.nchannels))))
with closing(wave.open(filename, 'wb')) as wav:
wav.setnchannels(self.nchannels)
wav.setsampwidth(self.bits // 8)
wav.setframerate(self.samples_per_sec)
wav.writeframes(data)
def gen_samples(self):
"""generates discrete samples from gen_values()
performs quantization according to
the bits per sample value given during construction
"""
max_value = 2 ** self.bits
alias = 1 / max_value
amp = max_value // 2
lowest = -amp
highest = amp - 1
alias_cycle = cycle((alias * (random() - 0.5) for _ in range(1024)))
for value, alias_item in zip(self.gen_values(), alias_cycle):
sample = int(value * amp + alias_item)
yield (lowest if sample <= lowest else
sample if sample <= highest else highest)
def gen_values(self):
"""generates samples between -1 and +1 from gen_freq_bits()
performs sampling according to
the samples per second value given during construction
"""
spms = self.samples_per_sec / 1000
offset = 0
samples = 0
factor = 2 * pi / self.samples_per_sec
sample = 0
for freq, msec in self.gen_freq_bits():
samples += spms * msec
tx = int(samples)
freq_factor = freq * factor
for sample in range(tx):
yield sin(sample * freq_factor + offset)
offset += (sample + 1) * freq_factor
samples -= tx
def gen_freq_bits(self):
"""generates tuples (freq, msec) that describe a sine wave segment
frequency "freq" in Hz and duration "msec" in ms
"""
if self.vox_enabled:
for freq in (1900, 1500, 1900, 1500, 2300, 1500, 2300, 1500):
yield freq, 100
yield FREQ_VIS_START, MSEC_VIS_START
yield FREQ_SYNC, MSEC_VIS_SYNC
yield FREQ_VIS_START, MSEC_VIS_START
yield FREQ_SYNC, MSEC_VIS_BIT # start bit
vis = self.VIS_CODE
num_ones = 0
for _ in range(7):
bit = vis & 1
vis >>= 1
num_ones += bit
bit_freq = FREQ_VIS_BIT1 if bit == 1 else FREQ_VIS_BIT0
yield bit_freq, MSEC_VIS_BIT
parity_freq = FREQ_VIS_BIT1 if num_ones % 2 == 1 else FREQ_VIS_BIT0
yield parity_freq, MSEC_VIS_BIT
yield FREQ_SYNC, MSEC_VIS_BIT # stop bit
for freq_tuple in self.gen_image_tuples():
yield freq_tuple
for fskid_byte in map(ord, self.fskid_payload):
for _ in range(6):
bit = fskid_byte & 1
fskid_byte >>= 1
bit_freq = FREQ_FSKID_BIT1 if bit == 1 else FREQ_FSKID_BIT0
yield bit_freq, MSEC_FSKID_BIT
def gen_image_tuples(self):
return []
def add_fskid_text(self, text):
self.fskid_payload += '\x20\x2a{0}\x01'.format(
''.join(chr(ord(c) - 0x20) for c in text))
def horizontal_sync(self):
yield FREQ_SYNC, self.SYNC
def byte_to_freq(value):
return FREQ_BLACK + FREQ_RANGE * value / 255

View File

@@ -0,0 +1,3 @@
#!/usr/bin/env python
__all__ = ['common', 'test_color', 'test_sstv']

View File

@@ -0,0 +1,9 @@
from os import path
import pickle
def get_asset_filename(filename):
return path.join(path.dirname(__file__), 'assets', filename)
def load_pickled_asset(filename):
with open(get_asset_filename(filename + '.p'), 'rb') as f:
return pickle.load(f)

View File

@@ -0,0 +1,44 @@
import unittest
from itertools import islice
from PIL import Image
from pysstv import color
from pysstv.tests.common import get_asset_filename, load_pickled_asset
class TestMartinM1(unittest.TestCase):
def setUp(self):
self.image = Image.new('RGB', (320, 256))
self.s = color.MartinM1(self.image, 48000, 16)
lena = Image.open(get_asset_filename('320x256.png'))
self.lena = color.MartinM1(lena, 48000, 16)
def test_gen_freq_bits(self):
expected = load_pickled_asset("MartinM1_freq_bits")
actual = list(islice(self.s.gen_freq_bits(), 0, 1000))
self.assertEqual(expected, actual)
def test_gen_freq_bits_lena(self):
expected = load_pickled_asset("MartinM1_freq_bits_lena")
actual = list(islice(self.lena.gen_freq_bits(), 0, 1000))
self.assertEqual(expected, actual)
def test_encode_line(self):
zeroth = list(self.s.encode_line(0))
first = list(self.s.encode_line(1))
tenth = list(self.s.encode_line(10))
eleventh = list(self.s.encode_line(11))
self.assertEqual(zeroth, first)
self.assertEqual(tenth, eleventh)
self.assertEqual(zeroth, eleventh)
def test_encode_line_lena(self):
self.maxDiff = None
line_numbers = [1, 10, 100]
for line in line_numbers:
expected = load_pickled_asset("MartinM1_encode_line_lena{0}".format(line))
actual = list(self.lena.encode_line(line))
self.assertEqual(expected, actual)

View File

@@ -0,0 +1,82 @@
import unittest
from itertools import islice
from six.moves import zip
import mock
from mock import MagicMock
from six import BytesIO
from six import PY2
import hashlib
from pysstv import sstv
from pysstv.sstv import SSTV
from pysstv.tests.common import load_pickled_asset
class TestSSTV(unittest.TestCase):
def setUp(self):
self.s = SSTV(False, 48000, 16)
self.s.VIS_CODE = 0x00
self.s.SYNC = 7
def test_horizontal_sync(self):
horizontal_sync = self.s.horizontal_sync()
expected = (1200, self.s.SYNC)
actual = next(iter(horizontal_sync))
self.assertEqual(expected, actual)
def test_gen_freq_bits(self):
gen_freq_bits = self.s.gen_freq_bits()
expected = [(1900, 300),
(1200, 10),
(1900, 300),
(1200, 30),
(1300, 30),
(1300, 30),
(1300, 30),
(1300, 30),
(1300, 30),
(1300, 30),
(1300, 30),
(1300, 30),
(1200, 30)]
actual = list(islice(gen_freq_bits, 0, 1000))
self.assertEqual(expected, actual)
# FIXME: Instead of using a test fixture, 'expected' should be synthesized?
def test_gen_values(self):
gen_values = self.s.gen_values()
expected = load_pickled_asset("SSTV_gen_values")
for e, g in zip(expected, gen_values):
self.assertAlmostEqual(e, g, delta=0.000000001)
def test_gen_samples(self):
gen_values = self.s.gen_samples()
# gen_samples uses random to avoid quantization noise
# by using additive noise, so there's always a chance
# of running the code two consecutive times on the same machine
# and having different results.
# https://en.wikipedia.org/wiki/Quantization_%28signal_processing%29
sstv.random = MagicMock(return_value=0.4) # xkcd:221
expected = load_pickled_asset("SSTV_gen_samples")
actual = list(islice(gen_values, 0, 1000))
for e, a in zip(expected, actual):
self.assertAlmostEqual(e, a, delta=1)
def test_write_wav(self):
self.maxDiff = None
bio = BytesIO()
bio.close = MagicMock() # ignore close() so we can .getvalue()
mock_open = MagicMock(return_value=bio)
ns = '__builtin__' if PY2 else 'builtins'
with mock.patch('{0}.open'.format(ns), mock_open):
self.s.write_wav('unittest.wav')
expected = 'dd7eed880ab3360fb79ce09c469deee2'
data = bio.getvalue()
actual = hashlib.md5(data).hexdigest()
self.assertEqual(expected, actual)
def test_init(self):
self.assertEqual(self.s.image, False)
self.assertEqual(self.s.samples_per_sec, 48000)
self.assertEqual(self.s.bits, 16)