Skip to content

ares_lora.ImageManagerException

Bases: Exception

Base class for DFU exceptions

Source code in serial-driver/src/ares_lora/dfu.py
class ImageManagerException(Exception):
    """Root exception type for DFU (device firmware update) failures."""

ares_lora.AresImageStates dataclass

Data class for image states.

Attributes:

Name Type Description
slot int

The image slot.

version str

The version of the image.

hash bytes

The hash of the image.

bootable bool

Flag indicating if the image is bootable.

pending bool

Flag indicating image is pending.

confirmed bool

Flag indicating image has been confirmed.

active bool

Flag indicating image is active.

permanent bool

Flag indicating the image is permanent.

Source code in serial-driver/src/ares_lora/dfu.py
@dataclass
class AresImageStates:
    """Snapshot of one firmware image and its state flags.

    Attributes:
        slot: Slot the image occupies.
        version: Version string of the image.
        hash: Hash of the image.
        bootable: Whether the image is bootable.
        pending: Whether the image is pending.
        confirmed: Whether the image has been confirmed.
        active: Whether the image is active.
        permanent: Whether the image is permanent.
    """
    slot: int = 0
    version: str = ''
    hash: bytes = b''
    bootable: bool = False
    pending: bool = False
    confirmed: bool = False
    active: bool = False
    permanent: bool = False

    @classmethod
    def from_smp_lib(cls, states: ImageState) -> 'AresImageStates':
        """Build an `AresImageStates` by copying the fields of an `ImageState`.

        Args:
            states: The `ImageState` object to copy from.

        Returns:
            A new `AresImageStates` carrying the same field values.
        """
        # Attributes read from `states`, one per dataclass field, in declaration order.
        copied = ('slot', 'version', 'hash', 'bootable', 'pending', 'confirmed', 'active', 'permanent')
        return cls(**{name: getattr(states, name) for name in copied})

ares_lora.AresImageStates.from_smp_lib classmethod

from_smp_lib(states: ImageState) -> AresImageStates

Construct an AresImageStates object from an ImageState object.

Parameters:

Name Type Description Default
states ImageState

The ImageState object.

required

Returns:

Type Description
AresImageStates

A constructed AresImageStates object.

Source code in serial-driver/src/ares_lora/dfu.py
@classmethod
def from_smp_lib(cls, states: ImageState) -> 'AresImageStates':
    """Build an `AresImageStates` by copying the fields of an `ImageState`.

    Args:
        states: The `ImageState` object to copy from.

    Returns:
        A new `AresImageStates` carrying the same field values.
    """
    # Attributes read from `states`, passed to the constructor as same-named keywords.
    copied = ('slot', 'version', 'hash', 'bootable', 'pending', 'confirmed', 'active', 'permanent')
    return cls(**{name: getattr(states, name) for name in copied})

ares_lora.AresUploadStatusBase

Bases: ABC

Base class for upload status CLI utilities. This must be subclassed.

Source code in serial-driver/src/ares_lora/dfu.py
class AresUploadStatusBase(ABC):
    """Abstract upload-progress reporter for CLI utilities; concrete reporters subclass it.

    Instances can be used as context managers: entering yields the instance itself and
    exiting does nothing.
    """

    def __init__(self, image_len: int):
        """Store the total image size.

        Args:
            image_len: The length of the image in bytes.
        """
        self._len = image_len

    def __enter__(self):
        # The reporter itself is the context object.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing to release; returning None lets any in-flight exception propagate.
        return None

    @abstractmethod
    def update(self, offset: int) -> None:
        """Report upload progress. Subclasses must override this method.

        Args:
            offset: The current offset for image writing.
        """

ares_lora.AresUploadStatusBase.__init__

__init__(image_len: int)

Initializes the base class.

Parameters:

Name Type Description Default
image_len int

The length of the image in bytes.

