diff --git a/docker/netrave-consumer/Gemfile b/docker/netrave-consumer/Gemfile new file mode 100644 index 0000000..e69de29 diff --git a/docker/netrave-consumer/netrave-consumer.dockerfile b/docker/netrave-consumer/netrave-consumer.dockerfile new file mode 100644 index 0000000..e69de29 diff --git a/docker/netrave-consumer/netrave_consumer.rb b/docker/netrave-consumer/netrave_consumer.rb new file mode 100644 index 0000000..e69de29 diff --git a/docker/netrave-orchestrator/Gemfile b/docker/netrave-orchestrator/Gemfile new file mode 100644 index 0000000..e69de29 diff --git a/docker/netrave-orchestrator/netrave-orchestrator.dockerfile b/docker/netrave-orchestrator/netrave-orchestrator.dockerfile new file mode 100644 index 0000000..e69de29 diff --git a/docker/netrave-orchestrator/netrave_orchestrator.rb b/docker/netrave-orchestrator/netrave_orchestrator.rb new file mode 100644 index 0000000..e69de29 diff --git a/docker/netrave-protohandler/Gemfile b/docker/netrave-protohandler/Gemfile new file mode 100644 index 0000000..75b89af --- /dev/null +++ b/docker/netrave-protohandler/Gemfile @@ -0,0 +1,6 @@ +# frozen_string_literal: true + +source 'https://rubygems.org' + +gem 'async' +gem 'sequel' diff --git a/docker/netrave-protohandler/netrave-protohandler.dockerfile b/docker/netrave-protohandler/netrave-protohandler.dockerfile new file mode 100644 index 0000000..bf15e95 --- /dev/null +++ b/docker/netrave-protohandler/netrave-protohandler.dockerfile @@ -0,0 +1,20 @@ +# Dockerfile +FROM ruby:3.2.2 + +# Set environment variables for IP and Port +ENV LISTEN_IP=0.0.0.0 +ENV LISTEN_PORT=3080 +ENV ORCHESTRATOR_DOMAIN=orchestrator_domain +ENV ORCHESTRATOR_PORT=orchestrator_port + +# Set the working directory in the container +WORKDIR /netrave-protohandler + +# Copy the current directory contents into the container at /netrave-protohandler +COPY . /netrave-protohandler + +# Install any needed packages specified in Gemfile +RUN bundle install + +# Run server.rb when the container launches +CMD ["ruby", "netrave-protohandler.rb"] diff --git a/docker/netrave-protohandler/netrave_protohandler.rb b/docker/netrave-protohandler/netrave_protohandler.rb new file mode 100644 index 0000000..82b217e --- /dev/null +++ b/docker/netrave-protohandler/netrave_protohandler.rb @@ -0,0 +1,128 @@ +# frozen_string_literal: true + +require 'socket' +require 'async' +require 'sequel' +require 'openssl' + +# Set up the database +DB = Sequel.sqlite # In-memory database +processors = DB[:processors] + +listen_ip = ENV['LISTEN_IP'] || '0.0.0.0' +listen_port = ENV['LISTEN_PORT'] || 3080 + +server = TCPServer.new(listen_ip, listen_port) + +# This hash will store the connections to the consumers +connections = {} + +def create_socket(ip, port) # rubocop:disable Metrics/MethodLength + # If the IP address is set to "loopback", replace it with the actual loopback IP address + ip = '127.0.0.1' if ip.downcase == 'loopback' + + if ip =~ Resolv::IPv4::Regex + # If the IP address is an IPv4 address, create an unencrypted socket + TCPSocket.new(ip, port) + else + # If the IP address is a domain name, create an SSL socket + ssl_context = OpenSSL::SSL::SSLContext.new + ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER + + tcp_socket = TCPSocket.new(ip, port) + ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context) + ssl_socket.sync_close = true + ssl_socket.connect + ssl_socket + end +end + +Async do + loop do + Async::Task.new do + client = server.accept + + begin + while (line = client.gets) + # Here we handle each line of input from the client + handle_input(line, connections) + end + ensure + # This code will be executed when the fiber is finished, regardless of whether an exception was raised + client.close + Async::Task.current.stop # Stop the current task + end + end + end +end + +def handle_input(line, connections) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength + # Split the line into command and args + command, *args = line.split + + case command + when 'NEW_PROCESSOR' + # Handle new processor registration + id, domain, port = args + + # Create a new connection to the processor + Async do + processor_connection = create_socket(domain, port) + + # Store the connection in the hash + connections[id] = processor_connection + + # Add the processor to the database + processors.insert(consumer_id: id, ip: domain, port: port) + end + + when 'REQUEST' + # Handle request from orchestrator + consumer_id, request_type, pcap_chunk_id = args + + # Get the connection for this consumer + connection = connections[consumer_id] + + # Send the request to the consumer + connection.puts "REQUEST #{request_type} #{pcap_chunk_id}" + + when 'UPSTATE' + # Handle upstate from consumer + consumer_id, state, pcap_chunk_id = args + + # Do something with the state update... + # For now, we'll just print it out + puts "Consumer #{consumer_id} is now #{state} for chunk #{pcap_chunk_id}" + + when 'FINISHED' + # Handle finished from consumer + consumer_id, pcap_chunk_id = args + + # Do something with the finished message... + # For now, we'll just print it out + puts "Consumer #{consumer_id} has finished chunk #{pcap_chunk_id}" + + when 'CHANGE' + # Handle change from orchestrator + consumer_id, state, urgency = args + + # Get the connection for this consumer + connection = connections[consumer_id] + + # Send the change to the consumer + connection.puts "CHANGE #{state} #{urgency}" + + when 'SHUTDOWN' + # Handle shutdown from orchestrator + consumer_id, urgency = args + + # Get the connection for this consumer + connection = connections[consumer_id] + + # Send the shutdown to the consumer + connection.puts "SHUTDOWN #{urgency}" + + else + puts "Unknown command: #{command}" + end +end diff --git a/lib/utils/packet_capture.rb b/lib/utils/packet_capture.rb deleted file mode 100644 index 413eb82..0000000 --- a/lib/utils/packet_capture.rb +++ /dev/null @@ -1,47 +0,0 @@ -# frozen_string_literal: true - -require 'pcaprub' -require 'socket' -require_relative 'databasemanager' -require_relative 'logg_man' -require_relative 'redis_queue' - -# Class used to capture packets and not much else -class PacketCapture - INTERFACE_NAME = 'netrave0' - - def initialize(queue, logger) - @loggman = logger - @loggman.log_info("Initializing packet capture for #{INTERFACE_NAME}...") - @capture = Pcap.open_live(INTERFACE_NAME, 65_535, true, 1) - @capture.setfilter('') - @loggman.log_info('Packet capture initialized successfully!') - @queue = queue - end - - def start_capture_loop # rubocop:disable Metrics/MethodLength - @loggman.log_info("Starting packet capture loop for #{@interface}...") - packet_count = 0 - begin - @loggman.log_info("Packet capture loop started for #{@interface}...") - @capture.each_packet do |packet| - # Add packet to queue - @queue.push(packet) - @loggman.log_info("Packet #{packet_count += 1} added to queue.") - end - rescue StopIteration - @loggman.log_warn("Packet capture loop stopped for #{@interface}.") - rescue StandardError => e - @loggman.log_fatal("Packet capture loop stopped for #{@interface}: #{e.message}\n#{e.backtrace}", false) - sleep 1 - retry - ensure - @capture.close - end - end - - def stop_capture - @loggman.log_warn("Stopping packet capture loop for #{@interface}...") - @stop_flag = true - end -end