#include "platform/chunks_download_strategy.hpp"

#include "platform/platform.hpp"

#include "coding/file_reader.hpp"
#include "coding/file_writer.hpp"
#include "coding/varint.hpp"

#include "base/assert.hpp"
#include "base/logging.hpp"
#include "base/macros.hpp"

#include <algorithm>
#include <iterator>

namespace downloader
{
/// Builds the servers list: one entry per url, each initially marked SERVER_READY.
ChunksDownloadStrategy::ChunksDownloadStrategy(std::vector<std::string> const & urls)
{
  m_servers.reserve(urls.size());
  // SERVER_READY is passed by value on purpose (no reference binding to the class constant).
  for (auto const & url : urls)
    m_servers.push_back(ServerT(url, SERVER_READY));
}

/// Looks up the chunk whose start offset equals range.first.
/// @return Pointer to the chunk and its index in m_chunks, or (nullptr, -1) if no data chunk
/// starts at range.first.
std::pair<ChunksDownloadStrategy::ChunkT *, int> ChunksDownloadStrategy::GetChunk(RangeT const & range)
{
  auto const it = std::lower_bound(m_chunks.begin(), m_chunks.end(), range.first, LessChunks());

  // A data chunk is always followed by another entry that marks its end, so the very last
  // entry (the AUX one) is never a valid answer and must not be dereferenced at it + 1.
  if (it != m_chunks.end() && it->m_pos == range.first && std::next(it) != m_chunks.end())
  {
    ASSERT_EQUAL(std::next(it)->m_pos, range.second + 1, ());
    return std::pair<ChunkT *, int>(&(*it), static_cast<int>(std::distance(m_chunks.begin(), it)));
  }

  LOG(LERROR, ("Downloader error. Invalid chunk range: ", range));
  return std::pair<ChunkT *, int>(nullptr, -1);
}

/// Appends chunks covering [0, fileSize) to m_chunks, each with the given status, followed by
/// one terminating AUX chunk positioned at fileSize.
/// @param chunkSize Chunk size in bytes; 0 means "choose automatically": a whole number of
/// megabytes between 1 and 16, aiming at about 40 chunks per file.
void ChunksDownloadStrategy::InitChunks(int64_t fileSize, int64_t chunkSize, ChunkStatusT status)
{
  if (chunkSize == 0)
  {
    int64_t constexpr kMb = 1024 * 1024;
    size_t const sizeMb = std::max(fileSize / kMb, static_cast<int64_t>(1));
    size_t constexpr kTargetCount = 40;
    size_t constexpr kMinMb = 1;
    size_t constexpr kMaxMb = 16;
    size_t const chunkMb = std::min(std::max(sizeMb / kTargetCount, kMinMb), kMaxMb);

    size_t chunksCount = sizeMb / chunkMb;
    // Integer division rounds down, so one more chunk may be needed to cover the tail.
    if (static_cast<int64_t>(kMb * chunkMb * chunksCount) < fileSize)
      ++chunksCount;
    ASSERT_GREATER_OR_EQUAL(static_cast<int64_t>(kMb * chunkMb * chunksCount), fileSize, ());

    LOG(LINFO, ("File size", sizeMb, "MB; chunk size", chunkMb, "MB; chunks count", chunksCount));

    m_chunks.reserve(chunksCount + 1);
    chunkSize = chunkMb * kMb;
  }
  else
  {
    m_chunks.reserve(static_cast<size_t>(fileSize / chunkSize + 2));
  }

  for (int64_t pos = 0; pos < fileSize; pos += chunkSize)
    m_chunks.push_back(ChunkT(pos, status));

  // The last AUX chunk is just used to hold end of the range (eof) for the previous chunk.
  m_chunks.push_back(ChunkT(fileSize, CHUNK_AUX));
}

/// Appends one chunk for [range.first, range.second]. The current trailing AUX entry (if any)
/// becomes the new chunk, and a fresh AUX entry is pushed at range.second + 1.
void ChunksDownloadStrategy::AddChunk(RangeT const & range, ChunkStatusT status)
{
  ASSERT_LESS_OR_EQUAL(range.first, range.second, ());

  if (m_chunks.empty())
  {
    ASSERT_EQUAL(range.first, 0, ());
    m_chunks.push_back(ChunkT(range.first, status));
  }
  else
  {
    // Ranges must be added contiguously: the new one starts where the previous one ended.
    ASSERT_EQUAL(m_chunks.back().m_pos, range.first, ());
    m_chunks.back().m_status = status;
  }

  m_chunks.push_back(ChunkT(range.second + 1, CHUNK_AUX));
}

/// Writes fileSize (as a varint) followed by the raw chunk records to fName.
/// If there are no chunks, or writing fails, the file is removed instead.
void ChunksDownloadStrategy::SaveChunks(int64_t fileSize, std::string const & fName)
{
  if (!m_chunks.empty())
  {
    try
    {
      FileWriter w(fName);
      WriteVarInt(w, fileSize);
      w.Write(m_chunks.data(), sizeof(ChunkT) * m_chunks.size());
      return;
    }
    catch (FileWriter::Exception const & e)
    {
      LOG(LWARNING, ("Can't save chunks statuses to file", e.Msg()));
    }
  }

  // Delete if no chunks or some error occurred.
  UNUSED_VALUE(Platform::RemoveFileIfExists(fName));
}

/// Restores the chunk table from fName when it was saved for the same fileSize; otherwise
/// (missing file, size mismatch, malformed contents, read error) creates a fresh table.
/// @return Number of bytes covered by chunks that are already complete (0 for a fresh table).
int64_t ChunksDownloadStrategy::LoadOrInitChunks(std::string const & fName, int64_t fileSize, int64_t chunkSize)
{
  ASSERT(fileSize > 0, ());

  if (Platform::IsFileExistsByFullPath(fName))
  {
    try
    {
      FileReader r(fName);
      ReaderSource<FileReader> src(r);

      int64_t const readSize = ReadVarInt<int64_t>(src);
      if (readSize == fileSize)
      {
        // Load chunks.
        uint64_t const size = src.Size();
        size_t const stSize = sizeof(ChunkT);
        auto const count = static_cast<size_t>(size / stSize);

        // A usable table holds whole records only and at least one data chunk plus the
        // terminating entry. Anything else is treated as a damaged file (this also keeps
        // "count - 1" below from wrapping around).
        if (count >= 2 && size % stSize == 0)
        {
          m_chunks.resize(count);
          src.Read(m_chunks.data(), stSize * count);

          // Reset status "downloading" to "free".
          int64_t downloadedSize = 0;
          size_t completed = 0;
          for (size_t i = 0; i + 1 < count; ++i)
          {
            if (m_chunks[i].m_status != CHUNK_COMPLETE)
            {
              m_chunks[i].m_status = CHUNK_FREE;
            }
            else
            {
              downloadedSize += (m_chunks[i + 1].m_pos - m_chunks[i].m_pos);
              ++completed;
            }
          }

          LOG(LINFO, ("Resumed file: downloaded", downloadedSize / 1024 / 1024, "out of", fileSize / 1024 / 1024,
                      "MB; completed chunks", completed, "out of", count - 1));

          return downloadedSize;
        }

        LOG(LWARNING, ("Ignoring malformed chunks file", fName));
      }
    }
    catch (FileReader::Exception const & e)
    {
      LOG(LDEBUG, (e.Msg()));
    }
  }

  // A failed read may have left partially filled entries behind; InitChunks only appends,
  // so start from an empty table.
  m_chunks.clear();
  InitChunks(fileSize, chunkSize);
  return 0;
}

/// Records the outcome of downloading the chunk identified by range.
/// On success the server becomes free again and the chunk is marked complete; on failure the
/// server is removed from the list and the chunk is marked free.
/// @return Url of the server that was downloading the chunk, or an empty string if the range
/// or the server was not found.
std::string ChunksDownloadStrategy::ChunkFinished(bool success, RangeT const & range)
{
  std::pair<ChunkT *, int> const res = GetChunk(range);
  std::string url;

  if (!res.first)
    return url;

  // Find server which was downloading this chunk.
  for (size_t s = 0; s < m_servers.size(); ++s)
  {
    if (m_servers[s].m_chunkIndex != res.second)
      continue;

    url = m_servers[s].m_url;
    if (success)
    {
      LOG(LDEBUG, ("Completed chunk", m_servers[s].m_chunkIndex, "via", m_servers[s].m_url));
      // Mark server as free and chunk as ready.
      m_servers[s].m_chunkIndex = SERVER_READY;
      res.first->m_status = CHUNK_COMPLETE;
    }
    else
    {
      LOG(LWARNING, ("Failed to dl chunk", m_servers[s].m_chunkIndex, "via", m_servers[s].m_url));
      // Remove failed server and mark chunk as free.
      m_servers.erase(m_servers.begin() + s);
      res.first->m_status = CHUNK_FREE;
    }
    break;
  }

  return url;
}

/// Assigns the first free chunk to the first free server.
/// @param[out] outUrl Url of the chosen server (set only when ENextChunk is returned).
/// @param[out] range Inclusive byte range of the chosen chunk (set only for ENextChunk).
/// @return EDownloadFailed if there are no servers left; ENoFreeServers if every server is
/// busy, or if no chunk is free while some are still downloading; ENextChunk if a chunk was
/// assigned; EDownloadSucceeded if no chunk is free or downloading.
ChunksDownloadStrategy::ResultT ChunksDownloadStrategy::NextChunk(std::string & outUrl, RangeT & range)
{
  // If no servers at all.
  if (m_servers.empty())
    return EDownloadFailed;

  // Find first free server.
  ServerT * server = nullptr;
  for (auto & candidate : m_servers)
  {
    if (candidate.m_chunkIndex == SERVER_READY)
    {
      server = &candidate;
      break;
    }
  }
  if (server == nullptr)
    return ENoFreeServers;

  bool allChunksDownloaded = true;

  // Find first free chunk. The last entry only marks the end of the file and is skipped;
  // "i + 1 < size" (rather than "i < size - 1") stays correct for an empty table.
  for (size_t i = 0; i + 1 < m_chunks.size(); ++i)
  {
    switch (m_chunks[i].m_status)
    {
    case CHUNK_FREE:
      server->m_chunkIndex = static_cast<int>(i);
      outUrl = server->m_url;

      range.first = m_chunks[i].m_pos;
      range.second = m_chunks[i + 1].m_pos - 1;

      m_chunks[i].m_status = CHUNK_DOWNLOADING;
      LOG(LDEBUG, ("Download chunk", server->m_chunkIndex, "via", outUrl));
      return ENextChunk;

    case CHUNK_DOWNLOADING:
      allChunksDownloaded = false;
      break;

    default:
      break;
    }
  }

  return (allChunksDownloaded ? EDownloadSucceeded : ENoFreeServers);
}
}  // namespace downloader