adventofcode.com – after half-way point

This year I have been taking part in the fantastic Advent of Code project. It works like a typical advent calendar where every day brings a different task, except that this calendar gives you programming problems to solve. My solutions can be found in my GitHub repo.

Please find below the screenshot of this calendar:

Screenshot from 2018-12-14 09-39-46

Raspberry Pi: radio with play/stop button

I decided to extend my pi radio with a prototype button that can both play a predefined URL and stop playback. I reused my mpd client code and my earlier button experiment to handle a single button. As long as the button is pressed, the LED is on. A short press (less than 1 s) starts playing a hardcoded URL; a press longer than 1 s stops the radio:

#!/usr/bin/env python

import mpd_client
import RPi.GPIO as gpio
import time
import commands
import sys

def main_loop():
  gpio.setmode(gpio.BCM)
  gpio.setup(14, gpio.IN)
  gpio.setup(15, gpio.OUT)
  
  try:
    button_down = 0  
    while True:
      button = gpio.input(14)
      if not button:
        gpio.output(15, gpio.HIGH)
        if not button_down:
          button_down = time.time()
      else:
        gpio.output(15, gpio.LOW)
        now = time.time()
        if button_down:
          diff = now - button_down
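          # thresholds as described in the text: <1s play, 1-3s stop, 3-5s network restart, longer reboots
          if diff < 1.0:
            play()
          elif diff < 3.0:
            stop()
          elif diff < 5.0:
            network_restart()
          else:
            restart()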
        button_down = 0
      time.sleep(100.0/1000.0) 

  except KeyboardInterrupt:
    gpio.cleanup()

def play():
  url = 'http://stream4.nadaje.com:8002/muzo'
  ip = '127.0.0.1'
  port = 6600
  print 'Connecting to mpd server:',ip,':',port
  print 'Playing: ', url
  mpd_client.play(ip, port, url)

def stop():
  ip = '127.0.0.1'
  port = 6600
  print 'Connecting to mpd server:',ip,':',port
  print 'Stopping'
  mpd_client.stop(ip, port)
    
def network_restart():
  cmd = 'service networking restart'
  print 'network restart', cmd
  (status, output) = commands.getstatusoutput(cmd)
  if status:
    sys.stderr.write(output)
    sys.exit(status)
  print output

def restart():
  cmd = 'shutdown -r 0'
  print 'restart', cmd
  (status, output) = commands.getstatusoutput(cmd)
  if status:
    sys.stderr.write(output)
    sys.exit(status)
  print output

main_loop()

The code above also contains some “service” features. When the button is held for longer than 3 s the network is restarted, and when it is held for longer than 5 s the whole device reboots.
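
The press-length rules can be isolated into a small pure function, which makes the thresholds easy to check without a button attached. This is only a sketch for Python 3: the name classify_press and the returned labels are made up for illustration, and the boundaries (1 s, 3 s, 5 s) are the ones given in the text.

```python
def classify_press(seconds):
    """Map how long the button was held to the action described in the post."""
    if seconds < 1.0:
        return 'play'
    if seconds < 3.0:
        return 'stop'
    if seconds < 5.0:
        return 'network_restart'
    return 'restart'


if __name__ == '__main__':
    for held in (0.3, 1.5, 4.0, 7.2):
        print(held, '->', classify_press(held))
```

The main loop would then only need to look up the function to call for the returned label.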

Moreover, with the help of this great tutorial I installed my script as a systemd service which starts automatically with the Raspberry Pi.

The prototype looks like this:
20180208_151834

For the real implementation I decided to modify the circuit. I removed all the other elements and kept only the tact switch. I connected the first pin of the button directly to GPIO pin 21 and the second one to ground. The setup code also needed a slight modification:

gpio.setup(21, gpio.IN, pull_up_down=gpio.PUD_UP)

The rest was not changed. This line turns on the Raspberry Pi’s internal pull-up resistor, so the external one can be safely removed from the circuit.
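
With the pull-up enabled the input is active-low: the pin reads 1 while the button is released and 0 while it is pressed. A hedged sketch of timing one press under that assumption follows. The name wait_for_press and its parameters are hypothetical, and the pin reader, clock and sleep function are passed in so the logic can be tried without hardware.

```python
import time


def wait_for_press(read_pin, poll_s=0.1, clock=time.time, sleep=time.sleep):
    """Block until one full press is seen on an active-low input.

    Returns how long the button was held, in seconds.
    """
    while read_pin():        # reads 1: button released, wait for the press
        sleep(poll_s)
    pressed_at = clock()
    while not read_pin():    # reads 0: button held, wait for the release
        sleep(poll_s)
    return clock() - pressed_at
```

On the Pi the reader could be `lambda: gpio.input(21)`, with the pin set up as shown above.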

The script can be found on github.

The version with the single button, which I placed on the top of the cover:
20180209_120141

Raspberry pi GPIO: first steps – testing input

I extended the LED circuit I made earlier and this time added a tact switch. The LED is connected through a 470 Ω resistor to pin 15. The button’s first pin is connected to GND and the second one to pin 14 and, through a 10 kΩ resistor, to the 3.3 V output. When the button is pressed it closes the circuit, so current flows from 3.3 V through the resistor to ground and pin 14 is pulled low, which is why the input reads false.
In my example, pressing the button turns on the LED for 1 second:

#!/usr/bin/env python
import RPi.GPIO as gpio
import time
gpio.setmode(gpio.BCM)
gpio.setup(14, gpio.IN)
gpio.setup(15, gpio.OUT)
while True:
  button = gpio.input(14)
  if not button:
    gpio.output(15, gpio.HIGH)
    time.sleep(1)
    gpio.output(15, gpio.LOW)
    while not button:
      button = gpio.input(14)
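
The wiring described above can be summarised in a few lines of arithmetic. This is a sketch for illustration only: the function names are made up, the 3.3 V and 10 kΩ values come from the text, and the pin itself is assumed to draw negligible current.

```python
SUPPLY_V = 3.3           # 3.3 V output, from the text
PULL_UP_OHM = 10000.0    # 10 kOhm pull-up resistor, from the text


def pin_level(pressed):
    """Level seen on pin 14: the pull-up holds it at 1, a press ties it to GND."""
    return 0 if pressed else 1


def pull_up_current_ma(supply_v=SUPPLY_V, resistor_ohm=PULL_UP_OHM):
    """Current through the pull-up while the button is held (Ohm's law), in mA."""
    return supply_v / resistor_ohm * 1000.0


if __name__ == '__main__':
    print('released ->', pin_level(False), ' pressed ->', pin_level(True))
    print('current while pressed: %.2f mA' % pull_up_current_ma())
```

The resistor keeps the current at roughly a third of a milliamp while the button is held, instead of shorting 3.3 V straight to ground.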

The tact switch:
20180206_204205

Raspberry pi GPIO: first steps – testing output

Recently I decided to play around with the GPIO of my Raspberry Pi. My goal is to find some useful application for LEDs and tact switches, and I will probably add some features to the pi radio.
Initially, though, I just want to turn on an LED to understand how it really works. So I created a simple circuit with an LED and a resistor to protect it. The output pin gives 3.3 V and my LED allows only 2.2 V and 20 mA, so the resistor should be at least 55 Ω. I used a larger one, 470 Ω (it was the lowest I could find).
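
The 55 Ω figure follows from Ohm's law applied to the voltage the resistor has to drop. The sketch below repeats that calculation and estimates the current with the 470 Ω resistor. The function names are made up, and the LED's forward voltage is assumed to stay at 2.2 V, which is only an approximation at lower currents.

```python
def series_resistor_ohm(supply_v, led_v, led_a):
    """Smallest resistor that keeps the LED current at or below led_a amps."""
    return (supply_v - led_v) / led_a


def led_current_ma(supply_v, led_v, resistor_ohm):
    """Approximate LED current in mA, treating the forward voltage as constant."""
    return (supply_v - led_v) / resistor_ohm * 1000.0


if __name__ == '__main__':
    print('minimum resistor: %.0f ohm' % series_resistor_ohm(3.3, 2.2, 0.020))
    print('current with 470 ohm: %.2f mA' % led_current_ma(3.3, 2.2, 470.0))
```

With 470 Ω the LED gets a bit over 2 mA, well below its 20 mA limit, so it is safe but dimmer than it could be.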

Here is the numbering of GPIO pins: GPIO

I wrote a Python script which uses pins 14 and 15 to alternately turn on the green and red LEDs every second:

#!/usr/bin/env python

import RPi.GPIO as gpio
import time

gpio.setmode(gpio.BCM)
gpio.setup(14, gpio.OUT)
gpio.setup(15, gpio.OUT)

while True: 
  gpio.output(14, gpio.HIGH) 
  gpio.output(15, gpio.LOW) 
  time.sleep(1) 
  gpio.output(14, gpio.LOW) 
  gpio.output(15, gpio.HIGH) 
  time.sleep(1)

And here is the presentation:
test.gif

Python: converter of video stream to gif

I wrote a script which invokes the ffmpeg and convert commands to create a gif file.
usage: output.gif input.mp4 [--fps fps]

#!/usr/bin/env python

import sys
import os
import commands

def make_gif(input_f, output_f, fps):
  tmp = os.path.join('.', 'tmp');
  if not os.path.exists(tmp):
    os.mkdir(tmp)
  to_jpg(tmp, input_f, fps)  
  to_gif(tmp, output_f, fps)  
  clean(tmp)  

def to_jpg(tmp, input_f, fps):
  cmd = 'ffmpeg -i ' + input_f + ' -r ' + str(fps) + ' ' + os.path.join(tmp, 'f%04d.jpg')  
  print 'dumping:', cmd
  (status, output) = commands.getstatusoutput(cmd)
  if status:
    sys.stderr.write(output)
    sys.exit(status)
  print output

def to_gif(tmp, output_f, fps):
  cmd = 'convert -delay ' + str(100/fps) + ' -loop 0 '  + os.path.join(tmp, '*.jpg') + ' ' + output_f
  print 'creating gif:', cmd
  (status, output) = commands.getstatusoutput(cmd)
  if status:
    sys.stderr.write(output)
    sys.exit(status)
  print output

def clean(tmp):
  cmd = 'rm -rf '+ tmp
  print 'cleaning:', cmd
  (status, output) = commands.getstatusoutput(cmd)
  if status:
    sys.stderr.write(output)
    sys.exit(status)
  print output

def num(s):
  try:
    return int(s)
  except ValueError:
    return 0

def main():
  args = sys.argv[1:]
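  if len(args) < 2:
    print "usage: output.gif input.mp4 [--fps fps]";
    sys.exit(1)

  # positional arguments come first, in the order given by the usage line
  output_f = args[0]
  input_f = args[1]
  del args[0:2]

  fps = 5  # default, also used when the --fps value is not a number
  if len(args) > 1 and args[0] == '--fps':
    fps = num(args[1])
    if fps == 0:
      fps = 5
    del args[0:2]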

  print 'Converting',input_f,'to',output_f,'with',fps,'fps'
  make_gif(input_f, output_f, fps)

if __name__ == "__main__":
  main()

The script can be found on github.
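
The two external commands can also be built as argument lists rather than concatenated strings, which avoids trouble with file names that contain spaces. The sketch below is for Python 3, where the commands module no longer exists. The function names are made up, and the frames are passed explicitly instead of relying on a shell wildcard.

```python
import os


def ffmpeg_cmd(input_f, tmp_dir, fps):
    """Argument list that dumps `fps` frames per second as numbered JPEGs."""
    return ['ffmpeg', '-i', input_f, '-r', str(fps),
            os.path.join(tmp_dir, 'f%04d.jpg')]


def convert_cmd(frames, output_f, fps):
    """Argument list that joins the frames into an endlessly looping gif.

    convert's -delay is given in hundredths of a second, hence 100 // fps.
    """
    return (['convert', '-delay', str(100 // fps), '-loop', '0']
            + list(frames) + [output_f])


if __name__ == '__main__':
    print(ffmpeg_cmd('input.mp4', 'tmp', 5))
    print(convert_cmd(['tmp/f0001.jpg', 'tmp/f0002.jpg'], 'output.gif', 5))
```

Either list can be handed to subprocess.check_call to actually run the tool. The frame list would come from something like sorted(glob.glob(...)) on the temporary directory.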

Python: mpd client

I wrote a Python script that connects to my pi radio and sends a start or stop command to play or stop a radio stream.
usage: {start|stop} [--url url] [--ip ip] [--port port]

#!/usr/bin/env python

import sys
from mpd import MPDClient

def connect(ip, port):
  client = MPDClient()
  client.timeout = 60     # network timeout in seconds (floats allowed), default: None
  client.idletimeout = None
  client.connect(ip, port)
  return client

def stop(ip, port):
  client = connect(ip, port)
  client.clear()
  return client

def play(ip, port, url):
  client = stop(ip, port)  
  client.add(url)
  client.play()

def num(s):
  try:
    return int(s)
  except ValueError:
    return 0

def main():
  args = sys.argv[1:]
  if not args:
    print "usage: {start|stop} [--url url] [--ip ip] [--port port]";
    sys.exit(1)

  start = False
  if args[0] == 'start':
    start = True
  elif args[0] == 'stop':
    start = False
  else: 
    print 'command {start|stop} not specified'
    sys.exit(1)
  del args[0]
  
  url = 'http://stream4.nadaje.com:8002/muzo'
  if args and args[0] == '--url':
    url = args[1]
    del args[0:2]
  
  ip = '192.168.0.3'
  if args and args[0] == '--ip':
    ip = args[1]
    del args[0:2]
  
  port = 6600
  if args and args[0] == '--port':
    port = num(args[1])
    del args[0:2]

  print 'Connecting to mpd server:',ip,':',port
  if start:
    print 'Playing: ', url
    play(ip, port, url)
  else:  
    print 'Stopping'
    stop(ip, port)

if __name__ == "__main__":
  main()

The script can be found on github.
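
The hand-written option parsing above only accepts the options in the order --url, --ip, --port. As an alternative, here is a sketch of the same command line using the standard argparse module, which accepts them in any order. It reuses the defaults from the script, and the parse_args helper is a made-up name.

```python
import argparse


def parse_args(argv):
    """Parse: {start|stop} [--url url] [--ip ip] [--port port]"""
    parser = argparse.ArgumentParser(description='start or stop the pi radio')
    parser.add_argument('command', choices=['start', 'stop'])
    parser.add_argument('--url', default='http://stream4.nadaje.com:8002/muzo')
    parser.add_argument('--ip', default='192.168.0.3')
    parser.add_argument('--port', type=int, default=6600)
    return parser.parse_args(argv)


if __name__ == '__main__':
    args = parse_args(['start', '--port', '6601'])
    print(args.command, args.ip, args.port, args.url)
```

A non-numeric port is rejected with an error message here, whereas the script above silently turns it into 0.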

Python: iwlist scan analyser

I wrote a Python script to check the channels of the surrounding wifi networks. Recently I was having issues with signal quality, so I decided to check what was going on with the help of a powerful Linux tool. In scanning mode iwlist provides a huge amount of information, but I only needed to know which networks occupy which channels. So I wrote an analyser which reads the output of iwlist from standard input (e.g. through a pipe).
usage: iwlist wlp3s0 scan |./analyser.py

The analyser lists the channels with their frequencies and, below each, the names, link qualities and signal power of the networks using that channel:

		name:               	quality:  	power:    
Channel: 1 (2.412 GHz)
		Name18272346        	36/70     	-74 dBm   
		Name20053835        	27/70     	-83 dBm   
		Name3742A1          	26/70     	-84 dBm   
		Name4123456         	25/70     	-85 dBm   
		Name57005901        	23/70     	-87 dBm   
		Name6D2A47          	22/70     	-88 dBm   
Channel: 11 (2.462 GHz)
		Name119496          	50/70     	-60 dBm   
		Name20682504        	39/70     	-71 dBm   
Channel: 13 (2.472 GHz)
		Name10682504        	38/70     	-72 dBm   
Channel: 52 (5.26 GHz)
		Name10682504        	32/70     	-78 dBm

#!/usr/bin/python

import sys
import re

def num(s):
  try:
    return int(s)
  except ValueError:
    return 0
  
def first(x):
  match = re.search(r'Channel: (\d+)', x)
  if match:
    return num(match.group(1))
  return x

def last(x):
  power = x[2]  
  match = re.search(r'-(\d+) dBm', power)
  if match:
    return num(match.group(1))  # compare as numbers, otherwise '100' sorts before '60'
  return power
  
def print_cells(cells):
  sorted_cells = sorted(cells, key=last)  
  for cell in sorted_cells:
    print '\t\t','\t'.join(cell)

def process_channels(channels):
  print_cells([(crop('name:', 20), crop('quality:', 10), crop('power:', 10))])
  sorted_keys = sorted(channels, key=first)
  for channel in sorted_keys:
      print channel
      print_cells(channels[channel])

def crop(name, chars):
  if len(name) > chars:  
    return name[:chars]
  else:
    return name.ljust(chars)

def main():
  channels = {}
  channel = ''
  name = ''
  quality = ''
  level = ''
  for line in sys.stdin:
    to_add = False
    match = re.search(r'Frequency:(.+) GHz \(Channel (\d+)\)', line)
    if match:
      channel = 'Channel: '+ match.group(2) + ' ('+ match.group(1)+' GHz)'
      name = ''
      quality = ''
      level = ''
    match = re.search(r'Quality=(\S+)\s+Signal level=(.+) dBm', line)
    if match:
      quality = crop(match.group(1), 10)
      level = crop(match.group(2) + ' dBm', 10)
    match = re.search(r'ESSID:"(.+)"', line)
    if match:
      name = crop(match.group(1), 20)
      to_add = True

    if to_add:
      if channel in channels:
        channels[channel].append((name, quality, level))
      else:
        channels[channel] = [(name, quality, level)]  

  process_channels(channels)
    
if __name__ == "__main__":
  main()

The script can be found on github.
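
To show the parsing step in isolation, here is a reduced sketch for Python 3 that groups networks by channel and sorts them by signal level. The SAMPLE text is invented for illustration and only contains the iwlist lines the regular expressions look for. The parse_scan name is hypothetical too.

```python
import re

# Invented sample in the shape of `iwlist <iface> scan` output.
SAMPLE = '''\
          Cell 01 - Address: 00:00:00:00:00:01
                    Frequency:2.412 GHz (Channel 1)
                    Quality=27/70  Signal level=-83 dBm
                    ESSID:"NetB"
          Cell 02 - Address: 00:00:00:00:00:02
                    Frequency:2.412 GHz (Channel 1)
                    Quality=36/70  Signal level=-74 dBm
                    ESSID:"NetA"
          Cell 03 - Address: 00:00:00:00:00:03
                    Frequency:2.462 GHz (Channel 11)
                    Quality=50/70  Signal level=-60 dBm
                    ESSID:"NetC"
'''


def parse_scan(text):
    """Return {channel number: [(essid, signal in dBm), ...]}, strongest first."""
    channels = {}
    channel = None
    level = None
    for line in text.splitlines():
        match = re.search(r'Frequency:\S+ GHz \(Channel (\d+)\)', line)
        if match:
            channel = int(match.group(1))
            level = None  # a new cell starts, forget the previous level
        match = re.search(r'Signal level=(-?\d+) dBm', line)
        if match:
            level = int(match.group(1))
        match = re.search(r'ESSID:"(.*)"', line)
        if match and channel is not None:
            channels.setdefault(channel, []).append((match.group(1), level))
    for cells in channels.values():
        # cells without a parsed level go last
        cells.sort(key=lambda c: c[1] if c[1] is not None else float('-inf'),
                   reverse=True)
    return channels


if __name__ == '__main__':
    for number, cells in sorted(parse_scan(SAMPLE).items()):
        print('Channel', number, cells)
```

Keeping the channel number and signal level as integers means sorting needs no extra key functions to undo the text formatting.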

Python: simple script to backup PostgreSQL database

I wrote a Python script to make a backup of the PostgreSQL database.
usage: [--dumpdir dir] [--clean days] database user
where:
dumpdir – destination directory for dump
clean – number of days for which previous dumps are kept (older ones are removed)
database – database name
user – database user

#!/usr/bin/python

import sys
import os
import commands
from time import gmtime, strftime

def get_size(dumpname):
  cmd = 'du -sh ' + dumpname
  (status, output) = commands.getstatusoutput(cmd)
  if status:
    sys.stderr.write(output)
    sys.exit(status)
  return output

def dump_db(database, user, dumpname):
  cmd = 'pg_dump ' + database + ' -Z6 -U ' + user + ' -f ' + dumpname # + ' --table main.placeholder'
  print 'dumping:', cmd
  (status, output) = commands.getstatusoutput(cmd)
  if status:
    sys.stderr.write(output)
    sys.exit(status)
  print 'dump created: ' + get_size(dumpname)

def clean_old_dumps(days, dumpdir, namebase):
  cmd = "find " + dumpdir + " -maxdepth 1 -mtime +" + str(days) + " -name \"" + namebase + "*.gz\" -exec rm -rf '{}' ';'"
  print 'removing:', cmd
  (status, output) = commands.getstatusoutput(cmd)
  if status:
    sys.stderr.write(output)
    sys.exit(status)

def get_time_name(time):
  return strftime("%Y-%m-%d_%H_%M_%S", time)  

def num(s):
  try:
    return int(s)
  except ValueError:
    return 0

def main():
  args = sys.argv[1:]
  if not args:
    print "usage: [--dumpdir dir] [--clean days] database user";
    sys.exit(1)

  dumpdir = '/tmp'
  if len(args) > 1 and args[0] == '--dumpdir':
    dumpdir = args[1]
    del args[0:2]

  clean = 0
  if len(args) > 1 and args[0] == '--clean':
    clean = num(args[1])
    del args[0:2]

  if len(args) < 2:
    print "error: must specify database and user"
    sys.exit(1)

  database = args[0]
  user = args[1]
  namebase = 'dump_' + database + '_'
  dumpname = os.path.join(dumpdir, namebase + get_time_name(gmtime()) + '.sql.gz')

  dump_db(database, user, dumpname)

  if clean:
    print "Removing dumps made before (days):", clean
    clean_old_dumps(clean, dumpdir, namebase)

if __name__ == "__main__":
  main()

The script can be found on github.
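
The naming scheme and the clean-up rule can also be expressed without calling find. The sketch below is for Python 3 and uses made-up function names. It compares modification times directly, so its cut-off is close to but not identical with find's -mtime, which rounds ages down to whole days.

```python
import os
import time


def dump_name(database, when):
    """File name for a dump taken at `when` (a time.struct_time)."""
    stamp = time.strftime('%Y-%m-%d_%H_%M_%S', when)
    return 'dump_' + database + '_' + stamp + '.sql.gz'


def expired_dumps(dumpdir, database, days, now=None):
    """Dumps of `database` in `dumpdir` modified more than `days` days ago."""
    now = time.time() if now is None else now
    prefix = 'dump_' + database + '_'
    expired = []
    for name in sorted(os.listdir(dumpdir)):
        path = os.path.join(dumpdir, name)
        if not (name.startswith(prefix) and name.endswith('.gz')):
            continue
        if os.path.isfile(path) and now - os.path.getmtime(path) > days * 86400:
            expired.append(path)
    return expired


if __name__ == '__main__':
    print(dump_name('mydb', time.gmtime()))
    print(expired_dumps('/tmp', 'mydb', 7))
```

The function only lists candidates. Deleting them is left to the caller, which makes a dry run trivial.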

Python exercise: scan open ports of other machines in the local network

I wrote a simple script to quickly check for open ports in my local network. The script skips VPN connections.

Usage:
usage: [--max ip] port [port ...]
where:
ip – upper bound for the last octet of the scanned addresses (default 255)

#!/usr/bin/python

import socket
from netifaces import interfaces, ifaddresses, AF_INET
import re
import sys

def get_my_ip():
  result = []
  print 'retrieving ip addresses ...'
  for ifaceName in interfaces():
    addresses = [i['addr'] for i in ifaddresses(ifaceName).setdefault(AF_INET, [{'addr':'No IP addr'}] )]
    if not addresses:
      continue;
    address = addresses[0]
    if address == 'No IP addr':
      continue
    if ifaceName == 'lo':
      print 'Skip local: ', address
      continue
    match = re.search(r'tap', ifaceName)
    if match:
      print 'Skip vpn: ', address
      continue
    result.append(address)
  print 'found:',', '.join(result)
  return result

def check_ips(ip, maxip, ports):
  match = re.search(r'(\d+\.\d+\.\d+)\.\d+', ip)  # dots escaped so they match only a literal '.'
  if match:
    ip_base = match.group(1)
    ip_base = ip_base+'.'
    ports_str = ', '.join(str(x) for x in ports)
    print 'checking network: ',ip_base+'1-'+str(maxip)+':'+ports_str
    for i in range(1,maxip):
      ip_to_check = ip_base+str(i)
      check_ip_ports(ip_to_check, ports)

def check_ip_ports(ip, ports):
  for port in ports:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    result = sock.connect_ex((ip,port))
    sock.close()  # release the socket before probing the next port
    if result:
      continue
    print 'Ip:', ip, 'has open port', port

def num(s):
  try:
    return int(s)
  except ValueError:
    return 0

def main():
  args = sys.argv[1:]
  if not args:
    print "usage: [--max ip] port [port ...]";
    sys.exit(1)

  maxip = 0
  if args[0] == '--max':
    maxip = num(args[1])
    del args[0:2]

  if not maxip:
    maxip = 255

  if len(args) == 0:
    print "error: must specify one or more ports"
    sys.exit(1)

  ports = []
  for arg in args:
    port = num(arg)
    if port:
      ports.append(port)

  if len(ports) == 0:
    print "error: must specify one or more ports"
    sys.exit(1)

  ips = get_my_ip();
  for ip in ips:
    check_ips(ip, maxip, ports)  

if __name__ == "__main__":
  main()

The script can be found on github.
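
Without a timeout, a connection attempt to a host that does not answer can block for a long time, which slows a scan of a whole subnet considerably. A possible refinement is sketched below for Python 3. The helper names and the 0.5 s default are assumptions, not part of the script above.

```python
import socket


def network_base(ip):
    """'192.168.0.17' -> '192.168.0.' (the part shared by the scanned hosts)."""
    return ip.rsplit('.', 1)[0] + '.'


def is_open(ip, port, timeout=0.5):
    """True if a TCP connection to ip:port succeeds within `timeout` seconds."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        # connect_ex returns 0 on success and an error code otherwise
        return sock.connect_ex((ip, port)) == 0
    finally:
        sock.close()


if __name__ == '__main__':
    base = network_base('192.168.0.17')
    print(base, is_open('127.0.0.1', 22))
```

A shorter timeout speeds up the scan but may miss slow hosts, so the value is worth tuning for the network at hand.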