diff --git a/src/components/Downloader.tsx b/src/components/Downloader.tsx index fda5330..e19b36a 100644 --- a/src/components/Downloader.tsx +++ b/src/components/Downloader.tsx @@ -97,6 +97,7 @@ export default function Downloader({ const [shouldAttemptConnection, setShouldAttemptConnection] = useState(false) const [open, setOpen] = useState(false) const [downloading, setDownloading] = useState(false) + const [done, setDone] = useState(false) const [errorMessage, setErrorMessage] = useState(null) useEffect(() => { @@ -177,32 +178,59 @@ export default function Downloader({ const handleStartDownload = useCallback(() => { setDownloading(true) - const fileStreams = filesInfo.map((_info) => { - return new ReadableStream({ + const fileStreamByPath: Record< + string, + { + stream: ReadableStream + enqueue: (chunk: any) => void + close: () => void + } + > = {} + const fileStreams = filesInfo.map((info) => { + let enqueue: ((chunk: any) => void) | null = null + let close: (() => void) | null = null + const stream = new ReadableStream({ start(ctrl) { - console.log('START') - console.log(ctrl) - }, - async pull(ctrl) { - console.log('PULL') - console.log(ctrl) + enqueue = (chunk: any) => ctrl.enqueue(chunk) + close = () => ctrl.close() }, }) + fileStreamByPath[info.fullPath] = { + stream, + enqueue, + close, + } + return stream }) - const fileStreamByPath: Record = {} - fileStreams.forEach((stream, i) => { - fileStreamByPath[filesInfo[i].fullPath] = stream - }) + let nextFileIndex = 0 + const startNextFileOrFinish = (): void => { + if (nextFileIndex >= filesInfo.length) { + return + } + + const request: t.TypeOf = { + type: MessageType.Start, + fullPath: filesInfo[nextFileIndex].fullPath, + offset: 0, + } + dataConnection.send(request) + nextFileIndex++ + } const processChunkFunc = (message: t.TypeOf): void => { - const stream = fileStreamByPath[message.fullPath] - if (!stream) { + const fileStream = fileStreamByPath[message.fullPath] + if (!fileStream) { console.error('no stream found for ' + message.fullPath) return } - console.log(stream) + const uInt8 = new Uint8Array(message.bytes as ArrayBuffer) + fileStream.enqueue(uInt8) + if (message.final) { + fileStream.close() + startNextFileOrFinish() + } } processChunk.current = processChunkFunc @@ -223,20 +251,23 @@ export default function Downloader({ downloadPromise .then(() => { - console.log('DONE') + const request: t.TypeOf = { + type: MessageType.Done, + } + dataConnection.send(request) + setDone(true) }) .catch((err) => { console.error(err) }) - const request: t.TypeOf = { - type: MessageType.Start, - fullPath: filesInfo[0].fullPath, - offset: 0, - } - dataConnection.send(request) + startNextFileOrFinish() }, [dataConnection, filesInfo]) + if (done) { + return
Done!
+ } + if (downloading) { return
Downloading
} diff --git a/src/components/Uploader.tsx b/src/components/Uploader.tsx index 8d887b9..37cbe28 100644 --- a/src/components/Uploader.tsx +++ b/src/components/Uploader.tsx @@ -31,8 +31,7 @@ type UploaderConnection = { // TODO(@kern): Use better values const RENEW_INTERVAL = 5000 // 20 minutes -// const MAX_CHUNK_SIZE = 1024 * 1024 // 1 Mi -const MAX_CHUNK_SIZE = 1 +const MAX_CHUNK_SIZE = 10 * 1024 * 1024 // 10 Mi function useUploaderChannel( uploaderPeerID: string, @@ -213,11 +212,13 @@ function useUploaderConnections( const sendNextChunk = () => { const end = Math.min(file.size, offset + MAX_CHUNK_SIZE) const chunkSize = end - offset + const final = chunkSize < MAX_CHUNK_SIZE const request: t.TypeOf = { type: MessageType.Chunk, fullPath, offset, bytes: file.slice(offset, end), + final, } conn.send(request) @@ -225,7 +226,7 @@ function useUploaderConnections( offset = end draft.uploadingOffset = end - if (chunkSize < MAX_CHUNK_SIZE) { + if (final) { draft.status = UploaderConnectionStatus.Paused } else { sendChunkTimeout = setTimeout(() => { @@ -251,6 +252,19 @@ function useUploaderConnections( sendChunkTimeout = null } }) + break + } + + case MessageType.Done: { + updateConnection((draft) => { + if (draft.status !== UploaderConnectionStatus.Paused) { + return + } + + draft.status = UploaderConnectionStatus.Done + conn.close() + }) + break } } } catch (err) { @@ -264,7 +278,12 @@ function useUploaderConnections( } updateConnection((draft) => { - if (draft.status === UploaderConnectionStatus.InvalidPassword) { + if ( + [ + UploaderConnectionStatus.InvalidPassword, + UploaderConnectionStatus.Done, + ].includes(draft.status) + ) { return } diff --git a/src/messages.ts b/src/messages.ts index 23a7eba..602bfb3 100644 --- a/src/messages.ts +++ b/src/messages.ts @@ -45,6 +45,7 @@ export const ChunkMessage = t.type({ fullPath: t.string, offset: t.number, bytes: t.unknown, + final: t.boolean, }) export const PauseMessage = t.type({