DiscordCoreAPI
A Discord bot library written in C++, with custom asynchronous coroutines.
Loading...
Searching...
No Matches
UDPConnection.hpp
Go to the documentation of this file.
1/*
2 MIT License
3
4 DiscordCoreAPI, A bot library for Discord, written in C++, and featuring explicit multithreading through the usage of custom, asynchronous C++ CoRoutines.
5
6 Copyright 2022, 2023 Chris M. (RealTimeChris)
7
8 Permission is hereby granted, free of charge, to any person obtaining a copy
9 of this software and associated documentation files (the "Software"), to deal
10 in the Software without restriction, including without limitation the rights
11 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12 copies of the Software, and to permit persons to whom the Software is
13 furnished to do so, subject to the following conditions:
14
15 The above copyright notice and this permission notice shall be included in all
16 copies or substantial portions of the Software.
17
18 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24 SOFTWARE.
25*/
26/// UDPConnection.hpp - Header file for the "udp connection" stuff.
27/// Dec 12, 2021
28/// https://discordcoreapi.com
29/// \file UDPConnection.hpp
30#pragma once
31
37
38namespace discord_core_api {
39
40 namespace discord_core_internal {
41
42 class DiscordCoreAPI_Dll udp_connection {
43 public:
44 friend class voice_connection;
45
46 inline udp_connection() {
47 resampleVector.resize(maxBufferSize);
48 }
49
50 inline udp_connection& operator=(udp_connection&& other) noexcept {
51 resampleVector = std::move(other.resampleVector);
52 outputBuffer = std::move(other.outputBuffer);
53 inputBuffer = std::move(other.inputBuffer);
54 address = std::move(other.address);
55 baseUrl = std::move(other.baseUrl);
56 socket = std::move(other.socket);
57 currentStatus = other.currentStatus;
58 streamType = other.streamType;
59 bytesRead = other.bytesRead;
60 port = other.port;
61 return *this;
62 };
63
64 inline udp_connection(udp_connection&& other) noexcept {
65 *this = std::move(other);
66 };
67
68 inline udp_connection(const jsonifier::string& baseUrlNew, uint16_t portNew, stream_type streamTypeNew,
69 std::coroutine_handle<discord_core_api::co_routine<void, false>::promise_type>* token) {
70 resampleVector.resize(maxBufferSize);
71 streamType = streamTypeNew;
72 baseUrl = baseUrlNew;
73 port = portNew;
74 addrinfo_wrapper hints{};
75 hints->ai_family = AF_INET;
76 hints->ai_socktype = SOCK_DGRAM;
77 hints->ai_protocol = IPPROTO_UDP;
78
79 if (socket = ::socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); socket.operator SOCKET() == INVALID_SOCKET) {
80 message_printer::printError<print_message_type::websocket>(reportError("connect::SOCKET(), to: " + baseUrlNew));
81 currentStatus = connection_status::CONNECTION_Error;
82 socket = INVALID_SOCKET;
83 return;
84 }
85
86 unique_ptr<char> optVal{ makeUnique<char>(static_cast<char>(1)) };
87 if (auto returnData = setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, optVal.get(), sizeof(optVal)); returnData < 0) {
88 message_printer::printError<print_message_type::websocket>(reportError("connect::setsockopt(), to: " + baseUrlNew));
89 currentStatus = connection_status::CONNECTION_Error;
90 socket = INVALID_SOCKET;
91 return;
92 }
93
94#if defined _WIN32
95 u_long value02{ 1 };
96 if (ioctlsocket(socket, FIONBIO, &value02)) {
97 message_printer::printError<print_message_type::websocket>(reportError("connect::ioctlsocket(), to: " + baseUrlNew));
98 currentStatus = connection_status::CONNECTION_Error;
99 socket = INVALID_SOCKET;
100 return;
101 }
102#else
103 if (fcntl(socket, F_SETFL, fcntl(socket, F_GETFL, 0) | O_NONBLOCK)) {
104 message_printer::printError<print_message_type::websocket>(reportError("connect::ioctlsocket(), to: " + baseUrlNew));
105 currentStatus = connection_status::CONNECTION_Error;
106 socket = INVALID_SOCKET;
107 return;
108 }
109#endif
110
111 if (streamType == stream_type::none) {
112 if (getaddrinfo(baseUrlNew.data(), jsonifier::toString(portNew).data(), hints, address)) {
113 message_printer::printError<print_message_type::websocket>(reportError("connect::getaddrinfo(), to: " + baseUrlNew));
114 currentStatus = connection_status::CONNECTION_Error;
115 socket = INVALID_SOCKET;
116 return;
117 }
118 if (::connect(socket, address->ai_addr, static_cast<int32_t>(address->ai_addrlen)) == SOCKET_ERROR) {
119 message_printer::printError<print_message_type::websocket>(reportError("connect::connect(), to: " + baseUrlNew));
120 currentStatus = connection_status::CONNECTION_Error;
121 socket = INVALID_SOCKET;
122 return;
123 }
124 } else if (streamType == stream_type::client) {
125 if (getaddrinfo(baseUrlNew.data(), jsonifier::toString(portNew).data(), hints, address)) {
126 message_printer::printError<print_message_type::websocket>(reportError("connect::getaddrinfo(), to: " + baseUrlNew));
127 currentStatus = connection_status::CONNECTION_Error;
128 socket = INVALID_SOCKET;
129 return;
130 }
131 static constexpr jsonifier::string_view connectionString{ "connecting" };
132 jsonifier::string connectionStringNew{ "connecting" };
133 int32_t result{};
134 while ((result == 0 || errno == EWOULDBLOCK || errno == EINPROGRESS) && !token->promise().stopRequested()) {
135 result =
136 sendto(socket, connectionString.data(), static_cast<int32_t>(connectionString.size()), 0, address->ai_addr, static_cast<int32_t>(address->ai_addrlen));
137 std::this_thread::sleep_for(1ns);
138 }
139 socklen_t currentSize{ static_cast<socklen_t>(address->ai_addrlen) };
140 result = 0;
141 while ((result == 0 || errno == EWOULDBLOCK || errno == EINPROGRESS) && !token->promise().stopRequested()) {
142 result = recvfrom(socket, connectionStringNew.data(), static_cast<int32_t>(connectionString.size()), 0, address->ai_addr, &currentSize);
143 std::this_thread::sleep_for(1ns);
144 }
145 } else {
146 hints->ai_flags = AI_PASSIVE;
147 if (getaddrinfo(nullptr, jsonifier::toString(portNew).data(), hints, address)) {
148 message_printer::printError<print_message_type::websocket>(reportError("connect::getaddrinfo(), to: " + baseUrlNew));
149 currentStatus = connection_status::CONNECTION_Error;
150 socket = INVALID_SOCKET;
151 return;
152 }
153 if (auto result = bind(socket, address->ai_addr, static_cast<int32_t>(address->ai_addrlen)); result != 0) {
154 message_printer::printError<print_message_type::websocket>(reportError("connect::bind(), to: " + baseUrlNew));
155 currentStatus = connection_status::CONNECTION_Error;
156 socket = INVALID_SOCKET;
157 return;
158 }
159 jsonifier::string connectionString{};
160 int32_t result{};
161 socklen_t currentSize{ static_cast<socklen_t>(address->ai_addrlen) };
162 connectionString.resize(10);
163 while ((result == 0 || errno == EWOULDBLOCK || errno == EINPROGRESS) && !token->promise().stopRequested()) {
164 result = recvfrom(socket, connectionString.data(), static_cast<int32_t>(connectionString.size()), 0, address->ai_addr, &currentSize);
165 std::this_thread::sleep_for(1ns);
166 }
167 connectionString = "connected1";
168 result = 0;
169 while ((result == 0 || errno == EWOULDBLOCK || errno == EINPROGRESS) && !token->promise().stopRequested()) {
170 result =
171 sendto(socket, connectionString.data(), static_cast<int32_t>(connectionString.size()), 0, address->ai_addr, static_cast<int32_t>(address->ai_addrlen));
172 std::this_thread::sleep_for(1ns);
173 }
174 }
175 }
176
177 inline connection_status processIO() {
178 if (!areWeStillConnected()) {
179 return currentStatus;
180 };
181 pollfd readWriteSet{};
182 readWriteSet.fd = static_cast<SOCKET>(socket);
183 if (outputBuffer.getUsedSpace() > 0) {
184 readWriteSet.events = POLLIN | POLLOUT;
185 } else {
186 readWriteSet.events = POLLIN;
187 }
188 if (auto returnValue = poll(&readWriteSet, 1, 0); returnValue == SOCKET_ERROR) {
189 message_printer::printError<print_message_type::websocket>(reportSSLError("processIO() 00") + reportError("processIO() 00"));
190 currentStatus = connection_status::SOCKET_Error;
191 socket = INVALID_SOCKET;
192 return currentStatus;
193 } else if (returnValue == 0) {
194 return currentStatus;
195 } else {
196 if (readWriteSet.revents & POLLOUT) {
197 if (!processWriteData()) {
198 message_printer::printError<print_message_type::websocket>(reportSSLError("processIO() 01") + reportError("processIO() 01"));
199 currentStatus = connection_status::WRITE_Error;
200 socket = INVALID_SOCKET;
201 return currentStatus;
202 }
203 }
204 if (readWriteSet.revents & POLLIN) {
205 if (!processReadData()) {
206 message_printer::printError<print_message_type::websocket>(reportSSLError("processIO() 02") + reportError("processIO() 02"));
207 currentStatus = connection_status::READ_Error;
208 socket = INVALID_SOCKET;
209 return currentStatus;
210 }
211 }
212 if (readWriteSet.revents & POLLERR) {
213 message_printer::printError<print_message_type::websocket>(reportSSLError("processIO() 03") + reportError("processIO() 03"));
214 currentStatus = connection_status::POLLERR_Error;
215 socket = INVALID_SOCKET;
216 }
217 if (readWriteSet.revents & POLLNVAL) {
218 message_printer::printError<print_message_type::websocket>(reportSSLError("processIO() 04") + reportError("processIO() 04"));
219 currentStatus = connection_status::POLLNVAL_Error;
220 socket = INVALID_SOCKET;
221 }
222 if (readWriteSet.revents & POLLHUP) {
223 currentStatus = connection_status::POLLHUP_Error;
224 socket = INVALID_SOCKET;
225 }
226 }
227 return currentStatus;
228 }
229
230 inline void writeData(jsonifier::string_view_base<uint8_t> dataToWrite) {
231 if (areWeStillConnected()) {
232 uint64_t remainingBytes{ dataToWrite.size() };
233 while (remainingBytes > 0) {
234 uint64_t amountToCollect{};
235 if (dataToWrite.size() >= maxBufferSize) {
236 amountToCollect = maxBufferSize;
237 } else {
238 amountToCollect = dataToWrite.size();
239 }
240 outputBuffer.writeData(dataToWrite.data(), amountToCollect);
241 dataToWrite = jsonifier::string_view_base{ dataToWrite.data() + amountToCollect, dataToWrite.size() - amountToCollect };
242 remainingBytes = dataToWrite.size();
243 }
244 }
245 }
246
247 inline jsonifier::string_view_base<uint8_t> getInputBuffer() {
248 return inputBuffer.readData();
249 }
250
251 inline bool areWeStillConnected() {
252 if (socket.operator SOCKET() != INVALID_SOCKET) {
253 pollfd fdEvent = {};
254 fdEvent.fd = socket;
255 fdEvent.events = POLLOUT;
256 int32_t result = poll(&fdEvent, 1, 1);
257 if (result == SOCKET_ERROR || fdEvent.revents & POLLHUP || fdEvent.revents & POLLNVAL || fdEvent.revents & POLLERR) {
258 socket = INVALID_SOCKET;
259 return false;
260 }
261 return true;
262 } else {
263 return false;
264 }
265 }
266
267 inline bool processWriteData() {
268 if (outputBuffer.getUsedSpace() > 0) {
269 auto bytesToWrite{ outputBuffer.getCurrentTail()->getUsedSpace() };
270 auto newData = outputBuffer.readData();
271 std::memcpy(resampleVector.data(), newData.data(), newData.size());
272 auto writtenBytes{ sendto(socket, resampleVector.data(), static_cast<int32_t>(bytesToWrite), 0, address->ai_addr, static_cast<int32_t>(address->ai_addrlen)) };
273 if (writtenBytes <= 0 && errno != EWOULDBLOCK && errno != EINPROGRESS) {
274 return false;
275 } else if (writtenBytes > 0) {
276 return true;
277 }
278 }
279 return true;
280 }
281
282 inline bool processReadData() {
283 int32_t readBytes{};
284 do {
285 if (!inputBuffer.isItFull()) {
286 socklen_t currentSize{ static_cast<socklen_t>(address->ai_addrlen) };
287 uint64_t bytesToRead{ maxBufferSize };
288 readBytes = recvfrom(static_cast<SOCKET>(socket), resampleVector.data(), static_cast<int32_t>(bytesToRead), 0, address->ai_addr, &currentSize);
289 if (readBytes <= 0 && errno != EWOULDBLOCK && errno != EINPROGRESS) {
290 return false;
291 } else if (readBytes > 0) {
292 inputBuffer.writeData(resampleVector.data(), static_cast<uint64_t>(readBytes));
293 bytesRead += readBytes;
294 handleAudioBuffer();
295 }
296 }
297 } while (readBytes > 0);
298 return true;
299 }
300
301 virtual inline void handleAudioBuffer() = 0;
302
303 virtual inline void disconnect() {
304 socket = INVALID_SOCKET;
305 outputBuffer.clear();
306 inputBuffer.clear();
307 }
308
309 virtual inline ~udp_connection() {
310 disconnect();
311 }
312
313 protected:
314 static constexpr uint64_t maxBufferSize{ (1024 * 16) };
315 jsonifier::vector<char> resampleVector{};
316 ring_buffer<uint8_t, 16> outputBuffer{};
317 ring_buffer<uint8_t, 16> inputBuffer{};
318 connection_status currentStatus{};
319 jsonifier::string baseUrl{};
320 addrinfo_wrapper address{};
321 stream_type streamType{};
322 socket_wrapper socket{};
323 int64_t bytesRead{};
324 uint16_t port{};
325 };
326 }
327}
A co_routine - representing a potentially asynchronous operation/function.
Definition: CoRoutine.hpp:83
stream_type
For selecting the type of streamer that the given bot is, one must be one server and one of client pe...
Definition: Base.hpp:837
The main namespace for the forward-facing interfaces.