140860e1ea
Fix the proto_handler to separate request handling from input handling. Make the proto_handler just pass messages between; we don't need to actually process anything. Add some logic to determine whether the request is coming from the orchestrator or the consumer. Some other various proto_handler fixes.
133 lines
4.0 KiB
Ruby
133 lines
4.0 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require 'openssl'
require 'resolv'
require 'securerandom'
require 'socket'

require 'async'
require 'sequel'
|
|
|
|
# In-memory SQLite database backing the handler's routing tables
DB = Sequel.sqlite

# Schema: processors, looked up by UUID
DB.create_table(:processors) do
  primary_key :proto_handler_id
  String :uuid
  String :domain
  Integer :port
end

# Schema: orchestrators, stored as domain/port endpoints
DB.create_table(:orchestrators) do
  primary_key :id
  String :domain
  Integer :port
end

# Schema: blacklist of rejected UUIDs
DB.create_table(:blacklist) do
  primary_key :id
  String :uuid
end

# One dataset handle per table, in the same order as the names on the left
processors, orchestrators, blacklist =
  %i[processors orchestrators blacklist].map { |table| DB[table] }

# Bind address and port; both may be overridden through the environment
listen_ip = ENV.fetch('LISTEN_IP', '0.0.0.0')
listen_port = ENV.fetch('LISTEN_PORT', 3080)

server = TCPServer.new(listen_ip, listen_port)

# Maps each recently connected UUID to its last validation timestamp
recently_connected = {}
|
|
|
|
# Opens an outbound connection to +ip+:+port+.
#
# The literal name "loopback" (any letter case) is mapped to 127.0.0.1.
# An IPv4 literal gets a plain TCP socket; anything else is treated as a host
# name and wrapped in TLS with peer verification.
#
# @param ip [String] IPv4 literal, host name, or "loopback"
# @param port [Integer, String] destination port
# @return [TCPSocket, OpenSSL::SSL::SSLSocket] a connected socket
# @raise [SystemCallError, SocketError] when the TCP connection fails
# @raise [OpenSSL::SSL::SSLError] when the TLS handshake fails
def create_socket(ip, port) # rubocop:disable Metrics/MethodLength
  ip = '127.0.0.1' if ip.casecmp?('loopback')
  return TCPSocket.new(ip, port) if Resolv::IPv4::Regex.match?(ip)

  ssl_context = OpenSSL::SSL::SSLContext.new
  ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
  # Add certificate and key configuration here if needed
  # NOTE(review): consider ssl_context.verify_hostname = true so the peer
  # certificate is also checked against the host name — confirm with deployment.

  tcp_socket = TCPSocket.new(ip, port)
  begin
    ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context)
    ssl_socket.sync_close = true # Closing the TLS socket also closes the TCP socket
    ssl_socket.hostname = ip # Send the host name via SNI
    ssl_socket.connect
    ssl_socket
  rescue StandardError
    # The handshake failed, so nobody else owns the TCP socket: close it here
    tcp_socket.close
    raise
  end
end
|
|
|
|
# Authorizes one request line and passes it on to handle_command.
#
# The line is split on whitespace into a command and its arguments. For every
# command except REGISTER the first argument is taken as the sender's UUID and
# removed from the argument list before the command is passed on.
#
# A UUID validated less than 120 seconds ago is accepted without another
# processors lookup. A UUID that is unknown to +processors+ is added to the
# blacklist. A missing UUID (including a nil or blank line) is rejected
# without touching the blacklist or the cache.
#
# @param line [String, nil] raw request line; nil is treated as empty
# @param processors [#where] dataset of known processors, queried by uuid
# @param orchestrators [Object] passed through to handle_command
# @param blacklist [#where, #insert] dataset of rejected UUIDs
# @param recently_connected [Hash] UUID => Time of last validation (mutated)
# @return ['TERMINATE'] when the UUID is blacklisted
# @return ['ERROR Unrecognized UUID'] when the UUID is missing or unknown
# @return [Object] otherwise, whatever handle_command returns
def handle_input(line, processors, orchestrators, blacklist, recently_connected) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
  command, *args = line.to_s.split
  registering = command == 'REGISTER'
  uuid = args.shift unless registering

  if uuid
    # Check if the UUID is blacklisted
    return 'TERMINATE' if blacklist.where(uuid:).count.positive?

    last_seen = recently_connected[uuid]
    recently_validated = last_seen && Time.now - last_seen < 120

    unless recently_validated
      if processors.where(uuid:).count.zero?
        # Unknown UUID: remember it so later requests are cut off early
        blacklist.insert(uuid:)
        return 'ERROR Unrecognized UUID'
      end

      # UUID is valid, update the recently connected cache
      recently_connected[uuid] = Time.now
    end
  elsif !registering
    # No UUID supplied: reject, but never store nil in the blacklist or cache
    return 'ERROR Unrecognized UUID'
  end

  handle_command(command, args, processors, orchestrators)