required
Source code in serial-driver/src/ares_lora/dfu.py
def __init__(self, image_len: int):
    """Store the total image size on the instance.

    Args:
        image_len: The length of the image in bytes.
    """
    # Kept on a private attribute so subclasses can read it when reporting progress.
    self._len = image_len

ares_lora.AresUploadStatusBase.update abstractmethod

update(offset: int) -> None

Update method of the base class. This method must be overridden.

Parameters:

Name Type Description Default
offset int

The current offset for image writing.

required
Source code in serial-driver/src/ares_lora/dfu.py
@abstractmethod
def update(self, offset: int) -> None:
    """Report upload progress. Subclasses must override this method.

    Args:
        offset: The current offset for image writing.
    """

ares_lora.AresDfu

Device firmware update manager for Ares LoRa.

Source code in serial-driver/src/ares_lora/dfu.py
class AresDfu:
    """Device firmware update (DFU) manager for Ares LoRa.

    The public methods are synchronous wrappers: each one runs a private coroutine to completion
    with `asyncio.run`. Every coroutine opens its own `SMPClient` over a serial transport on the
    configured port for the duration of its request(s).
    """

    def __init__(self, port: str, verbose: bool = False,
                 upload_status_cls: Type[AresUploadStatusBase] = AresUploadStatus):
        """Initializes AresDfu.

        Args:
            port: The port that supports the DFU protocol.
            verbose: Run DFU in verbose mode (uploads report progress through `upload_status_cls`).
            upload_status_cls: The upload status class.
        """
        self._port = port
        self._verbose = verbose
        self._upload_status_cls = upload_status_cls

    async def _read_image_states(self, timeout: float = 2.0) -> list[ImageState]:
        """Request the image states over a fresh SMP connection.

        Args:
            timeout: Timeout of the request in seconds.

        Returns:
            The `images` attribute of a successful response.

        Raises:
            ImageManagerException: The response is an error or is not recognised.
        """
        async with SMPClient(SMPSerialTransport(), self._port) as client:
            response = await client.request(ImageStatesRead(), timeout_s=timeout)
            if success(response):
                return response.images
            if error(response):
                raise ImageManagerException(f"Received error: {response}")
            raise ImageManagerException(f"Unknown response: {response}")

    def read_image_states(self, timeout: float = 2.0) -> tuple[AresImageStates, ...]:
        """Read the images and their states from the microcontroller.

        Args:
            timeout: Timeout in seconds to complete this operation.

        Returns:
            A tuple of AresImageStates objects.

        Raises:
            ImageManagerException: SMP Response errors or unknown responses.
        """
        states = asyncio.run(self._read_image_states(timeout))
        return tuple(AresImageStates.from_smp_lib(state) for state in states)

    async def __upload_image_run(self, image: bytes, slot: int, timeout_s: float = 2.5,
                                 update_cb: Callable[[int], None] | None = None) -> None:
        """Upload the image bytes, then mark image index 1 as pending.

        Args:
            image: The raw image contents.
            slot: The slot passed to the upload request.
            timeout_s: Timeout of the first upload request and of the follow-up requests, in seconds.
            update_cb: Optional callback invoked with every offset yielded by the upload.
        """
        async with SMPClient(SMPSerialTransport(), self._port) as client:
            async for offset in client.upload(image, slot, first_timeout_s=timeout_s, use_sha=True):
                if update_cb is not None:
                    update_cb(offset)
        # Mark the image pending only after the upload connection has been closed:
        # _mark_image_pending opens its own SMPClient on the same port, so calling it inside the
        # block above would nest two connections to a single serial port.
        # NOTE(review): index 1 is hard-coded regardless of `slot` - presumably the freshly
        # uploaded image; confirm against the bootloader layout.
        await self._mark_image_pending(1, timeout_s)

    async def __upload_image(self, image: Path | str, slot: int, update_status: Type[AresUploadStatusBase] | None,
                             timeout_s: float = 2.5):
        """Read the image file and upload it, optionally reporting progress.

        Args:
            image: Path of the image file.
            slot: The slot passed to the upload request.
            update_status: Status class instantiated with the image length, or None for no reporting.
            timeout_s: Timeout forwarded to the upload, in seconds.
        """
        data = Path(image).read_bytes()

        if update_status is None:
            await self.__upload_image_run(data, slot, timeout_s, None)
            return
        with update_status(len(data)) as status:
            await self.__upload_image_run(data, slot, timeout_s, status.update)

    async def _upload_image(self, image_path: str | Path, slot: int = 0, retries: int = 3, timeout_s: float = 2.5):
        """Upload an image, retrying after failures.

        Args:
            image_path: The file path to the image to upload.
            slot: The slot to upload the image to.
            retries: How many additional attempts are made after a failed one.
            timeout_s: Timeout forwarded to the upload, in seconds.

        Raises:
            ImageManagerException: Every attempt failed; the last error is chained as the cause.
        """
        # Progress is only reported in verbose mode.
        status_cls = self._upload_status_cls if self._verbose else None
        while True:
            try:
                await self.__upload_image(image_path, slot, status_cls, timeout_s)
            except Exception as e:
                retries -= 1
                if retries < 0:
                    raise ImageManagerException(e) from e
                # Pause for a second before the next attempt.
                await asyncio.sleep(1)
            else:
                break

    def upload_image(self, image_path: str | Path, slot: int = 0, retries: int = 3, timeout: float = 40.0) -> None:
        """Upload an image to the microcontroller.

        Args:
            image_path: The file path to the image to upload.
            slot: The slot to upload the image to.
            retries: The maximum number of retries after a failed attempt.
            timeout: Timeout of the first write request in seconds. The same value is used for the
                requests that mark the image as pending afterwards.

        Raises:
            ImageManagerException: Upload image failed.
        """
        asyncio.run(self._upload_image(image_path, slot, retries, timeout))

    async def _reset_mcu(self, force: Literal[0, 1] = 0, timeout: float = 2.5):
        """Send a reset request and check error responses.

        Args:
            force: Value of the `force` field of the reset request.
            timeout: Timeout of the request in seconds.

        Raises:
            ImageManagerException: An error response carries a return code that is not OK.
        """
        async with SMPClient(SMPSerialTransport(), self._port) as client:
            response = await client.request(ResetWrite(force=force), timeout_s=timeout)
            # An error-shaped response only counts as a failure when its return code is not OK.
            if error_v1(response):
                if response.rc != smperror.MGMT_ERR.EOK:
                    raise ImageManagerException("Response is not OK")
            elif error_v2(response):
                if response.err.rc != OS_MGMT_RET_RC.OK:
                    raise ImageManagerException("Response is not OK")

    def reset_mcu(self, force: bool = False, timeout: float = 2.5):
        """Reset the microcontroller.

        Args:
            force: Force the reset. This will cause a cold restart as opposed to a warm restart.
            timeout: The timeout of the reset request in seconds.

        Raises:
            ImageManagerException: Reset request failed.
        """
        # The request takes the flag as the integer 0 or 1.
        force_flag: Literal[0, 1] = 1 if force else 0
        asyncio.run(self._reset_mcu(force_flag, timeout))

    async def _confirm_image(self, slot: int, force: bool, timeout: float) -> None:
        """Validate the image at index `slot` and send a confirm request for its hash.

        Args:
            slot: Index into the image list read from the device.
            force: Skip the "not active" and "already confirmed" checks. A non-bootable image is
                rejected even when forced.
            timeout: Timeout of each request in seconds.

        Raises:
            ImageManagerException: The slot is out of range or the image fails validation.
        """
        images = await self._read_image_states(timeout)
        if slot < 0 or slot >= len(images):
            raise ImageManagerException("Invalid slot")

        image = images[slot]
        if not image.active and not force:
            raise ImageManagerException("Cannot confirm image that is not active")
        if not image.bootable:
            raise ImageManagerException("Cannot confirm image that is not bootable")
        if image.confirmed and not force:
            raise ImageManagerException("Cannot confirm image that has been confirmed already")
        async with SMPClient(SMPSerialTransport(), self._port) as client:
            # NOTE(review): the response is not inspected, so a device-side error is not reported.
            await client.request(ImageStatesWrite(hash=image.hash, confirm=True), timeout_s=timeout)

    def confirm_image(self, slot: int = 0, force: bool = False, timeout: float = 2.5):
        """Confirm an image.

        Args:
            slot: The image to confirm (index).
            force: Force the confirmation.
            timeout: The response timeout in seconds.

        Raises:
            ImageManagerException: Unable to confirm image.
        """
        asyncio.run(self._confirm_image(slot, force, timeout))

    async def _mark_image_pending(self, slot: int = 1, timeout: float = 2.5):
        """Send a state-write request for the hash of the image at index `slot`.

        Args:
            slot: Index into the image list read from the device.
            timeout: Timeout of each request in seconds.

        Raises:
            ImageManagerException: No image exists at that index.
        """
        images = await self._read_image_states(timeout)
        # Reject negative indices too: they would otherwise silently select an image counted
        # from the end of the list.
        if slot < 0 or slot >= len(images):
            raise ImageManagerException("Cannot mark image as pending since it doesn't exist")
        async with SMPClient(SMPSerialTransport(), self._port) as client:
            # NOTE(review): the response is not inspected, so a device-side error is not reported.
            await client.request(ImageStatesWrite(hash=images[slot].hash), timeout_s=timeout)

    def mark_image_pending(self, slot: int = 1, timeout: float = 2.5):
        """Mark an image as pending.

        Args:
            slot: The image to mark as pending (index).
            timeout: The timeout for the request in seconds.

        Raises:
            ImageManagerException: Unable to mark image as pending.
        """
        asyncio.run(self._mark_image_pending(slot, timeout))

