40 namespace discord_core_internal {
42 class DiscordCoreAPI_Dll udp_connection {
44 friend class voice_connection;
46 DCA_INLINE udp_connection() {
47 resampleVector.resize(maxBufferSize);
50 DCA_INLINE udp_connection& operator=(udp_connection&& other)
noexcept {
51 resampleVector = std::move(other.resampleVector);
52 outputBuffer = std::move(other.outputBuffer);
53 inputBuffer = std::move(other.inputBuffer);
54 address = std::move(other.address);
55 baseUrl = std::move(other.baseUrl);
56 socket = std::move(other.socket);
57 currentStatus = other.currentStatus;
58 streamType = other.streamType;
59 bytesRead = other.bytesRead;
64 DCA_INLINE udp_connection(udp_connection&& other)
noexcept {
65 *
this = std::move(other);
68 DCA_INLINE udp_connection(
const jsonifier::string& baseUrlNew, uint16_t portNew, stream_type streamTypeNew,
69 std::coroutine_handle<discord_core_api::co_routine<void, false>::promise_type>* token) {
70 resampleVector.resize(maxBufferSize);
71 streamType = streamTypeNew;
74 addrinfo_wrapper hints{};
75 hints->ai_family = AF_INET;
76 hints->ai_socktype = SOCK_DGRAM;
77 hints->ai_protocol = IPPROTO_UDP;
79 if (socket = ::socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); socket.operator SOCKET() == INVALID_SOCKET) {
80 message_printer::printError<print_message_type::websocket>(reportError(
"connect::SOCKET(), to: " + baseUrlNew));
81 currentStatus = connection_status::CONNECTION_Error;
82 socket = INVALID_SOCKET;
86 unique_ptr<char> optVal{ makeUnique<char>(
static_cast<char>(1)) };
87 if (
auto returnData = setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, optVal.get(),
sizeof(optVal)); returnData < 0) {
88 message_printer::printError<print_message_type::websocket>(reportError(
"connect::setsockopt(), to: " + baseUrlNew));
89 currentStatus = connection_status::CONNECTION_Error;
90 socket = INVALID_SOCKET;
96 if (ioctlsocket(socket, FIONBIO, &value02)) {
97 message_printer::printError<print_message_type::websocket>(reportError(
"connect::ioctlsocket(), to: " + baseUrlNew));
98 currentStatus = connection_status::CONNECTION_Error;
99 socket = INVALID_SOCKET;
103 if (fcntl(socket, F_SETFL, fcntl(socket, F_GETFL, 0) | O_NONBLOCK)) {
104 message_printer::printError<print_message_type::websocket>(reportError(
"connect::ioctlsocket(), to: " + baseUrlNew));
105 currentStatus = connection_status::CONNECTION_Error;
106 socket = INVALID_SOCKET;
111 if (streamType == stream_type::none) {
112 if (getaddrinfo(baseUrlNew.data(), jsonifier::toString(portNew).data(), hints, address)) {
113 message_printer::printError<print_message_type::websocket>(reportError(
"connect::getaddrinfo(), to: " + baseUrlNew));
114 currentStatus = connection_status::CONNECTION_Error;
115 socket = INVALID_SOCKET;
118 if (::connect(socket, address->ai_addr,
static_cast<int32_t
>(address->ai_addrlen)) == SOCKET_ERROR) {
119 message_printer::printError<print_message_type::websocket>(reportError(
"connect::connect(), to: " + baseUrlNew));
120 currentStatus = connection_status::CONNECTION_Error;
121 socket = INVALID_SOCKET;
124 }
else if (streamType == stream_type::client) {
125 if (getaddrinfo(baseUrlNew.data(), jsonifier::toString(portNew).data(), hints, address)) {
126 message_printer::printError<print_message_type::websocket>(reportError(
"connect::getaddrinfo(), to: " + baseUrlNew));
127 currentStatus = connection_status::CONNECTION_Error;
128 socket = INVALID_SOCKET;
131 static constexpr jsonifier::string_view connectionString{
"connecting" };
132 jsonifier::string connectionStringNew{
"connecting" };
134 while ((result == 0 || errno == EWOULDBLOCK || errno == EINPROGRESS) && !token->promise().stopRequested()) {
136 sendto(socket, connectionString.data(),
static_cast<int32_t
>(connectionString.size()), 0, address->ai_addr,
static_cast<int32_t
>(address->ai_addrlen));
137 std::this_thread::sleep_for(1ns);
139 socklen_t currentSize{
static_cast<socklen_t
>(address->ai_addrlen) };
141 while ((result == 0 || errno == EWOULDBLOCK || errno == EINPROGRESS) && !token->promise().stopRequested()) {
142 result = recvfrom(socket, connectionStringNew.data(),
static_cast<int32_t
>(connectionString.size()), 0, address->ai_addr, ¤tSize);
143 std::this_thread::sleep_for(1ns);
146 hints->ai_flags = AI_PASSIVE;
147 if (getaddrinfo(
nullptr, jsonifier::toString(portNew).data(), hints, address)) {
148 message_printer::printError<print_message_type::websocket>(reportError(
"connect::getaddrinfo(), to: " + baseUrlNew));
149 currentStatus = connection_status::CONNECTION_Error;
150 socket = INVALID_SOCKET;
153 if (
auto result = bind(socket, address->ai_addr,
static_cast<int32_t
>(address->ai_addrlen)); result != 0) {
154 message_printer::printError<print_message_type::websocket>(reportError(
"connect::bind(), to: " + baseUrlNew));
155 currentStatus = connection_status::CONNECTION_Error;
156 socket = INVALID_SOCKET;
159 jsonifier::string connectionString{};
161 socklen_t currentSize{
static_cast<socklen_t
>(address->ai_addrlen) };
162 connectionString.resize(10);
163 while ((result == 0 || errno == EWOULDBLOCK || errno == EINPROGRESS) && !token->promise().stopRequested()) {
164 result = recvfrom(socket, connectionString.data(),
static_cast<int32_t
>(connectionString.size()), 0, address->ai_addr, ¤tSize);
165 std::this_thread::sleep_for(1ns);
167 connectionString =
"connected1";
169 while ((result == 0 || errno == EWOULDBLOCK || errno == EINPROGRESS) && !token->promise().stopRequested()) {
171 sendto(socket, connectionString.data(),
static_cast<int32_t
>(connectionString.size()), 0, address->ai_addr,
static_cast<int32_t
>(address->ai_addrlen));
172 std::this_thread::sleep_for(1ns);
177 DCA_INLINE connection_status processIO() {
178 if (!areWeStillConnected()) {
179 return currentStatus;
181 pollfd readWriteSet{};
182 readWriteSet.fd =
static_cast<SOCKET
>(socket);
183 if (outputBuffer.getUsedSpace() > 0) {
184 readWriteSet.events = POLLIN | POLLOUT;
186 readWriteSet.events = POLLIN;
188 if (
auto returnValue = poll(&readWriteSet, 1, 0); returnValue == SOCKET_ERROR) {
189 message_printer::printError<print_message_type::websocket>(reportSSLError(
"processIO() 00") + reportError(
"processIO() 00"));
190 currentStatus = connection_status::SOCKET_Error;
191 socket = INVALID_SOCKET;
192 return currentStatus;
193 }
else if (returnValue == 0) {
194 return currentStatus;
196 if (readWriteSet.revents & POLLOUT) {
197 if (!processWriteData()) {
198 message_printer::printError<print_message_type::websocket>(reportSSLError(
"processIO() 01") + reportError(
"processIO() 01"));
199 currentStatus = connection_status::WRITE_Error;
200 socket = INVALID_SOCKET;
201 return currentStatus;
204 if (readWriteSet.revents & POLLIN) {
205 if (!processReadData()) {
206 message_printer::printError<print_message_type::websocket>(reportSSLError(
"processIO() 02") + reportError(
"processIO() 02"));
207 currentStatus = connection_status::READ_Error;
208 socket = INVALID_SOCKET;
209 return currentStatus;
212 if (readWriteSet.revents & POLLERR) {
213 message_printer::printError<print_message_type::websocket>(reportSSLError(
"processIO() 03") + reportError(
"processIO() 03"));
214 currentStatus = connection_status::POLLERR_Error;
215 socket = INVALID_SOCKET;
217 if (readWriteSet.revents & POLLNVAL) {
218 message_printer::printError<print_message_type::websocket>(reportSSLError(
"processIO() 04") + reportError(
"processIO() 04"));
219 currentStatus = connection_status::POLLNVAL_Error;
220 socket = INVALID_SOCKET;
222 if (readWriteSet.revents & POLLHUP) {
223 currentStatus = connection_status::POLLHUP_Error;
224 socket = INVALID_SOCKET;
227 return currentStatus;
230 DCA_INLINE
void writeData(jsonifier::string_view_base<uint8_t> dataToWrite) {
231 if (areWeStillConnected()) {
232 uint64_t remainingBytes{ dataToWrite.size() };
233 while (remainingBytes > 0) {
234 uint64_t amountToCollect{};
235 if (dataToWrite.size() >= maxBufferSize) {
236 amountToCollect = maxBufferSize;
238 amountToCollect = dataToWrite.size();
240 outputBuffer.writeData(dataToWrite.data(), amountToCollect);
241 dataToWrite = jsonifier::string_view_base{ dataToWrite.data() + amountToCollect, dataToWrite.size() - amountToCollect };
242 remainingBytes = dataToWrite.size();
247 DCA_INLINE jsonifier::string_view_base<uint8_t> getInputBuffer() {
248 return inputBuffer.readData();
251 DCA_INLINE
bool areWeStillConnected() {
252 if (socket.operator SOCKET() != INVALID_SOCKET) {
255 fdEvent.events = POLLOUT;
256 int32_t result = poll(&fdEvent, 1, 1);
257 if (result == SOCKET_ERROR || fdEvent.revents & POLLHUP || fdEvent.revents & POLLNVAL || fdEvent.revents & POLLERR) {
258 socket = INVALID_SOCKET;
267 DCA_INLINE
bool processWriteData() {
268 if (outputBuffer.getUsedSpace() > 0) {
269 auto bytesToWrite{ outputBuffer.getCurrentTail()->getUsedSpace() };
270 auto newData = outputBuffer.readData();
271 std::memcpy(resampleVector.data(), newData.data(), newData.size());
272 auto writtenBytes{ sendto(socket, resampleVector.data(),
static_cast<int32_t
>(bytesToWrite), 0, address->ai_addr,
static_cast<int32_t
>(address->ai_addrlen)) };
273 if (writtenBytes <= 0 && errno != EWOULDBLOCK && errno != EINPROGRESS) {
275 }
else if (writtenBytes > 0) {
282 DCA_INLINE
bool processReadData() {
285 if (!inputBuffer.isItFull()) {
286 socklen_t currentSize{
static_cast<socklen_t
>(address->ai_addrlen) };
287 uint64_t bytesToRead{ maxBufferSize };
288 readBytes = recvfrom(
static_cast<SOCKET
>(socket), resampleVector.data(),
static_cast<int32_t
>(bytesToRead), 0, address->ai_addr, ¤tSize);
289 if (readBytes <= 0 && errno != EWOULDBLOCK && errno != EINPROGRESS) {
291 }
else if (readBytes > 0) {
292 inputBuffer.writeData(resampleVector.data(),
static_cast<uint64_t
>(readBytes));
293 bytesRead += readBytes;
297 }
while (readBytes > 0);
301 virtual DCA_INLINE
void handleAudioBuffer() = 0;
303 virtual DCA_INLINE
void disconnect() {
304 socket = INVALID_SOCKET;
305 outputBuffer.clear();
309 virtual DCA_INLINE ~udp_connection() {
314 static constexpr uint64_t maxBufferSize{ (1024 * 16) };
315 jsonifier::vector<char> resampleVector{};
316 ring_buffer<uint8_t, 16> outputBuffer{};
317 ring_buffer<uint8_t, 16> inputBuffer{};
318 connection_status currentStatus{};
319 jsonifier::string baseUrl{};
320 addrinfo_wrapper address{};
322 socket_wrapper socket{};
stream_type
For selecting the type of streamer that the given bot is, one must be one server and one of client pe...
The main namespace for the forward-facing interfaces.