Socket programozás


import socket

# Name of the machine this script runs on.
print(socket.gethostname())

# Forward lookup: resolve a host name to a single IPv4 address string.
addr = socket.gethostbyname('www.example.org')
print(addr)

# Extended forward lookup: (canonical name, alias list, address list).
print(*socket.gethostbyname_ex('www.example.org'))

# Reverse lookup: the same kind of triple, starting from an IP address.
reverse_record = socket.gethostbyaddr('157.181.161.79')
hostname, aliases, addrs = reverse_record
print(hostname, aliases, addrs)

import socket

# Resolve this machine's own host name to every IPv4 address registered for it.
my_host = socket.gethostname()
host_record = socket.gethostbyname_ex(my_host)
hostname, aliases, addrs = host_record
print(*host_record)

DESKTOP-SRVB84Z [] ['172.21.30.1', '192.168.46.1', '192.168.0.88']


Ethernet adapter vEthernet (Default Switch):

   IPv4 Address. . . . . . . . . . . : 172.21.30.1
   Subnet Mask . . . . . . . . . . . : 255.255.240.0
   Default Gateway . . . . . . . . . :

Ethernet adapter VirtualBox Host-Only Network:

   IPv4 Address. . . . . . . . . . . : 192.168.46.1
   Subnet Mask . . . . . . . . . . . : 255.255.255.0
   Default Gateway . . . . . . . . . :

Wireless LAN adapter Wi-Fi:

   IPv4 Address. . . . . . . . . . . : 192.168.0.88
   Subnet Mask . . . . . . . . . . . : 255.255.255.0
   Default Gateway . . . . . . . . . : 192.168.0.1


import socket

# List the service name registered for every well-known port (0-1023).
for port_num in range(1024):
    # Only the lookup belongs in the try body: getservbyport() raises OSError
    # when no service is registered for the port. Keeping print() outside
    # ensures output errors (e.g. a closed pipe) are not silently swallowed.
    try:
        service = socket.getservbyport(port_num)
    except OSError:
        continue
    print(port_num, service)

import socket

# Minimal blocking TCP server: accept one client, read one short message, exit.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    addr = ("127.0.0.1", 8000)
    sock.bind(addr)

    sock.listen(0)
    # Report host and port (index 0 is the host, index 1 is the port).
    print(f"Server is listening on {addr[0]}:{addr[1]}")

    # accept() blocks until a client connects and returns a dedicated socket
    # for that connection plus the client's address.
    connection_sock, client_address = sock.accept()
    with connection_sock:
        print(f"Connection from {client_address}")

        # The read must happen inside the `with` block: leaving the block
        # closes the connection, after which recv() would fail.
        data = connection_sock.recv(16).decode()
        print(f"Data: {data}")
finally:
    # Release the listening socket even if one of the steps above raised.
    sock.close()

# Client-side fragment: connect to 127.0.0.1:8000 and send one short message.
# NOTE(review): `sock` is not created in this fragment - presumably a fresh
# socket.socket(socket.AF_INET, socket.SOCK_STREAM) is expected here; confirm.
addr = ("127.0.0.1", 8000)
sock.connect(addr)

# send() takes bytes, so the text is encoded first.
sock.send("Hello".encode())
print("Message sent.")

# Set the SO_REUSEADDR option (socket level, SOL_SOCKET) to 1 on `sock`.
# NOTE(review): presumably meant for the server socket, before bind(), so the
# port can be reused right after a restart - confirm the intended placement.
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

import socket

# One-shot TCP server written with context managers, so both the listening
# socket and the accepted connection are closed automatically.
addr = ("127.0.0.1", 8000)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.bind(addr)
    sock.listen(1)
    print(f"Server is listening on {addr[0]}:{addr[1]}")

    # Block until a client connects.
    accepted = sock.accept()
    connection_sock, client_address = accepted
    with connection_sock:
        print(f"Connection from {client_address}")

        # Read at most 16 bytes and show them as text.
        raw = connection_sock.recv(16)
        data = raw.decode()
        print(f"Data: {data}")

