# godot_osc_demo/osc_sender.gd
#
# 120 lines
# 2.6 KiB
# GDScript

# Node that builds binary OSC messages and sends them as UDP packets.
extends Node
class_name OSCSender
## Destination host used as the default argument of `_init()`.
@export var default_address:String = "127.0.0.1"
## Destination port used as the default argument of `_init()`.
@export var default_port: int = 9000
## When true, `_debug()` prints its messages; otherwise they are dropped.
@export var debug: bool = false
# Address most recently stored by `set_destination()`.
var _address: String
# Port most recently stored by `set_destination()`.
var _port: int
# UDP socket that `set_destination()` configures and `send_osc()` writes to.
var _socket = PacketPeerUDP.new()
func _init(dest = [default_address, default_port]) -> void:
	## Configure the initial destination of the sender.
	##
	## `dest` is forwarded unchanged to `set_destination()`; when omitted it is
	## the pair `[default_address, default_port]`.
	# NOTE(review): the default argument reads the exported defaults at
	# construction time; values overridden in the inspector are presumably not
	# applied yet at this point - confirm.
	set_destination(dest)
func _debug(msg) -> void:
	## Print `msg` when the `debug` flag is set; do nothing otherwise.
	if not debug:
		return
	print(msg)
func set_destination(dest) -> void:
	## Point the UDP socket at a new destination.
	##
	## `dest` may be:
	##   - an int: new port, combined with the stored address
	##   - a String: new address, combined with the stored port
	##   - an Array: `[address, port]` (elements past the second are ignored)
	## Any other type, or an Array with fewer than two elements, is reported
	## with `push_error()` and leaves the current destination untouched.
	var new_address = _address
	var new_port = _port
	if dest is int:
		new_port = dest
	elif dest is String:
		new_address = dest
	elif dest is Array:
		if dest.size() < 2:
			push_error("OSCSender: destination array must be [address, port].")
			return
		new_address = dest[0]
		new_port = dest[1]
	else:
		push_error("OSCSender: unsupported destination type (expected int, String or Array).")
		return

	var err = _socket.set_dest_address(new_address, new_port)
	# Store the values even if the socket rejected them, so that a later
	# partial update (port only / address only) is combined with them.
	_address = new_address
	_port = new_port
	if err != OK:
		_debug("Could not set destination %s:%s (error %d)." % [new_address, new_port, err])
func create_message(osc_address: String, arg_types: String = "", values: Array = []) -> Array:
	## Build a binary OSC message.
	##
	## `osc_address` must start with "/". `arg_types` is the OSC type tag
	## string without the leading comma, and `values` holds one entry for each
	## tag that carries data, in order. The tags T, F, N and I carry no data
	## and consume no entry from `values`.
	##
	## Returns `[OK, PackedByteArray]` on success, `[ERR_INVALID_DATA]` when
	## the address does not start with "/", and `[ERR_INVALID_PARAMETER]` when
	## a tag is unsupported, a value is missing, or a fixed-size value has the
	## wrong length.
	const NO_VALUE_TAGS = "TFNI"
	const VALUE_TAGS = "ihfdcsSbtmr"  # must stay in sync with the match below

	if not osc_address.begins_with("/"):
		return [ERR_INVALID_DATA]

	var buf = StreamPeerBuffer.new()
	buf.set_big_endian(true)
	_pack_string(buf, osc_address)
	_pack_string(buf, "," + arg_types)

	var vidx: int = 0
	for typetag in arg_types:
		if NO_VALUE_TAGS.contains(typetag):
			# Meaning is fully expressed by the tag itself; nothing to pack.
			continue
		if not VALUE_TAGS.contains(typetag):
			_debug("Argument type '%s' not supported." % typetag)
			return [ERR_INVALID_PARAMETER]
		if vidx >= values.size():
			_debug("Missing value for argument type '%s'." % typetag)
			return [ERR_INVALID_PARAMETER]

		var value = values[vidx]
		vidx += 1
		match typetag:
			"i", "c":
				buf.put_32(value)
			"h":
				buf.put_64(value)
			"f":
				buf.put_float(value)
			"d":
				buf.put_double(value)
			"s", "S":
				_pack_string(buf, value)
			"b":
				_pack_blob(buf, value)
			"t":
				# Time tag: exactly 8 raw bytes.
				if value.size() != 8:
					_debug("Argument type 't' requires exactly 8 bytes.")
					return [ERR_INVALID_PARAMETER]
				buf.put_data(PackedByteArray(value))
			"m", "r":
				# MIDI message / RGBA colour: exactly 4 raw bytes.
				if value.size() != 4:
					_debug("Argument type '%s' requires exactly 4 bytes." % typetag)
					return [ERR_INVALID_PARAMETER]
				buf.put_data(PackedByteArray(value))
	return [OK, buf.get_data_array()]
func send_osc(osc_address: String, arg_types: String = "", values: Array = [], dest = null) -> Error:
	## Build an OSC message and send it as a single UDP packet.
	##
	## `osc_address`, `arg_types` and `values` are passed to
	## `create_message()`. When `dest` is not null it is applied with
	## `set_destination()` before sending, and stays in effect afterwards.
	##
	## Returns the error from `create_message()` if the message could not be
	## built (nothing is sent and the destination is left unchanged), otherwise
	## the result of the socket's `put_packet()` call.
	var res = create_message(osc_address, arg_types, values)
	if res[0] != OK:
		return res[0]
	if dest != null:
		set_destination(dest)
	# Propagate send failures instead of always reporting OK.
	return _socket.put_packet(res[1])
func _pack_string(buf: StreamPeerBuffer, s: String) -> void:
	## Append `s` to `buf` as ASCII bytes followed by a zero terminator, then
	## zero-fill until the buffer position is a multiple of four.
	buf.put_data(s.to_ascii_buffer())
	buf.put_u8(0)
	# Bytes written past the last 4-byte boundary; zero means already aligned.
	var overhang: int = buf.get_position() % 4
	if overhang != 0:
		for _i in range(4 - overhang):
			buf.put_u8(0)
func _pack_blob(buf, blob) -> void:
	## Append `blob` to `buf` as a 32-bit size followed by the raw bytes, then
	## zero-fill until the buffer position is a multiple of four.
	##
	## A String is encoded as UTF-8 and an Array is converted to a
	## PackedByteArray; any other value is written as given.
	var payload = blob
	match typeof(blob):
		TYPE_STRING:
			payload = blob.to_utf8_buffer()
		TYPE_ARRAY:
			payload = PackedByteArray(blob)

	var byte_count = payload.size()
	assert(byte_count < 2 ** 31) # blob size per spec is _signed_ 32-bit int
	buf.put_32(byte_count)
	buf.put_data(payload)

	# Bytes written past the last 4-byte boundary; zero means already aligned.
	var overhang = buf.get_position() % 4
	if overhang != 0:
		for _i in range(4 - overhang):
			buf.put_u8(0)