#include "pstream.h"
#include <iostream>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <streambuf>
#include <cstring>
#include <stdlib.h>
#include "warning.h"
#include "blop_bookkeeper.h"
namespace blop
{
using namespace std;
void pipehandler(int)
{
cerr<<"Broken pipe (child caught signal)"<<endl;
exit(0);
}
class pipebuf: public std::streambuf
{
private:
int status_;
int child_pid_;
std::string cmd_;
int fd_in_,fd_out_;
int put_buffer();
void fetch_status(int);
public:
bool is_open();
pipebuf *open(const var &name, int mode);
int close();
void setinputbuffer(int size);
void setoutputbuffer(int size);
pipebuf();
~pipebuf();
int sync();
int overflow(int c);
int underflow();
int uflow();
const std::string &cmd() const { return cmd_; }
void kill(int sig=SIGTERM);
};
void pipebuf::kill(int sig)
{
if(child_pid_ <= 0) return;
sync();
::kill(child_pid_,sig);
}
pipebuf::pipebuf()
{
setg(0,0,0);
setp(0,0);
fd_in_ = fd_out_ = child_pid_ = 0;
status_ = 0;
}
pipebuf::~pipebuf()
{
this->close();
delete [] eback();
delete [] pbase();
}
pipebuf *pipebuf::open(const var &cmd, int mode)
{
status_ = 0;
cmd_ = cmd.str();
fd_in_ = fd_out_ = 0;
int fd_read[2] = {0,0};
int fd_write[2] = {0,0};
if(mode & ios::in)
{
if(pipe(fd_read) < 0)
{
std::cerr<<"Error opening a pipe in pipebuf::open"<<endl;
return 0;
}
}
if(mode & ios::out)
{
if(pipe(fd_write) < 0)
{
std::cerr<<"Error opening a pipe in pipebuf::open"<<endl;
return 0;
}
}
if((child_pid_=fork()) < 0)
{
warning::print("Fork error","pipebuf::open");
return 0;
}
if(child_pid_ > 0)
{
if( mode & ios::out )
{
::close(fd_write[0]);
fd_out_ = fd_write[1];
}
if( mode & ios::in )
{
::close(fd_read[1]);
fd_in_ = fd_read[0];
}
return this;
}
else
{
signal(SIGPIPE, pipehandler);
bool ok = true;
if( mode & ios::out )
{
::close(fd_write[1]);
if(fd_write[0] != STDIN_FILENO)
{
if(dup2(fd_write[0],STDIN_FILENO) != STDIN_FILENO)
{
std::cerr<<"dup2 failed in pipebuf::open"<<endl;
ok = false;
}
::close(fd_write[0]);
}
}
if( mode & ios::in )
{
::close(fd_read[0]);
if(fd_read[1] != STDOUT_FILENO)
{
if(dup2(fd_read[1],STDOUT_FILENO) != STDOUT_FILENO)
{
std::cerr<<"dup2 failed in pipebuf::open"<<endl;
ok = false;
}
::close(fd_read[1]);
}
}
if(ok)
{
const char *shell = "/bin/bash";
struct stat buf;
if(stat(shell,&buf) != 0) shell = "/bin/sh";
execl(shell, "sh", "-c", cmd.c_str(), NULL);
cerr<<"execl failed ...."<<endl;
_exit(127);
}
}
return 0;
}
int pipebuf::close()
{
if(fd_in_ > 0 || fd_out_ > 0) sync();
if(fd_out_ > 0)
{
::close(fd_out_);
fd_out_ = 0;
}
if(fd_in_ > 0)
{
::close(fd_in_);
fd_in_ = 0;
}
if(child_pid_ > 0)
{
int wret,status;
while((wret=waitpid(child_pid_, &status, 0)) < 0)
{
if(errno != EINTR)
{
cerr<<"waitpid returned "<<wret<<endl;
return -1;
}
}
fetch_status(status);
child_pid_ = 0;
}
return status_;
}
void pipebuf::fetch_status(int status)
{
if(WIFEXITED(status))
{
status_ = WEXITSTATUS(status);
}
else if(WIFSIGNALED(status))
{
status_ = WTERMSIG(status);
}
else if(WIFSTOPPED(status))
{
status_ = WSTOPSIG(status);
}
else
{
cerr<<"This should not happend"<<endl;
status_ = status;
}
}
bool pipebuf::is_open()
{
if(child_pid_ <= 0) return false;
if(fd_in_<=0 && fd_out_<=0) return false;
int status;
int ret = waitpid(child_pid_,&status,WNOHANG);
if(ret != 0)
{
child_pid_ = 0;
if(fd_in_ > 0) { ::close(fd_in_); fd_in_ = 0; }
if(fd_out_ > 0) { ::close(fd_out_); fd_out_ = 0; }
if(ret > 0) fetch_status(status);
else status_ = -1;
return false;
}
return true;
}
void pipebuf::setinputbuffer(int size)
{
delete [] eback();
if(size > 0)
{
char *ptr = new char[size];
setg(ptr,ptr+size,ptr+size);
}
else setg(0,0,0);
}
void pipebuf::setoutputbuffer(int size)
{
delete [] pbase();
if(size > 0)
{
char *ptr = new char[size];
setp(ptr,ptr+size);
}
else setp(0,0);
}
int pipebuf::sync()
{
if(pbase() != pptr()) put_buffer();
return 0;
}
int pipebuf::overflow(int c)
{
if(pbase() != pptr())
{
if(put_buffer() == EOF) return EOF;
}
if(c != EOF)
{
if(pbase() == epptr())
{
char cc = (char)c;
if(write(fd_out_,&cc,1) != 1)
{
std::cerr<<"Write error in pipebuf::overflow"<<endl;
return EOF;
}
}
else sputc(c);
}
return 0;
}
int pipebuf::put_buffer()
{
if(write(fd_out_, pbase(), pptr() - pbase()) < 0)
{
std::cerr<<"Write error in pipebuf::put_buffer"<<endl;
return EOF;
}
setp(pbase(), epptr());
return 0;
}
int pipebuf::uflow()
{
char c;
if(read(fd_in_,&c,1) <= 0) return EOF;
return c;
}
int pipebuf::underflow()
{
int size = egptr() - eback();
if(size <= 0)
{
cerr<<"pipebuf::underflow called for unbuffered input !!!"<<endl;
return EOF;
}
int new_size = read(fd_in_, eback(), size);
if(new_size == 0)
{
setg(eback(),eback(),eback());
return EOF;
}
if(new_size < 0)
{
setg(0,0,0);
return EOF;
}
setg ( eback(), eback(), eback() + new_size );
return *eback();
}
std::string pstream_base::cmd() const
{
return (buffer_?buffer_->cmd():"");
}
bool pstream_base::is_open() const
{
if(buffer_ == 0) return false;
return buffer_->is_open();
}
void pstream_base::kill(int sig)
{
if(buffer_ == 0) return;
buffer_->kill(sig);
}
ipstream::ipstream(const ipstream &) : istream(0)
{
buffer_ = 0;
}
ipstream::ipstream() : istream(0)
{
buffer_ = new pipebuf;
istream::init(buffer_);
}
ipstream::ipstream(const var &name) : istream(0)
{
buffer_ = new pipebuf;
istream::init(buffer_);
open(name);
}
ipstream &ipstream::open(const var &name)
{
clear();
buffer_->setinputbuffer(1000);
if(buffer_->open(name,ios::in) == 0)
{
setstate(ios::failbit | ios::badbit);
}
return *this;
}
int ipstream::close()
{
return buffer_->close();
}
ipstream::~ipstream()
{
close();
delete buffer_;
}
void ipstream::ibufsize(int size)
{
buffer_->setinputbuffer(size);
}
opstream::opstream(const opstream &) : ostream(0)
{
buffer_ = 0;
}
opstream::opstream() : ostream(0)
{
buffer_ = new pipebuf;
ostream::init(buffer_);
}
opstream::opstream(const var &name) : ostream(0)
{
buffer_ = new pipebuf;
ostream::init(buffer_);
open(name);
}
opstream &opstream::open(const var &name)
{
clear();
buffer_->setoutputbuffer(1000);
if(buffer_->open(name,ios::out) == 0)
{
setstate(ios::failbit | ios::badbit);
}
return *this;
}
int opstream::close()
{
return buffer_->close();
}
opstream::~opstream()
{
close();
delete buffer_;
}
void opstream::obufsize(int size)
{
buffer_->setoutputbuffer(size);
}
iremotestream::iremotestream(const string &cmd, const char *name, ios_base::openmode mode)
: cmd_template_(cmd)
{
iremotestream::open(name,mode);
}
void iremotestream::open(const char *name, ios_base::openmode mode)
{
remote_filename_ = name;
local_filename_ = blop_bookkeeper::tmpfile("IREMOTE-XXXXXX");
char cmd[1000];
sprintf(cmd,cmd_template_.c_str(),remote_filename_.c_str(),local_filename_.c_str());
if(system(cmd) != 0)
{
warning::print(var("Failed to execute ") & cmd);
}
ifstream::open(local_filename_.c_str(),mode);
}
iscpstream::iscpstream(const char *name, ios_base::openmode mode)
: iremotestream("scp %s %s",strstr(name,"scp:
{}
ihttpstream::ihttpstream(const char *name, ios_base::openmode mode)
: iremotestream("wget %s -O %s",strstr(name,"scp:
{}
oremotestream::oremotestream(const string &cmd, const char *name, ios_base::openmode mode)
: cmd_template_(cmd)
{
oremotestream::open(name,mode);
}
void oremotestream::open(const char *name, ios_base::openmode mode)
{
if(mode != ios_base::out)
{
warning::print("At the moment only ios_base::out is supported as the openmode",
"oremotestream::open(const char *,ios_base::openmode)");
}
remote_filename_ = name;
local_filename_ = blop_bookkeeper::tmpfile("OREMOTE-XXXXXX");
ofstream::open(local_filename_.c_str(),mode);
}
void oremotestream::close()
{
ofstream::close();
char cmd[1000];
sprintf(cmd,cmd_template_.c_str(),local_filename_.c_str(),remote_filename_.c_str());
if(system(cmd) != 0)
{
warning::print(var("Failed to execute ") & cmd);
}
}
oremotestream::~oremotestream()
{
oremotestream::close();
}
oscpstream::oscpstream(const char *name, ios_base::openmode mode)
: oremotestream("scp %s %s",strstr(name,"scp:
{}
}