Source code for switchbot_api.switchbot_scanner

'''
Python-Switchbot-BLE: A Python library for interfacing with Switchbot devices over Bluetooth Low Energy (BLE)
Copyright (C) 2023  Benjamin Carlson
'''


from bleak import BleakScanner, BLEDevice, AdvertisementData
from typing import Tuple, Optional, List
import asyncio
import platform

from .switchbot import VirtualSwitchBot


[docs]class SwitchBotScanner: # Service UUID UNKNOWN_SERVICE_DATA_UUID = "00000d00-0000-1000-8000-00805f9b34fb" # Manufacturer ID for Nordic Semiconductors NORDIC_MANUFACTURER_ID = 0x59 def __init__(self, bot_count : int = 1) -> None: self._bot_count = bot_count self._found_mac_addrs : List[str] = [] def _filter_device_adv( self, device: BLEDevice, adv: AdvertisementData ) -> Tuple[bool, Optional[bytearray]]: """ Determines if device is a SwitchBot or not :param device: Device object :type device: BLEDevice :param adv: Device advertisement data :type adv: AdvertisementData :return: Whether the device is a SwitchBot and if so, the associated service data :rtype: Tuple[bool, Optional[bytes]] """ man_data = adv.manufacturer_data.get(self.NORDIC_MANUFACTURER_ID) if man_data is None: return False, None service_data = adv.service_data.get(self.UNKNOWN_SERVICE_DATA_UUID) if service_data is None: return False, None mac_addr = device.address # macOS does not provide the MAC address, only a "UUID", so we can't check it if platform.system() != "Darwin": # I am not sure if this is only for SwitchBot or is a general pattern mac_bytes = bytearray(int(x, base=16) for x in mac_addr.split(":")) if mac_bytes != man_data: return False, None print(f"Found SwitchBot: {device.name} ({mac_addr})") print(f" - RSSI: {adv.rssi}") return True, bytearray(service_data) def __aiter__(self): return self async def __anext__(self): """ Scans for the next SwitchBot until the desired number of SwitchBots are found :return: The VirtualSwitchBot found :rtype: AsyncIterator[VirtualSwitchBot] """ async with BleakScanner() as scanner: bots_found = 0 while True: if bots_found >= self._bot_count: print(f"Found {bots_found} SwitchBots, stopping scanner...") raise StopAsyncIteration await asyncio.sleep(1.0) data = scanner.discovered_devices_and_advertisement_data for _, (dis_device, dis_advertisement) in data.items(): bot_address = dis_device.address # Ignore if we already found this device if bot_address in self._found_mac_addrs: continue is_switchbot, dev_service_data = self._filter_device_adv( dis_device, dis_advertisement ) if is_switchbot: self._found_mac_addrs.append(bot_address) bots_found += 1 switch_bot = VirtualSwitchBot(bot_address) switch_bot.info.read_service_bytes(dev_service_data) return switch_bot # Alias for easy use next_bot = __anext__