# Transmit broadcast packet

import socket

socket_udp1 = None


#******************************************
#******************************************
#********** GET LOCAL IP ADDRESS **********
#******************************************
#******************************************
def get_local_ip():
    """Return the local IPv4 address the OS would use for outbound traffic.

    A throwaway UDP socket is connected to a dummy address and the address
    the OS assigned to the socket's local end is read back.

    Returns:
        The local IP address as a string, or '127.0.0.1' if anything goes
        wrong while determining it.
    """
    #The "with" block closes the probe socket on every exit path
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        try:
            probe.connect(('10.255.255.255', 1))    #Doesn't need to be a reachable IP address
            return probe.getsockname()[0]
        except Exception:
            #Fall back to loopback when the address can't be determined
            return '127.0.0.1'


#*********************************
#*********************************
#********** OPEN SOCKET **********
#*********************************
#*********************************
def udp_open_socket(local_port, local_ip):
    """Create the module's UDP socket and bind it to a local address.

    The new socket is stored in the module-level socket_udp1, replacing (and
    closing) any socket left open by an earlier call.

    Args:
        local_port: Local UDP port to bind to.
        local_ip: Local IP address to bind to.

    Raises:
        OSError: If the socket cannot be created or bound.
    """
    global socket_udp1

    #Release a previously opened socket explicitly rather than relying on
    #garbage collection to close it when the global is overwritten
    if socket_udp1 is not None:
        socket_udp1.close()

    socket_udp1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)     #Create a UDP socket
    socket_udp1.bind((local_ip, local_port))                   #Bind the socket to the local IP and port

    #Timeout for calls to recvfrom(): prevents blocking indefinitely. 0 can't be
    #used (it would switch the socket to non-blocking mode), so use 0.001 = 1mS
    #for applications where we don't want to stall.
    socket_udp1.settimeout(0.001)


#**********************************
#**********************************
#********** CLOSE SOCKET **********
#**********************************
#**********************************
def udp_close_socket():
    """Close the module's UDP socket, if one is open.

    Does nothing when no socket is currently stored in socket_udp1, so it is
    safe to call before udp_open_socket() or more than once.
    """
    global socket_udp1

    if socket_udp1 is None:
        return      #Nothing was opened (or it has already been closed)

    socket_udp1.close()
    socket_udp1 = None      #Forget the closed socket so later calls see "not open"


#***************************************************
#***************************************************
#********** TRANSMIT BROADCAST UDP PACKET **********
#***************************************************
#***************************************************
def udp_tx_broadcast(packet, remote_port, broadcast_ip):
    """Send one datagram to a broadcast address via the module's UDP socket.

    The socket stored in socket_udp1 is used, so it must already be open.

    Args:
        packet: Byte payload to transmit.
        remote_port: Destination UDP port.
        broadcast_ip: Broadcast address to send to.
    """
    destination = (broadcast_ip, remote_port)
    tx_socket = socket_udp1     #Only read here, so no "global" declaration is needed

    tx_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)    #Enable broadcasting mode
    tx_socket.sendto(packet, destination)                              #Send the byte array to the broadcast address


#************************************************
#************************************************
#********** CHECK FOR RECEIVED PACKETS **********
#************************************************
#************************************************
def udp_check_for_rx():
    """Collect every datagram currently waiting on the module's UDP socket.

    recvfrom() is called repeatedly on socket_udp1 until it raises
    socket.timeout, which signals that no more packets are waiting. This
    relies on the socket having a timeout set, as udp_open_socket() does.

    Returns:
        A list of (rx_data, addr) tuples in the order received. rx_data is the
        payload decoded as UTF-8 text and addr is the sender address reported
        by recvfrom(). The list is empty when nothing was waiting.
    """
    received = []

    try:
        while True:     #Loop until the socket's timeout raises socket.timeout on no more packets received
            data, addr = socket_udp1.recvfrom(1824)

            #errors='replace' stops a datagram that isn't valid UTF-8 from
            #raising UnicodeDecodeError and aborting the rest of the receive loop
            rx_data = data.decode('utf-8', errors='replace')
            received.append((rx_data, addr))

    except socket.timeout:
        #No (more) rx waiting
        pass

    return received


#########################
##### EXAMPLE USAGE #####
#########################
#Runs only when this file is executed directly, not when it is imported
if __name__ == "__main__":
    import time     #Only needed by this example

    packet = bytes([0x01, 0x02, 0x04, 0x04])
    broadcast_ip = "192.168.1.255"
    port = 12345
    local_ip = get_local_ip()

    #The socket must be opened before transmitting or receiving.
    #The same port number is used locally here; change it if your application needs a different one.
    udp_open_socket(port, local_ip)
    try:
        udp_tx_broadcast(packet, port, broadcast_ip)
        time.sleep(3)
        udp_check_for_rx()    #Non blocking, will exit if nothing received
    finally:
        udp_close_socket()

USEFUL?
We benefit hugely from resources on the web so we decided we should try and give back some of our knowledge and resources to the community by opening up many of our company’s internal notes and libraries through mini sites like this. We hope you find the site helpful.
Please feel free to comment if you can add help to this page or point out issues and solutions you have found, but please note that we do not provide support on this site. If you need help with a problem please use one of the many online forums.

Comments

Your email address will not be published. Required fields are marked *