Beginning on the meat of the program #4
0
docker/netrave-consumer/Gemfile
Normal file
0
docker/netrave-consumer/Gemfile
Normal file
0
docker/netrave-consumer/netrave-consumer.dockerfile
Normal file
0
docker/netrave-consumer/netrave-consumer.dockerfile
Normal file
0
docker/netrave-consumer/netrave_consumer.rb
Normal file
0
docker/netrave-consumer/netrave_consumer.rb
Normal file
0
docker/netrave-orchestrator/Gemfile
Normal file
0
docker/netrave-orchestrator/Gemfile
Normal file
0
docker/netrave-orchestrator/netrave_orchestrator.rb
Normal file
0
docker/netrave-orchestrator/netrave_orchestrator.rb
Normal file
6
docker/netrave-protohandler/Gemfile
Normal file
6
docker/netrave-protohandler/Gemfile
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
source 'https://rubygems.org'
|
||||||
|
|
||||||
|
gem 'async'
|
||||||
|
gem 'sequel'
|
20
docker/netrave-protohandler/netrave-protohandler.dockerfile
Normal file
20
docker/netrave-protohandler/netrave-protohandler.dockerfile
Normal file
|
@ -0,0 +1,20 @@
|
||||||
|
# Dockerfile
|
||||||
|
FROM ruby:3.2.2
|
||||||
|
|
||||||
|
# Set environment variables for IP and Port
|
||||||
|
ENV LISTEN_IP=0.0.0.0
|
||||||
|
ENV LISTEN_PORT=3080
|
||||||
|
ENV ORCHESTRATOR_DOMAIN=orchestrator_domain
|
||||||
|
ENV ORCHESTRATOR_PORT=orchestrator_port
|
||||||
|
|
||||||
|
# Set the working directory in the container
|
||||||
|
WORKDIR /netrave-protohandler
|
||||||
|
|
||||||
|
# Copy the current directory contents into the container at /netrave-protohandler
|
||||||
|
COPY . /netrave-protohandler
|
||||||
|
|
||||||
|
# Install any needed packages specified in Gemfile
|
||||||
|
RUN bundle install
|
||||||
|
|
||||||
|
# Run server.rb when the container launches
|
||||||
|
CMD ["ruby", "netrave-protohandler.rb"]
|
128
docker/netrave-protohandler/netrave_protohandler.rb
Normal file
128
docker/netrave-protohandler/netrave_protohandler.rb
Normal file
|
@ -0,0 +1,128 @@
|
||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
require 'socket'
|
||||||
|
require 'async'
|
||||||
|
require 'sequel'
|
||||||
|
require 'openssl'
|
||||||
|
|
||||||
|
# Set up the database
|
||||||
|
DB = Sequel.sqlite # In-memory database
|
||||||
|
processors = DB[:processors]
|
||||||
|
|
||||||
|
listen_ip = ENV['LISTEN_IP'] || '0.0.0.0'
|
||||||
|
listen_port = ENV['LISTEN_PORT'] || 3080
|
||||||
|
|
||||||
|
server = TCPServer.new(listen_ip, listen_port)
|
||||||
|
|
||||||
|
# This hash will store the connections to the consumers
|
||||||
|
connections = {}
|
||||||
|
|
||||||
|
def create_socket(ip, port) # rubocop:disable Metrics/MethodLength
|
||||||
|
# If the IP address is set to "loopback", replace it with the actual loopback IP address
|
||||||
|
ip = '127.0.0.1' if ip.downcase == 'loopback'
|
||||||
|
|
||||||
|
if ip =~ Resolv::IPv4::Regex
|
||||||
|
# If the IP address is an IPv4 address, create an unencrypted socket
|
||||||
|
TCPSocket.new(ip, port)
|
||||||
|
else
|
||||||
|
# If the IP address is a domain name, create an SSL socket
|
||||||
|
ssl_context = OpenSSL::SSL::SSLContext.new
|
||||||
|
ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
|
||||||
|
|
||||||
|
tcp_socket = TCPSocket.new(ip, port)
|
||||||
|
ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context)
|
||||||
|
ssl_socket.sync_close = true
|
||||||
|
ssl_socket.connect
|
||||||
|
ssl_socket
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
Async do
|
||||||
|
loop do
|
||||||
|
Async::Task.new do
|
||||||
|
client = server.accept
|
||||||
|
|
||||||
|
begin
|
||||||
|
while (line = client.gets)
|
||||||
|
# Here we handle each line of input from the client
|
||||||
|
handle_input(line, connections)
|
||||||
|
end
|
||||||
|
ensure
|
||||||
|
# This code will be executed when the fiber is finished, regardless of whether an exception was raised
|
||||||
|
client.close
|
||||||
|
Async::Task.current.stop # Stop the current task
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_input(line, connections) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
|
||||||
|
# Split the line into command and args
|
||||||
|
command, *args = line.split
|
||||||
|
|
||||||
|
case command
|
||||||
|
when 'NEW_PROCESSOR'
|
||||||
|
# Handle new processor registration
|
||||||
|
id, domain, port = args
|
||||||
|
|
||||||
|
# Create a new connection to the processor
|
||||||
|
Async do
|
||||||
|
processor_connection = create_socket(domain, port)
|
||||||
|
|
||||||
|
# Store the connection in the hash
|
||||||
|
connections[id] = processor_connection
|
||||||
|
|
||||||
|
# Add the processor to the database
|
||||||
|
processors.insert(consumer_id: id, ip: domain, port: port)
|
||||||
|
end
|
||||||
|
|
||||||
|
when 'REQUEST'
|
||||||
|
# Handle request from orchestrator
|
||||||
|
consumer_id, request_type, pcap_chunk_id = args
|
||||||
|
|
||||||
|
# Get the connection for this consumer
|
||||||
|
connection = connections[consumer_id]
|
||||||
|
|
||||||
|
# Send the request to the consumer
|
||||||
|
connection.puts "REQUEST #{request_type} #{pcap_chunk_id}"
|
||||||
|
|
||||||
|
when 'UPSTATE'
|
||||||
|
# Handle upstate from consumer
|
||||||
|
consumer_id, state, pcap_chunk_id = args
|
||||||
|
|
||||||
|
# Do something with the state update...
|
||||||
|
# For now, we'll just print it out
|
||||||
|
puts "Consumer #{consumer_id} is now #{state} for chunk #{pcap_chunk_id}"
|
||||||
|
|
||||||
|
when 'FINISHED'
|
||||||
|
# Handle finished from consumer
|
||||||
|
consumer_id, pcap_chunk_id = args
|
||||||
|
|
||||||
|
# Do something with the finished message...
|
||||||
|
# For now, we'll just print it out
|
||||||
|
puts "Consumer #{consumer_id} has finished chunk #{pcap_chunk_id}"
|
||||||
|
|
||||||
|
when 'CHANGE'
|
||||||
|
# Handle change from orchestrator
|
||||||
|
consumer_id, state, urgency = args
|
||||||
|
|
||||||
|
# Get the connection for this consumer
|
||||||
|
connection = connections[consumer_id]
|
||||||
|
|
||||||
|
# Send the change to the consumer
|
||||||
|
connection.puts "CHANGE #{state} #{urgency}"
|
||||||
|
|
||||||
|
when 'SHUTDOWN'
|
||||||
|
# Handle shutdown from orchestrator
|
||||||
|
consumer_id, urgency = args
|
||||||
|
|
||||||
|
# Get the connection for this consumer
|
||||||
|
connection = connections[consumer_id]
|
||||||
|
|
||||||
|
# Send the shutdown to the consumer
|
||||||
|
connection.puts "SHUTDOWN #{urgency}"
|
||||||
|
|
||||||
|
else
|
||||||
|
puts "Unknown command: #{command}"
|
||||||
|
end
|
||||||
|
end
|
|
@ -1,47 +0,0 @@
|
||||||
# frozen_string_literal: true
|
|
||||||
|
|
||||||
require 'pcaprub'
|
|
||||||
require 'socket'
|
|
||||||
require_relative 'databasemanager'
|
|
||||||
require_relative 'logg_man'
|
|
||||||
require_relative 'redis_queue'
|
|
||||||
|
|
||||||
# Class used to capture packets and not much else
|
|
||||||
class PacketCapture
|
|
||||||
INTERFACE_NAME = 'netrave0'
|
|
||||||
|
|
||||||
def initialize(queue, logger)
|
|
||||||
@loggman = logger
|
|
||||||
@loggman.log_info("Initializing packet capture for #{INTERFACE_NAME}...")
|
|
||||||
@capture = Pcap.open_live(INTERFACE_NAME, 65_535, true, 1)
|
|
||||||
@capture.setfilter('')
|
|
||||||
@loggman.log_info('Packet capture initialized successfully!')
|
|
||||||
@queue = queue
|
|
||||||
end
|
|
||||||
|
|
||||||
def start_capture_loop # rubocop:disable Metrics/MethodLength
|
|
||||||
@loggman.log_info("Starting packet capture loop for #{@interface}...")
|
|
||||||
packet_count = 0
|
|
||||||
begin
|
|
||||||
@loggman.log_info("Packet capture loop started for #{@interface}...")
|
|
||||||
@capture.each_packet do |packet|
|
|
||||||
# Add packet to queue
|
|
||||||
@queue.push(packet)
|
|
||||||
@loggman.log_info("Packet #{packet_count += 1} added to queue.")
|
|
||||||
end
|
|
||||||
rescue StopIteration
|
|
||||||
@loggman.log_warn("Packet capture loop stopped for #{@interface}.")
|
|
||||||
rescue StandardError => e
|
|
||||||
@loggman.log_fatal("Packet capture loop stopped for #{@interface}: #{e.message}\n#{e.backtrace}", false)
|
|
||||||
sleep 1
|
|
||||||
retry
|
|
||||||
ensure
|
|
||||||
@capture.close
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def stop_capture
|
|
||||||
@loggman.log_warn("Stopping packet capture loop for #{@interface}...")
|
|
||||||
@stop_flag = true
|
|
||||||
end
|
|
||||||
end
|
|
Loading…
Reference in New Issue
Block a user