end
|
|
|
|
# Forwards a command to the endpoint it is addressed to.
#
# Commands starting with "ORCHESTRATOR" have everything up to the first
# underscore stripped; REQUEST, UPSTATE and FINISHED are then sent to the
# first row of +orchestrators+. CHANGE and SHUTDOWN are sent to the processor
# whose uuid equals the last argument. Anything else is reported on stdout.
#
# Dataset rows are plain records, not sockets, so each message is delivered
# over a connection opened with create_socket from the row's :domain/:port.
#
# @param command [String] command word
# @param args [Array<String>] remaining arguments, sent space-separated
# @param processors [#where] dataset of processors, queried by uuid
# @param orchestrators [#first] dataset of orchestrators
# @return [nil]
# @raise [StandardError] whatever create_socket raises when connecting fails
def handle_command(command, args, processors, orchestrators) # rubocop:disable Metrics/MethodLength
  if command.start_with?('ORCHESTRATOR')
    orchestrator_command = command.split('_', 2).last
    case orchestrator_command
    when 'REQUEST', 'UPSTATE', 'FINISHED'
      # Assuming you want to pass to the first available orchestrator
      relay_message(orchestrators.first, "#{orchestrator_command} #{args.join(' ')}", 'orchestrator')
    else
      puts "Unknown orchestrator command: #{orchestrator_command}"
    end
  else
    case command
    when 'CHANGE', 'SHUTDOWN'
      # Assuming UUID is the last argument for CHANGE and SHUTDOWN
      consumer = processors.where(uuid: args.last).first
      relay_message(consumer, "#{command} #{args.join(' ')}", 'consumer')
    else
      puts "Unknown command: #{command}"
    end
  end
end

# Sends one line to the endpoint described by a dataset row, then disconnects.
#
# @param endpoint [Hash, nil] row with :domain and :port, or nil if none matched
# @param message [String] line to send
# @param role [String] label used in the "no endpoint" notice
# @return [nil]
def relay_message(endpoint, message, role)
  if endpoint.nil?
    puts "No #{role} available to receive: #{message}"
    return
  end

  socket = create_socket(endpoint[:domain], endpoint[:port])
  begin
    socket.puts message
  ensure
    socket.close
  end
  nil
end
|
|
|
|
# Records an orchestrator endpoint taken from a registration line.
#
# The line is split on whitespace; the first token is ignored, the second is
# the domain and the third the port. The port must be a base-10 integer in
# 1..65535. Malformed lines are reported on stdout and nothing is stored.
# The row id is left to the table's auto-incrementing primary key.
#
# @param line [String, nil] registration line; nil is treated as empty
# @param orchestrators [#insert] dataset receiving the { domain:, port: } row
# @return [nil]
def register_orchestrator(line, orchestrators)
  _, domain, port = line.to_s.split
  port_number = Integer(port, 10, exception: false) if port
  valid_port = port_number&.between?(1, 65_535)

  if domain.nil? || !valid_port
    puts "Invalid orchestrator registration: #{line.inspect}"
    return
  end

  orchestrators.insert(domain:, port: port_number)
  puts "Orchestrator registered with domain: #{domain}, port: #{port_number}"
end
|
|
|
|
# Accept loop: every accepted connection is served in its own child task so
# that one slow client does not hold up the next accept.
Async do |task|
  loop do
    client = server.accept

    task.async do
      line = client.gets
      next unless line # Peer closed the connection without sending a line

      # Determine if the connection is from an orchestrator for registration
      # NOTE(review): is_orchestrator_registration? is not defined in the
      # visible source — confirm it exists before relying on this branch.
      if is_orchestrator_registration?(line)
        register_orchestrator(line, orchestrators)
      else
        # Here we handle the line of input from the client
        response = handle_input(line, processors, orchestrators, blacklist, recently_connected)
        # Status strings such as TERMINATE or ERROR are reported back to the sender
        client.puts(response) if response.is_a?(String)
      end
    ensure
      # Runs whether or not the handler raised, so the socket is always released
      client.close
    end
  end
end
|