113270 Views
93709 Views
85773 Views
53388 Views
50571 Views
48758 Views
Operation Pico
Raspberry Pi Home Hub
Hacky Temperature and Humidity Sensor
Robot Makers Almanac
High Five Bot
Making a Custom PCB for BurgerBot
Using the Raspberry Pi Pico's Built-in Temperature Sensor
Getting Started with SQL
Introduction to the Linux Command Line on Raspberry Pi OS
How to install MicroPython
Wall Drawing Robot Tutorial
BrachioGraph Tutorial
KevsRobots Learning Platform
100% Complete
By Kevin McAleer, 4 Minutes
DisPy must be running on the Cluster nodes you wish to use.
I have created a simple docker-compose file to enable a quick setup. Click here to download the docker-compose & associated files: https://github.com/kevinmcaleer/ClusteredPi/tree/main/stacks/octapi.
The easiest way to set up DisPy is:
Change to the octapi folder:
cd ClusteredPi/stacks/octapi
Bring up the container:
docker-compose up -d
The docker image will be created and the container started with this single command.
To stop the container, simply type:
docker-compose down
Run 02_bruteforce_octapi.py to decrypt the message much more quickly than running it stand-alone:
02_bruteforce_octapi.py
"""Brute-force an Enigma message key across a dispy cluster.

One dispy job is submitted per rotor order. Each job tries every three-letter
start position for that order and reports the first one whose decryption
contains the crib text. The submitting machine then decrypts the full message
with any settings that were found.
"""
from itertools import permutations

# Every ordered choice of three distinct rotors out of the five available
# (5 * 4 * 3 = 60 orders). Generating the list avoids the duplicated and
# missing entries that creep into a hand-typed table.
rotors = [
    " ".join(order)
    for order in permutations(("I", "II", "III", "IV", "V"), 3)
]

cribtext = "ROBOT"
text = "SUPSH GPYCV JTPYF TDQWV HIBEW FPDBN TAUEL IQXMS ZBDCT"
text = text.replace(" ", "")
ciphertext = text
ring_choice = "1 1 1"

# Sentinel returned by find_rotor_start() in the start-position slot when no
# position matched. find_rotor_start() repeats the literal instead of using
# this name because it is written to be self-contained (see its docstring).
NOT_FOUND = "Cannot find settings"


def decrypt_message(rotor_choice, start_pos, ciphertext):
    """Decrypt ``ciphertext`` with the given rotor order and start position.

    The ring settings come from the module-level ``ring_choice``; reflector
    and plugboard are fixed to the same values the search uses.

    Args:
        rotor_choice: Rotor order such as ``"I II III"``.
        start_pos: Three-letter rotor start position (the message key).
        ciphertext: Cipher text with the spaces already removed.

    Returns:
        The decrypted text with every ``"X"`` replaced by a space.
    """
    from enigma.machine import EnigmaMachine

    machine = EnigmaMachine.from_key_sheet(
        rotors=rotor_choice,
        reflector='B',
        ring_settings=ring_choice,
        plugboard_settings='AV BS CG DL FU HZ IN KM OW RX',
    )
    machine.set_display(start_pos)
    return machine.process_text(ciphertext).replace("X", " ")


def find_rotor_start(rotor_choice: str, ring_choice: str, cipher_text: str, crib_text: str):
    """Search all 26**3 start positions of one rotor order for the crib.

    This is the function handed to dispy, so it is kept self-contained: its
    imports are local and it reads no module-level names.
    NOTE(review): this assumes dispy runs the function on the nodes without
    this module's globals -- confirm against the dispy documentation.

    Args:
        rotor_choice: Rotor order such as ``"I II III"``.
        ring_choice: Ring settings such as ``"1 1 1"``.
        cipher_text: Cipher text with the spaces already removed.
        crib_text: Text expected to appear somewhere in the decryption.

    Returns:
        ``(rotor_choice, ring_choice, start_pos)`` for the first start
        position whose decryption contains ``crib_text``, or
        ``(rotor_choice, ring_choice, "Cannot find settings")`` if none does.
    """
    from itertools import product

    from enigma.machine import EnigmaMachine

    alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    machine = EnigmaMachine.from_key_sheet(
        rotors=rotor_choice,
        reflector='B',
        ring_settings=ring_choice,
        plugboard_settings='AV BS CG DL FU HZ IN KM OW RX',
    )

    # product() yields AAA, AAB, ..., ZZZ: the same order as three nested
    # loops over the alphabet with the left-most rotor outermost.
    for letters in product(alphabet, repeat=3):
        start_pos = "".join(letters)
        # Reset the rotors to the candidate position before every attempt.
        machine.set_display(start_pos)
        plaintext = machine.process_text(cipher_text)
        # The per-candidate debug print was dropped: it produced 17,576
        # lines of output per job.
        if crib_text in plaintext:
            print("Valid settings found!")
            print(rotor_choice, start_pos)
            return rotor_choice, ring_choice, start_pos

    return rotor_choice, ring_choice, "Cannot find settings"


def main():
    """Submit one search job per rotor order and report any hits."""
    # Imported here so the definitions above can be imported on a machine
    # that does not have dispy installed.
    import dispy

    # Explicit addresses also work, e.g.
    # ['192.168.2.2', '192.168.2.1', '192.168.2.4', '192.168.2.3'].
    nodes = ['192.168.2.*', '192.168.1.*']
    cluster = dispy.JobCluster(find_rotor_start, nodes=nodes, loglevel=dispy.logger.DEBUG)
    try:
        print(f" cluster status {cluster.status()}")

        # Submit the jobs for this ring choice, numbering them from 1.
        jobs = []
        for job_id, rotor_choice in enumerate(rotors, start=1):
            job = cluster.submit(rotor_choice, ring_choice, ciphertext, cribtext)
            job.id = job_id  # Associate an ID to the job
            jobs.append(job)

        print("Waiting...")
        cluster.wait()
        print("Collecting job results")

        found = False
        for job in jobs:
            if job.status != dispy.DispyJob.Finished:
                print(f"Job {job.id} did not finish, status is {job.status}")
                if job.exception:
                    print(f"Job {job.id} failed with exception {job.exception}")
                # An unfinished job has no result tuple; skip it rather than
                # re-reporting values left over from the previous job.
                continue

            rotor_setting, ring_setting, start_pos = job()
            if start_pos == NOT_FOUND:
                continue

            found = True
            message = decrypt_message(rotor_setting, start_pos, ciphertext)
            print(f"Rotors {rotor_setting}, ring {ring_setting}, message key was {start_pos}, using crib {cribtext}, message: {message}")

        if not found:
            print('Attack unsuccessful')

        cluster.print_status()
    finally:
        # Release the cluster even if submission or collection raised.
        cluster.close()


if __name__ == "__main__":
    main()
< Previous