
Improve svnsync performance over ra_serf.

From: Lieven Govaerts <lgo_at_mobsol.be>
Date: 2007-10-31 00:04:03 CET

The attached patch is a work in progress that should, hopefully drastically,
improve the performance of 'svnsync sync' over ra_serf.

The problem with 'svnsync sync' right now is that it's a serial process
that spends a lot of time waiting on both the master and the slave server.
Basically it comes down to this:
-> set 'currently-copying' revprop on rev 0 of the slave repository
   .. wait on slave server response ..
-> send replay report request for rev. N to master
   .. wait on master server response ..
-> parse replay report and drive editor
-> commit rev. N on the slave repository
   .. wait on slave server response ..
-> read all revprops from the master
   .. wait on master server response ..
-> add all revprops to the slave repository
   .. wait on slave server response ..
-> reset 'currently-copying' revprop
   .. wait on slave server response ..
-> ... start all over for rev. N+1.

While there's little we can do to reduce these waiting times for ra_neon,
ra_serf supports HTTP pipelining, which comes in handy. What I'd like
to do is this:
-> send request to get all revprops from rev. N from the master
-> send replay report request for rev. N to master
-> send request to get all revprops from rev. N+1 from the master
-> send replay report request for rev. N+1 to master
-> read the revprops for rev. N
-> while opening the editor, add revprops to it.
-> parse replay report for rev. N and drive editor
-> commit rev. N & revprops on the slave repository
   .. wait on slave server response ..
-> back to step 1.
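
To make the difference between the two flows concrete, here is a rough
count of the blocking waits per revision. It is only a sketch, not a
measurement: the function names are made up for illustration, and it
assumes that every ".. wait .." line costs one blocking round trip and
that the pipelined master requests are completely hidden, which is the
optimistic case.

```c
#include <assert.h>

/* Blocking waits per revision in the current serial flow, counted from
   the first list: 2 on the master (replay report, read revprops) and 4 on
   the slave (set currently-copying, commit, add revprops, reset
   currently-copying). */
enum { SERIAL_MASTER_WAITS = 2, SERIAL_SLAVE_WAITS = 4 };

/* Blocking waits per revision listed explicitly in the proposed flow:
   only the commit on the slave.  ASSUMPTION: the master requests are
   fully overlapped by the pipeline. */
enum { PIPELINED_MASTER_WAITS = 0, PIPELINED_SLAVE_WAITS = 1 };

/* Total blocking waits needed to sync REVISIONS revisions serially. */
long
serial_waits(long revisions)
{
  assert(revisions >= 0);
  return revisions * (SERIAL_MASTER_WAITS + SERIAL_SLAVE_WAITS);
}

/* Total blocking waits for the proposed pipelined flow. */
long
pipelined_waits(long revisions)
{
  assert(revisions >= 0);
  return revisions * (PIPELINED_MASTER_WAITS + PIPELINED_SLAVE_WAITS);
}
```

Under these assumptions, syncing 100 revisions goes from 600 blocking
waits to 100. How much wall-clock time that saves depends on the latency
of each server, which this sketch does not model.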

The attached patch does part of this. It already eliminates most of the
time spent waiting on the master server for the replay report, by
sending multiple replay requests over the pipeline while still parsing
the responses one by one. The other parts of the flow above are unchanged.
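
The idea of "send several requests, parse the responses one by one" can
be sketched without any serf code as follows. This is a toy model, not
what the patch does internally: the names (replay_pipelined, parse_func_t,
rev_log_t, record_rev) and the fixed window of outstanding requests are
assumptions made for the illustration; the patch itself does not limit
the number of requests it queues.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical callback: parse the response for revision REV. */
typedef void (*parse_func_t)(long rev, void *baton);

/* Request revisions START..END while keeping at most WINDOW requests
   outstanding.  Responses are parsed strictly in request order, as on an
   HTTP pipeline.  Stores the highest number of outstanding requests in
   *MAX_IN_FLIGHT and returns the number of revisions parsed. */
long
replay_pipelined(long start, long end, long window,
                 parse_func_t parse, void *baton, long *max_in_flight)
{
  long next_to_send = start;
  long next_to_parse = start;
  long in_flight = 0;
  long parsed = 0;

  assert(window > 0);
  assert(max_in_flight != NULL);
  *max_in_flight = 0;

  while (next_to_parse <= end)
    {
      /* Fill the pipeline: "send" requests until the window is full. */
      while (next_to_send <= end && in_flight < window)
        {
          next_to_send++;
          in_flight++;
        }
      if (in_flight > *max_in_flight)
        *max_in_flight = in_flight;

      /* Parse the oldest outstanding response. */
      if (parse)
        parse(next_to_parse, baton);
      next_to_parse++;
      in_flight--;
      parsed++;
    }

  return parsed;
}

/* Small recorder used to observe the order in which revisions are parsed. */
typedef struct {
  long revs[16];
  size_t count;
} rev_log_t;

void
record_rev(long rev, void *baton)
{
  rev_log_t *log = baton;
  if (log->count < sizeof(log->revs) / sizeof(log->revs[0]))
    log->revs[log->count++] = rev;
}
```

Calling replay_pipelined(5, 9, 3, record_rev, &log, &max) parses
revisions 5 through 9 in order while never having more than three
requests outstanding.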

The patch adds a new RA API function, svn_ra_replay_range, which takes a
range of revisions and two callback functions. The first callback
(report start) is used to create the editor; the second callback
(report finished) closes the editor and copies all revprops. With this
patch I had to create a second connection to copy the revprops. I hope
to eliminate this second connection again at a later stage.
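
For reviewers who want the shape of the callback contract before reading
the diff, here is a stripped-down sketch with stand-in types. Nothing in
it is real Subversion API: editor_t, replay_range, sync_state_t and the
two example callbacks are hypothetical simplifications, and errors are
plain ints instead of svn_error_t. It mirrors the non-pipelined RA
layers, which simply loop over the range.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for svn_delta_editor_t: only close_edit is modelled. */
typedef struct editor_t {
  int (*close_edit)(void *edit_baton);
} editor_t;

/* Called when a revision's replay report starts; must supply an editor. */
typedef int (*revstart_cb_t)(long rev, void *replay_baton,
                             const editor_t **editor, void **edit_baton);

/* Called when a revision's replay report is finished. */
typedef int (*revfinish_cb_t)(long rev, void *replay_baton,
                              const editor_t *editor, void *edit_baton);

/* Replay revisions START..END, one at a time.  Returns the first
   non-zero error code from a callback, or 0 on success. */
int
replay_range(long start, long end,
             revstart_cb_t revstart, revfinish_cb_t revfinish,
             void *replay_baton)
{
  long rev;

  assert(revstart != NULL && revfinish != NULL);

  for (rev = start; rev <= end; rev++)
    {
      const editor_t *editor = NULL;
      void *edit_baton = NULL;
      int err;

      err = revstart(rev, replay_baton, &editor, &edit_baton);
      if (err)
        return err;

      /* A real RA layer would drive EDITOR with the revision's changes
         here. */

      err = revfinish(rev, replay_baton, editor, edit_baton);
      if (err)
        return err;
    }

  return 0;
}

/* Example caller state: counts opened and closed editors. */
typedef struct {
  long opened;
  long closed;
  long last_rev;
} sync_state_t;

static int
demo_close_edit(void *edit_baton)
{
  sync_state_t *s = edit_baton;
  s->closed++;
  return 0;
}

static const editor_t demo_editor = { demo_close_edit };

int
on_rev_started(long rev, void *replay_baton,
               const editor_t **editor, void **edit_baton)
{
  sync_state_t *s = replay_baton;
  s->opened++;
  s->last_rev = rev;
  *editor = &demo_editor;
  *edit_baton = s;
  return 0;
}

int
on_rev_finished(long rev, void *replay_baton,
                const editor_t *editor, void *edit_baton)
{
  (void)rev;
  (void)replay_baton;
  /* Close the editor; svnsync would copy the revprops after this. */
  return editor->close_edit(edit_baton);
}
```

The point of the split is that the caller, not the RA layer, decides how
the editor is built and what happens once a revision is complete, so a
pipelining layer like ra_serf can invoke the callbacks whenever a report
starts or ends in the response stream.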

Some small tests show that the patch improves svnsync performance by
about 15% (with a relatively fast network connection to the master
server). While it passes the svnsync regression tests over the four RA
layers, I've encountered some issues while testing it with different
repositories, so it's not ready to commit.

Review of the above approach and the patch is welcome. Specifically, I'd
like to know whether I'm breaking any rules concerning atomic behavior,
race conditions, etc.

Lieven

Incomplete log message:
[[[

* subversion/libsvn_ra_serf/replay.c
  (svn_ra_serf__replay_range): new function, sends requests to replay a
   range of revisions and then calls the callbacks while parsing the response.
  (start_replay): calls the revstart callback when encountering the report's
   opening tag.
  (end_replay): calls the revfinish callback when encountering the report's
   closing tag.
  (create_replay_body): share the body creation code between
   svn_ra_serf__replay and svn_ra_serf__replay_range.
  (svn_ra_serf__replay): use the new create_replay_body delegate.

* subversion/svnsync/main.c
  (open_source_session): create a second session to the master repo to
   download the revprops.
  (replay_baton_t): new baton, passed around between ra_serf and replay
   callback functions.
  (replay_rev_started): replay callback function, opens an editor.
  (replay_rev_finished): replay callback function, closes the editor and
   copies revprops from master to slave.
  (do_synchronize): use the new api svn_ra_replay_range.
]]]

Index: subversion/include/svn_ra.h
===================================================================
--- subversion/include/svn_ra.h (revision 27493)
+++ subversion/include/svn_ra.h (working copy)
@@ -183,6 +183,34 @@
                                               void *baton,
                                               apr_pool_t *pool);
 
+/**
+ * TODO: finish comment!
+ *
+ * Callback function type for replay actions.
+ *
+ * @since New in 1.5.
+ */
+typedef svn_error_t *(*svn_ra_replay_revstart_callback_t)
+ (svn_revnum_t current,
+ void *replay_baton,
+ const svn_delta_editor_t **editor,
+ void **edit_baton,
+ apr_pool_t *pool);
+
+ /**
+ * TODO: finish comment!
+ *
+ * Callback function type for replay actions.
+ *
+ * @since New in 1.5.
+ */
+typedef svn_error_t *(*svn_ra_replay_revfinish_callback_t)
+ (svn_revnum_t current,
+ void *replay_baton,
+ const svn_delta_editor_t *editor,
+ void *edit_baton,
+ apr_pool_t *pool);
+
 
 /**
  * The update Reporter.
@@ -1487,6 +1515,8 @@
 
 
 /**
+ * TODO: update comment (include start/end revision & callbacks)
+ *
  * Replay the changes from @a revision through @a editor and @a edit_baton.
  *
  * Changes will be limited to those that occur under @a session's URL, and
@@ -1503,6 +1533,36 @@
  *
  * @a pool is used for all allocation.
  *
+ * @since New in 1.5.
+ */
+svn_error_t *
+svn_ra_replay_range(svn_ra_session_t *session,
+ svn_revnum_t start_revision,
+ svn_revnum_t end_revision,
+ svn_revnum_t low_water_mark,
+ svn_boolean_t send_deltas,
+ svn_ra_replay_revstart_callback_t revstart_func,
+ svn_ra_replay_revfinish_callback_t revfinish_func,
+ void *replay_baton,
+ apr_pool_t *pool);
+
+/**
+ * Replay the changes from @a revision through @a editor and @a edit_baton.
+ *
+ * Changes will be limited to those that occur under @a session's URL, and
+ * the server will assume that the client has no knowledge of revisions
+ * prior to @a low_water_mark. These two limiting factors define the portion
+ * of the tree that the server will assume the client already has knowledge of,
+ * and thus any copies of data from outside that part of the tree will be
+ * sent in their entirety, not as simple copies or deltas against a previous
+ * version.
+ *
+ * If @a send_deltas is @c TRUE, the actual text and property changes in
+ * the revision will be sent, otherwise dummy text deltas and null property
+ * changes will be sent instead.
+ *
+ * @a pool is used for all allocation.
+ *
  * @since New in 1.4.
  */
 svn_error_t *svn_ra_replay(svn_ra_session_t *session,
Index: subversion/libsvn_ra/ra_loader.c
===================================================================
--- subversion/libsvn_ra/ra_loader.c (revision 27493)
+++ subversion/libsvn_ra/ra_loader.c (working copy)
@@ -1060,10 +1060,28 @@
                            void *edit_baton,
                            apr_pool_t *pool)
 {
- return session->vtable->replay(session, revision, low_water_mark,
- text_deltas, editor, edit_baton, pool);
+ return session->vtable->replay(session, revision, low_water_mark,
+ text_deltas, editor, edit_baton,
+ pool);
 }
 
+svn_error_t *
+svn_ra_replay_range(svn_ra_session_t *session,
+ svn_revnum_t start_revision,
+ svn_revnum_t end_revision,
+ svn_revnum_t low_water_mark,
+ svn_boolean_t text_deltas,
+ svn_ra_replay_revstart_callback_t revstart_func,
+ svn_ra_replay_revfinish_callback_t revfinish_func,
+ void *replay_baton,
+ apr_pool_t *pool)
+{
+ return session->vtable->replay_range(session, start_revision, end_revision,
+ low_water_mark, text_deltas,
+ revstart_func, revfinish_func,
+ replay_baton, pool);
+}
+
 svn_error_t *svn_ra_has_capability(svn_ra_session_t *session,
                                    svn_boolean_t *has,
                                    const char *capability,
Index: subversion/libsvn_ra/ra_loader.h
===================================================================
--- subversion/libsvn_ra/ra_loader.h (revision 27493)
+++ subversion/libsvn_ra/ra_loader.h (working copy)
@@ -228,7 +228,7 @@
   svn_error_t *(*replay)(svn_ra_session_t *session,
                          svn_revnum_t revision,
                          svn_revnum_t low_water_mark,
- svn_boolean_t text_deltas,
+ svn_boolean_t send_deltas,
                          const svn_delta_editor_t *editor,
                          void *edit_baton,
                          apr_pool_t *pool);
@@ -236,6 +236,16 @@
                                  svn_boolean_t *has,
                                  const char *capability,
                                  apr_pool_t *pool);
+ svn_error_t *
+ (*replay_range)(svn_ra_session_t *session,
+ svn_revnum_t start_revision,
+ svn_revnum_t end_revision,
+ svn_revnum_t low_water_mark,
+ svn_boolean_t text_deltas,
+ svn_ra_replay_revstart_callback_t revstart_func,
+ svn_ra_replay_revfinish_callback_t revfinish_func,
+ void *replay_baton,
+ apr_pool_t *pool);
 } svn_ra__vtable_t;
 
 /* The RA session object. */
Index: subversion/libsvn_ra_local/ra_plugin.c
===================================================================
--- subversion/libsvn_ra_local/ra_plugin.c (revision 27493)
+++ subversion/libsvn_ra_local/ra_plugin.c (working copy)
@@ -1415,6 +1415,50 @@
 }
 
 
+svn_error_t *
+svn_ra_local__replay_range(svn_ra_session_t *session,
+ svn_revnum_t start_revision,
+ svn_revnum_t end_revision,
+ svn_revnum_t low_water_mark,
+ svn_boolean_t send_deltas,
+ svn_ra_replay_revstart_callback_t revstart_func,
+ svn_ra_replay_revfinish_callback_t revfinish_func,
+ void *replay_baton,
+ apr_pool_t *pool)
+{
+ svn_revnum_t rev;
+ apr_pool_t *subpool = svn_pool_create(pool);
+
+ for (rev = start_revision ; rev <= end_revision ; rev++)
+ {
+ svn_ra_local__session_baton_t *sess = session->priv;
+ svn_fs_root_t *root;
+ const svn_delta_editor_t *editor;
+ void *edit_baton;
+
+ svn_pool_clear(subpool);
+
+ SVN_ERR(revstart_func(rev, replay_baton,
+ &editor, &edit_baton,
+ subpool));
+
+ SVN_ERR(svn_fs_revision_root(&root, svn_repos_fs(sess->repos),
+ rev, subpool));
+
+ SVN_ERR(svn_repos_replay2(root, sess->fs_path->data, low_water_mark,
+ send_deltas, editor, edit_baton,
+ NULL, NULL, subpool));
+
+ SVN_ERR(revfinish_func(rev, replay_baton,
+ editor, edit_baton,
+ subpool));
+ }
+ svn_pool_destroy(subpool);
+
+ return SVN_NO_ERROR;
+}
+
+
 static svn_error_t *
 svn_ra_local__has_capability(svn_ra_session_t *session,
                              svn_boolean_t *has,
@@ -1479,7 +1523,8 @@
   svn_ra_local__get_lock,
   svn_ra_local__get_locks,
   svn_ra_local__replay,
- svn_ra_local__has_capability
+ svn_ra_local__has_capability,
+ svn_ra_local__replay_range
 };
 
 
Index: subversion/libsvn_ra_neon/ra_neon.h
===================================================================
--- subversion/libsvn_ra_neon/ra_neon.h (revision 27493)
+++ subversion/libsvn_ra_neon/ra_neon.h (working copy)
@@ -1012,6 +1012,19 @@
                     apr_pool_t *pool);
 
 /*
+ * Implements the replay_range RA layer function. */
+svn_error_t *
+svn_ra_neon__replay_range(svn_ra_session_t *session,
+ svn_revnum_t start_revision,
+ svn_revnum_t end_revision,
+ svn_revnum_t low_water_mark,
+ svn_boolean_t send_deltas,
+ svn_ra_replay_revstart_callback_t revstart_func,
+ svn_ra_replay_revfinish_callback_t revfinish_func,
+ void *replay_baton,
+ apr_pool_t *pool);
+
+/*
  * Implements the has_capability RA layer function. */
 svn_error_t *
 svn_ra_neon__has_capability(svn_ra_session_t *session,
Index: subversion/libsvn_ra_neon/replay.c
===================================================================
--- subversion/libsvn_ra_neon/replay.c (revision 27493)
+++ subversion/libsvn_ra_neon/replay.c (working copy)
@@ -486,3 +486,67 @@
                                      FALSE, /* spool response */
                                      pool);
 }
+
+
+svn_error_t *
+svn_ra_neon__replay_range(svn_ra_session_t *session,
+ svn_revnum_t start_revision,
+ svn_revnum_t end_revision,
+ svn_revnum_t low_water_mark,
+ svn_boolean_t send_deltas,
+ svn_ra_replay_revstart_callback_t revstart_func,
+ svn_ra_replay_revfinish_callback_t revfinish_func,
+ void *replay_baton,
+ apr_pool_t *pool)
+{
+ svn_ra_neon__session_t *ras = session->priv;
+ replay_baton_t rb;
+ svn_revnum_t rev;
+ apr_pool_t *subpool = svn_pool_create(pool);
+
+ for (rev = start_revision ; rev <= end_revision ; rev++)
+ {
+ const char *body;
+
+ svn_pool_clear(subpool);
+
+ body = apr_psprintf(subpool,
+ "<S:replay-report xmlns:S=\"svn:\">\n"
+ " <S:revision>%ld</S:revision>\n"
+ " <S:low-water-mark>%ld</S:low-water-mark>\n"
+ " <S:send-deltas>%d</S:send-deltas>\n"
+ "</S:replay-report>",
+ rev, low_water_mark, send_deltas);
+
+
+ memset(&rb, 0, sizeof(rb));
+
+ rb.pool = subpool;
+ rb.dirs = apr_array_make(subpool, 5, sizeof(dir_item_t));
+ rb.prop_pool = svn_pool_create(subpool);
+ rb.prop_accum = svn_stringbuf_create("", rb.prop_pool);
+
+ SVN_ERR(revstart_func(rev, replay_baton,
+ &rb.editor, &rb.edit_baton,
+ subpool));
+
+ SVN_ERR(svn_ra_neon__parsed_request(ras, "REPORT", ras->url->data, body,
+ NULL, NULL,
+ start_element,
+ cdata_handler,
+ end_element,
+ &rb,
+ NULL, /* extra headers */
+ NULL, /* status code */
+ FALSE, /* spool response */
+ subpool));
+
+ SVN_ERR(revfinish_func(rev, replay_baton,
+ rb.editor, rb.edit_baton,
+ subpool));
+ }
+
+ svn_pool_destroy(subpool);
+
+ return SVN_NO_ERROR;
+}
Index: subversion/libsvn_ra_neon/session.c
===================================================================
--- subversion/libsvn_ra_neon/session.c (revision 27493)
+++ subversion/libsvn_ra_neon/session.c (working copy)
@@ -1142,7 +1142,8 @@
   svn_ra_neon__get_lock,
   svn_ra_neon__get_locks,
   svn_ra_neon__replay,
- svn_ra_neon__has_capability
+ svn_ra_neon__has_capability,
+ svn_ra_neon__replay_range,
 };
 
 svn_error_t *
Index: subversion/libsvn_ra_serf/ra_serf.h
===================================================================
--- subversion/libsvn_ra_serf/ra_serf.h (revision 27493)
+++ subversion/libsvn_ra_serf/ra_serf.h (working copy)
@@ -1112,12 +1112,23 @@
 svn_ra_serf__replay(svn_ra_session_t *ra_session,
                     svn_revnum_t revision,
                     svn_revnum_t low_water_mark,
- svn_boolean_t text_deltas,
+ svn_boolean_t send_deltas,
                     const svn_delta_editor_t *editor,
                     void *edit_baton,
                     apr_pool_t *pool);
 
 svn_error_t *
+svn_ra_serf__replay_range(svn_ra_session_t *ra_session,
+ svn_revnum_t start_revision,
+ svn_revnum_t end_revision,
+ svn_revnum_t low_water_mark,
+ svn_boolean_t send_deltas,
+ svn_ra_replay_revstart_callback_t revstart_func,
+ svn_ra_replay_revfinish_callback_t revfinish_func,
+ void *replay_baton,
+ apr_pool_t *pool);
+
+svn_error_t *
 svn_ra_serf__lock(svn_ra_session_t *ra_session,
                   apr_hash_t *path_revs,
                   const char *comment,
Index: subversion/libsvn_ra_serf/replay.c
===================================================================
--- subversion/libsvn_ra_serf/replay.c (revision 27493)
+++ subversion/libsvn_ra_serf/replay.c (working copy)
@@ -91,9 +91,21 @@
   /* are we done? */
   svn_boolean_t done;
 
+ /* callback to get an editor */
+ svn_ra_replay_revstart_callback_t revstart_func;
+ svn_ra_replay_revfinish_callback_t revfinish_func;
+ void *replay_baton;
+
   /* replay receiver function and baton */
   const svn_delta_editor_t *editor;
   void *editor_baton;
+
+ /* current revision */
+ svn_revnum_t revision;
+
+ /* Information needed to create the replay report body */
+ svn_revnum_t low_water_mark;
+ svn_boolean_t send_deltas;
 } replay_context_t;
 
 
@@ -147,6 +159,9 @@
   if (state == NONE &&
       strcmp(name.name, "editor-report") == 0)
     {
+ SVN_ERR(ctx->revstart_func(ctx->revision, ctx->replay_baton,
+ &ctx->editor, &ctx->editor_baton,
+ ctx->pool));
       push_state(parser, ctx, REPORT);
     }
   else if (state == REPORT &&
@@ -414,6 +429,9 @@
       strcmp(name.name, "editor-report") == 0)
     {
       svn_ra_serf__xml_pop_state(parser);
+ SVN_ERR(ctx->revfinish_func(ctx->revision, ctx->replay_baton,
+ ctx->editor, ctx->editor_baton,
+ ctx->pool));
     }
   else if (state == OPEN_DIR && strcmp(name.name, "open-directory") == 0)
     {
@@ -509,6 +527,52 @@
   return SVN_NO_ERROR;
 }
 
+static serf_bucket_t *
+create_replay_body(void *baton,
+ serf_bucket_alloc_t *alloc,
+ apr_pool_t *pool)
+{
+ replay_context_t *ctx = baton;
+ serf_bucket_t *body_bkt, *tmp;
+
+ body_bkt = serf_bucket_aggregate_create(alloc);
+
+ tmp = SERF_BUCKET_SIMPLE_STRING_LEN("<S:replay-report xmlns:S=\"",
+ sizeof("<S:replay-report xmlns:S=\"")-1,
+ alloc);
+ serf_bucket_aggregate_append(body_bkt, tmp);
+
+ tmp = SERF_BUCKET_SIMPLE_STRING_LEN(SVN_XML_NAMESPACE,
+ sizeof(SVN_XML_NAMESPACE)-1,
+ alloc);
+ serf_bucket_aggregate_append(body_bkt, tmp);
+
+ tmp = SERF_BUCKET_SIMPLE_STRING_LEN("\">",
+ sizeof("\">")-1,
+ alloc);
+ serf_bucket_aggregate_append(body_bkt, tmp);
+
+ svn_ra_serf__add_tag_buckets(body_bkt,
+ "S:revision", apr_ltoa(pool, ctx->revision),
+ alloc);
+ svn_ra_serf__add_tag_buckets(body_bkt,
+ "S:low-water-mark",
+ apr_ltoa(pool, ctx->low_water_mark),
+ alloc);
+
+ svn_ra_serf__add_tag_buckets(body_bkt,
+ "S:send-deltas",
+ apr_ltoa(pool, ctx->send_deltas),
+ alloc);
+
+ tmp = SERF_BUCKET_SIMPLE_STRING_LEN("</S:replay-report>",
+ sizeof("</S:replay-report>")-1,
+ alloc);
+ serf_bucket_aggregate_append(body_bkt, tmp);
+
+ return body_bkt;
+}
+
 svn_error_t *
 svn_ra_serf__replay(svn_ra_session_t *ra_session,
                     svn_revnum_t revision,
@@ -522,54 +586,21 @@
   svn_ra_serf__session_t *session = ra_session->priv;
   svn_ra_serf__handler_t *handler;
   svn_ra_serf__xml_parser_t *parser_ctx;
- serf_bucket_t *buckets, *tmp;
 
   replay_ctx = apr_pcalloc(pool, sizeof(*replay_ctx));
   replay_ctx->pool = pool;
   replay_ctx->editor = editor;
   replay_ctx->editor_baton = edit_baton;
   replay_ctx->done = FALSE;
+ replay_ctx->low_water_mark = low_water_mark;
+ replay_ctx->send_deltas = send_deltas;
 
- buckets = serf_bucket_aggregate_create(session->bkt_alloc);
-
- tmp = SERF_BUCKET_SIMPLE_STRING_LEN("<S:replay-report xmlns:S=\"",
- sizeof("<S:replay-report xmlns:S=\"")-1,
- session->bkt_alloc);
- serf_bucket_aggregate_append(buckets, tmp);
-
- tmp = SERF_BUCKET_SIMPLE_STRING_LEN(SVN_XML_NAMESPACE,
- sizeof(SVN_XML_NAMESPACE)-1,
- session->bkt_alloc);
- serf_bucket_aggregate_append(buckets, tmp);
-
- tmp = SERF_BUCKET_SIMPLE_STRING_LEN("\">",
- sizeof("\">")-1,
- session->bkt_alloc);
- serf_bucket_aggregate_append(buckets, tmp);
-
- svn_ra_serf__add_tag_buckets(buckets,
- "S:revision", apr_ltoa(pool, revision),
- session->bkt_alloc);
- svn_ra_serf__add_tag_buckets(buckets,
- "S:low-water-mark",
- apr_ltoa(pool, low_water_mark),
- session->bkt_alloc);
-
- svn_ra_serf__add_tag_buckets(buckets,
- "S:send-deltas",
- apr_ltoa(pool, send_deltas),
- session->bkt_alloc);
-
- tmp = SERF_BUCKET_SIMPLE_STRING_LEN("</S:replay-report>",
- sizeof("</S:replay-report>")-1,
- session->bkt_alloc);
- serf_bucket_aggregate_append(buckets, tmp);
-
   handler = apr_pcalloc(pool, sizeof(*handler));
 
   handler->method = "REPORT";
   handler->path = session->repos_url_str;
- handler->body_buckets = buckets;
+ handler->body_delegate = create_replay_body;
+ handler->body_delegate_baton = replay_ctx;
   handler->body_type = "text/xml";
   handler->conn = session->conns[0];
   handler->session = session;
@@ -588,7 +619,88 @@
 
   svn_ra_serf__request_create(handler);
 
- SVN_ERR(svn_ra_serf__context_run_wait(&replay_ctx->done, session, pool));
+ return SVN_NO_ERROR;
+}
 
+svn_error_t *
+svn_ra_serf__replay_range(svn_ra_session_t *ra_session,
+ svn_revnum_t start_revision,
+ svn_revnum_t end_revision,
+ svn_revnum_t low_water_mark,
+ svn_boolean_t send_deltas,
+ svn_ra_replay_revstart_callback_t revstart_func,
+ svn_ra_replay_revfinish_callback_t revfinish_func,
+ void *replay_baton,
+ apr_pool_t *pool)
+{
+ replay_context_t *replay_ctx;
+ svn_ra_serf__session_t *session = ra_session->priv;
+ svn_ra_serf__handler_t *handler;
+ svn_ra_serf__xml_parser_t *parser_ctx;
+ svn_revnum_t rev = start_revision;
+
+ while (1)
+ {
+ apr_status_t status;
+
+ /* Send pending requests, if any */
+ if (rev <= end_revision)
+ {
+ replay_ctx = apr_pcalloc(pool, sizeof(*replay_ctx));
+ replay_ctx->pool = pool;
+ replay_ctx->revstart_func = revstart_func;
+ replay_ctx->revfinish_func = revfinish_func;
+ replay_ctx->replay_baton = replay_baton;
+ replay_ctx->done = FALSE;
+ replay_ctx->revision = rev;
+ replay_ctx->low_water_mark = low_water_mark;
+ replay_ctx->send_deltas = send_deltas;
+
+ handler = apr_pcalloc(pool, sizeof(*handler));
+
+ handler->method = "REPORT";
+ handler->path = session->repos_url_str;
+ handler->body_delegate = create_replay_body;
+ handler->body_delegate_baton = replay_ctx;
+ handler->conn = session->conns[0];
+ handler->session = session;
+
+ parser_ctx = apr_pcalloc(pool, sizeof(*parser_ctx));
+
+ parser_ctx->pool = pool;
+ parser_ctx->user_data = replay_ctx;
+ parser_ctx->start = start_replay;
+ parser_ctx->end = end_replay;
+ parser_ctx->cdata = cdata_replay;
+ parser_ctx->done = &replay_ctx->done;
+
+ handler->response_handler = svn_ra_serf__handle_xml_parser;
+ handler->response_baton = parser_ctx;
+
+ svn_ra_serf__request_create(handler);
+
+ rev++;
+ }
+
+ /* Run the serf loop, send outgoing and process incoming requests.
+ Continue doing this if there are enough requests sent or there are no
+ more pending requests. */
+ status = serf_context_run(session->context, SERF_DURATION_FOREVER, pool);
+ if (APR_STATUS_IS_TIMEUP(status))
+ {
+ continue;
+ }
+ if (status)
+ {
+ SVN_ERR(session->pending_error);
+
+ return svn_error_wrap_apr(status,
+ _("Error retrieving replay REPORT (%d)"),
+ status);
+ }
+ if (replay_ctx->done)
+ break;
+ }
+
   return SVN_NO_ERROR;
 }
Index: subversion/libsvn_ra_serf/serf.c
===================================================================
--- subversion/libsvn_ra_serf/serf.c (revision 27493)
+++ subversion/libsvn_ra_serf/serf.c (working copy)
@@ -917,7 +917,8 @@
   svn_ra_serf__get_lock,
   svn_ra_serf__get_locks,
   svn_ra_serf__replay,
- svn_ra_serf__has_capability
+ svn_ra_serf__has_capability,
+ svn_ra_serf__replay_range,
 };
 
 svn_error_t *
Index: subversion/libsvn_ra_svn/client.c
===================================================================
--- subversion/libsvn_ra_svn/client.c (revision 27493)
+++ subversion/libsvn_ra_svn/client.c (working copy)
@@ -2149,6 +2149,54 @@
 }
 
 
+static svn_error_t *
+ra_svn_replay_range(svn_ra_session_t *session,
+ svn_revnum_t start_revision,
+ svn_revnum_t end_revision,
+ svn_revnum_t low_water_mark,
+ svn_boolean_t send_deltas,
+ svn_ra_replay_revstart_callback_t revstart_func,
+ svn_ra_replay_revfinish_callback_t revfinish_func,
+ void *replay_baton,
+ apr_pool_t *pool)
+{
+ svn_ra_svn__session_baton_t *sess = session->priv;
+ svn_revnum_t rev;
+ apr_pool_t *subpool = svn_pool_create(pool);
+
+ for (rev = start_revision ; rev <= end_revision ; rev++)
+ {
+ const svn_delta_editor_t *editor;
+ void *edit_baton;
+
+ svn_pool_clear(subpool);
+
+ SVN_ERR(revstart_func(rev, replay_baton,
+ &editor, &edit_baton,
+ subpool));
+
+ SVN_ERR(svn_ra_svn_write_cmd(sess->conn, subpool, "replay", "rrb", rev,
+ low_water_mark, send_deltas));
+
+ SVN_ERR(handle_unsupported_cmd(handle_auth_request(sess, subpool),
+ _("Server doesn't support the replay "
+ "command")));
+
+ SVN_ERR(svn_ra_svn_drive_editor2(sess->conn, subpool, editor, edit_baton,
+ NULL, TRUE));
+
+ SVN_ERR(svn_ra_svn_read_cmd_response(sess->conn, subpool, ""));
+
+ SVN_ERR(revfinish_func(rev, replay_baton,
+ editor, edit_baton,
+ subpool));
+ }
+ svn_pool_destroy(subpool);
+
+ return SVN_NO_ERROR;
+}
+
+
 static svn_error_t *ra_svn_has_capability(svn_ra_session_t *session,
                                           svn_boolean_t *has,
                                           const char *capability,
@@ -2207,7 +2255,8 @@
   ra_svn_get_lock,
   ra_svn_get_locks,
   ra_svn_replay,
- ra_svn_has_capability
+ ra_svn_has_capability,
+ ra_svn_replay_range,
 };
 
 svn_error_t *
Index: subversion/svnsync/main.c
===================================================================
--- subversion/svnsync/main.c (revision 27493)
+++ subversion/svnsync/main.c (working copy)
@@ -966,7 +966,9 @@
 }
 
 
-/* Set *FROM_SESSION to an RA session associated with the source
+/* TODO: fix comment
+ *
+ * Set *FROM_SESSION to an RA session associated with the source
  * repository of the synchronization, as determined by reading
  * svn:sync- properties from the destination repository (associated
  * with TO_SESSION). Set LAST_MERGED_REV to the value of the property
@@ -977,6 +979,7 @@
  */
 static svn_error_t *
 open_source_session(svn_ra_session_t **from_session,
+ svn_ra_session_t **from_rp_session,
                     svn_string_t **last_merged_rev,
                     svn_ra_session_t *to_session,
                     svn_ra_callbacks2_t *callbacks,
@@ -999,12 +1002,21 @@
       (APR_EINVAL, NULL,
        _("Destination repository has not been initialized"));
 
+ /* Open the session to copy the revision data. */
   SVN_ERR(svn_ra_open2(from_session, from_url->data, callbacks, baton,
                        config, pool));
-
   SVN_ERR(check_if_session_is_at_repos_root(*from_session, from_url->data,
                                             pool));
 
+ /* Open the session to copy the revision properties. */
+ if (from_rp_session)
+ {
+ SVN_ERR(svn_ra_open2(from_rp_session, from_url->data, callbacks, baton,
+ config, pool));
+ SVN_ERR(check_if_session_is_at_repos_root(*from_rp_session, from_url->data,
+ pool));
+ }
+
   /* Ok, now sanity check the UUID of the source repository, it
      wouldn't be a good thing to sync from a different repository. */
 
@@ -1019,7 +1031,135 @@
   return SVN_NO_ERROR;
 }
 
+/* Replay baton, used during sychnronization. */
+typedef struct {
+ svn_ra_session_t *from_session;
+ svn_ra_session_t *to_session;
+ svn_ra_session_t *from_rp_session;
+ subcommand_baton_t *sb;
+} replay_baton_t;
 
+/* Return a replay baton allocated from POOL and populated with
+ data from the provided parameters. */
+static replay_baton_t *
+make_replay_baton(svn_ra_session_t *from_session, svn_ra_session_t *to_session,
+ svn_ra_session_t *from_rp_session,
+ subcommand_baton_t *sb, apr_pool_t *pool)
+{
+ replay_baton_t *rb = apr_pcalloc(pool, sizeof(*rb));
+ rb->from_session = from_session;
+ rb->from_rp_session = from_rp_session;
+ rb->to_session = to_session;
+ rb->sb = sb;
+ return rb;
+}
+
+/* Callback function for svn_ra_replay_range, invoked when starting to parse
+ * a replay report.
+ */
+static svn_error_t *
+replay_rev_started(svn_revnum_t revision,
+ void *replay_baton,
+ const svn_delta_editor_t **editor,
+ void **edit_baton,
+ apr_pool_t *pool)
+{
+ const svn_delta_editor_t *commit_editor;
+ const svn_delta_editor_t *cancel_editor;
+ const svn_delta_editor_t *sync_editor;
+ void *commit_baton;
+ void *cancel_baton;
+ void *sync_baton;
+ replay_baton_t *rb = replay_baton;
+
+ /* We set this property so that if we error out for some reason
+ we can later determine where we were in the process of
+ merging a revision. If we had committed the change, but we
+ hadn't finished copying the revprops we need to know that, so
+ we can go back and finish the job before we move on.
+
+ NOTE: We have to set this before we start the commit editor,
+ because ra_svn doesn't let you change rev props during a
+ commit. */
+ SVN_ERR(svn_ra_change_rev_prop(rb->to_session, 0,
+ SVNSYNC_PROP_CURRENTLY_COPYING,
+ svn_string_createf(pool, "%ld",
+ revision),
+ pool));
+
+ /* The actual copy is just a replay hooked up to a commit. */
+
+ SVN_ERR(svn_ra_get_commit_editor2(rb->to_session, &commit_editor,
+ &commit_baton,
+ "", /* empty log */
+ commit_callback, rb->sb,
+ NULL, FALSE, pool));
+
+ /* There's one catch though, the diff shows us props we can't
+ send over the RA interface, so we need an editor that's smart
+ enough to filter those out for us. */
+
+ SVN_ERR(get_sync_editor(commit_editor, commit_baton, revision - 1,
+ rb->sb->to_url, rb->sb->quiet,
+ &sync_editor, &sync_baton, pool));
+
+ SVN_ERR(svn_delta_get_cancellation_editor(check_cancel, NULL,
+ sync_editor, sync_baton,
+ &cancel_editor,
+ &cancel_baton,
+ pool));
+ *editor = cancel_editor;
+ *edit_baton = cancel_baton;
+
+ return SVN_NO_ERROR;
+}
+
+/* Callback function for svn_ra_replay_range, invoked when finishing parsing
+ * a replay report.
+ */
+static svn_error_t *
+replay_rev_finished(svn_revnum_t revision,
+ void *replay_baton,
+ const svn_delta_editor_t *editor,
+ void *edit_baton,
+ apr_pool_t *pool)
+{
+ replay_baton_t *rb = replay_baton;
+
+ SVN_ERR(editor->close_edit(edit_baton, pool));
+
+ /* Sanity check that we actually committed the revision we meant to. */
+ if (rb->sb->committed_rev != revision)
+ return svn_error_createf
+ (APR_EINVAL, NULL,
+ _("Commit created rev %ld but should have created %ld"),
+ rb->sb->committed_rev, revision);
+
+ /* Ok, we're done with the data, now we just need to do the
+ revprops and we're all set. */
+
+ SVN_ERR(copy_revprops(rb->from_rp_session, rb->to_session, revision, TRUE,
+ rb->sb->quiet, pool));
+
+ /* Ok, we're done, bring the last-merged-rev property up to date. */
+
+ SVN_ERR(svn_ra_change_rev_prop
+ (rb->to_session,
+ 0,
+ SVNSYNC_PROP_LAST_MERGED_REV,
+ svn_string_create(apr_psprintf(pool, "%ld", revision),
+ pool),
+ pool));
+
+ /* And finally drop the currently copying prop, since we're done
+ with this revision. */
+
+ SVN_ERR(svn_ra_change_rev_prop(rb->to_session, 0,
+ SVNSYNC_PROP_CURRENTLY_COPYING,
+ NULL, pool));
+ return SVN_NO_ERROR;
+}
+
 /* Synchronize the repository associated with RA session TO_SESSION,
  * using information found in baton B, while the repository is
  * locked. Implements `with_locked_func_t' interface.
@@ -1028,14 +1168,17 @@
 do_synchronize(svn_ra_session_t *to_session, void *b, apr_pool_t *pool)
 {
   svn_string_t *last_merged_rev;
- svn_revnum_t from_latest, current;
+ svn_revnum_t from_latest;
   svn_ra_session_t *from_session;
+ svn_ra_session_t *from_rp_session;
   subcommand_baton_t *baton = b;
- apr_pool_t *subpool;
   svn_string_t *currently_copying;
   svn_revnum_t to_latest, copying, last_merged;
+ svn_revnum_t start_revision, end_revision;
+ replay_baton_t *rb;
 
- SVN_ERR(open_source_session(&from_session, &last_merged_rev, to_session,
+ SVN_ERR(open_source_session(&from_session, &from_rp_session,
+ &last_merged_rev, to_session,
                               &(baton->source_callbacks), baton->config,
                               baton, pool));
 
@@ -1084,7 +1227,7 @@
         {
           if (copying > last_merged)
             {
- SVN_ERR(copy_revprops(from_session, to_session,
+ SVN_ERR(copy_revprops(from_rp_session, to_session,
                                     to_latest, TRUE, baton->quiet, pool));
               last_merged = copying;
               last_merged_rev = svn_string_create
@@ -1121,104 +1264,29 @@
 
   /* Now check to see if there are any revisions to copy. */
 
- SVN_ERR(svn_ra_get_latest_revnum(from_session, &from_latest, pool));
+ SVN_ERR(svn_ra_get_latest_revnum(from_rp_session, &from_latest, pool));
 
   if (from_latest < atol(last_merged_rev->data))
     return SVN_NO_ERROR;
 
- subpool = svn_pool_create(pool);
-
   /* Ok, so there are new revisions, iterate over them copying them
      into the destination repository. */
 
- for (current = atol(last_merged_rev->data) + 1;
- current <= from_latest;
- ++current)
- {
- const svn_delta_editor_t *commit_editor;
- const svn_delta_editor_t *cancel_editor;
- const svn_delta_editor_t *sync_editor;
- void *commit_baton;
- void *cancel_baton;
- void *sync_baton;
+ rb = make_replay_baton(from_session, to_session,
+ from_rp_session,
+ baton, pool);
+
+ start_revision = atol(last_merged_rev->data) + 1;
+ end_revision = from_latest;
+
+ SVN_ERR(check_cancel(NULL));
 
- svn_pool_clear(subpool);
- SVN_ERR(check_cancel(NULL));
+ SVN_ERR(svn_ra_replay_range(from_session, start_revision, end_revision,
+ 0, TRUE,
+ replay_rev_started, replay_rev_finished,
+ rb,
+ pool));
 
- /* We set this property so that if we error out for some reason
- we can later determine where we were in the process of
- merging a revision. If we had committed the change, but we
- hadn't finished copying the revprops we need to know that, so
- we can go back and finish the job before we move on.
-
- NOTE: We have to set this before we start the commit editor,
- because ra_svn doesn't let you change rev props during a
- commit. */
- SVN_ERR(svn_ra_change_rev_prop(to_session, 0,
- SVNSYNC_PROP_CURRENTLY_COPYING,
- svn_string_createf(subpool, "%ld",
- current),
- subpool));
-
- /* The actual copy is just a replay hooked up to a commit. */
-
- SVN_ERR(svn_ra_get_commit_editor2(to_session, &commit_editor,
- &commit_baton,
- "", /* empty log */
- commit_callback, baton,
- NULL, FALSE, subpool));
-
- /* There's one catch though, the diff shows us props we can't
- send over the RA interface, so we need an editor that's smart
- enough to filter those out for us. */
-
- SVN_ERR(get_sync_editor(commit_editor, commit_baton, current - 1,
- baton->to_url, baton->quiet,
- &sync_editor, &sync_baton, subpool));
-
- SVN_ERR(svn_delta_get_cancellation_editor(check_cancel, NULL,
- sync_editor, sync_baton,
- &cancel_editor,
- &cancel_baton,
- subpool));
-
- SVN_ERR(svn_ra_replay(from_session, current, 0, TRUE,
- cancel_editor, cancel_baton, subpool));
-
- SVN_ERR(cancel_editor->close_edit(cancel_baton, subpool));
-
- /* Sanity check that we actually committed the revision we meant to. */
- if (baton->committed_rev != current)
- return svn_error_createf
- (APR_EINVAL, NULL,
- _("Commit created rev %ld but should have created %ld"),
- baton->committed_rev, current);
-
- /* Ok, we're done with the data, now we just need to do the
- revprops and we're all set. */
-
- SVN_ERR(copy_revprops(from_session, to_session, current, TRUE,
- baton->quiet, subpool));
-
- /* Ok, we're done, bring the last-merged-rev property up to date. */
-
- SVN_ERR(svn_ra_change_rev_prop
- (to_session,
- 0,
- SVNSYNC_PROP_LAST_MERGED_REV,
- svn_string_create(apr_psprintf(subpool, "%ld", current),
- subpool),
- subpool));
-
- /* And finally drop the currently copying prop, since we're done
- with this revision. */
-
- SVN_ERR(svn_ra_change_rev_prop(to_session, 0,
- SVNSYNC_PROP_CURRENTLY_COPYING,
- NULL, subpool));
- }
-
- svn_pool_destroy(subpool);
   return SVN_NO_ERROR;
 }
 
@@ -1274,7 +1342,8 @@
   svn_revnum_t i;
   svn_revnum_t step = 1;
 
- SVN_ERR(open_source_session(&from_session, &last_merged_rev, to_session,
+ SVN_ERR(open_source_session(&from_session, NULL, &last_merged_rev,
+ to_session,
                               &(baton->source_callbacks), baton->config,
                               baton, pool));
 

Received on Wed Oct 31 00:04:21 2007

This is an archived mail posted to the Subversion Dev mailing list.