Merge branch 'file-monitor'
This commit is contained in:
@@ -4,6 +4,7 @@ import java.util.concurrent.CountDownLatch
|
||||
|
||||
import com.muwire.core.Core
|
||||
import com.muwire.core.MuWireSettings
|
||||
import com.muwire.core.UILoadedEvent
|
||||
import com.muwire.core.connection.ConnectionAttemptStatus
|
||||
import com.muwire.core.connection.ConnectionEvent
|
||||
import com.muwire.core.connection.DisconnectionEvent
|
||||
@@ -83,7 +84,8 @@ class Cli {
|
||||
}
|
||||
core.eventBus.register(AllFilesLoadedEvent.class, fileLoader)
|
||||
core.startServices()
|
||||
|
||||
|
||||
core.eventBus.publish(new UILoadedEvent())
|
||||
println "waiting for files to load"
|
||||
latch.await()
|
||||
// now we begin
|
||||
|
@@ -23,6 +23,7 @@ import com.muwire.core.files.FileSharedEvent
|
||||
import com.muwire.core.files.FileUnsharedEvent
|
||||
import com.muwire.core.files.HasherService
|
||||
import com.muwire.core.files.PersisterService
|
||||
import com.muwire.core.files.DirectoryWatcher
|
||||
import com.muwire.core.hostcache.CacheClient
|
||||
import com.muwire.core.hostcache.HostCache
|
||||
import com.muwire.core.hostcache.HostDiscoveredEvent
|
||||
@@ -70,6 +71,7 @@ public class Core {
|
||||
private final ConnectionEstablisher connectionEstablisher
|
||||
private final HasherService hasherService
|
||||
private final DownloadManager downloadManager
|
||||
private final DirectoryWatcher directoryWatcher
|
||||
|
||||
public Core(MuWireSettings props, File home, String myVersion) {
|
||||
this.home = home
|
||||
@@ -162,6 +164,7 @@ public class Core {
|
||||
|
||||
log.info "initializing persistence service"
|
||||
persisterService = new PersisterService(new File(home, "files.json"), eventBus, 15000, fileManager)
|
||||
eventBus.register(UILoadedEvent.class, persisterService)
|
||||
|
||||
log.info("initializing host cache")
|
||||
File hostStorage = new File(home, "hosts.json")
|
||||
@@ -213,6 +216,9 @@ public class Core {
|
||||
connectionAcceptor = new ConnectionAcceptor(eventBus, connectionManager, props,
|
||||
i2pAcceptor, hostCache, trustService, searchManager, uploadManager, connectionEstablisher)
|
||||
|
||||
log.info("initializing directory watcher")
|
||||
directoryWatcher = new DirectoryWatcher(eventBus, fileManager)
|
||||
eventBus.register(FileSharedEvent.class, directoryWatcher)
|
||||
|
||||
log.info("initializing hasher service")
|
||||
hasherService = new HasherService(new FileHasher(), eventBus, fileManager)
|
||||
@@ -221,9 +227,9 @@ public class Core {
|
||||
|
||||
public void startServices() {
|
||||
hasherService.start()
|
||||
directoryWatcher.start()
|
||||
trustService.start()
|
||||
trustService.waitForLoad()
|
||||
persisterService.start()
|
||||
hostCache.start()
|
||||
connectionManager.start()
|
||||
cacheClient.start()
|
||||
@@ -240,6 +246,8 @@ public class Core {
|
||||
connectionAcceptor.stop()
|
||||
log.info("shutting down connection establisher")
|
||||
connectionEstablisher.stop()
|
||||
log.info("shutting down directory watcher")
|
||||
directoryWatcher.stop()
|
||||
log.info("shutting down connection manager")
|
||||
connectionManager.shutdown()
|
||||
}
|
||||
|
@@ -1,6 +1,11 @@
|
||||
package com.muwire.core
|
||||
|
||||
import java.util.stream.Collectors
|
||||
|
||||
import com.muwire.core.hostcache.CrawlerResponse
|
||||
import com.muwire.core.util.DataUtil
|
||||
|
||||
import net.i2p.data.Base64
|
||||
|
||||
class MuWireSettings {
|
||||
|
||||
@@ -10,10 +15,9 @@ class MuWireSettings {
|
||||
int updateCheckInterval
|
||||
String nickname
|
||||
File downloadLocation
|
||||
String sharedFiles
|
||||
CrawlerResponse crawlerResponse
|
||||
boolean shareDownloadedFiles
|
||||
boolean watchSharedDirectories
|
||||
Set<String> watchedDirectories
|
||||
|
||||
MuWireSettings() {
|
||||
this(new Properties())
|
||||
@@ -26,11 +30,16 @@ class MuWireSettings {
|
||||
nickname = props.getProperty("nickname","MuWireUser")
|
||||
downloadLocation = new File((String)props.getProperty("downloadLocation",
|
||||
System.getProperty("user.home")))
|
||||
sharedFiles = props.getProperty("sharedFiles")
|
||||
downloadRetryInterval = Integer.parseInt(props.getProperty("downloadRetryInterval","15"))
|
||||
updateCheckInterval = Integer.parseInt(props.getProperty("updateCheckInterval","36"))
|
||||
shareDownloadedFiles = Boolean.parseBoolean(props.getProperty("shareDownloadedFiles","true"))
|
||||
watchSharedDirectories = Boolean.parseBoolean(props.getProperty("watchSharedDirectories","true"))
|
||||
|
||||
watchedDirectories = new HashSet<>()
|
||||
if (props.containsKey("watchedDirectories")) {
|
||||
String[] encoded = props.getProperty("watchedDirectories").split(",")
|
||||
encoded.each { watchedDirectories << DataUtil.readi18nString(Base64.decode(it)) }
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void write(OutputStream out) throws IOException {
|
||||
@@ -43,9 +52,14 @@ class MuWireSettings {
|
||||
props.setProperty("downloadRetryInterval", String.valueOf(downloadRetryInterval))
|
||||
props.setProperty("updateCheckInterval", String.valueOf(updateCheckInterval))
|
||||
props.setProperty("shareDownloadedFiles", String.valueOf(shareDownloadedFiles))
|
||||
props.setProperty("watchSharedDirectories", String.valueOf(watchSharedDirectories))
|
||||
if (sharedFiles != null)
|
||||
props.setProperty("sharedFiles", sharedFiles)
|
||||
|
||||
if (!watchedDirectories.isEmpty()) {
|
||||
String encoded = watchedDirectories.stream().
|
||||
map({Base64.encode(DataUtil.encodei18nString(it))}).
|
||||
collect(Collectors.joining(","))
|
||||
props.setProperty("watchedDirectories", encoded)
|
||||
}
|
||||
|
||||
props.store(out, "")
|
||||
}
|
||||
|
||||
|
@@ -1,4 +1,137 @@
|
||||
package com.muwire.core.files
|
||||
|
||||
import java.nio.file.FileSystem
|
||||
import java.nio.file.FileSystems
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import static java.nio.file.StandardWatchEventKinds.*
|
||||
import java.nio.file.WatchEvent
|
||||
import java.nio.file.WatchKey
|
||||
import java.nio.file.WatchService
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
import com.muwire.core.EventBus
|
||||
import com.muwire.core.SharedFile
|
||||
|
||||
import groovy.util.logging.Log
|
||||
import net.i2p.util.SystemVersion
|
||||
|
||||
@Log
|
||||
class DirectoryWatcher {
|
||||
|
||||
private static final long WAIT_TIME = 1000
|
||||
|
||||
private static final WatchEvent.Kind[] kinds
|
||||
static {
|
||||
if (SystemVersion.isMac())
|
||||
kinds = [ENTRY_MODIFY, ENTRY_DELETE]
|
||||
else
|
||||
kinds = [ENTRY_CREATE, ENTRY_MODIFY, ENTRY_DELETE]
|
||||
}
|
||||
|
||||
private final EventBus eventBus
|
||||
private final FileManager fileManager
|
||||
private final Thread watcherThread, publisherThread
|
||||
private final Map<File, Long> waitingFiles = new ConcurrentHashMap<>()
|
||||
private WatchService watchService
|
||||
private volatile boolean shutdown
|
||||
|
||||
DirectoryWatcher(EventBus eventBus, FileManager fileManager) {
|
||||
this.eventBus = eventBus
|
||||
this.fileManager = fileManager
|
||||
this.watcherThread = new Thread({watch() } as Runnable, "directory-watcher")
|
||||
watcherThread.setDaemon(true)
|
||||
this.publisherThread = new Thread({publish()} as Runnable, "watched-files-publisher")
|
||||
publisherThread.setDaemon(true)
|
||||
}
|
||||
|
||||
void start() {
|
||||
watchService = FileSystems.getDefault().newWatchService()
|
||||
watcherThread.start()
|
||||
publisherThread.start()
|
||||
}
|
||||
|
||||
void stop() {
|
||||
shutdown = true
|
||||
watcherThread.interrupt()
|
||||
publisherThread.interrupt()
|
||||
watchService.close()
|
||||
}
|
||||
|
||||
void onFileSharedEvent(FileSharedEvent e) {
|
||||
if (!e.file.isDirectory())
|
||||
return
|
||||
Path path = e.file.getCanonicalFile().toPath()
|
||||
path.register(watchService, kinds)
|
||||
|
||||
}
|
||||
|
||||
private void watch() {
|
||||
try {
|
||||
while(!shutdown) {
|
||||
WatchKey key = watchService.take()
|
||||
key.pollEvents().each {
|
||||
switch(it.kind()) {
|
||||
case ENTRY_CREATE: processCreated(key.watchable(), it.context()); break
|
||||
case ENTRY_MODIFY: processModified(key.watchable(), it.context()); break
|
||||
case ENTRY_DELETE: processDeleted(key.watchable(), it.context()); break
|
||||
}
|
||||
}
|
||||
key.reset()
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
if (!shutdown)
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void processCreated(Path parent, Path path) {
|
||||
File f= join(parent, path)
|
||||
log.fine("created entry $f")
|
||||
if (f.isDirectory())
|
||||
f.toPath().register(watchService, kinds)
|
||||
}
|
||||
|
||||
private void processModified(Path parent, Path path) {
|
||||
File f = join(parent, path)
|
||||
log.fine("modified entry $f")
|
||||
waitingFiles.put(f, System.currentTimeMillis())
|
||||
}
|
||||
|
||||
private void processDeleted(Path parent, Path path) {
|
||||
File f = join(parent, path)
|
||||
log.fine("deleted entry $f")
|
||||
SharedFile sf = fileManager.fileToSharedFile.get(f)
|
||||
if (sf != null)
|
||||
eventBus.publish(new FileUnsharedEvent(unsharedFile : sf))
|
||||
}
|
||||
|
||||
private static File join(Path parent, Path path) {
|
||||
File parentFile = parent.toFile().getCanonicalFile()
|
||||
new File(parentFile, path.toFile().getName())
|
||||
}
|
||||
|
||||
private void publish() {
|
||||
try {
|
||||
while(!shutdown) {
|
||||
Thread.sleep(WAIT_TIME)
|
||||
long now = System.currentTimeMillis()
|
||||
def published = []
|
||||
waitingFiles.each { file, timestamp ->
|
||||
if (now - timestamp > WAIT_TIME) {
|
||||
log.fine("publishing file $file")
|
||||
eventBus.publish new FileSharedEvent(file : file)
|
||||
published << file
|
||||
}
|
||||
}
|
||||
published.each {
|
||||
waitingFiles.remove(it)
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
if (!shutdown)
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -32,7 +32,7 @@ class HasherService {
|
||||
private void process(File f) {
|
||||
f = f.getCanonicalFile()
|
||||
if (f.isDirectory()) {
|
||||
f.listFiles().each {onFileSharedEvent new FileSharedEvent(file: it) }
|
||||
f.listFiles().each {eventBus.publish new FileSharedEvent(file: it) }
|
||||
} else {
|
||||
if (f.length() == 0) {
|
||||
eventBus.publish new FileHashedEvent(error: "Not sharing empty file $f")
|
||||
|
@@ -11,6 +11,7 @@ import com.muwire.core.EventBus
|
||||
import com.muwire.core.InfoHash
|
||||
import com.muwire.core.Service
|
||||
import com.muwire.core.SharedFile
|
||||
import com.muwire.core.UILoadedEvent
|
||||
import com.muwire.core.util.DataUtil
|
||||
|
||||
import groovy.json.JsonOutput
|
||||
@@ -36,14 +37,14 @@ class PersisterService extends Service {
|
||||
timer = new Timer("file persister", true)
|
||||
}
|
||||
|
||||
void start() {
|
||||
timer.schedule({load()} as TimerTask, 1)
|
||||
}
|
||||
|
||||
void stop() {
|
||||
timer.cancel()
|
||||
}
|
||||
|
||||
|
||||
void onUILoadedEvent(UILoadedEvent e) {
|
||||
timer.schedule({load()} as TimerTask, 1)
|
||||
}
|
||||
|
||||
void load() {
|
||||
if (location.exists() && location.isFile()) {
|
||||
def slurper = new JsonSlurper()
|
||||
|
@@ -194,6 +194,13 @@ class MainFrameController {
|
||||
println "unsharing selected files"
|
||||
}
|
||||
|
||||
void saveMuWireSettings() {
|
||||
File f = new File(core.home, "MuWire.properties")
|
||||
f.withOutputStream {
|
||||
core.muOptions.write(it)
|
||||
}
|
||||
}
|
||||
|
||||
void mvcGroupInit(Map<String, String> args) {
|
||||
application.addPropertyChangeListener("core", {e->
|
||||
core = e.getNewValue()
|
||||
|
@@ -1,3 +1,4 @@
|
||||
|
||||
import griffon.core.GriffonApplication
|
||||
import griffon.core.env.Metadata
|
||||
import groovy.util.logging.Log
|
||||
@@ -104,12 +105,6 @@ class Ready extends AbstractLifecycleHandler {
|
||||
it.propertyChange(new PropertyChangeEvent(this, "core", null, core))
|
||||
}
|
||||
|
||||
if (props.sharedFiles != null) {
|
||||
props.sharedFiles.split(",").each {
|
||||
core.eventBus.publish(new FileSharedEvent(file : new File(it)))
|
||||
}
|
||||
}
|
||||
|
||||
core.eventBus.publish(new UILoadedEvent())
|
||||
}
|
||||
}
|
||||
|
@@ -9,6 +9,7 @@ import javax.swing.JTable
|
||||
|
||||
import com.muwire.core.Core
|
||||
import com.muwire.core.InfoHash
|
||||
import com.muwire.core.MuWireSettings
|
||||
import com.muwire.core.Persona
|
||||
import com.muwire.core.connection.ConnectionAttemptStatus
|
||||
import com.muwire.core.connection.ConnectionEvent
|
||||
@@ -19,6 +20,7 @@ import com.muwire.core.files.FileDownloadedEvent
|
||||
import com.muwire.core.files.FileHashedEvent
|
||||
import com.muwire.core.files.FileLoadedEvent
|
||||
import com.muwire.core.files.FileSharedEvent
|
||||
import com.muwire.core.files.FileUnsharedEvent
|
||||
import com.muwire.core.search.QueryEvent
|
||||
import com.muwire.core.search.UIResultBatchEvent
|
||||
import com.muwire.core.search.UIResultEvent
|
||||
@@ -53,6 +55,7 @@ class MainFrameModel {
|
||||
def downloads = []
|
||||
def uploads = []
|
||||
def shared = []
|
||||
def watched = []
|
||||
def connectionList = []
|
||||
def searches = new LinkedList()
|
||||
def trusted = []
|
||||
@@ -131,9 +134,10 @@ class MainFrameModel {
|
||||
core.eventBus.register(QueryEvent.class, this)
|
||||
core.eventBus.register(UpdateAvailableEvent.class, this)
|
||||
core.eventBus.register(FileDownloadedEvent.class, this)
|
||||
core.eventBus.register(FileUnsharedEvent.class, this)
|
||||
|
||||
timer.schedule({
|
||||
int retryInterval = application.context.get("muwire-settings").downloadRetryInterval
|
||||
int retryInterval = core.muOptions.downloadRetryInterval
|
||||
if (retryInterval > 0) {
|
||||
retryInterval *= 60000
|
||||
long now = System.currentTimeMillis()
|
||||
@@ -156,6 +160,10 @@ class MainFrameModel {
|
||||
runInsideUIAsync {
|
||||
trusted.addAll(core.trustService.good.values())
|
||||
distrusted.addAll(core.trustService.bad.values())
|
||||
|
||||
watched.addAll(core.muOptions.watchedDirectories)
|
||||
builder.getVariable("watched-directories-table").model.fireTableDataChanged()
|
||||
watched.each { core.eventBus.publish(new FileSharedEvent(file : new File(it))) }
|
||||
}
|
||||
})
|
||||
|
||||
@@ -236,6 +244,17 @@ class MainFrameModel {
|
||||
}
|
||||
}
|
||||
|
||||
void onFileUnsharedEvent(FileUnsharedEvent e) {
|
||||
InfoHash infohash = e.unsharedFile.infoHash
|
||||
if (!infoHashes.remove(infohash))
|
||||
return
|
||||
runInsideUIAsync {
|
||||
shared.remove(e.unsharedFile)
|
||||
JTable table = builder.getVariable("shared-files-table")
|
||||
table.model.fireTableDataChanged()
|
||||
}
|
||||
}
|
||||
|
||||
void onUploadEvent(UploadEvent e) {
|
||||
runInsideUIAsync {
|
||||
uploads << e.uploader
|
||||
|
@@ -139,16 +139,32 @@ class MainFrameView {
|
||||
panel (constraints: "uploads window"){
|
||||
gridLayout(cols : 1, rows : 2)
|
||||
panel {
|
||||
borderLayout()
|
||||
panel (constraints : BorderLayout.NORTH) {
|
||||
button(text : "Click here to share files", actionPerformed : shareFiles)
|
||||
gridLayout(cols : 2, rows : 1)
|
||||
panel {
|
||||
borderLayout()
|
||||
panel (constraints : BorderLayout.NORTH) {
|
||||
button(text : "Add directories to watch", actionPerformed : watchDirectories)
|
||||
}
|
||||
scrollPane (constraints : BorderLayout.CENTER) {
|
||||
table(id : "watched-directories-table", autoCreateRowSorter: true) {
|
||||
tableModel(list : model.watched) {
|
||||
closureColumn(header: "Watched Directories", type : String, read : { it })
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
scrollPane ( constraints : BorderLayout.CENTER) {
|
||||
table(id : "shared-files-table", autoCreateRowSorter: true) {
|
||||
tableModel(list : model.shared) {
|
||||
closureColumn(header : "Name", preferredWidth : 550, type : String, read : {row -> row.file.getAbsolutePath()})
|
||||
closureColumn(header : "Size", preferredWidth : 50, type : Long, read : {row -> row.file.length() })
|
||||
}
|
||||
panel {
|
||||
borderLayout()
|
||||
panel (constraints : BorderLayout.NORTH) {
|
||||
button(text : "Share files", actionPerformed : shareFiles)
|
||||
}
|
||||
scrollPane(constraints : BorderLayout.CENTER) {
|
||||
table(id : "shared-files-table", autoCreateRowSorter: true) {
|
||||
tableModel(list : model.shared) {
|
||||
closureColumn(header : "Name", preferredWidth : 500, type : String, read : {row -> row.file.getAbsolutePath()})
|
||||
closureColumn(header : "Size", preferredWidth : 100, type : Long, read : {row -> row.file.length() })
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -466,11 +482,26 @@ class MainFrameView {
|
||||
|
||||
def shareFiles = {
|
||||
def chooser = new JFileChooser()
|
||||
chooser.setDialogTitle("Select file or directory to share")
|
||||
chooser.setFileSelectionMode(JFileChooser.FILES_AND_DIRECTORIES)
|
||||
chooser.setDialogTitle("Select file to share")
|
||||
chooser.setFileSelectionMode(JFileChooser.FILES_ONLY)
|
||||
int rv = chooser.showOpenDialog(null)
|
||||
if (rv == JFileChooser.APPROVE_OPTION) {
|
||||
model.core.eventBus.publish(new FileSharedEvent(file : chooser.getSelectedFile()))
|
||||
}
|
||||
}
|
||||
|
||||
def watchDirectories = {
|
||||
def chooser = new JFileChooser()
|
||||
chooser.setDialogTitle("Select directory to watch")
|
||||
chooser.setFileSelectionMode(JFileChooser.DIRECTORIES_ONLY)
|
||||
int rv = chooser.showOpenDialog(null)
|
||||
if (rv == JFileChooser.APPROVE_OPTION) {
|
||||
File f = chooser.getSelectedFile()
|
||||
model.watched << f.getAbsolutePath()
|
||||
application.context.get("muwire-settings").watchedDirectories << f.getAbsolutePath()
|
||||
mvcGroup.controller.saveMuWireSettings()
|
||||
builder.getVariable("watched-directories-table").model.fireTableDataChanged()
|
||||
model.core.eventBus.publish(new FileSharedEvent(file : f))
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user