Cyberduck Mountain Duck CLI

Ticket #185: patch_upload_poc.diff

File patch_upload_poc.diff, 5.7 KB (added by caeies, on Nov 10, 2010 at 6:13:14 PM)

proof of concept for the parallelism upload (I've got maximum bandwith in this case)

  • source/ch/ethz/ssh2/sftp/SFTPv3Client.java

     
    12911291    }
    12921292
    12931293    /**
     1294     * A read  is divided into multiple requests sent sequentially before
     1295     * reading any status from the server
     1296     */
     1297    private static class OutstandingAck {
     1298        int req_id;
     1299        /**
     1300         * Read offset to request on server starting at the file offset for the first request.
     1301         */
     1302        long serverOffset;
     1303        /**
     1304         * Length of requested data
     1305         */
     1306        int len;
     1307        /**
     1308         * Offset in destination buffer
     1309         */
     1310        int dstOffset;
     1311        /**
     1312         * Temporary buffer
     1313         */
     1314        byte[] buffer;
     1315    }
     1316
     1317    private void sendWriteRequest(int id, SFTPv3FileHandle handle, long offset, int len, byte[] src, int srcoff) throws IOException {
     1318        TypesWriter tw = new TypesWriter();
     1319        tw.writeString(handle.fileHandle, 0, handle.fileHandle.length);
     1320        tw.writeUINT64(offset);
     1321        tw.writeString(src, srcoff, len);
     1322
     1323        log.log("Sending SSH_FXP_WRITE (" + id + ") " + offset + "/" + len);
     1324        sendMessage(Packet.SSH_FXP_WRITE, id, tw.getBytes());
     1325    }
     1326
     1327    /**
     1328     * Mapping request ID to request.
     1329     */
     1330    Map<Integer, OutstandingAck> pendingWQueue
     1331            = new HashMap<Integer, OutstandingAck>();
     1332
     1333    /**
    12941334     * Write bytes to a file. If <code>len</code> &gt; 32768, then the write operation will
    12951335     * be split into multiple writes.
    12961336     *
     
    13031343     */
    13041344    public void upload(SFTPv3FileHandle handle, long fileOffset, byte[] src, int srcoff, int len) throws IOException {
    13051345        checkHandleValidAndOpen(handle);
     1346        /* check that we didn't received anything bad in our queue, empty the input buffer */
     1347        if( (0 >= len) && pendingWQueue.size() > 0 )
     1348        {
     1349            while(!pendingWQueue.isEmpty()) {
     1350                byte[] resp = receiveMessage(34000);
    13061351
     1352                TypesReader tr = new TypesReader(resp);
     1353
     1354                int t = tr.readByte();
     1355
     1356                int rep_id = tr.readUINT32();
     1357                OutstandingAck rack = pendingWQueue.get(rep_id);
     1358
     1359                if(null == rack) {
     1360                    throw new IOException("The server sent an invalid id field.");
     1361                }
     1362                pendingWQueue.remove(rep_id);
     1363
     1364                if(t != Packet.SSH_FXP_STATUS) {
     1365                throw new IOException("The SFTP server sent an unexpected packet type (" + t + ")");
     1366                }
     1367
     1368                int errorCode = tr.readUINT32();
     1369
     1370                if(errorCode == ErrorCodes.SSH_FX_OK) {
     1371                    continue;
     1372                }
     1373
     1374                String errorMessage = tr.readString();
     1375
     1376                throw new SFTPException(errorMessage, errorCode);
     1377
     1378            }
     1379        }
    13071380        while(len > 0) {
    13081381            int writeRequestLen = len;
    13091382
    13101383            if(writeRequestLen > 32768) {
    13111384                writeRequestLen = 32768;
    13121385            }
     1386            OutstandingAck ack = new OutstandingAck();
     1387            ack.req_id = generateNextRequestID();
     1388            ack.serverOffset = fileOffset;
     1389            ack.len = writeRequestLen;
    13131390
    1314             int req_id = generateNextRequestID();
     1391            sendWriteRequest(ack.req_id, handle, ack.serverOffset, ack.len, src, srcoff);
    13151392
    1316             TypesWriter tw = new TypesWriter();
    1317             tw.writeString(handle.fileHandle, 0, handle.fileHandle.length);
    1318             tw.writeUINT64(fileOffset);
    1319             tw.writeString(src, srcoff, writeRequestLen);
     1393            pendingWQueue.put(ack.req_id, ack);
    13201394
    1321             log.log("Sending SSH_FXP_WRITE...");
    1322             sendMessage(Packet.SSH_FXP_WRITE, req_id, tw.getBytes());
     1395            fileOffset += ack.len;
    13231396
    1324             fileOffset += writeRequestLen;
     1397            srcoff += ack.len;
     1398            len -= ack.len;
    13251399
    1326             srcoff += writeRequestLen;
    1327             len -= writeRequestLen;
    1328 
     1400            if(pendingWQueue.size() >= parallelism)
     1401            {
    13291402            byte[] resp = receiveMessage(34000);
    13301403
    13311404            TypesReader tr = new TypesReader(resp);
     
    13331406            int t = tr.readByte();
    13341407
    13351408            int rep_id = tr.readUINT32();
    1336             if(rep_id != req_id) {
     1409            OutstandingAck rack = pendingWQueue.get(rep_id);
     1410
     1411            if(null == rack) {
    13371412                throw new IOException("The server sent an invalid id field.");
    13381413            }
     1414            pendingWQueue.remove(rep_id);
    13391415
    13401416            if(t != Packet.SSH_FXP_STATUS) {
    13411417                throw new IOException("The SFTP server sent an unexpected packet type (" + t + ")");
     
    13501426            String errorMessage = tr.readString();
    13511427
    13521428            throw new SFTPException(errorMessage, errorCode);
     1429            }
    13531430        }
    13541431    }
    13551432
  • source/ch/cyberduck/core/sftp/SFTPPath.java

     
    528528                            this.getParent().getAbsolute(),
    529529                            "0" + this.attributes().getPermission().getOctalString());
    530530                }
     531                // No parallel requests if the file size is smaller than the buffer.
     532                this.getSession().sftp().setDownloadRequestParallelism(
     533                        (int) (this.attributes().getSize() / Preferences.instance().getInteger("connection.chunksize")) + 1
     534                );
    531535                this.upload(out, in, throttle, listener);
    532536            }
    533537        }
swiss made software