-- Websocket.lua
require("luasql.exasol.luws")
local ExaError = require("ExaError")
local log = require("remotelog")
local websocket_datahandler = require("luasql.exasol.WebsocketDatahandler")
--- Module table. It is also installed as the metatable of the connection
-- objects built by `create()`, so its functions are available as methods.
local Websocket = {}
-- Retry budget that `Websocket.connect()` hands to `connect_with_retry()`.
local CONNECT_RETRY_COUNT<const> = 3
-- Seconds used as the `receive_timeout` websocket option in `Websocket.connect()`
-- and as the timeout passed to `_wait_for_response()` by `send_raw()`.
local RECEIVE_TIMEOUT_SECONDS<const> = 5
--- Build a new connection object that is not yet attached to a websocket.
-- The object owns a freshly created data handler, starts with `closed = false`
-- and uses `Websocket` as its metatable.
-- @return the new connection object
local function create()
    local data_handler = websocket_datahandler:create()
    Websocket.__index = Websocket
    return setmetatable({data_handler = data_handler, closed = false}, Websocket)
end
--- Check whether a connection error is one that is worth retrying.
-- Only messages ending in "failed: connection refused" qualify.
-- @param err error message to inspect
-- @return the matched message (a truthy string) if the error is recoverable,
--         otherwise `nil`
local function recoverable_connection_error(err)
    local refused_pattern<const> = ".*failed: connection refused$"
    local matched = string.match(err, refused_pattern)
    return matched
end
--- Open a websocket connection, trying again when the connection is refused.
-- Each attempt creates a fresh connection object; incoming websocket messages
-- are forwarded to that object's data handler.
-- @param url websocket URL to connect to
-- @param websocket_options options table passed through to `wsopen`
-- @param remaining_retries number of further attempts allowed if this one fails
-- @return the connection object with the opened websocket in field `websocket`
-- @raise E-EDL-1 if connecting fails and either no retries are left or the
--        error is not recoverable
local function connect_with_retry(url, websocket_options, remaining_retries)
    log.trace("Connecting to websocket url %s with %d remaining retries", url, remaining_retries)
    local connection = create()
    local function on_message(conn, opcode, message)
        connection.data_handler:handle_data(conn, opcode, message)
    end
    local websocket, err = wsopen(url, on_message, websocket_options)
    if err ~= nil then
        -- Close whatever the failed attempt returned before deciding how to go on.
        wsclose(websocket)
        local may_retry = remaining_retries > 0 and recoverable_connection_error(err)
        if may_retry then
            local retries_left = remaining_retries - 1
            local warning = ExaError:new("W-EDL-15",
                                         "Websocket connection to {{url}} failed with error {{error}}, "
                                                 .. "remaining retries: {{remaining_retries}}",
                                         {url = url, error = err, remaining_retries = retries_left})
            log.warn(tostring(warning))
            return connect_with_retry(url, websocket_options, retries_left)
        end
        ExaError:new("E-EDL-1", "Error connecting to {{url}}: {{error}}", {url = url, error = err}):raise()
    end
    log.trace("Connected to websocket with result %s", websocket)
    connection.websocket = websocket
    return connection
end
--- Connect to the given websocket URL.
-- The TLS protocol, verify mode and options are read from the connection
-- properties; the receive timeout is `RECEIVE_TIMEOUT_SECONDS`.
-- @param url websocket URL to connect to
-- @param connection_properties object providing `get_tls_protocol()`,
--        `get_tls_verify()` and `get_tls_options()`
-- @return the connection object returned by `connect_with_retry()`
function Websocket.connect(url, connection_properties)
    local tls_protocol = connection_properties:get_tls_protocol()
    local tls_verify = connection_properties:get_tls_verify()
    local tls_options = connection_properties:get_tls_options()
    local websocket_options = {
        receive_timeout = RECEIVE_TIMEOUT_SECONDS,
        ssl_protocol = tls_protocol,
        ssl_verify = tls_verify,
        ssl_options = tls_options
    }
    log.debug("Connecting to '%s' with %d retries", url, CONNECT_RETRY_COUNT)
    return connect_with_retry(url, websocket_options, CONNECT_RETRY_COUNT)
end
--- Poll the websocket until the data handler reports received data, a receive
-- error occurs or the timeout elapses.
-- NOTE(review): elapsed time is measured with `os.clock()`, i.e. CPU time
-- rather than wall-clock time — confirm this is intended.
-- @param timeout_seconds time after which waiting is given up
-- @return `nil` once data was received; otherwise an error object: E-EDL-4 for
--         a receive error (also logged), E-EDL-18 for a timeout
function Websocket:_wait_for_response(timeout_seconds)
    log.trace("Waiting %ds for response", timeout_seconds)
    local started_at<const> = os.clock()
    local attempts = 0
    repeat
        local result, err = wsreceive(self.websocket)
        -- Only a string in the second return value is treated as a failure.
        if type(err) == "string" then
            local details = {error = err, waiting_time = os.clock() - started_at}
            local failure = ExaError:new("E-EDL-4", "Error receiving data while waiting for response "
                    .. "for {{waiting_time}}s: {{error}}", details)
            failure.cause = err
            log.error(tostring(failure))
            return failure
        end
        local elapsed_seconds = os.clock() - started_at
        if self.data_handler:has_received_data() then
            log.trace("Received result %s after %fs and %d tries", result, elapsed_seconds, attempts)
            return nil
        end
        if elapsed_seconds >= timeout_seconds then
            return ExaError:new("E-EDL-18",
                                "Timeout after {{waiting_time}}s and {{try_count}} tries waiting for data, "
                                        .. " last result: {{result}}, last error: {{error}}", {
                waiting_time = elapsed_seconds,
                try_count = attempts,
                result = result,
                error = err
            })
        end
        attempts = attempts + 1
    until false
end
--- Send a payload over the websocket and, unless told otherwise, wait for the
-- response.
-- @param payload data to send
-- @param ignore_response if truthy, return right after sending without waiting
-- @return[1] `nil, nil` when the response is ignored
-- @return[2] the data collected by the data handler and `nil` on success
-- @return[3] `nil` and an error object when waiting for the response failed
-- @raise E-EDL-3 if sending the payload fails
function Websocket:send_raw(payload, ignore_response)
    local expects_response = not ignore_response
    if expects_response then
        -- Arm the data handler before sending so the response is not missed.
        self.data_handler:expect_data()
    end
    -- Second argument 1: presumably the websocket text-frame opcode — confirm against luws.
    local _, send_error = wssend(self.websocket, 1, payload)
    if send_error ~= nil then
        local args = {payload = payload, error = send_error}
        ExaError:new("E-EDL-3", "Error sending payload {{payload}}: {{error}}", args):raise()
        -- raise() is expected not to return; if it does, nothing is returned.
        return
    end
    if ignore_response then
        log.trace("Ignore response after sending payload '%s'", payload)
        return nil, nil
    end
    local receive_error = self:_wait_for_response(RECEIVE_TIMEOUT_SECONDS)
    self.data_handler:expected_data_received()
    if receive_error then
        return nil, receive_error
    end
    return self.data_handler:get_data(), nil
end
--- Close the websocket connection.
-- Calling this on an already closed connection does nothing.
function Websocket:close()
    if not self.closed then
        log.trace("Closing websocket")
        wsclose(self.websocket)
        self.websocket = nil
        self.closed = true
    end
end
return Websocket