ares_lora.AresDfu.__init__

__init__(
    port: str,
    verbose: bool = False,
    upload_status_cls: Type[AresUploadStatusBase] = AresUploadStatus,
)

Initializes AresDfu.

Parameters:

Name Type Description Default
port str

The port that supports the DFU protocol.

required
verbose bool

Run DFU in verbose mode.

False
upload_status_cls Type[AresUploadStatusBase]

The upload status class.

AresUploadStatus
Source code in serial-driver/src/ares_lora/dfu.py
def __init__(self, port: str, verbose: bool = False,
             upload_status_cls: Type[AresUploadStatusBase] = AresUploadStatus):
    """Store the DFU configuration on the instance.

    Args:
        port: The port that supports the DFU protocol.
        verbose: Run DFU in verbose mode.
        upload_status_cls: The upload status class.
    """
    # Plain attribute storage; nothing is opened or contacted here.
    self._port, self._verbose, self._upload_status_cls = port, verbose, upload_status_cls

ares_lora.AresDfu.confirm_image

confirm_image(slot: int = 0, force: bool = False, timeout: float = 2.5)

Confirm an image.

Parameters:

Name Type Description Default
slot int

The image to confirm (index).

0
force bool

Force the confirmation.

False
timeout float

The response timeout in seconds.