import socket

# TCP client counterpart: connect to the local server and send one greeting.
addr = ("127.0.0.1", 8000)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(addr)
    # Sockets transport bytes, so the text is encoded before sending.
    message = "Hello".encode()
    sock.send(message)
    print("Message sent.")

# Non-blocking mode: socket calls return immediately instead of waiting.
# Equivalent spellings: sock.setblocking(False) or sock.settimeout(0.0).
# sock.settimeout(1.0) is NOT equivalent - it selects timeout mode, in which a
# call waits up to 1 second before raising a timeout error.
sock.setblocking(0)
# Blocking mode. Equivalent spellings: sock.setblocking(True) or
# sock.settimeout(None).
sock.setblocking(1)

# Set the SO_REUSEADDR option (socket level, SOL_SOCKET) to 1 on `sock`.
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

import select

# Skeleton of a select()-based server loop; the `...` lines are placeholders
# for code that is left out of this outline.
# NOTE(review): `server` is presumably the listening socket created in the
# omitted part - confirm.
...
inputs = [server] # sockets from which we expect to read
outputs = [] # sockets to which we expect to write
timeout = 1 # upper bound, in seconds, on how long select() may block

# Wait for at least one of the sockets to be ready for processing.
# The third argument reuses `inputs`, so the same sockets are also watched
# for exceptional conditions.
readable, writable, exceptional = select.select(inputs, outputs, inputs, timeout)
for s in readable:
    if s is server: # new client connect
        ...
    else:
        ... # handle client

import socket
import select
import struct

# Wire format of one request: two native ints followed by a one-byte operator.
packer = struct.Struct('iic')
server_addr = ('127.0.0.1', 8000)

# Maps the operator byte sent by the client to the arithmetic it stands for.
# NOTE: this table replaces an eval() of the received values. Evaluating data
# that arrives from the network is unsafe, and it crashed the whole server on
# division by zero or on an operator byte that could not be decoded.
_OPERATIONS = {
    b'+': lambda x, y: x + y,
    b'-': lambda x, y: x - y,
    b'*': lambda x, y: x * y,
    b'/': lambda x, y: x / y,
    b'%': lambda x, y: x % y,
}


def _compute_reply(data):
    """Return the reply bytes for one packed ``(a, b, operator)`` request.

    Malformed requests, unknown operators and division by zero produce an
    ``Error: ...`` message instead of raising, so one bad request cannot
    take the server down.
    """
    try:
        a, b, op = packer.unpack(data)
    except struct.error:
        # recv() returned fewer (or other) bytes than one full request.
        return b"Error: malformed request"

    operation = _OPERATIONS.get(op)
    if operation is None:
        return f"Error: unsupported operator {op!r}".encode()

    try:
        return str(operation(a, b)).encode()
    except ZeroDivisionError:
        return b"Error: division by zero"


with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
    server.bind(server_addr)
    server.listen(1)
    print("Listening for new connections...")

    inputs = [server]
    timeout = 1

    try:
        while True:
            # Only readability is of interest. Passing the sockets as the
            # write set as well would make select() return immediately for
            # every connected client and turn this loop into a busy loop.
            readable, _, _ = select.select(inputs, [], inputs, timeout)

            for sock in readable:
                if sock is server: # new connection
                    client, client_addr = sock.accept()
                    inputs.append(client)
                    print("Connected: ", client_addr)
                    continue

                # An existing connection is readable.
                data = sock.recv(packer.size)
                if not data:
                    # Empty read: the client closed its side.
                    print("Logout: ", sock)
                    inputs.remove(sock)
                    sock.close()
                    continue

                # sendall() keeps writing until the whole reply is sent.
                sock.sendall(_compute_reply(data))
    except KeyboardInterrupt:
        print("Closing the server...")
    finally:
        # The `with` block closes only the listening socket; close the
        # remaining client connections explicitly.
        for sock in inputs:
            if sock is not server:
                sock.close()

import socket
import struct

