*** xlog.org.c	Sat Jan  1 06:59:30 2005
--- xlog.c	Tue Jan 25 10:05:09 2005
***************
*** 43,48 ****
--- 43,72 ----
  #include "utils/guc.h"
  #include "utils/relcache.h"
  
+ /*-------------------------------------------------------------------------*/
+ /* Offer O_DIRECT as OPEN_DIRECT_FLAG only where the platform defines it. */
+ #ifdef O_DIRECT
+ #	define OPEN_DIRECT_FLAG		O_DIRECT
+ #endif
+ /* Enables the multipage-writer sanity checks and the per-flush LOG message. */
+ #define XLOG_MULTIPAGE_WRITER_DEBUG
+ 
+ /*-------------------------------------------------------------------------*/
+ 
+ /* O_DIRECT : BEGIN */
+ 
+ /* TODO: Alignment depends on OS and filesystem; 4096 is hard-coded for now. */
+ #define O_DIRECT_BUFFER_ALIGN	4096
+ /* Slack bytes reserved for aligning the XLOG buffers, and the rounding macro. */
+ #ifdef OPEN_DIRECT_FLAG
+ #	define XLOG_EXTRA_BUFFERS		O_DIRECT_BUFFER_ALIGN
+ #	define XLOG_BUFFERS_ALIGN(LEN)	TYPEALIGN(XLOG_EXTRA_BUFFERS, (LEN))
+ #else
+ #	define XLOG_EXTRA_BUFFERS		0
+ #	define XLOG_BUFFERS_ALIGN(LEN)	MAXALIGN(LEN)
+ #endif
+ 
+ /* O_DIRECT : END */
  
  /*
   * This chunk of hackery attempts to determine which file sync methods
***************
*** 465,470 ****
--- 489,559 ----
  static bool read_backup_label(XLogRecPtr *checkPointLoc);
  static void remove_backup_label(void);
  
+ /* BEGIN : XLOG_MULTIPAGE_WRITER */
+ /* Pending run of adjacent pages that XLogMultipageFlush() writes in one call. */
+ static struct XLogMultipageData
+ {
+ 	char	*pages;		/* Start of the first pending page; NULL when the run is empty */
+ 	int		 size;		/* Total bytes pending (grows by the size passed per page) */
+ 	int		 offset;	/* Offset in the xlog segment file where the run starts */
+ } XLogMultipage;
+ 
+ static void XLogMultipageFlush(void)
+ {	/* Write the pending run of pages with one write(), then reset the run. */
+ 	if (!XLogMultipage.pages)
+ 	{	/* Nothing is pending, so there is nothing to write. */
+ 		return;
+ 	}
+ 	
+ 	/* Seek only when the tracked file offset differs from the run's start. */
+ 	if (openLogOff != XLogMultipage.offset)
+ 	{
+ 		openLogOff = XLogMultipage.offset;
+ 		if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0)
+ 			ereport(PANIC,
+ 					(errcode_for_file_access(),
+ 					 errmsg("could not seek in log file %u, segment %u to offset %u: %m",
+ 							openLogId, openLogSeg, openLogOff)));
+ 	}
+ 
+ 	/* Write every pending page in a single call; a short write is a PANIC. */
+ 	errno = 0;
+ 	if (write(openLogFile, XLogMultipage.pages, XLogMultipage.size) != XLogMultipage.size)
+ 	{
+ 		/* if write didn't set errno, assume problem is no disk space */
+ 		if (errno == 0)
+ 			errno = ENOSPC;
+ 		ereport(PANIC,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not write to log file %u, segment %u at offset %u: %m",
+ 						openLogId, openLogSeg, openLogOff)));
+ 	}
+ 
+ #ifdef XLOG_MULTIPAGE_WRITER_DEBUG
+ 	elog(LOG, "XLogMultipageFlush writes %d pages.", XLogMultipage.size / BLCKSZ);
+ #endif
+ 	/* Advance the tracked offset past the written bytes and mark the run empty. */
+ 	openLogOff += XLogMultipage.size;
+ 	memset(&XLogMultipage, 0, sizeof(XLogMultipage));
+ }
+ 
+ static void XLogMultipageWrite(char *page, int size, int offset)
+ {	/* Queue one page: extend the pending run if adjacent, else start a new run. */
+ 	if (XLogMultipage.pages + XLogMultipage.size == page
+ 		&& XLogMultipage.offset + XLogMultipage.size == offset)
+ 	{	/* Contiguous in both memory and file offset: extend the pending run. */
+ 		XLogMultipage.size += size;
+ 	}
+ 	else
+ 	{	/* Not contiguous: write out any pending run, then start a new one here. */
+ 		XLogMultipageFlush();
+ 		XLogMultipage.pages = page;
+ 		XLogMultipage.size = size;
+ 		XLogMultipage.offset = offset;
+ 	}
+ }
+ 
+ /* END : XLOG_MULTIPAGE_WRITER */
  
  /*
   * Insert an XLOG record having the specified RMID and info bytes,
***************
*** 1139,1147 ****
  XLogWrite(XLogwrtRqst WriteRqst)
  {
  	XLogCtlWrite *Write = &XLogCtl->Write;
- 	char	   *from;
  	bool		ispartialpage;
  	bool		use_existent;
  
  	/*
  	 * Update local LogwrtResult (caller probably did this already,
--- 1228,1241 ----
  XLogWrite(XLogwrtRqst WriteRqst)
  {
  	XLogCtlWrite *Write = &XLogCtl->Write;
  	bool		ispartialpage;
  	bool		use_existent;
+ 	int			currentIndex = Write->curridx;
+ 
+ #ifdef XLOG_MULTIPAGE_WRITER_DEBUG
+ 	if (XLogMultipage.pages)
+ 		elog(PANIC, "XLogMultipage.pages not null (%d) : size=%d", __LINE__, XLogMultipage.size);
+ #endif
  
  	/*
  	 * Update local LogwrtResult (caller probably did this already,
***************
*** 1157,1170 ****
  		 * end of the last page that's been initialized by
  		 * AdvanceXLInsertBuffer.
  		 */
