DiscordCoreAPI
A Discord bot library written in C++, with custom asynchronous coroutines.
Loading...
Searching...
No Matches
VoiceConnection.cpp
Go to the documentation of this file.
1/*
2 DiscordCoreAPI, A bot library for Discord, written in C++, and featuring explicit multithreading through the usage of custom, asynchronous C++ CoRoutines.
3
4 Copyright 2021, 2022 Chris M. (RealTimeChris)
5
6 This library is free software; you can redistribute it and/or
7 modify it under the terms of the GNU Lesser General Public
8 License as published by the Free Software Foundation; either
9 version 2.1 of the License, or (at your option) any later version.
10
11 This library is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public
17 License along with this library; if not, write to the Free Software
18 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
19 USA
20*/
21/// VoiceConnection.cpp - Source file for the voice connection class.
22/// Jul 15, 2021
23/// https://discordcoreapi.com
24/// \file VoiceConnection.cpp
25
28
29namespace DiscordCoreAPI {
30
31 VoiceSocketReadyData::VoiceSocketReadyData(simdjson::ondemand::value jsonObjectData) {
32 this->ip = getString(jsonObjectData, "ip");
33 this->ssrc = getUint32(jsonObjectData, "ssrc");
34 simdjson::ondemand::array arrayValue{};
35 if (jsonObjectData["modes"].get(arrayValue) == simdjson::error_code::SUCCESS) {
36 this->mode.clear();
37 for (simdjson::simdjson_result<simdjson::ondemand::value> value: arrayValue) {
38 if (std::string{ value.get_string().take_value() } == "xsalsa20_poly1305") {
39 this->mode = std::string{ value.get_string().take_value() };
40 }
41 }
42 }
43 this->port = getUint64(jsonObjectData, "port");
44 }
45
46 VoiceUser::VoiceUser(Snowflake userIdNew) noexcept {
47 this->userId = userIdNew;
48 }
49
50 VoiceUser& VoiceUser::operator=(VoiceUser&& data) noexcept {
51 this->payloads = std::move(data.payloads);
52 this->decoder = std::move(data.decoder);
53 this->userId = data.userId;
54 return *this;
55 }
56
57 VoiceUser::VoiceUser(VoiceUser&& data) noexcept {
58 *this = std::move(data);
59 }
60
61 DiscordCoreInternal::OpusDecoderWrapper& VoiceUser::getDecoder() noexcept {
62 return this->decoder;
63 }
64
65 void VoiceUser::insertPayload(std::basic_string_view<std::byte> data) noexcept {
66 if (this->payloads.getFreeSpace() == 0) {
67 this->payloads.getCurrentTail()->clear();
68 this->payloads.modifyReadOrWritePosition(DiscordCoreInternal::RingBufferAccessType::Read, 1);
69 }
70 if (data.size() <= this->payloads.getCurrentHead()->getFreeSpace()) {
71 std::copy(data.data(), data.data() + data.size(), this->payloads.getCurrentHead()->getCurrentHead());
72 this->payloads.getCurrentHead()->modifyReadOrWritePosition(DiscordCoreInternal::RingBufferAccessType::Write, data.size());
73 this->payloads.modifyReadOrWritePosition(DiscordCoreInternal::RingBufferAccessType::Write, 1);
74 }
75 }
76
77 std::basic_string_view<std::byte> VoiceUser::extractPayload() noexcept {
78 std::basic_string_view<std::byte> string{};
79 if (this->payloads.getUsedSpace() > 0 && this->payloads.getCurrentTail()->getUsedSpace() > 0) {
80 string = std::basic_string_view<std::byte>{ this->payloads.getCurrentTail()->getCurrentTail(),
81 this->payloads.getCurrentTail()->getUsedSpace() };
82 this->payloads.getCurrentTail()->clear();
83 this->payloads.modifyReadOrWritePosition(DiscordCoreInternal::RingBufferAccessType::Read, 1);
84 }
85 return string;
86 }
87
88 Snowflake VoiceUser::getUserId() noexcept {
89 return this->userId;
90 }
91
92 RTPPacketEncrypter::RTPPacketEncrypter(uint32_t ssrcNew, const std::basic_string<std::byte>& keysNew) noexcept {
93 this->keys = keysNew;
94 this->ssrc = ssrcNew;
95 }
96
97 std::basic_string_view<std::byte> RTPPacketEncrypter::encryptPacket(DiscordCoreInternal::EncoderReturnData& audioData) noexcept {
98 if (this->keys.size() > 0) {
99 ++this->sequence;
100 this->timeStamp += static_cast<uint32_t>(audioData.sampleCount);
101 const uint8_t headerSize{ 12 };
102 char header[headerSize]{};
103 storeBits(header, this->version);
104 storeBits(header + 1, this->flags);
105 storeBits(header + 2, this->sequence);
106 storeBits(header + 4, this->timeStamp);
107 storeBits(header + 8, this->ssrc);
108 uint8_t nonceForLibSodium[crypto_secretbox_NONCEBYTES]{};
109 for (int8_t x = 0; x < headerSize; ++x) {
110 nonceForLibSodium[x] = header[x];
111 }
112 const uint64_t numOfBytes{ headerSize + audioData.data.size() + crypto_secretbox_MACBYTES };
113 if (this->data.size() < numOfBytes) {
114 this->data.resize(numOfBytes);
115 }
116 for (int8_t x = 0; x < headerSize; ++x) {
117 this->data[x] = static_cast<std::byte>(header[x]);
118 }
119 if (crypto_secretbox_easy(reinterpret_cast<unsigned char*>(this->data.data()) + headerSize,
120 reinterpret_cast<const unsigned char*>(audioData.data.data()), audioData.data.size(), nonceForLibSodium,
121 reinterpret_cast<unsigned char*>(this->keys.data())) != 0) {
122 return {};
123 }
124 return std::basic_string_view<std::byte>{ this->data.data(), numOfBytes };
125 }
126 return {};
127 }
128
129 MovingAverager::MovingAverager(size_t collectionCountNew) noexcept {
130 this->collectionCount = collectionCountNew;
131 }
132
133 void MovingAverager::insertValue(int64_t value) noexcept {
134 this->values.emplace_front(value);
135 if (this->values.size() >= this->collectionCount) {
136 this->values.pop_back();
137 }
138 }
139
140 double MovingAverager::getCurrentValue() noexcept {
141 double returnValue{};
142 if (this->values.size() > 0) {
143 for (auto& value: this->values) {
144 returnValue += static_cast<double>(value);
145 }
146 return returnValue / static_cast<double>(this->values.size());
147 } else {
148 return 0.0f;
149 }
150 }
151
152 VoiceConnectionBridge::VoiceConnectionBridge(DiscordCoreClient* clientPtrNew, StreamType streamType, Snowflake guildIdNew)
153 : UDPConnection(streamType, clientPtrNew->getConfigManager().doWePrintWebSocketErrorMessages()) {
154 this->clientPtr = clientPtrNew;
155 this->guildId = guildIdNew;
156 }
157
158 void VoiceConnectionBridge::parseOutgoingVoiceData() noexcept {
159 std::basic_string_view<std::byte> buffer = this->getInputBuffer();
160 if (buffer.size() > 0) {
161 AudioFrameData frame{};
162 frame += buffer;
163 this->clientPtr->getSongAPI(this->guildId)->audioDataBuffer.send(std::move(frame));
164 }
165 }
166
167 void VoiceConnectionBridge::handleAudioBuffer() noexcept {
168 this->parseOutgoingVoiceData();
169 }
170
171 VoiceConnection::VoiceConnection(DiscordCoreClient* clientPtrNew, DiscordCoreInternal::WebSocketClient* baseShardNew,
172 std::atomic_bool* doWeQuitNew) noexcept
173 : WebSocketCore(&clientPtrNew->configManager, DiscordCoreInternal::WebSocketType::Voice),
174 UDPConnection(StreamType::None, clientPtrNew->configManager.doWePrintWebSocketErrorMessages()) {
175 this->dataOpCode = DiscordCoreInternal::WebSocketOpCode::Op_Text;
176 this->configManager = &clientPtrNew->configManager;
177 this->msPerPacket = 20;
178 this->samplesPerPacket = sampleRatePerSecond / 1000 * msPerPacket;
179 this->discordCoreClient = clientPtrNew;
180 this->downSampledVector.resize(23040);
181 this->upSampledVector.resize(23040);
182 this->baseShard = baseShardNew;
183 this->doWeQuit = doWeQuitNew;
184 }
185
186 Snowflake VoiceConnection::getChannelId() noexcept {
187 return this->voiceConnectInitData.channelId;
188 }
189
190 void VoiceConnection::applyGainRamp(int64_t sampleCount) noexcept {
191 const double increment = (this->currentGain - this->previousGain) / static_cast<double>(sampleCount);
192 for (int64_t x = 0; x < sampleCount / 4; ++x) {
193 __m256d currentSampleNew = _mm256_mul_pd(
194 _mm256_set_pd(static_cast<double>(this->upSampledVector[(x * 4)]), static_cast<double>(this->upSampledVector[(x * 4) + 1]),
195 static_cast<double>(this->upSampledVector[(x * 4) + 2]), static_cast<double>(this->upSampledVector[(x * 4) + 3])),
196 _mm256_add_pd(_mm256_set1_pd(this->currentGain), _mm256_mul_pd(_mm256_set1_pd(increment), _mm256_set_pd(1.0l, 2.0l, 3.0l, 4.0l))));
197 double currentSamplesNew[4]{};
198 _mm256_store_pd(currentSamplesNew,
199 _mm256_blendv_pd(_mm256_max_pd(currentSampleNew, _mm256_set1_pd(static_cast<double>(std::numeric_limits<opus_int16>::min()))),
200 _mm256_min_pd(currentSampleNew, _mm256_set1_pd(static_cast<double>(std::numeric_limits<opus_int16>::max()))),
201 _mm256_cmp_pd(currentSampleNew, _mm256_set1_pd(0.0l), _CMP_GE_OQ)));
202 for (int64_t y = 0; y < 4; ++y) {
203 this->downSampledVector[(x * 4) + y] = static_cast<opus_int16>(currentSamplesNew[y]);
204 }
205 this->currentGain += increment * 4.0l;
206 }
207 }
208
209 void VoiceConnection::parseIncomingVoiceData(std::basic_string_view<std::byte> rawDataBufferNew) noexcept {
210 if (rawDataBufferNew.size() <= 39) {
211 return;
212 }
213 const uint32_t speakerSsrc{ ntohl(*reinterpret_cast<const uint32_t*>(rawDataBufferNew.data() + 8)) };
214 if (this->voiceUsers.contains(speakerSsrc)) {
215 if (72 <= (static_cast<int8_t>(rawDataBufferNew[1]) & 0b0111'1111) && ((static_cast<int8_t>(rawDataBufferNew[1]) & 0b0111'1111) <= 76)) {
216 return;
217 }
218 this->voiceUsers[speakerSsrc]->insertPayload(rawDataBufferNew);
219 }
220 }
221
222 void VoiceConnection::connect(const VoiceConnectInitData& initData) noexcept {
223 this->voiceConnectInitData = initData;
224 this->connections = std::make_unique<ConnectionPackage>();
225 this->connections->currentReconnectTries = this->currentReconnectTries;
226 this->connections->currentShard = this->shard[0];
227 this->activeState.store(VoiceActiveState::Connecting);
228 if (!this->taskThread01) {
229 this->taskThread01 = std::make_unique<std::jthread>([=, this](std::stop_token token) {
230 this->runVoice(token);
231 });
232 }
233 }
234
235 UnboundedMessageBlock<AudioFrameData>& VoiceConnection::getAudioBuffer() noexcept {
236 return this->discordCoreClient->getSongAPI(this->voiceConnectInitData.guildId)->audioDataBuffer;
237 }
238
239 void VoiceConnection::checkForAndSendHeartBeat(const bool isImmedate) noexcept {
240 if (this->heartBeatStopWatch.hasTimePassed() || isImmedate) {
241 Jsonifier data{};
242 data["d"] = std::chrono::duration_cast<Nanoseconds>(HRClock::now().time_since_epoch()).count();
243 data["op"] = 3;
244 data.refreshString(JsonifierSerializeType::Json);
245 std::string string{ data.operator std::string() };
246 this->createHeader(string, this->dataOpCode);
247 if (!this->sendMessage(string, true)) {
248 this->onClosed();
249 return;
250 }
251 this->haveWeReceivedHeartbeatAck = false;
252 this->heartBeatStopWatch.resetTimer();
253 }
254 }
255
256 bool VoiceConnection::onMessageReceived(std::string_view data) noexcept {
257 std::string string{ data };
258 string.reserve(string.size() + simdjson::SIMDJSON_PADDING);
259 DiscordCoreInternal::WebSocketMessage message{};
260 simdjson::ondemand::value value{};
261 if (this->parser.iterate(string.data(), string.length(), string.capacity()).get(value) == simdjson::error_code::SUCCESS) {
262 message = DiscordCoreInternal::WebSocketMessage{ value };
263 }
264
265 if (this->configManager->doWePrintWebSocketSuccessMessages()) {
266 cout << shiftToBrightGreen() << "Message received from Voice WebSocket: " << data << reset() << endl << endl;
267 }
268 switch (static_cast<VoiceSocketOpCodes>(message.op)) {
269 case VoiceSocketOpCodes::Ready_Server: {
270 const VoiceSocketReadyData data{ value["d"] };
271 this->audioSSRC = data.ssrc;
272 this->voiceIp = data.ip;
273 this->port = data.port;
274 this->audioEncryptionMode = data.mode;
275 this->connectionState.store(VoiceConnectionState::Initializing_DatagramSocket);
276 break;
277 }
278 case VoiceSocketOpCodes::Session_Description: {
279 auto arrayValue = getArray(value["d"], "secret_key");
280 if (arrayValue.didItSucceed) {
281 std::basic_string<std::byte> secretKey{};
282 for (simdjson::ondemand::array_iterator iterator = arrayValue.arrayValue.begin(); iterator != arrayValue.arrayValue.end();
283 ++iterator) {
284 secretKey.push_back(static_cast<std::byte>(iterator.operator*().get_uint64().take_value()));
285 }
286 this->encryptionKey = secretKey;
287 }
288 this->packetEncrypter = RTPPacketEncrypter{ this->audioSSRC, this->encryptionKey };
289 this->connectionState.store(VoiceConnectionState::Collecting_Init_Data);
290 break;
291 }
292 case VoiceSocketOpCodes::Speaking: {
293 const uint32_t ssrc = getUint32(value["d"], "ssrc");
294 std::unique_ptr<VoiceUser> user{ std::make_unique<VoiceUser>(stoull(getString(value["d"], "user_id"))) };
295 if (!Users::getCachedUser({ .userId = user->getUserId() }).getFlagValue(UserFlags::Bot) ||
296 this->voiceConnectInitData.streamInfo.streamBotAudio) {
297 if (!this->voiceUsers.contains(ssrc)) {
298 this->voiceUsers.emplace(std::make_pair(ssrc, std::move(user)));
299 }
300 }
301 break;
302 }
303 case VoiceSocketOpCodes::Heartbeat_ACK: {
304 this->haveWeReceivedHeartbeatAck = true;
305 break;
306 }
307 case VoiceSocketOpCodes::Hello: {
308 this->heartBeatStopWatch = StopWatch{ Milliseconds{ static_cast<uint32_t>(getFloat(value["d"], "heartbeat_interval")) } };
309 this->areWeHeartBeating = true;
310 this->connectionState.store(VoiceConnectionState::Sending_Identify);
311 this->currentState.store(DiscordCoreInternal::WebSocketState::Authenticated);
312 this->haveWeReceivedHeartbeatAck = true;
313 break;
314 }
315 case VoiceSocketOpCodes::Resumed: {
316 this->connectionState.store(VoiceConnectionState::Initializing_DatagramSocket);
317 break;
318 }
319 case VoiceSocketOpCodes::Client_Disconnect: {
320 const auto userId = stoull(getString(value["d"], "user_id"));
321 for (auto& [key, value]: this->voiceUsers) {
322 if (userId == value->getUserId()) {
323 this->voiceUsers.erase(key);
324 break;
325 }
326 }
327 break;
328 }
329 }
330 return true;
331 }
332
333 void VoiceConnection::sendSpeakingMessage(const bool isSpeaking) noexcept {
334 DiscordCoreInternal::SendSpeakingData data{};
335 if (!isSpeaking) {
336 data.type = static_cast<DiscordCoreInternal::SendSpeakingType>(0);
337 this->sendSilence();
338 UDPConnection::processIO(DiscordCoreInternal::ProcessIOType::Write_Only);
339 } else {
340 data.type = DiscordCoreInternal::SendSpeakingType::Microphone;
341 }
342 data.delay = 0;
343 data.ssrc = this->audioSSRC;
344 auto serializer = data.operator Jsonifier();
345 serializer.refreshString(JsonifierSerializeType::Json);
346 std::string string{ serializer.operator std::string() };
347 this->createHeader(string, this->dataOpCode);
348 this->sendMessage(string, true);
349 }
350
351 void VoiceConnection::runVoice(std::stop_token token) noexcept {
352 StopWatch stopWatch{ 20000ms };
353 StopWatch sendSilenceStopWatch{ 5000ms };
354 while (!token.stop_requested() && !this->doWeQuit->load() && this->activeState.load() != VoiceActiveState::Exiting) {
355 switch (this->activeState.load()) {
356 case VoiceActiveState::Connecting: {
357 while (!token.stop_requested() && this->activeState.load() == VoiceActiveState::Connecting) {
358 std::this_thread::sleep_for(1ms);
359 if (!token.stop_requested() && VoiceConnection::areWeConnected()) {
360 if (WebSocketCore::processIO(10) == DiscordCoreInternal::ProcessIOResult::Error) {
361 this->onClosed();
362 }
363 }
364 if (!token.stop_requested() && VoiceConnection::areWeConnected()) {
365 this->checkForAndSendHeartBeat(false);
366 }
367 this->checkForConnections(token);
368 }
369 break;
370 }
371 case VoiceActiveState::Stopped: {
372 this->areWePlaying.store(false);
373 this->discordCoreClient->getSongAPI(this->voiceConnectInitData.guildId)->audioDataBuffer.clearContents();
374 this->clearAudioData();
375 while (!token.stop_requested() && this->activeState.load() == VoiceActiveState::Stopped) {
376 UDPConnection::processIO(DiscordCoreInternal::ProcessIOType::Both);
377 if (sendSilenceStopWatch.hasTimePassed()) {
378 sendSilenceStopWatch.resetTimer();
379 this->sendSpeakingMessage(true);
380 this->sendSpeakingMessage(false);
381 }
382 std::this_thread::sleep_for(1ms);
383 if (!token.stop_requested() && VoiceConnection::areWeConnected()) {
384 if (WebSocketCore::processIO(10) == DiscordCoreInternal::ProcessIOResult::Error) {
385 this->onClosed();
386 }
387 }
388 if (!token.stop_requested() && VoiceConnection::areWeConnected()) {
389 this->checkForAndSendHeartBeat(false);
390 }
391 this->checkForConnections(token);
392 }
393 break;
394 }
395 case VoiceActiveState::Paused: {
396 this->areWePlaying.store(false);
397 while (!token.stop_requested() && this->activeState.load() == VoiceActiveState::Paused) {
398 UDPConnection::processIO(DiscordCoreInternal::ProcessIOType::Both);
399 if (sendSilenceStopWatch.hasTimePassed()) {
400 sendSilenceStopWatch.resetTimer();
401 this->sendSpeakingMessage(true);
402 this->sendSpeakingMessage(false);
403 }
404 std::this_thread::sleep_for(1ms);
405 if (!token.stop_requested() && VoiceConnection::areWeConnected()) {
406 if (WebSocketCore::processIO(10) == DiscordCoreInternal::ProcessIOResult::Error) {
407 this->onClosed();
408 }
409 }
410 if (!token.stop_requested() && VoiceConnection::areWeConnected()) {
411 this->checkForAndSendHeartBeat(false);
412 }
413 this->checkForConnections(token);
414 }
415 break;
416 }
417 case VoiceActiveState::Playing: {
418 this->xferAudioData.type = AudioFrameType::Unset;
419 this->xferAudioData.data.clear();
420
421 stopWatch.resetTimer();
422 while (!token.stop_requested() && !UDPConnection::areWeStillConnected()) {
423 if (stopWatch.hasTimePassed() || this->activeState.load() == VoiceActiveState::Exiting) {
424 return;
425 }
426 std::this_thread::sleep_for(1ms);
427 if (!token.stop_requested() && VoiceConnection::areWeConnected()) {
428 if (WebSocketCore::processIO(10) == DiscordCoreInternal::ProcessIOResult::Error) {
429 this->onClosed();
430 }
431 }
432 if (!token.stop_requested() && VoiceConnection::areWeConnected()) {
433 this->checkForAndSendHeartBeat(false);
434 }
435 this->checkForConnections(token);
436 }
437
438 auto targetTime{ HRClock::now() + this->intervalCount };
439 this->sendSpeakingMessage(false);
440 this->sendSpeakingMessage(true);
441
442 while (!token.stop_requested() && this->activeState.load() == VoiceActiveState::Playing) {
443 int64_t bytesPerSample{ 4 };
444 if (!token.stop_requested() && VoiceConnection::areWeConnected()) {
445 this->checkForAndSendHeartBeat(false);
446 }
447 this->checkForConnections(token);
448 this->discordCoreClient->getSongAPI(this->voiceConnectInitData.guildId)->audioDataBuffer.tryReceive(this->xferAudioData);
449 AudioFrameType frameType{ this->xferAudioData.type };
450 if (this->xferAudioData.currentSize % 480 != 0 || this->xferAudioData.currentSize == 0) {
451 this->areWePlaying.store(false);
452 this->xferAudioData.clearData();
453 } else {
454 this->intervalCount =
455 Nanoseconds{ static_cast<uint64_t>(static_cast<double>(this->xferAudioData.currentSize / bytesPerSample) /
456 static_cast<double>(this->sampleRatePerSecond) * static_cast<double>(this->nsPerSecond)) };
457 this->areWePlaying.store(true);
458 this->audioData += this->xferAudioData;
459 this->currentGuildMemberId = this->xferAudioData.guildMemberId;
460 }
461 std::basic_string_view<std::byte> frame{};
462 bool doWeBreak{};
463 switch (frameType) {
464 case AudioFrameType::RawPCM: {
465 if (this->audioData.getCurrentTail()->getUsedSpace() >=
466 static_cast<uint64_t>(this->samplesPerPacket * bytesPerSample)) {
467 auto encodedFrameData =
468 this->encoder.encodeData(this->audioData.readData(this->samplesPerPacket * bytesPerSample));
469 if (encodedFrameData.data.size() != 0) {
470 frame = this->packetEncrypter.encryptPacket(encodedFrameData);
471 }
472 }
473 break;
474 }
475 case AudioFrameType::Skip: {
476 SongCompletionEventData completionEventData{};
477 completionEventData.guild = Guilds::getCachedGuild({ .guildId = this->voiceConnectInitData.guildId });
478 if (this->currentGuildMemberId != 0) {
479 completionEventData.guildMember = GuildMembers::getCachedGuildMember(
480 { .guildMemberId = this->currentGuildMemberId, .guildId = this->voiceConnectInitData.guildId });
481 }
482 completionEventData.wasItAFail = false;
483 DiscordCoreClient::getSongAPI(this->voiceConnectInitData.guildId)->onSongCompletionEvent(completionEventData);
484 this->areWePlaying.store(false);
485 doWeBreak = true;
486 this->xferAudioData.type = AudioFrameType::Unset;
487 this->xferAudioData.clearData();
488 this->audioData.clear();
489 break;
490 }
491 case AudioFrameType::Unset: {
492 this->sendSilence();
493 break;
494 }
495 }
496 if (doWeBreak) {
497 continue;
498 }
499 auto waitTime = targetTime - HRClock::now();
500 auto waitTimeCount = waitTime.count();
501 int64_t minimumFreeTimeForCheckingProcessIO{ static_cast<int64_t>(static_cast<double>(this->intervalCount.count()) * 0.70l) };
502 if (waitTimeCount >= minimumFreeTimeForCheckingProcessIO && !token.stop_requested() && VoiceConnection::areWeConnected()) {
503 if (WebSocketCore::processIO(1) == DiscordCoreInternal::ProcessIOResult::Error) {
504 this->onClosed();
505 }
506 }
507
508 waitTime = targetTime - HRClock::now();
509 waitTimeCount = static_cast<int64_t>(static_cast<double>(waitTime.count()) * 0.90l);
510 if (waitTimeCount > 0) {
511 nanoSleep(waitTimeCount);
512 }
513 waitTime = targetTime - HRClock::now();
514 waitTimeCount = waitTime.count();
515 if (waitTimeCount > 0 && waitTimeCount < this->intervalCount.count()) {
516 spinLock(waitTimeCount);
517 }
518 if (frame.size() > 0) {
519 this->xferAudioData.clearData();
520 UDPConnection::writeData(frame);
521 if (UDPConnection::processIO(DiscordCoreInternal::ProcessIOType::Both) == DiscordCoreInternal::ProcessIOResult::Error) {
522 this->onClosed();
523 }
524 } else {
525 if (UDPConnection::processIO(DiscordCoreInternal::ProcessIOType::Read_Only) ==
526 DiscordCoreInternal::ProcessIOResult::Error) {
527 this->onClosed();
528 }
529 }
530
531 targetTime = HRClock::now() + this->intervalCount;
532
533 if (this->streamSocket && this->streamSocket->areWeStillConnected()) {
534 this->mixAudio();
535 if (this->streamSocket->processIO(DiscordCoreInternal::ProcessIOType::Both) ==
536 DiscordCoreInternal::ProcessIOResult::Error) {
537 this->onClosed();
538 }
539 }
540 }
541 break;
542 }
543 case VoiceActiveState::Exiting: {
544 return;
545 }
546 }
547 if (token.stop_requested() || this->activeState == VoiceActiveState::Exiting) {
548 return;
549 }
550 std::this_thread::sleep_for(1ms);
551 }
552 };
553
554 bool VoiceConnection::areWeCurrentlyPlaying() noexcept {
555 return (this->areWePlaying.load() && this->activeState.load() == VoiceActiveState::Playing) ||
556 this->activeState.load() == VoiceActiveState::Paused;
557 }
558
559 void VoiceConnection::checkForConnections(std::stop_token token) noexcept {
560 if (this->connections) {
561 this->connections.reset(nullptr);
562 this->currentState.store(DiscordCoreInternal::WebSocketState::Disconnected);
563 WebSocketCore::ssl = nullptr;
564 if (this->voiceConnectInitData.streamInfo.type != StreamType::None) {
565 WebSocketCore::outputBuffer.clear();
566 WebSocketCore::inputBuffer.clear();
567 UDPConnection::inputBuffer.clear();
568 UDPConnection::outputBuffer.clear();
569 if (this->streamSocket) {
570 this->streamSocket->inputBuffer.clear();
571 this->streamSocket->outputBuffer.clear();
572 }
573 this->discordCoreClient->getSongAPI(this->voiceConnectInitData.guildId)->audioDataBuffer.clearContents();
574 }
575 WebSocketCore::socket = SOCKET_ERROR;
576 UDPConnection::disconnect();
577 this->areWeConnecting.store(true);
578 this->closeCode = 0;
579 this->areWeHeartBeating = false;
580 StopWatch stopWatch{ 10000ms };
581 this->connectionState.store(VoiceConnectionState::Collecting_Init_Data);
582 while (this->baseShard->currentState.load() != DiscordCoreInternal::WebSocketState::Authenticated && !token.stop_requested()) {
583 if (stopWatch.hasTimePassed() || this->activeState.load() == VoiceActiveState::Exiting) {
584 return;
585 }
586 std::this_thread::sleep_for(1ms);
587 }
588 this->connectInternal(token);
589 this->sendSpeakingMessage(true);
590 this->activeState.store(VoiceActiveState::Playing);
591 }
592 }
593
594 void VoiceConnection::handleAudioBuffer() noexcept {
595 if (this->connectionState.load() == VoiceConnectionState::Initializing_DatagramSocket) {
596 } else {
597 std::basic_string_view<std::byte> string = UDPConnection::getInputBuffer();
598 if (this->streamSocket && this->encryptionKey.size() > 0) {
599 this->parseIncomingVoiceData(string);
600 }
601 }
602 }
603
604 void VoiceConnection::connectInternal(std::stop_token token) noexcept {
605 StopWatch stopWatch{ 10000ms };
606 if (this->currentReconnectTries >= this->maxReconnectTries) {
607 this->doWeQuit->store(true);
608 if (this->configManager->doWePrintWebSocketErrorMessages()) {
609 cout << "VoiceConnection::connect() Error: Failed to connect to voice channel!" << endl << endl;
610 }
611 return;
612 }
613 switch (this->connectionState.load()) {
614 case VoiceConnectionState::Collecting_Init_Data: {
615 this->baseShard->voiceConnectionDataBuffersMap[this->voiceConnectInitData.guildId.operator size_t()] =
616 &this->voiceConnectionDataBuffer;
617 this->baseShard->voiceConnectionDataBuffersMap[this->voiceConnectInitData.guildId.operator size_t()]->clearContents();
618 this->baseShard->getVoiceConnectionData(this->voiceConnectInitData);
619
620 if (waitForTimeToPass(this->voiceConnectionDataBuffer, this->voiceConnectionData, 10000)) {
621 ++this->currentReconnectTries;
622 this->onClosed();
623 return;
624 }
625 this->baseUrl = this->voiceConnectionData.endPoint.substr(0, this->voiceConnectionData.endPoint.find(":"));
626 this->connectionState.store(VoiceConnectionState::Initializing_WebSocket);
627 this->connectInternal(token);
628 break;
629 }
630 case VoiceConnectionState::Initializing_WebSocket: {
631 this->currentState.store(DiscordCoreInternal::WebSocketState::Upgrading);
632 if (!WebSocketCore::connect(this->baseUrl, "/?v=4", 443, this->configManager->doWePrintWebSocketErrorMessages(), false)) {
633 ++this->currentReconnectTries;
634 this->onClosed();
635 return;
636 }
637 this->shard[0] = 0;
638 this->shard[1] = 1;
639 while (this->currentState.load() != DiscordCoreInternal::WebSocketState::Collecting_Hello && !token.stop_requested()) {
640 if (WebSocketCore::processIO(10) == DiscordCoreInternal::ProcessIOResult::Error) {
641 ++this->currentReconnectTries;
642 this->onClosed();
643 return;
644 }
645 }
646 this->connectionState.store(VoiceConnectionState::Collecting_Hello);
647 this->connectInternal(token);
648 break;
649 }
650 case VoiceConnectionState::Collecting_Hello: {
651 stopWatch.resetTimer();
652 while (this->connectionState.load() != VoiceConnectionState::Sending_Identify && !token.stop_requested()) {
653 if (stopWatch.hasTimePassed()) {
654 ++this->currentReconnectTries;
655 this->onClosed();
656 return;
657 }
658 if (WebSocketCore::processIO(100) == DiscordCoreInternal::ProcessIOResult::Error) {
659 ++this->currentReconnectTries;
660 this->onClosed();
661 return;
662 }
663 std::this_thread::sleep_for(1ms);
664 }
665 this->currentReconnectTries = 0;
666 this->connectInternal(token);
667 break;
668 }
669 case VoiceConnectionState::Sending_Identify: {
670 this->haveWeReceivedHeartbeatAck = true;
671 DiscordCoreInternal::VoiceIdentifyData data{};
672 data.connectInitData = this->voiceConnectInitData;
673 data.connectionData = this->voiceConnectionData;
674 auto serializer = data.operator Jsonifier();
675 serializer.refreshString(JsonifierSerializeType::Json);
676 std::string string{ serializer.operator std::string() };
677 this->createHeader(string, this->dataOpCode);
678 if (!WebSocketCore::sendMessage(string, true)) {
679 ++this->currentReconnectTries;
680 this->onClosed();
681 return;
682 }
683 this->connectionState.store(VoiceConnectionState::Collecting_Ready);
684 this->connectInternal(token);
685 break;
686 }
687 case VoiceConnectionState::Collecting_Ready: {
688 stopWatch.resetTimer();
689 while (this->connectionState.load() != VoiceConnectionState::Initializing_DatagramSocket && !token.stop_requested()) {
690 if (stopWatch.hasTimePassed()) {
691 ++this->currentReconnectTries;
692 this->onClosed();
693 return;
694 }
695 if (WebSocketCore::processIO(100) == DiscordCoreInternal::ProcessIOResult::Error) {
696 ++this->currentReconnectTries;
697 this->onClosed();
698 return;
699 }
700 std::this_thread::sleep_for(1ms);
701 }
702 this->connectInternal(token);
703 break;
704 }
705 case VoiceConnectionState::Initializing_DatagramSocket: {
706 if (!this->voiceConnect()) {
707 ++this->currentReconnectTries;
708 this->onClosed();
709 return;
710 }
711 this->connectionState.store(VoiceConnectionState::Sending_Select_Protocol);
712 this->connectInternal(token);
713 break;
714 }
715 case VoiceConnectionState::Sending_Select_Protocol: {
716 DiscordCoreInternal::VoiceSocketProtocolPayloadData data{};
717 data.voiceEncryptionMode = this->audioEncryptionMode;
718 data.externalIp = this->externalIp;
719 data.voicePort = this->port;
720 auto serializer = data.operator Jsonifier();
721 serializer.refreshString(JsonifierSerializeType::Json);
722 std::string string{ serializer.operator std::string() };
723 this->createHeader(string, this->dataOpCode);
724 if (!WebSocketCore::sendMessage(string, true)) {
725 ++this->currentReconnectTries;
726 this->onClosed();
727 return;
728 }
729 this->connectionState.store(VoiceConnectionState::Collecting_Session_Description);
730 this->connectInternal(token);
731 break;
732 }
733 case VoiceConnectionState::Collecting_Session_Description: {
734 stopWatch.resetTimer();
735 while (this->connectionState.load() != VoiceConnectionState::Collecting_Init_Data && !token.stop_requested()) {
736 if (stopWatch.hasTimePassed()) {
737 ++this->currentReconnectTries;
738 ;
739 this->onClosed();
740 return;
741 }
742 if (WebSocketCore::processIO(100) == DiscordCoreInternal::ProcessIOResult::Error) {
743 ++this->currentReconnectTries;
744 ;
745 this->onClosed();
746 return;
747 }
748 std::this_thread::sleep_for(1ms);
749 }
750 this->baseShard->voiceConnectionDataBuffersMap[this->voiceConnectInitData.guildId.operator size_t()]->clearContents();
751 this->connectionState.store(VoiceConnectionState::Collecting_Init_Data);
752 if (this->voiceConnectInitData.streamInfo.type != StreamType::None) {
753 if (!this->streamSocket) {
754 this->streamSocket = std::make_unique<VoiceConnectionBridge>(this->discordCoreClient,
755 this->voiceConnectInitData.streamInfo.type, this->voiceConnectInitData.guildId);
756 }
757 if (!this->streamSocket->connect(this->voiceConnectInitData.streamInfo.address, this->voiceConnectInitData.streamInfo.port, false,
758 token)) {
759 ++this->currentReconnectTries;
760 this->onClosed();
761 return;
762 }
763 }
764 this->areWeConnecting.store(false);
765 this->activeState.store(VoiceActiveState::Playing);
766 this->play();
767 return;
768 }
769 }
770 }
771
772 void VoiceConnection::clearAudioData() noexcept {
773 if (this->xferAudioData.data.size() != 0) {
774 this->xferAudioData.clearData();
775 }
776 this->discordCoreClient->getSongAPI(this->voiceConnectInitData.guildId)->audioDataBuffer.clearContents();
777 }
778
779 bool VoiceConnection::areWeConnected() noexcept {
780 return WebSocketCore::areWeStillConnected() && UDPConnection::areWeStillConnected();
781 }
782
783 bool VoiceConnection::voiceConnect() noexcept {
784 if (!UDPConnection::areWeStillConnected() && UDPConnection::connect(this->voiceIp, this->port, false)) {
785 std::byte packet[74]{};
786 const uint16_t val1601{ 0x01 };
787 const uint16_t val1602{ 70 };
788 packet[0] = static_cast<std::byte>(val1601 >> 8);
789 packet[1] = static_cast<std::byte>(val1601 >> 0);
790 packet[2] = static_cast<std::byte>(val1602 >> 8);
791 packet[3] = static_cast<std::byte>(val1602 >> 0);
792 packet[4] = static_cast<std::byte>(this->audioSSRC >> 24);
793 packet[5] = static_cast<std::byte>(this->audioSSRC >> 16);
794 packet[6] = static_cast<std::byte>(this->audioSSRC >> 8);
795 packet[7] = static_cast<std::byte>(this->audioSSRC);
796 UDPConnection::getInputBuffer();
797 UDPConnection::writeData(std::basic_string_view<std::byte>{ packet, std::size(packet) });
798 std::basic_string_view<std::byte> inputStringFirst{};
799 std::basic_string<std::byte> inputString{};
800
801 StopWatch stopWatch{ 5500ms };
802 while (inputStringFirst.size() < 74 && !this->doWeQuit->load() && this->activeState.load() != VoiceActiveState::Exiting) {
803 UDPConnection::processIO(DiscordCoreInternal::ProcessIOType::Both);
804 inputStringFirst = UDPConnection::getInputBuffer();
805 std::this_thread::sleep_for(1ms);
806 if (stopWatch.hasTimePassed()) {
807 return false;
808 }
809 }
810 inputString.insert(inputString.begin(), inputStringFirst.begin(), inputStringFirst.end());
811 inputString = inputString.substr(8);
812 const auto endLineFind = inputString.find(static_cast<std::byte>('\u0000'), 6);
813 if (endLineFind != std::string::npos) {
814 inputString = inputString.substr(0, endLineFind);
815 }
816 this->externalIp = std::string{ reinterpret_cast<const char*>(inputStringFirst.data()) + 8, inputString.size() };
817 this->voiceConnectionDataBuffer.clearContents();
818 return true;
819 } else {
820 return false;
821 }
822 }
823
824 void VoiceConnection::sendSilence() noexcept {
825 std::vector<std::basic_string<std::byte>> frames{};
826 std::byte arrayNew[3]{};
827 arrayNew[0] = std::byte{ 0xf8 };
828 arrayNew[1] = std::byte{ 0xff };
829 arrayNew[2] = std::byte{ 0xfe };
830 for (size_t x = 0; x < 5; ++x) {
831 DiscordCoreInternal::EncoderReturnData frame{};
832 frame.data = arrayNew;
833 frame.sampleCount = 3;
834 auto packetNew = this->packetEncrypter.encryptPacket(frame);
835 frames.push_back(std::basic_string<std::byte>{ packetNew.data(), packetNew.size() });
836 }
837 for (auto& value: frames) {
838 UDPConnection::writeData(value);
839 UDPConnection::processIO(DiscordCoreInternal::ProcessIOType::Both);
840 }
841 }
842
843 void VoiceConnection::pauseToggle() noexcept {
844 if (this->activeState.load() == VoiceActiveState::Paused) {
845 this->activeState.store(VoiceActiveState::Playing);
846 } else {
847 this->activeState.store(VoiceActiveState::Paused);
848 }
849 }
850
851 void VoiceConnection::disconnect() noexcept {
852 this->activeState.store(VoiceActiveState::Exiting);
853 std::string payload = "\x03\xE8";
854 this->createHeader(payload, DiscordCoreInternal::WebSocketOpCode::Op_Close);
855 WebSocketCore::writeData(payload, true);
856 UDPConnection::disconnect();
857 WebSocketCore::ssl = nullptr;
858 WebSocketCore::outputBuffer.clear();
859 WebSocketCore::inputBuffer.clear();
860 WebSocketCore::socket = SOCKET_ERROR;
861 if (this->taskThread01) {
862 this->taskThread01->request_stop();
863 if (this->taskThread01->joinable()) {
864 this->taskThread01->join();
865 this->taskThread01.reset(nullptr);
866 }
867 }
868 if (this->streamSocket) {
869 this->streamSocket->disconnect();
870 }
871 if (DiscordCoreClient::getSongAPI(this->voiceConnectInitData.guildId)) {
872 DiscordCoreClient::getSongAPI(this->voiceConnectInitData.guildId)
873 ->onSongCompletionEvent.remove(DiscordCoreClient::getSongAPI(this->voiceConnectInitData.guildId)->eventToken);
874 }
875 this->closeCode = 0;
876 this->areWeHeartBeating = false;
877 this->currentReconnectTries = 0;
878 this->voiceUsers.clear();
879 this->activeState.store(VoiceActiveState::Connecting);
880 this->areWeConnecting.store(true);
881 this->connectionState.store(VoiceConnectionState::Collecting_Init_Data);
882 this->currentState.store(DiscordCoreInternal::WebSocketState::Disconnected);
883 this->discordCoreClient->getSongAPI(this->voiceConnectInitData.guildId)->audioDataBuffer.clearContents();
884 }
885
886 void VoiceConnection::reconnect() noexcept {
887 ++this->currentReconnectTries;
888 this->connections = std::make_unique<ConnectionPackage>();
889 this->connections->currentReconnectTries = this->currentReconnectTries;
890 this->connections->currentShard = this->shard[0];
891 }
892
893 void VoiceConnection::onClosed() noexcept {
894 this->connectionState.store(VoiceConnectionState::Collecting_Init_Data);
895 if (this->activeState.load() != VoiceActiveState::Exiting && this->currentReconnectTries < this->maxReconnectTries) {
896 this->reconnect();
897 } else if (this->currentReconnectTries >= this->maxReconnectTries) {
898 VoiceConnection::disconnect();
899 }
900 }
901
902 void VoiceConnection::mixAudio() noexcept {
903 opus_int32 voiceUserCountReal{};
904 int64_t decodedSize{};
905 std::fill(this->upSampledVector.data(), this->upSampledVector.data() + this->upSampledVector.size(), 0);
906 for (auto& [key, value]: this->voiceUsers) {
907 UDPConnection::processIO(DiscordCoreInternal::ProcessIOType::Read_Only);
908 std::basic_string_view<std::byte> payload{ value->extractPayload() };
909 if (payload.size() == 0) {
910 continue;
911 } else {
912 const uint64_t headerSize{ 12 };
913 const uint64_t csrcCount{ static_cast<uint64_t>(payload[0]) & 0b0000'1111 };
914 const uint64_t offsetToData{ headerSize + sizeof(uint32_t) * csrcCount };
915 const uint64_t encryptedDataLength{ payload.size() - offsetToData };
916
917 if (this->decryptedDataString.size() < encryptedDataLength) {
918 this->decryptedDataString.resize(encryptedDataLength);
919 }
920
921 std::byte nonce[24]{};
922 for (int32_t x = 0; x < headerSize; ++x) {
923 nonce[x] = payload[x];
924 }
925
926 if (crypto_secretbox_open_easy(reinterpret_cast<uint8_t*>(this->decryptedDataString.data()),
927 reinterpret_cast<const uint8_t*>(payload.data()) + offsetToData, encryptedDataLength, reinterpret_cast<uint8_t*>(nonce),
928 reinterpret_cast<uint8_t*>(this->encryptionKey.data()))) {
929 continue;
930 }
931
932 std::basic_string_view newString{ this->decryptedDataString.data(), encryptedDataLength - crypto_secretbox_MACBYTES };
933
934 if (static_cast<int8_t>(payload[0] >> 4) & 0b0001) {
935 const uint16_t extenstionLengthInWords{ ntohs(*reinterpret_cast<const uint16_t*>(&newString[2])) };
936 const size_t extensionLength{ sizeof(uint32_t) * extenstionLengthInWords };
937 const size_t extensionHeaderLength{ sizeof(uint16_t) * 2 };
938 newString = newString.substr(extensionHeaderLength + extensionLength);
939 }
940
941 if (newString.size() > 0) {
942 std::basic_string_view<opus_int16> decodedData{};
943 try {
944 decodedData = value->getDecoder().decodeData(newString);
945 } catch (...) {
946 reportException("VoiceConnection::mixAudio()");
947 }
948 if (decodedData.size() > 0) {
949 decodedSize = std::max(decodedSize, static_cast<int64_t>(decodedData.size()));
950 ++voiceUserCountReal;
951 for (size_t x = 0; x < decodedData.size() / 8; ++x) {
952 __m256i currentValues = _mm256_set_epi32(this->upSampledVector[(x * 8) + 0], this->upSampledVector[(x * 8) + 1],
953 this->upSampledVector[(x * 8) + 2], this->upSampledVector[(x * 8) + 3], this->upSampledVector[(x * 8) + 4],
954 this->upSampledVector[(x * 8) + 5], this->upSampledVector[(x * 8) + 6], this->upSampledVector[(x * 8) + 7]);
955
956 __m256i newValues = _mm256_cvtepi16_epi32(
957 _mm_set_epi16(decodedData[(x * 8) + 0], decodedData[(x * 8) + 1], decodedData[(x * 8) + 2], decodedData[(x * 8) + 3],
958 decodedData[(x * 8) + 4], decodedData[(x * 8) + 5], decodedData[(x * 8) + 6], decodedData[(x * 8) + 7]));
959 __m256i newerValues = _mm256_add_epi32(currentValues, newValues);
960 _mm256_storeu_epi32(&this->upSampledVector[x * 8], newerValues);
961 }
962 }
963 }
964 }
965 }
966 if (decodedSize > 0) {
967 this->voiceUserCountAverage.insertValue(voiceUserCountReal);
968 auto currentPreviousGain = 1.0f / this->voiceUserCountAverage.getCurrentValue();
969 this->currentGain = currentPreviousGain;
970 this->applyGainRamp(decodedSize);
971 this->streamSocket->writeData(std::basic_string_view<std::byte>{ reinterpret_cast<std::byte*>(this->downSampledVector.data()),
972 static_cast<size_t>(decodedSize * 2) });
973 this->previousGain = currentPreviousGain;
974 }
975 }
976
977 bool VoiceConnection::stop() noexcept {
978 this->sendSpeakingMessage(false);
979 this->activeState.store(VoiceActiveState::Stopped);
980 return true;
981 }
982
983 bool VoiceConnection::play() noexcept {
984 this->activeState.store(VoiceActiveState::Playing);
985 return true;
986 }
987
988 VoiceConnection::~VoiceConnection() {
989 this->disconnect();
990 }
991
992}
AudioFrameType
Audio frame types.
Definition: Utilities.hpp:1277
DiscordCoreAPI_Dll void reportException(const std::string &currentFunctionName, std::source_location location=std::source_location::current())
Prints the current file, line, and column from which the function is being called - typically from wi...
Definition: Utilities.cpp:1498
StreamType
For selecting the type of streamer that the given bot is, one must be one server and one of client pe...
Definition: Utilities.hpp:640
The main namespace for this library.
VoiceSocketOpCodes
The various opcodes that could be sent/received by the voice-websocket.
DiscordCoreClient - The main class for this library.
For connecting to a voice-channel. "streamInfo" is used when a socket is created to connect this bot ...
Definition: Utilities.hpp:1311
A thread-safe messaging block for data-structures.
Definition: Utilities.hpp:1645