# Calculator client: read two numbers and an operator from the user, send them
# to the server as one packed request and print the server's answer.
server_addr = ('127.0.0.1', 8000)
# Wire format of one request: two native ints followed by a one-byte operator.
packer = struct.Struct('iic')

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
    try:
        a = input("Give me a number: ")
        b = input("Give me an operator: ")
        c = input("Give me a number: ")

        # Exactly one character is required. An empty string must be rejected
        # here, because `"" in "+-*/%"` is True and the 'c' struct field
        # cannot pack zero bytes.
        if len(b) != 1:
            print("Operator should be only 1 character!")
            exit(1)

        if b not in "+-*/%":
            print(f"This is not an operator: {b}")
            exit(1)

        # int() raises ValueError for non-numeric input; pack() raises
        # struct.error when a number does not fit the native int field.
        packed_data = packer.pack(int(a), int(c), b.encode())
        client.connect(server_addr)

        # sendall() keeps writing until the whole request is sent.
        client.sendall(packed_data)
        data = client.recv(200)
        print("Result: ", data.decode())
    except ValueError:
        print("Please use numbers!")
    except struct.error:
        print("Numbers must fit into a 32-bit signed integer!")
    except KeyboardInterrupt:
        print("\nInterrupted")

import socket

# One-shot UDP receiver: wait for a single datagram and show it and its sender.
server_addr = ('127.0.0.1', 8000)

with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as server:
    server.bind(server_addr)
    print("Waiting for any data...")

    # recvfrom() returns the payload together with the sender's address.
    received = server.recvfrom(1024)
    data, addr = received
    for label, value in (("Data:", data.decode()), ("Address:", addr)):
        print(label, value)

import socket

# One-shot UDP sender: UDP is connectionless, so the destination address is
# supplied with each datagram.
server_addr = ('127.0.0.1', 8000)

with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client:
    message = "Hello".encode()
    client.sendto(message, server_addr)
    print("Data sent.")

import socket

# UDP file receiver: store incoming datagrams in netNew.jpg, acknowledging
# each one, until an empty datagram marks the end of the transfer.
server_addr = ('127.0.0.1', 8000)

with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as server, \
        open('netNew.jpg', 'wb') as f:
    server.bind(server_addr)

    while True:
        data, client_addr = server.recvfrom(200)
        if not data:
            # Empty datagram: the sender has finished.
            break
        f.write(data)
        server.sendto('OK'.encode(), client_addr)

    # The terminating empty datagram is acknowledged as well.
    server.sendto('OK'.encode(), client_addr)

import socket

# UDP file sender: transmit net.jpg in 200-byte datagrams, waiting for the
# receiver's acknowledgement after each one (stop-and-wait).
server_addr = ('127.0.0.1', 8000)

with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client, \
        open('net.jpg', 'rb') as f:
    # iter() with a sentinel calls f.read(200) until it returns b'' (EOF).
    for data in iter(lambda: f.read(200), b''):
        client.sendto(data, server_addr)
        client.recvfrom(4)

    # An empty datagram tells the receiver that the file is complete.
    client.sendto(''.encode(), server_addr)
    client.recvfrom(4)

import socket
import select

