Select

About

Set up three channels, add them to a poll object, and wait for events to occur.

Three channels are polled:

  1. An event channel that waits for a button to be pressed.
  2. A queue channel to which the script writes the string “foo”.
  3. A socket channel waiting for UDP packets.

NOTE: Change the UDP configuration (UDP_ADDRESS and UDP_PORT in the source code) to match your setup.

Source code

#
# @section License
#
# The MIT License (MIT)
#
# Copyright (c) 2016-2017, Erik Moqvist
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use, copy,
# modify, merge, publish, distribute, sublicense, and/or sell copies
# of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# This file is part of the Pumbaa project.
#


import select
import socket
from sync import Event, Queue
from drivers import Exti
import board

BUTTON_PIN = board.PIN_GPIO0
UDP_ADDRESS = '192.168.1.103'
UDP_PORT = 30303

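# Event channel for the button. The external interrupt writes event
# bit 0x1 to it on each falling edge of the button pin.
button = Event()
exti = Exti(BUTTON_PIN, Exti.FALLING, button, 0x1)

queue = Queue()

# UDP socket bound to the board's own address and port.
udp = socket.socket(type=socket.SOCK_DGRAM)
udp.bind((UDP_ADDRESS, UDP_PORT))

# Register all three channels with the poll object.
poll = select.poll()
poll.register(button)
poll.register(queue)
poll.register(udp)

# Write to the queue so that it is ready on the first poll.
queue.write('foo')

while True:
    # Block until one of the registered channels is ready.
    [(channel, eventmask)] = poll.poll()

    if channel is button:
        # Read event bit 0x1 written by the interrupt.
        button.read(0x1)
        print("button")
    elif channel is queue:
        # Read the three bytes of 'foo'.
        print("queue:", queue.read(3))
    elif channel is udp:
        print("udp:", udp.recv(1024))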

The source code can also be found on GitHub in the examples/select folder.
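The same polling pattern can be tried on a PC before flashing a board. The following is a minimal sketch for CPython on a Unix-like system, not part of the Pumbaa example. It assumes two substitutions: a socket pair stands in for the queue channel, and the button is left out because a PC has no event channel. Unlike the loop above, CPython's poll() returns file descriptors rather than channel objects, so the hypothetical helper poll_once() maps them back to names through a dictionary.

```python
import select
import socket


def poll_once(poller, channels, timeout_ms=1000):
    """Wait up to timeout_ms and return the names of the ready channels."""
    ready = poller.poll(timeout_ms)
    return [channels[fd] for fd, _eventmask in ready]


# UDP socket bound to loopback on a port chosen by the operating system.
udp = socket.socket(type=socket.SOCK_DGRAM)
udp.bind(('127.0.0.1', 0))

# A connected socket pair stands in for the queue channel (assumption).
queue_rx, queue_tx = socket.socketpair()

poller = select.poll()
poller.register(udp, select.POLLIN)
poller.register(queue_rx, select.POLLIN)

# Map file descriptors back to channel names.
channels = {udp.fileno(): 'udp', queue_rx.fileno(): 'queue'}

# Make the queue stand-in ready, as queue.write('foo') does on the board.
queue_tx.send(b'foo')

for name in poll_once(poller, channels):
    if name == 'queue':
        print('queue:', queue_rx.recv(3))
    elif name == 'udp':
        print('udp:', udp.recv(1024))
```

Iterating over the list returned by poll() also handles the case where several channels are ready at once, which can happen on a PC.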

Build and run

Build and upload the application.

$ cd examples/select
$ make -s BOARD=esp12e CDEFS_EXTRA="CONFIG_START_NETWORK_INTERFACE_WIFI_SSID=ssid CONFIG_START_NETWORK_INTERFACE_WIFI_PASSWORD=password" run
...
queue: b'foo'

At this point the application is waiting for an event to occur. Send a UDP packet to it from your PC using Python.

>>> import socket
>>> udp = socket.socket(type=socket.SOCK_DGRAM)
>>> udp.sendto(b'bar', ('192.168.1.103', 30303))

The packet is received by the application and printed.

udp: b'bar'
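The send step can also be wrapped in a small script on the PC. The sketch below is one possible way to do it, assuming Python 3, where the payload must be a bytes object. The helper name send_udp() is hypothetical, and a local loopback receiver stands in for the board so the script can be run without any hardware.

```python
import socket


def send_udp(payload, address, port):
    """Send one datagram and return the number of bytes sent."""
    sock = socket.socket(type=socket.SOCK_DGRAM)
    try:
        return sock.sendto(payload, (address, port))
    finally:
        sock.close()


# Loopback receiver standing in for the board (assumption for this demo).
receiver = socket.socket(type=socket.SOCK_DGRAM)
receiver.bind(('127.0.0.1', 0))
receiver.settimeout(1.0)
host, port = receiver.getsockname()

send_udp(b'bar', host, port)
print('udp:', receiver.recv(1024))
```

To target the board instead, call send_udp(b'bar', '192.168.1.103', 30303) with the address and port from your own configuration.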