Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 60 additions & 10 deletions findmy/scanner/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any

import bleak
from bleak import BleakScanner
from typing_extensions import override

Expand Down Expand Up @@ -37,7 +38,7 @@
def _print_scanning_results(seen_devices: dict[str, list[OfflineFindingDevice]]) -> None:
"""Print summary of each device seen."""
# ruff: noqa: T201
print("================ RESULTS =========================")
print("================ RESULTS ====================")
for mac, devices in seen_devices.items():
avg_rssi = sum(d.rssi for d in devices if d.rssi is not None) / len(
[d for d in devices if d.rssi is not None]
Expand Down Expand Up @@ -69,20 +70,24 @@ class OfflineFindingDevice(ABC):
OF_HEADER_SIZE = 2
OF_TYPE = 0x12

def __init__(
def __init__( # noqa: PLR0913
self,
mac_bytes: bytes,
status_byte: int,
detected_at: datetime,
rssi: int | None = None,
*,
additional_data: dict[Any, Any] | None = None,
device: BLEDevice | None = None,
) -> None:
"""Instantiate an OfflineFindingDevice."""
self._mac_bytes: bytes = mac_bytes
self._status: int = status_byte
self._detected_at: datetime = detected_at
self._rssi: int | None = rssi

self._additional_data: dict[Any, Any] = additional_data or {}
self._device = device

@property
def mac_address(self) -> str:
Expand Down Expand Up @@ -134,25 +139,29 @@ def print(self, file: SupportsWrite[str] | None = None) -> None:

@classmethod
@abstractmethod
def from_payload(
def from_payload( # noqa: PLR0913
cls,
mac_address: str,
payload: bytes,
detected_at: datetime,
rssi: int | None = None,
*,
additional_data: dict[Any, Any] | None = None,
device: BLEDevice | None = None,
) -> OfflineFindingDevice | None:
"""Get a NearbyOfflineFindingDevice object from an OF message payload."""
raise NotImplementedError

@classmethod
def from_ble_payload(
def from_ble_payload( # noqa: PLR0913
cls,
mac_address: str,
ble_payload: bytes,
detected_at: datetime | None = None,
rssi: int | None = None,
*,
additional_data: dict[Any, Any] | None = None,
device: BLEDevice | None = None,
) -> OfflineFindingDevice | None:
"""Get a NearbyOfflineFindingDevice object from a BLE packet payload."""
if len(ble_payload) < cls.OF_HEADER_SIZE:
Expand Down Expand Up @@ -180,9 +189,25 @@ def from_ble_payload(
ble_payload[cls.OF_HEADER_SIZE :],
detected_at or datetime.now().astimezone(),
rssi,
additional_data,
additional_data=additional_data,
device=device,
)

async def play_sound(self) -> None:
"""Make the device play a sound."""
if self._device is None:
logger.warning(
"Device instance does not originate from scanner. Communication with the device "
"will initiate a BLE scan first, which might fail."
)

async with bleak.BleakClient(self._device or self.mac_address) as client:
await client.write_gatt_char(
"4F860003-943B-49EF-BED4-2F730304427A",
bytearray([1, 0, 3]),
response=True,
)

@override
def __eq__(self, other: object) -> bool:
if isinstance(other, OfflineFindingDevice):
Expand All @@ -207,10 +232,19 @@ def __init__( # noqa: PLR0913
first_adv_key_bytes: bytes,
detected_at: datetime,
rssi: int | None = None,
*,
additional_data: dict[Any, Any] | None = None,
device: BLEDevice | None = None,
) -> None:
"""Instantiate a NearbyOfflineFindingDevice."""
super().__init__(mac_bytes, status_byte, detected_at, rssi, additional_data)
super().__init__(
mac_bytes,
status_byte,
detected_at,
rssi,
additional_data=additional_data,
device=device,
)
# When nearby, only the first 6 bytes of the public key are transmitted
self._first_adv_key_bytes: bytes = first_adv_key_bytes

Expand Down Expand Up @@ -253,7 +287,9 @@ def from_payload(
payload: bytes,
detected_at: datetime,
rssi: int | None = None,
*,
additional_data: dict[Any, Any] | None = None,
device: BLEDevice | None = None,
) -> NearbyOfflineFindingDevice | None:
"""Get a NearbyOfflineFindingDevice object from an OF message payload."""
if len(payload) != cls.OF_PAYLOAD_LEN:
Expand All @@ -278,7 +314,8 @@ def from_payload(
partial_pubkey,
detected_at,
rssi,
additional_data,
additional_data=additional_data,
device=device,
)

@override
Expand Down Expand Up @@ -308,10 +345,19 @@ def __init__( # noqa: PLR0913
hint: int,
detected_at: datetime,
rssi: int | None = None,
*,
additional_data: dict[Any, Any] | None = None,
device: BLEDevice | None = None,
) -> None:
"""Initialize a :meth:`SeparatedOfflineFindingDevice`."""
super().__init__(mac_bytes, status, detected_at, rssi, additional_data)
super().__init__(
mac_bytes,
status,
detected_at,
rssi,
additional_data=additional_data,
device=device,
)
self._public_key: bytes = public_key
self._hint: int = hint

Expand Down Expand Up @@ -360,7 +406,9 @@ def from_payload(
payload: bytes,
detected_at: datetime,
rssi: int | None = None,
*,
additional_data: dict[Any, Any] | None = None,
device: BLEDevice | None = None,
) -> SeparatedOfflineFindingDevice | None:
"""Get a SeparatedOfflineFindingDevice object from an OF message payload."""
if len(payload) != cls.OF_PAYLOAD_LEN:
Expand Down Expand Up @@ -391,7 +439,8 @@ def from_payload(
hint,
detected_at,
rssi,
additional_data,
additional_data=additional_data,
device=device,
)

@override
Expand Down Expand Up @@ -499,7 +548,8 @@ async def _wait_for_device(self, timeout: float) -> OfflineFindingDevice | None:
apple_data,
detected_at,
rssi,
additional_data,
additional_data=additional_data,
device=device,
)

async def scan_for(
Expand Down