From: Suparna Bhattacharya <suparna@in.ibm.com>

Async wait on page support. Implements async versions of lock_page,
wait_on_page_locked, and wait_on_page_writeback which accept a
wait queue entry as a parameter; blocking waits are converted into
retry exits (-EIOCBRETRY) when the wait queue entry specifies an
async callback for AIO.
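
As an illustration of the intended usage (not part of this patch), an
AIO read path could pass the iocb's wait queue entry and propagate the
retry exit. The ki_wait field and is_sync_kiocb() test follow the AIO
retry infrastructure the rest of the patchset provides; aio_read_page
itself is a made-up example:

#include <linux/pagemap.h>
#include <linux/aio.h>

static int aio_read_page(struct kiocb *iocb, struct page *page)
{
	/* ki_wait is the per-iocb async entry assumed from the patchset;
	 * sync callers pass NULL and simply block, as before */
	wait_queue_t *wait = is_sync_kiocb(iocb) ? NULL : &iocb->ki_wait;
	int ret;

	ret = lock_page_wq(page, wait);
	if (ret)
		/* -EIOCBRETRY: the queued callback will kick the iocb */
		return ret;

	/* ... page is locked; do the actual work here ... */

	unlock_page(page);
	return 0;
}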


diff -urp base/include/linux/pagemap.h linux-2.6.2/include/linux/pagemap.h
--- base/include/linux/pagemap.h	2004-02-04 13:50:38.888990288 +0530
+++ linux-2.6.2/include/linux/pagemap.h	2004-02-04 13:51:22.215403672 +0530
@@ -152,17 +152,27 @@ static inline void ___add_to_page_cache(
 extern void FASTCALL(__lock_page(struct page *page));
 extern void FASTCALL(unlock_page(struct page *page));
 
-static inline void lock_page(struct page *page)
+extern int FASTCALL(__lock_page_wq(struct page *page, wait_queue_t *wait));
+static inline int lock_page_wq(struct page *page, wait_queue_t *wait)
 {
 	if (TestSetPageLocked(page))
-		__lock_page(page);
+		return __lock_page_wq(page, wait);
+	else
+		return 0;
+}
+
+static inline void lock_page(struct page *page)
+{
+	lock_page_wq(page, NULL);
 }
 	
 /*
  * This is exported only for wait_on_page_locked/wait_on_page_writeback.
  * Never use this directly!
  */
-extern void FASTCALL(wait_on_page_bit(struct page *page, int bit_nr));
+extern int FASTCALL(wait_on_page_bit_wq(struct page *page, int bit_nr,
+	wait_queue_t *wait));
 
 /* 
  * Wait for a page to be unlocked.
@@ -171,19 +181,33 @@ extern void FASTCALL(wait_on_page_bit(st
  * ie with increased "page->count" so that the page won't
  * go away during the wait..
  */
-static inline void wait_on_page_locked(struct page *page)
+static inline int wait_on_page_locked_wq(struct page *page, wait_queue_t *wait)
 {
 	if (PageLocked(page))
-		wait_on_page_bit(page, PG_locked);
+		return wait_on_page_bit_wq(page, PG_locked, wait);
+	return 0;
+}
+
+static inline int wait_on_page_writeback_wq(struct page *page,
+						wait_queue_t *wait)
+{
+	if (PageWriteback(page))
+		return wait_on_page_bit_wq(page, PG_writeback, wait);
+	return 0;
+}
+
+static inline void wait_on_page_locked(struct page *page)
+{
+	wait_on_page_locked_wq(page, NULL);
 }
 
 /* 
  * Wait for a page to complete writeback
  */
 static inline void wait_on_page_writeback(struct page *page)
 {
-	if (PageWriteback(page))
-		wait_on_page_bit(page, PG_writeback);
+	wait_on_page_writeback_wq(page, NULL);
 }
 
 extern void end_page_writeback(struct page *page);
diff -urp base/mm/filemap.c linux-2.6.2/mm/filemap.c
--- base/mm/filemap.c	2004-02-04 13:50:18.187137448 +0530
+++ linux-2.6.2/mm/filemap.c	2004-02-04 13:51:22.218403216 +0530
@@ -292,22 +292,42 @@ static wait_queue_head_t *page_waitqueue
 	return &zone->wait_table[hash_ptr(page, zone->wait_table_bits)];
 }
 
-void fastcall wait_on_page_bit(struct page *page, int bit_nr)
+/*
+ * Wait for the specified page bit to be cleared. Depending on the
+ * wait queue entry parameter, this either blocks synchronously or
+ * just queues an async notification callback.
+ *
+ * A NULL wait queue parameter defaults to sync behaviour.
+ */
+int fastcall wait_on_page_bit_wq(struct page *page, int bit_nr, wait_queue_t *wait)
 {
 	wait_queue_head_t *waitqueue = page_waitqueue(page);
-	DEFINE_WAIT(wait);
+	DEFINE_WAIT(local_wait);
+
+	if (!wait)
+		wait = &local_wait; /* default to a sync wait entry */
 
 	do {
-		prepare_to_wait(waitqueue, &wait, TASK_UNINTERRUPTIBLE);
+		prepare_to_wait(waitqueue, wait, TASK_UNINTERRUPTIBLE);
 		if (test_bit(bit_nr, &page->flags)) {
 			sync_page(page);
+			if (!is_sync_wait(wait)) {
+				/*
+				 * if we've queued an async wait queue
+				 * callback do not block; just tell the
+				 * callback, do not block; just tell the
+				 * the callback is notified
+				 */
+				return -EIOCBRETRY;
+			}
 			io_schedule();
 		}
 	} while (test_bit(bit_nr, &page->flags));
-	finish_wait(waitqueue, &wait);
-}
+	finish_wait(waitqueue, wait);
 
-EXPORT_SYMBOL(wait_on_page_bit);
+	return 0;
+}
+EXPORT_SYMBOL(wait_on_page_bit_wq);
 
 /**
  * unlock_page() - unlock a locked page
@@ -317,7 +337,9 @@ EXPORT_SYMBOL(wait_on_page_bit);
  * Unlocks the page and wakes up sleepers in ___wait_on_page_locked().
  * Also wakes sleepers in wait_on_page_writeback() because the wakeup
 * mechanism between PageLocked pages and PageWriteback pages is shared.
- * But that's OK - sleepers in wait_on_page_writeback() just go back to sleep.
+ * But that's OK - sleepers in wait_on_page_writeback() just go back to
+ * sleep; and when the wakeup notifies async wait queue entries, as in
+ * the AIO case, retries are triggered which may re-queue their callbacks.
  *
  * The first mb is necessary to safely close the critical section opened by the
  * TestSetPageLocked(), the second mb is necessary to enforce ordering between
@@ -358,26 +380,51 @@ void end_page_writeback(struct page *pag
 EXPORT_SYMBOL(end_page_writeback);
 
 /*
- * Get a lock on the page, assuming we need to sleep to get it.
+ * Get a lock on the page, assuming we need to either sleep to get it
+ * or queue an async notification callback to try again when it's
+ * available.
+ *
+ * A NULL wait queue parameter defaults to sync behaviour. Otherwise
+ * it specifies the wait queue entry to be used for async notification
+ * or waiting.
  *
  * Ugly: running sync_page() in state TASK_UNINTERRUPTIBLE is scary.  If some
  * random driver's requestfn sets TASK_RUNNING, we could busywait.  However
  * chances are that on the second loop, the block layer's plug list is empty,
  * so sync_page() will then return in state TASK_UNINTERRUPTIBLE.
  */
-void fastcall __lock_page(struct page *page)
+int fastcall __lock_page_wq(struct page *page, wait_queue_t *wait)
 {
 	wait_queue_head_t *wqh = page_waitqueue(page);
-	DEFINE_WAIT(wait);
+	DEFINE_WAIT(local_wait);
+
+	if (!wait)
+		wait = &local_wait;
 
 	while (TestSetPageLocked(page)) {
-		prepare_to_wait(wqh, &wait, TASK_UNINTERRUPTIBLE);
+		prepare_to_wait(wqh, wait, TASK_UNINTERRUPTIBLE);
 		if (PageLocked(page)) {
 			sync_page(page);
+			if (!is_sync_wait(wait)) {
+				/*
+				 * if we've queued an async wait queue
+				 * callback, do not block; just tell the
+				 * caller to return and retry later when
+				 * the callback is notified
+				 */
+				return -EIOCBRETRY;
+			}
 			io_schedule();
 		}
 	}
-	finish_wait(wqh, &wait);
+	finish_wait(wqh, wait);
+	return 0;
+}
+EXPORT_SYMBOL(__lock_page_wq);
+
+void fastcall __lock_page(struct page *page)
+{
+	__lock_page_wq(page, NULL);
 }
 
 EXPORT_SYMBOL(__lock_page);
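
For completeness, here is a sketch of how an async wait queue entry
consumed by the above helpers might be set up. It assumes the AIO
retry pieces from the rest of the patchset: a per-iocb ki_wait entry,
kick_iocb() to restart the iocb, and is_sync_wait() treating an entry
with a NULL task as async. aio_wake_function and init_async_wait are
illustrative names, not the patchset's actual code:

#include <linux/kernel.h>
#include <linux/wait.h>
#include <linux/aio.h>

/* wakeup callback: dequeue the entry and kick the iocb for a retry */
static int aio_wake_function(wait_queue_t *wait, unsigned mode,
			     int sync, void *key)
{
	/* assumes ki_wait is embedded in the kiocb, per the patchset */
	struct kiocb *iocb = container_of(wait, struct kiocb, ki_wait);

	list_del_init(&wait->task_list);
	kick_iocb(iocb);	/* assumed AIO retry hook */
	return 1;
}

static void init_async_wait(struct kiocb *iocb)
{
	wait_queue_t *wait = &iocb->ki_wait;

	/* a func entry has wait->task == NULL, so is_sync_wait() is false */
	init_waitqueue_func_entry(wait, aio_wake_function);
	INIT_LIST_HEAD(&wait->task_list);
}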