有一个关于如何为IoT Edge创建基于Python的模块的教程: https://docs.microsoft.com/en-us/azure/iot-edge/tutorial-python-module
正如教程所示,我建议使用带有IoT Edge扩展的Visual Studio代码。然后你得到Python模块模板,Dockerfile等。你可以直接从VS Code推送你的自定义模块到你的私人容器注册表,例如Azure容器注册表并设置部署清单(在哪个Edge设备上运行哪些模块)。
根据评论中的要求,我构建了一个快速完整的示例(尽管没有测试)。使用VS代码IoT Edge扩展创建新的Python模块时,示例仅基于模板示例
import random import time import sys import iothub_client # pylint: disable=E0611 from iothub_client import IoTHubModuleClient, IoTHubClientError, IoTHubTransportProvider from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, IoTHubError # messageTimeout - the maximum time in milliseconds until a message times out. # The timeout period starts at IoTHubModuleClient.send_event_async. # By default, messages do not expire. MESSAGE_TIMEOUT = 10000 # global counters RECEIVE_CALLBACKS = 0 SEND_CALLBACKS = 0 # Choose HTTP, AMQP or MQTT as transport protocol. Currently only MQTT is supported. PROTOCOL = IoTHubTransportProvider.MQTT # Callback received when the message that we're forwarding is processed. def send_confirmation_callback(message, result, user_context): global SEND_CALLBACKS print ( "Confirmation[%d] received for message with result = %s" % (user_context, result) ) map_properties = message.properties() key_value_pair = map_properties.get_internals() print ( " Properties: %s" % key_value_pair ) SEND_CALLBACKS += 1 print ( " Total calls confirmed: %d" % SEND_CALLBACKS ) class HubManager(object): def __init__( self, protocol=IoTHubTransportProvider.MQTT): self.client_protocol = protocol self.client = IoTHubModuleClient() self.client.create_from_environment(protocol) # set the time until a message times out self.client.set_option("messageTimeout", MESSAGE_TIMEOUT) # Forwards the message received onto the next stage in the process. def forward_event_to_output(self, outputQueueName, event, send_context): self.client.send_event_async( outputQueueName, event, send_confirmation_callback, send_context) def main(protocol): try: print ( "\nPython %s\n" % sys.version ) print ( "IoT Hub Client for Python" ) hub_manager = HubManager(protocol) print ( "Starting the IoT Hub Python sample using protocol %s..." % hub_manager.client_protocol ) print ( "The sample is now waiting for messages and will indefinitely. Press Ctrl-C to exit. ") while True: # Build the message with simulated telemetry values. # Put your real sensor reading logic here instead temperature = TEMPERATURE + (random.random() * 15) humidity = HUMIDITY + (random.random() * 20) msg_txt_formatted = MSG_TXT % (temperature, humidity) message = IoTHubMessage(msg_txt_formatted) hubManager.forward_event_to_output("output1", message, 0) time.sleep(10) except IoTHubError as iothub_error: print ( "Unexpected error %s from IoTHub" % iothub_error ) return except KeyboardInterrupt: print ( "IoTHubModuleClient sample stopped" ) if __name__ == '__main__': main(PROTOCOL)