Index: subversion/libsvn_ra_svn/marshal.c =================================================================== --- subversion/libsvn_ra_svn/marshal.c (revision 22233) +++ subversion/libsvn_ra_svn/marshal.c (working copy) @@ -2,7 +2,7 @@ * marshal.c : Marshalling routines for Subversion protocol * * ==================================================================== - * Copyright (c) 2000-2004 CollabNet. All rights reserved. + * Copyright (c) 2000-2006 CollabNet. All rights reserved. * * This software is licensed as described in the file COPYING, which * you should have received as part of this distribution. The terms @@ -51,8 +51,9 @@ assert((sock && !in_file && !out_file) || (!sock && in_file && out_file)); +#ifdef SVN_HAVE_SASL + /* The sock member is only declared when SVN_HAVE_SASL is defined. */ conn->sock = sock; +#endif - conn->in_file = in_file; - conn->out_file = out_file; conn->read_ptr = conn->read_buf; conn->read_end = conn->read_buf; conn->write_pos = 0; @@ -60,6 +61,12 @@ conn->block_baton = NULL; conn->capabilities = apr_hash_make(pool); conn->pool = pool; + + if (sock != NULL) + conn->stream = svn_ra_svn__stream_from_sock(sock, pool); + else + conn->stream = svn_ra_svn__stream_from_files(in_file, out_file, pool); + + return conn; } @@ -98,36 +105,13 @@ conn->block_handler = handler; conn->block_baton = baton; - if (conn->sock) - apr_socket_timeout_set(conn->sock, interval); - else - apr_file_pipe_timeout_set(conn->out_file, interval); + svn_ra_svn__stream_timeout(conn->stream, interval); } svn_boolean_t svn_ra_svn__input_waiting(svn_ra_svn_conn_t *conn, apr_pool_t *pool) { - apr_pollfd_t pfd; - int n; - - if (conn->sock) - { - pfd.desc_type = APR_POLL_SOCKET; - pfd.desc.s = conn->sock; - } - else - { - pfd.desc_type = APR_POLL_FILE; - pfd.desc.f = conn->in_file; - } - pfd.p = pool; - pfd.reqevents = APR_POLLIN; -#ifndef AS400 - return ((apr_poll(&pfd, 1, &n, 0) == APR_SUCCESS) && n); -#else - /* OS400 requires a pool argument for apr_poll(). 
*/ - return ((apr_poll(&pfd, 1, &n, 0, pool) == APR_SUCCESS) && n); -#endif + return svn_ra_svn__stream_pending(conn->stream); } /* --- WRITE BUFFER MANAGEMENT --- */ @@ -151,19 +135,13 @@ const char *data, apr_size_t len) { const char *end = data + len; - apr_status_t status; apr_size_t count; apr_pool_t *subpool = NULL; while (data < end) { count = end - data; - if (conn->sock) - status = apr_socket_send(conn->sock, data, &count); - else - status = apr_file_write(conn->out_file, data, &count); - if (status) - return svn_error_wrap_apr(status, _("Can't write to connection")); + SVN_ERR(svn_ra_svn__stream_write(conn->stream, data, &count)); if (count == 0) { if (!subpool) @@ -244,19 +222,7 @@ static svn_error_t *readbuf_input(svn_ra_svn_conn_t *conn, char *data, apr_size_t *len) { - apr_status_t status; - - /* Always block for reading. */ - if (conn->sock && conn->block_handler) - apr_socket_timeout_set(conn->sock, -1); - if (conn->sock) - status = apr_socket_recv(conn->sock, data, len); - else - status = apr_file_read(conn->in_file, data, len); - if (conn->sock && conn->block_handler) - apr_socket_timeout_set(conn->sock, 0); - if (status && !APR_STATUS_IS_EOF(status)) - return svn_error_wrap_apr(status, _("Can't read from connection")); + SVN_ERR(svn_ra_svn__stream_read(conn->stream, data, len)); if (*len == 0) return svn_error_create(SVN_ERR_RA_SVN_CONNECTION_CLOSED, NULL, _("Connection closed unexpectedly")); Index: subversion/libsvn_ra_svn/ra_svn.h =================================================================== --- subversion/libsvn_ra_svn/ra_svn.h (revision 22233) +++ subversion/libsvn_ra_svn/ra_svn.h (working copy) @@ -30,6 +30,12 @@ #include #include "svn_ra_svn.h" +typedef svn_boolean_t (*ra_svn_pending_fn_t)(void *baton); + +typedef void (*ra_svn_timeout_fn_t)(void *baton, apr_interval_time_t timeout); + +typedef struct svn_ra_svn__stream_st svn_ra_svn__stream_t; + /* Handler for blocked writes. 
*/ typedef svn_error_t *(*ra_svn_block_handler_t)(svn_ra_svn_conn_t *conn, apr_pool_t *pool, @@ -42,10 +48,13 @@ /* This structure is opaque to the server. The client pokes at the * first few fields during setup and cleanup. */ struct svn_ra_svn_conn_st { - apr_socket_t *sock; /* NULL if using in_file/out_file */ - apr_file_t *in_file; - apr_file_t *out_file; - apr_proc_t *proc; /* Used by client.c when sock is NULL */ + svn_ra_svn__stream_t *stream; +#ifdef SVN_HAVE_SASL + /* Although all reads and writes go through the svn_ra_svn__stream_t + interface, SASL still needs direct access to the underlying socket + for stuff like IP addresses and port numbers. */ + apr_socket_t *sock; +#endif char read_buf[SVN_RA_SVN__READBUF_SIZE]; char *read_ptr; char *read_end; @@ -110,6 +119,42 @@ svn_error_t *svn_ra_svn__handle_failure_status(apr_array_header_t *params, apr_pool_t *pool); +/* Returns a stream that reads/writes from/to SOCK. */ +svn_ra_svn__stream_t *svn_ra_svn__stream_from_sock(apr_socket_t *sock, + apr_pool_t *pool); + +/* Returns a stream that reads from IN_FILE and writes to OUT_FILE. */ +svn_ra_svn__stream_t *svn_ra_svn__stream_from_files(apr_file_t *in_file, + apr_file_t *out_file, + apr_pool_t *pool); + +/* Returns a stream, allocated in POOL, that passes BATON to READ_CB and + WRITE_CB (either may be NULL) for I/O, to TIMEOUT_CB to set its + timeout, and to PENDING_CB to test for pending input. */ +svn_ra_svn__stream_t *svn_ra_svn__stream_create(void *baton, + svn_read_fn_t read_cb, + svn_write_fn_t write_cb, + ra_svn_timeout_fn_t timeout_cb, + ra_svn_pending_fn_t pending_cb, + apr_pool_t *pool); + +/* Write up to *LEN bytes from DATA to STREAM; on return *LEN holds the + number of bytes actually written. */ +svn_error_t *svn_ra_svn__stream_write(svn_ra_svn__stream_t *stream, + const char *data, apr_size_t *len); + +/* Read up to *LEN bytes from STREAM into DATA; on return *LEN holds the + number of bytes actually read. */ +svn_error_t *svn_ra_svn__stream_read(svn_ra_svn__stream_t *stream, + char *data, apr_size_t *len); + +/* Set the timeout for operations on STREAM to INTERVAL. */ +void svn_ra_svn__stream_timeout(svn_ra_svn__stream_t *stream, + apr_interval_time_t interval); + +/* Return whether or not there is data pending on STREAM. */ +svn_boolean_t svn_ra_svn__stream_pending(svn_ra_svn__stream_t *stream); + /* Respond to an auth request and perform authentication. 
REALM may * be NULL for the initial authentication exchange of protocol version * 1. */
Index: subversion/libsvn_ra_svn/streams.c
===================================================================
--- subversion/libsvn_ra_svn/streams.c (revision 0)
+++ subversion/libsvn_ra_svn/streams.c (revision 0)
@@ -0,0 +1,285 @@
+/*
+ * streams.c : stream encapsulation routines for the ra_svn protocol
+ *
+ * ====================================================================
+ * Copyright (c) 2000-2006 CollabNet. All rights reserved.
+ *
+ * This software is licensed as described in the file COPYING, which
+ * you should have received as part of this distribution. The terms
+ * are also available at http://subversion.tigris.org/license-1.html.
+ * If newer versions of this license are posted there, you may use a
+ * newer version instead, at your option.
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals. For exact contribution history, see the revision
+ * history and logs, available at http://subversion.tigris.org/.
+ * ====================================================================
+ */
+
+
+
+#include <apr_general.h>
+#include <apr_network_io.h>
+#include <apr_poll.h>
+
+#include "svn_types.h"
+#include "svn_error.h"
+#include "svn_pools.h"
+#include "svn_io.h"
+#include "svn_private_config.h"
+
+#include "ra_svn.h"
+
+/* An ra_svn stream: an svn_stream_t that carries the read and write
+   callbacks, plus the callbacks for the two operations svn_stream_t
+   does not provide (setting a timeout, polling for pending input). */
+struct svn_ra_svn__stream_st {
+  svn_stream_t *stream;
+  void *baton;                  /* Passed to pending_fn and timeout_fn. */
+  ra_svn_pending_fn_t pending_fn;
+  ra_svn_timeout_fn_t timeout_fn;
+};
+
+/* Baton for the socket-backed callbacks. */
+typedef struct {
+  apr_socket_t *sock;
+  apr_pool_t *pool;             /* Handed to pending(). */
+} sock_baton_t;
+
+/* Baton for the file-backed callbacks. */
+typedef struct {
+  apr_file_t *in_file;
+  apr_file_t *out_file;
+  apr_pool_t *pool;             /* Handed to pending(). */
+} file_baton_t;
+
+/* Return TRUE if a zero-timeout poll for readability of PFD succeeds
+   and reports at least one ready descriptor; a failed poll counts as
+   "nothing pending".  The caller fills in PFD's desc_type and desc;
+   this function sets its pool and requested events.  POOL is also
+   passed to apr_poll() itself in AS400 builds. */
+static svn_boolean_t pending(apr_pollfd_t *pfd, apr_pool_t *pool)
+{
+  apr_status_t status;
+  int n;
+
+  pfd->p = pool;
+  pfd->reqevents = APR_POLLIN;
+#ifdef AS400
+  status = apr_poll(pfd, 1, &n, 0, pool);
+#else
+  status = apr_poll(pfd, 1, &n, 0);
+#endif
+  return (status == APR_SUCCESS && n);
+}
+
+/* Implements svn_read_fn_t for a file_baton_t: read up to *LEN bytes
+   from the input file.  An EOF status is not itself an error, but a
+   read that yields zero bytes is reported as a closed connection. */
+static svn_error_t *
+file_read_cb(void *baton, char *buffer, apr_size_t *len)
+{
+  file_baton_t *b = baton;
+  apr_status_t status = apr_file_read(b->in_file, buffer, len);
+
+  if (status && !APR_STATUS_IS_EOF(status))
+    return svn_error_wrap_apr(status, _("Can't read from connection"));
+  if (*len == 0)
+    return svn_error_create(SVN_ERR_RA_SVN_CONNECTION_CLOSED, NULL,
+                            _("Connection closed unexpectedly"));
+  return SVN_NO_ERROR;
+}
+
+/* Implements svn_write_fn_t for a file_baton_t: write to the output
+   file. */
+static svn_error_t *
+file_write_cb(void *baton, const char *buffer, apr_size_t *len)
+{
+  file_baton_t *b = baton;
+  apr_status_t status = apr_file_write(b->out_file, buffer, len);
+  if (status)
+    return svn_error_wrap_apr(status, _("Can't write to connection"));
+  return SVN_NO_ERROR;
+}
+
+/* Implements ra_svn_timeout_fn_t for a file_baton_t.  Only the output
+   file's timeout is changed. */
+static void
+file_timeout_cb(void *baton, apr_interval_time_t interval)
+{
+  file_baton_t *b = baton;
+  apr_file_pipe_timeout_set(b->out_file, interval);
+}
+
+/* Implements ra_svn_pending_fn_t for a file_baton_t: poll the input
+   file. */
+static svn_boolean_t
+file_pending_cb(void *baton)
+{
+  file_baton_t *b = baton;
+  apr_pollfd_t pfd;
+
+  pfd.desc_type = APR_POLL_FILE;
+  pfd.desc.f = b->in_file;
+
+  return pending(&pfd, b->pool);
+}
+
+/* Return a stream, allocated in POOL, that reads from IN_FILE and
+   writes to OUT_FILE. */
+svn_ra_svn__stream_t *
+svn_ra_svn__stream_from_files(apr_file_t *in_file,
+                              apr_file_t *out_file,
+                              apr_pool_t *pool)
+{
+  file_baton_t *b = apr_palloc(pool, sizeof(*b));
+
+  b->in_file = in_file;
+  b->out_file = out_file;
+  b->pool = pool;
+
+  return svn_ra_svn__stream_create(b, file_read_cb, file_write_cb,
+                                   file_timeout_cb, file_pending_cb,
+                                   pool);
+}
+
+/* Implements svn_read_fn_t for a sock_baton_t: receive up to *LEN
+   bytes from the socket.  The socket's timeout is saved, set to -1
+   around the receive, and then restored.  As with files, an EOF
+   status is not itself an error but a zero-byte read is reported as
+   a closed connection. */
+static svn_error_t *
+sock_read_cb(void *baton, char *buffer, apr_size_t *len)
+{
+  sock_baton_t *b = baton;
+  apr_status_t status;
+  apr_interval_time_t interval;
+
+  status = apr_socket_timeout_get(b->sock, &interval);
+  if (status)
+    return svn_error_wrap_apr(status, _("Can't get socket timeout"));
+
+  /* Always block on read. */
+  apr_socket_timeout_set(b->sock, -1);
+  status = apr_socket_recv(b->sock, buffer, len);
+  apr_socket_timeout_set(b->sock, interval);
+
+  if (status && !APR_STATUS_IS_EOF(status))
+    return svn_error_wrap_apr(status, _("Can't read from connection"));
+  if (*len == 0)
+    return svn_error_create(SVN_ERR_RA_SVN_CONNECTION_CLOSED, NULL,
+                            _("Connection closed unexpectedly"));
+  return SVN_NO_ERROR;
+}
+
+/* Implements svn_write_fn_t for a sock_baton_t: send on the socket. */
+static svn_error_t *
+sock_write_cb(void *baton, const char *buffer, apr_size_t *len)
+{
+  sock_baton_t *b = baton;
+  apr_status_t status = apr_socket_send(b->sock, buffer, len);
+  if (status)
+    return svn_error_wrap_apr(status, _("Can't write to connection"));
+  return SVN_NO_ERROR;
+}
+
+/* Implements ra_svn_timeout_fn_t for a sock_baton_t. */
+static void
+sock_timeout_cb(void *baton, apr_interval_time_t interval)
+{
+  sock_baton_t *b = baton;
+  apr_socket_timeout_set(b->sock, interval);
+}
+
+/* Implements ra_svn_pending_fn_t for a sock_baton_t: poll the socket. */
+static svn_boolean_t
+sock_pending_cb(void *baton)
+{
+  sock_baton_t *b = baton;
+  apr_pollfd_t pfd;
+
+  pfd.desc_type = APR_POLL_SOCKET;
+  pfd.desc.s = b->sock;
+
+  return pending(&pfd, b->pool);
+}
+
+/* Return a stream, allocated in POOL, that reads from and writes to
+   SOCK. */
+svn_ra_svn__stream_t *
+svn_ra_svn__stream_from_sock(apr_socket_t *sock,
+                             apr_pool_t *pool)
+{
+  sock_baton_t *b = apr_palloc(pool, sizeof(*b));
+
+  b->sock = sock;
+  b->pool = pool;
+
+  return svn_ra_svn__stream_create(b, sock_read_cb, sock_write_cb,
+                                   sock_timeout_cb, sock_pending_cb,
+                                   pool);
+}
+
+/* Return a stream, allocated in POOL, whose callbacks all receive
+   BATON.  Any of READ_CB, WRITE_CB, TIMEOUT_CB and PENDING_CB may be
+   NULL: a missing read or write callback leaves the corresponding
+   handler of the underlying empty svn_stream_t in place, a missing
+   TIMEOUT_CB makes svn_ra_svn__stream_timeout() a no-op, and a
+   missing PENDING_CB makes svn_ra_svn__stream_pending() return
+   FALSE. */
+svn_ra_svn__stream_t *
+svn_ra_svn__stream_create(void *baton,
+                          svn_read_fn_t read_cb,
+                          svn_write_fn_t write_cb,
+                          ra_svn_timeout_fn_t timeout_cb,
+                          ra_svn_pending_fn_t pending_cb,
+                          apr_pool_t *pool)
+{
+  svn_ra_svn__stream_t *s = apr_palloc(pool, sizeof(*s));
+  s->stream = svn_stream_empty(pool);
+  svn_stream_set_baton(s->stream, baton);
+  if (read_cb)
+    svn_stream_set_read(s->stream, read_cb);
+  if (write_cb)
+    svn_stream_set_write(s->stream, write_cb);
+  s->baton = baton;
+  s->timeout_fn = timeout_cb;
+  s->pending_fn = pending_cb;
+  return s;
+}
+
+/* Write to STREAM through its svn_stream_t. */
+svn_error_t *
+svn_ra_svn__stream_write(svn_ra_svn__stream_t *stream,
+                         const char *data, apr_size_t *len)
+{
+  return svn_stream_write(stream->stream, data, len);
+}
+
+/* Read from STREAM through its svn_stream_t. */
+svn_error_t *
+svn_ra_svn__stream_read(svn_ra_svn__stream_t *stream, char *data,
+                        apr_size_t *len)
+{
+  return svn_stream_read(stream->stream, data, len);
+}
+
+/* Set the timeout for operations on STREAM to INTERVAL.  Does nothing
+   if STREAM was created without a timeout callback. */
+void
+svn_ra_svn__stream_timeout(svn_ra_svn__stream_t *stream,
+                           apr_interval_time_t interval)
+{
+  if (stream->timeout_fn)
+    stream->timeout_fn(stream->baton, interval);
+}
+
+/* Return whether or not there is data pending on STREAM; FALSE if
+   STREAM was created without a pending callback. */
+svn_boolean_t
+svn_ra_svn__stream_pending(svn_ra_svn__stream_t *stream)
+{
+  if (!stream->pending_fn)
+    return FALSE;
+  return stream->pending_fn(stream->baton);
+}