diff --git a/.gitignore b/.gitignore index 39672af..238d4ac 100644 --- a/.gitignore +++ b/.gitignore @@ -1,9 +1,10 @@ /.venv /__pycache__ +__pycache__/ # User config (contains GPS coordinates, shutter names, MQTT credentials) operateShutters.conf # Log files (contain personal data and occupancy patterns) *.log -*.log.* \ No newline at end of file +*.log.* diff --git a/README.md b/README.md index 5a99586..3367298 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,15 @@ This project has been developed and tested with a Raspberry Pi B+ and a Raspberr Wi-Fi connectivity and Ethernet cable should both work. Note that the hardware has to be reasonably close (i.e. in the same house or in the same aisle of your mansion: just like a physical remote) to the shutters you operate, as the signal strength will otherwise not be sufficient. -As of now, you have to build your own hardware. Here are the steps to do so. +Pi-Somfy supports two RF hardware options: + +1. A raw 433 MHz ASK/OOK transmitter module. This is the original low-cost hardware path. +1. An E07-M1101D-SMA CC1101 module. This is controlled over SPI and transmits the same Somfy RTS waveform through the CC1101 radio. + +### Raw 433 MHz transmitter + +For the original raw 433 MHz transmitter, you have to build your own hardware. Here are the steps to do so. + 1. You need the RF Transmitter. If you wish to order it from eBay, this link may be helpful:
[Order](https://www.ebay.com/sch/sis.html?_nkw=5x+433Mhz+RF+transmitter+and+receiver+kit+Module+Arduino+ARM+WL+MCU+Raspberry).
Note that desoldering a 3 pin component isn't trivial, so ordering more than one may be a good idea in case of a screw up. 1. You need an oscillator for a 433.42 MHz frequency. The above RF transmitter comes with a common 433.93 MHz one, which will not work with your Somfy shutter. If you wish to order it from eBay, this link may be helpful:
[Order](https://www.ebay.com/sch/sis.html?_nkw=433.42M+R433+F433+SAW+Resonator+Crystals+TO-39) 1. You will need cables to connect the transmitter to the Raspberry Pi. Any cable will do obviously, but I found these quite helpful.
[Order](https://www.ebay.com/itm/40Pin-Multicolored-Dupont-Wire-Kits-Breadboard-Female-Jumper-Ribbon-Cable/113310899442) @@ -33,11 +41,57 @@ Note that I used GPIO 4 but you can change the value of __TXGPIO__ to whatever y OK. now this all should look like this. Note that some of the pictures are a bit confusing with regards to which GPIO a cable connects to. It's easier to see on the above diagram. But if you struggle, maybe the [Wiring Diagram](documentation/Wiring%20Diagram.txt) helps. +Set the top-level RF backend in `operateShutters.conf`: + +```ini +TXGPIO = 4 +RFBackend = raw_433 +``` + +Raw 433 MHz transmitter connection photos: ![Full Picture](documentation/Full%20Assembly.jpg)
![Pi Connection](documentation/Connection.jpg)
![RF Transmitter Connection](documentation/Sender.jpg)
+### E07-M1101D-SMA / CC1101 module + +Pi-Somfy can also drive an E07-M1101D-SMA CC1101 module. This module is controlled over SPI and uses the CC1101 asynchronous transmit mode, with the Somfy RTS waveform driven into GDO0. + +The E07-M1101D-SMA must be powered from 3.3V. Do not connect VCC or any logic pin to 5V. + +| E07-M1101D-SMA pin | Raspberry Pi 4 physical pin | Raspberry Pi signal | +| --- | ---: | --- | +| 1 GND | 6 | GND | +| 2 VCC | 1 or 17 | 3.3V | +| 3 GDO0 | 7 | GPIO4 / TXGPIO | +| 4 CSN | 24 | SPI0 CE0 / GPIO8 | +| 5 SCK | 23 | SPI0 SCLK / GPIO11 | +| 6 MOSI | 19 | SPI0 MOSI / GPIO10 | +| 7 MISO/GDO1 | 21 | SPI0 MISO / GPIO9 | +| 8 GDO2 | 22 | GPIO25, optional | + +Enable SPI on the Pi, then set the top-level RF backend in `operateShutters.conf`: + +```ini +TXGPIO = 4 +SendRepeat = 5 +RFBackend = cc1101 +CC1101Frequency = 433.42 +CC1101SPIBus = 0 +CC1101SPIDevice = 0 +CC1101OutputPower = 0xC6 +CC1101TransmitSettleSeconds = 0.05 +``` + +`CC1101TransmitSettleSeconds` keeps GDO0 low briefly after the CC1101 enters asynchronous TX mode before the Somfy waveform starts. `0.05` has worked reliably with an E07-M1101D-SMA on a Raspberry Pi 4. If one shade still misses commands, increase `SendRepeat` first because it sends more copies of the same Somfy frame. + +CC1101 connection photos to add: + +- `documentation/CC1101 Full Assembly.jpg` +- `documentation/CC1101 Pi Connection.jpg` +- `documentation/CC1101 Module Connection.jpg` + ## 3 Software If you are new to using a Raspberry Pi and Linux please refer to other sources for coming up to speed with the environment. Having a base knowledge will go a long way. This [site](https://www.raspberrypi.org/help/) is a great place to start if you are new to these topics. @@ -46,7 +100,7 @@ If you are not familiar with remote login commands for Linux/Unix, two useful co The Raspberry Pi organization has documentation on installing an operating system on your Raspberry Pi. It is located [here](https://www.raspberrypi.org/documentation/installation/installing-images/README.md). -Once the Pi has its basic setup (an operating system and an internet connection) working, ssh into your Raspberry Pi and you should find that you are in the directory /home/pi. Note: if you prefer not to use a headless system, you can also open a terminal windows directly on the Pi. +Once the Pi has its basic setup (an operating system and an internet connection) working, ssh into your Raspberry Pi. The examples below install into `~/Pi-Somfy`; if you use a different directory, run the commands from that checkout. The next step is to download the Pi-Somfy project files to your Raspberry Pi. The easiest way to do this is to use the "git" program. Most Raspberry Pi distributions include the git program (except Debian Lite). @@ -58,44 +112,75 @@ sudo apt-get install git ``` (If git isn't installed, it will install it; if it was previously, it will update it) -Once git is installed on your system, make sure you are in the /home/pi directory, then type: +Once git is installed on your system, clone the project: ```sh +cd ~ git clone https://github.com/Nickduino/Pi-Somfy.git +cd Pi-Somfy ``` -The above command will make a directory in /home/pi named Pi-Somfy and put the project files in this directory. +The above command will make a directory named `Pi-Somfy` and put the project files in this directory. -Next, we need to install Python Libraries. Pi-Somfy requires Python 3. Ensure pip3 is installed: +Next, install the Raspberry Pi OS packages used by Pi-Somfy. On current Raspberry Pi OS releases, the GPIO package is `python3-pigpio`; the bare `pigpio` package name may not be installable. ```sh sudo apt-get update -sudo apt-get install python3-pip +sudo apt-get install python3-venv python3-pip python3-pigpio python3-lgpio python3-spidev +``` + +For the E07-M1101D-SMA / CC1101 backend, enable SPI: + +```sh +sudo raspi-config nonint do_spi 0 +ls -l /dev/spidev* ``` -Next, we need to install the PIGPIO libraries, to do so, type: +If `/dev/spidev0.0` and `/dev/spidev0.1` do not appear, reboot and check again. + +Create a Python virtual environment that can see the Raspberry Pi OS GPIO/SPI packages, then install the Python requirements: ```sh -sudo apt-get install pigpio +python3 -m venv --system-site-packages .venv +.venv/bin/python -m pip install --upgrade pip setuptools wheel +.venv/bin/python -m pip install -r requirements.txt ``` -Next install the required Python Libraries: +The `--system-site-packages` option is intentional: Raspberry Pi OS supplies `pigpio`, `lgpio`, and `spidev` as apt packages, while `requirements.txt` installs or verifies the Python packages used by the app, including `cc1101`. + +If this is a new install, create the config from the default and choose the RF backend: ```sh -sudo pip3 install -r requirements.txt +cp defaultConfig.conf operateShutters.conf +``` + +For the raw 433 MHz transmitter, keep: + +```ini +RFBackend = raw_433 ``` -Next, let's test if it all works. Start `operateShutters.py` by typing: +For the E07-M1101D-SMA / CC1101 module, set: + +```ini +SendRepeat = 5 +RFBackend = cc1101 +CC1101Frequency = 433.42 +CC1101OutputPower = 0xC6 +CC1101TransmitSettleSeconds = 0.05 +``` + +Next, test the Python environment by typing: ```sh -sudo python3 /home/pi/Pi-Somfy/operateShutters.py +.venv/bin/python operateShutters.py -h ``` You should see the help text explaining the [Command Line Interface](documentation/p4.png) ## 4 Usage -Note that the config file won't exist the first time you run the application. In that case, a new config file will be created based on the name you specified (e.g. /home/pi/Pi-Somfy/operateShutters.conf). Once it has been created, you can modify it to change your need (SSL or not, which port is used, etc.), it will not be erased with an update. If you messed up something, just delete it and relaunch operateShutters.py, a new vanilla copy will be generated. +Note that the config file won't exist the first time you run the application unless you copied it from `defaultConfig.conf` during install. In that case, a new config file will be created based on the name you specified (e.g. `operateShutters.conf` in your checkout). Once it has been created, you can modify it to change your need (SSL or not, which port is used, etc.), it will not be erased with an update. If you messed up something, just delete it and relaunch operateShutters.py, a new vanilla copy will be generated. You have 6 ways to operate. The recommended operation mode is mode 5. But the other 5 modes are explained here for completeness: @@ -128,46 +213,52 @@ You have 6 ways to operate. The recommended operation mode is mode 5. But the ot **Examples:** All three command the shutter named corridor. The first one will raise it. The second one will lower it. The third one will lower the shutter at sunset and raise it again 60 minutes after sunrise. ```sh -sudo /home/pi/Pi-Somfy/operateShutters.py corridor -c /home/pi/Pi-Somfy/operateShutters.conf -u -sudo /home/pi/Pi-Somfy/operateShutters.py corridor -c /home/pi/Pi-Somfy/operateShutters.conf -d -sudo /home/pi/Pi-Somfy/operateShutters.py corridor -c /home/pi/Pi-Somfy/operateShutters.conf -dd 0 60 +.venv/bin/python operateShutters.py corridor -c operateShutters.conf -u +.venv/bin/python operateShutters.py corridor -c operateShutters.conf -d +.venv/bin/python operateShutters.py corridor -c operateShutters.conf -dd 0 60 ``` 2. Manually start Web interface only
You can start the web-interface by typing:
Once started, you can access the web interface at http://IPaddressOfYourPi:80. From there you can further modify your settings. ```sh -sudo python3 /home/pi/Pi-Somfy/operateShutters.py -c /home/pi/Pi-Somfy/operateShutters.conf -a +sudo .venv/bin/python operateShutters.py -c operateShutters.conf -a ``` 3. Manually start Web interface and Alexa interface
You can start the web-interface by typing: ```sh -sudo python3 /home/pi/Pi-Somfy/operateShutters.py -c /home/pi/Pi-Somfy/operateShutters.conf -a -e +sudo .venv/bin/python operateShutters.py -c operateShutters.conf -a -e ``` 4. Manually start Web interface and MQTT integration (for Home Assistant)
You can start the web-interface by typing: ```sh -sudo python3 /home/pi/Pi-Somfy/operateShutters.py -c /home/pi/Pi-Somfy/operateShutters.conf -a -m +sudo .venv/bin/python operateShutters.py -c operateShutters.conf -a -m ``` 5. Finally, the recommended way to operate it is using a systemd service on boot time. You can do so by typing: ```sh -sudo bash /home/pi/Pi-Somfy/installService.sh +sudo bash ./installService.sh ``` -The service will be installed as a system service right after establishing network connectivity. +The service will be installed as `pi-somfy.service` and starts after network connectivity. +By default, the service runs the web interface with `-a`. To also enable MQTT and Alexa, install it like this: + +```sh +PI_SOMFY_ARGS="-a -m -e" sudo -E bash ./installService.sh +``` + If you want to stop the service simply type: ```sh -sudo systemctl stop shutters.service +sudo systemctl stop pi-somfy.service ``` If you want to start the service simply type: ```sh -sudo systemctl start shutters.service +sudo systemctl start pi-somfy.service ``` If you want to restart the service simply type: ```sh -sudo systemctl restart shutters.service +sudo systemctl restart pi-somfy.service ``` -Note, currently the service expects python3 for starting up. +The installer writes the service file with the path to your current checkout and uses `.venv/bin/python` when that virtual environment exists. ``` -ExecStart=sudo /usr/bin/python3 /home/pi/Pi-Somfy/operateShutters.py -c /home/pi/Pi-Somfy/operateShutters.conf -a -e -m +ExecStart=/path/to/Pi-Somfy/.venv/bin/python /path/to/Pi-Somfy/operateShutters.py -c /path/to/Pi-Somfy/operateShutters.conf -a ``` 6. Alternatively, you can use cron to run the program at boot time. You can do so by typing: @@ -177,8 +268,8 @@ sudo crontab -e Note, that "crontab -e" will just open a console-based text editor that you can edit the crontab script. The first time you run "crontab -e" you will be prompted to choose the editor. I recommend nano. From the crontab window, add the following to the bottom of the crontab script ``` -@reboot sleep 60;python3 /home/pi/Pi-Somfy/operateShutters.py -c /home/pi/Pi-Somfy/operateShutters.conf -a -e -m -0 * * * * python3 /home/pi/Pi-Somfy/operateShutters.py -c /home/pi/Pi-Somfy/operateShutters.conf -a -e -m +@reboot sleep 60; cd /path/to/Pi-Somfy && .venv/bin/python operateShutters.py -c operateShutters.conf -a -e -m +0 * * * * cd /path/to/Pi-Somfy && .venv/bin/python operateShutters.py -c operateShutters.conf -a -e -m ``` And save the crontab schedule. (if using nano type press ctrl-o to save the file, ctrl-x to exit nano). Now, every time your system is booted operateShutters will start. @@ -284,7 +375,7 @@ If you choose not to use the Home Assistant add-in, you can download the [Mosqui Second, start `operateShutters.py` with the "-m" option. This should look similar to this: ```sh -operateShutters.py -c /home/pi/Pi-Somfy/operateShutters.conf -a -m +.venv/bin/python operateShutters.py -c operateShutters.conf -a -m ``` And that's it, you are all set. @@ -355,10 +446,10 @@ The project has been updated to use current software libraries and fix a number If you already have Pi-Somfy installed, follow these steps to upgrade: ```sh -cd /home/pi/Pi-Somfy +cd ~/Pi-Somfy git pull -sudo pip3 install -r requirements.txt -sudo systemctl restart shutters.service +.venv/bin/python -m pip install -r requirements.txt +sudo systemctl restart pi-somfy.service ``` Note: if you are not using MQTT (`-m` flag), `paho-mqtt` is no longer required and you can skip installing it. Your existing `operateShutters.conf` will be preserved — the upgrade only replaces code files. diff --git a/cc1101_backend.py b/cc1101_backend.py new file mode 100644 index 0000000..1b3f443 --- /dev/null +++ b/cc1101_backend.py @@ -0,0 +1,107 @@ +#!/usr/bin/python3 + +import time + +import cc1101 + + +class CC1101Config: + DEFAULT_FREQUENCY_MHZ = 433.42 + DEFAULT_SPI_BUS = 0 + DEFAULT_SPI_DEVICE = 0 + DEFAULT_OUTPUT_POWER = 0xC6 + DEFAULT_TRANSMIT_SETTLE_SECONDS = 0.05 + SYMBOL_RATE_BAUD = 1562.5 + + def __init__( + self, + frequency_mhz=DEFAULT_FREQUENCY_MHZ, + spi_bus=DEFAULT_SPI_BUS, + spi_device=DEFAULT_SPI_DEVICE, + output_power=DEFAULT_OUTPUT_POWER, + transmit_settle_seconds=DEFAULT_TRANSMIT_SETTLE_SECONDS, + ): + self.frequency_mhz = float(frequency_mhz) + self.spi_bus = int(spi_bus) + self.spi_device = int(spi_device) + self.output_power = int(output_power) + self.transmit_settle_seconds = float(transmit_settle_seconds) + self.symbol_rate_baud = self.SYMBOL_RATE_BAUD + + @classmethod + def from_app_config(cls, config): + if hasattr(config, "ReadValue"): + return cls( + frequency_mhz=config.ReadValue( + "CC1101Frequency", + return_type=float, + default=cls.DEFAULT_FREQUENCY_MHZ, + section="General", + ), + spi_bus=config.ReadValue( + "CC1101SPIBus", + return_type=int, + default=cls.DEFAULT_SPI_BUS, + section="General", + ), + spi_device=config.ReadValue( + "CC1101SPIDevice", + return_type=int, + default=cls.DEFAULT_SPI_DEVICE, + section="General", + ), + output_power=config.ReadValue( + "CC1101OutputPower", + return_type=int, + default=cls.DEFAULT_OUTPUT_POWER, + section="General", + ), + transmit_settle_seconds=config.ReadValue( + "CC1101TransmitSettleSeconds", + return_type=float, + default=cls.DEFAULT_TRANSMIT_SETTLE_SECONDS, + section="General", + ), + ) + return cls( + frequency_mhz=getattr(config, "CC1101Frequency", cls.DEFAULT_FREQUENCY_MHZ), + spi_bus=getattr(config, "CC1101SPIBus", cls.DEFAULT_SPI_BUS), + spi_device=getattr(config, "CC1101SPIDevice", cls.DEFAULT_SPI_DEVICE), + output_power=getattr(config, "CC1101OutputPower", cls.DEFAULT_OUTPUT_POWER), + transmit_settle_seconds=getattr( + config, + "CC1101TransmitSettleSeconds", + cls.DEFAULT_TRANSMIT_SETTLE_SECONDS, + ), + ) + + @property + def frequency_hz(self): + return self.frequency_mhz * 1000000 + + @property + def output_power_table(self): + return (0, self.output_power) + + +class CC1101Transmitter: + def __init__(self, config, waveform_transmitter): + self.config = config + self.waveform_transmitter = waveform_transmitter + self.radio = cc1101.CC1101( + spi_bus=self.config.spi_bus, + spi_chip_select=self.config.spi_device, + lock_spi_device=True, + ) + + def transmit(self, frame, repetition): + if hasattr(self.waveform_transmitter, "set_idle_low"): + self.waveform_transmitter.set_idle_low() + with self.radio as radio: + radio.set_base_frequency_hertz(self.config.frequency_hz) + radio.set_symbol_rate_baud(self.config.symbol_rate_baud) + radio.set_output_power(self.config.output_power_table) + with radio.asynchronous_transmission(): + if self.config.transmit_settle_seconds > 0: + time.sleep(self.config.transmit_settle_seconds) + self.waveform_transmitter.transmit(frame, repetition) diff --git a/config.py b/config.py index 8973050..4dee540 100644 --- a/config.py +++ b/config.py @@ -126,6 +126,7 @@ def __init__(self, filename = None, section = None, log = None): self.UseHttps = False self.HTTPPort = 80 self.HTTPSPort = 443 + self.RFBackend = "raw_433" self.RTS_Address = "0x279620" self.MQTT_ClientID = "somfy-mqtt-bridge" self.Shutters = {} @@ -150,7 +151,19 @@ def __init__(self, filename = None, section = None, log = None): # -------------------- MyConfig::LoadConfig----------------------------------- def LoadConfig(self): - parameters = {'LogLocation': str, 'Latitude': float, 'Longitude': float, 'SendRepeat': int, 'UseHttps': bool, 'HTTPPort': int, 'HTTPSPort': int, 'TXGPIO': int, 'RTS_Address': str, "Password": str} + parameters = { + 'LogLocation': str, + 'Latitude': float, + 'Longitude': float, + 'SendRepeat': int, + 'UseHttps': bool, + 'HTTPPort': int, + 'HTTPSPort': int, + 'TXGPIO': int, + 'RFBackend': str, + 'RTS_Address': str, + "Password": str + } for key, type in parameters.items(): try: @@ -160,6 +173,8 @@ def LoadConfig(self): self.LogErrorLine("Missing config file or config file entries in Section General for key "+key+": " + str(e1)) return False + self.RFBackend = self.RFBackend.strip().lower() + parameters = {'MQTT_Server': str, 'MQTT_Port': int, 'MQTT_User': str, 'MQTT_Password': str, 'MQTT_ClientID': str, 'EnableDiscovery': bool} for key, type in parameters.items(): @@ -262,10 +277,14 @@ def ReadValue(self, Entry, return_type = str, default = None, section = None, No elif return_type == float: return self.config.getfloat(sect, Entry) elif return_type == int: - if self.config.get(sect, Entry) == 'None': + value = self.config.get(sect, Entry) + if value == 'None': return None else: - return self.config.getint(sect, Entry) + try: + return int(value, 0) + except ValueError: + return self.config.getint(sect, Entry) else: self.LogErrorLine("Error in MyConfig:ReadValue: invalid type:" + str(return_type)) return default diff --git a/defaultConfig.conf b/defaultConfig.conf index bb0599b..f510564 100644 --- a/defaultConfig.conf +++ b/defaultConfig.conf @@ -16,6 +16,7 @@ Longitude = 0 # Repeat each command a certain number of times. This is to ensure it works # if the remote is far away from the shutter and sometimes EMI prevents a # signal to go through +# E07-M1101D-SMA / CC1101 deployments that miss commands may need SendRepeat = 5. # This option only applies if a shutter is raised or lowered in full. If # a shutter is only raised or lowered for a given amount of seconds, this # option does not apply for obvious reasons. @@ -25,6 +26,19 @@ SendRepeat = 2 # emitter is connected to. The default value is 4 TXGPIO = 4 +# (Optional) RF backend used for transmission. Supported values: +# raw_433 - raw 433.42 MHz ASK/OOK transmitter driven from TXGPIO +# cc1101 - CC1101 modules such as the E07-M1101D-SMA, with GDO0 wired to TXGPIO +RFBackend = raw_433 + +# (Optional) CC1101 backend settings used only when RFBackend = cc1101. +# The E07-M1101D-SMA is a 3.3V SPI CC1101 module. Do not power it from 5V. +CC1101Frequency = 433.42 +CC1101SPIBus = 0 +CC1101SPIDevice = 0 +CC1101OutputPower = 0xC6 +CC1101TransmitSettleSeconds = 0.05 + # This parameter, if true will enable the use of HTTPS # (secure HTTP) in the Flask web app or user name and password # authentication, depending on the options below. This option is only @@ -130,5 +144,3 @@ EnableDiscovery = true # - shutterIds: Array of shutterIds to operate # [Scheduler] - - diff --git a/installService.sh b/installService.sh index bc755d2..b996aeb 100644 --- a/installService.sh +++ b/installService.sh @@ -1,8 +1,50 @@ #!/bin/bash +set -euo pipefail -cd /home/pi/Pi-Somfy/ -cp /home/pi/Pi-Somfy/shutters.service /etc/systemd/system/shutters.service +APP_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +CONFIG_FILE="${PI_SOMFY_CONFIG:-$APP_DIR/operateShutters.conf}" +SERVICE_NAME="${PI_SOMFY_SERVICE_NAME:-pi-somfy}" +SERVICE_FILE="/etc/systemd/system/${SERVICE_NAME}.service" +APP_ARGS="${PI_SOMFY_ARGS:--a}" -systemctl daemon-reload -systemctl enable shutters -systemctl start shutters +if [ -n "${PI_SOMFY_PYTHON:-}" ]; then + PYTHON_BIN="$PI_SOMFY_PYTHON" +elif [ -x "$APP_DIR/.venv/bin/python" ]; then + PYTHON_BIN="$APP_DIR/.venv/bin/python" +else + PYTHON_BIN="/usr/bin/python3" +fi + +if [ ! -f "$CONFIG_FILE" ]; then + cp "$APP_DIR/defaultConfig.conf" "$CONFIG_FILE" +fi + +if [ "$(id -u)" -eq 0 ]; then + SUDO=() +else + SUDO=(sudo) +fi + +# Default service unit: pi-somfy.service +"${SUDO[@]}" tee "$SERVICE_FILE" >/dev/null <> (7 - (i%8))) & 1): - wf.append(pigpio.pulse(0, 1<> (7 - (i%8))) & 1): - wf.append(pigpio.pulse(0, 1<> (7 - (i%8))) & 1): - pulses.append(lgpio.pulse(0, 1, 640)) - pulses.append(lgpio.pulse(1, 1, 640)) - else: - pulses.append(lgpio.pulse(1, 1, 640)) - pulses.append(lgpio.pulse(0, 1, 640)) - - pulses.append(lgpio.pulse(0, 1, 30415)) # interframe gap - - for j in range(1, repetition): # repeating frames - for i in range(7): # hardware synchronization - pulses.append(lgpio.pulse(1, 1, 2560)) - pulses.append(lgpio.pulse(0, 1, 2560)) - pulses.append(lgpio.pulse(1, 1, 4550)) # software synchronization - pulses.append(lgpio.pulse(0, 1, 640)) - - for i in range(0, 56): # manchester encoding of payload data - if ((self.frame[int(i/8)] >> (7 - (i%8))) & 1): - pulses.append(lgpio.pulse(0, 1, 640)) - pulses.append(lgpio.pulse(1, 1, 640)) - else: - pulses.append(lgpio.pulse(1, 1, 640)) - pulses.append(lgpio.pulse(0, 1, 640)) - - pulses.append(lgpio.pulse(0, 1, 30415)) # interframe gap - - lgpio.tx_wave(h, self.TXGPIO, pulses) - while lgpio.tx_busy(h, self.TXGPIO, lgpio.TX_WAVE): - time.sleep(0.001) - - lgpio.gpio_free(h, self.TXGPIO) - lgpio.gpiochip_close(h) - class operateShutters(MyLog): def __init__(self, args = None): diff --git a/raw_433_backend.py b/raw_433_backend.py new file mode 100644 index 0000000..aa1ffc3 --- /dev/null +++ b/raw_433_backend.py @@ -0,0 +1,191 @@ +#!/usr/bin/python3 + +import time + + +class Raw433Config: + DEFAULT_TXGPIO = 4 + + def __init__(self, tx_gpio=DEFAULT_TXGPIO): + if tx_gpio is None: + tx_gpio = self.DEFAULT_TXGPIO + self.tx_gpio = int(tx_gpio) + + @classmethod + def from_app_config(cls, config): + if hasattr(config, "ReadValue"): + return cls( + tx_gpio=config.ReadValue( + "TXGPIO", + return_type=int, + default=cls.DEFAULT_TXGPIO, + section="General", + ) + ) + return cls(tx_gpio=getattr(config, "TXGPIO", cls.DEFAULT_TXGPIO)) + + +class Raw433Transmitter: + def __init__( + self, + config, + is_pi5=False, + pigpio_module=None, + lgpio_module=None, + lgpio_chip=4, + ): + self.config = config + self.is_pi5 = is_pi5 + self.pigpio = pigpio_module + self.lgpio = lgpio_module + self.lgpio_chip = lgpio_chip + + def transmit(self, frame, repetition): + if self.is_pi5: + self._send_lgpio(frame, repetition) + else: + self._send_pigpio(frame, repetition) + + def set_idle_low(self): + if self.is_pi5: + self._set_idle_low_lgpio() + else: + self._set_idle_low_pigpio() + + def _load_pigpio(self): + if self.pigpio is not None: + return self.pigpio + import pigpio + return pigpio + + def _load_lgpio(self): + if self.lgpio is not None: + return self.lgpio + import lgpio + return lgpio + + def _send_pigpio(self, frame, repetition): + pigpio = self._load_pigpio() + pi = pigpio.pi() + + if not pi.connected: + exit() + + tx_gpio = self.config.tx_gpio + pi.wave_add_new() + pi.set_mode(tx_gpio, pigpio.OUTPUT) + pi.write(tx_gpio, 0) + + wf = [] + wf.append(pigpio.pulse(1 << tx_gpio, 0, 9415)) # wake up pulse + wf.append(pigpio.pulse(0, 1 << tx_gpio, 89565)) # silence + for i in range(2): # hardware synchronization + wf.append(pigpio.pulse(1 << tx_gpio, 0, 2560)) + wf.append(pigpio.pulse(0, 1 << tx_gpio, 2560)) + wf.append(pigpio.pulse(1 << tx_gpio, 0, 4550)) # software synchronization + wf.append(pigpio.pulse(0, 1 << tx_gpio, 640)) + + for i in range(0, 56): # manchester encoding of payload data + if ((frame[int(i / 8)] >> (7 - (i % 8))) & 1): + wf.append(pigpio.pulse(0, 1 << tx_gpio, 640)) + wf.append(pigpio.pulse(1 << tx_gpio, 0, 640)) + else: + wf.append(pigpio.pulse(1 << tx_gpio, 0, 640)) + wf.append(pigpio.pulse(0, 1 << tx_gpio, 640)) + + wf.append(pigpio.pulse(0, 1 << tx_gpio, 30415)) # interframe gap + + for j in range(1, repetition): # repeating frames + for i in range(7): # hardware synchronization + wf.append(pigpio.pulse(1 << tx_gpio, 0, 2560)) + wf.append(pigpio.pulse(0, 1 << tx_gpio, 2560)) + wf.append(pigpio.pulse(1 << tx_gpio, 0, 4550)) # software synchronization + wf.append(pigpio.pulse(0, 1 << tx_gpio, 640)) + + for i in range(0, 56): # manchester encoding of payload data + if ((frame[int(i / 8)] >> (7 - (i % 8))) & 1): + wf.append(pigpio.pulse(0, 1 << tx_gpio, 640)) + wf.append(pigpio.pulse(1 << tx_gpio, 0, 640)) + else: + wf.append(pigpio.pulse(1 << tx_gpio, 0, 640)) + wf.append(pigpio.pulse(0, 1 << tx_gpio, 640)) + + wf.append(pigpio.pulse(0, 1 << tx_gpio, 30415)) # interframe gap + + pi.wave_add_generic(wf) + wid = pi.wave_create() + pi.wave_send_once(wid) + while pi.wave_tx_busy(): + pass + pi.wave_delete(wid) + pi.stop() + + def _set_idle_low_pigpio(self): + pigpio = self._load_pigpio() + pi = pigpio.pi() + + if not pi.connected: + exit() + + tx_gpio = self.config.tx_gpio + pi.set_mode(tx_gpio, pigpio.OUTPUT) + pi.write(tx_gpio, 0) + pi.stop() + + def _send_lgpio(self, frame, repetition): + lgpio = self._load_lgpio() + tx_gpio = self.config.tx_gpio + h = lgpio.gpiochip_open(self.lgpio_chip) + lgpio.gpio_claim_output(h, tx_gpio, 0) + + pulses = [] + pulses.append(lgpio.pulse(1, 1, 9415)) # wake up pulse + pulses.append(lgpio.pulse(0, 1, 89565)) # silence + for i in range(2): # hardware synchronization + pulses.append(lgpio.pulse(1, 1, 2560)) + pulses.append(lgpio.pulse(0, 1, 2560)) + pulses.append(lgpio.pulse(1, 1, 4550)) # software synchronization + pulses.append(lgpio.pulse(0, 1, 640)) + + for i in range(0, 56): # manchester encoding of payload data + if ((frame[int(i / 8)] >> (7 - (i % 8))) & 1): + pulses.append(lgpio.pulse(0, 1, 640)) + pulses.append(lgpio.pulse(1, 1, 640)) + else: + pulses.append(lgpio.pulse(1, 1, 640)) + pulses.append(lgpio.pulse(0, 1, 640)) + + pulses.append(lgpio.pulse(0, 1, 30415)) # interframe gap + + for j in range(1, repetition): # repeating frames + for i in range(7): # hardware synchronization + pulses.append(lgpio.pulse(1, 1, 2560)) + pulses.append(lgpio.pulse(0, 1, 2560)) + pulses.append(lgpio.pulse(1, 1, 4550)) # software synchronization + pulses.append(lgpio.pulse(0, 1, 640)) + + for i in range(0, 56): # manchester encoding of payload data + if ((frame[int(i / 8)] >> (7 - (i % 8))) & 1): + pulses.append(lgpio.pulse(0, 1, 640)) + pulses.append(lgpio.pulse(1, 1, 640)) + else: + pulses.append(lgpio.pulse(1, 1, 640)) + pulses.append(lgpio.pulse(0, 1, 640)) + + pulses.append(lgpio.pulse(0, 1, 30415)) # interframe gap + + lgpio.tx_wave(h, tx_gpio, pulses) + while lgpio.tx_busy(h, tx_gpio, lgpio.TX_WAVE): + time.sleep(0.001) + + lgpio.gpio_free(h, tx_gpio) + lgpio.gpiochip_close(h) + + def _set_idle_low_lgpio(self): + lgpio = self._load_lgpio() + tx_gpio = self.config.tx_gpio + h = lgpio.gpiochip_open(self.lgpio_chip) + lgpio.gpio_claim_output(h, tx_gpio, 0) + lgpio.gpio_write(h, tx_gpio, 0) + lgpio.gpio_free(h, tx_gpio) + lgpio.gpiochip_close(h) diff --git a/requirements.txt b/requirements.txt index 8ecb407..01ad6fa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,6 @@ +# Raspberry Pi OS users should install GPIO/SPI bindings with apt first: +# sudo apt-get install python3-pigpio python3-lgpio python3-spidev +# Then install these requirements from a venv created with --system-site-packages. ephem configparser Flask @@ -5,3 +8,4 @@ paho-mqtt pigpio lgpio requests +cc1101 diff --git a/rf_backend.py b/rf_backend.py new file mode 100644 index 0000000..0357eaf --- /dev/null +++ b/rf_backend.py @@ -0,0 +1,38 @@ +#!/usr/bin/python3 + +from cc1101_backend import CC1101Config +from cc1101_backend import CC1101Transmitter +from raw_433_backend import Raw433Config +from raw_433_backend import Raw433Transmitter + + +def get_backend_name(config): + return getattr(config, "RFBackend", "raw_433").strip().lower() + + +def create_transmitter( + config, + is_pi5=False, + pigpio_module=None, + lgpio_module=None, + lgpio_chip=4, +): + backend_name = get_backend_name(config) + raw_433_transmitter = Raw433Transmitter( + Raw433Config.from_app_config(config), + is_pi5=is_pi5, + pigpio_module=pigpio_module, + lgpio_module=lgpio_module, + lgpio_chip=lgpio_chip, + ) + + if backend_name == "raw_433": + return raw_433_transmitter + if backend_name == "cc1101": + return CC1101Transmitter( + CC1101Config.from_app_config(config), + raw_433_transmitter, + ) + raise ValueError( + "Unsupported RFBackend: " + str(backend_name) + ". Use raw_433 or cc1101." + ) diff --git a/shutters.service b/shutters.service index 929c8db..0049523 100644 --- a/shutters.service +++ b/shutters.service @@ -1,15 +1,17 @@ - +# Example only. installService.sh generates an install-specific +# /etc/systemd/system/pi-somfy.service using the checkout path. [Unit] Description=Pi Somfy Shutter Service After=network-online.target +Wants=network-online.target [Service] -User=pi -ExecStart=sudo bash /home/pi/Pi-Somfy/start.sh +Type=simple +WorkingDirectory=/opt/Pi-Somfy Environment=PYTHONUNBUFFERED=1 +ExecStart=/opt/Pi-Somfy/.venv/bin/python /opt/Pi-Somfy/operateShutters.py -c /opt/Pi-Somfy/operateShutters.conf -a Restart=on-failure -Type=exec - +RestartSec=5 [Install] -WantedBy=multi-user.target \ No newline at end of file +WantedBy=multi-user.target diff --git a/start.sh b/start.sh index 14f2a97..2041667 100644 --- a/start.sh +++ b/start.sh @@ -1,3 +1,19 @@ #!/bin/bash +set -euo pipefail -sudo /usr/bin/python3 /home/pi/Pi-Somfy/operateShutters.py -c /home/pi/Pi-Somfy/operateShutters.conf -a -m -e & \ No newline at end of file +APP_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +CONFIG_FILE="${PI_SOMFY_CONFIG:-$APP_DIR/operateShutters.conf}" + +if [ -n "${PI_SOMFY_PYTHON:-}" ]; then + PYTHON_BIN="$PI_SOMFY_PYTHON" +elif [ -x "$APP_DIR/.venv/bin/python" ]; then + PYTHON_BIN="$APP_DIR/.venv/bin/python" +else + PYTHON_BIN="/usr/bin/python3" +fi + +if [ "$#" -eq 0 ]; then + set -- -a -m -e +fi + +exec "$PYTHON_BIN" "$APP_DIR/operateShutters.py" -c "$CONFIG_FILE" "$@" diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ + diff --git a/tests/test_cc1101_backend.py b/tests/test_cc1101_backend.py new file mode 100644 index 0000000..b2cbb7c --- /dev/null +++ b/tests/test_cc1101_backend.py @@ -0,0 +1,163 @@ +import contextlib +import inspect +import sys +import types +import unittest +from unittest import mock + +fake_cc1101 = types.ModuleType("cc1101") +sys.modules["cc1101"] = fake_cc1101 + +import cc1101_backend +from cc1101_backend import CC1101Config, CC1101Transmitter + + +class FakeRadio: + instances = [] + + def __init__(self, spi_bus, spi_chip_select, lock_spi_device): + self.spi_bus = spi_bus + self.spi_chip_select = spi_chip_select + self.lock_spi_device = lock_spi_device + self.calls = [] + FakeRadio.instances.append(self) + + def __enter__(self): + self.calls.append(("enter",)) + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.calls.append(("exit", exc_type)) + return False + + def set_base_frequency_hertz(self, frequency_hertz): + self.calls.append(("frequency", frequency_hertz)) + + def set_symbol_rate_baud(self, symbol_rate): + self.calls.append(("symbol_rate", symbol_rate)) + + def set_output_power(self, output_power): + self.calls.append(("output_power", tuple(output_power))) + + @contextlib.contextmanager + def asynchronous_transmission(self): + self.calls.append(("async_enter",)) + yield "GDO0" + self.calls.append(("async_exit",)) + + +class CC1101BackendTest(unittest.TestCase): + def setUp(self): + FakeRadio.instances = [] + fake_cc1101.CC1101 = FakeRadio + if hasattr(cc1101_backend, "cc1101"): + cc1101_backend.cc1101.CC1101 = FakeRadio + + def test_imports_cc1101_at_module_load_time(self): + self.assertIs(fake_cc1101, cc1101_backend.cc1101) + + def test_transmitter_uses_imported_cc1101_module(self): + self.assertNotIn( + "cc1101_module", + inspect.signature(CC1101Transmitter).parameters, + ) + + def test_config_is_derived_from_app_config(self): + app_config = types.SimpleNamespace( + CC1101Frequency=433.42, + CC1101SPIBus=0, + CC1101SPIDevice=1, + CC1101OutputPower=0xC6, + CC1101TransmitSettleSeconds=0.02, + ) + + config = CC1101Config.from_app_config(app_config) + + self.assertEqual(433.42e6, config.frequency_hz) + self.assertEqual(0, config.spi_bus) + self.assertEqual(1, config.spi_device) + self.assertEqual(0xC6, config.output_power) + self.assertEqual(0.02, config.transmit_settle_seconds) + self.assertEqual(1562.5, config.symbol_rate_baud) + + def test_config_reads_values_from_myconfig_interface(self): + class AppConfig: + def ReadValue(self, entry, return_type=str, default=None, section=None): + values = { + "CC1101Frequency": "433.42", + "CC1101SPIBus": "0", + "CC1101SPIDevice": "1", + "CC1101OutputPower": "0xC6", + "CC1101TransmitSettleSeconds": "0.02", + } + value = values.get(entry) + if value is None: + return default + if return_type == int: + return int(value, 0) + if return_type == float: + return float(value) + return value + + config = CC1101Config.from_app_config(AppConfig()) + + self.assertEqual(433.42e6, config.frequency_hz) + self.assertEqual(0, config.spi_bus) + self.assertEqual(1, config.spi_device) + self.assertEqual(0xC6, config.output_power) + self.assertEqual(0.02, config.transmit_settle_seconds) + + def test_transmitter_configures_radio_and_calls_waveform_callback(self): + config = CC1101Config( + frequency_mhz=433.42, + spi_bus=0, + spi_device=0, + output_power=0xC6, + ) + waveform_transmitter = types.SimpleNamespace( + set_idle_low=lambda: calls.append(("idle_low",)), + transmit=lambda frame, repetition: calls.append(("waveform", frame, repetition)), + ) + frame = bytearray([0] * 7) + calls = [] + + transmitter = CC1101Transmitter(config, waveform_transmitter) + with mock.patch("cc1101_backend.time.sleep") as sleep_mock: + transmitter.transmit(frame, 3) + + self.assertEqual([("idle_low",), ("waveform", frame, 3)], calls) + sleep_mock.assert_called_once_with(0.05) + self.assertEqual(1, len(FakeRadio.instances)) + radio = FakeRadio.instances[0] + self.assertEqual(0, radio.spi_bus) + self.assertEqual(0, radio.spi_chip_select) + self.assertTrue(radio.lock_spi_device) + self.assertEqual( + [ + ("enter",), + ("frequency", 433.42e6), + ("symbol_rate", 1562.5), + ("output_power", (0, 0xC6)), + ("async_enter",), + ("async_exit",), + ("exit", None), + ], + radio.calls, + ) + + def test_transmitter_reuses_radio_object_between_transmits(self): + config = CC1101Config() + waveform_transmitter = types.SimpleNamespace(transmit=lambda frame, repetition: calls.append(("waveform", repetition))) + frame = bytearray([0] * 7) + calls = [] + + transmitter = CC1101Transmitter(config, waveform_transmitter) + transmitter.transmit(frame, 1) + transmitter.transmit(frame, 2) + + self.assertEqual([("waveform", 1), ("waveform", 2)], calls) + self.assertEqual(1, len(FakeRadio.instances)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_raw_433_backend.py b/tests/test_raw_433_backend.py new file mode 100644 index 0000000..4c9d27f --- /dev/null +++ b/tests/test_raw_433_backend.py @@ -0,0 +1,185 @@ +import unittest + +from raw_433_backend import Raw433Config, Raw433Transmitter + + +class FakePi: + instances = [] + + def __init__(self): + self.connected = True + self.calls = [] + FakePi.instances.append(self) + + def wave_add_new(self): + self.calls.append(("wave_add_new",)) + + def set_mode(self, gpio, mode): + self.calls.append(("set_mode", gpio, mode)) + + def write(self, gpio, level): + self.calls.append(("write", gpio, level)) + + def wave_add_generic(self, waveform): + self.calls.append(("wave_add_generic", waveform)) + + def wave_create(self): + self.calls.append(("wave_create",)) + return 7 + + def wave_send_once(self, wave_id): + self.calls.append(("wave_send_once", wave_id)) + + def wave_tx_busy(self): + self.calls.append(("wave_tx_busy",)) + return False + + def wave_delete(self, wave_id): + self.calls.append(("wave_delete", wave_id)) + + def stop(self): + self.calls.append(("stop",)) + + +class FakePigpio: + OUTPUT = "output" + + def pi(self): + return FakePi() + + def pulse(self, gpio_on, gpio_off, delay): + return ("pulse", gpio_on, gpio_off, delay) + + +class FakeLgpio: + TX_WAVE = "tx_wave" + + def __init__(self): + self.calls = [] + + def gpiochip_open(self, chip): + self.calls.append(("gpiochip_open", chip)) + return "handle" + + def gpio_claim_output(self, handle, gpio, level=0): + self.calls.append(("gpio_claim_output", handle, gpio, level)) + + def gpio_write(self, handle, gpio, level): + self.calls.append(("gpio_write", handle, gpio, level)) + + + def pulse(self, level, mask, delay): + return ("pulse", level, mask, delay) + + def tx_wave(self, handle, gpio, pulses): + self.calls.append(("tx_wave", handle, gpio, pulses)) + + def tx_busy(self, handle, gpio, tx_type): + self.calls.append(("tx_busy", handle, gpio, tx_type)) + return False + + def gpio_free(self, handle, gpio): + self.calls.append(("gpio_free", handle, gpio)) + + def gpiochip_close(self, handle): + self.calls.append(("gpiochip_close", handle)) + + +class Raw433BackendTest(unittest.TestCase): + def setUp(self): + FakePi.instances = [] + + def test_config_reads_tx_gpio_from_app_config(self): + class AppConfig: + def ReadValue(self, entry, return_type=str, default=None, section=None): + if entry == "TXGPIO": + return 17 + return default + + config = Raw433Config.from_app_config(AppConfig()) + + self.assertEqual(17, config.tx_gpio) + + def test_config_uses_default_gpio_when_config_value_is_none(self): + config = Raw433Config(tx_gpio=None) + + self.assertEqual(4, config.tx_gpio) + + def test_pigpio_transmitter_sends_waveform(self): + transmitter = Raw433Transmitter( + Raw433Config(tx_gpio=4), + is_pi5=False, + pigpio_module=FakePigpio(), + ) + frame = bytearray([0] * 7) + + transmitter.transmit(frame, 1) + + fake_pi = FakePi.instances[0] + self.assertIn(("set_mode", 4, "output"), fake_pi.calls) + self.assertIn(("write", 4, 0), fake_pi.calls) + wave_call = [call for call in fake_pi.calls if call[0] == "wave_add_generic"][0] + waveform = wave_call[1] + self.assertEqual(("pulse", 1 << 4, 0, 9415), waveform[0]) + self.assertIn(("wave_send_once", 7), fake_pi.calls) + self.assertIn(("stop",), fake_pi.calls) + + def test_pigpio_transmitter_can_set_idle_low(self): + transmitter = Raw433Transmitter( + Raw433Config(tx_gpio=4), + is_pi5=False, + pigpio_module=FakePigpio(), + ) + + transmitter.set_idle_low() + + fake_pi = FakePi.instances[0] + self.assertEqual( + [("set_mode", 4, "output"), ("write", 4, 0), ("stop",)], + fake_pi.calls, + ) + + def test_lgpio_transmitter_sends_waveform(self): + fake_lgpio = FakeLgpio() + transmitter = Raw433Transmitter( + Raw433Config(tx_gpio=4), + is_pi5=True, + lgpio_module=fake_lgpio, + lgpio_chip=4, + ) + frame = bytearray([0] * 7) + + transmitter.transmit(frame, 1) + + self.assertEqual(("gpiochip_open", 4), fake_lgpio.calls[0]) + self.assertEqual(("gpio_claim_output", "handle", 4, 0), fake_lgpio.calls[1]) + tx_wave_call = [call for call in fake_lgpio.calls if call[0] == "tx_wave"][0] + self.assertEqual("handle", tx_wave_call[1]) + self.assertEqual(4, tx_wave_call[2]) + self.assertEqual(("pulse", 1, 1, 9415), tx_wave_call[3][0]) + + def test_lgpio_transmitter_can_set_idle_low(self): + fake_lgpio = FakeLgpio() + transmitter = Raw433Transmitter( + Raw433Config(tx_gpio=4), + is_pi5=True, + lgpio_module=fake_lgpio, + lgpio_chip=4, + ) + + transmitter.set_idle_low() + + self.assertEqual( + [ + ("gpiochip_open", 4), + ("gpio_claim_output", "handle", 4, 0), + ("gpio_write", "handle", 4, 0), + ("gpio_free", "handle", 4), + ("gpiochip_close", "handle"), + ], + fake_lgpio.calls, + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_rf_backend.py b/tests/test_rf_backend.py new file mode 100644 index 0000000..ddee6c2 --- /dev/null +++ b/tests/test_rf_backend.py @@ -0,0 +1,163 @@ +import contextlib +import inspect +import importlib +import json +import sys +import types +import unittest +from unittest import mock + + +def _install_import_stubs(): + sys.modules.setdefault("ephem", types.ModuleType("ephem")) + + pigpio = types.ModuleType("pigpio") + pigpio.OUTPUT = 0 + pigpio.pulse = lambda *args, **kwargs: (args, kwargs) + pigpio.pi = lambda: types.SimpleNamespace(connected=True) + sys.modules.setdefault("pigpio", pigpio) + + lgpio = types.ModuleType("lgpio") + lgpio.TX_WAVE = 1 + sys.modules.setdefault("lgpio", lgpio) + + cc1101 = types.ModuleType("cc1101") + sys.modules.setdefault("cc1101", cc1101) + + flask = types.ModuleType("flask") + flask.Flask = object + flask.render_template = lambda *args, **kwargs: "" + flask.request = types.SimpleNamespace(url="", method="GET", values={}, headers={}) + flask.Response = lambda *args, **kwargs: None + flask.jsonify = lambda *args, **kwargs: None + flask.json = json + sys.modules.setdefault("flask", flask) + + +_install_import_stubs() +cc1101_backend = importlib.import_module("cc1101_backend") +operateShutters = importlib.import_module("operateShutters") + + +class FakeConfig: + TXGPIO = 4 + RFBackend = "raw_433" + CC1101Frequency = 433.42 + CC1101SPIBus = 0 + CC1101SPIDevice = 0 + CC1101OutputPower = 0xC6 + + def __init__(self): + self.Shutters = {"279620": {"name": "Test", "code": 1}} + + def setCode(self, shutter_id, code): + self.Shutters[shutter_id]["code"] = code + + +class FakeRadio: + instances = [] + + def __init__(self, spi_bus, spi_chip_select, lock_spi_device): + self.spi_bus = spi_bus + self.spi_chip_select = spi_chip_select + self.lock_spi_device = lock_spi_device + self.calls = [] + FakeRadio.instances.append(self) + + def __enter__(self): + self.calls.append(("enter",)) + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.calls.append(("exit", exc_type)) + return False + + def set_base_frequency_hertz(self, frequency_hertz): + self.calls.append(("frequency", frequency_hertz)) + + def set_symbol_rate_baud(self, symbol_rate): + self.calls.append(("symbol_rate", symbol_rate)) + + def set_output_power(self, output_power): + self.calls.append(("output_power", tuple(output_power))) + + @contextlib.contextmanager + def asynchronous_transmission(self): + self.calls.append(("async_enter",)) + yield "GDO0" + self.calls.append(("async_exit",)) + + +class BackendDispatchTest(unittest.TestCase): + def setUp(self): + FakeRadio.instances = [] + sys.modules["cc1101"].CC1101 = FakeRadio + cc1101_backend.cc1101.CC1101 = FakeRadio + + def test_shutter_uses_imported_cc1101_module(self): + self.assertNotIn( + "cc1101_module", + inspect.signature(operateShutters.Shutter).parameters, + ) + + def test_raw_433_backend_uses_existing_waveform_path(self): + config = FakeConfig() + config.RFBackend = "raw_433" + transmitter = mock.Mock() + shutter = operateShutters.Shutter(config=config, rf_transmitter=transmitter) + + shutter.sendCommand("279620", shutter.buttonUp, 2) + + transmitter.transmit.assert_called_once_with(shutter.frame, 2) + self.assertEqual(2, config.Shutters["279620"]["code"]) + + def test_cc1101_backend_configures_radio_and_reuses_waveform(self): + config = FakeConfig() + config.RFBackend = "cc1101" + config.CC1101Frequency = 433.42 + config.CC1101SPIBus = 0 + config.CC1101SPIDevice = 0 + config.CC1101OutputPower = 0xC6 + shutter = operateShutters.Shutter(config=config) + shutter.rf_transmitter.waveform_transmitter.set_idle_low = mock.Mock() + shutter.rf_transmitter.waveform_transmitter.transmit = mock.Mock() + + shutter.sendCommand("279620", shutter.buttonDown, 3) + + self.assertEqual(1, len(FakeRadio.instances)) + radio = FakeRadio.instances[0] + self.assertEqual(0, radio.spi_bus) + self.assertEqual(0, radio.spi_chip_select) + self.assertTrue(radio.lock_spi_device) + self.assertIn(("frequency", 433.42e6), radio.calls) + self.assertIn(("symbol_rate", 1562.5), radio.calls) + self.assertIn(("output_power", (0, 0xC6)), radio.calls) + self.assertLess(radio.calls.index(("async_enter",)), radio.calls.index(("async_exit",))) + shutter.rf_transmitter.waveform_transmitter.set_idle_low.assert_called_once_with() + shutter.rf_transmitter.waveform_transmitter.transmit.assert_called_once_with(shutter.frame, 3) + self.assertEqual(2, config.Shutters["279620"]["code"]) + + def test_cc1101_backend_reuses_transmitter_between_commands(self): + config = FakeConfig() + config.RFBackend = "cc1101" + shutter = operateShutters.Shutter(config=config) + + shutter.rf_transmitter.waveform_transmitter.set_idle_low = mock.Mock() + shutter.rf_transmitter.waveform_transmitter.transmit = mock.Mock() + shutter.sendCommand("279620", shutter.buttonUp, 1) + shutter.sendCommand("279620", shutter.buttonStop, 1) + + self.assertEqual(1, len(FakeRadio.instances)) + self.assertEqual( + [mock.call(), mock.call()], + shutter.rf_transmitter.waveform_transmitter.set_idle_low.call_args_list, + ) + self.assertEqual( + [mock.call(shutter.frame, 1), mock.call(shutter.frame, 1)], + shutter.rf_transmitter.waveform_transmitter.transmit.call_args_list, + ) + self.assertEqual(3, config.Shutters["279620"]["code"]) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_rf_backend_factory.py b/tests/test_rf_backend_factory.py new file mode 100644 index 0000000..beb548b --- /dev/null +++ b/tests/test_rf_backend_factory.py @@ -0,0 +1,79 @@ +import inspect +import sys +import types +import unittest + +fake_cc1101 = types.ModuleType("cc1101") +sys.modules["cc1101"] = fake_cc1101 + +from cc1101_backend import CC1101Transmitter +import cc1101_backend +from raw_433_backend import Raw433Transmitter +from rf_backend import create_transmitter + + +class FakeRadio: + def __init__(self, spi_bus, spi_chip_select, lock_spi_device): + pass + + +class FakePigpio: + OUTPUT = "output" + + +class BackendFactoryTest(unittest.TestCase): + def setUp(self): + fake_cc1101.CC1101 = FakeRadio + cc1101_backend.cc1101.CC1101 = FakeRadio + + def test_factory_uses_imported_cc1101_module(self): + self.assertNotIn( + "cc1101_module", + inspect.signature(create_transmitter).parameters, + ) + + def test_creates_raw_433_transmitter_by_default(self): + config = types.SimpleNamespace(RFBackend="raw_433", TXGPIO=4) + + transmitter = create_transmitter( + config, + is_pi5=False, + pigpio_module=FakePigpio(), + ) + + self.assertIsInstance(transmitter, Raw433Transmitter) + + def test_rejects_gpio_backend_alias(self): + config = types.SimpleNamespace(RFBackend="gpio", TXGPIO=4) + + with self.assertRaisesRegex(ValueError, "raw_433 or cc1101"): + create_transmitter(config, is_pi5=False, pigpio_module=FakePigpio()) + + def test_creates_cc1101_transmitter_wrapping_raw_433_transmitter(self): + config = types.SimpleNamespace( + RFBackend="cc1101", + TXGPIO=4, + CC1101Frequency=433.42, + CC1101SPIBus=0, + CC1101SPIDevice=0, + CC1101OutputPower=0xC6, + ) + + transmitter = create_transmitter( + config, + is_pi5=False, + pigpio_module=FakePigpio(), + ) + + self.assertIsInstance(transmitter, CC1101Transmitter) + self.assertIsInstance(transmitter.waveform_transmitter, Raw433Transmitter) + + def test_rejects_unknown_backend(self): + config = types.SimpleNamespace(RFBackend="other", TXGPIO=4) + + with self.assertRaisesRegex(ValueError, "Unsupported RFBackend"): + create_transmitter(config, is_pi5=False, pigpio_module=FakePigpio()) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_rf_config.py b/tests/test_rf_config.py new file mode 100644 index 0000000..a03b237 --- /dev/null +++ b/tests/test_rf_config.py @@ -0,0 +1,85 @@ +import os +import tempfile +import textwrap +import unittest + +from config import MyConfig + + +class RFConfigTest(unittest.TestCase): + def _load_config(self, content): + fd, path = tempfile.mkstemp(prefix="pi-somfy-test-", suffix=".conf") + try: + with os.fdopen(fd, "w") as config_file: + config_file.write(textwrap.dedent(content)) + + config = MyConfig(filename=path) + self.assertTrue(config.LoadConfig()) + return config + finally: + os.unlink(path) + + def test_defaults_to_raw_433_backend_for_existing_configs(self): + config = self._load_config( + """ + [General] + LogLocation = /tmp/ + Latitude = 51.4769 + Longitude = 0 + SendRepeat = 2 + TXGPIO = 4 + UseHttps = False + HTTPPort = 80 + HTTPSPort = 443 + RTS_Address = 0x279620 + + [MQTT] + [Shutters] + [ShutterRollingCodes] + [ShutterIntermediatePositions] + [Scheduler] + """ + ) + + self.assertEqual("raw_433", config.RFBackend) + self.assertFalse(hasattr(config, "CC1101Frequency")) + self.assertFalse(hasattr(config, "CC1101SPIBus")) + self.assertFalse(hasattr(config, "CC1101SPIDevice")) + self.assertFalse(hasattr(config, "CC1101OutputPower")) + + def test_parses_cc1101_backend_options(self): + config = self._load_config( + """ + [General] + LogLocation = /tmp/ + Latitude = 51.4769 + Longitude = 0 + SendRepeat = 2 + TXGPIO = 4 + RFBackend = cc1101 + CC1101Frequency = 433.42 + CC1101SPIBus = 0 + CC1101SPIDevice = 0 + CC1101OutputPower = 0xC6 + UseHttps = False + HTTPPort = 80 + HTTPSPort = 443 + RTS_Address = 0x279620 + + [MQTT] + [Shutters] + [ShutterRollingCodes] + [ShutterIntermediatePositions] + [Scheduler] + """ + ) + + self.assertEqual("cc1101", config.RFBackend) + self.assertFalse(hasattr(config, "CC1101Frequency")) + self.assertFalse(hasattr(config, "CC1101SPIBus")) + self.assertFalse(hasattr(config, "CC1101SPIDevice")) + self.assertFalse(hasattr(config, "CC1101OutputPower")) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_service_scripts.py b/tests/test_service_scripts.py new file mode 100644 index 0000000..5f1ae48 --- /dev/null +++ b/tests/test_service_scripts.py @@ -0,0 +1,32 @@ +from pathlib import Path +import re +import unittest + + +ROOT = Path(__file__).resolve().parents[1] + + +class ServiceScriptTest(unittest.TestCase): + def test_service_helpers_do_not_assume_pi_home_directory(self): + for relative_path in ("installService.sh", "start.sh", "shutters.service"): + content = (ROOT / relative_path).read_text() + self.assertNotIn("/home/pi", content) + self.assertNotIn("User=pi", content) + + def test_start_script_runs_foreground_python_process(self): + content = (ROOT / "start.sh").read_text() + + self.assertIn("exec", content) + self.assertNotIn("sudo /usr/bin/python3", content) + self.assertIsNone(re.search(r"\s&\s*(?:#.*)?$", content, re.MULTILINE)) + + def test_installer_generates_pi_somfy_systemd_service(self): + content = (ROOT / "installService.sh").read_text() + + self.assertIn("BASH_SOURCE", content) + self.assertIn("pi-somfy.service", content) + self.assertIn("operateShutters.conf", content) + + +if __name__ == "__main__": + unittest.main()