! 		if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[Write->curridx]))
  			elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
  				 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
! 				 XLogCtl->xlblocks[Write->curridx].xlogid,
! 				 XLogCtl->xlblocks[Write->curridx].xrecoff);
  
  		/* Advance LogwrtResult.Write to end of current buffer page */
! 		LogwrtResult.Write = XLogCtl->xlblocks[Write->curridx];
  		ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
  
  		if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
--- 1251,1264 ----
  		 * end of the last page that's been initialized by
  		 * AdvanceXLInsertBuffer.
  		 */
! 		if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[currentIndex]))
  			elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
  				 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
! 				 XLogCtl->xlblocks[currentIndex].xlogid,
! 				 XLogCtl->xlblocks[currentIndex].xrecoff);
  
  		/* Advance LogwrtResult.Write to end of current buffer page */
! 		LogwrtResult.Write = XLogCtl->xlblocks[currentIndex];
  		ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
  
  		if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
***************
*** 1172,1177 ****
--- 1266,1272 ----
  			/*
  			 * Switch to new logfile segment.
  			 */
+ 			XLogMultipageFlush();
  			if (openLogFile >= 0)
  			{
  				if (close(openLogFile))
***************
*** 1242,1275 ****
  		{
  			XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
  			openLogFile = XLogFileOpen(openLogId, openLogSeg);
  			openLogOff = 0;
- 		}
- 
- 		/* Need to seek in the file? */
- 		if (openLogOff != (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize)
- 		{
- 			openLogOff = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
- 			if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0)
- 				ereport(PANIC,
- 						(errcode_for_file_access(),
- 						 errmsg("could not seek in log file %u, segment %u to offset %u: %m",
- 								openLogId, openLogSeg, openLogOff)));
  		}
  
! 		/* OK to write the page */
! 		from = XLogCtl->pages + Write->curridx * BLCKSZ;
! 		errno = 0;
! 		if (write(openLogFile, from, BLCKSZ) != BLCKSZ)
! 		{
! 			/* if write didn't set errno, assume problem is no disk space */
! 			if (errno == 0)
! 				errno = ENOSPC;
! 			ereport(PANIC,
! 					(errcode_for_file_access(),
! 					 errmsg("could not write to log file %u, segment %u at offset %u: %m",
! 							openLogId, openLogSeg, openLogOff)));
! 		}
! 		openLogOff += BLCKSZ;
  
  		/*
  		 * If we just wrote the whole last page of a logfile segment,
--- 1337,1350 ----
  		{
  			XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
  			openLogFile = XLogFileOpen(openLogId, openLogSeg);
  			openLogOff = 0;
  		}
  
! 		/* Queue the page; XLogMultipageFlush() seeks if needed and writes it out */
! 		XLogMultipageWrite(
! 			XLogCtl->pages + currentIndex * BLCKSZ,
! 			BLCKSZ,
! 			(LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize);
  
  		/*
  		 * If we just wrote the whole last page of a logfile segment,
***************
*** 1281,1288 ****
  		 * This is also the right place to notify the Archiver that the
  		 * segment is ready to copy to archival storage.
  		 */
! 		if (openLogOff >= XLogSegSize && !ispartialpage)
  		{
  			issue_xlog_fsync();
  			LogwrtResult.Flush = LogwrtResult.Write;	/* end of current page */
  
--- 1355,1363 ----
  		 * This is also the right place to notify the Archiver that the
  		 * segment is ready to copy to archival storage.
  		 */
! 		if (XLogMultipage.offset + XLogMultipage.size >= XLogSegSize && !ispartialpage)
  		{
+ 			XLogMultipageFlush();	/* write the pending run before the fsync below */
  			issue_xlog_fsync();
  			LogwrtResult.Flush = LogwrtResult.Write;	/* end of current page */
  
***************
*** 1296,1303 ****
  			LogwrtResult.Write = WriteRqst.Write;
  			break;
  		}
! 		Write->curridx = NextBufIdx(Write->curridx);
  	}
  
  	/*
  	 * If asked to flush, do so
--- 1371,1379 ----
  			LogwrtResult.Write = WriteRqst.Write;
  			break;
  		}
! 		currentIndex = NextBufIdx(currentIndex);
  	}
+ 	XLogMultipageFlush();
  
  	/*
  	 * If asked to flush, do so
***************
*** 1333,1338 ****
--- 1409,1416 ----
  		LogwrtResult.Flush = LogwrtResult.Write;
  	}
  
+ 	Write->curridx = currentIndex;
+ 
  	/*
  	 * Update shared-memory status
  	 *
***************
*** 1354,1359 ****
--- 1432,1442 ----
  	}
  
  	Write->LogwrtResult = LogwrtResult;
+ 
+ #ifdef XLOG_MULTIPAGE_WRITER_DEBUG
+ 	if (XLogMultipage.pages)
+ 		elog(PANIC, "XLogMultipage.pages not null (%d) : size=%d", __LINE__, XLogMultipage.size);
+ #endif
  }
  
  /*
***************
*** 1476,1481 ****
--- 1559,1567 ----
  			 "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
  			 record.xlogid, record.xrecoff,
  			 LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
+ 
+ 	if (XLogMultipage.pages)
+ 		elog(PANIC, "xlog multipage-write usage error at XLogFlush");
  }
  
  /*
***************
*** 3380,3386 ****
  		XLOGbuffers = MinXLOGbuffers;
  
  	return MAXALIGN(sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers)
! 		+ BLCKSZ * XLOGbuffers +
  		MAXALIGN(sizeof(ControlFileData));
  }
  
--- 3466,3472 ----
  		XLOGbuffers = MinXLOGbuffers;
  
  	return MAXALIGN(sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers)
! 		+ XLOG_EXTRA_BUFFERS + BLCKSZ * XLOGbuffers +
  		MAXALIGN(sizeof(ControlFileData));
  }
  
***************
*** 3398,3404 ****
  		ShmemInitStruct("XLOG Ctl",
  						MAXALIGN(sizeof(XLogCtlData) +
  								 sizeof(XLogRecPtr) * XLOGbuffers)
! 						+ BLCKSZ * XLOGbuffers,
  						&foundXLog);
  	ControlFile = (ControlFileData *)
  		ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
--- 3484,3490 ----
  		ShmemInitStruct("XLOG Ctl",
  						MAXALIGN(sizeof(XLogCtlData) +
  								 sizeof(XLogRecPtr) * XLOGbuffers)
! 						+ XLOG_EXTRA_BUFFERS + BLCKSZ * XLOGbuffers,
  						&foundXLog);
  	ControlFile = (ControlFileData *)
  		ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
***************
*** 3426,3433 ****
  	 * buffers have worst-case alignment.
  	 */
  	XLogCtl->pages =
! 		((char *) XLogCtl) + MAXALIGN(sizeof(XLogCtlData) +
! 									  sizeof(XLogRecPtr) * XLOGbuffers);
  	memset(XLogCtl->pages, 0, BLCKSZ * XLOGbuffers);
  
  	/*
--- 3512,3519 ----
  	 * buffers have worst-case alignment.
  	 */
  	XLogCtl->pages =
! 		(char*)XLOG_BUFFERS_ALIGN(((char *) XLogCtl)
! 		+ sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers);
  	memset(XLogCtl->pages, 0, BLCKSZ * XLOGbuffers);
  
  	/*
***************
*** 3485,3492 ****
--- 3571,3585 ----
  	/* First timeline ID is always 1 */
  	ThisTimeLineID = 1;
  
+ #if 1
+ 	buffer = (char *) XLOG_BUFFERS_ALIGN(
+ 		malloc(BLCKSZ + XLOG_EXTRA_BUFFERS) );
+ #else
  	/* Use malloc() to ensure buffer is MAXALIGNED */
  	buffer = (char *) malloc(BLCKSZ);
+ #endif
+ 	/* XXX: the raw malloc() pointer is not kept; if alignment moves it, buffer cannot be free()d. */
+ 
  	page = (XLogPageHeader) buffer;
  	memset(buffer, 0, BLCKSZ);
  
***************
*** 5180,5185 ****
--- 5273,5285 ----
  		new_sync_bit = OPEN_DATASYNC_FLAG;
  	}
  #endif
+ #ifdef OPEN_DIRECT_FLAG
+ 	else if (pg_strcasecmp(method, "open_direct") == 0)
+ 	{
+ 		new_sync_method = SYNC_METHOD_OPEN;
+ 		new_sync_bit = OPEN_DIRECT_FLAG;
+ 	}
+ #endif
  	else
  		return NULL;
  
