DiscordCoreAPI
A Discord bot library written in C++, with custom asynchronous coroutines.
Loading...
Searching...
No Matches
VoiceConnection.cpp
Go to the documentation of this file.
1/*
2 DiscordCoreAPI, A bot library for Discord, written in C++, and featuring explicit multithreading through the usage of custom, asynchronous C++ CoRoutines.
3
4 Copyright 2021, 2022 Chris M. (RealTimeChris)
5
6 This library is free software; you can redistribute it and/or
7 modify it under the terms of the GNU Lesser General Public
8 License as published by the Free Software Foundation; either
9 version 2.1 of the License, or (at your option) any later version.
10
11 This library is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public
17 License along with this library; if not, write to the Free Software
18 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
19 USA
20*/
21/// VoiceConnection.cpp - Source file for the voice connection class.
22/// Jul 15, 2021
23/// https://discordcoreapi.com
24/// \file VoiceConnection.cpp
25
28
29namespace DiscordCoreAPI {
30
31 VoiceSocketReadyData::VoiceSocketReadyData(Value jsonData) {
32 this->ip = getString(jsonData, "ip");
33 this->ssrc = getUint32(jsonData, "ssrc");
34 Array arrayValue{};
35 if (jsonData["modes"].get(arrayValue) == ErrorCode::Success) {
36 this->mode.clear();
37 for (JsonifierResult<Value> value: arrayValue) {
38 if (std::string{ value.getString().value() } == "xsalsa20_poly1305") {
39 this->mode = std::string{ value.getString().value() };
40 }
41 }
42 }
43 this->port = getUint64(jsonData, "port");
44 }
45
46 VoiceUser::VoiceUser(Snowflake userIdNew) noexcept {
47 this->userId = userIdNew;
48 }
49
50 VoiceUser& VoiceUser::operator=(VoiceUser&& data) noexcept {
51 this->payloads = std::move(data.payloads);
52 this->decoder = std::move(data.decoder);
53 this->userId = data.userId;
54 return *this;
55 }
56
57 DiscordCoreInternal::OpusDecoderWrapper& VoiceUser::getDecoder() noexcept {
58 return this->decoder;
59 }
60
61 void VoiceUser::insertPayload(std::basic_string_view<std::byte> data) noexcept {
62 this->payloads.writeData(data);
63 }
64
65 std::basic_string_view<std::byte> VoiceUser::extractPayload() noexcept {
66 return this->payloads.readData();
67 }
68
69 Snowflake VoiceUser::getUserId() noexcept {
70 return this->userId;
71 }
72
73 RTPPacketEncrypter::RTPPacketEncrypter(uint32_t ssrcNew, const std::basic_string<std::byte>& keysNew) noexcept {
74 this->keys = keysNew;
75 this->ssrc = ssrcNew;
76 }
77
78 std::basic_string_view<std::byte> RTPPacketEncrypter::encryptPacket(DiscordCoreInternal::EncoderReturnData& audioData) noexcept {
79 if (this->keys.size() > 0) {
80 ++this->sequence;
81 this->timeStamp += static_cast<uint32_t>(audioData.sampleCount);
82 const uint8_t headerSize{ 12 };
83 char header[headerSize]{};
84 storeBits(header, this->version);
85 storeBits(header + 1, this->flags);
86 storeBits(header + 2, this->sequence);
87 storeBits(header + 4, this->timeStamp);
88 storeBits(header + 8, this->ssrc);
89 uint8_t nonceForLibSodium[crypto_secretbox_NONCEBYTES]{};
90 for (int8_t x = 0; x < headerSize; ++x) {
91 nonceForLibSodium[x] = header[x];
92 }
93 const uint64_t numOfBytes{ headerSize + audioData.data.size() + crypto_secretbox_MACBYTES };
94 if (this->data.size() < numOfBytes) {
95 this->data.resize(numOfBytes);
96 }
97 for (int8_t x = 0; x < headerSize; ++x) {
98 this->data[x] = static_cast<std::byte>(header[x]);
99 }
100 if (crypto_secretbox_easy(reinterpret_cast<uint8_t*>(this->data.data()) + headerSize,
101 reinterpret_cast<const uint8_t*>(audioData.data.data()), audioData.data.size(), nonceForLibSodium,
102 reinterpret_cast<uint8_t*>(this->keys.data())) != 0) {
103 return {};
104 }
105 return std::basic_string_view<std::byte>{ this->data.data(), numOfBytes };
106 }
107 return {};
108 }
109
110 MovingAverager::MovingAverager(size_t collectionCountNew) noexcept {
111 this->collectionCount = collectionCountNew;
112 }
113
114 void MovingAverager::insertValue(int64_t value) noexcept {
115 this->values.emplace_front(value);
116 if (this->values.size() >= this->collectionCount) {
117 this->values.pop_back();
118 }
119 }
120
121 float MovingAverager::getCurrentValue() noexcept {
122 float returnValue{};
123 if (this->values.size() > 0) {
124 for (auto& value: this->values) {
125 returnValue += static_cast<float>(value);
126 }
127 return returnValue / static_cast<float>(this->values.size());
128 } else {
129 return 0.0f;
130 }
131 }
132
133 VoiceConnectionBridge::VoiceConnectionBridge(DiscordCoreClient* clientPtrNew, std::basic_string<std::byte>& encryptionKeyNew,
134 StreamType streamType, Snowflake guildIdNew)
135 : UDPConnection(streamType, clientPtrNew->getConfigManager().doWePrintWebSocketErrorMessages()) {
136 this->encryptionKey = encryptionKeyNew;
137 this->downSampledVector.resize(23040);
138 this->upSampledVector.resize(23040);
139 this->clientPtr = clientPtrNew;
140 this->guildId = guildIdNew;
141 }
142
143 inline void VoiceConnectionBridge::collectEightElements(opus_int32* dataIn, opus_int16* dataOut) noexcept {
144 __m256 currentSamplesNew256{ _mm256_mul_ps(_mm256_cvtepi32_ps(_mm256_loadu_epi32(dataIn)),
145 _mm256_add_ps(_mm256_set1_ps(this->currentGain),
146 _mm256_mul_ps(_mm256_set1_ps(this->increment), _mm256_set_ps(0.0f, 1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f, 7.0f)))) };
147 __m256i currentSamplesNewer256{ _mm256_cvtps_epi32(_mm256_blendv_ps(
148 _mm256_max_ps(currentSamplesNew256, _mm256_set1_ps(static_cast<float>(std::numeric_limits<opus_int16>::min()))),
149 _mm256_min_ps(currentSamplesNew256, _mm256_set1_ps(static_cast<float>(std::numeric_limits<opus_int16>::max()))),
150 _mm256_cmp_ps(currentSamplesNew256, _mm256_set1_ps(0.0f), _CMP_GE_OQ))) };
151 _mm_storeu_epi16(dataOut,
152 _mm_packs_epi32(_mm256_extractf128_si256(currentSamplesNewer256, 0), _mm256_extractf128_si256(currentSamplesNewer256, 1)));
153 }
154
155 inline void VoiceConnectionBridge::applyGainRamp(int64_t sampleCount) noexcept {
156 this->increment = (this->endGain - this->currentGain) / static_cast<float>(sampleCount);
157 for (int64_t x = 0; x < sampleCount / 8; ++x) {
158 this->collectEightElements(this->upSampledVector.data() + (x * 8), this->downSampledVector.data() + (x * 8));
159 this->currentGain += this->increment * 8.0f;
160 }
161 }
162
163 void VoiceConnectionBridge::parseOutgoingVoiceData() noexcept {
164 std::basic_string_view<std::byte> buffer = this->getInputBuffer();
165 if (buffer == reinterpret_cast<const std::byte*>("goodbye")) {
166 this->clientPtr->getVoiceConnection(this->guildId)->onClosed();
167 return;
168 }
169 if (buffer.size() > 0) {
170 AudioFrameData frame{};
171 frame += buffer;
172 this->clientPtr->getSongAPI(this->guildId)->audioDataBuffer.send(std::move(frame));
173 }
174 }
175
176 void VoiceConnectionBridge::handleAudioBuffer() noexcept {
177 this->parseOutgoingVoiceData();
178 }
179
180 void VoiceConnectionBridge::mixAudio() noexcept {
181 opus_int32 voiceUserCountReal{};
182 int64_t decodedSize{};
183 std::fill(this->upSampledVector.data(), this->upSampledVector.data() + this->upSampledVector.size(), 0);
184 for (auto& [key, value]: this->clientPtr->getVoiceConnection(this->guildId)->voiceUsers) {
185 std::basic_string_view<std::byte> payload{ value->extractPayload() };
186 if (payload.size() == 0) {
187 continue;
188 } else {
189 const uint64_t headerSize{ 12 };
190 const uint64_t csrcCount{ static_cast<uint64_t>(payload[0]) & 0b0000'1111 };
191 const uint64_t offsetToData{ headerSize + sizeof(uint32_t) * csrcCount };
192 const uint64_t encryptedDataLength{ payload.size() - offsetToData };
193
194 if (this->decryptedDataString.size() < encryptedDataLength) {
195 this->decryptedDataString.resize(encryptedDataLength);
196 }
197
198 std::byte nonce[24]{};
199 for (int32_t x = 0; x < headerSize; ++x) {
200 nonce[x] = payload[x];
201 }
202
203 if (crypto_secretbox_open_easy(reinterpret_cast<uint8_t*>(this->decryptedDataString.data()),
204 reinterpret_cast<const uint8_t*>(payload.data()) + offsetToData, encryptedDataLength,
205 reinterpret_cast<uint8_t*>(nonce), reinterpret_cast<uint8_t*>(this->encryptionKey.data()))) {
206 continue;
207 }
208
209 std::basic_string_view newString{ this->decryptedDataString.data(), encryptedDataLength - crypto_secretbox_MACBYTES };
210
211 if (static_cast<int8_t>(payload[0] >> 4) & 0b0001) {
212 const uint16_t extenstionLengthInWords{ ntohs(*reinterpret_cast<const uint16_t*>(&newString[2])) };
213 const size_t extensionLength{ sizeof(uint32_t) * extenstionLengthInWords };
214 const size_t extensionHeaderLength{ sizeof(uint16_t) * 2 };
215 newString = newString.substr(extensionHeaderLength + extensionLength);
216 }
217
218 if (newString.size() > 0) {
219 std::basic_string_view<opus_int16> decodedData{};
220 try {
221 decodedData = value->getDecoder().decodeData(newString);
222 } catch (...) {
223 reportException("VoiceConnection::mixAudio()");
224 }
225 if (decodedData.size() > 0) {
226 decodedSize = std::max(decodedSize, static_cast<int64_t>(decodedData.size()));
227 ++voiceUserCountReal;
228 for (size_t x = 0; x < decodedData.size() / 8; ++x) {
229 _mm256_storeu_epi32(this->upSampledVector.data() + (x * 8),
230 _mm256_add_epi32(_mm256_loadu_epi32(this->upSampledVector.data() + (x * 8)),
231 _mm256_cvtepi16_epi32(_mm_loadu_epi16(decodedData.data() + (x * 8)))));
232 }
233 }
234 }
235 }
236 }
237 if (decodedSize > 0) {
238 this->voiceUserCountAverage.insertValue(voiceUserCountReal);
239 this->endGain = 1.0f / this->voiceUserCountAverage.getCurrentValue();
240 this->applyGainRamp(decodedSize);
241 this->writeData(std::basic_string_view<std::byte>{ reinterpret_cast<std::byte*>(this->downSampledVector.data()),
242 static_cast<size_t>(decodedSize * 2) });
243 this->currentGain = this->endGain;
244 }
245 }
246
247 VoiceConnection::VoiceConnection(DiscordCoreClient* clientPtrNew, DiscordCoreInternal::WebSocketClient* baseShardNew,
248 std::atomic_bool* doWeQuitNew) noexcept
249 : WebSocketCore(&clientPtrNew->configManager, DiscordCoreInternal::WebSocketType::Voice),
250 UDPConnection(StreamType::None, clientPtrNew->configManager.doWePrintWebSocketErrorMessages()) {
251 this->dataOpCode = DiscordCoreInternal::WebSocketOpCode::Op_Text;
252 this->configManager = &clientPtrNew->configManager;
253 this->msPerPacket = 20;
254 this->samplesPerPacket = sampleRatePerSecond / 1000 * msPerPacket;
255 this->discordCoreClient = clientPtrNew;
256 this->baseShard = baseShardNew;
257 this->doWeQuit = doWeQuitNew;
258 }
259
260 Snowflake VoiceConnection::getChannelId() noexcept {
261 return this->voiceConnectInitData.channelId;
262 }
263
264 void VoiceConnection::parseIncomingVoiceData(std::basic_string_view<std::byte> rawDataBufferNew) noexcept {
265 if (rawDataBufferNew.size() <= 39) {
266 return;
267 }
268 const uint32_t speakerSsrc{ ntohl(*reinterpret_cast<const uint32_t*>(rawDataBufferNew.data() + 8)) };
269 if (this->voiceUsers.contains(speakerSsrc)) {
270 if (72 <= (static_cast<int8_t>(rawDataBufferNew[1]) & 0b0111'1111) &&
271 ((static_cast<int8_t>(rawDataBufferNew[1]) & 0b0111'1111) <= 76)) {
272 return;
273 }
274 this->voiceUsers[speakerSsrc]->insertPayload(rawDataBufferNew);
275 }
276 }
277
278 void VoiceConnection::connect(const VoiceConnectInitData& initData) noexcept {
279 this->voiceConnectInitData = initData;
280 this->connections = std::make_unique<ConnectionPackage>();
281 this->connections->currentReconnectTries = this->currentReconnectTries;
282 this->connections->currentShard = this->shard[0];
283 this->activeState.store(VoiceActiveState::Connecting);
284 if (!this->taskThread01) {
285 this->taskThread01 = std::make_unique<std::jthread>([=, this](std::stop_token token) {
286 this->runVoice(token);
287 });
288 }
289 StopWatch stopWatch{ 15000us };
290 while (!WebSocketCore::areWeStillConnected() && !stopWatch.hasTimePassed()) {
291 std::this_thread::sleep_for(1us);
292 }
293 }
294
295 UnboundedMessageBlock<AudioFrameData>& VoiceConnection::getAudioBuffer() noexcept {
296 return this->discordCoreClient->getSongAPI(this->voiceConnectInitData.guildId)->audioDataBuffer;
297 }
298
299 void VoiceConnection::checkForAndSendHeartBeat(const bool isImmedate) noexcept {
300 if (this->heartBeatStopWatch.hasTimePassed() || isImmedate) {
301 Serializer data{};
302 data["d"] = std::chrono::duration_cast<Nanoseconds>(HRClock::now().time_since_epoch()).count();
303 data["op"] = 3;
304 data.refreshString(JsonifierSerializeType::Json);
305 std::string string{ data.operator std::string() };
306 this->createHeader(string, this->dataOpCode);
307 if (!this->sendMessage(string, true)) {
308 this->onClosed();
309 return;
310 }
311 if (this->activeState.load() == VoiceActiveState::Paused || this->activeState.load() == VoiceActiveState::Stopped) {
312 this->sendSilence();
313 }
314 this->haveWeReceivedHeartbeatAck = false;
315 this->heartBeatStopWatch.resetTimer();
316 }
317 }
318
319 void VoiceConnection::sendSpeakingMessage(const bool isSpeaking) noexcept {
320 DiscordCoreInternal::SendSpeakingData data{};
321 if (!isSpeaking) {
322 data.type = static_cast<DiscordCoreInternal::SendSpeakingType>(0);
323 this->sendSilence();
324 UDPConnection::processIO(DiscordCoreInternal::ProcessIOType::Write_Only);
325 } else {
326 data.type = DiscordCoreInternal::SendSpeakingType::Microphone;
327 }
328 data.delay = 0;
329 data.ssrc = this->audioSSRC;
330 auto serializer = data.operator Serializer();
331 serializer.refreshString(JsonifierSerializeType::Json);
332 std::string string{ serializer.operator std::string() };
333 this->createHeader(string, this->dataOpCode);
334 this->sendMessage(string, true);
335 }
336
337 void VoiceConnection::checkForConnections(std::stop_token token) noexcept {
338 if (this->connections) {
339 this->connections.reset(nullptr);
340 this->currentState.store(DiscordCoreInternal::WebSocketState::Disconnected);
341 WebSocketCore::ssl = nullptr;
342 if (this->voiceConnectInitData.streamInfo.type != StreamType::None) {
343 WebSocketCore::outputBuffer.clear();
344 WebSocketCore::inputBuffer.clear();
345 UDPConnection::inputBuffer.clear();
346 UDPConnection::outputBuffer.clear();
347 if (this->streamSocket) {
348 this->streamSocket->inputBuffer.clear();
349 this->streamSocket->outputBuffer.clear();
350 }
351 this->discordCoreClient->getSongAPI(this->voiceConnectInitData.guildId)->audioDataBuffer.clearContents();
352 }
353 WebSocketCore::socket = SOCKET_ERROR;
354 UDPConnection::disconnect();
355 this->closeCode = 0;
356 this->areWeHeartBeating = false;
357 StopWatch stopWatch{ 10000ms };
358 this->connectionState.store(VoiceConnectionState::Collecting_Init_Data);
359 while (this->baseShard->currentState.load() != DiscordCoreInternal::WebSocketState::Authenticated && !token.stop_requested()) {
360 if (stopWatch.hasTimePassed() || this->activeState.load() == VoiceActiveState::Exiting) {
361 return;
362 }
363 std::this_thread::sleep_for(1ms);
364 }
365 this->connectInternal(token);
366 this->sendSpeakingMessage(true);
367 this->activeState.store(VoiceActiveState::Playing);
368 }
369 }
370
371 bool VoiceConnection::onMessageReceived(std::string_view data) noexcept {
372 DiscordCoreInternal::WebSocketMessage message{};
373 Value value{};
374 if (this->parser.parseJson(data).get(value) == ErrorCode::Success) {
375 message = DiscordCoreInternal::WebSocketMessage{ value };
376 }
377
378 if (this->configManager->doWePrintWebSocketSuccessMessages()) {
379 cout << shiftToBrightGreen() << "Message received from Voice WebSocket: " << data << reset() << endl << endl;
380 }
381 switch (static_cast<VoiceSocketOpCodes>(message.op)) {
382 case VoiceSocketOpCodes::Ready_Server: {
383 const VoiceSocketReadyData data{ value["d"].value() };
384 this->audioSSRC = data.ssrc;
385 this->voiceIp = data.ip;
386 this->port = data.port;
387 this->audioEncryptionMode = data.mode;
388 this->connectionState.store(VoiceConnectionState::Initializing_DatagramSocket);
389 break;
390 }
391 case VoiceSocketOpCodes::Session_Description: {
392 auto arrayValue = getArray(value["d"].value(), "secret_key");
393 if (arrayValue.didItSucceed) {
394 std::basic_string<std::byte> secretKey{};
395 for (auto iterator = arrayValue.arrayValue.begin(); iterator != arrayValue.arrayValue.end(); ++iterator) {
396 secretKey.push_back(static_cast<std::byte>(iterator.operator*().getUint64().value()));
397 }
398 this->encryptionKey = secretKey;
399 }
400 this->packetEncrypter = RTPPacketEncrypter{ this->audioSSRC, this->encryptionKey };
401 this->connectionState.store(VoiceConnectionState::Collecting_Init_Data);
402 break;
403 }
404 case VoiceSocketOpCodes::Speaking: {
405 const uint32_t ssrc = getUint32(value["d"].value(), "ssrc");
406 std::unique_ptr<VoiceUser> user{ std::make_unique<VoiceUser>(stoull(getString(value["d"].value(), "user_id"))) };
407 if (!Users::getCachedUser({ .userId = user->getUserId() }).getFlagValue(UserFlags::Bot) ||
408 this->voiceConnectInitData.streamInfo.streamBotAudio) {
409 if (!this->voiceUsers.contains(ssrc)) {
410 this->voiceUsers.emplace(std::make_pair(ssrc, std::move(user)));
411 }
412 }
413 break;
414 }
415 case VoiceSocketOpCodes::Heartbeat_ACK: {
416 this->haveWeReceivedHeartbeatAck = true;
417 break;
418 }
419 case VoiceSocketOpCodes::Hello: {
420 this->heartBeatStopWatch =
421 StopWatch{ Milliseconds{ static_cast<uint32_t>(getFloat(value["d"].value(), "heartbeat_interval")) } };
422 this->areWeHeartBeating = true;
423 this->connectionState.store(VoiceConnectionState::Sending_Identify);
424 this->currentState.store(DiscordCoreInternal::WebSocketState::Authenticated);
425 this->haveWeReceivedHeartbeatAck = true;
426 break;
427 }
428 case VoiceSocketOpCodes::Resumed: {
429 this->connectionState.store(VoiceConnectionState::Initializing_DatagramSocket);
430 break;
431 }
432 case VoiceSocketOpCodes::Client_Disconnect: {
433 const auto userId = stoull(getString(value["d"].value(), "user_id"));
434 for (auto& [key, value]: this->voiceUsers) {
435 if (userId == value->getUserId()) {
436 this->voiceUsers.erase(key);
437 break;
438 }
439 }
440 break;
441 }
442 }
443 return true;
444 }
445
/// Drives the voice handshake as a state machine keyed on connectionState.
/// Each case performs one step, stores the next VoiceConnectionState and re-enters connectInternal() for the
/// following step. Whenever a step fails, currentReconnectTries is incremented, onClosed() is called and the
/// function returns.
/// \param token Stop token polled by the wait loops and forwarded to the recursive calls.
446 void VoiceConnection::connectInternal(std::stop_token token) noexcept {
447 StopWatch stopWatch{ 10000ms };
// Retry budget exhausted: raise the shared quit flag, optionally log, and stop trying.
448 if (this->currentReconnectTries >= this->maxReconnectTries) {
449 this->doWeQuit->store(true);
450 if (this->configManager->doWePrintWebSocketErrorMessages()) {
451 cout << "VoiceConnection::connect() Error: Failed to connect to voice channel!" << endl << endl;
452 }
453 return;
454 }
455 switch (this->connectionState.load()) {
456 case VoiceConnectionState::Collecting_Init_Data: {
// Register this connection's data buffer with the base shard under the guild id, clear it, and ask the
// shard for the voice connection data.
457 this->baseShard->voiceConnectionDataBuffersMap[this->voiceConnectInitData.guildId.operator size_t()] =
458 &this->voiceConnectionDataBuffer;
459 this->baseShard->voiceConnectionDataBuffersMap[this->voiceConnectInitData.guildId.operator size_t()]->clearContents();
460 this->baseShard->getVoiceConnectionData(this->voiceConnectInitData);
461
// A true result from waitForTimeToPass() is treated as failure (presumably a timeout after 10000 -- confirm
// against its definition).
462 if (waitForTimeToPass(this->voiceConnectionDataBuffer, this->voiceConnectionData, 10000)) {
463 ++this->currentReconnectTries;
464 this->onClosed();
465 return;
466 }
// The WebSocket host is the endpoint with everything from the first ':' onwards removed.
467 this->baseUrl = this->voiceConnectionData.endPoint.substr(0, this->voiceConnectionData.endPoint.find(":"));
468 this->connectionState.store(VoiceConnectionState::Initializing_WebSocket);
469 this->connectInternal(token);
470 break;
471 }
472 case VoiceConnectionState::Initializing_WebSocket: {
// Open the WebSocket to baseUrl on port 443 with the path "/?v=4".
473 this->currentState.store(DiscordCoreInternal::WebSocketState::Upgrading);
474 if (!WebSocketCore::connect(this->baseUrl, "/?v=4", 443, this->configManager->doWePrintWebSocketErrorMessages(), false)) {
475 ++this->currentReconnectTries;
476 this->onClosed();
477 return;
478 }
479 this->shard[0] = 0;
480 this->shard[1] = 1;
// Pump socket I/O until currentState reaches Collecting_Hello.
// NOTE(review): unlike the later wait loops this one never checks stopWatch -- confirm it cannot spin
// indefinitely when processIO() keeps succeeding without a state change.
481 while (this->currentState.load() != DiscordCoreInternal::WebSocketState::Collecting_Hello && !token.stop_requested()) {
482 if (WebSocketCore::processIO(10) == DiscordCoreInternal::ProcessIOResult::Error) {
483 ++this->currentReconnectTries;
484 this->onClosed();
485 return;
486 }
487 }
488 this->connectionState.store(VoiceConnectionState::Collecting_Hello);
489 this->connectInternal(token);
490 break;
491 }
492 case VoiceConnectionState::Collecting_Hello: {
// Wait, bounded by stopWatch, until the Hello handler in onMessageReceived() stores Sending_Identify.
493 stopWatch.resetTimer();
494 while (this->connectionState.load() != VoiceConnectionState::Sending_Identify && !token.stop_requested()) {
495 if (stopWatch.hasTimePassed()) {
496 ++this->currentReconnectTries;
497 this->onClosed();
498 return;
499 }
500 if (WebSocketCore::processIO(100) == DiscordCoreInternal::ProcessIOResult::Error) {
501 ++this->currentReconnectTries;
502 this->onClosed();
503 return;
504 }
505 std::this_thread::sleep_for(1ms);
506 }
// Hello was received: the retry counter starts over.
507 this->currentReconnectTries = 0;
508 this->connectInternal(token);
509 break;
510 }
511 case VoiceConnectionState::Sending_Identify: {
// Serialize the identify payload (init data plus the received connection data), frame it and send it.
512 this->haveWeReceivedHeartbeatAck = true;
513 DiscordCoreInternal::VoiceIdentifyData data{};
514 data.connectInitData = this->voiceConnectInitData;
515 data.connectionData = this->voiceConnectionData;
516 auto serializer = data.operator Serializer();
517 serializer.refreshString(JsonifierSerializeType::Json);
518 std::string string{ serializer.operator std::string() };
519 this->createHeader(string, this->dataOpCode);
520 if (!WebSocketCore::sendMessage(string, true)) {
521 ++this->currentReconnectTries;
522 this->onClosed();
523 return;
524 }
525 this->connectionState.store(VoiceConnectionState::Collecting_Ready);
526 this->connectInternal(token);
527 break;
528 }
529 case VoiceConnectionState::Collecting_Ready: {
// Wait, bounded by stopWatch, until the Ready_Server handler stores Initializing_DatagramSocket.
530 stopWatch.resetTimer();
531 while (this->connectionState.load() != VoiceConnectionState::Initializing_DatagramSocket && !token.stop_requested()) {
532 if (stopWatch.hasTimePassed()) {
533 ++this->currentReconnectTries;
534 this->onClosed();
535 return;
536 }
537 if (WebSocketCore::processIO(100) == DiscordCoreInternal::ProcessIOResult::Error) {
538 ++this->currentReconnectTries;
539 this->onClosed();
540 return;
541 }
542 std::this_thread::sleep_for(1ms);
543 }
544 this->connectInternal(token);
545 break;
546 }
547 case VoiceConnectionState::Initializing_DatagramSocket: {
// Bring up the UDP side through voiceConnect().
548 if (!this->voiceConnect()) {
549 ++this->currentReconnectTries;
550 this->onClosed();
551 return;
552 }
553 this->connectionState.store(VoiceConnectionState::Sending_Select_Protocol);
554 this->connectInternal(token);
555 break;
556 }
557 case VoiceConnectionState::Sending_Select_Protocol: {
// Send the select-protocol payload carrying the encryption mode, our external ip and the port.
558 DiscordCoreInternal::VoiceSocketProtocolPayloadData data{};
559 data.voiceEncryptionMode = this->audioEncryptionMode;
560 data.externalIp = this->externalIp;
561 data.voicePort = this->port;
562 auto serializer = data.operator Serializer();
563 serializer.refreshString(JsonifierSerializeType::Json);
564 std::string string{ serializer.operator std::string() };
565 this->createHeader(string, this->dataOpCode);
566 if (!WebSocketCore::sendMessage(string, true)) {
567 ++this->currentReconnectTries;
568 this->onClosed();
569 return;
570 }
571 this->connectionState.store(VoiceConnectionState::Collecting_Session_Description);
572 this->connectInternal(token);
573 break;
574 }
575 case VoiceConnectionState::Collecting_Session_Description: {
// The Session_Description handler signals completion by storing Collecting_Init_Data, so that value is the
// loop's exit condition here.
576 stopWatch.resetTimer();
577 while (this->connectionState.load() != VoiceConnectionState::Collecting_Init_Data && !token.stop_requested()) {
578 if (stopWatch.hasTimePassed()) {
579 ++this->currentReconnectTries;
580 ;
581 this->onClosed();
582 return;
583 }
584 if (WebSocketCore::processIO(100) == DiscordCoreInternal::ProcessIOResult::Error) {
585 ++this->currentReconnectTries;
586 ;
587 this->onClosed();
588 return;
589 }
590 std::this_thread::sleep_for(1ms);
591 }
592 this->baseShard->voiceConnectionDataBuffersMap[this->voiceConnectInitData.guildId.operator size_t()]->clearContents();
593 this->connectionState.store(VoiceConnectionState::Collecting_Init_Data);
// When a stream type is configured, create the bridge socket and connect it to the configured address/port.
594 if (this->voiceConnectInitData.streamInfo.type != StreamType::None) {
595 this->streamSocket = std::make_unique<VoiceConnectionBridge>(this->discordCoreClient, this->encryptionKey,
596 this->voiceConnectInitData.streamInfo.type, this->voiceConnectInitData.guildId);
597 if (!this->streamSocket->connect(this->voiceConnectInitData.streamInfo.address,
598 this->voiceConnectInitData.streamInfo.port, token)) {
599 ++this->currentReconnectTries;
600 this->onClosed();
601 return;
602 }
603 }
// Handshake complete: switch to Playing and start playback.
604 this->activeState.store(VoiceActiveState::Playing);
605 this->play();
606 return;
607 }
608 }
609 }
610
/// Body of the voice worker thread started from connect().
/// Loops until a stop is requested, the shared quit flag is set or activeState becomes Exiting, dispatching on
/// activeState on every pass. In every state the WebSocket is serviced, heartbeats are sent when due, and
/// checkForConnections() is given the chance to perform a queued (re)connect.
/// \param token Stop token of the owning jthread.
611 void VoiceConnection::runVoice(std::stop_token token) noexcept {
612 StopWatch stopWatch{ 20000ms };
// NOTE(review): sendSilenceStopWatch is not referenced anywhere else in this function.
613 StopWatch sendSilenceStopWatch{ 5000ms };
614 while (!token.stop_requested() && !this->doWeQuit->load() && this->activeState.load() != VoiceActiveState::Exiting) {
615 switch (this->activeState.load()) {
616 case VoiceActiveState::Connecting: {
// Idle in 1ms steps while a connection attempt is pending.
617 while (!token.stop_requested() && this->activeState.load() == VoiceActiveState::Connecting) {
618 std::this_thread::sleep_for(1ms);
619 if (!token.stop_requested() && VoiceConnection::areWeConnected()) {
620 if (WebSocketCore::processIO(10) == DiscordCoreInternal::ProcessIOResult::Error) {
621 this->onClosed();
622 }
623 }
624 if (!token.stop_requested() && VoiceConnection::areWeConnected()) {
625 this->checkForAndSendHeartBeat(false);
626 }
627 this->checkForConnections(token);
628 }
629 break;
630 }
631 case VoiceActiveState::Stopped: {
// Stopping discards all queued audio before idling.
632 this->areWePlaying.store(false);
633 this->discordCoreClient->getSongAPI(this->voiceConnectInitData.guildId)->audioDataBuffer.clearContents();
634 this->clearAudioData();
635 while (!token.stop_requested() && this->activeState.load() == VoiceActiveState::Stopped) {
636 UDPConnection::processIO(DiscordCoreInternal::ProcessIOType::Both);
637 std::this_thread::sleep_for(1ms);
638 if (!token.stop_requested() && VoiceConnection::areWeConnected()) {
639 if (WebSocketCore::processIO(10) == DiscordCoreInternal::ProcessIOResult::Error) {
640 this->onClosed();
641 }
642 }
643 if (!token.stop_requested() && VoiceConnection::areWeConnected()) {
644 this->checkForAndSendHeartBeat(false);
645 }
646 this->checkForConnections(token);
647 }
648 break;
649 }
650 case VoiceActiveState::Paused: {
// Pausing keeps the queued audio; only the playing flag is cleared.
651 this->areWePlaying.store(false);
652 while (!token.stop_requested() && this->activeState.load() == VoiceActiveState::Paused) {
653 UDPConnection::processIO(DiscordCoreInternal::ProcessIOType::Both);
654 std::this_thread::sleep_for(1ms);
655 if (!token.stop_requested() && VoiceConnection::areWeConnected()) {
656 if (WebSocketCore::processIO(10) == DiscordCoreInternal::ProcessIOResult::Error) {
657 this->onClosed();
658 }
659 }
660 if (!token.stop_requested() && VoiceConnection::areWeConnected()) {
661 this->checkForAndSendHeartBeat(false);
662 }
663 this->checkForConnections(token);
664 }
665 break;
666 }
667 case VoiceActiveState::Playing: {
// Start from an empty transfer frame.
668 this->xferAudioData.type = AudioFrameType::Unset;
669 this->xferAudioData.data.clear();
670
// Wait for the UDP socket to come up; leave the thread if stopWatch (20000ms) elapses or the state turns
// to Exiting.
671 stopWatch.resetTimer();
672 while (!token.stop_requested() && !UDPConnection::areWeStillConnected()) {
673 if (stopWatch.hasTimePassed() || this->activeState.load() == VoiceActiveState::Exiting) {
674 return;
675 }
676 std::this_thread::sleep_for(1ms);
677 if (!token.stop_requested() && VoiceConnection::areWeConnected()) {
678 if (WebSocketCore::processIO(10) == DiscordCoreInternal::ProcessIOResult::Error) {
679 this->onClosed();
680 }
681 }
682 if (!token.stop_requested() && VoiceConnection::areWeConnected()) {
683 this->checkForAndSendHeartBeat(false);
684 }
685 this->checkForConnections(token);
686 }
687
// targetTime is the deadline for the next packet; speaking is toggled off and on before streaming begins.
688 auto targetTime{ HRClock::now() + this->intervalCount };
689 this->sendSpeakingMessage(false);
690 this->sendSpeakingMessage(true);
691
692 while (!token.stop_requested() && this->activeState.load() == VoiceActiveState::Playing) {
693 int64_t bytesPerSample{ 4 };
694 if (!token.stop_requested() && VoiceConnection::areWeConnected()) {
695 this->checkForAndSendHeartBeat(false);
696 }
697 this->checkForConnections(token);
// Fetch the next queued frame, if there is one.
698 this->discordCoreClient->getSongAPI(this->voiceConnectInitData.guildId)
699 ->audioDataBuffer.tryReceive(this->xferAudioData);
700 AudioFrameType frameType{ this->xferAudioData.type };
// Frames that are empty or whose size is not a multiple of 480 are discarded.
701 if (this->xferAudioData.currentSize % 480 != 0 || this->xferAudioData.currentSize == 0) {
702 this->areWePlaying.store(false);
703 this->xferAudioData.clearData();
704 } else {
// Interval = (frame bytes / bytesPerSample) / sample rate, expressed in nanoseconds.
705 this->intervalCount =
706 Nanoseconds{ static_cast<uint64_t>(static_cast<double>(this->xferAudioData.currentSize / bytesPerSample) /
707 static_cast<double>(this->sampleRatePerSecond) * static_cast<double>(this->nsPerSecond)) };
708 this->areWePlaying.store(true);
709 this->audioData.writeData(static_cast<std::basic_string_view<std::byte>>(this->xferAudioData.data));
710 this->currentGuildMemberId = this->xferAudioData.guildMemberId;
711 }
712 std::basic_string_view<std::byte> frame{};
713 bool doWeBreak{};
714 switch (frameType) {
715 case AudioFrameType::RawPCM: {
// Encode and encrypt once at least one packet's worth of bytes is buffered.
716 if (this->audioData.getCurrentTail()->getUsedSpace() >=
717 static_cast<uint64_t>(this->samplesPerPacket * bytesPerSample)) {
718 auto encodedFrameData = this->encoder.encodeData(this->audioData.readData());
719 if (encodedFrameData.data.size() != 0) {
720 frame = this->packetEncrypter.encryptPacket(encodedFrameData);
721 }
722 }
723 break;
724 }
725 case AudioFrameType::Skip: {
// A Skip frame fires the song-completion event, resets the audio state and restarts the loop.
726 SongCompletionEventData completionEventData{};
727 completionEventData.guild = Guilds::getCachedGuild({ .guildId = this->voiceConnectInitData.guildId });
728 if (this->currentGuildMemberId != 0) {
729 completionEventData.guildMember = GuildMembers::getCachedGuildMember(
730 { .guildMemberId = this->currentGuildMemberId, .guildId = this->voiceConnectInitData.guildId });
731 }
732 completionEventData.wasItAFail = false;
733 DiscordCoreClient::getSongAPI(this->voiceConnectInitData.guildId)
734 ->onSongCompletionEvent(completionEventData);
735 this->areWePlaying.store(false);
736 doWeBreak = true;
737 this->xferAudioData.type = AudioFrameType::Unset;
738 this->xferAudioData.clearData();
739 this->audioData.clear();
740 break;
741 }
742 case AudioFrameType::Unset: {
743 this->sendSilence();
744 break;
745 }
746 }
747 if (doWeBreak) {
748 continue;
749 }
// Pacing: service the WebSocket only when at least 70% of the interval is still left before the deadline.
750 auto waitTime = targetTime - HRClock::now();
751 auto waitTimeCount = waitTime.count();
752 int64_t minimumFreeTimeForCheckingProcessIO{ static_cast<int64_t>(
753 static_cast<double>(this->intervalCount.count()) * 0.70l) };
754 if (waitTimeCount >= minimumFreeTimeForCheckingProcessIO && !token.stop_requested() &&
755 VoiceConnection::areWeConnected()) {
756 if (WebSocketCore::processIO(0) == DiscordCoreInternal::ProcessIOResult::Error) {
757 this->onClosed();
758 }
759 }
760
// Sleep for 95% of the remaining time, then spin for whatever is left up to the deadline.
761 waitTime = targetTime - HRClock::now();
762 waitTimeCount = static_cast<int64_t>(static_cast<double>(waitTime.count()) * 0.95l);
763 if (waitTimeCount > 0) {
764 nanoSleep(waitTimeCount);
765 }
766 waitTime = targetTime - HRClock::now();
767 waitTimeCount = waitTime.count();
768 if (waitTimeCount > 0 && waitTimeCount < this->intervalCount.count()) {
769 spinLock(waitTimeCount);
770 }
// With a frame ready, queue it and run UDP I/O in both directions; otherwise only read.
771 if (frame.size() > 0) {
772 this->xferAudioData.clearData();
773 UDPConnection::writeData(frame);
774 if (UDPConnection::processIO(DiscordCoreInternal::ProcessIOType::Both) ==
775 DiscordCoreInternal::ProcessIOResult::Error) {
776 this->onClosed();
777 }
778 } else {
779 if (UDPConnection::processIO(DiscordCoreInternal::ProcessIOType::Read_Only) ==
780 DiscordCoreInternal::ProcessIOResult::Error) {
781 this->onClosed();
782 }
783 }
784
785 targetTime = HRClock::now() + this->intervalCount;
786
// When a connected bridge socket exists, mix the received voice audio and run the bridge's I/O.
787 if (this->streamSocket && this->streamSocket->areWeStillConnected()) {
788 this->streamSocket->mixAudio();
789 if (this->streamSocket->processIO(DiscordCoreInternal::ProcessIOType::Both) ==
790 DiscordCoreInternal::ProcessIOResult::Error) {
791 this->onClosed();
792 }
793 }
794 }
795 break;
796 }
797 case VoiceActiveState::Exiting: {
798 return;
799 }
800 }
801 if (token.stop_requested() || this->activeState == VoiceActiveState::Exiting) {
802 return;
803 }
804 std::this_thread::sleep_for(1ms);
805 }
806 };
807
808 bool VoiceConnection::areWeCurrentlyPlaying() noexcept {
809 return (this->areWePlaying.load() && this->activeState.load() == VoiceActiveState::Playing) ||
810 this->activeState.load() == VoiceActiveState::Paused;
811 }
812
813 void VoiceConnection::handleAudioBuffer() noexcept {
814 if (this->connectionState.load() == VoiceConnectionState::Initializing_DatagramSocket) {
815 } else {
816 std::basic_string_view<std::byte> string = UDPConnection::getInputBuffer();
817 if (this->streamSocket && this->encryptionKey.size() > 0) {
818 this->parseIncomingVoiceData(string);
819 }
820 }
821 }
822
823 void VoiceConnection::clearAudioData() noexcept {
824 if (this->xferAudioData.data.size() != 0) {
825 this->xferAudioData.clearData();
826 }
827 this->discordCoreClient->getSongAPI(this->voiceConnectInitData.guildId)->audioDataBuffer.clearContents();
828 }
829
830 bool VoiceConnection::areWeConnected() noexcept {
831 return WebSocketCore::areWeStillConnected() && UDPConnection::areWeStillConnected();
832 }
833
834 bool VoiceConnection::voiceConnect() noexcept {
835 if (!UDPConnection::areWeStillConnected() && UDPConnection::connect(this->voiceIp, this->port)) {
836 std::byte packet[74]{};
837 const uint16_t val1601{ 0x01 };
838 const uint16_t val1602{ 70 };
839 packet[0] = static_cast<std::byte>(val1601 >> 8);
840 packet[1] = static_cast<std::byte>(val1601 >> 0);
841 packet[2] = static_cast<std::byte>(val1602 >> 8);
842 packet[3] = static_cast<std::byte>(val1602 >> 0);
843 packet[4] = static_cast<std::byte>(this->audioSSRC >> 24);
844 packet[5] = static_cast<std::byte>(this->audioSSRC >> 16);
845 packet[6] = static_cast<std::byte>(this->audioSSRC >> 8);
846 packet[7] = static_cast<std::byte>(this->audioSSRC);
847 UDPConnection::getInputBuffer();
848 UDPConnection::writeData(std::basic_string_view<std::byte>{ packet, std::size(packet) });
849 std::basic_string_view<std::byte> inputStringFirst{};
850 std::basic_string<std::byte> inputString{};
851
852 StopWatch stopWatch{ 5500ms };
853 while (inputStringFirst.size() < 74 && !this->doWeQuit->load() && this->activeState.load() != VoiceActiveState::Exiting) {
854 UDPConnection::processIO(DiscordCoreInternal::ProcessIOType::Both);
855 inputStringFirst = UDPConnection::getInputBuffer();
856 std::this_thread::sleep_for(1ms);
857 if (stopWatch.hasTimePassed()) {
858 return false;
859 }
860 }
861 inputString.insert(inputString.begin(), inputStringFirst.begin(), inputStringFirst.end());
862 inputString = inputString.substr(8);
863 const auto endLineFind = inputString.find(static_cast<std::byte>('\u0000'), 6);
864 if (endLineFind != std::string::npos) {
865 inputString = inputString.substr(0, endLineFind);
866 }
867 this->externalIp = std::string{ reinterpret_cast<const char*>(inputStringFirst.data()) + 8, inputString.size() };
868 this->voiceConnectionDataBuffer.clearContents();
869 return true;
870 } else {
871 return false;
872 }
873 }
874
875 void VoiceConnection::sendSilence() noexcept {
876 std::vector<std::basic_string<std::byte>> frames{};
877 std::byte arrayNew[3]{};
878 arrayNew[0] = std::byte{ 0xf8 };
879 arrayNew[1] = std::byte{ 0xff };
880 arrayNew[2] = std::byte{ 0xfe };
881 for (size_t x = 0; x < 5; ++x) {
882 DiscordCoreInternal::EncoderReturnData frame{};
883 frame.data = arrayNew;
884 frame.sampleCount = 3;
885 auto packetNew = this->packetEncrypter.encryptPacket(frame);
886 frames.push_back(std::basic_string<std::byte>{ packetNew.data(), packetNew.size() });
887 }
888 for (auto& value: frames) {
889 UDPConnection::writeData(value);
890 UDPConnection::processIO(DiscordCoreInternal::ProcessIOType::Both);
891 }
892 }
893
894 void VoiceConnection::pauseToggle() noexcept {
895 if (this->activeState.load() == VoiceActiveState::Paused) {
896 this->activeState.store(VoiceActiveState::Playing);
897 } else {
898 this->activeState.store(VoiceActiveState::Paused);
899 }
900 }
901
902 void VoiceConnection::disconnect() noexcept {
903 this->activeState.store(VoiceActiveState::Exiting);
904 std::string payload = "\x03\xE8";
905 this->createHeader(payload, DiscordCoreInternal::WebSocketOpCode::Op_Close);
906 WebSocketCore::writeData(payload, true);
907 WebSocketCore::outputBuffer.clear();
908 WebSocketCore::inputBuffer.clear();
909 WebSocketCore::socket = SOCKET_ERROR;
910 if (this->taskThread01) {
911 this->taskThread01->request_stop();
912 if (this->taskThread01->joinable()) {
913 this->taskThread01->join();
914 this->taskThread01.reset(nullptr);
915 }
916 }
917 WebSocketCore::ssl = nullptr;
918 UDPConnection::disconnect();
919 if (this->streamSocket) {
920 this->streamSocket->disconnect();
921 this->streamSocket.reset(nullptr);
922 }
923 if (DiscordCoreClient::getSongAPI(this->voiceConnectInitData.guildId)) {
924 DiscordCoreClient::getSongAPI(this->voiceConnectInitData.guildId)
925 ->onSongCompletionEvent.remove(DiscordCoreClient::getSongAPI(this->voiceConnectInitData.guildId)->eventToken);
926 }
927 this->closeCode = 0;
928 this->areWeHeartBeating = false;
929 this->currentReconnectTries = 0;
930 this->voiceUsers.clear();
931 this->activeState.store(VoiceActiveState::Connecting);
932 this->connectionState.store(VoiceConnectionState::Collecting_Init_Data);
933 this->currentState.store(DiscordCoreInternal::WebSocketState::Disconnected);
934 this->discordCoreClient->getSongAPI(this->voiceConnectInitData.guildId)->audioDataBuffer.clearContents();
935 }
936
937 void VoiceConnection::reconnect() noexcept {
938 ++this->currentReconnectTries;
939 this->connections = std::make_unique<ConnectionPackage>();
940 this->connections->currentReconnectTries = this->currentReconnectTries;
941 this->connections->currentShard = this->shard[0];
942 }
943
944 void VoiceConnection::onClosed() noexcept {
945 this->connectionState.store(VoiceConnectionState::Collecting_Init_Data);
946 if (this->activeState.load() != VoiceActiveState::Exiting && this->currentReconnectTries < this->maxReconnectTries) {
947 this->reconnect();
948 } else if (this->currentReconnectTries >= this->maxReconnectTries) {
949 VoiceConnection::disconnect();
950 }
951 }
952
953 bool VoiceConnection::stop() noexcept {
954 this->sendSpeakingMessage(false);
955 this->activeState.store(VoiceActiveState::Stopped);
956 return true;
957 }
958
959 bool VoiceConnection::play() noexcept {
960 this->activeState.store(VoiceActiveState::Playing);
961 return true;
962 }
963
964 VoiceConnection::~VoiceConnection() {
965 this->disconnect();
966 }
967
968}
AudioFrameType
Audio frame types.
Definition: Utilities.hpp:825
DiscordCoreAPI_Dll void reportException(const std::string &currentFunctionName, std::source_location location=std::source_location::current())
Prints the current file, line, and column from which the function is being called - typically from wi...
Definition: Utilities.cpp:763
StreamType
For selecting the type of streamer that the given bot is, one must be one server and one of client pe...
Definition: Utilities.hpp:179
The main namespace for this library.
VoiceSocketOpCodes
The various opcodes that could be sent/received by the voice-websocket.
DiscordCoreClient - The main class for this library.
For connecting to a voice-channel. "streamInfo" is used when a socket is created to connect this bot ...
Definition: Utilities.hpp:859
A thread-safe messaging block for data-structures.
Definition: Utilities.hpp:1194