Cyberduck Mountain Duck CLI

Ticket #185: Upload_with_status_queue.patch

File Upload_with_status_queue.patch, 9.4 KB (added by dkocher, on Nov 11, 2010 at 3:31:55 PM)

Upload with status queue.

  • source/ch/ethz/ssh2/sftp/SFTPv3Client.java

     
    10921092     * A read  is divided into multiple requests sent sequentially before
    10931093     * reading any status from the server
    10941094     */
    1095     private static class OutstandingRequest {
     1095    private static class OutstandingReadRequest {
    10961096        int req_id;
    10971097        /**
    10981098         * Read offset to request on server starting at the file offset for the first request.
     
    11351135    /**
    11361136     * @param parallelism
    11371137     */
    1138     public void setDownloadRequestParallelism(int parallelism) {
     1138    public void setRequestParallelism(int parallelism) {
    11391139        this.parallelism = Math.min(parallelism, DEFAULT_MAX_PARALLELISM);
    11401140        log.log("setDownloadRequestParallelism:" + this.parallelism);
    11411141    }
     
    11431143    /**
    11441144     * Mapping request ID to request.
    11451145     */
    1146     Map<Integer, OutstandingRequest> pendingQueue
    1147             = new HashMap<Integer, OutstandingRequest>();
     1146    Map<Integer, OutstandingReadRequest> pendingReadQueue
     1147            = new HashMap<Integer, OutstandingReadRequest>();
    11481148
    11491149    /**
    11501150     * Read bytes from a file in a parallel fashion. As many bytes as you want will be read.
     
    11771177        int clientOffset = dstoff;
    11781178
    11791179        long serverOffset = fileOffset;
    1180         for(OutstandingRequest r : pendingQueue.values()) {
     1180        for(OutstandingReadRequest r : pendingReadQueue.values()) {
    11811181            // Server offset should take pending requests into account.
    11821182            serverOffset += r.len;
    11831183        }
    11841184
    11851185        while(true) {
    1186             /* If there was an error and no outstanding request - stop */
    1187             if((pendingQueue.size() == 0) && errorOccured) {
     1186            // Stop if there was an error and no outstanding request
     1187            if((pendingReadQueue.size() == 0) && errorOccured) {
    11881188                break;
    11891189            }
    11901190
    1191             /* Send as many requests as we are allowed to */
    1192             while(pendingQueue.size() < parallelism) {
     1191            // Send as many requests as we are allowed to
     1192            while(pendingReadQueue.size() < parallelism) {
    11931193                if(errorOccured) {
    11941194                    break;
    11951195                }
    1196                 /* Send the next request */
    1197                 OutstandingRequest req = new OutstandingRequest();
     1196                // Send the next read request
     1197                OutstandingReadRequest req = new OutstandingReadRequest();
    11981198                req.req_id = generateNextRequestID();
    11991199                req.serverOffset = serverOffset;
    12001200                req.len = (remaining > len) ? len : remaining;
     
    12071207
    12081208                sendReadRequest(req.req_id, handle, req.serverOffset, req.len);
    12091209
    1210                 pendingQueue.put(req.req_id, req);
     1210                pendingReadQueue.put(req.req_id, req);
    12111211            }
    1212 
    1213             /* Are we done? */
    1214             if(pendingQueue.size() == 0) {
     1212            if(pendingReadQueue.size() == 0) {
    12151213                break;
    12161214            }
    12171215
    1218             /* No, receive a single answer */
     1216            // Receive a single answer
    12191217            byte[] resp = receiveMessage(34000);
    1220 
    12211218            TypesReader tr = new TypesReader(resp);
    1222 
    12231219            int type = tr.readByte();
    1224             int rep_id = tr.readUINT32();
    1225 
    1226             /* Search the pending queue */
    1227             OutstandingRequest req = pendingQueue.get(rep_id);
    1228 
    1229             /* Should shutdown here, no point in going on */
     1220            // Search the pending queue
     1221            OutstandingReadRequest req = pendingReadQueue.remove(tr.readUINT32());
    12301222            if(null == req) {
    12311223                throw new IOException("The server sent an invalid id field.");
    12321224            }
    1233 
    1234             pendingQueue.remove(rep_id);
    1235 
    1236             /* Evaluate the answer */
     1225            // Evaluate the answer
    12371226            if(type == Packet.SSH_FXP_STATUS) {
    12381227                /* In any case, stop sending more packets */
    12391228
     
    12461235                }
    12471236                // Flag to read all pending requests but don't send any more.
    12481237                errorOccured = true;
    1249                 if(pendingQueue.isEmpty()) {
     1238                if(pendingReadQueue.isEmpty()) {
    12501239                    if(ErrorCodes.SSH_FX_EOF == code) {
    12511240                        return -1;
    12521241                    }
     
    12541243                }
    12551244            }
    12561245            else if(type == Packet.SSH_FXP_DATA) {
    1257                 /* OK, collect data */
     1246                // OK, collect data
    12581247                int readLen = tr.readUINT32();
    12591248
    12601249                if((readLen < 0) || (readLen > req.len)) {
     
    12781267                    log.log("Requesting again: " + req.serverOffset + "/" + req.len);
    12791268                    sendReadRequest(req.req_id, handle, req.serverOffset, req.len);
    12801269
    1281                     pendingQueue.put(req.req_id, req);
     1270                    pendingReadQueue.put(req.req_id, req);
    12821271                }
    12831272                return readLen;
    12841273            }
     
    12911280    }
    12921281
    12931282    /**
     1283     * A read  is divided into multiple requests sent sequentially before
     1284     * reading any status from the server
     1285     */
     1286    private static class OutstandingStatusRequest {
     1287        int req_id;
     1288    }
     1289
     1290    /**
     1291     * Mapping request ID to request.
     1292     */
     1293    Map<Integer, OutstandingStatusRequest> pendingStatusQueue
     1294            = new HashMap<Integer, OutstandingStatusRequest>();
     1295
     1296    /**
    12941297     * Write bytes to a file. If <code>len</code> &gt; 32768, then the write operation will
    12951298     * be split into multiple writes.
    12961299     *
     
    13041307    public void upload(SFTPv3FileHandle handle, long fileOffset, byte[] src, int srcoff, int len) throws IOException {
    13051308        checkHandleValidAndOpen(handle);
    13061309
    1307         while(len > 0) {
    1308             int writeRequestLen = len;
     1310        // Send the next write request
     1311        OutstandingStatusRequest req = new OutstandingStatusRequest();
     1312        req.req_id = generateNextRequestID();
    13091313
    1310             if(writeRequestLen > 32768) {
    1311                 writeRequestLen = 32768;
    1312             }
    1313 
    1314             int req_id = generateNextRequestID();
    1315 
    1316             TypesWriter tw = new TypesWriter();
    1317             tw.writeString(handle.fileHandle, 0, handle.fileHandle.length);
    1318             tw.writeUINT64(fileOffset);
    1319             tw.writeString(src, srcoff, writeRequestLen);
     1314        TypesWriter tw = new TypesWriter();
     1315        tw.writeString(handle.fileHandle, 0, handle.fileHandle.length);
     1316        tw.writeUINT64(fileOffset);
     1317        tw.writeString(src, srcoff, len);
    13201318
    1321             log.log("Sending SSH_FXP_WRITE...");
    1322             sendMessage(Packet.SSH_FXP_WRITE, req_id, tw.getBytes());
     1319        log.log("Sending SSH_FXP_WRITE...");
     1320        sendMessage(Packet.SSH_FXP_WRITE, req.req_id, tw.getBytes());
    13231321
    1324             fileOffset += writeRequestLen;
     1322        pendingStatusQueue.put(req.req_id, req);
    13251323
    1326             srcoff += writeRequestLen;
    1327             len -= writeRequestLen;
     1324        // Only read next status if parallelism reached
     1325        while(pendingStatusQueue.size() >= parallelism) {
     1326            this.readStatus();
     1327        }
     1328    }
    13281329
    1329             byte[] resp = receiveMessage(34000);
     1330    private void readStatus() throws IOException {
     1331        byte[] resp = receiveMessage(34000);
    13301332
    1331             TypesReader tr = new TypesReader(resp);
     1333        TypesReader tr = new TypesReader(resp);
     1334        int type = tr.readByte();
    13321335
    1333             int t = tr.readByte();
    1334 
    1335             int rep_id = tr.readUINT32();
    1336             if(rep_id != req_id) {
    1337                 throw new IOException("The server sent an invalid id field.");
    1338             }
     1336        // Search the pending queue
     1337        OutstandingStatusRequest status = pendingStatusQueue.remove(tr.readUINT32());
     1338        if(null == status) {
     1339            throw new IOException("The server sent an invalid id field.");
     1340        }
    13391341
    1340             if(t != Packet.SSH_FXP_STATUS) {
    1341                 throw new IOException("The SFTP server sent an unexpected packet type (" + t + ")");
     1342        // Evaluate the answer
     1343        if(type == Packet.SSH_FXP_STATUS) {
     1344            // In any case, stop sending more packets
     1345            int code = tr.readUINT32();
     1346            if(log.isEnabled()) {
     1347                String[] desc = ErrorCodes.getDescription(code);
     1348                log.log("Got SSH_FXP_STATUS (" + status.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")");
    13421349            }
    1343 
    1344             int errorCode = tr.readUINT32();
    1345 
    1346             if(errorCode == ErrorCodes.SSH_FX_OK) {
    1347                 continue;
     1350            if(code == ErrorCodes.SSH_FX_OK) {
     1351                return;
    13481352            }
    1349 
    1350             String errorMessage = tr.readString();
    1351 
    1352             throw new SFTPException(errorMessage, errorCode);
     1353            String msg = tr.readString();
     1354            throw new SFTPException(msg, code);
    13531355        }
     1356        throw new IOException("The SFTP server sent an unexpected packet type (" + type + ")");
    13541357    }
    13551358
    13561359    /**
     
    13601363     * @throws IOException
    13611364     */
    13621365    public void closeFile(SFTPv3FileHandle handle) throws IOException {
    1363         if(handle == null) {
    1364             throw new IllegalArgumentException("the handle argument may not be null");
    1365         }
    13661366        try {
     1367            while(!pendingStatusQueue.isEmpty()) {
     1368                this.readStatus();
     1369            }
    13671370            if(!handle.isClosed) {
    13681371                closeHandle(handle.fileHandle);
    13691372            }
swiss made software