# Minimal HTTP proxy: listens on localhost:9000, rewrites each request so that
# it targets /pgm6rw/telkom on people.inf.elte.hu, and relays the answer.
proxy_addr = ('localhost', 9000)
people_inf_elte_hu_addr = ('people.inf.elte.hu', 80)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as proxy:
    proxy.bind(proxy_addr)
    proxy.listen(1)
    print("Listening for new connections...")

    inputs = [proxy]
    timeout = 1

    # Value that replaces "localhost:9000" (e.g. in the Host header).
    upstream_host = f"{people_inf_elte_hu_addr[0]}:{people_inf_elte_hu_addr[1]}".encode()

    try:
        while True:
            # Only readability is of interest. Passing the sockets as the
            # write set as well would make select() return immediately for
            # every connected client and turn this loop into a busy loop.
            readable, _, _ = select.select(inputs, [], inputs, timeout)

            for sock in readable:
                if sock is proxy: # new connection
                    client, client_addr = sock.accept()
                    inputs.append(client)
                    print("Connected: ", client_addr)
                    continue

                # An existing connection is readable.
                data = sock.recv(65000)
                if not data:
                    print("Logout: ", sock)
                    inputs.remove(sock)
                    sock.close()
                    continue

                # Rewrite on the raw bytes: the patterns are plain ASCII, so
                # the result equals the decode/replace/encode round trip but
                # cannot fail on request bytes that are not valid UTF-8.
                # NOTE(review): "GET /x" becomes "GET /pgm6rw/telkomx" (no
                # slash is inserted) - confirm that this is intended.
                get_request = (
                    data.replace(b"GET /", b"GET /pgm6rw/telkom")
                    .replace(b"localhost:9000", upstream_host)
                )
                print(get_request)
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
                    server.connect(people_inf_elte_hu_addr)
                    # sendall() keeps writing until everything is sent.
                    server.sendall(get_request)
                    # A single recv() relays at most 65000 bytes of the answer.
                    html = server.recv(65000)
                    sock.sendall(html)

    except KeyboardInterrupt:
        print("Closing the server...")
    finally:
        # The `with` block closes only the listening socket; close the
        # remaining client connections explicitly.
        for sock in inputs:
            if sock is not proxy:
                sock.close()

import binascii, zlib

# CRC-32 of the same bytes computed with two standard-library modules; both
# lines of output show the checksum in hexadecimal.
test_string = "Fill this with something".encode('utf-8')
crc_from_binascii = binascii.crc32(bytearray(test_string))
crc_from_zlib = zlib.crc32(test_string)
for checksum in (crc_from_binascii, crc_from_zlib):
    print(hex(checksum))

import hashlib

# MD5 digest of the sample text, printed as a hexadecimal string.
test_string = "Fill this with something".encode('utf-8')
# Passing the data to the constructor is the same as calling update() once.
m = hashlib.md5(test_string)
print(m.hexdigest())

import hashlib

# SHA-1 digest of the sample text, printed as a hexadecimal string.
test_string = "Fill this with something".encode('utf-8')
# Passing the data to the constructor is the same as calling update() once.
m = hashlib.sha1(test_string)
digest_hex = m.hexdigest()
print(digest_hex)

import socket
import struct

# Build one 802.1Q (VLAN-tagged) Ethernet frame by hand and send it through a
# raw packet socket. AF_PACKET is available on Linux only, and opening a raw
# socket normally requires elevated privileges.
interface = "lo"

# Convert MAC addresses to binary format (6 raw bytes each)
src_mac = "00:0a:95:9d:68:16"
dst_mac = "ff:ff:ff:ff:ff:ff"
src_mac = bytes.fromhex(src_mac.replace(':', ''))
dst_mac = bytes.fromhex(dst_mac.replace(':', ''))

# Construct the Ethernet header: destination MAC, source MAC, then 0x8100,
# the tag protocol identifier that announces an 802.1Q VLAN tag.
# '!' selects network (big-endian) byte order.
eth_header = struct.pack('!6s6sH', dst_mac, src_mac, 0x8100)

# Construct the VLAN header: the 16-bit tag control information holds the
# priority (3 bits), the drop-eligible flag (1 bit) and the VLAN ID (12 bits);
# it is followed by the EtherType of the payload (0x0800 = IPv4).
vlan_id = 100
tci = (0 << 13) | (0 << 12) | vlan_id
vlan_header = struct.pack('!HH', tci, 0x0800)

# Construct the payload (demo bytes only, not a real IPv4 packet)
payload = b'Hello'

# Combine the headers and payload into the complete frame
frame = eth_header + vlan_header + payload

# Create the raw socket only once the frame is ready, and let the context
# manager close it even if bind() or send() fails.
with socket.socket(socket.AF_PACKET, socket.SOCK_RAW) as sock:
    sock.bind((interface, 0))

    # Send the frame
    sock.send(frame)
print(f"Sent Ethernet frame with VLAN ID {vlan_id} on interface {interface}")

Feladatok