2.5

Raises:

Type Description
ImageManagerException

Unable to confirm image.

Source code in serial-driver/src/ares_lora/dfu.py
def confirm_image(self, slot: int = 0, force: bool = False, timeout: float = 2.5):
    """Confirm an image, blocking until the request has completed.

    Args:
        slot: The image to confirm (index).
        force: Force the confirmation.
        timeout: The response timeout in seconds.

    Raises:
        ImageManagerException: Unable to confirm image.
    """
    # Synchronous front end: run the private coroutine to completion.
    confirm_job = self._confirm_image(slot, force, timeout)
    asyncio.run(confirm_job)

ares_lora.AresDfu.mark_image_pending

mark_image_pending(slot: int = 1, timeout: float = 2.5)

Mark an image as pending.

Parameters:

Name Type Description Default
slot int

The image to mark as pending (index).

1
timeout float

The timeout for the request in seconds.

2.5

Raises:

Type Description
ImageManagerException

Unable to mark image as pending.

Source code in serial-driver/src/ares_lora/dfu.py
def mark_image_pending(self, slot: int = 1, timeout: float = 2.5):
    """Mark an image as pending, blocking until the request has completed.

    Args:
        slot: The image to mark as pending (index).
        timeout: The timeout for the request in seconds.

    Raises:
        ImageManagerException: Unable to mark image as pending.
    """
    # Synchronous front end: run the private coroutine to completion.
    pending_job = self._mark_image_pending(slot, timeout)
    asyncio.run(pending_job)

