Finish tracer for chunked upload/download

pull/152/head
Alex Kern 5 years ago
parent 34c45257f4
commit 544408479d
No known key found for this signature in database
GPG Key ID: F3141D5EDF48F89F

@ -97,6 +97,7 @@ export default function Downloader({
const [shouldAttemptConnection, setShouldAttemptConnection] = useState(false) const [shouldAttemptConnection, setShouldAttemptConnection] = useState(false)
const [open, setOpen] = useState(false) const [open, setOpen] = useState(false)
const [downloading, setDownloading] = useState(false) const [downloading, setDownloading] = useState(false)
const [done, setDone] = useState(false)
const [errorMessage, setErrorMessage] = useState<string | null>(null) const [errorMessage, setErrorMessage] = useState<string | null>(null)
useEffect(() => { useEffect(() => {
@ -177,32 +178,59 @@ export default function Downloader({
const handleStartDownload = useCallback(() => { const handleStartDownload = useCallback(() => {
setDownloading(true) setDownloading(true)
const fileStreams = filesInfo.map((_info) => { const fileStreamByPath: Record<
return new ReadableStream({ string,
{
stream: ReadableStream
enqueue: (chunk: any) => void
close: () => void
}
> = {}
const fileStreams = filesInfo.map((info) => {
let enqueue: ((chunk: any) => void) | null = null
let close: (() => void) | null = null
const stream = new ReadableStream({
start(ctrl) { start(ctrl) {
console.log('START') enqueue = (chunk: any) => ctrl.enqueue(chunk)
console.log(ctrl) close = () => ctrl.close()
},
async pull(ctrl) {
console.log('PULL')
console.log(ctrl)
}, },
}) })
fileStreamByPath[info.fullPath] = {
stream,
enqueue,
close,
}
return stream
}) })
const fileStreamByPath: Record<string, ReadableStream> = {} let nextFileIndex = 0
fileStreams.forEach((stream, i) => { const startNextFileOrFinish = (): void => {
fileStreamByPath[filesInfo[i].fullPath] = stream if (nextFileIndex >= filesInfo.length) {
}) return
}
const request: t.TypeOf<typeof Message> = {
type: MessageType.Start,
fullPath: filesInfo[nextFileIndex].fullPath,
offset: 0,
}
dataConnection.send(request)
nextFileIndex++
}
const processChunkFunc = (message: t.TypeOf<typeof ChunkMessage>): void => { const processChunkFunc = (message: t.TypeOf<typeof ChunkMessage>): void => {
const stream = fileStreamByPath[message.fullPath] const fileStream = fileStreamByPath[message.fullPath]
if (!stream) { if (!fileStream) {
console.error('no stream found for ' + message.fullPath) console.error('no stream found for ' + message.fullPath)
return return
} }
console.log(stream) const uInt8 = new Uint8Array(message.bytes as ArrayBuffer)
fileStream.enqueue(uInt8)
if (message.final) {
fileStream.close()
startNextFileOrFinish()
}
} }
processChunk.current = processChunkFunc processChunk.current = processChunkFunc
@ -223,20 +251,23 @@ export default function Downloader({
downloadPromise downloadPromise
.then(() => { .then(() => {
console.log('DONE') const request: t.TypeOf<typeof Message> = {
type: MessageType.Done,
}
dataConnection.send(request)
setDone(true)
}) })
.catch((err) => { .catch((err) => {
console.error(err) console.error(err)
}) })
const request: t.TypeOf<typeof Message> = { startNextFileOrFinish()
type: MessageType.Start,
fullPath: filesInfo[0].fullPath,
offset: 0,
}
dataConnection.send(request)
}, [dataConnection, filesInfo]) }, [dataConnection, filesInfo])
if (done) {
return <div>Done!</div>
}
if (downloading) { if (downloading) {
return <div>Downloading</div> return <div>Downloading</div>
} }

@ -31,8 +31,7 @@ type UploaderConnection = {
// TODO(@kern): Use better values // TODO(@kern): Use better values
const RENEW_INTERVAL = 5000 // 20 minutes const RENEW_INTERVAL = 5000 // 20 minutes
// const MAX_CHUNK_SIZE = 1024 * 1024 // 1 Mi const MAX_CHUNK_SIZE = 10 * 1024 * 1024 // 10 Mi
const MAX_CHUNK_SIZE = 1
function useUploaderChannel( function useUploaderChannel(
uploaderPeerID: string, uploaderPeerID: string,
@ -213,11 +212,13 @@ function useUploaderConnections(
const sendNextChunk = () => { const sendNextChunk = () => {
const end = Math.min(file.size, offset + MAX_CHUNK_SIZE) const end = Math.min(file.size, offset + MAX_CHUNK_SIZE)
const chunkSize = end - offset const chunkSize = end - offset
const final = chunkSize < MAX_CHUNK_SIZE
const request: t.TypeOf<typeof Message> = { const request: t.TypeOf<typeof Message> = {
type: MessageType.Chunk, type: MessageType.Chunk,
fullPath, fullPath,
offset, offset,
bytes: file.slice(offset, end), bytes: file.slice(offset, end),
final,
} }
conn.send(request) conn.send(request)
@ -225,7 +226,7 @@ function useUploaderConnections(
offset = end offset = end
draft.uploadingOffset = end draft.uploadingOffset = end
if (chunkSize < MAX_CHUNK_SIZE) { if (final) {
draft.status = UploaderConnectionStatus.Paused draft.status = UploaderConnectionStatus.Paused
} else { } else {
sendChunkTimeout = setTimeout(() => { sendChunkTimeout = setTimeout(() => {
@ -251,6 +252,19 @@ function useUploaderConnections(
sendChunkTimeout = null sendChunkTimeout = null
} }
}) })
break
}
case MessageType.Done: {
updateConnection((draft) => {
if (draft.status !== UploaderConnectionStatus.Paused) {
return
}
draft.status = UploaderConnectionStatus.Done
conn.close()
})
break
} }
} }
} catch (err) { } catch (err) {
@ -264,7 +278,12 @@ function useUploaderConnections(
} }
updateConnection((draft) => { updateConnection((draft) => {
if (draft.status === UploaderConnectionStatus.InvalidPassword) { if (
[
UploaderConnectionStatus.InvalidPassword,
UploaderConnectionStatus.Done,
].includes(draft.status)
) {
return return
} }

@ -45,6 +45,7 @@ export const ChunkMessage = t.type({
fullPath: t.string, fullPath: t.string,
offset: t.number, offset: t.number,
bytes: t.unknown, bytes: t.unknown,
final: t.boolean,
}) })
export const PauseMessage = t.type({ export const PauseMessage = t.type({

Loading…
Cancel
Save