0ac3147b9b
This commit includes several significant changes: 1. Implemented SSL to encrypt network traffic for secure communication over the open internet. This includes the creation of SSL certificates and the configuration of the server to use these certificates. 2. Created a Dockerfile for containerizing the application. This includes setting up the base image, installing necessary dependencies, and defining the command to run the application. 3. Added Go code for handling pcap files. This includes defining the structure of pcap files and implementing functions for reading and writing these files. 4. Implemented Async fibers for handling multiple connections concurrently. This includes creating a new fiber for each connection and managing these fibers to ensure efficient use of resources. 5. Added functionality to detect if the bind address is a loopback address and replace it with the correct loopback address. This allows the server to run on a single machine for testing and development purposes. 6. Refactored the code to improve readability and maintainability. This includes breaking down complex functions into smaller, more manageable functions and improving the naming of variables and functions for clarity. 7. Updated the code to properly send requests to the destination service. This includes creating a new socket for each request and ensuring that the request is sent over the correct connection. 8. Added error handling to ensure that the server can recover gracefully from errors and continue to function correctly. 9. Created code for use in the ProtocolHandler. 10. Created various empty Dockerfiles for the other containerized services. 11. Created various empty code files for the code to run in the other containerized services. 12. Added a Gemfile for each containerized service for independent dependency management. 13. Defined how the server should respond to NPEP queries.
14. Roughed in the systems for using an FQDN and SSL-secured internet for data transfer, or loopback for local connections. 15. Set up a system that requires at least one Orchestrator to be connected manually (the "Primary"). 16. Added documentation for the NPEP protocol. 17. Rebased the file structure to keep things more organized.
129 lines
3.5 KiB
Ruby
129 lines
3.5 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require 'openssl'
require 'resolv'
require 'socket'

require 'async'
require 'sequel'
|
|
|
|
# Set up the database
DB = Sequel.sqlite # In-memory database

# An in-memory database starts out empty, so the table that processor
# registrations are inserted into (columns: consumer_id, ip, port) has to be
# created before anything can write to it.
DB.create_table?(:processors) do
  String :consumer_id
  String :ip
  Integer :port
end

# NOTE: top-level locals such as this one are not visible inside `def`
# method bodies; code in methods has to reach the table through the DB constant.
processors = DB[:processors]

# Address and port to listen on; both can be overridden from the environment.
listen_ip = ENV.fetch('LISTEN_IP', '0.0.0.0')
listen_port = ENV.fetch('LISTEN_PORT', 3080)

server = TCPServer.new(listen_ip, listen_port)

# This hash will store the connections to the consumers
connections = {}
|
|
|
|
# Opens a client connection to +ip+:+port+.
#
# The special value "loopback" (any letter case) is replaced with 127.0.0.1.
# A literal IPv4 address gets a plain TCP connection; anything else is treated
# as a host name and gets a TLS connection with peer verification.
#
# @param ip [String] IPv4 address, host name, or the word "loopback"
# @param port [Integer, String] destination port
# @return [TCPSocket, OpenSSL::SSL::SSLSocket] the connected socket
# @raise [SystemCallError, SocketError] if the TCP connection cannot be made
# @raise [OpenSSL::SSL::SSLError] if the TLS handshake or verification fails
def create_socket(ip, port)
  # If the IP address is set to "loopback", replace it with the actual loopback IP address
  ip = '127.0.0.1' if ip.downcase == 'loopback'

  # A literal IPv4 address gets an unencrypted socket
  return TCPSocket.new(ip, port) if Resolv::IPv4::Regex.match?(ip)

  # Anything else is treated as a domain name and gets an SSL socket
  open_ssl_socket(ip, port)
end

# Opens a TLS connection to +host+:+port+ and verifies the peer certificate.
#
# @param host [String] host name to connect to and verify the certificate against
# @param port [Integer, String] destination port
# @return [OpenSSL::SSL::SSLSocket] the connected, handshaken socket
def open_ssl_socket(host, port)
  ssl_context = OpenSSL::SSL::SSLContext.new
  # set_params applies the library defaults, including the default CA
  # certificate store; a bare SSLContext has no trust anchors, so VERIFY_PEER
  # on its own could never succeed.
  ssl_context.set_params(verify_mode: OpenSSL::SSL::VERIFY_PEER)

  tcp_socket = TCPSocket.new(host, port)
  begin
    ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context)
    # Closing the SSL socket also closes the underlying TCP socket
    ssl_socket.sync_close = true
    # Send the host name for SNI and use it for certificate host name checks
    ssl_socket.hostname = host
    ssl_socket.connect
    ssl_socket
  rescue StandardError
    # Do not leak the TCP socket when the handshake fails
    tcp_socket.close
    raise
  end
