From 34c45257f42cdba0f240dfcfea6ee123ce0f1bfc Mon Sep 17 00:00:00 2001 From: Alex Kern Date: Wed, 20 Jan 2021 19:28:21 -0800 Subject: [PATCH] Add chunk uploading, begin chunk downloading --- src/components/Downloader.tsx | 94 ++++++++++++++++++++++++++++++----- src/components/Uploader.tsx | 87 ++++++++++++++++++++++++++++++-- src/messages.ts | 38 +++++++------- 3 files changed, 186 insertions(+), 33 deletions(-) diff --git a/src/components/Downloader.tsx b/src/components/Downloader.tsx index 62d9693..fda5330 100644 --- a/src/components/Downloader.tsx +++ b/src/components/Downloader.tsx @@ -1,4 +1,4 @@ -import React, { useCallback, useEffect, useState } from 'react' +import React, { useCallback, useEffect, useRef, useState } from 'react' import { useWebRTC } from './WebRTCProvider' import { browserName, @@ -9,8 +9,9 @@ import { mobileModel, } from 'react-device-detect' import * as t from 'io-ts' -import { decodeMessage, Message, MessageType } from '../messages' +import { ChunkMessage, decodeMessage, Message, MessageType } from '../messages' import { createZipStream } from '../zip-stream' +import { DataConnection } from 'peerjs' const baseURL = process.env.NEXT_PUBLIC_BASE_URL ?? 'http://localhost:3000' @@ -82,6 +83,17 @@ export default function Downloader({ const peer = useWebRTC() const [password, setPassword] = useState('') + const [dataConnection, setDataConnection] = useState( + null, + ) + const [filesInfo, setFilesInfo] = useState | null>(null) + const processChunk = useRef< + ((message: t.TypeOf) => void) | null + >(null) const [shouldAttemptConnection, setShouldAttemptConnection] = useState(false) const [open, setOpen] = useState(false) const [downloading, setDownloading] = useState(false) @@ -96,6 +108,8 @@ export default function Downloader({ reliable: true, }) + setDataConnection(conn) + conn.on('open', () => { setOpen(true) @@ -118,7 +132,11 @@ export default function Downloader({ const message = decodeMessage(data) switch (message.type) { case MessageType.Info: - console.log(message) + setFilesInfo(message.files) + break + + case MessageType.Chunk: + if (processChunk.current) processChunk.current(message) break case MessageType.Error: @@ -133,6 +151,7 @@ export default function Downloader({ }) conn.on('close', () => { + setDataConnection(null) setOpen(false) setDownloading(false) setShouldAttemptConnection(false) @@ -158,16 +177,65 @@ export default function Downloader({ const handleStartDownload = useCallback(() => { setDownloading(true) - // TODO(@kern): Download each file as a ReadableStream - // const blob = new Blob(['support blobs too']) - // const file = { - // name: 'blob-example.txt', - // size: 12, - // stream: () => blob.stream(), - // } - // streamDownloadSingleFile(file) - // streamDownloadMultipleFiles([file]) - }, []) + const fileStreams = filesInfo.map((_info) => { + return new ReadableStream({ + start(ctrl) { + console.log('START') + console.log(ctrl) + }, + async pull(ctrl) { + console.log('PULL') + console.log(ctrl) + }, + }) + }) + + const fileStreamByPath: Record = {} + fileStreams.forEach((stream, i) => { + fileStreamByPath[filesInfo[i].fullPath] = stream + }) + + const processChunkFunc = (message: t.TypeOf): void => { + const stream = fileStreamByPath[message.fullPath] + if (!stream) { + console.error('no stream found for ' + message.fullPath) + return + } + + console.log(stream) + } + processChunk.current = processChunkFunc + + const downloads = filesInfo.map((info, i) => ({ + name: info.fullPath.replace(/^\//, ''), + size: info.size, + stream: () => fileStreams[i], + })) + + let downloadPromise: Promise | null = null + if (downloads.length > 1) { + downloadPromise = streamDownloadMultipleFiles(downloads) + } else if (downloads.length === 1) { + downloadPromise = streamDownloadSingleFile(downloads[0]) + } else { + throw new Error('no files to download') + } + + downloadPromise + .then(() => { + console.log('DONE') + }) + .catch((err) => { + console.error(err) + }) + + const request: t.TypeOf = { + type: MessageType.Start, + fullPath: filesInfo[0].fullPath, + offset: 0, + } + dataConnection.send(request) + }, [dataConnection, filesInfo]) if (downloading) { return
Downloading
diff --git a/src/components/Uploader.tsx b/src/components/Uploader.tsx index 3d2e4ac..8d887b9 100644 --- a/src/components/Uploader.tsx +++ b/src/components/Uploader.tsx @@ -9,11 +9,11 @@ import * as t from 'io-ts' enum UploaderConnectionStatus { Pending = 'PENDING', + Paused = 'PAUSED', Uploading = 'UPLOADING', Done = 'DONE', InvalidPassword = 'INVALID_PASSWORD', Closed = 'CLOSED', - Paused = 'PAUSED', } type UploaderConnection = { @@ -25,9 +25,14 @@ type UploaderConnection = { osVersion?: string mobileVendor?: string mobileModel?: string + uploadingFullPath?: string + uploadingOffset?: number } +// TODO(@kern): Use better values const RENEW_INTERVAL = 5000 // 20 minutes +// const MAX_CHUNK_SIZE = 1024 * 1024 // 1 Mi +const MAX_CHUNK_SIZE = 1 function useUploaderChannel( uploaderPeerID: string, @@ -86,6 +91,20 @@ function useUploaderChannelRenewal(shortSlug: string): void { }, [shortSlug]) } +function validateOffset( + files: UploadedFile[], + fullPath: string, + offset: number, +): UploadedFile { + const validFile = files.find( + (file) => file.fullPath === fullPath && offset <= file.size, + ) + if (!validFile) { + throw new Error('invalid file offset') + } + return validFile +} + function useUploaderConnections( peer: Peer, files: UploadedFile[], @@ -95,6 +114,7 @@ function useUploaderConnections( useEffect(() => { peer.on('connection', (conn: DataConnection) => { + let sendChunkTimeout: number | null = null const newConn = { status: UploaderConnectionStatus.Pending, dataConnection: conn, @@ -151,7 +171,7 @@ function useUploaderConnections( return } - draft.status = UploaderConnectionStatus.Uploading + draft.status = UploaderConnectionStatus.Paused draft.browserName = message.browserName draft.browserVersion = message.browserVersion draft.osName = message.osName @@ -163,6 +183,8 @@ function useUploaderConnections( const fileInfo = files.map((f) => { return { fullPath: f.fullPath, + size: f.size, + type: f.type, } }) @@ -171,10 +193,65 @@ function useUploaderConnections( files: fileInfo, } conn.send(request) + break + } + + case MessageType.Start: { + const fullPath = message.fullPath + let offset = message.offset + const file = validateOffset(files, fullPath, offset) + updateConnection((draft) => { + if (draft.status !== UploaderConnectionStatus.Paused) { + return + } + + draft.status = UploaderConnectionStatus.Uploading + draft.uploadingFullPath = fullPath + draft.uploadingOffset = offset + }) + + const sendNextChunk = () => { + const end = Math.min(file.size, offset + MAX_CHUNK_SIZE) + const chunkSize = end - offset + const request: t.TypeOf = { + type: MessageType.Chunk, + fullPath, + offset, + bytes: file.slice(offset, end), + } + conn.send(request) + + updateConnection((draft) => { + offset = end + draft.uploadingOffset = end + + if (chunkSize < MAX_CHUNK_SIZE) { + draft.status = UploaderConnectionStatus.Paused + } else { + sendChunkTimeout = setTimeout(() => { + sendNextChunk() + }, 0) + } + }) + } + sendNextChunk() - // TODO(@kern): Handle sending chunks break } + + case MessageType.Pause: { + updateConnection((draft) => { + if (draft.status !== UploaderConnectionStatus.Uploading) { + return + } + + draft.status = UploaderConnectionStatus.Paused + if (sendChunkTimeout) { + clearTimeout(sendChunkTimeout) + sendChunkTimeout = null + } + }) + } } } catch (err) { console.error(err) @@ -182,6 +259,10 @@ function useUploaderConnections( }) conn.on('close', (): void => { + if (sendChunkTimeout) { + clearTimeout(sendChunkTimeout) + } + updateConnection((draft) => { if (draft.status === UploaderConnectionStatus.InvalidPassword) { return diff --git a/src/messages.ts b/src/messages.ts index 1dc4614..23a7eba 100644 --- a/src/messages.ts +++ b/src/messages.ts @@ -5,13 +5,14 @@ import { fold } from 'fp-ts/Either' export enum MessageType { RequestInfo = 'REQUEST_INFO', Info = 'INFO', + Pause = 'PAUSE', Start = 'START', Chunk = 'CHUNK', - Pause = 'PAUSE', + Done = 'DONE', Error = 'ERROR', } -const RequestInfoMessage = t.type({ +export const RequestInfoMessage = t.type({ type: t.literal(MessageType.RequestInfo), browserName: t.string, browserVersion: t.string, @@ -22,37 +23,39 @@ const RequestInfoMessage = t.type({ password: t.string, }) -const InfoMessage = t.type({ +export const InfoMessage = t.type({ type: t.literal(MessageType.Info), files: t.array( t.type({ fullPath: t.string, + size: t.number, + type: t.string, }), ), }) -const StartMessage = t.type({ +export const StartMessage = t.type({ type: t.literal(MessageType.Start), - browserName: t.string, - browserVersion: t.string, - osName: t.string, - osVersion: t.string, - mobileVendor: t.string, - mobileModel: t.string, - password: t.string, + fullPath: t.string, + offset: t.number, }) -const ChunkMessage = t.type({ +export const ChunkMessage = t.type({ type: t.literal(MessageType.Chunk), - // TODO(@kern): Chunk + fullPath: t.string, + offset: t.number, + bytes: t.unknown, }) -const PauseMessage = t.type({ +export const PauseMessage = t.type({ type: t.literal(MessageType.Pause), - // TODO(@kern): Pausing }) -const ErrorMessage = t.type({ +export const DoneMessage = t.type({ + type: t.literal(MessageType.Done), +}) + +export const ErrorMessage = t.type({ type: t.literal(MessageType.Error), error: t.string, }) @@ -60,9 +63,10 @@ const ErrorMessage = t.type({ export const Message = t.union([ RequestInfoMessage, InfoMessage, + PauseMessage, StartMessage, ChunkMessage, - PauseMessage, + DoneMessage, ErrorMessage, ])