2023-07-30 21:04:30 -06:00
|
|
|
# frozen_string_literal: true
|
|
|
|
|
|
|
|
require 'socket'
|
|
|
|
require 'async'
|
|
|
|
require 'sequel'
|
|
|
|
require 'openssl'
|
2023-08-10 15:01:48 -06:00
|
|
|
require 'securerandom'
|
2023-07-30 21:04:30 -06:00
|
|
|
|
|
|
|
# Set up the database
|
|
|
|
DB = Sequel.sqlite # In-memory database
|
2023-08-10 15:01:48 -06:00
|
|
|
|
|
|
|
# Create processors table
|
|
|
|
DB.create_table :processors do
|
|
|
|
primary_key :proto_handler_id
|
|
|
|
String :uuid
|
|
|
|
String :domain
|
|
|
|
Integer :port
|
|
|
|
end
|
|
|
|
|
|
|
|
# Create orchestrators table
|
|
|
|
DB.create_table :orchestrators do
|
|
|
|
primary_key :id
|
|
|
|
String :domain
|
|
|
|
Integer :port
|
|
|
|
end
|
|
|
|
|
|
|
|
# Create blacklist table
|
|
|
|
DB.create_table :blacklist do
|
|
|
|
primary_key :id
|
|
|
|
String :uuid
|
|
|
|
end
|
|
|
|
|
2023-07-30 21:04:30 -06:00
|
|
|
processors = DB[:processors]
|
2023-08-10 15:01:48 -06:00
|
|
|
orchestrators = DB[:orchestrators]
|
|
|
|
blacklist = DB[:blacklist]
|
2023-07-30 21:04:30 -06:00
|
|
|
|
|
|
|
listen_ip = ENV['LISTEN_IP'] || '0.0.0.0'
|
|
|
|
listen_port = ENV['LISTEN_PORT'] || 3080
|
|
|
|
|
|
|
|
server = TCPServer.new(listen_ip, listen_port)
|
|
|
|
|
2023-08-10 15:01:48 -06:00
|
|
|
# This hash will store the recently connected UUIDs with their last validation timestamp
|
|
|
|
recently_connected = {}
|
2023-07-30 21:04:30 -06:00
|
|
|
|
|
|
|
def create_socket(ip, port) # rubocop:disable Metrics/MethodLength
|
|
|
|
ip = '127.0.0.1' if ip.downcase == 'loopback'
|
|
|
|
if ip =~ Resolv::IPv4::Regex
|
|
|
|
TCPSocket.new(ip, port)
|
|
|
|
else
|
|
|
|
ssl_context = OpenSSL::SSL::SSLContext.new
|
|
|
|
ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
|
2023-08-10 15:37:51 -06:00
|
|
|
# Add certificate and key configuration here if needed
|
2023-07-30 21:04:30 -06:00
|
|
|
tcp_socket = TCPSocket.new(ip, port)
|
|
|
|
ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context)
|
|
|
|
ssl_socket.sync_close = true
|
|
|
|
ssl_socket.connect
|
|
|
|
ssl_socket
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
2023-08-10 15:37:51 -06:00
|
|
|
def handle_input(line, processors, orchestrators, blacklist, recently_connected) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
|
2023-08-10 15:01:48 -06:00
|
|
|
command, *args = line.split
|
|
|
|
uuid = args.shift if command != 'REGISTER'
|
2023-07-30 21:04:30 -06:00
|
|
|
|
2023-08-10 15:01:48 -06:00
|
|
|
# Check if the UUID is blacklisted
|
2023-08-10 15:37:51 -06:00
|
|
|
return 'TERMINATE' if blacklist.where(uuid:).count.positive?
|
2023-07-30 21:04:30 -06:00
|
|
|
|
2023-08-10 15:01:48 -06:00
|
|
|
# Check if the UUID is in the recently connected cache
|
|
|
|
if recently_connected[uuid] && Time.now - recently_connected[uuid] < 120
|
|
|
|
# UUID is recently connected and within 2 minutes, no need to re-validate
|
2023-08-10 15:37:51 -06:00
|
|
|
elsif command != 'REGISTER' && (uuid.nil? || processors.where(uuid:).count.zero?)
|
2023-08-10 15:01:48 -06:00
|
|
|
# UUID is not recently connected or is invalid, add to blacklist
|
|
|
|
blacklist.insert(uuid:)
|
2023-08-10 15:37:51 -06:00
|
|
|
return 'ERROR Unrecognized UUID'
|
2023-08-10 15:01:48 -06:00
|
|
|
else
|
|
|
|
# UUID is valid, update the recently connected cache
|
|
|
|
recently_connected[uuid] = Time.now
|
|
|
|
end
|
2023-07-30 21:04:30 -06:00
|
|
|
|
2023-08-10 15:37:51 -06:00
|
|
|
handle_command(command, args, processors, orchestrators)
|
|
|
|
end
|
2023-07-30 21:04:30 -06:00
|
|
|
|
2023-08-10 15:37:51 -06:00
|
|
|
def handle_command(command, args, processors, orchestrators) # rubocop:disable Metrics/MethodLength
|
|
|
|
if command.start_with?('ORCHESTRATOR')
|
|
|
|
orchestrator_command = command.split('_', 2).last
|
|
|
|
orchestrator = orchestrators.first # Assuming you want to pass to the first available orchestrator
|
|
|
|
case orchestrator_command
|
|
|
|
when 'REQUEST', 'UPSTATE', 'FINISHED'
|
|
|
|
orchestrator.puts "#{orchestrator_command} #{args.join(' ')}"
|
|
|
|
else
|
|
|
|
puts "Unknown orchestrator command: #{orchestrator_command}"
|
|
|
|
end
|
2023-07-30 21:04:30 -06:00
|
|
|
else
|
2023-08-10 15:37:51 -06:00
|
|
|
consumer = processors.where(uuid: args.last).first # Assuming UUID is the last argument for CHANGE and SHUTDOWN
|
|
|
|
case command
|
|
|
|
when 'CHANGE', 'SHUTDOWN'
|
|
|
|
consumer.puts "#{command} #{args.join(' ')}"
|
|
|
|
else
|
|
|
|
puts "Unknown command: #{command}"
|
|
|
|
end
|
2023-07-30 21:04:30 -06:00
|
|
|
end
|
|
|
|
end
|
2023-08-10 15:01:48 -06:00
|
|
|
|
|
|
|
def register_orchestrator(line, orchestrators)
|
2023-08-10 15:37:51 -06:00
|
|
|
_, domain, port = line.split
|
2023-08-10 15:01:48 -06:00
|
|
|
id = orchestrators.max(:id).to_i + 1
|
|
|
|
orchestrators.insert(id:, domain:, port:)
|
|
|
|
puts "Orchestrator registered with domain: #{domain}, port: #{port}"
|
|
|
|
end
|
|
|
|
|
|
|
|
Async do
|
|
|
|
loop do
|
|
|
|
Async::Task.new do
|
|
|
|
client = server.accept
|
|
|
|
|
|
|
|
begin
|
|
|
|
line = client.gets
|
|
|
|
# Determine if the connection is from an orchestrator for registration
|
2023-08-10 15:37:51 -06:00
|
|
|
if is_orchestrator_registration?(line)
|
2023-08-10 15:01:48 -06:00
|
|
|
register_orchestrator(line, orchestrators)
|
|
|
|
else
|
|
|
|
# Here we handle each line of input from the client
|
2023-08-10 15:37:51 -06:00
|
|
|
handle_input(line, processors, blacklist, recently_connected)
|
2023-08-10 15:01:48 -06:00
|
|
|
end
|
|
|
|
ensure
|
|
|
|
# This code will be executed when the fiber is finished, regardless of whether an exception was raised
|
|
|
|
client.close
|
|
|
|
Async::Task.current.stop
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|