end
|
|
|
|
# Accept loop: waits for client connections and serves each one in its own
# child task, so that several clients can be handled concurrently.
#
# NOTE(review): this block does not return while the server is running, so
# every method it calls must already be defined above this point in the file.
# handle_input is currently defined below and needs to be moved up.
Async do |task|
  loop do
    client = server.accept

    # One child task per client; task.async both creates and starts it.
    task.async do
      while (line = client.gets)
        # Here we handle each line of input from the client
        handle_input(line, connections)
      end
    ensure
      # Runs when the client disconnects or an exception is raised; the task
      # itself ends when this block returns, so no explicit stop is needed.
      client.close
    end
  end
end
|
|
|
|
# Parses one line received from a client and dispatches on its first word.
#
# Recognised commands (arguments are whitespace separated):
#   NEW_PROCESSOR <id> <domain> <port>                     open and store a connection
#   REQUEST  <consumer_id> <request_type> <pcap_chunk_id>  forwarded to the consumer
#   UPSTATE  <consumer_id> <state> <pcap_chunk_id>         printed
#   FINISHED <consumer_id> <pcap_chunk_id>                 printed
#   CHANGE   <consumer_id> <state> <urgency>               forwarded to the consumer
#   SHUTDOWN <consumer_id> <urgency>                       forwarded to the consumer
# Anything else is reported as an unknown command.
#
# @param line [String] raw line of input
# @param connections [Hash] consumer id => open connection; read when
#   forwarding, written when a processor registers
# @return [void]
def handle_input(line, connections) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
  # Split the line into command and args
  command, *args = line.split

  case command
  when 'NEW_PROCESSOR'
    # Handle new processor registration
    id, domain, port = args
    if port.nil?
      # All three arguments are needed to open the connection
      puts 'NEW_PROCESSOR requires <id> <domain> <port>'
      return
    end
    register_processor(connections, id, domain, port)
  when 'REQUEST'
    # Handle request from orchestrator
    consumer_id, request_type, pcap_chunk_id = args
    forward_to_consumer(connections, consumer_id, "REQUEST #{request_type} #{pcap_chunk_id}")
  when 'UPSTATE'
    # Handle upstate from consumer; for now, we'll just print it out
    consumer_id, state, pcap_chunk_id = args
    puts "Consumer #{consumer_id} is now #{state} for chunk #{pcap_chunk_id}"
  when 'FINISHED'
    # Handle finished from consumer; for now, we'll just print it out
    consumer_id, pcap_chunk_id = args
    puts "Consumer #{consumer_id} has finished chunk #{pcap_chunk_id}"
  when 'CHANGE'
    # Handle change from orchestrator
    consumer_id, state, urgency = args
    forward_to_consumer(connections, consumer_id, "CHANGE #{state} #{urgency}")
  when 'SHUTDOWN'
    # Handle shutdown from orchestrator
    consumer_id, urgency = args
    forward_to_consumer(connections, consumer_id, "SHUTDOWN #{urgency}")
  else
    puts "Unknown command: #{command}"
  end
end

# Opens a connection to a newly announced processor in its own task, stores
# it under +id+ and records the processor in the database.
def register_processor(connections, id, domain, port)
  Async do
    # Store the connection in the hash
    connections[id] = create_socket(domain, port)

    # Add the processor to the database. The table is reached through the DB
    # constant because top-level local variables are not visible in a method.
    DB[:processors].insert(consumer_id: id, ip: domain, port: port)
  end
end

# Writes +message+ to the stored connection for +consumer_id+, or reports
# that no such connection exists instead of failing on nil.
def forward_to_consumer(connections, consumer_id, message)
  connection = connections[consumer_id]
  if connection
    connection.puts message
  else
    puts "No connection for consumer: #{consumer_id}"
  end
end
|