Raspberry Pi 0 camera surveillance and python telegram bot

2018-09-05

Instead of having your RPi0 stored away in a drawer, collecting dust, you can turn it into your personal assistant as a telegram bot.

For example, I have used mine (and still do) for things like:

  • Tracking new house rental listings on a local website -- when I was searching for a new house
  • Monitoring my home with a camera and notifying me of any events -- the subject of this post
  • Tracking sales of my girlfriend's handmade jewels -- because Outlook might send a sale e-mail to the spam folder
  • Replying to predefined commands

All of these are handled in a Python script.

Setting up motion

Start off by downloading and installing the latest motion package for your Pi. You can find the releases page here.

Note: Raspbian's repositories ship an outdated motion package, which is why we install the latest release directly.

At the time of writing, the latest package is 4.1.1-1:

wget https://github.com/Motion-Project/motion/releases/download/release-4.1.1/pi_stretch_motion_4.1.1-1_armhf.deb
sudo dpkg -i pi_stretch_motion_4.1.1-1_armhf.deb

Now, let's create the config file for motion:

mkdir ~/.motion
touch ~/.motion/motion.conf

With the following content:

daemon off
process_id_file ~/.motion/motion.pid
setup_mode off
log_level 6
log_type all
videodevice /dev/video0
v4l2_palette 17
input -1
norm 0
frequency 0
power_line_frequency -1
rotate 0
flip_axis none
width 1024
height 768
framerate 2
minimum_frame_time 0
netcam_keepalive off
netcam_tolerant_check off
rtsp_uses_tcp on
mmalcam_name vc.ril.camera
auto_brightness on
brightness 0
contrast 0
saturation 0
hue 0
roundrobin_frames 1
roundrobin_skip 1
switchfilter off
threshold 3000
threshold_tune off
noise_level 32
noise_tune on
despeckle_filter EedDl
smart_mask_speed 0
lightswitch 0
minimum_motion_frames 1
pre_capture 0
post_capture 0
event_gap 0
max_movie_time 0
emulate_motion off
output_pictures first
output_debug_pictures off
quality 75
picture_type jpeg
ffmpeg_output_movies on
ffmpeg_output_debug_movies off
ffmpeg_bps 400000
ffmpeg_variable_bitrate 0
ffmpeg_video_codec mkv
ffmpeg_duplicate_frames true
timelapse_interval 0
timelapse_mode daily
timelapse_fps 2
timelapse_codec mpg
use_extpipe off
snapshot_interval 0
locate_motion_mode preview
locate_motion_style redbox
text_right %Y-%m-%d\n%T-%q
text_changes on
text_event %Y%m%d%H%M%S
text_double off
target_dir /home/[user]/Documents/motion_tmp
snapshot_filename %Y%m%d%H%M%S-snapshot
picture_filename %Y%m%d%H%M%S-%q
movie_filename %Y%m%d%H%M%S
timelapse_filename %Y%m%d-timelapse
ipv6_enabled off
stream_port 0
stream_quality 50
stream_motion off
stream_maxrate 1
stream_localhost off
stream_limit 0
stream_auth_method 0
webcontrol_port 0
webcontrol_localhost on
webcontrol_html_output on
webcontrol_parms 0
track_type 0
track_auto off
track_iomojo_id 0
track_step_angle_x 10
track_step_angle_y 10
track_move_wait 10
track_speed 255
track_stepsize 40
quiet on
on_picture_save "mv %f ~/Documents/motion/"
on_movie_end "mv %f ~/Documents/motion/"

Note: Replace [user] in the target_dir option with your username.

This configuration works for me. Documentation about the motion program can be found here.

When motion detects movement, it starts saving pictures and videos in the ~/Documents/motion_tmp folder. Once a file is fully written, it is moved to ~/Documents/motion/.

We do this to prevent the Telegram bot from picking up and sending incomplete .mkv and .jpg files.
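The write-then-move idea can be sketched in Python. This is only an illustration and not part of the bot: the `publish` helper and the file name are hypothetical, and it assumes both folders live on the same filesystem, where a move is a plain rename and a watcher never sees a half-written file.

```python
import os
import tempfile


def publish(data, name, tmp_dir, final_dir):
    """Write data under tmp_dir, then move the finished file into final_dir."""
    tmp_path = os.path.join(tmp_dir, name)
    final_path = os.path.join(final_dir, name)
    with open(tmp_path, "wb") as fh:
        fh.write(data)  # the file may be incomplete while this runs
    # os.replace renames the file; on a single filesystem the rename is atomic,
    # so anything watching final_dir only ever sees complete files.
    os.replace(tmp_path, final_path)
    return final_path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        tmp_dir = os.path.join(root, "motion_tmp")
        final_dir = os.path.join(root, "motion")
        os.makedirs(tmp_dir)
        os.makedirs(final_dir)
        print(publish(b"fake jpeg bytes", "example.jpg", tmp_dir, final_dir))
```

The on_picture_save and on_movie_end hooks in the config play the role of the `os.replace` call here.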

You can test motion by executing:

motion -c ~/.motion/motion.conf

Terminate it by pressing Ctrl+C.

Creating a telegram bot

Head to BotFather and create your first bot. Save the bot's token; we'll use it shortly.

Setting up requirements

Assuming you already have python3.x and pip installed, we'll need the python-telegram-bot package:

sudo pip3 install -U python-telegram-bot

Now, let's create a directory and the python file:

mkdir ~/telegram-bot
touch ~/telegram-bot/bot.py

Code

Main code

Using your favorite editor, start editing bot.py.

from telegram.ext import Updater, CommandHandler
from telegram import ReplyKeyboardMarkup
from os import remove
import subprocess
import logging
import glob

logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO)
logger = logging.getLogger(__name__)

def error(bot, update, error):
    logger.warning('Update "%s" caused error "%s"', update, error)

def main():
    updater = Updater("INSERT YOUR TOKEN HERE")
    dp = updater.dispatcher
    dp.add_error_handler(error)
    updater.start_polling()
    updater.idle()

if __name__ == '__main__':
    main()

The code above starts the bot and sits idle waiting for user interaction, but it doesn't respond to anything yet.

Start command & menu buttons

Let's add a /start command handler by adding:

dp.add_handler(CommandHandler("start", start))

just after the dp.add_error_handler(error) line.

We will also need to define a start function which will be executed once the user types /start.

The function replies with a menu of two buttons:

  • One to start motion detection
  • One to stop motion detection

def start(bot, update):
    menu = [['/motion_on', '/motion_off']]
    markup = ReplyKeyboardMarkup(menu)
    bot.send_message(update.message.chat.id, 'How can I help you?', reply_markup=markup)


Start motion detection

The /motion_on command executes the function below, which starts the motion program:

def motion_on(bot, update):
    # No shell=True: combined with a list, the shell would run only 'motion' and drop the -c option.
    subprocess.Popen(['motion', '-c', '/path/to/motion.conf'], stdin=None, stdout=None, stderr=None, close_fds=True)
    update.message.reply_text("Motion detection on!")

We will also need to add the handler:

dp.add_handler(CommandHandler("motion_on", motion_on))

Stop motion detection

The /motion_off command executes the function below, which stops the motion program:

def motion_off(bot, update):
    subprocess.run(['killall', 'motion'])
    update.message.reply_text("Motion detection off!")

and the handler accordingly:

dp.add_handler(CommandHandler("motion_off", motion_off))
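As an alternative to killall, the bot could keep the handle of the process it started and stop exactly that one. The sketch below is not the approach used in this post: the `Toggle` class is hypothetical, and it assumes motion stays in the foreground (daemon off in the config above), so the handle remains valid.

```python
import subprocess


class Toggle:
    """Start and stop one child process, keeping its handle instead of using killall."""

    def __init__(self, argv):
        self.argv = argv
        self.proc = None

    def running(self):
        # poll() returns None while the child is still alive
        return self.proc is not None and self.proc.poll() is None

    def on(self):
        if not self.running():  # avoid launching a second copy
            self.proc = subprocess.Popen(self.argv)
        return self.running()

    def off(self):
        if self.running():
            self.proc.terminate()  # sends SIGTERM on POSIX
            self.proc.wait(timeout=10)
        return self.running()


# Hypothetical usage inside the handlers:
#   motion = Toggle(['motion', '-c', '/path/to/motion.conf'])
#   motion.on()   # in motion_on
#   motion.off()  # in motion_off
```

A side effect of this design is that sending /motion_on twice does not start two copies of motion.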

Send alarm if event detected

At this point, we need a repeating process (job) that checks for events.

j = updater.job_queue
job_motion = j.run_repeating(callback_motion, interval=60, first=0)

This job runs every minute (interval=60 seconds).

The callback_motion function:

def callback_motion(bot, job):
    imgs = glob.glob("/path/to/motion/photos/*.jpg")
    vids = glob.glob("/path/to/motion/videos/*.mkv")

    for img in imgs:
        # Close the file before deleting it
        with open(img, 'rb') as photo:
            bot.send_photo(chat_id=YOUR_CHAT_ID, photo=photo, timeout=30)
        remove(img)

    for vid in vids:
        with open(vid, 'rb') as video:
            bot.send_video(chat_id=YOUR_CHAT_ID, video=video, timeout=40)
        remove(vid)

In short, it checks for .jpgs and .mkvs in our specified directories, sends them and deletes them.
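The check, send and delete cycle can also be factored into a small helper that is easy to try without a bot. This is a sketch under assumptions: `drain` and its `send` parameter are hypothetical names, and sending oldest first (by modification time) is my choice, not something the original code does.

```python
import glob
import os


def drain(pattern, send):
    """Send every file matching pattern, oldest first, deleting each after a successful send."""
    sent = []
    for path in sorted(glob.glob(pattern), key=os.path.getmtime):
        with open(path, "rb") as fh:
            send(fh)  # if this raises, the file is kept for the next run
        os.remove(path)
        sent.append(path)
    return sent


# Hypothetical usage inside callback_motion:
#   drain("/path/to/motion/photos/*.jpg",
#         lambda fh: bot.send_photo(chat_id=YOUR_CHAT_ID, photo=fh, timeout=30))
```

Because a file is removed only after `send` returns, a failed upload leaves it in place for the next run of the job.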

Here's how to find your chat ID (you will have to enter it in the code above):

While the python script is not running, send a regular message to your bot (e.g. test).

Now, navigate to https://api.telegram.org/botTOKEN_ID/getUpdates, replacing TOKEN_ID with your bot's token.

The chat ID is listed under result > 0 > message > from > id (in a private chat this is the same number as message > chat > id).
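If you would rather not read the JSON by eye, the lookup can be sketched in Python using only the standard library. The helper names `chat_ids` and `fetch_chat_ids` are hypothetical, the sample payload is made up and trimmed to the relevant fields, and the code reads message > chat > id, which in a private chat matches the from > id value.

```python
import json
import urllib.request


def chat_ids(payload):
    """Return the distinct chat IDs found in a getUpdates response dict."""
    ids = []
    for update in payload.get("result", []):
        message = update.get("message")
        if message and message["chat"]["id"] not in ids:
            ids.append(message["chat"]["id"])
    return ids


def fetch_chat_ids(token):
    """Call getUpdates for the given bot token and return the chat IDs seen."""
    url = "https://api.telegram.org/bot{}/getUpdates".format(token)
    with urllib.request.urlopen(url, timeout=10) as resp:
        return chat_ids(json.load(resp))


# Made-up response, reduced to the fields used above
sample = {"ok": True, "result": [
    {"update_id": 1, "message": {"from": {"id": 42},
                                 "chat": {"id": 42, "type": "private"},
                                 "text": "test"}}]}
print(chat_ids(sample))  # → [42]
```

As noted above, getUpdates only shows the message while the bot script is not running.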


Preventing unauthorized access

The following decorator ignores commands from anyone whose user ID is not yours:

from functools import wraps

def restricted(func):
    @wraps(func)
    def wrapped(bot, update, *args, **kwargs):
        user_id = update.effective_user.id
        if user_id != YOUR_CHAT_ID:
            print("Unauthorized access denied for {}.".format(update.effective_user.first_name))
            return
        return func(bot, update, *args, **kwargs)
    return wrapped

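To put it into action, insert @restricted before each command handler function (start, motion_on and motion_off). The job callback receives a job instead of an update, so it is left undecorated.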
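To see what the decorator does without talking to Telegram, here is a small self-contained sketch. The stand-in objects built with SimpleNamespace, the `fake_update` helper and the ID 42 are assumptions for illustration only; a real update object carries far more than the one attribute used here.

```python
from functools import wraps
from types import SimpleNamespace

ALLOWED_ID = 42  # hypothetical; stands in for YOUR_CHAT_ID


def restricted(func):
    @wraps(func)
    def wrapped(bot, update, *args, **kwargs):
        if update.effective_user.id != ALLOWED_ID:
            return None  # ignore strangers
        return func(bot, update, *args, **kwargs)
    return wrapped


@restricted
def start(bot, update):
    return "How can I help you?"


def fake_update(user_id):
    """Build a stand-in object with just the attribute the decorator reads."""
    return SimpleNamespace(effective_user=SimpleNamespace(id=user_id))


print(start(None, fake_update(42)))  # handler runs
print(start(None, fake_update(7)))   # handler is skipped, returns None
```

The @wraps call keeps the handler's original name, which makes log output easier to follow.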

Complete code

#!/usr/bin/python3
from telegram.ext import Updater, CommandHandler
from telegram import ReplyKeyboardMarkup
from functools import wraps
from os import remove
import subprocess
import logging
import glob

YOUR_CHAT_ID = 0  # replace with your numeric chat ID

def restricted(func):
    @wraps(func)
    def wrapped(bot, update, *args, **kwargs):
        user_id = update.effective_user.id
        if user_id != YOUR_CHAT_ID:
            print("Unauthorized access denied for {}.".format(update.effective_user.first_name))
            return
        return func(bot, update, *args, **kwargs)
    return wrapped

logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO)
logger = logging.getLogger(__name__)

def error(bot, update, error):
    logger.warning('Update "%s" caused error "%s"', update, error)

@restricted
def start(bot, update):
    menu = [['/motion_on', '/motion_off']]
    markup = ReplyKeyboardMarkup(menu)
    bot.send_message(update.message.chat.id, 'How can I help you?', reply_markup=markup)

@restricted
def motion_on(bot, update):
    # No shell=True (it would drop the -c option), so use an absolute path instead of ~
    subprocess.Popen(['motion', '-c', '/home/[user]/.motion/motion.conf'], stdin=None, stdout=None, stderr=None, close_fds=True)
    update.message.reply_text("Motion detection on!")

@restricted
def motion_off(bot, update):
    subprocess.run(['killall', 'motion'])
    update.message.reply_text("Motion detection off!")

# Jobs receive (bot, job) rather than an update, so @restricted is not applied here.
def callback_motion(bot, job):
    imgs = glob.glob("/home/[user]/Documents/motion/*.jpg")
    vids = glob.glob("/home/[user]/Documents/motion/*.mkv")
    for img in imgs:
        with open(img, 'rb') as photo:
            bot.send_photo(chat_id=YOUR_CHAT_ID, photo=photo, timeout=30)
        remove(img)
    for vid in vids:
        with open(vid, 'rb') as video:
            bot.send_video(chat_id=YOUR_CHAT_ID, video=video, timeout=40)
        remove(vid)


def main():
    updater = Updater("BOT TOKEN")
    dp = updater.dispatcher
    dp.add_error_handler(error)
    dp.add_handler(CommandHandler("start", start))
    dp.add_handler(CommandHandler("motion_on", motion_on))
    dp.add_handler(CommandHandler("motion_off", motion_off))
    j = updater.job_queue
    job_motion = j.run_repeating(callback_motion, interval=60, first=0)
    updater.start_polling()
    updater.idle()

if __name__ == '__main__':
    main()
