{"id":4596,"date":"2024-01-11T18:00:40","date_gmt":"2024-01-11T18:00:40","guid":{"rendered":"https:\/\/ibex.tech\/python\/?p=4596"},"modified":"2024-01-12T11:17:54","modified_gmt":"2024-01-12T11:17:54","slug":"udp-tx-and-receive-non-blocking","status":"publish","type":"post","link":"https:\/\/ibex.tech\/python\/tcp-ip\/udp\/udp-tx-and-receive-non-blocking","title":{"rendered":"UDP tx and receive non blocking"},"content":{"rendered":"\n<h4 class=\"wp-block-heading\">Transmit broadcast packet<\/h4>\n\n\n\n<pre class=\"wp-block-code\"><code>import socket\n\nsocket_udp1 = None\n\n\n#******************************************\n#******************************************\n#********** GET LOCAL IP ADDRESS **********\n#******************************************\n#******************************************\ndef get_local_ip():\n    \n    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)\n\n    try:\n        sock.connect(('10.255.255.255', 1))     #Doesn't neeed to be a reachable IP address\n        local_ip = sock.getsockname()&#91;0]\n    except Exception:\n        local_ip = '127.0.0.1'\n    finally:\n        sock.close()\n\n    return local_ip\n\n\n#*********************************\n#*********************************\n#********** OPEN SOCKET **********\n#*********************************\n#*********************************\ndef udp_open_socket(local_port, local_ip):\n    global socket_udp1\n    socket_udp1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)     #Create a UDP socket\n    socket_udp1.bind((local_ip, local_port))                   #Bind the socket to the local IP and port\n    \n    socket_udp1.settimeout(0.001)                              #&lt;&lt;&lt;&lt;SET TIMEOUT FOR calls to recvfrom()  (prevents blocking indefinitely, can't use 0, so set to say 0.001=1mS for applications where we don't want to stall)\n\n\n#**********************************\n#**********************************\n#********** CLOSE SOCKET **********\n#**********************************\n#**********************************\ndef udp_close_socket():\n    socket_udp1.close()\n\n\n#***************************************************\n#***************************************************\n#********** TRANSMIT BROADCAST UDP PACKET **********\n#***************************************************\n#***************************************************\ndef udp_tx_broadcast(packet, remote_port, broadcast_ip):\n    global socket_udp1\n\n    socket_udp1.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)      #Enable broadcasting mode\n    socket_udp1.sendto(packet, (broadcast_ip, remote_port))                #Send the byte array to the broadcast address\n\n\n#************************************************\n#************************************************\n#********** CHECK FOR RECEIVED PACKETS **********\n#************************************************\n#************************************************\ndef udp_check_for_rx():\n    global socket_udp1\n\n    try:\n        while True:     #&lt;Loop forever, our sockets settimeout() will cause \"except socket.timeout:\" to fire on no more packets received\n            data, addr = socket_udp1.recvfrom(1824)         #Wait for a response, timeout will trigger an exception\n\n            #Debug print the response:\n            #print(\"New response:\")\n            #print(data.decode('utf-8'))\n            \n            rx_data = data.decode('utf-8')\n          \n\n    except socket.timeout:\n        #No rx waiting\n        pass\n\n\n#########################\n##### EXAMPLE USAGE #####\n#########################\n    packet = bytes(&#91;0x01, 0x02, 0x04, 0x04])\n    broadcast_ip = \"192.168.1.255\"\n    port = 12345\n    local_ip = get_local_ip()\n    udp_tx_broadcast(packet, port, broadcast_ip, local_ip)\n    time.sleep(3)\n    udp_check_for_rx()    #Non blocking, will exit if nothing received\n\n<\/code><\/pre>\n\n\n\n<p><\/p>\n","protected":false},"excerpt":{"rendered":"<p>Transmit broadcast packet<\/p>\n","protected":false},"author":5,"featured_media":0,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[365],"tags":[],"class_list":["post-4596","post","type-post","status-publish","format-standard","hentry","category-udp"],"_links":{"self":[{"href":"https:\/\/ibex.tech\/python\/wp-json\/wp\/v2\/posts\/4596","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/ibex.tech\/python\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/ibex.tech\/python\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/ibex.tech\/python\/wp-json\/wp\/v2\/users\/5"}],"replies":[{"embeddable":true,"href":"https:\/\/ibex.tech\/python\/wp-json\/wp\/v2\/comments?post=4596"}],"version-history":[{"count":4,"href":"https:\/\/ibex.tech\/python\/wp-json\/wp\/v2\/posts\/4596\/revisions"}],"predecessor-version":[{"id":4601,"href":"https:\/\/ibex.tech\/python\/wp-json\/wp\/v2\/posts\/4596\/revisions\/4601"}],"wp:attachment":[{"href":"https:\/\/ibex.tech\/python\/wp-json\/wp\/v2\/media?parent=4596"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/ibex.tech\/python\/wp-json\/wp\/v2\/categories?post=4596"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/ibex.tech\/python\/wp-json\/wp\/v2\/tags?post=4596"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}