ares_lora.AresDfu.read_image_states

read_image_states(timeout: float = 2.0) -> tuple[AresImageStates, ...]

Read the images and their states from the microcontroller.

Parameters:

Name Type Description Default
timeout float

Timeout in seconds to complete this operation.

2.0

Returns:

Type Description
tuple[AresImageStates, ...]

A tuple of AresImageStates objects.

Raises:

Type Description
ImageManagerException

SMP Response errors or unknown responses.

Source code in serial-driver/src/ares_lora/dfu.py
def read_image_states(self, timeout: float = 2.0) -> tuple[AresImageStates, ...]:
    """Read the images and their states from the microcontroller.

    Args:
        timeout: Timeout in seconds to complete this operation.

    Returns:
        A tuple of AresImageStates objects.

    Raises:
        ImageManagerException: SMP Response errors or unknown responses.
    """
    # Fetch the raw states first, then convert each one to the package's own dataclass.
    raw_states = asyncio.run(self._read_image_states(timeout))
    return tuple(AresImageStates.from_smp_lib(raw) for raw in raw_states)

ares_lora.AresDfu.reset_mcu

reset_mcu(force: bool = False, timeout: float = 2.5)

Reset the microcontroller.

Parameters:

Name Type Description Default
force bool

Force the reset. This will cause a cold restart as opposed to a warm restart.

False
timeout float

The timeout of the reset request in seconds.

2.5

Raises:

Type Description
ImageManagerException

Reset request failed.

Source code in serial-driver/src/ares_lora/dfu.py
def reset_mcu(self, force: bool = False, timeout: float = 2.5):
    """Reset the microcontroller, blocking until the request has completed.

    Args:
        force: Force the reset. This will cause a cold restart as opposed to a warm restart.
        timeout: The timeout of the reset request in seconds.

    Raises:
        ImageManagerException: Reset request failed.
    """
    # The private coroutine takes the flag as the integer 0 or 1.
    force_flag: Literal[0, 1] = 1 if force else 0
    asyncio.run(self._reset_mcu(force_flag, timeout))

ares_lora.AresDfu.upload_image

upload_image(
    image_path: str | Path,
    slot: int = 0,
    retries: int = 3,
    timeout: float = 40.0,
) -> None

Upload an image to the microcontroller.

Parameters:

Name Type Description Default
image_path str | Path

The file path to the image to upload.

required
slot int

The slot to upload the image to.

0
retries int

The maximum number of retries.

3
timeout float

Timeout of the first write request in seconds.

40.0

Raises:

Type Description
ImageManagerException

Upload image failed.

Source code in serial-driver/src/ares_lora/dfu.py
def upload_image(self, image_path: str | Path, slot: int = 0, retries: int = 3, timeout: float = 40.0) -> None:
    """Upload an image to the microcontroller.

    Args:
        image_path: The file path to the image to upload.
        slot: The slot to upload the image to.
        retries: The maximum amount of retries.
        timeout: Timeout of the first write request in seconds.

    Raises:
        ImageManagerException: Upload image failed.
    """
    asyncio.run(self._upload_image(image_path, slot, retries, timeout))